summaryrefslogtreecommitdiffstats
path: root/accounts/backend/user/ldap.py
diff options
context:
space:
mode:
Diffstat (limited to 'accounts/backend/user/ldap.py')
-rw-r--r--accounts/backend/user/ldap.py57
1 files changed, 33 insertions, 24 deletions
diff --git a/accounts/backend/user/ldap.py b/accounts/backend/user/ldap.py
index 64e29d4..217dcba 100644
--- a/accounts/backend/user/ldap.py
+++ b/accounts/backend/user/ldap.py
@@ -6,12 +6,17 @@ from ldap3.utils.conv import escape_filter_chars
from ldap3.utils.dn import escape_rdn
from ldap3.core.exceptions import LDAPInvalidCredentialsResult, LDAPException, LDAPOperationResult
from ldap3.core.results import RESULT_SUCCESS
+from ldap3.abstract.entry import Entry
+from ldap3 import Connection
from . import Backend, InvalidPasswordError, NoSuchUserError, ShouldNotHappen
-from accounts.models import Account
+from accounts.models import Account, Service
+from accounts import AccountsFlask
+from typing import Optional
-def _escape(value, wildcard=False):
+
+def _escape(value: str, wildcard=False):
if not isinstance(value, str):
value = str(value)
if not wildcard:
@@ -19,7 +24,9 @@ def _escape(value, wildcard=False):
return escape_rdn(value)
-def _change_password(conn, dn, passwords, as_admin=False):
+def _change_password(conn: Connection, dn,
+ passwords: tuple[Optional[str], Optional[str]],
+ as_admin=False):
old_password, new_password = passwords
if as_admin:
conn.extend.standard.modify_password(dn, None, new_password)
@@ -32,26 +39,25 @@ def _change_password(conn, dn, passwords, as_admin=False):
class LdapBackend(Backend):
-
- def __init__(self, app):
+ def __init__(self, app: AccountsFlask) -> None:
super(LdapBackend, self).__init__(app)
- self.host = self.app.config['LDAP_HOST']
- self.base_dn = self.app.config['LDAP_BASE_DN']
- self.admin_user = self.app.config['LDAP_ADMIN_USER']
- self.admin_pass = self.app.config['LDAP_ADMIN_PASS']
- self.services = self.app.all_services
+ self.host: str = self.app.config['LDAP_HOST']
+ self.base_dn: list[tuple[str, str]] = self.app.config['LDAP_BASE_DN']
+ self.admin_user: str = self.app.config['LDAP_ADMIN_USER']
+ self.admin_pass: str = self.app.config['LDAP_ADMIN_PASS']
+ self.services: list[Service] = self.app.all_services
self.admin = False
self.binded = False
- def auth(self, username, password):
+ def auth(self, username: str, password: str):
"""
Tries to authenticate a user with a given password. If the
authentication is successful an Account object will be returned.
"""
- user_dn = self._format_dn([('uid', username), ('ou', 'users')])
- conn = self._connect(user_dn, password)
+ user_dn: str = self._format_dn([('uid', username), ('ou', 'users')])
+ conn: Connection = self._connect(user_dn, password)
uid = None
mail = None
@@ -59,6 +65,7 @@ class LdapBackend(Backend):
services = []
conn.search(user_dn, '(objectClass=*)',
attributes=['objectClass', 'uid', 'mail', 'cn', 'uidNumber'])
+ entry: Entry
for entry in conn.entries:
if 'splineAccount' in entry.objectClass.values:
uid = entry.uid.value
@@ -73,7 +80,7 @@ class LdapBackend(Backend):
return Account(uid, mail, services, password,
uidNumber=uidNumber)
- def find(self, filters=None, wildcard=False):
+ def find(self, filters: Optional[dict[str, str]] = None, wildcard=False):
"""
Find accounts by a given filter.
"""
@@ -88,7 +95,7 @@ class LdapBackend(Backend):
conn = self._connect()
base_dn = self._format_dn([('ou', 'users')])
- accounts = []
+ accounts: list[Account] = []
try:
conn.search(base_dn, filterstr, search_scope=ldap3.LEVEL,
attributes=['uid', 'mail', 'uidNumber'])
@@ -100,7 +107,7 @@ class LdapBackend(Backend):
return accounts
- def _store(self, account):
+ def _store(self, account: Account):
conn = self._connect_as_admin()
user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
@@ -117,7 +124,7 @@ class LdapBackend(Backend):
account.new_password_root = (None, account.password)
self._alter_passwords(conn, account)
- def update(self, account, as_admin=False):
+ def update(self, account: Account, as_admin=False):
"""
Updates account informations like passwords or email.
"""
@@ -133,7 +140,7 @@ class LdapBackend(Backend):
conn.modify(user_dn, attrs)
self._alter_passwords(conn, account, as_admin=as_admin)
- def delete(self, account, as_admin=False):
+ def delete(self, account: Account, as_admin=False):
"""
Deletes an account permanently.
"""
@@ -143,14 +150,14 @@ class LdapBackend(Backend):
else:
conn = self._connect(account.uid, account.password)
- dns = [[('cn', service.id), ('uid', account.uid), ('ou', 'users')]
+ dns = [[('cn', service), ('uid', account.uid), ('ou', 'users')]
for service in account.services] + \
[[('uid', account.uid), ('ou', 'users')]]
for dn in dns:
conn.delete(self._format_dn(dn))
- def _format_dn(self, attr, with_base_dn=True):
+ def _format_dn(self, attr: list[tuple[str, str]], with_base_dn=True) -> str:
if with_base_dn:
attr.extend(self.base_dn)
@@ -159,7 +166,8 @@ class LdapBackend(Backend):
return ','.join(dn)
- def _connect(self, user=None, password=None):
+ def _connect(self, user: Optional[str] = None,
+ password: Optional[str] = None):
server = ldap3.Server(self.host)
conn = ldap3.Connection(server, user, password, raise_exceptions=True)
@@ -174,7 +182,8 @@ class LdapBackend(Backend):
admin_dn = self._format_dn([('cn', self.admin_user)])
return self._connect(admin_dn, self.admin_pass)
- def _alter_passwords(self, conn, account, as_admin=False):
+ def _alter_passwords(self, conn: Connection, account: Account,
+ as_admin=False):
if account.new_password_root:
user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
_change_password(conn, user_dn, account.new_password_root, as_admin)
@@ -187,7 +196,7 @@ class LdapBackend(Backend):
('ou', 'users')])
_, new = passwords
- if new != None:
+ if new is not None:
if service_id not in account.services:
attrs = {
'objectClass': ['top', 'servicePassword'],
@@ -204,7 +213,7 @@ class LdapBackend(Backend):
del account.new_password_services[service]
- def _get_last_uidNumber(self, conn):
+ def _get_last_uidNumber(self, conn: Connection):
uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')])
conn.search(uidNumber_dn, '(objectClass=uidNumberMaximum)',
attributes=['uidNumber'])