Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_log_empty_str(self):
    """Check that a param logged with an empty-string value reads back as ``""``."""
    param_key = "new param"
    store = FileStore(self.test_root)
    target_run_id = self.exp_data[FileStore.DEFAULT_EXPERIMENT_ID]["runs"][0]

    store.log_param(target_run_id, Param(param_key, ""))

    # Fetch the run again so the value is read through the store, not from memory.
    fetched_run = store.get_run(target_run_id)
    assert fetched_run.data.params[param_key] == ""
def test_bad_experiment_id_recorded_for_run(self):
    """Check that ``get_run`` raises when a run's ``meta.yaml`` records another experiment.

    The first run of the default experiment has the ``experiment_id`` in its
    ``meta.yaml`` overwritten with ``1`` while the file stays under the
    original experiment's directory. Fetching that run must then raise
    ``MlflowException``.
    """
    fs = FileStore(self.test_root)
    exp_0 = fs.get_experiment(FileStore.DEFAULT_EXPERIMENT_ID)
    all_runs = self._search(fs, exp_0.experiment_id)
    all_run_ids = self.exp_data[exp_0.experiment_id]["runs"]
    # Sanity check: before tampering, every known run is returned by the search.
    assert len(all_runs) == len(all_run_ids)

    # change experiment pointer in run
    bad_run_id = str(all_run_ids[0])
    path = os.path.join(self.test_root, str(exp_0.experiment_id), bad_run_id)
    experiment_data = read_yaml(path, "meta.yaml")
    experiment_data["experiment_id"] = 1
    write_yaml(path, "meta.yaml", experiment_data, True)

    with pytest.raises(MlflowException) as exc_info:
        fs.get_run(bad_run_id)

    # The message check has to live outside the ``with`` block (nothing after
    # the raising call runs inside it) and must go through ``exc_info.value``,
    # since the object bound by ``pytest.raises`` is an ExceptionInfo wrapper,
    # not the exception itself.
    # NOTE(review): the previous check looked for "not found" via a
    # non-existent ``str.contains`` and never executed, so the exact wording is
    # unverified; this assumes the error message names the offending run —
    # confirm before tightening it to a specific phrase.
    assert bad_run_id in str(exc_info.value)
def test_log_batch_allows_tag_overwrite(self):
    """Check that a second ``log_batch`` with the same tag key replaces the tag value."""
    store = FileStore(self.test_root)
    created_run = self._create_run(store)

    # Log the same key twice with different values; the later one should win.
    for tag_value in ("val", "newval"):
        store.log_batch(
            created_run.info.run_id,
            metrics=[],
            params=[],
            tags=[RunTag("t-key", tag_value)],
        )

    self._verify_logged(
        store,
        created_run.info.run_id,
        metrics=[],
        params=[],
        tags=[RunTag("t-key", "newval")],
    )
def test_set_tags(self):
    """Check that ``set_tag`` stores, overwrites, and round-trips multiline tag values."""
    fs = FileStore(self.test_root)
    run_id = self.exp_data[FileStore.DEFAULT_EXPERIMENT_ID]["runs"][0]
    fs.set_tag(run_id, RunTag("tag0", "value0"))
    fs.set_tag(run_id, RunTag("tag1", "value1"))
    tags = fs.get_run(run_id).data.tags
    assert tags["tag0"] == "value0"
    assert tags["tag1"] == "value1"

    # Can overwrite tags.
    fs.set_tag(run_id, RunTag("tag0", "value2"))
    tags = fs.get_run(run_id).data.tags
    assert tags["tag0"] == "value2"
    assert tags["tag1"] == "value1"

    # Can set multiline tags.
    multiline_value = "value2\nvalue2\nvalue2"
    fs.set_tag(run_id, RunTag("multiline_tag", multiline_value))
    tags = fs.get_run(run_id).data.tags
    # Without this assertion the multiline case only proved that set_tag and
    # get_run do not raise; it never checked the value that was read back.
    assert tags["multiline_tag"] == multiline_value
def test_weird_param_names(self):
    """Check that a param name containing spaces and slashes can be logged and read back."""
    unusual_key = "this is/a weird/but valid param"
    store = FileStore(self.test_root)
    target_run_id = self.exp_data[FileStore.DEFAULT_EXPERIMENT_ID]["runs"][0]

    store.log_param(target_run_id, Param(unusual_key, "Value"))

    logged_params = store.get_run(target_run_id).data.params
    assert logged_params[unusual_key] == "Value"
def test_log_batch(self):
    """Check that one ``log_batch`` call records metrics, params and tags together."""
    store = FileStore(self.test_root)
    created_run = store.create_run(
        experiment_id=FileStore.DEFAULT_EXPERIMENT_ID,
        user_id="user",
        start_time=0,
        tags=[],
    )
    target_run_id = created_run.info.run_id

    # Two entities of each kind, all logged in a single batch.
    batch_metrics = [
        Metric("m1", 0.87, 12345, 0),
        Metric("m2", 0.49, 12345, 0),
    ]
    batch_params = [Param("p1", "p1val"), Param("p2", "p2val")]
    batch_tags = [RunTag("t1", "t1val"), RunTag("t2", "t2val")]

    store.log_batch(
        run_id=target_run_id,
        metrics=batch_metrics,
        params=batch_params,
        tags=batch_tags,
    )
    self._verify_logged(store, target_run_id, batch_metrics, batch_params, batch_tags)
def test_log_batch_tags_idempotency(self):
    """Check that logging the identical tag twice via ``log_batch`` leaves a single tag."""
    store = FileStore(self.test_root)
    created_run = self._create_run(store)

    # Issue the very same batch twice; the second call must be accepted.
    for _attempt in range(2):
        store.log_batch(
            created_run.info.run_id,
            metrics=[],
            params=[],
            tags=[RunTag("t-key", "t-val")],
        )

    self._verify_logged(
        store,
        created_run.info.run_id,
        metrics=[],
        params=[],
        tags=[RunTag("t-key", "t-val")],
    )
def _get_file_store(cls, store_uri, artifact_uri):
    """Build a ``FileStore`` from ``store_uri`` and ``artifact_uri``.

    ``FileStore`` is imported inside the function body rather than at module
    level, so the import only happens when this builder is called.
    """
    from mlflow.store.tracking.file_store import FileStore

    file_store = FileStore(store_uri, artifact_uri)
    return file_store
def _get_file_store(store_uri, **_):
    """Build a ``FileStore`` that receives ``store_uri`` for both constructor arguments.

    Any extra keyword arguments are accepted and ignored.
    """
    file_store = FileStore(store_uri, store_uri)
    return file_store
def _get_run_files(self, run_uuid, resource_type):
    """Return the files of one resource kind stored under a run's directory.

    :param run_uuid: ID of the run; checked with ``_validate_run_id`` first.
    :param resource_type: One of ``"metric"``, ``"param"`` or ``"tag"``.
    :return: The result of ``_get_resource_files`` for the run directory and
             the subfolder matching ``resource_type``.
    :raises MlflowException: With ``INVALID_STATE`` when ``_get_run_info``
                             returns ``None`` for the run.
    :raises Exception: When ``resource_type`` is not one of the known kinds.
    """
    _validate_run_id(run_uuid)
    if self._get_run_info(run_uuid) is None:
        raise MlflowException("Run '%s' metadata is in invalid state." % run_uuid,
                              databricks_pb2.INVALID_STATE)

    # run_dir exists since run validity has been confirmed above.
    _unused, run_dir = self._find_run_root(run_uuid)

    # Known resource kinds paired with the subfolder that holds their files.
    known_subfolders = (
        ("metric", FileStore.METRICS_FOLDER_NAME),
        ("param", FileStore.PARAMS_FOLDER_NAME),
        ("tag", FileStore.TAGS_FOLDER_NAME),
    )
    for kind, subfolder_name in known_subfolders:
        if resource_type == kind:
            return self._get_resource_files(run_dir, subfolder_name)

    raise Exception("Looking for unknown resource under run.")