File batch-async-catch-exceptions-and-safety-unregister-a.patch of Package salt
From 1606379714f4776e2b529fb1d45891266985c896 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?=
<psuarezhernandez@suse.com>
Date: Fri, 28 Feb 2020 15:11:53 +0000
Subject: [PATCH] Batch Async: Catch exceptions and safety unregister
and close instances
---
salt/cli/batch_async.py | 160 ++++++++++++++++++++++++----------------
1 file changed, 96 insertions(+), 64 deletions(-)
diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
index 1e2ac5b0d3..3dc04826d1 100644
--- a/salt/cli/batch_async.py
+++ b/salt/cli/batch_async.py
@@ -107,22 +107,25 @@ class BatchAsync:
def __event_handler(self, raw):
if not self.event:
return
- mtag, data = self.event.unpack(raw, self.event.serial)
- for (pattern, op) in self.patterns:
- if mtag.startswith(pattern[:-1]):
- minion = data["id"]
- if op == "ping_return":
- self.minions.add(minion)
- if self.targeted_minions == self.minions:
- self.event.io_loop.spawn_callback(self.start_batch)
- elif op == "find_job_return":
- if data.get("return", None):
- self.find_job_returned.add(minion)
- elif op == "batch_run":
- if minion in self.active:
- self.active.remove(minion)
- self.done_minions.add(minion)
- self.event.io_loop.spawn_callback(self.schedule_next)
+ try:
+ mtag, data = self.event.unpack(raw, self.event.serial)
+ for (pattern, op) in self.patterns:
+ if mtag.startswith(pattern[:-1]):
+ minion = data["id"]
+ if op == "ping_return":
+ self.minions.add(minion)
+ if self.targeted_minions == self.minions:
+ self.event.io_loop.spawn_callback(self.start_batch)
+ elif op == "find_job_return":
+ if data.get("return", None):
+ self.find_job_returned.add(minion)
+ elif op == "batch_run":
+ if minion in self.active:
+ self.active.remove(minion)
+ self.done_minions.add(minion)
+ self.event.io_loop.spawn_callback(self.schedule_next)
+ except Exception as ex:
+ log.error("Exception occured while processing event: {}".format(ex))
def _get_next(self):
to_run = (
@@ -154,53 +157,67 @@ class BatchAsync:
if timedout_minions:
self.schedule_next()
- if running:
+ if self.event and running:
self.find_job_returned = self.find_job_returned.difference(running)
self.event.io_loop.spawn_callback(self.find_job, running)
@tornado.gen.coroutine
def find_job(self, minions):
- not_done = minions.difference(self.done_minions).difference(
- self.timedout_minions
- )
-
- if not_done:
- jid = self.jid_gen()
- find_job_return_pattern = "salt/job/{}/ret/*".format(jid)
- self.patterns.add((find_job_return_pattern, "find_job_return"))
- self.event.subscribe(find_job_return_pattern, match_type="glob")
-
- ret = yield self.local.run_job_async(
- not_done,
- "saltutil.find_job",
- [self.batch_jid],
- "list",
- gather_job_timeout=self.opts["gather_job_timeout"],
- jid=jid,
- **self.eauth
+ if self.event:
+ not_done = minions.difference(self.done_minions).difference(
+ self.timedout_minions
)
- yield tornado.gen.sleep(self.opts["gather_job_timeout"])
- self.event.io_loop.spawn_callback(self.check_find_job, not_done, jid)
+ try:
+ if not_done:
+ jid = self.jid_gen()
+ find_job_return_pattern = "salt/job/{}/ret/*".format(jid)
+ self.patterns.add((find_job_return_pattern, "find_job_return"))
+ self.event.subscribe(find_job_return_pattern, match_type="glob")
+ ret = yield self.local.run_job_async(
+ not_done,
+ "saltutil.find_job",
+ [self.batch_jid],
+ "list",
+ gather_job_timeout=self.opts["gather_job_timeout"],
+ jid=jid,
+ **self.eauth
+ )
+ yield tornado.gen.sleep(self.opts["gather_job_timeout"])
+ if self.event:
+ self.event.io_loop.spawn_callback(
+ self.check_find_job, not_done, jid
+ )
+ except Exception as ex:
+ log.error(
+ "Exception occured handling batch async: {}. Aborting execution.".format(
+ ex
+ )
+ )
+ self.close_safe()
@tornado.gen.coroutine
def start(self):
- self.__set_event_handler()
- ping_return = yield self.local.run_job_async(
- self.opts["tgt"],
- "test.ping",
- [],
- self.opts.get("selected_target_option", self.opts.get("tgt_type", "glob")),
- gather_job_timeout=self.opts["gather_job_timeout"],
- jid=self.ping_jid,
- metadata=self.metadata,
- **self.eauth
- )
- self.targeted_minions = set(ping_return["minions"])
- # start batching even if not all minions respond to ping
- yield tornado.gen.sleep(
- self.batch_presence_ping_timeout or self.opts["gather_job_timeout"]
- )
- self.event.io_loop.spawn_callback(self.start_batch)
+ if self.event:
+ self.__set_event_handler()
+ ping_return = yield self.local.run_job_async(
+ self.opts["tgt"],
+ "test.ping",
+ [],
+ self.opts.get(
+ "selected_target_option", self.opts.get("tgt_type", "glob")
+ ),
+ gather_job_timeout=self.opts["gather_job_timeout"],
+ jid=self.ping_jid,
+ metadata=self.metadata,
+ **self.eauth
+ )
+ self.targeted_minions = set(ping_return["minions"])
+ # start batching even if not all minions respond to ping
+ yield tornado.gen.sleep(
+ self.batch_presence_ping_timeout or self.opts["gather_job_timeout"]
+ )
+ if self.event:
+ self.event.io_loop.spawn_callback(self.start_batch)
@tornado.gen.coroutine
def start_batch(self):
@@ -215,7 +232,8 @@ class BatchAsync:
ret = self.event.fire_event(
data, "salt/batch/{}/start".format(self.batch_jid)
)
- self.event.io_loop.spawn_callback(self.run_next)
+ if self.event:
+ self.event.io_loop.spawn_callback(self.run_next)
@tornado.gen.coroutine
def end_batch(self):
@@ -232,11 +250,21 @@ class BatchAsync:
"metadata": self.metadata,
}
self.event.fire_event(data, "salt/batch/{}/done".format(self.batch_jid))
- for (pattern, label) in self.patterns:
- if label in ["ping_return", "batch_run"]:
- self.event.unsubscribe(pattern, match_type="glob")
- del self
- gc.collect()
+
+ # release to the IOLoop to allow the event to be published
+ # before closing batch async execution
+ yield tornado.gen.sleep(1)
+ self.close_safe()
+
+ def close_safe(self):
+ for (pattern, label) in self.patterns:
+ self.event.unsubscribe(pattern, match_type="glob")
+ self.event.remove_event_handler(self.__event_handler)
+ self.event = None
+ self.local = None
+ self.ioloop = None
+ del self
+ gc.collect()
@tornado.gen.coroutine
def schedule_next(self):
@@ -244,7 +272,8 @@ class BatchAsync:
self.scheduled = True
# call later so that we maybe gather more returns
yield tornado.gen.sleep(self.batch_delay)
- self.event.io_loop.spawn_callback(self.run_next)
+ if self.event:
+ self.event.io_loop.spawn_callback(self.run_next)
@tornado.gen.coroutine
def run_next(self):
@@ -266,17 +295,20 @@ class BatchAsync:
)
yield tornado.gen.sleep(self.opts["timeout"])
- self.event.io_loop.spawn_callback(self.find_job, set(next_batch))
+
+ # The batch can be done already at this point, which means no self.event
+ if self.event:
+ self.event.io_loop.spawn_callback(self.find_job, set(next_batch))
except Exception as ex:
- log.error("Error in scheduling next batch: %s", ex)
+ log.error("Error in scheduling next batch: %s. Aborting execution", ex)
self.active = self.active.difference(next_batch)
+ self.close_safe()
else:
yield self.end_batch()
gc.collect()
def __del__(self):
self.local = None
- self.event.remove_event_handler(self.__event_handler)
self.event = None
self.ioloop = None
gc.collect()
--
2.29.2