Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def decrypt_loop(client, body):
while True:
try:
logger.info("Trying to decrypt sync")
return decryption_method(body, ignore_failures=False)
except EncryptionError:
logger.info("Error decrypting sync, waiting for next pan " "sync")
await client.synced.wait(),
logger.info("Pan synced, retrying decryption.")
def decrypt_messages_body(self, body, ignore_failures=True):
# type: (Dict[Any, Any], bool) -> Dict[Any, Any]
"""Go through a messages response and decrypt megolm encrypted events.
Args:
body (Dict[Any, Any]): The dictionary of a Sync response.
Returns the json response with decrypted events.
"""
if "chunk" not in body:
return body
logger.info("Decrypting room messages")
for event in body["chunk"]:
if "type" not in event:
continue
if event["type"] != "m.room.encrypted":
logger.debug("Event is not encrypted: " "\n{}".format(pformat(event)))
continue
self.pan_decrypt_event(event, ignore_failures=ignore_failures)
return body
# type: (Dict[Any, Any], Optional[str], bool) -> (bool)
event = Event.parse_encrypted_event(event_dict)
if not isinstance(event, MegolmEvent):
logger.warn(
"Encrypted event is not a megolm event:"
"\n{}".format(pformat(event_dict))
)
return False
if not event.room_id:
event.room_id = room_id
try:
decrypted_event = self.decrypt_event(event)
logger.info("Decrypted event: {}".format(decrypted_event))
event_dict.update(decrypted_event.source)
event_dict["decrypted"] = True
event_dict["verified"] = decrypted_event.verified
return True
except EncryptionError as error:
logger.warn(error)
if ignore_failures:
event_dict.update(self.unable_to_decrypt)
else:
raise
return False
elif isinstance(message, ConfirmSasMessage):
await client.confirm_sas(message)
elif isinstance(message, CancelSasMessage):
await client.cancel_sas(message)
elif isinstance(message, ExportKeysMessage):
path = os.path.abspath(os.path.expanduser(message.file_path))
logger.info(f"Exporting keys to {path}")
try:
await client.export_keys(path, message.passphrase)
except OSError as e:
info_msg = (
f"Error exporting keys for {client.user_id} to" f" {path} {e}"
)
logger.info(info_msg)
await self.send_response(
message.message_id, client.user_id, "m.os_error", str(e)
)
else:
info_msg = (
f"Succesfully exported keys for {client.user_id} " f"to {path}"
)
logger.info(info_msg)
await self.send_response(
message.message_id, client.user_id, "m.ok", info_msg
)
elif isinstance(message, ImportKeysMessage):
path = os.path.abspath(os.path.expanduser(message.file_path))
logger.info(f"Importing keys from {path}")
)
super().__init__(homeserver, user_id, device_id, store_path, config, ssl, proxy)
index_dir = os.path.join(store_path, server_name, user_id)
try:
os.makedirs(index_dir)
except OSError:
pass
self.server_name = server_name
self.pan_store = pan_store
self.pan_conf = pan_conf
if INDEXING_ENABLED:
logger.info("Indexing enabled.")
from pantalaimon.index import IndexStore
self.index = IndexStore(self.user_id, index_dir)
else:
logger.info("Indexing disabled.")
self.index = None
self.task = None
self.queue = queue
# Those two events are mainly used for testing.
self.new_fetch_task = asyncio.Event()
self.fetch_loop_event = asyncio.Event()
self.room_members_fetched = defaultdict(bool)