How to write a Python script to create dynamic Ansible inventories
Write a script in Python that fetches hosts using Nmap to generate dynamic inventories.
In my previous article, I explained why using a static inventory to handle your Ansible playbooks is not very convenient. Here are the reasons:
- The inventory can be quite large.
- The devices in the inventory that require automation may change frequently (for example, a pool of Linux laptops that use DHCP).
In my last article, I used the host_list and Nmap plugins to generate a dynamic inventory to cover that gap. This article covers how to write your own dynamic inventory script using Python while following good practices for packaging tools, using virtual environments, and unit testing your code.
Enter the world of dynamic inventory
The Ansible documentation explains several ways to generate dynamic inventories; I decided to write a simple Python script that is a frontend to the Nmap command.
Why might you choose to write your own dynamic script?
- You have legacy code written in a language other than Python (Java, Perl, Bash, Ruby, Go—the sky is the limit), and you want to reuse that logic to generate the host list.
- You or your team are proficient in a specific language (such as Bash or Ruby). The Ansible dynamic script is flexible enough that you can write your plugin in the language of your choice.
To illustrate this, I will write a script that fetches hosts using Nmap.
Build the foundation
The first step is building the code foundation, running the Nmap command line interface, and parsing the results in XML format.
The foundation is a wrapper around the Nmap command that looks like this:
- NmapRunner executes the Nmap command with the desired flags and captures the XML output.
- OutputParser parses the XML and returns just the IP addresses needed.
- NmapRunner implements an iterator, so you can process each address any way you see fit.
Here is the Python code:
import os
import shlex
import shutil
import subprocess
from typing import List, Dict
from xml.etree import ElementTree


class OutputParser:
    def __init__(self, xml: str):
        self.xml = xml

    def get_addresses(self) -> List[Dict[str, str]]:
        """
        Several things need to happen for an address to be included:
        1. Host is up
        2. Port is TCP 22
        3. Port status is open
        Otherwise the iterator will not be filled
        :return:
        """
        addresses = []
        root = ElementTree.fromstring(self.xml)
        for host in root.findall('host'):
            name = None
            for hostnames in host.findall('hostnames'):
                for hostname in hostnames:
                    name = hostname.attrib['name']
                    break
            if not name:
                continue
            is_up = True
            for status in host.findall('status'):
                if status.attrib['state'] == 'down':
                    is_up = False
                    break
            if not is_up:
                continue
            port_22_open = False
            for ports in host.findall('ports'):
                for port in ports.findall('port'):
                    if port.attrib['portid'] == '22':
                        for state in port.findall('state'):
                            if state.attrib['state'] == "open":  # Up not the same as open, we want SSH access!
                                port_22_open = True
                                break
            if not port_22_open:
                continue
            address = None
            for address_data in host.findall('address'):
                address = address_data.attrib['addr']
                break
            addresses.append({name: address})
        return addresses


class NmapRunner:

    def __init__(self, hosts: str):
        self.nmap_report_file = None
        found_nmap = shutil.which('nmap', mode=os.F_OK | os.X_OK)
        if not found_nmap:
            raise ValueError("Nmap is missing!")
        self.nmap = found_nmap
        self.hosts = hosts

    def __iter__(self):
        command = [self.nmap]
        command.extend(__NMAP__FLAGS__)
        command.append(self.hosts)
        # check=True already raises CalledProcessError on a non-zero exit code
        completed = subprocess.run(
            command,
            capture_output=True,
            shell=False,
            check=True
        )
        out_par = OutputParser(completed.stdout.decode('utf-8'))
        self.addresses = out_par.get_addresses()
        return self

    def __next__(self):
        try:
            return self.addresses.pop()
        except IndexError:
            raise StopIteration


"""
Convert the args for proper usage on the Nmap CLI
Also, do not use the -n flag. We need to resolve IP addresses to hostname, even if we sacrifice a little bit of speed
"""
NMAP_DEFAULT_FLAGS = {
    '-p22': 'Port 22 scanning',
    '-T4': 'Aggressive timing template',
    '-PE': 'Enable this echo request behavior. Good for internal networks',
    '--disable-arp-ping': 'No ARP or ND Ping',
    '--max-hostgroup 50': 'Hostgroup (batch of hosts scanned concurrently) size',
    '--min-parallelism 50': 'Number of probes that may be outstanding for a host group',
    '--osscan-limit': 'Limit OS detection to promising targets',
    '--max-os-tries 1': 'Maximum number of OS detection tries against a target',
    '-oX -': 'Send XML output to STDOUT, avoid creating a temp file'
}
__NMAP__FLAGS__ = shlex.split(" ".join(NMAP_DEFAULT_FLAGS.keys()))
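The last line of that listing is easy to overlook: it turns the dictionary keys into the argument list handed to subprocess.run. Here is a minimal sketch of that step on its own. SAMPLE_FLAGS and flags_to_argv are hypothetical names used only for illustration, and the dictionary is a shortened copy of the one above:

```python
import shlex

# Hypothetical, shortened copy of the flags dictionary (illustration only).
SAMPLE_FLAGS = {
    '-p22': 'Port 22 scanning',
    '--max-hostgroup 50': 'Hostgroup size',
    '-oX -': 'Send XML output to STDOUT',
}


def flags_to_argv(flags: dict) -> list:
    """Join the dictionary keys and split them the way a POSIX shell would."""
    return shlex.split(" ".join(flags.keys()))


argv = flags_to_argv(SAMPLE_FLAGS)
print(argv)  # ['-p22', '--max-hostgroup', '50', '-oX', '-']
```

Keys such as '--max-hostgroup 50' hold a flag and its value in one string, so they must be split into separate arguments because the command runs with shell=False.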
For example, you could use the NmapRunner like this:
import pprint


def test_iter():
    for hosts_data in NmapRunner("192.168.1.0/24"):
        pprint.pprint(hosts_data)
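If you want to see the filtering rules at work without scanning a network, you can feed a parser some XML by hand. The following is a rough sketch, not the article's OutputParser: ssh_hosts is a hypothetical helper that applies the same rules with ElementTree path expressions, and SAMPLE_XML is a hand-written, heavily simplified stand-in for real Nmap output.

```python
from typing import Dict, List
from xml.etree import ElementTree

# Hand-written and simplified (an assumption); real Nmap XML has many more fields.
SAMPLE_XML = """<nmaprun>
  <host>
    <status state="up"/>
    <address addr="192.168.1.11" addrtype="ipv4"/>
    <hostnames><hostname name="raspberrypi" type="PTR"/></hostnames>
    <ports><port protocol="tcp" portid="22"><state state="open"/></port></ports>
  </host>
  <host>
    <status state="up"/>
    <address addr="192.168.1.50" addrtype="ipv4"/>
    <hostnames><hostname name="printer" type="PTR"/></hostnames>
    <ports><port protocol="tcp" portid="22"><state state="closed"/></port></ports>
  </host>
</nmaprun>"""


def ssh_hosts(xml: str) -> List[Dict[str, str]]:
    """Return {hostname: address} for named hosts that are up with port 22 open."""
    found = []
    for host in ElementTree.fromstring(xml).findall('host'):
        hostname = host.find('hostnames/hostname')
        address = host.find('address')
        status = host.find('status')
        state = host.find("ports/port[@portid='22']/state")
        # Compare with None explicitly: an Element without children is falsy.
        if hostname is None or address is None:
            continue
        if status is not None and status.get('state') == 'down':
            continue
        if state is None or state.get('state') != 'open':
            continue
        found.append({hostname.get('name'): address.get('addr')})
    return found


hosts = ssh_hosts(SAMPLE_XML)
print(hosts)  # [{'raspberrypi': '192.168.1.11'}]
```

The second host is up and has a name, but its port 22 is closed, so it is left out of the result.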
Believe it or not, this is the most challenging portion of writing the Python script. The next part requires writing a script that follows Ansible requirements for dynamic inventory scripts.
Write an inventory script
Ansible's documentation is very clear about the inventory script's requirements:
- It must support the --list and --host flags, which are mutually exclusive.
- It must return JSON in a format that Ansible can understand.
- Other flags can be added, but Ansible will not use them.
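Those requirements fit in a few lines. Here is a minimal sketch, assuming hard-coded host data in place of a real scan. HOSTVARS and respond are hypothetical names, and the group layout follows the one the script in this article produces:

```python
import argparse
import json

# Hypothetical, hard-coded data standing in for real scan results.
HOSTVARS = {'raspberrypi': {'ip': ['192.168.1.11']}}


def respond(argv: list) -> str:
    """Answer the two calls Ansible makes to an inventory script."""
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--list', action='store_true')
    group.add_argument('--host')
    args = parser.parse_args(argv)
    if args.list:
        inventory = {
            '_meta': {'hostvars': HOSTVARS},
            'all': {'children': ['ungrouped']},
            'ungrouped': {'hosts': sorted(HOSTVARS)},
        }
        return json.dumps(inventory)
    # --host <name>: the variables for one host, or an empty object.
    return json.dumps(HOSTVARS.get(args.host, {}))


print(respond(['--list']))
print(respond(['--host', 'raspberrypi']))
```

Returning every host's variables under _meta in the --list answer means Ansible does not have to call the script once per host with --host.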
But wait a second. There is nothing in there that says Ansible provides the network to scan for hosts, so how do you inject that?
Simple! The script will read a YAML configuration file from a predefined location, like /home/josevnz/.ansible/plugins/cliconf/nmap_plugin.yaml, with the following contents:
# Sample configuration file. Suspiciously similar to the official Nmap plugin configuration file
---
plugin: nmap_plugin
address: 192.168.1.0/24
The class that reads the YAML configuration file is quite simple:
"""
Using a configuration file in YAML format, so it can be reused by the plugin.
Init file with ConfigParser is more convenient, trying to keep Ansible happy :wink:
"""
import os
from yaml import safe_load
try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper


def load_config(config_file: str = os.path.expanduser("~/.ansible/plugins/cliconf/nmap_inventory.cfg")):
    """
    Where to copy the configuration file:
    ```shell
    [josevnz@dmaf5 EnableSysadmin]$ ansible-config dump |grep DEFAULT_CLICONF_PLUGIN_PATH
    DEFAULT_CLICONF_PLUGIN_PATH(default) = ['/home/josevnz/.ansible/plugins/cliconf', '/usr/share/ansible/plugins/cliconf']
    ```
    :param config_file:
    :return:
    """
    with open(config_file, 'r') as stream:
        data = safe_load(stream)
        return data
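One detail to watch: the inventory script in this article defines its own load_config, which uses ConfigParser to read ~/.config/nmap_inventory.cfg and expects an Addresses option in the DEFAULT section. Here is a minimal sketch of what that INI-style file could contain and how it is read. SAMPLE_CFG and read_addresses are hypothetical names, and the network range is only an example:

```python
from configparser import ConfigParser

# Hypothetical file contents; the network range is only an example.
SAMPLE_CFG = """
[DEFAULT]
Addresses = 192.168.1.0/24
"""


def read_addresses(text: str) -> str:
    """Parse INI text and return the network range to scan."""
    cp = ConfigParser()
    cp.read_string(text)
    if not cp.has_option('DEFAULT', 'Addresses'):
        raise ValueError("Missing configuration option: DEFAULT -> Addresses")
    return cp.get('DEFAULT', 'Addresses')


addresses = read_addresses(SAMPLE_CFG)
print(addresses)  # 192.168.1.0/24
```

ConfigParser lowercases option names by default, so Addresses and addresses refer to the same option.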
Very good. Here is the dynamic inventory script code now:
#!/usr/bin/env python
"""
# nmap_inventory.py - Generates an Ansible dynamic inventory using NMAP
# Author
Jose Vicente Nunez Zuleta (kodegeek.com@protonmail.com)
"""
import json
import os.path
import argparse
from configparser import ConfigParser, MissingSectionHeaderError

from inventories.nmap import NmapRunner


def load_config() -> ConfigParser:
    cp = ConfigParser()
    try:
        config_file = os.path.expanduser("~/.config/nmap_inventory.cfg")
        cp.read(config_file)
        if not cp.has_option('DEFAULT', 'Addresses'):
            raise ValueError("Missing configuration option: DEFAULT -> Addresses")
    except MissingSectionHeaderError as mhe:
        raise ValueError("Invalid or missing configuration file:", mhe)
    return cp


def get_empty_vars():
    return json.dumps({})


def get_list(search_address: str, pretty=False) -> str:
    """
    All group is always returned
    Ungrouped at least contains all the names found
    IP addresses are added as vars in the _meta tag, for efficiency as mentioned in the Ansible documentation.
    Note that we can add logic here to put machines in custom groups, will keep it simple for now.
    :param search_address: Network to scan with Nmap
    :param pretty: Indentation
    :return: JSON string
    """
    found_data = list(NmapRunner(search_address))
    hostvars = {}
    ungrouped = []
    for host_data in found_data:
        for name, address in host_data.items():
            if name not in ungrouped:
                ungrouped.append(name)
            if name not in hostvars:
                hostvars[name] = {'ip': []}
            hostvars[name]['ip'].append(address)
    data = {
        '_meta': {
            'hostvars': hostvars
        },
        'all': {
            'children': [
                'ungrouped'
            ]
        },
        'ungrouped': {
            'hosts': ungrouped
        }
    }
    # indent=None gives compact output; passing the bool itself would act as indent 0 or 1
    return json.dumps(data, indent=4 if pretty else None)


if __name__ == '__main__':
    arg_parser = argparse.ArgumentParser(
        description=__doc__,
        prog=__file__
    )
    arg_parser.add_argument(
        '--pretty',
        action='store_true',
        default=False,
        help="Pretty print JSON"
    )
    mandatory_options = arg_parser.add_mutually_exclusive_group()
    mandatory_options.add_argument(
        '--list',
        action='store',
        nargs="*",
        default="dummy",
        help="Show JSON of all managed hosts"
    )
    mandatory_options.add_argument(
        '--host',
        action='store',
        help="Display vars related to the host"
    )

    config = load_config()
    addresses = config.get('DEFAULT', 'Addresses')

    args = arg_parser.parse_args()
    if args.host:
        print(get_empty_vars())
    else:
        # --list has a default value, so listing is what happens when --host is absent
        print(get_list(addresses, args.pretty))
You probably noticed a few things:
- Most of the code in this script is dedicated to handling arguments and loading configurations, besides presenting the JSON.
- You could add grouping logic into get_list. For now, I'm populating the two required default groups.
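As one illustration of what that grouping logic could look like, here is a rough sketch that sorts hosts into groups by DNS domain. The function group_by_domain and the idea of grouping by domain are assumptions made for this example, not part of the article's script:

```python
from collections import defaultdict
from typing import Dict, List


def group_by_domain(names: List[str]) -> Dict[str, List[str]]:
    """Put 'host.domain' names in a group named after the domain."""
    groups = defaultdict(list)
    for name in names:
        _, dot, domain = name.partition('.')
        if dot:
            # Replace dots and dashes so the result is a safe group name.
            group = domain.replace('.', '_').replace('-', '_')
        else:
            # Bare names without a domain stay in the default group.
            group = 'ungrouped'
        groups[group].append(name)
    return dict(groups)


groups = group_by_domain(['dmaf5.home', 'macmini2', 'raspberrypi'])
print(groups)  # {'home': ['dmaf5.home'], 'ungrouped': ['macmini2', 'raspberrypi']}
```

Each key would then become a top-level entry of the form {'hosts': [...]} in the JSON, and the group names would be listed as children of all.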
Give it a try
It is time to kick the tires. Install the code first:
$ git clone git@github.com:josevnz/ExtendingAnsibleWithPython.git
$ cd ExtendingAnsibleWithPython/Inventory
$ python3 -m venv ~/virtualenv/ExtendingAnsibleWithPythonInventory
$ . ~/virtualenv/ExtendingAnsibleWithPythonInventory/bin/activate
$ pip install wheel
$ pip install --upgrade pip
$ pip install build
$ python setup.py bdist_wheel
$ pip install dist/*
The virtual environment should be active now. See if you get an empty host information result (use the name of a machine in your network):
$ ansible-inventory --inventory scripts/nmap_inventory.py --host raspberrypi
{}
Good, the empty JSON is expected, as you did not implement the --host $HOSTNAME method. What about --list?
$ ansible-inventory --inventory scripts/nmap_inventory.py --list
{
    "_meta": {
        "hostvars": {
            "dmaf5.home": {
                "ip": [
                    "192.168.1.26",
                    "192.168.1.25"
                ]
            },
            "macmini2": {
                "ip": [
                    "192.168.1.16"
                ]
            },
            "raspberrypi": {
                "ip": [
                    "192.168.1.11"
                ]
            }
        }
    },
    "all": {
        "children": [
            "ungrouped"
        ]
    },
    "ungrouped": {
        "hosts": [
            "dmaf5.home",
            "macmini2",
            "raspberrypi"
        ]
    }
}
Finally, try the new inventory with the ping module:
$ ansible --inventory scripts/nmap_inventory.py --user josevnz -m ping all
dmaf5.home | SUCCESS => {
    "changed": false,
    "ping": "pong"
}
raspberrypi | SUCCESS => {
    "changed": false,
    "ping": "pong"
}
macmini2 | SUCCESS => {
    "changed": false,
    "ping": "pong"
}
What's next?
This article covered a lot of material. Here is a summary of what you did:
- Wrote utility classes to call Nmap and parse the scan results
- Reused those classes inside a script that complies with the Ansible inventory requirements so that it can be used to create a dynamic inventory
This is probably the most flexible option for creating dynamic inventories in terms of coding, as the requirements are pretty loose and can be done in any language.
But is it the right way? In the last part of this article series, I'll show you why it might be better to write an Ansible plugin instead of using an inventory script.
Remember, you can download the code and experiment! The best way to learn is by doing and making mistakes.
Jose Vicente Nunez
Proud dad and husband, software developer and sysadmin. Recreational runner and geek.