Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fragment of a DFG analysis test. The enclosing function header is not
# visible in this excerpt and the original indentation has been lost.
# Report how long CFG generation took.
l.info("CFG generated in %f seconds.", duration)
# Run the DFG analysis on top of the previously built CFG.
dfg = proj.analyses.DFG(cfg=cfg, fail_fast=True)
# There must not be more DFGs than there are CFG nodes.
nose.tools.assert_true(len(dfg.dfgs) <= len(cfg.nodes()))
for addr, d in dfg.dfgs.items():
# Every key of dfg.dfgs must be an address the CFG can resolve to a node.
nose.tools.assert_true(cfg.get_any_node(addr) is not None)
# Check that no node carries a statement tag that should have been ignored.
for n in d.nodes():
nose.tools.assert_not_equal(n.tag, 'Ist_IMark')
nose.tools.assert_not_equal(n.tag, 'Ist_AbiHint')
nose.tools.assert_not_equal(n.tag, 'Ist_Exit')
# Put statements are allowed, except those writing the instruction pointer.
if n.tag == 'Ist_Put':
nose.tools.assert_not_equal(n.offset, proj.arch.ip_offset)
# Validate every edge (a -> b) of this DFG.
for (a, b) in d.edges():
if isinstance(a, pyvex.IRExpr.IRExpr):
# Check that there is no edge between two expressions/constants.
nose.tools.assert_false(isinstance(b, pyvex.IRExpr.IRExpr))
# If there is an edge coming from an expr/const it should be in
# the dependencies of the other node
# FIXME
# Impossible to check because of the Unop optimization in the
# DFG...
# nose.tools.assert_true(a in b.expressions)
elif hasattr(a, 'tmp'):
# If there is an edge between a tmp and another node,
# make sure that this tmp is in the dependencies of that node.
# Collect the tmp ids of all expressions of b that have a 'tmp' attribute.
tmps = [ ]
for e in b.expressions:
if hasattr(e, 'tmp'):
tmps.append(e.tmp)
# NOTE(review): the excerpt ends here; the assertion that uses `tmps` is not visible.
# Fragment: decides whether a memory variable is used as a jump target.
# The enclosing function/loop header is not visible and indentation was lost.
# mem_var_node was not found, so move on to the next candidate.
continue
# Get the data sub-graph rooted at mem_var_node, unsimplified, without
# killing edges, and excluding edges of type 'mem_addr'.
subgraph = ddg.data_sub_graph(mem_var_node,
simplified=False,
killing_edges=False,
excluding_types={'mem_addr'},
)
# is it used as a memory address anywhere?
# TODO:
# is it used as a jump target?
# The block's jump target is only inspected when `next` is a RdTmp expression;
# otherwise next_tmp stays None and the check below is skipped.
next_tmp = None
if isinstance(candidate_irsb.irsb.next, pyvex.IRExpr.RdTmp):
next_tmp = candidate_irsb.irsb.next.tmp
if next_tmp is not None:
# Look for a sub-graph node whose variable is the temporary with that tmp id;
# None when no such node exists.
next_tmp_node = next((node for node in subgraph.nodes()
if isinstance(node.variable, SimTemporaryVariable) and
node.variable.tmp_id == next_tmp),
None
)
if next_tmp_node is not None:
# The variable reaches the jump-target temporary: it is used as a jump target.
return True
# No use as a jump target was found.
return False
# Fragment: queues follow-up function/branch blocks for the targets of an IRSB.
# The enclosing method header is not visible and indentation was lost.
return
if irsb.stmts_used == 1: # only a JMP instruction exists in the block
# The string form of irsb.next is parsed as a hexadecimal address and used
# as the start of a new function block, which is cross-referenced and queued.
new_fb = self.new_fb(Function_block(int(str(irsb.next),16)))
self.xref_fb(bb.fb,new_fb)
self.fqueue_append(new_fb)
return
# Otherwise create a branch block (count + 1) at the address parsed from irsb.next.
new_bb = self.new_bb(Branch_block(bb.fb,(bb.count + 1 ),int(str(irsb.next),16)))
self.xref_bb(bb,new_bb)
bb.fb.bqueue_append(new_bb) # the ordinary JUMP case
if isinstance(irsb.statements[len(irsb.statements)-1],pyvex.IRStmt.Exit): # the conditional jump case
# The last statement is an Exit: take its destination and normalise it to a
# plain address, depending on the type it is represented with.
insert_addr = irsb.statements[len(irsb.statements)-1].dst
if type(insert_addr) is pyvex.IRExpr.Const: # pylint: disable=unidiomatic-typecheck
target_addr = insert_addr.con.value
elif type(insert_addr) in (pyvex.IRConst.U32, pyvex.IRConst.U64): # pylint: disable=unidiomatic-typecheck
target_addr = insert_addr.value
# NOTE(review): `long` is not a builtin on Python 3; this branch raises NameError
# there unless `long` is aliased elsewhere in the file — confirm.
elif type(insert_addr) in (int, long): # pylint: disable=unidiomatic-typecheck
target_addr = insert_addr
else:
# Unrecognised destination type: no target address can be derived.
target_addr = None
# NOTE(review): this drops into the interactive debugger; looks like leftover
# debugging code — confirm whether it should be replaced by logging/raising.
import pdb
pdb.set_trace()
# Queue a second branch block (count + 2) for the conditional target.
new_bb = self.new_bb(Branch_block(bb.fb,(bb.count + 2),target_addr))
self.xref_bb(bb,new_bb)
bb.fb.bqueue_append(new_bb)
# Fragment; the enclosing function header is not visible and indentation was lost.
# Third pass: for each statement, collect all constants that are referenced or used.
# instr_addr tracks the address of the instruction currently being walked and
# next_instr_addr the one after it (None when there is none).
instr_addr = None
next_instr_addr = None
for stmt_idx, stmt in enumerate(irsb.statements):
if type(stmt) is pyvex.IRStmt.IMark: # pylint: disable=unidiomatic-typecheck
# A new instruction starts: pop its address off the front of instr_addrs
# and peek at the following one.
instr_addr = instr_addrs[0]
instr_addrs = instr_addrs[1 : ]
next_instr_addr = instr_addrs[0] if instr_addrs else None
elif type(stmt) is pyvex.IRStmt.WrTmp: # pylint: disable=unidiomatic-typecheck
if type(stmt.data) is pyvex.IRExpr.Load: # pylint: disable=unidiomatic-typecheck
# load: process the address being loaded from
# e.g. t7 = LDle:I64(0x0000000000600ff8)
_process(irsb, stmt, stmt_idx, stmt.data.addr, instr_addr, next_instr_addr)
elif type(stmt.data) in (pyvex.IRExpr.Binop, ): # pylint: disable=unidiomatic-typecheck
# binary operation: process every operand
for arg in stmt.data.args:
_process(irsb, stmt, stmt_idx, arg, instr_addr, next_instr_addr)
elif type(stmt.data) is pyvex.IRExpr.Const: # pylint: disable=unidiomatic-typecheck
# constant assigned to a temporary: process the constant itself
_process(irsb, stmt, stmt_idx, stmt.data, instr_addr, next_instr_addr)
elif type(stmt) is pyvex.IRStmt.Put: # pylint: disable=unidiomatic-typecheck
# put: process the written value, skipping writes to the instruction pointer
# e.g. PUT(rdi) = 0x0000000000400714
if stmt.offset not in (self._initial_state.arch.ip_offset, ):
_process(irsb, stmt, stmt_idx, stmt.data, instr_addr, next_instr_addr)
elif type(stmt) is pyvex.IRStmt.Store: # pylint: disable=unidiomatic-typecheck
# store: process the address being stored to
_process(irsb, stmt, stmt_idx, stmt.addr, instr_addr, next_instr_addr)
# Fragment; the enclosing function header is not visible and indentation was lost.
# Walk backwards through the slice from stmt_loc, one predecessor at a time,
# recording statements that merely transfer the value.
while True:
preds = list(b.slice.predecessors(stmt_loc))
# Stop unless there is exactly one predecessor to follow.
if len(preds) != 1:
break
# Step to the predecessor and fetch the VEX statement it refers to.
block_addr, stmt_idx = stmt_loc = preds[0]
block = project.factory.block(block_addr, backup_state=self.base_state).vex
stmt = block.statements[stmt_idx]
if isinstance(stmt, (pyvex.IRStmt.WrTmp, pyvex.IRStmt.Put)):
if isinstance(stmt.data, (pyvex.IRExpr.Get, pyvex.IRExpr.RdTmp)):
# data transferring
stmts_to_remove.append(stmt_loc)
# For a temporary write, remember (block address, tmp) as a plain assignment.
if isinstance(stmt, pyvex.IRStmt.WrTmp):
all_addr_holders[(stmt_loc[0], stmt.tmp)] = (AddressTransferringTypes.Assignment,)
continue
elif isinstance(stmt.data, pyvex.IRExpr.ITE):
# data transferring
# t16 = if (t43) ILGop_Ident32(LDle(t29)) else 0x0000c844
# > t44 = ITE(t43,t16,0x0000c844)
stmts_to_remove.append(stmt_loc)
if isinstance(stmt, pyvex.IRStmt.WrTmp):
all_addr_holders[(stmt_loc[0], stmt.tmp)] = (AddressTransferringTypes.Assignment,)
continue
elif isinstance(stmt.data, pyvex.IRExpr.Unop):
if stmt.data.op == 'Iop_32Sto64':
# data transferring with conversion
# t11 = 32Sto64(t12)
stmts_to_remove.append(stmt_loc)
# Recorded as a signed extension together with the widths 32 and 64.
if isinstance(stmt, pyvex.IRStmt.WrTmp):
all_addr_holders[(stmt_loc[0], stmt.tmp)] = (AddressTransferringTypes.SignedExtension,
32, 64)
continue
# Fragment; the enclosing loop/function header is not visible and indentation was lost.
# Classify a WrTmp/Put statement whose data merely transfers a value, mark it
# for removal, and (for temporary writes) record how the value is transferred.
if isinstance(stmt, (pyvex.IRStmt.WrTmp, pyvex.IRStmt.Put)):
if isinstance(stmt.data, (pyvex.IRExpr.Get, pyvex.IRExpr.RdTmp)):
# data transferring
stmts_to_remove.append(stmt_loc)
# For a temporary write, remember (block address, tmp) as a plain assignment.
if isinstance(stmt, pyvex.IRStmt.WrTmp):
all_addr_holders[(stmt_loc[0], stmt.tmp)] = (AddressTransferringTypes.Assignment,)
continue
elif isinstance(stmt.data, pyvex.IRExpr.ITE):
# data transferring
# t16 = if (t43) ILGop_Ident32(LDle(t29)) else 0x0000c844
# > t44 = ITE(t43,t16,0x0000c844)
stmts_to_remove.append(stmt_loc)
if isinstance(stmt, pyvex.IRStmt.WrTmp):
all_addr_holders[(stmt_loc[0], stmt.tmp)] = (AddressTransferringTypes.Assignment,)
continue
elif isinstance(stmt.data, pyvex.IRExpr.Unop):
if stmt.data.op == 'Iop_32Sto64':
# data transferring with conversion
# t11 = 32Sto64(t12)
stmts_to_remove.append(stmt_loc)
# Recorded as a signed extension together with the widths 32 and 64.
if isinstance(stmt, pyvex.IRStmt.WrTmp):
all_addr_holders[(stmt_loc[0], stmt.tmp)] = (AddressTransferringTypes.SignedExtension,
32, 64)
continue
elif stmt.data.op == 'Iop_64to32':
# data transferring with conversion
# t24 = 64to32(t21)
stmts_to_remove.append(stmt_loc)
# Recorded as a truncation together with the widths 64 and 32.
if isinstance(stmt, pyvex.IRStmt.WrTmp):
all_addr_holders[(stmt_loc[0], stmt.tmp)] = (AddressTransferringTypes.Truncation,
64, 32)
continue
# Fragment; the enclosing function header is not visible and indentation was lost.
# Start from the 'default' exit location of the block at addr and search
# backwards through the slice for the Load statement feeding it.
stmt_loc = (addr, 'default')
# Give up with (False, None) when the starting location is not in the slice.
if stmt_loc not in b.slice:
return False, None
load_stmt_loc, load_stmt = None, None
# Locations visited so far, beginning with the starting location.
past_stmts = [ stmt_loc ]
while True:
# NOTE(review): len() and [0] below need predecessors() to return a sequence;
# if it returns an iterator this raises TypeError and needs list() — confirm.
preds = b.slice.predecessors(stmt_loc)
# Only an unambiguous single-predecessor chain is followed.
if len(preds) != 1:
return False, None
block_addr, stmt_idx = preds[0]
block = self.project.factory.block(block_addr).vex
stmt = block.statements[stmt_idx]
if isinstance(stmt, pyvex.IRStmt.WrTmp) or isinstance(stmt, pyvex.IRStmt.Put):
if isinstance(stmt.data, pyvex.IRExpr.Get) or isinstance(stmt.data, pyvex.IRExpr.RdTmp):
# data transferring: record the current location, then step to the predecessor
past_stmts.append(stmt_loc)
stmt_loc = (block_addr, stmt_idx)
continue
elif isinstance(stmt.data, pyvex.IRExpr.Load):
# Got it! Remember the load statement and its location, then stop searching.
stmt_loc = (block_addr, stmt_idx)
load_stmt, load_stmt_loc = stmt, stmt_loc
past_stmts.append(stmt_loc)
break
if load_stmt_loc is None:
# the load statement is not found
return False, None
# skip all statements before the load statement