1
0
mirror of https://github.com/gryf/mc_adbfs.git synced 2025-12-19 20:38:00 +01:00

Fixed error handling, replacing manual commandline parsing with argparse

This commit is contained in:
2016-06-04 18:02:33 +02:00
parent d1e8f42429
commit de5793c672

153
adbfs
View File

@@ -14,6 +14,7 @@ import pipes
import re import re
import subprocess import subprocess
import sys import sys
import argparse
DEBUG = os.getenv("ADBFS_DEBUG", False) DEBUG = os.getenv("ADBFS_DEBUG", False)
@@ -159,41 +160,60 @@ class Adb(object):
def __init__(self): def __init__(self):
"""Prepare archive content for operations""" """Prepare archive content for operations"""
super(Adb, self).__init__() super(Adb, self).__init__()
self.error = ''
self.box = None
self._entries = [] self._entries = []
self._links = {} self._links = {}
self._got_root = False
self.__su_check() self.__su_check()
self.__get_the_box() self.__get_the_box()
self.file_re = re.compile(self.box['file_re'])
def __get_the_box(self): def __get_the_box(self):
"""Detect if we dealing with busybox or toolbox""" """Detect if we dealing with busybox or toolbox"""
result = subprocess.check_output('adb shell which busybox'.split())
try:
with open(os.devnull, "w") as fnull:
result = subprocess.check_output('adb shell which '
'busybox'.split(),
stderr=fnull)
if 'busybox' in result: if 'busybox' in result:
self.box = Adb.boxes['busybox'] self.box = Adb.boxes['busybox']
self.file_re = re.compile(self.box['file_re'])
return return
except subprocess.CalledProcessError:
pass
try:
with open(os.devnull, "w") as fnull:
result = subprocess.check_output('adb shell which '
'toolbox'.split(),
stderr=fnull)
result = subprocess.check_output('adb shell which toolbox'.split())
if 'toolbox' in result: if 'toolbox' in result:
self.box = Adb.boxes['toolbox'] self.box = Adb.boxes['toolbox']
self.file_re = re.compile(self.box['file_re'])
return return
except subprocess.CalledProcessError:
pass
print "There is no toolbox or busybox available" if DEBUG else None self.error = "There is no toolbox or busybox available"
sys.exit(100)
def __su_check(self): def __su_check(self):
"""Check if we are able to get elevated privileges""" """Check if we are able to get elevated privileges"""
try: try:
result = subprocess.check_output('adb shell su -c whoami'.split()) with open(os.devnull, "w") as fnull:
except: result = subprocess.check_output('adb shell su -c '
sys.stderr.write("Unable to get to the device") 'whoami'.split(),
sys.exit(102) stderr=fnull)
if 'root' in result: except subprocess.CalledProcessError:
return return
print "Unable to get root privileges on device" if DEBUG else None if 'root' in result:
sys.exit(101) self._got_root = True
return
def _find_target(self, needle): def _find_target(self, needle):
"""Find link target""" """Find link target"""
@@ -287,6 +307,10 @@ class Adb(object):
def list(self): def list(self):
"""Output list contents directory""" """Output list contents directory"""
if self.error:
sys.stderr.write(self.error)
return 1
self._retrieve_file_list() self._retrieve_file_list()
self._normalize_links() self._normalize_links()
# with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), # with open(os.path.join(os.path.dirname(os.path.realpath(__file__)),
@@ -300,18 +324,28 @@ class Adb(object):
def copyout(self, src, dst): def copyout(self, src, dst):
"""Copy file form the device using adb.""" """Copy file form the device using adb."""
# cmd = ["adb", "pull", pipes.quote(src), pipes.quote(dst)] if self.error:
sys.stderr.write(self.error)
return 1
cmd = ["adb", "pull", src, dst] cmd = ["adb", "pull", src, dst]
if DEBUG: if DEBUG:
sys.stderr.write(" ".join(cmd) + "\n") sys.stderr.write(" ".join(cmd) + "\n")
with open(os.devnull, "w") as fnull: with open(os.devnull, "w") as fnull:
try:
err = subprocess.call(cmd, stdout=fnull, stderr=fnull) err = subprocess.call(cmd, stdout=fnull, stderr=fnull)
except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 1
return err return err
def copyin(self, src, dst): def copyin(self, src, dst):
"""Copy file to the device through adb.""" """Copy file to the device through adb."""
if self.error:
sys.stderr.write(self.error)
return 1
if not dst.startswith("/"): if not dst.startswith("/"):
dst = "/" + dst dst = "/" + dst
@@ -321,7 +355,11 @@ class Adb(object):
sys.stderr.write(" ".join(cmd) + "\n") sys.stderr.write(" ".join(cmd) + "\n")
with open(os.devnull, "w") as fnull: with open(os.devnull, "w") as fnull:
try:
err = subprocess.call(cmd, stdout=fnull, stderr=fnull) err = subprocess.call(cmd, stdout=fnull, stderr=fnull)
except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 1
if err != 0: if err != 0:
sys.stderr.write("Cannot push the file, " sys.stderr.write("Cannot push the file, "
@@ -331,8 +369,16 @@ class Adb(object):
def rm(self, dst): def rm(self, dst):
"""Remove file from device.""" """Remove file from device."""
if self.error:
sys.stderr.write(self.error)
return 1
cmd = ["adb", "shell", "rm", dst] cmd = ["adb", "shell", "rm", dst]
try:
err = subprocess.check_output(cmd) err = subprocess.check_output(cmd)
except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 1
if err != "": if err != "":
sys.stderr.write(err) sys.stderr.write(err)
@@ -341,8 +387,16 @@ class Adb(object):
def rmdir(self, dst): def rmdir(self, dst):
"""Remove directory from device.""" """Remove directory from device."""
if self.error:
sys.stderr.write(self.error)
return 1
cmd = ["adb", "shell", "rm", "-r", dst] cmd = ["adb", "shell", "rm", "-r", dst]
try:
err = subprocess.check_output(cmd) err = subprocess.check_output(cmd)
except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 1
if err != "": if err != "":
sys.stderr.write(err) sys.stderr.write(err)
@@ -351,9 +405,16 @@ class Adb(object):
def mkdir(self, dst): def mkdir(self, dst):
"""Make directory on the device through adb.""" """Make directory on the device through adb."""
if self.error:
sys.stderr.write(self.error)
return 1
cmd = ["adb", "shell", "mkdir", dst] cmd = ["adb", "shell", "mkdir", dst]
try:
err = subprocess.check_output(cmd) err = subprocess.check_output(cmd)
except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 1
if err != "": if err != "":
sys.stderr.write(err) sys.stderr.write(err)
@@ -369,44 +430,46 @@ CALL_MAP = {'list': lambda a: Adb().list(),
'rm': lambda a: Adb().rm(a.dst), 'rm': lambda a: Adb().rm(a.dst),
'run': lambda a: Adb().run(a.dst)} 'run': lambda a: Adb().run(a.dst)}
def main(): def main():
"""parse commandline""" """parse commandline"""
try: parser = argparse.ArgumentParser()
if DEBUG: subparsers = parser.add_subparsers(help='supported commands')
sys.stderr.write("commandline: %s\n" % " ".join(sys.argv)) parser_list = subparsers.add_parser('list')
if sys.argv[1] not in ('list', 'copyin', 'copyout', 'rm', "rmdir", parser_copyin = subparsers.add_parser('copyin')
'mkdir', "run"): parser_copyout = subparsers.add_parser('copyout')
print "missing argument" if DEBUG else None parser_rm = subparsers.add_parser('rm')
sys.exit(2) parser_mkdir = subparsers.add_parser('mkdir')
except IndexError: parser_run = subparsers.add_parser('run')
sys.exit(2)
class Arg(object): parser_list.add_argument('arch')
"""Mimic argparse/optparse object""" parser_list.set_defaults(func=CALL_MAP['list'])
dst = None
src = None
arch = None
arg = Arg() parser_copyin.add_argument('arch')
parser_copyin.add_argument('src')
parser_copyin.add_argument('dst')
parser_copyin.set_defaults(func=CALL_MAP['copyin'])
try: parser_copyout.add_argument('arch')
arg.arch = sys.argv[2] parser_copyout.add_argument('src')
if sys.argv[1] == 'copyin': parser_copyout.add_argument('dst')
arg.src = sys.argv[4] parser_copyout.set_defaults(func=CALL_MAP['copyout'])
arg.dst = sys.argv[3]
if sys.argv[1] == 'copyout': parser_rm.add_argument('arch')
arg.src = sys.argv[3] parser_rm.add_argument('dst')
arg.dst = sys.argv[4] parser_rm.set_defaults(func=CALL_MAP['rm'])
elif sys.argv[1] in ('rm', 'rmdir', 'run', 'mkdir'):
arg.dst = sys.argv[3] parser_mkdir.add_argument('arch')
except IndexError: parser_mkdir.add_argument('dst')
if DEBUG: parser_mkdir.set_defaults(func=CALL_MAP['mkdir'])
print "there is a problem with identifying command and its args"
print "current args:", sys.argv parser_run.add_argument('arch')
sys.exit(2) parser_run.add_argument('dst')
parser_run.set_defaults(func=CALL_MAP['run'])
args = parser.parse_args()
return args.func(args)
return CALL_MAP[sys.argv[1]](arg)
if __name__ == "__main__": if __name__ == "__main__":
sys.exit(main()) sys.exit(main())