File make-salt-master-self-recoverable-on-killing-eventpu.patch of Package salt
From 794b5d1aa7b8e880e9a21940183d241c6cbde9c9 Mon Sep 17 00:00:00 2001
From: Victor Zhestkov <vzhestkov@suse.com>
Date: Wed, 15 May 2024 09:42:23 +0200
Subject: [PATCH] Make salt-master self recoverable on killing
 EventPublisher
* Implement timeout and tries to transport.ipc.IPCClient.send
* Make timeout and tries configurable for fire_event
* Add test of timeout and tries
* Prevent exceptions from tornado Future on closing the IPC connection
---
 salt/transport/ipc.py                        | 73 +++++++++++++++++---
 salt/utils/event.py                          | 21 +++++-
 tests/pytests/unit/utils/event/test_event.py | 43 ++++++++++++
 3 files changed, 125 insertions(+), 12 deletions(-)
diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py
index cee100b086..6631781c5c 100644
--- a/salt/transport/ipc.py
+++ b/salt/transport/ipc.py
@@ -2,7 +2,6 @@
 IPC transport classes
 """
 
-
 import errno
 import logging
 import socket
@@ -340,7 +339,8 @@ class IPCClient:
             try:
                 log.trace("IPCClient: Connecting to socket: %s", self.socket_path)
                 yield self.stream.connect(sock_addr)
-                self._connecting_future.set_result(True)
+                if self._connecting_future is not None:
+                    self._connecting_future.set_result(True)
                 break
             except Exception as e:  # pylint: disable=broad-except
                 if self.stream.closed():
@@ -350,7 +350,8 @@ class IPCClient:
                     if self.stream is not None:
                         self.stream.close()
                         self.stream = None
-                    self._connecting_future.set_exception(e)
+                    if self._connecting_future is not None:
+                        self._connecting_future.set_exception(e)
                     break
 
                 yield salt.ext.tornado.gen.sleep(1)
@@ -365,7 +366,13 @@ class IPCClient:
             return
 
         self._closing = True
-        self._connecting_future = None
+        if self._connecting_future is not None:
+            try:
+                self._connecting_future.set_result(True)
+                self._connecting_future.exception()  # pylint: disable=E0203
+            except Exception as e:  # pylint: disable=broad-except
+                log.warning("Unhandled connecting exception: %s", e, exc_info=True)
+            self._connecting_future = None
 
         log.debug("Closing %s instance", self.__class__.__name__)
 
@@ -435,8 +442,6 @@ class IPCMessageClient(IPCClient):
         "close",
     ]
 
-    # FIXME timeout unimplemented
-    # FIXME tries unimplemented
     @salt.ext.tornado.gen.coroutine
     def send(self, msg, timeout=None, tries=None):
         """
@@ -445,12 +450,60 @@ class IPCMessageClient(IPCClient):
         If the socket is not currently connected, a connection will be established.
 
         :param dict msg: The message to be sent
-        :param int timeout: Timeout when sending message (Currently unimplemented)
+        :param int timeout: Timeout when sending message
+        :param int tries: Maximum numer of tries to send message
         """
-        if not self.connected():
-            yield self.connect()
+        if tries is None or tries < 1:
+            tries = 1
+        due_time = None
+        if timeout is not None:
+            due_time = time.time() + timeout
+        _try = 1
+        exc_count = 0
         pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True)
-        yield self.stream.write(pack)
+        while _try <= tries:
+            if not self.connected():
+                self.close()
+                self.stream = None
+                self._closing = False
+                try:
+                    yield self.connect(
+                        timeout=(
+                            None if due_time is None else max(due_time - time.time(), 1)
+                        )
+                    )
+                except StreamClosedError:
+                    log.warning(
+                        "IPCMessageClient: Unable to reconnect IPC stream on sending message with ID: 0x%016x%s",
+                        id(msg),
+                        f", retry {_try} of {tries}" if tries > 1 else "",
+                    )
+                    exc_count += 1
+            if self.connected():
+                try:
+                    yield self.stream.write(pack)
+                    return
+                except StreamClosedError:
+                    if self._closing:
+                        break
+                    log.warning(
+                        "IPCMessageClient: Stream was closed on sending message with ID: 0x%016x",
+                        id(msg),
+                    )
+                    exc_count += 1
+                    if exc_count == 1:
+                        # Give one more chance in case if stream was detected as closed
+                        # on the first write attempt
+                        continue
+            cur_time = time.time()
+            _try += 1
+            if _try > tries or (due_time is not None and cur_time > due_time):
+                return
+            yield salt.ext.tornado.gen.sleep(
+                1
+                if due_time is None
+                else (due_time - cur_time) / max(tries - _try + 1, 1)
+            )
 
 
 class IPCMessageServer(IPCServer):
diff --git a/salt/utils/event.py b/salt/utils/event.py
index ef048335ae..36b530d1af 100644
--- a/salt/utils/event.py
+++ b/salt/utils/event.py
@@ -270,6 +270,10 @@ class SaltEvent:
             # and don't read out events from the buffer on an on-going basis,
             # the buffer will grow resulting in big memory usage.
             self.connect_pub()
+        self.pusher_send_timeout = self.opts.get(
+            "pusher_send_timeout", self.opts.get("timeout")
+        )
+        self.pusher_send_tries = self.opts.get("pusher_send_tries", 3)
 
     @classmethod
     def __load_cache_regex(cls):
@@ -839,10 +843,18 @@ class SaltEvent:
             ]
         )
         msg = salt.utils.stringutils.to_bytes(event, "utf-8")
+        if timeout is None:
+            timeout_s = self.pusher_send_timeout
+        else:
+            timeout_s = float(timeout) / 1000
         if self._run_io_loop_sync:
             with salt.utils.asynchronous.current_ioloop(self.io_loop):
                 try:
-                    self.pusher.send(msg)
+                    self.pusher.send(
+                        msg,
+                        timeout=timeout_s,
+                        tries=self.pusher_send_tries,
+                    )
                 except Exception as exc:  # pylint: disable=broad-except
                     log.debug(
                         "Publisher send failed with exception: %s",
@@ -851,7 +863,12 @@ class SaltEvent:
                     )
                     raise
         else:
-            self.io_loop.spawn_callback(self.pusher.send, msg)
+            self.io_loop.spawn_callback(
+                self.pusher.send,
+                msg,
+                timeout=timeout_s,
+                tries=self.pusher_send_tries,
+            )
         return True
 
     def fire_master(self, data, tag, timeout=1000):
diff --git a/tests/pytests/unit/utils/event/test_event.py b/tests/pytests/unit/utils/event/test_event.py
index 3eadfaf6ba..fa9e420a93 100644
--- a/tests/pytests/unit/utils/event/test_event.py
+++ b/tests/pytests/unit/utils/event/test_event.py
@@ -447,3 +447,46 @@ def test_event_fire_ret_load():
         )
         assert mock_log_error.mock_calls[0].args[1] == "minion_id.example.org"
         assert mock_log_error.mock_calls[0].args[2] == "".join(test_traceback)
+
+
+@pytest.mark.slow_test
+def test_event_single_timeout_tries(sock_dir):
+    """Test an event is sent with timout and tries"""
+
+    write_calls_count = 0
+    real_stream_write = None
+
+    @salt.ext.tornado.gen.coroutine
+    def write_mock(pack):
+        nonlocal write_calls_count
+        nonlocal real_stream_write
+        write_calls_count += 1
+        if write_calls_count > 3:
+            yield real_stream_write(pack)
+        else:
+            raise salt.ext.tornado.iostream.StreamClosedError()
+
+    with eventpublisher_process(str(sock_dir)), salt.utils.event.MasterEvent(
+        str(sock_dir), listen=True
+    ) as me:
+        me.fire_event({"data": "foo1"}, "evt1")
+        evt1 = me.get_event(tag="evt1")
+        _assert_got_event(evt1, {"data": "foo1"})
+        real_stream_write = me.pusher.stream.write
+        with patch.object(
+            me.pusher,
+            "connected",
+            side_effect=[True, True, False, False, True, True],
+        ), patch.object(
+            me.pusher,
+            "connect",
+            side_effect=salt.ext.tornado.iostream.StreamClosedError,
+        ), patch.object(
+            me.pusher.stream,
+            "write",
+            write_mock,
+        ):
+            me.fire_event({"data": "bar2"}, "evt2", timeout=5000)
+            evt2 = me.get_event(tag="evt2")
+            _assert_got_event(evt2, {"data": "bar2"})
+            assert write_calls_count == 4
-- 
2.45.0