File v3-1-2-arbitrary-file-read-write-plus-RCE.patch of Package cobbler

From 97116e47ebe065fa8641bc79d1527bf13b3cf196 Mon Sep 17 00:00:00 2001
From: SchoolGuy <egotthold@suse.de>
Date: Thu, 26 Aug 2021 12:50:09 +0200
Subject: [PATCH] Backport of the arbitrary file read & write plus RCE

---
 cobbler/remote.py   | 94 ++++++++++++++++++++++++++++++++++-----------
 cobbler/tftpgen.py  | 18 ++++++---
 cobbler/validate.py | 45 +++++++++++++++++++++-
 3 files changed, 127 insertions(+), 30 deletions(-)

Index: cobbler-3.1.2/cobbler/remote.py
===================================================================
--- cobbler-3.1.2.orig/cobbler/remote.py
+++ cobbler-3.1.2/cobbler/remote.py
@@ -28,6 +28,7 @@ from past.utils import old_div
 import base64
 import errno
 import fcntl
+import keyword
 import os
 import random
 import xmlrpc.server
@@ -43,6 +44,7 @@ from cobbler.items import package, syste
 from cobbler import tftpgen
 from cobbler import utils
 from cobbler.cexceptions import CX
+from cobbler.validate import validate_autoinstall_script_name, validate_obj_id, validate_obj_name
 
 
 EVENT_TIMEOUT = 7 * 24 * 60 * 60        # 1 week
@@ -546,12 +548,14 @@ class CobblerXMLRPCInterface(object):
         :return: The username if the token was valid.
         :raises CX: If the token supplied to the function is invalid.
         """
+        if not CobblerXMLRPCInterface.__is_token(token):
+            raise ValueError("\"token\" did not have the correct format or type!")
         if token not in self.token_cache:
             raise CX("invalid token: %s" % token)
         else:
             return self.token_cache[token][1]
 
-    def _log(self, msg, user=None, token=None, name=None, object_id=None, attribute=None, debug=False, error=False):
+    def _log(self, msg, token=None, name=None, object_id=None, attribute=None, debug=False, error=False):
         """
         Helper function to write data to the log file from the XMLRPC remote implementation.
         Takes various optional parameters that should be supplied when known.
@@ -567,10 +571,10 @@ class CobblerXMLRPCInterface(object):
         :param error: If the message logged is an error message.
         :type error: bool
         """
+        if not (isinstance(error, bool) or isinstance(debug, bool) or isinstance(msg, str)):
+            return
         # add the user editing the object, if supplied
         m_user = "?"
-        if user is not None:
-            m_user = user
         if token is not None:
             try:
                 m_user = self.get_user_from_token(token)
@@ -580,13 +584,21 @@ class CobblerXMLRPCInterface(object):
         msg = "REMOTE %s; user(%s)" % (msg, m_user)
 
         if name is not None:
+            if not isinstance(name, str):
+                return
+            if not validate_obj_name(name):
+                return
             msg = "%s; name(%s)" % (msg, name)
 
         if object_id is not None:
+            if not validate_obj_id(object_id):
+                return
             msg = "%s; object_id(%s)" % (msg, object_id)
 
         # add any attributes being modified, if any
         if attribute:
+            if not (isinstance(attribute, str) and attribute.isidentifier()) or keyword.iskeyword(attribute):
+                return
             msg = "%s; attribute(%s)" % (msg, attribute)
 
         # log to the correct logger
@@ -1757,6 +1769,9 @@ class CobblerXMLRPCInterface(object):
         :param token: The API-token obtained via the login() method.
         :return: 0 on success, 1 on error.
         """
+        if not self.api.settings().allow_dynamic_settings:
+            self._log("modify_setting - feature turned off but was tried to be accessed", token=token)
+            return 1
         self._log("modify_setting(%s)" % setting_name, token=token)
         self.check_access(token, "modify_setting")
         try:
@@ -2122,6 +2137,10 @@ class CobblerXMLRPCInterface(object):
         :return: Some generated script.
         """
         self._log("generate_script, name is %s" % str(name))
+        # This is duplicated from tftpgen.py to prevent log poisoning via a template engine (Cheetah, Jinja2).
+        if not validate_autoinstall_script_name(name):
+            raise ValueError("\"name\" handed to generate_script was not valid!")
+        self._log("generate_script, name is \"%s\"" % name)
         return self.api.generate_script(profile, system, name)
 
     def get_blended_data(self, profile=None, system=None):
@@ -2407,6 +2426,8 @@ class CobblerXMLRPCInterface(object):
         :param rest: This is dropped in this method since it is not needed here.
         :return: True if everything succeeded.
         """
+        if not self.__validate_log_data_params(sys_name, file, size, offset, data, token):
+            return False
         self._log("upload_log_data (file: '%s', size: %s, offset: %s)" % (file, size, offset), token=token, name=sys_name)
 
         # Check if enabled in self.api.settings()
@@ -2415,15 +2436,36 @@ class CobblerXMLRPCInterface(object):
             return False
 
         # Find matching system record
-        systems = self.api.systems()
-        obj = systems.find(name=sys_name)
+        obj = self.api.find_system(name=sys_name)
         if obj is None:
             # system not found!
-            self._log("upload_log_data - WARNING - system '%s' not found in Cobbler" % sys_name, token=token, name=sys_name)
+            self._log("upload_log_data - WARNING - system '%s' not found in Cobbler" % sys_name, token=token,
+                      name=sys_name)
+            return False
 
-        return self.__upload_file(sys_name, file, size, offset, data)
+        return self.__upload_file(obj.name, file, size, offset, data)
 
-    def __upload_file(self, sys_name, file, size, offset, data):
+    def __validate_log_data_params(self, sys_name: str, logfile_name: str, size: int, offset: int, data: bytes,
+                                   token=None) -> bool:
+        # Validate all types
+        if not (isinstance(sys_name, str) and isinstance(logfile_name, str) and isinstance(size, int)
+                and isinstance(offset, int) and isinstance(data, bytes)):
+            self.logger.warning("upload_log_data - One of the parameters handed over had an invalid type!")
+            return False
+        if token is not None and not isinstance(token, str):
+            self.logger.warning("upload_log_data - token was given but had an invalid type.")
+            return False
+        # Validate sys_name with item regex
+        if not validate_obj_name(sys_name):
+            self.logger.warning("upload_log_data - The provided sys_name contained invalid characters!")
+            return False
+        # Validate logfile_name - this uses the script name validation, possibly we need our own for this one later
+        if not validate_autoinstall_script_name(logfile_name):
+            self.logger.warning("upload_log_data - The provided file contained invalid characters!")
+            return False
+        return True
+
+    def __upload_file(self, sys_name: str, logfile_name: str, size: int, offset: int, data: bytes) -> bool:
         """
         Files can be uploaded in chunks, if so the size describes the chunk rather than the whole file. The offset
         indicates where the chunk belongs the special offset -1 is used to indicate the final chunk.
@@ -2435,28 +2477,30 @@ class CobblerXMLRPCInterface(object):
         :param data: base64 encoded file contents
         :return: True if the action succeeded.
         """
-        contents = base64.decodestring(data)
+        contents = base64.decodebytes(data)
         del data
         if offset != -1:
             if size is not None:
                 if size != len(contents):
                     return False
 
-        # XXX - have an incoming dir and move after upload complete
-        # SECURITY - ensure path remains under uploadpath
-        tt = str.maketrans("/", "+")
-        fn = str.translate(file, tt)
-        if fn.startswith('..'):
-            raise CX("invalid filename used: %s" % fn)
-
-        # FIXME ... get the base dir from cobbler settings()
-        udir = "/var/log/cobbler/anamon/%s" % sys_name
-        if not os.path.isdir(udir):
-            os.mkdir(udir, 0o755)
+        # FIXME: Get the base directory from Cobbler app-settings
+        anamon_base_directory = "/var/log/cobbler/anamon"
+        anamon_sys_directory = os.path.join(anamon_base_directory, sys_name)
+
+        file_name = os.path.join(anamon_sys_directory, logfile_name)
+        normalized_path = os.path.normpath(file_name)
+
+        if not normalized_path.startswith(anamon_sys_directory):
+            self.logger.warning("upload_log_data: built path for the logfile was outside of the Cobbler-Anamon log "
+                                "directory!")
+            return False
+
+        if not os.path.isdir(anamon_sys_directory):
+            os.mkdir(anamon_sys_directory, 0o755)
 
-        fn = "%s/%s" % (udir, fn)
         try:
-            st = os.lstat(fn)
+            st = os.lstat(file_name)
         except OSError as e:
             if e.errno == errno.ENOENT:
                 pass
@@ -2464,9 +2508,9 @@ class CobblerXMLRPCInterface(object):
                 raise
         else:
             if not stat.S_ISREG(st.st_mode):
-                raise CX("destination not a file: %s" % fn)
+                raise CX("destination not a file: %s" % file_name)
 
-        fd = os.open(fn, os.O_RDWR | os.O_CREAT, 0o644)
+        fd = os.open(file_name, os.O_RDWR | os.O_CREAT | os.O_CLOEXEC, 0o644)
         # log_error("fd=%r" %fd)
         try:
             if offset == 0 or (offset == -1 and size == len(contents)):
@@ -3076,6 +3120,10 @@ class CobblerXMLRPCInterface(object):
         self.token_cache[b64] = (time.time(), user)
         return b64
 
+    @staticmethod
+    def __is_token(token: str) -> bool:
+        return isinstance(token, str) and len(token) == 36
+
     def __invalidate_expired_tokens(self):
         """
         Deletes any login tokens that might have expired. Also removes expired events.
Index: cobbler-3.1.2/cobbler/tftpgen.py
===================================================================
--- cobbler-3.1.2.orig/cobbler/tftpgen.py
+++ cobbler-3.1.2/cobbler/tftpgen.py
@@ -29,6 +29,7 @@ import socket
 from cobbler import templar
 from cobbler import utils
 from cobbler.cexceptions import CX
+from cobbler.validate import validate_autoinstall_script_name
 
 
 class TFTPGen(object):
@@ -1076,8 +1077,13 @@ class TFTPGen(object):
         """
         if what == "profile":
             obj = self.api.find_profile(name=objname)
-        else:
+        elif what == "system":
             obj = self.api.find_system(name=objname)
+        else:
+            raise ValueError("\"what\" needs to be either \"profile\" or \"system\"!")
+
+        if not validate_autoinstall_script_name(script_name):
+            raise ValueError("\"script_name\" handed to generate_script was not valid!")
 
         if not obj:
             return "# %s named %s not found" % (what, objname)
@@ -1102,12 +1108,14 @@ class TFTPGen(object):
         else:
             blended['img_path'] = os.path.join("/images", distro.name)
 
-        template = os.path.normpath(os.path.join("/var/lib/cobbler/autoinstall_scripts", script_name))
+        scripts_root = "/var/lib/cobbler/scripts"
+        template = os.path.normpath(os.path.join(scripts_root, script_name))
+        if not template.startswith(scripts_root):
+            return "# script template \"%s\" could not be found in the script root" % script_name
         if not os.path.exists(template):
             return "# script template %s not found" % script_name
 
-        template_fh = open(template)
-        template_data = template_fh.read()
-        template_fh.close()
+        with open(template) as template_fh:
+            template_data = template_fh.read()
 
         return self.templar.render(template_data, blended, None, obj)
Index: cobbler-3.1.2/cobbler/validate.py
===================================================================
--- cobbler-3.1.2.orig/cobbler/validate.py
+++ cobbler-3.1.2/cobbler/validate.py
@@ -20,10 +20,13 @@ Foundation, Inc., 51 Franklin Street, Fi
 import netaddr
 import re
 import shlex
+from uuid import UUID
 
-from cobbler.cexceptions import CX
+import netaddr
 
+from cobbler.cexceptions import CX
 
+RE_SCRIPT_NAME = re.compile(r"[a-zA-Z0-9_.]+")
 RE_OBJECT_NAME = re.compile(r'[a-zA-Z0-9_\-.:]*$')
 RE_HOSTNAME = re.compile(r'^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9]))*$')
 
@@ -249,4 +252,42 @@ def name_servers_search(search, for_item
 
     return search
 
-# EOF
+
+def validate_autoinstall_script_name(name: str) -> bool:
+    if not isinstance(name, str):
+        return False
+    if re.fullmatch(RE_SCRIPT_NAME, name):
+        return True
+    return False
+
+
+def validate_uuid(possible_uuid: str) -> bool:
+    if not isinstance(possible_uuid, str):
+        return False
+    # Taken from: https://stackoverflow.com/a/33245493/4730773
+    try:
+        uuid_obj = UUID(possible_uuid, version=4)
+    except ValueError:
+        return False
+    return str(uuid_obj) == possible_uuid
+
+
+def validate_obj_type(object_type: str) -> bool:
+    if not isinstance(object_type, str):
+        return False
+    return object_type in ["distro", "profile", "system", "repo", "image", "mgmtclass", "package", "file", "menu"]
+
+
+def validate_obj_name(future_object_name: str) -> bool:
+    if not isinstance(future_object_name, str):
+        return False
+    return bool(re.fullmatch(RE_OBJECT_NAME, future_object_name))
+
+
+def validate_obj_id(object_id: str) -> bool:
+    if not isinstance(object_id, str):
+        return False
+    if object_id.startswith("___NEW___"):
+        object_id = object_id[9:]
+    (otype, oname) = object_id.split("::", 1)
+    return validate_obj_type(otype) and validate_obj_name(oname)
openSUSE Build Service is sponsored by