Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@post_load
def make_pagination(self, data, **kwargs):
    """Build a ``Pagination`` from the loaded field values.

    Parameters
    ----------
    data : mapping
        Loaded fields, forwarded to ``Pagination`` as keyword arguments.
    **kwargs
        Extra keyword arguments supplied by marshmallow; ignored.

    Returns
    -------
    Pagination
    """
    # marshmallow 3 invokes hooks with ``many``/``partial`` keyword
    # arguments; accepting **kwargs avoids a TypeError there while staying
    # compatible with callers that pass only ``data``.
    return Pagination(**data)
class ListRunsResponseSchema(BaseSchema):
    """Schema that loads a ``ListRunsResponse``.

    Both fields are required: ``pagination`` is a nested
    ``PaginationSchema`` and ``runs`` is a nested list of
    ``RunSummarySchema`` entries.
    """

    pagination = fields.Nested(PaginationSchema, required=True)
    runs = fields.Nested(RunSummarySchema, many=True, required=True)

    @post_load
    def make_list_runs_response_schema(self, data, **kwargs):
        """Build a ``ListRunsResponse`` from the loaded field values.

        Parameters
        ----------
        data : mapping
            Loaded fields, forwarded as keyword arguments.
        **kwargs
            Extra keyword arguments supplied by marshmallow; ignored.

        Returns
        -------
        ListRunsResponse
        """
        # marshmallow 3 invokes hooks with ``many``/``partial`` keyword
        # arguments; accepting **kwargs avoids a TypeError there.
        return ListRunsResponse(**data)
class JobClient(BaseClient):
    """Client offering job listing; its ``SERVICE_NAME`` is ``"steve"``."""

    SERVICE_NAME = "steve"

    def list(self, project_id):
        """Fetch the jobs that belong to a project.

        Parameters
        ----------
        project_id : uuid.UUID

        Returns
        -------
        List[JobSummary]
        """
        return self._get(
            "/project/{}/job".format(project_id), JobSummarySchema(many=True)
        )
)
class DatasetsSecretsSchema(BaseSchema):
    """Schema that loads a ``DatasetsSecrets``.

    All five fields are required: four strings (``bucket``,
    ``access_key``, ``secret_key``, ``region``) and the boolean
    ``verified``.
    """

    bucket = fields.String(required=True)
    access_key = fields.String(required=True)
    secret_key = fields.String(required=True)
    region = fields.String(required=True)
    verified = fields.Boolean(required=True)

    @post_load
    def make_project_datasets_secrets(self, data, **kwargs):
        """Build a ``DatasetsSecrets`` from the loaded field values.

        Parameters
        ----------
        data : mapping
            Loaded fields, forwarded as keyword arguments.
        **kwargs
            Extra keyword arguments supplied by marshmallow; ignored.

        Returns
        -------
        DatasetsSecrets
        """
        # marshmallow 3 invokes hooks with ``many``/``partial`` keyword
        # arguments; accepting **kwargs avoids a TypeError there.
        return DatasetsSecrets(**data)
class SecretClient(BaseClient):
    """Client whose ``_SERVICE_NAME`` is ``"secret-service"``."""

    _SERVICE_NAME = "secret-service"

    def datasets_secrets(self, project_id):
        """Fetch the datasets secrets for a project.

        Parameters
        ----------
        project_id
            Identifier interpolated into the ``sfs/{}`` endpoint.

        Returns
        -------
        The result of ``self._get`` called with a ``DatasetsSecretsSchema``.
        """
        return self._get("sfs/{}".format(project_id), DatasetsSecretsSchema())
@post_load
def make_log_part(self, data, **kwargs):
    """Build a ``LogPart`` from the loaded field values.

    Parameters
    ----------
    data : mapping
        Loaded fields, forwarded to ``LogPart`` as keyword arguments.
    **kwargs
        Extra keyword arguments supplied by marshmallow; ignored.

    Returns
    -------
    LogPart
    """
    # marshmallow 3 invokes hooks with ``many``/``partial`` keyword
    # arguments; accepting **kwargs avoids a TypeError there.
    return LogPart(**data)
class LogPartsResponseSchema(BaseSchema):
    """Schema that loads a ``LogPartsResponse``.

    The required ``log_parts`` field is read from the ``logParts`` key as
    a nested list of ``LogPartSchema`` entries.
    """

    log_parts = fields.Nested(
        LogPartSchema, data_key="logParts", many=True, required=True
    )

    @post_load
    def make_log_parts_response(self, data, **kwargs):
        """Build a ``LogPartsResponse`` from the loaded field values.

        Parameters
        ----------
        data : mapping
            Loaded fields, forwarded as keyword arguments.
        **kwargs
            Extra keyword arguments supplied by marshmallow; ignored.

        Returns
        -------
        LogPartsResponse
        """
        # marshmallow 3 invokes hooks with ``many``/``partial`` keyword
        # arguments; accepting **kwargs avoids a TypeError there.
        return LogPartsResponse(**data)
class LogClient(BaseClient):
SERVICE_NAME = "wozniak"
def get_subrun_command_logs(self, project_id, job_id, run_id, subrun_id):
    """Fetch the command logs of a subrun.

    The four identifiers are interpolated, in order, into the command
    ``log-part`` endpoint, which is then handed to ``self._get_logs``.

    Parameters
    ----------
    project_id
    job_id
    run_id
    subrun_id

    Returns
    -------
    Whatever ``self._get_logs`` returns for the built endpoint.
    """
    endpoint = "/project/{}/job/{}/run/{}/subrun/{}/command/log-part".format(
        project_id, job_id, run_id, subrun_id
    )
    return self._get_logs(endpoint)
def get_subrun_environment_step_logs(
self, project_id, job_id, run_id, subrun_id, environment_step_id
):
template = (
"/project/{}/job/{}/run/{}/subrun/{}/environment-step/{}/log-part"
)
endpoint = template.format(
project_id, job_id, run_id, subrun_id, environment_step_id
class ParamConflict(Exception):
    """Exception that carries a list of conflicting parameters.

    Parameters
    ----------
    message : str
        Message passed to ``Exception``.
    conflicting_params : list, optional
        Stored as-is on ``conflicting_params``; a fresh empty list is used
        when omitted or ``None``.
    """

    def __init__(self, message, conflicting_params=None):
        super(ParamConflict, self).__init__(message)
        # A new list per instance avoids sharing a mutable default.
        self.conflicting_params = (
            [] if conflicting_params is None else conflicting_params
        )
class ExperimentDeleted(Exception):
    """Exception that records the ID of the experiment it concerns.

    Parameters
    ----------
    message : str
        Message passed to ``Exception``.
    experiment_id
        Stored unchanged on the ``experiment_id`` attribute.
    """

    def __init__(self, message, experiment_id):
        self.experiment_id = experiment_id
        super(ExperimentDeleted, self).__init__(message)
class ExperimentClient(BaseClient):
"""Client for the Faculty experiment service.
Either build this client with a session directly, or use the
:func:`faculty.client` helper function:
>>> client = faculty.client("experiment")
Parameters
----------
session : faculty.session.Session
The session to use to make requests
"""
_SERVICE_NAME = "atlas"
def create(
specification = fields.Nested(SpecificationSchema(), required=True)
@post_load
def make_environment_update(self, data, **kwargs):
    """Build an ``EnvironmentCreateUpdate`` from the loaded field values.

    Parameters
    ----------
    data : mapping
        Loaded fields, forwarded as keyword arguments.
    **kwargs
        Extra keyword arguments supplied by marshmallow; ignored.

    Returns
    -------
    EnvironmentCreateUpdate
    """
    # marshmallow 3 invokes hooks with ``many``/``partial`` keyword
    # arguments; accepting **kwargs avoids a TypeError there.
    return EnvironmentCreateUpdate(**data)
class EnvironmentCreationResponseSchema(BaseSchema):
    """Schema that loads an ``EnvironmentCreationResponse``.

    The required UUID field ``id`` is read from the ``environmentId`` key.
    """

    id = fields.UUID(data_key="environmentId", required=True)

    @post_load
    def make_environment(self, data, **kwargs):
        """Build an ``EnvironmentCreationResponse`` from the loaded fields.

        Parameters
        ----------
        data : mapping
            Loaded fields, forwarded as keyword arguments.
        **kwargs
            Extra keyword arguments supplied by marshmallow; ignored.

        Returns
        -------
        EnvironmentCreationResponse
        """
        # marshmallow 3 invokes hooks with ``many``/``partial`` keyword
        # arguments; accepting **kwargs avoids a TypeError there.
        return EnvironmentCreationResponse(**data)
class EnvironmentClient(BaseClient):
SERVICE_NAME = "baskerville"
def list(self, project_id):
    """Fetch the environments of a project.

    Parameters
    ----------
    project_id
        Identifier interpolated into the ``/project/{}/environment``
        endpoint.

    Returns
    -------
    The result of ``self._get`` called with ``EnvironmentSchema(many=True)``.
    """
    return self._get(
        "/project/{}/environment".format(project_id),
        EnvironmentSchema(many=True),
    )
def get(self, project_id, environment_id):
    """Fetch a single environment of a project.

    Parameters
    ----------
    project_id
    environment_id
        Both are interpolated, in this order, into the
        ``/project/{}/environment/{}`` endpoint.

    Returns
    -------
    The result of ``self._get`` called with an ``EnvironmentSchema``.
    """
    url = "/project/{}/environment/{}".format(project_id, environment_id)
    return self._get(url, EnvironmentSchema())
def update(
self, project_id, environment_id, name, specification, description=None
):
class ReportWithVersionsSchema(BaseSchema):
    """Schema that loads a ``ReportWithVersions``.

    Every field is required. ``name`` and ``id`` are read from the
    ``report_name`` and ``report_id`` keys respectively, and ``versions``
    is a nested list of ``ReportVersionSchema`` entries.
    """

    created_at = fields.DateTime(required=True)
    name = fields.String(required=True, data_key="report_name")
    id = fields.UUID(required=True, data_key="report_id")
    description = fields.String(required=True)
    active_version_id = fields.UUID(required=True)
    versions = fields.Nested(ReportVersionSchema, required=True, many=True)

    @post_load
    def make_report_with_versions(self, data, **kwargs):
        """Build a ``ReportWithVersions`` from the loaded field values.

        Parameters
        ----------
        data : mapping
            Loaded fields, forwarded as keyword arguments.
        **kwargs
            Extra keyword arguments supplied by marshmallow; ignored.

        Returns
        -------
        ReportWithVersions
        """
        # marshmallow 3 invokes hooks with ``many``/``partial`` keyword
        # arguments; accepting **kwargs avoids a TypeError there.
        return ReportWithVersions(**data)
class ReportClient(BaseClient):
_SERVICE_NAME = "tavern"
def list(self, project_id):
    """Fetch the reports of a project.

    Parameters
    ----------
    project_id
        Identifier interpolated into the ``/project/{}`` endpoint.

    Returns
    -------
    The result of ``self._get`` called with ``ReportSchema(many=True)``.
    """
    return self._get("/project/{}".format(project_id), ReportSchema(many=True))
def get_with_versions(self, report_id):
    """Fetch a report together with its versions.

    Parameters
    ----------
    report_id
        Identifier interpolated into the ``/report/{}/versions`` endpoint.

    Returns
    -------
    The result of ``self._get`` called with a ``ReportWithVersionsSchema``.
    """
    return self._get(
        "/report/{}/versions".format(report_id), ReportWithVersionsSchema()
    )
def get(self, report_id):
    """Fetch a report through its ``active`` endpoint.

    Parameters
    ----------
    report_id
        Identifier interpolated into the ``/report/{}/active`` endpoint.

    Returns
    -------
    The result of ``self._get`` called with a ``ReportSchema``.
    """
    return self._get("/report/{}/active".format(report_id), ReportSchema())
def create(
class ModelSchema(BaseSchema):
    """Schema that loads a ``Model``.

    ``id`` is read from ``modelId`` and ``user_ids`` from ``users``.
    ``latest_version`` is read from ``latestVersion`` and falls back to
    ``None`` when the key is missing; all other fields are required.
    """

    id = fields.UUID(data_key="modelId", required=True)
    name = fields.String(required=True)
    description = fields.String(required=True)
    user_ids = fields.List(fields.UUID, data_key="users", required=True)
    latest_version = fields.Nested(
        ModelVersionSchema, data_key="latestVersion", missing=None
    )

    @post_load
    def make_model(self, data, **kwargs):
        """Build a ``Model`` from the loaded field values.

        Parameters
        ----------
        data : mapping
            Loaded fields, forwarded as keyword arguments.
        **kwargs
            Extra keyword arguments supplied by marshmallow; ignored.

        Returns
        -------
        Model
        """
        # marshmallow 3 invokes hooks with ``many``/``partial`` keyword
        # arguments; accepting **kwargs avoids a TypeError there.
        return Model(**data)
class ModelClient(BaseClient):
    """Client for models and versions; ``SERVICE_NAME`` is ``"zoolander"``."""

    SERVICE_NAME = "zoolander"

    def get(self, project_id, model_id):
        """Fetch a single model of a project.

        Parameters
        ----------
        project_id
        model_id

        Returns
        -------
        The result of ``self._get`` called with a ``ModelSchema``.
        """
        return self._get(
            "/project/{}/model/{}".format(project_id, model_id), ModelSchema()
        )

    def list(self, project_id):
        """Fetch the models of a project.

        Parameters
        ----------
        project_id

        Returns
        -------
        The result of ``self._get`` called with ``ModelSchema(many=True)``.
        """
        return self._get(
            "/project/{}/model".format(project_id), ModelSchema(many=True)
        )

    def get_version(self, project_id, model_id, version_id):
        """Fetch one version of a model.

        Parameters
        ----------
        project_id
        model_id
        version_id

        Returns
        -------
        The result of ``self._get`` called with a ``ModelVersionSchema``.
        """
        template = "/project/{}/model/{}/version/{}"
        url = template.format(project_id, model_id, version_id)
        return self._get(url, ModelVersionSchema())
# Immutable record with the fields ``id``, ``name`` and ``owner_id``.
Project = namedtuple("Project", "id name owner_id")
class ProjectSchema(BaseSchema):
    """Schema that loads a ``Project``.

    All fields are required; ``id`` is read from ``projectId`` and
    ``owner_id`` from ``ownerId``.
    """

    id = fields.UUID(data_key="projectId", required=True)
    name = fields.Str(required=True)
    owner_id = fields.UUID(data_key="ownerId", required=True)

    @post_load
    def make_project(self, data, **kwargs):
        """Build a ``Project`` from the loaded field values.

        Parameters
        ----------
        data : mapping
            Loaded fields, forwarded as keyword arguments.
        **kwargs
            Extra keyword arguments supplied by marshmallow; ignored.

        Returns
        -------
        Project
        """
        # marshmallow 3 invokes hooks with ``many``/``partial`` keyword
        # arguments; accepting **kwargs avoids a TypeError there.
        return Project(**data)
class ProjectClient(BaseClient):
_SERVICE_NAME = "casebook"
def create(self, owner_id, project_name):
    """Create a project by posting to ``/project``.

    Parameters
    ----------
    owner_id
        Sent as a string under the ``owner_id`` key of the JSON payload.
    project_name
        Sent under the ``name`` key of the JSON payload.

    Returns
    -------
    The result of ``self._post`` called with a ``ProjectSchema``.
    """
    body = dict(owner_id=str(owner_id), name=project_name)
    return self._post("/project", ProjectSchema(), json=body)
def get(self, project_id):
    """Fetch a project by its identifier.

    Parameters
    ----------
    project_id
        Identifier interpolated into the ``/project/{}`` endpoint.

    Returns
    -------
    The result of ``self._get`` called with a ``ProjectSchema``.
    """
    return self._get("/project/{}".format(project_id), ProjectSchema())
def get_by_owner_and_name(self, owner_id, project_name):
    """Fetch a project by its owner and name.

    Parameters
    ----------
    owner_id
    project_name
        Both are interpolated, in this order, into the ``/project/{}/{}``
        endpoint.

    Returns
    -------
    The result of ``self._get`` called with a ``ProjectSchema``.
    """
    return self._get(
        "/project/{}/{}".format(owner_id, project_name), ProjectSchema()
    )
def list_accessible_by_user(self, user_id):
else:
raise ValueError("Invalid file node type.")
class ListResponseSchema(BaseSchema):
    """Schema that loads a ``ListResponse``.

    All fields are required; ``content`` is a list of nested
    ``FileNodeSchema`` entries.
    """

    project_id = fields.UUID(data_key="project_id", required=True)
    path = fields.String(required=True)
    content = fields.List(fields.Nested(FileNodeSchema), required=True)

    @post_load
    def make_list_response(self, data, **kwargs):
        """Build a ``ListResponse`` from the loaded field values.

        Parameters
        ----------
        data : mapping
            Loaded fields, forwarded as keyword arguments.
        **kwargs
            Extra keyword arguments supplied by marshmallow; ignored.

        Returns
        -------
        ListResponse
        """
        # marshmallow 3 invokes hooks with ``many``/``partial`` keyword
        # arguments; accepting **kwargs avoids a TypeError there.
        return ListResponse(**data)
class WorkspaceClient(BaseClient):
    """Client for listing files; ``_SERVICE_NAME`` is ``"workspace"``."""

    _SERVICE_NAME = "workspace"

    def list(self, project_id, prefix, depth):
        """Fetch the file listing of a project.

        Parameters
        ----------
        project_id
            Identifier interpolated into the ``/project/{}/file`` endpoint.
        prefix
            Sent as the ``prefix`` query parameter.
        depth
            Sent as the ``depth`` query parameter.

        Returns
        -------
        The ``content`` attribute of the response loaded with
        ``ListResponseSchema``.
        """
        url = "/project/{}/file".format(project_id)
        query = {"depth": depth, "prefix": prefix}
        return self._get(url, ListResponseSchema(), params=query).content
data_key="maxJobInstances", required=True
)
milli_cpus = fields.Integer(data_key="milliCpus", required=True)
memory_mb = fields.Integer(data_key="memoryMb", required=True)
num_gpus = fields.Integer(data_key="numGpus", required=True)
gpu_name = fields.String(data_key="gpuName", missing=None)
cost_usd_per_hour = fields.Decimal(
data_key="costUsdPerHour", required=True
)
@post_load
def make_node_type(self, data, **kwargs):
    """Build a ``NodeType`` from the loaded field values.

    Parameters
    ----------
    data : mapping
        Loaded fields, forwarded to ``NodeType`` as keyword arguments.
    **kwargs
        Extra keyword arguments supplied by marshmallow; ignored.

    Returns
    -------
    NodeType
    """
    # marshmallow 3 invokes hooks with ``many``/``partial`` keyword
    # arguments; accepting **kwargs avoids a TypeError there.
    return NodeType(**data)
class ClusterClient(BaseClient):
SERVICE_NAME = "klostermann"
def list_single_tenanted_node_types(
self,
interactive_instances_configured=None,
job_instances_configured=None,
):
"""Get information on single tenanted node types from the cluster.
Parameters
----------
interactive_instances_configured : bool, optional
If True, only get node types which are configured to support
interactive instances, or if False, those which are not configured
to support interactive instances