How to use the uamqp.types.AMQPSymbol function in uamqp

To help you get started, we’ve selected a few uamqp examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github Azure / azure-sdk-for-python / sdk / eventhub / azure-eventhub / azure / eventhub / _utils.py View on Github external
properties[types.AMQPSymbol("platform")] = platform_str

    final_user_agent = "{}/{} {} ({})".format(
        USER_AGENT_PREFIX, VERSION, framework, platform_str
    )
    if user_agent:
        final_user_agent = "{} {}".format(final_user_agent, user_agent)

    if len(final_user_agent) > MAX_USER_AGENT_LENGTH:
        raise ValueError(
            "The user-agent string cannot be more than {} in length."
            "Current user_agent string is: {} with length: {}".format(
                MAX_USER_AGENT_LENGTH, final_user_agent, len(final_user_agent)
            )
        )
    properties[types.AMQPSymbol("user-agent")] = final_user_agent
    return properties
github Azure / azure-sdk-for-python / sdk / eventhub / azure-eventhub / azure / eventhub / _consumer.py View on Github external
def _create_handler(self, auth):
        # type: (JWTTokenAuth) -> None
        source = Source(self._source)
        if self._offset is not None:
            source.set_filter(
                event_position_selector(self._offset, self._offset_inclusive)
            )
        desired_capabilities = None
        if self._track_last_enqueued_event_properties:
            symbol_array = [types.AMQPSymbol(RECEIVER_RUNTIME_METRIC_SYMBOL)]
            desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array))

        properties = create_properties(
            self._client._config.user_agent  # pylint:disable=protected-access
        )
        self._handler = ReceiveClient(
            source,
            auth=auth,
            debug=self._client._config.network_tracing,  # pylint:disable=protected-access
            prefetch=self._prefetch,
            link_properties=self._link_properties,
            timeout=self._timeout,
            idle_timeout=self._idle_timeout,
            error_policy=self._retry_policy,
            keep_alive_interval=self._keep_alive,
            client_name=self._name,
github Azure / azure-sdk-for-python / sdk / eventhub / azure-eventhub / azure / eventhub / _utils.py View on Github external
def create_properties(user_agent=None):
    # type: (Optional[str]) -> Dict[types.AMQPSymbol, str]
    """
    Format the properties with which to instantiate the connection.
    This acts like a user agent over HTTP.

    :param user_agent: Optional custom text appended, after a single space,
     to the generated user-agent string. Ignored when empty or None.
    :type user_agent: str or None
    :rtype: dict
    :returns: A dict keyed by the AMQP symbols "product", "version",
     "framework", "platform" and "user-agent".
    :raises ValueError: If the assembled user-agent string is longer than
     MAX_USER_AGENT_LENGTH.
    """
    properties = {}
    properties[types.AMQPSymbol("product")] = USER_AGENT_PREFIX
    properties[types.AMQPSymbol("version")] = VERSION
    # Report the interpreter as "Python/<major>.<minor>.<micro>".
    framework = "Python/{}.{}.{}".format(*sys.version_info[:3])
    properties[types.AMQPSymbol("framework")] = framework
    platform_str = platform.platform()
    properties[types.AMQPSymbol("platform")] = platform_str

    final_user_agent = "{}/{} {} ({})".format(
        USER_AGENT_PREFIX, VERSION, framework, platform_str
    )
    if user_agent:
        final_user_agent = "{} {}".format(final_user_agent, user_agent)

    # Validate the complete string (generated part plus custom suffix).
    if len(final_user_agent) > MAX_USER_AGENT_LENGTH:
        # The trailing space keeps the two sentences from running together
        # when the adjacent literals are concatenated.
        raise ValueError(
            "The user-agent string cannot be more than {} in length. "
            "Current user_agent string is: {} with length: {}".format(
                MAX_USER_AGENT_LENGTH, final_user_agent, len(final_user_agent)
            )
        )
    # Publish the validated string and hand the mapping back to the caller;
    # without these two lines the function would return None.
    properties[types.AMQPSymbol("user-agent")] = final_user_agent
    return properties
github Azure / azure-sdk-for-python / sdk / servicebus / azure-servicebus / azure / servicebus / common / message.py View on Github external
def schedule(self, schedule_time):
        """Record the time at which this message should be enqueued.

        A random UUID is assigned as the message ID when the message does not
        have one yet, and an annotations dict is created when the message has
        none. The enqueue time is then stored in the annotations under the
        scheduled-enqueue-time symbol.

        :param schedule_time: The scheduled time to enqueue the message.
        :type schedule_time: ~datetime.datetime
        """
        # Generate a message ID only when one has not been set already.
        if not self.properties.message_id:
            self.properties.message_id = str(uuid.uuid4())

        message = self.message
        # Make sure there is an annotations mapping to write into.
        if not message.annotations:
            message.annotations = {}

        enqueue_time_key = types.AMQPSymbol(self._x_OPT_SCHEDULED_ENQUEUE_TIME)
        message.annotations[enqueue_time_key] = schedule_time
github Azure / azure-sdk-for-python / sdk / eventhub / azure-eventhub / azure / eventhub / aio / _producer_async.py View on Github external
self._retry_policy = errors.ErrorPolicy(
            max_retries=self._client._config.max_retries, on_error=_error_handler  # pylint:disable=protected-access
        )
        self._reconnect_backoff = 1
        self._name = "EHProducer-{}".format(uuid.uuid4())
        self._unsent_events = []  # type: List[Any]
        self._error = None
        if partition:
            self._target += "/Partitions/" + partition
            self._name += "-partition{}".format(partition)
        self._handler = None  # type: Optional[SendClientAsync]
        self._outcome = None  # type: Optional[constants.MessageSendResult]
        self._condition = None  # type: Optional[Exception]
        self._lock = asyncio.Lock(loop=self._loop)
        self._link_properties = {
            types.AMQPSymbol(TIMEOUT_SYMBOL): types.AMQPLong(int(self._timeout * 1000))
        }