You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
|
|
import datetimeimport osimport pickle
from googleapiclient.discovery import buildfrom google_auth_oauthlib.flow import InstalledAppFlowfrom google.auth.transport.requests import Request
import gapi
HOUR = datetime.timedelta(seconds=60 ** 2)
class API: def __init__( self, service, scopes, app_name, client_secret_file=os.path.join(gapi.state_dir, "client_secret.json"), credentials_dir=gapi.state_dir, version="v3", ): self.service_name = service self.app_name = app_name self.client_secret_file = client_secret_file self.credentials_dir = credentials_dir self.scopes = scopes self._service = None self._service_settime = None self.version = version
def get_credentials(self): creds = None # The file token.pickle stores the user's access and refresh tokens, and is # created automatically when the authorization flow completes for the first # time. credential_path = os.path.join(self.credentials_dir, "token.pickle") if os.path.exists(credential_path): with open(credential_path, "rb") as token: creds = pickle.load(token) # If there are no (valid) credentials available, let the user log in. if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( self.client_secret_file, self.scopes ) creds = flow.run_local_server(port=0) # Save the credentials for the next run print("Storing credentials to " + credential_path) with open(credential_path, "wb") as token: pickle.dump(creds, token) return creds
def build_service(self): credentials = self.get_credentials()
service = build(self.service_name, self.version, credentials=credentials) return service
def _needs_renewal(self): now = datetime.datetime.today() if self._service_settime: return (now - self._service_settime) > HOUR else: return True
@property def service(self): if self._needs_renewal(): service = self.build_service() self._service = service self._service_settime = datetime.datetime.today() return service else: return self._service
|