File CVE-2026-32714.patch of Package python-scitokens
From 3dba108853f2f4a6c0f2325c03779bf083c41cf2 Mon Sep 17 00:00:00 2001
From: Derek Weitzel <djw8605@gmail.com>
Date: Fri, 13 Mar 2026 10:44:27 -0500
Subject: [PATCH] Refactor KeyCache SQL queries to use parameterized statements
for security and add regression tests for SQL injection prevention
---
src/scitokens/utils/keycache.py | 31 ++++----
tests/test_keycache.py | 134 ++++++++++++++++++++++++++++++++
2 files changed, 149 insertions(+), 16 deletions(-)
Index: scitokens-1.8.1/src/scitokens/utils/keycache.py
===================================================================
--- scitokens-1.8.1.orig/src/scitokens/utils/keycache.py
+++ scitokens-1.8.1/src/scitokens/utils/keycache.py
@@ -76,7 +76,7 @@ class KeyCache(object):
conn = sqlite3.connect(self.cache_location)
conn.row_factory = sqlite3.Row
curs = conn.cursor()
- curs.execute("DELETE FROM keycache WHERE issuer = '{}' AND key_id = '{}'".format(issuer, key_id))
+ curs.execute("DELETE FROM keycache WHERE issuer = ? AND key_id = ?", [issuer, key_id])
KeyCache._addkeyinfo(curs, issuer, key_id, public_key, cache_timer=cache_timer, next_update=next_update)
conn.commit()
conn.close()
@@ -87,14 +87,13 @@ class KeyCache(object):
Given an open database cursor to a key cache, insert a key.
"""
# Add the key to the cache
- insert_key_statement = "INSERT INTO keycache VALUES('{issuer}', '{expiration}', '{key_id}', \
- '{keydata}', '{next_update}')"
+ insert_key_statement = "INSERT INTO keycache VALUES(?, ?, ?, ?, ?)"
keydata = {
'pub_key': public_key.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo).decode('ascii'),
}
- curs.execute(insert_key_statement.format(issuer=issuer, expiration=time.time()+cache_timer, key_id=key_id,
- keydata=json.dumps(keydata), next_update=time.time()+next_update))
+ curs.execute(insert_key_statement, [issuer, time.time()+cache_timer, key_id,
+ json.dumps(keydata), time.time()+next_update])
if curs.rowcount != 1:
raise UnableToWriteKeyCache("Unable to insert into key cache")
@@ -129,8 +128,7 @@ class KeyCache(object):
# Open the connection to the database
conn = sqlite3.connect(self.cache_location)
curs = conn.cursor()
- curs.execute("DELETE FROM keycache WHERE issuer = '{}' AND key_id = '{}'".format(issuer,
- key_id))
+ curs.execute("DELETE FROM keycache WHERE issuer = ? AND key_id = ?", [issuer, key_id])
conn.commit()
conn.close()
@@ -145,14 +143,16 @@ class KeyCache(object):
:returns: None if no key is found. Else, returns the public key
"""
# Check the sql database
- key_query = ("SELECT * FROM keycache WHERE "
- "issuer = '{issuer}'")
- if key_id != None:
- key_query += " AND key_id = '{key_id}'"
+ if key_id is not None:
+ key_query = "SELECT * FROM keycache WHERE issuer = ? AND key_id = ?"
+ query_params = [issuer, key_id]
+ else:
+ key_query = "SELECT * FROM keycache WHERE issuer = ?"
+ query_params = [issuer]
conn = sqlite3.connect(self.cache_location)
conn.row_factory = sqlite3.Row
curs = conn.cursor()
- curs.execute(key_query.format(issuer=issuer, key_id=key_id))
+ curs.execute(key_query, query_params)
row = curs.fetchone()
conn.commit()
Index: scitokens-1.8.1/tests/test_keycache.py
===================================================================
--- scitokens-1.8.1.orig/tests/test_keycache.py
+++ scitokens-1.8.1/tests/test_keycache.py
@@ -264,5 +264,122 @@ class TestKeyCache(unittest.TestCase):
create_webserver.shutdown_server()
+import sqlite3
+
+class TestKeyCacheSQLInjection(unittest.TestCase):
+ """
+ Regression tests to verify that SQL injection via issuer/key_id is not possible.
+ """
+
+ def setUp(self):
+ self.tmp_dir = tempfile.mkdtemp()
+ self.old_xdg = os.environ.get('XDG_CACHE_HOME', None)
+ os.environ['XDG_CACHE_HOME'] = self.tmp_dir
+ self.keycache = KeyCache()
+
+ # Generate a test key pair
+ self.private_key = generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=default_backend()
+ )
+ self.public_key = self.private_key.public_key()
+
+ def tearDown(self):
+ shutil.rmtree(self.tmp_dir)
+ if self.old_xdg:
+ os.environ['XDG_CACHE_HOME'] = self.old_xdg
+
+ def _count_rows(self):
+ conn = sqlite3.connect(self.keycache.cache_location)
+ curs = conn.cursor()
+ curs.execute("SELECT COUNT(*) FROM keycache")
+ count = curs.fetchone()[0]
+ conn.close()
+ return count
+
+ def test_injection_in_issuer_does_not_delete_other_rows(self):
+ """
+ With the old .format() pattern, an issuer like "x' OR '1'='1" in a
+ DELETE would wipe every row. Parameterized queries treat it as a
+ literal value, so no rows other than the exact match are affected.
+ """
+ # Insert a legitimate row
+ self.keycache.addkeyinfo("https://legit.example.com/", "key1",
+ self.public_key, cache_timer=3600)
+ self.assertEqual(self._count_rows(), 1)
+
+ # Attempt injection via issuer in addkeyinfo (which DELETEs first)
+ malicious_issuer = "x' OR '1'='1"
+ self.keycache.addkeyinfo(malicious_issuer, "evil_key",
+ self.public_key, cache_timer=3600)
+
+ # The legitimate row must still exist, plus the new malicious-literal row
+ self.assertEqual(self._count_rows(), 2)
+
+ def test_injection_in_key_id_does_not_delete_other_rows(self):
+ """
+ A malicious key_id should not be able to affect other rows.
+ """
+ self.keycache.addkeyinfo("https://legit.example.com/", "key1",
+ self.public_key, cache_timer=3600)
+ self.assertEqual(self._count_rows(), 1)
+
+ malicious_key_id = "x' OR '1'='1"
+ self.keycache.addkeyinfo("https://other.example.com/", malicious_key_id,
+ self.public_key, cache_timer=3600)
+
+ self.assertEqual(self._count_rows(), 2)
+
+ def test_delete_cache_entry_with_injection_string(self):
+ """
+ _delete_cache_entry with a crafted issuer must not delete unrelated rows.
+ """
+ self.keycache.addkeyinfo("https://legit.example.com/", "key1",
+ self.public_key, cache_timer=3600)
+ self.assertEqual(self._count_rows(), 1)
+
+ # Try to delete with an injection string — should match nothing
+ self.keycache._delete_cache_entry("x' OR '1'='1", "key1")
+ self.assertEqual(self._count_rows(), 1)
+
+ def test_union_select_injection_is_literal(self):
+ """
+ A UNION SELECT payload in the issuer should be stored as a literal
+ value, not interpreted as SQL.
+ """
+ malicious_issuer = "x' UNION SELECT * FROM keycache --"
+ self.keycache.addkeyinfo(malicious_issuer, "key1",
+ self.public_key, cache_timer=3600)
+ self.assertEqual(self._count_rows(), 1)
+
+ # The stored issuer should be the literal malicious string
+ conn = sqlite3.connect(self.keycache.cache_location)
+ curs = conn.cursor()
+ curs.execute("SELECT issuer FROM keycache")
+ row = curs.fetchone()
+ conn.close()
+ self.assertEqual(row[0], malicious_issuer)
+
+ def test_getkeyinfo_injection_issuer_no_leak(self):
+ """
+ getkeyinfo with an injection payload in issuer must not return
+ rows belonging to a different issuer.
+ """
+ self.keycache.addkeyinfo("https://legit.example.com/", "key1",
+ self.public_key, cache_timer=3600)
+
+ # This injection string would match all rows with the old code
+ malicious_issuer = "x' OR '1'='1"
+ # getkeyinfo will not find a cached row and will try to download,
+ # which will fail — that's expected. The important thing is it
+ # does NOT return the legit key.
+ try:
+ result = self.keycache.getkeyinfo(malicious_issuer, "key1")
+ except Exception:
+ result = None
+ self.assertIsNone(result)
+
+
if __name__ == '__main__':
unittest.main()