diff options
Diffstat (limited to 'utils.py')
-rw-r--r-- | utils.py | 274 |
1 files changed, 0 insertions, 274 deletions
diff --git a/utils.py b/utils.py deleted file mode 100644 index 86991b3..0000000 --- a/utils.py +++ /dev/null @@ -1,274 +0,0 @@ -# -*- coding: utf-8 -*- -import hmac -import ldap -import pickle -import re -import struct -import subprocess -from base64 import urlsafe_b64encode, urlsafe_b64decode -from Crypto.Cipher import AES -from email.mime.text import MIMEText -from email.utils import parseaddr -from functools import wraps -from flask import current_app, flash, g, redirect, render_template, request, session -from flask import url_for as flask_url_for -from hashlib import sha1 -from itertools import izip -from random import randint -from time import time -from werkzeug.exceptions import Forbidden -from wtforms.validators import Regexp, ValidationError - - - -_username_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9-]{1,15}$') -_username_exclude_re = re.compile(r'^(admin|root)') - -# using http://flask.pocoo.org/docs/patterns/viewdecorators/ -def templated(template=None): - def templated_(f): - @wraps(f) - def templated__(*args, **kwargs): - template_name = template - if template_name is None: - template_name = request.endpoint \ - .replace('.', '/') + '.html' - ctx = f(*args, **kwargs) - if ctx is None: - ctx = {} - elif not isinstance(ctx, dict): - return ctx - return render_template(template_name, **ctx) - return templated__ - return templated_ - -def login_required(f): - @wraps(f) - def login_required_(*args, **kwargs): - if not g.user: - raise Forbidden(u'Bitte einloggen!') - return f(*args, **kwargs) - return login_required_ - -def admin_required(f): - @wraps(f) - def admin_required_(*args, **kwargs): - if not g.user: - raise Forbidden(u'Bitte einloggen!') - if g.user.uid not in current_app.config.get('ADMIN_USERS', []): - raise Forbidden(u'Du bist kein Admin.') - return f(*args, **kwargs) - return admin_required_ - -def logout_required(f): - @wraps(f) - def logout_required_(*args, **kwargs): - if g.user: - raise Forbidden(u'Diese Seite ist nur für nicht eingeloggte Benutzer gedacht!') - return f(*args, **kwargs) - return logout_required_ - - -def login_user(username, password): - username = ensure_utf8(username) - password = ensure_utf8(password) - - try: - g.user = g.ldap.auth(username, password) - except ldap.INVALID_CREDENTIALS: - return False - except ldap.NO_SUCH_OBJECT: - return False - - session['username'] = username - session['password'] = encrypt_password(password) - - return True - - -def logout_user(): - session.pop('username', None) - session.pop('password', None) - g.user = None - - -def pad(s, numbytes=32, padding='\0'): - return s + (numbytes - len(s) % numbytes) * padding - -def encrypt_password(password): - """ - Encrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`. - The key must be 32 bytes long. - """ - assert len(current_app.config['PASSWORD_ENCRYPTION_KEY']) == 32 - - password = ensure_utf8(password) - - iv = ''.join(chr(randint(0, 0xff)) for i in range(16)) - encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'], AES.MODE_CBC, iv) - return iv + encryptor.encrypt(pad(password)) - -def decrypt_password(ciphertext): - """ - Decrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`. - """ - iv = ciphertext[:16] - encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'], AES.MODE_CBC, iv) - return encryptor.decrypt(ciphertext[16:]).rstrip('\0').decode('utf8') - - -def make_confirmation(realm, data): - """ - Create a confirmation token e.g. for confirmation mails. - - Expects as input a realm to distinguish data for several applications and - some data (pickle-able). - """ - key = '\0'.join((current_app.config['SECRET_KEY'], realm)) - payload = ''.join((struct.pack('>i', time()), pickle.dumps(data))) - mac = hmac.new(key, payload, sha1) - return urlsafe_b64encode(''.join((mac.digest(), payload))).rstrip('=') - -class ConfirmationInvalid(ValueError): - """Raised by `verify_confirmation` on invalid input data""" - -class ConfirmationTimeout(ValueError): - """Raised by `verify_confirmation` when the input data is too old""" - -def verify_confirmation(realm, token, timeout=None): - """ - Verify a confirmation token created by `make_confirmation` and, if it is - valid, return the original data. - If `timeout` is given, only accept the token if it is less than `timeout` - seconds old. - """ - key = '\0'.join((current_app.config['SECRET_KEY'], realm)) - - token = urlsafe_b64decode(token + '=' * (4 - len(token) % 4)) - mac = token[:20] - tokentime = struct.unpack('>i', token[20:24])[0] - payload = token[24:] - - if not constant_time_compare(mac, hmac.new(key, token[20:], sha1).digest()): - raise ConfirmationInvalid('MAC does not match') - - if timeout is not None and time() > tokentime + timeout: - raise ConfirmationTimeout('Token is too old') - - return pickle.loads(payload) - -def http_verify_confirmation(*args, **kwargs): - """ - Like `verify_confirmation`, but raise HTTP exceptions with appropriate - messages instead of `Confirmation{Invalid,Timeout}`. - """ - - try: - return verify_confirmation(*args, **kwargs) - except ConfirmationInvalid: - raise Forbidden(u'Ungültiger Bestätigungslink.') - except ConfirmationTimeout: - raise Forbidden(u'Bestätigungslink ist zu alt.') - - -# Shamelessly stolen from https://github.com/mitsuhiko/itsdangerous/ -# (C) 2011 by Armin Ronacher and the Django Software Foundation. 3-clause BSD. -def constant_time_compare(val1, val2): - """Returns True if the two strings are equal, False otherwise. - - The time taken is independent of the number of characters that match. Do - not use this function for anything else than comparision with known - length targets. - - This is should be implemented in C in order to get it completely right. - """ - len_eq = len(val1) == len(val2) - if len_eq: - result = 0 - left = val1 - else: - result = 1 - left = val2 - for x, y in izip(bytearray(left), bytearray(val2)): - result |= x ^ y - return result == 0 - - -def send_mail(recipient, subject, body, sender=None): - if sender is None: - sender = current_app.config['MAIL_DEFAULT_SENDER'] - - safe = lambda s: s.split('\n', 1)[0] - - msg = MIMEText(body, _charset='utf-8') - msg['Subject'] = safe(subject) - msg['To'] = safe(recipient) - msg['From'] = safe(sender) - - envelope = [] - (name, address) = parseaddr(safe(sender)) - if address != '': - envelope = ['-f', address] - - p = subprocess.Popen([current_app.config['SENDMAIL_COMMAND']] + envelope + ['-t'], - stdin=subprocess.PIPE) - p.stdin.write(msg.as_string()) - p.stdin.close() - - if p.wait() != 0: - raise RuntimeError('sendmail terminated with %d' % p.returncode) - - -class Service(object): - def __init__(self, id, name, url): - self.id = id - self.name = name - self.url = url - self.changed = None # used by settings view - - def __repr__(self): - return '<Service %s>' % self.id - - -def ensure_utf8(s): - if isinstance(s, unicode): - s = s.encode('utf8') - return s - - -def send_register_confirmation_mail(username, mail): - confirm_token = make_confirmation('register', (username, mail)) - confirm_link = url_for('register_complete', token=confirm_token, _external=True) - - body = render_template('mail/register.txt', username=username, - mail=mail, link=confirm_link) - - send_mail(mail, u'E-Mail-Adresse bestätigen', body, - sender=current_app.config.get('MAIL_CONFIRM_SENDER')) - - -class NotRegexp(Regexp): - """ - Like wtforms.validators.Regexp, but rejects data that DOES match the regex. - """ - def __call__(self, form, field): - if self.regex.match(field.data or u''): - if self.message is None: - self.message = field.gettext(u'Invalid input.') - - raise ValidationError(self.message) - - -def url_for(endpoint, **values): - """Wrap `flask.url_for` so that it always returns https links""" - #XXX: Drop this in favor of config.PREFERRED_URL_SCHEME when we require Flask 0.9 - u = flask_url_for(endpoint, **values) - if '_external' in values and u.startswith('http://') \ - and current_app.config['PREFERRED_URL_SCHEME'] == 'https': - return 'https://' + u[7:] - else: - return u - -# used when we encounter inconsistent data etc -class ShouldNotHappen(RuntimeError): - pass |