1
0
mirror of https://github.com/gryf/mc_adbfs.git synced 2026-03-27 22:03:34 +01:00

7 Commits
0.7 ... 0.9

2 changed files with 373 additions and 90 deletions

View File

@@ -13,11 +13,13 @@ Rquirements
* An Android device or emulator preferably rooted * An Android device or emulator preferably rooted
* Busybox installed and available in the path on the device * Busybox installed and available in the path on the device
Make sure, that issuing from command line:: Make sure, that issuing from command line:
.. code:: shell-session
$ adb shell busybox ls $ adb shell busybox ls
should display files from root directory on the device. it should display files from root directory on the device.
Features Features
======== ========
@@ -39,15 +41,43 @@ if needed.
Usage Usage
===== =====
To use it, just issue:: To use it, just issue:
cd adbfs:// .. code:: shell-session
cd adbfs://
under MC - after some time you should see the files and directories on your under MC - after some time you should see the files and directories on your
device. For convenience you can add a bookmark (accessible under CTRL+\) for device. For convenience you can add a bookmark (accessible under CTRL+\) for
fast access. The time is depended on how many files and directories you have on fast access. The time is depended on how many files and directories you have on
your device and how fast it is :) your device and how fast it is :)
Configuration
=============
You can use configure behaviour of this plugin using ``.ini`` file located under
``$XDG_CONFIG_HOME/mc/adbfs.ini`` (which usually is located under
``~/.config/mc/adbfs.ini``), and have default values, like:
.. code:: ini
[adbfs]
debug = false
skip_dirs = true
dirs_to_skip = ["acct", "charger", "d", "dev", "proc", "sys"]
root =
where:
* ``debug`` will provide a little bit more verbose information, useful for
debugging
* ``dirs_to_skip`` list of paths to directories which will be skipped during
reading. If leaved empty, or setted to empty list (``[]``) will read
everything (slow!)
* ``root`` root directory to read. Everything outside of that directory will be
omitted. That would be the fastest way to access certain location on the
device. Note, that ``dirs_to_skip`` still apply inside this directory.
Limitations Limitations
=========== ===========
@@ -55,6 +85,8 @@ Limitations
files are on the device and so on files are on the device and so on
* Some filenames might be still inaccessible for operating * Some filenames might be still inaccessible for operating
* All files operations which needs root privileges will fail (for now) * All files operations which needs root privileges will fail (for now)
* The implementation is experimental and it's by now working with mine device;
while it might not work with yours
License License
======= =======

423
adbfs
View File

@@ -2,36 +2,139 @@
""" """
adbfs Virtual filesystem for Midnight Commander adbfs Virtual filesystem for Midnight Commander
* Copyright (c) 2015, Roman Dobosz, * Copyright (c) 2016, Roman Dobosz,
* Published under 3-clause BSD-style license (see LICENSE file) * Published under 3-clause BSD-style license (see LICENSE file)
Version: 0.7
""" """
import ConfigParser
import argparse
from datetime import datetime from datetime import datetime
import errno
import json
import os import os
import pipes
import re import re
import subprocess import subprocess
import sys import sys
__version__ = 0.8
DEBUG = os.getenv("ADBFS_DEBUG", False) XDG_CONFIG_HOME = os.getenv("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
SKIP_SYSTEM_DIR = os.getenv("ADBFS_SKIP_SYSTEM_DIR", True)
class NoBoxFoundException(OSError):
"""
Exception raised in case of not found either toolbox or busybox on remote
filesystem accessed via adb
"""
pass
class Conf(object):
"""Simple config parser"""
boxes = {'busybox': {'ls': 'busybox ls -anel',
'rls': 'busybox ls -Ranel {}',
'file_re': r'^(?P<perms>[-bcdlps][-rwxsStT]{9})\s+'
r'(?P<links>\d+)\s'
r'(?P<uid>\d+)\s+'
r'(?P<gid>\d+)\s+'
r'(?P<size>\d+)\s[A-Z,a-z]{3}\s'
r'(?P<date_time>[A-Z,a-z]{3}\s+'
r'\d+\s\d{2}:\d{2}:\d{2}\s+\d{4})\s'
r'(?P<name>.*)'},
'toolbox': {'ls': 'toolbox ls -anl',
'rls': 'toolbox ls -Ranl {}',
'file_re': r'^(?P<perms>[-bcdlps][-rwxsStT]{9})\s+'
r'(?P<uid>\d+)\s+'
r'(?P<gid>\d+)\s+'
r'(?P<size>\d+)?\s'
r'(?P<date>\d{4}-\d{2}-\d{2}\s'
r'\d{2}:\d{2})\s'
r'(?P<name>.*)'}
}
def __init__(self):
self.box = None
self.debug = False
self.dirs_to_skip = ["acct", "charger", "d", "dev", "proc", "sys"]
self.root = None
self.get_the_box()
self.read()
def get_the_box(self):
"""Detect if we dealing with busybox or toolbox"""
try:
with open(os.devnull, "w") as fnull:
result = subprocess.check_output('adb shell which '
'busybox'.split(),
stderr=fnull)
if 'busybox' in result:
self.box = Conf.boxes['busybox']
Adb.file_re = re.compile(self.box['file_re'])
return
except subprocess.CalledProcessError:
pass
try:
with open(os.devnull, "w") as fnull:
result = subprocess.check_output('adb shell which '
'toolbox'.split(),
stderr=fnull)
if 'toolbox' in result:
self.box = Conf.boxes['toolbox']
Adb.file_re = re.compile(self.box['file_re'])
return
except subprocess.CalledProcessError:
pass
raise NoBoxFoundException(errno.ENOENT,
"There is no toolbox or busybox available")
def read(self):
"""
Read config file and change the options according to values from that
file.
"""
if not os.path.exists(XDG_CONFIG_HOME):
return
conf_fname = os.path.join(XDG_CONFIG_HOME, 'mc', 'adbfs.ini')
if not os.path.exists(conf_fname):
return
cfg = ConfigParser.SafeConfigParser()
cfg_map = {'debug': (cfg.getboolean, 'debug'),
'dirs_to_skip': (cfg.get, 'dirs_to_skip'),
'root': (cfg.get, 'root')}
cfg.read(conf_fname)
for key, (function, attr) in cfg_map.items():
try:
setattr(self, attr, function('adbfs', key))
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
pass
if self.dirs_to_skip and isinstance(self.dirs_to_skip, str):
self.dirs_to_skip = json.loads(self.dirs_to_skip, encoding="ascii")
self.dirs_to_skip = [x.encode('utf-8') for x in self.dirs_to_skip]
else:
self.dirs_to_skip = []
class File(object): class File(object):
"""Item in filesystem representation""" """Item in filesystem representation"""
def __init__(self, perms=None, links=1, uid=0, gid=0, size=0, def __init__(self, perms=None, links=1, uid=0, gid=0, size=None,
date_time=None, name=None): date_time=None, date=None, name=None):
"""initialize file""" """initialize file"""
self.perms = perms self.perms = perms
self.links = links self.links = links
self.uid = uid self.uid = uid
self.gid = gid self.gid = gid
self.size = size self.size = size if size else 0
self.date_time = date_time # as string self.date_time = date_time # as string
self.name = name self.name = name
self.date = date # as string
self.dirname = "" self.dirname = ""
self.type = None self.type = None
@@ -47,6 +150,10 @@ class File(object):
return return
self.name = name self.name = name
if not self.size:
self.size = 0
if target.startswith("/"): if target.startswith("/"):
self.link_target = target self.link_target = target
else: else:
@@ -68,13 +175,18 @@ class File(object):
"Nov": 11, "Nov": 11,
"Dec": 12} "Dec": 12}
self.dirname = dirname self.dirname = dirname
date = self.date_time.split() if self.date_time:
date = "%s-%02d-%s %s" % (date[1], date = self.date_time.split()
month_num[date[0]], date = "%s-%02d-%s %s" % (date[1],
date[3], month_num[date[0]],
date[2]) date[3],
date = datetime.strptime(date, "%d-%m-%Y %H:%M:%S") date[2])
date = datetime.strptime(date, "%d-%m-%Y %H:%M:%S")
elif self.date:
date = datetime.strptime(self.date, "%Y-%m-%d %H:%M")
self.date_time = date.strftime("%m/%d/%Y %H:%M:01") self.date_time = date.strftime("%m/%d/%Y %H:%M:01")
self.type = self.perms[0] if self.perms else None self.type = self.perms[0] if self.perms else None
if self.type == "l" and " -> " in self.name: if self.type == "l" and " -> " in self.name:
@@ -82,12 +194,9 @@ class File(object):
self.filepath = os.path.join(self.dirname, self.name) self.filepath = os.path.join(self.dirname, self.name)
def mk_link_relative(self, target_type): def mk_link_relative(self):
"""Convert links to relative""" """Convert links to relative"""
rel_path = self.dirname self.link_target = os.path.relpath(self.link_target, self.dirname)
# if target_type == "d":
# rel_path = self.filepath
self.link_target = os.path.relpath(self.link_target, rel_path)
def __repr__(self): def __repr__(self):
"""represent the file/entire node""" """represent the file/entire node"""
@@ -121,25 +230,34 @@ class File(object):
class Adb(object): class Adb(object):
"""Class for interact with android rooted device through adb""" """Class for interact with android rooted device through adb"""
dirs_to_skip = ["acct", "charger", "d", "dev", "proc", "sys"] file_re = None
file_re = re.compile(r'^(?P<perms>[-bcdlps][-rwxsStT]{9})\s+'
r'(?P<links>\d+)\s'
r'(?P<uid>\d+)\s+'
r'(?P<gid>\d+)\s+'
r'(?P<size>\d+)\s[A-Z,a-z]{3}\s'
r'(?P<date_time>[A-Z,a-z]{3}\s+'
r'\d+\s\d{2}:\d{2}:\d{2}\s+\d{4})\s'
r'(?P<name>.*)')
current_re = re.compile(r"^(\./)?(?P<dir>.+):$") current_re = re.compile(r"^(\./)?(?P<dir>.+):$")
as_root = os.getenv("ADBFS_AS_ROOT", False)
verbose = os.getenv("ADBFS_VERBOSE", False)
def __init__(self): def __init__(self):
"""Prepare archive content for operations""" """Prepare archive content for operations"""
super(Adb, self).__init__() super(Adb, self).__init__()
self.conf = Conf()
self.error = ''
self._entries = [] self._entries = []
self._links = {} self._links = {}
self._got_root = False
self.__su_check()
def __su_check(self):
"""Check if we are able to get elevated privileges"""
try:
with open(os.devnull, "w") as fnull:
result = subprocess.check_output('adb shell su -c '
'whoami'.split(),
stderr=fnull)
except subprocess.CalledProcessError:
return
if 'root' in result:
self._got_root = True
return
def _find_target(self, needle): def _find_target(self, needle):
"""Find link target""" """Find link target"""
@@ -153,6 +271,7 @@ class Adb(object):
for entry in self._entries: for entry in self._entries:
if entry.filepath == needle: if entry.filepath == needle:
return entry return entry
return None return None
def _normalize_links(self): def _normalize_links(self):
@@ -170,25 +289,80 @@ class Adb(object):
target_entry = self._find_target(entry.link_target) target_entry = self._find_target(entry.link_target)
if target_entry: if target_entry:
entry.link_target = target_entry.filepath entry.link_target = target_entry.filepath
entry.mk_link_relative(target_entry.type) entry.mk_link_relative()
else: else:
elems_to_remove.append(self._entries.index(entry)) elems_to_remove.append(self._entries.index(entry))
for idx in sorted(elems_to_remove, reverse=True): for idx in sorted(elems_to_remove, reverse=True):
del self._entries[idx] del self._entries[idx]
def _retrieve_file_list(self, root=None): def _retrieve_single_dir_list(self, dir_):
"""Retrieve file list using adb""" """Retrieve file list using adb"""
# if root:
# print "retrieve for %s" % root.filepath
command = ["adb", "shell", "su", "-c"] command = ["adb", "shell", "su", "-c"]
if not root: command.append(self.conf.box['rls'].format(dir_))
command.append("'busybox ls -anel'")
else:
command.append("'busybox ls -Ranel {}'".format(root.filepath))
try: try:
if self.conf.debug:
print "executing", " ".join(command)
lines = subprocess.check_output(command)
except subprocess.CalledProcessError:
sys.stderr.write("Cannot read directory. Is device connected?\n")
return 1
lines = [l.strip() for l in lines.split('\n') if l.strip()]
if len(lines) == 1:
reg_match = self.file_re.match(lines[0])
entry = File(**reg_match.groupdict())
entry.update('/')
if entry.filepath in self.conf.dirs_to_skip:
return
self._entries.append(entry)
if entry.type == "l":
self._links[entry.filepath] = entry
self._retrieve_single_dir_list(entry.link_target)
else:
for line in lines:
current_dir_re = self.current_re.match(line)
if current_dir_re:
current_dir = current_dir_re.groupdict()["dir"]
if not current_dir:
current_dir = "/"
continue
reg_match = self.file_re.match(line)
if not reg_match:
continue
entry = File(**reg_match.groupdict())
if entry.name in (".", ".."):
continue
entry.update(current_dir)
if entry.filepath in self.conf.dirs_to_skip:
continue
self._entries.append(entry)
if entry.type == "l":
self._links[entry.filepath] = entry
def _retrieve_file_list(self, root=None):
"""Retrieve file list using adb"""
command = ["adb", "shell", "su", "-c"]
if not root:
command.append(self.conf.box['ls'])
else:
command.append(self.conf.box['rls'].format(root.filepath))
try:
if self.conf.debug:
print "executing", " ".join(command)
lines = subprocess.check_output(command) lines = subprocess.check_output(command)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
sys.stderr.write("Cannot read directory. Is device connected?\n") sys.stderr.write("Cannot read directory. Is device connected?\n")
@@ -212,11 +386,11 @@ class Adb(object):
if entry.name in (".", ".."): if entry.name in (".", ".."):
continue continue
if SKIP_SYSTEM_DIR and entry.name in Adb.dirs_to_skip:
continue
entry.update(current_dir) entry.update(current_dir)
if entry.filepath in self.conf.dirs_to_skip:
continue
self._entries.append(entry) self._entries.append(entry)
if root is None and entry.type == "d": if root is None and entry.type == "d":
self._retrieve_file_list(entry) self._retrieve_file_list(entry)
@@ -232,29 +406,63 @@ class Adb(object):
def list(self): def list(self):
"""Output list contents directory""" """Output list contents directory"""
self._retrieve_file_list() if self.error:
# self._retrieve_file_list_from_pickle() sys.stderr.write(self.error)
# self._save_file_list_to_pickle() return 1
if self.conf.root:
self._retrieve_single_dir_list(self.conf.root)
else:
self._retrieve_file_list()
self._normalize_links() self._normalize_links()
# with open(os.path.join(os.path.dirname(os.path.realpath(__file__)),
# # "list.pcl"), "w") as fob:
# "list.pcl")) as fob:
# import cPickle
# # cPickle.dump(self._entries, fob)
# self._entries = cPickle.load(fob)
sys.stdout.write("".join([str(entry) for entry in self._entries])) sys.stdout.write("".join([str(entry) for entry in self._entries]))
return 0 return 0
def copyout(self, src, dst): def copyout(self, src, dst):
"""Copy file form the device using adb.""" """Copy file form the device using adb."""
if self.error:
sys.stderr.write(self.error)
return 1
cmd = ["adb", "pull", src, dst]
if self.conf.debug:
sys.stderr.write(" ".join(cmd) + "\n")
with open(os.devnull, "w") as fnull: with open(os.devnull, "w") as fnull:
return subprocess.call(["adb", "pull", pipes.quote(src), try:
pipes.quote(dst)], err = subprocess.call(cmd, stdout=fnull, stderr=fnull)
stdout=fnull, stderr=fnull) except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 1
return err
def copyin(self, src, dst): def copyin(self, src, dst):
"""Copy file to the device through adb.""" """Copy file to the device through adb."""
if self.error:
sys.stderr.write(self.error)
return 1
if not dst.startswith("/"): if not dst.startswith("/"):
dst = "/" + dst dst = "/" + dst
# cmd = ["adb", "push", pipes.quote(src), pipes.quote(dst)]
cmd = ["adb", "push", src, dst]
if self.conf.debug:
sys.stderr.write(" ".join(cmd) + "\n")
with open(os.devnull, "w") as fnull: with open(os.devnull, "w") as fnull:
err = subprocess.call(["adb", "push", pipes.quote(src), try:
pipes.quote(dst)], err = subprocess.call(cmd, stdout=fnull, stderr=fnull)
stdout=fnull, stderr=fnull) except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 1
if err != 0: if err != 0:
sys.stderr.write("Cannot push the file, " sys.stderr.write("Cannot push the file, "
@@ -264,8 +472,16 @@ class Adb(object):
def rm(self, dst): def rm(self, dst):
"""Remove file from device.""" """Remove file from device."""
cmd = ["adb", "shell", "rm", pipes.quote(dst)] if self.error:
err = subprocess.check_output(cmd) sys.stderr.write(self.error)
return 1
cmd = ["adb", "shell", "rm", dst]
try:
err = subprocess.check_output(cmd)
except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 1
if err != "": if err != "":
sys.stderr.write(err) sys.stderr.write(err)
@@ -274,8 +490,16 @@ class Adb(object):
def rmdir(self, dst): def rmdir(self, dst):
"""Remove directory from device.""" """Remove directory from device."""
cmd = ["adb", "shell", "rm", "-r", pipes.quote(dst)] if self.error:
err = subprocess.check_output(cmd) sys.stderr.write(self.error)
return 1
cmd = ["adb", "shell", "rm", "-r", dst]
try:
err = subprocess.check_output(cmd)
except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 1
if err != "": if err != "":
sys.stderr.write(err) sys.stderr.write(err)
@@ -284,8 +508,19 @@ class Adb(object):
def mkdir(self, dst): def mkdir(self, dst):
"""Make directory on the device through adb.""" """Make directory on the device through adb."""
cmd = ["adb", "shell", "mkdir", pipes.quote(dst)] if self.error:
err = subprocess.check_output(cmd) sys.stderr.write(self.error)
return 1
if not dst.startswith("/"):
dst = "/" + dst
cmd = ["adb", "shell", "mkdir", dst]
try:
err = subprocess.check_output(cmd)
except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 1
if err != "": if err != "":
sys.stderr.write(err) sys.stderr.write(err)
@@ -301,39 +536,55 @@ CALL_MAP = {'list': lambda a: Adb().list(),
'rm': lambda a: Adb().rm(a.dst), 'rm': lambda a: Adb().rm(a.dst),
'run': lambda a: Adb().run(a.dst)} 'run': lambda a: Adb().run(a.dst)}
def main(): def main():
"""parse commandline""" """parse commandline"""
try: parser = argparse.ArgumentParser()
if DEBUG: subparsers = parser.add_subparsers(help='supported commands')
sys.stderr.write("commandline: %s\n" % " ".join(sys.argv)) parser_list = subparsers.add_parser('list')
if sys.argv[1] not in ('list', 'copyin', 'copyout', 'rm', "rmdir", parser_copyin = subparsers.add_parser('copyin')
'mkdir', "run"): parser_copyout = subparsers.add_parser('copyout')
sys.exit(2) parser_rm = subparsers.add_parser('rm')
except IndexError: parser_mkdir = subparsers.add_parser('mkdir')
sys.exit(2) parser_rmdir = subparsers.add_parser('rmdir')
parser_run = subparsers.add_parser('run')
class Arg(object): parser_list.add_argument('arch')
"""Mimic argparse/optparse object""" parser_list.set_defaults(func=CALL_MAP['list'])
dst = None
src = None
arch = None
arg = Arg() parser_copyin.add_argument('arch')
parser_copyin.add_argument('dst')
parser_copyin.add_argument('src')
parser_copyin.set_defaults(func=CALL_MAP['copyin'])
try: parser_copyout.add_argument('arch')
arg.arch = sys.argv[2] parser_copyout.add_argument('src')
if sys.argv[1] == 'copyin': parser_copyout.add_argument('dst')
arg.src = sys.argv[4] parser_copyout.set_defaults(func=CALL_MAP['copyout'])
arg.dst = sys.argv[3]
if sys.argv[1] == 'copyout': parser_rm.add_argument('arch')
arg.src = sys.argv[3] parser_rm.add_argument('dst')
arg.dst = sys.argv[4] parser_rm.set_defaults(func=CALL_MAP['rm'])
elif sys.argv[1] in ('rm', 'rmdir', 'run', 'mkdir'):
arg.dst = sys.argv[3] parser_mkdir.add_argument('arch')
except IndexError: parser_mkdir.add_argument('dst')
sys.exit(2) parser_mkdir.set_defaults(func=CALL_MAP['mkdir'])
parser_rmdir.add_argument('arch')
parser_rmdir.add_argument('dst')
parser_rmdir.set_defaults(func=CALL_MAP['rmdir'])
parser_run.add_argument('arch')
parser_run.add_argument('dst')
parser_run.set_defaults(func=CALL_MAP['run'])
parser.add_argument('--version', action='version',
version='%(prog)s ' + str(__version__))
args = parser.parse_args()
return args.func(args)
return CALL_MAP[sys.argv[1]](arg)
if __name__ == "__main__": if __name__ == "__main__":
sys.exit(main()) sys.exit(main())