How to use the dvc.stage.Stage.load function in dvc

To help you get started, we’ve selected a few dvc examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github iterative / dvc / tests / func / test_repro.py View on Github external
def test_force_with_dependencies(self):
        run_out = self.dvc.run(
            fname="datetime.dvc",
            deps=[self.FOO],
            outs=["datetime.txt"],
            cmd='python -c "import time; print(time.time())" > datetime.txt',
        ).outs[0]

        ret = main(["repro", "--force", "datetime.dvc"])
        self.assertEqual(ret, 0)

        repro_out = Stage.load(self.dvc, "datetime.dvc").outs[0]

        self.assertNotEqual(run_out.checksum, repro_out.checksum)
github iterative / dvc / dvc / command / pipeline.py View on Github external
def __build_graph(self, target, commands, outs):
        import networkx
        from dvc.stage import Stage
        from dvc.repo.graph import get_pipeline

        stage = Stage.load(self.repo, target)
        node = relpath(stage.path, self.repo.root_dir)

        G = get_pipeline(self.repo.pipelines, node)
        stages = networkx.get_node_attributes(G, "stage")

        nodes = []
        for n in G:
            stage = stages[n]
            if commands:
                if stage.cmd is None:
                    continue
                nodes.append(stage.cmd)
            elif outs:
                for out in stage.outs:
                    nodes.append(str(out))
            else:
github iterative / dvc / dvc / project.py View on Github external
ignore_build_cache=False,
    ):
        from dvc.stage import Stage

        if not target and not all_pipelines:
            raise ValueError()

        if not interactive:
            config = self.config
            core = config.config[config.SECTION_CORE]
            interactive = core.get(config.SECTION_CORE_INTERACTIVE, False)

        targets = []
        if pipeline or all_pipelines:
            if pipeline:
                stage = Stage.load(self, target)
                node = os.path.relpath(stage.path, self.root_dir)
                pipelines = [self._get_pipeline(node)]
            else:
                pipelines = self.pipelines()

            for G in pipelines:
                for node in G.nodes():
                    if G.in_degree(node) == 0:
                        targets.append(os.path.join(self.root_dir, node))
        else:
            targets.append(target)

        self.files_to_git_add = []

        ret = []
        with self.state:
github iterative / dvc / dvc / repo / update.py View on Github external
def update(self, target):
    from dvc.stage import Stage

    stage = Stage.load(self, target)
    stage.update()

    stage.dump()
github iterative / dvc / dvc / repo / __init__.py View on Github external
NOTE: For large repos, this could be an expensive
              operation. Consider using some memoization.
        """
        from dvc.stage import Stage

        stages = []
        outs = []

        for root, dirs, files in self.tree.walk(
            self.root_dir, dvcignore=self.dvcignore
        ):
            for fname in files:
                path = os.path.join(root, fname)
                if not Stage.is_valid_filename(path):
                    continue
                stage = Stage.load(self, path)
                for out in stage.outs:
                    if out.scheme == "local":
                        outs.append(out.fspath + out.sep)
                stages.append(stage)

            dirs[:] = self._filter_out_dirs(dirs, outs, root)

        return stages
github iterative / dvc / dvc / repo / __init__.py View on Github external
return get_stages(G)

        target = os.path.abspath(target)

        if recursive and os.path.isdir(target):
            attrs = nx.get_node_attributes(G, "stage")
            nodes = [node for node in nx.dfs_postorder_nodes(G)]

            ret = []
            for node in nodes:
                stage = attrs[node]
                if path_isin(stage.path, target):
                    ret.append(stage)
            return ret

        stage = Stage.load(self, target)
        if not with_deps:
            return [stage]

        node = relpath(stage.path, self.root_dir)
        pipeline = get_pipeline(get_pipelines(G), node)

        return [
            pipeline.node[n]["stage"]
            for n in nx.dfs_postorder_nodes(pipeline, node)
        ]
github iterative / dvc / dvc / project.py View on Github external
def lock_stage(self, target, unlock=False):
        from dvc.stage import Stage

        stage = Stage.load(self, target)
        stage.locked = False if unlock else True
        stage.dump()

        return stage
github iterative / dvc / dvc / project.py View on Github external
def _collect(self, target, with_deps=False):
        import networkx as nx
        from dvc.stage import Stage

        stage = Stage.load(self, target)
        if not with_deps:
            return [stage]

        node = os.path.relpath(stage.path, self.root_dir)
        G = self._get_pipeline(node)
        stages = nx.get_node_attributes(G, "stage")

        ret = [stage]
        for n in nx.dfs_postorder_nodes(G, node):
            ret.append(stages[n])

        return ret