Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Walks the statements of self.irsb, evaluating WrTmp/Put data through
# self._fastpath_irexpr and building a SimExit for every Exit statement.
# NOTE(review): this excerpt has lost its indentation and ends right after
# `next_expr` is computed, so the remainder of the method is not visible.
def _handle_irsb_fastpath(self):
# Results of _fastpath_irexpr, keyed by temp id (stmt.tmp) and by register
# offset (stmt.offset) respectively.
temps = { }
regs = { }
# Guard handed to each SimExit below; the visible code never updates it.
guard = self.state.se.true
for stmt in self.irsb.statements():
if type(stmt) == pyvex.IRStmt.IMark:
# Keep the latest instruction marker; its addr/len feed the exits below.
self.last_imark = stmt
elif type(stmt) == pyvex.IRStmt.Exit:
l.debug("%s adding conditional exit", self)
# Exit to stmt.offsIP, with the last IMark's address as its source.
e = SimExit(expr=self.state.BVV(stmt.offsIP, self.state.arch.bits), guard=guard, state=self.state, source=self.state.BVV(self.last_imark.addr, self.state.arch.bits), jumpkind=self.irsb.jumpkind, simplify=False)
self.conditional_exits.append(e)
self.add_exits(e)
# When the block's jumpkind is 'Ijk_Call' and DO_RET_EMULATION is enabled,
# also add an 'Ijk_Ret' exit to the address just past the last IMark
# (addr + len).
# NOTE(review): presumably nested under the Exit branch -- confirm against
# the properly indented original.
if self.irsb.jumpkind == 'Ijk_Call' and o.DO_RET_EMULATION in self.state.options:
self.postcall_exit = SimExit(expr=self.state.BVV(self.last_imark.addr+self.last_imark.len, self.state.arch.bits), guard=guard, state=self.state, source=self.state.BVV(self.last_imark.addr, self.state.arch.bits), jumpkind='Ijk_Ret', simplify=False)
self.add_exits(self.postcall_exit)
elif type(stmt) == pyvex.IRStmt.WrTmp:
temps[stmt.tmp] = self._fastpath_irexpr(stmt.data, temps, regs)
elif type(stmt) == pyvex.IRStmt.Put:
regs[stmt.offset] = self._fastpath_irexpr(stmt.data, temps, regs)
else:
# Every other statement type is ignored.
continue
# Evaluate the block's `next` expression with the collected temps/regs.
next_expr = self._fastpath_irexpr(self.irsb.next, temps, regs)
# NOTE(review): headerless excerpt with its indentation lost; `bb`, `new_bb`
# and `irsb` are defined outside the visible lines.
self.xref_bb(bb,new_bb)
bb.fb.bqueue_append(new_bb) # treat everything from the next instruction on as a separate block
return
if irsb.stmts_used == 1: # the block contains only a JMP instruction
# Queue the jump target (irsb.next rendered as a string and parsed as hex)
# as a new function block.
new_fb = self.new_fb(Function_block(int(str(irsb.next),16)))
self.xref_fb(bb.fb,new_fb)
self.fqueue_append(new_fb)
return
new_bb = self.new_bb(Branch_block(bb.fb,(bb.count + 1 ),int(str(irsb.next),16)))
self.xref_bb(bb,new_bb)
bb.fb.bqueue_append(new_bb) # ordinary JUMP case
if isinstance(irsb.statements[len(irsb.statements)-1],pyvex.IRStmt.Exit): # conditional jump case
# Reduce the Exit destination of the last statement to a plain integer address.
insert_addr = irsb.statements[len(irsb.statements)-1].dst
if type(insert_addr) is pyvex.IRExpr.Const: # pylint: disable=unidiomatic-typecheck
target_addr = insert_addr.con.value
elif type(insert_addr) in (pyvex.IRConst.U32, pyvex.IRConst.U64): # pylint: disable=unidiomatic-typecheck
target_addr = insert_addr.value
# NOTE(review): `long` is a Python 2 builtin; on Python 3 reaching this
# branch raises NameError.
elif type(insert_addr) in (int, long): # pylint: disable=unidiomatic-typecheck
target_addr = insert_addr
else:
target_addr = None
# NOTE(review): leftover debugger hook -- execution stops in pdb whenever
# the destination type is not one of the cases above.
import pdb
pdb.set_trace()
new_bb = self.new_bb(Branch_block(bb.fb,(bb.count + 2),target_addr))
self.xref_bb(bb,new_bb)
bb.fb.bqueue_append(new_bb)
# NOTE(review): headerless excerpt with its indentation lost; `addr`,
# `remaining_exits` and `current_function_addr` come from outside the visible
# lines, and the excerpt ends on an `elif` whose body is not shown.
# Let's try to create the pyvex IRSB directly, since it's much faster
try:
irsb = self.project.factory.block(addr).vex
# Log the size of this basic block
self._block_size[addr] = irsb.size
# Occupy the block
self._seg_list.occupy(addr, irsb.size)
except (SimEngineError, SimMemoryError):
# Give up on this address when either of these errors is raised above.
return
# Get all possible successors
# NOTE(review): `next` shadows the Python builtin of the same name here.
next, jumpkind = irsb.next, irsb.jumpkind
# One (destination, jumpkind) pair per Exit statement, then the default exit.
successors = [ (i.dst, i.jumpkind) for i in irsb.statements if type(i) is pyvex.IRStmt.Exit]
successors.append((next, jumpkind))
# Process each successor
for suc in successors:
target, jumpkind = suc
# Only constant targets yield a concrete address; anything else becomes None.
if type(target) is pyvex.IRExpr.Const:
next_addr = target.con.value
else:
next_addr = None
if jumpkind == 'Ijk_Boring' and next_addr is not None:
remaining_exits.add((current_function_addr, next_addr,
addr, None))
elif jumpkind == 'Ijk_Call' and next_addr is not None:
# NOTE(review): headerless excerpt with its indentation lost; it starts inside
# an `if` whose condition is not visible, and `stmt`/`stmt_idx` are bound by a
# loop outside the visible lines.
l.debug("... whitelist says skip it!")
continue
elif self.whitelist is not None:
l.debug("... whitelist says analyze it!")
# process it!
# The statement is translated between the BP_BEFORE and BP_AFTER inspect calls.
self.state._inspect('statement', BP_BEFORE, statement=stmt_idx)
s_stmt = translate_stmt(self.irsb, stmt_idx, self.last_imark, self.state)
if s_stmt is not None:
self.state.log.extend_actions(s_stmt.actions)
self.statements.append(s_stmt)
self.state._inspect('statement', BP_AFTER)
# for the exits, put *not* taking the exit on the list of constraints so
# that we can continue on. Otherwise, add the constraints
if type(stmt) == pyvex.IRStmt.Exit:
l.debug("%s adding conditional exit", self)
# Use last_ins_addr when the architecture has a branch delay slot, else ins_addr.
exit_ins_addr = self.state.scratch.last_ins_addr if self.state.arch.branch_delay_slot else \
self.state.scratch.ins_addr
# The successor receives a copy of the state taken before the negated guard
# is added below.
e = self.add_successor(self.state.copy(), s_stmt.target, s_stmt.guard, s_stmt.jumpkind,
exit_stmt_idx=stmt_idx, exit_ins_addr=exit_ins_addr)
self.conditional_exits.append(e)
self.state.add_constraints(self.state.se.Not(s_stmt.guard))
# Accumulate the negated guards into the default exit's guard.
self.default_exit_guard = self.state.se.And(self.default_exit_guard, self.state.se.Not(s_stmt.guard))
if o.SINGLE_EXIT in self.state.options and e.satisfiable():
l.debug("%s returning after taken exit due to SINGLE_EXIT option.", self)
return
if self.last_stmt is None:
self.has_default_exit = True
# NOTE(review): headerless excerpt with its indentation lost; it opens partway
# through a comment showing a sample VEX IR listing, and `block` is defined
# outside the visible lines.
# 18 | ------ IMark(0x401688, 4, 0) ------
# 19 | if (t0) goto {Ijk_Boring} 0x401684
# 20 | PUT(128) = 0x0040168c
# 21 | t4 = GET:I32(128)
# NEXT: PUT(128) = t4; Ijk_Boring
#
stmts = block.statements
tmp_exit = None
exit_stmt_idx = None
dst = None
# Scan backwards: first find an Exit guarded by a temp, then the WrTmp that
# defines that temp.
for i, stmt in reversed(list(enumerate(stmts))):
if tmp_exit is None:
# Looking for the Exit statement
if isinstance(stmt, pyvex.IRStmt.Exit) and \
isinstance(stmt.guard, pyvex.IRExpr.RdTmp):
tmp_exit = stmt.guard.tmp
dst = stmt.dst
exit_stmt_idx = i
else:
# Looking for the WrTmp statement
if isinstance(stmt, pyvex.IRStmt.WrTmp) and \
stmt.tmp == tmp_exit:
# Matches only a 32-bit equality comparison of two constants with equal values.
if isinstance(stmt.data, pyvex.IRExpr.Binop) and \
stmt.data.op == 'Iop_CmpEQ32' and \
isinstance(stmt.data.child_expressions[0], pyvex.IRExpr.Const) and \
isinstance(stmt.data.child_expressions[1], pyvex.IRExpr.Const) and \
stmt.data.child_expressions[0].con.value == stmt.data.child_expressions[1].con.value:
# Create a new IRConst
irconst = pyvex.IRExpr.Const() # XXX: THIS IS BROKEN FIX THIS VERY SOON
# NOTE(review): headerless excerpt with its indentation lost; it opens partway
# through a comment showing a sample VEX IR listing, `block` is defined outside
# the visible lines, and the body of the innermost `if` is not shown.
# 18 | ------ IMark(0x401688, 4, 0) ------
# 19 | if (t0) goto {Ijk_Boring} 0x401684
# 20 | PUT(128) = 0x0040168c
# 21 | t4 = GET:I32(128)
# NEXT: PUT(128) = t4; Ijk_Boring
#
stmts = block.statements
tmp_exit = None
exit_stmt_idx = None
dst = None
# Scan backwards: first find an Exit guarded by a temp, then the WrTmp that
# defines that temp.
for i, stmt in reversed(list(enumerate(stmts))):
if tmp_exit is None:
# Looking for the Exit statement
if isinstance(stmt, pyvex.IRStmt.Exit) and \
isinstance(stmt.guard, pyvex.IRExpr.RdTmp):
tmp_exit = stmt.guard.tmp
dst = stmt.dst
exit_stmt_idx = i
else:
# Looking for the WrTmp statement
if isinstance(stmt, pyvex.IRStmt.WrTmp) and \
stmt.tmp == tmp_exit:
# Matches only a 32-bit equality comparison of two constants with equal values.
if isinstance(stmt.data, pyvex.IRExpr.Binop) and \
stmt.data.op == 'Iop_CmpEQ32' and \
isinstance(stmt.data.child_expressions[0], pyvex.IRExpr.Const) and \
isinstance(stmt.data.child_expressions[1], pyvex.IRExpr.Const) and \
stmt.data.child_expressions[0].con.value == stmt.data.child_expressions[
1].con.value:
# Create a new IRConst
# NOTE(review): headerless excerpt with its indentation lost; `cfg_node`,
# `irsb`, `addr` and `current_function_addr` are defined outside the visible
# lines.
# IRSB is only used once per CFGNode. We should be able to clean up the CFGNode here in order to save memory
cfg_node.irsb = None
self._process_block_arch_specific(addr, irsb, current_function_addr)
# Scan the basic block to collect data references
if self._collect_data_ref:
self._collect_data_references(irsb, addr)
# Get all possible successors
irsb_next, jumpkind = irsb.next, irsb.jumpkind
successors = [ ]
ins_addr = addr
# Each successor is (statement index or 'default', instruction address,
# target, jumpkind).
for i, stmt in enumerate(irsb.statements):
if isinstance(stmt, pyvex.IRStmt.Exit):
successors.append((i, ins_addr, stmt.dst, stmt.jumpkind))
elif isinstance(stmt, pyvex.IRStmt.IMark):
# Track the instruction address (IMark addr + delta) attached to later Exits.
ins_addr = stmt.addr + stmt.delta
successors.append(('default', ins_addr, irsb_next, jumpkind))
entries = [ ]
successors = self._post_process_successors(addr, successors)
# Process each successor
for suc in successors:
stmt_idx, ins_addr, target, jumpkind = suc
entries += self._create_entries(target, jumpkind, current_function_addr, irsb, addr, cfg_node, ins_addr, stmt_idx)
# NOTE(review): headerless excerpt with its indentation lost; `irsb` and `bb`
# are defined outside the visible lines, and the excerpt ends inside a `try:`
# whose handler is not shown.
constants = irsb.constants
jump_targets = list(irsb.constant_jump_targets)
for constant in constants:
# Each constant is rendered as a string and parsed as hexadecimal.
constant = int(str(constant),16)
if irsb.direct_next is True:
if constant == int(str(irsb.next),16): # this constant is the `next` target
continue
if constant in jump_targets: # this constant is a jump target
continue
if constant == (bb.addr + irsb.size): #next block
continue
if isinstance(irsb.statements[len(irsb.statements)-1],pyvex.IRStmt.Exit): # conditional jump case
# Reduce the Exit destination of the last statement to a plain integer address.
insert_addr = irsb.statements[len(irsb.statements)-1].dst
if type(insert_addr) is pyvex.IRExpr.Const: # pylint: disable=unidiomatic-typecheck
target_addr = insert_addr.con.value
elif type(insert_addr) in (pyvex.IRConst.U32, pyvex.IRConst.U64): # pylint: disable=unidiomatic-typecheck
target_addr = insert_addr.value
# NOTE(review): `long` is a Python 2 builtin; on Python 3 reaching this
# branch raises NameError.
elif type(insert_addr) in (int, long): # pylint: disable=unidiomatic-typecheck
target_addr = insert_addr
else:
target_addr = None
# NOTE(review): leftover debugger hook -- execution stops in pdb whenever
# the destination type is not one of the cases above.
import pdb
pdb.set_trace()
if constant == target_addr:
continue
try:
if self.main_section == self._manager._header.is_section(constant).Name: # indirect-address function block
# NOTE(review): headerless excerpt with its indentation lost; `regs`, `run`,
# `exit_stmt_idx` and `prev` are defined outside the visible lines.
temps = set()
# Work on a copy so the incoming `regs` object is not mutated by this code.
regs = regs.copy()
stmts = self._get_irsb(run).statements
if exit_stmt_idx is None or exit_stmt_idx == DEFAULT_STATEMENT:
# Initialize the temps set with whatever in the `next` attribute of this irsb
next_expr = self._get_irsb(run).next
if type(next_expr) is pyvex.IRExpr.RdTmp:
temps.add(next_expr.tmp)
# if there are conditional exits, we *always* add them into the slice (so if they should not be taken, we do not
# lose the condition)
for stmt_idx_, s_ in enumerate(self._get_irsb(run).statements):
# Only Exit statements with jumpkind 'Ijk_Boring' are considered.
if not type(s_) is pyvex.IRStmt.Exit:
continue
if s_.jumpkind != 'Ijk_Boring':
continue
if type(s_.guard) is pyvex.IRExpr.RdTmp:
temps.add(s_.guard.tmp)
# Put it in our slice
irsb_addr = self._get_addr(run)
self._inslice_callback(stmt_idx_, s_, {'irsb_addr': irsb_addr, 'prev': prev})
# The statement just handled becomes `prev` for whatever is sliced next.
prev = (irsb_addr, stmt_idx_)
infodict = {'irsb_addr' : self._get_addr(run),
'prev' : prev,
'has_statement': False
}