# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# init annex when traces of a remote annex can be detected
if reckless:
lgr.debug(
"Instruct annex to hardlink content in %s from local "
"sources, if possible (reckless)", dataset.path)
dataset.config.add(
'annex.hardlink', 'true', where='local', reload=True)
lgr.debug("Initializing annex repo at %s", dataset.path)
# Note, that we cannot enforce annex-init via AnnexRepo().
# If such an instance already exists, its __init__ will not be executed.
# Therefore do quick test once we have an object and decide whether to call its _init().
#
# Additionally, call init if we need to add a description (see #1403),
# since AnnexRepo.__init__ can only do it with create=True
repo = AnnexRepo(dataset.path, init=True)
if not repo.is_initialized() or description:
repo._init(description=description)
if reckless:
repo._run_annex_command('untrust', annex_options=['here'])
srs = {True: [], False: []} # special remotes by "autoenable" key
remote_uuids = None # might be necessary to discover known UUIDs
for uuid, config in repo.get_special_remotes().items():
sr_name = config.get('name', None)
sr_autoenable = config.get('autoenable', False)
try:
sr_autoenable = assure_bool(sr_autoenable)
except ValueError:
# Be resilient against misconfiguration. Here it is only about
# informing the user, so no harm would be done
if op.exists(gitattributes_path):
with open(gitattributes_path, 'rb') as f:
known_attrs = [line.decode('utf-8').rstrip() for line in f.readlines()]
else:
known_attrs = []
for attr in desired_attrs.split('\n'):
if attr not in known_attrs:
known_attrs.append(attr)
with open(gitattributes_path, 'wb') as f:
f.write('\n'.join(known_attrs).encode('utf-8'))
# so for mortals it just looks like a regular directory!
if not ds.config.get('annex.thin'):
ds.config.add('annex.thin', 'true', where='local')
# initialize annex there if not yet initialized
AnnexRepo(ds.path, init=True)
# ds might have memories of having ds.repo GitRepo
superds = None
del ds
ds = Dataset(studydir)
# Add doesn't have all the options of save such as msg and supers
ds.add('.gitattributes', to_git=True, save=False)
dsh = dsh_path = None
if op.lexists(op.join(ds.path, '.heudiconv')):
dsh_path = op.join(ds.path, '.heudiconv')
dsh = Dataset(dsh_path)
if not dsh.is_installed():
# Previously we did not have it as a submodule, and since no
# automagic migration is implemented, we just need to check first
# if any path under .heudiconv is already under git control
if any(x[0].startswith('.heudiconv/') for x in
ds.repo.repo.index.entries.keys()):
# stuff that we create and want to have tracked with git (not annex)
add_to_git = []
if no_annex:
lgr.info("Creating a new git repo at %s", tbds.path)
GitRepo(
tbds.path,
url=None,
create=True,
git_opts=git_opts,
fake_dates=fake_dates)
else:
# always come with annex when created from scratch
lgr.info("Creating a new annex repo at %s", tbds.path)
tbrepo = AnnexRepo(
tbds.path,
url=None,
create=True,
backend=annex_backend,
version=annex_version,
description=description,
git_opts=git_opts,
annex_opts=annex_opts,
annex_init_opts=annex_init_opts,
fake_dates=fake_dates
)
if text_no_annex:
attrs = tbrepo.get_gitattributes('.')
# some basic protection against useless duplication
# on rerun with --force
modified_subs = []
for sm in self.get_submodules():
sm_dirty = False
# First check for changes committed in the submodule, using
# git submodule summary -- path,
# since this can't be detected from within the submodule.
if self.is_submodule_modified(sm.name):
sm_dirty = True
# check state of annex submodules, that might be in direct mode
elif AnnexRepo.is_valid_repo(opj(self.path, sm.path),
allow_noninitialized=False):
sm_repo = AnnexRepo(opj(self.path, sm.path),
create=False, init=False)
sm_status = sm_repo.get_status(untracked=untracked, deleted=deleted,
modified=modified, added=added,
type_changed=type_changed,
submodules=False, path=path)
if any([bool(sm_status[i]) for i in sm_status]):
sm_dirty = True
# check state of submodule, that is a plain git or not an
# initialized annex, which we can safely treat as a plain git, too.
elif GitRepo.is_valid_repo(opj(self.path, sm.path)):
sm_repo = GitRepo(opj(self.path, sm.path))
# TODO: Clarify issue: GitRepo.is_dirty() doesn't fit our parameters
if sm_repo.is_dirty(index=deleted or modified or added or type_changed,
tracked.
`key`
Annex key of a file (if an annex'ed file)
`bytesize`
Size of an annexed file in bytes.
`has_content`
Bool whether a content object for this key exists in the local
annex (with `eval_availability`)
`objloc`
pathlib.Path of the content object in the local annex, if one
is available (with `eval_availability`)
"""
if init is None:
info = OrderedDict()
elif init == 'git':
info = super(AnnexRepo, self).get_content_info(
paths=paths, ref=ref, **kwargs)
else:
info = init
# use this funny-looking option with both find and findref
# it takes care of git-annex reporting on any known key, regardless
# of whether or not it actually (did) exist in the local annex
opts = ['--copies', '0']
if ref:
cmd = 'findref'
opts.append(ref)
else:
cmd = 'find'
# stringify any pathobjs
opts.extend([str(p) for p in paths]
if paths else ['--include', '*'])
for j in self._run_annex_command_json(cmd, opts=opts):
# The simplest check first -- exist in both and content is the same.
# Even if content is just a symlink file on windows, the same content
# condition would be correct
if all(map(op.exists, paths)) and all_same(map(md5sum, paths)):
return True
# We first need to find problematic ones which are annexed and
# have no content locally, and take their
keys = []
backends = []
presents = []
for ds in dss:
repo = ds.repo
key = None
present = True
if isinstance(repo, AnnexRepo):
try:
key = repo.get_file_key(relpath)
except FileInGitError:
continue
if not key:
raise ValueError(
"Must have got a key, unexpectedly got %r for %s within %s"
% (key, relpath, ds)
)
# For now the rest (e.g. not tracked) remains an error
if not repo.file_has_content(relpath):
present = False
backends.append(repo.get_key_backend(key))
keys.append(key)
presents.append(present)
def get_urls(self, file_, key=False, batch=False):
    """Get the URLs git-annex knows for a file (or an annex key).

    Looks up the whereis information for ``file_`` and returns the URLs
    registered with the annex "web" special remote.

    Parameters
    ----------
    file_ : str
      Path of the file to query (or an annex key, if `key` is True).
    key : bool, optional
      Whether `file_` is actually an annex key rather than a file path.
    batch : bool, optional
      Whether to use batched execution of the underlying annex command.

    Returns
    -------
    list of str
      URLs recorded for the file/key under the web special remote.

    Raises
    ------
    KeyError
      If the web special remote (``AnnexRepo.WEB_UUID``) has no record
      for the given file/key.
    """
    # BUG FIX: `key` was accepted and documented but never forwarded to
    # whereis(), so key-based queries silently behaved like path queries.
    # NOTE(review): assumes self.whereis() accepts a `key` keyword, as in
    # DataLad's AnnexRepo.whereis -- confirm against the class definition.
    return self.whereis(
        file_, output='full', key=key, batch=batch
    )[AnnexRepo.WEB_UUID]['urls']
def size(self):
"""Size of the node computed based on its type"""
type_ = self.type_
sizes = {'total': 0.0,
'ondisk': 0.0,
'git': 0.0,
'annex': 0.0,
'annex_worktree': 0.0}
if type_ in ['file', 'link', 'link-broken']:
# if node is under annex, ask annex for node size, ondisk_size
if isinstance(self.repo, AnnexRepo) and self.repo.is_under_annex(self._path):
size = self.repo.info(self._path, batch=True)['size']
ondisk_size = size \
if self.repo.file_has_content(self._path) \
else 0
# else ask fs for node size (= ondisk_size)
else:
size = ondisk_size = 0 \
if type_ == 'link-broken' \
else lstat(self.symlink or self._path).st_size
sizes.update({'total': size, 'ondisk': ondisk_size})
if self.repo.path == self._path:
sizes.update({'git': self.git_local_size,
'annex': self.annex_local_size,
'annex_worktree': self.annex_worktree_size})