Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
content_type="application/json")
httpretty.register_uri(httpretty.GET,
self.tenant_href + "/applications",
responses=[
httpretty.Response(json.dumps(tenant_applications_body_one_app)),
httpretty.Response(json.dumps(tenant_applications_body_no_apps)),
],
content_type="application/json")
httpretty.register_uri(httpretty.GET,
self.app_href,
responses=[
httpretty.Response(json.dumps(self.app_body)),
httpretty.Response(body=json.dumps(error), status=404)
],
content_type="application/json")
httpretty.register_uri(httpretty.DELETE,
self.app_href,
body='', status=204)
applications = self.client.applications
# Test that we have one app
self.assertEquals(1, len(self.client.applications))
self.client.applications.refresh()
self.assertEquals(0, len(self.client.applications))
@httpretty.activate
def test_generates_feed_submission(self):
    """Submit a product feed and check the single submission that comes back.

    The POST to the MWS endpoint is intercepted by httpretty and answered
    with the contents of the ``submit_feed_response.xml`` fixture.
    """
    httpretty.register_uri(
        httpretty.POST,
        'https://mws.amazonservices.com/',
        responses=[httpretty.Response(
            self.load_data('submit_feed_response.xml'),
        )],
    )

    submissions = gateway.submit_product_feed(
        products=[self.product],
        marketplaces=[self.marketplace],
    )

    # ``assertEqual`` replaces the ``assertEquals`` alias, which has been
    # deprecated since Python 3.2 and no longer exists in Python 3.12.
    self.assertEqual(len(submissions), 1)
    # NOTE(review): the expected id presumably matches the XML fixture --
    # confirm against submit_feed_response.xml.
    self.assertEqual(submissions[0].submission_id, '2291326430')
    self.assertEqual(
        submissions[0].processing_status,
        am.STATUS_SUBMITTED
    )
httpretty.Response(body=request_callback)
])
httpretty.register_uri(httpretty.GET,
CONFLUENCE_HISTORICAL_CONTENT_1,
responses=[
httpretty.Response(body=request_callback)
])
httpretty.register_uri(httpretty.GET,
CONFLUENCE_HISTORICAL_CONTENT_2,
responses=[
httpretty.Response(body=request_callback)
])
httpretty.register_uri(httpretty.GET,
CONFLUENCE_HISTORICAL_CONTENT_ATT,
responses=[
httpretty.Response(body=request_callback)
])
return http_requests
# To tests for Internal Server Error
return (500, headers, '')
elif page == str(KITSUNE_SERVER_FAIL_PAGE + 1):
# Next page to the server fail returns questions
return (200, headers, mozilla_questions_2)
else:
return (404, headers, '')
HTTPServer.requests_http.append(httpretty.last_request())
return (200, headers, body)
httpretty.register_uri(httpretty.GET,
KITSUNE_API_QUESTION,
responses=[
httpretty.Response(body=request_callback)
])
httpretty.register_uri(httpretty.GET,
KITSUNE_API_ANSWER,
responses=[
httpretty.Response(body=request_callback)
])
else:
return (404, headers, '')
HTTPServer.requests_http.append(httpretty.last_request())
return (200, headers, body)
httpretty.register_uri(httpretty.GET,
KITSUNE_API_QUESTION,
responses=[
httpretty.Response(body=request_callback)
])
httpretty.register_uri(httpretty.GET,
KITSUNE_API_ANSWER,
responses=[
httpretty.Response(body=request_callback)
])
raise Exception
requests_http.append(httpretty.last_request())
return 200, headers, body
httpretty.register_uri(httpretty.GET,
DISCOURSE_TOPICS_URL,
responses=[
httpretty.Response(body=request_callback)
for _ in range(2)
])
httpretty.register_uri(httpretty.GET,
DISCOURSE_TOPIC_URL_1148,
responses=[
httpretty.Response(body=request_callback)
])
httpretty.register_uri(httpretty.GET,
DISCOURSE_TOPIC_URL_1149,
responses=[
httpretty.Response(body=request_callback)
])
httpretty.register_uri(httpretty.GET,
DISCOURSE_POST_URL_1,
responses=[
httpretty.Response(body=request_callback)
])
httpretty.register_uri(httpretty.GET,
DISCOURSE_POST_URL_2,
responses=[
httpretty.Response(body=request_callback)
])
@httpretty.activate
def test_request_retry_502(self):
    """A 502 answer must be retried so the caller ends up with the 202."""
    # Two responses are queued for the same URI: the 502 first, then the 202.
    queued_responses = [
        httpretty.Response(body="retry response", status=502),
        httpretty.Response(body='success response', status=202),
    ]
    httpretty.register_uri(
        httpretty.GET,
        "https://my_service.com/get_endpoint",
        responses=queued_responses,
    )

    response = self.client.send(self.request)

    self.assertEqual(response.status_code, 202, msg="Should retry on 502")
http_requests.append(last_request)
return (200, headers, body)
httpretty.register_uri(httpretty.GET,
MEETUP_EVENTS_URL,
responses=[
httpretty.Response(body=request_callback)
for _ in range(2)
])
for url in MEETUP_COMMENTS_URL:
httpretty.register_uri(httpretty.GET,
url,
responses=[
httpretty.Response(body=request_callback)
])
for url in MEETUP_RSVPS_URL:
httpretty.register_uri(httpretty.GET,
url,
responses=[
httpretty.Response(body=request_callback)
])
return http_requests
def test_backoff_waiting(self):
    """Test if the client waits some seconds when a backoff field is received"""

    backoff_page = read_file('data/stackexchange/stackexchange_question_backoff_page')
    question_page = read_file('data/stackexchange/stackexchange_question_page_2')

    def request_callback(method, uri, headers):
        # Page 1 gets the backoff fixture; every other page gets the
        # regular question page.
        params = urllib.parse.parse_qs(urllib.parse.urlparse(uri).query)
        # Index directly so a request without ``page`` fails with a clear
        # KeyError instead of a TypeError on ``None[0]``.
        page = params['page'][0]
        body = backoff_page if page == '1' else question_page
        return (200, headers, body)

    httpretty.register_uri(httpretty.GET,
                           STACKEXCHANGE_QUESTIONS_URL,
                           responses=[
                               httpretty.Response(body=request_callback)
                           ])

    client = StackExchangeClient(site="stackoverflow",
                                 tagged="python",
                                 token="aaa", max_questions=1)

    # Use the monotonic clock for the elapsed-time measurement: unlike
    # time.time() it cannot jump when the system clock is adjusted.
    before = time.monotonic()
    raw_pages = list(client.get_questions(from_date=None))
    after = time.monotonic()

    self.assertEqual(len(raw_pages), 2)

    # backoff value hardcoded in the JSON
    diff = after - before
    self.assertGreaterEqual(diff, 0.2)
def request_callback(method, uri, headers):
    """Pick the canned body for ``uri`` and record the request.

    URIs starting with BUGZILLA_BUGLIST_URL consume the next entry of
    ``bodies_csv``; URIs starting with BUGZILLA_BUG_URL consume the next
    entry of ``bodies_xml``; anything else alternates between the two
    entries of ``bodies_html`` based on how many requests were recorded.

    Returns the ``(status, headers, body)`` triple with status 200.
    """
    # Taken before the append below, exactly as the original indexing did.
    recorded_so_far = len(http_requests)

    if uri.startswith(BUGZILLA_BUGLIST_URL):
        payload = bodies_csv.pop(0)
    elif uri.startswith(BUGZILLA_BUG_URL):
        payload = bodies_xml.pop(0)
    else:
        payload = bodies_html[recorded_so_far % 2]

    http_requests.append(httpretty.last_request())

    reply = (200, headers, payload)
    return reply
httpretty.register_uri(httpretty.GET,
BUGZILLA_BUGLIST_URL,
responses=[
httpretty.Response(body=request_callback)
for _ in range(3)
])
httpretty.register_uri(httpretty.GET,
BUGZILLA_BUG_URL,
responses=[
httpretty.Response(body=request_callback)
for _ in range(3)
])
httpretty.register_uri(httpretty.GET,
BUGZILLA_BUG_ACTIVITY_URL,
responses=[
httpretty.Response(body=request_callback)
for _ in range(7)
])
return http_requests