Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _parse_properties(self, packet):
if self.protocol_version < MQTTv50:
# If protocol is version is less than 5.0, there is no properties in packet
return {}, packet
properties_len, left_packet = unpack_variable_byte_integer(packet)
packet = left_packet[:properties_len]
left_packet = left_packet[properties_len:]
properties_dict = defaultdict(list)
while packet:
property_identifier, = struct.unpack("!B", packet[:1])
property_obj = Property.factory(id_=property_identifier)
if property_obj is None:
logger.critical('[PROPERTIES] received invalid property id {}, disconnecting'.format(property_identifier))
return None, None
result, packet = property_obj.loads(packet[1:])
for k, v in result.items():
properties_dict[k].append(v)
properties_dict = dict(properties_dict)
return properties_dict, left_packet
def _build_properties_data(cls, properties_dict, protocol_version):
if protocol_version < MQTTv50:
return bytearray()
data = bytearray()
for property_name, property_value in properties_dict.items():
property = Property.factory(name=property_name)
if property is None:
logger.warning('[GMQTT] property {} is not supported, it was ignored'.format(property_name))
continue
property_bytes = property.dumps(property_value)
data.extend(property_bytes)
result = pack_variable_byte_integer(len(data))
result.extend(data)
return result