File 0055-Backport-31164-and-31364-32474.patch of Package salt.3314
From cacf00e3a08421a83e64a4649ce8c02752ae820a Mon Sep 17 00:00:00 2001
From: Dmitry Kuzmenko <dmitry.kuzmenko@dsr-company.com>
Date: Mon, 11 Apr 2016 17:02:36 +0300
Subject: [PATCH 55/55] Backport 31164 and 31364 (#32474)
* Don't send REQ while another one is waiting for response.
The message has to be removed from the queue the only *after* it's
already processed to don't confuse send() functionality that expects
empty queue means: there's no active sendings.
* Fixed zeromq ReqMessageClient destroy
(cherry picked from commit f5bd6bdcc38fde068b57285699a8889a9f3ee89b)
---
salt/transport/tcp.py | 5 ++++-
salt/transport/zeromq.py | 18 ++++++++++++++----
2 files changed, 18 insertions(+), 5 deletions(-)
diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py
index 8beec9d..6a27d15 100644
--- a/salt/transport/tcp.py
+++ b/salt/transport/tcp.py
@@ -516,19 +516,22 @@ class SaltMessageClient(object):
while not self._connecting_future.done() or self._connecting_future.result() is not True:
yield self._connecting_future
while len(self.send_queue) > 0:
- message_id, item = self.send_queue.pop(0)
+ message_id, item = self.send_queue[0]
try:
yield self._stream.write(item)
+ del self.send_queue[0]
# if the connection is dead, lets fail this send, and make sure we
# attempt to reconnect
except tornado.iostream.StreamClosedError as e:
self.send_future_map.pop(message_id).set_exception(Exception())
self.remove_message_timeout(message_id)
+ del self.send_queue[0]
if self._closing:
return
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
+ yield self._connecting_future
def _message_id(self):
wrap = False
diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py
index 5ae363b..505154e 100644
--- a/salt/transport/zeromq.py
+++ b/salt/transport/zeromq.py
@@ -746,6 +746,7 @@ class AsyncReqMessageClient(object):
self.stream.io_loop.remove_handler(self.stream.socket)
# set this to None, more hacks for messed up pyzmq
self.stream.socket = None
+ self.stream = None
self.socket.close()
self.context.term()
@@ -806,8 +807,12 @@ class AsyncReqMessageClient(object):
@tornado.gen.coroutine
def _internal_send_recv(self):
while len(self.send_queue) > 0:
- message = self.send_queue.pop(0)
- future = self.send_future_map[message]
+ message = self.send_queue[0]
+ future = self.send_future_map.get(message, None)
+ if future is None:
+ # Timedout
+ del self.send_queue[0]
+ continue
# send
def mark_future(msg):
@@ -820,14 +825,19 @@ class AsyncReqMessageClient(object):
ret = yield future
except: # pylint: disable=W0702
self._init_socket() # re-init the zmq socket (no other way in zmq)
+ del self.send_queue[0]
continue
+ del self.send_queue[0]
+ self.send_future_map.pop(message, None)
self.remove_message_timeout(message)
def remove_message_timeout(self, message):
if message not in self.send_timeout_map:
return
- timeout = self.send_timeout_map.pop(message)
- self.io_loop.remove_timeout(timeout)
+ timeout = self.send_timeout_map.pop(message, None)
+ if timeout is not None:
+ # Hasn't been already timedout
+ self.io_loop.remove_timeout(timeout)
def timeout_message(self, message):
'''
--
2.7.3