Browse Source

The NotificationManager is working, still need to integrate it

atexit
Raphael Roberts 6 years ago
parent
commit
26a17f2fcf
  1. 89
      ctabus/internal/notification.py

89
ctabus/internal/notification.py

@ -1,68 +1,91 @@
from concurrent.futures import ThreadPoolExecutor
from io import StringIO
import os
import subprocess
import uuid
from ctabus.internal.config import state_dir
EXECUTOR = ThreadPoolExecutor()
NOTIFICATION_EXE = "termux-notification"
class NotificationManager:
NEXT_ID = 0
class NotificationException(Exception):
def __init__(self, message):
self.message = message
def __init__(self, workdir="."):
class NotificationManager:
def __init__(self, workdir=state_dir):
self.workdir = workdir
self.id = NotificationManager.NEXT_ID
NotificationManager.NEXT_ID += 1
self.id = uuid.uuid4().hex
self.title = None
self.message = None
self.listener_future = None
self.listen_fifo_path = None
self.title = None
self._posting = False
self._poster = None
self._live = True
def on_done(self):
pass
def set_title(self, title):
self.title = title
self.show()
def say(self, message):
self.message = message
if self.listener_future is None:
self.setup_listener()
self.show()
def _cleanup(self):
self.on_done()
self._live = False
os.remove(self.listen_fifo_path)
def _listen(self):
stop_cond = str(id)
with open(self.listen_fifo_path) as fifo:
if fifo.read(len(stop_cond)) == stop_cond:
self._cleanup()
def setup_listener(self):
out = None
while out != self.id:
with open(self.listen_fifo_path) as fifo:
out = fifo.read(len(self.id)).rstrip()
self._cleanup()
def _setup_listener(self):
self.listen_fifo_path = os.path.abspath(
os.path.join(self.workdir, "{}.{}".format(self.__class__, self.id))
os.path.join(self.workdir, "{}.{}".format(self.__class__.__name__, self.id))
)
os.mkfifo(self.listen_fifo_path)
self.listener_future = EXECUTOR.submit(self._listen)
def cleanup(self):
os.remove(self.listen_fifo_path)
def show(self):
cmdline = [NOTIFICATION_EXE]
if self.message is None:
message = "Placeholder"
else:
message = self.message
if self.title is None:
title = "Placeholder"
else:
title = self.title
cmdline += ["--title", self.title]
cmdline += [
if not self._live:
raise NotificationException(
"Attempted show on cleaned-up NotificationManager"
)
if self.listener_future is None:
self._setup_listener()
if self._posting:
self._poster.wait()
self._posting = False
cmdline = [
NOTIFICATION_EXE,
"--id",
self.id,
"--on-delete",
"echo {} > {}".format(self.id, self.listen_fifo_path),
]
subprocess.Popen(cmdline, stdin=StringIO(message))
message = self.message if self.message is not None else "Placeholder"
title = self.title if self.title is not None else "Placeholder"
cmdline += ["--title", title]
with subprocess.Popen(cmdline, stdin=subprocess.PIPE, text=True) as proc:
proc.communicate(input=message)
self._poster = proc
self._posting = True
if __name__ == "__main__":

Loading…
Cancel
Save