File max-length-decompression.patch of Package python-brotlipy.42504

From 75ac96915b8fc18d90177d416a14f1ea4c224630 Mon Sep 17 00:00:00 2001
From: Illia Volochii <illia.volochii@gmail.com>
Date: Fri, 21 Nov 2025 01:54:02 +0200
Subject: [PATCH] Upgrade libbrotli to v1.2.0

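This patch adds bounded-output ("max length") decompression:
Decompressor.decompress() gains an optional output_buffer_limit
parameter, and Decompressor gains can_accept_more_data() and
is_finished() helpers. A minimal usage sketch (illustrative only;
`compressed` stands in for any Brotli-compressed bytestring, and the
64 KiB limit is arbitrary):

    import brotli

    d = brotli.Decompressor()
    # Cap each burst of output at 64 KiB.
    chunks = [d.decompress(compressed, output_buffer_limit=65536)]
    # Drain buffered output with empty input until the decoder can
    # accept fresh compressed data again (or the stream is finished).
    while not d.can_accept_more_data() and not d.is_finished():
        chunks.append(d.decompress(b'', output_buffer_limit=65536))
    plaintext = b''.join(chunks)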
---
 src/brotli/brotli.py              | 114 ++++++++++++++++++++++++++++++---
 test/test_simple_decompression.py |  51 ++++++++++++++
 2 files changed, 159 insertions(+), 6 deletions(-)


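A note on the drain-path buffer schedule in _calculate_buffer_size()
below, as a worked sketch of the arithmetic (not part of the diff):

    # Successive decompress(b'') calls without a limit allocate
    # 1 << min(chunks_num + 15, 24) bytes: 32 KiB, doubling on each
    # call, capped at 16 MiB from the tenth call onward.
    sizes = [1 << min(n + 15, 24) for n in range(11)]
    # [32768, 65536, 131072, ..., 8388608, 16777216, 16777216]
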
Index: brotlipy-0.7.0/src/brotli/brotli.py
===================================================================
--- brotlipy-0.7.0.orig/src/brotli/brotli.py
+++ brotlipy-0.7.0/src/brotli/brotli.py
@@ -123,6 +123,10 @@ def compress(data,
         based on ``quality``.
     :type lgblock: ``int``
 
+    .. versionchanged:: 1.2.0
+       Added the ``can_accept_more_data()`` method and an optional
+       ``output_buffer_limit`` parameter to ``process()``/``decompress()``.
+
     :param dictionary: A pre-set dictionary for LZ77. Please use this with
         caution: if a dictionary is used for compression, the same dictionary
         **must** be used for decompression!
@@ -346,24 +350,75 @@ class Decompressor(object):
     def __init__(self):
         dec = lib.BrotliDecoderCreateInstance(ffi.NULL, ffi.NULL, ffi.NULL)
         self._decoder = ffi.gc(dec, lib.BrotliDecoderDestroyInstance)
+        self._unconsumed_data = b''
 
-    def decompress(self, data):
+    @staticmethod
+    def _calculate_buffer_size(
+        input_data_len, output_buffer_limit, chunks_len, chunks_num
+    ):
+        if output_buffer_limit is not None:
+            return output_buffer_limit - chunks_len
+        # When `decompress(b'')` is called without `output_buffer_limit`.
+        elif input_data_len == 0:
+            # libbrotli would use 32 KB as a starting buffer size and double it
+            # each time, capped at 16 MB.
+            # https://github.com/google/brotli/blob/028fb5a23661f123017c060daa546b55cf4bde29/python/_brotli.c#L291-L292
+            return 1 << min(chunks_num + 15, 24)
+        else:
+            # Allocate a buffer that's hopefully overlarge, but if it's not we
+            # don't mind: we'll spin around again.
+            return 5 * input_data_len
+
+    def decompress(self, data, output_buffer_limit=None):
         """
         Decompress part of a complete Brotli-compressed string.
 
+        .. versionchanged:: 1.2.0
+           Added ``output_buffer_limit`` parameter.
+
         :param data: A bytestring containing Brotli-compressed data.
+        :param output_buffer_limit: Optional maximum size for the output
+            buffer. If set, the buffer stops growing once its size equals
+            or exceeds this value. When the limit is reached, further
+            calls to ``process()``/``decompress()`` (with empty input)
+            will continue to yield more data; pass only empty input until
+            ``can_accept_more_data()`` returns ``True``.
+        :type output_buffer_limit: ``int`` or ``None``
         :returns: A bytestring containing the decompressed data.
         """
+        if self._unconsumed_data and data:
+            raise error(
+                "brotli: decoder process called with data when "
+                "'can_accept_more_data()' is False"
+            )
+
+        # Return early so `self._unconsumed_data` is left untouched when
+        # a non-positive limit means no output can be produced.
+        if output_buffer_limit is not None and output_buffer_limit <= 0:
+            return b''
+
+        # Use unconsumed data if available; otherwise use the new input.
+        if self._unconsumed_data:
+            input_data = self._unconsumed_data
+            self._unconsumed_data = b''
+        else:
+            input_data = data
+
         chunks = []
+        chunks_len = 0
 
-        available_in = ffi.new("size_t *", len(data))
-        in_buffer = ffi.new("uint8_t[]", data)
+        available_in = ffi.new("size_t *", len(input_data))
+        in_buffer = ffi.new("uint8_t[]", input_data)
         next_in = ffi.new("uint8_t **", in_buffer)
 
         while True:
-            # Allocate a buffer that's hopefully overlarge, but if it's not we
-            # don't mind: we'll spin around again.
-            buffer_size = 5 * len(data)
+            buffer_size = self._calculate_buffer_size(
+                input_data_len=len(input_data),
+                output_buffer_limit=output_buffer_limit,
+                chunks_len=chunks_len,
+                chunks_num=len(chunks),
+            )
+
             available_out = ffi.new("size_t *", buffer_size)
             out_buffer = ffi.new("uint8_t[]", buffer_size)
             next_out = ffi.new("uint8_t **", out_buffer)
@@ -386,6 +441,19 @@ class Decompressor(object):
             # Next, copy the result out.
             chunk = ffi.buffer(out_buffer, buffer_size - available_out[0])[:]
             chunks.append(chunk)
+            chunks_len += len(chunk)
+
+            # Save any unconsumed input for the next call.
+            if available_in[0] > 0:
+                remaining_input = ffi.buffer(next_in[0], available_in[0])[:]
+                self._unconsumed_data = remaining_input
+
+            # Check if we've reached the output limit.
+            if (
+                output_buffer_limit is not None
+                and chunks_len >= output_buffer_limit
+            ):
+                break
 
             if rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT:
                 assert available_in[0] == 0
@@ -412,6 +480,13 @@ class Decompressor(object):
         """
         return b''
 
+    def is_finished(self):
+        """
+        Returns ``True`` if the decompression stream is complete,
+        ``False`` otherwise.
+        """
+        return lib.BrotliDecoderIsFinished(self._decoder) == lib.BROTLI_TRUE
+
     def finish(self):
         """
         Finish the decompressor. As the decompressor decompresses eagerly, this
@@ -428,3 +503,30 @@ class Decompressor(object):
             raise Error("Decompression error: incomplete compressed stream.")
 
         return b''
+
+    def can_accept_more_data(self):
+        """
+        Checks if the decompressor can accept more compressed data.
+
+        If the ``output_buffer_limit`` parameter was used with
+        ``decompress()`` or ``process()``, this method should be checked to
+        determine if the decompressor is ready to accept new input. When the
+        output buffer limit is reached, the decompressor may still have
+        unconsumed input data or internal buffered output, and calling
+        ``decompress(b'')`` repeatedly will continue producing output until
+        this method returns ``True``.
+
+        .. versionadded:: 1.2.0
+
+        :returns: ``True`` if the decompressor is ready to accept more
+            compressed data via ``decompress()`` or ``process()``, ``False``
+            if the decompressor needs to output some data via
+            ``decompress(b'')``/``process(b'')`` before being provided any
+            more compressed data.
+        :rtype: ``bool``
+        """
+        if len(self._unconsumed_data) > 0:
+            return False
+        if lib.BrotliDecoderHasMoreOutput(self._decoder) == lib.BROTLI_TRUE:
+            return False
+        return True
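
The test below exercises the guard added in decompress(): while input
from an earlier call is still unconsumed, feeding fresh data raises
brotli.error. An illustrative sketch (`compressed` is any compressed
stream whose decoded output exceeds the one-byte limit):

    d = brotli.Decompressor()
    d.decompress(compressed, output_buffer_limit=1)
    if d._unconsumed_data:          # input left over from the call above
        try:
            d.decompress(b'more')   # rejected while input is pending
        except brotli.error:
            d.decompress(b'')       # drain with empty input instead
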
Index: brotlipy-0.7.0/test/test_simple_decompression.py
===================================================================
--- brotlipy-0.7.0.orig/test/test_simple_decompression.py
+++ brotlipy-0.7.0/test/test_simple_decompression.py
@@ -38,6 +38,57 @@ def test_decompressobj(simple_compressed
     assert data == uncompressed_data
 
 
+# `more_data_limit` allows testing `decompress(b'')` with and without a limit.
+@pytest.mark.parametrize('more_data_limit', [100, None])
+def test_decompressobj_with_output_buffer_limit(
+    simple_compressed_file, more_data_limit
+):
+    """
+    Test decompression with `output_buffer_limit` set.
+    """
+    with open(simple_compressed_file[0], 'rb') as f:
+        uncompressed_data = f.read()
+
+    with open(simple_compressed_file[1], 'rb') as f:
+        compressed_data = f.read()
+
+    o = brotli.Decompressor()
+    assert o.can_accept_more_data()
+    small_limit = 100
+    result = o.decompress(compressed_data, output_buffer_limit=small_limit)
+    assert len(result) <= small_limit
+
+    # Ensure `output_buffer_limit` of zero works.
+    assert o.decompress(b'', output_buffer_limit=0) == b''
+
+    if o._unconsumed_data:
+        with pytest.raises(
+            brotli.error,
+            match=(
+                r"brotli: decoder process called with data when "
+                r"'can_accept_more_data\(\)' is False"
+            ),
+        ):
+            o.decompress(b'additional data')
+
+    if not o.is_finished():
+        assert not o.can_accept_more_data()
+
+        # Continue decompressing with empty input.
+        all_output = [result]
+        while not o.can_accept_more_data() and not o.is_finished():
+            more_output = o.decompress(
+                b'', output_buffer_limit=more_data_limit
+            )
+            if more_data_limit is not None:
+                assert len(more_output) <= more_data_limit
+            all_output.append(more_output)
+        assert o.can_accept_more_data() or o.is_finished()
+
+        final_result = b''.join(all_output)
+        assert final_result == uncompressed_data
+
+
 def test_drip_feed(simple_compressed_file):
     """
     Sending in the data one byte at a time still works.