We have some news to share for the request index beta feature. We’ve added more options to sort your requests, counters to the individual filters, and documentation for the search functionality. Check out the blog post for more details.

File make-salt-master-self-recoverable-on-killing-eventpu.patch of Package salt

From 794b5d1aa7b8e880e9a21940183d241c6cbde9c9 Mon Sep 17 00:00:00 2001
From: Victor Zhestkov <vzhestkov@suse.com>
Date: Wed, 15 May 2024 09:42:23 +0200
Subject: [PATCH] Make salt-master self recoverable on killing
 EventPublisher

* Implement timeout and tries to transport.ipc.IPCClient.send

* Make timeout and tries configurable for fire_event

* Add test of timeout and tries

* Prevent exceptions from tornado Future on closing the IPC connection
---
 salt/transport/ipc.py                        | 73 +++++++++++++++++---
 salt/utils/event.py                          | 21 +++++-
 tests/pytests/unit/utils/event/test_event.py | 43 ++++++++++++
 3 files changed, 125 insertions(+), 12 deletions(-)

diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py
index cee100b086..6631781c5c 100644
--- a/salt/transport/ipc.py
+++ b/salt/transport/ipc.py
@@ -2,7 +2,6 @@
 IPC transport classes
 """
 
-
 import errno
 import logging
 import socket
@@ -340,7 +339,8 @@ class IPCClient:
             try:
                 log.trace("IPCClient: Connecting to socket: %s", self.socket_path)
                 yield self.stream.connect(sock_addr)
-                self._connecting_future.set_result(True)
+                if self._connecting_future is not None:
+                    self._connecting_future.set_result(True)
                 break
             except Exception as e:  # pylint: disable=broad-except
                 if self.stream.closed():
@@ -350,7 +350,8 @@ class IPCClient:
                     if self.stream is not None:
                         self.stream.close()
                         self.stream = None
-                    self._connecting_future.set_exception(e)
+                    if self._connecting_future is not None:
+                        self._connecting_future.set_exception(e)
                     break
 
                 yield salt.ext.tornado.gen.sleep(1)
@@ -365,7 +366,13 @@ class IPCClient:
             return
 
         self._closing = True
-        self._connecting_future = None
+        if self._connecting_future is not None:
+            try:
+                self._connecting_future.set_result(True)
+                self._connecting_future.exception()  # pylint: disable=E0203
+            except Exception as e:  # pylint: disable=broad-except
+                log.warning("Unhandled connecting exception: %s", e, exc_info=True)
+            self._connecting_future = None
 
         log.debug("Closing %s instance", self.__class__.__name__)
 
@@ -435,8 +442,6 @@ class IPCMessageClient(IPCClient):
         "close",
     ]
 
-    # FIXME timeout unimplemented
-    # FIXME tries unimplemented
     @salt.ext.tornado.gen.coroutine
     def send(self, msg, timeout=None, tries=None):
         """
@@ -445,12 +450,60 @@ class IPCMessageClient(IPCClient):
         If the socket is not currently connected, a connection will be established.
 
         :param dict msg: The message to be sent
-        :param int timeout: Timeout when sending message (Currently unimplemented)
+        :param int timeout: Timeout when sending message
+        :param int tries: Maximum number of tries to send message
         """
-        if not self.connected():
-            yield self.connect()
+        if tries is None or tries < 1:
+            tries = 1
+        due_time = None
+        if timeout is not None:
+            due_time = time.time() + timeout
+        _try = 1
+        exc_count = 0
         pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True)
-        yield self.stream.write(pack)
+        while _try <= tries:
+            if not self.connected():
+                self.close()
+                self.stream = None
+                self._closing = False
+                try:
+                    yield self.connect(
+                        timeout=(
+                            None if due_time is None else max(due_time - time.time(), 1)
+                        )
+                    )
+                except StreamClosedError:
+                    log.warning(
+                        "IPCMessageClient: Unable to reconnect IPC stream on sending message with ID: 0x%016x%s",
+                        id(msg),
+                        f", retry {_try} of {tries}" if tries > 1 else "",
+                    )
+                    exc_count += 1
+            if self.connected():
+                try:
+                    yield self.stream.write(pack)
+                    return
+                except StreamClosedError:
+                    if self._closing:
+                        break
+                    log.warning(
+                        "IPCMessageClient: Stream was closed on sending message with ID: 0x%016x",
+                        id(msg),
+                    )
+                    exc_count += 1
+                    if exc_count == 1:
+                        # Give one more chance in case if stream was detected as closed
+                        # on the first write attempt
+                        continue
+            cur_time = time.time()
+            _try += 1
+            if _try > tries or (due_time is not None and cur_time > due_time):
+                return
+            yield salt.ext.tornado.gen.sleep(
+                1
+                if due_time is None
+                else (due_time - cur_time) / max(tries - _try + 1, 1)
+            )
 
 
 class IPCMessageServer(IPCServer):
diff --git a/salt/utils/event.py b/salt/utils/event.py
index ef048335ae..36b530d1af 100644
--- a/salt/utils/event.py
+++ b/salt/utils/event.py
@@ -270,6 +270,10 @@ class SaltEvent:
             # and don't read out events from the buffer on an on-going basis,
             # the buffer will grow resulting in big memory usage.
             self.connect_pub()
+        self.pusher_send_timeout = self.opts.get(
+            "pusher_send_timeout", self.opts.get("timeout")
+        )
+        self.pusher_send_tries = self.opts.get("pusher_send_tries", 3)
 
     @classmethod
     def __load_cache_regex(cls):
@@ -839,10 +843,18 @@ class SaltEvent:
             ]
         )
         msg = salt.utils.stringutils.to_bytes(event, "utf-8")
+        if timeout is None:
+            timeout_s = self.pusher_send_timeout
+        else:
+            timeout_s = float(timeout) / 1000
         if self._run_io_loop_sync:
             with salt.utils.asynchronous.current_ioloop(self.io_loop):
                 try:
-                    self.pusher.send(msg)
+                    self.pusher.send(
+                        msg,
+                        timeout=timeout_s,
+                        tries=self.pusher_send_tries,
+                    )
                 except Exception as exc:  # pylint: disable=broad-except
                     log.debug(
                         "Publisher send failed with exception: %s",
@@ -851,7 +863,12 @@ class SaltEvent:
                     )
                     raise
         else:
-            self.io_loop.spawn_callback(self.pusher.send, msg)
+            self.io_loop.spawn_callback(
+                self.pusher.send,
+                msg,
+                timeout=timeout_s,
+                tries=self.pusher_send_tries,
+            )
         return True
 
     def fire_master(self, data, tag, timeout=1000):
diff --git a/tests/pytests/unit/utils/event/test_event.py b/tests/pytests/unit/utils/event/test_event.py
index 3eadfaf6ba..fa9e420a93 100644
--- a/tests/pytests/unit/utils/event/test_event.py
+++ b/tests/pytests/unit/utils/event/test_event.py
@@ -447,3 +447,46 @@ def test_event_fire_ret_load():
         )
         assert mock_log_error.mock_calls[0].args[1] == "minion_id.example.org"
         assert mock_log_error.mock_calls[0].args[2] == "".join(test_traceback)
+
+
+@pytest.mark.slow_test
+def test_event_single_timeout_tries(sock_dir):
+    """Test an event is sent with timeout and tries"""
+
+    write_calls_count = 0
+    real_stream_write = None
+
+    @salt.ext.tornado.gen.coroutine
+    def write_mock(pack):
+        nonlocal write_calls_count
+        nonlocal real_stream_write
+        write_calls_count += 1
+        if write_calls_count > 3:
+            yield real_stream_write(pack)
+        else:
+            raise salt.ext.tornado.iostream.StreamClosedError()
+
+    with eventpublisher_process(str(sock_dir)), salt.utils.event.MasterEvent(
+        str(sock_dir), listen=True
+    ) as me:
+        me.fire_event({"data": "foo1"}, "evt1")
+        evt1 = me.get_event(tag="evt1")
+        _assert_got_event(evt1, {"data": "foo1"})
+        real_stream_write = me.pusher.stream.write
+        with patch.object(
+            me.pusher,
+            "connected",
+            side_effect=[True, True, False, False, True, True],
+        ), patch.object(
+            me.pusher,
+            "connect",
+            side_effect=salt.ext.tornado.iostream.StreamClosedError,
+        ), patch.object(
+            me.pusher.stream,
+            "write",
+            write_mock,
+        ):
+            me.fire_event({"data": "bar2"}, "evt2", timeout=5000)
+            evt2 = me.get_event(tag="evt2")
+            _assert_got_event(evt2, {"data": "bar2"})
+            assert write_calls_count == 4
-- 
2.45.0

openSUSE Build Service is sponsored by