Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.logger.debug("Pausing scheduler - no more jobs will be run")
self.scheduler.pause()
self.plugin_manager.stop_all_plugins()
self.logger.debug("Stopping helper threads")
for helper_thread in reversed(self.helper_threads):
helper_thread.stop()
if self.scheduler.running:
self.logger.debug("Shutting down scheduler")
self.scheduler.shutdown(wait=False)
try:
self.events_manager.add_event(
brewtils.models.Event(name=Events.BARTENDER_STOPPED.name)
)
except RequestException:
self.logger.warning("Unable to publish shutdown notification")
self.logger.debug("Stopping entry points")
for entry_point in self.entry_points:
entry_point.stop(timeout=10)
self.logger.info("Stopping log reader")
self.log_reader.stop()
self.logger.info("Stopping local events manager")
self.events_manager.stop()
self.logger.info("Successfully shut down Beer-garden")
@publish_event(Events.INSTANCE_INITIALIZED)
def initialize(instance_id: str, runner_id: str = None) -> Instance:
"""Initializes an instance.
Args:
instance_id: The Instance ID
runner_id: The runner id to associate with this plugin, if any
Returns:
The updated Instance
"""
# Load the instance by its ID, then find the system whose instances contain it.
instance = db.query_unique(Instance, id=instance_id)
system = db.query_unique(System, instances__contains=instance)
logger.info(f"Initializing instance {system}[{instance}]")
# Create the queue for this instance. Presumably queue_spec describes the
# created queue / connection details -- TODO confirm against queue.create.
queue_spec = queue.create(instance)
# NOTE(review): this excerpt ends here. The remainder of the body, including
# any use of runner_id and the return of the updated Instance promised by the
# docstring, is not visible.
@publish_event(Events.REQUEST_CANCELED)
def cancel_request(request_id: Request) -> Request:
"""Mark a Request as CANCELED
Args:
request_id: The Request ID to cancel
Returns:
The modified Request
Raises:
ModelValidationError: The Request is already completed
"""
# NOTE(review): request_id is annotated as Request, but the docstring calls it
# an ID and it is passed as the id= filter below -- the annotation looks wrong;
# confirm the intended type with callers.
request = db.query_unique(Request, id=request_id)
# The status is overwritten unconditionally in the visible lines; the
# "already completed" check mentioned under Raises is not visible here.
request.status = "CANCELED"
# NOTE(review): this excerpt ends here. Persisting the change and returning the
# modified Request are not visible.
# We note the status before the operations, because it is possible for the
# operations to update the status of the request. In that case, because the
# updates are coming in in a single request it is okay to update the output or
# error_class. Ideally this would be handled correctly when we better integrate
# PatchOperations with their models.
status_before = req.status
for op in operations:
if op.operation == "replace":
if op.path == "/status":
if op.value.upper() in BrewtilsRequest.STATUS_LIST:
req.status = op.value.upper()
if op.value.upper() == "IN_PROGRESS":
self.request.event.name = Events.REQUEST_STARTED.name
elif op.value.upper() in BrewtilsRequest.COMPLETED_STATUSES:
self.request.event.name = Events.REQUEST_COMPLETED.name
if request_id in brew_view.request_map:
wait_event = brew_view.request_map[request_id]
else:
error_msg = "Unsupported status value '%s'" % op.value
self.logger.warning(error_msg)
raise ModelValidationError(error_msg)
elif op.path == "/output":
if req.output == op.value:
continue
if status_before in Request.COMPLETED_STATUSES:
raise ModelValidationError(
def signal_handler(_: int, __: types.FrameType):
    """Shut this entry point down when a signal arrives.

    Both handler arguments are ignored. The forward processor is stopped
    first, then an ENTRY_STOPPED event is published, and finally the server
    is stopped.
    """
    logger.debug("Stopping forward processing")
    beer_garden.router.forward_processor.stop()

    # This will almost definitely not be published because it would need to
    # make it up to the main process and back down into this process. We just
    # publish this here in case the main process is looking for it.
    stopped_event = Event(name=Events.ENTRY_STOPPED.name)
    publish(stopped_event)

    the_server.stop()
@publish_event(Events.SYSTEM_REMOVED)
def remove_system(system_id: str) -> None:
"""Remove a system
Args:
system_id: The System ID
Returns:
None
"""
# Load the System record by its ID.
system = db.query_unique(System, id=system_id)
# Attempt to stop the plugins
# The plugins are looked up in the application's plugin registry by the
# system's name and version.
registered = beer_garden.application.plugin_registry.get_plugins_by_system(
system.name, system.version
)
# NOTE(review): this excerpt ends here. Stopping the registered plugins and
# the actual removal of the system are not visible.
def handle_event(event):
    """Mirror request events from other gardens into the local database.

    Events whose garden equals the configured ``garden.name`` are ignored.
    For events from any other garden:

    * ``REQUEST_CREATED``: the payload is created locally unless a request
      with the same ID is already stored.
    * ``REQUEST_STARTED`` / ``REQUEST_COMPLETED``: ``status``, ``output`` and
      ``error_class`` are copied from the payload onto the stored request,
      which is then saved. If no request with that ID is stored, the event
      is skipped.

    Args:
        event: The event to handle. Its ``garden``, ``name`` and ``payload``
            attributes are read.
    """
    # Only care about downstream garden
    if event.garden == config.get("garden.name"):
        return

    if event.name == Events.REQUEST_CREATED.name:
        if db.query_unique(Request, id=event.payload.id) is None:
            db.create(event.payload)

    elif event.name in (Events.REQUEST_STARTED.name, Events.REQUEST_COMPLETED.name):
        # When we send child requests to child gardens where the parent was on
        # the local garden we remove the parent before sending them. Only setting
        # the subset of fields that change "corrects" the parent
        existing_request = db.query_unique(Request, id=event.payload.id)

        # query_unique may find nothing (the REQUEST_CREATED branch relies on
        # that). Without this guard an update for an unknown request would
        # fail with AttributeError on None instead of being skipped.
        if existing_request is None:
            return

        for field in ("status", "output", "error_class"):
            setattr(existing_request, field, getattr(event.payload, field))

        db.update(existing_request)
def handle_event(event):
    """Keep the cached system list of the event's garden in step with system events.

    For SYSTEM_CREATED, SYSTEM_UPDATED and SYSTEM_REMOVED events, the first
    cached system whose id matches the payload's id is dropped from
    ``gardens[event.garden].systems``. For created and updated events the
    payload is then appended, so an update replaces the stale entry.
    """
    system_event_names = (
        Events.SYSTEM_CREATED.name,
        Events.SYSTEM_UPDATED.name,
        Events.SYSTEM_REMOVED.name,
    )
    if event.name in system_event_names:
        cached_garden = gardens[event.garden]

        # Position of the first cached system matching the payload, if any.
        stale_position = next(
            (
                position
                for position, known in enumerate(cached_garden.systems)
                if known.id == event.payload.id
            ),
            None,
        )
        if stale_position is not None:
            cached_garden.systems.pop(stale_position)

        # Inside this branch, "not removed" means created or updated.
        if event.name != Events.SYSTEM_REMOVED.name:
            cached_garden.systems.append(event.payload)
# This is a little unintuitive. We want to let the garden module deal with handling
@publish_event(Events.INSTANCE_UPDATED)
def update_instance(instance_id: str, patch: PatchOperation) -> Instance:
"""Applies updates to an instance.
Args:
instance_id: The Instance ID
patch: Patch definition to apply
Returns:
The updated Instance
"""
# Starts as None; presumably assigned by the per-operation handling below,
# which is not visible in this excerpt.
instance = None
# NOTE(review): patch is annotated as a single PatchOperation but is iterated
# here, so callers apparently pass an iterable of operations -- confirm.
for op in patch:
# Operation names are lower-cased before comparison, so matching is
# case-insensitive.
operation = op.operation.lower()
if operation == "initialize":
# NOTE(review): this excerpt ends here. The handling of "initialize" and of any
# other operations, and the return of the updated Instance, are not visible.
@publish_event(Events.ALL_QUEUES_CLEARED)
def clear_all_queues():
    """Clear the queue of every instance of every stored system.

    Each system returned by ``db.query(System)`` is walked instance by
    instance. The routing key is built from the system's namespace, name and
    version plus the instance's name, and passed to ``clear_queue``.

    :return: None
    """
    for known_system in db.query(System):
        for known_instance in known_system.instances:
            clear_queue(
                get_routing_key(
                    known_system.namespace,
                    known_system.name,
                    known_system.version,
                    known_instance.name,
                )
            )