1 changed files with 109 additions and 0 deletions
@ -0,0 +1,109 @@ |
|||
import posixpath |
|||
|
|||
|
|||
class File: |
|||
"""File abstraction. Only real method is to get fullpath""" |
|||
|
|||
def __init__(self, name, parent_dir): |
|||
self.name = name |
|||
self.parent_dir = parent_dir |
|||
self.fullpath = None |
|||
|
|||
def get_fullpath(self, real_time=False): |
|||
if self.fullpath is None or real_time: |
|||
if self.parent is None: |
|||
self.fullpath = self.name |
|||
else: |
|||
self.fullpath = posixpath.join( |
|||
self.parent_dir.get_fullpath(real_time), self.name) |
|||
return self.fullpath |
|||
|
|||
|
|||
class Directory(File): |
|||
"""Directory abstraction with optional listener""" |
|||
|
|||
def __init__(self, name, is_listening=False): |
|||
super().__init__(name, None) |
|||
self.children = {} |
|||
self.is_listening = is_listening |
|||
if is_listening: |
|||
self.modified = False |
|||
|
|||
def __index__(self, key): |
|||
return self.children[key] |
|||
|
|||
def mark_modified(self): |
|||
if self.is_listening: |
|||
self.modified = True |
|||
if self.parent_dir: |
|||
self.parent_dir.mark_modified() |
|||
|
|||
def traverse(self, path, create_intermediate=False, notify=True): |
|||
"""Traverse filesystem returning either file or directory""" |
|||
if type(path) == str: |
|||
path = path.split('/') |
|||
|
|||
if len(path) == 1: |
|||
try: |
|||
return self[path[0]] |
|||
except KeyError: |
|||
raise FileNotFoundError |
|||
|
|||
_next, *rest = path |
|||
if _next == "..": |
|||
if self.parent_dir: |
|||
selected = self.parent_dir |
|||
else: |
|||
selected = self |
|||
elif _next == '.': |
|||
selected = self |
|||
else: |
|||
try: |
|||
selected = self[_next] |
|||
except KeyError: |
|||
if create_intermediate: |
|||
new = Directory(_next, self.is_listening) |
|||
self.add_child(new, notify) |
|||
selected = new |
|||
else: |
|||
raise FileNotFoundError |
|||
return selected.traverse( |
|||
rest, create_intermediate=create_intermediate, |
|||
notify=notify |
|||
) |
|||
|
|||
def add_child(self, child, notify=True): |
|||
if notify and self.is_listening: |
|||
self.mark_modified() |
|||
self.children[child.name] = child |
|||
child.parent_dir = self |
|||
|
|||
def remove_child(self, child_name, notify=True): |
|||
if notify and self.is_listening: |
|||
self.mark_modified() |
|||
del self[child_name] |
|||
|
|||
def iter_files(self): |
|||
for child in self.children.values(): |
|||
if isinstance(child, Directory): |
|||
yield from child.iter_files() |
|||
else: |
|||
yield child |
|||
|
|||
@classmethod |
|||
def from_dir_listing( |
|||
cls, |
|||
listing, |
|||
rootname="", |
|||
sep='/', |
|||
is_listening=False |
|||
): |
|||
"""Create a Directory object with files from file listing""" |
|||
root = Directory(rootname, is_listening=is_listening) |
|||
for file in listing: |
|||
parts, file = file.split(sep) |
|||
parent = root.traverse( |
|||
parts, create_intermediate=True, notify=False) |
|||
file = File(file) |
|||
parent.add_child(file, notify=False) |
|||
return root |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue