diff options
Diffstat (limited to 'accounts/account.py')
-rw-r--r-- | accounts/account.py | 310 |
1 files changed, 0 insertions, 310 deletions
diff --git a/accounts/account.py b/accounts/account.py index 4c522a0..fdfeba2 100644 --- a/accounts/account.py +++ b/accounts/account.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -import ldap from utils import Service from uuid import uuid4 @@ -13,308 +12,6 @@ SERVICES = [ ] - -class AccountService: - """ - To simplify account management through ldap this class can be used. - The AccountService class is stateless. It means that every request needs - its own authentication request (bind). - - To test you stuff against our test setup use Port-Forwarding - ssh spline -L 5678:vm-account:389 -N - - * register a new user - >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, ADMIN_USER, ADMIN_PW, SERVICES) - >> foo = Account('foo','foo@bar.de', password='bar') - >> service.register(foo) - - * authenticate a new user - >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, ADMIN_USER, ADMIN_PW, SERVICES) - >> foo = service.auth('foo', 'bar') - - * updates an account - >> foo.change_mail('a@b.de') - >> foo.change_password('newpw','oldpw') # changes root password - >> foo.change_password('newpw','oldpw', 'gitlab') # changes password for gitlab - >> service.update(foo) # save changes in ldap backend - # save changes in ldap backend as admin user (no need for old password) - >> service.update(foo, as_admin=True) - - * delete an account - >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, LDAP_ADMIN_USER, LDAP_ADMIN_PASS, SERVICES) - >> service.delete(Account) - >> service.delete('foo') - - * find accounts - >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, LDAP_ADMIN_USER, LDAP_ADMIN_PASS, SERVICES) - >> all_accounts = service.find() # find all accounts - >> print([x.uid for x in all_accounts]) - >> service.find_by_uid('test') # find users by uid - >> service.get_by_uid('test') # same, raise NoSuchUserError if no match - >> service.find_by_mail('test@test.de') # find users by mail - >> service.find_by_uid('test*', wildcard=True) # find with wildcards - - """ - def __init__(self, ldap_host, base_dn, admin_user, admin_pass, services): - self.ldap_host = ldap_host - self.base_dn = base_dn - self.admin_user = admin_user - self.admin_pass = admin_pass - self.services = services - self.admin = False - self.binded = False - - - def auth(self, username, password): - """ - Tries to authenticate a user with a given password. If the - authentication is successful an Account object will be returned. - """ - - self._bind_anonymous() - dn = self._format_dn([('uid', username), ('ou','users')]) - dn_user, data_user = self.connection.search_s(dn, ldap.SCOPE_SUBTREE)[0] - - self._bind_as_user(username, password) - uid = data_user['uid'][0] - mail = data_user['mail'][0] - - dn = self._format_dn([('ou', 'services')]) - filterstr = '(uid=%s)' % self._escape(uid) - data_service = self.connection.search_s(dn, ldap.SCOPE_SUBTREE, filterstr) - - services = [] - for entry in data_service: - cn = filter(lambda x: x.startswith('cn='), entry[0].split(','))[0] - services.append(cn.split('=')[-1]) - - acc = Account(uid, mail, services, dn_user, password) - - self._unbind() - - return acc - - def get_by_uid(self, uid): - """ - Find a single user by uid. Unlike find_by_uid, don't return a list but - raise NoSuchUserError if there is no such user. - """ - users = self.find_by_uid(uid) - if len(users) == 0: - raise NoSuchUserError('No such user') - if len(users) > 1: - raise ShouldNotHappen('Several users for one uid returned.') - - return users[0] - - def get_by_mail(self, mail): - """ - Find a single user by mail. Unlike find_by_mail, don't return a list but - raise NoSuchUserError if there is no such user. - """ - users = self.find_by_mail(mail) - if len(users) == 0: - raise NoSuchUserError('No such user') - if len(users) > 1: - raise ShouldNotHappen('Several users for one mail returned.') - - return users[0] - - def find_by_uid(self, uid, wildcard=False): - return self.find({'uid': uid}, wildcard) - - def find_by_mail(self, mail, wildcard=False): - return self.find({'mail': mail}, wildcard) - - def find(self, filters={}, wildcard=False): - """ - Find accounts by a given filter with key:value semantic) - """ - self._bind_anonymous() - - filters['objectClass'] = 'inetOrgPerson' - - - filter_as_list = ['(%s=%s)' % (k,self._escape(v, wildcard)) for k,v in filters.items()] - filterstr = ''.join(filter_as_list) - if len(filter_as_list) > 1: - filterstr = '(&%s)' % filterstr - - dn = self._format_dn([('ou','users')]) - data = self.connection.search_s(dn, ldap.SCOPE_SUBTREE, filterstr) - - accounts = [] - for a in data: - accounts.append(Account(a[1]['uid'][0], a[1]['mail'][0])) - - self._unbind() - - return accounts - - - def register(self, account): - """ - Persists an account in the ldap backend - """ - self._bind_as_admin() - - dn = self._format_dn([('uid', account.uid),('ou','users')]) - uid = self._escape(account.uid) - attr = [ - ('objectClass', ['top','inetOrgPerson']), - ('uid', uid), ('sn', 'n/a'), ('cn', uid), - ('mail', account.attributes['mail']) - ] - self.connection.add_s(dn, attr) - account.dn = dn - - account.new_password_root = (None, account.password) - self._alter_passwords(account) - - self._unbind() - - - def update(self, account, as_admin=False): - """ - Updates account informations like passwords or email. - """ - if as_admin: - self._bind_as_admin() - else: - self._bind_as_user(account.uid, account.password) - - attr = [(ldap.MOD_REPLACE, k, v) for k, v in account.attributes.items()] - dn = self._format_dn([('uid',account.uid),('ou','users')]) - self.connection.modify_s(dn, attr) - self._alter_passwords(account, as_admin=as_admin) - - self._unbind() - - - def delete(self, account, password=None, as_admin=False): - """ - Deletes an account permanently. - """ - - if isinstance(account, basestring): - raise NotImplementedError() - else: - user = account.uid - password = account.password - - if as_admin: - self._bind_as_admin() - else: - self._bind_as_user(user, password) - - dn = [self._format_dn([('uid',user),('cn',s),('ou','services')]) for s.id in account.services] - dn.append(self._format_dn([('uid', user), ('ou','users')])) - - for x in dn: - self.connection.delete_s(x) - - self._unbind() - - def _format_dn(self, attr, with_base_dn = True): - if with_base_dn: - attr.extend(self.base_dn) - - dn = ['%s=%s' % (item[0], self._escape(item[1])) for item in attr] - - return ','.join(dn) - - def _bind(self, dn, password): - self.connection = ldap.initialize(self.ldap_host) - self.connection.version = ldap.VERSION3 - - self.connection.simple_bind_s(dn, password) - - def _bind_as_admin(self): - if self.binded: - self._unbind() - - dn = self._format_dn([('cn', self.admin_user)]) - self._bind(dn, self.admin_pass) - - self.admin = True - self.binded = True - - def _bind_as_user(self, username, password): - if self.binded: - self._unbind() - - dn = self._format_dn([('uid', username),('ou', 'users')]) - self._bind(dn, password) - - self.binded = True - self.admin = True - - def _bind_anonymous(self): - if self.binded: - self._unbind() - - self._bind('','') - - self.binded = True - - - def _unbind(self): - self.connection.unbind_s() - self.admin = False - self.binded = False - - - def _alter_passwords(self, account, as_admin=False): - if account.new_password_root: - dn = self._format_dn([('uid',account.uid),('ou','users')]) - old, new = account.new_password_root - if as_admin: - self.connection.passwd_s(dn, None, new) - else: - try: - self.connection.passwd_s(dn, old, new) - except ldap.UNWILLING_TO_PERFORM: - raise InvalidPasswordError() - - account.password = new - - account.new_password_root = None - - for service, passwords in account.new_password_services.items(): - dn = self._format_dn([('uid',account.uid),('cn',service),('ou','services')]) - old, new = passwords - - if new != None: - if service not in account.services: - attr = [('objectClass', ['top', 'servicePassword']), ('uid', account.uid)] - self.connection.add_s(dn, attr) - - if as_admin: - self.connection.passwd_s(dn, None, new) - else: - self.connection.passwd_s(dn, old, new) - - else: - s = service.lower() - if s in account.services: - self.connection.delete_s(dn) - account.services.remove(s) - - account.new_password_services = {} - - def _escape(self, s, wildcard=False): - chars_to_escape = ['\\',',','=','+','<','>',';','"','\'','#','(',')','\0'] - - if not wildcard: - chars_to_escape.append('*') - - escape = lambda x,y: x.replace(y,'\%02X' % ord(y)) - - return reduce(escape, chars_to_escape, s) - - - - class Account: """ An Account represents a complex ldap tree entry for spline users. @@ -374,10 +71,3 @@ class Account: raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, name)) - - -class NoSuchUserError(ValueError): - pass - -class InvalidPasswordError(ldap.INVALID_CREDENTIALS): - pass |