"""LDAP-backed authentication for the application, built on flask-peewee's Auth."""

from flask_peewee.auth import Auth
from flask_peewee.utils import get_next
from flask import session, url_for, request, redirect
from models import User, Session
from app import app, db, pad
from datetime import datetime
from padlite import APIException
import ldap
import uuid
import functools


class LdapAuth(Auth):
    """flask-peewee ``Auth`` subclass that verifies credentials with an LDAP bind.

    Local ``User`` rows are created on first successful login from the
    attributes of the user's LDAP entry.
    """

    def get_user_model(self):
        """Return the model class used to store users."""
        return User

    def authenticate(self, username, password):
        """Check *username* / *password* against LDAP and return the local user.

        The DN is built as ``uid=<username>,<base_dn>`` and a simple bind is
        attempted with it. When no local ``User`` row exists yet, one is
        created from the LDAP entry (``mail`` attribute) and a pad author id.

        Returns the ``User`` instance on success, ``False`` when the
        credentials are rejected or the LDAP entry cannot be read uniquely.
        """
        # A simple bind with an empty password is an *unauthenticated* bind
        # (RFC 4513, section 5.1.2) which many servers report as success, so
        # it must never be treated as a successful login.
        if not username or not password:
            return False

        l = ldap.initialize(app.config['LDAP']['host'])
        try:
            # The protocol version has to be set on the connection; assigning
            # it on the ``ldap`` module does not affect any connection.
            l.protocol_version = ldap.VERSION3
            # NOTE(review): OPT_X_TLS_DEMAND is documented as a *value* for
            # OPT_X_TLS_REQUIRE_CERT rather than an option name -- confirm
            # this call configures what was intended. Left unchanged here
            # because altering TLS behaviour could break existing setups.
            l.set_option(ldap.OPT_X_TLS_DEMAND, True)

            user_dn = self._format_dn([('uid', username)])
            try:
                l.simple_bind_s(user_dn, password)
            except ldap.INVALID_CREDENTIALS:
                return False

            try:
                user = User.get(User.username == username)
            except User.DoesNotExist:
                # First login: read the user's own entry to populate the
                # local account.
                entries = l.search_s(user_dn, ldap.SCOPE_BASE)
                if len(entries) != 1:
                    return False
                (dn, user_data) = entries[0]
                user = User.create(
                    username=username,
                    email=user_data['mail'][0],
                    api_id=pad.createAuthorIfNotExistsFor(user_dn, username))
            return user
        finally:
            # Always release the connection; a failure while unbinding must
            # not mask the authentication result.
            try:
                l.unbind_s()
            except ldap.LDAPError:
                pass

    def login_user(self, user):
        """Record the login time, tag the session with a fresh UUID and log in."""
        user.last_login = datetime.now()
        user.save()
        session['uuid'] = uuid.uuid4()
        return super(LdapAuth, self).login_user(user)

    def logout_user(self):
        """Delete the ``Session`` rows tied to this login, then log out."""
        if 'uuid' in session:
            for s in Session.select().where(Session.uuid == session['uuid']):
                try:
                    s.delete_instance()
                except APIException:
                    # Best effort: a failed deletion must not prevent the
                    # user from logging out.
                    pass
            del session['uuid']
        return super(LdapAuth, self).logout_user()

    def _format_dn(self, attr, with_base_dn=True):
        """Build a DN string from a sequence of ``(attribute, value)`` pairs.

        Values are escaped with ``_escape``. When *with_base_dn* is true the
        pairs from ``app.config['LDAP']['base_dn']`` are appended. The
        caller's *attr* sequence is not modified.
        """
        # Work on a copy so the caller's list is never extended in place.
        pairs = list(attr)
        if with_base_dn:
            pairs.extend(app.config['LDAP']['base_dn'])
        return ','.join(
            '%s=%s' % (item[0], self._escape(item[1])) for item in pairs)

    def _escape(self, s, wildcard=False):
        """Return *s* with LDAP special characters replaced by ``\\XX`` hex codes.

        When *wildcard* is true, ``*`` is left untouched; otherwise it is
        escaped as well.
        """
        # The backslash must stay first: every later replacement inserts
        # backslashes that must not be escaped again.
        chars_to_escape = ['\\', ',', '=', '+', '<', '>', ';', '"', '\'',
                           '#', '(', ')', '\0']
        if not wildcard:
            chars_to_escape.append('*')
        for char in chars_to_escape:
            s = s.replace(char, '\\%02X' % ord(char))
        return s

    def test_user(self, test_fn):
        """Return a view decorator that requires a logged-in user passing *test_fn*.

        When no user is logged in, or ``test_fn(user)`` is falsy, the request
        is redirected to the auth blueprint's login view with a ``next``
        parameter made of ``SCRIPT_NAME`` followed by ``get_next()``.
        """
        def decorator(fn):
            @functools.wraps(fn)
            def inner(*args, **kwargs):
                user = self.get_logged_in_user()
                if not user or not test_fn(user):
                    login_url = url_for(
                        '%s.login' % self.blueprint.name,
                        next="%s%s" % (request.environ['SCRIPT_NAME'],
                                       get_next()))
                    return redirect(login_url)
                return fn(*args, **kwargs)
            return inner
        return decorator


auth = LdapAuth(app, db, user_model=User, default_next_url='/teams')