Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _load_context(self, user, event, before, after):
context = {}
if before > 0:
query = (
Event.select()
.join(UserMessages)
.where(
(Event.date <= event.date)
& (Event.room_id == event.room_id)
& (Event.id != event.id)
& (UserMessages.user == user)
)
.order_by(Event.date.desc())
.limit(before)
)
context["events_before"] = [e.source for e in query]
else:
context["events_before"] = []
if after > 0:
query = (
Event.select()
.join(UserMessages)
.where(
(Event.date >= event.date)
& (Event.room_id == event.room_id)
& (Event.room_id == event.room_id)
& (Event.id != event.id)
& (UserMessages.user == user)
)
.order_by(Event.date.desc())
.limit(before)
)
context["events_before"] = [e.source for e in query]
else:
context["events_before"] = []
if after > 0:
query = (
Event.select()
.join(UserMessages)
.where(
(Event.date >= event.date)
& (Event.room_id == event.room_id)
& (Event.id != event.id)
& (UserMessages.user == user)
)
.order_by(Event.date)
.limit(after)
)
context["events_after"] = [e.source for e in query]
else:
context["events_after"] = []
return context
class Meta:
constraints = [SQL("UNIQUE(event_id, room_id, sender, profile_id)")]
class UserMessages(Model):
user = ForeignKeyField(model=StoreUser, column_name="user_id")
event = ForeignKeyField(model=Event, column_name="event_id")
@attr.s
class MessageStore:
user = attr.ib(type=str)
store_path = attr.ib(type=str)
database_name = attr.ib(type=str)
database = attr.ib(type=SqliteDatabase, init=False)
database_path = attr.ib(type=str, init=False)
models = [StoreUser, Event, Profile, UserMessages]
def __attrs_post_init__(self):
self.database_path = os.path.join(
os.path.abspath(self.store_path), self.database_name
)
self.database = self._create_database()
self.database.connect()
with self.database.bind_ctx(self.models):
self.database.create_tables(self.models)
def _create_database(self):
return SqliteDatabase(
self.database_path, pragmas={"foreign_keys": 1, "secure_delete": 1}
)
search_result, # type: List[Tuple[int, int]]
include_profile=False, # type: bool
order_by_recent=False, # type: bool
before=0, # type: int
after=0, # type: int
):
# type: (...) -> Dict[Any, Any]
user, _ = StoreUser.get_or_create(user_id=self.user)
search_dict = {r[1]: r[0] for r in search_result}
columns = list(search_dict.keys())
result_dict = {"results": []}
query = (
UserMessages.select()
.where(
(UserMessages.user_id == user) & (UserMessages.event.in_(columns))
)
.execute()
)
for message in query:
event = message.event
event_dict = {
"rank": 1 if order_by_recent else search_dict[event.id],
"result": event.source,
"context": {},
}
order_by_recent=False, # type: bool
before=0, # type: int
after=0, # type: int
):
# type: (...) -> Dict[Any, Any]
user, _ = StoreUser.get_or_create(user_id=self.user)
search_dict = {r[1]: r[0] for r in search_result}
columns = list(search_dict.keys())
result_dict = {"results": []}
query = (
UserMessages.select()
.where(
(UserMessages.user_id == user) & (UserMessages.event.in_(columns))
)
.execute()
)
for message in query:
event = message.event
event_dict = {
"rank": 1 if order_by_recent else search_dict[event.id],
"result": event.source,
"context": {},
}
if include_profile:
event_profile = event.profile
Event.insert(
event_id=event.event_id,
sender=event.sender,
date=datetime.datetime.fromtimestamp(event.server_timestamp / 1000),
room_id=room_id,
source=event_source,
profile=profile_id,
)
.on_conflict_ignore()
.execute()
)
if event_id <= 0:
return None
_, created = UserMessages.get_or_create(user=user, event=event_id)
if created:
return event_id
return None