Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# check and load the node configuration
#
try:
config_source, config_path = node.load_config(options.config)
except InvalidConfigException as e:
log.failure()
log.error("Invalid node configuration")
log.error("{e!s}", e=e)
sys.exit(1)
except:
raise
else:
config_source = node.CONFIG_SOURCE_TO_STR.get(config_source, None)
log.info('Node configuration loaded [config_source={config_source}, config_path={config_path}]',
config_source=hl(config_source, bold=True, color='green'), config_path=hlid(config_path))
# if vmprof global profiling is enabled via command line option, this will carry
# the file where vmprof writes its profile data
if _HAS_VMPROF:
_vm_prof = {
# need to put this into a dict, since FDs are ints, and python closures can't
# write to this otherwise
'outfd': None
}
# https://twistedmatrix.com/documents/current/api/twisted.internet.interfaces.IReactorCore.html
# Each "system event" in Twisted, such as 'startup', 'shutdown', and 'persist', has 3 phases:
# 'before', 'during', and 'after' (in that order, of course). These events will be fired
# internally by the Reactor.
def before_reactor_started():
# https://docs.python.org/2/library/os.html#os.EX_UNAVAILABLE
# https://www.freebsd.org/cgi/man.cgi?query=sysexits&sektion=3
_EXIT_ERROR = getattr(os, 'EX_UNAVAILABLE', 1)
# check if there is a Crossbar.io instance currently running from
# the Crossbar.io node directory at all
pid_data = _check_is_running(options.cbdir)
# optional current state to assert
_assert = options.__dict__['assert']
if pid_data is None:
if _assert == 'running':
log.error('Assert status RUNNING failed: status is {}'.format(hl('STOPPED', color='red', bold=True)))
sys.exit(_EXIT_ERROR)
elif _assert == 'stopped':
log.info('Assert status STOPPED succeeded: status is {}'.format(hl('STOPPED', color='green', bold=True)))
sys.exit(0)
else:
log.info('Status is {}'.format(hl('STOPPED', color='white', bold=True)))
sys.exit(0)
else:
if _assert == 'running':
log.info('Assert status RUNNING succeeded: status is {}'.format(hl('RUNNING', color='green', bold=True)))
sys.exit(0)
elif _assert == 'stopped':
log.error('Assert status STOPPED failed: status is {}'.format(hl('RUNNING', color='red', bold=True)))
sys.exit(_EXIT_ERROR)
else:
log.info('Status is {}'.format(hl('RUNNING', color='white', bold=True)))
sys.exit(0)
transport_factory.protocol = WorkerServerProtocol
transport_factory.setProtocolOptions(failByDrop=False)
# create a protocol instance and wire up to stdio
#
from twisted.python.runtime import platform as _platform
from twisted.internet import stdio
proto = transport_factory.buildProtocol(None)
if _platform.isWindows():
stdio.StandardIO(proto)
else:
stdio.StandardIO(proto, stdout=3)
# now start reactor loop
#
log.info(hl('Entering event reactor ...', color='green', bold=True))
reactor.run()
except Exception as e:
log.info("Unhandled exception: {e}", e=e)
if reactor.running:
reactor.addSystemEventTrigger('after', 'shutdown', os._exit, 1)
reactor.stop()
else:
sys.exit(1)
self._service_session.publish('wamp.session.on_stats', session_info_short, stats)
self._transport._serializer.RATED_MESSAGE_SIZE = rated_message_size
self._transport._serializer.set_stats_autoreset(trigger_after_rated_messages,
trigger_after_duration,
on_stats)
self._stats_enabled = True
self.log.info('WAMP session statistics {mode} (rated_message_size={rated_message_size}, trigger_after_rated_messages={trigger_after_rated_messages}, trigger_after_duration={trigger_after_duration}, trigger_on_join={trigger_on_join}, trigger_on_leave={trigger_on_leave})',
trigger_after_rated_messages=trigger_after_rated_messages,
trigger_after_duration=trigger_after_duration,
trigger_on_join=trigger_on_join,
trigger_on_leave=trigger_on_leave,
rated_message_size=rated_message_size,
mode=hl('ENABLED'))
else:
self._stats_enabled = False
self.log.debug('WAMP session statistics {mode}', mode=hl('DISABLED'))
parallel_worker_start = controller.get('options', {}).get('enable_parallel_worker_start', False)
self.log.info('{bootmsg} {method}',
bootmsg=hl('Booting node from local configuration [parallel_worker_start={}] ..'.format(parallel_worker_start),
color='green', bold=True),
method=hltype(Node.boot_from_config))
# start Manhole in node controller
if 'manhole' in controller:
yield self._controller.call('crossbar.start_manhole', controller['manhole'], options=CallOptions())
self.log.debug("controller: manhole started")
# startup all workers
workers = config.get('workers', [])
if len(workers):
self.log.info(hl('Will start {} worker{} ..'.format(len(workers), 's' if len(workers) > 1 else ''), color='green', bold=True))
else:
self.log.info(hl('No workers configured, nothing to do', color='green', bold=True))
dl = []
for worker in workers:
# worker ID
if 'id' in worker:
worker_id = worker['id']
else:
worker_id = 'worker{:03d}'.format(self._worker_no)
worker['id'] = worker_id
self._worker_no += 1
# worker type: either a native worker ('router', 'container', ..), or a guest worker ('guest')
worker_type = worker['type']
node_options = personality.NodeOptions(debug_lifecycle=options.debug_lifecycle,
debug_programflow=options.debug_programflow,
enable_vmprof=enable_vmprof)
node = personality.Node(personality,
options.cbdir,
reactor=reactor,
options=node_options)
# print the banner, personality and node directory
#
for line in personality.BANNER.splitlines():
log.info(hl(line, color='yellow', bold=True))
print()
log.info('{note} {func}', note=hl('Booting {} node ..'.format(personality.NAME), color='red', bold=True),
func=hltype(_run_command_start))
log.debug('Running on realm="{realm}" from cbdir="{cbdir}"', realm=hlid(node.realm), cbdir=hlid(options.cbdir))
# possibly generate new node key
#
node.load_keys(options.cbdir)
# check and load the node configuration
#
try:
config_source, config_path = node.load_config(options.config)
except InvalidConfigException as e:
log.failure()
log.error("Invalid node configuration")
log.error("{e!s}", e=e)
def _print_usage(prog, personality):
print(hl(personality.BANNER, color='yellow', bold=True))
print('Type "{} --help" to get help, or "{} --help" to get help on a specific command.'.format(prog, prog))
print('Type "{} legal" to read legal notices, terms of use and license and privacy information.'.format(prog))
print('Type "{} version" to print detailed version information.'.format(prog))
trigger_after_duration,
on_stats)
self._stats_enabled = True
self.log.info('WAMP session statistics {mode} (rated_message_size={rated_message_size}, trigger_after_rated_messages={trigger_after_rated_messages}, trigger_after_duration={trigger_after_duration}, trigger_on_join={trigger_on_join}, trigger_on_leave={trigger_on_leave})',
trigger_after_rated_messages=trigger_after_rated_messages,
trigger_after_duration=trigger_after_duration,
trigger_on_join=trigger_on_join,
trigger_on_leave=trigger_on_leave,
rated_message_size=rated_message_size,
mode=hl('ENABLED'))
else:
self._stats_enabled = False
self.log.debug('WAMP session statistics {mode}', mode=hl('DISABLED'))