Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
challenge = req.body.challenge
key, resp.body.key = crypto.generate_session_key(challenge)
resp.body.crc = binascii.crc32(resp.body.key) & 0xffffffff
self.send(resp)
result = self.wait_event(EMsg.ChannelEncryptResult, timeout=5)
if result is None:
self.cm_servers.mark_bad(self.current_server_addr)
gevent.spawn(self.disconnect)
return
eresult = result[0].body.eresult
if eresult != EResult.OK:
self._LOG.error("Failed to secure channel: %s" % eresult)
gevent.spawn(self.disconnect)
return
self.channel_key = key
if challenge:
self._LOG.debug("Channel secured")
self.channel_hmac = key[:16]
else:
self._LOG.debug("Channel secured (legacy mode)")
self.channel_secured = True
self.emit(self.EVENT_CHANNEL_SECURED)
depots = self.get_app_depot_info(app_id)
is_enc_branch = False
if branch not in depots.get('branches', {}):
raise SteamError("No branch named %s for app_id %s" % (repr(branch), app_id))
elif int(depots['branches'][branch].get('pwdrequired', 0)) > 0:
is_enc_branch = True
if (app_id, branch) not in self.beta_passwords:
if not password:
raise SteamError("Branch %r requires a password" % branch)
result = self.check_beta_password(app_id, password)
if result != EResult.OK:
raise SteamError("Branch password is not valid. %r" % result)
if (app_id, branch) not in self.beta_passwords:
raise SteamError("Incorrect password for branch %r" % branch)
def async_fetch_manifest(app_id, depot_id, manifest_gid, decrypt, name):
    """Fetch a single manifest and label it with ``name``.

    The four leading arguments are forwarded unchanged to
    ``self.get_manifest``; ``self`` is taken from the enclosing scope.

    :return: the object returned by ``get_manifest`` with its ``name``
             attribute set to ``name``
    """
    fetched = self.get_manifest(app_id, depot_id, manifest_gid, decrypt)
    fetched.name = name
    return fetched
tasks = []
shared_depots = {}
for depot_id, depot_info in iteritems(depots):
if not depot_id.isdigit():
continue
:type steamid: :class:`int`, :class:`.SteamID`, :class:`.SteamUser`
:return: result
:rtype: :class:`EResult`
"""
if isinstance(steamid, SteamUser):
steamid = steamid.steam_id
elif not isinstance(steamid, SteamID):
steamid = SteamID(steamid)
resp = self._steam.send_um_and_wait("Player.IgnoreFriend#1",
{"steamid": steamid},
timeout=10)
if not resp:
return EResult.Timeout
elif resp.header.eresult == EResult.OK:
if steamid not in self._fr:
self._fr[steamid] = self._steam.get_user(steamid, False)
self._fr[steamid].relationship = EFriendRelationship(resp.body.friend_relationship)
return resp.header.eresult
def _handle_logon(self, msg):
CMClient._handle_logon(self, msg)
result = EResult(msg.body.eresult)
if result == EResult.OK:
self._reconnect_backoff_c = 0
self.logged_on = True
self.cell_id = msg.body.cell_id
self.emit(self.EVENT_LOGGED_ON)
return
# CM kills the connection on error anyway
self.disconnect()
if result == EResult.InvalidPassword:
self.login_key = None
if result in (EResult.AccountLogonDenied,
EResult.InvalidLoginAuthCode,
EResult.AccountLoginDeniedNeedTwoFactor,
EResult.TwoFactorCodeMismatch,
raise RuntimeError("Already logged on")
if not self.connected and not self._connecting:
if not self.connect():
return EResult.Fail
if not self.channel_secured:
resp = self.wait_event(self.EVENT_CHANNEL_SECURED, timeout=10)
# some CMs will not send hello
if resp is None:
if self.connected:
self.wait_event(self.EVENT_DISCONNECTED)
return EResult.TryAnotherCM
return EResult.OK
LOG.info("Logged on as: %s", client.user.name)
LOG.info("Community profile: %s", client.steam_id.community_url)
LOG.info("Last logon: %s", client.user.last_logon)
LOG.info("Last logoff: %s", client.user.last_logoff)
LOG.info("-"*30)
LOG.info("Press ^C to exit")
# main bit: log in through client.cli_login(), then hand control to
# client.run_forever() until the user interrupts with ^C.
LOG.info("Persistent logon recipe")
LOG.info("-"*30)

try:
    result = client.cli_login()

    if result != EResult.OK:
        # Pass the result as a logging argument instead of pre-formatting
        # the string, matching the other LOG.info calls in this script;
        # %r produces the same text as the former "%s" % repr(result).
        LOG.info("Failed to login: %r", result)
        raise SystemExit

    client.run_forever()
except KeyboardInterrupt:
    # ^C pressed: log out only if the connection is still up.
    if client.connected:
        LOG.info("Logout")
        client.logout()
message.body.obfustucated_private_ip = ip_to_int(self.connection.local_address) ^ 0xF00DBAAD
else:
message.body.obfustucated_private_ip = login_id
message.body.account_name = username
if login_key:
message.body.login_key = login_key
else:
message.body.password = password
sentry = self.get_sentry(username)
if sentry is None:
message.body.eresult_sentryfile = EResult.FileNotFound
else:
message.body.eresult_sentryfile = EResult.OK
message.body.sha_sentryfile = sha1_hash(sentry)
if auth_code:
message.body.auth_code = auth_code
if two_factor_code:
message.body.two_factor_code = two_factor_code
self.send(message)
resp = self.wait_msg(EMsg.ClientLogOnResponse, timeout=30)
if resp and resp.body.eresult == EResult.OK:
self.sleep(0.5)
return EResult(resp.body.eresult) if resp else EResult.Fail
message.body.obfuscated_private_ip.v4 = ip_to_int(self.connection.local_address) ^ 0xF00DBAAD
else:
message.body.obfuscated_private_ip.v4 = login_id
message.body.account_name = username
if login_key:
message.body.login_key = login_key
else:
message.body.password = password
sentry = self.get_sentry(username)
if sentry is None:
message.body.eresult_sentryfile = EResult.FileNotFound
else:
message.body.eresult_sentryfile = EResult.OK
message.body.sha_sentryfile = sha1_hash(sentry)
if auth_code:
message.body.auth_code = auth_code
if two_factor_code:
message.body.two_factor_code = two_factor_code
self.send(message)
resp = self.wait_msg(EMsg.ClientLogOnResponse, timeout=30)
if resp and resp.body.eresult == EResult.OK:
self.sleep(0.5)
return EResult(resp.body.eresult) if resp else EResult.Fail
def anonymous_login(self):
    """Log on as an anonymous user.

    Runs ``_pre_login`` first and returns its result unchanged when it is
    not ``EResult.OK``. Otherwise clears ``username`` and ``login_key``,
    sends a ``ClientLogon`` message carrying an anonymous SteamID and waits
    up to 30 seconds for the ``ClientLogOnResponse``.

    :return: the ``eresult`` of the logon response, or ``EResult.Fail``
             when no response was received
    :rtype: :class:`.EResult`
    """
    self._LOG.debug("Attempting Anonymous login")

    pre_login_result = self._pre_login()
    if pre_login_result != EResult.OK:
        return pre_login_result

    # An anonymous session carries no stored credentials.
    self.username = None
    self.login_key = None

    logon_msg = MsgProto(EMsg.ClientLogon)
    anon_steamid = SteamID(type='AnonUser', universe='Public')
    logon_msg.header.steamid = anon_steamid
    logon_msg.body.protocol_version = 65579
    self.send(logon_msg)

    response = self.wait_msg(EMsg.ClientLogOnResponse, timeout=30)
    if not response:
        # wait_msg produced nothing usable within the timeout.
        return EResult.Fail
    return EResult(response.body.eresult)