Chapter 48: Advanced Network Automation

Learning Objectives

By the end of this chapter, you will be able to:

  • Implement advanced Ansible automation for network infrastructure
  • Develop Python scripts for network management and monitoring
  • Configure NETCONF and RESTCONF for programmable networks
  • Build CI/CD pipelines for network configuration management
  • Create automated testing frameworks for network validation

Advanced Ansible for Network Automation

Ansible Architecture for Networks

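Ansible provides agentless automation for network devices: modules run on the control node and reach each device over SSH, NETCONF, or an HTTP API, so nothing has to be installed on the device itself.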

Ansible Components

  • Control Node: Machine running Ansible
  • Managed Nodes: Network devices being automated
  • Inventory: List of managed devices
  • Playbooks: Automation scripts in YAML
  • Modules: Task execution units
  • Plugins: Extend Ansible functionality

Advanced Ansible Lab Setup

# Advanced network automation lab
name: network-automation
prefix: auto

topology:
  nodes:
    # Ansible control node
    ansible-controller:
      kind: linux
      image: ubuntu:20.04
      mgmt-ipv4: 172.20.20.10
      exec:
        # Commands that use shell operators (&&, >, >>) are wrapped in bash -c
        # so that a shell is present to interpret them
        - bash -c "apt update && apt install -y python3 python3-pip openssh-client git"
        - pip3 install ansible netmiko napalm jinja2 paramiko
        - mkdir -p /etc/ansible /opt/automation
        - bash -c 'echo "[defaults]" > /etc/ansible/ansible.cfg'
        - bash -c 'echo "host_key_checking = False" >> /etc/ansible/ansible.cfg'
        - bash -c 'echo "timeout = 30" >> /etc/ansible/ansible.cfg'

    # Network devices for automation
    core-router:
      kind: cisco_iosxe
      image: cisco/iosxe:latest
      mgmt-ipv4: 172.20.20.11
      startup-config: |
        hostname Core-Router
        !
        username ansible privilege 15 secret ansible123
        !
        ip domain-name automation.lab
        crypto key generate rsa modulus 2048
        !
        line vty 0 15
         login local
         transport input ssh
        !
        interface GigabitEthernet0/0/0
         description To-Distribution
         ip address 10.1.12.1 255.255.255.252
         no shutdown
        !
        interface Loopback0
         ip address 1.1.1.1 255.255.255.255
        !

    dist-switch:
      kind: cisco_iosxe
      image: cisco/catalyst:latest
      mgmt-ipv4: 172.20.20.12
      startup-config: |
        hostname Dist-Switch
        !
        username ansible privilege 15 secret ansible123
        !
        ip domain-name automation.lab
        crypto key generate rsa modulus 2048
        !
        line vty 0 15
         login local
         transport input ssh
        !
        interface GigabitEthernet1/0/1
         description To-Core
         switchport mode trunk
         no shutdown
        !
        interface Loopback0
         ip address 2.2.2.2 255.255.255.255
        !

    access-switch:
      kind: cisco_iosxe
      image: cisco/catalyst:latest
      mgmt-ipv4: 172.20.20.13
      startup-config: |
        hostname Access-Switch
        !
        username ansible privilege 15 secret ansible123
        !
        ip domain-name automation.lab
        crypto key generate rsa modulus 2048
        !
        line vty 0 15
         login local
         transport input ssh
        !
        interface GigabitEthernet1/0/1
         description To-Distribution
         switchport mode trunk
         no shutdown
        !
        interface Loopback0
         ip address 3.3.3.3 255.255.255.255
        !

  links:
    - endpoints: ["core-router:eth1", "dist-switch:eth1"]
    - endpoints: ["dist-switch:eth2", "access-switch:eth1"]

Ansible Inventory Management

Dynamic Inventory

#!/usr/bin/env python3
# dynamic_inventory.py
import json
import sys

def get_inventory():
    inventory = {
        'all': {
            'hosts': ['172.20.20.11', '172.20.20.12', '172.20.20.13'],
            'vars': {
                'ansible_user': 'ansible',
                'ansible_password': 'ansible123',
                'ansible_network_os': 'ios',
                'ansible_connection': 'network_cli'
            }
        },
        'routers': {
            'hosts': ['172.20.20.11'],
            'vars': {
                'device_type': 'router'
            }
        },
        'switches': {
            'hosts': ['172.20.20.12', '172.20.20.13'],
            'vars': {
                'device_type': 'switch'
            }
        },
        '_meta': {
            'hostvars': {
                '172.20.20.11': {
                    'hostname': 'Core-Router',
                    'device_role': 'core'
                },
                '172.20.20.12': {
                    'hostname': 'Dist-Switch',
                    'device_role': 'distribution'
                },
                '172.20.20.13': {
                    'hostname': 'Access-Switch',
                    'device_role': 'access'
                }
            }
        }
    }
    return inventory

if __name__ == '__main__':
    if len(sys.argv) == 2 and sys.argv[1] == '--list':
        print(json.dumps(get_inventory(), indent=2))
    elif len(sys.argv) == 3 and sys.argv[1] == '--host':
        print(json.dumps({}))
    else:
        # Report misuse on stderr and signal failure to the caller
        print("Usage: %s --list or %s --host <hostname>" % (sys.argv[0], sys.argv[0]), file=sys.stderr)
        sys.exit(1)
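
A dynamic inventory is easy to get subtly wrong, for example by adding a host to a group but forgetting its hostvars entry. The following is a minimal sketch of a consistency check for the structure that the script prints for --list. The function name validate_inventory and the deliberately broken sample are illustrative assumptions, not part of Ansible.

```python
def validate_inventory(inventory):
    """Return a sorted list of consistency problems in a --list inventory dict."""
    hostvars = inventory.get("_meta", {}).get("hostvars", {})
    all_hosts = set(inventory.get("all", {}).get("hosts", []))
    problems = set()
    for group, data in inventory.items():
        if group in ("_meta", "all"):
            continue
        for host in data.get("hosts", []):
            if host not in all_hosts:
                problems.add(f"{host}: listed in '{group}' but not in 'all'")
            if host not in hostvars:
                problems.add(f"{host}: no entry in _meta.hostvars")
    return sorted(problems)


# Trimmed-down copy of the structure returned by get_inventory();
# 172.20.20.13 is deliberately inconsistent to show the check firing.
inventory = {
    "all": {"hosts": ["172.20.20.11", "172.20.20.12"]},
    "routers": {"hosts": ["172.20.20.11"]},
    "switches": {"hosts": ["172.20.20.12", "172.20.20.13"]},
    "_meta": {"hostvars": {"172.20.20.11": {}, "172.20.20.12": {}}},
}

for problem in validate_inventory(inventory):
    print(problem)
```

A check like this could run in a test job before any playbook uses the inventory.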

Static Inventory with Groups

# inventory/hosts.ini
[all:vars]
ansible_user=ansible
ansible_password=ansible123
ansible_network_os=ios
ansible_connection=network_cli

[routers]
core-router ansible_host=172.20.20.11 hostname=Core-Router

[switches]
dist-switch ansible_host=172.20.20.12 hostname=Dist-Switch
access-switch ansible_host=172.20.20.13 hostname=Access-Switch

[core]
core-router

[distribution]
dist-switch

[access]
access-switch
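
To make the layout of the static inventory concrete, here is a rough sketch of how its sections could be read with plain Python. It is not how Ansible parses inventories: it only understands [group] and [group:vars] sections, ignores [group:children], and the function name parse_ini_inventory is hypothetical.

```python
import shlex


def parse_ini_inventory(text):
    """Split an INI-style inventory into per-group hosts and per-group vars."""
    groups = {}      # group name -> {host alias: {variable: value}}
    group_vars = {}  # group name -> {variable: value}
    section, is_vars = None, False
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", ";")):
            continue
        if line.startswith("[") and line.endswith("]"):
            name = line[1:-1]
            is_vars = name.endswith(":vars")
            section = name[:-len(":vars")] if is_vars else name
            if not is_vars:
                groups.setdefault(section, {})
            continue
        if section is None:
            continue  # ungrouped hosts are ignored in this sketch
        if is_vars:
            key, _, value = line.partition("=")
            group_vars.setdefault(section, {})[key.strip()] = value.strip()
        else:
            # First token is the host alias, the rest are key=value host vars
            tokens = shlex.split(line)
            groups[section][tokens[0]] = dict(t.split("=", 1) for t in tokens[1:])
    return groups, group_vars


sample = """
[all:vars]
ansible_network_os=ios

[routers]
core-router ansible_host=172.20.20.11 hostname=Core-Router

[core]
core-router
"""

groups, group_vars = parse_ini_inventory(sample)
print(groups)
print(group_vars)
```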

Advanced Ansible Playbooks

Configuration Management Playbook

# playbooks/configure_network.yml
---
- name: Configure Network Infrastructure
  hosts: all
  gather_facts: no
  vars:
    backup_dir: "./backups/{{ inventory_hostname }}"

  tasks:
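    # backup_dir differs per host, so this task must run for every host
    - name: Create backup directory
      file:
        path: "{{ backup_dir }}"
        state: directory
      delegate_to: localhost

    - name: Backup current configuration
      ios_config:
        backup: yes
        backup_options:
          # ansible_date_time is not available with gather_facts: no,
          # so the timestamp comes from the strftime filter instead
          filename: "{{ inventory_hostname }}_{{ '%Y%m%d_%H%M%S' | strftime }}.cfg"
          dir_path: "{{ backup_dir }}"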

    - name: Configure global settings
      ios_config:
        lines:
          - service timestamps debug datetime msec
          - service timestamps log datetime msec
          - service password-encryption
          - no ip domain-lookup
          - ip domain-name {{ domain_name | default('automation.lab') }}
        save_when: modified

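    - name: Configure NTP
      ios_config:
        lines:
          # 'no ip domain-lookup' is applied above, so the device cannot resolve
          # hostnames; set ntp_server to an IP address in real deployments
          - ntp server {{ ntp_server | default('pool.ntp.org') }}
        save_when: modified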

    - name: Configure SNMP
      ios_config:
        lines:
          - snmp-server community {{ snmp_community | default('public') }} RO
          - snmp-server location {{ snmp_location | default('Data Center') }}
          - snmp-server contact {{ snmp_contact | default('admin@company.com') }}
        save_when: modified

    - name: Configure logging
      ios_config:
        lines:
          - logging buffered 16384 informational
          - logging console critical
          - logging monitor informational
          - logging {{ syslog_server | default('172.20.20.100') }}
        save_when: modified

    - name: Verify configuration
      ios_command:
        commands:
          - show running-config | include ntp
          - show running-config | include snmp
          - show running-config | include logging
      register: config_verification

    - name: Display verification results
      debug:
        var: config_verification.stdout_lines
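
The playbook is safe to re-run because ios_config compares the requested lines with the running configuration and only sends what is missing. The sketch below is a deliberately simplified model of that comparison, not the module's implementation: it ignores parent sections and matching options, and the name pending_lines is hypothetical.

```python
def pending_lines(desired, running_config):
    """Return the desired lines that are absent from the running configuration."""
    present = {line.strip() for line in running_config.splitlines()}
    return [line for line in desired if line.strip() not in present]


# Illustrative running configuration fragment
running = """\
service timestamps debug datetime msec
service timestamps log datetime msec
no ip domain-lookup
"""

desired = [
    "service timestamps debug datetime msec",
    "service timestamps log datetime msec",
    "service password-encryption",
    "no ip domain-lookup",
    "ip domain-name automation.lab",
]

print(pending_lines(desired, running))
```

An empty result corresponds to a task that reports no change, which is also why save_when: modified skips the save on a second run.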

VLAN Management Playbook

# playbooks/manage_vlans.yml
---
- name: Manage VLANs on Switches
  hosts: switches
  gather_facts: no
  vars:
    vlans:
      - { id: 10, name: "USERS" }
      - { id: 20, name: "SERVERS" }
      - { id: 30, name: "MANAGEMENT" }
      - { id: 99, name: "NATIVE" }

  tasks:
    - name: Configure VLANs
      ios_vlans:
        config:
          - vlan_id: "{{ item.id }}"
            name: "{{ item.name }}"
            state: active
        state: merged
      loop: "{{ vlans }}"

    - name: Configure trunk interfaces
      ios_l2_interfaces:
        config:
          - name: "{{ item }}"
            trunk:
              allowed_vlans: "10,20,30,99"
              native_vlan: 99
        state: merged
      loop:
        - GigabitEthernet1/0/1
        - GigabitEthernet1/0/2
      when: inventory_hostname in groups['switches']

    - name: Configure access interfaces
      ios_l2_interfaces:
        config:
          - name: "GigabitEthernet1/0/{{ item.port }}"
            access:
              vlan: "{{ item.vlan }}"
        state: merged
      loop:
        - { port: 3, vlan: 10 }
        - { port: 4, vlan: 10 }
        - { port: 5, vlan: 20 }
        - { port: 6, vlan: 20 }
      when: inventory_hostname in groups['access']

    - name: Verify VLAN configuration
      ios_command:
        commands:
          - show vlan brief
          - show interfaces trunk
      register: vlan_status

    - name: Save VLAN status to file
      copy:
        content: "{{ vlan_status.stdout[0] }}"
        dest: "./reports/{{ inventory_hostname }}_vlans.txt"
      delegate_to: localhost
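
The report written by the last task is plain CLI text, so a follow-up check has to parse it. As a sketch, the helper below extracts VLAN IDs and names from show vlan brief output and lists the expected VLANs that are missing. The sample output is illustrative rather than captured from the lab, and both function names are assumptions.

```python
import re

# A VLAN row starts with the ID, followed by the name and a status keyword
VLAN_ROW = re.compile(r"^(\d+)\s+(\S+)\s+(active|act/unsup|suspended)", re.MULTILINE)


def parse_vlan_brief(output):
    """Map VLAN ID to VLAN name for every row in 'show vlan brief' output."""
    return {int(vid): name for vid, name, _status in VLAN_ROW.findall(output)}


def missing_vlans(output, expected_ids):
    """Return the expected VLAN IDs that do not appear in the output."""
    configured = parse_vlan_brief(output)
    return [vid for vid in expected_ids if vid not in configured]


sample_output = """\
VLAN Name                             Status    Ports
---- -------------------------------- --------- -------------------------------
1    default                          active    Gi1/0/7, Gi1/0/8
10   USERS                            active    Gi1/0/3, Gi1/0/4
20   SERVERS                          active    Gi1/0/5, Gi1/0/6
30   MANAGEMENT                       active
99   NATIVE                           active
1002 fddi-default                     act/unsup
"""

print(parse_vlan_brief(sample_output))
print(missing_vlans(sample_output, [10, 20, 30, 99, 40]))
```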

Jinja2 Templates for Configuration

Router Configuration Template

{# templates/router_config.j2 #}
hostname {{ hostname }}
!
{% if domain_name is defined %}
ip domain-name {{ domain_name }}
{% endif %}
!
{% for interface in interfaces %}
interface {{ interface.name }}
{% if interface.description is defined %}
 description {{ interface.description }}
{% endif %}
{% if interface.ip_address is defined %}
 ip address {{ interface.ip_address }} {{ interface.subnet_mask }}
{% endif %}
{% if interface.shutdown is not defined or not interface.shutdown %}
 no shutdown
{% endif %}
!
{% endfor %}
!
{% if ospf is defined %}
router ospf {{ ospf.process_id }}
 router-id {{ ospf.router_id }}
{% for network in ospf.networks %}
 network {{ network.network }} {{ network.wildcard }} area {{ network.area }}
{% endfor %}
!
{% endif %}
!
{% if static_routes is defined %}
{% for route in static_routes %}
ip route {{ route.network }} {{ route.mask }} {{ route.next_hop }}
{% endfor %}
{% endif %}

Switch Configuration Template

{# templates/switch_config.j2 #}
hostname {{ hostname }}
!
{% if vlans is defined %}
{% for vlan in vlans %}
vlan {{ vlan.id }}
 name {{ vlan.name }}
!
{% endfor %}
{% endif %}
!
{% if interfaces is defined %}
{% for interface in interfaces %}
interface {{ interface.name }}
{% if interface.description is defined %}
 description {{ interface.description }}
{% endif %}
{% if interface.mode == 'access' %}
 switchport mode access
 switchport access vlan {{ interface.vlan }}
{% elif interface.mode == 'trunk' %}
 switchport mode trunk
{% if interface.allowed_vlans is defined %}
 switchport trunk allowed vlan {{ interface.allowed_vlans }}
{% endif %}
{% if interface.native_vlan is defined %}
 switchport trunk native vlan {{ interface.native_vlan }}
{% endif %}
{% endif %}
{% if interface.portfast is defined and interface.portfast %}
 spanning-tree portfast
{% endif %}
 no shutdown
!
{% endfor %}
{% endif %}
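
Both templates trust their input: an interface with ip_address but no subnet_mask, or an access port without vlan, renders into a configuration the device will reject. One possible safeguard is to validate the variables before rendering. The sketch below uses only the standard library, and the function name validate_interfaces and the error wording are assumptions.

```python
import ipaddress


def validate_interfaces(interfaces):
    """Return a list of problems in the 'interfaces' variable used by the templates."""
    errors = []
    for index, iface in enumerate(interfaces):
        name = iface.get("name")
        if not name:
            errors.append(f"interface #{index}: missing 'name'")
            continue
        if "ip_address" in iface:
            if "subnet_mask" not in iface:
                errors.append(f"{name}: ip_address without subnet_mask")
            else:
                try:
                    # IPv4Interface accepts the address/netmask form
                    ipaddress.IPv4Interface(f"{iface['ip_address']}/{iface['subnet_mask']}")
                except ValueError as exc:
                    errors.append(f"{name}: {exc}")
        if iface.get("mode") == "access" and "vlan" not in iface:
            errors.append(f"{name}: access port without vlan")
    return errors


interfaces = [
    {"name": "GigabitEthernet0/0/0", "ip_address": "10.1.12.1", "subnet_mask": "255.255.255.252"},
    {"name": "Loopback0", "ip_address": "1.1.1.1"},
    {"name": "GigabitEthernet1/0/3", "mode": "access"},
]

for error in validate_interfaces(interfaces):
    print(error)
```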

Python Network Automation

Advanced Python Scripting

Network Device Management Class

#!/usr/bin/env python3
# network_manager.py
import netmiko
import json
import logging
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

class NetworkManager:
    def __init__(self, devices_file):
        self.devices = self.load_devices(devices_file)
        self.setup_logging()

    def load_devices(self, devices_file):
        with open(devices_file, 'r') as f:
            return json.load(f)

    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('network_automation.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def connect_device(self, device):
        try:
            connection = netmiko.ConnectHandler(**device)
            self.logger.info(f"Connected to {device['host']}")
            return connection
        except Exception as e:
            self.logger.error(f"Failed to connect to {device['host']}: {str(e)}")
            return None

    def execute_command(self, device, command):
        connection = self.connect_device(device)
        if connection:
            try:
                output = connection.send_command(command)
                connection.disconnect()
                return {
                    'host': device['host'],
                    'command': command,
                    'output': output,
                    'status': 'success'
                }
            except Exception as e:
                connection.disconnect()
                return {
                    'host': device['host'],
                    'command': command,
                    'error': str(e),
                    'status': 'failed'
                }
        return {
            'host': device['host'],
            'command': command,
            'error': 'Connection failed',
            'status': 'failed'
        }

    def execute_commands_parallel(self, commands, max_workers=5):
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for device in self.devices:
                for command in commands:
                    future = executor.submit(self.execute_command, device, command)
                    futures.append(future)

            for future in as_completed(futures):
                results.append(future.result())

        return results

    def backup_configurations(self, backup_dir='./backups'):
        import os
        os.makedirs(backup_dir, exist_ok=True)

        results = []
        for device in self.devices:
            connection = self.connect_device(device)
            if not connection:
                # Record unreachable devices instead of skipping them silently
                results.append({
                    'host': device['host'],
                    'error': 'Connection failed',
                    'status': 'failed'
                })
                continue

            try:
                config = connection.send_command('show running-config')
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                filename = os.path.join(backup_dir, f"{device['host']}_{timestamp}.cfg")

                with open(filename, 'w') as f:
                    f.write(config)

                results.append({
                    'host': device['host'],
                    'backup_file': filename,
                    'status': 'success'
                })
            except Exception as e:
                results.append({
                    'host': device['host'],
                    'error': str(e),
                    'status': 'failed'
                })
            finally:
                # Always release the SSH session, whether the backup worked or not
                connection.disconnect()

        return results

    def deploy_configuration(self, device, config_lines):
        connection = self.connect_device(device)
        if connection:
            try:
                output = connection.send_config_set(config_lines)
                connection.save_config()
                connection.disconnect()
                return {
                    'host': device['host'],
                    'output': output,
                    'status': 'success'
                }
            except Exception as e:
                connection.disconnect()
                return {
                    'host': device['host'],
                    'error': str(e),
                    'status': 'failed'
                }
        return {
            'host': device['host'],
            'error': 'Connection failed',
            'status': 'failed'
        }

# Usage example
if __name__ == '__main__':
    nm = NetworkManager('devices.json')

    # Backup all configurations
    backup_results = nm.backup_configurations()
    print("Backup Results:", json.dumps(backup_results, indent=2))

    # Execute commands in parallel
    commands = ['show version', 'show ip interface brief', 'show running-config | include hostname']
    results = nm.execute_commands_parallel(commands)

    # Save results to file
    with open('command_results.json', 'w') as f:
        json.dump(results, f, indent=2)
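
NetworkManager passes each entry of devices.json straight to netmiko.ConnectHandler, so the file is presumably a JSON list of connection dictionaries. The sketch below generates such a list for the lab addresses. The key set mirrors the device entry used in the monitoring configuration later in this chapter, while the helper name build_devices and the environment variable names NET_USERNAME and NET_PASSWORD are hypothetical.

```python
import json
import os


def build_devices(hosts, username, password, device_type="cisco_ios"):
    """Build one Netmiko connection dictionary per management address."""
    return [
        {
            "device_type": device_type,
            "host": host,
            "username": username,
            "password": password,
        }
        for host in hosts
    ]


devices = build_devices(
    ["172.20.20.11", "172.20.20.12", "172.20.20.13"],
    # Fall back to the lab credentials when the variables are not set
    username=os.environ.get("NET_USERNAME", "ansible"),
    password=os.environ.get("NET_PASSWORD", "ansible123"),
)

# Redirect this output to devices.json to feed NetworkManager
print(json.dumps(devices, indent=2))
```

Reading credentials from the environment keeps them out of version control once the scripts move beyond the lab.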

Network Monitoring Script

#!/usr/bin/env python3
# network_monitor.py
import netmiko
import json
import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import re
import threading

class NetworkMonitor:
    def __init__(self, config_file):
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        self.devices = self.config['devices']
        self.thresholds = self.config['thresholds']
        self.email_config = self.config['email']
        self.monitoring = True

    def check_interface_utilization(self, device):
        try:
            connection = netmiko.ConnectHandler(**device)
            output = connection.send_command('show interfaces | include rate')
            connection.disconnect()

            alerts = []
            # IOS prints input and output rates on separate lines, for example
            # "5 minute input rate 1000 bits/sec, 1 packets/sec"
            rate_pattern = re.compile(r'(input|output) rate (\d+) bits/sec')
            for line in output.split('\n'):
                match = rate_pattern.search(line)
                if not match:
                    continue
                direction, rate = match.group(1), int(match.group(2))
                if rate > self.thresholds['interface_utilization']:
                    alerts.append(f"High {direction} rate on {device['host']}: {rate} bps")

            return alerts
        except Exception as e:
            return [f"Error checking {device['host']}: {str(e)}"]

    def check_cpu_utilization(self, device):
        try:
            connection = netmiko.ConnectHandler(**device)
            output = connection.send_command('show processes cpu | include CPU')
            connection.disconnect()

            # Parse the five-second figure from a line such as
            # "CPU utilization for five seconds: 5%/0%; one minute: 4%; five minutes: 4%"
            for line in output.split('\n'):
                if 'CPU utilization' in line:
                    cpu_percent = int(line.split('%')[0].split()[-1])
                    if cpu_percent > self.thresholds['cpu_utilization']:
                        return [f"High CPU utilization on {device['host']}: {cpu_percent}%"]

            return []
        except Exception as e:
            return [f"Error checking CPU on {device['host']}: {str(e)}"]

    def check_memory_utilization(self, device):
        try:
            connection = netmiko.ConnectHandler(**device)
            output = connection.send_command('show memory statistics')
            connection.disconnect()

            # The "Processor" row follows the header
            # "Head  Total(b)  Used(b)  Free(b)  Lowest(b)  Largest(b)",
            # so Total is the third field and Used is the fourth
            for line in output.split('\n'):
                parts = line.split()
                if len(parts) >= 4 and parts[0] == 'Processor':
                    total = int(parts[2])
                    used = int(parts[3])
                    utilization = (used / total) * 100

                    if utilization > self.thresholds['memory_utilization']:
                        return [f"High memory utilization on {device['host']}: {utilization:.1f}%"]

            return []
        except Exception as e:
            return [f"Error checking memory on {device['host']}: {str(e)}"]

    def send_alert(self, alerts):
        if not alerts:
            return

        msg = MIMEMultipart()
        msg['From'] = self.email_config['from']
        msg['To'] = ', '.join(self.email_config['to'])
        msg['Subject'] = 'Network Alert'

        body = '\n'.join(alerts)
        msg.attach(MIMEText(body, 'plain'))

        try:
            server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port'])
            server.starttls()
            server.login(self.email_config['username'], self.email_config['password'])
            server.send_message(msg)
            server.quit()
            print(f"Alert sent: {len(alerts)} issues")
        except Exception as e:
            print(f"Failed to send alert: {str(e)}")

    def monitor_device(self, device):
        all_alerts = []

        # Check various metrics
        all_alerts.extend(self.check_interface_utilization(device))
        all_alerts.extend(self.check_cpu_utilization(device))
        all_alerts.extend(self.check_memory_utilization(device))

        return all_alerts

    def start_monitoring(self, interval=300):  # 5 minutes
        print(f"Starting network monitoring (interval: {interval}s)")

        while self.monitoring:
            all_alerts = []

            # Monitor all devices
            for device in self.devices:
                alerts = self.monitor_device(device)
                all_alerts.extend(alerts)

            # Send alerts if any issues found
            if all_alerts:
                self.send_alert(all_alerts)

            time.sleep(interval)

    def stop_monitoring(self):
        self.monitoring = False
        print("Monitoring stopped")

# Configuration file example
monitor_config = {
    "devices": [
        {
            "device_type": "cisco_ios",
            "host": "172.20.20.11",
            "username": "ansible",
            "password": "ansible123"
        }
    ],
    "thresholds": {
        "interface_utilization": 80000000,  # 80 Mbps
        "cpu_utilization": 80,              # 80%
        "memory_utilization": 80            # 80%
    },
    "email": {
        "smtp_server": "smtp.gmail.com",
        "smtp_port": 587,
        "from": "alerts@company.com",
        "to": ["admin@company.com"],
        "username": "alerts@company.com",
        "password": "app_password"
    }
}

# Save configuration
with open('monitor_config.json', 'w') as f:
    json.dump(monitor_config, f, indent=2)
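
The start_monitoring loop blocks its caller and sleeps with time.sleep(interval), so a call to stop_monitoring only takes effect once the current sleep ends. One way around both limitations, sketched here under the assumption that polling runs in a background thread, is to wait on a threading.Event instead of sleeping. The helper name run_periodically and the stand-in fake_poll task are hypothetical.

```python
import threading


def run_periodically(task, interval, stop_event):
    """Call task() every `interval` seconds until stop_event is set."""
    while not stop_event.is_set():
        task()
        # wait() returns as soon as stop_event is set, unlike time.sleep()
        stop_event.wait(interval)


calls = []
stop = threading.Event()


def fake_poll():
    """Stand-in for one monitoring pass; stops the loop after three passes."""
    calls.append(len(calls) + 1)
    if len(calls) == 3:
        stop.set()


worker = threading.Thread(target=run_periodically, args=(fake_poll, 0.01, stop))
worker.start()
worker.join(timeout=5)
print(calls)
```

In the monitor itself, the task would be one pass over all devices followed by send_alert, and stop.set() would replace the self.monitoring flag.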

NETCONF and RESTCONF

NETCONF Implementation

NETCONF Client Script

#!/usr/bin/env python3
# netconf_client.py
from ncclient import manager
import xml.etree.ElementTree as ET
import json

class NetconfClient:
    def __init__(self, host, username, password, port=830):
        self.host = host
        self.username = username
        self.password = password
        self.port = port
        self.connection = None

    def connect(self):
        try:
            self.connection = manager.connect(
                host=self.host,
                port=self.port,
                username=self.username,
                password=self.password,
                hostkey_verify=False,
                device_params={'name': 'iosxe'}
            )
            print(f"Connected to {self.host} via NETCONF")
            return True
        except Exception as e:
            print(f"Failed to connect: {str(e)}")
            return False

    def get_config(self, source='running'):
        if not self.connection:
            return None

        try:
            config = self.connection.get_config(source=source)
            return config.data_xml
        except Exception as e:
            print(f"Failed to get config: {str(e)}")
            return None

    def get_interfaces(self):
        filter_xml = """
        <filter>
            <interfaces xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
                <interface>
                    <name/>
                    <type/>
                    <enabled/>
                    <oper-status/>
                </interface>
            </interfaces>
        </filter>
        """

        try:
            result = self.connection.get(filter=filter_xml)
            return result.data_xml
        except Exception as e:
            print(f"Failed to get interfaces: {str(e)}")
            return None

    def configure_interface(self, interface_name, ip_address, subnet_mask):
        config_xml = f"""
        <config>
            <interfaces xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
                <interface>
                    <name>{interface_name}</name>
                    <type xmlns:ianaift="urn:ietf:params:xml:ns:yang:iana-if-type">ianaift:ethernetCsmacd</type>
                    <enabled>true</enabled>
                    <ipv4 xmlns="urn:ietf:params:xml:ns:yang:ietf-ip">
                        <address>
                            <ip>{ip_address}</ip>
                            <netmask>{subnet_mask}</netmask>
                        </address>
                    </ipv4>
                </interface>
            </interfaces>
        </config>
        """

        try:
            result = self.connection.edit_config(target='running', config=config_xml)
            print(f"Interface {interface_name} configured successfully")
            return True
        except Exception as e:
            print(f"Failed to configure interface: {str(e)}")
            return False

    def disconnect(self):
        if self.connection:
            self.connection.close_session()
            print("NETCONF session closed")

# Usage example
if __name__ == '__main__':
    client = NetconfClient('172.20.20.11', 'ansible', 'ansible123')

    if client.connect():
        # Get interface information
        interfaces = client.get_interfaces()
        if interfaces:
            print("Current interfaces:")
            print(interfaces)

        # Configure an interface
        client.configure_interface('GigabitEthernet0/0/1', '10.1.1.1', '255.255.255.0')

        client.disconnect()
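
The client prints the raw XML reply, which is awkward to use in later automation steps. The sketch below turns a reply into a list of dictionaries with xml.etree.ElementTree, which the script already imports. The sample reply is illustrative rather than captured from a device, and parse_interfaces is a hypothetical helper name.

```python
import xml.etree.ElementTree as ET

# Prefix-to-namespace map for the ietf-interfaces YANG module
NS = {"if": "urn:ietf:params:xml:ns:yang:ietf-interfaces"}


def parse_interfaces(data_xml):
    """Extract name and enabled state for each interface in a NETCONF reply."""
    root = ET.fromstring(data_xml)
    interfaces = []
    for iface in root.iterfind(".//if:interface", NS):
        interfaces.append({
            "name": iface.findtext("if:name", namespaces=NS),
            "enabled": iface.findtext("if:enabled", namespaces=NS) == "true",
        })
    return interfaces


sample_reply = """<data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
  <interfaces xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
    <interface>
      <name>GigabitEthernet0/0/0</name>
      <enabled>true</enabled>
    </interface>
    <interface>
      <name>Loopback0</name>
      <enabled>false</enabled>
    </interface>
  </interfaces>
</data>"""

for entry in parse_interfaces(sample_reply):
    print(entry)
```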

RESTCONF Implementation

RESTCONF Client Script

#!/usr/bin/env python3
# restconf_client.py
import requests
import json
from requests.auth import HTTPBasicAuth
import urllib3

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class RestconfClient:
    def __init__(self, host, username, password, port=443):
        self.host = host
        self.username = username
        self.password = password
        self.port = port
        self.base_url = f"https://{host}:{port}/restconf"
        self.auth = HTTPBasicAuth(username, password)
        self.headers = {
            'Accept': 'application/yang-data+json',
            'Content-Type': 'application/yang-data+json'
        }

    def get_capabilities(self):
        url = f"{self.base_url}/data/ietf-restconf-monitoring:restconf-state/capabilities"
        try:
            response = requests.get(url, auth=self.auth, headers=self.headers, verify=False)
            if response.status_code == 200:
                return response.json()
            else:
                print(f"Failed to get capabilities: {response.status_code}")
                return None
        except Exception as e:
            print(f"Error getting capabilities: {str(e)}")
            return None

    def get_interfaces(self):
        url = f"{self.base_url}/data/ietf-interfaces:interfaces"
        try:
            response = requests.get(url, auth=self.auth, headers=self.headers, verify=False)
            if response.status_code == 200:
                return response.json()
            else:
                print(f"Failed to get interfaces: {response.status_code}")
                return None
        except Exception as e:
            print(f"Error getting interfaces: {str(e)}")
            return None

    def create_interface(self, interface_data):
        url = f"{self.base_url}/data/ietf-interfaces:interfaces"
        try:
            response = requests.post(
                url,
                auth=self.auth,
                headers=self.headers,
                data=json.dumps(interface_data),
                verify=False
            )
            if response.status_code in [200, 201, 204]:
                print("Interface created successfully")
                return True
            else:
                print(f"Failed to create interface: {response.status_code}")
                print(response.text)
                return False
        except Exception as e:
            print(f"Error creating interface: {str(e)}")
            return False

    def update_interface(self, interface_name, interface_data):
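        # Percent-encode the key: names such as GigabitEthernet0/0/1 contain "/"
        url = f"{self.base_url}/data/ietf-interfaces:interfaces/interface={requests.utils.quote(interface_name, safe='')}"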
        try:
            response = requests.put(
                url,
                auth=self.auth,
                headers=self.headers,
                data=json.dumps(interface_data),
                verify=False
            )
            if response.status_code in [200, 204]:
                print(f"Interface {interface_name} updated successfully")
                return True
            else:
                print(f"Failed to update interface: {response.status_code}")
                return False
        except Exception as e:
            print(f"Error updating interface: {str(e)}")
            return False

    def delete_interface(self, interface_name):
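        # Percent-encode the key: names such as GigabitEthernet0/0/1 contain "/"
        url = f"{self.base_url}/data/ietf-interfaces:interfaces/interface={requests.utils.quote(interface_name, safe='')}"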
        try:
            response = requests.delete(url, auth=self.auth, headers=self.headers, verify=False)
            if response.status_code in [200, 204]:
                print(f"Interface {interface_name} deleted successfully")
                return True
            else:
                print(f"Failed to delete interface: {response.status_code}")
                return False
        except Exception as e:
            print(f"Error deleting interface: {str(e)}")
            return False

# Usage example
if __name__ == '__main__':
    client = RestconfClient('172.20.20.11', 'ansible', 'ansible123')

    # Get current interfaces
    interfaces = client.get_interfaces()
    if interfaces:
        print("Current interfaces:")
        print(json.dumps(interfaces, indent=2))

    # Create new interface configuration
    new_interface = {
        "ietf-interfaces:interface": {
            "name": "Loopback100",
            "type": "iana-if-type:softwareLoopback",
            "enabled": True,
            "ietf-ip:ipv4": {
                "address": [
                    {
                        "ip": "100.100.100.100",
                        "netmask": "255.255.255.255"
                    }
                ]
            }
        }
    }

    # Create the interface
    client.create_interface(new_interface)
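
RESTCONF identifies a list entry by placing its key in the URL path, so an interface name that contains "/" has to be percent-encoded or the server will read it as extra path segments. A minimal sketch of building such a URL with the standard library follows. The helper name interface_url is hypothetical.

```python
from urllib.parse import quote


def interface_url(base_url, interface_name):
    """Build the RESTCONF resource URL for a single interface list entry."""
    # safe="" makes quote() encode "/" as %2F as well
    key = quote(interface_name, safe="")
    return f"{base_url}/data/ietf-interfaces:interfaces/interface={key}"


print(interface_url("https://172.20.20.11:443/restconf", "GigabitEthernet0/0/1"))
print(interface_url("https://172.20.20.11:443/restconf", "Loopback100"))
```

Names without reserved characters, such as Loopback100, pass through unchanged.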

CI/CD for Network Configuration

GitLab CI/CD Pipeline

# .gitlab-ci.yml
stages:
  - validate
  - test
  - deploy
  - verify

variables:
  ANSIBLE_HOST_KEY_CHECKING: "False"
  ANSIBLE_STDOUT_CALLBACK: "yaml"

validate_syntax:
  stage: validate
  image: python:3.9
  before_script:
    - pip install ansible yamllint ansible-lint
  script:
    - yamllint playbooks/
    - ansible-lint playbooks/
    - ansible-playbook --syntax-check playbooks/site.yml
  only:
    - merge_requests
    - master

test_configurations:
  stage: test
  image: python:3.9
  before_script:
    - pip install ansible netmiko pytest
  script:
    - python -m pytest tests/ -v
    - ansible-playbook playbooks/site.yml --check --diff
  only:
    - merge_requests
    - master

deploy_staging:
  stage: deploy
  image: python:3.9
  before_script:
    - pip install ansible netmiko
  script:
    - ansible-playbook -i inventory/staging playbooks/site.yml
  environment:
    name: staging
  only:
    - master
  when: manual

deploy_production:
  stage: deploy
  image: python:3.9
  before_script:
    - pip install ansible netmiko
  script:
    - ansible-playbook -i inventory/production playbooks/site.yml
  environment:
    name: production
  only:
    - master
  when: manual

verify_deployment:
  stage: verify
  image: python:3.9
  before_script:
    - pip install ansible netmiko pytest
  script:
    - python -m pytest tests/integration/ -v
    - ansible-playbook playbooks/verify.yml
  only:
    - master
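  # "dependencies" only controls artifact downloads; "needs" makes this job
  # wait for the manual production deployment
  needs:
    - deploy_production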
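
The test stage above runs pytest before anything touches a device, so checks that need no network access fit well there. A minimal sketch of one such check validates inventory entries offline. The required key set reflects the commonly used netmiko ConnectHandler arguments and is an assumption about the inventory format; the helper name find_inventory_errors and the sample entries are hypothetical.

```python
import ipaddress

# Assumed minimum keys for each entry; adjust to the real inventory format.
REQUIRED_KEYS = {"device_type", "host", "username", "password"}


def find_inventory_errors(devices):
    """Return a list of human-readable problems found in the inventory."""
    errors = []
    seen_hosts = set()
    for index, device in enumerate(devices):
        missing = REQUIRED_KEYS - device.keys()
        if missing:
            errors.append(f"entry {index}: missing keys {sorted(missing)}")
            continue
        host = device["host"]
        # Assumption: the lab addresses devices by management IP, not hostname
        try:
            ipaddress.ip_address(host)
        except ValueError:
            errors.append(f"entry {index}: host {host!r} is not an IP address")
        if host in seen_hosts:
            errors.append(f"entry {index}: duplicate host {host}")
        seen_hosts.add(host)
    return errors


# Hypothetical sample data standing in for inventory/devices.json
sample = [
    {"device_type": "cisco_xe", "host": "172.20.20.11",
     "username": "ansible", "password": "ansible123"},
    {"device_type": "cisco_xe", "host": "172.20.20.11",
     "username": "ansible", "password": "ansible123"},
    {"device_type": "cisco_xe", "host": "172.20.20.12"},
]

for problem in find_inventory_errors(sample):
    print(problem)
```

Wrapped in a pytest function that asserts the returned list is empty, a check like this fails the pipeline early, before the deploy jobs become available.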

Network Testing Framework

#!/usr/bin/env python3
# tests/test_network.py
import pytest
import netmiko
import json
import re

class TestNetworkConnectivity:
    @pytest.fixture(scope="class")
    def devices(self):
        with open('inventory/devices.json', 'r') as f:
            return json.load(f)

    @pytest.fixture(scope="class")
    def connections(self, devices):
        connections = {}
        for device in devices:
            try:
                conn = netmiko.ConnectHandler(**device)
                connections[device['host']] = conn
            except Exception as e:
                pytest.fail(f"Failed to connect to {device['host']}: {str(e)}")
        yield connections

        # Cleanup
        for conn in connections.values():
            conn.disconnect()

    def test_device_reachability(self, connections):
        """Test that all devices are reachable"""
        for host, conn in connections.items():
            output = conn.send_command("show version")
            assert "uptime" in output.lower(), f"Device {host} not responding properly"

    def test_interface_status(self, connections):
        """Test that critical interfaces are up"""
        critical_interfaces = {
            '172.20.20.11': ['GigabitEthernet0/0/0'],
            '172.20.20.12': ['GigabitEthernet1/0/1'],
            '172.20.20.13': ['GigabitEthernet1/0/1']
        }

        for host, conn in connections.items():
            if host in critical_interfaces:
                output = conn.send_command("show ip interface brief")
                for interface in critical_interfaces[host]:
                    assert interface in output, f"Interface {interface} not found on {host}"
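                    # Check if interface is up. Three columns (IP-Address, OK?, Method)
                    # sit between the interface name and Status/Protocol.
                    pattern = rf"{re.escape(interface)}\s+\S+\s+\S+\s+\S+\s+up\s+up"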
                    assert re.search(pattern, output), f"Interface {interface} is down on {host}"

    def test_routing_table(self, connections):
        """Test that routing tables contain expected routes"""
        for host, conn in connections.items():
            output = conn.send_command("show ip route")
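            # Should have at least one connected route. Match route lines only,
            # because the "Codes:" legend always contains "C - connected".
            assert re.search(r"^C\s+\d", output, re.MULTILINE), f"No connected routes found on {host}"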

    def test_vlan_configuration(self, connections):
        """Test VLAN configuration on switches"""
        switch_hosts = ['172.20.20.12', '172.20.20.13']
        expected_vlans = ['10', '20', '30']

        for host in switch_hosts:
            if host in connections:
                conn = connections[host]
                output = conn.send_command("show vlan brief")
                for vlan in expected_vlans:
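                    # Match the VLAN ID at the start of a row so "10" does not match "100"
                    assert re.search(rf"^{vlan}\s", output, re.MULTILINE), f"VLAN {vlan} not found on switch {host}"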

    def test_spanning_tree(self, connections):
        """Test spanning tree status"""
        switch_hosts = ['172.20.20.12', '172.20.20.13']

        for host in switch_hosts:
            if host in connections:
                conn = connections[host]
                output = conn.send_command("show spanning-tree summary")
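                # Sum the Forwarding column of the per-instance rows; checking for the
                # word alone always passes because it appears in the table header
                rows = re.findall(r"^\S+\s+\d+\s+\d+\s+\d+\s+(\d+)\s+\d+\s*$", output, re.MULTILINE)
                assert sum(int(count) for count in rows) > 0, f"No forwarding ports on {host}"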

    def test_ntp_synchronization(self, connections):
        """Test NTP synchronization"""
        for host, conn in connections.items():
            output = conn.send_command("show ntp status")
            # Should show synchronized or at least configured
            assert "Clock is" in output, f"NTP not configured on {host}"

class TestNetworkSecurity:
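    @pytest.fixture(scope="class")
    def connections(self):
        # Class-scoped fixtures are not shared across classes, so load the
        # inventory and open one session per device here as well
        with open('inventory/devices.json', 'r') as f:
            devices = json.load(f)
        connections = {}
        for device in devices:
            try:
                connections[device['host']] = netmiko.ConnectHandler(**device)
            except Exception as e:
                pytest.fail(f"Failed to connect to {device['host']}: {str(e)}")
        yield connections

        # Cleanup
        for conn in connections.values():
            conn.disconnect()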

    def test_ssh_access_only(self, connections):
        """Test that only SSH is enabled for remote access"""
        for host, conn in connections.items():
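            # "| include line vty" would return only the "line vty" header lines,
            # so pull the whole vty section to see its transport settings
            output = conn.send_command("show running-config | section line vty")
            assert "transport input ssh" in output, f"Telnet may be enabled on {host}"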

    def test_snmp_community(self, connections):
        """Test SNMP community strings"""
        for host, conn in connections.items():
            output = conn.send_command("show running-config | include snmp-server community")
            # Should not have default communities
            assert "public" not in output, f"Default SNMP community found on {host}"
            assert "private" not in output, f"Default SNMP community found on {host}"

if __name__ == '__main__':
    pytest.main(['-v', __file__])
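
The parsing logic inside a test such as test_interface_status can also be exercised without live devices by running it against captured command output. The sketch below assumes the usual IOS column layout for show ip interface brief. The function name interface_is_up and the sample output are illustrative and were not captured from the lab.

```python
import re

# Illustrative output in the usual IOS column layout:
# Interface, IP-Address, OK?, Method, Status, Protocol
SAMPLE_OUTPUT = """\
Interface              IP-Address      OK? Method Status                Protocol
GigabitEthernet0/0/0   10.0.0.1        YES manual up                    up
GigabitEthernet0/0/1   unassigned      YES unset  administratively down down
Loopback0              1.1.1.1         YES manual up                    up
"""


def interface_is_up(output, interface):
    """Return True when the interface row shows Status up and Protocol up."""
    # Anchor to the line start and require whitespace after the name so that
    # a name cannot match as a prefix of a longer interface name.
    pattern = rf"^{re.escape(interface)}\s+\S+\s+\S+\s+\S+\s+up\s+up\s*$"
    return re.search(pattern, output, re.MULTILINE) is not None


print(interface_is_up(SAMPLE_OUTPUT, "GigabitEthernet0/0/0"))
print(interface_is_up(SAMPLE_OUTPUT, "GigabitEthernet0/0/1"))
```

Keeping parsers in small functions like this lets the CI test stage cover them with canned output, while the device-facing tests only need to supply the live command results.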

Summary

Advanced network automation combines Ansible, Python scripting, NETCONF/RESTCONF, and CI/CD practices to make network changes repeatable, testable, and reviewable. Together, these tools let configuration live in version control, be validated before deployment, and be verified afterward, which is the basis of modern network operations and DevNetOps workflows.

Key concepts covered:

  • Advanced Ansible playbooks and inventory management
  • Python scripting for network management and monitoring
  • NETCONF and RESTCONF for programmable networks
  • CI/CD pipelines for network configuration management
  • Automated testing frameworks for network validation

In the next chapter, we’ll explore network orchestration and Infrastructure as Code (IaC) concepts for large-scale network deployments.

Review Questions

  1. How do you implement dynamic inventory in Ansible for network devices?
  2. What are the advantages of NETCONF over traditional CLI-based management?
  3. How do you create automated testing for network configurations?
  4. What are best practices for CI/CD in network automation?
  5. How do you implement parallel execution in Python network scripts?

Hands-on Exercises

Exercise 1: Advanced Ansible Automation

  1. Deploy the network automation lab
  2. Create dynamic inventory scripts
  3. Develop comprehensive playbooks for device configuration
  4. Implement Jinja2 templates for configuration generation

Exercise 2: Python Network Management

  1. Create a network monitoring script with alerting
  2. Implement parallel device management
  3. Build configuration backup and restore functionality
  4. Develop network discovery and documentation tools

Exercise 3: NETCONF/RESTCONF Implementation

  1. Configure NETCONF on network devices
  2. Create NETCONF client scripts for configuration management
  3. Implement RESTCONF API interactions
  4. Compare NETCONF vs RESTCONF performance and capabilities

Exercise 4: CI/CD Pipeline Development

  1. Set up GitLab CI/CD for network automation
  2. Create automated testing frameworks
  3. Implement staging and production deployment workflows
  4. Develop rollback and recovery procedures

Additional Resources