summaryrefslogtreecommitdiffstats
path: root/accounts/account.py
diff options
context:
space:
mode:
Diffstat (limited to 'accounts/account.py')
-rw-r--r--accounts/account.py310
1 files changed, 0 insertions, 310 deletions
diff --git a/accounts/account.py b/accounts/account.py
index 4c522a0..fdfeba2 100644
--- a/accounts/account.py
+++ b/accounts/account.py
@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
-import ldap
from utils import Service
from uuid import uuid4
@@ -13,308 +12,6 @@ SERVICES = [
]
-
-class AccountService:
- """
- To simplify account management through ldap this class can be used.
- The AccountService class is stateless. It means that every request needs
- its own authentication request (bind).
-
- To test you stuff against our test setup use Port-Forwarding
- ssh spline -L 5678:vm-account:389 -N
-
- * register a new user
- >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, ADMIN_USER, ADMIN_PW, SERVICES)
- >> foo = Account('foo','foo@bar.de', password='bar')
- >> service.register(foo)
-
- * authenticate a new user
- >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, ADMIN_USER, ADMIN_PW, SERVICES)
- >> foo = service.auth('foo', 'bar')
-
- * updates an account
- >> foo.change_mail('a@b.de')
- >> foo.change_password('newpw','oldpw') # changes root password
- >> foo.change_password('newpw','oldpw', 'gitlab') # changes password for gitlab
- >> service.update(foo) # save changes in ldap backend
- # save changes in ldap backend as admin user (no need for old password)
- >> service.update(foo, as_admin=True)
-
- * delete an account
- >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, LDAP_ADMIN_USER, LDAP_ADMIN_PASS, SERVICES)
- >> service.delete(Account)
- >> service.delete('foo')
-
- * find accounts
- >> service = AccountService(LDAP_HOST, LDAP_BASE_DN, LDAP_ADMIN_USER, LDAP_ADMIN_PASS, SERVICES)
- >> all_accounts = service.find() # find all accounts
- >> print([x.uid for x in all_accounts])
- >> service.find_by_uid('test') # find users by uid
- >> service.get_by_uid('test') # same, raise NoSuchUserError if no match
- >> service.find_by_mail('test@test.de') # find users by mail
- >> service.find_by_uid('test*', wildcard=True) # find with wildcards
-
- """
- def __init__(self, ldap_host, base_dn, admin_user, admin_pass, services):
- self.ldap_host = ldap_host
- self.base_dn = base_dn
- self.admin_user = admin_user
- self.admin_pass = admin_pass
- self.services = services
- self.admin = False
- self.binded = False
-
-
- def auth(self, username, password):
- """
- Tries to authenticate a user with a given password. If the
- authentication is successful an Account object will be returned.
- """
-
- self._bind_anonymous()
- dn = self._format_dn([('uid', username), ('ou','users')])
- dn_user, data_user = self.connection.search_s(dn, ldap.SCOPE_SUBTREE)[0]
-
- self._bind_as_user(username, password)
- uid = data_user['uid'][0]
- mail = data_user['mail'][0]
-
- dn = self._format_dn([('ou', 'services')])
- filterstr = '(uid=%s)' % self._escape(uid)
- data_service = self.connection.search_s(dn, ldap.SCOPE_SUBTREE, filterstr)
-
- services = []
- for entry in data_service:
- cn = filter(lambda x: x.startswith('cn='), entry[0].split(','))[0]
- services.append(cn.split('=')[-1])
-
- acc = Account(uid, mail, services, dn_user, password)
-
- self._unbind()
-
- return acc
-
- def get_by_uid(self, uid):
- """
- Find a single user by uid. Unlike find_by_uid, don't return a list but
- raise NoSuchUserError if there is no such user.
- """
- users = self.find_by_uid(uid)
- if len(users) == 0:
- raise NoSuchUserError('No such user')
- if len(users) > 1:
- raise ShouldNotHappen('Several users for one uid returned.')
-
- return users[0]
-
- def get_by_mail(self, mail):
- """
- Find a single user by mail. Unlike find_by_mail, don't return a list but
- raise NoSuchUserError if there is no such user.
- """
- users = self.find_by_mail(mail)
- if len(users) == 0:
- raise NoSuchUserError('No such user')
- if len(users) > 1:
- raise ShouldNotHappen('Several users for one mail returned.')
-
- return users[0]
-
- def find_by_uid(self, uid, wildcard=False):
- return self.find({'uid': uid}, wildcard)
-
- def find_by_mail(self, mail, wildcard=False):
- return self.find({'mail': mail}, wildcard)
-
- def find(self, filters={}, wildcard=False):
- """
- Find accounts by a given filter with key:value semantic)
- """
- self._bind_anonymous()
-
- filters['objectClass'] = 'inetOrgPerson'
-
-
- filter_as_list = ['(%s=%s)' % (k,self._escape(v, wildcard)) for k,v in filters.items()]
- filterstr = ''.join(filter_as_list)
- if len(filter_as_list) > 1:
- filterstr = '(&%s)' % filterstr
-
- dn = self._format_dn([('ou','users')])
- data = self.connection.search_s(dn, ldap.SCOPE_SUBTREE, filterstr)
-
- accounts = []
- for a in data:
- accounts.append(Account(a[1]['uid'][0], a[1]['mail'][0]))
-
- self._unbind()
-
- return accounts
-
-
- def register(self, account):
- """
- Persists an account in the ldap backend
- """
- self._bind_as_admin()
-
- dn = self._format_dn([('uid', account.uid),('ou','users')])
- uid = self._escape(account.uid)
- attr = [
- ('objectClass', ['top','inetOrgPerson']),
- ('uid', uid), ('sn', 'n/a'), ('cn', uid),
- ('mail', account.attributes['mail'])
- ]
- self.connection.add_s(dn, attr)
- account.dn = dn
-
- account.new_password_root = (None, account.password)
- self._alter_passwords(account)
-
- self._unbind()
-
-
- def update(self, account, as_admin=False):
- """
- Updates account informations like passwords or email.
- """
- if as_admin:
- self._bind_as_admin()
- else:
- self._bind_as_user(account.uid, account.password)
-
- attr = [(ldap.MOD_REPLACE, k, v) for k, v in account.attributes.items()]
- dn = self._format_dn([('uid',account.uid),('ou','users')])
- self.connection.modify_s(dn, attr)
- self._alter_passwords(account, as_admin=as_admin)
-
- self._unbind()
-
-
- def delete(self, account, password=None, as_admin=False):
- """
- Deletes an account permanently.
- """
-
- if isinstance(account, basestring):
- raise NotImplementedError()
- else:
- user = account.uid
- password = account.password
-
- if as_admin:
- self._bind_as_admin()
- else:
- self._bind_as_user(user, password)
-
- dn = [self._format_dn([('uid',user),('cn',s),('ou','services')]) for s.id in account.services]
- dn.append(self._format_dn([('uid', user), ('ou','users')]))
-
- for x in dn:
- self.connection.delete_s(x)
-
- self._unbind()
-
- def _format_dn(self, attr, with_base_dn = True):
- if with_base_dn:
- attr.extend(self.base_dn)
-
- dn = ['%s=%s' % (item[0], self._escape(item[1])) for item in attr]
-
- return ','.join(dn)
-
- def _bind(self, dn, password):
- self.connection = ldap.initialize(self.ldap_host)
- self.connection.version = ldap.VERSION3
-
- self.connection.simple_bind_s(dn, password)
-
- def _bind_as_admin(self):
- if self.binded:
- self._unbind()
-
- dn = self._format_dn([('cn', self.admin_user)])
- self._bind(dn, self.admin_pass)
-
- self.admin = True
- self.binded = True
-
- def _bind_as_user(self, username, password):
- if self.binded:
- self._unbind()
-
- dn = self._format_dn([('uid', username),('ou', 'users')])
- self._bind(dn, password)
-
- self.binded = True
- self.admin = True
-
- def _bind_anonymous(self):
- if self.binded:
- self._unbind()
-
- self._bind('','')
-
- self.binded = True
-
-
- def _unbind(self):
- self.connection.unbind_s()
- self.admin = False
- self.binded = False
-
-
- def _alter_passwords(self, account, as_admin=False):
- if account.new_password_root:
- dn = self._format_dn([('uid',account.uid),('ou','users')])
- old, new = account.new_password_root
- if as_admin:
- self.connection.passwd_s(dn, None, new)
- else:
- try:
- self.connection.passwd_s(dn, old, new)
- except ldap.UNWILLING_TO_PERFORM:
- raise InvalidPasswordError()
-
- account.password = new
-
- account.new_password_root = None
-
- for service, passwords in account.new_password_services.items():
- dn = self._format_dn([('uid',account.uid),('cn',service),('ou','services')])
- old, new = passwords
-
- if new != None:
- if service not in account.services:
- attr = [('objectClass', ['top', 'servicePassword']), ('uid', account.uid)]
- self.connection.add_s(dn, attr)
-
- if as_admin:
- self.connection.passwd_s(dn, None, new)
- else:
- self.connection.passwd_s(dn, old, new)
-
- else:
- s = service.lower()
- if s in account.services:
- self.connection.delete_s(dn)
- account.services.remove(s)
-
- account.new_password_services = {}
-
- def _escape(self, s, wildcard=False):
- chars_to_escape = ['\\',',','=','+','<','>',';','"','\'','#','(',')','\0']
-
- if not wildcard:
- chars_to_escape.append('*')
-
- escape = lambda x,y: x.replace(y,'\%02X' % ord(y))
-
- return reduce(escape, chars_to_escape, s)
-
-
-
-
class Account:
"""
An Account represents a complex ldap tree entry for spline users.
@@ -374,10 +71,3 @@ class Account:
raise AttributeError("'%s' object has no attribute '%s'" %
(self.__class__.__name__, name))
-
-
-class NoSuchUserError(ValueError):
- pass
-
-class InvalidPasswordError(ldap.INVALID_CREDENTIALS):
- pass