File CVE-2025-58367.patch of Package python-deepdiff.40583

From c69c06c13f75e849c770ade3f556cd16209fd183 Mon Sep 17 00:00:00 2001
From: Sep Dehpour <sep@zepworks.com>
Date: Wed, 3 Sep 2025 09:42:30 -0700
Subject: [PATCH] Security fix: Prevent class pollution and remote code
 execution in Delta

- Add validation to prevent traversing dunder attributes via check_elem()
- Harden Delta class against malicious pickle payloads
- Make SAFE_TO_IMPORT a frozenset for immutability
- Add comprehensive security tests in test_security.py
- Prevent access to __globals__ and other dangerous attributes
---
 deepdiff/delta.py         |   8 ++-
 deepdiff/path.py          |   7 ++
 deepdiff/serialization.py |   4 +-
 tests/test_security.py    | 133 ++++++++++++++++++++++++++++++++++++++
 4 files changed, 149 insertions(+), 3 deletions(-)
 create mode 100644 tests/test_security.py

Index: deepdiff-6.3.0/deepdiff/delta.py
===================================================================
--- deepdiff-6.3.0.orig/deepdiff/delta.py
+++ deepdiff-6.3.0/deepdiff/delta.py
@@ -9,7 +9,7 @@ from deepdiff.helper import (
     np_ndarray, np_array_factory, numpy_dtypes, get_doc,
     not_found, numpy_dtype_string_to_type, dict_,
 )
-from deepdiff.path import _path_to_elements, _get_nested_obj, GET, GETATTR
+from deepdiff.path import _path_to_elements, _get_nested_obj, GET, GETATTR, check_elem
 from deepdiff.anyset import AnySet
 
 
@@ -156,6 +156,11 @@ class Delta:
 
     def _get_elem_and_compare_to_old_value(self, obj, path_for_err_reporting, expected_old_value, elem=None, action=None):
         try:
+            check_elem(elem)
+        except ValueError as error:
+            self._raise_or_log(UNABLE_TO_GET_ITEM_MSG.format(path_for_err_reporting, error))
+            return not_found
+        try:
             if action == GET:
                 current_old_value = obj[elem]
             elif action == GETATTR:
@@ -360,6 +365,7 @@ class Delta:
                 parent = parent_to_obj_elem = parent_to_obj_action = None
                 obj = _get_nested_obj(obj=self, elements=elements[:-1])
             elem, action = elements[-1]
+            check_elem(elem)
         except Exception as e:
             self._raise_or_log(UNABLE_TO_GET_ITEM_MSG.format(path, e))
             return None
Index: deepdiff-6.3.0/deepdiff/path.py
===================================================================
--- deepdiff-6.3.0.orig/deepdiff/path.py
+++ deepdiff-6.3.0/deepdiff/path.py
@@ -102,8 +102,14 @@ def _path_to_elements(path, root_element
     return tuple(elements)
 
 
+def check_elem(elem):
+    if isinstance(elem, str) and elem.startswith("__") and elem.endswith("__"):
+        raise ValueError("traversing dunder attributes is not allowed")
+
+
 def _get_nested_obj(obj, elements):
     for (elem, action) in elements:
+        check_elem(elem)
         if action == GET:
             obj = obj[elem]
         elif action == GETATTR:
Index: deepdiff-6.3.0/deepdiff/serialization.py
===================================================================
--- deepdiff-6.3.0.orig/deepdiff/serialization.py
+++ deepdiff-6.3.0/deepdiff/serialization.py
@@ -62,7 +62,7 @@ FORBIDDEN_MODULE_MSG = "Module '{}' is f
 DELTA_IGNORE_ORDER_NEEDS_REPETITION_REPORT = 'report_repetition must be set to True when ignore_order is True to create the delta object.'
 DELTA_ERROR_WHEN_GROUP_BY = 'Delta can not be made when group_by is used since the structure of data is modified from the original form.'
 
-SAFE_TO_IMPORT = {
+SAFE_TO_IMPORT = frozenset({
     'builtins.range',
     'builtins.complex',
     'builtins.set',
@@ -87,7 +87,8 @@ SAFE_TO_IMPORT = {
     'collections.namedtuple',
     'collections.OrderedDict',
     're.Pattern',
-}
+    'deepdiff.helper.Opcode',
+})
 
 
 TYPE_STR_TO_TYPE = {
Index: deepdiff-6.3.0/tests/test_security.py
===================================================================
--- /dev/null
+++ deepdiff-6.3.0/tests/test_security.py
@@ -0,0 +1,133 @@
+import os
+import pickle
+import pytest
+from deepdiff import Delta
+from deepdiff.helper import Opcode
+from deepdiff.serialization import ForbiddenModule
+
+
+class TestDeltaClassPollution:
+
+    def test_builtins_int(self):
+
+        pollute_int = pickle.dumps(
+            {
+                "values_changed": {"root['tmp']": {"new_value": Opcode("", 0, 0, 0, 0)}},
+                "dictionary_item_added": {
+                    (
+                        ("root", "GETATTR"),
+                        ("tmp", "GET"),
+                        ("__repr__", "GETATTR"),
+                        ("__globals__", "GETATTR"),
+                        ("__builtins__", "GET"),
+                        ("int", "GET"),
+                    ): "no longer a class"
+                },
+            }
+        )
+
+        assert isinstance(pollute_int, bytes)
+
+        # ------------[ Exploit ]------------
+        # This could be some example, vulnerable, application.
+        # The inputs above could be sent via HTTP, for example.
+
+
+        # Existing dictionary; it is assumed that it contains
+        # at least one entry, otherwise a different Delta needs to be
+        # applied first, adding an entry to the dictionary.
+        mydict = {"tmp": "foobar"}
+
+        # Before pollution
+        assert 42 == int("41") + 1
+
+        # Apply Delta to mydict
+        result = mydict + Delta(pollute_int)
+
+        assert 1337 == int("1337")
+
+    def test_remote_code_execution(self):
+        if os.path.exists('/tmp/pwned'):
+            os.remove('/tmp/pwned')
+
+        pollute_safe_to_import = pickle.dumps(
+            {
+                "values_changed": {"root['tmp']": {"new_value": Opcode("", 0, 0, 0, 0)}},
+                "set_item_added": {
+                    (
+                        ("root", "GETATTR"),
+                        ("tmp", "GET"),
+                        ("__repr__", "GETATTR"),
+                        ("__globals__", "GETATTR"),
+                        ("sys", "GET"),
+                        ("modules", "GETATTR"),
+                        ("deepdiff.serialization", "GET"),
+                        ("SAFE_TO_IMPORT", "GETATTR"),
+                    ): set(["posix.system"])
+                },
+            }
+        )
+
+        # From https://davidhamann.de/2020/04/05/exploiting-python-pickle/
+        class RCE:
+            def __reduce__(self):
+                cmd = "id > /tmp/pwned"
+                return os.system, (cmd,)
+
+        # Wrap object with dictionary so that Delta does not crash
+        rce_pickle = pickle.dumps({"_": RCE()})
+
+        assert isinstance(pollute_safe_to_import, bytes)
+        assert isinstance(rce_pickle, bytes)
+
+        # ------------[ Exploit ]------------
+        # This could be some example, vulnerable, application.
+        # The inputs above could be sent via HTTP, for example.
+
+        # Existing dictionary; it is assumed that it contains
+        # at least one entry, otherwise a different Delta needs to be
+        # applied first, adding an entry to the dictionary.
+        mydict = {"tmp": "foobar"}
+
+        # Apply Delta to mydict
+        with pytest.raises(ValueError) as exc_info:
+            mydict + Delta(pollute_safe_to_import)
+        assert "traversing dunder attributes is not allowed" == str(exc_info.value)
+
+        with pytest.raises(ForbiddenModule) as exc_info:
+            Delta(rce_pickle)  # no need to apply this Delta
+        assert "Module 'posix.system' is forbidden. You need to explicitly pass it by passing a safe_to_import parameter" == str(exc_info.value)
+
+        assert not os.path.exists('/tmp/pwned'), "We should not have created this file"
+
+    def test_delta_should_not_access_globals(self):
+
+        pollute_global = pickle.dumps(
+            {
+                "dictionary_item_added": {
+                    (
+                        ("root", "GETATTR"),
+                        ("myfunc", "GETATTR"),
+                        ("__globals__", "GETATTR"),
+                        ("PWNED", "GET"),
+                    ): 1337
+                }
+            }
+        )
+
+
+        # demo application
+        class Foo:
+            def __init__(self):
+                pass
+
+            def myfunc(self):
+                pass
+
+
+        PWNED = False
+        delta = Delta(pollute_global)
+        assert PWNED is False
+        b = Foo() + delta
+
+        assert PWNED is False
Index: deepdiff-6.3.0/deepdiff/helper.py
===================================================================
--- deepdiff-6.3.0.orig/deepdiff/helper.py
+++ deepdiff-6.3.0/deepdiff/helper.py
@@ -9,6 +9,7 @@ import string
 import time
 from ast import literal_eval
 from decimal import Decimal, localcontext
+from typing import NamedTuple, Optional, List, Any
 from collections import namedtuple
 from itertools import repeat
 from ordered_set import OrderedSet
@@ -232,6 +233,31 @@ class indexed_set(set):
     """
 
 
+def named_tuple_repr(self: NamedTuple) -> str:
+    fields = []
+    for field, value in self._asdict().items():
+        # Only include fields that do not have their default value
+        if field in self._field_defaults:
+            if value != self._field_defaults[field]:
+                fields.append(f"{field}={value!r}")
+        else:
+            fields.append(f"{field}={value!r}")
+
+    return f"{self.__class__.__name__}({', '.join(fields)})"
+
+
+class Opcode(NamedTuple):
+    tag: str
+    t1_from_index: int
+    t1_to_index: int
+    t2_from_index: int
+    t2_to_index: int
+    old_values: Optional[List[Any]] = None
+    new_values: Optional[List[Any]] = None
+
+    __repr__ = __str__ = named_tuple_repr
+
+
 def add_to_frozen_set(parents_ids, item_id):
     return parents_ids | {item_id}
 
openSUSE Build Service is sponsored by