You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

144 lines
5.1 KiB

import os
import posixpath
import shutil
from send2trash import send2trash
from pyadb.internal import config
from pyadb.internal.directory import Directory, get_unmodified
from pyadb.internal.cli_wrap import AdbWrapper
def merge(src, dst, log=False):
if not os.path.exists(dst):
return False
ok = True
for path, dirs, files in os.walk(src):
relPath = os.path.relpath(path, src)
destPath = os.path.join(dst, relPath)
if not os.path.exists(destPath):
os.makedirs(destPath)
for file in files:
destFile = os.path.join(destPath, file)
if os.path.isfile(destFile):
if log:
print("Skipping existing file: " +
os.path.join(relPath, file))
ok = False
continue
srcFile = os.path.join(path, file)
shutil.move(srcFile, destFile)
for path, dirs, files in os.walk(src, False):
if len(files) == 0 and len(dirs) == 0:
os.rmdir(path)
return ok
class FileSystem(AdbWrapper):
STAT_TYPE = 1
def stat(self, file):
"""Returns information on file"""
# %a Access bits (octal) |%A Access bits (flags)|%b Size/512
# %B Bytes per %b (512) |%d Device ID (dec) |%D Device ID (hex)
# %f All mode bits (hex) |%F File type |%g Group ID
# %G Group name |%h Hard links |%i Inode
# %m Mount point |%n Filename |%N Long filename
# %o I/O block size |%s Size (bytes) |%t Devtype major
# %T Devtype minor (hex) |%u User ID |%U User name
# %x Access time |%X Access unix time |%y File write time
# %Y File write unix time|%z Dir change time |%Z Dir change time
# The valid format escape sequences for filesystems:
# %a Available blocks |%b Total blocks |%c Total inodes
# %d Free inodes |%f Free blocks |%i File system ID
# %l Max filename length |%n File name |%s Fragment size
# %S Best transfer size |%t FS type (hex) |%T FS type
command = 'stat -c "%A;%F;%U;%G" {};echo $?'.format(file)
res = self.sudo(command, output="out")
output, res = res.split('\n')
if res == '0':
return output.split(';')
def exists(self, file):
return self.stat(file) is not None
def isfile(self, file):
return self.stat(file)[FileSystem.STAT_TYPE] == 'file'
def isdir(self, file):
return self.stat(file)[FileSystem.STAT_TYPE] == 'directory'
def islink(self, file):
return self.stat(file)[FileSystem.STAT_TYPE] == 'symbolic link'
def delete(self, path):
return self.sudo(["rm", "-rf", path], output="out")
def pull(self, remote, local, override=False):
parent = os.path.dirname(local)
if not os.path.exists(parent):
os.makedirs(parent)
if os.path.exists(local):
if override:
if os.path.isdir(local):
shutil.rmtree(local)
else:
os.remove(local)
else:
raise FileExistsError
self.execute(['pull', remote, local])
def push(self, local, remote):
self.execute(['push', remote, local])
def merge_local_with_remote(self, remote, local, dry=True, delete=False):
"""Merge local directory with directory on phone, optionally deleting""" # noqa
stdout, stderr = self.execute(
['cd', remote, ';', 'find', '.', '-type', 'f'],
output_streams=True
)
listing = stdout.read().decode().rstrip().split(config.LINEFEED)
remote_dirtree = Directory.from_dir_listing(
listing, is_listening=True)
prev_working = os.getcwd()
os.chdir(local)
for root, dirs, files in os.walk("."):
new_root = root.split(os.sep)
parent = remote_dirtree.traverse(
new_root, create_intermediate=False)
for file in files:
fp = os.path.join(root, file)
try:
parent.remove_child(file)
except FileNotFoundError:
if delete:
if dry:
print("Removing:", fp)
else:
send2trash(fp)
os.chdir(prev_working)
for file in get_unmodified(remote_dirtree):
if config.IS_WINDOWS:
local_path = os.path.join(local, file.replace('/', os.sep))
else:
local_path = os.path.join(local, file)
remote_path = posixpath.join(remote, file)
if dry:
print(remote_path, local_path, sep='->')
else:
self.pull(remote_path, local_path)
class NormalDevice(FileSystem):
"""Device model that represents when the device is booted in an os
"""
pass
class RecoveryMode(FileSystem):
"""Device model that represents wehn the device is in recovery mode
"""
pass