Chapter 48: Advanced Network Automation
Learning Objectives
By the end of this chapter, you will be able to: - Implement advanced Ansible automation for network infrastructure - Develop Python scripts for network management and monitoring - Configure NETCONF and RESTCONF for programmable networks - Build CI/CD pipelines for network configuration management - Create automated testing frameworks for network validation
Advanced Ansible for Network Automation
Ansible Architecture for Networks
Ansible provides agentless automation for network devices using SSH, NETCONF, or API connections.
Ansible Components
- Control Node: Machine running Ansible
- Managed Nodes: Network devices being automated
- Inventory: List of managed devices
- Playbooks: Automation scripts in YAML
- Modules: Task execution units
- Plugins: Extend Ansible functionality
Advanced Ansible Lab Setup
# Advanced network automation lab
name: network-automation
prefix: auto
topology:
nodes:
# Ansible control node
ansible-controller:
kind: linux
image: ubuntu:20.04
mgmt-ipv4: 172.20.20.10
exec:
- apt update && apt install -y python3 python3-pip openssh-client git
- pip3 install ansible netmiko napalm jinja2 paramiko
- mkdir -p /etc/ansible /opt/automation
- echo "[defaults]" > /etc/ansible/ansible.cfg
- echo "host_key_checking = False" >> /etc/ansible/ansible.cfg
- echo "timeout = 30" >> /etc/ansible/ansible.cfg
# Network devices for automation
core-router:
kind: cisco_iosxe
image: cisco/iosxe:latest
mgmt-ipv4: 172.20.20.11
startup-config: |
hostname Core-Router
!
username ansible privilege 15 secret ansible123
!
ip domain-name automation.lab
crypto key generate rsa modulus 2048
!
line vty 0 15
login local
transport input ssh
!
interface GigabitEthernet0/0/0
description To-Distribution
ip address 10.1.12.1 255.255.255.252
no shutdown
!
interface Loopback0
ip address 1.1.1.1 255.255.255.255
!
dist-switch:
kind: cisco_iosxe
image: cisco/catalyst:latest
mgmt-ipv4: 172.20.20.12
startup-config: |
hostname Dist-Switch
!
username ansible privilege 15 secret ansible123
!
ip domain-name automation.lab
crypto key generate rsa modulus 2048
!
line vty 0 15
login local
transport input ssh
!
interface GigabitEthernet1/0/1
description To-Core
switchport mode trunk
no shutdown
!
interface Loopback0
ip address 2.2.2.2 255.255.255.255
!
access-switch:
kind: cisco_iosxe
image: cisco/catalyst:latest
mgmt-ipv4: 172.20.20.13
startup-config: |
hostname Access-Switch
!
username ansible privilege 15 secret ansible123
!
ip domain-name automation.lab
crypto key generate rsa modulus 2048
!
line vty 0 15
login local
transport input ssh
!
interface GigabitEthernet1/0/1
description To-Distribution
switchport mode trunk
no shutdown
!
interface Loopback0
ip address 3.3.3.3 255.255.255.255
!
links:
- endpoints: ["core-router:eth1", "dist-switch:eth1"]
- endpoints: ["dist-switch:eth2", "access-switch:eth1"]Ansible Inventory Management
Dynamic Inventory
#!/usr/bin/env python3
# dynamic_inventory.py
import json
import sys
def get_inventory():
inventory = {
'all': {
'hosts': ['172.20.20.11', '172.20.20.12', '172.20.20.13'],
'vars': {
'ansible_user': 'ansible',
'ansible_password': 'ansible123',
'ansible_network_os': 'ios',
'ansible_connection': 'network_cli'
}
},
'routers': {
'hosts': ['172.20.20.11'],
'vars': {
'device_type': 'router'
}
},
'switches': {
'hosts': ['172.20.20.12', '172.20.20.13'],
'vars': {
'device_type': 'switch'
}
},
'_meta': {
'hostvars': {
'172.20.20.11': {
'hostname': 'Core-Router',
'device_role': 'core'
},
'172.20.20.12': {
'hostname': 'Dist-Switch',
'device_role': 'distribution'
},
'172.20.20.13': {
'hostname': 'Access-Switch',
'device_role': 'access'
}
}
}
}
return inventory
if __name__ == '__main__':
if len(sys.argv) == 2 and sys.argv[1] == '--list':
print(json.dumps(get_inventory(), indent=2))
elif len(sys.argv) == 3 and sys.argv[1] == '--host':
print(json.dumps({}))
else:
print("Usage: %s --list or %s --host <hostname>" % (sys.argv[0], sys.argv[0]))Static Inventory with Groups
# inventory/hosts.ini
[all:vars]
ansible_user=ansible
ansible_password=ansible123
ansible_network_os=ios
ansible_connection=network_cli
[routers]
core-router ansible_host=172.20.20.11 hostname=Core-Router
[switches]
dist-switch ansible_host=172.20.20.12 hostname=Dist-Switch
access-switch ansible_host=172.20.20.13 hostname=Access-Switch
[core]
core-router
[distribution]
dist-switch
[access]
access-switchAdvanced Ansible Playbooks
Configuration Management Playbook
# playbooks/configure_network.yml
---
- name: Configure Network Infrastructure
hosts: all
gather_facts: no
vars:
backup_dir: "./backups/{{ inventory_hostname }}"
tasks:
- name: Create backup directory
file:
path: "{{ backup_dir }}"
state: directory
delegate_to: localhost
run_once: true
- name: Backup current configuration
ios_config:
backup: yes
backup_options:
filename: "{{ inventory_hostname }}_{{ ansible_date_time.epoch }}.cfg"
dir_path: "{{ backup_dir }}"
- name: Configure global settings
ios_config:
lines:
- service timestamps debug datetime msec
- service timestamps log datetime msec
- service password-encryption
- no ip domain-lookup
- ip domain-name {{ domain_name | default('automation.lab') }}
save_when: modified
- name: Configure NTP
ios_config:
lines:
- ntp server {{ ntp_server | default('pool.ntp.org') }}
save_when: modified
- name: Configure SNMP
ios_config:
lines:
- snmp-server community {{ snmp_community | default('public') }} RO
- snmp-server location {{ snmp_location | default('Data Center') }}
- snmp-server contact {{ snmp_contact | default('admin@company.com') }}
save_when: modified
- name: Configure logging
ios_config:
lines:
- logging buffered 16384 informational
- logging console critical
- logging monitor informational
- logging {{ syslog_server | default('172.20.20.100') }}
save_when: modified
- name: Verify configuration
ios_command:
commands:
- show running-config | include ntp
- show running-config | include snmp
- show running-config | include logging
register: config_verification
- name: Display verification results
debug:
var: config_verification.stdout_linesVLAN Management Playbook
# playbooks/manage_vlans.yml
---
- name: Manage VLANs on Switches
hosts: switches
gather_facts: no
vars:
vlans:
- { id: 10, name: "USERS" }
- { id: 20, name: "SERVERS" }
- { id: 30, name: "MANAGEMENT" }
- { id: 99, name: "NATIVE" }
tasks:
- name: Configure VLANs
ios_vlans:
config:
- vlan_id: "{{ item.id }}"
name: "{{ item.name }}"
state: active
state: merged
loop: "{{ vlans }}"
- name: Configure trunk interfaces
ios_l2_interfaces:
config:
- name: "{{ item }}"
trunk:
allowed_vlans: "10,20,30,99"
native_vlan: 99
state: merged
loop:
- GigabitEthernet1/0/1
- GigabitEthernet1/0/2
when: inventory_hostname in groups['switches']
- name: Configure access interfaces
ios_l2_interfaces:
config:
- name: "GigabitEthernet1/0/{{ item.port }}"
access:
vlan: "{{ item.vlan }}"
state: merged
loop:
- { port: 3, vlan: 10 }
- { port: 4, vlan: 10 }
- { port: 5, vlan: 20 }
- { port: 6, vlan: 20 }
when: inventory_hostname in groups['access']
- name: Verify VLAN configuration
ios_command:
commands:
- show vlan brief
- show interfaces trunk
register: vlan_status
- name: Save VLAN status to file
copy:
content: "{{ vlan_status.stdout[0] }}"
dest: "./reports/{{ inventory_hostname }}_vlans.txt"
delegate_to: localhostJinja2 Templates for Configuration
Router Configuration Template
{# templates/router_config.j2 #}
hostname {{ hostname }}
!
{% if domain_name is defined %}
ip domain-name {{ domain_name }}
{% endif %}
!
{% for interface in interfaces %}
interface {{ interface.name }}
{% if interface.description is defined %}
description {{ interface.description }}
{% endif %}
{% if interface.ip_address is defined %}
ip address {{ interface.ip_address }} {{ interface.subnet_mask }}
{% endif %}
{% if interface.shutdown is not defined or not interface.shutdown %}
no shutdown
{% endif %}
!
{% endfor %}
!
{% if ospf is defined %}
router ospf {{ ospf.process_id }}
router-id {{ ospf.router_id }}
{% for network in ospf.networks %}
network {{ network.network }} {{ network.wildcard }} area {{ network.area }}
{% endfor %}
!
{% endif %}
!
{% if static_routes is defined %}
{% for route in static_routes %}
ip route {{ route.network }} {{ route.mask }} {{ route.next_hop }}
{% endfor %}
{% endif %}
Switch Configuration Template
{# templates/switch_config.j2 #}
hostname {{ hostname }}
!
{% if vlans is defined %}
{% for vlan in vlans %}
vlan {{ vlan.id }}
name {{ vlan.name }}
!
{% endfor %}
{% endif %}
!
{% if interfaces is defined %}
{% for interface in interfaces %}
interface {{ interface.name }}
{% if interface.description is defined %}
description {{ interface.description }}
{% endif %}
{% if interface.mode == 'access' %}
switchport mode access
switchport access vlan {{ interface.vlan }}
{% elif interface.mode == 'trunk' %}
switchport mode trunk
{% if interface.allowed_vlans is defined %}
switchport trunk allowed vlan {{ interface.allowed_vlans }}
{% endif %}
{% if interface.native_vlan is defined %}
switchport trunk native vlan {{ interface.native_vlan }}
{% endif %}
{% endif %}
{% if interface.portfast is defined and interface.portfast %}
spanning-tree portfast
{% endif %}
no shutdown
!
{% endfor %}
{% endif %}
Python Network Automation
Advanced Python Scripting
Network Device Management Class
#!/usr/bin/env python3
# network_manager.py
import netmiko
import json
import logging
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
class NetworkManager:
def __init__(self, devices_file):
self.devices = self.load_devices(devices_file)
self.setup_logging()
def load_devices(self, devices_file):
with open(devices_file, 'r') as f:
return json.load(f)
def setup_logging(self):
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('network_automation.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def connect_device(self, device):
try:
connection = netmiko.ConnectHandler(**device)
self.logger.info(f"Connected to {device['host']}")
return connection
except Exception as e:
self.logger.error(f"Failed to connect to {device['host']}: {str(e)}")
return None
def execute_command(self, device, command):
connection = self.connect_device(device)
if connection:
try:
output = connection.send_command(command)
connection.disconnect()
return {
'host': device['host'],
'command': command,
'output': output,
'status': 'success'
}
except Exception as e:
connection.disconnect()
return {
'host': device['host'],
'command': command,
'error': str(e),
'status': 'failed'
}
return {
'host': device['host'],
'command': command,
'error': 'Connection failed',
'status': 'failed'
}
def execute_commands_parallel(self, commands, max_workers=5):
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for device in self.devices:
for command in commands:
future = executor.submit(self.execute_command, device, command)
futures.append(future)
for future in as_completed(futures):
results.append(future.result())
return results
def backup_configurations(self, backup_dir='./backups'):
import os
os.makedirs(backup_dir, exist_ok=True)
results = []
for device in self.devices:
connection = self.connect_device(device)
if connection:
try:
config = connection.send_command('show running-config')
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"{backup_dir}/{device['host']}_{timestamp}.cfg"
with open(filename, 'w') as f:
f.write(config)
results.append({
'host': device['host'],
'backup_file': filename,
'status': 'success'
})
connection.disconnect()
except Exception as e:
results.append({
'host': device['host'],
'error': str(e),
'status': 'failed'
})
connection.disconnect()
return results
def deploy_configuration(self, device, config_lines):
connection = self.connect_device(device)
if connection:
try:
output = connection.send_config_set(config_lines)
connection.save_config()
connection.disconnect()
return {
'host': device['host'],
'output': output,
'status': 'success'
}
except Exception as e:
connection.disconnect()
return {
'host': device['host'],
'error': str(e),
'status': 'failed'
}
return {
'host': device['host'],
'error': 'Connection failed',
'status': 'failed'
}
# Usage example
if __name__ == '__main__':
nm = NetworkManager('devices.json')
# Backup all configurations
backup_results = nm.backup_configurations()
print("Backup Results:", json.dumps(backup_results, indent=2))
# Execute commands in parallel
commands = ['show version', 'show ip interface brief', 'show running-config | include hostname']
results = nm.execute_commands_parallel(commands)
# Save results to file
with open('command_results.json', 'w') as f:
json.dump(results, f, indent=2)Network Monitoring Script
#!/usr/bin/env python3
# network_monitor.py
import netmiko
import json
import time
import smtplib
from email.mime.text import MimeText
from email.mime.multipart import MimeMultipart
import threading
class NetworkMonitor:
def __init__(self, config_file):
with open(config_file, 'r') as f:
self.config = json.load(f)
self.devices = self.config['devices']
self.thresholds = self.config['thresholds']
self.email_config = self.config['email']
self.monitoring = True
def check_interface_utilization(self, device):
try:
connection = netmiko.ConnectHandler(**device)
output = connection.send_command('show interfaces | include rate')
connection.disconnect()
alerts = []
for line in output.split('\n'):
if 'input rate' in line and 'output rate' in line:
# Parse interface utilization
parts = line.split()
input_rate = int(parts[4])
output_rate = int(parts[9])
if input_rate > self.thresholds['interface_utilization']:
alerts.append(f"High input rate on {device['host']}: {input_rate} bps")
if output_rate > self.thresholds['interface_utilization']:
alerts.append(f"High output rate on {device['host']}: {output_rate} bps")
return alerts
except Exception as e:
return [f"Error checking {device['host']}: {str(e)}"]
def check_cpu_utilization(self, device):
try:
connection = netmiko.ConnectHandler(**device)
output = connection.send_command('show processes cpu | include CPU')
connection.disconnect()
# Parse CPU utilization
for line in output.split('\n'):
if 'CPU utilization' in line:
cpu_percent = int(line.split('%')[0].split()[-1])
if cpu_percent > self.thresholds['cpu_utilization']:
return [f"High CPU utilization on {device['host']}: {cpu_percent}%"]
return []
except Exception as e:
return [f"Error checking CPU on {device['host']}: {str(e)}"]
def check_memory_utilization(self, device):
try:
connection = netmiko.ConnectHandler(**device)
output = connection.send_command('show memory statistics')
connection.disconnect()
# Parse memory utilization
for line in output.split('\n'):
if 'Processor' in line and 'Used' in line:
parts = line.split()
used = int(parts[2])
total = int(parts[1])
utilization = (used / total) * 100
if utilization > self.thresholds['memory_utilization']:
return [f"High memory utilization on {device['host']}: {utilization:.1f}%"]
return []
except Exception as e:
return [f"Error checking memory on {device['host']}: {str(e)}"]
def send_alert(self, alerts):
if not alerts:
return
msg = MimeMultipart()
msg['From'] = self.email_config['from']
msg['To'] = ', '.join(self.email_config['to'])
msg['Subject'] = 'Network Alert'
body = '\n'.join(alerts)
msg.attach(MimeText(body, 'plain'))
try:
server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port'])
server.starttls()
server.login(self.email_config['username'], self.email_config['password'])
server.send_message(msg)
server.quit()
print(f"Alert sent: {len(alerts)} issues")
except Exception as e:
print(f"Failed to send alert: {str(e)}")
def monitor_device(self, device):
all_alerts = []
# Check various metrics
all_alerts.extend(self.check_interface_utilization(device))
all_alerts.extend(self.check_cpu_utilization(device))
all_alerts.extend(self.check_memory_utilization(device))
return all_alerts
def start_monitoring(self, interval=300): # 5 minutes
print(f"Starting network monitoring (interval: {interval}s)")
while self.monitoring:
all_alerts = []
# Monitor all devices
for device in self.devices:
alerts = self.monitor_device(device)
all_alerts.extend(alerts)
# Send alerts if any issues found
if all_alerts:
self.send_alert(all_alerts)
time.sleep(interval)
def stop_monitoring(self):
self.monitoring = False
print("Monitoring stopped")
# Configuration file example
monitor_config = {
"devices": [
{
"device_type": "cisco_ios",
"host": "172.20.20.11",
"username": "ansible",
"password": "ansible123"
}
],
"thresholds": {
"interface_utilization": 80000000, # 80 Mbps
"cpu_utilization": 80, # 80%
"memory_utilization": 80 # 80%
},
"email": {
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"from": "alerts@company.com",
"to": ["admin@company.com"],
"username": "alerts@company.com",
"password": "app_password"
}
}
# Save configuration
with open('monitor_config.json', 'w') as f:
json.dump(monitor_config, f, indent=2)NETCONF and RESTCONF
NETCONF Implementation
NETCONF Client Script
#!/usr/bin/env python3
# netconf_client.py
from ncclient import manager
import xml.etree.ElementTree as ET
import json
class NetconfClient:
def __init__(self, host, username, password, port=830):
self.host = host
self.username = username
self.password = password
self.port = port
self.connection = None
def connect(self):
try:
self.connection = manager.connect(
host=self.host,
port=self.port,
username=self.username,
password=self.password,
hostkey_verify=False,
device_params={'name': 'iosxe'}
)
print(f"Connected to {self.host} via NETCONF")
return True
except Exception as e:
print(f"Failed to connect: {str(e)}")
return False
def get_config(self, source='running'):
if not self.connection:
return None
try:
config = self.connection.get_config(source=source)
return config.data_xml
except Exception as e:
print(f"Failed to get config: {str(e)}")
return None
def get_interfaces(self):
filter_xml = """
<filter>
<interfaces xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
<interface>
<name/>
<type/>
<enabled/>
<oper-status/>
</interface>
</interfaces>
</filter>
"""
try:
result = self.connection.get(filter=filter_xml)
return result.data_xml
except Exception as e:
print(f"Failed to get interfaces: {str(e)}")
return None
def configure_interface(self, interface_name, ip_address, subnet_mask):
config_xml = f"""
<config>
<interfaces xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
<interface>
<name>{interface_name}</name>
<type xmlns:ianaift="urn:ietf:params:xml:ns:yang:iana-if-type">
ianaift:ethernetCsmacd
</type>
<enabled>true</enabled>
<ipv4 xmlns="urn:ietf:params:xml:ns:yang:ietf-ip">
<address>
<ip>{ip_address}</ip>
<netmask>{subnet_mask}</netmask>
</address>
</ipv4>
</interface>
</interfaces>
</config>
"""
try:
result = self.connection.edit_config(target='running', config=config_xml)
print(f"Interface {interface_name} configured successfully")
return True
except Exception as e:
print(f"Failed to configure interface: {str(e)}")
return False
def disconnect(self):
if self.connection:
self.connection.close_session()
print("NETCONF session closed")
# Usage example
if __name__ == '__main__':
client = NetconfClient('172.20.20.11', 'ansible', 'ansible123')
if client.connect():
# Get interface information
interfaces = client.get_interfaces()
if interfaces:
print("Current interfaces:")
print(interfaces)
# Configure an interface
client.configure_interface('GigabitEthernet0/0/1', '10.1.1.1', '255.255.255.0')
client.disconnect()RESTCONF Implementation
RESTCONF Client Script
#!/usr/bin/env python3
# restconf_client.py
import requests
import json
from requests.auth import HTTPBasicAuth
import urllib3
# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class RestconfClient:
def __init__(self, host, username, password, port=443):
self.host = host
self.username = username
self.password = password
self.port = port
self.base_url = f"https://{host}:{port}/restconf"
self.auth = HTTPBasicAuth(username, password)
self.headers = {
'Accept': 'application/yang-data+json',
'Content-Type': 'application/yang-data+json'
}
def get_capabilities(self):
url = f"{self.base_url}/data/ietf-restconf-monitoring:restconf-state/capabilities"
try:
response = requests.get(url, auth=self.auth, headers=self.headers, verify=False)
if response.status_code == 200:
return response.json()
else:
print(f"Failed to get capabilities: {response.status_code}")
return None
except Exception as e:
print(f"Error getting capabilities: {str(e)}")
return None
def get_interfaces(self):
url = f"{self.base_url}/data/ietf-interfaces:interfaces"
try:
response = requests.get(url, auth=self.auth, headers=self.headers, verify=False)
if response.status_code == 200:
return response.json()
else:
print(f"Failed to get interfaces: {response.status_code}")
return None
except Exception as e:
print(f"Error getting interfaces: {str(e)}")
return None
def create_interface(self, interface_data):
url = f"{self.base_url}/data/ietf-interfaces:interfaces"
try:
response = requests.post(
url,
auth=self.auth,
headers=self.headers,
data=json.dumps(interface_data),
verify=False
)
if response.status_code in [200, 201, 204]:
print("Interface created successfully")
return True
else:
print(f"Failed to create interface: {response.status_code}")
print(response.text)
return False
except Exception as e:
print(f"Error creating interface: {str(e)}")
return False
def update_interface(self, interface_name, interface_data):
url = f"{self.base_url}/data/ietf-interfaces:interfaces/interface={interface_name}"
try:
response = requests.put(
url,
auth=self.auth,
headers=self.headers,
data=json.dumps(interface_data),
verify=False
)
if response.status_code in [200, 204]:
print(f"Interface {interface_name} updated successfully")
return True
else:
print(f"Failed to update interface: {response.status_code}")
return False
except Exception as e:
print(f"Error updating interface: {str(e)}")
return False
def delete_interface(self, interface_name):
url = f"{self.base_url}/data/ietf-interfaces:interfaces/interface={interface_name}"
try:
response = requests.delete(url, auth=self.auth, headers=self.headers, verify=False)
if response.status_code in [200, 204]:
print(f"Interface {interface_name} deleted successfully")
return True
else:
print(f"Failed to delete interface: {response.status_code}")
return False
except Exception as e:
print(f"Error deleting interface: {str(e)}")
return False
# Usage example
if __name__ == '__main__':
client = RestconfClient('172.20.20.11', 'ansible', 'ansible123')
# Get current interfaces
interfaces = client.get_interfaces()
if interfaces:
print("Current interfaces:")
print(json.dumps(interfaces, indent=2))
# Create new interface configuration
new_interface = {
"ietf-interfaces:interface": {
"name": "Loopback100",
"type": "iana-if-type:softwareLoopback",
"enabled": True,
"ietf-ip:ipv4": {
"address": [
{
"ip": "100.100.100.100",
"netmask": "255.255.255.255"
}
]
}
}
}
# Create the interface
client.create_interface(new_interface)CI/CD for Network Configuration
GitLab CI/CD Pipeline
# .gitlab-ci.yml
stages:
- validate
- test
- deploy
- verify
variables:
ANSIBLE_HOST_KEY_CHECKING: "False"
ANSIBLE_STDOUT_CALLBACK: "yaml"
validate_syntax:
stage: validate
image: python:3.9
before_script:
- pip install ansible yamllint ansible-lint
script:
- yamllint playbooks/
- ansible-lint playbooks/
- ansible-playbook --syntax-check playbooks/site.yml
only:
- merge_requests
- master
test_configurations:
stage: test
image: python:3.9
before_script:
- pip install ansible netmiko pytest
script:
- python -m pytest tests/ -v
- ansible-playbook playbooks/site.yml --check --diff
only:
- merge_requests
- master
deploy_staging:
stage: deploy
image: python:3.9
before_script:
- pip install ansible netmiko
script:
- ansible-playbook -i inventory/staging playbooks/site.yml
environment:
name: staging
only:
- master
when: manual
deploy_production:
stage: deploy
image: python:3.9
before_script:
- pip install ansible netmiko
script:
- ansible-playbook -i inventory/production playbooks/site.yml
environment:
name: production
only:
- master
when: manual
verify_deployment:
stage: verify
image: python:3.9
before_script:
- pip install ansible netmiko pytest
script:
- python -m pytest tests/integration/ -v
- ansible-playbook playbooks/verify.yml
only:
- master
dependencies:
- deploy_productionNetwork Testing Framework
#!/usr/bin/env python3
# tests/test_network.py
import pytest
import netmiko
import json
import re
class TestNetworkConnectivity:
@pytest.fixture(scope="class")
def devices(self):
with open('inventory/devices.json', 'r') as f:
return json.load(f)
@pytest.fixture(scope="class")
def connections(self, devices):
connections = {}
for device in devices:
try:
conn = netmiko.ConnectHandler(**device)
connections[device['host']] = conn
except Exception as e:
pytest.fail(f"Failed to connect to {device['host']}: {str(e)}")
yield connections
# Cleanup
for conn in connections.values():
conn.disconnect()
def test_device_reachability(self, connections):
"""Test that all devices are reachable"""
for host, conn in connections.items():
output = conn.send_command("show version")
assert "uptime" in output.lower(), f"Device {host} not responding properly"
def test_interface_status(self, connections):
"""Test that critical interfaces are up"""
critical_interfaces = {
'172.20.20.11': ['GigabitEthernet0/0/0'],
'172.20.20.12': ['GigabitEthernet1/0/1'],
'172.20.20.13': ['GigabitEthernet1/0/1']
}
for host, conn in connections.items():
if host in critical_interfaces:
output = conn.send_command("show ip interface brief")
for interface in critical_interfaces[host]:
assert interface in output, f"Interface {interface} not found on {host}"
# Check if interface is up
pattern = rf"{interface}\s+\S+\s+\S+\s+up\s+up"
assert re.search(pattern, output), f"Interface {interface} is down on {host}"
def test_routing_table(self, connections):
"""Test that routing tables contain expected routes"""
for host, conn in connections.items():
output = conn.send_command("show ip route")
# Should have at least connected routes
assert "C " in output, f"No connected routes found on {host}"
def test_vlan_configuration(self, connections):
"""Test VLAN configuration on switches"""
switch_hosts = ['172.20.20.12', '172.20.20.13']
expected_vlans = ['10', '20', '30']
for host in switch_hosts:
if host in connections:
conn = connections[host]
output = conn.send_command("show vlan brief")
for vlan in expected_vlans:
assert vlan in output, f"VLAN {vlan} not found on switch {host}"
def test_spanning_tree(self, connections):
"""Test spanning tree status"""
switch_hosts = ['172.20.20.12', '172.20.20.13']
for host in switch_hosts:
if host in connections:
conn = connections[host]
output = conn.send_command("show spanning-tree summary")
assert "forwarding" in output.lower(), f"No forwarding ports on {host}"
def test_ntp_synchronization(self, connections):
"""Test NTP synchronization"""
for host, conn in connections.items():
output = conn.send_command("show ntp status")
# Should show synchronized or at least configured
assert "Clock is" in output, f"NTP not configured on {host}"
class TestNetworkSecurity:
@pytest.fixture(scope="class")
def connections(self):
# Same as above
pass
def test_ssh_access_only(self, connections):
"""Test that only SSH is enabled for remote access"""
for host, conn in connections.items():
output = conn.send_command("show running-config | include line vty")
assert "transport input ssh" in output, f"Telnet may be enabled on {host}"
def test_snmp_community(self, connections):
"""Test SNMP community strings"""
for host, conn in connections.items():
output = conn.send_command("show running-config | include snmp-server community")
# Should not have default communities
assert "public" not in output, f"Default SNMP community found on {host}"
assert "private" not in output, f"Default SNMP community found on {host}"
if __name__ == '__main__':
pytest.main(['-v', __file__])Summary
Advanced network automation combines multiple technologies and methodologies to create robust, scalable, and maintainable network infrastructure. Understanding Ansible automation, Python scripting, NETCONF/RESTCONF protocols, and CI/CD practices enables modern network operations and DevNetOps workflows.
Key concepts covered: - Advanced Ansible playbooks and inventory management - Python scripting for network management and monitoring - NETCONF and RESTCONF for programmable networks - CI/CD pipelines for network configuration management - Automated testing frameworks for network validation
In the next chapter, we’ll explore network orchestration and Infrastructure as Code (IaC) concepts for large-scale network deployments.
Review Questions
- How do you implement dynamic inventory in Ansible for network devices?
- What are the advantages of NETCONF over traditional CLI-based management?
- How do you create automated testing for network configurations?
- What are best practices for CI/CD in network automation?
- How do you implement parallel execution in Python network scripts?
Hands-on Exercises
Exercise 1: Advanced Ansible Automation
- Deploy the network automation lab
- Create dynamic inventory scripts
- Develop comprehensive playbooks for device configuration
- Implement Jinja2 templates for configuration generation
Exercise 2: Python Network Management
- Create a network monitoring script with alerting
- Implement parallel device management
- Build configuration backup and restore functionality
- Develop network discovery and documentation tools
Exercise 3: NETCONF/RESTCONF Implementation
- Configure NETCONF on network devices
- Create NETCONF client scripts for configuration management
- Implement RESTCONF API interactions
- Compare NETCONF vs RESTCONF performance and capabilities
Exercise 4: CI/CD Pipeline Development
- Set up GitLab CI/CD for network automation
- Create automated testing frameworks
- Implement staging and production deployment workflows
- Develop rollback and recovery procedures