diff options
author | Alexander Sulfrian <alexander@sulfrian.net> | 2016-01-24 03:27:56 +0100 |
---|---|---|
committer | Alexander Sulfrian <alexander@sulfrian.net> | 2016-02-02 04:22:16 +0100 |
commit | 6eb1db6bff15e1611767f5219ee1b4ea558e3d28 (patch) | |
tree | 91365c6b016edf9122094e6f45ec88606591d904 /accounts/utils/__init__.py | |
parent | 081e388bdb401ff877baf0b27d17dc9a56076e59 (diff) | |
download | web-6eb1db6bff15e1611767f5219ee1b4ea558e3d28.tar.gz web-6eb1db6bff15e1611767f5219ee1b4ea558e3d28.tar.bz2 web-6eb1db6bff15e1611767f5219ee1b4ea558e3d28.zip |
Create utils package
Diffstat (limited to 'accounts/utils/__init__.py')
-rw-r--r-- | accounts/utils/__init__.py | 250 |
1 files changed, 250 insertions, 0 deletions
diff --git a/accounts/utils/__init__.py b/accounts/utils/__init__.py new file mode 100644 index 0000000..8f68733 --- /dev/null +++ b/accounts/utils/__init__.py @@ -0,0 +1,250 @@ +# -*- coding: utf-8 -*- +import hmac +import importlib +import ldap +import pickle +import re +import struct +from base64 import urlsafe_b64encode, urlsafe_b64decode +from Crypto.Cipher import AES +from functools import wraps +from flask import current_app, flash, g, redirect, render_template, request, session +from flask import url_for as flask_url_for +from hashlib import sha1 +from itertools import izip +from random import randint +from time import time +from werkzeug.exceptions import Forbidden +from wtforms.validators import Regexp, ValidationError + + +_username_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9-]{1,15}$') +_username_exclude_re = re.compile(r'^(admin|root)') + +# using http://flask.pocoo.org/docs/patterns/viewdecorators/ +def templated(template=None): + def templated_(f): + @wraps(f) + def templated__(*args, **kwargs): + template_name = template + if template_name is None: + template_name = request.endpoint \ + .replace('.', '/') + '.html' + ctx = f(*args, **kwargs) + if ctx is None: + ctx = {} + elif not isinstance(ctx, dict): + return ctx + return render_template(template_name, **ctx) + return templated__ + return templated_ + +def login_required(f): + @wraps(f) + def login_required_(*args, **kwargs): + if not g.user: + raise Forbidden(u'Bitte einloggen!') + return f(*args, **kwargs) + return login_required_ + +def admin_required(f): + @wraps(f) + def admin_required_(*args, **kwargs): + if not g.user: + raise Forbidden(u'Bitte einloggen!') + if g.user.uid not in current_app.config.get('ADMIN_USERS', []): + raise Forbidden(u'Du bist kein Admin.') + return f(*args, **kwargs) + return admin_required_ + +def logout_required(f): + @wraps(f) + def logout_required_(*args, **kwargs): + if g.user: + raise Forbidden(u'Diese Seite ist nur für nicht eingeloggte Benutzer gedacht!') + return f(*args, **kwargs) + return logout_required_ + + +def login_user(username, password): + username = ensure_utf8(username) + password = ensure_utf8(password) + + try: + g.user = current_app.user_backend.auth(username, password) + except ldap.INVALID_CREDENTIALS: + return False + except ldap.NO_SUCH_OBJECT: + return False + + session['username'] = username + session['password'] = encrypt_password(password) + + return True + + +def logout_user(): + session.pop('username', None) + session.pop('password', None) + g.user = None + + +def pad(s, numbytes=32, padding='\0'): + return s + (numbytes - len(s) % numbytes) * padding + +def encrypt_password(password): + """ + Encrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`. + The key must be 32 bytes long. + """ + assert len(current_app.config['PASSWORD_ENCRYPTION_KEY']) == 32 + + password = ensure_utf8(password) + + iv = ''.join(chr(randint(0, 0xff)) for i in range(16)) + encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'], AES.MODE_CBC, iv) + return iv + encryptor.encrypt(pad(password)) + +def decrypt_password(ciphertext): + """ + Decrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`. + """ + iv = ciphertext[:16] + encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'], AES.MODE_CBC, iv) + return encryptor.decrypt(ciphertext[16:]).rstrip('\0').decode('utf8') + + +def make_confirmation(realm, data): + """ + Create a confirmation token e.g. for confirmation mails. + + Expects as input a realm to distinguish data for several applications and + some data (pickle-able). + """ + key = '\0'.join((current_app.config['SECRET_KEY'], realm)) + payload = ''.join((struct.pack('>i', time()), pickle.dumps(data))) + mac = hmac.new(key, payload, sha1) + return urlsafe_b64encode(''.join((mac.digest(), payload))).rstrip('=') + +class ConfirmationInvalid(ValueError): + """Raised by `verify_confirmation` on invalid input data""" + +class ConfirmationTimeout(ValueError): + """Raised by `verify_confirmation` when the input data is too old""" + +def verify_confirmation(realm, token, timeout=None): + """ + Verify a confirmation token created by `make_confirmation` and, if it is + valid, return the original data. + If `timeout` is given, only accept the token if it is less than `timeout` + seconds old. + """ + key = '\0'.join((current_app.config['SECRET_KEY'], realm)) + + token = urlsafe_b64decode(token + '=' * (4 - len(token) % 4)) + mac = token[:20] + tokentime = struct.unpack('>i', token[20:24])[0] + payload = token[24:] + + if not constant_time_compare(mac, hmac.new(key, token[20:], sha1).digest()): + raise ConfirmationInvalid('MAC does not match') + + if timeout is not None and time() > tokentime + timeout: + raise ConfirmationTimeout('Token is too old') + + return pickle.loads(payload) + +def http_verify_confirmation(*args, **kwargs): + """ + Like `verify_confirmation`, but raise HTTP exceptions with appropriate + messages instead of `Confirmation{Invalid,Timeout}`. + """ + + try: + return verify_confirmation(*args, **kwargs) + except ConfirmationInvalid: + raise Forbidden(u'Ungültiger Bestätigungslink.') + except ConfirmationTimeout: + raise Forbidden(u'Bestätigungslink ist zu alt.') + + +# Shamelessly stolen from https://github.com/mitsuhiko/itsdangerous/ +# (C) 2011 by Armin Ronacher and the Django Software Foundation. 3-clause BSD. +def constant_time_compare(val1, val2): + """Returns True if the two strings are equal, False otherwise. + + The time taken is independent of the number of characters that match. Do + not use this function for anything else than comparision with known + length targets. + + This is should be implemented in C in order to get it completely right. + """ + len_eq = len(val1) == len(val2) + if len_eq: + result = 0 + left = val1 + else: + result = 1 + left = val2 + for x, y in izip(bytearray(left), bytearray(val2)): + result |= x ^ y + return result == 0 + + +class Service(object): + def __init__(self, id, name, url): + self.id = id + self.name = name + self.url = url + self.changed = None # used by settings view + + def __repr__(self): + return '<Service %s>' % self.id + + +def ensure_utf8(s): + if isinstance(s, unicode): + s = s.encode('utf8') + return s + + +def send_register_confirmation_mail(username, mail): + confirm_token = make_confirmation('register', (username, mail)) + confirm_link = url_for('default.register_complete', token=confirm_token, _external=True) + + body = render_template('mail/register.txt', username=username, + mail=mail, link=confirm_link) + + current_app.mail_backend.send( + mail, u'E-Mail-Adresse bestätigen', body, + sender=current_app.config.get('MAIL_CONFIRM_SENDER')) + + +class NotRegexp(Regexp): + """ + Like wtforms.validators.Regexp, but rejects data that DOES match the regex. + """ + def __call__(self, form, field): + if self.regex.match(field.data or u''): + if self.message is None: + self.message = field.gettext(u'Invalid input.') + + raise ValidationError(self.message) + + +def url_for(endpoint, **values): + """Wrap `flask.url_for` so that it always returns https links""" + #XXX: Drop this in favor of config.PREFERRED_URL_SCHEME when we require Flask 0.9 + u = flask_url_for(endpoint, **values) + if '_external' in values and u.startswith('http://') \ + and current_app.config['PREFERRED_URL_SCHEME'] == 'https': + return 'https://' + u[7:] + else: + return u + + +def get_backend(path, app): + module = path.rsplit(".", 1).pop() + class_name = '%sBackend' % module.title() + backend_class = getattr(importlib.import_module(path), class_name) + return backend_class(app) |