def __call__(repo_name, repo_accession, repo_url, path=None, output=None, dataset=None):
# we need this resource file, no point in starting without it
default_path = opj(dirname(datalad_neuroimaging.__file__), 'resources', 'isatab',
'scidata_bids_investigator.txt')
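# consult the config for a custom investigator template, falling back to the bundled resource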
itmpl_path = cfg.obtain(
'datalad.plugin.bids2scidata.investigator.template',
default=default_path)
if itmpl_path == default_path and not os.path.isabs(default_path):
# see https://github.com/datalad/datalad/issues/2514
raise RuntimeError(
"Do not run within the datalad_neuroimaging source tree")
if path and dataset is None:
dataset = path
dataset = require_dataset(
dataset, purpose='metadata query', check_installed=True)
errored = False
dsmeta = None
filemeta = []
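# collect dataset-level and per-file metadata records from the metadata query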
for m in metadata(
def _get_2fa_token(user):
one_time_password = ui.question(
"2FA one time password", hidden=True, repeat=False
)
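# note under which the new GitHub authorization will be registered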
token_note = cfg.obtain('datalad.github.token-note')
try:
# TODO: can fail if already exists -- handle!?
# in principle there is .authorization.delete()
auth = user.create_authorization(
scopes=['user', 'repo'], # TODO: Configurable??
note=token_note, # TODO: Configurable??
onetime_password=one_time_password)
except gh.GithubException as exc:
if (exc.status == 422 # "Unprocessable Entity"
and exc.data.get('errors', [{}])[0].get('code') == 'already_exists'
):
raise ValueError(
"A token with note %r already exists. If you specified a "
"password, don't -- provide the existing token in the "
"configuration as %s instead. If you want to generate a new "
"token anyway, specify a different note via the "
"'datalad.github.token-note' configuration variable"
% (token_note, CONFIG_HUB_TOKEN_FIELD)
)
raise
token = auth.token
where_to_store = ui.question(
title="Where to store token %s?" % _token_str(token),
text="Empty string would result in the token not being "
"stored for future reuse, so you will have to adjust "
"configuration manually",
choices=["global", "local", ""]
)
if where_to_store:
try:
# Using .add so other (possibly still legit) tokens are not lost
if cfg.get(CONFIG_HUB_TOKEN_FIELD, None):
lgr.info("Found other known token(s) already, "
"adding one more")
cfg.add(CONFIG_HUB_TOKEN_FIELD, auth.token,
where=where_to_store)
lgr.info("Stored %s=%s in %s config.",
CONFIG_HUB_TOKEN_FIELD, _token_str(token),
where_to_store)
except Exception as exc:
lgr.error("Failed to store token: %s",
# sanitize away the token
exc_str(exc).replace(token, _token_str(token)))
# it should be OK to display the token to the user, since otherwise
# it would simply be lost. The UI should not log it (at least not at
# the moment)
ui.error(
"Failed to store the token (%s), please store manually as %s"
% (token, CONFIG_HUB_TOKEN_FIELD)
)
return token
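# Illustration only -- a minimal, hypothetical sketch of how this helper might
# be invoked with an authenticated PyGithub user (the login/password names
# below are placeholders, not part of the original snippet):
#
#   import github as gh
#   user = gh.Github(github_login, github_password).get_user()
#   token = _get_2fa_token(user)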
from datalad.utils import chpwd # imported late so it can be mocked during tests
with chpwd(chdir):
assert not (is_pipeline and is_template), "it is either a pipeline or a template name, can't be both"
if is_template:
# generate a config and overload path with its filename
path = initiate_pipeline_config(template=path, # kwargs=TODO,
commit=True)
# TODO: centralize via _params_ handling
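# persist the dry-run request in the repository-local config, replacing any stale value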
if dry_run:
dryrun_optlabel = 'datalad.crawl.dryrun'
if dryrun_optlabel in cfg:
cfg.unset(dryrun_optlabel, where='local', reload=False)
cfg.add(dryrun_optlabel, "True", where='local')
if path is None:
# get config from the current repository/dataset
if is_pipeline:
raise ValueError("You must specify the file if --pipeline")
# Let's see if there is a config or pipeline in this repo
path = get_repo_pipeline_config_path()
if not path or not exists(path):
# Check whether a pipeline script is provided instead
path = get_repo_pipeline_script_path()
if path and exists(path):
is_pipeline = True
stats = ActivityStats()
def _get_log_setting(self, opt, default=False):
try:
return self._log_opts[opt]
except KeyError:
try:
from . import cfg
except ImportError:
return default
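# some options define an adapter to post-process the raw config value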
adapter = self._LOG_OPTS_ADAPTERS.get(opt, None)
self._log_opts[opt] = \
(cfg.getbool if not adapter else cfg.get_value)(
'datalad.log.cmd', opt, default=default)
if adapter:
self._log_opts[opt] = adapter(self._log_opts[opt])
return self._log_opts[opt]
def _get_result_filter(cls, args):
from datalad import cfg
result_filter = None
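# translate the requested report status (CLI option or config) into a
# constraint on each result's 'status' field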
if args.common_report_status or 'datalad.runtime.report-status' in cfg:
report_status = args.common_report_status or \
cfg.obtain('datalad.runtime.report-status')
if report_status == "all":
pass # no filter
elif report_status == 'success':
result_filter = EnsureKeyChoice('status', ('ok', 'notneeded'))
elif report_status == 'failure':
result_filter = EnsureKeyChoice('status',
('impossible', 'error'))
else:
result_filter = EnsureKeyChoice('status', (report_status,))
if args.common_report_type:
tfilt = EnsureKeyChoice('type', tuple(args.common_report_type))
result_filter = result_filter & tfilt if result_filter else tfilt
return result_filter
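# the returned constraint (if any) is applied to each result record (a dict
# carrying at least 'status' and 'type') to decide whether it is reported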
def _get_format(self, log_name=False, log_pid=False):
from datalad import cfg
from datalad.config import anything2bool
show_timestamps = anything2bool(cfg.get('datalad.log.timestamp', False))
return (("" if not show_timestamps else "$BOLD%(asctime)-15s$RESET ") +
("%(name)-15s " if log_name else "") +
("{%(process)d}" if log_pid else "") +
"[%(levelname)s] "
"%(message)s ")
TODO: in principle this is not network specific at all -- it is just a memoize
pattern, but at some point we may make it handle headers etc. correctly.
At the moment it supports any URL we can handle via providers/downloaders
Parameters
----------
fetcher: callable, optional
Function to call with url if needed to be refetched
maxage: float, optional
Age in days for which a cached result is considered valid. A negative
value retains it forever, 0 forces a reload, and None consults the config.
"""
doc_fname = get_url_cache_filename(url, name)
if maxage is None:
maxage = float(cfg.get('datalad.locations.cache-maxage'))
doc = None
if os.path.exists(doc_fname) and maxage != 0:
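# compute the age of the cached file in days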
fage = (time.time() - os.stat(doc_fname).st_mtime)/(24. * 3600)
if maxage < 0 or fage < maxage:
try:
lgr.debug("use cached request result to '%s' from %s", url, doc_fname)
doc = pickle.load(open(doc_fname, 'rb'))
except Exception as e: # it is OK to ignore any error and fall back on the true source
lgr.warning(
"cannot load cache from '%s', fall back to download: %s",
doc_fname, exc_str(e))
if doc is None:
if fetcher is None:
git_opts=initopts,
fake_dates=fake_dates
)
# set the annex backend in .gitattributes as a staged change
tbrepo.set_default_backend(
cfg.obtain('datalad.repo.backend', default='MD5E'),
persistent=True, commit=False)
add_to_git[tbds.repo.pathobj / '.gitattributes'] = {
'type': 'file',
'state': 'added'}
# make sure that v6 annex repos never commit content under .datalad
attrs_cfg = (
('config', 'annex.largefiles', 'nothing'),
('metadata/aggregate*', 'annex.largefiles', 'nothing'),
('metadata/objects/**', 'annex.largefiles',
'({})'.format(cfg.obtain(
'datalad.metadata.create-aggregate-annex-limit'))))
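# only stage .gitattributes entries that are not already in effect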
attrs = tbds.repo.get_gitattributes(
[op.join('.datalad', i[0]) for i in attrs_cfg])
set_attrs = []
for p, k, v in attrs_cfg:
if attrs.get(
op.join('.datalad', p), {}).get(k, None) != v:
set_attrs.append((p, {k: v}))
if set_attrs:
tbds.repo.set_gitattributes(
set_attrs,
attrfile=op.join('.datalad', '.gitattributes'))
# prevent git annex from ever annexing .git* stuff (gh-1597)
attrs = tbds.repo.get_gitattributes('.git')
if not attrs.get('.git', {}).get(