[Freeipa-devel] [PATCH 1/1] Resolve external members from trusted domain via Global Catalog
Alexander Bokovoy
abokovoy at redhat.com
Mon Oct 29 17:59:59 UTC 2012
The sequence is as follows:
1. Match external member against existing trusted domain
2. Find trusted domain's domain controller
3. Fetch trusted domain account auth info
4. Set up ccache in /var/run/ipa/ipa_memcached/krb5cc_TRUSTEDDOMAIN with principal ourdomain$@trusted.domain
5. Do LDAP SASL interactive bind using the ccache
6. Search for the member's SID
7. Decode SID
8. Replace the external member name with its SID
https://fedorahosted.org/freeipa/ticket/3211
---
ipalib/plugins/group.py | 32 +++++----
ipaserver/dcerpc.py | 172 +++++++++++++++++++++++++++++++++++++++++----
ipaserver/plugins/ldap2.py | 3 +
3 files changed, 181 insertions(+), 26 deletions(-)
diff --git a/ipalib/plugins/group.py b/ipalib/plugins/group.py
index a174ba62cc32a7fb83474f52e2621521553889af..f86b134e61fc8c7518a64d25329babee3398c6ef 100644
--- a/ipalib/plugins/group.py
+++ b/ipalib/plugins/group.py
@@ -83,28 +83,30 @@ External members should be added to groups that specifically created as
external and non-POSIX. Such group later should be included into one of POSIX
groups.
-An external group member is currently a Security Identifier as defined by
-the trusted domain.
+An external group member is currently a Security Identifier (SID) as defined by
+the trusted domain. When adding external group members, it is possible to
+specify them in either SID, or DOM\\name, or name at domain format. IPA will attempt
+to resolve passed name to SID with the use of Global Catalog of the trusted domain.
Example:
-1. Make note of the trusted domain security identifier
-
- domainsid = `ipa trust-show <ad.domain> | grep Identifier | cut -d: -f2`
-
-2. Create group for the trusted domain admins' mapping and their local POSIX group:
+1. Create group for the trusted domain admins' mapping and their local POSIX group:
ipa group-add --desc='<ad.domain> admins external map' ad_admins_external --external
ipa group-add --desc='<ad.domain> admins' ad_admins
-3. Add security identifier of Domain Admins of the <ad.domain> to the ad_admins_external
- group (security identifier of <ad.domain SID>-513 is Domain Admins group):
+2. Add security identifier of Domain Admins of the <ad.domain> to the ad_admins_external
+ group:
- ipa group-add-member ad_admins_external --external ${domainsid}-513
+ ipa group-add-member ad_admins_external --external 'AD\\Domain Admins'
-4. Allow members of ad_admins_external group to be associated with ad_admins POSIX group:
+3. Allow members of ad_admins_external group to be associated with ad_admins POSIX group:
ipa group-add-member ad_admins --groups ad_admins_external
+
+4. List members of external members of ad_admins_external group to see their SIDs:
+
+ ipa group-show ad_admins_external
""")
PROTECTED_GROUPS = (u'admins', u'trust admins', u'default smb group')
@@ -165,7 +167,7 @@ api.register(group)
ipaexternalmember_param = Str('ipaexternalmember*',
cli_name='external',
label=_('External member'),
- doc=_('comma-separated SIDs of members of a trusted domain'),
+ doc=_('comma-separated list of members of a trusted domain in DOM\\name or name at domain form'),
csv=True,
flags=['no_create', 'no_update', 'no_search'],
)
@@ -382,7 +384,11 @@ class group_add_member(LDAPAddMember):
if domain_validator.is_trusted_sid_valid(sid):
sids.append(sid)
else:
- failed_sids.append((sid, 'Not a trusted domain SID'))
+ actual_sid = domain_validator.get_sid_trusted_domain_object(sid)
+ if isinstance(actual_sid, unicode):
+ sids.append(actual_sid)
+ else:
+ failed_sids.append((sid, 'Not a trusted domain SID'))
if len(sids) == 0:
raise errors.ValidationError(name=_('external member'),
error=_('values are not recognized as valid SIDs from trusted domain'))
diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py
index c40313a697fa6ff842129944b7ad9c1f5fc14a77..2c53fafe532414a3ad6624a3583cf3f853ccc72c 100644
--- a/ipaserver/dcerpc.py
+++ b/ipaserver/dcerpc.py
@@ -29,6 +29,7 @@ from ipalib import Command
from ipalib import errors
from ipapython import ipautil
from ipapython.ipa_log_manager import *
+from ipapython.dn import DN
from ipaserver.install import installutils
import os, string, struct, copy
@@ -46,6 +47,7 @@ try:
except ImportError:
from ldap.controls import LDAPControl as LDAPControl #pylint: disable=F0401
import ldap as _ldap
+from ipaserver.ipaldap import IPAdmin
__doc__ = _("""
Classes to manage trust joins using DCE-RPC calls
@@ -102,6 +104,8 @@ class DomainValidator(object):
ATTR_FLATNAME = 'ipantflatname'
ATTR_SID = 'ipantsecurityidentifier'
ATTR_TRUSTED_SID = 'ipanttrusteddomainsid'
+ ATTR_TRUST_PARTNER = 'ipanttrustpartner'
+ ATTR_TRUST_AUTHOUT = 'ipanttrustauthoutgoing'
def __init__(self, api):
self.api = api
@@ -111,6 +115,9 @@ class DomainValidator(object):
self.dn = None
self.sid = None
self._domains = None
+ self._info = dict()
+ self._creds = None
+ self._parm = None
def is_configured(self):
cn_trust_local = DN(('cn', self.api.env.domain), self.api.env.container_cifsdomains, self.api.env.basedn)
@@ -125,14 +132,22 @@ class DomainValidator(object):
return True
def get_trusted_domains(self):
+ """Returns dict of trusted domain tuples (flatname, sid, trust_auth_outgoing), keyed by domain name"""
cn_trust = DN(('cn', 'ad'), self.api.env.container_trusts, self.api.env.basedn)
try:
search_kw = {'objectClass': 'ipaNTTrustedDomain'}
filter = self.ldap.make_filter(search_kw, rules=self.ldap.MATCH_ALL)
(entries, truncated) = self.ldap.find_entries(filter=filter, base_dn=cn_trust,
- attrs_list=[self.ATTR_TRUSTED_SID, 'dn'])
+ attrs_list=[self.ATTR_TRUSTED_SID,
+ self.ATTR_FLATNAME,
+ self.ATTR_TRUST_PARTNER,
+ self.ATTR_TRUST_AUTHOUT])
- result = map (lambda entry: security.dom_sid(entry[1][self.ATTR_TRUSTED_SID][0]), entries)
+ result = dict()
+ for entry in entries:
+ result[entry[1][self.ATTR_TRUST_PARTNER][0]] = (entry[1][self.ATTR_FLATNAME][0].lower(),
+ security.dom_sid(entry[1][self.ATTR_TRUSTED_SID][0]),
+ entry[1][self.ATTR_TRUST_AUTHOUT][0])
return result
except errors.NotFound, e:
return []
@@ -158,13 +173,153 @@ class DomainValidator(object):
# We have non-zero list of trusted domains and have to go through them
# one by one and check their sids as prefixes
test_sid_subauths = test_sid.sub_auths
- for domsid in self._domains:
+ for domain in self._domains:
+ domsid = self._domains[domain][1]
sub_auths = domsid.sub_auths
num_auths = min(test_sid.num_auths, domsid.num_auths)
if test_sid_subauths[:num_auths] == sub_auths[:num_auths]:
return True
return False
+ def normalize_name(self, name):
+ result = dict()
+ components = name.split('@')
+ if len(components) == 2:
+ result['domain'] = unicode(components[1]).lower()
+ result['name'] = unicode(components[0]).lower()
+ else:
+ components = name.split('\\')
+ if len(components) == 2:
+ result['flatname'] = unicode(components[0]).lower()
+ result['name'] = unicode(components[1]).lower()
+ else:
+ result['name'] = unicode(name).lower()
+ return result
+
+ def get_sid_trusted_domain_object(self, object_name):
+ """Returns SID for the trusted domain object (user or group only)"""
+ if not self.domain:
+ # our domain is not configured or self.is_configured() never run
+ return None
+ if not self._domains:
+ self._domains = self.get_trusted_domains()
+ if len(self._domains) == 0:
+ # Our domain is configured but no trusted domains are configured
+ return None
+ components = self.normalize_name(object_name)
+ if not ('domain' in components or 'flatname' in components):
+ # No domain or realm specified, ambiguous search
+ return False
+
+ entry = None
+ if 'domain' in components and components['domain'] in self._domains:
+ # Now we have a name to check against our list of trusted domains
+ info = self.__retrieve_trusted_domain_dc(components['domain'])
+ if 'dc' in info:
+ entry = self.__resolve_against_gc(info, components['name'])
+ elif 'flatname' in components:
+ # Flatname was specified, traverse through the list of trusted
+ # domains first to find the proper one
+ for domain in self._domains:
+ if self._domains[domain][0] == components['flatname']:
+ info = self.__retrieve_trusted_domain_dc(domain)
+ if info and 'dc' in info:
+ entry = self.__resolve_against_gc(info, components['name'])
+ if entry:
+ l = len(entry)
+ if l > 2:
+ return False
+ sid = self.__sid_to_str(entry[0][1]['objectSid'][0])
+ try:
+ test_sid = security.dom_sid(sid)
+ return unicode(test_sid)
+ except TypeError, e:
+ return False
+ return False
+
+ def __sid_to_str(self, sid):
+ sid_rev_num = ord(sid[0])
+ number_sub_id = ord(sid[1])
+ ia = struct.unpack('!Q','\x00\x00'+sid[2:8])[0]
+ subs = [
+ struct.unpack('<I',sid[8+4*i:12+4*i])[0]
+ for i in range(number_sub_id)
+ ]
+ return u'S-%d-%d-%s' % ( sid_rev_num, ia, '-'.join([str(s) for s in subs]),)
+
+ def __extract_trusted_auth(self, info):
+ clear = None
+ auth = drsblobs.trustAuthInOutBlob()
+ auth.__ndr_unpack__(info['auth'])
+ auth_array = auth.current.array[0]
+ if auth_array.AuthType == lsa.TRUST_AUTH_TYPE_CLEAR:
+ clear = ''.join(map(chr, auth_array.AuthInfo.password)).decode('utf-16-le')
+ return clear
+
+ def __kinit_as_trusted_account(self, info, password):
+ ccache_name = "/var/run/ipa/ipa_memcached/krb5cc_TRUSTEDDOMAIN"
+ principal = '%s$@%s' % (self.flatname, info['dns_domain'].upper())
+ (stdout, stderr, returncode) = ipautil.run(['/usr/bin/kinit', principal],
+ env={'KRB5CCNAME':ccache_name},
+ stdin=password, raiseonerr=False)
+ if returncode == 0:
+ return (ccache_name, principal)
+
+ def __resolve_against_gc(self, info, name):
+ conn = IPAdmin(host=info['dc'], port=3268)
+ auth = self.__extract_trusted_auth(info)
+ if auth:
+ (ccache_name, principal) = self.__kinit_as_trusted_account(info, auth)
+ if ccache_name:
+ cb_info = dict()
+ # pass empty dict, SASL GSSAPI is able to get all from the ccache
+ sasl_auth = _ldap.sasl.sasl(cb_info,'GSSAPI')
+ old_ccache = os.environ.get('KRB5CCNAME')
+ os.environ["KRB5CCNAME"] = ccache_name
+ conn.sasl_interactive_bind_s(None, sasl_auth)
+ base = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.')))
+ # We don't use conn.getEntry() because it will attempt to fetch schema from GC and that will fail
+ filterstr = conn.encode('(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' % dict(name=name))
+ attrlist = conn.encode(['sAMAccountName', 'sAMAccountType', 'objectSid', 'groupType', 'description'])
+ entry = conn.conn.search_s(str(base), _ldap.SCOPE_SUBTREE, filterstr, attrlist, 0)
+ os.environ["KRB5CCNAME"] = old_ccache
+ return entry
+
+ def __retrieve_trusted_domain_dc(self, domain):
+ if domain in self._info:
+ return self._info[domain]
+
+ if not self._creds:
+ self._parm = param.LoadParm()
+ self._parm.load(os.path.join(ipautil.SHARE_DIR,"smb.conf.empty"))
+ self._parm.set('netbios name', self.flatname)
+ self._creds = credentials.Credentials()
+ self._creds.set_kerberos_state(credentials.MUST_USE_KERBEROS)
+ self._creds.guess(self._parm)
+ self._creds.set_workstation(self.flatname)
+
+ # FIXME: search for actual GC server instead of DC and assuming it is always GC
+ netrc = net.Net(creds=self._creds, lp=self._parm)
+ try:
+ result = netrc.finddc(domain=domain, flags=nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_DS)
+ except RuntimeError, e:
+ raise assess_dcerpc_exception(message=str(e))
+
+ if not result:
+ return None
+
+ info = dict()
+ info['name'] = unicode(result.domain_name)
+ info['dns_domain'] = unicode(result.dns_domain)
+ info['dns_forest'] = unicode(result.forest)
+ info['guid'] = unicode(result.domain_uuid)
+ info['dc'] = unicode(result.pdc_dns_name)
+ info['auth'] = self._domains[domain][2]
+
+ self._info[domain] = info
+ return info
+
+
class TrustDomainInstance(object):
def __init__(self, hostname, creds=None):
@@ -375,19 +530,10 @@ class TrustDomainInstance(object):
except RuntimeError, e:
pass
try:
- trustdom_handle = self._pipe.CreateTrustedDomainEx2(self._policy_handle, info, self.auth_info, security.SEC_STD_DELETE)
+ self._pipe.CreateTrustedDomainEx2(self._policy_handle, info, self.auth_info, security.SEC_STD_DELETE)
except RuntimeError, (num, message):
raise assess_dcerpc_exception(num=num, message=message)
- try:
- infoclass = lsa.TrustDomainInfoSupportedEncTypes()
- infoclass.enc_types = security.KERB_ENCTYPE_RC4_HMAC_MD5
- infoclass.enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
- infoclass.enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
- self._pipe.SetInformationTrustedDomain(trustdom_handle, lsa.LSA_TRUSTED_DOMAIN_SUPPORTED_ENCRYPTION_TYPES, infoclass)
- except RuntimeError, e:
- pass
-
def verify_trust(self, another_domain):
def retrieve_netlogon_info_2(domain, function_code, data):
try:
diff --git a/ipaserver/plugins/ldap2.py b/ipaserver/plugins/ldap2.py
index caf35096c981363927f8471e2567476954f664e5..55c25dadf5b8cd4d9afdadf8dfec9c156b2121b8 100644
--- a/ipaserver/plugins/ldap2.py
+++ b/ipaserver/plugins/ldap2.py
@@ -94,6 +94,7 @@ MEMBERS_INDIRECT = 2
SASL_AUTH = _ldap_sasl.sasl({}, 'GSSAPI')
DN_SYNTAX_OID = '1.3.6.1.4.1.1466.115.121.1.12'
+OBJECT_SID_OID = '1.2.840.113556.1.4.146'
def unicode_from_utf8(val):
'''
@@ -263,6 +264,7 @@ class IPASimpleLDAPObject(object):
# Note: the oid for dn syntax is: 1.3.6.1.4.1.1466.115.121.1.12
_SYNTAX_MAPPING = {
+ OBJECT_SID_OID : str, # Object SID
'1.3.6.1.4.1.1466.115.121.1.1' : str, # ACI item
'1.3.6.1.4.1.1466.115.121.1.4' : str, # Audio
'1.3.6.1.4.1.1466.115.121.1.5' : str, # Binary
@@ -313,6 +315,7 @@ class IPASimpleLDAPObject(object):
'managedtemplate': DN_SYNTAX_OID, # DN
'managedbase': DN_SYNTAX_OID, # DN
'originscope': DN_SYNTAX_OID, # DN
+ 'objectsid' : OBJECT_SID_OID,# Object SID
})
def __init__(self, uri):
--
1.7.12.1
More information about the Freeipa-devel
mailing list