# -*- coding: utf-8 -*-
from functools import reduce
from uuid import uuid4

import ldap

from utils import Service


# Services for which an account may hold a separate password.
SERVICES = [
    #Service('foren', 'Foren', 'http://foren.spline.de/'),
    #Service('jabber', 'Jabber', 'http://jabber.spline.de/'),
    Service('gitlab', 'Gitlab', 'https://gitlab.spline.inf.fu-berlin.de/'),
    Service('osqa', 'OS Q&A', 'http://osqa.spline.de/'),
]


class AccountService:
    """
    To simplify account management through ldap this class can be used.

    The AccountService class is stateless. It means that every request
    needs its own authentication request (bind).

    To test your stuff against our test setup use Port-Forwarding
        ssh spline -L 5678:vm-account:389 -N

    * register a new user
    >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, ADMIN_USER, ADMIN_PW, SERVICES)
    >> foo = Account('foo','foo@bar.de', password='bar')
    >> service.register(foo)

    * authenticate a new user
    >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, ADMIN_USER, ADMIN_PW, SERVICES)
    >> foo = service.auth('foo', 'bar')

    * updates an account
    >> foo.change_email('a@b.de')
    >> foo.change_password('newpw','oldpw')            # changes root password
    >> foo.change_password('newpw','oldpw', 'gitlab')  # changes password for gitlab
    >> service.update(foo)                             # save changes in ldap backend
    # save changes in ldap backend as admin user (no need for old password)
    >> service.update(foo, as_admin=True)

    * delete an account
    >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, LDAP_ADMIN_USER, LDAP_ADMIN_PASS, SERVICES)
    >> service.delete(foo)
    >> service.delete('foo')  # by uid string: not implemented yet

    * find accounts
    >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, LDAP_ADMIN_USER, LDAP_ADMIN_PASS, SERVICES)
    >> all_accounts = service.find()                 # find all accounts
    >> print([x.uid for x in all_accounts])
    >> service.find_by_uid('test')                   # find users by uid
    >> service.get_by_uid('test')                    # same, raise NoSuchUserError if no match
    >> service.find_by_mail('test@test.de')          # find users by mail
    >> service.find_by_uid('test*', wildcard=True)   # find with wildcards
    """

    def __init__(self, ldap_host, base_dn, admin_user, admin_pass, services):
        """
        Stores the connection parameters; no connection is opened here.

        base_dn is appended to every DN built by _format_dn and is
        iterated as (attribute, value) pairs.
        """
        self.ldap_host = ldap_host
        self.base_dn = base_dn
        self.admin_user = admin_user
        self.admin_pass = admin_pass
        self.services = services

        self.admin = False
        self.binded = False

    def auth(self, username, password):
        """
        Tries to authenticate a user with a given password. If the
        authentication is successful an Account object will be returned.

        Errors raised by the ldap calls (e.g. on a failed bind) are
        propagated to the caller.
        """
        self._bind_anonymous()
        try:
            dn = self._format_dn([('uid', username), ('ou', 'users')])
            dn_user, data_user = self.connection.search_s(dn, ldap.SCOPE_SUBTREE)[0]

            # Re-bind with the user's own credentials; this is the actual
            # password check.
            self._bind_as_user(username, password)

            uid = data_user['uid'][0]
            mail = data_user['mail'][0]

            dn = self._format_dn([('ou', 'services')])
            filterstr = '(uid=%s)' % self._escape(uid)
            data_service = self.connection.search_s(dn, ldap.SCOPE_SUBTREE, filterstr)

            # The service name is the value of the cn= component of each
            # matching entry's DN.
            services = []
            for entry in data_service:
                cn = [x for x in entry[0].split(',') if x.startswith('cn=')][0]
                services.append(cn.split('=')[-1])

            return Account(uid, mail, services, dn_user, password)
        finally:
            # Release the connection even when a search or bind failed.
            if self.binded:
                self._unbind()

    def get_by_uid(self, uid):
        """
        Find a single user by uid. Unlike find_by_uid, don't return a list but
        raise NoSuchUserError if there is no such user.
        """
        users = self.find_by_uid(uid)
        if len(users) != 1:
            raise NoSuchUserError('No such user')

        return users[0]

    def get_by_mail(self, mail):
        """
        Find a single user by mail. Unlike find_by_mail, don't return a list but
        raise NoSuchUserError if there is no such user.
        """
        users = self.find_by_mail(mail)
        if len(users) != 1:
            raise NoSuchUserError('No such user')

        return users[0]

    def find_by_uid(self, uid, wildcard=False):
        """
        Returns a list of accounts whose uid matches the given value.
        """
        return self.find({'uid': uid}, wildcard)

    def find_by_mail(self, mail, wildcard=False):
        """
        Returns a list of accounts whose mail matches the given value.
        """
        return self.find({'mail': mail}, wildcard)

    def find(self, filters=None, wildcard=False):
        """
        Find accounts by a given filter (key:value semantic).

        All given criteria must match. With wildcard=True a '*' inside a
        value is passed to ldap unescaped. Returns a list of Account
        objects carrying uid and mail only. The dict passed in is not
        modified.
        """
        # Work on a copy: neither the caller's dict nor a shared default
        # may be modified.
        criteria = dict(filters or {})
        criteria['objectClass'] = 'inetOrgPerson'

        filter_as_list = ['(%s=%s)' % (k, self._escape(v, wildcard))
                          for k, v in criteria.items()]
        filterstr = ''.join(filter_as_list)
        if len(filter_as_list) > 1:
            filterstr = '(&%s)' % filterstr

        self._bind_anonymous()
        try:
            dn = self._format_dn([('ou', 'users')])
            data = self.connection.search_s(dn, ldap.SCOPE_SUBTREE, filterstr)
            return [Account(a[1]['uid'][0], a[1]['mail'][0]) for a in data]
        finally:
            if self.binded:
                self._unbind()

    def register(self, account):
        """
        Persists an account in the ldap backend.

        Sets account.dn and stores account.password as the root password.
        """
        self._bind_as_admin()
        try:
            dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
            attr = [
                ('objectClass', ['top', 'inetOrgPerson']),
                ('uid', self._escape(account.uid)),
                ('sn', ' '),
                ('cn', ' '),
                ('mail', account.mail),
            ]
            self.connection.add_s(dn, attr)

            account.dn = dn
            # No old password exists yet; the pending change is applied
            # through the admin bind.
            account.new_password_root = (None, account.password)
            self._alter_passwords(account)
        finally:
            if self.binded:
                self._unbind()

    def update(self, account, as_admin=False):
        """
        Updates account informations like passwords or email.

        Binds as the account itself (using account.password) unless
        as_admin is set.
        """
        if as_admin:
            self._bind_as_admin()
        else:
            self._bind_as_user(account.uid, account.password)

        try:
            attr = [(ldap.MOD_REPLACE, 'mail', account.mail)]
            dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
            self.connection.modify_s(dn, attr)
            self._alter_passwords(account, as_admin=as_admin)
        finally:
            if self.binded:
                self._unbind()

    def delete(self, account, password=None, as_admin=False):
        """
        Deletes an account permanently, including its service entries.

        Passing a uid string instead of an Account is not implemented and
        raises NotImplementedError. The password parameter is currently
        not used: the password stored on the account object is taken for
        the bind.
        """
        if isinstance(account, basestring):
            raise NotImplementedError()

        user = account.uid
        password = account.password

        if as_admin:
            self._bind_as_admin()
        else:
            self._bind_as_user(user, password)

        try:
            # Service entries are removed first, the user entry last.
            dn = [self._format_dn([('uid', user), ('cn', s), ('ou', 'services')])
                  for s in account.services]
            dn.append(self._format_dn([('uid', user), ('ou', 'users')]))

            for x in dn:
                self.connection.delete_s(x)
        finally:
            if self.binded:
                self._unbind()

    def _format_dn(self, attr, with_base_dn=True):
        """
        Builds a DN string from a list of (attribute, value) pairs,
        escaping every value. The base DN is appended unless with_base_dn
        is false. The list passed in is left untouched.
        """
        parts = list(attr)
        if with_base_dn:
            parts.extend(self.base_dn)

        dn = ['%s=%s' % (item[0], self._escape(item[1])) for item in parts]
        return ','.join(dn)

    def _bind(self, dn, password):
        """
        Opens a new connection to the ldap host and performs a simple bind.
        """
        self.connection = ldap.initialize(self.ldap_host)
        # protocol_version is the python-ldap option name; a plain
        # "version" attribute is not an ldap option.
        self.connection.protocol_version = ldap.VERSION3
        self.connection.simple_bind_s(dn, password)

    def _bind_as_admin(self):
        """
        Binds with the configured admin credentials.
        """
        if self.binded:
            self._unbind()

        dn = self._format_dn([('cn', self.admin_user)])
        self._bind(dn, self.admin_pass)
        self.admin = True
        self.binded = True

    def _bind_as_user(self, username, password):
        """
        Binds as the given user below ou=users.
        """
        if self.binded:
            self._unbind()

        dn = self._format_dn([('uid', username), ('ou', 'users')])
        self._bind(dn, password)
        self.binded = True
        # A user bind is not an admin bind.
        self.admin = False

    def _bind_anonymous(self):
        """
        Binds anonymously (empty dn and password).
        """
        if self.binded:
            self._unbind()

        self._bind('', '')
        self.binded = True

    def _unbind(self):
        """
        Closes the current connection and resets the bind state.
        """
        self.connection.unbind_s()
        self.admin = False
        self.binded = False

    def _alter_passwords(self, account, as_admin=False):
        """
        Applies the pending password changes of an account on the current
        connection: first the root password, then the service passwords.

        A service whose new password is None gets its entry deleted.
        Raises InvalidPasswordError if the server refuses the root
        password change with UNWILLING_TO_PERFORM.
        """
        if account.new_password_root:
            dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
            old, new = account.new_password_root
            if as_admin:
                self.connection.passwd_s(dn, None, new)
            else:
                try:
                    self.connection.passwd_s(dn, old, new)
                except ldap.UNWILLING_TO_PERFORM:
                    raise InvalidPasswordError()

            account.password = new
            account.new_password_root = None

        for service, passwords in account.new_password_services.items():
            dn = self._format_dn([('uid', account.uid), ('cn', service), ('ou', 'services')])
            old, new = passwords

            if new is not None:
                if service not in account.services:
                    attr = [('objectClass', ['top', 'servicePassword']),
                            ('uid', account.uid)]
                    self.connection.add_s(dn, attr)
                    # Remember the new entry so that a later update on the
                    # same object does not try to add it a second time.
                    account.services.append(service)

                if as_admin:
                    self.connection.passwd_s(dn, None, new)
                else:
                    self.connection.passwd_s(dn, old, new)
            else:
                s = service.lower()
                if s in account.services:
                    self.connection.delete_s(dn)
                    account.services.remove(s)

        account.new_password_services = {}

    def _escape(self, s, wildcard=False):
        """
        Replaces special characters in s by a backslash followed by their
        two-digit uppercase hex code. '*' is left alone when wildcard is
        true.
        """
        # The backslash must stay first: otherwise the backslashes
        # introduced by the other replacements would be escaped again.
        chars_to_escape = ['\\', ',', '=', '+', '<', '>', ';', '"', '\'', '#', '(', ')', '\0']
        if not wildcard:
            chars_to_escape.append('*')

        for char in chars_to_escape:
            s = s.replace(char, '\\%02X' % ord(char))
        return s


class Account:
    """
    An Account represents a complex ldap tree entry for spline users.
    For each service a spline user can have a different password.
    """

    def __init__(self, uid, mail, services=None, dn=None, password=None):
        """
        unicode values for uid, mail and password are stored utf8 encoded.
        services is a list of service names; a fresh empty list is used
        when it is omitted.
        """
        self.uid = uid.encode('utf8') if isinstance(uid, unicode) else uid
        self.mail = mail.encode('utf8') if isinstance(mail, unicode) else mail
        # Never share one default list between instances; it is modified
        # when service entries are added or removed.
        self.services = [] if services is None else services
        self.dn = dn
        self.password = password.encode('utf8') if isinstance(password, unicode) else password
        # Pending changes, applied by AccountService.update/register.
        self.new_password_root = None
        self.new_password_services = {}

    def __repr__(self):
        return "<Account uid=%s>" % self.uid

    def reset_password(self, service):
        """
        Marks the password entry of a service for removal. You have to use
        the AccountService class to make the changes permanent.
        """
        self.new_password_services[service] = (None, None)

    def change_password(self, new_password, old_password='', service=None):
        """
        Changes a password for a given service. You have to use the
        AccountService class to make the changes permanent. If no service is
        given, the root password will be changed.
        """
        if isinstance(new_password, unicode):
            new_password = new_password.encode('utf8')

        if isinstance(old_password, unicode):
            old_password = old_password.encode('utf8')

        if not service:
            self.new_password_root = (old_password, new_password)
        else:
            self.new_password_services[service] = (old_password, new_password)

    def change_email(self, new_mail):
        """
        Changes the mail address of an account. You have to use the
        AccountService class to make changes permanent.
        """
        if isinstance(new_mail, unicode):
            new_mail = new_mail.encode('utf8')

        self.mail = new_mail


class NoSuchUserError(ValueError):
    """Raised by the get_by_* lookups when not exactly one user matches."""
    pass


class InvalidPasswordError(ldap.INVALID_CREDENTIALS):
    """Raised when the server refuses a root password change."""
    pass