Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_yatai_service_mock(repo_storage_type=BentoUri.LOCAL):
    """Return a ``MagicMock`` yatai service with a canned ``GetBento`` reply.

    The reply wraps a ``Bento`` message named ``bento_test_name`` at version
    ``version1.1.1`` that exposes a single ``predict`` API.

    Args:
        repo_storage_type: value stored in the bento's ``uri.type``. The
            bundle path is filled in only when this equals ``BentoUri.LOCAL``.

    Returns:
        A ``MagicMock`` whose ``GetBento`` call returns the prepared
        ``GetBentoResponse``.
    """
    fake_bento = Bento(name='bento_test_name', version='version1.1.1')
    # Only a LOCAL repository gets a bundle path; for any other storage type
    # ``uri.uri`` is left at whatever the message defaults to.
    if repo_storage_type == BentoUri.LOCAL:
        fake_bento.uri.uri = '/fake/path/to/bundle'
    fake_bento.uri.type = repo_storage_type
    fake_bento.bento_service_metadata.apis.extend(
        [BentoServiceMetadata.BentoServiceApi(name='predict')]
    )

    service = MagicMock()
    service.GetBento.return_value = GetBentoResponse(bento=fake_bento)
    return service
def create_yatai_service_mock(repo_storage_type=BentoUri.LOCAL):
    """Return a ``MagicMock`` yatai service with a canned ``GetBento`` reply.

    The bento's name, version, local URI and API name are taken from the
    module-level ``TEST_DEPLOYMENT_BENTO_*`` / ``TEST_BENTO_API_NAME``
    constants.

    Args:
        repo_storage_type: value stored in the bento's ``uri.type``. The
            local URI is filled in only when this equals ``BentoUri.LOCAL``.

    Returns:
        A ``MagicMock`` whose ``GetBento`` call returns the prepared
        ``GetBentoResponse``.
    """
    fake_bento = Bento(
        name=TEST_DEPLOYMENT_BENTO_NAME, version=TEST_DEPLOYMENT_BENTO_VERSION
    )
    # The URI string is only meaningful for a LOCAL repository; other storage
    # types leave it unset.
    if repo_storage_type == BentoUri.LOCAL:
        fake_bento.uri.uri = TEST_DEPLOYMENT_BENTO_LOCAL_URI
    fake_bento.uri.type = repo_storage_type
    fake_bento.bento_service_metadata.apis.extend(
        [BentoServiceMetadata.BentoServiceApi(name=TEST_BENTO_API_NAME)]
    )

    service = MagicMock()
    service.GetBento.return_value = GetBentoResponse(bento=fake_bento)
    return service
def create_yatai_service_mock(repo_storage_type=BentoUri.LOCAL):
    """Return a ``MagicMock`` yatai service with a canned ``GetBento`` reply.

    The reply wraps a ``Bento`` message named ``bento_test_name`` at version
    ``version1.1.1`` with one ``predict`` API and a recorded Python version
    of ``3.7.0``.

    Args:
        repo_storage_type: value stored in the bento's ``uri.type``. The
            bundle path is filled in only when this equals ``BentoUri.LOCAL``.

    Returns:
        A ``MagicMock`` whose ``GetBento`` call returns the prepared
        ``GetBentoResponse``.
    """
    fake_bento = Bento(name='bento_test_name', version='version1.1.1')
    # Only a LOCAL repository gets a bundle path.
    if repo_storage_type == BentoUri.LOCAL:
        fake_bento.uri.uri = '/tmp/path/to/bundle'
    fake_bento.uri.type = repo_storage_type

    metadata = fake_bento.bento_service_metadata
    metadata.apis.extend([BentoServiceMetadata.BentoServiceApi(name='predict')])
    metadata.env.python_version = '3.7.0'

    service = MagicMock()
    service.GetBento.return_value = GetBentoResponse(bento=fake_bento)
    return service
def add(self, deployment_pb):
    """Create the deployment described by ``deployment_pb``.

    Checks that SAM and docker are available, fetches the referenced bento
    from ``self.yatai_service`` and delegates to ``self._add`` with the
    bento's URI.

    Args:
        deployment_pb: deployment message; its ``spec`` supplies the bento
            name and version. On failure its ``state`` is updated in place.

    Returns:
        Whatever ``self._add`` returns on success. If a ``BentoMLException``
        is raised along the way, an ``ApplyDeploymentResponse`` carrying the
        exception's ``status_proto`` and the error-marked deployment.
    """
    try:
        ensure_sam_available_or_raise()
        ensure_docker_available_or_raise()

        spec = deployment_pb.spec
        request = GetBentoRequest(
            bento_name=spec.bento_name, bento_version=spec.bento_version
        )
        bento_pb = self.yatai_service.GetBento(request)

        # Only bundles stored locally or on S3 can be deployed.
        storage_type = bento_pb.bento.uri.type
        if storage_type not in (BentoUri.LOCAL, BentoUri.S3):
            storage_name = BentoUri.StorageType.Name(storage_type)
            raise BentoMLException(
                'BentoML currently not support {} repository'.format(storage_name)
            )

        return self._add(deployment_pb, bento_pb, bento_pb.bento.uri.uri)
    except BentoMLException as error:
        # Record the failure on the deployment itself so the caller gets the
        # reason back together with the status.
        state = deployment_pb.state
        state.state = DeploymentState.ERROR
        state.error_message = f'Error: {error!s}'
        return ApplyDeploymentResponse(
            status=error.status_proto, deployment=deployment_pb
        )
# Upload status assigned to a freshly created row, before any upload starts.
DEFAULT_UPLOAD_STATUS = UploadStatus(status=UploadStatus.UNINITIALIZED)


class Bento(Base):
    """ORM model for one row of the ``bentos`` table.

    A row is identified by its ``(name, version)`` pair, which is declared
    unique through ``__table_args__``.
    """

    __tablename__ = 'bentos'
    # ``__table_args__`` must be a tuple *containing* the constraint.
    # ``tuple(UniqueConstraint(...))`` iterates over the constraint object
    # instead of wrapping it, so the constraint itself never reaches the
    # table definition; a one-element tuple literal is the documented form.
    __table_args__ = (
        UniqueConstraint('name', 'version', name='_name_version_uc'),
    )

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    version = Column(String, nullable=False)

    # Storage URI for this Bento
    uri = Column(String, nullable=False)
    # NOTE(review): the Enum is built from the StorageType *keys* while the
    # default is ``BentoUri.UNSET`` itself - confirm the default is stored
    # as one of the enum's string values.
    uri_type = Column(Enum(*BentoUri.StorageType.keys()), default=BentoUri.UNSET)

    # JSON field mapping directly to BentoServiceMetadata proto message
    bento_service_metadata = Column(JSON, nullable=False, default={})

    # Time of AddBento call, the time of Bento creation can be found in metadata field
    created_at = Column(DateTime, default=datetime.datetime.utcnow)

    # latest upload status, JSON message also includes last update timestamp
    upload_status = Column(
        JSON, nullable=False, default=ProtoMessageToDict(DEFAULT_UPLOAD_STATUS)
    )

    # mark as deleted
    deleted = Column(Boolean, default=False)
def __init__(self, base_url):
    """Set up a repository rooted at the local directory ``base_url``.

    Args:
        base_url: filesystem path of the repository root. It is created,
            including any missing parent directories, when it does not
            exist yet.

    Raises:
        OSError: if the directory cannot be created.
    """
    if not os.path.exists(base_url):
        # make sure local repo base path exist
        # makedirs (rather than mkdir) also creates missing parents, and
        # exist_ok=True tolerates another process creating the directory
        # between the exists() check above and this call.
        os.makedirs(base_url, exist_ok=True)

    self.base_path = base_url
    self.uri_type = BentoUri.LOCAL
def apply(self, deployment_pb, yatai_service, prev_deployment=None):
    """Apply the deployment described by ``deployment_pb``.

    Checks that docker is available, fetches the referenced bento from
    ``yatai_service`` and delegates to ``self._apply`` with the bento's URI.

    Args:
        deployment_pb: deployment message; its ``spec`` supplies the bento
            name and version.
        yatai_service: service object used to look up the bento; also
            forwarded to ``self._apply``.
        prev_deployment: accepted for interface compatibility; not used in
            this method.

    Returns:
        Whatever ``self._apply`` returns on success, or an
        ``ApplyDeploymentResponse`` carrying the exception's
        ``status_proto`` when a ``BentoMLException`` is raised.
    """
    try:
        ensure_docker_available_or_raise()
        deployment_spec = deployment_pb.spec

        bento_pb = yatai_service.GetBento(
            GetBentoRequest(
                bento_name=deployment_spec.bento_name,
                bento_version=deployment_spec.bento_version,
            )
        )
        # Only bundles stored locally or on S3 can be deployed.
        if bento_pb.bento.uri.type not in (BentoUri.LOCAL, BentoUri.S3):
            # Report the storage type by its enum name, matching the other
            # deployment operators, instead of the raw field value.
            raise BentoMLException(
                'BentoML currently not support {} repository'.format(
                    BentoUri.StorageType.Name(bento_pb.bento.uri.type)
                )
            )

        return self._apply(
            deployment_pb, bento_pb, yatai_service, bento_pb.bento.uri.uri
        )
    except BentoMLException as error:
        return ApplyDeploymentResponse(status=error.status_proto)
def add(self, deployment_pb):
    """Create the SageMaker deployment described by ``deployment_pb``.

    Checks that docker is available, fetches the referenced bento from
    ``self.yatai_service`` and delegates to ``self._add`` with the bento's
    URI.

    Args:
        deployment_pb: deployment message; its ``spec`` supplies the bento
            name/version and the SageMaker operator config. On failure its
            ``state`` is updated in place.

    Returns:
        Whatever ``self._add`` returns on success. If a ``BentoMLException``
        is raised along the way, an ``ApplyDeploymentResponse`` carrying the
        exception's ``status_proto`` and the error-marked deployment.
    """
    try:
        ensure_docker_available_or_raise()

        spec = deployment_pb.spec
        # NOTE(review): if ``sagemaker_operator_config`` is a protobuf
        # sub-message this comparison may never be true - confirm.
        if spec.sagemaker_operator_config is None:
            raise YataiDeploymentException('Sagemaker configuration is missing.')

        request = GetBentoRequest(
            bento_name=spec.bento_name, bento_version=spec.bento_version
        )
        bento_pb = self.yatai_service.GetBento(request)

        # Only bundles stored locally or on S3 can be deployed.
        storage_type = bento_pb.bento.uri.type
        if storage_type not in (BentoUri.LOCAL, BentoUri.S3):
            storage_name = BentoUri.StorageType.Name(storage_type)
            raise BentoMLException(
                'BentoML currently not support {} repository'.format(storage_name)
            )

        return self._add(deployment_pb, bento_pb, bento_pb.bento.uri.uri)
    except BentoMLException as error:
        # Record the failure on the deployment itself so the caller gets the
        # reason back together with the status.
        state = deployment_pb.state
        state.state = DeploymentState.ERROR
        state.error_message = f'Error creating SageMaker deployment: {error!s}'
        return ApplyDeploymentResponse(
            status=error.status_proto, deployment=deployment_pb
        )