Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.epoch = epoch
self.keep_alive = keep_alive
self.auto_reconnect = auto_reconnect
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.reconnect_backoff = 1
self.properties = None
self.redirected = None
self.error = None
partition = self.source.split('/')[-1]
self.name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition)
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
if epoch:
self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))}
self._handler = ReceiveClient(
source,
auth=self.client.get_auth(),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())
def _create_handler(self, auth):
    # type: (JWTTokenAuth) -> None
    """Construct the ``ReceiveClient`` for this consumer and store it on ``self._handler``.

    The receive link is opened against ``self._source``. When ``self._offset``
    is set, a filter built by ``event_position_selector`` is applied to the
    source. When ``self._track_last_enqueued_event_properties`` is truthy, the
    ``RECEIVER_RUNTIME_METRIC_SYMBOL`` capability is requested on the link.

    :param auth: Authentication object handed straight to ``ReceiveClient``.
    """
    link_source = Source(self._source)
    if self._offset is not None:
        position_filter = event_position_selector(self._offset, self._offset_inclusive)
        link_source.set_filter(position_filter)

    if self._track_last_enqueued_event_properties:
        # Ask for the runtime-metric capability as an AMQP array of symbols.
        metric_symbols = [types.AMQPSymbol(RECEIVER_RUNTIME_METRIC_SYMBOL)]
        requested_capabilities = utils.data_factory(types.AMQPArray(metric_symbols))
    else:
        requested_capabilities = None

    connection_properties = create_properties(
        self._client._config.user_agent  # pylint:disable=protected-access
    )

    # Keyword options are gathered first so the constructor call stays short;
    # they are evaluated in the same order as the original keyword arguments.
    client_options = {
        "auth": auth,
        "debug": self._client._config.network_tracing,  # pylint:disable=protected-access
        "prefetch": self._prefetch,
        "link_properties": self._link_properties,
        "timeout": self._timeout,
        "idle_timeout": self._idle_timeout,
        "error_policy": self._retry_policy,
        "keep_alive_interval": self._keep_alive,
        "client_name": self._name,
        "receive_settle_mode": uamqp.constants.ReceiverSettleMode.ReceiveAndDelete,
        "auto_complete": False,
        "properties": connection_properties,
        "desired_capabilities": requested_capabilities,
    }
    self._handler = ReceiveClient(link_source, **client_options)
def _reconnect(self): # pylint: disable=too-many-statements
# pylint: disable=protected-access
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
self._handler.close()
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
self._handler = ReceiveClient(
source,
auth=self.client.get_auth(**alt_creds),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())
try:
self._handler.open()
while not self._handler.client_ready():
time.sleep(0.05)
return True
except errors.TokenExpired as shutdown:
def _reconnect(self): # pylint: disable=too-many-statements
# pylint: disable=protected-access
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
self._handler.close()
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
self._handler = ReceiveClient(
source,
auth=self.client.get_auth(**alt_creds),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())
try:
self._handler.open()
while not self._handler.client_ready():
time.sleep(0.05)
return True
except errors.TokenExpired as shutdown:
target, duration=token_duration
)
endpoint = endpoint + operation
device_filter_txt = None
if device_id:
device_filter_txt = " filtering on device: {},".format(device_id)
six.print_(
"Starting C2D feedback monitor,{} use ctrl-c to stop...".format(
device_filter_txt if device_filter_txt else ""
)
)
try:
client = uamqp.ReceiveClient(
"amqps://" + endpoint, client_name=_get_container_id(), debug=DEBUG
)
message_generator = client.receive_messages_iter()
for msg in message_generator:
match = handle_msg(msg)
if match:
logger.info("Requested message Id has been matched...")
msg.accept()
return match
except uamqp.errors.AMQPConnectionError:
logger.debug("AMQPS connection has expired...")
finally:
client.close()
def _build_handler(self):
    """Create the ``ReceiveClient`` for this receiver and assign it to ``self._handler``.

    If ``self.connection`` is truthy, no auth object is supplied (``auth=None``);
    otherwise a ``SASTokenAuth`` is built from ``self.auth_config`` via
    ``from_shared_access_key``. Any extra options in ``self.handler_kwargs``
    are forwarded to the client unchanged.
    """
    if self.connection:
        credentials = None
    else:
        credentials = authentication.SASTokenAuth.from_shared_access_key(
            **self.auth_config
        )

    message_source = self._get_source()
    self._handler = ReceiveClient(
        message_source,
        auth=credentials,
        debug=self.debug,
        properties=self.properties,
        error_policy=self.error_policy,
        client_name=self.name,
        on_attach=self._on_attach,
        auto_complete=False,
        encoding=self.encoding,
        **self.handler_kwargs
    )