Unadf always returns exit code 0, even if there was a problem parsing the image, mounting it, or anything else. To make matters worse, it always prints a banner to stderr, so the mere presence of stderr output cannot be used to detect an error either. As a workaround, check whether the banner is present, strip it out, and then see whether anything else is left on stderr; any remaining output indicates that something went wrong.
214 lines
6.6 KiB
Python
Executable File
214 lines
6.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
UADF Virtual filesystem
|
|
|
|
This extfs provides quick and dirty read-only access to disk image files for
|
|
the Commodore Amiga adf or adz (gzipped adfs) and dms.
|
|
|
|
It requires the unadf utility. Unfortunately, the original sources are gone,
|
|
since the author's page doesn't exist anymore. Luckily, there is a copy of the
|
|
source (and useful patches) in Debian repository:
|
|
http://packages.debian.org/sid/unadf
|
|
|
|
There should be one change made to the source of unadf, though. While using
|
|
"-lr" switch it by default also displays comments, separated by the comma.
|
|
However there is no way to distinguish where filename ends and comment starts,
|
|
if comment or filename already contains any comma.
|
|
|
|
The patched sources are available from: https://github.com/lclevy/ADFlib
|
|
|
|
It also requires xdms utility, for optional dms support.
|
|
|
|
Changelog:
|
|
1.4 Adapt to unadf 1.2 and use Latin1 as default encoding
|
|
1.3 Switch to Python3
|
|
1.2 Added failsafe for filenames in archive with spaces and nodos message.
|
|
1.1 Moved common code into extfslib library
|
|
1.0 Initial release
|
|
|
|
Author: Roman 'gryf' Dobosz <gryf73@gmail.com>
|
|
Date: 2023-10-16
|
|
Version: 1.4
|
|
Licence: BSD
|
|
"""
|
|
|
|
import gzip
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
|
|
import extfslib
|
|
|
|
|
|
# Banner which unadf unconditionally prints on stderr. The dots are escaped so
# they match literally, and the version accepts multi-digit components (for
# example "1.10"). The trailing group swallows the ADFlib version details up
# to the end of the line; the banner is terminated by an empty line.
BANNER_PAT = re.compile(r'unADF v\d+\.\d+ : a unzip like for \.ADF files, '
                        r'powered by ADFlib (.*)\n\n')
|
|
|
|
|
|
class UAdf(extfslib.Archive):
    """
    Class for interacting with the unadf program and MC.

    Besides plain adf images, gzipped images (``.adz``) and DMS archives
    (``.dms``) are accepted; both are unpacked into a temporary ``.adf``
    file, which is removed when the object is destroyed.
    """
    # Single line of the ``unadf -lr`` listing: optional size (missing for
    # directories), date, time and the path inside of the image.
    LINE_PAT = re.compile(r'\s*(?P<size>\d+)?'
                          r'\s{2}(?P<date>\d{4}/\d{2}/\d{2})'
                          r'\s{2}\s?(?P<time>\d+:\d{2}:\d{2})'
                          r'\s{2}(?P<fpath>.*)')
    ARCHIVER = "unadf"
    DMS = "xdms"
    CMDS = {"list": "-lr",
            "read": "r",
            "write": "w",
            "delete": "d"}
    # month-day-year hours:minutes
    DATETIME = "%s-%s-%s %02d:%s"

    def __init__(self, fname):
        """
        Prepare archive content for operations.

        ``fname`` is the path to an adf, adz or dms file. Compressed images
        are unpacked to a temporary file first and that file is handed over
        to the base class instead of the original one.
        """
        # True means there is no temporary file to remove in __del__.
        self._clean = True
        self._arch = fname

        lowered = fname.lower()
        if lowered.endswith(".adz"):
            self._ungzip()
        elif lowered.endswith(".dms"):
            self._undms()

        super().__init__(self._arch)

    def __del__(self):
        """Cleanup - remove the temporary image, if one was created."""
        # getattr guards against a partially constructed object.
        if getattr(self, "_clean", True):
            return

        self._discard(self._arch)

    @staticmethod
    def _discard(path):
        """Remove file under ``path``, ignoring errors (best effort)."""
        try:
            os.unlink(path)
        except OSError:
            pass

    def _parse_dt(self, date, time):
        """
        Return parsed datetime which fulfills the extfs date standard.

        ``date`` is expected as ``YYYY/MM/DD`` and ``time`` as ``H:MM:SS``;
        the result has the form ``MM-DD-YYYY HH:MM`` (seconds are dropped).
        """
        year, month, day = date.split("/")
        hours, minutes, _unused = time.split(":")
        return self.DATETIME % (month, day, year, int(hours), minutes)

    def _ungzip(self):
        """
        Create temporary file for ungzipped adf file since unadf does not
        accept gzipped content in any way including reading from stdin.

        On success ``self._arch`` points to the temporary file. On failure
        the temporary file is removed and the exception is re-raised.
        """
        fdesc, tmp_fname = tempfile.mkstemp(suffix=".adf")
        os.close(fdesc)

        try:
            with gzip.open(self._arch) as gobj, open(tmp_fname, "wb") as fobj:
                # stream the data instead of loading whole image to memory
                shutil.copyfileobj(gobj, fobj)
        except Exception:
            # do not leave an orphaned temporary file behind
            self._discard(tmp_fname)
            raise

        self._arch = tmp_fname
        self._clean = False

    def _undms(self):
        """
        Create temporary adf file extracted from dms.

        This is best effort: if xdms is missing or fails, ``self._arch`` is
        left untouched and the temporary file is removed.
        """
        fdesc, tmp_fname = tempfile.mkstemp(suffix=".adf")
        os.close(fdesc)

        try:
            subprocess.check_call([self.DMS, '-q', 'u', self._arch,
                                   "+" + tmp_fname])
        except (subprocess.CalledProcessError, OSError):
            self._discard(tmp_fname)
            return

        self._arch = tmp_fname
        self._clean = False

    def _parse_banner(self, string):
        """
        Strip the unadf banner from ``string`` (stderr of unadf).

        Return whatever follows the banner, or None if there is no banner at
        the beginning of the string or nothing follows it.
        """
        match = BANNER_PAT.match(string)
        if not match:
            return None

        remainder = string[match.end():]
        return remainder or None

    def _map_name(self, name):
        """Return ``name`` with a leading space replaced by a tilde."""
        if name.startswith(" "):
            return "~" + name[1:]
        return name

    def _get_dir(self):
        """
        Retrieve directory.

        Run unadf in listing mode and return a list of dictionaries, one for
        every line of its output which matches LINE_PAT. Anything unadf
        wrote to stderr after its banner is forwarded to stderr.
        """
        contents = []
        out = subprocess.run([self.ARCHIVER, self.CMDS['list'], self._arch],
                             capture_output=True, encoding="latin-1")

        # unadf exits with 0 no matter what, so the only hint of a problem
        # is output on stderr other than the banner.
        error_msg = self._parse_banner(out.stderr)
        if error_msg:
            sys.stderr.write(error_msg)

        # Split on "\n" only; str.splitlines() would also break lines on
        # other characters, which may be a part of a file name.
        for line in out.stdout.split("\n"):
            match = self.LINE_PAT.match(line)
            if not match:
                continue

            entry = match.groupdict()

            if entry['size']:
                entry['perms'] = "-rw-r--r--"
            else:
                # entries without size are directories
                entry['perms'] = "drwxr-xr-x"
                entry['size'] = "0"

            entry['display_name'] = self._map_name(entry['fpath'])
            entry['datetime'] = self._parse_dt(entry['date'], entry['time'])
            # NOTE(review): _uid and _gid are not set in this class; they
            # are presumably provided by extfslib.Archive - confirm.
            entry['uid'] = str(self._uid)
            entry['gid'] = str(self._gid)
            contents.append(entry)

        return contents

    def list(self):
        """
        Output list contents of adf image.

        Print one line per entry and return 0, or write an error message to
        stderr and return 1 if there are no entries at all.
        """
        if not self._contents:
            sys.stderr.write("Nodos or image error\n")
            return 1

        # NOTE(review): ITEM comes from the base class; the last character
        # is dropped, presumably a newline which print() adds on its own.
        template = self.ITEM.decode('utf-8')[:-1]
        for entry in self._contents:
            print(template % entry)
        return 0

    def copyout(self, src, dst):
        """
        Copy file from the adf image.

        ``src`` is the name as displayed by list(), ``dst`` is the path of
        the destination file. Return 0 on success and 1 on failure. Raise
        IOError if there is no ``src`` in the image.
        """
        # Translate the displayed name back to the path stored in the image.
        real_src = next((entry['fpath'] for entry in self._contents
                         if entry['display_name'] == src), None)

        if real_src is None:
            raise IOError("No such file or directory")

        if " " in real_src:
            sys.stderr.write("unadf is unable to operate on filepath with "
                             "space inside.\nUse affs to mount image and than"
                             " extract desired files.\n")
            return 1

        # File names inside of the image are latin-1 encoded, and the
        # extracted file carries exactly those bytes as its name.
        real_src = real_src.encode('latin-1')

        extract_dir = tempfile.mkdtemp()
        try:
            cmd = [self.ARCHIVER, "-d", extract_dir, self._arch, real_src]
            if subprocess.call(cmd, stdout=subprocess.DEVNULL,
                               stderr=subprocess.DEVNULL) != 0:
                sys.stderr.write("unadf returned with nonzero exit code\n")
                return 1

            # use subprocess, as shutil will crash on binary encoded
            # filenames. Paths from the local filesystem are converted with
            # the filesystem encoding, not with the one of the image.
            extracted = os.path.join(os.fsencode(extract_dir), real_src)
            moved = subprocess.run([b'mv', b'--', extracted,
                                    os.fsencode(dst)])
            if moved.returncode != 0:
                # unadf exits with 0 even if nothing was extracted; failed
                # move is the only reliable sign of that.
                return 1
        finally:
            shutil.rmtree(extract_dir, ignore_errors=True)

        return 0
|
|
|
|
|
|
# Script entry point: let extfslib dispatch the command line to UAdf and use
# whatever it returns as the process exit status.
if __name__ == "__main__":
    exit_status = extfslib.parse_args(UAdf)
    sys.exit(exit_status)
|