You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
78 lines
2.3 KiB
78 lines
2.3 KiB
import datetime
import shlex
from pathlib import Path
from subprocess import list2cmdline
from typing import Union

import paramiko

from openwrt_backup.config import HOSTNAME, NUMBER
|
|
|
|
|
|
def client_from_config(name: str, ssh_config_root: Path):
    """Create a connected SSH client from a Host entry in the ssh config file.

    Args:
        name: Host alias to look up in the config file.
        ssh_config_root: Directory holding the ``config`` and ``known_hosts``
            files (typically the user's ``.ssh`` directory).

    Returns:
        A ``paramiko.client.SSHClient`` on which ``connect()`` has already
        been called.

    Raises:
        Whatever opening/parsing the config file, loading the host keys or
        connecting raises. The client is closed before the exception is
        propagated.
    """
    config = paramiko.config.SSHConfig()
    config_path = ssh_config_root / "config"
    with open(config_path) as file:
        config.parse(file)

    info = config.lookup(name)
    # Rename ssh_config option names to the keyword names used by connect().
    info["username"] = info.pop("user", None)
    info["key_filename"] = info.pop("identityfile", None)
    info["port"] = int(info.pop("port", 22))

    # Keyword arguments passed to connect(), with their default values.
    # Only options whose name appears here are taken from the config entry.
    params = dict(
        hostname=None,
        port=22,
        username=None,
        password=None,
        pkey=None,
        key_filename=None,
        timeout=None,
        allow_agent=True,
        look_for_keys=True,
        compress=False,
        sock=None,
        gss_auth=False,
        gss_kex=False,
        gss_deleg_creds=True,
        gss_host=None,
        banner_timeout=None,
        auth_timeout=None,
        gss_trust_dns=True,
        passphrase=None,
        disabled_algorithms=None,
    )
    params.update(
        {key: value for key, value in info.items() if key in params}
    )

    client = paramiko.client.SSHClient()
    try:
        client.load_host_keys(ssh_config_root / "known_hosts")
        client.connect(**params)
    except Exception:
        # Do not leak a half-initialised client when setup fails.
        client.close()
        raise
    return client
|
|
|
|
|
|
def exec_remote(client: paramiko.SSHClient, command: Union[str, list]):
    """Execute the specified command on an SSH connection.

    Args:
        client: Client whose ``exec_command`` method is used to run the
            command.
        command: Either a complete command line, which is sent unchanged, or
            a sequence of arguments. Each argument is converted with
            ``str()``, quoted for a POSIX shell and joined with spaces.

    Returns:
        The ``(stdout, stderr)`` pair from ``exec_command``; the stdin
        channel file is discarded.
    """
    if not isinstance(command, str):
        # subprocess.list2cmdline quotes for the MS C runtime, which leaves
        # characters such as $, `, ; and ' unprotected in a POSIX shell.
        command = " ".join(shlex.quote(str(arg)) for arg in command)
    _stdin, stdout, stderr = client.exec_command(command)
    return stdout, stderr
|
|
|
|
|
|
def get_next_number(backup_path: Path, hostname):
    """Return the sequence number for the next backup of ``hostname``.

    Every ``*.tar.gz`` file directly inside ``backup_path`` has its name
    split on ``"__"``; the fields at positions ``HOSTNAME`` and ``NUMBER``
    are taken as the host name and the sequence number.

    Args:
        backup_path: Directory that holds the backup archives.
        hostname: Host whose archives are counted.

    Returns:
        One more than the highest number found for ``hostname``, or 0 when
        no archive of that host exists.
    """
    numbers = []
    for path in backup_path.glob("*.tar.gz"):
        fields = path.name.split("__")
        try:
            if fields[HOSTNAME] != hostname:
                continue
            numbers.append(int(fields[NUMBER]))
        except (IndexError, ValueError):
            # Too few fields or a non-numeric number: not an archive in the
            # expected format. Skip it so that it cannot reset the counter.
            continue
    # default=-1 makes the first backup of a host number 0.
    return max(numbers, default=-1) + 1
|
|
|
|
|
|
def get_new_backup_filepath(backup_path: Path, time_format, hostname):
    """Build the path of the next backup archive for ``hostname``.

    The file name is ``<hostname>__<number>__<timestamp>.tar.gz``, where
    ``number`` comes from ``get_next_number`` (zero-padded to at least three
    digits) and ``timestamp`` is the current local time formatted with
    ``time_format``. The result is that name joined onto ``backup_path``.
    """
    sequence = get_next_number(backup_path, hostname)
    timestamp = datetime.datetime.today().strftime(time_format)
    archive_name = f"{hostname}__{sequence:03d}__{timestamp}.tar.gz"
    return backup_path / archive_name
|