Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _create_connection(self, host, port, ssl, clean_session, keepalive):
# important for reconnects, make sure u know what u are doing if wanna change :(
self._exit_reconnecting_state()
connection = await MQTTConnection.create_connection(host, port, ssl, clean_session, keepalive)
connection.set_handler(self)
return connection
async def create_connection(cls, host, port, ssl, clean_session, keepalive, loop=None):
loop = loop or asyncio.get_event_loop()
transport, protocol = await loop.create_connection(MQTTProtocol, host, port, ssl=ssl)
return MQTTConnection(transport, protocol, clean_session, keepalive)
packet.extend(prop_bytes)
return packet
class FBNSMQTTProtocol(MQTTProtocol):
proto_name = b'MQTToT'
proto_ver = 3
async def send_auth_package(self, fbns_auth, clean_session, keepalive, will_message=None, **kwargs):
pkg = FBNSConnectPackageFactor.build_package(fbns_auth, clean_session, keepalive, self, will_message=will_message, **kwargs)
self.write_data(pkg)
class FBNSMQTTConnection(MQTTConnection):
@classmethod
async def create_connection(cls, host, port, ssl, clean_session, keepalive, loop=None):
loop = loop or asyncio.get_event_loop()
transport, protocol = await loop.create_connection(FBNSMQTTProtocol, host, port, ssl=ssl)
return FBNSMQTTConnection(transport, protocol, clean_session, keepalive)
async def auth(self, fbns_auth, will_message=None, **kwargs):
await self._protocol.send_auth_package(fbns_auth, self._clean_session,
self._keepalive, will_message=will_message, **kwargs)
def _keep_connection(self):
super()._keep_connection()
logger.debug('[KEEP ALIVE]')
FBNSConnAckReturnCodes = {