Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def list():
    # Deliberately documented with comments rather than a docstring: if this
    # function is registered as a click command (decorator not visible here --
    # confirm), a docstring would become its --help text.
    #
    # Requests the system feed listing, prints the formatted payload, and
    # hands the resulting exit code to doexit.
    exit_code = 0
    try:
        response = anchorecli.clients.apiexternal.system_feeds_list(config)
        exit_code = anchorecli.cli.utils.get_ecode(response)
        if not response['success']:
            # Route API-reported failures through the same handler as raised ones.
            raise Exception(json.dumps(response['error'], indent=4))
        rendered = anchorecli.cli.utils.format_output(
            config, 'system_feeds_list', {}, response['payload']
        )
        print(rendered)
    except Exception as err:
        print(anchorecli.cli.utils.format_error_output(config, 'system_feeds_list', {}, err))
        # Fall back to 2 only when no non-zero code was obtained before the failure.
        exit_code = exit_code or 2
    anchorecli.cli.utils.doexit(exit_code)
# NOTE(review): fragment -- the enclosing definition's header is not visible.
# days_old, tag_versions_newer, the three selectors, transition and is_global
# are presumably its parameters, and ecode is presumably initialised earlier.
#
# Safety prompt: zero for both limits is confirmed interactively before the
# rule is submitted; the prompt defaults to 'n'.
if days_old == 0 and tag_versions_newer == 0:
resp = click.prompt('Are you sure you want to use 0 for both days old limit and number of tag versions newer? WARNING: This will archive all images that match the registry/repo/tag selectors as soon as they are analyzed', type=click.Choice(['y', 'n']), default='n')
# Any answer other than 'y' passes exit code 0 to doexit.
# NOTE(review): doexit presumably terminates the process here -- confirm.
if resp.lower() != 'y':
ecode = 0
anchorecli.cli.utils.doexit(ecode)
# Submit the transition rule and print the formatted payload on success.
try:
ret = anchorecli.clients.apiexternal.add_transition_rule(config, days_old, tag_versions_newer, registry_selector, repository_selector, tag_selector, transition, is_global)
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret['success']:
print(anchorecli.cli.utils.format_output(config, 'transition_rules', {}, ret['payload']))
else:
# API-reported errors are raised so they share the handler below.
raise Exception(json.dumps(ret['error'], indent=4))
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'transition_rules', {}, err))
# Use 2 only when get_ecode did not already yield a non-zero code.
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)
if ret['success']:
whoami['account'] = ret['payload']
else:
raise Exception( json.dumps(ret['error'], indent=4))
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'account', {}, err))
sys.exit(2)
# Look up the current user and store the returned payload under whoami['user'].
try:
ret = anchorecli.clients.apiexternal.get_user(config)
if ret['success']:
whoami['user'] = ret['payload']
else:
# API-reported errors are raised so they share the handler below.
raise Exception( json.dumps(ret['error'], indent=4))
except Exception as err:
# Any failure is printed under the 'account' operation label and the process
# exits with status 2 via sys.exit directly.
print(anchorecli.cli.utils.format_error_output(config, 'account', {}, err))
sys.exit(2)
def add(input_policy):
    # Deliberately documented with comments rather than a docstring: if this
    # function is registered as a click command (decorator not visible here --
    # confirm), a docstring would become its --help text.
    #
    # Reads a JSON policy bundle from the path in input_policy, submits it via
    # add_policy, prints the formatted result, and hands the exit code to doexit.
    status = 0
    try:
        with open(input_policy, 'r') as bundle_file:
            raw_bundle = bundle_file.read()
            policybundle = json.loads(raw_bundle)

        response = anchorecli.clients.apiexternal.add_policy(
            config, policybundle=policybundle, detail=True
        )
        status = anchorecli.cli.utils.get_ecode(response)
        if not response['success']:
            # Route API-reported failures through the same handler as raised ones.
            raise Exception(json.dumps(response['error'], indent=4))
        print(anchorecli.cli.utils.format_output(config, 'policy_add', {}, response['payload']))
    except Exception as err:
        # Also reached for unreadable files and invalid JSON.
        print(anchorecli.cli.utils.format_error_output(config, 'policy_add', {}, err))
        status = status or 2
    anchorecli.cli.utils.doexit(status)
all_up[feed_record.get('name')] = True
if False not in all_up.values():
_logger.debug("all requests feeds have been synced")
break
else:
_logger.debug("some feeds not yet synced {}".format(all_up))
except Exception as err:
print ("service feeds list failed {}".format(err))
time.sleep(interval)
else:
raise Exception("timed out after {} seconds.".format(timeout))
sys.stderr.write("Feed sync: Success.\n")
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'system_wait', {}, err))
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)
"""
# NOTE(review): fragment -- the enclosing definition's header is not visible;
# digest is presumably its parameter.
ecode = 0
try:
ret = anchorecli.clients.apiexternal.delete_archived_analysis(config, digest)
# A falsy response is treated as a failure in its own right.
if ret:
if ret['success']:
print(anchorecli.cli.utils.format_output(config, 'image_delete', {}, ret['payload']))
else:
raise Exception(json.dumps(ret['error'], indent=4))
else:
raise Exception("operation failed with empty response")
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'image_delete', {}, err))
# NOTE(review): get_ecode is never called in this block, so ecode is still 0
# here and every failure exits with 2 -- confirm this is intended.
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)
def list_archived_analyses():
    # Deliberately documented with comments rather than a docstring: if this
    # function is registered as a click command (decorator not visible here --
    # confirm), a docstring would become its --help text.
    #
    # Requests the archived-analysis listing, prints the formatted payload, and
    # hands the resulting exit code to doexit.
    op_name = 'analysis_archive_list'
    result_code = 0
    try:
        reply = anchorecli.clients.apiexternal.list_archived_analyses(config)
        result_code = anchorecli.cli.utils.get_ecode(reply)
        if not reply['success']:
            # Route API-reported failures through the same handler as raised ones.
            error_text = json.dumps(reply['error'], indent=4)
            raise Exception(error_text)
        print(anchorecli.cli.utils.format_output(config, op_name, {}, reply['payload']))
    except Exception as err:
        print(anchorecli.cli.utils.format_error_output(config, op_name, {}, err))
        # Fall back to 2 only when no non-zero code was obtained before the failure.
        result_code = result_code or 2
    anchorecli.cli.utils.doexit(result_code)
def describe_errorcodes():
    # Deliberately documented with comments rather than a docstring: if this
    # function is registered as a click command (decorator not visible here --
    # confirm), a docstring would become its --help text.
    #
    # Requests the error-code descriptions, prints the formatted payload, and
    # hands the resulting exit code to doexit.
    rc = 0
    try:
        answer = anchorecli.clients.apiexternal.describe_error_codes(config)
        rc = anchorecli.cli.utils.get_ecode(answer)
        succeeded = answer['success']
        if not succeeded:
            # Route API-reported failures through the same handler as raised ones.
            raise Exception(json.dumps(answer['error'], indent=4))
        text = anchorecli.cli.utils.format_output(
            config, 'system_describe_error_codes', {}, answer['payload']
        )
        print(text)
    except Exception as err:
        failure_text = anchorecli.cli.utils.format_error_output(
            config, 'system_describe_error_codes', {}, err
        )
        print(failure_text)
        if rc == 0 or not rc:
            rc = 2
    anchorecli.cli.utils.doexit(rc)
def get(policyid, detail):
    """
    POLICYID: Policy ID to get
    """
    # The docstring text is kept as-is: it may be click --help text if this is
    # a click command (decorator not visible here -- confirm).
    #
    # Fetches one policy, prints the formatted payload, and hands the
    # resulting exit code to doexit.
    exit_status = 0
    try:
        outcome = anchorecli.clients.apiexternal.get_policy(
            config, policyId=policyid, detail=detail
        )
        exit_status = anchorecli.cli.utils.get_ecode(outcome)
        if not outcome['success']:
            # Route API-reported failures through the same handler as raised ones.
            raise Exception(json.dumps(outcome['error'], indent=4))
        format_params = {'detail': detail}
        print(anchorecli.cli.utils.format_output(config, 'policy_get', format_params, outcome['payload']))
    except Exception as err:
        print(anchorecli.cli.utils.format_error_output(config, 'policy_get', {}, err))
        # Fall back to 2 only when no non-zero code was obtained before the failure.
        exit_status = exit_status or 2
    anchorecli.cli.utils.doexit(exit_status)
# NOTE(review): fragment -- the enclosing definition's header is not visible;
# account_name is presumably its parameter.
# NOTE(review): the docstring below says "new account to create", but the code
# calls get_account and formats the result as 'account_get'. It looks copied
# from a create command -- confirm and correct the wording.
"""
ACCOUNT_NAME: name of new account to create
"""
ecode = 0
try:
ret = anchorecli.clients.apiexternal.get_account(config, account_name=account_name)
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret['success']:
print(anchorecli.cli.utils.format_output(config, 'account_get', {}, ret['payload']))
else:
# API-reported errors are raised so they share the handler below.
raise Exception( json.dumps(ret['error'], indent=4))
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'account_get', {}, err))
# Use 2 only when get_ecode did not already yield a non-zero code.
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)