File CVE-2025-69229-small-chunk-exhaustion.patch of Package python-aiohttp.42478

From dc3170b56904bdf814228fae70a5501a42a6c712 Mon Sep 17 00:00:00 2001
From: Sam Bull <git@sambull.org>
Date: Sat, 3 Jan 2026 03:57:17 +0000
Subject: [PATCH] Use collections.deque for chunk splits (#11892) (#11912)

(cherry picked from commit 271532ea355c65480c8ecc14137dfbb72aec8f6f)

---------

Co-authored-by: Finder <nakamurajames123@gmail.com>
---
 aiohttp/streams.py        |  8 ++++----
 tests/test_http_parser.py | 14 +++++++++-----
 2 files changed, 13 insertions(+), 9 deletions(-)

Index: aiohttp-3.6.0/aiohttp/streams.py
===================================================================
--- aiohttp-3.6.0.orig/aiohttp/streams.py
+++ aiohttp-3.6.0/aiohttp/streams.py
@@ -115,10 +115,15 @@ class StreamReader(AsyncStreamReaderMixi
         self._high_water = limit * 2
         if loop is None:
             loop = asyncio.get_event_loop()
+        # Ensure high_water_chunks >= 3 so it's always > low_water_chunks.
+        self._high_water_chunks = max(3, limit // 4)
+        # Use max(2, ...) because there's always at least 1 chunk split remaining
+        # (the current position), so we need low_water >= 2 to allow resume.
+        self._low_water_chunks = max(2, self._high_water_chunks // 2)
         self._loop = loop
         self._size = 0
         self._cursor = 0
-        self._http_chunk_splits = None  # type: Optional[List[int]]
+        self._http_chunk_splits = None  # type: Optional[Deque[int]]
         self._buffer = collections.deque()  # type: Deque[bytes]
         self._buffer_offset = 0
         self._eof = False
@@ -251,7 +256,7 @@ class StreamReader(AsyncStreamReaderMixi
             if self.total_bytes:
                 raise RuntimeError("Called begin_http_chunk_receiving when"
                                    "some data was already fed")
-            self._http_chunk_splits = []
+            self._http_chunk_splits = collections.deque()
 
     def end_http_chunk_receiving(self) -> None:
         if self._http_chunk_splits is None:
@@ -275,6 +280,15 @@ class StreamReader(AsyncStreamReaderMixi
 
         self._http_chunk_splits.append(self.total_bytes)
 
+        # If we get too many small chunks before self._high_water is reached, then any
+        # .read() call becomes computationally expensive, and could block the event loop
+        # for too long, hence an additional self._high_water_chunks here.
+        if (
+            len(self._http_chunk_splits) > self._high_water_chunks
+            and not self._protocol._reading_paused
+        ):
+            self._protocol.pause_reading()
+
         # wake up readchunk when end of http chunk received
         waiter = self._waiter
         if waiter is not None:
@@ -393,7 +407,7 @@ class StreamReader(AsyncStreamReaderMixi
                 raise self._exception
 
             while self._http_chunk_splits:
-                pos = self._http_chunk_splits.pop(0)
+                pos = self._http_chunk_splits.popleft()
                 if pos == self._cursor:
                     return (b"", True)
                 if pos > self._cursor:
@@ -463,9 +477,16 @@ class StreamReader(AsyncStreamReaderMixi
         chunk_splits = self._http_chunk_splits
         # Prevent memory leak: drop useless chunk splits
         while chunk_splits and chunk_splits[0] < self._cursor:
-            chunk_splits.pop(0)
+            chunk_splits.popleft()
 
-        if self._size < self._low_water and self._protocol._reading_paused:
+        if (
+            self._protocol._reading_paused
+            and self._size < self._low_water
+            and (
+                self._http_chunk_splits is None
+                or len(self._http_chunk_splits) < self._low_water_chunks
+            )
+        ):
             self._protocol.resume_reading()
         return data
 
Index: aiohttp-3.6.0/tests/test_http_parser.py
===================================================================
--- aiohttp-3.6.0.orig/tests/test_http_parser.py
+++ aiohttp-3.6.0/tests/test_http_parser.py
@@ -691,7 +691,8 @@ def test_http_request_chunked_payload(pa
     parser.feed_data(b'4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n')
 
     assert b'dataline' == b''.join(d for d in payload._buffer)
-    assert [4, 8] == payload._http_chunk_splits
+    assert payload._http_chunk_splits is not None
+    assert [4, 8] == list(payload._http_chunk_splits)
     assert payload.is_eof()
 
 
@@ -706,7 +707,8 @@ def test_http_request_chunked_payload_an
         b'transfer-encoding: chunked\r\n\r\n')
 
     assert b'dataline' == b''.join(d for d in payload._buffer)
-    assert [4, 8] == payload._http_chunk_splits
+    assert payload._http_chunk_splits is not None
+    assert [4, 8] == list(payload._http_chunk_splits)
     assert payload.is_eof()
 
     assert len(messages) == 1
@@ -731,12 +733,14 @@ def test_http_request_chunked_payload_ch
     parser.feed_data(b'test: test\r\n')
 
     assert b'dataline' == b''.join(d for d in payload._buffer)
-    assert [4, 8] == payload._http_chunk_splits
+    assert payload._http_chunk_splits is not None
+    assert [4, 8] == list(payload._http_chunk_splits)
     assert not payload.is_eof()
 
     parser.feed_data(b'\r\n')
     assert b'dataline' == b''.join(d for d in payload._buffer)
-    assert [4, 8] == payload._http_chunk_splits
+    assert payload._http_chunk_splits is not None
+    assert [4, 8] == list(payload._http_chunk_splits)
     assert payload.is_eof()
 
 
@@ -749,7 +753,8 @@ def test_parse_chunked_payload_chunk_ext
         b'4;test\r\ndata\r\n4\r\nline\r\n0\r\ntest: test\r\n\r\n')
 
     assert b'dataline' == b''.join(d for d in payload._buffer)
-    assert [4, 8] == payload._http_chunk_splits
+    assert payload._http_chunk_splits is not None
+    assert [4, 8] == list(payload._http_chunk_splits)
     assert payload.is_eof()
 
 async def test_request_chunked_with_trailer(parser: HttpRequestParser) -> None:
Index: aiohttp-3.6.0/tests/test_streams.py
===================================================================
--- aiohttp-3.6.0.orig/tests/test_streams.py
+++ aiohttp-3.6.0/tests/test_streams.py
@@ -1378,3 +1378,160 @@ async def test_stream_reader_iter_chunks
     async for data, end_of_chunk in stream.iter_chunks():
         assert (data, end_of_chunk) == (next(it), True)
     pytest.raises(StopIteration, next, it)
+
+
+async def test_stream_reader_pause_on_high_water_chunks(protocol):
+    """Test that reading is paused when chunk count exceeds high water mark."""
+    loop = asyncio.get_event_loop()
+    # Use small limit so high_water_chunks is small: limit // 4 = 10
+    stream = streams.StreamReader(protocol, limit=40, loop=loop)
+
+    assert stream._high_water_chunks == 10
+    assert stream._low_water_chunks == 5
+
+    # Feed chunks until we exceed high_water_chunks
+    for i in range(12):
+        stream.begin_http_chunk_receiving()
+        stream.feed_data(b"x")  # 1 byte per chunk
+        stream.end_http_chunk_receiving()
+
+    # pause_reading should have been called when chunk count exceeded 10
+    protocol.pause_reading.assert_called()
+
+
+async def test_stream_reader_resume_on_low_water_chunks(protocol):
+    """Test that reading resumes when chunk count drops below low water mark."""
+    loop = asyncio.get_event_loop()
+    # Use small limit so high_water_chunks is small: limit // 4 = 10
+    stream = streams.StreamReader(protocol, limit=40, loop=loop)
+
+    assert stream._high_water_chunks == 10
+    assert stream._low_water_chunks == 5
+
+    # Feed chunks until we exceed high_water_chunks
+    for i in range(12):
+        stream.begin_http_chunk_receiving()
+        stream.feed_data(b"x")  # 1 byte per chunk
+        stream.end_http_chunk_receiving()
+
+    # Simulate that reading was paused
+    protocol._reading_paused = True
+    protocol.pause_reading.reset_mock()
+
+    # Read data to reduce both size and chunk count
+    # Reading will consume chunks and reduce _http_chunk_splits
+    data = await stream.read(10)
+    assert data == b"xxxxxxxxxx"
+
+    # resume_reading should have been called when both size and chunk count
+    # dropped below their respective low water marks
+    protocol.resume_reading.assert_called()
+
+
+async def test_stream_reader_no_resume_when_chunks_still_high(protocol):
+    """Test that reading doesn't resume if chunk count is still above low water."""
+    loop = asyncio.get_event_loop()
+    # Use small limit so high_water_chunks is small: limit // 4 = 10
+    stream = streams.StreamReader(protocol, limit=40, loop=loop)
+
+    # Feed many chunks
+    for i in range(12):
+        stream.begin_http_chunk_receiving()
+        stream.feed_data(b"x")
+        stream.end_http_chunk_receiving()
+
+    # Simulate that reading was paused
+    protocol._reading_paused = True
+
+    # Read only a few bytes - chunk count will still be high
+    data = await stream.read(2)
+    assert data == b"xx"
+
+    # resume_reading should NOT be called because chunk count is still >= low_water_chunks
+    protocol.resume_reading.assert_not_called()
+
+
+async def test_stream_reader_read_non_chunked_response(protocol):
+    """Test that non-chunked responses work correctly (no chunk tracking)."""
+    loop = asyncio.get_event_loop()
+    stream = streams.StreamReader(protocol, limit=40, loop=loop)
+
+    # Non-chunked: just feed data without begin/end_http_chunk_receiving
+    stream.feed_data(b"Hello World")
+
+    # _http_chunk_splits should be None for non-chunked responses
+    assert stream._http_chunk_splits is None
+
+    # Reading should work without issues
+    data = await stream.read(5)
+    assert data == b"Hello"
+
+    data = await stream.read(6)
+    assert data == b" World"
+
+
+async def test_stream_reader_resume_non_chunked_when_paused(protocol):
+    """Test that resume works for non-chunked responses when paused due to size."""
+    loop = asyncio.get_event_loop()
+    # Small limit so we can trigger pause via size
+    stream = streams.StreamReader(protocol, limit=10, loop=loop)
+
+    # Feed data that exceeds high_water (limit * 2 = 20)
+    stream.feed_data(b"x" * 25)
+
+    # Simulate that reading was paused due to size
+    protocol._reading_paused = True
+    protocol.pause_reading.assert_called()
+
+    # Read enough to drop below low_water (limit = 10)
+    data = await stream.read(20)
+    assert data == b"x" * 20
+
+    # resume_reading should be called (size is now 5 < low_water 10)
+    protocol.resume_reading.assert_called()
+
+
+@pytest.mark.parametrize("limit", [1, 2, 4])
+async def test_stream_reader_small_limit_resumes_reading(protocol, limit):
+    """Test that small limits still allow resume_reading to be called.
+
+    Even with very small limits, high_water_chunks should be at least 3
+    and low_water_chunks should be at least 2, with high > low to ensure
+    proper flow control.
+    """
+    loop = asyncio.get_event_loop()
+    stream = streams.StreamReader(protocol, limit=limit, loop=loop)
+
+    # Verify minimum thresholds are enforced and high > low
+    assert stream._high_water_chunks >= 3
+    assert stream._low_water_chunks >= 2
+    assert stream._high_water_chunks > stream._low_water_chunks
+
+    # Set up pause/resume side effects
+    def pause_reading() -> None:
+        protocol._reading_paused = True
+
+    protocol.pause_reading.side_effect = pause_reading
+
+    def resume_reading() -> None:
+        protocol._reading_paused = False
+
+    protocol.resume_reading.side_effect = resume_reading
+
+    # Feed 4 chunks (triggers pause at > high_water_chunks which is >= 3)
+    for char in b"abcd":
+        stream.begin_http_chunk_receiving()
+        stream.feed_data(bytes([char]))
+        stream.end_http_chunk_receiving()
+
+    # Reading should now be paused
+    assert protocol._reading_paused is True
+    assert protocol.pause_reading.called
+
+    # Read all data - should resume (chunk count drops below low_water_chunks)
+    data = stream.read_nowait()
+    assert data == b"abcd"
+    assert stream._size == 0
+
+    protocol.resume_reading.assert_called()
+    assert protocol._reading_paused is False
openSUSE Build Service is sponsored by