File max-length-decompression.patch of Package python-brotlipy.42504
From 75ac96915b8fc18d90177d416a14f1ea4c224630 Mon Sep 17 00:00:00 2001
From: Illia Volochii <illia.volochii@gmail.com>
Date: Fri, 21 Nov 2025 01:54:02 +0200
Subject: [PATCH] Upgrade libbrotli to v1.2.0
---
HISTORY.rst | 10 +++
libbrotli | 2 +-
setup.py | 2 +
src/brotlicffi/__init__.py | 2 +-
src/brotlicffi/_api.py | 108 ++++++++++++++++++++++++++++--
test/test_simple_decompression.py | 51 ++++++++++++++
tox.ini | 2 +-
7 files changed, 168 insertions(+), 9 deletions(-)
Index: brotlipy-0.7.0/src/brotli/brotli.py
===================================================================
--- brotlipy-0.7.0.orig/src/brotli/brotli.py
+++ brotlipy-0.7.0/src/brotli/brotli.py
@@ -123,6 +123,10 @@ def compress(data,
based on ``quality``.
:type lgblock: ``int``
+ .. versionchanged:: 1.2.0
+ Added ``can_accept_more_data()`` method and optional
+ ``output_buffer_limit`` parameter to ``process()``/``decompress()``.
+
:param dictionary: A pre-set dictionary for LZ77. Please use this with
caution: if a dictionary is used for compression, the same dictionary
**must** be used for decompression!
@@ -346,24 +350,75 @@ class Decompressor(object):
def __init__(self):
dec = lib.BrotliDecoderCreateInstance(ffi.NULL, ffi.NULL, ffi.NULL)
self._decoder = ffi.gc(dec, lib.BrotliDecoderDestroyInstance)
+ self._unconsumed_data = b''
- def decompress(self, data):
+ @staticmethod
+ def _calculate_buffer_size(
+ input_data_len, output_buffer_limit, chunks_len, chunks_num
+ ):
+ if output_buffer_limit is not None:
+ return output_buffer_limit - chunks_len
+ # When `decompress(b'')` is called without `output_buffer_limit`.
+ elif input_data_len == 0:
+ # libbrotli would use 32 KB as a starting buffer size and double it
+ # each time, capped at 16 MB.
+ # https://github.com/google/brotli/blob/028fb5a23661f123017c060daa546b55cf4bde29/python/_brotli.c#L291-L292
+ return 1 << min(chunks_num + 15, 24)
+ else:
+ # Allocate a buffer that's hopefully overlarge, but if it's not we
+ # don't mind: we'll spin around again.
+ return 5 * input_data_len
+
+ def decompress(self, data, output_buffer_limit=None):
"""
Decompress part of a complete Brotli-compressed string.
+ .. versionchanged:: 1.2.0
+ Added ``output_buffer_limit`` parameter.
+
:param data: A bytestring containing Brotli-compressed data.
+ :param output_buffer_limit: Optional maximum size for the output
+ buffer. If set, the output buffer will not grow once its size
+ equals or exceeds this value. If the limit is reached, further
+ calls to process (potentially with empty input) will continue to
+ yield more data. Following process() calls must only be called
+ with empty input until can_accept_more_data() returns True.
+ :type output_buffer_limit: ``int`` or ``None``
:returns: A bytestring containing the decompressed data.
"""
+ if self._unconsumed_data and data:
+ raise error(
+ "brotli: decoder process called with data when "
+ "'can_accept_more_data()' is False"
+ )
+
+        # We should avoid operations on `self._unconsumed_data` if no data
+        # is to be processed.
+ if output_buffer_limit is not None and output_buffer_limit <= 0:
+ return b''
+
+ # Use unconsumed data if available, use new data otherwise.
+ if self._unconsumed_data:
+ input_data = self._unconsumed_data
+ self._unconsumed_data = b''
+ else:
+ input_data = data
+
chunks = []
+ chunks_len = 0
- available_in = ffi.new("size_t *", len(data))
- in_buffer = ffi.new("uint8_t[]", data)
+ available_in = ffi.new("size_t *", len(input_data))
+ in_buffer = ffi.new("uint8_t[]", input_data)
next_in = ffi.new("uint8_t **", in_buffer)
while True:
- # Allocate a buffer that's hopefully overlarge, but if it's not we
- # don't mind: we'll spin around again.
- buffer_size = 5 * len(data)
+ buffer_size = self._calculate_buffer_size(
+ input_data_len=len(input_data),
+ output_buffer_limit=output_buffer_limit,
+ chunks_len=chunks_len,
+ chunks_num=len(chunks),
+ )
+
available_out = ffi.new("size_t *", buffer_size)
out_buffer = ffi.new("uint8_t[]", buffer_size)
next_out = ffi.new("uint8_t **", out_buffer)
@@ -386,6 +441,19 @@ class Decompressor(object):
# Next, copy the result out.
chunk = ffi.buffer(out_buffer, buffer_size - available_out[0])[:]
chunks.append(chunk)
+ chunks_len += len(chunk)
+
+ # Save any unconsumed input for the next call.
+ if available_in[0] > 0:
+ remaining_input = ffi.buffer(next_in[0], available_in[0])[:]
+ self._unconsumed_data = remaining_input
+
+ # Check if we've reached the output limit.
+ if (
+ output_buffer_limit is not None
+ and chunks_len >= output_buffer_limit
+ ):
+ break
if rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT:
assert available_in[0] == 0
@@ -412,6 +480,13 @@ class Decompressor(object):
"""
return b''
+ def is_finished(self):
+ """
+        Returns ``True`` if the decompression stream
+        is complete, ``False`` otherwise.
+ """
+ return lib.BrotliDecoderIsFinished(self._decoder) == lib.BROTLI_TRUE
+
def finish(self):
"""
Finish the decompressor. As the decompressor decompresses eagerly, this
@@ -428,3 +503,30 @@ class Decompressor(object):
raise Error("Decompression error: incomplete compressed stream.")
return b''
+
+ def can_accept_more_data(self):
+ """
+ Checks if the decompressor can accept more compressed data.
+
+ If the ``output_buffer_limit`` parameter was used with
+ ``decompress()`` or ``process()``, this method should be checked to
+ determine if the decompressor is ready to accept new input. When the
+ output buffer limit is reached, the decompressor may still have
+ unconsumed input data or internal buffered output, and calling
+ ``decompress(b'')`` repeatedly will continue producing output until
+ this method returns ``True``.
+
+ .. versionadded:: 1.2.0
+
+ :returns: ``True`` if the decompressor is ready to accept more
+ compressed data via ``decompress()`` or ``process()``, ``False``
+ if the decompressor needs to output some data via
+ ``decompress(b'')``/``process(b'')`` before being provided any
+ more compressed data.
+ :rtype: ``bool``
+ """
+ if len(self._unconsumed_data) > 0:
+ return False
+ if lib.BrotliDecoderHasMoreOutput(self._decoder) == lib.BROTLI_TRUE:
+ return False
+ return True
Index: brotlipy-0.7.0/test/test_simple_decompression.py
===================================================================
--- brotlipy-0.7.0.orig/test/test_simple_decompression.py
+++ brotlipy-0.7.0/test/test_simple_decompression.py
@@ -38,6 +38,57 @@ def test_decompressobj(simple_compressed
assert data == uncompressed_data
+# `more_data_limit` allows testing `decompress(b'')` with and without a limit.
+@pytest.mark.parametrize('more_data_limit', [100, None])
+def test_decompressobj_with_output_buffer_limit(
+ simple_compressed_file, more_data_limit
+):
+ """
+ Test decompression with `output_buffer_limit` set.
+ """
+ with open(simple_compressed_file[0], 'rb') as f:
+ uncompressed_data = f.read()
+
+ with open(simple_compressed_file[1], 'rb') as f:
+ compressed_data = f.read()
+
+ o = brotli.Decompressor()
+ assert o.can_accept_more_data()
+ small_limit = 100
+ result = o.decompress(compressed_data, output_buffer_limit=small_limit)
+ assert len(result) <= small_limit
+
+ # Ensure `output_buffer_limit` of zero works.
+ assert o.decompress(b'', output_buffer_limit=0) == b''
+
+ if o._unconsumed_data:
+ with pytest.raises(
+ brotli.error,
+ match=(
+ r"brotli: decoder process called with data when "
+ r"'can_accept_more_data\(\)' is False"
+ ),
+ ):
+ o.decompress(b'additional data')
+
+ if not o.is_finished():
+ assert not o.can_accept_more_data()
+
+ # Continue decompressing with empty input.
+ all_output = [result]
+ while not o.can_accept_more_data() and not o.is_finished():
+ more_output = o.decompress(
+ b'', output_buffer_limit=more_data_limit
+ )
+ if more_data_limit is not None:
+ assert len(more_output) <= more_data_limit
+ all_output.append(more_output)
+ assert o.can_accept_more_data() or o.is_finished()
+
+ final_result = b''.join(all_output)
+ assert final_result == uncompressed_data
+
+
def test_drip_feed(simple_compressed_file):
"""
Sending in the data one byte at a time still works.