Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
packet_buff.add(packet_body)
try: # Catch protocol errors
try: # Catch buffer overrun/underrun
if self.compression_enabled:
uncompressed_length = packet_buff.unpack_varint()
if uncompressed_length > 0:
data = zlib.decompress(packet_buff.unpack_all())
packet_buff = Buffer()
packet_buff.add(data)
ident = packet_buff.unpack_varint()
self.packet_received(packet_buff, ident)
except BufferUnderrun:
raise ProtocolError("Packet is too short!")
if packet_buff.length() > 0:
raise ProtocolError("Packet is too long!")
except ProtocolError as e:
self.protocol_error(e)
break
# We've read a complete packet, so reset the inactivity timeout
self.connection_timer.restart()
def get_packet_name(self, ident):
    """Translate a numeric packet ID into its packet name.

    The lookup key is built from ``self.protocol_version``,
    ``self.protocol_mode``, ``self.recv_direction`` and *ident*, and is
    resolved through ``packets.packet_names``.

    Raises ``ProtocolError`` when no entry exists for that key.
    """
    lookup = (
        self.protocol_version,
        self.protocol_mode,
        self.recv_direction,
        ident,
    )
    try:
        name = packets.packet_names[lookup]
    except KeyError:
        # Report the whole key so the failing version/mode/direction is
        # visible alongside the unknown ID.
        raise ProtocolError("No name known for packet: %s" % (lookup,))
    return name
self.buff_type,
self.compression_threshold)
except BufferUnderrun:
self.recv_buff.restore()
break
try:
# Identify the packet
name = self.get_packet_name(buff.unpack_varint())
# Dispatch the packet
try:
self.packet_received(buff, name)
except BufferUnderrun:
raise ProtocolError("Packet is too short: %s" % name)
if len(buff) > 0:
raise ProtocolError("Packet is too long: %s" % name)
# Reset the inactivity timer
self.connection_timer.restart()
except ProtocolError as e:
self.protocol_error(e)
def get_packet_ident(self, name):
    """Translate a packet name into its numeric packet ID.

    The lookup key is built from ``self.protocol_version``,
    ``self.protocol_mode``, ``self.send_direction`` and *name*, and is
    resolved through ``packets.packet_idents``.

    Raises ``ProtocolError`` when no entry exists for that key.
    """
    lookup = (
        self.protocol_version,
        self.protocol_mode,
        self.send_direction,
        name,
    )
    try:
        ident = packets.packet_idents[lookup]
    except KeyError:
        # Report the whole key so the failing version/mode/direction is
        # visible alongside the unknown name.
        raise ProtocolError("No ID known for packet: %s" % (lookup,))
    return ident
def check_protocol_mode_switch(self, mode):
    """Verify that moving from the current protocol mode to *mode* is legal.

    The permitted switches are init -> status, init -> login and
    login -> play. Returns ``None`` when the switch is permitted and
    raises ``ProtocolError`` otherwise.
    """
    allowed = (
        ("init", "status"),
        ("init", "login"),
        ("login", "play"),
    )
    requested = (self.protocol_mode, mode)
    if requested in allowed:
        return
    raise ProtocolError("Cannot switch protocol mode from %s to %s"
                        % (self.protocol_mode, mode))
try: # Catch buffer overrun/underrun
if self.compression_enabled:
uncompressed_length = packet_buff.unpack_varint()
if uncompressed_length > 0:
data = zlib.decompress(packet_buff.unpack_all())
packet_buff = Buffer()
packet_buff.add(data)
ident = packet_buff.unpack_varint()
self.packet_received(packet_buff, ident)
except BufferUnderrun:
raise ProtocolError("Packet is too short!")
if packet_buff.length() > 0:
raise ProtocolError("Packet is too long!")
except ProtocolError as e:
self.protocol_error(e)
break
# We've read a complete packet, so reset the inactivity timeout
self.connection_timer.restart()
except BufferUnderrun:
self.recv_buff.restore()
break
try:
# Identify the packet
name = self.get_packet_name(buff.unpack_varint())
# Dispatch the packet
try:
self.packet_received(buff, name)
except BufferUnderrun:
raise ProtocolError("Packet is too short: %s" % name)
if len(buff) > 0:
raise ProtocolError("Packet is too long: %s" % name)
# Reset the inactivity timer
self.connection_timer.restart()
except ProtocolError as e:
self.protocol_error(e)
def check_protocol_mode_switch(self, mode):
    """Verify that moving from the current protocol mode to *mode* is legal.

    The permitted switches are init -> status, init -> login and
    login -> play. Returns ``None`` when the switch is permitted and
    raises ``ProtocolError`` otherwise.
    """
    allowed = (
        ("init", "status"),
        ("init", "login"),
        ("login", "play"),
    )
    requested = (self.protocol_mode, mode)
    if requested in allowed:
        return
    raise ProtocolError("Cannot switch protocol mode from %s to %s"
                        % (self.protocol_mode, mode))