Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.running = False
self.client = client
self.source = source
self.offset = offset
self.prefetch = prefetch
self.epoch = epoch
self.keep_alive = keep_alive
self.auto_reconnect = auto_reconnect
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.reconnect_backoff = 1
self.properties = None
self.redirected = None
self.error = None
partition = self.source.split('/')[-1]
self.name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition)
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
if epoch:
self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))}
self._handler = ReceiveClient(
source,
auth=self.client.get_auth(),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())
self.running = False
self.client = client
self.source = source
self.offset = offset
self.prefetch = prefetch
self.epoch = epoch
self.keep_alive = keep_alive
self.auto_reconnect = auto_reconnect
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.reconnect_backoff = 1
self.redirected = None
self.error = None
self.properties = None
partition = self.source.split('/')[-1]
self.name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition)
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
if epoch:
self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))}
self._handler = ReceiveClientAsync(
source,
auth=self.client.get_auth(),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties(),
loop=self.loop)
async def _reconnect_async(self): # pylint: disable=too-many-statements
# pylint: disable=protected-access
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
await self._handler.close_async()
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
self._handler = ReceiveClientAsync(
source,
auth=self.client.get_auth(**alt_creds),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties(),
loop=self.loop)
try:
await self._handler.open_async()
def _create_handler(self, auth):
# type: (JWTTokenAuth) -> None
source = Source(self._source)
if self._offset is not None:
source.set_filter(
event_position_selector(self._offset, self._offset_inclusive)
)
desired_capabilities = None
if self._track_last_enqueued_event_properties:
symbol_array = [types.AMQPSymbol(RECEIVER_RUNTIME_METRIC_SYMBOL)]
desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array))
properties = create_properties(
self._client._config.user_agent # pylint:disable=protected-access
)
self._handler = ReceiveClient(
source,
auth=auth,
debug=self._client._config.network_tracing, # pylint:disable=protected-access
def _create_handler(self, auth: "JWTTokenAsync") -> None:
source = Source(self._source)
if self._offset is not None:
source.set_filter(
event_position_selector(self._offset, self._offset_inclusive)
)
desired_capabilities = None
if self._track_last_enqueued_event_properties:
symbol_array = [types.AMQPSymbol(RECEIVER_RUNTIME_METRIC_SYMBOL)]
desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array))
properties = create_properties(
self._client._config.user_agent # pylint:disable=protected-access
)
self._handler = ReceiveClientAsync(
source,
auth=auth,
debug=self._client._config.network_tracing, # pylint:disable=protected-access
async def _reconnect_async(self): # pylint: disable=too-many-statements
# pylint: disable=protected-access
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
await self._handler.close_async()
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
self._handler = ReceiveClientAsync(
source,
auth=self.client.get_auth(**alt_creds),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties(),
loop=self.loop)
try:
await self._handler.open_async()
def _reconnect(self): # pylint: disable=too-many-statements
# pylint: disable=protected-access
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
self._handler.close()
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
self._handler = ReceiveClient(
source,
auth=self.client.get_auth(**alt_creds),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())
try:
self._handler.open()
while not self._handler.client_ready():
def open(self):
"""
Open the Receiver using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.
:param connection: The underlying client shared connection.
:type: connection: ~uamqp.connection.Connection

NOTE(review): this signature takes only ``self``; the ``connection``
entry above looks stale -- confirm and remove if so.
"""
# pylint: disable=protected-access
# Flag the receiver as running before any handler work is done.
self.running = True
# A redirect was recorded: adopt the redirected address as the new source.
# NOTE(review): indentation was lost in this copy; the statements below
# presumably all belong to this `if` branch -- confirm against the original.
if self.redirected:
self.source = self.redirected.address
source = Source(self.source)
# Re-apply the offset selector as the source filter when an offset is set.
if self.offset is not None:
source.set_filter(self.offset.selector())
# Credentials read from the client's auth config under the
# "iot_username" / "iot_password" keys; .get() yields None when absent.
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
# Replace the handler with a new ReceiveClient built from this receiver's
# stored settings and the alternate credentials passed to get_auth().
self._handler = ReceiveClient(
source,
auth=self.client.get_auth(**alt_creds),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())