Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
else:
del self.headers['cookie']
def urlopen(self, method, url, body=None, headers=None,
retries=None, *args, **kwargs):
if hasattr(retries, 'retry_cookie'):
self._add_test_cookie(retries, headers)
response = super(TestHTTPConnectionPool, self).urlopen(
method, url, body, headers, retries, *args, **kwargs)
if hasattr(retries, 'retry_cookie'):
self._remove_test_cookie(retries, headers)
return response
class TestAdapter(HTTPAdapter):
def init_poolmanager(self, connections, maxsize, block=False):
self.poolmanager = PoolManager(
num_pools=connections,
maxsize=maxsize,
block=block)
context = {
"scheme": "http",
"host": "localhost",
"port": 3000,
"block": block,
"maxsize": maxsize
}
test_hosts = [_default_key_normalizer(PoolKey, context)]
for host in test_hosts:
self.poolmanager.pools[host] = \
def _get_session_with_retries(self):
session = requests.Session()
retries = Retry(total=3, backoff_factor=1)
session.mount('http://', HTTPAdapter(max_retries=retries))
session.mount('https://', HTTPAdapter(max_retries=retries))
return session
else:
path = path.rsplit('/', 1)[0] + '/'
new_key = urlparse.urlunsplit((
parsed.scheme,
parsed.netloc,
path,
parsed.query,
parsed.fragment
))
if new_key == key:
return None
key = new_key
class _FingerprintAdapter(requests.adapters.HTTPAdapter):
def __init__(self, fingerprint=None, **kwargs):
self.fingerprint = str(fingerprint)
super(_FingerprintAdapter, self).__init__(**kwargs)
def init_poolmanager(self, connections, maxsize, block=False):
self.poolmanager = PoolManager(num_pools=connections,
maxsize=maxsize,
block=block,
assert_fingerprint=self.fingerprint)
def request(method, url, data=None, headers=None, auth=None, verify=None,
session=None, latin1_fallback=True, verify_fingerprint=None):
'''
Wrapper method for requests, to ease logging and mocking. Parameters should
be the same as for ``requests.request``, except:
def init_connection_pools(self):
self.request_session = RequestSession()
retry = Retry(**self.settings["requests"]["retries"])
for protocol in ("http", "https"):
self.request_session.mount(
f"{protocol}://",
HTTPAdapter(max_retries=retry, **self.settings["requests"]["pool"]),
)
def make_session(self):
"""Authenticate and get the name of assigned SFDC data server"""
with connect_lock:
if self._sf_session is None:
sf_session = requests.Session()
# TODO configurable class Salesforce***Auth
sf_session.auth = SalesforcePasswordAuth(db_alias=self.alias,
settings_dict=self.settings_dict)
sf_instance_url = sf_session.auth.instance_url
sf_requests_adapter = HTTPAdapter(max_retries=get_max_retries())
sf_session.mount(sf_instance_url, sf_requests_adapter)
# Additional headers work, but the same are added automatically by "requests' package.
# sf_session.header = {'accept-encoding': 'gzip, deflate', 'connection': 'keep-alive'} # TODO
self._sf_session = sf_session
def _create_session(self):
import requests
self._session = requests.Session()
if self._pool_size > 0:
adapter = requests.adapters.HTTPAdapter(pool_connections=self._pool_size,
pool_maxsize=self._pool_size)
self._session.mount('http://', adapter)
self._session.mount('https://', adapter)
self._session.auth = self.auth
def __init__(self, host='localhost', port=8332, tls=False, max_retries=3):
self.host = host
self.port = port
self.tls = tls
self.session = requests.Session()
self.session.mount(self.host, HTTPAdapter(max_retries=max_retries))
self.headers = {'Content-Type': 'application/json'}
self.scheme = 'https' if self.tls else 'http'
self.url = '{}://{}:{}'.format(self.scheme, self.host, self.port)
def rebuild(self):
if self.session:
log.info('Rebuilding session and connection pools...')
# Build the connection pool
self.session = requests.Session()
self.session.proxies = self.proxies
# Mount adapters
self.session.mount('http://', HTTPAdapter(**self.adapter_kwargs))
self.session.mount('https://', HTTPSAdapter(ssl_version=self._ssl_version, **self.adapter_kwargs))
return self.session
def requests_retry_session(
retries=3,
backoff_factor=0.3,
status_forcelist=(500, 502, 504),
session=None,
):
session = session or requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session