# -*- coding: utf-8 -*-
import ldap
import ldap.dn
import ldap.filter

LDAP_HOST = 'ldap://localhost:5678'
LDAP_BASE_DN = 'dc=account,dc=spline,dc=inf,dc=fu-berlin,dc=de'
LDAP_ADMIN_USER = 'admin'
LDAP_ADMIN_PASS = 'admin'
SERVICES = ['foren', 'jabber', 'gitlab']


def _to_utf8(value):
    """Return ``value`` UTF-8 encoded if it is a unicode string, else as is."""
    # NOTE: this module targets Python 2 (``unicode`` / ``basestring``).
    return value.encode('utf8') if isinstance(value, unicode) else value


class AccountService:
    """
    To simplify account management through ldap this class can be used.

    Every public method opens its own connection, authenticates (bind),
    performs its work and closes the connection again (unbind), even when
    an LDAP operation fails.

    To test your stuff against our test setup use port forwarding:

        ssh spline -L 5678:vm-splinux:389 -N

    * register a new user

    >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, LDAP_ADMIN_USER,
                                LDAP_ADMIN_PASS, SERVICES)
    >> foo = Account('foo', 'foo@bar.de', password='bar')
    >> service.register(foo)

    * authenticate a user

    >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, LDAP_ADMIN_USER,
                                LDAP_ADMIN_PASS, SERVICES)
    >> foo = service.auth('foo', 'bar')

    * update an account

    >> foo.change_email('a@b.de')
    >> foo.change_password('bar2')            # changes root password
    >> foo.change_password('bar2', 'gitlab')  # changes password for gitlab
    >> service.update(foo)                    # save changes in ldap backend

    # save changes in ldap backend as admin user
    >> service.update(foo, as_admin=True)

    * delete an account

    >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, LDAP_ADMIN_USER,
                                LDAP_ADMIN_PASS, SERVICES)
    >> service.delete(foo)
    >> service.delete('foo')  # by uid: not implemented, NotImplementedError

    * find accounts

    >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, LDAP_ADMIN_USER,
                                LDAP_ADMIN_PASS, SERVICES)
    >> all_accounts = service.find()
    >> print([x.uid for x in all_accounts])
    """

    def __init__(self, ldap_host, base_dn, admin_user, admin_pass, services):
        """
        Store the connection settings; no connection is opened here.

        ldap_host  -- LDAP URI, e.g. 'ldap://localhost:5678'
        base_dn    -- base DN below which 'ou=users' and 'ou=services' live
        admin_user -- cn of the admin entry (bound as 'cn=<admin_user>,<base_dn>')
        admin_pass -- password of the admin entry
        services   -- list of known service names
        """
        self.ldap_host = ldap_host
        self.base_dn = base_dn
        self.admin_user = admin_user
        self.admin_pass = admin_pass
        self.services = services

    def auth(self, username, password):
        """
        Tries to authenticate a user with a given password. If the
        authentication is successful an Account object will be returned.

        Raises ldap.INVALID_CREDENTIALS for an empty password and whatever
        ldap.LDAPError the server reports for a failed bind or search.
        """
        if not password:
            # A simple bind with a DN but an empty password is an
            # "unauthenticated bind" (RFC 4513, 5.1.2). Servers may accept it,
            # which would let anyone log in without a password.
            raise ldap.INVALID_CREDENTIALS({'desc': 'Invalid credentials'})

        dn = self._user_dn(username)
        self._bind(dn, password)
        try:
            dn_user, data_user = self.connection.search_s(
                dn, ldap.SCOPE_SUBTREE)[0]
            uid = data_user['uid'][0]
            mail = data_user['mail'][0]

            services_dn = 'ou=services,%s' % self.base_dn
            filterstr = '(uid=%s)' % ldap.filter.escape_filter_chars(uid)
            data_service = self.connection.search_s(
                services_dn, ldap.SCOPE_SUBTREE, filterstr)
        finally:
            self._unbind()

        services = []
        for entry in data_service:
            name = self._service_name(entry[0])
            if name is not None:
                services.append(name)

        return Account(uid, mail, services, dn_user, password)

    def find(self, filterstr='(objectClass=*)'):
        """
        Find accounts with raw ldap filter syntax.

        Returns a list of Account objects (uid, mail and dn set; services and
        password are not loaded). Entries without a 'uid' attribute, such as
        the 'ou=users' container itself, are skipped.
        """
        dn = 'ou=users,%s' % self.base_dn

        self._bind_as_admin()
        try:
            data = self.connection.search_s(dn, ldap.SCOPE_SUBTREE, filterstr)
        finally:
            self._unbind()

        accounts = []
        for entry_dn, attrs in data:
            # Search references come back as (None, [urls]); containers have
            # no uid. Neither describes an account.
            if not isinstance(attrs, dict) or not attrs.get('uid'):
                continue
            # Attribute values are lists; use the first value like auth() does.
            mail = (attrs.get('mail') or [None])[0]
            accounts.append(Account(attrs['uid'][0], mail, dn=entry_dn))

        return accounts

    def register(self, account):
        """
        Persists an account in the ldap backend.

        On success account.dn is set to the DN of the new entry and pending
        password changes of the account are applied.
        """
        dn = self._user_dn(account.uid)
        attr = [
            ('objectClass', ['top', 'inetOrgPerson']),
            ('uid', account.uid),
            ('sn', ' '),
            ('cn', ' '),
            ('mail', account.mail),
            ('userPassword', account.password),
        ]

        self._bind_as_admin()
        try:
            self.connection.add_s(dn, attr)
            account.dn = dn
            self._alter_passwords(account)
        finally:
            self._unbind()

    def update(self, account, as_admin=False):
        """
        Updates account information like passwords or email.

        Binds as the account itself (using account.password) unless as_admin
        is true, in which case the admin credentials are used.
        """
        dn = self._user_dn(account.uid)

        if as_admin:
            self._bind_as_admin()
        else:
            self._bind(dn, account.password)

        try:
            attr = [(ldap.MOD_REPLACE, 'mail', account.mail)]
            self.connection.modify_s(dn, attr)
            self._alter_passwords(account)
        finally:
            self._unbind()

    def delete(self, account, password=None, as_admin=False):
        """
        Deletes an account permanently.

        account  -- Account object to delete (deleting by uid string is not
                    implemented and raises NotImplementedError)
        password -- password used for the bind; defaults to account.password
        as_admin -- bind with the admin credentials instead of the account's

        The service entries listed in account.services are removed first,
        then the user entry itself.
        """
        if isinstance(account, basestring):
            raise NotImplementedError(
                'deleting an account by uid is not implemented')

        # account.dn already is a complete DN; the base DN must not be
        # appended to it again.
        user_dn = account.dn or self._user_dn(account.uid)
        if password is None:
            password = account.password

        targets = [self._service_dn(account.uid, s) for s in account.services]
        targets.append(user_dn)

        if as_admin:
            self._bind_as_admin()
        else:
            self._bind(user_dn, password)

        try:
            for target in targets:
                self.connection.delete_s(target)
        finally:
            self._unbind()

    def _user_dn(self, uid):
        """Build the DN of a user entry, escaping special DN characters."""
        return 'uid=%s,ou=users,%s' % (
            ldap.dn.escape_dn_chars(uid), self.base_dn)

    def _service_dn(self, uid, service):
        """Build the DN of a user's entry below a service."""
        return 'uid=%s,cn=%s,ou=services,%s' % (
            ldap.dn.escape_dn_chars(uid),
            ldap.dn.escape_dn_chars(service),
            self.base_dn)

    def _service_name(self, entry_dn):
        """Return the value of the first 'cn=' component of a DN, or None."""
        if not entry_dn:
            return None
        for rdn in entry_dn.split(','):
            if rdn.startswith('cn='):
                return rdn.split('=')[-1]
        return None

    def _bind(self, dn, password):
        """Open a new LDAPv3 connection and bind it as ``dn``."""
        connection = ldap.initialize(self.ldap_host)
        # 'protocol_version' is the attribute python-ldap maps to the
        # LDAP protocol version option.
        connection.protocol_version = ldap.VERSION3
        try:
            connection.simple_bind_s(dn, password)
        except ldap.LDAPError:
            # Do not leak the connection when the bind is rejected.
            connection.unbind_s()
            raise
        self.connection = connection

    def _bind_as_admin(self):
        """Open a connection bound with the configured admin credentials."""
        self._bind('cn=%s,%s' % (self.admin_user, self.base_dn),
                   self.admin_pass)

    def _unbind(self):
        """Close the current connection."""
        self.connection.unbind_s()

    def _alter_passwords(self, account):
        """
        Write pending password changes of ``account`` to the backend.

        Requires an already bound connection. Pending changes are cleared
        once they have been written.
        """
        if account.new_password_root:
            attr = [
                (ldap.MOD_REPLACE, 'userPassword', account.new_password_root)
            ]
            self.connection.modify_s(self._user_dn(account.uid), attr)
            # Later binds as this user need the new password.
            account.password = account.new_password_root
            account.new_password_root = None

        for service, password in account.new_password_services.items():
            attr = [
                (ldap.MOD_REPLACE, 'userPassword', password)
            ]
            self.connection.modify_s(
                self._service_dn(account.uid, service), attr)
        account.new_password_services = {}


class Account:
    """
    An Account represents a complex ldap tree entry for spline users.
    For each service a spline user can have a different password.
    """

    def __init__(self, uid, mail, services=None, dn=None, password=None):
        """
        uid      -- user id (unicode values are stored UTF-8 encoded)
        mail     -- mail address (unicode values are stored UTF-8 encoded)
        services -- list of service names the account has entries for
        dn       -- DN of the user entry, if known
        password -- root password, used to bind as this account
        """
        self.uid = _to_utf8(uid)
        self.mail = _to_utf8(mail)
        # A fresh list per instance; a shared default list would leak
        # services between accounts.
        self.services = services if services is not None else []
        self.dn = dn
        self.password = _to_utf8(password)
        self.new_password_root = None
        self.new_password_services = {}

    def __repr__(self):
        return "<Account uid=%s>" % (self.uid,)

    def change_password(self, new_password, service=None):
        """
        Changes a password for a given service. You have to use the
        AccountService class to make the changes permanent. If no service
        is given, the root password will be changed.
        """
        new_password = _to_utf8(new_password)

        if not service:
            self.new_password_root = new_password
        else:
            self.new_password_services[service] = new_password

    def change_email(self, new_mail):
        """
        Changes the mail address of an account. You have to use the
        AccountService class to make changes permanent.
        """
        self.mail = _to_utf8(new_mail)