summaryrefslogtreecommitdiffstats
path: root/accounts/backend/user/ldap.py
diff options
context:
space:
mode:
Diffstat (limited to 'accounts/backend/user/ldap.py')
-rw-r--r--accounts/backend/user/ldap.py174
1 files changed, 106 insertions, 68 deletions
diff --git a/accounts/backend/user/ldap.py b/accounts/backend/user/ldap.py
index 99080a4..a1ed904 100644
--- a/accounts/backend/user/ldap.py
+++ b/accounts/backend/user/ldap.py
@@ -4,7 +4,11 @@
import ldap3
from ldap3.utils.conv import escape_filter_chars
from ldap3.utils.dn import escape_rdn
-from ldap3.core.exceptions import LDAPInvalidCredentialsResult, LDAPException, LDAPOperationResult
+from ldap3.core.exceptions import (
+ LDAPInvalidCredentialsResult,
+ LDAPException,
+ LDAPOperationResult,
+)
from ldap3.core.results import RESULT_SUCCESS
from ldap3.abstract.entry import Entry
from ldap3 import Connection
@@ -24,28 +28,32 @@ def _escape(value: str, wildcard=False):
return escape_rdn(value)
-def _change_password(conn: Connection, dn,
- passwords: tuple[Optional[str], Optional[str]],
- as_admin=False):
+def _change_password(
+ conn: Connection,
+ dn,
+ passwords: tuple[Optional[str], Optional[str]],
+ as_admin=False,
+):
old_password, new_password = passwords
if as_admin:
conn.extend.standard.modify_password(dn, None, new_password)
else:
try:
- conn.extend.standard.modify_password(dn, old_password,
- new_password)
+ conn.extend.standard.modify_password(
+ dn, old_password, new_password
+ )
except LDAPException:
- raise InvalidPasswordError('Invalid password')
+ raise InvalidPasswordError("Invalid password")
class LdapBackend(Backend):
def __init__(self, app: AccountsFlask) -> None:
super(LdapBackend, self).__init__(app)
- self.host: str = self.app.config['LDAP_HOST']
- self.base_dn: list[tuple[str, str]] = self.app.config['LDAP_BASE_DN']
- self.admin_user: str = self.app.config['LDAP_ADMIN_USER']
- self.admin_pass: str = self.app.config['LDAP_ADMIN_PASS']
+ self.host: str = self.app.config["LDAP_HOST"]
+ self.base_dn: list[tuple[str, str]] = self.app.config["LDAP_BASE_DN"]
+ self.admin_user: str = self.app.config["LDAP_ADMIN_USER"]
+ self.admin_pass: str = self.app.config["LDAP_ADMIN_PASS"]
self.services: list[Service] = self.app.all_services
self.admin = False
@@ -56,29 +64,31 @@ class LdapBackend(Backend):
Tries to authenticate a user with a given password. If the
authentication is successful an Account object will be returned.
"""
- user_dn: str = self._format_dn([('uid', username), ('ou', 'users')])
+ user_dn: str = self._format_dn([("uid", username), ("ou", "users")])
conn: Connection = self._connect(user_dn, password)
uid = None
mail = None
uidNumber = None
services = []
- conn.search(user_dn, '(objectClass=*)',
- attributes=['objectClass', 'uid', 'mail', 'cn', 'uidNumber'])
+ conn.search(
+ user_dn,
+ "(objectClass=*)",
+ attributes=["objectClass", "uid", "mail", "cn", "uidNumber"],
+ )
entry: Entry
for entry in conn.entries:
- if 'splineAccount' in entry.objectClass.values:
+ if "splineAccount" in entry.objectClass.values:
uid = entry.uid.value
mail = entry.mail.value
uidNumber = entry.uidNumber.value
- elif 'servicePassword' in entry.objectClass.value:
+ elif "servicePassword" in entry.objectClass.value:
services.append(entry.cn.value)
if uid is None or mail is None or uidNumber is None:
raise NoSuchUserError("User not found")
- return Account(uid, mail, services, password,
- uidNumber=uidNumber)
+ return Account(uid, mail, services, password, uidNumber=uidNumber)
def find(self, filters: Optional[dict[str, str]] = None, wildcard=False):
"""
@@ -87,21 +97,32 @@ class LdapBackend(Backend):
if filters is None:
filters = dict()
- filters['objectClass'] = 'splineAccount'
- filter_as_list = ['(%s=%s)' % (attr, _escape(value, wildcard))
- for attr, value in list(filters.items())]
- filterstr = '(&%s)' % ''.join(filter_as_list)
+ filters["objectClass"] = "splineAccount"
+ filter_as_list = [
+ "(%s=%s)" % (attr, _escape(value, wildcard))
+ for attr, value in list(filters.items())
+ ]
+ filterstr = "(&%s)" % "".join(filter_as_list)
conn = self._connect()
- base_dn = self._format_dn([('ou', 'users')])
+ base_dn = self._format_dn([("ou", "users")])
accounts: list[Account] = []
try:
- conn.search(base_dn, filterstr, search_scope=ldap3.LEVEL,
- attributes=['uid', 'mail', 'uidNumber'])
+ conn.search(
+ base_dn,
+ filterstr,
+ search_scope=ldap3.LEVEL,
+ attributes=["uid", "mail", "uidNumber"],
+ )
for entry in conn.entries:
- accounts.append(Account(entry.uid.value, entry.mail.value,
- uidNumber=entry.uidNumber.value))
+ accounts.append(
+ Account(
+ entry.uid.value,
+ entry.mail.value,
+ uidNumber=entry.uidNumber.value,
+ )
+ )
except LDAPException:
pass
@@ -110,14 +131,14 @@ class LdapBackend(Backend):
def _store(self, account: Account):
conn = self._connect_as_admin()
- user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
+ user_dn = self._format_dn([("uid", account.uid), ("ou", "users")])
attrs = {
- 'objectClass': ['top', 'inetOrgPerson', 'splineAccount'],
- 'uid': _escape(account.uid),
- 'sn': 'n/a',
- 'cn': _escape(account.uid),
- 'mail': _escape(account.mail),
- 'uidNumber': _escape(account.uidNumber),
+ "objectClass": ["top", "inetOrgPerson", "splineAccount"],
+ "uid": _escape(account.uid),
+ "sn": "n/a",
+ "cn": _escape(account.uid),
+ "mail": _escape(account.mail),
+ "uidNumber": _escape(account.uidNumber),
}
conn.add(user_dn, attributes=attrs)
@@ -129,14 +150,16 @@ class LdapBackend(Backend):
Updates account informations like passwords or email.
"""
conn = None
- user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
+ user_dn = self._format_dn([("uid", account.uid), ("ou", "users")])
if as_admin:
conn = self._connect_as_admin()
else:
conn = self._connect(user_dn, account.password)
- attrs = {key: [(ldap3.MODIFY_REPLACE, [value])]
- for key, value in list(account.attributes.items())}
+ attrs = {
+ key: [(ldap3.MODIFY_REPLACE, [value])]
+ for key, value in list(account.attributes.items())
+ }
conn.modify(user_dn, attrs)
self._alter_passwords(conn, account, as_admin=as_admin)
@@ -150,57 +173,64 @@ class LdapBackend(Backend):
else:
conn = self._connect(account.uid, account.password)
- dns = [[('cn', service), ('uid', account.uid), ('ou', 'users')]
- for service in account.services] + \
- [[('uid', account.uid), ('ou', 'users')]]
+ dns = [
+ [("cn", service), ("uid", account.uid), ("ou", "users")]
+ for service in account.services
+ ] + [[("uid", account.uid), ("ou", "users")]]
for dn in dns:
conn.delete(self._format_dn(dn))
- def _format_dn(self, attr: list[tuple[str, str]], with_base_dn=True) -> str:
+ def _format_dn(
+ self, attr: list[tuple[str, str]], with_base_dn=True
+ ) -> str:
if with_base_dn:
attr.extend(self.base_dn)
- dn = ['%s=%s' % (item[0], _escape(item[1]))
- for item in attr]
+ dn = ["%s=%s" % (item[0], _escape(item[1])) for item in attr]
- return ','.join(dn)
+ return ",".join(dn)
- def _connect(self, user: Optional[str] = None,
- password: Optional[str] = None) -> Connection:
+ def _connect(
+ self, user: Optional[str] = None, password: Optional[str] = None
+ ) -> Connection:
server = ldap3.Server(self.host)
conn = ldap3.Connection(server, user, password, raise_exceptions=True)
try:
conn.bind()
except LDAPInvalidCredentialsResult:
- raise InvalidPasswordError('Invalid password')
+ raise InvalidPasswordError("Invalid password")
return conn
def _connect_as_admin(self) -> Connection:
- admin_dn = self._format_dn([('cn', self.admin_user)])
+ admin_dn = self._format_dn([("cn", self.admin_user)])
return self._connect(admin_dn, self.admin_pass)
- def _alter_passwords(self, conn: Connection, account: Account,
- as_admin=False):
+ def _alter_passwords(
+ self, conn: Connection, account: Account, as_admin=False
+ ):
if account.new_password_root:
- user_dn = self._format_dn([('uid', account.uid), ('ou', 'users')])
- _change_password(conn, user_dn, account.new_password_root, as_admin)
+ user_dn = self._format_dn([("uid", account.uid), ("ou", "users")])
+ _change_password(
+ conn, user_dn, account.new_password_root, as_admin
+ )
_, account.password = account.new_password_root
account.new_password_root = None
for service, passwords in list(account.new_password_services.items()):
service_id = service.lower()
- service_dn = self._format_dn([('cn', service_id), ('uid', account.uid),
- ('ou', 'users')])
+ service_dn = self._format_dn(
+ [("cn", service_id), ("uid", account.uid), ("ou", "users")]
+ )
_, new = passwords
if new is not None:
if service_id not in account.services:
attrs = {
- 'objectClass': ['top', 'servicePassword'],
- 'cn': _escape(service_id),
+ "objectClass": ["top", "servicePassword"],
+ "cn": _escape(service_id),
}
conn.add(service_dn, attributes=attrs)
account.services.append(service_id)
@@ -214,30 +244,38 @@ class LdapBackend(Backend):
del account.new_password_services[service]
def _get_last_uidNumber(self, conn: Connection):
- uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')])
- conn.search(uidNumber_dn, '(objectClass=uidNumberMaximum)',
- attributes=['uidNumber'])
+ uidNumber_dn = self._format_dn([("cn", "uidMax"), ("ou", "other")])
+ conn.search(
+ uidNumber_dn,
+ "(objectClass=uidNumberMaximum)",
+ attributes=["uidNumber"],
+ )
for entry in conn.entries:
return int(entry.uidNumber.value)
- raise ShouldNotHappen('Last uidNumber not found.')
+ raise ShouldNotHappen("Last uidNumber not found.")
def _get_next_uidNumber(self):
conn = self._connect_as_admin()
- uidNumber_dn = self._format_dn([('cn', 'uidMax'), ('ou', 'other')])
+ uidNumber_dn = self._format_dn([("cn", "uidMax"), ("ou", "other")])
uidNumber = self._get_last_uidNumber(conn)
# Try to acquire next uidNumber
for i in [0, 1, 2, 3, 4, 5]:
try:
- conn.modify(uidNumber_dn, {'uidNumber': [
- (ldap3.MODIFY_DELETE, ['%d' % (uidNumber + i)]),
- (ldap3.MODIFY_ADD, ['%d' % (uidNumber + i + 1)]),
- ]})
-
- if conn.result['result'] == RESULT_SUCCESS:
+ conn.modify(
+ uidNumber_dn,
+ {
+ "uidNumber": [
+ (ldap3.MODIFY_DELETE, ["%d" % (uidNumber + i)]),
+ (ldap3.MODIFY_ADD, ["%d" % (uidNumber + i + 1)]),
+ ]
+ },
+ )
+
+ if conn.result["result"] == RESULT_SUCCESS:
return uidNumber + i + 1
except LDAPOperationResult:
pass
- raise ShouldNotHappen('Unable to get next uidNumber, try again.')
+ raise ShouldNotHappen("Unable to get next uidNumber, try again.")