Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
properties[types.AMQPSymbol("platform")] = platform_str
final_user_agent = "{}/{} {} ({})".format(
USER_AGENT_PREFIX, VERSION, framework, platform_str
)
if user_agent:
final_user_agent = "{} {}".format(final_user_agent, user_agent)
if len(final_user_agent) > MAX_USER_AGENT_LENGTH:
raise ValueError(
"The user-agent string cannot be more than {} in length."
"Current user_agent string is: {} with length: {}".format(
MAX_USER_AGENT_LENGTH, final_user_agent, len(final_user_agent)
)
)
properties[types.AMQPSymbol("user-agent")] = final_user_agent
return properties
def _create_handler(self, auth):
# type: (JWTTokenAuth) -> None
source = Source(self._source)
if self._offset is not None:
source.set_filter(
event_position_selector(self._offset, self._offset_inclusive)
)
desired_capabilities = None
if self._track_last_enqueued_event_properties:
symbol_array = [types.AMQPSymbol(RECEIVER_RUNTIME_METRIC_SYMBOL)]
desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array))
properties = create_properties(
self._client._config.user_agent # pylint:disable=protected-access
)
self._handler = ReceiveClient(
source,
auth=auth,
debug=self._client._config.network_tracing, # pylint:disable=protected-access
prefetch=self._prefetch,
link_properties=self._link_properties,
timeout=self._timeout,
idle_timeout=self._idle_timeout,
error_policy=self._retry_policy,
keep_alive_interval=self._keep_alive,
client_name=self._name,
def create_properties(user_agent=None):
# type: (Optional[str]) -> Dict[types.AMQPSymbol, str]
"""
Format the properties with which to instantiate the connection.
This acts like a user agent over HTTP.
:rtype: dict
"""
properties = {}
properties[types.AMQPSymbol("product")] = USER_AGENT_PREFIX
properties[types.AMQPSymbol("version")] = VERSION
framework = "Python/{}.{}.{}".format(
sys.version_info[0], sys.version_info[1], sys.version_info[2]
)
properties[types.AMQPSymbol("framework")] = framework
platform_str = platform.platform()
properties[types.AMQPSymbol("platform")] = platform_str
final_user_agent = "{}/{} {} ({})".format(
USER_AGENT_PREFIX, VERSION, framework, platform_str
)
if user_agent:
final_user_agent = "{} {}".format(final_user_agent, user_agent)
if len(final_user_agent) > MAX_USER_AGENT_LENGTH:
raise ValueError(
"The user-agent string cannot be more than {} in length."
"Current user_agent string is: {} with length: {}".format(
MAX_USER_AGENT_LENGTH, final_user_agent, len(final_user_agent)
)
)
def schedule(self, schedule_time):
"""Add a specific enqueue time to the message.
:param schedule_time: The scheduled time to enqueue the message.
:type schedule_time: ~datetime.datetime
"""
if not self.properties.message_id:
self.properties.message_id = str(uuid.uuid4())
if not self.message.annotations:
self.message.annotations = {}
self.message.annotations[types.AMQPSymbol(self._x_OPT_SCHEDULED_ENQUEUE_TIME)] = schedule_time
self._retry_policy = errors.ErrorPolicy(
max_retries=self._client._config.max_retries, on_error=_error_handler # pylint:disable=protected-access
)
self._reconnect_backoff = 1
self._name = "EHProducer-{}".format(uuid.uuid4())
self._unsent_events = [] # type: List[Any]
self._error = None
if partition:
self._target += "/Partitions/" + partition
self._name += "-partition{}".format(partition)
self._handler = None # type: Optional[SendClientAsync]
self._outcome = None # type: Optional[constants.MessageSendResult]
self._condition = None # type: Optional[Exception]
self._lock = asyncio.Lock(loop=self._loop)
self._link_properties = {
types.AMQPSymbol(TIMEOUT_SYMBOL): types.AMQPLong(int(self._timeout * 1000))
}