Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _print_deployment_info(deployment, output_type):
    """Echo a single deployment message as YAML or JSON.

    Args:
        deployment: Deployment message; it is passed to ``pb_to_yaml`` /
            ``MessageToJson`` and its ``state.info_json`` field is read.
        output_type: ``'yaml'`` selects YAML output; any other value
            selects JSON output.
    """
    if output_type == 'yaml':
        result = pb_to_yaml(deployment)
    else:
        result = MessageToJson(deployment)
        if deployment.state.info_json:
            # state.info_json holds a JSON document as a string. Decode it
            # and splice it into the output under state.infoJson so it is
            # printed as nested JSON instead of one escaped string.
            result = json.loads(result)
            result['state']['infoJson'] = json.loads(deployment.state.info_json)
            _echo(json.dumps(result, indent=2, separators=(',', ': ')))
            return
    _echo(result)
def info(bundle_path=bundle_path):
    """
    List all APIs defined in the BentoService loaded from saved bundle
    """
    # NOTE(review): the docstring above is kept verbatim because it may be
    # surfaced as CLI help text by the command decorator -- confirm before
    # rewording it.
    track_cli('info')
    # Load the saved bundle's service metadata message and print it as
    # indented JSON.
    bento_service_metadata_pb = load_bento_service_metadata(bundle_path)
    output = json.dumps(ProtoMessageToDict(bento_service_metadata_pb), indent=2)
    _echo(output)
def _print_deployment_info(deployment, output_type):
    """Echo a single deployment message as YAML or JSON.

    Args:
        deployment: Deployment message; it is passed to ``pb_to_yaml`` /
            ``MessageToJson`` and its ``state.info_json`` field is read.
        output_type: ``'yaml'`` selects YAML output; any other value
            selects JSON output.
    """
    if output_type == 'yaml':
        result = pb_to_yaml(deployment)
    else:
        result = MessageToJson(deployment)
        if deployment.state.info_json:
            # state.info_json holds a JSON document as a string. Decode it
            # and splice it into the output under state.infoJson so it is
            # printed as nested JSON instead of one escaped string.
            result = json.loads(result)
            result['state']['infoJson'] = json.loads(deployment.state.info_json)
            _echo(json.dumps(result, indent=2, separators=(',', ': ')))
            return
    _echo(result)
' status. {error_code}:{error_message}'.format(
name=deployment_yaml.get('name'),
error_code=status_pb2.Status.Code.Name(
result_state.status.status_code
),
error_message=result_state.status.error_message,
)
)
return
result.deployment.state.CopyFrom(result_state.state)
track_cli(
'deploy-apply-success',
deployment_yaml.get('spec', {}).get('operator'),
)
_echo(
'Successfully applied spec to deployment {}'.format(
deployment_yaml.get('name')
),
CLI_COLOR_SUCCESS,
)
_print_deployment_info(result.deployment, output)
except BentoMLException as e:
_echo(
'Failed to apply deployment {name}. Error message: {message}'.format(
name=deployment_yaml.get('name'), message=e
)
if result_state.status.status_code != status_pb2.Status.OK:
_echo(
'Created deployment {name}, failed to retrieve latest status.'
' {error_code}:{error_message}'.format(
name=name,
error_code=status_pb2.Status.Code.Name(
result_state.status.status_code
),
error_message=result_state.status.error_message,
)
)
return
result.deployment.state.CopyFrom(result_state.state)
track_cli('deploy-create-success', platform.replace('-', '_').upper())
_echo('Successfully created deployment {}'.format(name), CLI_COLOR_SUCCESS)
_print_deployment_info(result.deployment, output)
def list_deployments_cli(output, limit, filters, labels, namespace, all_namespaces):
    """List deployments and echo them, or echo an error on failure.

    Args:
        output: Output format, forwarded to ``_print_deployments_info``.
        limit: Forwarded to ``list_deployments`` as ``limit``.
        filters: Forwarded to ``list_deployments`` as ``filters``.
        labels: Raw label input; converted with ``parse_key_value_pairs``
            before being forwarded.
        namespace: Forwarded to ``list_deployments`` as ``namespace``.
        all_namespaces: Forwarded to ``list_deployments`` as
            ``is_all_namespaces``.
    """
    track_cli('deploy-list')
    yatai_service = get_yatai_service()
    result = list_deployments(
        limit=limit,
        filters=filters,
        labels=parse_key_value_pairs(labels),
        namespace=namespace,
        is_all_namespaces=all_namespaces,
        yatai_service=yatai_service,
    )
    if result.status.status_code != status_pb2.Status.OK:
        # Report the symbolic status code name together with the error
        # message carried by the result.
        _echo(
            'Failed to list deployments. {error_code}:{error_message}'.format(
                error_code=status_pb2.Status.Code.Name(result.status.status_code),
                error_message=result.status.error_message,
            ),
            CLI_COLOR_ERROR,
        )
    else:
        _print_deployments_info(result.deployments, output)
result.status.status_code
),
error_message=result.status.error_message,
),
CLI_COLOR_ERROR,
)
else:
if wait:
result_state = get_state_after_await_action_complete(
yatai_service=yatai_service,
name=deployment_yaml.get('name'),
namespace=deployment_yaml.get('namespace'),
message='Applying deployment',
)
if result_state.status.status_code != status_pb2.Status.OK:
_echo(
'Created deployment {name}, failed to retrieve latest'
' status. {error_code}:{error_message}'.format(
name=deployment_yaml.get('name'),
error_code=status_pb2.Status.Code.Name(
result_state.status.status_code
),
error_message=result_state.status.error_message,
)
)
return
result.deployment.state.CopyFrom(result_state.state)
track_cli(
'deploy-apply-success',
deployment_yaml.get('spec', {}).get('operator'),
)
def open_api_spec(bundle_path=bundle_path):
    # Load the service from the bundle path and echo the result of
    # get_docs() for it as indented JSON. A comment is used instead of a
    # docstring because the original has no docstring, and adding one could
    # change CLI help text if this function is registered as a command.
    track_cli('open-api-spec')
    bento_service = load(bundle_path)
    _echo(json.dumps(get_docs(bento_service), indent=2))