Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _json_http(
uri,
body=None,
headers={},
method='POST',
timeout=120.0
):
pool = urllib3.PoolManager(timeout=timeout, retries=urllib3.util.retry.Retry(15))
headers.update({'Content-Type': 'application/json', 'Connection': 'close'})
if body is not None and not isinstance(body, str):
body = json.dumps(body).encode('utf-8')
print '[Request]: %s url=%s, headers=%s, body=%s' % (method, uri, headers, body)
if body:
headers['Content-Length'] = len(body)
rsp = pool.request(method, uri, body=body, headers=headers)
else:
rsp = pool.request(method, uri, headers=headers)
print '[Response to %s %s]: status: %s, body: %s' % (method, uri, rsp.status, rsp.data)
return rsp
def test_connection_error_retries(self):
""" ECONNREFUSED error should raise a connection error, with retries """
port = find_unused_port()
pool = HTTPConnectionPool(self.host, port)
try:
pool.request('GET', '/', retries=Retry(connect=3))
self.fail("Should have failed with a connection error.")
except MaxRetryError as e:
self.assertTrue(isinstance(e.reason, ProtocolError))
self.assertEqual(e.reason.args[1].errno, errno.ECONNREFUSED)
def test_method_whitelist_with_status_forcelist(self):
# Falsey method_whitelist means to retry on any method.
retry = Retry(status_forcelist=[500], method_whitelist=None)
assert retry.is_retry("GET", status_code=500)
assert retry.is_retry("POST", status_code=500)
# Criteria of method_whitelist and status_forcelist are ANDed.
retry = Retry(status_forcelist=[500], method_whitelist=["POST"])
assert not retry.is_retry("GET", status_code=500)
assert retry.is_retry("POST", status_code=500)
def test_retry_default_remove_headers_on_redirect(self):
retry = Retry()
assert list(retry.remove_headers_on_redirect) == ["authorization"]
def test_default_method_whitelist_retried(self):
""" urllib3 should retry methods in the default method whitelist """
with HTTPConnectionPool(self.host, self.port) as pool:
retry = Retry(total=1, status_forcelist=[418])
resp = pool.request(
"OPTIONS",
"/successful_retry",
headers={"test-name": "test_default_whitelist"},
retries=retry,
)
assert resp.status == 200
def retry(self, retries=3, backoff_factor=0.3, status_forcelist=(500, 502, 504)):
"""Add retry to Requests Session
https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.retry.Retry
"""
retries = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
)
# mount all https requests
self.mount('https://', adapters.HTTPAdapter(max_retries=retries))
@pytest.mark.parametrize('value', [
"-1",
"+1",
"1.0",
six.u("\xb2"), # \xb2 = ^2
])
def test_parse_retry_after_invalid(self, value):
retry = Retry()
with pytest.raises(InvalidHeader):
retry.parse_retry_after(value)
print self.server_uri
# build headers once
self.headers = urllib3.make_headers( keep_alive=True,
user_agent='hubble-hec/{0}'.format(__version__),
accept_encoding=True)
self.headers.update({ 'Content-Type': 'application/json',
'Authorization': 'Splunk {0}'.format(self.token) })
# retries can be made much more flexible than shown here theoretically,
# it could take the load off overloaded servers through the backoff and
# improve overall throughput at those (usually transient) bottlenecks
# -- the number 3 was chosen essentially at random
pm_kw = {
'timeout': self.timeout,
'retries': urllib3.util.retry.Retry(
total=5, redirect=10, backoff_factor=3,
connect=self.timeout, read=self.timeout,
respect_retry_after_header=True)
}
if http_event_collector_SSL_verify:
pm_kw.update({'cert_reqs': 'CERT_REQUIRED', 'ca_certs': certifi.where()})
else:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
if self.proxy:
self.pool_manager = urllib3.ProxyManager(self.proxy, **pm_kw)
else:
self.pool_manager = urllib3.PoolManager(**pm_kw)
self.queue = DiskQueue()
def _load_adapter(self, max_retries=None):
if max_retries is None and self._adapter is not None:
return self._adapter
max_retries = max_retries or 0
adapter = HTTPAdapter()
adapter.max_retries = Retry(
total=max_retries,
read=max_retries,
connect=max_retries,
backoff_factor=self.backoff_factor,
status_forcelist=self.status_forcelist,
)
return adapter
client = BackendApplicationClient(
client_id=self._config["client_id"], scope=self._config["scope"]
)
self._http_client = RefreshingOAuth2Session(
client=client,
scope=self._config["scope"],
auto_refresh_url=self._config["token_url"],
auto_refresh_kwargs={
"client_id": self._config["client_id"],
"client_secret": self._config["client_secret"],
},
token_updater=self._save_token,
)
# Register retry handling for Connection errors and 502, 503, 504.
retry = Retry(status=3, connect=3, status_forcelist=[502, 503, 504])
adapter = HTTPAdapter(max_retries=retry)
self._http_client.mount("http://", adapter)
self._http_client.mount("https://", adapter)
if token:
self._http_client.token = token
else:
token = self._http_client.fetch_token(
token_url=self._config["token_url"],
scope=self._config["scope"],
client_id=self._config["client_id"],
client_secret=self._config["client_secret"],
)
self._save_token(token)
self.api_clients = ApiClientService(self)