File improve-batch_async-to-release-consumed-memory-bsc-1.patch of Package salt

From e53d50ce5fabf67eeb5344f7be9cccbb09d0179b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?=
 <psuarezhernandez@suse.com>
Date: Thu, 26 Sep 2019 10:41:06 +0100
Subject: [PATCH] Improve batch_async to release consumed memory
 (bsc#1140912)

---
 salt/cli/batch_async.py | 89 ++++++++++++++++++++++++-----------------
 1 file changed, 52 insertions(+), 37 deletions(-)

diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
index 388b709416..0a0b8f5f83 100644
--- a/salt/cli/batch_async.py
+++ b/salt/cli/batch_async.py
@@ -2,7 +2,7 @@
 Execute a job on the targeted minions by using a moving window of fixed size `batch`.
 """
 
-import fnmatch
+import gc
 
 # pylint: enable=import-error,no-name-in-module,redefined-builtin
 import logging
@@ -78,6 +78,7 @@ class BatchAsync:
         self.batch_jid = jid_gen()
         self.find_job_jid = jid_gen()
         self.find_job_returned = set()
+        self.ended = False
         self.event = salt.utils.event.get_event(
             "master",
             self.opts["sock_dir"],
@@ -88,6 +89,7 @@ class BatchAsync:
             keep_loop=True,
         )
         self.scheduled = False
+        self.patterns = {}
 
     def __set_event_handler(self):
         ping_return_pattern = "salt/job/{}/ret/*".format(self.ping_jid)
@@ -118,7 +120,7 @@ class BatchAsync:
                     if minion in self.active:
                         self.active.remove(minion)
                         self.done_minions.add(minion)
-                        self.schedule_next()
+                        self.event.io_loop.spawn_callback(self.schedule_next)
 
     def _get_next(self):
         to_run = (
@@ -132,27 +134,27 @@ class BatchAsync:
         )
         return set(list(to_run)[:next_batch_size])
 
-    @tornado.gen.coroutine
     def check_find_job(self, batch_minions, jid):
-        find_job_return_pattern = "salt/job/{}/ret/*".format(jid)
-        self.event.unsubscribe(find_job_return_pattern, match_type="glob")
-        self.patterns.remove((find_job_return_pattern, "find_job_return"))
+        if self.event:
+            find_job_return_pattern = "salt/job/{}/ret/*".format(jid)
+            self.event.unsubscribe(find_job_return_pattern, match_type="glob")
+            self.patterns.remove((find_job_return_pattern, "find_job_return"))
 
-        timedout_minions = batch_minions.difference(self.find_job_returned).difference(
-            self.done_minions
-        )
-        self.timedout_minions = self.timedout_minions.union(timedout_minions)
-        self.active = self.active.difference(self.timedout_minions)
-        running = batch_minions.difference(self.done_minions).difference(
-            self.timedout_minions
-        )
+            timedout_minions = batch_minions.difference(
+                self.find_job_returned
+            ).difference(self.done_minions)
+            self.timedout_minions = self.timedout_minions.union(timedout_minions)
+            self.active = self.active.difference(self.timedout_minions)
+            running = batch_minions.difference(self.done_minions).difference(
+                self.timedout_minions
+            )
 
-        if timedout_minions:
-            self.schedule_next()
+            if timedout_minions:
+                self.schedule_next()
 
-        if running:
-            self.find_job_returned = self.find_job_returned.difference(running)
-            self.event.io_loop.add_callback(self.find_job, running)
+            if running:
+                self.find_job_returned = self.find_job_returned.difference(running)
+                self.event.io_loop.spawn_callback(self.find_job, running)
 
     @tornado.gen.coroutine
     def find_job(self, minions):
@@ -175,18 +177,12 @@ class BatchAsync:
                 jid=jid,
                 **self.eauth
             )
-            self.event.io_loop.call_later(
-                self.opts["gather_job_timeout"], self.check_find_job, not_done, jid
-            )
+            yield tornado.gen.sleep(self.opts["gather_job_timeout"])
+            self.event.io_loop.spawn_callback(self.check_find_job, not_done, jid)
 
     @tornado.gen.coroutine
     def start(self):
         self.__set_event_handler()
-        # start batching even if not all minions respond to ping
-        self.event.io_loop.call_later(
-            self.batch_presence_ping_timeout or self.opts["gather_job_timeout"],
-            self.start_batch,
-        )
         ping_return = yield self.local.run_job_async(
             self.opts["tgt"],
             "test.ping",
@@ -198,6 +194,11 @@ class BatchAsync:
             **self.eauth
         )
         self.targeted_minions = set(ping_return["minions"])
+        # start batching even if not all minions respond to ping
+        yield tornado.gen.sleep(
+            self.batch_presence_ping_timeout or self.opts["gather_job_timeout"]
+        )
+        self.event.io_loop.spawn_callback(self.start_batch)
 
     @tornado.gen.coroutine
     def start_batch(self):
@@ -209,14 +210,18 @@ class BatchAsync:
                 "down_minions": self.targeted_minions.difference(self.minions),
                 "metadata": self.metadata,
             }
-            self.event.fire_event(data, "salt/batch/{}/start".format(self.batch_jid))
-            yield self.run_next()
+            ret = self.event.fire_event(
+                data, "salt/batch/{}/start".format(self.batch_jid)
+            )
+            self.event.io_loop.spawn_callback(self.run_next)
 
+    @tornado.gen.coroutine
     def end_batch(self):
         left = self.minions.symmetric_difference(
             self.done_minions.union(self.timedout_minions)
         )
-        if not left:
+        if not left and not self.ended:
+            self.ended = True
             data = {
                 "available_minions": self.minions,
                 "down_minions": self.targeted_minions.difference(self.minions),
@@ -229,20 +234,26 @@ class BatchAsync:
             for (pattern, label) in self.patterns:
                 if label in ["ping_return", "batch_run"]:
                     self.event.unsubscribe(pattern, match_type="glob")
+            del self
+            gc.collect()
+        yield
 
+    @tornado.gen.coroutine
     def schedule_next(self):
         if not self.scheduled:
             self.scheduled = True
             # call later so that we maybe gather more returns
-            self.event.io_loop.call_later(self.batch_delay, self.run_next)
+            yield tornado.gen.sleep(self.batch_delay)
+            self.event.io_loop.spawn_callback(self.run_next)
 
     @tornado.gen.coroutine
     def run_next(self):
+        self.scheduled = False
         next_batch = self._get_next()
         if next_batch:
             self.active = self.active.union(next_batch)
             try:
-                yield self.local.run_job_async(
+                ret = yield self.local.run_job_async(
                     next_batch,
                     self.opts["fun"],
                     self.opts["arg"],
@@ -254,13 +265,17 @@ class BatchAsync:
                     metadata=self.metadata,
                 )
 
-                self.event.io_loop.call_later(
-                    self.opts["timeout"], self.find_job, set(next_batch)
-                )
+                yield tornado.gen.sleep(self.opts["timeout"])
+                self.event.io_loop.spawn_callback(self.find_job, set(next_batch))
             except Exception as ex:
                 log.error("Error in scheduling next batch: %s", ex)
                 self.active = self.active.difference(next_batch)
         else:
-            self.end_batch()
-        self.scheduled = False
+            yield self.end_batch()
+        gc.collect()
         yield
+
+    def __del__(self):
+        self.event = None
+        self.ioloop = None
+        gc.collect()
-- 
2.29.2


openSUSE Build Service is sponsored by