from ldap3 import Tls, Server, Connection, BASE from ldap3.utils.dn import safe_dn import ssl def user_cls(login): def decorator(cls): login.user_loader(lambda uid: cls.query.get(uid)) return cls return decorator def _format_dn(parts): return ','.join([safe_dn(part) for part in parts]) def auth(config, model, username, password): tls_configuration = Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1) server = Server(config['host'], use_ssl=True, tls=tls_configuration) user_dn = _format_dn(['uid=%s' % username] + config['base_dn']) conn = Connection(server, user=user_dn, password=password) if not conn.bind(): return None user = model.query.filter_by(name=username).first() if user is None: if not conn.search(user_dn, '(objectclass=inetOrgPerson)', search_scope=BASE, attributes=['mail']): return None user_data = conn.entries[0] user = model.create(name=username, email=user_data.mail.value) return user