Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
returns.append(Response(200, json.dumps({'user': user})))
request.side_effect = returns
r = sess.get('/users/P123456')
self.assertEqual(sess.max_network_attempts+1,
request.call_count)
self.assertEqual(sess.max_network_attempts, sleep.call_count)
self.assertTrue(r.ok)
request.reset_mock()
sleep.reset_mock()
# Now test handling a non-transient error:
raises = [pdpyras.RequestsError("D'oh!")]*(
sess.max_network_attempts-1)
raises.extend([pdpyras.Urllib3Error("D'oh!")]*2)
request.side_effect = raises
self.assertRaises(pdpyras.PDClientError, sess.get, '/users')
self.assertEqual(sess.max_network_attempts+1,
request.call_count)
self.assertEqual(sess.max_network_attempts, sleep.call_count)
# Test custom retry logic:
sess.retry[404] = 3
request.side_effect = [
Response(404, json.dumps({})),
Response(404, json.dumps({})),
Response(200, json.dumps({'user': user})),
]
r = sess.get('/users/P123456')
self.assertEqual(200, r.status_code)
# Test retry logic with too many 404s
sess.retry[404] = 1
request.side_effect = [
def subdomain(self):
    """
    Subdomain of the PagerDuty account of the API access token.

    The value is worked out on first use and remembered in ``_subdomain``:
    one record is requested from the ``users`` index via ``rget`` and the
    first label of the host in that record's ``html_url`` is kept. Later
    calls return the remembered value without another request.

    If the lookup raises ``PDClientError``, an error is logged,
    ``_subdomain`` is reset to ``None`` and the exception is re-raised.

    :type: str or None
    """
    remembered = getattr(self, '_subdomain', None)
    if remembered is not None:
        return remembered
    try:
        first_user = self.rget('users', params={'limit':1})[0]
        # Splitting "<scheme>://<host>/..." on "/" leaves the host at index 2.
        host = first_user['html_url'].split('/')[2]
        self._subdomain = host.split('.')[0]
    except PDClientError as err:
        self.log.error("Failed to obtain subdomain; encountered error.")
        self._subdomain = None
        raise err
    return self._subdomain
The path/URL to request. If it does not start with the base URL, the
base URL will be prepended.
:param \*\*kwargs:
Additional keyword arguments to pass to `requests.Session.request
`_
:type method: str
:type url: str
:returns: the HTTP response object
:rtype: `requests.Response`_
"""
sleep_timer = self.sleep_timer
network_attempts = 0
http_attempts = {}
method = method.strip().upper()
if method not in self.permitted_methods:
raise PDClientError(
"Method %s not supported by this API. Permitted methods: %s"%(
method, ', '.join(self.permitted_methods)))
req_kw = deepcopy(kwargs)
my_headers = self.prepare_headers(method)
# Merge, but do not replace, any headers specified in keyword arguments:
if 'headers' in kwargs:
my_headers.update(kwargs['headers'])
req_kw.update({'headers': my_headers, 'stream': False})
# Compose/normalize URL whether or not path is already a complete URL
if url.startswith(self.url) or not self.url:
my_url = url
else:
my_url = self.url + "/" + url.lstrip('/')
# Make the request (and repeat w/cooldown if the rate limit is reached):
while True:
try:
"""
actions = ('trigger', 'acknowledge', 'resolve')
if action not in actions:
raise ValueError("Event action must be one of: "+', '.join(actions))
event = {'event_action':action}
event.update(properties)
if isinstance(dedup_key, string_types):
event['dedup_key'] = dedup_key
elif not action == 'trigger':
raise ValueError("The dedup_key property is required for"
"event_action=%s events, and it must be a string."%action)
response = self.post('/v2/enqueue', json=event)
raise_on_error(response)
response_body = try_decoding(response)
if not 'dedup_key' in response_body:
raise PDClientError("Malformed response body; does not contain "
"deduplication key.", response=response)
return response_body['dedup_key']
def find_element_by_name(appDict, element, name):
    """
    Look up the internal id of a PagerDuty element (policy, service,
    priority, etc.) from its name.

    :param appDict: mapping whose ``api_token`` entry is used to open the
        API session
    :param element: resource to search: escalation_policies, service,
        priority, etc.
    :param name: name to look for; it is stripped and lower-cased before
        being passed to the session's ``find``
    :return: the ``id`` of the matching element, or None when nothing
        matched or when the client raised ``PDClientError`` (which is
        logged at error level)
    """
    api = APISession(appDict['api_token'])
    try:
        wanted = name.strip().lower()
        found = api.find(element, wanted)
        LOG.debug(found)
        if found:
            return found['id']
    except PDClientError as failure:
        LOG.error(str(failure))
    # Reached both when nothing matched and after a logged client error.
    return None
def get_teams(session, comma_separated):
    """
    Iterate over every team and report its members.

    Each team yielded by ``session.iter_all('teams')`` is handed to
    ``get_team_members`` together with the session and the output flag.

    :param session: API session object providing ``iter_all``
    :param comma_separated: when truthy, a CSV header row is written to
        standard output before any team is processed; the flag is also
        forwarded unchanged to ``get_team_members``
    :raises: any exception raised while iterating or while reporting a
        team's members propagates to the caller unchanged
    """
    if comma_separated:
        sys.stdout.write("Team ID, Team Name, User ID, User name, Team role\n")
    # No try/except here: the previous handler only re-raised the same
    # exception, so letting it propagate directly is equivalent.
    for team in session.iter_all('teams'):
        get_team_members(team['id'], team['name'], session, comma_separated)
def try_decoding(r):
    """
    Return the JSON-decoded body of a response.

    :param r:
        `requests.Response`_ object
    :returns: whatever ``r.json()`` produces
    :raises PDClientError: if decoding fails with ``ValueError``; the
        message includes the first 99 characters of the response text and
        the response is attached to the exception
    """
    try:
        decoded = r.json()
    except ValueError:
        excerpt = r.text[:99]
        raise PDClientError("API responded with invalid JSON: " + excerpt,
            response=r)
    else:
        return decoded
def raise_on_error(r):
    """
    Pass a successful response through; raise for an unsuccessful one.

    :param r: Response object corresponding to the response received.
    :type r: `requests.Response`_
    :returns: The response object, if its status was success
    :rtype: `requests.Response`_
    :raises PDClientError: if ``r.ok`` is false; the message names the
        request method, the request URL (with the
        ``https://api.pagerduty.com`` prefix removed) and the status code,
        and the response is attached to the exception
    """
    if not r.ok:
        verb = r.request.method.upper()
        # Shorten the URL to a path when it targets the public API host.
        endpoint = r.request.url.replace('https://api.pagerduty.com', '')
        message = (
            "%s %s: API responded with non-success status "
            "(%d)" % (verb, endpoint, r.status_code)
        )
        raise PDClientError(message, response=r)
    return r
if total:
data['total'] = 1
if params is not None:
# Override defaults with values given:
data.update(params)
more = True
offset = 0
n = 0
while more: # Paginate through all results
if paginate:
data['offset'] = offset
r = self.get(path, params=data.copy())
if not r.ok:
if self.raise_if_http_error:
raise PDClientError("Encountered HTTP error status (%d) "
"response while iterating through index endpoint %s."%(
r.status_code, path), response=r)
self.log.debug("Stopping iteration on endpoint \"%s\"; API "
"responded with non-success status %d", path, r.status_code)
break
try:
response = r.json()
except ValueError:
self.log.debug("Stopping iteration on endpoint %s; API "
"responded with invalid JSON.", path)
break
if 'limit' in response:
data['limit'] = response['limit']
more = False
total_count = None
if paginate:
http_attempts[status] = 1 + http_attempts.get(status, 0)
sleep_timer *= self.sleep_timer_base
self.log.debug("HTTP error (%d); retrying in %g seconds.",
status, sleep_timer)
time.sleep(sleep_timer)
continue
elif status == 429:
sleep_timer *= self.sleep_timer_base
self.log.debug("Hit API rate limit (response status 429); "
"retrying in %g seconds", sleep_timer)
time.sleep(sleep_timer)
continue
elif status == 401:
# Stop. Authentication failed. We shouldn't try doing any more,
# because we'll run into problems later anyway.
raise PDClientError(
"Received 401 Unauthorized response from the API. The "
"access key (%s) might not be valid."%self.trunc_key,
response=response)
else:
# All went according to plan.
return response