Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_data(self, outfile):
if self.process:
raise RuntimeError('`get_data` called before `stop`')
stdout, _ = self.output
with csvwriter(outfile) as writer:
active_sites = [c.site for c in self.active_channels]
# Write column headers
row = []
if 'output' in active_sites:
row.append('output_power')
if 'USB' in active_sites:
row.append('USB_power')
writer.writerow(row)
# Write data
for line in stdout.splitlines():
# Each output line is a main_output, usb_output measurement pair.
# (If our user only requested one channel we still collect both,
# and just ignore one of them)
output, usb = line.split()
def get_data(self, outfile): # pylint: disable=R0914
all_channels = [c.label for c in self.list_channels()]
active_channels = [c.label for c in self.active_channels]
active_indexes = [all_channels.index(ac) for ac in active_channels]
num_of_ports = len(self.resistor_values)
struct_format = '{}I'.format(num_of_ports * self.attributes_per_sample)
not_a_full_row_seen = False
self.raw_data_file = os.path.join(self.raw_output_directory, '0000000000')
self.logger.debug('Parsing raw data file: {}'.format(self.raw_data_file))
with open(self.raw_data_file, 'rb') as bfile:
with csvwriter(outfile) as writer:
writer.writerow(active_channels)
while True:
data = bfile.read(num_of_ports * self.bytes_per_sample)
if data == '':
break
try:
unpacked_data = struct.unpack(struct_format, data)
row = [unpacked_data[i] / 1000 for i in active_indexes]
writer.writerow(row)
except struct.error:
if not_a_full_row_seen:
self.logger.warning('possibly missaligned caiman raw data, row contained {} bytes'.format(len(data)))
continue
else:
not_a_full_row_seen = True
return MeasurementsCsv(outfile, self.active_channels, self.sample_rate_hz)
def write_measurements_csv(measurements, filepath):
headers = sorted(measurements.keys())
columns = [measurements[h] for h in headers]
with csvwriter(filepath) as writer:
writer.writerow(headers)
writer.writerows(zip_longest(*columns))
def _write_outputs(self, outputs, output):
if self.use_all_classifiers:
classifiers = set([])
for out in outputs:
for metric in out.metrics:
classifiers.update(list(metric.classifiers.keys()))
extra_columns = list(classifiers)
elif self.extra_columns:
extra_columns = self.extra_columns
else:
extra_columns = []
outfile = output.get_path('results.csv')
with csvwriter(outfile) as writer:
writer.writerow(['id', 'workload', 'iteration', 'metric', ] +
extra_columns + ['value', 'units'])
for o in outputs:
if o.kind == 'job':
header = [o.id, o.label, o.iteration]
elif o.kind == 'run':
# Should be a RunOutput. Run-level metrics aren't attached
# to any job so we leave 'id' and 'iteration' blank, and use
# the run name for the 'label' field.
header = [None, o.info.run_name, None]
else:
raise RuntimeError(
'Output of kind "{}" unrecognised by csvproc'.format(o.kind))
for metric in o.result.metrics:
temp_file = tempfile.mktemp()
self.target.pull(self.on_target_file, temp_file)
self.target.remove(self.on_target_file)
with csvreader(temp_file) as reader:
headings = next(reader)
# Figure out which columns from the collected csv we actually want
select_columns = []
for chan in self.active_channels:
try:
select_columns.append(headings.index(chan.name))
except ValueError:
raise HostError('Channel "{}" is not in {}'.format(chan.name, temp_file))
with csvwriter(output_file) as writer:
write_headings = ['{}_{}'.format(c.site, c.kind)
for c in self.active_channels]
writer.writerow(write_headings)
for row in reader:
write_row = [row[c] for c in select_columns]
writer.writerow(write_row)
return MeasurementsCsv(output_file, self.active_channels, sample_rate_hz=10)
channel_order = ['Time_time']
for site, reader in site_readers.items():
channel_order.extend(['{}_{}'.format(site, kind)
for kind in next(reader)])
def _read_rows():
row_iter = zip_longest(*site_readers.values(), fillvalue=(None, None))
for raw_row in row_iter:
raw_row = list(chain.from_iterable(raw_row))
raw_row.insert(0, _read_rows.row_time_s)
yield raw_row
_read_rows.row_time_s += 1.0 / self.sample_rate_hz
_read_rows.row_time_s = self.target_monotonic_clock_at_start
with csvwriter(outfile) as writer:
field_names = [c.label for c in self.active_channels]
writer.writerow(field_names)
for raw_row in _read_rows():
row = [raw_row[channel_order.index(f)] for f in field_names]
writer.writerow(row)
return MeasurementsCsv(outfile, self.active_channels, self.sample_rate_hz)
finally:
for fh in file_handles:
fh.close()
for state in sorted(powerstate_report.state_stats):
stats = powerstate_report.state_stats[state]
powerstate_rows.append([job_id, workload, iteration, state] +
['{:.3f}'.format(s if s is not None else 0)
for s in stats])
outpath = output.get_path('parallel-stats.csv')
with csvwriter(outpath) as writer:
writer.writerow(['id', 'workload', 'iteration', 'cluster',
'number_of_cores', 'total_time',
'%time', '%running_time'])
writer.writerows(parallel_rows)
output.add_artifact('run-parallel-stats', outpath, kind='export')
outpath = output.get_path('power-state-stats.csv')
with csvwriter(outpath) as writer:
headers = ['id', 'workload', 'iteration', 'state']
headers += ['{} CPU{}'.format(c, i)
for i, c in enumerate(powerstate_report.core_names)]
writer.writerow(headers)
writer.writerows(powerstate_rows)
output.add_artifact('run-power-state-stats', outpath, kind='export')
def write(self):
with csvwriter(self.filepath) as writer:
writer.writerow(['cluster', 'number_of_cores', 'total_time', '%time', '%running_time'])
writer.writerows(self.values)