File 0001-Retry-to-declare-a-queue-after-internal-error.patch of Package python-oslo.messaging
From 94e13755bcf1883c317b1a69c00d3c7c51d8957d Mon Sep 17 00:00:00 2001
From: Gabriele <gsantomaggio@suse.com>
Date: Thu, 4 Apr 2019 14:56:25 +0200
Subject: [PATCH] Retry to declare a queue after internal error
Without this commit, the client can lose the messages, because the
client does not handler the 'AMQP internal error 541',
read here [2] for details.
The fix retries to create the queue after a delay.
When the virtual-host is ready the declare does not fail.
This is a rare condiction, please read the bug [1] for details.
Closes-Bug: #1822778
[1] https://bugs.launchpad.net/oslo.messaging/+bug/1822778
[2] https://www.rabbitmq.com/amqp-0-9-1-reference.html
Change-Id: I7ab1f9d21ebb807285bf1422bc14cc6e07dcd32a
---
oslo_messaging/_drivers/impl_rabbit.py | 23 ++++++++++++++++++++++-
1 file changed, 22 insertions(+), 1 deletion(-)
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 06be3c3d..3397aeb1 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -277,7 +277,6 @@ class Consumer(object):
self.nowait = nowait
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
rabbit_queue_ttl)
-
self.queue = None
self._declared_on = None
self.exchange = kombu.entity.Exchange(
@@ -312,6 +311,28 @@ class Consumer(object):
self.queue.declare()
else:
raise
+ except kombu.exceptions.ConnectionError as exc:
+ # NOTE(gsantomaggio): This exception happens when the
+ # connection is established,but it fails to create the queue.
+ # Add some delay to avoid too many requests to the server.
+ # See: https://bugs.launchpad.net/oslo.messaging/+bug/1822778
+ # for details.
+ if exc.code == 541:
+ interval = 2
+ info = {'sleep_time': interval,
+ 'queue': self.queue_name,
+ 'err_str': exc
+ }
+ LOG.error(_LE('Internal amqp error (541) '
+ 'during queue declare,'
+ 'retrying in %(sleep_time)s seconds. '
+ 'Queue: [%(queue)s], '
+ 'error message: [%(err_str)s]'), info)
+ time.sleep(interval)
+ self.queue.declare()
+ else:
+ raise
+
self._declared_on = conn.channel
def consume(self, conn, tag):
--
2.25.1