[Freeipa-devel] [PATCH 1/1] Resolve external members from trusted domain via Global Catalog

Alexander Bokovoy abokovoy at redhat.com
Mon Oct 29 17:59:59 UTC 2012


The sequence is as follows:
1. Match external member against existing trusted domain
2. Find trusted domain's domain controller
3. Fetch trusted domain account auth info
4. Set up ccache in /var/run/ipa/ipa_memcached/krb5cc_TRUSTEDDOMAIN with principal ourdomain$@trusted.domain
5. Do LDAP SASL interactive bind using the ccache
6. Search for the member's SID
7. Decode SID
8. Replace the external member name with its SID

https://fedorahosted.org/freeipa/ticket/3211
---
 ipalib/plugins/group.py    |  32 +++++----
 ipaserver/dcerpc.py        | 172 +++++++++++++++++++++++++++++++++++++++++----
 ipaserver/plugins/ldap2.py |   3 +
 3 files changed, 181 insertions(+), 26 deletions(-)

diff --git a/ipalib/plugins/group.py b/ipalib/plugins/group.py
index a174ba62cc32a7fb83474f52e2621521553889af..f86b134e61fc8c7518a64d25329babee3398c6ef 100644
--- a/ipalib/plugins/group.py
+++ b/ipalib/plugins/group.py
@@ -83,28 +83,30 @@ External members should be added to groups that specifically created as
 external and non-POSIX. Such group later should be included into one of POSIX
 groups.
 
-An external group member is currently a Security Identifier as defined by
-the trusted domain.
+An external group member is currently a Security Identifier (SID) as defined by
+the trusted domain. When adding external group members, it is possible to
+specify them in either SID, or DOM\\name, or name@domain format. IPA will attempt
+to resolve passed name to SID with the use of Global Catalog of the trusted domain.
 
 Example:
 
-1. Make note of the trusted domain security identifier
-
-   domainsid = `ipa trust-show <ad.domain> | grep Identifier | cut -d: -f2`
-
-2. Create group for the trusted domain admins' mapping and their local POSIX group:
+1. Create group for the trusted domain admins' mapping and their local POSIX group:
 
    ipa group-add --desc='<ad.domain> admins external map' ad_admins_external --external
    ipa group-add --desc='<ad.domain> admins' ad_admins
 
-3. Add security identifier of Domain Admins of the <ad.domain> to the ad_admins_external
-   group (security identifier of <ad.domain SID>-513 is Domain Admins group):
+2. Add security identifier of Domain Admins of the <ad.domain> to the ad_admins_external
+   group:
 
-   ipa group-add-member ad_admins_external --external ${domainsid}-513
+   ipa group-add-member ad_admins_external --external 'AD\\Domain Admins'
 
-4. Allow members of ad_admins_external group to be associated with ad_admins POSIX group:
+3. Allow members of ad_admins_external group to be associated with ad_admins POSIX group:
 
    ipa group-add-member ad_admins --groups ad_admins_external
+
+4. List external members of ad_admins_external group to see their SIDs:
+
+   ipa group-show ad_admins_external
 """)
 
 PROTECTED_GROUPS = (u'admins', u'trust admins', u'default smb group')
@@ -165,7 +167,7 @@ api.register(group)
 ipaexternalmember_param = Str('ipaexternalmember*',
             cli_name='external',
             label=_('External member'),
-            doc=_('comma-separated SIDs of members of a trusted domain'),
+            doc=_('comma-separated list of members of a trusted domain in DOM\\name or name at domain form'),
             csv=True,
             flags=['no_create', 'no_update', 'no_search'],
         )
@@ -382,7 +384,11 @@ class group_add_member(LDAPAddMember):
                 if domain_validator.is_trusted_sid_valid(sid):
                     sids.append(sid)
                 else:
-                    failed_sids.append((sid, 'Not a trusted domain SID'))
+                    actual_sid = domain_validator.get_sid_trusted_domain_object(sid)
+                    if isinstance(actual_sid, unicode):
+                        sids.append(actual_sid)
+                    else:
+                        failed_sids.append((sid, 'Not a trusted domain SID'))
             if len(sids) == 0:
                 raise errors.ValidationError(name=_('external member'),
                                              error=_('values are not recognized as valid SIDs from trusted domain'))
diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py
index c40313a697fa6ff842129944b7ad9c1f5fc14a77..2c53fafe532414a3ad6624a3583cf3f853ccc72c 100644
--- a/ipaserver/dcerpc.py
+++ b/ipaserver/dcerpc.py
@@ -29,6 +29,7 @@ from ipalib import Command
 from ipalib import errors
 from ipapython import ipautil
 from ipapython.ipa_log_manager import *
+from ipapython.dn import DN
 from ipaserver.install import installutils
 
 import os, string, struct, copy
@@ -46,6 +47,7 @@ try:
 except ImportError:
     from ldap.controls import LDAPControl as LDAPControl    #pylint: disable=F0401
 import ldap as _ldap
+from ipaserver.ipaldap import IPAdmin
 
 __doc__ = _("""
 Classes to manage trust joins using DCE-RPC calls
@@ -102,6 +104,8 @@ class DomainValidator(object):
     ATTR_FLATNAME = 'ipantflatname'
     ATTR_SID = 'ipantsecurityidentifier'
     ATTR_TRUSTED_SID = 'ipanttrusteddomainsid'
+    ATTR_TRUST_PARTNER = 'ipanttrustpartner'
+    ATTR_TRUST_AUTHOUT = 'ipanttrustauthoutgoing'
 
     def __init__(self, api):
         self.api = api
@@ -111,6 +115,9 @@ class DomainValidator(object):
         self.dn = None
         self.sid = None
         self._domains = None
+        self._info = dict()
+        self._creds = None
+        self._parm = None
 
     def is_configured(self):
         cn_trust_local = DN(('cn', self.api.env.domain), self.api.env.container_cifsdomains, self.api.env.basedn)
@@ -125,14 +132,22 @@ class DomainValidator(object):
         return True
 
     def get_trusted_domains(self):
+        """Returns dict of trusted domain tuples (flatname, sid, trust_auth_outgoing), keyed by domain name"""
         cn_trust = DN(('cn', 'ad'), self.api.env.container_trusts, self.api.env.basedn)
         try:
             search_kw = {'objectClass': 'ipaNTTrustedDomain'}
             filter = self.ldap.make_filter(search_kw, rules=self.ldap.MATCH_ALL)
             (entries, truncated) = self.ldap.find_entries(filter=filter, base_dn=cn_trust,
-                                                          attrs_list=[self.ATTR_TRUSTED_SID, 'dn'])
+                                                          attrs_list=[self.ATTR_TRUSTED_SID,
+                                                                      self.ATTR_FLATNAME,
+                                                                      self.ATTR_TRUST_PARTNER,
+                                                                      self.ATTR_TRUST_AUTHOUT])
 
-            result = map (lambda entry: security.dom_sid(entry[1][self.ATTR_TRUSTED_SID][0]), entries)
+            result = dict()
+            for entry in entries:
+                result[entry[1][self.ATTR_TRUST_PARTNER][0]] = (entry[1][self.ATTR_FLATNAME][0].lower(),
+                                                                security.dom_sid(entry[1][self.ATTR_TRUSTED_SID][0]),
+                                                                entry[1][self.ATTR_TRUST_AUTHOUT][0])
             return result
         except errors.NotFound, e:
             return []
@@ -158,13 +173,153 @@ class DomainValidator(object):
         # We have non-zero list of trusted domains and have to go through them
         # one by one and check their sids as prefixes
         test_sid_subauths = test_sid.sub_auths
-        for domsid in self._domains:
+        for domain in self._domains:
+            domsid = self._domains[domain][1]
             sub_auths = domsid.sub_auths
             num_auths = min(test_sid.num_auths, domsid.num_auths)
             if test_sid_subauths[:num_auths] == sub_auths[:num_auths]:
                 return True
         return False
 
+    def normalize_name(self, name):
+        result = dict()
+        components = name.split('@')
+        if len(components) == 2:
+            result['domain'] = unicode(components[1]).lower()
+            result['name'] = unicode(components[0]).lower()
+        else:
+            components = name.split('\\')
+            if len(components) == 2:
+                result['flatname'] = unicode(components[0]).lower()
+                result['name'] = unicode(components[1]).lower()
+            else:
+                result['name'] = unicode(name).lower()
+        return result
+
+    def get_sid_trusted_domain_object(self, object_name):
+        """Returns SID for the trusted domain object (user or group only)"""
+        if not self.domain:
+            # our domain is not configured or self.is_configured() never run
+            return None
+        if not self._domains:
+            self._domains = self.get_trusted_domains()
+        if len(self._domains) == 0:
+            # Our domain is configured but no trusted domains are configured
+            return None
+        components = self.normalize_name(object_name)
+        if not ('domain' in components or 'flatname' in components):
+            # No domain or realm specified, ambiguous search
+            return False
+
+        entry = None
+        if 'domain' in components and components['domain'] in self._domains:
+            # Now we have a name to check against our list of trusted domains
+            info = self.__retrieve_trusted_domain_dc(components['domain'])
+            if 'dc' in info:
+                entry = self.__resolve_against_gc(info, components['name'])
+        elif 'flatname' in components:
+            # Flatname was specified, traverse through the list of trusted
+            # domains first to find the proper one
+            for domain in self._domains:
+                if self._domains[domain][0] == components['flatname']:
+                    info = self.__retrieve_trusted_domain_dc(domain)
+                    if info and 'dc' in info:
+                        entry = self.__resolve_against_gc(info, components['name'])
+        if entry:
+            l = len(entry)
+            if l > 2:
+                return False
+            sid = self.__sid_to_str(entry[0][1]['objectSid'][0])
+            try:
+                test_sid = security.dom_sid(sid)
+                return unicode(test_sid)
+            except TypeError, e:
+                return False
+        return False
+
+    def __sid_to_str(self, sid):
+        sid_rev_num = ord(sid[0])
+        number_sub_id = ord(sid[1])
+        ia = struct.unpack('!Q','\x00\x00'+sid[2:8])[0]
+        subs = [
+            struct.unpack('<I',sid[8+4*i:12+4*i])[0]
+            for i in range(number_sub_id)
+        ]
+        return u'S-%d-%d-%s' % ( sid_rev_num, ia, '-'.join([str(s) for s in subs]),)
+
+    def __extract_trusted_auth(self, info):
+        clear = None
+        auth = drsblobs.trustAuthInOutBlob()
+        auth.__ndr_unpack__(info['auth'])
+        auth_array = auth.current.array[0]
+        if auth_array.AuthType == lsa.TRUST_AUTH_TYPE_CLEAR:
+            clear = ''.join(map(chr, auth_array.AuthInfo.password)).decode('utf-16-le')
+        return clear
+
+    def __kinit_as_trusted_account(self, info, password):
+        ccache_name = "/var/run/ipa/ipa_memcached/krb5cc_TRUSTEDDOMAIN"
+        principal = '%s$@%s' % (self.flatname, info['dns_domain'].upper())
+        (stdout, stderr, returncode) = ipautil.run(['/usr/bin/kinit', principal],
+                                                   env={'KRB5CCNAME':ccache_name},
+                                                   stdin=password, raiseonerr=False)
+        if returncode == 0:
+            return (ccache_name, principal)
+
+    def __resolve_against_gc(self, info, name):
+        conn = IPAdmin(host=info['dc'], port=3268)
+        auth = self.__extract_trusted_auth(info)
+        if auth:
+            (ccache_name, principal) = self.__kinit_as_trusted_account(info, auth)
+            if ccache_name:
+                cb_info = dict()
+                # pass empty dict, SASL GSSAPI is able to get all from the ccache
+                sasl_auth = _ldap.sasl.sasl(cb_info,'GSSAPI')
+                old_ccache = os.environ.get('KRB5CCNAME')
+                os.environ["KRB5CCNAME"] = ccache_name
+                conn.sasl_interactive_bind_s(None, sasl_auth)
+                base = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.')))
+                # We don't use conn.getEntry() because it will attempt to fetch schema from GC and that will fail
+                filterstr = conn.encode('(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' % dict(name=name))
+                attrlist = conn.encode(['sAMAccountName', 'sAMAccountType', 'objectSid', 'groupType', 'description'])
+                entry = conn.conn.search_s(str(base), _ldap.SCOPE_SUBTREE, filterstr, attrlist, 0)
+                os.environ["KRB5CCNAME"] = old_ccache
+                return entry
+
+    def __retrieve_trusted_domain_dc(self, domain):
+        if domain in self._info:
+            return self._info[domain]
+
+        if not self._creds:
+            self._parm = param.LoadParm()
+            self._parm.load(os.path.join(ipautil.SHARE_DIR,"smb.conf.empty"))
+            self._parm.set('netbios name', self.flatname)
+            self._creds = credentials.Credentials()
+            self._creds.set_kerberos_state(credentials.MUST_USE_KERBEROS)
+            self._creds.guess(self._parm)
+            self._creds.set_workstation(self.flatname)
+
+        # FIXME: search for actual GC server instead of DC and assuming it is always GC
+        netrc = net.Net(creds=self._creds, lp=self._parm)
+        try:
+            result = netrc.finddc(domain=domain, flags=nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_DS)
+        except RuntimeError, e:
+            raise assess_dcerpc_exception(message=str(e))
+
+        if not result:
+            return None
+
+        info = dict()
+        info['name'] = unicode(result.domain_name)
+        info['dns_domain'] = unicode(result.dns_domain)
+        info['dns_forest'] = unicode(result.forest)
+        info['guid'] = unicode(result.domain_uuid)
+        info['dc'] = unicode(result.pdc_dns_name)
+        info['auth'] = self._domains[domain][2]
+
+        self._info[domain] = info
+        return info
+
+
 class TrustDomainInstance(object):
 
     def __init__(self, hostname, creds=None):
@@ -375,19 +530,10 @@ class TrustDomainInstance(object):
         except RuntimeError, e:
             pass
         try:
-            trustdom_handle = self._pipe.CreateTrustedDomainEx2(self._policy_handle, info, self.auth_info, security.SEC_STD_DELETE)
+            self._pipe.CreateTrustedDomainEx2(self._policy_handle, info, self.auth_info, security.SEC_STD_DELETE)
         except RuntimeError, (num, message):
             raise assess_dcerpc_exception(num=num, message=message)
 
-        try:
-            infoclass = lsa.TrustDomainInfoSupportedEncTypes()
-            infoclass.enc_types = security.KERB_ENCTYPE_RC4_HMAC_MD5
-            infoclass.enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
-            infoclass.enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
-            self._pipe.SetInformationTrustedDomain(trustdom_handle, lsa.LSA_TRUSTED_DOMAIN_SUPPORTED_ENCRYPTION_TYPES, infoclass)
-        except RuntimeError, e:
-            pass
-
     def verify_trust(self, another_domain):
         def retrieve_netlogon_info_2(domain, function_code, data):
             try:
diff --git a/ipaserver/plugins/ldap2.py b/ipaserver/plugins/ldap2.py
index caf35096c981363927f8471e2567476954f664e5..55c25dadf5b8cd4d9afdadf8dfec9c156b2121b8 100644
--- a/ipaserver/plugins/ldap2.py
+++ b/ipaserver/plugins/ldap2.py
@@ -94,6 +94,7 @@ MEMBERS_INDIRECT = 2
 SASL_AUTH = _ldap_sasl.sasl({}, 'GSSAPI')
 
 DN_SYNTAX_OID = '1.3.6.1.4.1.1466.115.121.1.12'
+OBJECT_SID_OID = '1.2.840.113556.1.4.146'
 
 def unicode_from_utf8(val):
     '''
@@ -263,6 +264,7 @@ class IPASimpleLDAPObject(object):
     # Note: the oid for dn syntax is: 1.3.6.1.4.1.1466.115.121.1.12
 
     _SYNTAX_MAPPING = {
+        OBJECT_SID_OID                   : str, # Object SID
         '1.3.6.1.4.1.1466.115.121.1.1'   : str, # ACI item
         '1.3.6.1.4.1.1466.115.121.1.4'   : str, # Audio
         '1.3.6.1.4.1.1466.115.121.1.5'   : str, # Binary
@@ -313,6 +315,7 @@ class IPASimpleLDAPObject(object):
         'managedtemplate': DN_SYNTAX_OID, # DN
         'managedbase':     DN_SYNTAX_OID, # DN
         'originscope':     DN_SYNTAX_OID, # DN
+        'objectsid' :      OBJECT_SID_OID,# Object SID
     })
 
     def __init__(self, uri):
-- 
1.7.12.1




More information about the Freeipa-devel mailing list