Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param bytes|str|unicode body: Message payload
:param str routing_key: Message routing key
:param str exchange: The exchange to publish the message to
:param str virtual_host: Virtual host name
:param dict properties: Message properties
:param str payload_encoding: Payload encoding.
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: dict
"""
exchange = quote(exchange, '')
properties = properties or {}
body = json.dumps(
{
'routing_key': routing_key,
'payload': body,
'payload_encoding': payload_encoding,
'properties': properties,
'vhost': virtual_host
}
)
virtual_host = quote(virtual_host, '')
return self.http_client.post(API_BASIC_PUBLISH %
(
virtual_host,
exchange),
payload=body)
:param str queue: Queue name
:param str virtual_host: Virtual host name
:param bool passive: Do not create
:param bool durable: Durable queue
:param bool auto_delete: Automatically delete when not in use
:param dict|None arguments: Queue key/value arguments
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: dict
"""
if passive:
return self.get(queue, virtual_host=virtual_host)
queue_payload = json.dumps(
{
'durable': durable,
'auto_delete': auto_delete,
'arguments': arguments or {},
'vhost': virtual_host
}
)
return self.http_client.put(
API_QUEUE % (
quote(virtual_host, ''),
queue
),
payload=queue_payload)
dictionary before delivery.
:param int count: How many messages should we try to fetch.
:param int truncate: The maximum length in bytes, beyond that the
server will truncate the message.
:param str encoding: Message encoding.
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: list
"""
ackmode = 'ack_requeue_false'
if requeue:
ackmode = 'ack_requeue_true'
get_messages = json.dumps(
{
'count': count,
'requeue': requeue,
'ackmode': ackmode,
'encoding': encoding,
'truncate': truncate,
'vhost': virtual_host
}
)
virtual_host = quote(virtual_host, '')
response = self.http_client.post(API_BASIC_GET_MESSAGE %
(
virtual_host,
queue
),
payload=get_messages)
def bind(self, destination='', source='', routing_key='', virtual_host='/',
arguments=None):
"""Bind an Exchange.
:param str source: Source Exchange name
:param str destination: Destination Exchange name
:param str routing_key: The routing key to use
:param str virtual_host: Virtual host name
:param dict|None arguments: Bind key/value arguments
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: None
"""
bind_payload = json.dumps({
'destination': destination,
'destination_type': 'e',
'routing_key': routing_key,
'source': source,
'arguments': arguments or {},
'vhost': virtual_host
})
virtual_host = quote(virtual_host, '')
return self.http_client.post(API_EXCHANGE_BIND %
(
virtual_host,
source,
destination
),
payload=bind_payload)
:param str username: Username
:param str virtual_host: Virtual host name
:param str configure_regex: Permission pattern for configuration
operations for this user.
:param str write_regex: Permission pattern for write operations
for this user.
:param str read_regex: Permission pattern for read operations
for this user.
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: dict
"""
virtual_host = quote(virtual_host, '')
permission_payload = json.dumps({
"configure": configure_regex,
"read": read_regex,
"write": write_regex
})
return self.http_client.put(API_USER_VIRTUAL_HOST_PERMISSIONS %
(
virtual_host,
username
),
payload=permission_payload)
def close(self, connection, reason='Closed via management api'):
"""Close Connection.
:param str connection: Connection name
:param str reason: Reason for closing connection.
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: None
"""
close_payload = json.dumps({
'name': connection,
'reason': reason
})
connection = quote(connection, '')
return self.http_client.delete(API_CONNECTION % connection,
payload=close_payload,
headers={
'X-Reason': reason
})
def unbind(self, destination='', source='', routing_key='',
virtual_host='/', properties_key=None):
"""Unbind an Exchange.
:param str source: Source Exchange name
:param str destination: Destination Exchange name
:param str routing_key: The routing key to use
:param str virtual_host: Virtual host name
:param str properties_key:
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: None
"""
unbind_payload = json.dumps({
'destination': destination,
'destination_type': 'e',
'properties_key': properties_key or routing_key,
'source': source,
'vhost': virtual_host
})
virtual_host = quote(virtual_host, '')
return self.http_client.delete(API_EXCHANGE_UNBIND %
(
virtual_host,
source,
destination,
properties_key or routing_key
),
payload=unbind_payload)
def unbind(self, queue='', exchange='', routing_key='', virtual_host='/',
properties_key=None):
"""Unbind a Queue.
:param str queue: Queue name
:param str exchange: Exchange name
:param str routing_key: The routing key to use
:param str virtual_host: Virtual host name
:param str properties_key:
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: None
"""
unbind_payload = json.dumps({
'destination': queue,
'destination_type': 'q',
'properties_key': properties_key or routing_key,
'source': exchange,
'vhost': virtual_host
})
virtual_host = quote(virtual_host, '')
return self.http_client.delete(API_QUEUE_UNBIND %
(
virtual_host,
exchange,
queue,
properties_key or routing_key
),
payload=unbind_payload)