Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def init_clients():
    """Yield two unconnected clients, each paired with its own ``Callbacks``.

    ``cleanup`` is awaited first. Two ``gmqtt.Client`` objects are then
    created with the ids ``PREFIX + "myclientid"`` and
    ``PREFIX + "myclientid2"``. Once the consumer resumes the generator,
    both clients are disconnected in creation order.

    Yields:
        A 4-tuple ``(aclient, callback, bclient, callback2)``.
    """
    await cleanup(host, port, username, prefix=PREFIX)

    def build(client_id):
        # One client plus the Callbacks object registered on it.
        client = gmqtt.Client(client_id, clean_session=True)
        client.set_auth_credentials(username)
        recorder = Callbacks()
        recorder.register_for_client(client)
        return client, recorder

    first, first_log = build(PREFIX + "myclientid")
    second, second_log = build(PREFIX + "myclientid2")

    yield first, first_log, second, second_log

    for client in (first, second):
        await client.disconnect()
async def clean_retained(host, port, username, password=None, prefix=None):
    """Overwrite retained messages under ``prefix`` with empty payloads.

    A temporary client subscribes to ``prefix + '#'`` (just ``'#'`` when no
    prefix is given) and answers every message it receives by publishing an
    empty retained payload to the same topic.

    Args:
        host: Broker host, forwarded to ``connect``.
        port: Broker port, forwarded to ``connect``.
        username: User name forwarded to ``set_auth_credentials``.
        password: Optional password forwarded to ``set_auth_credentials``.
        prefix: Optional string prepended to the client id and to the
            subscription filter. ``None`` is treated as the empty string.
    """
    # The default prefix is None; normalise it so the concatenations below
    # do not raise TypeError.
    prefix = prefix or ""
    curclient = gmqtt.Client(prefix + "cleanretained", clean_session=True)

    def on_message(client, topic, payload, qos, properties):
        # Per the MQTT spec, a zero-length retained publish removes the
        # retained message stored for that topic.
        curclient.publish(topic, b"", qos=0, retain=True)

    curclient.set_auth_credentials(username, password)
    curclient.on_message = on_message
    await curclient.connect(host=host, port=port)
    try:
        curclient.subscribe(prefix + '#')
        await asyncio.sleep(10)  # wait for all retained messages to arrive
    finally:
        # Disconnect even when the wait is cancelled or subscribe fails.
        await curclient.disconnect()
    # Non-blocking pause: time.sleep() here would stall the event loop.
    await asyncio.sleep(.1)
async def init_clients():
    """Yield two unconnected clients, each paired with its own ``Callbacks``.

    ``cleanup`` is awaited first. Two ``gmqtt.Client`` objects are then
    created with the ids ``PREFIX + "myclientid"`` and
    ``PREFIX + "myclientid2"``. Once the consumer resumes the generator,
    both clients are disconnected in creation order.

    Yields:
        A 4-tuple ``(aclient, callback, bclient, callback2)``.
    """
    await cleanup(host, port, username, prefix=PREFIX)

    def build(client_id):
        # One client plus the Callbacks object registered on it.
        client = gmqtt.Client(client_id, clean_session=True)
        client.set_auth_credentials(username)
        recorder = Callbacks()
        recorder.register_for_client(client)
        return client, recorder

    first, first_log = build(PREFIX + "myclientid")
    second, second_log = build(PREFIX + "myclientid2")

    yield first, first_log, second, second_log

    for client in (first, second):
        await client.disconnect()
async def test_async_on_message(init_clients):
    """Drive a coroutine ``on_message`` handler on a persistent-session client.

    A third client (``PREFIX + 'myclientid3'``, ``clean_session=False``)
    subscribes to ``WILDTOPICS[6]`` at QoS 2 with an ``async`` handler that
    sleeps before recording each message. ``aclient`` then publishes one
    QoS 1 and one QoS 2 message with empty payloads. The body shown here
    makes no assertions; it only exercises the publish/receive path.

    Args:
        init_clients: Fixture value ``(aclient, callback, bclient, callback2)``.
    """
    messages = []

    async def on_message(client, topic, payload, qos, properties):
        print('MSG', (topic, payload, qos, properties))
        await asyncio.sleep(0.5)
        messages.append((topic, payload, qos, properties))
        # NOTE(review): presumably consumed by gmqtt as the acknowledgement
        # reason code -- confirm against the library documentation.
        return 131

    aclient, callback, bclient, callback2 = init_clients

    disconnect_client = gmqtt.Client(PREFIX + 'myclientid3', optimistic_acknowledgement=False,
                                     clean_session=False, session_expiry_interval=99999)
    disconnect_client.set_config({'reconnect_retries': 0})
    disconnect_client.on_message = on_message
    disconnect_client.set_auth_credentials(username)
    await disconnect_client.connect(host=host, port=port)
    try:
        disconnect_client.subscribe(WILDTOPICS[6], 2)

        await asyncio.sleep(1)
        await aclient.connect(host, port)
        await asyncio.sleep(1)

        aclient.publish(TOPICS[1], b"", 1, retain=False)
        aclient.publish(TOPICS[3], b"", 2, retain=False)
        await asyncio.sleep(2)
        # Rebinds the name; the handler closure sees the new list as well.
        messages = []
    finally:
        # The fixture only tears down aclient/bclient, so this extra client
        # has to be disconnected here or it outlives the test.
        await disconnect_client.disconnect()
async def cleanup(host, port=1883, username=None, password=None, client_ids=None, prefix=None):
    """Reset broker-side state left behind by the test clients.

    Each client id is connected once with ``clean_session=True`` and
    disconnected again, then ``clean_retained`` is awaited for ``prefix``.

    Args:
        host: Broker host.
        port: Broker port.
        username: User name forwarded to ``set_auth_credentials``.
        password: Password forwarded to ``set_auth_credentials``.
        client_ids: Iterable of client-id strings to reset. When falsy, the
            ids ``prefix + "myclientid"``, ``"myclientid2"`` and
            ``"myclientid3"`` are used.
        prefix: Optional string prepended to the default client ids and
            passed to ``clean_retained``. ``None`` is treated as ``""``.
    """
    # The default prefix is None; normalise it so building the default ids
    # (and clean_retained's own concatenation) cannot raise TypeError.
    prefix = prefix or ""

    print("clean up starting")
    if not client_ids:
        client_ids = tuple(
            prefix + suffix
            for suffix in ("myclientid", "myclientid2", "myclientid3")
        )

    for clientid in client_ids:
        curclient = gmqtt.Client(clientid.encode("utf-8"), clean_session=True)
        curclient.set_auth_credentials(username=username, password=password)
        await curclient.connect(host=host, port=port)
        # asyncio.sleep instead of time.sleep so the event loop keeps
        # running during the pause.
        await asyncio.sleep(.1)
        await curclient.disconnect()
        await asyncio.sleep(.1)

    # clean retained messages
    await clean_retained(host, port, username, password=password, prefix=prefix)
    print("clean up finished")
async def test_shared_subscriptions(init_clients):
    """Check that both subscribers receive every message sent to ``TOPICS[0]``.

    Both fixture clients subscribe to a ``$share/sharename/...`` filter and
    to ``TOPICS[0]``. A separate publisher sends ``count`` messages to
    ``TOPICS[0]``, and each ``Callbacks`` object is expected to have recorded
    exactly ``count`` of them.

    Args:
        init_clients: Fixture value ``(aclient, callback, bclient, callback2)``.
    """
    aclient, callback, bclient, callback2 = init_clients

    shared_sub_topic = '$share/sharename/{}x'.format(PREFIX)

    for subscriber in (aclient, bclient):
        await subscriber.connect(host=host, port=port)
        subscriber.subscribe(shared_sub_topic)
        subscriber.subscribe(TOPICS[0])

    pubclient = gmqtt.Client("myclient3", clean_session=True)
    pubclient.set_auth_credentials(username)
    await pubclient.connect(host, port)
    try:
        count = 10
        for i in range(count):
            pubclient.publish(TOPICS[0], "message " + str(i), 0)

        # Poll once per second, at most 20 times, until both subscribers
        # together have recorded 2 * count messages.
        for _ in range(20):
            if len(callback.messages) + len(callback2.messages) >= 2 * count:
                break
            await asyncio.sleep(1)
        await asyncio.sleep(1)

        assert len(callback.messages) == count
        assert len(callback2.messages) == count

        callback.clear()
        callback2.clear()
    finally:
        # The fixture does not know about the publisher, so close it here
        # even when an assertion above fails.
        await pubclient.disconnect()
async def test_assigned_clientid():
    """Connect with an empty client id and expect a non-empty assigned one.

    After a connect/disconnect cycle, the third element of the recorded
    ``connack`` is read, and the first entry of its
    ``'assigned_client_identifier'`` value must not be the empty string.
    """
    anonymous = gmqtt.Client("", clean_session=True)
    anonymous.set_auth_credentials(username)

    recorder = Callbacks()
    recorder.register_for_client(anonymous)

    await anonymous.connect(host=host, port=port)
    await anonymous.disconnect()

    connack_properties = recorder.connack[2]
    assigned_id = connack_properties['assigned_client_identifier'][0]
    assert assigned_id != ""
async def test_basic_subscriptions(init_clients):
    """Subscribe to three topics and expect three messages to be recorded.

    ``bclient`` subscribes to ``TOPICS[1]`` (QoS 1) and ``TOPICS[2]`` (QoS 2)
    in one call and to ``TOPICS[3]`` (QoS 1) in a second call. ``aclient``
    then publishes one message to each topic, and ``callback2`` must have
    recorded exactly three messages after a one-second wait.

    Args:
        init_clients: Fixture value ``(aclient, callback, bclient, callback2)``.
    """
    publisher, _, subscriber, subscriber_log = init_clients

    for client in (publisher, subscriber):
        await client.connect(host=host, port=port)

    user_prop = ('key', 'value')
    first_batch = [
        gmqtt.Subscription(TOPICS[1], qos=1),
        gmqtt.Subscription(TOPICS[2], qos=2),
    ]
    subscriber.subscribe(first_batch, user_property=user_prop, subscription_identifier=1)
    subscriber.subscribe(
        gmqtt.Subscription(TOPICS[3], qos=1),
        user_property=user_prop,
        subscription_identifier=2,
    )

    await asyncio.sleep(1)

    # Positional arguments for each publish; the first call relies on the
    # library's default QoS.
    outgoing = (
        (TOPICS[3], b"qos 0"),
        (TOPICS[1], b"qos 1", 1),
        (TOPICS[2], b"qos 2", 2),
    )
    for publish_args in outgoing:
        publisher.publish(*publish_args)

    await asyncio.sleep(1)
    assert len(subscriber_log.messages) == 3
async def test_basic_subscriptions(init_clients):
    """Subscribe to three topics and expect three messages to be recorded.

    ``bclient`` subscribes to ``TOPICS[1]`` (QoS 1) and ``TOPICS[2]`` (QoS 2)
    in one call and to ``TOPICS[3]`` (QoS 1) in a second call. ``aclient``
    then publishes one message to each topic, and ``callback2`` must have
    recorded exactly three messages after a one-second wait.

    Args:
        init_clients: Fixture value ``(aclient, callback, bclient, callback2)``.
    """
    publisher, _, subscriber, subscriber_log = init_clients

    for client in (publisher, subscriber):
        await client.connect(host=host, port=port)

    user_prop = ('key', 'value')
    first_batch = [
        gmqtt.Subscription(TOPICS[1], qos=1),
        gmqtt.Subscription(TOPICS[2], qos=2),
    ]
    subscriber.subscribe(first_batch, user_property=user_prop, subscription_identifier=1)
    subscriber.subscribe(
        gmqtt.Subscription(TOPICS[3], qos=1),
        user_property=user_prop,
        subscription_identifier=2,
    )

    await asyncio.sleep(1)

    # Positional arguments for each publish; the first call relies on the
    # library's default QoS.
    outgoing = (
        (TOPICS[3], b"qos 0"),
        (TOPICS[1], b"qos 1", 1),
        (TOPICS[2], b"qos 2", 2),
    )
    for publish_args in outgoing:
        publisher.publish(*publish_args)

    await asyncio.sleep(1)
    assert len(subscriber_log.messages) == 3
async def test_basic_subscriptions(init_clients):
    """Subscribe to three topics and expect three messages to be recorded.

    ``bclient`` subscribes to ``TOPICS[1]`` (QoS 1) and ``TOPICS[2]`` (QoS 2)
    in one call and to ``TOPICS[3]`` (QoS 1) in a second call. ``aclient``
    then publishes one message to each topic, and ``callback2`` must have
    recorded exactly three messages after a one-second wait.

    Args:
        init_clients: Fixture value ``(aclient, callback, bclient, callback2)``.
    """
    publisher, _, subscriber, subscriber_log = init_clients

    for client in (publisher, subscriber):
        await client.connect(host=host, port=port)

    user_prop = ('key', 'value')
    first_batch = [
        gmqtt.Subscription(TOPICS[1], qos=1),
        gmqtt.Subscription(TOPICS[2], qos=2),
    ]
    subscriber.subscribe(first_batch, user_property=user_prop, subscription_identifier=1)
    subscriber.subscribe(
        gmqtt.Subscription(TOPICS[3], qos=1),
        user_property=user_prop,
        subscription_identifier=2,
    )

    await asyncio.sleep(1)

    # Positional arguments for each publish; the first call relies on the
    # library's default QoS.
    outgoing = (
        (TOPICS[3], b"qos 0"),
        (TOPICS[1], b"qos 1", 1),
        (TOPICS[2], b"qos 2", 2),
    )
    for publish_args in outgoing:
        publisher.publish(*publish_args)

    await asyncio.sleep(1)
    assert len(subscriber_log.messages) == 3