Browse Source

made code pep8 compliant

no_compress
Raphael Roberts 7 years ago
parent
commit
181b045f10
  1. 27
      ctabus.py
  2. 121
      main.py
  3. 41
      print2d.py
  4. 46
      search.py

27
ctabus.py

@ -1,13 +1,14 @@
from urllib.parse import urlencode
from urllib.request import urlopen
import json
import os.path as osp
from sensitive import api
def get_data(type,api_key = api,**args):
def get_data(type, api_key=api, **args):
base_url = "http://www.ctabustracker.com/bustime/api/v2/{type}?{query}"
args['key'] = api_key
args['format'] = 'json'
url = base_url.format(type = type,query = urlencode(args))
url = base_url.format(type=type, query=urlencode(args))
response = urlopen(url)
data = json.load(response)['bustime-response']
try:
@ -16,16 +17,18 @@ def get_data(type,api_key = api,**args):
except KeyError:
return data
def get_times(stop_id,api_key = api):
return get_data('getpredictions',api_key,stpid=stop_id)
def get_routes(api_key = api):
return get_data('getroutes',api_key)
def get_times(stop_id, api_key=api):
return get_data('getpredictions', api_key, stpid=stop_id)
def get_routes(api_key=api):
return get_data('getroutes', api_key)
def get_directions(route,api_key = api):
return get_data('getdirections',api_key,rt=route)
def get_directions(route, api_key=api):
return get_data('getdirections', api_key, rt=route)
def get_stops(route,direction,api_key = api):
return get_data('getstops',api_key,rt = route,dir=direction)
#
def get_stops(route, direction, api_key=api):
return get_data('getstops', api_key, rt=route, dir=direction)

121
main.py

@ -10,72 +10,80 @@ import time
# for logging
import os.path as osp
import sys
import re
CHICAGO_TZ = tz.gettz("America/Chicago")
# https://stackoverflow.com/a/5967539
def atoi(text):
return int(text) if text.isdigit() else text
def numb_sort(text):
'''
alist.sort(key=natural_keys) sorts in human order
http://nedbatchelder.com/blog/200712/human_sorting.html
(See Toothy's implementation in the comments)
'''
return [ atoi(c) for c in re.split(r'(\d+)', text) ]
return [atoi(c) for c in re.split(r'(\d+)', text)]
def clearscr():
os.system('cls' if os.name == 'nt' else 'clear')
def pprint_delta(delta):
delta = str(delta)
days= None
days = None
s1 = delta.split(', ')
if len(s1) > 1:
days,time = s1
days, time = s1
else:
time = s1[0]
time = time.split('.')[0]
hour,minute,second = map(int,time.split(':'))
hour, minute, second = map(int, time.split(':'))
time = ''
if hour:
time += '{hour} hour'.format(hour=hour) + ('s' if hour != 1 else '')
if minute:
if time and not time.endswith(', '):
time += ', '
time += '{minute} minute'.format(minute=minute) + ('s' if minute != 1 else '')
time += '{minute} minute'.format(minute=minute) + \
('s' if minute != 1 else '')
if second:
if time and not time.endswith(', '):
time += ', '
time += '{second} second'.format(second=second) + ('s' if second != 1 else '')
time += '{second} second'.format(second=second) + \
('s' if second != 1 else '')
ret = ''
if days:
ret = days + ', ' if time else ''
ret += time
return ret
def gen_list(objs,data,*displays,key = None,sort = 0,num_pic = True):
def gen_list(objs, data, *displays, key=None, sort=0, num_pic=True):
k = displays[sort]
display_data = {obj[k]:obj[data] for obj in objs}
srt_keys = sorted(display_data.keys(),key=key)
display_data = {obj[k]: obj[data] for obj in objs}
srt_keys = sorted(display_data.keys(), key=key)
display = sorted(
[
[obj[d] for d in displays] for obj in objs
],
key = lambda row: key(row[sort]) if key else row[sort]
key=lambda row: key(row[sort]) if key else row[sort]
)
if num_pic:
display = [[i] + data for i,data in enumerate(display)]
display = [[i] + data for i, data in enumerate(display)]
opts = {
'spacer':' ',
'seperator':' ',
'spacer': ' ',
'seperator': ' ',
'interactive': True,
'bottom':'=',
'l_end':'<',
'r_end':'>',
}
print2d(display,**opts)
'bottom': '=',
'l_end': '<',
'r_end': '>',
}
print2d(display, **opts)
if num_pic:
which = None
while not which:
@ -92,88 +100,97 @@ def gen_list(objs,data,*displays,key = None,sort = 0,num_pic = True):
ret = None
while not ret:
try:
ret = display_data[input('Which one?: ')]
ret = display_data[input('Which one?: ')]
except KeyError:
pass
return ret
config = '''\
{route} - {end} ({direction})
{nm}, stop {stop_id}
{delta} ({t})\
'''
def show(data,rt_filter=None,_clear=False):
def show(data, rt_filter=None, _clear=False):
times = data['prd']
today = datetime.datetime.now(CHICAGO_TZ)
arrivals = sorted(times,key = lambda t: t['prdtm'])
arrivals = sorted(times, key=lambda t: t['prdtm'])
if rt_filter is not None:
arrivals =filter(lambda arrival: arrival['rt'] == rt_filter,arrivals)
arrivals = filter(lambda arrival: arrival['rt'] == rt_filter, arrivals)
if _clear:
clearscr()
for time in arrivals:
before = date_parse(time['prdtm'])
for bustime in arrivals:
before = date_parse(bustime['prdtm'])
arrival = before.replace(tzinfo=CHICAGO_TZ)
if arrival > today:
stop_id = time['stpid']
stop_id = bustime['stpid']
delta = pprint_delta(arrival-today)
t = arrival.strftime('%H:%M:%S')
route = time['rt']
direction = time['rtdir']
end = time['des']
nm = time['stpnm'].rstrip()
route = bustime['rt']
direction = bustime['rtdir']
end = bustime['des']
nm = bustime['stpnm'].rstrip()
print(
config.format(**locals()),end= '\n'*2
config.format(**locals()), end='\n'*2
)
print("="*36)
if __name__ == '__main__':
parser = argparse.ArgumentParser(prog = 'ctabus')
parser.add_argument('-l','--lucky',action='store_true',help = 'picks first result')
parser.add_argument('-p','--periodic',metavar = 'SEC',type=int,help='checks periodically')
parser.add_argument('-r','--route',default = None)
parser.add_argument('-d','--direction',default = None)
parser.add_argument('arg',nargs = '+',metavar = '(stop-id | cross streets)')
parser = argparse.ArgumentParser(prog='ctabus')
parser.add_argument('-l', '--lucky', action='store_true',
help='picks first result')
parser.add_argument('-p', '--periodic', metavar='SEC',
type=int, help='checks periodically')
parser.add_argument('-r', '--route', default=None)
parser.add_argument('-d', '--direction', default=None)
parser.add_argument('arg', nargs='+', metavar='(stop-id | cross streets)')
args = parser.parse_args()
sys.stderr = open(osp.join(osp.dirname(__file__),'stderr.log'),'w')
sys.stderr = open(osp.join(osp.dirname(__file__), 'stderr.log'), 'w')
args.arg = ' '.join(args.arg)
if not args.arg.isdecimal():
# save on import time slightly
from print2d import print2d
from search import Search,StopSearch
#routes
from search import Search, StopSearch
# routes
if not args.route:
data = ctabus.get_routes()['routes']
route = gen_list(data,'rt','rt','rtnm',num_pic = False,key=numb_sort)
route = gen_list(data, 'rt', 'rt', 'rtnm',
num_pic=False, key=numb_sort)
else:
route = args.route
data = ctabus.get_directions(route)['directions']
#direction
# direction
if not args.direction:
direction = gen_list(data,'dir','dir')
direction = gen_list(data, 'dir', 'dir')
else:
s = Search(args.direction)
direction = sorted((obj['dir'] for obj in data),key = s)[0]
#direction
stops = ctabus.get_stops(route,direction)['stops']
direction = sorted((obj['dir'] for obj in data), key=s)[0]
# direction
stops = ctabus.get_stops(route, direction)['stops']
s = StopSearch(args.arg)
if args.lucky:
stop_id = sorted(stops,key=lambda stop: s(stop['stpnm']))[0]['stpid']
stop_id = sorted(stops, key=lambda stop: s(stop['stpnm']))[
0]['stpid']
else:
stop_id = gen_list(stops,'stpid','stpnm',key = s)
stop_id = gen_list(stops, 'stpid', 'stpnm', key=s)
else:
stop_id = args.arg
data=ctabus.get_times(stop_id)
data = ctabus.get_times(stop_id)
if args.periodic is not None:
_done = False
while not _done:
try:
show(data,args.route,True)
show(data, args.route, True)
s = time.perf_counter()
data = ctabus.get_times(stop_id)
e = time.perf_counter() - s
if e < args.periodic:
time.sleep(args.periodic-e)
except KeyboardInterrupt as e:
except KeyboardInterrupt:
_done = True
else:
show(data,args.route)
show(data, args.route)

41
print2d.py

@ -1,31 +1,44 @@
import datetime
from pydoc import pager
def str_coerce(s,**kwargs):
if isinstance(s,datetime.datetime):
def str_coerce(s, **kwargs):
if isinstance(s, datetime.datetime):
return s.strftime(kwargs['datetime_format'])
else:
return str(s)
def print2d(l,datetime_format = "%A, %B %e, %Y %H:%M:%S",seperator= ' | ',spacer = ' ',bottom = '=',l_end = '|',r_end = '|',interactive = False):
l = [[str_coerce(s,datetime_format = datetime_format) for s in row] for row in l]
def print2d(list_param,
datetime_format="%A, %B %e, %Y %H:%M:%S",
seperator=' | ',
spacer=' ',
bottom='=',
l_end='|', r_end='|',
interactive=False
):
list_param = [[str_coerce(s, datetime_format=datetime_format)
for s in row] for row in list_param]
max_col = []
for row in l:
for i,col in enumerate(row):
for row in list_param:
for i, col in enumerate(row):
try:
max_col[i] = max(max_col[i],len(col))
max_col[i] = max(max_col[i], len(col))
except IndexError:
max_col.append(len(col))
fmt_row = '{content}'
if l_end:
fmt_row = '{} {}'.format(l_end,fmt_row)
fmt_row = '{} {}'.format(l_end, fmt_row)
if r_end:
fmt_row = '{} {}'.format(fmt_row,r_end)
fmt_row = '{} {}'.format(fmt_row, r_end)
done = []
for row in l:
content = seperator.join(col.ljust(max_col[i],spacer if i < len(row)-1 or r_end else ' ') for i,col in enumerate(row))
done.append(fmt_row.format(content = content))
for row in list_param:
content = seperator.join(col.ljust(max_col[i], spacer if i < len(
row)-1 or r_end else ' ') for i, col in enumerate(row))
done.append(fmt_row.format(content=content))
if bottom:
bottom = bottom*len(done[0])
@ -34,10 +47,10 @@ def print2d(l,datetime_format = "%A, %B %e, %Y %H:%M:%S",seperator= ' | ',spacer
row_sep = '\n'
final = row_sep.join(done)
if bottom:
final = '\n'.join((bottom,final,bottom))
final = '\n'.join((bottom, final, bottom))
if interactive:
if not bottom:
final += '\n'
pager(final)
else:
return final
return final

46
search.py

@ -1,42 +1,54 @@
import edlib
def editdistance(a,b):
return edlib.align(a,b)['editDistance']
import re
import json
def editdistance(a, b):
return edlib.align(a, b)['editDistance']
class Search:
def __init__(self,query):
def __init__(self, query):
self.raw_lower = query.lower()
def __call__(self,arg):
def __call__(self, arg):
arg = arg.lower()
return editdistance(self.raw_lower,arg)
return editdistance(self.raw_lower, arg)
def __str__(self):
print(self.raw_lower)
def __repr__(self):
return str(self)
class StopSearch(Search):
def __init__(self,query):
def __init__(self, query):
super().__init__(query)
query = query.lower()
parts = re.split(r' ?(?:(?<!\w)and(?!\w)|&) ?',query)
parts = re.split(r' ?(?:(?<!\w)and(?!\w)|&) ?', query)
self.query = ' & '.join(parts)
self.query_reversed = ' & '.join(reversed(parts))
def __call__(self,stop):
def __call__(self, stop):
stop = stop.lower()
paren = re.search(r'\((?P<data>[^\)]+)\)',stop)
ret= [
editdistance(self.query,stop),
editdistance(self.query_reversed,stop),
paren = re.search(r'\((?P<data>[^\)]+)\)', stop)
ret = [
editdistance(self.query, stop),
editdistance(self.query_reversed, stop),
]
if paren:
paren = paren.group('data')
ret.append(editdistance(self.query,paren))
ret.append(editdistance(self.query, paren))
if self.raw_lower in stop:
ret = (item - 100 for item in ret)
return min(
ret
)
ret
)
def __str__(self):
return '{}|{}'.format(self.query,self.query_reversed)
return '{}|{}'.format(self.query, self.query_reversed)
if __name__ == "__main__":
with open('stops_out.json') as file:
@ -44,4 +56,4 @@ if __name__ == "__main__":
names = [stop['stpnm'] for stop in data['stops']]
while True:
q = StopSearch(input('Search: '))
print('\n'.join(sorted(names,key=q)),end='\n'*3)
print('\n'.join(sorted(names, key=q)), end='\n'*3)
Loading…
Cancel
Save