Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def server(self, ctx, server_ip: str):
"""Get info about server"""
try:
server = await self.bot.loop.run_in_executor(None, MinecraftServer.lookup, server_ip)
except Exception as e:
await ctx.send(chat.error(_("Unable to resolve IP: {}").format(e)))
return
async with ctx.channel.typing():
try:
status = await self.bot.loop.run_in_executor(None, server.status)
except OSError as e:
await ctx.send(chat.error(_("Unable to get server's status: {}").format(e)))
return
try:
query = await self.bot.loop.run_in_executor(None, server.query)
except (ConnectionResetError, OSError):
query = None
icon = (
discord.File(BytesIO(b64decode(status.favicon.split(",", 1)[1])), filename="icon.png")
if status.favicon
json_response = None

# Load the {category: {server name: address}} mapping from config.yml and
# create one status tracker per configured server in the module-level `data`.
# NOTE(review): yaml.load() without an explicit Loader can construct arbitrary
# Python objects (and newer PyYAML releases require the Loader argument). If
# config.yml can ever come from an untrusted source, use yaml.safe_load().
with open('config.yml', 'r') as cfg_file:
    servers_config = yaml.load(cfg_file)

for category, servers in servers_config.items():
    # Single-argument print(...) behaves the same on Python 2 and Python 3,
    # whereas the bare `print x` statement is a syntax error on Python 3.
    print(category)
    data[category] = {}
    for server, ip in servers.items():
        print("- " + server + ": " + ip)
        # Addresses are written as "host/port"; the port defaults to 25565.
        if "/" not in ip:
            ip += "/25565"
        address_parts = ip.split("/")
        # The port is handed over as a string, exactly as it was split out.
        data[category][server] = mcstatus.McServer(address_parts[0], address_parts[1])
def update_all():
    """Start one background thread per tracked server to refresh its status.

    Walks every category of the module-level ``data`` mapping and runs each
    status object's ``Update()`` on its own thread. The threads are started
    but not joined, so this function returns before the updates finish.
    """
    for servers in data.values():
        for status in servers.values():
            # Hand the bound method to the thread directly. A
            # ``lambda: status.Update()`` only looks ``status`` up when the
            # thread actually runs; by then the loop may have moved on, so
            # several threads could update the same (last) server while
            # others are never refreshed.
            threading.Thread(target=status.Update).start()
def sort_dict_by_key(to_sort):
    """Return an ``OrderedDict`` with the items of *to_sort* ordered by key."""
    pairs = list(to_sort.items())
    # Compare on the key only, never on the value.
    pairs.sort(key=lambda pair: pair[0])
    return OrderedDict(pairs)
def generate_json():
def read_status(self):
    """Request the server's status and return it wrapped in a ``PingResponse``.

    Sends a packet containing varint 0 over ``self.connection``, reads one
    buffer back, and decodes its UTF string payload as JSON.

    Raises:
        IOError: if the reply does not start with varint 0, if the payload
            cannot be read/parsed as JSON, or if ``PingResponse`` rejects the
            parsed data with a ``ValueError``.
    """
    status_request = Connection()
    status_request.write_varint(0)  # Request status
    self.connection.write_buffer(status_request)

    reply = self.connection.read_buffer()
    packet_id = reply.read_varint()
    if packet_id != 0:
        raise IOError("Received invalid status response packet.")

    try:
        # read_utf() stays inside the try: a ValueError raised while reading
        # the string is reported as invalid JSON as well.
        parsed = json.loads(reply.read_utf())
    except ValueError:
        raise IOError("Received invalid JSON")

    try:
        status = PingResponse(parsed)
    except ValueError as err:
        raise IOError("Received invalid status response: %s" % err)
    return status
def test_ping(self):
request = Connection()
request.write_varint(1) # Test ping
request.write_long(self.ping_token)
sent = datetime.datetime.now()
self.connection.write_buffer(request)
response = self.connection.read_buffer()
received = datetime.datetime.now()
if response.read_varint() != 1:
raise IOError("Received invalid ping response packet.")
received_token = response.read_long()
if received_token != self.ping_token:
raise IOError("Received mangled ping response packet (expected token %d, received %d)" % (
self.ping_token, received_token))
delta = (received - sent)
# We have no trivial way of getting a time delta :(
if len(new) == 0:
raise IOError("Server did not respond with any information!")
result.extend(new)
return result
def write(self, data):
    """Send all of *data* over ``self.socket``.

    ``socket.send`` may accept only part of the buffer and reports how much
    through a return value the previous code discarded, so a short write
    would silently truncate the packet. ``sendall`` keeps sending until the
    whole payload is out or an error is raised.
    NOTE(review): presumably this is the stream (TCP) connection class, since
    it sits next to a read loop that accumulates chunks - confirm.
    """
    self.socket.sendall(data)
def __del__(self):
    """Close the socket when the connection object is garbage-collected.

    Cleanup is best-effort: ordinary errors (for example a missing
    ``socket`` attribute or a failing ``close()``) are ignored.
    """
    try:
        self.socket.close()
    except Exception:
        # Deliberately ignored. Unlike a bare ``except:``, this does not
        # swallow KeyboardInterrupt or SystemExit.
        pass
class UDPSocketConnection(Connection):
def __init__(self, addr, timeout=3):
    """Create a datagram socket for *addr* and apply *timeout* to it.

    Args:
        addr: address whose first element is passed to ``ip_type`` to pick
            the socket family; stored unchanged as ``self.addr``.
        timeout: value handed to ``settimeout`` on the new socket (default 3).
    """
    Connection.__init__(self)
    self.addr = addr
    # AF_INET only when ip_type() reports 4; every other result uses AF_INET6.
    if ip_type(addr[0]) == 4:
        family = socket.AF_INET
    else:
        family = socket.AF_INET6
    self.socket = socket.socket(family, socket.SOCK_DGRAM)
    self.socket.settimeout(timeout)
def flush(self):
    """Always raise ``TypeError``: this connection does not support flushing."""
    message = "UDPSocketConnection does not support flush()"
    raise TypeError(message)
def receive(self, data):
    """Always raise ``TypeError``: this connection does not support receive().

    *data* is accepted for interface compatibility and never used.
    """
    message = "UDPSocketConnection does not support receive()"
    raise TypeError(message)
def remaining(self):
    """Return the constant 65535, independent of any connection state.

    NOTE(review): presumably the largest possible datagram size, used by
    callers as the read length - confirm.
    """
    upper_bound = 65535
    return upper_bound
def read(self, length):
def _read_packet(self):
    """Read one reply from ``self.connection`` and return it past its header.

    Everything the connection reports as remaining is read into a fresh
    ``Connection`` buffer, and the first five bytes (1 + 4) are discarded
    before the buffer is returned.
    NOTE(review): presumably a 1-byte packet type plus a 4-byte session id -
    confirm against the protocol.
    """
    reply = Connection()
    available = self.connection.remaining()
    raw_reply = self.connection.read(available)
    reply.receive(raw_reply)
    reply.read(1 + 4)  # skip the header bytes
    return reply
def _create_packet(self, id):
    """Build and return a request packet of type *id*.

    The packet consists of, in order: ``self.MAGIC_PREFIX``, *id* packed as
    one unsigned byte, an unsigned int 0, and ``self.challenge`` as an int.
    NOTE(review): the zero uint is presumably the session id - confirm.
    """
    request = Connection()
    request.write(self.MAGIC_PREFIX)
    type_byte = struct.pack("!B", id)
    request.write(type_byte)
    request.write_uint(0)
    request.write_int(self.challenge)
    return request
def read_buffer(self):
    """Read a varint-length-prefixed chunk and return it as a new ``Connection``.

    The varint read first gives the number of bytes to read next; those bytes
    are fed into a fresh ``Connection`` via ``receive``.
    """
    size = self.read_varint()
    chunk = Connection()
    payload = self.read(size)
    chunk.receive(payload)
    return chunk
def handshake(self):
    """Send the opening handshake packet over ``self.connection``.

    The packet holds, in order: varint 0, ``self.version`` as a varint,
    ``self.host`` as a UTF string, ``self.port`` as an unsigned short, and
    varint 1.
    NOTE(review): the leading 0 is presumably the packet id - confirm.
    """
    hello = Connection()
    hello.write_varint(0)
    hello.write_varint(self.version)
    hello.write_utf(self.host)
    hello.write_ushort(self.port)
    hello.write_varint(1)  # Intention to query status
    self.connection.write_buffer(hello)
def api_world_status(world: minecraft.World):
    """Returns JSON containing info about the given world, including whether the server is running, the current Minecraft version, and the list of people who are online. Requires mcstatus."""
    # Imported lazily so mcstatus is only required when this endpoint is used.
    import mcstatus

    # Start from the short status and add the online player list under 'list'.
    result = api.util2.short_world_status(world)

    # The main world lives on the configured host itself; every other world
    # is addressed as "<world>.<worldHost>".
    if world.is_main:
        host = api.util.CONFIG['worldHost']
    else:
        host = '{}.{}'.format(world, api.util.CONFIG['worldHost'])
    server = mcstatus.MinecraftServer.lookup(host)

    try:
        status = server.status()
    except OSError:
        # Covers ConnectionRefusedError as before, plus timeouts and other
        # socket-level failures (all OSError subclasses), so an unreachable
        # server yields an empty player list instead of an unhandled error.
        result['list'] = []
    else:
        # players.sample may be None; treat that as no players.
        result['list'] = [str(api.util2.Player(player.id)) for player in (status.players.sample or [])]
    return result