commit
5ff71cb9be
10 changed files with 271 additions and 0 deletions
-
2.gitignore
-
1MANIFEST.in
-
0requirements.txt
-
15setup.py
-
1ssh_config_utils/__init__.py
-
90ssh_config_utils/client_keywords.txt
-
57ssh_config_utils/config_file.py
-
34ssh_config_utils/host.py
-
40ssh_config_utils/parser.py
-
31ssh_config_utils/serializer.py
@ -0,0 +1,2 @@ |
|||||
|
/tests |
||||
|
__pycache__ |
||||
@ -0,0 +1 @@ |
|||||
|
include ssh_config_utils/client_keywords.txt |
||||
@ -0,0 +1,15 @@ |
|||||
|
from setuptools import setup, find_packages |
||||
|
|
||||
|
with open("requirements.txt") as file: |
||||
|
INSTALL_REQUIRES = file.read().rstrip().split("\n") |
||||
|
|
||||
|
setup( |
||||
|
name="ssh-config-utils", |
||||
|
version="1.0", |
||||
|
description="Easy way to modify and sync ssh config files", |
||||
|
install_requires=INSTALL_REQUIRES, |
||||
|
author="Raphael Roberts", |
||||
|
author_email="raphael.roberts48@gmail.com", |
||||
|
packages=find_packages(), |
||||
|
entry_points={"console_scripts": ["ssh-config=ssh_config_utils:main"]}, |
||||
|
) |
||||
@ -0,0 +1 @@ |
|||||
|
import argparse |
||||
@ -0,0 +1,90 @@ |
|||||
|
AddKeysToAgent |
||||
|
AddressFamily |
||||
|
BatchMode |
||||
|
BindAddress |
||||
|
BindInterface |
||||
|
CASignatureAlgorithms |
||||
|
CanonicalDomains |
||||
|
CanonicalizeFallbackLocal |
||||
|
CanonicalizeHostname |
||||
|
CanonicalizeMaxDots |
||||
|
CanonicalizePermittedCNAMEs |
||||
|
CertificateFile |
||||
|
ChallengeResponseAuthentication |
||||
|
CheckHostIP |
||||
|
Ciphers |
||||
|
ClearAllForwardings |
||||
|
Compression |
||||
|
ConnectTimeout |
||||
|
ConnectionAttempts |
||||
|
ControlMaster |
||||
|
ControlPath |
||||
|
ControlPersist |
||||
|
DynamicForward |
||||
|
EnableSSHKeysign |
||||
|
EscapeChar |
||||
|
ExitOnForwardFailure |
||||
|
FingerprintHash |
||||
|
ForwardAgent |
||||
|
ForwardX11 |
||||
|
ForwardX11Timeout |
||||
|
ForwardX11Trusted |
||||
|
GSSAPIAuthentication |
||||
|
GSSAPIDelegateCredentials |
||||
|
GatewayPorts |
||||
|
GlobalKnownHostsFile |
||||
|
HashKnownHosts |
||||
|
Host |
||||
|
HostKeyAlgorithms |
||||
|
HostKeyAlias |
||||
|
HostbasedAuthentication |
||||
|
HostbasedKeyTypes |
||||
|
Hostname |
||||
|
IPQoS |
||||
|
IdentitiesOnly |
||||
|
IdentityAgent |
||||
|
IdentityFile |
||||
|
IgnoreUnknown |
||||
|
Include |
||||
|
KbdInteractiveAuthentication |
||||
|
KbdInteractiveDevices |
||||
|
KexAlgorithms |
||||
|
LocalCommand |
||||
|
LocalForward |
||||
|
LogLevel |
||||
|
MACs |
||||
|
Match |
||||
|
NoHostAuthenticationForLocalhost |
||||
|
NumberOfPasswordPrompts |
||||
|
PKCS11Provider |
||||
|
PasswordAuthentication |
||||
|
PermitLocalCommand |
||||
|
Port |
||||
|
PreferredAuthentications |
||||
|
ProxyCommand |
||||
|
ProxyJump |
||||
|
ProxyUseFdpass |
||||
|
PubkeyAcceptedKeyTypes |
||||
|
PubkeyAuthentication |
||||
|
RekeyLimit |
||||
|
RemoteCommand |
||||
|
RemoteForward |
||||
|
RequestTTY |
||||
|
RevokedHostKeys |
||||
|
SendEnv |
||||
|
ServerAliveCountMax |
||||
|
ServerAliveInterval |
||||
|
SetEnv |
||||
|
StreamLocalBindMask |
||||
|
StreamLocalBindUnlink |
||||
|
StrictHostKeyChecking |
||||
|
SyslogFacility |
||||
|
TCPKeepAlive |
||||
|
Tunnel |
||||
|
TunnelDevice |
||||
|
UpdateHostKeys |
||||
|
User |
||||
|
UserKnownHostsFile |
||||
|
VerifyHostKeyDNS |
||||
|
VisualHostKey |
||||
|
XAuthLocation |
||||
@ -0,0 +1,57 @@ |
|||||
|
from pathlib import Path |
||||
|
|
||||
|
from ssh_config_utils.host import GlobalHost, Host |
||||
|
from ssh_config_utils.parser import parse_config_text |
||||
|
|
||||
|
|
||||
|
class ConfigFile: |
||||
|
def __init__(self, global_host, hosts): |
||||
|
self.hosts = {} |
||||
|
global_host = global_host |
||||
|
for host in hosts: |
||||
|
self.hosts.setdefault(host.name, host) |
||||
|
|
||||
|
@classmethod |
||||
|
def read_file(cls, fp): |
||||
|
if isinstance(fp, (str, bytes, Path)): |
||||
|
with open(fp) as file: |
||||
|
text = file.read() |
||||
|
else: |
||||
|
text = fp.read() |
||||
|
|
||||
|
data = parse_config_text(text) |
||||
|
global_host = None |
||||
|
hosts = [] |
||||
|
|
||||
|
for host_data in data: |
||||
|
for key, values in host_data.items(): |
||||
|
if len(values) == 1: |
||||
|
host_data[key] = values[0] |
||||
|
if host_data["host"] == "*": |
||||
|
if global_host is None: |
||||
|
del host_data["host"] |
||||
|
global_host = GlobalHost(host_data) |
||||
|
else: |
||||
|
name = host_data.pop("host") |
||||
|
hosts.append(Host(name, host_data)) |
||||
|
return cls(global_host, hosts) |
||||
|
|
||||
|
def __str__(self): |
||||
|
return self.format(indent=" " * 2, host_seperator="\n" * 2) |
||||
|
|
||||
|
def format(self, **format_data): |
||||
|
hosts = sorted(self.hosts.values(), key=lambda host: host.name) |
||||
|
return format_data["host_seperator"].join( |
||||
|
map(lambda host: host.format(**format_data), hosts) |
||||
|
) |
||||
|
|
||||
|
def write_file(self, fp, **format_data): |
||||
|
default_data = {"indent": " " * 2, "host_seperator": "\n" * 2} |
||||
|
default_data.update(format_data) |
||||
|
text = self.format(**default_data) |
||||
|
|
||||
|
if isinstance(fp, (str, bytes, Path)): |
||||
|
with open(fp, "w") as file: |
||||
|
file.write(text) |
||||
|
else: |
||||
|
text = fp.write(text) |
||||
@ -0,0 +1,34 @@ |
|||||
|
from ssh_config_utils.serializer import serialize_host, get_proper_name |
||||
|
from ssh_config_utils.parser import CamelCase_to_snake, KEYWORDS_LOWER_TRANSLATE |
||||
|
|
||||
|
|
||||
|
class Host: |
||||
|
def __init__(self, name, options): |
||||
|
self.options = options |
||||
|
self.name = name |
||||
|
|
||||
|
def __getattr__(self, name): |
||||
|
|
||||
|
if name in {"name", "options"}: |
||||
|
return super().__getattribute__(name) |
||||
|
return self.options[name] |
||||
|
|
||||
|
def __setattr__(self, name, value): |
||||
|
|
||||
|
if name in {"options", "name"}: |
||||
|
super().__setattr__(name, value) |
||||
|
else: |
||||
|
|
||||
|
proper_name = get_proper_name(name) |
||||
|
self.options[CamelCase_to_snake(proper_name)] = value |
||||
|
|
||||
|
def __str__(self): |
||||
|
return serialize_host(self.name, self.options, dict(indent=" " * 2)) |
||||
|
|
||||
|
def format(self, **format_data): |
||||
|
return serialize_host(self.name, self.options, format_data) |
||||
|
|
||||
|
|
||||
|
class GlobalHost(Host): |
||||
|
def __init__(self, options): |
||||
|
super().__init__("*", options) |
||||
@ -0,0 +1,40 @@ |
|||||
|
import os |
||||
|
import re |
||||
|
import shlex |
||||
|
|
||||
|
with open( |
||||
|
os.path.join(os.path.dirname(__file__), "client_keywords.txt") |
||||
|
) as keywords_file: |
||||
|
KEYWORDS = list(filter(bool, keywords_file.read().split("\n"))) |
||||
|
KEYWORDS_LOWER = list(map(str.lower, KEYWORDS)) |
||||
|
KEYWORDS_LOWER_TRANSLATE = dict(zip(KEYWORDS_LOWER, KEYWORDS)) |
||||
|
|
||||
|
|
||||
|
HOST_SPLITTER = re.compile( |
||||
|
r"host(?:.(?!\bhost\b))+", flags=re.MULTILINE | re.DOTALL | re.IGNORECASE |
||||
|
) |
||||
|
KEY_VALUE = re.compile(r"(?P<key>\S+)(?:[ \t]+|[ \t]*=[ \t]*)(?P<values>.*)") |
||||
|
|
||||
|
|
||||
|
def CamelCase_to_snake(text): # noqa |
||||
|
return "_".join(map(str.lower, re.findall(r"[A-Z][a-z]*", text))) |
||||
|
|
||||
|
|
||||
|
def parse_config_text(text): |
||||
|
text_no_leading_whitespace = re.sub(r"^[ \t]+", "", text, flags=re.MULTILINE) |
||||
|
text_no_comments = re.sub( |
||||
|
r"^\#.*", "", text_no_leading_whitespace, flags=re.MULTILINE |
||||
|
) |
||||
|
hosts = HOST_SPLITTER.finditer(text_no_comments) |
||||
|
for host in hosts: |
||||
|
host_dict = {} |
||||
|
for line in host.group(0).split("\n"): |
||||
|
match = KEY_VALUE.match(line) |
||||
|
if match: |
||||
|
key = match.group("key").lower() |
||||
|
if key in KEYWORDS_LOWER_TRANSLATE.keys(): |
||||
|
snake_case_key = CamelCase_to_snake(KEYWORDS_LOWER_TRANSLATE[key]) |
||||
|
if snake_case_key not in host_dict.keys(): |
||||
|
value = shlex.split(match.group("values")) |
||||
|
host_dict[snake_case_key] = value |
||||
|
yield host_dict |
||||
@ -0,0 +1,31 @@ |
|||||
|
from ssh_config_utils.parser import KEYWORDS_LOWER_TRANSLATE |
||||
|
|
||||
|
WHITESPACE = (" ", "\t") |
||||
|
|
||||
|
|
||||
|
def get_proper_name(name): |
||||
|
return KEYWORDS_LOWER_TRANSLATE[name.lower().replace("_", "")] |
||||
|
|
||||
|
|
||||
|
def quote(term): |
||||
|
|
||||
|
term = str(term) |
||||
|
if any(thing in term for thing in WHITESPACE): |
||||
|
term = '"{}"'.format(term) |
||||
|
return term |
||||
|
|
||||
|
|
||||
|
def serialize_host(name, data, format_data): |
||||
|
lines = ["Host {}".format(name)] |
||||
|
for key, value in sorted(data.items(), key=lambda item: item[0]): |
||||
|
|
||||
|
if isinstance(value, list): |
||||
|
for index in range(len(value)): |
||||
|
value[index] = quote(value[index]) |
||||
|
value = " ".join(value) |
||||
|
else: |
||||
|
value = quote(value) |
||||
|
lines.append( |
||||
|
"{}{} {}".format(format_data["indent"], get_proper_name(key), value) |
||||
|
) |
||||
|
return "\n".join(lines) |
||||
Write
Preview
Loading…
Cancel
Save
Reference in new issue