Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
__tablename__ = brewtils.models.Choices.schema
class Event(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.Event.schema
class Principal(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.Principal.schema
class Role(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.Role.schema
class RefreshToken(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.RefreshToken.schema
class Job(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.Job.schema
class RequestTemplate(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.RequestTemplate.schema
brewtils_model = brewtils.models.RequestTemplate
system = FieldBase(field_type="STRING", required=True)
system_version = FieldBase(field_type="STRING", required=True)
instance_name = FieldBase(field_type="STRING", required=True)
namespace = FieldBase(field_type="STRING", required=True)
command = FieldBase(field_type="STRING", required=True)
command_type = FieldBase(field_type="STRING")
parameters = FieldBase(field_type="DICT")
comment = FieldBase(field_type="STRING")
metadata = FieldBase(field_type="DICT")
output_type = FieldBase(field_type="STRING")
class Request(RequestTemplate):
brewtils_model = brewtils.models.Request
parent = FieldBase(field_type=brewtils.models.Request, is_ref=True, reverse_delete_rule="CASCADE")
children = FieldBase(field_type="PLACE_HOLDER")
output = FieldBase(field_type="STRING")
output_type = FieldBase(field_type="STRING", choices=BrewtilsCommand.OUTPUT_TYPES)
status = FieldBase(field_type="STRING", choices=BrewtilsRequest.STATUS_LIST, default="CREATED")
command_type = FieldBase(field_type="STRING", choices=BrewtilsCommand.COMMAND_TYPES)
created_at = FieldBase(field_type="DATE", default=datetime.datetime.utcnow, required=True)
updated_at = FieldBase(field_type="DATE", default=None, required=True)
error_class = FieldBase(field_type="STRING")
has_parent = FieldBase(field_type="BOOLEAN")
requester = FieldBase(field_type="STRING")
class System:
brewtils_model = brewtils.models.System
self.unique_with = unique_with
self.reverse_delete_rule = reverse_delete_rule
self.min_value = min_value
def clean_update(self):
pass
def clean(self):
pass
class Choices:
brewtils_model = brewtils.models.Choices
display = FieldBase(
required=True, choices=brewtils.models.Choices.DISPLAYS, field_type="STRING"
)
strict = FieldBase(required=True, default=True, field_type="BOOLEAN")
type = FieldBase(
required=True,
default="static",
choices=brewtils.models.Choices.TYPES,
field_type="STRING",
)
value = FieldBase(required=True, field_type="BLOB")
details = FieldBase(field_type="JSON")
def clean(self):
if self.type == "static" and not isinstance(self.value, (list, dict)):
raise ModelValidationError(
f"Can not save choices '{self}': type is 'static' but the value is "
else:
setattr(
sql_class,
"__table_args__",
(UniqueConstraint(*unique_args, name="unique_name")),
)
class System(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.System.schema
class Instance(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.Instance.schema
class Command(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.Command.schema
class Parameter(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.Parameter.schema
class Request(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.Request.schema
command_type = FieldBase(field_type="STRING", choices=BrewtilsCommand.COMMAND_TYPES)
created_at = FieldBase(field_type="DATE", default=datetime.datetime.utcnow, required=True)
updated_at = FieldBase(field_type="DATE", default=None, required=True)
error_class = FieldBase(field_type="STRING")
has_parent = FieldBase(field_type="BOOLEAN")
requester = FieldBase(field_type="STRING")
class System:
brewtils_model = brewtils.models.System
name = FieldBase(field_type="STRING", required=True, unique_with=['name','version', 'namespace'])
description = FieldBase(field_type="STRING")
version = FieldBase(field_type="STRING", required=True)
namespace = FieldBase(field_type="STRING", required=True)
max_instances = FieldBase(field_type="INT", default=1)
instances = FieldBase(field_type=brewtils.models.Instance, is_ref=True, is_list=True, reverse_delete_rule="PULL")
commands = FieldBase(field_type=brewtils.models.Command, is_ref=True, is_list=True, reverse_delete_rule="PULL")
icon_name = FieldBase(field_type="STRING")
display_name = FieldBase(field_type="STRING")
metadata = FieldBase(field_type="DICT")
local = FieldBase(field_type="BOOLEAN", default=True)
class Event:
brewtils_model = brewtils.models.Event
name = FieldBase(field_type="STRING", required=True)
namespace = FieldBase(field_type="STRING", required=True)
garden = FieldBase(field_type="STRING")
payload = FieldBase(field_type="DICT")
error = FieldBase(field_type="BOOLEAN")
metadata = FieldBase(field_type="DICT")
timestamp = FieldBase(field_type="DATE")
if self.output_type not in BrewtilsCommand.OUTPUT_TYPES:
raise ModelValidationError(
f"Can not save Command {self}: Invalid output type '{self.output_type}'"
)
if len(self.parameters) != len(
set(parameter.key for parameter in self.parameters)
):
raise ModelValidationError(
f"Can not save Command {self}: Contains Parameters with duplicate keys"
)
class StatusInfo:
brewtils_model = brewtils.models.StatusInfo
heartbeat = FieldBase(field_type="DATE")
class Instance:
brewtils_model = brewtils.models.Instance
name = FieldBase(field_type="STRING", required=True, default="default")
description = FieldBase(field_type="STRING")
status = FieldBase(field_type="STRING", default="INITIALIZING")
status_info = FieldBase(
field_type="StatusInfo", default=StatusInfo()
)
queue_type = FieldBase(field_type="STRING")
queue_info = FieldBase(field_type="JSON")
icon_name = FieldBase(field_type="STRING")
metadata = FieldBase(field_type="JSON")
__tablename__ = brewtils.models.DateTrigger.schema
class CronTrigger(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.CronTrigger.schema
class IntervalTrigger(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.IntervalTrigger.schema
class Garden(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.Garden.schema
class StatusInfo(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = "StatusInfo"
schema_mapping = dict()
class_mapper(Choices, db_models.Choices)
class_mapper(Parameter, db_models.Parameter)
class_mapper(Command, db_models.Command)
class_mapper(StatusInfo, db_models.StatusInfo)
class_mapper(Instance, db_models.Instance)
class_mapper(RequestTemplate, db_models.RequestTemplate)
class_mapper(Request, db_models.Request)