ADFlib 0.8 was released in June 2023, so this script needed to be adapted to the new version.
199 lines
6.3 KiB
Python
Executable File
199 lines
6.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
UADF Virtual filesystem
|
|
|
|
This extfs provides quick and dirty read-only access to disk image files for
|
|
the Commodore Amiga adf or adz (gzipped adfs) and dms.
|
|
|
|
It requires the unadf utility. Unfortunately, the original sources are gone,
|
|
since the author's page doesn't exist anymore. Luckily, there is a copy of the
|
|
source (and useful patches) in Debian repository:
|
|
http://packages.debian.org/sid/unadf
|
|
|
|
There should be one change made to the source of unadf, though. While using
|
|
"-lr" switch it by default also displays comments, separated by the comma.
|
|
However there is no way to distinguish where filename ends and comment starts,
|
|
if comment or filename already contains any comma.
|
|
|
|
The patched sources are available from: https://github.com/lclevy/ADFlib
|
|
|
|
It also requires xdms utility, for optional dms support.
|
|
|
|
Changelog:
|
|
1.4 Adapt to unadf 1.2 and use Latin1 as default encoding
|
|
1.3 Switch to Python3
|
|
1.2 Added failsafe for filenames in archive with spaces and nodos message.
|
|
1.1 Moved common code into extfslib library
|
|
1.0 Initial release
|
|
|
|
Author: Roman 'gryf' Dobosz <gryf73@gmail.com>
|
|
Date: 2023-10-16
|
|
Version: 1.4
|
|
Licence: BSD
|
|
"""
|
|
|
|
import gzip
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
|
|
from extfslib import Archive, parse_args
|
|
|
|
|
|
class UAdf(Archive):
    """
    Glue between the unadf program and MC (extfs).

    Lists the content of an Amiga disk image using ``unadf -lr`` and copies
    single files out of it. Gzipped images (.adz) and DMS archives (.dms)
    are first unpacked into a temporary .adf file, which is removed again
    when the object is destroyed.
    """
    # One line of "unadf -lr" output: optional size (missing for
    # directories), date, time and the path inside the image.
    LINE_PAT = re.compile(r'\s*(?P<size>\d+)?'
                          r'\s{2}(?P<date>\d{4}/\d{2}/\d{2})'
                          r'\s{2}\s?(?P<time>\d+:\d{2}:\d{2})'
                          r'\s{2}(?P<fpath>.*)')
    ARCHIVER = "unadf"
    DMS = "xdms"
    CMDS = {"list": "-lr",
            "read": "r",
            "write": "w",
            "delete": "d"}
    # month-day-year hour:minute
    DATETIME = "%s-%s-%s %02d:%s"

    def __init__(self, fname):
        """Prepare archive content for operations.

        fname is the path to an .adf, .adz or .dms file. Compressed images
        are unpacked to a temporary file before the base class is
        initialised with the path of the plain image.
        """
        # _clean is True as long as self._arch is the user's own file, so
        # that __del__ never removes anything this object did not create.
        self._clean = True
        self._arch = fname

        lowered = fname.lower()
        if lowered.endswith(".adz"):
            self._ungzip()

        if lowered.endswith(".dms"):
            self._undms()

        super().__init__(self._arch)

    def __del__(self):
        """Remove the temporary image, if one was created."""
        # getattr: __del__ may run on an instance whose __init__ never
        # reached the attribute assignments.
        if not getattr(self, "_clean", True):
            self._remove_quietly(self._arch)

    @staticmethod
    def _remove_quietly(path):
        """Delete path, ignoring any OS level error (best effort cleanup)."""
        try:
            os.unlink(path)
        except OSError:
            pass

    def _parse_dt(self, date, time):
        """Return date/time reformatted according to the DATETIME pattern.

        date is expected as "YYYY/MM/DD" and time as "H:MM:SS"; seconds are
        dropped.
        """
        year, month, day = date.split("/")
        hours, minutes, _ = time.split(":")
        return self.DATETIME % (month, day, year, int(hours), minutes)

    def _ungzip(self):
        """Create temporary file for ungzipped adf file since unadf does not
        accept gzipped content in any way including reading from stdin.

        On success self._arch points to the temporary file. On failure the
        temporary file is removed and the exception is re-raised.
        """
        fdesc, tmp_fname = tempfile.mkstemp(suffix=".adf")
        os.close(fdesc)

        try:
            with gzip.open(self._arch) as gobj, \
                    open(tmp_fname, "wb") as fobj:
                # stream the data instead of holding the whole image in
                # memory
                shutil.copyfileobj(gobj, fobj)
        except Exception:
            self._remove_quietly(tmp_fname)
            raise

        self._arch = tmp_fname
        self._clean = False

    def _undms(self):
        """Create temporary adf file extracted from dms.

        This is best effort: if xdms is missing or fails, self._arch is left
        untouched and the temporary file is removed.
        """
        fdesc, tmp_fname = tempfile.mkstemp(suffix=".adf")
        os.close(fdesc)

        try:
            subprocess.check_call([self.DMS, '-q', 'u', self._arch,
                                   "+" + tmp_fname])
        except (subprocess.CalledProcessError, OSError):
            # do not leave the unused temporary file behind
            self._remove_quietly(tmp_fname)
            return

        self._arch = tmp_fname
        self._clean = False

    def _map_name(self, name):
        """Return name with a leading space replaced by a tilde."""
        if name.startswith(" "):
            return "~" + name[1:]
        return name

    def _get_dir(self):
        """Retrieve directory.

        Runs "unadf -lr" on the image and returns a list of dicts, one per
        line matching LINE_PAT, with the keys: size, date, time, fpath,
        perms, display_name, datetime, uid and gid. Anything unadf writes to
        stderr is forwarded to our stderr.
        """
        contents = []
        out = subprocess.run([self.ARCHIVER, self.CMDS['list'], self._arch],
                             capture_output=True, encoding="latin-1")

        if out.stderr:
            sys.stderr.write(out.stderr)

        # Split on "\n" only; str.splitlines() would also break lines on
        # characters like \x85, which are valid in Latin-1 decoded names.
        for line in out.stdout.split("\n"):
            match = self.LINE_PAT.match(line)
            if not match:
                continue

            entry = match.groupdict()

            if entry['size']:
                entry['perms'] = "-rw-r--r--"
            else:
                # no size column: entry is treated as a directory
                entry['perms'] = "drwxr-xr-x"
                entry['size'] = "0"

            entry['display_name'] = self._map_name(entry['fpath'])
            entry['datetime'] = self._parse_dt(entry['date'], entry['time'])
            # _uid/_gid are not set in this class; presumably provided by
            # the Archive base class.
            entry['uid'] = str(self._uid)
            entry['gid'] = str(self._gid)
            contents.append(entry)

        return contents

    def list(self):
        """
        Output list contents of adf image.

        Prints one line per entry of self._contents, formatted with the
        ITEM template. Returns 0 on success, or 1 (after writing a message
        to stderr) when there are no entries at all.
        """
        # NOTE(review): _contents and ITEM are not defined in this class;
        # presumably Archive fills _contents from _get_dir() - confirm.
        if not self._contents:
            sys.stderr.write("Nodos or image error\n")
            return 1

        # ITEM is bytes ending with a newline; print() adds its own.
        template = self.ITEM.decode('utf-8')[:-1]
        for entry in self._contents:
            print(template % entry)
        return 0

    def copyout(self, src, dst):
        """Copy file from the adf image.

        src is the display name of the file as printed by list(), dst the
        path on the host filesystem the file should end up at.

        Returns 0 on success and 1 on failure (a message is written to
        stderr). Raises IOError if src is not present in the image.
        """
        # Look the entry up by the name shown to the user, but operate on
        # the real path stored in the image.
        real_src = [e['fpath'] for e in self._contents
                    if e['display_name'] == src]

        if not real_src:
            raise IOError("No such file or directory")

        real_src = real_src[0]

        # Check while the name is still a str; testing a str against bytes
        # raises TypeError.
        if " " in real_src:
            sys.stderr.write("unadf is unable to operate on filepath with "
                             "space inside.\nUse affs to mount image and than"
                             " extract desired files.\n")
            return 1

        # The listing was decoded as Latin-1, so encoding it back gives
        # unadf exactly the bytes it reported.
        real_src = real_src.encode('latin-1')

        extract_dir = tempfile.mkdtemp()
        try:
            cmd = [self.ARCHIVER, "-d", extract_dir, self._arch, real_src]
            unadf = subprocess.run(cmd, stdout=subprocess.DEVNULL,
                                   stderr=subprocess.DEVNULL)
            if unadf.returncode != 0:
                sys.stderr.write("unadf returned with nonzero exit code\n")
                return 1

            # use subprocess, as shutil will crash on binary encoded
            # filenames. Host paths are encoded with the filesystem
            # encoding, only the name coming from the image is Latin-1.
            extracted = os.path.join(os.fsencode(extract_dir), real_src)
            moved = subprocess.run([b'mv', extracted, os.fsencode(dst)])
            if moved.returncode != 0:
                return 1
            return 0
        finally:
            # always remove the scratch directory, also on errors
            shutil.rmtree(extract_dir, ignore_errors=True)
|
|
|
|
|
|
if __name__ == "__main__":
    # parse_args is imported from extfslib; whatever it returns for the
    # UAdf class is used as the exit status of the process.
    exit_status = parse_args(UAdf)
    sys.exit(exit_status)
|