Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#
log.info(hl('Entering event reactor ...', color='green', bold=True))
term_print('CROSSBAR:REACTOR_ENTERED')
reactor.run()
# once the reactor has finally stopped, we get here, and at that point,
# exit_info['was_clean'] MUST have been set before - either to True or to False
# (otherwise we are missing a code path to handle in above)
# exit the program with exit code depending on whether the node has been cleanly shut down
if exit_info['was_clean'] is True:
term_print('CROSSBAR:EXIT_WITH_SUCCESS')
sys.exit(0)
elif exit_info['was_clean'] is False:
term_print('CROSSBAR:EXIT_WITH_ERROR')
sys.exit(1)
else:
term_print('CROSSBAR:EXIT_WITH_INTERNAL_ERROR')
sys.exit(1)
if hasattr(options, 'logdir'):
if options.logdir:
options.logdir = os.path.abspath(os.path.join(options.cbdir, options.logdir))
if not os.path.isdir(options.logdir):
try:
os.mkdir(options.logdir)
except Exception as e:
print("Could not create log directory: {}".format(e))
sys.exit(1)
else:
print("Auto-created log directory {}".format(options.logdir))
# Start the logger
#
_start_logging(options, reactor)
term_print('CROSSBAR:LOGGING_STARTED')
# run the subcommand selected
#
try:
options.func(options, reactor=reactor, personality=personality)
except SystemExit as e:
# SystemExit(0) is okay! Anything other than that is bad and should be
# re-raised.
if e.args[0] != 0:
raise
def before_reactor_stopped():
term_print('CROSSBAR[{}]:REACTOR_STOPPING'.format(options.worker))
if _HAS_VMPROF and options.vmprof and _vm_prof['outfd']:
vmprof.disable()
term_print('CROSSBAR[{}]:VMPROF_DISABLED'.format(options.worker))
def after_reactor_started():
term_print('CROSSBAR[{}]:REACTOR_STARTED'.format(options.worker))
if _HAS_VMPROF and options.vmprof:
outfn = os.path.join(options.cbdir, '.vmprof-worker-{}-{}.dat'.format(options.worker, os.getpid()))
_vm_prof['outfd'] = os.open(outfn, os.O_RDWR | os.O_CREAT | os.O_TRUNC)
vmprof.enable(_vm_prof['outfd'], period=0.01)
term_print('CROSSBAR[{}]:VMPROF_ENABLED:{}'.format(options.worker, outfn))
def shutdown(self, restart=False, mode=None, details=None):
"""
Explicitly stop this node.
"""
if self._shutdown_requested:
# we're already shutting down .. ignore ..
return
self._shutdown_requested = True
self.log.info('Node shutdown requested (restart={}, mode={}, reactor.running={}) ..'.format(
restart, mode, self._reactor.running))
term_print('CROSSBAR:NODE_SHUTDOWN_REQUESTED')
try:
# shutdown any specific to the node controller
yield self._shutdown(restart, mode)
# node shutdown information
shutdown_info = {
'node_id': self._node._node_id,
'restart': restart,
'mode': mode,
'who': details.caller if details else None,
'when': utcnow(),
'was_clean': self._shutdown_was_clean,
}
if self._node._shutdown_complete:
# signal that this worker is ready for setup. the actual setup procedure
# will either be sequenced from the local node configuration file or remotely
# from a management service
yield self.publish(
'{}.on_worker_ready'.format(self._uri_prefix),
{
'type': self.WORKER_TYPE,
'id': self.config.extra.worker,
'pid': os.getpid(),
},
options=PublishOptions(acknowledge=True)
)
self.log.debug("Worker '{worker}' running as PID {pid}",
worker=self.config.extra.worker, pid=os.getpid())
term_print('CROSSBAR[{}]:WORKER_STARTED'.format(self.config.extra.worker))
self.log.info('Checking for node shutdown: worker_exit_success={worker_exit_success}, shutdown_requested={shutdown_requested}, node_shutdown_triggers={node_shutdown_triggers}', worker_exit_success=was_successful, shutdown_requested=self._shutdown_requested, node_shutdown_triggers=self._node._node_shutdown_triggers)
shutdown = self._shutdown_requested
# automatically shutdown node whenever a worker ended (successfully, or with error)
#
if NODE_SHUTDOWN_ON_WORKER_EXIT in self._node._node_shutdown_triggers:
self.log.info("Node worker ended, and trigger '{trigger}' is active: will shutdown node ..", trigger=NODE_SHUTDOWN_ON_WORKER_EXIT)
term_print('CROSSBAR:NODE_SHUTDOWN_ON_WORKER_EXIT')
shutdown = True
# automatically shutdown node when worker ended with error
#
elif not was_successful and NODE_SHUTDOWN_ON_WORKER_EXIT_WITH_ERROR in self._node._node_shutdown_triggers:
self.log.info("Node worker ended with error, and trigger '{trigger}' is active: will shutdown node ..", trigger=NODE_SHUTDOWN_ON_WORKER_EXIT_WITH_ERROR)
term_print('CROSSBAR:NODE_SHUTDOWN_ON_WORKER_EXIT_WITH_ERROR')
shutdown = True
# automatically shutdown node when no more workers are left
#
elif len(self._workers) == 0 and NODE_SHUTDOWN_ON_LAST_WORKER_EXIT in self._node._node_shutdown_triggers:
self.log.info("No more node workers running, and trigger '{trigger}' is active: will shutdown node ..", trigger=NODE_SHUTDOWN_ON_LAST_WORKER_EXIT)
term_print('CROSSBAR:NODE_SHUTDOWN_ON_LAST_WORKER_EXIT')
shutdown = True
# initiate shutdown (but only if we are not already shutting down)
#
if shutdown:
self.shutdown()
else:
self.log.info('Node will continue to run!')
def before_reactor_started():
term_print('CROSSBAR[{}]:REACTOR_STARTING'.format(options.worker))
def on_startup_success(_shutdown_complete):
term_print('CROSSBAR:NODE_STARTED')
shutdown_complete = _shutdown_complete['shutdown_complete']
# .. exits, signaling exit status _inside_ the result returned
def on_shutdown_success(shutdown_info):
exit_info['was_clean'] = shutdown_info['was_clean']
log.info('on_shutdown_success: was_clean={was_clean}', shutdown_info['was_clean'])
# should not arrive here:
def on_shutdown_error(err):
exit_info['was_clean'] = False
log.error("on_shutdown_error: {tb}", tb=failure_format_traceback(err))
shutdown_complete.addCallbacks(on_shutdown_success, on_shutdown_error)