summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJonah Brüchert <jbb@kaidan.im>2024-03-29 02:20:41 +0100
committerJonah Brüchert <jbb@kaidan.im>2024-03-29 02:23:00 +0100
commit39230732099298b7f56c60b396949d8c0484e4dc (patch)
treed2fc00ccd36783183927fcb90b748641ed588e1d
parentdcad1d5ca832ea05ababa3d38de9a82fc361f2ba (diff)
downloadweb-39230732099298b7f56c60b396949d8c0484e4dc.tar.gz
web-39230732099298b7f56c60b396949d8c0484e4dc.tar.bz2
web-39230732099298b7f56c60b396949d8c0484e4dc.zip
Format project
To allow automatically formatting changes in the future. Command used: black -l 79 accounts/
-rw-r--r--accounts/__init__.py21
-rw-r--r--accounts/backend/mail/__init__.py7
-rw-r--r--accounts/backend/mail/dummy.py26
-rw-r--r--accounts/backend/mail/sendmail.py22
-rw-r--r--accounts/backend/user/__init__.py12
-rw-r--r--accounts/backend/user/dummy.py30
-rw-r--r--accounts/backend/user/ldap.py174
-rw-r--r--accounts/default_settings.py40
-rw-r--r--accounts/forms.py179
-rw-r--r--accounts/models.py28
-rw-r--r--accounts/utils/__init__.py13
-rw-r--r--accounts/utils/confirmation.py17
-rw-r--r--accounts/utils/console.py47
-rw-r--r--accounts/utils/login.py16
-rw-r--r--accounts/utils/sessions.py21
-rw-r--r--accounts/views/admin/__init__.py72
-rw-r--r--accounts/views/default/__init__.py175
-rw-r--r--accounts/views/login/__init__.py46
-rw-r--r--accounts/views/login/forms.py4
19 files changed, 578 insertions, 372 deletions
diff --git a/accounts/__init__.py b/accounts/__init__.py
index 883d45c..c290c3a 100644
--- a/accounts/__init__.py
+++ b/accounts/__init__.py
@@ -19,7 +19,7 @@ def absolute_paths(app: Flask, config: str) -> None:
app.config[name] = os.path.join(dirname, app.config[name])
dirname = os.path.dirname(config)
- handle_option(dirname, 'USERNAME_BLACKLIST_FILE')
+ handle_option(dirname, "USERNAME_BLACKLIST_FILE")
def load_config(app: Flask, configfile: Optional[str]) -> None:
@@ -31,32 +31,33 @@ def load_config(app: Flask, configfile: Optional[str]) -> None:
def create_app(config=None) -> Flask:
app = AccountsFlask(__name__)
- app.config.from_object('accounts.default_settings')
- load_config(app, os.environ.get('SPLINE_ACCOUNT_WEB_SETTINGS'))
+ app.config.from_object("accounts.default_settings")
+ load_config(app, os.environ.get("SPLINE_ACCOUNT_WEB_SETTINGS"))
load_config(app, config)
app.register_blueprint(default.bp)
app.register_blueprint(login.bp)
- app.register_blueprint(admin.bp, url_prefix='/admin')
+ app.register_blueprint(admin.bp, url_prefix="/admin")
app.session_interface = EncryptedSessionInterface()
app.all_services = list()
- for (name, url) in app.config.get('SERVICES', list()):
+ for name, url in app.config.get("SERVICES", list()):
cn = name.lower()
app.all_services.append(Service(cn, name, url))
app.username_blacklist = list()
- if app.config.get('USERNAME_BLACKLIST_FILE'):
- with open(app.config['USERNAME_BLACKLIST_FILE']) as f:
+ if app.config.get("USERNAME_BLACKLIST_FILE"):
+ with open(app.config["USERNAME_BLACKLIST_FILE"]) as f:
app.username_blacklist = [line.rstrip() for line in f]
login_manager = create_login_manager()
login_manager.init_app(app)
app.jinja_env.globals.update(
- confirm=lambda realm, *args: Confirmation(realm).dumps(tuple(args)))
+ confirm=lambda realm, *args: Confirmation(realm).dumps(tuple(args))
+ )
- app.user_backend = get_backend(app.config['USER_BACKEND'], app)
- app.mail_backend = get_backend(app.config['MAIL_BACKEND'], app)
+ app.user_backend = get_backend(app.config["USER_BACKEND"], app)
+ app.mail_backend = get_backend(app.config["MAIL_BACKEND"], app)
return app
diff --git a/accounts/backend/mail/__init__.py b/accounts/backend/mail/__init__.py
index f2ec5d5..bb1709f 100644
--- a/accounts/backend/mail/__init__.py
+++ b/accounts/backend/mail/__init__.py
@@ -5,7 +5,7 @@ from jinja2 import Template
from jinja2.environment import TemplateModule
-class Backend():
+class Backend:
app: Flask
def __init__(self, app: Flask) -> None:
@@ -18,10 +18,9 @@ class Backend():
if recipient is None:
return
- tmpl: Template = \
- self.app.jinja_env.get_or_select_template(template)
+ tmpl: Template = self.app.jinja_env.get_or_select_template(template)
- kwargs['recipient'] = recipient
+ kwargs["recipient"] = recipient
module = tmpl.make_module(kwargs)
self._send(recipient, module)
diff --git a/accounts/backend/mail/dummy.py b/accounts/backend/mail/dummy.py
index 049df7a..0c6da21 100644
--- a/accounts/backend/mail/dummy.py
+++ b/accounts/backend/mail/dummy.py
@@ -6,28 +6,29 @@ from accounts import AccountsFlask
from jinja2.environment import TemplateModule
-FANCY_FORMAT = '''\
+FANCY_FORMAT = """\
,---------------------------------------------------------------------------
| Subject: {subject}
| To: {to}
| From: {sender}
|---------------------------------------------------------------------------
| {body}
-`---------------------------------------------------------------------------'''
+`---------------------------------------------------------------------------"""
-PLAIN_FORMAT = '''Subject: {subject}
+PLAIN_FORMAT = """Subject: {subject}
To: {to}
From: {sender}
-{body}'''
+{body}"""
class DummyBackend(Backend):
format: str
- def __init__(self, app: AccountsFlask, plain: bool = False,
- format: None = None) -> None:
+ def __init__(
+ self, app: AccountsFlask, plain: bool = False, format: None = None
+ ) -> None:
super(DummyBackend, self).__init__(app)
self.plain = plain
@@ -45,8 +46,11 @@ class DummyBackend(Backend):
if not self.plain:
body = "\n| ".join(body.split("\n"))
- print(self.format.format(
- subject=content.subject(), # type: ignore
- sender=content.sender(), # type: ignore
- to=recipient,
- body=body))
+ print(
+ self.format.format(
+ subject=content.subject(), # type: ignore
+ sender=content.sender(), # type: ignore
+ to=recipient,
+ body=body,
+ )
+ )
diff --git a/accounts/backend/mail/sendmail.py b/accounts/backend/mail/sendmail.py
index 1fecedc..abaab87 100644
--- a/accounts/backend/mail/sendmail.py
+++ b/accounts/backend/mail/sendmail.py
@@ -10,26 +10,28 @@ from . import Backend
def safe(s: str):
- return s.split('\n', 1)[0]
+ return s.split("\n", 1)[0]
class SendmailBackend(Backend):
def _send(self, recipient: str, content: TemplateModule):
- msg = MIMEText(content.body(), _charset='utf-8') # type: ignore
- msg['Subject'] = safe(content.subject()) # type: ignore
- msg['To'] = safe(recipient)
- msg['From'] = safe(content.sender()) # type: ignore
+ msg = MIMEText(content.body(), _charset="utf-8") # type: ignore
+ msg["Subject"] = safe(content.subject()) # type: ignore
+ msg["To"] = safe(recipient)
+ msg["From"] = safe(content.sender()) # type: ignore
envelope = []
_, address = parseaddr(safe(content.sender())) # type: ignore
- if address != '':
- envelope = ['-f', address]
+ if address != "":
+ envelope = ["-f", address]
- p = subprocess.Popen([self.app.config['SENDMAIL_COMMAND']] +
- envelope + ['-t'], stdin=subprocess.PIPE)
+ p = subprocess.Popen(
+ [self.app.config["SENDMAIL_COMMAND"]] + envelope + ["-t"],
+ stdin=subprocess.PIPE,
+ )
assert p.stdin
p.stdin.write(msg.as_string().encode("utf-8"))
p.stdin.close()
if p.wait() != 0:
- raise RuntimeError('sendmail terminated with %d' % p.returncode)
+ raise RuntimeError("sendmail terminated with %d" % p.returncode)
diff --git a/accounts/backend/user/__init__.py b/accounts/backend/user/__init__.py
index 70f973a..fdbe28b 100644
--- a/accounts/backend/user/__init__.py
+++ b/accounts/backend/user/__init__.py
@@ -79,9 +79,9 @@ class Backend(object):
"""
users = self.find_by_uid(uid)
if len(users) == 0:
- raise NoSuchUserError('No such user')
+ raise NoSuchUserError("No such user")
if len(users) > 1:
- raise ShouldNotHappen('Several users for one uid returned.')
+ raise ShouldNotHappen("Several users for one uid returned.")
return users[0]
@@ -92,17 +92,17 @@ class Backend(object):
"""
users = self.find_by_mail(mail)
if len(users) == 0:
- raise NoSuchUserError('No such user')
+ raise NoSuchUserError("No such user")
if len(users) > 1:
- raise ShouldNotHappen('Several users for one mail returned.')
+ raise ShouldNotHappen("Several users for one mail returned.")
return users[0]
def find_by_uid(self, uid: str, wildcard=False) -> list[Account]:
- return self.find({'uid': uid}, wildcard)
+ return self.find({"uid": uid}, wildcard)
def find_by_mail(self, mail: str, wildcard=False) -> list[Account]:
- return self.find({'mail': mail}, wildcard)
+ return self.find({"mail": mail}, wildcard)
def find(self, filters=None, wildcard=False) -> list[Account]:
"""
diff --git a/accounts/backend/user/dummy.py b/accounts/backend/user/dummy.py
index fd6620a..cef1b7b 100644
--- a/accounts/backend/user/dummy.py
+++ b/accounts/backend/user/dummy.py
@@ -7,8 +7,9 @@ from accounts import AccountsFlask
from typing import Optional
-def _match_filter(account: Account, filters: Optional[dict[str, str]],
- wildcard: bool):
+def _match_filter(
+ account: Account, filters: Optional[dict[str, str]], wildcard: bool
+):
if filters is None:
return True
@@ -41,17 +42,17 @@ class DummyBackend(Backend):
"test": {
"uidNumber": 1,
"mail": "test@accounts.spline.de",
- "password": "test"
+ "password": "test",
},
"test2": {
"uidNumber": 2,
"mail": "test2@accounts.spline.de",
- "password": "test2"
+ "password": "test2",
},
"admin": {
"uidNumber": 3,
"mail": "admin@accounts.spline.de",
- "password": "admin"
+ "password": "admin",
},
}
@@ -61,11 +62,7 @@ class DummyBackend(Backend):
accounts = []
for uid, attrs in self._storage.items():
accounts.append(
- Account(
- uid,
- str(attrs["mail"]),
- uidNumber=attrs["uidNumber"]
- )
+ Account(uid, str(attrs["mail"]), uidNumber=attrs["uidNumber"])
)
return accounts
@@ -80,18 +77,23 @@ class DummyBackend(Backend):
acc.password = password
return acc
- def find(self, filters: Optional[dict[str, str]] = None,
- wildcard=False) -> list[Account]:
+ def find(
+ self, filters: Optional[dict[str, str]] = None, wildcard=False
+ ) -> list[Account]:
"""
Find accounts by a given filter.
"""
- return [acc for acc in self._get_accounts() if _match_filter(acc, filters, wildcard)]
+ return [
+ acc
+ for acc in self._get_accounts()
+ if _match_filter(acc, filters, wildcard)
+ ]
def _store(self, account: Account) -> None:
self._storage[account.uid] = {
"uidNumber": account.uidNumber,
"mail": account.mail,
- "password": account.password
+ "password": account.password,
}
def _verify_password(self, account: Account, password: Optional[str]):
diff --git a/accounts/backend/user/ldap.py b/accounts/backend/user/ldap.py
index 99080a4..a1ed904 100644
--- a/accounts/backend/user/ldap.py
+++ b/accounts/backend/user/ldap.py
@@ -4,7 +4,11 @@
import ldap3
from ldap3.utils.conv import escape_filter_chars
from ldap3.utils.dn import escape_rdn
-from ldap3.core.exceptions import LDAPInvalidCredentialsResult, LDAPException, LDAPOperationResult
+from ldap3.core.exceptions import (
+ LDAPInvalidCredentialsResult,
+ LDAPException,
+ LDAPOperationResult,
+)
from ldap3.core.results import RESULT_SUCCESS
from ldap3.abstract.entry import Entry
from ldap3 import Connection
@@ -24,28 +28,32 @@ def _escape(value: str, wildcard=False):
return escape_rdn(value)
-def _change_password(conn: Connection, dn,
- passwords: tuple[Optional[str], Optional[str]],
- as_admin=False):
+def _change_password(
+ conn: Connection,
+ dn,
+ passwords: tuple[Optional[str], Optional[str]],
+ as_admin=False,
+):
old_password, new_password = passwords
if as_admin:
conn.extend.standard.modify_password(dn, None, new_password)
else:
try:
- conn.extend.standard.modify_password(dn, old_password,
- new_password)
+ conn.extend.standard.modify_password(
+ dn, old_password, new_password
+ )
except LDAPException:
- raise InvalidPasswordError('Invalid password')
+ raise InvalidPasswordError("Invalid password")
class LdapBackend(Backend):
def __init__(self, app: AccountsFlask) -> None:
super(LdapBackend, self).__init__(app)
- self.host: str = self.app.config['LDAP_HOST']
- self.base_dn: list[tuple[str, str]] = self.app.config['LDAP_BASE_DN']
- self.admin_user: str = self.app.config['LDAP_ADMIN_USER']
- self.admin_pass: str = self.app.config['LDAP_ADMIN_PASS']
+ self.host: str = self.app.config["LDAP_HOST"]
+ self.base_dn: list[tuple[str, str]] = self.app.config["LDAP_BASE_DN"]
+ self.admin_user: str = self.app.config["LDAP_ADMIN_USER"]
+ self.admin_pass: str = self.app.config["LDAP_ADMIN_PASS"]
self.services: list[Service] = self.app.all_services
self.admin = False
@@ -56,29 +64,31 @@ class LdapBackend(Backend):
Tries to authenticate a user with a given password. If the
authentication is successful an Account object will be returned.
"""
- user_dn: str = self._format_dn([('uid', username), ('ou', 'users')])
+ user_dn: str = self._format_dn([("uid", username), ("ou", "users")])
conn: Connection = self._connect(user_dn, password)
uid = None
mail = None
uidNumber = None
services = []
- conn.search(user_dn, '(objectClass=*)',
- attributes=['objectClass', 'uid', 'mail', 'cn', 'uidNumber'])
+ conn.search(
+ user_dn,
+ "(objectClass=*)",
+ attributes=["objectClass", "uid", "mail", "cn", "uidNumber"],
+ )
entry: Entry
for entry in conn.entries:
- if 'splineAccount' in entry.objectClass.values:
+ if "splineAccount" in entry.objectClass.values:
uid = entry.uid.value
mail = entry.mail.value
uidNumber = entry.uidNumber.value
- elif 'servicePassword' in entry.objectClass.value:
+ elif "servicePassword" in entry.objectClass.value:
services.append(entry.cn.value)
if uid is None or mail is None or uidNumber is None:
raise NoSuchUserError("User not found")
- return Account(uid, mail, services, password,
- uidNumber=uidNumber)
+ return Account(uid, mail, services, password, uidNumber=uidNumber)
def find(self, filters: Optional[dict[str, str]] = None, wildcard=False):
"""
@@ -87,21 +97,32 @@ class LdapBackend(Backend):
if filters is None:
filters = dict()
- filters['objectClass'] = 'splineAccount'
- filter_as_list = ['(%s=%s)' % (attr, _escape(value, wildcard))
- for attr, value in list(filters.items())]
- filterstr = '(&%s)' % ''.join(filter_as_list)
+ filters["objectClass"] = "splineAccount"
+ filter_as_list = [
+ "(%s=%s)" % (attr, _escape(value, wildcard))
+ for attr, value in list(filters.items())
+ ]
+ filterstr = "(&%s)" % "".join(filter_as_list)
conn = self._connect()
- base_dn = self._format_dn([('ou', 'users')])
+ base_dn = self._format_dn([("ou", "users")])
accounts: list[Account] = []
try:
- conn.search(base_dn, filterstr, search_scope=ldap3.LEVEL,
- attributes=['uid', 'mail', 'uidNumber'])
+ conn.search(
+ base_dn,
+ filterstr,
+ search_scope=ldap3.LEVEL,
+ attributes=["uid", "mail", "uidNumber"],
+ )
for entry in conn.entries:
- accounts.append(Account(entry.uid.value, entry.mail.value,
- uidNumber=entry.uidNumber.value))
+ accounts.append(
+ Account(
+ entry.uid.value,
+ entry.mail.value,
+ uidNumber=entry.uidNumber.value,
+ )
+ )
except LDAPException:
pass
@@ -110,14 +131,14 @@ class LdapBackend(Backend):
def _store(self, account: Account):
conn = self._connect_as_admin()
- user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
+ user_dn = self._format_dn([("uid", account.uid), ("ou", "users")])
attrs = {
- 'objectClass': ['top', 'inetOrgPerson', 'splineAccount'],
- 'uid': _escape(account.uid),
- 'sn': 'n/a',
- 'cn': _escape(account.uid),
- 'mail': _escape(account.mail),
- 'uidNumber': _escape(account.uidNumber),
+ "objectClass": ["top", "inetOrgPerson", "splineAccount"],
+ "uid": _escape(account.uid),
+ "sn": "n/a",
+ "cn": _escape(account.uid),
+ "mail": _escape(account.mail),
+ "uidNumber": _escape(account.uidNumber),
}
conn.add(user_dn, attributes=attrs)
@@ -129,14 +150,16 @@ class LdapBackend(Backend):
Updates account informations like passwords or email.
"""
conn = None
- user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
+ user_dn = self._format_dn([("uid", account.uid), ("ou", "users")])
if as_admin:
conn = self._connect_as_admin()
else:
conn = self._connect(user_dn, account.password)
- attrs = {key: [(ldap3.MODIFY_REPLACE, [value])]
- for key, value in list(account.attributes.items())}
+ attrs = {
+ key: [(ldap3.MODIFY_REPLACE, [value])]
+ for key, value in list(account.attributes.items())
+ }
conn.modify(user_dn, attrs)
self._alter_passwords(conn, account, as_admin=as_admin)
@@ -150,57 +173,64 @@ class LdapBackend(Backend):
else:
conn = self._connect(account.uid, account.password)
- dns = [[('cn', service), ('uid', account.uid), ('ou', 'users')]
- for service in account.services] + \
- [[('uid', account.uid), ('ou', 'users')]]
+ dns = [
+ [("cn", service), ("uid", account.uid), ("ou", "users")]
+ for service in account.services
+ ] + [[("uid", account.uid), ("ou", "users")]]
for dn in dns:
conn.delete(self._format_dn(dn))
- def _format_dn(self, attr: list[tuple[str, str]], with_base_dn=True) -> str:
+ def _format_dn(
+ self, attr: list[tuple[str, str]], with_base_dn=True
+ ) -> str:
if with_base_dn:
attr.extend(self.base_dn)
- dn = ['%s=%s' % (item[0], _escape(item[1]))
- for item in attr]
+ dn = ["%s=%s" % (item[0], _escape(item[1])) for item in attr]
- return ','.join(dn)
+ return ",".join(dn)
- def _connect(self, user: Optional[str] = None,
- password: Optional[str] = None) -> Connection:
+ def _connect(
+ self, user: Optional[str] = None, password: Optional[str] = None
+ ) -> Connection:
server = ldap3.Server(self.host)
conn = ldap3.Connection(server, user, password, raise_exceptions=True)
try:
conn.bind()
except LDAPInvalidCredentialsResult:
- raise InvalidPasswordError('Invalid password')
+ raise InvalidPasswordError("Invalid password")
return conn
def _connect_as_admin(self) -> Connection:
- admin_dn = self._format_dn([('cn', self.admin_user)])
+ admin_dn = self._format_dn([("cn", self.admin_user)])
return self._connect(admin_dn, self.admin_pass)
- def _alter_passwords(self, conn: Connection, account: Account,
- as_admin=False):
+ def _alter_passwords(
+ self, conn: Connection, account: Account, as_admin=False
+ ):
if account.new_password_root:
- user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
- _change_password(conn, user_dn, account.new_password_root, as_admin)
+ user_dn = self._format_dn([("uid", account.uid), ("ou", "users")])
+ _change_password(
+ conn, user_dn, account.new_password_root, as_admin
+ )
_, account.password = account.new_password_root
account.new_password_root = None
for service, passwords in list(account.new_password_services.items()):
service_id = service.lower()
- service_dn = self._format_dn([('cn', service_id), ('uid', account.uid),
- ('ou', 'users')])
+ service_dn = self._format_dn(
+ [("cn", service_id), ("uid", account.uid), ("ou", "users")]
+ )
_, new = passwords
if new is not None:
if service_id not in account.services:
attrs = {
- 'objectClass': ['top', 'servicePassword'],
- 'cn': _escape(service_id),
+ "objectClass": ["top", "servicePassword"],
+ "cn": _escape(service_id),
}
conn.add(service_dn, attributes=attrs)
account.services.append(service_id)
@@ -214,30 +244,38 @@ class LdapBackend(Backend):
del account.new_password_services[service]
def _get_last_uidNumber(self, conn: Connection):
- uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')])
- conn.search(uidNumber_dn, '(objectClass=uidNumberMaximum)',
- attributes=['uidNumber'])
+ uidNumber_dn = self._format_dn([("cn", "uidMax"), ("ou", "other")])
+ conn.search(
+ uidNumber_dn,
+ "(objectClass=uidNumberMaximum)",
+ attributes=["uidNumber"],
+ )
for entry in conn.entries:
return int(entry.uidNumber.value)
- raise ShouldNotHappen('Last uidNumber not found.')
+ raise ShouldNotHappen("Last uidNumber not found.")
def _get_next_uidNumber(self):
conn = self._connect_as_admin()
- uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')])
+ uidNumber_dn = self._format_dn([("cn", "uidMax"), ("ou", "other")])
uidNumber = self._get_last_uidNumber(conn)
# Try to acquire next uidNumber
for i in [0, 1, 2, 3, 4, 5]:
try:
- conn.modify(uidNumber_dn, {'uidNumber': [
- (ldap3.MODIFY_DELETE, ['%d' % (uidNumber + i)]),
- (ldap3.MODIFY_ADD, ['%d' % (uidNumber + i + 1)]),
- ]})
-
- if conn.result['result'] == RESULT_SUCCESS:
+ conn.modify(
+ uidNumber_dn,
+ {
+ "uidNumber": [
+ (ldap3.MODIFY_DELETE, ["%d" % (uidNumber + i)]),
+ (ldap3.MODIFY_ADD, ["%d" % (uidNumber + i + 1)]),
+ ]
+ },
+ )
+
+ if conn.result["result"] == RESULT_SUCCESS:
return uidNumber + i + 1
except LDAPOperationResult:
pass
- raise ShouldNotHappen('Unable to get next uidNumber, try again.')
+ raise ShouldNotHappen("Unable to get next uidNumber, try again.")
diff --git a/accounts/default_settings.py b/accounts/default_settings.py
index cf36c6c..98987fa 100644
--- a/accounts/default_settings.py
+++ b/accounts/default_settings.py
@@ -2,33 +2,41 @@ from datetime import timedelta
DEBUG = False
-SECRET_KEY = 'remember to change this to something more random and secret'
+SECRET_KEY = "remember to change this to something more random and secret"
# CHANGE THIS! (e.g. os.urandom(32) )
-SESSION_ENCRYPTION_KEY = b'.\x14\xa7\x1b\xa2:\x1b\xb7\xbck\x1bD w\xab\x87a\xb4\xb7\xca\xf1\x06\xb0\x9f?q\x13\x05\x8dY\xe5<'
+SESSION_ENCRYPTION_KEY = b".\x14\xa7\x1b\xa2:\x1b\xb7\xbck\x1bD w\xab\x87a\xb4\xb7\xca\xf1\x06\xb0\x9f?q\x13\x05\x8dY\xe5<"
-MAIL_DEFAULT_SENDER = 'spline accounts <noreply@accounts.spline.de>'
+MAIL_DEFAULT_SENDER = "spline accounts <noreply@accounts.spline.de>"
MAIL_REGISTER_NOTIFY = None
-DISABLED_ACCOUNT_MAILADDRESS_TEMPLATE = 'noreply-disabledaccount-%s@accounts.spline.de'
+DISABLED_ACCOUNT_MAILADDRESS_TEMPLATE = (
+ "noreply-disabledaccount-%s@accounts.spline.de"
+)
-SENDMAIL_COMMAND = '/usr/sbin/sendmail'
+SENDMAIL_COMMAND = "/usr/sbin/sendmail"
# to make the cookie a session cookie, set this to None
-PERMANENT_SESSION_LIFETIME = timedelta(seconds=600) # 10 minutes
-
-LDAP_HOST = 'ldap://localhost:5678'
-LDAP_BASE_DN = [('dc', 'accounts'), ('dc', 'spline'), ('dc', 'inf'), ('dc', 'fu-berlin'), ('dc', 'de')]
-LDAP_ADMIN_USER = 'admin'
-LDAP_ADMIN_PASS = 'admin'
+PERMANENT_SESSION_LIFETIME = timedelta(seconds=600) # 10 minutes
+
+LDAP_HOST = "ldap://localhost:5678"
+LDAP_BASE_DN = [
+ ("dc", "accounts"),
+ ("dc", "spline"),
+ ("dc", "inf"),
+ ("dc", "fu-berlin"),
+ ("dc", "de"),
+]
+LDAP_ADMIN_USER = "admin"
+LDAP_ADMIN_PASS = "admin"
-PREFERRED_URL_SCHEME = 'https'
+PREFERRED_URL_SCHEME = "https"
-USER_BACKEND = 'accounts.backend.user.dummy'
-MAIL_BACKEND = 'accounts.backend.mail.dummy'
+USER_BACKEND = "accounts.backend.user.dummy"
+MAIL_BACKEND = "accounts.backend.mail.dummy"
ADMIN_USERS = ["admin"]
SERVICES = [
- ('Service1', 'https://service1.spline.de'),
- ('Service2', 'https://service2.spline.de'),
+ ("Service1", "https://service1.spline.de"),
+ ("Service2", "https://service2.spline.de"),
]
diff --git a/accounts/forms.py b/accounts/forms.py
index b59ab64..01713ad 100644
--- a/accounts/forms.py
+++ b/accounts/forms.py
@@ -3,27 +3,50 @@ import re
from flask import url_for
from flask_wtf import FlaskForm as Form
from flask_login import current_user
-from wtforms import StringField, PasswordField, ValidationError, \
- BooleanField, validators
+from wtforms import (
+ StringField,
+ PasswordField,
+ ValidationError,
+ BooleanField,
+ validators,
+)
from wtforms.form import FormMeta
from markupsafe import Markup
from .utils import NotRegexp
from accounts.app import accounts_app
-USERNAME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9-]{1,15}$')
-USERNAME_EXCLUDE_RE = re.compile(r'^(admin|root)')
+USERNAME_RE = re.compile(r"^[a-zA-Z][a-zA-Z0-9-]{1,15}$")
+USERNAME_EXCLUDE_RE = re.compile(r"^(admin|root)")
class RegisterForm(Form):
- username = StringField('Benutzername', [
- validators.Regexp(USERNAME_RE, message='Benutzername darf nur aus a-z, '
- 'Zahlen und - bestehen (2-16 Zeichen, am Anfang nur a-z).'),
- NotRegexp(USERNAME_EXCLUDE_RE, message='Dieser Benutzername ist nicht erlaubt.'),
- ])
- mail = StringField('E-Mail-Adresse', [validators.Email(), validators.Length(min=6, max=50)])
- question = StringField('Hauptstadt von Deutschland?', [validators.AnyOf(
- ('Berlin', 'berlin'), message='Bitte beantworte die Frage.')])
+ username = StringField(
+ "Benutzername",
+ [
+ validators.Regexp(
+ USERNAME_RE,
+ message="Benutzername darf nur aus a-z, "
+ "Zahlen und - bestehen (2-16 Zeichen, am Anfang nur a-z).",
+ ),
+ NotRegexp(
+ USERNAME_EXCLUDE_RE,
+ message="Dieser Benutzername ist nicht erlaubt.",
+ ),
+ ],
+ )
+ mail = StringField(
+ "E-Mail-Adresse",
+ [validators.Email(), validators.Length(min=6, max=50)],
+ )
+ question = StringField(
+ "Hauptstadt von Deutschland?",
+ [
+ validators.AnyOf(
+ ("Berlin", "berlin"), message="Bitte beantworte die Frage."
+ )
+ ],
+ )
def validate_username(self, field):
try:
@@ -31,12 +54,15 @@ class RegisterForm(Form):
except accounts_app.user_backend.NoSuchUserError:
if accounts_app.username_blacklist:
if field.data.lower() in accounts_app.username_blacklist:
- raise ValidationError(Markup(
- 'Dieser Benutzername ist momentan nicht erlaubt. '
- '<a href="%s">Weitere Informationen</a>'
- % url_for('default.about')))
+ raise ValidationError(
+ Markup(
+ "Dieser Benutzername ist momentan nicht erlaubt. "
+ '<a href="%s">Weitere Informationen</a>'
+ % url_for("default.about")
+ )
+ )
else:
- raise ValidationError('Dieser Benutzername ist schon vergeben.')
+ raise ValidationError("Dieser Benutzername ist schon vergeben.")
def validate_mail(self, field):
try:
@@ -44,11 +70,14 @@ class RegisterForm(Form):
except accounts_app.user_backend.NoSuchUserError:
pass
else:
- raise ValidationError(Markup(
- 'Ein Benutzername mit dieser Adresse existiert bereits. '
- 'Falls du deinen Benutzernamen vergessen hast, kannst du '
- 'die <a href="%s">Passwort-vergessen-Funktion</a> benutzen.'
- % url_for('default.lost_password')))
+ raise ValidationError(
+ Markup(
+ "Ein Benutzername mit dieser Adresse existiert bereits. "
+ "Falls du deinen Benutzernamen vergessen hast, kannst du "
+ 'die <a href="%s">Passwort-vergessen-Funktion</a> benutzen.'
+ % url_for("default.lost_password")
+ )
+ )
class AdminCreateAccountForm(RegisterForm):
@@ -58,89 +87,131 @@ class AdminCreateAccountForm(RegisterForm):
except accounts_app.user_backend.NoSuchUserError:
return
else:
- raise ValidationError('Dieser Benutzername ist schon vergeben')
+ raise ValidationError("Dieser Benutzername ist schon vergeben")
question = None
class RegisterCompleteForm(Form):
- password = PasswordField('Passwort', [validators.DataRequired(),
- validators.EqualTo('password_confirm', message='Passwörter stimmen nicht überein')])
- password_confirm = PasswordField('Passwort bestätigen')
+ password = PasswordField(
+ "Passwort",
+ [
+ validators.DataRequired(),
+ validators.EqualTo(
+ "password_confirm", message="Passwörter stimmen nicht überein"
+ ),
+ ],
+ )
+ password_confirm = PasswordField("Passwort bestätigen")
# n.b. this form is also used in lost_password_complete
class LostPasswordForm(Form):
- username_or_mail = StringField('Benutzername oder E-Mail')
+ username_or_mail = StringField("Benutzername oder E-Mail")
user = None
def validate_username_or_mail(self, field):
- if '@' not in field.data:
+ if "@" not in field.data:
try:
self.user = accounts_app.user_backend.get_by_uid(field.data)
except accounts_app.user_backend.NoSuchUserError:
- raise ValidationError('Es gibt keinen Benutzer mit diesem Namen.')
+ raise ValidationError(
+ "Es gibt keinen Benutzer mit diesem Namen."
+ )
else:
try:
self.user = accounts_app.user_backend.get_by_mail(field.data)
except accounts_app.user_backend.NoSuchUserError:
- raise ValidationError('Es gibt keinen Benutzer mit dieser Adresse.')
+ raise ValidationError(
+ "Es gibt keinen Benutzer mit dieser Adresse."
+ )
class SettingsMeta(FormMeta):
def __call__(cls, *args, **kwargs):
for service in accounts_app.all_services:
- setattr(cls, 'password_%s' % service.id, PasswordField(
- 'Passwort für %s' % service.name, [
- validators.Optional(),
- validators.EqualTo(
- 'password_confirm_%s' % service.id,
- message='Passwörter stimmen nicht überein'),
- ]))
- setattr(cls, 'password_confirm_%s' % service.id, PasswordField(
- 'Passwort für %s (Bestätigung)' % service.name))
- setattr(cls, 'delete_%s' % service.id, BooleanField(
- 'Passwort für %s löschen' % service.name))
+ setattr(
+ cls,
+ "password_%s" % service.id,
+ PasswordField(
+ "Passwort für %s" % service.name,
+ [
+ validators.Optional(),
+ validators.EqualTo(
+ "password_confirm_%s" % service.id,
+ message="Passwörter stimmen nicht überein",
+ ),
+ ],
+ ),
+ )
+ setattr(
+ cls,
+ "password_confirm_%s" % service.id,
+ PasswordField("Passwort für %s (Bestätigung)" % service.name),
+ )
+ setattr(
+ cls,
+ "delete_%s" % service.id,
+ BooleanField("Passwort für %s löschen" % service.name),
+ )
return super(SettingsMeta, cls).__call__(*args, **kwargs)
class SettingsForm(Form, metaclass=SettingsMeta):
- old_password = PasswordField('Altes Passwort')
- password = PasswordField('Neues Passwort', [validators.Optional(),
- validators.EqualTo('password_confirm', message='Passwörter stimmen nicht überein')])
- password_confirm = PasswordField('Passwort bestätigen')
- mail = StringField('E-Mail-Adresse', [validators.Optional(), validators.Email(), validators.Length(min=6, max=50)])
+ old_password = PasswordField("Altes Passwort")
+ password = PasswordField(
+ "Neues Passwort",
+ [
+ validators.Optional(),
+ validators.EqualTo(
+ "password_confirm", message="Passwörter stimmen nicht überein"
+ ),
+ ],
+ )
+ password_confirm = PasswordField("Passwort bestätigen")
+ mail = StringField(
+ "E-Mail-Adresse",
+ [
+ validators.Optional(),
+ validators.Email(),
+ validators.Length(min=6, max=50),
+ ],
+ )
def validate_old_password(self, field):
if self.password.data:
if not field.data:
- raise ValidationError('Gib bitte dein altes Passwort ein, um ein neues zu setzen.')
+ raise ValidationError(
+ "Gib bitte dein altes Passwort ein, um ein neues zu setzen."
+ )
if field.data != current_user.password:
- raise ValidationError('Altes Passwort ist falsch.')
+ raise ValidationError("Altes Passwort ist falsch.")
def validate_mail(self, field):
results = accounts_app.user_backend.find_by_mail(field.data)
for user in results:
if user.uid != current_user.uid:
- raise ValidationError('Diese E-Mail-Adresse wird schon von einem anderen Account benutzt!')
+ raise ValidationError(
+ "Diese E-Mail-Adresse wird schon von einem anderen Account benutzt!"
+ )
def get_servicepassword(self, service_id: str):
- return getattr(self, 'password_%s' % service_id)
+ return getattr(self, "password_%s" % service_id)
def get_servicepasswordconfirm(self, service_id: str):
- return getattr(self, 'password_confirm_%s' % service_id)
+ return getattr(self, "password_confirm_%s" % service_id)
def get_servicedelete(self, service_id: str):
- return getattr(self, 'delete_%s' % service_id)
+ return getattr(self, "delete_%s" % service_id)
class AdminDisableAccountForm(Form):
- username = StringField('Benutzername')
+ username = StringField("Benutzername")
user = None
def validate_username(self, field):
try:
self.user = accounts_app.user_backend.get_by_uid(field.data)
except accounts_app.user_backend.NoSuchUserError:
- raise ValidationError('Dieser Benutzername existiert nicht')
+ raise ValidationError("Dieser Benutzername existiert nicht")
diff --git a/accounts/models.py b/accounts/models.py
index 7bbb235..8b34fab 100644
--- a/accounts/models.py
+++ b/accounts/models.py
@@ -19,7 +19,7 @@ class Service(object):
self.changed = False
def __repr__(self):
- return '<Service %s>' % self.id
+ return "<Service %s>" % self.id
class Account(UserMixin):
@@ -27,6 +27,7 @@ class Account(UserMixin):
An Account represents a complex ldap tree entry for spline users.
For each service a spline user can have a different password.
"""
+
_ready = False
uid: str
@@ -36,8 +37,14 @@ class Account(UserMixin):
attributes: dict[Any, Any]
new_password_root: Optional[tuple[Optional[str], Optional[str]]]
- def __init__(self, uid: str, mail: str, services=None,
- password: Optional[str] = None, uidNumber=None):
+ def __init__(
+ self,
+ uid: str,
+ mail: str,
+ services=None,
+ password: Optional[str] = None,
+ uidNumber=None,
+ ):
self.uid = uid
self.services = list() if services is None else services
self.password = password
@@ -46,7 +53,7 @@ class Account(UserMixin):
self.attributes = {}
self.uidNumber = uidNumber
- self._set_attribute('mail', mail)
+ self._set_attribute("mail", mail)
self._ready = True
def __repr__(self):
@@ -55,8 +62,9 @@ class Account(UserMixin):
def reset_password(self, service: str):
self.new_password_services[service] = (None, None)
- def change_password(self, new_password: str, old_password='',
- service=None):
+ def change_password(
+ self, new_password: str, old_password="", service=None
+ ):
"""
Changes a password for a given service. You have to use the
UserBackend to make the changes permanent. If no service is given,
@@ -76,14 +84,16 @@ class Account(UserMixin):
Changes the mail address of an account. You have to use the
AccountService class to make changes permanent.
"""
- self._set_attribute('mail', new_mail)
+ self._set_attribute("mail", new_mail)
def __getattr__(self, name):
if name in self.attributes:
return self.attributes[name]
- raise AttributeError("'%s' object has no attribute '%s'" %
- (self.__class__.__name__, name))
+ raise AttributeError(
+ "'%s' object has no attribute '%s'"
+ % (self.__class__.__name__, name)
+ )
def __setattr__(self, name: str, value: Any):
if self._ready and name not in self.__dict__:
diff --git a/accounts/utils/__init__.py b/accounts/utils/__init__.py
index da92528..1f79953 100644
--- a/accounts/utils/__init__.py
+++ b/accounts/utils/__init__.py
@@ -15,8 +15,9 @@ def templated(template: Optional[str] = None):
template_name = template
if template_name is None:
if request.endpoint:
- template_name = request.endpoint \
- .replace('.', '/') + '.html'
+ template_name = (
+ request.endpoint.replace(".", "/") + ".html"
+ )
else:
template_name = "error.html"
ctx = f(*args, **kwargs)
@@ -25,7 +26,9 @@ def templated(template: Optional[str] = None):
elif not isinstance(ctx, dict):
return ctx
return render_template(template_name, **ctx)
+
return templated__
+
return templated_
@@ -35,15 +38,15 @@ class NotRegexp(Regexp):
"""
def __call__(self, form, field):
- if self.regex.match(field.data or ''):
+ if self.regex.match(field.data or ""):
if self.message is None:
- self.message: str = field.gettext('Invalid input.')
+ self.message: str = field.gettext("Invalid input.")
raise ValidationError(self.message)
def get_backend(path: str, app: Flask):
module = path.rsplit(".", 1).pop()
- class_name = '%sBackend' % module.title()
+ class_name = "%sBackend" % module.title()
backend_class = getattr(importlib.import_module(path), class_name)
return backend_class(app)
diff --git a/accounts/utils/confirmation.py b/accounts/utils/confirmation.py
index 60967de..62f14ad 100644
--- a/accounts/utils/confirmation.py
+++ b/accounts/utils/confirmation.py
@@ -10,13 +10,16 @@ class Confirmation(URLSafeTimedSerializer):
def __init__(self, realm: str, key=None, **kwargs):
if key is None:
- key = accounts_app.config['SECRET_KEY']
+ key = accounts_app.config["SECRET_KEY"]
super(Confirmation, self).__init__(key, salt=realm, **kwargs)
- def loads_http(self, s: Union[str, bytes],
- max_age: Optional[int] = None,
- return_timestamp: bool = False,
- salt: Optional[bytes] = None) -> Any:
+ def loads_http(
+ self,
+ s: Union[str, bytes],
+ max_age: Optional[int] = None,
+ return_timestamp: bool = False,
+ salt: Optional[bytes] = None,
+ ) -> Any:
"""
Like `Confirmation.loads`, but raise HTTP exceptions with appropriate
messages instead of `BadSignature` or `SignatureExpired`.
@@ -25,6 +28,6 @@ class Confirmation(URLSafeTimedSerializer):
try:
return self.loads(s, max_age, return_timestamp, salt)
except BadSignature:
- raise Forbidden('Ungültiger Bestätigungslink.')
+ raise Forbidden("Ungültiger Bestätigungslink.")
except SignatureExpired:
- raise Forbidden('Bestätigungslink ist zu alt.')
+ raise Forbidden("Bestätigungslink ist zu alt.")
diff --git a/accounts/utils/console.py b/accounts/utils/console.py
index 823ec33..1b539bd 100644
--- a/accounts/utils/console.py
+++ b/accounts/utils/console.py
@@ -1,13 +1,14 @@
# -*- coding: utf-8 -*-
-class TablePrinter():
+
+class TablePrinter:
separator: str
- def __init__(self, headers=None, separator='|'):
+ def __init__(self, headers=None, separator="|"):
self.headers = headers
self.separator = separator
- self.format_string = ''
+ self.format_string = ""
self.widths = list()
if headers is not None:
@@ -26,17 +27,19 @@ class TablePrinter():
while len(self.widths) < columns:
self.widths.append(0)
- self.widths = [_column_width(column, width)
- for column, width
- in zip(list(zip(*rows)), self.widths)]
+ self.widths = [
+ _column_width(column, width)
+ for column, width in zip(list(zip(*rows)), self.widths)
+ ]
self._update_format_string()
def _update_format_string(self) -> None:
- sep = ' %s ' % self.separator
- self.format_string = '%s %s %s' % (
+ sep = " %s " % self.separator
+ self.format_string = "%s %s %s" % (
+ self.separator,
+ sep.join(["%%-%ds" % width for width in self.widths]),
self.separator,
- sep.join(['%%-%ds' % width for width in self.widths]),
- self.separator)
+ )
def output(self, rows):
if len(rows) > 0:
@@ -50,10 +53,18 @@ class TablePrinter():
self._print_row(row)
def _print_headline(self) -> None:
- print(('%s%s%s' % (
- self.separator,
- self.separator.join(['-' * (width + 2) for width in self.widths]),
- self.separator)))
+ print(
+ (
+ "%s%s%s"
+ % (
+ self.separator,
+ self.separator.join(
+ ["-" * (width + 2) for width in self.widths]
+ ),
+ self.separator,
+ )
+ )
+ )
def _print_row(self, row) -> None:
print((self.format_string % tuple(row)))
@@ -63,7 +74,7 @@ class ConsoleForm(object):
_ready = False
def __init__(self, formcls, **kwargs):
- self.form = formcls(meta={'csrf': False})
+ self.form = formcls(meta={"csrf": False})
self._fill(kwargs)
self._ready = True
@@ -76,11 +87,11 @@ class ConsoleForm(object):
def print_errors(self):
for field, errors in list(self.form.errors.items()):
if len(errors) > 1:
- print(('%s:' % field))
+ print(("%s:" % field))
for error in errors:
- print((' %s' % error))
+ print((" %s" % error))
else:
- print(('%s: %s' % (field, errors[0])))
+ print(("%s: %s" % (field, errors[0])))
def __getattr__(self, name):
return getattr(self.form, name)
diff --git a/accounts/utils/login.py b/accounts/utils/login.py
index 07953e3..938268f 100644
--- a/accounts/utils/login.py
+++ b/accounts/utils/login.py
@@ -24,16 +24,18 @@ class _compact_json:
def create_login_manager() -> flask_login.login_manager.LoginManager:
login_manager = LoginManager()
- login_manager.login_message = 'Bitte einloggen'
- login_manager.login_view = 'login.login'
+ login_manager.login_message = "Bitte einloggen"
+ login_manager.login_view = "login.login"
@login_manager.user_loader
def load_user(user_id: str) -> LoginManager:
try:
username, password = parse_userid(user_id)
return accounts_app.user_backend.auth(username, password)
- except (accounts_app.user_backend.NoSuchUserError,
- accounts_app.user_backend.InvalidPasswordError):
+ except (
+ accounts_app.user_backend.NoSuchUserError,
+ accounts_app.user_backend.InvalidPasswordError,
+ ):
return None
return login_manager
@@ -52,7 +54,9 @@ def logout_required(f):
@wraps(f)
def logout_required_(*args, **kwargs):
if current_user.is_authenticated:
- raise Forbidden('Diese Seite ist nur für nicht eingeloggte Benutzer gedacht!')
+ raise Forbidden(
+ "Diese Seite ist nur für nicht eingeloggte Benutzer gedacht!"
+ )
return f(*args, **kwargs)
- return logout_required_
+ return logout_required_
diff --git a/accounts/utils/sessions.py b/accounts/utils/sessions.py
index a452fe1..47580bd 100644
--- a/accounts/utils/sessions.py
+++ b/accounts/utils/sessions.py
@@ -18,7 +18,7 @@ def _pad(value: str, block_size: int) -> bytes:
def _unpad(value: str) -> str:
- pad_length = ord(value[len(value)-1:])
+ pad_length = ord(value[len(value) - 1 :])
return value[:-pad_length]
@@ -29,7 +29,7 @@ class EncryptedSerializer(TaggedJSONSerializer):
self.block_size = AES.block_size
def _cipher(self, iv: bytes):
- key = accounts_app.config['SESSION_ENCRYPTION_KEY']
+ key = accounts_app.config["SESSION_ENCRYPTION_KEY"]
assert len(key) == 32
return AES.new(key, AES.MODE_CBC, iv)
@@ -50,10 +50,9 @@ class EncryptedSerializer(TaggedJSONSerializer):
`config.SESSION_ENCRYPTION_KEY`.
"""
decoded = b64decode(value.encode("UTF-8"))
- iv = decoded[:self.block_size]
- raw = self._cipher(iv).decrypt(decoded[AES.block_size:])
- return super(EncryptedSerializer, self) \
- .loads(_unpad(raw))
+ iv = decoded[: self.block_size]
+ raw = self._cipher(iv).decrypt(decoded[AES.block_size :])
+ return super(EncryptedSerializer, self).loads(_unpad(raw))
class EncryptedSessionInterface(SecureCookieSessionInterface):
@@ -63,13 +62,15 @@ class EncryptedSessionInterface(SecureCookieSessionInterface):
session = None
try:
parent = super(EncryptedSessionInterface, self)
- session = cast(EncryptedSessionInterface, parent) \
- .open_session(app, request)
+ session = cast(EncryptedSessionInterface, parent).open_session(
+ app, request
+ )
except BadPayload:
session = self.session_class()
if session is not None:
- session.permanent = \
- app.config.get('PERMANENT_SESSION_LIFETIME') is not None
+ session.permanent = (
+ app.config.get("PERMANENT_SESSION_LIFETIME") is not None
+ )
return session
diff --git a/accounts/views/admin/__init__.py b/accounts/views/admin/__init__.py
index 938033b..92dbf22 100644
--- a/accounts/views/admin/__init__.py
+++ b/accounts/views/admin/__init__.py
@@ -12,40 +12,41 @@ from accounts.forms import AdminCreateAccountForm, AdminDisableAccountForm
from accounts.app import accounts_app
-bp = Blueprint('admin', __name__)
+bp = Blueprint("admin", __name__)
@bp.before_request
def restrict_bp_to_admins():
if not current_user.is_authenticated:
return accounts_app.login_manager.unauthorized()
- if current_user.uid not in accounts_app.config.get('ADMIN_USERS', []):
- raise Forbidden('Du bist kein Admin.')
+ if current_user.uid not in accounts_app.config.get("ADMIN_USERS", []):
+ raise Forbidden("Du bist kein Admin.")
-@bp.route('/')
-@templated('admin/index.html')
+@bp.route("/")
+@templated("admin/index.html")
def index():
return {}
-@bp.route('/create_account', methods=['GET', 'POST'])
-@templated('admin/create_account.html')
+@bp.route("/create_account", methods=["GET", "POST"])
+@templated("admin/create_account.html")
def create_account():
form = AdminCreateAccountForm()
if form.validate_on_submit():
- accounts_app.mail_backend.send(form.mail.data, 'mail/register.txt',
- username=form.username.data)
+ accounts_app.mail_backend.send(
+ form.mail.data, "mail/register.txt", username=form.username.data
+ )
- flash('Mail versandt.', 'success')
- return redirect(url_for('admin.index'))
- return {'form': form}
+ flash("Mail versandt.", "success")
+ return redirect(url_for("admin.index"))
+ return {"form": form}
-@bp.route('/view_blacklist')
-@bp.route('/view_blacklist/<start>')
-@templated('admin/view_blacklist.html')
-def view_blacklist(start=''):
+@bp.route("/view_blacklist")
+@bp.route("/view_blacklist/<start>")
+@templated("admin/view_blacklist.html")
+def view_blacklist(start=""):
entries = accounts_app.username_blacklist
if start:
entries = [e for e in entries if e.startswith(start)]
@@ -53,18 +54,18 @@ def view_blacklist(start=''):
next_letters = set(e[len(start)] for e in entries if len(e) > len(start))
return {
- 'entries': entries,
- 'start': start,
- 'next_letters': next_letters,
+ "entries": entries,
+ "start": start,
+ "next_letters": next_letters,
}
-@bp.route('/disable_account', methods=['GET', 'POST'])
-@templated('admin/disable_account.html')
+@bp.route("/disable_account", methods=["GET", "POST"])
+@templated("admin/disable_account.html")
def disable_account():
form = AdminDisableAccountForm()
- if 'uid' in request.args:
- form = AdminDisableAccountForm(username=request.args['uid'])
+ if "uid" in request.args:
+ form = AdminDisableAccountForm(username=request.args["uid"])
if form.validate_on_submit() and form.user:
random_pw = str(uuid4())
@@ -73,19 +74,28 @@ def disable_account():
form.user.reset_password(service.id)
oldmail = form.user.mail
- mail = accounts_app.config['DISABLED_ACCOUNT_MAILADDRESS_TEMPLATE'] % form.user.uid
+ mail = (
+ accounts_app.config["DISABLED_ACCOUNT_MAILADDRESS_TEMPLATE"]
+ % form.user.uid
+ )
form.user.change_email(mail)
accounts_app.user_backend.update(form.user, as_admin=True)
- flash('Passwort auf ein zufälliges und Mailadresse auf %s '
- 'gesetzt.' % mail, 'success')
+ flash(
+ "Passwort auf ein zufälliges und Mailadresse auf %s "
+ "gesetzt." % mail,
+ "success",
+ )
accounts_app.mail_backend.send(
- accounts_app.config['MAIL_REGISTER_NOTIFY'],
- 'mail/disable_notify.txt',
- username=form.user.uid, mail=oldmail, admin=current_user.uid)
+ accounts_app.config["MAIL_REGISTER_NOTIFY"],
+ "mail/disable_notify.txt",
+ username=form.user.uid,
+ mail=oldmail,
+ admin=current_user.uid,
+ )
- return redirect(url_for('admin.index'))
+ return redirect(url_for("admin.index"))
- return {'form': form}
+ return {"form": form}
diff --git a/accounts/views/default/__init__.py b/accounts/views/default/__init__.py
index bba20fd..1639182 100644
--- a/accounts/views/default/__init__.py
+++ b/accounts/views/default/__init__.py
@@ -3,14 +3,17 @@
from copy import deepcopy
from flask import Blueprint
-from flask import redirect, render_template, request, \
- flash, url_for
+from flask import redirect, render_template, request, flash, url_for
from flask_login import login_required, login_user, current_user
from werkzeug.exceptions import Forbidden
from werkzeug import Response
-from accounts.forms import RegisterForm, RegisterCompleteForm, \
- LostPasswordForm, SettingsForm
+from accounts.forms import (
+ RegisterForm,
+ RegisterCompleteForm,
+ LostPasswordForm,
+ SettingsForm,
+)
from accounts.utils import templated
from accounts.utils.confirmation import Confirmation
from accounts.utils.login import logout_required
@@ -20,33 +23,39 @@ from accounts.app import accounts_app
from typing import Union
-bp = Blueprint('default', __name__)
+bp = Blueprint("default", __name__)
-@bp.route('/register', methods=['GET', 'POST'])
-@templated('register.html')
+@bp.route("/register", methods=["GET", "POST"])
+@templated("register.html")
@logout_required
def register() -> Union[dict, Response]:
form = RegisterForm()
if form.validate_on_submit():
- accounts_app.mail_backend.send(form.mail.data, 'mail/register.txt',
- username=form.username.data)
+ accounts_app.mail_backend.send(
+ form.mail.data, "mail/register.txt", username=form.username.data
+ )
- flash('Es wurde eine E-Mail an die angegebene Adresse geschickt, '
- 'um diese zu überprüfen. Bitte folge den Anweisungen in der '
- 'E-Mail.', 'success')
+ flash(
+ "Es wurde eine E-Mail an die angegebene Adresse geschickt, "
+ "um diese zu überprüfen. Bitte folge den Anweisungen in der "
+ "E-Mail.",
+ "success",
+ )
- return redirect(url_for('.index'))
+ return redirect(url_for(".index"))
- return {'form': form}
+ return {"form": form}
-@bp.route('/register/<token>', methods=['GET', 'POST'])
-@templated('register_complete.html')
+@bp.route("/register/<token>", methods=["GET", "POST"])
+@templated("register_complete.html")
@logout_required
def register_complete(token: str):
- #TODO: check for double uids and mail
- username, mail = Confirmation('register').loads_http(token, max_age=3*24*60*60)
+ # TODO: check for double uids and mail
+ username, mail = Confirmation("register").loads_http(
+ token, max_age=3 * 24 * 60 * 60
+ )
try:
accounts_app.user_backend.get_by_uid(username)
@@ -54,8 +63,10 @@ def register_complete(token: str):
except accounts_app.user_backend.NoSuchUserError:
pass
else:
- flash('Du hast den Benutzer bereits angelegt! Du kannst dich jetzt einfach einloggen:')
- return redirect(url_for('.index'))
+ flash(
+ "Du hast den Benutzer bereits angelegt! Du kannst dich jetzt einfach einloggen:"
+ )
+ return redirect(url_for(".index"))
form = RegisterCompleteForm()
if form.validate_on_submit():
@@ -64,45 +75,53 @@ def register_complete(token: str):
login_user(user)
accounts_app.mail_backend.send(
- accounts_app.config['MAIL_REGISTER_NOTIFY'],
- 'mail/register_notify.txt',
- username=username, mail=mail)
+ accounts_app.config["MAIL_REGISTER_NOTIFY"],
+ "mail/register_notify.txt",
+ username=username,
+ mail=mail,
+ )
- flash('Benutzer erfolgreich angelegt.', 'success')
- return redirect(url_for('.index'))
+ flash("Benutzer erfolgreich angelegt.", "success")
+ return redirect(url_for(".index"))
return {
- 'form': form,
- 'token': token,
- 'username': username,
- 'mail': mail,
+ "form": form,
+ "token": token,
+ "username": username,
+ "mail": mail,
}
-@bp.route('/lost_password', methods=['GET', 'POST'])
-@templated('lost_password.html')
+@bp.route("/lost_password", methods=["GET", "POST"])
+@templated("lost_password.html")
@logout_required
def lost_password():
form = LostPasswordForm()
if form.validate_on_submit() and form.user:
- #TODO: make the link only usable once (e.g include a hash of the old pw)
+ # TODO: make the link only usable once (e.g include a hash of the old pw)
# atm the only thing we do is make the link valid for only little time
accounts_app.mail_backend.send(
- form.user.mail, 'mail/lost_password.txt', username=form.user.uid)
+ form.user.mail, "mail/lost_password.txt", username=form.user.uid
+ )
- flash('Wir haben dir eine E-Mail mit einem Link zum Passwort ändern '
- 'geschickt. Bitte folge den Anweisungen in der E-Mail.', 'success')
+ flash(
+ "Wir haben dir eine E-Mail mit einem Link zum Passwort ändern "
+ "geschickt. Bitte folge den Anweisungen in der E-Mail.",
+ "success",
+ )
- return redirect(url_for('.index'))
+ return redirect(url_for(".index"))
- return {'form': form}
+ return {"form": form}
-@bp.route('/lost_password/<token>', methods=['GET', 'POST'])
-@templated('lost_password_complete.html')
+@bp.route("/lost_password/<token>", methods=["GET", "POST"])
+@templated("lost_password_complete.html")
@logout_required
def lost_password_complete(token: str):
- (username,) = Confirmation('lost_password').loads_http(token, max_age=4*60*60)
+ (username,) = Confirmation("lost_password").loads_http(
+ token, max_age=4 * 60 * 60
+ )
form = RegisterCompleteForm()
if form.validate_on_submit():
@@ -111,45 +130,52 @@ def lost_password_complete(token: str):
accounts_app.user_backend.update(user, as_admin=True)
login_user(user)
- flash('Passwort geändert.', 'success')
- return redirect(url_for('.index'))
+ flash("Passwort geändert.", "success")
+ return redirect(url_for(".index"))
return {
- 'form': form,
- 'token': token,
- 'username': username,
+ "form": form,
+ "token": token,
+ "username": username,
}
-@bp.route('/', methods=['GET', 'POST'])
-@templated('index.html')
+@bp.route("/", methods=["GET", "POST"])
+@templated("index.html")
@login_required
def index() -> Union[Response, dict]:
form = SettingsForm(mail=current_user.mail)
if form.validate_on_submit():
changed = False
- if request.form.get('submit_services'):
+ if request.form.get("submit_services"):
for service in accounts_app.all_services:
field = form.get_servicedelete(service.id)
if field.data:
current_user.reset_password(service.id)
changed = True
- elif request.form.get('submit_main'):
+ elif request.form.get("submit_main"):
if form.mail.data and form.mail.data != current_user.mail:
accounts_app.mail_backend.send(
- form.mail.data, 'mail/change_mail.txt',
- username=current_user.uid)
-
- flash('Es wurde eine E-Mail an die angegebene Adresse geschickt, '
- 'um diese zu überprüfen. Bitte folge den Anweisungen in der '
- 'E-Mail.', 'success')
+ form.mail.data,
+ "mail/change_mail.txt",
+ username=current_user.uid,
+ )
+
+ flash(
+ "Es wurde eine E-Mail an die angegebene Adresse geschickt, "
+ "um diese zu überprüfen. Bitte folge den Anweisungen in der "
+ "E-Mail.",
+ "success",
+ )
changed = True
if form.password.data:
- current_user.change_password(form.password.data, form.old_password.data)
- flash('Passwort geändert', 'success')
+ current_user.change_password(
+ form.password.data, form.old_password.data
+ )
+ flash("Passwort geändert", "success")
changed = True
for service in accounts_app.all_services:
@@ -161,46 +187,51 @@ def index() -> Union[Response, dict]:
if changed:
accounts_app.user_backend.update(current_user)
login_user(current_user)
- return redirect(url_for('.index'))
+ return redirect(url_for(".index"))
else:
- flash('Nichts geändert.')
-
+ flash("Nichts geändert.")
services = deepcopy(accounts_app.all_services)
for s in services:
s.changed = s.id in current_user.services
return {
- 'form': form,
- 'services': services,
+ "form": form,
+ "services": services,
}
-@bp.route('/change_mail/<token>')
+@bp.route("/change_mail/<token>")
@login_required
def change_mail(token: str):
- username, mail = Confirmation('change_mail').loads_http(token, max_age=3*24*60*60)
+ username, mail = Confirmation("change_mail").loads_http(
+ token, max_age=3 * 24 * 60 * 60
+ )
if current_user.uid != username:
- raise Forbidden('Bitte logge dich als der Benutzer ein, dessen E-Mail-Adresse du ändern willst.')
+ raise Forbidden(
+ "Bitte logge dich als der Benutzer ein, dessen E-Mail-Adresse du ändern willst."
+ )
results = accounts_app.user_backend.find_by_mail(mail)
for user in results:
if user.uid != current_user.uid:
- raise Forbidden('Diese E-Mail-Adresse wird schon von einem anderen account benutzt!')
+ raise Forbidden(
+ "Diese E-Mail-Adresse wird schon von einem anderen account benutzt!"
+ )
current_user.change_email(mail)
accounts_app.user_backend.update(current_user)
- flash('E-Mail-Adresse geändert.', 'success')
- return redirect(url_for('.index'))
+ flash("E-Mail-Adresse geändert.", "success")
+ return redirect(url_for(".index"))
-@bp.route('/about')
-@templated('about.html')
+@bp.route("/about")
+@templated("about.html")
def about():
return {
- 'app': accounts_app,
+ "app": accounts_app,
}
@@ -213,4 +244,4 @@ def errorhandler(e):
except AttributeError:
code = 500
- return render_template('error.html', error=e), code
+ return render_template("error.html", error=e), code
diff --git a/accounts/views/login/__init__.py b/accounts/views/login/__init__.py
index ee049bf..1285605 100644
--- a/accounts/views/login/__init__.py
+++ b/accounts/views/login/__init__.py
@@ -13,7 +13,7 @@ from typing import Union
from .forms import LoginForm
-bp = Blueprint('login', __name__)
+bp = Blueprint("login", __name__)
def is_safe_url(target: str):
@@ -21,36 +21,44 @@ def is_safe_url(target: str):
test_url = urlparse(urljoin(request.host_url, target))
print(target)
print(test_url)
- return test_url.scheme in ('http', 'https') and \
- ref_url.netloc == test_url.netloc and \
- test_url.path == target
+ return (
+ test_url.scheme in ("http", "https")
+ and ref_url.netloc == test_url.netloc
+ and test_url.path == target
+ )
-@bp.route('/login', methods=['GET', 'POST'])
+@bp.route("/login", methods=["GET", "POST"])
def login() -> Union[str, Response]:
if current_user.is_authenticated:
- return redirect(url_for('default.index'))
+ return redirect(url_for("default.index"))
form = LoginForm(request.form)
if form.validate_on_submit():
try:
- user = accounts_app.user_backend.auth(form.username.data,
- form.password.data)
+ user = accounts_app.user_backend.auth(
+ form.username.data, form.password.data
+ )
login_user(user)
- flash('Erfolgreich eingeloggt', 'success')
+ flash("Erfolgreich eingeloggt", "success")
- next = request.form['next']
- return redirect(next if is_safe_url(next) else url_for('default.index'))
- except (accounts_app.user_backend.NoSuchUserError,
- accounts_app.user_backend.InvalidPasswordError):
- flash('Ungültiger Benutzername und/oder Passwort', 'error')
+ next = request.form["next"]
+ return redirect(
+ next if is_safe_url(next) else url_for("default.index")
+ )
+ except (
+ accounts_app.user_backend.NoSuchUserError,
+ accounts_app.user_backend.InvalidPasswordError,
+ ):
+ flash("Ungültiger Benutzername und/oder Passwort", "error")
- return render_template("login/login.html", form=form,
- next=request.values.get('next'))
+ return render_template(
+ "login/login.html", form=form, next=request.values.get("next")
+ )
-@bp.route('/logout')
+@bp.route("/logout")
def logout() -> Response:
logout_user()
- flash('Erfolgreich ausgeloggt.', 'success')
- return redirect(url_for('.login'))
+ flash("Erfolgreich ausgeloggt.", "success")
+ return redirect(url_for(".login"))
diff --git a/accounts/views/login/forms.py b/accounts/views/login/forms.py
index e4155b4..b9774a3 100644
--- a/accounts/views/login/forms.py
+++ b/accounts/views/login/forms.py
@@ -4,5 +4,5 @@ from wtforms import StringField, PasswordField, validators
class LoginForm(Form):
- username = StringField('Benutzername')
- password = PasswordField('Passwort', [validators.DataRequired()])
+ username = StringField("Benutzername")
+ password = PasswordField("Passwort", [validators.DataRequired()])