Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_force_with_dependencies(self):
    """``repro --force`` must re-run a stage that declares dependencies.

    The stage command writes the current timestamp to ``datetime.txt``,
    so a genuine re-execution yields an output with a different checksum.
    """
    # Create the stage and remember the output produced by the first run.
    created = self.dvc.run(
        fname="datetime.dvc",
        deps=[self.FOO],
        outs=["datetime.txt"],
        cmd='python -c "import time; print(time.time())" > datetime.txt',
    )
    run_out = created.outs[0]

    # Force a reproduction through the CLI entry point; it must succeed.
    exit_code = main(["repro", "--force", "datetime.dvc"])
    self.assertEqual(exit_code, 0)

    # Reload the stage file and compare the output against the first run.
    reloaded = Stage.load(self.dvc, "datetime.dvc")
    repro_out = reloaded.outs[0]
    self.assertNotEqual(run_out.checksum, repro_out.checksum)
def __build_graph(self, target, commands, outs):
import networkx
from dvc.stage import Stage
from dvc.repo.graph import get_pipeline
stage = Stage.load(self.repo, target)
node = relpath(stage.path, self.repo.root_dir)
G = get_pipeline(self.repo.pipelines, node)
stages = networkx.get_node_attributes(G, "stage")
nodes = []
for n in G:
stage = stages[n]
if commands:
if stage.cmd is None:
continue
nodes.append(stage.cmd)
elif outs:
for out in stage.outs:
nodes.append(str(out))
else:
ignore_build_cache=False,
):
from dvc.stage import Stage
if not target and not all_pipelines:
raise ValueError()
if not interactive:
config = self.config
core = config.config[config.SECTION_CORE]
interactive = core.get(config.SECTION_CORE_INTERACTIVE, False)
targets = []
if pipeline or all_pipelines:
if pipeline:
stage = Stage.load(self, target)
node = os.path.relpath(stage.path, self.root_dir)
pipelines = [self._get_pipeline(node)]
else:
pipelines = self.pipelines()
for G in pipelines:
for node in G.nodes():
if G.in_degree(node) == 0:
targets.append(os.path.join(self.root_dir, node))
else:
targets.append(target)
self.files_to_git_add = []
ret = []
with self.state:
def update(self, target):
    """Load the stage file at ``target``, update it and write it back.

    Args:
        target: stage file location, handed to ``Stage.load`` unchanged.
    """
    # Imported locally, as in the rest of this file.
    from dvc.stage import Stage

    loaded = Stage.load(self, target)
    loaded.update()
    # Persist whatever ``update()`` changed on the stage.
    loaded.dump()
NOTE: For large repos, this could be an expensive
operation. Consider using some memoization.
"""
from dvc.stage import Stage
stages = []
outs = []
for root, dirs, files in self.tree.walk(
self.root_dir, dvcignore=self.dvcignore
):
for fname in files:
path = os.path.join(root, fname)
if not Stage.is_valid_filename(path):
continue
stage = Stage.load(self, path)
for out in stage.outs:
if out.scheme == "local":
outs.append(out.fspath + out.sep)
stages.append(stage)
dirs[:] = self._filter_out_dirs(dirs, outs, root)
return stages
return get_stages(G)
target = os.path.abspath(target)
if recursive and os.path.isdir(target):
attrs = nx.get_node_attributes(G, "stage")
nodes = [node for node in nx.dfs_postorder_nodes(G)]
ret = []
for node in nodes:
stage = attrs[node]
if path_isin(stage.path, target):
ret.append(stage)
return ret
stage = Stage.load(self, target)
if not with_deps:
return [stage]
node = relpath(stage.path, self.root_dir)
pipeline = get_pipeline(get_pipelines(G), node)
return [
pipeline.node[n]["stage"]
for n in nx.dfs_postorder_nodes(pipeline, node)
]
def lock_stage(self, target, unlock=False):
    """Set or clear the ``locked`` flag of a stage and save the stage file.

    Args:
        target: stage file location, handed to ``Stage.load`` unchanged.
        unlock: when truthy the flag is cleared (``locked = False``);
            otherwise the flag is set (``locked = True``).

    Returns:
        The loaded stage, after its flag was changed and it was dumped.
    """
    from dvc.stage import Stage

    stage = Stage.load(self, target)
    # ``not unlock`` is the direct spelling of ``False if unlock else True``.
    stage.locked = not unlock
    stage.dump()

    return stage
def _collect(self, target, with_deps=False):
    """Return the stage for ``target``, optionally with its pipeline stages.

    Args:
        target: stage file location, handed to ``Stage.load`` unchanged.
        with_deps: when falsy only the target stage is returned; when
            truthy the stages reached by a depth-first post-order walk of
            the target's pipeline graph are appended after it.

    Returns:
        A list whose first element is the stage loaded from ``target``.
    """
    import networkx as nx
    from dvc.stage import Stage

    target_stage = Stage.load(self, target)

    if with_deps:
        # Graph nodes are keyed by the stage path relative to the repo root.
        start = os.path.relpath(target_stage.path, self.root_dir)
        graph = self._get_pipeline(start)
        stage_by_node = nx.get_node_attributes(graph, "stage")

        # NOTE(review): the walk starts at the target's own node, so the
        # target presumably shows up a second time in the result - confirm
        # whether callers rely on that before changing it.
        reachable = [
            stage_by_node[name]
            for name in nx.dfs_postorder_nodes(graph, start)
        ]
        return [target_stage] + reachable

    return [target_stage]