Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
sys.stderr.write("API availability: Success.\n")
while timeout < 0 or time.time() - ts < timeout:
all_up = {}
try:
services_to_check = [x for x in servicesready.split(',') if x]
for f in services_to_check:
all_up[f] = False
except:
all_up = {}
sys.stderr.write("Service availability: Checking for service set ({})...\n".format(','.join(all_up.keys())))
_logger.debug("Checking service set availability for anchore-engine URL ({})".format(config['url']))
try:
ret = anchorecli.clients.apiexternal.system_status(config)
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret['success']:
for service_record in ret.get('payload', {}).get('service_states', []):
s = service_record.get('servicename', None)
if s:
if s not in all_up:
all_up[s] = False
try:
s_up = service_record.get('service_detail', {}).get('up', False)
except:
s_up = False
if s_up:
all_up[s] = s_up
if False not in all_up.values():
def activate(subscription_type, subscription_key):
    """
    SUBSCRIPTION_TYPE: Type of subscription. Valid options:
    - tag_update: Receive notification when new image is pushed
    - policy_eval: Receive notification when image policy status changes
    - vuln_update: Receive notification when vulnerabilities are added, removed or modified
    SUBSCRIPTION_KEY: Fully qualified name of tag to subscribe to. Eg. docker.io/library/alpine:latest
    """
    # NOTE(review): the docstring above is kept word-for-word because it reads
    # like user-facing command help -- confirm before rewording it.
    #
    # Flow: ask the API to activate the subscription, print the formatted
    # payload on success, print a formatted error otherwise, then exit with
    # the resulting code.
    exit_code = 0
    try:
        response = anchorecli.clients.apiexternal.activate_subscription(
            config, subscription_type, subscription_key
        )
        # Derive the exit code from the response before inspecting success,
        # so a failed call can still report the code computed here.
        exit_code = anchorecli.cli.utils.get_ecode(response)
        if not response['success']:
            raise Exception(json.dumps(response['error'], indent=4))
        print(
            anchorecli.cli.utils.format_output(
                config, 'subscription_activate', {}, response['payload']
            )
        )
    except Exception as failure:
        print(
            anchorecli.cli.utils.format_error_output(
                config, 'subscription_activate', {}, failure
            )
        )
        # A failure must never exit with a falsy (success) code.
        exit_code = exit_code or 2
    anchorecli.cli.utils.doexit(exit_code)
def rule_delete(rule_id):
    # Delete the transition rule identified by rule_id through the external
    # API, print the formatted result (or formatted error), then exit.
    #
    # NOTE(review): both formatter calls use the 'image_delete' key even
    # though this removes a transition rule -- confirm that is intended.
    # NOTE(review): the exit code is never derived from the response here, so
    # any failure exits with 2 -- confirm that is intended.
    exit_code = 0
    try:
        response = anchorecli.clients.apiexternal.delete_transition_rule(config, rule_id)
        # A falsy response is treated as a failed operation.
        if not response:
            raise Exception("operation failed with empty response")
        if not response['success']:
            raise Exception(json.dumps(response['error'], indent=4))
        print(
            anchorecli.cli.utils.format_output(
                config, 'image_delete', {}, response['payload']
            )
        )
    except Exception as failure:
        print(
            anchorecli.cli.utils.format_error_output(
                config, 'image_delete', {}, failure
            )
        )
        # exit_code is still 0 whenever this handler runs, so failure is 2.
        exit_code = 2
    anchorecli.cli.utils.doexit(exit_code)
def delete(registry):
    """
    REGISTRY: Full hostname/port of registry. Eg. myrepo.example.com:5000
    """
    # NOTE(review): the docstring above is kept word-for-word because it reads
    # like user-facing command help -- confirm before rewording it.
    #
    # Flow: request deletion of the registry, print the formatted payload on
    # success or a formatted error on failure, then exit with the final code.
    exit_code = 0
    try:
        response = anchorecli.clients.apiexternal.delete_registry(config, registry)
        # Exit code comes from the response; it is computed before the
        # success check so the failure path can reuse it.
        exit_code = anchorecli.cli.utils.get_ecode(response)
        if not response['success']:
            raise Exception(json.dumps(response['error'], indent=4))
        print(
            anchorecli.cli.utils.format_output(
                config, 'registry_delete', {}, response['payload']
            )
        )
    except Exception as failure:
        print(
            anchorecli.cli.utils.format_error_output(
                config, 'registry_delete', {}, failure
            )
        )
        # Never report success (a falsy code) after a failure.
        exit_code = exit_code or 2
    anchorecli.cli.utils.doexit(exit_code)
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'account', {}, err))
sys.exit(2)
try:
ret = anchorecli.clients.apiexternal.get_account(config)
if ret['success']:
whoami['account'] = ret['payload']
else:
raise Exception( json.dumps(ret['error'], indent=4))
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'account', {}, err))
sys.exit(2)
try:
ret = anchorecli.clients.apiexternal.get_user(config)
if ret['success']:
whoami['user'] = ret['payload']
else:
raise Exception( json.dumps(ret['error'], indent=4))
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'account', {}, err))
sys.exit(2)
if feedsready:
all_up = {}
try:
feeds_to_check = feedsready.split(',')
for f in feeds_to_check:
all_up[f] = False
except:
all_up = {}
while timeout < 0 or time.time() - ts < timeout:
sys.stderr.write("Feed sync: Checking sync completion for feed set ({})...\n".format(','.join(all_up.keys())))
_logger.debug("Checking feed sync status for anchore-engine URL ({})".format(config['url']))
try:
ret = anchorecli.clients.apiexternal.system_feeds_list(config)
if ret['success']:
for feed_record in ret.get('payload', []):
_logger.debug("response show feed name={} was last_full_sync={}".format(feed_record.get('name'), feed_record.get('last_full_sync')))
if feed_record.get('name', None) in all_up:
if feed_record.get('last_full_sync', None):
all_up[feed_record.get('name')] = True
if False not in all_up.values():
_logger.debug("all requests feeds have been synced")
break
else:
_logger.debug("some feeds not yet synced {}".format(all_up))
except Exception as err:
print ("service feeds list failed {}".format(err))
time.sleep(interval)
else:
def delete(host_id, servicename):
    # Ask the external API to delete the system service identified by
    # (host_id, servicename), print the formatted outcome, then exit.
    exit_code = 0
    try:
        response = anchorecli.clients.apiexternal.delete_system_service(
            config, host_id, servicename
        )
        # Compute the exit code from the response up front so the error path
        # below can keep it when it is already non-zero.
        exit_code = anchorecli.cli.utils.get_ecode(response)
        if not response['success']:
            raise Exception(json.dumps(response['error'], indent=4))
        print(
            anchorecli.cli.utils.format_output(
                config, 'delete_system_service', {}, response['payload']
            )
        )
    except Exception as failure:
        print(
            anchorecli.cli.utils.format_error_output(
                config, 'delete_system_service', {}, failure
            )
        )
        # A failure must not exit with a falsy (success) code.
        exit_code = exit_code or 2
    anchorecli.cli.utils.doexit(exit_code)
def check_access(config):
    """Verify that the anchore service endpoint can be reached.

    Fetches the service's base routes using ``config``. Any failure -- an
    unsuccessful response or an exception raised while making the call -- is
    re-raised as a generic ``Exception`` naming the configured user and URL.
    The underlying error text is appended only when ``config['debug']`` is
    truthy.

    Returns ``True`` when the base routes call reports success.
    """
    # Probe the endpoint.
    try:
        probe = anchorecli.clients.apiexternal.get_base_routes(config)
        if not probe['success']:
            raise Exception(json.dumps(probe['error'], sort_keys=True))
    except Exception as err:
        # Read the debug flag first, matching the original lookup order.
        verbose = config['debug']
        message = "could not access anchore service (user={!s} url={!s})".format(
            config['user'], config['url']
        )
        if verbose:
            # Debug mode exposes the underlying cause to the caller.
            message = message + ": " + str(err)
        raise Exception(message)
    return True