File implementation-of-held-unheld-functions-for-state-pk.patch of Package salt.21413
From 2ee360753c8fa937d9c81bf7da24f457041650bc Mon Sep 17 00:00:00 2001
From: Victor Zhestkov <35733135+vzhestkov@users.noreply.github.com>
Date: Mon, 5 Jul 2021 18:39:26 +0300
Subject: [PATCH] Implementation of held/unheld functions for state pkg
 (#387)
* Implementation of held/unheld functions for state pkg
---
 salt/modules/zypperpkg.py                    | 201 +++++++++---
 salt/states/pkg.py                           | 310 +++++++++++++++++++
 tests/pytests/unit/modules/test_zypperpkg.py | 142 +++++++++
 tests/pytests/unit/states/test_pkg.py        | 155 ++++++++++
 4 files changed, 760 insertions(+), 48 deletions(-)
 create mode 100644 tests/pytests/unit/modules/test_zypperpkg.py
 create mode 100644 tests/pytests/unit/states/test_pkg.py
diff --git a/salt/modules/zypperpkg.py b/salt/modules/zypperpkg.py
index e064e2cb4e..932b30bac5 100644
--- a/salt/modules/zypperpkg.py
+++ b/salt/modules/zypperpkg.py
@@ -2071,6 +2071,76 @@ def purge(
     return _uninstall(inclusion_detection, name=name, pkgs=pkgs, root=root)
 
 
+def list_holds(pattern=None, full=True, root=None, **kwargs):
+    """
+    List information on locked packages.
+
+    .. note::
+        This function returns the computed output of ``list_locks``
+        to show exact locked packages.
+
+    pattern
+        Regular expression used to match the package name
+
+    full : True
+        Show the full hold definition including version and epoch. Set to
+        ``False`` to return just the name of the package(s) being held.
+
+    root
+        Operate on a different root directory.
+
+
+    CLI Example:
+
+    .. code-block:: bash
+
+        salt '*' pkg.list_holds
+        salt '*' pkg.list_holds full=False
+    """
+    locks = list_locks(root=root)
+    ret = []
+    inst_pkgs = {}
+    for solv_name, lock in locks.items():
+        if lock.get("type", "package") != "package":
+            continue
+        try:
+            found_pkgs = search(
+                solv_name,
+                root=root,
+                match=None if "*" in solv_name else "exact",
+                case_sensitive=(lock.get("case_sensitive", "on") == "on"),
+                installed_only=True,
+                details=True,
+                all_versions=True,
+                ignore_no_matching_item=True,
+            )
+        except CommandExecutionError:
+            continue
+        if found_pkgs:
+            for pkg in found_pkgs:
+                if pkg not in inst_pkgs:
+                    inst_pkgs.update(
+                        info_installed(
+                            pkg, root=root, attr="edition,epoch", all_versions=True
+                        )
+                    )
+
+    ptrn_re = re.compile(r"{}-\S+".format(pattern)) if pattern else None
+    for pkg_name, pkg_editions in inst_pkgs.items():
+        for pkg_info in pkg_editions:
+            pkg_ret = (
+                "{}-{}:{}.*".format(
+                    pkg_name, pkg_info.get("epoch", 0), pkg_info.get("edition")
+                )
+                if full
+                else pkg_name
+            )
+            if pkg_ret not in ret and (not ptrn_re or ptrn_re.match(pkg_ret)):
+                ret.append(pkg_ret)
+
+    return ret
+
+
 def list_locks(root=None):
     """
     List current package locks.
@@ -2141,43 +2211,68 @@ def clean_locks(root=None):
     return out
 
 
-def unhold(name=None, pkgs=None, **kwargs):
+def unhold(name=None, pkgs=None, root=None, **kwargs):
     """
-    Remove specified package lock.
+    Remove a package hold.
+
+    name
+        A package name to unhold, or a comma-separated list of package names to
+        unhold.
+
+    pkgs
+        A list of packages to unhold.  The ``name`` parameter will be ignored if
+        this option is passed.
 
     root
-        operate on a different root directory.
+        Operate on a different root directory.
 
     CLI Example:
 
     .. code-block:: bash
 
-        salt '*' pkg.remove_lock <package name>
-        salt '*' pkg.remove_lock <package1>,<package2>,<package3>
-        salt '*' pkg.remove_lock pkgs='["foo", "bar"]'
+        salt '*' pkg.unhold <package name>
+        salt '*' pkg.unhold <package1>,<package2>,<package3>
+        salt '*' pkg.unhold pkgs='["foo", "bar"]'
     """
     ret = {}
-    root = kwargs.get("root")
-    if (not name and not pkgs) or (name and pkgs):
+    if not name and not pkgs:
         raise CommandExecutionError("Name or packages must be specified.")
-    elif name:
-        pkgs = [name]
 
-    locks = list_locks(root)
-    try:
-        pkgs = list(__salt__["pkg_resource.parse_targets"](pkgs)[0].keys())
-    except MinionError as exc:
-        raise CommandExecutionError(exc)
+    targets = []
+    if pkgs:
+        targets.extend(pkgs)
+    else:
+        targets.append(name)
 
+    locks = list_locks()
     removed = []
-    missing = []
-    for pkg in pkgs:
-        if locks.get(pkg):
-            removed.append(pkg)
-            ret[pkg]["comment"] = "Package {} is no longer held.".format(pkg)
+
+    for target in targets:
+        version = None
+        if isinstance(target, dict):
+            (target, version) = next(iter(target.items()))
+        ret[target] = {"name": target, "changes": {}, "result": True, "comment": ""}
+        if locks.get(target):
+            lock_ver = None
+            if "version" in locks.get(target):
+                lock_ver = locks.get(target)["version"]
+                lock_ver = lock_ver.lstrip("= ")
+            if version and lock_ver != version:
+                ret[target]["result"] = False
+                ret[target][
+                    "comment"
+                ] = "Unable to unhold package {} as it is held with the other version.".format(
+                    target
+                )
+            else:
+                removed.append(
+                    target if not lock_ver else "{}={}".format(target, lock_ver)
+                )
+                ret[target]["changes"]["new"] = ""
+                ret[target]["changes"]["old"] = "hold"
+                ret[target]["comment"] = "Package {} is no longer held.".format(target)
         else:
-            missing.append(pkg)
-            ret[pkg]["comment"] = "Package {} unable to be unheld.".format(pkg)
+            ret[target]["comment"] = "Package {} was already unheld.".format(target)
 
     if removed:
         __zypper__(root=root).call("rl", *removed)
@@ -2223,47 +2318,57 @@ def remove_lock(packages, root=None, **kwargs):  # pylint: disable=unused-argume
     return {"removed": len(removed), "not_found": missing}
 
 
-def hold(name=None, pkgs=None, **kwargs):
+def hold(name=None, pkgs=None, root=None, **kwargs):
     """
-    Add a package lock. Specify packages to lock by exact name.
+    Add a package hold.  Specify one of ``name`` and ``pkgs``.
+
+    name
+        A package name to hold, or a comma-separated list of package names to
+        hold.
+
+    pkgs
+        A list of packages to hold.  The ``name`` parameter will be ignored if
+        this option is passed.
 
     root
-        operate on a different root directory.
+        Operate on a different root directory.
+
 
     CLI Example:
 
     .. code-block:: bash
 
-        salt '*' pkg.add_lock <package name>
-        salt '*' pkg.add_lock <package1>,<package2>,<package3>
-        salt '*' pkg.add_lock pkgs='["foo", "bar"]'
-
-    :param name:
-    :param pkgs:
-    :param kwargs:
-    :return:
+        salt '*' pkg.hold <package name>
+        salt '*' pkg.hold <package1>,<package2>,<package3>
+        salt '*' pkg.hold pkgs='["foo", "bar"]'
     """
     ret = {}
-    root = kwargs.get("root")
-    if (not name and not pkgs) or (name and pkgs):
+    if not name and not pkgs:
         raise CommandExecutionError("Name or packages must be specified.")
-    elif name:
-        pkgs = [name]
 
-    locks = list_locks(root=root)
+    targets = []
+    if pkgs:
+        targets.extend(pkgs)
+    else:
+        targets.append(name)
+
+    locks = list_locks()
     added = []
-    try:
-        pkgs = list(__salt__["pkg_resource.parse_targets"](pkgs)[0].keys())
-    except MinionError as exc:
-        raise CommandExecutionError(exc)
 
-    for pkg in pkgs:
-        ret[pkg] = {"name": pkg, "changes": {}, "result": False, "comment": ""}
-        if not locks.get(pkg):
-            added.append(pkg)
-            ret[pkg]["comment"] = "Package {} is now being held.".format(pkg)
+    for target in targets:
+        version = None
+        if isinstance(target, dict):
+            (target, version) = next(iter(target.items()))
+        ret[target] = {"name": target, "changes": {}, "result": True, "comment": ""}
+        if not locks.get(target):
+            added.append(target if not version else "{}={}".format(target, version))
+            ret[target]["changes"]["new"] = "hold"
+            ret[target]["changes"]["old"] = ""
+            ret[target]["comment"] = "Package {} is now being held.".format(target)
         else:
-            ret[pkg]["comment"] = "Package {} is already set to be held.".format(pkg)
+            ret[target]["comment"] = "Package {} is already set to be held.".format(
+                target
+            )
 
     if added:
         __zypper__(root=root).call("al", *added)
diff --git a/salt/states/pkg.py b/salt/states/pkg.py
index f7327a33e3..0ef3f056c5 100644
--- a/salt/states/pkg.py
+++ b/salt/states/pkg.py
@@ -3550,3 +3550,313 @@ def mod_watch(name, **kwargs):
         "comment": "pkg.{} does not work with the watch requisite".format(sfun),
         "result": False,
     }
+
+
+def held(name, version=None, pkgs=None, replace=False, **kwargs):
+    """
+    Set package in 'hold' state, meaning it will not be changed.
+
+    :param str name:
+        The name of the package to be held. This parameter is ignored
+        if ``pkgs`` is used.
+
+    :param str version:
+        Hold a specific version of a package.
+        Full description of this parameter is in `installed` function.
+
+        .. note::
+
+            This parameter make sense for Zypper-based systems.
+            Ignored for YUM/DNF and APT
+
+    :param list pkgs:
+        A list of packages to be held. All packages listed under ``pkgs``
+        will be held.
+
+        .. code-block:: yaml
+
+            mypkgs:
+              pkg.held:
+                - pkgs:
+                  - foo
+                  - bar: 1.2.3-4
+                  - baz
+
+        .. note::
+
+            For Zypper-based systems the package could be held for
+            the version specified. YUM/DNF and APT ingore it.
+
+    :param bool replace:
+        Force replacement of existings holds with specified.
+        By default, this parameter is set to ``False``.
+    """
+
+    if isinstance(pkgs, list) and len(pkgs) == 0 and not replace:
+        return {
+            "name": name,
+            "changes": {},
+            "result": True,
+            "comment": "No packages to be held provided",
+        }
+
+    # If just a name (and optionally a version) is passed, just pack them into
+    # the pkgs argument.
+    if name and pkgs is None:
+        if version:
+            pkgs = [{name: version}]
+            version = None
+        else:
+            pkgs = [name]
+
+    locks = {}
+    vr_lock = False
+    if "pkg.list_locks" in __salt__:
+        locks = __salt__["pkg.list_locks"]()
+        vr_lock = True
+    elif "pkg.list_holds" in __salt__:
+        _locks = __salt__["pkg.list_holds"](full=True)
+        lock_re = re.compile(r"^(.+)-(\d+):(.*)\.\*")
+        for lock in _locks:
+            match = lock_re.match(lock)
+            if match:
+                epoch = match.group(2)
+                if epoch == "0":
+                    epoch = ""
+                else:
+                    epoch = "{}:".format(epoch)
+                locks.update(
+                    {match.group(1): {"version": "{}{}".format(epoch, match.group(3))}}
+                )
+            else:
+                locks.update({lock: {}})
+    elif "pkg.get_selections" in __salt__:
+        _locks = __salt__["pkg.get_selections"](state="hold")
+        for lock in _locks.get("hold", []):
+            locks.update({lock: {}})
+    else:
+        return {
+            "name": name,
+            "changes": {},
+            "result": False,
+            "comment": "No any function to get the list of held packages available.\n"
+            "Check if the package manager supports package locking.",
+        }
+
+    if "pkg.hold" not in __salt__:
+        return {
+            "name": name,
+            "changes": {},
+            "result": False,
+            "comment": "`hold` function is not implemented for the package manager.",
+        }
+
+    ret = {"name": name, "changes": {}, "result": True, "comment": ""}
+    comments = []
+
+    held_pkgs = set()
+    for pkg in pkgs:
+        if isinstance(pkg, dict):
+            (pkg_name, pkg_ver) = next(iter(pkg.items()))
+        else:
+            pkg_name = pkg
+            pkg_ver = None
+        lock_ver = None
+        if pkg_name in locks and "version" in locks[pkg_name]:
+            lock_ver = locks[pkg_name]["version"]
+            lock_ver = lock_ver.lstrip("= ")
+        held_pkgs.add(pkg_name)
+        if pkg_name not in locks or (vr_lock and lock_ver != pkg_ver):
+            if __opts__["test"]:
+                if pkg_name in locks:
+                    comments.append(
+                        "The following package's hold rule would be updated: {}{}".format(
+                            pkg_name,
+                            "" if not pkg_ver else " (version = {})".format(pkg_ver),
+                        )
+                    )
+                else:
+                    comments.append(
+                        "The following package would be held: {}{}".format(
+                            pkg_name,
+                            "" if not pkg_ver else " (version = {})".format(pkg_ver),
+                        )
+                    )
+            else:
+                unhold_ret = None
+                if pkg_name in locks:
+                    unhold_ret = __salt__["pkg.unhold"](name=name, pkgs=[pkg_name])
+                hold_ret = __salt__["pkg.hold"](name=name, pkgs=[pkg])
+                if not hold_ret.get(pkg_name, {}).get("result", False):
+                    ret["result"] = False
+                if (
+                    unhold_ret
+                    and unhold_ret.get(pkg_name, {}).get("result", False)
+                    and hold_ret
+                    and hold_ret.get(pkg_name, {}).get("result", False)
+                ):
+                    comments.append(
+                        "Package {} was updated with hold rule".format(pkg_name)
+                    )
+                elif hold_ret and hold_ret.get(pkg_name, {}).get("result", False):
+                    comments.append("Package {} is now being held".format(pkg_name))
+                else:
+                    comments.append("Package {} was not held".format(pkg_name))
+                ret["changes"].update(hold_ret)
+
+    if replace:
+        for pkg_name in locks:
+            if locks[pkg_name].get("type", "package") != "package":
+                continue
+            if __opts__["test"]:
+                if pkg_name not in held_pkgs:
+                    comments.append(
+                        "The following package would be unheld: {}".format(pkg_name)
+                    )
+            else:
+                if pkg_name not in held_pkgs:
+                    unhold_ret = __salt__["pkg.unhold"](name=name, pkgs=[pkg_name])
+                    if not unhold_ret.get(pkg_name, {}).get("result", False):
+                        ret["result"] = False
+                    if unhold_ret and unhold_ret.get(pkg_name, {}).get("comment"):
+                        comments.append(unhold_ret.get(pkg_name).get("comment"))
+                    ret["changes"].update(unhold_ret)
+
+    ret["comment"] = "\n".join(comments)
+    if not (ret["changes"] or ret["comment"]):
+        ret["comment"] = "No changes made"
+
+    return ret
+
+
+def unheld(name, version=None, pkgs=None, all=False, **kwargs):
+    """
+    Unset package from 'hold' state, to allow operations with the package.
+
+    :param str name:
+        The name of the package to be unheld. This parameter is ignored if "pkgs"
+        is used.
+
+    :param str version:
+        Unhold a specific version of a package.
+        Full description of this parameter is in `installed` function.
+
+        .. note::
+
+            This parameter make sense for Zypper-based systems.
+            Ignored for YUM/DNF and APT.
+
+    :param list pkgs:
+        A list of packages to be unheld. All packages listed under ``pkgs``
+        will be unheld.
+
+        .. code-block:: yaml
+
+            mypkgs:
+              pkg.unheld:
+                - pkgs:
+                  - foo
+                  - bar: 1.2.3-4
+                  - baz
+
+        .. note::
+
+            For Zypper-based systems the package could be held for
+            the version specified. YUM/DNF and APT ingore it.
+            For ``unheld`` there is no need to specify the exact version
+            to be unheld.
+
+    :param bool all:
+        Force removing of all existings locks.
+        By default, this parameter is set to ``False``.
+    """
+
+    if isinstance(pkgs, list) and len(pkgs) == 0 and not all:
+        return {
+            "name": name,
+            "changes": {},
+            "result": True,
+            "comment": "No packages to be unheld provided",
+        }
+
+    # If just a name (and optionally a version) is passed, just pack them into
+    # the pkgs argument.
+    if name and pkgs is None:
+        pkgs = [{name: version}]
+        version = None
+
+    locks = {}
+    vr_lock = False
+    if "pkg.list_locks" in __salt__:
+        locks = __salt__["pkg.list_locks"]()
+        vr_lock = True
+    elif "pkg.list_holds" in __salt__:
+        _locks = __salt__["pkg.list_holds"](full=True)
+        lock_re = re.compile(r"^(.+)-(\d+):(.*)\.\*")
+        for lock in _locks:
+            match = lock_re.match(lock)
+            if match:
+                epoch = match.group(2)
+                if epoch == "0":
+                    epoch = ""
+                else:
+                    epoch = "{}:".format(epoch)
+                locks.update(
+                    {match.group(1): {"version": "{}{}".format(epoch, match.group(3))}}
+                )
+            else:
+                locks.update({lock: {}})
+    elif "pkg.get_selections" in __salt__:
+        _locks = __salt__["pkg.get_selections"](state="hold")
+        for lock in _locks.get("hold", []):
+            locks.update({lock: {}})
+    else:
+        return {
+            "name": name,
+            "changes": {},
+            "result": False,
+            "comment": "No any function to get the list of held packages available.\n"
+            "Check if the package manager supports package locking.",
+        }
+
+    dpkgs = {}
+    for pkg in pkgs:
+        if isinstance(pkg, dict):
+            (pkg_name, pkg_ver) = next(iter(pkg.items()))
+            dpkgs.update({pkg_name: pkg_ver})
+        else:
+            dpkgs.update({pkg: None})
+
+    ret = {"name": name, "changes": {}, "result": True, "comment": ""}
+    comments = []
+
+    for pkg_name in locks:
+        if locks[pkg_name].get("type", "package") != "package":
+            continue
+        lock_ver = None
+        if vr_lock and "version" in locks[pkg_name]:
+            lock_ver = locks[pkg_name]["version"]
+            lock_ver = lock_ver.lstrip("= ")
+        if all or (pkg_name in dpkgs and (not lock_ver or lock_ver == dpkgs[pkg_name])):
+            if __opts__["test"]:
+                comments.append(
+                    "The following package would be unheld: {}{}".format(
+                        pkg_name,
+                        ""
+                        if not dpkgs.get(pkg_name)
+                        else " (version = {})".format(lock_ver),
+                    )
+                )
+            else:
+                unhold_ret = __salt__["pkg.unhold"](name=name, pkgs=[pkg_name])
+                if not unhold_ret.get(pkg_name, {}).get("result", False):
+                    ret["result"] = False
+                if unhold_ret and unhold_ret.get(pkg_name, {}).get("comment"):
+                    comments.append(unhold_ret.get(pkg_name).get("comment"))
+                ret["changes"].update(unhold_ret)
+
+    ret["comment"] = "\n".join(comments)
+    if not (ret["changes"] or ret["comment"]):
+        ret["comment"] = "No changes made"
+
+    return ret
diff --git a/tests/pytests/unit/modules/test_zypperpkg.py b/tests/pytests/unit/modules/test_zypperpkg.py
new file mode 100644
index 0000000000..464fae1f47
--- /dev/null
+++ b/tests/pytests/unit/modules/test_zypperpkg.py
@@ -0,0 +1,142 @@
+import pytest
+import salt.modules.pkg_resource as pkg_resource
+import salt.modules.zypperpkg as zypper
+from tests.support.mock import MagicMock, patch
+
+
+@pytest.fixture
+def configure_loader_modules():
+    return {zypper: {"rpm": None}, pkg_resource: {}}
+
+
+def test_pkg_hold():
+    """
+    Tests holding packages with Zypper
+    """
+
+    # Test openSUSE 15.3
+    list_locks_mock = {
+        "bar": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+        "minimal_base": {
+            "type": "pattern",
+            "match_type": "glob",
+            "case_sensitive": "on",
+        },
+        "baz": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+    }
+
+    cmd = MagicMock(
+        return_value={
+            "pid": 1234,
+            "retcode": 0,
+            "stdout": "Specified lock has been successfully added.",
+            "stderr": "",
+        }
+    )
+    with patch.object(
+        zypper, "list_locks", MagicMock(return_value=list_locks_mock)
+    ), patch.dict(zypper.__salt__, {"cmd.run_all": cmd}):
+        ret = zypper.hold("foo")
+        assert ret["foo"]["changes"]["old"] == ""
+        assert ret["foo"]["changes"]["new"] == "hold"
+        assert ret["foo"]["comment"] == "Package foo is now being held."
+        cmd.assert_called_once_with(
+            ["zypper", "--non-interactive", "--no-refresh", "al", "foo"],
+            env={},
+            output_loglevel="trace",
+            python_shell=False,
+        )
+        cmd.reset_mock()
+        ret = zypper.hold(pkgs=["foo", "bar"])
+        assert ret["foo"]["changes"]["old"] == ""
+        assert ret["foo"]["changes"]["new"] == "hold"
+        assert ret["foo"]["comment"] == "Package foo is now being held."
+        assert ret["bar"]["changes"] == {}
+        assert ret["bar"]["comment"] == "Package bar is already set to be held."
+        cmd.assert_called_once_with(
+            ["zypper", "--non-interactive", "--no-refresh", "al", "foo"],
+            env={},
+            output_loglevel="trace",
+            python_shell=False,
+        )
+
+
+def test_pkg_unhold():
+    """
+    Tests unholding packages with Zypper
+    """
+
+    # Test openSUSE 15.3
+    list_locks_mock = {
+        "bar": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+        "minimal_base": {
+            "type": "pattern",
+            "match_type": "glob",
+            "case_sensitive": "on",
+        },
+        "baz": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+    }
+
+    cmd = MagicMock(
+        return_value={
+            "pid": 1234,
+            "retcode": 0,
+            "stdout": "1 lock has been successfully removed.",
+            "stderr": "",
+        }
+    )
+    with patch.object(
+        zypper, "list_locks", MagicMock(return_value=list_locks_mock)
+    ), patch.dict(zypper.__salt__, {"cmd.run_all": cmd}):
+        ret = zypper.unhold("foo")
+        assert ret["foo"]["comment"] == "Package foo was already unheld."
+        cmd.assert_not_called()
+        cmd.reset_mock()
+        ret = zypper.unhold(pkgs=["foo", "bar"])
+        assert ret["foo"]["changes"] == {}
+        assert ret["foo"]["comment"] == "Package foo was already unheld."
+        assert ret["bar"]["changes"]["old"] == "hold"
+        assert ret["bar"]["changes"]["new"] == ""
+        assert ret["bar"]["comment"] == "Package bar is no longer held."
+        cmd.assert_called_once_with(
+            ["zypper", "--non-interactive", "--no-refresh", "rl", "bar"],
+            env={},
+            output_loglevel="trace",
+            python_shell=False,
+        )
+
+
+def test_pkg_list_holds():
+    """
+    Tests listing of calculated held packages with Zypper
+    """
+
+    # Test openSUSE 15.3
+    list_locks_mock = {
+        "bar": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+        "minimal_base": {
+            "type": "pattern",
+            "match_type": "glob",
+            "case_sensitive": "on",
+        },
+        "baz": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+    }
+    installed_pkgs = {
+        "foo": [{"edition": "1.2.3-1.1"}],
+        "bar": [{"edition": "2.3.4-2.1", "epoch": "2"}],
+    }
+
+    def zypper_search_mock(name, *_args, **_kwargs):
+        if name in installed_pkgs:
+            return {name: installed_pkgs.get(name)}
+
+    with patch.object(
+        zypper, "list_locks", MagicMock(return_value=list_locks_mock)
+    ), patch.object(
+        zypper, "search", MagicMock(side_effect=zypper_search_mock)
+    ), patch.object(
+        zypper, "info_installed", MagicMock(side_effect=zypper_search_mock)
+    ):
+        ret = zypper.list_holds()
+        assert len(ret) == 1
+        assert "bar-2:2.3.4-2.1.*" in ret
diff --git a/tests/pytests/unit/states/test_pkg.py b/tests/pytests/unit/states/test_pkg.py
new file mode 100644
index 0000000000..faf42c4681
--- /dev/null
+++ b/tests/pytests/unit/states/test_pkg.py
@@ -0,0 +1,155 @@
+import pytest
+import salt.states.pkg as pkg
+from tests.support.mock import MagicMock, patch
+
+
+@pytest.fixture
+def configure_loader_modules():
+    return {
+        pkg: {
+            "__env__": "base",
+            "__salt__": {},
+            "__grains__": {"os": "CentOS"},
+            "__opts__": {"test": False, "cachedir": ""},
+            "__instance_id__": "",
+            "__low__": {},
+            "__utils__": {},
+        },
+    }
+
+
+@pytest.mark.parametrize(
+    "package_manager", [("Zypper"), ("YUM/DNF"), ("APT")],
+)
+def test_held_unheld(package_manager):
+    """
+    Test pkg.held and pkg.unheld with Zypper, YUM/DNF and APT
+    """
+
+    if package_manager == "Zypper":
+        list_holds_func = "pkg.list_locks"
+        list_holds_mock = MagicMock(
+            return_value={
+                "bar": {
+                    "type": "package",
+                    "match_type": "glob",
+                    "case_sensitive": "on",
+                },
+                "minimal_base": {
+                    "type": "pattern",
+                    "match_type": "glob",
+                    "case_sensitive": "on",
+                },
+                "baz": {
+                    "type": "package",
+                    "match_type": "glob",
+                    "case_sensitive": "on",
+                },
+            }
+        )
+    elif package_manager == "YUM/DNF":
+        list_holds_func = "pkg.list_holds"
+        list_holds_mock = MagicMock(
+            return_value=["bar-0:1.2.3-1.1.*", "baz-0:2.3.4-2.1.*"]
+        )
+    elif package_manager == "APT":
+        list_holds_func = "pkg.get_selections"
+        list_holds_mock = MagicMock(return_value={"hold": ["bar", "baz"]})
+
+    def pkg_hold(name, pkgs=None, *_args, **__kwargs):
+        if name and pkgs is None:
+            pkgs = [name]
+        ret = {}
+        for pkg in pkgs:
+            ret.update(
+                {
+                    pkg: {
+                        "name": pkg,
+                        "changes": {"new": "hold", "old": ""},
+                        "result": True,
+                        "comment": "Package {} is now being held.".format(pkg),
+                    }
+                }
+            )
+        return ret
+
+    def pkg_unhold(name, pkgs=None, *_args, **__kwargs):
+        if name and pkgs is None:
+            pkgs = [name]
+        ret = {}
+        for pkg in pkgs:
+            ret.update(
+                {
+                    pkg: {
+                        "name": pkg,
+                        "changes": {"new": "", "old": "hold"},
+                        "result": True,
+                        "comment": "Package {} is no longer held.".format(pkg),
+                    }
+                }
+            )
+        return ret
+
+    hold_mock = MagicMock(side_effect=pkg_hold)
+    unhold_mock = MagicMock(side_effect=pkg_unhold)
+
+    # Testing with Zypper
+    with patch.dict(
+        pkg.__salt__,
+        {
+            list_holds_func: list_holds_mock,
+            "pkg.hold": hold_mock,
+            "pkg.unhold": unhold_mock,
+        },
+    ):
+        # Holding one of two packages
+        ret = pkg.held("held-test", pkgs=["foo", "bar"])
+        assert "foo" in ret["changes"]
+        assert len(ret["changes"]) == 1
+        hold_mock.assert_called_once_with(name="held-test", pkgs=["foo"])
+        unhold_mock.assert_not_called()
+
+        hold_mock.reset_mock()
+        unhold_mock.reset_mock()
+
+        # Holding one of two packages and replacing all the rest held packages
+        ret = pkg.held("held-test", pkgs=["foo", "bar"], replace=True)
+        assert "foo" in ret["changes"]
+        assert "baz" in ret["changes"]
+        assert len(ret["changes"]) == 2
+        hold_mock.assert_called_once_with(name="held-test", pkgs=["foo"])
+        unhold_mock.assert_called_once_with(name="held-test", pkgs=["baz"])
+
+        hold_mock.reset_mock()
+        unhold_mock.reset_mock()
+
+        # Remove all holds
+        ret = pkg.held("held-test", pkgs=[], replace=True)
+        assert "bar" in ret["changes"]
+        assert "baz" in ret["changes"]
+        assert len(ret["changes"]) == 2
+        hold_mock.assert_not_called()
+        unhold_mock.assert_any_call(name="held-test", pkgs=["baz"])
+        unhold_mock.assert_any_call(name="held-test", pkgs=["bar"])
+
+        hold_mock.reset_mock()
+        unhold_mock.reset_mock()
+
+        # Unolding one of two packages
+        ret = pkg.unheld("held-test", pkgs=["foo", "bar"])
+        assert "bar" in ret["changes"]
+        assert len(ret["changes"]) == 1
+        unhold_mock.assert_called_once_with(name="held-test", pkgs=["bar"])
+        hold_mock.assert_not_called()
+
+        hold_mock.reset_mock()
+        unhold_mock.reset_mock()
+
+        # Remove all holds
+        ret = pkg.unheld("held-test", all=True)
+        assert "bar" in ret["changes"]
+        assert "baz" in ret["changes"]
+        assert len(ret["changes"]) == 2
+        hold_mock.assert_not_called()
+        unhold_mock.assert_any_call(name="held-test", pkgs=["baz"])
+        unhold_mock.assert_any_call(name="held-test", pkgs=["bar"])
-- 
2.32.0