Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def initiate(self):
if self._initiated:
return
self._initiated = True
d = opj(self.repopath, '.git', 'bin')
if not exists(d):
os.makedirs(d)
suf = '-' + self.custom_remote_name.rstrip(':') if self.custom_remote_name else ''
self._file = _file = opj(d, 'git-annex-remote-datalad' + suf)
if exists(_file):
lgr.debug("Commenting out previous entries")
# comment out all the past entries
with open(_file, 'rb') as f:
entries = list(map(assure_unicode, f.readlines()))
for i in range(len(self.HEADER.split(os.linesep)), len(entries)):
e = entries[i]
if e.startswith('recv ') or e.startswith('send '):
entries[i] = '#' + e
with open(_file, 'wb') as f:
def get_extracted_files(self):
    """Generator over the files available under the extracted archive

    Walks the directory returned by `self.assure_extracted()` and yields
    every file found, as a path relative to that directory, passed through
    `assure_unicode`.
    """
    top = self.assure_extracted()
    # Number of leading characters to drop from each full path: the
    # top directory itself plus one separator, unless `top` already
    # ends with a separator.
    skip = len(top)
    if not top.endswith(os.sep):
        skip += len(os.sep)
    for dirpath, _subdirs, filenames in os.walk(top):  # TEMP
        for filename in filenames:
            full_path = opj(dirpath, filename)
            yield assure_unicode(full_path[skip:])
def get_DIRHASH(self, key, full=False):
    """Gets a two level hash associated with a Key.

    Sends a "DIRHASH" request for `key` and reads the "VALUE" reply.
    The reply is something like "abc/def". It is always the same for any
    given Key, so it can be used e.g. for creating hash directory
    structures to store Keys in.

    Parameters
    ----------
    key:
      Key to request the hash directory for.
    full: bool, optional
      If True, return the full DIRHASH path, i.e. the hash directory
      placed under `self.path` and followed by a KEY/ directory.
      Otherwise return only the replied hash directory.
    """
    self.send("DIRHASH", key)
    # read() yields a sequence whose second item is the replied value
    dirhash = self.read("VALUE", 1)[1]
    if not full:
        return dirhash
    return opj(self.path, dirhash, key)
def get_archive(self, archive):
    """Return the ExtractedArchive for `archive`, creating it on first use

    The archive path is first normalized via
    `self._get_normalized_archive_path`; the normalized path is the key
    under which the instance is cached in `self._archives`.
    """
    normalized = self._get_normalized_archive_path(archive)
    known = self._archives
    if normalized not in known:
        # first request for this archive: set up its extraction location
        # under self.path and remember the instance for later calls
        extraction_dir = opj(self.path, _get_cached_filename(normalized))
        known[normalized] = ExtractedArchive(
            normalized,
            extraction_dir,
            persistent=self.persistent)
    return known[normalized]
of the result (we are asking the location for the same archive key often)
"""
if key not in self._contentlocations:
fpath = self.repo.get_contentlocation(key, batch=True)
if fpath: # shouldn't store empty ones
self._contentlocations[key] = fpath
else:
fpath = self._contentlocations[key]
# but verify that it exists
if verify_exists and not lexists(opj(self.path, fpath)):
# prune from cache
del self._contentlocations[key]
fpath = ''
if absolute and fpath:
return opj(self.path, fpath)
else:
return fpath
def _get_normalized_archive_path(self, archive):
    """Return full path to archive

    So we have consistent operation from different subdirs,
    while referencing archives from the topdir

    An absolute `archive`, or any `archive` when `self._toppath` is not
    set, is returned unchanged.  Otherwise `archive` is interpreted
    relative to `self._toppath` and the normalized joined path is returned.

    Raises
    ------
    RuntimeError
      If the resulting path points outside of `self._toppath`, or if it
      is an existing directory.

    TODO: why do we need it???
    """
    if isabs(archive) or not self._toppath:
        return archive
    out = normpath(opj(self._toppath, archive))
    rel = relpath(out, self._toppath)
    # Compare on a path-component boundary: a plain startswith(pardir)
    # would also reject legitimate names inside the topdir which merely
    # begin with "..", e.g. "..hidden.tar".  opj(pardir, '') is pardir
    # followed by the path separator.
    if rel == pardir or rel.startswith(opj(pardir, '')):
        raise RuntimeError("%s points outside of the topdir %s"
                           % (archive, self._toppath))
    if isdir(out):
        raise RuntimeError("got a directory here... bleh")
    return out
for root, dirs, files in os.walk(dir_, followlinks=False):
for d in dirs:
subdir = opj(root, d)
os.chmod(subdir,
os.stat(subdir).st_mode |
os.path.stat.S_IEXEC)
if leading_directories == 'strip':
_, dirs, files = next(os.walk(dir_))
if not len(files) and len(dirs) == 1:
# move all the content under dirs[0] up 1 level
widow_dir = opj(dir_, dirs[0])
lgr.debug("Moving content within %s upstairs" % widow_dir)
subdir, subdirs_, files_ = next(os.walk(opj(dir_, dirs[0])))
for f in subdirs_ + files_:
os.rename(opj(subdir, f), opj(dir_, f))
rmdir(widow_dir)
elif leading_directories is None:
pass # really do nothing
else:
raise NotImplementedError("Not supported %s" % leading_directories)
if (os.environ.get('DATALAD_USE_DEFAULT_GIT', '0').lower()
in ('1', 'on', 'true', 'yes')):
git_fpath = find_executable("git")
if git_fpath:
GitRunner._GIT_PATH = ''
lgr.log(9, "Will use default git %s", git_fpath)
return # we are done - there is a default git avail.
# if not -- we will look for a bundled one
annex_fpath = find_executable("git-annex")
if not annex_fpath:
# not sure how to live further anyways! ;)
alongside = False
else:
annex_path = op.dirname(op.realpath(annex_fpath))
alongside = op.lexists(op.join(annex_path, 'git'))
GitRunner._GIT_PATH = annex_path if alongside else ''
lgr.log(9, "Will use git under %r (no adjustments to PATH if empty "
"string)", GitRunner._GIT_PATH)
assert(GitRunner._GIT_PATH is not None) # we made the decision!
def get_max_path_length(top_path=None, maxl=1000):
    """Deduce the maximal length of the filename in a given path

    Probes by creating (and then removing) sample files with progressively
    longer names under `top_path`, stopping at the first failure.

    Parameters
    ----------
    top_path: str, optional
      Directory to probe.  If not provided (or empty), the result of
      `getpwd()` is used.
    maxl: int, optional
      Upper bound for the full path lengths to try.

    Returns
    -------
    int or None
      Length of the longest full path for which a sample file could be
      created, or None if none could be created.
    """
    if not top_path:
        top_path = getpwd()
    import random
    from datalad import lgr
    from datalad.dochelpers import exc_str
    from datalad.support import path

    # randomized stem for the sample file name
    stem = path.join(top_path, "dl%d" % random.randint(1, 100000))
    longest_ok = None
    # some smart folks could implement binary search for this
    for padding in range(maxl - len(stem)):
        candidate = stem + '_' * padding
        candidate_length = len(candidate)
        try:
            with open(candidate, 'w'):
                longest_ok = candidate_length
        except Exception as exc:
            lgr.debug(
                "Failed to create sample file for length %d. Last succeeded was %s. Exception: %s",
                candidate_length, longest_ok, exc_str(exc))
            break
        else:
            # sample file was created -- clean it up before trying a longer one
            unlink(candidate)
    return longest_ok
def _get_key_path(self, key):
    """Return path to the KEY file

    Runs `git-annex contentlocation KEY` via `self.runner` with
    `self.path` as the working directory, and joins its output (trailing
    line separator characters stripped) onto `self.path`.
    """
    # TODO: should actually be implemented by AnnexRepo
    # Command is available in annex >= 20140410
    command = ['git-annex', 'contentlocation', key]
    stdout, _stderr = self.runner(command, cwd=self.path)
    # TODO: it would exit with non-0 if key is not present locally.
    # we need to catch and throw our exception
    location = stdout.rstrip(os.linesep)
    return opj(self.path, location)