Browse Source

Refactor of gen_list and added recently used list. Plans to add config

atexit v2.1
Raphael Roberts 7 years ago
parent
commit
d448db6d02
  1. 142
      ctabus/__init__.py
  2. 13
      ctabus/fetch.py
  3. 52
      ctabus/internal/config.py
  4. 35
      ctabus/internal/disk_cache.py
  5. 2
      setup.py

142
ctabus/__init__.py

@ -15,7 +15,7 @@ import time
import urllib
from ctabus import fetch
from ctabus.internal.config import log_dir
from ctabus.internal.config import log_dir, recent_list
from ctabus.internal.disk_cache import disk_cache, make_key
HAS_TOAST = shutil.which('termux-toast') is not None
@ -32,7 +32,8 @@ parser.add_argument('-r', '--route', default=None)
parser.add_argument('-d', '--direction', default=None)
parser.add_argument('-t', '--disable_toast', action='store_false')
parser.add_argument('-k', '--kill-cache', action="store_true")
parser.add_argument('arg', nargs='+', metavar='(stop-id | cross streets)')
parser.add_argument(
'arg', nargs='*' if len(recent_list.elements) > 0 else '+', metavar='(stop-id | cross streets)', default=[':recents:'])
def toast(text):
@ -90,44 +91,88 @@ def pprint_delta(delta):
return ret
def gen_list(objs, data, *displays, key=None, sort=0, num_pic=True):
from ctabus.internal.print2d import create_table, render_table
# sort based on column number
k = displays[sort]
display_data = {obj[k]: obj[data] for obj in objs}
srt_keys = sorted(display_data.keys(), key=key)
display = sorted(
[
[obj[d] for d in displays] for obj in objs
],
key=lambda row: key(row[sort]) if key else row[sort]
)
if num_pic:
display = [[i] + data for i, data in enumerate(display)]
table = create_table(display, DATETIME_FORMAT)
render_table(table)
if num_pic:
class Table:
imported = False
def __init__(self, dicts):
self.dicts = dicts
self.display = None
def sort_by(self, key_name, key_function):
if key_function is None:
def key(obj): return obj[key_name]
else:
def key(obj): return key_function(obj[key_name])
self.dicts = sorted(
self.dicts, key=key)
def set_display_categories(self, categories):
self.display = [
[
_dict[column_name] for column_name in categories
] for _dict in self.dicts
]
def get_index_interactive(self):
from ctabus.internal.print2d import create_table, render_table
if self.display is None:
raise Exception("Display columns not set")
display = [
[i] + row for i, row in enumerate(self.display)
]
table = create_table(display, DATETIME_FORMAT)
render_table(table)
which = None
while not which:
while which is None:
try:
which = input('Which one?: ')
except KeyboardInterrupt:
quit()
try:
which = srt_keys[int(which)]
except (ValueError, IndexError):
which = int(input('Which one?: '))
if which >= len(self.dicts):
print("Max index is {}".format(len(self.dicts)-1))
except ValueError:
which = None
return display_data[which]
else:
ret = None
while not ret:
try:
ret = display_data[input('Which one?: ')]
except KeyError:
pass
return ret
return which
def get_name_interactive(self, data_column):
from ctabus.internal.print2d import create_table, render_table
if self.display is None:
raise Exception("Display columns not set")
display = self.display
table = create_table(display, DATETIME_FORMAT)
render_table(table)
which = None
valid = set(_dict[data_column] for _dict in self.dicts)
while which is None:
which = input('Which one?: ')
if which not in valid:
which = None
return which
def gen_list(objs, data, *displays, key=None, sort=0, num_pic=True):
"""Returns an item from objs[data] based on name or index
:param objs: list of dicts that contain data and display
:param data: name of dictionary key to return
:param displays: things to show the user
:param key: function to use for sorting
:param sort: which column of the displays should be used for sorting
:param num_pic: indicate whether or not to use indices to pick value
"""
display_table = Table(objs)
display_table.sort_by(displays[sort], key)
display_table.set_display_categories(displays)
try:
if num_pic:
which = display_table.get_index_interactive()
return display_table.dicts[which][data]
else:
which = display_table.get_name_interactive(data)
return which
except KeyboardInterrupt:
quit()
config = '''\
@ -199,7 +244,15 @@ def _picker(args):
def _picker_recent(args):
pass
recent_stops = []
for stop in recent_list.elements:
info = fetch.get_data_from_stop_id(stop)
recent_stops.append(info)
display_table = Table(recent_stops)
display_table.set_display_categories(['stop_id', 'stop_name',
'route_number', 'route_direction', 'route_name'])
index = display_table.get_index_interactive()
return recent_list.get(index)
def _main_periodic(args, stop_id, init_data):
@ -232,15 +285,17 @@ def main(args=None):
for cache_obj in disk_cache.caches:
cache_obj.delete_cache()
args.arg = ' '.join(args.arg)
from_recent = False
if args.arg.isdecimal():
stop_id = args.arg
else:
if args.arg == ':recent:':
pass
if args.arg == ':recents:':
stop_id = _picker_recent(args)
from_recent = True
else:
stop_id = _picker(args)
if not from_recent:
recent_list.add(stop_id)
data = fetch.get_times(stop_id)
info = data['prd'][0]
key = make_key(info['rt'], info['rtdir'], fetch.api, None)
@ -252,9 +307,6 @@ def main(args=None):
_main_periodic(args, stop_id, data)
else:
show(data, args.route)
for cache_obj in disk_cache.caches:
if cache_obj.fresh:
cache_obj.save_cache()
if __name__ == '__main__':

13
ctabus/fetch.py

@ -48,3 +48,16 @@ def get_name_from_direction(route, direction, api_key=api, timeout=None):
test_stop = get_stops(route, direction, api_key=api_key,
timeout=timeout)['stops'][0]['stpid']
return get_times(test_stop, api_key=api, timeout=timeout)['prd'][0]['des']
@disk_cache
def get_data_from_stop_id(stop_id):
info = get_times(stop_id)['prd'][0]
ret = {
'route_direction': info['rtdir'],
'route_name': info['des'],
'route_number': info['rt'],
'stop_id': stop_id,
'stop_name': info['stpnm'],
}
return ret

52
ctabus/internal/config.py

@ -1,3 +1,5 @@
import atexit
import json
import os
import appdirs
@ -6,12 +8,60 @@ app_dirs = appdirs.AppDirs('ctabus')
config_dir = app_dirs.user_config_dir
cache_dir = app_dirs.user_cache_dir
log_dir = app_dirs.user_log_dir
for dir in (config_dir, cache_dir, log_dir):
state_dir = app_dirs.user_state_dir
for dir in (config_dir, cache_dir, log_dir, state_dir):
if not os.path.exists(dir):
os.makedirs(dir)
recent_json = os.path.join(state_dir, "recent.json")
try:
with open(os.path.join(config_dir, 'api.txt')) as file:
API_KEY = file.read().rstrip()
except FileNotFoundError:
raise FileNotFoundError("Please place your CTA Bus Tracker api key in a text file located at '{}'".format(
os.path.join(config_dir, 'api.txt')))
class RecentList:
def __init__(self, maxsize=10):
self.maxsize = maxsize
try:
with open(recent_json) as file:
self.elements = json.load(file)
self.fresh = False
except FileNotFoundError:
self.elements = []
self.fresh = True
def add(self, element):
if len(self.elements)+1 > self.maxsize:
del self.elements[-1]
self.elements.insert(0, element)
self.fresh = True
def get(self, element_name_or_index):
if type(element_name_or_index) == int:
index = element_name_or_index
else:
index = self.elements.index(element)
ret = self.elements.pop(index)
self.elements.insert(0, ret)
self.fresh = True
return ret
def save(self):
if self.fresh:
with open(recent_json, 'w') as file:
json.dump(self.elements, file, sort_keys=True, indent=4)
recent_list = RecentList()
def save_if_modified():
if recent_list.fresh:
recent_list.save()
atexit.register(save_if_modified)

35
ctabus/internal/disk_cache.py

@ -1,6 +1,7 @@
import lzma
import os
import pickle
import atexit
from ctabus.internal.config import cache_dir
@ -14,9 +15,14 @@ def make_key(*args, **kwargs):
class disk_cache:
"""Decorator to make persistent cache"""
caches = []
use_lzma = True
def __init__(self, func):
self.fname = "{}.{}.dc".format(func.__module__, func.__name__)
if disk_cache.use_lzma:
self.fname = "{}.{}.dc.lzma".format(func.__module__, func.__name__)
else:
self.fname = "{}.{}.dc".format(func.__module__, func.__name__)
self.fname = os.path.join(cache_dir, self.fname)
self.func = func
self.load_cache()
@ -35,19 +41,36 @@ class disk_cache:
def load_cache(self):
try:
with lzma.open(self.fname, 'rb') as file:
cache = pickle.load(file)
self.fresh = False
if disk_cache.use_lzma:
with lzma.open(self.fname, 'rb') as file:
cache = pickle.load(file)
else:
with open(self.fname, 'rb') as file:
cache = pickle.load(file)
self.fresh = False
except FileNotFoundError:
cache = {}
self.fresh = True
self.cache = cache
def save_cache(self):
with lzma.open(self.fname, 'wb') as file:
pickle.dump(self.cache, file, pickle.HIGHEST_PROTOCOL)
if disk_cache.use_lzma:
with lzma.open(self.fname, 'wb') as file:
pickle.dump(self.cache, file, pickle.HIGHEST_PROTOCOL)
else:
with open(self.fname, 'wb') as file:
pickle.dump(self.cache, file, pickle.HIGHEST_PROTOCOL)
def delete_cache(self):
os.remove(self.fname)
self.cache = {}
self.fresh = True
def save_if_modified():
for cache_obj in disk_cache.caches:
if cache_obj.fresh:
cache_obj.save_cache()
atexit.register(save_if_modified)

2
setup.py

@ -5,7 +5,7 @@ with open('requirements.txt') as file:
setup(
name='ctabus',
version='2.0',
version='2.1',
description='Python package for tracking cta bus times',
install_requires=INSTALL_REQUIRES,
author='rlbr',

Loading…
Cancel
Save