Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_filter_schema_invalid_value_with_key(filter_type, value):
    """Expect ValidationError when dumping a keyed EQUAL_TO filter.

    The filter is built from the supplied ``filter_type`` with the fixed key
    ``"key"`` and the supplied ``value``; serialising it must be rejected.
    """
    keyed_filter = filter_type("key", ComparisonOperator.EQUAL_TO, value)
    schema = FilterSchema()
    with pytest.raises(ValidationError):
        schema.dump(keyed_filter)
def test_filter_schema_invalid_operator_no_key(filter_type, value, operator):
    """Expect a "Not a discrete operator" ValidationError for key-less filters.

    The filter is built from ``filter_type`` with only an operator and a
    value; dumping it must fail with the matching error message.
    """
    keyless_filter = filter_type(operator, value)
    schema = FilterSchema()
    with pytest.raises(ValidationError, match="Not a discrete operator"):
        schema.dump(keyless_filter)
def test_filter_schema_experiment_id(
    operator, value, expected_operator, expected_value
):
    """Check the serialised form of an ``ExperimentIdFilter``."""
    expected_body = {
        "by": "experimentId",
        "operator": expected_operator,
        "value": expected_value,
    }
    experiment_filter = ExperimentIdFilter(operator, value)
    serialised = FilterSchema().dump(experiment_filter)
    assert serialised == expected_body
def test_filter_schema_run_id(
    operator, value, expected_operator, expected_value
):
    """Check the serialised form of a ``RunIdFilter``."""
    expected_body = {
        "by": "runId",
        "operator": expected_operator,
        "value": expected_value,
    }
    run_filter = RunIdFilter(operator, value)
    serialised = FilterSchema().dump(run_filter)
    assert serialised == expected_body
def test_filter_schema_compound(operator, expected_operator):
    """Check that a ``CompoundFilter`` dumps its operator and nested conditions.

    The compound filter combines the module-level project-ID and tag filters;
    the dumped conditions must equal their corresponding bodies, in order.
    """
    expected_body = {
        "operator": expected_operator,
        "conditions": [PROJECT_ID_FILTER_BODY, TAG_FILTER_BODY],
    }
    compound = CompoundFilter(operator, [PROJECT_ID_FILTER, TAG_FILTER])
    serialised = FilterSchema().dump(compound)
    assert serialised == expected_body
def test_filter_schema_metric(
    operator, value, expected_operator, expected_value
):
    """Check the serialised form of a ``MetricFilter`` keyed on "metric-key"."""
    expected_body = {
        "by": "metric",
        "key": "metric-key",
        "operator": expected_operator,
        "value": expected_value,
    }
    metric_filter = MetricFilter("metric-key", operator, value)
    serialised = FilterSchema().dump(metric_filter)
    assert serialised == expected_body
def test_filter_schema_tag(operator, value, expected_operator, expected_value):
    """Check the serialised form of a ``TagFilter`` keyed on "tag-key"."""
    expected_body = {
        "by": "tag",
        "key": "tag-key",
        "operator": expected_operator,
        "value": expected_value,
    }
    tag_filter = TagFilter("tag-key", operator, value)
    serialised = FilterSchema().dump(tag_filter)
    assert serialised == expected_body
# NOTE(review): fragment of a method body; the enclosing `def` (which supplies
# `self`, `project_id` and `run_ids`) is not visible here.
# Build the project-specific endpoint for deleting runs by query.
endpoint = "/project/{}/run/delete/query".format(project_id)
if run_ids is None:
# Delete all runs in project
payload = {} # No filter
elif len(run_ids) == 0:
# Empty list of run IDs: return an empty response without posting anything.
return DeleteExperimentRunsResponse(
deleted_run_ids=[], conflicted_run_ids=[]
)
else:
# One equality filter per requested run ID ...
run_id_filters = [
RunIdFilter(ComparisonOperator.EQUAL_TO, run_id)
for run_id in run_ids
]
# ... combined with OR, then serialised as the request's "filter".
filter = CompoundFilter(LogicalOperator.OR, run_id_filters)
payload = {"filter": FilterSchema().dump(filter)}
# Post the payload; the response schema instance is handed to `self._post`
# together with the endpoint.
return self._post(
endpoint, DeleteExperimentRunsResponseSchema(), json=payload
)
by = fields.Constant("metric", dump_only=True)
class SortSchema(_OneOfSchemaWithoutType):
    """Schema that dispatches to one of the concrete sort schemas."""

    # Maps a type name to the schema class handling it.
    # NOTE(review): presumably keyed by the sort object's class name --
    # confirm against _OneOfSchemaWithoutType.
    type_schemas = {
        "StartedAtSort": _StartedAtSortSchema,
        "RunNumberSort": _RunNumberSortSchema,
        "DurationSort": _DurationSortSchema,
        "TagSort": _TagSortSchema,
        "ParamSort": _ParamSortSchema,
        "MetricSort": _MetricSortSchema,
    }
class RunQuerySchema(BaseSchema):
    """Schema for a run query made of a filter, a list of sorts and a page."""

    # Nested filter, wrapped in the file's `_OptionalField` helper.
    filter = _OptionalField(fields.Nested(FilterSchema))
    # Sort criteria, each serialised through `SortSchema`.
    sort = fields.List(fields.Nested(SortSchema))
    # Nested page; declared with `missing=None`.
    page = fields.Nested(PageSchema, missing=None)
# Schemas for responses returned from API:
class DeleteExperimentRunsResponseSchema(BaseSchema):
deleted_run_ids = fields.List(
fields.UUID(), data_key="deletedRunIds", required=True
)
conflicted_run_ids = fields.List(
fields.UUID(), data_key="conflictedRunIds", required=True
)
@post_load