How to write a Python script to create dynamic Ansible inventories

Write a script in Python that fetches hosts using Nmap to generate dynamic inventories.

In my previous article, I explained why using a static inventory to handle your Ansible playbooks is not very convenient.

In my last article, I used the host_list and Nmap plugins to generate a dynamic inventory to cover that gap. This article covers how to write your own dynamic inventory script using Python while following good practices for packaging tools, using virtual environments, and unit testing your code.

Enter the world of dynamic inventory

The Ansible documentation explains several ways to generate dynamic inventories; I decided to write a simple Python script that is a frontend to the Nmap command.

Why might you choose to write your own dynamic script?

  • You have legacy code written in a language other than Python (Java, Perl, Bash, Ruby, Go—the sky is the limit), and you want to reuse that logic to generate the host list.
  • You or your team are proficient in a specific language (such as Bash or Ruby). The inventory script interface is flexible enough that you can write your script in the language of your choice.

To illustrate this, I will write a script that fetches hosts using Nmap.

Build the foundation

The first step is building the code foundation, running the Nmap command line interface, and parsing the results in XML format.

The foundation is a wrapper around the Nmap command that looks like this:

  1. NmapRunner executes the Nmap command with the desired flags and captures the XML output.
  2. OutputParser parses the XML and returns just the IP addresses needed.
  3. NmapRunner implements an iterator, so you can process each address any way you see fit.

Here is the Python code:

import os
import shlex
import shutil
import subprocess
from typing import List, Dict
from xml.etree import ElementTree


class OutputParser:
    def __init__(self, xml: str):
        self.xml = xml

    def get_addresses(self) -> List[Dict[str, str]]:
        """
        Several things need to happen for an address to be included:
        1. Host is up
        2. Port is TCP 22
        3. Port status is open
        Otherwise the iterator will not be filled
        :return:
        """
        addresses = []
        root = ElementTree.fromstring(self.xml)
        for host in root.findall('host'):
            name = None
            for hostnames in host.findall('hostnames'):
                for hostname in hostnames:
                    name = hostname.attrib['name']
                    break
            if not name:
                continue
            is_up = True
            for status in host.findall('status'):
                if status.attrib['state'] == 'down':
                    is_up = False
                    break
            if not is_up:
                continue
            port_22_open = False
            for ports in host.findall('ports'):
                for port in ports.findall('port'):
                    if port.attrib['portid'] == '22':
                        for state in port.findall('state'):
                            if state.attrib['state'] == "open":  # Up not the same as open, we want SSH access!
                                port_22_open = True
                                break
            if not port_22_open:
                continue
            address = None
            for address_data in host.findall('address'):
                address = address_data.attrib['addr']
                break
            addresses.append({name: address})
        return addresses


class NmapRunner:

    def __init__(self, hosts: str):
        self.nmap_report_file = None
        found_nmap = shutil.which('nmap', mode=os.F_OK | os.X_OK)
        if not found_nmap:
            raise ValueError("Nmap is missing!")
        self.nmap = found_nmap
        self.hosts = hosts

    def __iter__(self):
        command = [self.nmap]
        command.extend(__NMAP__FLAGS__)
        command.append(self.hosts)
        # check=True raises CalledProcessError if Nmap exits with a non-zero status
        completed = subprocess.run(
            command,
            capture_output=True,
            shell=False,
            check=True
        )
        out_par = OutputParser(completed.stdout.decode('utf-8'))
        self.addresses = out_par.get_addresses()
        return self

    def __next__(self):
        try:
            return self.addresses.pop()
        except IndexError:
            raise StopIteration


"""
Convert the args for proper usage on the Nmap CLI
Also, do not use the -n flag. We need to resolve IP addresses to hostname, even if we sacrifice a little bit of speed
"""
NMAP_DEFAULT_FLAGS = {
    '-p22': 'Port 22 scanning',
    '-T4': 'Aggressive timing template',
    '-PE': 'Enable this echo request behavior. Good for internal networks',
    '--disable-arp-ping': 'No ARP or ND Ping',
    '--max-hostgroup 50': 'Hostgroup (batch of hosts scanned concurrently) size',
    '--min-parallelism 50': 'Number of probes that may be outstanding for a host group',
    '--osscan-limit': 'Limit OS detection to promising targets',
    '--max-os-tries 1': 'Maximum number of OS detection tries against a target',
    '-oX -': 'Send XML output to STDOUT, avoid creating a temp file'
}
__NMAP__FLAGS__ = shlex.split(" ".join(NMAP_DEFAULT_FLAGS.keys()))
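
To see what that last line produces, the following sketch repeats the same join-and-split step on a shortened copy of the flag dictionary. The three keys are copied from NMAP_DEFAULT_FLAGS; the names flags and argv are used only for illustration. It shows why a key such as '--max-hostgroup 50' can hold a flag and its value together: shlex.split breaks them into separate arguments, which is the form subprocess.run expects when shell=False.

```python
import shlex

# Shortened copy of NMAP_DEFAULT_FLAGS, for illustration only
flags = {
    '-p22': 'Port 22 scanning',
    '--max-hostgroup 50': 'Hostgroup size',
    '-oX -': 'Send XML output to STDOUT',
}

# Dictionaries keep insertion order, so the flags come out as written
argv = shlex.split(" ".join(flags.keys()))
print(argv)  # ['-p22', '--max-hostgroup', '50', '-oX', '-']
```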

For example, you could use the NmapRunner like this:

import pprint
def test_iter():
    for hosts_data in NmapRunner("192.168.1.0/24"):
        pprint.pprint(hosts_data)
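
Because OutputParser only needs a string, the parsing rules can also be exercised without running Nmap. The sketch below is a condensed, standalone version of the same checks (host is up, port 22, state is open) applied to a hand-written XML fragment. The fragment is an assumption that imitates the layout of nmap -oX output, and the function name ssh_hosts is hypothetical, not part of the article's code.

```python
from typing import Dict, List
from xml.etree import ElementTree

# Hand-written sample that imitates the structure of "nmap -oX -" output
SAMPLE_XML = """<nmaprun>
  <host>
    <status state="up"/>
    <address addr="192.168.1.11" addrtype="ipv4"/>
    <hostnames><hostname name="raspberrypi" type="PTR"/></hostnames>
    <ports><port protocol="tcp" portid="22"><state state="open"/></port></ports>
  </host>
  <host>
    <status state="up"/>
    <address addr="192.168.1.20" addrtype="ipv4"/>
    <hostnames><hostname name="printer" type="PTR"/></hostnames>
    <ports><port protocol="tcp" portid="22"><state state="closed"/></port></ports>
  </host>
</nmaprun>"""


def ssh_hosts(xml: str) -> List[Dict[str, str]]:
    """Return {name: address} for hosts that are up with port 22 open."""
    found = []
    for host in ElementTree.fromstring(xml).findall('host'):
        hostname = host.find('hostnames/hostname')
        status = host.find('status')
        address = host.find('address')
        # Compare with None: an element without children is falsy
        if hostname is None or status is None or address is None:
            continue
        if status.get('state') != 'up':
            continue
        # The predicate selects port 22 only; its state child must be "open"
        state = host.find("ports/port[@portid='22']/state")
        if state is not None and state.get('state') == 'open':
            found.append({hostname.get('name'): address.get('addr')})
    return found


print(ssh_hosts(SAMPLE_XML))  # [{'raspberrypi': '192.168.1.11'}]
```

The second host is skipped because its port 22 is reported as closed, even though the host itself is up.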

Believe it or not, this is the most challenging portion of writing the Python script. The next part requires writing a script that follows Ansible requirements for dynamic inventory scripts.

Write an inventory script

Ansible's documentation is very clear about the inventory script's requirements:

  1. It must support the --list and --host <hostname> flags, which are mutually exclusive.
  2. It must return JSON in a format that Ansible can understand.
  3. Other flags can be added, but Ansible will not use them.
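
As a rough illustration of the second requirement, the smallest useful answer to --list can be sketched as a Python dictionary serialized to JSON. The host name and address below are made-up sample values; the _meta, all, and ungrouped keys follow the layout used later in this article.

```python
import json

# Sample values only; a real script would fill these from a scan
inventory = {
    '_meta': {'hostvars': {'raspberrypi': {'ip': ['192.168.1.11']}}},
    'all': {'children': ['ungrouped']},
    'ungrouped': {'hosts': ['raspberrypi']},
}

# What the script would print for --list
list_output = json.dumps(inventory, indent=4)
# What the script would print for --host <hostname> when it has no extra vars
host_output = json.dumps({})

print(list_output)
print(host_output)
```

When the --list output carries a _meta section with hostvars, Ansible does not need to call the script with --host once per host.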

But wait a second. There is nothing in there that says Ansible provides the network to scan for hosts, so how do you inject that?

Simple! The script will read a YAML configuration file from a predefined location, like /home/josevnz/.ansible/plugins/cliconf/nmap_plugin.yaml, with the following content:

# Sample configuration file. Suspiciously similar to the official Nmap plugin configuration file
---
plugin: nmap_plugin
address: 192.168.1.0/24

The class that reads the YAML configuration file is quite simple:

"""
Using a configuration file in YAML format, so it can be reused by the plugin.
Init file with ConfigParser is more convenient, trying to keep Ansible happy :wink:
"""
import os
from yaml import safe_load

def load_config(config_file: str = os.path.expanduser("~/.ansible/plugins/cliconf/nmap_inventory.cfg")):
    """
    Where to copy the configuration file:
    ```shell
    [josevnz@dmaf5 EnableSysadmin]$ ansible-config dump |grep DEFAULT_CLICONF_PLUGIN_PATH
    DEFAULT_CLICONF_PLUGIN_PATH(default) = ['/home/josevnz/.ansible/plugins/cliconf', '/usr/share/ansible/plugins/cliconf']
    ```
    :param config_file:
    :return:
    """
    with open(config_file, 'r') as stream:
        data = safe_load(stream)
        return data

Very good. Here is the dynamic inventory script code now:

#!/usr/bin/env python
"""
# nmap_inventory.py - Generates an Ansible dynamic inventory using NMAP
# Author
Jose Vicente Nunez Zuleta (kodegeek.com@protonmail.com)
"""
import json
import os.path
import argparse
from configparser import ConfigParser, MissingSectionHeaderError

from inventories.nmap import NmapRunner

def load_config() -> ConfigParser:
    cp = ConfigParser()
    try:
        config_file = os.path.expanduser("~/.config/nmap_inventory.cfg")
        cp.read(config_file)
        if not cp.has_option('DEFAULT', 'Addresses'):
            raise ValueError("Missing configuration option: DEFAULT -> Addresses")
    except MissingSectionHeaderError as mhe:
        raise ValueError(f"Invalid or missing configuration file: {mhe}") from mhe
    return cp


def get_empty_vars():
    return json.dumps({})


def get_list(search_address: str, pretty=False) -> str:
    """
    All group is always returned
    Ungrouped at least contains all the names found
    IP addresses are added as vars in the __meta tag, for efficiency as mentioned in the Ansible documentation.
    Note than we can add logic here to put machines in custom groups, will keep it simple for now.
    :param search_address: Results of the scan with Nmap
    :param pretty: Indentation
    :return: JSON string
    """
    found_data = list(NmapRunner(search_address))
    hostvars = {}
    ungrouped = []
    for host_data in found_data:
        for name, address in host_data.items():
            if name not in ungrouped:
                ungrouped.append(name)
            if name not in hostvars:
                hostvars[name] = {'ip': []}
            hostvars[name]['ip'].append(address)
    data = {
        '_meta': {
          'hostvars': hostvars
        },
        'all': {
            'children': [
                'ungrouped'
            ]
        },
        'ungrouped': {
            'hosts': ungrouped
        }
    }
    # indent expects a number of spaces (or None), not a boolean
    return json.dumps(data, indent=4 if pretty else None)

if __name__ == '__main__':

    arg_parser = argparse.ArgumentParser(
        description=__doc__,
        prog=__file__
    )
    arg_parser.add_argument(
        '--pretty',
        action='store_true',
        default=False,
        help="Pretty print JSON"
    )
    mandatory_options = arg_parser.add_mutually_exclusive_group()
    mandatory_options.add_argument(
        '--list',
        action='store',
        nargs="*",
        default="dummy",
        help="Show JSON of all managed hosts"
    )
    mandatory_options.add_argument(
        '--host',
        action='store',
        help="Display vars related to the host"
    )

    try:
        config = load_config()
        addresses = config.get('DEFAULT', 'Addresses')

        args = arg_parser.parse_args()
        if args.host:
            print(get_empty_vars())
        elif len(args.list) >= 0:
            print(get_list(addresses, args.pretty))
        else:
            raise ValueError("Expecting either --host $HOSTNAME or --list")

    except ValueError:
        raise
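
Note that the script as listed reads an INI-style file through ConfigParser rather than the YAML file shown earlier: load_config expects an Addresses option in the DEFAULT section. A minimal sketch of such a file and of how ConfigParser reads it follows. The network range is a sample value, and read_string stands in here for reading ~/.config/nmap_inventory.cfg from disk.

```python
from configparser import ConfigParser

# Sample contents for ~/.config/nmap_inventory.cfg (the address is an example)
SAMPLE_CFG = """
[DEFAULT]
Addresses = 192.168.1.0/24
"""

cp = ConfigParser()
cp.read_string(SAMPLE_CFG)  # the script itself uses cp.read(config_file)

# Option names are case-insensitive, so 'Addresses' and 'addresses' both match
assert cp.has_option('DEFAULT', 'Addresses')
addresses = cp.get('DEFAULT', 'Addresses')
print(addresses)  # 192.168.1.0/24
```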

You probably noticed a few things:

  1. Most of the code in this script is dedicated to handling arguments and loading configurations, besides presenting the JSON.
  2. You could add grouping logic into get_list. For now, I'm populating the two required default groups.
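
One possible shape for that grouping logic is sketched here, under the assumption that you want to group hosts by DNS domain. The function name group_by_domain and the no_domain group name are hypothetical, not part of the article's code.

```python
from typing import Dict, List


def group_by_domain(names: List[str]) -> Dict[str, Dict[str, List[str]]]:
    """Build Ansible-style groups keyed by the part after the first dot."""
    groups: Dict[str, Dict[str, List[str]]] = {}
    for name in names:
        # 'dmaf5.home' -> 'home'; names without a dot go to 'no_domain'
        _, _, domain = name.partition('.')
        # Dots are replaced to keep the group name simple
        group = domain.replace('.', '_') if domain else 'no_domain'
        groups.setdefault(group, {'hosts': []})['hosts'].append(name)
    return groups


print(group_by_domain(['dmaf5.home', 'macmini2', 'raspberrypi']))
# {'home': {'hosts': ['dmaf5.home']}, 'no_domain': {'hosts': ['macmini2', 'raspberrypi']}}
```

The returned groups could then be merged into the data dictionary inside get_list, with the new group names added to the children of all.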

Give it a try

It is time to kick the tires. Install the code first:

$ git clone git@github.com:josevnz/ExtendingAnsibleWithPython.git
$ cd ExtendingAnsibleWithPython/Inventory
$ python3 -m venv ~/virtualenv/ExtendingAnsibleWithPythonInventory
$ . ~/virtualenv/ExtendingAnsibleWithPythonInventory/bin/activate
$ pip install wheel
$ pip install --upgrade pip
$ pip install build
$ python setup.py bdist_wheel
$ pip install dist/*

The virtual environment should be active now. See if you get an empty host information result (use the name of a machine in your network):

$ ansible-inventory --inventory scripts/nmap_inventory.py --host raspberrypi
{}

Good, the empty JSON is expected, as you did not implement the --host $HOSTNAME method. What about --list?

$ ansible-inventory --inventory scripts/nmap_inventory.py --list
{
    "_meta": {
        "hostvars": {
            "dmaf5.home": {
                "ip": [
                    "192.168.1.26",
                    "192.168.1.25"
                ]
            },
            "macmini2": {
                "ip": [
                    "192.168.1.16"
                ]
            },
            "raspberrypi": {
                "ip": [
                    "192.168.1.11"
                ]
            }
        }
    },
    "all": {
        "children": [
            "ungrouped"
        ]
    },
    "ungrouped": {
        "hosts": [
            "dmaf5.home",
            "macmini2",
            "raspberrypi"
        ]
    }
}

Finally, try the new inventory with the ping module:

$ ansible --inventory scripts/nmap_inventory.py --user josevnz -m ping all
dmaf5.home | SUCCESS => {
    "changed": false,
    "ping": "pong"
}
raspberrypi | SUCCESS => {
    "changed": false,
    "ping": "pong"
}
macmini2 | SUCCESS => {
    "changed": false,
    "ping": "pong"
}

What's next?

This article covered a lot of material. Here is a summary of what you did:

  • Wrote utility classes to call Nmap and parse the scan results
  • Reused those classes inside a script that complies with the Ansible inventory requirements so that it can be used to create a dynamic inventory

This is probably the most flexible option for creating dynamic inventories in terms of coding, as the requirements are pretty loose and can be done in any language.

But is it the right way? In the last part of this article series, I'll show you why it might be better to write an Ansible plugin instead of using an inventory script.

Remember, you can download the code and experiment! The best way to learn is by doing and making mistakes.

Jose Vicente Nunez

Proud dad and husband, software developer and sysadmin. Recreational runner and geek.
