Files
mc_uadf/uadf
2023-10-23 18:16:54 +02:00

209 lines
6.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
UADF Virtual filesystem
This extfs provides quick and dirty read-only access to disk image files for
the Commodore Amiga adf or adz (gzipped adfs) and dms.
It requires the unadf utility. Unfortunately, the original sources are no
longer available, since the author's page doesn't exist anymore. Luckily,
there is a copy of the source (and useful patches) in the Debian repository:
http://packages.debian.org/sid/unadf
There should be one change made to the source of unadf, though. When using
the "-lr" switch, it also displays comments by default, separated by a comma.
However, there is no way to distinguish where the filename ends and the
comment starts if the comment or filename already contains a comma.
The patched sources are available from: https://github.com/lclevy/ADFlib
It also requires xdms utility, for optional dms support.
Changelog:
1.4 Adapt to unadf 1.2 and use Latin1 as default encoding
1.3 Switch to Python3
1.2 Added failsafe for filenames in archive with spaces and nodos message.
1.1 Moved common code into extfslib library
1.0 Initial release
Author: Roman 'gryf' Dobosz <gryf73@gmail.com>
Date: 2023-10-19
Version: 1.4
Licence: BSD
"""
import gzip
import os
import re
import shutil
import subprocess
import sys
import tempfile
import extfslib
# Banner which unadf prints on stderr before any other message. The dots in
# the version number and in ".ADF" are escaped, so that they match only a
# literal dot instead of any character. The captured group is the ADFlib
# version string.
BANNER_PAT = re.compile(r'unADF v\d\.\d : a unzip like for \.ADF files, '
                        r'powered by ADFlib (.*)\n\n')
class UAdf(extfslib.Archive):
    """
    Class for interacting with ADF/DMS images and MC.

    Compressed images (``.adz`` - gzipped adf, and ``.dms``) are unpacked
    into a temporary ``.adf`` file first. That temporary file is removed
    when the object is destroyed.
    """
    # One line of "unadf -lr" output: optional size (missing for
    # directories), date, time and the path inside of the image.
    LINE_PAT = re.compile(r'\s*(?P<size>\d+)?'
                          r'\s{2}(?P<date>\d{4}/\d{2}/\d{2})'
                          r'\s{2}\s?(?P<time>\d+:\d{2}:\d{2})'
                          r'\s{2}(?P<fpath>.*)')
    ARCHIVER = "unadf"
    DMS = "xdms"
    CMDS = {"list": "-lr",
            "read": "r",
            "write": "w",
            "delete": "d"}
    # month-day-year hours:minutes
    DATETIME = "%s-%s-%s %02d:%s"

    def __init__(self, fname):
        """
        Prepare archive content for operations.

        fname is the path to the adf, adz or dms image. For adz and dms the
        image is unpacked to a temporary adf file, and that file is passed
        to the base class instead of the original one.
        """
        # Both attributes are set before anything can fail, so that __del__
        # always finds them.
        self._clean = True
        self._arch = fname

        lowered = fname.lower()
        if lowered.endswith(".adz"):
            self._ungzip()
        elif lowered.endswith(".dms"):
            self._undms()

        super().__init__(self._arch)

    def __del__(self):
        """Cleanup - remove temporary adf file, if there was one created."""
        # getattr guards against an instance, which was never initialized.
        if getattr(self, "_clean", True):
            return
        self._remove_tmp(self._arch)

    @staticmethod
    def _remove_tmp(path):
        """Remove temporary file, ignoring the errors (best effort)."""
        try:
            os.unlink(path)
        except OSError:
            # Nothing sensible can be done about it during the cleanup.
            pass

    def _parse_dt(self, date, time):
        """
        Return parsed datetime which fulfill extfs standards date.

        date is expected in form of YYYY/MM/DD, time in form of H:MM:SS.
        Seconds are dropped. Result is formatted using DATETIME pattern.
        """
        year, month, day = date.split("/")
        hours, minutes, _unused = time.split(":")
        return self.DATETIME % (month, day, year, int(hours), minutes)

    def _ungzip(self):
        """
        Create temporary file for ungzipped adf file since unadf does not
        accept gzipped content in any way including reading from stdin.

        On success self._arch points to the temporary file, and it is marked
        for removal. On failure temporary file is removed and the exception
        is re-raised.
        """
        fdesc, tmp_fname = tempfile.mkstemp(suffix=".adf")
        os.close(fdesc)
        try:
            with gzip.open(self._arch) as gobj, open(tmp_fname, "wb") as fobj:
                # copy in chunks instead of reading whole image into memory
                shutil.copyfileobj(gobj, fobj)
        except Exception:
            # do not leave stale temporary file on broken archive
            self._remove_tmp(tmp_fname)
            raise
        self._arch = tmp_fname
        self._clean = False

    def _undms(self):
        """
        Create temporary adf file extracted from dms.

        On success self._arch points to the temporary file, and it is marked
        for removal. When xdms exits with non-zero status, temporary file is
        removed and self._arch is left untouched. When xdms cannot be
        executed at all, temporary file is removed and OSError is re-raised.
        """
        fdesc, tmp_fname = tempfile.mkstemp(suffix=".adf")
        os.close(fdesc)
        try:
            result = subprocess.run([self.DMS, '-q', 'u', self._arch,
                                     "+" + tmp_fname])
        except OSError:
            self._remove_tmp(tmp_fname)
            raise

        if result.returncode != 0:
            # extraction failed - temporary file is useless
            self._remove_tmp(tmp_fname)
            return

        self._arch = tmp_fname
        self._clean = False

    def _parse_banner(self, string):
        """
        Strip the unadf banner from the beginning of the string.

        Return the text which follows the banner, or None if string doesn't
        start with the banner, or there is nothing after it.
        """
        match = BANNER_PAT.match(string)
        if not match:
            return None
        if match.end() == len(string):
            return None
        return string[match.end():]

    def _get_dir(self):
        """
        Retrieve directory.

        Run unadf in list mode and return list of dictionaries, one for
        every line which matches LINE_PAT. Whatever unadf wrote on stderr
        after its banner is passed to stderr.
        """
        out = subprocess.run([self.ARCHIVER, self.CMDS['list'], self._arch],
                             capture_output=True, encoding="latin-1")
        error_msg = self._parse_banner(out.stderr)
        if error_msg:
            sys.stderr.write(error_msg)

        contents = []
        for line in out.stdout.split("\n"):
            match = self.LINE_PAT.match(line)
            if not match:
                continue

            entry = match.groupdict()
            if entry['size']:
                entry['perms'] = "-rw-r--r--"
            else:
                # there is no size on the listing for directories
                entry['perms'] = "drwxr-xr-x"
                entry['size'] = "0"

            entry['display_name'] = self._map_name(entry['fpath'])
            entry['datetime'] = self._parse_dt(entry['date'], entry['time'])
            entry['uid'] = str(self._uid)
            entry['gid'] = str(self._gid)
            contents.append(entry)

        return contents

    def list(self):
        """
        Output list contents of adf image.
        Convert filenames to be Unix filesystem friendly
        Add suffix to show user what kind of file is he dealing with.

        Return 0 on success, 1 if there is no content to show.
        """
        if not self._contents:
            sys.stderr.write("Nodos or image error\n")
            return 1

        for entry in self._contents:
            print(self.ITEM.decode('utf-8')[:-1] % entry)
        return 0

    def copyout(self, src, dst):
        """
        Copy file from the adf image.

        src is the name of the file as displayed on the listing, dst is the
        path on the host filesystem where the file should be placed.

        Return 0 on success, 1 on failure. Raise IOError if there is no src
        in the image.
        """
        real_src = [e['display_name'] for e in self._contents
                    if e['display_name'] == src]
        if not real_src:
            raise IOError("No such file or directory")

        real_src = real_src[0].encode('latin-1')
        if b" " in real_src:
            sys.stderr.write("unadf is unable to operate on filepath with "
                             "space inside.\nUse affs to mount image and than"
                             " extract desired files.\n")
            return 1

        extract_dir = tempfile.mkdtemp()
        try:
            cmd = [self.ARCHIVER, "-d", extract_dir, self._arch, real_src]
            result = subprocess.run(cmd, capture_output=True)
            # Latin-1 is used for unadf output in _get_dir as well, and
            # unlike UTF-8 it cannot fail on any sequence of bytes.
            error_msg = self._parse_banner(result.stderr.decode('latin-1'))
            if error_msg:
                sys.stderr.write("unadf returned with error:\n")
                sys.stderr.write(error_msg)
                return 1

            # use subprocess, as shutil will crash on binary encoded
            # filenames. Host paths are encoded with filesystem encoding,
            # only the name taken from the image is Latin-1.
            moved = subprocess.run([b'mv',
                                    os.path.join(os.fsencode(extract_dir),
                                                 real_src),
                                    os.fsencode(dst)])
            if moved.returncode != 0:
                return 1
            return 0
        finally:
            # remove extraction directory on every exit path
            shutil.rmtree(extract_dir, ignore_errors=True)
if __name__ == "__main__":
    # Hand the command line over to extfslib and use its result as the
    # exit status of the script.
    exit_code = extfslib.parse_args(UAdf)
    sys.exit(exit_code)