How to use the pantalaimon.store.ServerUsers.get function in pantalaimon

To help you get started, we’ve selected a few pantalaimon examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github matrix-org / pantalaimon / pantalaimon / store.py View on Github external
def replace_fetcher_task(self, server, pan_user, old_task, new_task):
        server = Servers.get(name=server)
        user = ServerUsers.get(server=server, user_id=pan_user)

        PanFetcherTasks.delete().where(
            PanFetcherTasks.user == user,
            PanFetcherTasks.room_id == old_task.room_id,
            PanFetcherTasks.token == old_task.token,
        ).execute()

        PanFetcherTasks.replace(
            user=user, room_id=new_task.room_id, token=new_task.token
        ).execute()
github matrix-org / pantalaimon / pantalaimon / store.py View on Github external
def save_fetcher_task(self, server, pan_user, task):
        server = Servers.get(name=server)
        user = ServerUsers.get(server=server, user_id=pan_user)

        PanFetcherTasks.replace(
            user=user, room_id=task.room_id, token=task.token
        ).execute()
github matrix-org / pantalaimon / pantalaimon / store.py View on Github external
def save_token(self, server, pan_user, token):
        # type: (str, str, str) -> None
        """Save a sync token for a pan user."""
        server = Servers.get(name=server)
        user = ServerUsers.get(server=server, user_id=pan_user)

        PanSyncTokens.replace(user=user, token=token).execute()
github matrix-org / pantalaimon / pantalaimon / store.py View on Github external
def load_token(self, server, pan_user):
        # type: (str, str) -> Optional[str]
        """Load a sync token for a pan user.

        Returns the sync token if one is found.
        """
        server = Servers.get(name=server)
        user = ServerUsers.get(server=server, user_id=pan_user)

        token = PanSyncTokens.get_or_none(user=user)

        if token:
            return token.token

        return None
github matrix-org / pantalaimon / pantalaimon / store.py View on Github external
def load_fetcher_tasks(self, server, pan_user):
        server = Servers.get(name=server)
        user = ServerUsers.get(server=server, user_id=pan_user)

        tasks = []

        for t in user.fetcher_tasks:
            tasks.append(FetchTask(t.room_id, t.token))

        return tasks
github matrix-org / pantalaimon / pantalaimon / store.py View on Github external
def delete_fetcher_task(self, server, pan_user, task):
        server = Servers.get(name=server)
        user = ServerUsers.get(server=server, user_id=pan_user)

        PanFetcherTasks.delete().where(
            PanFetcherTasks.user == user,
            PanFetcherTasks.room_id == task.room_id,
            PanFetcherTasks.token == task.token,
        ).execute()