1
0
mirror of https://github.com/gryf/boxpy.git synced 2025-12-19 05:30:18 +01:00

Added wrapper on subprocess.run

This commit is contained in:
2021-06-26 11:13:59 +02:00
parent 82c52030c8
commit cc4b4da253

202
box.py
View File

@@ -211,6 +211,24 @@ def convert_to_mega(size):
return result return result
class Run:
"""
Helper class on subprocess.run()
command is a list with command and its params to execute
"""
def __init__(self, command, capture_output=True):
result = subprocess.run(command, encoding='utf-8',
capture_output=capture_output)
if result.stdout:
LOG.debug2(result.stdout)
if result.stderr:
LOG.debug2(result.stderr)
self.returncode = result.returncode
self.stdout = result.stdout
self.stderr = result.stderr
class BoxError(Exception): class BoxError(Exception):
pass pass
@@ -507,13 +525,7 @@ class VBoxManage:
return disk_path return disk_path
def get_media_size(self, media_path): def get_media_size(self, media_path):
try: out = Run(['vboxmanage', 'showmediuminfo', media_path]).stdout
out = subprocess.check_output(['vboxmanage', 'showmediuminfo',
media_path],
encoding=sys.getdefaultencoding(),
stderr=subprocess.DEVNULL)
except subprocess.CalledProcessError:
return None
for line in out.split('\n'): for line in out.split('\n'):
if line.startswith('Capacity:'): if line.startswith('Capacity:'):
@@ -525,17 +537,13 @@ class VBoxManage:
return line.split(' ')[0].strip() return line.split(' ')[0].strip()
def get_vm_info(self): def get_vm_info(self):
try: out = Run(['vboxmanage', 'showvminfo', self.name_or_uuid])
out = subprocess.check_output(['vboxmanage', 'showvminfo', if out.returncode != 0:
self.name_or_uuid],
encoding=sys.getdefaultencoding(),
stderr=subprocess.DEVNULL)
except subprocess.CalledProcessError:
return {} return {}
self.vm_info = {} self.vm_info = {}
for line in out.split('\n'): for line in out.stdout.split('\n'):
if line.startswith('Config file:'): if line.startswith('Config file:'):
self.vm_info['config_file'] = line.split('Config ' self.vm_info['config_file'] = line.split('Config '
'file:')[1].strip() 'file:')[1].strip()
@@ -560,39 +568,36 @@ class VBoxManage:
return self.vm_info return self.vm_info
def poweroff(self, silent=False): def poweroff(self, silent=False):
cmd = ['vboxmanage', 'controlvm', self.name_or_uuid, 'poweroff'] Run(['vboxmanage', 'controlvm', self.name_or_uuid, 'poweroff'])
if silent:
subprocess.call(cmd, stderr=subprocess.DEVNULL)
else:
subprocess.call(cmd)
def vmlist(self, only_running=False, long_list=False): def vmlist(self, only_running=False, long_list=False):
subcommand = 'runningvms' if only_running else 'vms' subcommand = 'runningvms' if only_running else 'vms'
long_list = '-l' if long_list else '-s' long_list = '-l' if long_list else '-s'
subprocess.call(['vboxmanage', 'list', subcommand, long_list]) return Run(['vboxmanage', 'list', subcommand, long_list]).stdout
def get_running_vms(self): def get_running_vms(self):
return subprocess.getoutput('vboxmanage list runningvms') return Run(['vboxmanage', 'list', 'runningvms']).stdout
def destroy(self): def destroy(self):
LOG.info('Removing VM %s.', self.name_or_uuid)
self.poweroff(silent=True) self.poweroff(silent=True)
if subprocess.call(['vboxmanage', 'unregistervm', self.name_or_uuid, if Run(['vboxmanage', 'unregistervm', self.name_or_uuid,
'--delete']) != 0: '--delete']).returncode != 0:
raise BoxVBoxFailure(f'Removing VM {self.name_or_uuid} failed') LOG.fatal('Removing VM "%s" failed', self.name_or_uuid)
raise BoxVBoxFailure()
def create(self, cpus, memory, port=None): def create(self, cpus, memory, port=None):
LOG.info('Creating VM %s.', self.name_or_uuid)
self.uuid = None self.uuid = None
memory = convert_to_mega(memory) memory = convert_to_mega(memory)
try: out = Run(['vboxmanage', 'createvm', '--name', self.name_or_uuid,
out = subprocess.check_output(['vboxmanage', 'createvm', '--name', '--register'])
self.name_or_uuid, '--register'], if out.returncode != 0:
encoding=sys.getdefaultencoding()) LOG.fatal('Failed to create VM.')
except subprocess.CalledProcessError:
return None return None
for line in out.split('\n'): for line in out.stdout.split('\n'):
LOG.details(line)
if line.startswith('UUID:'): if line.startswith('UUID:'):
self.uuid = line.split('UUID:')[1].strip() self.uuid = line.split('UUID:')[1].strip()
@@ -602,67 +607,75 @@ class VBoxManage:
if not port: if not port:
port = self._find_unused_port() port = self._find_unused_port()
if subprocess.call(['vboxmanage', 'modifyvm', self.name_or_uuid, if Run(['vboxmanage', 'modifyvm', self.name_or_uuid,
'--memory', str(memory), '--memory', str(memory),
'--cpus', str(cpus), '--cpus', str(cpus),
'--boot1', 'disk', '--boot1', 'disk',
'--acpi', 'on', '--acpi', 'on',
'--audio', 'none', '--audio', 'none',
'--nic1', 'nat', '--nic1', 'nat',
'--natpf1', f'guestssh,tcp,,{port},,22']) != 0: '--natpf1', f'guestssh,tcp,,{port},,22']).returncode != 0:
raise BoxVBoxFailure(f'Cannot modify VM "{self.name_or_uuid}".') LOG.fatal(f'Cannot modify VM "{self.name_or_uuid}".')
raise BoxVBoxFailure()
return self.uuid return self.uuid
def convertfromraw(self, src, dst): def convertfromraw(self, src, dst):
if subprocess.call(["vboxmanage", "convertfromraw", src, dst]) != 0: if Run(["vboxmanage", "convertfromraw", src, dst]).returncode != 0:
os.unlink(src) os.unlink(src)
raise BoxVBoxFailure('Cannot convert image to VDI.') LOG.fatal('Cannot convert image to VDI.')
raise BoxVBoxFailure()
os.unlink(src) os.unlink(src)
def closemedium(self, type_, mediumpath): def closemedium(self, type_, mediumpath):
if subprocess.call(['vboxmanage', 'closemedium', type_, if Run(['vboxmanage', 'closemedium', type_,
mediumpath]) != 0: mediumpath]).returncode != 0:
raise BoxVBoxFailure(f'Failed close medium {mediumpath}.') LOG.fatal('Failed close medium %.', mediumpath)
raise BoxVBoxFailure()
def create_controller(self, name, type_): def create_controller(self, name, type_):
if subprocess.call(['vboxmanage', 'storagectl', self.name_or_uuid, if Run(['vboxmanage', 'storagectl', self.name_or_uuid, '--name', name,
'--name', name, '--add', type_]) != 0: '--add', type_]).returncode != 0:
raise BoxVBoxFailure(f'Adding controller {type_} has failed.') LOG.fatal('Adding controller %s has failed.', type_)
raise BoxVBoxFailure()
def move_and_resize_image(self, src, dst, size): def move_and_resize_image(self, src, dst, size):
fullpath = os.path.join(self.get_vm_base_path(), dst) fullpath = os.path.join(self.get_vm_base_path(), dst)
size = convert_to_mega(size) size = convert_to_mega(size)
if subprocess.call(['vboxmanage', 'modifymedium', 'disk', src, if Run(['vboxmanage', 'modifymedium', 'disk', src, '--resize',
'--resize', str(size), '--move', fullpath]) != 0: str(size), '--move', fullpath]).returncode != 0:
raise BoxVBoxFailure(f'Resizing and moving image {dst} has ' LOG.fatal('Resizing and moving image %s has failed', dst)
f'failed') raise BoxVBoxFailure()
return fullpath return fullpath
def storageattach(self, controller_name, port, type_, image): def storageattach(self, controller_name, port, type_, image):
if subprocess.call(['vboxmanage', 'storageattach', self.name_or_uuid, if Run(['vboxmanage', 'storageattach', self.name_or_uuid,
'--storagectl', controller_name, '--storagectl', controller_name,
'--port', str(port), '--port', str(port),
'--device', '0', '--device', '0',
'--type', type_, '--type', type_,
'--medium', image]) != 0: '--medium', image]).returncode != 0:
raise BoxVBoxFailure(f'Attaching {image} to VM has failed.') LOG.fatal('Attaching %s to VM has failed.', image)
raise BoxVBoxFailure()
def poweron(self): def poweron(self):
if subprocess.call(['vboxmanage', 'startvm', self.name_or_uuid, if Run(['vboxmanage', 'startvm', self.name_or_uuid, '--type',
'--type', 'headless']) != 0: 'headless']).returncode != 0:
raise BoxVBoxFailure(f'Failed to start: {self.name_or_uuid}.') LOG.fatal('Failed to start: %s.', self.name_or_uuid)
raise BoxVBoxFailure()
def setextradata(self, key, val): def setextradata(self, key, val):
if subprocess.call(['vboxmanage', 'setextradata', self.name_or_uuid, if Run(['vboxmanage', 'setextradata', self.name_or_uuid, key,
key, val]) != 0: val]).returncode != 0:
raise BoxVBoxFailure(f'Failed to start: {self.name_or_uuid}.') LOG.fatal('Failed to start: %s.', self.name_or_uuid)
raise BoxVBoxFailure()
def add_nic(self, nic, kind): def add_nic(self, nic, kind):
if subprocess.call(['vboxmanage', 'modifyvm', self.name_or_uuid, if Run(['vboxmanage', 'modifyvm', self.name_or_uuid, f'--{nic}',
f'--{nic}', kind]) != 0: kind]).returncode != 0:
raise BoxVBoxFailure(f'Cannot modify VM "{self.name_or_uuid}".') LOG.fatal('Cannot modify VM "%s".', self.name_or_uuid)
raise BoxVBoxFailure()
def is_port_in_use(self, port): def is_port_in_use(self, port):
used_ports = self._get_defined_ports() used_ports = self._get_defined_ports()
@@ -682,15 +695,12 @@ class VBoxManage:
def _get_defined_ports(self): def _get_defined_ports(self):
self.get_vm_info() self.get_vm_info()
try: out = Run(['vboxmanage', 'list', 'vms'])
out = subprocess.check_output(['vboxmanage', 'list', 'vms'], if out.returncode != 0:
encoding=sys.getdefaultencoding(),
stderr=subprocess.DEVNULL)
except subprocess.CalledProcessError:
return {} return {}
used_ports = {} used_ports = {}
for line in out.split('\n'): for line in out.stdout.split('\n'):
if not line: if not line:
continue continue
vm_name = line.split('"')[1] vm_name = line.split('"')[1]
@@ -698,15 +708,11 @@ class VBoxManage:
if self.vm_info.get('uuid') and self.vm_info['uuid'] == vm_uuid: if self.vm_info.get('uuid') and self.vm_info['uuid'] == vm_uuid:
continue continue
try: info = Run(['vboxmanage', 'showvminfo', vm_uuid])
info = (subprocess if info.returncode != 0:
.check_output(['vboxmanage', 'showvminfo', vm_uuid],
encoding=sys.getdefaultencoding(),
stderr=subprocess.DEVNULL))
except subprocess.CalledProcessError:
continue continue
for line in info.split('\n'): for line in info.stdout.split('\n'):
if line.startswith('Config file:'): if line.startswith('Config file:'):
config = line.split('Config ' 'file:')[1].strip() config = line.split('Config ' 'file:')[1].strip()
@@ -746,15 +752,15 @@ class Image:
return self.vbox.move_and_resize_image(vdi_path, disk_img, size) return self.vbox.move_and_resize_image(vdi_path, disk_img, size)
def cleanup(self): def cleanup(self):
subprocess.call(['rm', '-fr', self._tmp]) Run(['rm', '-fr', self._tmp])
def _convert_to_raw(self): def _convert_to_raw(self):
img_path = os.path.join(CACHE_DIR, self._img_fname) img_path = os.path.join(CACHE_DIR, self._img_fname)
raw_path = os.path.join(self._tmp, self._img_fname + ".raw") raw_path = os.path.join(self._tmp, self._img_fname + ".raw")
if subprocess.call(['qemu-img', 'convert', '-O', 'raw', if Run(['qemu-img', 'convert', '-O', 'raw', img_path,
img_path, raw_path]) != 0: raw_path]).returncode != 0:
raise BoxConvertionError(f'Cannot convert image {self._img_fname} ' LOG.fatal('Cannot convert image %s to RAW.', self._img_fname)
'to RAW.') raise BoxConvertionError()
def _download_image(self): def _download_image(self):
raise NotImplementedError() raise NotImplementedError()
@@ -781,9 +787,7 @@ class Ubuntu(Image):
expected_sum = None expected_sum = None
fname = os.path.join(self._tmp, self._checksum_file) fname = os.path.join(self._tmp, self._checksum_file)
# TODO: make the verbosity switch be dependent from verbosity of the Run(['wget', self._checksum_url, '-q', '-O', fname])
# script.
subprocess.call(['wget', self._checksum_url, '-q', '-O', fname])
with open(fname) as fobj: with open(fname) as fobj:
for line in fobj.readlines(): for line in fobj.readlines():
@@ -795,8 +799,8 @@ class Ubuntu(Image):
raise BoxError('Cannot find provided cloud image') raise BoxError('Cannot find provided cloud image')
if os.path.exists(os.path.join(CACHE_DIR, self._img_fname)): if os.path.exists(os.path.join(CACHE_DIR, self._img_fname)):
cmd = 'sha256sum ' + os.path.join(CACHE_DIR, self._img_fname) cmd = ['sha256sum', os.path.join(CACHE_DIR, self._img_fname)]
calulated_sum = subprocess.getoutput(cmd).split(' ')[0] calulated_sum = Run(cmd).stdout.split(' ')[0]
return calulated_sum == expected_sum return calulated_sum == expected_sum
return False return False
@@ -808,12 +812,12 @@ class Ubuntu(Image):
fname = os.path.join(CACHE_DIR, self._img_fname) fname = os.path.join(CACHE_DIR, self._img_fname)
LOG.info('Downloading image %s', self._img_fname) LOG.info('Downloading image %s', self._img_fname)
subprocess.call(['wget', '-q', self._img_url, '-O', fname]) Run(['wget', '-q', self._img_url, '-O', fname])
if not self._checksum(): if not self._checksum():
# TODO: make some retry mechanism? # TODO: make some retry mechanism?
raise BoxSysCommandError('Checksum for downloaded image differ ' LOG.fatal('Checksum for downloaded image differ from expected.')
'from expected.') raise BoxSysCommandError()
else: else:
LOG.info('Downloaded image %s', self._img_fname) LOG.info('Downloaded image %s', self._img_fname)
@@ -843,7 +847,7 @@ class Fedora(Image):
# TODO: make the verbosity switch be dependent from verbosity of the # TODO: make the verbosity switch be dependent from verbosity of the
# script. # script.
fname = os.path.join(self._tmp, self._checksum_file) fname = os.path.join(self._tmp, self._checksum_file)
subprocess.call(['wget', self._checksum_url, '-q', '-O', fname]) Run(['wget', self._checksum_url, '-q', '-O', fname])
with open(fname) as fobj: with open(fname) as fobj:
for line in fobj.readlines(): for line in fobj.readlines():
@@ -857,8 +861,8 @@ class Fedora(Image):
raise BoxError('Cannot find provided cloud image') raise BoxError('Cannot find provided cloud image')
if os.path.exists(os.path.join(CACHE_DIR, self._img_fname)): if os.path.exists(os.path.join(CACHE_DIR, self._img_fname)):
cmd = 'sha256sum ' + os.path.join(CACHE_DIR, self._img_fname) cmd = ['sha256sum', os.path.join(CACHE_DIR, self._img_fname)]
calulated_sum = subprocess.getoutput(cmd).split(' ')[0] calulated_sum = Run(cmd).stdout.split(' ')[0]
return calulated_sum == expected_sum return calulated_sum == expected_sum
return False return False
@@ -869,7 +873,7 @@ class Fedora(Image):
return return
fname = os.path.join(CACHE_DIR, self._img_fname) fname = os.path.join(CACHE_DIR, self._img_fname)
subprocess.call(['wget', '-q', self._img_url, '-O', fname]) Run(['wget', '-q', self._img_url, '-O', fname])
if not self._checksum(): if not self._checksum():
# TODO: make some retry mechanism? # TODO: make some retry mechanism?
@@ -910,7 +914,7 @@ class IsoImage:
return os.path.join(self._tmp, CLOUD_IMAGE) return os.path.join(self._tmp, CLOUD_IMAGE)
def cleanup(self): def cleanup(self):
subprocess.call(['rm', '-fr', self._tmp]) Run(['rm', '-fr', self._tmp])
def _create_cloud_image(self): def _create_cloud_image(self):
# meta-data # meta-data
@@ -926,12 +930,12 @@ class IsoImage:
mkiso = 'mkisofs' if shutil.which('mkisofs') else 'genisoimage' mkiso = 'mkisofs' if shutil.which('mkisofs') else 'genisoimage'
# create ISO image # create ISO image
if subprocess.call([mkiso, '-J', '-R', '-V', 'cidata', '-o', if Run([mkiso, '-J', '-R', '-V', 'cidata', '-o',
os.path.join(self._tmp, CLOUD_IMAGE), os.path.join(self._tmp, CLOUD_IMAGE),
os.path.join(self._tmp, 'user-data'), os.path.join(self._tmp, 'user-data'),
os.path.join(self._tmp, 'meta-data')]) != 0: os.path.join(self._tmp, 'meta-data')]).returncode != 0:
raise BoxSysCommandError('Cannot create ISO image for config ' LOG.fatal('Cannot create ISO image for config drive')
'drive') raise BoxSysCommandError()
LOG = FakeLogger(colors=True) LOG = FakeLogger(colors=True)
@@ -950,6 +954,7 @@ def vmcreate(args, conf=None):
return 11 return 11
if not vbox.create(conf.cpus, conf.memory, conf.port): if not vbox.create(conf.cpus, conf.memory, conf.port):
LOG.fatal('Error creating virtual machine: %s', conf.name)
return 10 return 10
vbox.create_controller('IDE', 'ide') vbox.create_controller('IDE', 'ide')
@@ -1026,7 +1031,7 @@ def vmdestroy(args):
def vmlist(args): def vmlist(args):
VBoxManage().vmlist(args.running, args.long) print(VBoxManage().vmlist(args.running, args.long).strip())
return 0 return 0
@@ -1058,16 +1063,11 @@ def shell_completion(args):
def connect(args): def connect(args):
vbox = VBoxManage(args.name) vbox = VBoxManage(args.name)
conf = Config(args, vbox) conf = Config(args, vbox)
try: return Run(['ssh', '-o', 'StrictHostKeyChecking=no',
subprocess.call(['ssh', '-o', 'StrictHostKeyChecking=no',
'-o', 'UserKnownHostsFile=/dev/null', '-o', 'UserKnownHostsFile=/dev/null',
'-i', conf.ssh_key_path[:-4], '-i', conf.ssh_key_path[:-4],
f'ssh://{DISTROS[conf.distro]["username"]}' f'ssh://{DISTROS[conf.distro]["username"]}'
f'@localhost:{conf.port}']) f'@localhost:{conf.port}'], False).returncode
except subprocess.CalledProcessError:
return None
return 0
def main(): def main():