Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_send_command(self, device):
    """Send an ``open`` command and check the body recorded by HTTPretty.

    The device's exec URL is stubbed for POST, the command is sent, and the
    parsed body of the last intercepted request is compared against the
    expected command name with an empty parameter list.
    """
    exec_url = "".join([BASE_URL, "/device/", device.device.id, "/exec"])
    httpretty.register_uri(httpretty.POST, exec_url)

    # Sending the command must complete without raising.
    device.send_command(Command("open"))

    expected_payload = {"name": "open", "parameters": []}
    recorded = httpretty.last_request()
    assert recorded.parsed_body == expected_payload
def setup(self):
    """Enable HTTPretty and stub the OIDC provider endpoints around a test.

    This is a generator: everything before ``yield`` is set-up and
    ``httpretty.disable()`` is the tear-down (presumably driven by a pytest
    yield-fixture decorator that is outside this view -- confirm).

    The set-up loads the RSA test key from ``FIXTURE_ROOT`` and registers:

    * a GET stub for the JWKS endpoint, answered dynamically with the key set;
    * a POST stub for the token endpoint, returning an ID token built by
      ``self.generate_jws()`` plus fixed access and refresh tokens;
    * a GET stub for the userinfo endpoint, returning a fixed subject/email.

    The whole body is wrapped in ``try``/``finally`` so HTTPretty is always
    disabled again, even if key loading or stub registration fails before
    the ``yield`` is reached.
    """
    httpretty.enable()
    try:
        self.key = RSAKey(kid='testkey').load(os.path.join(FIXTURE_ROOT, 'testkey.pem'))

        def jwks(_request, _uri, headers):  # noqa: E306
            # Build the key set lazily, at request time, from the loaded key.
            ks = KEYS()
            ks.add(self.key.serialize())
            return 200, headers, ks.dump_jwks()

        httpretty.register_uri(
            httpretty.GET, oidc_rp_settings.PROVIDER_JWKS_ENDPOINT, status=200, body=jwks)
        httpretty.register_uri(
            httpretty.POST, oidc_rp_settings.PROVIDER_TOKEN_ENDPOINT,
            body=json.dumps({
                'id_token': self.generate_jws(), 'access_token': 'accesstoken',
                'refresh_token': 'refreshtoken', }),
            content_type='text/json')
        httpretty.register_uri(
            httpretty.GET, oidc_rp_settings.PROVIDER_USERINFO_ENDPOINT,
            body=json.dumps({'sub': '1234', 'email': 'test@example.com', }),
            content_type='text/json')
        yield
    finally:
        # Never leave the global socket patching active after this fixture.
        httpretty.disable()
# Load the canned response bodies for the login and subscriptions endpoints.
login = read_file('data/groupsio/login')
subscriptions_page_1 = read_file('data/groupsio/subscriptions_page_1')
subscriptions_page_2 = read_file('data/groupsio/subscriptions_page_2')
# Stub the archive download endpoint with either the empty or the populated
# mbox body; both branches use the same URL and the same status code.
# (groupsio_mbox_empty / groupsio_mbox_archive are defined outside this view.)
if empty_mbox:
httpretty.register_uri(httpretty.GET,
GROUPSIO_API_URL + GroupsioClient.DOWNLOAD_ARCHIVES,
body=groupsio_mbox_empty,
status=http_status_download)
else:
httpretty.register_uri(httpretty.GET,
GROUPSIO_API_URL + GroupsioClient.DOWNLOAD_ARCHIVES,
body=groupsio_mbox_archive,
status=http_status_download)
# Stub the login endpoint; it always answers with status 200.
# NOTE(review): `params` does not appear to be a named argument of
# httpretty.register_uri -- presumably it is absorbed as an extra keyword
# rather than used for request matching; confirm against the HTTPretty docs.
httpretty.register_uri(httpretty.POST,
GROUPSIO_API_URL + GroupsioClient.LOGIN,
body=login,
params={'email': 'jsmith@example.com', 'password': 'aaaaa'},
status=200)
# Stub the subscriptions endpoint twice, once per canned page.
# NOTE(review): both registrations target the same URL and differ only in
# `params`/`body`; which body is actually served depends on how HTTPretty
# treats repeated registrations -- verify the intended pagination behaviour.
httpretty.register_uri(httpretty.GET,
GROUPSIO_API_URL + GroupsioClient.GET_SUBSCRIPTIONS,
body=subscriptions_page_2,
params={"limit": 1, "next_page_token": 1},
status=http_status_subscriptions)
httpretty.register_uri(httpretty.GET,
GROUPSIO_API_URL + GroupsioClient.GET_SUBSCRIPTIONS,
body=subscriptions_page_1,
params={"limit": 1},
status=http_status_subscriptions)
',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,6052765696,available,mem,mac.local\n' \
',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,6052577280,available,mem,mac.local\n' \
',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,6053679104,available,mem,mac.local\n' \
',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,6052036608,available,mem,mac.local\n' \
'\n\n' \
'#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
'#group,false,false,true,true,false,false,true,true,true\n' \
'#default,_result,,,,,,,,\n' \
',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,18632704,free,mem,mac.local\n' \
',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,17420288,free,mem,mac.local\n' \
',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,17256448,free,mem,mac.local\n' \
',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,18362368,free,mem,mac.local\n' \
',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,16723968,free,mem,mac.local\n\n'
# Answer every POST to the local query endpoint with the canned response.
# (query_response is presumably the annotated-CSV text assembled in the
# preceding lines -- its assignment starts outside this view.)
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=query_response)
# Create the client against the stubbed host with gzip disabled.
self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)
# Run a query for the "mem" measurement restricted to three fields and ask
# for the result as data frames.
_dataFrames = self.client.query_api().query_data_frame(
'from(bucket: "my-bucket") '
'|> range(start: -5s, stop: now()) '
'|> filter(fn: (r) => r._measurement == "mem") '
'|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")',
"my-org")
# The result must be a plain list holding exactly three data frames.
self.assertEqual(list, type(_dataFrames))
self.assertEqual(len(_dataFrames), 3)
# The first frame must expose these columns in exactly this order.
self.assertListEqual(
["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"],
list(_dataFrames[0].columns))
def stub_auth(self, subject_token=None, **kwargs):
    """Register a POST stub for the ``auth/tokens`` path.

    :param subject_token: value passed on as ``X_Subject_Token``; any falsy
        value is replaced by ``self.TEST_TOKEN``.
    :param kwargs: forwarded unchanged to ``self.stub_url``.
    """
    token = subject_token or self.TEST_TOKEN
    url_parts = ['auth', 'tokens']
    self.stub_url(httpretty.POST, url_parts, X_Subject_Token=token, **kwargs)
@mock_response(hp.POST, '/v2/orders/foo/refund', mock_item)
def test_refund(self):
    """Check that ``Order.refund`` rejects calls missing required arguments.

    Starting with no arguments, ``refund`` is called repeatedly and must
    raise ``ValueError`` each time; after every failure one more required
    argument is added. Once all required arguments are present the call must
    succeed and return an ``Order`` instance.
    """
    client = Client(api_key, api_secret)
    order = new_api_object(client, mock_order, Order)

    send_kwargs = {}
    required_kwargs = {'currency': 'USD'}
    while required_kwargs:
        # Still missing at least one required argument: must be rejected.
        with self.assertRaises(ValueError):
            order.refund(**send_kwargs)
        # Move exactly one required argument over. Taking the first key via
        # next(iter(...)) avoids mutating the dict while iterating over it.
        key = next(iter(required_kwargs))
        send_kwargs[key] = required_kwargs.pop(key)

    # All required arguments are now supplied, so the refund must succeed.
    order = order.refund(**send_kwargs)
    self.assertIsInstance(order, Order)
def test_fetch_post(self):
    """Check that ``fetch`` with the POST method returns the stubbed body."""
    expected_text = "success"
    httpretty.register_uri(
        httpretty.POST,
        CLIENT_SPIDERMAN_URL,
        body=expected_text,
        status=200,
    )

    http_client = MockedClient(CLIENT_API_URL, sleep_time=0.1, max_retries=1)
    reply = http_client.fetch(CLIENT_SPIDERMAN_URL, method=HttpClient.POST)

    # The request must have gone out as POST and carried back the stub body.
    self.assertEqual(reply.request.method, HttpClient.POST)
    self.assertEqual(reply.text, expected_text)
def setup_expected_oauth_response(queryParameters, tokenPath, httpCode, returnDoc, authorityEndpoint):
    """Register a POST stub for a token URL that includes a query string.

    The URL is ``<authorityEndpoint>/<tokenPath>?<urlencoded queryParameters>``
    with exactly one slash joining endpoint and path. The stub answers with
    ``returnDoc`` serialised as JSON, status ``httpCode`` and content type
    ``text/json``.
    """
    base = authorityEndpoint.rstrip('/')
    path = tokenPath.lstrip('/')
    encoded_query = urlencode(queryParameters)
    target_url = "{}/{}?{}".format(base, path, encoded_query)

    httpretty.register_uri(
        httpretty.POST,
        target_url,
        body=json.dumps(returnDoc),
        status=httpCode,
        content_type='text/json',
    )
def test_login(self):
httpretty.register_uri(
httpretty.POST,
"http://example.com/api_jsonrpc.php",
body=json.dumps({
"jsonrpc": "2.0",
"result": "0424bd59b807674191e7d77572075f33",
"id": 0
}),
)
zapi = ZabbixAPI('http://example.com')
zapi.login('mylogin', 'mypass')
# Check request
self.assertEqual(
json.loads(httpretty.last_request().body.decode('utf-8')),
{
'jsonrpc': '2.0',
def pretty_api():
httpretty.register_uri(httpretty.GET, 'http://api.test.com/a',
status=200,
content_type='application/json',
body=u'{"name": "a"}')
httpretty.register_uri(httpretty.POST, 'http://api.test.com/a',
status=200,
content_type='application/json',
body=u'{"name": "a"}')
httpretty.register_uri(httpretty.PUT, 'http://api.test.com/a',
status=200,
content_type='application/json',
body=u'{"name": "a"}')
httpretty.register_uri(httpretty.PATCH, 'http://api.test.com/a',
status=200,
content_type='application/json',
body=u'{"name": "a"}')
httpretty.register_uri(httpretty.DELETE, 'http://api.test.com/a',
status=200,
content_type='application/json',
body=u'{"name": "a"}')
httpretty.register_uri(httpretty.GET, 'http://api.test.com/b/c/d',