# -*- coding: utf-8 -*- import hmac import ldap import pickle import re import struct import subprocess from base64 import urlsafe_b64encode, urlsafe_b64decode from Crypto.Cipher import AES from email.mime.text import MIMEText from functools import wraps from flask import current_app, flash, g, redirect, render_template, request, session, url_for from hashlib import sha1 from random import randint from time import time from werkzeug.exceptions import Forbidden _username_re = re.compile(r'^[a-z]{2,16}') # using http://flask.pocoo.org/docs/patterns/viewdecorators/ def templated(template=None): def templated_(f): @wraps(f) def templated__(*args, **kwargs): template_name = template if template_name is None: template_name = request.endpoint \ .replace('.', '/') + '.html' ctx = f(*args, **kwargs) if ctx is None: ctx = {} elif not isinstance(ctx, dict): return ctx return render_template(template_name, **ctx) return templated__ return templated_ def login_required(f): @wraps(f) def login_required_(*args, **kwargs): if not g.user: raise Forbidden return f(*args, **kwargs) return login_required_ def login_user(username, password): try: g.user = g.ldap.auth(username, password) except ldap.INVALID_CREDENTIALS: return False session['username'] = username session['password'] = encrypt_password(password) return True def logout_user(): session.pop('username', None) session.pop('password', None) g.user = None def pad(s, numbytes=32, padding='\0'): return s + (numbytes - len(s) % numbytes) * padding def encrypt_password(password): """ Encrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`. The key must be 32 bytes long. """ assert len(current_app.config['PASSWORD_ENCRYPTION_KEY']) == 32 if isinstance(password, unicode): password = password.encode('utf8') iv = ''.join(chr(randint(0, 0xff)) for i in range(16)) encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'], AES.MODE_CBC, iv) return iv + encryptor.encrypt(pad(password)) def decrypt_password(ciphertext): """ Decrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`. """ iv = ciphertext[:16] encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'], AES.MODE_CBC, iv) return encryptor.decrypt(ciphertext[16:]).rstrip('\0') def make_confirmation(realm, data): """ Create a confirmation token e.g. for confirmation mails. Expects as input a realm to distinguish data for several applications and some data (pickle-able). """ key = '\0'.join((current_app.config['SECRET_KEY'], realm)) payload = ''.join((struct.pack('>i', time()), pickle.dumps(data))) mac = hmac.new(key, payload, sha1) return urlsafe_b64encode(''.join((mac.digest(), payload))).rstrip('=') class ConfirmationInvalid(ValueError): """Raised by `verify_confirmation` on invalid input data""" class ConfirmationTimeout(ValueError): """Raised by `verify_confirmation` when the input data is too old""" def verify_confirmation(realm, token, timeout=None): """ Verify a confirmation token created by `make_confirmation` and, if it is valid, return the original data. If `timeout` is given, only accept the token if it is less than `timeout` seconds old. """ key = '\0'.join((current_app.config['SECRET_KEY'], realm)) token = urlsafe_b64decode(token + '=' * (4 - len(token) % 4)) mac = token[:20] tokentime = struct.unpack('>i', token[20:24])[0] payload = token[24:] if mac != hmac.new(key, token[20:], sha1).digest(): raise ConfirmationInvalid('MAC does not match') print '%d+%d=%d <> %d' % (tokentime, timeout, tokentime+timeout, time()) if timeout is not None and time() > tokentime + timeout: raise ConfirmationTimeout('Token is too old') return pickle.loads(payload) def http_verify_confirmation(*args, **kwargs): """ Like `verify_confirmation`, but raise HTTP exceptions with appropriate messages instead of `Confirmation{Invalid,Timeout}`. """ try: return verify_confirmation(*args, **kwargs) except ConfirmationInvalid: raise Forbidden(u'Ungültiger Bestätigungslink') except ConfirmationTimeout: raise Forbidden(u'Bestätigungslink ist zu alt') def send_mail(recipient, subject, body, sender=None): if sender is None: sender = current_app.config['MAIL_DEFAULT_SENDER'] safe = lambda s: s.split('\n', 1)[0] msg = MIMEText(body, _charset='utf-8') msg['Subject'] = safe(subject) msg['To'] = safe(recipient) msg['From'] = safe(sender) p = subprocess.Popen([current_app.config['SENDMAIL_COMMAND'], '-t'], stdin=subprocess.PIPE) p.stdin.write(msg.as_string()) p.stdin.close() if p.wait() != 0: raise RuntimeError('sendmail terminated with %d' % p.returncode)