# -*- coding: utf-8 -*-
import hmac
import ldap
import os
import pickle
import re
import struct
import subprocess
from base64 import urlsafe_b64encode, urlsafe_b64decode
from Crypto.Cipher import AES
from email.mime.text import MIMEText
from functools import wraps
from flask import current_app, flash, g, redirect, render_template, request, session
from flask import url_for as flask_url_for
from flask.ext.wtf import ValidationError
from hashlib import sha1
from itertools import izip
from random import randint
from time import time
from werkzeug.exceptions import Forbidden
from wtforms.validators import Regexp


_username_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9-]{1,15}$')
_username_exclude_re = re.compile(r'^(admin|root)')

# Native constant-time comparison if this interpreter's hmac module offers
# one; None otherwise, in which case `constant_time_compare` uses its
# pure-Python fallback.
_builtin_constant_time_compare = getattr(hmac, 'compare_digest', None)


# using http://flask.pocoo.org/docs/patterns/viewdecorators/
def templated(template=None):
    """
    View decorator that renders the dict returned by the view.

    If `template` is None, the template name is derived from
    `request.endpoint` by replacing dots with slashes and appending
    '.html'. A view returning None is rendered with an empty context; any
    other non-dict return value is passed through unchanged.
    """
    def templated_(f):
        @wraps(f)
        def templated__(*args, **kwargs):
            template_name = template
            if template_name is None:
                template_name = request.endpoint \
                    .replace('.', '/') + '.html'
            ctx = f(*args, **kwargs)
            if ctx is None:
                ctx = {}
            elif not isinstance(ctx, dict):
                return ctx
            return render_template(template_name, **ctx)
        return templated__
    return templated_


def login_required(f):
    """View decorator raising `Forbidden` unless `g.user` is set."""
    @wraps(f)
    def login_required_(*args, **kwargs):
        if not g.user:
            raise Forbidden(u'Bitte einloggen!')
        return f(*args, **kwargs)
    return login_required_


def admin_required(f):
    """
    View decorator raising `Forbidden` unless `g.user` is set and its `uid`
    is listed in the `ADMIN_USERS` config value (empty list if missing).
    """
    @wraps(f)
    def admin_required_(*args, **kwargs):
        if not g.user:
            raise Forbidden(u'Bitte einloggen!')
        if g.user.uid not in current_app.config.get('ADMIN_USERS', []):
            raise Forbidden(u'Du bist kein Admin.')
        return f(*args, **kwargs)
    return admin_required_


def logout_required(f):
    """View decorator raising `Forbidden` if `g.user` is set."""
    @wraps(f)
    def logout_required_(*args, **kwargs):
        if g.user:
            raise Forbidden(u'Diese Seite ist nur für nicht eingeloggte Benutzer gedacht!')
        return f(*args, **kwargs)
    return logout_required_


def login_user(username, password):
    """
    Authenticate against `g.ldap` and remember the credentials in the session.

    On success `g.user` is set to the result of `g.ldap.auth`, the username
    and the encrypted password are stored in the session and True is
    returned. Returns False if LDAP reports invalid credentials or an
    unknown object.
    """
    username = ensure_utf8(username)
    password = ensure_utf8(password)

    try:
        g.user = g.ldap.auth(username, password)
    except (ldap.INVALID_CREDENTIALS, ldap.NO_SUCH_OBJECT):
        return False

    session['username'] = username
    session['password'] = encrypt_password(password)
    return True


def logout_user():
    """Drop the stored credentials from the session and reset `g.user`."""
    session.pop('username', None)
    session.pop('password', None)
    g.user = None


def pad(s, numbytes=32, padding='\0'):
    """
    Right-pad `s` with `padding` up to a multiple of `numbytes`.

    At least one padding character is always appended: if `len(s)` is
    already a multiple of `numbytes`, a full `numbytes` of padding is added.
    """
    return s + (numbytes - len(s) % numbytes) * padding


def encrypt_password(password):
    """
    Encrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`.
    The key must be 32 bytes long.

    Returns the 16 byte IV followed by the AES-CBC ciphertext of the
    NUL-padded, UTF-8 encoded password. Raises `ValueError` if the
    configured key does not have the required length.
    """
    key = current_app.config['PASSWORD_ENCRYPTION_KEY']
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if len(key) != 32:
        raise ValueError('PASSWORD_ENCRYPTION_KEY must be exactly 32 bytes long')

    password = ensure_utf8(password)
    # The IV must be unpredictable, so take it from the OS CSPRNG rather
    # than from the `random` module.
    iv = os.urandom(16)
    encryptor = AES.new(key, AES.MODE_CBC, iv)
    return iv + encryptor.encrypt(pad(password))


def decrypt_password(ciphertext):
    """
    Decrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`.

    Expects the format produced by `encrypt_password` (16 byte IV followed
    by the ciphertext) and returns the password as unicode. Trailing NUL
    bytes are treated as padding and stripped.
    """
    iv = ciphertext[:16]
    encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'],
                        AES.MODE_CBC, iv)
    return encryptor.decrypt(ciphertext[16:]).rstrip('\0').decode('utf8')


def make_confirmation(realm, data):
    """
    Create a confirmation token e.g. for confirmation mails.

    Expects as input a realm to distinguish data for several applications and
    some data (pickle-able).

    The token is the URL-safe base64 encoding (trailing '=' removed) of a
    20 byte HMAC-SHA1 followed by the signed payload: a 4 byte big-endian
    timestamp and the pickled data. The MAC key is `SECRET_KEY` and `realm`
    joined by a NUL byte.
    """
    key = '\0'.join((current_app.config['SECRET_KEY'], realm))
    # struct's integer formats want an int; `time()` returns a float.
    payload = ''.join((struct.pack('>i', int(time())), pickle.dumps(data)))
    mac = hmac.new(key, payload, sha1)
    return urlsafe_b64encode(''.join((mac.digest(), payload))).rstrip('=')


class ConfirmationInvalid(ValueError):
    """Raised by `verify_confirmation` on invalid input data"""


class ConfirmationTimeout(ValueError):
    """Raised by `verify_confirmation` when the input data is too old"""


def verify_confirmation(realm, token, timeout=None):
    """
    Verify a confirmation token created by `make_confirmation` and, if it is
    valid, return the original data.
    If `timeout` is given, only accept the token if it is less than `timeout`
    seconds old.

    Raises `ConfirmationInvalid` if the token is malformed or its MAC does
    not match, and `ConfirmationTimeout` if it is older than `timeout`.
    """
    key = '\0'.join((current_app.config['SECRET_KEY'], realm))

    # Tokens taken from a URL may arrive as unicode; work on bytes.
    token = ensure_utf8(token)
    try:
        # Restore exactly the padding that `make_confirmation` stripped
        # (0 to 3 characters).
        token = urlsafe_b64decode(token + '=' * (-len(token) % 4))
    except (TypeError, ValueError):
        raise ConfirmationInvalid('Token is not valid base64')

    # 20 bytes of MAC plus 4 bytes of timestamp are the minimum.
    if len(token) < 24:
        raise ConfirmationInvalid('Token is too short')

    mac = token[:20]
    signed = token[20:]
    if not constant_time_compare(mac, hmac.new(key, signed, sha1).digest()):
        raise ConfirmationInvalid('MAC does not match')

    # Only interpret the contents once they are known to be authentic.
    tokentime = struct.unpack('>i', signed[:4])[0]
    payload = signed[4:]

    if timeout is not None and time() > tokentime + timeout:
        raise ConfirmationTimeout('Token is too old')

    # NOTE: unpickling is only acceptable here because the payload has been
    # authenticated with SECRET_KEY above. Anyone who knows SECRET_KEY can
    # execute arbitrary code through this call.
    return pickle.loads(payload)


def http_verify_confirmation(*args, **kwargs):
    """
    Like `verify_confirmation`, but raise HTTP exceptions with appropriate
    messages instead of `Confirmation{Invalid,Timeout}`.
    """
    try:
        return verify_confirmation(*args, **kwargs)
    except ConfirmationInvalid:
        raise Forbidden(u'Ungültiger Bestätigungslink.')
    except ConfirmationTimeout:
        raise Forbidden(u'Bestätigungslink ist zu alt.')


# Shamelessly stolen from https://github.com/mitsuhiko/itsdangerous/
# (C) 2011 by Armin Ronacher and the Django Software Foundation. 3-clause BSD.
def constant_time_compare(val1, val2):
    """Returns True if the two strings are equal, False otherwise.

    The time taken is independent of the number of characters that match. Do
    not use this function for anything else than comparison with known
    length targets.

    This should be implemented in C in order to get it completely right;
    the native implementation is used whenever the `hmac` module provides
    one.
    """
    if _builtin_constant_time_compare is not None:
        return _builtin_constant_time_compare(val1, val2)
    len_eq = len(val1) == len(val2)
    if len_eq:
        result = 0
        left = val1
    else:
        # Lengths differ: the result is already "not equal", but still walk
        # over `val2` so the work done does not depend on `val1`.
        result = 1
        left = val2
    for x, y in izip(bytearray(left), bytearray(val2)):
        result |= x ^ y
    return result == 0


def send_mail(recipient, subject, body, sender=None):
    """
    Send a UTF-8 text mail by piping it to `SENDMAIL_COMMAND -t`.

    `sender` defaults to the `MAIL_DEFAULT_SENDER` config value. Header
    values are cut at the first line break. Raises `RuntimeError` if the
    sendmail process exits with a non-zero status.
    """
    if sender is None:
        sender = current_app.config['MAIL_DEFAULT_SENDER']

    def safe(s):
        # Cut at the first CR or LF so a value cannot smuggle in additional
        # header lines.
        return re.split(r'[\r\n]', s, 1)[0]

    msg = MIMEText(body, _charset='utf-8')
    msg['Subject'] = safe(subject)
    msg['To'] = safe(recipient)
    msg['From'] = safe(sender)

    p = subprocess.Popen([current_app.config['SENDMAIL_COMMAND'], '-t'],
                         stdin=subprocess.PIPE)
    # communicate() writes the message, closes stdin and waits for exit.
    p.communicate(msg.as_string())

    if p.returncode != 0:
        raise RuntimeError('sendmail terminated with %d' % p.returncode)


class Service(object):
    """Plain record describing a service by id, name and url."""

    def __init__(self, id, name, url):
        self.id = id
        self.name = name
        self.url = url
        self.changed = None  # used by settings view

    def __repr__(self):
        return '<Service %s>' % self.id


def ensure_utf8(s):
    """Return `s` encoded as UTF-8 if it is unicode, otherwise unchanged."""
    if isinstance(s, unicode):
        s = s.encode('utf8')
    return s


def send_register_confirmation_mail(username, mail):
    """
    Mail a registration confirmation link to `mail`.

    The link points to the `register_complete` endpoint and carries a
    'register' realm token for `(username, mail)`. The sender is taken from
    the `MAIL_CONFIRM_SENDER` config value; if that is unset, `send_mail`
    falls back to its default sender.
    """
    confirm_token = make_confirmation('register', (username, mail))
    confirm_link = url_for('register_complete', token=confirm_token,
                           _external=True)

    body = render_template('mail/register.txt', username=username,
                           mail=mail, link=confirm_link)

    send_mail(mail, u'E-Mail-Adresse bestätigen', body,
              sender=current_app.config.get('MAIL_CONFIRM_SENDER'))


class NotRegexp(Regexp):
    """
    Like wtforms.validators.Regexp, but rejects data that DOES match the regex.
    """
    def __call__(self, form, field):
        if self.regex.match(field.data or u''):
            if self.message is None:
                self.message = field.gettext(u'Invalid input.')

            raise ValidationError(self.message)


def url_for(endpoint, **values):
    """Wrap `flask.url_for` so that it always returns https links"""
    #XXX: Drop this in favor of config.PREFERRED_URL_SCHEME when we require Flask 0.9
    u = flask_url_for(endpoint, **values)
    # Use .get(): the key may be absent from the config on Flask versions
    # that predate PREFERRED_URL_SCHEME.
    if '_external' in values and u.startswith('http://') \
            and current_app.config.get('PREFERRED_URL_SCHEME') == 'https':
        return 'https://' + u[7:]
    else:
        return u


# used when we encounter inconsistent data etc
class ShouldNotHappen(RuntimeError):
    pass