Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Setup signal handler
def term_handler(_, __):
puller.hard_kill()
dispatcher.hard_kill()
pusher.hard_kill()
exit(0)
signal.signal(signal.SIGTERM, term_handler)
signal.signal(signal.SIGINT, term_handler)
stream_mode = True
supervisor = BackendSupervisor(stream_mode)
# Pusher
output_mongodb = MongoDB(DB_URI, 'MongoDB1', 'test_result')
pusher = PusherActor("pusher_mongodb", PowerModel(), output_mongodb, level_logger=LOG_LEVEL)
# Formula
formula_factory = (lambda name, verbose:
DummyFormulaActor(name, {'id_pusher': pusher}, level_logger=verbose))
# Dispatcher
route_table = RouteTable()
route_table.dispatch_rule(HWPCReport, HWPCDispatchRule(getattr(HWPCDepthLevel, 'ROOT'), primary=True))
dispatcher = DispatcherActor('dispatcher', formula_factory, route_table,
level_logger=LOG_LEVEL)
# Puller
input_mongodb = MongoDB(DB_URI, 'MongoDB1', 'test_hwrep')
report_filter = Filter()
report_filter.filter(lambda msg: True, dispatcher)
# Save one time with a file that doesn't exist
power_reports.append(gen_power_report())
csvdb.save(power_reports[0], PowerModel())
# Save three time
for _ in range(3):
power_reports.append(gen_power_report())
csvdb.save(power_reports[-1], PowerModel())
# Read the the csvdb and compare the data
reading_power_reports = []
csvdb_read = CsvDB(current_path=PATH_TO_SAVE)
csvdb_read.add_file(PATH_TO_SAVE + SENSOR + "-" + TARGET + "/PowerReport.csv")
csvdb_read.connect()
csvdb_read_iter = csvdb_read.iter(PowerModel(), False)
for _ in range(4):
reading_power_reports.append(next(csvdb_read_iter))
with pytest.raises(StopIteration) as pytest_wrapped:
next(csvdb_read_iter)
assert pytest_wrapped.type == StopIteration
for i in range(4):
assert power_reports[i] == reading_power_reports[i]
def test_csvdb_all_reports(clean_csv_files):
"""
Test create/save/read all kind of reports
"""
all_reports = [(PowerModel(), gen_power_report),
(HWPCModel(), gen_hwpc_report)]
for model, generator in all_reports:
# Load DB
csvdb = CsvDB(current_path=PATH_TO_SAVE)
csvdb.connect()
# Create report
report = generator()
# Save report
csvdb.save(report, model)
# Read report
for r, d, f in os.walk(PATH_TO_SAVE + report.sensor + "-" + report.target + "/"):
for file in f:
def test_write_many_report_in_non_empty_db(init_database_with_one_report, database):
"""
call the save_many method with One PowerReport
test if the report was writen in the database
"""
database.connect()
database.save_many([POWER_REPORT_1, POWER_REPORT_2], PowerModel())
check_db_reports(init_database_with_one_report, [POWER_REPORT_0, POWER_REPORT_1, POWER_REPORT_2])
- 1 pusher (connected to MongoDB1 [collection test_result]
MongoDB1 content :
- 30 HWPC repport with two socket and one RAPL_EVENT
Scenario:
- Launch the full architecture
Test if:
- after powerapi handle all the reports, test if all the reports was handeled
by the formula and pushed to the mongo database and if no report were lost
"""
# Pusher
output_mongodb = MongoDB(DB_URI, 'MongoDB1', 'test_result')
pusher = PusherActor("pusher_mongodb", PowerModel(), output_mongodb, level_logger=LOG_LEVEL)
# Formula
formula_factory = (lambda name, verbose:
DummyFormulaActor(name, {'my_pusher': pusher}, level_logger=verbose, sleep_time=0.1))
# DummyFormulaActor(name, {'my_pusher': pusher}, level_logger=verbose))
# Dispatcher
route_table = RouteTable()
route_table.dispatch_rule(HWPCReport, HWPCDispatchRule(getattr(HWPCDepthLevel, 'SOCKET'), primary=True))
dispatcher = DispatcherActor('dispatcher', formula_factory, route_table, level_logger=LOG_LEVEL)
# Puller
input_mongodb = MongoDB(DB_URI, 'MongoDB1', 'test_hwrep')
report_filter = Filter()
report_filter.filter(lambda msg: True, dispatcher)
def test_write_one_bad_report_in_empty_db(init_empty_database, database):
"""
call the save method with One PowerReport with no sensor field
test if a BadInputData is raise and the database is still empty
"""
database.connect()
with pytest.raises(BadInputData):
database.save(BAD_POWER_REPORT, PowerModel())
assert get_all_reports(init_empty_database, INFLUX_DBNAME) == []
def test_write_one_bad_report_in_non_empty_db(init_database_with_one_report, database):
"""
call the save method with One PowerReport with no sensor field
test if a BadInputData is raise and the database is still empty
"""
database.connect()
with pytest.raises(BadInputData):
database.save(BAD_POWER_REPORT, PowerModel())
check_db_reports(init_database_with_one_report, [POWER_REPORT_0])
def test_write_one_report_in_empty_db(init_empty_database, database):
"""
call the save method with One PowerReport
test if the report was writen in the database
"""
database.connect()
database.save(POWER_REPORT_1, PowerModel())
check_db_reports(init_empty_database, [POWER_REPORT_1])
@define_report_model(PowerModel())
def test_pusher_create_ok(started_pusher):
"""
Create a PusherActor with a good configuration
"""
assert is_actor_alive(started_pusher)
# Connect data and send a PowerReport
initialized_pusher.connect_data()
saved_report = gen_power_report()
initialized_pusher.send_data(saved_report)
# wait for the message is sending to the pusher
time.sleep(0.5)
# Kill it
initialized_pusher.soft_kill()
assert not is_actor_alive(initialized_pusher, 5)
# Open a database for read the saved report
mongodb = initialized_pusher.state.database
mongodb.connect()
mongodb_iter = mongodb.iter(PowerModel(), False)
new_report = next(mongodb_iter)
assert saved_report == new_report