You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
236 lines
7.0 KiB
236 lines
7.0 KiB
# from magic import from_file
|
|
from gapi.api import API
|
|
from posixpath import join as pathjoin
|
|
from googleapiclient.http import MediaFileUpload
|
|
import re
|
|
import os
|
|
|
|
# Application name handed to drive_api by the __main__ block of this module.
# NOTE(review): the value says "Calendar" although this module wraps the Drive
# API -- presumably a copy/paste leftover; confirm before changing it, since
# the name may be tied to previously stored credentials.
APPLICATION_NAME = "Google Calendar API Python"
|
|
|
|
|
|
def path_translate(root, want, path, file):
    """Map a local file below ``root`` to its remote path below ``want``.

    Args:
        root: Local directory that is being mirrored.
        want: Remote directory that corresponds to ``root``.
        path: Local directory (``root`` itself or one below it) holding the file.
        file: Name of the file inside ``path``.

    Returns:
        The "/"-separated remote path ``want/<path relative to root>/file``.
    """
    # relpath copes with a trailing separator on ``root``; plain slicing by
    # ``len(root) + 1`` would swallow the first character of the remainder.
    rel = os.path.relpath(path, root)
    if rel == os.curdir:
        # ``path`` is ``root`` itself: there is no intermediate directory.
        rel = ""
    # Remote paths always use "/", whatever the local separator is.
    remote_rel = rel.replace(os.sep, "/")
    return pathjoin(want, remote_rel, file)
|
|
|
|
|
|
def is_folder(element):
    """Return True when the metadata dict ``element`` has the Drive folder MIME type.

    Raises:
        KeyError: if ``element`` has no ``"mimeType"`` entry.
    """
    mime_type = element["mimeType"]
    return mime_type == "application/vnd.google-apps.folder"
|
|
|
|
|
|
def split_path(path, rev=False):
    """Split a "/"-separated path string into its components.

    A leading "/" is kept as its own first component; repeated or trailing
    slashes produce no empty components.

    Args:
        path: The path string to split.
        rev: When true, return the components in reverse order.

    Returns:
        A list of component strings.
    """
    parts = re.findall(r"(?:^/|[^/\x00]+)", path)
    return parts[::-1] if rev else parts
|
|
|
|
|
|
def without(node, *keys):
    """Return a shallow copy of the mapping ``node`` minus the given keys.

    Keys listed in ``keys`` that are not present in ``node`` are ignored.
    The remaining entries keep the order they have in ``node``; ``node``
    itself is not modified.

    Args:
        node: The source mapping.
        *keys: Keys to leave out of the copy.

    Returns:
        A new dict with every entry of ``node`` whose key is not in ``keys``.
    """
    excluded = set(keys)
    return {key: value for key, value in node.items() if key not in excluded}
|
|
|
|
|
|
def remote_path_join(path, *paths):
    """Join remote path components with single "/" separators.

    Args:
        path: The first component.
        *paths: Further components, appended in order.

    Returns:
        ``path`` followed by every component in ``paths``; a "/" is inserted
        before a component whenever the text built so far does not already
        end with one.
    """
    ret = path
    for part in paths:
        # Add the separator only when needed, then always append the
        # component (previously the component was dropped whenever a
        # separator had just been added).
        if not ret.endswith("/"):
            ret += "/"
        ret += part
    return ret
|
|
|
|
|
|
class drive_file:
    """A single Drive item together with its place in the cached remote tree."""

    def __init__(self, api, info, parent):
        """Build the item from a metadata dict.

        Args:
            api: API wrapper object; stored as ``self.api``.
            info: Metadata dict that must contain ``"id"`` and ``"name"``.
                The remaining entries are kept as ``self.info``.
            parent: The containing item, or ``None`` for the root.

        Raises:
            KeyError: if ``info`` lacks ``"id"`` or ``"name"``.
        """
        # Work on a copy so the caller's dict is not stripped of id/name.
        info = dict(info)
        self.id = info.pop("id")
        self.name = info.pop("name")
        self.api = api
        self.info = info
        self.parent = parent
        # Lazily computed by the ``fullpath`` property.
        self._fullpath = None

    @classmethod
    def from_id(cls, id, api, parent):
        """Fetch the metadata of the item ``id`` through ``api`` and wrap it.

        Args:
            id: Drive file id to look up (e.g. the alias ``"root"``).
            api: API wrapper whose ``service`` attribute performs the request.
            parent: The containing item, or ``None`` for the root.

        Returns:
            A new instance of ``cls`` built from the fetched metadata.
        """
        # Use the requested id; it used to be hard-coded to "root".
        info = api.service.files().get(fileId=id).execute()
        return cls(api, info, parent)

    @property
    def fullpath(self):
        """Absolute remote path of this item; "/" when it has no parent.

        The value is computed on first access and cached afterwards.
        """
        if self._fullpath is None:
            if self.parent is None:
                self._fullpath = "/"
            else:
                self._fullpath = pathjoin(self.parent.fullpath, self.name)
        return self._fullpath

    def __str__(self):
        """Return ``<class name>@<full path>``."""
        return "{}@{}".format(type(self).__name__, self.fullpath)

    def __repr__(self):
        """Same text as ``str(self)``."""
        return str(self)
|
|
|
|
|
|
class drive_folder(drive_file):
    """A Drive folder that caches its direct children by name."""

    def __init__(self, api, info, parent):
        """Initialise like ``drive_file`` and start with empty child caches."""
        super().__init__(api, info, parent)
        # name -> drive_folder / drive_file for the direct children seen so far.
        self.folders = {}
        self.files = {}

    def create_folder(self, name):
        """Create a sub-folder called ``name`` remotely and cache it.

        Args:
            name: Name of the new folder.

        Returns:
            The ``drive_folder`` for the new folder; it is also stored in
            ``self.folders[name]``.
        """
        meta = {
            "name": name,
            "mimeType": "application/vnd.google-apps.folder",
            "parents": [self.id],
        }
        info = self.api.service.files().create(body=meta).execute()
        folder = drive_folder(self.api, info, self)
        self.folders[name] = folder
        return folder

    def refresh(self):
        """Fetch the direct children of this folder into the caches.

        Every listed child replaces the cache entry of the same name;
        entries for names that are not listed are left untouched.
        """
        for child in self.__list_children__():
            # Read the name before handing the dict to the constructor.
            name = child["name"]
            if is_folder(child):
                self.folders[name] = drive_folder(self.api, child, self)
            else:
                self.files[name] = drive_file(self.api, child, self)

    def __list_children__(self):
        """Yield the metadata dict of every non-trashed direct child.

        Result pages are requested one after another until the response
        carries no ``nextPageToken``.
        """
        query = "'{}' in parents and trashed = false".format(self.id)
        page_token = None
        while True:
            response = (
                self.api.service.files()
                .list(q=query, pageToken=page_token)
                .execute()
            )
            yield from response.get("files", [])
            page_token = response.get("nextPageToken")
            if not page_token:
                break
|
|
|
|
|
|
class drive_api(API):
    """Drive service wrapper that addresses remote items by path.

    Paths may be given as "/"-separated strings or as sequences of
    components. Lookups start at ``self.root`` and skip "/" components, so
    a path without a leading "/" is resolved relative to the root folder.
    """

    def __init__(
        self,
        app_name,
        client_secret_file,
        credentials_dir,
        scopes="https://www.googleapis.com/auth/drive",
        version="v3",
    ):
        """Initialise the ``API`` base for the "drive" service and load the root.

        Args:
            app_name: Forwarded to ``API.__init__``.
            client_secret_file: Forwarded to ``API.__init__``.
            credentials_dir: Forwarded to ``API.__init__``.
            scopes: Forwarded to ``API.__init__``.
            version: Forwarded to ``API.__init__``.
        """
        super().__init__(
            "drive", scopes, app_name, client_secret_file, credentials_dir, version
        )
        self.root = drive_folder.from_id("root", self, None)

    def get_file_by_path(self, path, ret_file=True):
        """Get a file or folder by remote path.

        Args:
            path: Path string or sequence of path components.
            ret_file: Look the last component up among the files of its
                folder when true, among the sub-folders otherwise.

        Returns:
            The cached item for ``path``. An empty path and "/" both give
            ``self.root``.

        Raises:
            FileNotFoundError: if a component of ``path`` is not found.
        """
        parts = split_path(path) if isinstance(path, str) else list(path)
        if not parts:
            # Nothing to descend into: the path denotes the root itself.
            return self.root
        end = parts.pop()
        if end == "/":
            return self.root

        parent = self.root
        for sub in parts:
            if sub == "/":
                continue
            # An empty cache may simply not have been loaded yet.
            if not parent.folders:
                parent.refresh()
            try:
                parent = parent.folders[sub]
            except KeyError:
                raise FileNotFoundError(pathjoin(*parts, end)) from None

        if ret_file:
            if not parent.files:
                parent.refresh()
            children = parent.files
        else:
            if not parent.folders:
                parent.refresh()
            children = parent.folders
        try:
            return children[end]
        except KeyError:
            raise FileNotFoundError(pathjoin(*parts, end)) from None

    def mkdirs(self, path):
        """Make the folders of ``path`` that do not exist already.

        Args:
            path: Path string or sequence of path components.

        Returns:
            The folder for the full ``path``; ``self.root`` for an empty path.
        """
        parts = split_path(path) if isinstance(path, str) else list(path)
        # Fallback when no prefix resolves (empty or relative path):
        # everything is then created directly below the root.
        parent = self.root
        missing = []
        # Find the longest prefix that already exists, remembering the
        # components that are missing behind it (deepest first).
        for i in range(len(parts), 0, -1):
            try:
                found = self.get_file_by_path(parts[:i], False)
            except FileNotFoundError:
                missing.append(parts[i - 1])
            else:
                parent = found
                break
        # Create the missing folders from the outermost one inwards.
        for name in reversed(missing):
            parent.create_folder(name)
            parent = parent.folders[name]
        return parent

    def upload(self, local_path, remote_path, verbose=False):
        """Upload a local file, or every file below a local directory.

        Args:
            local_path: Local file or directory.
            remote_path: Target path (string or sequence of components). For
                a directory upload this is the remote folder that mirrors
                ``local_path``.
            verbose: Print the remote path of each file of a directory
                upload after it has been sent.

        Returns:
            The remote file for a single-file upload, otherwise the remote
            folder found at ``remote_path``.

        Raises:
            FileNotFoundError: after a directory upload, if no folder exists
                at ``remote_path``.
        """
        if not isinstance(remote_path, str):
            remote_path = pathjoin(*remote_path)
        if not os.path.isdir(local_path):
            return self.__upload_file__(local_path, remote_path)

        for root, _dirs, files in os.walk(local_path, topdown=False):
            for file in files:
                new_path = path_translate(local_path, remote_path, root, file)
                self.__upload_file__(os.path.join(root, file), new_path)
                if verbose:
                    print(new_path)
        return self.get_file_by_path(remote_path, False)

    def __upload_file__(self, local_path, remote_path):
        """Create the remote file if it does not exist, otherwise update it.

        Args:
            local_path: Local file whose content is uploaded.
            remote_path: Target path (string or sequence of components).

        Returns:
            The ``drive_file`` that was updated or created.

        Raises:
            IsADirectoryError: if ``remote_path`` is empty or denotes the root.
        """
        if isinstance(remote_path, str):
            parts = split_path(remote_path)
        else:
            parts = list(remote_path)
        if not parts or parts[-1] == "/":
            raise IsADirectoryError(
                "remote path must name a file: {!r}".format(remote_path)
            )
        media = MediaFileUpload(local_path)

        try:
            existing = self.get_file_by_path(parts)
        except FileNotFoundError:
            existing = None

        if existing is not None:
            self.service.files().update(
                fileId=existing.id, media_body=media
            ).execute()
            return existing

        # Not there yet: make sure the parent folders exist, then create it.
        name = parts.pop()
        parent = self.mkdirs(parts)
        meta = {"name": name, "parents": [parent.id]}
        created = self.service.files().create(body=meta, media_body=media).execute()
        new_file = drive_file(self, without(created, "kind"), parent)
        parent.files[name] = new_file
        return new_file
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry: build a drive_api from the client secret and credentials
    # directory under ..\test\drive and keep a handle on its service object.
    my_api = drive_api(
        app_name=APPLICATION_NAME,
        client_secret_file=r"..\test\drive\client_secret.json",
        credentials_dir=r"..\test\drive",
    )
    service = my_api.service
|