Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
jmpq_rdi = b"\xff\xe7"
code = (
movabsq_address_rdx
+ subq_0x18_rsp
+ xorl_eax_eax
+ movq_rdi_stack
+ callq_rdx
+ movq_stack_rdi
+ xorl_eax_eax
+ addq_0x18_rsp
+ jmpq_rdi
)
ctypes.memmove(mem, code, len(code))
# Make code executable
libc.mprotect(mem, page_size, libc.PROT_READ | libc.PROT_EXEC)
return libc.CLONE_CALLBACK(mem)
def _load_seccomp():
# First check if seccomp has a chance of working.
try:
libc.prctl(libc.PR_GET_SECCOMP, 0, 0, 0, 0)
except OSError as e:
logging.warning(
"Seccomp is not available, container isolation is degraded (%s).",
os.strerror(e.errno),
)
return False
try:
libc.prctl(libc.PR_SET_SECCOMP, _SECCOMP_MODE_FILTER, 0, 0, 0)
except OSError as e:
# EFAULT is expected if passing null pointer as filter argument
if e.errno != errno.EFAULT:
logging.warning(
"Unexpected failure when enabling seccomp filter, "
"container isolation is degraded (%s).",
e,
_scmp_filter_ctx = c_void_p
_SECCOMP_MODE_FILTER = 2 # /usr/include/linux/seccomp.h
# By default, a seccomp filter only allows syscall that match the native architecture.
# In some cases, it is common to run binaries with a different architecture
# (e.g., x86 binaries on an x86_64 kernel) and we want to allow such binaries and filter
# their syscalls. Thus we need a list of architectures of which we expect binaries to
# occur even though the native architecture is different. This is commonly the case for
# 32-bit counterparts of 64-bit architectures.
_ALLOWED_COMPATIBILITY_ARCHITECTURES = [b"x86", b"x32", b"arm"]
_AVAILABLE = None
_LOAD_LOCK = threading.Lock()
_lib = None
_check_errno = libc._check_errno
def _check_null(result, func, arguments):
"""Check that a ctypes function returned something else than null."""
if result:
return result
func_name = getattr(func, "__name__", "__unknown__")
raise OSError(func_name + "(" + ", ".join(map(str, arguments)) + ") returned null")
def _load_seccomp():
# First check if seccomp has a chance of working.
try:
libc.prctl(libc.PR_GET_SECCOMP, 0, 0, 0, 0)
except OSError as e:
def allocate_stack(size=DEFAULT_STACK_SIZE):
"""Allocate some memory that can be used as a stack.
@return: a ctypes void pointer to the *top* of the stack.
"""
# Allocate memory with appropriate flags for a stack as in
# https://blog.fefe.de/?ts=a85c8ba7
base = libc.mmap_anonymous(
size + GUARD_PAGE_SIZE,
libc.PROT_READ | libc.PROT_WRITE,
libc.MAP_GROWSDOWN | libc.MAP_STACK,
)
try:
# create a guard page that crashes the application when it is written to
# (on stack overflow)
libc.mprotect(base, GUARD_PAGE_SIZE, libc.PROT_NONE)
yield ctypes.c_void_p(base + size + GUARD_PAGE_SIZE)
finally:
libc.munmap(base, size + GUARD_PAGE_SIZE)
code = (
movabsq_address_rdx
+ subq_0x18_rsp
+ xorl_eax_eax
+ movq_rdi_stack
+ callq_rdx
+ movq_stack_rdi
+ xorl_eax_eax
+ addq_0x18_rsp
+ jmpq_rdi
)
ctypes.memmove(mem, code, len(code))
# Make code executable
libc.mprotect(mem, page_size, libc.PROT_READ | libc.PROT_EXEC)
return libc.CLONE_CALLBACK(mem)
"make_overlay_mount",
"mount_proc",
"make_bind_mount",
"get_my_pid_from_procfs",
"drop_capabilities",
"wait_for_child_and_forward_signals",
"setup_container_system_config",
"CONTAINER_UID",
"CONTAINER_GID",
"CONTAINER_HOME",
"CONTAINER_HOSTNAME",
]
DEFAULT_STACK_SIZE = 1024 * 1024
GUARD_PAGE_SIZE = libc.sysconf(libc.SC_PAGESIZE) # size of guard page at end of stack
CONTAINER_UID = 1000
CONTAINER_GID = 1000
CONTAINER_HOME = "/home/benchexec"
CONTAINER_HOSTNAME = "benchexec"
CONTAINER_ETC_NSSWITCH_CONF = """
passwd: files
group: files
shadow: files
hosts: files
networks: files
protocols: db files
services: db files
ethers: db files
def make_bind_mount(source, target, recursive=False, private=False, read_only=False):
"""Make a bind mount.
@param source: the source directory as bytes
@param target: the target directory as bytes
@param recursive: whether to also recursively bind mount all mounts below source
@param private: whether to mark the bind as private,
i.e., changes to the existing mounts won't propagate and vice-versa
(changes to files/dirs will still be visible).
"""
flags = libc.MS_BIND
if recursive:
flags |= libc.MS_REC
if private:
flags |= libc.MS_PRIVATE
if read_only:
flags |= libc.MS_RDONLY
libc.mount(source, target, None, flags, None)
def remount_with_additional_flags(mountpoint, existing_options, mountflags):
"""Remount an existing mount point with additional flags.
@param mountpoint: the mount point as bytes
@param existing_options: dict with current mount existing_options as bytes
@param mountflags: int with additional mount existing_options
(cf. libc.MS_* constants)
"""
mountflags |= libc.MS_REMOUNT | libc.MS_BIND
for option, flag in libc.MOUNT_FLAGS.items():
if option in existing_options:
mountflags |= flag
libc.mount(None, mountpoint, None, mountflags, None)
util.makedirs(temp_base + temp_dir, exist_ok=True)
container.make_bind_mount(temp_base + temp_dir, mount_base + temp_dir)
# Now we make mount_base the new root directory. For this we need a place below
# mount_base where to move the old root directory.
# Explanation: https://unix.stackexchange.com/a/456777/15398
old_root = b"/proc" # Does not matter, just needs to exist.
# These three steps together are the recommended sequence for calling pivot_root
# (http://man7.org/linux/man-pages/man8/pivot_root.8.html)
os.chdir(mount_base)
libc.pivot_root(mount_base, mount_base + old_root)
os.chroot(".")
# Now the container file system is at /,
# and the outer file system is visible at old_root in the container.
# We can just unmount old_root and finally make it inaccessible from container.
libc.umount2(old_root, libc.MNT_DETACH)