Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, config, templates):
"""
:param config: Crossbar transport configuration.
:type config: dict
"""
options = config.get('options', {})
server = "Crossbar/{}".format(crossbar.__version__)
externalPort = options.get('external_port', None)
WebSocketServerFactory.__init__(self,
url=config.get('url', None),
server=server,
externalPort=externalPort)
# transport configuration
self._config = config
# Jinja2 templates for 404 etc
self._templates = templates
# set WebSocket options
set_websocket_options(self, options)
def sendServerStatus(self, redirectUrl=None, redirectAfter=0):
"""
Used to send out server status/version upon receiving a HTTP/GET without
upgrade to WebSocket header (and option serverStatus is True).
"""
try:
page = self.factory._templates.get_template('cb_ws_testee_status.html')
self.sendHtml(page.render(redirectUrl=redirectUrl,
redirectAfter=redirectAfter,
cbVersion=crossbar.__version__,
wsUri=self.factory.url))
except Exception as e:
self.log.warn("Error rendering WebSocket status page template: {e}", e=e)
# check and load the node configuration
#
try:
config_source, config_path = node.load_config(options.config)
except InvalidConfigException as e:
log.failure()
log.error("Invalid node configuration")
log.error("{e!s}", e=e)
sys.exit(1)
except:
raise
else:
config_source = node.CONFIG_SOURCE_TO_STR.get(config_source, None)
log.info('Node configuration loaded [config_source={config_source}, config_path={config_path}]',
config_source=hl(config_source, bold=True, color='green'), config_path=hlid(config_path))
# if vmprof global profiling is enabled via command line option, this will carry
# the file where vmprof writes its profile data
if _HAS_VMPROF:
_vm_prof = {
# need to put this into a dict, since FDs are ints, and python closures can't
# write to this otherwise
'outfd': None
}
# https://twistedmatrix.com/documents/current/api/twisted.internet.interfaces.IReactorCore.html
# Each "system event" in Twisted, such as 'startup', 'shutdown', and 'persist', has 3 phases:
# 'before', 'during', and 'after' (in that order, of course). These events will be fired
# internally by the Reactor.
def before_reactor_started():
# https://docs.python.org/2/library/os.html#os.EX_UNAVAILABLE
# https://www.freebsd.org/cgi/man.cgi?query=sysexits&sektion=3
_EXIT_ERROR = getattr(os, 'EX_UNAVAILABLE', 1)
# check if there is a Crossbar.io instance currently running from
# the Crossbar.io node directory at all
pid_data = _check_is_running(options.cbdir)
# optional current state to assert
_assert = options.__dict__['assert']
if pid_data is None:
if _assert == 'running':
log.error('Assert status RUNNING failed: status is {}'.format(hl('STOPPED', color='red', bold=True)))
sys.exit(_EXIT_ERROR)
elif _assert == 'stopped':
log.info('Assert status STOPPED succeeded: status is {}'.format(hl('STOPPED', color='green', bold=True)))
sys.exit(0)
else:
log.info('Status is {}'.format(hl('STOPPED', color='white', bold=True)))
sys.exit(0)
else:
if _assert == 'running':
log.info('Assert status RUNNING succeeded: status is {}'.format(hl('RUNNING', color='green', bold=True)))
sys.exit(0)
elif _assert == 'stopped':
log.error('Assert status STOPPED failed: status is {}'.format(hl('RUNNING', color='red', bold=True)))
sys.exit(_EXIT_ERROR)
else:
log.info('Status is {}'.format(hl('RUNNING', color='white', bold=True)))
sys.exit(0)
transport_factory.protocol = WorkerServerProtocol
transport_factory.setProtocolOptions(failByDrop=False)
# create a protocol instance and wire up to stdio
#
from twisted.python.runtime import platform as _platform
from twisted.internet import stdio
proto = transport_factory.buildProtocol(None)
if _platform.isWindows():
stdio.StandardIO(proto)
else:
stdio.StandardIO(proto, stdout=3)
# now start reactor loop
#
log.info(hl('Entering event reactor ...', color='green', bold=True))
reactor.run()
except Exception as e:
log.info("Unhandled exception: {e}", e=e)
if reactor.running:
reactor.addSystemEventTrigger('after', 'shutdown', os._exit, 1)
reactor.stop()
else:
sys.exit(1)
#
log.info(hl('Entering event reactor ...', color='green', bold=True))
term_print('CROSSBAR:REACTOR_ENTERED')
reactor.run()
# once the reactor has finally stopped, we get here, and at that point,
# exit_info['was_clean'] MUST have been set before - either to True or to False
# (otherwise we are missing a code path to handle in above)
# exit the program with exit code depending on whether the node has been cleanly shut down
if exit_info['was_clean'] is True:
term_print('CROSSBAR:EXIT_WITH_SUCCESS')
sys.exit(0)
elif exit_info['was_clean'] is False:
term_print('CROSSBAR:EXIT_WITH_ERROR')
sys.exit(1)
else:
term_print('CROSSBAR:EXIT_WITH_INTERNAL_ERROR')
sys.exit(1)
if hasattr(options, 'logdir'):
if options.logdir:
options.logdir = os.path.abspath(os.path.join(options.cbdir, options.logdir))
if not os.path.isdir(options.logdir):
try:
os.mkdir(options.logdir)
except Exception as e:
print("Could not create log directory: {}".format(e))
sys.exit(1)
else:
print("Auto-created log directory {}".format(options.logdir))
# Start the logger
#
_start_logging(options, reactor)
term_print('CROSSBAR:LOGGING_STARTED')
# run the subcommand selected
#
try:
options.func(options, reactor=reactor, personality=personality)
except SystemExit as e:
# SystemExit(0) is okay! Anything other than that is bad and should be
# re-raised.
if e.args[0] != 0:
raise
def _read_release_key():
release_pubkey_file = 'crossbar-{}.pub'.format('-'.join(crossbar.__version__.split('.')[0:2]))
release_pubkey_path = os.path.join(pkg_resources.resource_filename('crossbar', 'common/keys'), release_pubkey_file)
release_pubkey_hex = binascii.b2a_hex(cryptosign._read_signify_ed25519_pubkey(release_pubkey_path)).decode('ascii')
with open(release_pubkey_path) as f:
release_pubkey_base64 = f.read().splitlines()[1]
release_pubkey_qrcode = cryptosign._qrcode_from_signify_ed25519_pubkey(release_pubkey_path)
release_pubkey = {
'base64': release_pubkey_base64,
'hex': release_pubkey_hex,
'qrcode': release_pubkey_qrcode
}
return release_pubkey
def _render_request(self, request):
"""
Receives an HTTP/POST|PUT request, and then calls the Publisher/Caller
processor.
"""
# read HTTP/POST|PUT body
body = request.content.read()
args = {native_string(x): y[0] for x, y in request.args.items()}
headers = request.requestHeaders
# check content type + charset encoding
#
content_type_header = headers.getRawHeaders(b"content-type", [])
if len(content_type_header) > 0:
content_type_elements = [
x.strip().lower()
for x in content_type_header[0].split(b";")
]
else:
content_type_elements = []
if self.decode_as_json:
# if the client sent a content type, it MUST be one of _ALLOWED_CONTENT_TYPES
given=content_type_elements[0],
log_category="AR452"
)
encoding_parts = {}
if len(content_type_elements) > 1:
try:
for item in content_type_elements:
if b"=" not in item:
# Don't bother looking at things "like application/json"
continue
# Parsing things like:
# charset=utf-8
_ = native_string(item).split("=")
assert len(_) == 2
# We don't want duplicates
key = _[0].strip().lower()
assert key not in encoding_parts
encoding_parts[key] = _[1].strip().lower()
except:
return self._deny_request(request, 400, log_category="AR450")
charset_encoding = encoding_parts.get("charset", "utf-8")
if charset_encoding not in ["utf-8", 'utf8']:
return self._deny_request(
request, 400,
log_category="AR450")