|
|
@ -0,0 +1,69 @@ |
|
|
|
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
|
|
|
from io import StringIO |
|
|
|
|
|
import os |
|
|
|
|
|
|
|
|
|
|
|
import subprocess |
|
|
|
|
|
|
|
|
|
|
|
EXECUTOR = ThreadPoolExecutor() |
|
|
|
|
|
NOTIFICATION_EXE = "termux-notification" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NotificationManager: |
|
|
|
|
|
NEXT_ID = 0 |
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, workdir="."): |
|
|
|
|
|
self.workdir = workdir |
|
|
|
|
|
self.id = NotificationManager.NEXT_ID |
|
|
|
|
|
NotificationManager.NEXT_ID += 1 |
|
|
|
|
|
self.message = None |
|
|
|
|
|
self.listener_future = None |
|
|
|
|
|
self.listen_fifo_path = None |
|
|
|
|
|
self.title = None |
|
|
|
|
|
|
|
|
|
|
|
def set_title(self, title): |
|
|
|
|
|
self.title = title |
|
|
|
|
|
self.show() |
|
|
|
|
|
|
|
|
|
|
|
def say(self, message): |
|
|
|
|
|
self.message = message |
|
|
|
|
|
if self.listener_future is None: |
|
|
|
|
|
self.setup_listener() |
|
|
|
|
|
self.show() |
|
|
|
|
|
|
|
|
|
|
|
def _listen(self): |
|
|
|
|
|
stop_cond = str(id) |
|
|
|
|
|
with open(self.listen_fifo_path) as fifo: |
|
|
|
|
|
if fifo.read(len(stop_cond)) == stop_cond: |
|
|
|
|
|
self._cleanup() |
|
|
|
|
|
|
|
|
|
|
|
def setup_listener(self): |
|
|
|
|
|
self.listen_fifo_path = os.path.abspath( |
|
|
|
|
|
os.path.join(self.workdir, "{}.{}".format(self.__class__, self.id)) |
|
|
|
|
|
) |
|
|
|
|
|
os.mkfifo(self.listen_fifo_path) |
|
|
|
|
|
self.listener_future = EXECUTOR.submit(self._listen) |
|
|
|
|
|
|
|
|
|
|
|
def cleanup(self): |
|
|
|
|
|
os.remove(self.listen_fifo_path) |
|
|
|
|
|
|
|
|
|
|
|
def show(self): |
|
|
|
|
|
cmdline = [NOTIFICATION_EXE] |
|
|
|
|
|
if self.message is None: |
|
|
|
|
|
message = "Placeholder" |
|
|
|
|
|
else: |
|
|
|
|
|
message = self.message |
|
|
|
|
|
if self.title is None: |
|
|
|
|
|
title = "Placeholder" |
|
|
|
|
|
else: |
|
|
|
|
|
title = self.title |
|
|
|
|
|
|
|
|
|
|
|
cmdline += ["--title", self.title] |
|
|
|
|
|
cmdline += [ |
|
|
|
|
|
"--on-delete", |
|
|
|
|
|
"echo {} > {}".format(self.id, self.listen_fifo_path), |
|
|
|
|
|
] |
|
|
|
|
|
subprocess.Popen(cmdline, stdin=StringIO(message)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
nm = NotificationManager() |