diff options
Diffstat (limited to 'accounts/backend/user')
-rw-r--r-- | accounts/backend/user/__init__.py | 23 | ||||
-rw-r--r-- | accounts/backend/user/dummy.py | 29 | ||||
-rw-r--r-- | accounts/backend/user/ldap.py | 57 |
3 files changed, 64 insertions, 45 deletions
diff --git a/accounts/backend/user/__init__.py b/accounts/backend/user/__init__.py index e72302a..1504e41 100644 --- a/accounts/backend/user/__init__.py +++ b/accounts/backend/user/__init__.py @@ -1,5 +1,8 @@ # -*- coding: utf-8 -*- +from accounts.app import AccountsFlask +from accounts.models import Account + class NoSuchUserError(ValueError): pass @@ -49,7 +52,9 @@ class Backend(object): >> backend.find_by_uid('test*', wildcard=True) # find with wildcards """ - def __init__(self, app): + app: AccountsFlask + + def __init__(self, app: AccountsFlask) -> None: self.app = app #: Exception type, that is raised if no matching user was found. @@ -60,14 +65,14 @@ class Backend(object): #: could also be raised, if you want to change user information. self.InvalidPasswordError = InvalidPasswordError - def auth(self, username, password): + def auth(self, username: str, password: str): """ Tries to authenticate a user with a given password. If the authentication is successful an Account object will be returned. """ raise NotImplementedError() - def get_by_uid(self, uid): + def get_by_uid(self, uid: str) -> Account: """ Find a single user by uid. Unlike find_by_uid, don't return a list but raise NoSuchUserError if there is no such user. @@ -80,7 +85,7 @@ class Backend(object): return users[0] - def get_by_mail(self, mail): + def get_by_mail(self, mail: str) -> Account: """ Find a single user by mail. Unlike find_by_mail, don't return a list but raise NoSuchUserError if there is no such user. @@ -93,19 +98,19 @@ class Backend(object): return users[0] - def find_by_uid(self, uid, wildcard=False): + def find_by_uid(self, uid: str, wildcard=False) -> list[Account]: return self.find({'uid': uid}, wildcard) - def find_by_mail(self, mail, wildcard=False): + def find_by_mail(self, mail: str, wildcard=False) -> list[Account]: return self.find({'mail': mail}, wildcard) - def find(self, filters=None, wildcard=False): + def find(self, filters=None, wildcard=False) -> list[Account]: """ Find accounts by a given filter. """ raise NotImplementedError() - def register(self, account): + def register(self, account: Account): """ Register a new user account. @@ -118,7 +123,7 @@ class Backend(object): account.uidNumber = self._get_next_uidNumber() self._store(account) - def update(self, account, as_admin=False): + def update(self, account: Account, as_admin=False): """ Updates account information like passwords or email. """ diff --git a/accounts/backend/user/dummy.py b/accounts/backend/user/dummy.py index 8c54dac..3d0dcca 100644 --- a/accounts/backend/user/dummy.py +++ b/accounts/backend/user/dummy.py @@ -2,9 +2,13 @@ from fnmatch import fnmatch from . import Backend from accounts.models import Account +from accounts import AccountsFlask +from typing import Optional -def _match_filter(account, filters, wildcard): + +def _match_filter(account: Account, filters: Optional[dict[str, str]], + wildcard: bool): if filters is None: return True @@ -30,7 +34,7 @@ class DummyBackend(Backend): users (test and test2) are created. """ - def __init__(self, app): + def __init__(self, app: AccountsFlask) -> None: super(DummyBackend, self).__init__(app) self._storage = { @@ -65,7 +69,7 @@ class DummyBackend(Backend): ) return accounts - def auth(self, username, password): + def auth(self, username: str, password: str): """ Tries to authenticate a user with a given password. If the authentication is successful an Account object will be returned. @@ -76,30 +80,31 @@ class DummyBackend(Backend): acc.password = password return acc - def find(self, filters=None, wildcard=False): + def find(self, filters: Optional[dict[str, str]] = None, + wildcard=False) -> list[Account]: """ Find accounts by a given filter. """ return [acc for acc in self._get_accounts() if _match_filter(acc, filters, wildcard)] - def _store(self, account): + def _store(self, account: Account) -> None: self._storage[account.uid] = { "uidNumber": account.uidNumber, "mail": account.mail, "password": account.password } - def _verify_password(self, account, password): + def _verify_password(self, account: Account, password: Optional[str]): return password == self._storage[account.uid]["password"] - def _alter_password(self, account, password): + def _alter_password(self, account: Account, password: Optional[str]): self._storage[account.uid]["password"] = password - def update(self, account, as_admin=False): + def update(self, account: Account, as_admin=False): """ Updates account information like passwords or email. """ - stored_account = self.get_by_uid(account.uid) + stored_account: Account = self.get_by_uid(account.uid) if not as_admin: if not self._verify_password(stored_account, account.password): raise self.InvalidPasswordError("Invalid password") @@ -109,16 +114,16 @@ class DummyBackend(Backend): if self._verify_password(stored_account, old): self._alter_password(stored_account, new) - def delete(self, account, as_admin=False): + def delete(self, account: Account, as_admin=False): """ Deletes an account permanently. """ - stored_account = self.get_by_uid(account.uid) + stored_account: Account = self.get_by_uid(account.uid) if not as_admin: if stored_account.password != account.password: raise self.InvalidPasswordError("Invalid password") - self._storage = [acc for acc in self._storage if acc.uid != account.uid] + self._storage.pop(stored_account.uid) def _get_next_uidNumber(self): value = self._next_uidNumber diff --git a/accounts/backend/user/ldap.py b/accounts/backend/user/ldap.py index 64e29d4..217dcba 100644 --- a/accounts/backend/user/ldap.py +++ b/accounts/backend/user/ldap.py @@ -6,12 +6,17 @@ from ldap3.utils.conv import escape_filter_chars from ldap3.utils.dn import escape_rdn from ldap3.core.exceptions import LDAPInvalidCredentialsResult, LDAPException, LDAPOperationResult from ldap3.core.results import RESULT_SUCCESS +from ldap3.abstract.entry import Entry +from ldap3 import Connection from . import Backend, InvalidPasswordError, NoSuchUserError, ShouldNotHappen -from accounts.models import Account +from accounts.models import Account, Service +from accounts import AccountsFlask +from typing import Optional -def _escape(value, wildcard=False): + +def _escape(value: str, wildcard=False): if not isinstance(value, str): value = str(value) if not wildcard: @@ -19,7 +24,9 @@ def _escape(value, wildcard=False): return escape_rdn(value) -def _change_password(conn, dn, passwords, as_admin=False): +def _change_password(conn: Connection, dn, + passwords: tuple[Optional[str], Optional[str]], + as_admin=False): old_password, new_password = passwords if as_admin: conn.extend.standard.modify_password(dn, None, new_password) @@ -32,26 +39,25 @@ def _change_password(conn, dn, passwords, as_admin=False): class LdapBackend(Backend): - - def __init__(self, app): + def __init__(self, app: AccountsFlask) -> None: super(LdapBackend, self).__init__(app) - self.host = self.app.config['LDAP_HOST'] - self.base_dn = self.app.config['LDAP_BASE_DN'] - self.admin_user = self.app.config['LDAP_ADMIN_USER'] - self.admin_pass = self.app.config['LDAP_ADMIN_PASS'] - self.services = self.app.all_services + self.host: str = self.app.config['LDAP_HOST'] + self.base_dn: list[tuple[str, str]] = self.app.config['LDAP_BASE_DN'] + self.admin_user: str = self.app.config['LDAP_ADMIN_USER'] + self.admin_pass: str = self.app.config['LDAP_ADMIN_PASS'] + self.services: list[Service] = self.app.all_services self.admin = False self.binded = False - def auth(self, username, password): + def auth(self, username: str, password: str): """ Tries to authenticate a user with a given password. If the authentication is successful an Account object will be returned. """ - user_dn = self._format_dn([('uid', username), ('ou', 'users')]) - conn = self._connect(user_dn, password) + user_dn: str = self._format_dn([('uid', username), ('ou', 'users')]) + conn: Connection = self._connect(user_dn, password) uid = None mail = None @@ -59,6 +65,7 @@ class LdapBackend(Backend): services = [] conn.search(user_dn, '(objectClass=*)', attributes=['objectClass', 'uid', 'mail', 'cn', 'uidNumber']) + entry: Entry for entry in conn.entries: if 'splineAccount' in entry.objectClass.values: uid = entry.uid.value @@ -73,7 +80,7 @@ class LdapBackend(Backend): return Account(uid, mail, services, password, uidNumber=uidNumber) - def find(self, filters=None, wildcard=False): + def find(self, filters: Optional[dict[str, str]] = None, wildcard=False): """ Find accounts by a given filter. """ @@ -88,7 +95,7 @@ class LdapBackend(Backend): conn = self._connect() base_dn = self._format_dn([('ou', 'users')]) - accounts = [] + accounts: list[Account] = [] try: conn.search(base_dn, filterstr, search_scope=ldap3.LEVEL, attributes=['uid', 'mail', 'uidNumber']) @@ -100,7 +107,7 @@ class LdapBackend(Backend): return accounts - def _store(self, account): + def _store(self, account: Account): conn = self._connect_as_admin() user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')]) @@ -117,7 +124,7 @@ class LdapBackend(Backend): account.new_password_root = (None, account.password) self._alter_passwords(conn, account) - def update(self, account, as_admin=False): + def update(self, account: Account, as_admin=False): """ Updates account informations like passwords or email. """ @@ -133,7 +140,7 @@ class LdapBackend(Backend): conn.modify(user_dn, attrs) self._alter_passwords(conn, account, as_admin=as_admin) - def delete(self, account, as_admin=False): + def delete(self, account: Account, as_admin=False): """ Deletes an account permanently. """ @@ -143,14 +150,14 @@ class LdapBackend(Backend): else: conn = self._connect(account.uid, account.password) - dns = [[('cn', service.id), ('uid', account.uid), ('ou', 'users')] + dns = [[('cn', service), ('uid', account.uid), ('ou', 'users')] for service in account.services] + \ [[('uid', account.uid), ('ou', 'users')]] for dn in dns: conn.delete(self._format_dn(dn)) - def _format_dn(self, attr, with_base_dn=True): + def _format_dn(self, attr: list[tuple[str, str]], with_base_dn=True) -> str: if with_base_dn: attr.extend(self.base_dn) @@ -159,7 +166,8 @@ class LdapBackend(Backend): return ','.join(dn) - def _connect(self, user=None, password=None): + def _connect(self, user: Optional[str] = None, + password: Optional[str] = None): server = ldap3.Server(self.host) conn = ldap3.Connection(server, user, password, raise_exceptions=True) @@ -174,7 +182,8 @@ class LdapBackend(Backend): admin_dn = self._format_dn([('cn', self.admin_user)]) return self._connect(admin_dn, self.admin_pass) - def _alter_passwords(self, conn, account, as_admin=False): + def _alter_passwords(self, conn: Connection, account: Account, + as_admin=False): if account.new_password_root: user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')]) _change_password(conn, user_dn, account.new_password_root, as_admin) @@ -187,7 +196,7 @@ class LdapBackend(Backend): ('ou', 'users')]) _, new = passwords - if new != None: + if new is not None: if service_id not in account.services: attrs = { 'objectClass': ['top', 'servicePassword'], @@ -204,7 +213,7 @@ class LdapBackend(Backend): del account.new_password_services[service] - def _get_last_uidNumber(self, conn): + def _get_last_uidNumber(self, conn: Connection): uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')]) conn.search(uidNumber_dn, '(objectClass=uidNumberMaximum)', attributes=['uidNumber']) |