188 lines
5.9 KiB
Python
Executable File
188 lines
5.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
UADF Virtual filesystem
|
|
|
|
This extfs provides quick and dirty read-only access to disk image files for
|
|
the Commodore Amiga adf or adz (gzipped adfs) and dms.
|
|
|
|
It requires the unadf utility. Unfortunately, the original sources are no
longer available, since the author's page doesn't exist anymore. Luckily,
there is a copy of the source (and useful patches) in the Debian repository:
|
|
http://packages.debian.org/sid/unadf
|
|
|
|
There should be one change made to the source of unadf, though. When using the
"-lr" switch, it also displays comments by default, separated by a comma.
However, there is no way to tell where the filename ends and the comment
starts if the comment or the filename already contains a comma.
|
|
|
|
The patched sources are available from: https://github.com/lclevy/ADFlib
|
|
|
|
It also requires xdms utility, for optional dms support.
|
|
|
|
Changelog:
|
|
1.3 Switch to Python3
|
|
1.2 Added failsafe for filenames in archive with spaces and nodos message.
|
|
1.1 Moved common code into extfslib library
|
|
1.0 Initial release
|
|
|
|
Author: Roman 'gryf' Dobosz <gryf73@gmail.com>
|
|
Date: 2019-06-30
|
|
Version: 1.3
|
|
Licence: BSD
|
|
"""
|
|
|
|
import sys
|
|
import re
|
|
import os
|
|
import gzip
|
|
from subprocess import check_output, check_call, CalledProcessError
|
|
from tempfile import mkstemp, mkdtemp
|
|
import shutil
|
|
|
|
from extfslib import Archive, parse_args
|
|
|
|
|
|
class UAdf(Archive):
    """
    Class for interaction with the unadf program and MC.

    Wraps the ``unadf`` command line tool (and ``xdms`` for DMS images) so
    that the content of an Amiga disk image can be listed and single files
    can be copied out of it. Gzipped (``.adz``) and DMS (``.dms``) images
    are first unpacked to a temporary ``.adf`` file, which is removed again
    when the object is destroyed.
    """
    # Raw bytes literals are used so that the regex escapes (\s, \d) are
    # passed to ``re`` untouched instead of being treated as (invalid)
    # string escape sequences.
    LINE_PAT = re.compile(rb'\s*(?P<size>\d+)?'
                          rb'\s{2}(?P<date>\d{4}/\d{2}/\d{2})'
                          rb'\s{2}\s?(?P<time>\d+:\d{2}:\d{2})'
                          rb'\s{2}(?P<fpath>.*)')
    ARCHIVER = b"unadf"
    DMS = b"xdms"
    CMDS = {"list": b"-lr",
            "read": b"r",
            "write": b"w",
            "delete": b"d"}
    # month-day-year hours:minutes, filled in by _parse_dt()
    DATETIME = b"%s-%s-%s %02d:%s"

    def __init__(self, fname):
        """
        Prepare archive content for operations.

        fname is the path to the image file. Depending on its extension the
        image is unpacked to a temporary adf file before the base class
        initializer is called with the path of the file to operate on.
        """
        # _clean == True means there is no temporary file to remove.
        self._clean = True
        self._arch = fname

        lowered = fname.lower()
        if lowered.endswith(".adz"):
            self._ungzip()

        if lowered.endswith(".dms"):
            self._undms()

        super(UAdf, self).__init__(self._arch)

    def __del__(self):
        """Cleanup: remove the temporary adf file, if one was created."""
        if not self._clean:
            try:
                os.unlink(self._arch)
            except OSError:
                pass

    def _parse_dt(self, date, time):
        """
        Return parsed datetime which fulfills the extfs date format.

        date is a bytes object in the form YYYY/MM/DD, time is a bytes
        object in the form H:MM:SS. Seconds are dropped. The result is a
        bytes object formatted as MM-DD-YYYY HH:MM.
        """
        year, month, day = date.split(b"/")
        hours, minutes, _unused = time.split(b":")
        return self.DATETIME % (month, day, year, int(hours), minutes)

    def _ungzip(self):
        """
        Create temporary file for ungzipped adf file since unadf does not
        accept gzipped content in any way including reading from stdin.

        On success self._arch points to the temporary file and the object is
        marked for cleanup. On failure the temporary file is removed and the
        original exception is re-raised.
        """
        fdesc, tmp_fname = mkstemp(suffix=".adf")
        os.close(fdesc)

        try:
            with gzip.open(self._arch) as gobj, open(tmp_fname, "wb") as fobj:
                # Stream the data in chunks instead of loading the whole
                # image into memory.
                shutil.copyfileobj(gobj, fobj)
        except Exception:
            # Do not leave a stray temporary file behind.
            try:
                os.unlink(tmp_fname)
            except OSError:
                pass
            raise

        self._arch = tmp_fname
        self._clean = False

    def _undms(self):
        """
        Create temporary adf file extracted from dms.

        DMS support is optional: if xdms is missing or fails, the archive
        path is left untouched and the unused temporary file is removed.
        """
        fdesc, tmp_fname = mkstemp(suffix=".adf")
        os.close(fdesc)

        try:
            check_call([self.DMS, b'-q', b'u', self._arch, "+" + tmp_fname])
        except (CalledProcessError, OSError):
            # Best effort only; drop the temporary file that will never be
            # used so it does not accumulate in the temp directory.
            try:
                os.unlink(tmp_fname)
            except OSError:
                pass
            return

        self._arch = tmp_fname
        self._clean = False

    def _get_dir(self):
        """
        Retrieve directory.

        Runs unadf in recursive list mode on the image and parses its output
        with LINE_PAT. Returns a list of dicts with bytes keys: size, date,
        time, fpath, perms, display_name, datetime, uid and gid. An empty
        list is returned if unadf exits with a nonzero status.
        """
        contents = []
        with open(os.devnull, "w") as fnull:
            try:
                out = check_output([self.ARCHIVER, self.CMDS['list'],
                                    self._arch], stderr=fnull)
            except CalledProcessError:
                return contents

        for line in out.split(b"\n"):
            match = self.LINE_PAT.match(line)
            if not match:
                continue

            # Keys are converted to bytes, since entries are later used for
            # bytes %-formatting.
            entry = {key.encode('utf-8'): value
                     for key, value in match.groupdict().items()}

            if entry[b'size']:
                entry[b'perms'] = b"-rw-r--r--"
            else:
                # Lines without a size are treated as directories.
                entry[b'perms'] = b"drwxr-xr-x"
                entry[b'size'] = b"0"

            # _map_name, _uid and _gid are provided by the Archive base
            # class (extfslib).
            entry[b'display_name'] = self._map_name(entry[b'fpath'])
            entry[b'datetime'] = self._parse_dt(entry[b'date'],
                                                entry[b'time'])
            entry[b'uid'] = str(self._uid).encode('utf-8')
            entry[b'gid'] = str(self._gid).encode('utf-8')
            contents.append(entry)

        return contents

    def list(self):
        """
        Output list contents of adf image.
        Convert filenames to be Unix filesystem friendly
        Add suffix to show user what kind of file do he dealing with.

        Returns 0 on success, 1 if there is nothing to list.
        """
        # NOTE(review): self._contents and self.ITEM come from the Archive
        # base class; presumably _contents is filled from _get_dir() there.
        if not self._contents:
            sys.stderr.write("Nodos or archive error\n")
            return 1

        for entry in self._contents:
            sys.stdout.buffer.write(self.ITEM % entry)
        return 0

    def copyout(self, src, dst):
        """
        Copy file from the adf image.

        src is the name of the file as presented to MC, dst is the path on
        the local filesystem where the extracted file should be placed.
        Returns 0 on success and 1 on failure. Raises IOError if src cannot
        be mapped to a file inside of the image.
        """
        real_src = self._get_real_name(src)
        if not real_src:
            raise IOError("No such file or directory")

        if b" " in real_src:
            sys.stderr.write("unadf is unable to operate on filepath with "
                             "space inside.\nUse affs to mount image and than"
                             " extract desired files.\n")
            return 1

        extract_dir = mkdtemp()
        cmd = [self.ARCHIVER, self._arch, real_src, b"-d", extract_dir]
        try:
            # check_call raises CalledProcessError on nonzero exit status
            # rather than returning it, so the failure has to be caught.
            with open(os.devnull, 'wb') as fnull:
                check_call(cmd, stdout=fnull, stderr=fnull)

            # real_src is bytes, so both paths are handled as bytes, encoded
            # with the filesystem encoding.
            shutil.move(os.path.join(os.fsencode(extract_dir), real_src),
                        os.fsencode(dst))
        except CalledProcessError:
            sys.stderr.write("unadf returned with nonzero exit code\n")
            return 1
        finally:
            # Always remove the temporary extraction directory.
            shutil.rmtree(extract_dir)

        return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: pass the UAdf class to parse_args() from extfslib
    # and use whatever it returns as the process exit status.
    exit_status = parse_args(UAdf)
    sys.exit(exit_status)
|