commit ac964193f68f2e34434ce24f58100e224eff2da6 Author: gryf Date: Sun May 5 21:33:46 2013 +0200 Initial import diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..87e3ff8 --- /dev/null +++ b/README.rst @@ -0,0 +1,16 @@ +================================= +ulha extfs for Midnight Commander +================================= + +This is Midnight Commander extfs plugin for handling lha/lzh archives. +It requires `lha `_ free LHA implementation to work. + +Installation +------------ +* copy ``ulha.py`` to ``~/.local/share/mc/extfs/ulha`` +* add or change entry for lha/lzh files handle in ``~/.config/mc/mc.ext``:: + + # lha + regex/\.[lL]([Hh][aA]|[Zz][hH])$ + Open=%cd %p/ulha:// + View=%view{ascii} lha l %f diff --git a/ulha b/ulha new file mode 100755 index 0000000..1e6930d --- /dev/null +++ b/ulha @@ -0,0 +1,329 @@ +#! /usr/bin/env python +""" +Lha Virtual filesystem executive for Midnight Commander. + +Tested against python 2.7, lha[1] 1.14 and mc 4.8.7 + +[1] http://lha.sourceforge.jp + +Changelog: + 1.0 Initial release + +Author: Roman 'gryf' Dobosz +Date: 2013-05-05 +Version: 1.0 +Licence: BSD +""" +import os +import sys +import re +import shutil +from subprocess import call, check_call, check_output, CalledProcessError +from tempfile import mkdtemp, mkstemp + + +# Define which archiver you are using with appropriate options +ARCHIVER = "lha" +CMDS = {"list": "lq", + "read": "pq", + "write": "aq", + "delete": "dq"} + +LINE_LHD = re.compile("^(?P[d-][rswx-]{9})" + "\s+(?P\d+)/" + "(?P\d+)" + "\s+(?P\d+)" + "\s+(\*{6}|\d+\.\d%)" + "\s(?P[JFMASOND][a-z]{2})\s+" # month + "(?P\d+)\s+" # day + "(?P\d{4}|(\d{2}:\d{2}))" # year/hour + "\s(?P.*)") + +LINE_LHx = re.compile("^(?P(\[generic\])|(\[unknown\])|([d-][rswx-]{9}))" + "\s+(?P\d+)" + "\s+(\*{6}|\d+\.\d%)" + "\s(?P[JFMASOND][a-z]{2})\s+" # month + "(?P\d+)\s+" # day + "(?P\d{4}|(\d{2}:\d{2}))" # year/hour + "\s(?P.*)") + + +class Archive(object): + """Archive handle. Provides interface to MC's extfs subsystem""" + def __init__(self, fname): + """Prepare archive content for operations""" + self._filemap = {} + self._arch = fname + self._pattern = None + self._uid = str(os.getuid()) + self._gid = str(os.getgid()) + + self._contents = self._get_dir() + + def _identify(self): + """Check for lha header""" + pat_map = {"-lhd-": LINE_LHD, + "-lh0-": LINE_LHx, + "-lh1-": LINE_LHx, + "-lh5-": LINE_LHx, + "-lh6-": LINE_LHx} + fobj = open(self._arch) + fobj.seek(2) + ident = fobj.read(5) + fobj.close() + return pat_map[ident] + + def _map_name(self, name): + """MC still have a bug in extfs subsystem, in case of filepaths with + leading space. This is workaround to this bug, which replaces leading + space with tilda. Real name is remembered in _filemap attribute and + used in real operations.""" + if name.startswith(" "): + new_name = "".join(["~", name[1:]]) + self._filemap[new_name] = name + return new_name + return name + + def _get_real_name(self, name): + """Get real filepath of the file. See _map_name docstring for + details.""" + new_name = self._filemap.get(name) + if new_name: + return new_name + return name + + def _get_dir(self): + """Prepare archive file listing""" + if not self._pattern: + self._pattern = self._identify() + + self._filemap = {} + contents = [] + + if self._pattern == LINE_LHx: + perms = "-rw-r--r--" + + out = self._call_command("list") + if not out: + return + + for line in out.split("\n"): + if line.endswith("/"): + line = line[:-1] + if self._pattern == LINE_LHx: + perms = "drw-r--r--" + + match = self._pattern.match(line) + if not match: + continue + + entry = match.groupdict() + # UID and GID sometimes can have strange values depending on + # the information that was written into archive. Most of the + # times I was dealing with Amiga lha archives, so that i don't + # really care about real user/group + entry['uid'] = self._uid + entry['gid'] = self._gid + + if self._pattern == LINE_LHx: + entry['perms'] = perms + + entry['display_name'] = self._map_name(entry['fpath']) + contents.append(entry) + return contents + + def _call_command(self, cmd, src=None, dst=None): + """ + Return status of the provided command, which can be one of: + write + read + delete + list + """ + command = [ARCHIVER, CMDS.get(cmd), self._arch] + + if src and dst: + command.append(src) + command.append(dst) + elif src or dst: + command.append(src and src or dst) + + try: + output = check_output(command) + except CalledProcessError: + return None + return output + + + def list(self): + """Output contents of the archive to stdout""" + for entry in self._contents: + sys.stdout.write("%(perms)s 1 %(uid)-8s %(gid)-8s %(size)8s " + "%(month)s %(day)s %(yh)s %(display_name)s\n" % + entry) + return 0 + + def rm(self, dst): + """Remove file from archive""" + dst = self._get_real_name(dst) + # deleting with quiet option enabled will output nothing, so we get + # empty string here or None in case of error. Not smart. + if self._call_command('delete', dst=dst) is None: + return 1 + return 0 + + def run(self, dst): + """Execute file out of archive""" + fdesc, tmp_file = mkstemp() + os.close(fdesc) + result = 0 + + if self.copyout(dst, tmp_file) != 0: + result = 1 + + os.chmod(tmp_file, int("700", 8)) + + try: + result = call([tmp_file]) + finally: + try: + os.unlink(tmp_file) + except OSError: + pass + + return result + + def mkdir(self, dst): + """Create empty directory in archive""" + return self.copyin(dst) + + def copyin(self, dst, src=None): + """Copy file to the archive or create direcotry inside. + If src is empty, create empty directory with dst name.""" + current_dir = os.path.abspath(os.curdir) + + tmpdir = mkdtemp() + arch_abspath = os.path.realpath(self._arch) + os.chdir(tmpdir) + if src: + os.makedirs(os.path.dirname(dst)) + os.link(src, dst) + else: + os.makedirs(dst) + + try: + result = check_call([ARCHIVER, CMDS["write"], arch_abspath, dst]) + except CalledProcessError: + return 1 + finally: + os.chdir(current_dir) + shutil.rmtree(tmpdir) + return result + + def copyout(self, src, dst): + """Copy file out form archive.""" + src = self._get_real_name(src) + fobj = open(dst, "wb") + try: + result = check_call([ARCHIVER, CMDS['read'], self._arch, src], + stdout=fobj) + except CalledProcessError: + return 1 + finally: + fobj.close() + return result + +CALL_MAP = {'list': lambda a: Archive(a.arch).list(), + 'copyin': lambda a: Archive(a.arch).copyin(a.src, a.dst), + 'copyout': lambda a: Archive(a.arch).copyout(a.src, a.dst), + 'mkdir': lambda a: Archive(a.arch).mkdir(a.dst), + 'rm': lambda a: Archive(a.arch).rm(a.dst), + 'run': lambda a: Archive(a.arch).run(a.dst)} + + +def parse_args(): + """Use ArgumentParser to check for script arguments and execute.""" + parser = ArgumentParser() + subparsers = parser.add_subparsers(help='supported commands') + parser_list = subparsers.add_parser('list', help="List contents of " + "archive") + parser_copyin = subparsers.add_parser('copyin', help="Copy file into " + "archive") + parser_copyout = subparsers.add_parser('copyout', help="Copy file out of " + "archive") + parser_rm = subparsers.add_parser('rm', help="Delete file in archive") + parser_mkdir = subparsers.add_parser('mkdir', help="Create directory in " + "archive") + parser_run = subparsers.add_parser('run', help="Execute archived file") + + parser_list.add_argument('arch', help="archive filename") + parser_list.set_defaults(func=CALL_MAP['list']) + + parser_copyin.add_argument('arch', help="archive filename") + parser_copyin.add_argument('src', help="source filename") + parser_copyin.add_argument('dst', help="destination filename (to be " + "written into archive)") + parser_copyin.set_defaults(func=CALL_MAP['copyin']) + + parser_copyout.add_argument('arch', help="archive filename") + parser_copyout.add_argument('src', help="source filename (to be read from" + " archive") + parser_copyout.add_argument('dst', help="destination filename") + parser_copyout.set_defaults(func=CALL_MAP['copyout']) + + parser_rm.add_argument('arch', help="archive filename") + parser_rm.add_argument('dst', help="File inside archive to be deleted") + parser_rm.set_defaults(func=CALL_MAP['rm']) + + parser_mkdir.add_argument('arch', help="archive filename") + parser_mkdir.add_argument('dst', help="Directory name inside archive to " + "be created") + parser_mkdir.set_defaults(func=CALL_MAP['mkdir']) + + parser_run.add_argument('arch', help="archive filename") + parser_run.add_argument('dst', help="File to be executed") + parser_run.set_defaults(func=CALL_MAP['run']) + + args = parser.parse_args() + return args.func(args) + + +def no_parse(): + """Failsafe argument "parsing". Note, that it blindly takes positional + arguments without checking them. In case of wrong arguments it will + silently exit""" + try: + if sys.argv[1] not in ('list', 'copyin', 'copyout', 'rm', 'mkdir', + "run"): + sys.exit(2) + except IndexError: + sys.exit(2) + + class Arg(object): + """Mimic argparse object""" + dst = None + src = None + arch = None + + arg = Arg() + + try: + arg.arch = sys.argv[2] + if sys.argv[1] in ('copyin', 'copyout'): + arg.src = sys.argv[3] + arg.dst = sys.argv[4] + elif sys.argv[1] in ('rm', 'run', 'mkdir'): + arg.dst = sys.argv[3] + except IndexError: + sys.exit(2) + + return CALL_MAP[sys.argv[1]](arg) + +if __name__ == "__main__": + try: + from argparse import ArgumentParser + PARSE_FUNC = parse_args + except ImportError: + PARSE_FUNC = no_parse + + sys.exit(PARSE_FUNC())