def _create_reconstruction_group_and_tables(self, results_group, overwrite):
    if results_group in self.data:
        if overwrite:
            self.data.remove_node(results_group, recursive=True)
        else:
            raise RuntimeError("Result group exists, but overwrite is False")

    head, tail = os.path.split(results_group)
    group = self.data.create_group(head, tail)

    # One boolean column per station, flagging which stations participated.
    stations_description = {'s%d' % u: tables.BoolCol() for u in self.stations}

    description = self.reconstruction_description
    description.update(stations_description)
    self.reconstruction = self.data.create_table(group, 'reconstructions',
                                                 description)

    description = self.reconstruction_coincidence_description
    description.update(stations_description)
    self.reconstruction_coincidences = self.data.create_table(
        group, 'coincidences', description)

    return group
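The pattern above, merging per-station BoolCols into a base description before creating the table, works the same with a plain dict. A minimal, self-contained sketch; the file name, station numbers, and base columns below are illustrative, not from the original:

import tables

stations = [501, 502, 510]  # hypothetical station numbers

with tables.open_file('demo_reconstructions.h5', mode='w') as data:
    group = data.create_group('/', 'demo')
    # Base columns plus one BoolCol per station, as in the method above.
    description = {'zenith': tables.Float32Col(),
                   'azimuth': tables.Float32Col()}
    description.update({'s%d' % u: tables.BoolCol() for u in stations})
    table = data.create_table(group, 'reconstructions', description)

    row = table.row
    row['zenith'], row['azimuth'] = 0.3, 1.2
    row['s501'] = True  # station 501 contributed to this reconstruction
    row.append()
    table.flush()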
def add_meta(simulation, key):
    """Create a node for the simulation and add data to /Metadata"""
    class Meta(tbl.IsDescription):
        SID = tbl.StringCol(itemsize=16)
        path = tbl.StringCol(itemsize=160)
        log_analysed = tbl.BoolCol()
        successful = tbl.BoolCol()
        algorithm = tbl.StringCol(itemsize=16)
        cpu_time = tbl.Float32Col()
        successful_steps = tbl.Int32Col()
        steps_nok = tbl.Int32Col()
        timed_out = tbl.BoolCol()
        perc_wrong = tbl.Float32Col()
        time_events_model = tbl.Int32Col()
        time_events_U = tbl.Int32Col()
        state_events = tbl.Int32Col()
        step_events = tbl.Int32Col()
        step_size_min = tbl.Float32Col()
        step_size_max = tbl.Float32Col()
        int_order_max = tbl.Int32Col()
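The snippet ends after the column description. What typically follows is creating a table from Meta and appending one row; a hedged sketch, assuming a file handle h5f and a /Metadata node (the values are made up, and Meta is the class defined above):

import tables as tbl

with tbl.open_file('demo_meta.h5', mode='w') as h5f:
    meta = h5f.create_table('/', 'Metadata', Meta)
    row = meta.row
    row['SID'] = b'sim-0001'  # StringCol fields take bytes in Python 3
    row['successful'] = True
    row['cpu_time'] = 12.5
    row.append()
    meta.flush()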
"loading", "patching"])
region_types = tables.Enum(["future", 'global', 'patching',
'stack',
'symbol',
'readonly',
BOOKKEEPING])
vlist = ['rw', 'r', 'w', 'none', '?']
perms = tables.Enum(vlist + ["rwx", "x", "rx"])
class MemoryRegionInfo(tables.IsDescription):
    short_name = tables.StringCol(255)
    parent_name = tables.StringCol(255)
    name = tables.StringCol(255)
    comments = tables.StringCol(512)
    include_children = tables.BoolCol()
    reclassifiable = tables.BoolCol()
    do_print = tables.BoolCol()


class MemoryRegionAddrs(tables.IsDescription):
    short_name = tables.StringCol(255)
    startaddr = tables.UInt64Col()
    startaddrlo = tables.UInt32Col()
    startaddrhi = tables.UInt32Col()
    endaddr = tables.UInt64Col()
    endaddrlo = tables.UInt32Col()
    endaddrhi = tables.UInt32Col()


class SubstageRelocInfo(tables.IsDescription):
    substagenum = tables.UInt8Col()
class WriteEntry(tables.IsDescription):
    pc = tables.UInt64Col()
    pclo = tables.UInt32Col()
    pchi = tables.UInt32Col()
    thumb = tables.BoolCol()
    reg0 = tables.StringCol(4)
    reg1 = tables.StringCol(4)
    reg2 = tables.StringCol(4)
    reg3 = tables.StringCol(4)
    reg4 = tables.StringCol(4)
    writesize = tables.Int64Col()
    halt = tables.BoolCol()  # whether to insert a breakpoint here


class SrcEntry(tables.IsDescription):
    addr = tables.UInt64Col()
    addrlo = tables.UInt32Col()
    addrhi = tables.UInt32Col()
    line = tables.StringCol(512)  # file/lineno
    src = tables.StringCol(512)  # contents of source code at this location
    ivalue = tables.StringCol(12)
    ilength = tables.UInt8Col()
    thumb = tables.BoolCol()
    mne = tables.StringCol(10)
    disasm = tables.StringCol(256)


class RelocInfo(tables.IsDescription):
    reloc_name = tables.StringCol(128)


class SubstageRegionPolicy(tables.IsDescription):
    default_perms = tables.EnumCol(perms, 'rwx', base='uint8')
    short_name = tables.StringCol(255)
    symbol_name = tables.StringCol(255)  # symbol name in code
    symbol_elf_name = tables.StringCol(255)  # symbol name in elf file
    region_type = tables.EnumCol(region_types, BOOKKEEPING, base='uint8')
    substagenum = tables.UInt8Col()
    new = tables.BoolCol()
    defined = tables.BoolCol()
    undefined = tables.BoolCol()
    writable = tables.BoolCol()
    reclassified = tables.BoolCol()
    allowed_symbol = tables.BoolCol()
    do_print = tables.BoolCol()


class SubstageContents(tables.IsDescription):
    substagenum = tables.UInt8Col(pos=1)
    functionname = tables.StringCol(255, pos=2)


class SubstageEntry(tables.IsDescription):
    substagenum = tables.UInt8Col(pos=1)
    functionname = tables.StringCol(255)
    name = tables.StringCol(255)
    stack = tables.StringCol(255)
    comments = tables.StringCol(255)
    # Default and base assumed by analogy with the EnumCols above; the
    # original call was truncated in the source.
    substage_type = tables.EnumCol(substage_types, 'loading', base='uint8')
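An EnumCol stores the concrete integer value; the symbolic name can be recovered from the table itself via Table.get_enum(). A short round-trip sketch using SubstageEntry and substage_types from above (file name and row values are illustrative):

with tables.open_file('demo_substages.h5', mode='w') as h5f:
    substages = h5f.create_table('/', 'substages', SubstageEntry)
    row = substages.row
    row['substagenum'] = 1
    row['substage_type'] = substage_types['loading']  # name -> concrete value
    row.append()
    substages.flush()

    enum = substages.get_enum('substage_type')   # Enum attached to the column
    print(enum(substages[0]['substage_type']))   # concrete value -> 'loading'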
def _prepare_coincidence_tables(self):
    """Create coincidence tables

    These are the same as the tables created by
    :class:`sapphire.analysis.coincidences.CoincidencesESD`.
    This makes it easy to link events detected by multiple stations.

    """
    # Note: this snippet uses the camelCase PyTables API (createGroup,
    # createTable, createVLArray), which was deprecated in PyTables 3.0
    # in favour of create_group, create_table, and create_vlarray.
    self.coincidence_group = self.datafile.createGroup(self.output_path,
                                                       'coincidences',
                                                       createparents=True)
    self.coincidence_group._v_attrs.cluster = self.cluster

    description = storage.Coincidence
    station_columns = {'s%d' % station.station_number: tables.BoolCol()
                       for station in self.cluster.stations}
    description.columns.update(station_columns)

    self.coincidences = self.datafile.createTable(
        self.coincidence_group, 'coincidences', description)
    self.c_index = self.datafile.createVLArray(
        self.coincidence_group, 'c_index', tables.UInt32Col(shape=2))
    self.s_index = self.datafile.createVLArray(
        self.coincidence_group, 's_index', tables.VLStringAtom())
def _create_coincidences_tables(file, group, station_groups):
    """Setup coincidence tables

    :return: the created coincidences group.

    """
    coin_group = group + '/coincidences'

    # Create coincidences table; the station columns start at position 12,
    # after the fixed columns of storage.Coincidence.
    description = storage.Coincidence
    s_columns = {'s%d' % station: tables.BoolCol(pos=p)
                 for p, station in enumerate(station_groups, 12)}
    description.columns.update(s_columns)
    coincidences = file.create_table(coin_group, 'coincidences', description,
                                     createparents=True)

    # Create c_index
    file.create_vlarray(coin_group, 'c_index', tables.UInt32Col(shape=2))

    # Create and fill s_index (itervalues comes from six)
    s_index = file.create_vlarray(coin_group, 's_index',
                                  tables.VLStringAtom())
    for station_group in itervalues(station_groups):
        s_index.append(station_group['group'].encode('utf-8'))

    return coincidences._v_parent
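A hedged sketch of driving the helper above. storage.Coincidence lives in sapphire.storage in the original project (assumed import); the station numbers and group paths are made up, and the appended c_index row shows the (station index, event index) pairs that tie one coincidence to its events:

import tables
from six import itervalues
from sapphire import storage  # assumed location of the Coincidence description

station_groups = {501: {'group': '/hisparc/station_501'},
                  502: {'group': '/hisparc/station_502'}}

with tables.open_file('demo_coincidences.h5', mode='w') as f:
    coin_group = _create_coincidences_tables(f, '/analysis', station_groups)
    c_index = f.get_node(coin_group, 'c_index')
    c_index.append([(0, 12), (1, 7)])  # events 12 and 7 of stations 0 and 1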
def prepare_output(self):
    """Prepare output table"""
    dest_path = os.path.join(self.dest_group, self.destination)

    if dest_path in self.dest_data:
        if self.overwrite:
            self.dest_data.remove_node(dest_path, recursive=True)
        else:
            raise RuntimeError("Reconstructions table already exists for "
                               "%s, and overwrite is False" % self.dest_group)

    s_columns = {'s%d' % station.number: tables.BoolCol(pos=p)
                 for p, station in enumerate(self.cluster.stations, 26)}
    description = ReconstructedCoincidence
    description.columns.update(s_columns)

    self.reconstructions = self.dest_data.create_table(
        self.dest_group, self.destination, description,
        expectedrows=self.coincidences.nrows, createparents=True)
    try:
        self.reconstructions._v_attrs.cluster = self.cluster
    except tables.HDF5ExtError:
        warnings.warn('Unable to store cluster object, too large for HDF.')