File CVE-2026-32714.patch of Package python-scitokens

From 3dba108853f2f4a6c0f2325c03779bf083c41cf2 Mon Sep 17 00:00:00 2001
From: Derek Weitzel <djw8605@gmail.com>
Date: Fri, 13 Mar 2026 10:44:27 -0500
Subject: [PATCH] Refactor KeyCache SQL queries to use parameterized statements
 for security and add regression tests for SQL injection prevention

---
 src/scitokens/utils/keycache.py |  31 ++++----
 tests/test_keycache.py          | 134 ++++++++++++++++++++++++++++++++
 2 files changed, 149 insertions(+), 16 deletions(-)

Index: scitokens-1.8.1/src/scitokens/utils/keycache.py
===================================================================
--- scitokens-1.8.1.orig/src/scitokens/utils/keycache.py
+++ scitokens-1.8.1/src/scitokens/utils/keycache.py
@@ -76,7 +76,7 @@ class KeyCache(object):
         conn = sqlite3.connect(self.cache_location)
         conn.row_factory = sqlite3.Row
         curs = conn.cursor()
-        curs.execute("DELETE FROM keycache WHERE issuer = '{}' AND key_id = '{}'".format(issuer, key_id))
+        curs.execute("DELETE FROM keycache WHERE issuer = ? AND key_id = ?", [issuer, key_id])
         KeyCache._addkeyinfo(curs, issuer, key_id, public_key, cache_timer=cache_timer, next_update=next_update)
         conn.commit()
         conn.close()
@@ -87,14 +87,13 @@ class KeyCache(object):
         Given an open database cursor to a key cache, insert a key.
         """
         # Add the key to the cache
-        insert_key_statement = "INSERT INTO keycache VALUES('{issuer}', '{expiration}', '{key_id}', \
-                               '{keydata}', '{next_update}')"
+        insert_key_statement = "INSERT INTO keycache VALUES(?, ?, ?, ?, ?)"
         keydata = {
             'pub_key': public_key.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo).decode('ascii'),
         }
 
-        curs.execute(insert_key_statement.format(issuer=issuer, expiration=time.time()+cache_timer, key_id=key_id,
-                                                 keydata=json.dumps(keydata), next_update=time.time()+next_update))
+        curs.execute(insert_key_statement, [issuer, time.time()+cache_timer, key_id,
+                                                 json.dumps(keydata), time.time()+next_update])
         if curs.rowcount != 1:
             raise UnableToWriteKeyCache("Unable to insert into key cache")
 
@@ -129,8 +128,7 @@ class KeyCache(object):
         # Open the connection to the database
         conn = sqlite3.connect(self.cache_location)
         curs = conn.cursor()
-        curs.execute("DELETE FROM keycache WHERE issuer = '{}' AND key_id = '{}'".format(issuer,
-                     key_id))
+        curs.execute("DELETE FROM keycache WHERE issuer = ? AND key_id = ?", [issuer, key_id])
         conn.commit()
         conn.close()
 
@@ -145,14 +143,16 @@ class KeyCache(object):
         :returns: None if no key is found.  Else, returns the public key
         """
         # Check the sql database
-        key_query = ("SELECT * FROM keycache WHERE "
-                     "issuer = '{issuer}'")
-        if key_id != None:
-            key_query += " AND key_id = '{key_id}'"
+        if key_id is not None:
+            key_query = "SELECT * FROM keycache WHERE issuer = ? AND key_id = ?"
+            query_params = [issuer, key_id]
+        else:
+            key_query = "SELECT * FROM keycache WHERE issuer = ?"
+            query_params = [issuer]
         conn = sqlite3.connect(self.cache_location)
         conn.row_factory = sqlite3.Row
         curs = conn.cursor()
-        curs.execute(key_query.format(issuer=issuer, key_id=key_id))
+        curs.execute(key_query, query_params)
 
         row = curs.fetchone()
         conn.commit()
Index: scitokens-1.8.1/tests/test_keycache.py
===================================================================
--- scitokens-1.8.1.orig/tests/test_keycache.py
+++ scitokens-1.8.1/tests/test_keycache.py
@@ -264,5 +264,122 @@ class TestKeyCache(unittest.TestCase):
         create_webserver.shutdown_server()
 
 
+import sqlite3
+
+class TestKeyCacheSQLInjection(unittest.TestCase):
+    """
+    Regression tests to verify that SQL injection via issuer/key_id is not possible.
+    """
+
+    def setUp(self):
+        self.tmp_dir = tempfile.mkdtemp()
+        self.old_xdg = os.environ.get('XDG_CACHE_HOME', None)
+        os.environ['XDG_CACHE_HOME'] = self.tmp_dir
+        self.keycache = KeyCache()
+
+        # Generate a test key pair
+        self.private_key = generate_private_key(
+            public_exponent=65537,
+            key_size=2048,
+            backend=default_backend()
+        )
+        self.public_key = self.private_key.public_key()
+
+    def tearDown(self):
+        shutil.rmtree(self.tmp_dir)
+        if self.old_xdg:
+            os.environ['XDG_CACHE_HOME'] = self.old_xdg
+
+    def _count_rows(self):
+        conn = sqlite3.connect(self.keycache.cache_location)
+        curs = conn.cursor()
+        curs.execute("SELECT COUNT(*) FROM keycache")
+        count = curs.fetchone()[0]
+        conn.close()
+        return count
+
+    def test_injection_in_issuer_does_not_delete_other_rows(self):
+        """
+        With the old .format() pattern, an issuer like "x' OR '1'='1" in a
+        DELETE would wipe every row. Parameterized queries treat it as a
+        literal value, so no rows other than the exact match are affected.
+        """
+        # Insert a legitimate row
+        self.keycache.addkeyinfo("https://legit.example.com/", "key1",
+                                 self.public_key, cache_timer=3600)
+        self.assertEqual(self._count_rows(), 1)
+
+        # Attempt injection via issuer in addkeyinfo (which DELETEs first)
+        malicious_issuer = "x' OR '1'='1"
+        self.keycache.addkeyinfo(malicious_issuer, "evil_key",
+                                 self.public_key, cache_timer=3600)
+
+        # The legitimate row must still exist, plus the new malicious-literal row
+        self.assertEqual(self._count_rows(), 2)
+
+    def test_injection_in_key_id_does_not_delete_other_rows(self):
+        """
+        A malicious key_id should not be able to affect other rows.
+        """
+        self.keycache.addkeyinfo("https://legit.example.com/", "key1",
+                                 self.public_key, cache_timer=3600)
+        self.assertEqual(self._count_rows(), 1)
+
+        malicious_key_id = "x' OR '1'='1"
+        self.keycache.addkeyinfo("https://other.example.com/", malicious_key_id,
+                                 self.public_key, cache_timer=3600)
+
+        self.assertEqual(self._count_rows(), 2)
+
+    def test_delete_cache_entry_with_injection_string(self):
+        """
+        _delete_cache_entry with a crafted issuer must not delete unrelated rows.
+        """
+        self.keycache.addkeyinfo("https://legit.example.com/", "key1",
+                                 self.public_key, cache_timer=3600)
+        self.assertEqual(self._count_rows(), 1)
+
+        # Try to delete with an injection string — should match nothing
+        self.keycache._delete_cache_entry("x' OR '1'='1", "key1")
+        self.assertEqual(self._count_rows(), 1)
+
+    def test_union_select_injection_is_literal(self):
+        """
+        A UNION SELECT payload in the issuer should be stored as a literal
+        value, not interpreted as SQL.
+        """
+        malicious_issuer = "x' UNION SELECT * FROM keycache --"
+        self.keycache.addkeyinfo(malicious_issuer, "key1",
+                                 self.public_key, cache_timer=3600)
+        self.assertEqual(self._count_rows(), 1)
+
+        # The stored issuer should be the literal malicious string
+        conn = sqlite3.connect(self.keycache.cache_location)
+        curs = conn.cursor()
+        curs.execute("SELECT issuer FROM keycache")
+        row = curs.fetchone()
+        conn.close()
+        self.assertEqual(row[0], malicious_issuer)
+
+    def test_getkeyinfo_injection_issuer_no_leak(self):
+        """
+        getkeyinfo with an injection payload in issuer must not return
+        rows belonging to a different issuer.
+        """
+        self.keycache.addkeyinfo("https://legit.example.com/", "key1",
+                                 self.public_key, cache_timer=3600)
+
+        # This injection string would match all rows with the old code
+        malicious_issuer = "x' OR '1'='1"
+        # getkeyinfo will not find a cached row and will try to download,
+        # which will fail — that's expected.  The important thing is it
+        # does NOT return the legit key.
+        try:
+            result = self.keycache.getkeyinfo(malicious_issuer, "key1")
+        except Exception:
+            result = None
+        self.assertIsNone(result)
+
+
 if __name__ == '__main__':
     unittest.main()
openSUSE Build Service is sponsored by