Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'secure': True,
'steamid': SteamID(id=3279818759, type='AnonGameServer', universe='Public', instance=7011),
'version': '1.35.4.0'}
]
"""
resp = self._s.send_um_and_wait("GameServers.GetServerList#1",
{
"filter": filter_text,
"limit": max_servers,
},
timeout=20,
)
if resp is None:
return None
if resp.header.eresult != EResult.OK:
raise SteamError(resp.header.error_message, resp.header.eresult)
resp = proto_to_dict(resp.body)
if not resp:
return []
else:
for server in resp['servers']:
server['steamid'] = SteamID(server['steamid'])
return resp['servers']
:type password: str
:returns: result
:rtype: :class:`.EResult`
"""
resp = self.steam.send_job_and_wait(MsgProto(EMsg.ClientCheckAppBetaPassword),
{'app_id': app_id, 'betapassword': password})
if resp.eresult == EResult.OK:
self._LOG.debug("Unlocked following beta branches: %s",
', '.join(map(lambda x: x.betaname.lower(), resp.betapasswords)))
for entry in resp.betapasswords:
self.beta_passwords[(app_id, entry.betaname.lower())] = unhexlify(entry.betapassword)
else:
self._LOG.debug("App beta password check failed. %r" % EResult(resp.eresult))
return EResult(resp.eresult)
.. note::
First request change mail via :meth:`request_password_change_mail()`
to get the email code
"""
message = Msg(EMsg.ClientPasswordChange3, extended=True)
message.body.password = password
message.body.new_password = new_password
message.body.code = email_code
resp = self.send_job_and_wait(message, timeout=10)
if resp is None:
return EResult.Timeout
else:
return EResult(resp.eresult)
"""Request password change mail
:param password: current account password
:type password: :class:`str`
:return: result
:rtype: :class:`.EResult`
"""
message = Msg(EMsg.ClientRequestChangeMail, extended=True)
message.body.password = password
resp = self.send_job_and_wait(message, timeout=10)
if resp is None:
return EResult.Timeout
else:
return EResult(resp.eresult)
:return: leaderboard instance
:rtype: :class:`SteamLeaderboard`
:raises: :class:`LookupError` on message timeout or error
"""
message = MsgProto(EMsg.ClientLBSFindOrCreateLB)
message.header.routing_appid = app_id
message.body.app_id = app_id
message.body.leaderboard_name = name
message.body.create_if_not_found = False
resp = self.send_job_and_wait(message, timeout=15)
if not resp:
raise LookupError("Didn't receive response within 15seconds :(")
if resp.eresult != EResult.OK:
raise LookupError(EResult(resp.eresult))
return SteamLeaderboard(self, app_id, name, resp)
:param cell_id: cell id (0 = global)
:type cellid: :class:`int`
:return: bootstrap success
:rtype: :class:`bool`
"""
self._LOG.debug("Attempting bootstrap via WebAPI")
from steam import webapi
try:
resp = webapi.get('ISteamDirectory', 'GetCMList', 1, params={'cellid': cell_id,
'http_timeout': 3})
except Exception as exp:
self._LOG.error("WebAPI boostrap failed: %s" % str(exp))
return False
result = EResult(resp['response']['result'])
if result != EResult.OK:
self._LOG.error("GetCMList failed with %s" % repr(result))
return False
serverlist = resp['response']['serverlist']
self._LOG.debug("Recieved %d servers from WebAPI" % len(serverlist))
def str_to_tuple(serveraddr):
    """Convert an ``ip:port`` string into an ``(ip, port)`` tuple.

    The string must contain exactly one ``:``; anything else makes the
    unpacking below raise :class:`ValueError`.

    :param serveraddr: server address in ``ip:port`` form
    :type serveraddr: str
    :return: address part as ``str`` and port as ``int``
    :rtype: tuple
    """
    pieces = serveraddr.split(':')
    host, port_text = pieces
    return (str(host), int(port_text))
self.clear()
self.cell_id = cell_id
self.merge_list(map(str_to_tuple, serverlist))
def wrap_match_details(message):
    """Re-emit a match details response as a ``match_details`` event.

    The event carries ``match_id`` (taken from the enclosing scope), the
    result code, and the match payload. The payload is ``None`` unless the
    result is ``EResult.OK``.

    :param message: response exposing ``result`` and ``match`` attributes
    """
    status = EResult(message.result)
    if status == EResult.OK:
        details = message.match
    else:
        details = None
    self.emit('match_details', match_id, status, details)
def get_depot_key(self, app_id, depot_id):
    """Return the decryption key for a depot, fetching it on first use.

    Keys are cached in ``self.depot_keys`` by depot id, so the request via
    ``self.steam.get_depot_key`` is only made when the depot has no cached key.

    :param app_id: app id
    :type app_id: int
    :param depot_id: depot id
    :type depot_id: int
    :return: returns decryption key
    :rtype: bytes
    :raises SteamError: when no response is received (``EResult.Timeout``)
        or the response carries a non-OK result
    """
    cache = self.depot_keys

    # Cache hit: no request needed.
    if depot_id in cache:
        return cache[depot_id]

    reply = self.steam.get_depot_key(app_id, depot_id)

    if not reply or reply.eresult != EResult.OK:
        # A missing reply is reported as a timeout; otherwise pass on
        # the result code from the response.
        failure = EResult.Timeout if reply is None else EResult(reply.eresult)
        raise SteamError("Failed getting depot key", failure)

    cache[depot_id] = reply.depot_encryption_key
    return cache[depot_id]
if self.logged_on:
self._LOG.debug("Trying to login while logged on???")
raise RuntimeError("Already logged on")
if not self.connected and not self._connecting:
if not self.connect():
return EResult.Fail
if not self.channel_secured:
resp = self.wait_event(self.EVENT_CHANNEL_SECURED, timeout=10)
# some CMs will not send hello
if resp is None:
if self.connected:
self.wait_event(self.EVENT_DISCONNECTED)
return EResult.TryAnotherCM
return EResult.OK
raise RuntimeError("Already logged on")
if not self.connected and not self._connecting:
if not self.connect():
return EResult.Fail
if not self.channel_secured:
resp = self.wait_event(self.EVENT_CHANNEL_SECURED, timeout=10)
# some CMs will not send hello
if resp is None:
if self.connected:
self.wait_event(self.EVENT_DISCONNECTED)
return EResult.TryAnotherCM
return EResult.OK