Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def registration_match(self, procedure, details=None):
"""
Given a procedure URI, return the registration best matching the procedure.
This essentially models what a dealer does for dispatching an incoming call.
:param procedure: The procedure to match.
:type procedure: str
:returns: The best matching registration or ``None``.
:rtype: obj or None
"""
registration = self._router._dealer._registration_map.best_matching_observation(procedure)
if registration and not is_protected_uri(registration.uri, details):
return registration.id
else:
return None
def registration_list_callees(self, registration_id, details=None):
"""
Retrieve list of callees (WAMP session IDs) registered on (attached to) a registration.
:param registration_id: The ID of the registration to get callees for.
:type registration_id: int
:returns: A list of WAMP session IDs of callees currently attached to the registration.
:rtype: list
"""
registration = self._router._dealer._registration_map.get_observation_by_id(registration_id)
if registration:
if is_protected_uri(registration.uri, details):
raise ApplicationError(
ApplicationError.NOT_AUTHORIZED,
message='not authorized to list callees for protected URI "{}"'.format(registration.uri),
)
session_ids = []
for callee in registration.observers:
session_ids.append(callee._session_id)
return session_ids
else:
raise ApplicationError(
ApplicationError.NO_SUCH_REGISTRATION,
'no registration with ID {} exists on this dealer'.format(registration_id),
)
:returns: List of events.
:rtype: list
"""
self.log.debug('subscription_get_events({subscription_id}, {limit})', subscription_id=subscription_id, limit=limit)
if not self._router._broker._event_store:
raise ApplicationError(
'wamp.error.history_unavailable',
message='event history not available or enabled',
)
subscription = self._router._broker._subscription_map.get_observation_by_id(subscription_id)
if subscription:
if is_protected_uri(subscription.uri, details):
raise ApplicationError(
ApplicationError.NOT_AUTHORIZED,
message='not authorized to retrieve event history for protected URI "{}"'.format(subscription.uri),
)
events = self._router._broker._event_store.get_events(subscription_id, limit)
if events is None:
# a return value of None in above signals that event history really
# is not available/enabled (which is different from an empty history!)
raise ApplicationError(
'wamp.error.history_unavailable',
message='event history for the given subscription is not available or enabled',
)
else:
return events
else:
def subscription_count_subscribers(self, subscription_id, details=None):
"""
Retrieve number of subscribers subscribed on (attached to) a subscription.
:param subscription_id: The ID of the subscription to get the number subscribers for.
:type subscription_id: int
:returns: Number of subscribers currently attached to the subscription.
:rtype: int
"""
subscription = self._router._broker._subscription_map.get_observation_by_id(subscription_id)
if subscription:
if is_protected_uri(subscription.uri, details):
raise ApplicationError(
ApplicationError.NOT_AUTHORIZED,
message='not authorized to count subscribers for protected URI "{}"'.format(subscription.uri),
)
return len(subscription.observers)
else:
raise ApplicationError(
ApplicationError.NO_SUCH_SUBSCRIPTION,
'no subscription with ID {} exists on this broker'.format(subscription_id),
)
_regs = s2r[session]
regs = {
'exact': [reg.id for reg in _regs if reg.match == 'exact'],
'prefix': [reg.id for reg in _regs if reg.match == 'prefix'],
'wildcard': [reg.id for reg in _regs if reg.match == 'wildcard'],
}
return regs
else:
registration_map = self._router._dealer._registration_map
registrations_exact = []
for registration in registration_map._observations_exact.values():
if not is_protected_uri(registration.uri, details):
registrations_exact.append(registration.id)
registrations_prefix = []
for registration in registration_map._observations_prefix.values():
if not is_protected_uri(registration.uri, details):
registrations_prefix.append(registration.id)
registrations_wildcard = []
for registration in registration_map._observations_wildcard.values():
if not is_protected_uri(registration.uri, details):
registrations_wildcard.append(registration.id)
regs = {
'exact': registrations_exact,
'prefix': registrations_prefix,
'wildcard': registrations_wildcard,
Given a topic URI, returns all subscriptions matching the topic.
This essentially models what a broker does for dispatching an incoming publication.
:param topic: The topic to match.
:type topic: str
:returns: All matching subscriptions or ``None``.
:rtype: obj or None
"""
subscriptions = self._router._broker._subscription_map.match_observations(topic)
if subscriptions:
subscription_ids = []
for subscription in subscriptions:
if not is_protected_uri(subscription.uri, details):
subscription_ids.append(subscription.id)
if subscription_ids:
return subscription_ids
else:
return None
else:
return None
This essentially models what a dealer does when registering for a procedure.
:param procedure: The procedure to lookup the registration for.
:type procedure: str
:param options: Same options as when registering a procedure.
:type options: dict or None
:returns: The ID of the registration managing the procedure or ``None``.
:rtype: int or None
"""
options = options or {}
match = options.get('match', 'exact')
registration = self._router._dealer._registration_map.get_observation(procedure, match)
if registration and not is_protected_uri(registration.uri, details):
return registration.id
else:
return None
This essentially models what a broker does when subscribing for a topic.
:param topic: The topic to lookup the subscription for.
:type topic: str
:param options: Same options as when subscribing to a topic.
:type options: dict or None
:returns: The ID of the subscription managing the topic or ``None``.
:rtype: int or None
"""
options = options or {}
match = options.get('match', 'exact')
subscription = self._router._broker._subscription_map.get_observation(topic, match)
if subscription and not is_protected_uri(subscription.uri, details):
return subscription.id
else:
return None
def registration_count_callees(self, registration_id, details=None):
"""
Retrieve number of callees registered on (attached to) a registration.
:param registration_id: The ID of the registration to get the number of callees for.
:type registration_id: int
:returns: Number of callees currently attached to the registration.
:rtype: int
"""
registration = self._router._dealer._registration_map.get_observation_by_id(registration_id)
if registration:
if is_protected_uri(registration.uri, details):
raise ApplicationError(
ApplicationError.NOT_AUTHORIZED,
message='not authorized to count callees for protected URI "{}"'.format(registration.uri),
)
return len(registration.observers)
else:
raise ApplicationError(
ApplicationError.NO_SUCH_REGISTRATION,
'no registration with ID {} exists on this dealer'.format(registration_id),
)
:param subscription_id: The ID of the subscription to remove the subscriber from.
:type subscription_id: int
:param subscriber_id: The WAMP session ID of the subscriber to remove.
:type subscriber_id: int
"""
subscriber = self._router._session_id_to_session.get(subscriber_id, None)
if not subscriber:
raise ApplicationError(
ApplicationError.NO_SUCH_SESSION,
message='no session with ID {} exists on this router'.format(subscriber_id),
)
subscription = self._router._broker._subscription_map.get_observation_by_id(subscription_id)
if subscription:
if is_protected_uri(subscription.uri, details):
raise ApplicationError(
ApplicationError.NOT_AUTHORIZED,
message='not authorized to remove subscriber for protected URI "{}"'.format(subscription.uri),
)
if subscriber not in subscription.observers:
raise ApplicationError(
ApplicationError.NO_SUCH_SUBSCRIPTION,
'session {} is not subscribed on subscription {} on this broker'.format(subscriber_id, subscription_id),
)
self._router._broker.removeSubscriber(subscription, subscriber, reason=reason)
else:
raise ApplicationError(
ApplicationError.NO_SUCH_SUBSCRIPTION,
'no subscription with ID {} exists on this broker'.format(subscription_id),