Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import logging
l = logging.getLogger("archr.arsenal.datascout")
from ..errors import ArchrError
from . import Bow
# Keystone engine 0.9.2 (incorrectly) defaults to radix 16, so we are better off using only 0x-prefixed integers from now on.
# See the related PR: https://github.com/keystone-engine/keystone/pull/382
# and the related issue: https://github.com/keystone-engine/keystone/issues/436
class DataScoutBow(Bow):
    """
    Grabs the environment and auxiliary vector from the target.
    """

    REQUIRED_ARROW = "shellphish_qemu"

    def _encode_bytes(self, s):
        """
        Build assembly source that pushes the string `s` onto the stack.

        The string is UTF-8 encoded and cut into machine-word-sized chunks. Each chunk is padded with
        NUL bytes, byte-reversed and hex-encoded so that it can be emitted as a 0x-prefixed immediate.
        A "0" word is pushed first, followed by the chunks from last to first.

        :param str s: The text to encode.
        :returns: For an x86_64 target, a string of "mov rax, 0x...; push rax; " instructions.
                  No other architecture is handled in the code shown here, so None is returned for them.
        """

        def _encode_name(bits):
            w = bits // 8  # word size in bytes
            # Encode before slicing: slicing by characters would let a multi-byte character make a
            # chunk wider than one machine word.
            raw = s.encode('utf-8')
            words = [raw[i:i + w].ljust(w, b"\0")[::-1].hex() for i in range(0, len(raw), w)]
            # The "0" word goes first, then the chunks in reverse order.
            return ["0"] + words[::-1]

        if self.target.target_arch == 'x86_64':
            encoded_name = _encode_name(64)
            return "".join("mov rax, 0x%s; push rax; " % word for word in encoded_name)
import logging
import time
l = logging.getLogger("archr.arsenal.input_fd")
from . import Bow
class InputFDBow(Bow):
    """
    Figures out what file descriptor the target uses to read input.
    """

    def fire(self, **kwargs): #pylint:disable=arguments-differ
        """
        Trace the target with strace, feed it a marker, and report the fd the marker was read from.

        :param kwargs: Accepted, but not used by the code shown here.
        :returns: The file descriptor (int) parsed from the first traced read/recv line that
                  contains the marker.
        :raises IndexError: If the trace has no read/recv line containing the marker.
        """
        with STraceBow(self.target).fire_context(trace_args=["-f"]) as flight:
            time.sleep(0.1)
            flight.default_channel.write(b'aRcHr'*0x1000)
            flight.default_channel.recv_until(b'aRcHr')
            flight.default_channel.close()

        # NOTE(review): indentation was reconstructed; confirm that the result is meant to be read
        # after the tracing context has exited.
        strace = flight.result

        # Keep only syscall lines that both mention the marker and are a read()/recv*() call.
        archr_read = [
            line for line in strace.splitlines()
            if b"aRcHr" in line and line.startswith((b"read", b"recv"))
        ]

        # A matching line looks like `read(<fd>, ...`: take the text between "(" and the first ",".
        fd = archr_read[0].split()[0].split(b"(")[1].split(b",")[0]
        return int(fd)
import logging
l = logging.getLogger("archr.arsenal.memory_map")
from . import Bow
class MemoryMapBow(Bow):
    """
    Gets a memory map of the target.
    """

    def fire(self, aslr=False): #pylint:disable=arguments-differ
        """
        Collect library and special-region addresses by running `ldd` and `cat /proc/self/maps`.

        :param bool aslr: Passed through to run_command for both helper commands.
        :returns: The mapping produced by parse_ldd (presumably library name -> address; verify against
                  parse_ldd), extended with '[stack-end]' and, when present in the maps output,
                  '[heap]', '[vvar]', '[vdso]' and '[vsyscall]'.
        :raises StopIteration: If the maps output has no line ending in '[stack]'.
        """
        ldd_map_str, _ = self.target.run_command(["ldd", self.target.target_path], aslr=aslr).communicate()
        lib_addrs = parse_ldd(ldd_map_str)

        # These are the maps of the `cat` process started through run_command, not of the target binary.
        mapped_addrs, _ = self.target.run_command(["cat", "/proc/self/maps"], aslr=aslr).communicate()
        map_lines = mapped_addrs.splitlines()

        # Each maps line starts with `<start>-<end> `; the stack entry's end address is recorded.
        stack_line = next(m for m in map_lines if m.endswith(b'[stack]'))
        lib_addrs['[stack-end]'] = int(stack_line.split(b'-')[1].split(b' ')[0], 16)

        # For the other special regions the start address is recorded.
        lib_addrs.update({
            v.decode('utf-8'): int(next(m for m in map_lines if m.endswith(v)).split(b'-')[0], 16)
            for v in [b"[heap]", b"[vvar]", b"[vdso]", b"[vsyscall]"]
            if v in mapped_addrs
        })

        return lib_addrs
import socket
import nclib
from . import Bow
class NetCatBow(Bow):
    """
    Returns an nclib instance connected to a running instance of the target.
    """

    def fire(self, run=True, stderr=True, **kwargs): #pylint:disable=arguments-differ
        """
        Returns a tube connected to the process.

        :param bool run: Start the target (and pass kwargs along).
        :param bool stderr: If the target is a console app, whether to include stderr.
        :param kwargs: kwargs to pass through to run_command
        :returns: an nclib.Netcat
        """
        # NOTE(review): only the start-up half of the TCP path is visible in this excerpt; the code
        # that opens the connection and returns it is not shown, so nothing is returned here.
        if self.target.tcp_ports:
            if run:
                self.target.run_command(**kwargs)
class SimArchrMount(angr.state_plugins.filesystem.SimConcreteFilesystem):
    """
    A SimConcreteFilesystem whose file contents are retrieved from an archr target.
    """

    def __init__(self, target=None, **kwargs):
        """
        :param target: The object file contents are fetched from; it must provide retrieve_contents().
        :param kwargs: Forwarded unchanged to SimConcreteFilesystem.
        """
        super().__init__(**kwargs)
        self.target = target

    @angr.SimStatePlugin.memo
    def copy(self, memo):
        """
        Copy the plugin; the copy refers to the same target object as the original.
        """
        o = super().copy(memo)
        o.target = self.target
        return o

    def _load_file(self, guest_path):
        """
        Fetch `guest_path` from the target and wrap its contents in an angr.SimFile.

        :param str guest_path: Path of the file on the target.
        :returns: A SimFile named 'file://<guest_path>' whose size is the length of the contents.
        """
        content = self.target.retrieve_contents(guest_path)
        return angr.SimFile(name='file://' + guest_path, content=content, size=len(content))
class angrStateBow(Bow):
    """
    Constructs an angr state (full init variety) to match the target precisely
    """

    def __init__(self, target, project_bow):
        """
        :param target: The target to work on.
        :param project_bow: The bow whose fire() supplies the angr project used to build the state.
        """
        super().__init__(target)
        self.project_bow = project_bow
def fire(self, **kwargs): #pylint:disable=arguments-differ
project = self.project_bow.fire()
if 'cwd' not in kwargs:
cwd = os.path.dirname(self.project_bow.target.target_path)
kwargs['cwd'] = bytes(cwd, 'utf-8')
s = project.factory.full_init_state(
concrete_fs=True, chroot="/ARCHR-INVALID",
import tempfile
import logging
import angr
import cle
import os
l = logging.getLogger("archr.arsenal.angr")
from . import Bow
class angrProjectBow(Bow):
    """
    Constructs an angr project to match the target precisely
    """

    def __init__(self, target, scout_bow, static_simproc=False):
        """
        :param target: The target to work on.
        :param scout_bow: The scout bow.
        :param static_simproc: When enabled, angr will hook functions in the main binary with SimProcedures if
                               available. This is useful when dealing with statically linked binaries.
        :type static_simproc: bool
        """
        super().__init__(target)
        self.scout_bow = scout_bow
        # NOTE(review): static_simproc is not stored anywhere in the portion of the constructor
        # visible in this excerpt; confirm it is handled in the part that is not shown.
from . import Bow
import fuzztainer as f
import os
from subprocess import Popen
class AflBow(Bow):
    """
    Returns a Runnable AFL instance connected to the target
    """

    def __init__(self, target):
        """
        :param target: The target to work on; its mount_local() is called during construction.
        """
        super().__init__(target)
        self.target.mount_local()
        # No runner exists until one is created later.
        self.runner = None
def fire(self): #pylint:disable=arguments-differ
"""
Returns an AFLRunner instance
"""
# TODO: resolve container vs local inject_path
if hasattr(self.target, "container"):
import socket
import pwnlib
from . import Bow
class TubeBow(Bow):
    """
    Returns a pwntools tube connected to a running instance of the target.
    """

    def fire(self, stderr=True): #pylint:disable=arguments-differ
        """
        Returns a tube connected to the process.

        :param bool stderr: If the target is a console app, whether to include stderr.
        :returns: For a target with TCP ports, a pwnlib remote tube connected to the first TCP port.
        """
        if self.target.tcp_ports:
            # Start the target, then connect to its first listed TCP port.
            self.target.run_command()
            r = pwnlib.tubes.remote.remote(self.target.ipv4_address, self.target.tcp_ports[0])
            return r
        elif self.target.udp_ports:
            # NOTE(review): this excerpt ends right after the target is started; the UDP connection
            # and return are not visible here.
            self.target.run_command()