Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def image(ctx_config):
    """Record ``ctx_config`` as the module-level ``config`` and check access.

    Calls ``anchorecli.cli.utils.check_access`` with the stored config. If
    that call raises, the error is rendered with
    ``anchorecli.cli.utils.format_error_output`` under the ``'image'`` label,
    printed, and the process exits with status 2.
    """
    global config
    config = ctx_config

    try:
        anchorecli.cli.utils.check_access(config)
    except Exception as exc:
        # Render first, then print: same output as the single-expression form.
        message = anchorecli.cli.utils.format_error_output(config, 'image', {}, exc)
        print(message)
        sys.exit(2)
def system(ctx_config, ctx):
    """Record ``ctx_config`` as the module-level ``config`` and check access.

    The access check is skipped when ``ctx.invoked_subcommand`` is ``'wait'``.
    Otherwise ``anchorecli.cli.utils.check_access`` is called; if it raises,
    the error is rendered with ``anchorecli.cli.utils.format_error_output``
    under the ``'system'`` label, printed, and the process exits with
    status 2.
    """
    global config
    config = ctx_config

    # Subcommands listed here are exempt from the up-front access check.
    if ctx.invoked_subcommand in ['wait']:
        return

    try:
        anchorecli.cli.utils.check_access(config)
    except Exception as exc:
        message = anchorecli.cli.utils.format_error_output(config, 'system', {}, exc)
        print(message)
        sys.exit(2)
# Records ctx_config as the module-level config, runs the access check, and
# then stores the fetched account payload under whoami['account'].
# Every failure visible here is printed via format_error_output with the
# 'account' label and ends the process with exit status 2.
# NOTE(review): this definition is cut off in this view -- the final `try:`
# has no handler here and what is done with get_user's result is not shown.
def account(ctx_config):
global config, whoami
config = ctx_config
# Step 1: access check; any exception is reported and exits with status 2.
try:
anchorecli.cli.utils.check_access(config)
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'account', {}, err))
sys.exit(2)
# Step 2: fetch the account and keep its payload in whoami['account'].
try:
ret = anchorecli.clients.apiexternal.get_account(config)
if ret['success']:
whoami['account'] = ret['payload']
else:
# An unsuccessful response is raised so the handler below reports it.
raise Exception( json.dumps(ret['error'], indent=4))
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'account', {}, err))
sys.exit(2)
# Step 3: fetch the user (remainder of this step is outside the visible source).
try:
ret = anchorecli.clients.apiexternal.get_user(config)
def policy(ctx_config):
    """Record ``ctx_config`` as the module-level ``config`` and check access.

    Calls ``anchorecli.cli.utils.check_access`` with the stored config. If
    that call raises, the error is rendered with
    ``anchorecli.cli.utils.format_error_output`` under the ``'policy'`` label,
    printed, and the process exits with status 2.
    """
    global config
    config = ctx_config

    try:
        anchorecli.cli.utils.check_access(config)
    except Exception as exc:
        # Render first, then print: same output as the single-expression form.
        message = anchorecli.cli.utils.format_error_output(config, 'policy', {}, exc)
        print(message)
        sys.exit(2)
def registry(ctx_config):
    """Record ``ctx_config`` as the module-level ``config`` and check access.

    Calls ``anchorecli.cli.utils.check_access`` with the stored config. If
    that call raises, the error is rendered with
    ``anchorecli.cli.utils.format_error_output`` under the ``'registry'``
    label, printed, and the process exits with status 2.
    """
    global config
    config = ctx_config

    try:
        anchorecli.cli.utils.check_access(config)
    except Exception as exc:
        # Render first, then print: same output as the single-expression form.
        message = anchorecli.cli.utils.format_error_output(config, 'registry', {}, exc)
        print(message)
        sys.exit(2)
:param timeout:
:param interval:
:param feedsready:
:return:
"""
global config
ecode = 0
try:
sys.stderr.write("Starting checks to wait for anchore-engine to be available timeout={} interval={}\n".format(timeout, interval))
ts = time.time()
while timeout < 0 or time.time() - ts < timeout:
sys.stderr.write("API availability: Checking anchore-engine URL ({})...\n".format(config['url']))
_logger.debug("Checking API availability for anchore-engine URL ({})".format(config['url']))
try:
anchorecli.cli.utils.check_access(config)
_logger.debug("check access success")
break;
except Exception as err:
_logger.debug("check access failed, trying again")
time.sleep(interval)
else:
raise Exception("timed out after {} seconds.".format(timeout))
sys.stderr.write("API availability: Success.\n")
while timeout < 0 or time.time() - ts < timeout:
all_up = {}
try:
services_to_check = [x for x in servicesready.split(',') if x]
for f in services_to_check:
all_up[f] = False