diff --git a/ctabus/__init__.py b/ctabus/__init__.py index 627f077..bc961fb 100644 --- a/ctabus/__init__.py +++ b/ctabus/__init__.py @@ -18,30 +18,35 @@ from ctabus import fetch from ctabus.internal.config import log_dir, recent_list from ctabus.internal.disk_cache import disk_cache, make_key -HAS_TOAST = shutil.which('termux-toast') is not None +HAS_TOAST = shutil.which("termux-toast") is not None CHICAGO_TZ = tz.gettz("America/Chicago") DATETIME_FORMAT = "%A, %B %e, %Y %H:%M:%S" # https://stackoverflow.com/a/5967539 -parser = argparse.ArgumentParser(prog='ctabus') -parser.add_argument('-l', '--lucky', action='store_true', - help='picks first result') -parser.add_argument('-p', '--periodic', metavar='SEC', - type=int, help='checks periodically') -parser.add_argument('-r', '--route', default=None) -parser.add_argument('-d', '--direction', default=None) -parser.add_argument('-t', '--disable_toast', action='store_false') -parser.add_argument('-k', '--kill-cache', action="store_true") +parser = argparse.ArgumentParser(prog="ctabus") +parser.add_argument("-l", "--lucky", action="store_true", help="picks first result") parser.add_argument( - 'arg', nargs='*' if len(recent_list.elements) > 0 else '+', metavar='(stop-id | cross streets)', default=[':recents:']) + "-p", "--periodic", metavar="SEC", type=int, help="checks periodically" +) +parser.add_argument("-r", "--route", default=None) +parser.add_argument("-d", "--direction", default=None) +parser.add_argument("-t", "--disable_toast", action="store_false") +parser.add_argument("-k", "--kill-cache", action="store_true") +parser.add_argument( + "arg", + nargs="*" if len(recent_list.elements) > 0 else "+", + metavar="(stop-id | cross streets)", + default=[":recents:"], +) def toast(text): read, write = os.pipe() os.write(write, text.encode()) os.close(write) - subprocess.Popen(["termux-toast", "-s", "-g", "top", "-c", "white", "-b", "black"], - stdin=read) + subprocess.Popen( + ["termux-toast", "-s", "-g", "top", "-c", "white", "-b", "black"], stdin=read + ) def atoi(text): @@ -49,44 +54,42 @@ def atoi(text): def numb_sort(text): - ''' + """ alist.sort(key=natural_keys) sorts in human order http://nedbatchelder.com/blog/200712/human_sorting.html (See Toothy's implementation in the comments) - ''' - return [atoi(c) for c in re.split(r'(\d+)', text)] + """ + return [atoi(c) for c in re.split(r"(\d+)", text)] def clearscr(): - os.system('cls' if os.name == 'nt' else 'clear') + os.system("cls" if os.name == "nt" else "clear") def pprint_delta(delta): delta = str(delta) days = None - s1 = delta.split(', ') + s1 = delta.split(", ") if len(s1) > 1: days, time = s1 else: time = s1[0] - time = time.split('.')[0] - hour, minute, second = map(int, time.split(':')) - time = '' + time = time.split(".")[0] + hour, minute, second = map(int, time.split(":")) + time = "" if hour: - time += '{hour} hour'.format(hour=hour) + ('s' if hour != 1 else '') + time += "{hour} hour".format(hour=hour) + ("s" if hour != 1 else "") if minute: - if time and not time.endswith(', '): - time += ', ' - time += '{minute} minute'.format(minute=minute) + \ - ('s' if minute != 1 else '') + if time and not time.endswith(", "): + time += ", " + time += "{minute} minute".format(minute=minute) + ("s" if minute != 1 else "") if second: - if time and not time.endswith(', '): - time += ', ' - time += '{second} second'.format(second=second) + \ - ('s' if second != 1 else '') - ret = '' + if time and not time.endswith(", "): + time += ", " + time += "{second} second".format(second=second) + ("s" if second != 1 else "") + ret = "" if days: - ret = days + ', ' if time else '' + ret = days + ", " if time else "" ret += time return ret @@ -100,35 +103,36 @@ class Table: def sort_by(self, key_name, key_function): if key_function is None: - def key(obj): return obj[key_name] + + def key(obj): + return obj[key_name] + else: - def key(obj): return key_function(obj[key_name]) - self.dicts = sorted( - self.dicts, key=key) + def key(obj): + return key_function(obj[key_name]) + + self.dicts = sorted(self.dicts, key=key) def set_display_categories(self, categories): self.display = [ - [ - _dict[column_name] for column_name in categories - ] for _dict in self.dicts + [_dict[column_name] for column_name in categories] for _dict in self.dicts ] def get_index_interactive(self): from ctabus.internal.print2d import create_table, render_table + if self.display is None: raise Exception("Display columns not set") - display = [ - [i] + row for i, row in enumerate(self.display) - ] + display = [[i] + row for i, row in enumerate(self.display)] table = create_table(display, DATETIME_FORMAT) render_table(table) which = None while which is None: try: - which = int(input('Which one?: ')) + which = int(input("Which one?: ")) if which >= len(self.dicts): - print("Max index is {}".format(len(self.dicts)-1)) + print("Max index is {}".format(len(self.dicts) - 1)) except ValueError: which = None @@ -136,6 +140,7 @@ class Table: def get_name_interactive(self, data_column): from ctabus.internal.print2d import create_table, render_table + if self.display is None: raise Exception("Display columns not set") display = self.display @@ -144,7 +149,7 @@ class Table: which = None valid = set(_dict[data_column] for _dict in self.dicts) while which is None: - which = input('Which one?: ') + which = input("Which one?: ") if which not in valid: which = None return which @@ -175,71 +180,67 @@ def gen_list(objs, data, *displays, key=None, sort=0, num_pic=True): quit() -config = '''\ +config = """\ {route} - {end} ({direction}) {nm}, stop {stop_id} {delta} ({t})\ -''' +""" def show(data, rt_filter=None, _clear=False, enable_toast=False): - times = data['prd'] + times = data["prd"] now = datetime.datetime.now(CHICAGO_TZ) - arrivals = sorted(times, key=lambda t: t['prdtm']) + arrivals = sorted(times, key=lambda t: t["prdtm"]) if rt_filter is not None: - arrivals = filter(lambda arrival: arrival['rt'] == rt_filter, arrivals) + arrivals = filter(lambda arrival: arrival["rt"] == rt_filter, arrivals) if _clear: clearscr() do_toast = True for bustime in arrivals: - before = date_parse(bustime['prdtm']) + before = date_parse(bustime["prdtm"]) arrival = before.replace(tzinfo=CHICAGO_TZ) if arrival > now: - stop_id = bustime['stpid'] - delta = pprint_delta(arrival-now) - t = arrival.strftime('%H:%M:%S') - route = bustime['rt'] - direction = bustime['rtdir'] - end = bustime['des'] - nm = bustime['stpnm'].rstrip() + stop_id = bustime["stpid"] + delta = pprint_delta(arrival - now) + t = arrival.strftime("%H:%M:%S") + route = bustime["rt"] + direction = bustime["rtdir"] + end = bustime["des"] + nm = bustime["stpnm"].rstrip() if do_toast and enable_toast: - toast(config.format(**locals()) + '\n'*2+"\n") + toast(config.format(**locals()) + "\n" * 2 + "\n") do_toast = False - print( - config.format(**locals()), end='\n'*2 - ) - print("="*36) + print(config.format(**locals()), end="\n" * 2) + print("=" * 36) def _picker(args): # save on import time slightly from ctabus.internal.search import Search, StopSearch + # routes if not args.route: - data = fetch.get_routes()['routes'] - route = gen_list(data, 'rt', 'rt', 'rtnm', - num_pic=False, key=numb_sort) + data = fetch.get_routes()["routes"] + route = gen_list(data, "rt", "rt", "rtnm", num_pic=False, key=numb_sort) else: route = args.route - data = fetch.get_directions(route)['directions'] + data = fetch.get_directions(route)["directions"] # direction if not args.direction: for direction_obj in data: - friendly_name = fetch.get_name_from_direction( - route, direction_obj['dir']) - direction_obj['friendly_name'] = friendly_name - direction = gen_list(data, 'dir', 'dir', 'friendly_name') + friendly_name = fetch.get_name_from_direction(route, direction_obj["dir"]) + direction_obj["friendly_name"] = friendly_name + direction = gen_list(data, "dir", "dir", "friendly_name") else: s = Search(args.direction) - direction = sorted((obj['dir'] for obj in data), key=s)[0] + direction = sorted((obj["dir"] for obj in data), key=s)[0] # direction - stops = fetch.get_stops(route, direction)['stops'] + stops = fetch.get_stops(route, direction)["stops"] s = StopSearch(args.arg) if args.lucky: - stop_id = sorted(stops, key=lambda stop: s(stop['stpnm']))[ - 0]['stpid'] + stop_id = sorted(stops, key=lambda stop: s(stop["stpnm"]))[0]["stpid"] else: - stop_id = gen_list(stops, 'stpid', 'stpnm', key=s) + stop_id = gen_list(stops, "stpid", "stpnm", key=s) return stop_id @@ -249,8 +250,9 @@ def _picker_recent(args): info = fetch.get_data_from_stop_id(stop) recent_stops.append(info) display_table = Table(recent_stops) - display_table.set_display_categories(['stop_id', 'stop_name', - 'route_number', 'route_direction', 'route_name']) + display_table.set_display_categories( + ["stop_id", "stop_name", "route_number", "route_direction", "route_name"] + ) index = display_table.get_index_interactive() return recent_list.get(index) @@ -272,7 +274,7 @@ def _main_periodic(args, stop_id, init_data): e = time.perf_counter() - s print("Error fetching times") if e < args.periodic: - time.sleep(args.periodic-e) + time.sleep(args.periodic - e) except KeyboardInterrupt: _done = True @@ -280,16 +282,16 @@ def _main_periodic(args, stop_id, init_data): def main(args=None): if args is None: args = parser.parse_args() - sys.stderr = open(osp.join(log_dir, 'stderr.log'), 'w') + sys.stderr = open(osp.join(log_dir, "stderr.log"), "w") if args.kill_cache: for cache_obj in disk_cache.caches: cache_obj.delete_cache() - args.arg = ' '.join(args.arg) + args.arg = " ".join(args.arg) from_recent = False if args.arg.isdecimal(): stop_id = args.arg else: - if args.arg == ':recents:': + if args.arg == ":recents:": stop_id = _picker_recent(args) from_recent = True else: @@ -297,10 +299,10 @@ def main(args=None): if not from_recent: recent_list.add(stop_id) data = fetch.get_times(stop_id) - info = data['prd'][0] - key = make_key(info['rt'], info['rtdir'], fetch.api, None) + info = data["prd"][0] + key = make_key(info["rt"], info["rtdir"], fetch.api, None) if key not in fetch.get_name_from_direction.cache.keys(): - fetch.get_name_from_direction.cache[key] = info['des'] + fetch.get_name_from_direction.cache[key] = info["des"] fetch.get_name_from_direction.fresh = True if args.periodic is not None: @@ -309,6 +311,6 @@ def main(args=None): show(data, args.route) -if __name__ == '__main__': +if __name__ == "__main__": args = parser.parse_args() main(args) diff --git a/ctabus/__main__.py b/ctabus/__main__.py index 2afcd1a..f7af55d 100644 --- a/ctabus/__main__.py +++ b/ctabus/__main__.py @@ -1,3 +1,4 @@ from ctabus import main + if __name__ == "__main__": main() diff --git a/ctabus/fetch.py b/ctabus/fetch.py index 1a8ee2d..123a51e 100644 --- a/ctabus/fetch.py +++ b/ctabus/fetch.py @@ -8,56 +8,56 @@ from ctabus.internal.disk_cache import disk_cache def get_data(type, api_key=api, timeout=None, **args): base_url = "http://www.ctabustracker.com/bustime/api/v2/{type}?{query}" - args['key'] = api_key - args['format'] = 'json' + args["key"] = api_key + args["format"] = "json" url = base_url.format(type=type, query=urlencode(args)) if timeout is not None: response = urlopen(url, timeout=timeout) else: response = urlopen(url) - data = json.load(response)['bustime-response'] + data = json.load(response)["bustime-response"] try: - data['error'] + data["error"] raise Exception(str(data["error"])) except KeyError: return data def get_times(stop_id, api_key=api, timeout=None): - return get_data('getpredictions', api_key, stpid=stop_id, timeout=timeout) + return get_data("getpredictions", api_key, stpid=stop_id, timeout=timeout) @disk_cache def get_routes(api_key=api, timeout=None): - return get_data('getroutes', api_key, timeout=timeout) + return get_data("getroutes", api_key, timeout=timeout) @disk_cache def get_directions(route, api_key=api, timeout=None): - return get_data('getdirections', api_key, rt=route, timeout=timeout) + return get_data("getdirections", api_key, rt=route, timeout=timeout) @disk_cache def get_stops(route, direction, api_key=api, timeout=None): - return get_data('getstops', api_key, rt=route, dir=direction, - timeout=timeout) + return get_data("getstops", api_key, rt=route, dir=direction, timeout=timeout) @disk_cache def get_name_from_direction(route, direction, api_key=api, timeout=None): - test_stop = get_stops(route, direction, api_key=api_key, - timeout=timeout)['stops'][0]['stpid'] - return get_times(test_stop, api_key=api, timeout=timeout)['prd'][0]['des'] + test_stop = get_stops(route, direction, api_key=api_key, timeout=timeout)["stops"][ + 0 + ]["stpid"] + return get_times(test_stop, api_key=api, timeout=timeout)["prd"][0]["des"] @disk_cache def get_data_from_stop_id(stop_id): - info = get_times(stop_id)['prd'][0] + info = get_times(stop_id)["prd"][0] ret = { - 'route_direction': info['rtdir'], - 'route_name': info['des'], - 'route_number': info['rt'], - 'stop_id': stop_id, - 'stop_name': info['stpnm'], + "route_direction": info["rtdir"], + "route_name": info["des"], + "route_number": info["rt"], + "stop_id": stop_id, + "stop_name": info["stpnm"], } return ret diff --git a/ctabus/internal/config.py b/ctabus/internal/config.py index d5f85fe..87672dd 100644 --- a/ctabus/internal/config.py +++ b/ctabus/internal/config.py @@ -4,7 +4,7 @@ import os import appdirs -app_dirs = appdirs.AppDirs('ctabus') +app_dirs = appdirs.AppDirs("ctabus") config_dir = app_dirs.user_config_dir cache_dir = app_dirs.user_cache_dir log_dir = app_dirs.user_log_dir @@ -16,11 +16,14 @@ for dir in (config_dir, cache_dir, log_dir, state_dir): recent_json = os.path.join(state_dir, "recent.json") try: - with open(os.path.join(config_dir, 'api.txt')) as file: + with open(os.path.join(config_dir, "api.txt")) as file: API_KEY = file.read().rstrip() except FileNotFoundError: - raise FileNotFoundError("Please place your CTA Bus Tracker api key in a text file located at '{}'".format( - os.path.join(config_dir, 'api.txt'))) + raise FileNotFoundError( + "Please place your CTA Bus Tracker api key in a text file located at '{}'".format( + os.path.join(config_dir, "api.txt") + ) + ) class RecentList: @@ -37,7 +40,7 @@ class RecentList: def add(self, element): if element not in self.current: - if len(self.elements)+1 > self.maxsize: + if len(self.elements) + 1 > self.maxsize: del self.elements[-1] self.elements.insert(0, element) self.fresh = True @@ -54,7 +57,7 @@ class RecentList: def save(self): if self.fresh: - with open(recent_json, 'w') as file: + with open(recent_json, "w") as file: json.dump(self.elements, file, sort_keys=True, indent=4) diff --git a/ctabus/internal/disk_cache.py b/ctabus/internal/disk_cache.py index 3612e14..90c06bd 100644 --- a/ctabus/internal/disk_cache.py +++ b/ctabus/internal/disk_cache.py @@ -8,12 +8,12 @@ from ctabus.internal.config import cache_dir def make_key(*args, **kwargs): - return args, tuple(sorted( - kwargs.items(), key=lambda item: item[0])) + return args, tuple(sorted(kwargs.items(), key=lambda item: item[0])) class disk_cache: """Decorator to make persistent cache""" + caches = [] use_lzma = True @@ -42,10 +42,10 @@ class disk_cache: def load_cache(self): try: if disk_cache.use_lzma: - with lzma.open(self.fname, 'rb') as file: + with lzma.open(self.fname, "rb") as file: cache = pickle.load(file) else: - with open(self.fname, 'rb') as file: + with open(self.fname, "rb") as file: cache = pickle.load(file) self.fresh = False except FileNotFoundError: @@ -55,10 +55,10 @@ class disk_cache: def save_cache(self): if disk_cache.use_lzma: - with lzma.open(self.fname, 'wb') as file: + with lzma.open(self.fname, "wb") as file: pickle.dump(self.cache, file, pickle.HIGHEST_PROTOCOL) else: - with open(self.fname, 'wb') as file: + with open(self.fname, "wb") as file: pickle.dump(self.cache, file, pickle.HIGHEST_PROTOCOL) def delete_cache(self): diff --git a/ctabus/internal/print2d.py b/ctabus/internal/print2d.py index 0b13e92..eb7fb77 100644 --- a/ctabus/internal/print2d.py +++ b/ctabus/internal/print2d.py @@ -16,25 +16,25 @@ def getpager(): return plainpager if not sys.stdin.isatty() or not sys.stdout.isatty(): return plainpager - use_pager = os.environ.get('MANPAGER') or os.environ.get('PAGER') + use_pager = os.environ.get("MANPAGER") or os.environ.get("PAGER") if use_pager: - if sys.platform == 'win32': # pipes completely broken in Windows + if sys.platform == "win32": # pipes completely broken in Windows return lambda text: tempfilepager(plain(text), use_pager) - elif os.environ.get('TERM') in ('dumb', 'emacs'): + elif os.environ.get("TERM") in ("dumb", "emacs"): return lambda text: pipepager(plain(text), use_pager) else: return lambda text: pipepager(text, use_pager) - if os.environ.get('TERM') in ('dumb', 'emacs'): + if os.environ.get("TERM") in ("dumb", "emacs"): return plainpager - if sys.platform == 'win32': - return lambda text: tempfilepager(plain(text), 'more <') - if hasattr(os, 'system') and os.system('(less) 2>/dev/null') == 0: - return lambda text: pipepager(text, 'less -X') + if sys.platform == "win32": + return lambda text: tempfilepager(plain(text), "more <") + if hasattr(os, "system") and os.system("(less) 2>/dev/null") == 0: + return lambda text: pipepager(text, "less -X") def str_coerce(s, **kwargs): if isinstance(s, datetime.datetime): - return s.strftime(kwargs['datetime_format']) + return s.strftime(kwargs["datetime_format"]) else: return str(s) @@ -50,33 +50,34 @@ def create_table(list_param, datetime_format): def fix_iteration(table: AsciiTable, max_width): col_length = len(table.table_data[0]) - average_size = ((max_width-1) // col_length) - average_size = average_size - \ - (1 + table.padding_left + table.padding_right) - sizes = list(map(lambda init_size: 1 + table.padding_left + - init_size + table.padding_right, table.column_widths)) + average_size = (max_width - 1) // col_length + average_size = average_size - (1 + table.padding_left + table.padding_right) + sizes = list( + map( + lambda init_size: 1 + table.padding_left + init_size + table.padding_right, + table.column_widths, + ) + ) # pick the biggest column to work on - sorted_sizes = sorted( - range(col_length), key=lambda i: sizes[i], reverse=True) + sorted_sizes = sorted(range(col_length), key=lambda i: sizes[i], reverse=True) workon = sorted_sizes[0] # either do the maximum possible size or the average size, whichever is larger. The other columns will accommodate. wrap_to = max( ( average_size, - (max_width-1) - table.table_width + - table.column_widths[workon] + (max_width - 1) - table.table_width + table.column_widths[workon], ) ) if wrap_to > 0: for row in table.table_data: - row[workon] = fill(row[workon], wrap_to+1) + row[workon] = fill(row[workon], wrap_to + 1) return True else: return False def render_table(table: AsciiTable, interactive=True): - '''Do all wrapping to make the table fit in screen''' + """Do all wrapping to make the table fit in screen""" MAX_WIDTH = terminal_size()[0] table.inner_row_border = True if not table.table_width < MAX_WIDTH: diff --git a/ctabus/internal/search.py b/ctabus/internal/search.py index 26a68ca..3644797 100644 --- a/ctabus/internal/search.py +++ b/ctabus/internal/search.py @@ -5,7 +5,7 @@ import edlib def editdistance(a, b): - return edlib.align(a, b)['editDistance'] + return edlib.align(a, b)["editDistance"] class Search: @@ -27,34 +27,29 @@ class StopSearch(Search): def __init__(self, query): super().__init__(query) query = query.lower() - parts = re.split(r' ?(?:(?[^\)]+)\)', stop) - ret = [ - editdistance(self.query, stop), - editdistance(self.query_reversed, stop), - ] + paren = re.search(r"\((?P[^\)]+)\)", stop) + ret = [editdistance(self.query, stop), editdistance(self.query_reversed, stop)] if paren: - paren = paren.group('data') + paren = paren.group("data") ret.append(editdistance(self.query, paren)) if self.raw_lower in stop: ret = (item - 100 for item in ret) - return min( - ret - ) + return min(ret) def __str__(self): - return '{}|{}'.format(self.query, self.query_reversed) + return "{}|{}".format(self.query, self.query_reversed) if __name__ == "__main__": - with open('stops_out.json') as file: + with open("stops_out.json") as file: data = json.load(file) - names = [stop['stpnm'] for stop in data['stops']] + names = [stop["stpnm"] for stop in data["stops"]] while True: - q = StopSearch(input('Search: ')) - print('\n'.join(sorted(names, key=q)), end='\n'*3) + q = StopSearch(input("Search: ")) + print("\n".join(sorted(names, key=q)), end="\n" * 3) diff --git a/setup.py b/setup.py index 70a7139..4fd1755 100644 --- a/setup.py +++ b/setup.py @@ -1,26 +1,22 @@ from setuptools import setup, find_packages -with open('requirements.txt') as file: - INSTALL_REQUIRES = file.read().rstrip().split('\n') +with open("requirements.txt") as file: + INSTALL_REQUIRES = file.read().rstrip().split("\n") setup( - name='ctabus', - version='2.1.3', - description='Python package for tracking cta bus times', + name="ctabus", + version="2.1.3", + description="Python package for tracking cta bus times", install_requires=INSTALL_REQUIRES, - author='rlbr', - author_email='raphael.roberts48@gmail.com', + author="rlbr", + author_email="raphael.roberts48@gmail.com", packages=find_packages(), - entry_points={ - 'console_scripts': ['ctabus=ctabus:main'] - }, + entry_points={"console_scripts": ["ctabus=ctabus:main"]}, classifiers=[ - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python :: 3 :: Only', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - - ] - + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.5", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + ], )