#! /usr/bin/env python
"""
adbfs Virtual filesystem for Midnight Commander

* Copyright (c) 2016, Roman Dobosz,
* Published under 3-clause BSD-style license (see LICENSE file)
"""
import argparse
from datetime import datetime
import errno
import json
import os
import re
import subprocess
import sys

try:
    import ConfigParser
    _ConfigParserClass = ConfigParser.SafeConfigParser
except ImportError:  # Python 3 renamed the module and dropped "Safe"
    import configparser as ConfigParser
    _ConfigParserClass = ConfigParser.ConfigParser

__version__ = 0.9

XDG_CONFIG_HOME = os.getenv('XDG_CONFIG_HOME', os.path.expanduser('~/.config'))


def _check_output(command, **kwargs):
    """
    Run command with subprocess.check_output and return its output as text.

    On Python 2 the output is already a str and is returned untouched; on
    Python 3 the bytes are decoded (undecodable bytes are replaced) so that
    the substring checks and regular expressions used below keep working.
    Raises subprocess.CalledProcessError on non-zero exit status.
    """
    output = subprocess.check_output(command, **kwargs)
    if not isinstance(output, str):
        output = output.decode('utf-8', 'replace')
    return output


class NoBoxFoundException(OSError):
    """
    Exception raised in case of not found either toolbox or busybox on remote
    filesystem accessed via adb
    """
    pass


class Conf(object):
    """Simple config parser"""
    # Listing commands and the regular expression able to parse their output,
    # per supported "box". Group names match File.__init__ keyword arguments.
    boxes = {'busybox': {'ls': 'busybox ls -anel',
                         'rls': 'busybox ls -Ranel {}',
                         'file_re': r'^(?P<perms>[-bcdlps][-rwxsStT]{9})\s+'
                                    r'(?P<links>\d+)\s'
                                    r'(?P<uid>\d+)\s+'
                                    r'(?P<gid>\d+)\s+'
                                    r'(?P<size>\d+)\s[A-Z,a-z]{3}\s'
                                    r'(?P<date_time>[A-Z,a-z]{3}\s+'
                                    r'\d+\s\d{2}:\d{2}:\d{2}\s+\d{4})\s'
                                    r'(?P<name>.*)'},
             'toolbox': {'ls': 'toolbox ls -anl',
                         'rls': 'toolbox ls -Ranl {}',
                         'file_re': r'^(?P<perms>[-bcdlps][-rwxsStT]{9})\s+'
                                    r'(?P<uid>\d+)\s+'
                                    r'(?P<gid>\d+)\s+'
                                    r'(?P<size>\d+)?\s'
                                    r'(?P<date>\d{4}-\d{2}-\d{2}\s'
                                    r'\d{2}:\d{2})\s'
                                    r'(?P<name>.*)'}}

    def __init__(self):
        """Set defaults, apply the config file and detect the remote box."""
        self.box = None
        self.debug = False
        self.dirs_to_skip = ['acct', 'charger', 'd', 'dev', 'proc', 'sys']
        self.root = None
        self.suppress_colors = False
        self.read()
        self.get_the_box()

    def get_the_box(self):
        """
        Detect if we dealing with busybox or toolbox.

        Busybox is probed first. On success self.box holds the command set
        for the detected box and Adb.file_re is set to the matching compiled
        expression. Raises NoBoxFoundException if neither is available.
        """
        for name in ('busybox', 'toolbox'):
            try:
                with open(os.devnull, 'w') as fnull:
                    result = _check_output(['adb', 'shell', 'which', name],
                                           stderr=fnull)
            except subprocess.CalledProcessError:
                continue

            if name not in result:
                continue

            # Work on a copy, so the class level defaults stay pristine.
            self.box = dict(Conf.boxes[name])
            if name == 'busybox' and self.suppress_colors:
                self.box.update({'ls': 'busybox ls --color=none -anel',
                                 'rls': 'busybox ls --color=none '
                                        '-Ranel {}'})
            Adb.file_re = re.compile(self.box['file_re'])
            return

        raise NoBoxFoundException(errno.ENOENT,
                                  'There is no toolbox or busybox available')

    def read(self, conf_fname=None):
        """
        Read config file and change the options according to values from that
        file.

        conf_fname is an optional explicit path of the ini file; by default
        $XDG_CONFIG_HOME/mc/adbfs.ini is used. A missing file, section or
        option leaves the current value of the attribute untouched.
        """
        if conf_fname is None:
            if not os.path.exists(XDG_CONFIG_HOME):
                return
            conf_fname = os.path.join(XDG_CONFIG_HOME, 'mc', 'adbfs.ini')

        if not os.path.exists(conf_fname):
            return

        cfg = _ConfigParserClass()
        cfg.read(conf_fname)
        missing = (ConfigParser.NoSectionError, ConfigParser.NoOptionError)

        for option in ('debug', 'suppress_colors', 'root', 'dirs_to_skip'):
            try:
                raw = cfg.get('adbfs', option)
                if option == 'debug':
                    value = cfg.getboolean('adbfs', option)
                elif option == 'suppress_colors':
                    try:
                        value = cfg.getboolean('adbfs', option)
                    except ValueError:
                        # Not a recognised boolean: keep the former
                        # "any non-empty value enables it" behaviour.
                        value = bool(raw)
                elif option == 'dirs_to_skip':
                    value = self._parse_dirs_to_skip(raw)
                else:
                    value = raw
            except missing:
                continue
            setattr(self, option, value)

    @staticmethod
    def _parse_dirs_to_skip(raw):
        """Turn JSON list from the config into a list of native strings."""
        if not raw:
            # Explicitly empty option means "do not skip anything".
            return []
        # json yields unicode objects on Python 2; convert them to str there.
        return [item if isinstance(item, str) else item.encode('utf-8')
                for item in json.loads(raw)]


class File(object):
    """Item in filesystem representation"""
    # Explicit mapping instead of strptime's %b, which depends on the locale.
    MONTH_NUM = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
                 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11,
                 'Dec': 12}

    def __init__(self, perms=None, links=1, uid=0, gid=0, size=None,
                 date_time=None, date=None, name=None):
        """
        Initialize file. Arguments mirror the named groups of the listing
        regular expressions, so they usually arrive as strings.
        """
        self.perms = perms
        self.links = links
        self.uid = uid
        self.gid = gid
        self.size = size if size else 0
        self.date_time = date_time  # as string
        self.name = name
        self.date = date  # as string
        self.dirname = ''
        self.type = None
        self.string = None
        self.link_target = None
        self.filepath = None

    def _correct_link(self):
        """Canonize filename and fill the link attr"""
        try:
            name, target = self.name.split(' -> ')
        except ValueError:
            # Zero or more than one ' -> ' separator; leave the name alone.
            return

        self.name = name
        if not self.size:
            self.size = 0

        if target.startswith('/'):
            self.link_target = target
        else:
            self.link_target = os.path.abspath(os.path.join(self.dirname,
                                                            target))

    def update(self, dirname):
        """
        Update object fields: remember directory the file lives in, convert
        the timestamp to 'MM/DD/YYYY HH:MM:01' form, set the type out of the
        first permission character, split link name from its target and
        compute filepath.
        """
        self.dirname = dirname

        parsed = None
        if self.date_time:
            # i.e. 'Jan  5 10:20:30 2016'
            parts = self.date_time.split()
            stamp = '%s-%02d-%s %s' % (parts[1], self.MONTH_NUM[parts[0]],
                                       parts[3], parts[2])
            parsed = datetime.strptime(stamp, '%d-%m-%Y %H:%M:%S')
        elif self.date:
            # i.e. '2016-01-05 10:20'
            parsed = datetime.strptime(self.date, '%Y-%m-%d %H:%M')

        if parsed is not None:
            # Seconds are emitted as constant '01', whatever the source was.
            self.date_time = parsed.strftime('%m/%d/%Y %H:%M:01')

        self.type = self.perms[0] if self.perms else None

        if self.type == 'l' and ' -> ' in self.name:
            self._correct_link()

        self.filepath = os.path.join(self.dirname, self.name)

    def mk_link_relative(self):
        """Convert links to relative"""
        self.link_target = os.path.relpath(self.link_target, self.dirname)

    def __repr__(self):
        """represent the file/entire node"""
        fullname = os.path.join(self.dirname, self.name)
        if self.link_target:
            fullname += ' -> ' + self.link_target
        return '<File {type} {name} {id}>'.format(type=self.type,
                                                  name=fullname,
                                                  id=hex(id(self)))

    def __str__(self):
        """display the file/entire node; empty string for nameless item"""
        template = ('{perms} {links:>4} {uid:<8} {gid:<8} {size:>8} '
                    '{date_time} {fullname}\n')

        if not self.name:
            return ''

        fullname = os.path.join(self.dirname, self.name)
        if self.link_target:
            fullname += ' -> ' + self.link_target

        return template.format(perms=self.perms,
                               links=self.links,
                               uid=self.uid,
                               gid=self.gid,
                               size=self.size,
                               date_time=self.date_time,
                               fullname=fullname)


class Adb(object):
    """Class for interact with android rooted device through adb"""
    file_re = None  # compiled by Conf.get_the_box() for the detected box
    current_re = re.compile(r'^(\./)?(?P<dir>.+):$')

    def __init__(self):
        """Prepare archive content for operations"""
        super(Adb, self).__init__()
        self.conf = Conf()
        self.error = ''
        self._entries = []
        self._links = {}
        self._got_root = False

        self.__su_check()

    def __su_check(self):
        """Check if we are able to get elevated privileges"""
        try:
            with open(os.devnull, 'w') as fnull:
                result = _check_output(['adb', 'shell', 'su', '-c', 'whoami'],
                                       stderr=fnull)
        except subprocess.CalledProcessError:
            return

        if 'root' in result:
            self._got_root = True

    def _find_target(self, needle, index=None):
        """
        Find link target.

        Follow the chain of known links starting at path needle and return
        the entry it finally points to, or None if there is no such entry or
        the links form a loop. index is an optional {filepath: entry} mapping
        used instead of scanning self._entries.
        """
        visited = set()
        while needle in self._links:
            if needle in visited:
                return None  # cyclic links never resolve
            visited.add(needle)

            link = self._links[needle]
            if link.link_target is None:
                return None
            needle = os.path.abspath(os.path.join(link.dirname,
                                                  link.link_target))

        if index is not None:
            return index.get(needle)

        for entry in self._entries:
            if entry.filepath == needle:
                return entry
        return None

    def _normalize_links(self):
        """
        There might be a case of a chain of linked files, like:

            /foo -> /mnt/foo
            /bar -> /foo

        If one want to follow such 'bar' link - MC in extfs mode will fail to
        figure out the right target. This helper will correct the thing.
        Links which cannot be resolved are removed from the entries.
        """
        # One pass index, so resolving every link does not rescan all entries.
        index = {}
        for entry in self._entries:
            index.setdefault(entry.filepath, entry)

        unresolved = set()
        for link in self._links.values():
            target_entry = self._find_target(link.link_target, index)
            if target_entry is not None:
                link.link_target = target_entry.filepath
                link.mk_link_relative()
            else:
                unresolved.add(id(link))

        self._entries[:] = [entry for entry in self._entries
                            if id(entry) not in unresolved]

    def _run_ls(self, lscmd):
        """
        Execute listing command on the device, through su if available.
        Return its output, or None (after complaining on stderr) on failure.
        """
        if self._got_root:
            command = ['adb', 'shell', 'su', '-c', lscmd]
        else:
            command = ['adb', 'shell'] + lscmd.split(' ')

        if self.conf.debug:
            # stdout carries the listing itself, so diagnostics go to stderr.
            sys.stderr.write('executing ' + ' '.join(command) + '\n')

        try:
            return _check_output(command)
        except subprocess.CalledProcessError:
            sys.stderr.write('Cannot read directory. Is device connected?\n')
            return None

    def _parse_entries(self, lines, current_dir):
        """
        Generate File objects out of listing lines. Lines in form 'path:'
        switch the directory for the following items; lines which does not
        look like a file, '.' and '..' items and skipped paths are omitted.
        """
        for line in lines:
            line = line.strip()

            header = self.current_re.match(line)
            if header:
                current_dir = header.group('dir') or '/'
                continue

            reg_match = self.file_re.match(line)
            if not reg_match:
                continue

            entry = File(**reg_match.groupdict())
            if entry.name in ('.', '..'):
                continue

            entry.update(current_dir)
            # NOTE(review): filepath is joined with the directory, so items
            # listed from '/' look like '/acct', while the built-in
            # dirs_to_skip defaults have no leading slash - confirm which
            # form is intended.
            if entry.filepath in self.conf.dirs_to_skip:
                continue

            yield entry

    def _retrieve_single_dir_list(self, dir_):
        """
        Retrieve file list using adb, recursively for single directory dir_.
        If dir_ turns out to be a link, its target is listed as well.
        Return 1 if the directory cannot be read.
        """
        output = self._run_ls(self.conf.box['rls'].format(dir_))
        if output is None:
            return 1

        lines = [line.strip() for line in output.split('\n') if line.strip()]

        if len(lines) == 1:
            reg_match = self.file_re.match(lines[0])
            # A lone non-file line (i.e. header of an empty directory) is
            # left for the regular parsing below.
            if reg_match:
                entry = File(**reg_match.groupdict())
                entry.update('/')

                if entry.filepath in self.conf.dirs_to_skip:
                    return None

                self._entries.append(entry)
                if entry.type == 'l':
                    if entry.filepath in self._links:
                        return None  # already followed; avoid endless loop
                    self._links[entry.filepath] = entry
                    return self._retrieve_single_dir_list(entry.link_target)
                return None

        for entry in self._parse_entries(lines, dir_):
            self._entries.append(entry)
            if entry.type == 'l':
                self._links[entry.filepath] = entry
        return None

    def _retrieve_file_list(self, root=None):
        """
        Retrieve file list using adb. Without root (a File object) top level
        directory is listed and every directory found is read recursively.
        Return 1 if the directory cannot be read.
        """
        if not root:
            lscmd = self.conf.box['ls']
        else:
            lscmd = self.conf.box['rls'].format(root.filepath)

        output = self._run_ls(lscmd)
        if output is None:
            return 1

        current_dir = root.dirname if root else '/'
        for entry in self._parse_entries(output.split('\n'), current_dir):
            self._entries.append(entry)
            if root is None and entry.type == 'd':
                # Failure of a single subdirectory is reported on stderr
                # by the call itself and does not stop the listing.
                self._retrieve_file_list(entry)
            if entry.type == 'l':
                self._links[entry.filepath] = entry
        return None

    def _shell(self, *args):
        """
        Run 'adb shell' with given arguments. Any output of the command is
        treated as an error message. Return 0 on success, 1 otherwise.
        """
        if self.error:
            sys.stderr.write(self.error)
            return 1

        try:
            output = _check_output(['adb', 'shell'] + list(args))
        except (subprocess.CalledProcessError, OSError):
            sys.stderr.write('Error executing adb shell')
            return 1

        if output != '':
            sys.stderr.write(output)
            return 1
        return 0

    def _transfer(self, cmd):
        """
        Run adb pull/push command silently. Return its exit status, or None
        (after complaining on stderr) if adb cannot be executed at all.
        """
        if self.conf.debug:
            sys.stderr.write(' '.join(cmd) + '\n')

        with open(os.devnull, 'w') as fnull:
            try:
                return subprocess.call(cmd, stdout=fnull, stderr=fnull)
            except OSError:
                sys.stderr.write('Error executing adb shell')
                return None

    def run(self, fname):
        """Not supported"""
        sys.stderr.write('Not supported - or maybe you are on compatible '
                         'architecture?\n')
        return 1

    def list(self):
        """Output list contents directory. Return 0 on success."""
        if self.error:
            sys.stderr.write(self.error)
            return 1

        if self.conf.root:
            status = self._retrieve_single_dir_list(self.conf.root)
        else:
            status = self._retrieve_file_list()

        if status:
            return status

        self._normalize_links()
        sys.stdout.write(''.join([str(entry) for entry in self._entries]))
        return 0

    def copyout(self, src, dst):
        """Copy file form the device using adb. Return adb exit status."""
        if self.error:
            sys.stderr.write(self.error)
            return 1

        err = self._transfer(['adb', 'pull', src, dst])
        return 1 if err is None else err

    def copyin(self, src, dst):
        """Copy file to the device through adb. Return 0 on success."""
        if self.error:
            sys.stderr.write(self.error)
            return 1
        if not dst.startswith('/'):
            dst = '/' + dst

        err = self._transfer(['adb', 'push', src, dst])
        if err is None:
            return 1

        if err != 0:
            sys.stderr.write('Cannot push the file, '
                             '%s, error %d' % (dst, err))
            return 1
        return 0

    def rm(self, dst):
        """Remove file from device."""
        return self._shell('rm', dst)

    def rmdir(self, dst):
        """Remove directory from device."""
        return self._shell('rm', '-r', dst)

    def mkdir(self, dst):
        """Make directory on the device through adb."""
        if not dst.startswith('/'):
            dst = '/' + dst
        return self._shell('mkdir', dst)


CALL_MAP = {'list': lambda a: Adb().list(),
            'copyin': lambda a: Adb().copyin(a.src, a.dst),
            'copyout': lambda a: Adb().copyout(a.src, a.dst),
            'mkdir': lambda a: Adb().mkdir(a.dst),
            'rmdir': lambda a: Adb().rmdir(a.dst),
            'rm': lambda a: Adb().rm(a.dst),
            'run': lambda a: Adb().run(a.dst)}


def main():
    """parse commandline and execute requested command"""
    # Command name and its positional arguments, in the order they are
    # expected on the command line (note 'dst' before 'src' for copyin).
    commands = (('list', ('arch',)),
                ('copyin', ('arch', 'dst', 'src')),
                ('copyout', ('arch', 'src', 'dst')),
                ('rm', ('arch', 'dst')),
                ('mkdir', ('arch', 'dst')),
                ('rmdir', ('arch', 'dst')),
                ('run', ('arch', 'dst')))

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(help='supported commands')
    for name, arguments in commands:
        subparser = subparsers.add_parser(name)
        for argument in arguments:
            subparser.add_argument(argument)
        subparser.set_defaults(func=CALL_MAP[name])

    parser.add_argument('--version', action='version',
                        version='%(prog)s ' + str(__version__))

    args = parser.parse_args()
    if not hasattr(args, 'func'):
        # Python 3 argparse does not insist on a sub-command by itself.
        parser.error('too few arguments')

    return args.func(args)


if __name__ == '__main__':
    sys.exit(main())