summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJonah Brüchert <jbb@kaidan.im>2024-03-28 06:22:55 +0100
committerJonah Brüchert <jbb@kaidan.im>2024-03-28 16:57:21 +0100
commita3f0c006b5fb5beab1704aad56777dcd98c42efb (patch)
tree2a2acb62303c25a299aea4030eff55bca7e28650
parentd5977387f3e6716cc7594dc872539ccd7f130524 (diff)
downloadweb-a3f0c006b5fb5beab1704aad56777dcd98c42efb.tar.gz
web-a3f0c006b5fb5beab1704aad56777dcd98c42efb.tar.bz2
web-a3f0c006b5fb5beab1704aad56777dcd98c42efb.zip
Add some type annotations
-rw-r--r--accounts/__init__.py13
-rw-r--r--accounts/app.py16
-rw-r--r--accounts/backend/mail/__init__.py17
-rw-r--r--accounts/backend/mail/dummy.py24
-rw-r--r--accounts/backend/mail/sendmail.py17
-rw-r--r--accounts/backend/user/__init__.py23
-rw-r--r--accounts/backend/user/dummy.py29
-rw-r--r--accounts/backend/user/ldap.py57
-rw-r--r--accounts/forms.py46
-rw-r--r--accounts/models.py49
-rw-r--r--accounts/utils/__init__.py14
-rw-r--r--accounts/utils/confirmation.py6
-rw-r--r--accounts/utils/console.py1
-rw-r--r--accounts/utils/login.py17
-rw-r--r--accounts/utils/sessions.py12
-rw-r--r--accounts/views/admin/__init__.py23
-rw-r--r--accounts/views/default/__init__.py56
-rw-r--r--accounts/views/login/__init__.py26
-rw-r--r--mypy.ini11
19 files changed, 273 insertions, 184 deletions
diff --git a/accounts/__init__.py b/accounts/__init__.py
index 48cd252..c9310fe 100644
--- a/accounts/__init__.py
+++ b/accounts/__init__.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
import os
-from flask import Flask, g, session
+from flask import Flask
from .models import Service
from .utils import get_backend
@@ -8,9 +8,12 @@ from .utils.confirmation import Confirmation
from .utils.sessions import EncryptedSessionInterface
from .utils.login import create_login_manager
from .views import default, login, admin
+from .app import AccountsFlask
+from typing import Optional
-def absolute_paths(app, config):
+
+def absolute_paths(app: Flask, config: str) -> None:
def handle_option(dirname, name):
if app.config.get(name):
app.config[name] = os.path.join(dirname, app.config[name])
@@ -19,15 +22,15 @@ def absolute_paths(app, config):
handle_option(dirname, 'USERNAME_BLACKLIST_FILE')
-def load_config(app, configfile):
+def load_config(app: Flask, configfile: Optional[str]) -> None:
if configfile is not None:
filename = os.path.abspath(configfile)
app.config.from_pyfile(filename)
absolute_paths(app, filename)
-def create_app(config=None):
- app = Flask(__name__)
+def create_app(config=None) -> Flask:
+ app = AccountsFlask(__name__)
app.config.from_object('accounts.default_settings')
load_config(app, os.environ.get('SPLINE_ACCOUNT_WEB_SETTINGS'))
load_config(app, config)
diff --git a/accounts/app.py b/accounts/app.py
new file mode 100644
index 0000000..eaab824
--- /dev/null
+++ b/accounts/app.py
@@ -0,0 +1,16 @@
+from flask import Flask, current_app
+from typing import TYPE_CHECKING, cast
+
+if TYPE_CHECKING:
+ from .backend import user, mail
+ from .models import Service
+
+
+class AccountsFlask(Flask):
+ all_services: "list[Service]"
+ username_blacklist: list[str]
+ user_backend: "user.Backend"
+ mail_backend: "mail.Backend"
+
+
+accounts_app: AccountsFlask = cast(AccountsFlask, current_app)
diff --git a/accounts/backend/mail/__init__.py b/accounts/backend/mail/__init__.py
index ecd0d45..f2ec5d5 100644
--- a/accounts/backend/mail/__init__.py
+++ b/accounts/backend/mail/__init__.py
@@ -1,18 +1,25 @@
# -*- coding: utf-8 -*-
-class Backend(object):
+from flask.app import Flask
+from jinja2 import Template
+from jinja2.environment import TemplateModule
- def __init__(self, app):
+
+class Backend():
+ app: Flask
+
+ def __init__(self, app: Flask) -> None:
self.app = app
- def _send(self, recipient, content):
+ def _send(self, recipient: str, content: TemplateModule):
raise NotImplementedError()
- def send(self, recipient, template, **kwargs):
+ def send(self, recipient: str, template, **kwargs):
if recipient is None:
return
- tmpl = self.app.jinja_env.get_or_select_template(template)
+ tmpl: Template = \
+ self.app.jinja_env.get_or_select_template(template)
kwargs['recipient'] = recipient
module = tmpl.make_module(kwargs)
diff --git a/accounts/backend/mail/dummy.py b/accounts/backend/mail/dummy.py
index 61e7b75..3de3dbe 100644
--- a/accounts/backend/mail/dummy.py
+++ b/accounts/backend/mail/dummy.py
@@ -2,7 +2,8 @@
from . import Backend
-from accounts.utils import ensure_utf8
+from accounts import AccountsFlask
+from jinja2.environment import TemplateModule
FANCY_FORMAT = '''\
@@ -23,8 +24,10 @@ From: {sender}
class DummyBackend(Backend):
+ format: str
- def __init__(self, app, plain=False, format=None):
+ def __init__(self, app: AccountsFlask, plain: bool = False,
+ format: None = None) -> None:
super(DummyBackend, self).__init__(app)
self.plain = plain
@@ -36,13 +39,16 @@ class DummyBackend(Backend):
else:
self.format = format
- def _send(self, recipient, content):
- body = content.body()
+ def _send(self, recipient: str, content: TemplateModule):
+ print(type(content))
+
+ # we can't typecheck things defined in templates
+ body = content.body() # type: ignore
if not self.plain:
body = "\n| ".join(body.split("\n"))
- print((self.format.format(
- subject=ensure_utf8(content.subject()),
- sender=ensure_utf8(content.sender()),
- to=ensure_utf8(recipient),
- body=ensure_utf8(body))))
+ print(self.format.format(
+ subject=content.subject(), # type: ignore
+ sender=content.sender(), # type: ignore
+ to=recipient,
+ body=body))
diff --git a/accounts/backend/mail/sendmail.py b/accounts/backend/mail/sendmail.py
index 5ac5d93..1fecedc 100644
--- a/accounts/backend/mail/sendmail.py
+++ b/accounts/backend/mail/sendmail.py
@@ -4,27 +4,30 @@
import subprocess
from email.mime.text import MIMEText
from email.utils import parseaddr
+from jinja2.environment import TemplateModule
from . import Backend
-class SendmailBackend(Backend):
+def safe(s: str):
+ return s.split('\n', 1)[0]
- def _send(self, recipient, content):
- safe = lambda s: s.split('\n', 1)[0]
- msg = MIMEText(content.body(), _charset='utf-8')
- msg['Subject'] = safe(content.subject())
+class SendmailBackend(Backend):
+ def _send(self, recipient: str, content: TemplateModule):
+ msg = MIMEText(content.body(), _charset='utf-8') # type: ignore
+ msg['Subject'] = safe(content.subject()) # type: ignore
msg['To'] = safe(recipient)
- msg['From'] = safe(content.sender())
+ msg['From'] = safe(content.sender()) # type: ignore
envelope = []
- _, address = parseaddr(safe(content.sender()))
+ _, address = parseaddr(safe(content.sender())) # type: ignore
if address != '':
envelope = ['-f', address]
p = subprocess.Popen([self.app.config['SENDMAIL_COMMAND']] +
envelope + ['-t'], stdin=subprocess.PIPE)
+ assert p.stdin
p.stdin.write(msg.as_string().encode("utf-8"))
p.stdin.close()
diff --git a/accounts/backend/user/__init__.py b/accounts/backend/user/__init__.py
index e72302a..1504e41 100644
--- a/accounts/backend/user/__init__.py
+++ b/accounts/backend/user/__init__.py
@@ -1,5 +1,8 @@
# -*- coding: utf-8 -*-
+from accounts.app import AccountsFlask
+from accounts.models import Account
+
class NoSuchUserError(ValueError):
pass
@@ -49,7 +52,9 @@ class Backend(object):
>> backend.find_by_uid('test*', wildcard=True) # find with wildcards
"""
- def __init__(self, app):
+ app: AccountsFlask
+
+ def __init__(self, app: AccountsFlask) -> None:
self.app = app
#: Exception type, that is raised if no matching user was found.
@@ -60,14 +65,14 @@ class Backend(object):
#: could also be raised, if you want to change user information.
self.InvalidPasswordError = InvalidPasswordError
- def auth(self, username, password):
+ def auth(self, username: str, password: str):
"""
Tries to authenticate a user with a given password. If the
authentication is successful an Account object will be returned.
"""
raise NotImplementedError()
- def get_by_uid(self, uid):
+ def get_by_uid(self, uid: str) -> Account:
"""
Find a single user by uid. Unlike find_by_uid, don't return a list but
raise NoSuchUserError if there is no such user.
@@ -80,7 +85,7 @@ class Backend(object):
return users[0]
- def get_by_mail(self, mail):
+ def get_by_mail(self, mail: str) -> Account:
"""
Find a single user by mail. Unlike find_by_mail, don't return a list but
raise NoSuchUserError if there is no such user.
@@ -93,19 +98,19 @@ class Backend(object):
return users[0]
- def find_by_uid(self, uid, wildcard=False):
+ def find_by_uid(self, uid: str, wildcard=False) -> list[Account]:
return self.find({'uid': uid}, wildcard)
- def find_by_mail(self, mail, wildcard=False):
+ def find_by_mail(self, mail: str, wildcard=False) -> list[Account]:
return self.find({'mail': mail}, wildcard)
- def find(self, filters=None, wildcard=False):
+ def find(self, filters=None, wildcard=False) -> list[Account]:
"""
Find accounts by a given filter.
"""
raise NotImplementedError()
- def register(self, account):
+ def register(self, account: Account):
"""
Register a new user account.
@@ -118,7 +123,7 @@ class Backend(object):
account.uidNumber = self._get_next_uidNumber()
self._store(account)
- def update(self, account, as_admin=False):
+ def update(self, account: Account, as_admin=False):
"""
Updates account information like passwords or email.
"""
diff --git a/accounts/backend/user/dummy.py b/accounts/backend/user/dummy.py
index 8c54dac..3d0dcca 100644
--- a/accounts/backend/user/dummy.py
+++ b/accounts/backend/user/dummy.py
@@ -2,9 +2,13 @@ from fnmatch import fnmatch
from . import Backend
from accounts.models import Account
+from accounts import AccountsFlask
+from typing import Optional
-def _match_filter(account, filters, wildcard):
+
+def _match_filter(account: Account, filters: Optional[dict[str, str]],
+ wildcard: bool):
if filters is None:
return True
@@ -30,7 +34,7 @@ class DummyBackend(Backend):
users (test and test2) are created.
"""
- def __init__(self, app):
+ def __init__(self, app: AccountsFlask) -> None:
super(DummyBackend, self).__init__(app)
self._storage = {
@@ -65,7 +69,7 @@ class DummyBackend(Backend):
)
return accounts
- def auth(self, username, password):
+ def auth(self, username: str, password: str):
"""
Tries to authenticate a user with a given password. If the
authentication is successful an Account object will be returned.
@@ -76,30 +80,31 @@ class DummyBackend(Backend):
acc.password = password
return acc
- def find(self, filters=None, wildcard=False):
+ def find(self, filters: Optional[dict[str, str]] = None,
+ wildcard=False) -> list[Account]:
"""
Find accounts by a given filter.
"""
return [acc for acc in self._get_accounts() if _match_filter(acc, filters, wildcard)]
- def _store(self, account):
+ def _store(self, account: Account) -> None:
self._storage[account.uid] = {
"uidNumber": account.uidNumber,
"mail": account.mail,
"password": account.password
}
- def _verify_password(self, account, password):
+ def _verify_password(self, account: Account, password: Optional[str]):
return password == self._storage[account.uid]["password"]
- def _alter_password(self, account, password):
+ def _alter_password(self, account: Account, password: Optional[str]):
self._storage[account.uid]["password"] = password
- def update(self, account, as_admin=False):
+ def update(self, account: Account, as_admin=False):
"""
Updates account information like passwords or email.
"""
- stored_account = self.get_by_uid(account.uid)
+ stored_account: Account = self.get_by_uid(account.uid)
if not as_admin:
if not self._verify_password(stored_account, account.password):
raise self.InvalidPasswordError("Invalid password")
@@ -109,16 +114,16 @@ class DummyBackend(Backend):
if self._verify_password(stored_account, old):
self._alter_password(stored_account, new)
- def delete(self, account, as_admin=False):
+ def delete(self, account: Account, as_admin=False):
"""
Deletes an account permanently.
"""
- stored_account = self.get_by_uid(account.uid)
+ stored_account: Account = self.get_by_uid(account.uid)
if not as_admin:
if stored_account.password != account.password:
raise self.InvalidPasswordError("Invalid password")
- self._storage = [acc for acc in self._storage if acc.uid != account.uid]
+ self._storage.pop(stored_account.uid)
def _get_next_uidNumber(self):
value = self._next_uidNumber
diff --git a/accounts/backend/user/ldap.py b/accounts/backend/user/ldap.py
index 64e29d4..217dcba 100644
--- a/accounts/backend/user/ldap.py
+++ b/accounts/backend/user/ldap.py
@@ -6,12 +6,17 @@ from ldap3.utils.conv import escape_filter_chars
from ldap3.utils.dn import escape_rdn
from ldap3.core.exceptions import LDAPInvalidCredentialsResult, LDAPException, LDAPOperationResult
from ldap3.core.results import RESULT_SUCCESS
+from ldap3.abstract.entry import Entry
+from ldap3 import Connection
from . import Backend, InvalidPasswordError, NoSuchUserError, ShouldNotHappen
-from accounts.models import Account
+from accounts.models import Account, Service
+from accounts import AccountsFlask
+from typing import Optional
-def _escape(value, wildcard=False):
+
+def _escape(value: str, wildcard=False):
if not isinstance(value, str):
value = str(value)
if not wildcard:
@@ -19,7 +24,9 @@ def _escape(value, wildcard=False):
return escape_rdn(value)
-def _change_password(conn, dn, passwords, as_admin=False):
+def _change_password(conn: Connection, dn,
+ passwords: tuple[Optional[str], Optional[str]],
+ as_admin=False):
old_password, new_password = passwords
if as_admin:
conn.extend.standard.modify_password(dn, None, new_password)
@@ -32,26 +39,25 @@ def _change_password(conn, dn, passwords, as_admin=False):
class LdapBackend(Backend):
-
- def __init__(self, app):
+ def __init__(self, app: AccountsFlask) -> None:
super(LdapBackend, self).__init__(app)
- self.host = self.app.config['LDAP_HOST']
- self.base_dn = self.app.config['LDAP_BASE_DN']
- self.admin_user = self.app.config['LDAP_ADMIN_USER']
- self.admin_pass = self.app.config['LDAP_ADMIN_PASS']
- self.services = self.app.all_services
+ self.host: str = self.app.config['LDAP_HOST']
+ self.base_dn: list[tuple[str, str]] = self.app.config['LDAP_BASE_DN']
+ self.admin_user: str = self.app.config['LDAP_ADMIN_USER']
+ self.admin_pass: str = self.app.config['LDAP_ADMIN_PASS']
+ self.services: list[Service] = self.app.all_services
self.admin = False
self.binded = False
- def auth(self, username, password):
+ def auth(self, username: str, password: str):
"""
Tries to authenticate a user with a given password. If the
authentication is successful an Account object will be returned.
"""
- user_dn = self._format_dn([('uid', username), ('ou', 'users')])
- conn = self._connect(user_dn, password)
+ user_dn: str = self._format_dn([('uid', username), ('ou', 'users')])
+ conn: Connection = self._connect(user_dn, password)
uid = None
mail = None
@@ -59,6 +65,7 @@ class LdapBackend(Backend):
services = []
conn.search(user_dn, '(objectClass=*)',
attributes=['objectClass', 'uid', 'mail', 'cn', 'uidNumber'])
+ entry: Entry
for entry in conn.entries:
if 'splineAccount' in entry.objectClass.values:
uid = entry.uid.value
@@ -73,7 +80,7 @@ class LdapBackend(Backend):
return Account(uid, mail, services, password,
uidNumber=uidNumber)
- def find(self, filters=None, wildcard=False):
+ def find(self, filters: Optional[dict[str, str]] = None, wildcard=False):
"""
Find accounts by a given filter.
"""
@@ -88,7 +95,7 @@ class LdapBackend(Backend):
conn = self._connect()
base_dn = self._format_dn([('ou', 'users')])
- accounts = []
+ accounts: list[Account] = []
try:
conn.search(base_dn, filterstr, search_scope=ldap3.LEVEL,
attributes=['uid', 'mail', 'uidNumber'])
@@ -100,7 +107,7 @@ class LdapBackend(Backend):
return accounts
- def _store(self, account):
+ def _store(self, account: Account):
conn = self._connect_as_admin()
user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
@@ -117,7 +124,7 @@ class LdapBackend(Backend):
account.new_password_root = (None, account.password)
self._alter_passwords(conn, account)
- def update(self, account, as_admin=False):
+ def update(self, account: Account, as_admin=False):
"""
Updates account informations like passwords or email.
"""
@@ -133,7 +140,7 @@ class LdapBackend(Backend):
conn.modify(user_dn, attrs)
self._alter_passwords(conn, account, as_admin=as_admin)
- def delete(self, account, as_admin=False):
+ def delete(self, account: Account, as_admin=False):
"""
Deletes an account permanently.
"""
@@ -143,14 +150,14 @@ class LdapBackend(Backend):
else:
conn = self._connect(account.uid, account.password)
- dns = [[('cn', service.id), ('uid', account.uid), ('ou', 'users')]
+ dns = [[('cn', service), ('uid', account.uid), ('ou', 'users')]
for service in account.services] + \
[[('uid', account.uid), ('ou', 'users')]]
for dn in dns:
conn.delete(self._format_dn(dn))
- def _format_dn(self, attr, with_base_dn=True):
+ def _format_dn(self, attr: list[tuple[str, str]], with_base_dn=True) -> str:
if with_base_dn:
attr.extend(self.base_dn)
@@ -159,7 +166,8 @@ class LdapBackend(Backend):
return ','.join(dn)
- def _connect(self, user=None, password=None):
+ def _connect(self, user: Optional[str] = None,
+ password: Optional[str] = None):
server = ldap3.Server(self.host)
conn = ldap3.Connection(server, user, password, raise_exceptions=True)
@@ -174,7 +182,8 @@ class LdapBackend(Backend):
admin_dn = self._format_dn([('cn', self.admin_user)])
return self._connect(admin_dn, self.admin_pass)
- def _alter_passwords(self, conn, account, as_admin=False):
+ def _alter_passwords(self, conn: Connection, account: Account,
+ as_admin=False):
if account.new_password_root:
user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
_change_password(conn, user_dn, account.new_password_root, as_admin)
@@ -187,7 +196,7 @@ class LdapBackend(Backend):
('ou', 'users')])
_, new = passwords
- if new != None:
+ if new is not None:
if service_id not in account.services:
attrs = {
'objectClass': ['top', 'servicePassword'],
@@ -204,7 +213,7 @@ class LdapBackend(Backend):
del account.new_password_services[service]
- def _get_last_uidNumber(self, conn):
+ def _get_last_uidNumber(self, conn: Connection):
uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')])
conn.search(uidNumber_dn, '(objectClass=uidNumberMaximum)',
attributes=['uidNumber'])
diff --git a/accounts/forms.py b/accounts/forms.py
index ad48ba9..a85f382 100644
--- a/accounts/forms.py
+++ b/accounts/forms.py
@@ -1,12 +1,13 @@
# -*- coding: utf-8 -*-
import re
-from flask import current_app, Markup, url_for
+from flask import Markup, url_for
from flask_wtf import FlaskForm as Form
from flask_login import current_user
-from wtforms import StringField, PasswordField, ValidationError, BooleanField,\
- validators
+from wtforms import StringField, PasswordField, ValidationError, \
+ BooleanField, validators
from wtforms.form import FormMeta
from .utils import NotRegexp
+from accounts.app import accounts_app
USERNAME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9-]{1,15}$')
@@ -25,10 +26,10 @@ class RegisterForm(Form):
def validate_username(self, field):
try:
- current_app.user_backend.get_by_uid(field.data)
- except current_app.user_backend.NoSuchUserError:
- if current_app.username_blacklist:
- if field.data.lower() in current_app.username_blacklist:
+ accounts_app.user_backend.get_by_uid(field.data)
+ except accounts_app.user_backend.NoSuchUserError:
+ if accounts_app.username_blacklist:
+ if field.data.lower() in accounts_app.username_blacklist:
raise ValidationError(Markup(
'Dieser Benutzername ist momentan nicht erlaubt. '
'<a href="%s">Weitere Informationen</a>'
@@ -38,8 +39,8 @@ class RegisterForm(Form):
def validate_mail(self, field):
try:
- current_app.user_backend.get_by_mail(field.data)
- except current_app.user_backend.NoSuchUserError:
+ accounts_app.user_backend.get_by_mail(field.data)
+ except accounts_app.user_backend.NoSuchUserError:
pass
else:
raise ValidationError(Markup(
@@ -48,11 +49,12 @@ class RegisterForm(Form):
'die <a href="%s">Passwort-vergessen-Funktion</a> benutzen.'
% url_for('default.lost_password')))
+
class AdminCreateAccountForm(RegisterForm):
def validate_username(self, field):
try:
- current_app.user_backend.get_by_uid(field.data)
- except current_app.user_backend.NoSuchUserError:
+ accounts_app.user_backend.get_by_uid(field.data)
+ except accounts_app.user_backend.NoSuchUserError:
return
else:
raise ValidationError('Dieser Benutzername ist schon vergeben')
@@ -74,19 +76,19 @@ class LostPasswordForm(Form):
def validate_username_or_mail(self, field):
if '@' not in field.data:
try:
- self.user = current_app.user_backend.get_by_uid(field.data)
- except current_app.user_backend.NoSuchUserError:
+ self.user = accounts_app.user_backend.get_by_uid(field.data)
+ except accounts_app.user_backend.NoSuchUserError:
raise ValidationError('Es gibt keinen Benutzer mit diesem Namen.')
else:
try:
- self.user = current_app.user_backend.get_by_mail(field.data)
- except current_app.user_backend.NoSuchUserError:
+ self.user = accounts_app.user_backend.get_by_mail(field.data)
+ except accounts_app.user_backend.NoSuchUserError:
raise ValidationError('Es gibt keinen Benutzer mit dieser Adresse.')
class SettingsMeta(FormMeta):
def __call__(cls, *args, **kwargs):
- for service in current_app.all_services:
+ for service in accounts_app.all_services:
setattr(cls, 'password_%s' % service.id, PasswordField(
'Passwort für %s' % service.name, [
validators.Optional(),
@@ -117,18 +119,18 @@ class SettingsForm(Form, metaclass=SettingsMeta):
raise ValidationError('Altes Passwort ist falsch.')
def validate_mail(self, field):
- results = current_app.user_backend.find_by_mail(field.data)
+ results = accounts_app.user_backend.find_by_mail(field.data)
for user in results:
if user.uid != current_user.uid:
raise ValidationError('Diese E-Mail-Adresse wird schon von einem anderen Account benutzt!')
- def get_servicepassword(self, service_id):
+ def get_servicepassword(self, service_id: str):
return getattr(self, 'password_%s' % service_id)
- def get_servicepasswordconfirm(self, service_id):
+ def get_servicepasswordconfirm(self, service_id: str):
return getattr(self, 'password_confirm_%s' % service_id)
- def get_servicedelete(self, service_id):
+ def get_servicedelete(self, service_id: str):
return getattr(self, 'delete_%s' % service_id)
@@ -138,6 +140,6 @@ class AdminDisableAccountForm(Form):
def validate_username(self, field):
try:
- self.user = current_app.user_backend.get_by_uid(field.data)
- except current_app.user_backend.NoSuchUserError:
+ self.user = accounts_app.user_backend.get_by_uid(field.data)
+ except accounts_app.user_backend.NoSuchUserError:
raise ValidationError('Dieser Benutzername existiert nicht')
diff --git a/accounts/models.py b/accounts/models.py
index 72c061d..fe6c500 100644
--- a/accounts/models.py
+++ b/accounts/models.py
@@ -1,8 +1,27 @@
from flask_login import UserMixin
+from typing import Any, Optional
+
from accounts.utils.login import create_userid
+class Service(object):
+ id: str
+ name: str
+ url: str
+
+ def __init__(self, service_id: str, name: str, url: str) -> None:
+ self.id = service_id
+ self.name = name
+ self.url = url
+
+ #: Wether the user has a separate password for this service.
+ self.changed = False
+
+ def __repr__(self):
+ return '<Service %s>' % self.id
+
+
class Account(UserMixin):
"""
An Account represents a complex ldap tree entry for spline users.
@@ -10,7 +29,15 @@ class Account(UserMixin):
"""
_ready = False
- def __init__(self, uid, mail, services=None, password=None, uidNumber=None):
+ uid: str
+ services: list[str]
+ password: Optional[str]
+ new_password_services: dict[str, tuple[Optional[str], Optional[str]]]
+ attributes: dict[Any, Any]
+ new_password_root: Optional[tuple[Optional[str], Optional[str]]]
+
+ def __init__(self, uid: str, mail: str, services=None,
+ password: Optional[str] = None, uidNumber=None):
self.uid = uid
self.services = list() if services is None else services
self.password = password
@@ -25,10 +52,11 @@ class Account(UserMixin):
def __repr__(self):
return "<Account uid=%s>" % self.uid
- def reset_password(self, service):
+ def reset_password(self, service: str):
self.new_password_services[service] = (None, None)
- def change_password(self, new_password, old_password='', service=None):
+ def change_password(self, new_password: str, old_password='',
+ service=None):
"""
Changes a password for a given service. You have to use the
UserBackend to make the changes permanent. If no service is given,
@@ -43,7 +71,7 @@ class Account(UserMixin):
def _set_attribute(self, key, value):
self.attributes[key] = value
- def change_email(self, new_mail):
+ def change_email(self, new_mail: str):
"""
Changes the mail address of an account. You have to use the
AccountService class to make changes permanent.
@@ -69,16 +97,3 @@ class Account(UserMixin):
the cookie and used to identify the user.
"""
return create_userid(self.uid, self.password)
-
-
-class Service(object):
- def __init__(self, service_id, name, url):
- self.id = service_id
- self.name = name
- self.url = url
-
- #: Wether the user has a separate password for this service.
- self.changed = False
-
- def __repr__(self):
- return '<Service %s>' % self.id
diff --git a/accounts/utils/__init__.py b/accounts/utils/__init__.py
index 8d49363..6adf317 100644
--- a/accounts/utils/__init__.py
+++ b/accounts/utils/__init__.py
@@ -1,12 +1,14 @@
# -*- coding: utf-8 -*-
import importlib
from functools import wraps
-from flask import render_template, request
+from flask import render_template, request, Flask
from wtforms.validators import Regexp, ValidationError
+from typing import Optional
+
# using http://flask.pocoo.org/docs/patterns/viewdecorators/
-def templated(template=None):
+def templated(template: Optional[str] = None):
def templated_(f):
@wraps(f)
def templated__(*args, **kwargs):
@@ -24,12 +26,6 @@ def templated(template=None):
return templated_
-def ensure_utf8(s):
- if isinstance(s, str):
- s = s.encode('utf8')
- return s
-
-
class NotRegexp(Regexp):
"""
Like wtforms.validators.Regexp, but rejects data that DOES match the regex.
@@ -42,7 +38,7 @@ class NotRegexp(Regexp):
raise ValidationError(self.message)
-def get_backend(path, app):
+def get_backend(path: str, app: Flask):
module = path.rsplit(".", 1).pop()
class_name = '%sBackend' % module.title()
backend_class = getattr(importlib.import_module(path), class_name)
diff --git a/accounts/utils/confirmation.py b/accounts/utils/confirmation.py
index 79ac7dc..b75716d 100644
--- a/accounts/utils/confirmation.py
+++ b/accounts/utils/confirmation.py
@@ -1,14 +1,14 @@
# -*- coding: utf-8 -*-
-from flask import current_app
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
from werkzeug.exceptions import Forbidden
+from accounts.app import accounts_app
class Confirmation(URLSafeTimedSerializer):
- def __init__(self, realm, key=None, **kwargs):
+ def __init__(self, realm: str, key=None, **kwargs):
if key is None:
- key = current_app.config['SECRET_KEY']
+ key = accounts_app.config['SECRET_KEY']
super(Confirmation, self).__init__(key, salt=realm, **kwargs)
def loads_http(self, s, max_age=None, return_timestamp=False, salt=None):
diff --git a/accounts/utils/console.py b/accounts/utils/console.py
index eddc25e..ef8b4fd 100644
--- a/accounts/utils/console.py
+++ b/accounts/utils/console.py
@@ -48,7 +48,6 @@ class TablePrinter(object):
in zip(list(zip(*rows)), self.widths)]
self._update_format_string()
-
def _update_format_string(self):
sep = ' %s ' % self.separator
self.format_string = '%s %s %s' % (
diff --git a/accounts/utils/login.py b/accounts/utils/login.py
index bd661b2..0cd1dc4 100644
--- a/accounts/utils/login.py
+++ b/accounts/utils/login.py
@@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
-from flask import current_app
from flask_login import LoginManager, current_user
from functools import wraps
from werkzeug.exceptions import Forbidden
from itsdangerous import base64_decode, base64_encode
import json
+import flask_login.login_manager
+from accounts.app import accounts_app
class _compact_json:
@@ -19,29 +20,29 @@ class _compact_json:
return json.dumps(obj, **kwargs)
-def create_login_manager():
+def create_login_manager() -> flask_login.login_manager.LoginManager:
login_manager = LoginManager()
login_manager.login_message = 'Bitte einloggen'
login_manager.login_view = 'login.login'
@login_manager.user_loader
- def load_user(user_id):
+ def load_user(user_id: str):
try:
username, password = parse_userid(user_id)
- return current_app.user_backend.auth(username, password)
- except (current_app.user_backend.NoSuchUserError,
- current_app.user_backend.InvalidPasswordError):
+ return accounts_app.user_backend.auth(username, password)
+ except (accounts_app.user_backend.NoSuchUserError,
+ accounts_app.user_backend.InvalidPasswordError):
return None
return login_manager
-def create_userid(username, password):
+def create_userid(username: str, password: str):
userid = (username, password)
return base64_encode(_compact_json.dumps(userid))
-def parse_userid(value):
+def parse_userid(value: str):
return _compact_json.loads(base64_decode(value))
diff --git a/accounts/utils/sessions.py b/accounts/utils/sessions.py
index 5d01b5d..e156f0c 100644
--- a/accounts/utils/sessions.py
+++ b/accounts/utils/sessions.py
@@ -3,9 +3,12 @@
from Crypto import Random
from Crypto.Cipher import AES
-from flask import current_app
+from flask import Flask
from flask.sessions import TaggedJSONSerializer, SecureCookieSessionInterface
from itsdangerous import BadPayload
+from accounts.app import accounts_app
+
+from typing import cast
def _pad(value, block_size):
@@ -25,7 +28,7 @@ class EncryptedSerializer(TaggedJSONSerializer):
self.block_size = AES.block_size
def _cipher(self, iv):
- key = current_app.config['SESSION_ENCRYPTION_KEY']
+ key = accounts_app.config['SESSION_ENCRYPTION_KEY']
assert len(key) == 32
return AES.new(key, AES.MODE_CBC, iv)
@@ -53,11 +56,12 @@ class EncryptedSerializer(TaggedJSONSerializer):
class EncryptedSessionInterface(SecureCookieSessionInterface):
serializer = EncryptedSerializer()
- def open_session(self, app, request):
+ def open_session(self, app: Flask, request):
session = None
try:
parent = super(EncryptedSessionInterface, self)
- session = parent.open_session(app, request)
+ session = cast(EncryptedSessionInterface, parent) \
+ .open_session(app, request)
except BadPayload:
session = self.session_class()
diff --git a/accounts/views/admin/__init__.py b/accounts/views/admin/__init__.py
index 35fda58..7378e38 100644
--- a/accounts/views/admin/__init__.py
+++ b/accounts/views/admin/__init__.py
@@ -2,13 +2,14 @@
from flask import Blueprint
-from flask import current_app, redirect, request, g, flash, url_for
+from flask import redirect, request, flash, url_for
from flask_login import current_user
from uuid import uuid4
from werkzeug.exceptions import Forbidden
from accounts.utils import templated
from accounts.forms import AdminCreateAccountForm, AdminDisableAccountForm
+from accounts.app import accounts_app
bp = Blueprint('admin', __name__)
@@ -17,8 +18,8 @@ bp = Blueprint('admin', __name__)
@bp.before_request
def restrict_bp_to_admins():
if not current_user.is_authenticated:
- return current_app.login_manager.unauthorized()
- if current_user.uid not in current_app.config.get('ADMIN_USERS', []):
+ return accounts_app.login_manager.unauthorized()
+ if current_user.uid not in accounts_app.config.get('ADMIN_USERS', []):
raise Forbidden('Du bist kein Admin.')
@@ -33,8 +34,8 @@ def index():
def create_account():
form = AdminCreateAccountForm()
if form.validate_on_submit():
- current_app.mail_backend.send(form.mail.data, 'mail/register.txt',
- username=form.username.data)
+ accounts_app.mail_backend.send(form.mail.data, 'mail/register.txt',
+ username=form.username.data)
flash('Mail versandt.', 'success')
return redirect(url_for('admin.index'))
@@ -45,7 +46,7 @@ def create_account():
@bp.route('/view_blacklist/<start>')
@templated('admin/view_blacklist.html')
def view_blacklist(start=''):
- entries = current_app.username_blacklist
+ entries = accounts_app.username_blacklist
if start:
entries = [e for e in entries if e.startswith(start)]
@@ -68,20 +69,20 @@ def disable_account():
if form.validate_on_submit():
random_pw = str(uuid4())
form.user.change_password(random_pw)
- for service in current_app.all_services:
+ for service in accounts_app.all_services:
form.user.reset_password(service.id)
oldmail = form.user.mail
- mail = current_app.config['DISABLED_ACCOUNT_MAILADDRESS_TEMPLATE'] % form.user.uid
+ mail = accounts_app.config['DISABLED_ACCOUNT_MAILADDRESS_TEMPLATE'] % form.user.uid
form.user.change_email(mail)
- current_app.user_backend.update(form.user, as_admin=True)
+ accounts_app.user_backend.update(form.user, as_admin=True)
flash('Passwort auf ein zufälliges und Mailadresse auf %s '
'gesetzt.' % mail, 'success')
- current_app.mail_backend.send(
- current_app.config['MAIL_REGISTER_NOTIFY'],
+ accounts_app.mail_backend.send(
+ accounts_app.config['MAIL_REGISTER_NOTIFY'],
'mail/disable_notify.txt',
username=form.user.uid, mail=oldmail, admin=current_user.uid)
diff --git a/accounts/views/default/__init__.py b/accounts/views/default/__init__.py
index 1854c46..0b7065d 100644
--- a/accounts/views/default/__init__.py
+++ b/accounts/views/default/__init__.py
@@ -3,10 +3,11 @@
from copy import deepcopy
from flask import Blueprint
-from flask import current_app, redirect, render_template, request, g, \
+from flask import redirect, render_template, request, \
flash, url_for
-from flask_login import login_required, login_user, logout_user, current_user
+from flask_login import login_required, login_user, current_user
from werkzeug.exceptions import Forbidden
+from werkzeug import Response
from accounts.forms import RegisterForm, RegisterCompleteForm, \
LostPasswordForm, SettingsForm
@@ -14,6 +15,9 @@ from accounts.utils import templated
from accounts.utils.confirmation import Confirmation
from accounts.utils.login import logout_required
from accounts.models import Account
+from accounts.app import accounts_app
+
+from typing import Union
bp = Blueprint('default', __name__)
@@ -22,11 +26,11 @@ bp = Blueprint('default', __name__)
@bp.route('/register', methods=['GET', 'POST'])
@templated('register.html')
@logout_required
-def register():
+def register() -> Union[dict, Response]:
form = RegisterForm()
if form.validate_on_submit():
- current_app.mail_backend.send(form.mail.data, 'mail/register.txt',
- username=form.username.data)
+ accounts_app.mail_backend.send(form.mail.data, 'mail/register.txt',
+ username=form.username.data)
flash('Es wurde eine E-Mail an die angegebene Adresse geschickt, '
'um diese zu überprüfen. Bitte folge den Anweisungen in der '
@@ -40,14 +44,14 @@ def register():
@bp.route('/register/<token>', methods=['GET', 'POST'])
@templated('register_complete.html')
@logout_required
-def register_complete(token):
+def register_complete(token: str):
#TODO: check for double uids and mail
username, mail = Confirmation('register').loads_http(token, max_age=3*24*60*60)
try:
- current_app.user_backend.get_by_uid(username)
- current_app.user_backend.get_by_mail(mail)
- except current_app.user_backend.NoSuchUserError:
+ accounts_app.user_backend.get_by_uid(username)
+ accounts_app.user_backend.get_by_mail(mail)
+ except accounts_app.user_backend.NoSuchUserError:
pass
else:
flash('Du hast den Benutzer bereits angelegt! Du kannst dich jetzt einfach einloggen:')
@@ -56,11 +60,11 @@ def register_complete(token):
form = RegisterCompleteForm()
if form.validate_on_submit():
user = Account(username, mail, password=form.password.data)
- current_app.user_backend.register(user)
+ accounts_app.user_backend.register(user)
login_user(user)
- current_app.mail_backend.send(
- current_app.config['MAIL_REGISTER_NOTIFY'],
+ accounts_app.mail_backend.send(
+ accounts_app.config['MAIL_REGISTER_NOTIFY'],
'mail/register_notify.txt',
username=username, mail=mail)
@@ -83,7 +87,7 @@ def lost_password():
if form.validate_on_submit():
#TODO: make the link only usable once (e.g include a hash of the old pw)
# atm the only thing we do is make the link valid for only little time
- current_app.mail_backend.send(
+ accounts_app.mail_backend.send(
form.user.mail, 'mail/lost_password.txt', username=form.user.uid)
flash('Wir haben dir eine E-Mail mit einem Link zum Passwort ändern '
@@ -97,14 +101,14 @@ def lost_password():
@bp.route('/lost_password/<token>', methods=['GET', 'POST'])
@templated('lost_password_complete.html')
@logout_required
-def lost_password_complete(token):
+def lost_password_complete(token: str):
(username,) = Confirmation('lost_password').loads_http(token, max_age=4*60*60)
form = RegisterCompleteForm()
if form.validate_on_submit():
- user = current_app.user_backend.get_by_uid(username)
+ user = accounts_app.user_backend.get_by_uid(username)
user.change_password(form.password.data)
- current_app.user_backend.update(user, as_admin=True)
+ accounts_app.user_backend.update(user, as_admin=True)
login_user(user)
flash('Passwort geändert.', 'success')
@@ -120,13 +124,13 @@ def lost_password_complete(token):
@bp.route('/', methods=['GET', 'POST'])
@templated('index.html')
@login_required
-def index():
+def index() -> Union[Response, dict]:
form = SettingsForm(mail=current_user.mail)
if form.validate_on_submit():
changed = False
if request.form.get('submit_services'):
- for service in current_app.all_services:
+ for service in accounts_app.all_services:
field = form.get_servicedelete(service.id)
if field.data:
current_user.reset_password(service.id)
@@ -134,7 +138,7 @@ def index():
elif request.form.get('submit_main'):
if form.mail.data and form.mail.data != current_user.mail:
- current_app.mail_backend.send(
+ accounts_app.mail_backend.send(
form.mail.data, 'mail/change_mail.txt',
username=current_user.uid)
@@ -148,21 +152,21 @@ def index():
flash('Passwort geändert', 'success')
changed = True
- for service in current_app.all_services:
+ for service in accounts_app.all_services:
field = form.get_servicepassword(service.id)
if field.data:
changed = True
current_user.change_password(field.data, None, service.id)
if changed:
- current_app.user_backend.update(current_user)
+ accounts_app.user_backend.update(current_user)
login_user(current_user)
return redirect(url_for('.index'))
else:
flash('Nichts geändert.')
- services = deepcopy(current_app.all_services)
+ services = deepcopy(accounts_app.all_services)
for s in services:
s.changed = s.id in current_user.services
@@ -174,19 +178,19 @@ def index():
@bp.route('/change_mail/<token>')
@login_required
-def change_mail(token):
+def change_mail(token: str):
username, mail = Confirmation('change_mail').loads_http(token, max_age=3*24*60*60)
if current_user.uid != username:
raise Forbidden('Bitte logge dich als der Benutzer ein, dessen E-Mail-Adresse du ändern willst.')
- results = current_app.user_backend.find_by_mail(mail)
+ results = accounts_app.user_backend.find_by_mail(mail)
for user in results:
if user.uid != current_user.uid:
raise Forbidden('Diese E-Mail-Adresse wird schon von einem anderen account benutzt!')
current_user.change_email(mail)
- current_app.user_backend.update(current_user)
+ accounts_app.user_backend.update(current_user)
flash('E-Mail-Adresse geändert.', 'success')
return redirect(url_for('.index'))
@@ -196,7 +200,7 @@ def change_mail(token):
@templated('about.html')
def about():
return {
- 'app': current_app,
+ 'app': accounts_app,
}
diff --git a/accounts/views/login/__init__.py b/accounts/views/login/__init__.py
index 730b3ed..ee049bf 100644
--- a/accounts/views/login/__init__.py
+++ b/accounts/views/login/__init__.py
@@ -2,17 +2,21 @@
from flask import Blueprint
-from flask import current_app, redirect, request, g, flash, render_template, url_for
+from flask import redirect, request, flash, render_template, url_for
from flask_login import login_user, logout_user, current_user
from urllib.parse import urljoin, urlparse
+from werkzeug import Response
-from .forms import LoginForm
+from accounts.app import accounts_app
+
+from typing import Union
+from .forms import LoginForm
bp = Blueprint('login', __name__)
-def is_safe_url(target):
+def is_safe_url(target: str):
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
print(target)
@@ -23,24 +27,22 @@ def is_safe_url(target):
@bp.route('/login', methods=['GET', 'POST'])
-def login():
+def login() -> Union[str, Response]:
if current_user.is_authenticated:
return redirect(url_for('default.index'))
form = LoginForm(request.form)
if form.validate_on_submit():
try:
- user = current_app.user_backend.auth(form.username.data,
- form.password.data)
+ user = accounts_app.user_backend.auth(form.username.data,
+ form.password.data)
login_user(user)
flash('Erfolgreich eingeloggt', 'success')
next = request.form['next']
- if not is_safe_url(next):
- next = None
- return redirect(next or url_for('default.index'))
- except (current_app.user_backend.NoSuchUserError,
- current_app.user_backend.InvalidPasswordError):
+ return redirect(next if is_safe_url(next) else url_for('default.index'))
+ except (accounts_app.user_backend.NoSuchUserError,
+ accounts_app.user_backend.InvalidPasswordError):
flash('Ungültiger Benutzername und/oder Passwort', 'error')
return render_template("login/login.html", form=form,
@@ -48,7 +50,7 @@ def login():
@bp.route('/logout')
-def logout():
+def logout() -> Response:
logout_user()
flash('Erfolgreich ausgeloggt.', 'success')
return redirect(url_for('.login'))
diff --git a/mypy.ini b/mypy.ini
new file mode 100644
index 0000000..95a93c1
--- /dev/null
+++ b/mypy.ini
@@ -0,0 +1,11 @@
+[mypy-flask_login.*]
+ignore_missing_imports = True
+
+[mypy-wtforms.*]
+ignore_missing_imports = True
+
+[mypy-flask_wtf.*]
+ignore_missing_imports = True
+
+[mypy-flask_script.*]
+ignore_missing_imports = True