File 0055-Backport-31164-and-31364-32474.patch of Package salt.3314

From cacf00e3a08421a83e64a4649ce8c02752ae820a Mon Sep 17 00:00:00 2001
From: Dmitry Kuzmenko <dmitry.kuzmenko@dsr-company.com>
Date: Mon, 11 Apr 2016 17:02:36 +0300
Subject: [PATCH 55/55] Backport 31164 and 31364 (#32474)

* Don't send REQ while another one is waiting for response.

The message has to be removed from the queue the only *after* it's
already processed to don't confuse send() functionality that expects
empty queue means: there's no active sendings.

* Fixed zeromq ReqMessageClient destroy

(cherry picked from commit f5bd6bdcc38fde068b57285699a8889a9f3ee89b)
---
 salt/transport/tcp.py    |  5 ++++-
 salt/transport/zeromq.py | 18 ++++++++++++++----
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py
index 8beec9d..6a27d15 100644
--- a/salt/transport/tcp.py
+++ b/salt/transport/tcp.py
@@ -516,19 +516,22 @@ class SaltMessageClient(object):
         while not self._connecting_future.done() or self._connecting_future.result() is not True:
             yield self._connecting_future
         while len(self.send_queue) > 0:
-            message_id, item = self.send_queue.pop(0)
+            message_id, item = self.send_queue[0]
             try:
                 yield self._stream.write(item)
+                del self.send_queue[0]
             # if the connection is dead, lets fail this send, and make sure we
             # attempt to reconnect
             except tornado.iostream.StreamClosedError as e:
                 self.send_future_map.pop(message_id).set_exception(Exception())
                 self.remove_message_timeout(message_id)
+                del self.send_queue[0]
                 if self._closing:
                     return
                 # if the last connect finished, then we need to make a new one
                 if self._connecting_future.done():
                     self._connecting_future = self.connect()
+                yield self._connecting_future
 
     def _message_id(self):
         wrap = False
diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py
index 5ae363b..505154e 100644
--- a/salt/transport/zeromq.py
+++ b/salt/transport/zeromq.py
@@ -746,6 +746,7 @@ class AsyncReqMessageClient(object):
             self.stream.io_loop.remove_handler(self.stream.socket)
             # set this to None, more hacks for messed up pyzmq
             self.stream.socket = None
+            self.stream = None
             self.socket.close()
         self.context.term()
 
@@ -806,8 +807,12 @@ class AsyncReqMessageClient(object):
     @tornado.gen.coroutine
     def _internal_send_recv(self):
         while len(self.send_queue) > 0:
-            message = self.send_queue.pop(0)
-            future = self.send_future_map[message]
+            message = self.send_queue[0]
+            future = self.send_future_map.get(message, None)
+            if future is None:
+                # Timedout
+                del self.send_queue[0]
+                continue
 
             # send
             def mark_future(msg):
@@ -820,14 +825,19 @@ class AsyncReqMessageClient(object):
                 ret = yield future
             except:  # pylint: disable=W0702
                 self._init_socket()  # re-init the zmq socket (no other way in zmq)
+                del self.send_queue[0]
                 continue
+            del self.send_queue[0]
+            self.send_future_map.pop(message, None)
             self.remove_message_timeout(message)
 
     def remove_message_timeout(self, message):
         if message not in self.send_timeout_map:
             return
-        timeout = self.send_timeout_map.pop(message)
-        self.io_loop.remove_timeout(timeout)
+        timeout = self.send_timeout_map.pop(message, None)
+        if timeout is not None:
+            # Hasn't been already timedout
+            self.io_loop.remove_timeout(timeout)
 
     def timeout_message(self, message):
         '''
-- 
2.7.3

openSUSE Build Service is sponsored by