Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_cache_type_is_properly_overridden(erepo):
    """Importing from a repo whose cache.type is 'symlink' yields a real file.

    `Repo.get` must not propagate the source repo's symlink cache type:
    the destination has to be a plain copy, not a link into the cache.
    """
    erepo.dvc.config.set(
        Config.SECTION_CACHE, Config.SECTION_CACHE_TYPE, "symlink"
    )
    erepo.dvc.scm.add([erepo.dvc.config.config_file])
    erepo.dvc.scm.commit("set cache type to symlinks")

    source_path = erepo.FOO
    target_path = erepo.FOO + "_imported"
    Repo.get(erepo.root_dir, source_path, target_path)

    # The result must be an ordinary file on disk, never a symlink.
    assert not System.is_symlink(target_path)
    assert os.path.exists(target_path)
    assert os.path.isfile(target_path)
def _should_test_gdrive():
if os.getenv("DVC_TEST_GDRIVE") == "true":
return True
elif os.getenv("DVC_TEST_GDRIVE") == "false":
return False
oauth_storage = os.path.join(
Config.get_global_config_dir(),
"gdrive-oauth2",
"068b8e92002dd24414a9995a80726a14",
)
if os.path.exists(oauth_storage):
return True
from subprocess import CalledProcessError, check_output, Popen
from dvc.utils import env2bool
from dvc.config import Config
from dvc.remote import RemoteGDrive
from dvc.remote.gs import RemoteGS
from dvc.remote.s3 import RemoteS3
from tests.basic_env import TestDvc
from moto.s3 import mock_s3
# Name of the remote that the integration tests push/pull against.
TEST_REMOTE = "upstream"
# Config section header for that remote, e.g. 'remote "upstream"'.
TEST_SECTION = 'remote "{}"'.format(TEST_REMOTE)
# Minimal DVC config skeleton: empty cache section, core pointing at the
# test remote, and the remote entry itself with a URL each test fills in.
TEST_CONFIG = {
    Config.SECTION_CACHE: {},
    Config.SECTION_CORE: {Config.SECTION_CORE_REMOTE: TEST_REMOTE},
    TEST_SECTION: {Config.SECTION_REMOTE_URL: ""},
}

# Cloud bucket names, overridable from CI via env vars.
TEST_AWS_REPO_BUCKET = os.environ.get("DVC_TEST_AWS_REPO_BUCKET", "dvc-test")
TEST_GCP_REPO_BUCKET = os.environ.get("DVC_TEST_GCP_REPO_BUCKET", "dvc-test")
TEST_OSS_REPO_BUCKET = "dvc-test"

# Path to GCP service-account credentials; defaults to the in-repo CI file.
TEST_GCP_CREDS_FILE = os.path.abspath(
    os.environ.get(
        "GOOGLE_APPLICATION_CREDENTIALS",
        os.path.join("scripts", "ci", "gcp-creds.json"),
    )
)
# Ensure that absolute path is used
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = TEST_GCP_CREDS_FILE
RemoteSSH,
RemoteHDFS,
RemoteHTTP,
)
from dvc.remote.base import STATUS_OK, STATUS_NEW, STATUS_DELETED
from dvc.utils import file_md5
from dvc.utils.stage import load_stage_file, dump_stage_file
from tests.basic_env import TestDvc
from tests.utils import spy
# Remote name used by the data-cloud tests in this module.
TEST_REMOTE = "upstream"
# Config section header for that remote, e.g. 'remote "upstream"'.
TEST_SECTION = 'remote "{}"'.format(TEST_REMOTE)
# Minimal DVC config: empty cache section, core defaulting to the test
# remote, and the remote entry whose URL each test fills in.
TEST_CONFIG = {
    Config.SECTION_CACHE: {},
    Config.SECTION_CORE: {Config.SECTION_CORE_REMOTE: TEST_REMOTE},
    TEST_SECTION: {Config.SECTION_REMOTE_URL: ""},
}

# Cloud bucket names, overridable from CI via env vars.
TEST_AWS_REPO_BUCKET = os.environ.get("DVC_TEST_AWS_REPO_BUCKET", "dvc-test")
TEST_GCP_REPO_BUCKET = os.environ.get("DVC_TEST_GCP_REPO_BUCKET", "dvc-test")
TEST_OSS_REPO_BUCKET = "dvc-test"

# Path to GCP service-account credentials; defaults to the in-repo CI file.
TEST_GCP_CREDS_FILE = os.path.abspath(
    os.environ.get(
        "GOOGLE_APPLICATION_CREDENTIALS",
        os.path.join("scripts", "ci", "gcp-creds.json"),
    )
)
# Ensure that absolute path is used
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = TEST_GCP_CREDS_FILE
)
)
if os.path.isdir(dvc_dir):
if not force:
raise InitError(
"'{repo}' exists. Use `-f` to force.".format(
repo=relpath(dvc_dir)
)
)
remove(dvc_dir)
os.mkdir(dvc_dir)
config = Config.init(dvc_dir)
proj = Repo(root_dir)
scm.add([config.config_file])
if scm.ignore_file:
scm.add([os.path.join(dvc_dir, scm.ignore_file)])
logger.info("\nYou can now commit the changes to git.\n")
_welcome_message()
return proj
def get_remote(self, remote=None, command=""):
    """Return an initialized remote.

    Falls back to the core default remote when `remote` is not given.
    Raises NoRemoteError (tagged with `command`) if neither is configured.
    """
    name = remote or self._core.get(Config.SECTION_CORE_REMOTE)
    if name:
        return self._init_remote(name)
    raise NoRemoteError(command)
def find_or_create_user_id():
    """
    The user's ID is stored on a file under the global config directory.

    The file should contain a JSON with a "user_id" key:

        {"user_id": "16fd2706-8baf-433b-82eb-8c7fada847da"}

    IDs are generated randomly with UUID.

    Returns the user id string, or None if the lock could not be acquired.
    """
    config_dir = Config.get_global_config_dir()
    fname = config_dir / "user_id"
    lockfile = fname.with_suffix(".lock")

    try:
        # Serialize access across concurrent dvc processes.
        with Lock(lockfile):
            try:
                user_id = json.loads(fname.read_text())["user_id"]
            except (FileNotFoundError, ValueError, AttributeError):
                # Missing, corrupt, or schema-less file: generate a fresh id.
                user_id = str(uuid.uuid4())
                makedirs(fname.parent, exist_ok=True)
                # Fix: json.dumps already returns str; the extra str() was
                # redundant.
                fname.write_text(json.dumps({"user_id": user_id}))

            return user_id
    except LockError:
        # Best effort only: another process holds the lock; caller gets None.
        logger.debug("Failed to acquire {lockfile}".format(lockfile=lockfile))
def wrapper(remote, *args, **kwargs):
    # Warn at most once per process about slow default link types.
    if this.already_displayed:
        return f(remote, *args, **kwargs)

    cache_conf = remote.repo.config.config.get(Config.SECTION_CACHE, {})
    link_type = cache_conf.get(Config.SECTION_CACHE_TYPE)
    warn_enabled = cache_conf.get(Config.SECTION_CACHE_SLOW_LINK_WARNING, True)
    # Skip timing when the warning is disabled or an explicit cache type
    # was configured.
    if link_type or not warn_enabled:
        return f(remote, *args, **kwargs)

    started_at = time.time()
    out = f(remote, *args, **kwargs)
    if time.time() - started_at >= this.timeout_seconds:
        logger.warning(this.message)
        this.already_displayed = True
    return out
)
# --system and --local select alternative config scopes; both store a
# constant into the shared `level` destination.
parent_config_parser.add_argument(
    "--system",
    dest="level",
    action="store_const",
    const=Config.LEVEL_SYSTEM,
    help="Use system config.",
)
parent_config_parser.add_argument(
    "--local",
    dest="level",
    action="store_const",
    const=Config.LEVEL_LOCAL,
    help="Use local config.",
)
# Without an explicit flag, operate on the repo-level config.
parent_config_parser.set_defaults(level=Config.LEVEL_REPO)
def add_parser(subparsers, parent_parser):
CONFIG_HELP = "Get or set config options."
config_parser = subparsers.add_parser(
"config",
parents=[parent_config_parser, parent_parser],
description=append_doc_link(CONFIG_HELP, "config"),
help=CONFIG_HELP,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
config_parser.add_argument(
"-u",
"--unset",
default=False,
def __init__(self, repo, config):
self.repo = repo
self.dvc_dir = repo.dvc_dir
self.root_dir = repo.root_dir
state_config = config.get(Config.SECTION_STATE, {})
self.row_limit = state_config.get(
Config.SECTION_STATE_ROW_LIMIT, self.STATE_ROW_LIMIT
)
self.row_cleanup_quota = state_config.get(
Config.SECTION_STATE_ROW_CLEANUP_QUOTA,
self.STATE_ROW_CLEANUP_QUOTA,
)
if not self.dvc_dir:
self.state_file = None
return
self.state_file = os.path.join(self.dvc_dir, self.STATE_FILE)
# https://www.sqlite.org/tempfiles.html
self.temp_files = [
self.state_file + "-journal",
self.state_file + "-wal",