Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if counts != {'ps': 1, 'sshd': 1}:
assert 0, (
'Docker container %r contained extra running processes '
'after test completed: %r' % (
self.container_name,
counts
)
)
def close(self):
args = ['docker', 'rm', '-f', self.container_name]
subprocess__check_output(args)
class BrokerMixin(object):
broker_class = mitogen.master.Broker
broker_shutdown = False
def setUp(self):
super(BrokerMixin, self).setUp()
self.broker = self.broker_class()
def tearDown(self):
if not self.broker_shutdown:
self.broker.shutdown()
self.broker.join()
del self.broker
super(BrokerMixin, self).tearDown()
def sync_with_broker(self):
sync_with_broker(self.broker)
def wrapper(func):
if func.__module__ != '__main__':
return func
import mitogen.parent
import mitogen.utils
if profiling:
mitogen.core.enable_profiling()
mitogen.master.Router.profiling = profiling
mitogen.utils.log_to_file(level=log_level)
return mitogen.core._profile_hook(
'app.main',
mitogen.utils.run_with_router,
func,
)
' Original '
' '
' Minimized '
' '
' Compressed '
)
for mod in (
mitogen.parent,
mitogen.fork,
mitogen.ssh,
mitogen.sudo,
mitogen.select,
mitogen.service,
mitogen.fakessh,
mitogen.master,
):
original = inspect.getsource(mod)
original_size = len(original)
minimized = mitogen.minify.minimize_source(original)
minimized_size = len(minimized)
compressed = zlib.compress(minimized, 9)
compressed_size = len(compressed)
print(
'%-25s'
' '
'%5i %4.1fKiB'
' '
'%5i %4.1fKiB %.1f%%'
' '
'%5i %4.1fKiB %.1f%%'
% (