Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_big_list(self):
    """ Batch profile and ban lookups must cover every ID of a large friend list. """
    # The friend list behind VALID_ID64 had roughly 150 entries when this was
    # written and is not expected to shrink below 100. Should it ever do so,
    # this test probably needs revisiting.
    # TODO: Implement GetFriendList in steamodd proper
    response = api.interface("ISteamUser").GetFriendList(steamid=self.VALID_ID64)
    friend_ids = [entry["steamid"] for entry in response["friendslist"]["friends"]]

    # Every requested ID must come back from the profile batch ...
    profile_ids = set(str(found.id64) for found in user.profile_batch(friend_ids))
    self.assertEqual(set(friend_ids), profile_ids)

    # ... and likewise from the bans batch.
    ban_ids = set(str(found.id64) for found in user.bans_batch(friend_ids))
    self.assertEqual(set(friend_ids), ban_ids)
baseurl = user.strip('/').split('/')
if len(baseurl) > 0:
user = baseurl[-1]
if not user: raise steam.items.InventoryError("Need an ID")
try:
prof = database.user(user).load()
ctx = database.sim_context(prof).load()
return templates.sim_selector(prof, ctx)
except steam.items.InventoryError as E:
raise web.NotFound(error_page.generic("Failed to load backpack ({0})".format(E)))
except steam.user.ProfileError as E:
raise web.NotFound(error_page.generic("Failed to load profile ({0})".format(E)))
except steam.api.HTTPError as E:
raise web.NotFound(error_page.generic("Couldn't connect to Steam (HTTP {0})".format(E)))
except database.CacheEmptyError as E:
raise web.NotFound(error_page.generic(E))
def _build_client_schema_specials(self):
schema = self.load()
cs = schema.client_url
special = {}
if not cs:
return special
req = steam.api.http_downloader(str(cs), timeout = STEAM_TIMEOUT)
try:
clients = steam.vdf.loads(req.download())["items_game"]
except:
return special
prefabs = clients.get("prefabs", {})
colors = clients.get("colors", {})
csitems = clients.get("items", {})
for sitem in schema:
sid = sitem.schema_id
item = csitems.get(str(sid))
clientstuff = {}
if not item:
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""
import os, sys
import threading
# Every command-line argument is appended to the module search path,
# presumably so the project imports that follow can be resolved from
# non-default locations.
# No exception guard is needed: slicing never raises IndexError, an
# out-of-range slice is simply empty and the += becomes a no-op.
sys.path += sys.argv[1:]
import steam
from optf2 import models, config
# Hand the steam package its API key, read from the project configuration
# (presumably section "steam", option "api-key" - confirm against config.ini).
steam.api.key.set(config.ini.get("steam", "api-key"))
class DumpThread(threading.Thread):
    """ Thread that dumps the schema for one scope/language pair. """

    def __init__(self, scope, language):
        """ Remember which scope and language this thread is responsible for. """
        super(DumpThread, self).__init__()
        self.scope = scope
        self.language = language

    def run(self):
        """ Build the schema model, dump it, then print a completion line. """
        target = models.schema(scope=self.scope, lang=self.language)
        target.dump()

        report = "{0}-{1}: Finished".format(self.scope, self.language)
        print(report)
if __name__ == "__main__":
updatepairs = set()
def __init__(self, user, **kwargs):
    """ Prepare the inventory downloader for a user.

    user: an object exposing an ``id64`` attribute, or a value that can be
    placed directly into the profile URL as the ID.
    kwargs: passed through unchanged to ``api.http_downloader``.
    """
    self._cache = {}

    # Accept both profile-like objects and bare IDs. Only a missing
    # attribute means "this already is the ID". A bare ``except:`` here
    # would also swallow KeyboardInterrupt/SystemExit and would hide real
    # failures while reading id64 by formatting the object into the URL.
    try:
        sid = user.id64
    except AttributeError:
        sid = user

    self._downloader = api.http_downloader("http://steamcommunity.com/profiles/{0}/inventory/".format(sid), **kwargs)
    self._user = sid
def dump(self):
    """ Fetch the schema, rebuild the derived stores and write the schema cache.

    Returns the fetched schema. If fetching or any build step raises, the
    stored schema is reset and None is returned instead. A falsy schema is
    returned without being written to the cache.
    """
    try:
        fresh = steam.items.schema(self._scope, lang=self._lang, timeout=STEAM_TIMEOUT)
        # Stored before the build steps run; they presumably read it back
        # through self._schema - confirm against the _build_* methods.
        self._schema = fresh
        self._build_paint_store()
        self._build_particle_store()
        self._build_quality_store()
        self._build_item_store()
    except steam.api.HTTPError:
        self._schema = fresh = None
    except Exception:
        self._schema = fresh = None

    if not fresh:
        return fresh

    self._dump_atomically(self._schema_cache, pickle.dumps(fresh, pickle.HIGHEST_PROTOCOL))
    return fresh
"""
Steam economy - Inventories, schemas, assets, etc.
Copyright (c) 2010-2013, Anthony Garcia
Distributed under the ISC License (see LICENSE)
"""
import time
import operator
from . import api, loc
class SchemaError(api.APIError):
    """ Schema-related error; can also be caught as api.APIError. """
class AssetError(api.APIError):
    """ Asset-related error; can also be caught as api.APIError. """
class InventoryError(api.APIError):
    """ Inventory-related error and base class for more specific inventory errors. """
class BadID64Error(InventoryError):
    """ Inventory error for an unusable 64-bit ID (going by the name); also an InventoryError. """
class ProfilePrivateError(InventoryError):
"""
Remote storage/UGC
Copyright (c) 2010-2013, Anthony Garcia
Distributed under the ISC License (see LICENSE)
"""
from . import api
class UGCError(api.APIError):
    """ Base error for remote storage/UGC failures; can also be caught as api.APIError. """
class FileNotFoundError(UGCError):
    """ UGC error for a file that could not be found (going by the name).

    NOTE: on Python 3.3+ this name shadows the builtin FileNotFoundError
    inside this module. The name is kept so existing callers keep working.
    """
class ugc_file(object):
""" Resolves a UGC file ID into usable metadata. """
@property
def size(self):
""" Size in bytes """
return self._data["size"]
@property