|
|
|
@ -44,6 +44,80 @@ def remote_path_join(path,*paths): |
|
|
|
ret += path |
|
|
|
return ret |
|
|
|
|
|
|
|
class drive_file: |
|
|
|
def __init__(self, |
|
|
|
api, |
|
|
|
info, |
|
|
|
parent |
|
|
|
): |
|
|
|
self.id = info.pop('id') |
|
|
|
self.name = info.pop('name') |
|
|
|
self.api = api |
|
|
|
self.info = info |
|
|
|
self.parent = parent |
|
|
|
self._fullpath = None |
|
|
|
|
|
|
|
@classmethod |
|
|
|
def from_id(cls,id,api,parent): |
|
|
|
info = api.service.files().get(fileId='root').execute() |
|
|
|
return cls(api,info,parent) |
|
|
|
|
|
|
|
@property |
|
|
|
def fullpath(self): |
|
|
|
if self._fullpath is None: |
|
|
|
if self.parent is None: |
|
|
|
path = '/' |
|
|
|
else: |
|
|
|
path = pathjoin(self.parent.fullpath,self.name) |
|
|
|
self._fullpath = path |
|
|
|
return self._fullpath |
|
|
|
|
|
|
|
def __str__(self): |
|
|
|
return '{}@{}'.format(type(self).__name__,self.fullpath) |
|
|
|
|
|
|
|
def __repr__(self): |
|
|
|
return str(self) |
|
|
|
|
|
|
|
class drive_folder(drive_file): |
|
|
|
def __init__(self, |
|
|
|
api, |
|
|
|
info, |
|
|
|
parent |
|
|
|
): |
|
|
|
super().__init__(api,info,parent) |
|
|
|
self.folders = {} |
|
|
|
self.files = {} |
|
|
|
|
|
|
|
def create_folder(self,name): |
|
|
|
meta = { |
|
|
|
'name':name, |
|
|
|
'mimeType':'application/vnd.google-apps.folder', |
|
|
|
'parents' : [self.id] |
|
|
|
} |
|
|
|
info = self.api.service.files().create(body = meta).execute() |
|
|
|
self.folders[name] = drive_folder(self.api,info,self) |
|
|
|
|
|
|
|
def refresh(self): |
|
|
|
'''finds all files and folders from a parent''' |
|
|
|
parent_id = self.id |
|
|
|
child_list = self.__list_children__() |
|
|
|
for child in child_list: |
|
|
|
name = child['name'] |
|
|
|
if is_folder(child): |
|
|
|
self.folders[name] = drive_folder(self.api,child,self) |
|
|
|
else: |
|
|
|
self.files[name] = drive_file(self.api,child,self) |
|
|
|
|
|
|
|
def __list_children__(self): |
|
|
|
page_token = None |
|
|
|
first = True |
|
|
|
while page_token or first: |
|
|
|
first = False |
|
|
|
response = self.api.service.files().list(q = "'{}' in parents and trashed = false".format(self.id),pageToken=page_token).execute() |
|
|
|
page_token = response.get('nextPageToken',None) |
|
|
|
for file in response['files']: |
|
|
|
yield file |
|
|
|
|
|
|
|
class drive_api(API): |
|
|
|
|
|
|
|
def __init__( |
|
|
|
@ -55,43 +129,35 @@ class drive_api(API): |
|
|
|
version = 'v3', |
|
|
|
): |
|
|
|
super().__init__('drive',scopes,app_name,client_secret_file,credentials_dir,version) |
|
|
|
root = {} |
|
|
|
self.filesystem = {'folders':{'/':root}} |
|
|
|
root_meta = self.service.files().get(fileId='root').execute() |
|
|
|
del root_meta['kind'] |
|
|
|
root[0] = root_meta |
|
|
|
root['parent'] = self.filesystem |
|
|
|
self.__fill_in__(root) |
|
|
|
|
|
|
|
def get_file_by_path(self,path,ret_file = True,no_parent = True): |
|
|
|
self.root = drive_folder.from_id('root',self,None) |
|
|
|
|
|
|
|
def get_file_by_path(self,path,ret_file = True): |
|
|
|
'''gets a file or folder by remote path''' |
|
|
|
if isinstance(path,str): |
|
|
|
path = split_path(path) |
|
|
|
else: |
|
|
|
path = path.copy() |
|
|
|
parent = self.filesystem |
|
|
|
parent = self.root |
|
|
|
end = path.pop() |
|
|
|
if end == '/': |
|
|
|
return self.root |
|
|
|
for sub in path: |
|
|
|
try: |
|
|
|
if not 'folders' in parent.keys(): |
|
|
|
self.__fill_in__(parent) |
|
|
|
parent = parent['folders'][sub] |
|
|
|
except KeyError: |
|
|
|
raise FileNotFoundError |
|
|
|
if sub != '/': |
|
|
|
try: |
|
|
|
if not parent.folders: |
|
|
|
parent.refresh() |
|
|
|
parent = parent.folders[sub] |
|
|
|
except KeyError: |
|
|
|
raise FileNotFoundError |
|
|
|
try: |
|
|
|
if ret_file: |
|
|
|
if not 'files' in parent.keys(): |
|
|
|
self.__fill_in__(parent) |
|
|
|
ret = parent['files'][end] |
|
|
|
if no_parent: |
|
|
|
ret = without(ret,'parent') |
|
|
|
|
|
|
|
if not parent.files: |
|
|
|
parent.refresh() |
|
|
|
ret = parent.files[end] |
|
|
|
else: |
|
|
|
if not 'folders' in parent.keys(): |
|
|
|
self.__fill_in__(parent) |
|
|
|
ret = parent['folders'][end] |
|
|
|
if no_parent: |
|
|
|
ret[0] = without(ret[0],'parent') |
|
|
|
if not parent.folders: |
|
|
|
parent.refresh() |
|
|
|
ret = parent.folders[end] |
|
|
|
return ret |
|
|
|
except KeyError: |
|
|
|
raise FileNotFoundError |
|
|
|
@ -103,25 +169,19 @@ class drive_api(API): |
|
|
|
else: |
|
|
|
path = path.copy() |
|
|
|
missing = [] |
|
|
|
for i in range(len(path),-1,-1): |
|
|
|
for i in range(len(path),0,-1): |
|
|
|
try: |
|
|
|
parent = self.get_file_by_path(path[:i],False,no_parent = False) |
|
|
|
parent = self.get_file_by_path(path[:i],False) |
|
|
|
break |
|
|
|
except FileNotFoundError: |
|
|
|
missing.append(path[i-1]) |
|
|
|
while len(missing) > 0: |
|
|
|
|
|
|
|
new_folder = {'folders':{},'files':{}} |
|
|
|
new_folder['parent'] = parent |
|
|
|
name = missing.pop() |
|
|
|
new_meta = self.__create_remote_folder__(name,parent) |
|
|
|
del new_meta['name'] |
|
|
|
del new_meta['kind'] |
|
|
|
new_folder[0] = new_meta |
|
|
|
parent['folders'][name] = new_folder |
|
|
|
parent = new_folder |
|
|
|
parent.create_folder(name) |
|
|
|
parent = parent.folders[name] |
|
|
|
return parent |
|
|
|
def upload(self,local_path,remote_path): |
|
|
|
|
|
|
|
def upload(self,local_path,remote_path,verbose = False): |
|
|
|
if not isinstance(remote_path,str): |
|
|
|
remote_path = pathjoin(*remote_path) |
|
|
|
if os.path.isdir(local_path): |
|
|
|
@ -129,8 +189,9 @@ class drive_api(API): |
|
|
|
for file in files: |
|
|
|
new_path = path_translate(local_path,remote_path,root,file) |
|
|
|
self.__upload_file__(os.path.join(root,file),new_path) |
|
|
|
print(new_path) |
|
|
|
return self.get_file_by_path(remote_path,False,False) |
|
|
|
if verbose: |
|
|
|
print(new_path) |
|
|
|
return self.get_file_by_path(remote_path,False) |
|
|
|
else: |
|
|
|
return self.__upload_file__(local_path,remote_path) |
|
|
|
|
|
|
|
@ -144,14 +205,14 @@ class drive_api(API): |
|
|
|
|
|
|
|
try: |
|
|
|
old_file = self.get_file_by_path(remote_path) |
|
|
|
self.service.files().update(fileId=old_file['id'],media_body=_upload).execute() |
|
|
|
self.service.files().update(fileId=old_file.id,media_body=_upload).execute() |
|
|
|
return old_file |
|
|
|
|
|
|
|
except FileNotFoundError: |
|
|
|
path = remote_path |
|
|
|
end = path.pop() |
|
|
|
parent = self.mkdirs(path) |
|
|
|
parent_id = parent[0]['id'] |
|
|
|
parent_id = parent.id |
|
|
|
meta = { |
|
|
|
'name':end, |
|
|
|
'parents':[parent_id] |
|
|
|
@ -161,43 +222,9 @@ class drive_api(API): |
|
|
|
media_body = _upload |
|
|
|
).execute() |
|
|
|
new_f_meta = without(new_f_meta,'kind') |
|
|
|
new_f_meta['parent'] = parent |
|
|
|
parent['files'][end] = new_f_meta |
|
|
|
return new_f_meta |
|
|
|
|
|
|
|
def __create_remote_folder__(self,name,parent): |
|
|
|
meta = { |
|
|
|
'name':name, |
|
|
|
'mimeType':'application/vnd.google-apps.folder', |
|
|
|
'parents' : [parent[0]['id']] |
|
|
|
} |
|
|
|
return self.service.files().create(body = meta).execute() |
|
|
|
|
|
|
|
def __fill_in__(self,parent): |
|
|
|
'''finds all files and folders from a parent''' |
|
|
|
parent_id = parent[0]['id'] |
|
|
|
child_list = self.__list_children__(parent_id) |
|
|
|
parent['files'] = {} |
|
|
|
parent['folders'] = {} |
|
|
|
for child in child_list: |
|
|
|
del child['kind'] |
|
|
|
name = child.pop('name') |
|
|
|
child['parent'] = parent |
|
|
|
if is_folder(child): |
|
|
|
parent['folders'][name] = {} |
|
|
|
parent['folders'][name][0] = child |
|
|
|
else: |
|
|
|
parent['files'][name] = child |
|
|
|
|
|
|
|
def __list_children__(self,parent_id): |
|
|
|
page_token = None |
|
|
|
first = True |
|
|
|
while page_token or first: |
|
|
|
first = False |
|
|
|
response = self.service.files().list(q = "'{}' in parents and trashed = false".format(parent_id),pageToken=page_token).execute() |
|
|
|
page_token = response.get('nextPageToken',None) |
|
|
|
for file in response['files']: |
|
|
|
yield file |
|
|
|
new_f = drive_file(self,new_f_meta,parent) |
|
|
|
parent.files[end] = new_f |
|
|
|
return new_f |
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
my_api = drive_api(APPLICATION_NAME,r'..\test\drive\client_secret.json',r'..\test\drive') |
|
|
|
|