We have some news to share for the request index beta feature. We’ve added more options to sort your requests, counters to the individual filters and documentation for the search functionality. Checkout the blog post for more details.

File fix-cve-2023-34049-bsc-1215157.patch of Package salt

From b2baafcc96a2807cf7d34374904e1710a4f58b9f Mon Sep 17 00:00:00 2001
From: Alexander Graul <agraul@suse.com>
Date: Tue, 31 Oct 2023 11:26:15 +0100
Subject: [PATCH] Fix CVE-2023-34049 (bsc#1215157)

Backport of https://github.com/saltstack/salt/pull/65482
---
 salt/client/ssh/__init__.py                   |  56 +++-
 tests/integration/modules/test_ssh.py         |   3 +-
 tests/integration/ssh/test_pre_flight.py      | 132 --------
 .../integration/ssh/test_pre_flight.py        | 315 ++++++++++++++++++
 tests/pytests/unit/client/ssh/test_single.py  | 296 +++++++++++++---
 tests/pytests/unit/client/ssh/test_ssh.py     | 110 ++++++
 6 files changed, 727 insertions(+), 185 deletions(-)
 delete mode 100644 tests/integration/ssh/test_pre_flight.py
 create mode 100644 tests/pytests/integration/ssh/test_pre_flight.py

diff --git a/salt/client/ssh/__init__.py b/salt/client/ssh/__init__.py
index b120e0002e8..1e143f9e30c 100644
--- a/salt/client/ssh/__init__.py
+++ b/salt/client/ssh/__init__.py
@@ -12,9 +12,11 @@ import hashlib
 import logging
 import multiprocessing
 import os
+import pathlib
 import queue
 import re
 import shlex
+import shutil
 import subprocess
 import sys
 import tarfile
@@ -515,7 +517,14 @@ class SSH(MultiprocessingStateMixin):
             if target.get("passwd", False) or self.opts["ssh_passwd"]:
                 self._key_deploy_run(host, target, False)
             return ret
-        if ret[host].get("stderr", "").count("Permission denied"):
+        stderr = ret[host].get("stderr", "")
+        # -failed to upload file- is detecting scp errors
+        # Errors to ignore when Permission denied is in the stderr. For example
+        # scp can get a permission denied on the target host, but they where
+        # able to accurate authenticate against the box
+        ignore_err = ["failed to upload file"]
+        check_err = [x for x in ignore_err if stderr.count(x)]
+        if "Permission denied" in stderr and not check_err:
             target = self.targets[host]
             # permission denied, attempt to auto deploy ssh key
             print(
@@ -1137,11 +1146,30 @@ class Single:
         """
         Run our pre_flight script before running any ssh commands
         """
-        script = os.path.join(tempfile.gettempdir(), self.ssh_pre_file)
-
-        self.shell.send(self.ssh_pre_flight, script)
-
-        return self.execute_script(script, script_args=self.ssh_pre_flight_args)
+        with tempfile.NamedTemporaryFile() as temp:
+            # ensure we use copyfile to not copy the file attributes
+            # we want to ensure we use the perms set by the secure
+            # NamedTemporaryFile
+            try:
+                shutil.copyfile(self.ssh_pre_flight, temp.name)
+            except OSError as err:
+                return (
+                    "",
+                    "Could not copy pre flight script to temporary path",
+                    1,
+                )
+            target_script = f".{pathlib.Path(temp.name).name}"
+            log.trace("Copying the pre flight script to target")
+            stdout, stderr, retcode = self.shell.send(temp.name, target_script)
+            if retcode != 0:
+                # We could not copy the script to the target
+                log.error("Could not copy the pre flight script to target")
+                return stdout, stderr, retcode
+
+            log.trace("Executing the pre flight script on target")
+            return self.execute_script(
+                target_script, script_args=self.ssh_pre_flight_args
+            )
 
     def check_thin_dir(self):
         """
@@ -1531,18 +1559,20 @@ ARGS = {arguments}\n'''.format(
             return self.shell.exec_cmd(cmd_str)
 
         # Write the shim to a temporary file in the default temp directory
-        with tempfile.NamedTemporaryFile(
-            mode="w+b", prefix="shim_", delete=False
-        ) as shim_tmp_file:
+        with tempfile.NamedTemporaryFile(mode="w+b", delete=False) as shim_tmp_file:
             shim_tmp_file.write(salt.utils.stringutils.to_bytes(cmd_str))
 
         # Copy shim to target system, under $HOME/.<randomized name>
-        target_shim_file = ".{}.{}".format(
-            binascii.hexlify(os.urandom(6)).decode("ascii"), extension
-        )
+        target_shim_file = f".{pathlib.Path(shim_tmp_file.name).name}"
+
         if self.winrm:
             target_shim_file = saltwinshell.get_target_shim_file(self, target_shim_file)
-        self.shell.send(shim_tmp_file.name, target_shim_file, makedirs=True)
+        stdout, stderr, retcode = self.shell.send(
+            shim_tmp_file.name, target_shim_file, makedirs=True
+        )
+        if retcode != 0:
+            log.error("Could not copy the shim script to target")
+            return stdout, stderr, retcode
 
         # Remove our shim file
         try:
diff --git a/tests/integration/modules/test_ssh.py b/tests/integration/modules/test_ssh.py
index 0817877c86b..55586211622 100644
--- a/tests/integration/modules/test_ssh.py
+++ b/tests/integration/modules/test_ssh.py
@@ -26,7 +26,8 @@ def check_status():
         return False
 
 
-@pytest.mark.windows_whitelisted
+# @pytest.mark.windows_whitelisted
+# De-whitelist windows since it's hanging on the newer windows golden images
 @pytest.mark.skip_if_binaries_missing("ssh", "ssh-keygen", check_all=True)
 class SSHModuleTest(ModuleCase):
     """
diff --git a/tests/integration/ssh/test_pre_flight.py b/tests/integration/ssh/test_pre_flight.py
deleted file mode 100644
index 1598b3d51b5..00000000000
--- a/tests/integration/ssh/test_pre_flight.py
+++ /dev/null
@@ -1,132 +0,0 @@
-"""
-Test for ssh_pre_flight roster option
-"""
-
-import os
-
-import pytest
-
-import salt.utils.files
-from tests.support.case import SSHCase
-from tests.support.runtests import RUNTIME_VARS
-
-
-class SSHPreFlightTest(SSHCase):
-    """
-    Test ssh_pre_flight roster option
-    """
-
-    def setUp(self):
-        super().setUp()
-        self.roster = os.path.join(RUNTIME_VARS.TMP, "pre_flight_roster")
-        self.data = {
-            "ssh_pre_flight": os.path.join(RUNTIME_VARS.TMP, "ssh_pre_flight.sh")
-        }
-        self.test_script = os.path.join(
-            RUNTIME_VARS.TMP, "test-pre-flight-script-worked.txt"
-        )
-
-    def _create_roster(self, pre_flight_script_args=None):
-        data = dict(self.data)
-        if pre_flight_script_args:
-            data["ssh_pre_flight_args"] = pre_flight_script_args
-
-        self.custom_roster(self.roster, data)
-
-        with salt.utils.files.fopen(data["ssh_pre_flight"], "w") as fp_:
-            fp_.write("touch {}".format(self.test_script))
-
-    @pytest.mark.slow_test
-    def test_ssh_pre_flight(self):
-        """
-        test ssh when ssh_pre_flight is set
-        ensure the script runs successfully
-        """
-        self._create_roster()
-        assert self.run_function("test.ping", roster_file=self.roster)
-
-        assert os.path.exists(self.test_script)
-
-    @pytest.mark.slow_test
-    def test_ssh_run_pre_flight(self):
-        """
-        test ssh when --pre-flight is passed to salt-ssh
-        to ensure the script runs successfully
-        """
-        self._create_roster()
-        # make sure we previously ran a command so the thin dir exists
-        self.run_function("test.ping", wipe=False)
-        assert not os.path.exists(self.test_script)
-
-        assert self.run_function(
-            "test.ping", ssh_opts="--pre-flight", roster_file=self.roster, wipe=False
-        )
-        assert os.path.exists(self.test_script)
-
-    @pytest.mark.slow_test
-    def test_ssh_run_pre_flight_args(self):
-        """
-        test ssh when --pre-flight is passed to salt-ssh
-        to ensure the script runs successfully passing some args
-        """
-        self._create_roster(pre_flight_script_args="foobar test")
-        # make sure we previously ran a command so the thin dir exists
-        self.run_function("test.ping", wipe=False)
-        assert not os.path.exists(self.test_script)
-
-        assert self.run_function(
-            "test.ping", ssh_opts="--pre-flight", roster_file=self.roster, wipe=False
-        )
-        assert os.path.exists(self.test_script)
-
-    @pytest.mark.slow_test
-    def test_ssh_run_pre_flight_args_prevent_injection(self):
-        """
-        test ssh when --pre-flight is passed to salt-ssh
-        and evil arguments are used in order to produce shell injection
-        """
-        injected_file = os.path.join(RUNTIME_VARS.TMP, "injection")
-        self._create_roster(
-            pre_flight_script_args="foobar; echo injected > {}".format(injected_file)
-        )
-        # make sure we previously ran a command so the thin dir exists
-        self.run_function("test.ping", wipe=False)
-        assert not os.path.exists(self.test_script)
-        assert not os.path.isfile(injected_file)
-
-        assert self.run_function(
-            "test.ping", ssh_opts="--pre-flight", roster_file=self.roster, wipe=False
-        )
-
-        assert not os.path.isfile(
-            injected_file
-        ), "File injection suceeded. This shouldn't happend"
-
-    @pytest.mark.slow_test
-    def test_ssh_run_pre_flight_failure(self):
-        """
-        test ssh_pre_flight when there is a failure
-        in the script.
-        """
-        self._create_roster()
-        with salt.utils.files.fopen(self.data["ssh_pre_flight"], "w") as fp_:
-            fp_.write("exit 2")
-
-        ret = self.run_function(
-            "test.ping", ssh_opts="--pre-flight", roster_file=self.roster, wipe=False
-        )
-        assert ret["retcode"] == 2
-
-    def tearDown(self):
-        """
-        make sure to clean up any old ssh directories
-        """
-        files = [
-            self.roster,
-            self.data["ssh_pre_flight"],
-            self.test_script,
-            os.path.join(RUNTIME_VARS.TMP, "injection"),
-        ]
-        for fp_ in files:
-            if os.path.exists(fp_):
-                os.remove(fp_)
diff --git a/tests/pytests/integration/ssh/test_pre_flight.py b/tests/pytests/integration/ssh/test_pre_flight.py
new file mode 100644
index 00000000000..09c65d29430
--- /dev/null
+++ b/tests/pytests/integration/ssh/test_pre_flight.py
@@ -0,0 +1,315 @@
+"""
+Test for ssh_pre_flight roster option
+"""
+
+try:
+    import grp
+    import pwd
+except ImportError:
+    # windows stacktraces on import of these modules
+    pass
+import os
+import pathlib
+import shutil
+import subprocess
+
+import pytest
+import yaml
+from saltfactories.utils import random_string
+
+import salt.utils.files
+
+pytestmark = pytest.mark.skip_on_windows(reason="Salt-ssh not available on Windows")
+
+
+def _custom_roster(roster_file, roster_data):
+    with salt.utils.files.fopen(roster_file, "r") as fp:
+        data = salt.utils.yaml.safe_load(fp)
+    for key, item in roster_data.items():
+        data["localhost"][key] = item
+    with salt.utils.files.fopen(roster_file, "w") as fp:
+        yaml.safe_dump(data, fp)
+
+
+@pytest.fixture
+def _create_roster(salt_ssh_roster_file, tmp_path):
+    ret = {}
+    ret["roster"] = salt_ssh_roster_file
+    ret["data"] = {"ssh_pre_flight": str(tmp_path / "ssh_pre_flight.sh")}
+    ret["test_script"] = str(tmp_path / "test-pre-flight-script-worked.txt")
+    ret["thin_dir"] = tmp_path / "thin_dir"
+
+    with salt.utils.files.fopen(salt_ssh_roster_file, "r") as fp:
+        data = salt.utils.yaml.safe_load(fp)
+    pre_flight_script = ret["data"]["ssh_pre_flight"]
+    data["localhost"]["ssh_pre_flight"] = pre_flight_script
+    data["localhost"]["thin_dir"] = str(ret["thin_dir"])
+    with salt.utils.files.fopen(salt_ssh_roster_file, "w") as fp:
+        yaml.safe_dump(data, fp)
+
+    with salt.utils.files.fopen(pre_flight_script, "w") as fp:
+        fp.write("touch {}".format(ret["test_script"]))
+
+    yield ret
+    if ret["thin_dir"].exists():
+        shutil.rmtree(ret["thin_dir"])
+
+
+@pytest.mark.slow_test
+def test_ssh_pre_flight(salt_ssh_cli, caplog, _create_roster):
+    """
+    test ssh when ssh_pre_flight is set
+    ensure the script runs successfully
+    """
+    ret = salt_ssh_cli.run("test.ping")
+    assert ret.returncode == 0
+
+    assert pathlib.Path(_create_roster["test_script"]).exists()
+
+
+@pytest.mark.slow_test
+def test_ssh_run_pre_flight(salt_ssh_cli, _create_roster):
+    """
+    test ssh when --pre-flight is passed to salt-ssh
+    to ensure the script runs successfully
+    """
+    # make sure we previously ran a command so the thin dir exists
+    ret = salt_ssh_cli.run("test.ping")
+    assert pathlib.Path(_create_roster["test_script"]).exists()
+
+    # Now remeove the script to ensure pre_flight doesn't run
+    # without --pre-flight
+    pathlib.Path(_create_roster["test_script"]).unlink()
+
+    assert salt_ssh_cli.run("test.ping").returncode == 0
+    assert not pathlib.Path(_create_roster["test_script"]).exists()
+
+    # Now ensure
+    ret = salt_ssh_cli.run(
+        "test.ping",
+        "--pre-flight",
+    )
+    assert ret.returncode == 0
+    assert pathlib.Path(_create_roster["test_script"]).exists()
+
+
+@pytest.mark.slow_test
+def test_ssh_run_pre_flight_args(salt_ssh_cli, _create_roster):
+    """
+    test ssh when --pre-flight is passed to salt-ssh
+    to ensure the script runs successfully passing some args
+    """
+    _custom_roster(salt_ssh_cli.roster_file, {"ssh_pre_flight_args": "foobar test"})
+    # Create pre_flight script that accepts args
+    test_script = _create_roster["test_script"]
+    test_script_1 = pathlib.Path(test_script + "-foobar")
+    test_script_2 = pathlib.Path(test_script + "-test")
+    with salt.utils.files.fopen(_create_roster["data"]["ssh_pre_flight"], "w") as fp:
+        fp.write(
+            f"""
+        touch {str(test_script)}-$1
+        touch {str(test_script)}-$2
+        """
+        )
+    ret = salt_ssh_cli.run("test.ping")
+    assert ret.returncode == 0
+    assert test_script_1.exists()
+    assert test_script_2.exists()
+    pathlib.Path(test_script_1).unlink()
+    pathlib.Path(test_script_2).unlink()
+
+    ret = salt_ssh_cli.run("test.ping")
+    assert ret.returncode == 0
+    assert not test_script_1.exists()
+    assert not test_script_2.exists()
+
+    ret = salt_ssh_cli.run(
+        "test.ping",
+        "--pre-flight",
+    )
+    assert ret.returncode == 0
+    assert test_script_1.exists()
+    assert test_script_2.exists()
+
+
+@pytest.mark.slow_test
+def test_ssh_run_pre_flight_args_prevent_injection(
+    salt_ssh_cli, _create_roster, tmp_path
+):
+    """
+    test ssh when --pre-flight is passed to salt-ssh
+    and evil arguments are used in order to produce shell injection
+    """
+    injected_file = tmp_path / "injection"
+    _custom_roster(
+        salt_ssh_cli.roster_file,
+        {"ssh_pre_flight_args": f"foobar; echo injected > {str(injected_file)}"},
+    )
+    # Create pre_flight script that accepts args
+    test_script = _create_roster["test_script"]
+    test_script_1 = pathlib.Path(test_script + "-echo")
+    test_script_2 = pathlib.Path(test_script + "-foobar;")
+    with salt.utils.files.fopen(_create_roster["data"]["ssh_pre_flight"], "w") as fp:
+        fp.write(
+            f"""
+        touch {str(test_script)}-$1
+        touch {str(test_script)}-$2
+        """
+        )
+
+    # make sure we previously ran a command so the thin dir exists
+    ret = salt_ssh_cli.run("test.ping")
+    assert ret.returncode == 0
+    assert test_script_1.exists()
+    assert test_script_2.exists()
+    test_script_1.unlink()
+    test_script_2.unlink()
+    assert not injected_file.is_file()
+
+    ret = salt_ssh_cli.run(
+        "test.ping",
+        "--pre-flight",
+    )
+    assert ret.returncode == 0
+
+    assert test_script_1.exists()
+    assert test_script_2.exists()
+    assert not pathlib.Path(
+        injected_file
+    ).is_file(), "File injection suceeded. This shouldn't happend"
+
+
+@pytest.mark.flaky(max_runs=4)
+@pytest.mark.slow_test
+def test_ssh_run_pre_flight_failure(salt_ssh_cli, _create_roster):
+    """
+    test ssh_pre_flight when there is a failure
+    in the script.
+    """
+    with salt.utils.files.fopen(_create_roster["data"]["ssh_pre_flight"], "w") as fp_:
+        fp_.write("exit 2")
+
+    ret = salt_ssh_cli.run(
+        "test.ping",
+        "--pre-flight",
+    )
+    assert ret.data["retcode"] == 2
+
+
+@pytest.fixture
+def account():
+    username = random_string("test-account-", uppercase=False)
+    with pytest.helpers.create_account(username=username) as account:
+        yield account
+
+
+@pytest.mark.slow_test
+def test_ssh_pre_flight_script(salt_ssh_cli, caplog, _create_roster, tmp_path, account):
+    """
+    Test to ensure user cannot create and run a script
+    with the expected pre_flight script path on target.
+    """
+    try:
+        script = pathlib.Path.home() / "hacked"
+        tmp_preflight = pathlib.Path("/tmp", "ssh_pre_flight.sh")
+        tmp_preflight.write_text(f"touch {script}")
+        os.chown(tmp_preflight, account.info.uid, account.info.gid)
+        ret = salt_ssh_cli.run("test.ping")
+        assert not script.is_file()
+        assert ret.returncode == 0
+        assert ret.stdout == '{\n"localhost": true\n}\n'
+    finally:
+        for _file in [script, tmp_preflight]:
+            if _file.is_file():
+                _file.unlink()
+
+
+def demote(user_uid, user_gid):
+    def result():
+        # os.setgid does not remove group membership, so we remove them here so they are REALLY non-root
+        os.setgroups([])
+        os.setgid(user_gid)
+        os.setuid(user_uid)
+
+    return result
+
+
+@pytest.mark.slow_test
+def test_ssh_pre_flight_perms(salt_ssh_cli, caplog, _create_roster, account):
+    """
+    Test to ensure standard user cannot run pre flight script
+    on target when user sets wrong permissions (777) on
+    ssh_pre_flight script.
+    """
+    try:
+        script = pathlib.Path("/tmp", "itworked")
+        preflight = pathlib.Path("/ssh_pre_flight.sh")
+        preflight.write_text(f"touch {str(script)}")
+        tmp_preflight = pathlib.Path("/tmp", preflight.name)
+
+        _custom_roster(salt_ssh_cli.roster_file, {"ssh_pre_flight": str(preflight)})
+        preflight.chmod(0o0777)
+        run_script = pathlib.Path("/run_script")
+        run_script.write_text(
+            f"""
+        x=1
+        while [ $x -le 200000 ]; do
+            SCRIPT=`bash {str(tmp_preflight)} 2> /dev/null; echo $?`
+            if [ ${{SCRIPT}} == 0 ]; then
+                break
+            fi
+            x=$(( $x + 1 ))
+        done
+        """
+        )
+        run_script.chmod(0o0777)
+        # pylint: disable=W1509
+        ret = subprocess.Popen(
+            ["sh", f"{run_script}"],
+            preexec_fn=demote(account.info.uid, account.info.gid),
+            stdout=None,
+            stderr=None,
+            stdin=None,
+            universal_newlines=True,
+        )
+        # pylint: enable=W1509
+        ret = salt_ssh_cli.run("test.ping")
+        assert ret.returncode == 0
+
+        # Lets make sure a different user other than root
+        # Didn't run the script
+        assert os.stat(script).st_uid != account.info.uid
+        assert script.is_file()
+    finally:
+        for _file in [script, preflight, tmp_preflight, run_script]:
+            if _file.is_file():
+                _file.unlink()
+
+
+@pytest.mark.slow_test
+def test_ssh_run_pre_flight_target_file_perms(salt_ssh_cli, _create_roster, tmp_path):
+    """
+    test ssh_pre_flight to ensure the target pre flight script
+    has the correct perms
+    """
+    perms_file = tmp_path / "perms"
+    with salt.utils.files.fopen(_create_roster["data"]["ssh_pre_flight"], "w") as fp_:
+        fp_.write(
+            f"""
+        SCRIPT_NAME=$0
+        stat -L -c "%a %G %U" $SCRIPT_NAME > {perms_file}
+        """
+        )
+
+    ret = salt_ssh_cli.run(
+        "test.ping",
+        "--pre-flight",
+    )
+    assert ret.returncode == 0
+    with salt.utils.files.fopen(perms_file) as fp:
+        data = fp.read()
+    assert data.split()[0] == "600"
+    uid = os.getuid()
+    gid = os.getgid()
+    assert data.split()[1] == grp.getgrgid(gid).gr_name
+    assert data.split()[2] == pwd.getpwuid(uid).pw_name
diff --git a/tests/pytests/unit/client/ssh/test_single.py b/tests/pytests/unit/client/ssh/test_single.py
index f97519d5cc2..c88a1c2127f 100644
--- a/tests/pytests/unit/client/ssh/test_single.py
+++ b/tests/pytests/unit/client/ssh/test_single.py
@@ -1,6 +1,5 @@
-import os
+import logging
 import re
-import tempfile
 from textwrap import dedent
 
 import pytest
@@ -16,6 +15,8 @@ import salt.utils.yaml
 from salt.client import ssh
 from tests.support.mock import MagicMock, call, patch
 
+log = logging.getLogger(__name__)
+
 
 @pytest.fixture
 def opts(tmp_path):
@@ -59,7 +60,7 @@ def test_single_opts(opts, target):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     assert single.shell._ssh_opts() == ""
@@ -87,7 +88,7 @@ def test_run_with_pre_flight(opts, target, tmp_path):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     cmd_ret = ("Success", "", 0)
@@ -122,7 +123,7 @@ def test_run_with_pre_flight_with_args(opts, target, tmp_path):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     cmd_ret = ("Success", "foobar", 0)
@@ -156,7 +157,7 @@ def test_run_with_pre_flight_stderr(opts, target, tmp_path):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     cmd_ret = ("", "Error running script", 1)
@@ -190,7 +191,7 @@ def test_run_with_pre_flight_script_doesnot_exist(opts, target, tmp_path):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     cmd_ret = ("Success", "", 0)
@@ -224,7 +225,7 @@ def test_run_with_pre_flight_thin_dir_exists(opts, target, tmp_path):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     cmd_ret = ("", "", 0)
@@ -242,6 +243,39 @@ def test_run_with_pre_flight_thin_dir_exists(opts, target, tmp_path):
         assert ret == cmd_ret
 
 
+def test_run_ssh_pre_flight(opts, target, tmp_path):
+    """
+    test Single.run_ssh_pre_flight function
+    """
+    target["ssh_pre_flight"] = str(tmp_path / "script.sh")
+    single = ssh.Single(
+        opts,
+        opts["argv"],
+        "localhost",
+        mods={},
+        fsclient=None,
+        thin=salt.utils.thin.thin_path(opts["cachedir"]),
+        mine=False,
+        **target,
+    )
+
+    cmd_ret = ("Success", "", 0)
+    mock_flight = MagicMock(return_value=cmd_ret)
+    mock_cmd = MagicMock(return_value=cmd_ret)
+    patch_flight = patch("salt.client.ssh.Single.run_ssh_pre_flight", mock_flight)
+    patch_cmd = patch("salt.client.ssh.Single.cmd_block", mock_cmd)
+    patch_exec_cmd = patch(
+        "salt.client.ssh.shell.Shell.exec_cmd", return_value=("", "", 1)
+    )
+    patch_os = patch("os.path.exists", side_effect=[True])
+
+    with patch_os, patch_flight, patch_cmd, patch_exec_cmd:
+        ret = single.run()
+        mock_cmd.assert_called()
+        mock_flight.assert_called()
+        assert ret == cmd_ret
+
+
 def test_execute_script(opts, target, tmp_path):
     """
     test Single.execute_script()
@@ -255,7 +289,7 @@ def test_execute_script(opts, target, tmp_path):
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
         winrm=False,
-        **target
+        **target,
     )
 
     exp_ret = ("Success", "", 0)
@@ -273,7 +307,7 @@ def test_execute_script(opts, target, tmp_path):
         ] == mock_cmd.call_args_list
 
 
-def test_shim_cmd(opts, target):
+def test_shim_cmd(opts, target, tmp_path):
     """
     test Single.shim_cmd()
     """
@@ -287,7 +321,7 @@ def test_shim_cmd(opts, target):
         mine=False,
         winrm=False,
         tty=True,
-        **target
+        **target,
     )
 
     exp_ret = ("Success", "", 0)
@@ -295,21 +329,24 @@ def test_shim_cmd(opts, target):
     patch_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_cmd)
     patch_send = patch("salt.client.ssh.shell.Shell.send", return_value=("", "", 0))
     patch_rand = patch("os.urandom", return_value=b"5\xd9l\xca\xc2\xff")
+    tmp_file = tmp_path / "tmp_file"
+    mock_tmp = MagicMock()
+    patch_tmp = patch("tempfile.NamedTemporaryFile", mock_tmp)
+    mock_tmp.return_value.__enter__.return_value.name = tmp_file
 
-    with patch_cmd, patch_rand, patch_send:
+    with patch_cmd, patch_tmp, patch_send:
         ret = single.shim_cmd(cmd_str="echo test")
         assert ret == exp_ret
         assert [
-            call("/bin/sh '.35d96ccac2ff.py'"),
-            call("rm '.35d96ccac2ff.py'"),
+            call(f"/bin/sh '.{tmp_file.name}'"),
+            call(f"rm '.{tmp_file.name}'"),
         ] == mock_cmd.call_args_list
 
 
-def test_run_ssh_pre_flight(opts, target, tmp_path):
+def test_shim_cmd_copy_fails(opts, target, caplog):
     """
-    test Single.run_ssh_pre_flight
+    test Single.shim_cmd() when copying the file fails
     """
-    target["ssh_pre_flight"] = str(tmp_path / "script.sh")
     single = ssh.Single(
         opts,
         opts["argv"],
@@ -320,24 +357,202 @@ def test_run_ssh_pre_flight(opts, target, tmp_path):
         mine=False,
         winrm=False,
         tty=True,
-        **target
+        **target,
     )
 
-    exp_ret = ("Success", "", 0)
-    mock_cmd = MagicMock(return_value=exp_ret)
+    ret_cmd = ("Success", "", 0)
+    mock_cmd = MagicMock(return_value=ret_cmd)
     patch_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_cmd)
-    patch_send = patch("salt.client.ssh.shell.Shell.send", return_value=exp_ret)
-    exp_tmp = os.path.join(
-        tempfile.gettempdir(), os.path.basename(target["ssh_pre_flight"])
+    ret_send = ("", "General error in file copy", 1)
+    patch_send = patch("salt.client.ssh.shell.Shell.send", return_value=ret_send)
+    patch_rand = patch("os.urandom", return_value=b"5\xd9l\xca\xc2\xff")
+
+    with patch_cmd, patch_rand, patch_send:
+        ret = single.shim_cmd(cmd_str="echo test")
+        assert ret == ret_send
+        assert "Could not copy the shim script to target" in caplog.text
+        mock_cmd.assert_not_called()
+
+
+def test_run_ssh_pre_flight_no_connect(opts, target, tmp_path, caplog):
+    """
+    test Single.run_ssh_pre_flight when you
+    cannot connect to the target
+    """
+    pre_flight = tmp_path / "script.sh"
+    pre_flight.write_text("")
+    target["ssh_pre_flight"] = str(pre_flight)
+    single = ssh.Single(
+        opts,
+        opts["argv"],
+        "localhost",
+        mods={},
+        fsclient=None,
+        thin=salt.utils.thin.thin_path(opts["cachedir"]),
+        mine=False,
+        winrm=False,
+        tty=True,
+        **target,
     )
+    mock_exec_cmd = MagicMock(return_value=("", "", 1))
+    patch_exec_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_exec_cmd)
+    tmp_file = tmp_path / "tmp_file"
+    mock_tmp = MagicMock()
+    patch_tmp = patch("tempfile.NamedTemporaryFile", mock_tmp)
+    mock_tmp.return_value.__enter__.return_value.name = tmp_file
+    ret_send = (
+        "",
+        "ssh: connect to host 192.168.1.186 port 22: No route to host\nscp: Connection closed\n",
+        255,
+    )
+    send_mock = MagicMock(return_value=ret_send)
+    patch_send = patch("salt.client.ssh.shell.Shell.send", send_mock)
+
+    with caplog.at_level(logging.TRACE):
+        with patch_send, patch_exec_cmd, patch_tmp:
+            ret = single.run_ssh_pre_flight()
+    assert "Copying the pre flight script" in caplog.text
+    assert "Could not copy the pre flight script to target" in caplog.text
+    assert ret == ret_send
+    assert send_mock.call_args_list[0][0][0] == tmp_file
+    target_script = send_mock.call_args_list[0][0][1]
+    assert re.search(r".[a-z0-9]+", target_script)
+    mock_exec_cmd.assert_not_called()
+
+
+def test_run_ssh_pre_flight_permission_denied(opts, target, tmp_path):
+    """
+    test Single.run_ssh_pre_flight when you
+    cannot copy script to the target due to
+    a permission denied error
+    """
+    pre_flight = tmp_path / "script.sh"
+    pre_flight.write_text("")
+    target["ssh_pre_flight"] = str(pre_flight)
+    single = ssh.Single(
+        opts,
+        opts["argv"],
+        "localhost",
+        mods={},
+        fsclient=None,
+        thin=salt.utils.thin.thin_path(opts["cachedir"]),
+        mine=False,
+        winrm=False,
+        tty=True,
+        **target,
+    )
+    mock_exec_cmd = MagicMock(return_value=("", "", 1))
+    patch_exec_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_exec_cmd)
+    tmp_file = tmp_path / "tmp_file"
+    mock_tmp = MagicMock()
+    patch_tmp = patch("tempfile.NamedTemporaryFile", mock_tmp)
+    mock_tmp.return_value.__enter__.return_value.name = tmp_file
+    ret_send = (
+        "",
+        'scp: dest open "/tmp/preflight.sh": Permission denied\nscp: failed to upload file /etc/salt/preflight.sh to /tmp/preflight.sh\n',
+        255,
+    )
+    send_mock = MagicMock(return_value=ret_send)
+    patch_send = patch("salt.client.ssh.shell.Shell.send", send_mock)
 
-    with patch_cmd, patch_send:
+    with patch_send, patch_exec_cmd, patch_tmp:
         ret = single.run_ssh_pre_flight()
-        assert ret == exp_ret
-        assert [
-            call("/bin/sh '{}'".format(exp_tmp)),
-            call("rm '{}'".format(exp_tmp)),
-        ] == mock_cmd.call_args_list
+    assert ret == ret_send
+    assert send_mock.call_args_list[0][0][0] == tmp_file
+    target_script = send_mock.call_args_list[0][0][1]
+    assert re.search(r".[a-z0-9]+", target_script)
+    mock_exec_cmd.assert_not_called()
+
+
+def test_run_ssh_pre_flight_connect(opts, target, tmp_path, caplog):
+    """
+    test Single.run_ssh_pre_flight when you
+    can connect to the target
+    """
+    pre_flight = tmp_path / "script.sh"
+    pre_flight.write_text("")
+    target["ssh_pre_flight"] = str(pre_flight)
+    single = ssh.Single(
+        opts,
+        opts["argv"],
+        "localhost",
+        mods={},
+        fsclient=None,
+        thin=salt.utils.thin.thin_path(opts["cachedir"]),
+        mine=False,
+        winrm=False,
+        tty=True,
+        **target,
+    )
+    ret_exec_cmd = ("", "", 1)
+    mock_exec_cmd = MagicMock(return_value=ret_exec_cmd)
+    patch_exec_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_exec_cmd)
+    tmp_file = tmp_path / "tmp_file"
+    mock_tmp = MagicMock()
+    patch_tmp = patch("tempfile.NamedTemporaryFile", mock_tmp)
+    mock_tmp.return_value.__enter__.return_value.name = tmp_file
+    ret_send = (
+        "",
+        "\rroot@192.168.1.187's password: \n\rpreflight.sh 0%    0 0.0KB/s   --:-- ETA\rpreflight.sh 100%   20     2.7KB/s   00:00 \n",
+        0,
+    )
+    send_mock = MagicMock(return_value=ret_send)
+    patch_send = patch("salt.client.ssh.shell.Shell.send", send_mock)
+
+    with caplog.at_level(logging.TRACE):
+        with patch_send, patch_exec_cmd, patch_tmp:
+            ret = single.run_ssh_pre_flight()
+
+    assert "Executing the pre flight script on target" in caplog.text
+    assert ret == ret_exec_cmd
+    assert send_mock.call_args_list[0][0][0] == tmp_file
+    target_script = send_mock.call_args_list[0][0][1]
+    assert re.search(r".[a-z0-9]+", target_script)
+    mock_exec_cmd.assert_called()
+
+
+def test_run_ssh_pre_flight_shutil_fails(opts, target, tmp_path):
+    """
+    test Single.run_ssh_pre_flight when cannot
+    copyfile with shutil
+    """
+    pre_flight = tmp_path / "script.sh"
+    pre_flight.write_text("")
+    target["ssh_pre_flight"] = str(pre_flight)
+    single = ssh.Single(
+        opts,
+        opts["argv"],
+        "localhost",
+        mods={},
+        fsclient=None,
+        thin=salt.utils.thin.thin_path(opts["cachedir"]),
+        mine=False,
+        winrm=False,
+        tty=True,
+        **target,
+    )
+    ret_exec_cmd = ("", "", 1)
+    mock_exec_cmd = MagicMock(return_value=ret_exec_cmd)
+    patch_exec_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_exec_cmd)
+    tmp_file = tmp_path / "tmp_file"
+    mock_tmp = MagicMock()
+    patch_tmp = patch("tempfile.NamedTemporaryFile", mock_tmp)
+    mock_tmp.return_value.__enter__.return_value.name = tmp_file
+    send_mock = MagicMock()
+    mock_shutil = MagicMock(side_effect=IOError("Permission Denied"))
+    patch_shutil = patch("shutil.copyfile", mock_shutil)
+    patch_send = patch("salt.client.ssh.shell.Shell.send", send_mock)
+
+    with patch_send, patch_exec_cmd, patch_tmp, patch_shutil:
+        ret = single.run_ssh_pre_flight()
+
+    assert ret == (
+        "",
+        "Could not copy pre flight script to temporary path",
+        1,
+    )
+    mock_exec_cmd.assert_not_called()
+    send_mock.assert_not_called()
 
 
 @pytest.mark.skip_on_windows(reason="SSH_PY_SHIM not set on windows")
@@ -355,7 +570,7 @@ def test_cmd_run_set_path(opts, target):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     ret = single._cmd_str()
@@ -376,7 +591,7 @@ def test_cmd_run_not_set_path(opts, target):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     ret = single._cmd_str()
@@ -395,7 +610,7 @@ def test_cmd_block_python_version_error(opts, target):
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
         winrm=False,
-        **target
+        **target,
     )
     mock_shim = MagicMock(
         return_value=(("", "ERROR: Unable to locate appropriate python command\n", 10))
@@ -434,7 +649,9 @@ def test_run_with_pre_flight_args(opts, target, test_opts, tmp_path):
     and script successfully runs
     """
     opts["ssh_run_pre_flight"] = True
-    target["ssh_pre_flight"] = str(tmp_path / "script.sh")
+    pre_flight_script = tmp_path / "script.sh"
+    pre_flight_script.write_text("")
+    target["ssh_pre_flight"] = str(pre_flight_script)
 
     if test_opts[0] is not None:
         target["ssh_pre_flight_args"] = test_opts[0]
@@ -448,7 +665,7 @@ def test_run_with_pre_flight_args(opts, target, test_opts, tmp_path):
         fsclient=None,
         thin=salt.utils.thin.thin_path(opts["cachedir"]),
         mine=False,
-        **target
+        **target,
     )
 
     cmd_ret = ("Success", "", 0)
@@ -456,14 +673,15 @@ def test_run_with_pre_flight_args(opts, target, test_opts, tmp_path):
     mock_exec_cmd = MagicMock(return_value=("", "", 0))
     patch_cmd = patch("salt.client.ssh.Single.cmd_block", mock_cmd)
     patch_exec_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_exec_cmd)
-    patch_shell_send = patch("salt.client.ssh.shell.Shell.send", return_value=None)
+    patch_shell_send = patch(
+        "salt.client.ssh.shell.Shell.send", return_value=("", "", 0)
+    )
     patch_os = patch("os.path.exists", side_effect=[True])
 
     with patch_os, patch_cmd, patch_exec_cmd, patch_shell_send:
-        ret = single.run()
-        assert mock_exec_cmd.mock_calls[0].args[
-            0
-        ] == "/bin/sh '/tmp/script.sh'{}".format(expected_args)
+        single.run()
+        script_args = mock_exec_cmd.mock_calls[0].args[0]
+        assert re.search(r"\/bin\/sh '.[a-z0-9]+", script_args)
 
 
 @pytest.mark.slow_test
diff --git a/tests/pytests/unit/client/ssh/test_ssh.py b/tests/pytests/unit/client/ssh/test_ssh.py
index 377aad9998c..cece16026cf 100644
--- a/tests/pytests/unit/client/ssh/test_ssh.py
+++ b/tests/pytests/unit/client/ssh/test_ssh.py
@@ -339,3 +339,113 @@ def test_extra_filerefs(tmp_path, opts):
     with patch("salt.roster.get_roster_file", MagicMock(return_value=roster)):
         ssh_obj = client._prep_ssh(**ssh_opts)
         assert ssh_obj.opts.get("extra_filerefs", None) == "salt://foobar"
+
+
+def test_key_deploy_permission_denied_scp(tmp_path, opts):
+    """
+    test "key_deploy" function when
+    permission denied authentication error
+    when attempting to use scp to copy file
+    to target
+    """
+    host = "localhost"
+    passwd = "password"
+    usr = "ssh-usr"
+    opts["ssh_user"] = usr
+    opts["tgt"] = host
+
+    ssh_ret = {
+        host: {
+            "stdout": "\rroot@192.168.1.187's password: \n\rroot@192.168.1.187's password: \n\rroot@192.168.1.187's password: \n",
+            "stderr": "Permission denied, please try again.\nPermission denied, please try again.\nroot@192.168.1.187: Permission denied (publickey,gssapi-keyex,gssapi-with-micimport pudb; pu.dbassword).\nscp: Connection closed\n",
+            "retcode": 255,
+        }
+    }
+    key_run_ret = {
+        "localhost": {
+            "jid": "20230922155652279959",
+            "return": "test",
+            "retcode": 0,
+            "id": "test",
+            "fun": "cmd.run",
+            "fun_args": ["echo test"],
+        }
+    }
+    patch_roster_file = patch("salt.roster.get_roster_file", MagicMock(return_value=""))
+    with patch_roster_file:
+        client = ssh.SSH(opts)
+    patch_input = patch("builtins.input", side_effect=["y"])
+    patch_getpass = patch("getpass.getpass", return_value=["password"])
+    mock_key_run = MagicMock(return_value=key_run_ret)
+    patch_key_run = patch("salt.client.ssh.SSH._key_deploy_run", mock_key_run)
+    with patch_input, patch_getpass, patch_key_run:
+        ret = client.key_deploy(host, ssh_ret)
+    assert mock_key_run.call_args_list[0][0] == (
+        host,
+        {"passwd": [passwd], "host": host, "user": usr},
+        True,
+    )
+    assert ret == key_run_ret
+    assert mock_key_run.call_count == 1
+
+
+def test_key_deploy_permission_denied_file_scp(tmp_path, opts):
+    """
+    test "key_deploy" function when permission denied
+    due to not having access to copy the file to the target
+    We do not want to deploy the key, because this is not
+    an authentication to the target error.
+    """
+    host = "localhost"
+    passwd = "password"
+    usr = "ssh-usr"
+    opts["ssh_user"] = usr
+    opts["tgt"] = host
+
+    mock_key_run = MagicMock(return_value=False)
+    patch_key_run = patch("salt.client.ssh.SSH._key_deploy_run", mock_key_run)
+
+    ssh_ret = {
+        "localhost": {
+            "stdout": "",
+            "stderr": 'scp: dest open "/tmp/preflight.sh": Permission denied\nscp: failed to upload file /etc/salt/preflight.sh to /tmp/preflight.sh\n',
+            "retcode": 1,
+        }
+    }
+    patch_roster_file = patch("salt.roster.get_roster_file", MagicMock(return_value=""))
+    with patch_roster_file:
+        client = ssh.SSH(opts)
+    ret = client.key_deploy(host, ssh_ret)
+    assert ret == ssh_ret
+    assert mock_key_run.call_count == 0
+
+
+def test_key_deploy_no_permission_denied(tmp_path, opts):
+    """
+    test "key_deploy" function when no permission denied
+    is returned
+    """
+    host = "localhost"
+    passwd = "password"
+    usr = "ssh-usr"
+    opts["ssh_user"] = usr
+    opts["tgt"] = host
+
+    mock_key_run = MagicMock(return_value=False)
+    patch_key_run = patch("salt.client.ssh.SSH._key_deploy_run", mock_key_run)
+    ssh_ret = {
+        "localhost": {
+            "jid": "20230922161937998385",
+            "return": "test",
+            "retcode": 0,
+            "id": "test",
+            "fun": "cmd.run",
+            "fun_args": ["echo test"],
+        }
+    }
+    patch_roster_file = patch("salt.roster.get_roster_file", MagicMock(return_value=""))
+    with patch_roster_file:
+        client = ssh.SSH(opts)
+    ret = client.key_deploy(host, ssh_ret)
+    assert ret == ssh_ret
+    assert mock_key_run.call_count == 0
-- 
2.42.0

openSUSE Build Service is sponsored by