def widen(self, _):
    raise SimMergeError("Widening the filesystem is unsupported")
def widen(self, *others):
    """
    Perform a widening between self and the other states.

    :param others: the other states to widen with
    :return:       a tuple of (widened state, whether widening occurred)
    """
    if len(set(frozenset(o.plugins.keys()) for o in others)) != 1:
        raise SimMergeError("Unable to widen due to different sets of plugins.")
    if len(set(o.arch.name for o in others)) != 1:
        raise SimMergeError("Unable to widen due to different architectures.")

    widened = self.copy()
    widening_occurred = False

    # plugins
    for p in self.plugins:
        if p in ('solver', 'unicorn'):
            continue
        plugin_state_widened = widened.plugins[p].widen([o.plugins[p] for o in others])
        if plugin_state_widened:
            l.debug('Widening occurred in %s', p)
            widening_occurred = True

    return widened, widening_occurred
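A hedged usage sketch of the widening contract above (the state names s0 and s1 are hypothetical angr SimStates at the same program point, not part of the snippet):

    widened, changed = s0.widen(s1)
    if not changed:
        pass  # no plugin widened further; a fixed point was reached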
def _combine(self, others):
    if any(o.heap_base != self.heap_base for o in others):
        raise SimMergeError("Cannot merge heaps with different bases")
    # When heaps become more dynamic, this next one can probably change
    if any(o.heap_size != self.heap_size for o in others):
        raise SimMergeError("Cannot merge heaps with different sizes")
    if any(o.free_head_chunk != self.free_head_chunk for o in others):
        raise SimMergeError("Cannot merge heaps with different freelist head chunks")
    if any(o.mmap_base != self.mmap_base for o in others):
        raise SimMergeError("Cannot merge heaps with different mmap bases")
    # These are definitely sanity checks
    if any(o._chunk_size_t_size != self._chunk_size_t_size for o in others):
        raise SimMergeError("Cannot merge heaps with different chunk size_t sizes")
    if any(o._chunk_min_size != self._chunk_min_size for o in others):
        raise SimMergeError("Cannot merge heaps with different minimum chunk sizes")
    if any(o._chunk_align_mask != self._chunk_align_mask for o in others):
        raise SimMergeError("Cannot merge heaps with different chunk alignments")
    return False
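In angr's heap plugins a helper like _combine typically backs both merge and widen. A minimal sketch of that wiring, assuming these methods live on the same plugin class (the delegation is an assumption, not confirmed by the snippet):

    def merge(self, others, merge_conditions, common_ancestor=None):
        return self._combine(others)  # all heap metadata must already agree

    def widen(self, others):
        return self._combine(others)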
def merge(self, others):
    if not all(self._repeat_expr is o._repeat_expr for o in others):
        raise SimMergeError("Unable to merge two different repeat expressions.")

    self._repeat_min = max(self._repeat_min, max(o._repeat_min for o in others))
    self._repeat_granularity = max(
        self._repeat_granularity,
        max(o._repeat_granularity for o in others)
    )
def merge(self, stash='active', merge_func=None):  # signature reconstructed; the original excerpt began mid-function
    to_merge = self.stashes[stash]
    not_to_merge = [ ]

    # Group the states by address; singleton groups are kept as-is.
    merge_groups = [ ]
    while len(to_merge) > 0:
        g, to_merge = self._filter_states(lambda s: s.addr == to_merge[0].addr, to_merge)
        if len(g) <= 1:
            not_to_merge.extend(g)
        else:
            merge_groups.append(g)

    for g in merge_groups:
        try:
            m = self._merge_states(g) if merge_func is None else merge_func(*g)
            not_to_merge.append(m)
        except SimMergeError:
            l.warning("SimMergeError while merging %d states", len(g), exc_info=True)
            not_to_merge.extend(g)

    new_stashes = self._copy_stashes()
    new_stashes[stash] = not_to_merge
    return self._successor(new_stashes)
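A hedged usage sketch for the stash-merging routine above, assuming a standard angr setup (the binary path is a placeholder):

    import angr

    proj = angr.Project('/bin/true', auto_load_libs=False)
    simgr = proj.factory.simulation_manager()
    simgr.run(n=10)                      # step a few times to populate 'active'
    simgr = simgr.merge(stash='active')  # states at the same address collapse into one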
def _merge_strategies(*strategy_lists):
    if len(set(len(sl) for sl in strategy_lists)) != 1:
        raise SimMergeError("unable to merge memories with different amounts of strategies")

    merged_strategies = [ ]
    for strategies in zip(*strategy_lists):
        if len(set(s.__class__ for s in strategies)) != 1:
            raise SimMergeError("unable to merge memories with different types of strategies")

        unique = list(set(strategies))
        if len(unique) > 1:
            unique[0].merge(unique[1:])

        merged_strategies.append(unique[0])
    return merged_strategies
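A standalone sketch of the dedup-and-merge pattern _merge_strategies implements, using a toy strategy class (hypothetical, not angr's API) so the behavior can be seen in isolation:

    class ToyStrategy:
        def __init__(self, limit):
            self.limit = limit

        def merge(self, others):
            # Widen this strategy to cover every strategy merged into it.
            self.limit = max([self.limit] + [o.limit for o in others])

    a, b = ToyStrategy(16), ToyStrategy(64)
    merged = _merge_strategies([a, a], [a, b])  # slot 0 agrees, slot 1 differs
    assert merged[0] is a and merged[1].limit == 64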
def merge(self, others, merge_conditions, common_ancestor=None):
    merging_occurred = False

    for o in others:
        if o.cwd != self.cwd:
            raise SimMergeError("Can't merge filesystems with disparate cwds")
        if len(o._mountpoints) != len(self._mountpoints):
            raise SimMergeError("Can't merge filesystems with disparate mountpoints")
        if list(map(id, o.unlinks)) != list(map(id, self.unlinks)):
            raise SimMergeError("Can't merge filesystems with disparate unlinks")

    for fname in self._mountpoints:
        try:
            subdeck = [o._mountpoints[fname] for o in others]
        except KeyError:
            raise SimMergeError("Can't merge filesystems with disparate file sets")

        if common_ancestor is not None and fname in common_ancestor._mountpoints:
            common_mp = common_ancestor._mountpoints[fname]
        else:
            common_mp = None

        merging_occurred |= self._mountpoints[fname].merge(subdeck, merge_conditions, common_ancestor=common_mp)

    return merging_occurred
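Plugin-level merges like the one above are normally driven from SimState.merge rather than called directly. A hedged sketch (the state names are hypothetical):

    merged_state, merge_flag, anything_merged = s0.merge(s1)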