mirror of
https://github.com/gryf/fs-uae-wrapper.git
synced 2025-12-19 04:20:23 +01:00
Added archive file handler
This commit is contained in:
136
fs_uae_wrapper/file_archive.py
Normal file
136
fs_uae_wrapper/file_archive.py
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
"""
|
||||||
|
File archive classes
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
class Archive(object):
|
||||||
|
"""Base class for archive support"""
|
||||||
|
ADD = ['a']
|
||||||
|
EXTRACT = ['x']
|
||||||
|
ARCH = 'false'
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.archiver = None
|
||||||
|
self.which()
|
||||||
|
|
||||||
|
def create(self, arch_name):
|
||||||
|
"""
|
||||||
|
Create archive using self.archiver and parameters in self.ADD
|
||||||
|
attribute
|
||||||
|
"""
|
||||||
|
return subprocess.call([self.ARCH] + self.ADD + [arch_name, '.']) == 0
|
||||||
|
|
||||||
|
def extract(self, arch_name):
|
||||||
|
"""
|
||||||
|
Create archive using self.archiver and parameters in self.ADD
|
||||||
|
attribute
|
||||||
|
"""
|
||||||
|
return subprocess.call([self.ARCH] + self.EXTRACT +
|
||||||
|
[arch_name, '.']) == 0
|
||||||
|
|
||||||
|
def which(self):
|
||||||
|
"""
|
||||||
|
Check if there selected archiver is available in the system and place
|
||||||
|
it to the archiver attribute
|
||||||
|
"""
|
||||||
|
|
||||||
|
if isinstance(self.ARCH, list):
|
||||||
|
executables = self.ARCH
|
||||||
|
else:
|
||||||
|
executables = [self.ARCH]
|
||||||
|
for fname in executables:
|
||||||
|
for path in os.environ["PATH"].split(os.pathsep):
|
||||||
|
path = os.path.join(path.strip('"'), fname)
|
||||||
|
if os.path.isfile(path) and os.access(path, os.X_OK):
|
||||||
|
self.archiver = fname
|
||||||
|
return
|
||||||
|
|
||||||
|
self.archiver = None
|
||||||
|
|
||||||
|
|
||||||
|
class TarArchive(Archive):
|
||||||
|
ADD = ['cf']
|
||||||
|
EXTRACT = ['xf']
|
||||||
|
ARCH = 'tar'
|
||||||
|
|
||||||
|
|
||||||
|
class TarGzipArchive(TarArchive):
|
||||||
|
ADD = ['zcf']
|
||||||
|
|
||||||
|
|
||||||
|
class TarBzip2Archive(TarArchive):
|
||||||
|
ADD = ['jcf']
|
||||||
|
|
||||||
|
|
||||||
|
class TarXzArchive(TarArchive):
|
||||||
|
ADD = ['Jcf']
|
||||||
|
|
||||||
|
|
||||||
|
class LhaArchive(Archive):
|
||||||
|
ARCH = ['lha']
|
||||||
|
|
||||||
|
|
||||||
|
class ZipArchive(Archive):
|
||||||
|
ADD = ['a', '-tzip']
|
||||||
|
ARCH = '7z'
|
||||||
|
|
||||||
|
|
||||||
|
class SevenZArchive(Archive):
|
||||||
|
ARCH = '7z'
|
||||||
|
|
||||||
|
|
||||||
|
class LzxArchive(Archive):
|
||||||
|
ARCH = 'unlzx'
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def create(self, arch_name):
|
||||||
|
sys.stderr.write('Cannot create LZX archive. Only extracting is'
|
||||||
|
'supported\n')
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class RarArchive(Archive):
|
||||||
|
ARCH = ['rar', 'unrar']
|
||||||
|
|
||||||
|
def create(self, arch_name):
|
||||||
|
if self.archiver == 'unrar':
|
||||||
|
sys.stderr.write('Cannot create RAR archive. Only extracting is'
|
||||||
|
'supported by unrar.\n')
|
||||||
|
return False
|
||||||
|
|
||||||
|
return subprocess.call([self.ARCH] + self.ADD + [arch_name,
|
||||||
|
os.listdir('.')]) == 0
|
||||||
|
|
||||||
|
|
||||||
|
def get_archiver(filename):
|
||||||
|
"""Return right class for provided archive filename"""
|
||||||
|
|
||||||
|
if not os.path.exists(filename):
|
||||||
|
sys.stderr.write("Archive `%s' doesn't exists.\n" % filename)
|
||||||
|
return None
|
||||||
|
|
||||||
|
_, ext = os.path.splitext(filename)
|
||||||
|
|
||||||
|
archivers = {'.tar': TarArchive,
|
||||||
|
'.tgz': TarGzipArchive,
|
||||||
|
'.tar.gz': TarGzipArchive,
|
||||||
|
'.tar.bz2': TarBzip2Archive,
|
||||||
|
'.tar.xz': TarXzArchive,
|
||||||
|
'.rar': RarArchive,
|
||||||
|
'.7z': SevenZArchive,
|
||||||
|
'.zip': ZipArchive,
|
||||||
|
'.lha': LhaArchive,
|
||||||
|
'.lzx': LzxArchive}
|
||||||
|
|
||||||
|
archiver = archivers.get(ext)
|
||||||
|
if not archiver:
|
||||||
|
return None
|
||||||
|
|
||||||
|
archobj = archiver()
|
||||||
|
if archobj.archiver is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return archobj
|
||||||
84
tests/test_archive.py
Normal file
84
tests/test_archive.py
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import shutil
|
||||||
|
from tempfile import mkstemp, mkdtemp
|
||||||
|
from unittest import TestCase
|
||||||
|
|
||||||
|
try:
|
||||||
|
from unittest import mock
|
||||||
|
except ImportError:
|
||||||
|
import mock
|
||||||
|
|
||||||
|
from fs_uae_wrapper import archive
|
||||||
|
from fs_uae_wrapper import utils
|
||||||
|
|
||||||
|
|
||||||
|
class TestArchive(TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
fd, self.fname = mkstemp()
|
||||||
|
self.dirname = mkdtemp()
|
||||||
|
self.confdir = mkdtemp()
|
||||||
|
os.close(fd)
|
||||||
|
self._argv = sys.argv[:]
|
||||||
|
sys.argv = ['fs-uae-wrapper']
|
||||||
|
self.curdir = os.path.abspath(os.curdir)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
os.chdir(self.curdir)
|
||||||
|
try:
|
||||||
|
shutil.rmtree(self.dirname)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
shutil.rmtree(self.confdir)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
os.unlink(self.fname)
|
||||||
|
sys.argv = self._argv[:]
|
||||||
|
|
||||||
|
def test_validate_options(self):
|
||||||
|
|
||||||
|
arch = archive.Archive('Config.fs-uae', utils.CmdOption(), {})
|
||||||
|
self.assertFalse(arch._validate_options())
|
||||||
|
|
||||||
|
arch.all_options['wrapper'] = 'archive'
|
||||||
|
self.assertFalse(arch._validate_options())
|
||||||
|
|
||||||
|
arch.all_options['wrapper_archive'] = 'fake.tgz'
|
||||||
|
self.assertTrue(arch._validate_options())
|
||||||
|
|
||||||
|
@mock.patch('fs_uae_wrapper.base.Base._run_emulator')
|
||||||
|
@mock.patch('fs_uae_wrapper.base.Base._kickstart_option')
|
||||||
|
@mock.patch('fs_uae_wrapper.base.Base._copy_conf')
|
||||||
|
@mock.patch('fs_uae_wrapper.base.Base._extract')
|
||||||
|
def test_run(self, extr, cconf, kick, runemul):
|
||||||
|
|
||||||
|
extr.return_value = False
|
||||||
|
cconf.return_value = False
|
||||||
|
kick.return_value = {}
|
||||||
|
runemul.return_value = False
|
||||||
|
|
||||||
|
try:
|
||||||
|
arch = archive.Archive('Config.fs-uae', utils.CmdOption(), {})
|
||||||
|
self.assertFalse(arch.run())
|
||||||
|
|
||||||
|
arch.all_options = {'wrapper': 'archive',
|
||||||
|
'wrapper_archive': 'fake.tgz'}
|
||||||
|
|
||||||
|
self.assertFalse(arch.run())
|
||||||
|
|
||||||
|
extr.return_value = True
|
||||||
|
self.assertFalse(arch.run())
|
||||||
|
|
||||||
|
cconf.return_value = True
|
||||||
|
self.assertTrue(arch.run())
|
||||||
|
|
||||||
|
kick.return_value = {'foo': 'bar'}
|
||||||
|
self.assertTrue(arch.run())
|
||||||
|
self.assertDictEqual(arch.fsuae_options, {'foo': 'bar'})
|
||||||
|
|
||||||
|
runemul.return_value = True
|
||||||
|
self.assertTrue(arch.run())
|
||||||
|
finally:
|
||||||
|
arch.clean()
|
||||||
Reference in New Issue
Block a user