# -*- coding: utf-8 -*-
import ldap
import ldap.dn

from utils import Service

# NOTE: this module targets Python 2 (it relies on ``unicode`` and
# ``basestring``).

# Connection settings for the test setup (see the AccountService docstring
# for the required port forwarding).
LDAP_HOST = 'ldap://localhost:5678'
LDAP_BASE_DN = 'dc=account,dc=spline,dc=inf,dc=fu-berlin,dc=de'
LDAP_ADMIN_USER = 'admin'
LDAP_ADMIN_PASS = 'admin'

SERVICES = [
    Service('foren', 'Foren', 'http://foren.spline.de/'),
    Service('jabber', 'Jabber', 'http://jabber.spline.de/'),
    Service('gitlab', 'Gitlab', 'https://gitlab.spline.inf.fu-berlin.de/'),
    Service('osqa', 'OS Q&A', 'http://osqa.spline.de/'),
]


def _utf8(value):
    """Return ``value`` UTF-8 encoded if it is a unicode string, else as is."""
    if isinstance(value, unicode):
        return value.encode('utf8')
    return value


def _escape_filter_value(value, keep_wildcard=False):
    """
    Escapes a value for use inside an LDAP search filter.

    Special characters are replaced by their ``\\XX`` hex form (RFC 4515).
    If ``keep_wildcard`` is true, ``*`` is left untouched so that it keeps
    its wildcard meaning.
    """
    special = '\\()\0' if keep_wildcard else '\\*()\0'
    return ''.join('\\%02x' % ord(char) if char in special else char
                   for char in value)


class AccountService:
    """
    To simplify account management through ldap this class can be used.
    The AccountService class is stateless. It means that every request
    needs its own authentication request (bind).

    To test your stuff against our test setup use port forwarding

        ssh spline -L 5678:vm-account:389 -N

    * register a new user

    >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, LDAP_ADMIN_USER,
                                LDAP_ADMIN_PASS, SERVICES)
    >> foo = Account('foo', 'foo@bar.de', password='bar')
    >> service.register(foo)

    * authenticate a user

    >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, LDAP_ADMIN_USER,
                                LDAP_ADMIN_PASS, SERVICES)
    >> foo = service.auth('foo', 'bar')

    * update an account

    >> foo.change_email('a@b.de')
    >> foo.change_password('newpw', 'oldpw')  # changes root password
    >> foo.change_password('newpw', 'oldpw', 'gitlab')  # changes password for gitlab
    >> service.update(foo)  # save changes in ldap backend
    # save changes in ldap backend as admin user (no need for old password)
    >> service.update(foo, as_admin=True)

    * delete an account

    >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, LDAP_ADMIN_USER,
                                LDAP_ADMIN_PASS, SERVICES)
    >> service.delete(foo)
    >> service.delete('foo')  # by uid: not implemented yet

    * find accounts

    >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, LDAP_ADMIN_USER,
                                LDAP_ADMIN_PASS, SERVICES)
    >> all_accounts = service.find()  # find all accounts
    >> print([x.uid for x in all_accounts])
    >> service.find_by_uid('test')  # find users by uid
    >> service.find_by_mail('test@test.de')  # find users by mail
    >> service.find_by_uid('test*', wildcard=True)  # find with wildcards
    """

    def __init__(self, ldap_host, base_dn, admin_user, admin_pass, services):
        self.ldap_host = ldap_host
        self.base_dn = base_dn
        self.admin_user = admin_user
        self.admin_pass = admin_pass
        self.services = services
        # True only while the current connection is bound as admin.
        self.admin = False

    def auth(self, username, password):
        """
        Tries to authenticate a user with a given password. If the
        authentication is successful an Account object will be returned.

        Raises ldap.INVALID_CREDENTIALS for an empty password (without
        contacting the server); errors raised by python-ldap during bind or
        search are passed through.
        """
        # A simple bind with an empty password is an unauthenticated bind
        # which servers may accept; never treat that as a successful login.
        if not password:
            raise ldap.INVALID_CREDENTIALS(
                {'desc': 'Invalid credentials', 'info': 'empty password'})

        dn = self._user_dn(username)
        self._bind(dn, password)
        try:
            dn_user, data_user = self.connection.search_s(
                dn, ldap.SCOPE_SUBTREE)[0]
            uid = data_user['uid'][0]
            mail = data_user['mail'][0]

            services_dn = 'ou=services,%s' % self.base_dn
            filterstr = '(uid=%s)' % _escape_filter_value(uid)
            data_service = self.connection.search_s(
                services_dn, ldap.SCOPE_SUBTREE, filterstr)

            # The service id is the value of the cn component of each
            # service entry's dn.
            services = []
            for entry in data_service:
                cn = [part for part in entry[0].split(',')
                      if part.startswith('cn=')][0]
                services.append(cn.split('=')[-1])

            return Account(uid, mail, services, dn_user, password)
        finally:
            self._unbind()

    def find_by_uid(self, uid, wildcard=False):
        """ Find accounts by uid (see find). """
        return self.find({'uid': uid}, wildcard)

    def find_by_mail(self, mail, wildcard=False):
        """ Find accounts by mail address (see find). """
        return self.find({'mail': mail}, wildcard)

    def find(self, filters=None, wildcard=False):
        """
        Find accounts by a given filter (with key:value semantic).

        All given filters have to match. If ``wildcard`` is true, ``*`` in
        the filter values is passed through to ldap as a wildcard, otherwise
        it is escaped. Returns a list of Account objects. The passed dict is
        not modified.
        """
        criteria = dict(filters or {})
        criteria['objectClass'] = 'inetOrgPerson'

        filter_as_list = [
            '(%s=%s)' % (key, _escape_filter_value(value, wildcard))
            for key, value in sorted(criteria.items())
        ]
        filterstr = ''.join(filter_as_list)
        if len(filter_as_list) > 1:
            filterstr = '(&%s)' % filterstr

        self._bind_anonymous()
        try:
            dn = 'ou=users,%s' % self.base_dn
            data = self.connection.search_s(dn, ldap.SCOPE_SUBTREE, filterstr)
            # Attribute values are lists; use the first value, like auth().
            return [Account(attrs['uid'][0], attrs['mail'][0], dn=entry_dn)
                    for entry_dn, attrs in data]
        finally:
            self._unbind()

    def register(self, account):
        """
        Persists an account in the ldap backend.

        Binds as admin, adds the user entry and sets the root password to
        account.password. On success account.dn is set to the new entry's dn.
        """
        self._bind_as_admin()
        try:
            dn = self._user_dn(account.uid)
            attr = [
                ('objectClass', ['top', 'inetOrgPerson']),
                ('uid', account.uid),
                ('sn', ' '),
                ('cn', ' '),
                ('mail', account.mail),
            ]
            self.connection.add_s(dn, attr)
            account.dn = dn
            # No old password exists yet; it is set through the admin bind.
            account.new_password_root = (None, account.password)
            self._alter_passwords(account)
        finally:
            self._unbind()

    def update(self, account, as_admin=False):
        """
        Updates account informations like passwords or email.

        Binds as admin if ``as_admin`` is true, otherwise as the user itself
        using account.password.
        """
        dn = self._user_dn(account.uid)
        if as_admin:
            self._bind_as_admin()
        else:
            self._bind(dn, account.password)
        try:
            attr = [(ldap.MOD_REPLACE, 'mail', account.mail)]
            self.connection.modify_s(dn, attr)
            self._alter_passwords(account)
        finally:
            self._unbind()

    def delete(self, account, password=None, as_admin=False):
        """
        Deletes an account permanently.

        Removes the entries of all services in account.services and then the
        user entry itself. If ``password`` is not given, account.password is
        used for the (non admin) bind.

        Raises NotImplementedError if ``account`` is a uid string instead of
        an Account object.
        """
        if isinstance(account, basestring):
            raise NotImplementedError()

        # account.dn already is a complete dn (set by auth/register); only
        # build one from the uid if it is missing.
        user_dn = account.dn or self._user_dn(account.uid)
        if password is None:
            password = account.password

        if as_admin:
            self._bind_as_admin()
        else:
            self._bind(user_dn, password)
        try:
            dns = [self._service_dn(account.uid, service)
                   for service in account.services]
            # Delete the user entry last.
            dns.append(user_dn)
            for dn in dns:
                self.connection.delete_s(dn)
        finally:
            self._unbind()

    def _user_dn(self, uid):
        """ Builds the dn of a user entry; the uid is escaped for dn use. """
        return 'uid=%s,ou=users,%s' % (ldap.dn.escape_dn_chars(uid),
                                       self.base_dn)

    def _service_dn(self, uid, service):
        """ Builds the dn of a user's entry below a service. """
        return 'uid=%s,cn=%s,ou=services,%s' % (
            ldap.dn.escape_dn_chars(uid),
            ldap.dn.escape_dn_chars(service),
            self.base_dn)

    def _bind(self, dn, password):
        """ Opens a new connection and binds with the given credentials. """
        self.connection = ldap.initialize(self.ldap_host)
        # python-ldap maps ``protocol_version`` to the LDAP option; a plain
        # ``version`` attribute would be ignored.
        self.connection.protocol_version = ldap.VERSION3
        self.connection.simple_bind_s(dn, password)

    def _bind_as_admin(self):
        """ Binds with the admin credentials and marks the session as admin. """
        self._bind('cn=%s,%s' % (self.admin_user, self.base_dn),
                   self.admin_pass)
        # Only flag the session as admin once the bind has succeeded.
        self.admin = True

    def _bind_anonymous(self):
        """ Binds anonymously (empty dn and password). """
        self._bind('', '')

    def _unbind(self):
        """ Closes the current connection and drops the admin flag. """
        try:
            self.connection.unbind_s()
        finally:
            self.admin = False

    def _alter_passwords(self, account):
        """
        Applies the pending password changes of an account on the current
        connection and clears them. When bound as admin the old passwords
        are not sent.
        """
        if account.new_password_root:
            dn = self._user_dn(account.uid)
            old, new = account.new_password_root
            self.connection.passwd_s(dn, None if self.admin else old, new)
            account.password = new
            account.new_password_root = None

        for service, passwords in account.new_password_services.items():
            dn = self._service_dn(account.uid, service)
            old, new = passwords
            self.connection.passwd_s(dn, None if self.admin else old, new)
        account.new_password_services = {}


class Account:
    """
    An Account represents a complex ldap tree entry for spline users.
    For each service a spline user can have a different password.
    """

    def __init__(self, uid, mail, services=None, dn=None, password=None):
        self.uid = _utf8(uid)
        self.mail = _utf8(mail)
        # Avoid sharing one default list between all accounts.
        self.services = [] if services is None else services
        self.dn = dn
        self.password = _utf8(password)
        # Pending (old, new) password changes, applied by AccountService.
        self.new_password_root = None
        self.new_password_services = {}

    def __repr__(self):
        return "<Account uid=%s>" % self.uid

    def change_password(self, new_password, old_password='', service=None):
        """
        Changes a password for a given service. You have to use the
        AccountService class to make the changes permanent. If no service
        is given, the root password will be changed.
        """
        new_password = _utf8(new_password)
        old_password = _utf8(old_password)

        if not service:
            self.new_password_root = (old_password, new_password)
        else:
            self.new_password_services[service] = (old_password, new_password)

    def change_email(self, new_mail):
        """
        Changes the mail address of an account. You have to use the
        AccountService class to make changes permanent.
        """
        self.mail = _utf8(new_mail)