[libvirt] [libvirt-test-API][PATCH 2/2] Add securitylabel test case
hongming
honzhang at redhat.com
Wed Mar 11 08:03:46 UTC 2015
ACK and Pushed
Thanks
hongming
On 02/15/2015 04:12 PM, jiahu wrote:
> 2 new APIs securityLabel and securityLabelList will be covered in
> securitylabel.py
> ---
> repos/domain/securitylabel.py | 170 ++++++++++++++++++++++++++++++++++++++++++
> 1 file changed, 170 insertions(+)
> create mode 100644 repos/domain/securitylabel.py
>
> diff --git a/repos/domain/securitylabel.py b/repos/domain/securitylabel.py
> new file mode 100644
> index 0000000..cf4aaf3
> --- /dev/null
> +++ b/repos/domain/securitylabel.py
> @@ -0,0 +1,170 @@
> +#!/usr/bin/env python
> +# test securityLabel() and securityLabelList() API for libvirt
> +
> +import libvirt
> +
> +from libvirt import libvirtError
> +from src import sharedmod
> +from utils import utils
> +
> +required_params = ('guestname',)
> +optional_params = {}
> +
> +def check_qemu_conf(logger):
> + """
> + If security_driver is not equal to "selinux", report an error
> + """
> + GREP = "grep \"^security_driver\" /etc/libvirt/qemu.conf"
> + status, output = utils.exec_cmd(GREP, shell=True)
> + if status:
> + return True
> + else:
> + if "selinux" in output[0]:
> + return True
> + else:
> + logger.error("Not a default setting in qemu.conf")
> + return False
> +
> +def get_security_policy(logger):
> + """
> + get selinux type from host OS
> + """
> + SELINUX = "getenforce"
> + status, output = utils.exec_cmd(SELINUX, shell=True)
> + if not status:
> + if output[0] == "Enforcing":
> + sevalue = True
> + elif output[0] == "Permissive":
> + sevalue = False
> + elif output[0] == "Disabled":
> + sevalue = False
> + else:
> + logger.error("Can not find any results")
> + else:
> + logger.error("\"" + SELINUX + "\"" + "error")
> + logger.error(output)
> + return False
> + return sevalue
> +
> +def get_pid(name,logger):
> + """
> + get process id of specified domain.
> + """
> + PID = "ps aux |grep -v grep | grep \" -name %s\" \
> + |awk '{print $2}'"
> + status, output = utils.exec_cmd(PID % name, shell=True)
> + if not status:
> + pass
> + else:
> + logger.error("\"" + PID + "\"" + "error")
> + logger.error(output)
> + return False
> + return output[0]
> +
> +def get_pid_context(domain,logger):
> + """
> + return context of domain's pid
> + """
> + pid = get_pid(domain,logger)
> + CONTEXT = "ls -nZd /proc/%s"
> + status, output = utils.exec_cmd(CONTEXT % pid, shell=True)
> + if not status:
> + pass
> + else:
> + logger.error("\"" + CONTEXT + "\"" + "error")
> + logger.error(output)
> + return False
> + return pid,output[0]
> +
> +def check_selinux_label(api,domain,logger):
> + """
> + check vaules in selinux mode
> + """
> + pid,context = get_pid_context(domain,logger)
> + logger.debug("The context of %d is %s" % (int(pid), context))
> + get_enforce = get_security_policy(logger)
> + if api[0] in context:
> + if api[1] == get_enforce:
> + logger.debug("PASS: '%s'" % api)
> + return True
> + else:
> + logger.debug("Fail: '%s'" % api[1])
> + return False
> + else:
> + logger.debug("Fail: '%s'" % api[0])
> + return False
> +
> +def check_DAC_label(api,domain,logger):
> + """
> + check vaules in DAC mode
> + """
> + tmp = []
> + pid,context = get_pid_context(domain,logger)
> + logger.debug("The context of %d is %s" % (int(pid), context))
> + #enforcing is always false in DAC mode
> + for item in api:
> + tmp.append(item)
> + get_enforce = False
> + tmp1 = tmp[0].strip().replace("+","")
> + tmp[0] = tmp1.split(':')
> + tmp1 = context.split()
> + context = str(tmp1.pop(1) +" "+ tmp1.pop(1)).split()
> + if tmp[0] == context:
> + if tmp[1] == get_enforce:
> + logger.debug("PASS: '%s'" % api)
> + return True
> + else:
> + logger.debug("Fail: '%s'" % api[1])
> + return False
> + else:
> + logger.debug("Fail: '%s'" % api[0])
> + return False
> +
> +def securitylabel(params):
> + """
> + test APIs for securityLabel and securityLabelList in class virDomain
> + """
> + logger = params['logger']
> + domain_name = params['guestname']
> + if not check_qemu_conf(logger):
> + return 1
> + try:
> + conn = sharedmod.libvirtobj['conn']
> +
> + if conn.lookupByName(domain_name):
> + dom = conn.lookupByName(domain_name)
> + else:
> + logger.error("Domain %s is not exist" % domain_name)
> + return 1
> + if not dom.isActive():
> + logger.error("Domain %s is not running" % domain_name)
> + return 1
> +
> + first_label_api = dom.securityLabel()
> + logger.info("The first lable is %s" % first_label_api)
> +
> + if check_selinux_label(first_label_api, domain_name, logger):
> + logger.info("PASS, %s" % first_label_api)
> + else:
> + logger.error("FAIL, %s" % first_label_api)
> + return 1
> +
> + all_label_api = dom.securityLabelList()
> + logger.info("The all lable is %s" % all_label_api)
> + if check_selinux_label(all_label_api[0], domain_name, logger):
> + logger.info("PASS, %s" % all_label_api[0])
> + else:
> + logger.error("FAIL, %s" % all_label_api[0])
> + return 1
> +
> + if check_DAC_label(all_label_api[1], domain_name, logger):
> + logger.info("PASS, %s" % all_label_api[1])
> + else:
> + logger.error("FAIL, %s" % all_label_api[1])
> + return 1
> +
> + except libvirtError, e:
> + logger.error("API error message: %s" % e.message)
> + return 1
> +
> + return 0
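Review bot: The helpers in this new file (`get_security_policy`, `get_pid`, `get_pid_context`) signal failure by returning `False`, which their callers cannot tell apart from a valid result, so the label check can pass or crash for reasons unrelated to the domain's label. In `get_security_policy` a failed `getenforce` returns `False`, the same value used for Permissive/Disabled, and the unmatched-output branch logs an error but never assigns `sevalue` before `return sevalue`. `get_pid_context` returns a bare `False`, while `check_selinux_label` and `check_DAC_label` unpack it as `pid,context = get_pid_context(domain,logger)`, which raises a TypeError that the `except libvirtError` handler in `securitylabel` does not catch. Returning `None` on error and testing for it in the callers (`if get_enforce is None: return False`) would keep a host where the probe failed from being read as "not enforcing". Separately, `get_pid` interpolates the guest name unquoted into a `shell=True` pipeline and matches it with `grep " -name %s"`, so a name that is a prefix of another guest's name can select the wrong process; quoting the name and requiring a delimiter after it in the pattern would tie the context comparison to the intended domain.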