Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@event_handler('service', 'event1')
def handle_event1(self, event_data):
    """Respond to ``event1`` from ``service`` by calling ``upstream_rpc.method()``.

    ``event_data`` is accepted but not used, and nothing is returned.
    """
    upstream = self.upstream_rpc
    upstream.method()
@event_handler('service', 'event')
def handle(self, event_data):
    """Block for nine seconds, then hand ``event_data`` to ``tracker``."""
    delay_seconds = 9  # >2x heartbeat
    time.sleep(delay_seconds)
    tracker(event_data)
@event_handler("service", "event")
@rpc
def echo(self, arg):
    """Return ``arg`` unchanged.

    The method is registered under both the ``event_handler`` and ``rpc``
    decorators stacked above it.
    """
    return arg  # pragma: no cover
@event_handler('foo-service', 'spam')
def handle_spam(self, evt_data):
    """Pass ``evt_data`` to ``foo_service.spam`` and report its result.

    The value returned by ``foo_service.spam`` is handed to
    ``handle_spam_called``; this handler itself returns nothing.
    """
    spam_result = self.foo_service.spam(evt_data)
    handle_spam_called(spam_result)
@event_handler("orders_service", "order_created")
@statsd.timer('request_reservation')
def place_order(self, payload):
    """Handle ``order_created`` from ``orders_service``.

    Prints a notice, places the order through ``__place_order_exchange``,
    emits an ``order_placed`` event through ``__create_event``, and returns
    a JSON string with the exchange's response under ``exchange_response``.

    NOTE(review): the double-underscore helpers are name-mangled by Python,
    so this method must live in the same class that defines them.
    """
    notice = "service {} received: {} ... placing order to exchange".format(
        self.name, payload)
    print(notice)

    # Submit the order to the stock exchange.
    exchange_resp = self.__place_order_exchange(payload)

    # Announce that the order has been placed.
    self.__create_event("order_placed", payload)

    response_body = {'exchange_response': exchange_resp}
    return json.dumps(response_body)
@event_handler(
    "eventgen_controller",
    "{}_setup".format(eventgen_name),
    handler_type=BROADCAST,
    reliable_delivery=False,
)
def event_handler_setup(self, payload):
    """Forward the event payload to ``self.setup`` as ``data`` and return its result."""
    setup_result = self.setup(data=payload)
    return setup_result
@event_handler(
    "eventgen_controller",
    "all_index",
    handler_type=BROADCAST,
    reliable_delivery=False,
)
def event_handler_all_index(self, payload):
    """Return the result of ``self.index()``; ``payload`` is ignored."""
    index_result = self.index()
    return index_result
@event_handler(
    "eventgen_controller",
    "{}_set_volume".format(eventgen_name),
    handler_type=BROADCAST,
    reliable_delivery=False,
)
def event_handler_set_volume(self, payload):
    """Apply the ``perDayVolume`` value from the payload via ``self.set_volume``.

    Returns the result of ``self.set_volume`` when ``payload['perDayVolume']``
    is truthy, otherwise ``None``. A payload without the ``perDayVolume`` key
    raises ``KeyError``.
    """
    # Falsy volumes (e.g. 0, None, empty string) are skipped.
    if not payload['perDayVolume']:
        return None
    return self.set_volume(payload['perDayVolume'])
@event_handler("market_service", "order_placed")
@statsd.timer('place_order')
def handle_place_order(self, payload):
    """Print a notice that an ``order_placed`` event was received.

    The message shows the payload in square brackets followed by
    ``self.name``; nothing is returned.
    """
    template = "[{}] {} received order_placed event ... updating order to placed"
    print(template.format(payload, self.name))
@event_handler(
    "eventgen_server",
    "server_volume",
    handler_type=BROADCAST,
    reliable_delivery=False,
)
def event_handler_get_volume(self, payload):
    """Pass the event payload to ``self.receive_volume`` and return its result."""
    received = self.receive_volume(payload)
    return received