|
|
# from magic import from_filefrom gapi.api import APIfrom posixpath import join as pathjoinfrom googleapiclient.http import MediaFileUploadimport reimport os
APPLICATION_NAME = "Google Calendar API Python"
def path_translate(root, want, path, file): l = len(root) + 1 rel = path[l:] new_rel = rel.replace(os.sep, "/") return pathjoin(want, new_rel, file)
def is_folder(element): return element["mimeType"] == "application/vnd.google-apps.folder"
def split_path(path, rev=False): s = re.findall(r"(?:^/|[^/\x00]+)", path) if rev: return s[::-1] else: return s
def without(node, *keys): node_meta = set(node.keys()) nope = set(keys) ret = {} for key in node_meta - nope: ret[key] = node[key] return ret
def remote_path_join(path, *paths): ret = path for path in paths: if not ret.endswith("/"): ret += "/" else: ret += path return ret
class drive_file: def __init__(self, api, info, parent): self.id = info.pop("id") self.name = info.pop("name") self.api = api self.info = info self.parent = parent self._fullpath = None
@classmethod def from_id(cls, id, api, parent): info = api.service.files().get(fileId="root").execute() return cls(api, info, parent)
@property def fullpath(self): if self._fullpath is None: if self.parent is None: path = "/" else: path = pathjoin(self.parent.fullpath, self.name) self._fullpath = path return self._fullpath
def __str__(self): return "{}@{}".format(type(self).__name__, self.fullpath)
def __repr__(self): return str(self)
class drive_folder(drive_file): def __init__(self, api, info, parent): super().__init__(api, info, parent) self.folders = {} self.files = {}
def create_folder(self, name): meta = { "name": name, "mimeType": "application/vnd.google-apps.folder", "parents": [self.id], } info = self.api.service.files().create(body=meta).execute() self.folders[name] = drive_folder(self.api, info, self)
def refresh(self): """finds all files and folders from a parent""" parent_id = self.id child_list = self.__list_children__() for child in child_list: name = child["name"] if is_folder(child): self.folders[name] = drive_folder(self.api, child, self) else: self.files[name] = drive_file(self.api, child, self)
def __list_children__(self): page_token = None first = True while page_token or first: first = False response = ( self.api.service.files() .list( q="'{}' in parents and trashed = false".format(self.id), pageToken=page_token, ) .execute() ) page_token = response.get("nextPageToken", None) for file in response["files"]: yield file
class drive_api(API): def __init__( self, app_name, client_secret_file, credentials_dir, scopes="https://www.googleapis.com/auth/drive", version="v3", ): super().__init__( "drive", scopes, app_name, client_secret_file, credentials_dir, version ) self.root = drive_folder.from_id("root", self, None)
def get_file_by_path(self, path, ret_file=True): """gets a file or folder by remote path""" if isinstance(path, str): path = split_path(path) else: path = path.copy() parent = self.root end = path.pop() if end == "/": return self.root for sub in path: if sub != "/": try: if not parent.folders: parent.refresh() parent = parent.folders[sub] except KeyError: raise FileNotFoundError try: if ret_file: if not parent.files: parent.refresh() ret = parent.files[end] else: if not parent.folders: parent.refresh() ret = parent.folders[end] return ret except KeyError: raise FileNotFoundError
def mkdirs(self, path): """makes directories if they do not exist already""" if isinstance(path, str): path = split_path(path) else: path = path.copy() missing = [] for i in range(len(path), 0, -1): try: parent = self.get_file_by_path(path[:i], False) break except FileNotFoundError: missing.append(path[i - 1]) while len(missing) > 0: name = missing.pop() parent.create_folder(name) parent = parent.folders[name] return parent
def upload(self, local_path, remote_path, verbose=False): if not isinstance(remote_path, str): remote_path = pathjoin(*remote_path) if os.path.isdir(local_path): for root, dirs, files in os.walk(local_path, topdown=False): for file in files: new_path = path_translate(local_path, remote_path, root, file) self.__upload_file__(os.path.join(root, file), new_path) if verbose: print(new_path) return self.get_file_by_path(remote_path, False) else: return self.__upload_file__(local_path, remote_path)
def __upload_file__(self, local_path, remote_path): """creates file if it does not exists otherwise update the file""" if isinstance(remote_path, str): remote_path = split_path(remote_path) else: remote_path = remote_path.copy() _upload = MediaFileUpload(local_path)
try: old_file = self.get_file_by_path(remote_path) self.service.files().update( fileId=old_file.id, media_body=_upload ).execute() return old_file
except FileNotFoundError: path = remote_path end = path.pop() parent = self.mkdirs(path) parent_id = parent.id meta = {"name": end, "parents": [parent_id]} new_f_meta = ( self.service.files().create(body=meta, media_body=_upload).execute() ) new_f_meta = without(new_f_meta, "kind") new_f = drive_file(self, new_f_meta, parent) parent.files[end] = new_f return new_f
if __name__ == "__main__": my_api = drive_api( APPLICATION_NAME, r"..\test\drive\client_secret.json", r"..\test\drive" ) service = my_api.service
|