Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_mapping(base_dir, partition_file):
    """
    Build a mapping from the first field of each line in ``partition_file``
    to the path of the image file named by the second field.

    Every line is split on whitespace and must yield exactly two fields.
    The second field is joined onto ``base_dir`` and the resulting path must
    refer to an existing regular file.

    :param base_dir: directory the image file names are resolved against.
    :param partition_file: path of the text file listing the pairs
        (presumably partition name and image file name -- confirm with caller).
    :returns: dict of ``{first_field: os.path.join(base_dir, second_field)}``.
    :raises HostError: if a line does not have exactly two fields, or if the
        image file it names does not exist.
    """
    mapping = {}
    with open(partition_file) as pf:
        for line in pf:
            pair = line.split()
            if len(pair) != 2:
                # The exception used to be constructed but never raised, so a
                # malformed line fell through to an IndexError (or was
                # silently accepted).
                raise HostError('partitions.txt is not properly formated')
            key, image_name = pair
            image_path = os.path.join(base_dir, image_name)
            if not os.path.isfile(expand_path(image_path)):
                # Same here: without ``raise`` a missing image went unnoticed.
                raise HostError('file {} was not found in the bundle or was misplaced'.format(image_name))
            mapping[key] = image_path
    return mapping
def _check_env():
    """
    Resolve the host's SSH tooling and cache it in the module-level globals.

    ``ssh``, ``scp`` and ``sshpass`` are looked up with ``which`` only while
    ``ssh`` is still unset; later calls reuse the cached values.

    :raises HostError: if either ``ssh`` or ``scp`` is unavailable.
    """
    global ssh, scp, sshpass  # pylint: disable=global-statement
    if not ssh:
        ssh = which('ssh')
        scp = which('scp')
        sshpass = which('sshpass')
    if ssh and scp:
        return
    raise HostError('OpenSSH must be installed on the host.')
def _give_password(password, command):
    """
    Prefix ``command`` with an ``sshpass`` invocation supplying ``password``.

    :param password: password to hand to ``sshpass -p`` (shell-quoted here).
    :param command: command line the prefix is prepended to.
    :returns: ``(real, redacted)`` -- the runnable command line and a copy in
        which the password is replaced by an empty quoted string.
    :raises HostError: if ``sshpass`` was not found on the host.
    """
    if not sshpass:
        raise HostError('Must have sshpass installed on the host in order to use password-based auth.')

    def _prefixed(secret):
        # Same template for both variants so they differ only in the secret.
        return "sshpass -p {} ".format(quote(secret)) + command

    return (_prefixed(password), _prefixed(''))
def stop(self):
    """
    Stop the running Monsoon script and collect what it produced.

    The process is interrupted with SIGINT, its stderr is read, and the
    contents of ``self.buffer_file`` are loaded before that file is removed.
    The result is stored in ``self.output`` as ``(stdout, stderr)``.

    :raises RuntimeError: if no process is currently recorded on ``self``.
    :raises HostError: if the script had already exited on its own; the
        message carries its exit code and whatever output could be collected.
    """
    process = self.process
    self.process = None
    if not process:
        raise RuntimeError('Monsoon script not started')

    process.poll()
    if process.returncode is not None:
        stdout, stderr = process.communicate()
        if sys.version_info[0] == 3:
            # communicate() hands back bytes for piped streams (and None for
            # streams that were not piped). The previous code called
            # ``.encode`` on them, which fails on both and hid the real
            # error; decode bytes and pass anything else through untouched.
            encoding = sys.stdout.encoding or 'utf-8'
            stdout, stderr = [
                stream.decode(encoding, 'replace') if isinstance(stream, bytes) else stream
                for stream in (stdout, stderr)
            ]
        raise HostError(
            'Monsoon script exited unexpectedly with exit code {}.\n'
            'stdout:\n{}\nstderr:\n{}'.format(process.returncode,
                                               stdout, stderr))

    process.send_signal(signal.SIGINT)

    stderr = process.stderr.read()

    # The buffer file must be closed before it is re-opened for reading.
    self.buffer_file.close()
    with open(self.buffer_file.name) as f:
        stdout = f.read()
    os.remove(self.buffer_file.name)
    self.buffer_file = None

    self.output = (stdout, stderr)
def _prepare_xfer(self, action, sources, dest):
    """
    Check the sanity of sources and destination and prepare the ground for
    transferring multiple sources.

    :param action: ``'push'`` treats ``sources`` as host paths and ``dest``
        as a target path; any other value treats ``dest`` as a host path.
    :param sources: sequence of source paths to transfer.
    :param dest: destination path.
    :raises HostError: on a push when a source does not exist on the host or
        ``sources`` is empty; otherwise when ``dest`` is an existing file but
        several sources were given.
    :raises TargetStableError: the mirror image of the above -- empty
        ``sources`` when not pushing, or a file ``dest`` for a multi-source
        push.
    """
    if action == 'push':
        src_excep = HostError
        dst_excep = TargetStableError
        dst_path_exists = self.file_exists
        dst_is_dir = self.directory_exists
        dst_mkdir = self.makedirs

        for source in sources:
            if not os.path.exists(source):
                raise HostError('No such file "{}"'.format(source))
    else:
        src_excep = TargetStableError
        dst_excep = HostError
        dst_path_exists = os.path.exists
        dst_is_dir = os.path.isdir
        dst_mkdir = functools.partial(os.makedirs, exist_ok=True)

    if not sources:
        # ``source`` is never bound when ``sources`` is empty, so report the
        # (empty) collection instead of hitting a NameError.
        raise src_excep('No file matching: {}'.format(sources))
    elif len(sources) > 1:
        if dst_path_exists(dest):
            if not dst_is_dir(dest):
                raise dst_excep('A folder dest is required for multiple matches but destination is a file: {}'.format(dest))
        else:
            # Several sources need a directory to land in; this used to call
            # the undefined name ``dst_makedirs``.
            dst_mkdir(dest)
def validate_image_bundle(bundle):
    """
    Verify that ``bundle`` is a TAR archive containing a ``config.txt`` entry.

    The entry is accepted under either of the names ``config.txt`` or
    ``./config.txt``.

    :param bundle: path of the archive to check.
    :raises HostError: if ``bundle`` is not a TAR file, or holds neither
        entry name.
    """
    if not tarfile.is_tarfile(bundle):
        raise HostError('Image bundle {} does not appear to be a valid TAR file.'.format(bundle))
    with tarfile.open(bundle) as archive:
        for member_name in ('config.txt', './config.txt'):
            try:
                archive.getmember(member_name)
            except KeyError:
                continue
            break
        else:
            # Neither spelling of the entry was present.
            msg = 'Tarball {} does not appear to be a valid image bundle (did not see config.txt).'
            raise HostError(msg.format(bundle))
labels=None,
device_entry='/dev/ttyACM0',
keep_raw=False
):
super(EnergyProbeInstrument, self).__init__(target)
self.resistor_values = resistor_values
self.keep_raw = keep_raw
if labels is not None:
self.labels = labels
else:
self.labels = ['PORT_{}'.format(i)
for i in range(len(resistor_values))]
self.device_entry = device_entry
self.caiman = which('caiman')
if self.caiman is None:
raise HostError('caiman must be installed on the host '
'(see https://github.com/ARM-software/caiman)')
self.attributes_per_sample = 3
self.bytes_per_sample = self.attributes_per_sample * 4
self.attributes = ['power', 'voltage', 'current']
self.command = None
self.raw_output_directory = None
self.process = None
self.sample_rate_hz = 10000 # Determined empirically
self.raw_data_file = None
for label in self.labels:
for kind in self.attributes:
self.add_channel(label, kind)
def get_data(self, output_file):
    """
    Pull the CSV collected on the target and write the active channels'
    columns to ``output_file``.

    The on-target file is pulled to a host-side scratch location and then
    removed from the target. The header row of the pulled CSV is used to
    find the column of each active channel (matched by ``chan.name``); the
    output gets a ``<site>_<kind>`` header row followed by the selected
    columns of every data row.

    :param output_file: path of the CSV file to write on the host.
    :returns: a ``MeasurementsCsv`` over ``output_file`` for the active
        channels.
    :raises HostError: if an active channel has no column in the pulled CSV.
    """
    import os
    import shutil

    # mkdtemp() atomically creates a private directory, unlike the deprecated
    # tempfile.mktemp(), which only returns a name that another process can
    # claim before it is used. The pull destination inside it does not exist
    # yet, just as with the old generated name.
    temp_dir = tempfile.mkdtemp()
    temp_file = os.path.join(temp_dir, 'collected.csv')
    try:
        self.target.pull(self.on_target_file, temp_file)
    except Exception:
        # Nothing was retrieved, so there is nothing worth keeping.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
    self.target.remove(self.on_target_file)

    with csvreader(temp_file) as reader:
        headings = next(reader)

        # Figure out which columns from the collected csv we actually want
        select_columns = []
        for chan in self.active_channels:
            try:
                select_columns.append(headings.index(chan.name))
            except ValueError:
                raise HostError('Channel "{}" is not in {}'.format(chan.name, temp_file))

        with csvwriter(output_file) as writer:
            write_headings = ['{}_{}'.format(c.site, c.kind)
                              for c in self.active_channels]
            writer.writerow(write_headings)
            for row in reader:
                write_row = [row[c] for c in select_columns]
                writer.writerow(write_row)

    # Only clean up after a successful conversion: on failure the pulled file
    # is the sole remaining copy of the data (the on-target file is already
    # gone) and is the path named in the error message.
    shutil.rmtree(temp_dir, ignore_errors=True)
    return MeasurementsCsv(output_file, self.active_channels, sample_rate_hz=10)
self.function_profile_file = self.target.path.join(self.tracing_path, 'function_profile_enabled')
self.marker_file = self.target.path.join(self.tracing_path, 'trace_marker')
self.ftrace_filter_file = self.target.path.join(self.tracing_path, 'set_ftrace_filter')
self.trace_clock_file = self.target.path.join(self.tracing_path, 'trace_clock')
self.save_cmdlines_size_file = self.target.path.join(self.tracing_path, 'saved_cmdlines_size')
self.available_tracers_file = self.target.path.join(self.tracing_path, 'available_tracers')
self.host_binary = which('trace-cmd')
self.kernelshark = which('kernelshark')
if not self.target.is_rooted:
raise TargetStableError('trace-cmd instrument cannot be used on an unrooted device.')
if self.autoreport and not self.report_on_target and self.host_binary is None:
raise HostError('trace-cmd binary must be installed on the host if autoreport=True.')
if self.autoview and self.kernelshark is None:
raise HostError('kernelshark binary must be installed on the host if autoview=True.')
if not no_install:
host_file = os.path.join(PACKAGE_BIN_DIRECTORY, self.target.abi, 'trace-cmd')
self.target_binary = self.target.install(host_file)
else:
if not self.target.is_installed('trace-cmd'):
raise TargetStableError('No trace-cmd found on device and no_install=True is specified.')
self.target_binary = 'trace-cmd'
# Validate required events to be traced
def event_to_regex(event):
    """
    Compile an event glob into a regular expression.

    A leading ``*`` is added when the glob does not already start with one,
    then every ``*`` is turned into ``.*``.
    """
    glob = event if event.startswith('*') else '*' + event
    return re.compile(glob.replace('*', '.*'))
def event_is_in_list(event, events):
def __init__(self, target, config_file='./config-aep', keep_raw=False):
    """
    Set up the instrument and register one channel per topology entry.

    :param target: target passed through to the base class initialiser.
    :param config_file: path handed to the parser's
        ``topology_from_config``.
    :param keep_raw: stored on the instance as ``keep_raw``.
    :raises HostError: if the ``arm-probe`` binary is not found on the host.
    """
    super(ArmEnergyProbeInstrument, self).__init__(target)

    self.arm_probe = which('arm-probe')
    if self.arm_probe is None:
        raise HostError('arm-probe must be installed on the host')

    # TODO: detect whether the config file exists
    self.attributes = ['power', 'voltage', 'current']
    self.sample_rate_hz = 10000
    self.config_file = config_file
    self.keep_raw = keep_raw
    self.parser = AepParser()

    # TODO: make it generic
    for item in self.parser.topology_from_config(self.config_file):
        # The 'time' entry becomes the timestamp channel; every other entry
        # is registered as a power channel under its own name.
        site, kind = ('timestamp', 'time') if item == 'time' else (item, 'power')
        self.add_channel(site, kind)