File batch-async-catch-exceptions-and-safety-unregister-a.patch of Package salt

From c5edf396ffd66b6ac1479aa01367aae3eff7683d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?=
 <psuarezhernandez@suse.com>
Date: Fri, 28 Feb 2020 15:11:53 +0000
Subject: [PATCH] Batch Async: Catch exceptions and safely unregister and
 close instances

---
 salt/cli/batch_async.py | 156 +++++++++++++++++++++++-----------------
 1 file changed, 89 insertions(+), 67 deletions(-)

diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
index da069b64bd..b8f272ed67 100644
--- a/salt/cli/batch_async.py
+++ b/salt/cli/batch_async.py
@@ -13,7 +13,6 @@ import salt.client
 
 # pylint: enable=import-error,no-name-in-module,redefined-builtin
 import logging
-import fnmatch
 
 log = logging.getLogger(__name__)
 
@@ -104,22 +103,25 @@ class BatchAsync(object):
     def __event_handler(self, raw):
         if not self.event:
             return
-        mtag, data = self.event.unpack(raw, self.event.serial)
-        for (pattern, op) in self.patterns:
-            if mtag.startswith(pattern[:-1]):
-                minion = data['id']
-                if op == 'ping_return':
-                    self.minions.add(minion)
-                    if self.targeted_minions == self.minions:
-                        self.event.io_loop.spawn_callback(self.start_batch)
-                elif op == 'find_job_return':
-                    if data.get("return", None):
-                        self.find_job_returned.add(minion)
-                elif op == 'batch_run':
-                    if minion in self.active:
-                        self.active.remove(minion)
-                        self.done_minions.add(minion)
-                        self.event.io_loop.spawn_callback(self.schedule_next)
+        try:
+            mtag, data = self.event.unpack(raw, self.event.serial)
+            for (pattern, op) in self.patterns:
+                if mtag.startswith(pattern[:-1]):
+                    minion = data['id']
+                    if op == 'ping_return':
+                        self.minions.add(minion)
+                        if self.targeted_minions == self.minions:
+                            self.event.io_loop.spawn_callback(self.start_batch)
+                    elif op == 'find_job_return':
+                        if data.get("return", None):
+                            self.find_job_returned.add(minion)
+                    elif op == 'batch_run':
+                        if minion in self.active:
+                            self.active.remove(minion)
+                            self.done_minions.add(minion)
+                            self.event.io_loop.spawn_callback(self.schedule_next)
+        except Exception as ex:  # log and swallow so one bad event does not propagate
+            log.error("Exception occurred while processing event: %s", ex)
 
     def _get_next(self):
         to_run = self.minions.difference(
@@ -146,54 +148,59 @@ class BatchAsync(object):
             if timedout_minions:
                 self.schedule_next()
 
-            if running:
+            if self.event and running:
                 self.find_job_returned = self.find_job_returned.difference(running)
                 self.event.io_loop.spawn_callback(self.find_job, running)
 
     @tornado.gen.coroutine
     def find_job(self, minions):
-        not_done = minions.difference(self.done_minions).difference(self.timedout_minions)
-
-        if not_done:
-            jid = self.jid_gen()
-            find_job_return_pattern = 'salt/job/{0}/ret/*'.format(jid)
-            self.patterns.add((find_job_return_pattern, "find_job_return"))
-            self.event.subscribe(find_job_return_pattern, match_type='glob')
-
-            ret = yield self.local.run_job_async(
-                not_done,
-                'saltutil.find_job',
-                [self.batch_jid],
-                'list',
-                gather_job_timeout=self.opts['gather_job_timeout'],
-                jid=jid,
-                **self.eauth)
-            yield tornado.gen.sleep(self.opts['gather_job_timeout'])
-            self.event.io_loop.spawn_callback(
-                self.check_find_job,
-                not_done,
-                jid)
+        if self.event:
+            not_done = minions.difference(self.done_minions).difference(self.timedout_minions)
+            try:
+                if not_done:
+                    jid = self.jid_gen()
+                    find_job_return_pattern = 'salt/job/{0}/ret/*'.format(jid)
+                    self.patterns.add((find_job_return_pattern, "find_job_return"))
+                    self.event.subscribe(find_job_return_pattern, match_type='glob')
+                    ret = yield self.local.run_job_async(
+                        not_done,
+                        'saltutil.find_job',
+                        [self.batch_jid],
+                        'list',
+                        gather_job_timeout=self.opts['gather_job_timeout'],
+                        jid=jid,
+                        **self.eauth)
+                    yield tornado.gen.sleep(self.opts['gather_job_timeout'])
+                    if self.event:  # batch may have been closed while sleeping
+                        self.event.io_loop.spawn_callback(
+                            self.check_find_job,
+                            not_done,
+                            jid)
+            except Exception as ex:
+                log.error("Exception occurred handling batch async: %s. Aborting execution.", ex)
+                self.close_safe()
 
     @tornado.gen.coroutine
     def start(self):
-        self.__set_event_handler()
-        ping_return = yield self.local.run_job_async(
-            self.opts['tgt'],
-            'test.ping',
-            [],
-            self.opts.get(
-                'selected_target_option',
-                self.opts.get('tgt_type', 'glob')
-            ),
-            gather_job_timeout=self.opts['gather_job_timeout'],
-            jid=self.ping_jid,
-            metadata=self.metadata,
-            **self.eauth)
-        self.targeted_minions = set(ping_return['minions'])
-        #start batching even if not all minions respond to ping
-        yield tornado.gen.sleep(self.batch_presence_ping_timeout or self.opts['gather_job_timeout'])
-        self.event.io_loop.spawn_callback(self.start_batch)
-
+        if self.event:
+            self.__set_event_handler()
+            ping_return = yield self.local.run_job_async(
+                self.opts['tgt'],
+                'test.ping',
+                [],
+                self.opts.get(
+                    'selected_target_option',
+                    self.opts.get('tgt_type', 'glob')
+                ),
+                gather_job_timeout=self.opts['gather_job_timeout'],
+                jid=self.ping_jid,
+                metadata=self.metadata,
+                **self.eauth)
+            self.targeted_minions = set(ping_return['minions'])
+            # Wait out the ping timeout, then start batching even if not all minions responded
+            yield tornado.gen.sleep(self.batch_presence_ping_timeout or self.opts['gather_job_timeout'])
+            if self.event:  # close_safe() may have cleared self.event while sleeping
+                self.event.io_loop.spawn_callback(self.start_batch)
 
     @tornado.gen.coroutine
     def start_batch(self):
@@ -206,7 +213,8 @@ class BatchAsync(object):
                 "metadata": self.metadata
             }
             ret = self.event.fire_event(data, "salt/batch/{0}/start".format(self.batch_jid))
-            self.event.io_loop.spawn_callback(self.run_next)
+            if self.event:
+                self.event.io_loop.spawn_callback(self.run_next)
 
     @tornado.gen.coroutine
     def end_batch(self):
@@ -221,11 +229,21 @@ class BatchAsync(object):
                 "metadata": self.metadata
             }
             self.event.fire_event(data, "salt/batch/{0}/done".format(self.batch_jid))
-            for (pattern, label) in self.patterns:
-                if label in ["ping_return", "batch_run"]:
-                    self.event.unsubscribe(pattern, match_type='glob')
-            del self
-            gc.collect()
+
+            # release to the IOLoop to allow the event to be published
+            # before closing batch async execution
+            yield tornado.gen.sleep(1)
+            self.close_safe()
+
+    def close_safe(self):
+        '''Unsubscribe patterns, remove the event handler and drop references; idempotent.'''
+        if self.event:
+            # self.event is None once closed, so repeated calls skip the teardown
+            for (pattern, label) in self.patterns:
+                self.event.unsubscribe(pattern, match_type='glob')
+            self.event.remove_event_handler(self.__event_handler)
+        self.event = self.local = self.ioloop = None
+        gc.collect()
 
     @tornado.gen.coroutine
     def schedule_next(self):
@@ -233,7 +251,8 @@ class BatchAsync(object):
             self.scheduled = True
             # call later so that we maybe gather more returns
             yield tornado.gen.sleep(self.batch_delay)
-            self.event.io_loop.spawn_callback(self.run_next)
+            if self.event:
+                self.event.io_loop.spawn_callback(self.run_next)
 
     @tornado.gen.coroutine
     def run_next(self):
@@ -254,17 +273,20 @@ class BatchAsync(object):
                     metadata=self.metadata)
 
                 yield tornado.gen.sleep(self.opts['timeout'])
-                self.event.io_loop.spawn_callback(self.find_job, set(next_batch))
+
+                # The batch can be done already at this point, which means no self.event
+                if self.event:
+                    self.event.io_loop.spawn_callback(self.find_job, set(next_batch))
             except Exception as ex:
-                log.error("Error in scheduling next batch: %s", ex)
+                log.error("Error in scheduling next batch: %s. Aborting execution", ex)
                 self.active = self.active.difference(next_batch)
+                self.close_safe()
         else:
             yield self.end_batch()
         gc.collect()
 
     def __del__(self):
         self.local = None
-        self.event.remove_event_handler(self.__event_handler)
         self.event = None
         self.ioloop = None
         gc.collect()
-- 
2.23.0