Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def ProxyInfoFromEnvironmentVar(proxy_env_var):
"""Reads proxy info from the environment and converts to httplib2.ProxyInfo.
Args:
proxy_env_var: Environment variable string to read, such as http_proxy or
https_proxy.
Returns:
httplib2.ProxyInfo constructed from the environment string.
"""
proxy_url = os.environ.get(proxy_env_var)
if not proxy_url or not proxy_env_var.lower().startswith('http'):
return httplib2.ProxyInfo(httplib2.socks.PROXY_TYPE_HTTP, None, 0)
proxy_protocol = proxy_env_var.lower().split('_')[0]
if not proxy_url.lower().startswith('http'):
# proxy_info_from_url requires a protocol, which is always http or https.
proxy_url = proxy_protocol + '://' + proxy_url
return httplib2.proxy_info_from_url(proxy_url, method=proxy_protocol)
# Setting of Google Custom Search.
service = None
if self.utility.proxy != '':
# Set proxy.
self.utility.print_message(WARNING, 'Set proxy server: {}'.format(self.utility.proxy))
parsed = util.parse_url(self.utility.proxy)
proxy = None
if self.utility.proxy_pass != '':
proxy = httplib2.ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP,
proxy_host=parsed.host,
proxy_port=parsed.port,
proxy_user=self.utility.proxy_user,
proxy_pass=self.utility.proxy_pass)
else:
proxy = httplib2.ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP,
proxy_host=parsed.host,
proxy_port=parsed.port)
my_http = httplib2.Http(proxy_info=proxy, disable_ssl_certificate_validation=True)
service = build("customsearch", "v1", developerKey=self.api_key, http=my_http)
else:
# None proxy.
service = build("customsearch", "v1", developerKey=self.api_key)
# Execute search.
urls = []
fqdn_list = []
result_count = 0
start_index = self.start_index
try:
search_count = 0
while search_count < max_page_count:
def __init__(self,root):
self.root = root
self.server = urlparse.urlparse(root)[1]
self.myls = lswww.lswww(root)
self.myls.verbosity(1)
socket.setdefaulttimeout(self.timeout)
# HttpLib2 vars
proxy = None
if self.proxy != "":
(proxy_type, proxy_usr, proxy_pwd, proxy_host, proxy_port,
path, query, fragment) = httplib2.parse_proxy(self.proxy)
proxy = httplib2.ProxyInfo(proxy_type, proxy_host, proxy_port,
proxy_user=proxy_usr, proxy_pass=proxy_pwd)
self.cookiejar = libcookie.libcookie(self.server)
self.h = httplib2.Http(cache = None, timeout = self.timeout, proxy_info = proxy)
self.h.follow_redirects=False
if self.auth_basic != []:
self.h.add_credentials(self.auth_basic[0], self.auth_basic[1])
def __init__(self, consumer_key, consumer_secret, proxy_host=None, proxy_port=None):
(self.consumer_key, self.consumer_secret) = (consumer_key, consumer_secret)
if proxy_host and proxy_port:
self.proxy = httplib2.ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP, proxy_host=proxy_host, proxy_port=proxy_port)
else:
self.proxy = None
def get_httplib():
http = None
try:
if __addon__.getSetting('proxy_use') == 'true':
(proxy_type, proxy_server, proxy_port, proxy_dns, proxy_user, proxy_pass) = get_proxy()
utils.log("Using proxy: type %i rdns: %i server: %s port: %s user: %s pass: %s" % (proxy_type, proxy_dns, proxy_server, proxy_port, "***", "***"),xbmc.LOGINFO)
http = httplib2.Http(proxy_info = httplib2.ProxyInfo(proxy_type, proxy_server, proxy_port, proxy_dns, proxy_user, proxy_pass))
else:
http = httplib2.Http()
except:
raise
utils.log('Failed to initialize httplib2 module',xbmc.LOGFATAL)
return http
baseurl = str(current_app.config.get('CFG_EPIC_BASEURL'))
prefix = str(current_app.config.get('CFG_EPIC_PREFIX'))
# If the proxy and proxy ports are set in the invenio-local.conf file
# read them and set the proxy. If not, do nothing.
try:
proxy = current_app.config.get('CFG_SITE_PROXY')
proxy_port = current_app.config.get('CFG_SITE_PROXYPORT')
except:
proxy = None
proxy_port = 80
if proxy is not None:
import socks
http = httplib2.Http(
proxy_info=httplib2.ProxyInfo(socks.PROXY_TYPE_HTTP,
proxy, proxy_port))
else:
http = httplib2.Http()
http.add_credentials(username, password)
if not (prefix.endswith('/')):
prefix += '/'
if baseurl.endswith('/'):
uri = baseurl + prefix
else:
uri = baseurl + '/' + prefix
if suffix != '':
uri += "/" + suffix.lstrip(prefix + "/")
# for a PUT, add 'If-None-Match': '*' to the header
def get_http_client_with_proxy():
proxies = handle_proxy()
if not proxies or not proxies['https']:
raise Exception('https proxy value is empty. Check Demisto server configuration')
https_proxy = proxies['https']
if not https_proxy.startswith('https') and not https_proxy.startswith('http'):
https_proxy = 'https://' + https_proxy
parsed_proxy = urlparse.urlparse(https_proxy)
proxy_info = httplib2.ProxyInfo(
proxy_type=httplib2.socks.PROXY_TYPE_HTTP, # disable-secrets-detection
proxy_host=parsed_proxy.hostname,
proxy_port=parsed_proxy.port,
proxy_user=parsed_proxy.username,
proxy_pass=parsed_proxy.password)
return httplib2.Http(proxy_info=proxy_info, disable_ssl_certificate_validation=DISABLE_SSL)
# disable-secrets-detection-end
def _get_proxy_configs(self):
try:
import socks
proxy=self._conf['proxy']
if proxy=='none':
return None
if proxy=='socks5':
socks=socks.PROXY_TYPE_SOCKS5
elif proxy=='socks4':
socks=socks.PROXY_TYPE_SOCKS4
else:
socks=socks.PROXY_TYPE_HTTP
return httplib2.ProxyInfo(proxy_type=socks,
proxy_host=self._conf['proxy_server'],
proxy_port=self._conf['proxy_port'])
return None
except ImportError:
return None
user, password = None, None
proxy_type = proxy_config.get('proxy_type')
proxy_type = proxy_type.lower() if proxy_type else 'http'
if proxy_type in _PROXY_TYPE_MAP:
ptv = _PROXY_TYPE_MAP[proxy_type]
elif proxy_type in list(_PROXY_TYPE_MAP.values()):
ptv = proxy_type
else:
ptv = socks.PROXY_TYPE_HTTP
_logger.info('Proxy type not found, set to "HTTP"')
rdns = is_true(proxy_config.get('proxy_rdns'))
proxy_info = ProxyInfo(
proxy_host=url,
proxy_port=int(port),
proxy_type=ptv,
proxy_user=user,
proxy_pass=password,
proxy_rdns=rdns
)
return proxy_info
"http": socks.PROXY_TYPE_HTTP,
"http_no_tunnel": socks.PROXY_TYPE_HTTP_NO_TUNNEL,
"socks4": socks.PROXY_TYPE_SOCKS4,
"socks5": socks.PROXY_TYPE_SOCKS5,
}
if config.get("proxy_type") in proxy_type_to_code:
proxy_type = proxy_type_to_code[config["proxy_type"]]
else:
proxy_type = socks.PROXY_TYPE_HTTP
rdns = scu.is_true(config.get("proxy_rdns"))
proxy_info = None
if config.get("proxy_url") and config.get("proxy_port"):
if config.get("proxy_username") and config.get("proxy_password"):
proxy_info = ProxyInfo(proxy_type=proxy_type,
proxy_host=config["proxy_url"],
proxy_port=int(config["proxy_port"]),
proxy_user=config["proxy_username"],
proxy_pass=config["proxy_password"],
proxy_rdns=rdns)
else:
proxy_info = ProxyInfo(proxy_type=proxy_type,
proxy_host=config["proxy_url"],
proxy_port=int(config["proxy_port"]),
proxy_rdns=rdns)
if proxy_info:
http = Http(proxy_info=proxy_info, timeout=timeout,
disable_ssl_certificate_validation=disable_ssl_validation)
else:
http = Http(timeout=timeout,
disable_ssl_certificate_validation=disable_ssl_validation)