Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
##
if args.debug:
log.startLogging(sys.stdout)
# we use an Autobahn utility to import the "best" available Twisted reactor
##
from autobahn.twisted.choosereactor import install_reactor
reactor = install_reactor()
if args.debug:
print("Running on reactor {}".format(reactor))
# create a WAMP application session factory
##
from autobahn.twisted.wamp import ApplicationSessionFactory
from autobahn.wamp import types
session_factory = ApplicationSessionFactory(types.ComponentConfig(realm=args.realm))
# dynamically load the application component ..
##
import importlib
c = args.component.split('.')
mod, klass = '.'.join(c[:-1]), c[-1]
app = importlib.import_module(mod)
# .. and set the session class on the factory
##
session_factory.session = getattr(app, klass)
if args.transport == "websocket":
# create a WAMP-over-WebSocket transport client factory
##
parser.add_argument("--port", type=int, default=8080,
help='TCP port to connect to.')
parser.add_argument("--transport", choices=['websocket', 'rawsocket-json', 'rawsocket-msgpack'], default="websocket",
help='WAMP transport type')
parser.add_argument("--url", type=str, default=None,
help='The WebSocket URL to connect to, e.g. ws://127.0.0.1:8080/ws.')
args = parser.parse_args()
# create a WAMP application session factory
##
from autobahn.asyncio.wamp import ApplicationSessionFactory
from autobahn.wamp import types
session_factory = ApplicationSessionFactory(types.ComponentConfig(realm=args.realm))
# dynamically load the application component ..
##
import importlib
c = args.component.split('.')
mod, klass = '.'.join(c[:-1]), c[-1]
app = importlib.import_module(mod)
# .. and set the session class on the factory
##
session_factory.session = getattr(app, klass)
if args.transport == "websocket":
# create a WAMP-over-WebSocket transport client factory
##
try:
yield proxy_transport.start(False)
except Exception as err:
_emsg = "Cannot listen on transport endpoint: {log_failure}"
self.log.error(_emsg, log_failure=err)
topic = '{}.on_proxy_transport_stopped'.format(self._uri_prefix)
self.publish(topic, event, options=types.PublishOptions(exclude=caller))
raise ApplicationError("crossbar.error.cannot_listen", _emsg.format(log_failure=err))
self.transports[transport_id] = proxy_transport
self.log.info('Proxy transport "{transport_id}" started and listening', transport_id=hlid(transport_id))
topic = '{}.on_proxy_transport_started'.format(self._uri_prefix)
self.publish(topic, event, options=types.PublishOptions(exclude=caller))
returnValue(proxy_transport.marshal())
def _on_backend_joined(session, details):
msg = message.Welcome(self._session_id,
ProxyFrontendSession.ROLES,
realm=details.realm,
authid=details.authid,
authrole=details.authrole,
authmethod=auth_result.authmethod,
authprovider=auth_result.authprovider,
authextra=dict(details.authextra or {}, **self._custom_authextra))
self._backend_session = session
self.transport.send(msg)
self.log.info('Proxy frontend session WELCOME: session_id={session_id}, session={session}, msg={msg}',
session_id=hlid(self._session_id), session=self, msg=msg)
session.on('join', _on_backend_joined)
elif isinstance(auth_result, types.Deny):
self.transport.send(message.Abort(auth_result.reason, message=auth_result.message))
else:
# should not arrive here: logic error
self.transport.send(message.Abort(ApplicationError.AUTHENTICATION_FAILED,
message='internal error: unexpected authenticator return type {}'.format(type(auth_result))))
else:
# should not arrive here: logic error
self.transport.send(message.Abort(ApplicationError.AUTHENTICATION_FAILED,
message='internal error: unexpected pending authentication'))
else:
# should not arrive here: client misbehaving!
self.transport.send(message.Abort(ApplicationError.AUTHENTICATION_FAILED,
message='no pending authentication'))
def on_authenticate_ok(principal):
self._salt = binascii.a2b_hex(principal['salt']) # error if no salt per-user
self._iterations = principal['iterations']
self._memory = principal['memory']
self._kdf = principal['kdf']
self._stored_key = binascii.a2b_hex(principal['stored-key'])
# do we actually need the server-key? can we compute it ourselves?
self._server_key = binascii.a2b_hex(principal['server-key'])
error = self._assign_principal(principal)
if error:
return error
# XXX TODO this needs to include (optional) channel-binding
extra = self._compute_challenge()
return types.Challenge(self._authmethod, extra)
def _assign_principal(self, principal):
if isinstance(principal, str):
# FIXME: more strict authrole checking
pass
elif isinstance(principal, dict):
# FIXME: check principal
pass
else:
error = ApplicationError.AUTHENTICATION_FAILED
message = 'got invalid return type "{}" from dynamic authenticator'.format(type(principal))
return types.Deny(error, message)
# backwards compatibility: dynamic authenticator
# was expected to return a role directly
if isinstance(principal, str):
principal = {
'role': principal
}
# allow to override realm request, redirect realm or set default realm
if 'realm' in principal:
self._realm = principal['realm']
# allow overriding effectively assigned authid
if 'authid' in principal:
self._authid = principal['authid']
def hello(self, realm, details):
# remember the realm the client requested to join (if any)
self._realm = realm
# remember the authid the client wants to identify as (if any)
self._authid = details.authid
# use static principal database from configuration
if self._config['type'] == 'static':
self._authprovider = 'static'
client_cert = self._session_details['transport'].get('client_cert', None)
if not client_cert:
return types.Deny(message='client did not send a TLS client certificate')
client_cert_sha1 = client_cert['sha1']
if client_cert_sha1 in self._cert_sha1_to_principal:
principal = self._cert_sha1_to_principal[client_cert_sha1]
error = self._assign_principal(principal)
if error:
return error
return self._accept()
else:
return types.Deny(message='no principal with authid "{}" exists'.format(client_cert_sha1))
raise Exception("not implemented")
txaio.add_callbacks(
d, None,
lambda e: self._swallow_error(e, "While notifying 'ready'")
)
def error(e):
reply = message.Abort(
u"wamp.error.cannot_authenticate", u"Error calling onWelcome handler"
)
self._transport.send(reply)
return self._swallow_error(e, "While firing onWelcome")
txaio.add_callbacks(d, success, error)
elif isinstance(msg, message.Abort):
# fire callback and close the transport
details = types.CloseDetails(msg.reason, msg.message)
d = txaio.as_future(self.onLeave, details)
def success(arg):
# XXX also: handle async
d = self.fire('leave', self, details)
def return_arg(_):
return arg
def _error(e):
return self._swallow_error(e, "While firing 'leave' event")
txaio.add_callbacks(d, return_arg, _error)
return d
def _error(e):
return self._swallow_error(e, "While firing onLeave")
def assign(res):
"""
.. and the incoming backend connection from the proxy frontend is authenticated as the principal
the frontend proxy has _already_ authenticated the actual client (before even connecting and
authenticating to the backend here)
"""
if isinstance(res, types.Deny):
return res
principal = {}
principal['realm'] = details.authextra['proxy_realm']
principal['authid'] = details.authextra['proxy_authid']
principal['role'] = details.authextra['proxy_authrole']
principal['extra'] = details.authextra.get('proxy_authextra', None)
self._assign_principal(principal)
self.log.debug(
'{klass}.hello(realm={realm}, details={details}) -> principal={principal}',
klass=self.__class__.__name__,
realm=realm,
details=details,
principal=principal,
)
self._session_to_subscriptions = {}
## map: session_id -> session
## needed for exclude/eligible
self._session_id_to_session = {}
## map: topic -> (subscription, set(session))
## needed for PUBLISH and SUBSCRIBE
self._topic_to_sessions = {}
## map: subscription -> (topic, set(session))
## needed for UNSUBSCRIBE
self._subscription_to_sessions = {}
## check all topic URIs with strict rules
self._option_uri_strict = self._options.uri_check == types.RouterOptions.URI_CHECK_STRICT
## supported features from "WAMP Advanced Profile"
self._role_features = role.RoleBrokerFeatures(publisher_identification = True, subscriber_blackwhite_listing = True, publisher_exclusion = True)