216 lines
6.7 KiB
Python
Executable File
216 lines
6.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
UADF Virtual filesystem
|
|
|
|
This extfs provides quick and dirty read-only access to disk image files for
|
|
the Commodore Amiga adf or adz (gzipped adfs) and dms.
|
|
|
|
It requires the unadf utility. Unfortunately the original sources are no
longer available, since the author's page doesn't exist anymore. Luckily,
there is a copy of the source (and useful patches) in the Debian repository:
http://packages.debian.org/sid/unadf
|
|
|
|
There should be one change made to the source of unadf, though. When the
"-lr" switch is used, unadf by default also displays file comments, separated
from the filename by a comma. However, there is no way to tell where the
filename ends and the comment starts if the comment or the filename itself
contains a comma.
|
|
|
|
The patched sources are available from: https://github.com/lclevy/ADFlib
|
|
|
|
It also requires xdms utility, for optional dms support.
|
|
|
|
Changelog:
|
|
1.4 Adapt to unadf 1.2 and use Latin1 as default encoding
|
|
1.3 Switch to Python3
|
|
1.2 Added failsafe for filenames in archive with spaces and nodos message.
|
|
1.1 Moved common code into extfslib library
|
|
1.0 Initial release
|
|
|
|
Author: Roman 'gryf' Dobosz <gryf73@gmail.com>
|
|
Date: 2023-10-16
|
|
Version: 1.4
|
|
Licence: BSD
|
|
"""
|
|
|
|
import gzip
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
|
|
import extfslib
|
|
|
|
|
|
# Matches the banner unadf prints before anything else, including the two
# newlines that terminate it.  The dots are escaped so they only match a
# literal "." and the version accepts multi-digit components (e.g. "v1.10").
# The trailing group captures the rest of the banner line (the ADFlib version
# information, parentheses included).
BANNER_PAT = re.compile(r'unADF v\d+\.\d+ : a unzip like for \.ADF files, '
                        r'powered by ADFlib (.*)\n\n')
|
|
|
|
|
|
class UAdf(extfslib.Archive):
    """
    Class for interacting with the unadf program and MC.

    Plain ``.adf`` images are passed to unadf directly.  ``.adz`` (gzipped)
    and ``.dms`` images are first unpacked into a temporary ``.adf`` file,
    which is removed again when the object is destroyed.

    NOTE(review): ``self._contents``, ``self._uid``, ``self._gid`` and
    ``self.ITEM`` are not defined here; presumably they are provided by
    ``extfslib.Archive`` - confirm against that module.
    """
    # One line of "unadf -lr" output: optional size (absent for directories),
    # date, time and the path inside the image.
    LINE_PAT = re.compile(r'\s*(?P<size>\d+)?'
                          r'\s{2}(?P<date>\d{4}/\d{2}/\d{2})'
                          r'\s{2}\s?(?P<time>\d+:\d{2}:\d{2})'
                          r'\s{2}(?P<fpath>.*)')
    ARCHIVER = "unadf"
    DMS = "xdms"
    CMDS = {"list": "-lr",
            "read": "r",
            "write": "w",
            "delete": "d"}
    # month-day-year hours:minutes
    DATETIME = "%s-%s-%s %02d:%s"

    def __init__(self, fname):
        """Prepare archive content for operations.

        ``fname`` is the path of the image.  Files ending with ``.adz`` or
        ``.dms`` (case-insensitive) are converted to a temporary ``.adf``
        before the base class is initialised with the resulting path.
        """
        # True means self._arch is the user's own file and must not be removed.
        self._clean = True
        self._arch = fname

        lowered = fname.lower()
        if lowered.endswith(".adz"):
            self._ungzip()
        elif lowered.endswith(".dms"):
            self._undms()

        super().__init__(self._arch)

    def __del__(self):
        """Cleanup: remove the temporary adf file, if one was created."""
        # getattr guards against __init__ having failed before the attributes
        # were assigned.
        if getattr(self, "_clean", True):
            return
        try:
            os.unlink(self._arch)
        except OSError:
            pass

    def _parse_dt(self, date, time):
        """Return parsed datetime which fulfill extfs standards date.

        ``date`` is ``YYYY/MM/DD`` and ``time`` is ``H:MM:SS``; the result is
        formatted with ``DATETIME`` as ``MM-DD-YYYY HH:MM`` (seconds dropped).
        """
        year, month, day = date.split("/")
        hours, minutes, _unused = time.split(":")
        return self.DATETIME % (month, day, year, int(hours), minutes)

    def _ungzip(self):
        """Create temporary file for ungzipped adf file since unadf does not
        accept gzipped content in any way including reading from stdin.

        On success ``self._arch`` points at the temporary file and it is
        marked for removal.  If decompression fails the temporary file is
        deleted and the exception is re-raised.
        """
        fdesc, tmp_fname = tempfile.mkstemp(suffix=".adf")
        os.close(fdesc)

        try:
            with gzip.open(self._arch) as gobj, open(tmp_fname, "wb") as fobj:
                shutil.copyfileobj(gobj, fobj)
        except Exception:
            # Do not leave the empty/partial temporary file behind.
            os.unlink(tmp_fname)
            raise

        self._arch = tmp_fname
        self._clean = False

    def _undms(self):
        """Create temporary adf file extracted from dms.

        If xdms exits with a non-zero status ``self._arch`` is left untouched
        and the unused temporary file is removed.
        """
        fdesc, tmp_fname = tempfile.mkstemp(suffix=".adf")
        os.close(fdesc)

        try:
            result = subprocess.run([self.DMS, '-q', 'u', self._arch,
                                     "+" + tmp_fname])
        except OSError:
            # xdms could not be started (e.g. it is not installed).
            os.unlink(tmp_fname)
            raise

        if result.returncode == 0:
            self._arch = tmp_fname
            self._clean = False
        else:
            os.unlink(tmp_fname)

    def _parse_banner(self, string):
        """Return the text following the unadf banner in ``string``.

        Returns ``None`` when ``string`` is empty/``None``, does not start
        with the banner, or contains nothing but the banner.
        """
        if not string:
            return None

        match = BANNER_PAT.match(string)
        if not match or match.end() == len(string):
            return None

        return string[match.end():]

    def _map_name(self, name):
        """Return ``name`` with a leading space replaced by ``~``."""
        if name.startswith(" "):
            return "~" + name[1:]
        return name

    def _get_dir(self):
        """Retrieve directory.

        Runs ``unadf -lr`` on the image and returns a list of dicts, one per
        listed entry, with the keys ``size``, ``date``, ``time``, ``fpath``,
        ``perms``, ``display_name``, ``datetime``, ``uid`` and ``gid``.
        Anything unadf wrote to stderr after its banner is forwarded to
        stderr.
        """
        contents = []
        out = subprocess.run([self.ARCHIVER, self.CMDS['list'], self._arch],
                             capture_output=True, encoding="latin-1")

        error_msg = self._parse_banner(out.stderr)
        if error_msg:
            sys.stderr.write(error_msg)

        # Split on "\n" only: latin-1 filenames may contain characters that
        # str.splitlines() would treat as line boundaries.
        for line in out.stdout.split("\n"):
            match = self.LINE_PAT.match(line)
            if not match:
                continue

            entry = match.groupdict()

            # Lines without a size column describe directories.
            if entry['size'] is None:
                entry['perms'] = "drwxr-xr-x"
                entry['size'] = "0"
            else:
                entry['perms'] = "-rw-r--r--"
            entry['display_name'] = self._map_name(entry['fpath'])
            entry['datetime'] = self._parse_dt(entry['date'], entry['time'])
            entry['uid'] = str(self._uid)
            entry['gid'] = str(self._gid)
            contents.append(entry)

        return contents

    def list(self):
        """
        Output list contents of adf image.

        Every entry is printed using the ``ITEM`` template with the display
        name, i.e. the filename converted to be Unix filesystem friendly.
        Returns 0 on success, or 1 (after writing a message to stderr) when
        there are no entries.
        """
        if not self._contents:
            sys.stderr.write("Nodos or image error\n")
            return 1

        for entry in self._contents:
            print(self.ITEM.decode('utf-8')[:-1] % entry)
        return 0

    def copyout(self, src, dst):
        """Copy file from the adf image.

        ``src`` is the display name of an entry, ``dst`` the destination path
        on the local filesystem.  Returns 0 on success and 1 on failure;
        raises ``IOError`` when ``src`` is not present in the image.
        """
        # Look the entry up by the name shown to the user, but hand unadf the
        # real path stored in the image.
        real_src = next((e['fpath'] for e in self._contents
                         if e['display_name'] == src), None)

        if real_src is None:
            raise IOError("No such file or directory")

        # Check while the name is still str; after encoding it is bytes.
        if " " in real_src:
            sys.stderr.write("unadf is unable to operate on filepath with "
                             "space inside.\nUse affs to mount image and than"
                             " extract desired files.\n")
            return 1

        real_src = real_src.encode('latin-1')

        extract_dir = tempfile.mkdtemp()
        try:
            cmd = [self.ARCHIVER, "-d", extract_dir, self._arch, real_src]
            # stderr has to be captured, otherwise there is nothing to parse.
            result = subprocess.run(cmd, stdout=subprocess.DEVNULL,
                                    stderr=subprocess.PIPE,
                                    encoding="latin-1")

            error_msg = self._parse_banner(result.stderr)
            if error_msg:
                sys.stderr.write("unadf returned with error:\n")
                sys.stderr.write(error_msg)
                return 1

            # use subprocess, as shutil will crash on binary encoded filenames
            # Local paths are encoded with the filesystem encoding; only the
            # name coming from the image is latin-1.
            moved = subprocess.run([b'mv',
                                    os.path.join(os.fsencode(extract_dir),
                                                 real_src),
                                    os.fsencode(dst)])
            if moved.returncode != 0:
                return 1
        finally:
            shutil.rmtree(extract_dir, ignore_errors=True)
        return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # extfslib.parse_args receives the UAdf class; whatever it returns is
    # used as the process exit status.
    exit_status = extfslib.parse_args(UAdf)
    sys.exit(exit_status)
|