diff options
author | Jonah BrĂ¼chert <jbb@kaidan.im> | 2024-03-28 06:22:55 +0100 |
---|---|---|
committer | Jonah BrĂ¼chert <jbb@kaidan.im> | 2024-03-28 16:57:21 +0100 |
commit | a3f0c006b5fb5beab1704aad56777dcd98c42efb (patch) | |
tree | 2a2acb62303c25a299aea4030eff55bca7e28650 /accounts/backend | |
parent | d5977387f3e6716cc7594dc872539ccd7f130524 (diff) | |
download | web-a3f0c006b5fb5beab1704aad56777dcd98c42efb.tar.gz web-a3f0c006b5fb5beab1704aad56777dcd98c42efb.tar.bz2 web-a3f0c006b5fb5beab1704aad56777dcd98c42efb.zip |
Add some type annotations
Diffstat (limited to 'accounts/backend')
-rw-r--r-- | accounts/backend/mail/__init__.py | 17 | ||||
-rw-r--r-- | accounts/backend/mail/dummy.py | 24 | ||||
-rw-r--r-- | accounts/backend/mail/sendmail.py | 17 | ||||
-rw-r--r-- | accounts/backend/user/__init__.py | 23 | ||||
-rw-r--r-- | accounts/backend/user/dummy.py | 29 | ||||
-rw-r--r-- | accounts/backend/user/ldap.py | 57 |
6 files changed, 101 insertions, 66 deletions
diff --git a/accounts/backend/mail/__init__.py b/accounts/backend/mail/__init__.py index ecd0d45..f2ec5d5 100644 --- a/accounts/backend/mail/__init__.py +++ b/accounts/backend/mail/__init__.py @@ -1,18 +1,25 @@ # -*- coding: utf-8 -*- -class Backend(object): +from flask.app import Flask +from jinja2 import Template +from jinja2.environment import TemplateModule - def __init__(self, app): + +class Backend(): + app: Flask + + def __init__(self, app: Flask) -> None: self.app = app - def _send(self, recipient, content): + def _send(self, recipient: str, content: TemplateModule): raise NotImplementedError() - def send(self, recipient, template, **kwargs): + def send(self, recipient: str, template, **kwargs): if recipient is None: return - tmpl = self.app.jinja_env.get_or_select_template(template) + tmpl: Template = \ + self.app.jinja_env.get_or_select_template(template) kwargs['recipient'] = recipient module = tmpl.make_module(kwargs) diff --git a/accounts/backend/mail/dummy.py b/accounts/backend/mail/dummy.py index 61e7b75..3de3dbe 100644 --- a/accounts/backend/mail/dummy.py +++ b/accounts/backend/mail/dummy.py @@ -2,7 +2,8 @@ from . import Backend -from accounts.utils import ensure_utf8 +from accounts import AccountsFlask +from jinja2.environment import TemplateModule FANCY_FORMAT = '''\ @@ -23,8 +24,10 @@ From: {sender} class DummyBackend(Backend): + format: str - def __init__(self, app, plain=False, format=None): + def __init__(self, app: AccountsFlask, plain: bool = False, + format: None = None) -> None: super(DummyBackend, self).__init__(app) self.plain = plain @@ -36,13 +39,16 @@ class DummyBackend(Backend): else: self.format = format - def _send(self, recipient, content): - body = content.body() + def _send(self, recipient: str, content: TemplateModule): + print(type(content)) + + # we can't typecheck things defined in templates + body = content.body() # type: ignore if not self.plain: body = "\n| ".join(body.split("\n")) - print((self.format.format( - subject=ensure_utf8(content.subject()), - sender=ensure_utf8(content.sender()), - to=ensure_utf8(recipient), - body=ensure_utf8(body)))) + print(self.format.format( + subject=content.subject(), # type: ignore + sender=content.sender(), # type: ignore + to=recipient, + body=body)) diff --git a/accounts/backend/mail/sendmail.py b/accounts/backend/mail/sendmail.py index 5ac5d93..1fecedc 100644 --- a/accounts/backend/mail/sendmail.py +++ b/accounts/backend/mail/sendmail.py @@ -4,27 +4,30 @@ import subprocess from email.mime.text import MIMEText from email.utils import parseaddr +from jinja2.environment import TemplateModule from . import Backend -class SendmailBackend(Backend): +def safe(s: str): + return s.split('\n', 1)[0] - def _send(self, recipient, content): - safe = lambda s: s.split('\n', 1)[0] - msg = MIMEText(content.body(), _charset='utf-8') - msg['Subject'] = safe(content.subject()) +class SendmailBackend(Backend): + def _send(self, recipient: str, content: TemplateModule): + msg = MIMEText(content.body(), _charset='utf-8') # type: ignore + msg['Subject'] = safe(content.subject()) # type: ignore msg['To'] = safe(recipient) - msg['From'] = safe(content.sender()) + msg['From'] = safe(content.sender()) # type: ignore envelope = [] - _, address = parseaddr(safe(content.sender())) + _, address = parseaddr(safe(content.sender())) # type: ignore if address != '': envelope = ['-f', address] p = subprocess.Popen([self.app.config['SENDMAIL_COMMAND']] + envelope + ['-t'], stdin=subprocess.PIPE) + assert p.stdin p.stdin.write(msg.as_string().encode("utf-8")) p.stdin.close() diff --git a/accounts/backend/user/__init__.py b/accounts/backend/user/__init__.py index e72302a..1504e41 100644 --- a/accounts/backend/user/__init__.py +++ b/accounts/backend/user/__init__.py @@ -1,5 +1,8 @@ # -*- coding: utf-8 -*- +from accounts.app import AccountsFlask +from accounts.models import Account + class NoSuchUserError(ValueError): pass @@ -49,7 +52,9 @@ class Backend(object): >> backend.find_by_uid('test*', wildcard=True) # find with wildcards """ - def __init__(self, app): + app: AccountsFlask + + def __init__(self, app: AccountsFlask) -> None: self.app = app #: Exception type, that is raised if no matching user was found. @@ -60,14 +65,14 @@ class Backend(object): #: could also be raised, if you want to change user information. self.InvalidPasswordError = InvalidPasswordError - def auth(self, username, password): + def auth(self, username: str, password: str): """ Tries to authenticate a user with a given password. If the authentication is successful an Account object will be returned. """ raise NotImplementedError() - def get_by_uid(self, uid): + def get_by_uid(self, uid: str) -> Account: """ Find a single user by uid. Unlike find_by_uid, don't return a list but raise NoSuchUserError if there is no such user. @@ -80,7 +85,7 @@ class Backend(object): return users[0] - def get_by_mail(self, mail): + def get_by_mail(self, mail: str) -> Account: """ Find a single user by mail. Unlike find_by_mail, don't return a list but raise NoSuchUserError if there is no such user. @@ -93,19 +98,19 @@ class Backend(object): return users[0] - def find_by_uid(self, uid, wildcard=False): + def find_by_uid(self, uid: str, wildcard=False) -> list[Account]: return self.find({'uid': uid}, wildcard) - def find_by_mail(self, mail, wildcard=False): + def find_by_mail(self, mail: str, wildcard=False) -> list[Account]: return self.find({'mail': mail}, wildcard) - def find(self, filters=None, wildcard=False): + def find(self, filters=None, wildcard=False) -> list[Account]: """ Find accounts by a given filter. """ raise NotImplementedError() - def register(self, account): + def register(self, account: Account): """ Register a new user account. @@ -118,7 +123,7 @@ class Backend(object): account.uidNumber = self._get_next_uidNumber() self._store(account) - def update(self, account, as_admin=False): + def update(self, account: Account, as_admin=False): """ Updates account information like passwords or email. """ diff --git a/accounts/backend/user/dummy.py b/accounts/backend/user/dummy.py index 8c54dac..3d0dcca 100644 --- a/accounts/backend/user/dummy.py +++ b/accounts/backend/user/dummy.py @@ -2,9 +2,13 @@ from fnmatch import fnmatch from . import Backend from accounts.models import Account +from accounts import AccountsFlask +from typing import Optional -def _match_filter(account, filters, wildcard): + +def _match_filter(account: Account, filters: Optional[dict[str, str]], + wildcard: bool): if filters is None: return True @@ -30,7 +34,7 @@ class DummyBackend(Backend): users (test and test2) are created. """ - def __init__(self, app): + def __init__(self, app: AccountsFlask) -> None: super(DummyBackend, self).__init__(app) self._storage = { @@ -65,7 +69,7 @@ class DummyBackend(Backend): ) return accounts - def auth(self, username, password): + def auth(self, username: str, password: str): """ Tries to authenticate a user with a given password. If the authentication is successful an Account object will be returned. @@ -76,30 +80,31 @@ class DummyBackend(Backend): acc.password = password return acc - def find(self, filters=None, wildcard=False): + def find(self, filters: Optional[dict[str, str]] = None, + wildcard=False) -> list[Account]: """ Find accounts by a given filter. """ return [acc for acc in self._get_accounts() if _match_filter(acc, filters, wildcard)] - def _store(self, account): + def _store(self, account: Account) -> None: self._storage[account.uid] = { "uidNumber": account.uidNumber, "mail": account.mail, "password": account.password } - def _verify_password(self, account, password): + def _verify_password(self, account: Account, password: Optional[str]): return password == self._storage[account.uid]["password"] - def _alter_password(self, account, password): + def _alter_password(self, account: Account, password: Optional[str]): self._storage[account.uid]["password"] = password - def update(self, account, as_admin=False): + def update(self, account: Account, as_admin=False): """ Updates account information like passwords or email. """ - stored_account = self.get_by_uid(account.uid) + stored_account: Account = self.get_by_uid(account.uid) if not as_admin: if not self._verify_password(stored_account, account.password): raise self.InvalidPasswordError("Invalid password") @@ -109,16 +114,16 @@ class DummyBackend(Backend): if self._verify_password(stored_account, old): self._alter_password(stored_account, new) - def delete(self, account, as_admin=False): + def delete(self, account: Account, as_admin=False): """ Deletes an account permanently. """ - stored_account = self.get_by_uid(account.uid) + stored_account: Account = self.get_by_uid(account.uid) if not as_admin: if stored_account.password != account.password: raise self.InvalidPasswordError("Invalid password") - self._storage = [acc for acc in self._storage if acc.uid != account.uid] + self._storage.pop(stored_account.uid) def _get_next_uidNumber(self): value = self._next_uidNumber diff --git a/accounts/backend/user/ldap.py b/accounts/backend/user/ldap.py index 64e29d4..217dcba 100644 --- a/accounts/backend/user/ldap.py +++ b/accounts/backend/user/ldap.py @@ -6,12 +6,17 @@ from ldap3.utils.conv import escape_filter_chars from ldap3.utils.dn import escape_rdn from ldap3.core.exceptions import LDAPInvalidCredentialsResult, LDAPException, LDAPOperationResult from ldap3.core.results import RESULT_SUCCESS +from ldap3.abstract.entry import Entry +from ldap3 import Connection from . import Backend, InvalidPasswordError, NoSuchUserError, ShouldNotHappen -from accounts.models import Account +from accounts.models import Account, Service +from accounts import AccountsFlask +from typing import Optional -def _escape(value, wildcard=False): + +def _escape(value: str, wildcard=False): if not isinstance(value, str): value = str(value) if not wildcard: @@ -19,7 +24,9 @@ def _escape(value, wildcard=False): return escape_rdn(value) -def _change_password(conn, dn, passwords, as_admin=False): +def _change_password(conn: Connection, dn, + passwords: tuple[Optional[str], Optional[str]], + as_admin=False): old_password, new_password = passwords if as_admin: conn.extend.standard.modify_password(dn, None, new_password) @@ -32,26 +39,25 @@ def _change_password(conn, dn, passwords, as_admin=False): class LdapBackend(Backend): - - def __init__(self, app): + def __init__(self, app: AccountsFlask) -> None: super(LdapBackend, self).__init__(app) - self.host = self.app.config['LDAP_HOST'] - self.base_dn = self.app.config['LDAP_BASE_DN'] - self.admin_user = self.app.config['LDAP_ADMIN_USER'] - self.admin_pass = self.app.config['LDAP_ADMIN_PASS'] - self.services = self.app.all_services + self.host: str = self.app.config['LDAP_HOST'] + self.base_dn: list[tuple[str, str]] = self.app.config['LDAP_BASE_DN'] + self.admin_user: str = self.app.config['LDAP_ADMIN_USER'] + self.admin_pass: str = self.app.config['LDAP_ADMIN_PASS'] + self.services: list[Service] = self.app.all_services self.admin = False self.binded = False - def auth(self, username, password): + def auth(self, username: str, password: str): """ Tries to authenticate a user with a given password. If the authentication is successful an Account object will be returned. """ - user_dn = self._format_dn([('uid', username), ('ou', 'users')]) - conn = self._connect(user_dn, password) + user_dn: str = self._format_dn([('uid', username), ('ou', 'users')]) + conn: Connection = self._connect(user_dn, password) uid = None mail = None @@ -59,6 +65,7 @@ class LdapBackend(Backend): services = [] conn.search(user_dn, '(objectClass=*)', attributes=['objectClass', 'uid', 'mail', 'cn', 'uidNumber']) + entry: Entry for entry in conn.entries: if 'splineAccount' in entry.objectClass.values: uid = entry.uid.value @@ -73,7 +80,7 @@ class LdapBackend(Backend): return Account(uid, mail, services, password, uidNumber=uidNumber) - def find(self, filters=None, wildcard=False): + def find(self, filters: Optional[dict[str, str]] = None, wildcard=False): """ Find accounts by a given filter. """ @@ -88,7 +95,7 @@ class LdapBackend(Backend): conn = self._connect() base_dn = self._format_dn([('ou', 'users')]) - accounts = [] + accounts: list[Account] = [] try: conn.search(base_dn, filterstr, search_scope=ldap3.LEVEL, attributes=['uid', 'mail', 'uidNumber']) @@ -100,7 +107,7 @@ class LdapBackend(Backend): return accounts - def _store(self, account): + def _store(self, account: Account): conn = self._connect_as_admin() user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')]) @@ -117,7 +124,7 @@ class LdapBackend(Backend): account.new_password_root = (None, account.password) self._alter_passwords(conn, account) - def update(self, account, as_admin=False): + def update(self, account: Account, as_admin=False): """ Updates account informations like passwords or email. """ @@ -133,7 +140,7 @@ class LdapBackend(Backend): conn.modify(user_dn, attrs) self._alter_passwords(conn, account, as_admin=as_admin) - def delete(self, account, as_admin=False): + def delete(self, account: Account, as_admin=False): """ Deletes an account permanently. """ @@ -143,14 +150,14 @@ class LdapBackend(Backend): else: conn = self._connect(account.uid, account.password) - dns = [[('cn', service.id), ('uid', account.uid), ('ou', 'users')] + dns = [[('cn', service), ('uid', account.uid), ('ou', 'users')] for service in account.services] + \ [[('uid', account.uid), ('ou', 'users')]] for dn in dns: conn.delete(self._format_dn(dn)) - def _format_dn(self, attr, with_base_dn=True): + def _format_dn(self, attr: list[tuple[str, str]], with_base_dn=True) -> str: if with_base_dn: attr.extend(self.base_dn) @@ -159,7 +166,8 @@ class LdapBackend(Backend): return ','.join(dn) - def _connect(self, user=None, password=None): + def _connect(self, user: Optional[str] = None, + password: Optional[str] = None): server = ldap3.Server(self.host) conn = ldap3.Connection(server, user, password, raise_exceptions=True) @@ -174,7 +182,8 @@ class LdapBackend(Backend): admin_dn = self._format_dn([('cn', self.admin_user)]) return self._connect(admin_dn, self.admin_pass) - def _alter_passwords(self, conn, account, as_admin=False): + def _alter_passwords(self, conn: Connection, account: Account, + as_admin=False): if account.new_password_root: user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')]) _change_password(conn, user_dn, account.new_password_root, as_admin) @@ -187,7 +196,7 @@ class LdapBackend(Backend): ('ou', 'users')]) _, new = passwords - if new != None: + if new is not None: if service_id not in account.services: attrs = { 'objectClass': ['top', 'servicePassword'], @@ -204,7 +213,7 @@ class LdapBackend(Backend): del account.new_password_services[service] - def _get_last_uidNumber(self, conn): + def _get_last_uidNumber(self, conn: Connection): uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')]) conn.search(uidNumber_dn, '(objectClass=uidNumberMaximum)', attributes=['uidNumber']) |