def _get_remote_info(ds_path, ds_remote_info, to, missing):
"""Returns None if desired info was obtained, or a tuple (status, message)
if not"""
ds = Dataset(ds_path)
if ds.repo is None:
# There is no repository, nothing could be done
return ('impossible',
'No repository found for %s' % ds)
if to is None:
# we need an upstream remote if none is given. We could
# wait for `git push` to complain, but we need to explicitly
# figure it out for pushing the annex branch anyway, and we
# might as well fail right here.
track_remote, track_refspec = ds.repo.get_tracking_branch()
if not track_remote:
# no tracking remote configured, but let's try one more thing:
# if we only have one remote, and it has a push target
# configured, that is "good enough" for us
cand_remotes = [r for r in ds.repo.get_remotes()
if 'remote.{}.push'.format(r) in ds.config]
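# A minimal, hedged sketch of the single-remote fallback described in the
# comments above (the actual continuation is not part of this excerpt). The
# helper name `_single_push_remote` and its arguments are illustrative only;
# `repo`/`config` stand in for ds.repo / ds.config.
def _single_push_remote(repo, config):
    # remotes with an explicit push refspec configured
    candidates = [r for r in repo.get_remotes()
                  if 'remote.{}.push'.format(r) in config]
    # only an unambiguous, single candidate is "good enough" as a push target
    return candidates[0] if len(candidates) == 1 else None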
check_paths.extend(check_path.parents)
if any(p in subds_status for p in check_paths):
conflict = [p for p in check_paths if p in subds_status]
res.update({
'status': 'error',
'message': (
'collision with %s (dataset) in dataset %s',
str(conflict[0]),
str(parentds_path))})
yield res
return
# important to use the given Dataset object to avoid spurious ID
# changes with not-yet-materialized Datasets
tbds = ds if isinstance(ds, Dataset) and \
ds.path == path else Dataset(str(path))
# don't create in non-empty directory without `force`:
if op.isdir(tbds.path) and listdir(tbds.path) != [] and not force:
res.update({
'status': 'error',
'message':
'will not create a dataset in a non-empty directory, use '
'`force` option to ignore'})
yield res
return
# stuff that we create and want to have tracked with git (not annex)
add_to_git = {}
if initopts is not None and isinstance(initopts, list):
initopts = {'_from_cmdline_': initopts}
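# Hedged usage sketch of the `force` behavior checked above, using the public
# datalad Python API (the path is illustrative): without force=True, creating
# a dataset in a non-empty directory is refused with the error shown above.
def _example_force_create(path):
    import datalad.api as dl
    # permit dataset creation despite pre-existing content in `path`
    return dl.create(path=path, force=True)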
Some value identifying a dataset or `None`. In the latter case
a dataset will be searched for based on the process working directory.
check_installed : bool, optional
If True, a check whether the resolved dataset is properly
installed is performed.
purpose : str, optional
This string will be inserted in error messages to make them more
informative. The pattern is "... dataset for <purpose>".
Returns
-------
Dataset
Or raises an exception (InsufficientArgumentsError).
"""
if dataset is not None and not isinstance(dataset, Dataset):
dataset = Dataset(dataset)
if dataset is None: # possible scenario of cmdline calls
dspath = get_dataset_root(getpwd())
if not dspath:
raise NoDatasetArgumentFound("No dataset found")
dataset = Dataset(dspath)
assert(dataset is not None)
lgr.debug("Resolved dataset{0}: {1}".format(
' for {}'.format(purpose) if purpose else '',
dataset))
if check_installed and not dataset.is_installed():
raise ValueError("No installed dataset found at "
"{0}.".format(dataset.path))
# report the dataset path rather than the repo path to avoid
# realpath/symlink issues
parentds=ds.path,
status='ok',
)
# if a dataset, and given in rsync-style 'ds/' or with sufficient
# recursion level left -> dive in
if props.get('type', None) == 'dataset' and (
(paths and paths.get(path, False)) or recursion_level != 0):
subds_state = props.get('state', None)
if subds_state in ('clean', 'deleted'):
# no need to look into the subdataset
continue
elif subds_state in ('added', 'modified'):
# dive
subds = Dataset(path)
for r in _diff_ds(
subds,
# from before time or from the reported state
fr if constant_refs
else PRE_INIT_COMMIT_SHA
if subds_state == 'added'
else props['prev_gitshasum'],
# to the last recorded state, or the worktree
None if to is None
else to if constant_refs
else props['gitshasum'],
constant_refs,
# subtract one level on the way down, unless the path
# args instructed to go inside this subdataset
recursion_level=recursion_level
if paths and paths.get(path, False) else recursion_level - 1,
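# Hedged usage sketch of the recursion behavior handled above, via the public
# diff API (parameter names assume current datalad; values are illustrative):
# a recursion limit bounds how deep the per-subdataset dives go, analogous to
# the recursion_level bookkeeping in the call above.
def _example_recursive_diff(ds):
    # compare the last recorded state against the worktree, one level deep
    return ds.diff(fr='HEAD', to=None, recursive=True, recursion_limit=1)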
action='create_sibling',
# neither of the next two should happen anyway
unavailable_path_status='impossible',
nondataset_path_status='error',
modified=since,
return_type='generator',
on_failure='ignore'):
if ap.get('status', None):
# this is done
yield ap
continue
if ap.get('type', None) != 'dataset' or ap.get('state', None) == 'absent':
# this can happen when there is `since`, but we have no
# use for anything but datasets here
continue
checkds_remotes = Dataset(ap['path']).repo.get_remotes() \
if ap.get('state', None) != 'absent' \
else []
if publish_depends:
# make sure dependencies are valid
# TODO: inherit -- we might want to automagically create
# those dependents as well???
unknown_deps = set(assure_list(publish_depends)).difference(checkds_remotes)
if unknown_deps:
ap['status'] = 'error'
ap['message'] = (
'unknown sibling(s) specified as publication dependency: %s',
unknown_deps)
yield ap
continue
if name in checkds_remotes and existing in ('error', 'skip'):
ap['status'] = 'error' if existing == 'error' else 'notneeded'
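# Hedged usage sketch of the `publish_depends` / `existing` handling above,
# using the public create_sibling API (URL and sibling names are illustrative;
# parameter names assume current datalad).
def _example_create_sibling(ds):
    # declare that publishing to 'server' must first publish to 'data-store',
    # and silently skip if a sibling of that name already exists
    return ds.create_sibling(
        'ssh://example.com/path/to/sibling',
        name='server',
        existing='skip',
        publish_depends='data-store')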
content_by_ds, ds_props, completed, nondataset_paths = \
annotated2content_by_ds(
annotated_paths,
refds_path=refds_path)
assert(not completed)
if not content_by_ds:
# we should have complained about any inappropriate path argument
# above, so if nothing is left, we can simply exit
return
# simple loop over datasets -- save happens later
# start deep down
to_save = []
for ds_path in sorted(content_by_ds, reverse=True):
ds = Dataset(ds_path)
torepoadd = {}
respath_by_status = {}
for ap in content_by_ds[ds_path]:
# start from a clean slate -- drop any pre-existing status
ap.pop('status', None)
torepoadd[ap['path']] = ap
# skip anything that doesn't look like a wannabe subdataset
if not ap.get('type', None) == 'dataset' or \
ap['path'] == ds_path:
continue
if ap.get('registered_subds', False):
# subdataset that might be in this list because of the
# need to save all the way up to a super dataset
respath_by_status['success'] = \
subds_paths : list(str)
Sequence of absolute paths of subdatasets of the to-be-updated dataset,
whose agginfo shall be updated within the to-be-updated dataset.
Any subdataset that is not listed here is assumed to be gone (i.e. no longer
a subdataset at all, not just not locally installed).
incremental : bool
If set, the update will not remove any information on datasets not listed
in subds_paths.
agginfo_db : dict
Dictionary with all information on aggregate metadata on all datasets.
Keys are absolute paths of datasets.
to_save : list
List of paths to save eventually. This function will add new paths as
necessary.
"""
ds = Dataset(ds_path)
# load existing aggregate info dict
# makes sure all file/dataset paths become absolute
# TODO take from cache, once used in _get_dsinfo_from_aggmetadata()
agginfo_fpath, agg_base_path = get_ds_aggregate_db_locations(ds)
ds_agginfos = load_ds_aggregate_db(ds, abspath=True)
# object locations referenced initially
objlocs_was = set(ai[k]
for ai in ds_agginfos.values()
for k in location_keys
if k in ai)
# track which objects need to be copied (each item is a from/to tuple)
objs2copy = []
# for each subdataset (any depth level)
procds_paths = [ds.path] + subds_paths
for dpath in procds_paths:
ds_dbinfo = agginfo_db.get(dpath, {}).copy()
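# Illustrative shape of the `agginfo_db` argument documented above: keys are
# absolute dataset paths, values are per-dataset info dicts whose location
# keys point at metadata object files. Key names and paths here are
# hypothetical, for orientation only.
_example_agginfo_db = {
    '/tmp/super': {'dataset_info': 'objects/ds-xyz.xz'},
    '/tmp/super/subds': {'dataset_info': 'objects/ds-abc.xz',
                         'content_info': 'objects/cn-abc.xz'},
}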
for spath in subpaths:
if os.path.exists(os.path.join(ds.path, spath)):
lgr.warning(
"Not creating subdataset at existing path: %s",
spath)
else:
for r in ds.create(spath, result_xfm=None,
return_type='generator', save=save):
yield r
for row in rows:
# Add additional information that we'll need for various
# operations.
filename_abs = os.path.join(ds.path, row["filename"])
if row["subpath"]:
ds_current = Dataset(os.path.join(ds.path,
row["subpath"]))
ds_filename = os.path.relpath(filename_abs, ds_current.path)
else:
ds_current = ds
ds_filename = row["filename"]
row.update({"filename_abs": filename_abs,
"ds": ds_current,
"ds_filename": ds_filename})
if version_urls:
num_urls = len(rows)
log_progress(lgr.info, "addurls_versionurls",
"Versioning %d URLs", num_urls,
label="Versioning URLs",
total=num_urls, unit=" URLs")
for row in rows:
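# (sketch) the body of this loop -- not included in this excerpt -- would
# replace each row's URL with a version-pinned one, e.g. via
# get_versioned_url() from datalad.support.s3, and advance the progress
# logging started above; error handling is omitted here.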
one to be returned. Therefore local definitions/configurations take
precedence over ones that come from outside (via a datalad extension or a
dataset with its .datalad/config). If a dataset had precedence (as it did
before), adding or merely updating a (sub)dataset could surprisingly cause
you to execute code different from what you defined within ~/.gitconfig or
your local repository's .git/config.
So, local definitions take precedence over remote ones and more specific
ones over more general ones.
Returns
-------
tuple
path, name, format string, help message
"""
ds = ds if isinstance(ds, Dataset) else Dataset(ds) if ds else None
# 1. check system and user account for procedure
for loc in (cfg.obtain('datalad.locations.user-procedures'),
cfg.obtain('datalad.locations.system-procedures')):
for dir in assure_list(loc):
for m, n in _get_file_match(dir, name):
yield (m, n,) + _get_proc_config(n)
# 2. check dataset for procedure
if ds is not None and ds.is_installed():
# could be more than one
dirs = assure_list(
ds.config.obtain('datalad.locations.dataset-procedures'))
for dir in dirs:
# TODO `get` dirs if necessary
for m, n in _get_file_match(op.join(ds.path, dir), name):
yield (m, n,) + _get_proc_config(n, ds=ds)
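# Hedged usage sketch of the precedence rules described in the docstring
# above, via the public run-procedure API (parameter names assume current
# datalad): discovery reports procedures with user/system-level definitions
# shadowing same-named ones shipped inside the dataset.
def _example_discover_procedures(ds):
    import datalad.api as dl
    # list all procedures visible for `ds`, honoring the precedence above
    return dl.run_procedure(dataset=ds, discover=True)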
def is_result_matching_pathsource_argument(res, **kwargs):
# we either have any non-zero number of "paths" (that could be anything), or
# we have one path and one source
# we don't do any error checking here, done by the command itself
source = kwargs.get('source', None)
if source is not None:
# we want to be able to deal with Dataset instances given as 'source':
if isinstance(source, Dataset):
source = source.path
# if there was a source, it needs to be recorded in the result
# otherwise this is not what we are looking for
return source == res.get('source_url', None)
# the only thing left is a potentially heterogeneous list of paths/URLs
paths = assure_list(kwargs.get('path', []))
# three cases left:
# 1. input arg was an absolute path -> must match 'path' property
# 2. input arg was relative to a dataset -> must match refds/relpath
# 3. something nifty with a relative input path that uses PWD as the
# reference
respath = res.get('path', None)
if respath in paths:
# absolute match, pretty sure we want this
return True
elif kwargs.get('dataset', None) and YieldRelativePaths()(res) in paths:
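# (sketch) the truncated branch above presumably accepts the result when its
# dataset-relative path matches one of the given relative path arguments,
# mirroring the absolute-path case (i.e. it would simply `return True`).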