You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
204 lines
6.6 KiB
204 lines
6.6 KiB
from magic import from_file
|
|
from gapi.api import API
|
|
from posixpath import join as pathjoin
|
|
from googleapiclient.http import MediaFileUpload
|
|
import re
|
|
import os
|
|
|
|
APPLICATION_NAME = "Google Calendar API Python"
|
|
def path_translate(
|
|
root,
|
|
want,
|
|
path,
|
|
file
|
|
):
|
|
l = len(root)+1
|
|
rel = path[l:]
|
|
new_rel = rel.replace(os.sep,'/')
|
|
return pathjoin(want,new_rel,file)
|
|
|
|
def is_folder(element):
|
|
return element['mimeType'] == 'application/vnd.google-apps.folder'
|
|
|
|
def split_path(path,rev=False):
|
|
s = re.findall(r'(?:^/|[^/\x00]+)',path)
|
|
if rev:
|
|
return s[::-1]
|
|
else:
|
|
return s
|
|
|
|
def without(node,*keys):
|
|
node_meta = set(node.keys())
|
|
nope = set(keys)
|
|
ret = {}
|
|
for key in node_meta-nope:
|
|
ret[key] = node[key]
|
|
return ret
|
|
|
|
def remote_path_join(path,*paths):
|
|
ret = path
|
|
for path in paths:
|
|
if not ret.endswith('/'):
|
|
ret += '/'
|
|
else:
|
|
ret += path
|
|
return ret
|
|
|
|
class drive_api(API):
|
|
|
|
def __init__(
|
|
self,
|
|
app_name,
|
|
client_secret_file,
|
|
credentials_dir,
|
|
scopes = 'https://www.googleapis.com/auth/drive',
|
|
version = 'v3',
|
|
):
|
|
super().__init__('drive',scopes,app_name,client_secret_file,credentials_dir,version)
|
|
root = {}
|
|
self.filesystem = {'folders':{'/':root}}
|
|
root_meta = self.service.files().get(fileId='root').execute()
|
|
del root_meta['kind']
|
|
root[0] = root_meta
|
|
root['parent'] = self.filesystem
|
|
self.__fill_in__(root)
|
|
|
|
def get_file_by_path(self,path,ret_file = True,no_parent = True):
|
|
'''gets a file or folder by remote path'''
|
|
if isinstance(path,str):
|
|
path = split_path(path)
|
|
else:
|
|
path = path.copy()
|
|
parent = self.filesystem
|
|
end = path.pop()
|
|
for sub in path:
|
|
try:
|
|
if not 'folders' in parent.keys():
|
|
self.__fill_in__(parent)
|
|
parent = parent['folders'][sub]
|
|
except KeyError:
|
|
raise FileNotFoundError
|
|
try:
|
|
if ret_file:
|
|
if not 'files' in parent.keys():
|
|
self.__fill_in__(parent)
|
|
ret = parent['files'][end]
|
|
if no_parent:
|
|
ret = without(ret,'parent')
|
|
|
|
else:
|
|
if not 'folders' in parent.keys():
|
|
self.__fill_in__(parent)
|
|
ret = parent['folders'][end]
|
|
if no_parent:
|
|
ret[0] = without(ret[0],'parent')
|
|
return ret
|
|
except KeyError:
|
|
raise FileNotFoundError
|
|
|
|
def mkdirs(self,path):
|
|
'''makes directories if they do not exist already'''
|
|
if isinstance(path,str):
|
|
path = split_path(path)
|
|
else:
|
|
path = path.copy()
|
|
missing = []
|
|
for i in range(len(path),-1,-1):
|
|
try:
|
|
parent = self.get_file_by_path(path[:i],False,no_parent = False)
|
|
break
|
|
except FileNotFoundError:
|
|
missing.append(path[i-1])
|
|
while len(missing) > 0:
|
|
|
|
new_folder = {'folders':{},'files':{}}
|
|
new_folder['parent'] = parent
|
|
name = missing.pop()
|
|
new_meta = self.__create_remote_folder__(name,parent)
|
|
del new_meta['name']
|
|
del new_meta['kind']
|
|
new_folder[0] = new_meta
|
|
parent['folders'][name] = new_folder
|
|
parent = new_folder
|
|
return parent
|
|
def upload(self,local_path,remote_path):
|
|
if not isinstance(remote_path,str):
|
|
remote_path = pathjoin(*remote_path)
|
|
if os.path.isdir(local_path):
|
|
for root,dirs,files in os.walk(local_path,topdown = False):
|
|
for file in files:
|
|
new_path = path_translate(local_path,remote_path,root,file)
|
|
self.__upload_file__(os.path.join(root,file),new_path)
|
|
print(new_path)
|
|
return self.get_file_by_path(remote_path,False,False)
|
|
else:
|
|
return self.__upload_file__(local_path,remote_path)
|
|
|
|
def __upload_file__(self,local_path,remote_path):
|
|
'''creates file if it does not exists otherwise update the file'''
|
|
if isinstance(remote_path,str):
|
|
remote_path = split_path(remote_path)
|
|
else:
|
|
remote_path = remote_path.copy()
|
|
_upload = MediaFileUpload(local_path)
|
|
|
|
try:
|
|
old_file = self.get_file_by_path(remote_path)
|
|
self.service.files().update(fileId=old_file['id'],media_body=_upload).execute()
|
|
return old_file
|
|
|
|
except FileNotFoundError:
|
|
path = remote_path
|
|
end = path.pop()
|
|
parent = self.mkdirs(path)
|
|
parent_id = parent[0]['id']
|
|
meta = {
|
|
'name':end,
|
|
'parents':[parent_id]
|
|
}
|
|
new_f_meta = self.service.files().create(
|
|
body = meta,
|
|
media_body = _upload
|
|
).execute()
|
|
new_f_meta = without(new_f_meta,'kind')
|
|
new_f_meta['parent'] = parent
|
|
parent['files'][end] = new_f_meta
|
|
return new_f_meta
|
|
|
|
def __create_remote_folder__(self,name,parent):
|
|
meta = {
|
|
'name':name,
|
|
'mimeType':'application/vnd.google-apps.folder',
|
|
'parents' : [parent[0]['id']]
|
|
}
|
|
return self.service.files().create(body = meta).execute()
|
|
|
|
def __fill_in__(self,parent):
|
|
'''finds all files and folders from a parent'''
|
|
parent_id = parent[0]['id']
|
|
child_list = self.__list_children__(parent_id)
|
|
parent['files'] = {}
|
|
parent['folders'] = {}
|
|
for child in child_list:
|
|
del child['kind']
|
|
name = child.pop('name')
|
|
child['parent'] = parent
|
|
if is_folder(child):
|
|
parent['folders'][name] = {}
|
|
parent['folders'][name][0] = child
|
|
else:
|
|
parent['files'][name] = child
|
|
|
|
def __list_children__(self,parent_id):
|
|
page_token = None
|
|
first = True
|
|
while page_token or first:
|
|
first = False
|
|
response = self.service.files().list(q = "'{}' in parents and trashed = false".format(parent_id),pageToken=page_token).execute()
|
|
page_token = response.get('nextPageToken',None)
|
|
for file in response['files']:
|
|
yield file
|
|
|
|
if __name__ == "__main__":
|
|
my_api = drive_api(APPLICATION_NAME,r'..\test\drive\client_secret.json',r'..\test\drive')
|
|
service = my_api.service
|