How to use the dvc.main.main function in dvc

To help you get started, we've selected a few examples of dvc.main.main, drawn from popular ways it is used in public projects.
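
All of the examples below follow the same pattern: main is imported from dvc.main and called with the same argument list you would pass to the dvc executable, and it returns the process exit code, with 0 meaning success. A minimal, self-contained sketch of that pattern (the scratch directory and file names here are hypothetical, not taken from the examples below) might look like this:

import os
import tempfile

from dvc.main import main

# Work in a throwaway directory so nothing touches an existing repo.
workdir = tempfile.mkdtemp()
os.chdir(workdir)

# main() takes the CLI argument list and returns the exit code (0 on success).
assert main(["init", "--no-scm"]) == 0

with open("data.txt", "w") as fobj:
    fobj.write("hello")

# Equivalent to running `dvc add data.txt` from the shell.
assert main(["add", "data.txt"]) == 0

# Errors are reported through the return code rather than an exception.
assert main(["add", "missing-file.txt"]) != 0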

iterative/dvc: tests/func/test_repro.py (view on GitHub)
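# Configure a cloud cache (cache.<scheme> pointing at the "myrepo" remote) plus a
# separate data remote through the CLI entry point, asserting every call returns 0.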
def test(self, mock_prompt):
        if not self.should_test():
            return

        cache = (
            self.scheme
            + self.scheme_sep
            + self.bucket
            + self.sep
            + str(uuid.uuid4())
        )

        ret = main(["config", "cache." + self.cache_scheme, "myrepo"])
        self.assertEqual(ret, 0)
        ret = main(["remote", "add", "myrepo", cache])
        self.assertEqual(ret, 0)
        ret = main(["remote", "modify", "myrepo", "type", self.cache_type])
        self.assertEqual(ret, 0)

        remote_name = "myremote"
        remote_key = str(uuid.uuid4())
        remote = (
            self.scheme + self.scheme_sep + self.bucket + self.sep + remote_key
        )

        ret = main(["remote", "add", remote_name, remote])
        self.assertEqual(ret, 0)
        ret = main(["remote", "modify", remote_name, "type", self.cache_type])
        self.assertEqual(ret, 0)

        self.dvc = DvcRepo(".")

iterative/dvc: tests/func/test_checkout.py (view on GitHub)
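# Switch between git branches, run `dvc checkout --force` after each switch, and
# check that the ignored-file list matches the file tracked on the current branch.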
def test(self):
        fname_master = "file_in_a_master"
        branch_master = "master"
        fname_branch = "file_in_a_branch"
        branch_1 = "b1"

        self.dvc.scm.add(self.dvc.scm.untracked_files())
        self.dvc.scm.commit("add all files")
        self.commit_data_file(fname_master)

        self.dvc.scm.checkout(branch_1, True)
        ret = main(["checkout", "--force"])
        self.assertEqual(ret, 0)
        self.commit_data_file(fname_branch)

        self.dvc.scm.checkout(branch_master)
        ret = main(["checkout", "--force"])
        self.assertEqual(ret, 0)

        ignored = self.read_ignored()

        self.assertEqual(len(ignored), 1)
        self.assertIn("/" + fname_master, ignored)

        self.dvc.scm.checkout(branch_1)
        ret = main(["checkout", "--force"])
        self.assertEqual(ret, 0)
        ignored = self.read_ignored()

iterative/dvc: tests/func/test_repro.py (view on GitHub)
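# Create a stage that writes a metrics file via `dvc run -m`, then reproduce it with
# `dvc repro --force --metrics` and assert the metric value shows up in the log.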
def test(self):
        metrics_file = "metrics_file"
        metrics_value = 0.123489015
        ret = main(
            [
                "run",
                "-m",
                metrics_file,
                "echo {} >> {}".format(metrics_value, metrics_file),
            ]
        )
        self.assertEqual(0, ret)

        self._caplog.clear()
        ret = main(
            [
                "repro",
                "--force",
                "--metrics",
                metrics_file + Stage.STAGE_FILE_SUFFIX,
            ]
        )
        self.assertEqual(0, ret)

        expected_metrics_display = "{}: {}".format(metrics_file, metrics_value)
        self.assertIn(expected_metrics_display, self._caplog.text)

iterative/dvc: tests/func/test_data_cloud.py (view on GitHub)
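# Spy on RemoteLOCAL.get_file_checksum to confirm the checksum is computed only once
# across `add`, `push`, and `run` when hardlink cache links are used.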
def test(self):
        test_get_file_checksum = spy(RemoteLOCAL.get_file_checksum)
        with patch.object(
            RemoteLOCAL, "get_file_checksum", test_get_file_checksum
        ):
            url = get_local_url()
            ret = main(["remote", "add", "-d", TEST_REMOTE, url])
            self.assertEqual(ret, 0)
            ret = main(["config", "cache.type", "hardlink"])
            self.assertEqual(ret, 0)
            ret = main(["add", self.FOO])
            self.assertEqual(ret, 0)
            ret = main(["push"])
            self.assertEqual(ret, 0)
            ret = main(["run", "-d", self.FOO, "echo foo"])
            self.assertEqual(ret, 0)
        self.assertEqual(test_get_file_checksum.mock.call_count, 1)

iterative/dvc: tests/func/test_cache.py (view on GitHub)
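# Point `cache.dir` at an external directory and verify added data is cached there
# rather than in the default .dvc/cache.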
def test(self):
        cache_dir = TestDvc.mkdtemp()

        ret = main(["config", "cache.dir", cache_dir])
        self.assertEqual(ret, 0)

        self.assertFalse(os.path.exists(self.dvc.cache.local.cache_dir))

        ret = main(["add", self.FOO])
        self.assertEqual(ret, 0)

        ret = main(["add", self.DATA_DIR])
        self.assertEqual(ret, 0)

        self.assertFalse(os.path.exists(".dvc/cache"))
        self.assertNotEqual(len(os.listdir(cache_dir)), 0)

iterative/dvc: tests/func/test_run.py (view on GitHub)
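# `dvc run -m metrics.txt` executes the command and the metrics file holds the expected content.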
def test_cached(self):
        ret = main(["run", "-m", "metrics.txt", "echo test > metrics.txt"])
        self.assertEqual(ret, 0)
        with open("metrics.txt", "r") as fd:
            self.assertEqual(fd.read().rstrip(), "test")

iterative/dvc: tests/func/test_data_cloud.py (view on GitHub)
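# Add two files, wipe the local cache, and register a default local remote so the
# "missing cache files" warning can be asserted later in the test.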
def setUp(self):
        super(TestShouldWarnOnNoChecksumInLocalAndRemoteCache, self).setUp()

        cache_dir = self.mkdtemp()
        ret = main(["add", self.FOO])
        self.assertEqual(0, ret)

        ret = main(["add", self.BAR])
        self.assertEqual(0, ret)

        # purge cache
        shutil.rmtree(self.dvc.cache.local.cache_dir)

        ret = main(["remote", "add", "remote_name", "-d", cache_dir])
        self.assertEqual(0, ret)

        checksum_foo = file_md5(self.FOO)[0]
        checksum_bar = file_md5(self.BAR)[0]
        self.message_header = (
            "Some of the cache files do not exist neither locally "
            "nor on remote. Missing cache files: "
        )
        self.message_bar_part = "name: {}, md5: {}".format(
            self.BAR, checksum_bar
        )

iterative/dvc: tests/func/test_metrics.py (view on GitHub)
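# `dvc metrics show` handles multiple targets (and -R for directories) and exits
# non-zero when one of the targets does not exist.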
def test_show_multiple_outputs(tmp_dir, dvc, caplog):
    tmp_dir.gen(
        {
            "1.json": json.dumps({"AUC": 1}),
            "2.json": json.dumps({"AUC": 2}),
            "metrics/3.json": json.dumps({"AUC": 3}),
        }
    )

    dvc.run(cmd="", overwrite=True, metrics=["1.json"])
    dvc.run(cmd="", overwrite=True, metrics=["2.json"])
    dvc.run(cmd="", overwrite=True, metrics=["metrics/3.json"])

    with caplog.at_level(logging.INFO, logger="dvc"):
        assert 0 == main(["metrics", "show", "1.json", "2.json"])
        assert '1.json: {"AUC": 1}' in caplog.text
        assert '2.json: {"AUC": 2}' in caplog.text

    caplog.clear()

    with caplog.at_level(logging.INFO, logger="dvc"):
        assert 0 == main(["metrics", "show", "-R", "1.json", "metrics"])
        assert '1.json: {"AUC": 1}' in caplog.text
        assert '3.json: {"AUC": 3}' in caplog.text

    caplog.clear()

    with caplog.at_level(logging.INFO, logger="dvc"):
        assert 1 == main(["metrics", "show", "1.json", "not-found"])
        assert '1.json: {"AUC": 1}' in caplog.text
        assert (

iterative/dvc: tests/func/test_pipeline.py (view on GitHub)
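# `dvc pipeline show --dot` on a stage file should exit successfully.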
def test_dot(self):
        ret = main(["pipeline", "show", "--dot", self.file1_stage])
        self.assertEqual(ret, 0)

iterative/dvc: tests/func/test_cache.py (view on GitHub)
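# With cache.shared set to "group" (and optionally cache.protected), every directory
# and file in the cache ends up with the expected permission bits.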
def test_shared_cache(repo_dir, dvc_repo, protected, dir_mode, file_mode):
    assert main(["config", "cache.shared", "group"]) == 0

    if protected:
        assert main(["config", "cache.protected", "true"]) == 0

    assert main(["add", repo_dir.FOO]) == 0
    assert main(["add", repo_dir.DATA_DIR]) == 0

    for root, dnames, fnames in os.walk(dvc_repo.cache.local.cache_dir):
        for dname in dnames:
            path = os.path.join(root, dname)
            assert stat.S_IMODE(os.stat(path).st_mode) == dir_mode

        for fname in fnames:
            path = os.path.join(root, fname)
            assert stat.S_IMODE(os.stat(path).st_mode) == file_mode