#! /usr/bin/env python
"""
adbfs Virtual filesystem for Midnight Commander

* Copyright (c) 2015, Roman Dobosz,
* Published under 3-clause BSD-style license (see LICENSE file)

"""
from datetime import datetime
import subprocess
import os
import re
import sys


class Adb(object):
    """Class for interacting with a rooted Android device through adb.

    The device tree is read with ``busybox ls`` (run via ``su -c``) and kept
    in ``self._entries`` as a list of dicts with the keys ``perms``,
    ``links``, ``uid``, ``gid``, ``size``, ``datetime``, ``name`` (taken from
    the listing line) plus ``dir``, ``fullname`` and ``str`` (computed here).
    """
    adb = "/opt/android-sdk-update-manager/platform-tools/adb"
    # os.getenv returns the raw string, so any non-empty value keeps skipping
    # enabled; only an empty string turns it off.
    skip_system_dir = os.getenv("ADBFS_SKIP_SYSTEM_DIR", True)
    dirs_to_skip = ["acct", "charger", "d", "dev", "proc", "sys"]
    # One line of "busybox ls -anel" output, e.g.
    # "drwxr-xr-x    2 0        0        0 Thu Jan  1 01:00:00 1970 acct".
    # The group names are the keys the rest of the class reads from the
    # entry dict.
    file_re = re.compile(r'^(?P<perms>[-bcdlps][-rwxsStT]{9})\s+'
                         r'(?P<links>\d+)\s'
                         r'(?P<uid>\d+)\s+'
                         r'(?P<gid>\d+)\s+'
                         r'(?P<size>\d+)\s[A-Z,a-z]{3}\s'
                         r'(?P<datetime>[A-Z,a-z]{3}\s+'
                         r'\d+\s\d{2}:\d{2}:\d{2}\s+\d{4})\s'
                         r'(?P<name>.*)')
    # Directory header printed by "ls -R" ("/system/bin:" or "./bin:").
    current_re = re.compile(r"^(\./)?(?P<dir>.+):$")
    # Read from the environment but not used by the code below.
    as_root = os.getenv("ADBFS_AS_ROOT", False)
    verbose = os.getenv("ADBFS_VERBOSE", False)

    # English month abbreviations as printed by ls; mapped by hand so the
    # parsing does not depend on the current locale.
    _MONTH_NUM = {"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
                  "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11,
                  "Dec": 12}
    _LINK_SEP = " -> "

    def __init__(self):
        """Prepare archive content for operations"""
        super(Adb, self).__init__()
        self._entries = []

    @staticmethod
    def _to_text(data):
        """Return subprocess output as text on both Python 2 and Python 3"""
        if isinstance(data, bytes) and not isinstance(data, str):
            return data.decode("utf-8", "replace")
        return data

    @staticmethod
    def _format_entry(entry):
        """Render an entry as one line of the listing written by list()"""
        return ("{perms} {links:>4} {uid:<8} {gid:<8} {size:>8} "
                "{datetime} {dir}/{name}\n".format(**entry))

    def _correct_entry(self, entry, current_dir):
        """Parse date string, attach current_dir and full path to the entry.

        ``entry`` is modified in place: ``dir`` and ``fullname`` are added
        and ``datetime`` is rewritten as ``MM/DD/YYYY HH:MM:01`` (the seconds
        field is always emitted as ``01``).
        """
        entry["dir"] = current_dir
        # Top level entries are listed with an empty current_dir; anchor them
        # at "/" so fullname is comparable with absolute link targets.
        entry["fullname"] = os.path.join(current_dir or "/", entry['name'])

        month, day, time_, year = entry["datetime"].split()
        date = "%s-%02d-%s %s" % (day, self._MONTH_NUM[month], year, time_)
        date = datetime.strptime(date, "%d-%m-%Y %H:%M:%S")
        entry["datetime"] = date.strftime("%m/%d/%Y %H:%M:01")

    def _mk_rel_links(self, entry):
        """Convert absolute link target to relative one, if needed"""
        fname, target = entry['name'].split(self._LINK_SEP, 1)
        if not target.startswith("/"):
            return

        dir_ = entry["dir"] if entry["dir"] else "/"
        target = os.path.relpath(target, dir_)
        entry['name'] = fname + self._LINK_SEP + target

    def _find_target(self, needle, _seen=None):
        """Find the final, non-link entry an absolute path leads to.

        Links are followed recursively. Returns the ``fullname`` of the
        entry found, or None if the path is not among known entries or the
        links form a loop. ``_seen`` is internal bookkeeping for loop
        detection.
        """
        seen = set() if _seen is None else _seen
        if needle in seen:
            return None
        seen.add(needle)

        for entry in self._entries:
            if (self._LINK_SEP in entry["name"] and
                    entry['perms'].startswith("l")):
                fullname, target = entry["fullname"].split(self._LINK_SEP, 1)
                if fullname == needle:
                    # A relative target is relative to the directory the
                    # link lives in; an absolute one overrides it in join().
                    dir_ = entry["dir"] if entry["dir"] else "/"
                    target = os.path.normpath(os.path.join(dir_, target))
                    return self._find_target(target, seen)
            elif entry['fullname'] == needle:
                return entry['fullname']
        return None

    def _normalize_links(self):
        """
        There might be a case of a chain of linked files, like:

            /foo -> /mnt/foo
            /bar -> /foo

        If one wants to follow such 'bar' link - MC in extfs mode will fail
        to figure out the right target. This helper points every link
        directly at its final target (as a relative path), refreshes the
        rendered listing line and removes dead links.
        """
        kept = []
        for entry in self._entries:
            if (entry["perms"].startswith("l") and
                    self._LINK_SEP in entry["name"]):
                fname, target = entry['name'].split(self._LINK_SEP, 1)
                dir_ = entry["dir"] if entry["dir"] else "/"
                needle = os.path.normpath(os.path.join(dir_, target))
                target = self._find_target(needle)
                if not target:
                    # Dead link (or link loop) - drop it from the listing.
                    continue
                entry['name'] = fname + self._LINK_SEP + target
                self._mk_rel_links(entry)
                # The listing line was rendered before the rewrite.
                entry["str"] = self._format_entry(entry)
            kept.append(entry)
        self._entries[:] = kept

    def _parse_listing(self, output, current_dir):
        """Yield entry dicts parsed from ``busybox ls`` text output.

        ``current_dir`` is the directory assumed for lines which appear
        before the first "dir:" header. Entries "." and ".." are skipped,
        as are names found in ``dirs_to_skip`` when ``skip_system_dir`` is
        set.
        """
        for line in output.split("\n"):
            line = line.strip()

            # File lines are tried first, so that a file whose name ends
            # with ":" is not mistaken for a directory header.
            reg_match = self.file_re.match(line)
            if not reg_match:
                header = self.current_re.match(line)
                if header:
                    current_dir = header.group("dir")
                continue

            entry = reg_match.groupdict()
            if entry["name"] in (".", ".."):
                continue
            if Adb.skip_system_dir and entry['name'] in Adb.dirs_to_skip:
                continue

            self._correct_entry(entry, current_dir)
            entry["str"] = self._format_entry(entry)
            yield entry

    def _retrieve_file_list(self, root=None):
        """Retrieve file list using adb.

        Without ``root`` the top level directory is listed and every
        directory found there is listed recursively by calling this method
        again with the directory entry as ``root``.
        """
        command = [Adb.adb, "shell", "su", "-c"]

        # NOTE(review): the path is pasted into a single-quoted shell string
        # without escaping - names containing quotes will break the command.
        if not root:
            command.append("'busybox ls -anel'")
        else:
            command.append("'busybox ls -Ranel {dir}/{name}'".format(**root))

        output = self._to_text(subprocess.check_output(command))
        current_dir = root["dir"] if root else ""

        for entry in self._parse_listing(output, current_dir):
            self._entries.append(entry)
            if root is None and entry["perms"].startswith("d"):
                self._retrieve_file_list(entry)

        # Links can only be resolved once the whole tree is known, so
        # normalize once, after the outermost call collected everything.
        if root is None:
            self._normalize_links()

    def _shell(self, *args):
        """Run ``adb shell <args>``; return 0 on silence, 1 otherwise.

        Any output produced by the command is treated as an error message
        and forwarded to stderr. A non-zero exit status of adb is reported
        as failure as well.
        """
        cmd = [Adb.adb, "shell"] + list(args)
        failed = False
        try:
            output = subprocess.check_output(cmd)
        except subprocess.CalledProcessError as exc:
            output = exc.output or ""
            failed = True

        output = self._to_text(output)
        if output != "":
            sys.stderr.write(output)
            return 1
        return 1 if failed else 0

    def run(self, fname):
        """Not supported"""
        sys.stderr.write("Not supported - or maybe you are on compatible "
                         "architecture?\n")
        return 1

    def list(self):
        """Output list contents directory"""
        self._retrieve_file_list()
        sys.stdout.write("".join([entry["str"] for entry in self._entries]))
        return 0

    def copyout(self, src, dst):
        """Copy file from the device using adb; return adb exit status."""
        with open(os.devnull, "w") as fnull:
            return subprocess.call([Adb.adb, "pull", src, dst],
                                   stdout=fnull, stderr=fnull)

    def copyin(self, src, dst):
        """Copy file to the device through adb; return 0 or 1."""
        if not dst.startswith("/"):
            dst = "/" + dst

        with open(os.devnull, "w") as fnull:
            err = subprocess.call([Adb.adb, "push", src, dst],
                                  stdout=fnull, stderr=fnull)

        if err != 0:
            sys.stderr.write("Cannot push the file, "
                             "%s, error %d" % (dst, err))
            return 1
        return 0

    def rm(self, dst):
        """Remove file from device."""
        return self._shell("rm", dst)

    def rmdir(self, dst):
        """Remove directory from device."""
        return self._shell("rm", "-r", dst)

    def mkdir(self, dst):
        """Make directory on the device through adb."""
        return self._shell("mkdir", dst)


# Maps the extfs command name to a callable taking the parsed arguments.
CALL_MAP = {'list': lambda a: Adb().list(),
            'copyin': lambda a: Adb().copyin(a.src, a.dst),
            'copyout': lambda a: Adb().copyout(a.src, a.dst),
            'mkdir': lambda a: Adb().mkdir(a.dst),
            'rmdir': lambda a: Adb().rmdir(a.dst),
            'rm': lambda a: Adb().rm(a.dst),
            'run': lambda a: Adb().run(a.dst)}


def main():
    """Parse commandline and dispatch to the requested command.

    Expected invocation: ``<command> <archive> [<arg> [<arg>]]``. Exits with
    status 2 on unknown command or missing arguments, otherwise returns the
    status of the executed command.
    """
    sys.stderr.write("comnandline: %s\n" % " ".join(sys.argv))
    if len(sys.argv) < 2 or sys.argv[1] not in CALL_MAP:
        sys.exit(2)

    class Arg(object):
        """Mimic argparse/optparse object"""
        dst = None
        src = None
        arch = None

    arg = Arg()
    command = sys.argv[1]

    try:
        arg.arch = sys.argv[2]
        if command == 'copyin':
            # copyin gets the path inside the archive first, then the
            # local source file.
            arg.src = sys.argv[4]
            arg.dst = sys.argv[3]
        elif command == 'copyout':
            arg.src = sys.argv[3]
            arg.dst = sys.argv[4]
        elif command in ('rm', 'rmdir', 'run', 'mkdir'):
            arg.dst = sys.argv[3]
    except IndexError:
        sys.exit(2)

    return CALL_MAP[command](arg)


if __name__ == "__main__":
    sys.exit(main())