|
|
import datetime
import json
import os
import re
import shlex
import shutil
import subprocess
import time

from android_db import AndroidSQLConn
from load_things import loaded
from decode_parcel import decode_parcel

# Gates the manual smoke run at the bottom of the module.
debug = True

# Settings and key-code table supplied by the project's loader.
config = loaded.config
keycodes = loaded.keycodes

# Executable that _adb() launches for every command.
exe = config.defaults.exe
def merge(src, dst, log=False):
    '''Move every file below *src* into the same relative place below *dst*.

    Files that already exist in *dst* are never overwritten; they stay in
    *src*.  Directories of *src* that end up empty are removed, including
    *src* itself when nothing was left behind.

    Args:
        src: directory tree to take files from.
        dst: existing directory to merge into.
        log: when true, print a line for each file skipped as a duplicate.

    Returns:
        False if *dst* does not exist or at least one file was skipped
        because it already existed in *dst*; True otherwise.
    '''
    if not os.path.exists(dst):
        return False

    ok = True
    for path, _dirs, files in os.walk(src):
        rel_path = os.path.relpath(path, src)
        dest_path = os.path.join(dst, rel_path)
        if not os.path.exists(dest_path):
            os.makedirs(dest_path)
        for name in files:
            dest_file = os.path.join(dest_path, name)
            if os.path.isfile(dest_file):
                if log:
                    print("Skipping existing file: " + os.path.join(rel_path, name))
                ok = False
                continue
            shutil.move(os.path.join(path, name), dest_file)

    # Bottom-up so children are removed before their parents are examined.
    # os.walk collects each directory's entries *before* descending, so its
    # dirs/files lists are stale here; look at the directory's live contents.
    for path, _dirs, _files in os.walk(src, topdown=False):
        if not os.listdir(path):
            os.rmdir(path)
    return ok
def _adb(*args, output="shell"):
    '''Run the configured executable with *args* as its arguments.

    Output modes:
        "out": return output (decoded, CRLF turned into LF, trailing
            whitespace stripped)
        "shell": print to shell; raises subprocess.CalledProcessError on a
            non-zero exit status and returns None
        "buffered": read line by line (returns the child's stdout pipe)

    Any other mode does nothing and returns None.
    '''
    command = [exe, *args]

    if output == "shell":
        # check_call raises CalledProcessError(returncode, command) itself;
        # its 0 return value is deliberately not passed on.
        subprocess.check_call(command, shell=False)
        return None

    if output == "out":
        raw = subprocess.check_output(command, shell=False)
        text = raw.decode()
        return text.replace('\r\n', '\n').rstrip()

    if output == "buffered":
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        return child.stdout

    return None
def kill_server():
    '''Issue the "kill-server" command through _adb.'''
    _adb('kill-server', output="shell")
def start_server():
    '''Issue the "start-server" command through _adb.'''
    _adb('start-server', output="shell")
def get_info():
    '''Return what "devices -l" reports, keyed by device serial.

    Each value is a dict holding "serial" (first column), "mode" (second
    column) and one entry per remaining "key:value" column.

    Returns:
        dict mapping serial -> property dict; empty when nothing is listed.
    '''
    start_server()
    listing = _adb("devices", "-l", output="out")

    # The first non-empty line is dropped (header line of the listing).
    rows = [line for line in listing.split("\n") if line][1:]

    devices = {}
    for row in rows:
        columns = row.split()
        if len(columns) < 2:
            # Not a "<serial> <mode> ..." row; nothing usable to record.
            continue
        entry = {"serial": columns[0], "mode": columns[1]}
        for column in columns[2:]:
            # Split on the first colon only and ignore columns without one,
            # so free-text states such as "no permissions (...)" or values
            # that themselves contain ':' no longer raise ValueError.
            key, sep, value = column.partition(":")
            if sep:
                entry[key] = value
        devices[columns[0]] = entry
    return devices
class ADBWrapper:
    '''Base handle for one device: connection, adb/shell calls and reboot.

    Instances carry ``serial``, ``mode`` and every other property that
    get_info() reports for the device, plus ``tcip`` (True when the device
    was attached through connect()).
    '''

    # Shared by all instances; toggled by root()/unroot().  When True,
    # sudo() runs commands directly instead of going through "su".
    root_mode = False

    @staticmethod
    def connect(ip, port=5555):
        '''Attach to a device over TCP/IP and return a Device for it.

        Args:
            ip: dotted-quad IPv4 address as a string.
            port: TCP port, 0-65535.

        Raises:
            TypeError: if *ip* is not a valid dotted quad or *port* is out
                of range.
        '''
        # fullmatch: trailing junk such as "1.2.3.4abc" must be rejected
        # here rather than blowing up in int() below.
        if not re.fullmatch(r'(\d{1,3}\.){3}\d{1,3}', ip):
            raise TypeError("Invalid ip")
        if not all(0 <= int(part) <= 255 for part in ip.split('.')):
            raise TypeError("Invalid ip")
        if not 0 <= port <= 2**16 - 1:
            raise TypeError("Port must be in the range 0-65535")

        address = '{}:{}'.format(ip, port)
        _adb('connect', address)
        dev = Device(address)
        dev.tcip = True
        return dev

    def disconnect(self):
        '''Detach a device that was attached with connect(); no-op otherwise.'''
        if self.tcip:
            _adb('disconnect', self.serial)

    def db_connect(self, filepath):
        '''Return an AndroidSQLConn bound to this device and *filepath*.'''
        return AndroidSQLConn(self, filepath)

    @staticmethod
    def prim_device():
        '''Block until a device is listed, then return a Device for the first one.

        Polls once per second; Device() raises IndexError while the device
        list is empty.
        '''
        while True:
            try:
                return Device()
            except IndexError:
                time.sleep(1)

    def __init__(self, serial=None):
        '''Bind to *serial*, or to the first listed device when omitted.

        Raises:
            KeyError: *serial* was given but is not currently listed.
            IndexError: no serial given and no device is listed.
        '''
        self.tcip = False
        if serial:
            self.serial = serial
            info = get_info()[serial]
        else:
            serial, info = list(get_info().items())[0]
        # info includes "serial" and "mode", so both become attributes.
        self.__dict__.update(info)

    def adb(self, *args, output="shell"):
        '''Run an adb command addressed to this device (see _adb for modes).'''
        return _adb('-s', self.serial, *args, output=output)

    def shell(self, *args, output="shell"):
        '''Run *args* through "adb shell" on this device.'''
        return self.adb('shell', *args, output=output)

    def sudo(self, *args, output="shell"):
        '''Run *args* on the device with elevated rights.

        In recovery mode or when root_mode is set the command is run as-is;
        otherwise it is passed to "su".
        '''
        if self.mode == 'recovery' or self.root_mode:
            return self.shell(*args, output=output)
        return self.shell('su', '--', '--', *args, output=output)

    @classmethod
    def root(cls):
        '''Issue "adb root" and make sudo() stop using "su".'''
        cls.root_mode = True
        _adb('root')

    @classmethod
    def unroot(cls):
        '''Issue "adb unroot" and make sudo() use "su" again.'''
        cls.root_mode = False
        _adb('unroot')

    def reboot(self, mode=None):
        '''Reboot the device and refresh this object's properties.

        Args:
            mode: None for a plain reboot, "soft" to kill the zygote process
                instead (falls back to a plain reboot in recovery), or any
                other string, which is passed to "adb reboot" as the target.

        After a real reboot this blocks, polling once per second, until the
        device is listed again.
        '''
        if mode == "soft":
            if self.mode == 'recovery':
                return self.reboot()
            # presumably killing zygote restarts the Android runtime without
            # a full reboot - TODO confirm on target devices
            pid = self.shell("pidof", "zygote", output="out")
            return self.sudo("kill", pid, output="shell")

        if mode:
            self.adb("reboot", mode)
        else:
            self.adb("reboot")

        while True:
            infos = get_info()
            # Wait for *this* device; another attached device being listed
            # first must not end the wait with a KeyError.
            if self.serial in infos:
                self.__dict__.update(infos[self.serial])
                break
            time.sleep(1)
class FSActionWrapper(ADBWrapper):
    '''File-system operations on the device: stat, delete, pull, push.'''

    # %F strings accepted as "a regular file".  'file' is what the original
    # code compared against; the other two are what GNU/toybox stat print -
    # NOTE(review): confirm against the stat binary on the target devices.
    _REGULAR_FILE_TYPES = ('file', 'regular file', 'regular empty file')

    def stat(self, file):
        '''Return [access flags, file type, user name, group name] of *file*.

        Runs ``stat -c "%A;%F;%U;%G"`` on the device with elevated rights and
        returns None when stat exits with a non-zero status (for example
        because the path does not exist).

        The valid format escape sequences for files:
        %a Access bits (octal)   |%A Access bits (flags)|%b Size/512
        %B Bytes per %b (512)    |%d Device ID (dec)    |%D Device ID (hex)
        %f All mode bits (hex)   |%F File type          |%g Group ID
        %G Group name            |%h Hard links         |%i Inode
        %m Mount point           |%n Filename           |%N Long filename
        %o I/O block size        |%s Size (bytes)       |%t Devtype major (hex)
        %T Devtype minor (hex)   |%u User ID            |%U User name
        %x Access time           |%X Access unix time   |%y File write time
        %Y File write unix time  |%z Dir change time    |%Z Dir change unix time

        The valid format escape sequences for filesystems:
        %a Available blocks      |%b Total blocks       |%c Total inodes
        %d Free inodes           |%f Free blocks        |%i File system ID
        %l Max filename length   |%n File name          |%s Fragment size
        %S Best transfer size    |%t FS type (hex)      |%T FS type (driver name)
        '''
        # Quote the path so names with spaces or shell metacharacters reach
        # stat as a single argument.
        command = 'stat -c "%A;%F;%U;%G" {};echo $?'.format(shlex.quote(file))
        res = self.sudo(command, output="out")
        # The exit status is always the last line.  On failure stdout may
        # hold only that line, or an error message before it, so never
        # assume exactly two lines.
        output, _, status = res.rpartition('\n')
        if status == '0':
            return output.split(';')
        return None

    def _file_type(self, file):
        '''Return the %F field for *file*, or None when stat fails.'''
        info = self.stat(file)
        return None if info is None else info[1]

    def exists(self, file):
        '''Return True when stat succeeds for *file*.'''
        return self.stat(file) is not None

    def isfile(self, file):
        '''Return True when *file* exists and is a regular file.'''
        return self._file_type(file) in self._REGULAR_FILE_TYPES

    def isdir(self, file):
        '''Return True when *file* exists and is a directory.'''
        return self._file_type(file) == 'directory'

    def islink(self, file):
        '''Return True when *file* exists and is a symbolic link.'''
        return self._file_type(file) == 'symbolic link'

    def delete(self, path):
        '''Run ``rm -rf`` on *path* with elevated rights; returns its output.'''
        # NOTE(review): path is passed through unquoted, so callers can use
        # globs - but paths containing spaces are presumably split by the
        # device shell.  Verify before deleting such paths.
        return self.sudo("rm", "-rf", path, output="out")

    def copy(self, remote, local, del_duplicates=True, ignore_error=True):
        '''Pull *remote* from the device to *local*.

        When *local* already exists, the data is pulled into the configured
        temp directory and merged into *local* without overwriting files.

        Args:
            remote: path on the device.
            local: destination path on this machine.
            del_duplicates: after a merge, delete whatever stayed behind in
                the temp location (files that already existed in *local*).
            ignore_error: swallow a failing "adb pull" instead of raising.

        Returns:
            True when the pull succeeded; False when *remote* was not found
            or the pull failed and *ignore_error* is set.

        Raises:
            subprocess.CalledProcessError: pull failed and *ignore_error*
                is false.
        '''
        remote_stat = self.stat(remote)
        if remote_stat is None:
            print("File not found: {}".format(remote))
            return False

        if remote_stat[1] == "directory" and not remote.endswith('/'):
            remote += '/'

        merge_target = None
        if os.path.exists(local):
            # Stage into the temp dir so existing local files are not
            # clobbered by the pull.
            merge_target = local
            local = os.path.join(config['local']['temp'],
                                 os.path.split(local)[-1])

        pulled = True
        try:
            self.adb("pull", "-a", remote, local)
        except subprocess.CalledProcessError:
            if not ignore_error:
                raise
            pulled = False

        if merge_target is not None:
            merge(local, merge_target)
            if del_duplicates and os.path.exists(local):
                # A single pulled file is staged as a file, which rmtree
                # cannot remove.
                if os.path.isdir(local) and not os.path.islink(local):
                    shutil.rmtree(local)
                else:
                    os.remove(local)
        return pulled

    def move(self, remote, local, del_duplicates=True, ignore_error=False):
        '''Pull *remote* to *local*, then delete it from the device.

        The remote copy is only deleted when the pull succeeded, so a pull
        failure swallowed by *ignore_error* no longer loses data.
        '''
        if not self.exists(remote):
            print("File not found: {}".format(remote))
            return
        if self.copy(remote, local, del_duplicates=del_duplicates,
                     ignore_error=ignore_error):
            self.delete(remote)

    def push(self, local, remote):
        '''Push *local* from this machine to *remote* on the device.'''
        self.adb('push', local, remote)
class Input(ADBWrapper):
    '''Key and text input sent through the device's "input" command.'''

    def send_keycode(self, code):
        '''Send one key event.

        *code* is looked up in the keycodes table; when it is not a known
        name, its string form is sent unchanged.
        '''
        try:
            event = keycodes[code]
        except KeyError:
            event = str(code)
        self.shell("input", "keyevent", event)

    def unlock_phone(self, password):
        '''Wake the device if needed and type *password* on the lock screen.

        Does nothing in recovery mode.
        '''
        if self.mode == 'recovery':
            return

        # presumably "power 12" reports whether the screen is on and
        # "trust 7" whether the device is locked; service transaction codes
        # are not visible here - TODO confirm for the target Android version
        power_reply = self.shell('service', 'call', 'power', '12', output="out")
        screen_on = decode_parcel(power_reply, 'int')
        if not screen_on:
            self.send_keycode('power')

        trust_reply = self.shell('service', 'call', 'trust', '7', output="out")
        locked = decode_parcel(trust_reply, 'int')
        if locked:
            self.send_keycode('space')
            self.shell("input", "text", str(password))
            self.send_keycode('enter')
class TWRP(FSActionWrapper):
    '''Backup, wipe and install through the "twrp" command in recovery.

    Every operation reboots the device into recovery first when it is not
    already there.
    '''

    # Partition name -> option letter handed to "twrp backup".
    _BACKUP_FLAGS = {
        "system": "S",
        "data": "D",
        "cache": "C",
        "recovery": "R",
        "spec_part_1": "1",
        "spec_part_2": "2",
        "spec_part_3": "3",
        "boot": "B",
        "as": "A",
    }

    def _ensure_recovery(self):
        '''Reboot into recovery unless the device is already in that mode.'''
        if self.mode != 'recovery':
            self.reboot('recovery')

    def backup(self, *partitions, name=None, backupdir=None):
        '''Create a TWRP backup and move it from the device to this machine.

        Args:
            *partitions: keys of the option table ("system", "data", ...).
            name: backup name; defaults to "backup_" plus today's date in
                the configured date format.
            backupdir: local directory to store the backup in; defaults to
                config['local']['twrp'].

        Raises:
            KeyError: a partition name is not in the option table.
        '''
        self._ensure_recovery()
        if backupdir is None:
            backupdir = config['local']['twrp']

        options = "".join(self._BACKUP_FLAGS[part] for part in partitions)
        if not name:
            # The original read an undefined global `defaults` here and
            # raised NameError; the only defaults visible in this module
            # live under config - NOTE(review): confirm the key exists.
            date_format = config['defaults']['date_format']
            name = "backup_" + datetime.datetime.today().strftime(date_format)

        filename = os.path.join(backupdir, name)
        self.shell("twrp", "backup", options, name)
        phone_dir = "/data/media/0/TWRP/BACKUPS/{serial}/{name}".format(
            serial=self.serial, name=name)
        self.move(phone_dir, filename)

    def wipe(self, partition):
        '''Run "twrp wipe" for *partition*.'''
        self._ensure_recovery()
        self.shell("twrp", "wipe", partition)

    def install(self, name):
        '''Run "twrp install" for an update package.

        When *name* is a path that exists on this machine, the file is first
        pushed into the configured remote updates directory (unless a file
        of that name is already there).  Otherwise *name* is taken to be a
        file name inside that directory.
        '''
        self._ensure_recovery()
        updates_dir = config['remote']['updates']

        if os.path.exists(name):
            local_name = name
            name = os.path.split(name)[-1]
            if not self.exists(updates_dir):
                self.sudo('mkdir', updates_dir)
            if not self.exists('{}/{}'.format(updates_dir, name)):
                self.push(local_name, updates_dir)

        update_path = '{}/{}'.format(updates_dir, name)
        self.shell("twrp", "install", update_path)
class Device(TWRP, Input):
    '''Concrete device type combining the TWRP and Input capabilities.'''
# Manual smoke run: only when executed as a script with debugging enabled.
if __name__ == "__main__":
    if debug:
        # Blocks until a device is listed.
        d = Device.prim_device()
|