Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): body of a definition whose header is not visible here and whose
# indentation has been lost; `pusher`, `supervisor` and `stream_mode` are bound
# out of view.
# Formula: factory that builds a DummyFormulaActor, handing it the pusher under
# the 'id_pusher' key and forwarding `verbose` as the logger level.
formula_factory = (lambda name, verbose:
DummyFormulaActor(name, {'id_pusher': pusher}, level_logger=verbose))
# Dispatcher: HWPCReport gets a primary dispatch rule at the ROOT depth level.
route_table = RouteTable()
route_table.dispatch_rule(HWPCReport, HWPCDispatchRule(getattr(HWPCDepthLevel, 'ROOT'), primary=True))
dispatcher = DispatcherActor('dispatcher', formula_factory, route_table,
level_logger=LOG_LEVEL)
# Puller: MongoDB source built from DB_URI, 'MongoDB1' and 'test_hwrep'
# (presumably database name and collection -- confirm against MongoDB's signature).
input_mongodb = MongoDB(DB_URI, 'MongoDB1', 'test_hwrep')
# The filter rule's predicate is always True and is registered with the dispatcher.
report_filter = Filter()
report_filter.filter(lambda msg: True, dispatcher)
puller = PullerActor("puller_mongodb", input_mongodb,
report_filter, HWPCModel(), stream_mode=stream_mode, level_logger=LOG_LEVEL)
# Actors are launched in this order: pusher, dispatcher, puller.
supervisor.launch_actor(pusher)
supervisor.launch_actor(dispatcher)
supervisor.launch_actor(puller)
# Pause one second, then send SIGKILL to the dispatcher process before joining
# the supervisor.
time.sleep(1)
os.kill(dispatcher.pid, signal.SIGKILL)
supervisor.join()
def puller(request, database, stream_mode):
    """
    Build the PullerActor used by the requesting test.

    The actor is named "test_puller_" followed by the name of the requesting
    test node, reads from ``database`` with an ``HWPCModel`` and uses a filter
    whose single, always-true rule targets a DispatcherActor built from Mocks.

    The actor is only constructed and returned here; this function does not
    start or terminate its process.

    :param request: object exposing ``node.name`` (presumably the pytest
                    ``request`` fixture -- confirm against the decorator,
                    which is not visible here)
    :param database: database handed to the PullerActor
    :param stream_mode: forwarded to the PullerActor as ``stream_mode``
    :return: the newly created PullerActor
    """
    # The dispatcher is only used as the target of the filter rule below.
    stub_dispatcher = DispatcherActor('dispatcher__', Mock(), Mock())

    accept_all = Filter()
    accept_all.filter(lambda msg: True, stub_dispatcher)

    actor_name = "test_puller_" + str(request.node.name)
    return PullerActor(actor_name, database, accept_all, HWPCModel(),
                       stream_mode=stream_mode, level_logger=LOG_LEVEL)
- both database URIs are titi
- both database names are tata
- the database collections are tutu and huhu
"""
# Configuration declaring two MongoDB inputs, 'toto' and 'titi', which share
# the same uri and db values and differ only in their collection.
args = { 'verbose': True, 'stream': True, 'input': {'mongodb': {'toto': {'model': 'hwpc_report', 'name': 'toto',
'uri': 'titi', 'db': 'tata', 'collection': 'tutu'},
'titi': {'model': 'hwpc_report', 'name': 'titi',
'uri': 'titi', 'db': 'tata', 'collection': 'huhu'}}}}
generator = PullerGenerator(None)
result = generator.generate(args)
# One entry is expected per declared input, keyed by the input name.
assert len(result) == 2
# First puller: 'toto', backed by the 'tutu' collection.
assert 'toto' in result
puller = result['toto']
assert isinstance(puller, PullerActor)
assert puller.name == 'toto'
db = puller.state.database
assert isinstance(db, MongoDB)
assert db.uri == 'titi'
assert db.db_name == 'tata'
assert db.collection_name == 'tutu'
# Second puller: 'titi'.
assert 'titi' in result
puller = result['titi']
assert isinstance(puller, PullerActor)
assert puller.name == 'titi'
# NOTE(review): the visible snippet stops here; no assertion on the second
# puller's database is shown.
db = puller.state.database
# NOTE(review): body of a definition whose header is not visible here and whose
# indentation has been lost; `pusher`, `supervisor` and
# `get_number_of_output_reports` are bound out of view.
# Formula: factory that builds a DummyFormulaActor, handing it the pusher under
# the 'my_pusher' key and forwarding `verbose` as the logger level.
formula_factory = (lambda name, verbose:
DummyFormulaActor(name, {'my_pusher': pusher}, level_logger=verbose))
# Dispatcher: HWPCReport gets a primary dispatch rule at the SOCKET depth level.
route_table = RouteTable()
route_table.dispatch_rule(HWPCReport, HWPCDispatchRule(getattr(HWPCDepthLevel, 'SOCKET'), primary=True))
dispatcher = DispatcherActor('dispatcher', formula_factory, route_table, level_logger=LOG_LEVEL)
# Puller: MongoDB source built from DB_URI, 'MongoDB1' and 'test_hwrep'; the
# filter rule's predicate is always True and is registered with the dispatcher.
input_mongodb = MongoDB(DB_URI, 'MongoDB1', 'test_hwrep')
report_filter = Filter()
report_filter.filter(lambda msg: True, dispatcher)
puller = PullerActor("puller_mongodb", input_mongodb, report_filter, HWPCModel(), level_logger=LOG_LEVEL)
# Actors are launched in this order: pusher, dispatcher, puller.
supervisor.launch_actor(pusher)
supervisor.launch_actor(dispatcher)
supervisor.launch_actor(puller)
# Start timestamp; it is not read anywhere in the visible part of the snippet.
t = time.time()
# Poll the output report count three times and check that it never decreases.
# NOTE(review): the loop body's indentation is lost; the sleep / read / assert /
# update statements clearly belong to it, but whether the trailing
# `time.sleep(0.1)` is inside or after the loop cannot be told from here.
number_of_output_reports = 0
for i in range(3):
time.sleep(0.2)
current = get_number_of_output_reports()
assert current >= number_of_output_reports
number_of_output_reports = current
time.sleep(0.1)
- puller name is toto
- puller database type is MongoDB
- database uri is titi
- database db is tata
- database collection is tutu
"""
# Configuration declaring a single MongoDB input named 'toto'.
args = {'verbose': True, 'stream': True, 'input': {'mongodb': {'toto': {'model': 'hwpc_report', 'name': 'toto', 'uri': 'titi',
'db': 'tata', 'collection': 'tutu'}}}}
generator = PullerGenerator(None)
result = generator.generate(args)
# Exactly one puller is expected, keyed by the input name.
assert len(result) == 1
assert 'toto' in result
puller = result['toto']
assert isinstance(puller, PullerActor)
assert puller.name == 'toto'
# The puller's database must be a MongoDB carrying the configured uri, db and
# collection values.
db = puller.state.database
assert isinstance(db, MongoDB)
assert db.uri == 'titi'
assert db.db_name == 'tata'
assert db.collection_name == 'tutu'
def actor(self, fake_db, filt, fake_filter):
    """
    Build the PullerActor named 'puller_test'.

    :param fake_db: database handed to the PullerActor
    :param filt: filter to use; when it is None, ``fake_filter`` is used instead
    :param fake_filter: fallback filter
    :return: a PullerActor created with logger level ``logging.DEBUG``
    """
    if filt is None:
        selected_filter = fake_filter
    else:
        selected_filter = filt
    # The literal 0 sits in the position where other PullerActor calls pass a
    # report model.
    return PullerActor('puller_test', fake_db, selected_filter, 0,
                       level_logger=logging.DEBUG)
# NOTE(review): part of a definition whose header is not visible here and whose
# indentation has been lost; `args` and `formula_factory` are bound out of view.
# Dispatcher: HWPCReport gets a primary dispatch rule whose depth level is
# looked up by name from `args.hwpc_dispatch_rule`.
route_table = RouteTable()
route_table.dispatch_rule(HWPCReport, HWPCDispatchRule(
getattr(HWPCDepthLevel, args.hwpc_dispatch_rule), primary=True))
dispatcher = DispatcherActor('dispatcher', formula_factory, route_table,
level_logger=args.verbose)
# Puller: here the report model and the stream mode are given to the MongoDB
# object rather than to the PullerActor.
input_mongodb = MongoDB(args.input_uri,
args.input_db, args.input_collection,
HWPCModel(), stream_mode=args.stream_mode)
# The filter rule's predicate is always True and is registered with the dispatcher.
report_filter = Filter()
report_filter.filter(lambda msg: True, dispatcher)
puller = PullerActor("puller_mongodb", input_mongodb,
report_filter, level_logger=args.verbose)
##########################################################################
# Actor start step
# Setup signal handler
def term_handler(_, __):
    """
    Signal handler: ask every actor to stop, then exit the process.

    The two positional arguments supplied to signal handlers (signal number
    and current stack frame) are ignored.

    :raises SystemExit: always, with exit status 0, once the kill requests
                        have been sent
    """
    # Imported locally so the handler does not rely on the ``exit`` helper,
    # which is injected by the ``site`` module for interactive use and is
    # missing when the interpreter runs without ``site`` (e.g. ``python -S``).
    import sys

    puller.send_kill()
    dispatcher.send_kill()
    pusher.send_kill()
    sys.exit(0)
# Route both termination signals (SIGTERM first, then SIGINT) to the same handler.
for signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(signum, term_handler)
# The supervisor is created with the puller's stream mode.
supervisor = BackendSupervisor(puller.state.stream_mode)
def join(self):
    """
    Wait until all supervised actors are terminated.

    Every supervised actor is first appended to ``self.pullers``,
    ``self.dispatchers`` or ``self.pushers`` according to its type (anything
    that is neither a PullerActor nor a DispatcherActor counts as a pusher).
    The actual wait is then delegated to ``join_stream_mode_on`` or
    ``join_stream_mode_off`` depending on ``self.stream_mode``.
    """
    # Sort the supervised actors by kind.
    for supervised in self.supervised_actors:
        if isinstance(supervised, PullerActor):
            bucket = self.pullers
        elif isinstance(supervised, DispatcherActor):
            bucket = self.dispatchers
        else:
            bucket = self.pushers
        bucket.append(supervised)

    # Pick the join strategy matching the stream mode, then run it.
    wait_for_actors = (self.join_stream_mode_on if self.stream_mode
                       else self.join_stream_mode_off)
    wait_for_actors()
def _actor_factory(self, name, db, model, stream_mode, level_logger):
    """
    Create a PullerActor bound to this object's report filter.

    :param name: first positional argument given to PullerActor
    :param db: database handed to the PullerActor
    :param model: report model handed to the PullerActor
    :param stream_mode: passed positionally right after the model
    :param level_logger: passed positionally as the last argument
    :return: the newly created PullerActor
    """
    shared_filter = self.report_filter
    new_puller = PullerActor(name, db, shared_filter, model, stream_mode, level_logger)
    return new_puller