Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_lifetime(self):
elg = EventLoopGroup()
resolver = DefaultHostResolver(elg)
bootstrap = ClientBootstrap(elg, resolver)
client = Client(bootstrap)
def _test_connection(self):
config = Config.get()
elg = EventLoopGroup()
resolver = DefaultHostResolver(elg)
bootstrap = ClientBootstrap(elg, resolver)
tls_opts = TlsContextOptions.create_client_with_mtls(config.cert, config.key)
tls = ClientTlsContext(tls_opts)
client = Client(bootstrap, tls)
connection = Connection(
client=client,
client_id=create_client_id(),
host_name=config.endpoint,
port=8883)
connection.connect().result(TIMEOUT)
return connection
port = kwargs.get('port', port)
socket_options = awscrt.io.SocketOptions()
socket_options.connect_timeout_ms = kwargs.get('tcp_connect_timeout_ms', 5000)
socket_options.keep_alive = kwargs.get('tcp_keepalive', False)
socket_options.keep_alive_timeout_secs = kwargs.get('tcp_keepalive_timeout_secs', 0)
socket_options.keep_alive_interval_secs = kwargs.get('tcp_keep_alive_interval_secs', 0)
socket_options.keep_alive_max_probes = kwargs.get('tcp_keep_alive_max_probes', 0)
username = kwargs.get('username', '')
if kwargs.get('enable_metrics_collection', True):
username += _get_metrics_str()
client_bootstrap = kwargs.get('client_bootstrap')
tls_ctx = awscrt.io.ClientTlsContext(tls_ctx_options)
mqtt_client = awscrt.mqtt.Client(client_bootstrap, tls_ctx)
return awscrt.mqtt.Connection(
client=mqtt_client,
on_connection_interrupted=kwargs.get('on_connection_interrupted'),
on_connection_resumed=kwargs.get('on_connection_resumed'),
client_id=kwargs.get('client_id'),
host_name=kwargs.get('endpoint'),
port=port,
clean_session=kwargs.get('clean_session', False),
reconnect_min_timeout_secs=kwargs.get('reconnect_min_timeout_secs', 5),
reconnect_max_timeout_secs=kwargs.get('reconnect_max_timeout_secs', 60),
keep_alive_secs=kwargs.get('keep_alive_secs', 3600),
ping_timeout_ms=kwargs.get('ping_timeout_ms', 3000),
will=kwargs.get('will'),
username=username,
password=kwargs.get('password'),
def __init__(self, bootstrap, tls_ctx=None):
assert isinstance(bootstrap, ClientBootstrap)
assert tls_ctx is None or isinstance(tls_ctx, ClientTlsContext)
super(Client, self).__init__()
self.tls_ctx = tls_ctx
self._binding = _awscrt.mqtt_client_new(bootstrap, tls_ctx)