Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def should_bypass_proxies(url, no_proxy):
"""
Returns whether we should bypass proxies or not.
:rtype: bool
"""
get_proxy = lambda k: os.environ.get(k) or os.environ.get(k.upper())
# First check whether no_proxy is defined. If it is, check that the URL
# we're getting isn't in the no_proxy list.
no_proxy_arg = no_proxy
if no_proxy is None:
no_proxy = get_proxy('no_proxy')
netloc = urlparse(url).netloc
if no_proxy:
# We need to check whether we match here. We need to see if we match
# the end of the netloc, both with and without the port.
no_proxy = (
host for host in no_proxy.replace(' ', '').split(',') if host
)
ip = netloc.split(':')[0]
if is_ipv4_address(ip):
for proxy_ip in no_proxy:
if is_valid_cidr(proxy_ip):
if address_in_network(ip, proxy_ip):
return True
elif ip == proxy_ip:
# If no_proxy ip was defined in plain IP notation instead of cidr notation &
loc = os.path.expanduser('~/{0}'.format(f))
except KeyError:
# os.path.expanduser can fail when $HOME is undefined and
# getpwuid fails. See http://bugs.python.org/issue20164 &
# https://github.com/requests/requests/issues/1846
return
if os.path.exists(loc):
netrc_path = loc
break
# Abort early if there isn't one.
if netrc_path is None:
return
ri = urlparse(url)
# Strip port numbers from netloc. This weird `if...encode`` dance is
# used for Python 3.2, which doesn't support unicode literals.
splitstr = b':'
if isinstance(url, str):
splitstr = splitstr.decode('ascii')
host = ri.netloc.split(splitstr)[0]
try:
_netrc = netrc(netrc_path).authenticators(host)
if _netrc:
# Return with login / password
login_i = (0 if _netrc[0] else 1)
return (_netrc[login_i], _netrc[2])
except (NetrcParseError, IOError):
# If there was a parsing error or a permissions issue reading the file,
loc = os.path.expanduser('~/{0}'.format(f))
except KeyError:
# os.path.expanduser can fail when $HOME is undefined and
# getpwuid fails. See http://bugs.python.org/issue20164 &
# https://github.com/requests/requests/issues/1846
return
if os.path.exists(loc):
netrc_path = loc
break
# Abort early if there isn't one.
if netrc_path is None:
return
ri = urlparse(url)
# Strip port numbers from netloc. This weird `if...encode`` dance is
# used for Python 3.2, which doesn't support unicode literals.
splitstr = b':'
if isinstance(url, str):
splitstr = splitstr.decode('ascii')
host = ri.netloc.split(splitstr)[0]
try:
_netrc = netrc(netrc_path).authenticators(host)
if _netrc:
# Return with login / password
login_i = (0 if _netrc[0] else 1)
return (_netrc[login_i], _netrc[2])
except (NetrcParseError, IOError):
# If there was a parsing error or a permissions issue reading the file,
def should_bypass_proxies(url, no_proxy):
"""
Returns whether we should bypass proxies or not.
:rtype: bool
"""
get_proxy = lambda k: os.environ.get(k) or os.environ.get(k.upper())
# First check whether no_proxy is defined. If it is, check that the URL
# we're getting isn't in the no_proxy list.
no_proxy_arg = no_proxy
if no_proxy is None:
no_proxy = get_proxy('no_proxy')
netloc = urlparse(url).netloc
if no_proxy:
# We need to check whether we match here. We need to see if we match
# the end of the netloc, both with and without the port.
no_proxy = (
host for host in no_proxy.replace(' ', '').split(',') if host
)
ip = netloc.split(':')[0]
if is_ipv4_address(ip):
for proxy_ip in no_proxy:
if is_valid_cidr(proxy_ip):
if address_in_network(ip, proxy_ip):
return True
elif ip == proxy_ip:
# If no_proxy ip was defined in plain IP notation instead of cidr notation &
def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    The URL is split with ``urlparse``, any ``user:password@`` prefix is
    removed from the network location, and the pieces are reassembled with
    ``urlunparse`` using an empty fragment.

    :param url: the URL to clean up.
    :rtype: str
    """
    # The parsed fragment is deliberately discarded; an empty string is
    # passed to urlunparse in its place.
    scheme, netloc, path, params, query, _fragment = urlparse(url)

    # see func:`prepend_scheme_if_needed`
    # When urlparse reports no netloc, swap the two so the text it put in
    # ``path`` is treated as the netloc.
    if not netloc:
        netloc, path = path, netloc

    # Keep only what follows the last '@', dropping any credentials.
    netloc = netloc.rsplit('@', 1)[-1]

    return urlunparse((scheme, netloc, path, params, query, ''))
def should_retry(self, retry_policy_context):
    """
    Fold the results of every condition in ``self.conditions`` into one status.

    The retry flags (``SHOULD_RETRY``, ``SHOULD_RETRY_WITH_CLIENT_TOKEN`` and
    ``SHOULD_RETRY_WITH_THROTTLING_BACKOFF``) are OR-ed together across all
    conditions. The ``NO_RETRY`` flag is AND-ed, so it is kept only while
    every condition's result has it set. With no conditions the result is
    ``BLANK_STATUS | NO_RETRY``.

    :param retry_policy_context: passed unchanged to each condition's
        ``should_retry``.
    :return: the accumulated retry flags OR-ed with the remaining
        ``NO_RETRY`` flag.
    """
    retry_mask = (
        RetryCondition.SHOULD_RETRY
        | RetryCondition.SHOULD_RETRY_WITH_CLIENT_TOKEN
        | RetryCondition.SHOULD_RETRY_WITH_THROTTLING_BACKOFF
    )

    retryable = RetryCondition.BLANK_STATUS
    no_retry_flag = RetryCondition.NO_RETRY
    for condition in self.conditions:
        result = condition.should_retry(retry_policy_context)
        # Any single condition may contribute a retry flag ...
        retryable |= result & retry_mask
        # ... but NO_RETRY is dropped as soon as one result lacks it.
        no_retry_flag &= result & RetryCondition.NO_RETRY

    return retryable | no_retry_flag
def should_retry(self, retry_policy_context):
    """
    Fold the results of every condition in ``self.conditions`` into one status.

    The retry flags (``SHOULD_RETRY``, ``SHOULD_RETRY_WITH_CLIENT_TOKEN`` and
    ``SHOULD_RETRY_WITH_THROTTLING_BACKOFF``) are OR-ed together across all
    conditions. The ``NO_RETRY`` flag is AND-ed, so it is kept only while
    every condition's result has it set. With no conditions the result is
    ``BLANK_STATUS | NO_RETRY``.

    :param retry_policy_context: passed unchanged to each condition's
        ``should_retry``.
    :return: the accumulated retry flags OR-ed with the remaining
        ``NO_RETRY`` flag.
    """
    retry_mask = (
        RetryCondition.SHOULD_RETRY
        | RetryCondition.SHOULD_RETRY_WITH_CLIENT_TOKEN
        | RetryCondition.SHOULD_RETRY_WITH_THROTTLING_BACKOFF
    )

    retryable = RetryCondition.BLANK_STATUS
    no_retry_flag = RetryCondition.NO_RETRY
    for condition in self.conditions:
        result = condition.should_retry(retry_policy_context)
        # Any single condition may contribute a retry flag ...
        retryable |= result & retry_mask
        # ... but NO_RETRY is dropped as soon as one result lacks it.
        no_retry_flag &= result & RetryCondition.NO_RETRY

    return retryable | no_retry_flag
def __init__(self, retry_config):
    """
    Keep a reference to the retry configuration.

    :param retry_config: configuration object stored on
        ``self.retry_config``.
    """
    self.retry_config = retry_config
def should_retry(self, retry_policy_context):
    """
    Report whether the request's action is listed as retryable.

    The "RetryableAPIsWithClientToken" entry is looked up through
    ``_find_data_in_retry_config`` using the context's
    ``retry_config_prefix`` and ``self.retry_config``. When the lookup
    yields a list that contains the original request's ``action_name``,
    retry flags are returned; otherwise ``NO_RETRY``.

    :param retry_policy_context: object providing ``original_request`` and
        ``retry_config_prefix``.
    :return: ``SHOULD_RETRY | SHOULD_RETRY_WITH_THROTTLING_BACKOFF`` on a
        match, ``NO_RETRY`` otherwise.
    """
    request = retry_policy_context.original_request
    retryable_apis = _find_data_in_retry_config(
        "RetryableAPIsWithClientToken",
        retry_policy_context.retry_config_prefix,
        self.retry_config,
    )

    # Anything other than a list (e.g. a missing entry) means "not retryable".
    if isinstance(retryable_apis, list) and request.action_name in retryable_apis:
        # NOTE(review): the config key names client tokens, yet the flags
        # returned carry throttling backoff - confirm that
        # SHOULD_RETRY_WITH_CLIENT_TOKEN was not the intended flag.
        return RetryCondition.SHOULD_RETRY | RetryCondition.SHOULD_RETRY_WITH_THROTTLING_BACKOFF

    return RetryCondition.NO_RETRY
class AndRetryCondition(RetryCondition):
    """Retry condition that aggregates the results of several child conditions."""

    def __init__(self, conditions):
        """
        :param conditions: iterable of condition objects, each providing
            ``should_retry(retry_policy_context)``.
        """
        self.conditions = conditions

    def should_retry(self, retry_policy_context):
        """
        OR together the status returned by every child condition.

        Every condition is evaluated (there is no short-circuit). With no
        conditions the result is ``RetryCondition.BLANK_STATUS``.

        :param retry_policy_context: passed unchanged to each condition.
        :return: the bitwise OR of all child results.
        """
        status = RetryCondition.BLANK_STATUS
        for condition in self.conditions:
            status |= condition.should_retry(retry_policy_context)
        return status
class OrRetryCondition(RetryCondition):
    """Retry condition that holds a collection of child conditions."""

    # NOTE(review): only the constructor is visible in this excerpt; any
    # should_retry override for this class is not shown here.

    def __init__(self, conditions):
        """
        :param conditions: collection of condition objects stored on
            ``self.conditions``.
        """
        self.conditions = conditions
def __init__(self, max_retry_times, retry_config):
RetryCondition.__init__(self)
self._inner_condition = AndRetryCondition([
MaxRetryTimesCondition(max_retry_times),
OrRetryCondition([
RetryOnApiCondition(retry_config),
RetryOnApiWithClientTokenCondition(retry_config),
]),
OrRetryCondition([
RetryOnExceptionCondition(retry_config),
RetryOnHttpStatusCondition(),
]),
def should_retry(self, retry_policy_context):
    """
    OR together the status returned by every condition in ``self.conditions``.

    Every condition is evaluated (there is no short-circuit). With no
    conditions the result is ``RetryCondition.BLANK_STATUS``.

    :param retry_policy_context: passed unchanged to each condition's
        ``should_retry``.
    :return: the bitwise OR of all condition results.
    """
    status = RetryCondition.BLANK_STATUS
    for condition in self.conditions:
        status |= condition.should_retry(retry_policy_context)
    return status