File fix-cve-2023-34049-bsc-1215157.patch of Package salt

From b2baafcc96a2807cf7d34374904e1710a4f58b9f Mon Sep 17 00:00:00 2001
From: Alexander Graul <agraul@suse.com>
Date: Tue, 31 Oct 2023 11:26:15 +0100
Subject: [PATCH] Fix CVE-2023-34049 (bsc#1215157)

Backport of https://github.com/saltstack/salt/pull/65482
---
 salt/client/ssh/__init__.py                   |  56 +++-
 tests/integration/modules/test_ssh.py         |   3 +-
 tests/integration/ssh/test_pre_flight.py      | 132 --------
 .../integration/ssh/test_pre_flight.py        | 315 ++++++++++++++++++
 tests/pytests/unit/client/ssh/test_single.py  | 296 +++++++++++++---
 tests/pytests/unit/client/ssh/test_ssh.py     | 110 ++++++
 6 files changed, 727 insertions(+), 185 deletions(-)
 delete mode 100644 tests/integration/ssh/test_pre_flight.py
 create mode 100644 tests/pytests/integration/ssh/test_pre_flight.py

diff --git a/salt/client/ssh/__init__.py b/salt/client/ssh/__init__.py
index b120e0002e8..1e143f9e30c 100644
--- a/salt/client/ssh/__init__.py
+++ b/salt/client/ssh/__init__.py
@@ -12,9 +12,11 @@ import hashlib
 import logging
 import multiprocessing
 import os
+import pathlib
 import queue
 import re
 import shlex
+import shutil
 import subprocess
 import sys
 import tarfile
@@ -515,7 +517,14 @@ class SSH(MultiprocessingStateMixin):
             if target.get("passwd", False) or self.opts["ssh_passwd"]:
                 self._key_deploy_run(host, target, False)
             return ret
-        if ret[host].get("stderr", "").count("Permission denied"):
+        stderr = ret[host].get("stderr", "")
+        # -failed to upload file- is detecting scp errors
+        # Errors to ignore when Permission denied is in the stderr. For example
+        # scp can get a permission denied on the target host, but they where
+        # able to accurate authenticate against the box
+        ignore_err = ["failed to upload file"]
+        check_err = [x for x in ignore_err if stderr.count(x)]
+        if "Permission denied" in stderr and not check_err:
             target = self.targets[host]
             # permission denied, attempt to auto deploy ssh key
             print(
@@ -1137,11 +1146,30 @@ class Single:
         """
         Run our pre_flight script before running any ssh commands
         """
-        script = os.path.join(tempfile.gettempdir(), self.ssh_pre_file)
-
-        self.shell.send(self.ssh_pre_flight, script)
-
-        return self.execute_script(script, script_args=self.ssh_pre_flight_args)
+        with tempfile.NamedTemporaryFile() as temp:
+            # ensure we use copyfile to not copy the file attributes
+            # we want to ensure we use the perms set by the secure
+            # NamedTemporaryFile
+            try:
+                shutil.copyfile(self.ssh_pre_flight, temp.name)
+            except OSError as err:
+                return (
+                    "",
+                    "Could not copy pre flight script to temporary path",
+                    1,
+                )
+            target_script = f".{pathlib.Path(temp.name).name}"
+            log.trace("Copying the pre flight script to target")
+            stdout, stderr, retcode = self.shell.send(temp.name, target_script)
+            if retcode != 0:
+                # We could not copy the script to the target
+                log.error("Could not copy the pre flight script to target")
+                return stdout, stderr, retcode
+
+            log.trace("Executing the pre flight script on target")
+            return self.execute_script(
+                target_script, script_args=self.ssh_pre_flight_args
+            )
 
     def check_thin_dir(self):
         """
@@ -1531,18 +1559,20 @@ ARGS = {arguments}\n'''.format(
             return self.shell.exec_cmd(cmd_str)
 
         # Write the shim to a temporary file in the default temp directory
-        with tempfile.NamedTemporaryFile(
-            mode="w+b", prefix="shim_", delete=False
-        ) as shim_tmp_file:
+        with tempfile.NamedTemporaryFile(mode="w+b", delete=False) as shim_tmp_file:
             shim_tmp_file.write(salt.utils.stringutils.to_bytes(cmd_str))
 
         # Copy shim to target system, under $HOME/.<randomized name>
-        target_shim_file = ".{}.{}".format(
-            binascii.hexlify(os.urandom(6)).decode("ascii"), extension
-        )
+        target_shim_file = f".{pathlib.Path(shim_tmp_file.name).name}"
+
         if self.winrm:
             target_shim_file = saltwinshell.get_target_shim_file(self, target_shim_file)
-        self.shell.send(shim_tmp_file.name, target_shim_file, makedirs=True)
+        stdout, stderr, retcode = self.shell.send(
+            shim_tmp_file.name, target_shim_file, makedirs=True
+        )
+        if retcode != 0:
+            log.error("Could not copy the shim script to target")
+            return stdout, stderr, retcode
 
         # Remove our shim file
         try:
diff --git a/tests/integration/modules/test_ssh.py b/tests/integration/modules/test_ssh.py
index 0817877c86b..55586211622 100644
--- a/tests/integration/modules/test_ssh.py
+++ b/tests/integration/modules/test_ssh.py
@@ -26,7 +26,8 @@ def check_status():
         return False
 
 
-@pytest.mark.windows_whitelisted
+# @pytest.mark.windows_whitelisted
+# De-whitelist windows since it's hanging on the newer windows golden images
 @pytest.mark.skip_if_binaries_missing("ssh", "ssh-keygen", check_all=True)
 class SSHModuleTest(ModuleCase):
     """
diff --git a/tests/integration/ssh/test_pre_flight.py b/tests/integration/ssh/test_pre_flight.py
deleted file mode 100644
index 1598b3d51b5..00000000000
--- a/tests/integration/ssh/test_pre_flight.py
+++ /dev/null
@@ -1,132 +0,0 @@
-"""
-Test for ssh_pre_flight roster option
-"""
-
-import os
-
-import pytest
-
-import salt.utils.files
-from tests.support.case import SSHCase
-from tests.support.runtests import RUNTIME_VARS
-
-
-class SSHPreFlightTest(SSHCase):
-    """
-    Test ssh_pre_flight roster option
-    """
-
-    def setUp(self):
-        super().setUp()
-        self.roster = os.path.join(RUNTIME_VARS.TMP, "pre_flight_roster")
-        self.data = {
-            "ssh_pre_flight": os.path.join(RUNTIME_VARS.TMP, "ssh_pre_flight.sh")
-        }
-        self.test_script = os.path.join(
-            RUNTIME_VARS.TMP, "test-pre-flight-script-worked.txt"
-        )
-
-    def _create_roster(self, pre_flight_script_args=None):
-        data = dict(self.data)
-        if pre_flight_script_args:
-            data["ssh_pre_flight_args"] = pre_flight_script_args
-
-        self.custom_roster(self.roster, data)
-
-        with salt.utils.files.fopen(data["ssh_pre_flight"], "w") as fp_:
-            fp_.write("touch {}".format(self.test_script))
-
-    @pytest.mark.slow_test
-    def test_ssh_pre_flight(self):
-        """
-        test ssh when ssh_pre_flight is set
-        ensure the script runs successfully
-        """
-        self._create_roster()
-        assert self.run_function("test.ping", roster_file=self.roster)
-
-        assert os.path.exists(self.test_script)
-
-    @pytest.mark.slow_test
-    def test_ssh_run_pre_flight(self):
-        """
-        test ssh when --pre-flight is passed to salt-ssh
-        to ensure the script runs successfully
-        """
-        self._create_roster()
-        # make sure we previously ran a command so the thin dir exists
-        self.run_function("test.ping", wipe=False)
-        assert not os.path.exists(self.test_script)
-
-        assert self.run_function(
-            "test.ping", ssh_opts="--pre-flight", roster_file=self.roster, wipe=False
-        )
-        assert os.path.exists(self.test_script)
-
-    @pytest.mark.slow_test
-    def test_ssh_run_pre_flight_args(self):
-        """
-        test ssh when --pre-flight is passed to salt-ssh
-        to ensure the script runs successfully passing some args
-        """
-        self._create_roster(pre_flight_script_args="foobar test")
-        # make sure we previously ran a command so the thin dir exists
-        self.run_function("test.ping", wipe=False)
-        assert not os.path.exists(self.test_script)
-
-        assert self.run_function(
-            "test.ping", ssh_opts="--pre-flight", roster_file=self.roster, wipe=False
-        )
-        assert os.path.exists(self.test_script)
-
-    @pytest.mark.slow_test
-    def test_ssh_run_pre_flight_args_prevent_injection(self):
-        """
-        test ssh when --pre-flight is passed to salt-ssh
-        and evil arguments are used in order to produce shell injection
-        """
-        injected_file = os.path.join(RUNTIME_VARS.TMP, "injection")
-        self._create_roster(
-            pre_flight_script_args="foobar; echo injected > {}".format(injected_file)
-        )
-        # make sure we previously ran a command so the thin dir exists
-        self.run_function("test.ping", wipe=False)
-        assert not os.path.exists(self.test_script)
-        assert not os.path.isfile(injected_file)
-
-        assert self.run_function(
-            "test.ping", ssh_opts="--pre-flight", roster_file=self.roster, wipe=False
-        )
-
-        assert not os.path.isfile(
-            injected_file
-        ), "File injection suceeded. This shouldn't happend"
-
-    @pytest.mark.slow_test
-    def test_ssh_run_pre_flight_failure(self):
-        """
-        test ssh_pre_flight when there is a failure
-        in the script.
-        """
-        self._create_roster()
-        with salt.utils.files.fopen(self.data["ssh_pre_flight"], "w") as fp_:
-            fp_.write("exit 2")
-
-        ret = self.run_function(
-            "test.ping", ssh_opts="--pre-flight", roster_file=self.roster, wipe=False
-        )
-        assert ret["retcode"] == 2
-
-    def tearDown(self):
-        """
-        make sure to clean up any old ssh directories
-        """
-        files = [
-            self.roster,
-            self.data["ssh_pre_flight"],
-            self.test_script,
-            os.path.join(RUNTIME_VARS.TMP, "injection"),
-        ]
-        for fp_ in files:
-            if os.path.exists(fp_):
-                os.remove(fp_)
diff --git a/tests/pytests/integration/ssh/test_pre_flight.py b/tests/pytests/integration/ssh/test_pre_flight.py
new file mode 100644
index 00000000000..09c65d29430
--- /dev/null
+++ b/tests/pytests/integration/ssh/test_pre_flight.py
@@ -0,0 +1,315 @@
+"""
+Test for ssh_pre_flight roster option
+"""
+
+try:
+    import grp
+    import pwd
+except ImportError:
+    # windows stacktraces on import of these modules
+    pass
+import os
+import pathlib
+import shutil
+import subprocess
+
+import pytest
+import yaml
+from saltfactories.utils import random_string
+
+import salt.utils.files
+
+pytestmark = pytest.mark.skip_on_windows(reason="Salt-ssh not available on Windows")
+
+
+def _custom_roster(roster_file, roster_data):
+    with salt.utils.files.fopen(roster_file, "r") as fp:
+        data = salt.utils.yaml.safe_load(fp)
+    for key, item in roster_data.items():
+        data["localhost"][key] = item
+    with salt.utils.files.fopen(roster_file, "w") as fp:
+        yaml.safe_dump(data, fp)
+
+
+@pytest.fixture
+def _create_roster(salt_ssh_roster_file, tmp_path):
+    ret = {}
+    ret["roster"] = salt_ssh_roster_file
+    ret["data"] = {"ssh_pre_flight": str(tmp_path / "ssh_pre_flight.sh")}
+    ret["test_script"] = str(tmp_path / "test-pre-flight-script-worked.txt")
+    ret["thin_dir"] = tmp_path / "thin_dir"
+
+    with salt.utils.files.fopen(salt_ssh_roster_file, "r") as fp:
+        data = salt.utils.yaml.safe_load(fp)
+    pre_flight_script = ret["data"]["ssh_pre_flight"]
+    data["localhost"]["ssh_pre_flight"] = pre_flight_script
+    data["localhost"]["thin_dir"] = str(ret["thin_dir"])
+    with salt.utils.files.fopen(salt_ssh_roster_file, "w") as fp:
+        yaml.safe_dump(data, fp)
+
+    with salt.utils.files.fopen(pre_flight_script, "w") as fp:
+        fp.write("touch {}".format(ret["test_script"]))
+
+    yield ret
+    if ret["thin_dir"].exists():
+        shutil.rmtree(ret["thin_dir"])
+
+
+@pytest.mark.slow_test
+def test_ssh_pre_flight(salt_ssh_cli, caplog, _create_roster):
+    """
+    test ssh when ssh_pre_flight is set
+    ensure the script runs successfully
+    """
+    ret = salt_ssh_cli.run("test.ping")
+    assert ret.returncode == 0
+
+    assert pathlib.Path(_create_roster["test_script"]).exists()
+
+
+@pytest.mark.slow_test
+def test_ssh_run_pre_flight(salt_ssh_cli, _create_roster):
+    """
+    test ssh when --pre-flight is passed to salt-ssh
+    to ensure the script runs successfully
+    """
+    # make sure we previously ran a command so the thin dir exists
+    ret = salt_ssh_cli.run("test.ping")
+    assert pathlib.Path(_create_roster["test_script"]).exists()
+
+    # Now remeove the script to ensure pre_flight doesn't run
+    # without --pre-flight
+    pathlib.Path(_create_roster["test_script"]).unlink()
+
+    assert salt_ssh_cli.run("test.ping").returncode == 0
+    assert not pathlib.Path(_create_roster["test_script"]).exists()
+
+    # Now ensure
+    ret = salt_ssh_cli.run(
+        "test.ping",
+        "--pre-flight",
+    )
+    assert ret.returncode == 0
+    assert pathlib.Path(_create_roster["test_script"]).exists()
+
+
+@pytest.mark.slow_test
+def test_ssh_run_pre_flight_args(salt_ssh_cli, _create_roster):
+    """
+    test ssh when --pre-flight is passed to salt-ssh
+    to ensure the script runs successfully passing some args
+    """
+    _custom_roster(salt_ssh_cli.roster_file, {"ssh_pre_flight_args": "foobar test"})
+    # Create pre_flight script that accepts args
+    test_script = _create_roster["test_script"]
+    test_script_1 = pathlib.Path(test_script + "-foobar")
+    test_script_2 = pathlib.Path(test_script + "-test")
+    with salt.utils.files.fopen(_create_roster["data"]["ssh_pre_flight"], "w") as fp:
+        fp.write(
+            f"""
+        touch {str(test_script)}-$1
+        touch {str(test_script)}-$2
+        """
+        )
+    ret = salt_ssh_cli.run("test.ping")
+    assert ret.returncode == 0
+    assert test_script_1.exists()
+    assert test_script_2.exists()
+    pathlib.Path(test_script_1).unlink()
+    pathlib.Path(test_script_2).unlink()
+
+    ret = salt_ssh_cli.run("test.ping")
+    assert ret.returncode == 0
+    assert not test_script_1.exists()
+    assert not test_script_2.exists()
+
+    ret = salt_ssh_cli.run(
+        "test.ping",
+        "--pre-flight",
+    )
+    assert ret.returncode == 0
+    assert test_script_1.exists()
+    assert test_script_2.exists()
+
+
+@pytest.mark.slow_test
+def test_ssh_run_pre_flight_args_prevent_injection(
+    salt_ssh_cli, _create_roster, tmp_path
+):
+    """
+    test ssh when --pre-flight is passed to salt-ssh
+    and evil arguments are used in order to produce shell injection
+    """
+    injected_file = tmp_path / "injection"
+    _custom_roster(
+        salt_ssh_cli.roster_file,
+        {"ssh_pre_flight_args": f"foobar; echo injected > {str(injected_file)}"},
+    )
+    # Create pre_flight script that accepts args
+    test_script = _create_roster["test_script"]
+    test_script_1 = pathlib.Path(test_script + "-echo")
+    test_script_2 = pathlib.Path(test_script + "-foobar;")
+    with salt.utils.files.fopen(_create_roster["data"]["ssh_pre_flight"], "w") as fp:
+        fp.write(
+            f"""
+        touch {str(test_script)}-$1
+        touch {str(test_script)}-$2
+        """
+        )
+
+    # make sure we previously ran a command so the thin dir exists
+    ret = salt_ssh_cli.run("test.ping")
+    assert ret.returncode == 0
+    assert test_script_1.exists()
+    assert test_script_2.exists()
+    test_script_1.unlink()
+    test_script_2.unlink()
+    assert not injected_file.is_file()
+
+    ret = salt_ssh_cli.run(
+        "test.ping",
+        "--pre-flight",
+    )
+    assert ret.returncode == 0
+
+    assert test_script_1.exists()
+    assert test_script_2.exists()
+    assert not pathlib.Path(
+        injected_file
+    ).is_file(), "File injection suceeded. This shouldn't happend"
+
+
+@pytest.mark.flaky(max_runs=4)
+@pytest.mark.slow_test
+def test_ssh_run_pre_flight_failure(salt_ssh_cli, _create_roster):
+    """
+    test ssh_pre_flight when there is a failure
+    in the script.
+    """
+    with salt.utils.files.fopen(_create_roster["data"]["ssh_pre_flight"], "w") as fp_:
+        fp_.write("exit 2")
+
+    ret = salt_ssh_cli.run(
+        "test.ping",
+        "--pre-flight",
+    )
+    assert ret.data["retcode"] == 2
+
+
+@pytest.fixture
+def account():
+    username = random_string("test-account-", uppercase=False)
+    with pytest.helpers.create_account(username=username) as account:
+        yield account
+
+
+@pytest.mark.slow_test
+def test_ssh_pre_flight_script(salt_ssh_cli, caplog, _create_roster, tmp_path, account):
+    """
+    Test to ensure user cannot create and run a script
+    with the expected pre_flight script path on target.
+    """
+    try:
+        script = pathlib.Path.home() / "hacked"
+        tmp_preflight = pathlib.Path("/tmp", "ssh_pre_flight.sh")
+        tmp_preflight.write_text(f"touch {script}")
+        os.chown(tmp_preflight, account.info.uid, account.info.gid)
+        ret = salt_ssh_cli.run("test.ping")
+        assert not script.is_file()
+        assert ret.returncode == 0
+        assert ret.stdout == '{\n"localhost": true\n}\n'
+    finally:
+        for _file in [script, tmp_preflight]:
+            if _file.is_file():
+                _file.unlink()
+
+
+def demote(user_uid, user_gid):
+    def result():
+        # os.setgid does not remove group membership, so we remove them here so they are REALLY non-root
+        os.setgroups([])
+        os.setgid(user_gid)
+        os.setuid(user_uid)
+
+    return result
+
+
+@pytest.mark.slow_test
+def test_ssh_pre_flight_perms(salt_ssh_cli, caplog, _create_roster, account):
+    """
+    Test to ensure standard user cannot run pre flight script
+    on target when user sets wrong permissions (777) on
+    ssh_pre_flight script.
+    """
+    try:
+        script = pathlib.Path("/tmp", "itworked")
+        preflight = pathlib.Path("/ssh_pre_flight.sh")
+        preflight.write_text(f"touch {str(script)}")
+        tmp_preflight = pathlib.Path("/tmp", preflight.name)
+
+        _custom_roster(salt_ssh_cli.roster_file, {"ssh_pre_flight": str(preflight)})
+        preflight.chmod(0o0777)
+        run_script = pathlib.Path("/run_script")
+        run_script.write_text(
+            f"""
+        x=1
+        while [ $x -le 200000 ]; do
+            SCRIPT=`bash {str(tmp_preflight)} 2> /dev/null; echo $?`
+            if [ ${{SCRIPT}} == 0 ]; then
+                break
+            fi
+            x=$(( $x + 1 ))
+        done
+        """
+        )
+        run_script.chmod(0o0777)
+        # pylint: disable=W1509
+        ret = subprocess.Popen(
+            ["sh", f"{run_script}"],
+            preexec_fn=demote(account.info.uid, account.info.gid),
+            stdout=None,
+            stderr=None,
+            stdin=None,
+            universal_newlines=True,
+        )
+        # pylint: enable=W1509
+        ret = salt_ssh_cli.run("test.ping")
+        assert ret.returncode == 0
+
+        # Lets make sure a different user other than root
+        # Didn't run the script
+        assert os.stat(script).st_uid != account.info.uid
+        assert script.is_file()
+    finally:
+        for _file in [script, preflight, tmp_preflight, run_script]:
+            if _file.is_file():
+                _file.unlink()
+
+
+@pytest.mark.slow_test
+def test_ssh_run_pre_flight_target_file_perms(salt_ssh_cli, _create_roster, tmp_path):
+    """
+    test ssh_pre_flight to ensure the target pre flight script
+    has the correct perms
+    """
+    perms_file = tmp_path / "perms"
+    with salt.utils.files.fopen(_create_roster["data"]["ssh_pre_flight"], "w") as fp_:
+        fp_.write(
+            f"""
+        SCRIPT_NAME=$0
+        stat -L -c "%a %G %U" $SCRIPT_NAME > {perms_file}
+        """
+        )
+
+    ret = salt_ssh_cli.run(
+        "test.ping",
+        "--pre-flight",
+    )
+    assert ret.returncode == 0
+    with salt.utils.files.fopen(perms_file) as fp:
+        data = fp.read()
+    assert data.split()[0] == "600"
+    uid = os.getuid()
+    gid = os.getgid()
+    assert data.split()[1] == grp.getgrgid(gid).gr_name
+    assert data.split()[2] == pwd.getpwuid(uid).pw_name
diff --git a/tests/pytests/unit/client/ssh/test_single.py b/tests/pytests/unit/client/ssh/test_single.py
index f97519d5cc2..c88a1c2127f 100644
--- a/tests/pytests/unit/client/ssh/test_single.py
+++ b/tests/pytests/unit/client/ssh/test_single.py
@@ -1,6 +1,5 @@
-import os
+import logging
 import re
-import tempfile
 from textwrap import dedent
 
 import pytest
@@ -16,6 +15,8 @@ import salt.utils.yaml
 from salt.client import ssh
 from tests.support.mock import MagicMock, call, patch
 
+log = logging.getLogger(__name__)
+
 
 @pytest.fixture
 def opts(tmp_path):
@@ -59,7 +60,7 @@ def test_single_opts(opts, target):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     assert single.shell._ssh_opts() == ""
@@ -87,7 +88,7 @@ def test_run_with_pre_flight(opts, target, tmp_path):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     cmd_ret = ("Success", "", 0)
@@ -122,7 +123,7 @@ def test_run_with_pre_flight_with_args(opts, target, tmp_path):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     cmd_ret = ("Success", "foobar", 0)
@@ -156,7 +157,7 @@ def test_run_with_pre_flight_stderr(opts, target, tmp_path):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     cmd_ret = ("", "Error running script", 1)
@@ -190,7 +191,7 @@ def test_run_with_pre_flight_script_doesnot_exist(opts, target, tmp_path):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     cmd_ret = ("Success", "", 0)
@@ -224,7 +225,7 @@ def test_run_with_pre_flight_thin_dir_exists(opts, target, tmp_path):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     cmd_ret = ("", "", 0)
@@ -242,6 +243,39 @@ def test_run_with_pre_flight_thin_dir_exists(opts, target, tmp_path):
         assert ret == cmd_ret
 
 
+def test_run_ssh_pre_flight(opts, target, tmp_path):
+    """
+    test Single.run_ssh_pre_flight function
+    """
+    target["ssh_pre_flight"] = str(tmp_path / "script.sh")
+    single = ssh.Single(
+        opts,
+        opts["argv"],
+        "localhost",
+        mods={},
+        fsclient=None,
+        thin=salt.utils.thin.thin_path(opts["cachedir"]),
+        mine=False,
+        **target,
+    )
+
+    cmd_ret = ("Success", "", 0)
+    mock_flight = MagicMock(return_value=cmd_ret)
+    mock_cmd = MagicMock(return_value=cmd_ret)
+    patch_flight = patch("salt.client.ssh.Single.run_ssh_pre_flight", mock_flight)
+    patch_cmd = patch("salt.client.ssh.Single.cmd_block", mock_cmd)
+    patch_exec_cmd = patch(
+        "salt.client.ssh.shell.Shell.exec_cmd", return_value=("", "", 1)
+    )
+    patch_os = patch("os.path.exists", side_effect=[True])
+
+    with patch_os, patch_flight, patch_cmd, patch_exec_cmd:
+        ret = single.run()
+        mock_cmd.assert_called()
+        mock_flight.assert_called()
+        assert ret == cmd_ret
+
+
 def test_execute_script(opts, target, tmp_path):
     """
     test Single.execute_script()
@@ -255,7 +289,7 @@ def test_execute_script(opts, target, tmp_path):
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
         winrm=False,
-        **target
+        **target,
     )
 
     exp_ret = ("Success", "", 0)
@@ -273,7 +307,7 @@ def test_execute_script(opts, target, tmp_path):
         ] == mock_cmd.call_args_list
 
 
-def test_shim_cmd(opts, target):
+def test_shim_cmd(opts, target, tmp_path):
     """
     test Single.shim_cmd()
     """
@@ -287,7 +321,7 @@ def test_shim_cmd(opts, target):
         mine=False,
         winrm=False,
         tty=True,
-        **target
+        **target,
     )
 
     exp_ret = ("Success", "", 0)
@@ -295,21 +329,24 @@ def test_shim_cmd(opts, target):
     patch_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_cmd)
     patch_send = patch("salt.client.ssh.shell.Shell.send", return_value=("", "", 0))
     patch_rand = patch("os.urandom", return_value=b"5\xd9l\xca\xc2\xff")
+    tmp_file = tmp_path / "tmp_file"
+    mock_tmp = MagicMock()
+    patch_tmp = patch("tempfile.NamedTemporaryFile", mock_tmp)
+    mock_tmp.return_value.__enter__.return_value.name = tmp_file
 
-    with patch_cmd, patch_rand, patch_send:
+    with patch_cmd, patch_tmp, patch_send:
         ret = single.shim_cmd(cmd_str="echo test")
         assert ret == exp_ret
         assert [
-            call("/bin/sh '.35d96ccac2ff.py'"),
-            call("rm '.35d96ccac2ff.py'"),
+            call(f"/bin/sh '.{tmp_file.name}'"),
+            call(f"rm '.{tmp_file.name}'"),
         ] == mock_cmd.call_args_list
 
 
-def test_run_ssh_pre_flight(opts, target, tmp_path):
+def test_shim_cmd_copy_fails(opts, target, caplog):
     """
-    test Single.run_ssh_pre_flight
+    test Single.shim_cmd() when copying the file fails
     """
-    target["ssh_pre_flight"] = str(tmp_path / "script.sh")
     single = ssh.Single(
         opts,
         opts["argv"],
@@ -320,24 +357,202 @@ def test_run_ssh_pre_flight(opts, target, tmp_path):
         mine=False,
         winrm=False,
         tty=True,
-        **target
+        **target,
     )
 
-    exp_ret = ("Success", "", 0)
-    mock_cmd = MagicMock(return_value=exp_ret)
+    ret_cmd = ("Success", "", 0)
+    mock_cmd = MagicMock(return_value=ret_cmd)
     patch_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_cmd)
-    patch_send = patch("salt.client.ssh.shell.Shell.send", return_value=exp_ret)
-    exp_tmp = os.path.join(
-        tempfile.gettempdir(), os.path.basename(target["ssh_pre_flight"])
+    ret_send = ("", "General error in file copy", 1)
+    patch_send = patch("salt.client.ssh.shell.Shell.send", return_value=ret_send)
+    patch_rand = patch("os.urandom", return_value=b"5\xd9l\xca\xc2\xff")
+
+    with patch_cmd, patch_rand, patch_send:
+        ret = single.shim_cmd(cmd_str="echo test")
+        assert ret == ret_send
+        assert "Could not copy the shim script to target" in caplog.text
+        mock_cmd.assert_not_called()
+
+
+def test_run_ssh_pre_flight_no_connect(opts, target, tmp_path, caplog):
+    """
+    test Single.run_ssh_pre_flight when you
+    cannot connect to the target
+    """
+    pre_flight = tmp_path / "script.sh"
+    pre_flight.write_text("")
+    target["ssh_pre_flight"] = str(pre_flight)
+    single = ssh.Single(
+        opts,
+        opts["argv"],
+        "localhost",
+        mods={},
+        fsclient=None,
+        thin=salt.utils.thin.thin_path(opts["cachedir"]),
+        mine=False,
+        winrm=False,
+        tty=True,
+        **target,
     )
+    mock_exec_cmd = MagicMock(return_value=("", "", 1))
+    patch_exec_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_exec_cmd)
+    tmp_file = tmp_path / "tmp_file"
+    mock_tmp = MagicMock()
+    patch_tmp = patch("tempfile.NamedTemporaryFile", mock_tmp)
+    mock_tmp.return_value.__enter__.return_value.name = tmp_file
+    ret_send = (
+        "",
+        "ssh: connect to host 192.168.1.186 port 22: No route to host\nscp: Connection closed\n",
+        255,
+    )
+    send_mock = MagicMock(return_value=ret_send)
+    patch_send = patch("salt.client.ssh.shell.Shell.send", send_mock)
+
+    with caplog.at_level(logging.TRACE):
+        with patch_send, patch_exec_cmd, patch_tmp:
+            ret = single.run_ssh_pre_flight()
+    assert "Copying the pre flight script" in caplog.text
+    assert "Could not copy the pre flight script to target" in caplog.text
+    assert ret == ret_send
+    assert send_mock.call_args_list[0][0][0] == tmp_file
+    target_script = send_mock.call_args_list[0][0][1]
+    assert re.search(r".[a-z0-9]+", target_script)
+    mock_exec_cmd.assert_not_called()
+
+
+def test_run_ssh_pre_flight_permission_denied(opts, target, tmp_path):
+    """
+    test Single.run_ssh_pre_flight when you
+    cannot copy script to the target due to
+    a permission denied error
+    """
+    pre_flight = tmp_path / "script.sh"
+    pre_flight.write_text("")
+    target["ssh_pre_flight"] = str(pre_flight)
+    single = ssh.Single(
+        opts,
+        opts["argv"],
+        "localhost",
+        mods={},
+        fsclient=None,
+        thin=salt.utils.thin.thin_path(opts["cachedir"]),
+        mine=False,
+        winrm=False,
+        tty=True,
+        **target,
+    )
+    mock_exec_cmd = MagicMock(return_value=("", "", 1))
+    patch_exec_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_exec_cmd)
+    tmp_file = tmp_path / "tmp_file"
+    mock_tmp = MagicMock()
+    patch_tmp = patch("tempfile.NamedTemporaryFile", mock_tmp)
+    mock_tmp.return_value.__enter__.return_value.name = tmp_file
+    ret_send = (
+        "",
+        'scp: dest open "/tmp/preflight.sh": Permission denied\nscp: failed to upload file /etc/salt/preflight.sh to /tmp/preflight.sh\n',
+        255,
+    )
+    send_mock = MagicMock(return_value=ret_send)
+    patch_send = patch("salt.client.ssh.shell.Shell.send", send_mock)
 
-    with patch_cmd, patch_send:
+    with patch_send, patch_exec_cmd, patch_tmp:
         ret = single.run_ssh_pre_flight()
-        assert ret == exp_ret
-        assert [
-            call("/bin/sh '{}'".format(exp_tmp)),
-            call("rm '{}'".format(exp_tmp)),
-        ] == mock_cmd.call_args_list
+    assert ret == ret_send
+    assert send_mock.call_args_list[0][0][0] == tmp_file
+    target_script = send_mock.call_args_list[0][0][1]
+    assert re.search(r".[a-z0-9]+", target_script)
+    mock_exec_cmd.assert_not_called()
+
+
+def test_run_ssh_pre_flight_connect(opts, target, tmp_path, caplog):
+    """
+    test Single.run_ssh_pre_flight when you
+    can connect to the target
+    """
+    pre_flight = tmp_path / "script.sh"
+    pre_flight.write_text("")
+    target["ssh_pre_flight"] = str(pre_flight)
+    single = ssh.Single(
+        opts,
+        opts["argv"],
+        "localhost",
+        mods={},
+        fsclient=None,
+        thin=salt.utils.thin.thin_path(opts["cachedir"]),
+        mine=False,
+        winrm=False,
+        tty=True,
+        **target,
+    )
+    ret_exec_cmd = ("", "", 1)
+    mock_exec_cmd = MagicMock(return_value=ret_exec_cmd)
+    patch_exec_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_exec_cmd)
+    tmp_file = tmp_path / "tmp_file"
+    mock_tmp = MagicMock()
+    patch_tmp = patch("tempfile.NamedTemporaryFile", mock_tmp)
+    mock_tmp.return_value.__enter__.return_value.name = tmp_file
+    ret_send = (
+        "",
+        "\rroot@192.168.1.187's password: \n\rpreflight.sh 0%    0 0.0KB/s   --:-- ETA\rpreflight.sh 100%   20     2.7KB/s   00:00 \n",
+        0,
+    )
+    send_mock = MagicMock(return_value=ret_send)
+    patch_send = patch("salt.client.ssh.shell.Shell.send", send_mock)
+
+    with caplog.at_level(logging.TRACE):
+        with patch_send, patch_exec_cmd, patch_tmp:
+            ret = single.run_ssh_pre_flight()
+
+    assert "Executing the pre flight script on target" in caplog.text
+    assert ret == ret_exec_cmd
+    assert send_mock.call_args_list[0][0][0] == tmp_file
+    target_script = send_mock.call_args_list[0][0][1]
+    assert re.search(r".[a-z0-9]+", target_script)
+    mock_exec_cmd.assert_called()
+
+
+def test_run_ssh_pre_flight_shutil_fails(opts, target, tmp_path):
+    """
+    test Single.run_ssh_pre_flight when cannot
+    copyfile with shutil
+    """
+    pre_flight = tmp_path / "script.sh"
+    pre_flight.write_text("")
+    target["ssh_pre_flight"] = str(pre_flight)
+    single = ssh.Single(
+        opts,
+        opts["argv"],
+        "localhost",
+        mods={},
+        fsclient=None,
+        thin=salt.utils.thin.thin_path(opts["cachedir"]),
+        mine=False,
+        winrm=False,
+        tty=True,
+        **target,
+    )
+    ret_exec_cmd = ("", "", 1)
+    mock_exec_cmd = MagicMock(return_value=ret_exec_cmd)
+    patch_exec_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_exec_cmd)
+    tmp_file = tmp_path / "tmp_file"
+    mock_tmp = MagicMock()
+    patch_tmp = patch("tempfile.NamedTemporaryFile", mock_tmp)
+    mock_tmp.return_value.__enter__.return_value.name = tmp_file
+    send_mock = MagicMock()
+    mock_shutil = MagicMock(side_effect=IOError("Permission Denied"))
+    patch_shutil = patch("shutil.copyfile", mock_shutil)
+    patch_send = patch("salt.client.ssh.shell.Shell.send", send_mock)
+
+    with patch_send, patch_exec_cmd, patch_tmp, patch_shutil:
+        ret = single.run_ssh_pre_flight()
+
+    assert ret == (
+        "",
+        "Could not copy pre flight script to temporary path",
+        1,
+    )
+    mock_exec_cmd.assert_not_called()
+    send_mock.assert_not_called()
 
 
 @pytest.mark.skip_on_windows(reason="SSH_PY_SHIM not set on windows")
@@ -355,7 +570,7 @@ def test_cmd_run_set_path(opts, target):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     ret = single._cmd_str()
@@ -376,7 +591,7 @@ def test_cmd_run_not_set_path(opts, target):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     ret = single._cmd_str()
@@ -395,7 +610,7 @@ def test_cmd_block_python_version_error(opts, target):
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
         winrm=False,
-        **target
+        **target,
     )
     mock_shim = MagicMock(
         return_value=(("", "ERROR: Unable to locate appropriate python command\n", 10))
@@ -434,7 +649,9 @@ def test_run_with_pre_flight_args(opts, target, test_opts, tmp_path):
     and script successfully runs
     """
     opts["ssh_run_pre_flight"] = True
-    target["ssh_pre_flight"] = str(tmp_path / "script.sh")
+    pre_flight_script = tmp_path / "script.sh"
+    pre_flight_script.write_text("")
+    target["ssh_pre_flight"] = str(pre_flight_script)
 
     if test_opts[0] is not None:
         target["ssh_pre_flight_args"] = test_opts[0]
@@ -448,7 +665,7 @@ def test_run_with_pre_flight_args(opts, target, test_opts, tmp_path):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     cmd_ret = ("Success", "", 0)
@@ -456,14 +673,15 @@ def test_run_with_pre_flight_args(opts, target, test_opts, tmp_path):
     mock_exec_cmd = MagicMock(return_value=("", "", 0))
     patch_cmd = patch("salt.client.ssh.Single.cmd_block", mock_cmd)
     patch_exec_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_exec_cmd)
-    patch_shell_send = patch("salt.client.ssh.shell.Shell.send", return_value=None)
+    patch_shell_send = patch(
+        "salt.client.ssh.shell.Shell.send", return_value=("", "", 0)
+    )
     patch_os = patch("os.path.exists", side_effect=[True])
 
     with patch_os, patch_cmd, patch_exec_cmd, patch_shell_send:
-        ret = single.run()
-        assert mock_exec_cmd.mock_calls[0].args[
-            0
-        ] == "/bin/sh '/tmp/script.sh'{}".format(expected_args)
+        single.run()
+        script_args = mock_exec_cmd.mock_calls[0].args[0]
+        assert re.search(r"\/bin\/sh '.[a-z0-9]+", script_args)
 
 
 @pytest.mark.slow_test
diff --git a/tests/pytests/unit/client/ssh/test_ssh.py b/tests/pytests/unit/client/ssh/test_ssh.py
index 377aad9998c..cece16026cf 100644
--- a/tests/pytests/unit/client/ssh/test_ssh.py
+++ b/tests/pytests/unit/client/ssh/test_ssh.py
@@ -339,3 +339,113 @@ def test_extra_filerefs(tmp_path, opts):
     with patch("salt.roster.get_roster_file", MagicMock(return_value=roster)):
         ssh_obj = client._prep_ssh(**ssh_opts)
         assert ssh_obj.opts.get("extra_filerefs", None) == "salt://foobar"
+
+
+def test_key_deploy_permission_denied_scp(tmp_path, opts):
+    """
+    test "key_deploy" function when
+    permission denied authentication error
+    when attempting to use scp to copy file
+    to target
+    """
+    host = "localhost"
+    passwd = "password"
+    usr = "ssh-usr"
+    opts["ssh_user"] = usr
+    opts["tgt"] = host
+
+    ssh_ret = {
+        host: {
+            "stdout": "\rroot@192.168.1.187's password: \n\rroot@192.168.1.187's password: \n\rroot@192.168.1.187's password: \n",
+            "stderr": "Permission denied, please try again.\nPermission denied, please try again.\nroot@192.168.1.187: Permission denied (publickey,gssapi-keyex,gssapi-with-micimport pudb; pu.dbassword).\nscp: Connection closed\n",
+            "retcode": 255,
+        }
+    }
+    key_run_ret = {
+        "localhost": {
+            "jid": "20230922155652279959",
+            "return": "test",
+            "retcode": 0,
+            "id": "test",
+            "fun": "cmd.run",
+            "fun_args": ["echo test"],
+        }
+    }
+    patch_roster_file = patch("salt.roster.get_roster_file", MagicMock(return_value=""))
+    with patch_roster_file:
+        client = ssh.SSH(opts)
+    patch_input = patch("builtins.input", side_effect=["y"])
+    patch_getpass = patch("getpass.getpass", return_value=["password"])
+    mock_key_run = MagicMock(return_value=key_run_ret)
+    patch_key_run = patch("salt.client.ssh.SSH._key_deploy_run", mock_key_run)
+    with patch_input, patch_getpass, patch_key_run:
+        ret = client.key_deploy(host, ssh_ret)
+    assert mock_key_run.call_args_list[0][0] == (
+        host,
+        {"passwd": [passwd], "host": host, "user": usr},
+        True,
+    )
+    assert ret == key_run_ret
+    assert mock_key_run.call_count == 1
+
+
+def test_key_deploy_permission_denied_file_scp(tmp_path, opts):
+    """
+    test "key_deploy" function when permission denied
+    due to not having access to copy the file to the target
+    We do not want to deploy the key, because this is not
+    an authentication to the target error.
+    """
+    host = "localhost"
+    passwd = "password"
+    usr = "ssh-usr"
+    opts["ssh_user"] = usr
+    opts["tgt"] = host
+
+    mock_key_run = MagicMock(return_value=False)
+    patch_key_run = patch("salt.client.ssh.SSH._key_deploy_run", mock_key_run)
+
+    ssh_ret = {
+        "localhost": {
+            "stdout": "",
+            "stderr": 'scp: dest open "/tmp/preflight.sh": Permission denied\nscp: failed to upload file /etc/salt/preflight.sh to /tmp/preflight.sh\n',
+            "retcode": 1,
+        }
+    }
+    patch_roster_file = patch("salt.roster.get_roster_file", MagicMock(return_value=""))
+    with patch_roster_file:
+        client = ssh.SSH(opts)
+    ret = client.key_deploy(host, ssh_ret)
+    assert ret == ssh_ret
+    assert mock_key_run.call_count == 0
+
+
+def test_key_deploy_no_permission_denied(tmp_path, opts):
+    """
+    test "key_deploy" function when no permission denied
+    is returned
+    """
+    host = "localhost"
+    passwd = "password"
+    usr = "ssh-usr"
+    opts["ssh_user"] = usr
+    opts["tgt"] = host
+
+    mock_key_run = MagicMock(return_value=False)
+    patch_key_run = patch("salt.client.ssh.SSH._key_deploy_run", mock_key_run)
+    ssh_ret = {
+        "localhost": {
+            "jid": "20230922161937998385",
+            "return": "test",
+            "retcode": 0,
+            "id": "test",
+            "fun": "cmd.run",
+            "fun_args": ["echo test"],
+        }
+    }
+    patch_roster_file = patch("salt.roster.get_roster_file", MagicMock(return_value=""))
+    with patch_roster_file:
+        client = ssh.SSH(opts)
+    ret = client.key_deploy(host, ssh_ret)
+    assert ret == ssh_ret
+    assert mock_key_run.call_count == 0
-- 
2.42.0

openSUSE Build Service is sponsored by