Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Scenario:
- Launch the full architecture
Test if:
- every 50 ms, reports are written to the output database
"""
# Pusher: actor bound to the 'test_result' collection of database 'MongoDB1'.
output_mongodb = MongoDB(DB_URI, 'MongoDB1', 'test_result')
pusher = PusherActor("pusher_mongodb", PowerModel(), output_mongodb, level_logger=LOG_LEVEL)


# Formula: factory handed to the DispatcherActor below.
def formula_factory(name, verbose):
    """Return a DummyFormulaActor called *name*, given the pusher under the key 'my_pusher'."""
    return DummyFormulaActor(name, {'my_pusher': pusher}, level_logger=verbose)


# Dispatcher: HWPCReport is dispatched with a SOCKET-level rule, flagged as primary.
route_table = RouteTable()
route_table.dispatch_rule(HWPCReport, HWPCDispatchRule(HWPCDepthLevel.SOCKET, primary=True))
dispatcher = DispatcherActor('dispatcher', formula_factory, route_table, level_logger=LOG_LEVEL)

# Puller: actor bound to the 'test_hwrep' collection of database 'MongoDB1'.
input_mongodb = MongoDB(DB_URI, 'MongoDB1', 'test_hwrep')
report_filter = Filter()
# The predicate is always true, so no message is filtered out before the dispatcher.
report_filter.filter(lambda msg: True, dispatcher)
puller = PullerActor("puller_mongodb", input_mongodb, report_filter, HWPCModel(), level_logger=LOG_LEVEL)

# Launch order is pusher, dispatcher, puller (presumably so that each receiver
# exists before its sender starts -- TODO confirm).
supervisor.launch_actor(pusher)
supervisor.launch_actor(dispatcher)
supervisor.launch_actor(puller)
# Timestamp taken right after the launch; it is consumed outside this excerpt.
t = time.time()
def actor(self, dispatch_rules, formula_queue):
    """
    Build a DispatcherActor whose formulas are FakeActor instances.

    :param dispatch_rules: iterable of (report type, dispatch rule) pairs;
                           each pair is registered on a new RouteTable
    :param formula_queue: forwarded as ``queue`` to every FakeActor created
                          by the formula factory
    :return: a DispatcherActor named 'dispatcher_test' with a DEBUG logger
    """
    def fake_formula_factory(name, verbose):
        return FakeActor(name, [], verbose, queue=formula_queue)

    table = RouteTable()
    for report_type, rule in dispatch_rules:
        table.dispatch_rule(report_type, rule)

    return DispatcherActor('dispatcher_test', fake_formula_factory, table,
                           level_logger=logging.DEBUG)
'input': {'csv': {'puller' : {'files': FILES,
'model': 'HWPCReport',
'name': 'puller',
}}},
'output': {'csv': {'pusher': {'model': 'PowerReport', 'name': 'pusher', 'directory': ROOT_PATH}}}}
# Pusher: the pusher actors are generated from the configuration.
pusher_generator = PusherGenerator()
pushers = pusher_generator.generate(config)


# Formula: factory handed to the DispatcherActor below.
def formula_factory(name, verbose):
    """Return a DummyFormulaActor called *name* that is given all the pushers."""
    # NOTE(review): ``verbose`` is intentionally unused here; as in the
    # original code the log level is read from config['verbose'].
    return DummyFormulaActor(name, pushers, level_logger=config['verbose'])


# Dispatcher: HWPCReport is dispatched with a SOCKET-level rule, flagged as primary.
route_table = RouteTable()
route_table.dispatch_rule(HWPCReport,
                          HWPCDispatchRule(HWPCDepthLevel.SOCKET, primary=True))
dispatcher = DispatcherActor('dispatcher', formula_factory, route_table,
                             level_logger=LOG_LEVEL)

# Puller: the puller actors are generated from the same configuration.
report_filter = Filter()
# The predicate is always true, so no message is filtered out before the dispatcher.
report_filter.filter(lambda msg: True, dispatcher)
puller_generator = PullerGenerator(report_filter)
pullers = puller_generator.generate(config)

# Only the actors are needed here, so iterate over the values directly.
for pusher in pushers.values():
    supervisor.launch_actor(pusher)
supervisor.launch_actor(dispatcher)
def init_state():
    """Create a new DispatcherState built from mocks and a newly constructed RouteTable."""
    def make_formula(formula_id):
        # Every formula id gets its own Mock.
        return Mock()

    return DispatcherState(Mock(), make_formula, RouteTable())
# Route both termination signals to the same handler.
signal.signal(signal.SIGTERM, term_handler)
signal.signal(signal.SIGINT, term_handler)

stream_mode = True
supervisor = BackendSupervisor(stream_mode)

# Pusher: actor bound to the 'test_result' collection of database 'MongoDB1'.
output_mongodb = MongoDB(DB_URI, 'MongoDB1', 'test_result')
pusher = PusherActor("pusher_mongodb", PowerModel(), output_mongodb, level_logger=LOG_LEVEL)


# Formula: factory handed to the DispatcherActor below.
def formula_factory(name, verbose):
    """Return a DummyFormulaActor called *name*, given the pusher under the key 'id_pusher'."""
    return DummyFormulaActor(name, {'id_pusher': pusher}, level_logger=verbose)


# Dispatcher: HWPCReport is dispatched with a ROOT-level rule, flagged as primary.
route_table = RouteTable()
route_table.dispatch_rule(HWPCReport, HWPCDispatchRule(HWPCDepthLevel.ROOT, primary=True))
dispatcher = DispatcherActor('dispatcher', formula_factory, route_table,
                             level_logger=LOG_LEVEL)

# Puller: actor bound to the 'test_hwrep' collection, using the same stream
# mode as the supervisor.
input_mongodb = MongoDB(DB_URI, 'MongoDB1', 'test_hwrep')
report_filter = Filter()
# The predicate is always true, so no message is filtered out before the dispatcher.
report_filter.filter(lambda msg: True, dispatcher)
puller = PullerActor("puller_mongodb", input_mongodb,
                     report_filter, HWPCModel(), stream_mode=stream_mode, level_logger=LOG_LEVEL)

# Launch order is pusher, dispatcher, puller (presumably so that each receiver
# exists before its sender starts -- TODO confirm).
supervisor.launch_actor(pusher)
supervisor.launch_actor(dispatcher)
supervisor.launch_actor(puller)
# Pause for one second after the launch (presumably to let the actors run -- TODO confirm).
time.sleep(1)
def route_table_with_primary_rule():
    """
    Build a RouteTable holding a single rule:

    - an HWPCDispatchRule at ROOT depth, registered for HWPCReport as
      primary rule
    """
    table = RouteTable()
    primary_rule = HWPCDispatchRule(HWPCDepthLevel.ROOT, primary=True)
    table.dispatch_rule(HWPCReport, primary_rule)
    return table
def pytest_generate_tests(metafunc):
    """
    Hook called by pytest while collecting a test_XXX function.

    Parametrizes two fixtures from attributes read on the test function:

    - ``route_table``: the ``_route_table`` attribute when it is a
      RouteTable, the three predefined tables when it equals 'all', and a
      newly constructed RouteTable otherwise;
    - ``formula_init_msg``: ``[_formula_init_msg]`` when that attribute is a
      Report, both ``[]`` and ``[gen_good_report()]`` when it equals 'all';
      in any other case the fixture is not parametrized here.

    :param metafunc: the test context given by pytest
    """
    if 'route_table' in metafunc.fixturenames:
        declared = getattr(metafunc.function, '_route_table', None)
        if isinstance(declared, RouteTable):
            candidates = [declared]
        elif declared == 'all':
            candidates = [route_table_no_hwpc(),
                          route_table_hwpc_not_primary(),
                          route_table_with_primary_rule()]
        else:
            candidates = [RouteTable()]
        metafunc.parametrize('route_table', candidates)

    if 'formula_init_msg' in metafunc.fixturenames:
        declared = getattr(metafunc.function, '_formula_init_msg', None)
        if isinstance(declared, Report):
            metafunc.parametrize('formula_init_msg', [[declared]])
        elif declared == 'all':
            metafunc.parametrize('formula_init_msg', [[], [gen_good_report()]])
def route_table_no_hwpc():
    """
    Build a RouteTable holding a single rule:

    - a FakeGBR registered for FakeReport as primary rule
    """
    table = RouteTable()
    fake_rule = FakeGBR(primary=True)
    table.dispatch_rule(FakeReport, fake_rule)
    return table
##########################################################################
# Actor Creation
# Pusher: output database described by the command-line arguments.
# NOTE(review): the output database is built with HWPCModel() although the
# pusher is given PowerReport -- confirm this is intended.
output_mongodb = MongoDB(args.output_uri,
                         args.output_db, args.output_collection,
                         HWPCModel())
pusher = PusherActor("pusher_mongodb", PowerReport, output_mongodb,
                     level_logger=args.verbose)


# Formula: factory handed to the DispatcherActor below.
def formula_factory(name, verbose):
    """Return a RAPLFormulaActor called *name* that is given the pusher."""
    return RAPLFormulaActor(name, pusher, level_logger=verbose)


# Dispatcher: the depth level of the primary HWPCReport rule is looked up by
# name from the command-line arguments, hence the dynamic getattr.
route_table = RouteTable()
route_table.dispatch_rule(HWPCReport, HWPCDispatchRule(
    getattr(HWPCDepthLevel, args.hwpc_dispatch_rule), primary=True))
dispatcher = DispatcherActor('dispatcher', formula_factory, route_table,
                             level_logger=args.verbose)

# Puller: input database described by the command-line arguments.
input_mongodb = MongoDB(args.input_uri,
                        args.input_db, args.input_collection,
                        HWPCModel(), stream_mode=args.stream_mode)
report_filter = Filter()
# The predicate is always true, so no message is filtered out before the dispatcher.
report_filter.filter(lambda msg: True, dispatcher)
puller = PullerActor("puller_mongodb", input_mongodb,
                     report_filter, level_logger=args.verbose)
##########################################################################