Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_retry_policy():
    """Build the retry policy object selected by ``conf.retry_policy()``.

    Returns:
        A ``LinearRetryPolicy`` (5 seconds of sleep, 5 retries) when the
        configured value equals ``LINEAR_RETRY``; a ``ConfigurableRetryPolicy``
        built from ``conf.retry_seconds_to_sleep_list()`` and
        ``conf.configurable_retry_policy_max_retries()`` when it equals
        ``CONFIGURABLE_RETRY``.

    Raises:
        BadUserConfigurationException: for any other configured value.
    """
    selected = conf.retry_policy()

    if selected == LINEAR_RETRY:
        return LinearRetryPolicy(seconds_to_sleep=5, max_retries=5)

    if selected == CONFIGURABLE_RETRY:
        sleep_schedule = conf.retry_seconds_to_sleep_list()
        retry_limit = conf.configurable_retry_policy_max_retries()
        return ConfigurableRetryPolicy(
            retry_seconds_to_sleep_list=sleep_schedule,
            max_retries=retry_limit,
        )

    # Anything else is a configuration error the user has to fix.
    raise BadUserConfigurationException(u"Retry policy '{}' not supported".format(selected))
def _credentials_override(f):
    """Return the credentials produced by ``f`` with the password resolved.

    ``f`` is called with no arguments and must return a dict-like object.
    Only the ``username``, ``password``, ``url`` and ``auth`` entries are
    copied into the result (missing entries become ``None``).

    If ``base64_password`` is set in the credentials, it is base64-decoded and
    stored as the result's ``password``; otherwise the plain ``password``
    entry is used unchanged. If ``auth`` is ``None``, it is filled in by
    ``get_auth_value(username, password)`` using the resolved password.

    Raises:
        BadUserConfigurationException: if ``base64_password`` cannot be
            base64-decoded or the decoded bytes cannot be decoded to text.
    """
    credentials = f()
    resolved = {key: credentials.get(key) for key in ('username', 'password', 'url', 'auth')}

    encoded_password = credentials.get('base64_password')
    if encoded_password is not None:
        try:
            resolved['password'] = base64.b64decode(encoded_password).decode()
        except Exception as error:
            # Bind the exception directly instead of unpacking sys.exc_info():
            # keeping the traceback in a local creates a reference cycle and
            # shadowed the stdlib ``traceback`` module name.
            msg = "base64_password for %s contains invalid base64 string: %s %s" % (
                f.__name__, type(error), error)
            raise BadUserConfigurationException(msg)

    # Derive auth only after the password is final, since it is an input here.
    if resolved['auth'] is None:
        resolved['auth'] = get_auth_value(resolved['username'], resolved['password'])
    return resolved
def get_livy_kind(language):
    """Map a language constant to the matching session kind constant.

    ``LANG_SCALA`` maps to ``SESSION_KIND_SPARK``, ``LANG_PYTHON`` to
    ``SESSION_KIND_PYSPARK`` and ``LANG_R`` to ``SESSION_KIND_SPARKR``.

    Raises:
        BadUserConfigurationException: if ``language`` equals none of them.
    """
    # Ordered pairs, compared with == in the same order as the original chain.
    language_to_kind = (
        (LANG_SCALA, SESSION_KIND_SPARK),
        (LANG_PYTHON, SESSION_KIND_PYSPARK),
        (LANG_R, SESSION_KIND_SPARKR),
    )
    for known_language, session_kind in language_to_kind:
        if language == known_language:
            return session_kind
    raise BadUserConfigurationException("Cannot get session kind for {}.".format(language))
def __init__(self, endpoint, headers, retry_policy):
    """Store the connection settings and prepare authentication and SSL options.

    Args:
        endpoint: object exposing ``auth``, ``username`` and ``password``.
        headers: stored as ``self._headers``.
        retry_policy: stored as ``self._retry_policy``.

    Raises:
        BadUserConfigurationException: if ``endpoint.auth`` is not one of
            ``constants.AUTH_KERBEROS``, ``constants.AUTH_BASIC`` or
            ``constants.NO_AUTH``.
    """
    self._endpoint = endpoint
    self._headers = headers
    self._retry_policy = retry_policy

    # Define _auth up front so the NO_AUTH path leaves it as None instead of
    # leaving the attribute missing (which would raise AttributeError on read).
    self._auth = None
    auth_kind = self._endpoint.auth
    if auth_kind == constants.AUTH_KERBEROS:
        self._auth = HTTPKerberosAuth(mutual_authentication=REQUIRED)
    elif auth_kind == constants.AUTH_BASIC:
        self._auth = (self._endpoint.username, self._endpoint.password)
    elif auth_kind != constants.NO_AUTH:
        raise BadUserConfigurationException(u"Unsupported auth %s" % auth_kind)

    self.logger = SparkLog(u"ReliableHttpClient")

    self.verify_ssl = not conf.ignore_ssl_errors()
    if not self.verify_ssl:
        self.logger.debug(u"ATTENTION: Will ignore SSL errors. This might render you vulnerable to attacks.")
        # NOTE(review): placed inside this branch on the assumption that
        # warnings are silenced only when SSL verification is off; the
        # original indentation was lost, so confirm against the upstream file.
        requests.packages.urllib3.disable_warnings()