Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _new_client_connection(self, secure, proxy_options=None):
    """Open an HTTP client connection to ``self.hostname`` on ``self.port``.

    When ``secure`` is truthy, TLS connection options are built whose trust
    store is overridden with ``test/resources/unittests.crt`` and whose
    server name is ``self.hostname``; otherwise no TLS options are passed.
    ``proxy_options`` is forwarded unchanged to ``HttpClientConnection.new``.

    Returns the result of the connection future, waited on with
    ``self.timeout``.
    """
    tls_options = None
    if secure:
        ctx_options = TlsContextOptions()
        ctx_options.override_default_trust_store_from_path(None, 'test/resources/unittests.crt')
        # Keep the context bound to a local so it lives as long as this call.
        client_ctx = ClientTlsContext(ctx_options)
        tls_options = client_ctx.new_connection_options()
        tls_options.set_server_name(self.hostname)

    # A fresh event loop group, resolver and bootstrap are built per connection.
    elg = EventLoopGroup()
    resolver = DefaultHostResolver(elg)
    client_bootstrap = ClientBootstrap(elg, resolver)

    pending = HttpClientConnection.new(
        host_name=self.hostname,
        port=self.port,
        bootstrap=client_bootstrap,
        tls_connection_options=tls_options,
        proxy_options=proxy_options)
    return pending.result(self.timeout)
def test_with_mtls_from_path(self):
    """Build a client TLS context from mTLS options loaded from file paths."""
    cert_path = 'test/resources/unittests.crt'
    key_path = 'test/resources/unittests.key'
    mtls_options = TlsContextOptions.create_client_with_mtls_from_path(cert_path, key_path)
    # The test passes if constructing the context does not raise.
    client_ctx = ClientTlsContext(mtls_options)
def test_alpn_list(self):
    """Set an ALPN protocol list on connection options made from a default context."""
    protocols = ['h2', 'http/1.1']
    client_ctx = ClientTlsContext(TlsContextOptions())
    connection_options = TlsConnectionOptions(client_ctx)
    # The test passes if setting the list does not raise.
    connection_options.set_alpn_list(protocols)
def test_override_default_trust_store_file(self):
    """Build a client TLS context after overriding the trust store with a CA file."""
    ca_file = 'test/resources/unittests.crt'
    ctx_options = TlsContextOptions()
    # First argument (the CA directory path) is left as None; only the file is given.
    ctx_options.override_default_trust_store_from_path(None, ca_file)
    client_ctx = ClientTlsContext(ctx_options)
def __init__(self, bootstrap, tls_ctx=None):
    """Initialise the client and create its native binding.

    Args:
        bootstrap: Must be a ``ClientBootstrap`` instance.
        tls_ctx: Optional ``ClientTlsContext``; ``None`` means no TLS context.

    The TLS context is stored on ``self.tls_ctx`` and both arguments are
    passed to ``_awscrt.mqtt_client_new`` to produce ``self._binding``.
    """
    # Argument type checks are asserts, so they are skipped under ``python -O``.
    assert isinstance(bootstrap, ClientBootstrap)
    if tls_ctx is not None:
        assert isinstance(tls_ctx, ClientTlsContext)

    super(Client, self).__init__()

    self.tls_ctx = tls_ctx
    native_client = _awscrt.mqtt_client_new(bootstrap, tls_ctx)
    self._binding = native_client
port = kwargs.get('port', port)
socket_options = awscrt.io.SocketOptions()
socket_options.connect_timeout_ms = kwargs.get('tcp_connect_timeout_ms', 5000)
socket_options.keep_alive = kwargs.get('tcp_keepalive', False)
socket_options.keep_alive_timeout_secs = kwargs.get('tcp_keepalive_timeout_secs', 0)
socket_options.keep_alive_interval_secs = kwargs.get('tcp_keep_alive_interval_secs', 0)
socket_options.keep_alive_max_probes = kwargs.get('tcp_keep_alive_max_probes', 0)
username = kwargs.get('username', '')
if kwargs.get('enable_metrics_collection', True):
username += _get_metrics_str()
client_bootstrap = kwargs.get('client_bootstrap')
tls_ctx = awscrt.io.ClientTlsContext(tls_ctx_options)
mqtt_client = awscrt.mqtt.Client(client_bootstrap, tls_ctx)
return awscrt.mqtt.Connection(
client=mqtt_client,
on_connection_interrupted=kwargs.get('on_connection_interrupted'),
on_connection_resumed=kwargs.get('on_connection_resumed'),
client_id=kwargs.get('client_id'),
host_name=kwargs.get('endpoint'),
port=port,
clean_session=kwargs.get('clean_session', False),
reconnect_min_timeout_secs=kwargs.get('reconnect_min_timeout_secs', 5),
reconnect_max_timeout_secs=kwargs.get('reconnect_max_timeout_secs', 60),
keep_alive_secs=kwargs.get('keep_alive_secs', 3600),
ping_timeout_ms=kwargs.get('ping_timeout_ms', 3000),
will=kwargs.get('will'),
username=username,
# TLS connection options are only built for the 'https' scheme; for any
# other scheme they stay None.
tls_connection_options = None
if scheme == 'https':
    # Mutual TLS needs both a certificate and a key; otherwise use defaults.
    if args.cert is None or args.key is None:
        tls_ctx_options = io.TlsContextOptions()
    else:
        tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(args.cert, args.key)

    # Either a CA file or a CA directory is enough to override the trust store.
    has_custom_trust_store = args.cacert is not None or args.capath is not None
    if has_custom_trust_store:
        tls_ctx_options.override_default_trust_store_from_path(args.capath, args.cacert)

    # The insecure flag turns off peer verification on the context options.
    if args.insecure:
        tls_ctx_options.verify_peer = False

    tls_ctx = io.ClientTlsContext(tls_ctx_options)
    tls_connection_options = tls_ctx.new_connection_options()
    tls_connection_options.set_server_name(url.hostname)

    if args.alpn:
        tls_connection_options.set_alpn_list(args.alpn)
# invoked upon the connection closing
def on_connection_shutdown(shutdown_future):
    """Print the exception carried by the connection's shutdown future."""
    shutdown_error = shutdown_future.exception()
    message = 'connection close with error: {}'.format(shutdown_error)
    print(message)
# invoked by the http request call as the response body is received in chunks
def on_incoming_body(http_stream, chunk, **kwargs):