You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

231 lines
6.9 KiB

from magic import from_file
from gapi.api import API
from posixpath import join as pathjoin
from googleapiclient.http import MediaFileUpload
import re
import os
APPLICATION_NAME = "Google Calendar API Python"
def path_translate(
root,
want,
path,
file
):
l = len(root)+1
rel = path[l:]
new_rel = rel.replace(os.sep,'/')
return pathjoin(want,new_rel,file)
def is_folder(element):
return element['mimeType'] == 'application/vnd.google-apps.folder'
def split_path(path,rev=False):
s = re.findall(r'(?:^/|[^/\x00]+)',path)
if rev:
return s[::-1]
else:
return s
def without(node,*keys):
node_meta = set(node.keys())
nope = set(keys)
ret = {}
for key in node_meta-nope:
ret[key] = node[key]
return ret
def remote_path_join(path,*paths):
ret = path
for path in paths:
if not ret.endswith('/'):
ret += '/'
else:
ret += path
return ret
class drive_file:
def __init__(self,
api,
info,
parent
):
self.id = info.pop('id')
self.name = info.pop('name')
self.api = api
self.info = info
self.parent = parent
self._fullpath = None
@classmethod
def from_id(cls,id,api,parent):
info = api.service.files().get(fileId='root').execute()
return cls(api,info,parent)
@property
def fullpath(self):
if self._fullpath is None:
if self.parent is None:
path = '/'
else:
path = pathjoin(self.parent.fullpath,self.name)
self._fullpath = path
return self._fullpath
def __str__(self):
return '{}@{}'.format(type(self).__name__,self.fullpath)
def __repr__(self):
return str(self)
class drive_folder(drive_file):
def __init__(self,
api,
info,
parent
):
super().__init__(api,info,parent)
self.folders = {}
self.files = {}
def create_folder(self,name):
meta = {
'name':name,
'mimeType':'application/vnd.google-apps.folder',
'parents' : [self.id]
}
info = self.api.service.files().create(body = meta).execute()
self.folders['name'] = drive_folder(self.api,info,self)
def refresh(self):
'''finds all files and folders from a parent'''
parent_id = self.id
child_list = self.__list_children__()
for child in child_list:
name = child['name']
if is_folder(child):
self.folders[name] = drive_folder(self.api,child,self)
else:
self.files[name] = drive_file(self.api,child,self)
def __list_children__(self):
page_token = None
first = True
while page_token or first:
first = False
response = self.api.service.files().list(q = "'{}' in parents and trashed = false".format(self.id),pageToken=page_token).execute()
page_token = response.get('nextPageToken',None)
for file in response['files']:
yield file
# def create_file(self,
class drive_api(API):
def __init__(
self,
app_name,
client_secret_file,
credentials_dir,
scopes = 'https://www.googleapis.com/auth/drive',
version = 'v3',
):
super().__init__('drive',scopes,app_name,client_secret_file,credentials_dir,version)
self.root = drive_folder.from_id('root',self,None)
def get_file_by_path(self,path,ret_file = True):
if path == '/':
return self.root
'''gets a file or folder by remote path'''
if isinstance(path,str):
path = split_path(path)
else:
path = path.copy()
parent = self.root
end = path.pop()
for sub in path:
if sub == '/':
pass
else:
try:
if not parent.folders:
parent.refresh()
parent = parent.folders[sub]
except KeyError:
raise FileNotFoundError
try:
if ret_file:
if not parent.files:
parent.refresh()
ret = parent.files[end]
else:
if not parent.folders:
parent.refresh()
ret = parent.folders[end]
return ret
except KeyError:
raise FileNotFoundError
def mkdirs(self,path):
'''makes directories if they do not exist already'''
if isinstance(path,str):
path = split_path(path)
else:
path = path.copy()
missing = []
for i in range(len(path),-1,-1):
try:
parent = self.get_file_by_path(path[:i],False)
break
except FileNotFoundError:
missing.append(path[i-1])
while len(missing) > 0:
name = missing.pop()
parent = parent.folders[name]
return parent
def upload(self,local_path,remote_path):
if not isinstance(remote_path,str):
remote_path = pathjoin(*remote_path)
if os.path.isdir(local_path):
for root,dirs,files in os.walk(local_path,topdown = False):
for file in files:
new_path = path_translate(local_path,remote_path,root,file)
self.__upload_file__(os.path.join(root,file),new_path)
print(new_path)
return self.get_file_by_path(remote_path,False,False)
else:
return self.__upload_file__(local_path,remote_path)
def __upload_file__(self,local_path,remote_path):
'''creates file if it does not exists otherwise update the file'''
if isinstance(remote_path,str):
remote_path = split_path(remote_path)
else:
remote_path = remote_path.copy()
_upload = MediaFileUpload(local_path)
try:
old_file = self.get_file_by_path(remote_path)
self.service.files().update(fileId=old_file['id'],media_body=_upload).execute()
return old_file
except FileNotFoundError:
path = remote_path
end = path.pop()
parent = self.mkdirs(path)
parent_id = parent[0]['id']
meta = {
'name':end,
'parents':[parent_id]
}
new_f_meta = self.service.files().create(
body = meta,
media_body = _upload
).execute()
new_f_meta = without(new_f_meta,'kind')
new_f_meta['parent'] = parent
parent['files'][end] = new_f_meta
return new_f_meta
if __name__ == "__main__":
my_api = drive_api(APPLICATION_NAME,r'..\test\drive\client_secret.json',r'..\test\drive')
service = my_api.service