Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import contextlib
import tempfile
import logging
import os
l = logging.getLogger("archr.arsenal.core_bow")
from . import ContextBow
class CoreResults:
    """
    Plain result record with two core-file path attributes.
    """

    # Both paths default to None at class level until something assigns them.
    target_core_path = None
    local_core_path = None
_super_core_cmd = "echo core | docker run --rm --privileged -i ubuntu tee /proc/sys/kernel/core_pattern"
class CoreBow(ContextBow):
"""
Runs the target and retrieves a core file. Assumes a /proc/sys/kernel/core_pattern is "core".
"""
def __init__(self, *args, **kwargs):
    """
    Ensure the kernel core pattern is "core", then initialize the bow.

    Reads /proc/sys/kernel/core_pattern; if its content is not exactly
    "core", a warning is logged and the pattern is overwritten system-wide
    by running _super_core_cmd through the shell.

    :param args: Positional arguments forwarded to the parent constructor
    :param kwargs: Keyword arguments forwarded to the parent constructor
    """
    # Read the current pattern and close the file before running anything else.
    with open("/proc/sys/kernel/core_pattern", 'rb') as c:
        current_pattern = c.read().strip()

    if current_pattern != b"core":
        l.warning("/proc/sys/kernel/core_pattern needs to be 'core'. I am setting this system-wide.")
        os.system(_super_core_cmd)

    super().__init__(*args, **kwargs)

    # isinstance instead of an exact type() comparison, so that subclasses of
    # DockerImageTarget are still recognised as docker targets.
    if not isinstance(self.target, targets.DockerImageTarget):
        l.warning("When using a LocalTarget, this Bow will chmod 777 your CWD!!! Be careful.")
@contextlib.contextmanager
def fire_context(self, **kwargs): #pylint:disable=arguments-differ
if self.target.run_command(["chmod","777",os.path.dirname(self.target.target_path)], user="root").wait() != 0:
timed_out = None
# introspection
trace = None
crash_address = None
base_address = None
magic_contents = None
core_path = None
def tracer_technique(self, **kwargs):
    """
    Build an angr Tracer exploration technique from this result.

    :param kwargs: Extra keyword arguments passed through to the Tracer constructor
    :return: The constructed Tracer, fed with self.trace and self.crash_address
    """
    technique_cls = angr.exploration_techniques.Tracer
    return technique_cls(self.trace, crash_addr=self.crash_address, **kwargs)
# Patterns for "Trace ..." lines with a bracketed field list.
# The old form has a single bracketed field; the new form has three
# slash-separated fields inside the brackets.
# NOTE(review): the named groups had lost their names ("(?P.*)"), which
# re.compile rejects. The names below are assumed -- confirm them against the
# code that calls .group(...) on these matches.
_trace_old_re = re.compile(br'Trace (.*) \[(?P<addr>.*)\].*')
_trace_new_re = re.compile(br'Trace (.*) \[(?P<something1>.*)\/(?P<addr>.*)\/(?P<flags>.*)\].*')
class QEMUTracerBow(ContextBow):
REQUIRED_ARROW = "shellphish_qemu"
def __init__(self, target, timeout=10, ld_linux=None, library_path=None, seed=None, **kwargs):
    """
    Initialize the parent bow with the target, then record tracing options.

    :param target: Target handed to the parent constructor
    :param timeout: Stored as self.timeout (default 10; presumably seconds -- confirm)
    :param ld_linux: Stored as self.ld_linux
    :param library_path: Stored as self.library_path
    :param seed: Stored as self.seed
    :param kwargs: Extra keyword arguments forwarded to the parent constructor
    """
    super().__init__(target, **kwargs)
    self.timeout, self.seed = timeout, seed
    self.ld_linux, self.library_path = ld_linux, library_path
@contextlib.contextmanager
def _target_mk_tmpdir(self):
tmpdir = tempfile.mktemp(prefix="/tmp/tracer_target_")
self.target.run_command(["mkdir", tmpdir]).wait()
try:
yield tmpdir
finally:
def __init__(self, trace_dir=None, symbolic_fd=None):
    """
    Set up the trace directory holder and remember the symbolic fd.

    :param trace_dir: Existing directory to wrap in a FakeTempdir; when None,
                      a fresh TemporaryDirectory prefixed 'rr_trace_dir_' is created
    :param symbolic_fd: Stored as self.symbolic_fd
    """
    self.trace_dir = (
        FakeTempdir(trace_dir)
        if trace_dir is not None
        else tempfile.TemporaryDirectory(prefix='rr_trace_dir_')
    )
    self.symbolic_fd = symbolic_fd
def tracer_technique(self, **kwargs):
    """
    Build a trraces Trracer technique for the recorded trace directory.

    :param kwargs: Extra keyword arguments passed through to Trracer
    :return: Trracer built from self.trace_dir.name and self.symbolic_fd
    :raises Exception: if the trraces name is None
    """
    if trraces is not None:
        trracer_cls = trraces.replay_interfaces.angr.technique.Trracer
        return trracer_cls(self.trace_dir.name, symbolic_fd=self.symbolic_fd, **kwargs)
    raise Exception("need to install trraces")
class RRBow(ContextBow):
REQUIRED_ARROW = "rr"
REMOTE_TRACE_DIR_PREFIX = "/tmp/rr_trace_"
def __init__(self, target, timeout=10, local_trace_dir=None, symbolic_fd=None):
    """
    Initialize the parent bow with the target, then record the rr options.

    :param target: Target handed to the parent constructor
    :param timeout: Stored as self.timeout (default 10; presumably seconds -- confirm)
    :param local_trace_dir: Stored as self.local_trace_dir
    :param symbolic_fd: Stored as self.symbolic_fd
    """
    super().__init__(target)
    self.symbolic_fd = symbolic_fd
    self.local_trace_dir = local_trace_dir
    self.timeout = timeout
@contextlib.contextmanager
def _target_mk_tmpdir(self):
tmpdir = tempfile.mktemp(prefix=self.REMOTE_TRACE_DIR_PREFIX)
self.target.run_command(["rm", "-rf", tmpdir]).wait()
self.target.run_command(["mkdir", tmpdir]).wait()
try:
yield tmpdir
import os
import logging
l = logging.getLogger("archr.arsenal.memory_map")
from . import ContextBow
class GDBServerBow(ContextBow):
    """
    Launches a gdb server.
    """

    REQUIRED_ARROW = "gdbserver"

    def fire_context(self, port=31337, aslr=False, **kwargs):
        """
        Open a flight context whose command is prefixed with the gdbserver launcher.

        :param port: Port number formatted into the "0.0.0.0:<port>" listen address
        :param aslr: Passed through to flight_context
        :param kwargs: Additional keyword arguments passed through to flight_context
        :return: Whatever self.target.flight_context returns
        """
        launcher = os.path.join(self.target.tmpwd, "gdbserver", "fire")
        listen_address = "0.0.0.0:%d" % port
        launch_prefix = [launcher, listen_address]
        return self.target.flight_context(args_prefix=launch_prefix, aslr=aslr, **kwargs)
:param kwargs: Additional arguments to run_command
:return: Target instance returned by run_command
"""
fire_path = os.path.join(self.target.tmpwd, "strace", "fire")
args_prefix = (args_prefix or []) + [fire_path] + (trace_args or []) + ["--"]
with self.target.flight_context(args_prefix=args_prefix, **kwargs) as flight:
yield flight
try:
flight.result = flight.process.stderr.read() # illegal, technically
except ValueError:
flight.result = b''
class STraceAttachBow(ContextBow):
"""
Attaches to a process with strace
"""
REQUIRED_BINARY = "/usr/bin/strace"
@contextlib.contextmanager
def fire_context(self, pid, trace_args=None, args_prefix=None, **kwargs): #pylint:disable=arguments-differ
"""
Starts strace attaching to a given process
:param pid: PID of target process, if already existing
:param kwargs: Additional arguments to run_command
:return: Target instance returned by run_command
"""
import os
import logging
from contextlib import contextmanager
from . import ContextBow
from .strace import super_yama
l = logging.getLogger("archr.arsenal.ltrace")
class LTraceBow(ContextBow):
    """
    Returns an ltrace instance which has launched a fresh instance of the process
    """

    REQUIRED_ARROW = "ltrace"

    @contextmanager
    def fire_context(self, args_prefix=None, trace_args=None, **kwargs):
        """
        Starts ltrace with a fresh process.

        The final command prefix is: args_prefix, the ltrace launcher under
        the target's tmpwd, trace_args, then "--".

        :param args_prefix: Arguments placed before the ltrace launcher
        :param trace_args: Options for ltrace
        :param kwargs: Additional keyword arguments forwarded to flight_context
        :return: Yields the flight object; after the context exits, its
                 ``result`` attribute holds whatever was read from stderr
        """
        fire_path = os.path.join(self.target.tmpwd, "ltrace", "fire")
        args_prefix = (args_prefix or []) + [fire_path] + (trace_args or []) + ["--"]
        with self.target.flight_context(args_prefix=args_prefix, **kwargs) as flight:
            yield flight
        # The read happens after the flight context has exited; mirror the
        # strace bow and fall back to empty output if the stream is already
        # closed (reading a closed file raises ValueError).
        try:
            flight.result = flight.process.stderr.read() # illegal, technically
        except ValueError:
            flight.result = b''
class LTraceAttachBow(ContextBow):
"""
Returns an ltrace instance attached to a running instance of the target.
"""
REQUIRED_ARROW = "ltrace"
@contextmanager
def fire_context(self, pid=None, trace_args=None, **kwargs):
"""
Attaches ltrace to an already existing process.
:param pid: PID of target process
:param trace_args: Options for ltrace
:param kwargs: Additional arguments
:return:
"""
import logging
import os
from . import ContextBow
l = logging.getLogger("archr.arsenal.strace")
def super_yama():
    """
    Make sure Yama's ptrace_scope is "0", changing it system-wide if necessary.

    Reads /proc/sys/kernel/yama/ptrace_scope; when the content is not "0",
    logs a warning and runs _super_yama_cmd through the shell.
    """
    with open("/proc/sys/kernel/yama/ptrace_scope", 'rb') as scope_file:
        already_permissive = scope_file.read().strip() == b"0"
        if already_permissive:
            return
        l.warning("/proc/sys/kernel/yama/ptrace_scope needs to be '0'. I am setting this system-wide.")
        os.system(_super_yama_cmd)
_super_yama_cmd = "echo 0 | docker run --rm --privileged -i ubuntu tee /proc/sys/kernel/yama/ptrace_scope"
class STraceBow(ContextBow):
    """
    Launches a process under strace
    """

    REQUIRED_BINARY = "/usr/bin/strace"

    @contextlib.contextmanager
    def fire_context(self, trace_args=None, args_prefix=None, **kwargs): #pylint:disable=arguments-differ
        """
        Starts strace with a fresh process.

        The final command prefix is: args_prefix, the strace launcher under
        the target's tmpwd, trace_args, then "--".

        :param trace_args: Options for strace
        :param args_prefix: Arguments placed before the strace launcher
        :param kwargs: Additional keyword arguments forwarded to flight_context
        :return: Yields the flight object; after the context exits, its
                 ``result`` attribute holds whatever was read from stderr
        """
        fire_path = os.path.join(self.target.tmpwd, "strace", "fire")
        args_prefix = (args_prefix or []) + [fire_path] + (trace_args or []) + ["--"]
        # A contextmanager-decorated function must yield exactly once; a bare
        # return here would make entering the context fail.
        with self.target.flight_context(args_prefix=args_prefix, **kwargs) as flight:
            yield flight
        try:
            flight.result = flight.process.stderr.read() # illegal, technically
        except ValueError:
            # stderr was already closed; report empty output instead of failing
            flight.result = b''
class GDBResult:
    """
    Result record for a gdb run: status fields plus a trace directory holder.
    """

    # Status fields start out as "nothing happened yet" defaults.
    returncode = None
    signal = None
    crashed = False
    timed_out = False

    def __init__(self, trace_dir=None):
        """
        Choose the trace directory holder.

        :param trace_dir: Existing directory to wrap in a FakeTempdir; when None,
                          a fresh TemporaryDirectory prefixed 'gdb_trace_dir_' is created
        """
        if trace_dir is not None:
            self.trace_dir = FakeTempdir(trace_dir)
            return
        self.trace_dir = tempfile.TemporaryDirectory(prefix='gdb_trace_dir_')
class GDBBow(ContextBow):
REQUIRED_ARROW = "gdb"
def __init__(self, target, local_trace_dir=None, timeout=10):
    """
    Initialize the parent bow with the target, then record the gdb options.

    :param target: Target handed to the parent constructor
    :param local_trace_dir: Stored as self.local_trace_dir
    :param timeout: Stored as self.timeout (default 10; presumably seconds -- confirm)
    """
    super().__init__(target)
    self.local_trace_dir, self.timeout = local_trace_dir, timeout
@contextlib.contextmanager
def fire_context(self, prefix_args=None, gdb_args=None, gdb_script=None, sleep_time=0.1):
"""Run the target with gdb.
Keyword arguments:
prefix_args -- additional commands BEFORE the gdb command (default None)
gdb_args -- addition args for gdb (default None)
gdb_script -- Path of an optional gdb_script file (default None)
"""