Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
except CouldNotResolveException:
return TOP
def resolve_stmt(stmt):
if type(stmt) is pyvex.IRStmt.WrTmp:
tmps[stmt.tmp] = resolve_expr(stmt.data)
elif self.track_mem and type(stmt) is pyvex.IRStmt.Store:
state.store(resolve_expr(stmt.addr), resolve_expr(stmt.data))
elif type(stmt) is pyvex.IRStmt.Put:
state.put(stmt.offset, resolve_expr(stmt.data))
else:
raise CouldNotResolveException
for stmt in block.vex.statements:
if type(stmt) is pyvex.IRStmt.IMark:
if curr_stmt_start_addr is not None:
# we've reached a new instruction. Time to store the post state
self._set_post_state(curr_stmt_start_addr, state.freeze())
curr_stmt_start_addr = stmt.addr + stmt.delta
self._set_pre_state(curr_stmt_start_addr, state.freeze())
else:
try:
resolve_stmt(stmt)
except CouldNotResolveException:
pass
# stack pointer adjustment
if self.project.arch.sp_offset in self.reg_offsets \
and block.vex.jumpkind == 'Ijk_Call' \
and self.project.arch.call_pushes_ret:
try:
# obtain the CFGNode
cfg_node = self._cfg.get_any_node(self._insn_addr, anyaddr=True)
if cfg_node is None:
# not found
raise KeyError('CFGNode for instruction %#x is not found.' % self._insn_addr)
# determine the statement ID
vex_block = self._project.factory.block(cfg_node.addr,
size=cfg_node.size,
opt_level=self._cfg._iropt_level
).vex
stmt_idx = None
insn_addr = cfg_node.addr
for i, stmt in enumerate(vex_block.statements):
if isinstance(stmt, pyvex.IRStmt.IMark):
insn_addr = stmt.addr + stmt.delta
elif insn_addr == self._insn_addr:
if isinstance(stmt, pyvex.IRStmt.Put) and stmt.offset == reg_offset:
stmt_idx = i
break
elif insn_addr > self._insn_addr:
break
if stmt_idx is None:
raise KeyError('Cannot find the statement.')
# create a program variable
variable = SimRegisterVariable(reg_offset, size)
location = CodeLocation(cfg_node.addr, stmt_idx, ins_addr=self._insn_addr)
pv = ProgramVariable(variable, location, arch=self._project.arch)
mark_addrs.pop()
headcache.add(blockstate.addr)
for addr in mark_addrs:
if addr != funcaddr and addr in self.cfg.function_manager.functions:
l.warning("\tThis function jumps into another function (%#x). Abort.", addr)
raise FidgetAnalysisFailure
cache.add(addr)
insnblock = blockstate.lift(addr, num_inst=1, max_size=400, opt_level=1)
temps = TempStore(insnblock.tyenv)
blockstate.load_tempstore(temps)
blockstate.addr = addr
stmtgen = enumerate(insnblock.statements)
for _, stmt in stmtgen:
if isinstance(stmt, pyvex.IRStmt.IMark): break
for stmt_idx, stmt in stmtgen:
path = ['statements', stmt_idx]
if stmt.tag in ('Ist_NoOp', 'Ist_AbiHint', 'Ist_MBE'):
pass
elif stmt.tag == 'Ist_Exit':
handle_expression(blockstate, stmt.dst, path + ['dst'])
# Let the cfg take care of control flow!
elif stmt.tag == 'Ist_WrTmp':
expression = handle_expression(blockstate, stmt.data, path + ['data'])
blockstate.put_tmp(stmt.tmp, expression)
elif stmt.tag == 'Ist_Store':
expression = handle_expression(blockstate, stmt.data, path + ['data'])
def __init__(self, *args, **kwargs):
self._state = args, kwargs
self._irsb = pyvex.IRSB(*args, **kwargs)
self._addr = next(a.addr for a in self._irsb.statements if isinstance(a, pyvex.IRStmt.IMark))
:rtype: bool
"""
if arch.name == "MIPS32":
if arch.memory_endness == "Iend_BE":
MIPS32_BE_NOOPS = {
b"\x00\x20\x08\x25", # move $at, $at
}
insns = set(block.bytes[i:i+4] for i in range(0, block.size, 4))
if MIPS32_BE_NOOPS.issuperset(insns):
return True
# Fallback
# the block is a noop block if it only has IMark statements
if all((type(stmt) is pyvex.IRStmt.IMark) for stmt in block.vex.statements):
return True
return False
def instruction_addrs(self):
return [ s.addr for s in self._irsb.statements if isinstance(s, pyvex.IRStmt.IMark) ]
target_addr
)
return []
ce = CFGEntry(target_addr, current_function_addr, jumpkind, last_addr=addr, src_node=cfg_node,
src_ins_addr=ins_addr, src_stmt_idx=stmt_idx)
entries.append(ce)
else:
l.debug('(%s) Indirect jump at %#x.', jumpkind, addr)
# Add it to our set. Will process it later if user allows.
# Create an IndirectJump instance
if addr not in self.indirect_jumps:
tmp_statements = irsb.statements if stmt_idx == 'default' else irsb.statements[ : stmt_idx]
ins_addr = next(iter(stmt.addr for stmt in reversed(tmp_statements)
if isinstance(stmt, pyvex.IRStmt.IMark)), None
)
ij = IndirectJump(addr, ins_addr, current_function_addr, jumpkind, stmt_idx, resolved_targets=[ ])
self.indirect_jumps[addr] = ij
else:
ij = self.indirect_jumps[addr]
self._indirect_jumps_to_resolve.add(ij)
if irsb:
# Test it on the initial state. Does it jump to a valid location?
# It will be resolved only if this is a .plt entry
tmp_simirsb = simuvex.SimIRSB(self._initial_state, irsb, addr=addr)
if len(tmp_simirsb.successors) == 1:
tmp_ip = tmp_simirsb.successors[0].ip
if tmp_ip._model_concrete is not tmp_ip:
tmp_addr = tmp_ip._model_concrete.value
# This option makes us only execute the last four instructions
if o.SUPER_FASTPATH in state.options:
imark_counter = 0
for i in range(len(ss) - 1, -1, -1):
if type(ss[i]) is pyvex.IRStmt.IMark:
imark_counter += 1
if imark_counter >= 4:
skip_stmts = max(skip_stmts, i)
break
# set the current basic block address that's being processed
state.scratch.bbl_addr = irsb.addr
for stmt_idx, stmt in enumerate(ss):
if isinstance(stmt, pyvex.IRStmt.IMark):
insn_addrs.append(stmt.addr + stmt.delta)
if stmt_idx < skip_stmts:
l.debug("Skipping statement %d", stmt_idx)
continue
if last_stmt is not None and last_stmt != DEFAULT_STATEMENT and stmt_idx > last_stmt:
l.debug("Truncating statement %d", stmt_idx)
continue
if whitelist is not None and stmt_idx not in whitelist:
l.debug("Blacklisting statement %d", stmt_idx)
continue
try:
state.scratch.stmt_idx = stmt_idx
state._inspect('statement', BP_BEFORE, statement=stmt_idx)
cont = self._handle_statement(state, successors, stmt)
def _process_Stmt(self, whitelist=None):
if whitelist is not None:
# optimize whitelist lookups
whitelist = set(whitelist)
for stmt_idx, stmt in enumerate(self.block.vex.statements):
if whitelist is not None and stmt_idx not in whitelist:
continue
self.stmt_idx = stmt_idx
if type(stmt) is pyvex.IRStmt.IMark:
# Note that we cannot skip IMarks as they are used later to trigger observation events
# The bug caused by skipping IMarks is reported at https://github.com/angr/angr/pull/1150
self.ins_addr = stmt.addr + stmt.delta
self._handle_Stmt(stmt)
if self.block.vex.jumpkind == 'Ijk_Call':
handler = '_handle_function'
if hasattr(self, handler):
getattr(self, handler)(self._expr(self.block.vex.next))
else:
self.l.warning('Function handler not implemented.')
except CouldNotResolveException:
return TOP
def resolve_stmt(stmt):
if type(stmt) is pyvex.IRStmt.WrTmp:
tmps[stmt.tmp] = resolve_expr(stmt.data)
elif type(stmt) is pyvex.IRStmt.Store:
state.store(resolve_expr(stmt.addr), resolve_expr(stmt.data))
elif type(stmt) is pyvex.IRStmt.Put:
state.put(stmt.offset, resolve_expr(stmt.data))
else:
raise CouldNotResolveException
for stmt in block.vex.statements:
if type(stmt) is pyvex.IRStmt.IMark:
if curr_stmt_start_addr is not None:
# we've reached a new instruction. Time to store the post state
self._set_post_state(curr_stmt_start_addr, state.freeze())
curr_stmt_start_addr = stmt.addr
self._set_pre_state(curr_stmt_start_addr, state.freeze())
else:
try:
resolve_stmt(stmt)
except CouldNotResolveException:
pass
# stack pointer adjustment
if self.project.arch.sp_offset in self.regOffsets \
and block.vex.jumpkind == 'Ijk_Call' \
and self.project.arch.call_pushes_ret:
try: