Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _expr(self, expr):
    """
    Evaluate ``expr`` through the parent implementation and record register replacements.

    The parent's result is always returned unchanged. As a side effect, when the
    result is a usable value (not ``None``, ``BOTTOM`` or ``TOP``), is not the
    expression object itself, and ``expr`` is exactly a ``pyvex.IRExpr.Get`` of a
    register other than the stack pointer or instruction pointer, the replacement
    is registered on ``self.state`` at the current code location.

    :param expr: The VEX expression to evaluate.
    :return:     Whatever ``super()._expr(expr)`` returned.
    """
    result = super()._expr(expr)

    # Nothing to record when the parent produced no usable value or handed the expression back.
    if result in {None, BOTTOM, TOP} or result is expr:
        return result

    # Only direct register reads are recorded.
    if type(expr) is not pyvex.IRExpr.Get:
        return result

    # Reads of the stack pointer and instruction pointer are never recorded.
    if expr.offset in (self.arch.sp_offset, self.arch.ip_offset, ):
        return result

    # Record the replacement
    # NOTE(review): `// 8` presumably converts a size in bits to bytes - confirm against result_size().
    self.state.add_replacement(
        self._codeloc(),
        VEXReg(expr.offset, expr.result_size(self.tyenv) // 8),
        result,
    )
    return result
return None
if not isinstance(statements[put_stmt_id].data, pyvex.IRExpr.RdTmp):
return None
temps.add(statements[put_stmt_id].data.tmp)
for i in xrange(put_stmt_id, -1, -1):
stmt = statements[i]
if isinstance(stmt, pyvex.IRStmt.WrTmp):
data = None
if stmt.tmp in temps:
data = stmt.data
if isinstance(data, pyvex.IRExpr.RdTmp):
temps.add(data.tmp)
elif isinstance(data, pyvex.IRExpr.Get):
src_stmt_ids.add(i)
temps.remove(stmt.tmp)
return src_stmt_ids
def _fastpath_irexpr(self, expr, temps, regs):
    """
    Resolve a simple VEX expression directly, without a full evaluation.

    :param expr:  The VEX expression to resolve.
    :param temps: Container indexed by tmp id; looked up with ``temps[expr.tmp]``, so a
                  missing entry propagates the container's lookup error.
    :param regs:  Container of known register values, indexed by register offset.
    :return:      The translated constant for a ``Const``, the stored value for a ``RdTmp``,
                  the stored value for a ``Get`` whose offset is in ``regs``, or None for
                  anything else.
    """
    expr_type = type(expr)

    if expr_type == pyvex.IRExpr.Const:
        return translate_irconst(self.state, expr.con)

    if expr_type == pyvex.IRExpr.RdTmp:
        return temps[expr.tmp]

    if expr_type == pyvex.IRExpr.Get and expr.offset in regs:
        return regs[expr.offset]

    # Not one of the shapes handled by the fast path.
    return None
block_addrs = list(set([ a for a, _ in self.slice.nodes() ]))
for block_addr in block_addrs:
block_str = " IRSB %#x\n" % block_addr
block = self.project.factory.block(block_addr, backup_state=self._base_state).vex
included_stmts = set([ stmt for _, stmt in self.slice.nodes() if _ == block_addr ])
default_exit_included = any(stmt == DEFAULT_STATEMENT for _, stmt in self.slice.nodes() if _ == block_addr)
for i, stmt in enumerate(block.statements):
if arch is not None:
if isinstance(stmt, pyvex.IRStmt.Put):
reg_name = arch.translate_register_name(stmt.offset)
stmt_str = stmt.__str__(reg_name=reg_name)
elif isinstance(stmt, pyvex.IRStmt.WrTmp) and isinstance(stmt.data, pyvex.IRExpr.Get):
reg_name = arch.translate_register_name(stmt.data.offset)
stmt_str = stmt.__str__(reg_name=reg_name)
else:
stmt_str = str(stmt)
else:
stmt_str = str(stmt)
block_str += "%02s %02d | %s\n" % ("+" if i in included_stmts else " ",
i,
stmt_str
)
block_str += " + " if default_exit_included else " "
if isinstance(block.next, pyvex.IRExpr.Const):
block_str += "Next: %#x\n" % block.next.con.value
elif isinstance(block.next, pyvex.IRExpr.RdTmp):
def _resolve_expr(expr):
    """
    Recursively resolve a VEX expression to a value using the enclosing ``tmps`` and ``state``.

    Handled shapes:
      - ``Binop`` whose op name starts with ``Iop_Add`` / ``Iop_Sub``: both operands are
        resolved and added / subtracted.
      - ``RdTmp`` whose tmp has a non-None entry in ``tmps``: that entry.
      - ``Const``: wrapped in ``Constant``.
      - ``Get``: ``state.get`` of the register offset.
      - ``Load``: ``state.load`` of the resolved address.

    :param expr: The VEX expression to resolve.
    :return:     The resolved value, or None for a ``Binop`` that is neither an add nor a subtract.
    :raises CouldNotResolveException: For any other expression, including a ``RdTmp``
                                      with no usable entry in ``tmps``.
    """
    kind = type(expr)

    if kind is pyvex.IRExpr.Binop:
        lhs, rhs = expr.args
        if expr.op.startswith('Iop_Add'):
            return _resolve_expr(lhs) + _resolve_expr(rhs)
        if expr.op.startswith('Iop_Sub'):
            return _resolve_expr(lhs) - _resolve_expr(rhs)
        # Other binary operations are not resolved; the result is None rather than an exception.
        return None

    if kind is pyvex.IRExpr.RdTmp and expr.tmp in tmps and tmps[expr.tmp] is not None:
        return tmps[expr.tmp]

    if kind is pyvex.IRExpr.Const:
        return Constant(expr.con.value)

    if kind is pyvex.IRExpr.Get:
        return state.get(expr.offset)

    if kind is pyvex.IRExpr.Load:
        return state.load(_resolve_expr(expr.addr))

    raise CouldNotResolveException
try:
while True:
tick()
bb = self._block(addr, skip_stmts=False)
step_forward = False
# the block shouldn't touch any cc_* registers
if self.arch.name in ('X86', 'AMD64', 'ARMEL', 'ARMHF', 'ARMCortexM'):
cc_regs = { self.arch.registers['cc_op'][0], self.arch.registers['cc_ndep'][0],
self.arch.registers['cc_dep1'][0], self.arch.registers['cc_dep2'][0]
}
if any([ isinstance(stmt, pyvex.IRStmt.Put) and stmt.offset in cc_regs
for stmt in bb.statements ]):
step_forward = True
elif any( [ isinstance(stmt, pyvex.IRStmt.WrTmp) and isinstance(stmt.data, pyvex.IRExpr.Get)
and stmt.data.offset in cc_regs for stmt in bb.statements ]):
step_forward = True
if step_forward:
# only steps one instruction forward
addr += instruction_alignment
continue
if block_is_good(bb):
break
if bb.jumpkind == 'Ijk_NoDecode':
addr += instruction_alignment
else:
addr += bb.size
# "push" means try to increase the address as far as we can without regard for semantics
# + 10 | t2 = Add64(0x0000000000571df0,t11)
#
# all_addr_holders will be {(0x4c64c4, 11): (AddressTransferringTypes.SignedExtension, 32, 64,),
# (0x4c64c4, 12): (AddressTransferringTypes.Assignment,),
# }
all_addr_holders = OrderedDict()
while True:
preds = list(b.slice.predecessors(stmt_loc))
if len(preds) != 1:
break
block_addr, stmt_idx = stmt_loc = preds[0]
block = project.factory.block(block_addr, backup_state=self.base_state).vex
stmt = block.statements[stmt_idx]
if isinstance(stmt, (pyvex.IRStmt.WrTmp, pyvex.IRStmt.Put)):
if isinstance(stmt.data, (pyvex.IRExpr.Get, pyvex.IRExpr.RdTmp)):
# data transferring
stmts_to_remove.append(stmt_loc)
if isinstance(stmt, pyvex.IRStmt.WrTmp):
all_addr_holders[(stmt_loc[0], stmt.tmp)] = (AddressTransferringTypes.Assignment,)
continue
elif isinstance(stmt.data, pyvex.IRExpr.ITE):
# data transferring
# t16 = if (t43) ILGop_Ident32(LDle(t29)) else 0x0000c844
# > t44 = ITE(t43,t16,0x0000c844)
stmts_to_remove.append(stmt_loc)
if isinstance(stmt, pyvex.IRStmt.WrTmp):
all_addr_holders[(stmt_loc[0], stmt.tmp)] = (AddressTransferringTypes.Assignment,)
continue
elif isinstance(stmt.data, pyvex.IRExpr.Unop):
if stmt.data.op == 'Iop_32Sto64':
# data transferring with conversion
for tmp_dep in action.tmp_deps:
if tmp_dep in self._temp_variables:
self._data_graph_add_edge(self._temp_variables[tmp_dep], pv)
if self._custom_data_per_statement is not None:
self._temp_register_symbols[tmp] = self._custom_data_per_statement
for data in self._variables_per_statement:
self._data_graph_add_edge(data, pv)
if isinstance(statement, pyvex.IRStmt.WrTmp) and self._variables_per_statement:
if isinstance(statement.data, pyvex.IRExpr.RdTmp):
# assignment: dst_tmp = src_tmp
for s in filter(lambda x: isinstance(x.variable, SimTemporaryVariable) and x.variable.tmp_id != tmp, self._variables_per_statement):
self._ast_graph.add_edge(s, pv)
elif isinstance(statement.data, pyvex.IRExpr.Get):
# assignment: dst_tmp = src_reg
for s in filter(lambda x: isinstance(x.variable, SimRegisterVariable), self._variables_per_statement):
self._ast_graph.add_edge(s, pv)
elif isinstance(statement.data, pyvex.IRExpr.Load):
# assignment: dst_tmp = [ src_mem ]
for s in filter(lambda x: isinstance(x.variable, SimMemoryVariable), self._variables_per_statement):
self._ast_graph.add_edge(s, pv)
if not action.tmp_deps and not self._variables_per_statement and not ast:
# read in a constant
# try to parse out the constant from statement
const_variable = SimConstantVariable()
if statement is not None:
if isinstance(statement, pyvex.IRStmt.Dirty):
l.warning('Dirty statements are not supported in DDG for now.')
elif isinstance(statement.data, pyvex.IRExpr.Const):
return
sp_offset = self.project.arch.sp_offset
initial_sp = 0x7fff0000
last_sp = None
tmps = {}
last_imark = next((stmt for stmt in reversed(irsb.statements)
if isinstance(stmt, pyvex.IRStmt.IMark)
), 0
)
tmp_irsb = self.project.factory.block(last_imark.addr + last_imark.delta).vex
# pylint:disable=too-many-nested-blocks
for stmt in tmp_irsb.statements:
if isinstance(stmt, pyvex.IRStmt.WrTmp):
data = stmt.data
if isinstance(data, pyvex.IRExpr.Get) and data.offset == sp_offset:
# t0 = GET:I32(sp)
tmps[stmt.tmp] = initial_sp
elif isinstance(data, pyvex.IRExpr.Binop):
# only support Add
if data.op == 'Iop_Add32':
arg0, arg1 = data.args
if isinstance(arg0, pyvex.IRExpr.RdTmp) and isinstance(arg1, pyvex.IRExpr.Const):
if arg0.tmp in tmps:
tmps[stmt.tmp] = tmps[arg0.tmp] + arg1.con.value
elif isinstance(data, pyvex.IRExpr.Load):
if isinstance(data.addr, pyvex.IRExpr.RdTmp):
if data.addr.tmp in tmps:
tmps[stmt.tmp] = ('load', tmps[data.addr.tmp])
elif isinstance(stmt, pyvex.IRStmt.Put):
if stmt.offset == sp_offset and isinstance(stmt.data, pyvex.IRExpr.RdTmp):
if stmt.data.tmp in tmps:
return None
if not isinstance(statements[put_stmt_id].data, pyvex.IRExpr.RdTmp):
return None
temps.add(statements[put_stmt_id].data.tmp)
for i in xrange(put_stmt_id, -1, -1):
stmt = statements[i]
if isinstance(stmt, pyvex.IRStmt.WrTmp):
data = None
if stmt.tmp in temps:
data = stmt.data
if isinstance(data, pyvex.IRExpr.RdTmp):
temps.add(data.tmp)
elif isinstance(data, pyvex.IRExpr.Get):
src_stmt_ids.add(i)
temps.remove(stmt.tmp)
return src_stmt_ids