Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_uri_set_virtual_host(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/travis', True)
self.assertIsInstance(connection.parameters['virtual_host'], str)
self.assertEqual(connection.parameters['virtual_host'], 'travis')
def test_uri_set_timeout(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F?'
'timeout=1337', True)
self.assertIsInstance(connection.parameters['timeout'], int)
self.assertEqual(connection.parameters['timeout'], 1337)
def test_uri_set_ssl(self):
connection = UriConnection('amqps://guest:guest@localhost:5671/%2F?'
'ssl_version=protocol_tlsv1&'
'cert_reqs=cert_required&'
'keyfile=file.key&'
'certfile=file.crt&'
'ca_certs=test', True)
self.assertTrue(connection.parameters['ssl'])
self.assertEqual(connection.parameters['ssl_options']['ssl_version'],
ssl.PROTOCOL_TLSv1)
self.assertEqual(connection.parameters['ssl_options']['cert_reqs'],
ssl.CERT_REQUIRED)
self.assertEqual(connection.parameters['ssl_options']['keyfile'],
'file.key')
self.assertEqual(connection.parameters['ssl_options']['certfile'],
'file.crt')
self.assertEqual(connection.parameters['ssl_options']['ca_certs'],
'test')
def test_uri_set_hostname(self):
connection = \
UriConnection('amqp://guest:guest@my-server:5672/%2F?'
'heartbeat=1337', True)
self.assertIsInstance(connection.parameters['hostname'], str)
self.assertEqual(connection.parameters['hostname'], 'my-server')
def test_uri_get_invalid_ssl_validation(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
self.assertEqual(ssl.CERT_NONE,
connection._get_ssl_validation('cert_test'))
def test_uri_set_heartbeat(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F?'
'heartbeat=1337', True)
self.assertIsInstance(connection.parameters['heartbeat'], int)
self.assertEqual(connection.parameters['heartbeat'], 1337)
def test_uri_default(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
self.assertEqual(connection.parameters['hostname'], 'localhost')
self.assertEqual(connection.parameters['username'], 'guest')
self.assertEqual(connection.parameters['password'], 'guest')
self.assertEqual(connection.parameters['virtual_host'], '/')
self.assertEqual(connection.parameters['port'], 5672)
self.assertEqual(connection.parameters['heartbeat'], 60)
self.assertEqual(connection.parameters['timeout'], 30)
self.assertFalse(connection.parameters['ssl'])
def test_uri_set_port(self):
connection = \
UriConnection('amqp://guest:guest@localhost:1337/%2F', True)
self.assertIsInstance(connection.parameters['port'], int)
self.assertEqual(connection.parameters['port'], 1337)
try:
if self._router._connection_type=="app":
request_res = requests.post(
"https://api.kervi.io/sessions",
headers= {
"api-user": self._user,
"api-password": self._password,
"app_id": self._app_id,
"app_name": self._app_name,
}
)
print("cres", request_res.json())
connection_string = 'amqps://'+self._user+':'+self._password+'@'+self._address+':' + str(self._port)+'/'+self._vhost
#print("cns", connection_string)
self._connection = UriConnection(
connection_string
)
#self._connection = Connection(self._address, self._user, self._password, port=self._port, vhost=self._vhost)
break
except amqpstorm.AMQPError as why:
LOGGER.exception(why)
print("why connect", why.error_code)
if why.error_code == 403 or why.error_code == 530:
print("Kervi.io authentication error, check configuration for the plugin kervi.plugin.routing.kervi_io")
break
if self._max_retries and attempts > self._max_retries:
break
time.sleep(min(attempts * 2, 30))
except KeyboardInterrupt:
break
def publisher():
with UriConnection(URI) as connection:
with connection.channel() as channel:
channel.queue.declare('simple_queue')
channel.basic.publish(body='Hello World!',
routing_key='simple_queue')