File CVE-2022-21712-sec-expo-CO-redirect.patch of Package python-Twisted.26707

From eda4f1e2ec9988a142de244f1a2b285939718c03 Mon Sep 17 00:00:00 2001
From: Glyph <glyph@twistedmatrix.com>
Date: Sun, 23 Jan 2022 12:57:49 -0800
Subject: [PATCH 01/10] failing test for header data leak

---
 twisted/newsfragments/10294.bugfix |    1 
 twisted/web/client.py              |   66 ++++++++++-
 twisted/web/iweb.py                |   10 -
 twisted/web/test/test_agent.py     |  208 +++++++++++++++++++++++++++----------
 4 files changed, 219 insertions(+), 66 deletions(-)

Index: Twisted-15.2.1/twisted/newsfragments/10294.bugfix
===================================================================
--- /dev/null
+++ Twisted-15.2.1/twisted/newsfragments/10294.bugfix
@@ -0,0 +1 @@
+twisted.web.client.RedirectAgent and twisted.web.client.BrowserLikeRedirectAgent now properly remove sensitive headers when redirecting to a different origin.
\ No newline at end of file
Index: Twisted-15.2.1/twisted/web/client.py
===================================================================
--- Twisted-15.2.1.orig/twisted/web/client.py
+++ Twisted-15.2.1/twisted/web/client.py
@@ -11,9 +11,10 @@ from __future__ import division, absolut
 import os
 import types
 import warnings
+import zlib
 
 try:
-    from urlparse import urlunparse, urljoin, urldefrag
+    from urlparse import urldefrag, urljoin, urlunparse
     from urllib import splithost, splittype
 except ImportError:
     from urllib.parse import splithost, splittype, urljoin, urldefrag
@@ -23,19 +24,20 @@ except ImportError:
         result = _urlunparse(tuple([p.decode("charmap") for p in parts]))
         return result.encode("charmap")
 
-import zlib
 from functools import wraps
-
 from zope.interface import implementer
 
 from twisted.python.compat import _PY3, nativeString, intToBytes
 from twisted.python import log
 from twisted.python.failure import Failure
-from twisted.python.deprecate import deprecatedModuleAttribute
 from twisted.python.versions import Version
 
 from twisted.web.iweb import IPolicyForHTTPS, IAgentEndpointFactory
-from twisted.python.deprecate import getDeprecationWarningString
+from twisted.python.deprecate import (
+    deprecated,
+    deprecatedModuleAttribute,
+    getDeprecationWarningString
+)
 from twisted.web import http
 from twisted.internet import defer, protocol, task, reactor
 from twisted.internet.interfaces import IProtocol
@@ -43,8 +45,15 @@ from twisted.internet.endpoints import T
 from twisted.python.util import InsensitiveDict
 from twisted.python.components import proxyForInterface
 from twisted.web import error
-from twisted.web.iweb import UNKNOWN_LENGTH, IAgent, IBodyProducer, IResponse
 from twisted.web.http_headers import Headers
+from twisted.web.iweb import (
+    UNKNOWN_LENGTH,
+    IAgent,
+    IAgentEndpointFactory,
+    IBodyProducer,
+    IPolicyForHTTPS,
+    IResponse,
+)
 
 from twisted.web._newclient import _ensureValidURI, _ensureValidMethod
 
@@ -1899,6 +1908,18 @@ class ContentDecoderAgent(object):
 
 
 
+_canonicalHeaderName = Headers()._canonicalNameCaps
+_defaultSensitiveHeaders = frozenset(
+    [
+        b"Authorization",
+        b"Cookie",
+        b"Cookie2",
+        b"Proxy-Authorization",
+        b"WWW-Authenticate",
+    ]
+)
+
+
 @implementer(IAgent)
 class RedirectAgent(object):
     """
@@ -1913,6 +1934,11 @@ class RedirectAgent(object):
     @param redirectLimit: The maximum number of times the agent is allowed to
         follow redirects before failing with a L{error.InfiniteRedirection}.
 
+    @param sensitiveHeaderNames: An iterable of C{bytes} enumerating the names
+        of headers that must not be transmitted when redirecting to a different
+        origins.  These will be consulted in addition to the protocol-specified
+        set of headers that contain sensitive information.
+
     @cvar _redirectResponses: A L{list} of HTTP status codes to be redirected
         for I{GET} and I{HEAD} methods.
 
@@ -1927,9 +1953,17 @@ class RedirectAgent(object):
     _seeOtherResponses = [http.SEE_OTHER]
 
 
-    def __init__(self, agent, redirectLimit=20):
+    def __init__(
+        self,
+        agent,
+        redirectLimit = 20,
+        sensitiveHeaderNames = (),
+    ):
         self._agent = agent
         self._redirectLimit = redirectLimit
+        sensitive = {_canonicalHeaderName(each) for each in sensitiveHeaderNames}
+        sensitive.update(_defaultSensitiveHeaders)
+        self._sensitiveHeaderNames = sensitive
 
 
     def request(self, method, uri, headers=None, bodyProducer=None):
@@ -1976,6 +2010,22 @@ class RedirectAgent(object):
                 response.code, 'No location header field', uri)
             raise ResponseFailed([Failure(err)], response)
         location = self._resolveLocation(uri, locationHeaders[0])
+        if headers:
+            parsedURI = URI.fromBytes(uri)
+            parsedLocation = URI.fromBytes(location)
+            sameOrigin = (
+                (parsedURI.scheme == parsedLocation.scheme)
+                and (parsedURI.host == parsedLocation.host)
+                and (parsedURI.port == parsedLocation.port)
+            )
+            if not sameOrigin:
+                headers = Headers(
+                    {
+                        rawName: rawValue
+                        for rawName, rawValue in headers.getAllRawHeaders()
+                        if rawName not in self._sensitiveHeaderNames
+                    }
+                )
         deferred = self._agent.request(method, location, headers)
         def _chainResponse(newResponse):
             newResponse.setPreviousResponse(response)
Index: Twisted-15.2.1/twisted/web/iweb.py
===================================================================
--- Twisted-15.2.1.orig/twisted/web/iweb.py
+++ Twisted-15.2.1/twisted/web/iweb.py
@@ -675,12 +675,12 @@ class IAgent(Interface):
     obtained by combining a number of (hypothetical) implementations::
 
         baseAgent = Agent(reactor)
-        redirect = BrowserLikeRedirectAgent(baseAgent, limit=10)
+        decode = ContentDecoderAgent(baseAgent, [(b"gzip", GzipDecoder())])
+        cookie = CookieAgent(decode, diskStore.cookie)
         authenticate = AuthenticateAgent(
-            redirect, [diskStore.credentials, GtkAuthInterface()])
-        cookie = CookieAgent(authenticate, diskStore.cookie)
-        decode = ContentDecoderAgent(cookie, [(b"gzip", GzipDecoder())])
-        cache = CacheAgent(decode, diskStore.cache)
+            cookie, [diskStore.credentials, GtkAuthInterface()])
+        cache = CacheAgent(authenticate, diskStore.cache)
+        redirect = BrowserLikeRedirectAgent(cache, limit=10)
 
         doSomeRequests(cache)
     """
Index: Twisted-15.2.1/twisted/web/test/test_agent.py
===================================================================
--- Twisted-15.2.1.orig/twisted/web/test/test_agent.py
+++ Twisted-15.2.1/twisted/web/test/test_agent.py
@@ -5,45 +5,79 @@
 Tests for L{twisted.web.client.Agent} and related new client APIs.
 """
 
-import cookielib
 import zlib
-from StringIO import StringIO
+try:
+    from cookielib import CookieJar
+except ImportError:
+    from http.cookiejar import CookieJar
+from io import StringIO
+from unittest import SkipTest
 
+from zope.interface.declarations import implementer
 from zope.interface.verify import verifyObject
 
+from incremental import Version
+
 from twisted.trial.unittest import TestCase, SynchronousTestCase
 from twisted.web import client, error, http_headers
-from twisted.web._newclient import RequestNotSent, RequestTransmissionFailed
-from twisted.web._newclient import ResponseNeverReceived, ResponseFailed
-from twisted.web._newclient import PotentialDataLoss
+from twisted.web._newclient import (
+    HTTP11ClientProtocol,
+    PotentialDataLoss,
+    RequestNotSent,
+    RequestTransmissionFailed,
+    Response,
+    ResponseFailed,
+    ResponseNeverReceived,
+)
+from twisted.web.client import (
+    URI,
+    BrowserLikePolicyForHTTPS,
+    FileBodyProducer,
+    HostnameCachingHTTPSPolicy,
+    HTTPConnectionPool,
+    Request,
+    ResponseDone,
+    _HTTP11ClientFactory,
+)
 from twisted.internet import defer, task
 from twisted.python.failure import Failure
+from twisted.test.proto_helpers import (
+    MemoryReactorClock,
+    StringTransport,
+)
 from twisted.python.components import proxyForInterface
-from twisted.test.proto_helpers import StringTransport, MemoryReactorClock
 from twisted.internet.task import Clock
-from twisted.internet.error import ConnectionRefusedError, ConnectionDone
-from twisted.internet.error import ConnectionLost
+from twisted.internet.error import (
+    ConnectionDone,
+    ConnectionLost,
+    ConnectionRefusedError,
+)
 from twisted.internet.protocol import Protocol, Factory
 from twisted.internet.defer import Deferred, succeed, CancelledError
 from twisted.internet.endpoints import TCP4ClientEndpoint, SSL4ClientEndpoint
 
-from twisted.web.client import (FileBodyProducer, Request, HTTPConnectionPool,
-                                ResponseDone, _HTTP11ClientFactory, URI)
-
 from twisted.web.iweb import (
-    UNKNOWN_LENGTH, IAgent, IBodyProducer, IResponse, IAgentEndpointFactory,
-    )
+    UNKNOWN_LENGTH,
+    IAgent,
+    IAgentEndpointFactory,
+    IBodyProducer,
+    IPolicyForHTTPS,
+    IResponse,
+)
 from twisted.web.http_headers import Headers
-from twisted.web._newclient import HTTP11ClientProtocol, Response
 
 from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
-from zope.interface.declarations import implementer
-from twisted.web.iweb import IPolicyForHTTPS
+from twisted.web.test.injectionhelpers import (
+    MethodInjectionTestsMixin,
+    URIInjectionTestsMixin,
+)
+
 from twisted.python.deprecate import getDeprecationWarningString
-from incremental import Version
-from twisted.web.client import (BrowserLikePolicyForHTTPS)
 from twisted.web.error import SchemeNotSupported
 
+testMixinClass = object
+runtimeTestCase = TestCase
+
 try:
     from twisted.internet import ssl
     from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol
@@ -63,7 +97,7 @@ class StubHTTPProtocol(Protocol):
         tuple consisting of the request and the L{Deferred} returned from the
         request method is appended to this list.
     """
-    def __init__(self):
+    def __init__(self) -> None:
         self.requests = []
         self.state = 'QUIESCENT'
 
@@ -1743,13 +1777,13 @@ class CookieJarTests(TestCase, CookieTes
     """
     Tests for L{twisted.web.client._FakeUrllib2Response} and
     L{twisted.web.client._FakeUrllib2Request}'s interactions with
-    C{cookielib.CookieJar} instances.
+    C{CookieJar} instances.
     """
     def makeCookieJar(self):
         """
-        @return: a C{cookielib.CookieJar} with some sample cookies
+        @return: a C{CookieJar} with some sample cookies
         """
-        cookieJar = cookielib.CookieJar()
+        cookieJar = CookieJar()
         reqres = self.addCookies(
             cookieJar,
             'http://example.com:1234/foo?bar',
@@ -1760,7 +1794,7 @@ class CookieJarTests(TestCase, CookieTes
 
     def test_extractCookies(self):
         """
-        L{cookielib.CookieJar.extract_cookies} extracts cookie information from
+        L{CookieJar.extract_cookies} extracts cookie information from
         fake urllib2 response instances.
         """
         jar = self.makeCookieJar()[0]
@@ -1785,7 +1819,7 @@ class CookieJarTests(TestCase, CookieTes
 
     def test_sendCookie(self):
         """
-        L{cookielib.CookieJar.add_cookie_header} adds a cookie header to a fake
+        L{CookieJar.add_cookie_header} adds a cookie header to a fake
         urllib2 request instance.
         """
         jar, (request, response) = self.makeCookieJar()
@@ -1812,7 +1846,7 @@ class CookieAgentTests(TestCase, CookieT
         """
         return client.CookieAgent(
             self.buildAgentForWrapperTest(self.reactor),
-            cookielib.CookieJar())
+            CookieJar())
 
 
     def setUp(self):
@@ -1826,7 +1860,7 @@ class CookieAgentTests(TestCase, CookieT
         being requested. Cookies are extracted from the response and stored in
         the cookie jar.
         """
-        cookieJar = cookielib.CookieJar()
+        cookieJar = CookieJar()
         self.assertEqual(list(cookieJar), [])
 
         agent = self.buildAgentForWrapperTest(self.reactor)
@@ -1865,7 +1899,7 @@ class CookieAgentTests(TestCase, CookieT
         uri = 'http://example.com:1234/foo?bar'
         cookie = 'foo=1'
 
-        cookieJar = cookielib.CookieJar()
+        cookieJar = CookieJar()
         self.addCookies(cookieJar, uri, [cookie])
         self.assertEqual(len(list(cookieJar)), 1)
 
@@ -1885,7 +1919,7 @@ class CookieAgentTests(TestCase, CookieT
         uri = 'https://example.com:1234/foo?bar'
         cookie = 'foo=1;secure'
 
-        cookieJar = cookielib.CookieJar()
+        cookieJar = CookieJar()
         self.addCookies(cookieJar, uri, [cookie])
         self.assertEqual(len(list(cookieJar)), 1)
 
@@ -1905,7 +1939,7 @@ class CookieAgentTests(TestCase, CookieT
         uri = 'http://example.com/foo?bar'
         cookie = 'foo=1;secure'
 
-        cookieJar = cookielib.CookieJar()
+        cookieJar = CookieJar()
         self.addCookies(cookieJar, uri, [cookie])
         self.assertEqual(len(list(cookieJar)), 1)
 
@@ -1925,7 +1959,7 @@ class CookieAgentTests(TestCase, CookieT
         uri = 'https://example.com:1234/foo?bar'
         cookie = 'foo=1;port=1234'
 
-        cookieJar = cookielib.CookieJar()
+        cookieJar = CookieJar()
         self.addCookies(cookieJar, uri, [cookie])
         self.assertEqual(len(list(cookieJar)), 1)
 
@@ -1945,7 +1979,7 @@ class CookieAgentTests(TestCase, CookieT
         uri = 'https://example.com:4567/foo?bar'
         cookie = 'foo=1;port=1234'
 
-        cookieJar = cookielib.CookieJar()
+        cookieJar = CookieJar()
         self.addCookies(cookieJar, uri, [cookie])
         self.assertEqual(len(list(cookieJar)), 0)
 
@@ -2400,11 +2434,26 @@ class ProxyAgentTests(TestCase, FakeReac
 
 
 
-class _RedirectAgentTestsMixin(object):
+SENSITIVE_HEADERS = [
+    b"authorization",
+    b"cookie",
+    b"cookie2",
+    b"proxy-authorization",
+    b"www-authenticate",
+]
+
+testMixinClass = object
+
+
+class _RedirectAgentTestsMixin(testMixinClass):
     """
     Test cases mixin for L{RedirectAgentTests} and
     L{BrowserLikeRedirectAgentTests}.
     """
+    agent: IAgent
+    reactor: MemoryReactorClock
+    protocol: StubHTTPProtocol
+
     def test_noRedirect(self):
         """
         L{client.RedirectAgent} behaves like L{client.Agent} if the response
@@ -2423,25 +2472,58 @@ class _RedirectAgentTestsMixin(object):
         self.assertIdentical(response, result)
         self.assertIdentical(result.previousResponse, None)
 
-
-    def _testRedirectDefault(self, code):
+    def _testRedirectDefault(
+        self,
+        code: int,
+        crossScheme: bool = False,
+        crossDomain: bool = False,
+        crossPort: bool = False,
+        requestHeaders: Optional[Headers] = None,
+    ) -> Request:
         """
         When getting a redirect, L{client.RedirectAgent} follows the URL
         specified in the L{Location} header field and make a new request.
 
         @param code: HTTP status code.
         """
-        self.agent.request('GET', 'http://example.com/foo')
+        startDomain = b"example.com"
+        startScheme = b"https" if ssl is not None else b"http"
+        startPort = 80 if startScheme == b"http" else 443
+        self.agent.request(
+            b"GET", startScheme + b"://" + startDomain + b"/foo", headers=requestHeaders
+        )
 
         host, port = self.reactor.tcpClients.pop()[:2]
         self.assertEqual("example.com", host)
-        self.assertEqual(80, port)
+        self.assertEqual(startPort, port)
 
         req, res = self.protocol.requests.pop()
 
-        headers = http_headers.Headers(
-            {'location': ['https://example.com/bar']})
-        response = Response(('HTTP', 1, 1), code, 'OK', headers, None)
+        # If possible (i.e.: TLS support is present), run the test with a
+        # cross-scheme redirect to verify that the scheme is honored; if not,
+        # let's just make sure it works at all.
+
+        targetScheme = startScheme
+        targetDomain = startDomain
+        targetPort = startPort
+
+        if crossScheme:
+            if ssl is None:
+                raise SkipTest(
+                    "Cross-scheme redirects can't be tested without TLS support."
+                )
+            targetScheme = b"https" if startScheme == b"http" else b"http"
+            targetPort = 443 if startPort == 80 else 80
+
+        portSyntax = b""
+        if crossPort:
+            targetPort = 8443
+            portSyntax = b":8443"
+        targetDomain = b"example.net" if crossDomain else startDomain
+        locationValue = targetScheme + b"://" + targetDomain + portSyntax + b"/bar"
+        headers = http_headers.Headers({b"location": [locationValue]}
+        )
+        response = Response((b'HTTP', 1, 1), code, b'OK', headers, None)
         res.callback(response)
 
         req2, res2 = self.protocol.requests.pop()
@@ -2449,9 +2531,9 @@ class _RedirectAgentTestsMixin(object):
         self.assertEqual('/bar', req2.uri)
 
         host, port = self.reactor.sslClients.pop()[:2]
-        self.assertEqual("example.com", host)
-        self.assertEqual(443, port)
-
+        self.assertEqual(EXAMPLE_NET_IP if crossDomain else EXAMPLE_COM_IP, host)
+        self.assertEqual(targetPort, port)
+        return req2
 
     def test_redirect301(self):
         """
@@ -2688,19 +2770,24 @@ class _RedirectAgentTestsMixin(object):
         self.assertIdentical(redirectResponse.previousResponse, None)
 
 
-
-class RedirectAgentTests(TestCase, FakeReactorAndConnectMixin,
-                         _RedirectAgentTestsMixin, AgentTestsMixin):
+class RedirectAgentTests(
+    FakeReactorAndConnectMixin,
+    _RedirectAgentTestsMixin,
+    AgentTestsMixin,
+    runtimeTestCase
+):
     """
     Tests for L{client.RedirectAgent}.
     """
+
     def makeAgent(self):
         """
         @return: a new L{twisted.web.client.RedirectAgent}
         """
         return client.RedirectAgent(
-            self.buildAgentForWrapperTest(self.reactor))
-
+            self.buildAgentForWrapperTest(self.reactor),
+            sensitiveHeaderNames=[b"X-Custom-sensitive"],
+        )
 
     def setUp(self):
         self.reactor = self.Reactor()
@@ -2725,11 +2812,12 @@ class RedirectAgentTests(TestCase, FakeR
         self._testPageRedirectFailure(302, 'POST')
 
 
-
-class BrowserLikeRedirectAgentTests(TestCase,
-                                    FakeReactorAndConnectMixin,
-                                    _RedirectAgentTestsMixin,
-                                    AgentTestsMixin):
+class BrowserLikeRedirectAgentTests(
+    FakeReactorAndConnectMixin,
+    _RedirectAgentTestsMixin,
+    AgentTestsMixin,
+    runtimeTestCase,
+):
     """
     Tests for L{client.BrowserLikeRedirectAgent}.
     """
@@ -2738,7 +2826,9 @@ class BrowserLikeRedirectAgentTests(Test
         @return: a new L{twisted.web.client.BrowserLikeRedirectAgent}
         """
         return client.BrowserLikeRedirectAgent(
-            self.buildAgentForWrapperTest(self.reactor))
+            self.buildAgentForWrapperTest(self.reactor),
+            sensitiveHeaderNames=[b"x-Custom-sensitive"],
+        )
 
 
     def setUp(self):
openSUSE Build Service is sponsored by