# Source code for schneider.agent
"""schneider agent module
"""
from copy import copy
from os import access, chmod, environ, R_OK, makedirs, kill, unlink
from os.path import dirname, expanduser, isdir, join
from signal import SIGTERM
from subprocess import run, SubprocessError
from sys import exit, stderr
class AgentNotCreated(Exception):
    """Raised when an operation needs an ssh-agent that was not created yet."""
class SSHAgent:
    """SSH Agent class.

    Manages an associated ssh-agent process and can add keys to it.  The
    output printed by ``ssh-agent`` on start-up is stored in
    ``<base_path>/agents/agent-<name>`` and parsed for the socket path and
    the process id.

    Example::

        agent = schneider.agent.SSHAgent("my_agent")
        agent.create()
        agent.add_key("~/.ssh/keys/my_key")
    """

    def __init__(self, name, base_path="~/.ssh"):
        """Constructor.

        :param name: agent's name; becomes part of the agent file name
        :param base_path: directory used for ssh-agent files; defaults to
            ``~/.ssh``
        """
        # Initialise the attributes read by __del__ first, so that a failure
        # while building the path below cannot break the destructor.
        self.pid = None
        self.auth_sock = None
        self.name = name
        self.identity = None
        self.path = None
        self.agent_file = join(expanduser(base_path), "agents", "agent-" + name)

    def __del__(self):
        """Destructor.

        Kills the associated ssh-agent if necessary.
        """
        # getattr: __init__ may have raised before ``pid`` was assigned.
        if getattr(self, "pid", None) is not None:
            self.destroy()

    def __str__(self):
        return f"Name: {self.name}\tPID: {self.pid}\tAgent: {self.agent_file}"

    def create(self):
        """Create a new ssh agent process.

        The agent's start-up output is written to ``self.agent_file`` and
        then parsed to fill ``auth_sock`` and ``pid``.

        If the ssh-agent creation raises a `SubprocessError`, the method
        prints an error message to stderr and exits with returncode 1.

        :return: this instance, now bound to the running agent
        """
        # check if dir exists; if not, create it with restricted permissions
        agent_dir = dirname(self.agent_file)
        if not isdir(agent_dir):
            makedirs(agent_dir, exist_ok=True)
            chmod(agent_dir, 0o700)

        try:
            # ``env -i`` starts the shell with an empty environment.  The
            # target file is handed over as positional parameter ``$1``
            # instead of being pasted into the command string, so paths with
            # spaces or shell metacharacters are handled safely.
            proc = run(["/usr/bin/env", "-i",
                        "/bin/sh", "-c",
                        'ssh-agent > "$1"', "sh", self.agent_file])
            proc.check_returncode()
        except SubprocessError:
            print("cannot create {}".format(self.agent_file), file=stderr)
            exit(1)

        # read details from agent file
        self.__read_agent_file()
        return self

    def __read_agent_file(self):
        """Parse the agent file and set ``auth_sock`` and ``pid``.

        Lines are expected to look like ``NAME=value; ...``.  Lines without
        an ``=`` are skipped.  Does nothing if the file is not readable.
        Exits with returncode 2 on a ``NAME=value`` line with an unknown name.
        """
        if not access(self.agent_file, R_OK):
            return

        with open(self.agent_file, "r") as handle:
            for line in handle:
                # partition (not split) so a '=' inside the value survives
                key, sep, rest = line.partition("=")
                if not sep:
                    # does not need the echo line in the file
                    continue

                value = rest.split(";")[0].strip()
                if key == "SSH_AUTH_SOCK":
                    self.auth_sock = value
                elif key == "SSH_AGENT_PID":
                    self.pid = int(value)
                else:
                    # NOTE(review): the message says "ignoring" but the
                    # process exits; behaviour kept as-is - confirm intent.
                    print("unknown line in agent file: {}. ignoring".format(line),
                          file=stderr)
                    exit(2)

    def destroy(self):
        """Destroy the related ssh-agent process.

        Sends SIGTERM to the agent, removes the agent file and resets
        ``pid`` and ``auth_sock``.  Safe to call repeatedly: an agent that
        was never created, a process that is already gone, or a missing
        agent file is not treated as an error.

        Will be called automatically at instance deletion.
        """
        # Clear the state before acting, so an unexpected error below cannot
        # make __del__ attempt the same failing clean-up again.
        pid, self.pid, self.auth_sock = self.pid, None, None

        if pid is not None:
            try:
                kill(pid, SIGTERM)
            except ProcessLookupError:
                pass  # agent process already terminated

        try:
            unlink(self.agent_file)
        except FileNotFoundError:
            pass  # agent file already removed or never written

    def add_key(self, path):
        """Add a new key to the ssh-agent process.

        If ``ssh-add`` fails or cannot be found, the agent is destroyed and
        the method exits with returncode 2.

        :param path: path to the private key file; ``~`` is expanded
        :raises AgentNotCreated: if no ssh-agent was created before
        """
        # check if agent was "created" before
        if self.auth_sock is None:
            raise AgentNotCreated(self)

        # dict(environ) yields an independent snapshot.  copy(environ) would
        # give another os.environ proxy sharing the same data, so the two
        # assignments below would change this process's real environment.
        env = dict(environ)
        env["SSH_AUTH_SOCK"] = self.auth_sock
        env["SSH_AGENT_PID"] = str(self.pid)

        try:
            proc = run(["ssh-add", expanduser(path)], env=env)
            proc.check_returncode()
        except (SubprocessError, FileNotFoundError):
            print("cannot add key from {}".format(path), file=stderr)
            print("killing ssh-agent", file=stderr)
            self.destroy()
            exit(2)

    def env(self):
        """Return the environment variables for this ssh-agent process.

        Raises `AgentNotCreated` if no ssh-agent was created before.

        :return: Dictionary with `SSH_AUTH_SOCK` and `SSH_AGENT_PID`
        :rtype: dict
        """
        if self.pid is None:
            raise AgentNotCreated(self)

        return {
            "SSH_AUTH_SOCK": self.auth_sock,
            "SSH_AGENT_PID": str(self.pid),
        }