File make-reactor-engine-less-blocking-the-eventpublisher.patch of Package salt

From 0d35f09288700f5c961567442c3fcc25838b8de4 Mon Sep 17 00:00:00 2001
From: Victor Zhestkov <vzhestkov@suse.com>
Date: Wed, 15 May 2024 09:44:21 +0200
Subject: [PATCH] Make reactor engine less blocking the EventPublisher

---
 salt/utils/reactor.py | 45 +++++++++++++++++++++++++++----------------
 1 file changed, 28 insertions(+), 17 deletions(-)

diff --git a/salt/utils/reactor.py b/salt/utils/reactor.py
index 19420a51cf..78adad34da 100644
--- a/salt/utils/reactor.py
+++ b/salt/utils/reactor.py
@@ -1,10 +1,12 @@
 """
 Functions which implement running reactor jobs
 """
+
 import fnmatch
 import glob
 import logging
 import os
+from threading import Lock
 
 import salt.client
 import salt.defaults.exitcodes
@@ -194,13 +196,6 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
         self.resolve_aliases(chunks)
         return chunks
 
-    def call_reactions(self, chunks):
-        """
-        Execute the reaction state
-        """
-        for chunk in chunks:
-            self.wrap.run(chunk)
-
     def run(self):
         """
         Enter into the server loop
@@ -218,7 +213,7 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
         ) as event:
             self.wrap = ReactWrap(self.opts)
 
-            for data in event.iter_events(full=True):
+            for data in event.iter_events(full=True, auto_reconnect=True):
                 # skip all events fired by ourselves
                 if data["data"].get("user") == self.wrap.event_user:
                     continue
@@ -268,15 +263,9 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
                     if not self.is_leader:
                         continue
                     else:
-                        reactors = self.list_reactors(data["tag"])
-                        if not reactors:
-                            continue
-                        chunks = self.reactions(data["tag"], data["data"], reactors)
-                        if chunks:
-                            try:
-                                self.call_reactions(chunks)
-                            except SystemExit:
-                                log.warning("Exit ignored by reactor")
+                        self.wrap.call_reactions(
+                            data, self.list_reactors, self.reactions
+                        )
 
 
 class ReactWrap:
@@ -297,6 +286,7 @@ class ReactWrap:
 
     def __init__(self, opts):
         self.opts = opts
+        self._run_lock = Lock()
         if ReactWrap.client_cache is None:
             ReactWrap.client_cache = salt.utils.cache.CacheDict(
                 opts["reactor_refresh_interval"]
@@ -480,3 +470,24 @@ class ReactWrap:
         Wrap LocalCaller to execute remote exec functions locally on the Minion
         """
         self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"])
+
+    def _call_reactions(self, data, list_reactors, get_reactions):
+        reactors = list_reactors(data["tag"])
+        if not reactors:
+            return
+        chunks = get_reactions(data["tag"], data["data"], reactors)
+        if not chunks:
+            return
+        with self._run_lock:
+            try:
+                for chunk in chunks:
+                    self.run(chunk)
+            except Exception as exc:  # pylint: disable=broad-except
+                log.error(
+                    "Exception while calling the reactions: %s", exc, exc_info=True
+                )
+
+    def call_reactions(self, data, list_reactors, get_reactions):
+        return self.pool.fire_async(
+            self._call_reactions, args=(data, list_reactors, get_reactions)
+        )
-- 
2.45.0

openSUSE Build Service is sponsored by