File 0017-54710-replace_jwt_with_pure_python.patch of Package ceph-ceph-17.2.9

diff --git a/src/pybind/mgr/dashboard/constraints.txt b/src/pybind/mgr/dashboard/constraints.txt
index 55f81c92d..fd6141048 100644
--- a/src/pybind/mgr/dashboard/constraints.txt
+++ b/src/pybind/mgr/dashboard/constraints.txt
@@ -1,6 +1,5 @@
 CherryPy~=13.1
 more-itertools~=8.14
-PyJWT~=2.0
 bcrypt~=3.1
 python3-saml~=1.4
 requests~=2.26
diff --git a/src/pybind/mgr/dashboard/exceptions.py b/src/pybind/mgr/dashboard/exceptions.py
index 96cbc5233..d396a38d2 100644
--- a/src/pybind/mgr/dashboard/exceptions.py
+++ b/src/pybind/mgr/dashboard/exceptions.py
@@ -121,3 +121,15 @@ class GrafanaError(Exception):
 
 class PasswordPolicyException(Exception):
     pass
+
+
+class ExpiredSignatureError(Exception):
+    pass
+
+
+class InvalidTokenError(Exception):
+    pass
+
+
+class InvalidAlgorithmError(Exception):
+    pass
diff --git a/src/pybind/mgr/dashboard/requirements.txt b/src/pybind/mgr/dashboard/requirements.txt
index 8003d62a5..292971819 100644
--- a/src/pybind/mgr/dashboard/requirements.txt
+++ b/src/pybind/mgr/dashboard/requirements.txt
@@ -1,7 +1,6 @@
 bcrypt
 CherryPy
 more-itertools
-PyJWT
 pyopenssl
 requests
 Routes
diff --git a/src/pybind/mgr/dashboard/services/auth.py b/src/pybind/mgr/dashboard/services/auth.py
index f13963abf..3c6002312 100644
--- a/src/pybind/mgr/dashboard/services/auth.py
+++ b/src/pybind/mgr/dashboard/services/auth.py
@@ -1,17 +1,19 @@
 # -*- coding: utf-8 -*-
 
+import base64
+import hashlib
+import hmac
 import json
 import logging
 import os
 import threading
 import time
 import uuid
-from base64 import b64encode
 
 import cherrypy
-import jwt
 
 from .. import mgr
+from ..exceptions import ExpiredSignatureError, InvalidAlgorithmError, InvalidTokenError
 from .access_control import LocalAuthenticator, UserDoesNotExist
 
 cherrypy.config.update({
@@ -33,7 +35,7 @@ class JwtManager(object):
     @staticmethod
     def _gen_secret():
         secret = os.urandom(16)
-        return b64encode(secret).decode('utf-8')
+        return base64.b64encode(secret).decode('utf-8')
 
     @classmethod
     def init(cls):
@@ -45,6 +47,54 @@ class JwtManager(object):
             mgr.set_store('jwt_secret', secret)
         cls._secret = secret
 
+    @classmethod
+    def array_to_base64_string(cls, message):
+        jsonstr = json.dumps(message, sort_keys=True).replace(" ", "")
+        string_bytes = base64.urlsafe_b64encode(bytes(jsonstr, 'UTF-8'))
+        return string_bytes.decode('UTF-8').replace("=", "")
+
+    @classmethod
+    def encode(cls, message, secret):
+        header = {"alg": cls.JWT_ALGORITHM, "typ": "JWT"}
+        base64_header = cls.array_to_base64_string(header)
+        base64_message = cls.array_to_base64_string(message)
+        base64_secret = base64.urlsafe_b64encode(hmac.new(
+            bytes(secret, 'UTF-8'),
+            msg=bytes(base64_header + "." + base64_message, 'UTF-8'),
+            digestmod=hashlib.sha256
+        ).digest()).decode('UTF-8').replace("=", "")
+        return base64_header + "." + base64_message + "." + base64_secret
+
+    @classmethod
+    def decode(cls, message, secret):
+        split_message = message.split(".")
+        base64_header = split_message[0]
+        base64_message = split_message[1]
+        base64_secret = split_message[2]
+
+        decoded_header = json.loads(base64.urlsafe_b64decode(base64_header))
+
+        if decoded_header['alg'] != cls.JWT_ALGORITHM:
+            raise InvalidAlgorithmError()
+
+        incoming_secret = base64.urlsafe_b64encode(hmac.new(
+            bytes(secret, 'UTF-8'),
+            msg=bytes(base64_header + "." + base64_message, 'UTF-8'),
+            digestmod=hashlib.sha256
+        ).digest()).decode('UTF-8').replace("=", "")
+
+        if base64_secret != incoming_secret:
+            raise InvalidTokenError()
+
+        # We add ==== as padding to ignore the requirement to have correct padding in
+        # the urlsafe_b64decode method.
+        decoded_message = json.loads(base64.urlsafe_b64decode(base64_message + "===="))
+        now = int(time.time())
+        if decoded_message['exp'] < now:
+            raise ExpiredSignatureError()
+
+        return decoded_message
+
     @classmethod
     def gen_token(cls, username):
         if not cls._secret:
@@ -59,13 +109,13 @@ class JwtManager(object):
             'iat': now,
             'username': username
         }
-        return jwt.encode(payload, cls._secret, algorithm=cls.JWT_ALGORITHM)  # type: ignore
+        return cls.encode(payload, cls._secret)  # type: ignore
 
     @classmethod
     def decode_token(cls, token):
         if not cls._secret:
             cls.init()
-        return jwt.decode(token, cls._secret, algorithms=cls.JWT_ALGORITHM)  # type: ignore
+        return cls.decode(token, cls._secret)  # type: ignore
 
     @classmethod
     def get_token_from_header(cls):
@@ -99,8 +149,8 @@ class JwtManager(object):
     @classmethod
     def get_user(cls, token):
         try:
-            dtoken = JwtManager.decode_token(token)
-            if not JwtManager.is_blocklisted(dtoken['jti']):
+            dtoken = cls.decode_token(token)
+            if not cls.is_blocklisted(dtoken['jti']):
                 user = AuthManager.get_user(dtoken['username'])
                 if user.last_update <= dtoken['iat']:
                     return user
@@ -110,10 +160,12 @@ class JwtManager(object):
                 )
             else:
                 cls.logger.debug('Token is block-listed')  # type: ignore
-        except jwt.ExpiredSignatureError:
+        except ExpiredSignatureError:
             cls.logger.debug("Token has expired")  # type: ignore
-        except jwt.InvalidTokenError:
+        except InvalidTokenError:
             cls.logger.debug("Failed to decode token")  # type: ignore
+        except InvalidAlgorithmError:
+            cls.logger.debug("Only the HS256 algorithm is supported.")  # type: ignore
         except UserDoesNotExist:
             cls.logger.debug(  # type: ignore
                 "Invalid token: user %s does not exist", dtoken['username']
openSUSE Build Service is sponsored by