Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_firewall_receive(app_firewall, watcher):
# good event
packet = str.encode(dump_event(return_value(), 1))
app_firewall.protocol.add_buffer(packet)
assert watcher.wait('return_value')
# bad name
packet = str.encode(dump_event(Event.create('unallow_event'), 1))
app_firewall.protocol.add_buffer(packet)
assert watcher.wait('firewall_block')
# bad channel
event = return_value()
event.channels = ('prohibits_channel',)
packet = str.encode(dump_event(event, 1))
app_firewall.protocol.add_buffer(packet)
assert watcher.wait('firewall_block')
def started(self, component):
"""started Event handler
Setup a simple timer to fire every second.
"""
# Timer(seconds, event, persist=False)
Timer(1, Event.create("timerevent"),persist=True).register(self)
def started(self, component):
print("started")
Timer(20, Event.create("end")).register(self)
# Fire some test events
N = 3
for loop in range(1, N):
self.fire(ActionMessage(source="test_thing", headers=None, message={"who": "you"}))
Timer(1, ActionMessage(source="test_thing", headers=None, message={"who": "me"})).register(self)
Timer(2, ActionMessage(source="test_thing", headers=None, message={"who": "us"})).register(self)
def started(self, component):
x = yield self.call(Event.create("hello"))
print(x)
self.stop()
def started(self, component):
"""started Event handler
Setup 3 timers at 5, 1 and 3 seconds.
The 2nd two timers are persitent meaning that
they are fired repeatedly every 1 and 3 seconds
respectively.
"""
# Timer(seconds, event, persist=False)
Timer(5, Event.create("hello")).register(self)
Timer(1, Event.create("foo"), persist=True).register(self)
Timer(3, Event.create("bar"), persist=True).register(self)
def started(self, component):
# x = yield self.call(task(factorial, 10))
Timer(1, Event.create("foo"), persist=True).register(self)
self.fire(task(download_web_page, 'http://www.slickdeals.net')) # async
self.fire(task(download_web_page, 'http://www.google.com')) # async
self.fire(task(download_web_page, 'http://www.yahoo.com')) # async
def disconnected(self, event, *args, **kwargs):
# Wait a while then try to reconnect
LOG.info("We got disconnected, reconnect")
Timer(10, Event.create("reconnect")).register(self)
def started(self, component):
"""started Event handler
Setup 3 timers at 5, 1 and 3 seconds.
The 2nd two timers are persitent meaning that
they are fired repeatedly every 1 and 3 seconds
respectively.
"""
# Timer(seconds, event, persist=False)
Timer(5, Event.create("hello")).register(self)
Timer(1, Event.create("foo"), persist=True).register(self)
Timer(3, Event.create("bar"), persist=True).register(self)
def started(self, component):
Timer(1, Event.create("foo"), persist=True).register(self)
x = yield self.call(task(factorial, 10))
print("{0:d}".format(x.value))
self.stop()
def started(self, component):
"""started Event handler
Setup 3 timers at 5, 1 and 3 seconds.
The 2nd two timers are persitent meaning that
they are fired repeatedly every 1 and 3 seconds
respectively.
"""
# Timer(seconds, event, persist=False)
Timer(5, Event.create("hello")).register(self)
Timer(1, Event.create("foo"), persist=True).register(self)
Timer(3, Event.create("bar"), persist=True).register(self)