#! /usr/bin/env python """ Lha Virtual filesystem executive for Midnight Commander. Tested against python 2.7, lha[1] 1.14 and mc 4.8.7 [1] http://lha.sourceforge.jp Changelog: 1.0 Initial release Author: Roman 'gryf' Dobosz Date: 2013-05-05 Version: 1.0 Licence: BSD """ import os import sys import re import shutil from subprocess import call, check_call, check_output, CalledProcessError from tempfile import mkdtemp, mkstemp # Define which archiver you are using with appropriate options ARCHIVER = "lha" CMDS = {"list": "lq", "read": "pq", "write": "aq", "delete": "dq"} LINE_LHD = re.compile("^(?P[d-][rswx-]{9})" "\s+(?P\d+)/" "(?P\d+)" "\s+(?P\d+)" "\s+(\*{6}|\d+\.\d%)" "\s(?P[JFMASOND][a-z]{2})\s+" # month "(?P\d+)\s+" # day "(?P\d{4}|(\d{2}:\d{2}))" # year/hour "\s(?P.*)") LINE_LHx = re.compile("^(?P(\[generic\])|(\[unknown\])|([d-][rswx-]{9}))" "\s+(?P\d+)" "\s+(\*{6}|\d+\.\d%)" "\s(?P[JFMASOND][a-z]{2})\s+" # month "(?P\d+)\s+" # day "(?P\d{4}|(\d{2}:\d{2}))" # year/hour "\s(?P.*)") class Archive(object): """Archive handle. Provides interface to MC's extfs subsystem""" def __init__(self, fname): """Prepare archive content for operations""" self._filemap = {} self._arch = fname self._pattern = None self._uid = str(os.getuid()) self._gid = str(os.getgid()) self._contents = self._get_dir() def _identify(self): """Check for lha header""" pat_map = {"-lhd-": LINE_LHD, "-lh0-": LINE_LHx, "-lh1-": LINE_LHx, "-lh5-": LINE_LHx, "-lh6-": LINE_LHx} fobj = open(self._arch) fobj.seek(2) ident = fobj.read(5) fobj.close() return pat_map[ident] def _map_name(self, name): """MC still have a bug in extfs subsystem, in case of filepaths with leading space. This is workaround to this bug, which replaces leading space with tilda. Real name is remembered in _filemap attribute and used in real operations.""" if name.startswith(" "): new_name = "".join(["~", name[1:]]) self._filemap[new_name] = name return new_name return name def _get_real_name(self, name): """Get real filepath of the file. See _map_name docstring for details.""" new_name = self._filemap.get(name) if new_name: return new_name return name def _get_dir(self): """Prepare archive file listing""" if not self._pattern: self._pattern = self._identify() self._filemap = {} contents = [] if self._pattern == LINE_LHx: perms = "-rw-r--r--" out = self._call_command("list") if not out: return for line in out.split("\n"): if line.endswith("/"): line = line[:-1] if self._pattern == LINE_LHx: perms = "drw-r--r--" match = self._pattern.match(line) if not match: continue entry = match.groupdict() # UID and GID sometimes can have strange values depending on # the information that was written into archive. Most of the # times I was dealing with Amiga lha archives, so that i don't # really care about real user/group entry['uid'] = self._uid entry['gid'] = self._gid if self._pattern == LINE_LHx: entry['perms'] = perms entry['display_name'] = self._map_name(entry['fpath']) contents.append(entry) return contents def _call_command(self, cmd, src=None, dst=None): """ Return status of the provided command, which can be one of: write read delete list """ command = [ARCHIVER, CMDS.get(cmd), self._arch] if src and dst: command.append(src) command.append(dst) elif src or dst: command.append(src and src or dst) try: output = check_output(command) except CalledProcessError: return None return output def list(self): """Output contents of the archive to stdout""" for entry in self._contents: sys.stdout.write("%(perms)s 1 %(uid)-8s %(gid)-8s %(size)8s " "%(month)s %(day)s %(yh)s %(display_name)s\n" % entry) return 0 def rm(self, dst): """Remove file from archive""" dst = self._get_real_name(dst) # deleting with quiet option enabled will output nothing, so we get # empty string here or None in case of error. Not smart. if self._call_command('delete', dst=dst) is None: return 1 return 0 def run(self, dst): """Execute file out of archive""" fdesc, tmp_file = mkstemp() os.close(fdesc) result = 0 if self.copyout(dst, tmp_file) != 0: result = 1 os.chmod(tmp_file, int("700", 8)) try: result = call([tmp_file]) finally: try: os.unlink(tmp_file) except OSError: pass return result def mkdir(self, dst): """Create empty directory in archive""" return self.copyin(dst) def copyin(self, dst, src=None): """Copy file to the archive or create direcotry inside. If src is empty, create empty directory with dst name.""" current_dir = os.path.abspath(os.curdir) tmpdir = mkdtemp() arch_abspath = os.path.realpath(self._arch) os.chdir(tmpdir) if src: os.makedirs(os.path.dirname(dst)) os.link(src, dst) else: os.makedirs(dst) try: result = check_call([ARCHIVER, CMDS["write"], arch_abspath, dst]) except CalledProcessError: return 1 finally: os.chdir(current_dir) shutil.rmtree(tmpdir) return result def copyout(self, src, dst): """Copy file out form archive.""" src = self._get_real_name(src) fobj = open(dst, "wb") try: result = check_call([ARCHIVER, CMDS['read'], self._arch, src], stdout=fobj) except CalledProcessError: return 1 finally: fobj.close() return result CALL_MAP = {'list': lambda a: Archive(a.arch).list(), 'copyin': lambda a: Archive(a.arch).copyin(a.src, a.dst), 'copyout': lambda a: Archive(a.arch).copyout(a.src, a.dst), 'mkdir': lambda a: Archive(a.arch).mkdir(a.dst), 'rm': lambda a: Archive(a.arch).rm(a.dst), 'run': lambda a: Archive(a.arch).run(a.dst)} def parse_args(): """Use ArgumentParser to check for script arguments and execute.""" parser = ArgumentParser() subparsers = parser.add_subparsers(help='supported commands') parser_list = subparsers.add_parser('list', help="List contents of " "archive") parser_copyin = subparsers.add_parser('copyin', help="Copy file into " "archive") parser_copyout = subparsers.add_parser('copyout', help="Copy file out of " "archive") parser_rm = subparsers.add_parser('rm', help="Delete file in archive") parser_mkdir = subparsers.add_parser('mkdir', help="Create directory in " "archive") parser_run = subparsers.add_parser('run', help="Execute archived file") parser_list.add_argument('arch', help="archive filename") parser_list.set_defaults(func=CALL_MAP['list']) parser_copyin.add_argument('arch', help="archive filename") parser_copyin.add_argument('src', help="source filename") parser_copyin.add_argument('dst', help="destination filename (to be " "written into archive)") parser_copyin.set_defaults(func=CALL_MAP['copyin']) parser_copyout.add_argument('arch', help="archive filename") parser_copyout.add_argument('src', help="source filename (to be read from" " archive") parser_copyout.add_argument('dst', help="destination filename") parser_copyout.set_defaults(func=CALL_MAP['copyout']) parser_rm.add_argument('arch', help="archive filename") parser_rm.add_argument('dst', help="File inside archive to be deleted") parser_rm.set_defaults(func=CALL_MAP['rm']) parser_mkdir.add_argument('arch', help="archive filename") parser_mkdir.add_argument('dst', help="Directory name inside archive to " "be created") parser_mkdir.set_defaults(func=CALL_MAP['mkdir']) parser_run.add_argument('arch', help="archive filename") parser_run.add_argument('dst', help="File to be executed") parser_run.set_defaults(func=CALL_MAP['run']) args = parser.parse_args() return args.func(args) def no_parse(): """Failsafe argument "parsing". Note, that it blindly takes positional arguments without checking them. In case of wrong arguments it will silently exit""" try: if sys.argv[1] not in ('list', 'copyin', 'copyout', 'rm', 'mkdir', "run"): sys.exit(2) except IndexError: sys.exit(2) class Arg(object): """Mimic argparse object""" dst = None src = None arch = None arg = Arg() try: arg.arch = sys.argv[2] if sys.argv[1] in ('copyin', 'copyout'): arg.src = sys.argv[3] arg.dst = sys.argv[4] elif sys.argv[1] in ('rm', 'run', 'mkdir'): arg.dst = sys.argv[3] except IndexError: sys.exit(2) return CALL_MAP[sys.argv[1]](arg) if __name__ == "__main__": try: from argparse import ArgumentParser PARSE_FUNC = parse_args except ImportError: PARSE_FUNC = no_parse sys.exit(PARSE_FUNC())