#!/usr/bin/env python3
"""
Lha Virtual filesystem executive for Midnight Commander.

Tested against python 3.6, lha[1] 1.14 and mc 4.8.22

[1] http://lha.sourceforge.jp

Changelog:
    2.0 Switch to python3
    1.2 Moved item pattern to extfslib module
    1.1 Moved common code into extfslib library
    1.0 Initial release

Author: Roman 'gryf' Dobosz
Date: 2013-05-16
Version: 1.2
Licence: BSD
"""
import os
import sys
import re
import shutil
from subprocess import call, check_call, CalledProcessError
from tempfile import mkdtemp, mkstemp

from extfslib import Archive, parse_args


class ULha(Archive):
    """Archive handle. Provides interface to MC's extfs subsystem.

    Helpers and attributes such as ``_call_command``, ``_get_real_name``,
    ``_map_name``, ``_arch``, ``_uid``, ``_gid``, ``_contents`` and ``ITEM``
    are not defined here; they are expected to come from
    ``extfslib.Archive``.
    """

    # One line of ``lha lq`` output. Raw bytes literals keep the regex
    # escapes (\s, \d, \[ ...) from being treated as invalid string escapes.
    # NOTE(review): the group names were restored from the keys this class
    # uses (perms, uid, gid, month, day, yh, fpath); ``size`` is presumably
    # required by the ITEM template in extfslib - confirm against it.
    LINE_PAT = re.compile(rb"^((?P<perms>[d-][rswx-]{9})|(\[generic\])|"
                          rb"(\[unknown\]))"
                          rb"((\s+\d+/\d+\s+)|(\s+))"
                          rb"(?P<uid>)(?P<gid>)"  # just for the record
                          rb"(?P<size>\d+)"
                          rb"\s+(\*{6}|\d+\.\d%)"
                          rb"\s(?P<month>[JFMASOND][a-z]{2})\s+"  # month
                          rb"(?P<day>\d+)\s+"  # day
                          rb"(?P<yh>\d{4}|(\d{2}:\d{2}))"  # year/hour
                          rb"\s(?P<fpath>.*)")
    ARCHIVER = b"lha"
    CMDS = {"list": b"lq",
            "read": b"pq",
            "write": b"aq",
            "delete": b"dq"}
    DATETIME = b"%(month)s %(day)s %(yh)s"

    def _get_dir(self):
        """Prepare archive file listing.

        Returns a list of dicts (bytes keys, bytes values), one per line of
        the archiver listing that matches LINE_PAT, or None when the list
        command produced no output.
        """
        contents = []

        out = self._call_command("list")
        if not out:
            return None

        sep = os.path.sep.encode('utf-8')
        for line in out.split(b"\n"):
            # -lhd- can store empty directories; they are listed with
            # a trailing path separator.
            perms = b"-rw-r--r--"
            if line.endswith(sep):
                line = line[:-1]
                perms = b"drw-r--r--"

            match = self.LINE_PAT.match(line)
            if not match:
                continue

            # bytes %-formatting with a mapping needs bytes keys
            entry = {key.encode('utf-8'): value
                     for key, value in match.groupdict().items()}

            # UID and GID can have strange values, depending on the
            # information that was written into the archive. Most of the
            # time I was dealing with Amiga lha archives, so I don't really
            # care about the real user/group.
            entry[b'uid'] = str(self._uid).encode('utf-8')
            entry[b'gid'] = str(self._gid).encode('utf-8')
            entry[b'datetime'] = self.DATETIME % entry

            # "[generic]"/"[unknown]" lines carry no permission string
            if not entry[b'perms']:
                entry[b'perms'] = perms

            entry[b'display_name'] = self._map_name(entry[b'fpath'])
            contents.append(entry)

        return contents

    def list(self):
        """Output contents of the archive to stdout. Always returns 0."""
        # _get_dir() yields None for an empty listing; treat it as no entries
        for entry in self._contents or []:
            sys.stdout.buffer.write(self.ITEM % entry)
        return 0

    def rm(self, dst):
        """Remove file from archive. Returns 0 on success, 1 on error."""
        dst = self._get_real_name(dst)
        # deleting with quiet option enabled will output nothing, so we get
        # empty string here or None in case of error. Not smart.
        if self._call_command('delete', dst=dst) is None:
            return 1
        return 0

    def rmdir(self, dst):
        """Remove empty directory. Returns 0 on success, 1 on error."""
        dst = self._get_real_name(dst)
        sep = os.path.sep.encode('utf-8')
        # the delete command is given the directory name with a trailing
        # separator
        if not dst.endswith(sep):
            dst += sep

        if self._call_command('delete', dst=dst) is None:
            return 1
        return 0

    def run(self, dst):
        """Execute file out of archive.

        The file is extracted to a temporary file, made executable for the
        owner and run. Returns the exit status of the executed file, or 1
        when the extraction failed. The temporary file is always removed.
        """
        fdesc, tmp_file = mkstemp()
        os.close(fdesc)

        try:
            # Do not try to execute an empty temp file if extraction failed
            if self.copyout(dst, tmp_file) != 0:
                return 1

            os.chmod(tmp_file, 0o700)
            return call([tmp_file])
        finally:
            try:
                os.unlink(tmp_file)
            except OSError:
                pass

    def mkdir(self, dst):
        """Create empty directory in archive"""
        return self.copyin(dst)

    def copyin(self, dst, src=None):
        """Copy file to the archive or create directory inside.

        If src is empty, create empty directory with dst name. The entry is
        first recreated under a temporary directory, so that the archiver
        stores it under the relative path dst.

        Returns 0 on success, 1 when the archiver exits with an error.
        """
        current_dir = os.path.abspath(os.curdir)
        arch_abspath = os.path.realpath(self._arch)
        if src:
            # resolve before chdir, otherwise a relative src would be lost
            src = os.path.abspath(src)

        tmpdir = mkdtemp()
        try:
            os.chdir(tmpdir)
            if src:
                parent = os.path.dirname(dst)
                # dst placed in the archive root has no parent to create
                if parent:
                    os.makedirs(parent)
                shutil.copy2(src, dst)
            else:
                os.makedirs(dst)

            result = check_call([self.ARCHIVER.decode('utf-8'),
                                 self.CMDS["write"].decode('utf-8'),
                                 arch_abspath, dst])
        except CalledProcessError:
            return 1
        finally:
            # always restore the working directory and drop the scratch dir
            os.chdir(current_dir)
            shutil.rmtree(tmpdir)
        return result

    def copyout(self, src, dst):
        """Copy file out from archive.

        The archiver's standard output is written to the file dst.
        Returns 0 on success, 1 when the archiver exits with an error.
        """
        src = self._get_real_name(src)
        with open(dst, "wb") as fobj:
            try:
                result = check_call([self.ARCHIVER, self.CMDS['read'],
                                     self._arch, src], stdout=fobj)
            except CalledProcessError:
                return 1
        return result


if __name__ == "__main__":
    sys.exit(parse_args(ULha))