
From 4fe7231fa99de8edc848367386f1a6a5192a0f58 Mon Sep 17 00:00:00 2001
From: Victor Zhestkov <vzhestkov@suse.com>
Date: Fri, 21 Feb 2025 11:15:42 +0100
Subject: [PATCH] Backport batch async fixes and improvements (#701)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Backport batch async fixes and improvements

Co-authored-by: Pablo Suárez Hernández <psuarezhernandez@suse.com>

* Align batch_async tests

---------

Co-authored-by: Pablo Suárez Hernández <psuarezhernandez@suse.com>
---
 salt/cli/batch_async.py                    | 60 ++++++++++++++++-----
 tests/pytests/unit/cli/test_batch_async.py | 63 ++++++++--------------
 2 files changed, 69 insertions(+), 54 deletions(-)
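
Note, not part of the patch itself: below is a minimal standalone sketch of the
scheduling pattern the diff introduces. It uses the plain `tornado` package in
place of the vendored `salt.ext.tornado`, and the `MiniBatch` class and its
method names are illustrative assumptions, not salt code. The pattern: arm
start_batch as a call_later fallback (the presence ping timeout), let the event
handler trigger it early via add_callback instead of yielding on it, and have
start_batch cancel the pending timeout so it effectively runs only once.

import tornado.gen
import tornado.ioloop


class MiniBatch:
    def __init__(self, io_loop, presence_timeout):
        self.io_loop = io_loop
        self.presence_timeout = presence_timeout
        self.initialized = False
        self._start_batch_on_timeout = None

    def start(self):
        # Fallback: start the batch after the presence timeout even if not
        # all targeted minions answered the ping.
        self._start_batch_on_timeout = self.io_loop.call_later(
            self.presence_timeout, self.start_batch
        )

    def on_all_pings_returned(self):
        # Fast path used by the event handler: queue start_batch on the loop
        # instead of yielding on it, so the handler itself never blocks.
        self.io_loop.add_callback(self.start_batch)

    @tornado.gen.coroutine
    def start_batch(self):
        # Cancel the pending fallback so start_batch cannot fire a second time.
        if self._start_batch_on_timeout is not None:
            self.io_loop.remove_timeout(self._start_batch_on_timeout)
            self._start_batch_on_timeout = None
        if self.initialized:
            return
        self.initialized = True
        print("batch started")


if __name__ == "__main__":
    loop = tornado.ioloop.IOLoop.current()
    batch = MiniBatch(loop, presence_timeout=5)
    batch.start()                    # arms the 5 second fallback
    batch.on_all_pings_returned()    # early trigger, cancels the fallback
    loop.call_later(0.1, loop.stop)
    loop.start()

The same add_callback technique is what keeps __event_handler non-blocking in
the diff below when it schedules schedule_next after batch_run returns.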

diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
index 5d49993faa..92215d0e04 100644
--- a/salt/cli/batch_async.py
+++ b/salt/cli/batch_async.py
@@ -35,7 +35,7 @@ def batch_async_required(opts, minions, extra):
     Check opts to identify if batch async is required for the operation.
     """
     if not isinstance(minions, list):
-        False
+        return False
     batch_async_opts = opts.get("batch_async", {})
     batch_async_threshold = (
         batch_async_opts.get("threshold", 1)
@@ -179,6 +179,7 @@ class SharedEventsChannel:
         self._used_by.discard(subscriber_id)
 
     def destroy_unused(self):
+        log.trace("SharedEventsChannel.destroy_unused called")
         if self._used_by:
             return False
         self.master_event.remove_event_handler(self.__handle_event)
@@ -267,6 +268,7 @@ class BatchAsync:
         self.ended = False
         self.event = self.events_channel.master_event
         self.scheduled = False
+        self._start_batch_on_timeout = None
 
     def __set_event_handler(self):
         self.events_channel.subscribe(
@@ -278,6 +280,8 @@ class BatchAsync:
 
     @salt.ext.tornado.gen.coroutine
     def __event_handler(self, tag, data, op):
+        # IMPORTANT: This function must run fast and not wait for any other task,
+        # otherwise it would cause events to be stuck.
         if not self.event:
             return
         try:
@@ -285,7 +289,9 @@ class BatchAsync:
             if op == "ping_return":
                 self.minions.add(minion)
                 if self.targeted_minions == self.minions:
-                    yield self.start_batch()
+                    # call start_batch and do not wait for timeout as we received
+                    # the responses from all the targets
+                    self.io_loop.add_callback(self.start_batch)
             elif op == "find_job_return":
                 if data.get("return", None):
                     self.find_job_returned.add(minion)
@@ -293,7 +299,8 @@ class BatchAsync:
                 if minion in self.active:
                     self.active.remove(minion)
                     self.done_minions.add(minion)
-                    yield self.schedule_next()
+                if not self.active:
+                    self.io_loop.add_callback(self.schedule_next)
         except Exception as ex:  # pylint: disable=W0703
             log.error(
                 "Exception occured while processing event: %s: %s",
@@ -333,7 +340,7 @@ class BatchAsync:
         )
 
         if timedout_minions:
-            yield self.schedule_next()
+            self.io_loop.add_callback(self.schedule_next)
 
         if self.event and running:
             self.find_job_returned = self.find_job_returned.difference(running)
@@ -344,6 +351,9 @@ class BatchAsync:
         """
         Find if the job was finished on the minions
         """
+        log.trace(
+            "[%s] BatchAsync.find_job called for minions: %s", self.batch_jid, minions
+        )
         if not self.event:
             return
         not_done = minions.difference(self.done_minions).difference(
@@ -386,6 +396,7 @@ class BatchAsync:
         if not self.event:
             return
         self.__set_event_handler()
+        # call test.ping for all the targets in async way
         ping_return = yield self.events_channel.local_client.run_job_async(
             self.opts["tgt"],
             "test.ping",
@@ -398,19 +409,24 @@ class BatchAsync:
             listen=False,
             **self.eauth,
         )
+        # ping_return contains actual targeted minions and no actual responses
+        # from the minions as it's async and intended to populate targeted_minions set
         self.targeted_minions = set(ping_return["minions"])
-        # start batching even if not all minions respond to ping
-        yield salt.ext.tornado.gen.sleep(
-            self.batch_presence_ping_timeout or self.opts["gather_job_timeout"]
+        # schedule start_batch to perform even if not all the minions responded
+        # self.__event_handler can push start_batch in case if all targets responded
+        self._start_batch_on_timeout = self.io_loop.call_later(
+            self.batch_presence_ping_timeout or self.opts["gather_job_timeout"],
+            self.start_batch,
         )
-        if self.event:
-            yield self.start_batch()
 
     @salt.ext.tornado.gen.coroutine
     def start_batch(self):
         """
         Fire `salt/batch/*/start` and continue batch with `run_next`
         """
+        if self._start_batch_on_timeout is not None:
+            self.io_loop.remove_timeout(self._start_batch_on_timeout)
+        self._start_batch_on_timeout = None
         if self.initialized:
             return
         self.batch_size = get_bnum(self.opts, self.minions, True)
@@ -431,6 +447,7 @@ class BatchAsync:
         """
         End the batch and call safe closing
         """
+        log.trace("[%s] BatchAsync.end_batch called", self.batch_jid)
         left = self.minions.symmetric_difference(
             self.done_minions.union(self.timedout_minions)
         )
@@ -452,10 +469,11 @@ class BatchAsync:
 
         # release to the IOLoop to allow the event to be published
         # before closing batch async execution
-        yield salt.ext.tornado.gen.sleep(1)
+        yield salt.ext.tornado.gen.sleep(0.03)
         self.close_safe()
 
     def close_safe(self):
+        log.trace("[%s] BatchAsync.close_safe called", self.batch_jid)
         if self.events_channel is not None:
             self.events_channel.unsubscribe(None, None, id(self))
             self.events_channel.unuse(id(self))
@@ -465,11 +483,22 @@ class BatchAsync:
 
     @salt.ext.tornado.gen.coroutine
     def schedule_next(self):
+        log.trace("[%s] BatchAsync.schedule_next called", self.batch_jid)
         if self.scheduled:
+            log.trace(
+                "[%s] BatchAsync.schedule_next -> Batch already scheduled, nothing to do.",
+                self.batch_jid,
+            )
             return
         self.scheduled = True
-        # call later so that we maybe gather more returns
-        yield salt.ext.tornado.gen.sleep(self.batch_delay)
+        if self._get_next():
+            # call later so that we maybe gather more returns
+            log.trace(
+                "[%s] BatchAsync.schedule_next delaying batch %s second(s).",
+                self.batch_jid,
+                self.batch_delay,
+            )
+            yield salt.ext.tornado.gen.sleep(self.batch_delay)
         if self.event:
             yield self.run_next()
 
@@ -480,6 +509,11 @@ class BatchAsync:
         """
         self.scheduled = False
         next_batch = self._get_next()
+        log.trace(
+            "[%s] BatchAsync.run_next called. Next Batch -> %s",
+            self.batch_jid,
+            next_batch,
+        )
         if not next_batch:
             yield self.end_batch()
             return
@@ -504,7 +538,7 @@ class BatchAsync:
             yield salt.ext.tornado.gen.sleep(self.opts["timeout"])
 
             # The batch can be done already at this point, which means no self.event
-            if self.event:
+            if self.event and self.active.intersection(next_batch):
                 yield self.find_job(set(next_batch))
         except Exception as ex:  # pylint: disable=W0703
             log.error(
diff --git a/tests/pytests/unit/cli/test_batch_async.py b/tests/pytests/unit/cli/test_batch_async.py
index bc871aba54..be8de692e6 100644
--- a/tests/pytests/unit/cli/test_batch_async.py
+++ b/tests/pytests/unit/cli/test_batch_async.py
@@ -85,11 +85,17 @@ def test_batch_start_on_batch_presence_ping_timeout(batch):
     future.set_result({})
     with patch.object(batch, "events_channel", MagicMock()), patch(
         "salt.ext.tornado.gen.sleep", return_value=future
-    ), patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
+    ), patch.object(batch, "io_loop", MagicMock()), patch.object(
+        batch, "start_batch", return_value=future
+    ) as start_batch_mock:
         batch.events_channel.local_client.run_job_async.return_value = future_ret
         ret = batch.start()
-        # assert start_batch is called
-        start_batch_mock.assert_called_once()
+        # start_batch is scheduled to be called later
+        assert batch.io_loop.call_later.call_args[0] == (
+            batch.batch_presence_ping_timeout,
+            batch.start_batch,
+        )
+        assert batch._start_batch_on_timeout is not None
         # assert test.ping called
         assert batch.events_channel.local_client.run_job_async.call_args[0] == (
             "*",
@@ -109,16 +115,21 @@ def test_batch_start_on_gather_job_timeout(batch):
     batch.batch_presence_ping_timeout = None
     with patch.object(batch, "events_channel", MagicMock()), patch(
         "salt.ext.tornado.gen.sleep", return_value=future
+    ), patch.object(batch, "io_loop", MagicMock()), patch.object(
+        batch, "start_batch", return_value=future
     ), patch.object(
         batch, "start_batch", return_value=future
     ) as start_batch_mock, patch.object(
         batch, "batch_presence_ping_timeout", None
     ):
         batch.events_channel.local_client.run_job_async.return_value = future_ret
-        # ret = batch_async.start(batch)
         ret = batch.start()
-        # assert start_batch is called
-        start_batch_mock.assert_called_once()
+        # start_batch is scheduled to be called later
+        assert batch.io_loop.call_later.call_args[0] == (
+            batch.opts["gather_job_timeout"],
+            batch.start_batch,
+        )
+        assert batch._start_batch_on_timeout is not None
 
 
 def test_batch_fire_start_event(batch):
@@ -271,34 +282,10 @@ def test_batch__event_handler_ping_return(batch):
     assert batch.done_minions == set()
 
 
-def test_batch__event_handler_call_start_batch_when_all_pings_return(batch):
-    batch.targeted_minions = {"foo"}
-    future = salt.ext.tornado.gen.Future()
-    future.set_result({})
-    with patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
-        batch.start()
-        batch._BatchAsync__event_handler(
-            "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return"
-        )
-        start_batch_mock.assert_called_once()
-
-
-def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(batch):
-    batch.targeted_minions = {"foo", "bar"}
-    future = salt.ext.tornado.gen.Future()
-    future.set_result({})
-    with patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
-        batch.start()
-        batch._BatchAsync__event_handler(
-            "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return"
-        )
-        start_batch_mock.assert_not_called()
-
-
 def test_batch__event_handler_batch_run_return(batch):
     future = salt.ext.tornado.gen.Future()
     future.set_result({})
-    with patch.object(
+    with patch.object(batch, "io_loop", MagicMock()), patch.object(
         batch, "schedule_next", return_value=future
     ) as schedule_next_mock:
         batch.start()
@@ -308,7 +295,7 @@ def test_batch__event_handler_batch_run_return(batch):
         )
         assert batch.active == set()
         assert batch.done_minions == {"foo"}
-        schedule_next_mock.assert_called_once()
+        assert batch.io_loop.add_callback.call_args[0] == (batch.schedule_next,)
 
 
 def test_batch__event_handler_find_job_return(batch):
@@ -322,9 +309,7 @@ def test_batch__event_handler_find_job_return(batch):
 def test_batch_run_next_end_batch_when_no_next(batch):
     future = salt.ext.tornado.gen.Future()
     future.set_result({})
-    with patch.object(
-        batch, "_get_next", return_value={}
-    ), patch.object(
+    with patch.object(batch, "_get_next", return_value={}), patch.object(
         batch, "end_batch", return_value=future
     ) as end_batch_mock:
         batch.run_next()
@@ -337,9 +322,7 @@ def test_batch_find_job(batch):
     batch.minions = {"foo", "bar"}
     with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object(
         batch, "check_find_job", return_value=future
-    ) as check_find_job_mock, patch.object(
-        batch, "jid_gen", return_value="1236"
-    ):
+    ) as check_find_job_mock, patch.object(batch, "jid_gen", return_value="1236"):
         batch.events_channel.local_client.run_job_async.return_value = future
         batch.find_job({"foo", "bar"})
         assert check_find_job_mock.call_args[0] == (
@@ -355,9 +338,7 @@ def test_batch_find_job_with_done_minions(batch):
     batch.minions = {"foo", "bar"}
     with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object(
         batch, "check_find_job", return_value=future
-    ) as check_find_job_mock, patch.object(
-        batch, "jid_gen", return_value="1236"
-    ):
+    ) as check_find_job_mock, patch.object(batch, "jid_gen", return_value="1236"):
         batch.events_channel.local_client.run_job_async.return_value = future
         batch.find_job({"foo", "bar"})
         assert check_find_job_mock.call_args[0] == (
-- 
2.48.1
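
A similarly hedged sketch of the testing approach used in the adapted unit
tests: io_loop is swapped for a unittest.mock.MagicMock so call_later only
records the delay and callback it was given instead of arming a real timer,
and the test asserts against call_args. The Scheduler class and test below
are illustrative assumptions, not salt's BatchAsync or its test suite.

from unittest.mock import MagicMock


class Scheduler:
    def __init__(self, io_loop, timeout):
        self.io_loop = io_loop
        self.timeout = timeout
        self._handle = None

    def start(self):
        # Arm the fallback timer; returns immediately instead of sleeping.
        self._handle = self.io_loop.call_later(self.timeout, self.fire)

    def fire(self):
        print("fired")


def test_start_arms_fallback():
    sched = Scheduler(io_loop=MagicMock(), timeout=5)
    sched.start()
    # The MagicMock recorded the call; check the delay and the callback.
    assert sched.io_loop.call_later.call_args[0] == (5, sched.fire)
    assert sched._handle is not None


test_start_arms_fallback()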
