Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def replace_fetcher_task(self, server, pan_user, old_task, new_task):
server = Servers.get(name=server)
user = ServerUsers.get(server=server, user_id=pan_user)
PanFetcherTasks.delete().where(
PanFetcherTasks.user == user,
PanFetcherTasks.room_id == old_task.room_id,
PanFetcherTasks.token == old_task.token,
).execute()
PanFetcherTasks.replace(
user=user, room_id=new_task.room_id, token=new_task.token
).execute()
def replace_fetcher_task(self, server, pan_user, old_task, new_task):
server = Servers.get(name=server)
user = ServerUsers.get(server=server, user_id=pan_user)
PanFetcherTasks.delete().where(
PanFetcherTasks.user == user,
PanFetcherTasks.room_id == old_task.room_id,
PanFetcherTasks.token == old_task.token,
).execute()
PanFetcherTasks.replace(
user=user, room_id=new_task.room_id, token=new_task.token
).execute()
@attr.s
class PanStore:
store_path = attr.ib(type=str)
database_name = attr.ib(type=str, default="pan.db")
database = attr.ib(type=SqliteDatabase, init=False)
database_path = attr.ib(type=str, init=False)
models = [
Accounts,
AccessTokens,
Servers,
ServerUsers,
DeviceKeys,
DeviceTrustState,
PanSyncTokens,
PanFetcherTasks,
]
def __attrs_post_init__(self):
self.database_path = os.path.join(
os.path.abspath(self.store_path), self.database_name
)
self.database = self._create_database()
self.database.connect()
with self.database.bind_ctx(self.models):
self.database.create_tables(self.models)
def _create_database(self):
return SqliteDatabase(
self.database_path, pragmas={"foreign_keys": 1, "secure_delete": 1}
def replace_fetcher_task(self, server, pan_user, old_task, new_task):
server = Servers.get(name=server)
user = ServerUsers.get(server=server, user_id=pan_user)
PanFetcherTasks.delete().where(
PanFetcherTasks.user == user,
PanFetcherTasks.room_id == old_task.room_id,
PanFetcherTasks.token == old_task.token,
).execute()
PanFetcherTasks.replace(
user=user, room_id=new_task.room_id, token=new_task.token
).execute()
def replace_fetcher_task(self, server, pan_user, old_task, new_task):
server = Servers.get(name=server)
user = ServerUsers.get(server=server, user_id=pan_user)
PanFetcherTasks.delete().where(
PanFetcherTasks.user == user,
PanFetcherTasks.room_id == old_task.room_id,
PanFetcherTasks.token == old_task.token,
).execute()
PanFetcherTasks.replace(
user=user, room_id=new_task.room_id, token=new_task.token
).execute()
def delete_fetcher_task(self, server, pan_user, task):
server = Servers.get(name=server)
user = ServerUsers.get(server=server, user_id=pan_user)
PanFetcherTasks.delete().where(
PanFetcherTasks.user == user,
PanFetcherTasks.room_id == task.room_id,
PanFetcherTasks.token == task.token,
).execute()