summaryrefslogtreecommitdiffstats
path: root/accounts/backend
diff options
context:
space:
mode:
authorJonah BrĂ¼chert <jbb@kaidan.im>2024-03-28 06:22:55 +0100
committerJonah BrĂ¼chert <jbb@kaidan.im>2024-03-28 16:57:21 +0100
commita3f0c006b5fb5beab1704aad56777dcd98c42efb (patch)
tree2a2acb62303c25a299aea4030eff55bca7e28650 /accounts/backend
parentd5977387f3e6716cc7594dc872539ccd7f130524 (diff)
downloadweb-a3f0c006b5fb5beab1704aad56777dcd98c42efb.tar.gz
web-a3f0c006b5fb5beab1704aad56777dcd98c42efb.tar.bz2
web-a3f0c006b5fb5beab1704aad56777dcd98c42efb.zip
Add some type annotations
Diffstat (limited to 'accounts/backend')
-rw-r--r--accounts/backend/mail/__init__.py17
-rw-r--r--accounts/backend/mail/dummy.py24
-rw-r--r--accounts/backend/mail/sendmail.py17
-rw-r--r--accounts/backend/user/__init__.py23
-rw-r--r--accounts/backend/user/dummy.py29
-rw-r--r--accounts/backend/user/ldap.py57
6 files changed, 101 insertions, 66 deletions
diff --git a/accounts/backend/mail/__init__.py b/accounts/backend/mail/__init__.py
index ecd0d45..f2ec5d5 100644
--- a/accounts/backend/mail/__init__.py
+++ b/accounts/backend/mail/__init__.py
@@ -1,18 +1,25 @@
# -*- coding: utf-8 -*-
-class Backend(object):
+from flask.app import Flask
+from jinja2 import Template
+from jinja2.environment import TemplateModule
- def __init__(self, app):
+
+class Backend():
+ app: Flask
+
+ def __init__(self, app: Flask) -> None:
self.app = app
- def _send(self, recipient, content):
+ def _send(self, recipient: str, content: TemplateModule):
raise NotImplementedError()
- def send(self, recipient, template, **kwargs):
+ def send(self, recipient: str, template, **kwargs):
if recipient is None:
return
- tmpl = self.app.jinja_env.get_or_select_template(template)
+ tmpl: Template = \
+ self.app.jinja_env.get_or_select_template(template)
kwargs['recipient'] = recipient
module = tmpl.make_module(kwargs)
diff --git a/accounts/backend/mail/dummy.py b/accounts/backend/mail/dummy.py
index 61e7b75..3de3dbe 100644
--- a/accounts/backend/mail/dummy.py
+++ b/accounts/backend/mail/dummy.py
@@ -2,7 +2,8 @@
from . import Backend
-from accounts.utils import ensure_utf8
+from accounts import AccountsFlask
+from jinja2.environment import TemplateModule
FANCY_FORMAT = '''\
@@ -23,8 +24,10 @@ From: {sender}
class DummyBackend(Backend):
+ format: str
- def __init__(self, app, plain=False, format=None):
+ def __init__(self, app: AccountsFlask, plain: bool = False,
+ format: None = None) -> None:
super(DummyBackend, self).__init__(app)
self.plain = plain
@@ -36,13 +39,16 @@ class DummyBackend(Backend):
else:
self.format = format
- def _send(self, recipient, content):
- body = content.body()
+ def _send(self, recipient: str, content: TemplateModule):
+ print(type(content))
+
+ # we can't typecheck things defined in templates
+ body = content.body() # type: ignore
if not self.plain:
body = "\n| ".join(body.split("\n"))
- print((self.format.format(
- subject=ensure_utf8(content.subject()),
- sender=ensure_utf8(content.sender()),
- to=ensure_utf8(recipient),
- body=ensure_utf8(body))))
+ print(self.format.format(
+ subject=content.subject(), # type: ignore
+ sender=content.sender(), # type: ignore
+ to=recipient,
+ body=body))
diff --git a/accounts/backend/mail/sendmail.py b/accounts/backend/mail/sendmail.py
index 5ac5d93..1fecedc 100644
--- a/accounts/backend/mail/sendmail.py
+++ b/accounts/backend/mail/sendmail.py
@@ -4,27 +4,30 @@
import subprocess
from email.mime.text import MIMEText
from email.utils import parseaddr
+from jinja2.environment import TemplateModule
from . import Backend
-class SendmailBackend(Backend):
+def safe(s: str):
+ return s.split('\n', 1)[0]
- def _send(self, recipient, content):
- safe = lambda s: s.split('\n', 1)[0]
- msg = MIMEText(content.body(), _charset='utf-8')
- msg['Subject'] = safe(content.subject())
+class SendmailBackend(Backend):
+ def _send(self, recipient: str, content: TemplateModule):
+ msg = MIMEText(content.body(), _charset='utf-8') # type: ignore
+ msg['Subject'] = safe(content.subject()) # type: ignore
msg['To'] = safe(recipient)
- msg['From'] = safe(content.sender())
+ msg['From'] = safe(content.sender()) # type: ignore
envelope = []
- _, address = parseaddr(safe(content.sender()))
+ _, address = parseaddr(safe(content.sender())) # type: ignore
if address != '':
envelope = ['-f', address]
p = subprocess.Popen([self.app.config['SENDMAIL_COMMAND']] +
envelope + ['-t'], stdin=subprocess.PIPE)
+ assert p.stdin
p.stdin.write(msg.as_string().encode("utf-8"))
p.stdin.close()
diff --git a/accounts/backend/user/__init__.py b/accounts/backend/user/__init__.py
index e72302a..1504e41 100644
--- a/accounts/backend/user/__init__.py
+++ b/accounts/backend/user/__init__.py
@@ -1,5 +1,8 @@
# -*- coding: utf-8 -*-
+from accounts.app import AccountsFlask
+from accounts.models import Account
+
class NoSuchUserError(ValueError):
pass
@@ -49,7 +52,9 @@ class Backend(object):
>> backend.find_by_uid('test*', wildcard=True) # find with wildcards
"""
- def __init__(self, app):
+ app: AccountsFlask
+
+ def __init__(self, app: AccountsFlask) -> None:
self.app = app
#: Exception type, that is raised if no matching user was found.
@@ -60,14 +65,14 @@ class Backend(object):
#: could also be raised, if you want to change user information.
self.InvalidPasswordError = InvalidPasswordError
- def auth(self, username, password):
+ def auth(self, username: str, password: str):
"""
Tries to authenticate a user with a given password. If the
authentication is successful an Account object will be returned.
"""
raise NotImplementedError()
- def get_by_uid(self, uid):
+ def get_by_uid(self, uid: str) -> Account:
"""
Find a single user by uid. Unlike find_by_uid, don't return a list but
raise NoSuchUserError if there is no such user.
@@ -80,7 +85,7 @@ class Backend(object):
return users[0]
- def get_by_mail(self, mail):
+ def get_by_mail(self, mail: str) -> Account:
"""
Find a single user by mail. Unlike find_by_mail, don't return a list but
raise NoSuchUserError if there is no such user.
@@ -93,19 +98,19 @@ class Backend(object):
return users[0]
- def find_by_uid(self, uid, wildcard=False):
+ def find_by_uid(self, uid: str, wildcard=False) -> list[Account]:
return self.find({'uid': uid}, wildcard)
- def find_by_mail(self, mail, wildcard=False):
+ def find_by_mail(self, mail: str, wildcard=False) -> list[Account]:
return self.find({'mail': mail}, wildcard)
- def find(self, filters=None, wildcard=False):
+ def find(self, filters=None, wildcard=False) -> list[Account]:
"""
Find accounts by a given filter.
"""
raise NotImplementedError()
- def register(self, account):
+ def register(self, account: Account):
"""
Register a new user account.
@@ -118,7 +123,7 @@ class Backend(object):
account.uidNumber = self._get_next_uidNumber()
self._store(account)
- def update(self, account, as_admin=False):
+ def update(self, account: Account, as_admin=False):
"""
Updates account information like passwords or email.
"""
diff --git a/accounts/backend/user/dummy.py b/accounts/backend/user/dummy.py
index 8c54dac..3d0dcca 100644
--- a/accounts/backend/user/dummy.py
+++ b/accounts/backend/user/dummy.py
@@ -2,9 +2,13 @@ from fnmatch import fnmatch
from . import Backend
from accounts.models import Account
+from accounts import AccountsFlask
+from typing import Optional
-def _match_filter(account, filters, wildcard):
+
+def _match_filter(account: Account, filters: Optional[dict[str, str]],
+ wildcard: bool):
if filters is None:
return True
@@ -30,7 +34,7 @@ class DummyBackend(Backend):
users (test and test2) are created.
"""
- def __init__(self, app):
+ def __init__(self, app: AccountsFlask) -> None:
super(DummyBackend, self).__init__(app)
self._storage = {
@@ -65,7 +69,7 @@ class DummyBackend(Backend):
)
return accounts
- def auth(self, username, password):
+ def auth(self, username: str, password: str):
"""
Tries to authenticate a user with a given password. If the
authentication is successful an Account object will be returned.
@@ -76,30 +80,31 @@ class DummyBackend(Backend):
acc.password = password
return acc
- def find(self, filters=None, wildcard=False):
+ def find(self, filters: Optional[dict[str, str]] = None,
+ wildcard=False) -> list[Account]:
"""
Find accounts by a given filter.
"""
return [acc for acc in self._get_accounts() if _match_filter(acc, filters, wildcard)]
- def _store(self, account):
+ def _store(self, account: Account) -> None:
self._storage[account.uid] = {
"uidNumber": account.uidNumber,
"mail": account.mail,
"password": account.password
}
- def _verify_password(self, account, password):
+ def _verify_password(self, account: Account, password: Optional[str]):
return password == self._storage[account.uid]["password"]
- def _alter_password(self, account, password):
+ def _alter_password(self, account: Account, password: Optional[str]):
self._storage[account.uid]["password"] = password
- def update(self, account, as_admin=False):
+ def update(self, account: Account, as_admin=False):
"""
Updates account information like passwords or email.
"""
- stored_account = self.get_by_uid(account.uid)
+ stored_account: Account = self.get_by_uid(account.uid)
if not as_admin:
if not self._verify_password(stored_account, account.password):
raise self.InvalidPasswordError("Invalid password")
@@ -109,16 +114,16 @@ class DummyBackend(Backend):
if self._verify_password(stored_account, old):
self._alter_password(stored_account, new)
- def delete(self, account, as_admin=False):
+ def delete(self, account: Account, as_admin=False):
"""
Deletes an account permanently.
"""
- stored_account = self.get_by_uid(account.uid)
+ stored_account: Account = self.get_by_uid(account.uid)
if not as_admin:
if stored_account.password != account.password:
raise self.InvalidPasswordError("Invalid password")
- self._storage = [acc for acc in self._storage if acc.uid != account.uid]
+ self._storage.pop(stored_account.uid)
def _get_next_uidNumber(self):
value = self._next_uidNumber
diff --git a/accounts/backend/user/ldap.py b/accounts/backend/user/ldap.py
index 64e29d4..217dcba 100644
--- a/accounts/backend/user/ldap.py
+++ b/accounts/backend/user/ldap.py
@@ -6,12 +6,17 @@ from ldap3.utils.conv import escape_filter_chars
from ldap3.utils.dn import escape_rdn
from ldap3.core.exceptions import LDAPInvalidCredentialsResult, LDAPException, LDAPOperationResult
from ldap3.core.results import RESULT_SUCCESS
+from ldap3.abstract.entry import Entry
+from ldap3 import Connection
from . import Backend, InvalidPasswordError, NoSuchUserError, ShouldNotHappen
-from accounts.models import Account
+from accounts.models import Account, Service
+from accounts import AccountsFlask
+from typing import Optional
-def _escape(value, wildcard=False):
+
+def _escape(value: str, wildcard=False):
if not isinstance(value, str):
value = str(value)
if not wildcard:
@@ -19,7 +24,9 @@ def _escape(value, wildcard=False):
return escape_rdn(value)
-def _change_password(conn, dn, passwords, as_admin=False):
+def _change_password(conn: Connection, dn,
+ passwords: tuple[Optional[str], Optional[str]],
+ as_admin=False):
old_password, new_password = passwords
if as_admin:
conn.extend.standard.modify_password(dn, None, new_password)
@@ -32,26 +39,25 @@ def _change_password(conn, dn, passwords, as_admin=False):
class LdapBackend(Backend):
-
- def __init__(self, app):
+ def __init__(self, app: AccountsFlask) -> None:
super(LdapBackend, self).__init__(app)
- self.host = self.app.config['LDAP_HOST']
- self.base_dn = self.app.config['LDAP_BASE_DN']
- self.admin_user = self.app.config['LDAP_ADMIN_USER']
- self.admin_pass = self.app.config['LDAP_ADMIN_PASS']
- self.services = self.app.all_services
+ self.host: str = self.app.config['LDAP_HOST']
+ self.base_dn: list[tuple[str, str]] = self.app.config['LDAP_BASE_DN']
+ self.admin_user: str = self.app.config['LDAP_ADMIN_USER']
+ self.admin_pass: str = self.app.config['LDAP_ADMIN_PASS']
+ self.services: list[Service] = self.app.all_services
self.admin = False
self.binded = False
- def auth(self, username, password):
+ def auth(self, username: str, password: str):
"""
Tries to authenticate a user with a given password. If the
authentication is successful an Account object will be returned.
"""
- user_dn = self._format_dn([('uid', username), ('ou', 'users')])
- conn = self._connect(user_dn, password)
+ user_dn: str = self._format_dn([('uid', username), ('ou', 'users')])
+ conn: Connection = self._connect(user_dn, password)
uid = None
mail = None
@@ -59,6 +65,7 @@ class LdapBackend(Backend):
services = []
conn.search(user_dn, '(objectClass=*)',
attributes=['objectClass', 'uid', 'mail', 'cn', 'uidNumber'])
+ entry: Entry
for entry in conn.entries:
if 'splineAccount' in entry.objectClass.values:
uid = entry.uid.value
@@ -73,7 +80,7 @@ class LdapBackend(Backend):
return Account(uid, mail, services, password,
uidNumber=uidNumber)
- def find(self, filters=None, wildcard=False):
+ def find(self, filters: Optional[dict[str, str]] = None, wildcard=False):
"""
Find accounts by a given filter.
"""
@@ -88,7 +95,7 @@ class LdapBackend(Backend):
conn = self._connect()
base_dn = self._format_dn([('ou', 'users')])
- accounts = []
+ accounts: list[Account] = []
try:
conn.search(base_dn, filterstr, search_scope=ldap3.LEVEL,
attributes=['uid', 'mail', 'uidNumber'])
@@ -100,7 +107,7 @@ class LdapBackend(Backend):
return accounts
- def _store(self, account):
+ def _store(self, account: Account):
conn = self._connect_as_admin()
user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
@@ -117,7 +124,7 @@ class LdapBackend(Backend):
account.new_password_root = (None, account.password)
self._alter_passwords(conn, account)
- def update(self, account, as_admin=False):
+ def update(self, account: Account, as_admin=False):
"""
Updates account informations like passwords or email.
"""
@@ -133,7 +140,7 @@ class LdapBackend(Backend):
conn.modify(user_dn, attrs)
self._alter_passwords(conn, account, as_admin=as_admin)
- def delete(self, account, as_admin=False):
+ def delete(self, account: Account, as_admin=False):
"""
Deletes an account permanently.
"""
@@ -143,14 +150,14 @@ class LdapBackend(Backend):
else:
conn = self._connect(account.uid, account.password)
- dns = [[('cn', service.id), ('uid', account.uid), ('ou', 'users')]
+ dns = [[('cn', service), ('uid', account.uid), ('ou', 'users')]
for service in account.services] + \
[[('uid', account.uid), ('ou', 'users')]]
for dn in dns:
conn.delete(self._format_dn(dn))
- def _format_dn(self, attr, with_base_dn=True):
+ def _format_dn(self, attr: list[tuple[str, str]], with_base_dn=True) -> str:
if with_base_dn:
attr.extend(self.base_dn)
@@ -159,7 +166,8 @@ class LdapBackend(Backend):
return ','.join(dn)
- def _connect(self, user=None, password=None):
+ def _connect(self, user: Optional[str] = None,
+ password: Optional[str] = None):
server = ldap3.Server(self.host)
conn = ldap3.Connection(server, user, password, raise_exceptions=True)
@@ -174,7 +182,8 @@ class LdapBackend(Backend):
admin_dn = self._format_dn([('cn', self.admin_user)])
return self._connect(admin_dn, self.admin_pass)
- def _alter_passwords(self, conn, account, as_admin=False):
+ def _alter_passwords(self, conn: Connection, account: Account,
+ as_admin=False):
if account.new_password_root:
user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
_change_password(conn, user_dn, account.new_password_root, as_admin)
@@ -187,7 +196,7 @@ class LdapBackend(Backend):
('ou', 'users')])
_, new = passwords
- if new != None:
+ if new is not None:
if service_id not in account.services:
attrs = {
'objectClass': ['top', 'servicePassword'],
@@ -204,7 +213,7 @@ class LdapBackend(Backend):
del account.new_password_services[service]
- def _get_last_uidNumber(self, conn):
+ def _get_last_uidNumber(self, conn: Connection):
uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')])
conn.search(uidNumber_dn, '(objectClass=uidNumberMaximum)',
attributes=['uidNumber'])