
How to write an Ansible plugin to create inventory files

Take advantage of Ansible's ecosystem to write inventory files for your playbooks.

In my previous articles in this series, I wrote about dynamic Ansible inventories and how to write a very flexible Python script that uses Nmap results to create them.

But there are a few reasons why you might want to use an Ansible plugin rather than a Python script to create inventory files:

  1. You want to standardize the language used to write provisioning tools. It is great if your team knows how to write Perl, Ruby, Python, Go, Rust, and others, but can you ensure that all your members are proficient in all of these languages? It pays off to stick to a few tools and master them.
  2. This may be a "don't repeat yourself" (DRY) situation. Ansible plugins give you many things for free, like services for caching, encryption, and configuration management.
  3. An Ansible inventory plugin is expected to live in specific locations. This makes it predictable and easier to distribute to other servers or share with other teams.

Here, I'll cover the third approach to tackling dynamic inventories: writing an Ansible plugin while still using Nmap as the discovery tool. I'll continue to encourage good practices for packaging tools, using virtual environments, and unit testing the code.

Write an Ansible module

The idea is to take advantage of the Ansible ecosystem for common tasks like execution and caching, as explained in Ansible's documentation.

I will use the parser and Nmap wrapper I wrote in the previous article, so the module file will also have those classes embedded.

Add Ansible as a dependency in requirements.txt to make development easier. This provides things like auto-completion:

setuptools>=60.5.0
build>=0.7.0
packaging==21.3
wheel==0.37.1
pip-audit==2.0.0
ansible==5.4.0

Next, install the dependencies (Ansible is a heavy package, so you should go and grab a coffee):

# Alternatively, install Ansible on its own:
# pip install ansible==5.4.0
pip install -r requirements.txt

Create the module

To keep the dependencies simple for this tutorial, I included the OutputParser and NmapRunner classes together in the module nmap_plugin, where the new plugin class is InventoryModule. It looks like this:

"""
A simple inventory plugin that uses Nmap to get the list of hosts
Jose Vicente Nunez (kodegeek.com@protonmail.com)
"""

import os
import shlex
import shutil
import subprocess
from subprocess import CalledProcessError
from typing import List, Dict, Any
from xml.etree import ElementTree
# The imports below are the ones required for an Ansible plugin
from ansible.errors import AnsibleParserError
from ansible.plugins.inventory import BaseInventoryPlugin, Cacheable, Constructable

DOCUMENTATION = r'''
    name: nmap_plugin
    plugin_type: inventory
    short_description: Returns a dynamic host inventory from Nmap scan
    description: Returns a dynamic host inventory from Nmap scan, filter machines that can be accessed with SSH
    options:
      plugin:
          description: Name of the plugin
          required: true
          choices: ['nmap_plugin']
      address:
        description: Address to scan, in Nmap supported format
        required: true
'''


class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):

    NAME = 'nmap_plugin'

    def __init__(self):
        super(InventoryModule, self).__init__()
        self.address = None
        self.plugin = None

    def verify_file(self, path: str):
        if super(InventoryModule, self).verify_file(path):
            # Only accept files with a YAML extension
            return path.endswith(('.yaml', '.yml'))
        return False

    def parse(self, inventory: Any, loader: Any, path: Any, cache: bool = True) -> Any:
        super(InventoryModule, self).parse(inventory, loader, path, cache)
        self._read_config_data(path)  # This also loads the cache
        try:
            self.plugin = self.get_option('plugin')
            self.address = self.get_option('address')
            hosts_data = list(NmapRunner(self.address))
            if not hosts_data:
                raise AnsibleParserError("Unable to get data for Nmap scan!")
            for host_data in hosts_data:
                for name, address in host_data.items():
                    self.inventory.add_host(name)
                    self.inventory.set_variable(name, 'ip', address)
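        except KeyError as kerr:
            raise AnsibleParserError(f'Missing required option on the configuration file: {path} ({kerr})') from kerr
        except CalledProcessError as cpe:
            raise AnsibleParserError(f"There was an error while calling Nmap: {cpe}") from cpe
        except ValueError as verr:
            # NmapRunner raises ValueError when the nmap binary cannot be found
            raise AnsibleParserError(f"Unable to run Nmap: {verr}") from verr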


class OutputParser:
    def __init__(self, xml: str):
        self.xml = xml

    def get_addresses(self) -> List[Dict[str, str]]:
        """
        Several things need to happen for an address to be included:
        1. Host is up
        2. Port is TCP 22
        3. Port status is open
        4. Uses IPv4
        """
        addresses = []
        root = ElementTree.fromstring(self.xml)
        for host in root.findall('host'):
            name = None
            for hostnames in host.findall('hostnames'):
                for hostname in hostnames:
                    name = hostname.attrib['name']
                    break
            if not name:
                continue
            is_up = True
            for status in host.findall('status'):
                if status.attrib['state'] == 'down':
                    is_up = False
                    break
            if not is_up:
                continue
            port_22_open = False
            for ports in host.findall('ports'):
                for port in ports.findall('port'):
                    if port.attrib['portid'] == '22':
                        for state in port.findall('state'):
                            if state.attrib['state'] == "open":  # Up not the same as open, we want SSH access!
                                port_22_open = True
                                break
            if not port_22_open:
                continue
            address = None
            for address_data in host.findall('address'):
                address = address_data.attrib['addr']
                break
            addresses.append({name: address})
        return addresses


class NmapRunner:

    def __init__(self, hosts: str):
        self.nmap_report_file = None
        found_nmap = shutil.which('nmap', mode=os.F_OK | os.X_OK)
        if not found_nmap:
            raise ValueError("Nmap binary is missing!")
        self.nmap = found_nmap
        self.hosts = hosts

    def __iter__(self):
        command = [self.nmap]
        command.extend(__NMAP__FLAGS__)
        command.append(self.hosts)
        completed = subprocess.run(
            command,
            capture_output=True,
            shell=False,
            check=True  # Raises CalledProcessError on a non-zero exit code
        )
        out_par = OutputParser(completed.stdout.decode('utf-8'))
        self.addresses = out_par.get_addresses()
        return self

    def __next__(self):
        try:
            return self.addresses.pop()
        except IndexError:
            raise StopIteration


"""
Convert the args for proper usage on the Nmap CLI
Also, do not use the -n flag. We need to resolve IP addresses to hostname, even if we sacrifice a little bit of speed
"""
NMAP_DEFAULT_FLAGS = {
    '-p22': 'Port 22 scanning',
    '-T4': 'Aggressive timing template',
    '-PE': 'Enable this echo request behavior. Good for internal networks',
    '--disable-arp-ping': 'No ARP or ND Ping',
    '--max-hostgroup 50': 'Hostgroup (batch of hosts scanned concurrently) size',
    '--min-parallelism 50': 'Number of probes that may be outstanding for a host group',
    '--osscan-limit': 'Limit OS detection to promising targets',
    '--max-os-tries 1': 'Maximum number of OS detection tries against a target',
    '-oX -': 'Send XML output to STDOUT, avoid creating a temp file'
}
__NMAP__FLAGS__ = shlex.split(" ".join(NMAP_DEFAULT_FLAGS.keys()))
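
The last line of the listing is easy to skim past, so here is a minimal, self-contained sketch of what it does: it turns the dictionary keys into an argument list that subprocess.run can take. The shortened flag dictionary, the build_command helper, and the /usr/bin/nmap path are illustrative assumptions and are not part of the plugin.

```python
import shlex
from typing import Dict, List

# Shortened, illustrative copy of the flag dictionary. The keys are the CLI
# flags; the values are only human-readable notes.
SAMPLE_FLAGS: Dict[str, str] = {
    '-p22': 'Port 22 scanning',
    '--max-hostgroup 50': 'Hostgroup size',
    '-oX -': 'Send XML output to STDOUT',
}


def build_command(nmap_path: str, flags: Dict[str, str], target: str) -> List[str]:
    """Hypothetical helper: join the flag keys, then split them shell-style."""
    command = [nmap_path]
    # A key such as "--max-hostgroup 50" has to become two separate arguments
    command.extend(shlex.split(" ".join(flags.keys())))
    command.append(target)
    return command


if __name__ == "__main__":
    # The path to nmap is an assumption; the plugin looks it up with shutil.which
    print(build_command('/usr/bin/nmap', SAMPLE_FLAGS, '192.168.1.0/24'))
```

Keeping the flags in a dictionary documents each one next to its purpose, at the cost of this extra join-and-split step.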

Things to notice about InventoryModule:

  • If some of these classes look familiar, it's because I reused the Nmap wrapper and XML parsing I wrote for the dynamic inventory script in the last article.
  • The method verify_file doesn't need to be implemented, but it is a good idea. It decides if a configuration file is good enough to use.
  • The plugin class requires the parse method to be implemented. This is where Nmap is called, XML output is parsed, and the inventory is populated.
  • It uses multiple inheritance, and because of that, you get a few things for free, like configuration parsing and caching.
  • All the exceptions coming from this module must be wrapped in an AnsibleParserError.
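
To see the filtering rules in isolation, without Ansible or a real scan, the following sketch applies them to a tiny hand-written document. The sample XML is a simplified assumption that only loosely imitates Nmap's output, and the ssh_hosts function is a hypothetical, XPath-based variant of the parser rather than the plugin's own code.

```python
from typing import Dict
from xml.etree import ElementTree

# Hand-written, simplified sample (an assumption, not real scan output):
# one host with SSH open and one with port 22 closed.
SAMPLE_XML = """<nmaprun>
  <host>
    <status state="up"/>
    <address addr="192.168.1.11" addrtype="ipv4"/>
    <hostnames><hostname name="raspberrypi" type="PTR"/></hostnames>
    <ports><port protocol="tcp" portid="22"><state state="open"/></port></ports>
  </host>
  <host>
    <status state="up"/>
    <address addr="192.168.1.50" addrtype="ipv4"/>
    <hostnames><hostname name="printer" type="PTR"/></hostnames>
    <ports><port protocol="tcp" portid="22"><state state="closed"/></port></ports>
  </host>
</nmaprun>"""


def ssh_hosts(xml: str) -> Dict[str, str]:
    """Hypothetical helper: map hostname to address for hosts with SSH open."""
    found: Dict[str, str] = {}
    for host in ElementTree.fromstring(xml).findall('host'):
        hostname = host.find('hostnames/hostname')
        status = host.find('status')
        ssh_state = host.find("ports/port[@portid='22']/state")
        address = host.find('address')
        # Compare against None: an Element without children is falsy
        if hostname is None or address is None:
            continue  # No resolvable name or no address, skip
        if status is not None and status.get('state') == 'down':
            continue  # Host is not up
        if ssh_state is None or ssh_state.get('state') != 'open':
            continue  # Up is not the same as open, we want SSH access
        found[hostname.get('name')] = address.get('addr')
    return found


if __name__ == "__main__":
    print(ssh_hosts(SAMPLE_XML))
```

A small fixture like this is also a convenient starting point for unit tests, because it exercises the parser without touching the network.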

The configuration file is in place from the previous exercise.

Deploy the module

Next, deploy the module where Ansible can find it:

$ ansible-config dump|grep DEFAULT_INVENTORY_PLUGIN_PATH
DEFAULT_INVENTORY_PLUGIN_PATH(default) = ['/home/josevnz/.ansible/plugins/inventory', '/usr/share/ansible/plugins/inventory']
$ /bin/mkdir --parents --verbose /home/josevnz/.ansible/plugins/inventory/
$ /bin/cp -p -v Inventories/inventories/nmap_plugin.py /home/josevnz/.ansible/plugins/inventory/

Finally, define an inventory file that uses the new plugin. I named it nmap_plugin_inventory.yaml (test/nmap_plugin_inventory.yaml):

# Sample configuration file for custom nmap_plugin. Yes, it is the same file we used for the dynamic inventory script
---
plugin: nmap_plugin
address: 192.168.1.0/24

Test it

It's time to test the new module:

# Does Ansible recognize it?
$ ansible-doc -t inventory -l|grep nmap_plugin
nmap_plugin         Returns a dynamic host inventory from Nmap scan
# Smoke test, check if we get any host listed
(ExtendingAnsibleWithPythonInventory) [josevnz@dmaf5 Inventories]$ ansible-inventory --inventory $PWD/test/nmap_plugin_inventory.yaml  --list -v -v -v
[josevnz@dmaf5 ExtendingAnsibleWithPython]$ ansible-inventory --inventory Inventories/test/nmap_plugin_inventory.yaml --list
{
    "_meta": {
        "hostvars": {
            "dmaf5.home": {
                "ip": "192.168.1.25"
            },
            "macmini2": {
                "ip": "192.168.1.16"
            },
            "raspberrypi": {
                "ip": "192.168.1.11"
            }
        }
    },
    "all": {
        "children": [
            "ungrouped"
        ]
    },
    "ungrouped": {
        "hosts": [
            "dmaf5.home",
            "macmini2",
            "raspberrypi"
        ]
    }
}
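
If you would rather check this output from a script than by eye, a minimal sketch along these lines could work. The embedded JSON is a shortened copy of the output above, and the hosts_with_ip helper is a hypothetical name introduced only for illustration.

```python
import json
from typing import Dict

# Shortened copy of the ansible-inventory output shown above
SAMPLE_OUTPUT = """
{
    "_meta": {
        "hostvars": {
            "dmaf5.home": {"ip": "192.168.1.25"},
            "raspberrypi": {"ip": "192.168.1.11"}
        }
    },
    "all": {"children": ["ungrouped"]},
    "ungrouped": {"hosts": ["dmaf5.home", "raspberrypi"]}
}
"""


def hosts_with_ip(inventory_json: str) -> Dict[str, str]:
    """Hypothetical helper: map each host to the 'ip' variable set by the plugin."""
    inventory = json.loads(inventory_json)
    hostvars = inventory.get('_meta', {}).get('hostvars', {})
    # Hosts without an 'ip' variable get an empty string instead of a KeyError
    return {name: variables.get('ip', '') for name, variables in hostvars.items()}


if __name__ == "__main__":
    for name, ip in sorted(hosts_with_ip(SAMPLE_OUTPUT).items()):
        print(f"{name}: {ip}")
```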

The results are the same as you got with the dynamic inventory script. But if you enable other functionality, such as caching results (not covered here), you will see benefits like faster inventory generation, which matters a lot when you have a large number of hosts.
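
Although caching is out of scope here, the decision a cache-aware plugin makes is simple enough to sketch in plain Python. In a real plugin, the Cacheable mixin provides the cache storage and the cache key; in this stand-in, a plain dictionary plays that role, and load_hosts, fake_scan, and the key string are hypothetical names chosen for illustration.

```python
from typing import Callable, Dict, List

HostList = List[Dict[str, str]]


def load_hosts(store: Dict[str, HostList], key: str, cache_enabled: bool,
               use_cache: bool, scan: Callable[[], HostList]) -> HostList:
    """Hypothetical helper: return cached hosts when allowed, otherwise scan.

    cache_enabled: the user turned caching on in the plugin configuration.
    use_cache: the caller allows reading cached data (False forces a refresh).
    """
    if cache_enabled and use_cache and key in store:
        return store[key]
    hosts = scan()  # The expensive step, for example running Nmap
    if cache_enabled:
        store[key] = hosts  # Save the fresh results for the next run
    return hosts


if __name__ == "__main__":
    calls = []

    def fake_scan() -> HostList:
        """Stand-in for the real scan that records how often it ran."""
        calls.append(1)
        return [{'raspberrypi': '192.168.1.11'}]

    store: Dict[str, HostList] = {}
    load_hosts(store, 'nmap_plugin_demo', True, True, fake_scan)
    load_hosts(store, 'nmap_plugin_demo', True, True, fake_scan)
    print(f"Scans executed: {len(calls)}")  # The second call is served from the cache
```

The point of the sketch is the ordering: read from the cache only when both the user and the caller allow it, and write back whenever caching is enabled so that a forced refresh also updates the stored results.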

What's next?

In this tutorial, you created an inventory plugin, taking advantage of the Ansible environment to build a network scanner without too much boilerplate code. It is more rigid than the dynamic inventory script, but you get several services for free, like caching and configuration file parsing.

But there is more to learn! Now that you know at least three ways to handle dynamic inventories, check out the following:

Remember, you can download the code and experiment. The best way to learn is by doing and making mistakes.

Jose Vicente Nunez

Proud dad and husband, software developer and sysadmin. Recreational runner and geek.