From 39230732099298b7f56c60b396949d8c0484e4dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonah=20Br=C3=BCchert?= Date: Fri, 29 Mar 2024 02:20:41 +0100 Subject: Format project To allow automatically formatting changes in the future. Command used: black -l 79 accounts/ --- accounts/__init__.py | 21 ++--- accounts/backend/mail/__init__.py | 7 +- accounts/backend/mail/dummy.py | 26 +++--- accounts/backend/mail/sendmail.py | 22 ++--- accounts/backend/user/__init__.py | 12 +-- accounts/backend/user/dummy.py | 30 ++++--- accounts/backend/user/ldap.py | 174 +++++++++++++++++++++-------------- accounts/default_settings.py | 40 +++++---- accounts/forms.py | 179 ++++++++++++++++++++++++++----------- accounts/models.py | 28 ++++-- accounts/utils/__init__.py | 13 +-- accounts/utils/confirmation.py | 17 ++-- accounts/utils/console.py | 47 ++++++---- accounts/utils/login.py | 16 ++-- accounts/utils/sessions.py | 21 ++--- accounts/views/admin/__init__.py | 72 ++++++++------- accounts/views/default/__init__.py | 175 +++++++++++++++++++++--------------- accounts/views/login/__init__.py | 46 ++++++---- accounts/views/login/forms.py | 4 +- 19 files changed, 578 insertions(+), 372 deletions(-) diff --git a/accounts/__init__.py b/accounts/__init__.py index 883d45c..c290c3a 100644 --- a/accounts/__init__.py +++ b/accounts/__init__.py @@ -19,7 +19,7 @@ def absolute_paths(app: Flask, config: str) -> None: app.config[name] = os.path.join(dirname, app.config[name]) dirname = os.path.dirname(config) - handle_option(dirname, 'USERNAME_BLACKLIST_FILE') + handle_option(dirname, "USERNAME_BLACKLIST_FILE") def load_config(app: Flask, configfile: Optional[str]) -> None: @@ -31,32 +31,33 @@ def load_config(app: Flask, configfile: Optional[str]) -> None: def create_app(config=None) -> Flask: app = AccountsFlask(__name__) - app.config.from_object('accounts.default_settings') - load_config(app, os.environ.get('SPLINE_ACCOUNT_WEB_SETTINGS')) + app.config.from_object("accounts.default_settings") + load_config(app, os.environ.get("SPLINE_ACCOUNT_WEB_SETTINGS")) load_config(app, config) app.register_blueprint(default.bp) app.register_blueprint(login.bp) - app.register_blueprint(admin.bp, url_prefix='/admin') + app.register_blueprint(admin.bp, url_prefix="/admin") app.session_interface = EncryptedSessionInterface() app.all_services = list() - for (name, url) in app.config.get('SERVICES', list()): + for name, url in app.config.get("SERVICES", list()): cn = name.lower() app.all_services.append(Service(cn, name, url)) app.username_blacklist = list() - if app.config.get('USERNAME_BLACKLIST_FILE'): - with open(app.config['USERNAME_BLACKLIST_FILE']) as f: + if app.config.get("USERNAME_BLACKLIST_FILE"): + with open(app.config["USERNAME_BLACKLIST_FILE"]) as f: app.username_blacklist = [line.rstrip() for line in f] login_manager = create_login_manager() login_manager.init_app(app) app.jinja_env.globals.update( - confirm=lambda realm, *args: Confirmation(realm).dumps(tuple(args))) + confirm=lambda realm, *args: Confirmation(realm).dumps(tuple(args)) + ) - app.user_backend = get_backend(app.config['USER_BACKEND'], app) - app.mail_backend = get_backend(app.config['MAIL_BACKEND'], app) + app.user_backend = get_backend(app.config["USER_BACKEND"], app) + app.mail_backend = get_backend(app.config["MAIL_BACKEND"], app) return app diff --git a/accounts/backend/mail/__init__.py b/accounts/backend/mail/__init__.py index f2ec5d5..bb1709f 100644 --- a/accounts/backend/mail/__init__.py +++ b/accounts/backend/mail/__init__.py @@ -5,7 +5,7 @@ from jinja2 import Template from jinja2.environment import TemplateModule -class Backend(): +class Backend: app: Flask def __init__(self, app: Flask) -> None: @@ -18,10 +18,9 @@ class Backend(): if recipient is None: return - tmpl: Template = \ - self.app.jinja_env.get_or_select_template(template) + tmpl: Template = self.app.jinja_env.get_or_select_template(template) - kwargs['recipient'] = recipient + kwargs["recipient"] = recipient module = tmpl.make_module(kwargs) self._send(recipient, module) diff --git a/accounts/backend/mail/dummy.py b/accounts/backend/mail/dummy.py index 049df7a..0c6da21 100644 --- a/accounts/backend/mail/dummy.py +++ b/accounts/backend/mail/dummy.py @@ -6,28 +6,29 @@ from accounts import AccountsFlask from jinja2.environment import TemplateModule -FANCY_FORMAT = '''\ +FANCY_FORMAT = """\ ,--------------------------------------------------------------------------- | Subject: {subject} | To: {to} | From: {sender} |--------------------------------------------------------------------------- | {body} -`---------------------------------------------------------------------------''' +`---------------------------------------------------------------------------""" -PLAIN_FORMAT = '''Subject: {subject} +PLAIN_FORMAT = """Subject: {subject} To: {to} From: {sender} -{body}''' +{body}""" class DummyBackend(Backend): format: str - def __init__(self, app: AccountsFlask, plain: bool = False, - format: None = None) -> None: + def __init__( + self, app: AccountsFlask, plain: bool = False, format: None = None + ) -> None: super(DummyBackend, self).__init__(app) self.plain = plain @@ -45,8 +46,11 @@ class DummyBackend(Backend): if not self.plain: body = "\n| ".join(body.split("\n")) - print(self.format.format( - subject=content.subject(), # type: ignore - sender=content.sender(), # type: ignore - to=recipient, - body=body)) + print( + self.format.format( + subject=content.subject(), # type: ignore + sender=content.sender(), # type: ignore + to=recipient, + body=body, + ) + ) diff --git a/accounts/backend/mail/sendmail.py b/accounts/backend/mail/sendmail.py index 1fecedc..abaab87 100644 --- a/accounts/backend/mail/sendmail.py +++ b/accounts/backend/mail/sendmail.py @@ -10,26 +10,28 @@ from . import Backend def safe(s: str): - return s.split('\n', 1)[0] + return s.split("\n", 1)[0] class SendmailBackend(Backend): def _send(self, recipient: str, content: TemplateModule): - msg = MIMEText(content.body(), _charset='utf-8') # type: ignore - msg['Subject'] = safe(content.subject()) # type: ignore - msg['To'] = safe(recipient) - msg['From'] = safe(content.sender()) # type: ignore + msg = MIMEText(content.body(), _charset="utf-8") # type: ignore + msg["Subject"] = safe(content.subject()) # type: ignore + msg["To"] = safe(recipient) + msg["From"] = safe(content.sender()) # type: ignore envelope = [] _, address = parseaddr(safe(content.sender())) # type: ignore - if address != '': - envelope = ['-f', address] + if address != "": + envelope = ["-f", address] - p = subprocess.Popen([self.app.config['SENDMAIL_COMMAND']] + - envelope + ['-t'], stdin=subprocess.PIPE) + p = subprocess.Popen( + [self.app.config["SENDMAIL_COMMAND"]] + envelope + ["-t"], + stdin=subprocess.PIPE, + ) assert p.stdin p.stdin.write(msg.as_string().encode("utf-8")) p.stdin.close() if p.wait() != 0: - raise RuntimeError('sendmail terminated with %d' % p.returncode) + raise RuntimeError("sendmail terminated with %d" % p.returncode) diff --git a/accounts/backend/user/__init__.py b/accounts/backend/user/__init__.py index 70f973a..fdbe28b 100644 --- a/accounts/backend/user/__init__.py +++ b/accounts/backend/user/__init__.py @@ -79,9 +79,9 @@ class Backend(object): """ users = self.find_by_uid(uid) if len(users) == 0: - raise NoSuchUserError('No such user') + raise NoSuchUserError("No such user") if len(users) > 1: - raise ShouldNotHappen('Several users for one uid returned.') + raise ShouldNotHappen("Several users for one uid returned.") return users[0] @@ -92,17 +92,17 @@ class Backend(object): """ users = self.find_by_mail(mail) if len(users) == 0: - raise NoSuchUserError('No such user') + raise NoSuchUserError("No such user") if len(users) > 1: - raise ShouldNotHappen('Several users for one mail returned.') + raise ShouldNotHappen("Several users for one mail returned.") return users[0] def find_by_uid(self, uid: str, wildcard=False) -> list[Account]: - return self.find({'uid': uid}, wildcard) + return self.find({"uid": uid}, wildcard) def find_by_mail(self, mail: str, wildcard=False) -> list[Account]: - return self.find({'mail': mail}, wildcard) + return self.find({"mail": mail}, wildcard) def find(self, filters=None, wildcard=False) -> list[Account]: """ diff --git a/accounts/backend/user/dummy.py b/accounts/backend/user/dummy.py index fd6620a..cef1b7b 100644 --- a/accounts/backend/user/dummy.py +++ b/accounts/backend/user/dummy.py @@ -7,8 +7,9 @@ from accounts import AccountsFlask from typing import Optional -def _match_filter(account: Account, filters: Optional[dict[str, str]], - wildcard: bool): +def _match_filter( + account: Account, filters: Optional[dict[str, str]], wildcard: bool +): if filters is None: return True @@ -41,17 +42,17 @@ class DummyBackend(Backend): "test": { "uidNumber": 1, "mail": "test@accounts.spline.de", - "password": "test" + "password": "test", }, "test2": { "uidNumber": 2, "mail": "test2@accounts.spline.de", - "password": "test2" + "password": "test2", }, "admin": { "uidNumber": 3, "mail": "admin@accounts.spline.de", - "password": "admin" + "password": "admin", }, } @@ -61,11 +62,7 @@ class DummyBackend(Backend): accounts = [] for uid, attrs in self._storage.items(): accounts.append( - Account( - uid, - str(attrs["mail"]), - uidNumber=attrs["uidNumber"] - ) + Account(uid, str(attrs["mail"]), uidNumber=attrs["uidNumber"]) ) return accounts @@ -80,18 +77,23 @@ class DummyBackend(Backend): acc.password = password return acc - def find(self, filters: Optional[dict[str, str]] = None, - wildcard=False) -> list[Account]: + def find( + self, filters: Optional[dict[str, str]] = None, wildcard=False + ) -> list[Account]: """ Find accounts by a given filter. """ - return [acc for acc in self._get_accounts() if _match_filter(acc, filters, wildcard)] + return [ + acc + for acc in self._get_accounts() + if _match_filter(acc, filters, wildcard) + ] def _store(self, account: Account) -> None: self._storage[account.uid] = { "uidNumber": account.uidNumber, "mail": account.mail, - "password": account.password + "password": account.password, } def _verify_password(self, account: Account, password: Optional[str]): diff --git a/accounts/backend/user/ldap.py b/accounts/backend/user/ldap.py index 99080a4..a1ed904 100644 --- a/accounts/backend/user/ldap.py +++ b/accounts/backend/user/ldap.py @@ -4,7 +4,11 @@ import ldap3 from ldap3.utils.conv import escape_filter_chars from ldap3.utils.dn import escape_rdn -from ldap3.core.exceptions import LDAPInvalidCredentialsResult, LDAPException, LDAPOperationResult +from ldap3.core.exceptions import ( + LDAPInvalidCredentialsResult, + LDAPException, + LDAPOperationResult, +) from ldap3.core.results import RESULT_SUCCESS from ldap3.abstract.entry import Entry from ldap3 import Connection @@ -24,28 +28,32 @@ def _escape(value: str, wildcard=False): return escape_rdn(value) -def _change_password(conn: Connection, dn, - passwords: tuple[Optional[str], Optional[str]], - as_admin=False): +def _change_password( + conn: Connection, + dn, + passwords: tuple[Optional[str], Optional[str]], + as_admin=False, +): old_password, new_password = passwords if as_admin: conn.extend.standard.modify_password(dn, None, new_password) else: try: - conn.extend.standard.modify_password(dn, old_password, - new_password) + conn.extend.standard.modify_password( + dn, old_password, new_password + ) except LDAPException: - raise InvalidPasswordError('Invalid password') + raise InvalidPasswordError("Invalid password") class LdapBackend(Backend): def __init__(self, app: AccountsFlask) -> None: super(LdapBackend, self).__init__(app) - self.host: str = self.app.config['LDAP_HOST'] - self.base_dn: list[tuple[str, str]] = self.app.config['LDAP_BASE_DN'] - self.admin_user: str = self.app.config['LDAP_ADMIN_USER'] - self.admin_pass: str = self.app.config['LDAP_ADMIN_PASS'] + self.host: str = self.app.config["LDAP_HOST"] + self.base_dn: list[tuple[str, str]] = self.app.config["LDAP_BASE_DN"] + self.admin_user: str = self.app.config["LDAP_ADMIN_USER"] + self.admin_pass: str = self.app.config["LDAP_ADMIN_PASS"] self.services: list[Service] = self.app.all_services self.admin = False @@ -56,29 +64,31 @@ class LdapBackend(Backend): Tries to authenticate a user with a given password. If the authentication is successful an Account object will be returned. """ - user_dn: str = self._format_dn([('uid', username), ('ou', 'users')]) + user_dn: str = self._format_dn([("uid", username), ("ou", "users")]) conn: Connection = self._connect(user_dn, password) uid = None mail = None uidNumber = None services = [] - conn.search(user_dn, '(objectClass=*)', - attributes=['objectClass', 'uid', 'mail', 'cn', 'uidNumber']) + conn.search( + user_dn, + "(objectClass=*)", + attributes=["objectClass", "uid", "mail", "cn", "uidNumber"], + ) entry: Entry for entry in conn.entries: - if 'splineAccount' in entry.objectClass.values: + if "splineAccount" in entry.objectClass.values: uid = entry.uid.value mail = entry.mail.value uidNumber = entry.uidNumber.value - elif 'servicePassword' in entry.objectClass.value: + elif "servicePassword" in entry.objectClass.value: services.append(entry.cn.value) if uid is None or mail is None or uidNumber is None: raise NoSuchUserError("User not found") - return Account(uid, mail, services, password, - uidNumber=uidNumber) + return Account(uid, mail, services, password, uidNumber=uidNumber) def find(self, filters: Optional[dict[str, str]] = None, wildcard=False): """ @@ -87,21 +97,32 @@ class LdapBackend(Backend): if filters is None: filters = dict() - filters['objectClass'] = 'splineAccount' - filter_as_list = ['(%s=%s)' % (attr, _escape(value, wildcard)) - for attr, value in list(filters.items())] - filterstr = '(&%s)' % ''.join(filter_as_list) + filters["objectClass"] = "splineAccount" + filter_as_list = [ + "(%s=%s)" % (attr, _escape(value, wildcard)) + for attr, value in list(filters.items()) + ] + filterstr = "(&%s)" % "".join(filter_as_list) conn = self._connect() - base_dn = self._format_dn([('ou', 'users')]) + base_dn = self._format_dn([("ou", "users")]) accounts: list[Account] = [] try: - conn.search(base_dn, filterstr, search_scope=ldap3.LEVEL, - attributes=['uid', 'mail', 'uidNumber']) + conn.search( + base_dn, + filterstr, + search_scope=ldap3.LEVEL, + attributes=["uid", "mail", "uidNumber"], + ) for entry in conn.entries: - accounts.append(Account(entry.uid.value, entry.mail.value, - uidNumber=entry.uidNumber.value)) + accounts.append( + Account( + entry.uid.value, + entry.mail.value, + uidNumber=entry.uidNumber.value, + ) + ) except LDAPException: pass @@ -110,14 +131,14 @@ class LdapBackend(Backend): def _store(self, account: Account): conn = self._connect_as_admin() - user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')]) + user_dn = self._format_dn([("uid", account.uid), ("ou", "users")]) attrs = { - 'objectClass': ['top', 'inetOrgPerson', 'splineAccount'], - 'uid': _escape(account.uid), - 'sn': 'n/a', - 'cn': _escape(account.uid), - 'mail': _escape(account.mail), - 'uidNumber': _escape(account.uidNumber), + "objectClass": ["top", "inetOrgPerson", "splineAccount"], + "uid": _escape(account.uid), + "sn": "n/a", + "cn": _escape(account.uid), + "mail": _escape(account.mail), + "uidNumber": _escape(account.uidNumber), } conn.add(user_dn, attributes=attrs) @@ -129,14 +150,16 @@ class LdapBackend(Backend): Updates account informations like passwords or email. """ conn = None - user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')]) + user_dn = self._format_dn([("uid", account.uid), ("ou", "users")]) if as_admin: conn = self._connect_as_admin() else: conn = self._connect(user_dn, account.password) - attrs = {key: [(ldap3.MODIFY_REPLACE, [value])] - for key, value in list(account.attributes.items())} + attrs = { + key: [(ldap3.MODIFY_REPLACE, [value])] + for key, value in list(account.attributes.items()) + } conn.modify(user_dn, attrs) self._alter_passwords(conn, account, as_admin=as_admin) @@ -150,57 +173,64 @@ class LdapBackend(Backend): else: conn = self._connect(account.uid, account.password) - dns = [[('cn', service), ('uid', account.uid), ('ou', 'users')] - for service in account.services] + \ - [[('uid', account.uid), ('ou', 'users')]] + dns = [ + [("cn", service), ("uid", account.uid), ("ou", "users")] + for service in account.services + ] + [[("uid", account.uid), ("ou", "users")]] for dn in dns: conn.delete(self._format_dn(dn)) - def _format_dn(self, attr: list[tuple[str, str]], with_base_dn=True) -> str: + def _format_dn( + self, attr: list[tuple[str, str]], with_base_dn=True + ) -> str: if with_base_dn: attr.extend(self.base_dn) - dn = ['%s=%s' % (item[0], _escape(item[1])) - for item in attr] + dn = ["%s=%s" % (item[0], _escape(item[1])) for item in attr] - return ','.join(dn) + return ",".join(dn) - def _connect(self, user: Optional[str] = None, - password: Optional[str] = None) -> Connection: + def _connect( + self, user: Optional[str] = None, password: Optional[str] = None + ) -> Connection: server = ldap3.Server(self.host) conn = ldap3.Connection(server, user, password, raise_exceptions=True) try: conn.bind() except LDAPInvalidCredentialsResult: - raise InvalidPasswordError('Invalid password') + raise InvalidPasswordError("Invalid password") return conn def _connect_as_admin(self) -> Connection: - admin_dn = self._format_dn([('cn', self.admin_user)]) + admin_dn = self._format_dn([("cn", self.admin_user)]) return self._connect(admin_dn, self.admin_pass) - def _alter_passwords(self, conn: Connection, account: Account, - as_admin=False): + def _alter_passwords( + self, conn: Connection, account: Account, as_admin=False + ): if account.new_password_root: - user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')]) - _change_password(conn, user_dn, account.new_password_root, as_admin) + user_dn = self._format_dn([("uid", account.uid), ("ou", "users")]) + _change_password( + conn, user_dn, account.new_password_root, as_admin + ) _, account.password = account.new_password_root account.new_password_root = None for service, passwords in list(account.new_password_services.items()): service_id = service.lower() - service_dn = self._format_dn([('cn', service_id), ('uid', account.uid), - ('ou', 'users')]) + service_dn = self._format_dn( + [("cn", service_id), ("uid", account.uid), ("ou", "users")] + ) _, new = passwords if new is not None: if service_id not in account.services: attrs = { - 'objectClass': ['top', 'servicePassword'], - 'cn': _escape(service_id), + "objectClass": ["top", "servicePassword"], + "cn": _escape(service_id), } conn.add(service_dn, attributes=attrs) account.services.append(service_id) @@ -214,30 +244,38 @@ class LdapBackend(Backend): del account.new_password_services[service] def _get_last_uidNumber(self, conn: Connection): - uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')]) - conn.search(uidNumber_dn, '(objectClass=uidNumberMaximum)', - attributes=['uidNumber']) + uidNumber_dn = self._format_dn([("cn", "uidMax"), ("ou", "other")]) + conn.search( + uidNumber_dn, + "(objectClass=uidNumberMaximum)", + attributes=["uidNumber"], + ) for entry in conn.entries: return int(entry.uidNumber.value) - raise ShouldNotHappen('Last uidNumber not found.') + raise ShouldNotHappen("Last uidNumber not found.") def _get_next_uidNumber(self): conn = self._connect_as_admin() - uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')]) + uidNumber_dn = self._format_dn([("cn", "uidMax"), ("ou", "other")]) uidNumber = self._get_last_uidNumber(conn) # Try to acquire next uidNumber for i in [0, 1, 2, 3, 4, 5]: try: - conn.modify(uidNumber_dn, {'uidNumber': [ - (ldap3.MODIFY_DELETE, ['%d' % (uidNumber + i)]), - (ldap3.MODIFY_ADD, ['%d' % (uidNumber + i + 1)]), - ]}) - - if conn.result['result'] == RESULT_SUCCESS: + conn.modify( + uidNumber_dn, + { + "uidNumber": [ + (ldap3.MODIFY_DELETE, ["%d" % (uidNumber + i)]), + (ldap3.MODIFY_ADD, ["%d" % (uidNumber + i + 1)]), + ] + }, + ) + + if conn.result["result"] == RESULT_SUCCESS: return uidNumber + i + 1 except LDAPOperationResult: pass - raise ShouldNotHappen('Unable to get next uidNumber, try again.') + raise ShouldNotHappen("Unable to get next uidNumber, try again.") diff --git a/accounts/default_settings.py b/accounts/default_settings.py index cf36c6c..98987fa 100644 --- a/accounts/default_settings.py +++ b/accounts/default_settings.py @@ -2,33 +2,41 @@ from datetime import timedelta DEBUG = False -SECRET_KEY = 'remember to change this to something more random and secret' +SECRET_KEY = "remember to change this to something more random and secret" # CHANGE THIS! (e.g. os.urandom(32) ) -SESSION_ENCRYPTION_KEY = b'.\x14\xa7\x1b\xa2:\x1b\xb7\xbck\x1bD w\xab\x87a\xb4\xb7\xca\xf1\x06\xb0\x9f?q\x13\x05\x8dY\xe5<' +SESSION_ENCRYPTION_KEY = b".\x14\xa7\x1b\xa2:\x1b\xb7\xbck\x1bD w\xab\x87a\xb4\xb7\xca\xf1\x06\xb0\x9f?q\x13\x05\x8dY\xe5<" -MAIL_DEFAULT_SENDER = 'spline accounts ' +MAIL_DEFAULT_SENDER = "spline accounts " MAIL_REGISTER_NOTIFY = None -DISABLED_ACCOUNT_MAILADDRESS_TEMPLATE = 'noreply-disabledaccount-%s@accounts.spline.de' +DISABLED_ACCOUNT_MAILADDRESS_TEMPLATE = ( + "noreply-disabledaccount-%s@accounts.spline.de" +) -SENDMAIL_COMMAND = '/usr/sbin/sendmail' +SENDMAIL_COMMAND = "/usr/sbin/sendmail" # to make the cookie a session cookie, set this to None -PERMANENT_SESSION_LIFETIME = timedelta(seconds=600) # 10 minutes - -LDAP_HOST = 'ldap://localhost:5678' -LDAP_BASE_DN = [('dc', 'accounts'), ('dc', 'spline'), ('dc', 'inf'), ('dc', 'fu-berlin'), ('dc', 'de')] -LDAP_ADMIN_USER = 'admin' -LDAP_ADMIN_PASS = 'admin' +PERMANENT_SESSION_LIFETIME = timedelta(seconds=600) # 10 minutes + +LDAP_HOST = "ldap://localhost:5678" +LDAP_BASE_DN = [ + ("dc", "accounts"), + ("dc", "spline"), + ("dc", "inf"), + ("dc", "fu-berlin"), + ("dc", "de"), +] +LDAP_ADMIN_USER = "admin" +LDAP_ADMIN_PASS = "admin" -PREFERRED_URL_SCHEME = 'https' +PREFERRED_URL_SCHEME = "https" -USER_BACKEND = 'accounts.backend.user.dummy' -MAIL_BACKEND = 'accounts.backend.mail.dummy' +USER_BACKEND = "accounts.backend.user.dummy" +MAIL_BACKEND = "accounts.backend.mail.dummy" ADMIN_USERS = ["admin"] SERVICES = [ - ('Service1', 'https://service1.spline.de'), - ('Service2', 'https://service2.spline.de'), + ("Service1", "https://service1.spline.de"), + ("Service2", "https://service2.spline.de"), ] diff --git a/accounts/forms.py b/accounts/forms.py index b59ab64..01713ad 100644 --- a/accounts/forms.py +++ b/accounts/forms.py @@ -3,27 +3,50 @@ import re from flask import url_for from flask_wtf import FlaskForm as Form from flask_login import current_user -from wtforms import StringField, PasswordField, ValidationError, \ - BooleanField, validators +from wtforms import ( + StringField, + PasswordField, + ValidationError, + BooleanField, + validators, +) from wtforms.form import FormMeta from markupsafe import Markup from .utils import NotRegexp from accounts.app import accounts_app -USERNAME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9-]{1,15}$') -USERNAME_EXCLUDE_RE = re.compile(r'^(admin|root)') +USERNAME_RE = re.compile(r"^[a-zA-Z][a-zA-Z0-9-]{1,15}$") +USERNAME_EXCLUDE_RE = re.compile(r"^(admin|root)") class RegisterForm(Form): - username = StringField('Benutzername', [ - validators.Regexp(USERNAME_RE, message='Benutzername darf nur aus a-z, ' - 'Zahlen und - bestehen (2-16 Zeichen, am Anfang nur a-z).'), - NotRegexp(USERNAME_EXCLUDE_RE, message='Dieser Benutzername ist nicht erlaubt.'), - ]) - mail = StringField('E-Mail-Adresse', [validators.Email(), validators.Length(min=6, max=50)]) - question = StringField('Hauptstadt von Deutschland?', [validators.AnyOf( - ('Berlin', 'berlin'), message='Bitte beantworte die Frage.')]) + username = StringField( + "Benutzername", + [ + validators.Regexp( + USERNAME_RE, + message="Benutzername darf nur aus a-z, " + "Zahlen und - bestehen (2-16 Zeichen, am Anfang nur a-z).", + ), + NotRegexp( + USERNAME_EXCLUDE_RE, + message="Dieser Benutzername ist nicht erlaubt.", + ), + ], + ) + mail = StringField( + "E-Mail-Adresse", + [validators.Email(), validators.Length(min=6, max=50)], + ) + question = StringField( + "Hauptstadt von Deutschland?", + [ + validators.AnyOf( + ("Berlin", "berlin"), message="Bitte beantworte die Frage." + ) + ], + ) def validate_username(self, field): try: @@ -31,12 +54,15 @@ class RegisterForm(Form): except accounts_app.user_backend.NoSuchUserError: if accounts_app.username_blacklist: if field.data.lower() in accounts_app.username_blacklist: - raise ValidationError(Markup( - 'Dieser Benutzername ist momentan nicht erlaubt. ' - 'Weitere Informationen' - % url_for('default.about'))) + raise ValidationError( + Markup( + "Dieser Benutzername ist momentan nicht erlaubt. " + 'Weitere Informationen' + % url_for("default.about") + ) + ) else: - raise ValidationError('Dieser Benutzername ist schon vergeben.') + raise ValidationError("Dieser Benutzername ist schon vergeben.") def validate_mail(self, field): try: @@ -44,11 +70,14 @@ class RegisterForm(Form): except accounts_app.user_backend.NoSuchUserError: pass else: - raise ValidationError(Markup( - 'Ein Benutzername mit dieser Adresse existiert bereits. ' - 'Falls du deinen Benutzernamen vergessen hast, kannst du ' - 'die Passwort-vergessen-Funktion benutzen.' - % url_for('default.lost_password'))) + raise ValidationError( + Markup( + "Ein Benutzername mit dieser Adresse existiert bereits. " + "Falls du deinen Benutzernamen vergessen hast, kannst du " + 'die Passwort-vergessen-Funktion benutzen.' + % url_for("default.lost_password") + ) + ) class AdminCreateAccountForm(RegisterForm): @@ -58,89 +87,131 @@ class AdminCreateAccountForm(RegisterForm): except accounts_app.user_backend.NoSuchUserError: return else: - raise ValidationError('Dieser Benutzername ist schon vergeben') + raise ValidationError("Dieser Benutzername ist schon vergeben") question = None class RegisterCompleteForm(Form): - password = PasswordField('Passwort', [validators.DataRequired(), - validators.EqualTo('password_confirm', message='Passwörter stimmen nicht überein')]) - password_confirm = PasswordField('Passwort bestätigen') + password = PasswordField( + "Passwort", + [ + validators.DataRequired(), + validators.EqualTo( + "password_confirm", message="Passwörter stimmen nicht überein" + ), + ], + ) + password_confirm = PasswordField("Passwort bestätigen") # n.b. this form is also used in lost_password_complete class LostPasswordForm(Form): - username_or_mail = StringField('Benutzername oder E-Mail') + username_or_mail = StringField("Benutzername oder E-Mail") user = None def validate_username_or_mail(self, field): - if '@' not in field.data: + if "@" not in field.data: try: self.user = accounts_app.user_backend.get_by_uid(field.data) except accounts_app.user_backend.NoSuchUserError: - raise ValidationError('Es gibt keinen Benutzer mit diesem Namen.') + raise ValidationError( + "Es gibt keinen Benutzer mit diesem Namen." + ) else: try: self.user = accounts_app.user_backend.get_by_mail(field.data) except accounts_app.user_backend.NoSuchUserError: - raise ValidationError('Es gibt keinen Benutzer mit dieser Adresse.') + raise ValidationError( + "Es gibt keinen Benutzer mit dieser Adresse." + ) class SettingsMeta(FormMeta): def __call__(cls, *args, **kwargs): for service in accounts_app.all_services: - setattr(cls, 'password_%s' % service.id, PasswordField( - 'Passwort für %s' % service.name, [ - validators.Optional(), - validators.EqualTo( - 'password_confirm_%s' % service.id, - message='Passwörter stimmen nicht überein'), - ])) - setattr(cls, 'password_confirm_%s' % service.id, PasswordField( - 'Passwort für %s (Bestätigung)' % service.name)) - setattr(cls, 'delete_%s' % service.id, BooleanField( - 'Passwort für %s löschen' % service.name)) + setattr( + cls, + "password_%s" % service.id, + PasswordField( + "Passwort für %s" % service.name, + [ + validators.Optional(), + validators.EqualTo( + "password_confirm_%s" % service.id, + message="Passwörter stimmen nicht überein", + ), + ], + ), + ) + setattr( + cls, + "password_confirm_%s" % service.id, + PasswordField("Passwort für %s (Bestätigung)" % service.name), + ) + setattr( + cls, + "delete_%s" % service.id, + BooleanField("Passwort für %s löschen" % service.name), + ) return super(SettingsMeta, cls).__call__(*args, **kwargs) class SettingsForm(Form, metaclass=SettingsMeta): - old_password = PasswordField('Altes Passwort') - password = PasswordField('Neues Passwort', [validators.Optional(), - validators.EqualTo('password_confirm', message='Passwörter stimmen nicht überein')]) - password_confirm = PasswordField('Passwort bestätigen') - mail = StringField('E-Mail-Adresse', [validators.Optional(), validators.Email(), validators.Length(min=6, max=50)]) + old_password = PasswordField("Altes Passwort") + password = PasswordField( + "Neues Passwort", + [ + validators.Optional(), + validators.EqualTo( + "password_confirm", message="Passwörter stimmen nicht überein" + ), + ], + ) + password_confirm = PasswordField("Passwort bestätigen") + mail = StringField( + "E-Mail-Adresse", + [ + validators.Optional(), + validators.Email(), + validators.Length(min=6, max=50), + ], + ) def validate_old_password(self, field): if self.password.data: if not field.data: - raise ValidationError('Gib bitte dein altes Passwort ein, um ein neues zu setzen.') + raise ValidationError( + "Gib bitte dein altes Passwort ein, um ein neues zu setzen." + ) if field.data != current_user.password: - raise ValidationError('Altes Passwort ist falsch.') + raise ValidationError("Altes Passwort ist falsch.") def validate_mail(self, field): results = accounts_app.user_backend.find_by_mail(field.data) for user in results: if user.uid != current_user.uid: - raise ValidationError('Diese E-Mail-Adresse wird schon von einem anderen Account benutzt!') + raise ValidationError( + "Diese E-Mail-Adresse wird schon von einem anderen Account benutzt!" + ) def get_servicepassword(self, service_id: str): - return getattr(self, 'password_%s' % service_id) + return getattr(self, "password_%s" % service_id) def get_servicepasswordconfirm(self, service_id: str): - return getattr(self, 'password_confirm_%s' % service_id) + return getattr(self, "password_confirm_%s" % service_id) def get_servicedelete(self, service_id: str): - return getattr(self, 'delete_%s' % service_id) + return getattr(self, "delete_%s" % service_id) class AdminDisableAccountForm(Form): - username = StringField('Benutzername') + username = StringField("Benutzername") user = None def validate_username(self, field): try: self.user = accounts_app.user_backend.get_by_uid(field.data) except accounts_app.user_backend.NoSuchUserError: - raise ValidationError('Dieser Benutzername existiert nicht') + raise ValidationError("Dieser Benutzername existiert nicht") diff --git a/accounts/models.py b/accounts/models.py index 7bbb235..8b34fab 100644 --- a/accounts/models.py +++ b/accounts/models.py @@ -19,7 +19,7 @@ class Service(object): self.changed = False def __repr__(self): - return '' % self.id + return "" % self.id class Account(UserMixin): @@ -27,6 +27,7 @@ class Account(UserMixin): An Account represents a complex ldap tree entry for spline users. For each service a spline user can have a different password. """ + _ready = False uid: str @@ -36,8 +37,14 @@ class Account(UserMixin): attributes: dict[Any, Any] new_password_root: Optional[tuple[Optional[str], Optional[str]]] - def __init__(self, uid: str, mail: str, services=None, - password: Optional[str] = None, uidNumber=None): + def __init__( + self, + uid: str, + mail: str, + services=None, + password: Optional[str] = None, + uidNumber=None, + ): self.uid = uid self.services = list() if services is None else services self.password = password @@ -46,7 +53,7 @@ class Account(UserMixin): self.attributes = {} self.uidNumber = uidNumber - self._set_attribute('mail', mail) + self._set_attribute("mail", mail) self._ready = True def __repr__(self): @@ -55,8 +62,9 @@ class Account(UserMixin): def reset_password(self, service: str): self.new_password_services[service] = (None, None) - def change_password(self, new_password: str, old_password='', - service=None): + def change_password( + self, new_password: str, old_password="", service=None + ): """ Changes a password for a given service. You have to use the UserBackend to make the changes permanent. If no service is given, @@ -76,14 +84,16 @@ class Account(UserMixin): Changes the mail address of an account. You have to use the AccountService class to make changes permanent. """ - self._set_attribute('mail', new_mail) + self._set_attribute("mail", new_mail) def __getattr__(self, name): if name in self.attributes: return self.attributes[name] - raise AttributeError("'%s' object has no attribute '%s'" % - (self.__class__.__name__, name)) + raise AttributeError( + "'%s' object has no attribute '%s'" + % (self.__class__.__name__, name) + ) def __setattr__(self, name: str, value: Any): if self._ready and name not in self.__dict__: diff --git a/accounts/utils/__init__.py b/accounts/utils/__init__.py index da92528..1f79953 100644 --- a/accounts/utils/__init__.py +++ b/accounts/utils/__init__.py @@ -15,8 +15,9 @@ def templated(template: Optional[str] = None): template_name = template if template_name is None: if request.endpoint: - template_name = request.endpoint \ - .replace('.', '/') + '.html' + template_name = ( + request.endpoint.replace(".", "/") + ".html" + ) else: template_name = "error.html" ctx = f(*args, **kwargs) @@ -25,7 +26,9 @@ def templated(template: Optional[str] = None): elif not isinstance(ctx, dict): return ctx return render_template(template_name, **ctx) + return templated__ + return templated_ @@ -35,15 +38,15 @@ class NotRegexp(Regexp): """ def __call__(self, form, field): - if self.regex.match(field.data or ''): + if self.regex.match(field.data or ""): if self.message is None: - self.message: str = field.gettext('Invalid input.') + self.message: str = field.gettext("Invalid input.") raise ValidationError(self.message) def get_backend(path: str, app: Flask): module = path.rsplit(".", 1).pop() - class_name = '%sBackend' % module.title() + class_name = "%sBackend" % module.title() backend_class = getattr(importlib.import_module(path), class_name) return backend_class(app) diff --git a/accounts/utils/confirmation.py b/accounts/utils/confirmation.py index 60967de..62f14ad 100644 --- a/accounts/utils/confirmation.py +++ b/accounts/utils/confirmation.py @@ -10,13 +10,16 @@ class Confirmation(URLSafeTimedSerializer): def __init__(self, realm: str, key=None, **kwargs): if key is None: - key = accounts_app.config['SECRET_KEY'] + key = accounts_app.config["SECRET_KEY"] super(Confirmation, self).__init__(key, salt=realm, **kwargs) - def loads_http(self, s: Union[str, bytes], - max_age: Optional[int] = None, - return_timestamp: bool = False, - salt: Optional[bytes] = None) -> Any: + def loads_http( + self, + s: Union[str, bytes], + max_age: Optional[int] = None, + return_timestamp: bool = False, + salt: Optional[bytes] = None, + ) -> Any: """ Like `Confirmation.loads`, but raise HTTP exceptions with appropriate messages instead of `BadSignature` or `SignatureExpired`. @@ -25,6 +28,6 @@ class Confirmation(URLSafeTimedSerializer): try: return self.loads(s, max_age, return_timestamp, salt) except BadSignature: - raise Forbidden('Ungültiger Bestätigungslink.') + raise Forbidden("Ungültiger Bestätigungslink.") except SignatureExpired: - raise Forbidden('Bestätigungslink ist zu alt.') + raise Forbidden("Bestätigungslink ist zu alt.") diff --git a/accounts/utils/console.py b/accounts/utils/console.py index 823ec33..1b539bd 100644 --- a/accounts/utils/console.py +++ b/accounts/utils/console.py @@ -1,13 +1,14 @@ # -*- coding: utf-8 -*- -class TablePrinter(): + +class TablePrinter: separator: str - def __init__(self, headers=None, separator='|'): + def __init__(self, headers=None, separator="|"): self.headers = headers self.separator = separator - self.format_string = '' + self.format_string = "" self.widths = list() if headers is not None: @@ -26,17 +27,19 @@ class TablePrinter(): while len(self.widths) < columns: self.widths.append(0) - self.widths = [_column_width(column, width) - for column, width - in zip(list(zip(*rows)), self.widths)] + self.widths = [ + _column_width(column, width) + for column, width in zip(list(zip(*rows)), self.widths) + ] self._update_format_string() def _update_format_string(self) -> None: - sep = ' %s ' % self.separator - self.format_string = '%s %s %s' % ( + sep = " %s " % self.separator + self.format_string = "%s %s %s" % ( + self.separator, + sep.join(["%%-%ds" % width for width in self.widths]), self.separator, - sep.join(['%%-%ds' % width for width in self.widths]), - self.separator) + ) def output(self, rows): if len(rows) > 0: @@ -50,10 +53,18 @@ class TablePrinter(): self._print_row(row) def _print_headline(self) -> None: - print(('%s%s%s' % ( - self.separator, - self.separator.join(['-' * (width + 2) for width in self.widths]), - self.separator))) + print( + ( + "%s%s%s" + % ( + self.separator, + self.separator.join( + ["-" * (width + 2) for width in self.widths] + ), + self.separator, + ) + ) + ) def _print_row(self, row) -> None: print((self.format_string % tuple(row))) @@ -63,7 +74,7 @@ class ConsoleForm(object): _ready = False def __init__(self, formcls, **kwargs): - self.form = formcls(meta={'csrf': False}) + self.form = formcls(meta={"csrf": False}) self._fill(kwargs) self._ready = True @@ -76,11 +87,11 @@ class ConsoleForm(object): def print_errors(self): for field, errors in list(self.form.errors.items()): if len(errors) > 1: - print(('%s:' % field)) + print(("%s:" % field)) for error in errors: - print((' %s' % error)) + print((" %s" % error)) else: - print(('%s: %s' % (field, errors[0]))) + print(("%s: %s" % (field, errors[0]))) def __getattr__(self, name): return getattr(self.form, name) diff --git a/accounts/utils/login.py b/accounts/utils/login.py index 07953e3..938268f 100644 --- a/accounts/utils/login.py +++ b/accounts/utils/login.py @@ -24,16 +24,18 @@ class _compact_json: def create_login_manager() -> flask_login.login_manager.LoginManager: login_manager = LoginManager() - login_manager.login_message = 'Bitte einloggen' - login_manager.login_view = 'login.login' + login_manager.login_message = "Bitte einloggen" + login_manager.login_view = "login.login" @login_manager.user_loader def load_user(user_id: str) -> LoginManager: try: username, password = parse_userid(user_id) return accounts_app.user_backend.auth(username, password) - except (accounts_app.user_backend.NoSuchUserError, - accounts_app.user_backend.InvalidPasswordError): + except ( + accounts_app.user_backend.NoSuchUserError, + accounts_app.user_backend.InvalidPasswordError, + ): return None return login_manager @@ -52,7 +54,9 @@ def logout_required(f): @wraps(f) def logout_required_(*args, **kwargs): if current_user.is_authenticated: - raise Forbidden('Diese Seite ist nur für nicht eingeloggte Benutzer gedacht!') + raise Forbidden( + "Diese Seite ist nur für nicht eingeloggte Benutzer gedacht!" + ) return f(*args, **kwargs) - return logout_required_ + return logout_required_ diff --git a/accounts/utils/sessions.py b/accounts/utils/sessions.py index a452fe1..47580bd 100644 --- a/accounts/utils/sessions.py +++ b/accounts/utils/sessions.py @@ -18,7 +18,7 @@ def _pad(value: str, block_size: int) -> bytes: def _unpad(value: str) -> str: - pad_length = ord(value[len(value)-1:]) + pad_length = ord(value[len(value) - 1 :]) return value[:-pad_length] @@ -29,7 +29,7 @@ class EncryptedSerializer(TaggedJSONSerializer): self.block_size = AES.block_size def _cipher(self, iv: bytes): - key = accounts_app.config['SESSION_ENCRYPTION_KEY'] + key = accounts_app.config["SESSION_ENCRYPTION_KEY"] assert len(key) == 32 return AES.new(key, AES.MODE_CBC, iv) @@ -50,10 +50,9 @@ class EncryptedSerializer(TaggedJSONSerializer): `config.SESSION_ENCRYPTION_KEY`. """ decoded = b64decode(value.encode("UTF-8")) - iv = decoded[:self.block_size] - raw = self._cipher(iv).decrypt(decoded[AES.block_size:]) - return super(EncryptedSerializer, self) \ - .loads(_unpad(raw)) + iv = decoded[: self.block_size] + raw = self._cipher(iv).decrypt(decoded[AES.block_size :]) + return super(EncryptedSerializer, self).loads(_unpad(raw)) class EncryptedSessionInterface(SecureCookieSessionInterface): @@ -63,13 +62,15 @@ class EncryptedSessionInterface(SecureCookieSessionInterface): session = None try: parent = super(EncryptedSessionInterface, self) - session = cast(EncryptedSessionInterface, parent) \ - .open_session(app, request) + session = cast(EncryptedSessionInterface, parent).open_session( + app, request + ) except BadPayload: session = self.session_class() if session is not None: - session.permanent = \ - app.config.get('PERMANENT_SESSION_LIFETIME') is not None + session.permanent = ( + app.config.get("PERMANENT_SESSION_LIFETIME") is not None + ) return session diff --git a/accounts/views/admin/__init__.py b/accounts/views/admin/__init__.py index 938033b..92dbf22 100644 --- a/accounts/views/admin/__init__.py +++ b/accounts/views/admin/__init__.py @@ -12,40 +12,41 @@ from accounts.forms import AdminCreateAccountForm, AdminDisableAccountForm from accounts.app import accounts_app -bp = Blueprint('admin', __name__) +bp = Blueprint("admin", __name__) @bp.before_request def restrict_bp_to_admins(): if not current_user.is_authenticated: return accounts_app.login_manager.unauthorized() - if current_user.uid not in accounts_app.config.get('ADMIN_USERS', []): - raise Forbidden('Du bist kein Admin.') + if current_user.uid not in accounts_app.config.get("ADMIN_USERS", []): + raise Forbidden("Du bist kein Admin.") -@bp.route('/') -@templated('admin/index.html') +@bp.route("/") +@templated("admin/index.html") def index(): return {} -@bp.route('/create_account', methods=['GET', 'POST']) -@templated('admin/create_account.html') +@bp.route("/create_account", methods=["GET", "POST"]) +@templated("admin/create_account.html") def create_account(): form = AdminCreateAccountForm() if form.validate_on_submit(): - accounts_app.mail_backend.send(form.mail.data, 'mail/register.txt', - username=form.username.data) + accounts_app.mail_backend.send( + form.mail.data, "mail/register.txt", username=form.username.data + ) - flash('Mail versandt.', 'success') - return redirect(url_for('admin.index')) - return {'form': form} + flash("Mail versandt.", "success") + return redirect(url_for("admin.index")) + return {"form": form} -@bp.route('/view_blacklist') -@bp.route('/view_blacklist/') -@templated('admin/view_blacklist.html') -def view_blacklist(start=''): +@bp.route("/view_blacklist") +@bp.route("/view_blacklist/") +@templated("admin/view_blacklist.html") +def view_blacklist(start=""): entries = accounts_app.username_blacklist if start: entries = [e for e in entries if e.startswith(start)] @@ -53,18 +54,18 @@ def view_blacklist(start=''): next_letters = set(e[len(start)] for e in entries if len(e) > len(start)) return { - 'entries': entries, - 'start': start, - 'next_letters': next_letters, + "entries": entries, + "start": start, + "next_letters": next_letters, } -@bp.route('/disable_account', methods=['GET', 'POST']) -@templated('admin/disable_account.html') +@bp.route("/disable_account", methods=["GET", "POST"]) +@templated("admin/disable_account.html") def disable_account(): form = AdminDisableAccountForm() - if 'uid' in request.args: - form = AdminDisableAccountForm(username=request.args['uid']) + if "uid" in request.args: + form = AdminDisableAccountForm(username=request.args["uid"]) if form.validate_on_submit() and form.user: random_pw = str(uuid4()) @@ -73,19 +74,28 @@ def disable_account(): form.user.reset_password(service.id) oldmail = form.user.mail - mail = accounts_app.config['DISABLED_ACCOUNT_MAILADDRESS_TEMPLATE'] % form.user.uid + mail = ( + accounts_app.config["DISABLED_ACCOUNT_MAILADDRESS_TEMPLATE"] + % form.user.uid + ) form.user.change_email(mail) accounts_app.user_backend.update(form.user, as_admin=True) - flash('Passwort auf ein zufälliges und Mailadresse auf %s ' - 'gesetzt.' % mail, 'success') + flash( + "Passwort auf ein zufälliges und Mailadresse auf %s " + "gesetzt." % mail, + "success", + ) accounts_app.mail_backend.send( - accounts_app.config['MAIL_REGISTER_NOTIFY'], - 'mail/disable_notify.txt', - username=form.user.uid, mail=oldmail, admin=current_user.uid) + accounts_app.config["MAIL_REGISTER_NOTIFY"], + "mail/disable_notify.txt", + username=form.user.uid, + mail=oldmail, + admin=current_user.uid, + ) - return redirect(url_for('admin.index')) + return redirect(url_for("admin.index")) - return {'form': form} + return {"form": form} diff --git a/accounts/views/default/__init__.py b/accounts/views/default/__init__.py index bba20fd..1639182 100644 --- a/accounts/views/default/__init__.py +++ b/accounts/views/default/__init__.py @@ -3,14 +3,17 @@ from copy import deepcopy from flask import Blueprint -from flask import redirect, render_template, request, \ - flash, url_for +from flask import redirect, render_template, request, flash, url_for from flask_login import login_required, login_user, current_user from werkzeug.exceptions import Forbidden from werkzeug import Response -from accounts.forms import RegisterForm, RegisterCompleteForm, \ - LostPasswordForm, SettingsForm +from accounts.forms import ( + RegisterForm, + RegisterCompleteForm, + LostPasswordForm, + SettingsForm, +) from accounts.utils import templated from accounts.utils.confirmation import Confirmation from accounts.utils.login import logout_required @@ -20,33 +23,39 @@ from accounts.app import accounts_app from typing import Union -bp = Blueprint('default', __name__) +bp = Blueprint("default", __name__) -@bp.route('/register', methods=['GET', 'POST']) -@templated('register.html') +@bp.route("/register", methods=["GET", "POST"]) +@templated("register.html") @logout_required def register() -> Union[dict, Response]: form = RegisterForm() if form.validate_on_submit(): - accounts_app.mail_backend.send(form.mail.data, 'mail/register.txt', - username=form.username.data) + accounts_app.mail_backend.send( + form.mail.data, "mail/register.txt", username=form.username.data + ) - flash('Es wurde eine E-Mail an die angegebene Adresse geschickt, ' - 'um diese zu überprüfen. Bitte folge den Anweisungen in der ' - 'E-Mail.', 'success') + flash( + "Es wurde eine E-Mail an die angegebene Adresse geschickt, " + "um diese zu überprüfen. Bitte folge den Anweisungen in der " + "E-Mail.", + "success", + ) - return redirect(url_for('.index')) + return redirect(url_for(".index")) - return {'form': form} + return {"form": form} -@bp.route('/register/', methods=['GET', 'POST']) -@templated('register_complete.html') +@bp.route("/register/", methods=["GET", "POST"]) +@templated("register_complete.html") @logout_required def register_complete(token: str): - #TODO: check for double uids and mail - username, mail = Confirmation('register').loads_http(token, max_age=3*24*60*60) + # TODO: check for double uids and mail + username, mail = Confirmation("register").loads_http( + token, max_age=3 * 24 * 60 * 60 + ) try: accounts_app.user_backend.get_by_uid(username) @@ -54,8 +63,10 @@ def register_complete(token: str): except accounts_app.user_backend.NoSuchUserError: pass else: - flash('Du hast den Benutzer bereits angelegt! Du kannst dich jetzt einfach einloggen:') - return redirect(url_for('.index')) + flash( + "Du hast den Benutzer bereits angelegt! Du kannst dich jetzt einfach einloggen:" + ) + return redirect(url_for(".index")) form = RegisterCompleteForm() if form.validate_on_submit(): @@ -64,45 +75,53 @@ def register_complete(token: str): login_user(user) accounts_app.mail_backend.send( - accounts_app.config['MAIL_REGISTER_NOTIFY'], - 'mail/register_notify.txt', - username=username, mail=mail) + accounts_app.config["MAIL_REGISTER_NOTIFY"], + "mail/register_notify.txt", + username=username, + mail=mail, + ) - flash('Benutzer erfolgreich angelegt.', 'success') - return redirect(url_for('.index')) + flash("Benutzer erfolgreich angelegt.", "success") + return redirect(url_for(".index")) return { - 'form': form, - 'token': token, - 'username': username, - 'mail': mail, + "form": form, + "token": token, + "username": username, + "mail": mail, } -@bp.route('/lost_password', methods=['GET', 'POST']) -@templated('lost_password.html') +@bp.route("/lost_password", methods=["GET", "POST"]) +@templated("lost_password.html") @logout_required def lost_password(): form = LostPasswordForm() if form.validate_on_submit() and form.user: - #TODO: make the link only usable once (e.g include a hash of the old pw) + # TODO: make the link only usable once (e.g include a hash of the old pw) # atm the only thing we do is make the link valid for only little time accounts_app.mail_backend.send( - form.user.mail, 'mail/lost_password.txt', username=form.user.uid) + form.user.mail, "mail/lost_password.txt", username=form.user.uid + ) - flash('Wir haben dir eine E-Mail mit einem Link zum Passwort ändern ' - 'geschickt. Bitte folge den Anweisungen in der E-Mail.', 'success') + flash( + "Wir haben dir eine E-Mail mit einem Link zum Passwort ändern " + "geschickt. Bitte folge den Anweisungen in der E-Mail.", + "success", + ) - return redirect(url_for('.index')) + return redirect(url_for(".index")) - return {'form': form} + return {"form": form} -@bp.route('/lost_password/', methods=['GET', 'POST']) -@templated('lost_password_complete.html') +@bp.route("/lost_password/", methods=["GET", "POST"]) +@templated("lost_password_complete.html") @logout_required def lost_password_complete(token: str): - (username,) = Confirmation('lost_password').loads_http(token, max_age=4*60*60) + (username,) = Confirmation("lost_password").loads_http( + token, max_age=4 * 60 * 60 + ) form = RegisterCompleteForm() if form.validate_on_submit(): @@ -111,45 +130,52 @@ def lost_password_complete(token: str): accounts_app.user_backend.update(user, as_admin=True) login_user(user) - flash('Passwort geändert.', 'success') - return redirect(url_for('.index')) + flash("Passwort geändert.", "success") + return redirect(url_for(".index")) return { - 'form': form, - 'token': token, - 'username': username, + "form": form, + "token": token, + "username": username, } -@bp.route('/', methods=['GET', 'POST']) -@templated('index.html') +@bp.route("/", methods=["GET", "POST"]) +@templated("index.html") @login_required def index() -> Union[Response, dict]: form = SettingsForm(mail=current_user.mail) if form.validate_on_submit(): changed = False - if request.form.get('submit_services'): + if request.form.get("submit_services"): for service in accounts_app.all_services: field = form.get_servicedelete(service.id) if field.data: current_user.reset_password(service.id) changed = True - elif request.form.get('submit_main'): + elif request.form.get("submit_main"): if form.mail.data and form.mail.data != current_user.mail: accounts_app.mail_backend.send( - form.mail.data, 'mail/change_mail.txt', - username=current_user.uid) - - flash('Es wurde eine E-Mail an die angegebene Adresse geschickt, ' - 'um diese zu überprüfen. Bitte folge den Anweisungen in der ' - 'E-Mail.', 'success') + form.mail.data, + "mail/change_mail.txt", + username=current_user.uid, + ) + + flash( + "Es wurde eine E-Mail an die angegebene Adresse geschickt, " + "um diese zu überprüfen. Bitte folge den Anweisungen in der " + "E-Mail.", + "success", + ) changed = True if form.password.data: - current_user.change_password(form.password.data, form.old_password.data) - flash('Passwort geändert', 'success') + current_user.change_password( + form.password.data, form.old_password.data + ) + flash("Passwort geändert", "success") changed = True for service in accounts_app.all_services: @@ -161,46 +187,51 @@ def index() -> Union[Response, dict]: if changed: accounts_app.user_backend.update(current_user) login_user(current_user) - return redirect(url_for('.index')) + return redirect(url_for(".index")) else: - flash('Nichts geändert.') - + flash("Nichts geändert.") services = deepcopy(accounts_app.all_services) for s in services: s.changed = s.id in current_user.services return { - 'form': form, - 'services': services, + "form": form, + "services": services, } -@bp.route('/change_mail/') +@bp.route("/change_mail/") @login_required def change_mail(token: str): - username, mail = Confirmation('change_mail').loads_http(token, max_age=3*24*60*60) + username, mail = Confirmation("change_mail").loads_http( + token, max_age=3 * 24 * 60 * 60 + ) if current_user.uid != username: - raise Forbidden('Bitte logge dich als der Benutzer ein, dessen E-Mail-Adresse du ändern willst.') + raise Forbidden( + "Bitte logge dich als der Benutzer ein, dessen E-Mail-Adresse du ändern willst." + ) results = accounts_app.user_backend.find_by_mail(mail) for user in results: if user.uid != current_user.uid: - raise Forbidden('Diese E-Mail-Adresse wird schon von einem anderen account benutzt!') + raise Forbidden( + "Diese E-Mail-Adresse wird schon von einem anderen account benutzt!" + ) current_user.change_email(mail) accounts_app.user_backend.update(current_user) - flash('E-Mail-Adresse geändert.', 'success') - return redirect(url_for('.index')) + flash("E-Mail-Adresse geändert.", "success") + return redirect(url_for(".index")) -@bp.route('/about') -@templated('about.html') +@bp.route("/about") +@templated("about.html") def about(): return { - 'app': accounts_app, + "app": accounts_app, } @@ -213,4 +244,4 @@ def errorhandler(e): except AttributeError: code = 500 - return render_template('error.html', error=e), code + return render_template("error.html", error=e), code diff --git a/accounts/views/login/__init__.py b/accounts/views/login/__init__.py index ee049bf..1285605 100644 --- a/accounts/views/login/__init__.py +++ b/accounts/views/login/__init__.py @@ -13,7 +13,7 @@ from typing import Union from .forms import LoginForm -bp = Blueprint('login', __name__) +bp = Blueprint("login", __name__) def is_safe_url(target: str): @@ -21,36 +21,44 @@ def is_safe_url(target: str): test_url = urlparse(urljoin(request.host_url, target)) print(target) print(test_url) - return test_url.scheme in ('http', 'https') and \ - ref_url.netloc == test_url.netloc and \ - test_url.path == target + return ( + test_url.scheme in ("http", "https") + and ref_url.netloc == test_url.netloc + and test_url.path == target + ) -@bp.route('/login', methods=['GET', 'POST']) +@bp.route("/login", methods=["GET", "POST"]) def login() -> Union[str, Response]: if current_user.is_authenticated: - return redirect(url_for('default.index')) + return redirect(url_for("default.index")) form = LoginForm(request.form) if form.validate_on_submit(): try: - user = accounts_app.user_backend.auth(form.username.data, - form.password.data) + user = accounts_app.user_backend.auth( + form.username.data, form.password.data + ) login_user(user) - flash('Erfolgreich eingeloggt', 'success') + flash("Erfolgreich eingeloggt", "success") - next = request.form['next'] - return redirect(next if is_safe_url(next) else url_for('default.index')) - except (accounts_app.user_backend.NoSuchUserError, - accounts_app.user_backend.InvalidPasswordError): - flash('Ungültiger Benutzername und/oder Passwort', 'error') + next = request.form["next"] + return redirect( + next if is_safe_url(next) else url_for("default.index") + ) + except ( + accounts_app.user_backend.NoSuchUserError, + accounts_app.user_backend.InvalidPasswordError, + ): + flash("Ungültiger Benutzername und/oder Passwort", "error") - return render_template("login/login.html", form=form, - next=request.values.get('next')) + return render_template( + "login/login.html", form=form, next=request.values.get("next") + ) -@bp.route('/logout') +@bp.route("/logout") def logout() -> Response: logout_user() - flash('Erfolgreich ausgeloggt.', 'success') - return redirect(url_for('.login')) + flash("Erfolgreich ausgeloggt.", "success") + return redirect(url_for(".login")) diff --git a/accounts/views/login/forms.py b/accounts/views/login/forms.py index e4155b4..b9774a3 100644 --- a/accounts/views/login/forms.py +++ b/accounts/views/login/forms.py @@ -4,5 +4,5 @@ from wtforms import StringField, PasswordField, validators class LoginForm(Form): - username = StringField('Benutzername') - password = PasswordField('Passwort', [validators.DataRequired()]) + username = StringField("Benutzername") + password = PasswordField("Passwort", [validators.DataRequired()]) -- cgit v1.2.3-1-g7c22