Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def subscribe(self, subscription_or_topic, qos=0, no_local=False, retain_as_published=False,
              retain_handling_options=0, **kwargs):
    """Subscribe through the underlying connection.

    ``subscription_or_topic`` may be a ``Subscription``, a tuple/list, or a
    topic string. A ``Subscription`` or a tuple/list is forwarded unchanged.
    A string is wrapped in a new ``Subscription`` built from ``qos``,
    ``no_local``, ``retain_as_published`` and ``retain_handling_options``;
    those four arguments are ignored for the other input kinds.

    Any extra keyword arguments are passed on to
    ``self._connection.subscribe``, whose return value is returned as-is.

    Raises:
        ValueError: if ``subscription_or_topic`` is none of the accepted types.
    """
    if isinstance(subscription_or_topic, (Subscription, tuple, list)):
        # Ready-made subscription object(s): hand over without modification.
        request = subscription_or_topic
    elif isinstance(subscription_or_topic, str):
        # Bare topic name: build the Subscription from the option arguments.
        request = Subscription(
            subscription_or_topic,
            qos=qos,
            no_local=no_local,
            retain_as_published=retain_as_published,
            retain_handling_options=retain_handling_options,
        )
    else:
        raise ValueError('Bad subscription: must be string or Subscription or list of Subscriptions')

    return self._connection.subscribe(request, **kwargs)
def subscribe(self, subscription_or_topic, qos=0, no_local=False, retain_as_published=False,
              retain_handling_options=0, **kwargs):
    """Issue a subscribe request via ``self._connection``.

    Accepted forms of ``subscription_or_topic``:

    * a ``Subscription`` instance - passed through untouched;
    * a tuple or list - passed through untouched;
    * a ``str`` topic - converted to a ``Subscription`` using the ``qos``,
      ``no_local``, ``retain_as_published`` and ``retain_handling_options``
      arguments (these are only consulted in this case).

    Remaining keyword arguments are forwarded to
    ``self._connection.subscribe`` and its result is returned.

    Raises:
        ValueError: for any other type of ``subscription_or_topic``.
    """
    target = subscription_or_topic

    # Pass-through types are checked first, so they take precedence over
    # the plain-string case below.
    forwarded_as_is = isinstance(target, Subscription) or isinstance(target, (tuple, list))

    if not forwarded_as_is:
        if not isinstance(target, str):
            raise ValueError('Bad subscription: must be string or Subscription or list of Subscriptions')
        options = {
            'qos': qos,
            'no_local': no_local,
            'retain_as_published': retain_as_published,
            'retain_handling_options': retain_handling_options,
        }
        target = Subscription(target, **options)

    return self._connection.subscribe(target, **kwargs)