Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_gdrive_url():
return "gdrive://root/" + str(uuid.uuid4())
def status(self):
if self.checksum and self.use_cache and self.changed_cache():
return {str(self): "not in cache"}
if not self.exists:
return {str(self): "deleted"}
if self.changed_checksum():
return {str(self): "modified"}
if not self.checksum:
return {str(self): "new"}
return {}
def check_missing_outputs(self):
paths = [str(out) for out in self.outs if not out.exists]
if paths:
raise MissingDataSource(paths)
def _walk_exc(self, exc_info):
import traceback
buffer = StringIO()
traceback.print_exception(*exc_info, file=buffer)
exc = exc_info[1]
tb = buffer.getvalue()
exc_list = [str(exc)]
tb_list = [tb]
# NOTE: parsing chained exceptions. See dvc/exceptions.py for more info
while hasattr(exc, "cause") and exc.cause:
exc_list.append(str(exc.cause))
if hasattr(exc, "cause_tb") and exc.cause_tb:
tb_list.insert(0, str(exc.cause_tb))
exc = exc.cause
return exc_list, tb_list
def _unprotect_file(self, path):
import stat
import uuid
from dvc.system import System
from dvc.utils import copyfile, move, remove
if System.is_symlink(path) or System.is_hardlink(path):
logger.debug("Unprotecting '{}'".format(path))
tmp = os.path.join(os.path.dirname(path), "." + str(uuid.uuid4()))
move(path, tmp)
copyfile(tmp, path)
remove(tmp)
else:
logger.debug(
"Skipping copying for '{}', since it is not "
"a symlink or a hardlink.".format(path)
)
os.chmod(path, os.stat(path).st_mode | stat.S_IWRITE)
def _unprotect_file(path):
if System.is_symlink(path) or System.is_hardlink(path):
logger.debug("Unprotecting '{}'".format(path))
tmp = os.path.join(os.path.dirname(path), "." + str(uuid()))
# The operations order is important here - if some application
# would access the file during the process of copyfile then it
# would get only the part of file. So, at first, the file should be
# copied with the temporary name, and then original file should be
# replaced by new.
copyfile(path, tmp, name="Unprotecting '{}'".format(relpath(path)))
remove(path)
os.rename(tmp, path)
else:
logger.debug(
"Skipping copying for '{}', since it is not "
"a symlink or a hardlink.".format(path)
)
def move(src, dst, mode=None):
"""Atomically move src to dst and chmod it with mode.
Moving is performed in two stages to make the whole operation atomic in
case src and dst are on different filesystems and actual physical copying
of data is happening.
"""
src = fspath_py35(src)
dst = fspath_py35(dst)
dst = os.path.abspath(dst)
tmp = "{}.{}".format(dst, str(uuid()))
if os.path.islink(src):
shutil.copy(os.readlink(src), tmp)
os.unlink(src)
else:
shutil.move(src, tmp)
if mode is not None:
os.chmod(tmp, mode)
shutil.move(tmp, dst)
Schemes.SSH: OutputSSH,
Schemes.LOCAL: OutputLOCAL,
}
# NOTE: currently there are only 3 possible checksum names:
#
# 1) md5 (LOCAL, SSH, GS);
# 2) etag (S3);
# 3) checksum (HDFS);
#
# so when a few types of outputs share the same name, we only need
# specify it once.
CHECKSUM_SCHEMA = {
RemoteLOCAL.PARAM_CHECKSUM: Any(str, None),
RemoteS3.PARAM_CHECKSUM: Any(str, None),
RemoteHDFS.PARAM_CHECKSUM: Any(str, None),
}
TAGS_SCHEMA = {str: CHECKSUM_SCHEMA}
SCHEMA = CHECKSUM_SCHEMA.copy()
SCHEMA[Required(OutputBase.PARAM_PATH)] = str
SCHEMA[OutputBase.PARAM_CACHE] = bool
SCHEMA[OutputBase.PARAM_METRIC] = OutputBase.METRIC_SCHEMA
SCHEMA[OutputBase.PARAM_TAGS] = TAGS_SCHEMA
SCHEMA[OutputBase.PARAM_PERSIST] = bool
def _get(stage, p, info, cache, metric, persist=False, tags=None):
parsed = urlparse(p)
if parsed.scheme == "remote":
def __init__(self, record):
msg = "failed to log {}".format(str(record))
super(LoggingException, self).__init__(msg)