From d5a0870b806bf783851c44826ebdd83181a543dc Mon Sep 17 00:00:00 2001 From: Raphael Roberts Date: Thu, 25 Apr 2019 13:55:21 -0500 Subject: [PATCH] Moving things around to be more sane and starting on setup.py --- ctabus/__init__.py | 259 ++++++++++++++++++++++++++++++++++ ctabus/ctabus.py | 7 +- ctabus/internal/config.py | 14 ++ ctabus/internal/disk_cache.py | 2 +- ctabus/internal/print2d.py | 9 +- ctabus/internal/search.py | 3 +- ctabus/main.py | 256 --------------------------------- setup.py | 17 +++ 8 files changed, 302 insertions(+), 265 deletions(-) create mode 100644 ctabus/internal/config.py delete mode 100755 ctabus/main.py create mode 100644 setup.py diff --git a/ctabus/__init__.py b/ctabus/__init__.py index e69de29..5a1c0a5 100644 --- a/ctabus/__init__.py +++ b/ctabus/__init__.py @@ -0,0 +1,259 @@ +#!/usr/bin/python3 +from dateutil.parser import parse as date_parse +from dateutil import tz +import argparse +import datetime +import os +import re +import socket +import time +import urllib +import subprocess +import os.path as osp +import sys +import shutil + +from ctabus.internal.disk_cache import disk_cache, make_key +from ctabus import ctabus + +HAS_TOAST = shutil.which('termux-toast') is not None +CHICAGO_TZ = tz.gettz("America/Chicago") +DATETIME_FORMAT = "%A, %B %e, %Y %H:%M:%S" +# https://stackoverflow.com/a/5967539 + +parser = argparse.ArgumentParser(prog='ctabus') +parser.add_argument('-l', '--lucky', action='store_true', + help='picks first result') +parser.add_argument('-p', '--periodic', metavar='SEC', + type=int, help='checks periodically') +parser.add_argument('-r', '--route', default=None) +parser.add_argument('-d', '--direction', default=None) +parser.add_argument('-t', '--disable_toast', action='store_false') +parser.add_argument('-k', '--kill-cache', action="store_true") +parser.add_argument('arg', nargs='+', metavar='(stop-id | cross streets)') + + +def toast(text): + read, write = os.pipe() + os.write(write, text.encode()) + os.close(write) + subprocess.Popen(["termux-toast", "-g", "top", "-c", "white", "-b", "black"], + stdin=read) + + +def atoi(text): + return int(text) if text.isdigit() else text + + +def numb_sort(text): + ''' + alist.sort(key=natural_keys) sorts in human order + http://nedbatchelder.com/blog/200712/human_sorting.html + (See Toothy's implementation in the comments) + ''' + return [atoi(c) for c in re.split(r'(\d+)', text)] + + +def clearscr(): + os.system('cls' if os.name == 'nt' else 'clear') + + +def pprint_delta(delta): + delta = str(delta) + days = None + s1 = delta.split(', ') + if len(s1) > 1: + days, time = s1 + else: + time = s1[0] + time = time.split('.')[0] + hour, minute, second = map(int, time.split(':')) + time = '' + if hour: + time += '{hour} hour'.format(hour=hour) + ('s' if hour != 1 else '') + if minute: + if time and not time.endswith(', '): + time += ', ' + time += '{minute} minute'.format(minute=minute) + \ + ('s' if minute != 1 else '') + if second: + if time and not time.endswith(', '): + time += ', ' + time += '{second} second'.format(second=second) + \ + ('s' if second != 1 else '') + ret = '' + if days: + ret = days + ', ' if time else '' + ret += time + return ret + + +def gen_list(objs, data, *displays, key=None, sort=0, num_pic=True): + from print2d import create_table, render_table + # sort based on column number + k = displays[sort] + display_data = {obj[k]: obj[data] for obj in objs} + srt_keys = sorted(display_data.keys(), key=key) + + display = sorted( + [ + [obj[d] for d in displays] for obj in objs + ], + key=lambda row: key(row[sort]) if key else row[sort] + ) + if num_pic: + display = [[i] + data for i, data in enumerate(display)] + + table = create_table(display, DATETIME_FORMAT) + render_table(table) + if num_pic: + which = None + while not which: + try: + which = input('Which one?: ') + except KeyboardInterrupt: + quit() + try: + which = srt_keys[int(which)] + except (ValueError, IndexError): + which = None + return display_data[which] + else: + ret = None + while not ret: + try: + ret = display_data[input('Which one?: ')] + except KeyError: + pass + return ret + + +config = '''\ +{route} - {end} ({direction}) +{nm}, stop {stop_id} +{delta} ({t})\ +''' + + +def show(data, rt_filter=None, _clear=False, enable_toast=False): + times = data['prd'] + now = datetime.datetime.now(CHICAGO_TZ) + arrivals = sorted(times, key=lambda t: t['prdtm']) + if rt_filter is not None: + arrivals = filter(lambda arrival: arrival['rt'] == rt_filter, arrivals) + if _clear: + clearscr() + do_toast = True + for bustime in arrivals: + before = date_parse(bustime['prdtm']) + arrival = before.replace(tzinfo=CHICAGO_TZ) + if arrival > now: + stop_id = bustime['stpid'] + delta = pprint_delta(arrival-now) + t = arrival.strftime('%H:%M:%S') + route = bustime['rt'] + direction = bustime['rtdir'] + end = bustime['des'] + nm = bustime['stpnm'].rstrip() + if do_toast and enable_toast: + toast(config.format(**locals()) + '\n'*2+"\n") + do_toast = False + print( + config.format(**locals()), end='\n'*2 + ) + print("="*36) + + +def _picker(args): + # save on import time slightly + from search import Search, StopSearch + # routes + if not args.route: + data = ctabus.get_routes()['routes'] + route = gen_list(data, 'rt', 'rt', 'rtnm', + num_pic=False, key=numb_sort) + else: + route = args.route + data = ctabus.get_directions(route)['directions'] + # direction + if not args.direction: + for direction_obj in data: + friendly_name = ctabus.get_name_from_direction( + route, direction_obj['dir']) + direction_obj['friendly_name'] = friendly_name + direction = gen_list(data, 'dir', 'dir', 'friendly_name') + else: + s = Search(args.direction) + direction = sorted((obj['dir'] for obj in data), key=s)[0] + # direction + stops = ctabus.get_stops(route, direction)['stops'] + s = StopSearch(args.arg) + if args.lucky: + stop_id = sorted(stops, key=lambda stop: s(stop['stpnm']))[ + 0]['stpid'] + else: + stop_id = gen_list(stops, 'stpid', 'stpnm', key=s) + return stop_id + + +def _picker_recent(args): + pass + + +def _main_periodic(args, stop_id, init_data): + _done = False + data = init_data + while not _done: + try: + show(data, args.route, True, args.disable_toast and HAS_TOAST) + s = time.perf_counter() + timeout = 1 + if args.periodic > timeout: + timeout = args.periodic + data = ctabus.get_times(stop_id, timeout=timeout) + e = time.perf_counter() - s + except KeyboardInterrupt: + _done = True + except (urllib.error.URLError, socket.timeout): + e = time.perf_counter() - s + print("Error fetching times") + if e < args.periodic: + time.sleep(args.periodic-e) + + +def main(args=None): + if args is None: + args = parser.parse_args() + sys.stderr = open(osp.join(osp.dirname(__file__), 'stderr.log'), 'w') + if args.kill_cache: + for cache_obj in disk_cache.caches: + cache_obj.delete_cache() + args.arg = ' '.join(args.arg) + + if args.arg.isdecimal(): + stop_id = args.arg + else: + if args.arg == ':recent:': + pass + else: + stop_id = _picker(args) + + data = ctabus.get_times(stop_id) + info = data['prd'][0] + key = make_key(info['rt'], info['rtdir'], ctabus.api, None) + if key not in ctabus.get_name_from_direction.cache.keys(): + ctabus.get_name_from_direction.cache[key] = info['des'] + ctabus.get_name_from_direction.fresh = True + + if args.periodic is not None: + _main_periodic(args, stop_id, data) + else: + show(data, args.route) + for cache_obj in disk_cache.caches: + if cache_obj.fresh: + cache_obj.save_cache() + + +if __name__ == '__main__': + args = parser.parse_args() + main(args) diff --git a/ctabus/ctabus.py b/ctabus/ctabus.py index 6e968cd..3dd74c3 100644 --- a/ctabus/ctabus.py +++ b/ctabus/ctabus.py @@ -1,8 +1,9 @@ +import json from urllib.parse import urlencode from urllib.request import urlopen -from disk_cache import disk_cache -import json -from sensitive import api + +from ctabus.internal.disk_cache import disk_cache +from ctabus.internal.config import API_KEY as api def get_data(type, api_key=api, timeout=None, **args): diff --git a/ctabus/internal/config.py b/ctabus/internal/config.py new file mode 100644 index 0000000..4cf9455 --- /dev/null +++ b/ctabus/internal/config.py @@ -0,0 +1,14 @@ +import os + +import appdirs + +app_dirs = appdirs.AppDirs('ctabus') +config_dir = app_dirs.user_config_dir +try: + with open(os.path.join(config_dir, 'api.txt')) as file: + API_KEY = file.read().rstrip() +except FileNotFoundError: + if not os.path.exists(config_dir): + os.makedirs(config_dir) + raise FileNotFoundError("Please place your CTA Bus Tracker api key in a text file located at '{}'".format( + os.path.join(config_dir, 'api.txt'))) diff --git a/ctabus/internal/disk_cache.py b/ctabus/internal/disk_cache.py index b5a9b3c..acd7529 100644 --- a/ctabus/internal/disk_cache.py +++ b/ctabus/internal/disk_cache.py @@ -1,7 +1,7 @@ import pickle import os import lzma -cache_path = os.path.abspath(os.path.join(__file__, "..", "__pycache__")) +from ctabus.config import cache_path if not os.path.exists(cache_path): os.mkdir(cache_path) diff --git a/ctabus/internal/print2d.py b/ctabus/internal/print2d.py index fbf7851..2a53ac6 100644 --- a/ctabus/internal/print2d.py +++ b/ctabus/internal/print2d.py @@ -1,10 +1,11 @@ -from terminaltables.terminal_io import terminal_size -from terminaltables import AsciiTable -from textwrap import fill -from pydoc import pipepager, tempfilepager, plainpager, plain import datetime import os import sys +from pydoc import pipepager, tempfilepager, plainpager, plain + +from terminaltables.terminal_io import terminal_size +from terminaltables import AsciiTable +from textwrap import fill def getpager(): diff --git a/ctabus/internal/search.py b/ctabus/internal/search.py index ee58af8..1495bd4 100644 --- a/ctabus/internal/search.py +++ b/ctabus/internal/search.py @@ -1,7 +1,8 @@ -import edlib import re import json +import edlib + def editdistance(a, b): return edlib.align(a, b)['editDistance'] diff --git a/ctabus/main.py b/ctabus/main.py deleted file mode 100755 index 7cdf169..0000000 --- a/ctabus/main.py +++ /dev/null @@ -1,256 +0,0 @@ -#!/usr/bin/python3 -from dateutil.parser import parse as date_parse -from dateutil import tz -from disk_cache import disk_cache, make_key -import argparse -import ctabus -import datetime -import os -import re -import socket -import time -import urllib -import subprocess -# for logging -import os.path as osp -import sys -import shutil -HAS_TOAST = shutil.which('termux-toast') is not None -CHICAGO_TZ = tz.gettz("America/Chicago") -DATETIME_FORMAT = "%A, %B %e, %Y %H:%M:%S" -# https://stackoverflow.com/a/5967539 - -parser = argparse.ArgumentParser(prog='ctabus') -parser.add_argument('-l', '--lucky', action='store_true', - help='picks first result') -parser.add_argument('-p', '--periodic', metavar='SEC', - type=int, help='checks periodically') -parser.add_argument('-r', '--route', default=None) -parser.add_argument('-d', '--direction', default=None) -parser.add_argument('-t', '--disable_toast', action='store_false') -parser.add_argument('-k', '--kill-cache', action="store_true") -parser.add_argument('arg', nargs='+', metavar='(stop-id | cross streets)') - - -def toast(text): - read, write = os.pipe() - os.write(write, text.encode()) - os.close(write) - subprocess.Popen(["termux-toast", "-g", "top", "-c", "white", "-b", "black"], - stdin=read) - - -def atoi(text): - return int(text) if text.isdigit() else text - - -def numb_sort(text): - ''' - alist.sort(key=natural_keys) sorts in human order - http://nedbatchelder.com/blog/200712/human_sorting.html - (See Toothy's implementation in the comments) - ''' - return [atoi(c) for c in re.split(r'(\d+)', text)] - - -def clearscr(): - os.system('cls' if os.name == 'nt' else 'clear') - - -def pprint_delta(delta): - delta = str(delta) - days = None - s1 = delta.split(', ') - if len(s1) > 1: - days, time = s1 - else: - time = s1[0] - time = time.split('.')[0] - hour, minute, second = map(int, time.split(':')) - time = '' - if hour: - time += '{hour} hour'.format(hour=hour) + ('s' if hour != 1 else '') - if minute: - if time and not time.endswith(', '): - time += ', ' - time += '{minute} minute'.format(minute=minute) + \ - ('s' if minute != 1 else '') - if second: - if time and not time.endswith(', '): - time += ', ' - time += '{second} second'.format(second=second) + \ - ('s' if second != 1 else '') - ret = '' - if days: - ret = days + ', ' if time else '' - ret += time - return ret - - -def gen_list(objs, data, *displays, key=None, sort=0, num_pic=True): - from print2d import create_table, render_table - # sort based on column number - k = displays[sort] - display_data = {obj[k]: obj[data] for obj in objs} - srt_keys = sorted(display_data.keys(), key=key) - - display = sorted( - [ - [obj[d] for d in displays] for obj in objs - ], - key=lambda row: key(row[sort]) if key else row[sort] - ) - if num_pic: - display = [[i] + data for i, data in enumerate(display)] - - table = create_table(display, DATETIME_FORMAT) - render_table(table) - if num_pic: - which = None - while not which: - try: - which = input('Which one?: ') - except KeyboardInterrupt: - quit() - try: - which = srt_keys[int(which)] - except (ValueError, IndexError): - which = None - return display_data[which] - else: - ret = None - while not ret: - try: - ret = display_data[input('Which one?: ')] - except KeyError: - pass - return ret - - -config = '''\ -{route} - {end} ({direction}) -{nm}, stop {stop_id} -{delta} ({t})\ -''' - - -def show(data, rt_filter=None, _clear=False, enable_toast=False): - times = data['prd'] - now = datetime.datetime.now(CHICAGO_TZ) - arrivals = sorted(times, key=lambda t: t['prdtm']) - if rt_filter is not None: - arrivals = filter(lambda arrival: arrival['rt'] == rt_filter, arrivals) - if _clear: - clearscr() - do_toast = True - for bustime in arrivals: - before = date_parse(bustime['prdtm']) - arrival = before.replace(tzinfo=CHICAGO_TZ) - if arrival > now: - stop_id = bustime['stpid'] - delta = pprint_delta(arrival-now) - t = arrival.strftime('%H:%M:%S') - route = bustime['rt'] - direction = bustime['rtdir'] - end = bustime['des'] - nm = bustime['stpnm'].rstrip() - if do_toast and enable_toast: - toast(config.format(**locals()) + '\n'*2+"\n") - do_toast = False - print( - config.format(**locals()), end='\n'*2 - ) - print("="*36) - - -def _picker(args): - # save on import time slightly - from search import Search, StopSearch - # routes - if not args.route: - data = ctabus.get_routes()['routes'] - route = gen_list(data, 'rt', 'rt', 'rtnm', - num_pic=False, key=numb_sort) - else: - route = args.route - data = ctabus.get_directions(route)['directions'] - # direction - if not args.direction: - for direction_obj in data: - friendly_name = ctabus.get_name_from_direction( - route, direction_obj['dir']) - direction_obj['friendly_name'] = friendly_name - direction = gen_list(data, 'dir', 'dir', 'friendly_name') - else: - s = Search(args.direction) - direction = sorted((obj['dir'] for obj in data), key=s)[0] - # direction - stops = ctabus.get_stops(route, direction)['stops'] - s = StopSearch(args.arg) - if args.lucky: - stop_id = sorted(stops, key=lambda stop: s(stop['stpnm']))[ - 0]['stpid'] - else: - stop_id = gen_list(stops, 'stpid', 'stpnm', key=s) - return stop_id - - -def _picker_recent(args): - pass - - -def _main_periodic(args, stop_id, init_data): - _done = False - data = init_data - while not _done: - try: - show(data, args.route, True, args.disable_toast and HAS_TOAST) - s = time.perf_counter() - timeout = 1 - if args.periodic > timeout: - timeout = args.periodic - data = ctabus.get_times(stop_id, timeout=timeout) - e = time.perf_counter() - s - except KeyboardInterrupt: - _done = True - except (urllib.error.URLError, socket.timeout): - e = time.perf_counter() - s - print("Error fetching times") - if e < args.periodic: - time.sleep(args.periodic-e) - - -def main(args): - sys.stderr = open(osp.join(osp.dirname(__file__), 'stderr.log'), 'w') - if args.kill_cache: - for cache_obj in disk_cache.caches: - cache_obj.delete_cache() - args.arg = ' '.join(args.arg) - - if args.arg.isdecimal(): - stop_id = args.arg - else: - if args.arg == ':recent:': - pass - else: - stop_id = _picker(args) - - data = ctabus.get_times(stop_id) - info = data['prd'][0] - key = make_key(info['rt'], info['rtdir'], ctabus.api, None) - if key not in ctabus.get_name_from_direction.cache.keys(): - ctabus.get_name_from_direction.cache[key] = info['des'] - ctabus.get_name_from_direction.fresh = True - - if args.periodic is not None: - _main_periodic(args, stop_id, data) - else: - show(data, args.route) - for cache_obj in disk_cache.caches: - if cache_obj.fresh: - cache_obj.save_cache() - - -if __name__ == '__main__': - args = parser.parse_args() - main(args) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..a658514 --- /dev/null +++ b/setup.py @@ -0,0 +1,17 @@ +from setuptools import setup + +with open('requirements.txt') as file: + INSTALL_REQUIRES = file.rstrip().split('\n') + +setup( + name='ctabus', + version='1.0', + description='Python package for tracking cta bus times', + install_requires=INSTALL_REQUIRES, + author='rlbr', + author_email='raphael.roberts48@gmail.com', + packages=['ctabus'], + entry_points={ + 'console_scripts': ['ctabus=ctabus:main'] + } +)