mirror of
https://github.com/gryf/mc_adbfs.git
synced 2025-12-18 12:00:19 +01:00
Added wrapper on adb shell commands
This commit is contained in:
77
adbfs
77
adbfs
@@ -16,6 +16,11 @@ import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import shlex
|
||||
try:
|
||||
from shlex import quote
|
||||
except ImportError:
|
||||
from pipes import quote
|
||||
|
||||
__version__ = 0.12
|
||||
|
||||
@@ -337,9 +342,20 @@ class Adb(object):
|
||||
|
||||
self.__su_check()
|
||||
|
||||
def _shell_cmd(self, with_root, *args):
|
||||
cmd = [self.conf.adb_command, 'shell']
|
||||
|
||||
if with_root and self._got_root:
|
||||
_args = [quote(x) for x in args]
|
||||
cmd += ['su', '-c', quote(' '.join(_args))]
|
||||
else:
|
||||
cmd += args
|
||||
|
||||
return cmd
|
||||
|
||||
def __su_check(self):
|
||||
"""Check if we are able to get elevated privileges"""
|
||||
cmd = [self.conf.adb_command] + 'shell su -c whoami'.split()
|
||||
cmd = self._shell_cmd(False, 'su -c whoami')
|
||||
try:
|
||||
with open(os.devnull, 'w') as fnull:
|
||||
result = check_output(cmd, stderr=fnull)
|
||||
@@ -389,10 +405,8 @@ class Adb(object):
|
||||
|
||||
def _retrieve_single_dir_list(self, dir_):
|
||||
"""Retrieve file list using adb"""
|
||||
lscmd = self.conf.box['rls'].format(dir_)
|
||||
if self._got_root:
|
||||
lscmd = 'su -c "{}"'.format(lscmd)
|
||||
command = [self.conf.adb_command, 'shell', lscmd]
|
||||
lscmd = self.conf.box['rls'].format(quote(dir_))
|
||||
command = self._shell_cmd(True, *shlex.split(lscmd))
|
||||
|
||||
try:
|
||||
if self.conf.debug:
|
||||
@@ -449,12 +463,9 @@ class Adb(object):
|
||||
if not root:
|
||||
lscmd = self.conf.box['ls']
|
||||
else:
|
||||
lscmd = self.conf.box['rls'].format(root.filepath)
|
||||
lscmd = self.conf.box['rls'].format(shlex.quite(root.filepath))
|
||||
|
||||
if self._got_root:
|
||||
lscmd = 'su -c "{}"'.format(lscmd)
|
||||
|
||||
command = [self.conf.adb_command, 'shell', lscmd]
|
||||
command = self._shell_cmd(True, *shlex.split(lscmd))
|
||||
|
||||
try:
|
||||
if self.conf.debug:
|
||||
@@ -463,7 +474,7 @@ class Adb(object):
|
||||
lines = check_output(command)
|
||||
except subprocess.CalledProcessError:
|
||||
sys.stderr.write('Cannot read directory. Is device connected?\n')
|
||||
return 1
|
||||
return 2
|
||||
|
||||
current_dir = root.dirname if root else '/'
|
||||
for line in lines.split('\n'):
|
||||
@@ -499,13 +510,13 @@ class Adb(object):
|
||||
"""Not supported"""
|
||||
sys.stderr.write('Not supported - or maybe you are on compatible '
|
||||
'architecture?\n')
|
||||
return 1
|
||||
return 3
|
||||
|
||||
def list(self):
|
||||
"""Output list contents directory"""
|
||||
if self.error:
|
||||
sys.stderr.write(self.error)
|
||||
return 1
|
||||
return 4
|
||||
|
||||
if self.conf.root:
|
||||
self._retrieve_single_dir_list(self.conf.root)
|
||||
@@ -520,7 +531,7 @@ class Adb(object):
|
||||
"""Copy file form the device using adb."""
|
||||
if self.error:
|
||||
sys.stderr.write(self.error)
|
||||
return 1
|
||||
return 5
|
||||
|
||||
cmd = [self.conf.adb_command, 'pull', src, dst]
|
||||
if self.conf.debug:
|
||||
@@ -531,7 +542,7 @@ class Adb(object):
|
||||
err = subprocess.call(cmd, stdout=fnull, stderr=fnull)
|
||||
except subprocess.CalledProcessError:
|
||||
sys.stderr.write('Error executing adb shell')
|
||||
return 1
|
||||
return 6
|
||||
|
||||
return err
|
||||
|
||||
@@ -539,7 +550,7 @@ class Adb(object):
|
||||
"""Copy file to the device through adb."""
|
||||
if self.error:
|
||||
sys.stderr.write(self.error)
|
||||
return 1
|
||||
return 7
|
||||
if not dst.startswith('/'):
|
||||
dst = '/' + dst
|
||||
|
||||
@@ -552,69 +563,69 @@ class Adb(object):
|
||||
err = subprocess.call(cmd, stdout=fnull, stderr=fnull)
|
||||
except subprocess.CalledProcessError:
|
||||
sys.stderr.write('Error executing adb shell')
|
||||
return 1
|
||||
return 8
|
||||
|
||||
if err != 0:
|
||||
sys.stderr.write('Cannot push the file, '
|
||||
'%s, error %d' % (dst, err))
|
||||
return 1
|
||||
return 9
|
||||
return 0
|
||||
|
||||
def rm(self, dst):
|
||||
"""Remove file from device."""
|
||||
if self.error:
|
||||
sys.stderr.write(self.error)
|
||||
return 1
|
||||
return 10
|
||||
|
||||
cmd = [self.conf.adb_command, 'shell', 'rm', dst]
|
||||
cmd = self._shell_cmd(False, 'rm', dst)
|
||||
try:
|
||||
err = check_output(cmd)
|
||||
err = check_output(cmd).strip()
|
||||
except subprocess.CalledProcessError:
|
||||
sys.stderr.write('Error executing adb shell')
|
||||
return 1
|
||||
return 11
|
||||
|
||||
if err != '':
|
||||
sys.stderr.write(err)
|
||||
return 1
|
||||
return 12
|
||||
return 0
|
||||
|
||||
def rmdir(self, dst):
|
||||
"""Remove directory from device."""
|
||||
if self.error:
|
||||
sys.stderr.write(self.error)
|
||||
return 1
|
||||
return 13
|
||||
|
||||
cmd = [self.conf.adb_command, 'shell', 'rm', '-r', dst]
|
||||
cmd = self._shell_cmd(False, 'rm -r %s' % quote(dst))
|
||||
try:
|
||||
err = check_output(cmd)
|
||||
err = check_output(cmd).strip()
|
||||
except subprocess.CalledProcessError:
|
||||
sys.stderr.write('Error executing adb shell')
|
||||
return 1
|
||||
return 14
|
||||
|
||||
if err != '':
|
||||
sys.stderr.write(err)
|
||||
return 1
|
||||
return 15
|
||||
return 0
|
||||
|
||||
def mkdir(self, dst):
|
||||
"""Make directory on the device through adb."""
|
||||
if self.error:
|
||||
sys.stderr.write(self.error)
|
||||
return 1
|
||||
return 16
|
||||
|
||||
if not dst.startswith('/'):
|
||||
dst = '/' + dst
|
||||
|
||||
cmd = [self.conf.adb_command, 'shell', 'mkdir', dst]
|
||||
cmd = self._shell_cmd(False, 'mkdir %s' % quote(dst))
|
||||
try:
|
||||
err = check_output(cmd)
|
||||
err = check_output(cmd).strip()
|
||||
except subprocess.CalledProcessError:
|
||||
sys.stderr.write('Error executing adb shell')
|
||||
return 1
|
||||
return 17
|
||||
|
||||
if err != '':
|
||||
sys.stderr.write(err)
|
||||
return 1
|
||||
return 18
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user