You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

73 lines
2.2 KiB

  1. import datetime
  2. from pathlib import Path
  3. from subprocess import list2cmdline
  4. from typing import Union
  5. import paramiko
  6. def client_from_config(name: str, ssh_config_root: Path):
  7. """Create a client from a Host entry in the .ssh/config file"""
  8. config = paramiko.config.SSHConfig()
  9. cp = ssh_config_root / "config"
  10. with open(cp) as file:
  11. config.parse(file)
  12. info = config.lookup(name)
  13. info["username"] = info.pop("user", None)
  14. info["key_filename"] = info.pop("identityfile", None)
  15. info["port"] = int(info.pop("port", 22))
  16. params = dict(
  17. hostname=None,
  18. port=22,
  19. username=None,
  20. password=None,
  21. pkey=None,
  22. key_filename=None,
  23. timeout=None,
  24. allow_agent=True,
  25. look_for_keys=True,
  26. compress=False,
  27. sock=None,
  28. gss_auth=False,
  29. gss_kex=False,
  30. gss_deleg_creds=True,
  31. gss_host=None,
  32. banner_timeout=None,
  33. auth_timeout=None,
  34. gss_trust_dns=True,
  35. passphrase=None,
  36. disabled_algorithms=None,
  37. )
  38. for key, value in info.items():
  39. if key in params.keys():
  40. params[key] = value
  41. client = paramiko.client.SSHClient()
  42. client.load_host_keys(ssh_config_root / "known_hosts")
  43. client.connect(**params)
  44. return client
  45. def exec_remote(client: paramiko.SSHClient, command: Union[str, list]):
  46. """Function to execute specified command on conection"""
  47. if not isinstance(command, str):
  48. command = list2cmdline(command)
  49. stdin, stdout, stderr = client.exec_command(command)
  50. return stdout, stderr
  51. def get_next_number(backup_path: Path, hostname):
  52. f_names = (path.name for path in backup_path.glob("*.tar.gz"))
  53. f_names_fields = map(lambda s: s.split("__"), f_names)
  54. f_names_fields_filtered = filter(
  55. lambda fields: fields[1] == hostname, f_names_fields
  56. )
  57. try:
  58. return max(int(fields[0]) for fields in f_names_fields_filtered) + 1
  59. except ValueError:
  60. return 0
  61. def get_new_backup_filepath(backup_path: Path, time_format, hostname):
  62. next_number = get_next_number(backup_path, hostname)
  63. dt = datetime.datetime.today()
  64. return backup_path / f"{next_number}__{hostname}__{dt.strftime(time_format)}.tar.gz"