Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self._logger.info("Subscription '{0}'. Pulling task threw the following exception : {1}".format(
self._options.subscription_name, ex))
if self.should_try_to_reconnect(ex):
time.sleep(self._options.time_to_wait_before_connection_retry.seconds)
if self.on_subscription_connection_retry:
self.on_subscription_connection_retry(ex)
else:
if sys.gettrace():
self._logger.info(
"Connection to subscription '{0}'. have been shut down because of an error : {1}".format(
self._options.subscription_name, ex))
raise
except Exception as e:
if e == ex:
raise
raise AggregateException(e, ex)
def raise_exceptions(self, error_list):
    """Raise a single aggregate error for a failed database-topology fetch.

    :param error_list: errors to attach to the aggregate; forwarded to
        ``exceptions.AggregateException`` unchanged (presumably one entry
        per node that was tried -- confirm against the caller).
    :raises exceptions.AggregateException: always.
    """
    message = "Failed to retrieve database topology from all known nodes"
    raise exceptions.AggregateException(message, error_list)
def should_try_to_reconnect(self, exception):
    """Decide whether the subscription should reconnect after ``exception``.

    The decision depends on the exception type:

    * ``SubscriptionDoesNotBelongToNodeException`` -- reconnect. When the
      exception names an ``appropriate_node``, that node is looked up in the
      request executor's ``topology_nodes`` and stored in
      ``self._redirect_node`` so the next attempt targets it.
    * ``SubscriptionChangeVectorUpdateConcurrencyException`` -- reconnect.
    * Any exception type in the non-recoverable tuple below -- call
      ``self.close()`` and do not reconnect.
    * Anything else -- reconnect, provided
      ``self.assert_last_connection_failure()`` does not raise.

    :param exception: the exception raised while pulling from the
        subscription.
    :return: ``True`` when the caller should retry the connection,
        ``False`` when this object has been closed instead.
    :raises AggregateException: when the exception names an
        ``appropriate_node`` whose cluster tag is not present in the local
        topology; wraps the original exception together with an
        ``InvalidOperationException``.
    """
    if isinstance(exception, SubscriptionDoesNotBelongToNodeException):
        # NOTE(review): presumably raises once failures have persisted for
        # too long -- confirm in assert_last_connection_failure.
        self.assert_last_connection_failure()
        request_executor = self._store.get_request_executor(self._database_name)
        if not exception.appropriate_node:
            # No redirect hint was supplied: retry without picking a node.
            return True
        node_to_redirect_to = Utils.first_or_default(
            request_executor.topology_nodes,
            lambda node: node.cluster_tag == exception.appropriate_node,
            None)
        if not node_to_redirect_to:
            raise AggregateException(exception, InvalidOperationException(
                "Could not redirect to {0}, "
                "because it was not found in local topology, even after retrying".format(
                    exception.appropriate_node)))
        self._redirect_node = node_to_redirect_to
        return True

    if isinstance(exception, SubscriptionChangeVectorUpdateConcurrencyException):
        return True

    if isinstance(exception, (
            SubscriptionInUseException, SubscriptionDoesNotExistException, SubscriptionClosedException,
            SubscriptionInvalidStateException, DatabaseDoesNotExistException, AuthorizationException,
            AllTopologyNodesDownException, SubscriberErrorException, ValueError, NotImplementedError,
            AttributeError, ijson.backend.UnexpectedSymbol)):
        # Treated as non-recoverable: shut down rather than retry.
        self.close()
        return False

    self.assert_last_connection_failure()
    # Return an explicit True. Falling off the end would return None, which
    # the caller's truthiness check treats as "do not reconnect", making the
    # assertion above pointless.
    return True
def raise_exceptions(self, error_list):
    """Raise a single aggregate error for a failed cluster-topology fetch.

    :param error_list: errors to attach to the aggregate; forwarded to
        ``exceptions.AggregateException`` unchanged (presumably one entry
        per node that was tried -- confirm against the caller).
    :raises exceptions.AggregateException: always.
    """
    message = "Failed to retrieve cluster topology from all known nodes"
    raise exceptions.AggregateException(message, error_list)