From 729fd2fbe7094a3fe4ea7c68dcd7ddf9c77d56c0 Mon Sep 17 00:00:00 2001 From: Raphael Roberts Date: Tue, 16 Apr 2019 17:15:54 -0500 Subject: [PATCH] Added directory abstraction --- pyadb/internal/directory.py | 109 ++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 pyadb/internal/directory.py diff --git a/pyadb/internal/directory.py b/pyadb/internal/directory.py new file mode 100644 index 0000000..0c24817 --- /dev/null +++ b/pyadb/internal/directory.py @@ -0,0 +1,109 @@ +import posixpath + + +class File: + """File abstraction. Only real method is to get fullpath""" + + def __init__(self, name, parent_dir): + self.name = name + self.parent_dir = parent_dir + self.fullpath = None + + def get_fullpath(self, real_time=False): + if self.fullpath is None or real_time: + if self.parent is None: + self.fullpath = self.name + else: + self.fullpath = posixpath.join( + self.parent_dir.get_fullpath(real_time), self.name) + return self.fullpath + + +class Directory(File): + """Directory abstraction with optional listener""" + + def __init__(self, name, is_listening=False): + super().__init__(name, None) + self.children = {} + self.is_listening = is_listening + if is_listening: + self.modified = False + + def __index__(self, key): + return self.children[key] + + def mark_modified(self): + if self.is_listening: + self.modified = True + if self.parent_dir: + self.parent_dir.mark_modified() + + def traverse(self, path, create_intermediate=False, notify=True): + """Traverse filesystem returning either file or directory""" + if type(path) == str: + path = path.split('/') + + if len(path) == 1: + try: + return self[path[0]] + except KeyError: + raise FileNotFoundError + + _next, *rest = path + if _next == "..": + if self.parent_dir: + selected = self.parent_dir + else: + selected = self + elif _next == '.': + selected = self + else: + try: + selected = self[_next] + except KeyError: + if create_intermediate: + new = Directory(_next, self.is_listening) + self.add_child(new, notify) + selected = new + else: + raise FileNotFoundError + return selected.traverse( + rest, create_intermediate=create_intermediate, + notify=notify + ) + + def add_child(self, child, notify=True): + if notify and self.is_listening: + self.mark_modified() + self.children[child.name] = child + child.parent_dir = self + + def remove_child(self, child_name, notify=True): + if notify and self.is_listening: + self.mark_modified() + del self[child_name] + + def iter_files(self): + for child in self.children.values(): + if isinstance(child, Directory): + yield from child.iter_files() + else: + yield child + + @classmethod + def from_dir_listing( + cls, + listing, + rootname="", + sep='/', + is_listening=False + ): + """Create a Directory object with files from file listing""" + root = Directory(rootname, is_listening=is_listening) + for file in listing: + parts, file = file.split(sep) + parent = root.traverse( + parts, create_intermediate=True, notify=False) + file = File(file) + parent.add_child(file, notify=False) + return root