Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import inspect
import sys
import zlib
import mitogen.fakessh
import mitogen.fork
import mitogen.master
import mitogen.minify
import mitogen.parent
import mitogen.select
import mitogen.service
import mitogen.ssh
import mitogen.sudo
router = mitogen.master.Router()
context = mitogen.parent.Context(router, 0)
stream = mitogen.ssh.Stream(router, 0, max_message_size=0, hostname='foo')
print('SSH command size: %s' % (len(' '.join(stream.get_boot_command())),))
print('Preamble size: %s (%.2fKiB)' % (
len(stream.get_preamble()),
len(stream.get_preamble()) / 1024.0,
))
if '--dump' in sys.argv:
print(zlib.decompress(stream.get_preamble()))
exit()
print(
' '
' '
' Original '
elif state == 'in_plain':
line, nl, buf = bytes_partition(buf, b('\n'))
yield line + nl, not (nl or buf)
if nl:
state = 'start_of_line'
class PasswordError(mitogen.core.StreamError):
pass
class HostKeyError(mitogen.core.StreamError):
pass
class Stream(mitogen.parent.Stream):
child_is_immediate_subprocess = False
#: Default to whatever is available as 'python' on the remote machine,
#: overriding sys.executable use.
python_path = 'python'
#: Number of -v invocations to pass on command line.
ssh_debug_level = 0
#: The path to the SSH binary.
ssh_path = 'ssh'
hostname = None
username = None
port = None
def _convert_exit_status(status):
"""
Convert a :func:`os.waitpid`-style exit status to a :mod:`subprocess` style
exit status.
"""
if os.WIFEXITED(status):
return os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
return -os.WTERMSIG(status)
elif os.WIFSTOPPED(status):
return -os.WSTOPSIG(status)
class Process(mitogen.parent.Process):
def poll(self):
try:
pid, status = os.waitpid(self.pid, os.WNOHANG)
except OSError:
e = sys.exc_info()[1]
if e.args[0] == errno.ECHILD:
LOG.warn('%r: waitpid(%r) produced ECHILD', self, self.pid)
return
raise
if not pid:
return
return _convert_exit_status(status)
class Options(mitogen.parent.Options):
try:
proc = subprocess.Popen(
args=args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
except OSError:
e = sys.exc_info()[1]
raise Error('could not execute %s: %s', argv, e)
output, _ = proc.communicate()
if not proc.returncode:
return output.decode('utf-8', 'replace')
raise Error("%s exitted with status %d: %s",
mitogen.parent.Argv(args), proc.returncode, output)
def create_child(self, args):
return mitogen.parent.create_child(args, preexec_fn=self.preexec_fn)
def get_boot_command(self):
argv = mitogen.parent.Argv(super(Stream, self).get_boot_command())
return [self.su_path, self.username, '-c', str(argv)]
def _configure_context(cls, econtext):
mitogen.parent.upgrade_router(econtext)
econtext.debugger = cls(econtext.router)
def upgrade(self):
self.id_allocator = IdAllocator(self)
self.responder = ModuleResponder(self)
self.log_forwarder = LogForwarder(self)
self.route_monitor = mitogen.parent.RouteMonitor(router=self)
self.add_handler( # TODO: cutpaste.
fn=self._on_detaching,
handle=mitogen.core.DETACHING,
persist=True,
)
container = None
username = None
buildah_path = 'buildah'
def __init__(self, container=None, buildah_path=None, username=None,
**kwargs):
super(Options, self).__init__(**kwargs)
assert container is not None
self.container = container
if buildah_path:
self.buildah_path = buildah_path
if username:
self.username = username
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = False
# TODO: better way of capturing errors such as "No such container."
create_child_args = {
'merge_stdio': True
}
def _get_name(self):
return u'buildah.' + self.options.container
def get_boot_command(self):
args = [self.options.buildah_path, 'run']
if self.options.username:
args += ['--user=' + self.options.username]
args += ['--', self.options.container]
def _accept_client(self, sock):
sock.setblocking(True)
try:
pid, = struct.unpack('>L', sock.recv(4))
except (struct.error, socket.error):
LOG.error('%r: failed to read remote identity: %s',
self, sys.exc_info()[1])
return
context_id = self._router.id_allocator.allocate()
context = mitogen.parent.Context(self._router, context_id)
stream = mitogen.core.Stream(self._router, context_id)
stream.name = u'unix_client.%d' % (pid,)
stream.auth_id = mitogen.context_id
stream.is_privileged = True
try:
sock.send(struct.pack('>LLL', context_id, mitogen.context_id,
os.getpid()))
except socket.error:
LOG.error('%r: failed to assign identity to PID %d: %s',
self, pid, sys.exc_info()[1])
return
LOG.debug('%r: accepted %r', self, stream)
stream.accept(sock.fileno(), sock.fileno())
self._router.register(context, stream)