def _revs_as_results(dset, revs):
    for rev in revs:
        res = get_status_dict("run", ds=dset, commit=rev)
        full_msg = dset.repo.format_commit("%B", rev)
        try:
            msg, info = get_run_info(dset, full_msg)
        except ValueError as exc:
            # Recast the error so the message includes the revision.
            raise ValueError(
                "Error on {}'s message: {}".format(rev, exc_str(exc)))
        if info is not None:
            res["run_info"] = info
            res["run_message"] = msg
        yield dict(res, status="ok")
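The recast in the except clause above keeps the offending revision visible to the caller. A small standalone sketch of the same pattern, with a hypothetical parser standing in for get_run_info:

def parse_run_record(message):
    # hypothetical stand-in for get_run_info(); always fails in this demo
    raise ValueError("cannot parse [DATALAD RUNCMD] record")

rev = "a1b2c3d"
try:
    parse_run_record("an ordinary commit message")
except ValueError as exc:
    # re-raise with the revision folded into the message
    raise ValueError("Error on {}'s message: {}".format(rev, exc))
# -> ValueError: Error on a1b2c3d's message: cannot parse [DATALAD RUNCMD] record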
        # NOTE: this excerpt begins mid-expression.  The head of the call is
        # reconstructed here as an assumption: the command (tokenised with
        # shlex if given as a string) is registered with the protocol,
        # matching the end_section() call in the finally block below.
        if self.protocol.records_ext_commands:
            prot_exc = None
            prot_id = self.protocol.start_section(
                shlex.split(cmd, posix=not on_windows)
                if isinstance(cmd, string_types)
                else cmd)
        try:
            proc = subprocess.Popen(cmd,
                                    stdout=outputstream,
                                    stderr=errstream,
                                    shell=shell,
                                    cwd=popen_cwd,
                                    env=popen_env,
                                    stdin=stdin)
        except Exception as e:
            prot_exc = e
            lgr.log(11, "Failed to start %r%r: %s" %
                    (cmd, " under %r" % cwd if cwd else '', exc_str(e)))
            raise
        finally:
            if self.protocol.records_ext_commands:
                self.protocol.end_section(prot_id, prot_exc)

        try:
            if log_online:
                out = self._get_output_online(proc,
                                              log_stdout, log_stderr,
                                              outputstream, errstream,
                                              expect_stderr=expect_stderr,
                                              expect_fail=expect_fail)
            else:
                out = proc.communicate()
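The posix=not on_windows flag above changes how quoting is handled when a command string is tokenised. A standalone illustration, not tied to any particular DataLad command:

import shlex

cmd = 'git commit -m "initial commit"'
# POSIX-style splitting strips the quotes from the message argument ...
print(shlex.split(cmd, posix=True))   # ['git', 'commit', '-m', 'initial commit']
# ... while non-POSIX splitting (the Windows case above) keeps them verbatim
print(shlex.split(cmd, posix=False))  # ['git', 'commit', '-m', '"initial commit"']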
def _get_system_ssh_version():
    """Return the version of the ssh available system-wide

    Annex prior to 20170302 used a bundled version, but now uses the system
    one, if installed.
    """
    try:
        out, err = _runner.run('ssh -V'.split(),
                               expect_fail=True, expect_stderr=True)
        # ssh -V apparently prints to stderr, but do not trust that blindly
        if err.startswith('OpenSSH'):
            out = err
        assert out.startswith('OpenSSH')  # that is the only one we care about atm
        return out.split(' ', 1)[0].rstrip(',.').split('_')[1]
    except CommandError as exc:
        lgr.debug("Could not determine version of ssh available: %s", exc_str(exc))
        return None
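The return expression above extracts just the version component from the ssh -V banner. A standalone sketch on an example banner string (the banner text is illustrative, not captured from a real run):

err = "OpenSSH_8.9p1 Ubuntu-3ubuntu0.6, OpenSSL 3.0.2 15 Mar 2022"
# take the first token, drop trailing punctuation, keep what follows "OpenSSH_"
version = err.split(' ', 1)[0].rstrip(',.').split('_')[1]
print(version)  # 8.9p1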
        if res_filter is not None:
            # Don't add result_filter if it's None because then
            # eval_results can't distinguish between --report-{status,type}
            # not specified via the CLI and None passed via the Python API.
            kwargs['result_filter'] = res_filter
        kwargs['proc_pre'] = args.common_proc_pre
        kwargs['proc_post'] = args.common_proc_post
        try:
            ret = cls.__call__(**kwargs)
            if inspect.isgenerator(ret):
                ret = list(ret)
            if args.common_output_format == 'tailored' and \
                    hasattr(cls, 'custom_result_summary_renderer'):
                cls.custom_result_summary_renderer(ret)
            return ret
        except KeyboardInterrupt as exc:
            ui.error("\nInterrupted by user while doing magic: %s" % exc_str(exc))
            if cls._interrupted_exit_code is not None:
                sys.exit(cls._interrupted_exit_code)
            else:
                raise
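The isgenerator() check above matters because commands may return lazily evaluated generators; nothing runs until they are consumed. A minimal standalone sketch with a stand-in command:

import inspect

def fake_command():
    # stand-in for cls.__call__; yields result records lazily
    yield {'action': 'example', 'status': 'ok'}

ret = fake_command()
if inspect.isgenerator(ret):
    ret = list(ret)   # drain the generator so all results materialise
print(ret)  # [{'action': 'example', 'status': 'ok'}]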
def teardown_package():
    import os
    from datalad.tests.utils import rmtemp, OBSCURE_FILENAME

    lgr.debug("Printing versioning information collected so far")
    from datalad.support.external_versions import external_versions as ev
    print(ev.dumps(query=True))
    try:
        print("Obscure filename: str=%s repr=%r"
              % (OBSCURE_FILENAME.encode('utf-8'), OBSCURE_FILENAME))
    except UnicodeEncodeError as exc:
        from .dochelpers import exc_str
        print("Obscure filename failed to print: %s" % exc_str(exc))

    def print_dict(d):
        return " ".join("%s=%r" % v for v in d.items())

    print("Encodings: %s" % print_dict(get_encoding_info()))
    print("Environment: %s" % print_dict(get_envvars_info()))

    if os.environ.get('DATALAD_TESTS_NOTEARDOWN'):
        return
    from datalad.ui import ui
    from datalad import consts

    ui.set_backend(_test_states['ui_backend'])
    if _test_states['loglevel'] is not None:
        lgr.setLevel(_test_states['loglevel'])
    if _test_states['DATALAD_LOG_LEVEL'] is None:
        os.environ.pop('DATALAD_LOG_LEVEL')
    else:
        os.environ['DATALAD_LOG_LEVEL'] = _test_states['DATALAD_LOG_LEVEL']
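The final block above distinguishes "variable was never set" (saved value None) from "variable was set". The same save/restore pattern in isolation, with the variable name used only for illustration:

import os

saved = os.environ.get('DATALAD_LOG_LEVEL')   # None means it was not set at all
os.environ['DATALAD_LOG_LEVEL'] = 'debug'     # temporary override, e.g. for tests
if saved is None:
    os.environ.pop('DATALAD_LOG_LEVEL')       # restore "unset"
else:
    os.environ['DATALAD_LOG_LEVEL'] = saved   # restore the previous value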
        # Map standard BIDS terms onto DataLad metadata terms.  (The excerpt
        # starts inside this mapping; earlier term pairs are elided.)
        for bidsterm, dataladterm in (...,
                                      ('Description', 'description')):
            if bidsterm in bids:
                meta[dataladterm] = bids[bidsterm]

        README_fname = opj(self.ds.path, 'README')
        if not meta.get('description') and exists(README_fname):
            # BIDS uses the README to provide a description, so if one was not
            # explicitly given (possibly to override a longer README), just
            # load the README
            try:
                desc = open(README_fname, encoding="utf-8").read()
            except UnicodeDecodeError as exc:
                lgr.warning(
                    "Failed to decode content of %s. "
                    "Re-loading allowing for UTF-8 errors with replacement: %s"
                    % (README_fname, exc_str(exc))
                )
                desc = open(README_fname, encoding="utf-8", errors="replace").read()
            meta['description'] = desc.strip()

        compliance = ["http://docs.datalad.org/metadata.html#v0-1"]
        # special case
        if bids.get('BIDSVersion'):
            compliance.append(
                'http://bids.neuroimaging.io/bids_spec{}.pdf'.format(
                    bids['BIDSVersion'].strip()))
        else:
            compliance.append('http://bids.neuroimaging.io')
        meta['dcterms:conformsTo'] = compliance
        return meta
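The UnicodeDecodeError fallback above relies on decoding with errors="replace". A standalone sketch with an in-memory byte string (the content is made up):

raw = b"T1-weighted images \xff acquired at 3T"   # \xff is not valid UTF-8
try:
    desc = raw.decode("utf-8")
except UnicodeDecodeError:
    # invalid bytes become U+FFFD replacement characters instead of failing
    desc = raw.decode("utf-8", errors="replace")
print(desc.strip())  # T1-weighted images � acquired at 3T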
    def __getitem__(self, url):
        try:
            return self.cookies_db[self._get_provider(url)]
        except Exception as exc:
            lgr.warning("Failed to get a cookie for %s: %s",
                        url, exc_str(exc))
            return None
        try:
            ...  # (download attempt elided in this excerpt)
        except AccessDeniedError as e:
            if isinstance(e, AnonymousAccessDeniedError):
                access_denied = "Anonymous"
            else:
                access_denied = "Authenticated"
            lgr.debug("%s access was denied: %s", access_denied, exc_str(e))
            supported_auth_types = e.supported_types
            exc_info = sys.exc_info()
        except IncompleteDownloadError as e:
            exc_info = sys.exc_info()
            incomplete_attempt += 1
            if incomplete_attempt > 5:
                # give up
                raise
            lgr.debug("Failed to download fully, will try again: %s", exc_str(e))
            # TODO: maybe fail earlier than after 20 attempts in such a case?
        except DownloadError:
            # TODO: handle some known ones, possibly allow for a few retries;
            # otherwise just let it go!
            raise

        msg_types = ''
        if supported_auth_types:
            msg_types = " The failure response indicated that the following " \
                        "authentication types should be used: %s" \
                        % (', '.join(supported_auth_types))

        if access_denied:  # moved logic outside of except for clarity
            # TODO: what if it was an anonymous attempt without authentication,
            # so it is not "requires_authentication" but rather
            # "supports_authentication"?  We should not report below in
            # _get_new_credential that authentication has failed then, since
            # there was no authentication.  We might need a custom exception to
            # be caught above for that
def _get_tokens_for_login(login, tokens):
    selected_tokens = []
    for t in tokens:
        try:
            g = gh.Github(t)
            gu = g.get_user()
            if gu.login == login:
                selected_tokens.append(t)
        except gh.BadCredentialsException as exc:
            lgr.debug(
                "Token %s caused %s while trying to check the token's user"
                " login name. Skipping", _token_str(t), exc_str(exc))
    lgr.debug(
        "Selected %d tokens out of %d for the login %s",
        len(selected_tokens), len(tokens), login
    )
    return selected_tokens
        try:
            headers = next(csvrows)
        except StopIteration:
            raise ValueError("Failed to read CSV rows from {}".format(stream))
        lgr.debug("Taking %s fields from first line as headers: %s",
                  len(headers), headers)
        idx_map = dict(enumerate(headers))
        rows = [dict(zip(headers, r)) for r in csvrows]
    elif input_type == "json":
        import json
        try:
            rows = json.load(stream)
        except getattr(json.decoder, "JSONDecodeError", ValueError) as e:
            # ^ py2 compatibility kludge.
            raise ValueError(
                "Failed to read JSON from stream {}: {}"
                .format(stream, exc_str(e)))
        # For JSON input, we do not support indexing by position,
        # only by name.
        idx_map = {}
    else:
        raise ValueError("input_type must be 'csv', 'json', or 'ext'")
    return rows, idx_map
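For the csv branch above, headers come from the first row, idx_map lets fields be addressed by position, and every remaining row becomes a name-to-value dict. A standalone sketch on a tiny in-memory table (example data only):

import csv
import io

stream = io.StringIO("name,url\nfileA,http://example.com/a\nfileB,http://example.com/b\n")
csvrows = csv.reader(stream)
headers = next(csvrows)
idx_map = dict(enumerate(headers))
rows = [dict(zip(headers, r)) for r in csvrows]
print(idx_map)  # {0: 'name', 1: 'url'}
print(rows[0])  # {'name': 'fileA', 'url': 'http://example.com/a'}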