Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
Cloud:OpenStack:Icehouse
openstack-keystone
hybrid_identity.py
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File hybrid_identity.py of Package openstack-keystone
# Copyright 2012-2014 SUSE Linux Products GmbH
# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Hybrid Identity backend for Keystone on top of the LDAP and SQL backends"""

from keystone.common import dependency
from keystone.common import sql
from keystone.common import utils
from keystone import config
from keystone import exception
from keystone import identity
from keystone.identity.backends import ldap as ldap_backend
from keystone.identity.backends import sql as sql_ident
from keystone.openstack.common import log


CONF = config.CONF
LOG = log.getLogger(__name__)

# Single error message for every authentication failure, so that a caller
# cannot tell an unknown user from a wrong password.
_INVALID_CREDENTIALS = 'Invalid user / password'


@dependency.requires('assignment_api')
class Identity(sql_ident.Identity):
    """SQL identity driver that falls back to LDAP for unknown users.

    Users are looked up in the SQL backend first; when SQL raises
    ``UserNotFound`` the LDAP user API is queried instead.  LDAP users are
    reported as members of ``CONF.identity.default_domain_id``.
    """

    def __init__(self, *args, **kwargs):
        super(Identity, self).__init__(*args, **kwargs)
        self.user = ldap_backend.UserApi(CONF)
        # Flipped to False by a successful LDAP authentication and reset to
        # True by the next is_domain_aware() call.
        self.domain_aware = True

    # Identity interface
    def authenticate(self, user_id, password):
        """Authenticate based on a user and password.

        The user record is looked up via ``_get_user`` (SQL first, then
        LDAP).  If the record carries a ``password`` entry it is checked
        locally; if it has none (``KeyError``) the user is treated as an
        LDAP user and the password is verified by binding to LDAP.

        :param user_id: ID of the user to authenticate.
        :param password: clear-text password supplied by the caller.
        :returns: the user record passed through ``identity.filter_user``.
        :raises AssertionError: if the password is empty, the user cannot
            be found, or the password does not match.
        """
        if not password:
            raise AssertionError(_INVALID_CREDENTIALS)

        session = sql.get_session()
        try:
            user_ref = self._get_user(session, user_id)
        except exception.UserNotFound:
            raise AssertionError(_INVALID_CREDENTIALS)

        try:
            # if the user_ref has a password, it's from the SQL backend and
            # we can just check if it coincides with the one we got
            password_ok = utils.check_password(password,
                                               user_ref['password'])
        except TypeError:
            raise AssertionError(_INVALID_CREDENTIALS)
        except KeyError:
            # if it doesn't have a password, it must be LDAP
            self._authenticate_ldap(user_id, password)
        else:
            # Explicit check instead of ``assert``: assert statements are
            # removed when Python runs with -O, which would let any
            # password through.
            if not password_ok:
                raise AssertionError(_INVALID_CREDENTIALS)
            LOG.debug("Authenticated user with SQL.")
            # turn the SQLAlchemy User object into a dict to match what
            # LDAP would return
            user_ref = user_ref.to_dict()

        return identity.filter_user(user_ref)

    def _authenticate_ldap(self, user_id, password):
        """Verify *password* for *user_id* by binding to LDAP.

        On success ``self.domain_aware`` is set to False.  Any failure,
        whatever its cause, is reported as ``AssertionError``.
        """
        conn = None
        try:
            # get_connection does a bind for us which checks the password
            conn = self.user.get_connection(self.user._id_to_dn(user_id),
                                            password)
            if not conn:
                raise AssertionError(_INVALID_CREDENTIALS)
        except Exception:
            raise AssertionError(_INVALID_CREDENTIALS)
        else:
            LOG.debug("Authenticated user with LDAP.")
            self.domain_aware = False
        finally:
            # Always release the connection that was only opened to test
            # the credentials.
            if conn:
                conn.unbind_s()

    def is_domain_aware(self):
        """Return the current ``domain_aware`` flag and reset it to True."""
        # XXX we only need domain_aware to be False when authenticating
        # as an LDAP user; after that, all operations will be done on
        # the SQL database and domain_aware needs to be True. This code
        # makes the assumption that the result of authenticate() should
        # be read as not domain_aware (for LDAP), after which
        # domain_aware should revert to True.
        domain_aware = self.domain_aware
        if not self.domain_aware:
            self.domain_aware = True
        return domain_aware

    def _get_user(self, session, user_id):
        """Return the user record for *user_id*, trying SQL and then LDAP.

        The SQL result is returned unchanged; an LDAP result gets
        ``domain_id`` set to ``CONF.identity.default_domain_id``.
        """
        # try SQL first
        try:
            user_ref = super(Identity, self)._get_user(session, user_id)
        except exception.UserNotFound:
            # then try LDAP
            user_ref = self.user.get(user_id)
            user_ref['domain_id'] = CONF.identity.default_domain_id
            return user_ref
        else:
            return user_ref

    def get_user(self, user_id):
        """Return the filtered user record for *user_id* as a dict."""
        LOG.debug("Called get_user %s", user_id)
        session = sql.get_session()
        user = self._get_user(session, user_id)
        try:
            user = user.to_dict()
        except AttributeError:
            # LDAP Users are already dicts which is fine
            pass
        return identity.filter_user(user)

    def get_user_by_name(self, user_name, domain_id):
        """Return the user named *user_name*, trying SQL and then LDAP.

        NOTE(review): the LDAP fallback looks the user up by name only and
        does not use *domain_id*; the result is always labelled with the
        default domain.
        """
        LOG.debug("Called get_user_by_name %s, %s", user_name, domain_id)
        # try SQL first
        try:
            user = super(Identity, self).get_user_by_name(user_name,
                                                          domain_id)
        except exception.UserNotFound:
            # then try LDAP
            user = identity.filter_user(self.user.get_by_name(user_name))
            user['domain_id'] = CONF.identity.default_domain_id
            return user
        else:
            return user

    def list_users(self, hints):
        """Return the SQL users followed by all LDAP users.

        *hints* is passed to the SQL backend only; the LDAP users come
        from ``get_all_filtered()`` and are tagged with the default domain.
        """
        sql_users = super(Identity, self).list_users(hints)
        ldap_users = self.user.get_all_filtered()
        for user in ldap_users:
            user['domain_id'] = CONF.identity.default_domain_id
        return sql_users + ldap_users
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor