File implementation-of-held-unheld-functions-for-state-pk.patch of Package salt.23568

From 3db1d2e6a3149291959991a583159c05a6a1e5ef Mon Sep 17 00:00:00 2001
From: Victor Zhestkov <35733135+vzhestkov@users.noreply.github.com>
Date: Mon, 5 Jul 2021 18:39:49 +0300
Subject: [PATCH] Implementation of held/unheld functions for state pkg
 - 3000 (#388)

* Implementation of held/unheld functions for state pkg
---
 salt/modules/zypperpkg.py            | 216 ++++++++++++++-----
 salt/states/pkg.py                   | 310 +++++++++++++++++++++++++++
 tests/unit/modules/test_zypperpkg.py | 133 +++++++++++-
 tests/unit/states/test_pkg.py        | 145 +++++++++++++
 4 files changed, 748 insertions(+), 56 deletions(-)

diff --git a/salt/modules/zypperpkg.py b/salt/modules/zypperpkg.py
index 325999360d..7936993a6b 100644
--- a/salt/modules/zypperpkg.py
+++ b/salt/modules/zypperpkg.py
@@ -1963,6 +1963,76 @@ def purge(name=None, pkgs=None, root=None, inclusion_detection=False, **kwargs):
     return _uninstall(inclusion_detection, name=name, pkgs=pkgs, root=root)
 
 
+def list_holds(pattern=None, full=True, root=None, **kwargs):
+    """
+    List information on locked packages.
+
+    .. note::
+        This function returns the computed output of ``list_locks``
+        to show exact locked packages.
+
+    pattern
+        Regular expression used to match the package name
+
+    full : True
+        Show the full hold definition including version and epoch. Set to
+        ``False`` to return just the name of the package(s) being held.
+
+    root
+        Operate on a different root directory.
+
+
+    CLI Example:
+
+    .. code-block:: bash
+
+        salt '*' pkg.list_holds
+        salt '*' pkg.list_holds full=False
+    """
+    locks = list_locks(root=root)
+    ret = []
+    inst_pkgs = {}
+    for solv_name, lock in locks.items():
+        if lock.get("type", "package") != "package":
+            continue
+        try:
+            found_pkgs = search(
+                solv_name,
+                root=root,
+                match=None if "*" in solv_name else "exact",
+                case_sensitive=(lock.get("case_sensitive", "on") == "on"),
+                installed_only=True,
+                details=True,
+                all_versions=True,
+                ignore_no_matching_item=True,
+            )
+        except CommandExecutionError:
+            continue
+        if found_pkgs:
+            for pkg in found_pkgs:
+                if pkg not in inst_pkgs:
+                    inst_pkgs.update(
+                        info_installed(
+                            pkg, root=root, attr="edition,epoch", all_versions=True
+                        )
+                    )
+
+    ptrn_re = re.compile(r"{}-\S+".format(pattern)) if pattern else None
+    for pkg_name, pkg_editions in inst_pkgs.items():
+        for pkg_info in pkg_editions:
+            pkg_ret = (
+                "{}-{}:{}.*".format(
+                    pkg_name, pkg_info.get("epoch", 0), pkg_info.get("edition")
+                )
+                if full
+                else pkg_name
+            )
+            if pkg_ret not in ret and (not ptrn_re or ptrn_re.match(pkg_ret)):
+                ret.append(pkg_ret)
+
+    return ret
+
+
 def list_locks(root=None):
     '''
     List current package locks.
@@ -2031,46 +2101,72 @@ def clean_locks(root=None):
     return out
 
 
-def unhold(name=None, pkgs=None, **kwargs):
-    '''
-    Remove specified package lock.
+def unhold(name=None, pkgs=None, root=None, **kwargs):
+    """
+    Remove a package hold.
+
+    name
+        A package name to unhold, or a comma-separated list of package names to
+        unhold.
+
+    pkgs
+        A list of packages to unhold.  The ``name`` parameter will be ignored if
+        this option is passed.
 
     root
-        operate on a different root directory.
+        Operate on a different root directory.
+
 
     CLI Example:
 
     .. code-block:: bash
 
-        salt '*' pkg.remove_lock <package name>
-        salt '*' pkg.remove_lock <package1>,<package2>,<package3>
-        salt '*' pkg.remove_lock pkgs='["foo", "bar"]'
-    '''
+        salt '*' pkg.unhold <package name>
+        salt '*' pkg.unhold <package1>,<package2>,<package3>
+        salt '*' pkg.unhold pkgs='["foo", "bar"]'
+    """
     ret = {}
-    root = kwargs.get('root')
-    if (not name and not pkgs) or (name and pkgs):
-        raise CommandExecutionError('Name or packages must be specified.')
-    elif name:
-        pkgs = [name]
+    if not name and not pkgs:
+        raise CommandExecutionError("Name or packages must be specified.")
 
-    locks = list_locks(root)
-    try:
-        pkgs = list(__salt__['pkg_resource.parse_targets'](pkgs)[0].keys())
-    except MinionError as exc:
-        raise CommandExecutionError(exc)
+    targets = []
+    if pkgs:
+        targets.extend(pkgs)
+    else:
+        targets.append(name)
 
+    locks = list_locks(root=root)
     removed = []
-    missing = []
-    for pkg in pkgs:
-        if locks.get(pkg):
-            removed.append(pkg)
-            ret[pkg]['comment'] = 'Package {0} is no longer held.'.format(pkg)
+
+    for target in targets:
+        version = None
+        if isinstance(target, dict):
+            (target, version) = next(iter(target.items()))
+        ret[target] = {"name": target, "changes": {}, "result": True, "comment": ""}
+        if locks.get(target):
+            lock_ver = None
+            if "version" in locks.get(target):
+                lock_ver = locks.get(target)["version"]
+                lock_ver = lock_ver.lstrip("= ")
+            if version and lock_ver != version:
+                ret[target]["result"] = False
+                ret[target][
+                    "comment"
+                ] = "Unable to unhold package {} as it is held with the other version.".format(
+                    target
+                )
+            else:
+                removed.append(
+                    target if not lock_ver else "{}={}".format(target, lock_ver)
+                )
+                ret[target]["changes"]["new"] = ""
+                ret[target]["changes"]["old"] = "hold"
+                ret[target]["comment"] = "Package {} is no longer held.".format(target)
         else:
-            missing.append(pkg)
-            ret[pkg]['comment'] = 'Package {0} unable to be unheld.'.format(pkg)
+            ret[target]["comment"] = "Package {} was already unheld.".format(target)
 
     if removed:
-        __zypper__(root=root).call('rl', *removed)
+        __zypper__(root=root).call("rl", *removed)
 
     return ret
 
@@ -2111,50 +2207,60 @@ def remove_lock(packages, root=None, **kwargs):  # pylint: disable=unused-argume
     return {'removed': len(removed), 'not_found': missing}
 
 
-def hold(name=None, pkgs=None, **kwargs):
-    '''
-    Add a package lock. Specify packages to lock by exact name.
+def hold(name=None, pkgs=None, root=None, **kwargs):
+    """
+    Add a package hold.  Specify one of ``name`` and ``pkgs``.
+
+    name
+        A package name to hold, or a comma-separated list of package names to
+        hold.
+
+    pkgs
+        A list of packages to hold.  The ``name`` parameter will be ignored if
+        this option is passed.
 
     root
-        operate on a different root directory.
+        Operate on a different root directory.
+
 
     CLI Example:
 
     .. code-block:: bash
 
-        salt '*' pkg.add_lock <package name>
-        salt '*' pkg.add_lock <package1>,<package2>,<package3>
-        salt '*' pkg.add_lock pkgs='["foo", "bar"]'
-
-    :param name:
-    :param pkgs:
-    :param kwargs:
-    :return:
-    '''
+        salt '*' pkg.hold <package name>
+        salt '*' pkg.hold <package1>,<package2>,<package3>
+        salt '*' pkg.hold pkgs='["foo", "bar"]'
+    """
     ret = {}
-    root = kwargs.get('root')
-    if (not name and not pkgs) or (name and pkgs):
-        raise CommandExecutionError('Name or packages must be specified.')
-    elif name:
-        pkgs = [name]
+    if not name and not pkgs:
+        raise CommandExecutionError("Name or packages must be specified.")
+
+    targets = []
+    if pkgs:
+        targets.extend(pkgs)
+    else:
+        targets.append(name)
 
     locks = list_locks(root=root)
     added = []
-    try:
-        pkgs = list(__salt__['pkg_resource.parse_targets'](pkgs)[0].keys())
-    except MinionError as exc:
-        raise CommandExecutionError(exc)
 
-    for pkg in pkgs:
-        ret[pkg] = {'name': pkg, 'changes': {}, 'result': False, 'comment': ''}
-        if not locks.get(pkg):
-            added.append(pkg)
-            ret[pkg]['comment'] = 'Package {0} is now being held.'.format(pkg)
+    for target in targets:
+        version = None
+        if isinstance(target, dict):
+            (target, version) = next(iter(target.items()))
+        ret[target] = {"name": target, "changes": {}, "result": True, "comment": ""}
+        if not locks.get(target):
+            added.append(target if not version else "{}={}".format(target, version))
+            ret[target]["changes"]["new"] = "hold"
+            ret[target]["changes"]["old"] = ""
+            ret[target]["comment"] = "Package {} is now being held.".format(target)
         else:
-            ret[pkg]['comment'] = 'Package {0} is already set to be held.'.format(pkg)
+            ret[target]["comment"] = "Package {} is already set to be held.".format(
+                target
+            )
 
     if added:
-        __zypper__(root=root).call('al', *added)
+        __zypper__(root=root).call("al", *added)
 
     return ret
 
diff --git a/salt/states/pkg.py b/salt/states/pkg.py
index 71ba29a27c..4d3b9f14bf 100644
--- a/salt/states/pkg.py
+++ b/salt/states/pkg.py
@@ -3350,3 +3350,313 @@ def mod_watch(name, **kwargs):
             'changes': {},
             'comment': 'pkg.{0} does not work with the watch requisite'.format(sfun),
             'result': False}
+
+
+def held(name, version=None, pkgs=None, replace=False, **kwargs):
+    """
+    Set package in 'hold' state, meaning it will not be changed.
+
+    :param str name:
+        The name of the package to be held. This parameter is ignored
+        if ``pkgs`` is used.
+
+    :param str version:
+        Hold a specific version of a package.
+        Full description of this parameter is in `installed` function.
+
+        .. note::
+
+            This parameter make sense for Zypper-based systems.
+            Ignored for YUM/DNF and APT
+
+    :param list pkgs:
+        A list of packages to be held. All packages listed under ``pkgs``
+        will be held.
+
+        .. code-block:: yaml
+
+            mypkgs:
+              pkg.held:
+                - pkgs:
+                  - foo
+                  - bar: 1.2.3-4
+                  - baz
+
+        .. note::
+
+            For Zypper-based systems the package could be held for
+            the version specified. YUM/DNF and APT ingore it.
+
+    :param bool replace:
+        Force replacement of existings holds with specified.
+        By default, this parameter is set to ``False``.
+    """
+
+    if isinstance(pkgs, list) and len(pkgs) == 0 and not replace:
+        return {
+            "name": name,
+            "changes": {},
+            "result": True,
+            "comment": "No packages to be held provided",
+        }
+
+    # If just a name (and optionally a version) is passed, just pack them into
+    # the pkgs argument.
+    if name and pkgs is None:
+        if version:
+            pkgs = [{name: version}]
+            version = None
+        else:
+            pkgs = [name]
+
+    locks = {}
+    vr_lock = False
+    if "pkg.list_locks" in __salt__:
+        locks = __salt__["pkg.list_locks"]()
+        vr_lock = True
+    elif "pkg.list_holds" in __salt__:
+        _locks = __salt__["pkg.list_holds"](full=True)
+        lock_re = re.compile(r"^(.+)-(\d+):(.*)\.\*")
+        for lock in _locks:
+            match = lock_re.match(lock)
+            if match:
+                epoch = match.group(2)
+                if epoch == "0":
+                    epoch = ""
+                else:
+                    epoch = "{}:".format(epoch)
+                locks.update(
+                    {match.group(1): {"version": "{}{}".format(epoch, match.group(3))}}
+                )
+            else:
+                locks.update({lock: {}})
+    elif "pkg.get_selections" in __salt__:
+        _locks = __salt__["pkg.get_selections"](state="hold")
+        for lock in _locks.get("hold", []):
+            locks.update({lock: {}})
+    else:
+        return {
+            "name": name,
+            "changes": {},
+            "result": False,
+            "comment": "No any function to get the list of held packages available.\n"
+            "Check if the package manager supports package locking.",
+        }
+
+    if "pkg.hold" not in __salt__:
+        return {
+            "name": name,
+            "changes": {},
+            "result": False,
+            "comment": "`hold` function is not implemented for the package manager.",
+        }
+
+    ret = {"name": name, "changes": {}, "result": True, "comment": ""}
+    comments = []
+
+    held_pkgs = set()
+    for pkg in pkgs:
+        if isinstance(pkg, dict):
+            (pkg_name, pkg_ver) = next(iter(pkg.items()))
+        else:
+            pkg_name = pkg
+            pkg_ver = None
+        lock_ver = None
+        if pkg_name in locks and "version" in locks[pkg_name]:
+            lock_ver = locks[pkg_name]["version"]
+            lock_ver = lock_ver.lstrip("= ")
+        held_pkgs.add(pkg_name)
+        if pkg_name not in locks or (vr_lock and lock_ver != pkg_ver):
+            if __opts__["test"]:
+                if pkg_name in locks:
+                    comments.append(
+                        "The following package's hold rule would be updated: {}{}".format(
+                            pkg_name,
+                            "" if not pkg_ver else " (version = {})".format(pkg_ver),
+                        )
+                    )
+                else:
+                    comments.append(
+                        "The following package would be held: {}{}".format(
+                            pkg_name,
+                            "" if not pkg_ver else " (version = {})".format(pkg_ver),
+                        )
+                    )
+            else:
+                unhold_ret = None
+                if pkg_name in locks:
+                    unhold_ret = __salt__["pkg.unhold"](name=name, pkgs=[pkg_name])
+                hold_ret = __salt__["pkg.hold"](name=name, pkgs=[pkg])
+                if not hold_ret.get(pkg_name, {}).get("result", False):
+                    ret["result"] = False
+                if (
+                    unhold_ret
+                    and unhold_ret.get(pkg_name, {}).get("result", False)
+                    and hold_ret
+                    and hold_ret.get(pkg_name, {}).get("result", False)
+                ):
+                    comments.append(
+                        "Package {} was updated with hold rule".format(pkg_name)
+                    )
+                elif hold_ret and hold_ret.get(pkg_name, {}).get("result", False):
+                    comments.append("Package {} is now being held".format(pkg_name))
+                else:
+                    comments.append("Package {} was not held".format(pkg_name))
+                ret["changes"].update(hold_ret)
+
+    if replace:
+        for pkg_name in locks:
+            if locks[pkg_name].get("type", "package") != "package":
+                continue
+            if __opts__["test"]:
+                if pkg_name not in held_pkgs:
+                    comments.append(
+                        "The following package would be unheld: {}".format(pkg_name)
+                    )
+            else:
+                if pkg_name not in held_pkgs:
+                    unhold_ret = __salt__["pkg.unhold"](name=name, pkgs=[pkg_name])
+                    if not unhold_ret.get(pkg_name, {}).get("result", False):
+                        ret["result"] = False
+                    if unhold_ret and unhold_ret.get(pkg_name, {}).get("comment"):
+                        comments.append(unhold_ret.get(pkg_name).get("comment"))
+                    ret["changes"].update(unhold_ret)
+
+    ret["comment"] = "\n".join(comments)
+    if not (ret["changes"] or ret["comment"]):
+        ret["comment"] = "No changes made"
+
+    return ret
+
+
+def unheld(name, version=None, pkgs=None, all=False, **kwargs):
+    """
+    Unset package from 'hold' state, to allow operations with the package.
+
+    :param str name:
+        The name of the package to be unheld. This parameter is ignored
+        if ``pkgs`` is used.
+
+    :param str version:
+        Unhold a specific version of a package.
+        Full description of this parameter is in `installed` function.
+
+        .. note::
+
+            This parameter make sense for Zypper-based systems.
+            Ignored for YUM/DNF and APT
+
+    :param list pkgs:
+        A list of packages to be unheld. All packages listed under ``pkgs``
+        will be unheld.
+
+        .. code-block:: yaml
+
+            mypkgs:
+              pkg.unheld:
+                - pkgs:
+                  - foo
+                  - bar: 1.2.3-4
+                  - baz
+
+        .. note::
+
+            For Zypper-based systems the package could be held for
+            the version specified. YUM/DNF and APT ingore it.
+            For ``unheld`` there is no need to specify the exact version
+            to be unheld.
+
+    :param bool all:
+        Force removing of all existings locks.
+        By default, this parameter is set to ``False``.
+    """
+
+    if isinstance(pkgs, list) and len(pkgs) == 0 and not all:
+        return {
+            "name": name,
+            "changes": {},
+            "result": True,
+            "comment": "No packages to be unheld provided",
+        }
+
+    # If just a name (and optionally a version) is passed, just pack them into
+    # the pkgs argument.
+    if name and pkgs is None:
+        pkgs = [{name: version}]
+        version = None
+
+    locks = {}
+    vr_lock = False
+    if "pkg.list_locks" in __salt__:
+        locks = __salt__["pkg.list_locks"]()
+        vr_lock = True
+    elif "pkg.list_holds" in __salt__:
+        _locks = __salt__["pkg.list_holds"](full=True)
+        lock_re = re.compile(r"^(.+)-(\d+):(.*)\.\*")
+        for lock in _locks:
+            match = lock_re.match(lock)
+            if match:
+                epoch = match.group(2)
+                if epoch == "0":
+                    epoch = ""
+                else:
+                    epoch = "{}:".format(epoch)
+                locks.update(
+                    {match.group(1): {"version": "{}{}".format(epoch, match.group(3))}}
+                )
+            else:
+                locks.update({lock: {}})
+    elif "pkg.get_selections" in __salt__:
+        _locks = __salt__["pkg.get_selections"](state="hold")
+        for lock in _locks.get("hold", []):
+            locks.update({lock: {}})
+    else:
+        return {
+            "name": name,
+            "changes": {},
+            "result": False,
+            "comment": "No any function to get the list of held packages available.\n"
+            "Check if the package manager supports package locking.",
+        }
+
+    dpkgs = {}
+    for pkg in pkgs:
+        if isinstance(pkg, dict):
+            (pkg_name, pkg_ver) = next(iter(pkg.items()))
+            dpkgs.update({pkg_name: pkg_ver})
+        else:
+            dpkgs.update({pkg: None})
+
+    ret = {"name": name, "changes": {}, "result": True, "comment": ""}
+    comments = []
+
+    for pkg_name in locks:
+        if locks[pkg_name].get("type", "package") != "package":
+            continue
+        lock_ver = None
+        if vr_lock and "version" in locks[pkg_name]:
+            lock_ver = locks[pkg_name]["version"]
+            lock_ver = lock_ver.lstrip("= ")
+        if all or (pkg_name in dpkgs and (not lock_ver or lock_ver == dpkgs[pkg_name])):
+            if __opts__["test"]:
+                comments.append(
+                    "The following package would be unheld: {}{}".format(
+                        pkg_name,
+                        ""
+                        if not dpkgs.get(pkg_name)
+                        else " (version = {})".format(lock_ver),
+                    )
+                )
+            else:
+                unhold_ret = __salt__["pkg.unhold"](name=name, pkgs=[pkg_name])
+                if not unhold_ret.get(pkg_name, {}).get("result", False):
+                    ret["result"] = False
+                if unhold_ret and unhold_ret.get(pkg_name, {}).get("comment"):
+                    comments.append(unhold_ret.get(pkg_name).get("comment"))
+                ret["changes"].update(unhold_ret)
+
+    ret["comment"] = "\n".join(comments)
+    if not (ret["changes"] or ret["comment"]):
+        ret["comment"] = "No changes made"
+
+    return ret
diff --git a/tests/unit/modules/test_zypperpkg.py b/tests/unit/modules/test_zypperpkg.py
index dfe7f4f7a1..10d90660c6 100644
--- a/tests/unit/modules/test_zypperpkg.py
+++ b/tests/unit/modules/test_zypperpkg.py
@@ -1942,7 +1942,6 @@ pattern() = package-c"""
                 python_shell=False,
                 env={"ZYPP_READONLY_HACK": "1"},
             )
-            self.assertEqual(zypper.__context__, {"pkg.other_data": None})
 
     def test_get_repo_keys(self):
         salt_mock = {"lowpkg.list_gpg_keys": MagicMock(return_value=True)}
@@ -1994,3 +1993,135 @@ pattern() = package-c"""
         with patch("salt.modules.zypperpkg.__zypper__", zypper_mock):
             assert zypper.services_need_restart() == expected
             zypper_mock(root=None).nolock.call.assert_called_with("ps", "-sss")
+
+    def test_pkg_hold(self):
+        """
+        Tests holding packages with Zypper
+        """
+
+        # Test openSUSE 15.3
+        list_locks_mock = {
+            "bar": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+            "minimal_base": {
+                "type": "pattern",
+                "match_type": "glob",
+                "case_sensitive": "on",
+            },
+            "baz": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+        }
+
+        cmd = MagicMock(
+            return_value={
+                "pid": 1234,
+                "retcode": 0,
+                "stdout": "Specified lock has been successfully added.",
+                "stderr": "",
+            }
+        )
+        with patch.object(
+            zypper, "list_locks", MagicMock(return_value=list_locks_mock)
+        ), patch.dict(zypper.__salt__, {"cmd.run_all": cmd}):
+            ret = zypper.hold("foo")
+            assert ret["foo"]["changes"]["old"] == ""
+            assert ret["foo"]["changes"]["new"] == "hold"
+            assert ret["foo"]["comment"] == "Package foo is now being held."
+            cmd.assert_called_once_with(
+                ["zypper", "--non-interactive", "--no-refresh", "al", "foo"],
+                env={},
+                output_loglevel="trace",
+                python_shell=False,
+            )
+            cmd.reset_mock()
+            ret = zypper.hold(pkgs=["foo", "bar"])
+            assert ret["foo"]["changes"]["old"] == ""
+            assert ret["foo"]["changes"]["new"] == "hold"
+            assert ret["foo"]["comment"] == "Package foo is now being held."
+            assert ret["bar"]["changes"] == {}
+            assert ret["bar"]["comment"] == "Package bar is already set to be held."
+            cmd.assert_called_once_with(
+                ["zypper", "--non-interactive", "--no-refresh", "al", "foo"],
+                env={},
+                output_loglevel="trace",
+                python_shell=False,
+            )
+
+
+    def test_pkg_unhold(self):
+        """
+        Tests unholding packages with Zypper
+        """
+
+        # Test openSUSE 15.3
+        list_locks_mock = {
+            "bar": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+            "minimal_base": {
+                "type": "pattern",
+                "match_type": "glob",
+                "case_sensitive": "on",
+            },
+            "baz": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+        }
+
+        cmd = MagicMock(
+            return_value={
+                "pid": 1234,
+                "retcode": 0,
+                "stdout": "1 lock has been successfully removed.",
+                "stderr": "",
+            }
+        )
+        with patch.object(
+            zypper, "list_locks", MagicMock(return_value=list_locks_mock)
+        ), patch.dict(zypper.__salt__, {"cmd.run_all": cmd}):
+            ret = zypper.unhold("foo")
+            assert ret["foo"]["comment"] == "Package foo was already unheld."
+            cmd.assert_not_called()
+            cmd.reset_mock()
+            ret = zypper.unhold(pkgs=["foo", "bar"])
+            assert ret["foo"]["changes"] == {}
+            assert ret["foo"]["comment"] == "Package foo was already unheld."
+            assert ret["bar"]["changes"]["old"] == "hold"
+            assert ret["bar"]["changes"]["new"] == ""
+            assert ret["bar"]["comment"] == "Package bar is no longer held."
+            cmd.assert_called_once_with(
+                ["zypper", "--non-interactive", "--no-refresh", "rl", "bar"],
+                env={},
+                output_loglevel="trace",
+                python_shell=False,
+            )
+
+
+    def test_pkg_list_holds(self):
+        """
+        Tests listing of calculated held packages with Zypper
+        """
+
+        # Test openSUSE 15.3
+        list_locks_mock = {
+            "bar": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+            "minimal_base": {
+                "type": "pattern",
+                "match_type": "glob",
+                "case_sensitive": "on",
+            },
+            "baz": {"type": "package", "match_type": "glob", "case_sensitive": "on"},
+        }
+        installed_pkgs = {
+            "foo": [{"edition": "1.2.3-1.1"}],
+            "bar": [{"edition": "2.3.4-2.1", "epoch": "2"}],
+        }
+
+        def zypper_search_mock(name, *_args, **_kwargs):
+            if name in installed_pkgs:
+                return {name: installed_pkgs.get(name)}
+
+        with patch.object(
+            zypper, "list_locks", MagicMock(return_value=list_locks_mock)
+        ), patch.object(
+            zypper, "search", MagicMock(side_effect=zypper_search_mock)
+        ), patch.object(
+            zypper, "info_installed", MagicMock(side_effect=zypper_search_mock)
+        ):
+            ret = zypper.list_holds()
+            assert len(ret) == 1
+            assert "bar-2:2.3.4-2.1.*" in ret
diff --git a/tests/unit/states/test_pkg.py b/tests/unit/states/test_pkg.py
index 38f72353fa..41d74a4706 100644
--- a/tests/unit/states/test_pkg.py
+++ b/tests/unit/states/test_pkg.py
@@ -232,3 +232,148 @@ class PkgTestCase(TestCase, LoaderModuleMockMixin):
         for installed_versions, operator, version, expected_result in test_parameters:
             msg = "installed_versions: {}, operator: {}, version: {}, expected_result: {}".format(installed_versions, operator, version, expected_result)
             self.assertEqual(expected_result, pkg._fulfills_version_spec(installed_versions, operator, version), msg)
+
+    def test_held_unheld_Zypper(self):
+        self.pkgr_held_unheld("Zypper")
+
+    def test_held_unheld_YUM(self):
+        self.pkgr_held_unheld("YUM/DNF")
+
+    def test_held_unheld_APT(self):
+        self.pkgr_held_unheld("APT")
+
+    def pkgr_held_unheld(self, package_manager):
+        """
+        Test pkg.held and pkg.unheld with Zypper, YUM/DNF and APT
+        """
+
+        if package_manager == "Zypper":
+            list_holds_func = "pkg.list_locks"
+            list_holds_mock = MagicMock(
+                return_value={
+                    "bar": {
+                        "type": "package",
+                        "match_type": "glob",
+                        "case_sensitive": "on",
+                    },
+                    "minimal_base": {
+                        "type": "pattern",
+                        "match_type": "glob",
+                        "case_sensitive": "on",
+                    },
+                    "baz": {
+                        "type": "package",
+                        "match_type": "glob",
+                        "case_sensitive": "on",
+                    },
+                }
+            )
+        elif package_manager == "YUM/DNF":
+            list_holds_func = "pkg.list_holds"
+            list_holds_mock = MagicMock(
+                return_value=["bar-0:1.2.3-1.1.*", "baz-0:2.3.4-2.1.*"]
+            )
+        elif package_manager == "APT":
+            list_holds_func = "pkg.get_selections"
+            list_holds_mock = MagicMock(return_value={"hold": ["bar", "baz"]})
+
+        def pkg_hold(name, pkgs=None, *_args, **__kwargs):
+            if name and pkgs is None:
+                pkgs = [name]
+            ret = {}
+            for pkg in pkgs:
+                ret.update(
+                    {
+                        pkg: {
+                            "name": pkg,
+                            "changes": {"new": "hold", "old": ""},
+                            "result": True,
+                            "comment": "Package {} is now being held.".format(pkg),
+                        }
+                    }
+                )
+            return ret
+
+        def pkg_unhold(name, pkgs=None, *_args, **__kwargs):
+            if name and pkgs is None:
+                pkgs = [name]
+            ret = {}
+            for pkg in pkgs:
+                ret.update(
+                    {
+                        pkg: {
+                            "name": pkg,
+                            "changes": {"new": "", "old": "hold"},
+                            "result": True,
+                            "comment": "Package {} is no longer held.".format(pkg),
+                        }
+                    }
+                )
+            return ret
+        hold_mock = MagicMock(side_effect=pkg_hold)
+        unhold_mock = MagicMock(side_effect=pkg_unhold)
+
+        # Testing with Zypper
+        with patch.dict(
+            pkg.__salt__,
+            {
+                list_holds_func: list_holds_mock,
+                "pkg.hold": hold_mock,
+                "pkg.unhold": unhold_mock,
+            },
+        ), patch.dict(
+            pkg.__opts__, {
+                "test": False,
+            }
+        ):
+            # Holding one of two packages
+            ret = pkg.held("held-test", pkgs=["foo", "bar"])
+            assert "foo" in ret["changes"]
+            assert len(ret["changes"]) == 1
+            hold_mock.assert_called_once_with(name="held-test", pkgs=["foo"])
+            unhold_mock.assert_not_called()
+
+            hold_mock.reset_mock()
+            unhold_mock.reset_mock()
+
+            # Holding one of two packages and replacing all the rest held packages
+            ret = pkg.held("held-test", pkgs=["foo", "bar"], replace=True)
+            assert "foo" in ret["changes"]
+            assert "baz" in ret["changes"]
+            assert len(ret["changes"]) == 2
+            hold_mock.assert_called_once_with(name="held-test", pkgs=["foo"])
+            unhold_mock.assert_called_once_with(name="held-test", pkgs=["baz"])
+
+            hold_mock.reset_mock()
+            unhold_mock.reset_mock()
+
+            # Remove all holds
+            ret = pkg.held("held-test", pkgs=[], replace=True)
+            assert "bar" in ret["changes"]
+            assert "baz" in ret["changes"]
+            assert len(ret["changes"]) == 2
+            hold_mock.assert_not_called()
+            unhold_mock.assert_any_call(name="held-test", pkgs=["baz"])
+            unhold_mock.assert_any_call(name="held-test", pkgs=["bar"])
+
+            hold_mock.reset_mock()
+            unhold_mock.reset_mock()
+
+            # Unolding one of two packages
+            ret = pkg.unheld("held-test", pkgs=["foo", "bar"])
+            assert "bar" in ret["changes"]
+            assert len(ret["changes"]) == 1
+            unhold_mock.assert_called_once_with(name="held-test", pkgs=["bar"])
+            hold_mock.assert_not_called()
+
+            hold_mock.reset_mock()
+            unhold_mock.reset_mock()
+
+            # Remove all holds
+            ret = pkg.unheld("held-test", all=True)
+            assert "bar" in ret["changes"]
+            assert "baz" in ret["changes"]
+            assert len(ret["changes"]) == 2
+            hold_mock.assert_not_called()
+            unhold_mock.assert_any_call(name="held-test", pkgs=["baz"])
+            unhold_mock.assert_any_call(name="held-test", pkgs=["bar"])
-- 
2.32.0


openSUSE Build Service is sponsored by