diff options
Diffstat (limited to 'accounts/backend/user/ldap.py')
-rw-r--r-- | accounts/backend/user/ldap.py | 174 |
1 files changed, 106 insertions, 68 deletions
diff --git a/accounts/backend/user/ldap.py b/accounts/backend/user/ldap.py index 99080a4..a1ed904 100644 --- a/accounts/backend/user/ldap.py +++ b/accounts/backend/user/ldap.py @@ -4,7 +4,11 @@ import ldap3 from ldap3.utils.conv import escape_filter_chars from ldap3.utils.dn import escape_rdn -from ldap3.core.exceptions import LDAPInvalidCredentialsResult, LDAPException, LDAPOperationResult +from ldap3.core.exceptions import ( + LDAPInvalidCredentialsResult, + LDAPException, + LDAPOperationResult, +) from ldap3.core.results import RESULT_SUCCESS from ldap3.abstract.entry import Entry from ldap3 import Connection @@ -24,28 +28,32 @@ def _escape(value: str, wildcard=False): return escape_rdn(value) -def _change_password(conn: Connection, dn, - passwords: tuple[Optional[str], Optional[str]], - as_admin=False): +def _change_password( + conn: Connection, + dn, + passwords: tuple[Optional[str], Optional[str]], + as_admin=False, +): old_password, new_password = passwords if as_admin: conn.extend.standard.modify_password(dn, None, new_password) else: try: - conn.extend.standard.modify_password(dn, old_password, - new_password) + conn.extend.standard.modify_password( + dn, old_password, new_password + ) except LDAPException: - raise InvalidPasswordError('Invalid password') + raise InvalidPasswordError("Invalid password") class LdapBackend(Backend): def __init__(self, app: AccountsFlask) -> None: super(LdapBackend, self).__init__(app) - self.host: str = self.app.config['LDAP_HOST'] - self.base_dn: list[tuple[str, str]] = self.app.config['LDAP_BASE_DN'] - self.admin_user: str = self.app.config['LDAP_ADMIN_USER'] - self.admin_pass: str = self.app.config['LDAP_ADMIN_PASS'] + self.host: str = self.app.config["LDAP_HOST"] + self.base_dn: list[tuple[str, str]] = self.app.config["LDAP_BASE_DN"] + self.admin_user: str = self.app.config["LDAP_ADMIN_USER"] + self.admin_pass: str = self.app.config["LDAP_ADMIN_PASS"] self.services: list[Service] = self.app.all_services self.admin = False @@ -56,29 +64,31 @@ class LdapBackend(Backend): Tries to authenticate a user with a given password. If the authentication is successful an Account object will be returned. """ - user_dn: str = self._format_dn([('uid', username), ('ou', 'users')]) + user_dn: str = self._format_dn([("uid", username), ("ou", "users")]) conn: Connection = self._connect(user_dn, password) uid = None mail = None uidNumber = None services = [] - conn.search(user_dn, '(objectClass=*)', - attributes=['objectClass', 'uid', 'mail', 'cn', 'uidNumber']) + conn.search( + user_dn, + "(objectClass=*)", + attributes=["objectClass", "uid", "mail", "cn", "uidNumber"], + ) entry: Entry for entry in conn.entries: - if 'splineAccount' in entry.objectClass.values: + if "splineAccount" in entry.objectClass.values: uid = entry.uid.value mail = entry.mail.value uidNumber = entry.uidNumber.value - elif 'servicePassword' in entry.objectClass.value: + elif "servicePassword" in entry.objectClass.value: services.append(entry.cn.value) if uid is None or mail is None or uidNumber is None: raise NoSuchUserError("User not found") - return Account(uid, mail, services, password, - uidNumber=uidNumber) + return Account(uid, mail, services, password, uidNumber=uidNumber) def find(self, filters: Optional[dict[str, str]] = None, wildcard=False): """ @@ -87,21 +97,32 @@ class LdapBackend(Backend): if filters is None: filters = dict() - filters['objectClass'] = 'splineAccount' - filter_as_list = ['(%s=%s)' % (attr, _escape(value, wildcard)) - for attr, value in list(filters.items())] - filterstr = '(&%s)' % ''.join(filter_as_list) + filters["objectClass"] = "splineAccount" + filter_as_list = [ + "(%s=%s)" % (attr, _escape(value, wildcard)) + for attr, value in list(filters.items()) + ] + filterstr = "(&%s)" % "".join(filter_as_list) conn = self._connect() - base_dn = self._format_dn([('ou', 'users')]) + base_dn = self._format_dn([("ou", "users")]) accounts: list[Account] = [] try: - conn.search(base_dn, filterstr, search_scope=ldap3.LEVEL, - attributes=['uid', 'mail', 'uidNumber']) + conn.search( + base_dn, + filterstr, + search_scope=ldap3.LEVEL, + attributes=["uid", "mail", "uidNumber"], + ) for entry in conn.entries: - accounts.append(Account(entry.uid.value, entry.mail.value, - uidNumber=entry.uidNumber.value)) + accounts.append( + Account( + entry.uid.value, + entry.mail.value, + uidNumber=entry.uidNumber.value, + ) + ) except LDAPException: pass @@ -110,14 +131,14 @@ class LdapBackend(Backend): def _store(self, account: Account): conn = self._connect_as_admin() - user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')]) + user_dn = self._format_dn([("uid", account.uid), ("ou", "users")]) attrs = { - 'objectClass': ['top', 'inetOrgPerson', 'splineAccount'], - 'uid': _escape(account.uid), - 'sn': 'n/a', - 'cn': _escape(account.uid), - 'mail': _escape(account.mail), - 'uidNumber': _escape(account.uidNumber), + "objectClass": ["top", "inetOrgPerson", "splineAccount"], + "uid": _escape(account.uid), + "sn": "n/a", + "cn": _escape(account.uid), + "mail": _escape(account.mail), + "uidNumber": _escape(account.uidNumber), } conn.add(user_dn, attributes=attrs) @@ -129,14 +150,16 @@ class LdapBackend(Backend): Updates account informations like passwords or email. """ conn = None - user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')]) + user_dn = self._format_dn([("uid", account.uid), ("ou", "users")]) if as_admin: conn = self._connect_as_admin() else: conn = self._connect(user_dn, account.password) - attrs = {key: [(ldap3.MODIFY_REPLACE, [value])] - for key, value in list(account.attributes.items())} + attrs = { + key: [(ldap3.MODIFY_REPLACE, [value])] + for key, value in list(account.attributes.items()) + } conn.modify(user_dn, attrs) self._alter_passwords(conn, account, as_admin=as_admin) @@ -150,57 +173,64 @@ class LdapBackend(Backend): else: conn = self._connect(account.uid, account.password) - dns = [[('cn', service), ('uid', account.uid), ('ou', 'users')] - for service in account.services] + \ - [[('uid', account.uid), ('ou', 'users')]] + dns = [ + [("cn", service), ("uid", account.uid), ("ou", "users")] + for service in account.services + ] + [[("uid", account.uid), ("ou", "users")]] for dn in dns: conn.delete(self._format_dn(dn)) - def _format_dn(self, attr: list[tuple[str, str]], with_base_dn=True) -> str: + def _format_dn( + self, attr: list[tuple[str, str]], with_base_dn=True + ) -> str: if with_base_dn: attr.extend(self.base_dn) - dn = ['%s=%s' % (item[0], _escape(item[1])) - for item in attr] + dn = ["%s=%s" % (item[0], _escape(item[1])) for item in attr] - return ','.join(dn) + return ",".join(dn) - def _connect(self, user: Optional[str] = None, - password: Optional[str] = None) -> Connection: + def _connect( + self, user: Optional[str] = None, password: Optional[str] = None + ) -> Connection: server = ldap3.Server(self.host) conn = ldap3.Connection(server, user, password, raise_exceptions=True) try: conn.bind() except LDAPInvalidCredentialsResult: - raise InvalidPasswordError('Invalid password') + raise InvalidPasswordError("Invalid password") return conn def _connect_as_admin(self) -> Connection: - admin_dn = self._format_dn([('cn', self.admin_user)]) + admin_dn = self._format_dn([("cn", self.admin_user)]) return self._connect(admin_dn, self.admin_pass) - def _alter_passwords(self, conn: Connection, account: Account, - as_admin=False): + def _alter_passwords( + self, conn: Connection, account: Account, as_admin=False + ): if account.new_password_root: - user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')]) - _change_password(conn, user_dn, account.new_password_root, as_admin) + user_dn = self._format_dn([("uid", account.uid), ("ou", "users")]) + _change_password( + conn, user_dn, account.new_password_root, as_admin + ) _, account.password = account.new_password_root account.new_password_root = None for service, passwords in list(account.new_password_services.items()): service_id = service.lower() - service_dn = self._format_dn([('cn', service_id), ('uid', account.uid), - ('ou', 'users')]) + service_dn = self._format_dn( + [("cn", service_id), ("uid", account.uid), ("ou", "users")] + ) _, new = passwords if new is not None: if service_id not in account.services: attrs = { - 'objectClass': ['top', 'servicePassword'], - 'cn': _escape(service_id), + "objectClass": ["top", "servicePassword"], + "cn": _escape(service_id), } conn.add(service_dn, attributes=attrs) account.services.append(service_id) @@ -214,30 +244,38 @@ class LdapBackend(Backend): del account.new_password_services[service] def _get_last_uidNumber(self, conn: Connection): - uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')]) - conn.search(uidNumber_dn, '(objectClass=uidNumberMaximum)', - attributes=['uidNumber']) + uidNumber_dn = self._format_dn([("cn", "uidMax"), ("ou", "other")]) + conn.search( + uidNumber_dn, + "(objectClass=uidNumberMaximum)", + attributes=["uidNumber"], + ) for entry in conn.entries: return int(entry.uidNumber.value) - raise ShouldNotHappen('Last uidNumber not found.') + raise ShouldNotHappen("Last uidNumber not found.") def _get_next_uidNumber(self): conn = self._connect_as_admin() - uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')]) + uidNumber_dn = self._format_dn([("cn", "uidMax"), ("ou", "other")]) uidNumber = self._get_last_uidNumber(conn) # Try to acquire next uidNumber for i in [0, 1, 2, 3, 4, 5]: try: - conn.modify(uidNumber_dn, {'uidNumber': [ - (ldap3.MODIFY_DELETE, ['%d' % (uidNumber + i)]), - (ldap3.MODIFY_ADD, ['%d' % (uidNumber + i + 1)]), - ]}) - - if conn.result['result'] == RESULT_SUCCESS: + conn.modify( + uidNumber_dn, + { + "uidNumber": [ + (ldap3.MODIFY_DELETE, ["%d" % (uidNumber + i)]), + (ldap3.MODIFY_ADD, ["%d" % (uidNumber + i + 1)]), + ] + }, + ) + + if conn.result["result"] == RESULT_SUCCESS: return uidNumber + i + 1 except LDAPOperationResult: pass - raise ShouldNotHappen('Unable to get next uidNumber, try again.') + raise ShouldNotHappen("Unable to get next uidNumber, try again.") |