From 6eb1db6bff15e1611767f5219ee1b4ea558e3d28 Mon Sep 17 00:00:00 2001 From: Alexander Sulfrian Date: Sun, 24 Jan 2016 03:27:56 +0100 Subject: Create utils package --- accounts/utils.py | 250 --------------------------------------------- accounts/utils/__init__.py | 250 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 250 insertions(+), 250 deletions(-) delete mode 100644 accounts/utils.py create mode 100644 accounts/utils/__init__.py diff --git a/accounts/utils.py b/accounts/utils.py deleted file mode 100644 index 8f68733..0000000 --- a/accounts/utils.py +++ /dev/null @@ -1,250 +0,0 @@ -# -*- coding: utf-8 -*- -import hmac -import importlib -import ldap -import pickle -import re -import struct -from base64 import urlsafe_b64encode, urlsafe_b64decode -from Crypto.Cipher import AES -from functools import wraps -from flask import current_app, flash, g, redirect, render_template, request, session -from flask import url_for as flask_url_for -from hashlib import sha1 -from itertools import izip -from random import randint -from time import time -from werkzeug.exceptions import Forbidden -from wtforms.validators import Regexp, ValidationError - - -_username_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9-]{1,15}$') -_username_exclude_re = re.compile(r'^(admin|root)') - -# using http://flask.pocoo.org/docs/patterns/viewdecorators/ -def templated(template=None): - def templated_(f): - @wraps(f) - def templated__(*args, **kwargs): - template_name = template - if template_name is None: - template_name = request.endpoint \ - .replace('.', '/') + '.html' - ctx = f(*args, **kwargs) - if ctx is None: - ctx = {} - elif not isinstance(ctx, dict): - return ctx - return render_template(template_name, **ctx) - return templated__ - return templated_ - -def login_required(f): - @wraps(f) - def login_required_(*args, **kwargs): - if not g.user: - raise Forbidden(u'Bitte einloggen!') - return f(*args, **kwargs) - return login_required_ - -def admin_required(f): - @wraps(f) - def admin_required_(*args, **kwargs): - if not g.user: - raise Forbidden(u'Bitte einloggen!') - if g.user.uid not in current_app.config.get('ADMIN_USERS', []): - raise Forbidden(u'Du bist kein Admin.') - return f(*args, **kwargs) - return admin_required_ - -def logout_required(f): - @wraps(f) - def logout_required_(*args, **kwargs): - if g.user: - raise Forbidden(u'Diese Seite ist nur für nicht eingeloggte Benutzer gedacht!') - return f(*args, **kwargs) - return logout_required_ - - -def login_user(username, password): - username = ensure_utf8(username) - password = ensure_utf8(password) - - try: - g.user = current_app.user_backend.auth(username, password) - except ldap.INVALID_CREDENTIALS: - return False - except ldap.NO_SUCH_OBJECT: - return False - - session['username'] = username - session['password'] = encrypt_password(password) - - return True - - -def logout_user(): - session.pop('username', None) - session.pop('password', None) - g.user = None - - -def pad(s, numbytes=32, padding='\0'): - return s + (numbytes - len(s) % numbytes) * padding - -def encrypt_password(password): - """ - Encrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`. - The key must be 32 bytes long. - """ - assert len(current_app.config['PASSWORD_ENCRYPTION_KEY']) == 32 - - password = ensure_utf8(password) - - iv = ''.join(chr(randint(0, 0xff)) for i in range(16)) - encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'], AES.MODE_CBC, iv) - return iv + encryptor.encrypt(pad(password)) - -def decrypt_password(ciphertext): - """ - Decrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`. - """ - iv = ciphertext[:16] - encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'], AES.MODE_CBC, iv) - return encryptor.decrypt(ciphertext[16:]).rstrip('\0').decode('utf8') - - -def make_confirmation(realm, data): - """ - Create a confirmation token e.g. for confirmation mails. - - Expects as input a realm to distinguish data for several applications and - some data (pickle-able). - """ - key = '\0'.join((current_app.config['SECRET_KEY'], realm)) - payload = ''.join((struct.pack('>i', time()), pickle.dumps(data))) - mac = hmac.new(key, payload, sha1) - return urlsafe_b64encode(''.join((mac.digest(), payload))).rstrip('=') - -class ConfirmationInvalid(ValueError): - """Raised by `verify_confirmation` on invalid input data""" - -class ConfirmationTimeout(ValueError): - """Raised by `verify_confirmation` when the input data is too old""" - -def verify_confirmation(realm, token, timeout=None): - """ - Verify a confirmation token created by `make_confirmation` and, if it is - valid, return the original data. - If `timeout` is given, only accept the token if it is less than `timeout` - seconds old. - """ - key = '\0'.join((current_app.config['SECRET_KEY'], realm)) - - token = urlsafe_b64decode(token + '=' * (4 - len(token) % 4)) - mac = token[:20] - tokentime = struct.unpack('>i', token[20:24])[0] - payload = token[24:] - - if not constant_time_compare(mac, hmac.new(key, token[20:], sha1).digest()): - raise ConfirmationInvalid('MAC does not match') - - if timeout is not None and time() > tokentime + timeout: - raise ConfirmationTimeout('Token is too old') - - return pickle.loads(payload) - -def http_verify_confirmation(*args, **kwargs): - """ - Like `verify_confirmation`, but raise HTTP exceptions with appropriate - messages instead of `Confirmation{Invalid,Timeout}`. - """ - - try: - return verify_confirmation(*args, **kwargs) - except ConfirmationInvalid: - raise Forbidden(u'Ungültiger Bestätigungslink.') - except ConfirmationTimeout: - raise Forbidden(u'Bestätigungslink ist zu alt.') - - -# Shamelessly stolen from https://github.com/mitsuhiko/itsdangerous/ -# (C) 2011 by Armin Ronacher and the Django Software Foundation. 3-clause BSD. -def constant_time_compare(val1, val2): - """Returns True if the two strings are equal, False otherwise. - - The time taken is independent of the number of characters that match. Do - not use this function for anything else than comparision with known - length targets. - - This is should be implemented in C in order to get it completely right. - """ - len_eq = len(val1) == len(val2) - if len_eq: - result = 0 - left = val1 - else: - result = 1 - left = val2 - for x, y in izip(bytearray(left), bytearray(val2)): - result |= x ^ y - return result == 0 - - -class Service(object): - def __init__(self, id, name, url): - self.id = id - self.name = name - self.url = url - self.changed = None # used by settings view - - def __repr__(self): - return '' % self.id - - -def ensure_utf8(s): - if isinstance(s, unicode): - s = s.encode('utf8') - return s - - -def send_register_confirmation_mail(username, mail): - confirm_token = make_confirmation('register', (username, mail)) - confirm_link = url_for('default.register_complete', token=confirm_token, _external=True) - - body = render_template('mail/register.txt', username=username, - mail=mail, link=confirm_link) - - current_app.mail_backend.send( - mail, u'E-Mail-Adresse bestätigen', body, - sender=current_app.config.get('MAIL_CONFIRM_SENDER')) - - -class NotRegexp(Regexp): - """ - Like wtforms.validators.Regexp, but rejects data that DOES match the regex. - """ - def __call__(self, form, field): - if self.regex.match(field.data or u''): - if self.message is None: - self.message = field.gettext(u'Invalid input.') - - raise ValidationError(self.message) - - -def url_for(endpoint, **values): - """Wrap `flask.url_for` so that it always returns https links""" - #XXX: Drop this in favor of config.PREFERRED_URL_SCHEME when we require Flask 0.9 - u = flask_url_for(endpoint, **values) - if '_external' in values and u.startswith('http://') \ - and current_app.config['PREFERRED_URL_SCHEME'] == 'https': - return 'https://' + u[7:] - else: - return u - - -def get_backend(path, app): - module = path.rsplit(".", 1).pop() - class_name = '%sBackend' % module.title() - backend_class = getattr(importlib.import_module(path), class_name) - return backend_class(app) diff --git a/accounts/utils/__init__.py b/accounts/utils/__init__.py new file mode 100644 index 0000000..8f68733 --- /dev/null +++ b/accounts/utils/__init__.py @@ -0,0 +1,250 @@ +# -*- coding: utf-8 -*- +import hmac +import importlib +import ldap +import pickle +import re +import struct +from base64 import urlsafe_b64encode, urlsafe_b64decode +from Crypto.Cipher import AES +from functools import wraps +from flask import current_app, flash, g, redirect, render_template, request, session +from flask import url_for as flask_url_for +from hashlib import sha1 +from itertools import izip +from random import randint +from time import time +from werkzeug.exceptions import Forbidden +from wtforms.validators import Regexp, ValidationError + + +_username_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9-]{1,15}$') +_username_exclude_re = re.compile(r'^(admin|root)') + +# using http://flask.pocoo.org/docs/patterns/viewdecorators/ +def templated(template=None): + def templated_(f): + @wraps(f) + def templated__(*args, **kwargs): + template_name = template + if template_name is None: + template_name = request.endpoint \ + .replace('.', '/') + '.html' + ctx = f(*args, **kwargs) + if ctx is None: + ctx = {} + elif not isinstance(ctx, dict): + return ctx + return render_template(template_name, **ctx) + return templated__ + return templated_ + +def login_required(f): + @wraps(f) + def login_required_(*args, **kwargs): + if not g.user: + raise Forbidden(u'Bitte einloggen!') + return f(*args, **kwargs) + return login_required_ + +def admin_required(f): + @wraps(f) + def admin_required_(*args, **kwargs): + if not g.user: + raise Forbidden(u'Bitte einloggen!') + if g.user.uid not in current_app.config.get('ADMIN_USERS', []): + raise Forbidden(u'Du bist kein Admin.') + return f(*args, **kwargs) + return admin_required_ + +def logout_required(f): + @wraps(f) + def logout_required_(*args, **kwargs): + if g.user: + raise Forbidden(u'Diese Seite ist nur für nicht eingeloggte Benutzer gedacht!') + return f(*args, **kwargs) + return logout_required_ + + +def login_user(username, password): + username = ensure_utf8(username) + password = ensure_utf8(password) + + try: + g.user = current_app.user_backend.auth(username, password) + except ldap.INVALID_CREDENTIALS: + return False + except ldap.NO_SUCH_OBJECT: + return False + + session['username'] = username + session['password'] = encrypt_password(password) + + return True + + +def logout_user(): + session.pop('username', None) + session.pop('password', None) + g.user = None + + +def pad(s, numbytes=32, padding='\0'): + return s + (numbytes - len(s) % numbytes) * padding + +def encrypt_password(password): + """ + Encrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`. + The key must be 32 bytes long. + """ + assert len(current_app.config['PASSWORD_ENCRYPTION_KEY']) == 32 + + password = ensure_utf8(password) + + iv = ''.join(chr(randint(0, 0xff)) for i in range(16)) + encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'], AES.MODE_CBC, iv) + return iv + encryptor.encrypt(pad(password)) + +def decrypt_password(ciphertext): + """ + Decrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`. + """ + iv = ciphertext[:16] + encryptor = AES.new(current_app.config['PASSWORD_ENCRYPTION_KEY'], AES.MODE_CBC, iv) + return encryptor.decrypt(ciphertext[16:]).rstrip('\0').decode('utf8') + + +def make_confirmation(realm, data): + """ + Create a confirmation token e.g. for confirmation mails. + + Expects as input a realm to distinguish data for several applications and + some data (pickle-able). + """ + key = '\0'.join((current_app.config['SECRET_KEY'], realm)) + payload = ''.join((struct.pack('>i', time()), pickle.dumps(data))) + mac = hmac.new(key, payload, sha1) + return urlsafe_b64encode(''.join((mac.digest(), payload))).rstrip('=') + +class ConfirmationInvalid(ValueError): + """Raised by `verify_confirmation` on invalid input data""" + +class ConfirmationTimeout(ValueError): + """Raised by `verify_confirmation` when the input data is too old""" + +def verify_confirmation(realm, token, timeout=None): + """ + Verify a confirmation token created by `make_confirmation` and, if it is + valid, return the original data. + If `timeout` is given, only accept the token if it is less than `timeout` + seconds old. + """ + key = '\0'.join((current_app.config['SECRET_KEY'], realm)) + + token = urlsafe_b64decode(token + '=' * (4 - len(token) % 4)) + mac = token[:20] + tokentime = struct.unpack('>i', token[20:24])[0] + payload = token[24:] + + if not constant_time_compare(mac, hmac.new(key, token[20:], sha1).digest()): + raise ConfirmationInvalid('MAC does not match') + + if timeout is not None and time() > tokentime + timeout: + raise ConfirmationTimeout('Token is too old') + + return pickle.loads(payload) + +def http_verify_confirmation(*args, **kwargs): + """ + Like `verify_confirmation`, but raise HTTP exceptions with appropriate + messages instead of `Confirmation{Invalid,Timeout}`. + """ + + try: + return verify_confirmation(*args, **kwargs) + except ConfirmationInvalid: + raise Forbidden(u'Ungültiger Bestätigungslink.') + except ConfirmationTimeout: + raise Forbidden(u'Bestätigungslink ist zu alt.') + + +# Shamelessly stolen from https://github.com/mitsuhiko/itsdangerous/ +# (C) 2011 by Armin Ronacher and the Django Software Foundation. 3-clause BSD. +def constant_time_compare(val1, val2): + """Returns True if the two strings are equal, False otherwise. + + The time taken is independent of the number of characters that match. Do + not use this function for anything else than comparision with known + length targets. + + This is should be implemented in C in order to get it completely right. + """ + len_eq = len(val1) == len(val2) + if len_eq: + result = 0 + left = val1 + else: + result = 1 + left = val2 + for x, y in izip(bytearray(left), bytearray(val2)): + result |= x ^ y + return result == 0 + + +class Service(object): + def __init__(self, id, name, url): + self.id = id + self.name = name + self.url = url + self.changed = None # used by settings view + + def __repr__(self): + return '' % self.id + + +def ensure_utf8(s): + if isinstance(s, unicode): + s = s.encode('utf8') + return s + + +def send_register_confirmation_mail(username, mail): + confirm_token = make_confirmation('register', (username, mail)) + confirm_link = url_for('default.register_complete', token=confirm_token, _external=True) + + body = render_template('mail/register.txt', username=username, + mail=mail, link=confirm_link) + + current_app.mail_backend.send( + mail, u'E-Mail-Adresse bestätigen', body, + sender=current_app.config.get('MAIL_CONFIRM_SENDER')) + + +class NotRegexp(Regexp): + """ + Like wtforms.validators.Regexp, but rejects data that DOES match the regex. + """ + def __call__(self, form, field): + if self.regex.match(field.data or u''): + if self.message is None: + self.message = field.gettext(u'Invalid input.') + + raise ValidationError(self.message) + + +def url_for(endpoint, **values): + """Wrap `flask.url_for` so that it always returns https links""" + #XXX: Drop this in favor of config.PREFERRED_URL_SCHEME when we require Flask 0.9 + u = flask_url_for(endpoint, **values) + if '_external' in values and u.startswith('http://') \ + and current_app.config['PREFERRED_URL_SCHEME'] == 'https': + return 'https://' + u[7:] + else: + return u + + +def get_backend(path, app): + module = path.rsplit(".", 1).pop() + class_name = '%sBackend' % module.title() + backend_class = getattr(importlib.import_module(path), class_name) + return backend_class(app) -- cgit v1.2.3-1-g7c22