Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def message_callback(self):
try:
message = self.receive_queue.get_nowait()
except Empty:
return True
logger.debug(f"UI loop received message {message}")
if isinstance(message, UpdateDevicesMessage):
self.device_if.update_devices(message)
elif isinstance(message, UpdateUsersMessage):
self.control_if.update_users(message)
elif isinstance(message, UnverifiedDevicesSignal):
self.control_if.UnverifiedDevices(
message.pan_user, message.room_id, message.room_display_name
)
if self.notifications:
self.unverified_notification(message)
elif isinstance(message, InviteSasSignal):
def find_proxy_by_user(user):
    # type: (str) -> Optional[ProxyDaemon]
    """Return the first proxy whose ``pan_clients`` contains *user*.

    The free name ``proxies`` (defined outside this function) is scanned in
    order; ``None`` is returned when no proxy lists the user.
    """
    matching = (
        candidate for candidate in proxies if user in candidate.pan_clients
    )
    return next(matching, None)
async def send_info(message_id, pan_user, code, string):
    """Build a ``DaemonResponse`` from the arguments and put it on ``send_queue``."""
    await send_queue.put(DaemonResponse(message_id, pan_user, code, string))
while True:
message = await receive_queue.get()
logger.debug(f"Router got message {message}")
proxy = find_proxy_by_user(message.pan_user)
if not proxy:
msg = f"No pan client found for {message.pan_user}."
logger.warn(msg)
await send_info(
message.message_id, message.pan_user, "m.unknown_client", msg
)
await proxy.receive_message(message)
self.fetch_loop_event.clear()
try:
await asyncio.sleep(self.pan_conf.history_fetch_delay)
fetch_task = await self.history_fetch_queue.get()
try:
room = self.rooms[fetch_task.room_id]
except KeyError:
# The room is missing from our client, we probably left the
# room.
self.delete_fetcher_task(fetch_task)
continue
try:
logger.debug(
f"Fetching room history for {room.display_name} "
f"({room.room_id}), token {fetch_task.token}."
)
response = await self.room_messages(
fetch_task.room_id,
fetch_task.token,
limit=self.pan_conf.indexing_batch_size,
)
except ClientConnectionError as e:
logger.debug("Error fetching room history: ", e)
await self.history_fetch_queue.put(fetch_task)
# The chunk was empty, we're at the start of the timeline.
if not response.chunk:
self.delete_fetcher_task(fetch_task)
continue
# room.
self.delete_fetcher_task(fetch_task)
continue
try:
logger.debug(
f"Fetching room history for {room.display_name} "
f"({room.room_id}), token {fetch_task.token}."
)
response = await self.room_messages(
fetch_task.room_id,
fetch_task.token,
limit=self.pan_conf.indexing_batch_size,
)
except ClientConnectionError as e:
logger.debug("Error fetching room history: ", e)
await self.history_fetch_queue.put(fetch_task)
# The chunk was empty, we're at the start of the timeline.
if not response.chunk:
self.delete_fetcher_task(fetch_task)
continue
for event in response.chunk:
if not isinstance(
event,
(
RoomMessageText,
RoomMessageMedia,
RoomEncryptedMedia,
RoomTopicEvent,
RoomNameEvent,
Args:
body (Dict[Any, Any]): The dictionary of a Sync response.
Returns the json response with decrypted events.
"""
if "chunk" not in body:
return body
logger.info("Decrypting room messages")
for event in body["chunk"]:
if "type" not in event:
continue
if event["type"] != "m.room.encrypted":
logger.debug("Event is not encrypted: " "\n{}".format(pformat(event)))
continue
self.pan_decrypt_event(event, ignore_failures=ignore_failures)
return body