You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

78 lines
2.3 KiB

  1. import datetime
  2. from pathlib import Path
  3. from subprocess import list2cmdline
  4. from typing import Union
  5. import paramiko
  6. from openwrt_backup.config import HOSTNAME, NUMBER
  7. def client_from_config(name: str, ssh_config_root: Path):
  8. """Create a client from a Host entry in the .ssh/config file"""
  9. config = paramiko.config.SSHConfig()
  10. cp = ssh_config_root / "config"
  11. with open(cp) as file:
  12. config.parse(file)
  13. info = config.lookup(name)
  14. info["username"] = info.pop("user", None)
  15. info["key_filename"] = info.pop("identityfile", None)
  16. info["port"] = int(info.pop("port", 22))
  17. params = dict(
  18. hostname=None,
  19. port=22,
  20. username=None,
  21. password=None,
  22. pkey=None,
  23. key_filename=None,
  24. timeout=None,
  25. allow_agent=True,
  26. look_for_keys=True,
  27. compress=False,
  28. sock=None,
  29. gss_auth=False,
  30. gss_kex=False,
  31. gss_deleg_creds=True,
  32. gss_host=None,
  33. banner_timeout=None,
  34. auth_timeout=None,
  35. gss_trust_dns=True,
  36. passphrase=None,
  37. disabled_algorithms=None,
  38. )
  39. for key, value in info.items():
  40. if key in params.keys():
  41. params[key] = value
  42. client = paramiko.client.SSHClient()
  43. client.load_host_keys(ssh_config_root / "known_hosts")
  44. client.connect(**params)
  45. return client
  46. def exec_remote(client: paramiko.SSHClient, command: Union[str, list]):
  47. """Function to execute specified command on conection"""
  48. if not isinstance(command, str):
  49. command = list2cmdline(command)
  50. stdin, stdout, stderr = client.exec_command(command)
  51. return stdout, stderr
  52. def get_next_number(backup_path: Path, hostname):
  53. f_names = (path.name for path in backup_path.glob("*.tar.gz"))
  54. f_names_fields = map(lambda s: s.split("__"), f_names)
  55. f_names_fields_filtered = filter(
  56. lambda fields: fields[HOSTNAME] == hostname, f_names_fields
  57. )
  58. try:
  59. return max(int(fields[NUMBER]) for fields in f_names_fields_filtered) + 1
  60. except ValueError:
  61. return 0
  62. def get_new_backup_filepath(backup_path: Path, time_format, hostname):
  63. next_number = get_next_number(backup_path, hostname)
  64. dt = datetime.datetime.today()
  65. return (
  66. backup_path
  67. / f"{hostname}__{next_number:03d}__{dt.strftime(time_format)}.tar.gz"
  68. )