Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Client ID
b.append(build_string(self.client_id))
if self.flags.will:
b.append(build_string(self.will_topic))
# Will message is a uint16 prefixed bytestring
b.append(pack('uint:16', len(self.will_message)).bytes)
b.append(self.will_message)
if self.flags.username:
b.append(build_string(self.username))
# Technically this should be binary data but we will only accept UTF-8
if self.flags.password:
b.append(build_string(self.password))
return b"".join(b)
def serialise(self):
"""
Assemble this into an on-wire message part.
"""
b = []
# Topic filter, as UTF-8
b.append(build_string(self.topic_filter))
# Reserved section + max QoS
b.append(pack('uint:6, uint:2', 0, self.max_qos).bytes)
return b"".join(b)
def _make_payload(self):
"""
Build the payload from its constituent parts.
"""
b = []
# Topic Name
b.append(build_string(self.topic_name))
if self.packet_identifier:
if self.qos_level > 0:
# Session identifier
b.append(pack('uint:16', self.packet_identifier).bytes)
else:
raise SerialisationFailure(self, "Packet Identifier on non-QoS 1/2 packet")
else:
if self.qos_level > 0:
raise SerialisationFailure(self, "QoS level > 0 but no Packet Identifier")
# Payload (bytes)
b.append(self.payload)
return b"".join(b)
b.append(build_string("MQTT"))
# Protocol Level (4 == 3.1.1)
b.append(pack('uint:8', 4).bytes)
# CONNECT flags
b.append(self.flags.serialise())
# Keep Alive time
b.append(pack('uint:16', self.keep_alive).bytes)
# Client ID
b.append(build_string(self.client_id))
if self.flags.will:
b.append(build_string(self.will_topic))
# Will message is a uint16 prefixed bytestring
b.append(pack('uint:16', len(self.will_message)).bytes)
b.append(self.will_message)
if self.flags.username:
b.append(build_string(self.username))
# Technically this should be binary data but we will only accept UTF-8
if self.flags.password:
b.append(build_string(self.password))
return b"".join(b)
def _make_payload(self):
"""
Build the payload from its constituent parts.
"""
b = []
# Protocol name (MQTT)
b.append(build_string("MQTT"))
# Protocol Level (4 == 3.1.1)
b.append(pack('uint:8', 4).bytes)
# CONNECT flags
b.append(self.flags.serialise())
# Keep Alive time
b.append(pack('uint:16', self.keep_alive).bytes)
# Client ID
b.append(build_string(self.client_id))
if self.flags.will:
b.append(build_string(self.will_topic))
# Keep Alive time
b.append(pack('uint:16', self.keep_alive).bytes)
# Client ID
b.append(build_string(self.client_id))
if self.flags.will:
b.append(build_string(self.will_topic))
# Will message is a uint16 prefixed bytestring
b.append(pack('uint:16', len(self.will_message)).bytes)
b.append(self.will_message)
if self.flags.username:
b.append(build_string(self.username))
# Technically this should be binary data but we will only accept UTF-8
if self.flags.password:
b.append(build_string(self.password))
return b"".join(b)
b = []
# Protocol name (MQTT)
b.append(build_string("MQTT"))
# Protocol Level (4 == 3.1.1)
b.append(pack('uint:8', 4).bytes)
# CONNECT flags
b.append(self.flags.serialise())
# Keep Alive time
b.append(pack('uint:16', self.keep_alive).bytes)
# Client ID
b.append(build_string(self.client_id))
if self.flags.will:
b.append(build_string(self.will_topic))
# Will message is a uint16 prefixed bytestring
b.append(pack('uint:16', len(self.will_message)).bytes)
b.append(self.will_message)
if self.flags.username:
b.append(build_string(self.username))
# Technically this should be binary data but we will only accept UTF-8
if self.flags.password:
b.append(build_string(self.password))
return b"".join(b)