Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def image_add(image_digests):
    """
    Add an analyzed image to the analysis archive
    """
    # NOTE(review): the docstring above may double as CLI help text, so it is
    # kept verbatim -- confirm against the command decorator.
    exit_code = 0

    try:
        # Reject the whole request as soon as one digest fails the pattern;
        # the archive service is not contacted in that case.
        for candidate in image_digests:
            if re.match(digest_regex, candidate) is None:
                raise Exception('Invalid image digest {}. Must conform to regex: {}'.format(candidate, digest_regex))

        response = anchorecli.clients.apiexternal.archive_analyses(config, image_digests)
        exit_code = anchorecli.cli.utils.get_ecode(response)

        # Guard clause: a failed call is reported through the shared
        # except-handler below.
        if not response['success']:
            raise Exception( json.dumps(response['error'], indent=4))

        print(anchorecli.cli.utils.format_output(config, 'archive_analysis', {}, response['payload']))

    except Exception as err:
        print(anchorecli.cli.utils.format_error_output(config, 'archive_analysis', {}, err))
        # Fall back to 2 when no non-zero code was derived from the response.
        exit_code = exit_code or 2

    anchorecli.cli.utils.doexit(exit_code)
def watch(input_repo):
    """
    INPUT_REPO: Input repo can be in the following formats: registry/repo
    """
    # NOTE(review): the docstring above may double as CLI help text, so it is
    # kept verbatim -- confirm against the command decorator.
    exit_code = 0

    # Rebuild the repo string as "<registry>/<repo>" from the parsed pieces.
    # This runs outside the try block, so a parsing error is not routed
    # through the formatted error output below.
    parsed = anchorecli.cli.utils.parse_dockerimage_string(input_repo)
    registry = parsed['registry']
    repo = parsed['repo']
    input_repo = registry + "/" + repo

    try:
        response = anchorecli.clients.apiexternal.watch_repo(config, input_repo)
        exit_code = anchorecli.cli.utils.get_ecode(response)

        # Guard clauses: empty response first, then an unsuccessful one.
        if not response:
            raise Exception("operation failed with empty response")
        if not response['success']:
            raise Exception(json.dumps(response['error'], indent=4))

        print(anchorecli.cli.utils.format_output(config, 'repo_watch', {}, response['payload']))

    except Exception as err:
        print(anchorecli.cli.utils.format_error_output(config, 'repo_watch', {}, err))
        # Fall back to 2 when no non-zero code was derived from the response.
        exit_code = exit_code or 2

    anchorecli.cli.utils.doexit(exit_code)
def deactivate(subscription_type, subscription_key):
    """
    SUBSCRIPTION_TYPE: Type of subscription. Valid options:
    - tag_update: Receive notification when new image is pushed
    - policy_eval: Receive notification when image policy status changes
    - vuln_update: Receive notification when vulnerabilities are added, removed or modified
    SUBSCRIPTION_KEY: Fully qualified name of tag to subscribe to. Eg. docker.io/library/alpine:latest
    """
    # NOTE(review): the docstring above may double as CLI help text, so it is
    # kept verbatim -- confirm against the command decorator.
    exit_code = 0

    try:
        response = anchorecli.clients.apiexternal.deactivate_subscription(
            config, subscription_type, subscription_key
        )
        exit_code = anchorecli.cli.utils.get_ecode(response)

        # Guard clause: a failed call is reported through the shared
        # except-handler below.
        if not response['success']:
            raise Exception( json.dumps(response['error'], indent=4))

        print(anchorecli.cli.utils.format_output(config, 'subscription_deactivate', {}, response['payload']))

    except Exception as err:
        print(anchorecli.cli.utils.format_error_output(config, 'subscription_deactivate', {}, err))
        # Fall back to 2 when no non-zero code was derived from the response.
        exit_code = exit_code or 2

    anchorecli.cli.utils.doexit(exit_code)
ret = anchorecli.clients.apiexternal.get_policy(config, policyId=policyid, detail=True)
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret['success']:
policy_records = ret['payload']
policy_record = {}
if policy_records:
policy_record = policy_records[0]
else:
raise Exception(json.dumps(ret['error'], indent=4))
if not policy_record:
raise Exception("no policy could be fetched to activate")
policy_record['active'] = True
ret = anchorecli.clients.apiexternal.update_policy(config, policyid, policy_record=policy_record)
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret['success']:
print(anchorecli.cli.utils.format_output(config, 'policy_activate', {'policyId': policyid}, ret['payload']))
else:
raise Exception(json.dumps(ret['error'], indent=4))
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'policy_activate', {}, err))
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)
def list(since=None, before=None, level=None, service=None, host=None, resource=None, event_type=None, resource_type=None, all=False, full=False):
    """
    RESOURCE: Value can be a tag, image digest or repository name. Displays results related to the specific resource
    """
    # NOTE(review): the docstring above may double as CLI help text, so it is
    # kept verbatim -- confirm against the command decorator.
    exit_code = 0

    try:
        # The level filter is optional; when given it is validated
        # case-insensitively and forwarded in upper case.
        if level:
            upper_level = level.upper()
            if upper_level not in ['INFO', 'ERROR']:
                raise Exception('{} is an invalid value for --level. Supported values are \'info\' or \'error\''.format(level))
            level = upper_level

        response = anchorecli.clients.apiexternal.list_events(
            config,
            since=since,
            before=before,
            level=level,
            service=service,
            host=host,
            resource=resource,
            event_type=event_type,
            resource_type=resource_type,
            all=all,
        )
        exit_code = anchorecli.cli.utils.get_ecode(response)

        if not response['success']:
            raise Exception(json.dumps(response['error'], indent=4))

        # `full` only selects which output format key is used.
        output_key = 'event_list_full' if full else 'event_list'
        print(anchorecli.cli.utils.format_output(config, output_key, {}, response['payload']))

    except Exception as err:
        print(anchorecli.cli.utils.format_error_output(config, 'event_list', {}, err))
        # Fall back to 2 when no non-zero code was derived from the response.
        exit_code = exit_code or 2

    anchorecli.cli.utils.doexit(exit_code)
"""
ecode = 0
try:
itype, image, imageDigest = anchorecli.cli.utils.discover_inputimage(config, input_image)
if imageDigest:
thetag = input_image
if tag:
thetag = tag
elif itype == 'tag':
thetag = image
else:
raise Exception("input image name is not a tag, and no --tag is specified")
ret = anchorecli.clients.apiexternal.check_eval(config, imageDigest=imageDigest, history=show_history, detail=detail, tag=thetag, policyId=policy)
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret['success']:
print(anchorecli.cli.utils.format_output(config, 'evaluate_check', {'detail': detail, 'history': show_history, 'tag': thetag}, ret['payload']))
ecode = anchorecli.cli.utils.get_eval_ecode(ret['payload'], anchorecli.cli.utils.unquote_plus(imageDigest))
else:
raise Exception(json.dumps(ret['error'], indent=4))
else:
raise Exception("could not get image record from anchore")
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'evaluate_check', {}, err))
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)
def _fetch_bundle(config, bundlename=None, auth=(None, None)):
base_url = re.sub("/$", "", config['hub-url'])
ret = anchorecli.clients.hub.get_policies(config)
if ret['success']:
index = ret['payload']
else:
raise Exception(ret['error'])
url = None
for record in index['content']:
if record['type'] == 'bundle' and record['name'] == bundlename:
url = base_url + '/' + record['location']
if not url:
raise Exception("Bundle name {} not found in index".format(bundlename))
bundle = None
r = requests.get(url, auth=auth)
if r.status_code not in range(200, 299):