Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
categories = Set(Category)
additional_info = Optional(str)
times_played = Required(int, default=0)
times_solved = Required(int, default=0)
vote_up = Required(int, default=0)
vote_down = Required(int, default=0)
date_added = Required(datetime, sql_default="CURRENT_TIMESTAMP")
date_modified = Required(datetime, sql_default="CURRENT_TIMESTAMP")
last_played = Required(datetime, sql_default="CURRENT_TIMESTAMP")
rounds = Set("Round")
reports = Set("Report")
player = Optional("Player")
def __str__(self):
    """Render the question text and its primary answer, joined by ``***``."""
    question_text = self.question
    shown_answer = self.primary_answer
    return "{} *** {}".format(question_text, shown_answer)
@property
def primary_answer(self):
    """The first of the ``|``-separated alternatives held in ``answer``."""
    first_alternative, _sep, _others = self.answer.partition("|")
    return first_alternative
@property
def answer_re(self):
    """Compiled, case-insensitive regex matching any accepted answer.

    ``answer`` holds the accepted answers separated by ``|``. Each one is
    regex-escaped and the result is cached on the instance as
    ``_answer_re`` the first time the property is read.

    Returns:
        A compiled pattern that matches any single alternative delimited
        by word boundaries.
    """
    if not hasattr(self, "_answer_re"):
        alternatives = "|".join(map(re.escape, self.answer.split("|")))
        # The alternation must be grouped: in r"\ba|b\b" the boundaries bind
        # only to the first and last alternative, so "cat|dog" would also
        # match inside "catalog" or "hotdog".
        pattern = r"\b(?:{})\b".format(alternatives)
        self._answer_re = re.compile(pattern, re.IGNORECASE)
    return self._answer_re
# arbitrary timezone, for pagination
local_date = orm.Required(datetime.datetime)
# The actual displayable date - stored as string to guarantee the timezone
# is maintained
display_date = orm.Required(str)
slug_text = orm.Optional(str)
entry_type = orm.Optional(str)
redirect_url = orm.Optional(str) # maps to Redirect-To
title = orm.Optional(str)
sort_title = orm.Optional(str)
aliases = orm.Set("PathAlias")
tags = orm.Set("EntryTag")
auth = orm.Set("EntryAuth")
auth_log = orm.Set("AuthLog")
attachments = orm.Set("Entry", reverse="attached")
attached = orm.Set("Entry", reverse="attachments")
canonical_path = orm.Optional(str)
orm.composite_index(category, entry_type, utc_date)
orm.composite_index(category, entry_type, local_date)
orm.composite_index(category, entry_type, sort_title)
@property
def visible(self) -> bool:
"start",
"unlock",
"next",
]
name = Required(str, NAME_MAX_LEN, unique=True)
password_hash = Optional(str, 200)
email = Optional(str, 200)
permissions = Required(int, default=0)
date_joined = Required(datetime, sql_default="CURRENT_TIMESTAMP")
last_played = Required(datetime, sql_default="CURRENT_TIMESTAMP")
rounds_solved = Set("Round")
submitted_reports = Set("Report")
submitted_questions = Set(Question)
def __str__(self):
    """Show the record's name followed by its id, e.g. ``alice (#7)``."""
    display_name, ident = self.name, self.id
    return "{} (#{})".format(display_name, ident)
def logged_in(self):
    """Stamp ``last_played`` with the current time from ``datetime.utcnow()``."""
    now_utc = datetime.utcnow()
    self.last_played = now_utc
def has_password(self):
    """Tell whether a non-empty ``password_hash`` is stored."""
    if self.password_hash:
        return True
    return False
def set_password(self, password):
    """Hash ``password`` with ``bcrypt_sha256`` and store it in ``password_hash``.

    The cost passed as ``rounds`` is read from ``self.BCRYPT_ROUNDS``.
    """
    digest = bcrypt_sha256.encrypt(password, rounds=self.BCRYPT_ROUNDS)
    self.password_hash = digest
def check_password(self, password):
if not self.has_password():
return True
"stop",
"start",
"unlock",
"next",
]
name = Required(str, NAME_MAX_LEN, unique=True)
password_hash = Optional(str, 200)
email = Optional(str, 200)
permissions = Required(int, default=0)
date_joined = Required(datetime, sql_default="CURRENT_TIMESTAMP")
last_played = Required(datetime, sql_default="CURRENT_TIMESTAMP")
rounds_solved = Set("Round")
submitted_reports = Set("Report")
submitted_questions = Set(Question)
def __str__(self):
    """Show the record's name followed by its id, e.g. ``alice (#7)``."""
    display_name, ident = self.name, self.id
    return "{} (#{})".format(display_name, ident)
def logged_in(self):
    """Stamp ``last_played`` with the current time from ``datetime.utcnow()``."""
    now_utc = datetime.utcnow()
    self.last_played = now_utc
def has_password(self):
    """Tell whether a non-empty ``password_hash`` is stored."""
    if self.password_hash:
        return True
    return False
def set_password(self, password):
    """Hash ``password`` with ``bcrypt_sha256`` and store it in ``password_hash``.

    The cost passed as ``rounds`` is read from ``self.BCRYPT_ROUNDS``.
    """
    digest = bcrypt_sha256.encrypt(password, rounds=self.BCRYPT_ROUNDS)
    self.password_hash = digest
def check_password(self, password):
if not self.has_password():
# Module-level aliases for four helpers of the `orm` module, so they can be
# referenced by their short names (presumably for import by other modules —
# TODO confirm against callers).
commit = orm.commit
rollback = orm.rollback
select = orm.select
desc = orm.desc
class User(db.Entity):
    """Account stored in the ``users`` table, with three permission flags."""

    _table_ = 'users'

    id = orm.PrimaryKey(int)
    name = orm.Required(str, unique=True)
    passhash = orm.Required(str)
    can_manage = orm.Required(bool)
    can_download_artifacts = orm.Required(bool)
    can_view_buildlogs = orm.Required(bool)
    login_tokens = orm.Set('LoginToken')

    def __init__(self, **kwargs):
        """Create a user; without an explicit ``id`` use the current max id + 1."""
        if 'id' not in kwargs:
            highest_id = orm.max(x.id for x in User)
            # ``highest_id`` is falsy when there are no users yet -> start at 1.
            kwargs['id'] = (highest_id or 0) + 1
        super().__init__(**kwargs)

    def set_password(self, password):
        """Store the ``utils.hash_pw`` hash of ``password`` in ``passhash``."""
        hashed = utils.hash_pw(password)
        self.passhash = hashed

    @classmethod
    def get_by_login_details(cls, user_name, password):
        """Return the first user whose name and password hash both match.

        The password is hashed with ``utils.hash_pw`` and compared against
        the stored ``passhash`` inside the query.
        """
        hashed = utils.hash_pw(password)
        matches = orm.select(
            u for u in cls if u.name == user_name and u.passhash == hashed
        )
        return matches.first()
@classmethod
def uri(self):
    """Build the ``spotify:user:<id>`` URI string from ``self.id``."""
    user_id = self.id
    return f"spotify:user:{user_id}"
@property
def href(self):
    """Web API address of this user, built from ``self.id``."""
    user_id = self.id
    return f"https://api.spotify.com/v1/users/{user_id}"
@property
def external_url(self):
    """Address of this user's page on open.spotify.com, built from ``self.id``."""
    user_id = self.id
    return f"http://open.spotify.com/user/{user_id}"
class Genre(db.Entity, ImageMixin):
    """Genre entity stored in the ``genres`` table, keyed by its name."""

    _table_ = "genres"

    name = PrimaryKey(str)
    artists = Set("Artist")
    playlists = Set("Playlist")
    fans = Set(User, reverse="top_genres", table="genre_fans")
    haters = Set(User, reverse="disliked_genres", table="genre_haters")
    images = Set(Image, cascade_delete=True)

    def play(self, client, device=None):
        """Play a playlist for this genre through ``client``.

        One of the first three members of ``Playlist.Popularity`` is chosen
        at random, ``client.genre_playlist`` is asked for the matching
        playlist, and the result of its ``play(device=device)`` is returned.
        """
        candidate_levels = list(Playlist.Popularity)[:3]
        chosen_level = random.choice(candidate_levels)
        genre_playlist = client.genre_playlist(self.name, chosen_level)
        return genre_playlist.play(device=device)
# Entity mapped to the "countries" table; also inherits from ImageMixin.
class Country(db.Entity, ImageMixin):
_table_ = "countries"
# Primary key: a string code limited to 2 characters (presumably an
# ISO 3166-1 alpha-2 country code — TODO confirm).
code = PrimaryKey(str, max_len=2)
# Required, indexed name.
name = Required(str, index=True)
# Users related to this country; the other side is User's `country` attribute.
users = Set("User", reverse="country")
DEBUG = True
# Create the Flask application and load its configuration from `Settings`.
app = Flask(__name__)
app.config.from_object(Settings)
### Models ###
# Database handle for the SQLite file 'inventory.db'. create_db=True is passed
# (NOTE(review): per the ORM's Database API this should create the file when it
# is missing — confirm).
db = orm.Database('sqlite', 'inventory.db', create_db=True)
class Person(db.Entity):
    """Person record stored in the ``people`` table, with a set of related items."""

    _table_ = 'people'

    firstname = orm.Required(unicode, 80, nullable=False)
    lastname = orm.Required(unicode, 80, nullable=False)
    # The default is the callable itself, not a timestamp fixed at import time.
    created = orm.Required(datetime, default=datetime.utcnow)
    items = orm.Set("Item")

    @property
    def n_items(self):
        """Number of items related to this person, counted via ``orm.count``."""
        return orm.count(item for item in self.items)

    def __repr__(self):
        """Return ``<Person firstname lastname>``."""
        # The template used to be the empty string, so the two arguments were
        # silently ignored and repr() always produced ''.
        return "<Person {} {}>".format(self.firstname, self.lastname)
# Entity mapped to the 'items' table.
class Item(db.Entity):
_table_ = 'items'
# Required name, declared with a maximum length of 100.
name = orm.Required(unicode, 100, nullable=False)
# Optional link to a Person (an item may have no person).
person = orm.Optional(Person)
# Required flag; False unless set otherwise.
checked_out = orm.Required(bool, default=False)
# Required timestamp whose default is the callable datetime.utcnow.
# NOTE(review): nothing visible here refreshes it on later modifications — confirm.
updated = orm.Required(datetime, default=datetime.utcnow)
if op in ('==', '<>', '!='):
if type1 is NoneType or type2 is NoneType: return True
elif type1 in primitive_types: return type1 is type2
elif isinstance(type1, orm.EntityMeta): return type1._root_ is type2._root_
else: return False
if op in ['in', 'not in']:
if type1 in primitive_types:
if type(type2) in (tuple, list): return set(type2) == set((type1,))
elif isinstance(type2, orm.Set): return type1 is type2.py_type
else: return False
elif isinstance(type1, orm.EntityMeta):
if type(type2) in (tuple, list):
for t in type2:
if not isinstance(t, orm.EntityMeta) or type1._root_ is not t._root_: return False
return True
elif isinstance(type2, orm.Set):
t = type2.py_type
return isinstance(t, orm.EntityMeta) or type1._root_ is not t._root_
else: return False
else: return False
token = Required(Json, volatile=True)
spotify_premium = Required(bool)
api_calls = Required(int, default=0, sql_default="0", volatile=True)
created_at = Required(
datetime, default=datetime.utcnow, sql_default=SQL_DEFAULT.now
)
last_usage_at = Required(
datetime, default=datetime.utcnow, sql_default=SQL_DEFAULT.now, volatile=True
)
images = Set("Image", cascade_delete=True)
top_artists = Set("Artist")
disliked_artists = Set("Artist")
top_genres = Set("Genre")
disliked_genres = Set("Genre")
top_countries = Set("Country")
disliked_countries = Set("Country")
top_cities = Set("City")
disliked_cities = Set("City")
top_expires_at = Required(Json, volatile=True, default=dict, sql_default="'{}'")
# pylint: disable=arguments-differ,signature-differs
def to_dict(self, *args, **kwargs):
    """Return the parent class's ``to_dict`` result with ``id`` turned into a string.

    All arguments are forwarded unchanged; when the result has no ``"id"``
    key it is returned as is.
    """
    serialized = super().to_dict(*args, **kwargs)
    if "id" not in serialized:
        return serialized
    serialized["id"] = str(serialized["id"])
    return serialized
@classmethod
async def from_dict_async(cls, user):
if user.images:
try:
color = await Image.grab_color_async(user.images[-1].url)
else:
if user and auth.user_group in user.auth_groups:
result = auth.allowed
LOGGER.debug(" result->%s", result)
LOGGER.debug("Final result: %s", result)
return result
class EntryTag(DbEntity):
""" Tags for an entry """
# Primary key: a string identifying the tag (how it relates to `name` is not
# shown in this block).
key = orm.PrimaryKey(str)
# Required tag name.
name = orm.Required(str)
# Entries related to this tag (relationship to the Entry entity).
entries = orm.Set(Entry)
class Category(DbEntity):
""" Metadata for a category """
# Optional category string.
category = orm.Optional(str)
# Required file path (presumably the file the metadata was read from —
# TODO confirm).
file_path = orm.Required(str)
# Optional name used for sorting (as the attribute name suggests; usage is not
# visible here).
sort_name = orm.Optional(str)
# Path aliases related to this category (relationship to "PathAlias").
aliases = orm.Set("PathAlias")
class PathAlias(DbEntity):
""" Path alias mapping """
path = orm.PrimaryKey(str)
entry = orm.Optional(Entry)