summaryrefslogtreecommitdiffstats
path: root/accounts/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'accounts/utils.py')
-rw-r--r--accounts/utils.py274
1 files changed, 274 insertions, 0 deletions
diff --git a/accounts/utils.py b/accounts/utils.py
new file mode 100644
index 0000000..86991b3
--- /dev/null
+++ b/accounts/utils.py
@@ -0,0 +1,274 @@
+# -*- coding: utf-8 -*-
+import hmac
+import ldap
+import pickle
+import re
+import struct
+import subprocess
+from base64 import urlsafe_b64encode, urlsafe_b64decode
+from Crypto.Cipher import AES
+from email.mime.text import MIMEText
+from email.utils import parseaddr
+from functools import wraps
+from flask import current_app, flash, g, redirect, render_template, request, session
+from flask import url_for as flask_url_for
+from hashlib import sha1
+from itertools import izip
+from random import randint
+from time import time
+from werkzeug.exceptions import Forbidden
+from wtforms.validators import Regexp, ValidationError
+
+
+
+_username_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9-]{1,15}$')
+_username_exclude_re = re.compile(r'^(admin|root)')
+
+# using http://flask.pocoo.org/docs/patterns/viewdecorators/
+def templated(template=None):
+ def templated_(f):
+ @wraps(f)
+ def templated__(*args, **kwargs):
+ template_name = template
+ if template_name is None:
+ template_name = request.endpoint \
+ .replace('.', '/') + '.html'
+ ctx = f(*args, **kwargs)
+ if ctx is None:
+ ctx = {}
+ elif not isinstance(ctx, dict):
+ return ctx
+ return render_template(template_name, **ctx)
+ return templated__
+ return templated_
+
+def login_required(f):
+ @wraps(f)
+ def login_required_(*args, **kwargs):
+ if not g.user:
+ raise Forbidden(u'Bitte einloggen!')
+ return f(*args, **kwargs)
+ return login_required_
+
+def admin_required(f):
+ @wraps(f)
+ def admin_required_(*args, **kwargs):
+ if not g.user:
+ raise Forbidden(u'Bitte einloggen!')
+ if g.user.uid not in current_app.config.get('ADMIN_USERS', []):
+ raise Forbidden(u'Du bist kein Admin.')
+ return f(*args, **kwargs)
+ return admin_required_
+
+def logout_required(f):
+ @wraps(f)
+ def logout_required_(*args, **kwargs):
+ if g.user:
+ raise Forbidden(u'Diese Seite ist nur für nicht eingeloggte Benutzer gedacht!')
+ return f(*args, **kwargs)
+ return logout_required_
+
+
+def login_user(username, password):
+ username = ensure_utf8(username)
+ password = ensure_utf8(password)
+
+ try:
+ g.user = g.ldap.auth(username, password)
+ except ldap.INVALID_CREDENTIALS:
+ return False
+ except ldap.NO_SUCH_OBJECT:
+ return False
+
+ session['username'] = username
+ session['password'] = encrypt_password(password)
+
+ return True
+
+
+def logout_user():
+ session.pop('username', None)
+ session.pop('password', None)
+ g.user = None
+
+
+def pad(s, numbytes=32, padding='\0'):
+ return s + (numbytes - len(s) % numbytes) * padding
+
+def encrypt_password(password):
+ """
+ Encrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`.
+ The key must be 32 bytes long.
+ """
+ assert len(current_app.config['PASSWORD_ENCRYPTION_KEY']) == 32
+
+ password = ensure_utf8(password)
+
+ iv = ''.join(chr(randint(0, 0xff)) for i in range(16))
+ encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'], AES.MODE_CBC, iv)
+ return iv + encryptor.encrypt(pad(password))
+
+def decrypt_password(ciphertext):
+ """
+ Decrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`.
+ """
+ iv = ciphertext[:16]
+ encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'], AES.MODE_CBC, iv)
+ return encryptor.decrypt(ciphertext[16:]).rstrip('\0').decode('utf8')
+
+
+def make_confirmation(realm, data):
+ """
+ Create a confirmation token e.g. for confirmation mails.
+
+ Expects as input a realm to distinguish data for several applications and
+ some data (pickle-able).
+ """
+ key = '\0'.join((current_app.config['SECRET_KEY'], realm))
+ payload = ''.join((struct.pack('>i', time()), pickle.dumps(data)))
+ mac = hmac.new(key, payload, sha1)
+ return urlsafe_b64encode(''.join((mac.digest(), payload))).rstrip('=')
+
+class ConfirmationInvalid(ValueError):
+ """Raised by `verify_confirmation` on invalid input data"""
+
+class ConfirmationTimeout(ValueError):
+ """Raised by `verify_confirmation` when the input data is too old"""
+
+def verify_confirmation(realm, token, timeout=None):
+ """
+ Verify a confirmation token created by `make_confirmation` and, if it is
+ valid, return the original data.
+ If `timeout` is given, only accept the token if it is less than `timeout`
+ seconds old.
+ """
+ key = '\0'.join((current_app.config['SECRET_KEY'], realm))
+
+ token = urlsafe_b64decode(token + '=' * (4 - len(token) % 4))
+ mac = token[:20]
+ tokentime = struct.unpack('>i', token[20:24])[0]
+ payload = token[24:]
+
+ if not constant_time_compare(mac, hmac.new(key, token[20:], sha1).digest()):
+ raise ConfirmationInvalid('MAC does not match')
+
+ if timeout is not None and time() > tokentime + timeout:
+ raise ConfirmationTimeout('Token is too old')
+
+ return pickle.loads(payload)
+
+def http_verify_confirmation(*args, **kwargs):
+ """
+ Like `verify_confirmation`, but raise HTTP exceptions with appropriate
+ messages instead of `Confirmation{Invalid,Timeout}`.
+ """
+
+ try:
+ return verify_confirmation(*args, **kwargs)
+ except ConfirmationInvalid:
+ raise Forbidden(u'Ungültiger Bestätigungslink.')
+ except ConfirmationTimeout:
+ raise Forbidden(u'Bestätigungslink ist zu alt.')
+
+
+# Shamelessly stolen from https://github.com/mitsuhiko/itsdangerous/
+# (C) 2011 by Armin Ronacher and the Django Software Foundation. 3-clause BSD.
+def constant_time_compare(val1, val2):
+ """Returns True if the two strings are equal, False otherwise.
+
+ The time taken is independent of the number of characters that match. Do
+ not use this function for anything else than comparision with known
+ length targets.
+
+ This is should be implemented in C in order to get it completely right.
+ """
+ len_eq = len(val1) == len(val2)
+ if len_eq:
+ result = 0
+ left = val1
+ else:
+ result = 1
+ left = val2
+ for x, y in izip(bytearray(left), bytearray(val2)):
+ result |= x ^ y
+ return result == 0
+
+
+def send_mail(recipient, subject, body, sender=None):
+ if sender is None:
+ sender = current_app.config['MAIL_DEFAULT_SENDER']
+
+ safe = lambda s: s.split('\n', 1)[0]
+
+ msg = MIMEText(body, _charset='utf-8')
+ msg['Subject'] = safe(subject)
+ msg['To'] = safe(recipient)
+ msg['From'] = safe(sender)
+
+ envelope = []
+ (name, address) = parseaddr(safe(sender))
+ if address != '':
+ envelope = ['-f', address]
+
+ p = subprocess.Popen([current_app.config['SENDMAIL_COMMAND']] + envelope + ['-t'],
+ stdin=subprocess.PIPE)
+ p.stdin.write(msg.as_string())
+ p.stdin.close()
+
+ if p.wait() != 0:
+ raise RuntimeError('sendmail terminated with %d' % p.returncode)
+
+
+class Service(object):
+ def __init__(self, id, name, url):
+ self.id = id
+ self.name = name
+ self.url = url
+ self.changed = None # used by settings view
+
+ def __repr__(self):
+ return '<Service %s>' % self.id
+
+
+def ensure_utf8(s):
+ if isinstance(s, unicode):
+ s = s.encode('utf8')
+ return s
+
+
+def send_register_confirmation_mail(username, mail):
+ confirm_token = make_confirmation('register', (username, mail))
+ confirm_link = url_for('register_complete', token=confirm_token, _external=True)
+
+ body = render_template('mail/register.txt', username=username,
+ mail=mail, link=confirm_link)
+
+ send_mail(mail, u'E-Mail-Adresse bestätigen', body,
+ sender=current_app.config.get('MAIL_CONFIRM_SENDER'))
+
+
+class NotRegexp(Regexp):
+ """
+ Like wtforms.validators.Regexp, but rejects data that DOES match the regex.
+ """
+ def __call__(self, form, field):
+ if self.regex.match(field.data or u''):
+ if self.message is None:
+ self.message = field.gettext(u'Invalid input.')
+
+ raise ValidationError(self.message)
+
+
+def url_for(endpoint, **values):
+ """Wrap `flask.url_for` so that it always returns https links"""
+ #XXX: Drop this in favor of config.PREFERRED_URL_SCHEME when we require Flask 0.9
+ u = flask_url_for(endpoint, **values)
+ if '_external' in values and u.startswith('http://') \
+ and current_app.config['PREFERRED_URL_SCHEME'] == 'https':
+ return 'https://' + u[7:]
+ else:
+ return u
+
+# used when we encounter inconsistent data etc
+class ShouldNotHappen(RuntimeError):
+ pass