Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def go():
_, port, _ = await self.create_server(
"GET",
"/blockchain/hardship/8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU",
handler,
)
with self.assertRaises(jsonschema.exceptions.ValidationError):
client = Client(BMAEndpoint("127.0.0.1", "", "", port))
await client(hardship, "8Fi1VSTbjkXguwThF4v2ZxC5whK7pwG2vcGTkPUPjPGU")
await client.close()
async def go():
_, port, _ = await self.create_server("GET", "/blockchain/with/tx", handler)
with self.assertRaises(jsonschema.exceptions.ValidationError):
client = Client(BMAEndpoint("127.0.0.1", "", "", port))
await client(tx)
await client.close()
async def go():
_, port, _ = await self.create_server("GET", "/network/ws2p/heads", handler)
with self.assertRaises(jsonschema.ValidationError):
client = Client(BMAEndpoint("127.0.0.1", "", "", port))
await client(network.ws2p_heads)
await client.close()
async def go():
_, port, _ = await self.create_server(
"GET", "/wot/certifiers-of/pubkey", handler
)
with self.assertRaises(jsonschema.exceptions.ValidationError):
client = Client(BMAEndpoint("127.0.0.1", "", "", port))
await client(certifiers_of, "pubkey")
await client.close()
async def go():
_, port, _ = await self.create_server(
"GET", "/blockchain/with/newcomers", handler
)
with self.assertRaises(jsonschema.exceptions.ValidationError):
client = Client(BMAEndpoint("127.0.0.1", "", "", port))
await client(newcomers)
await client.close()
async def go():
_, port, _ = await self.create_server("GET", "/tx/history/pubkey", handler)
with self.assertRaises(jsonschema.exceptions.ValidationError):
client = Client(BMAEndpoint("127.0.0.1", "", "", port))
await client(history, "pubkey")
await client.close()
def test_fromraw(self):
peer = Peer.from_signed_raw(rawpeer)
self.assertEqual(peer.currency, "beta_brousouf")
self.assertEqual(peer.pubkey, "HsLShAtzXTVxeUtQd7yi5Z5Zh4zNvbu8sTEZ53nfKcqY")
self.assertEqual(
str(peer.blockUID), "8-1922C324ABC4AF7EF7656734A31F5197888DDD52"
)
self.assertEqual(len(peer.endpoints), 4)
self.assertIsInstance(peer.endpoints[0], BMAEndpoint)
self.assertIsInstance(peer.endpoints[1], BMAEndpoint)
self.assertIsInstance(peer.endpoints[2], WS2PEndpoint)
self.assertIsInstance(peer.endpoints[3], UnknownEndpoint)
self.assertEqual(peer.endpoints[0].server, "some.dns.name")
self.assertEqual(peer.endpoints[0].ipv4, "88.77.66.55")
self.assertEqual(peer.endpoints[0].ipv6, "2001:42d0:52:a00::648")
self.assertEqual(peer.endpoints[0].port, 9001)
self.assertEqual(peer.endpoints[1].server, "some.dns.name")
self.assertEqual(peer.endpoints[1].ipv4, "88.77.66.55")
self.assertEqual(peer.endpoints[1].ipv6, "2001:42d0:52:a00::648")
self.assertEqual(peer.endpoints[1].port, 9002)
self.assertEqual(peer.endpoints[2].server, "g1-test.duniter.org")
self.assertEqual(peer.endpoints[2].ws2pid, "d2edcb92")
async def go():
_, port, _ = await self.create_server("GET", "/blockchain/with/ud", handler)
with self.assertRaises(jsonschema.exceptions.ValidationError):
client = Client(BMAEndpoint("127.0.0.1", "", "", port))
await client(ud)
await client.close()
def test_fromraw(self):
peer = Peer.from_signed_raw(rawpeer)
self.assertEqual(peer.currency, "beta_brousouf")
self.assertEqual(peer.pubkey, "HsLShAtzXTVxeUtQd7yi5Z5Zh4zNvbu8sTEZ53nfKcqY")
self.assertEqual(str(peer.blockUID), "8-1922C324ABC4AF7EF7656734A31F5197888DDD52")
self.assertEqual(len(peer.endpoints), 4)
self.assertIsInstance(peer.endpoints[0], BMAEndpoint)
self.assertIsInstance(peer.endpoints[1], BMAEndpoint)
self.assertIsInstance(peer.endpoints[2], WS2PEndpoint)
self.assertIsInstance(peer.endpoints[3], UnknownEndpoint)
self.assertEqual(peer.endpoints[0].server, "some.dns.name")
self.assertEqual(peer.endpoints[0].ipv4, "88.77.66.55")
self.assertEqual(peer.endpoints[0].ipv6, "2001:42d0:52:a00::648")
self.assertEqual(peer.endpoints[0].port, 9001)
self.assertEqual(peer.endpoints[1].server, "some.dns.name")
self.assertEqual(peer.endpoints[1].ipv4, "88.77.66.55")
self.assertEqual(peer.endpoints[1].ipv6, "2001:42d0:52:a00::648")
self.assertEqual(peer.endpoints[1].port, 9002)
self.assertEqual(peer.endpoints[2].server, "g1-test.duniter.org")
self.assertEqual(peer.endpoints[2].ws2pid, "d2edcb92")
async def go():
_, port, _ = await self.create_server(
"GET", "/blockchain/with/actives", handler
)
with self.assertRaises(jsonschema.exceptions.ValidationError):
client = Client(BMAEndpoint("127.0.0.1", "", "", port))
await client(actives)
await client.close()