File add-salt-ssh-support-with-venv-salt-minion-3002.2-47.patch of Package salt

From 99e9f670ac2b3a520d1e4dd0035fc3983004a3a0 Mon Sep 17 00:00:00 2001
From: Victor Zhestkov <vzhestkov@suse.com>
Date: Thu, 3 Mar 2022 19:11:25 +0300
Subject: [PATCH] Add salt-ssh support with venv-salt-minion - 3002.2
 (#476)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Add salt-ssh support with venv-salt-minion

* Add some comments and drop the commented line

* Fix return in check_venv_hash_file

* Remove redundant logging locks

* Prevent possible segfault by GC

* Convert all script parameters to strings

* Reduce the size of minion response

Minion response contains SSH_PY_CODE wrapped to base64.
This fix reduces the size of the response in DEBUG logging

* Prevent simultaneous to salt-ssh minion

* Avoid race condition on loading roster modules

* Fix failing test test_ssh_kwargs

* Fix test test_roster_defaults_flat

* Pass the context to roster modules

* Make VENV_HASH_FILE global

* Make ssh session grace time configurable

* Blacken salt.client.ssh

* Revert "Avoid race condition on loading roster modules"

This reverts commit ce9414b74abb7396b6f529e0527b619f40859f30.

* Prevent deadlocks with importlib on using LazyLoader

* Make logging on salt-ssh errors more informative

* Add comments about using salt.loader.LOAD_LOCK

* Fix test_loader test

* Revert "Fix test test_roster_defaults_flat"

This reverts commit 62ff8645c6c1a8f66b022075a136e9d5652bb624.

* Revert "Fix failing test test_ssh_kwargs"

This reverts commit 039fa0b75f2fe84f0449d83431e55d0060d42967.

* Prevent deadlocks on using logging

* Use collections.deque instead of list for salt-ssh

Suggested by @agraul

* Get proper exitstatus from salt.utils.vt.Terminal

to prevent empty event returns due to improperly detecting
the child process as failed

* Do not run pre flight script for raw_shell

* Prevent shell injection via pre_flight_script_args (#497)

Add tests around preflight script args

Readjust logic to validate script args

Use RLock to prevent issues in single threads

Co-authored-by: Pablo Suárez Hernández <psuarezhernandez@suse.com>
---
 salt/_logging/impl.py                    |  11 +-
 salt/client/ssh/__init__.py              | 182 +++++++++++++++++++----
 salt/client/ssh/client.py                |   7 +-
 salt/client/ssh/shell.py                 |   8 +
 salt/client/ssh/ssh_py_shim.py           | 109 ++++++++------
 salt/loader.py                           |  30 +++-
 salt/netapi/__init__.py                  |   3 +-
 salt/roster/__init__.py                  |   6 +-
 tests/integration/ssh/test_pre_flight.py |  56 ++++++-
 tests/unit/client/test_ssh.py            |  35 +++++
 tests/unit/test_loader.py                |   2 +-
 11 files changed, 352 insertions(+), 97 deletions(-)

diff --git a/salt/_logging/impl.py b/salt/_logging/impl.py
index 2f15bf0025..919de264dc 100644
--- a/salt/_logging/impl.py
+++ b/salt/_logging/impl.py
@@ -9,6 +9,7 @@
 import logging
 import re
 import sys
+import threading
 import types
 
 import salt.ext.six as six
@@ -96,6 +97,10 @@ SORTED_LEVEL_NAMES = [l[0] for l in sorted(LOG_LEVELS.items(), key=lambda x: x[1
 
 MODNAME_PATTERN = re.compile(r"(?P<name>%%\(name\)(?:\-(?P<digits>[\d]+))?s)")
 
+# LOG_LOCK is used to prevent deadlocks on using logging
+# in combination with multiprocessing with salt-api
+LOG_LOCK = threading.RLock()
+
 
 # ----- REMOVE ME ON REFACTOR COMPLETE ------------------------------------------------------------------------------>
 class __NullLoggingHandler(TemporaryLoggingHandler):
@@ -289,7 +294,7 @@ class SaltLoggingClass(
             extra["exc_info_on_loglevel"] = exc_info_on_loglevel
 
         try:
-            logging._acquireLock()
+            LOG_LOCK.acquire()
             if sys.version_info < (3,):
                 LOGGING_LOGGER_CLASS._log(
                     self, level, msg, args, exc_info=exc_info, extra=extra
@@ -315,10 +320,8 @@ class SaltLoggingClass(
                     stack_info=stack_info,
                     stacklevel=stacklevel,
                 )
-        except:
-            pass
         finally:
-            logging._releaseLock()
+            LOG_LOCK.release()
 
     def makeRecord(
         self,
diff --git a/salt/client/ssh/__init__.py b/salt/client/ssh/__init__.py
index 0a76627fe3..57bc289a62 100644
--- a/salt/client/ssh/__init__.py
+++ b/salt/client/ssh/__init__.py
@@ -6,18 +6,22 @@ import base64
 import binascii
 import copy
 import datetime
+import gc
 import getpass
 import hashlib
 import logging
 import multiprocessing
 import os
+import psutil
 import re
+import shlex
 import subprocess
 import sys
 import tarfile
 import tempfile
 import time
 import uuid
+from collections import deque
 
 import salt.client.ssh.shell
 import salt.client.ssh.wrapper
@@ -43,6 +47,7 @@ import salt.utils.stringutils
 import salt.utils.thin
 import salt.utils.url
 import salt.utils.verify
+from salt._logging.impl import LOG_LOCK
 from salt.ext import six
 from salt.ext.six.moves import input  # pylint: disable=import-error,redefined-builtin
 from salt.template import compile_template
@@ -147,15 +152,26 @@ elif [ "$SUDO" ] && [ -n "$SUDO_USER" ]
 then SUDO="sudo "
 fi
 EX_PYTHON_INVALID={EX_THIN_PYTHON_INVALID}
-PYTHON_CMDS="python3 /usr/libexec/platform-python python27 python2.7 python26 python2.6 python2 python"
+set +x
+SSH_PY_CODE='import base64;
+                   exec(base64.b64decode("""{{SSH_PY_CODE}}""").decode("utf-8"))'
+if [ -n "$DEBUG" ]
+    then set -x
+fi
+PYTHON_CMDS="/var/tmp/venv-salt-minion/bin/python python3 /usr/libexec/platform-python python27 python2.7 python26 python2.6 python2 python"
 for py_cmd in $PYTHON_CMDS
 do
     if command -v "$py_cmd" >/dev/null 2>&1 && "$py_cmd" -c "import sys; sys.exit(not (sys.version_info >= (2, 6)));"
     then
         py_cmd_path=`"$py_cmd" -c 'from __future__ import print_function;import sys; print(sys.executable);'`
         cmdpath=`command -v $py_cmd 2>/dev/null || which $py_cmd 2>/dev/null`
+        cmdpath=`readlink -f $cmdpath`
         if file $cmdpath | grep "shell script" > /dev/null
         then
+            if echo $cmdpath | grep venv-salt-minion > /dev/null
+            then
+                exec $SUDO "$cmdpath" -c "$SSH_PY_CODE"
+            fi
             ex_vars="'PATH', 'LD_LIBRARY_PATH', 'MANPATH', \
                    'XDG_DATA_DIRS', 'PKG_CONFIG_PATH'"
             export `$py_cmd -c \
@@ -167,13 +183,9 @@ do
             exec $SUDO PATH=$PATH LD_LIBRARY_PATH=$LD_LIBRARY_PATH \
                      MANPATH=$MANPATH XDG_DATA_DIRS=$XDG_DATA_DIRS \
                      PKG_CONFIG_PATH=$PKG_CONFIG_PATH \
-                     "$py_cmd_path" -c \
-                   'import base64;
-                   exec(base64.b64decode("""{{SSH_PY_CODE}}""").decode("utf-8"))'
+                     "$py_cmd_path" -c "$SSH_PY_CODE"
         else
-            exec $SUDO "$py_cmd_path" -c \
-                   'import base64;
-                   exec(base64.b64decode("""{{SSH_PY_CODE}}""").decode("utf-8"))'
+            exec $SUDO "$py_cmd_path" -c "$SSH_PY_CODE"
         fi
         exit 0
     else
@@ -190,6 +202,10 @@ EOF'''.format(
     ]
 )
 
+# The file on a salt-ssh minion used to identify if Salt Bundle was deployed
+VENV_HASH_FILE = "/var/tmp/venv-salt-minion/venv-hash.txt"
+
+
 if not is_windows():
     shim_file = os.path.join(os.path.dirname(__file__), "ssh_py_shim.py")
     if not os.path.exists(shim_file):
@@ -208,7 +224,7 @@ class SSH:
 
     ROSTER_UPDATE_FLAG = "#__needs_update"
 
-    def __init__(self, opts):
+    def __init__(self, opts, context=None):
         self.__parsed_rosters = {SSH.ROSTER_UPDATE_FLAG: True}
         pull_sock = os.path.join(opts["sock_dir"], "master_event_pull.ipc")
         if os.path.exists(pull_sock) and zmq:
@@ -232,7 +248,9 @@ class SSH:
             else "glob"
         )
         self._expand_target()
-        self.roster = salt.roster.Roster(self.opts, self.opts.get("roster", "flat"))
+        self.roster = salt.roster.Roster(
+            self.opts, self.opts.get("roster", "flat"), context=context
+        )
         self.targets = self.roster.targets(self.opts["tgt"], self.tgt_type)
         if not self.targets:
             self._update_targets()
@@ -315,6 +333,14 @@ class SSH:
         )
         self.mods = mod_data(self.fsclient)
 
+        self.cache = salt.cache.Cache(self.opts)
+        self.master_id = self.opts["id"]
+        self.max_pid_wait = int(self.opts.get("ssh_max_pid_wait", 600))
+        self.session_flock_file = os.path.join(
+            self.opts["cachedir"], "salt-ssh.session.lock"
+        )
+        self.ssh_session_grace_time = int(self.opts.get("ssh_session_grace_time", 3))
+
     @property
     def parse_tgt(self):
         """
@@ -532,6 +558,8 @@ class SSH:
         """
         Run the routine in a "Thread", put a dict on the queue
         """
+        LOG_LOCK.release()
+        salt.loader.LOAD_LOCK.release()
         opts = copy.deepcopy(opts)
         single = Single(
             opts,
@@ -544,9 +572,7 @@ class SSH:
             **target
         )
         ret = {"id": single.id}
-        logging._acquireLock()
         stdout, stderr, retcode = single.run()
-        logging._releaseLock()
         # This job is done, yield
         try:
             data = salt.utils.json.find_json(stdout)
@@ -573,7 +599,7 @@ class SSH:
         """
         que = multiprocessing.Queue()
         running = {}
-        target_iter = self.targets.__iter__()
+        targets_queue = deque(self.targets.keys())
         returned = set()
         rets = set()
         init = False
@@ -582,11 +608,43 @@ class SSH:
                 log.error("No matching targets found in roster.")
                 break
             if len(running) < self.opts.get("ssh_max_procs", 25) and not init:
-                try:
-                    host = next(target_iter)
-                except StopIteration:
+                if targets_queue:
+                    host = targets_queue.popleft()
+                else:
                     init = True
                     continue
+                with salt.utils.files.flopen(self.session_flock_file, "w"):
+                    cached_session = self.cache.fetch("salt-ssh/session", host)
+                    if cached_session is not None and "ts" in cached_session:
+                        prev_session_running = time.time() - cached_session["ts"]
+                        if (
+                            "pid" in cached_session
+                            and cached_session.get("master_id", self.master_id)
+                            == self.master_id
+                        ):
+                            pid_running = (
+                                False
+                                if cached_session["pid"] == 0
+                                else psutil.pid_exists(cached_session["pid"])
+                            )
+                            if (
+                                pid_running and prev_session_running < self.max_pid_wait
+                            ) or (
+                                not pid_running
+                                and prev_session_running < self.ssh_session_grace_time
+                            ):
+                                targets_queue.append(host)
+                                time.sleep(0.3)
+                                continue
+                    self.cache.store(
+                        "salt-ssh/session",
+                        host,
+                        {
+                            "pid": 0,
+                            "master_id": self.master_id,
+                            "ts": time.time(),
+                        },
+                    )
                 for default in self.defaults:
                     if default not in self.targets[host]:
                         self.targets[host][default] = self.defaults[default]
@@ -614,15 +672,39 @@ class SSH:
                     self.targets[host],
                     mine,
                 )
+                routine = Process(target=self.handle_routine, args=args)
+                # Explicitly call garbage collector to prevent possible segfault
+                # in salt-api child process. (bsc#1188607)
+                gc.collect()
                 try:
-                    logging._acquireLock()
-                    routine = Process(target=self.handle_routine, args=args)
+                    # salt.loader.LOAD_LOCK is used to prevent deadlock
+                    # with importlib in combination with using multiprocessing (bsc#1182851)
+                    # If the salt-api child process is creating while LazyLoader instance
+                    # is loading module, new child process gets the lock for this module acquired.
+                    # Touching this module with importlib inside child process leads to deadlock.
+                    #
+                    # salt.loader.LOAD_LOCK is used to prevent salt-api child process creation
+                    # while creating new instance of LazyLoader
+                    # salt.loader.LOAD_LOCK must be released explicitly in self.handle_routine
+                    salt.loader.LOAD_LOCK.acquire()
+                    # The same solution applied to fix logging deadlock
+                    # LOG_LOCK must be released explicitly in self.handle_routine
+                    LOG_LOCK.acquire()
                     routine.start()
-                except:
-                    pass
                 finally:
-                    logging._releaseLock()
+                    LOG_LOCK.release()
+                    salt.loader.LOAD_LOCK.release()
                 running[host] = {"thread": routine}
+                with salt.utils.files.flopen(self.session_flock_file, "w"):
+                    self.cache.store(
+                        "salt-ssh/session",
+                        host,
+                        {
+                            "pid": routine.pid,
+                            "master_id": self.master_id,
+                            "ts": time.time(),
+                        },
+                    )
                 continue
             ret = {}
             try:
@@ -657,12 +739,27 @@ class SSH:
                             ).format(host)
                             ret = {"id": host, "ret": error}
                             log.error(error)
+                            log.error(
+                                "PID %s did not return any data for host '%s'",
+                                running[host]["thread"].pid,
+                                host,
+                            )
                             yield {ret["id"]: ret["ret"]}
                     running[host]["thread"].join()
                     rets.add(host)
             for host in rets:
                 if host in running:
                     running.pop(host)
+                    with salt.utils.files.flopen(self.session_flock_file, "w"):
+                        self.cache.store(
+                            "salt-ssh/session",
+                            host,
+                            {
+                                "pid": 0,
+                                "master_id": self.master_id,
+                                "ts": time.time(),
+                            },
+                        )
             if len(rets) >= len(self.targets):
                 break
             # Sleep when limit or all threads started
@@ -924,6 +1021,7 @@ class Single:
         self.context = {"master_opts": self.opts, "fileclient": self.fsclient}
 
         self.ssh_pre_flight = kwargs.get("ssh_pre_flight", None)
+        self.ssh_pre_flight_args = kwargs.get("ssh_pre_flight_args", None)
 
         if self.ssh_pre_flight:
             self.ssh_pre_file = os.path.basename(self.ssh_pre_flight)
@@ -1016,7 +1114,7 @@ class Single:
 
         self.shell.send(self.ssh_pre_flight, script)
 
-        return self.execute_script(script)
+        return self.execute_script(script, script_args=self.ssh_pre_flight_args)
 
     def check_thin_dir(self):
         """
@@ -1029,13 +1127,24 @@ class Single:
             return False
         return True
 
+    def check_venv_hash_file(self):
+        """
+        check if the venv exists on the remote machine
+        """
+        stdout, stderr, retcode = self.shell.exec_cmd(
+            "test -f {}".format(VENV_HASH_FILE)
+        )
+        return retcode == 0
+
     def deploy(self):
         """
         Deploy salt-thin
         """
-        self.shell.send(
-            self.thin, os.path.join(self.thin_dir, "salt-thin.tgz"),
-        )
+        if not self.check_venv_hash_file():
+            self.shell.send(
+                self.thin,
+                os.path.join(self.thin_dir, "salt-thin.tgz"),
+            )
         self.deploy_ext()
         return True
 
@@ -1045,7 +1154,8 @@ class Single:
         """
         if self.mods.get("file"):
             self.shell.send(
-                self.mods["file"], os.path.join(self.thin_dir, "salt-ext_mods.tgz"),
+                self.mods["file"],
+                os.path.join(self.thin_dir, "salt-ext_mods.tgz"),
             )
         return True
 
@@ -1062,8 +1172,9 @@ class Single:
         Returns tuple of (stdout, stderr, retcode)
         """
         stdout = stderr = retcode = None
+        raw_shell = self.opts.get("raw_shell", False)
 
-        if self.ssh_pre_flight:
+        if self.ssh_pre_flight and not raw_shell:
             if not self.opts.get("ssh_run_pre_flight", False) and self.check_thin_dir():
                 log.info(
                     "{} thin dir already exists. Not running ssh_pre_flight script".format(
@@ -1080,9 +1191,9 @@ class Single:
                 stdout, stderr, retcode = self.run_ssh_pre_flight()
                 if retcode != 0:
                     log.error(
-                        "Error running ssh_pre_flight script {}".format(
-                            self.ssh_pre_file
-                        )
+                        "Error running ssh_pre_flight script %s for host '%s'",
+                        self.ssh_pre_file,
+                        self.target["host"],
                     )
                     return stdout, stderr, retcode
                 log.info(
@@ -1091,7 +1202,7 @@ class Single:
                     )
                 )
 
-        if self.opts.get("raw_shell", False):
+        if raw_shell:
             cmd_str = " ".join([self._escape_arg(arg) for arg in self.argv])
             stdout, stderr, retcode = self.shell.exec_cmd(cmd_str)
 
@@ -1348,15 +1459,22 @@ ARGS = {arguments}\n'''.format(
 
         return cmd
 
-    def execute_script(self, script, extension="py", pre_dir=""):
+    def execute_script(self, script, extension="py", pre_dir="", script_args=None):
         """
         execute a script on the minion then delete
         """
+        args = ""
+        if script_args:
+            if not isinstance(script_args, (list, tuple)):
+                script_args = shlex.split(str(script_args))
+            args = " {}".format(" ".join([shlex.quote(str(el)) for el in script_args]))
         if extension == "ps1":
             ret = self.shell.exec_cmd('"powershell {}"'.format(script))
         else:
             if not self.winrm:
-                ret = self.shell.exec_cmd("/bin/sh '{}{}'".format(pre_dir, script))
+                ret = self.shell.exec_cmd(
+                    "/bin/sh '{}{}'{}".format(pre_dir, script, args)
+                )
             else:
                 ret = saltwinshell.call_python(self, script)
 
diff --git a/salt/client/ssh/client.py b/salt/client/ssh/client.py
index 7e2fbe2675..5b2e984904 100644
--- a/salt/client/ssh/client.py
+++ b/salt/client/ssh/client.py
@@ -108,7 +108,7 @@ class SSHClient:
         return sane_kwargs
 
     def _prep_ssh(
-        self, tgt, fun, arg=(), timeout=None, tgt_type="glob", kwarg=None, **kwargs
+        self, tgt, fun, arg=(), timeout=None, tgt_type="glob", kwarg=None, context=None, **kwargs
     ):
         """
         Prepare the arguments
@@ -123,7 +123,7 @@ class SSHClient:
         opts["selected_target_option"] = tgt_type
         opts["tgt"] = tgt
         opts["arg"] = arg
-        return salt.client.ssh.SSH(opts)
+        return salt.client.ssh.SSH(opts, context=context)
 
     def cmd_iter(
         self,
@@ -160,7 +160,7 @@ class SSHClient:
             final.update(ret)
         return final
 
-    def cmd_sync(self, low):
+    def cmd_sync(self, low, context=None):
         """
         Execute a salt-ssh call synchronously.
 
@@ -193,6 +193,7 @@ class SSHClient:
             low.get("timeout"),
             low.get("tgt_type"),
             low.get("kwarg"),
+            context=context,
             **kwargs
         )
 
diff --git a/salt/client/ssh/shell.py b/salt/client/ssh/shell.py
index fc39279213..a1fae007b7 100644
--- a/salt/client/ssh/shell.py
+++ b/salt/client/ssh/shell.py
@@ -440,6 +440,14 @@ class Shell:
                 if stdout:
                     old_stdout = stdout
                 time.sleep(0.01)
+            if term.exitstatus is None:
+                try:
+                    term.wait()
+                except:  # pylint: disable=broad-except
+                    # It's safe to put the broad exception handling here
+                    # as we just need to ensure the child process in term finished
+                    # to get proper term.exitstatus instead of None
+                    pass
             return ret_stdout, ret_stderr, term.exitstatus
         finally:
             term.close(terminate=True, kill=True)
diff --git a/salt/client/ssh/ssh_py_shim.py b/salt/client/ssh/ssh_py_shim.py
index 5ddd282ed0..08d10a1023 100644
--- a/salt/client/ssh/ssh_py_shim.py
+++ b/salt/client/ssh/ssh_py_shim.py
@@ -273,55 +273,72 @@ def main(argv):  # pylint: disable=W0613
     """
     Main program body
     """
-    thin_path = os.path.join(OPTIONS.saltdir, THIN_ARCHIVE)
-    if os.path.isfile(thin_path):
-        if OPTIONS.checksum != get_hash(thin_path, OPTIONS.hashfunc):
-            need_deployment()
-        unpack_thin(thin_path)
-        # Salt thin now is available to use
-    else:
-        if not sys.platform.startswith("win"):
-            scpstat = subprocess.Popen(["/bin/sh", "-c", "command -v scp"]).wait()
-            if scpstat != 0:
-                sys.exit(EX_SCP_NOT_FOUND)
-
-        if os.path.exists(OPTIONS.saltdir) and not os.path.isdir(OPTIONS.saltdir):
-            sys.stderr.write(
-                'ERROR: salt path "{0}" exists but is'
-                " not a directory\n".format(OPTIONS.saltdir)
-            )
-            sys.exit(EX_CANTCREAT)
-
-        if not os.path.exists(OPTIONS.saltdir):
-            need_deployment()
 
-        code_checksum_path = os.path.normpath(
-            os.path.join(OPTIONS.saltdir, "code-checksum")
-        )
-        if not os.path.exists(code_checksum_path) or not os.path.isfile(
-            code_checksum_path
-        ):
-            sys.stderr.write(
-                "WARNING: Unable to locate current code checksum: {0}.\n".format(
-                    code_checksum_path
+    virt_env = os.getenv("VIRTUAL_ENV", None)
+    # VIRTUAL_ENV environment variable is defined by venv-salt-minion wrapper
+    # it's used to check if the shim is running under this wrapper
+    venv_salt_call = None
+    if virt_env and "venv-salt-minion" in virt_env:
+        venv_salt_call = os.path.join(virt_env, "bin", "salt-call")
+        if not os.path.exists(venv_salt_call):
+            venv_salt_call = None
+        elif not os.path.exists(OPTIONS.saltdir):
+            os.makedirs(OPTIONS.saltdir)
+            cache_dir = os.path.join(OPTIONS.saltdir, "running_data", "var", "cache")
+            os.makedirs(os.path.join(cache_dir, "salt"))
+            os.symlink("salt", os.path.relpath(os.path.join(cache_dir, "venv-salt-minion")))
+
+    if venv_salt_call is None:
+        # Use Salt thin only if Salt Bundle (venv-salt-minion) is not available
+        thin_path = os.path.join(OPTIONS.saltdir, THIN_ARCHIVE)
+        if os.path.isfile(thin_path):
+            if OPTIONS.checksum != get_hash(thin_path, OPTIONS.hashfunc):
+                need_deployment()
+            unpack_thin(thin_path)
+            # Salt thin now is available to use
+        else:
+            if not sys.platform.startswith("win"):
+                scpstat = subprocess.Popen(["/bin/sh", "-c", "command -v scp"]).wait()
+                if scpstat != 0:
+                    sys.exit(EX_SCP_NOT_FOUND)
+
+            if os.path.exists(OPTIONS.saltdir) and not os.path.isdir(OPTIONS.saltdir):
+                sys.stderr.write(
+                    'ERROR: salt path "{0}" exists but is'
+                    " not a directory\n".format(OPTIONS.saltdir)
                 )
+                sys.exit(EX_CANTCREAT)
+
+            if not os.path.exists(OPTIONS.saltdir):
+                need_deployment()
+
+            code_checksum_path = os.path.normpath(
+                os.path.join(OPTIONS.saltdir, "code-checksum")
             )
-            need_deployment()
-        with open(code_checksum_path, "r") as vpo:
-            cur_code_cs = vpo.readline().strip()
-        if cur_code_cs != OPTIONS.code_checksum:
-            sys.stderr.write(
-                "WARNING: current code checksum {0} is different to {1}.\n".format(
-                    cur_code_cs, OPTIONS.code_checksum
+            if not os.path.exists(code_checksum_path) or not os.path.isfile(
+                code_checksum_path
+            ):
+                sys.stderr.write(
+                    "WARNING: Unable to locate current code checksum: {0}.\n".format(
+                        code_checksum_path
+                    )
                 )
-            )
-            need_deployment()
-        # Salt thin exists and is up-to-date - fall through and use it
+                need_deployment()
+            with open(code_checksum_path, "r") as vpo:
+                cur_code_cs = vpo.readline().strip()
+            if cur_code_cs != OPTIONS.code_checksum:
+                sys.stderr.write(
+                    "WARNING: current code checksum {0} is different to {1}.\n".format(
+                        cur_code_cs, OPTIONS.code_checksum
+                    )
+                )
+                need_deployment()
+            # Salt thin exists and is up-to-date - fall through and use it
 
-    salt_call_path = os.path.join(OPTIONS.saltdir, "salt-call")
-    if not os.path.isfile(salt_call_path):
-        sys.stderr.write('ERROR: thin is missing "{0}"\n'.format(salt_call_path))
-        need_deployment()
+        salt_call_path = os.path.join(OPTIONS.saltdir, "salt-call")
+        if not os.path.isfile(salt_call_path):
+            sys.stderr.write('ERROR: thin is missing "{0}"\n'.format(salt_call_path))
+            need_deployment()
 
     with open(os.path.join(OPTIONS.saltdir, "minion"), "w") as config:
         config.write(OPTIONS.config + "\n")
@@ -344,8 +361,8 @@ def main(argv):  # pylint: disable=W0613
         argv_prepared = ARGS
 
     salt_argv = [
-        get_executable(),
-        salt_call_path,
+        sys.executable if venv_salt_call is not None else get_executable(),
+        venv_salt_call if venv_salt_call is not None else salt_call_path,
         "--retcode-passthrough",
         "--local",
         "--metadata",
diff --git a/salt/loader.py b/salt/loader.py
index 02446b5ee1..e1570a146b 100644
--- a/salt/loader.py
+++ b/salt/loader.py
@@ -88,6 +88,18 @@ LIBCLOUD_FUNCS_NOT_SUPPORTED = (
 # Will be set to pyximport module at runtime if cython is enabled in config.
 pyximport = None
 
+LOAD_LOCK = threading.Lock()
+
+
+def LazyLoader(*args, **kwargs):
+    # This wrapper is used to prevent deadlocks with importlib (bsc#1182851)
+    # LOAD_LOCK is also used directly in salt.client.ssh.SSH
+    try:
+        LOAD_LOCK.acquire()
+        return _LazyLoader(*args, **kwargs)
+    finally:
+        LOAD_LOCK.release()
+
 
 def static_loader(
     opts,
@@ -516,16 +528,19 @@ def fileserver(opts, backends):
     )
 
 
-def roster(opts, runner=None, utils=None, whitelist=None):
+def roster(opts, runner=None, utils=None, whitelist=None, context=None):
     """
     Returns the roster modules
     """
+    if context is None:
+        context = {}
+
     return LazyLoader(
         _module_dirs(opts, "roster"),
         opts,
         tag="roster",
         whitelist=whitelist,
-        pack={"__runner__": runner, "__utils__": utils},
+        pack={"__runner__": runner, "__utils__": utils, "__context__": context},
         extra_module_dirs=utils.module_dirs if utils else None,
     )
 
@@ -657,7 +672,14 @@ def render(opts, functions, states=None, proxy=None, context=None):
     )
     rend = FilterDictWrapper(ret, ".render")
 
-    if not check_render_pipe_str(
+    def _check_render_pipe_str(pipestr, renderers, blacklist, whitelist):
+        try:
+            LOAD_LOCK.acquire()
+            return check_render_pipe_str(pipestr, renderers, blacklist, whitelist)
+        finally:
+            LOAD_LOCK.release()
+
+    if not _check_render_pipe_str(
         opts["renderer"], rend, opts["renderer_blacklist"], opts["renderer_whitelist"]
     ):
         err = (
@@ -1147,7 +1169,7 @@ class FilterDictWrapper(MutableMapping):
                 yield key.replace(self.suffix, "")
 
 
-class LazyLoader(salt.utils.lazy.LazyDict):
+class _LazyLoader(salt.utils.lazy.LazyDict):
     """
     A pseduo-dictionary which has a set of keys which are the
     name of the module and function, delimited by a dot. When
diff --git a/salt/netapi/__init__.py b/salt/netapi/__init__.py
index dec19b37ef..ffd63c90af 100644
--- a/salt/netapi/__init__.py
+++ b/salt/netapi/__init__.py
@@ -47,6 +47,7 @@ class NetapiClient:
         self.loadauth = salt.auth.LoadAuth(apiopts)
         self.key = salt.daemons.masterapi.access_keys(apiopts)
         self.ckminions = salt.utils.minions.CkMinions(apiopts)
+        self.context = {}
 
     def _is_master_running(self):
         """
@@ -207,7 +208,7 @@ class NetapiClient:
         ssh_client = salt.client.ssh.client.SSHClient(
             mopts=self.opts, disable_custom_roster=True
         )
-        return ssh_client.cmd_sync(kwargs)
+        return ssh_client.cmd_sync(kwargs, context=self.context)
 
     def runner(self, fun, timeout=None, full_return=False, **kwargs):
         """
diff --git a/salt/roster/__init__.py b/salt/roster/__init__.py
index 255f43bdb6..c15498edf0 100644
--- a/salt/roster/__init__.py
+++ b/salt/roster/__init__.py
@@ -61,7 +61,7 @@ class Roster:
     minion aware
     """
 
-    def __init__(self, opts, backends="flat"):
+    def __init__(self, opts, backends="flat", context=None):
         self.opts = opts
         if isinstance(backends, list):
             self.backends = backends
@@ -73,7 +73,9 @@ class Roster:
             self.backends = ["flat"]
         utils = salt.loader.utils(self.opts)
         runner = salt.loader.runner(self.opts, utils=utils)
-        self.rosters = salt.loader.roster(self.opts, runner=runner, utils=utils)
+        self.rosters = salt.loader.roster(
+            self.opts, runner=runner, utils=utils, context=context
+        )
 
     def _gen_back(self):
         """
diff --git a/tests/integration/ssh/test_pre_flight.py b/tests/integration/ssh/test_pre_flight.py
index 5ff5ffe527..37b493bf8d 100644
--- a/tests/integration/ssh/test_pre_flight.py
+++ b/tests/integration/ssh/test_pre_flight.py
@@ -25,10 +25,14 @@ class SSHPreFlightTest(SSHCase):
             RUNTIME_VARS.TMP, "test-pre-flight-script-worked.txt"
         )
 
-    def _create_roster(self):
-        self.custom_roster(self.roster, self.data)
+    def _create_roster(self, pre_flight_script_args=None):
+        data = dict(self.data)
+        if pre_flight_script_args:
+            data["ssh_pre_flight_args"] = pre_flight_script_args
 
-        with salt.utils.files.fopen(self.data["ssh_pre_flight"], "w") as fp_:
+        self.custom_roster(self.roster, data)
+
+        with salt.utils.files.fopen(data["ssh_pre_flight"], "w") as fp_:
             fp_.write("touch {}".format(self.test_script))
 
     @slowTest
@@ -58,6 +62,45 @@ class SSHPreFlightTest(SSHCase):
         )
         assert os.path.exists(self.test_script)
 
+    @slowTest
+    def test_ssh_run_pre_flight_args(self):
+        """
+        test ssh when --pre-flight is passed to salt-ssh
+        to ensure the script runs successfully passing some args
+        """
+        self._create_roster(pre_flight_script_args="foobar test")
+        # make sure we previously ran a command so the thin dir exists
+        self.run_function("test.ping", wipe=False)
+        assert not os.path.exists(self.test_script)
+
+        assert self.run_function(
+            "test.ping", ssh_opts="--pre-flight", roster_file=self.roster, wipe=False
+        )
+        assert os.path.exists(self.test_script)
+
+    @slowTest
+    def test_ssh_run_pre_flight_args_prevent_injection(self):
+        """
+        test ssh when --pre-flight is passed to salt-ssh
+        and evil arguments are used in order to produce shell injection
+        """
+        injected_file = os.path.join(RUNTIME_VARS.TMP, "injection")
+        self._create_roster(
+            pre_flight_script_args="foobar; echo injected > {}".format(injected_file)
+        )
+        # make sure we previously ran a command so the thin dir exists
+        self.run_function("test.ping", wipe=False)
+        assert not os.path.exists(self.test_script)
+        assert not os.path.isfile(injected_file)
+
+        assert self.run_function(
+            "test.ping", ssh_opts="--pre-flight", roster_file=self.roster, wipe=False
+        )
+
+        assert not os.path.isfile(
+            injected_file
+        ), "File injection suceeded. This shouldn't happend"
+
     @slowTest
     def test_ssh_run_pre_flight_failure(self):
         """
@@ -77,7 +120,12 @@ class SSHPreFlightTest(SSHCase):
         """
         make sure to clean up any old ssh directories
         """
-        files = [self.roster, self.data["ssh_pre_flight"], self.test_script]
+        files = [
+            self.roster,
+            self.data["ssh_pre_flight"],
+            self.test_script,
+            os.path.join(RUNTIME_VARS.TMP, "injection"),
+        ]
         for fp_ in files:
             if os.path.exists(fp_):
                 os.remove(fp_)
diff --git a/tests/unit/client/test_ssh.py b/tests/unit/client/test_ssh.py
index cc5eee71d2..75af4adc23 100644
--- a/tests/unit/client/test_ssh.py
+++ b/tests/unit/client/test_ssh.py
@@ -235,6 +235,41 @@ class SSHSingleTests(TestCase):
             mock_flight.assert_called()
             assert ret == cmd_ret
 
+    def test_run_with_pre_flight_with_args(self):
+        """
+        test Single.run() when ssh_pre_flight is set
+        and script successfully runs
+        """
+        target = self.target.copy()
+        target["ssh_pre_flight"] = os.path.join(RUNTIME_VARS.TMP, "script.sh")
+        target["ssh_pre_flight_args"] = "foobar"
+        single = ssh.Single(
+            self.opts,
+            self.opts["argv"],
+            "localhost",
+            mods={},
+            fsclient=None,
+            thin=salt.utils.thin.thin_path(self.opts["cachedir"]),
+            mine=False,
+            **target
+        )
+
+        cmd_ret = ("Success", "foobar", 0)
+        mock_flight = MagicMock(return_value=cmd_ret)
+        mock_cmd = MagicMock(return_value=cmd_ret)
+        patch_flight = patch("salt.client.ssh.Single.run_ssh_pre_flight", mock_flight)
+        patch_cmd = patch("salt.client.ssh.Single.cmd_block", mock_cmd)
+        patch_exec_cmd = patch(
+            "salt.client.ssh.shell.Shell.exec_cmd", return_value=("", "", 1)
+        )
+        patch_os = patch("os.path.exists", side_effect=[True])
+
+        with patch_os, patch_flight, patch_cmd, patch_exec_cmd:
+            ret = single.run()
+            mock_cmd.assert_called()
+            mock_flight.assert_called()
+            assert ret == cmd_ret
+
     def test_run_with_pre_flight_stderr(self):
         """
         test Single.run() when ssh_pre_flight is set
diff --git a/tests/unit/test_loader.py b/tests/unit/test_loader.py
index 5b23ad83e3..5d17af469f 100644
--- a/tests/unit/test_loader.py
+++ b/tests/unit/test_loader.py
@@ -1746,7 +1746,7 @@ class LazyLoaderRefreshFileMappingTest(TestCase):
         cls.funcs = salt.loader.minion_mods(cls.opts, utils=cls.utils, proxy=cls.proxy)
 
     def setUp(self):
-        class LazyLoaderMock(salt.loader.LazyLoader):
+        class LazyLoaderMock(salt.loader._LazyLoader):
             pass
 
         self.LOADER_CLASS = LazyLoaderMock
-- 
2.35.1


openSUSE Build Service is sponsored by