Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def delete_fetcher_task(self, server, pan_user, task):
server = Servers.get(name=server)
user = ServerUsers.get(server=server, user_id=pan_user)
PanFetcherTasks.delete().where(
PanFetcherTasks.user == user,
PanFetcherTasks.room_id == task.room_id,
PanFetcherTasks.token == task.token,
).execute()
def load_fetcher_tasks(self, server, pan_user):
server = Servers.get(name=server)
user = ServerUsers.get(server=server, user_id=pan_user)
tasks = []
for t in user.fetcher_tasks:
tasks.append(FetchTask(t.room_id, t.token))
return tasks
def replace_fetcher_task(self, server, pan_user, old_task, new_task):
server = Servers.get(name=server)
user = ServerUsers.get(server=server, user_id=pan_user)
PanFetcherTasks.delete().where(
PanFetcherTasks.user == user,
PanFetcherTasks.room_id == old_task.room_id,
PanFetcherTasks.token == old_task.token,
).execute()
PanFetcherTasks.replace(
user=user, room_id=new_task.room_id, token=new_task.token
).execute()
def save_fetcher_task(self, server, pan_user, task):
server = Servers.get(name=server)
user = ServerUsers.get(server=server, user_id=pan_user)
PanFetcherTasks.replace(
user=user, room_id=task.room_id, token=task.token
).execute()
def save_token(self, server, pan_user, token):
# type: (str, str, str) -> None
"""Save a sync token for a pan user."""
server = Servers.get(name=server)
user = ServerUsers.get(server=server, user_id=pan_user)
PanSyncTokens.replace(user=user, token=token).execute()