File CVE-2022-21712-sec-expo-CO-redirect.patch of Package python-Twisted.26707
From eda4f1e2ec9988a142de244f1a2b285939718c03 Mon Sep 17 00:00:00 2001
From: Glyph <glyph@twistedmatrix.com>
Date: Sun, 23 Jan 2022 12:57:49 -0800
Subject: [PATCH 01/10] failing test for header data leak
---
twisted/newsfragments/10294.bugfix | 1
twisted/web/client.py | 66 ++++++++++-
twisted/web/iweb.py | 10 -
twisted/web/test/test_agent.py | 208 +++++++++++++++++++++++++++----------
4 files changed, 219 insertions(+), 66 deletions(-)
Index: Twisted-15.2.1/twisted/newsfragments/10294.bugfix
===================================================================
--- /dev/null
+++ Twisted-15.2.1/twisted/newsfragments/10294.bugfix
@@ -0,0 +1 @@
+twisted.web.client.RedirectAgent and twisted.web.client.BrowserLikeRedirectAgent now properly remove sensitive headers when redirecting to a different origin.
\ No newline at end of file
Index: Twisted-15.2.1/twisted/web/client.py
===================================================================
--- Twisted-15.2.1.orig/twisted/web/client.py
+++ Twisted-15.2.1/twisted/web/client.py
@@ -11,9 +11,10 @@ from __future__ import division, absolut
import os
import types
import warnings
+import zlib
try:
- from urlparse import urlunparse, urljoin, urldefrag
+ from urlparse import urldefrag, urljoin, urlunparse
from urllib import splithost, splittype
except ImportError:
from urllib.parse import splithost, splittype, urljoin, urldefrag
@@ -23,19 +24,20 @@ except ImportError:
result = _urlunparse(tuple([p.decode("charmap") for p in parts]))
return result.encode("charmap")
-import zlib
from functools import wraps
-
from zope.interface import implementer
from twisted.python.compat import _PY3, nativeString, intToBytes
from twisted.python import log
from twisted.python.failure import Failure
-from twisted.python.deprecate import deprecatedModuleAttribute
from twisted.python.versions import Version
from twisted.web.iweb import IPolicyForHTTPS, IAgentEndpointFactory
-from twisted.python.deprecate import getDeprecationWarningString
+from twisted.python.deprecate import (
+ deprecated,
+ deprecatedModuleAttribute,
+ getDeprecationWarningString
+)
from twisted.web import http
from twisted.internet import defer, protocol, task, reactor
from twisted.internet.interfaces import IProtocol
@@ -43,8 +45,15 @@ from twisted.internet.endpoints import T
from twisted.python.util import InsensitiveDict
from twisted.python.components import proxyForInterface
from twisted.web import error
-from twisted.web.iweb import UNKNOWN_LENGTH, IAgent, IBodyProducer, IResponse
from twisted.web.http_headers import Headers
+from twisted.web.iweb import (
+ UNKNOWN_LENGTH,
+ IAgent,
+ IAgentEndpointFactory,
+ IBodyProducer,
+ IPolicyForHTTPS,
+ IResponse,
+)
from twisted.web._newclient import _ensureValidURI, _ensureValidMethod
@@ -1899,6 +1908,18 @@ class ContentDecoderAgent(object):
+_canonicalHeaderName = Headers()._canonicalNameCaps
+_defaultSensitiveHeaders = frozenset(
+ [
+ b"Authorization",
+ b"Cookie",
+ b"Cookie2",
+ b"Proxy-Authorization",
+ b"WWW-Authenticate",
+ ]
+)
+
+
@implementer(IAgent)
class RedirectAgent(object):
"""
@@ -1913,6 +1934,11 @@ class RedirectAgent(object):
@param redirectLimit: The maximum number of times the agent is allowed to
follow redirects before failing with a L{error.InfiniteRedirection}.
+ @param sensitiveHeaderNames: An iterable of C{bytes} enumerating the names
+ of headers that must not be transmitted when redirecting to a different
+ origins. These will be consulted in addition to the protocol-specified
+ set of headers that contain sensitive information.
+
@cvar _redirectResponses: A L{list} of HTTP status codes to be redirected
for I{GET} and I{HEAD} methods.
@@ -1927,9 +1953,17 @@ class RedirectAgent(object):
_seeOtherResponses = [http.SEE_OTHER]
- def __init__(self, agent, redirectLimit=20):
+ def __init__(
+ self,
+ agent,
+ redirectLimit = 20,
+ sensitiveHeaderNames = (),
+ ):
self._agent = agent
self._redirectLimit = redirectLimit
+ sensitive = {_canonicalHeaderName(each) for each in sensitiveHeaderNames}
+ sensitive.update(_defaultSensitiveHeaders)
+ self._sensitiveHeaderNames = sensitive
def request(self, method, uri, headers=None, bodyProducer=None):
@@ -1976,6 +2010,22 @@ class RedirectAgent(object):
response.code, 'No location header field', uri)
raise ResponseFailed([Failure(err)], response)
location = self._resolveLocation(uri, locationHeaders[0])
+ if headers:
+ parsedURI = URI.fromBytes(uri)
+ parsedLocation = URI.fromBytes(location)
+ sameOrigin = (
+ (parsedURI.scheme == parsedLocation.scheme)
+ and (parsedURI.host == parsedLocation.host)
+ and (parsedURI.port == parsedLocation.port)
+ )
+ if not sameOrigin:
+ headers = Headers(
+ {
+ rawName: rawValue
+ for rawName, rawValue in headers.getAllRawHeaders()
+ if rawName not in self._sensitiveHeaderNames
+ }
+ )
deferred = self._agent.request(method, location, headers)
def _chainResponse(newResponse):
newResponse.setPreviousResponse(response)
Index: Twisted-15.2.1/twisted/web/iweb.py
===================================================================
--- Twisted-15.2.1.orig/twisted/web/iweb.py
+++ Twisted-15.2.1/twisted/web/iweb.py
@@ -675,12 +675,12 @@ class IAgent(Interface):
obtained by combining a number of (hypothetical) implementations::
baseAgent = Agent(reactor)
- redirect = BrowserLikeRedirectAgent(baseAgent, limit=10)
+ decode = ContentDecoderAgent(baseAgent, [(b"gzip", GzipDecoder())])
+ cookie = CookieAgent(decode, diskStore.cookie)
authenticate = AuthenticateAgent(
- redirect, [diskStore.credentials, GtkAuthInterface()])
- cookie = CookieAgent(authenticate, diskStore.cookie)
- decode = ContentDecoderAgent(cookie, [(b"gzip", GzipDecoder())])
- cache = CacheAgent(decode, diskStore.cache)
+ cookie, [diskStore.credentials, GtkAuthInterface()])
+ cache = CacheAgent(authenticate, diskStore.cache)
+ redirect = BrowserLikeRedirectAgent(cache, limit=10)
doSomeRequests(cache)
"""
Index: Twisted-15.2.1/twisted/web/test/test_agent.py
===================================================================
--- Twisted-15.2.1.orig/twisted/web/test/test_agent.py
+++ Twisted-15.2.1/twisted/web/test/test_agent.py
@@ -5,45 +5,79 @@
Tests for L{twisted.web.client.Agent} and related new client APIs.
"""
-import cookielib
import zlib
-from StringIO import StringIO
+try:
+ from cookielib import CookieJar
+except ImportError:
+ from http.cookiejar import CookieJar
+from io import StringIO
+from unittest import SkipTest
+from zope.interface.declarations import implementer
from zope.interface.verify import verifyObject
+from incremental import Version
+
from twisted.trial.unittest import TestCase, SynchronousTestCase
from twisted.web import client, error, http_headers
-from twisted.web._newclient import RequestNotSent, RequestTransmissionFailed
-from twisted.web._newclient import ResponseNeverReceived, ResponseFailed
-from twisted.web._newclient import PotentialDataLoss
+from twisted.web._newclient import (
+ HTTP11ClientProtocol,
+ PotentialDataLoss,
+ RequestNotSent,
+ RequestTransmissionFailed,
+ Response,
+ ResponseFailed,
+ ResponseNeverReceived,
+)
+from twisted.web.client import (
+ URI,
+ BrowserLikePolicyForHTTPS,
+ FileBodyProducer,
+ HostnameCachingHTTPSPolicy,
+ HTTPConnectionPool,
+ Request,
+ ResponseDone,
+ _HTTP11ClientFactory,
+)
from twisted.internet import defer, task
from twisted.python.failure import Failure
+from twisted.test.proto_helpers import (
+ MemoryReactorClock,
+ StringTransport,
+)
from twisted.python.components import proxyForInterface
-from twisted.test.proto_helpers import StringTransport, MemoryReactorClock
from twisted.internet.task import Clock
-from twisted.internet.error import ConnectionRefusedError, ConnectionDone
-from twisted.internet.error import ConnectionLost
+from twisted.internet.error import (
+ ConnectionDone,
+ ConnectionLost,
+ ConnectionRefusedError,
+)
from twisted.internet.protocol import Protocol, Factory
from twisted.internet.defer import Deferred, succeed, CancelledError
from twisted.internet.endpoints import TCP4ClientEndpoint, SSL4ClientEndpoint
-from twisted.web.client import (FileBodyProducer, Request, HTTPConnectionPool,
- ResponseDone, _HTTP11ClientFactory, URI)
-
from twisted.web.iweb import (
- UNKNOWN_LENGTH, IAgent, IBodyProducer, IResponse, IAgentEndpointFactory,
- )
+ UNKNOWN_LENGTH,
+ IAgent,
+ IAgentEndpointFactory,
+ IBodyProducer,
+ IPolicyForHTTPS,
+ IResponse,
+)
from twisted.web.http_headers import Headers
-from twisted.web._newclient import HTTP11ClientProtocol, Response
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
-from zope.interface.declarations import implementer
-from twisted.web.iweb import IPolicyForHTTPS
+from twisted.web.test.injectionhelpers import (
+ MethodInjectionTestsMixin,
+ URIInjectionTestsMixin,
+)
+
from twisted.python.deprecate import getDeprecationWarningString
-from incremental import Version
-from twisted.web.client import (BrowserLikePolicyForHTTPS)
from twisted.web.error import SchemeNotSupported
+testMixinClass = object
+runtimeTestCase = TestCase
+
try:
from twisted.internet import ssl
from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol
@@ -63,7 +97,7 @@ class StubHTTPProtocol(Protocol):
tuple consisting of the request and the L{Deferred} returned from the
request method is appended to this list.
"""
- def __init__(self):
+ def __init__(self) -> None:
self.requests = []
self.state = 'QUIESCENT'
@@ -1743,13 +1777,13 @@ class CookieJarTests(TestCase, CookieTes
"""
Tests for L{twisted.web.client._FakeUrllib2Response} and
L{twisted.web.client._FakeUrllib2Request}'s interactions with
- C{cookielib.CookieJar} instances.
+ C{CookieJar} instances.
"""
def makeCookieJar(self):
"""
- @return: a C{cookielib.CookieJar} with some sample cookies
+ @return: a C{CookieJar} with some sample cookies
"""
- cookieJar = cookielib.CookieJar()
+ cookieJar = CookieJar()
reqres = self.addCookies(
cookieJar,
'http://example.com:1234/foo?bar',
@@ -1760,7 +1794,7 @@ class CookieJarTests(TestCase, CookieTes
def test_extractCookies(self):
"""
- L{cookielib.CookieJar.extract_cookies} extracts cookie information from
+ L{CookieJar.extract_cookies} extracts cookie information from
fake urllib2 response instances.
"""
jar = self.makeCookieJar()[0]
@@ -1785,7 +1819,7 @@ class CookieJarTests(TestCase, CookieTes
def test_sendCookie(self):
"""
- L{cookielib.CookieJar.add_cookie_header} adds a cookie header to a fake
+ L{CookieJar.add_cookie_header} adds a cookie header to a fake
urllib2 request instance.
"""
jar, (request, response) = self.makeCookieJar()
@@ -1812,7 +1846,7 @@ class CookieAgentTests(TestCase, CookieT
"""
return client.CookieAgent(
self.buildAgentForWrapperTest(self.reactor),
- cookielib.CookieJar())
+ CookieJar())
def setUp(self):
@@ -1826,7 +1860,7 @@ class CookieAgentTests(TestCase, CookieT
being requested. Cookies are extracted from the response and stored in
the cookie jar.
"""
- cookieJar = cookielib.CookieJar()
+ cookieJar = CookieJar()
self.assertEqual(list(cookieJar), [])
agent = self.buildAgentForWrapperTest(self.reactor)
@@ -1865,7 +1899,7 @@ class CookieAgentTests(TestCase, CookieT
uri = 'http://example.com:1234/foo?bar'
cookie = 'foo=1'
- cookieJar = cookielib.CookieJar()
+ cookieJar = CookieJar()
self.addCookies(cookieJar, uri, [cookie])
self.assertEqual(len(list(cookieJar)), 1)
@@ -1885,7 +1919,7 @@ class CookieAgentTests(TestCase, CookieT
uri = 'https://example.com:1234/foo?bar'
cookie = 'foo=1;secure'
- cookieJar = cookielib.CookieJar()
+ cookieJar = CookieJar()
self.addCookies(cookieJar, uri, [cookie])
self.assertEqual(len(list(cookieJar)), 1)
@@ -1905,7 +1939,7 @@ class CookieAgentTests(TestCase, CookieT
uri = 'http://example.com/foo?bar'
cookie = 'foo=1;secure'
- cookieJar = cookielib.CookieJar()
+ cookieJar = CookieJar()
self.addCookies(cookieJar, uri, [cookie])
self.assertEqual(len(list(cookieJar)), 1)
@@ -1925,7 +1959,7 @@ class CookieAgentTests(TestCase, CookieT
uri = 'https://example.com:1234/foo?bar'
cookie = 'foo=1;port=1234'
- cookieJar = cookielib.CookieJar()
+ cookieJar = CookieJar()
self.addCookies(cookieJar, uri, [cookie])
self.assertEqual(len(list(cookieJar)), 1)
@@ -1945,7 +1979,7 @@ class CookieAgentTests(TestCase, CookieT
uri = 'https://example.com:4567/foo?bar'
cookie = 'foo=1;port=1234'
- cookieJar = cookielib.CookieJar()
+ cookieJar = CookieJar()
self.addCookies(cookieJar, uri, [cookie])
self.assertEqual(len(list(cookieJar)), 0)
@@ -2400,11 +2434,26 @@ class ProxyAgentTests(TestCase, FakeReac
-class _RedirectAgentTestsMixin(object):
+SENSITIVE_HEADERS = [
+ b"authorization",
+ b"cookie",
+ b"cookie2",
+ b"proxy-authorization",
+ b"www-authenticate",
+]
+
+testMixinClass = object
+
+
+class _RedirectAgentTestsMixin(testMixinClass):
"""
Test cases mixin for L{RedirectAgentTests} and
L{BrowserLikeRedirectAgentTests}.
"""
+ agent: IAgent
+ reactor: MemoryReactorClock
+ protocol: StubHTTPProtocol
+
def test_noRedirect(self):
"""
L{client.RedirectAgent} behaves like L{client.Agent} if the response
@@ -2423,25 +2472,58 @@ class _RedirectAgentTestsMixin(object):
self.assertIdentical(response, result)
self.assertIdentical(result.previousResponse, None)
-
- def _testRedirectDefault(self, code):
+ def _testRedirectDefault(
+ self,
+ code: int,
+ crossScheme: bool = False,
+ crossDomain: bool = False,
+ crossPort: bool = False,
+ requestHeaders: Optional[Headers] = None,
+ ) -> Request:
"""
When getting a redirect, L{client.RedirectAgent} follows the URL
specified in the L{Location} header field and make a new request.
@param code: HTTP status code.
"""
- self.agent.request('GET', 'http://example.com/foo')
+ startDomain = b"example.com"
+ startScheme = b"https" if ssl is not None else b"http"
+ startPort = 80 if startScheme == b"http" else 443
+ self.agent.request(
+ b"GET", startScheme + b"://" + startDomain + b"/foo", headers=requestHeaders
+ )
host, port = self.reactor.tcpClients.pop()[:2]
self.assertEqual("example.com", host)
- self.assertEqual(80, port)
+ self.assertEqual(startPort, port)
req, res = self.protocol.requests.pop()
- headers = http_headers.Headers(
- {'location': ['https://example.com/bar']})
- response = Response(('HTTP', 1, 1), code, 'OK', headers, None)
+ # If possible (i.e.: TLS support is present), run the test with a
+ # cross-scheme redirect to verify that the scheme is honored; if not,
+ # let's just make sure it works at all.
+
+ targetScheme = startScheme
+ targetDomain = startDomain
+ targetPort = startPort
+
+ if crossScheme:
+ if ssl is None:
+ raise SkipTest(
+ "Cross-scheme redirects can't be tested without TLS support."
+ )
+ targetScheme = b"https" if startScheme == b"http" else b"http"
+ targetPort = 443 if startPort == 80 else 80
+
+ portSyntax = b""
+ if crossPort:
+ targetPort = 8443
+ portSyntax = b":8443"
+ targetDomain = b"example.net" if crossDomain else startDomain
+ locationValue = targetScheme + b"://" + targetDomain + portSyntax + b"/bar"
+ headers = http_headers.Headers({b"location": [locationValue]}
+ )
+ response = Response((b'HTTP', 1, 1), code, b'OK', headers, None)
res.callback(response)
req2, res2 = self.protocol.requests.pop()
@@ -2449,9 +2531,9 @@ class _RedirectAgentTestsMixin(object):
self.assertEqual('/bar', req2.uri)
host, port = self.reactor.sslClients.pop()[:2]
- self.assertEqual("example.com", host)
- self.assertEqual(443, port)
-
+ self.assertEqual(EXAMPLE_NET_IP if crossDomain else EXAMPLE_COM_IP, host)
+ self.assertEqual(targetPort, port)
+ return req2
def test_redirect301(self):
"""
@@ -2688,19 +2770,24 @@ class _RedirectAgentTestsMixin(object):
self.assertIdentical(redirectResponse.previousResponse, None)
-
-class RedirectAgentTests(TestCase, FakeReactorAndConnectMixin,
- _RedirectAgentTestsMixin, AgentTestsMixin):
+class RedirectAgentTests(
+ FakeReactorAndConnectMixin,
+ _RedirectAgentTestsMixin,
+ AgentTestsMixin,
+ runtimeTestCase
+):
"""
Tests for L{client.RedirectAgent}.
"""
+
def makeAgent(self):
"""
@return: a new L{twisted.web.client.RedirectAgent}
"""
return client.RedirectAgent(
- self.buildAgentForWrapperTest(self.reactor))
-
+ self.buildAgentForWrapperTest(self.reactor),
+ sensitiveHeaderNames=[b"X-Custom-sensitive"],
+ )
def setUp(self):
self.reactor = self.Reactor()
@@ -2725,11 +2812,12 @@ class RedirectAgentTests(TestCase, FakeR
self._testPageRedirectFailure(302, 'POST')
-
-class BrowserLikeRedirectAgentTests(TestCase,
- FakeReactorAndConnectMixin,
- _RedirectAgentTestsMixin,
- AgentTestsMixin):
+class BrowserLikeRedirectAgentTests(
+ FakeReactorAndConnectMixin,
+ _RedirectAgentTestsMixin,
+ AgentTestsMixin,
+ runtimeTestCase,
+):
"""
Tests for L{client.BrowserLikeRedirectAgent}.
"""
@@ -2738,7 +2826,9 @@ class BrowserLikeRedirectAgentTests(Test
@return: a new L{twisted.web.client.BrowserLikeRedirectAgent}
"""
return client.BrowserLikeRedirectAgent(
- self.buildAgentForWrapperTest(self.reactor))
+ self.buildAgentForWrapperTest(self.reactor),
+ sensitiveHeaderNames=[b"x-Custom-sensitive"],
+ )
def setUp(self):