We have some news to share for the request index beta feature. We’ve added more options for sorting your requests, counters for the individual filters, and documentation for the search functionality. Check out the blog post for more details.

File backport-batch-async-fixes-and-improvements-701.patch of Package venv-salt-minion

From 4fe7231fa99de8edc848367386f1a6a5192a0f58 Mon Sep 17 00:00:00 2001
From: Victor Zhestkov <vzhestkov@suse.com>
Date: Fri, 21 Feb 2025 11:15:42 +0100
Subject: [PATCH] Backport batch async fixes and improvements (#701)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Backport batch async fixes and improvements

Co-authored-by: Pablo Suárez Hernández <psuarezhernandez@suse.com>

* Align batch_async tests

---------

Co-authored-by: Pablo Suárez Hernández <psuarezhernandez@suse.com>
---
 salt/cli/batch_async.py                    | 60 ++++++++++++++++-----
 tests/pytests/unit/cli/test_batch_async.py | 63 ++++++++--------------
 2 files changed, 69 insertions(+), 54 deletions(-)

diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
index 5d49993faa..92215d0e04 100644
--- a/salt/cli/batch_async.py
+++ b/salt/cli/batch_async.py
@@ -35,7 +35,7 @@ def batch_async_required(opts, minions, extra):
     Check opts to identify if batch async is required for the operation.
     """
     if not isinstance(minions, list):
-        False
+        return False
     batch_async_opts = opts.get("batch_async", {})
     batch_async_threshold = (
         batch_async_opts.get("threshold", 1)
@@ -179,6 +179,7 @@ class SharedEventsChannel:
         self._used_by.discard(subscriber_id)
 
     def destroy_unused(self):
+        log.trace("SharedEventsChannel.destroy_unused called")
         if self._used_by:
             return False
         self.master_event.remove_event_handler(self.__handle_event)
@@ -267,6 +268,7 @@ class BatchAsync:
         self.ended = False
         self.event = self.events_channel.master_event
         self.scheduled = False
+        self._start_batch_on_timeout = None
 
     def __set_event_handler(self):
         self.events_channel.subscribe(
@@ -278,6 +280,8 @@ class BatchAsync:
 
     @salt.ext.tornado.gen.coroutine
     def __event_handler(self, tag, data, op):
+        # IMPORTANT: This function must run fast and not wait for any other task,
+        # otherwise it would cause events to be stuck.
         if not self.event:
             return
         try:
@@ -285,7 +289,9 @@ class BatchAsync:
             if op == "ping_return":
                 self.minions.add(minion)
                 if self.targeted_minions == self.minions:
-                    yield self.start_batch()
+                    # call start_batch and do not wait for timeout as we received
+                    # the responses from all the targets
+                    self.io_loop.add_callback(self.start_batch)
             elif op == "find_job_return":
                 if data.get("return", None):
                     self.find_job_returned.add(minion)
@@ -293,7 +299,8 @@ class BatchAsync:
                 if minion in self.active:
                     self.active.remove(minion)
                     self.done_minions.add(minion)
-                    yield self.schedule_next()
+                if not self.active:
+                    self.io_loop.add_callback(self.schedule_next)
         except Exception as ex:  # pylint: disable=W0703
             log.error(
                 "Exception occured while processing event: %s: %s",
@@ -333,7 +340,7 @@ class BatchAsync:
         )
 
         if timedout_minions:
-            yield self.schedule_next()
+            self.io_loop.add_callback(self.schedule_next)
 
         if self.event and running:
             self.find_job_returned = self.find_job_returned.difference(running)
@@ -344,6 +351,9 @@ class BatchAsync:
         """
         Find if the job was finished on the minions
         """
+        log.trace(
+            "[%s] BatchAsync.find_job called for minions: %s", self.batch_jid, minions
+        )
         if not self.event:
             return
         not_done = minions.difference(self.done_minions).difference(
@@ -386,6 +396,7 @@ class BatchAsync:
         if not self.event:
             return
         self.__set_event_handler()
+        # call test.ping for all the targets in async way
         ping_return = yield self.events_channel.local_client.run_job_async(
             self.opts["tgt"],
             "test.ping",
@@ -398,19 +409,24 @@ class BatchAsync:
             listen=False,
             **self.eauth,
         )
+        # ping_return contains actual targeted minions and no actual responses
+        # from the minions as it's async and intended to populate targeted_minions set
         self.targeted_minions = set(ping_return["minions"])
-        # start batching even if not all minions respond to ping
-        yield salt.ext.tornado.gen.sleep(
-            self.batch_presence_ping_timeout or self.opts["gather_job_timeout"]
+        # schedule start_batch to perform even if not all the minions responded
+        # self.__event_handler can push start_batch in case if all targets responded
+        self._start_batch_on_timeout = self.io_loop.call_later(
+            self.batch_presence_ping_timeout or self.opts["gather_job_timeout"],
+            self.start_batch,
         )
-        if self.event:
-            yield self.start_batch()
 
     @salt.ext.tornado.gen.coroutine
     def start_batch(self):
         """
         Fire `salt/batch/*/start` and continue batch with `run_next`
         """
+        if self._start_batch_on_timeout is not None:
+            self.io_loop.remove_timeout(self._start_batch_on_timeout)
+        self._start_batch_on_timeout = None
         if self.initialized:
             return
         self.batch_size = get_bnum(self.opts, self.minions, True)
@@ -431,6 +447,7 @@ class BatchAsync:
         """
         End the batch and call safe closing
         """
+        log.trace("[%s] BatchAsync.end_batch called", self.batch_jid)
         left = self.minions.symmetric_difference(
             self.done_minions.union(self.timedout_minions)
         )
@@ -452,10 +469,11 @@ class BatchAsync:
 
         # release to the IOLoop to allow the event to be published
         # before closing batch async execution
-        yield salt.ext.tornado.gen.sleep(1)
+        yield salt.ext.tornado.gen.sleep(0.03)
         self.close_safe()
 
     def close_safe(self):
+        log.trace("[%s] BatchAsync.close_safe called", self.batch_jid)
         if self.events_channel is not None:
             self.events_channel.unsubscribe(None, None, id(self))
             self.events_channel.unuse(id(self))
@@ -465,11 +483,22 @@ class BatchAsync:
 
     @salt.ext.tornado.gen.coroutine
     def schedule_next(self):
+        log.trace("[%s] BatchAsync.schedule_next called", self.batch_jid)
         if self.scheduled:
+            log.trace(
+                "[%s] BatchAsync.schedule_next -> Batch already scheduled, nothing to do.",
+                self.batch_jid,
+            )
             return
         self.scheduled = True
-        # call later so that we maybe gather more returns
-        yield salt.ext.tornado.gen.sleep(self.batch_delay)
+        if self._get_next():
+            # call later so that we maybe gather more returns
+            log.trace(
+                "[%s] BatchAsync.schedule_next delaying batch %s second(s).",
+                self.batch_jid,
+                self.batch_delay,
+            )
+            yield salt.ext.tornado.gen.sleep(self.batch_delay)
         if self.event:
             yield self.run_next()
 
@@ -480,6 +509,11 @@ class BatchAsync:
         """
         self.scheduled = False
         next_batch = self._get_next()
+        log.trace(
+            "[%s] BatchAsync.run_next called. Next Batch -> %s",
+            self.batch_jid,
+            next_batch,
+        )
         if not next_batch:
             yield self.end_batch()
             return
@@ -504,7 +538,7 @@ class BatchAsync:
             yield salt.ext.tornado.gen.sleep(self.opts["timeout"])
 
             # The batch can be done already at this point, which means no self.event
-            if self.event:
+            if self.event and self.active.intersection(next_batch):
                 yield self.find_job(set(next_batch))
         except Exception as ex:  # pylint: disable=W0703
             log.error(
diff --git a/tests/pytests/unit/cli/test_batch_async.py b/tests/pytests/unit/cli/test_batch_async.py
index bc871aba54..be8de692e6 100644
--- a/tests/pytests/unit/cli/test_batch_async.py
+++ b/tests/pytests/unit/cli/test_batch_async.py
@@ -85,11 +85,17 @@ def test_batch_start_on_batch_presence_ping_timeout(batch):
     future.set_result({})
     with patch.object(batch, "events_channel", MagicMock()), patch(
         "salt.ext.tornado.gen.sleep", return_value=future
-    ), patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
+    ), patch.object(batch, "io_loop", MagicMock()), patch.object(
+        batch, "start_batch", return_value=future
+    ) as start_batch_mock:
         batch.events_channel.local_client.run_job_async.return_value = future_ret
         ret = batch.start()
-        # assert start_batch is called
-        start_batch_mock.assert_called_once()
+        # start_batch is scheduled to be called later
+        assert batch.io_loop.call_later.call_args[0] == (
+            batch.batch_presence_ping_timeout,
+            batch.start_batch,
+        )
+        assert batch._start_batch_on_timeout is not None
         # assert test.ping called
         assert batch.events_channel.local_client.run_job_async.call_args[0] == (
             "*",
@@ -109,16 +115,21 @@ def test_batch_start_on_gather_job_timeout(batch):
     batch.batch_presence_ping_timeout = None
     with patch.object(batch, "events_channel", MagicMock()), patch(
         "salt.ext.tornado.gen.sleep", return_value=future
+    ), patch.object(batch, "io_loop", MagicMock()), patch.object(
+        batch, "start_batch", return_value=future
     ), patch.object(
         batch, "start_batch", return_value=future
     ) as start_batch_mock, patch.object(
         batch, "batch_presence_ping_timeout", None
     ):
         batch.events_channel.local_client.run_job_async.return_value = future_ret
-        # ret = batch_async.start(batch)
         ret = batch.start()
-        # assert start_batch is called
-        start_batch_mock.assert_called_once()
+        # start_batch is scheduled to be called later
+        assert batch.io_loop.call_later.call_args[0] == (
+            batch.opts["gather_job_timeout"],
+            batch.start_batch,
+        )
+        assert batch._start_batch_on_timeout is not None
 
 
 def test_batch_fire_start_event(batch):
@@ -271,34 +282,10 @@ def test_batch__event_handler_ping_return(batch):
     assert batch.done_minions == set()
 
 
-def test_batch__event_handler_call_start_batch_when_all_pings_return(batch):
-    batch.targeted_minions = {"foo"}
-    future = salt.ext.tornado.gen.Future()
-    future.set_result({})
-    with patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
-        batch.start()
-        batch._BatchAsync__event_handler(
-            "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return"
-        )
-        start_batch_mock.assert_called_once()
-
-
-def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(batch):
-    batch.targeted_minions = {"foo", "bar"}
-    future = salt.ext.tornado.gen.Future()
-    future.set_result({})
-    with patch.object(batch, "start_batch", return_value=future) as start_batch_mock:
-        batch.start()
-        batch._BatchAsync__event_handler(
-            "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return"
-        )
-        start_batch_mock.assert_not_called()
-
-
 def test_batch__event_handler_batch_run_return(batch):
     future = salt.ext.tornado.gen.Future()
     future.set_result({})
-    with patch.object(
+    with patch.object(batch, "io_loop", MagicMock()), patch.object(
         batch, "schedule_next", return_value=future
     ) as schedule_next_mock:
         batch.start()
@@ -308,7 +295,7 @@ def test_batch__event_handler_batch_run_return(batch):
         )
         assert batch.active == set()
         assert batch.done_minions == {"foo"}
-        schedule_next_mock.assert_called_once()
+        batch.io_loop.add_callback.call_args[0] == (batch.schedule_next)
 
 
 def test_batch__event_handler_find_job_return(batch):
@@ -322,9 +309,7 @@ def test_batch__event_handler_find_job_return(batch):
 def test_batch_run_next_end_batch_when_no_next(batch):
     future = salt.ext.tornado.gen.Future()
     future.set_result({})
-    with patch.object(
-        batch, "_get_next", return_value={}
-    ), patch.object(
+    with patch.object(batch, "_get_next", return_value={}), patch.object(
         batch, "end_batch", return_value=future
     ) as end_batch_mock:
         batch.run_next()
@@ -337,9 +322,7 @@ def test_batch_find_job(batch):
     batch.minions = {"foo", "bar"}
     with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object(
         batch, "check_find_job", return_value=future
-    ) as check_find_job_mock, patch.object(
-        batch, "jid_gen", return_value="1236"
-    ):
+    ) as check_find_job_mock, patch.object(batch, "jid_gen", return_value="1236"):
         batch.events_channel.local_client.run_job_async.return_value = future
         batch.find_job({"foo", "bar"})
         assert check_find_job_mock.call_args[0] == (
@@ -355,9 +338,7 @@ def test_batch_find_job_with_done_minions(batch):
     batch.minions = {"foo", "bar"}
     with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object(
         batch, "check_find_job", return_value=future
-    ) as check_find_job_mock, patch.object(
-        batch, "jid_gen", return_value="1236"
-    ):
+    ) as check_find_job_mock, patch.object(batch, "jid_gen", return_value="1236"):
         batch.events_channel.local_client.run_job_async.return_value = future
         batch.find_job({"foo", "bar"})
         assert check_find_job_mock.call_args[0] == (
-- 
2.48.1

openSUSE Build Service is sponsored by