File introduce-process_count_max-minion-configuration-par.patch of Package salt

From d5678e1c20e4a7aa25cc5e8fe32e56cf46254f85 Mon Sep 17 00:00:00 2001
From: Silvio Moioli <smoioli@suse.de>
Date: Wed, 20 Sep 2017 14:33:33 +0200
Subject: [PATCH] Introduce process_count_max minion configuration
 parameter

This allows users to limit the number of processes or threads a minion
will start in response to published messages, prevents resource
exhaustion in case a high number of concurrent jobs is scheduled in a
short time.

process_count_max: add defaults and documentation

process_count_max: adapt existing unit tests

process_count_max: add unit test

process_count_max: disable by default
---
 conf/minion                      |  6 +++++
 doc/ref/configuration/minion.rst | 17 +++++++++++++
 pkg/windows/buildenv/conf/minion |  6 +++++
 salt/config/__init__.py          |  4 ++++
 salt/minion.py                   | 10 ++++++++
 tests/unit/minion_test.py        | 52 +++++++++++++++++++++++++++++++++++++---
 6 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/conf/minion b/conf/minion
index 1dcc065639..71d2c0835c 100644
--- a/conf/minion
+++ b/conf/minion
@@ -653,6 +653,12 @@
 # for a full explanation.
 #multiprocessing: True
 
+# Limit the maximum amount of processes or threads created by salt-minion.
+# This is useful to avoid resource exhaustion in case the minion receives more
+# publications than it is able to handle, as it limits the number of spawned
+# processes or threads. -1 is the default and disables the limit.
+#process_count_max: -1
+
 
 #####         Logging settings       #####
 ##########################################
diff --git a/doc/ref/configuration/minion.rst b/doc/ref/configuration/minion.rst
index 388cc93611..165a5e5025 100644
--- a/doc/ref/configuration/minion.rst
+++ b/doc/ref/configuration/minion.rst
@@ -1948,6 +1948,23 @@ executed in a thread.
 
     multiprocessing: True
 
+.. conf_minion:: process_count_max
+
+``process_count_max``
+-------
+
+.. versionadded:: Oxygen
+
+Default: ``-1``
+
+Limit the maximum amount of processes or threads created by ``salt-minion``.
+This is useful to avoid resource exhaustion in case the minion receives more
+publications than it is able to handle, as it limits the number of spawned
+processes or threads. ``-1`` is the default and disables the limit.
+
+.. code-block:: yaml
+
+    process_count_max: -1
 
 .. _minion-logging-settings:
 
diff --git a/pkg/windows/buildenv/conf/minion b/pkg/windows/buildenv/conf/minion
index e9f168b321..87a2314105 100644
--- a/pkg/windows/buildenv/conf/minion
+++ b/pkg/windows/buildenv/conf/minion
@@ -283,6 +283,12 @@ pki_dir: /conf/pki/minion
 # publication a new process is spawned and the command is executed therein.
 # multiprocessing: True
 
+# Limit the maximum amount of processes or threads created by salt-minion.
+# This is useful to avoid resource exhaustion in case the minion receives more
+# publications than it is able to handle, as it limits the number of spawned
+# processes or threads. -1 disables the limit.
+#process_count_max: 20
+
 ######         Logging settings       #####
 ###########################################
 # The location of the minion log file.
diff --git a/salt/config/__init__.py b/salt/config/__init__.py
index 58a66c33c8..7254da4871 100644
--- a/salt/config/__init__.py
+++ b/salt/config/__init__.py
@@ -304,6 +304,9 @@ VALID_OPTS = {
     # Whether or not processes should be forked when needed. The alternative is to use threading.
     'multiprocessing': bool,
 
+    # Maximum number of concurrently active processes at any given point in time
+    'process_count_max': int,
+
     # Whether or not the salt minion should run scheduled mine updates
     'mine_enabled': bool,
 
@@ -1070,6 +1073,7 @@ DEFAULT_MINION_OPTS = {
     'auto_accept': True,
     'autosign_timeout': 120,
     'multiprocessing': True,
+    'process_count_max': -1,
     'mine_enabled': True,
     'mine_return_job': False,
     'mine_interval': 60,
diff --git a/salt/minion.py b/salt/minion.py
index 71b8c2f54e..791f9e1911 100644
--- a/salt/minion.py
+++ b/salt/minion.py
@@ -1265,6 +1265,7 @@ class Minion(MinionBase):
                 self._send_req_async(load, timeout, callback=lambda f: None)  # pylint: disable=unexpected-keyword-arg
         return True
 
+    @tornado.gen.coroutine
     def _handle_decoded_payload(self, data):
         '''
         Override this method if you wish to handle the decoded data
@@ -1296,6 +1297,15 @@ class Minion(MinionBase):
                 self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
                 self.schedule.functions = self.functions
                 self.schedule.returners = self.returners
+
+        process_count_max = self.opts.get('process_count_max')
+        if process_count_max > 0:
+            process_count = len(salt.utils.minion.running(self.opts))
+            while process_count >= process_count_max:
+                log.warn("Maximum number of processes reached while executing jid {0}, waiting...".format(data['jid']))
+                yield tornado.gen.sleep(10)
+                process_count = len(salt.utils.minion.running(self.opts))
+
         # We stash an instance references to allow for the socket
         # communication in Windows. You can't pickle functions, and thus
         # python needs to be able to reconstruct the reference on the other
diff --git a/tests/unit/minion_test.py b/tests/unit/minion_test.py
index 1a43de7bdd..69a965f0ef 100644
--- a/tests/unit/minion_test.py
+++ b/tests/unit/minion_test.py
@@ -17,6 +17,7 @@ from salttesting.mock import NO_MOCK, NO_MOCK_REASON, patch, MagicMock
 from salt import minion
 from salt.utils import event
 from salt.exceptions import SaltSystemExit
+from salt.ext.six.moves import range
 import salt.syspaths
 import tornado
 
@@ -73,7 +74,7 @@ class MinionTestCase(TestCase):
         mock_jid_queue = [123]
         try:
             minion = salt.minion.Minion(mock_opts, jid_queue=copy.copy(mock_jid_queue), io_loop=tornado.ioloop.IOLoop())
-            ret = minion._handle_decoded_payload(mock_data)
+            ret = minion._handle_decoded_payload(mock_data).result()
             self.assertEqual(minion.jid_queue, mock_jid_queue)
             self.assertIsNone(ret)
         finally:
@@ -104,7 +105,7 @@ class MinionTestCase(TestCase):
             # Call the _handle_decoded_payload function and update the mock_jid_queue to include the new
             # mock_jid. The mock_jid should have been added to the jid_queue since the mock_jid wasn't
             # previously included. The minion's jid_queue attribute and the mock_jid_queue should be equal.
-            minion._handle_decoded_payload(mock_data)
+            minion._handle_decoded_payload(mock_data).result()
             mock_jid_queue.append(mock_jid)
             self.assertEqual(minion.jid_queue, mock_jid_queue)
         finally:
@@ -133,12 +134,57 @@ class MinionTestCase(TestCase):
 
             # Call the _handle_decoded_payload function and check that the queue is smaller by one item
             # and contains the new jid
-            minion._handle_decoded_payload(mock_data)
+            minion._handle_decoded_payload(mock_data).result()
             self.assertEqual(len(minion.jid_queue), 2)
             self.assertEqual(minion.jid_queue, [456, 789])
         finally:
             minion.destroy()
 
+    @patch('salt.minion.Minion.ctx', MagicMock(return_value={}))
+    @patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start', MagicMock(return_value=True))
+    @patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join', MagicMock(return_value=True))
+    @patch('salt.utils.minion.running', MagicMock(return_value=[]))
+    @patch('tornado.gen.sleep', MagicMock(return_value=tornado.concurrent.Future()))
+    def test_process_count_max(self):
+        '''
+        Tests that the _handle_decoded_payload function does not spawn more than the configured amount of processes,
+        as per process_count_max.
+        '''
+        process_count_max = 10
+        mock_opts = salt.config.DEFAULT_MINION_OPTS
+        mock_opts['minion_jid_queue_hwm'] = 100
+        mock_opts['process_count_max'] = process_count_max
+
+        try:
+            io_loop = tornado.ioloop.IOLoop()
+            minion = salt.minion.Minion(mock_opts, jid_queue=[], io_loop=io_loop)
+
+            # mock gen.sleep to throw a special Exception when called, so that we detect it
+            class SleepCalledEception(Exception):
+                """Thrown when sleep is called"""
+                pass
+            tornado.gen.sleep.return_value.set_exception(SleepCalledEception())
+
+            # up until process_count_max: gen.sleep does not get called, processes are started normally
+            for i in range(process_count_max):
+                mock_data = {'fun': 'foo.bar',
+                             'jid': i}
+                io_loop.run_sync(lambda: minion._handle_decoded_payload(mock_data))
+                self.assertEqual(salt.utils.process.SignalHandlingMultiprocessingProcess.start.call_count, i +1)
+                self.assertEqual(len(minion.jid_queue), i + 1)
+                salt.utils.minion.running.return_value += [i]
+
+            # above process_count_max: gen.sleep does get called, JIDs are created but no new processes are started
+            mock_data = {'fun': 'foo.bar',
+                         'jid': process_count_max + 1}
+
+            self.assertRaises(SleepCalledEception,
+                              lambda: io_loop.run_sync(lambda: minion._handle_decoded_payload(mock_data)))
+            self.assertEqual(salt.utils.process.SignalHandlingMultiprocessingProcess.start.call_count,
+                             process_count_max)
+            self.assertEqual(len(minion.jid_queue), process_count_max + 1)
+        finally:
+            minion.destroy()
 
 if __name__ == '__main__':
     from integration import run_tests
-- 
2.13.6


openSUSE Build Service is sponsored by