Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self, nBufferLength, lpBuffer):
    """
    Copy the canned temp path (``self.RESULT``) into the caller's buffer.

    As much of ``self.RESULT`` as fits in ``nBufferLength - 1`` bytes is
    stored at ``lpBuffer``, followed by a single NUL byte.

    :param nBufferLength: Size of the destination buffer in bytes; must
                          resolve to exactly one concrete value.
    :param lpBuffer:      Address the path is stored at.
    :return:              Length of ``self.RESULT`` in bytes, regardless of
                          how much was actually copied.
    :raises angr.errors.SimProcedureError: if ``nBufferLength`` is symbolic.
    """
    try:
        length = self.state.solver.eval_one(nBufferLength)
    except angr.errors.SimValueError:
        raise angr.errors.SimProcedureError("Can't handle symbolic nBufferLength in GetTempPath")

    result_bytes = self.RESULT.length // 8

    # A zero-sized buffer cannot even hold the terminator, so nothing is
    # written; the length is still returned.
    if length > 0:
        copy_len = min(result_bytes, length - 1)
        terminator = claripy.BVV(0, 8)
        if copy_len > 0:
            # Slice the leading (most significant) copy_len bytes of RESULT.
            high_bit = self.RESULT.length - 1
            low_bit = self.RESULT.length - copy_len * 8
            data = self.RESULT[high_bit:low_bit].concat(terminator)
        else:
            # Only room for the terminator: slicing zero bytes out of RESULT
            # would produce an invalid extract (low bit above high bit).
            data = terminator
        self.state.memory.store(lpBuffer, data)

    return result_bytes
def solv_concrete_engine_linux_x64(p, state):
    """
    Run the x64 concrete-engine scenario and check the recovered configuration.

    The binary is executed concretely up to ``BINARY_DECISION_ADDRESS``, a
    32-byte symbolic buffer is stored at ``rbp - 0xc0``, and symbolic
    exploration searches for ``DROP_STAGE2_V2``. The found state is then run
    concretely to ``BINARY_EXECUTION_END`` and the solved buffer value is
    compared with the expected constant.

    :param p:     The project to execute.
    :param state: The state handed to the first concrete run.
    :raises nose.SkipTest: if nothing was found and the first errored state
                           failed with ``SimIRSBNoDecodeError``.
    """
    # [1] Concrete execution up to the decision point.
    concrete_state = execute_concretly(p, state, BINARY_DECISION_ADDRESS, [])

    # Overlay the symbolic configuration buffer on the concrete memory.
    sym_config = claripy.BVS('arg0', 8*32)
    buffer_addr = concrete_state.regs.rbp-0xc0
    concrete_state.memory.store(buffer_addr, sym_config)

    # [2] Symbolic exploration towards the second-stage drop.
    simgr = p.factory.simgr(concrete_state)
    exploration = simgr.explore(
        find=DROP_STAGE2_V2,
        avoid=[DROP_STAGE2_V1, VENV_DETECTED, FAKE_CC],
    )

    found = exploration.stashes['found']
    if not found:
        errored = exploration.errored
        if errored and type(errored[0].error) is angr.errors.SimIRSBNoDecodeError:
            raise nose.SkipTest()
    solved_state = found[0]

    # [3] Concrete execution with the solution until the end of the binary.
    execute_concretly(p, solved_state, BINARY_EXECUTION_END, [(buffer_addr, sym_config)])

    # [4] The configuration that reaches the target block must match.
    binary_configuration = solved_state.solver.eval(sym_config, cast_to=int)
    correct_solution = 0xa00000006000000f6ffffff0000000000000000000000000000000000000000
    nose.tools.assert_true(binary_configuration == correct_solution)
def solv_concrete_engine_linux_arm(p, entry_state):
    """
    Run the ARM concrete-engine scenario up to the final concrete execution.

    The binary is executed concretely up to ``BINARY_DECISION_ADDRESS``, a
    20-byte symbolic buffer is stored at the address held in ``r3``, and
    symbolic exploration searches for ``DROP_STAGE2_V2``. The found state is
    then run concretely to ``BINARY_EXECUTION_END``.

    :param p:           The project to execute.
    :param entry_state: The state handed to the first concrete run.
    :raises nose.SkipTest: if nothing was found and the first errored state
                           failed with ``SimIRSBNoDecodeError``.
    """
    # Concrete execution up to the decision point.
    new_concrete_state = execute_concretly(p, entry_state, BINARY_DECISION_ADDRESS, [])

    # Overlay the symbolic configuration buffer on the concrete memory.
    arg0 = claripy.BVS('arg0', 5 * 32)
    symbolic_buffer_address = new_concrete_state.regs.r3
    new_concrete_state.memory.store(symbolic_buffer_address, arg0)

    # Symbolic exploration towards the second-stage drop.
    simgr = p.factory.simgr(new_concrete_state)
    exploration = simgr.explore(
        find=DROP_STAGE2_V2,
        avoid=[DROP_STAGE2_V1, VENV_DETECTED, FAKE_CC],
    )

    found = exploration.stashes['found']
    if not found:
        errored = exploration.errored
        if errored and type(errored[0].error) is angr.errors.SimIRSBNoDecodeError:
            raise nose.SkipTest()
    new_symbolic_state = found[0]

    # Concrete execution with the solution until the end of the binary.
    execute_concretly(
        p,
        new_symbolic_state,
        BINARY_EXECUTION_END,
        [(symbolic_buffer_address, arg0)],
        [],
    )
bbl_max_bytes = y2 - y1
if bbl_max_bytes <= 0:
bbl_max_bytes = 800
# detect back loops
# this might still break for huge basic blocks with back loops
# but it seems unlikely
try:
bl = project.factory.block(self.trace[self.bb_cnt-1],
backup_state=current)
back_targets = set(bl.vex.constant_jump_targets) & set(bl.instruction_addrs)
if self.bb_cnt < len(self.trace) and self.trace[self.bb_cnt] in back_targets:
target_to_jumpkind = bl.vex.constant_jump_targets_and_jumpkinds
if target_to_jumpkind[self.trace[self.bb_cnt]] == "Ijk_Boring":
bbl_max_bytes = 800
except (angr.errors.SimMemoryError, angr.errors.SimEngineError):
bbl_max_bytes = 800
# if we're not in crash mode we don't care about the history
if self.trim_history and not self.crash_mode:
current.history.trim()
self.simgr.step(size=bbl_max_bytes)
if self.crash_type == EXEC_STACK:
self.simgr.stash(from_stash='active', to_stash='crashed')
return self.simgr
# if our input was preconstrained we have to keep on the lookout
# for unsat paths
if self.preconstrain_input:
self.simgr.stash(from_stash='unsat', to_stash='active')
def run(self, addr, length, prot): #pylint:disable=arguments-differ,unused-argument
try:
addr = self.state.solver.eval_one(addr)
except angr.errors.SimValueError:
raise angr.errors.SimValueError("mprotect can't handle symbolic addr")
try:
length = self.state.solver.eval_one(length)
except angr.errors.SimValueError:
raise angr.errors.SimValueError("mprotect can't handle symbolic length")
try:
prot = self.state.solver.eval_one(prot)
except angr.errors.SimValueError:
raise angr.errors.SimValueError("mprotect can't handle symbolic prot")
l.debug('mprotect(%#x, %#x, %#x) = ...', addr, length, prot)
if addr & 0xfff != 0:
l.debug('... = -1 (not aligned)')
return -1
page_end = ((addr + length - 1) & ~0xfff) + 0x1000
try:
for page in range(addr, page_end, 0x1000):
if flags & 0x00002000 or addr == 0: # MEM_RESERVE
if addr == 0:
l.debug("...searching for address")
while True:
addr = self.allocate_memory(size)
try:
self.state.memory.map_region(addr, size, angr_prot, init_zero=True)
except angr.errors.SimMemoryError:
continue
else:
l.debug("...found %#x", addr)
break
else:
try:
self.state.memory.map_region(addr, size, angr_prot, init_zero=True)
except angr.errors.SimMemoryError:
l.debug("...failed, bad address")
return 0
if flags & 0x00001000: # MEM_COMMIT
# we don't really emulate commit. we just check to see if the region was allocated.
try:
self.state.memory.permissions(addr)
except angr.errors.SimMemoryError:
l.debug("...not reserved")
return 0
# if we got all the way to the end, nothing failed! success!
return addr
def process_successors(self,
successors,
irsb=None,
insn_text=None,
insn_bytes=None,
thumb=False,
size=None,
num_inst=None,
extra_stop_points=None,
**kwargs):
if not pyvex.lifting.lifters[self.state.arch.name] or type(successors.addr) is not int:
return super().process_successors(successors, extra_stop_points=extra_stop_points, num_inst=num_inst, size=size, insn_text=insn_text, insn_bytes=insn_bytes, **kwargs)
if insn_text is not None:
if insn_bytes is not None:
raise errors.SimEngineError("You cannot provide both 'insn_bytes' and 'insn_text'!")
insn_bytes = self.project.arch.asm(insn_text, addr=successors.addr, thumb=thumb)
if insn_bytes is None:
raise errors.AngrAssemblyError("Assembling failed. Please make sure keystone is installed, and the assembly"
" string is correct.")
successors.sort = 'IRSB'
successors.description = 'IRSB'
self.state.history.recent_block_count = 1
self.state.scratch.guard = claripy.true
self.state.scratch.sim_procedure = None
addr = successors.addr
self.state.scratch.bbl_addr = addr
while True:
if irsb is None:
print '[+] resolved the uninit with concrete page'
else:
print( '[!] failed to resolve the uninit memory')
if self.pause_on_failed_memory_resolving:
import IPython; IPython.embed()
#import IPython; IPython.embed()
if self.pause_on_finish_memory_loading:
raw_input('do the read now(continue) <-')
else:
print 'Memory Content does not appear uninitialized.'
except AttributeError as e:
print e.args, e.message
print('wtf track reads')
import IPython; IPython.embed()
pass
except angr.errors.SimValueError as e:
print('wtf simvalueerror')
import IPython; IPython.embed()
else:
import IPython; IPython.embed()
return
writable = False
except Exception, e:
writable = False
# -------------------------------------------------------------------------
if writable == False:
warn("RSVP concretized but it has an invalid address '0x%x'" % con_addr)
# return False
# give it a second chance
self.__alloc_un(state, STR2BV[addr])
con_addr = state.se.eval(STR2BV[addr])
except angr.errors.SimUnsatError: # un-satisfiable constraints
dbg_prnt(DBG_LVL_2, "Reservation was un-satisfiable. Discard current path.")
print 'SSSSS', self.__state.se.constraints
return False # reservation failed
except Exception, e:
dbg_prnt(DBG_LVL_2, "Unknown Exception '%s'. Discard current path." % str(e))
return False # reservation failed
# if this address has already been written in the past, any writes will
# be overwritten, so discard current path
#if con_addr in self.__mem or con_addr in self.__imm or (con_addr + 7) in self.__imm:
if con_addr in self.__imm or (con_addr + 7) in self.__imm:
dbg_prnt(DBG_LVL_2, "RSVP 0x%x has already been written or it's immutable. "
"Discard current path." % con_addr)
def io_file_data_for_arch(arch):
    """
    Look up the ``_IO_FILE`` entry registered for an architecture.

    :param arch: Object whose ``name`` attribute keys into ``_IO_FILE``.
    :return:     The ``_IO_FILE`` entry stored under ``arch.name``.
    :raises angr.errors.SimProcedureError: if ``_IO_FILE`` has no entry for
                                           ``arch.name``.
    """
    arch_name = arch.name
    if arch_name in _IO_FILE:
        return _IO_FILE[arch_name]
    raise angr.errors.SimProcedureError("missing _IO_FILE offsets for arch: %s" % arch_name)