#! /usr/bin/env python
"""
adbfs Virtual filesystem for Midnight Commander

* Copyright (c) 2015, Roman Dobosz,
* Published under 3-clause BSD-style license (see LICENSE file)

Version: 0.7
"""
from datetime import datetime
import os
import re
import subprocess
import sys

try:
    from shlex import quote as shell_quote
except ImportError:  # Python 2: shlex has no quote(), fall back to pipes
    from pipes import quote as shell_quote

# NOTE: these flags are driven purely by truthiness of the environment
# value, so any non-empty string (including "0") switches them on.
DEBUG = os.getenv("ADBFS_DEBUG", False)
SKIP_SYSTEM_DIR = os.getenv("ADBFS_SKIP_SYSTEM_DIR", True)
BOX = os.getenv("ADBFS_BOX", "busybox")


def _debug(message):
    """Write a diagnostic line to stderr, but only when DEBUG is enabled.

    stdout is reserved for the directory listing consumed by Midnight
    Commander, so diagnostics must never be printed there.
    """
    if DEBUG:
        sys.stderr.write(message + "\n")


def _check_output(command):
    """Run command (a list of arguments) and return its stdout as text.

    Raises subprocess.CalledProcessError on non-zero exit status and OSError
    when the executable cannot be started, exactly like
    subprocess.check_output does.
    """
    output = subprocess.check_output(command)
    if not isinstance(output, str):
        # Python 3 hands back bytes; file names on the device may be in any
        # encoding, so never fail on undecodable sequences.
        output = output.decode("utf-8", "replace")
    return output


class File(object):
    """Item in filesystem representation"""

    # Month abbreviations as they appear in "busybox ls -e" output.
    _MONTH_NUM = {"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
                  "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11,
                  "Dec": 12}

    def __init__(self, perms=None, links=1, uid=0, gid=0, size=None,
                 date_time=None, date=None, name=None):
        """Initialize file.

        The keyword names mirror the named groups of the Adb.boxes
        "file_re" patterns, so an object can be built directly from
        match.groupdict().
        """
        self.perms = perms
        self.links = links
        self.uid = uid
        self.gid = gid
        self.size = size if size else 0
        self.date_time = date_time  # as string
        self.name = name
        self.date = date  # as string
        self.dirname = ""
        self.type = None
        self.string = None
        self.link_target = None
        self.filepath = None

    def _correct_link(self):
        """Canonize filename and fill the link attr.

        Splits a "name -> target" entry, keeps the bare name and stores the
        target as an absolute path in link_target. Names which do not split
        into exactly two parts are left untouched.
        """
        try:
            name, target = self.name.split(" -> ")
        except ValueError:
            return

        self.name = name
        if not self.size:
            self.size = 0

        if target.startswith("/"):
            self.link_target = target
        else:
            self.link_target = os.path.abspath(os.path.join(self.dirname,
                                                            target))

    def update(self, dirname):
        """Update object fields once the containing directory is known.

        Sets dirname, normalizes the timestamp into the
        "MM/DD/YYYY HH:MM:01" form, derives type from the first permission
        character, resolves the link target and computes filepath.
        """
        self.dirname = dirname

        parsed = None
        if self.date_time:
            # busybox format: "Mon DD HH:MM:SS YYYY"
            parts = self.date_time.split()
            stamp = "%s-%02d-%s %s" % (parts[1],
                                       self._MONTH_NUM[parts[0]],
                                       parts[3],
                                       parts[2])
            parsed = datetime.strptime(stamp, "%d-%m-%Y %H:%M:%S")
        elif self.date:
            # toolbox format: "YYYY-MM-DD HH:MM"
            parsed = datetime.strptime(self.date, "%Y-%m-%d %H:%M")

        # Without any timestamp there is nothing to reformat; leave
        # date_time untouched instead of failing on an unbound variable.
        if parsed is not None:
            self.date_time = parsed.strftime("%m/%d/%Y %H:%M:01")

        self.type = self.perms[0] if self.perms else None

        if self.type == "l" and " -> " in self.name:
            self._correct_link()

        self.filepath = os.path.join(self.dirname, self.name)

    def mk_link_relative(self, target_type):
        """Convert link target to a path relative to the link's directory.

        target_type is accepted for interface compatibility and is not used.
        """
        self.link_target = os.path.relpath(self.link_target, self.dirname)

    def __repr__(self):
        """Represent the file/entire node"""
        fullname = os.path.join(self.dirname, self.name)
        if self.link_target:
            fullname += " -> " + self.link_target
        return "<File {type} {name} {id}>".format(type=self.type,
                                                  name=fullname,
                                                  id=hex(id(self)))

    def __str__(self):
        """Display the file/entire node as a single listing line.

        Returns an empty string for nameless entries.
        """
        template = ("{perms} {links:>4} {uid:<8} {gid:<8} {size:>8} "
                    "{date_time} {fullname}\n")

        if not self.name:
            return ""

        fullname = os.path.join(self.dirname, self.name)
        if self.link_target:
            fullname += " -> " + self.link_target

        return template.format(perms=self.perms,
                               links=self.links,
                               uid=self.uid,
                               gid=self.gid,
                               size=self.size,
                               date_time=self.date_time,
                               fullname=fullname)


class Adb(object):
    """Class for interact with android rooted device through adb"""
    dirs_to_skip = ["acct", "charger", "d", "dev", "proc", "sys"]
    file_re = None
    # Directory header emitted by a recursive ls, e.g. "/system/bin:".
    current_re = re.compile(r"^(\./)?(?P<dir>.+):$")
    as_root = os.getenv("ADBFS_AS_ROOT", False)
    verbose = os.getenv("ADBFS_VERBOSE", False)
    # Per-toolset ls commands and the pattern parsing one output line. The
    # named groups must match the keyword arguments of File.__init__.
    boxes = {'busybox': {'ls': 'busybox ls -anel',
                         'rls': 'busybox ls -Ranel {}',
                         'file_re': r'^(?P<perms>[-bcdlps][-rwxsStT]{9})\s+'
                                    r'(?P<links>\d+)\s'
                                    r'(?P<uid>\d+)\s+'
                                    r'(?P<gid>\d+)\s+'
                                    r'(?P<size>\d+)\s[A-Z,a-z]{3}\s'
                                    r'(?P<date_time>[A-Z,a-z]{3}\s+'
                                    r'\d+\s\d{2}:\d{2}:\d{2}\s+\d{4})\s'
                                    r'(?P<name>.*)'},
             'toolbox': {'ls': 'toolbox ls -anl',
                         'rls': 'toolbox ls -Ranl {}',
                         'file_re': r'^(?P<perms>[-bcdlps][-rwxsStT]{9})\s+'
                                    r'(?P<uid>\d+)\s+'
                                    r'(?P<gid>\d+)\s+'
                                    r'(?P<size>\d+)?\s'
                                    r'(?P<date>\d{4}-\d{2}-\d{2}\s'
                                    r'\d{2}:\d{2})\s'
                                    r'(?P<name>.*)'}}

    def __init__(self):
        """Prepare archive content for operations.

        Verifies root access and detects the available toolset; exits the
        process (codes 100-102) when the device is unusable.
        """
        super(Adb, self).__init__()
        self._entries = []
        self._links = {}

        self.__su_check()
        self.__get_the_box()
        self.file_re = re.compile(self.box['file_re'])

    def __get_the_box(self):
        """Detect if we dealing with busybox or toolbox.

        Exits with status 100 when neither of them can be found.
        """
        for name in ('busybox', 'toolbox'):
            try:
                result = _check_output(['adb', 'shell', 'which', name])
            except (subprocess.CalledProcessError, OSError):
                # A failing "which" simply means this toolset is absent.
                continue
            if name in result:
                self.box = Adb.boxes[name]
                return

        _debug("There is no toolbox or busybox available")
        sys.exit(100)

    def __su_check(self):
        """Check if we are able to get elevated privileges.

        Exits with status 102 when the device cannot be reached and with
        status 101 when root access is not granted.
        """
        try:
            result = _check_output('adb shell su -c whoami'.split())
        except (subprocess.CalledProcessError, OSError):
            sys.stderr.write("Unable to get to the device")
            sys.exit(102)

        if 'root' in result:
            return

        _debug("Unable to get root privileges on device")
        sys.exit(101)

    def _find_target(self, needle):
        """Find link target.

        Follows the chain of known links starting at path needle and
        returns the File entry it finally points to, or None when the
        target is not among the entries or the links form a cycle.
        """
        visited = set()
        while needle in self._links:
            if needle in visited:
                # Cyclic links never resolve to a real entry.
                return None
            visited.add(needle)
            elem = self._links[needle]
            needle = os.path.abspath(os.path.join(elem.dirname,
                                                  elem.link_target))

        for entry in self._entries:
            if entry.filepath == needle:
                return entry
        return None

    def _normalize_links(self):
        """
        There might be a case of a chain of linked files, like:

            /foo -> /mnt/foo
            /bar -> /foo

        If one want to follow such 'bar' link - MC in extfs mode will fail
        to figure out the right target. This helper will correct the thing.

        Links whose target cannot be resolved are dropped from the entries.
        """
        unresolved = set()
        for entry in self._links.values():
            target_entry = self._find_target(entry.link_target)
            if target_entry:
                entry.link_target = target_entry.filepath
                entry.mk_link_relative(target_entry.type)
            else:
                unresolved.add(id(entry))

        if unresolved:
            self._entries[:] = [entry for entry in self._entries
                                if id(entry) not in unresolved]

    def _retrieve_file_list(self, root=None):
        """Retrieve file list using adb.

        Without root the top level is listed and every directory found
        there is then listed recursively; with root (a File) that directory
        tree is listed in one recursive ls call.

        Returns 0 on success and 1 when the listing command failed.
        """
        command = ["adb", "shell", "su", "-c"]

        if not root:
            command.append(self.box['ls'])
        else:
            command.append(self.box['rls'].format(root.filepath))

        try:
            _debug("executing " + " ".join(command))
            lines = _check_output(command)
        except (subprocess.CalledProcessError, OSError):
            sys.stderr.write("Cannot read directory. Is device connected?\n")
            return 1

        current_dir = root.dirname if root else "/"
        for line in lines.split("\n"):
            line = line.strip()
            current_dir_re = self.current_re.match(line)
            if current_dir_re:
                current_dir = current_dir_re.groupdict()["dir"]
                if not current_dir:
                    current_dir = "/"
                continue

            reg_match = self.file_re.match(line)
            if not reg_match:
                continue

            entry = File(**reg_match.groupdict())
            if entry.name in (".", ".."):
                continue
            if SKIP_SYSTEM_DIR and entry.name in Adb.dirs_to_skip:
                continue

            entry.update(current_dir)
            self._entries.append(entry)
            if root is None and entry.type == "d":
                # Best effort: an unreadable subdirectory must not abort
                # the whole listing, so the result is deliberately ignored.
                self._retrieve_file_list(entry)

            if entry.type == "l":
                self._links[entry.filepath] = entry

        return 0

    def _shell(self, *args):
        """Run "adb shell" with args; any output is treated as an error.

        Returns 0 when the command succeeded silently, 1 otherwise (the
        command output or failure reason is forwarded to stderr).
        """
        cmd = ["adb", "shell"] + list(args)
        try:
            err = _check_output(cmd)
        except (subprocess.CalledProcessError, OSError) as exc:
            sys.stderr.write("Command failed: %s (%s)\n" % (" ".join(cmd),
                                                            exc))
            return 1

        if err != "":
            sys.stderr.write(err)
            return 1
        return 0

    def run(self, fname):
        """Not supported"""
        sys.stderr.write("Not supported - or maybe you are on compatible "
                         "architecture?\n")
        return 1

    def list(self):
        """Output list contents directory.

        Returns 0 on success, 1 when the device could not be read.
        """
        if self._retrieve_file_list():
            return 1
        self._normalize_links()
        sys.stdout.write("".join([str(entry) for entry in self._entries]))
        return 0

    def copyout(self, src, dst):
        """Copy file form the device using adb.

        Returns the exit status of "adb pull".
        """
        cmd = ["adb", "pull", src, dst]
        _debug(" ".join(cmd))

        with open(os.devnull, "w") as fnull:
            err = subprocess.call(cmd, stdout=fnull, stderr=fnull)

        return err

    def copyin(self, src, dst):
        """Copy file to the device through adb.

        Returns 0 on success, 1 when "adb push" failed.
        """
        if not dst.startswith("/"):
            dst = "/" + dst

        cmd = ["adb", "push", src, dst]
        _debug(" ".join(cmd))

        with open(os.devnull, "w") as fnull:
            err = subprocess.call(cmd, stdout=fnull, stderr=fnull)

        if err != 0:
            sys.stderr.write("Cannot push the file, "
                             "%s, error %d\n" % (dst, err))
            return 1
        return 0

    def rm(self, dst):
        """Remove file from device."""
        # The path is quoted because adb hands it to the device shell.
        return self._shell("rm", shell_quote(dst))

    def rmdir(self, dst):
        """Remove directory from device."""
        return self._shell("rm", "-r", shell_quote(dst))

    def mkdir(self, dst):
        """Make directory on the device through adb."""
        return self._shell("mkdir", shell_quote(dst))


CALL_MAP = {'list': lambda a: Adb().list(),
            'copyin': lambda a: Adb().copyin(a.src, a.dst),
            'copyout': lambda a: Adb().copyout(a.src, a.dst),
            'mkdir': lambda a: Adb().mkdir(a.dst),
            'rmdir': lambda a: Adb().rmdir(a.dst),
            'rm': lambda a: Adb().rm(a.dst),
            'run': lambda a: Adb().run(a.dst)}


def main():
    """Parse commandline and dispatch to the requested extfs command.

    Expected arguments: <command> <archive> [<stored name> [<local name>]].
    Exits with status 2 on an unknown command or missing arguments;
    otherwise returns the status of the executed command.
    """
    if DEBUG:
        sys.stderr.write("commandline: %s\n" % " ".join(sys.argv))

    if len(sys.argv) < 2 or sys.argv[1] not in CALL_MAP:
        _debug("missing argument")
        sys.exit(2)

    class Arg(object):
        """Mimic argparse/optparse object"""
        dst = None
        src = None
        arch = None

    command = sys.argv[1]
    arg = Arg()

    try:
        arg.arch = sys.argv[2]
        if command == 'copyin':
            # copyin passes the name on the device first, local file second
            arg.src = sys.argv[4]
            arg.dst = sys.argv[3]
        elif command == 'copyout':
            arg.src = sys.argv[3]
            arg.dst = sys.argv[4]
        elif command in ('rm', 'rmdir', 'run', 'mkdir'):
            arg.dst = sys.argv[3]
    except IndexError:
        _debug("there is a problem with identifying command and its args")
        _debug("current args: %s" % sys.argv)
        sys.exit(2)

    return CALL_MAP[command](arg)


if __name__ == "__main__":
    sys.exit(main())