commit
a1799f5a61
5 changed files with 324 additions and 0 deletions
-
3.gitignore
-
59body_create.py
-
128gcalendar.py
-
46get_classes.py
-
88scraper.py
@ -0,0 +1,3 @@ |
|||
*.pkl |
|||
__pycache__ |
|||
/api_info |
|||
@ -0,0 +1,59 @@ |
|||
from dateutil import rrule |
|||
import datetime |
|||
import pickle |
|||
# event = { |
|||
# 'summary': 'Google I/O 2015', |
|||
# 'location': '800 Howard St., San Francisco, CA 94103', |
|||
# 'description': 'A chance to hear more about Google\'s developer products.', |
|||
# 'start': { |
|||
# 'dateTime': '2015-05-28T09:00:00-07:00', |
|||
# 'timeZone': 'America/Los_Angeles', |
|||
# }, |
|||
# 'end': { |
|||
# 'dateTime': '2015-05-28T17:00:00-07:00', |
|||
# 'timeZone': 'America/Los_Angeles', |
|||
# }, |
|||
# 'recurrence': [ |
|||
# 'RRULE:FREQ=DAILY;COUNT=2' |
|||
# ], |
|||
# 'attendees': [ |
|||
# {'email': 'lpage@example.com'}, |
|||
# {'email': 'sbrin@example.com'}, |
|||
# ], |
|||
# 'reminders': { |
|||
# 'useDefault': False, |
|||
# 'overrides': [ |
|||
# {'method': 'email', 'minutes': 24 * 60}, |
|||
# {'method': 'popup', 'minutes': 10}, |
|||
# ], |
|||
# }, |
|||
# } |
|||
|
|||
def rrule_former(class_obj):
    """Build the weekly dateutil recurrence rule for a class's meetings.

    ``class_obj`` must provide ``days`` (an iterable of integers),
    ``date_range`` and ``time_range`` (each indexable at 0 and 1).  The rule
    starts at the first date combined with the first time and runs until the
    last date combined with the last time; both are passed through
    ``astimezone()``.  Each day number is shifted down by one (modulo 7)
    before it is handed to dateutil as a weekday.
    """
    meeting_days = class_obj.days
    first_meeting = datetime.datetime.combine(
        class_obj.date_range[0],
        class_obj.time_range[0],
    ).astimezone()
    final_moment = datetime.datetime.combine(
        class_obj.date_range[1],
        class_obj.time_range[1],
    ).astimezone()

    weekdays = [(number - 1) % 7 for number in meeting_days]
    return rrule.rrule(
        freq=rrule.WEEKLY,
        dtstart=first_meeting,
        wkst=rrule.SU,
        until=final_moment,
        byweekday=weekdays,
    )
|||
|
|||
|
|||
|
|||
def test():
    """Run rrule_former on a throwaway schedule object and return the locals.

    The sample object meets on days 1, 3 and 5, starts now, lasts fifty
    minutes and recurs for twenty days.  The local names are part of the
    result because the function returns ``locals()``.
    """
    # test
    now = datetime.datetime.now()
    from munch import Munch
    test_obj = Munch(
        days=[1, 3, 5],
        time_range=[
            now.time(),
            (now + datetime.timedelta(minutes=50)).time(),
        ],
        date_range=[
            now.date(),
            (now + datetime.timedelta(days=20)).date(),
        ],
    )
    test_result = rrule_former(test_obj)
    return locals()
|||
@ -0,0 +1,128 @@ |
|||
# from __future__ import print_function |
|||
from apiclient import discovery |
|||
import datetime |
|||
from googleapiclient.discovery import build |
|||
from googleapiclient.errors import HttpError |
|||
|
|||
from oauth2client import client |
|||
from oauth2client import tools |
|||
from oauth2client.file import Storage |
|||
from oauth2client.service_account import ServiceAccountCredentials |
|||
|
|||
import httplib2 |
|||
import os |
|||
import pytz |
|||
import sys |
|||
import argparse |
|||
import tzlocal |
|||
# oauth2client's command-line flags, parsed once at import time.
# parse_known_args() is used instead of parse_args() so that importing this
# module from a program with its own command-line arguments does not abort
# with "unrecognized arguments"; arguments unknown to oauth2client are ignored.
flags = argparse.ArgumentParser(parents=[tools.argparser]).parse_known_args()[0]

# strftime pattern for second-resolution timestamps without a UTC offset.
dt_fmt = '%Y-%m-%dT%H:%M:%S'

# Sent as the OAuth flow's user agent.
APPLICATION_NAME = 'Google Calendar API Python'
|||
def dateTime(datetime):
    """Convert a datetime into a ``{"timeZone", "dateTime"}`` mapping.

    Args:
        datetime: A ``datetime.datetime`` instance.  A value without
            ``tzinfo`` is first passed through ``astimezone()``.  (The
            parameter name shadows the ``datetime`` module inside this
            function; it is kept for backward compatibility with callers.)

    Returns:
        dict: ``"timeZone"`` holds the name of the local zone reported by
        tzlocal and ``"dateTime"`` holds the ISO-8601 string with second
        resolution.
    """
    if not datetime.tzinfo:
        datetime = datetime.astimezone()

    local_zone = tzlocal.get_localzone()
    # pytz zones expose their name as ``.zone``; zoneinfo zones (returned by
    # newer tzlocal releases) expose it as ``.key``.  Accept either, and fall
    # back to the string form of the zone object.
    zone = (
        getattr(local_zone, 'key', None)
        or getattr(local_zone, 'zone', None)
        or str(local_zone)
    )

    return {
        "timeZone": zone,
        "dateTime": datetime.isoformat(timespec='seconds'),
    }
|||
|
|||
class api:
    """Small convenience wrapper around the Google Calendar v3 service.

    Constructing an instance immediately fetches the user's calendar list,
    which may start the OAuth flow if no valid stored token exists.

    Attributes:
        client_secret_file: Path to the OAuth client-secret JSON file.
        credentials_dir: Directory in which ``token.json`` is stored.
        scopes: OAuth scope(s) requested for the token.
        calendars: Calendar-list entries fetched at construction time.
        ids: Mapping of lower-cased calendar summary to calendar id.
    """

    def __init__(self, client_secret_file, credentials_dir, scopes='https://www.googleapis.com/auth/calendar'):
        self.client_secret_file = client_secret_file
        self.credentials_dir = credentials_dir
        self.scopes = scopes
        # Cached service object and the moment it was built (see ``service``).
        self._service = None
        self._service_settime = None
        self.calendars = self.get_calendars()
        self.ids = {
            calendar['summary'].lower(): calendar['id']
            for calendar in self.calendars
        }

    def get_credentials(self):
        """Return stored OAuth credentials, running the OAuth flow if needed.

        Credentials are read from ``token.json`` inside ``credentials_dir``.
        When none are stored, or the stored ones are invalid, the flow built
        from ``client_secret_file`` is run and its result is written back to
        the same store.
        """
        credential_path = os.path.join(self.credentials_dir, 'token.json')

        store = Storage(credential_path)
        credentials = store.get()
        if not credentials or credentials.invalid:
            flow = client.flow_from_clientsecrets(self.client_secret_file, self.scopes)
            flow.user_agent = APPLICATION_NAME
            if flags:
                credentials = tools.run_flow(flow, store, flags)
            else:  # Needed only for compatibility with Python 2.6
                credentials = tools.run(flow, store)
            print('Storing credentials to ' + credential_path)
        return credentials

    def build_service(self):
        """Build and return a fresh, authorized Calendar v3 service object."""
        credentials = self.get_credentials()
        http = credentials.authorize(httplib2.Http())
        return build('calendar', 'v3', http=http, cache_discovery=False)

    def _needs_renewal(self):
        """Return True when no service was built yet or it is over an hour old."""
        if not self._service_settime:
            return True
        age = datetime.datetime.today() - self._service_settime
        return age > datetime.timedelta(seconds=60 ** 2)

    @property
    def service(self):
        """The cached service object, rebuilt when ``_needs_renewal()`` says so."""
        if self._needs_renewal():
            service = self.build_service()
            self._service = service
            self._service_settime = datetime.datetime.today()
            return service
        return self._service

    def create_event(self, calendar_id, body):
        """Insert ``body`` as a new event in ``calendar_id`` and return its id."""
        service = self.service
        event = service.events().insert(calendarId=calendar_id, body=body).execute()
        return event['id']

    def update_event(self, calendar_id, event_id, body):
        """Replace an existing event with ``body``, creating it when missing.

        Returns:
            The id of the updated (or newly created) event.

        Raises:
            HttpError: For any lookup failure other than HTTP 404.
        """
        service = self.service
        try:
            event = service.events().get(calendarId=calendar_id, eventId=event_id).execute()
        except HttpError as e:
            if e.resp.status == 404:
                return self.create_event(calendar_id, body)
            # Any other failure used to fall through to an unbound ``event``
            # (NameError); surface the real API error instead.
            raise

        updated_event = service.events().update(
            calendarId=calendar_id, eventId=event['id'], body=body
        ).execute()
        return updated_event["id"]

    def get_calendars(self):
        """Return every entry of the user's calendar list, following pagination."""
        page_token = None
        cl = []
        while True:
            calendar_list = self.service.calendarList().list(pageToken=page_token).execute()
            cl.extend(calendar_list.get('items', []))
            page_token = calendar_list.get('nextPageToken')
            if not page_token:
                break
        return cl

    def get_events(self, id):
        """Return all events of one calendar, following pagination.

        Args:
            id: Either a key of ``self.ids`` (a lower-cased calendar summary)
                or a calendar id.  Values not found in ``self.ids`` are used
                as the calendar id unchanged.
        """
        service = self.service
        # The original resolved the id but then always queried 'primary';
        # the resolved calendar id is now the one that is queried.
        calendar_id = self.ids.get(id, id)
        page_token = None
        ret = []
        while True:
            events = service.events().list(calendarId=calendar_id, pageToken=page_token).execute()
            ret += events.get('items', [])
            page_token = events.get('nextPageToken')
            if not page_token:
                break
        return ret
|||
# if __name__ == "__main__": |
|||
# test = api(r"X:\Users\Ralphie\Downloads\client_secret.json",r"X:\Users\Ralphie\Downloads") |
|||
@ -0,0 +1,46 @@ |
|||
from pyppeteer import launch |
|||
import asyncio |
|||
import time |
|||
import scraper |
|||
# JavaScript statement that sets the ``term_in`` drop-down to its first option.
set_semester = "document.getElementsByName('term_in')[0].selectedIndex = 0"

# XPath selectors for the portal elements the scraper waits for and clicks.
xpaths = dict(
    tab=".//a[text()='Current Student']",
    schedule=".//a[text()='Student Detail Schedule']",
    submit="//input[@value='Submit']",
    frame="//frame[@src='/cp/ip/login?sys=sctssb&url=https://ssb.neiu.edu/mercury_neiuprod/bwskfshd.P_CrseSchdDetl']",
)
|||
async def xpath_single_element(xpath, page):
    """Wait until ``xpath`` matches on ``page`` and return the first match.

    ``page`` may be any object offering awaitable ``waitForXPath`` and
    ``xpath`` methods.
    """
    await page.waitForXPath(xpath)
    matches = await page.xpath(xpath)
    first_match = matches[0]
    return first_match
|||
async def main_loop(login):
    """Log in to the portal, open the detail schedule and parse its classes.

    Args:
        login: JavaScript source evaluated on the login page to fill in and
            submit the credentials.

    Returns:
        Whatever ``scraper.get_classes`` returns for the HTML of the page's
        last frame after the term form was submitted.
    """
    browser = await launch(headless=False)
    # try/finally so the browser process is closed even when a step fails.
    try:
        page_list = await browser.pages()
        page = page_list[0]
        await page.goto('https://neiuport.neiu.edu/cp/home/displaylogin')
        await page.evaluate(login)
        await page.waitFor('#tab')

        student_tab = await xpath_single_element(xpaths['tab'], page)
        await student_tab.click()

        # xpath_single_element waits for the selector itself.
        schedule = await xpath_single_element(xpaths['schedule'], page)
        await schedule.click()

        # This wait was previously created but never awaited, so it had no
        # effect; the fixed delay below is kept as extra settling time.
        await page.waitForXPath(xpaths['frame'])
        await asyncio.sleep(3)

        frame = page.frames[-1]
        submit = await xpath_single_element(xpaths['submit'], frame)
        await submit.click()
        await asyncio.sleep(1)

        # Re-read the frame list: the original reads page.frames again here
        # rather than reusing ``frame``.
        content = await page.frames[-1].content()
    finally:
        await browser.close()
    return scraper.get_classes(content)
|||
|
|||
def get_classes(user, password):
    """Log in with the given credentials and return the parsed classes.

    Builds the JavaScript that fills in the login form and runs
    ``main_loop`` to completion on the current asyncio event loop.

    Args:
        user: Portal user name.
        password: Portal password.

    Returns:
        The result of ``main_loop``.
    """
    import json  # local import: only needed to encode the credentials

    # json.dumps yields a correctly quoted and escaped JavaScript string
    # literal, so quotes or backslashes in the credentials can neither break
    # the script nor inject code into the page.
    login = "\n".join([
        "document.getElementById('user').value={}".format(json.dumps(str(user))),
        "document.getElementById('pass').value={}".format(json.dumps(str(password))),
        "login()",
    ])
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(main_loop(login))
|||
if __name__ == "__main__":
    # Credentials must never live in source control.  They are read from the
    # environment, falling back to an interactive prompt.
    # NOTE(review): the account credentials that were hard-coded here are part
    # of the commit history and should be treated as compromised (rotate them).
    import getpass
    import os

    cl = get_classes(
        os.environ.get('NEIU_USER') or input('NEIU username: '),
        os.environ.get('NEIU_PASSWORD') or getpass.getpass('NEIU password: '),
    )
|||
@ -0,0 +1,88 @@ |
|||
from bs4 import BeautifulSoup as BS |
|||
import datetime |
|||
import re |
|||
from operator import sub |
|||
def dateparse(datetime_str):
    """Parse a string as a date (``'%b %d, %Y'``) or a time (``'%I:%M %p'``).

    The date format is tried first; if it does not match, the time format is
    used.  A ``datetime.datetime`` is returned in both cases, and the
    ``ValueError`` of the second attempt propagates when neither matches.
    """
    parse = datetime.datetime.strptime
    try:
        parsed = parse(datetime_str, '%b %d, %Y')
    except ValueError:
        parsed = parse(datetime_str, '%I:%M %p')
    return parsed
|||
# One-letter meeting-day codes; a code's position in this list is the day
# number stored on Class.days (M is 1 ... F is 5).  Positions 0 and 6 hold
# None placeholders (presumably the weekend days -- confirm).
days = [None] + list('MTWRF') + [None]

# Row labels whose attribute name is the whole label (lower-cased, spaces
# replaced by underscores) instead of only the label's last word.
simp_exceptions = ['Grade Mode']
|||
def datetime2date_time(dtime, mode):
    """Extract the date or the time-of-day part of ``dtime``.

    ``mode == 'date'`` returns a ``datetime.date`` built from year, month and
    day; ``mode == 'time'`` returns a ``datetime.time`` built from hour,
    minute and second (sub-second precision is dropped).  Any other mode
    returns ``None``.
    """
    if mode == 'date':
        return datetime.date(year=dtime.year, month=dtime.month, day=dtime.day)
    if mode == 'time':
        return datetime.time(hour=dtime.hour, minute=dtime.minute, second=dtime.second)
    return None
|||
def seconds_from_midnight(t):
    """Return the whole seconds between midnight and ``t``.

    Only ``t.hour``, ``t.minute`` and ``t.second`` are read.
    """
    whole_minutes = t.hour * 60 + t.minute
    return whole_minutes * 60 + t.second
|||
class Class:
    """One course section parsed from a pair of schedule tables.

    ``data`` is an ``(info, times)`` pair of parsed table elements.  The info
    table supplies ``title``, ``abrv``, ``session``, ``type``,
    ``registration_date`` and one attribute per remaining row; the times
    table supplies ``time_range`` (``None`` when the time is ``'TBA'``),
    ``date_range``, ``days`` and ``location``.
    """

    def __init__(self, data):
        info, times = data
        self._parse_info(info)
        self._parse_times(times)

    def _parse_info(self, info):
        """Fill in the attributes that come from the class information table."""
        caption = info.find('caption').text
        # Abbreviation and session are the last two fields, so split from the
        # right: a title that itself contains ' - ' stays in one piece.
        self.title, self.abrv, self.session = caption.rsplit(' - ', 2)
        self.session = int(self.session)

        for row in info.find_all('tr'):
            name = row.find('th').text.rstrip(':')
            value = re.sub(r'^ +|[\n\r\t]', '', row.find('td').text)

            if name == 'Status':
                # "<status> on <date>": the date is always the last part.
                status, registered = value.rsplit(' on ', 1)
                self.type = status.replace('*', '')
                self.registration_date = dateparse(registered)
                continue

            if name in simp_exceptions:
                name = name.lower().replace(' ', '_')
            else:
                # Attribute name is the last word of the row label.
                name = name.lower().split(' ')[-1]
            if name != 'instructor':
                value = value.lower()
            try:
                # Values such as "3.000" are stored as integers.
                value = int(re.sub(r'\.\d+', '', value))
            except ValueError:
                pass  # not numeric: keep the text
            self.__dict__[name] = value

    def _parse_times(self, times):
        """Fill in the attributes that come from the meeting-times table."""
        header_row, value_row = times.find_all('tr')
        headers = (cell.text.lower() for cell in header_row.find_all('th'))
        values = (cell.text for cell in value_row.find_all('td'))
        time_data = dict(zip(headers, values))

        if time_data['time'] == 'TBA':
            self.time_range = None
        else:
            start, end = map(dateparse, time_data['time'].split(' - '))
            self.time_range = (
                datetime2date_time(start, 'time'),
                datetime2date_time(end, 'time'),
            )

        start, end = map(dateparse, time_data['date range'].split(' - '))
        self.date_range = (
            datetime2date_time(start, 'date'),
            datetime2date_time(end, 'date'),
        )

        # Keep only recognised day letters, then store their day numbers.
        known_codes = ''.join(filter(bool, days))
        self.days = [days.index(code) for code in time_data['days'] if code in known_codes]
        self.location = time_data['where']

    @property
    def length(self):
        """Duration of one meeting as a ``timedelta``, or ``None`` for TBA times."""
        if self.time_range is None:
            return None
        return datetime.timedelta(
            seconds=seconds_from_midnight(self.time_range[1])
            - seconds_from_midnight(self.time_range[0])
        )
|||
def get_classes(page):
    """Parse every class found in a schedule page.

    ``page`` is either an already parsed ``BeautifulSoup`` object or markup
    that is parsed with the ``lxml`` parser.  The ``datadisplaytable`` tables
    are consumed two at a time, each pair becoming one ``Class``.
    """
    soup = page if isinstance(page, BS) else BS(page, 'lxml')
    tables = soup.find_all('table', attrs={'class': 'datadisplaytable'})

    parsed = []
    for first in range(0, len(tables), 2):
        parsed.append(Class((tables[first], tables[first + 1])))
    return parsed
|||
|
|||
if __name__ == "__main__":
    # Manual check: parse a locally saved copy of the schedule page and keep
    # the first class and the remaining ones in separate script-level names.
    with open('schedule.html') as file:
        page = BS(file.read(),'lxml')
    class1,*classes = get_classes(page)
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue