Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def tearDown(self):
gc.collect()
# Native resources might need a few more ticks to finish cleaning themselves up.
wait_until = time.time() + TIMEOUT
while NativeResource._living and time.time() < wait_until:
time.sleep(0.1)
# Print out debugging info on leaking resources
if NativeResource._living:
print('Leaking NativeResources:')
for i in NativeResource._living:
print('-', i)
# getrefcount(i) returns 4+ here, but 2 of those are due to debugging.
# Don't show:
# - 1 for WeakSet iterator due to this for-loop.
# - 1 for getrefcount(i)'s reference.
# But do show:
# - 1 for item's self-reference.
# - the rest are what's causing this leak.
refcount = sys.getrefcount(i) - 2
self.retain = retain
class Client(NativeResource):
__slots__ = ('tls_ctx')
def __init__(self, bootstrap, tls_ctx=None):
assert isinstance(bootstrap, ClientBootstrap)
assert tls_ctx is None or isinstance(tls_ctx, ClientTlsContext)
super(Client, self).__init__()
self.tls_ctx = tls_ctx
self._binding = _awscrt.mqtt_client_new(bootstrap, tls_ctx)
class Connection(NativeResource):
def __init__(self,
client,
host_name,
port,
client_id,
clean_session=True,
on_connection_interrupted=None,
on_connection_resumed=None,
reconnect_min_timeout_secs=5,
reconnect_max_timeout_secs=60,
keep_alive_secs=3600,
ping_timeout_ms=3000,
will=None,
username=None,
password=None,
socket_options=None,
def init_logging(log_level, file_name):
"""
initialize a logger. log_level is type LogLevel, and file_name is of type str.
To write to stdout, or stderr, simply pass 'stdout' or 'stderr' as strings. Otherwise, a file path is assumed.
"""
assert log_level is not None
assert file_name is not None
_awscrt.init_logging(log_level, file_name)
def is_alpn_available():
return _awscrt.is_alpn_available()
class EventLoopGroup(NativeResource):
"""
Manages a collection of event-loops.
An event-loop is a thread for doing async work, such as I/O.
Classes that need to do async work will ask the EventLoopGroup for an event-loop to use.
"""
__slots__ = ('shutdown_event')
def __init__(self, num_threads=0):
"""
num_threads: Number of event-loops to create. Pass 0 to create one for each processor on the machine.
"""
super(EventLoopGroup, self).__init__()
shutdown_event = threading.Event()
return self._response_status_code
def _on_response(self, status_code, name_value_pairs):
self._response_status_code = status_code
if self._on_response_cb:
self._on_response_cb(http_stream=self, status_code=status_code, headers=name_value_pairs)
def _on_complete(self, error_code):
if error_code == 0:
self._completion_future.set_result(self._response_status_code)
else:
self._completion_future.set_exception(awscrt.exceptions.from_code(error_code))
class HttpMessageBase(NativeResource):
"""
Base for HttpRequest and HttpResponse classes.
"""
__slots__ = ('_headers')
def __init__(self, binding, headers, body_stream=None):
assert isinstance(headers, HttpHeaders)
super(HttpMessageBase, self).__init__()
self._binding = binding
self._headers = headers
if body_stream:
self.body_stream = body_stream
@property
SERVER_UNAVAILABLE = 3
BAD_USERNAME_OR_PASSWORD = 4
NOT_AUTHORIZED = 5
class Will(object):
__slots__ = ('topic', 'qos', 'payload', 'retain')
def __init__(self, topic, qos, payload, retain):
self.topic = topic
self.qos = qos
self.payload = payload
self.retain = retain
class Client(NativeResource):
__slots__ = ('tls_ctx')
def __init__(self, bootstrap, tls_ctx=None):
assert isinstance(bootstrap, ClientBootstrap)
assert tls_ctx is None or isinstance(tls_ctx, ClientTlsContext)
super(Client, self).__init__()
self.tls_ctx = tls_ctx
self._binding = _awscrt.mqtt_client_new(bootstrap, tls_ctx)
class Connection(NativeResource):
def __init__(self,
client,
host_name,
port,
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from __future__ import absolute_import
import _awscrt
from concurrent.futures import Future
from awscrt import NativeResource, isinstance_str
import awscrt.exceptions
from awscrt.io import ClientBootstrap, EventLoopGroup, DefaultHostResolver, InputStream, TlsConnectionOptions, SocketOptions
from enum import IntEnum
class HttpConnectionBase(NativeResource):
"""
Base for HTTP connection classes.
Attributes:
shutdown_future (concurrent.futures.Future): Completes when the connection has finished shutting down.
Future will contain a result of None, or an exception indicating why shutdown occurred.
Note that the connection may have been garbage-collected before this future completes.
"""
__slots__ = ('shutdown_future')
def __init__(self):
super(HttpConnectionBase, self).__init__()
self.shutdown_future = Future()
def close(self):
class HostResolverBase(NativeResource):
__slots__ = ()
class DefaultHostResolver(HostResolverBase):
__slots__ = ()
def __init__(self, event_loop_group, max_hosts=16):
assert isinstance(event_loop_group, EventLoopGroup)
super(DefaultHostResolver, self).__init__()
self._binding = _awscrt.host_resolver_new_default(max_hosts, event_loop_group)
class ClientBootstrap(NativeResource):
__slots__ = ('shutdown_event')
def __init__(self, event_loop_group, host_resolver):
assert isinstance(event_loop_group, EventLoopGroup)
assert isinstance(host_resolver, HostResolverBase)
super(ClientBootstrap, self).__init__()
shutdown_event = threading.Event()
def on_shutdown():
shutdown_event.set()
self.shutdown_event = shutdown_event
self._binding = _awscrt.client_bootstrap_new(event_loop_group, host_resolver, on_shutdown)
def __init__(self, num_threads=0):
"""
num_threads: Number of event-loops to create. Pass 0 to create one for each processor on the machine.
"""
super(EventLoopGroup, self).__init__()
shutdown_event = threading.Event()
def on_shutdown():
shutdown_event.set()
self.shutdown_event = shutdown_event
self._binding = _awscrt.event_loop_group_new(num_threads, on_shutdown)
class HostResolverBase(NativeResource):
__slots__ = ()
class DefaultHostResolver(HostResolverBase):
__slots__ = ()
def __init__(self, event_loop_group, max_hosts=16):
assert isinstance(event_loop_group, EventLoopGroup)
super(DefaultHostResolver, self).__init__()
self._binding = _awscrt.host_resolver_new_default(max_hosts, event_loop_group)
class ClientBootstrap(NativeResource):
__slots__ = ('shutdown_event')
class _UTC(datetime.tzinfo):
ZERO = datetime.timedelta(0)
def utcoffset(self, dt):
return _UTC.ZERO
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return _UTC.ZERO
_utc = _UTC()
class AwsCredentials(NativeResource):
"""
AwsCredentials are the public/private data needed to sign an authenticated AWS request.
AwsCredentials are immutable.
"""
__slots__ = ()
def __init__(self, access_key_id, secret_access_key, session_token=None):
assert isinstance_str(access_key_id)
assert isinstance_str(secret_access_key)
assert isinstance_str(session_token) or session_token is None
super(AwsCredentials, self).__init__()
self._binding = _awscrt.credentials_new(access_key_id, secret_access_key, session_token)
@property
def access_key_id(self):
def __init__(self, tls_ctx):
assert isinstance(tls_ctx, ClientTlsContext)
super(TlsConnectionOptions, self).__init__()
self.tls_ctx = tls_ctx
self._binding = _awscrt.tls_connections_options_new_from_ctx(tls_ctx)
def set_alpn_list(self, alpn_list):
_awscrt.tls_connection_options_set_alpn_list(self, _alpn_list_to_str(alpn_list))
def set_server_name(self, server_name):
_awscrt.tls_connection_options_set_server_name(self, server_name)
class InputStream(NativeResource):
"""InputStream allows awscrt native code to read from Python I/O classes"""
__slots__ = ()
# TODO: Implement IOBase interface so Python can read from this class as well.
def __init__(self, stream):
assert isinstance(stream, io.IOBase)
assert not isinstance(stream, InputStream)
super(InputStream, self).__init__()
self._binding = _awscrt.input_stream_new(stream)
@classmethod
def wrap(cls, stream, allow_none=False):
"""
If stream is already an awscrt.io.InputStream, return it.
Otherwise, return an awscrt.io.InputStream which wraps the stream.