Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_work_if_secondary_source(setup, launcher):
A = TestDriver('A')
B = TestDriver('B', speed_bump=True)
C = TestDriver('C')
setup.add(A)
setup.add(B)
setup.add(C)
circus = CircusClient(endpoint=get_circusctl_endpoint(setup.name))
try:
launcher()
circus.call({
'command': "stop",
'properties': {
'name': 'B',
'waiting': True
}
})
launcher.copy_file('default', 'test', 20, A, C)
loop = BooleanLoop()
def test_velocity(setup, launcher):
A = TestDriver('A', velocity=0.1)
B = TestDriver('B', velocity=0.4)
C = TestDriver('C', velocity=0.8)
D = TestDriver('D', velocity=0.7)
setup.add(A)
setup.add(B)
setup.add(C)
setup.add(D)
circus = CircusClient(endpoint=get_circusctl_endpoint(setup.name))
try:
launcher()
circus.call({
'command': "stop",
'properties': {
'name': 'A',
'waiting': True
}
})
launcher.copy_file('default', 'test', 20, B, C, D)
loop = CounterLoop(2)
def __init__(self, endpoint):
self.endpoint = str(endpoint)
self.stats_endpoint = None
self.client = CircusClient(endpoint=self.endpoint)
self.connected = False
self.watchers = []
self.plugins = []
self.stats = defaultdict(list)
self.dstats = []
self.sockets = None
self.use_sockets = False
self.embed_httpd = False
def create_circus_client():
return CircusClient(timeout=15)
def create_circus_client():
return CircusClient(timeout=15)
def _running(self):
"""Return Brewpi instances running using suffix as filter"""
client = CircusClient(endpoint=self.circus_endpoint)
try:
call = client.call({"command": "list", "properties": {}})
except CallError:
self.log.error("Could not get running processes", exc_info=True)
return []
running_devices = [x for x in call['watchers'] if x.startswith(self.prefix)]
return running_devices
def client(self):
"""
Return an instance of the CircusClient with the endpoint defined by the controller endpoint, which
used the port that was written to the port file upon starting of the daemon
N.B. This is quite slow the first time it is run due to the import of zmq.ssh
in circus/utils.py in circus 0.15.0, which ultimately follows the import of CircusClient.
:return: CircusClient instance
"""
from circus.client import CircusClient
return CircusClient(endpoint=self.get_controller_endpoint(), timeout=self._DAEMON_TIMEOUT)
def _add_process(self, name):
"""Spawn a new brewpi.py process via circus, 'dev-' is appended to the name to
keep brewpi devices seperated from other devices
"""
client = CircusClient(endpoint=self.circus_endpoint)
proc_name = self.prefix + name
# https://github.com/circus-tent/circus/issues/927
proc_name = proc_name.lower()
try:
call = client.call(
{
"command": "add",
"properties": {
"cmd": self.command_tmpl % name,
"name": proc_name,
"options": {
"copy_env": True,
"stdout_stream": {
"class": "FileStream",
"filename": "%s/%s-stdout.log" % (self.logfilepath, proc_name),
},
def _rm_process(self, name):
client = CircusClient(endpoint=self.circus_endpoint)
# https://github.com/circus-tent/circus/issues/927
name = name.lower()
cmd_rm = {
"command": "rm",
"properties": {"name": name}
}
try:
call_rm = client.call(cmd_rm)
self.log.debug("_rm_device circus client call: %s", str(call_rm))
except CallError:
self.log.debug("Could not rm process: %s", name, exc_info=True)
def main():
logging.basicConfig()
# TODO, we should ask the server for its command list
commands = get_commands()
globalopts = parse_arguments(sys.argv[1:], commands)
if globalopts['endpoint'] is None:
globalopts['endpoint'] = os.environ.get('CIRCUSCTL_ENDPOINT',
DEFAULT_ENDPOINT_DEALER)
client = CircusClient(endpoint=globalopts['endpoint'],
timeout=globalopts['timeout'],
ssh_server=globalopts['ssh'],
ssh_keyfile=globalopts['ssh_keyfile'])
CircusCtl(client, commands).start(globalopts)