File batch-async-catch-exceptions-and-safety-unregister-a.patch of Package salt.18368
From c5edf396ffd66b6ac1479aa01367aae3eff7683d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?=
<psuarezhernandez@suse.com>
Date: Fri, 28 Feb 2020 15:11:53 +0000
Subject: [PATCH] Batch Async: Catch exceptions and safety unregister and
close instances
---
salt/cli/batch_async.py | 156 +++++++++++++++++++++++-----------------
1 file changed, 89 insertions(+), 67 deletions(-)
diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
index da069b64bd..b8f272ed67 100644
--- a/salt/cli/batch_async.py
+++ b/salt/cli/batch_async.py
@@ -13,7 +13,6 @@ import salt.client
# pylint: enable=import-error,no-name-in-module,redefined-builtin
import logging
-import fnmatch
log = logging.getLogger(__name__)
@@ -104,22 +103,25 @@ class BatchAsync(object):
def __event_handler(self, raw):
if not self.event:
return
- mtag, data = self.event.unpack(raw, self.event.serial)
- for (pattern, op) in self.patterns:
- if mtag.startswith(pattern[:-1]):
- minion = data['id']
- if op == 'ping_return':
- self.minions.add(minion)
- if self.targeted_minions == self.minions:
- self.event.io_loop.spawn_callback(self.start_batch)
- elif op == 'find_job_return':
- if data.get("return", None):
- self.find_job_returned.add(minion)
- elif op == 'batch_run':
- if minion in self.active:
- self.active.remove(minion)
- self.done_minions.add(minion)
- self.event.io_loop.spawn_callback(self.schedule_next)
+ try:
+ mtag, data = self.event.unpack(raw, self.event.serial)
+ for (pattern, op) in self.patterns:
+ if mtag.startswith(pattern[:-1]):
+ minion = data['id']
+ if op == 'ping_return':
+ self.minions.add(minion)
+ if self.targeted_minions == self.minions:
+ self.event.io_loop.spawn_callback(self.start_batch)
+ elif op == 'find_job_return':
+ if data.get("return", None):
+ self.find_job_returned.add(minion)
+ elif op == 'batch_run':
+ if minion in self.active:
+ self.active.remove(minion)
+ self.done_minions.add(minion)
+ self.event.io_loop.spawn_callback(self.schedule_next)
+ except Exception as ex:
+ log.error("Exception occured while processing event: {}".format(ex))
def _get_next(self):
to_run = self.minions.difference(
@@ -146,54 +148,59 @@ class BatchAsync(object):
if timedout_minions:
self.schedule_next()
- if running:
+ if self.event and running:
self.find_job_returned = self.find_job_returned.difference(running)
self.event.io_loop.spawn_callback(self.find_job, running)
@tornado.gen.coroutine
def find_job(self, minions):
- not_done = minions.difference(self.done_minions).difference(self.timedout_minions)
-
- if not_done:
- jid = self.jid_gen()
- find_job_return_pattern = 'salt/job/{0}/ret/*'.format(jid)
- self.patterns.add((find_job_return_pattern, "find_job_return"))
- self.event.subscribe(find_job_return_pattern, match_type='glob')
-
- ret = yield self.local.run_job_async(
- not_done,
- 'saltutil.find_job',
- [self.batch_jid],
- 'list',
- gather_job_timeout=self.opts['gather_job_timeout'],
- jid=jid,
- **self.eauth)
- yield tornado.gen.sleep(self.opts['gather_job_timeout'])
- self.event.io_loop.spawn_callback(
- self.check_find_job,
- not_done,
- jid)
+ if self.event:
+ not_done = minions.difference(self.done_minions).difference(self.timedout_minions)
+ try:
+ if not_done:
+ jid = self.jid_gen()
+ find_job_return_pattern = 'salt/job/{0}/ret/*'.format(jid)
+ self.patterns.add((find_job_return_pattern, "find_job_return"))
+ self.event.subscribe(find_job_return_pattern, match_type='glob')
+ ret = yield self.local.run_job_async(
+ not_done,
+ 'saltutil.find_job',
+ [self.batch_jid],
+ 'list',
+ gather_job_timeout=self.opts['gather_job_timeout'],
+ jid=jid,
+ **self.eauth)
+ yield tornado.gen.sleep(self.opts['gather_job_timeout'])
+ if self.event:
+ self.event.io_loop.spawn_callback(
+ self.check_find_job,
+ not_done,
+ jid)
+ except Exception as ex:
+ log.error("Exception occured handling batch async: {}. Aborting execution.".format(ex))
+ self.close_safe()
@tornado.gen.coroutine
def start(self):
- self.__set_event_handler()
- ping_return = yield self.local.run_job_async(
- self.opts['tgt'],
- 'test.ping',
- [],
- self.opts.get(
- 'selected_target_option',
- self.opts.get('tgt_type', 'glob')
- ),
- gather_job_timeout=self.opts['gather_job_timeout'],
- jid=self.ping_jid,
- metadata=self.metadata,
- **self.eauth)
- self.targeted_minions = set(ping_return['minions'])
- #start batching even if not all minions respond to ping
- yield tornado.gen.sleep(self.batch_presence_ping_timeout or self.opts['gather_job_timeout'])
- self.event.io_loop.spawn_callback(self.start_batch)
-
+ if self.event:
+ self.__set_event_handler()
+ ping_return = yield self.local.run_job_async(
+ self.opts['tgt'],
+ 'test.ping',
+ [],
+ self.opts.get(
+ 'selected_target_option',
+ self.opts.get('tgt_type', 'glob')
+ ),
+ gather_job_timeout=self.opts['gather_job_timeout'],
+ jid=self.ping_jid,
+ metadata=self.metadata,
+ **self.eauth)
+ self.targeted_minions = set(ping_return['minions'])
+ #start batching even if not all minions respond to ping
+ yield tornado.gen.sleep(self.batch_presence_ping_timeout or self.opts['gather_job_timeout'])
+ if self.event:
+ self.event.io_loop.spawn_callback(self.start_batch)
@tornado.gen.coroutine
def start_batch(self):
@@ -206,7 +213,8 @@ class BatchAsync(object):
"metadata": self.metadata
}
ret = self.event.fire_event(data, "salt/batch/{0}/start".format(self.batch_jid))
- self.event.io_loop.spawn_callback(self.run_next)
+ if self.event:
+ self.event.io_loop.spawn_callback(self.run_next)
@tornado.gen.coroutine
def end_batch(self):
@@ -221,11 +229,21 @@ class BatchAsync(object):
"metadata": self.metadata
}
self.event.fire_event(data, "salt/batch/{0}/done".format(self.batch_jid))
- for (pattern, label) in self.patterns:
- if label in ["ping_return", "batch_run"]:
- self.event.unsubscribe(pattern, match_type='glob')
- del self
- gc.collect()
+
+ # release to the IOLoop to allow the event to be published
+ # before closing batch async execution
+ yield tornado.gen.sleep(1)
+ self.close_safe()
+
+ def close_safe(self):
+ for (pattern, label) in self.patterns:
+ self.event.unsubscribe(pattern, match_type='glob')
+ self.event.remove_event_handler(self.__event_handler)
+ self.event = None
+ self.local = None
+ self.ioloop = None
+ del self
+ gc.collect()
@tornado.gen.coroutine
def schedule_next(self):
@@ -233,7 +251,8 @@ class BatchAsync(object):
self.scheduled = True
# call later so that we maybe gather more returns
yield tornado.gen.sleep(self.batch_delay)
- self.event.io_loop.spawn_callback(self.run_next)
+ if self.event:
+ self.event.io_loop.spawn_callback(self.run_next)
@tornado.gen.coroutine
def run_next(self):
@@ -254,17 +273,20 @@ class BatchAsync(object):
metadata=self.metadata)
yield tornado.gen.sleep(self.opts['timeout'])
- self.event.io_loop.spawn_callback(self.find_job, set(next_batch))
+
+ # The batch can be done already at this point, which means no self.event
+ if self.event:
+ self.event.io_loop.spawn_callback(self.find_job, set(next_batch))
except Exception as ex:
- log.error("Error in scheduling next batch: %s", ex)
+ log.error("Error in scheduling next batch: %s. Aborting execution", ex)
self.active = self.active.difference(next_batch)
+ self.close_safe()
else:
yield self.end_batch()
gc.collect()
def __del__(self):
self.local = None
- self.event.remove_event_handler(self.__event_handler)
self.event = None
self.ioloop = None
gc.collect()
--
2.23.0