Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ret = main(["cache", "dir", new_cache_dir])
self.assertEqual(ret, 0)
ret = main(["checkout", "-f"])
self.assertEqual(ret, 0)
self.assertTrue(System.is_symlink(self.FOO))
new_foo_link = readlink(self.FOO)
self.assertTrue(System.is_symlink(self.DATA))
new_data_link = readlink(self.DATA)
self.assertEqual(
relpath(old_foo_link, old_cache_dir),
relpath(new_foo_link, new_cache_dir),
)
self.assertEqual(
relpath(old_data_link, old_cache_dir),
relpath(new_data_link, new_cache_dir),
)
def test_relpath_windows_different_drives():
path1 = os.path.join("A:", os.sep, "some", "path")
path2 = os.path.join("B:", os.sep, "other", "path")
assert relpath(path1, path2) == path1
info1, info2 = PathInfo(path1), PathInfo(path2)
rel_info = relpath(info1, info2)
assert isinstance(rel_info, str)
assert rel_info == path1
def ignore_remove(self, path):
entry, gitignore = self._get_gitignore(path)
if not os.path.exists(gitignore):
return
with open(gitignore, "r") as fobj:
lines = fobj.readlines()
filtered = list(filter(lambda x: x.strip() != entry.strip(), lines))
with open(gitignore, "w") as fobj:
fobj.writelines(filtered)
self.track_file(relpath(gitignore))
def _get_gitignore(self, path):
ignore_file_dir = os.path.dirname(path)
assert os.path.isabs(path)
assert os.path.isabs(ignore_file_dir)
entry = relpath(path, ignore_file_dir).replace(os.sep, "/")
# NOTE: using '/' prefix to make path unambiguous
if len(entry) > 0 and entry[0] != "/":
entry = "/" + entry
gitignore = os.path.join(ignore_file_dir, self.GITIGNORE)
if not path_isin(gitignore, os.path.realpath(self.root_dir)):
raise FileNotInRepoError(path)
return entry, gitignore
def ignore(self, path):
entry, gitignore = self._get_gitignore(path)
if self._ignored(entry, gitignore):
return
msg = "Adding '{}' to '{}'.".format(relpath(path), relpath(gitignore))
logger.debug(msg)
self._add_entry_to_gitignore(entry, gitignore)
self.track_file(relpath(gitignore))
self.ignored_paths.append(path)
def _get_gitignore(self, path):
ignore_file_dir = os.path.dirname(path)
assert os.path.isabs(path)
assert os.path.isabs(ignore_file_dir)
entry = relpath(path, ignore_file_dir).replace(os.sep, "/")
# NOTE: using '/' prefix to make path unambiguous
if len(entry) > 0 and entry[0] != "/":
entry = "/" + entry
gitignore = os.path.join(ignore_file_dir, self.GITIGNORE)
if not gitignore.startswith(os.path.realpath(self.root_dir)):
raise FileNotInRepoError(path)
return entry, gitignore
def _show(self, target, commands, outs, locked):
import networkx
from dvc.stage import Stage
stage = Stage.load(self.repo, target)
G = self.repo.graph
stages = networkx.get_node_attributes(G, "stage")
node = relpath(stage.path, self.repo.root_dir)
nodes = networkx.dfs_postorder_nodes(G, node)
if locked:
nodes = [n for n in nodes if stages[n].locked]
for n in nodes:
if commands:
if stages[n].cmd is None:
continue
logger.info(stages[n].cmd)
elif outs:
for out in stages[n].outs:
logger.info(str(out))
else:
logger.info(n)
node = relpath(stage.path, self.root_dir)
G.add_node(node, stage=stage)
for dep in stage.deps:
if dep.path_info is None:
continue
for out in outs:
if (
out == dep.path_info
or dep.path_info.isin(out)
or out.isin(dep.path_info)
):
dep_stage = outs[out].stage
dep_node = relpath(dep_stage.path, self.root_dir)
G.add_node(dep_node, stage=dep_stage)
G.add_edge(node, dep_node)
check_acyclic(G)
return G
def ignore(self, path):
entry, gitignore = self._get_gitignore(path)
if self._ignored(entry, gitignore):
return
msg = "Adding '{}' to '{}'.".format(relpath(path), relpath(gitignore))
logger.debug(msg)
self._add_entry_to_gitignore(entry, gitignore)
self.track_file(relpath(gitignore))
self.ignored_paths.append(path)
def load_dir_cache(self, checksum):
path_info = self.checksum_to_path_info(checksum)
try:
with self.cache.open(path_info, "r") as fobj:
d = json.load(fobj)
except (ValueError, FileNotFoundError) as exc:
raise DirCacheError(checksum, cause=exc)
if not isinstance(d, list):
msg = "dir cache file format error '{}' [skipping the file]"
logger.error(msg.format(relpath(path_info)))
return []
for info in d:
# NOTE: here is a BUG, see comment to .as_posix() below
relative_path = PathInfo.from_posix(info[self.PARAM_RELPATH])
info[self.PARAM_RELPATH] = relative_path.fspath
return d