from magic import from_file from gapi.api import API from posixpath import join as pathjoin from googleapiclient.http import MediaFileUpload import re import os APPLICATION_NAME = "Google Calendar API Python" def path_translate( root, want, path, file ): l = len(root)+1 rel = path[l:] new_rel = rel.replace(os.sep,'/') return pathjoin(want,new_rel,file) def is_folder(element): return element['mimeType'] == 'application/vnd.google-apps.folder' def split_path(path,rev=False): s = re.findall(r'(?:^/|[^/\x00]+)',path) if rev: return s[::-1] else: return s def without(node,*keys): node_meta = set(node.keys()) nope = set(keys) ret = {} for key in node_meta-nope: ret[key] = node[key] return ret def remote_path_join(path,*paths): ret = path for path in paths: if not ret.endswith('/'): ret += '/' else: ret += path return ret class drive_file: def __init__(self, api, info, parent ): self.id = info.pop('id') self.name = info.pop('name') self.api = api self.info = info self.parent = parent self._fullpath = None @classmethod def from_id(cls,id,api,parent): info = api.service.files().get(fileId='root').execute() return cls(api,info,parent) @property def fullpath(self): if self._fullpath is None: if self.parent is None: path = '/' else: path = pathjoin(self.parent.fullpath,self.name) self._fullpath = path return self._fullpath def __str__(self): return '{}@{}'.format(type(self).__name__,self.fullpath) def __repr__(self): return str(self) class drive_folder(drive_file): def __init__(self, api, info, parent ): super().__init__(api,info,parent) self.folders = {} self.files = {} def create_folder(self,name): meta = { 'name':name, 'mimeType':'application/vnd.google-apps.folder', 'parents' : [self.id] } info = self.api.service.files().create(body = meta).execute() self.folders[name] = drive_folder(self.api,info,self) def refresh(self): '''finds all files and folders from a parent''' parent_id = self.id child_list = self.__list_children__() for child in child_list: name = child['name'] if is_folder(child): self.folders[name] = drive_folder(self.api,child,self) else: self.files[name] = drive_file(self.api,child,self) def __list_children__(self): page_token = None first = True while page_token or first: first = False response = self.api.service.files().list(q = "'{}' in parents and trashed = false".format(self.id),pageToken=page_token).execute() page_token = response.get('nextPageToken',None) for file in response['files']: yield file # def create_file(self, class drive_api(API): def __init__( self, app_name, client_secret_file, credentials_dir, scopes = 'https://www.googleapis.com/auth/drive', version = 'v3', ): super().__init__('drive',scopes,app_name,client_secret_file,credentials_dir,version) self.root = drive_folder.from_id('root',self,None) def get_file_by_path(self,path,ret_file = True): '''gets a file or folder by remote path''' if isinstance(path,str): path = split_path(path) else: path = path.copy() parent = self.root end = path.pop() if end == '/': return self.root for sub in path: if sub == '/': pass else: try: if not parent.folders: parent.refresh() parent = parent.folders[sub] except KeyError: raise FileNotFoundError try: if ret_file: if not parent.files: parent.refresh() ret = parent.files[end] else: if not parent.folders: parent.refresh() ret = parent.folders[end] return ret except KeyError: raise FileNotFoundError def mkdirs(self,path): '''makes directories if they do not exist already''' if isinstance(path,str): path = split_path(path) else: path = path.copy() missing = [] for i in range(len(path),0,-1): try: parent = self.get_file_by_path(path[:i],False) break except FileNotFoundError: missing.append(path[i-1]) while len(missing) > 0: name = missing.pop() parent.create_folder(name) parent = parent.folders[name] return parent def upload(self,local_path,remote_path,verbose = False): if not isinstance(remote_path,str): remote_path = pathjoin(*remote_path) if os.path.isdir(local_path): for root,dirs,files in os.walk(local_path,topdown = False): for file in files: new_path = path_translate(local_path,remote_path,root,file) self.__upload_file__(os.path.join(root,file),new_path) if verbose: print(new_path) return self.get_file_by_path(remote_path,False) else: return self.__upload_file__(local_path,remote_path) def __upload_file__(self,local_path,remote_path): '''creates file if it does not exists otherwise update the file''' if isinstance(remote_path,str): remote_path = split_path(remote_path) else: remote_path = remote_path.copy() _upload = MediaFileUpload(local_path) try: old_file = self.get_file_by_path(remote_path) self.service.files().update(fileId=old_file.id,media_body=_upload).execute() return old_file except FileNotFoundError: path = remote_path end = path.pop() parent = self.mkdirs(path) parent_id = parent.id meta = { 'name':end, 'parents':[parent_id] } new_f_meta = self.service.files().create( body = meta, media_body = _upload ).execute() new_f_meta = without(new_f_meta,'kind') new_f = drive_file(self,new_f_meta,parent) parent.files[end] = new_f return new_f if __name__ == "__main__": my_api = drive_api(APPLICATION_NAME,r'..\test\drive\client_secret.json',r'..\test\drive') service = my_api.service