From a3f0c006b5fb5beab1704aad56777dcd98c42efb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonah=20Br=C3=BCchert?= Date: Thu, 28 Mar 2024 06:22:55 +0100 Subject: Add some type annotations --- accounts/__init__.py | 13 +++++---- accounts/app.py | 16 +++++++++++ accounts/backend/mail/__init__.py | 17 ++++++++---- accounts/backend/mail/dummy.py | 24 ++++++++++------ accounts/backend/mail/sendmail.py | 17 +++++++----- accounts/backend/user/__init__.py | 23 +++++++++------ accounts/backend/user/dummy.py | 29 +++++++++++-------- accounts/backend/user/ldap.py | 57 ++++++++++++++++++++++---------------- accounts/forms.py | 46 +++++++++++++++--------------- accounts/models.py | 49 ++++++++++++++++++++------------ accounts/utils/__init__.py | 14 ++++------ accounts/utils/confirmation.py | 6 ++-- accounts/utils/console.py | 1 - accounts/utils/login.py | 17 ++++++------ accounts/utils/sessions.py | 12 +++++--- accounts/views/admin/__init__.py | 23 +++++++-------- accounts/views/default/__init__.py | 56 ++++++++++++++++++++----------------- accounts/views/login/__init__.py | 26 +++++++++-------- mypy.ini | 11 ++++++++ 19 files changed, 273 insertions(+), 184 deletions(-) create mode 100644 accounts/app.py create mode 100644 mypy.ini diff --git a/accounts/__init__.py b/accounts/__init__.py index 48cd252..c9310fe 100644 --- a/accounts/__init__.py +++ b/accounts/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- import os -from flask import Flask, g, session +from flask import Flask from .models import Service from .utils import get_backend @@ -8,9 +8,12 @@ from .utils.confirmation import Confirmation from .utils.sessions import EncryptedSessionInterface from .utils.login import create_login_manager from .views import default, login, admin +from .app import AccountsFlask +from typing import Optional -def absolute_paths(app, config): + +def absolute_paths(app: Flask, config: str) -> None: def handle_option(dirname, name): if app.config.get(name): app.config[name] = os.path.join(dirname, app.config[name]) @@ -19,15 +22,15 @@ def absolute_paths(app, config): handle_option(dirname, 'USERNAME_BLACKLIST_FILE') -def load_config(app, configfile): +def load_config(app: Flask, configfile: Optional[str]) -> None: if configfile is not None: filename = os.path.abspath(configfile) app.config.from_pyfile(filename) absolute_paths(app, filename) -def create_app(config=None): - app = Flask(__name__) +def create_app(config=None) -> Flask: + app = AccountsFlask(__name__) app.config.from_object('accounts.default_settings') load_config(app, os.environ.get('SPLINE_ACCOUNT_WEB_SETTINGS')) load_config(app, config) diff --git a/accounts/app.py b/accounts/app.py new file mode 100644 index 0000000..eaab824 --- /dev/null +++ b/accounts/app.py @@ -0,0 +1,16 @@ +from flask import Flask, current_app +from typing import TYPE_CHECKING, cast + +if TYPE_CHECKING: + from .backend import user, mail + from .models import Service + + +class AccountsFlask(Flask): + all_services: "list[Service]" + username_blacklist: list[str] + user_backend: "user.Backend" + mail_backend: "mail.Backend" + + +accounts_app: AccountsFlask = cast(AccountsFlask, current_app) diff --git a/accounts/backend/mail/__init__.py b/accounts/backend/mail/__init__.py index ecd0d45..f2ec5d5 100644 --- a/accounts/backend/mail/__init__.py +++ b/accounts/backend/mail/__init__.py @@ -1,18 +1,25 @@ # -*- coding: utf-8 -*- -class Backend(object): +from flask.app import Flask +from jinja2 import Template +from jinja2.environment import TemplateModule - def __init__(self, app): + +class Backend(): + app: Flask + + def __init__(self, app: Flask) -> None: self.app = app - def _send(self, recipient, content): + def _send(self, recipient: str, content: TemplateModule): raise NotImplementedError() - def send(self, recipient, template, **kwargs): + def send(self, recipient: str, template, **kwargs): if recipient is None: return - tmpl = self.app.jinja_env.get_or_select_template(template) + tmpl: Template = \ + self.app.jinja_env.get_or_select_template(template) kwargs['recipient'] = recipient module = tmpl.make_module(kwargs) diff --git a/accounts/backend/mail/dummy.py b/accounts/backend/mail/dummy.py index 61e7b75..3de3dbe 100644 --- a/accounts/backend/mail/dummy.py +++ b/accounts/backend/mail/dummy.py @@ -2,7 +2,8 @@ from . import Backend -from accounts.utils import ensure_utf8 +from accounts import AccountsFlask +from jinja2.environment import TemplateModule FANCY_FORMAT = '''\ @@ -23,8 +24,10 @@ From: {sender} class DummyBackend(Backend): + format: str - def __init__(self, app, plain=False, format=None): + def __init__(self, app: AccountsFlask, plain: bool = False, + format: None = None) -> None: super(DummyBackend, self).__init__(app) self.plain = plain @@ -36,13 +39,16 @@ class DummyBackend(Backend): else: self.format = format - def _send(self, recipient, content): - body = content.body() + def _send(self, recipient: str, content: TemplateModule): + print(type(content)) + + # we can't typecheck things defined in templates + body = content.body() # type: ignore if not self.plain: body = "\n| ".join(body.split("\n")) - print((self.format.format( - subject=ensure_utf8(content.subject()), - sender=ensure_utf8(content.sender()), - to=ensure_utf8(recipient), - body=ensure_utf8(body)))) + print(self.format.format( + subject=content.subject(), # type: ignore + sender=content.sender(), # type: ignore + to=recipient, + body=body)) diff --git a/accounts/backend/mail/sendmail.py b/accounts/backend/mail/sendmail.py index 5ac5d93..1fecedc 100644 --- a/accounts/backend/mail/sendmail.py +++ b/accounts/backend/mail/sendmail.py @@ -4,27 +4,30 @@ import subprocess from email.mime.text import MIMEText from email.utils import parseaddr +from jinja2.environment import TemplateModule from . import Backend -class SendmailBackend(Backend): +def safe(s: str): + return s.split('\n', 1)[0] - def _send(self, recipient, content): - safe = lambda s: s.split('\n', 1)[0] - msg = MIMEText(content.body(), _charset='utf-8') - msg['Subject'] = safe(content.subject()) +class SendmailBackend(Backend): + def _send(self, recipient: str, content: TemplateModule): + msg = MIMEText(content.body(), _charset='utf-8') # type: ignore + msg['Subject'] = safe(content.subject()) # type: ignore msg['To'] = safe(recipient) - msg['From'] = safe(content.sender()) + msg['From'] = safe(content.sender()) # type: ignore envelope = [] - _, address = parseaddr(safe(content.sender())) + _, address = parseaddr(safe(content.sender())) # type: ignore if address != '': envelope = ['-f', address] p = subprocess.Popen([self.app.config['SENDMAIL_COMMAND']] + envelope + ['-t'], stdin=subprocess.PIPE) + assert p.stdin p.stdin.write(msg.as_string().encode("utf-8")) p.stdin.close() diff --git a/accounts/backend/user/__init__.py b/accounts/backend/user/__init__.py index e72302a..1504e41 100644 --- a/accounts/backend/user/__init__.py +++ b/accounts/backend/user/__init__.py @@ -1,5 +1,8 @@ # -*- coding: utf-8 -*- +from accounts.app import AccountsFlask +from accounts.models import Account + class NoSuchUserError(ValueError): pass @@ -49,7 +52,9 @@ class Backend(object): >> backend.find_by_uid('test*', wildcard=True) # find with wildcards """ - def __init__(self, app): + app: AccountsFlask + + def __init__(self, app: AccountsFlask) -> None: self.app = app #: Exception type, that is raised if no matching user was found. @@ -60,14 +65,14 @@ class Backend(object): #: could also be raised, if you want to change user information. self.InvalidPasswordError = InvalidPasswordError - def auth(self, username, password): + def auth(self, username: str, password: str): """ Tries to authenticate a user with a given password. If the authentication is successful an Account object will be returned. """ raise NotImplementedError() - def get_by_uid(self, uid): + def get_by_uid(self, uid: str) -> Account: """ Find a single user by uid. Unlike find_by_uid, don't return a list but raise NoSuchUserError if there is no such user. @@ -80,7 +85,7 @@ class Backend(object): return users[0] - def get_by_mail(self, mail): + def get_by_mail(self, mail: str) -> Account: """ Find a single user by mail. Unlike find_by_mail, don't return a list but raise NoSuchUserError if there is no such user. @@ -93,19 +98,19 @@ class Backend(object): return users[0] - def find_by_uid(self, uid, wildcard=False): + def find_by_uid(self, uid: str, wildcard=False) -> list[Account]: return self.find({'uid': uid}, wildcard) - def find_by_mail(self, mail, wildcard=False): + def find_by_mail(self, mail: str, wildcard=False) -> list[Account]: return self.find({'mail': mail}, wildcard) - def find(self, filters=None, wildcard=False): + def find(self, filters=None, wildcard=False) -> list[Account]: """ Find accounts by a given filter. """ raise NotImplementedError() - def register(self, account): + def register(self, account: Account): """ Register a new user account. @@ -118,7 +123,7 @@ class Backend(object): account.uidNumber = self._get_next_uidNumber() self._store(account) - def update(self, account, as_admin=False): + def update(self, account: Account, as_admin=False): """ Updates account information like passwords or email. """ diff --git a/accounts/backend/user/dummy.py b/accounts/backend/user/dummy.py index 8c54dac..3d0dcca 100644 --- a/accounts/backend/user/dummy.py +++ b/accounts/backend/user/dummy.py @@ -2,9 +2,13 @@ from fnmatch import fnmatch from . import Backend from accounts.models import Account +from accounts import AccountsFlask +from typing import Optional -def _match_filter(account, filters, wildcard): + +def _match_filter(account: Account, filters: Optional[dict[str, str]], + wildcard: bool): if filters is None: return True @@ -30,7 +34,7 @@ class DummyBackend(Backend): users (test and test2) are created. """ - def __init__(self, app): + def __init__(self, app: AccountsFlask) -> None: super(DummyBackend, self).__init__(app) self._storage = { @@ -65,7 +69,7 @@ class DummyBackend(Backend): ) return accounts - def auth(self, username, password): + def auth(self, username: str, password: str): """ Tries to authenticate a user with a given password. If the authentication is successful an Account object will be returned. @@ -76,30 +80,31 @@ class DummyBackend(Backend): acc.password = password return acc - def find(self, filters=None, wildcard=False): + def find(self, filters: Optional[dict[str, str]] = None, + wildcard=False) -> list[Account]: """ Find accounts by a given filter. """ return [acc for acc in self._get_accounts() if _match_filter(acc, filters, wildcard)] - def _store(self, account): + def _store(self, account: Account) -> None: self._storage[account.uid] = { "uidNumber": account.uidNumber, "mail": account.mail, "password": account.password } - def _verify_password(self, account, password): + def _verify_password(self, account: Account, password: Optional[str]): return password == self._storage[account.uid]["password"] - def _alter_password(self, account, password): + def _alter_password(self, account: Account, password: Optional[str]): self._storage[account.uid]["password"] = password - def update(self, account, as_admin=False): + def update(self, account: Account, as_admin=False): """ Updates account information like passwords or email. """ - stored_account = self.get_by_uid(account.uid) + stored_account: Account = self.get_by_uid(account.uid) if not as_admin: if not self._verify_password(stored_account, account.password): raise self.InvalidPasswordError("Invalid password") @@ -109,16 +114,16 @@ class DummyBackend(Backend): if self._verify_password(stored_account, old): self._alter_password(stored_account, new) - def delete(self, account, as_admin=False): + def delete(self, account: Account, as_admin=False): """ Deletes an account permanently. """ - stored_account = self.get_by_uid(account.uid) + stored_account: Account = self.get_by_uid(account.uid) if not as_admin: if stored_account.password != account.password: raise self.InvalidPasswordError("Invalid password") - self._storage = [acc for acc in self._storage if acc.uid != account.uid] + self._storage.pop(stored_account.uid) def _get_next_uidNumber(self): value = self._next_uidNumber diff --git a/accounts/backend/user/ldap.py b/accounts/backend/user/ldap.py index 64e29d4..217dcba 100644 --- a/accounts/backend/user/ldap.py +++ b/accounts/backend/user/ldap.py @@ -6,12 +6,17 @@ from ldap3.utils.conv import escape_filter_chars from ldap3.utils.dn import escape_rdn from ldap3.core.exceptions import LDAPInvalidCredentialsResult, LDAPException, LDAPOperationResult from ldap3.core.results import RESULT_SUCCESS +from ldap3.abstract.entry import Entry +from ldap3 import Connection from . import Backend, InvalidPasswordError, NoSuchUserError, ShouldNotHappen -from accounts.models import Account +from accounts.models import Account, Service +from accounts import AccountsFlask +from typing import Optional -def _escape(value, wildcard=False): + +def _escape(value: str, wildcard=False): if not isinstance(value, str): value = str(value) if not wildcard: @@ -19,7 +24,9 @@ def _escape(value, wildcard=False): return escape_rdn(value) -def _change_password(conn, dn, passwords, as_admin=False): +def _change_password(conn: Connection, dn, + passwords: tuple[Optional[str], Optional[str]], + as_admin=False): old_password, new_password = passwords if as_admin: conn.extend.standard.modify_password(dn, None, new_password) @@ -32,26 +39,25 @@ def _change_password(conn, dn, passwords, as_admin=False): class LdapBackend(Backend): - - def __init__(self, app): + def __init__(self, app: AccountsFlask) -> None: super(LdapBackend, self).__init__(app) - self.host = self.app.config['LDAP_HOST'] - self.base_dn = self.app.config['LDAP_BASE_DN'] - self.admin_user = self.app.config['LDAP_ADMIN_USER'] - self.admin_pass = self.app.config['LDAP_ADMIN_PASS'] - self.services = self.app.all_services + self.host: str = self.app.config['LDAP_HOST'] + self.base_dn: list[tuple[str, str]] = self.app.config['LDAP_BASE_DN'] + self.admin_user: str = self.app.config['LDAP_ADMIN_USER'] + self.admin_pass: str = self.app.config['LDAP_ADMIN_PASS'] + self.services: list[Service] = self.app.all_services self.admin = False self.binded = False - def auth(self, username, password): + def auth(self, username: str, password: str): """ Tries to authenticate a user with a given password. If the authentication is successful an Account object will be returned. """ - user_dn = self._format_dn([('uid', username), ('ou', 'users')]) - conn = self._connect(user_dn, password) + user_dn: str = self._format_dn([('uid', username), ('ou', 'users')]) + conn: Connection = self._connect(user_dn, password) uid = None mail = None @@ -59,6 +65,7 @@ class LdapBackend(Backend): services = [] conn.search(user_dn, '(objectClass=*)', attributes=['objectClass', 'uid', 'mail', 'cn', 'uidNumber']) + entry: Entry for entry in conn.entries: if 'splineAccount' in entry.objectClass.values: uid = entry.uid.value @@ -73,7 +80,7 @@ class LdapBackend(Backend): return Account(uid, mail, services, password, uidNumber=uidNumber) - def find(self, filters=None, wildcard=False): + def find(self, filters: Optional[dict[str, str]] = None, wildcard=False): """ Find accounts by a given filter. """ @@ -88,7 +95,7 @@ class LdapBackend(Backend): conn = self._connect() base_dn = self._format_dn([('ou', 'users')]) - accounts = [] + accounts: list[Account] = [] try: conn.search(base_dn, filterstr, search_scope=ldap3.LEVEL, attributes=['uid', 'mail', 'uidNumber']) @@ -100,7 +107,7 @@ class LdapBackend(Backend): return accounts - def _store(self, account): + def _store(self, account: Account): conn = self._connect_as_admin() user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')]) @@ -117,7 +124,7 @@ class LdapBackend(Backend): account.new_password_root = (None, account.password) self._alter_passwords(conn, account) - def update(self, account, as_admin=False): + def update(self, account: Account, as_admin=False): """ Updates account informations like passwords or email. """ @@ -133,7 +140,7 @@ class LdapBackend(Backend): conn.modify(user_dn, attrs) self._alter_passwords(conn, account, as_admin=as_admin) - def delete(self, account, as_admin=False): + def delete(self, account: Account, as_admin=False): """ Deletes an account permanently. """ @@ -143,14 +150,14 @@ class LdapBackend(Backend): else: conn = self._connect(account.uid, account.password) - dns = [[('cn', service.id), ('uid', account.uid), ('ou', 'users')] + dns = [[('cn', service), ('uid', account.uid), ('ou', 'users')] for service in account.services] + \ [[('uid', account.uid), ('ou', 'users')]] for dn in dns: conn.delete(self._format_dn(dn)) - def _format_dn(self, attr, with_base_dn=True): + def _format_dn(self, attr: list[tuple[str, str]], with_base_dn=True) -> str: if with_base_dn: attr.extend(self.base_dn) @@ -159,7 +166,8 @@ class LdapBackend(Backend): return ','.join(dn) - def _connect(self, user=None, password=None): + def _connect(self, user: Optional[str] = None, + password: Optional[str] = None): server = ldap3.Server(self.host) conn = ldap3.Connection(server, user, password, raise_exceptions=True) @@ -174,7 +182,8 @@ class LdapBackend(Backend): admin_dn = self._format_dn([('cn', self.admin_user)]) return self._connect(admin_dn, self.admin_pass) - def _alter_passwords(self, conn, account, as_admin=False): + def _alter_passwords(self, conn: Connection, account: Account, + as_admin=False): if account.new_password_root: user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')]) _change_password(conn, user_dn, account.new_password_root, as_admin) @@ -187,7 +196,7 @@ class LdapBackend(Backend): ('ou', 'users')]) _, new = passwords - if new != None: + if new is not None: if service_id not in account.services: attrs = { 'objectClass': ['top', 'servicePassword'], @@ -204,7 +213,7 @@ class LdapBackend(Backend): del account.new_password_services[service] - def _get_last_uidNumber(self, conn): + def _get_last_uidNumber(self, conn: Connection): uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')]) conn.search(uidNumber_dn, '(objectClass=uidNumberMaximum)', attributes=['uidNumber']) diff --git a/accounts/forms.py b/accounts/forms.py index ad48ba9..a85f382 100644 --- a/accounts/forms.py +++ b/accounts/forms.py @@ -1,12 +1,13 @@ # -*- coding: utf-8 -*- import re -from flask import current_app, Markup, url_for +from flask import Markup, url_for from flask_wtf import FlaskForm as Form from flask_login import current_user -from wtforms import StringField, PasswordField, ValidationError, BooleanField,\ - validators +from wtforms import StringField, PasswordField, ValidationError, \ + BooleanField, validators from wtforms.form import FormMeta from .utils import NotRegexp +from accounts.app import accounts_app USERNAME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9-]{1,15}$') @@ -25,10 +26,10 @@ class RegisterForm(Form): def validate_username(self, field): try: - current_app.user_backend.get_by_uid(field.data) - except current_app.user_backend.NoSuchUserError: - if current_app.username_blacklist: - if field.data.lower() in current_app.username_blacklist: + accounts_app.user_backend.get_by_uid(field.data) + except accounts_app.user_backend.NoSuchUserError: + if accounts_app.username_blacklist: + if field.data.lower() in accounts_app.username_blacklist: raise ValidationError(Markup( 'Dieser Benutzername ist momentan nicht erlaubt. ' 'Weitere Informationen' @@ -38,8 +39,8 @@ class RegisterForm(Form): def validate_mail(self, field): try: - current_app.user_backend.get_by_mail(field.data) - except current_app.user_backend.NoSuchUserError: + accounts_app.user_backend.get_by_mail(field.data) + except accounts_app.user_backend.NoSuchUserError: pass else: raise ValidationError(Markup( @@ -48,11 +49,12 @@ class RegisterForm(Form): 'die Passwort-vergessen-Funktion benutzen.' % url_for('default.lost_password'))) + class AdminCreateAccountForm(RegisterForm): def validate_username(self, field): try: - current_app.user_backend.get_by_uid(field.data) - except current_app.user_backend.NoSuchUserError: + accounts_app.user_backend.get_by_uid(field.data) + except accounts_app.user_backend.NoSuchUserError: return else: raise ValidationError('Dieser Benutzername ist schon vergeben') @@ -74,19 +76,19 @@ class LostPasswordForm(Form): def validate_username_or_mail(self, field): if '@' not in field.data: try: - self.user = current_app.user_backend.get_by_uid(field.data) - except current_app.user_backend.NoSuchUserError: + self.user = accounts_app.user_backend.get_by_uid(field.data) + except accounts_app.user_backend.NoSuchUserError: raise ValidationError('Es gibt keinen Benutzer mit diesem Namen.') else: try: - self.user = current_app.user_backend.get_by_mail(field.data) - except current_app.user_backend.NoSuchUserError: + self.user = accounts_app.user_backend.get_by_mail(field.data) + except accounts_app.user_backend.NoSuchUserError: raise ValidationError('Es gibt keinen Benutzer mit dieser Adresse.') class SettingsMeta(FormMeta): def __call__(cls, *args, **kwargs): - for service in current_app.all_services: + for service in accounts_app.all_services: setattr(cls, 'password_%s' % service.id, PasswordField( 'Passwort für %s' % service.name, [ validators.Optional(), @@ -117,18 +119,18 @@ class SettingsForm(Form, metaclass=SettingsMeta): raise ValidationError('Altes Passwort ist falsch.') def validate_mail(self, field): - results = current_app.user_backend.find_by_mail(field.data) + results = accounts_app.user_backend.find_by_mail(field.data) for user in results: if user.uid != current_user.uid: raise ValidationError('Diese E-Mail-Adresse wird schon von einem anderen Account benutzt!') - def get_servicepassword(self, service_id): + def get_servicepassword(self, service_id: str): return getattr(self, 'password_%s' % service_id) - def get_servicepasswordconfirm(self, service_id): + def get_servicepasswordconfirm(self, service_id: str): return getattr(self, 'password_confirm_%s' % service_id) - def get_servicedelete(self, service_id): + def get_servicedelete(self, service_id: str): return getattr(self, 'delete_%s' % service_id) @@ -138,6 +140,6 @@ class AdminDisableAccountForm(Form): def validate_username(self, field): try: - self.user = current_app.user_backend.get_by_uid(field.data) - except current_app.user_backend.NoSuchUserError: + self.user = accounts_app.user_backend.get_by_uid(field.data) + except accounts_app.user_backend.NoSuchUserError: raise ValidationError('Dieser Benutzername existiert nicht') diff --git a/accounts/models.py b/accounts/models.py index 72c061d..fe6c500 100644 --- a/accounts/models.py +++ b/accounts/models.py @@ -1,8 +1,27 @@ from flask_login import UserMixin +from typing import Any, Optional + from accounts.utils.login import create_userid +class Service(object): + id: str + name: str + url: str + + def __init__(self, service_id: str, name: str, url: str) -> None: + self.id = service_id + self.name = name + self.url = url + + #: Wether the user has a separate password for this service. + self.changed = False + + def __repr__(self): + return '' % self.id + + class Account(UserMixin): """ An Account represents a complex ldap tree entry for spline users. @@ -10,7 +29,15 @@ class Account(UserMixin): """ _ready = False - def __init__(self, uid, mail, services=None, password=None, uidNumber=None): + uid: str + services: list[str] + password: Optional[str] + new_password_services: dict[str, tuple[Optional[str], Optional[str]]] + attributes: dict[Any, Any] + new_password_root: Optional[tuple[Optional[str], Optional[str]]] + + def __init__(self, uid: str, mail: str, services=None, + password: Optional[str] = None, uidNumber=None): self.uid = uid self.services = list() if services is None else services self.password = password @@ -25,10 +52,11 @@ class Account(UserMixin): def __repr__(self): return "" % self.uid - def reset_password(self, service): + def reset_password(self, service: str): self.new_password_services[service] = (None, None) - def change_password(self, new_password, old_password='', service=None): + def change_password(self, new_password: str, old_password='', + service=None): """ Changes a password for a given service. You have to use the UserBackend to make the changes permanent. If no service is given, @@ -43,7 +71,7 @@ class Account(UserMixin): def _set_attribute(self, key, value): self.attributes[key] = value - def change_email(self, new_mail): + def change_email(self, new_mail: str): """ Changes the mail address of an account. You have to use the AccountService class to make changes permanent. @@ -69,16 +97,3 @@ class Account(UserMixin): the cookie and used to identify the user. """ return create_userid(self.uid, self.password) - - -class Service(object): - def __init__(self, service_id, name, url): - self.id = service_id - self.name = name - self.url = url - - #: Wether the user has a separate password for this service. - self.changed = False - - def __repr__(self): - return '' % self.id diff --git a/accounts/utils/__init__.py b/accounts/utils/__init__.py index 8d49363..6adf317 100644 --- a/accounts/utils/__init__.py +++ b/accounts/utils/__init__.py @@ -1,12 +1,14 @@ # -*- coding: utf-8 -*- import importlib from functools import wraps -from flask import render_template, request +from flask import render_template, request, Flask from wtforms.validators import Regexp, ValidationError +from typing import Optional + # using http://flask.pocoo.org/docs/patterns/viewdecorators/ -def templated(template=None): +def templated(template: Optional[str] = None): def templated_(f): @wraps(f) def templated__(*args, **kwargs): @@ -24,12 +26,6 @@ def templated(template=None): return templated_ -def ensure_utf8(s): - if isinstance(s, str): - s = s.encode('utf8') - return s - - class NotRegexp(Regexp): """ Like wtforms.validators.Regexp, but rejects data that DOES match the regex. @@ -42,7 +38,7 @@ class NotRegexp(Regexp): raise ValidationError(self.message) -def get_backend(path, app): +def get_backend(path: str, app: Flask): module = path.rsplit(".", 1).pop() class_name = '%sBackend' % module.title() backend_class = getattr(importlib.import_module(path), class_name) diff --git a/accounts/utils/confirmation.py b/accounts/utils/confirmation.py index 79ac7dc..b75716d 100644 --- a/accounts/utils/confirmation.py +++ b/accounts/utils/confirmation.py @@ -1,14 +1,14 @@ # -*- coding: utf-8 -*- -from flask import current_app from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer from werkzeug.exceptions import Forbidden +from accounts.app import accounts_app class Confirmation(URLSafeTimedSerializer): - def __init__(self, realm, key=None, **kwargs): + def __init__(self, realm: str, key=None, **kwargs): if key is None: - key = current_app.config['SECRET_KEY'] + key = accounts_app.config['SECRET_KEY'] super(Confirmation, self).__init__(key, salt=realm, **kwargs) def loads_http(self, s, max_age=None, return_timestamp=False, salt=None): diff --git a/accounts/utils/console.py b/accounts/utils/console.py index eddc25e..ef8b4fd 100644 --- a/accounts/utils/console.py +++ b/accounts/utils/console.py @@ -48,7 +48,6 @@ class TablePrinter(object): in zip(list(zip(*rows)), self.widths)] self._update_format_string() - def _update_format_string(self): sep = ' %s ' % self.separator self.format_string = '%s %s %s' % ( diff --git a/accounts/utils/login.py b/accounts/utils/login.py index bd661b2..0cd1dc4 100644 --- a/accounts/utils/login.py +++ b/accounts/utils/login.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- -from flask import current_app from flask_login import LoginManager, current_user from functools import wraps from werkzeug.exceptions import Forbidden from itsdangerous import base64_decode, base64_encode import json +import flask_login.login_manager +from accounts.app import accounts_app class _compact_json: @@ -19,29 +20,29 @@ class _compact_json: return json.dumps(obj, **kwargs) -def create_login_manager(): +def create_login_manager() -> flask_login.login_manager.LoginManager: login_manager = LoginManager() login_manager.login_message = 'Bitte einloggen' login_manager.login_view = 'login.login' @login_manager.user_loader - def load_user(user_id): + def load_user(user_id: str): try: username, password = parse_userid(user_id) - return current_app.user_backend.auth(username, password) - except (current_app.user_backend.NoSuchUserError, - current_app.user_backend.InvalidPasswordError): + return accounts_app.user_backend.auth(username, password) + except (accounts_app.user_backend.NoSuchUserError, + accounts_app.user_backend.InvalidPasswordError): return None return login_manager -def create_userid(username, password): +def create_userid(username: str, password: str): userid = (username, password) return base64_encode(_compact_json.dumps(userid)) -def parse_userid(value): +def parse_userid(value: str): return _compact_json.loads(base64_decode(value)) diff --git a/accounts/utils/sessions.py b/accounts/utils/sessions.py index 5d01b5d..e156f0c 100644 --- a/accounts/utils/sessions.py +++ b/accounts/utils/sessions.py @@ -3,9 +3,12 @@ from Crypto import Random from Crypto.Cipher import AES -from flask import current_app +from flask import Flask from flask.sessions import TaggedJSONSerializer, SecureCookieSessionInterface from itsdangerous import BadPayload +from accounts.app import accounts_app + +from typing import cast def _pad(value, block_size): @@ -25,7 +28,7 @@ class EncryptedSerializer(TaggedJSONSerializer): self.block_size = AES.block_size def _cipher(self, iv): - key = current_app.config['SESSION_ENCRYPTION_KEY'] + key = accounts_app.config['SESSION_ENCRYPTION_KEY'] assert len(key) == 32 return AES.new(key, AES.MODE_CBC, iv) @@ -53,11 +56,12 @@ class EncryptedSerializer(TaggedJSONSerializer): class EncryptedSessionInterface(SecureCookieSessionInterface): serializer = EncryptedSerializer() - def open_session(self, app, request): + def open_session(self, app: Flask, request): session = None try: parent = super(EncryptedSessionInterface, self) - session = parent.open_session(app, request) + session = cast(EncryptedSessionInterface, parent) \ + .open_session(app, request) except BadPayload: session = self.session_class() diff --git a/accounts/views/admin/__init__.py b/accounts/views/admin/__init__.py index 35fda58..7378e38 100644 --- a/accounts/views/admin/__init__.py +++ b/accounts/views/admin/__init__.py @@ -2,13 +2,14 @@ from flask import Blueprint -from flask import current_app, redirect, request, g, flash, url_for +from flask import redirect, request, flash, url_for from flask_login import current_user from uuid import uuid4 from werkzeug.exceptions import Forbidden from accounts.utils import templated from accounts.forms import AdminCreateAccountForm, AdminDisableAccountForm +from accounts.app import accounts_app bp = Blueprint('admin', __name__) @@ -17,8 +18,8 @@ bp = Blueprint('admin', __name__) @bp.before_request def restrict_bp_to_admins(): if not current_user.is_authenticated: - return current_app.login_manager.unauthorized() - if current_user.uid not in current_app.config.get('ADMIN_USERS', []): + return accounts_app.login_manager.unauthorized() + if current_user.uid not in accounts_app.config.get('ADMIN_USERS', []): raise Forbidden('Du bist kein Admin.') @@ -33,8 +34,8 @@ def index(): def create_account(): form = AdminCreateAccountForm() if form.validate_on_submit(): - current_app.mail_backend.send(form.mail.data, 'mail/register.txt', - username=form.username.data) + accounts_app.mail_backend.send(form.mail.data, 'mail/register.txt', + username=form.username.data) flash('Mail versandt.', 'success') return redirect(url_for('admin.index')) @@ -45,7 +46,7 @@ def create_account(): @bp.route('/view_blacklist/') @templated('admin/view_blacklist.html') def view_blacklist(start=''): - entries = current_app.username_blacklist + entries = accounts_app.username_blacklist if start: entries = [e for e in entries if e.startswith(start)] @@ -68,20 +69,20 @@ def disable_account(): if form.validate_on_submit(): random_pw = str(uuid4()) form.user.change_password(random_pw) - for service in current_app.all_services: + for service in accounts_app.all_services: form.user.reset_password(service.id) oldmail = form.user.mail - mail = current_app.config['DISABLED_ACCOUNT_MAILADDRESS_TEMPLATE'] % form.user.uid + mail = accounts_app.config['DISABLED_ACCOUNT_MAILADDRESS_TEMPLATE'] % form.user.uid form.user.change_email(mail) - current_app.user_backend.update(form.user, as_admin=True) + accounts_app.user_backend.update(form.user, as_admin=True) flash('Passwort auf ein zufälliges und Mailadresse auf %s ' 'gesetzt.' % mail, 'success') - current_app.mail_backend.send( - current_app.config['MAIL_REGISTER_NOTIFY'], + accounts_app.mail_backend.send( + accounts_app.config['MAIL_REGISTER_NOTIFY'], 'mail/disable_notify.txt', username=form.user.uid, mail=oldmail, admin=current_user.uid) diff --git a/accounts/views/default/__init__.py b/accounts/views/default/__init__.py index 1854c46..0b7065d 100644 --- a/accounts/views/default/__init__.py +++ b/accounts/views/default/__init__.py @@ -3,10 +3,11 @@ from copy import deepcopy from flask import Blueprint -from flask import current_app, redirect, render_template, request, g, \ +from flask import redirect, render_template, request, \ flash, url_for -from flask_login import login_required, login_user, logout_user, current_user +from flask_login import login_required, login_user, current_user from werkzeug.exceptions import Forbidden +from werkzeug import Response from accounts.forms import RegisterForm, RegisterCompleteForm, \ LostPasswordForm, SettingsForm @@ -14,6 +15,9 @@ from accounts.utils import templated from accounts.utils.confirmation import Confirmation from accounts.utils.login import logout_required from accounts.models import Account +from accounts.app import accounts_app + +from typing import Union bp = Blueprint('default', __name__) @@ -22,11 +26,11 @@ bp = Blueprint('default', __name__) @bp.route('/register', methods=['GET', 'POST']) @templated('register.html') @logout_required -def register(): +def register() -> Union[dict, Response]: form = RegisterForm() if form.validate_on_submit(): - current_app.mail_backend.send(form.mail.data, 'mail/register.txt', - username=form.username.data) + accounts_app.mail_backend.send(form.mail.data, 'mail/register.txt', + username=form.username.data) flash('Es wurde eine E-Mail an die angegebene Adresse geschickt, ' 'um diese zu überprüfen. Bitte folge den Anweisungen in der ' @@ -40,14 +44,14 @@ def register(): @bp.route('/register/', methods=['GET', 'POST']) @templated('register_complete.html') @logout_required -def register_complete(token): +def register_complete(token: str): #TODO: check for double uids and mail username, mail = Confirmation('register').loads_http(token, max_age=3*24*60*60) try: - current_app.user_backend.get_by_uid(username) - current_app.user_backend.get_by_mail(mail) - except current_app.user_backend.NoSuchUserError: + accounts_app.user_backend.get_by_uid(username) + accounts_app.user_backend.get_by_mail(mail) + except accounts_app.user_backend.NoSuchUserError: pass else: flash('Du hast den Benutzer bereits angelegt! Du kannst dich jetzt einfach einloggen:') @@ -56,11 +60,11 @@ def register_complete(token): form = RegisterCompleteForm() if form.validate_on_submit(): user = Account(username, mail, password=form.password.data) - current_app.user_backend.register(user) + accounts_app.user_backend.register(user) login_user(user) - current_app.mail_backend.send( - current_app.config['MAIL_REGISTER_NOTIFY'], + accounts_app.mail_backend.send( + accounts_app.config['MAIL_REGISTER_NOTIFY'], 'mail/register_notify.txt', username=username, mail=mail) @@ -83,7 +87,7 @@ def lost_password(): if form.validate_on_submit(): #TODO: make the link only usable once (e.g include a hash of the old pw) # atm the only thing we do is make the link valid for only little time - current_app.mail_backend.send( + accounts_app.mail_backend.send( form.user.mail, 'mail/lost_password.txt', username=form.user.uid) flash('Wir haben dir eine E-Mail mit einem Link zum Passwort ändern ' @@ -97,14 +101,14 @@ def lost_password(): @bp.route('/lost_password/', methods=['GET', 'POST']) @templated('lost_password_complete.html') @logout_required -def lost_password_complete(token): +def lost_password_complete(token: str): (username,) = Confirmation('lost_password').loads_http(token, max_age=4*60*60) form = RegisterCompleteForm() if form.validate_on_submit(): - user = current_app.user_backend.get_by_uid(username) + user = accounts_app.user_backend.get_by_uid(username) user.change_password(form.password.data) - current_app.user_backend.update(user, as_admin=True) + accounts_app.user_backend.update(user, as_admin=True) login_user(user) flash('Passwort geändert.', 'success') @@ -120,13 +124,13 @@ def lost_password_complete(token): @bp.route('/', methods=['GET', 'POST']) @templated('index.html') @login_required -def index(): +def index() -> Union[Response, dict]: form = SettingsForm(mail=current_user.mail) if form.validate_on_submit(): changed = False if request.form.get('submit_services'): - for service in current_app.all_services: + for service in accounts_app.all_services: field = form.get_servicedelete(service.id) if field.data: current_user.reset_password(service.id) @@ -134,7 +138,7 @@ def index(): elif request.form.get('submit_main'): if form.mail.data and form.mail.data != current_user.mail: - current_app.mail_backend.send( + accounts_app.mail_backend.send( form.mail.data, 'mail/change_mail.txt', username=current_user.uid) @@ -148,21 +152,21 @@ def index(): flash('Passwort geändert', 'success') changed = True - for service in current_app.all_services: + for service in accounts_app.all_services: field = form.get_servicepassword(service.id) if field.data: changed = True current_user.change_password(field.data, None, service.id) if changed: - current_app.user_backend.update(current_user) + accounts_app.user_backend.update(current_user) login_user(current_user) return redirect(url_for('.index')) else: flash('Nichts geändert.') - services = deepcopy(current_app.all_services) + services = deepcopy(accounts_app.all_services) for s in services: s.changed = s.id in current_user.services @@ -174,19 +178,19 @@ def index(): @bp.route('/change_mail/') @login_required -def change_mail(token): +def change_mail(token: str): username, mail = Confirmation('change_mail').loads_http(token, max_age=3*24*60*60) if current_user.uid != username: raise Forbidden('Bitte logge dich als der Benutzer ein, dessen E-Mail-Adresse du ändern willst.') - results = current_app.user_backend.find_by_mail(mail) + results = accounts_app.user_backend.find_by_mail(mail) for user in results: if user.uid != current_user.uid: raise Forbidden('Diese E-Mail-Adresse wird schon von einem anderen account benutzt!') current_user.change_email(mail) - current_app.user_backend.update(current_user) + accounts_app.user_backend.update(current_user) flash('E-Mail-Adresse geändert.', 'success') return redirect(url_for('.index')) @@ -196,7 +200,7 @@ def change_mail(token): @templated('about.html') def about(): return { - 'app': current_app, + 'app': accounts_app, } diff --git a/accounts/views/login/__init__.py b/accounts/views/login/__init__.py index 730b3ed..ee049bf 100644 --- a/accounts/views/login/__init__.py +++ b/accounts/views/login/__init__.py @@ -2,17 +2,21 @@ from flask import Blueprint -from flask import current_app, redirect, request, g, flash, render_template, url_for +from flask import redirect, request, flash, render_template, url_for from flask_login import login_user, logout_user, current_user from urllib.parse import urljoin, urlparse +from werkzeug import Response -from .forms import LoginForm +from accounts.app import accounts_app + +from typing import Union +from .forms import LoginForm bp = Blueprint('login', __name__) -def is_safe_url(target): +def is_safe_url(target: str): ref_url = urlparse(request.host_url) test_url = urlparse(urljoin(request.host_url, target)) print(target) @@ -23,24 +27,22 @@ def is_safe_url(target): @bp.route('/login', methods=['GET', 'POST']) -def login(): +def login() -> Union[str, Response]: if current_user.is_authenticated: return redirect(url_for('default.index')) form = LoginForm(request.form) if form.validate_on_submit(): try: - user = current_app.user_backend.auth(form.username.data, - form.password.data) + user = accounts_app.user_backend.auth(form.username.data, + form.password.data) login_user(user) flash('Erfolgreich eingeloggt', 'success') next = request.form['next'] - if not is_safe_url(next): - next = None - return redirect(next or url_for('default.index')) - except (current_app.user_backend.NoSuchUserError, - current_app.user_backend.InvalidPasswordError): + return redirect(next if is_safe_url(next) else url_for('default.index')) + except (accounts_app.user_backend.NoSuchUserError, + accounts_app.user_backend.InvalidPasswordError): flash('Ungültiger Benutzername und/oder Passwort', 'error') return render_template("login/login.html", form=form, @@ -48,7 +50,7 @@ def login(): @bp.route('/logout') -def logout(): +def logout() -> Response: logout_user() flash('Erfolgreich ausgeloggt.', 'success') return redirect(url_for('.login')) diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..95a93c1 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,11 @@ +[mypy-flask_login.*] +ignore_missing_imports = True + +[mypy-wtforms.*] +ignore_missing_imports = True + +[mypy-flask_wtf.*] +ignore_missing_imports = True + +[mypy-flask_script.*] +ignore_missing_imports = True -- cgit v1.2.3-1-g7c22