Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Table,
UniqueConstraint,
func,
select,
)
from sqlalchemy.dialects.postgresql import JSONB, aggregate_order_by
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.ext.orderinglist import ordering_list
from sqlalchemy.orm import column_property, relationship
from qcfractal.interface.models.records import DriverEnum, RecordStatusEnum
from qcfractal.storage_sockets.models.sql_base import Base, MsgpackExt
from qcfractal.storage_sockets.models.sql_models import KeywordsORM, KVStoreORM, MoleculeORM
class BaseResultORM(Base):
"""
Abstract Base class for ResultORMs and ProcedureORMs
"""
__tablename__ = "base_result"
# for SQL
result_type = Column(String) # for inheritance
task_id = Column(String) # TODO: not used, for back compatibility
# Base identification
id = Column(Integer, primary_key=True)
# ondelete="SET NULL": when the manager is deleted, set this field to None
manager_name = Column(String, ForeignKey("queue_manager.name", ondelete="SET NULL"), nullable=True)
hash_index = Column(String) # TODO
Mixin class for common Dataset attributes.
"""
default_benchmark = Column(String)
default_keywords = Column(JSON)
default_driver = Column(String)
default_units = Column(String)
alias_keywords = Column(JSON)
default_program = Column(String)
history_keys = Column(JSON)
history = Column(JSON)
class ContributedValuesORM(Base):
    """One group of contributed values belonging to a dataset.

    A dataset may own several rows in this table; rows are told apart by
    the composite primary key (``collection_id``, ``name``).
    """

    __tablename__ = "contributed_values"

    # Composite primary key, part 1: the owning collection. The foreign key
    # is declared with ondelete="cascade".
    collection_id = Column(
        Integer,
        ForeignKey("collection.id", ondelete="cascade"),
        primary_key=True,
    )
    # Composite primary key, part 2: the name of this group of values.
    name = Column(String, primary_key=True, nullable=False)

    # Required payload columns.
    values = Column(MsgpackExt, nullable=False)
    index = Column(MsgpackExt, nullable=False)
    values_structure = Column(JSON, nullable=False)

    # Required metadata columns.
    theory_level = Column(JSON, nullable=False)
    units = Column(String, nullable=False)

    # Optional metadata (nullable).
    theory_level_details = Column(JSON)
extra = Column(MsgpackExt)
__table_args__ = (
Index('ix_service_queue_status', "status"),
Index('ix_service_queue_priority', "priority"),
Index('ix_service_queue_modified_on', "modified_on"),
Index('ix_service_queue_status_tag_hash', "status", "tag"),
Index('ix_service_queue_hash_index', "hash_index"),
)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class UserORM(Base):
    """Row of the ``user`` table: login name, password bytes and permissions."""

    __tablename__ = "user"

    id = Column(Integer, primary_key=True)

    # Required and unique; the uniqueness constraint also gives an index.
    username = Column(String, unique=True, nullable=False)

    # Required binary column.
    # NOTE(review): presumably holds a hashed password -- confirm against
    # the code that writes this column.
    password = Column(LargeBinary, nullable=False)

    # Stored as JSON; the original note mentions Column(ARRAY(String)) as
    # an alternative representation.
    permissions = Column(JSON)
class QueueManagerLogORM(Base):
__tablename__ = "queue_manager_logs"
id = Column(Integer, primary_key=True)
manager_id = Column(Integer, ForeignKey('queue_manager.id'), nullable=False)
class ProcedureMixin:
    """Mixin supplying the columns common to specific procedure types.

    It declares no table of its own; it is meant to be combined with a
    concrete procedure ORM class.
    """

    # Required; limited to 100 characters.
    program = Column(String(100), nullable=False)

    # Optional JSON columns.
    keywords = Column(JSON)
    qc_spec = Column(JSON)
# ================== Types of ProcedureORMs ================== #
class Trajectory(Base):
    """Association table for the many-to-many optimization/result relation.

    The primary key spans all three columns, so the same
    (``opt_id``, ``result_id``) pair may appear at more than one
    ``position``.
    """

    __tablename__ = "opt_result_association"

    # Both foreign keys are declared with ondelete="cascade".
    opt_id = Column(
        Integer,
        ForeignKey("optimization_procedure.id", ondelete="cascade"),
        primary_key=True,
    )
    result_id = Column(
        Integer,
        ForeignKey("result.id", ondelete="cascade"),
        primary_key=True,
    )

    # Third component of the primary key.
    position = Column(Integer, primary_key=True)

    # Disabled in the original and kept here for reference only:
    # Index('opt_id', 'result_id', unique=True)
    # trajectory_obj = relationship(ResultORM, lazy="noload")
# # association table for many to many relation
# opt_result_association = Table('opt_result_association', Base.metadata,
# Column('opt_id', Integer, ForeignKey('optimization_procedure.id', ondelete="CASCADE")),
# Column('result_id', Integer, ForeignKey('result.id', ondelete="CASCADE")),
# {
# 'fields': ('molecule_hash', ),
# 'unique': False
# }, # should almost be unique
# {
# 'fields': ('molecular_formula', ),
# 'unique': False
# }
# ]
# }
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class KeywordsORM(Base):
    """A stored keyword set (``keywords`` table).

    Uniqueness is enforced on ``hash_index`` through a unique index.
    """

    __tablename__ = "keywords"

    # Declared first; it does not affect column order.
    __table_args__ = (
        Index("ix_keywords_hash_index", "hash_index", unique=True),
    )

    id = Column(Integer, primary_key=True)

    # Required; covered by the unique index above.
    hash_index = Column(String, nullable=False)

    # The keyword payload itself.
    values = Column(JSON)

    # Flags with their insert-time defaults.
    lowercase = Column(Boolean, default=True)
    exact_floats = Column(Boolean, default=False)

    comments = Column(String)
# Carry-ons
provenance = Column(JSON)
__table_args__ = (
Index("ix_base_result_status", "status"),
Index("ix_base_result_type", "result_type"), # todo: needed?
)
__mapper_args__ = {"polymorphic_on": "result_type"}
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class WavefunctionStoreORM(Base):
__tablename__ = "wavefunction_store"
id = Column(Integer, primary_key=True)
# Sparsity is very cheap
basis = Column(MsgpackExt, nullable=False)
restricted = Column(Boolean, nullable=False)
# Core Hamiltonian
h_core_a = Column(MsgpackExt, nullable=True)
h_core_b = Column(MsgpackExt, nullable=True)
h_effective_a = Column(MsgpackExt, nullable=True)
h_effective_b = Column(MsgpackExt, nullable=True)
# SCF Results
# explicit sort will have to process all the data to identify the first n
# rows, but if there is an index matching the ORDER BY, the first n rows
# can be retrieved directly, without scanning the remainder at all.
__table_args__ = (
Index('ix_task_queue_created_on', "created_on"),
Index('ix_task_queue_keys', "status", "program", "procedure", "tag"),
Index('ix_task_queue_manager', "manager"),
Index('ix_task_queue_base_result_id', "base_result_id")
) # yapf: disable
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class ServiceQueueORM(Base):
__tablename__ = "service_queue"
id = Column(Integer, primary_key=True)
status = Column(Enum(TaskStatusEnum), default=TaskStatusEnum.waiting)
tag = Column(String, default=None)
hash_index = Column(String, nullable=False)
procedure_id = Column(Integer, ForeignKey("base_result.id"), unique=True)
procedure_obj = relationship("BaseResultORM", lazy='joined')
priority = Column(Integer, default=int(PriorityEnum.NORMAL))
created_on = Column(DateTime, default=datetime.datetime.utcnow)
modified_on = Column(DateTime, default=datetime.datetime.utcnow)
timestamp = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
completed = Column(Integer, nullable=True)
submitted = Column(Integer, nullable=True)
failures = Column(Integer, nullable=True)
total_worker_walltime = Column(Float, nullable=True)
total_task_walltime = Column(Float, nullable=True)
active_tasks = Column(Integer, nullable=True)
active_cores = Column(Integer, nullable=True)
active_memory = Column(Float, nullable=True)
__table_args__ = (Index('ix_queue_manager_log_timestamp', "timestamp"), )
class QueueManagerORM(Base):
"""
"""
__tablename__ = "queue_manager"
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
cluster = Column(String)
hostname = Column(String)
username = Column(String)
uuid = Column(String)
tag = Column(String)
# Count at current time
completed = Column(Integer, default=0)
Note: avoid circular imports here by giving the class name in relationships
and foreign keys as a string (see TaskQueueORM.base_result_obj)
"""
import datetime
# from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import JSON, BigInteger, LargeBinary, Boolean, Column, DateTime, Enum, Float, ForeignKey, Index, Integer, String
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship
from qcfractal.interface.models.task_models import ManagerStatusEnum, PriorityEnum, TaskStatusEnum
from qcfractal.storage_sockets.models.sql_base import Base, MsgpackExt
class AccessLogORM(Base):
__tablename__ = 'access_log'
id = Column(Integer, primary_key=True)
access_date = Column(DateTime, default=datetime.datetime.utcnow)
access_method = Column(String, nullable=False)
access_type = Column(String, nullable=False)
# Note: no performance difference between varchar and text in postgres
# will mostly have a serialized JSON, but not stored as JSON for speed
extra_params = Column(String)
# user info
ip_address = Column(String)
user_agent = Column(String)
# extra computed geo data
kvstore_count = Column(Integer)
access_count = Column(Integer)
# States
result_states = Column(JSON)
# Database
db_total_size = Column(BigInteger)
db_table_size = Column(BigInteger)
db_index_size = Column(BigInteger)
db_table_information = Column(JSON)
__table_args__ = (Index('ix_server_stats_log_timestamp', "timestamp"), )
class VersionsORM(Base):
    """Row of the ``versions`` table holding component version strings."""

    __tablename__ = "versions"

    id = Column(Integer, primary_key=True)

    # When no value is supplied, datetime.datetime.utcnow is called at
    # insert time, giving a naive UTC timestamp.
    created_on = Column(DateTime, default=datetime.datetime.utcnow)

    # Required version strings.
    elemental_version = Column(String, nullable=False)
    fractal_version = Column(String, nullable=False)

    # Optional version string (nullable).
    engine_version = Column(String)
class KVStoreORM(Base):
    """Row of the ``kv_store`` table: an integer id and a required JSON value.

    TODO (carried over from the original): this class is slated for
    renaming; the intended new name was not recorded.
    """

    __tablename__ = "kv_store"

    id = Column(Integer, primary_key=True)

    # Required JSON payload.
    value = Column(JSON, nullable=False)