# -*- coding: utf-8 -*-
"""LDAP implementation of the account backend, built on ldap3."""
import ldap3
from ldap3.utils.conv import escape_filter_chars
from ldap3.utils.dn import escape_rdn

from . import Backend, InvalidPasswordError, NoSuchUserError, ShouldNotHappen
from accounts.models import Account


def _escape(value, wildcard=False):
    """
    Return ``value`` as an escaped string.

    Non-string values are converted with ``str()``. Unless ``wildcard`` is
    true the value is passed through ``escape_filter_chars``; in every case
    the result is finally passed through ``escape_rdn``.
    """
    if not isinstance(value, str):
        value = str(value)
    if not wildcard:
        value = escape_filter_chars(value)
    return escape_rdn(value)


def _change_password(conn, dn, passwords, as_admin=False):
    """
    Change the password of the entry ``dn`` over the connection ``conn``.

    ``passwords`` is an ``(old_password, new_password)`` pair. With
    ``as_admin`` the old password is not sent. Otherwise any
    ``ldap3.LDAPException`` raised by the modify-password operation is
    reported as ``InvalidPasswordError``.
    """
    old_password, new_password = passwords
    if as_admin:
        conn.extend.standard.modify_password(dn, None, new_password)
    else:
        try:
            conn.extend.standard.modify_password(dn, old_password,
                                                 new_password)
        except ldap3.LDAPException:
            raise InvalidPasswordError('Invalid password')


class LdapBackend(Backend):
    """Account backend that stores users and service passwords in LDAP."""

    def __init__(self, app):
        """Read the LDAP connection settings from ``app.config``."""
        super(LdapBackend, self).__init__(app)

        self.host = self.app.config['LDAP_HOST']
        self.base_dn = self.app.config['LDAP_BASE_DN']
        self.admin_user = self.app.config['LDAP_ADMIN_USER']
        self.admin_pass = self.app.config['LDAP_ADMIN_PASS']

        self.services = self.app.all_services

        self.admin = False
        self.binded = False

    def auth(self, username, password):
        """
        Tries to authenticate a user with a given password. If the
        authentication is successful an Account object will be returned.

        Raises ``InvalidPasswordError`` when the bind is rejected (see
        ``_connect``) and ``NoSuchUserError`` when no entry with the
        ``splineAccount`` object class providing uid, mail and uidNumber
        is found below the user's DN.
        """
        user_dn = self._format_dn([('uid', username), ('ou', 'users')])
        conn = self._connect(user_dn, password)

        uid = None
        mail = None
        uidNumber = None
        services = []

        conn.search(user_dn, '(objectClass=*)',
                    attributes=['objectClass', 'uid', 'mail', 'cn',
                                'uidNumber'])

        for entry in conn.entries:
            # Always test membership on ``.values`` (a list). ``.value``
            # collapses a single-valued attribute to a bare string, which
            # would turn ``in`` into a substring test.
            object_classes = entry.objectClass.values
            if 'splineAccount' in object_classes:
                uid = entry.uid.value
                mail = entry.mail.value
                uidNumber = entry.uidNumber.value
            elif 'servicePassword' in object_classes:
                services.append(entry.cn.value)

        if uid is None or mail is None or uidNumber is None:
            raise NoSuchUserError("User not found")

        return Account(uid, mail, services, password, uidNumber=uidNumber)

    def find(self, filters=None, wildcard=False):
        """
        Find accounts by a given filter.

        ``filters`` maps attribute names to values; all conditions are
        AND-ed together with ``objectClass=splineAccount``. The caller's
        dict is not modified. With ``wildcard`` the values skip
        filter-character escaping (see ``_escape``).

        Returns a list of Account objects. LDAP errors during the search
        are swallowed, in which case the accounts collected so far
        (possibly none) are returned.
        """
        # Work on a copy so the forced objectClass condition does not leak
        # into the dict owned by the caller.
        filters = {} if filters is None else dict(filters)
        filters['objectClass'] = 'splineAccount'

        filter_as_list = ['(%s=%s)' % (attr, _escape(value, wildcard))
                          for attr, value in filters.items()]
        filterstr = '(&%s)' % ''.join(filter_as_list)

        conn = self._connect()
        base_dn = self._format_dn([('ou', 'users')])

        accounts = []
        try:
            conn.search(base_dn, filterstr, search_scope=ldap3.LEVEL,
                        attributes=['uid', 'mail', 'uidNumber'])
            for entry in conn.entries:
                accounts.append(Account(entry.uid.value, entry.mail.value,
                                        uidNumber=entry.uidNumber.value))
        except ldap3.LDAPException:
            # Deliberate best effort: a failed search yields what we have.
            pass

        return accounts

    def _store(self, account):
        """
        Create the LDAP entry for ``account`` using an admin connection and
        set its initial password from ``account.password``.
        """
        conn = self._connect_as_admin()

        user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
        # NOTE(review): the attribute values are passed through _escape
        # (filter + RDN escaping) before being stored -- confirm that this
        # is intended for values such as mail addresses.
        attrs = {
            'objectClass': ['top', 'inetOrgPerson', 'splineAccount'],
            'uid': _escape(account.uid),
            'sn': 'n/a',
            'cn': _escape(account.uid),
            'mail': _escape(account.mail),
            'uidNumber': _escape(account.uidNumber),
        }
        conn.add(user_dn, attributes=attrs)

        # There is no old password yet; queue the initial one as a pending
        # root password change and let _alter_passwords apply it.
        account.new_password_root = (None, account.password)
        self._alter_passwords(conn, account)

    def update(self, account, as_admin=False):
        """
        Updates account informations like passwords or email.

        Binds as admin when ``as_admin`` is true, otherwise as the user
        with ``account.password``. Every item of ``account.attributes`` is
        written with a replace modification, then pending password changes
        are applied.
        """
        user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])

        if as_admin:
            conn = self._connect_as_admin()
        else:
            conn = self._connect(user_dn, account.password)

        attrs = {key: [(ldap3.MODIFY_REPLACE, [value])]
                 for key, value in account.attributes.items()}
        conn.modify(user_dn, attrs)

        self._alter_passwords(conn, account, as_admin=as_admin)

    def delete(self, account, as_admin=False):
        """
        Deletes an account permanently.

        Removes one entry per item of ``account.services`` below the user
        and finally the user entry itself. Binds as admin when ``as_admin``
        is true, otherwise as the user with ``account.password``.
        """
        user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])

        if as_admin:
            conn = self._connect_as_admin()
        else:
            # Bind with the full user DN, exactly as update() and auth()
            # do; the bare uid is not a DN.
            conn = self._connect(user_dn, account.password)

        # Service entries come first, the user entry last.
        # NOTE(review): this reads ``service.id`` while auth() and
        # _alter_passwords() keep plain strings in ``account.services`` --
        # confirm which shape Account.services really has.
        service_dns = [
            self._format_dn([('cn', service.id), ('uid', account.uid),
                             ('ou', 'users')])
            for service in account.services
        ]

        for dn in service_dns + [user_dn]:
            conn.delete(dn)

    def _format_dn(self, attr, with_base_dn=True):
        """
        Build a DN string from ``attr``, a sequence of ``(name, value)``
        pairs; values are escaped with ``_escape``.

        With ``with_base_dn`` the pairs from ``self.base_dn`` are appended.
        The caller's sequence is left untouched.
        """
        parts = list(attr)
        if with_base_dn:
            parts.extend(self.base_dn)

        return ','.join('%s=%s' % (item[0], _escape(item[1]))
                        for item in parts)

    def _connect(self, user=None, password=None):
        """
        Open and bind a connection to ``self.host``.

        ``user`` and ``password`` are handed to ``ldap3.Connection`` as
        given. Raises ``InvalidPasswordError`` when the bind fails with
        ``LDAPInvalidCredentialsResult``.
        """
        server = ldap3.Server(self.host)
        conn = ldap3.Connection(server, user, password, raise_exceptions=True)

        try:
            conn.bind()
        except ldap3.LDAPInvalidCredentialsResult:
            raise InvalidPasswordError('Invalid password')

        return conn

    def _connect_as_admin(self):
        """Return a connection bound with the configured admin account."""
        admin_dn = self._format_dn([('cn', self.admin_user)])
        return self._connect(admin_dn, self.admin_pass)

    def _alter_passwords(self, conn, account, as_admin=False):
        """
        Apply the pending password changes stored on ``account``.

        ``account.new_password_root`` (an ``(old, new)`` pair) changes the
        user's own password and is reset to ``None`` afterwards. Each item
        of ``account.new_password_services`` maps a service name to an
        ``(old, new)`` pair: a new password that is not ``None`` creates
        the service entry if needed and sets its password, ``None``
        removes the service entry. Processed items are dropped from
        ``account.new_password_services``.
        """
        if account.new_password_root:
            user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
            _change_password(conn, user_dn, account.new_password_root,
                             as_admin)
            _, account.password = account.new_password_root
            account.new_password_root = None

        # Iterate over a snapshot: entries are deleted from the dict below.
        for service, passwords in list(account.new_password_services.items()):
            service_id = service.lower()
            service_dn = self._format_dn([('cn', service_id),
                                          ('uid', account.uid),
                                          ('ou', 'users')])
            _, new = passwords

            if new is not None:
                if service_id not in account.services:
                    attrs = {
                        'objectClass': ['top', 'servicePassword'],
                        'cn': _escape(service_id),
                    }
                    conn.add(service_dn, attributes=attrs)
                    account.services.append(service_id)

                _change_password(conn, service_dn, passwords, as_admin)
            elif service_id in account.services:
                conn.delete(service_dn)
                account.services.remove(service_id)

            del account.new_password_services[service]

    def _get_last_uidNumber(self, conn):
        """
        Return, as an int, the ``uidNumber`` of the first
        ``uidNumberMaximum`` entry found at ``cn=uidMax,ou=other``.

        Raises ``ShouldNotHappen`` when the search yields no entry.
        """
        uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')])
        conn.search(uidNumber_dn, '(objectClass=uidNumberMaximum)',
                    attributes=['uidNumber'])

        for entry in conn.entries:
            return int(entry.uidNumber.value)

        raise ShouldNotHappen('Last uidNumber not found.')

    def _get_next_uidNumber(self):
        """
        Reserve and return the next uidNumber.

        Starting from the value read by ``_get_last_uidNumber``, up to six
        consecutive candidates are tried. Each attempt deletes the
        candidate value from the counter entry and adds its successor in a
        single modify operation. Raises ``ShouldNotHappen`` when no
        attempt succeeds.
        """
        conn = self._connect_as_admin()
        uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')])
        uidNumber = self._get_last_uidNumber(conn)

        # Try to acquire next uidNumber
        for i in range(6):
            try:
                conn.modify(uidNumber_dn, {'uidNumber': [
                    (ldap3.MODIFY_DELETE, ['%d' % (uidNumber + i)]),
                    (ldap3.MODIFY_ADD, ['%d' % (uidNumber + i + 1)]),
                ]})

                if conn.result['result'] == ldap3.RESULT_SUCCESS:
                    return uidNumber + i + 1
            except ldap3.LDAPOperationResult:
                # The candidate was not the stored value; try the next one.
                pass

        raise ShouldNotHappen('Unable to get next uidNumber, try again.')