#!/usr/bin/env python3
######################################################################
# Tool for modifying group memberships in AD
#
######################################################################

"""
This module provides the LdapUserDir class. It is used to interact
with an LDAP based user directory service.

Author: Derek Feichtinger <derek.feichtinger@psi.ch>
"""

import ldap
|
|
from ldap.controls import SimplePagedResultsControl
|
|
import sys
|
|
import re
|
|
from glob import fnmatch
|
|
import logging
|
|
import time
|
|
import itertools
|
|
|
|
|
|
class LdapUserDirError(Exception):
    """Error raised by LdapUserDir for directory-level failure conditions.

    Parameters
    ----------
    errmsg : str
        Human readable description of the problem. It becomes the single
        entry of the exception's ``args``.
    """

    def __init__(self, errmsg):
        # Explicit base-class call; keeps the one-argument constructor.
        Exception.__init__(self, errmsg)
|
|
|
|
|
|
class LdapUserDir(object):
    """A class to interact with a LDAP based user and group directory

    Parameters
    ----------
    serverurl : str
        URL of LDAP server (e.g. ldaps://host:port)
    user_dn : str
        DN of user for authenticating to LDAP
    user_pw : str
        password for authenticating to LDAP
    group_ou : str, optional
        base path for groups
    user_ou : str, optional
        base path for users
    page_size : int
        page size for paged retrieval of results in search_s_reconn.
        The default value is '500'. A value of '0' disables paged
        results.
    logger : logger instance, optional
        If omitted, the shared 'LdapUserDir' logger is used (level
        WARNING, one stream handler).

    Attributes
    ----------
    serverurl : str
    group_ou : str
    user_ou : str
    user_dn : str
    user_pw : str
    page_size : int
    logger : logger instance

    Raises
    ------
    ldap.LDAPError
        Reraises original exception from ldap modules
    """

    # Leading "CN=<value>," component of a DN. The attribute type is matched
    # case-insensitively because this class itself builds 'cn=...' DNs.
    # NOTE: the value stops at the first comma, including an escaped one.
    _CN_RE = re.compile(r'^CN=([^,]+),', re.IGNORECASE)

    _LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

    def __init__(self,
                 serverurl,
                 user_dn,
                 user_pw,
                 group_ou='ou=example.com',
                 user_ou='ou=example.com',
                 page_size=500,
                 logger=None):
        self.serverurl = serverurl
        self.group_ou = group_ou
        self.user_ou = user_ou
        self.user_dn = user_dn
        self.user_pw = user_pw
        self.page_size = page_size
        self.logger = self._default_logger() if logger is None else logger

        self._ldap = self._open_connection()
        self.logger.debug('binding to: %s\n', serverurl)
        self.logger.debug('binding as user: %s\n', user_dn)

        try:
            self._ldap.bind_s(self.user_dn, self.user_pw)
        except ldap.INVALID_CREDENTIALS:
            self.logger.error('Authentication failure for dn:"%s"\n',
                              self.user_dn)
            raise
        # Any other ldap.LDAPError propagates unchanged to the caller.

    @staticmethod
    def _default_logger():
        """Return the shared 'LdapUserDir' logger, configured once.

        The stream handler is only attached when the logger has no handler
        yet; otherwise every new instance would add another handler and
        each message would be emitted several times.
        """
        logger = logging.getLogger('LdapUserDir')
        logger.setLevel(logging.WARNING)
        if not logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(LdapUserDir._LOG_FORMAT))
            handler.setLevel(logging.DEBUG)
            logger.addHandler(handler)
        return logger

    def _open_connection(self):
        """Create a new (not yet bound) LDAP connection object.

        Used for the initial connection and for reconnects so that both
        get the same options.
        """
        conn = ldap.initialize(self.serverurl, trace_level=0,
                               trace_file=sys.stderr,
                               bytes_mode=False)
        # Without this, paged results don't work (see the python-ldap FAQ
        # for a hint as to why)
        conn.set_option(ldap.OPT_REFERRALS, 0)
        return conn

    @staticmethod
    def ensure_utf8(bstr):
        """Decode bytes as UTF-8 if possible.

        Returns
        -------
        The decoded str for decodable bytes input; any other input
        (non-bytes, or bytes that are not valid UTF-8) is returned as is.
        """
        if isinstance(bstr, bytes):
            try:
                return bstr.decode('utf-8')
            except UnicodeDecodeError:
                # Deliberate best effort: hand back the raw bytes.
                pass
        return bstr

    @staticmethod
    def has_dn_format(name):
        """Returns true if name has the format of a distinguished name

        The decision is delegated to ldap.explode_dn(); any exception it
        raises is interpreted as "not a DN".
        """
        try:
            ldap.explode_dn(name)
        except Exception:
            return False
        return True

    @staticmethod
    def dn_to_cn(dn):
        """transforms a DN to a CN

        Parameters
        ----------
        dn : str
            distinguished name starting with a CN component that is
            followed by at least one more component ('CN=x,...'). The
            'CN' attribute type is matched case-insensitively.

        Raises
        ------
        RuntimeError
            if the DN does not start with 'CN=<value>,'

        Returns
        -------
        str
            value of the leading CN component
        """
        m = LdapUserDir._CN_RE.match(dn)
        if m is None:
            raise RuntimeError("failed to convert DN to CN (%s)" % dn)
        return m.group(1)

    @staticmethod
    def _user_filter(attr, pattern, mssfu):
        """Build the LDAP filter for user (non-computer) entries.

        `pattern` is inserted verbatim as the value matched against
        `attr`, so LDAP wildcards such as '*' keep their meaning.
        """
        mssfu_part = ''
        if mssfu:
            mssfu_part = '(msSFU30UidNumber=*)(msSFU30HomeDirectory=*)'
        return ('(&(objectClass=user)(!(objectClass=computer))%s(%s=%s))'
                % (mssfu_part, attr, pattern))

    @staticmethod
    def _entry_gid(entry):
        """Return the msSFU30GidNumber of an entry, or '---' if absent."""
        if 'msSFU30GidNumber' not in entry:
            return '---'
        return LdapUserDir.ensure_utf8(entry['msSFU30GidNumber'][0])

    def _search_s(self, base, scope, filterstr='(objectClass=*)',
                  attrlist=None, attrsonly=0):
        """Helper for search_s_reconn.

        Wraps ldap.search_ext to use paged results if desired (see
        self.page_size).

        Raises
        ------
        RuntimeError
            if a response carries no paged results control

        Returns
        -------
        list of tuples
            list of tuples of the form (dn, attributes)
        """
        if self.page_size == 0:
            # Do not use paged results
            self.logger.debug('not using paging since page_size is %d',
                              self.page_size)
            return self._ldap.search_s(base, scope, filterstr, attrlist,
                                       attrsonly)

        # Use paged results
        self.logger.debug('paging with current page size set to %d',
                          self.page_size)
        page_ctrl = SimplePagedResultsControl(criticality=True,
                                              size=self.page_size,
                                              cookie='')
        results = []
        npages = 0
        while True:
            msgid = self._ldap.search_ext(base, scope, filterstr, attrlist,
                                          attrsonly,
                                          serverctrls=[page_ctrl])
            _, rdata, _, resp_ctrls = self._ldap.result3(msgid)
            npages += 1
            results.extend(rdata)

            # Extract the SimplePagedResultsControl to get the cookie.
            pagecontrols = [
                c for c in resp_ctrls
                if c.controlType == SimplePagedResultsControl.controlType]
            self.logger.debug('Retrieved page %d of results', npages)
            if not pagecontrols:
                raise RuntimeError(
                    "The server ignores RFC 2696 control (paged results)")
            cookie = pagecontrols[0].cookie
            if not cookie:
                # We're done.
                break
            # Update the cookie to retrieve the next page.
            page_ctrl.cookie = cookie

        # Keep only the first two elements (dn, attributes) of each result.
        return [r[:2] for r in results]

    def _reconnect(self, attempt):
        """Drop the current connection, then try to reconnect and rebind.

        Failures are only logged; the caller finds out on its next search.
        """
        try:
            del self._ldap
        except AttributeError as err:
            self.logger.warning("failed to delete LDAP object: %s", err)

        self.logger.warning("Trying reconnecting to ldap (attempt %s)",
                            attempt)
        time.sleep(1)

        try:
            self._ldap = self._open_connection()
        except ldap.SERVER_DOWN as err:
            self.logger.warning(
                "ldap initialization error, server down (server: %s): %s",
                self.serverurl, err)
            return
        except Exception as err:
            self.logger.warning(
                "ldap initialization error (server: %s): %s",
                self.serverurl, err)
            return

        try:
            self._ldap.bind_s(self.user_dn, self.user_pw)
        except ldap.INVALID_CREDENTIALS:
            self.logger.error('Authentication failure for dn:"%s"\n',
                              self.user_dn)
        except Exception as err:
            self.logger.warning("ldap binding error (server: %s): %s",
                                self.serverurl, err)

    def search_s_reconn(self, base, scope, filterstr='(objectClass=*)',
                        attrlist=None, attrsonly=0, recon_attempts=2):
        """wrapper of standard ldap.search_s synchronous search that
        tries to reconnect

        Implemented the functionality in this class since the use of
        the standard automatic reconnect available
        (ReconnectLDAPObject) did not do a rebind correctly.

        Parameters
        ----------
        base : str
            base DN
        scope : {ldap.SCOPE_BASE, ldap.SCOPE_ONELEVEL, ldap.SCOPE_SUBTREE}
        filterstr : str
            LDAP search filter, optional
        attrlist : list of str, optional
            list of attributes to search for
        attrsonly : int, optional
            do not return values for attributes if nonzero
        recon_attempts : int, optional
            total number of search attempts; once this many attempts
            have failed the last error is re-raised. There is a one
            second pause before each reconnect.

        Raises
        ------
        Reraises the exception of the last failed search attempt

        Returns
        -------
        list of tuples
            list of tuples of the form (dn, attributes)
        """
        attempts = 0
        while True:
            attempts += 1
            try:
                return self._search_s(base, scope, filterstr, attrlist,
                                      attrsonly)
            except Exception as err:
                self.logger.warning("Got ldap error: %s", err)
                if attempts >= recon_attempts:
                    raise
            # we try to reconnect and rebind
            self._reconnect(attempts)

    def get_users(self, filter='*', ou=None, mssfu=False):
        """get the matching user entries from the directory service

        Parameters
        ----------
        filter : str, optional
            filter expression matched against the cn attribute
        ou : str, optional
            search base; defaults to self.user_ou
        mssfu : bool, optional
            Whether to only show users with mssfu mappings

        Returns
        -------
        list of tuples
            search result of the form [(dn1, attributes1), ...]
        """
        if ou is None:
            ou = self.user_ou
        return self.search_s_reconn(ou, ldap.SCOPE_SUBTREE,
                                    self._user_filter('cn', filter, mssfu))

    def get_users_by_mailaddr(self, filter='*', ou=None, mssfu=False):
        """get the user entries whose mail address matches the filter

        Parameters
        ----------
        filter : str, optional
            filter expression matched against the mail attribute
        ou : str, optional
            search base; defaults to self.user_ou
        mssfu : bool, optional
            Whether to only show users with mssfu mappings

        Returns
        -------
        list of tuples
            search result of the form [(dn1, attributes1), ...]
        """
        if ou is None:
            ou = self.user_ou
        return self.search_s_reconn(ou, ldap.SCOPE_SUBTREE,
                                    self._user_filter('mail', filter, mssfu))

    def list_users_etcpwd(self, records, verbose=0):
        """Print '/etc/pwd' format like information about matching users

        Parameters
        ----------
        records : ldap result structure with user records
            iterable of (dn, attributes) pairs
        verbose : int, optional
            0 (or negative): colon separated values of the fixed field
            list, 'N.A.' for missing attributes.
            1: like 0, but each value is prefixed with '[name:]' and some
            extra fields are added.
            >1: the DN followed by the first value of every attribute.
        """
        # The duplicated msSFU30UidNumber is part of the existing output
        # format and is kept as is.
        fields = ['cn', 'msSFU30UidNumber', 'msSFU30UidNumber',
                  'msSFU30GidNumber', 'displayName',
                  'msSFU30LoginShell', 'msSFU30HomeDirectory']
        extended = fields + ['description', 'mail', 'mobile', 'department']

        for dn, entry in records:
            if verbose == 1:
                for k in extended:
                    if k in entry:
                        sys.stdout.write(
                            '[%s:]%s:' % (k, self.ensure_utf8(entry[k][0])))
                    else:
                        sys.stdout.write('[%s:]N.A.:' % k)
                sys.stdout.write('\n')
            elif verbose > 1:
                print("DN=%s" % dn)
                for k in entry:
                    sys.stdout.write(
                        ' %s: %s\n' % (k, self.ensure_utf8(entry[k][0])))
            else:
                for k in fields:
                    if k in entry:
                        sys.stdout.write(
                            '%s:' % (self.ensure_utf8(entry[k][0])))
                    else:
                        sys.stdout.write('N.A.:')
                sys.stdout.write('\n')

    def systemuser2dn(self, uname, mssfu=True):
        """Converts a user's system username to the dn of the ldap directory
        by performing a search on ldap

        Parameters
        ----------
        uname : str
            system username (matched against the cn attribute)
        mssfu : bool, optional
            Whether to only consider users with mssfu mappings

        Returns
        -------
        str
            DN of the first matching user

        Raises
        ------
        LdapUserDirError("No such user")
            if no such user exists
        """
        srch = self._user_filter('cn', uname, mssfu)
        self.logger.debug('systemuser2dn: %s', srch)
        r = self.search_s_reconn(self.user_ou, ldap.SCOPE_SUBTREE, srch)

        # Skip results without a DN (see get_groups_struct: AD can yield
        # (None, ...) rows); they are not user entries.
        entries = [el for el in r if el[0] is not None]
        if not entries:
            raise LdapUserDirError("No such user")

        dn = self.ensure_utf8(entries[0][0])
        self.logger.debug('systemuser2dn: dn = %s', dn)
        return dn

    def get_groups_struct(self, gfilter='*', ou=None, mssfu=False):
        """searches for groups that match filter

        returns the full ldap search result structure for the search
        with the optional filter applied to the cn field

        Parameters
        ----------
        gfilter : str, optional
            filter expression matched against the cn attribute
        ou : str, optional
            group OU used as search base; defaults to self.group_ou
        mssfu : bool, optional
            Whether to only show groups with mssfu mappings

        Returns
        -------
        list of tuples
            list of the matching groups [(dn1, dict1), ...]
        """
        # An explicitly passed ou must be honoured; only fall back to the
        # configured group OU when none is given.
        group_ou = self.group_ou if ou is None else ou

        if mssfu:
            srch = '(&(objectClass=Group)(msSFU30GidNumber=*)(cn=%s))' % gfilter
        else:
            srch = '(&(objectClass=Group)(cn=%s))' % gfilter
        self.logger.debug('get_groups_struct: %s', srch)

        r = self.search_s_reconn(group_ou, ldap.SCOPE_SUBTREE, srch)

        # The following filter is necessary, because AD yielded
        # some (None,String) fields when searching with
        # --group-ou='dc=d,dc=example,dc=org'. This led to errors.
        return [el for el in r if el[0] is not None]

    def get_memberof(self, dn, recursive=True, mssfu=False, _path=None):
        """Get all memberOf attributes for an dn (optionally recursively)

        This routine relies on the LDAP directory keeping memberOf attributes
        in the individual entries.

        Note that a recursive search may not show all groups, if mssfu
        was selected, and a group in the hierarchy tree has no msSFU
        settings. The mssfu restriction is applied to the direct groups of
        `dn` only; it is not forwarded to the nested levels.

        Parameters
        ----------
        dn : str
            distinguished name
        recursive : bool, optional
            if True, recurse into hierarchical groups
        mssfu : bool, optional
            whether to only return entries with MSsfu attributes
        _path : frozenset, optional
            For internal use only. DNs on the current recursion chain,
            used to stop at circular group nesting.

        Returns
        -------
        list
            group DNs: the direct groups first, followed by the groups
            found through recursion. A group reachable via several paths
            is listed once per path.
        """
        self.logger.debug('get_memberof: for dn: %s', dn)

        srch = '(&)'
        if mssfu:
            srch = '(|(msSFU30GidNumber=*)(msSFU30UidNumber=*))'

        self.logger.debug('get_memberof: query = %s', srch)
        r = self.search_s_reconn(dn, ldap.SCOPE_BASE, srch,
                                 attrlist=['memberOf'])
        if not r:
            return []

        grplist = []
        if 'memberOf' in r[0][1]:
            grplist = [self.ensure_utf8(g) for g in r[0][1]['memberOf']]

        if mssfu:
            # Keep only the groups that carry a msSFU30GidNumber.
            srch = '(msSFU30GidNumber=*)'
            kept = []
            for g in grplist:
                self.logger.debug('testing msSFU for %s', g)
                if self.search_s_reconn(g, ldap.SCOPE_BASE, srch):
                    kept.append(g)
                else:
                    self.logger.debug('no msSFU info found for %s', g)
            grplist = kept

        if recursive:
            path = (_path or frozenset()) | {str(dn).casefold()}
            inherited = []
            for g in grplist:
                if str(g).casefold() in path:
                    # Circular nesting: this group is already being
                    # expanded further up the chain, so do not descend
                    # into it again.
                    self.logger.debug(
                        'get_memberof: circular membership at %s', g)
                    continue
                inherited.extend(
                    self.get_memberof(g, recursive=recursive, _path=path))
            grplist.extend(inherited)

        return grplist

    def get_groups_for_user(self, user, gfilter=None, returndn=False,
                            mssfu=False, recursive=True):
        """Get groups for a particular user from LDAP.

        The function will try to determine whether it receives a DN or
        a system username that needs to be converted to a DN first.

        Parameters
        ----------
        user : str
            system username or user DN
        gfilter : str, optional
            shell-style pattern (fnmatch) the returned names must match
        returndn : bool, optional
            If set True the function will return DN, otherwise CN
        mssfu : bool, optional
            Whether to only show users with mssfu mappings
        recursive : bool, optional
            if True, also return the groups of the groups

        Returns
        -------
        list
            group CNs (or DNs if returndn is set). If any group DN cannot
            be converted to a CN, the error is logged and the DNs are
            returned instead.

        Raises
        ------
        LdapUserDirError
            if a system username cannot be resolved to a DN
        """
        if self.has_dn_format(user):
            dnname = user
        else:
            dnname = self.systemuser2dn(user, mssfu=mssfu)

        reslist = self.get_memberof(dnname, recursive=recursive, mssfu=mssfu)

        if not returndn:
            try:
                reslist = [self.dn_to_cn(grp) for grp in reslist]
            except RuntimeError as e:
                self.logger.error(str(e))

        if gfilter:
            reslist = fnmatch.filter(reslist, gfilter)

        return reslist

    def list_groups(self, filter='*', ou=None, mssfu=False,
                    returndn=False, verbose=0, recursive=False, indent=0):
        """Prints a list of groups from the LDAP directory to stdout

        NOTE: recursive listing has no protection against circular group
        nesting.

        Parameters
        ----------
        filter : str, optional
            filter expression matched against the cn attribute
        ou : str, optional
            organisational unit to be used in the ldap search
        mssfu : bool, optional
            Whether to only show groups with mssfu mappings
        returndn : bool, optional
            If true, print full DNs (this implies verbose output)
        verbose : int, optional
            If greater than 0, print one name per line; otherwise print
            one 'name:IGNORE:gid:member,member,...' line per group
        recursive : bool, optional
            If true, any groups contained within the output will be
            resolved recursively to users
        indent : int, optional
            For internal use only. Indicates indent level for verbose
            recursive mode. Otherwise ignored.
        """
        if returndn and verbose == 0:
            verbose = 1

        groups = self.get_groups_struct(filter, ou, mssfu)
        if not groups:
            sys.stderr.write("%sError: no groups found (filter: %s)\n"
                             % (' ' * indent, filter))
            return

        if verbose > 0:
            self._print_groups_verbose(groups, ou, mssfu, returndn, verbose,
                                       recursive, indent)
        else:
            self._print_groups_compact(groups, ou, recursive)

    def _print_groups_verbose(self, groups, ou, mssfu, returndn, verbose,
                              recursive, indent):
        """Print groups one name per line, members indented below."""
        indent_increment = 3  # amount to indent members
        pad = ' ' * indent
        member_pad = ' ' * (indent + indent_increment)

        for dn, entry in groups:
            label = dn if returndn else self.ensure_utf8(entry['cn'][0])
            print("%sgroup: %s(%s)" % (pad, label, self._entry_gid(entry)))

            if 'member' not in entry:
                continue
            for member in (self.ensure_utf8(m) for m in entry['member']):
                # Check if member is itself a group. This might be
                # ExampleOrg-specific
                if recursive and self._is_group(member):
                    self.list_groups(
                        filter=self.dn_to_cn(member),
                        ou=ou,
                        mssfu=mssfu,
                        returndn=returndn,
                        verbose=verbose,
                        recursive=recursive,
                        indent=indent + indent_increment)
                elif returndn:
                    print('%smember: %s' % (member_pad, member))
                else:
                    print('%smember: %s'
                          % (member_pad, self.dn_to_cn(member)))

    def _print_groups_compact(self, groups, ou, recursive):
        """Print one 'cn:IGNORE:gid:members' line per group."""
        for _dn, entry in groups:
            sys.stdout.write("%s:IGNORE:%s:"
                             % (self.ensure_utf8(entry['cn'][0]),
                                self._entry_gid(entry)))
            if 'member' not in entry:
                sys.stdout.write("\n")
                continue

            members = [self.dn_to_cn(self.ensure_utf8(m))
                       for m in entry['member']]
            if recursive:
                # Expand nested groups using the same search base as the
                # top-level lookup.
                members = [m for cn in members
                           for m in self._get_all_members(cn, ou=ou)]
            sys.stdout.write(",".join(members) + "\n")

    def _get_all_members(self, filter, include_groups=False, ou=None,
                         mssfu=False):
        """Generator to recursively get the CN for all members of a group.

        If filter does not match any groups it is assumed to be a user and
        is yielded directly.

        NOTE: there is no protection against circular group nesting.

        Parameters
        ----------
        filter : str
            filter expression used for the cn part of the ldap dn. If it
            matches multiple entries they will be concatenated.
        include_groups : bool
            if true, the CN of any member groups will be included in the
            results (preordered traversal)
        ou : str, optional
            organisational unit to be used in the ldap search
        mssfu : bool, optional
            Whether to only show groups with mssfu mappings

        Returns
        -------
        Generator of str : CN for all members of groups matching the filter.
        """
        groups = self.get_groups_struct(filter, ou, mssfu)
        if not groups:
            # no group entry, so input must have been a user
            yield filter
            return

        for dn, entry in groups:
            # preorder the group
            if include_groups:
                yield self.dn_to_cn(dn)
            # an entry without 'member' is an empty group
            for member in entry.get("member", ()):
                cn = self.dn_to_cn(self.ensure_utf8(member))
                yield from self._get_all_members(
                    cn, include_groups=include_groups, ou=ou, mssfu=mssfu)

    def _is_group(self, dn):
        """Quick check if a DN is a group.

        A DN counts as a group when it contains self.group_ou, compared
        case-insensitively.

        Parameters
        ----------
        dn : str
            DN of the entry
        """
        # Use casefold() for proper case-insensitive comparison in Python 3
        return self.group_ou.casefold() in dn.casefold()

    def _mod_groupmembers(self, ldapmode, dngroup, usernames):
        """modifies (adds/deletes) members of an LDAP group entry

        Parameters
        ----------
        ldapmode : {ldap.MOD_ADD, ldap.MOD_DELETE}
        dngroup : str
            DN of the group; a plain group name is expanded to
            'cn=<name>,<self.group_ou>'
        usernames : list of str
            List of usernames (system names or DNs). System names are
            resolved with systemuser2dn() using its default settings.

        Raises
        ------
        ldap.LDAPError
            Reraises original exception for LDAP problems
        LdapUserDirError
            raised by systemuser2dn() if no such user exists
        RuntimeError
            if a username resolves to an empty DN
        """
        if not self.has_dn_format(dngroup):
            dngroup = ''.join(['cn=', dngroup, ',', self.group_ou])

        dnlist = []
        for name in usernames:
            if self.has_dn_format(name):
                dnlist.append(name)
                continue
            dnname = self.systemuser2dn(name)
            if dnname == '':
                raise RuntimeError('Error: No such ldap user: %s' % name)
            dnlist.append(dnname)

        # python-ldap expects attribute values as bytes, even when the
        # connection runs with bytes_mode=False.
        values = [d.encode('utf-8') if isinstance(d, str) else d
                  for d in dnlist]
        mod_attrs = [(ldapmode, 'member', values)]
        try:
            self._ldap.modify_s(dngroup, mod_attrs)
        except ldap.ALREADY_EXISTS:
            self.logger.error('Entry already exists in group %s', dngroup)
            raise
        except ldap.LDAPError:
            self.logger.error(
                'ERROR modifying LDAP: group: %s; user list: %s\n',
                dngroup, dnlist)
            raise

    def add_groupmembers(self, group, usernames):
        """Adds users to an LDAP group

        Parameters
        ----------
        group : str
            name or DN of the group
        usernames : list of str
            List of usernames (system names or DNs)
        """
        self._mod_groupmembers(ldap.MOD_ADD, group, usernames)

    def del_groupmembers(self, group, usernames):
        """Deletes users from an LDAP group

        Parameters
        ----------
        group : str
            name or DN of the group
        usernames : list of str
            List of usernames (system names or DNs)
        """
        self._mod_groupmembers(ldap.MOD_DELETE, group, usernames)

    def __del__(self):
        # The connection is missing when __init__ failed early or a
        # reconnect could not create a new one.
        conn = getattr(self, '_ldap', None)
        if conn is None:
            return
        try:
            conn.unbind()
        except Exception:
            # Best effort: nothing useful can be done with an error raised
            # while the object is being destroyed.
            pass
|