[Freeipa-devel] [PATCH 1/1] Resolve external members from trusted domain via Global Catalog

Alexander Bokovoy abokovoy at redhat.com
Wed Oct 31 20:52:12 UTC 2012


A sequence is following:
1. Match external member against existing trusted domain
2. Find trusted domain's domain controller and preferred GC hosts
3. Fetch trusted domain account auth info
4. Set up ccache in /var/run/ipa_memcached/krb5cc_TD<domain> with principal ourdomain$@trusted.domain
5. Do LDAP SASL interactive bind using the ccache
6. Search for the member's SID
7. Decode SID
8. Replace external member name by SID
---
 ipalib/plugins/group.py |  32 ++++---
 ipalib/plugins/trust.py |  17 ++--
 ipaserver/dcerpc.py     | 233 +++++++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 257 insertions(+), 25 deletions(-)

diff --git a/ipalib/plugins/group.py b/ipalib/plugins/group.py
index a174ba62cc32a7fb83474f52e2621521553889af..f86b134e61fc8c7518a64d25329babee3398c6ef 100644
--- a/ipalib/plugins/group.py
+++ b/ipalib/plugins/group.py
@@ -83,28 +83,30 @@ External members should be added to groups that specifically created as
 external and non-POSIX. Such group later should be included into one of POSIX
 groups.
 
-An external group member is currently a Security Identifier as defined by
-the trusted domain.
+An external group member is currently a Security Identifier (SID) as defined by
+the trusted domain. When adding external group members, it is possible to
+specify them in either SID, or DOM\\name, or name at domain format. IPA will attempt
+to resolve passed name to SID with the use of Global Catalog of the trusted domain.
 
 Example:
 
-1. Make note of the trusted domain security identifier
-
-   domainsid = `ipa trust-show <ad.domain> | grep Identifier | cut -d: -f2`
-
-2. Create group for the trusted domain admins' mapping and their local POSIX group:
+1. Create group for the trusted domain admins' mapping and their local POSIX group:
 
    ipa group-add --desc='<ad.domain> admins external map' ad_admins_external --external
    ipa group-add --desc='<ad.domain> admins' ad_admins
 
-3. Add security identifier of Domain Admins of the <ad.domain> to the ad_admins_external
-   group (security identifier of <ad.domain SID>-513 is Domain Admins group):
+2. Add security identifier of Domain Admins of the <ad.domain> to the ad_admins_external
+   group:
 
-   ipa group-add-member ad_admins_external --external ${domainsid}-513
+   ipa group-add-member ad_admins_external --external 'AD\\Domain Admins'
 
-4. Allow members of ad_admins_external group to be associated with ad_admins POSIX group:
+3. Allow members of ad_admins_external group to be associated with ad_admins POSIX group:
 
    ipa group-add-member ad_admins --groups ad_admins_external
+
+4. List members of external members of ad_admins_external group to see their SIDs:
+
+   ipa group-show ad_admins_external
 """)
 
 PROTECTED_GROUPS = (u'admins', u'trust admins', u'default smb group')
@@ -165,7 +167,7 @@ api.register(group)
 ipaexternalmember_param = Str('ipaexternalmember*',
             cli_name='external',
             label=_('External member'),
-            doc=_('comma-separated SIDs of members of a trusted domain'),
+            doc=_('comma-separated list of members of a trusted domain in DOM\\name or name at domain form'),
             csv=True,
             flags=['no_create', 'no_update', 'no_search'],
         )
@@ -382,7 +384,11 @@ class group_add_member(LDAPAddMember):
                 if domain_validator.is_trusted_sid_valid(sid):
                     sids.append(sid)
                 else:
-                    failed_sids.append((sid, 'Not a trusted domain SID'))
+                    actual_sid = domain_validator.get_sid_trusted_domain_object(sid)
+                    if isinstance(actual_sid, unicode):
+                        sids.append(actual_sid)
+                    else:
+                        failed_sids.append((sid, 'Not a trusted domain SID'))
             if len(sids) == 0:
                 raise errors.ValidationError(name=_('external member'),
                                              error=_('values are not recognized as valid SIDs from trusted domain'))
diff --git a/ipalib/plugins/trust.py b/ipalib/plugins/trust.py
index 44679e7a26c2fd70dc5ad25b312ccfd363df15a7..ffff1e2fc0684484bd962f21c6ad0695d5d52054 100644
--- a/ipalib/plugins/trust.py
+++ b/ipalib/plugins/trust.py
@@ -78,24 +78,23 @@ should be included into one of local POSIX groups.
 
 Example:
 
-1. Make note of the trusted domain security identifier
-
-   domainsid = `ipa trust-show <ad.domain> | grep Identifier | cut -d: -f2`
-
-2. Create group for the trusted domain admins' mapping and their local POSIX group:
+1. Create group for the trusted domain admins' mapping and their local POSIX group:
 
    ipa group-add --desc='<ad.domain> admins external map' ad_admins_external --external
    ipa group-add --desc='<ad.domain> admins' ad_admins
 
-3. Add security identifier of Domain Admins of the <ad.domain> to the ad_admins_external
-   group (security identifier of <ad.domain SID>-512 is Domain Admins group):
+2. Add security identifier of Domain Admins of the <ad.domain> to the ad_admins_external
+   group:
 
-   ipa group-add-member ad_admins_external --external ${domainsid}-512
+   ipa group-add-member ad_admins_external --external 'AD\\Domain Admins'
 
-4. Allow members of ad_admins_external group to be associated with ad_admins POSIX group:
+3. Allow members of ad_admins_external group to be associated with ad_admins POSIX group:
 
    ipa group-add-member ad_admins --groups ad_admins_external
 
+4. List members of external members of ad_admins_external group to see their SIDs:
+
+   ipa group-show ad_admins_external
 """)
 
 trust_output_params = (
diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py
index c40313a697fa6ff842129944b7ad9c1f5fc14a77..16837ae4aceea7ad0de0a801eb1c0ecec0235947 100644
--- a/ipaserver/dcerpc.py
+++ b/ipaserver/dcerpc.py
@@ -29,6 +29,7 @@ from ipalib import Command
 from ipalib import errors
 from ipapython import ipautil
 from ipapython.ipa_log_manager import *
+from ipapython.dn import DN
 from ipaserver.install import installutils
 
 import os, string, struct, copy
@@ -46,6 +47,10 @@ try:
 except ImportError:
     from ldap.controls import LDAPControl as LDAPControl    #pylint: disable=F0401
 import ldap as _ldap
+from ipaserver.ipaldap import IPAdmin
+from ipalib.session import krbccache_dir, krbccache_prefix
+from dns import resolver, rdatatype
+from dns.exception import DNSException
 
 __doc__ = _("""
 Classes to manage trust joins using DCE-RPC calls
@@ -102,6 +107,8 @@ class DomainValidator(object):
     ATTR_FLATNAME = 'ipantflatname'
     ATTR_SID = 'ipantsecurityidentifier'
     ATTR_TRUSTED_SID = 'ipanttrusteddomainsid'
+    ATTR_TRUST_PARTNER = 'ipanttrustpartner'
+    ATTR_TRUST_AUTHOUT = 'ipanttrustauthoutgoing'
 
     def __init__(self, api):
         self.api = api
@@ -111,6 +118,9 @@ class DomainValidator(object):
         self.dn = None
         self.sid = None
         self._domains = None
+        self._info = dict()
+        self._creds = None
+        self._parm = None
 
     def is_configured(self):
         cn_trust_local = DN(('cn', self.api.env.domain), self.api.env.container_cifsdomains, self.api.env.basedn)
@@ -125,14 +135,22 @@ class DomainValidator(object):
         return True
 
     def get_trusted_domains(self):
+        """Returns dict of trusted domain tuples (flatname, sid, trust_auth_outgoing), keyed by domain name"""
         cn_trust = DN(('cn', 'ad'), self.api.env.container_trusts, self.api.env.basedn)
         try:
             search_kw = {'objectClass': 'ipaNTTrustedDomain'}
             filter = self.ldap.make_filter(search_kw, rules=self.ldap.MATCH_ALL)
             (entries, truncated) = self.ldap.find_entries(filter=filter, base_dn=cn_trust,
-                                                          attrs_list=[self.ATTR_TRUSTED_SID, 'dn'])
+                                                          attrs_list=[self.ATTR_TRUSTED_SID,
+                                                                      self.ATTR_FLATNAME,
+                                                                      self.ATTR_TRUST_PARTNER,
+                                                                      self.ATTR_TRUST_AUTHOUT])
 
-            result = map (lambda entry: security.dom_sid(entry[1][self.ATTR_TRUSTED_SID][0]), entries)
+            result = dict()
+            for entry in entries:
+                result[entry[1][self.ATTR_TRUST_PARTNER][0]] = (entry[1][self.ATTR_FLATNAME][0].lower(),
+                                                                security.dom_sid(entry[1][self.ATTR_TRUSTED_SID][0]),
+                                                                entry[1][self.ATTR_TRUST_AUTHOUT][0])
             return result
         except errors.NotFound, e:
             return []
@@ -158,13 +176,222 @@ class DomainValidator(object):
         # We have non-zero list of trusted domains and have to go through them
         # one by one and check their sids as prefixes
         test_sid_subauths = test_sid.sub_auths
-        for domsid in self._domains:
+        for domain in self._domains:
+            domsid = self._domains[domain][1]
             sub_auths = domsid.sub_auths
             num_auths = min(test_sid.num_auths, domsid.num_auths)
             if test_sid_subauths[:num_auths] == sub_auths[:num_auths]:
                 return True
         return False
 
+    def normalize_name(self, name):
+        result = dict()
+        components = name.split('@')
+        if len(components) == 2:
+            result['domain'] = unicode(components[1]).lower()
+            result['name'] = unicode(components[0]).lower()
+        else:
+            components = name.split('\\')
+            if len(components) == 2:
+                result['flatname'] = unicode(components[0]).lower()
+                result['name'] = unicode(components[1]).lower()
+            else:
+                result['name'] = unicode(name).lower()
+        return result
+
+    def get_sid_trusted_domain_object(self, object_name):
+        """Returns SID for the trusted domain object (user or group only)"""
+        if not self.domain:
+            # our domain is not configured or self.is_configured() never run
+            return None
+        if not self._domains:
+            self._domains = self.get_trusted_domains()
+        if len(self._domains) == 0:
+            # Our domain is configured but no trusted domains are configured
+            return None
+        components = self.normalize_name(object_name)
+        if not ('domain' in components or 'flatname' in components):
+            # No domain or realm specified, ambiguous search
+            return False
+
+        entry = None
+        if 'domain' in components and components['domain'] in self._domains:
+            # Now we have a name to check against our list of trusted domains
+            entry = self.resolve_against_gc(components['domain'], components['name'])
+        elif 'flatname' in components:
+            # Flatname was specified, traverse through the list of trusted
+            # domains first to find the proper one
+            for domain in self._domains:
+                if self._domains[domain][0] == components['flatname']:
+                    entry = self.resolve_against_gc(domain, components['name'])
+                    if entry:
+                        break
+        if entry:
+            try:
+                test_sid = security.dom_sid(entry)
+                return unicode(test_sid)
+            except TypeError, e:
+                return False
+        return False
+
+    def __sid_to_str(self, sid):
+        """
+        Converts binary SID to string representation
+        Returns unicode string
+        """
+        sid_rev_num = ord(sid[0])
+        number_sub_id = ord(sid[1])
+        ia = struct.unpack('!Q','\x00\x00'+sid[2:8])[0]
+        subs = [
+            struct.unpack('<I',sid[8+4*i:12+4*i])[0]
+            for i in range(number_sub_id)
+        ]
+        return u'S-%d-%d-%s' % ( sid_rev_num, ia, '-'.join([str(s) for s in subs]),)
+
+    def __extract_trusted_auth(self, info):
+        """
+        Returns in clear trusted domain account credentials
+        """
+        clear = None
+        auth = drsblobs.trustAuthInOutBlob()
+        auth.__ndr_unpack__(info['auth'])
+        auth_array = auth.current.array[0]
+        if auth_array.AuthType == lsa.TRUST_AUTH_TYPE_CLEAR:
+            clear = ''.join(map(chr, auth_array.AuthInfo.password)).decode('utf-16-le')
+        return clear
+
+    def __kinit_as_trusted_account(self, info, password):
+        """
+        Initializes ccache with trusted domain account credentials.
+
+        Applies session code defaults for ccache directory and naming prefix.
+        Session code uses krbccache_prefix+<pid>, we use
+        krbccache_prefix+<TD>+<domain netbios name> so there is no clash
+
+        Returns tuple (ccache name, principal) where (None, None) signifes an error
+        on ccache initialization
+        """
+        ccache_name = os.path.join(krbccache_dir, "%sTD%s" % (krbccache_prefix, info['name'][0]))
+        principal = '%s$@%s' % (self.flatname, info['dns_domain'].upper())
+        (stdout, stderr, returncode) = ipautil.run(['/usr/bin/kinit', principal],
+                                                   env={'KRB5CCNAME':ccache_name},
+                                                   stdin=password, raiseonerr=False)
+        if returncode == 0:
+            return (ccache_name, principal)
+        else:
+            return (None, None)
+
+    def resolve_against_gc(self, domain, name):
+        """
+        Resolves `name' against trusted domain `domain' using Global Catalog
+        Returns SID of the `name' or None
+        """
+        entry = None
+        sid = None
+        info = self.__retrieve_trusted_domain_gc_list(domain)
+        if not info:
+            return None
+        for (host, port) in info['gc']:
+            entry = self.__resolve_against_gc(info, host, port, name)
+            if entry:
+                break
+
+        if entry:
+            l = len(entry)
+            if l > 2:
+                # Treat non-unique entries as invalid
+                return None
+            sid = self.__sid_to_str(entry[0][1]['objectSid'][0])
+        return sid
+
+    def __resolve_against_gc(self, info, host, port, name):
+        """
+        Actual resolution against LDAP server, using SASL GSSAPI authentication
+        Returns LDAP result or None
+        """
+        conn = IPAdmin(host=host, port=port)
+        auth = self.__extract_trusted_auth(info)
+        if auth:
+            (ccache_name, principal) = self.__kinit_as_trusted_account(info, auth)
+            if ccache_name:
+                cb_info = dict()
+                # pass empty dict, SASL GSSAPI is able to get all from the ccache
+                sasl_auth = _ldap.sasl.sasl(cb_info,'GSSAPI')
+                old_ccache = os.environ.get('KRB5CCNAME')
+                os.environ["KRB5CCNAME"] = ccache_name
+                # OPT_X_SASL_NOCANON is used to avoid hard requirement for PTR
+                # records pointing back to the same host name
+                conn.set_option(_ldap.OPT_X_SASL_NOCANON, _ldap.OPT_ON)
+                conn.sasl_interactive_bind_s(None, sasl_auth)
+                base = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.')))
+                # We don't use conn.getEntry() because it will attempt to fetch schema from GC and that will fail
+                filterstr = conn.encode('(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' % dict(name=name))
+                attrlist = conn.encode(['sAMAccountName', 'sAMAccountType', 'objectSid', 'groupType', 'description'])
+                entry = conn.conn.search_s(str(base), _ldap.SCOPE_SUBTREE, filterstr, attrlist, 0)
+                os.environ["KRB5CCNAME"] = old_ccache
+                return entry
+
+    def __retrieve_trusted_domain_gc_list(self, domain):
+        """
+        Retrieves domain information and preferred GC list
+        Returns dictionary with following keys
+             name       -- NetBIOS name of the trusted domain
+             dns_domain -- DNS name of the trusted domain
+             auth       -- encrypted credentials for trusted domain account
+             gc         -- array of tuples (server, port) for Global Catalog
+        """
+        if domain in self._info:
+            return self._info[domain]
+
+        if not self._creds:
+            self._parm = param.LoadParm()
+            self._parm.load(os.path.join(ipautil.SHARE_DIR,"smb.conf.empty"))
+            self._parm.set('netbios name', self.flatname)
+            self._creds = credentials.Credentials()
+            self._creds.set_kerberos_state(credentials.MUST_USE_KERBEROS)
+            self._creds.guess(self._parm)
+            self._creds.set_workstation(self.flatname)
+
+        netrc = net.Net(creds=self._creds, lp=self._parm)
+        finddc_error = None
+        result = None
+        try:
+            result = netrc.finddc(domain=domain, flags=nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_GC | nbt.NBT_SERVER_CLOSEST)
+        except RuntimeError, e:
+            finddc_error = e
+
+        info = dict()
+        info['auth'] = self._domains[domain][2]
+        servers = []
+        if result:
+            info['name'] = unicode(result.domain_name)
+            info['dns_domain'] = unicode(result.dns_domain)
+            servers = [(unicode(result.pdc_dns_name), 3268)]
+        else:
+            info['name'] = self._domains[domain]
+            info['dns_domain'] = domain
+            # Retrieve GC servers list
+            gc_name = '_gc._tcp.%s.' % info['dns_domain']
+
+            try:
+                answers = resolver.query(gc_name, rdatatype.SRV)
+            except DNSException, e:
+                answers = []
+
+            for answer in answers:
+                server = str(answer.target).rstrip(".")
+                servers.append((server, answer.port))
+
+        info['gc'] = servers
+
+        # Both methods should not fail at the same time
+        if finddc_error and len(info['gc']) == 0:
+            raise assess_dcerpc_exception(message=str(finddc_error))
+
+        self._info[domain] = info
+        return info
+
+
 class TrustDomainInstance(object):
 
     def __init__(self, hostname, creds=None):
-- 
1.7.12.1




More information about the Freeipa-devel mailing list