Browse Source

Ran black on files and organized imports (stdlib, 3rd party, package)

refactor
Raphael Roberts 6 years ago
parent
commit
ea65158a76
  1. 135
      pyadb/adb.py
  2. 5
      pyadb/extras/open_youtube.py
  3. 24
      pyadb/extras/sync_clip.py
  4. 39
      pyadb/internal/__init__.py
  5. 35
      pyadb/internal/android_db.py
  6. 31
      pyadb/internal/cli_wrap.py
  7. 32
      pyadb/internal/config.py
  8. 23
      pyadb/internal/directory.py
  9. 15
      pyadb/internal/load_config.py
  10. 11
      pyadb/test.py
  11. 6
      setup.py

135
pyadb/adb.py

@ -9,6 +9,7 @@ import shlex
from android_db import AndroidSQLConn
from load_things import loaded
from decode_parcel import decode_parcel
debug = True
config = loaded.config
keycodes = loaded.keycodes
@ -28,8 +29,7 @@ def merge(src, dst, log=False):
destFile = os.path.join(destPath, file)
if os.path.isfile(destFile):
if log:
print("Skipping existing file: " +
os.path.join(relPath, file))
print("Skipping existing file: " + os.path.join(relPath, file))
ok = False
continue
srcFile = os.path.join(path, file)
@ -41,13 +41,18 @@ def merge(src, dst, log=False):
def _adb(*args, output="shell"):
'''Output modes:
"""Output modes:
"out": return output
"shell": print to shell
"buffered": read line by line'''
"buffered": read line by line"""
args = [exe] + list(args)
if output == "out":
return subprocess.check_output(args, shell=False).decode().replace('\r\n', '\n').rstrip()
return (
subprocess.check_output(args, shell=False)
.decode()
.replace("\r\n", "\n")
.rstrip()
)
elif output == "shell":
ret = subprocess.call(args, shell=False)
if ret:
@ -58,11 +63,11 @@ def _adb(*args, output="shell"):
def kill_server():
_adb('kill-server')
_adb("kill-server")
def start_server():
_adb('start-server')
_adb("start-server")
def get_info():
@ -72,12 +77,8 @@ def get_info():
main = {}
for device in formed:
categories = re.split(" +", device)
device_dict = {
"serial": categories[0],
"mode": categories[1]
}
device_dict.update(dict(category.split(":")
for category in categories[2:]))
device_dict = {"serial": categories[0], "mode": categories[1]}
device_dict.update(dict(category.split(":") for category in categories[2:]))
main[categories[0]] = device_dict
return main
@ -86,21 +87,21 @@ class ADBWrapper:
root_mode = False
def connect(ip, port=5555):
if not re.match(r'(\d{1,3}\.){3}\d{1,3}', ip):
if not re.match(r"(\d{1,3}\.){3}\d{1,3}", ip):
raise TypeError("Invalid ip")
if not all(int(n) <= 255 and int(n) >= 0 for n in ip.split('.')):
if not all(int(n) <= 255 and int(n) >= 0 for n in ip.split(".")):
raise TypeError("Invalid ip")
if not (port >= 0 and port <= 2**16-1):
if not (port >= 0 and port <= 2 ** 16 - 1):
raise TyperError("Port must be in the range 0-65536")
id = '{}:{}'.format(ip, port)
_adb('connect', '{}:{}'.format(ip, port))
id = "{}:{}".format(ip, port)
_adb("connect", "{}:{}".format(ip, port))
dev = Device(id)
dev.tcip = True
return dev
def disconnect(self):
if self.tcip:
_adb('disconnect', self.serial)
_adb("disconnect", self.serial)
def db_connect(self, filepath):
return AndroidSQLConn(self, filepath)
@ -125,33 +126,33 @@ class ADBWrapper:
self.__dict__.update(info)
def adb(self, *args, output="shell"):
args = ['-s', self.serial] + list(args)
args = ["-s", self.serial] + list(args)
return _adb(*args, output=output)
def shell(self, *args, output="shell"):
args = ('shell',)+args
args = ("shell",) + args
return self.adb(*args, output=output)
def sudo(self, *args, output="shell"):
if self.mode == 'recovery' or self.root_mode:
if self.mode == "recovery" or self.root_mode:
return self.shell(*args, output=output)
else:
return self.shell('su', '--', '--', *args, output=output)
return self.shell("su", "--", "--", *args, output=output)
@classmethod
def root(cls):
cls.root_mode = True
_adb('root')
_adb("root")
@classmethod
def unroot(cls):
cls.root_mode = False
_adb('unroot')
_adb("unroot")
def reboot(self, mode=None):
if mode:
if mode == "soft":
if self.mode != 'recovery':
if self.mode != "recovery":
pid = self.shell("pidof", "zygote", output="out")
return self.sudo("kill", pid, output="shell")
else:
@ -169,9 +170,8 @@ class ADBWrapper:
class FSActionWrapper(ADBWrapper):
def stat(self, file):
'''\
"""\
%a Access bits (octal) |%A Access bits (flags)|%b Size/512
%B Bytes per %b (512) |%d Device ID (dec) |%D Device ID (hex)
%f All mode bits (hex) |%F File type |%g Group ID
@ -186,24 +186,24 @@ The valid format escape sequences for filesystems:
%a Available blocks |%b Total blocks |%c Total inodes
%d Free inodes |%f Free blocks |%i File system ID
%l Max filename length |%n File name |%s Fragment size
%S Best transfer size |%t FS type (hex) |%T FS type (driver name)'''
%S Best transfer size |%t FS type (hex) |%T FS type (driver name)"""
command = 'stat -c "%A;%F;%U;%G" {};echo $?'.format(file)
res = self.sudo(command, output="out")
output, res = res.split('\n')
if res == '0':
return output.split(';')
output, res = res.split("\n")
if res == "0":
return output.split(";")
def exists(self, file):
return self.stat(file) is not None
def isfile(self, file):
return self.stat(file)[1] == 'file'
return self.stat(file)[1] == "file"
def isdir(self, file):
return self.stat(file)[1] == 'directory'
return self.stat(file)[1] == "directory"
def islink(self, file):
return self.stat(file)[1] == 'symbolic link'
return self.stat(file)[1] == "symbolic link"
def delete(self, path):
return self.sudo("rm", "-rf", path, output="out")
@ -211,13 +211,13 @@ The valid format escape sequences for filesystems:
def copy(self, remote, local, del_duplicates=True, ignore_error=True):
remote_stat = self.stat(remote)
if remote_stat is not None:
if remote_stat[1] == "directory" and not remote.endswith('/'):
remote += '/'
if remote_stat[1] == "directory" and not remote.endswith("/"):
remote += "/"
merge_flag = False
if os.path.exists(local):
last = os.path.split(local)[-1]
real_dir = local
local = os.path.join(config['local']['temp'], last)
local = os.path.join(config["local"]["temp"], last)
merge_flag = True
try:
self.adb("pull", "-a", remote, local)
@ -235,18 +235,18 @@ The valid format escape sequences for filesystems:
def move(self, remote, local, del_duplicates=True, ignore_error=False):
if self.exists(remote):
self.copy(remote, local, del_duplicates=del_duplicates,
ignore_error=ignore_error)
self.copy(
remote, local, del_duplicates=del_duplicates, ignore_error=ignore_error
)
self.delete(remote)
else:
print("File not found: {}".format(remote))
def push(self, local, remote):
self.adb('push', local, remote)
self.adb("push", local, remote)
class Input(ADBWrapper):
def send_keycode(self, code):
try:
keycode = keycodes[code]
@ -255,23 +255,26 @@ class Input(ADBWrapper):
self.shell("input", "keyevent", keycode)
def unlock_phone(self, password):
if self.mode == 'recovery':
if self.mode == "recovery":
return
if not decode_parcel(self.shell('service', 'call', 'power', '12', output="out"), 'int'):
self.send_keycode('power')
if decode_parcel(self.shell('service', 'call', 'trust', '7', output="out"), 'int'):
self.send_keycode('space')
if not decode_parcel(
self.shell("service", "call", "power", "12", output="out"), "int"
):
self.send_keycode("power")
if decode_parcel(
self.shell("service", "call", "trust", "7", output="out"), "int"
):
self.send_keycode("space")
self.shell("input", "text", str(password))
self.send_keycode('enter')
self.send_keycode("enter")
class TWRP(FSActionWrapper):
def backup(self, *partitions, name=None, backupdir=None):
if self.mode != 'recovery':
self.reboot('recovery')
if self.mode != "recovery":
self.reboot("recovery")
if backupdir is None:
backupdir = config['local']['twrp']
backupdir = config["local"]["twrp"]
else:
backupdir = backupdir
options_dict = {
@ -283,36 +286,38 @@ class TWRP(FSActionWrapper):
"spec_part_2": "2",
"spec_part_3": "3",
"boot": "B",
"as": "A"
"as": "A",
}
options = "".join(options_dict[option] for option in partitions)
if not name:
name = "backup_" + \
datetime.datetime.today().strftime(defaults['date_format'])
name = "backup_" + datetime.datetime.today().strftime(
defaults["date_format"]
)
filename = os.path.join(backupdir, name)
self.shell("twrp", "backup", options, name)
phone_dir = "/data/media/0/TWRP/BACKUPS/{serial}/{name}".format(
serial=self.serial, name=name)
serial=self.serial, name=name
)
self.move(phone_dir, filename)
def wipe(self, partition):
if self.mode != 'recovery':
self.reboot('recovery')
if self.mode != "recovery":
self.reboot("recovery")
self.shell("twrp", "wipe", partition)
def install(self, name):
if self.mode != 'recovery':
self.reboot('recovery')
if self.mode != "recovery":
self.reboot("recovery")
if os.path.exists(name):
local_name = name
name = os.path.split(name)[-1]
update_path = '{}/{}'.format(config['remote']['updates'], name)
if not self.exists(config['remote']['updates']):
self.sudo('mkdir', config['remote']['updates'])
update_path = "{}/{}".format(config["remote"]["updates"], name)
if not self.exists(config["remote"]["updates"]):
self.sudo("mkdir", config["remote"]["updates"])
if not self.exists(update_path):
self.push(local_name, config['remote']['updates'])
self.push(local_name, config["remote"]["updates"])
else:
update_path = '{}/{}'.format(config['remote']['updates'], name)
update_path = "{}/{}".format(config["remote"]["updates"], name)
self.shell("twrp", "install", update_path)

5
pyadb/extras/open_youtube.py

@ -1,9 +1,10 @@
from adb import Device
import pyperclip
from adb import Device
def open_youtube(Device, link):
Device.shell(*('am start -a android.intent.action.VIEW'.split(' ')), link)
Device.shell(*("am start -a android.intent.action.VIEW".split(" ")), link)
if __name__ == "__main__":

24
pyadb/extras/sync_clip.py

@ -1,26 +1,32 @@
import pyperclip
import time
import re
import pyperclip
def sync_clipboard(dev):
db_file = '/data/data/com.catchingnow.tinyclipboardmanager/databases/clippingnow.db'
db_file = "/data/data/com.catchingnow.tinyclipboardmanager/databases/clippingnow.db"
try:
pid = dev.shell(
'pidof', 'com.catchingnow.tinyclipboardmanager', output="out")
dev.sudo('kill', pid, output='shell')
pid = dev.shell("pidof", "com.catchingnow.tinyclipboardmanager", output="out")
dev.sudo("kill", pid, output="shell")
except:
pass
t = time.time()
text = pyperclip.paste()
conn = dev.db_connect(db_file)
out = conn.execute("INSERT INTO cliphistory VALUES (?,?,?);", [
int(1000*t), text, False])
dev.shell('am', 'start', '-n',
'com.catchingnow.tinyclipboardmanager/.activity.ActivityMain')
out = conn.execute(
"INSERT INTO cliphistory VALUES (?,?,?);", [int(1000 * t), text, False]
)
dev.shell(
"am",
"start",
"-n",
"com.catchingnow.tinyclipboardmanager/.activity.ActivityMain",
)
if __name__ == "__main__":
import adb
d = adb.Device.prim_device()
sync_clipboard(d)

39
pyadb/internal/__init__.py

@ -1,7 +1,9 @@
import os
import posixpath
import shutil
from send2trash import send2trash
from pyadb.internal import config
from pyadb.internal.directory import Directory, get_unmodified
from pyadb.internal.cli_wrap import AdbWrapper
@ -20,8 +22,7 @@ def merge(src, dst, log=False):
destFile = os.path.join(destPath, file)
if os.path.isfile(destFile):
if log:
print("Skipping existing file: " +
os.path.join(relPath, file))
print("Skipping existing file: " + os.path.join(relPath, file))
ok = False
continue
srcFile = os.path.join(path, file)
@ -53,22 +54,23 @@ class FileSystem(AdbWrapper):
# %l Max filename length |%n File name |%s Fragment size
# %S Best transfer size |%t FS type (hex) |%T FS type
command = 'stat -c "%A;%F;%U;%G" {};echo $?'.format(file)
res = self.sudo(command, output="out")
output, res = res.split('\n')
if res == '0':
return output.split(';')
stdout, stderr = self.sudo(command, True)
output, res = stdout.read().decode().rstrip().split(config.LINEFEED)
if res == "0":
return output.split(";")
def exists(self, file):
return self.stat(file) is not None
def isfile(self, file):
return self.stat(file)[FileSystem.STAT_TYPE] == 'file'
return self.stat(file)[FileSystem.STAT_TYPE] == "file"
def isdir(self, file):
return self.stat(file)[FileSystem.STAT_TYPE] == 'directory'
return self.stat(file)[FileSystem.STAT_TYPE] == "directory"
def islink(self, file):
return self.stat(file)[FileSystem.STAT_TYPE] == 'symbolic link'
return self.stat(file)[FileSystem.STAT_TYPE] == "symbolic link"
def delete(self, path):
return self.sudo(["rm", "-rf", path], output="out")
@ -86,26 +88,23 @@ class FileSystem(AdbWrapper):
else:
raise FileExistsError
self.execute(['pull', remote, local])
self.execute(["pull", remote, local])
def push(self, local, remote):
self.execute(['push', remote, local])
self.execute(["push", remote, local])
def merge_local_with_remote(self, remote, local, dry=True, delete=False):
"""Merge local directory with directory on phone, optionally deleting""" # noqa
stdout, stderr = self.execute(
['cd', remote, ';', 'find', '.', '-type', 'f'],
output_streams=True
["cd", remote, ";", "find", ".", "-type", "f"], output_streams=True
)
listing = stdout.read().decode().rstrip().split(config.LINEFEED)
remote_dirtree = Directory.from_dir_listing(
listing, is_listening=True)
remote_dirtree = Directory.from_dir_listing(listing, is_listening=True)
prev_working = os.getcwd()
os.chdir(local)
for root, dirs, files in os.walk("."):
new_root = root.split(os.sep)
parent = remote_dirtree.traverse(
new_root, create_intermediate=False)
parent = remote_dirtree.traverse(new_root, create_intermediate=False)
for file in files:
fp = os.path.join(root, file)
try:
@ -120,12 +119,12 @@ class FileSystem(AdbWrapper):
os.chdir(prev_working)
for file in get_unmodified(remote_dirtree):
if config.IS_WINDOWS:
local_path = os.path.join(local, file.replace('/', os.sep))
local_path = os.path.join(local, file.replace("/", os.sep))
else:
local_path = os.path.join(local, file)
remote_path = posixpath.join(remote, file)
if dry:
print(remote_path, local_path, sep='->')
print(remote_path, local_path, sep="->")
else:
self.pull(remote_path, local_path)
@ -134,6 +133,7 @@ class NormalDevice(FileSystem):
"""Device model that represents when the device is booted in an os
"""
pass
@ -141,4 +141,3 @@ class RecoveryMode(FileSystem):
"""Device model that represents wehn the device is in recovery mode
"""
pass

35
pyadb/internal/android_db.py

@ -1,10 +1,11 @@
import shlex
import re
import csv
import re
import shlex
import sys
csv.field_size_limit(2**31-1)
dict_params = re.compile(r':([^\ \,\.\\\(\)\=]+)')
list_params = re.compile(r'\?')
csv.field_size_limit(2 ** 31 - 1)
dict_params = re.compile(r":([^\ \,\.\\\(\)\=]+)")
list_params = re.compile(r"\?")
class AndroidSQLConn:
@ -17,22 +18,26 @@ class AndroidSQLConn:
if isinstance(param, str):
return "'{}'".format(param.replace("'", "''"))
elif isinstance(param, bool):
return '1' if param else '0'
return "1" if param else "0"
elif param is None:
return 'NULL'
return "NULL"
else:
return str(param)
def _sub_params_(SQL, params):
params = iter(params)
try:
return list_params.sub(lambda match: AndroidSQLConn._quote_param_(next(params)), SQL)
return list_params.sub(
lambda match: AndroidSQLConn._quote_param_(next(params)), SQL
)
except StopIteration:
raise TypeError("Not enough parameters")
def _sub_params_dict_(SQL, params):
try:
return dict_params.sub(lambda match: AndroidSQLConn._quote_param_(params[match.group(1)]), SQL)
return dict_params.sub(
lambda match: AndroidSQLConn._quote_param_(params[match.group(1)]), SQL
)
except KeyError:
raise TypeError("Parameter specified but not in mapping")
@ -47,12 +52,16 @@ class AndroidSQLConn:
shell = self.device.sudo
else:
shell = self.device.shell
out = shell('sqlite3', '-header', '-csv',
shlex.quote(self.filepath), SQL, output="out")
out = shell(
"sqlite3", "-header", "-csv", shlex.quote(self.filepath), SQL, output="out"
)
if out:
return csv.DictReader(out.splitlines())
if __name__ == "__main__":
print(AndroidSQLConn._sub_params_(
"SELECT * FROM table WHERE name=?,age=?;", ["boby;DROP TABLE table", 1]))
print(
AndroidSQLConn._sub_params_(
"SELECT * FROM table WHERE name=?,age=?;", ["boby;DROP TABLE table", 1]
)
)

31
pyadb/internal/cli_wrap.py

@ -1,24 +1,21 @@
import subprocess
import shlex
import subprocess
class AdbWrapper:
def __init__(self,
executable_path,
serial=None
):
def __init__(self, executable_path, serial=None):
self.executable_path = executable_path
self.serial = serial
self.selfrooted = False
self.rooted = False
def exec(self, comspec, output_streams=False):
def execute(self, comspec, output_streams=False):
"""Execute ADB command. If output streams is true, return stdout and stderr streams"""
if isinstance(comspec, str):
comspec = shlex.split(comspec)
comspec.insert(0, self.executable_path)
if self.serial:
comspec = comspec[:1] + ['-s', self.serial] + comspec[1:]
comspec = comspec[:1] + ["-s", self.serial] + comspec[1:]
if output_streams:
res = subprocess.run(comspec, capture_output=True, check=True)
@ -30,25 +27,27 @@ class AdbWrapper:
def shell(self, comspec, output_streams=False):
if isinstance(comspec, str):
comspec = shlex.split(comspec)
comspec.insert(0, 'exec-out')
return self.exec(comspec, output_streams)
comspec.insert(0, "exec-out")
return self.exececute(comspec, output_streams)
def sudo(self, comspec, output_streams=False):
if isinstance(comspec, str):
comspec = shlex.split(comspec)
if not self.rooted:
comspec = ['su', '--', '--'] + comspec
comspec = ["su", "--", "--"] + comspec
return self.shell(comspec, output_streams)
def root(self):
self.exec('root')
self.exececute("root")
self.rooted = True
def unroot(self):
self.exec('unroot')
self.exececute("unroot")
self.rooted = False
if __name__ == "__main__":
exec_path = r"C:\Program Files\platform-tools\adb.exe"
wrapper = AdbWrapper(exec_path, 'LGD415d60b8c9b')
wrapper = AdbWrapper(exec_path, "LGD415d60b8c9b")
wrapper.root()
wrapper.sudo(['ls', '/'])
wrapper.sudo(["ls", "/"])

32
pyadb/internal/config.py

@ -0,0 +1,32 @@
import configparser
import re
import os.path as osp
# https://stackoverflow.com/a/11866695
ROOT = osp.dirname(__file__)
CONFIG_PATH = osp.join(ROOT, "config.ini")
class AdbConfig(configparser.ConfigParser):
def __init__(self):
super().__init__(interpolation=configparser.ExtendedInterpolation())
def getlist(self, section, option, fallback=None):
data = self.get(section, option, fallback=fallback)
if data == fallback:
return data
return list(filter(bool, re.split(" *, *", data)))
def getpath(self, section, option, fallback=None):
data = self.get(section, option, fallback=fallback)
if data == fallback:
return data
return osp.expandvars(data)
def getpaths(self, section, option, fallback=None):
data = self.getlist(section, option, fallback)
return list(map(osp.expandvars, data))
config = AdbConfig()
config.read(CONFIG_PATH)

23
pyadb/internal/directory.py

@ -15,7 +15,8 @@ class File:
self.fullpath = self.name
else:
self.fullpath = posixpath.join(
self.parent_dir.get_fullpath(real_time), self.name)
self.parent_dir.get_fullpath(real_time), self.name
)
return self.fullpath
@ -44,7 +45,7 @@ class Directory(File):
def traverse(self, path, create_intermediate=False, notify=True):
"""Traverse filesystem returning either file or directory"""
if type(path) == str:
path = path.split('/')
path = path.split("/")
if len(path) == 1:
return self[path[0]]
@ -55,7 +56,7 @@ class Directory(File):
selected = self.parent_dir
else:
selected = self
elif _next == '.':
elif _next == ".":
selected = self
else:
try:
@ -68,8 +69,7 @@ class Directory(File):
else:
raise e
return selected.traverse(
rest, create_intermediate=create_intermediate,
notify=notify
rest, create_intermediate=create_intermediate, notify=notify
)
def add_child(self, child, notify=True):
@ -91,19 +91,12 @@ class Directory(File):
yield child
@classmethod
def from_dir_listing(
cls,
listing,
rootname="",
sep='/',
is_listening=False
):
def from_dir_listing(cls, listing, rootname="", sep="/", is_listening=False):
"""Create a Directory object with files from file listing"""
root = Directory(rootname, is_listening=is_listening)
for file in listing:
parts, file = file.split(sep)
parent = root.traverse(
parts, create_intermediate=True, notify=False)
parent = root.traverse(parts, create_intermediate=True, notify=False)
file = File(file)
parent.add_child(file, notify=False)
return root
@ -117,4 +110,4 @@ def get_unmodified(dirtree: Directory):
else:
yield child.get_fullpath()
else:
yield dirtree.get_fullpath()+'/'
yield dirtree.get_fullpath() + dirtree.sep

15
pyadb/internal/load_config.py

@ -1,15 +0,0 @@
import json
import munch
import os
import configparser
path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..\\'))
parser = configparser.ConfigParser(
interpolation=configparser.ExtendedInterpolation())
parser.read(os.path.join(path, 'config.ini'))
with open(os.path.join(path, 'keycodes.json')) as k:
keycodes = munch.munchify(json.load(k))
config = parser.__dict__['_sections']
for key, value in config['local'].items():
config['local'][key] = os.path.expandvars(value)
loaded = munch.munchify(dict(config=config, keycodes=keycodes))

11
pyadb/test.py

@ -1,8 +1,13 @@
import time
from adb import Device
d = Device.prim_device()
d.root()
conn = d.db_connect(
'/data/data/com.catchingnow.tinyclipboardmanager/databases/clippingnow.db')
out = conn.execute('INSERT INTO cliphistory (date,history,star) values (?,?,?)', [
int(time.time()*1000), "test_test", False])
"/data/data/com.catchingnow.tinyclipboardmanager/databases/clippingnow.db"
)
out = conn.execute(
"INSERT INTO cliphistory (date,history,star) values (?,?,?)",
[int(time.time() * 1000), "test_test", False],
)

6
setup.py

@ -1,11 +1,11 @@
import setuptools
import setuptools
setuptools.setup(
setuptools.setup(
name="pyadb",
version="1.0",
author="Raphael Roberts",
author_email="raphael.roberts48@gmail.com",
author_email="raphael.roberts48@gmail.com",
description="Python ADB wrapper",
packages=setuptools.find_packages(),
)
Loading…
Cancel
Save