Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setup(self):
    """
    Open a zmq PUSH socket connected to ``self.addr`` and register the
    handlers for PoisonPillMessage, StartMessage and Report.
    """
    context = SafeContext.get_context()
    self.push_socket = context.socket(zmq.PUSH)
    self.push_socket.connect(self.addr)

    poison_pill_handler = PoisonPillMessageHandler()
    self.add_handler(PoisonPillMessage, poison_pill_handler)

    start_handler = StartHandler()
    self.add_handler(StartMessage, start_handler)

    # The report handler is given the push socket created above.
    report_handler = HWPCReportHandler(self.push_socket)
    self.add_handler(Report, report_handler)
def test_send_message_on_data_canal_to_non_initialized_actor_raise_NotConnectedException(self, actor):
    """
    Calling ``send_data`` on the ``actor`` fixture must raise
    NotConnectedException.
    """
    with pytest.raises(NotConnectedException):
        start_message = StartMessage()
        actor.send_data(start_message)
# Build a DispatcherState (a Mock() as first argument, None for the two
# others) and check that it does not start out initialized.
dispatcher_state = DispatcherState(Mock(), None, None)
assert dispatcher_state.initialized is False
# Create the StartHandler under test, bound to that state.
start_handler = StartHandler(dispatcher_state)
# Messages other than StartMessage are expected to leave the state
# uninitialized.
# NOTE(review): indentation was lost in this copy; whether the assert below
# belongs inside the loop or after it must be confirmed against the
# original file.
to_send = [OKMessage(), ErrorMessage("Error"),
HWPCReport("test", "test", "test", {})]
for msg in to_send:
start_handler.handle(msg)
assert dispatcher_state.initialized is False
# Handling a StartMessage is expected to mark the state as initialized.
start_handler.handle(StartMessage())
assert dispatcher_state.initialized is True
def started_actor(self, init_actor, fake_db):
    """
    Send a StartMessage to ``init_actor``, drain the messages this produces
    and return the actor.
    """
    init_actor.send_control(StartMessage())

    # Discard the OkMessage waiting on the control socket.
    init_actor.receive_control(2000)

    # Discard the 'connected' string waiting in the fake database queue.
    fake_db.q.get(timeout=2)

    return init_actor
Test launching this actor when the supervisor already supervises another
actor and when it supervises no actors.
Test if:
- the size of the supervised actor list is increased by one
- the new actor was appended to the list
- the new actor has received a StartMessage
"""
# Remember how many actors were supervised before the launch.
initial_count = len(supervisor.supervised_actors)
new_actor = FakeActor()
supervisor.launch_actor(new_actor)
# The supervised list grew by exactly one and contains the new actor.
assert len(supervisor.supervised_actors) == initial_count + 1
assert new_actor in supervisor.supervised_actors
# Exactly one message was recorded by the actor, and it is a StartMessage.
assert len(new_actor.send_msg) == 1
assert isinstance(new_actor.send_msg.pop(), StartMessage)
def test_send_StartMessage_answer_OkMessage(self, init_actor):
    """
    Send a StartMessage on the control channel of ``init_actor`` and check
    that the answer read back from that channel is an OKMessage.
    """
    init_actor.send_control(StartMessage())
    msg = init_actor.receive_control(2000)
    # Report the received value in the failure message rather than printing
    # it unconditionally on every run.
    assert isinstance(msg, OKMessage), f"expected OKMessage, got {msg!r}"
def test_send_message_on_control_canal_to_non_initialized_actor_raise_NotConnectedException(self, actor):
    """
    Calling ``send_control`` on the ``actor`` fixture must raise
    NotConnectedException.
    """
    with pytest.raises(NotConnectedException):
        start_message = StartMessage()
        actor.send_control(start_message)
:param boolean start_message: True if a StartMessage needs to be sent to
this actor
:raise: zmq.error.ZMQError if a communication error occurs
:raise: powerapi.actor.ActorInitError if the actor crashes during the
initialisation process
"""
# NOTE(review): indentation was lost in this copy; the nesting of the
# branches below, and whether the final append is inside the
# `if start_message` branch, must be confirmed against the original file.
# An actor that is already alive is rejected.
if actor.is_alive():
raise ActorAlreadyLaunchedException()
# Start the actor, then connect its control and data channels.
actor.start()
actor.connect_control()
actor.connect_data()
if start_message:
# Send a StartMessage and read the answer from the control channel.
actor.send_control(StartMessage())
msg = actor.receive_control(2000)
# An ErrorMessage answer is surfaced as ActorInitError with its message.
if isinstance(msg, ErrorMessage):
raise ActorInitError(msg.error_message)
# No answer at all: FailConfigureError or CrashConfigureError is raised,
# depending on whether the actor is still alive.
elif msg is None:
if actor.is_alive():
actor.terminate()
raise FailConfigureError("Unable to configure the " + actor.name)
else:
raise CrashConfigureError("The " + actor.name + " crash during initialisation process")
# Record the actor in the list of supervised actors.
self.supervised_actors.append(actor)
def setup(self):
    """
    Register the handlers for PoisonPillMessage, for the report type given
    by the state's report model, and for StartMessage.
    """
    poison_pill_handler = PusherPoisonPillMessageHandler(self.state)
    self.add_handler(PoisonPillMessage, poison_pill_handler)

    # The report handler is keyed on the type returned by the report model.
    report_type = self.state.report_model.get_type()
    report_handler = ReportHandler(self.state, self.delay, self.max_size)
    self.add_handler(report_type, report_handler)

    start_handler = PusherStartHandler(self.state)
    self.add_handler(StartMessage, start_handler)
def setup(self):
    """
    Register the handlers for PoisonPillMessage and StartMessage.
    """
    poison_pill_handler = PullerPoisonPillMessageHandler(self.state)
    self.add_handler(PoisonPillMessage, poison_pill_handler)

    # NOTE(review): the meaning of the 0.1 argument is not visible here;
    # check PullerStartHandler's signature.
    start_handler = PullerStartHandler(self.state, 0.1)
    self.add_handler(StartMessage, start_handler)