File 0017-54710-replace_jwt_with_pure_python.patch of Package ceph-ceph-15.2.17

diff --git a/src/pybind/mgr/dashboard/constraints.txt b/src/pybind/mgr/dashboard/constraints.txt
index 7bd2d4a10c7..6a2aa6b8e09 100644
--- a/src/pybind/mgr/dashboard/constraints.txt
+++ b/src/pybind/mgr/dashboard/constraints.txt
@@ -1,7 +1,6 @@
 CherryPy==13.1.0
 enum34==1.1.6
 more-itertools==4.1.0
-PyJWT==2.0.1
 bcrypt==3.1.4
 python3-saml==1.4.1
 requests==2.25.1
diff --git a/src/pybind/mgr/dashboard/exceptions.py b/src/pybind/mgr/dashboard/exceptions.py
index 8857c423cc4..5c559c6d591 100644
--- a/src/pybind/mgr/dashboard/exceptions.py
+++ b/src/pybind/mgr/dashboard/exceptions.py
@@ -122,3 +122,15 @@ class GrafanaError(Exception):
 
 class PasswordPolicyException(Exception):
     pass
+
+
+class ExpiredSignatureError(Exception):
+    pass
+
+
+class InvalidTokenError(Exception):
+    pass
+
+
+class InvalidAlgorithmError(Exception):
+    pass
diff --git a/src/pybind/mgr/dashboard/requirements.txt b/src/pybind/mgr/dashboard/requirements.txt
index 258e34261b1..2f8ef880072 100644
--- a/src/pybind/mgr/dashboard/requirements.txt
+++ b/src/pybind/mgr/dashboard/requirements.txt
@@ -2,7 +2,6 @@ bcrypt
 CherryPy
 enum34; python_version<'3.4'
 more-itertools
-PyJWT
 pyopenssl
 python3-saml
 requests
diff --git a/src/pybind/mgr/dashboard/services/auth.py b/src/pybind/mgr/dashboard/services/auth.py
index 916afca99a1..6105e3faa7b 100644
--- a/src/pybind/mgr/dashboard/services/auth.py
+++ b/src/pybind/mgr/dashboard/services/auth.py
@@ -1,7 +1,9 @@
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import
 
-from base64 import b64encode
+import base64
+import hashlib
+import hmac
 import json
 import logging
 import os
@@ -10,10 +12,10 @@ import time
 import uuid
 
 import cherrypy
-import jwt
 
-from .access_control import LocalAuthenticator, UserDoesNotExist
 from .. import mgr
+from ..exceptions import ExpiredSignatureError, InvalidAlgorithmError, InvalidTokenError
+from .access_control import LocalAuthenticator, UserDoesNotExist
 
 cherrypy.config.update({
     'response.headers.server': 'Ceph-Dashboard',
@@ -34,7 +36,7 @@ class JwtManager(object):
     @staticmethod
     def _gen_secret():
         secret = os.urandom(16)
-        return b64encode(secret).decode('utf-8')
+        return base64.b64encode(secret).decode('utf-8')
 
     @classmethod
     def init(cls):
@@ -46,6 +48,54 @@ class JwtManager(object):
             mgr.set_store('jwt_secret', secret)
         cls._secret = secret
 
+    @classmethod
+    def array_to_base64_string(cls, message):
+        jsonstr = json.dumps(message, sort_keys=True).replace(" ", "")
+        string_bytes = base64.urlsafe_b64encode(bytes(jsonstr, 'UTF-8'))
+        return string_bytes.decode('UTF-8').replace("=", "")
+
+    @classmethod
+    def encode(cls, message, secret):
+        header = {"alg": cls.JWT_ALGORITHM, "typ": "JWT"}
+        base64_header = cls.array_to_base64_string(header)
+        base64_message = cls.array_to_base64_string(message)
+        base64_secret = base64.urlsafe_b64encode(hmac.new(
+            bytes(secret, 'UTF-8'),
+            msg=bytes(base64_header + "." + base64_message, 'UTF-8'),
+            digestmod=hashlib.sha256
+        ).digest()).decode('UTF-8').replace("=", "")
+        return base64_header + "." + base64_message + "." + base64_secret
+
+    @classmethod
+    def decode(cls, message, secret):
+        split_message = message.split(".")
+        base64_header = split_message[0]
+        base64_message = split_message[1]
+        base64_secret = split_message[2]
+
+        decoded_header = json.loads(base64.urlsafe_b64decode(base64_header))
+
+        if decoded_header['alg'] != cls.JWT_ALGORITHM:
+            raise InvalidAlgorithmError()
+
+        incoming_secret = base64.urlsafe_b64encode(hmac.new(
+            bytes(secret, 'UTF-8'),
+            msg=bytes(base64_header + "." + base64_message, 'UTF-8'),
+            digestmod=hashlib.sha256
+        ).digest()).decode('UTF-8').replace("=", "")
+
+        if base64_secret != incoming_secret:
+            raise InvalidTokenError()
+
+        # We add ==== as padding to ignore the requirement to have correct padding in
+        # the urlsafe_b64decode method.
+        decoded_message = json.loads(base64.urlsafe_b64decode(base64_message + "===="))
+        now = int(time.time())
+        if decoded_message['exp'] < now:
+            raise ExpiredSignatureError()
+
+        return decoded_message
+
     @classmethod
     def gen_token(cls, username):
         if not cls._secret:
@@ -60,13 +110,13 @@ class JwtManager(object):
             'iat': now,
             'username': username
         }
-        return jwt.encode(payload, cls._secret, algorithm=cls.JWT_ALGORITHM)  # type: ignore
+        return cls.encode(payload, cls._secret)  # type: ignore
 
     @classmethod
     def decode_token(cls, token):
         if not cls._secret:
             cls.init()
-        return jwt.decode(token, cls._secret, algorithms=cls.JWT_ALGORITHM)  # type: ignore
+        return cls.decode(token, cls._secret)  # type: ignore
 
     @classmethod
     def get_token_from_header(cls):
@@ -100,8 +150,8 @@ class JwtManager(object):
     @classmethod
     def get_user(cls, token):
         try:
-            dtoken = JwtManager.decode_token(token)
-            if not JwtManager.is_blacklisted(dtoken['jti']):
+            dtoken = cls.decode_token(token)
+            if not cls.is_blocklisted(dtoken['jti']):
                 user = AuthManager.get_user(dtoken['username'])
                 if user.last_update <= dtoken['iat']:
                     return user
@@ -111,10 +161,12 @@ class JwtManager(object):
                 )
             else:
                 cls.logger.debug('Token is black-listed')  # type: ignore
-        except jwt.ExpiredSignatureError:
+        except ExpiredSignatureError:
             cls.logger.debug("Token has expired")  # type: ignore
-        except jwt.InvalidTokenError:
+        except InvalidTokenError:
             cls.logger.debug("Failed to decode token")  # type: ignore
+        except InvalidAlgorithmError:
+            cls.logger.debug("Only the HS256 algorithm is supported.")  # type: ignore
         except UserDoesNotExist:
             cls.logger.debug(  # type: ignore
                 "Invalid token: user %s does not exist", dtoken['username']
openSUSE Build Service is sponsored by