mirror of
https://github.com/gryf/mc_adbfs.git
synced 2025-12-18 12:00:19 +01:00
337 lines
10 KiB
Python
Executable File
337 lines
10 KiB
Python
Executable File
#! /usr/bin/env python
"""
adbfs Virtual filesystem for Midnight Commander

* Copyright (c) 2015, Roman Dobosz,
* Published under 3-clause BSD-style license (see LICENSE file)

Version: 0.7
"""
|
|
|
|
from datetime import datetime
|
|
import subprocess
|
|
import os
|
|
import re
|
|
import sys
|
|
|
|
|
|
def env_flag(name, default=False):
    """Interpret environment variable `name` as a boolean switch.

    Returns `default` when the variable is not set.  When it is set, the
    value is considered disabled if it is empty or one of "0", "false",
    "no", "off" (case-insensitive, surrounding whitespace ignored) and
    enabled otherwise.

    A plain ``os.getenv(name, default)`` hands back the raw string, so a
    value such as "0" would be truthy and could never switch a flag off.
    """
    value = os.getenv(name)
    if value is None:
        return default
    return value.strip().lower() not in ("", "0", "false", "no", "off")


# Write the received command line to stderr (see main()).
DEBUG = env_flag("ADBFS_DEBUG", False)
# Leave out the directories named in Adb.dirs_to_skip while listing.
SKIP_SYSTEM_DIR = env_flag("ADBFS_SKIP_SYSTEM_DIR", True)
|
|
|
|
|
|
class File(object):
    """Item in filesystem representation.

    Holds the fields of a single long-listing line.  The fields which depend
    on the directory the entry lives in (``dirname``, ``filepath``, ``type``,
    ``link_target`` and the reformatted ``date_time``) are filled in by
    update().  ``str(entry)`` renders the entry as one listing line.
    """

    # Month abbreviation -> month number, used by update() to rebuild the
    # date.  Kept on the class so the table is not recreated for every entry.
    _MONTH_NUM = {"Jan": 1,
                  "Feb": 2,
                  "Mar": 3,
                  "Apr": 4,
                  "May": 5,
                  "Jun": 6,
                  "Jul": 7,
                  "Aug": 8,
                  "Sep": 9,
                  "Oct": 10,
                  "Nov": 11,
                  "Dec": 12}

    def __init__(self, perms=None, links=1, uid=0, gid=0, size=0,
                 date_time=None, name=None):
        """Initialize file.

        perms -- permission string; its first character becomes the type
        links, uid, gid, size -- stored and printed verbatim
        date_time -- date as string "Mon D HH:MM:SS YYYY" (see update())
        name -- entry name; for links it may carry a " -> target" suffix
        """
        self.perms = perms
        self.links = links
        self.uid = uid
        self.gid = gid
        self.size = size
        self.date_time = date_time  # as string
        self.name = name

        self.dirname = ""
        self.type = None
        self.string = None
        self.link_target = None
        self.filepath = None

    def _correct_link(self):
        """Canonize filename and fill the link attr.

        Splits a "name -> target" name into the bare name and the link
        target.  Relative targets are resolved against dirname, so
        link_target is always absolute afterwards.  A name which does not
        contain exactly one " -> " separator is left untouched.
        """
        parts = self.name.split(" -> ")
        if len(parts) != 2:
            return

        self.name, target = parts
        if target.startswith("/"):
            self.link_target = target
        else:
            self.link_target = os.path.abspath(os.path.join(self.dirname,
                                                            target))

    def update(self, dirname):
        """Update object fields which depend on the containing directory.

        dirname -- directory the entry has been found in

        Rewrites date_time from "Mon D HH:MM:SS YYYY" to
        "MM/DD/YYYY HH:MM:01", derives type from the first character of
        perms, splits link names and computes filepath.  An entry created
        without a date keeps date_time unchanged instead of failing.
        """
        self.dirname = dirname

        if self.date_time:
            fields = self.date_time.split()
            date = "%s-%02d-%s %s" % (fields[1],
                                      self._MONTH_NUM[fields[0]],
                                      fields[3],
                                      fields[2])
            date = datetime.strptime(date, "%d-%m-%Y %H:%M:%S")
            # NOTE(review): the seconds are written as a literal "01"
            # instead of %S - looks deliberate, confirm before changing.
            self.date_time = date.strftime("%m/%d/%Y %H:%M:01")

        self.type = self.perms[0] if self.perms else None

        if self.type == "l" and " -> " in self.name:
            self._correct_link()

        self.filepath = os.path.join(self.dirname, self.name)

    def mk_link_relative(self, target_type):
        """Convert link target to a path relative to dirname.

        target_type -- type of the link target; currently not used
        """
        self.link_target = os.path.relpath(self.link_target, self.dirname)

    def _fullname(self):
        """Return dirname joined with name, plus " -> target" for links."""
        fullname = os.path.join(self.dirname, self.name)
        if self.link_target:
            fullname += " -> " + self.link_target
        return fullname

    def __repr__(self):
        """Represent the file/entire node."""
        return "<File {type} {name} {id}>".format(type=self.type,
                                                  name=self._fullname(),
                                                  id=hex(id(self)))

    def __str__(self):
        """Display the file/entire node as one listing line.

        Returns an empty string for an entry without a name.
        """
        template = ("{perms} {links:>4} {uid:<8} {gid:<8} {size:>8} "
                    "{date_time} {fullname}\n")

        if not self.name:
            return ""

        return template.format(perms=self.perms,
                               links=self.links,
                               uid=self.uid,
                               gid=self.gid,
                               size=self.size,
                               date_time=self.date_time,
                               fullname=self._fullname())
|
|
|
|
|
|
class Adb(object):
    """Class for interact with android rooted device through adb.

    Every public method implements one extfs command and returns the exit
    code for the script: 0 on success, non-zero on failure.
    """
    # Entry names left out of the top level listing when SKIP_SYSTEM_DIR is
    # enabled.
    dirs_to_skip = ["acct", "charger", "d", "dev", "proc", "sys"]
    # One line of the long listing:
    # perms, links, uid, gid, size, weekday, "Mon D HH:MM:SS YYYY", name
    file_re = re.compile(r'^(?P<perms>[-bcdlps][-rwxsStT]{9})\s+'
                         r'(?P<links>\d+)\s'
                         r'(?P<uid>\d+)\s+'
                         r'(?P<gid>\d+)\s+'
                         r'(?P<size>\d+)\s[A-Za-z]{3}\s'
                         r'(?P<date_time>[A-Za-z]{3}\s+'
                         r'\d+\s\d{2}:\d{2}:\d{2}\s+\d{4})\s'
                         r'(?P<name>.*)')

    # "<directory>:" header line which precedes every directory in the
    # recursive listing.
    current_re = re.compile(r"^(\./)?(?P<dir>.+):$")
    # Read from the environment; not referenced by the methods of this class.
    as_root = os.getenv("ADBFS_AS_ROOT", False)
    verbose = os.getenv("ADBFS_VERBOSE", False)

    def __init__(self):
        """Prepare archive content for operations"""
        super(Adb, self).__init__()
        self._entries = []  # all listed entries, in listing order
        self._links = {}    # filepath -> entry, symbolic links only

    @staticmethod
    def _to_text(output):
        """Return subprocess output as native string.

        subprocess returns bytes on Python 3; they are decoded as UTF-8 with
        undecodable bytes replaced, so odd file names cannot abort a listing.
        On Python 2 the output already is a str and is returned unchanged.
        """
        if isinstance(output, str):
            return output
        return output.decode("utf-8", "replace")

    def _find_target(self, needle, index=None):
        """Find link target.

        needle -- absolute path to resolve
        index -- optional mapping filepath -> entry used instead of scanning
                 self._entries

        Follows the chain of known links starting at needle and returns the
        entry found at its end, or None if there is no such entry or the
        links form a cycle.
        """
        visited = set()
        while needle in self._links:
            if needle in visited:
                # Links pointing at each other - there is no final target.
                return None
            visited.add(needle)
            link = self._links[needle]
            needle = os.path.abspath(os.path.join(link.dirname,
                                                  link.link_target))

        if index is not None:
            return index.get(needle)

        for entry in self._entries:
            if entry.filepath == needle:
                return entry
        return None

    def _normalize_links(self):
        """
        There might be a case of a chain of linked files, like:

        /foo -> /mnt/foo
        /bar -> /foo

        If one want to follow such 'bar' link - MC in extfs mode will fail to
        figure out the right target. This helper will correct the thing.

        Every link gets pointed directly at its final target, using a
        relative path. Links without a listed target are removed from the
        entries.
        """
        # The first entry wins for a given path, the same way a linear scan
        # of the entries would behave.
        by_path = {}
        for entry in self._entries:
            by_path.setdefault(entry.filepath, entry)

        dangling = set()
        for entry in self._links.values():
            target_entry = self._find_target(entry.link_target, by_path)
            if target_entry is not None:
                entry.link_target = target_entry.filepath
                entry.mk_link_relative(target_entry.type)
            else:
                dangling.add(id(entry))

        if dangling:
            self._entries[:] = [entry for entry in self._entries
                                if id(entry) not in dangling]

    def _retrieve_file_list(self, root=None):
        """Retrieve file list using adb.

        root -- directory entry to list recursively; None lists the top
                level and descends into every directory found there

        Appends the entries to self._entries and registers symbolic links in
        self._links. Returns 0 on success, 1 if adb could not be run.
        """
        # NOTE(review): root.filepath is put into the remote command line
        # unquoted - paths with whitespace or shell metacharacters will
        # presumably not be listed correctly.
        command = ["adb", "shell", "su", "-c"]
        if not root:
            command.append("'busybox ls -anel'")
        else:
            command.append("'busybox ls -Ranel {}'".format(root.filepath))

        try:
            lines = self._to_text(subprocess.check_output(command))
        except (subprocess.CalledProcessError, OSError):
            # OSError: adb executable missing or not runnable.
            sys.stderr.write("Cannot read directory. Is device connected?\n")
            return 1

        current_dir = root.dirname if root else "/"
        for line in lines.split("\n"):
            line = line.strip()

            # Entry lines are recognised first: a file whose name ends with
            # a colon would otherwise be taken for a directory header.
            reg_match = self.file_re.match(line)
            if not reg_match:
                current_dir_re = self.current_re.match(line)
                if current_dir_re:
                    current_dir = current_dir_re.group("dir") or "/"
                continue

            entry = File(**reg_match.groupdict())
            if entry.name in (".", ".."):
                continue

            # Only the top level is filtered; the same name deeper in the
            # tree is an ordinary entry.
            if (SKIP_SYSTEM_DIR and root is None and
                    entry.name in Adb.dirs_to_skip):
                continue

            entry.update(current_dir)

            self._entries.append(entry)
            if root is None and entry.type == "d":
                # Best effort: a directory which cannot be read does not
                # abort the whole listing.
                self._retrieve_file_list(entry)

            if entry.type == "l":
                self._links[entry.filepath] = entry

        return 0

    def _run_on_device(self, args):
        """Run command on the device through "adb shell".

        args -- list with the command and its arguments

        The command is expected to be silent; anything it prints is treated
        as an error message and copied to stderr. Returns 0 on success,
        1 otherwise.
        """
        try:
            err = self._to_text(subprocess.check_output(["adb", "shell"] +
                                                        args))
        except (subprocess.CalledProcessError, OSError) as exc:
            sys.stderr.write("%s\n" % exc)
            return 1

        if err != "":
            sys.stderr.write(err)
            return 1
        return 0

    def run(self, fname):
        """Not supported"""
        sys.stderr.write("Not supported - or maybe you are on compatible "
                         "architecture?\n")
        return 1

    def list(self):
        """Output list contents directory.

        Returns 1, without writing any listing, when the file list could
        not be retrieved from the device.
        """
        if self._retrieve_file_list():
            return 1
        self._normalize_links()
        sys.stdout.write("".join([str(entry) for entry in self._entries]))
        return 0

    def copyout(self, src, dst):
        """Copy file form the device using adb.

        Returns the exit code of "adb pull".
        """
        with open(os.devnull, "w") as fnull:
            return subprocess.call(["adb", "pull", src, dst],
                                   stdout=fnull, stderr=fnull)

    def copyin(self, src, dst):
        """Copy file to the device through adb.

        dst is made absolute by prepending "/" when needed.
        """
        if not dst.startswith("/"):
            dst = "/" + dst

        with open(os.devnull, "w") as fnull:
            err = subprocess.call(["adb", "push", src, dst],
                                  stdout=fnull, stderr=fnull)

        if err != 0:
            sys.stderr.write("Cannot push the file, "
                             "%s, error %d\n" % (dst, err))
            return 1
        return 0

    def rm(self, dst):
        """Remove file from device."""
        return self._run_on_device(["rm", dst])

    def rmdir(self, dst):
        """Remove directory from device."""
        return self._run_on_device(["rm", "-r", dst])

    def mkdir(self, dst):
        """Make directory on the device through adb."""
        return self._run_on_device(["mkdir", dst])
|
|
|
|
|
|
# Dispatch table: extfs command name -> callable taking the parsed arguments
# object (see main()) and returning the exit code. A fresh Adb object is
# created for every call.
CALL_MAP = {'list': lambda a: Adb().list(),
            'copyin': lambda a: Adb().copyin(a.src, a.dst),
            'copyout': lambda a: Adb().copyout(a.src, a.dst),
            'mkdir': lambda a: Adb().mkdir(a.dst),
            'rmdir': lambda a: Adb().rmdir(a.dst),
            'rm': lambda a: Adb().rm(a.dst),
            'run': lambda a: Adb().run(a.dst)}
|
|
|
|
def main():
    """Parse commandline and run the requested command.

    Expected arguments: sys.argv[1] is the command (one of the CALL_MAP
    keys), sys.argv[2] is stored as "arch", the remaining ones depend on the
    command:

        copyin  - sys.argv[3] is the destination, sys.argv[4] the source
        copyout - sys.argv[3] is the source, sys.argv[4] the destination
        rm, rmdir, run, mkdir - sys.argv[3] is the destination

    Exits with status 2 on an unknown command or missing arguments,
    otherwise returns the exit code of the command.
    """
    if DEBUG:
        sys.stderr.write("commandline: %s\n" % " ".join(sys.argv))

    # CALL_MAP is the single source of truth for the supported commands.
    if len(sys.argv) < 2 or sys.argv[1] not in CALL_MAP:
        sys.exit(2)

    class Arg(object):
        """Mimic argparse/optparse object"""
        dst = None
        src = None
        arch = None

    command = sys.argv[1]
    arg = Arg()

    try:
        arg.arch = sys.argv[2]
        if command == 'copyin':
            arg.src = sys.argv[4]
            arg.dst = sys.argv[3]
        elif command == 'copyout':
            arg.src = sys.argv[3]
            arg.dst = sys.argv[4]
        elif command in ('rm', 'rmdir', 'run', 'mkdir'):
            arg.dst = sys.argv[3]
    except IndexError:
        sys.exit(2)

    return CALL_MAP[command](arg)
|
|
|
|
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
|