Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.url = url.rstrip("/")
self.session = requests.session()
retry = Retry(
total=5,
read=5,
connect=5,
backoff_factor=0.3,
status_forcelist=(500, 502, 504),
)
adapter = HTTPAdapter(max_retries=retry)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
if not auth:
try:
auth = HTTPBasicAuthFromNetrc(url)
except ValueError as e:
logger.debug("Error parsing netrc: %s", str(e))
pass
elif isinstance(auth, Anonymous):
logger.debug("Anonymous")
auth = None
if auth:
if not isinstance(auth, requests.auth.AuthBase):
raise ValueError(
"Invalid auth type; must be derived from requests.auth.AuthBase"
)
if not self.url.endswith(GERRIT_AUTH_SUFFIX):
self.url += GERRIT_AUTH_SUFFIX
else:
if not options.gerrit_url:
logging.error("Gerrit URL is required")
return 1
pattern = re.compile(r"^([\d]+)(month[s]?|year[s]?|week[s]?)")
match = pattern.match(options.age)
if not match:
logging.error("Invalid age: %s", options.age)
return 1
message = "Abandoning after %s %s or more of inactivity." % \
(match.group(1), match.group(2))
if options.digest_auth:
auth_type = HTTPDigestAuthFromNetrc
else:
auth_type = HTTPBasicAuthFromNetrc
try:
auth = auth_type(url=options.gerrit_url)
gerrit = GerritRestAPI(url=options.gerrit_url, auth=auth)
except Exception as e:
logging.error(e)
return 1
logging.info(message)
try:
stale_changes = []
offset = 0
step = 500
query_terms = ["status:new", "age:%s" % options.age]
if options.branches:
query_terms += ["branch:%s" % b for b in options.branches]
def __init__(self, url):
    """Set up basic auth from whatever `_get_netrc_auth(url)` returns.

    The helper's result is expected to unpack into a
    ``(username, password)`` pair, which is forwarded to the parent
    class constructor.

    :param url: passed unchanged to `_get_netrc_auth`.
    :raises ValueError: if `_get_netrc_auth(url)` returns a falsy value.
    """
    credentials = _get_netrc_auth(url)
    if credentials:
        username, password = credentials
        # Explicit two-argument super() form, as in the original code.
        super(HTTPBasicAuthFromNetrc, self).__init__(username, password)
        return
    raise ValueError("netrc missing or no credentials found in netrc")