def initiate(self):
    if self._initiated:
        return
    self._initiated = True
    d = opj(self.repopath, '.git', 'bin')
    if not exists(d):
        os.makedirs(d)
    suf = '-' + self.custom_remote_name.rstrip(':') if self.custom_remote_name else ''
    self._file = _file = opj(d, 'git-annex-remote-datalad' + suf)

    if exists(_file):
        lgr.debug("Commenting out previous entries")
        # comment out all the past entries
        with open(_file, 'rb') as f:
            entries = list(map(assure_unicode, f.readlines()))
        for i in range(len(self.HEADER.split(os.linesep)), len(entries)):
            e = entries[i]
            if e.startswith('recv ') or e.startswith('send '):
                entries[i] = '#' + e
        with open(_file, 'wb') as f:
            f.write(u''.join(entries).encode('utf-8'))
        return  # nothing else to be done

    lgr.debug("Initiating protocoling: "
              "cd %s; vim %s"
              % (realpath(self.repopath),
                 _file[len(self.repopath) + 1:]))
@property
def is_extracted(self):
    """Whether the archive was extracted and the stamp is not older than it"""
    return exists(self.path) and exists(self.stamp_path) \
        and os.stat(self.stamp_path).st_mtime >= os.stat(self.path).st_mtime
def _filter_tarinfo(ti):
    # Reset mtime to the commit date; ideally we would use the date of the
    # last commit in which any particular
    # file was changed -- would be the most kosher yoh thinks to the
    # degree of our abilities
    ti.mtime = committed_date
    return ti

tar_args = dict(recursive=False, filter=_filter_tarinfo)
file_extension = '.{}{}'.format(
    archivetype,
    '{}{}'.format(
        '.' if compression else '',
        compression) if archivetype == 'tar' else '')
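# --- Added illustration, not from the original source: a self-contained
# sketch of how a tarinfo `filter` such as `_filter_tarinfo` above is
# consumed by `tarfile`. The file name and timestamp are hypothetical.
import tarfile

def _demo_filter(ti):
    # returning the (possibly modified) TarInfo includes the member;
    # overriding mtime, as above, makes repeated exports byte-comparable
    ti.mtime = 1500000000
    return ti

with tarfile.open('/tmp/demo.tar.gz', 'w:gz') as tar:
    tar.add('some_file.txt', recursive=False, filter=_demo_filter)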
default_filename = "datalad_{.id}".format(dataset)
if filename is None:
    filename = default_filename  # in current directory
elif path.exists(filename) and path.isdir(filename):
    filename = path.join(filename, default_filename)  # under given directory
if not filename.endswith(file_extension):
    filename += file_extension
root = dataset.path
# use dir inside matching the output filename
# TODO: could be an option to the export plugin allowing empty value
# for no leading dir
leading_dir = file_basename(filename)

# workaround for inability to pass down the time stamp
with patch('time.time', return_value=committed_date), \
        tarfile.open(filename, "w:{}".format(compression)) \
        if archivetype == 'tar' \
        else zipfile.ZipFile(
            filename, 'w',
            # the compression argument was truncated in this snippet; a
            # plausible completion: store vs. deflate depending on `compression`
            zipfile.ZIP_STORED if not compression else zipfile.ZIP_DEFLATED
        ) as archive:
    ...  # archive members are added here (body not included in this snippet)
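# --- Added illustration, not from the original source: why `time.time` is
# patched above. The gzip header written by `tarfile.open(..., 'w:gz')` and
# the default member dates used by `zipfile.writestr` both derive from
# `time.time()`, so pinning it to the commit date makes archives
# reproducible. The path and timestamp here are hypothetical.
import zipfile
from unittest.mock import patch  # plain `mock` on PY2

with patch('time.time', return_value=1500000000), \
        zipfile.ZipFile('/tmp/demo.zip', 'w') as zf:
    zf.writestr('hello.txt', 'hello')  # member dated per the patched clock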
lgr.debug(
    "Previous extracted (but probably not fully) cached archive "
    "found. Removing %s",
    path)
rmtree(path)
os.makedirs(path)
assert exists(path)
# remove old stamp
if exists(self.stamp_path):
    rmtree(self.stamp_path)
decompress_file(self._archive, path, leading_directories=None)
# TODO: must be optional since we might want to use this content, move it
# into the tree etc
# lgr.debug("Adjusting permissions to R/O for the extracted content")
# rotree(path)
assert exists(path)
# create a stamp
with open(self.stamp_path, 'wb') as f:
    f.write(assure_bytes(self._archive))
# assert that stamp mtime is not older than archive's directory
assert self.is_extracted
def clean(self, force=False):
    # would interfere with tests
    # if os.environ.get('DATALAD_TESTS_TEMP_KEEP'):
    #     lgr.info("As instructed, not cleaning up the cache under %s"
    #              % self._path)
    #     return

    for path, name in [
            (self._path, 'cache'),
            (self.stamp_path, 'stamp file')]:
        if exists(path):
            if (not self._persistent) or force:
                lgr.debug("Cleaning up the %s for %s under %s",
                          name, self._archive, path)
                # TODO: we must be careful here -- to not modify permissions
                # of files, only of directories
                (rmtree if isdir(path) else unlink)(path)
def decompress_file(archive, dir_, leading_directories='strip'):
    """Decompress `archive` into a directory `dir_`

    Parameters
    ----------
    archive: str
      Path to the archive file
    dir_: str
      Path to the directory to extract into
    leading_directories: {'strip', None}
      If 'strip', and the archive contains a single leading directory under
      which all content is stored, all the content will be moved one
      directory up and that leading directory will be removed.
    """
    if not exists(dir_):
        lgr.debug("Creating directory %s to extract archive into", dir_)
        os.makedirs(dir_)
    with swallow_outputs() as cmo:
        archive = assure_bytes(archive)
        dir_ = assure_bytes(dir_)
        patoolib.util.check_existing_filename(archive)
        patoolib.util.check_existing_filename(dir_, onlyfiles=False)
        # Call the protected helper to avoid the existence checks
        # on the unixified path
        outdir = unixify_path(dir_)
        if not PY2:
            # should be supplied as unicode in PY3 to avoid b''
            outdir = assure_unicode(outdir)
            archive = assure_unicode(archive)
        format_compression = patoolib.get_archive_format(archive)
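# --- Added illustration, not from the original source: usage of
# `decompress_file` as documented above; the paths are hypothetical.
# Flatten a single top-level directory, if the archive has one:
decompress_file('/tmp/data.tar.gz', '/tmp/extracted', leading_directories='strip')
# Preserve the archive's internal layout exactly:
decompress_file('/tmp/data.tar.gz', '/tmp/extracted-raw', leading_directories=None)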
def get_extracted_file(self, afile):
    lgr.debug(u"Requested file {afile} from archive {self._archive}".format(**locals()))
    # TODO: That could be a good place to provide a "compatibility" layer if
    # filenames within the archive are too obscure for the local file system.
    # We could somehow adjust them while extracting and here channel back
    # the "fixed up" names, since they are only to point to the load
    self.assure_extracted()
    path = self.get_extracted_filename(afile)
    # TODO: make robust
    lgr.log(2, "Verifying that %s exists", abspath(path))
    assert exists(path), "%s must exist" % path
    return path
    lock_path=(lambda k: opj(self.repo.path, '.git', 'datalad-archives-%s' % k), (akey,)),
    operation="annex-get"
) as (akey_fpath, lock):
    if lock:
        assert not akey_fpath
        self._annex_get_archive_by_key(akey)
        akey_fpath = self.get_contentlocation(akey)
        if not akey_fpath:
            raise RuntimeError(
                "We were reported to fetch it alright but now can't "
                "get its location. Check logic"
            )

akey_path = opj(self.repo.path, akey_fpath)
assert exists(akey_path), "Key file %s is not present" % akey_path

# Extract that bloody file from the bloody archive
# TODO: implement/use caching, for now a simple one
# actually patool doesn't support extraction of a single file
# https://github.com/wummel/patool/issues/20
# so we extract the whole archive into a cache and serve the file from there
pwd = getpwd()
lgr.debug(u"Getting file {afile} from {akey_path} while PWD={pwd}".format(**locals()))
apath = self.cache[akey_path].get_extracted_file(afile)
link_file_load(apath, path)
self.send('TRANSFER-SUCCESS', cmd, key)
return
except Exception as exc:
    # from celery.contrib import rdb
    # rdb.set_trace()
    exc_ = exc_str(exc)
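# --- Added note, not from the original source: `self.send(...)` above emits
# lines of the git-annex external special remote protocol, e.g. on success
#   TRANSFER-SUCCESS RETRIEVE <key>
# and, should the except branch report failure,
#   TRANSFER-FAILURE RETRIEVE <key> <error message>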