Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Extract entities
assert 0 == subcommands.entities(
out=out,
spacy_model_name=SPACY_MODELS.en_core_web_sm,
jobs=2,
src=file,
progress=False,
)
# Connect to DB file
engine = create_engine(f"sqlite:////{out}")
session = sessionmaker(bind=engine)()
# There should be one FileReport instance for this run
file_report = session.query(FileReport).one() # pylint: disable=no-member
# Path
assert file_report.path == str(file)
# Name
assert file_report.name == file.name
# Size
assert file_report.size == file.stat().st_size
# Checksums
assert file_report.md5 == "ac62843cff3232120bba30aa02a9fe86"
assert (
file_report.sha256
== "1afa29342c6bf5f774e03b3acad677febd5b0d59ec05eb22dee2481c8dfd6b88"
)
def test_file_report_with_empty_relationship():
    """Check that a bare ``FileReport()``, built with no arguments, has no processing times.

    Each of the three processing-time attributes must read as ``None``.
    They are checked in the same order as before: start, end, wall time.
    """
    report = FileReport()
    for attr_name in (
        "processing_start_time",
        "processing_end_time",
        "processing_wall_time",
    ):
        assert getattr(report, attr_name) is None
continue
# Extract results
entities = res.pop("entities")
message_id = res.pop("message_id")
filepath = res.pop("filepath")
attachments = res.pop("attachments")
# Create new message instance
message = Message(pff_identifier=message_id, **res)
# Link message to a file_report
try:
file_report = (
session.query(FileReport).filter_by(path=filepath).one()
)
except Exception as exc:
file_report = None
logger.info(
f"Unable to link message id {message_id} to a file. Error: {exc}"
)
message.file_report = file_report
session.add(message)
# Record attachment info
session.add_all(
[
Attachment(
**attachment._asdict(),
message=message,
"""
# Default progress callback to no-op
update_progress = progress_callback or (lambda *_, **__: None)
with multiprocessing.Pool(processes=jobs, initializer=worker_init) as pool:
logger.debug(f"Starting pool with {pool._processes} processes")
try:
for values, exc in pool.imap(
get_file_info, ({"path": file} for file in files), chunksize=1
):
if not exc:
# Make a new FileReport object with the results
session.add(FileReport(**values))
else:
logger.info(
f"Unable to retrieve file information for {values['path']}, error: {exc}"
)
update_progress()
except KeyboardInterrupt:
logger.warning("Aborting")
# Politely terminate workers
pool.terminate()
pool.join()
return 1
Main entity extraction function that extracts named entities from a given iterable of files
Spawns multiple processes via multiprocessing.Pool
"""
# Confirm environment settings
for key, value in globals().items():
if key.startswith("RATOM_"):
logger.debug(f"{key}: {value}")
# Default progress callbacks to no-op
processing_update_progress = processing_progress_callback or (lambda *_, **__: None)
reporting_update_progress = reporting_progress_callback or (lambda *_, **__: None)
# Load the file_report table for local lookup
_file_reports = session.query(FileReport).all() # noqa: F841
# Start of multiprocessing
with multiprocessing.Pool(processes=jobs, initializer=worker_init) as pool:
logger.debug(f"Starting pool with {pool._processes} processes")
new_entities = []
msg_count = 0
try:
for msg_count, worker_output in enumerate(
pool.imap_unordered(
process_message,
get_messages(
files,
spacy_model=spacy_model,
status = scan_files(
files, session, jobs=jobs, progress_callback=file_bar.update
)
if status == 1:
logger.warning("Aborting")
return status
# Get messages and extract entities
with db_session(Session) as session:
# Record configuration info
store_configuration_in_db(session, str(src), jobs)
# Get total message count
msg_count = session.query(func.sum(FileReport.msg_count)).scalar()
# Get list of good files
good_files = [
Path(file.path)
for file in session.query(FileReport).filter(FileReport.error.is_(None))
]
with progress_bar_context(
total=msg_count, desc="Processing messages", unit="msg", color="green"
) as msg_bar:
status = generate_report(
files=good_files,
session=session,
include_message_contents=include_message_contents,
progress_callback=msg_bar.update,
logger.info(
f"Model {spacy_model_name} {spacy_model_version} will be used, but {latest_version} is available"
)
except Exception as exc:
logger.debug(exc, exc_info=True)
# Get messages and extract entities
with db_session(Session) as session:
# Record configuration info
store_configuration_in_db(
session, str(src), jobs, spacy_model_name, spacy_model_version
)
# Get total message count
msg_count = session.query(func.sum(FileReport.msg_count)).scalar()
# Get list of good files
good_files = [
Path(file.path)
for file in session.query(FileReport).filter(FileReport.error.is_(None))
]
with progress_bar_context(
total=msg_count, desc="Processing messages", unit="msg", color="blue"
) as processing_msg_bar, progress_bar_context(
total=msg_count,
desc="Generating message reports",
unit="msg",
color="green",
) as reporting_msg_bar:
# Extract results
message_id = msg_info.pop("message_id")
filepath = msg_info.pop("filepath")
attachments = msg_info.pop("attachments")
if include_message_contents:
msg_info["body"] = cleanup_message_body(
msg_info["body"], msg_info.pop("body_type")
)
# Create new message instance
message = Message(pff_identifier=message_id, **msg_info)
# Link message to a file_report
try:
file_report = session.query(FileReport).filter_by(path=filepath).one()
except Exception as exc:
file_report = None
logger.info(
f"Unable to link message id {message_id} to a file. Error: {exc}"
)
message.file_report = file_report
session.add(message)
# Record attachment info
session.add_all(
[
Attachment(
**attachment._asdict(),
message=message,
file_report=file_report,