Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def read_status(self):
    """Request the server status and return it as a ``PingResponse``.

    A packet containing the varint 0 is written to ``self.connection``; the
    reply buffer must start with the varint 0 and be followed by a UTF
    string holding JSON.

    Returns:
        The ``PingResponse`` built from the decoded JSON.

    Raises:
        IOError: if the reply's leading varint is not 0, if the payload is
            not valid JSON, or if ``PingResponse`` rejects the decoded
            data with a ``ValueError``.
    """
    status_request = Connection()
    status_request.write_varint(0)  # Request status
    self.connection.write_buffer(status_request)

    reply = self.connection.read_buffer()
    packet_id = reply.read_varint()
    if packet_id != 0:
        raise IOError("Received invalid status response packet.")

    try:
        payload = json.loads(reply.read_utf())
    except ValueError:
        raise IOError("Received invalid JSON")

    try:
        status = PingResponse(payload)
    except ValueError as error:
        raise IOError("Received invalid status response: %s" % error)
    return status
# Send a ping packet (varint 1 followed by self.ping_token as a long) over
# self.connection and validate the reply.
#
# Raises IOError when the reply's leading varint is not 1, or when the long
# read from the reply differs from self.ping_token.
#
# NOTE(review): the visible body stops right after `delta` is computed; no
# return statement is visible in this excerpt, so how the measured latency is
# reported to the caller cannot be confirmed from here.
def test_ping(self):
request = Connection()
request.write_varint(1) # Test ping
request.write_long(self.ping_token)
# Wall-clock timestamps are taken immediately before the write and
# immediately after the read so that `delta` brackets the round trip.
sent = datetime.datetime.now()
self.connection.write_buffer(request)
response = self.connection.read_buffer()
received = datetime.datetime.now()
if response.read_varint() != 1:
raise IOError("Received invalid ping response packet.")
# The reply must carry back the same token that was sent.
received_token = response.read_long()
if received_token != self.ping_token:
raise IOError("Received mangled ping response packet (expected token %d, received %d)" % (
self.ping_token, received_token))
# `delta` is a datetime.timedelta (difference of two datetime objects).
delta = (received - sent)
# We have no trivial way of getting a time delta :(
if len(new) == 0:
raise IOError("Server did not respond with any information!")
result.extend(new)
return result
def write(self, data):
    """Send all of *data* over ``self.socket``.

    ``socket.send`` may transmit only a prefix of its argument and reports
    how many bytes went out; ignoring that count silently truncates the
    message (this happens readily once a timeout is set on the socket and
    the payload exceeds the kernel send buffer). ``sendall`` keeps sending
    until every byte is written or an error is raised.

    Args:
        data: bytes-like object to transmit.

    Raises:
        OSError: propagated from the socket if sending fails or times out.
    """
    self.socket.sendall(data)
def __del__(self):
    """Close ``self.socket`` when the object is finalized, ignoring failures.

    Cleanup here is deliberately best-effort: the socket attribute may be
    missing (for example if construction failed part-way) or the socket may
    already be closed, and neither case should surface from a finalizer.
    """
    try:
        self.socket.close()
    except Exception:
        # Catch Exception rather than using a bare ``except:`` so that
        # KeyboardInterrupt and SystemExit are not swallowed here.
        pass
class UDPSocketConnection(Connection):
def __init__(self, addr, timeout=3):
    """Create a datagram socket for *addr* with the given timeout.

    Args:
        addr: indexable address; ``addr[0]`` is passed to ``ip_type`` to
            choose between an IPv4 and an IPv6 socket. The value is stored
            unchanged on ``self.addr``.
        timeout: value handed to ``socket.settimeout`` (defaults to 3).
    """
    Connection.__init__(self)
    self.addr = addr

    # An ip_type result of 4 selects AF_INET; anything else selects AF_INET6.
    if ip_type(addr[0]) == 4:
        family = socket.AF_INET
    else:
        family = socket.AF_INET6

    self.socket = socket.socket(family, socket.SOCK_DGRAM)
    self.socket.settimeout(timeout)
def flush(self):
    """Reject the call: this connection type does not support flushing.

    Raises:
        TypeError: always.
    """
    message = "UDPSocketConnection does not support flush()"
    raise TypeError(message)
def receive(self, data):
    """Reject the call: this connection type does not support ``receive``.

    Args:
        data: ignored; the method raises before using it.

    Raises:
        TypeError: always.
    """
    message = "UDPSocketConnection does not support receive()"
    raise TypeError(message)
def remaining(self):
    """Return the fixed value 65535, independent of instance state.

    NOTE(review): presumably an upper bound on the size of one datagram so
    callers can request "everything" in a single read -- confirm.
    """
    fixed_size = 65535
    return fixed_size
def read(self, length):
def _read_packet(self):
    """Read one reply from ``self.connection`` and return it without its header.

    Everything the connection reports as remaining is read and loaded into
    a fresh ``Connection`` buffer, whose first five bytes are then consumed
    and discarded.

    Returns:
        The ``Connection`` buffer positioned just past the discarded bytes.
    """
    incoming = Connection()
    available = self.connection.remaining()
    raw = self.connection.read(available)
    incoming.receive(raw)
    # Skip 1 + 4 bytes; presumably a one-byte packet type followed by a
    # four-byte session field -- TODO confirm against the protocol.
    incoming.read(1 + 4)
    return incoming
def _create_packet(self, id):
    """Build the common header of an outgoing packet.

    The packet consists of ``self.MAGIC_PREFIX``, *id* packed as a single
    unsigned byte, an unsigned int 0 and ``self.challenge`` as an int.

    Args:
        id: packet identifier; must fit in one unsigned byte because it is
            packed with the ``"!B"`` struct format.

    Returns:
        A ``Connection`` buffer holding the header, ready for further
        writes by the caller.
    """
    header = Connection()
    header.write(self.MAGIC_PREFIX)
    packed_id = struct.pack("!B", id)
    header.write(packed_id)
    # NOTE(review): the constant 0 is presumably a session id -- confirm.
    header.write_uint(0)
    header.write_int(self.challenge)
    return header
def read_buffer(self):
    """Read one length-prefixed chunk and return it as its own buffer.

    A varint is read first and used as the number of bytes to read next;
    those bytes are loaded into a new ``Connection``.

    Returns:
        A ``Connection`` containing exactly the bytes of the chunk.
    """
    size = self.read_varint()
    chunk = Connection()
    payload = self.read(size)
    chunk.receive(payload)
    return chunk
def handshake(self):
    """Send the handshake packet announcing a status query.

    The packet is written in this order: varint 0, ``self.version`` as a
    varint, ``self.host`` as a UTF string, ``self.port`` as an unsigned
    short, and varint 1. It is then sent through
    ``self.connection.write_buffer``.
    """
    hello = Connection()
    hello.write_varint(0)
    hello.write_varint(self.version)
    hello.write_utf(self.host)
    hello.write_ushort(self.port)
    hello.write_varint(1)  # Intention to query status

    self.connection.write_buffer(hello)