File 0001-Always-generate-a-random-IV-for-AES-operations.patch of Package python-pysaml2
From cc7c2075adc5f77126264f11e1f052a8e71c6863 Mon Sep 17 00:00:00 2001
From: Ivan Kanakarakis <ivan.kanak@gmail.com>
Date: Fri, 13 Jul 2018 20:15:04 +0300
Subject: [PATCH] Always generate a random IV for AES operations
Quoting @obi1kenobi:
> Initialization vector reuse like this is a security concern, since it leaks
> information about the encrypted data to attackers, regardless of the
> encryption mode used.
> Instead of relying on a fixed, randomly-generated IV, it would be better to
> randomly-generate a new IV for every encryption operation.
Breaks AESCipher ECB support
Reported as CVE-2017-1000246
Fixes #417
Signed-off-by: Ivan Kanakarakis <ivan.kanak@gmail.com>
(cherry picked from commit 7323f5c20efb59424d853c822e7a26d1aa3e84aa)
---
src/saml2/aes.py | 42 +++++++++++-------------------------------
src/saml2/authn.py | 1 +
src/saml2/server.py | 3 ---
3 files changed, 12 insertions(+), 34 deletions(-)
diff --git a/src/saml2/aes.py b/src/saml2/aes.py
index 4110e1fc..5de93ed8 100644
--- a/src/saml2/aes.py
+++ b/src/saml2/aes.py
@@ -9,38 +9,27 @@ __author__ = 'rolandh'
POSTFIX_MODE = {
"cbc": AES.MODE_CBC,
"cfb": AES.MODE_CFB,
- "ecb": AES.MODE_CFB,
}
BLOCK_SIZE = 16
class AESCipher(object):
- def __init__(self, key, iv=""):
+ def __init__(self, key):
"""
:param key: The encryption key
- :param iv: Init vector
:return: AESCipher instance
"""
self.key = key
- self.iv = iv
- def build_cipher(self, iv="", alg="aes_128_cbc"):
+ def build_cipher(self, alg='aes_128_cbc'):
"""
- :param iv: init vector
:param alg: cipher algorithm
:return: A Cipher instance
"""
typ, bits, cmode = alg.split("_")
-
- if not iv:
- if self.iv:
- iv = self.iv
- else:
- iv = Random.new().read(AES.block_size)
- else:
- assert len(iv) == AES.block_size
+ iv = Random.new().read(AES.block_size)
if bits not in ["128", "192", "256"]:
raise Exception("Unsupported key length")
@@ -54,12 +43,12 @@ class AESCipher(object):
except KeyError:
raise Exception("Unsupported chaining mode")
+ return cipher, iv
- def encrypt(self, msg, iv=None, alg="aes_128_cbc", padding="PKCS#7",
- b64enc=True, block_size=BLOCK_SIZE):
+ def encrypt(self, msg, alg='aes_128_cbc', padding='PKCS#7', b64enc=True,
+ block_size=BLOCK_SIZE):
"""
:param key: The encryption key
- :param iv: init vector
:param msg: Message to be encrypted
:param padding: Which padding that should be used
:param b64enc: Whether the result should be base64encoded
@@ -79,7 +68,7 @@ class AESCipher(object):
c = chr(plen)
msg += c*plen
- cipher, iv = self.build_cipher(iv, alg)
+ cipher, iv = self.build_cipher(alg)
cmsg = iv + cipher.encrypt(msg)
if b64enc:
return b64encode(cmsg)
@@ -87,10 +76,9 @@ class AESCipher(object):
return cmsg
- def decrypt(self, msg, iv=None, alg="aes_128_cbc", padding="PKCS#7", b64dec=True):
+ def decrypt(self, msg, alg="aes_128_cbc", padding="PKCS#7", b64dec=True):
"""
:param key: The encryption key
- :param iv: init vector
:param msg: Base64 encoded message to be decrypted
:return: The decrypted message
"""
@@ -99,10 +87,7 @@ class AESCipher(object):
else:
data = msg
- _iv = data[:AES.block_size]
- if iv:
- assert iv == _iv
- cipher, iv = self.build_cipher(iv, alg=alg)
+ cipher, iv = self.build_cipher(alg=alg)
res = cipher.decrypt(data)[AES.block_size:]
if padding in ["PKCS#5", "PKCS#7"]:
res = res[:-ord(res[-1])]
@@ -113,11 +98,6 @@ if __name__ == "__main__":
# Iff padded, the message doesn't have to be multiple of 16 in length
msg_ = "ToBeOrNotTobe W.S."
aes = AESCipher(key_)
- iv_ = os.urandom(16)
- encrypted_msg = aes.encrypt(key_, msg_, iv_)
- txt = aes.decrypt(key_, encrypted_msg, iv_)
- assert txt == msg_
-
- encrypted_msg = aes.encrypt(key_, msg_, 0)
- txt = aes.decrypt(key_, encrypted_msg, 0)
+ encrypted_msg = aes.encrypt(msg_)
+ txt = aes.decrypt(encrypted_msg)
assert txt == msg_
diff --git a/src/saml2/authn.py b/src/saml2/authn.py
index 1c3006d8..893904d5 100644
--- a/src/saml2/authn.py
+++ b/src/saml2/authn.py
@@ -122,6 +122,7 @@ class UsernamePasswordMako(UserAuthnMethod):
self.active = {}
self.query_param = "upm_answer"
self.aes = AESCipher(self.srv.symkey, srv.iv)
+ self.aes = AESCipher(self.srv.symkey.encode())
def __call__(self, cookie=None, policy_url=None, logo_url=None,
query="", **kwargs):
diff --git a/src/saml2/server.py b/src/saml2/server.py
index b77b2c4d..86335f4b 100644
--- a/src/saml2/server.py
+++ b/src/saml2/server.py
@@ -80,12 +80,9 @@ class Server(Entity):
self.init_config(stype)
self.cache = cache
self.ticket = {}
- #
self.session_db = self.choose_session_storage()
- # Needed for
self.symkey = symkey
self.seed = rndstr()
- self.iv = os.urandom(16)
self.lock = threading.Lock()
def getvalid_certificate_str(self):
--
2.26.2