Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def Producer(self, *args, **kwargs):
    """Build a kombu ``Producer`` with this object as its first argument.

    The kombu import happens inside the call rather than at module load
    time. All extra positional and keyword arguments are forwarded
    unchanged to the ``Producer`` constructor.
    """
    from kombu import messaging

    return messaging.Producer(self, *args, **kwargs)
NOTE(sileht): Must be called within the connection lock
"""
if new_channel == self.channel:
return
if self.channel is not None:
self._declared_queues.clear()
self._declared_exchanges.clear()
self.connection.maybe_close_channel(self.channel)
self.channel = new_channel
if new_channel is not None:
if self.purpose == rpc_common.PURPOSE_LISTEN:
self._set_qos(new_channel)
self._producer = kombu.messaging.Producer(new_channel,
on_return=self.on_return)
for consumer in self._consumers:
consumer.declare(self)
def enable(self):
    """Create the publisher, flag this object as enabled, and notify.

    The publisher is bound to ``self.channel`` when that is truthy;
    otherwise a new channel is requested from ``self.connection``.
    Every callable in ``self.on_enabled`` is then invoked with no
    arguments, in iteration order.
    """
    publish_channel = self.channel or self.connection.channel()
    self.publisher = Producer(
        publish_channel,
        exchange=event_exchange,
        serializer=self.serializer,
    )
    self.enabled = True

    for notify in self.on_enabled:
        notify()
def reconnect(self, channel):
    """Rebuild the exchange and the producer on ``channel``.

    Meant to be called after the RabbitMQ connection has been
    re-established. A new ``Exchange`` is created from
    ``self.exchange_name`` and ``self.kwargs`` and stored on
    ``self.exchange``; a new ``Producer`` bound to ``channel`` and
    ``self.routing_key`` replaces ``self.producer``.
    """
    self.exchange = kombu.entity.Exchange(
        name=self.exchange_name, **self.kwargs
    )
    self.producer = kombu.messaging.Producer(
        exchange=self.exchange,
        channel=channel,
        routing_key=self.routing_key,
    )
def reconnect(self, channel):
    """Rebuild the exchange and the producer on ``channel``.

    Meant to be called after the RabbitMQ connection has been
    re-established. A new ``Exchange`` is created from
    ``self.exchange_name`` and ``self.kwargs`` and stored on
    ``self.exchange``; a new ``Producer`` bound to ``channel`` and
    ``self.routing_key`` replaces ``self.producer``.
    """
    self.exchange = kombu.entity.Exchange(
        name=self.exchange_name, **self.kwargs
    )
    self.producer = kombu.messaging.Producer(
        exchange=self.exchange,
        channel=channel,
        routing_key=self.routing_key,
    )
def reconnect(self):
    """Release the current AMQP connection, if any, and open a new one.

    Connection settings (``amqp_hostname``, ``amqp_userid``,
    ``amqp_password``, ``amqp_virtual_host``, ``amqp_port``) are read
    from ``self.conf``. Once the new connection object exists, a new
    channel is opened on it and ``self.producer`` is rebuilt on that
    channel for ``self.exchange``.
    """
    conf = self.conf

    # Hand the values to the logger as arguments instead of formatting
    # eagerly with ``%``: the message is then only built when INFO is
    # actually enabled. The password is deliberately not logged.
    LOG.info(
        "Opening new AMQP connection to amqp://%s@%s:%s%s",
        conf.amqp_userid,
        conf.amqp_hostname,
        conf.amqp_port,
        conf.amqp_virtual_host,
    )

    if self.connection:
        self.connection.release()

    self.connection = Connection(
        hostname=conf.amqp_hostname,
        userid=conf.amqp_userid,
        password=conf.amqp_password,
        virtual_host=conf.amqp_virtual_host,
        port=conf.amqp_port,
    )
    channel = self.connection.channel()  # get a new channel
    self.producer = Producer(
        channel, self.exchange, auto_declare=[self.exchange]
    )
def reconnect(self, channel):
    """Rebuild the exchange and the producer on ``channel``.

    Meant to be called after the RabbitMQ connection has been
    re-established. A new ``Exchange`` is created from
    ``self.exchange_name`` and ``self.kwargs`` and stored on
    ``self.exchange``; a new ``Producer`` bound to ``channel`` and
    ``self.routing_key`` replaces ``self.producer``.
    """
    self.exchange = kombu.entity.Exchange(
        name=self.exchange_name, **self.kwargs
    )
    self.producer = kombu.messaging.Producer(
        exchange=self.exchange,
        channel=channel,
        routing_key=self.routing_key,
    )
def reconnect(self, channel):
    """Rebuild the exchange and the producer on ``channel``.

    Meant to be called after the RabbitMQ connection has been
    re-established. A new ``Exchange`` is created from
    ``self.exchange_name`` and ``self.kwargs`` and stored on
    ``self.exchange``; a new ``Producer`` bound to ``channel`` and
    ``self.routing_key`` replaces ``self.producer``.
    """
    self.exchange = kombu.entity.Exchange(
        name=self.exchange_name, **self.kwargs
    )
    self.producer = kombu.messaging.Producer(
        exchange=self.exchange,
        channel=channel,
        routing_key=self.routing_key,
    )
def publish(self, type, arguments, destination=None, reply_ticket=None,
            channel=None):
    """Publish a control command on ``self.exchange``.

    :param type: Command name; sent under the ``"command"`` key.
    :param arguments: Mapping of command arguments. It is copied, so
        the caller's mapping is left unmodified.
    :param destination: Value sent under the ``"destination"`` key.
    :param reply_ticket: When truthy, a ``"reply_to"`` entry is added
        holding the name of ``self.reply_exchange`` and this value as
        the routing key.
    :param channel: Channel to publish on. When falsy, a temporary
        channel is opened on ``self.connection`` and closed again
        before returning; a caller-supplied channel is never closed.
    """
    # Work on a copy so the caller's dict is not altered as a side effect.
    payload = dict(arguments, command=type, destination=destination)
    if reply_ticket:
        payload["reply_to"] = {"exchange": self.reply_exchange.name,
                               "routing_key": reply_ticket}

    owns_channel = not channel
    chan = self.connection.channel() if owns_channel else channel
    try:
        # Build the producer inside the ``try`` so a temporary channel
        # is still closed if construction itself fails.
        producer = Producer(chan, exchange=self.exchange)
        producer.publish({"control": payload})
    finally:
        if owns_channel:
            chan.close()
def reconnect(self, channel):
    """Rebuild the exchange and the producer on ``channel``.

    Meant to be called after the RabbitMQ connection has been
    re-established. A new ``Exchange`` is created from
    ``self.exchange_name`` and ``self.kwargs`` and stored on
    ``self.exchange``; a new ``Producer`` bound to ``channel`` and
    ``self.routing_key`` replaces ``self.producer``.
    """
    self.exchange = kombu.entity.Exchange(
        name=self.exchange_name, **self.kwargs
    )
    self.producer = kombu.messaging.Producer(
        exchange=self.exchange,
        channel=channel,
        routing_key=self.routing_key,
    )