1
0
mirror of https://github.com/gryf/fs-uae-wrapper.git synced 2025-12-18 20:10:26 +01:00

Added support for zip archiver

Also fixed issue with extension get from regexp
This commit is contained in:
2017-01-02 15:39:28 +01:00
parent 230ec154b6
commit ee65e7ed7f
2 changed files with 60 additions and 52 deletions

View File

@@ -7,6 +7,24 @@ import sys
import re
def which(archivers):
"""
Check if there selected archiver is available in the system and place it
to the archiver attribute
"""
if not isinstance(archivers, list):
archivers = [archivers]
for fname in archivers:
for path in os.environ["PATH"].split(os.pathsep):
path = os.path.join(path.strip('"'), fname)
if os.path.isfile(path) and os.access(path, os.X_OK):
return fname
return None
class Archive(object):
"""Base class for archive support"""
ADD = ['a']
@@ -14,15 +32,15 @@ class Archive(object):
ARCH = 'false'
def __init__(self):
self.archiver = None
self.which()
self.archiver = which(self.ARCH)
self._compess = self.archiver
self._decompess = self.archiver
def create(self, arch_name):
"""
Create archive using self.archiver and parameters in self.ADD
attribute
Create archive. Return True on success, False otherwise.
"""
result = subprocess.call([self.archiver] + self.ADD + [arch_name, '.'])
result = subprocess.call([self._compess] + self.ADD + [arch_name, '.'])
if result != 0:
sys.stderr.write("Unable to create archive `%s'\n" % arch_name)
return False
@@ -30,38 +48,19 @@ class Archive(object):
def extract(self, arch_name):
"""
Create archive using self.archiver and parameters in self.ADD
attribute
Extract archive. Return True on success, False otherwise.
"""
if not os.path.exists(arch_name):
sys.stderr.write("Archive `%s' doesn't exists.\n" % arch_name)
return False
result = subprocess.call([self.archiver] + self.EXTRACT + [arch_name])
result = subprocess.call([self._decompess] + self.EXTRACT +
[arch_name])
if result != 0:
sys.stderr.write("Unable to extract archive `%s'\n" % arch_name)
return False
return True
def which(self):
"""
Check if there selected archiver is available in the system and place
it to the archiver attribute
"""
if isinstance(self.ARCH, list):
executables = self.ARCH
else:
executables = [self.ARCH]
for fname in executables:
for path in os.environ["PATH"].split(os.pathsep):
path = os.path.join(path.strip('"'), fname)
if os.path.isfile(path) and os.access(path, os.X_OK):
self.archiver = fname
return
self.archiver = None
class TarArchive(Archive):
ADD = ['cf']
@@ -87,7 +86,14 @@ class LhaArchive(Archive):
class ZipArchive(Archive):
ADD = ['a', '-tzip']
ARCH = '7z'
ARCH = ['7z', 'zip']
def __init__(self):
super(ZipArchive, self).__init__()
if self.archiver == 'zip':
self._decompess = which('unzip')
ZipArchive.ADD = ['-r']
ZipArchive.EXTRACT = []
class SevenZArchive(Archive):
@@ -114,7 +120,7 @@ class RarArchive(Archive):
'supported by unrar.\n')
return False
result = subprocess.call([self.archiver] + self.ADD + [arch_name] +
result = subprocess.call([self._compess] + self.ADD + [arch_name] +
sorted(os.listdir('.')))
if result != 0:
sys.stderr.write("Unable to create archive `%s'\n" % arch_name)
@@ -126,11 +132,11 @@ def get_archiver(arch_name):
"""Return right class for provided archive file name"""
_, ext = os.path.splitext(arch_name)
re_tar = re.compile('.*([tT][aA][rR].[^.]+$)')
re_tar = re.compile('.*(.[tT][aA][rR].[^.]+$)')
result = re_tar.match(arch_name)
if result:
ext = result.groups[0]
ext = result.groups()[0]
archivers = {'.tar': TarArchive,
'.tgz': TarGzipArchive,

View File

@@ -61,27 +61,19 @@ class TestArchive(TestCase):
call.assert_called_once_with(['false', 'x', 'foo'])
def test_archive_which(self):
arch = file_archive.Archive()
self.assertEqual(arch.archiver, 'false')
arch.ARCH = 'sh'
self.assertEqual(file_archive.which('sh'), 'sh')
self.assertIsNone(file_archive.which('blahblahexec'))
self.assertEqual(file_archive.which(['blahblahexec', 'pip', 'sh']),
'pip')
arch.which()
self.assertEqual(arch.archiver, 'sh')
arch.ARCH = 'blahblahexec'
arch.which()
self.assertIsNone(arch.archiver)
arch.ARCH = ['blahblahexec', 'pip', 'sh']
arch.which()
self.assertEqual(arch.archiver, 'pip')
@mock.patch('fs_uae_wrapper.file_archive.Archive.which')
@mock.patch('fs_uae_wrapper.file_archive.which')
@mock.patch('subprocess.call')
def test_tar(self, call, which):
with open('foo', 'w') as fobj:
fobj.write('\n')
which.return_value = 'tar'
arch = file_archive.TarArchive()
arch.archiver = 'tar'
call.return_value = 0
@@ -130,12 +122,14 @@ class TestArchive(TestCase):
self.assertFalse(arch.extract('foo'))
call.assert_called_once_with(['tar', 'xf', 'foo'])
@mock.patch('fs_uae_wrapper.file_archive.Archive.which')
@mock.patch('fs_uae_wrapper.file_archive.which')
@mock.patch('subprocess.call')
def test_lha(self, call, which):
with open('foo', 'w') as fobj:
fobj.write('\n')
which.return_value = 'lha'
arch = file_archive.LhaArchive()
arch.archiver = 'lha'
call.return_value = 0
@@ -148,12 +142,14 @@ class TestArchive(TestCase):
self.assertFalse(arch.extract('foo'))
call.assert_called_once_with(['lha', 'x', 'foo'])
@mock.patch('fs_uae_wrapper.file_archive.Archive.which')
@mock.patch('fs_uae_wrapper.file_archive.which')
@mock.patch('subprocess.call')
def test_lzx(self, call, which):
with open('foo', 'w') as fobj:
fobj.write('\n')
which.return_value = 'unlzx'
arch = file_archive.LzxArchive()
arch.archiver = 'unlzx'
call.return_value = 0
@@ -166,12 +162,14 @@ class TestArchive(TestCase):
self.assertFalse(arch.extract('foo'))
call.assert_called_once_with(['unlzx', '-x', 'foo'])
@mock.patch('fs_uae_wrapper.file_archive.Archive.which')
@mock.patch('fs_uae_wrapper.file_archive.which')
@mock.patch('subprocess.call')
def test_7zip(self, call, which):
with open('foo', 'w') as fobj:
fobj.write('\n')
which.return_value = '7z'
arch = file_archive.SevenZArchive()
arch.archiver = '7z'
call.return_value = 0
@@ -184,12 +182,14 @@ class TestArchive(TestCase):
self.assertFalse(arch.extract('foo'))
call.assert_called_once_with(['7z', 'x', 'foo'])
@mock.patch('fs_uae_wrapper.file_archive.Archive.which')
@mock.patch('fs_uae_wrapper.file_archive.which')
@mock.patch('subprocess.call')
def test_zip(self, call, which):
with open('foo', 'w') as fobj:
fobj.write('\n')
which.return_value = '7z'
arch = file_archive.ZipArchive()
arch.archiver = '7z'
call.return_value = 0
@@ -202,10 +202,12 @@ class TestArchive(TestCase):
self.assertFalse(arch.extract('foo'))
call.assert_called_once_with(['7z', 'x', 'foo'])
@mock.patch('fs_uae_wrapper.file_archive.Archive.which')
@mock.patch('fs_uae_wrapper.file_archive.which')
@mock.patch('subprocess.call')
def test_rar(self, call, which):
which.return_value = 'rar'
arch = file_archive.RarArchive()
arch.archiver = 'rar'
call.return_value = 0
@@ -233,7 +235,7 @@ class TestArchive(TestCase):
call.reset_mock()
call.return_value = 0
arch.archiver = 'unrar'
arch._compess = arch._decompess = arch.archiver = 'unrar'
self.assertFalse(arch.create('foo'))
call.assert_not_called()