File 0017-54710-replace_jwt_with_pure_python.patch of Package ceph-ceph-16.2.15

diff --git a/src/pybind/mgr/dashboard/constraints.txt b/src/pybind/mgr/dashboard/constraints.txt
index 1b4595e1d0d..fa8d8616064 100644
--- a/src/pybind/mgr/dashboard/constraints.txt
+++ b/src/pybind/mgr/dashboard/constraints.txt
@@ -1,6 +1,5 @@
 CherryPy~=13.1.0
 more-itertools~=8.14.0
-PyJWT~=2.0.1
 bcrypt~=3.1.4
 python3-saml~=1.4.1
 requests~=2.26
diff --git a/src/pybind/mgr/dashboard/exceptions.py b/src/pybind/mgr/dashboard/exceptions.py
index 8857c423cc4..5c559c6d591 100644
--- a/src/pybind/mgr/dashboard/exceptions.py
+++ b/src/pybind/mgr/dashboard/exceptions.py
@@ -122,3 +122,15 @@ class GrafanaError(Exception):
 
 class PasswordPolicyException(Exception):
     pass
+
+
+class ExpiredSignatureError(Exception):
+    pass
+
+
+class InvalidTokenError(Exception):
+    pass
+
+
+class InvalidAlgorithmError(Exception):
+    pass
diff --git a/src/pybind/mgr/dashboard/requirements.txt b/src/pybind/mgr/dashboard/requirements.txt
index 84dee7efc9a..a49d92d6cb1 100644
--- a/src/pybind/mgr/dashboard/requirements.txt
+++ b/src/pybind/mgr/dashboard/requirements.txt
@@ -1,7 +1,6 @@
 bcrypt
 CherryPy
 more-itertools
-PyJWT
 pyopenssl
 requests
 Routes
diff --git a/src/pybind/mgr/dashboard/services/auth.py b/src/pybind/mgr/dashboard/services/auth.py
index 6c5153620aa..94189548fda 100644
--- a/src/pybind/mgr/dashboard/services/auth.py
+++ b/src/pybind/mgr/dashboard/services/auth.py
@@ -1,18 +1,20 @@
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import
 
+import base64
+import hashlib
+import hmac
 import json
 import logging
 import os
 import threading
 import time
 import uuid
-from base64 import b64encode
 
 import cherrypy
-import jwt
 
 from .. import mgr
+from ..exceptions import ExpiredSignatureError, InvalidAlgorithmError, InvalidTokenError
 from .access_control import LocalAuthenticator, UserDoesNotExist
 
 cherrypy.config.update({
@@ -34,7 +36,7 @@ class JwtManager(object):
     @staticmethod
     def _gen_secret():
         secret = os.urandom(16)
-        return b64encode(secret).decode('utf-8')
+        return base64.b64encode(secret).decode('utf-8')
 
     @classmethod
     def init(cls):
@@ -46,6 +48,54 @@ class JwtManager(object):
             mgr.set_store('jwt_secret', secret)
         cls._secret = secret
 
+    @classmethod
+    def array_to_base64_string(cls, message):
+        jsonstr = json.dumps(message, sort_keys=True).replace(" ", "")
+        string_bytes = base64.urlsafe_b64encode(bytes(jsonstr, 'UTF-8'))
+        return string_bytes.decode('UTF-8').replace("=", "")
+
+    @classmethod
+    def encode(cls, message, secret):
+        header = {"alg": cls.JWT_ALGORITHM, "typ": "JWT"}
+        base64_header = cls.array_to_base64_string(header)
+        base64_message = cls.array_to_base64_string(message)
+        base64_secret = base64.urlsafe_b64encode(hmac.new(
+            bytes(secret, 'UTF-8'),
+            msg=bytes(base64_header + "." + base64_message, 'UTF-8'),
+            digestmod=hashlib.sha256
+        ).digest()).decode('UTF-8').replace("=", "")
+        return base64_header + "." + base64_message + "." + base64_secret
+
+    @classmethod
+    def decode(cls, message, secret):
+        split_message = message.split(".")
+        base64_header = split_message[0]
+        base64_message = split_message[1]
+        base64_secret = split_message[2]
+
+        decoded_header = json.loads(base64.urlsafe_b64decode(base64_header))
+
+        if decoded_header['alg'] != cls.JWT_ALGORITHM:
+            raise InvalidAlgorithmError()
+
+        incoming_secret = base64.urlsafe_b64encode(hmac.new(
+            bytes(secret, 'UTF-8'),
+            msg=bytes(base64_header + "." + base64_message, 'UTF-8'),
+            digestmod=hashlib.sha256
+        ).digest()).decode('UTF-8').replace("=", "")
+
+        if base64_secret != incoming_secret:
+            raise InvalidTokenError()
+
+        # We add ==== as padding to ignore the requirement to have correct padding in
+        # the urlsafe_b64decode method.
+        decoded_message = json.loads(base64.urlsafe_b64decode(base64_message + "===="))
+        now = int(time.time())
+        if decoded_message['exp'] < now:
+            raise ExpiredSignatureError()
+
+        return decoded_message
+
     @classmethod
     def gen_token(cls, username):
         if not cls._secret:
@@ -60,13 +110,13 @@ class JwtManager(object):
             'iat': now,
             'username': username
         }
-        return jwt.encode(payload, cls._secret, algorithm=cls.JWT_ALGORITHM)  # type: ignore
+        return cls.encode(payload, cls._secret)  # type: ignore
 
     @classmethod
     def decode_token(cls, token):
         if not cls._secret:
             cls.init()
-        return jwt.decode(token, cls._secret, algorithms=cls.JWT_ALGORITHM)  # type: ignore
+        return cls.decode(token, cls._secret)  # type: ignore
 
     @classmethod
     def get_token_from_header(cls):
@@ -100,8 +150,8 @@ class JwtManager(object):
     @classmethod
     def get_user(cls, token):
         try:
-            dtoken = JwtManager.decode_token(token)
-            if not JwtManager.is_blocklisted(dtoken['jti']):
+            dtoken = cls.decode_token(token)
+            if not cls.is_blocklisted(dtoken['jti']):
                 user = AuthManager.get_user(dtoken['username'])
                 if user.last_update <= dtoken['iat']:
                     return user
@@ -111,10 +161,12 @@ class JwtManager(object):
                 )
             else:
                 cls.logger.debug('Token is block-listed')  # type: ignore
-        except jwt.ExpiredSignatureError:
+        except ExpiredSignatureError:
             cls.logger.debug("Token has expired")  # type: ignore
-        except jwt.InvalidTokenError:
+        except InvalidTokenError:
             cls.logger.debug("Failed to decode token")  # type: ignore
+        except InvalidAlgorithmError:
+            cls.logger.debug("Only the HS256 algorithm is supported.")  # type: ignore
         except UserDoesNotExist:
             cls.logger.debug(  # type: ignore
                 "Invalid token: user %s does not exist", dtoken['username']
openSUSE Build Service is sponsored by