File fix-memory-leak-produced-by-batch-async-find_jobs-me.patch of Package salt.21002
From 00c538383e463febba492e74577ae64be80d4d00 Mon Sep 17 00:00:00 2001
From: Mihai Dinca <mdinca@suse.de>
Date: Mon, 16 Sep 2019 11:27:30 +0200
Subject: [PATCH] Fix memory leak produced by batch async find_jobs
mechanism (bsc#1140912)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Multiple fixes:
- use different JIDs per find_job
- fix bug in detection of find_job returns
- fix timeout passed from request payload
- better cleanup at the end of batching
Co-authored-by: Pablo Suárez Hernández <psuarezhernandez@suse.com>
---
salt/cli/batch_async.py | 59 ++++++++++++++++++++++++++++-------------
salt/client/__init__.py | 1 +
salt/master.py | 2 --
3 files changed, 41 insertions(+), 21 deletions(-)
diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py
index 7225491228..388b709416 100644
--- a/salt/cli/batch_async.py
+++ b/salt/cli/batch_async.py
@@ -73,6 +73,7 @@ class BatchAsync:
self.done_minions = set()
self.active = set()
self.initialized = False
+ self.jid_gen = jid_gen
self.ping_jid = jid_gen()
self.batch_jid = jid_gen()
self.find_job_jid = jid_gen()
@@ -91,14 +92,11 @@ class BatchAsync:
def __set_event_handler(self):
ping_return_pattern = "salt/job/{}/ret/*".format(self.ping_jid)
batch_return_pattern = "salt/job/{}/ret/*".format(self.batch_jid)
- find_job_return_pattern = "salt/job/{}/ret/*".format(self.find_job_jid)
self.event.subscribe(ping_return_pattern, match_type="glob")
self.event.subscribe(batch_return_pattern, match_type="glob")
- self.event.subscribe(find_job_return_pattern, match_type="glob")
- self.event.patterns = {
+ self.patterns = {
(ping_return_pattern, "ping_return"),
(batch_return_pattern, "batch_run"),
- (find_job_return_pattern, "find_job_return"),
}
self.event.set_event_handler(self.__event_handler)
@@ -106,7 +104,7 @@ class BatchAsync:
if not self.event:
return
mtag, data = self.event.unpack(raw, self.event.serial)
- for (pattern, op) in self.event.patterns:
+ for (pattern, op) in self.patterns:
if fnmatch.fnmatch(mtag, pattern):
minion = data["id"]
if op == "ping_return":
@@ -114,7 +112,8 @@ class BatchAsync:
if self.targeted_minions == self.minions:
self.event.io_loop.spawn_callback(self.start_batch)
elif op == "find_job_return":
- self.find_job_returned.add(minion)
+ if data.get("return", None):
+ self.find_job_returned.add(minion)
elif op == "batch_run":
if minion in self.active:
self.active.remove(minion)
@@ -134,7 +133,11 @@ class BatchAsync:
return set(list(to_run)[:next_batch_size])
@tornado.gen.coroutine
- def check_find_job(self, batch_minions):
+ def check_find_job(self, batch_minions, jid):
+ find_job_return_pattern = "salt/job/{}/ret/*".format(jid)
+ self.event.unsubscribe(find_job_return_pattern, match_type="glob")
+ self.patterns.remove((find_job_return_pattern, "find_job_return"))
+
timedout_minions = batch_minions.difference(self.find_job_returned).difference(
self.done_minions
)
@@ -143,27 +146,39 @@ class BatchAsync:
running = batch_minions.difference(self.done_minions).difference(
self.timedout_minions
)
+
if timedout_minions:
self.schedule_next()
+
if running:
+ self.find_job_returned = self.find_job_returned.difference(running)
self.event.io_loop.add_callback(self.find_job, running)
@tornado.gen.coroutine
def find_job(self, minions):
- not_done = minions.difference(self.done_minions)
- ping_return = yield self.local.run_job_async(
- not_done,
- "saltutil.find_job",
- [self.batch_jid],
- "list",
- gather_job_timeout=self.opts["gather_job_timeout"],
- jid=self.find_job_jid,
- **self.eauth
- )
- self.event.io_loop.call_later(
- self.opts["gather_job_timeout"], self.check_find_job, not_done
+ not_done = minions.difference(self.done_minions).difference(
+ self.timedout_minions
)
+ if not_done:
+ jid = self.jid_gen()
+ find_job_return_pattern = "salt/job/{}/ret/*".format(jid)
+ self.patterns.add((find_job_return_pattern, "find_job_return"))
+ self.event.subscribe(find_job_return_pattern, match_type="glob")
+
+ ret = yield self.local.run_job_async(
+ not_done,
+ "saltutil.find_job",
+ [self.batch_jid],
+ "list",
+ gather_job_timeout=self.opts["gather_job_timeout"],
+ jid=jid,
+ **self.eauth
+ )
+ self.event.io_loop.call_later(
+ self.opts["gather_job_timeout"], self.check_find_job, not_done, jid
+ )
+
@tornado.gen.coroutine
def start(self):
self.__set_event_handler()
@@ -211,6 +226,9 @@ class BatchAsync:
}
self.event.fire_event(data, "salt/batch/{}/done".format(self.batch_jid))
self.event.remove_event_handler(self.__event_handler)
+ for (pattern, label) in self.patterns:
+ if label in ["ping_return", "batch_run"]:
+ self.event.unsubscribe(pattern, match_type="glob")
def schedule_next(self):
if not self.scheduled:
@@ -235,11 +253,14 @@ class BatchAsync:
jid=self.batch_jid,
metadata=self.metadata,
)
+
self.event.io_loop.call_later(
self.opts["timeout"], self.find_job, set(next_batch)
)
except Exception as ex:
+ log.error("Error in scheduling next batch: %s", ex)
self.active = self.active.difference(next_batch)
else:
self.end_batch()
self.scheduled = False
+ yield
diff --git a/salt/client/__init__.py b/salt/client/__init__.py
index 1e9f11df4c..cc8fd4048d 100644
--- a/salt/client/__init__.py
+++ b/salt/client/__init__.py
@@ -1776,6 +1776,7 @@ class LocalClient:
"key": self.key,
"tgt_type": tgt_type,
"ret": ret,
+ "timeout": timeout,
"jid": jid,
}
diff --git a/salt/master.py b/salt/master.py
index b9bc1a7a67..7a99af357a 100644
--- a/salt/master.py
+++ b/salt/master.py
@@ -2232,8 +2232,6 @@ class ClearFuncs(TransportMethods):
def publish_batch(self, clear_load, minions, missing):
batch_load = {}
batch_load.update(clear_load)
- import salt.cli.batch_async
-
batch = salt.cli.batch_async.BatchAsync(
self.local.opts,
functools.partial(self._prep_jid, clear_load, {}),
--
2.29.2