File migrate-batch_async-to-upstream-tornado.patch of Package salt
From 8be046aee09b7ac36b497921065b20b7a70516cf Mon Sep 17 00:00:00 2001
From: Marek Czernek <marek.czernek@suse.com>
Date: Wed, 16 Jul 2025 14:32:56 +0200
Subject: [PATCH] Migrate batch_async to upstream tornado
---
salt/cli/batch_async.py | 38 +++++++++++++++++++-------------------
1 file changed, 19 insertions(+), 19 deletions(-)
diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
index 92215d0e04a..9e0ecddda54 100644
--- a/salt/cli/batch_async.py
+++ b/salt/cli/batch_async.py
@@ -6,10 +6,10 @@ import logging
import re
import salt.client
-import salt.ext.tornado
+import tornado
import salt.utils.event
from salt.cli.batch import batch_get_eauth, batch_get_opts, get_bnum
-from salt.ext.tornado.iostream import StreamClosedError
+from tornado.iostream import StreamClosedError
log = logging.getLogger(__name__)
@@ -111,14 +111,14 @@ class SharedEventsChannel:
if not self._subscribers[subscriber_id]:
del self._subscribers[subscriber_id]
- @salt.ext.tornado.gen.coroutine
+ @tornado.gen.coroutine
def __handle_close(self):
if not self._subscriptions:
return
log.warning("Master Event Subscriber was closed. Trying to reconnect...")
yield self.__reconnect_subscriber()
- @salt.ext.tornado.gen.coroutine
+ @tornado.gen.coroutine
def __handle_event(self, raw):
if self.master_event is None:
return
@@ -138,7 +138,7 @@ class SharedEventsChannel:
exc_info=True,
)
- @salt.ext.tornado.gen.coroutine
+ @tornado.gen.coroutine
def __reconnect_subscriber(self):
if self.master_event.subscriber.connected() or self._reconnecting_subscriber:
return
@@ -167,7 +167,7 @@ class SharedEventsChannel:
self._reconnecting_subscriber = False
return
if _try < max_tries:
- yield salt.ext.tornado.gen.sleep(self._subscriber_reconnect_interval)
+ yield tornado.gen.sleep(self._subscriber_reconnect_interval)
_try += 1
self._reconnecting_subscriber = False
@@ -229,7 +229,7 @@ class BatchAsync:
self.extra_job_kwargs[kwarg] = kwargs[kwarg]
elif kwarg in opts:
self.extra_job_kwargs[kwarg] = opts[kwarg]
- self.io_loop = salt.ext.tornado.ioloop.IOLoop.current()
+ self.io_loop = tornado.ioloop.IOLoop.current()
self.events_channel = _get_shared_events_channel(opts, self.io_loop).use(
id(self)
)
@@ -278,7 +278,7 @@ class BatchAsync:
self.batch_jid, "batch_run", id(self), self.__event_handler
)
- @salt.ext.tornado.gen.coroutine
+ @tornado.gen.coroutine
def __event_handler(self, tag, data, op):
# IMPORTANT: This function must run fast and not wait for any other task,
# otherwise it would cause events to be stuck.
@@ -321,7 +321,7 @@ class BatchAsync:
)
return set(list(to_run)[:next_batch_size])
- @salt.ext.tornado.gen.coroutine
+ @tornado.gen.coroutine
def check_find_job(self, batch_minions, jid):
"""
Check if the job with specified ``jid`` was finished on the minions
@@ -346,7 +346,7 @@ class BatchAsync:
self.find_job_returned = self.find_job_returned.difference(running)
yield self.find_job(running)
- @salt.ext.tornado.gen.coroutine
+ @tornado.gen.coroutine
def find_job(self, minions):
"""
Find if the job was finished on the minions
@@ -377,7 +377,7 @@ class BatchAsync:
listen=False,
**self.eauth,
)
- yield salt.ext.tornado.gen.sleep(self.opts["gather_job_timeout"])
+ yield tornado.gen.sleep(self.opts["gather_job_timeout"])
if self.event:
yield self.check_find_job(not_done, jid)
except Exception as ex: # pylint: disable=W0703
@@ -388,7 +388,7 @@ class BatchAsync:
)
self.close_safe()
- @salt.ext.tornado.gen.coroutine
+ @tornado.gen.coroutine
def start(self):
"""
Start the batch execution
@@ -419,7 +419,7 @@ class BatchAsync:
self.start_batch,
)
- @salt.ext.tornado.gen.coroutine
+ @tornado.gen.coroutine
def start_batch(self):
"""
Fire `salt/batch/*/start` and continue batch with `run_next`
@@ -442,7 +442,7 @@ class BatchAsync:
if self.event:
yield self.run_next()
- @salt.ext.tornado.gen.coroutine
+ @tornado.gen.coroutine
def end_batch(self):
"""
End the batch and call safe closing
@@ -469,7 +469,7 @@ class BatchAsync:
# release to the IOLoop to allow the event to be published
# before closing batch async execution
- yield salt.ext.tornado.gen.sleep(0.03)
+ yield tornado.gen.sleep(0.03)
self.close_safe()
def close_safe(self):
@@ -481,7 +481,7 @@ class BatchAsync:
_destroy_unused_shared_events_channel()
self.event = None
- @salt.ext.tornado.gen.coroutine
+ @tornado.gen.coroutine
def schedule_next(self):
log.trace("[%s] BatchAsync.schedule_next called", self.batch_jid)
if self.scheduled:
@@ -498,11 +498,11 @@ class BatchAsync:
self.batch_jid,
self.batch_delay,
)
- yield salt.ext.tornado.gen.sleep(self.batch_delay)
+ yield tornado.gen.sleep(self.batch_delay)
if self.event:
yield self.run_next()
- @salt.ext.tornado.gen.coroutine
+ @tornado.gen.coroutine
def run_next(self):
"""
Continue batch execution with the next targets
@@ -535,7 +535,7 @@ class BatchAsync:
**self.extra_job_kwargs,
)
- yield salt.ext.tornado.gen.sleep(self.opts["timeout"])
+ yield tornado.gen.sleep(self.opts["timeout"])
# The batch can be done already at this point, which means no self.event
if self.event and self.active.intersection(next_batch):
--
2.50.0