Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_self_modifying_code():
    """Check that the 'cgc/stuff' binary runs the same under VEX and unicorn.

    The binary is executed twice with STRICT_PAGE_ACCESS enabled: once with
    the default options and once with the unicorn option set added. Each run
    is stepped until the number of active states is no longer exactly one.
    The single deadended state must then hold 65 in ebx, and both runs must
    have recorded the same basic-block address history.
    """
    project = angr.Project(os.path.join(test_location, 'cgc', 'stuff'))

    def _run_to_end(extra_options):
        # Build a fresh entry state with the requested options and step the
        # manager until it stops having exactly one active state.
        entry = project.factory.entry_state(add_options=extra_options)
        manager = project.factory.simulation_manager(entry)
        manager.step(until=lambda lpg: len(lpg.active) != 1)
        return manager

    # Plain run (no unicorn options).
    plain_mgr = _run_to_end({o.STRICT_PAGE_ACCESS})
    plain_ebx = plain_mgr.one_deadended.regs.ebx
    nose.tools.assert_true(claripy.is_true(plain_ebx == 65))

    # Same program, this time with the unicorn options enabled as well.
    unicorn_mgr = _run_to_end({o.STRICT_PAGE_ACCESS} | o.unicorn)
    unicorn_ebx = unicorn_mgr.one_deadended.regs.ebx
    nose.tools.assert_true(claripy.is_true(unicorn_ebx == 65))

    # Both runs must have visited the same sequence of basic blocks.
    plain_trace = plain_mgr.one_deadended.history.bbl_addrs.hardcopy
    unicorn_trace = unicorn_mgr.one_deadended.history.bbl_addrs.hardcopy
    nose.tools.assert_true(plain_trace == unicorn_trace)
def test_type_annotation():
    """Check that a pointer type annotation survives adding an offset.

    A 32-bit symbolic value is annotated as a pointer to a SimTypeTop
    instance, 4 is added to it, and the sum is converted by the TypeBackend.
    The resulting type must still point to the very same SimTypeTop object
    and must report an offset of 4.
    """
    pointee = angr.sim_type.SimTypeTop()
    pointer_type = angr.sim_type.SimTypePointer(pointee, label=[])
    annotation = angr.type_backend.TypeAnnotation(pointer_type)
    annotated_ptr = claripy.BVS('ptr', 32).annotate(annotation)

    backend = angr.type_backend.TypeBackend()
    typed_value = backend.convert(annotated_ptr + 4)

    # Identity check: the pointee must be the exact object created above.
    nose.tools.assert_is(typed_value.ty.pts_to, pointee)
    offset_is_four = claripy.is_true(typed_value.ty.offset == 4)
    nose.tools.assert_true(offset_is_four)
def test_rcr():
    """Check the value of cl after the first step of the i386 'rcr_test' binary.

    The binary is loaded from the shared binaries directory relative to this
    file. The first successor of the entry state must have cl equal to 8.
    """
    binary_path = os.path.join(
        os.path.dirname(__file__),
        '..', '..', 'binaries', 'tests', 'i386', 'rcr_test',
    )
    project = angr.Project(binary_path)

    entry = project.factory.entry_state()
    step_result = project.factory.successors(entry)
    first_successor = step_result.successors[0]

    nose.tools.assert_true(claripy.is_true(first_successor.regs.cl == 8))
fmt_arg_pos = PRINTF_VARIANTS[func_name]
callers = set.union(set(), *(cfg.get_predecessors(node) for node in cfg.get_all_nodes(func.addr)))
handled_addrs = set()
func_to_cfg = {}
for caller in callers:
if caller.addr in handled_addrs:
continue
try:
args, _ = self.ident.get_call_args(func, caller.addr)
except KeyError:
continue
fmt_str = args[fmt_arg_pos]
if not claripy.is_true(claripy.Or(*(claripy.And(seg.min_addr <= fmt_str, fmt_str <= seg.max_addr)\
for seg in self.ro_segments))):
# we bad
break
handled_addrs.add(caller.addr)
else:
# patch not necessary for this function
continue
pnum += 1 # we need this to ensure always different labels
used_spec_chars.append(func_obj.format_spec_char)
check = """
; is the address not in RO memory?
cmp dword [esp+{stack_offset}], {{max_ro_addr}}
jbe _end_printfcheck_%d
func_addr=state_addr,
stack_ptr=stack_ptr,
ret_addr=ret_addr,
jumpkind='Ijk_Call')
state.callstack.push(new_frame)
state._inspect('call', BP_AFTER)
else:
while state.solver.is_true(state.regs._sp > state.callstack.top.stack_ptr):
state._inspect('return', BP_BEFORE, function_address=state.callstack.top.func_addr)
state.callstack.pop()
state._inspect('return', BP_AFTER)
if not state.arch.call_pushes_ret and \
claripy.is_true(state.regs._ip == state.callstack.ret_addr) and \
claripy.is_true(state.regs._sp == state.callstack.stack_ptr):
# very weird edge case that's not actually weird or on the edge at all:
# if we use a link register for the return address, the stack pointer will be the same
# before and after the call. therefore we have to check for equality with the marker
# along with this other check with the instruction pointer to guess whether it's time
# to pop a callframe. Still better than relying on Ijk_Ret.
state._inspect('return', BP_BEFORE, function_address=state.callstack.top.func_addr)
state.callstack.pop()
state._inspect('return', BP_AFTER)
:param simfile: The actual simfile to preconstrain
"""
repair_entry_state_opts = False
if o.TRACK_ACTION_HISTORY in self.state.options:
repair_entry_state_opts = True
self.state.options -= {o.TRACK_ACTION_HISTORY}
if set_length: # disable read bounds
simfile.has_end = False
pos = 0
for write in content:
if type(write) is int:
write = bytes([write])
data, length, pos = simfile.read(pos, len(write), disable_actions=True, inspect=False, short_reads=False)
if not claripy.is_true(length == len(write)):
raise AngrError("Bug in either SimFile or in usage of preconstrainer: couldn't get requested data from file")
self.preconstrain(write, data)
# if the file is a stream, reset its position
if simfile.pos is not None:
simfile.pos = 0
if set_length: # enable read bounds; size is now maximum size
simfile.has_end = True
if repair_entry_state_opts:
self.state.options |= {o.TRACK_ACTION_HISTORY}
def run(self, lib_handle, name_addr):
if lib_handle.symbolic:
raise angr.errors.SimValueError("GetProcAddress called with symbolic library handle %s" % lib_handle)
lib_handle = self.state.solver.eval(lib_handle)
if lib_handle == 0:
obj = self.project.loader.main_object
else:
for obj in self.project.loader.all_pe_objects:
if obj.mapped_base == lib_handle:
break
else:
l.warning("GetProcAddress: invalid library handle %s", lib_handle)
return 0
if claripy.is_true(name_addr < 0x10000):
# this matches the bogus name specified in the loader...
ordinal = self.state.solver.eval(name_addr)
name = 'ordinal.%d.%s' % (ordinal, obj.provides)
else:
name = self.state.mem[name_addr].string.concrete.decode('utf-8')
full_name = '%s.%s' % (obj.provides, name)
self.procs.add(full_name)
sym = obj.get_symbol(name)
if sym is None and name.endswith('@'):
# There seems to be some mangling parsing being done in the linker?
# I don't know what I'm doing
for suffix in ['Z', 'XZ']:
sym = obj.get_symbol(name + suffix)
if sym is not None:
for node_0 in seq.nodes:
if not type(node_0) is CodeNode:
continue
rcond_0 = node_0.reaching_condition
if rcond_0 is None:
continue
for node_1 in seq.nodes:
if not type(node_1) is CodeNode:
continue
if node_0 is node_1:
continue
rcond_1 = node_1.reaching_condition
if rcond_1 is None:
continue
cond_ = claripy.simplify(claripy.Not(rcond_0) == rcond_1)
if claripy.is_true(cond_):
# node_0 and node_1 should be structured using an if-then-else
self._make_ite(seq, node_0, node_1)
break
else:
break
# make all conditionally-reachable nodes ConditionNodes
for i in range(len(seq.nodes)):
node = seq.nodes[i]
if node.reaching_condition is not None and not claripy.is_true(node.reaching_condition):
if isinstance(node.node, ConditionalBreakNode):
# Put conditions together and simplify them
cond = claripy.And(node.reaching_condition, self._bool_variable_from_ail_condition(node.node.condition))
new_node = CodeNode(ConditionalBreakNode(node.node.addr, cond, node.node.target), None)
else:
new_node = ConditionNode(node.addr, None, node.reaching_condition, node,
clemory = state.memory.regions['global'].memory.mem._memory_backer
else:
# symbolic memory
clemory = state.memory.mem._memory_backer
buff, size = b"", 0
# Load from the clemory if we can
smc = self._support_selfmodifying_code
if state and not smc:
try:
p = state.memory.permissions(addr)
if p.symbolic:
smc = True
else:
smc = claripy.is_true(p & 2 != 0)
except: # pylint: disable=bare-except
smc = True # I don't know why this would ever happen, we checked this right?
if (not smc or not state) and isinstance(clemory, cle.Clemory):
try:
start, backer = next(clemory.backers(addr))
except StopIteration:
pass
else:
if start <= addr:
offset = addr - start
buff = pyvex.ffi.from_buffer(backer) + offset
size = len(backer) - offset
# If that didn't work, try to load from the state
if size == 0 and state:
ret_addr=ret_addr,
jumpkind='Ijk_Call')
state.callstack.push(new_frame)
state._inspect('call', BP_AFTER)
else:
while True:
cur_sp = state.solver.max(state.regs._sp) if state.has_plugin('symbolizer') else state.regs._sp
if not state.solver.is_true(cur_sp > state.callstack.top.stack_ptr):
break
state._inspect('return', BP_BEFORE, function_address=state.callstack.top.func_addr)
state.callstack.pop()
state._inspect('return', BP_AFTER)
if not state.arch.call_pushes_ret and \
claripy.is_true(state.regs._ip == state.callstack.ret_addr) and \
claripy.is_true(state.regs._sp == state.callstack.stack_ptr):
# very weird edge case that's not actually weird or on the edge at all:
# if we use a link register for the return address, the stack pointer will be the same
# before and after the call. therefore we have to check for equality with the marker
# along with this other check with the instruction pointer to guess whether it's time
# to pop a callframe. Still better than relying on Ijk_Ret.
state._inspect('return', BP_BEFORE, function_address=state.callstack.top.func_addr)
state.callstack.pop()
state._inspect('return', BP_AFTER)