[libvirt] [test-API][PATCH] Add testcases for testing permission control and sasl authentication of unix socket

Nan Zhang nzhang at redhat.com
Tue Aug 9 08:27:39 UTC 2011


On 08/06/2011 12:09 AM, Guannan Ren wrote:
>      add new testcases repos/remoteAccess/unix_perm_sasl.py
> ---
>   repos/remoteAccess/unix_perm_sasl.py |  234 ++++++++++++++++++++++++++++++++++
>   1 files changed, 234 insertions(+), 0 deletions(-)
>   create mode 100644 repos/remoteAccess/unix_perm_sasl.py
>
> diff --git a/repos/remoteAccess/unix_perm_sasl.py b/repos/remoteAccess/unix_perm_sasl.py
> new file mode 100644
> index 0000000..9bb2600
> --- /dev/null
> +++ b/repos/remoteAccess/unix_perm_sasl.py
> @@ -0,0 +1,234 @@
> +#!/usr/bin/env python
> +""" testing for permission and authentication of unix domain socket
> +    remoteAccess:unix_perm_sasl
> +        auth_unix_ro
> +            none|sasl
> +        auth_unix_rw
> +            none|sasl
> +        unix_sock_group(optional)
> +            libvirt
> +"""
> +
> +__author__ = 'Guannan Ren: gren at redhat.com'
> +__date__ = 'Fri Aug 5, 2011'
> +__version__ = '0.1.0'
> +__credits__ = 'Copyright (C) 2011 Red Hat, Inc.'
> +__all__ = ['unix_perm_sasl', 'group_sasl_set',
> +           'libvirt_configure', 'hypervisor_connecting_test']
> +
> +import os
> +import re
> +import sys
> +import commands
> +
> +from pwd import getpwnam
> +
> +def append_path(path):
> +    """Append root path of package"""
> +    if path in sys.path:
> +        pass
> +    else:
> +        sys.path.append(path)
> +
> +pwd = os.getcwd()
> +result = re.search('(.*)libvirt-test-API', pwd)
> +append_path(result.group(0))
> +
> +from lib import connectAPI
> +from exception import LibvirtAPI
> +
> +TESTING_USER = 'testapi'
> +LIBVIRTD_CONF = "/etc/libvirt/libvirtd.conf"
> +SASLPASSWD2 = "/usr/sbin/saslpasswd2"
> +
> +def check_params(params):
> +    """check out the arguments requried for the testcase"""
> +    logger = params['logger']
> +    keys = ['auth_unix_ro', 'auth_unix_rw']
> +    for key in keys:
> +        if key not in params:
> +            logger.error("Argument %s is required" % key)
> +            return 1
> +    return 0
> +
> +def get_output(command, flag, logger):
> +    """execute shell command
> +    """
> +    status, ret = commands.getstatusoutput(command)
> +    if not flag and status:
> +        logger.error("executing "+ "\"" +  command  + "\"" + " failed")
> +        logger.error(ret)
> +    return status, ret
It would be better to move get_output() into the utils library as a public 
function, since it is called often in our test repos.

- Nan
> +
> +def libvirt_configure(unix_sock_group, auth_unix_ro, auth_unix_rw, logger):
> +    """configure libvirt.conf """
> +    logger.info("configuring libvirt.conf")
> +
> +    # uncomment unix_sock_group
> +    unix_group_add = "echo 'unix_sock_group = \"%s\"'>>  %s" % \
> +                       (unix_sock_group, LIBVIRTD_CONF)
> +    status, output = get_output(unix_group_add, 0, logger)
> +    if status:
> +        logger.error("setting unix_sock_group to %s failed" % unix_sock_group)
> +        return 1
> +
> +    auth_unix_ro_add = "echo 'auth_unix_ro = \"%s\"'>>  %s" % \
> +                             (auth_unix_ro, LIBVIRTD_CONF)
> +    status, output = get_output(auth_unix_ro_add, 0, logger)
> +    if status:
> +        logger.error("setting auth_unix_ro to %s failed" % auth_unix_ro)
> +        return 1
> +
> +    auth_unix_rw_add = "echo 'auth_unix_rw = \"%s\"'>>  %s" % \
> +                         (auth_unix_rw, LIBVIRTD_CONF)
> +    status, output = get_output(auth_unix_rw_add, 0, logger)
> +    if status:
> +        logger.error("setting auth_unix_rw to %s failed" % auth_unix_rw)
> +        return 1
> +
> +    return 0
> +
> +def group_sasl_set(unix_sock_group, auth_unix_ro, auth_unix_rw, logger):
> +    """add libvirt group and set sasl authentication if needed"""
> +    logger.info("add unix socket group and sasl authentication if we need")
> +
> +    # add unix socket group
> +    libvirt_group_add = "groupadd %s" % unix_sock_group
> +    status, output = get_output(libvirt_group_add, 0, logger)
> +    if status:
> +        logger.error("failed to add %s group" % unix_sock_group)
> +        return 1
> +
> +    # add "testapi" as the testing user
> +    libvirt_user_add = "useradd -g %s %s" % (unix_sock_group, TESTING_USER)
> +    status, output = get_output(libvirt_user_add, 0, logger)
> +    if status:
> +        logger.error("failed to add %s user into group %s" % \
> +                       (TESTING_USER, unix_sock_group))
> +        return 1
> +
> +    # add sasl user
> +    if auth_unix_ro == 'sasl' or auth_unix_rw == 'sasl':
> +        saslpasswd2_add = "echo %s | %s -a libvirt %s" % \
> +                            (TESTING_USER, SASLPASSWD2, TESTING_USER)
> +        status, output = get_output(saslpasswd2_add, 0, logger)
> +        if status:
> +            logger.error("failed to set sasl user %s" % TESTING_USER)
> +            return 1
> +
> +    return 0
> +
> +def request_credentials(credentials, user_data):
> +    for credential in credentials:
> +        if credential[0] == connectAPI.VIR_CRED_AUTHNAME:
> +            credential[4] = user_data[0]
> +
> +            if len(credential[4]) == 0:
> +                credential[4] = credential[3]
> +        elif credential[0] == connectAPI.VIR_CRED_PASSPHRASE:
> +            credential[4] = user_data[1]
> +        else:
> +            return -1
> +
> +    return 0
> +
> +def hypervisor_connecting_test(uri, auth_unix_ro, auth_unix_rw, logger):
> +    """connect to hypervisor"""
> +    logger.info("connect to hypervisor")
> +    orginal_user = os.geteuid()
> +    testing_user_id = getpwnam(TESTING_USER)[2]
> +    logger.info("the testing_user id is %d" % testing_user_id)
> +
> +    logger.info("set euid to %d" % testing_user_id)
> +    os.seteuid(testing_user_id)
> +
> +    try:
> +        conn = connectAPI.ConnectAPI()
> +        if auth_unix_ro == 'none':
> +            virconn = conn.open_read_only(uri)
> +        elif auth_unix_ro == 'sasl':
> +            user_data = [TESTING_USER, TESTING_USER]
> +            auth = [[connectAPI.VIR_CRED_AUTHNAME, \
> +                     connectAPI.VIR_CRED_PASSPHRASE],
> +                    request_credentials, user_data]
> +            virconn = conn.openAuth(uri, auth, 0)
> +
> +        if auth_unix_rw == 'none':
> +            virconn = conn.open(uri)
> +        elif auth_unix_rw == 'sasl':
> +            user_data = [TESTING_USER, TESTING_USER]
> +            auth = [[connectAPI.VIR_CRED_AUTHNAME, \
> +                     connectAPI.VIR_CRED_PASSPHRASE],
> +                    request_credentials, user_data]
> +            virconn = conn.openAuth(uri, auth, 0)
> +        conn.close()
> +    except LibvirtAPI, e:
> +        logger.error("API error message: %s, error code is %s" % \
> +                     (e.response()['message'], e.response()['code']))
> +        logger.info("set euid back to %d" % orginal_user)
> +        os.seteuid(orginal_user)
> +        conn.close()
> +        return 1
> +
> +    logger.info("set euid back to %d" % orginal_user)
> +    os.seteuid(orginal_user)
> +    return 0
> +
> +def unix_perm_sasl(params):
> +    """ test unix socket group function and sasl authentication"""
> +    logger = params['logger']
> +    params_check_result = check_params(params)
> +    if params_check_result:
> +        return 1
> +
> +    auth_unix_ro = params['auth_unix_ro']
> +    auth_unix_rw = params['auth_unix_rw']
> +
> +    unix_sock_group = 'libvirt'
> +    if params.has_key('unix_sock_group'):
> +        unix_sock_group = params['unix_sock_group']
> +
> +    uri = "qemu:///system"
> +
> +
> +    if group_sasl_set(unix_sock_group, auth_unix_ro, auth_unix_rw, logger):
> +        return 1
> +
> +    if libvirt_configure(unix_sock_group, auth_unix_ro, auth_unix_rw, logger):
> +        return 1
> +
> +    if hypervisor_connecting_test(uri, auth_unix_ro, auth_unix_rw, logger):
> +        return 1
> +
> +    return 0
> +
> +def unix_perm_sasl_clean(params):
> +    """clean testing environment"""
> +    logger = params['logger']
> +
> +    auth_unix_ro = params['auth_unix_ro']
> +    auth_unix_rw = params['auth_unix_rw']
> +
> +    unix_sock_group = 'libvirt'
> +    if params.has_key('unix_sock_group'):
> +        unix_sock_group = params['unix_sock_group']
> +
> +    # delete "testapi" user
> +    libvirt_user_del = "userdel %s" % TESTING_USER
> +    status, output = get_output(libvirt_user_del, 0, logger)
> +    if status:
> +        logger.error("failed to del %s user into group %s" % TESTING_USER)
> +
> +    # delete unix socket group
> +    libvirt_group_del = "groupdel %s" % unix_sock_group
> +    status, output = get_output(libvirt_group_del, 0, logger)
> +    if status:
> +        logger.error("failed to del %s group" % unix_sock_group)
> +
> +    # delete sasl user
> +    if auth_unix_ro == 'sasl' or auth_unix_rw == 'sasl':
> +        saslpasswd2_delete = "%s -a libvirt -d %s" % (SASLPASSWD2, TESTING_USER)
> +        status, output = get_output(saslpasswd2_delete, 0, logger)
> +        if status:
> +            logger.error("failed to delete sasl user %s" % TESTING_USER)
> +
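Review bot: `unix_perm_sasl_clean` never removes the `unix_sock_group`, `auth_unix_ro` and `auth_unix_rw` lines that `libvirt_configure` appends to `/etc/libvirt/libvirtd.conf`, so a run with `auth_unix_rw = "none"` leaves that setting in the file after the test and repeated runs stack duplicate keys; the test should save a copy of the file before editing and restore it in the clean step. In the same function the `userdel` error path formats `"failed to del %s user into group %s" % TESTING_USER` with two placeholders and one argument, which raises TypeError and skips the group and SASL cleanup; `logger.error("failed to del %s user" % TESTING_USER)` fixes it. In `hypervisor_connecting_test` the effective uid is restored only on the `LibvirtAPI` path and on success, so any other exception raised inside the `try` leaves the process running as `testapi`; moving `os.seteuid(orginal_user)` into a `finally:` clause covers every exit. `unix_sock_group` is taken from `params` and interpolated into shell strings run through `commands.getstatusoutput`, so it should be checked against a group-name pattern or passed through `pipes.quote` before it reaches `groupadd`, `useradd`, `groupdel` and the `echo ... >>` lines.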



