Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_aws_lambda_apply_failed_only_local_repo(
mock_popen,
mock_init_serverless,
mock_copy,
mock_copytree,
mock_check_nodejs,
mock_checkcall,
mock_checkoutput,
):
test_deployment_pb = generate_lambda_deployment_pb()
yatai_service_mock = create_yatai_service_mock(BentoUri.UNSET)
deployment_operator = AwsLambdaDeploymentOperator()
result_pb = deployment_operator.apply(test_deployment_pb, yatai_service_mock)
assert result_pb.status.status_code == Status.INTERNAL
assert result_pb.status.error_message.startswith('BentoML currently not support')
def test_sagemaker_apply_fail_not_local_repo():
yatai_service = create_yatai_service_mock(repo_storage_type=BentoUri.UNSET)
sagemaker_deployment_pb = generate_sagemaker_deployment_pb()
deployment_operator = SageMakerDeploymentOperator(yatai_service)
result_pb = deployment_operator.add(sagemaker_deployment_pb)
assert result_pb.status.status_code == Status.INTERNAL
assert result_pb.status.error_message.startswith('BentoML currently not support')
def test_aws_lambda_describe_failed_no_formation():
yatai_service_mock = create_yatai_service_mock()
test_deployment_pb = generate_lambda_deployment_pb()
deployment_operator = AwsLambdaDeploymentOperator()
result_pb = deployment_operator.describe(test_deployment_pb, yatai_service_mock)
assert result_pb.status.status_code == Status.INTERNAL
assert result_pb.status.error_message.startswith(
'An error occurred (ValidationError)'
)
def NOT_FOUND(message=None):
return StatusProto(status_code=StatusProto.NOT_FOUND, error_message=message)
def OUT_OF_RANGE(message=None):
return StatusProto(status_code=StatusProto.OUT_OF_RANGE, error_message=message)
def PERMISSION_DENIED(message=None):
return StatusProto(
status_code=StatusProto.PERMISSION_DENIED, error_message=message
)
def INVALID_ARGUMENT(message=None):
return StatusProto(
status_code=StatusProto.INVALID_ARGUMENT, error_message=message
)
def RESOURCE_EXHAUSTED(message=None):
return StatusProto(
status_code=StatusProto.RESOURCE_EXHAUSTED, error_message=message
)
def UNAUTHENTICATED(message=None):
return StatusProto(
status_code=StatusProto.UNAUTHENTICATED, error_message=message
)
def UNIMPLEMENTED(message=None):
return StatusProto(status_code=StatusProto.UNIMPLEMENTED, error_message=message)