Source code for schneider.agent

"""schneider agent module


"""

from copy import copy
from os import access, chmod, environ, R_OK, makedirs, kill, unlink
from os.path import dirname, expanduser, isdir, join
from signal import SIGTERM
from subprocess import run, SubprocessError
from sys import exit, stderr


class AgentNotCreated(Exception):
    """No associated ssh-agent created beforehand"""

    pass


[docs]class SSHAgent: """SSH Agent class Manages an associated ssh-agent process and kann add keys to it. Example:: agent = schneider.agent.SSHAgent("my_agent") agent.create() agent.add_key("~/.ssh/keys/my_key") """ def __init__(self, name, base_path="~/.ssh"): """Constructor. :param: name - agents name :param: base_path - used for ssh-agent files; defaults to `~./ssh` """ self.name = name self.identity = None self.path = None # check for path self.agent_file = join(expanduser(base_path), "agents", "agent-"+name) self.auth_sock = None self.pid = None def __del__(self): """Destructor. Kills the associated ssh-agent if necessary. """ if self.pid!=None: self.destroy() def __str__(self): return f"Name: {self.name}\tPID: {self.pid}\tAgent: {self.agent_file}"
[docs] def create(self): """Create a new ssh agent process. If the ssh-agent creation raises a `SubprocessError`, the method prints an error message and exits with returncode 1. :return: functional Agent instance """ # check if dir exists; if not, create it with restricted permissions if not isdir(dirname(self.agent_file)): makedirs(dirname(self.agent_file)) chmod(dirname(self.agent_file), 0o700) try: proc = run(["/usr/bin/env", "-i", "/bin/sh", "-c", "ssh-agent > {0}".format(self.agent_file)]) proc.check_returncode() except SubprocessError as err: print("cannot create {}".format(self.agent_file)) exit(1) else: # read details from agent file self.__read_agent_file()
def __read_agent_file(self): if access(self.agent_file, R_OK): with open(self.agent_file, "r") as handle: for line in handle: parts = line.split("=") if len(parts)>1: if parts[0] == "SSH_AUTH_SOCK": self.auth_sock = parts[1].split(";")[0] elif parts[0] == "SSH_AGENT_PID": self.pid = int(parts[1].split(";")[0]) else: print("unknown line in agent file: {}. ignoring".format(line)) exit(2) else: # does not need the echo line in the file pass
[docs] def destroy(self): """Destroy the related ssh-agent process. Will be called automatically at instance deletion. """ kill(self.pid, SIGTERM) unlink(self.agent_file) self.pid = None self.auth_sock = None
[docs] def add_key(self, path): """Add a new key to the ssh-agent process.""" # check if agent was "created" before if self.auth_sock == None: raise AgentNotCreated(self) # `/bin/sh -c ". <agent file> >/dev/null 2>/dev/null; ssh-add key_file"` env = copy(environ) env["SSH_AUTH_SOCK"] = self.auth_sock env["SSH_AGENT_PID"] = str(self.pid) try: proc = run(["ssh-add", expanduser(path)], env=env) proc.check_returncode() except (SubprocessError, FileNotFoundError) as err: print("cannot add key from {}".format(path), file=stderr) print("killing ssh-agent", file=stderr) self.destroy() exit(2)
[docs] def env(self): """Return the environment variables for this ssh-agent process. Raises `AgentNotCreated` if no ssh-agent was created before. :return: Dictionary with `SSH_AUTH_SOCK` and `SSH_AGENT_PID` :rtype: dict """ if self.pid == None: raise AgentNotCreated(self) else: return { "SSH_AUTH_SOCK": self.auth_sock, "SSH_AGENT_PID": str(self.pid), }