Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from powerapi.database import InfluxDB, CantConnectToInfluxDBException
from powerapi.report_model import PowerModel, BadInputData
from powerapi.report import PowerReport
from tests.influx_utils import generate_power_report, create_empty_db
from tests.influx_utils import create_non_empty_db, delete_db, get_all_reports
# InfluxDB location and database name (presumably the instance the tests
# connect to -- confirm against the test fixtures).
INFLUX_URI = 'localhost'
INFLUX_PORT = 8086
INFLUX_DBNAME = 'unit_test'

# Sensor and target names shared by the report fixtures below.
SENSOR_NAME = 'sensor_test'
TARGET_NAME = 'target_test'

# Three reports that differ only by their timestamp (0, 1 and 2 seconds after
# the epoch, converted to naive local time); each one is built with the value
# 100 and an empty dict.
POWER_REPORT_0 = PowerReport(
    datetime.datetime.fromtimestamp(0),
    SENSOR_NAME,
    TARGET_NAME,
    100,
    {},
)
POWER_REPORT_1 = PowerReport(
    datetime.datetime.fromtimestamp(1),
    SENSOR_NAME,
    TARGET_NAME,
    100,
    {},
)
POWER_REPORT_2 = PowerReport(
    datetime.datetime.fromtimestamp(2),
    SENSOR_NAME,
    TARGET_NAME,
    100,
    {},
)
def bad_serialize():
    """
    Build a hand-written point dictionary for a power measurement.

    The ``tags`` entry only contains a ``target`` key.
    NOTE(review): the function name suggests this payload is intentionally
    malformed (no sensor tag) -- confirm against the report model.

    :return: dict with the ``measurement``, ``tags``, ``time`` and ``fields`` keys
    """
    tags = {'target': TARGET_NAME}
    # Fixed timestamp: 100 seconds after the epoch, rendered as a string.
    timestamp = str(datetime.datetime.fromtimestamp(100))
    fields = {'power': 100}
    return {
        'measurement': 'power_consumption',
        'tags': tags,
        'time': timestamp,
        'fields': fields,
    }
def gen_power_report():
    """
    Create a PowerReport fixture with fixed content.

    :return: PowerReport built from timestamp 1, sensor "sensor", target "target", the value 0.11 and two metadata entries
    """
    metadata = {"metadata1": "truc", "metadata2": "oui"}
    return PowerReport(1, "sensor", "target", 0.11, metadata)
from tests.influx_utils import generate_power_report, create_empty_db
from tests.influx_utils import create_non_empty_db, delete_db, get_all_reports
# InfluxDB location and database name (presumably the instance the tests
# connect to -- confirm against the test fixtures).
INFLUX_URI = 'localhost'
INFLUX_PORT = 8086
INFLUX_DBNAME = 'unit_test'

# Sensor and target names shared by the report fixtures below.
SENSOR_NAME = 'sensor_test'
TARGET_NAME = 'target_test'

# Three reports that differ only by their timestamp (0, 1 and 2 seconds after
# the epoch, converted to naive local time); each one is built with the value
# 100 and an empty dict.
POWER_REPORT_0 = PowerReport(
    datetime.datetime.fromtimestamp(0),
    SENSOR_NAME,
    TARGET_NAME,
    100,
    {},
)
POWER_REPORT_1 = PowerReport(
    datetime.datetime.fromtimestamp(1),
    SENSOR_NAME,
    TARGET_NAME,
    100,
    {},
)
POWER_REPORT_2 = PowerReport(
    datetime.datetime.fromtimestamp(2),
    SENSOR_NAME,
    TARGET_NAME,
    100,
    {},
)
def bad_serialize():
    """
    Build a hand-written point dictionary for a power measurement.

    The ``tags`` entry only contains a ``target`` key.
    NOTE(review): the function name suggests this payload is intentionally
    malformed (no sensor tag) -- confirm against the report model.

    :return: dict with the ``measurement``, ``tags``, ``time`` and ``fields`` keys
    """
    tags = {'target': TARGET_NAME}
    # Fixed timestamp: 100 seconds after the epoch, rendered as a string.
    timestamp = str(datetime.datetime.fromtimestamp(100))
    fields = {'power': 100}
    return {
        'measurement': 'power_consumption',
        'tags': tags,
        'time': timestamp,
        'fields': fields,
    }
# Report whose ``serialize`` attribute is replaced by ``bad_serialize``.
# The function is stored on the instance (not the class), so it is invoked
# without ``self`` -- which matches the zero-argument ``bad_serialize``.
BAD_POWER_REPORT = PowerReport(
    datetime.datetime.fromtimestamp(2),
    SENSOR_NAME,
    TARGET_NAME,
    100,
    {},
)
BAD_POWER_REPORT.serialize = bad_serialize
# Input report: a root report holding one 'rapl' group, which holds one socket
# report (socket_id), which holds one core report ('1') carrying the events.
hwpc_report = create_report_root([
    create_group_report('rapl', [
        create_socket_report(socket_id, [
            create_core_report('1', None, None, events=events),
        ]),
    ]),
])

# Expected reports: one per RAPL event, copying the identity fields of the
# input report. math.ldexp(x, -32) is x * 2**-32.
validation_report_1 = PowerReport(
    hwpc_report.timestamp,
    hwpc_report.sensor,
    hwpc_report.target,
    math.ldexp(raw_power_1, -32),
    {'socket': socket_id, 'event': rapl_event_id_1},
)
validation_report_2 = PowerReport(
    hwpc_report.timestamp,
    hwpc_report.sensor,
    hwpc_report.target,
    math.ldexp(raw_power_2, -32),
    {'socket': socket_id, 'event': rapl_event_id_2},
)

# Run the handler under test on the input report.
handler = RAPLFormulaHWPCReportHandler(get_fake_pusher())
result = handler._process_report(hwpc_report, state)

# Exactly the two expected reports must be produced (order is not checked).
assert len(result) == 2
assert validation_report_1 in result
assert validation_report_2 in result
def gen_power_report():
    """
    Create a new PowerReport fixture, advancing the module-level counter.

    Each call increments the global ``CPT`` by one and uses the new value,
    passed through ``timestamp_to_datetime``, as the report timestamp.

    :return: PowerReport for ``SENSOR`` / ``TARGET`` built with the value 0.11 and two metadata entries
    """
    global CPT
    CPT += 1
    metadata = {"metadata1": "truc", "metadata2": "oui"}
    return PowerReport(
        timestamp_to_datetime(CPT),
        SENSOR,
        TARGET,
        0.11,
        metadata,
    )
# Two RAPL events and the raw counter value reported for each of them.
raw_power_1 = 10
rapl_event_id_1 = 'RAPL_1'
raw_power_2 = 20
rapl_event_id_2 = 'RAPL_2'
events = {
    rapl_event_id_1: raw_power_1,
    rapl_event_id_2: raw_power_2,
}

# Input report: a root report holding one 'rapl' group, which holds one socket
# report (socket_id), which holds one core report ('1') carrying the events.
hwpc_report = create_report_root([
    create_group_report('rapl', [
        create_socket_report(socket_id, [
            create_core_report('1', None, None, events=events),
        ]),
    ]),
])

# Expected reports: one per RAPL event, copying the identity fields of the
# input report. math.ldexp(x, -32) is x * 2**-32.
validation_report_1 = PowerReport(
    hwpc_report.timestamp,
    hwpc_report.sensor,
    hwpc_report.target,
    math.ldexp(raw_power_1, -32),
    {'socket': socket_id, 'event': rapl_event_id_1},
)
validation_report_2 = PowerReport(
    hwpc_report.timestamp,
    hwpc_report.sensor,
    hwpc_report.target,
    math.ldexp(raw_power_2, -32),
    {'socket': socket_id, 'event': rapl_event_id_2},
)

# Run the handler under test on the input report.
handler = RAPLFormulaHWPCReportHandler(get_fake_pusher())
result = handler._process_report(hwpc_report, state)
def launch_powerapi(args, logger):
##########################################################################
# Actor Creation
# Pusher
output_mongodb = MongoDB(args.output_uri,
args.output_db, args.output_collection,
HWPCModel())
pusher = PusherActor("pusher_mongodb", PowerReport, output_mongodb,
level_logger=args.verbose)
# Formula
formula_factory = (lambda name, verbose:
RAPLFormulaActor(name, pusher, level_logger=verbose))
# Dispatcher
route_table = RouteTable()
route_table.dispatch_rule(HWPCReport, HWPCDispatchRule(
getattr(HWPCDepthLevel, args.hwpc_dispatch_rule), primary=True))
dispatcher = DispatcherActor('dispatcher', formula_factory, route_table,
level_logger=args.verbose)
# Puller
input_mongodb = MongoDB(args.input_uri,
def _gen_power_report(report, socket, event, counter):
    """
    Build the power report matching one RAPL event counter.

    :param report: HWPC report
    :param socket: Socket ID
    :param event: RAPL event name
    :param counter: RAPL event counter
    :return: PowerReport copying the timestamp, sensor and target of ``report``, with the scaled counter as power and the socket/event as metadata
    """
    # math.ldexp(counter, -32) is counter * 2**-32.
    scaled_counter = math.ldexp(counter, -32)
    tags = {'socket': socket, 'event': event}
    return PowerReport(
        report.timestamp,
        report.sensor,
        report.target,
        scaled_counter,
        tags,
    )
def estimate(self, report):
    """
    Method that estimate the power consumption from an input report

    The estimation is a placeholder: the returned report always carries the
    constant power value 42 and an empty metadata dict; only the timestamp,
    sensor and target are taken from the input report.

    :param report: Input Report
    :return: List of PowerReport
    """
    # Every other PowerReport construction in this file passes the power
    # value before the metadata dict (see _gen_power_report); the previous
    # ``{}, 42`` had the two arguments swapped.
    power = 42
    metadata = {}
    result_msg = PowerReport(report.timestamp, report.sensor,
                             report.target, power, metadata)
    return [result_msg]