File fix-memory-leak-produced-by-batch-async-find_jobs-me.patch of Package salt.17342
From 77d53d9567b7aec045a8fffd29afcb76a8405caf Mon Sep 17 00:00:00 2001
From: Mihai Dinca <mdinca@suse.de>
Date: Mon, 16 Sep 2019 11:27:30 +0200
Subject: [PATCH] Fix memory leak produced by batch async find_jobs
 mechanism (bsc#1140912)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Multiple fixes:
- use different JIDs per find_job
- fix bug in detection of find_job returns
- fix timeout passed from request payload
- better cleanup at the end of batching
Co-authored-by: Pablo Suárez Hernández <psuarezhernandez@suse.com>
---
 salt/cli/batch_async.py | 60 ++++++++++++++++++++++++++++++++-----------------
 salt/client/__init__.py |  1 +
 salt/master.py          |  1 -
 3 files changed, 41 insertions(+), 21 deletions(-)
diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
index 8c8f481e34..8a67331102 100644
--- a/salt/cli/batch_async.py
+++ b/salt/cli/batch_async.py
@@ -72,6 +72,7 @@ class BatchAsync(object):
         self.done_minions = set()
         self.active = set()
         self.initialized = False
+        self.jid_gen = jid_gen
         self.ping_jid = jid_gen()
         self.batch_jid = jid_gen()
         self.find_job_jid = jid_gen()
@@ -89,14 +90,11 @@ class BatchAsync(object):
     def __set_event_handler(self):
         ping_return_pattern = 'salt/job/{0}/ret/*'.format(self.ping_jid)
         batch_return_pattern = 'salt/job/{0}/ret/*'.format(self.batch_jid)
-        find_job_return_pattern = 'salt/job/{0}/ret/*'.format(self.find_job_jid)
         self.event.subscribe(ping_return_pattern, match_type='glob')
         self.event.subscribe(batch_return_pattern, match_type='glob')
-        self.event.subscribe(find_job_return_pattern, match_type='glob')
-        self.event.patterns = {
+        self.patterns = {
             (ping_return_pattern, 'ping_return'),
             (batch_return_pattern, 'batch_run'),
-            (find_job_return_pattern, 'find_job_return')
         }
         self.event.set_event_handler(self.__event_handler)
 
@@ -104,7 +102,7 @@ class BatchAsync(object):
         if not self.event:
             return
         mtag, data = self.event.unpack(raw, self.event.serial)
-        for (pattern, op) in self.event.patterns:
+        for (pattern, op) in self.patterns:
             if fnmatch.fnmatch(mtag, pattern):
                 minion = data['id']
                 if op == 'ping_return':
@@ -112,7 +110,8 @@ class BatchAsync(object):
                     if self.targeted_minions == self.minions:
                         self.event.io_loop.spawn_callback(self.start_batch)
                 elif op == 'find_job_return':
-                    self.find_job_returned.add(minion)
+                    if data.get("return", None):
+                        self.find_job_returned.add(minion)
                 elif op == 'batch_run':
                     if minion in self.active:
                         self.active.remove(minion)
@@ -131,31 +130,46 @@ class BatchAsync(object):
         return set(list(to_run)[:next_batch_size])
 
     @tornado.gen.coroutine
-    def check_find_job(self, batch_minions):
+    def check_find_job(self, batch_minions, jid):
+        find_job_return_pattern = 'salt/job/{0}/ret/*'.format(jid)
+        self.event.unsubscribe(find_job_return_pattern, match_type='glob')
+        self.patterns.remove((find_job_return_pattern, "find_job_return"))
+
         timedout_minions = batch_minions.difference(self.find_job_returned).difference(self.done_minions)
         self.timedout_minions = self.timedout_minions.union(timedout_minions)
         self.active = self.active.difference(self.timedout_minions)
         running = batch_minions.difference(self.done_minions).difference(self.timedout_minions)
+
         if timedout_minions:
             self.schedule_next()
+
         if running:
+            self.find_job_returned = self.find_job_returned.difference(running)
             self.event.io_loop.add_callback(self.find_job, running)
 
     @tornado.gen.coroutine
     def find_job(self, minions):
-        not_done = minions.difference(self.done_minions)
-        ping_return = yield self.local.run_job_async(
-            not_done,
-            'saltutil.find_job',
-            [self.batch_jid],
-            'list',
-            gather_job_timeout=self.opts['gather_job_timeout'],
-            jid=self.find_job_jid,
-            **self.eauth)
-        self.event.io_loop.call_later(
-            self.opts['gather_job_timeout'],
-            self.check_find_job,
-            not_done)
+        not_done = minions.difference(self.done_minions).difference(self.timedout_minions)
+
+        if not_done:
+            jid = self.jid_gen()
+            find_job_return_pattern = 'salt/job/{0}/ret/*'.format(jid)
+            self.patterns.add((find_job_return_pattern, "find_job_return"))
+            self.event.subscribe(find_job_return_pattern, match_type='glob')
+
+            ret = yield self.local.run_job_async(
+                not_done,
+                'saltutil.find_job',
+                [self.batch_jid],
+                'list',
+                gather_job_timeout=self.opts['gather_job_timeout'],
+                jid=jid,
+                **self.eauth)
+            self.event.io_loop.call_later(
+                self.opts['gather_job_timeout'],
+                self.check_find_job,
+                not_done,
+                jid)
 
     @tornado.gen.coroutine
     def start(self):
@@ -203,6 +217,9 @@ class BatchAsync(object):
             }
             self.event.fire_event(data, "salt/batch/{0}/done".format(self.batch_jid))
             self.event.remove_event_handler(self.__event_handler)
+            for (pattern, label) in self.patterns:
+                if label in ["ping_return", "batch_run"]:
+                    self.event.unsubscribe(pattern, match_type='glob')
 
     def schedule_next(self):
         if not self.scheduled:
@@ -226,9 +243,12 @@ class BatchAsync(object):
                     gather_job_timeout=self.opts['gather_job_timeout'],
                     jid=self.batch_jid,
                     metadata=self.metadata)
+
                 self.event.io_loop.call_later(self.opts['timeout'], self.find_job, set(next_batch))
             except Exception as ex:
+                log.error("Error in scheduling next batch: %s", ex)
                 self.active = self.active.difference(next_batch)
         else:
             self.end_batch()
         self.scheduled = False
+        yield
diff --git a/salt/client/__init__.py b/salt/client/__init__.py
index 3bbc7f9de7..a48d79ef8d 100644
--- a/salt/client/__init__.py
+++ b/salt/client/__init__.py
@@ -1622,6 +1622,7 @@ class LocalClient(object):
                           'key': self.key,
                           'tgt_type': tgt_type,
                           'ret': ret,
+                          'timeout': timeout,
                           'jid': jid}
 
         # if kwargs are passed, pack them.
diff --git a/salt/master.py b/salt/master.py
index 5e2277ba76..3abf7ae60b 100644
--- a/salt/master.py
+++ b/salt/master.py
@@ -2044,7 +2044,6 @@ class ClearFuncs(object):
     def publish_batch(self, clear_load, minions, missing):
         batch_load = {}
         batch_load.update(clear_load)
-        import salt.cli.batch_async
         batch = salt.cli.batch_async.BatchAsync(
             self.local.opts,
             functools.partial(self._prep_jid, clear_load, {}),
-- 
2.16.4