How to use the aiocqhttp.bus.EventBus class in aiocqhttp

To help you get started, we’ve selected a few aiocqhttp examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

GitHub: richardchien / python-aiocqhttp / aiocqhttp / __init__.py (View on GitHub, external link)
def __init__(self,
                 api_root: Optional[str] = None,
                 access_token: Optional[str] = None,
                 secret: Optional[AnyStr] = None,
                 enable_http_post: bool = True,
                 message_class: type = None,
                 *args, **kwargs):
        """
        Wire up the event bus, the Quart server app and the unified API.

        Args:
            api_root: Forwarded to ``HttpApi`` as its first argument.
            access_token: Stored on the instance and forwarded to
                ``HttpApi`` as its second argument.
            secret: Stored on the instance as ``_secret``.
            enable_http_post: When truthy, ``_handle_http_event`` is
                registered for ``POST /`` on the server app.
            message_class: Stored on the instance as ``_message_class``.
            *args: Accepted but not used in this body.
            **kwargs: Accepted but not used in this body.
        """
        # Plain values kept on the instance.
        self._access_token = access_token
        self._secret = secret
        self._message_class = message_class

        self._bus = EventBus()

        app = Quart(__name__)
        self._server_app = app

        # The HTTP POST entry point is optional.
        if enable_http_post:
            post_root = app.route('/', methods=['POST'])
            post_root(self._handle_http_event)

        # Event and API traffic each get their own WebSocket endpoint
        # and handler.
        register_ws = app.websocket
        register_ws('/ws/event/')(self._handle_ws_reverse_event)
        register_ws('/ws/api/')(self._handle_ws_reverse_api)

        # The very same dict object is handed to WebSocketReverseApi, so
        # both sides see the same contents.
        ws_api_clients = {}
        self._connected_ws_reverse_api_clients = ws_api_clients

        http_api = HttpApi(api_root, access_token)
        ws_reverse_api = WebSocketReverseApi(ws_api_clients)
        self._api = UnifiedApi(http_api=http_api,
                               ws_reverse_api=ws_reverse_api)
GitHub: richardchien / python-aiocqhttp / aiocqhttp / __init__.py (View on GitHub, external link)
def __init__(self,
                 api_root: Optional[str] = None,
                 access_token: Optional[str] = None,
                 secret: Optional[AnyStr] = None,
                 enable_http_post: bool = True,
                 message_class: type = None,
                 *args, **kwargs):
        """
        Wire up the event bus, the Quart server app and the unified API.

        Args:
            api_root: Forwarded to ``HttpApi`` as its first argument.
            access_token: Stored on the instance and forwarded to
                ``HttpApi`` as its second argument.
            secret: Stored on the instance as ``_secret``.
            enable_http_post: When truthy, ``_handle_http_event`` is
                registered for ``POST /`` on the server app.
            message_class: Stored on the instance as ``_message_class``.
            *args: Accepted but not used in this body.
            **kwargs: Accepted but not used in this body.
        """
        # Plain values kept on the instance.
        self._access_token = access_token
        self._secret = secret
        self._message_class = message_class

        self._bus = EventBus()

        app = Quart(__name__)
        self._server_app = app

        # The HTTP POST entry point is optional.
        if enable_http_post:
            post_root = app.route('/', methods=['POST'])
            post_root(self._handle_http_event)

        # One handler serves all three WebSocket paths; they are
        # registered in this order.
        for ws_path in ('/ws/', '/ws/event/', '/ws/api/'):
            app.websocket(ws_path)(self._handle_ws_reverse)

        # The very same dict object is handed to WebSocketReverseApi, so
        # both sides see the same contents.
        ws_api_clients = {}
        self._connected_ws_reverse_api_clients = ws_api_clients

        http_api = HttpApi(api_root, access_token)
        ws_reverse_api = WebSocketReverseApi(ws_api_clients)
        self._api = UnifiedApi(http_api=http_api,
                               ws_reverse_api=ws_reverse_api)