Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_run_function_with_interrupt(
    directory_of_mbox_files, function, patched, kwargs
):
    """Check that ``function`` returns status 1 when interrupted.

    The object named by ``patched`` is replaced with a mock that raises
    ``KeyboardInterrupt`` when called. ``function`` is then invoked with the
    files found under ``directory_of_mbox_files``, a session on a database
    created at a temporary path, and the extra ``kwargs``.
    """
    with TemporaryDirectory() as scratch_dir:
        db_path = Path(scratch_dir) / "test.sqlite3"
        session_factory = db_init(db_path)

        # Enter the session first, then install the patch, so the mock is
        # only active while the function under test runs.
        with db_session(session_factory) as session:
            with patch(patched, new=MagicMock(side_effect=KeyboardInterrupt)):
                mbox_files = get_set_of_files(directory_of_mbox_files)
                status = function(files=mbox_files, session=session, **kwargs)
                assert status == 1
def test_extract_entities_from_mbox_files(directory_of_mbox_files):
    """Check that ``extract_entities`` returns status 0 on the mbox fixtures.

    Runs with two jobs and the first item returned by ``load_spacy_model``
    for ``SPACY_MODELS.en_core_web_sm``, against a database created at a
    temporary path.
    """
    with TemporaryDirectory() as scratch_dir:
        db_path = Path(scratch_dir) / "test.sqlite3"
        session_factory = db_init(db_path)

        with db_session(session_factory) as session:
            # Collect the inputs in the same order the call would evaluate them.
            mbox_files = get_set_of_files(directory_of_mbox_files)
            model = load_spacy_model(SPACY_MODELS.en_core_web_sm)[0]

            status = extract_entities(
                files=mbox_files, session=session, spacy_model=model, jobs=2
            )
            assert status == 0
def test_scan_files_with_interrupt(directory_of_mbox_files):
    """Check that ``scan_files`` returns status 1 when interrupted.

    ``libratom.lib.report.FileReport`` is replaced with a mock that raises
    ``KeyboardInterrupt`` when called; ``scan_files`` is then run with two
    jobs against a database created at a temporary path.
    """
    with TemporaryDirectory() as scratch_dir:
        db_path = Path(scratch_dir) / "test.sqlite3"
        session_factory = db_init(db_path)

        with db_session(session_factory) as session:
            with patch(
                "libratom.lib.report.FileReport",
                new=MagicMock(side_effect=KeyboardInterrupt),
            ):
                mbox_files = get_set_of_files(directory_of_mbox_files)
                status = scan_files(files=mbox_files, session=session, jobs=2)
                assert status == 1
Session = db_init(out)
# Get set of PST files from the source
files = get_set_of_files(src)
if not files:
logger.info(f"No PST file found in {src}")
# Compute and store file information
with progress_bar_context(
total=len(files),
desc="Initial file scan",
unit="files",
color="green",
leave=False,
) as file_bar, db_session(Session) as session:
status = scan_files(
files, session, jobs=jobs, progress_callback=file_bar.update
)
if status == 1:
logger.warning("Aborting")
return status
# Get messages and extract entities
with db_session(Session) as session:
# Record configuration info
store_configuration_in_db(session, str(src), jobs)
# Get total message count
msg_count = session.query(func.sum(FileReport.msg_count)).scalar()
Session = db_init(out)
# Get set of PST files from the source
files = get_set_of_files(src)
if not files:
logger.info(f"No PST file found in {src}")
# Compute and store file information
with progress_bar_context(
total=len(files),
desc="Initial file scan",
unit="files",
color="green",
leave=False,
) as file_bar, db_session(Session) as session:
status = scan_files(
files, session, jobs=jobs, progress_callback=file_bar.update
)
if status == 1:
logger.warning("Aborting")
return status
# Get spaCy model
logger.info(f"Loading spaCy model: {spacy_model_name}")
spacy_model, spacy_model_version = load_spacy_model(spacy_model_name)
if not spacy_model:
return 1
# Try to see if we're using a stale model version
try:
total=len(files),
desc="Initial file scan",
unit="files",
color="green",
leave=False,
) as file_bar, db_session(Session) as session:
status = scan_files(
files, session, jobs=jobs, progress_callback=file_bar.update
)
if status == 1:
logger.warning("Aborting")
return status
# Get messages and extract entities
with db_session(Session) as session:
# Record configuration info
store_configuration_in_db(session, str(src), jobs)
# Get total message count
msg_count = session.query(func.sum(FileReport.msg_count)).scalar()
# Get list of good files
good_files = [
Path(file.path)
for file in session.query(FileReport).filter(FileReport.error.is_(None))
]
with progress_bar_context(
total=msg_count, desc="Processing messages", unit="msg", color="green"
) as msg_bar:
spacy_model, spacy_model_version = load_spacy_model(spacy_model_name)
if not spacy_model:
return 1
# Try to see if we're using a stale model version
try:
latest_version = get_spacy_models()[spacy_model_name][0]
if parse(latest_version) > parse(spacy_model_version):
logger.info(
f"Model {spacy_model_name} {spacy_model_version} will be used, but {latest_version} is available"
)
except Exception as exc:
logger.debug(exc, exc_info=True)
# Get messages and extract entities
with db_session(Session) as session:
# Record configuration info
store_configuration_in_db(
session, str(src), jobs, spacy_model_name, spacy_model_version
)
# Get total message count
msg_count = session.query(func.sum(FileReport.msg_count)).scalar()
# Get list of good files
good_files = [
Path(file.path)
for file in session.query(FileReport).filter(FileReport.error.is_(None))
]
with progress_bar_context(