summaryrefslogtreecommitdiffstats
path: root/utils.py
diff options
context:
space:
mode:
authorAlexander Sulfrian <alexander@sulfrian.net>2016-01-23 13:04:35 +0100
committerAlexander Sulfrian <alexander@sulfrian.net>2016-01-25 01:56:13 +0100
commit899947bd00df10cca15aca1b3c14125b38b35ecf (patch)
treecbe0cd8023efa657b478476a70f83d2f5cb5509b /utils.py
parentbd20d0ff1c7a582f3c53d30bfc387139c419ef35 (diff)
downloadweb-899947bd00df10cca15aca1b3c14125b38b35ecf.tar.gz
web-899947bd00df10cca15aca1b3c14125b38b35ecf.tar.bz2
web-899947bd00df10cca15aca1b3c14125b38b35ecf.zip
Moved everything into a package
Diffstat (limited to 'utils.py')
-rw-r--r--utils.py274
1 files changed, 0 insertions, 274 deletions
diff --git a/utils.py b/utils.py
deleted file mode 100644
index 86991b3..0000000
--- a/utils.py
+++ /dev/null
@@ -1,274 +0,0 @@
-# -*- coding: utf-8 -*-
-import hmac
-import ldap
-import pickle
-import re
-import struct
-import subprocess
-from base64 import urlsafe_b64encode, urlsafe_b64decode
-from Crypto.Cipher import AES
-from email.mime.text import MIMEText
-from email.utils import parseaddr
-from functools import wraps
-from flask import current_app, flash, g, redirect, render_template, request, session
-from flask import url_for as flask_url_for
-from hashlib import sha1
-from itertools import izip
-from random import randint
-from time import time
-from werkzeug.exceptions import Forbidden
-from wtforms.validators import Regexp, ValidationError
-
-
-
-_username_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9-]{1,15}$')
-_username_exclude_re = re.compile(r'^(admin|root)')
-
-# using http://flask.pocoo.org/docs/patterns/viewdecorators/
-def templated(template=None):
- def templated_(f):
- @wraps(f)
- def templated__(*args, **kwargs):
- template_name = template
- if template_name is None:
- template_name = request.endpoint \
- .replace('.', '/') + '.html'
- ctx = f(*args, **kwargs)
- if ctx is None:
- ctx = {}
- elif not isinstance(ctx, dict):
- return ctx
- return render_template(template_name, **ctx)
- return templated__
- return templated_
-
-def login_required(f):
- @wraps(f)
- def login_required_(*args, **kwargs):
- if not g.user:
- raise Forbidden(u'Bitte einloggen!')
- return f(*args, **kwargs)
- return login_required_
-
-def admin_required(f):
- @wraps(f)
- def admin_required_(*args, **kwargs):
- if not g.user:
- raise Forbidden(u'Bitte einloggen!')
- if g.user.uid not in current_app.config.get('ADMIN_USERS', []):
- raise Forbidden(u'Du bist kein Admin.')
- return f(*args, **kwargs)
- return admin_required_
-
-def logout_required(f):
- @wraps(f)
- def logout_required_(*args, **kwargs):
- if g.user:
- raise Forbidden(u'Diese Seite ist nur für nicht eingeloggte Benutzer gedacht!')
- return f(*args, **kwargs)
- return logout_required_
-
-
-def login_user(username, password):
- username = ensure_utf8(username)
- password = ensure_utf8(password)
-
- try:
- g.user = g.ldap.auth(username, password)
- except ldap.INVALID_CREDENTIALS:
- return False
- except ldap.NO_SUCH_OBJECT:
- return False
-
- session['username'] = username
- session['password'] = encrypt_password(password)
-
- return True
-
-
-def logout_user():
- session.pop('username', None)
- session.pop('password', None)
- g.user = None
-
-
-def pad(s, numbytes=32, padding='\0'):
- return s + (numbytes - len(s) % numbytes) * padding
-
-def encrypt_password(password):
- """
- Encrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`.
- The key must be 32 bytes long.
- """
- assert len(current_app.config['PASSWORD_ENCRYPTION_KEY']) == 32
-
- password = ensure_utf8(password)
-
- iv = ''.join(chr(randint(0, 0xff)) for i in range(16))
- encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'], AES.MODE_CBC, iv)
- return iv + encryptor.encrypt(pad(password))
-
-def decrypt_password(ciphertext):
- """
- Decrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`.
- """
- iv = ciphertext[:16]
- encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'], AES.MODE_CBC, iv)
- return encryptor.decrypt(ciphertext[16:]).rstrip('\0').decode('utf8')
-
-
-def make_confirmation(realm, data):
- """
- Create a confirmation token e.g. for confirmation mails.
-
- Expects as input a realm to distinguish data for several applications and
- some data (pickle-able).
- """
- key = '\0'.join((current_app.config['SECRET_KEY'], realm))
- payload = ''.join((struct.pack('>i', time()), pickle.dumps(data)))
- mac = hmac.new(key, payload, sha1)
- return urlsafe_b64encode(''.join((mac.digest(), payload))).rstrip('=')
-
-class ConfirmationInvalid(ValueError):
- """Raised by `verify_confirmation` on invalid input data"""
-
-class ConfirmationTimeout(ValueError):
- """Raised by `verify_confirmation` when the input data is too old"""
-
-def verify_confirmation(realm, token, timeout=None):
- """
- Verify a confirmation token created by `make_confirmation` and, if it is
- valid, return the original data.
- If `timeout` is given, only accept the token if it is less than `timeout`
- seconds old.
- """
- key = '\0'.join((current_app.config['SECRET_KEY'], realm))
-
- token = urlsafe_b64decode(token + '=' * (4 - len(token) % 4))
- mac = token[:20]
- tokentime = struct.unpack('>i', token[20:24])[0]
- payload = token[24:]
-
- if not constant_time_compare(mac, hmac.new(key, token[20:], sha1).digest()):
- raise ConfirmationInvalid('MAC does not match')
-
- if timeout is not None and time() > tokentime + timeout:
- raise ConfirmationTimeout('Token is too old')
-
- return pickle.loads(payload)
-
-def http_verify_confirmation(*args, **kwargs):
- """
- Like `verify_confirmation`, but raise HTTP exceptions with appropriate
- messages instead of `Confirmation{Invalid,Timeout}`.
- """
-
- try:
- return verify_confirmation(*args, **kwargs)
- except ConfirmationInvalid:
- raise Forbidden(u'Ungültiger Bestätigungslink.')
- except ConfirmationTimeout:
- raise Forbidden(u'Bestätigungslink ist zu alt.')
-
-
-# Shamelessly stolen from https://github.com/mitsuhiko/itsdangerous/
-# (C) 2011 by Armin Ronacher and the Django Software Foundation. 3-clause BSD.
-def constant_time_compare(val1, val2):
- """Returns True if the two strings are equal, False otherwise.
-
- The time taken is independent of the number of characters that match. Do
- not use this function for anything else than comparision with known
- length targets.
-
- This is should be implemented in C in order to get it completely right.
- """
- len_eq = len(val1) == len(val2)
- if len_eq:
- result = 0
- left = val1
- else:
- result = 1
- left = val2
- for x, y in izip(bytearray(left), bytearray(val2)):
- result |= x ^ y
- return result == 0
-
-
-def send_mail(recipient, subject, body, sender=None):
- if sender is None:
- sender = current_app.config['MAIL_DEFAULT_SENDER']
-
- safe = lambda s: s.split('\n', 1)[0]
-
- msg = MIMEText(body, _charset='utf-8')
- msg['Subject'] = safe(subject)
- msg['To'] = safe(recipient)
- msg['From'] = safe(sender)
-
- envelope = []
- (name, address) = parseaddr(safe(sender))
- if address != '':
- envelope = ['-f', address]
-
- p = subprocess.Popen([current_app.config['SENDMAIL_COMMAND']] + envelope + ['-t'],
- stdin=subprocess.PIPE)
- p.stdin.write(msg.as_string())
- p.stdin.close()
-
- if p.wait() != 0:
- raise RuntimeError('sendmail terminated with %d' % p.returncode)
-
-
-class Service(object):
- def __init__(self, id, name, url):
- self.id = id
- self.name = name
- self.url = url
- self.changed = None # used by settings view
-
- def __repr__(self):
- return '<Service %s>' % self.id
-
-
-def ensure_utf8(s):
- if isinstance(s, unicode):
- s = s.encode('utf8')
- return s
-
-
-def send_register_confirmation_mail(username, mail):
- confirm_token = make_confirmation('register', (username, mail))
- confirm_link = url_for('register_complete', token=confirm_token, _external=True)
-
- body = render_template('mail/register.txt', username=username,
- mail=mail, link=confirm_link)
-
- send_mail(mail, u'E-Mail-Adresse bestätigen', body,
- sender=current_app.config.get('MAIL_CONFIRM_SENDER'))
-
-
-class NotRegexp(Regexp):
- """
- Like wtforms.validators.Regexp, but rejects data that DOES match the regex.
- """
- def __call__(self, form, field):
- if self.regex.match(field.data or u''):
- if self.message is None:
- self.message = field.gettext(u'Invalid input.')
-
- raise ValidationError(self.message)
-
-
-def url_for(endpoint, **values):
- """Wrap `flask.url_for` so that it always returns https links"""
- #XXX: Drop this in favor of config.PREFERRED_URL_SCHEME when we require Flask 0.9
- u = flask_url_for(endpoint, **values)
- if '_external' in values and u.startswith('http://') \
- and current_app.config['PREFERRED_URL_SCHEME'] == 'https':
- return 'https://' + u[7:]
- else:
- return u
-
-# used when we encounter inconsistent data etc
-class ShouldNotHappen(RuntimeError):
- pass