# File hybrid_identity.py of Package openstack-keystone
# Copyright 2012-2014 SUSE Linux Products GmbH
# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Hybrid Identity backend for Keystone on top of the LDAP and SQL backends"""
from keystone.common import dependency
from keystone.common import sql
from keystone.common import utils
from keystone import config
from keystone import exception
from keystone import identity
from keystone.identity.backends import ldap as ldap_backend
from keystone.identity.backends import sql as sql_ident
from keystone.openstack.common import log
# Module-wide alias for Keystone's configuration object; the driver reads
# CONF.identity.default_domain_id from it and hands it to the LDAP UserApi.
CONF = config.CONF
# Logger named after this module, used for the debug traces below.
LOG = log.getLogger(__name__)
@dependency.requires('assignment_api')
class Identity(sql_ident.Identity):
    """Identity driver that consults the SQL backend first, then LDAP.

    The class extends the SQL identity driver and additionally holds an
    LDAP ``UserApi`` instance in ``self.user``.  User lookups are attempted
    against SQL; when SQL raises ``UserNotFound`` the LDAP user API is
    queried instead and the resulting user is tagged with the configured
    default domain id.
    """

    # Single message used for every authentication failure path.
    _AUTH_FAILED = 'Invalid user / password'

    def __init__(self, *args, **kwargs):
        """Initialise the SQL driver and attach the LDAP user API."""
        super(Identity, self).__init__(*args, **kwargs)
        self.user = ldap_backend.UserApi(CONF)
        # Flipped to False by a successful LDAP authentication and reset
        # to True by the next is_domain_aware() call.
        self.domain_aware = True

    # Identity interface

    def authenticate(self, user_id, password):
        """Authenticate based on a user and password.

        Tries to authenticate using the SQL backend first, if that fails
        it tries the LDAP backend.

        :param user_id: id of the user to authenticate.
        :param password: clear-text password supplied by the caller.
        :returns: the user reference passed through
                  ``identity.filter_user``.
        :raises AssertionError: if the password is empty, the user is
                                unknown to both backends, or the
                                credentials do not match.
        """
        if not password:
            raise AssertionError(self._AUTH_FAILED)

        session = sql.get_session()
        try:
            user_ref = self._get_user(session, user_id)
        except exception.UserNotFound:
            raise AssertionError(self._AUTH_FAILED)

        try:
            # if the user_ref has a password, it's from the SQL backend and
            # we can just check if it coincides with the one we got
            password_ok = utils.check_password(password,
                                               user_ref['password'])
        except TypeError:
            raise AssertionError(self._AUTH_FAILED)
        except KeyError:  # if it doesn't have a password, it must be LDAP
            self._authenticate_ldap(user_id, password)
        else:
            # Explicit check instead of ``assert``: assertions are removed
            # when Python runs with -O, which would accept any password.
            if not password_ok:
                raise AssertionError(self._AUTH_FAILED)
            LOG.debug("Authenticated user with SQL.")
            # turn the SQLAlchemy User object into a dict to match what
            # LDAP would return
            user_ref = user_ref.to_dict()

        return identity.filter_user(user_ref)

    def _authenticate_ldap(self, user_id, password):
        """Verify *password* for *user_id* by binding to LDAP.

        On success ``self.domain_aware`` is set to False.  Any connection
        that was obtained is unbound before returning or raising.

        :raises AssertionError: if the bind fails for any reason or no
                                connection object is returned.
        """
        conn = None
        try:
            # get_connection does a bind for us which checks the password
            conn = self.user.get_connection(self.user._id_to_dn(user_id),
                                            password)
            # Explicit check rather than ``assert`` so it survives -O.
            if not conn:
                raise AssertionError(self._AUTH_FAILED)
        except Exception:
            # Every failure is reported with the same generic error.
            raise AssertionError(self._AUTH_FAILED)
        else:
            LOG.debug("Authenticated user with LDAP.")
            self.domain_aware = False
        finally:
            if conn:
                conn.unbind_s()

    def is_domain_aware(self):
        """Return the current ``domain_aware`` flag, then reset it to True.

        The flag is only False immediately after an LDAP authentication;
        reading it here restores the default.
        """
        # XXX we only need domain_aware to be False when authenticating
        # as an LDAP user; after that, all operations will be done on
        # the SQL database and domain_aware needs to be True. This code
        # makes the assumption that the result of authenticate() should
        # be read as not domain_aware (for LDAP), after which
        # domain_aware should revert to True.
        domain_aware = self.domain_aware
        if not self.domain_aware:
            self.domain_aware = True
        return domain_aware

    def _get_user(self, session, user_id):
        """Fetch a user from SQL, falling back to LDAP.

        :returns: whatever the SQL driver's ``_get_user`` returns, or, when
                  it raises ``UserNotFound``, the LDAP user reference with
                  ``domain_id`` set to the configured default domain.
        """
        # try SQL first
        try:
            return super(Identity, self)._get_user(session, user_id)
        except exception.UserNotFound:
            # then try LDAP
            user_ref = self.user.get(user_id)
            user_ref['domain_id'] = CONF.identity.default_domain_id
            return user_ref

    def get_user(self, user_id):
        """Return the filtered user reference for *user_id*."""
        LOG.debug("Called get_user %s", user_id)
        session = sql.get_session()
        user = self._get_user(session, user_id)
        try:
            user = user.to_dict()
        except AttributeError:
            # LDAP Users are already dicts which is fine
            pass
        return identity.filter_user(user)

    def get_user_by_name(self, user_name, domain_id):
        """Look a user up by name in SQL, falling back to LDAP.

        The LDAP lookup uses only *user_name*; the result is filtered and
        assigned the configured default domain id.
        """
        LOG.debug("Called get_user_by_name %s, %s", user_name, domain_id)
        # try SQL first
        try:
            return super(Identity, self).get_user_by_name(user_name,
                                                          domain_id)
        except exception.UserNotFound:
            # then try LDAP
            user = identity.filter_user(self.user.get_by_name(user_name))
            user['domain_id'] = CONF.identity.default_domain_id
            return user

    def list_users(self, hints):
        """Return the SQL users followed by the LDAP users.

        *hints* is forwarded to the SQL driver only; every LDAP user is
        tagged with the configured default domain id.
        """
        sql_users = super(Identity, self).list_users(hints)
        ldap_users = self.user.get_all_filtered()
        for user in ldap_users:
            user['domain_id'] = CONF.identity.default_domain_id
        return sql_users + ldap_users