Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test(self, mock_prompt):
    """Register a cache remote and a second remote, then open the repo.

    Both remote URLs are built from the same scheme/bucket prefix with a
    fresh uuid4 suffix. Every CLI call is expected to exit with 0.
    ``mock_prompt`` is accepted but not referenced in this body.
    """
    if not self.should_test():
        return

    def expect_success(*argv):
        # Run one CLI command and require a zero exit status.
        ret = main(list(argv))
        self.assertEqual(ret, 0)

    # Common "<scheme><scheme_sep><bucket><sep>" prefix for both remotes.
    bucket_root = self.scheme + self.scheme_sep + self.bucket + self.sep

    cache_url = bucket_root + str(uuid.uuid4())
    expect_success("config", "cache." + self.cache_scheme, "myrepo")
    expect_success("remote", "add", "myrepo", cache_url)
    expect_success("remote", "modify", "myrepo", "type", self.cache_type)

    second_name = "myremote"
    second_url = bucket_root + str(uuid.uuid4())
    expect_success("remote", "add", second_name, second_url)
    expect_success("remote", "modify", second_name, "type", self.cache_type)

    self.dvc = DvcRepo(".")
def test(self):
    """Switch between two branches and check the ignore list on master.

    A data file is committed on ``master`` and another on branch ``b1``;
    after each SCM checkout, ``checkout --force`` must exit with 0.
    """
    master_file, master_branch = "file_in_a_master", "master"
    side_file, side_branch = "file_in_a_branch", "b1"

    def force_checkout():
        # The CLI checkout must succeed after every branch switch.
        ret = main(["checkout", "--force"])
        self.assertEqual(ret, 0)

    self.dvc.scm.add(self.dvc.scm.untracked_files())
    self.dvc.scm.commit("add all files")
    self.commit_data_file(master_file)

    # NOTE(review): the second positional argument presumably requests
    # creation of the branch -- confirm against the scm API.
    self.dvc.scm.checkout(side_branch, True)
    force_checkout()
    self.commit_data_file(side_file)

    self.dvc.scm.checkout(master_branch)
    force_checkout()

    ignored_on_master = self.read_ignored()
    self.assertEqual(len(ignored_on_master), 1)
    self.assertIn("/" + master_file, ignored_on_master)

    self.dvc.scm.checkout(side_branch)
    force_checkout()

    # The value read here is not used in the visible lines; the call is
    # kept so the sequence of operations is unchanged.
    ignored_on_branch = self.read_ignored()
def test(self):
    """Check that ``repro --force --metrics`` logs the metric value.

    A stage is created with ``run -m`` that appends a number to the
    metrics file; after clearing captured logs, the stage is reproduced
    and the "<file>: <value>" line must appear in the captured log text.
    """
    metrics_file = "metrics_file"
    metrics_value = 0.123489015

    shell_cmd = "echo {} >> {}".format(metrics_value, metrics_file)
    run_status = main(["run", "-m", metrics_file, shell_cmd])
    self.assertEqual(0, run_status)

    # Drop log records from the run so only repro output is inspected.
    self._caplog.clear()

    stage_path = metrics_file + Stage.STAGE_FILE_SUFFIX
    repro_status = main(["repro", "--force", "--metrics", stage_path])
    self.assertEqual(0, repro_status)

    self.assertIn(
        "{}: {}".format(metrics_file, metrics_value), self._caplog.text
    )
def test(self):
    """Assert ``RemoteLOCAL.get_file_checksum`` is called exactly once.

    The method is replaced by a spy while a remote is added, the cache
    type is set to hardlink, and ``add``, ``push`` and ``run`` are
    executed. Each command must exit with 0.
    """
    checksum_spy = spy(RemoteLOCAL.get_file_checksum)

    with patch.object(RemoteLOCAL, "get_file_checksum", checksum_spy):
        url = get_local_url()
        commands = (
            ["remote", "add", "-d", TEST_REMOTE, url],
            ["config", "cache.type", "hardlink"],
            ["add", self.FOO],
            ["push"],
            ["run", "-d", self.FOO, "echo foo"],
        )
        for argv in commands:
            ret = main(argv)
            self.assertEqual(ret, 0)

    self.assertEqual(checksum_spy.mock.call_count, 1)
def test(self):
    """Check that a configured ``cache.dir`` is used instead of .dvc/cache.

    After pointing ``cache.dir`` at a directory from ``TestDvc.mkdtemp()``
    and adding a file and a directory, ``.dvc/cache`` must not exist and
    the configured directory must be non-empty.
    """
    external_cache = TestDvc.mkdtemp()

    ret = main(["config", "cache.dir", external_cache])
    self.assertEqual(ret, 0)

    local_cache_path = self.dvc.cache.local.cache_dir
    self.assertFalse(os.path.exists(local_cache_path))

    for target in (self.FOO, self.DATA_DIR):
        ret = main(["add", target])
        self.assertEqual(ret, 0)

    self.assertFalse(os.path.exists(".dvc/cache"))
    cached_entries = os.listdir(external_cache)
    self.assertNotEqual(len(cached_entries), 0)
def test_cached(self):
    """Run a stage that writes metrics.txt and check the file's content."""
    argv = ["run", "-m", "metrics.txt", "echo test > metrics.txt"]
    status = main(argv)
    self.assertEqual(status, 0)

    with open("metrics.txt", "r") as metrics_fd:
        # Trailing whitespace (e.g. the newline from echo) is ignored.
        self.assertEqual(metrics_fd.read().rstrip(), "test")
def setUp(self):
# Fixture setup: runs the parent setUp, adds FOO and BAR via the CLI,
# deletes the local cache directory, registers a remote, and stores
# message fragments on self.
# NOTE(review): this excerpt has lost its indentation and is cut off inside
# the final .format(...) call; the remainder of the method is not visible.
super(TestShouldWarnOnNoChecksumInLocalAndRemoteCache, self).setUp()
# Directory later passed to `remote add` as the remote location.
cache_dir = self.mkdtemp()
ret = main(["add", self.FOO])
self.assertEqual(0, ret)
ret = main(["add", self.BAR])
self.assertEqual(0, ret)
# Purge the local cache by removing its directory tree.
shutil.rmtree(self.dvc.cache.local.cache_dir)
# Register cache_dir as a remote named "remote_name" (with the -d flag).
ret = main(["remote", "add", "remote_name", "-d", cache_dir])
self.assertEqual(0, ret)
# Only the first element of each file_md5 result is kept.
# checksum_foo is not referenced in the visible lines.
checksum_foo = file_md5(self.FOO)[0]
checksum_bar = file_md5(self.BAR)[0]
self.message_header = (
"Some of the cache files do not exist neither locally "
"nor on remote. Missing cache files: "
)
self.message_bar_part = "name: {}, md5: {}".format(
self.BAR, checksum_bar
def test_show_multiple_outputs(tmp_dir, dvc, caplog):
# Exercises `metrics show` with several targets and checks the captured
# "dvc" logger output for each metric line.
# NOTE(review): this excerpt has lost its indentation and ends on a
# dangling `assert (`; the remainder of the test is not visible.
# Generate three JSON metric files, one of them inside a "metrics" directory.
tmp_dir.gen(
{
"1.json": json.dumps({"AUC": 1}),
"2.json": json.dumps({"AUC": 2}),
"metrics/3.json": json.dumps({"AUC": 3}),
}
)
# Register each file as a metric via a run call with an empty command.
dvc.run(cmd="", overwrite=True, metrics=["1.json"])
dvc.run(cmd="", overwrite=True, metrics=["2.json"])
dvc.run(cmd="", overwrite=True, metrics=["metrics/3.json"])
# Two explicit file targets: expect exit code 0 and both values logged.
with caplog.at_level(logging.INFO, logger="dvc"):
assert 0 == main(["metrics", "show", "1.json", "2.json"])
assert '1.json: {"AUC": 1}' in caplog.text
assert '2.json: {"AUC": 2}' in caplog.text
caplog.clear()
# With -R and a directory target: expect 1.json and 3.json to be logged.
with caplog.at_level(logging.INFO, logger="dvc"):
assert 0 == main(["metrics", "show", "-R", "1.json", "metrics"])
assert '1.json: {"AUC": 1}' in caplog.text
assert '3.json: {"AUC": 3}' in caplog.text
caplog.clear()
# One existing and one missing target: expect exit code 1, with the
# existing metric still logged.
with caplog.at_level(logging.INFO, logger="dvc"):
assert 1 == main(["metrics", "show", "1.json", "not-found"])
assert '1.json: {"AUC": 1}' in caplog.text
assert (
def test_dot(self):
    """``pipeline show --dot`` on the file1 stage must exit with 0."""
    argv = ["pipeline", "show", "--dot", self.file1_stage]
    exit_code = main(argv)
    self.assertEqual(exit_code, 0)
def test_shared_cache(repo_dir, dvc_repo, protected, dir_mode, file_mode):
    """Check permission bits of cache entries when ``cache.shared`` is group.

    After optionally enabling ``cache.protected`` and adding a file and a
    directory, every directory under the local cache must have mode
    ``dir_mode`` and every file must have mode ``file_mode``.
    """
    setup_commands = [["config", "cache.shared", "group"]]
    if protected:
        setup_commands.append(["config", "cache.protected", "true"])
    setup_commands.append(["add", repo_dir.FOO])
    setup_commands.append(["add", repo_dir.DATA_DIR])

    for argv in setup_commands:
        assert main(argv) == 0

    cache_root = dvc_repo.cache.local.cache_dir
    for root, dnames, fnames in os.walk(cache_root):
        # Directories are checked before files, per walked directory.
        for names, expected_mode in ((dnames, dir_mode), (fnames, file_mode)):
            for name in names:
                entry = os.path.join(root, name)
                assert stat.S_IMODE(os.stat(entry).st_mode) == expected_mode