File CVE-2024-6923-email-hdr-inject.patch of Package python3.35633

From 72561b15a3dd128b84401b5423f16f810b49e1d6 Mon Sep 17 00:00:00 2001
From: Petr Viktorin <encukou@gmail.com>
Date: Wed, 24 Jul 2024 15:13:14 +0200
Subject: [PATCH] [CVE-2024-6923] Encode newlines in headers, and verify
 headers are sound

The :mod:`~email.generator` will now refuse to serialize (write) headers
that are improperly folded or delimited, such that they would be parsed as
multiple headers or joined with adjacent data.
If you need to turn this safety feature off,
set `~email.policy.Policy.verify_generated_headers`.

Per RFC 2047:

> [...] these encoding schemes allow the
> encoding of arbitrary octet values, mail readers that implement this
> decoding should also ensure that display of the decoded data on the
> recipient's terminal will not cause unwanted side-effects

It seems that the "quoted-word" scheme is a valid way to include
a newline character in a header value, just like we already allow
undecodable bytes or control characters.

They do need to be properly quoted when serialized to text, though.

Fixes: gh#python/cpython#121650
Fixes: bsc#1228780 (CVE-2024-6923)
From-PR: gh#python/cpython!122233
Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
Co-authored-by: Bas Bloemsaat <bas@bloemsaat.com>
Co-authored-by: Petr Viktorin <encukou@gmail.com>
Co-authored-by: Jakub Stasiak <jakub@stasiak.at>
Patch: CVE-2024-6923-email-hdr-inject.patch
---
 Doc/library/email.errors.rst                                            |    6 
 Doc/library/email.policy.rst                                            |   18 
 Lib/email/_header_value_parser.py                                       |  218 +++++++++-
 Lib/email/_policybase.py                                                |    8 
 Lib/email/errors.py                                                     |    4 
 Lib/email/generator.py                                                  |   16 
 Lib/email/policy.py                                                     |    3 
 Lib/test/test_email/test__header_value_parser.py                        |    6 
 Lib/test/test_email/test_generator.py                                   |   62 ++
 Lib/test/test_email/test_headerregistry.py                              |    2 
 Lib/test/test_email/test_policy.py                                      |   26 +
 Misc/NEWS.d/next/Library/2024-07-27-16-10-41.gh-issue-121650.nf6oc9.rst |    5 
 12 files changed, 354 insertions(+), 20 deletions(-)
 create mode 100644 Misc/NEWS.d/next/Library/2024-07-27-16-10-41.gh-issue-121650.nf6oc9.rst

--- a/Doc/library/email.errors.rst
+++ b/Doc/library/email.errors.rst
@@ -59,6 +59,12 @@ The following exception classes are defi
    :class:`~email.mime.image.MIMEImage`).
 
 
+.. exception:: HeaderWriteError()
+
+   Raised when an error occurs when the :mod:`~email.generator` outputs
+   headers.
+
+
 Here is the list of the defects that the :class:`~email.parser.FeedParser`
 can find while parsing messages.  Note that the defects are added to the message
 where the problem was found, so for example, if a message nested inside a
--- a/Doc/library/email.policy.rst
+++ b/Doc/library/email.policy.rst
@@ -229,6 +229,24 @@ added matters.  To illustrate::
 
       .. versionadded:: 3.6
 
+
+   .. attribute:: verify_generated_headers
+
+      If ``True`` (the default), the generator will raise
+      :exc:`~email.errors.HeaderWriteError` instead of writing a header
+      that is improperly folded or delimited, such that it would
+      be parsed as multiple headers or joined with adjacent data.
+      Such headers can be generated by custom header classes or bugs
+      in the ``email`` module.
+
+      As it's a security feature, this defaults to ``True`` even in the
+      :class:`~email.policy.Compat32` policy.
+      For backwards compatible, but unsafe, behavior, it must be set to
+      ``False`` explicitly.
+
+      .. versionadded:: 3.13
+
+
    The following :class:`Policy` method is intended to be called by code using
    the email library to create policy instances with custom settings:
 
--- a/Lib/email/_header_value_parser.py
+++ b/Lib/email/_header_value_parser.py
@@ -68,6 +68,7 @@ XXX: provide complete list of token type
 """
 
 import re
+import sys
 import urllib   # For urllib.parse.unquote
 from string import hexdigits
 from collections import OrderedDict
@@ -92,10 +93,24 @@ TOKEN_ENDS = TSPECIALS | WSP
 ASPECIALS = TSPECIALS | set("*'%")
 ATTRIBUTE_ENDS = ASPECIALS | WSP
 EXTENDED_ATTRIBUTE_ENDS = ATTRIBUTE_ENDS - set('%')
+NLSET = {'\n', '\r'}
+SPECIALSNL = SPECIALS | NLSET
 
 def quote_string(value):
     return '"'+str(value).replace('\\', '\\\\').replace('"', r'\"')+'"'
 
+# Match a RFC 2047 word, looks like =?utf-8?q?someword?=
+rfc2047_matcher = re.compile(r'''
+   =\?            # literal =?
+   [^?]*          # charset
+   \?             # literal ?
+   [qQbB]         # literal 'q' or 'b', case insensitive
+   \?             # literal ?
+  .*?             # encoded word
+  \?=             # literal ?=
+''', re.VERBOSE | re.MULTILINE)
+
+
 #
 # TokenList and its subclasses
 #
@@ -506,6 +521,11 @@ class DotAtomText(TokenList):
     as_ew_allowed = True
 
 
+class NoFoldLiteral(TokenList):
+    token_type = 'no-fold-literal'
+    as_ew_allowed = False
+
+
 class AddrSpec(TokenList):
 
     token_type = 'addr-spec'
@@ -836,8 +856,24 @@ class HeaderLabel(TokenList):
     as_ew_allowed = False
 
 
-class Header(TokenList):
+class MsgID(TokenList):
+    token_type = 'msg-id'
+    as_ew_allowed = False
+
+    def fold(self, policy):
+        # message-id tokens may not be folded.
+        return str(self) + policy.linesep
+
 
+class MessageID(MsgID):
+    token_type = 'message-id'
+
+
+class InvalidMessageID(MessageID):
+    token_type = 'invalid-message-id'
+
+
+class Header(TokenList):
     token_type = 'header'
 
 
@@ -918,6 +954,10 @@ class EWWhiteSpaceTerminal(WhiteSpaceTer
         return ''
 
 
+class _InvalidEwError(errors.HeaderParseError):
+    """Invalid encoded word found while parsing headers."""
+
+
 # XXX these need to become classes and used as instances so
 # that a program can't change them in a parse tree and screw
 # up other parse trees.  Maybe should have  tests for that, too.
@@ -948,15 +988,14 @@ RouteComponentMarker = ValueTerminal('@'
 
 _wsp_splitter = re.compile(r'([{}]+)'.format(''.join(WSP))).split
 _non_atom_end_matcher = re.compile(r"[^{}]+".format(
-    ''.join(ATOM_ENDS).replace('\\','\\\\').replace(']',r'\]'))).match
+    re.escape(''.join(ATOM_ENDS)))).match
 _non_printable_finder = re.compile(r"[\x00-\x20\x7F]").findall
 _non_token_end_matcher = re.compile(r"[^{}]+".format(
-    ''.join(TOKEN_ENDS).replace('\\','\\\\').replace(']',r'\]'))).match
+    re.escape(''.join(TOKEN_ENDS)))).match
 _non_attribute_end_matcher = re.compile(r"[^{}]+".format(
-    ''.join(ATTRIBUTE_ENDS).replace('\\','\\\\').replace(']',r'\]'))).match
+    re.escape(''.join(ATTRIBUTE_ENDS)))).match
 _non_extended_attribute_end_matcher = re.compile(r"[^{}]+".format(
-    ''.join(EXTENDED_ATTRIBUTE_ENDS).replace(
-                                    '\\','\\\\').replace(']',r'\]'))).match
+    re.escape(''.join(EXTENDED_ATTRIBUTE_ENDS)))).match
 
 def _validate_xtext(xtext):
     """If input token contains ASCII non-printables, register a defect."""
@@ -1023,7 +1062,10 @@ def get_encoded_word(value):
         raise errors.HeaderParseError(
             "expected encoded word but found {}".format(value))
     remstr = ''.join(remainder)
-    if len(remstr) > 1 and remstr[0] in hexdigits and remstr[1] in hexdigits:
+    if (len(remstr) > 1 and
+        remstr[0] in hexdigits and
+        remstr[1] in hexdigits and
+        tok.count('?') < 2):
         # The ? after the CTE was followed by an encoded word escape (=XX).
         rest, *remainder = remstr.split('?=', 1)
         tok = tok + '?=' + rest
@@ -1034,8 +1076,8 @@ def get_encoded_word(value):
     value = ''.join(remainder)
     try:
         text, charset, lang, defects = _ew.decode('=?' + tok + '?=')
-    except ValueError:
-        raise errors.HeaderParseError(
+    except (ValueError, KeyError):
+        raise _InvalidEwError(
             "encoded word format invalid: '{}'".format(ew.cte))
     ew.charset = charset
     ew.lang = lang
@@ -1050,6 +1092,10 @@ def get_encoded_word(value):
         _validate_xtext(vtext)
         ew.append(vtext)
         text = ''.join(remainder)
+    # Encoded words should be followed by a WS
+    if value and value[0] not in WSP:
+        ew.defects.append(errors.InvalidHeaderDefect(
+            "missing trailing whitespace after encoded-word"))
     return ew, value
 
 def get_unstructured(value):
@@ -1081,9 +1127,12 @@ def get_unstructured(value):
             token, value = get_fws(value)
             unstructured.append(token)
             continue
+        valid_ew = True
         if value.startswith('=?'):
             try:
                 token, value = get_encoded_word(value)
+            except _InvalidEwError:
+                valid_ew = False
             except errors.HeaderParseError:
                 # XXX: Need to figure out how to register defects when
                 # appropriate here.
@@ -1102,6 +1151,14 @@ def get_unstructured(value):
                 unstructured.append(token)
                 continue
         tok, *remainder = _wsp_splitter(value, 1)
+        # Split in the middle of an atom if there is a rfc2047 encoded word
+        # which does not have WSP on both sides. The defect will be registered
+        # the next time through the loop.
+        # This needs to only be performed when the encoded word is valid;
+        # otherwise, performing it on an invalid encoded word can cause
+        # the parser to go in an infinite loop.
+        if valid_ew and rfc2047_matcher.search(tok):
+            tok, *remainder = value.partition('=?')
         vtext = ValueTerminal(tok, 'vtext')
         _validate_xtext(vtext)
         unstructured.append(vtext)
@@ -1168,19 +1225,28 @@ def get_bare_quoted_string(value):
             "expected '\"' but found '{}'".format(value))
     bare_quoted_string = BareQuotedString()
     value = value[1:]
-    if value[0] == '"':
+    if value and value[0] == '"':
         token, value = get_qcontent(value)
         bare_quoted_string.append(token)
     while value and value[0] != '"':
         if value[0] in WSP:
             token, value = get_fws(value)
         elif value[:2] == '=?':
+            valid_ew = False
             try:
                 token, value = get_encoded_word(value)
                 bare_quoted_string.defects.append(errors.InvalidHeaderDefect(
                     "encoded word inside quoted string"))
+                valid_ew = True
             except errors.HeaderParseError:
                 token, value = get_qcontent(value)
+            # Collapse the whitespace between two encoded words that occur in a
+            # bare-quoted-string.
+            if valid_ew and len(bare_quoted_string) > 1:
+                if (bare_quoted_string[-1].token_type == 'fws' and
+                        bare_quoted_string[-2].token_type == 'encoded-word'):
+                    bare_quoted_string[-1] = EWWhiteSpaceTerminal(
+                        bare_quoted_string[-1], 'fws')
         else:
             token, value = get_qcontent(value)
         bare_quoted_string.append(token)
@@ -1337,6 +1403,9 @@ def get_word(value):
         leader, value = get_cfws(value)
     else:
         leader = None
+    if not value:
+        raise errors.HeaderParseError(
+            "Expected 'atom' or 'quoted-string' but found nothing.")
     if value[0]=='"':
         token, value = get_quoted_string(value)
     elif value[0] in SPECIALS:
@@ -1586,7 +1655,7 @@ def get_addr_spec(value):
     addr_spec.append(token)
     if not value or value[0] != '@':
         addr_spec.defects.append(errors.InvalidHeaderDefect(
-            "add-spec local part with no domain"))
+            "addr-spec local part with no domain"))
         return addr_spec, value
     addr_spec.append(ValueTerminal('@', 'address-at-symbol'))
     token, value = get_domain(value[1:])
@@ -1971,6 +2040,118 @@ def get_address_list(value):
             value = value[1:]
     return address_list, value
 
+
+def get_no_fold_literal(value):
+    """ no-fold-literal = "[" *dtext "]"
+    """
+    no_fold_literal = NoFoldLiteral()
+    if not value:
+        raise errors.HeaderParseError(
+            "expected no-fold-literal but found '{}'".format(value))
+    if value[0] != '[':
+        raise errors.HeaderParseError(
+            "expected '[' at the start of no-fold-literal "
+            "but found '{}'".format(value))
+    no_fold_literal.append(ValueTerminal('[', 'no-fold-literal-start'))
+    value = value[1:]
+    token, value = get_dtext(value)
+    no_fold_literal.append(token)
+    if not value or value[0] != ']':
+        raise errors.HeaderParseError(
+            "expected ']' at the end of no-fold-literal "
+            "but found '{}'".format(value))
+    no_fold_literal.append(ValueTerminal(']', 'no-fold-literal-end'))
+    return no_fold_literal, value[1:]
+
+def get_msg_id(value):
+    """msg-id = [CFWS] "<" id-left '@' id-right  ">" [CFWS]
+       id-left = dot-atom-text / obs-id-left
+       id-right = dot-atom-text / no-fold-literal / obs-id-right
+       no-fold-literal = "[" *dtext "]"
+    """
+    msg_id = MsgID()
+    if value and value[0] in CFWS_LEADER:
+        token, value = get_cfws(value)
+        msg_id.append(token)
+    if not value or value[0] != '<':
+        raise errors.HeaderParseError(
+            "expected msg-id but found '{}'".format(value))
+    msg_id.append(ValueTerminal('<', 'msg-id-start'))
+    value = value[1:]
+    # Parse id-left.
+    try:
+        token, value = get_dot_atom_text(value)
+    except errors.HeaderParseError:
+        try:
+            # obs-id-left is same as local-part of add-spec.
+            token, value = get_obs_local_part(value)
+            msg_id.defects.append(errors.ObsoleteHeaderDefect(
+                "obsolete id-left in msg-id"))
+        except errors.HeaderParseError:
+            raise errors.HeaderParseError(
+                "expected dot-atom-text or obs-id-left"
+                " but found '{}'".format(value))
+    msg_id.append(token)
+    if not value or value[0] != '@':
+        msg_id.defects.append(errors.InvalidHeaderDefect(
+            "msg-id with no id-right"))
+        # Even though there is no id-right, if the local part
+        # ends with `>` let's just parse it too and return
+        # along with the defect.
+        if value and value[0] == '>':
+            msg_id.append(ValueTerminal('>', 'msg-id-end'))
+            value = value[1:]
+        return msg_id, value
+    msg_id.append(ValueTerminal('@', 'address-at-symbol'))
+    value = value[1:]
+    # Parse id-right.
+    try:
+        token, value = get_dot_atom_text(value)
+    except errors.HeaderParseError:
+        try:
+            token, value = get_no_fold_literal(value)
+        except errors.HeaderParseError as e:
+            try:
+                token, value = get_domain(value)
+                msg_id.defects.append(errors.ObsoleteHeaderDefect(
+                    "obsolete id-right in msg-id"))
+            except errors.HeaderParseError:
+                raise errors.HeaderParseError(
+                    "expected dot-atom-text, no-fold-literal or obs-id-right"
+                    " but found '{}'".format(value))
+    msg_id.append(token)
+    if value and value[0] == '>':
+        value = value[1:]
+    else:
+        msg_id.defects.append(errors.InvalidHeaderDefect(
+            "missing trailing '>' on msg-id"))
+    msg_id.append(ValueTerminal('>', 'msg-id-end'))
+    if value and value[0] in CFWS_LEADER:
+        token, value = get_cfws(value)
+        msg_id.append(token)
+    return msg_id, value
+
+
+def parse_message_id(value):
+    """message-id      =   "Message-ID:" msg-id CRLF
+    """
+    message_id = MessageID()
+    try:
+        token, value = get_msg_id(value)
+        message_id.append(token)
+    except errors.HeaderParseError as ex:
+        token = get_unstructured(value)
+        message_id = InvalidMessageID(token)
+        message_id.defects.append(
+            errors.InvalidHeaderDefect("Invalid msg-id: {!r}".format(ex)))
+    else:
+        # Value after parsing a valid msg_id should be None.
+        if value:
+            message_id.defects.append(errors.InvalidHeaderDefect(
+                "Unexpected {!r}".format(value)))
+
+    return message_id
+
 #
 # XXX: As I begin to add additional header parsers, I'm realizing we probably
 # have two level of parser routines: the get_XXX methods that get a token in
@@ -2391,7 +2572,7 @@ def parse_mime_parameters(value):
     the formal RFC grammar, but it is more convenient for us for the set of
     parameters to be treated as its own TokenList.
 
-    This is 'parse' routine because it consumes the reminaing value, but it
+    This is 'parse' routine because it consumes the remaining value, but it
     would never be called to parse a full header.  Instead it is called to
     parse everything after the non-parameter value of a specific MIME header.
 
@@ -2597,7 +2778,7 @@ def _refold_parse_tree(parse_tree, *, po
 
     """
     # max_line_length 0/None means no limit, ie: infinitely long.
-    maxlen = policy.max_line_length or float("+inf")
+    maxlen = policy.max_line_length or sys.maxsize
     encoding = 'utf-8' if policy.utf8 else 'us-ascii'
     lines = ['']
     last_ew = None
@@ -2611,6 +2792,14 @@ def _refold_parse_tree(parse_tree, *, po
             wrap_as_ew_blocked -= 1
             continue
         tstr = str(part)
+
+        if not want_encoding:
+            if part.token_type == 'ptext':
+                # Encode if tstr contains special characters.
+                want_encoding = not SPECIALSNL.isdisjoint(tstr)
+            else:
+                # Encode if tstr contains newlines.
+                want_encoding = not NLSET.isdisjoint(tstr)
         try:
             tstr.encode(encoding)
             charset = encoding
@@ -2632,7 +2821,7 @@ def _refold_parse_tree(parse_tree, *, po
                 want_encoding = False
                 last_ew = None
                 if part.syntactic_break:
-                    encoded_part = part.fold(policy=policy)[:-1] # strip nl
+                    encoded_part = part.fold(policy=policy)[:-len(policy.linesep)]
                     if policy.linesep not in encoded_part:
                         # It fits on a single line
                         if len(encoded_part) > maxlen - len(lines[-1]):
@@ -2667,6 +2856,7 @@ def _refold_parse_tree(parse_tree, *, po
             newline = _steal_trailing_WSP_if_exists(lines)
             if newline or part.startswith_fws():
                 lines.append(newline + tstr)
+                last_ew = None
                 continue
         if not hasattr(part, 'encode'):
             # It's not a terminal, try folding the subparts.
--- a/Lib/email/_policybase.py
+++ b/Lib/email/_policybase.py
@@ -157,6 +157,13 @@ class Policy(_PolicyBase, metaclass=abc.
     message_factory     -- the class to use to create new message objects.
                            If the value is None, the default is Message.
 
+    verify_generated_headers
+                        -- if true, the generator verifies that each header
+                           they are properly folded, so that a parser won't
+                           treat it as multiple headers, start-of-body, or
+                           part of another header.
+                           This is a check against custom Header & fold()
+                           implementations.
     """
 
     raise_on_defect = False
@@ -165,6 +172,7 @@ class Policy(_PolicyBase, metaclass=abc.
     max_line_length = 78
     mangle_from_ = False
     message_factory = None
+    verify_generated_headers = True
 
     def handle_defect(self, obj, defect):
         """Based on policy, either raise defect or call register_defect.
--- a/Lib/email/errors.py
+++ b/Lib/email/errors.py
@@ -29,6 +29,10 @@ class CharsetError(MessageError):
     """An illegal charset was given."""
 
 
+class HeaderWriteError(MessageError):
+    """Error while writing headers."""
+
+
 # These are parsing defects which the parser was able to work around.
 class MessageDefect(ValueError):
     """Base class for a message defect."""
--- a/Lib/email/generator.py
+++ b/Lib/email/generator.py
@@ -14,12 +14,14 @@ import random
 from copy import deepcopy
 from io import StringIO, BytesIO
 from email.utils import _has_surrogates
+from email.errors import HeaderWriteError
 
 UNDERSCORE = '_'
 NL = '\n'  # XXX: no longer used by the code below.
 
 NLCRE = re.compile(r'\r\n|\r|\n')
 fcre = re.compile(r'^From ', re.MULTILINE)
+NEWLINE_WITHOUT_FWSP = re.compile(r'\r\n[^ \t]|\r[^ \n\t]|\n[^ \t]')
 
 
 
@@ -223,7 +225,19 @@ class Generator:
 
     def _write_headers(self, msg):
         for h, v in msg.raw_items():
-            self.write(self.policy.fold(h, v))
+            folded = self.policy.fold(h, v)
+            if self.policy.verify_generated_headers:
+                linesep = self.policy.linesep
+                if not folded.endswith(self.policy.linesep):
+                    raise HeaderWriteError(
+                        f'folded header does not end with {linesep!r}: {folded!r}')
+                folded_no_linesep = folded
+                if folded.endswith(linesep):
+                    folded_no_linesep = folded[:-len(linesep)]
+                if NEWLINE_WITHOUT_FWSP.search(folded_no_linesep):
+                    raise HeaderWriteError(
+                        f'folded header contains newline: {folded!r}')
+            self.write(folded)
         # A blank line always separates headers from body
         self.write(self._NL)
 
--- a/Lib/email/policy.py
+++ b/Lib/email/policy.py
@@ -3,6 +3,7 @@ code that adds all the email6 features.
 """
 
 import re
+import sys
 from email._policybase import Policy, Compat32, compat32, _extend_docstrings
 from email.utils import _has_surrogates
 from email.headerregistry import HeaderRegistry as HeaderRegistry
@@ -203,7 +204,7 @@ class EmailPolicy(Policy):
     def _fold(self, name, value, refold_binary=False):
         if hasattr(value, 'name'):
             return value.fold(policy=self)
-        maxlen = self.max_line_length if self.max_line_length else float('inf')
+        maxlen = self.max_line_length if self.max_line_length else sys.maxsize
         lines = value.splitlines()
         refold = (self.refold_source == 'all' or
                   self.refold_source == 'long' and
--- a/Lib/test/test_email/test__header_value_parser.py
+++ b/Lib/test/test_email/test__header_value_parser.py
@@ -118,7 +118,7 @@ class TestParser(TestParserMixin, TestEm
                          '=?us-ascii?q?first?==?utf-8?q?second?=',
                          'first',
                          'first',
-                         [],
+                         [errors.InvalidHeaderDefect],
                          '=?utf-8?q?second?=')
 
     def test_get_encoded_word_sets_extra_attributes(self):
@@ -361,7 +361,7 @@ class TestParser(TestParserMixin, TestEm
             '=?utf-8?q?foo?==?utf-8?q?bar?=',
             'foobar',
             'foobar',
-            [errors.InvalidHeaderDefect],
+            [errors.InvalidHeaderDefect, errors.InvalidHeaderDefect],
             '')
 
     # get_qp_ctext
@@ -546,7 +546,7 @@ class TestParser(TestParserMixin, TestEm
             '"=?utf-8?Q?not_really_valid?="',
             '"not really valid"',
             'not really valid',
-            [errors.InvalidHeaderDefect],
+            [errors.InvalidHeaderDefect, errors.InvalidHeaderDefect],
             '')
 
     # get_comment
--- a/Lib/test/test_email/test_generator.py
+++ b/Lib/test/test_email/test_generator.py
@@ -5,6 +5,7 @@ from email import message_from_string, m
 from email.message import EmailMessage
 from email.generator import Generator, BytesGenerator
 from email import policy
+import email.errors
 from test.test_email import TestEmailBase, parameterize
 
 
@@ -215,6 +216,44 @@ class TestGeneratorBase:
         g.flatten(msg)
         self.assertEqual(s.getvalue(), self.typ(expected))
 
+    def test_keep_encoded_newlines(self):
+        msg = self.msgmaker(self.typ(textwrap.dedent("""\
+            To: nobody
+            Subject: Bad subject=?UTF-8?Q?=0A?=Bcc: injection@example.com
+
+            None
+            """)))
+        expected = textwrap.dedent("""\
+            To: nobody
+            Subject: Bad subject=?UTF-8?Q?=0A?=Bcc: injection@example.com
+
+            None
+            """)
+        s = self.ioclass()
+        g = self.genclass(s, policy=self.policy.clone(max_line_length=80))
+        g.flatten(msg)
+        self.assertEqual(s.getvalue(), self.typ(expected))
+
+    def test_keep_long_encoded_newlines(self):
+        msg = self.msgmaker(self.typ(textwrap.dedent("""\
+            To: nobody
+            Subject: Bad subject=?UTF-8?Q?=0A?=Bcc: injection@example.com
+
+            None
+            """)))
+        expected = textwrap.dedent("""\
+            To: nobody
+            Subject: Bad subject
+             =?utf-8?q?=0A?=Bcc:
+             injection@example.com
+
+            None
+            """)
+        s = self.ioclass()
+        g = self.genclass(s, policy=self.policy.clone(max_line_length=30))
+        g.flatten(msg)
+        self.assertEqual(s.getvalue(), self.typ(expected))
+
 
 class TestGenerator(TestGeneratorBase, TestEmailBase):
 
@@ -223,6 +262,29 @@ class TestGenerator(TestGeneratorBase, T
     ioclass = io.StringIO
     typ = str
 
+    def test_verify_generated_headers(self):
+        """gh-121650: by default the generator prevents header injection"""
+        class LiteralHeader(str):
+            name = 'Header'
+            def fold(self, **kwargs):
+                return self
+
+        for text in (
+            'Value\r\nBad Injection\r\n',
+            'NoNewLine'
+        ):
+            with self.subTest(text=text):
+                message = message_from_string(
+                    "Header: Value\r\n\r\nBody",
+                    policy=self.policy,
+                )
+
+                del message['Header']
+                message['Header'] = LiteralHeader(text)
+
+                with self.assertRaises(email.errors.HeaderWriteError):
+                    message.as_string()
+
 
 class TestBytesGenerator(TestGeneratorBase, TestEmailBase):
 
--- a/Lib/test/test_email/test_headerregistry.py
+++ b/Lib/test/test_email/test_headerregistry.py
@@ -1180,7 +1180,7 @@ class TestAddressHeader(TestHeaderBase):
 
         'rfc2047_atom_in_quoted_string_is_decoded':
             ('"=?utf-8?q?=C3=89ric?=" <foo@example.com>',
-            [errors.InvalidHeaderDefect],
+            [errors.InvalidHeaderDefect, errors.InvalidHeaderDefect],
             'Éric <foo@example.com>',
             'Éric',
             'foo@example.com',
--- a/Lib/test/test_email/test_policy.py
+++ b/Lib/test/test_email/test_policy.py
@@ -26,6 +26,7 @@ class PolicyAPITests(unittest.TestCase):
         'raise_on_defect':          False,
         'mangle_from_':             True,
         'message_factory':          None,
+        'verify_generated_headers': True,
         }
     # These default values are the ones set on email.policy.default.
     # If any of these defaults change, the docs must be updated.
@@ -265,6 +266,31 @@ class PolicyAPITests(unittest.TestCase):
                 with self.assertRaises(email.errors.HeaderParseError):
                     policy.fold("Subject", subject)
 
+    def test_verify_generated_headers(self):
+        """Turning protection off allows header injection"""
+        policy = email.policy.default.clone(verify_generated_headers=False)
+        for text in (
+            'Header: Value\r\nBad: Injection\r\n',
+            'Header: NoNewLine'
+        ):
+            with self.subTest(text=text):
+                message = email.message_from_string(
+                    "Header: Value\r\n\r\nBody",
+                    policy=policy,
+                )
+                class LiteralHeader(str):
+                    name = 'Header'
+                    def fold(self, **kwargs):
+                        return self
+
+                del message['Header']
+                message['Header'] = LiteralHeader(text)
+
+                self.assertEqual(
+                    message.as_string(),
+                    f"{text}\nBody",
+                )
+
     # XXX: Need subclassing tests.
     # For adding subclassed objects, make sure the usual rules apply (subclass
     # wins), but that the order still works (right overrides left).
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2024-07-27-16-10-41.gh-issue-121650.nf6oc9.rst
@@ -0,0 +1,5 @@
+:mod:`email` headers with embedded newlines are now quoted on output. The
+:mod:`~email.generator` will now refuse to serialize (write) headers that
+are unsafely folded or delimited; see
+:attr:`~email.policy.Policy.verify_generated_headers`. (Contributed by Bas
+Bloemsaat and Petr Viktorin in :gh:`121650`.)
openSUSE Build Service is sponsored by