Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_mbox_message_by_id_with_bad_id(sample_mbox_file):
    """Looking up identifier 1234 in the sample mbox archive yields ``None``."""
    with open_mail_archive(sample_mbox_file) as mbox:
        # The lookup is expected to return None rather than raise.
        lookup_result = mbox.get_message_by_id(1234)
        assert lookup_result is None
result = generate_report(
params, enron_dataset_part027, isolated_cli_runner, expected
)
with db_session_from_cmd_out(result) as session:
# Verify total message count
assert session.query(Message).count() == 9297
# Get message contents from DB
msg = session.query(Message).filter_by(pff_identifier=msg_id).one()
headers, body = msg.headers, msg.body
if expected.with_messages:
# Access message directly and compare
archive_file = list(enron_dataset_part027.glob("*.pst"))[0]
with open_mail_archive(archive_file) as archive:
message = archive.get_message_by_id(msg_id)
assert cleanup_message_body(*archive.get_message_body(message)) == body
assert archive.get_message_headers(message) == headers
else:
assert headers is None
assert body is None
def test_open_mail_archive_with_unsupported_type():
    """Opening ``Path("bad_path")`` must raise ``FileTypeError``."""
    unsupported = Path("bad_path")
    with pytest.raises(FileTypeError):
        # The return value is irrelevant; only the raised exception matters.
        open_mail_archive(unsupported)
):
msg_id = 2097572
# Run entity extraction job with message content flag on
result = extract_entities(
params, enron_dataset_part001, isolated_cli_runner, expected
)
# Get message contents from DB
with db_session_from_cmd_out(result) as session:
msg = session.query(Message).filter_by(pff_identifier=msg_id).one()
headers, body = msg.headers, msg.body
# Access message directly and compare
archive_file = list(enron_dataset_part001.glob("*.pst"))[0]
with open_mail_archive(archive_file) as archive:
message = archive.get_message_by_id(msg_id)
assert cleanup_message_body(*archive.get_message_body(message)) == body
assert archive.get_message_headers(message) == headers
def test_get_mbox_message_by_id(sample_mbox_file):
    """Check by-id lookup against sequential iteration of the sample mbox.

    The archive must report 113 messages. For every message produced by
    ``archive.messages()`` (numbered from 1), fetching the message by that
    number must format to the same text as the iterated message, extraction
    via ``extract_message_from_archive`` must return a truthy value, and the
    iterated message must have truthy headers.
    """
    with open_mail_archive(sample_mbox_file) as mbox:
        assert mbox.message_count == 113

        # Identifiers used for lookup are the 1-based iteration positions.
        for position, iterated in enumerate(mbox.messages(), start=1):
            fetched = mbox.get_message_by_id(position)
            assert extract_message_from_archive(mbox, position)

            fetched_text = mbox.format_message(fetched)
            iterated_text = mbox.format_message(iterated)
            assert fetched_text == iterated_text

            assert mbox.get_message_headers(iterated)
files: Iterable[Path],
progress_callback: Callable,
with_content=True,
with_headers=False,
**kwargs,
) -> Generator[Dict, None, None]:
"""
Message generator to feed a pool of processes from a directory of PST files
"""
msg_count = 0
# Iterate over files
for file in files:
try:
with open_mail_archive(file) as archive:
# Iterate over messages
for message in archive.messages():
try:
# Keyword arguments for process_message()
res = {
"filepath": archive.filepath,
"message_id": getattr(message, "identifier", None),
"attachments": archive.get_attachment_metadata(message),
}
try:
res["date"] = archive.get_message_date(message)
except Exception as exc:
res["date"] = None
logger.debug(
md5 = hashlib.md5()
sha256 = hashlib.sha256()
# First we read the file one block at a time and update digests
with open(path_str, "rb") as f:
for block in iter(partial(f.read, 128), b""):
md5.update(block)
sha256.update(block)
md5, sha256 = md5.hexdigest(), sha256.hexdigest()
res.update({"size": size, "md5": md5, "sha256": sha256})
# Then we try to get a message count
try:
with open_mail_archive(path) as archive:
res["msg_count"] = archive.message_count
except Exception as exc:
res["error"] = str(exc)
except Exception as exc:
return res, str(exc)
return res, None