Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_headers():
    """Check that ProxyInfo exposes the proxy_headers it was built with."""
    expected_headers = {"key0": "val0", "key1": "val1"}
    proxy_info = httplib2.ProxyInfo(
        httplib2.socks.PROXY_TYPE_HTTP,
        "localhost",
        1234,
        proxy_headers=expected_headers,
    )
    # The headers mapping must come back unchanged through the attribute.
    assert proxy_info.proxy_headers == expected_headers
def ProxyInfoFromEnvironmentVar(proxy_env_var):
    """Reads proxy info from the environment and converts to httplib2.ProxyInfo.

    Args:
      proxy_env_var: Environment variable string to read, such as http_proxy or
        https_proxy.

    Returns:
      httplib2.ProxyInfo constructed from the environment string. If the
      variable is unset or empty, or its name does not start with "http", a
      ProxyInfo with host None and port 0 is returned instead.
    """
    proxy_url = os.environ.get(proxy_env_var)
    env_name = proxy_env_var.lower()
    if not proxy_url or not env_name.startswith('http'):
        return httplib2.ProxyInfo(httplib2.socks.PROXY_TYPE_HTTP, None, 0)

    # The protocol is the part of the variable name before the first
    # underscore, e.g. "https" for "https_proxy".
    proxy_protocol = env_name.split('_')[0]

    # proxy_info_from_url requires a protocol, which is always http or https.
    # Test for the full "scheme://" prefix: a bare host such as
    # "httpproxy.example.com:8080" also starts with "http" but still needs
    # the protocol prepended.
    if not proxy_url.lower().startswith(('http://', 'https://')):
        proxy_url = proxy_protocol + '://' + proxy_url
    return httplib2.proxy_info_from_url(proxy_url, method=proxy_protocol)
def configure_proxy(self):
    """Configure a proxy so that the API accepts it (requires httplib2).

    ``self.proxy`` is split on ":" and is expected to have the form
    ``scheme://host:port``; the scheme itself is ignored and the proxy is
    always created as an HTTP proxy.

    Returns:
        An ``httplib2.ProxyInfo`` for the configured proxy, or ``None`` when
        ``self.proxy`` is ``None``.

    Raises:
        IndexError: if ``self.proxy`` does not contain both a host and a port.
        ValueError: if the port part is not numeric.
    """
    if self.proxy is None:
        return None

    parts = self.proxy.split(":")
    # parts[1] is "//host"; drop the slashes but keep the host intact so that
    # DNS names work as well as dotted IPv4 addresses.
    proxy_host = parts[1].lstrip("/")
    # The port must be an integer by the time it reaches the socket layer;
    # tolerate a trailing "/" after the port.
    proxy_port = int(parts[2].rstrip("/"))
    return httplib2.ProxyInfo(proxy_type=httplib2.socks.PROXY_TYPE_HTTP,
                              proxy_host=proxy_host,
                              proxy_port=proxy_port)
def get_http_client_with_proxy():
    """Build an httplib2.Http client that goes through the configured https proxy.

    The proxy settings come from ``handle_proxy()``; only its ``'https'``
    entry is used (presumably a dict of scheme -> proxy URL; verify against
    ``handle_proxy``).

    Returns:
        An ``httplib2.Http`` instance configured with the proxy host, port and
        optional credentials parsed from the https proxy URL.

    Raises:
        Exception: if no https proxy value is configured (missing or empty).
    """
    proxies = handle_proxy()
    # Use .get so a missing 'https' key yields the descriptive error below
    # rather than a bare KeyError.
    https_proxy = proxies.get('https') if proxies else None
    if not https_proxy:
        raise Exception('https proxy value is empty. Check Demisto server configuration')

    # urlparse only recognises host/port/credentials when a scheme is present.
    # Check for the full "scheme://" prefix so that a bare host whose name
    # merely begins with "http" still gets one prepended.
    if not https_proxy.lower().startswith(('http://', 'https://')):
        https_proxy = 'https://' + https_proxy
    parsed_proxy = urlparse.urlparse(https_proxy)
    proxy_info = httplib2.ProxyInfo(
        proxy_type=httplib2.socks.PROXY_TYPE_HTTP,  # disable-secrets-detection
        proxy_host=parsed_proxy.hostname,
        proxy_port=parsed_proxy.port,
        proxy_user=parsed_proxy.username,
        proxy_pass=parsed_proxy.password)
    return httplib2.Http(proxy_info=proxy_info, disable_ssl_certificate_validation=DISABLE_SSL)
# disable-secrets-detection-end
from cloudconnectlib.common.log import get_cc_logger
from cloudconnectlib.core import defaults
from cloudconnectlib.core.exceptions import HTTPError
from httplib2 import Http, socks, ProxyInfo
from solnlib.packages.requests import PreparedRequest, utils
from solnlib.utils import is_true
try:  # Python2 environment support
    from httplib2 import SSLHandshakeError
except ImportError:  # Python3 environment support
    # Only a failed import should trigger the fallback; a bare ``except``
    # would also hide unrelated errors such as KeyboardInterrupt.
    from ssl import SSLError as SSLHandshakeError

_logger = get_cc_logger()

# Maps the configured proxy type name to the matching socks constant.
_PROXY_TYPE_MAP = {
    'http': socks.PROXY_TYPE_HTTP,
    'http_no_tunnel': socks.PROXY_TYPE_HTTP_NO_TUNNEL,
    'socks4': socks.PROXY_TYPE_SOCKS4,
    'socks5': socks.PROXY_TYPE_SOCKS5,
}
class HTTPResponse(object):
"""
HTTPResponse class wraps response of HTTP request for later use.
"""
def __init__(self, response, content):
"""Construct a HTTPResponse from response and content returned
with httplib2 request"""
self._status_code = response.status
self._header = response
"proxy_type": http,http_no_tunnel,sock4,sock5,
"proxy_rdns": 0 or 1,
}
:return: Http2.Http object
"""
proxy_type_to_code = {
"http": socks.PROXY_TYPE_HTTP,
"http_no_tunnel": socks.PROXY_TYPE_HTTP_NO_TUNNEL,
"socks4": socks.PROXY_TYPE_SOCKS4,
"socks5": socks.PROXY_TYPE_SOCKS5,
}
if config.get("proxy_type") in proxy_type_to_code:
proxy_type = proxy_type_to_code[config["proxy_type"]]
else:
proxy_type = socks.PROXY_TYPE_HTTP
rdns = scu.is_true(config.get("proxy_rdns"))
proxy_info = None
if config.get("proxy_url") and config.get("proxy_port"):
if config.get("proxy_username") and config.get("proxy_password"):
proxy_info = ProxyInfo(proxy_type=proxy_type,
proxy_host=config["proxy_url"],
proxy_port=int(config["proxy_port"]),
proxy_user=config["proxy_username"],
proxy_pass=config["proxy_password"],
proxy_rdns=rdns)
else:
proxy_info = ProxyInfo(proxy_type=proxy_type,
proxy_host=config["proxy_url"],
proxy_port=int(config["proxy_port"]),
:config: dict like, proxy and account information are in the following
format {
"username": xx,
"password": yy,
"proxy_url": zz,
"proxy_port": aa,
"proxy_username": bb,
"proxy_password": cc,
"proxy_type": http,http_no_tunnel,sock4,sock5,
"proxy_rdns": 0 or 1,
}
:return: Http2.Http object
"""
proxy_type_to_code = {
"http": socks.PROXY_TYPE_HTTP,
"http_no_tunnel": socks.PROXY_TYPE_HTTP_NO_TUNNEL,
"socks4": socks.PROXY_TYPE_SOCKS4,
"socks5": socks.PROXY_TYPE_SOCKS5,
}
if config.get("proxy_type") in proxy_type_to_code:
proxy_type = proxy_type_to_code[config["proxy_type"]]
else:
proxy_type = socks.PROXY_TYPE_HTTP
rdns = scu.is_true(config.get("proxy_rdns"))
proxy_info = None
if config.get("proxy_url") and config.get("proxy_port"):
if config.get("proxy_username") and config.get("proxy_password"):
proxy_info = ProxyInfo(proxy_type=proxy_type,
proxy_host=config["proxy_url"],
def resolve_proxy_type(self, proxy_type):
    """Translate a proxy type name into the corresponding ``socks`` constant.

    Matching ignores surrounding whitespace and letter case. The recognised
    names are "socks4", "socks5" and "http".

    Returns:
        The matching ``socks.PROXY_TYPE_*`` constant, or ``None`` when
        ``proxy_type`` is ``None``, blank, or not recognised. An unrecognised
        non-blank value is also logged as a warning if ``self.logger`` is set.
    """
    # Nothing to resolve when no proxy type was supplied at all.
    if proxy_type is None:
        return None

    # Normalise the string so that the proxy type can be matched reliably.
    normalized = proxy_type.strip().lower()

    # Name of the socks attribute for each supported proxy type; resolved
    # lazily so only the requested constant is looked up.
    constant_names = {
        "socks4": "PROXY_TYPE_SOCKS4",
        "socks5": "PROXY_TYPE_SOCKS5",
        "http": "PROXY_TYPE_HTTP",
    }
    if normalized in constant_names:
        return getattr(socks, constant_names[normalized])

    # A blank value is quietly treated as "no proxy type"; anything else is
    # reported before giving up.
    if normalized != "" and self.logger is not None:
        self.logger.warn("Proxy type is not recognized: %s", proxy_type)
    return None
self.github_url = github_url
if requests_per_second is None:
self.delay = 0
else:
self.delay = 1.0 / requests_per_second
self.last_request = datetime.datetime(1900, 1, 1)
if not self.url_prefix:
self.url_prefix = self.url_format % {
"github_url": self.github_url,
"api_version": self.api_version,
"api_format": self.api_format,
}
if proxy_host is None:
self._http = httplib2.Http(cache=cache)
else:
proxy_info = httplib2.ProxyInfo(httplib2.socks.PROXY_TYPE_HTTP,
proxy_host, proxy_port)
self._http = httplib2.Http(proxy_info=proxy_info, cache=cache)
self._http.ca_certs = CA_CERTS
if SYSTEM_CERTS:
LOGGER.info('Using system certificates in %r', CA_CERTS)
elif CURL_CERTS:
LOGGER.info("Using cURL's certificates in %r", CA_CERTS)
else:
LOGGER.warning('Using bundled certificate for HTTPS connections')
:config: dict like, proxy and account information are in the following
format {
"username": xx,
"password": yy,
"proxy_url": zz,
"proxy_port": aa,
"proxy_username": bb,
"proxy_password": cc,
"proxy_type": http,http_no_tunnel,sock4,sock5,
"proxy_rdns": 0 or 1,
}
:return: Http2.Http object
"""
proxy_type_to_code = {
"http": socks.PROXY_TYPE_HTTP,
"http_no_tunnel": socks.PROXY_TYPE_HTTP_NO_TUNNEL,
"socks4": socks.PROXY_TYPE_SOCKS4,
"socks5": socks.PROXY_TYPE_SOCKS5,
}
if config.get("proxy_type") in proxy_type_to_code:
proxy_type = proxy_type_to_code[config["proxy_type"]]
else:
proxy_type = socks.PROXY_TYPE_HTTP
rdns = scu.is_true(config.get("proxy_rdns"))
proxy_info = None
if config.get("proxy_url") and config.get("proxy_port"):
if config.get("proxy_username") and config.get("proxy_password"):
proxy_info = ProxyInfo(proxy_type=proxy_type,
proxy_host=config["proxy_url"],