import datetime
from pathlib import Path
from subprocess import list2cmdline
from typing import Union

import paramiko

from openwrt_backup.config import HOSTNAME, NUMBER


def client_from_config(name: str, ssh_config_root: Path):
    """Create a connected SSH client from a Host entry in an ssh config file.

    The entry ``name`` is looked up in ``ssh_config_root / "config"`` and the
    host keys are loaded from ``ssh_config_root / "known_hosts"``.

    Args:
        name: Host alias to look up in the ssh config file.
        ssh_config_root: Directory holding the ``config`` and ``known_hosts``
            files.

    Returns:
        A ``paramiko`` ``SSHClient`` on which ``connect()`` has succeeded.

    Raises:
        Whatever ``open()``, the config parser or ``SSHClient.connect()``
        raise; the client is closed before a connect error is re-raised.
    """
    config = paramiko.config.SSHConfig()
    with open(ssh_config_root / "config") as file:
        config.parse(file)

    # Translate ssh-config option names into SSHClient.connect() keywords.
    info = config.lookup(name)
    info["username"] = info.pop("user", None)
    info["key_filename"] = info.pop("identityfile", None)
    info["port"] = int(info.pop("port", 22))

    # Keyword arguments handed to connect(); only these keys are taken over
    # from the config entry, every other config option is ignored.
    params = dict(
        hostname=None,
        port=22,
        username=None,
        password=None,
        pkey=None,
        key_filename=None,
        timeout=None,
        allow_agent=True,
        look_for_keys=True,
        compress=False,
        sock=None,
        gss_auth=False,
        gss_kex=False,
        gss_deleg_creds=True,
        gss_host=None,
        banner_timeout=None,
        auth_timeout=None,
        gss_trust_dns=True,
        passphrase=None,
        disabled_algorithms=None,
    )
    for key, value in info.items():
        if key in params:
            params[key] = value

    client = paramiko.client.SSHClient()
    client.load_host_keys(ssh_config_root / "known_hosts")
    try:
        client.connect(**params)
    except Exception:
        # Do not leave a half-open client behind when connecting fails.
        client.close()
        raise
    return client


def exec_remote(client: paramiko.SSHClient, command: Union[str, list]):
    """Execute ``command`` on the given connection.

    Args:
        client: Client whose ``exec_command`` is used to run the command.
        command: Either a ready-made command string, or a sequence of
            arguments that is joined into one string first.

    Returns:
        The ``(stdout, stderr)`` pair returned by ``exec_command``; the stdin
        channel is discarded.
    """
    if not isinstance(command, str):
        # NOTE(review): list2cmdline quotes according to the MS C runtime
        # rules, not POSIX shell rules -- confirm that is acceptable for the
        # remote shell before passing arguments with shell metacharacters.
        command = list2cmdline(command)
    _stdin, stdout, stderr = client.exec_command(command)
    return stdout, stderr


def get_next_number(backup_path: Path, hostname):
    """Return the next free backup number for ``hostname``.

    Every ``*.tar.gz`` file name directly inside ``backup_path`` is split on
    ``"__"``; ``HOSTNAME`` and ``NUMBER`` are the indices of the host name and
    of the sequence number within the resulting fields.

    Args:
        backup_path: Directory that is searched for existing backups.
        hostname: Host name a file must carry to be counted.

    Returns:
        The highest number found for ``hostname`` plus one, or ``0`` when no
        matching backup exists.
    """
    numbers = []
    for path in backup_path.glob("*.tar.gz"):
        fields = path.name.split("__")
        try:
            if fields[HOSTNAME] != hostname:
                continue
            numbers.append(int(fields[NUMBER]))
        except (IndexError, ValueError):
            # A file that does not follow the naming scheme must not reset
            # the counter for the well-formed backups next to it.
            continue
    return max(numbers, default=-1) + 1


def get_new_backup_filepath(backup_path: Path, time_format, hostname):
    """Build the path for the next backup archive of ``hostname``.

    Args:
        backup_path: Directory the new backup file belongs to.
        time_format: ``strftime`` format applied to the current local time.
        hostname: Host name placed in the first field of the file name.

    Returns:
        ``backup_path / "<hostname>__<number>__<timestamp>.tar.gz"`` where the
        number comes from ``get_next_number`` and is zero-padded to at least
        three digits.
    """
    next_number = get_next_number(backup_path, hostname)
    dt = datetime.datetime.today()
    return (
        backup_path
        / f"{hostname}__{next_number:03d}__{dt.strftime(time_format)}.tar.gz"
    )