Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import base64
from twisted.internet import reactor, defer
from cached_property import cached_property
from quarry.net.protocol import Factory, Protocol, ProtocolError, \
protocol_modes
from quarry.net import auth, crypto
from quarry.types.uuid import UUID
class ServerProtocol(Protocol):
"""This class represents a connection with a client"""
recv_direction = "upstream"
send_direction = "downstream"
uuid = None
display_name = None
display_name_confirmed = False
# the hostname/port that the client claims it connected to. Useful for
# implementing virtual hosting.
connect_host = None
connect_port = None
# used to stop people breaking the login process
# by sending packets out-of-order or duplicated
"disconnect",
self.buff_type.pack_chat(reason))
super(ServerProtocol, self).close(reason)
if self.safe_kick:
self.safe_kick.addCallback(real_kick)
else:
real_kick()
else:
if self.protocol_mode == "login":
self.send_packet(
"login_disconnect",
self.buff_type.pack_chat(reason))
Protocol.close(self, reason)
else:
Protocol.close(self, reason)
from twisted.internet import reactor, protocol, defer
from twisted.python import failure
from quarry.net.protocol import Factory, Protocol, ProtocolError, \
protocol_modes_inv
from quarry.net import auth, crypto
class ClientProtocol(Protocol):
"""This class represents a connection to a server"""
recv_direction = "downstream"
send_direction = "upstream"
# Convenience functions ---------------------------------------------------
def switch_protocol_mode(self, mode):
self.check_protocol_mode_switch(mode)
if mode in ("status", "login"):
# Send handshake
addr = self.transport.connector.getDestination()
self.send_packet(
"handshake",
self.buff_type.pack_varint(self.protocol_version) +
def __init__(self, factory, remote_addr):
    """Set up the connection and create its login credentials.

    ``factory`` and ``remote_addr`` are forwarded unchanged to
    ``Protocol.__init__``.  Afterwards a fresh server ID and verify
    token are obtained from the ``crypto`` helpers and stored on the
    instance as ``server_id`` and ``verify_token``.
    """
    # Base-class initialisation runs first, before any attribute of
    # ours is assigned.
    Protocol.__init__(self, factory, remote_addr)

    # Both values are produced once, at construction time.
    self.server_id = crypto.make_server_id()
    self.verify_token = crypto.make_verify_token()
def player_left(self):
    """Hook invoked when we leave the game.

    Delegates to ``Protocol.player_left`` and then records the event
    at INFO level on this connection's logger.
    """
    # Run the base-class handling before emitting the log line.
    Protocol.player_left(self)
    self.logger.info("Left the game.")