[libvirt] [libvirt-test-API][PATCH 2/2] Add securitylabel test case

hongming honzhang at redhat.com
Wed Mar 11 08:03:46 UTC 2015


ACK and Pushed

Thanks
hongming

On 02/15/2015 04:12 PM, jiahu wrote:
> 2 new APIs securityLabel and securityLabelList will be covered in
> securitylabel.py
> ---
>   repos/domain/securitylabel.py | 170 ++++++++++++++++++++++++++++++++++++++++++
>   1 file changed, 170 insertions(+)
>   create mode 100644 repos/domain/securitylabel.py
>
> diff --git a/repos/domain/securitylabel.py b/repos/domain/securitylabel.py
> new file mode 100644
> index 0000000..cf4aaf3
> --- /dev/null
> +++ b/repos/domain/securitylabel.py
> @@ -0,0 +1,170 @@
> +#!/usr/bin/env python
> +# test securityLabel() and securityLabelList() API for libvirt
> +
> +import libvirt
> +
> +from libvirt import libvirtError
> +from src import sharedmod
> +from utils import utils
> +
> +required_params = ('guestname',)
> +optional_params = {}
> +
> +def check_qemu_conf(logger):
> +    """
> +       If security_driver is not equal to "selinux", report an error
> +    """
> +    GREP = "grep \"^security_driver\" /etc/libvirt/qemu.conf"
> +    status, output = utils.exec_cmd(GREP, shell=True)
> +    if status:
> +        return True
> +    else:
> +        if "selinux" in output[0]:
> +            return True
> +        else:
> +            logger.error("Not a default setting in qemu.conf")
> +            return False
> +
> +def get_security_policy(logger):
> +    """
> +       get selinux type from host OS
> +    """
> +    SELINUX = "getenforce"
> +    status, output = utils.exec_cmd(SELINUX, shell=True)
> +    if not status:
> +        if output[0] == "Enforcing":
> +            sevalue = True
> +        elif output[0] == "Permissive":
> +            sevalue = False
> +        elif output[0] == "Disabled":
> +            sevalue = False
> +        else:
> +            logger.error("Can not find any results")
> +    else:
> +        logger.error("\"" + SELINUX + "\"" + "error")
> +        logger.error(output)
> +        return False
> +    return sevalue
> +
> +def get_pid(name,logger):
> +    """
> +       get process id of specified domain.
> +    """
> +    PID = "ps aux |grep -v grep | grep \" -name %s\" \
> +           |awk '{print $2}'"
> +    status, output = utils.exec_cmd(PID % name, shell=True)
> +    if not status:
> +        pass
> +    else:
> +        logger.error("\"" + PID + "\"" + "error")
> +        logger.error(output)
> +        return False
> +    return output[0]
> +
> +def get_pid_context(domain,logger):
> +    """
> +       return context of domain's pid
> +    """
> +    pid = get_pid(domain,logger)
> +    CONTEXT = "ls -nZd /proc/%s"
> +    status, output = utils.exec_cmd(CONTEXT % pid, shell=True)
> +    if not status:
> +        pass
> +    else:
> +        logger.error("\"" + CONTEXT + "\"" + "error")
> +        logger.error(output)
> +        return False
> +    return pid,output[0]
> +
> +def check_selinux_label(api,domain,logger):
> +    """
> +       check vaules in selinux mode
> +    """
> +    pid,context = get_pid_context(domain,logger)
> +    logger.debug("The context of %d is %s" % (int(pid), context))
> +    get_enforce = get_security_policy(logger)
> +    if api[0] in context:
> +        if api[1] == get_enforce:
> +            logger.debug("PASS: '%s'" % api)
> +            return True
> +        else:
> +            logger.debug("Fail: '%s'" % api[1])
> +            return False
> +    else:
> +         logger.debug("Fail: '%s'" % api[0])
> +         return False
> +
> +def check_DAC_label(api,domain,logger):
> +    """
> +       check vaules in DAC mode
> +    """
> +    tmp = []
> +    pid,context = get_pid_context(domain,logger)
> +    logger.debug("The context of %d is %s" % (int(pid), context))
> +    #enforcing is always false in DAC mode
> +    for item in api:
> +         tmp.append(item)
> +    get_enforce = False
> +    tmp1 = tmp[0].strip().replace("+","")
> +    tmp[0] = tmp1.split(':')
> +    tmp1 = context.split()
> +    context = str(tmp1.pop(1) +" "+ tmp1.pop(1)).split()
> +    if tmp[0] == context:
> +        if tmp[1] == get_enforce:
> +            logger.debug("PASS: '%s'" % api)
> +            return True
> +        else:
> +            logger.debug("Fail: '%s'" % api[1])
> +            return False
> +    else:
> +         logger.debug("Fail: '%s'" % api[0])
> +         return False
> +
> +def securitylabel(params):
> +    """
> +       test APIs for securityLabel and securityLabelList in class virDomain
> +    """
> +    logger = params['logger']
> +    domain_name = params['guestname']
> +    if not check_qemu_conf(logger):
> +       return 1
> +    try:
> +        conn = sharedmod.libvirtobj['conn']
> +
> +        if conn.lookupByName(domain_name):
> +           dom = conn.lookupByName(domain_name)
> +        else:
> +           logger.error("Domain %s is not exist" % domain_name)
> +           return 1
> +        if not dom.isActive():
> +           logger.error("Domain %s is not running" % domain_name)
> +           return 1
> +
> +        first_label_api = dom.securityLabel()
> +        logger.info("The first lable is %s" % first_label_api)
> +
> +        if check_selinux_label(first_label_api, domain_name, logger):
> +            logger.info("PASS, %s" % first_label_api)
> +        else:
> +            logger.error("FAIL, %s" % first_label_api)
> +            return 1
> +
> +        all_label_api = dom.securityLabelList()
> +        logger.info("The all lable is %s" % all_label_api)
> +        if check_selinux_label(all_label_api[0], domain_name, logger):
> +            logger.info("PASS, %s" % all_label_api[0])
> +        else:
> +            logger.error("FAIL, %s" % all_label_api[0])
> +            return 1
> +
> +        if check_DAC_label(all_label_api[1], domain_name, logger):
> +            logger.info("PASS, %s" % all_label_api[1])
> +        else:
> +            logger.error("FAIL, %s" % all_label_api[1])
> +            return 1
> +
> +    except libvirtError, e:
> +        logger.error("API error message: %s" % e.message)
> +        return 1
> +
> +    return 0




More information about the libvir-list mailing list