File of Package pagure

# -*- coding: utf-8 -*-
# Flask-FAS-OpenID - A Flask extension for authorizing users with FAS-OpenID
# Primary maintainer: Patrick Uiterwijk <>
# Copyright (c) 2013, Patrick Uiterwijk
# This file is part of python-fedora
# python-fedora is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
# python-fedora is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with python-fedora; if not, see <>

FAS-OpenID authentication plugin for the flask web framework

.. moduleauthor:: Patrick Uiterwijk <>

..versionadded:: 0.3.33
from functools import wraps

import logging
import time
from munch import Munch
import flask
    from flask import _app_ctx_stack as stack
except ImportError:
    from flask import _request_ctx_stack as stack

from openid.consumer import consumer
from openid.fetchers import setDefaultFetcher, Urllib2Fetcher
from openid.extensions import pape, sreg, ax
from openid_cla import cla
from openid_teams import teams

import six
log = logging.getLogger(__name__)

def request_wants_json():
    ''' Return wether the user requested the data in JSON or not. '''
    best = flask.request.accept_mimetypes \
        .best_match(['application/json', 'text/html'])
    return best == 'application/json' and \
        flask.request.accept_mimetypes[best] > \

class FASJSONEncoder(flask.json.JSONEncoder):
    """ Dedicated JSON encoder for the FAS openid information. """

    def default(self, o):
        """Implement this method in a subclass such that it returns a
        serializable object for ``o``, or calls the base implementation (to
        raise a ``TypeError``).

        For example, to support arbitrary iterators, you could implement
        default like this::

        def default(self, o):
                iterable = iter(o)
            except TypeError:
                return list(iterable)
            return JSONEncoder.default(self, o)
        if isinstance(o, (set, frozenset)):
            return list(o)
        return flask.json.JSONEncoder.default(self, o)

class FAS(object):
    """ The Flask plugin. """

    def __init__(self, app=None):
        self.postlogin_func = None = app
        if is not None:

    def init_app(self, app):
        """ Constructor for the Flask application. """ = app
        app.config.setdefault('FAS_OPENID_CHECK_CERT', True)

        if not['FAS_OPENID_CHECK_CERT']:

        # json_encoder is only available from flask 0.10
        version = flask.__version__.split('.')
        assume_recent = False
            major = int(version[0])
            minor = int(version[1])
        except ValueError:
            # We'll assume we're using a recent enough flask as the packages
            # of old versions used sane version numbers.
            assume_recent = True

        if assume_recent or (major > 0 or minor >= 10):
   = FASJSONEncoder

        @app.route('/_flask_fas_openid_handler/', methods=['GET', 'POST'])
        def flask_fas_openid_handler():
            """ Endpoint for OpenID results. """
            return self._handle_openid_request()


    def postlogin(self, f):
        """Marks a function as post login handler. This decorator calls your
        function after the login has been performed.
        self.postlogin_func = f
        return f

    def _handle_openid_request(self):
        return_url = flask.session.get('FLASK_FAS_OPENID_RETURN_URL', None)
        cancel_url = flask.session.get('FLASK_FAS_OPENID_CANCEL_URL', None)
        base_url = self.normalize_url(flask.request.base_url)
        oidconsumer = consumer.Consumer(flask.session, None)
        info = oidconsumer.complete(flask.request.values, base_url)
        display_identifier = info.getDisplayIdentifier()

        if info.status == consumer.FAILURE and display_identifier:
            return 'FAILURE. display_identifier: %s' % display_identifier
        elif info.status == consumer.CANCEL:
            if cancel_url:
                return flask.redirect(cancel_url)
            return 'OpenID request was cancelled'
        elif info.status == consumer.SUCCESS:
            if info.endpoint.server_url != \
                log.warn('Claim received from invalid issuer: %s',
                return 'Invalid provider issued claim!'

            sreg_resp = sreg.SRegResponse.fromSuccessResponse(info)
            teams_resp = teams.TeamsResponse.fromSuccessResponse(info)
            cla_resp = cla.CLAResponse.fromSuccessResponse(info)
            ax_resp = ax.FetchResponse.fromSuccessResponse(info)
            user = {'fullname': '', 'username': '', 'email': '',
                    'timezone': '', 'cla_done': False, 'groups': []}
            if not sreg_resp:
                # If we have no basic info, be gone with them!
                return flask.redirect(cancel_url)
            user['username'] = sreg_resp.get('nickname')
            user['fullname'] = sreg_resp.get('fullname')
            user['email'] = sreg_resp.get('email')
            user['timezone'] = sreg_resp.get('timezone')
            user['login_time'] = time.time()
            if cla_resp:
                user['cla_done'] = cla.CLA_URI_FEDORA_DONE in cla_resp.clas
            if teams_resp:
                # The groups do not contain the cla_ groups
                user['groups'] = frozenset(teams_resp.teams)
            if ax_resp:
                ssh_keys = ax_resp.get(
                if isinstance(ssh_keys, (list, tuple)):
                    ssh_keys = '\n'.join(
                        for ssh_key in ssh_keys
                        if ssh_key.strip()
                    if ssh_keys:
                        user['ssh_key'] = ssh_keys
                user['gpg_keyid'] = ax_resp.get(
            flask.session['FLASK_FAS_OPENID_USER'] = user
            flask.session.modified = True
            if self.postlogin_func is not None:
                return self.postlogin_func(return_url)
                return flask.redirect(return_url)
            return 'Strange state: %s' % info.status

    def _check_session(self):
        if 'FLASK_FAS_OPENID_USER' not in flask.session \
                or flask.session['FLASK_FAS_OPENID_USER'] is None:
            flask.g.fas_user = None
            user = flask.session['FLASK_FAS_OPENID_USER']
            # Add approved_memberships to provide backwards compatibility
            # New applications should only use g.fas_user.groups
            user['approved_memberships'] = []
            for group in user['groups']:
                membership = dict()
                membership['name'] = group
            flask.g.fas_user = Munch.fromDict(user)
            flask.g.fas_user.groups = frozenset(flask.g.fas_user.groups)
        flask.g.fas_session_id = 0

    def _check_safe_root(self, url):
        if url is None:
            return None
        if url.startswith(flask.request.url_root) or url.startswith('/'):
            # A URL inside the same app is deemed to always be safe
            return url
        return None

    def login(self, username=None, password=None, return_url=None,
              cancel_url=None, groups=['_FAS_ALL_GROUPS_']):
        """Tries to log in a user.

        Sets the user information on :attr:`flask.g.fas_user`.
        Will set 0 to :attr:`flask.g.fas_session_id, for compatibility
        with flask_fas.

        :kwarg username: Not used, but accepted for compatibility with the
            flask_fas module
        :kwarg password: Not used, but accepted for compatibility with the
            flask_fas module
        :kwarg return_url: The URL to forward the user to after login
        :kwarg groups: A string or a list of group the user should belong
            to to be authentified.
        :returns: True if the user was succesfully authenticated.
        :raises: Might raise an redirect to the OpenID endpoint
        if return_url is None:
            if 'next' in flask.request.args.values():
                return_url = flask.request.args.values['next']
                return_url = flask.request.url_root
        # This makes sure that we only allow stuff where
        # ?next= value is in a safe root (the application
        # root)
        return_url = (self._check_safe_root(return_url) or
        session = {}
        oidconsumer = consumer.Consumer(session, None)
            request = oidconsumer.begin(['FAS_OPENID_ENDPOINT'])
        except consumer.DiscoveryFailure as exc:
            # VERY strange, as this means it could not discover an OpenID
            # endpoint at FAS_OPENID_ENDPOINT
            return 'discoveryfailure'
        if request is None:
            # Also very strange, as this means the discovered OpenID
            # endpoint is no OpenID endpoint
            return 'no-request'

        if isinstance(groups, six.string_types):
            groups = [groups]

            required=['nickname', 'fullname', 'email', 'timezone']))

        ax_req = ax.FetchRequest()

        trust_root = self.normalize_url(flask.request.url_root)
        return_to = trust_root + '_flask_fas_openid_handler/'

        flask.session['FLASK_FAS_OPENID_RETURN_URL'] = return_url
        flask.session['FLASK_FAS_OPENID_CANCEL_URL'] = cancel_url

        if request_wants_json():
            output = request.getMessage(trust_root,
            output['server_url'] = request.endpoint.server_url
            return flask.jsonify(output)
        elif request.shouldSendRedirect():
            redirect_url = request.redirectURL(trust_root, return_to, False)
            return flask.redirect(redirect_url)
            return request.htmlMarkup(
                trust_root, return_to,
                form_tag_attrs={'id': 'openid_message'}, immediate=False)

    def logout(self):
        '''Logout the user associated with this session
        flask.session['FLASK_FAS_OPENID_USER'] = None
        flask.g.fas_session_id = None
        flask.g.fas_user = None
        flask.session.modified = True

    def normalize_url(self, url):
        ''' Replace the scheme prefix of a url with our preferred scheme.
        scheme =['PREFERRED_URL_SCHEME']
        scheme_index = url.index('://')
        return scheme + url[scheme_index:]

# This is a decorator we can use with any HTTP method (except login, obviously)
# to require a login.
# If the user is not logged in, it will redirect them to the login form.
def fas_login_required(function):
    """ Flask decorator to ensure that the user is logged in against FAS.
    To use this decorator you need to have a function named 'auth_login'.
    Without that function the redirect if the user is not logged in will not
    def decorated_function(*args, **kwargs):
        if flask.g.fas_user is None:
            return flask.redirect(flask.url_for('auth_login',
        return function(*args, **kwargs)
    return decorated_function

def cla_plus_one_required(function):
    """ Flask decorator to retrict access to CLA+1.
    To use this decorator you need to have a function named 'auth_login'.
    Without that function the redirect if the user is not logged in will not
    def decorated_function(*args, **kwargs):
        if flask.g.fas_user is None or not flask.g.fas_user.cla_done \
                or len(flask.g.fas_user.groups) < 1:
            # FAS-OpenID does not return cla_ groups
            return flask.redirect(flask.url_for('auth_login',
            return function(*args, **kwargs)
    return decorated_function
openSUSE Build Service is sponsored by