Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:raises: :class:`.SteamError`
Sample response:
.. code:: python
{'1.2.3.4:27060': SteamID(id=123456, type='AnonGameServer', universe='Public', instance=1234)}
"""
resp = self._s.send_um_and_wait("GameServers.GetServerSteamIDsByIP#1",
{"server_ips": server_ips},
timeout=timeout,
)
if resp is None:
return None
if resp.header.eresult != EResult.OK:
raise SteamError(resp.header.error_message, resp.header.eresult)
return {server.addr: SteamID(server.steamid) for server in resp.body.servers}
is_enc_branch = False
if branch not in depots.get('branches', {}):
raise SteamError("No branch named %s for app_id %s" % (repr(branch), app_id))
elif int(depots['branches'][branch].get('pwdrequired', 0)) > 0:
is_enc_branch = True
if (app_id, branch) not in self.beta_passwords:
if not password:
raise SteamError("Branch %r requires a password" % branch)
result = self.check_beta_password(app_id, password)
if result != EResult.OK:
raise SteamError("Branch password is not valid. %r" % result)
if (app_id, branch) not in self.beta_passwords:
raise SteamError("Incorrect password for branch %r" % branch)
def async_fetch_manifest(app_id, depot_id, manifest_gid, decrypt, name):
    """Fetch a single manifest and label it with ``name``.

    Forwards ``app_id``, ``depot_id``, ``manifest_gid`` and ``decrypt``
    unchanged to ``self.get_manifest`` (``self`` is taken from the
    enclosing scope, it is not a parameter), stores ``name`` on the
    result's ``name`` attribute and returns that result.
    """
    fetched = self.get_manifest(app_id, depot_id, manifest_gid, decrypt)
    fetched.name = name
    return fetched
tasks = []
shared_depots = {}
for depot_id, depot_info in iteritems(depots):
if not depot_id.isdigit():
continue
"""
resp = self.steam.send_um_and_wait('PublishedFile.GetDetails#1', {
'publishedfileids': [item_id],
'includetags': False,
'includeadditionalpreviews': False,
'includechildren': False,
'includekvtags': False,
'includevotes': False,
'short_description': True,
'includeforsaledata': False,
'includemetadata': False,
'language': 0
}, timeout=7)
if resp.header.eresult != EResult.OK:
raise SteamError(resp.header.error_message or 'No message', resp.header.eresult)
wf = None if resp is None else resp.body.publishedfiledetails[0]
if wf is None or wf.result != EResult.OK:
raise SteamError("Failed getting workshop file info",
EResult.Timeout if resp is None else EResult(wf.result))
elif not wf.hcontent_file:
raise SteamError("Workshop file is not on SteamPipe", EResult.FileNotFound)
app_id = ws_app_id = wf.consumer_appid
manifest = self.get_manifest(app_id, ws_app_id, wf.hcontent_file)
manifest.name = wf.title
return manifest
if data[:2] == b'VZ':
if data[-2:] != b'zv':
raise SteamError("VZ: Invalid footer: %s" % repr(data[-2:]))
if data[2:3] != b'a':
raise SteamError("VZ: Invalid version: %s" % repr(data[2:3]))
vzfilter = lzma._decode_filter_properties(lzma.FILTER_LZMA1, data[7:12])
vzdec = lzma.LZMADecompressor(lzma.FORMAT_RAW, filters=[vzfilter])
checksum, decompressed_size = struct.unpack('
server.host,
server.port,
command,
args,
)
try:
resp = self.web.get(url, timeout=10)
except Exception as exp:
self._LOG.debug("Request error: %s", exp)
else:
if resp.ok:
return resp
elif 400 <= resp.status_code < 500:
self._LOG.debug("Got HTTP %s", resp.status_code)
raise SteamError("HTTP Error %s" % resp.status_code)
self.steam.sleep(0.5)
server = self.get_content_server(rotate=True)
:param app_id: app id
:type app_id: int
:param depot_id: depot id
:type depot_id: int
:return: returns decryption key
:rtype: bytes
:raises SteamError: error message
"""
if depot_id not in self.depot_keys:
msg = self.steam.get_depot_key(app_id, depot_id)
if msg and msg.eresult == EResult.OK:
self.depot_keys[depot_id] = msg.depot_encryption_key
else:
raise SteamError("Failed getting depot key",
EResult.Timeout if msg is None else EResult(msg.eresult))
return self.depot_keys[depot_id]
'includechildren': False,
'includekvtags': False,
'includevotes': False,
'short_description': True,
'includeforsaledata': False,
'includemetadata': False,
'language': 0
}, timeout=7)
if resp.header.eresult != EResult.OK:
raise SteamError(resp.header.error_message or 'No message', resp.header.eresult)
wf = None if resp is None else resp.body.publishedfiledetails[0]
if wf is None or wf.result != EResult.OK:
raise SteamError("Failed getting workshop file info",
EResult.Timeout if resp is None else EResult(wf.result))
elif not wf.hcontent_file:
raise SteamError("Workshop file is not on SteamPipe", EResult.FileNotFound)
app_id = ws_app_id = wf.consumer_appid
manifest = self.get_manifest(app_id, ws_app_id, wf.hcontent_file)
manifest.name = wf.title
return manifest