174 lines
5.0 KiB
Python
Executable File
174 lines
5.0 KiB
Python
Executable File
#! /usr/bin/env python
|
|
"""
|
|
Lha Virtual filesystem executive for Midnight Commander.
|
|
|
|
Tested against python 2.7, lha[1] 1.14 and mc 4.8.7
|
|
|
|
[1] http://lha.sourceforge.jp
|
|
|
|
Changelog:
|
|
1.2 Moved item pattern to extfslib module
|
|
1.1 Moved common code into extfslib library
|
|
1.0 Initial release
|
|
|
|
Author: Roman 'gryf' Dobosz <gryf73@gmail.com>
|
|
Date: 2013-05-16
|
|
Version: 1.2
|
|
Licence: BSD
|
|
"""
|
|
import os
|
|
import sys
|
|
import re
|
|
import shutil
|
|
from subprocess import call, check_call, CalledProcessError
|
|
from tempfile import mkdtemp, mkstemp
|
|
|
|
from extfslib import Archive, parse_args
|
|
|
|
|
|
class ULha(Archive):
|
|
"""Archive handle. Provides interface to MC's extfs subsystem"""
|
|
|
|
LINE_PAT = re.compile("^((?P<perms>[d-][rswx-]{9})|(\[generic\])|"
|
|
"(\[unknown\]))"
|
|
"((\s+\d+/\d+\s+)|(\s+))"
|
|
"(?P<uid>)(?P<gid>)" # just for the record
|
|
"(?P<size>\d+)"
|
|
"\s+(\*{6}|\d+\.\d%)"
|
|
"\s(?P<month>[JFMASOND][a-z]{2})\s+" # month
|
|
"(?P<day>\d+)\s+" # day
|
|
"(?P<yh>\d{4}|(\d{2}:\d{2}))" # year/hour
|
|
"\s(?P<fpath>.*)")
|
|
ARCHIVER = "lha"
|
|
CMDS = {"list": "lq",
|
|
"read": "pq",
|
|
"write": "aq",
|
|
"delete": "dq"}
|
|
DATETIME = "%(month)s %(day)s %(yh)s"
|
|
|
|
def _get_dir(self):
|
|
"""Prepare archive file listing"""
|
|
contents = []
|
|
|
|
out = self._call_command("list")
|
|
if not out:
|
|
return
|
|
|
|
for line in out.split("\n"):
|
|
# -lhd- can store empty directories
|
|
perms = "-rw-r--r--"
|
|
if line.endswith(os.path.sep):
|
|
line = line[:-1]
|
|
perms = "drw-r--r--"
|
|
|
|
match = self.LINE_PAT.match(line)
|
|
if not match:
|
|
continue
|
|
|
|
entry = match.groupdict()
|
|
# UID and GID sometimes can have strange values depending on
|
|
# the information that was written into archive. Most of the
|
|
# times I was dealing with Amiga lha archives, so that i don't
|
|
# really care about real user/group
|
|
entry['uid'] = self._uid
|
|
entry['gid'] = self._gid
|
|
entry['datetime'] = self.DATETIME % entry
|
|
|
|
if not entry['perms']:
|
|
entry['perms'] = perms
|
|
|
|
entry['display_name'] = self._map_name(entry['fpath'])
|
|
contents.append(entry)
|
|
return contents
|
|
|
|
def list(self):
|
|
"""Output contents of the archive to stdout"""
|
|
for entry in self._contents:
|
|
sys.stdout.write(self.ITEM % entry)
|
|
return 0
|
|
|
|
def rm(self, dst):
|
|
"""Remove file from archive"""
|
|
dst = self._get_real_name(dst)
|
|
# deleting with quiet option enabled will output nothing, so we get
|
|
# empty string here or None in case of error. Not smart.
|
|
if self._call_command('delete', dst=dst) is None:
|
|
return 1
|
|
return 0
|
|
|
|
def rmdir(self, dst):
|
|
"""Remove empty directory"""
|
|
dst = self._get_real_name(dst)
|
|
if not dst.endswith(os.path.sep):
|
|
dst += os.path.sep
|
|
|
|
if self._call_command('delete', dst=dst) is None:
|
|
return 1
|
|
return 0
|
|
|
|
def run(self, dst):
|
|
"""Execute file out of archive"""
|
|
fdesc, tmp_file = mkstemp()
|
|
os.close(fdesc)
|
|
result = 0
|
|
|
|
if self.copyout(dst, tmp_file) != 0:
|
|
result = 1
|
|
|
|
os.chmod(tmp_file, int("700", 8))
|
|
|
|
try:
|
|
result = call([tmp_file])
|
|
finally:
|
|
try:
|
|
os.unlink(tmp_file)
|
|
except OSError:
|
|
pass
|
|
|
|
return result
|
|
|
|
def mkdir(self, dst):
|
|
"""Create empty directory in archive"""
|
|
return self.copyin(dst)
|
|
|
|
def copyin(self, dst, src=None):
|
|
"""Copy file to the archive or create direcotry inside.
|
|
If src is empty, create empty directory with dst name."""
|
|
current_dir = os.path.abspath(os.curdir)
|
|
|
|
tmpdir = mkdtemp()
|
|
arch_abspath = os.path.realpath(self._arch)
|
|
os.chdir(tmpdir)
|
|
if src:
|
|
os.makedirs(os.path.dirname(dst))
|
|
os.link(src, dst)
|
|
else:
|
|
os.makedirs(dst)
|
|
|
|
try:
|
|
result = check_call([self.ARCHIVER, self.CMDS["write"],
|
|
arch_abspath, dst])
|
|
except CalledProcessError:
|
|
return 1
|
|
finally:
|
|
os.chdir(current_dir)
|
|
shutil.rmtree(tmpdir)
|
|
return result
|
|
|
|
def copyout(self, src, dst):
|
|
"""Copy file out form archive."""
|
|
src = self._get_real_name(src)
|
|
fobj = open(dst, "wb")
|
|
try:
|
|
result = check_call([self.ARCHIVER, self.CMDS['read'], self._arch,
|
|
src], stdout=fobj)
|
|
except CalledProcessError:
|
|
return 1
|
|
finally:
|
|
fobj.close()
|
|
return result
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(parse_args(ULha))
|