Browse Source

blackened code

atexit
Raphael Roberts 7 years ago
parent
commit
000f44d29e
  1. 174
      ctabus/__init__.py
  2. 1
      ctabus/__main__.py
  3. 36
      ctabus/fetch.py
  4. 15
      ctabus/internal/config.py
  5. 12
      ctabus/internal/disk_cache.py
  6. 41
      ctabus/internal/print2d.py
  7. 31
      ctabus/internal/search.py
  8. 32
      setup.py

174
ctabus/__init__.py

@ -18,30 +18,35 @@ from ctabus import fetch
from ctabus.internal.config import log_dir, recent_list
from ctabus.internal.disk_cache import disk_cache, make_key
HAS_TOAST = shutil.which('termux-toast') is not None
HAS_TOAST = shutil.which("termux-toast") is not None
CHICAGO_TZ = tz.gettz("America/Chicago")
DATETIME_FORMAT = "%A, %B %e, %Y %H:%M:%S"
# https://stackoverflow.com/a/5967539
parser = argparse.ArgumentParser(prog='ctabus')
parser.add_argument('-l', '--lucky', action='store_true',
help='picks first result')
parser.add_argument('-p', '--periodic', metavar='SEC',
type=int, help='checks periodically')
parser.add_argument('-r', '--route', default=None)
parser.add_argument('-d', '--direction', default=None)
parser.add_argument('-t', '--disable_toast', action='store_false')
parser.add_argument('-k', '--kill-cache', action="store_true")
parser = argparse.ArgumentParser(prog="ctabus")
parser.add_argument("-l", "--lucky", action="store_true", help="picks first result")
parser.add_argument(
'arg', nargs='*' if len(recent_list.elements) > 0 else '+', metavar='(stop-id | cross streets)', default=[':recents:'])
"-p", "--periodic", metavar="SEC", type=int, help="checks periodically"
)
parser.add_argument("-r", "--route", default=None)
parser.add_argument("-d", "--direction", default=None)
parser.add_argument("-t", "--disable_toast", action="store_false")
parser.add_argument("-k", "--kill-cache", action="store_true")
parser.add_argument(
"arg",
nargs="*" if len(recent_list.elements) > 0 else "+",
metavar="(stop-id | cross streets)",
default=[":recents:"],
)
def toast(text):
read, write = os.pipe()
os.write(write, text.encode())
os.close(write)
subprocess.Popen(["termux-toast", "-s", "-g", "top", "-c", "white", "-b", "black"],
stdin=read)
subprocess.Popen(
["termux-toast", "-s", "-g", "top", "-c", "white", "-b", "black"], stdin=read
)
def atoi(text):
@ -49,44 +54,42 @@ def atoi(text):
def numb_sort(text):
'''
"""
alist.sort(key=natural_keys) sorts in human order
http://nedbatchelder.com/blog/200712/human_sorting.html
(See Toothy's implementation in the comments)
'''
return [atoi(c) for c in re.split(r'(\d+)', text)]
"""
return [atoi(c) for c in re.split(r"(\d+)", text)]
def clearscr():
os.system('cls' if os.name == 'nt' else 'clear')
os.system("cls" if os.name == "nt" else "clear")
def pprint_delta(delta):
delta = str(delta)
days = None
s1 = delta.split(', ')
s1 = delta.split(", ")
if len(s1) > 1:
days, time = s1
else:
time = s1[0]
time = time.split('.')[0]
hour, minute, second = map(int, time.split(':'))
time = ''
time = time.split(".")[0]
hour, minute, second = map(int, time.split(":"))
time = ""
if hour:
time += '{hour} hour'.format(hour=hour) + ('s' if hour != 1 else '')
time += "{hour} hour".format(hour=hour) + ("s" if hour != 1 else "")
if minute:
if time and not time.endswith(', '):
time += ', '
time += '{minute} minute'.format(minute=minute) + \
('s' if minute != 1 else '')
if time and not time.endswith(", "):
time += ", "
time += "{minute} minute".format(minute=minute) + ("s" if minute != 1 else "")
if second:
if time and not time.endswith(', '):
time += ', '
time += '{second} second'.format(second=second) + \
('s' if second != 1 else '')
ret = ''
if time and not time.endswith(", "):
time += ", "
time += "{second} second".format(second=second) + ("s" if second != 1 else "")
ret = ""
if days:
ret = days + ', ' if time else ''
ret = days + ", " if time else ""
ret += time
return ret
@ -100,35 +103,36 @@ class Table:
def sort_by(self, key_name, key_function):
if key_function is None:
def key(obj): return obj[key_name]
def key(obj):
return obj[key_name]
else:
def key(obj): return key_function(obj[key_name])
self.dicts = sorted(
self.dicts, key=key)
def key(obj):
return key_function(obj[key_name])
self.dicts = sorted(self.dicts, key=key)
def set_display_categories(self, categories):
self.display = [
[
_dict[column_name] for column_name in categories
] for _dict in self.dicts
[_dict[column_name] for column_name in categories] for _dict in self.dicts
]
def get_index_interactive(self):
from ctabus.internal.print2d import create_table, render_table
if self.display is None:
raise Exception("Display columns not set")
display = [
[i] + row for i, row in enumerate(self.display)
]
display = [[i] + row for i, row in enumerate(self.display)]
table = create_table(display, DATETIME_FORMAT)
render_table(table)
which = None
while which is None:
try:
which = int(input('Which one?: '))
which = int(input("Which one?: "))
if which >= len(self.dicts):
print("Max index is {}".format(len(self.dicts)-1))
print("Max index is {}".format(len(self.dicts) - 1))
except ValueError:
which = None
@ -136,6 +140,7 @@ class Table:
def get_name_interactive(self, data_column):
from ctabus.internal.print2d import create_table, render_table
if self.display is None:
raise Exception("Display columns not set")
display = self.display
@ -144,7 +149,7 @@ class Table:
which = None
valid = set(_dict[data_column] for _dict in self.dicts)
while which is None:
which = input('Which one?: ')
which = input("Which one?: ")
if which not in valid:
which = None
return which
@ -175,71 +180,67 @@ def gen_list(objs, data, *displays, key=None, sort=0, num_pic=True):
quit()
config = '''\
config = """\
{route} - {end} ({direction})
{nm}, stop {stop_id}
{delta} ({t})\
'''
"""
def show(data, rt_filter=None, _clear=False, enable_toast=False):
times = data['prd']
times = data["prd"]
now = datetime.datetime.now(CHICAGO_TZ)
arrivals = sorted(times, key=lambda t: t['prdtm'])
arrivals = sorted(times, key=lambda t: t["prdtm"])
if rt_filter is not None:
arrivals = filter(lambda arrival: arrival['rt'] == rt_filter, arrivals)
arrivals = filter(lambda arrival: arrival["rt"] == rt_filter, arrivals)
if _clear:
clearscr()
do_toast = True
for bustime in arrivals:
before = date_parse(bustime['prdtm'])
before = date_parse(bustime["prdtm"])
arrival = before.replace(tzinfo=CHICAGO_TZ)
if arrival > now:
stop_id = bustime['stpid']
delta = pprint_delta(arrival-now)
t = arrival.strftime('%H:%M:%S')
route = bustime['rt']
direction = bustime['rtdir']
end = bustime['des']
nm = bustime['stpnm'].rstrip()
stop_id = bustime["stpid"]
delta = pprint_delta(arrival - now)
t = arrival.strftime("%H:%M:%S")
route = bustime["rt"]
direction = bustime["rtdir"]
end = bustime["des"]
nm = bustime["stpnm"].rstrip()
if do_toast and enable_toast:
toast(config.format(**locals()) + '\n'*2+"\n")
toast(config.format(**locals()) + "\n" * 2 + "\n")
do_toast = False
print(
config.format(**locals()), end='\n'*2
)
print("="*36)
print(config.format(**locals()), end="\n" * 2)
print("=" * 36)
def _picker(args):
# save on import time slightly
from ctabus.internal.search import Search, StopSearch
# routes
if not args.route:
data = fetch.get_routes()['routes']
route = gen_list(data, 'rt', 'rt', 'rtnm',
num_pic=False, key=numb_sort)
data = fetch.get_routes()["routes"]
route = gen_list(data, "rt", "rt", "rtnm", num_pic=False, key=numb_sort)
else:
route = args.route
data = fetch.get_directions(route)['directions']
data = fetch.get_directions(route)["directions"]
# direction
if not args.direction:
for direction_obj in data:
friendly_name = fetch.get_name_from_direction(
route, direction_obj['dir'])
direction_obj['friendly_name'] = friendly_name
direction = gen_list(data, 'dir', 'dir', 'friendly_name')
friendly_name = fetch.get_name_from_direction(route, direction_obj["dir"])
direction_obj["friendly_name"] = friendly_name
direction = gen_list(data, "dir", "dir", "friendly_name")
else:
s = Search(args.direction)
direction = sorted((obj['dir'] for obj in data), key=s)[0]
direction = sorted((obj["dir"] for obj in data), key=s)[0]
# direction
stops = fetch.get_stops(route, direction)['stops']
stops = fetch.get_stops(route, direction)["stops"]
s = StopSearch(args.arg)
if args.lucky:
stop_id = sorted(stops, key=lambda stop: s(stop['stpnm']))[
0]['stpid']
stop_id = sorted(stops, key=lambda stop: s(stop["stpnm"]))[0]["stpid"]
else:
stop_id = gen_list(stops, 'stpid', 'stpnm', key=s)
stop_id = gen_list(stops, "stpid", "stpnm", key=s)
return stop_id
@ -249,8 +250,9 @@ def _picker_recent(args):
info = fetch.get_data_from_stop_id(stop)
recent_stops.append(info)
display_table = Table(recent_stops)
display_table.set_display_categories(['stop_id', 'stop_name',
'route_number', 'route_direction', 'route_name'])
display_table.set_display_categories(
["stop_id", "stop_name", "route_number", "route_direction", "route_name"]
)
index = display_table.get_index_interactive()
return recent_list.get(index)
@ -272,7 +274,7 @@ def _main_periodic(args, stop_id, init_data):
e = time.perf_counter() - s
print("Error fetching times")
if e < args.periodic:
time.sleep(args.periodic-e)
time.sleep(args.periodic - e)
except KeyboardInterrupt:
_done = True
@ -280,16 +282,16 @@ def _main_periodic(args, stop_id, init_data):
def main(args=None):
if args is None:
args = parser.parse_args()
sys.stderr = open(osp.join(log_dir, 'stderr.log'), 'w')
sys.stderr = open(osp.join(log_dir, "stderr.log"), "w")
if args.kill_cache:
for cache_obj in disk_cache.caches:
cache_obj.delete_cache()
args.arg = ' '.join(args.arg)
args.arg = " ".join(args.arg)
from_recent = False
if args.arg.isdecimal():
stop_id = args.arg
else:
if args.arg == ':recents:':
if args.arg == ":recents:":
stop_id = _picker_recent(args)
from_recent = True
else:
@ -297,10 +299,10 @@ def main(args=None):
if not from_recent:
recent_list.add(stop_id)
data = fetch.get_times(stop_id)
info = data['prd'][0]
key = make_key(info['rt'], info['rtdir'], fetch.api, None)
info = data["prd"][0]
key = make_key(info["rt"], info["rtdir"], fetch.api, None)
if key not in fetch.get_name_from_direction.cache.keys():
fetch.get_name_from_direction.cache[key] = info['des']
fetch.get_name_from_direction.cache[key] = info["des"]
fetch.get_name_from_direction.fresh = True
if args.periodic is not None:
@ -309,6 +311,6 @@ def main(args=None):
show(data, args.route)
if __name__ == '__main__':
if __name__ == "__main__":
args = parser.parse_args()
main(args)

1
ctabus/__main__.py

@ -1,3 +1,4 @@
from ctabus import main
if __name__ == "__main__":
main()

36
ctabus/fetch.py

@ -8,56 +8,56 @@ from ctabus.internal.disk_cache import disk_cache
def get_data(type, api_key=api, timeout=None, **args):
base_url = "http://www.ctabustracker.com/bustime/api/v2/{type}?{query}"
args['key'] = api_key
args['format'] = 'json'
args["key"] = api_key
args["format"] = "json"
url = base_url.format(type=type, query=urlencode(args))
if timeout is not None:
response = urlopen(url, timeout=timeout)
else:
response = urlopen(url)
data = json.load(response)['bustime-response']
data = json.load(response)["bustime-response"]
try:
data['error']
data["error"]
raise Exception(str(data["error"]))
except KeyError:
return data
def get_times(stop_id, api_key=api, timeout=None):
return get_data('getpredictions', api_key, stpid=stop_id, timeout=timeout)
return get_data("getpredictions", api_key, stpid=stop_id, timeout=timeout)
@disk_cache
def get_routes(api_key=api, timeout=None):
return get_data('getroutes', api_key, timeout=timeout)
return get_data("getroutes", api_key, timeout=timeout)
@disk_cache
def get_directions(route, api_key=api, timeout=None):
return get_data('getdirections', api_key, rt=route, timeout=timeout)
return get_data("getdirections", api_key, rt=route, timeout=timeout)
@disk_cache
def get_stops(route, direction, api_key=api, timeout=None):
return get_data('getstops', api_key, rt=route, dir=direction,
timeout=timeout)
return get_data("getstops", api_key, rt=route, dir=direction, timeout=timeout)
@disk_cache
def get_name_from_direction(route, direction, api_key=api, timeout=None):
test_stop = get_stops(route, direction, api_key=api_key,
timeout=timeout)['stops'][0]['stpid']
return get_times(test_stop, api_key=api, timeout=timeout)['prd'][0]['des']
test_stop = get_stops(route, direction, api_key=api_key, timeout=timeout)["stops"][
0
]["stpid"]
return get_times(test_stop, api_key=api, timeout=timeout)["prd"][0]["des"]
@disk_cache
def get_data_from_stop_id(stop_id):
info = get_times(stop_id)['prd'][0]
info = get_times(stop_id)["prd"][0]
ret = {
'route_direction': info['rtdir'],
'route_name': info['des'],
'route_number': info['rt'],
'stop_id': stop_id,
'stop_name': info['stpnm'],
"route_direction": info["rtdir"],
"route_name": info["des"],
"route_number": info["rt"],
"stop_id": stop_id,
"stop_name": info["stpnm"],
}
return ret

15
ctabus/internal/config.py

@ -4,7 +4,7 @@ import os
import appdirs
app_dirs = appdirs.AppDirs('ctabus')
app_dirs = appdirs.AppDirs("ctabus")
config_dir = app_dirs.user_config_dir
cache_dir = app_dirs.user_cache_dir
log_dir = app_dirs.user_log_dir
@ -16,11 +16,14 @@ for dir in (config_dir, cache_dir, log_dir, state_dir):
recent_json = os.path.join(state_dir, "recent.json")
try:
with open(os.path.join(config_dir, 'api.txt')) as file:
with open(os.path.join(config_dir, "api.txt")) as file:
API_KEY = file.read().rstrip()
except FileNotFoundError:
raise FileNotFoundError("Please place your CTA Bus Tracker api key in a text file located at '{}'".format(
os.path.join(config_dir, 'api.txt')))
raise FileNotFoundError(
"Please place your CTA Bus Tracker api key in a text file located at '{}'".format(
os.path.join(config_dir, "api.txt")
)
)
class RecentList:
@ -37,7 +40,7 @@ class RecentList:
def add(self, element):
if element not in self.current:
if len(self.elements)+1 > self.maxsize:
if len(self.elements) + 1 > self.maxsize:
del self.elements[-1]
self.elements.insert(0, element)
self.fresh = True
@ -54,7 +57,7 @@ class RecentList:
def save(self):
if self.fresh:
with open(recent_json, 'w') as file:
with open(recent_json, "w") as file:
json.dump(self.elements, file, sort_keys=True, indent=4)

12
ctabus/internal/disk_cache.py

@ -8,12 +8,12 @@ from ctabus.internal.config import cache_dir
def make_key(*args, **kwargs):
return args, tuple(sorted(
kwargs.items(), key=lambda item: item[0]))
return args, tuple(sorted(kwargs.items(), key=lambda item: item[0]))
class disk_cache:
"""Decorator to make persistent cache"""
caches = []
use_lzma = True
@ -42,10 +42,10 @@ class disk_cache:
def load_cache(self):
try:
if disk_cache.use_lzma:
with lzma.open(self.fname, 'rb') as file:
with lzma.open(self.fname, "rb") as file:
cache = pickle.load(file)
else:
with open(self.fname, 'rb') as file:
with open(self.fname, "rb") as file:
cache = pickle.load(file)
self.fresh = False
except FileNotFoundError:
@ -55,10 +55,10 @@ class disk_cache:
def save_cache(self):
if disk_cache.use_lzma:
with lzma.open(self.fname, 'wb') as file:
with lzma.open(self.fname, "wb") as file:
pickle.dump(self.cache, file, pickle.HIGHEST_PROTOCOL)
else:
with open(self.fname, 'wb') as file:
with open(self.fname, "wb") as file:
pickle.dump(self.cache, file, pickle.HIGHEST_PROTOCOL)
def delete_cache(self):

41
ctabus/internal/print2d.py

@ -16,25 +16,25 @@ def getpager():
return plainpager
if not sys.stdin.isatty() or not sys.stdout.isatty():
return plainpager
use_pager = os.environ.get('MANPAGER') or os.environ.get('PAGER')
use_pager = os.environ.get("MANPAGER") or os.environ.get("PAGER")
if use_pager:
if sys.platform == 'win32': # pipes completely broken in Windows
if sys.platform == "win32": # pipes completely broken in Windows
return lambda text: tempfilepager(plain(text), use_pager)
elif os.environ.get('TERM') in ('dumb', 'emacs'):
elif os.environ.get("TERM") in ("dumb", "emacs"):
return lambda text: pipepager(plain(text), use_pager)
else:
return lambda text: pipepager(text, use_pager)
if os.environ.get('TERM') in ('dumb', 'emacs'):
if os.environ.get("TERM") in ("dumb", "emacs"):
return plainpager
if sys.platform == 'win32':
return lambda text: tempfilepager(plain(text), 'more <')
if hasattr(os, 'system') and os.system('(less) 2>/dev/null') == 0:
return lambda text: pipepager(text, 'less -X')
if sys.platform == "win32":
return lambda text: tempfilepager(plain(text), "more <")
if hasattr(os, "system") and os.system("(less) 2>/dev/null") == 0:
return lambda text: pipepager(text, "less -X")
def str_coerce(s, **kwargs):
if isinstance(s, datetime.datetime):
return s.strftime(kwargs['datetime_format'])
return s.strftime(kwargs["datetime_format"])
else:
return str(s)
@ -50,33 +50,34 @@ def create_table(list_param, datetime_format):
def fix_iteration(table: AsciiTable, max_width):
col_length = len(table.table_data[0])
average_size = ((max_width-1) // col_length)
average_size = average_size - \
(1 + table.padding_left + table.padding_right)
sizes = list(map(lambda init_size: 1 + table.padding_left +
init_size + table.padding_right, table.column_widths))
average_size = (max_width - 1) // col_length
average_size = average_size - (1 + table.padding_left + table.padding_right)
sizes = list(
map(
lambda init_size: 1 + table.padding_left + init_size + table.padding_right,
table.column_widths,
)
)
# pick the biggest column to work on
sorted_sizes = sorted(
range(col_length), key=lambda i: sizes[i], reverse=True)
sorted_sizes = sorted(range(col_length), key=lambda i: sizes[i], reverse=True)
workon = sorted_sizes[0]
# either do the maximum possible size or the average size, whichever is larger. The other columns will accommodate.
wrap_to = max(
(
average_size,
(max_width-1) - table.table_width +
table.column_widths[workon]
(max_width - 1) - table.table_width + table.column_widths[workon],
)
)
if wrap_to > 0:
for row in table.table_data:
row[workon] = fill(row[workon], wrap_to+1)
row[workon] = fill(row[workon], wrap_to + 1)
return True
else:
return False
def render_table(table: AsciiTable, interactive=True):
'''Do all wrapping to make the table fit in screen'''
"""Do all wrapping to make the table fit in screen"""
MAX_WIDTH = terminal_size()[0]
table.inner_row_border = True
if not table.table_width < MAX_WIDTH:

31
ctabus/internal/search.py

@ -5,7 +5,7 @@ import edlib
def editdistance(a, b):
return edlib.align(a, b)['editDistance']
return edlib.align(a, b)["editDistance"]
class Search:
@ -27,34 +27,29 @@ class StopSearch(Search):
def __init__(self, query):
super().__init__(query)
query = query.lower()
parts = re.split(r' ?(?:(?<!\w)and(?!\w)|&) ?', query)
self.query = ' & '.join(parts)
self.query_reversed = ' & '.join(reversed(parts))
parts = re.split(r" ?(?:(?<!\w)and(?!\w)|&) ?", query)
self.query = " & ".join(parts)
self.query_reversed = " & ".join(reversed(parts))
def __call__(self, stop):
stop = stop.lower()
paren = re.search(r'\((?P<data>[^\)]+)\)', stop)
ret = [
editdistance(self.query, stop),
editdistance(self.query_reversed, stop),
]
paren = re.search(r"\((?P<data>[^\)]+)\)", stop)
ret = [editdistance(self.query, stop), editdistance(self.query_reversed, stop)]
if paren:
paren = paren.group('data')
paren = paren.group("data")
ret.append(editdistance(self.query, paren))
if self.raw_lower in stop:
ret = (item - 100 for item in ret)
return min(
ret
)
return min(ret)
def __str__(self):
return '{}|{}'.format(self.query, self.query_reversed)
return "{}|{}".format(self.query, self.query_reversed)
if __name__ == "__main__":
with open('stops_out.json') as file:
with open("stops_out.json") as file:
data = json.load(file)
names = [stop['stpnm'] for stop in data['stops']]
names = [stop["stpnm"] for stop in data["stops"]]
while True:
q = StopSearch(input('Search: '))
print('\n'.join(sorted(names, key=q)), end='\n'*3)
q = StopSearch(input("Search: "))
print("\n".join(sorted(names, key=q)), end="\n" * 3)

32
setup.py

@ -1,26 +1,22 @@
from setuptools import setup, find_packages
with open('requirements.txt') as file:
INSTALL_REQUIRES = file.read().rstrip().split('\n')
with open("requirements.txt") as file:
INSTALL_REQUIRES = file.read().rstrip().split("\n")
setup(
name='ctabus',
version='2.1.3',
description='Python package for tracking cta bus times',
name="ctabus",
version="2.1.3",
description="Python package for tracking cta bus times",
install_requires=INSTALL_REQUIRES,
author='rlbr',
author_email='raphael.roberts48@gmail.com',
author="rlbr",
author_email="raphael.roberts48@gmail.com",
packages=find_packages(),
entry_points={
'console_scripts': ['ctabus=ctabus:main']
},
entry_points={"console_scripts": ["ctabus=ctabus:main"]},
classifiers=[
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
]
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
],
)
Loading…
Cancel
Save