File CVE-2023-6597-TempDir-cleaning-symlink.patch of Package python3-base
From d8f50e5e191027ad3358bafe6828a1263d3c0cda Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B8ren=20L=C3=B8vborg?= <sorenl@unity3d.com>
Date: Thu, 1 Dec 2022 14:30:26 +0100
Subject: [PATCH 01/12] tempfile tests: try not to leave NOUNLINK files behind
During development, it becomes tiresome to have to manually clean up
these files in case of unrelated TemporaryDirectory breakage.
---
Lib/posixpath.py | 36 ++
Lib/tempfile.py | 148 ++++++++--
Lib/test/support/__init__.py | 31 ++
Lib/test/test_tempfile.py | 99 ++++++
Misc/NEWS.d/next/Library/2022-12-01-16-57-44.gh-issue-91133.LKMVCV.rst | 2
5 files changed, 301 insertions(+), 15 deletions(-)
--- a/Lib/posixpath.py
+++ b/Lib/posixpath.py
@@ -455,3 +455,39 @@ def relpath(path, start=None):
if not rel_list:
return curdir
return join(*rel_list)
+
+def commonpath(paths):
+ """Given a sequence of path names, returns the longest common sub-path."""
+
+ if not paths:
+ raise ValueError('commonpath() arg is an empty sequence')
+
+ if isinstance(paths[0], bytes):
+ sep = b'/'
+ curdir = b'.'
+ else:
+ sep = '/'
+ curdir = '.'
+
+ try:
+ split_paths = [path.split(sep) for path in paths]
+
+ try:
+ isabs, = set(p[:1] == sep for p in paths)
+ except ValueError:
+ raise ValueError("Can't mix absolute and relative paths") from None
+
+ split_paths = [[c for c in s if c and c != curdir] for s in split_paths]
+ s1 = min(split_paths)
+ s2 = max(split_paths)
+ common = s1
+ for i, c in enumerate(s1):
+ if c != s2[i]:
+ common = s1[:i]
+ break
+
+ prefix = sep if isabs else sep[:0]
+ return prefix + sep.join(common)
+ except (TypeError, AttributeError):
+ genericpath._check_arg_types('commonpath', *paths)
+ raise
--- a/Lib/tempfile.py
+++ b/Lib/tempfile.py
@@ -21,7 +21,7 @@ __all__ = [
"mkstemp", "mkdtemp", # low level safe interfaces
"mktemp", # deprecated unsafe interface
"TMP_MAX", "gettempprefix", # constants
- "tempdir", "gettempdir"
+ "tempdir", "gettempdir", "gettempdirb"
]
@@ -82,6 +82,45 @@ def _exists(fn):
else:
return True
+
+def _infer_return_type(*args):
+ """Look at the type of all args and divine their implied return type."""
+ return_type = None
+ for arg in args:
+ if arg is None:
+ continue
+ if isinstance(arg, bytes):
+ if return_type is str:
+ raise TypeError("Can't mix bytes and non-bytes in "
+ "path components.")
+ return_type = bytes
+ else:
+ if return_type is bytes:
+ raise TypeError("Can't mix bytes and non-bytes in "
+ "path components.")
+ return_type = str
+ if return_type is None:
+ return str # tempfile APIs return a str by default.
+ return return_type
+
+
+def _sanitize_params(prefix, suffix, dir):
+ """Common parameter processing for most APIs in this module."""
+ output_type = _infer_return_type(prefix, suffix, dir)
+ if suffix is None:
+ suffix = output_type()
+ if prefix is None:
+ if output_type is str:
+ prefix = template
+ else:
+ prefix = _os.fsencode(template)
+ if dir is None:
+ if output_type is str:
+ dir = gettempdir()
+ else:
+ dir = gettempdirb()
+ return prefix, suffix, dir, output_type
+
class _RandomNameSequence:
"""An instance of _RandomNameSequence generates an endless
sequence of unpredictable strings which can safely be incorporated
@@ -220,6 +259,22 @@ def _mkstemp_inner(dir, pre, suf, flags)
raise FileExistsError(_errno.EEXIST,
"No usable temporary file name found")
+def _dont_follow_symlinks(func, path, *args):
+ # Pass follow_symlinks=False, unless not supported on this platform.
+ if func in _os.supports_follow_symlinks:
+ func(path, *args, follow_symlinks=False)
+ elif _os.name == 'nt' or not _os.path.islink(path):
+ func(path, *args)
+
+def _resetperms(path):
+ try:
+ chflags = _os.chflags
+ except AttributeError:
+ pass
+ else:
+ _dont_follow_symlinks(chflags, path, 0)
+ _dont_follow_symlinks(_os.chmod, path, 0o700)
+
# User visible interfaces.
@@ -241,6 +296,12 @@ def gettempdir():
_once_lock.release()
return tempdir
+
+def gettempdirb():
+ """A bytes version of tempfile.gettempdir()."""
+ return _os.fsencode(gettempdir())
+
+
def mkstemp(suffix="", prefix=template, dir=None, text=False):
"""User-callable function to create and return a unique temporary
file. The return value is a pair (fd, name) where fd is the
@@ -277,7 +338,6 @@ def mkstemp(suffix="", prefix=template,
return _mkstemp_inner(dir, prefix, suffix, flags)
-
def mkdtemp(suffix="", prefix=template, dir=None):
"""User-callable function to create and return a unique temporary
directory. The return value is the pathname of the directory.
@@ -291,17 +351,17 @@ def mkdtemp(suffix="", prefix=template,
Caller is responsible for deleting the directory when done with it.
"""
- if dir is None:
- dir = gettempdir()
+ prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)
names = _get_candidate_names()
+ if output_type is bytes:
+ names = map(_os.fsencode, names)
for seq in range(TMP_MAX):
name = next(names)
file = _os.path.join(dir, prefix + name + suffix)
try:
_os.mkdir(file, 0o700)
- return file
except FileExistsError:
continue # try again
except PermissionError:
@@ -312,10 +372,12 @@ def mkdtemp(suffix="", prefix=template,
continue
else:
raise
+ return file
raise FileExistsError(_errno.EEXIST,
"No usable temporary directory name found")
+
def mktemp(suffix="", prefix=template, dir=None):
"""User-callable function to return a unique temporary file name. The
file is not created.
@@ -675,7 +737,7 @@ class SpooledTemporaryFile:
return rv
-class TemporaryDirectory(object):
+class TemporaryDirectory:
"""Create and return a temporary directory. This has the same
behavior as mkdtemp but can be used as a context manager. For
example:
@@ -684,20 +746,75 @@ class TemporaryDirectory(object):
...
Upon exiting the context, the directory and everything contained
- in it are removed.
+ in it are removed (unless delete=False is passed or an exception
+ is raised during cleanup and ignore_cleanup_errors is not True).
+
+ Optional Arguments:
+ suffix - A str suffix for the directory name. (see mkdtemp)
+ prefix - A str prefix for the directory name. (see mkdtemp)
+ dir - A directory to create this temp dir in. (see mkdtemp)
+ ignore_cleanup_errors - False; ignore exceptions during cleanup?
+ delete - True; whether the directory is automatically deleted.
"""
- def __init__(self, suffix="", prefix=template, dir=None):
+ def __init__(self, suffix=None, prefix=None, dir=None,
+ ignore_cleanup_errors=False, *, delete=True):
self.name = mkdtemp(suffix, prefix, dir)
+ self._ignore_cleanup_errors = ignore_cleanup_errors
+ self._delete = delete
self._finalizer = _weakref.finalize(
self, self._cleanup, self.name,
- warn_message="Implicitly cleaning up {!r}".format(self))
+ warn_message="Implicitly cleaning up {!r}".format(self),
+ ignore_errors=self._ignore_cleanup_errors, delete=self._delete)
@classmethod
- def _cleanup(cls, name, warn_message):
- _shutil.rmtree(name)
- _warnings.warn(warn_message, ResourceWarning)
+ def _rmtree(cls, name, ignore_errors=False, repeated=False):
+ def onexc(func, path, exc_info):
+ exc = exc_info[1]
+ if isinstance(exc, PermissionError):
+ if repeated and path == name:
+ if ignore_errors:
+ return
+ raise
+
+ try:
+ if path != name:
+ _resetperms(_os.path.dirname(path))
+ _resetperms(path)
+ try:
+ _os.unlink(path)
+ except IsADirectoryError:
+ cls._rmtree(path)
+ except PermissionError:
+ # The PermissionError handler was originally added for
+ # FreeBSD in directories, but it seems that it is raised
+ # on Windows too.
+ # bpo-43153: Calling _rmtree again may
+ # raise NotADirectoryError and mask the PermissionError.
+ # So we must re-raise the current PermissionError if
+ # path is not a directory.
+ if not _os.path.isdir(path) or _os.path.isjunction(path):
+ if ignore_errors:
+ return
+ raise
+ cls._rmtree(path, ignore_errors=ignore_errors,
+ repeated=(path == name))
+ except FileNotFoundError:
+ pass
+ elif isinstance(exc, FileNotFoundError):
+ pass
+ else:
+ if not ignore_errors:
+ raise
+
+ _shutil.rmtree(name, onerror=onexc)
+
+ @classmethod
+ def _cleanup(cls, name, warn_message, ignore_errors=False, delete=True):
+ if delete:
+ cls._rmtree(name, ignore_errors=ignore_errors)
+ _warnings.warn(warn_message, ResourceWarning)
def __repr__(self):
return "<{} {!r}>".format(self.__class__.__name__, self.name)
@@ -706,8 +823,9 @@ class TemporaryDirectory(object):
return self.name
def __exit__(self, exc, value, tb):
- self.cleanup()
+ if self._delete:
+ self.cleanup()
def cleanup(self):
- if self._finalizer.detach():
- _shutil.rmtree(self.name)
+ if self._finalizer.detach() or _os.path.exists(self.name):
+ self._rmtree(self.name, ignore_errors=self._ignore_cleanup_errors)
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -119,6 +119,37 @@ class ResourceDenied(unittest.SkipTest):
and unexpected skips.
"""
+_can_symlink = None
+
+TESTFN_ASCII = "{}_{}_tmp".format('@test', os.getpid())
+
+def can_symlink():
+ global _can_symlink
+ if _can_symlink is not None:
+ return _can_symlink
+ # WASI / wasmtime prevents symlinks with absolute paths, see man
+ # openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute
+ # paths. Skip symlink tests on WASI for now.
+ src = os.path.abspath(TESTFN)
+ symlink_path = src + "can_symlink"
+ try:
+ os.symlink(src, symlink_path)
+ can = True
+ except (OSError, NotImplementedError, AttributeError):
+ can = False
+ else:
+ os.remove(symlink_path)
+ _can_symlink = can
+ return can
+
+
+def skip_unless_symlink(test):
+ """Skip decorator for tests that require functional symlink"""
+ ok = can_symlink()
+ msg = "Requires functional symlink implementation"
+ return test if ok else unittest.skip(msg)(test)
+
+
@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
"""Context manager to suppress package and module deprecation
--- a/Lib/test/test_tempfile.py
+++ b/Lib/test/test_tempfile.py
@@ -83,6 +83,7 @@ class TestExports(BaseTestCase):
"TMP_MAX" : 1,
"gettempprefix" : 1,
"gettempdir" : 1,
+ "gettempdirb" : 1,
"tempdir" : 1,
"template" : 1,
"SpooledTemporaryFile" : 1,
@@ -1154,6 +1155,7 @@ class NulledModules:
d.clear()
d.update(c)
+
class TestTemporaryDirectory(BaseTestCase):
"""Test TemporaryDirectory()."""
@@ -1214,6 +1216,103 @@ class TestTemporaryDirectory(BaseTestCas
"were deleted")
d2.cleanup()
+ @support.skip_unless_symlink
+ def test_cleanup_with_symlink_modes(self):
+ # cleanup() should not follow symlinks when fixing mode bits (#91133)
+ with self.do_create(recurse=0) as d2:
+ file1 = os.path.join(d2, 'file1')
+ open(file1, 'wb').close()
+ dir1 = os.path.join(d2, 'dir1')
+ os.mkdir(dir1)
+ for mode in range(8):
+ mode <<= 6
+ with self.subTest(mode=format(mode, '03o')):
+ def test(target, target_is_directory):
+ d1 = self.do_create(recurse=0)
+ symlink = os.path.join(d1.name, 'symlink')
+ os.symlink(target, symlink,
+ target_is_directory=target_is_directory)
+ try:
+ os.chmod(symlink, mode, follow_symlinks=False)
+ except NotImplementedError:
+ pass
+ try:
+ os.chmod(symlink, mode)
+ except FileNotFoundError:
+ pass
+ os.chmod(d1.name, mode)
+ d1.cleanup()
+ self.assertFalse(os.path.exists(d1.name))
+
+ with self.subTest('nonexisting file'):
+ test('nonexisting', target_is_directory=False)
+ with self.subTest('nonexisting dir'):
+ test('nonexisting', target_is_directory=True)
+
+ with self.subTest('existing file'):
+ os.chmod(file1, mode)
+ old_mode = os.stat(file1).st_mode
+ test(file1, target_is_directory=False)
+ new_mode = os.stat(file1).st_mode
+ self.assertEqual(new_mode, old_mode,
+ '%03o != %03o' % (new_mode, old_mode))
+
+ with self.subTest('existing dir'):
+ os.chmod(dir1, mode)
+ old_mode = os.stat(dir1).st_mode
+ test(dir1, target_is_directory=True)
+ new_mode = os.stat(dir1).st_mode
+ self.assertEqual(new_mode, old_mode,
+ '%03o != %03o' % (new_mode, old_mode))
+
+ @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
+ @support.skip_unless_symlink
+ def test_cleanup_with_symlink_flags(self):
+ # cleanup() should not follow symlinks when fixing flags (#91133)
+ flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
+ self.check_flags(flags)
+
+ with self.do_create(recurse=0) as d2:
+ file1 = os.path.join(d2, 'file1')
+ open(file1, 'wb').close()
+ dir1 = os.path.join(d2, 'dir1')
+ os.mkdir(dir1)
+ def test(target, target_is_directory):
+ d1 = self.do_create(recurse=0)
+ symlink = os.path.join(d1.name, 'symlink')
+ os.symlink(target, symlink,
+ target_is_directory=target_is_directory)
+ try:
+ os.chflags(symlink, flags, follow_symlinks=False)
+ except NotImplementedError:
+ pass
+ try:
+ os.chflags(symlink, flags)
+ except FileNotFoundError:
+ pass
+ os.chflags(d1.name, flags)
+ d1.cleanup()
+ self.assertFalse(os.path.exists(d1.name))
+
+ with self.subTest('nonexisting file'):
+ test('nonexisting', target_is_directory=False)
+ with self.subTest('nonexisting dir'):
+ test('nonexisting', target_is_directory=True)
+
+ with self.subTest('existing file'):
+ os.chflags(file1, flags)
+ old_flags = os.stat(file1).st_flags
+ test(file1, target_is_directory=False)
+ new_flags = os.stat(file1).st_flags
+ self.assertEqual(new_flags, old_flags)
+
+ with self.subTest('existing dir'):
+ os.chflags(dir1, flags)
+ old_flags = os.stat(dir1).st_flags
+ test(dir1, target_is_directory=True)
+ new_flags = os.stat(dir1).st_flags
+ self.assertEqual(new_flags, old_flags)
+
@support.cpython_only
def test_del_on_collection(self):
# A TemporaryDirectory is deleted when garbage collected
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-12-01-16-57-44.gh-issue-91133.LKMVCV.rst
@@ -0,0 +1,2 @@
+Fix a bug in :class:`tempfile.TemporaryDirectory` cleanup, which now no longer
+dereferences symlinks when working around file system permission errors.