diff --git a/ctabus.py b/ctabus.py index e1de5c9..aee9540 100644 --- a/ctabus.py +++ b/ctabus.py @@ -1,13 +1,14 @@ from urllib.parse import urlencode from urllib.request import urlopen import json -import os.path as osp from sensitive import api -def get_data(type,api_key = api,**args): + + +def get_data(type, api_key=api, **args): base_url = "http://www.ctabustracker.com/bustime/api/v2/{type}?{query}" args['key'] = api_key args['format'] = 'json' - url = base_url.format(type = type,query = urlencode(args)) + url = base_url.format(type=type, query=urlencode(args)) response = urlopen(url) data = json.load(response)['bustime-response'] try: @@ -16,16 +17,18 @@ def get_data(type,api_key = api,**args): except KeyError: return data -def get_times(stop_id,api_key = api): - return get_data('getpredictions',api_key,stpid=stop_id) -def get_routes(api_key = api): - return get_data('getroutes',api_key) +def get_times(stop_id, api_key=api): + return get_data('getpredictions', api_key, stpid=stop_id) + + +def get_routes(api_key=api): + return get_data('getroutes', api_key) + -def get_directions(route,api_key = api): - return get_data('getdirections',api_key,rt=route) +def get_directions(route, api_key=api): + return get_data('getdirections', api_key, rt=route) -def get_stops(route,direction,api_key = api): - return get_data('getstops',api_key,rt = route,dir=direction) -# \ No newline at end of file +def get_stops(route, direction, api_key=api): + return get_data('getstops', api_key, rt=route, dir=direction) diff --git a/main.py b/main.py index 5291b1a..10f16d1 100755 --- a/main.py +++ b/main.py @@ -10,72 +10,80 @@ import time # for logging import os.path as osp import sys -import re CHICAGO_TZ = tz.gettz("America/Chicago") # https://stackoverflow.com/a/5967539 + + def atoi(text): return int(text) if text.isdigit() else text + + def numb_sort(text): ''' alist.sort(key=natural_keys) sorts in human order http://nedbatchelder.com/blog/200712/human_sorting.html (See Toothy's implementation in the comments) ''' - return [ atoi(c) for c in re.split(r'(\d+)', text) ] + return [atoi(c) for c in re.split(r'(\d+)', text)] + def clearscr(): os.system('cls' if os.name == 'nt' else 'clear') + def pprint_delta(delta): delta = str(delta) - days= None + days = None s1 = delta.split(', ') if len(s1) > 1: - days,time = s1 + days, time = s1 else: time = s1[0] time = time.split('.')[0] - hour,minute,second = map(int,time.split(':')) + hour, minute, second = map(int, time.split(':')) time = '' if hour: time += '{hour} hour'.format(hour=hour) + ('s' if hour != 1 else '') if minute: if time and not time.endswith(', '): time += ', ' - time += '{minute} minute'.format(minute=minute) + ('s' if minute != 1 else '') + time += '{minute} minute'.format(minute=minute) + \ + ('s' if minute != 1 else '') if second: if time and not time.endswith(', '): time += ', ' - time += '{second} second'.format(second=second) + ('s' if second != 1 else '') + time += '{second} second'.format(second=second) + \ + ('s' if second != 1 else '') ret = '' if days: ret = days + ', ' if time else '' ret += time return ret -def gen_list(objs,data,*displays,key = None,sort = 0,num_pic = True): + +def gen_list(objs, data, *displays, key=None, sort=0, num_pic=True): k = displays[sort] - display_data = {obj[k]:obj[data] for obj in objs} - srt_keys = sorted(display_data.keys(),key=key) + display_data = {obj[k]: obj[data] for obj in objs} + srt_keys = sorted(display_data.keys(), key=key) display = sorted( [ [obj[d] for d in displays] for obj in objs ], - key = lambda row: key(row[sort]) if key else row[sort] + key=lambda row: key(row[sort]) if key else row[sort] ) if num_pic: - display = [[i] + data for i,data in enumerate(display)] + display = [[i] + data for i, data in enumerate(display)] opts = { - 'spacer':' ', - 'seperator':' ', + 'spacer': ' ', + 'seperator': ' ', 'interactive': True, - 'bottom':'=', - 'l_end':'<', - 'r_end':'>', - } - print2d(display,**opts) + 'bottom': '=', + 'l_end': '<', + 'r_end': '>', + } + print2d(display, **opts) if num_pic: which = None while not which: @@ -92,88 +100,97 @@ def gen_list(objs,data,*displays,key = None,sort = 0,num_pic = True): ret = None while not ret: try: - ret = display_data[input('Which one?: ')] + ret = display_data[input('Which one?: ')] except KeyError: pass return ret + config = '''\ {route} - {end} ({direction}) {nm}, stop {stop_id} {delta} ({t})\ ''' -def show(data,rt_filter=None,_clear=False): + + +def show(data, rt_filter=None, _clear=False): times = data['prd'] today = datetime.datetime.now(CHICAGO_TZ) - arrivals = sorted(times,key = lambda t: t['prdtm']) + arrivals = sorted(times, key=lambda t: t['prdtm']) if rt_filter is not None: - arrivals =filter(lambda arrival: arrival['rt'] == rt_filter,arrivals) + arrivals = filter(lambda arrival: arrival['rt'] == rt_filter, arrivals) if _clear: clearscr() - for time in arrivals: - before = date_parse(time['prdtm']) + for bustime in arrivals: + before = date_parse(bustime['prdtm']) arrival = before.replace(tzinfo=CHICAGO_TZ) if arrival > today: - stop_id = time['stpid'] + stop_id = bustime['stpid'] delta = pprint_delta(arrival-today) t = arrival.strftime('%H:%M:%S') - route = time['rt'] - direction = time['rtdir'] - end = time['des'] - nm = time['stpnm'].rstrip() + route = bustime['rt'] + direction = bustime['rtdir'] + end = bustime['des'] + nm = bustime['stpnm'].rstrip() print( - config.format(**locals()),end= '\n'*2 + config.format(**locals()), end='\n'*2 ) print("="*36) + + if __name__ == '__main__': - parser = argparse.ArgumentParser(prog = 'ctabus') - parser.add_argument('-l','--lucky',action='store_true',help = 'picks first result') - parser.add_argument('-p','--periodic',metavar = 'SEC',type=int,help='checks periodically') - parser.add_argument('-r','--route',default = None) - parser.add_argument('-d','--direction',default = None) - parser.add_argument('arg',nargs = '+',metavar = '(stop-id | cross streets)') + parser = argparse.ArgumentParser(prog='ctabus') + parser.add_argument('-l', '--lucky', action='store_true', + help='picks first result') + parser.add_argument('-p', '--periodic', metavar='SEC', + type=int, help='checks periodically') + parser.add_argument('-r', '--route', default=None) + parser.add_argument('-d', '--direction', default=None) + parser.add_argument('arg', nargs='+', metavar='(stop-id | cross streets)') args = parser.parse_args() - sys.stderr = open(osp.join(osp.dirname(__file__),'stderr.log'),'w') + sys.stderr = open(osp.join(osp.dirname(__file__), 'stderr.log'), 'w') args.arg = ' '.join(args.arg) if not args.arg.isdecimal(): # save on import time slightly from print2d import print2d - from search import Search,StopSearch - #routes + from search import Search, StopSearch + # routes if not args.route: data = ctabus.get_routes()['routes'] - route = gen_list(data,'rt','rt','rtnm',num_pic = False,key=numb_sort) + route = gen_list(data, 'rt', 'rt', 'rtnm', + num_pic=False, key=numb_sort) else: route = args.route data = ctabus.get_directions(route)['directions'] - #direction + # direction if not args.direction: - direction = gen_list(data,'dir','dir') + direction = gen_list(data, 'dir', 'dir') else: s = Search(args.direction) - direction = sorted((obj['dir'] for obj in data),key = s)[0] - #direction - stops = ctabus.get_stops(route,direction)['stops'] + direction = sorted((obj['dir'] for obj in data), key=s)[0] + # direction + stops = ctabus.get_stops(route, direction)['stops'] s = StopSearch(args.arg) if args.lucky: - stop_id = sorted(stops,key=lambda stop: s(stop['stpnm']))[0]['stpid'] + stop_id = sorted(stops, key=lambda stop: s(stop['stpnm']))[ + 0]['stpid'] else: - stop_id = gen_list(stops,'stpid','stpnm',key = s) + stop_id = gen_list(stops, 'stpid', 'stpnm', key=s) else: stop_id = args.arg - data=ctabus.get_times(stop_id) + data = ctabus.get_times(stop_id) if args.periodic is not None: _done = False while not _done: try: - show(data,args.route,True) + show(data, args.route, True) s = time.perf_counter() data = ctabus.get_times(stop_id) e = time.perf_counter() - s if e < args.periodic: time.sleep(args.periodic-e) - except KeyboardInterrupt as e: + except KeyboardInterrupt: _done = True else: - show(data,args.route) + show(data, args.route) diff --git a/print2d.py b/print2d.py index 5f66c5f..100b993 100644 --- a/print2d.py +++ b/print2d.py @@ -1,31 +1,44 @@ import datetime from pydoc import pager -def str_coerce(s,**kwargs): - if isinstance(s,datetime.datetime): + + +def str_coerce(s, **kwargs): + if isinstance(s, datetime.datetime): return s.strftime(kwargs['datetime_format']) else: return str(s) -def print2d(l,datetime_format = "%A, %B %e, %Y %H:%M:%S",seperator= ' | ',spacer = ' ',bottom = '=',l_end = '|',r_end = '|',interactive = False): - l = [[str_coerce(s,datetime_format = datetime_format) for s in row] for row in l] + + +def print2d(list_param, + datetime_format="%A, %B %e, %Y %H:%M:%S", + seperator=' | ', + spacer=' ', + bottom='=', + l_end='|', r_end='|', + interactive=False + ): + list_param = [[str_coerce(s, datetime_format=datetime_format) + for s in row] for row in list_param] max_col = [] - for row in l: - for i,col in enumerate(row): + for row in list_param: + for i, col in enumerate(row): try: - max_col[i] = max(max_col[i],len(col)) + max_col[i] = max(max_col[i], len(col)) except IndexError: max_col.append(len(col)) fmt_row = '{content}' if l_end: - fmt_row = '{} {}'.format(l_end,fmt_row) + fmt_row = '{} {}'.format(l_end, fmt_row) if r_end: - fmt_row = '{} {}'.format(fmt_row,r_end) + fmt_row = '{} {}'.format(fmt_row, r_end) done = [] - for row in l: - content = seperator.join(col.ljust(max_col[i],spacer if i < len(row)-1 or r_end else ' ') for i,col in enumerate(row)) - done.append(fmt_row.format(content = content)) + for row in list_param: + content = seperator.join(col.ljust(max_col[i], spacer if i < len( + row)-1 or r_end else ' ') for i, col in enumerate(row)) + done.append(fmt_row.format(content=content)) if bottom: bottom = bottom*len(done[0]) @@ -34,10 +47,10 @@ def print2d(l,datetime_format = "%A, %B %e, %Y %H:%M:%S",seperator= ' | ',spacer row_sep = '\n' final = row_sep.join(done) if bottom: - final = '\n'.join((bottom,final,bottom)) + final = '\n'.join((bottom, final, bottom)) if interactive: if not bottom: final += '\n' pager(final) else: - return final \ No newline at end of file + return final diff --git a/search.py b/search.py index 4485405..ee58af8 100644 --- a/search.py +++ b/search.py @@ -1,42 +1,54 @@ import edlib -def editdistance(a,b): - return edlib.align(a,b)['editDistance'] import re import json + + +def editdistance(a, b): + return edlib.align(a, b)['editDistance'] + + class Search: - def __init__(self,query): + def __init__(self, query): self.raw_lower = query.lower() - def __call__(self,arg): + + def __call__(self, arg): arg = arg.lower() - return editdistance(self.raw_lower,arg) + return editdistance(self.raw_lower, arg) + def __str__(self): print(self.raw_lower) + def __repr__(self): return str(self) + + class StopSearch(Search): - def __init__(self,query): + def __init__(self, query): super().__init__(query) query = query.lower() - parts = re.split(r' ?(?:(?[^\)]+)\)',stop) - ret= [ - editdistance(self.query,stop), - editdistance(self.query_reversed,stop), + paren = re.search(r'\((?P[^\)]+)\)', stop) + ret = [ + editdistance(self.query, stop), + editdistance(self.query_reversed, stop), ] if paren: paren = paren.group('data') - ret.append(editdistance(self.query,paren)) + ret.append(editdistance(self.query, paren)) if self.raw_lower in stop: ret = (item - 100 for item in ret) return min( - ret - ) + ret + ) + def __str__(self): - return '{}|{}'.format(self.query,self.query_reversed) + return '{}|{}'.format(self.query, self.query_reversed) + if __name__ == "__main__": with open('stops_out.json') as file: @@ -44,4 +56,4 @@ if __name__ == "__main__": names = [stop['stpnm'] for stop in data['stops']] while True: q = StopSearch(input('Search: ')) - print('\n'.join(sorted(names,key=q)),end='\n'*3) \ No newline at end of file + print('\n'.join(sorted(names, key=q)), end='\n'*3)