File v3-1-2-arbitrary-file-read-write-plus-RCE.patch of Package cobbler
From 97116e47ebe065fa8641bc79d1527bf13b3cf196 Mon Sep 17 00:00:00 2001
From: SchoolGuy <egotthold@suse.de>
Date: Thu, 26 Aug 2021 12:50:09 +0200
Subject: [PATCH] Backport of the arbitrary file read & write plus RCE
---
cobbler/remote.py | 94 ++++++++++++++++++++++++++++++++++-----------
cobbler/tftpgen.py | 18 ++++++---
cobbler/validate.py | 45 +++++++++++++++++++++-
3 files changed, 127 insertions(+), 30 deletions(-)
Index: cobbler-3.1.2/cobbler/remote.py
===================================================================
--- cobbler-3.1.2.orig/cobbler/remote.py
+++ cobbler-3.1.2/cobbler/remote.py
@@ -28,6 +28,7 @@ from past.utils import old_div
import base64
import errno
import fcntl
+import keyword
import os
import random
import xmlrpc.server
@@ -43,6 +44,7 @@ from cobbler.items import package, syste
from cobbler import tftpgen
from cobbler import utils
from cobbler.cexceptions import CX
+from cobbler.validate import validate_autoinstall_script_name, validate_obj_id, validate_obj_name
EVENT_TIMEOUT = 7 * 24 * 60 * 60 # 1 week
@@ -546,12 +548,14 @@ class CobblerXMLRPCInterface(object):
:return: The username if the token was valid.
:raises CX: If the token supplied to the function is invalid.
"""
+ if not CobblerXMLRPCInterface.__is_token(token):
+ raise ValueError("\"token\" did not have the correct format or type!")
if token not in self.token_cache:
raise CX("invalid token: %s" % token)
else:
return self.token_cache[token][1]
- def _log(self, msg, user=None, token=None, name=None, object_id=None, attribute=None, debug=False, error=False):
+ def _log(self, msg, token=None, name=None, object_id=None, attribute=None, debug=False, error=False):
"""
Helper function to write data to the log file from the XMLRPC remote implementation.
Takes various optional parameters that should be supplied when known.
@@ -567,10 +571,10 @@ class CobblerXMLRPCInterface(object):
:param error: If the message logged is an error message.
:type error: bool
"""
+ if not (isinstance(error, bool) or isinstance(debug, bool) or isinstance(msg, str)):
+ return
# add the user editing the object, if supplied
m_user = "?"
- if user is not None:
- m_user = user
if token is not None:
try:
m_user = self.get_user_from_token(token)
@@ -580,13 +584,21 @@ class CobblerXMLRPCInterface(object):
msg = "REMOTE %s; user(%s)" % (msg, m_user)
if name is not None:
+ if not isinstance(name, str):
+ return
+ if not validate_obj_name(name):
+ return
msg = "%s; name(%s)" % (msg, name)
if object_id is not None:
+ if not validate_obj_id(object_id):
+ return
msg = "%s; object_id(%s)" % (msg, object_id)
# add any attributes being modified, if any
if attribute:
+ if not (isinstance(attribute, str) and attribute.isidentifier()) or keyword.iskeyword(attribute):
+ return
msg = "%s; attribute(%s)" % (msg, attribute)
# log to the correct logger
@@ -1757,6 +1769,9 @@ class CobblerXMLRPCInterface(object):
:param token: The API-token obtained via the login() method.
:return: 0 on success, 1 on error.
"""
+ if not self.api.settings().allow_dynamic_settings:
+ self._log("modify_setting - feature turned off but was tried to be accessed", token=token)
+ return 1
self._log("modify_setting(%s)" % setting_name, token=token)
self.check_access(token, "modify_setting")
try:
@@ -2122,6 +2137,10 @@ class CobblerXMLRPCInterface(object):
:return: Some generated script.
"""
self._log("generate_script, name is %s" % str(name))
+ # This is duplicated from tftpgen.py to prevent log poisoning via a template engine (Cheetah, Jinja2).
+ if not validate_autoinstall_script_name(name):
+ raise ValueError("\"name\" handed to generate_script was not valid!")
+ self._log("generate_script, name is \"%s\"" % name)
return self.api.generate_script(profile, system, name)
def get_blended_data(self, profile=None, system=None):
@@ -2407,6 +2426,8 @@ class CobblerXMLRPCInterface(object):
:param rest: This is dropped in this method since it is not needed here.
:return: True if everything succeeded.
"""
+ if not self.__validate_log_data_params(sys_name, file, size, offset, data, token):
+ return False
self._log("upload_log_data (file: '%s', size: %s, offset: %s)" % (file, size, offset), token=token, name=sys_name)
# Check if enabled in self.api.settings()
@@ -2415,15 +2436,36 @@ class CobblerXMLRPCInterface(object):
return False
# Find matching system record
- systems = self.api.systems()
- obj = systems.find(name=sys_name)
+ obj = self.api.find_system(name=sys_name)
if obj is None:
# system not found!
- self._log("upload_log_data - WARNING - system '%s' not found in Cobbler" % sys_name, token=token, name=sys_name)
+ self._log("upload_log_data - WARNING - system '%s' not found in Cobbler" % sys_name, token=token,
+ name=sys_name)
+ return False
- return self.__upload_file(sys_name, file, size, offset, data)
+ return self.__upload_file(obj.name, file, size, offset, data)
- def __upload_file(self, sys_name, file, size, offset, data):
+ def __validate_log_data_params(self, sys_name: str, logfile_name: str, size: int, offset: int, data: bytes,
+ token=None) -> bool:
+ # Validate all types
+ if not (isinstance(sys_name, str) and isinstance(logfile_name, str) and isinstance(size, int)
+ and isinstance(offset, int) and isinstance(data, bytes)):
+ self.logger.warning("upload_log_data - One of the parameters handed over had an invalid type!")
+ return False
+ if token is not None and not isinstance(token, str):
+ self.logger.warning("upload_log_data - token was given but had an invalid type.")
+ return False
+ # Validate sys_name with item regex
+ if not validate_obj_name(sys_name):
+ self.logger.warning("upload_log_data - The provided sys_name contained invalid characters!")
+ return False
+ # Validate logfile_name - this uses the script name validation, possibly we need our own for this one later
+ if not validate_autoinstall_script_name(logfile_name):
+ self.logger.warning("upload_log_data - The provided file contained invalid characters!")
+ return False
+ return True
+
+ def __upload_file(self, sys_name: str, logfile_name: str, size: int, offset: int, data: bytes) -> bool:
"""
Files can be uploaded in chunks, if so the size describes the chunk rather than the whole file. The offset
indicates where the chunk belongs the special offset -1 is used to indicate the final chunk.
@@ -2435,28 +2477,30 @@ class CobblerXMLRPCInterface(object):
:param data: base64 encoded file contents
:return: True if the action succeeded.
"""
- contents = base64.decodestring(data)
+ contents = base64.decodebytes(data)
del data
if offset != -1:
if size is not None:
if size != len(contents):
return False
- # XXX - have an incoming dir and move after upload complete
- # SECURITY - ensure path remains under uploadpath
- tt = str.maketrans("/", "+")
- fn = str.translate(file, tt)
- if fn.startswith('..'):
- raise CX("invalid filename used: %s" % fn)
-
- # FIXME ... get the base dir from cobbler settings()
- udir = "/var/log/cobbler/anamon/%s" % sys_name
- if not os.path.isdir(udir):
- os.mkdir(udir, 0o755)
+ # FIXME: Get the base directory from Cobbler app-settings
+ anamon_base_directory = "/var/log/cobbler/anamon"
+ anamon_sys_directory = os.path.join(anamon_base_directory, sys_name)
+
+ file_name = os.path.join(anamon_sys_directory, logfile_name)
+ normalized_path = os.path.normpath(file_name)
+
+ if not normalized_path.startswith(anamon_sys_directory):
+ self.logger.warning("upload_log_data: built path for the logfile was outside of the Cobbler-Anamon log "
+ "directory!")
+ return False
+
+ if not os.path.isdir(anamon_sys_directory):
+ os.mkdir(anamon_sys_directory, 0o755)
- fn = "%s/%s" % (udir, fn)
try:
- st = os.lstat(fn)
+ st = os.lstat(file_name)
except OSError as e:
if e.errno == errno.ENOENT:
pass
@@ -2464,9 +2508,9 @@ class CobblerXMLRPCInterface(object):
raise
else:
if not stat.S_ISREG(st.st_mode):
- raise CX("destination not a file: %s" % fn)
+ raise CX("destination not a file: %s" % file_name)
- fd = os.open(fn, os.O_RDWR | os.O_CREAT, 0o644)
+ fd = os.open(file_name, os.O_RDWR | os.O_CREAT | os.O_CLOEXEC, 0o644)
# log_error("fd=%r" %fd)
try:
if offset == 0 or (offset == -1 and size == len(contents)):
@@ -3076,6 +3120,10 @@ class CobblerXMLRPCInterface(object):
self.token_cache[b64] = (time.time(), user)
return b64
+ @staticmethod
+ def __is_token(token: str) -> bool:
+ return isinstance(token, str) and len(token) == 36
+
def __invalidate_expired_tokens(self):
"""
Deletes any login tokens that might have expired. Also removes expired events.
Index: cobbler-3.1.2/cobbler/tftpgen.py
===================================================================
--- cobbler-3.1.2.orig/cobbler/tftpgen.py
+++ cobbler-3.1.2/cobbler/tftpgen.py
@@ -29,6 +29,7 @@ import socket
from cobbler import templar
from cobbler import utils
from cobbler.cexceptions import CX
+from cobbler.validate import validate_autoinstall_script_name
class TFTPGen(object):
@@ -1076,8 +1077,13 @@ class TFTPGen(object):
"""
if what == "profile":
obj = self.api.find_profile(name=objname)
- else:
+ elif what == "system":
obj = self.api.find_system(name=objname)
+ else:
+ raise ValueError("\"what\" needs to be either \"profile\" or \"system\"!")
+
+ if not validate_autoinstall_script_name(script_name):
+ raise ValueError("\"script_name\" handed to generate_script was not valid!")
if not obj:
return "# %s named %s not found" % (what, objname)
@@ -1102,12 +1108,14 @@ class TFTPGen(object):
else:
blended['img_path'] = os.path.join("/images", distro.name)
- template = os.path.normpath(os.path.join("/var/lib/cobbler/autoinstall_scripts", script_name))
+ scripts_root = "/var/lib/cobbler/scripts"
+ template = os.path.normpath(os.path.join(scripts_root, script_name))
+ if not template.startswith(scripts_root):
+ return "# script template \"%s\" could not be found in the script root" % script_name
if not os.path.exists(template):
return "# script template %s not found" % script_name
- template_fh = open(template)
- template_data = template_fh.read()
- template_fh.close()
+ with open(template) as template_fh:
+ template_data = template_fh.read()
return self.templar.render(template_data, blended, None, obj)
Index: cobbler-3.1.2/cobbler/validate.py
===================================================================
--- cobbler-3.1.2.orig/cobbler/validate.py
+++ cobbler-3.1.2/cobbler/validate.py
@@ -20,10 +20,13 @@ Foundation, Inc., 51 Franklin Street, Fi
import netaddr
import re
import shlex
+from uuid import UUID
-from cobbler.cexceptions import CX
+import netaddr
+from cobbler.cexceptions import CX
+RE_SCRIPT_NAME = re.compile(r"[a-zA-Z0-9_.]+")
RE_OBJECT_NAME = re.compile(r'[a-zA-Z0-9_\-.:]*$')
RE_HOSTNAME = re.compile(r'^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9]))*$')
@@ -249,4 +252,42 @@ def name_servers_search(search, for_item
return search
-# EOF
+
+def validate_autoinstall_script_name(name: str) -> bool:
+ if not isinstance(name, str):
+ return False
+ if re.fullmatch(RE_SCRIPT_NAME, name):
+ return True
+ return False
+
+
+def validate_uuid(possible_uuid: str) -> bool:
+ if not isinstance(possible_uuid, str):
+ return False
+ # Taken from: https://stackoverflow.com/a/33245493/4730773
+ try:
+ uuid_obj = UUID(possible_uuid, version=4)
+ except ValueError:
+ return False
+ return str(uuid_obj) == possible_uuid
+
+
+def validate_obj_type(object_type: str) -> bool:
+ if not isinstance(object_type, str):
+ return False
+ return object_type in ["distro", "profile", "system", "repo", "image", "mgmtclass", "package", "file", "menu"]
+
+
+def validate_obj_name(future_object_name: str) -> bool:
+ if not isinstance(future_object_name, str):
+ return False
+ return bool(re.fullmatch(RE_OBJECT_NAME, future_object_name))
+
+
+def validate_obj_id(object_id: str) -> bool:
+ if not isinstance(object_id, str):
+ return False
+ if object_id.startswith("___NEW___"):
+ object_id = object_id[9:]
+ (otype, oname) = object_id.split("::", 1)
+ return validate_obj_type(otype) and validate_obj_name(oname)