File TESTING-deb822.patch of Package venv-salt-minion
diff --git a/changelog/64286.fixed.md b/changelog/64286.fixed.md
new file mode 100644
index 00000000000..1ea343c38d4
--- /dev/null
+++ b/changelog/64286.fixed.md
@@ -0,0 +1 @@
+Fix 2 pkgrepo.absent bugs for apt-based distros
diff --git a/changelog/65703.fixed.md b/changelog/65703.fixed.md
new file mode 100644
index 00000000000..091c754114d
--- /dev/null
+++ b/changelog/65703.fixed.md
@@ -0,0 +1 @@
+fix 65703 by using OrderedDict instead of a index that breaks. .
diff --git a/changelog/66056.changed.md b/changelog/66056.changed.md
new file mode 100644
index 00000000000..f088b055cd3
--- /dev/null
+++ b/changelog/66056.changed.md
@@ -0,0 +1 @@
+re-work the aptpkg module to remove system libraries that onedir and virtualenvs do not have access. Streamline testing, and code use to needed libraries only.
diff --git a/changelog/66201.added.md b/changelog/66201.added.md
new file mode 100644
index 00000000000..1a5738512a6
--- /dev/null
+++ b/changelog/66201.added.md
@@ -0,0 +1 @@
+added pkg.which to aptpkg, for finding which package installed a file.
diff --git a/salt/modules/aptpkg.py b/salt/modules/aptpkg.py
index 8244c639e85..6f8bb4bd509 100644
--- a/salt/modules/aptpkg.py
+++ b/salt/modules/aptpkg.py
@@ -6,8 +6,6 @@ Support for APT (Advanced Packaging Tool)
minion, and it is using a different module (or gives an error similar to
*'pkg.install' is not available*), see :ref:`here
<module-provider-override>`.
-
- For repository management, the ``python-apt`` package must be installed.
"""
import copy
@@ -18,7 +16,6 @@ import os
import pathlib
import re
import shutil
-import tempfile
import time
from urllib.error import HTTPError
from urllib.request import Request as _Request
@@ -48,42 +45,17 @@ from salt.exceptions import (
)
from salt.modules.cmdmod import _parse_env
from salt.utils.versions import warn_until_date
+from salt.utils.pkg.deb import (
+ Deb822SourceEntry,
+ Deb822Section,
+ SourceEntry,
+ SourcesList,
+ string_to_bool_int,
+ _invalid,
+)
log = logging.getLogger(__name__)
-# pylint: disable=import-error
-try:
- from aptsources.sourceslist import SourceEntry, SourcesList
-
- HAS_APT = True
-except ImportError:
- HAS_APT = False
-
-HAS_DEB822 = False
-
-if HAS_APT:
- try:
- from aptsources.sourceslist import Deb822SourceEntry, _deb822
-
- HAS_DEB822 = True
- except ImportError:
- pass
-
-try:
- import apt_pkg
-
- HAS_APTPKG = True
-except ImportError:
- HAS_APTPKG = False
-
-try:
- import softwareproperties.ppa
-
- HAS_SOFTWAREPROPERTIES = True
-except ImportError:
- HAS_SOFTWAREPROPERTIES = False
-# pylint: enable=import-error
-
APT_LISTS_PATH = "/var/lib/apt/lists"
PKG_ARCH_SEPARATOR = ":"
@@ -92,7 +64,7 @@ LP_SRC_FORMAT = "deb http://ppa.launchpad.net/{0}/{1}/ubuntu {2} main"
LP_PVT_SRC_FORMAT = "deb https://{0}private-ppa.launchpad.net/{1}/{2}/ubuntu {3} main"
_MODIFY_OK = frozenset(
- ["uri", "comps", "architectures", "disabled", "file", "dist", "signedby"]
+ ["uri", "comps", "architectures", "disabled", "file", "dist", "signedby", "trusted"]
)
DPKG_ENV_VARS = {
"APT_LISTBUGS_FRONTEND": "none",
@@ -131,201 +103,6 @@ def __init__(opts):
os.environ.update(DPKG_ENV_VARS)
-def _invalid(line):
- """
- This is a workaround since python3-apt does not support
- the signed-by argument. This function was removed from
- the class to ensure users using the python3-apt module or
- not can use the signed-by option.
- """
- disabled = False
- invalid = False
- comment = ""
- line = line.strip()
- if not line:
- invalid = True
- return disabled, invalid, comment, ""
-
- if line.startswith("#"):
- disabled = True
- line = line[1:]
-
- idx = line.find("#")
- if idx > 0:
- comment = line[idx + 1 :]
- line = line[:idx]
-
- cdrom_match = re.match(r"(.*)(cdrom:.*/)(.*)", line.strip())
- if cdrom_match:
- repo_line = (
- [p.strip() for p in cdrom_match.group(1).split()]
- + [cdrom_match.group(2).strip()]
- + [p.strip() for p in cdrom_match.group(3).split()]
- )
- else:
- repo_line = line.strip().split()
- if (
- not repo_line
- or repo_line[0] not in ["deb", "deb-src", "rpm", "rpm-src"]
- or len(repo_line) < 3
- ):
- invalid = True
- return disabled, invalid, comment, repo_line
-
- if repo_line[1].startswith("["):
- if not any(x.endswith("]") for x in repo_line[1:]):
- invalid = True
- return disabled, invalid, comment, repo_line
-
- return disabled, invalid, comment, repo_line
-
-
-if not HAS_APT:
-
- class SourceEntry: # pylint: disable=function-redefined
- def __init__(self, line, file=None):
- self.invalid = False
- self.comps = []
- self.disabled = False
- self.comment = ""
- self.dist = ""
- self.type = ""
- self.uri = ""
- self.line = line
- self.architectures = []
- self.signedby = ""
- self.file = file
- if not self.file:
- self.file = str(pathlib.Path(os.sep, "etc", "apt", "sources.list"))
- self._parse_sources(line)
-
- def str(self):
- return self.repo_line()
-
- def repo_line(self):
- """
- Return the repo line for the sources file
- """
- repo_line = []
- if self.invalid:
- return self.line
- if self.disabled:
- repo_line.append("#")
-
- repo_line.append(self.type)
- opts = []
- if self.architectures:
- opts.append("arch={}".format(",".join(self.architectures)))
- if self.signedby:
- opts.append("signed-by={}".format(self.signedby))
-
- if opts:
- repo_line.append("[{}]".format(" ".join(opts)))
-
- repo_line = repo_line + [self.uri, self.dist, " ".join(self.comps)]
- if self.comment:
- repo_line.append("#{}".format(self.comment))
- return " ".join(repo_line) + "\n"
-
- def _parse_sources(self, line):
- """
- Parse lines from sources files
- """
- self.disabled, self.invalid, self.comment, repo_line = _invalid(line)
- if self.invalid:
- return False
- if repo_line[1].startswith("["):
- repo_line = [x for x in (line.strip("[]") for line in repo_line) if x]
- opts = _get_opts(self.line)
- self.architectures.extend(opts["arch"]["value"])
- self.signedby = opts["signedby"]["value"]
- for opt in opts.keys():
- opt = opts[opt]["full"]
- if opt:
- try:
- repo_line.pop(repo_line.index(opt))
- except ValueError:
- repo_line.pop(repo_line.index("[" + opt + "]"))
- self.type = repo_line[0]
- self.uri = repo_line[1]
- self.dist = repo_line[2]
- self.comps = repo_line[3:]
- return True
-
- class SourcesList: # pylint: disable=function-redefined
- def __init__(self):
- self.list = []
- self.files = [
- pathlib.Path(os.sep, "etc", "apt", "sources.list"),
- pathlib.Path(os.sep, "etc", "apt", "sources.list.d"),
- ]
- for file in self.files:
- if file.is_dir():
- for fp in file.glob("**/*.list"):
- self.add_file(file=fp)
- else:
- self.add_file(file)
-
- def __iter__(self):
- yield from self.list
-
- def add_file(self, file):
- """
- Add the lines of a file to self.list
- """
- if file.is_file():
- with salt.utils.files.fopen(str(file)) as source:
- for line in source:
- self.list.append(SourceEntry(line, file=str(file)))
- else:
- log.debug("The apt sources file %s does not exist", file)
-
- def add(self, type, uri, dist, orig_comps, architectures, signedby):
- opts_count = []
- opts_line = ""
- if architectures:
- architectures = "arch={}".format(",".join(architectures))
- opts_count.append(architectures)
- if signedby:
- signedby = "signed-by={}".format(signedby)
- opts_count.append(signedby)
- if len(opts_count) > 1:
- opts_line = "[" + " ".join(opts_count) + "]"
- elif len(opts_count) == 1:
- opts_line = "[" + "".join(opts_count) + "]"
- repo_line = [
- type,
- opts_line,
- uri,
- dist,
- " ".join(orig_comps),
- ]
- return SourceEntry(" ".join([line for line in repo_line if line.strip()]))
-
- def remove(self, source):
- """
- remove a source from the list of sources
- """
- self.list.remove(source)
-
- def save(self):
- """
- write all of the sources from the list of sources
- to the file.
- """
- filemap = {}
- with tempfile.TemporaryDirectory() as tmpdir:
- for source in self.list:
- fname = pathlib.Path(tmpdir, pathlib.Path(source.file).name)
- with salt.utils.files.fopen(str(fname), "a") as fp:
- fp.write(source.repo_line())
- if source.file not in filemap:
- filemap[source.file] = {"tmp": fname}
-
- for fp in filemap:
- shutil.move(str(filemap[fp]["tmp"]), fp)
-
-
def _get_ppa_info_from_launchpad(owner_name, ppa_name):
"""
Idea from softwareproperties.ppa.
@@ -338,21 +115,12 @@ def _get_ppa_info_from_launchpad(owner_name, ppa_name):
:return:
"""
- lp_url = "https://launchpad.net/api/1.0/~{}/+archive/{}".format(
- owner_name, ppa_name
- )
+ lp_url = f"https://launchpad.net/api/1.0/~{owner_name}/+archive/{ppa_name}"
request = _Request(lp_url, headers={"Accept": "application/json"})
lp_page = _urlopen(request)
return salt.utils.json.load(lp_page)
-def _reconstruct_ppa_name(owner_name, ppa_name):
- """
- Stringify PPA name from args.
- """
- return "ppa:{}/{}".format(owner_name, ppa_name)
-
-
def _call_apt(args, scope=True, **kwargs):
"""
Call apt* utilities.
@@ -383,18 +151,6 @@ def _call_apt(args, scope=True, **kwargs):
return cmd_ret
-def _warn_software_properties(repo):
- """
- Warn of missing python-software-properties package.
- """
- log.warning(
- "The 'python-software-properties' package is not installed. "
- "For more accurate support of PPA repositories, you should "
- "install this package."
- )
- log.warning("Best guess at ppa format: %s", repo)
-
-
def normalize_name(name):
"""
Strips the architecture from the specified package name, if necessary.
@@ -596,7 +352,14 @@ def refresh_db(cache_valid_time=0, failhard=False, **kwargs):
except OSError as exp:
log.warning("could not stat cache directory due to: %s", exp)
- call = _call_apt(["apt-get", "-q", "update"], scope=False)
+ call = _call_apt(
+ ["apt-get", "-q", "update"],
+ scope=False,
+ timeout=kwargs.get("timeout", __opts__.get("aptpkg_refresh_db_timeout", 30)),
+ )
+ if "Timed out" in call["stdout"]:
+ _call_apt(["apt-get", "-q", "clean"], scope=False)
+ call = _call_apt(["apt-get", "-q", "update"], scope=False)
if call["retcode"] != 0:
comment = ""
if "stderr" in call:
@@ -624,9 +387,7 @@ def refresh_db(cache_valid_time=0, failhard=False, **kwargs):
error_repos.append(ident)
if failhard and error_repos:
- raise CommandExecutionError(
- "Error getting repos: {}".format(", ".join(error_repos))
- )
+ raise CommandExecutionError(f"Error getting repos: {', '.join(error_repos)}")
return ret
@@ -852,22 +613,23 @@ def install(
)
else:
pkg_params_items = []
- for pkg_source in pkg_params:
- if "lowpkg.bin_pkg_info" in __salt__:
+ # we don't need to do the test below for every package in the list.
+ # it either exists or doesn't. test once then loop.
+ if "lowpkg.bin_pkg_info" in __salt__:
+ for pkg_source in pkg_params:
deb_info = __salt__["lowpkg.bin_pkg_info"](pkg_source)
- else:
- deb_info = None
- if deb_info is None:
+ pkg_params_items.append(
+ [deb_info["name"], pkg_source, deb_info["version"]]
+ )
+ else:
+ for pkg_source in pkg_params:
log.error(
"pkg.install: Unable to get deb information for %s. "
"Version comparisons will be unavailable.",
pkg_source,
)
pkg_params_items.append([pkg_source])
- else:
- pkg_params_items.append(
- [deb_info["name"], pkg_source, deb_info["version"]]
- )
+
# Build command prefix
cmd_prefix.extend(["apt-get", "-q", "-y"])
if kwargs.get("force_yes", False):
@@ -875,8 +637,14 @@ def install(
if "force_conf_new" in kwargs and kwargs["force_conf_new"]:
cmd_prefix.extend(["-o", "DPkg::Options::=--force-confnew"])
else:
- cmd_prefix.extend(["-o", "DPkg::Options::=--force-confold"])
- cmd_prefix += ["-o", "DPkg::Options::=--force-confdef"]
+ cmd_prefix.extend(
+ [
+ "-o",
+ "DPkg::Options::=--force-confold",
+ "-o",
+ "DPkg::Options::=--force-confdef",
+ ]
+ )
if "install_recommends" in kwargs:
if not kwargs["install_recommends"]:
cmd_prefix.append("--no-install-recommends")
@@ -931,12 +699,8 @@ def install(
)
if target is None:
errors.append(
- "No version matching '{}{}' could be found "
- "(available: {})".format(
- pkgname,
- version_num,
- ", ".join(candidates) if candidates else None,
- )
+ f"No version matching '{pkgname}{version_num}' could be found "
+ f"(available: {', '.join(candidates) if candidates else None})"
)
continue
else:
@@ -1401,9 +1165,7 @@ def hold(name=None, pkgs=None, sources=None, **kwargs): # pylint: disable=W0613
ret[target]["comment"] = "Package {} is now being held.".format(target)
else:
ret[target].update(result=True)
- ret[target]["comment"] = "Package {} is already set to be held.".format(
- target
- )
+ ret[target]["comment"] = f"Package {target} is already set to be held."
return ret
@@ -1459,20 +1221,14 @@ def unhold(name=None, pkgs=None, sources=None, **kwargs): # pylint: disable=W06
elif salt.utils.data.is_true(state.get("hold", False)):
if "test" in __opts__ and __opts__["test"]:
ret[target].update(result=None)
- ret[target]["comment"] = "Package {} is set not to be held.".format(
- target
- )
+ ret[target]["comment"] = f"Package {target} is set not to be held."
else:
result = set_selections(selection={"install": [target]})
ret[target].update(changes=result[target], result=True)
- ret[target]["comment"] = "Package {} is no longer being held.".format(
- target
- )
+ ret[target]["comment"] = f"Package {target} is no longer being held."
else:
ret[target].update(result=True)
- ret[target]["comment"] = "Package {} is already set not to be held.".format(
- target
- )
+ ret[target]["comment"] = f"Package {target} is already set not to be held."
return ret
@@ -1592,7 +1348,7 @@ def _get_upgradable(dist_upgrade=True, **kwargs):
else:
cmd.append("upgrade")
try:
- cmd.extend(["-o", "APT::Default-Release={}".format(kwargs["fromrepo"])])
+ cmd.extend(["-o", f"APT::Default-Release={kwargs['fromrepo']}"])
except KeyError:
pass
@@ -1692,23 +1448,6 @@ def version_cmp(pkg1, pkg2, ignore_epoch=False, **kwargs):
# if we have apt_pkg, this will be quickier this way
# and also do not rely on shell.
- if HAS_APTPKG:
- try:
- # the apt_pkg module needs to be manually initialized
- apt_pkg.init_system()
-
- # if there is a difference in versions, apt_pkg.version_compare will
- # return an int representing the difference in minor versions, or
- # 1/-1 if the difference is smaller than minor versions. normalize
- # to -1, 0 or 1.
- try:
- ret = apt_pkg.version_compare(pkg1, pkg2)
- except TypeError:
- ret = apt_pkg.version_compare(str(pkg1), str(pkg2))
- return 1 if ret > 0 else -1 if ret < 0 else 0
- except Exception: # pylint: disable=broad-except
- # Try to use shell version in case of errors w/python bindings
- pass
try:
for oper, ret in (("lt", -1), ("eq", 0), ("gt", 1)):
cmd = ["dpkg", "--compare-versions", pkg1, oper, pkg2]
@@ -1722,56 +1461,22 @@ def version_cmp(pkg1, pkg2, ignore_epoch=False, **kwargs):
return None
-def _get_opts(line):
- """
- Return all opts in [] for a repo line
- """
- get_opts = re.search(r"\[(.*=.*)\]", line)
- ret = {
- "arch": {"full": "", "value": "", "index": 0},
- "signedby": {"full": "", "value": "", "index": 0},
- }
-
- if not get_opts:
- return ret
- opts = get_opts.group(0).strip("[]")
- architectures = []
- for idx, opt in enumerate(opts.split()):
- if opt.startswith("arch"):
- architectures.extend(opt.split("=", 1)[1].split(","))
- ret["arch"]["full"] = opt
- ret["arch"]["value"] = architectures
- ret["arch"]["index"] = idx
- elif opt.startswith("signed-by"):
- ret["signedby"]["full"] = opt
- ret["signedby"]["value"] = opt.split("=", 1)[1]
- ret["signedby"]["index"] = idx
- else:
- other_opt = opt.split("=", 1)[0]
- ret[other_opt] = {}
- ret[other_opt]["full"] = opt
- ret[other_opt]["value"] = opt.split("=", 1)[1]
- ret[other_opt]["index"] = idx
- return ret
-
-
def _split_repo_str(repo):
"""
- Return APT source entry as a tuple.
+ Return APT source entry as a dictionary
"""
- split = SourceEntry(repo)
- if not HAS_APT:
- signedby = split.signedby
- else:
- signedby = _get_opts(line=repo)["signedby"].get("value", "")
- return (
- split.type,
- split.architectures,
- split.uri,
- split.dist,
- split.comps,
- signedby,
- )
+ entry = SourceEntry(repo)
+ invalid = entry.invalid
+ signedby = entry.signedby
+
+ return {
+ "type": entry.type,
+ "architectures": entry.architectures,
+ "uri": entry.uri,
+ "dist": entry.dist,
+ "comps": entry.comps,
+ "signedby": signedby,
+ }
def _consolidate_repo_sources(sources):
@@ -1917,52 +1622,52 @@ def list_repos(**kwargs):
salt '*' pkg.list_repos disabled=True
"""
repos = {}
- if HAS_DEB822:
- sources = SourcesList(deb822=True)
- else:
- sources = SourcesList()
+ sources = SourcesList()
for source in sources:
if _skip_source(source):
continue
- if not HAS_APT:
- signedby = source.signedby
- else:
- signedby = _get_opts(line=source.line)["signedby"].get("value", "")
+ if isinstance(source, Deb822SourceEntry):
+ # deb822 could contain multiple types and suites
+ # for backward compatibility we need to expand it
+ # to get separate entries for each type and suite
+ for suite in source.suites:
+ for source_type in source.types:
+ repo = {}
+ repo["file"] = source.file
+ repo["comps"] = getattr(source, "comps", [])
+ repo["disabled"] = source.disabled
+ repo["enabled"] = not repo[
+ "disabled"
+ ] # This is for compatibility with the other modules
+ repo["dist"] = suite
+ repo["suites"] = source.suites
+ repo["type"] = source_type
+ repo["uri"] = source.uri
+ compat_source = SourceEntry(
+ f"{source_type} {source.uri} {suite} {' '.join(getattr(source, 'comps', []))}"
+ )
+ for attr in ("disabled", "architectures", "signedby"):
+ setattr(compat_source, attr, getattr(source, attr))
+ repo["line"] = str(compat_source)
+ repo["architectures"] = getattr(source, "architectures", [])
+ repo["signedby"] = source.signedby
+ repos.setdefault(source.uri, []).append(repo)
+ continue
repo = {}
- if HAS_DEB822:
- try:
- signedby = source.section.tags.get("Signed-By", signedby)
- except AttributeError:
- pass
repo["file"] = source.file
- repo_comps = getattr(source, "comps", [])
- repo_dists = source.dist.split(" ")
- repo["comps"] = repo_comps
+ repo["comps"] = getattr(source, "comps", [])
repo["disabled"] = source.disabled
repo["enabled"] = not repo[
"disabled"
] # This is for compatibility with the other modules
- repo["dist"] = repo_dists.pop(0)
+ repo["dist"] = source.dist
+ repo["suites"] = list(source.suites)
repo["type"] = source.type
repo["uri"] = source.uri
- if "Types: " in source.line and "\n" in source.line:
- repo["line"] = (
- f"{source.type} {source.uri} {repo['dist']} {' '.join(repo_comps)}"
- )
- else:
- repo["line"] = source.line.strip()
+ repo["line"] = source.line.strip()
repo["architectures"] = getattr(source, "architectures", [])
- repo["signedby"] = signedby
+ repo["signedby"] = source.signedby
repos.setdefault(source.uri, []).append(repo)
- if len(repo_dists):
- for dist in repo_dists:
- repo_copy = repo.copy()
- repo_copy["dist"] = dist
- if "Types: " in source.line and "\n" in source.line:
- repo_copy["line"] = (
- f"{source.type} {source.uri} {repo_copy['dist']} {' '.join(repo_comps)}"
- )
- repos[source.uri].append(repo_copy)
return repos
@@ -1995,39 +1700,19 @@ def get_repo(repo, **kwargs):
auth_info = "{}@".format(ppa_auth)
repo = LP_PVT_SRC_FORMAT.format(auth_info, owner_name, ppa_name, dist)
else:
- if HAS_SOFTWAREPROPERTIES:
- try:
- if hasattr(softwareproperties.ppa, "PPAShortcutHandler"):
- repo = softwareproperties.ppa.PPAShortcutHandler(repo).expand(
- dist
- )[0]
- else:
- repo = softwareproperties.ppa.expand_ppa_line(repo, dist)[0]
- except NameError as name_error:
- raise CommandExecutionError(
- "Could not find ppa {}: {}".format(repo, name_error)
- )
- else:
- repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist)
+ repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist)
repos = list_repos()
if repos:
try:
- (
- repo_type,
- repo_architectures,
- repo_uri,
- repo_dist,
- repo_comps,
- repo_signedby,
- ) = _split_repo_str(repo)
+ repo_entry = _split_repo_str(repo)
if ppa_auth:
- uri_match = re.search("(http[s]?://)(.+)", repo_uri)
+ uri_match = re.search("(http[s]?://)(.+)", repo_entry["uri"])
if uri_match:
if not uri_match.group(2).startswith(ppa_auth):
- repo_uri = "{}{}@{}".format(
- uri_match.group(1), ppa_auth, uri_match.group(2)
+ repo_entry["uri"] = (
+ f"{uri_match.group(1)}{ppa_auth}@{uri_match.group(2)}"
)
except SyntaxError:
raise CommandExecutionError(
@@ -2037,13 +1722,13 @@ def get_repo(repo, **kwargs):
for source in repos.values():
for sub in source:
if (
- sub["type"] == repo_type
- and sub["uri"] == repo_uri
- and sub["dist"] == repo_dist
+ sub["type"] == repo_entry["type"]
+ and sub["uri"].rstrip("/") == repo_entry["uri"].rstrip("/")
+ and sub["dist"] == repo_entry["dist"]
):
- if not repo_comps:
+ if not repo_entry["comps"]:
return sub
- for comp in repo_comps:
+ for comp in repo_entry["comps"]:
if comp in sub.get("comps", []):
return sub
return {}
@@ -2078,36 +1763,19 @@ def del_repo(repo, **kwargs):
# to derive the name.
is_ppa = True
dist = __grains__["oscodename"]
- if not HAS_SOFTWAREPROPERTIES:
- _warn_software_properties(repo)
- owner_name, ppa_name = repo[4:].split("/")
- if "ppa_auth" in kwargs:
- auth_info = "{}@".format(kwargs["ppa_auth"])
- repo = LP_PVT_SRC_FORMAT.format(auth_info, dist, owner_name, ppa_name)
- else:
- repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist)
+ owner_name, ppa_name = repo[4:].split("/")
+ if "ppa_auth" in kwargs:
+ auth_info = f"{kwargs['ppa_auth']}@"
+ repo = LP_PVT_SRC_FORMAT.format(auth_info, dist, owner_name, ppa_name)
else:
- if hasattr(softwareproperties.ppa, "PPAShortcutHandler"):
- repo = softwareproperties.ppa.PPAShortcutHandler(repo).expand(dist)[0]
- else:
- repo = softwareproperties.ppa.expand_ppa_line(repo, dist)[0]
+ repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist)
- if HAS_DEB822:
- sources = SourcesList(deb822=True)
- else:
- sources = SourcesList()
+ sources = SourcesList()
repos = [s for s in sources.list if not s.invalid]
if repos:
deleted_from = dict()
try:
- (
- repo_type,
- repo_architectures,
- repo_uri,
- repo_dist,
- repo_comps,
- repo_signedby,
- ) = _split_repo_str(repo)
+ repo_entry = _split_repo_str(repo)
except SyntaxError:
raise SaltInvocationError(
"Error: repo '{}' not a well formatted definition".format(repo)
@@ -2115,17 +1783,26 @@ def del_repo(repo, **kwargs):
for source in repos:
if (
- source.type == repo_type
- and source.architectures == repo_architectures
- and source.uri == repo_uri
- and repo_dist in source.dist
+ repo_entry["type"] in source.type.split()
+ and source.architectures == repo_entry["architectures"]
+ and source.uri.rstrip("/") == repo_entry["uri"].rstrip("/")
+ and repo_entry["dist"] in source.suites
):
-
s_comps = set(source.comps)
- r_comps = set(repo_comps)
- if s_comps.intersection(r_comps) or (
- s_comps == set() and r_comps == set()
- ):
+ r_comps = set(repo_entry["comps"])
+ if s_comps == r_comps:
+ r_suites = list(source.suites)
+ r_suites.remove(repo_entry["dist"])
+ source.suites = r_suites
+ deleted_from[source.file] = 0
+ if not source.suites:
+ try:
+ sources.remove(source)
+ except ValueError:
+ pass
+ sources.save()
+ continue
+ if s_comps.intersection(r_comps) or (not s_comps and not r_comps):
deleted_from[source.file] = 0
source.comps = list(s_comps.difference(r_comps))
if not source.comps:
@@ -2138,15 +1815,27 @@ def del_repo(repo, **kwargs):
# measure
if (
is_ppa
- and repo_type == "deb"
+ and repo_entry["type"] == "deb"
and source.type == "deb-src"
- and source.uri == repo_uri
- and source.dist == repo_dist
+ and source.uri == repo_entry["uri"]
+ and repo_entry["dist"] in source.suites
):
s_comps = set(source.comps)
- r_comps = set(repo_comps)
- if s_comps.intersection(r_comps):
+ r_comps = set(repo_entry["comps"])
+ if s_comps == r_comps:
+ r_suites = list(source.suites)
+ r_suites.remove(repo_entry["dist"])
+ source.suites = r_suites
+ deleted_from[source.file] = 0
+ if not source.suites:
+ try:
+ sources.remove(source)
+ except ValueError:
+ pass
+ sources.save()
+ continue
+ if s_comps.intersection(r_comps) or (not s_comps and not r_comps):
deleted_from[source.file] = 0
source.comps = list(s_comps.difference(r_comps))
if not source.comps:
@@ -2158,6 +1847,8 @@ def del_repo(repo, **kwargs):
if deleted_from:
ret = ""
for source in sources:
+ if source.invalid:
+ continue
if source.file in deleted_from:
deleted_from[source.file] += 1
for repo_file, count in deleted_from.items():
@@ -2445,9 +2136,7 @@ def add_repo_key(
kwargs.update({"stdin": text})
elif keyserver:
if not keyid:
- error_msg = "No keyid or keyid too short for keyserver: {}".format(
- keyserver
- )
+ error_msg = f"No keyid or keyid too short for keyserver: {keyserver}"
raise SaltInvocationError(error_msg)
if not aptkey:
@@ -2698,30 +2387,20 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
out = _call_apt(cmd, env=env, scope=False, **kwargs)
if out["retcode"]:
raise CommandExecutionError(
- "Unable to add PPA '{}'. '{}' exited with "
- "status {!s}: '{}' ".format(
- repo[4:], cmd, out["retcode"], out["stderr"]
- )
+ f"Unable to add PPA '{repo[4:]}'. '{cmd}' exited with status {out['retcode']!s}: '{out['stderr']}'"
)
# explicit refresh when a repo is modified.
if refresh:
refresh_db()
return {repo: out}
else:
- if not HAS_SOFTWAREPROPERTIES:
- _warn_software_properties(repo)
- else:
- log.info("Falling back to urllib method for private PPA")
# fall back to urllib style
try:
owner_name, ppa_name = repo[4:].split("/", 1)
except ValueError:
raise CommandExecutionError(
- "Unable to get PPA info from argument. "
- 'Expected format "<PPA_OWNER>/<PPA_NAME>" '
- "(e.g. saltstack/salt) not found. Received "
- "'{}' instead.".format(repo[4:])
+ f"Unable to get PPA info from argument. Expected format \"<PPA_OWNER>/<PPA_NAME>\" (e.g. saltstack/salt) not found. Received '{repo[4:]}' instead."
)
dist = __grains__["oscodename"]
# ppa has a lot of implicit arguments. Make them explicit.
@@ -2729,8 +2408,10 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
kwargs["dist"] = dist
ppa_auth = ""
if "file" not in kwargs:
- filename = "/etc/apt/sources.list.d/{0}-{1}-{2}.list"
- kwargs["file"] = filename.format(owner_name, ppa_name, dist)
+ filename = (
+ f"/etc/apt/sources.list.d/{owner_name}-{ppa_name}-{dist}.list"
+ )
+ kwargs["file"] = filename
try:
launchpad_ppa_info = _get_ppa_info_from_launchpad(
owner_name, ppa_name
@@ -2739,23 +2420,16 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
kwargs["keyid"] = launchpad_ppa_info["signing_key_fingerprint"]
else:
if "keyid" not in kwargs:
- error_str = (
- "Private PPAs require a keyid to be specified: {0}/{1}"
- )
raise CommandExecutionError(
- error_str.format(owner_name, ppa_name)
+ f"Private PPAs require a keyid to be specified: {owner_name}/{ppa_name}"
)
except HTTPError as exc:
raise CommandExecutionError(
- "Launchpad does not know about {}/{}: {}".format(
- owner_name, ppa_name, exc
- )
+ f"Launchpad does not know about {owner_name}/{ppa_name}: {exc}"
)
except IndexError as exc:
raise CommandExecutionError(
- "Launchpad knows about {}/{} but did not "
- "return a fingerprint. Please set keyid "
- "manually: {}".format(owner_name, ppa_name, exc)
+ f"Launchpad knows about {owner_name}/{ppa_name} but did not return a fingerprint. Please set keyid manually: {exc}"
)
if "keyserver" not in kwargs:
@@ -2763,15 +2437,13 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
if "ppa_auth" in kwargs:
if not launchpad_ppa_info["private"]:
raise CommandExecutionError(
- "PPA is not private but auth credentials passed: {}".format(
- repo
- )
+ f"PPA is not private but auth credentials passed: {repo}"
)
# assign the new repo format to the "repo" variable
# so we can fall through to the "normal" mechanism
# here.
if "ppa_auth" in kwargs:
- ppa_auth = "{}@".format(kwargs["ppa_auth"])
+ ppa_auth = f"{kwargs['ppa_auth']}@"
repo = LP_PVT_SRC_FORMAT.format(
ppa_auth, owner_name, ppa_name, dist
)
@@ -2782,10 +2454,7 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
'cannot parse "ppa:" style repo definitions: {}'.format(repo)
)
- if HAS_DEB822:
- sources = SourcesList(deb822=True)
- else:
- sources = SourcesList()
+ sources = SourcesList()
if kwargs.get("consolidate", False):
# attempt to de-dup and consolidate all sources
# down to entries in sources.list
@@ -2802,40 +2471,36 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
repos = []
for source in sources:
- if HAS_APT and not HAS_DEB822:
- _, invalid, _, _ = _invalid(source.line)
- if not invalid:
- repos.append(source)
+ if isinstance(source, Deb822SourceEntry):
+ if source.types == [""] or not bool(source.types) or not source.type:
+ continue
else:
- if HAS_DEB822 and (
- source.types == [""] or not bool(source.types) or not source.type
- ):
- # most probably invalid or comment line
+ _, invalid, _, _ = _invalid(source.line)
+ if invalid:
continue
- repos.append(source)
+ repos.append(source)
mod_source = None
try:
- (
- repo_type,
- repo_architectures,
- repo_uri,
- repo_dist,
- repo_comps,
- repo_signedby,
- ) = _split_repo_str(repo)
+ repo_entry = _split_repo_str(repo)
except SyntaxError:
raise SyntaxError(
"Error: repo '{}' not a well formatted definition".format(repo)
)
- full_comp_list = {comp.strip() for comp in repo_comps}
+ full_comp_list = [comp.strip() for comp in repo_entry["comps"]]
no_proxy = __salt__["config.option"]("no_proxy")
- kwargs["signedby"] = pathlib.Path(repo_signedby) if repo_signedby else ""
+ kwargs["signedby"] = (
+ pathlib.Path(repo_entry["signedby"]) if repo_entry["signedby"] else ""
+ )
- if not aptkey and not kwargs["signedby"]:
- raise SaltInvocationError("missing 'signedby' option when apt-key is missing")
+ if "trusted" not in kwargs:
+ kwargs["trusted"] = (
+ repo_entry["trusted"]
+ if string_to_bool_int(repo_entry.get("trusted", "no")) == 1
+ else "no"
+ )
if "keyid" in kwargs:
keyid = kwargs.pop("keyid", None)
@@ -2902,9 +2567,7 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
ret = _call_apt(cmd, scope=False, **kwargs)
if ret["retcode"] != 0:
raise CommandExecutionError(
- "Error: key retrieval failed: {}".format(
- ret["stdout"]
- )
+ f"Error: key retrieval failed: {ret['stdout']}"
)
elif "key_url" in kwargs:
@@ -2948,14 +2611,16 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
if "comps" in kwargs:
kwargs["comps"] = [comp.strip() for comp in kwargs["comps"].split(",")]
- full_comp_list |= set(kwargs["comps"])
+ for comp in kwargs["comps"]:
+ if comp not in full_comp_list:
+ full_comp_list.append(comp)
else:
kwargs["comps"] = list(full_comp_list)
if "architectures" in kwargs:
kwargs["architectures"] = kwargs["architectures"].split(",")
else:
- kwargs["architectures"] = repo_architectures
+ kwargs["architectures"] = repo_entry["architectures"]
if "disabled" in kwargs:
kwargs["disabled"] = salt.utils.data.is_true(kwargs["disabled"])
@@ -2970,13 +2635,12 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
# and the resulting source line. The idea here is to ensure
# we are not returning bogus data because the source line
# has already been modified on a previous run.
- apt_source_dists = apt_source.dist.split(" ")
repo_matches = (
- apt_source.type == repo_type
- and apt_source.uri.rstrip("/") == repo_uri.rstrip("/")
- and repo_dist in apt_source_dists
+ repo_entry["type"] in apt_source.type.split()
+ and apt_source.uri.rstrip("/") == repo_entry["uri"].rstrip("/")
+ and repo_entry["dist"] in apt_source.suites
)
- kw_matches = apt_source.dist == kw_dist and apt_source.type == kw_type
+ kw_matches = kw_dist in apt_source.suites and kw_type in apt_source.type.split()
if repo_matches or kw_matches:
for comp in full_comp_list:
@@ -2993,16 +2657,26 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
kwargs["comments"] = salt.utils.pkg.deb.combine_comments(kwargs["comments"])
if not mod_source:
+ if not aptkey and not (
+ kwargs["signedby"] or string_to_bool_int(kwargs.get("trusted", "no")) == 1
+ ):
+ raise SaltInvocationError(
+ "missing 'signedby' or 'trusted' option when apt-key is missing"
+ )
+
apt_source_file = kwargs.get("file")
if not apt_source_file:
- raise SaltInvocationError("missing 'file' argument when defining a new repository")
- if HAS_DEB822 and not apt_source_file.endswith(".list"):
- section = _deb822.Section("")
- section["Types"] = repo_type
- section["URIs"] = repo_uri
- section["Suites"] = repo_dist
- section["Components"] = " ".join(repo_comps)
- if kwargs.get("trusted") == True or kwargs.get("Trusted") == True:
+ raise SaltInvocationError(
+ "missing 'file' argument when defining a new repository"
+ )
+
+ if not apt_source_file.endswith(".list"):
+ section = Deb822Section("")
+ section["Types"] = repo_entry["type"]
+ section["URIs"] = repo_entry["uri"]
+ section["Suites"] = repo_entry["dist"]
+ section["Components"] = " ".join(repo_entry["comps"])
+ if kwargs.get("trusted") is True or kwargs.get("Trusted") is True:
section["Trusted"] = "yes"
mod_source = Deb822SourceEntry(section, apt_source_file)
else:
@@ -3013,48 +2687,38 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs):
elif "comments" in kwargs:
mod_source.comment = kwargs["comments"]
- if HAS_APT:
- # workaround until python3-apt supports signedby
- if str(mod_source) != str(SourceEntry(repo)) and "signed-by" in str(mod_source):
- rline = SourceEntry(repo)
- mod_source.line = rline.line
-
if not mod_source.line.endswith("\n"):
mod_source.line = mod_source.line + "\n"
+ if not kwargs["architectures"] and not mod_source.architectures:
+ kwargs.pop("architectures")
+
for key in kwargs:
if key in _MODIFY_OK and hasattr(mod_source, key):
setattr(mod_source, key, kwargs[key])
- if mod_source.uri != repo_uri:
- mod_source.uri = repo_uri
- if not HAS_DEB822:
- mod_source.line = mod_source.str()
+ if mod_source.uri != repo_entry["uri"]:
+ mod_source.uri = repo_entry["uri"]
+ mod_source.line = str(mod_source)
sources.save()
# on changes, explicitly refresh
if refresh:
refresh_db()
- if not HAS_APT:
- signedby = mod_source.signedby
- else:
- signedby = _get_opts(repo)["signedby"].get("value", "")
-
- repo_source_line = mod_source.line
- if "Types: " in repo_source_line and "\n" in repo_source_line:
- repo_source_line = f"{mod_source.type} {mod_source.uri} {repo_dist} {' '.join(mod_source.comps)}"
+ signedby = mod_source.signedby
return {
repo: {
"architectures": getattr(mod_source, "architectures", []),
"dist": mod_source.dist,
+ "suites": mod_source.suites,
"comps": mod_source.comps,
"disabled": mod_source.disabled,
"file": mod_source.file,
"type": mod_source.type,
"uri": mod_source.uri,
- "line": repo_source_line,
+ "line": mod_source.line,
"signedby": signedby,
}
}
@@ -3115,19 +2779,12 @@ def _expand_repo_def(os_name, os_codename=None, **kwargs):
auth_info = "{}@".format(kwargs["ppa_auth"])
repo = LP_PVT_SRC_FORMAT.format(auth_info, owner_name, ppa_name, dist)
else:
- if HAS_SOFTWAREPROPERTIES:
- if hasattr(softwareproperties.ppa, "PPAShortcutHandler"):
- repo = softwareproperties.ppa.PPAShortcutHandler(repo).expand(dist)[
- 0
- ]
- else:
- repo = softwareproperties.ppa.expand_ppa_line(repo, dist)[0]
- else:
- repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist)
+ repo = LP_SRC_FORMAT.format(owner_name, ppa_name, dist)
if "file" not in kwargs:
- filename = "/etc/apt/sources.list.d/{0}-{1}-{2}.list"
- kwargs["file"] = filename.format(owner_name, ppa_name, dist)
+ kwargs["file"] = (
+ f"/etc/apt/sources.list.d/{owner_name}-{ppa_name}-{dist}.list"
+ )
source_entry = SourceEntry(repo)
for list_args in ("architectures", "comps"):
@@ -3139,16 +2796,10 @@ def _expand_repo_def(os_name, os_codename=None, **kwargs):
if kwarg in kwargs:
setattr(source_entry, kwarg, kwargs[kwarg])
- if HAS_DEB822:
- source_list = SourcesList(deb822=True)
- else:
- source_list = SourcesList()
+ source_list = SourcesList()
kwargs = {}
- if not HAS_APT:
- signedby = source_entry.signedby
- kwargs["signedby"] = signedby
- else:
- signedby = _get_opts(repo)["signedby"].get("value", "")
+ signedby = source_entry.signedby
+ kwargs["signedby"] = signedby
_source_entry = source_list.add(
type=source_entry.type,
@@ -3173,28 +2824,6 @@ def _expand_repo_def(os_name, os_codename=None, **kwargs):
sanitized["line"] = getattr(_source_entry, "line", "").strip()
sanitized["architectures"] = getattr(_source_entry, "architectures", [])
sanitized["signedby"] = signedby
- if HAS_APT and signedby:
- # python3-apt does not supported the signed-by opt currently.
- # creating the line with all opts including signed-by
- if signedby not in sanitized["line"]:
- line = sanitized["line"].split()
- repo_opts = _get_opts(repo)
- opts_order = [
- opt_type
- for opt_type, opt_def in repo_opts.items()
- if opt_def["full"] != ""
- ]
- for opt in repo_opts:
- if "index" in repo_opts[opt]:
- idx = repo_opts[opt]["index"]
- opts_order[idx] = repo_opts[opt]["full"]
-
- opts = "[" + " ".join(opts_order) + "]"
- if line[1].startswith("["):
- line[1] = opts
- else:
- line.insert(1, opts)
- sanitized["line"] = " ".join(line)
return sanitized
@@ -3363,9 +2992,7 @@ def set_selections(path=None, selection=None, clear=False, saltenv="base"):
valid_states = ("install", "hold", "deinstall", "purge")
bad_states = [x for x in selection if x not in valid_states]
if bad_states:
- raise SaltInvocationError(
- "Invalid state(s): {}".format(", ".join(bad_states))
- )
+ raise SaltInvocationError(f"Invalid state(s): {', '.join(bad_states)}")
if clear:
cmd = ["dpkg", "--clear-selections"]
@@ -3688,3 +3315,31 @@ def services_need_restart(**kwargs):
services.add(service)
return list(services)
+
+
+def which(path):
+ """
+ Displays which package installed a specific file
+
+ CLI Examples:
+
+ .. code-block:: bash
+
+ salt * pkg.which <file name>
+ """
+ filepath = pathlib.Path(path)
+ cmd = ["dpkg"]
+ if filepath.is_absolute():
+ if filepath.exists():
+ cmd.extend(["-S", str(filepath)])
+ else:
+ log.debug("%s does not exist", filepath)
+ return False
+ else:
+ log.debug("%s is not absolute path", filepath)
+ return False
+ cmd_ret = _call_apt(cmd)
+ if "no path found matching pattern" in cmd_ret["stdout"]:
+ return None
+ pkg = cmd_ret["stdout"].split(":")[0]
+ return pkg
diff --git a/salt/states/pkgrepo.py b/salt/states/pkgrepo.py
index 4ef5fd9c2fa..b17c1bea397 100644
--- a/salt/states/pkgrepo.py
+++ b/salt/states/pkgrepo.py
@@ -624,6 +624,12 @@ def absent(name, **kwargs):
The name of the package repo, as it would be referred to when running
the regular package manager commands.
+ .. note::
+ On apt-based systems this must be the complete source entry. For
+ example, if you include ``[arch=amd64]``, and a repo matching the
+ specified URI, dist, etc. exists _without_ an architecture, then no
+ changes will be made and the state will report a ``True`` result.
+
**FEDORA/REDHAT-SPECIFIC OPTIONS**
copr
@@ -706,6 +712,23 @@ def absent(name, **kwargs):
ret["comment"] = f"Failed to configure repo '{name}': {exc}"
return ret
+ if repo and (
+ __grains__["os_family"].lower() == "debian"
+ or __opts__.get("providers", {}).get("pkg") == "aptpkg"
+ ):
+ # On Debian/Ubuntu, pkg.get_repo will return a match for the repo
+ # even if the architectures do not match. However, changing get_repo
+ # breaks idempotency for pkgrepo.managed states. So, compare the
+ # architectures of the matched repo to the architectures specified in
+ # the repo string passed to this state. If the architectures do not
+ # match, then invalidate the match by setting repo to an empty dict.
+ import salt.modules.aptpkg
+
+ if set(salt.modules.aptpkg._split_repo_str(stripname)["architectures"]) != set(
+ repo["architectures"]
+ ):
+ repo = {}
+
if not repo:
ret["comment"] = f"Package repo {name} is absent"
ret["result"] = True
diff --git a/salt/utils/pkg/deb.py b/salt/utils/pkg/deb.py
index c8bfa8ae20e..f35fed3d1d6 100644
--- a/salt/utils/pkg/deb.py
+++ b/salt/utils/pkg/deb.py
@@ -2,6 +2,818 @@
Common functions for working with deb packages
"""
+import logging
+import os
+import re
+from collections import OrderedDict
+
+import salt.utils.files
+
+log = logging.getLogger(__name__)
+
+
+_APT_SOURCES_LIST = "/etc/apt/sources.list"
+_APT_SOURCES_PARTSDIR = "/etc/apt/sources.list.d/"
+
+
+def string_to_bool_int(s):
+ """
+ Convert string representation of bool values to integer
+ """
+ if isinstance(s, bool):
+ s = "yes" if s else "no"
+ s = s.lower()
+ if s in ("no", "false", "without", "off", "disable"):
+ return 0
+ elif s in ("yes", "true", "with", "on", "enable"):
+ return 1
+ return -1
+
+
+class Deb822Section:
+ """
+ A deb822 section representation of single entry,
+ which could contain comments.
+ """
+
+ def __init__(self, section):
+ """
+ Init new deb822 section object
+ """
+ if isinstance(section, Deb822Section):
+ self.tags = OrderedDict(section.tags)
+ self.header = section.header
+ self.footer = section.footer
+ else:
+ self.tags, self.header, self.footer = self._parse_section_string(section)
+ self._tag_map = {k.lower(): k for k in self.tags}
+
+ @staticmethod
+ def _parse_section_string(section_string):
+ """
+ Parse section string to comments and tags
+ """
+ header = []
+ footer = []
+ raw_data = []
+ tag_re = re.compile(r"\A(\S+):\s*(\S.*|)")
+ tags = OrderedDict()
+
+ for line in section_string.splitlines():
+ if line.startswith("#"):
+ if raw_data:
+ footer.append(line)
+ else:
+ header.append(line)
+ else:
+ raw_data.append(line)
+
+ tag = None
+ value = None
+ for line in raw_data:
+ match = tag_re.match(line)
+ if match:
+ if tag is not None:
+ # Store previous found tag,
+ # as the values could contain multiple lines
+ tags[tag] = value.strip()
+ tag = match.group(1)
+ value = match.group(2)
+ elif line == "" and tag is not None:
+ tags[tag] = value.strip()
+ else:
+ value = f"{value}\n{line}"
+ if tag is not None:
+ tags[tag] = value.strip()
+
+ return tags, header, footer
+
+ def __getitem__(self, key):
+ """
+ Get the value of a tag
+ """
+ return self.tags[self._tag_map.get(key.lower(), key)]
+
+ def __delitem__(self, key):
+ """
+ Delete the tag
+ """
+ _lc_key = key.lower()
+ del self.tags[self._tag_map.get(_lc_key, key)]
+ del self._tag_map[_lc_key]
+
+ def __setitem__(self, key, val):
+ """
+ Set the value of the tag
+ """
+ _lc_key = key.lower()
+ if _lc_key not in self._tag_map:
+ self._tag_map[_lc_key] = key
+ self.tags[key] = val
+
+ def __bool__(self):
+ """
+ Represent as True if the section has any tag
+ """
+ return bool(self.tags)
+
+ def get(self, key, default=None):
+ """
+ Get the value of a tag or return default
+ """
+ try:
+ return self[key]
+ except KeyError:
+ return default
+
+ def __str__(self):
+ """
+ Return the string representation of the section
+ """
+ return (
+ "\n".join(self.header)
+ + ("\n" if self.header else "")
+ + "".join(f"{k}: {v}\n" for k, v in self.tags.items())
+ + "\n".join(self.footer)
+ + ("\n" if self.footer else "")
+ )
+
+
+class Deb822SourceEntry:
+ """
+ Source entry in deb822 format
+ """
+
+ _properties = {
+ "architectures": {"key": "Architectures", "multi": True},
+ "types": {"key": "Types", "multi": True},
+ "type": {"key": "Types", "multi": False, "deprecated": True},
+ "uris": {"key": "URIs", "multi": True},
+ "uri": {"key": "URIs", "multi": False, "deprecated": True},
+ "suites": {"key": "Suites", "multi": True},
+ "dist": {"key": "Suites", "multi": False, "deprecated": True},
+ "comps": {"key": "Components", "multi": True},
+ "signedby": {"key": "Signed-By", "multi": False},
+ }
+
+ def __init__(
+ self,
+ section,
+ file,
+ list=None,
+ ):
+ if section is None:
+ self.section = Deb822Section("")
+ elif isinstance(section, str):
+ self.section = Deb822Section(section)
+ else:
+ self.section = section
+
+ self._line = str(self.section)
+ self.file = file
+
+ def __getattr__(self, name):
+ """
+ Get the values to the section for specified keys
+ """
+ if name in self._properties:
+ if self._properties[name]["multi"]:
+ return SourceEntry.split_source_line(
+ self.section.get(self._properties[name]["key"], "")
+ )
+ else:
+ return self.section.get(self._properties[name]["key"], None)
+
+ def __setattr__(self, name, value):
+ """
+ Pass the values to the section for specified keys
+ """
+ if name not in self._properties:
+ return super().__setattr__(name, value)
+
+ key = self._properties[name]["key"]
+ if value is None:
+ del self.section[key]
+ else:
+ if key == "signedby":
+ value = str(value)
+ self.section[key] = (
+ " ".join(value) if self._properties[name]["multi"] else value
+ )
+
+ def __eq__(self, other):
+ """
+ Equal operator for deb822 source entries
+ """
+ return (
+ self.disabled == other.disabled
+ and self.type == other.type
+ and self.uri
+ and self.uri.rstrip("/") == other.uri.rstrip("/")
+ and self.dist == other.dist
+ and self.comps == other.comps
+ )
+
+ @property
+ def comment(self):
+ """
+ Returns the header of the section
+ """
+ return "\n".join(self.section.header)
+
+ @comment.setter
+ def comment(self, comment):
+ """
+ Sets the header of the section
+ """
+ comments = comment.splitlines()
+ if not all(x.startswith("#") for x in comments):
+ comments = [f"#{x}" for x in comments]
+ self.section.header = comments
+
+ @property
+ def trusted(self):
+ """
+ Return the value of the Trusted field
+ """
+ try:
+ return string_to_bool_int(self.section["Trusted"])
+ except KeyError:
+ return None
+
+ @trusted.setter
+ def trusted(self, value):
+ if value is None:
+ try:
+ del self.section["Trusted"]
+ except KeyError:
+ pass
+ else:
+ self.section["Trusted"] = "yes" if value else "no"
+
+ @property
+ def disabled(self):
+ """
+ Return True if the source is enabled
+ """
+ return not string_to_bool_int(self.section.get("Enabled", "yes"))
+
+ @disabled.setter
+ def disabled(self, value):
+ if value:
+ self.section["Enabled"] = "no"
+ else:
+ try:
+ del self.section["Enabled"]
+ except KeyError:
+ pass
+
+ @property
+ def invalid(self):
+ """
+ Return True if the source doesn't have proper attributes
+ """
+ return not self.section
+
+ @property
+ def line(self):
+ """
+ Return the original string representation of the source entry
+ """
+ return self._line
+
+ def __str__(self):
+ """
+ Return the string representation of the entry
+ """
+ return str(self.section).strip()
+
+ def set_enabled(self, enabled):
+ """
+ Opposite to .disabled
+ """
+ self.disabled = not enabled
+
+
+class SourceEntry:
+ """
+ Distinct sources.list entry
+ """
+
+ def __init__(self, line, file=None):
+ self.invalid = False
+ self.disabled = False # identified as disabled if commented
+ self.type = "" # type of the source (deb, deb-src)
+ self.architectures = []
+ self._signedby = ""
+ self.trusted = None
+ self.uri = ""
+ self.dist = "" # distribution name
+ self.comps = [] # list of available componetns (or empty)
+ self.comment = "" # comment (optional)
+ self.line = line # the original sources.list entry
+ if file is None:
+ file = _APT_SOURCES_LIST
+ if file.endswith(".sources"):
+ raise ValueError("Classic SourceEntry cannot be written to .sources file")
+ self.file = file # the file that the entry is located in
+ self.parse(line)
+ self.children = []
+
+ def __eq__(self, other):
+ """
+ Equal operator for two classic sources.list entries
+ """
+ return (
+ self.disabled == other.disabled
+ and self.type == other.type
+ and self.uri.rstrip("/") == other.uri.rstrip("/")
+ and self.dist == other.dist
+ and self.comps == other.comps
+ )
+
+ @staticmethod
+ def split_source_line(line):
+ """
+ Splits the entries of sources.list format
+ """
+ line = line.strip()
+ pieces = []
+ tmp = ""
+ # we are inside a [..] block
+ p_found = False
+ space_found = False
+ for c in line:
+ if c == "[":
+ if space_found:
+ space_found = False
+ p_found = True
+ pieces.append(tmp)
+ tmp = c
+ else:
+ p_found = True
+ tmp += c
+ elif c == "]":
+ p_found = False
+ tmp += c
+ elif space_found and not c.isspace():
+ # we skip one or more space
+ space_found = False
+ pieces.append(tmp)
+ tmp = c
+ elif c.isspace() and not p_found:
+ # found a whitespace
+ space_found = True
+ else:
+ tmp += c
+ # append last piece
+ if len(tmp) > 0:
+ pieces.append(tmp)
+ return pieces
+
+ def parse(self, line):
+ """
+ Parse lines from sources files
+ """
+ self.disabled, self.invalid, self.comment, repo_line = _invalid(line)
+ if self.invalid:
+ return False
+ if repo_line[1].startswith("["):
+ repo_line = [x for x in (line.strip("[]") for line in repo_line) if x]
+ opts = _get_opts(self.line)
+ if "arch" in opts:
+ self.architectures.extend(opts["arch"]["value"])
+ if "signedby" in opts:
+ self.signedby = opts["signedby"]["value"]
+ if "trusted" in opts:
+ self.trusted = opts["trusted"]["value"]
+ for opt in opts.values():
+ opt = opt["full"]
+ if opt:
+ try:
+ repo_line.pop(repo_line.index(opt))
+ except ValueError:
+ repo_line.pop(repo_line.index(f"[{opt}]"))
+ self.type = repo_line[0]
+ self.uri = repo_line[1]
+ self.dist = repo_line[2]
+ self.comps = repo_line[3:]
+ return True
+
+ def __str__(self):
+ """
+ Return string representation
+ """
+ return self.repo_line().strip()
+
+ def repo_line(self):
+ """
+ Return the line of the entry for the sources file
+ """
+ repo_line = []
+ if self.invalid:
+ return self.line
+ if self.disabled:
+ repo_line.append("#")
+
+ repo_line.append(self.type)
+ opts = _get_opts(self.line)
+ if self.architectures:
+ if "arch" not in opts:
+ opts["arch"] = {}
+ opts["arch"]["full"] = f"arch={','.join(self.architectures)}"
+ opts["arch"]["value"] = self.architectures
+ if self.signedby:
+ if "signedby" not in opts:
+ opts["signedby"] = {}
+ opts["signedby"]["full"] = f"signed-by={self.signedby}"
+ opts["signedby"]["value"] = self.signedby
+ if self.trusted:
+ if "trusted" not in opts:
+ opts["trusted"] = {}
+ opts["trusted"]["value"] = (
+ "yes" if string_to_bool_int(self.trusted) == 1 else "no"
+ )
+ opts["trusted"]["full"] = f"trusted={opts['trusted']['value']}"
+ if "trusted" in opts and opts["trusted"]["value"] == "no":
+ del opts["trusted"]
+
+ ordered_opts = []
+ for opt in opts.values():
+ if opt["full"] != "":
+ ordered_opts.append(opt["full"])
+
+ if ordered_opts:
+ repo_line.append(f"[{' '.join(ordered_opts)}]")
+
+ repo_line += [self.uri, self.dist, " ".join(self.comps)]
+ if self.comment:
+ repo_line.append(f"#{self.comment}")
+ return " ".join(repo_line) + "\n"
+
+ @property
+ def types(self):
+ """
+ Deb822 compatible attribute for the type
+ """
+ return [self.type]
+
+ @property
+ def uris(self):
+ """
+ Deb822 compatible attribute for the uri
+ """
+ return [self.uri]
+
+ @property
+ def suites(self):
+ """
+ Deb822 compatible attribute for the suite
+ """
+ if self.dist:
+ return [self.dist]
+ return []
+
+ @suites.setter
+ def suites(self, suites):
+ """
+ Deb822 compatible setter for the suite
+ """
+ if len(suites) > 1:
+ raise ValueError("Only one suite is possible for non deb822 source entry")
+ if suites:
+ self.dist = str(suites[0])
+ assert self.dist == suites[0]
+ else:
+ self.dist = ""
+ assert self.dist == ""
+
+ @property
+ def signedby(self):
+ """
+ Deb822 compatible attribute for the signedby
+ """
+ return self._signedby
+
+ @signedby.setter
+ def signedby(self, signedby):
+ """
+ Deb822 compatible setter for the signedy
+ """
+ self._signedby = str(signedby)
+
+
+class SourcesList:
+ """
+ Represents the full sources.list + sources.list.d files
+ including deb822 .sources files
+ """
+
+ def __init__(
+ self,
+ ):
+ self.list = [] # the actual SourceEntries Type
+ self.refresh()
+
+ def refresh(self):
+ """update the list of known entries"""
+ self.list = []
+ # read sources.list
+ file = _APT_SOURCES_LIST
+ if os.path.isfile(file):
+ self.load(file)
+ # read sources.list.d
+ partsdir = _APT_SOURCES_PARTSDIR
+ if os.path.isdir(partsdir):
+ for file in os.listdir(partsdir):
+ if file.endswith(".sources") or file.endswith(".list"):
+ self.load(os.path.join(partsdir, file))
+
+ def __iter__(self):
+ """
+ Iterate over self.list with SourceEntry elements
+ """
+ yield from self.list
+
+ def __find(self, *predicates, **attrs):
+ uri = attrs.pop("uri", None)
+ for source in self.list:
+ if uri and source.uri and uri.rstrip("/") != source.uri.rstrip("/"):
+ continue
+ if all(getattr(source, key) == attrs[key] for key in attrs) and all(
+ predicate(source) for predicate in predicates
+ ):
+ yield source
+
+ def add(
+ self,
+ type,
+ uri,
+ dist,
+ orig_comps,
+ comment="",
+ pos=-1,
+ file=None,
+ architectures=None,
+ signedby="",
+ parent=None,
+ ):
+ """
+ Add a new source to the sources.list.
+ The method will search for existing matching repos and will try to
+ reuse them as far as possible
+ """
+
+ type = type.strip()
+ disabled = type.startswith("#")
+ if disabled:
+ type = type[1:].lstrip()
+ if architectures is None:
+ architectures = []
+ architectures = set(architectures)
+ # create a working copy of the component list so that
+ # we can modify it later
+ comps = orig_comps[:]
+ sources = self.__find(
+ lambda s: set(s.architectures) == architectures,
+ disabled=disabled,
+ invalid=False,
+ type=type,
+ uri=uri,
+ dist=dist,
+ )
+ # check if we have this source already in the sources.list
+ for source in sources:
+ for new_comp in comps:
+ if new_comp in source.comps:
+ # we have this component already, delete it
+ # from the new_comps list
+ del comps[comps.index(new_comp)]
+ if len(comps) == 0:
+ return source
+
+ sources = self.__find(
+ lambda s: set(s.architectures) == architectures,
+ invalid=False,
+ type=type,
+ uri=uri,
+ dist=dist,
+ )
+ for source in sources:
+ if source.disabled == disabled:
+ # if there is a repo with the same (disabled, type, uri, dist)
+ # just add the components
+ if set(source.comps) != set(comps):
+ source.comps = list(set(source.comps + comps))
+ return source
+ elif source.disabled and not disabled:
+ # enable any matching (type, uri, dist), but disabled repo
+ if set(source.comps) == set(comps):
+ source.disabled = False
+ return source
+
+ new_entry = None
+ if file is None:
+ file = _APT_SOURCES_LIST
+ if file.endswith(".sources"):
+ new_entry = Deb822SourceEntry(None, file=file, list=self)
+ if parent:
+ parent = getattr(parent, "parent", parent)
+ assert isinstance(parent, Deb822SourceEntry)
+ for k in parent.section.tags:
+ new_entry.section[k] = parent.section[k]
+ new_entry.types = [type]
+ new_entry.uris = [uri]
+ new_entry.suites = [dist]
+ new_entry.comps = comps
+ if architectures:
+ new_entry.architectures = list(architectures)
+ new_entry.section.header = comment
+ new_entry.disabled = disabled
+ else:
+ # there isn't any matching source, so create a new line and parse it
+ parts = [
+ "#" if disabled else "",
+ type,
+ ("[arch=%s]" % ",".join(architectures)) if architectures else "",
+ uri,
+ dist,
+ ]
+ parts.extend(comps)
+ if comment:
+ parts.append("#" + comment)
+ line = " ".join(part for part in parts if part) + "\n"
+
+ new_entry = SourceEntry(line)
+ if file is not None:
+ new_entry.file = file
+
+ if pos < 0:
+ self.list.append(new_entry)
+ else:
+ self.list.insert(pos, new_entry)
+ return new_entry
+
+ def remove(self, source_entry):
+ """
+ Remove the entry from the sources.list
+ """
+ self.list.remove(source_entry)
+
+ def load_deb822_sections(self, file_obj):
+ """
+ Return Deb822 sections from .sources file object
+ """
+ sections = []
+ section = ""
+ for line in file_obj:
+ if not line.isspace():
+ # Consider not empty line as a part of a section
+ section += line
+ elif section:
+ # Add a new section on getting first space line
+ sections.append(Deb822Section(section))
+ section = ""
+
+ # Create the last section if we still have data for it
+ if section:
+ sections.append(Deb822Section(section))
+
+ return sections
+
+ def load(self, file_path):
+ """
+ Load the sources from the file
+ """
+ try:
+ with salt.utils.files.fopen(file_path) as f:
+ if file_path.endswith(".sources"):
+ for section in self.load_deb822_sections(f):
+ self.list.append(
+ Deb822SourceEntry(section, file_path, list=self)
+ )
+ else:
+ for line in f:
+ source = SourceEntry(line, file_path)
+ self.list.append(source)
+ except Exception as exc: # pylint: disable=broad-except
+ log.error("Could not parse source file '%s'", file_path, exc_info=True)
+
+ def index(self, entry):
+ return self.list.index(entry)
+
+ def save(self):
+ """save the current sources"""
+ # write an empty default config file if there aren't any sources
+ if len(self.list) == 0:
+ path = _APT_SOURCES_LIST
+ header = (
+ "## See sources.list(5) for more information, especialy\n"
+ "# Remember that you can only use http, ftp or file URIs\n"
+ "# CDROMs are managed through the apt-cdrom tool.\n"
+ )
+
+ try:
+ with salt.utils.files.fopen(path, "w") as f:
+ f.write(header)
+ except FileNotFoundError:
+ # No need to create file if there is no apt directory
+ pass
+ return
+
+ files = {}
+ for source in self.list:
+ if source.file not in files:
+ files[source.file] = []
+ elif isinstance(source, Deb822SourceEntry):
+ files[source.file].append("\n")
+ files[source.file].append(str(source) + "\n")
+ for file in files:
+ with salt.utils.files.fopen(file, "w") as f:
+ f.write("".join(files[file]))
+
+
+def _invalid(line):
+ """
+ This is a workaround since python3-apt does not support
+ the signed-by argument. This function was removed from
+ the class to ensure users using the python3-apt module or
+ not can use the signed-by option.
+ """
+ disabled = False
+ invalid = False
+ comment = ""
+ line = line.strip()
+ if not line:
+ invalid = True
+ return disabled, invalid, comment, ""
+
+ if line.startswith("#"):
+ disabled = True
+ line = line[1:]
+
+ idx = line.find("#")
+ if idx > 0:
+ comment = line[idx + 1 :]
+ line = line[:idx]
+
+ cdrom_match = re.match(r"(.*)(cdrom:.*/)(.*)", line.strip())
+ if cdrom_match:
+ repo_line = (
+ [p.strip() for p in cdrom_match.group(1).split()]
+ + [cdrom_match.group(2).strip()]
+ + [p.strip() for p in cdrom_match.group(3).split()]
+ )
+ else:
+ repo_line = line.strip().split()
+ if (
+ not repo_line
+ or repo_line[0] not in ["deb", "deb-src", "rpm", "rpm-src"]
+ or len(repo_line) < 3
+ ):
+ invalid = True
+ return disabled, invalid, comment, repo_line
+
+ if repo_line[1].startswith("["):
+ if not any(x.endswith("]") for x in repo_line[1:]):
+ invalid = True
+ return disabled, invalid, comment, repo_line
+
+ return disabled, invalid, comment, repo_line
+
+
+def _get_opts(line):
+ """
+ Return all opts in [] for a repo line
+ """
+ get_opts = re.search(r"\[(.*=.*)\]", line)
+ ret = OrderedDict()
+
+ if not get_opts:
+ return ret
+ opts = get_opts.group(0).strip("[]")
+ architectures = []
+ for opt in opts.split():
+ if opt.startswith("arch"):
+ architectures.extend(opt.split("=", 1)[1].split(","))
+ ret["arch"] = {}
+ ret["arch"]["full"] = opt
+ ret["arch"]["value"] = architectures
+ elif opt.startswith("signed-by"):
+ ret["signedby"] = {}
+ ret["signedby"]["full"] = opt
+ ret["signedby"]["value"] = opt.split("=", 1)[1]
+ elif opt.startswith("trusted"):
+ ret["trusted"] = {}
+ ret["trusted"]["full"] = opt
+ ret["trusted"]["value"] = opt.split("=", 1)[1]
+ else:
+ other_opt = opt.split("=", 1)[0]
+ ret[other_opt] = {}
+ ret[other_opt]["full"] = opt
+ ret[other_opt]["value"] = opt.split("=", 1)[1]
+ return ret
+
def combine_comments(comments):
"""
diff --git a/tests/pytests/functional/modules/test_pkg.py b/tests/pytests/functional/modules/test_pkg.py
index c80bb3b0c39..bd9a2093cbd 100644
--- a/tests/pytests/functional/modules/test_pkg.py
+++ b/tests/pytests/functional/modules/test_pkg.py
@@ -219,12 +219,15 @@ def test_owner(modules, grains):
# Similar to pkg.owner, but for FreeBSD's pkgng
@pytest.mark.skip_on_freebsd(reason="test for new package manager for FreeBSD")
@pytest.mark.requires_salt_modules("pkg.which")
-def test_which(modules):
+def test_which(grains, modules):
"""
test finding the package owning a file
"""
- func = "pkg.which"
- ret = modules.pkg.which("/usr/local/bin/salt-call")
+ if grains["os_family"] in ["Debian", "RedHat"]:
+ file = "/bin/mknod"
+ else:
+ file = "/usr/local/bin/salt-call"
+ ret = modules.pkg.which(file)
assert len(ret) != 0
diff --git a/tests/pytests/functional/states/pkgrepo/test_debian.py b/tests/pytests/functional/states/pkgrepo/test_debian.py
index 45afaf25746..1c57d387afd 100644
--- a/tests/pytests/functional/states/pkgrepo/test_debian.py
+++ b/tests/pytests/functional/states/pkgrepo/test_debian.py
@@ -39,12 +39,12 @@ def pkgrepo(states, grains):
@pytest.mark.requires_salt_states("pkgrepo.managed")
-def test_adding_repo_file(pkgrepo, tmp_path):
+def test_adding_repo_file(pkgrepo, repo_uri, tmp_path):
"""
test adding a repo file using pkgrepo.managed
"""
repo_file = str(tmp_path / "stable-binary.list")
- repo_content = "deb http://www.deb-multimedia.org stable main"
+ repo_content = f"deb {repo_uri} stable main"
ret = pkgrepo.managed(name=repo_content, file=repo_file, clean_file=True)
with salt.utils.files.fopen(repo_file, "r") as fp:
file_content = fp.read()
@@ -52,30 +52,24 @@ def test_adding_repo_file(pkgrepo, tmp_path):
@pytest.mark.requires_salt_states("pkgrepo.managed")
-def test_adding_repo_file_arch(pkgrepo, tmp_path, subtests):
+def test_adding_repo_file_arch(pkgrepo, repo_uri, tmp_path, subtests):
"""
test adding a repo file using pkgrepo.managed
and setting architecture
"""
repo_file = str(tmp_path / "stable-binary.list")
- repo_content = "deb [arch=amd64 ] http://www.deb-multimedia.org stable main"
+ repo_content = f"deb [arch=amd64 ] {repo_uri} stable main"
pkgrepo.managed(name=repo_content, file=repo_file, clean_file=True)
with salt.utils.files.fopen(repo_file, "r") as fp:
file_content = fp.read()
- assert (
- file_content.strip()
- == "deb [arch=amd64] http://www.deb-multimedia.org stable main"
- )
+ assert file_content.strip() == f"deb [arch=amd64] {repo_uri} stable main"
with subtests.test("With multiple archs"):
- repo_content = (
- "deb [arch=amd64,i386 ] http://www.deb-multimedia.org stable main"
- )
+ repo_content = f"deb [arch=amd64,i386 ] {repo_uri} stable main"
pkgrepo.managed(name=repo_content, file=repo_file, clean_file=True)
with salt.utils.files.fopen(repo_file, "r") as fp:
file_content = fp.read()
assert (
- file_content.strip()
- == "deb [arch=amd64,i386] http://www.deb-multimedia.org stable main"
+ file_content.strip() == f"deb [arch=amd64,i386] {repo_uri} stable main"
)
@@ -98,7 +92,7 @@ def test_adding_repo_file_cdrom(pkgrepo, tmp_path):
def system_aptsources_ids(value):
- return "{}(aptsources.sourceslist)".format(value.title())
+ return f"{value.title()}(aptsources.sourceslist)"
@pytest.fixture(
@@ -122,7 +116,7 @@ def system_aptsources(request, grains):
raise pytest.skip.Exception(
"This test is meant to run without the system aptsources package, but it's "
"available from '{}'.".format(sourceslist.__file__),
- **exc_kwargs
+ **exc_kwargs,
)
else:
# Run the test
@@ -378,7 +372,7 @@ def test_pkgrepo_with_architectures(pkgrepo, grains, sources_list_file, subtests
)
def _get_arch(arch):
- return "[arch={}] ".format(arch) if arch else ""
+ return f"[arch={arch}] " if arch else ""
def _run(arch=None, test=False):
return pkgrepo.managed(
@@ -498,6 +492,11 @@ def test_pkgrepo_with_architectures(pkgrepo, grains, sources_list_file, subtests
assert ret.result is True
+@pytest.fixture(scope="module")
+def repo_uri(grains):
+ yield "http://www.deb-multimedia.org"
+
+
@pytest.fixture
def trailing_slash_repo_file(grains):
if grains["os_family"] != "Debian":
@@ -517,19 +516,21 @@ def trailing_slash_repo_file(grains):
@pytest.mark.requires_salt_states("pkgrepo.managed", "pkgrepo.absent")
-def test_repo_present_absent_trailing_slash_uri(pkgrepo, trailing_slash_repo_file):
+def test_repo_present_absent_trailing_slash_uri(
+ pkgrepo, repo_uri, trailing_slash_repo_file
+):
"""
test adding a repo with a trailing slash in the uri
"""
# with the trailing slash
- repo_content = "deb http://www.deb-multimedia.org/ stable main"
+ repo_content = f"deb {repo_uri}/ stable main"
# initial creation
ret = pkgrepo.managed(
name=repo_content, file=trailing_slash_repo_file, refresh=False, clean_file=True
)
with salt.utils.files.fopen(trailing_slash_repo_file, "r") as fp:
file_content = fp.read()
- assert file_content.strip() == "deb http://www.deb-multimedia.org/ stable main"
+ assert file_content.strip() == f"deb {repo_uri}/ stable main"
assert ret.changes
# no changes
ret = pkgrepo.managed(
@@ -542,19 +543,21 @@ def test_repo_present_absent_trailing_slash_uri(pkgrepo, trailing_slash_repo_fil
@pytest.mark.requires_salt_states("pkgrepo.managed", "pkgrepo.absent")
-def test_repo_present_absent_no_trailing_slash_uri(pkgrepo, trailing_slash_repo_file):
+def test_repo_present_absent_no_trailing_slash_uri(
+ pkgrepo, repo_uri, trailing_slash_repo_file
+):
"""
test adding a repo with a trailing slash in the uri
"""
# without the trailing slash
- repo_content = "deb http://www.deb-multimedia.org stable main"
+ repo_content = f"deb {repo_uri} stable main"
# initial creation
ret = pkgrepo.managed(
name=repo_content, file=trailing_slash_repo_file, refresh=False, clean_file=True
)
with salt.utils.files.fopen(trailing_slash_repo_file, "r") as fp:
file_content = fp.read()
- assert file_content.strip() == "deb http://www.deb-multimedia.org stable main"
+ assert file_content.strip() == repo_content
assert ret.changes
# no changes
ret = pkgrepo.managed(
@@ -568,35 +571,74 @@ def test_repo_present_absent_no_trailing_slash_uri(pkgrepo, trailing_slash_repo_
@pytest.mark.requires_salt_states("pkgrepo.managed", "pkgrepo.absent")
def test_repo_present_absent_no_trailing_slash_uri_add_slash(
- pkgrepo, trailing_slash_repo_file
+ pkgrepo, repo_uri, trailing_slash_repo_file
):
"""
test adding a repo without a trailing slash, and then running it
again with a trailing slash.
"""
# without the trailing slash
- repo_content = "deb http://www.deb-multimedia.org stable main"
+ repo_content = f"deb {repo_uri} stable main"
# initial creation
ret = pkgrepo.managed(
name=repo_content, file=trailing_slash_repo_file, refresh=False, clean_file=True
)
with salt.utils.files.fopen(trailing_slash_repo_file, "r") as fp:
file_content = fp.read()
- assert file_content.strip() == "deb http://www.deb-multimedia.org stable main"
+ assert file_content.strip() == repo_content
assert ret.changes
# now add a trailing slash in the name
- repo_content = "deb http://www.deb-multimedia.org/ stable main"
+ repo_content = f"deb {repo_uri}/ stable main"
ret = pkgrepo.managed(
name=repo_content, file=trailing_slash_repo_file, refresh=False
)
with salt.utils.files.fopen(trailing_slash_repo_file, "r") as fp:
file_content = fp.read()
- assert file_content.strip() == "deb http://www.deb-multimedia.org/ stable main"
+ assert file_content.strip() == repo_content
# absent
ret = pkgrepo.absent(name=repo_content)
assert ret.result
+@pytest.mark.requires_salt_states("pkgrepo.managed", "pkgrepo.absent")
+def test_repo_absent_trailing_slash_in_uri(
+ pkgrepo, repo_uri, subtests, trailing_slash_repo_file
+):
+ """
+ Test pkgrepo.absent with a URI containing a trailing slash
+
+ See https://github.com/saltstack/salt/issues/64286
+ """
+ repo_file = pathlib.Path(trailing_slash_repo_file)
+ repo_content = f"deb [arch=amd64] {repo_uri}/ stable main"
+
+ with subtests.test("Remove repo with trailing slash in URI"):
+ # Write contents to file with trailing slash in URI
+ repo_file.write_text(f"{repo_content}\n")
+ # Perform and validate removal
+ ret = pkgrepo.absent(name=repo_content)
+ assert ret.result
+ assert ret.changes
+ assert not repo_file.exists()
+ # A second run of the pkgrepo.absent state should be a no-op (i.e. no changes)
+ ret = pkgrepo.absent(name=repo_content)
+ assert ret.result
+ assert not ret.changes
+ assert not repo_file.exists()
+
+ with subtests.test("URI match with mismatched arch"):
+ # Create a repo file that matches the URI but contains no architecture.
+ # This should not be identified as a match for repo_content, and thus
+ # the result of a state should be a no-op.
+ repo_file.write_text(f"deb {repo_uri} stable main\n")
+ # Since this was a no-op, the state should have succeeded, made no
+ # changes, and left the repo file in place.
+ ret = pkgrepo.absent(name=repo_content)
+ assert ret.result
+ assert not ret.changes
+ assert repo_file.exists()
+
+
@attr.s(kw_only=True)
class Repo:
key_root = attr.ib(default=pathlib.Path("/usr", "share", "keyrings"))
diff --git a/tests/pytests/functional/utils/pkg/__init__.py b/tests/pytests/functional/utils/pkg/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/pytests/functional/utils/pkg/test_deb.py b/tests/pytests/functional/utils/pkg/test_deb.py
new file mode 100644
index 00000000000..37b1fb0ed33
--- /dev/null
+++ b/tests/pytests/functional/utils/pkg/test_deb.py
@@ -0,0 +1,88 @@
+import salt.utils.pkg.deb
+from salt.utils.pkg.deb import SourceEntry
+
+
+def test__get_opts():
+ tests = [
+ {
+ "oneline": "deb [signed-by=/etc/apt/keyrings/example.key arch=amd64] https://example.com/pub/repos/apt xenial main",
+ "result": {
+ "signedby": {
+ "full": "signed-by=/etc/apt/keyrings/example.key",
+ "value": "/etc/apt/keyrings/example.key",
+ },
+ "arch": {"full": "arch=amd64", "value": ["amd64"]},
+ },
+ },
+ {
+ "oneline": "deb [arch=amd64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main",
+ "result": {
+ "arch": {"full": "arch=amd64", "value": ["amd64"]},
+ "signedby": {
+ "full": "signed-by=/etc/apt/keyrings/example.key",
+ "value": "/etc/apt/keyrings/example.key",
+ },
+ },
+ },
+ {
+ "oneline": "deb [arch=amd64] https://example.com/pub/repos/apt xenial main",
+ "result": {
+ "arch": {"full": "arch=amd64", "value": ["amd64"]},
+ },
+ },
+ ]
+
+ for test in tests:
+ ret = salt.utils.pkg.deb._get_opts(test["oneline"])
+ assert ret == test["result"]
+
+
+def test_SourceEntry_init():
+ source = SourceEntry(
+ "deb [arch=amd64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main",
+ file="/tmp/test.list",
+ )
+ assert source.invalid is False
+ assert source.comps == ["main"]
+ assert source.comment == ""
+ assert source.dist == "xenial"
+ assert source.type == "deb"
+ assert source.uri == "https://example.com/pub/repos/apt"
+ assert source.architectures == ["amd64"]
+ assert source.signedby == "/etc/apt/keyrings/example.key"
+ assert source.file == "/tmp/test.list"
+
+
+def test_SourceEntry_repo_line():
+
+ lines = [
+ "deb [arch=amd64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n",
+ "deb [signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n",
+ "deb [signed-by=/etc/apt/keyrings/example.key arch=amd64,x86_64] https://example.com/pub/repos/apt xenial main\n",
+ ]
+ for line in lines:
+ source = SourceEntry(line, file="/tmp/test.list")
+ assert source.invalid is False
+ assert source.repo_line() == line
+
+ lines = [
+ (
+ "deb [arch=amd64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n",
+ "deb [arch=x86_64 signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n",
+ ),
+ (
+ "deb [signed-by=/etc/apt/keyrings/example.key] https://example.com/pub/repos/apt xenial main\n",
+ "deb [signed-by=/etc/apt/keyrings/example.key arch=x86_64] https://example.com/pub/repos/apt xenial main\n",
+ ),
+ (
+ "deb [signed-by=/etc/apt/keyrings/example.key arch=amd64,x86_64] https://example.com/pub/repos/apt xenial main\n",
+ "deb [signed-by=/etc/apt/keyrings/example.key arch=x86_64] https://example.com/pub/repos/apt xenial main\n",
+ ),
+ ]
+ for line in lines:
+ line_key, line_value = line
+ source = SourceEntry(line_key, file="/tmp/test.list")
+ source.architectures = ["x86_64"]
+ assert source.invalid is False
+ assert source.repo_line() == line_value
+ assert source.invalid is False
diff --git a/tests/pytests/unit/modules/test_aptpkg.py b/tests/pytests/unit/modules/test_aptpkg.py
index 3d7c004ef7f..9fc66494c86 100644
--- a/tests/pytests/unit/modules/test_aptpkg.py
+++ b/tests/pytests/unit/modules/test_aptpkg.py
@@ -25,42 +25,6 @@ from salt.exceptions import (
)
from tests.support.mock import MagicMock, Mock, call, mock_open, patch
-try:
- from aptsources.sourceslist import ( # pylint: disable=unused-import
- SourceEntry,
- SourcesList,
- )
-
- HAS_APT = True
-except ImportError:
- HAS_APT = False
-
-try:
- from aptsources import sourceslist # pylint: disable=unused-import
-
- HAS_APTSOURCES = True
-except ImportError:
- HAS_APTSOURCES = False
-
-HAS_DEB822 = False
-
-if HAS_APT:
- try:
- from aptsources.sourceslist import Deb822SourceEntry, _deb822 # pylint: disable=unused-import
-
- HAS_DEB822 = True
- except ImportError:
- pass
-
-HAS_APT_PKG = False
-
-try:
- import apt_pkg
-
- HAS_APT_PKG = True
-except ImportError:
- pass
-
log = logging.getLogger(__name__)
@@ -232,11 +196,11 @@ class MockSourceEntry:
self.file = file
self.disabled = False
self.dist = dist
+ self.suites = [dist]
self.comps = []
self.architectures = []
self.signedby = ""
- if HAS_DEB822:
- self.types = []
+ self.types = []
def mysplit(self, line):
return line.split()
@@ -285,26 +249,25 @@ def deb822_repo_file(tmp_path: pathlib.Path, deb822_repo_content: str):
def mock_apt_config(deb822_repo_file: pathlib.Path):
"""
Mocking common to deb822 testing so that apt_pkg uses the
- tmp_path/sources.list.d as the Dir::Etc::sourceparts location
+ tmp_path/sources.list.d as the sourceparts location
"""
with patch.dict(
aptpkg.__salt__,
{"config.option": MagicMock()},
- ), patch.object(apt_pkg, "config") as mock_config:
- mock_config.find_file.return_value = "/etc/apt/sources.list"
- mock_config.find_dir.return_value = os.path.dirname(str(deb822_repo_file))
+ ) as mock_config, patch(
+ "salt.utils.pkg.deb._APT_SOURCES_PARTSDIR",
+ os.path.dirname(str(deb822_repo_file)),
+ ):
yield mock_config
-@pytest.mark.skipif(not HAS_DEB822, reason="Requires deb822 support")
-@pytest.mark.skipif(not HAS_APT_PKG, reason="Requires debian/ubuntu apt_pkg system library")
def test_mod_repo_deb822_modify(deb822_repo_file: pathlib.Path, mock_apt_config):
"""
Test that aptpkg can modify an existing repository in the deb822 format.
In this test, we match the repository by name and disable it.
"""
uri = "http://cz.archive.ubuntu.com/ubuntu/"
- repo = f"deb {uri} noble main"
+ repo = f"deb [signed-by=/usr/share/keyrings/ubuntu-archive-keyring.gpg] {uri} noble main"
aptpkg.mod_repo(repo, enabled=False, file=str(deb822_repo_file), refresh_db=False)
@@ -313,14 +276,12 @@ def test_mod_repo_deb822_modify(deb822_repo_file: pathlib.Path, mock_apt_config)
assert f"URIs: {uri}" in repo_file
-@pytest.mark.skipif(not HAS_DEB822, reason="Requires deb822 support")
-@pytest.mark.skipif(not HAS_APT_PKG, reason="Requires debian/ubuntu apt_pkg system library")
def test_mod_repo_deb822_add(deb822_repo_file: pathlib.Path, mock_apt_config):
"""
Test that aptpkg can add a repository in the deb822 format.
"""
uri = "http://security.ubuntu.com/ubuntu/"
- repo = f"deb {uri} noble-security main"
+ repo = f"deb [signed-by=/usr/share/keyrings/ubuntu-archive-keyring.gpg] {uri} noble-security main"
aptpkg.mod_repo(repo, file=str(deb822_repo_file), refresh_db=False)
@@ -329,23 +290,26 @@ def test_mod_repo_deb822_add(deb822_repo_file: pathlib.Path, mock_apt_config):
assert "URIs: http://cz.archive.ubuntu.com/ubuntu/" in repo_file
-@pytest.mark.skipif(not HAS_DEB822, reason="Requires deb822 support")
-@pytest.mark.skipif(not HAS_APT_PKG, reason="Requires debian/ubuntu apt_pkg system library")
def test_del_repo_deb822(deb822_repo_file: pathlib.Path, mock_apt_config):
"""
Test that aptpkg can delete a repository in the deb822 format.
"""
uri = "http://cz.archive.ubuntu.com/ubuntu/"
- repo = f"deb {uri} noble main"
with patch.object(aptpkg, "refresh_db"):
+ repo = f"deb {uri} noble main"
aptpkg.del_repo(repo, file=str(deb822_repo_file))
+ assert os.path.isfile(str(deb822_repo_file))
- assert not os.path.isfile(str(deb822_repo_file))
+ repo = f"deb {uri} noble-updates main"
+ aptpkg.del_repo(repo, file=str(deb822_repo_file))
+ assert os.path.isfile(str(deb822_repo_file))
+
+ repo = f"deb {uri} noble-backports main"
+ aptpkg.del_repo(repo, file=str(deb822_repo_file))
+ assert not os.path.isfile(str(deb822_repo_file))
-@pytest.mark.skipif(not HAS_DEB822, reason="Requires deb822 support")
-@pytest.mark.skipif(not HAS_APT_PKG, reason="Requires debian/ubuntu apt_pkg system library")
def test_get_repo_deb822(deb822_repo_file: pathlib.Path, mock_apt_config):
"""
Test that aptpkg can match a repository in the deb822 format.
@@ -424,12 +388,9 @@ def test_get_repo_keys(repo_keys_var):
mock = MagicMock(return_value={"retcode": 0, "stdout": APT_KEY_LIST})
with patch.dict(aptpkg.__salt__, {"cmd.run_all": mock}):
- if not HAS_APT:
- with patch("os.listdir", return_value="/tmp/keys"):
- with patch("pathlib.Path.is_dir", return_value=True):
- assert aptpkg.get_repo_keys() == repo_keys_var
- else:
- assert aptpkg.get_repo_keys() == repo_keys_var
+ with patch("os.listdir", return_value="/tmp/keys"):
+ with patch("pathlib.Path.is_dir", return_value=True):
+ assert aptpkg.get_repo_keys() == repo_keys_var
def test_file_dict(lowpkg_files_var):
@@ -984,14 +945,14 @@ def test_mod_repo_match(tmp_path):
with patch(
"salt.modules.aptpkg._split_repo_str",
MagicMock(
- return_value=(
- "deb",
- [],
- "http://cdn-aws.deb.debian.org/debian/",
- "stretch",
- ["main"],
- "",
- )
+ return_value={
+ "type": "deb",
+ "architectures": [],
+ "uri": "http://cdn-aws.deb.debian.org/debian/",
+ "dist": "stretch",
+ "comps": ["main"],
+ "signedby": "",
+ }
),
):
source_line_no_slash = (
@@ -1134,7 +1095,7 @@ def test__parse_source(case):
importlib.reload(aptpkg)
source = NoAptSourceEntry(case["line"])
- ok = source._parse_sources(case["line"])
+ ok = source.parse(case["line"])
assert ok is case["ok"]
assert source.invalid is case["invalid"]
@@ -1545,28 +1506,34 @@ SERVICE:cups-daemon,390,/usr/sbin/cupsd
]
-@pytest.mark.skipif(
- HAS_APTSOURCES is True, reason="Only run test with python3-apt library is missing."
-)
def test_sourceslist_multiple_comps():
"""
Test SourcesList when repo has multiple comps
"""
repo_line = "deb http://archive.ubuntu.com/ubuntu/ focal-updates main restricted"
- with patch.object(aptpkg, "HAS_APT", return_value=True):
- with patch("salt.utils.files.fopen", mock_open(read_data=repo_line)):
- with patch("pathlib.Path.is_file", side_effect=[True, False]):
- sources = aptpkg.SourcesList()
- for source in sources:
- assert source.type == "deb"
- assert source.uri == "http://archive.ubuntu.com/ubuntu/"
- assert source.comps == ["main", "restricted"]
- assert source.dist == "focal-updates"
+ with patch("salt.utils.files.fopen", mock_open(read_data=repo_line)), patch(
+ "pathlib.Path.is_file", side_effect=[True, False]
+ ):
+ sources = aptpkg.SourcesList()
+ for source in sources:
+ assert source.type == "deb"
+ assert source.uri == "http://archive.ubuntu.com/ubuntu/"
+ assert source.comps == ["main", "restricted"]
+ assert source.dist == "focal-updates"
+
+
+def test_sourceslist_multiple_comps():
+ """
+ Test SourcesList when repo has multiple comps
+ """
+ sources = aptpkg.SourcesList()
+ for source in sources:
+ assert source.type == "deb"
+ assert source.uri == "http://archive.ubuntu.com/ubuntu/"
+ assert source.comps == ["main", "restricted"]
+ assert source.dist == "focal-updates"
-@pytest.mark.skipif(
- HAS_APTSOURCES is True, reason="Only run test with python3-apt library is missing."
-)
@pytest.mark.parametrize(
"repo_line",
[