File make-salt-master-self-recoverable-on-killing-eventpu.patch of Package salt
From 794b5d1aa7b8e880e9a21940183d241c6cbde9c9 Mon Sep 17 00:00:00 2001
From: Victor Zhestkov <vzhestkov@suse.com>
Date: Wed, 15 May 2024 09:42:23 +0200
Subject: [PATCH] Make salt-master self recoverable on killing
EventPublisher
* Implement timeout and tries to transport.ipc.IPCClient.send
* Make timeout and tries configurable for fire_event
* Add test of timeout and tries
* Prevent exceptions from tornado Future on closing the IPC connection
---
salt/transport/ipc.py | 73 +++++++++++++++++---
salt/utils/event.py | 21 +++++-
tests/pytests/unit/utils/event/test_event.py | 43 ++++++++++++
3 files changed, 125 insertions(+), 12 deletions(-)
diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py
index cee100b086..6631781c5c 100644
--- a/salt/transport/ipc.py
+++ b/salt/transport/ipc.py
@@ -2,7 +2,6 @@
IPC transport classes
"""
-
import errno
import logging
import socket
@@ -340,7 +339,8 @@ class IPCClient:
try:
log.trace("IPCClient: Connecting to socket: %s", self.socket_path)
yield self.stream.connect(sock_addr)
- self._connecting_future.set_result(True)
+ if self._connecting_future is not None:
+ self._connecting_future.set_result(True)
break
except Exception as e: # pylint: disable=broad-except
if self.stream.closed():
@@ -350,7 +350,8 @@ class IPCClient:
if self.stream is not None:
self.stream.close()
self.stream = None
- self._connecting_future.set_exception(e)
+ if self._connecting_future is not None:
+ self._connecting_future.set_exception(e)
break
yield salt.ext.tornado.gen.sleep(1)
@@ -365,7 +366,13 @@ class IPCClient:
return
self._closing = True
- self._connecting_future = None
+ if self._connecting_future is not None:
+ try:
+ self._connecting_future.set_result(True)
+ self._connecting_future.exception() # pylint: disable=E0203
+ except Exception as e: # pylint: disable=broad-except
+ log.warning("Unhandled connecting exception: %s", e, exc_info=True)
+ self._connecting_future = None
log.debug("Closing %s instance", self.__class__.__name__)
@@ -435,8 +442,6 @@ class IPCMessageClient(IPCClient):
"close",
]
- # FIXME timeout unimplemented
- # FIXME tries unimplemented
@salt.ext.tornado.gen.coroutine
def send(self, msg, timeout=None, tries=None):
"""
@@ -445,12 +450,60 @@ class IPCMessageClient(IPCClient):
If the socket is not currently connected, a connection will be established.
:param dict msg: The message to be sent
- :param int timeout: Timeout when sending message (Currently unimplemented)
+ :param int timeout: Timeout when sending message
+ :param int tries: Maximum numer of tries to send message
"""
- if not self.connected():
- yield self.connect()
+ if tries is None or tries < 1:
+ tries = 1
+ due_time = None
+ if timeout is not None:
+ due_time = time.time() + timeout
+ _try = 1
+ exc_count = 0
pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True)
- yield self.stream.write(pack)
+ while _try <= tries:
+ if not self.connected():
+ self.close()
+ self.stream = None
+ self._closing = False
+ try:
+ yield self.connect(
+ timeout=(
+ None if due_time is None else max(due_time - time.time(), 1)
+ )
+ )
+ except StreamClosedError:
+ log.warning(
+ "IPCMessageClient: Unable to reconnect IPC stream on sending message with ID: 0x%016x%s",
+ id(msg),
+ f", retry {_try} of {tries}" if tries > 1 else "",
+ )
+ exc_count += 1
+ if self.connected():
+ try:
+ yield self.stream.write(pack)
+ return
+ except StreamClosedError:
+ if self._closing:
+ break
+ log.warning(
+ "IPCMessageClient: Stream was closed on sending message with ID: 0x%016x",
+ id(msg),
+ )
+ exc_count += 1
+ if exc_count == 1:
+ # Give one more chance in case if stream was detected as closed
+ # on the first write attempt
+ continue
+ cur_time = time.time()
+ _try += 1
+ if _try > tries or (due_time is not None and cur_time > due_time):
+ return
+ yield salt.ext.tornado.gen.sleep(
+ 1
+ if due_time is None
+ else (due_time - cur_time) / max(tries - _try + 1, 1)
+ )
class IPCMessageServer(IPCServer):
diff --git a/salt/utils/event.py b/salt/utils/event.py
index ef048335ae..36b530d1af 100644
--- a/salt/utils/event.py
+++ b/salt/utils/event.py
@@ -270,6 +270,10 @@ class SaltEvent:
# and don't read out events from the buffer on an on-going basis,
# the buffer will grow resulting in big memory usage.
self.connect_pub()
+ self.pusher_send_timeout = self.opts.get(
+ "pusher_send_timeout", self.opts.get("timeout")
+ )
+ self.pusher_send_tries = self.opts.get("pusher_send_tries", 3)
@classmethod
def __load_cache_regex(cls):
@@ -839,10 +843,18 @@ class SaltEvent:
]
)
msg = salt.utils.stringutils.to_bytes(event, "utf-8")
+ if timeout is None:
+ timeout_s = self.pusher_send_timeout
+ else:
+ timeout_s = float(timeout) / 1000
if self._run_io_loop_sync:
with salt.utils.asynchronous.current_ioloop(self.io_loop):
try:
- self.pusher.send(msg)
+ self.pusher.send(
+ msg,
+ timeout=timeout_s,
+ tries=self.pusher_send_tries,
+ )
except Exception as exc: # pylint: disable=broad-except
log.debug(
"Publisher send failed with exception: %s",
@@ -851,7 +863,12 @@ class SaltEvent:
)
raise
else:
- self.io_loop.spawn_callback(self.pusher.send, msg)
+ self.io_loop.spawn_callback(
+ self.pusher.send,
+ msg,
+ timeout=timeout_s,
+ tries=self.pusher_send_tries,
+ )
return True
def fire_master(self, data, tag, timeout=1000):
diff --git a/tests/pytests/unit/utils/event/test_event.py b/tests/pytests/unit/utils/event/test_event.py
index 3eadfaf6ba..fa9e420a93 100644
--- a/tests/pytests/unit/utils/event/test_event.py
+++ b/tests/pytests/unit/utils/event/test_event.py
@@ -447,3 +447,46 @@ def test_event_fire_ret_load():
)
assert mock_log_error.mock_calls[0].args[1] == "minion_id.example.org"
assert mock_log_error.mock_calls[0].args[2] == "".join(test_traceback)
+
+
+@pytest.mark.slow_test
+def test_event_single_timeout_tries(sock_dir):
+ """Test an event is sent with timout and tries"""
+
+ write_calls_count = 0
+ real_stream_write = None
+
+ @salt.ext.tornado.gen.coroutine
+ def write_mock(pack):
+ nonlocal write_calls_count
+ nonlocal real_stream_write
+ write_calls_count += 1
+ if write_calls_count > 3:
+ yield real_stream_write(pack)
+ else:
+ raise salt.ext.tornado.iostream.StreamClosedError()
+
+ with eventpublisher_process(str(sock_dir)), salt.utils.event.MasterEvent(
+ str(sock_dir), listen=True
+ ) as me:
+ me.fire_event({"data": "foo1"}, "evt1")
+ evt1 = me.get_event(tag="evt1")
+ _assert_got_event(evt1, {"data": "foo1"})
+ real_stream_write = me.pusher.stream.write
+ with patch.object(
+ me.pusher,
+ "connected",
+ side_effect=[True, True, False, False, True, True],
+ ), patch.object(
+ me.pusher,
+ "connect",
+ side_effect=salt.ext.tornado.iostream.StreamClosedError,
+ ), patch.object(
+ me.pusher.stream,
+ "write",
+ write_mock,
+ ):
+ me.fire_event({"data": "bar2"}, "evt2", timeout=5000)
+ evt2 = me.get_event(tag="evt2")
+ _assert_got_event(evt2, {"data": "bar2"})
+ assert write_calls_count == 4
--
2.45.0