Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create(self, name, contents):
    """Append ``contents`` to the file ``name``, creating parent directories.

    Args:
        name (str): path of the file to write. Missing parent directories
            are created first.
        contents (str or bytes): data to append. Anything that is not a
            ``str`` is decoded as UTF-8 before being written.
    """
    dname = os.path.dirname(name)
    if dname:
        # exist_ok=True removes the race between an isdir() check and
        # makedirs() when another process creates the directory in between.
        os.makedirs(dname, exist_ok=True)

    # Decode before opening so a bad byte sequence does not leave behind a
    # freshly created, empty file.
    text = contents if isinstance(contents, str) else contents.decode("utf-8")

    # Append mode: repeated calls on the same path accumulate content.
    with open(name, "a", encoding="utf-8") as f:
        f.write(text)
def init(dvc_dir):
    """Create an empty dvc config file and load it.

    Args:
        dvc_dir (str): path to .dvc directory.

    Returns:
        dvc.config.Config: config object built from ``dvc_dir``.
    """
    cfg_path = os.path.join(dvc_dir, Config.CONFIG)

    # Opening with "w+" creates the file, or truncates an existing one;
    # nothing is written to it here.
    with open(cfg_path, "w+"):
        pass

    return Config(dvc_dir)
def _install_hook(self, name, cmd):
    """Install a git hook that runs ``dvc <cmd>``.

    The hook line is a shell short-circuit chain: it stops when ``$3`` is
    ``"0"`` or when ``git ls-files .dvc`` prints nothing, and otherwise
    execs ``dvc <cmd>``.

    Args:
        name (str): hook name, resolved to a file path by ``self._hook_path``.
        cmd (str): dvc sub-command to run from the hook.

    If the hook file already exists the line is appended, unless it is
    already present. Otherwise a new ``/bin/sh`` script is created. The
    file mode is set to 0o777 in both cases.
    """
    command = (
        '[ "$3" = "0" ]'
        ' || [ -z "$(git ls-files .dvc)" ]'
        " || exec dvc {}".format(cmd)
    )

    hook = self._hook_path(name)

    if os.path.isfile(hook):
        with open(hook, "r+") as fobj:
            existing = fobj.read()
            if command not in existing:
                # The read leaves the position at EOF, so writes append.
                # Without a trailing newline our command would be glued
                # onto the existing last line and break the script.
                if existing and not existing.endswith("\n"):
                    fobj.write("\n")
                fobj.write("{command}\n".format(command=command))
    else:
        with open(hook, "w+") as fobj:
            fobj.write("#!/bin/sh\n" "{command}\n".format(command=command))

    # NOTE(review): 0o777 leaves the hook world-writable; kept as-is to
    # preserve existing behavior, but 0o755 may be sufficient - confirm.
    os.chmod(hook, 0o777)
# NOTE(review): this is a body fragment - the enclosing `def` line is not
# visible here. `src`, `dest`, `name` and `no_progress_bar` are presumably
# its parameters; confirm against the full file.
from dvc.progress import Tqdm
from dvc.system import System
# Fall back to the destination's base name when no label was given.
name = name if name else os.path.basename(dest)
# Source size in bytes; used below as the progress-bar total.
total = os.stat(src).st_size
# If dest is an existing directory, copy into it under the source's base name.
if os.path.isdir(dest):
dest = os.path.join(dest, os.path.basename(src))
# Try System.reflink first; on DvcException fall back to a plain byte copy.
try:
System.reflink(src, dest)
except DvcException:
with Tqdm(
desc=name, disable=no_progress_bar, total=total, bytes=True
) as pbar:
with open(src, "rb") as fsrc, open(dest, "wb+") as fdest:
# Copy LOCAL_CHUNK_SIZE bytes per read until an empty read signals EOF,
# advancing the progress bar by the size of each chunk written.
while True:
buf = fsrc.read(LOCAL_CHUNK_SIZE)
if not buf:
break
fdest.write(buf)
pbar.update(len(buf))
def _install_hook(self, name, cmd):
    """Install a git hook that runs ``dvc <cmd>``.

    The hook line is a shell short-circuit chain: it stops when ``$3`` is
    ``"0"`` or when ``git ls-files .dvc`` prints nothing, and otherwise
    execs ``dvc <cmd>``.

    Args:
        name (str): hook name, resolved to a file path by ``self._hook_path``.
        cmd (str): dvc sub-command to run from the hook.

    If the hook file already exists the line is appended, unless it is
    already present. Otherwise a new ``/bin/sh`` script is created. The
    file mode is set to 0o777 in both cases.
    """
    command = (
        '[ "$3" = "0" ]'
        ' || [ -z "$(git ls-files .dvc)" ]'
        " || exec dvc {}".format(cmd)
    )

    hook = self._hook_path(name)

    if os.path.isfile(hook):
        with open(hook, "r+") as fobj:
            existing = fobj.read()
            if command not in existing:
                # The read leaves the position at EOF, so writes append.
                # Without a trailing newline our command would be glued
                # onto the existing last line and break the script.
                if existing and not existing.endswith("\n"):
                    fobj.write("\n")
                fobj.write("{command}\n".format(command=command))
    else:
        with open(hook, "w+") as fobj:
            fobj.write("#!/bin/sh\n" "{command}\n".format(command=command))

    # NOTE(review): 0o777 leaves the hook world-writable; kept as-is to
    # preserve existing behavior, but 0o755 may be sufficient - confirm.
    os.chmod(hook, 0o777)
def _install_hook(self, name, cmd):
    """Install a git hook that runs ``dvc <cmd>``.

    The hook line is a shell short-circuit chain: it stops when ``$3`` is
    ``"0"`` or when ``git ls-files .dvc`` prints nothing, and otherwise
    execs ``dvc <cmd>``.

    Args:
        name (str): hook name, resolved to a file path by ``self._hook_path``.
        cmd (str): dvc sub-command to run from the hook.

    If the hook file already exists the line is appended, unless it is
    already present. Otherwise a new ``/bin/sh`` script is created. The
    file mode is set to 0o777 in both cases.
    """
    command = (
        '[ "$3" = "0" ]'
        ' || [ -z "$(git ls-files .dvc)" ]'
        " || exec dvc {}".format(cmd)
    )

    hook = self._hook_path(name)

    if os.path.isfile(hook):
        with open(hook, "r+") as fobj:
            existing = fobj.read()
            if command not in existing:
                # The read leaves the position at EOF, so writes append.
                # Without a trailing newline our command would be glued
                # onto the existing last line and break the script.
                if existing and not existing.endswith("\n"):
                    fobj.write("\n")
                fobj.write("{command}\n".format(command=command))
    else:
        with open(hook, "w+") as fobj:
            fobj.write("#!/bin/sh\n" "{command}\n".format(command=command))

    # NOTE(review): 0o777 leaves the hook world-writable; kept as-is to
    # preserve existing behavior, but 0o755 may be sufficient - confirm.
    os.chmod(hook, 0o777)
def ignore_remove(self, path):
    """Remove the ignore entry for ``path`` from its .gitignore file.

    Args:
        path: path whose entry should be dropped. It is handed to
            ``self._get_gitignore`` to obtain the entry text and the
            location of the .gitignore file.

    Returns nothing. When the .gitignore file does not exist this is a
    no-op. Otherwise the file is rewritten without the lines equal to the
    entry (ignoring surrounding whitespace) and is then passed to
    ``self.track_file``.
    """
    entry, gitignore = self._get_gitignore(path)

    if not os.path.exists(gitignore):
        return

    with open(gitignore, "r") as fobj:
        lines = fobj.readlines()

    # Strip the entry once rather than once per line.
    target = entry.strip()
    kept = [line for line in lines if line.strip() != target]

    with open(gitignore, "w") as fobj:
        fobj.writelines(kept)

    self.track_file(relpath(gitignore))
# NOTE(review): this is a body fragment - the enclosing `def` line is not
# visible here. `self`, `out`, `remote`, `mode` and `encoding` come from it.
# Look up the local-cache location for this output's checksum.
cache_file = self.cache.local.checksum_to_path_info(out.checksum)
# presumably converts the path-info object into a plain path string - confirm
cache_file = fspath_py35(cache_file)
# Fast path: the object is already in the local cache, so open it directly.
if os.path.exists(cache_file):
return _open(cache_file, mode=mode, encoding=encoding)
# Otherwise try to open the object directly on the remote.
try:
remote_obj = self.cloud.get_remote(remote)
remote_info = remote_obj.checksum_to_path_info(out.checksum)
return remote_obj.open(remote_info, mode=mode, encoding=encoding)
except RemoteActionNotImplemented:
# The remote cannot open directly: pull the object into the local cache
# (inside the `self.state` context) and open the local copy instead.
with self.state:
cache_info = out.get_used_cache(remote=remote)
self.cloud.pull(cache_info, remote=remote)
return _open(cache_file, mode=mode, encoding=encoding)
def ignore_remove(self, path):
    """Remove the ignore entry for ``path`` from its .gitignore file.

    Args:
        path: path whose entry should be dropped. It is handed to
            ``self._get_gitignore`` to obtain the entry text and the
            location of the .gitignore file.

    Returns nothing. When the .gitignore file does not exist this is a
    no-op. Otherwise the file is rewritten without the lines equal to the
    entry (ignoring surrounding whitespace) and is then passed to
    ``self.track_file``.
    """
    entry, gitignore = self._get_gitignore(path)

    if not os.path.exists(gitignore):
        return

    with open(gitignore, "r") as fobj:
        lines = fobj.readlines()

    # Strip the entry once rather than once per line.
    target = entry.strip()
    kept = [line for line in lines if line.strip() != target]

    with open(gitignore, "w") as fobj:
        fobj.writelines(kept)

    self.track_file(relpath(gitignore))