Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Copyright (c) 2019 VMware, Inc. All Rights Reserved.
# SPDX-License-Identifier: BSD-2-Clause
"""
Use an external tool to analyze a container image
"""
import logging
from tern.classes.notice import Notice
from tern.utils import constants
from tern.utils import rootfs
# global logger
logger = logging.getLogger(constants.logger_name)
def get_filesystem_command(layer_obj, command):
    '''Given an ImageLayer object and a command in the form of a string,
    return the command in list form with the target directory of the layer
    appended as the last token.

    The command string is tokenized with shell-like rules, so repeated
    whitespace does not produce empty arguments and a quoted argument is
    kept together as a single token.

    This assumes that the layer tarball is untarred, which should have
    happened during the loading of the Image object'''
    # imported locally so the block does not depend on a file-level import
    import shlex
    cmd_list = shlex.split(command)
    # in most cases, the external tool has a CLI where the target directory
    # is the last token in the command. So the most straightforward way
    # to perform this operation is to append the target directory
    cmd_list.append(rootfs.get_untar_dir(layer_obj.tar_file))
    return cmd_list
def execute_external_command(layer_obj, command):
import argparse
import logging
import os
import sys
from tern.analyze import common
from tern.analyze.docker import run
from tern.utils import cache
from tern.utils import constants
from tern.utils import general
from tern.utils import rootfs
from tern.report import errors
# global logger, shared by name (constants.logger_name) across modules
logger = logging.getLogger(constants.logger_name)
# let every record through at the logger level; handlers filter below
logger.setLevel(logging.DEBUG)
# console stream handler
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
# one format for both handlers: timestamp - level - module - message
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(module)s - %(message)s')
# file handler; mode='w' starts the log file afresh instead of appending
log_handler = logging.FileHandler(constants.logfile, mode='w')
log_handler.setLevel(logging.DEBUG)
log_handler.setFormatter(formatter)
console.setFormatter(formatter)
# NOTE(review): only the file handler is attached in the lines shown here;
# the console handler is configured but not added - confirm it is attached
# elsewhere
logger.addHandler(log_handler)
# SPDX-License-Identifier: BSD-2-Clause
"""
default report generator
"""
import logging
from tern.report import formats
from tern.formats import generator
from tern.report import content
from tern.utils import constants
# global logger
logger = logging.getLogger(constants.logger_name)
def print_full_report(image):
'''Given an image, go through the Origins object and collect all the
notices for the image, layers and packages'''
notes = ''
for image_origin in image.origins.origins:
notes = notes + content.print_notices(image_origin, '', '\t')
for layer in image.layers:
if layer.import_image:
notes = notes + print_full_report(layer.import_image)
else:
for layer_origin in layer.origins.origins:
notes = notes + content.print_notices(layer_origin,
'\t', '\t\t')
layer_pkg_list = []
import tarfile
from tern.classes.package import Package
from tern.classes.notice import Notice
from tern.classes.command import Command
from tern.command_lib import command_lib
from tern.report import formats
from tern.report import errors
from tern.report import content
from tern.utils import cache
from tern.utils import constants
from tern.utils import general
from tern.utils import rootfs
# global logger
logger = logging.getLogger(constants.logger_name)
def get_shell_commands(shell_command_line):
    '''Split a shell command line into its individual commands and return
    them, in order, as a list of Command objects. Each Command is built
    from the cleaned form of the corresponding command string'''
    return [Command(general.clean_command(raw_command))
            for raw_command in general.split_command(shell_command_line)]
def load_from_cache(layer, redo=False):
'''Given a layer object, check against cache to see if that layer id exists
if yes then get the package list and load it in the layer and return true.
If it doesn't exist return false. Default operation is to not redo the
cache. Add notices to the layer's origins matching the origin_str'''
from tern.utils import container
from tern.utils import constants
from tern.utils import cache
from tern.utils import general
from tern.utils import rootfs
from tern.classes.docker_image import DockerImage
from tern.classes.image import Image
from tern.classes.image_layer import ImageLayer
from tern.classes.notice import Notice
from tern.classes.package import Package
from tern.helpers import common
import tern.helpers.docker as dhelper
from tern.command_lib import command_lib
# global logger
logger = logging.getLogger(constants.logger_name)
def write_report(report, args):
    '''Write the report string to a file.

    The destination is picked in this order: the path in args.file if it
    is set, the default YAML file name if args.yaml is set, otherwise the
    default report file name. An existing file is overwritten.

    The file is always written as UTF-8 so that a report containing
    non-ASCII text does not depend on the locale of the host.'''
    if args.file:
        file_name = args.file
    elif args.yaml:
        file_name = constants.yaml_file
    else:
        file_name = constants.report_file
    # explicit encoding: the locale default may be ASCII (e.g. LANG=C)
    with open(file_name, 'w', encoding='utf-8') as f:
        f.write(report)
def setup(dockerfile=None, image_tag_string=None):
'''Any initial setup'''
from tern.classes.docker_image import DockerImage
from tern.classes.notice import Notice
from tern.analyze.docker import dockerfile
from tern.analyze.docker import container
from tern.utils import constants
from tern.report import errors
from tern.report import formats
from tern.analyze import common
# dockerfile
dockerfile_global = ''
# dockerfile commands
docker_commands = []
# global logger
logger = logging.getLogger(constants.logger_name)
def load_docker_commands(dockerfile_path):
    '''Read the Dockerfile at the given path and keep its list of
    directives, together with the path itself, in the module-level
    variables docker_commands and dockerfile_global.
    Raises IOError if the path is not an existing file'''
    global docker_commands, dockerfile_global
    path_is_file = os.path.isfile(dockerfile_path)
    if not path_is_file:
        raise IOError('{} does not exist'.format(dockerfile_path))
    # commands are read first and then turned into directives
    command_list = dockerfile.get_command_list(dockerfile_path)
    docker_commands = dockerfile.get_directive_list(command_list)
    dockerfile_global = dockerfile_path
def print_dockerfile_base(base_instructions):
'''For the purpose of tracking the lines in the dockerfile that
produce packages, return a string containing the lines in the dockerfile
# general snippets in command library: path of the snippets.yml resource
# inside the 'tern' package
snippet_file = pkg_resources.resource_filename('tern',
'command_lib/snippets.yml')
# command library: holds the parsed YAML of the base file under 'base' and
# of the snippets file under 'snippets'
command_lib = {'base': {}, 'snippets': {}}
# both files are parsed with yaml.safe_load when this module is loaded
with open(os.path.abspath(base_file)) as f:
command_lib['base'] = yaml.safe_load(f)
with open(os.path.abspath(snippet_file)) as f:
command_lib['snippets'] = yaml.safe_load(f)
# list of package information keys that the command library can accommodate
# NOTE(review): base_keys are the plural and package_keys the singular
# spellings of the same fields - presumably one is used for listings and
# the other for single packages; confirm against the callers
base_keys = {'names', 'versions', 'licenses', 'copyrights', 'proj_urls',
'srcs'}
package_keys = {'name', 'version', 'license', 'copyright', 'proj_url', 'src'}
# global logger
logger = logging.getLogger(constants.logger_name)
class FormatAwk(dict):
    '''Dictionary meant for formatting code snippets that may contain awk
    programs. awk syntax such as "awk '{print $1}'" looks like a python
    format placeholder, and formatting it against a plain dict would fail
    with a KeyError. Here a missing key is rendered back as the original
    placeholder text, braces included, so such snippets pass through
    unchanged'''

    def __missing__(self, key):
        # put the braces back around the unknown placeholder name
        return ''.join(('{', key, '}'))
def get_base_listing(key):
'''Given the key listing in base.yml, return the dictionary'''
listing = {}
if key in command_lib['base'].keys():
listing = command_lib['base'][key]
from tern.report import formats
from tern.report import report
from tern.utils import constants
from tern.analyze.docker import container
from tern.classes.notice import Notice
from tern.analyze import common
import tern.analyze.docker.helpers as dhelper
from tern.classes.image_layer import ImageLayer
from tern.classes.image import Image
from tern.classes.package import Package
from tern.analyze.docker.analyze import analyze_docker_image
from tern.analyze.passthrough import run_extension
# global logger
logger = logging.getLogger(constants.logger_name)
def get_dockerfile_packages():
'''Given a Dockerfile return an approximate image object. This is mostly
guess work and shouldn't be relied on for accurate information. Add
Notice messages indicating as such:
1. Create an image with a placeholder repotag
2. For each RUN command, create a package list
3. Create layer objects with incremental integers and add the package
list to that layer with a Notice about parsing
4. Return stub image'''
stub_image = Image('easteregg:cookie')
layer_count = 0
for inst in dhelper.docker_commands:
if inst[0] == 'RUN':
layer_count = layer_count + 1
import logging
import sys
from tern.report import errors
from tern.report import formats
from tern.utils import constants
from tern.utils import rootfs
from tern.classes.notice import Notice
from tern.analyze import common
import tern.analyze.docker.helpers as dhelper
from tern.command_lib import command_lib
# global logger
logger = logging.getLogger(constants.logger_name)
def analyze_docker_image(image_obj, redo=False, dockerfile=False):
    '''Given a DockerImage object, retrieve the packages for each of its
    layers. The cache is consulted first and, when a layer is not there,
    the command library is used instead. Looking up in the command library
    means mounting the filesystem and running the library's commands in a
    chroot. The image is saved to the cache once all layers are analyzed'''
    # one list of packages shared across the analysis of all the layers
    collected_packages = []
    prepare_for_analysis(image_obj, dockerfile)
    # the first layer is analyzed on its own as it provides the shell
    base_shell = analyze_first_layer(image_obj, collected_packages, redo)
    # every other layer is analyzed using that shell
    analyze_subsequent_layers(
        image_obj, base_shell, collected_packages, redo)
    common.save_to_cache(image_obj)
# Shell commands kept as lists of command words.
# NOTE(review): several of these look incomplete on their own (no mount
# target, no destination path) - presumably the caller appends the
# remaining arguments before running them; confirm against the call sites
# mount the proc filesystem type, with /proc as the source
mount_proc = ['mount', '-t', 'proc', '/proc']
# bind mounts of the host's /sys and /dev
mount_sys = ['mount', '-o', 'bind', '/sys']
mount_dev = ['mount', '-o', 'bind', '/dev']
unmount = ['umount']
# enable host DNS settings
host_dns = ['cp', constants.resolv_path]
# unshare PID within rootfs
unshare_pid = ['unshare', '-pf']
# union mount
union_mount = ['mount', '-t', 'overlay', 'overlay', '-o']
# global logger
logger = logging.getLogger(constants.logger_name)
def root_command(command, *extra):
'''Invoke a shell command as root or using sudo. The command is a
list of shell command words'''
full_cmd = []
sudo = True
if os.getuid() == 0:
sudo = False
if sudo:
full_cmd.append('sudo')
full_cmd.extend(command)
for arg in extra:
full_cmd.append(arg)
# invoke
logger.debug("Running command: %s", ' '.join(full_cmd))