1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# -*- coding: utf-8 -*-
import binascii
import hmac
import os
import pickle
import re
import subprocess
from email.mime.text import MIMEText
from functools import wraps
from hashlib import sha1
from random import randint

import ldap
from Crypto.Cipher import AES
from flask import current_app, flash, g, redirect, render_template, request, session, url_for
from werkzeug.exceptions import Forbidden
# Compiled pattern matching 2 to 16 lowercase ASCII letters at the start of a
# string; presumably used to validate user names (verify against callers).
# NOTE(review): the pattern has no `$` anchor, so `.match()` also accepts
# longer strings that merely begin with two lowercase letters (e.g. "ab;1")
# -- confirm whether a full-string match was intended.
_username_re = re.compile(r'^[a-z]{2,16}')
# using http://flask.pocoo.org/docs/patterns/viewdecorators/
def templated(template=None):
    """
    View decorator that renders the wrapped view's return value with a
    template.

    `template` is the name of the template to render.  When it is omitted,
    the name is derived from the current request endpoint: dots are replaced
    by slashes and '.html' is appended.

    The wrapped view may return `None` (rendered with an empty context), a
    dict (used as the template context) or any other value, which is handed
    back to the caller untouched.
    """
    def decorator(view):
        @wraps(view)
        def wrapper(*args, **kwargs):
            # The template name is resolved before the view itself runs.
            if template is not None:
                name = template
            else:
                name = request.endpoint.replace('.', '/') + '.html'
            result = view(*args, **kwargs)
            if result is None:
                return render_template(name)
            if isinstance(result, dict):
                return render_template(name, **result)
            # Anything that is not a context dict is passed through as-is.
            return result
        return wrapper
    return decorator
def login_required(f):
    """
    View decorator that only runs `f` when `g.user` is truthy.

    Raises `Forbidden` otherwise.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        if g.user:
            return f(*args, **kwargs)
        raise Forbidden
    return guarded
def login_user(username, password):
    """
    Authenticate against `g.ldap` and remember the credentials in the session.

    Returns `True` on success.  When LDAP reports invalid credentials,
    `False` is returned and neither `g.user` nor the session is modified.
    The password is stored in the session in encrypted form.
    """
    try:
        authenticated = g.ldap.auth(username, password)
    except ldap.INVALID_CREDENTIALS:
        return False
    else:
        g.user = authenticated
    session['username'] = username
    session['password'] = encrypt_password(password)
    return True
def logout_user():
    """Drop the stored credentials from the session and reset `g.user`."""
    for key in ('username', 'password'):
        # A default is supplied so a missing key is not an error.
        session.pop(key, None)
    g.user = None
def pad(s, numbytes=32, padding='\0'):
    """
    Append `padding` to `s` until its length is a multiple of `numbytes`.

    At least one padding character is always added: input whose length is
    already a multiple of `numbytes` grows by a full `numbytes` characters.
    """
    remainder = len(s) % numbytes
    fill_count = numbytes - remainder
    return s + padding * fill_count
def encrypt_password(password):
    """
    Encrypt the given password with `config.PASSWORD_ENCRYPTION_KEY`.

    The key must be 32 bytes long; a `ValueError` is raised otherwise.
    Unicode passwords are encoded as UTF-8 before encryption.

    Returns a byte string consisting of a fresh random 16 byte IV followed by
    the AES-CBC ciphertext of the padded password, which is the layout
    `decrypt_password` reads back.
    """
    key = current_app.config['PASSWORD_ENCRYPTION_KEY']
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if len(key) != 32:
        raise ValueError('PASSWORD_ENCRYPTION_KEY must be 32 bytes long')
    if isinstance(password, unicode):
        password = password.encode('utf8')
    # CBC requires an unpredictable IV, so it is taken from the operating
    # system's CSPRNG rather than from the `random` module.
    iv = os.urandom(16)
    encryptor = AES.new(key, AES.MODE_CBC, iv)
    return iv + encryptor.encrypt(pad(password))
def decrypt_password(ciphertext):
    """
    Decrypt a password with `config.PASSWORD_ENCRYPTION_KEY`.

    The first 16 bytes of `ciphertext` are used as the IV, the remainder is
    decrypted with AES-CBC and trailing NUL characters are stripped from the
    result.
    """
    iv, encrypted = ciphertext[:16], ciphertext[16:]
    secret_key = current_app.config['PASSWORD_ENCRYPTION_KEY']
    cipher = AES.new(secret_key, AES.MODE_CBC, iv)
    plaintext = cipher.decrypt(encrypted)
    return plaintext.rstrip('\0')
def create_confirmation(realm, data):
    """
    Build a signed confirmation token, e.g. for confirmation mails.

    `realm` separates tokens of different applications: it is mixed into the
    signing key together with `SECRET_KEY`.  `data` is any pickle-able
    object.  The token is the base64 encoding of the HMAC-SHA1 digest
    followed by the pickled data.
    """
    secret = current_app.config['SECRET_KEY']
    signing_key = '\0'.join((secret, realm))
    serialized = pickle.dumps(data)
    signature = hmac.new(signing_key, serialized, sha1).digest()
    raw_token = ''.join((signature, serialized))
    encoded = raw_token.encode('base64')
    return encoded.strip()
class InvalidConfirmation(ValueError):
    """
    Error raised by `verify_confirmation` when the supplied input data is
    invalid.
    """
def _constant_time_equal(a, b):
    """Compare two byte strings without revealing where they first differ."""
    compare_digest = getattr(hmac, 'compare_digest', None)
    if compare_digest is not None:
        return compare_digest(a, b)
    # Fallback for interpreters whose `hmac` module lacks `compare_digest`.
    if len(a) != len(b):
        return False
    difference = 0
    for x, y in zip(a, b):
        difference |= ord(x) ^ ord(y)
    return difference == 0


def verify_confirmation(realm, token):
    """
    Verify a confirmation created by `create_confirmation` and, if it is
    valid, return the original data.

    `realm` must be the realm the token was created with and `token` the
    base64 encoded string returned by `create_confirmation`.

    Raises `InvalidConfirmation` when the token is not valid base64 or when
    its MAC does not match.
    """
    key = '\0'.join((current_app.config['SECRET_KEY'], realm))
    try:
        token = token.decode('base64')
    except (binascii.Error, ValueError):
        # Malformed input is reported like any other invalid token instead
        # of leaking a codec error to the caller.
        raise InvalidConfirmation('Token is not valid base64')
    # The first 20 bytes are the HMAC-SHA1 digest, the rest is the payload.
    mac = token[:20]
    payload = token[20:]
    expected = hmac.new(key, payload, sha1).digest()
    # Constant-time comparison so response timing does not reveal how many
    # leading bytes of a forged MAC were correct.
    if not _constant_time_equal(mac, expected):
        raise InvalidConfirmation('MAC does not match')
    # SECURITY: unpickling can execute arbitrary code.  It happens only after
    # the MAC check above, so its safety rests entirely on SECRET_KEY staying
    # secret.
    return pickle.loads(payload)
def send_mail(recipient, subject, body, sender=None):
    """
    Send a UTF-8 text mail by piping it to the configured sendmail command.

    `recipient`, `subject` and `sender` become the To, Subject and From
    headers; only their first line is used.  `sender` defaults to
    `config.MAIL_DEFAULT_SENDER`.  The message is written to the standard
    input of `config.SENDMAIL_COMMAND`, which is invoked with `-t`.

    Raises `RuntimeError` when the command exits with a non-zero status.
    """
    if sender is None:
        sender = current_app.config['MAIL_DEFAULT_SENDER']

    def safe(value):
        # Keep only the first line so a value cannot smuggle in additional
        # headers; both CR and LF count as line breaks.
        return re.split(r'[\r\n]', value, maxsplit=1)[0]

    msg = MIMEText(body, _charset='utf-8')
    msg['Subject'] = safe(subject)
    msg['To'] = safe(recipient)
    msg['From'] = safe(sender)
    p = subprocess.Popen([current_app.config['SENDMAIL_COMMAND'], '-t'],
                         stdin=subprocess.PIPE)
    # communicate() writes the message, closes stdin and waits for the
    # process to exit.
    p.communicate(msg.as_string())
    if p.returncode != 0:
        raise RuntimeError('sendmail terminated with %d' % p.returncode)
|