File add-parallel-support-for-orchestrations.patch of Package salt.10899
From cd89919706697e0b4bf0648443a2ec1edf2a52a0 Mon Sep 17 00:00:00 2001
From: Matt Phillips <mphillips81@bloomberg.net>
Date: Tue, 13 Feb 2018 19:46:23 -0500
Subject: [PATCH] add parallel support for orchestrations
Originally the parallel global state requisite did not work correctly when
invoked under an orchestration; this fixes that, as well as parallel execution
of any other saltmod state (function, runner, wheel).
* join() parallel process instead of a recursive sleep
It's not clear to me why the recursive calls were chosen originally. This should
address https://github.com/saltstack/salt/issues/43668
* revisit previous join() behavior in check_requisite
Rather than join()'ing on each running process, we now poll for process
completion (via reconcile_procs) to assert that our processes are done; see the
sketch below. This should give more accurate timing results: measuring the
duration of a longer-running, join()'d process could trample a shorter parallel
process that just happened to be checked after it instead of before it.
* record start/stop duration for parallel processes separately
Previously, durations only recorded the time taken to spawn the multiprocessing
proc, not the actual time to completion, which is completely wrong. This should
now capture the full duration correctly. We unfortunately have to record separate
start and completion times instead of using the passed-in start_time attr, as that
value is only a time (not a date), so the full duration cannot be re-calculated
from it alone (for example, when start_time is 23:59 and the run rolls over to
the next day). The sketch below shows the calculation.
This fixes #44828
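The duration math added to _call_parallel_target boils down to the following
sketch (the real code runs the state function between the two timestamps):

    import datetime

    utc_start_time = datetime.datetime.utcnow()
    # ... the parallel state function runs here ...
    utc_finish_time = datetime.datetime.utcnow()

    delta = utc_finish_time - utc_start_time
    # duration in milliseconds.microseconds; keeping two full datetimes means a
    # run that crosses midnight still yields a sane duration, which a bare
    # time-of-day start_time cannot guarantee
    duration = (delta.seconds * 1000000 + delta.microseconds) / 1000.0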
* cherry-pick cdata KeyError prevention from #39832
@ninja- noticed there was already some useful code in _call_parallel_target to
mitigate KeyErrors for potentially empty cdata, but it wasn't being executed
because the invoking method made the same mistake before calling it. This moves
that code up to eliminate that potential stack trace; a sketch of the resulting
fallback follows below.
This should close #39832
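As a standalone illustration, the fallback that now runs in call_parallel before
the child process is spawned is roughly this (resolve_name is a hypothetical
helper, not part of the patch):

    def resolve_name(cdata, low):
        # prefer the first positional arg, then the 'name' kwarg, then fall
        # back to the low data so an empty cdata never raises another KeyError
        name = (cdata.get('args') or [None])[0] or cdata.get('kwargs', {}).get('name')
        if not name:
            name = low.get('name', low.get('__id__'))
        return name

    # e.g. resolve_name({'args': [], 'kwargs': {}}, {'name': 'httpd', '__id__': 'web'})
    # returns 'httpd' instead of blowing up on cdata['args'][0]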
* add integration test to runners/test_state to exercise parallel
This should exercise a few different facets of parallel that were previously
not covered in the code base.
* removing prereq from test orch
It was hitting preexisting failures in that functionality that are unrelated
to my changes.
* fix parallel mode py3 compatibility
The added tests uncovered incompatibilities in the parallel: True code path.
Additionally, use salt.serializers.msgpack to avoid other py2/py3 back-compat
issues, as in the round-trip sketch below.
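A round-trip sketch of the serializer swap (assumes a Salt checkout on the import
path; the sample return dict is made up):

    from salt.serializers.msgpack import serialize as msgpack_serialize, \
        deserialize as msgpack_deserialize

    ret = {'name': 'sleep 1', 'result': True, 'duration': 10042.3}
    blob = msgpack_serialize(ret)  # bytes, suitable for the 'wb+' cache file writes
    assert msgpack_deserialize(blob) == ret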
---
salt/modules/saltutil.py | 3 +
salt/runner.py | 2 +-
salt/runners/state.py | 1 +
salt/state.py | 72 ++++++++++++---------
salt/utils/files.py | 6 +-
tests/integration/runners/test_state.py | 86 +++++++++++++++++++++++++
6 files changed, 136 insertions(+), 34 deletions(-)
diff --git a/salt/modules/saltutil.py b/salt/modules/saltutil.py
index b951983f5c..f5717db443 100644
--- a/salt/modules/saltutil.py
+++ b/salt/modules/saltutil.py
@@ -1506,6 +1506,9 @@ def runner(name, arg=None, kwarg=None, full_return=False, saltenv='base', jid=No
if 'saltenv' in aspec.args:
kwarg['saltenv'] = saltenv
+ if name in ['state.orchestrate', 'state.orch', 'state.sls']:
+ kwarg['orchestration_jid'] = jid
+
if jid:
salt.utils.event.fire_args(
__opts__,
diff --git a/salt/runner.py b/salt/runner.py
index fea1031abb..ec389a45b0 100644
--- a/salt/runner.py
+++ b/salt/runner.py
@@ -232,7 +232,7 @@ class Runner(RunnerClient):
else:
user = salt.utils.user.get_specific_user()
- if low['fun'] in ('state.orchestrate', 'state.orch'):
+ if low['fun'] in ['state.orchestrate', 'state.orch', 'state.sls']:
low['kwarg']['orchestration_jid'] = async_pub['jid']
# Run the runner!
diff --git a/salt/runners/state.py b/salt/runners/state.py
index a0e65a49df..993d00055a 100644
--- a/salt/runners/state.py
+++ b/salt/runners/state.py
@@ -103,6 +103,7 @@ def orchestrate(mods,
saltenv=saltenv,
pillarenv=pillarenv,
pillar_enc=pillar_enc,
+ __pub_jid=orchestration_jid,
orchestration_jid=orchestration_jid)
ret = {'data': {minion.opts['id']: running}, 'outputter': 'highstate'}
res = __utils__['state.check_result'](ret['data'])
diff --git a/salt/state.py b/salt/state.py
index 4a59b77305..2746622ab2 100644
--- a/salt/state.py
+++ b/salt/state.py
@@ -43,6 +43,7 @@ import salt.utils.platform
import salt.utils.process
import salt.utils.url
import salt.syspaths as syspaths
+from salt.serializers.msgpack import serialize as msgpack_serialize, deserialize as msgpack_deserialize
from salt.template import compile_template, compile_template_str
from salt.exceptions import (
SaltRenderError,
@@ -54,11 +55,11 @@ from salt.utils.locales import sdecode
import salt.utils.yamlloader as yamlloader
# Import third party libs
+from msgpack import UnpackValueError
# pylint: disable=import-error,no-name-in-module,redefined-builtin
from salt.ext import six
from salt.ext.six.moves import map, range, reload_module
# pylint: enable=import-error,no-name-in-module,redefined-builtin
-import msgpack
log = logging.getLogger(__name__)
@@ -1695,27 +1696,20 @@ class State(object):
errors.extend(req_in_errors)
return req_in_high, errors
- def _call_parallel_target(self, cdata, low):
+ def _call_parallel_target(self, name, cdata, low):
'''
The target function to call that will create the parallel thread/process
'''
+ # we need to re-record start/end duration here because it is impossible to
+ # correctly calculate further down the chain
+ utc_start_time = datetime.datetime.utcnow()
+
tag = _gen_tag(low)
try:
ret = self.states[cdata['full']](*cdata['args'],
**cdata['kwargs'])
except Exception:
trb = traceback.format_exc()
- # There are a number of possibilities to not have the cdata
- # populated with what we might have expected, so just be smart
- # enough to not raise another KeyError as the name is easily
- # guessable and fallback in all cases to present the real
- # exception to the user
- if len(cdata['args']) > 0:
- name = cdata['args'][0]
- elif 'name' in cdata['kwargs']:
- name = cdata['kwargs']['name']
- else:
- name = low.get('name', low.get('__id__'))
ret = {
'result': False,
'name': name,
@@ -1723,6 +1717,13 @@ class State(object):
'comment': 'An exception occurred in this state: {0}'.format(
trb)
}
+
+ utc_finish_time = datetime.datetime.utcnow()
+ delta = (utc_finish_time - utc_start_time)
+ # duration in milliseconds.microseconds
+ duration = (delta.seconds * 1000000 + delta.microseconds) / 1000.0
+ ret['duration'] = duration
+
troot = os.path.join(self.opts['cachedir'], self.jid)
tfile = os.path.join(troot, _clean_tag(tag))
if not os.path.isdir(troot):
@@ -1733,17 +1734,26 @@ class State(object):
# and the attempt, we are safe to pass
pass
with salt.utils.files.fopen(tfile, 'wb+') as fp_:
- fp_.write(msgpack.dumps(ret))
+ fp_.write(msgpack_serialize(ret))
def call_parallel(self, cdata, low):
'''
Call the state defined in the given cdata in parallel
'''
+ # There are a number of possibilities to not have the cdata
+ # populated with what we might have expected, so just be smart
+ # enough to not raise another KeyError as the name is easily
+ # guessable and fallback in all cases to present the real
+ # exception to the user
+ name = (cdata.get('args') or [None])[0] or cdata['kwargs'].get('name')
+ if not name:
+ name = low.get('name', low.get('__id__'))
+
proc = salt.utils.process.MultiprocessingProcess(
target=self._call_parallel_target,
- args=(cdata, low))
+ args=(name, cdata, low))
proc.start()
- ret = {'name': cdata['args'][0],
+ ret = {'name': name,
'result': None,
'changes': {},
'comment': 'Started in a seperate process',
@@ -1892,12 +1902,10 @@ class State(object):
# enough to not raise another KeyError as the name is easily
# guessable and fallback in all cases to present the real
# exception to the user
- if len(cdata['args']) > 0:
- name = cdata['args'][0]
- elif 'name' in cdata['kwargs']:
- name = cdata['kwargs']['name']
- else:
+ name = (cdata.get('args') or [None])[0] or cdata['kwargs'].get('name')
+ if not name:
name = low.get('name', low.get('__id__'))
+
ret = {
'result': False,
'name': name,
@@ -1938,7 +1946,7 @@ class State(object):
ret['start_time'] = local_start_time.time().isoformat()
delta = (utc_finish_time - utc_start_time)
# duration in milliseconds.microseconds
- duration = (delta.seconds * 1000000 + delta.microseconds)/1000.0
+ duration = (delta.seconds * 1000000 + delta.microseconds) / 1000.0
ret['duration'] = duration
ret['__id__'] = low['__id__']
log.info(
@@ -2106,7 +2114,7 @@ class State(object):
while True:
if self.reconcile_procs(running):
break
- time.sleep(0.01)
+ time.sleep(0.0001)
ret = dict(list(disabled.items()) + list(running.items()))
return ret
@@ -2138,8 +2146,8 @@ class State(object):
tries = 0
with salt.utils.files.fopen(pause_path, 'rb') as fp_:
try:
- pdat = msgpack.loads(fp_.read())
- except msgpack.UnpackValueError:
+ pdat = msgpack_deserialize(fp_.read())
+ except UnpackValueError:
# Reading race condition
if tries > 10:
# Break out if there are a ton of read errors
@@ -2185,7 +2193,7 @@ class State(object):
'changes': {}}
try:
with salt.utils.files.fopen(ret_cache, 'rb') as fp_:
- ret = msgpack.loads(fp_.read())
+ ret = msgpack_deserialize(fp_.read())
except (OSError, IOError):
ret = {'result': False,
'comment': 'Parallel cache failure',
@@ -2298,16 +2306,18 @@ class State(object):
run_dict = self.pre
else:
run_dict = running
+
+ while True:
+ if self.reconcile_procs(run_dict):
+ break
+ time.sleep(0.0001)
+
for chunk in chunks:
tag = _gen_tag(chunk)
if tag not in run_dict:
req_stats.add('unmet')
continue
- if run_dict[tag].get('proc'):
- # Run in parallel, first wait for a touch and then recheck
- time.sleep(0.01)
- return self.check_requisite(low, running, chunks, pre)
- if r_state.startswith('onfail'):
+ if r_state == 'onfail':
if run_dict[tag]['result'] is True:
req_stats.add('onfail') # At least one state is OK
continue
diff --git a/salt/utils/files.py b/salt/utils/files.py
index 3ca73b9db8..d22bce9e19 100644
--- a/salt/utils/files.py
+++ b/salt/utils/files.py
@@ -16,7 +16,6 @@ import stat
import subprocess
import tempfile
import time
-import urllib
# Import Salt libs
import salt.utils.path
@@ -36,6 +35,9 @@ except ImportError:
# fcntl is not available on windows
HAS_FCNTL = False
+from salt.ext.six.moves.urllib.parse import quote # pylint: disable=no-name-in-module
+
+
log = logging.getLogger(__name__)
LOCAL_PROTOS = ('', 'file')
@@ -563,7 +565,7 @@ def safe_filename_leaf(file_basename):
:codeauthor: Damon Atkins <https://github.com/damon-atkins>
'''
def _replace(re_obj):
- return urllib.quote(re_obj.group(0), safe='')
+ return quote(re_obj.group(0), safe=u'')
if not isinstance(file_basename, six.text_type):
# the following string is not prefixed with u
return re.sub('[\\\\:/*?"<>|]',
diff --git a/tests/integration/runners/test_state.py b/tests/integration/runners/test_state.py
index dac7b3f033..312fcffc01 100644
--- a/tests/integration/runners/test_state.py
+++ b/tests/integration/runners/test_state.py
@@ -6,10 +6,12 @@ Tests for the state runner
# Import Python Libs
from __future__ import absolute_import, print_function, unicode_literals
import errno
+import logging
import os
import shutil
import signal
import tempfile
+import time
import textwrap
import threading
from salt.ext.six.moves import queue
@@ -31,6 +33,8 @@ import salt.utils.yaml
# Import 3rd-party libs
from salt.ext import six
+log = logging.getLogger(__name__)
+
class StateRunnerTest(ShellCase):
'''
@@ -351,3 +355,85 @@ class OrchEventTest(ShellCase):
finally:
del listener
signal.alarm(0)
+
+ def test_parallel_orchestrations(self):
+ '''
+ Test to confirm that the parallel state requisite works in orch
+ We do this by running a series of 10-second test.sleeps in parallel and ensuring the total runtime stays well below the serial time.
+ '''
+ self.write_conf({
+ 'fileserver_backend': ['roots'],
+ 'file_roots': {
+ 'base': [self.base_env],
+ },
+ })
+
+ orch_sls = os.path.join(self.base_env, 'test_par_orch.sls')
+
+ with salt.utils.fopen(orch_sls, 'w') as fp_:
+ fp_.write(textwrap.dedent('''
+ {% for count in range(1, 20) %}
+
+ sleep {{ count }}:
+ module.run:
+ - name: test.sleep
+ - length: 10
+ - parallel: True
+
+ {% endfor %}
+
+ sleep 21:
+ module.run:
+ - name: test.sleep
+ - length: 10
+ - parallel: True
+ - require:
+ - module: sleep 1
+ '''))
+
+ orch_sls = os.path.join(self.base_env, 'test_par_orch.sls')
+
+ listener = salt.utils.event.get_event(
+ 'master',
+ sock_dir=self.master_opts['sock_dir'],
+ transport=self.master_opts['transport'],
+ opts=self.master_opts)
+
+ start_time = time.time()
+ jid = self.run_run_plus(
+ 'state.orchestrate',
+ 'test_par_orch',
+ __reload_config=True).get('jid')
+
+ if jid is None:
+ raise Exception('jid missing from run_run_plus output')
+
+ signal.signal(signal.SIGALRM, self.alarm_handler)
+ signal.alarm(self.timeout)
+ received = False
+ try:
+ while True:
+ event = listener.get_event(full=True)
+ if event is None:
+ continue
+
+ # if we receive the ret for this job before self.timeout (60),
+ # the test is implicitly successful; if it were running in serial it would take
+ # at least 110 seconds.
+ if event['tag'] == 'salt/run/{0}/ret'.format(jid):
+ received = True
+ # Don't wrap this in a try/except. We want to know if the
+ # data structure is different from what we expect!
+ ret = event['data']['return']['data']['master']
+ for state in ret:
+ data = ret[state]
+ # we expect each duration to be greater than 10s
+ self.assertTrue(data['duration'] > 10000)
+ break
+
+ # also confirm that the total runtime is roughly 30s (with 10s of buffer)
+ self.assertTrue((time.time() - start_time) < 40)
+ finally:
+ self.assertTrue(received)
+ del listener
+ signal.alarm(0)
--
2.20.1