Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
offset = uri.split("offset=")[1].split("&")[0]
if offset == "0":
body = mozilla_events_1
else:
body = mozilla_events_2
HTTPServer.requests_http.append(httpretty.last_request())
return (200, headers, body)
httpretty.register_uri(httpretty.GET,
MOZILLA_REPS_API_EVENTS,
responses=[
httpretty.Response(body=request_callback)
])
httpretty.register_uri(httpretty.GET,
MOZILLA_REPS_API_USERS,
responses=[
httpretty.Response(body=request_callback)
])
BUGZILLA_LOGIN_URL,
responses=[
httpretty.Response(body=request_callback)
])
httpretty.register_uri(httpretty.GET,
BUGZILLA_BUGLIST_URL,
responses=[
httpretty.Response(body=request_callback)
for _ in range(2)
])
httpretty.register_uri(httpretty.GET,
BUGZILLA_BUG_URL,
responses=[
httpretty.Response(body=request_callback)
])
httpretty.register_uri(httpretty.GET,
BUGZILLA_BUG_ACTIVITY_URL,
responses=[
httpretty.Response(body=request_callback)
for _ in range(2)
])
from_date = datetime.datetime(2015, 1, 1)
bg = Bugzilla(BUGZILLA_SERVER_URL,
user='jsmith@example.com',
password='1234')
bugs = [bug for bug in bg.fetch(from_date=from_date)]
self.assertEqual(len(bugs), 2)
self.assertEqual(bugs[0]['data']['bug_id'][0]['__text__'], '30')
self.assertEqual(len(bugs[0]['data']['activity']), 14)
def test_devices(self, api):
    """Check the device counts returned by ``api.get_devices``.

    The site list and the two per-site device lists are served by
    httpretty from JSON fixture files located in ``CURRENT_DIR``.
    """
    fixture_routes = (
        ("/site", "get_sites.json"),
        ("/site/site-1/device", "get_devices_1.json"),
        ("/site/site-2/device", "get_devices_2.json"),
    )

    # Read every fixture first, then register the mocked routes in order.
    payloads = []
    for route, filename in fixture_routes:
        with open(os.path.join(CURRENT_DIR, filename), "r") as fixture_file:
            payloads.append((route, fixture_file.read()))
    for route, payload in payloads:
        httpretty.register_uri(httpretty.GET, BASE_URL + route, body=payload)

    # No filter, positional site filter, keyword site filter, category filter.
    assert len(api.get_devices()) == 4
    assert len(api.get_devices("site-1")) == 3
    assert len(api.get_devices(site_id="site-1")) == 3
    assert len(api.get_devices(category=Category.ROLLER_SHUTTER)) == 3
def test_metadata_fetch_key(self):
    """Fetch mocked instance metadata that includes one public key.

    The metadata root lists ``hostname``, ``instance-id`` and
    ``public-keys/``; each child URL is mocked with a fixed body and the
    parsed result is checked for the two scalar values and for exactly
    one public-key entry.
    """
    base_url = 'http://169.254.169.254/%s/meta-data/' % (self.VERSION)

    # Root listing: one child name per line.
    root_listing = "\n".join(['hostname',
                              'instance-id',
                              'public-keys/'])
    hp.register_uri(hp.GET, base_url, status=200, body=root_listing)

    # Child path (relative to base_url) -> mocked response body.
    child_bodies = (
        ('hostname', 'ec2.fake.host.name.com'),
        ('instance-id', '123'),
        ('public-keys/', '0=my-public-key'),
        ('public-keys/0/openssh-key', 'ssh-rsa AAAA.....wZEf my-public-key'),
    )
    for child_path, child_body in child_bodies:
        hp.register_uri(hp.GET, uh.combine_url(base_url, child_path),
                        status=200, body=child_body)

    md = eu.get_instance_metadata(self.VERSION, retries=0, timeout=0.1)

    self.assertEqual(md['hostname'], 'ec2.fake.host.name.com')
    self.assertEqual(md['instance-id'], '123')
    self.assertEqual(1, len(md['public-keys']))
def test_get_version_local(self):
    """Check ``discover()`` on a client constructed without arguments.

    The version document ``TEST_RESPONSE_DICT`` is stubbed at
    ``http://localhost:35357/``; the discovered mapping must be a dict
    containing ``message`` and ``v2.0``, and the ``v2.0`` URL must equal
    the first link of the first version in the stubbed document.
    """
    self.stub_url(httpretty.GET, base_url="http://localhost:35357/",
                  json=self.TEST_RESPONSE_DICT)

    discovered = client.Client().discover()

    self.assertIsInstance(discovered, dict)
    for expected_key in ('message', 'v2.0'):
        self.assertIn(expected_key, discovered)

    actual_url = discovered['v2.0']['url']
    first_version = self.TEST_RESPONSE_DICT['versions']['values'][0]
    self.assertEqual(actual_url, first_version['links'][0]['href'])
def test_install_chef_from_omnibus_has_omnibus_version(self, m_subp_blob):
    """install_chef_from_omnibus provides version arg to OMNIBUS_URL."""
    chef_outfile = self.tmp_path('chef.out', self.new_root)
    response = '#!/bin/bash\necho "Hi Mom" > {0}'.format(chef_outfile)
    httpretty.register_uri(
        httpretty.GET, cc_chef.OMNIBUS_URL, body=response)
    cc_chef.install_chef_from_omnibus(omnibus_version='2.0')
    # Keyword arguments of the first call made to the patched helper.
    called_kwargs = m_subp_blob.call_args_list[0][1]
    expected_kwargs = {
        'args': ['-v', '2.0'],
        'basename': 'chef-omnibus-install',
        'blob': response,
        'capture': False
    }
    # assertCountEqual iterates both dicts, so this compares only the
    # keyword *names*; the values are not checked by this assertion.
    self.assertCountEqual(expected_kwargs, called_kwargs)
    # Explicitly verify the value this test exists for: the requested
    # omnibus version must be forwarded as the '-v <version>' argument.
    self.assertEqual(expected_kwargs['args'], called_kwargs['args'])
def test_skip_auto_load_if_disabled(self):
    """Check a client built with ``auto_load_api_version=False``.

    The client must report ``DEFAULT_API_VERSION``, its ``BASE_URL`` must
    start with ``<base_domain>/v<version>``, and httpretty must have
    recorded no requests, even though the API description is mocked.
    """
    httpretty.register_uri(
        httpretty.GET,
        self.base_domain,
        body=self.api_description,
        status=200,
    )
    expected_version = DEFAULT_API_VERSION

    client = GeocodioClient(self.TEST_API_KEY, auto_load_api_version=False)

    self.assertEqual(client.version, expected_version)
    expected_prefix = "{domain}/v{version}".format(
        domain=self.base_domain, version=expected_version)
    self.assertTrue(client.BASE_URL.startswith(expected_prefix))
    # No HTTP traffic should have been recorded.
    recorded_requests = httpretty.latest_requests()
    self.assertEqual(len(recorded_requests), 0)
# Mock a small CSV document (header row plus two data rows) served as text/csv.
httpretty.register_uri(
httpretty.GET,
"http://example.com/test.csv",
body="one,two,three\n1,2,3\n4,5,6",
content_type="text/csv"
)
# Mock a 404 response with a plain-text body.
httpretty.register_uri(
httpretty.GET,
"http://example.com/fail.txt",
body="Not Found",
content_type="text/plain",
status=404,
)
# Mock a 404 response with an HTML content type.
httpretty.register_uri(
httpretty.GET,
"http://example.com/fail.html",
body="Not Found",
content_type="text/html",
status=404,
)
"resultType": "org.opencb.commons.datastore.core.ObjectMap",
"result": [{"date": "20170128014231", "ip": "10.5.0.2", "sessionId": "XOkCfKX09FV0YyPJCBvd",
"id": "XOkCfKX09FV0YyPJCBvd"}],
"time": 0, "id": "You successfully logged in", "dbTime": 5}]
httpretty.register_uri(httpretty.HEAD, server, status=302)
response_base['response'] = user_results
httpretty.register_uri(httpretty.POST, re.compile(base + 'users/(.+)/login'), status=200,
body=json.dumps(response_base),
content_type='application/json',
)
httpretty.register_uri(httpretty.GET, re.compile(base + 'projects/(.+)/info'), status=200,
body=json.dumps(response_base),
content_type='application/json',
)
httpretty.register_uri(httpretty.GET, re.compile(base + 'files/search'), status=200,
body=self.create_paginate_response,
content_type='application/json',
)
httpretty.register_uri(httpretty.POST, re.compile(base + 'analysis/variant/query'), status=200,
body=self.create_paginate_response_no_count,
content_type='application/json',
)
self.configuration = {
'version': 'v1',
'rest': {
'hosts': [
'mock-opencga/opencga'
]
def register_spacetrack_http_call(self):
    """Register mocked Space-Track responses with httpretty.

    The login POST is answered with an empty body and the latest-TLE
    query for NORAD catalog id 25544 is answered with the contents of
    the ``spacetrack_tle.json`` fixture.
    """
    login_url = 'https://www.space-track.org/ajaxauth/login'
    httpretty.register_uri(httpretty.POST, login_url, body="")

    tle_query_url = ('https://www.space-track.org/basicspacedata/query/class/'
                     'tle_latest/NORAD_CAT_ID/25544/ORDINAL/1/')
    tle_payload = fixture('spacetrack_tle.json')
    httpretty.register_uri(httpretty.GET, tle_query_url, body=tle_payload)