Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
file_report=file_report,
)
)
# Commit if we reach a certain amount of new entities
if len(new_entities) >= RATOM_DB_COMMIT_BATCH_SIZE:
session.add_all(new_entities)
new_entities = []
try:
session.commit()
except Exception as exc:
logger.exception(exc)
session.rollback()
# Update progress every N messages
if not msg_count % RATOM_MSG_PROGRESS_STEP:
reporting_update_progress(RATOM_MSG_PROGRESS_STEP)
# Add remaining new entities
session.add_all(new_entities)
# Update progress with remaining message count
reporting_update_progress(msg_count % RATOM_MSG_PROGRESS_STEP)
except KeyboardInterrupt:
logger.warning("Cancelling running task")
logger.info("Partial results written to database")
logger.info("Terminating workers")
# Clean up process pool
pool.terminate()
pool.join()
new_entities = []
try:
session.commit()
except Exception as exc:
logger.exception(exc)
session.rollback()
# Update progress every N messages
if not msg_count % RATOM_MSG_PROGRESS_STEP:
reporting_update_progress(RATOM_MSG_PROGRESS_STEP)
# Add remaining new entities
session.add_all(new_entities)
# Update progress with remaining message count
reporting_update_progress(msg_count % RATOM_MSG_PROGRESS_STEP)
except KeyboardInterrupt:
logger.warning("Cancelling running task")
logger.info("Partial results written to database")
logger.info("Terminating workers")
# Clean up process pool
pool.terminate()
pool.join()
return 1
return 0
except Exception as exc:
# Log and move on to the next message
message_id = getattr(message, "identifier", None)
message_str = (
f"message {message_id}" if message_id else "a message"
)
logger.info(f"Skipping {message_str} from {file}")
logger.debug(exc, exc_info=True)
finally:
msg_count += 1
# Update progress every N messages
if not msg_count % RATOM_MSG_PROGRESS_STEP:
progress_callback(RATOM_MSG_PROGRESS_STEP)
except Exception as exc:
# Log and move on to the next file
logger.info(f"Skipping file {file}")
logger.debug(exc, exc_info=True)
# Update progress with remaining message count
progress_callback(msg_count % RATOM_MSG_PROGRESS_STEP)
yield res
except Exception as exc:
# Log and move on to the next message
message_id = getattr(message, "identifier", None)
message_str = (
f"message {message_id}" if message_id else "a message"
)
logger.info(f"Skipping {message_str} from {file}")
logger.debug(exc, exc_info=True)
finally:
msg_count += 1
# Update progress every N messages
if not msg_count % RATOM_MSG_PROGRESS_STEP:
progress_callback(RATOM_MSG_PROGRESS_STEP)
except Exception as exc:
# Log and move on to the next file
logger.info(f"Skipping file {file}")
logger.debug(exc, exc_info=True)
# Update progress with remaining message count
progress_callback(msg_count % RATOM_MSG_PROGRESS_STEP)