[python] asa library

I created an asa library which contains functions require to perform some asa operations, this is not a complete set, there are some sets which I have not tested yet, the below functions are all tested and working.

from device_logging.cisco_logger import device_logging_decorator
from netmiko import ConnectHandler
from netmiko.ssh_exception import NetMikoAuthenticationException, NetMikoTimeoutException
from paramiko.ssh_exception import SSHException
from device_logging.cisco_logger import today_date
from jinja2 import FileSystemLoader, Environment
import os
import re
import logging

template_loader = FileSystemLoader("templates")
template_env = Environment(loader=template_loader)
netmiko_exceptions = NetMikoAuthenticationException, NetMikoTimeoutException, SSHException


# Decorator to help to push the command set.
def deploy_config_set(function):
    @device_logging_decorator
    def device_connection_wrapper(*args, **kwargs):
        # The wrapper uses the return values of the function,
        # to establish a connection to the device.
        # After connection to device is established,
        # the command set is sent over. If there is no exception,
        # the wrapper write the configuration set to memory.
        cmd, device = function(*args, **kwargs)
        try:
            with ConnectHandler(**device) as conn:
                try:
                    conn.send_config_set(cmd, delay_factor=2)
                    logging.info("Command executed successfully...")
                except netmiko_exceptions as e:
                    logging.error(e)
                conn.send_command("write memory")
                logging.info("Configuration saved to memory.")
        except netmiko_exceptions as e:
            logging.error(e)
    return device_connection_wrapper


# show running configuration.
# has the choice to display the configuration or backup to a file.
@device_logging_decorator
def show_configuration(is_backup=False, file_path="D:\\temp\\", **kwargs):
    with ConnectHandler(**kwargs) as conn:
        output = conn.send_command("show running-config")
        # if backup is needed, check file_path and file_path existence.
        # create a file and copy the config. else return only the output.
        if is_backup:
            hostname = conn.find_prompt()
            if file_path:
                file_path = file_path
                # if file_path exists go to create file, else create the directory.
                if not os.path.exists(file_path):
                    os.mkdir(file_path)
            with open("{}{}.cfg".format(file_path, hostname[:-1] + "_" + today_date), "w+") as file:
                file.write(output)
    return output


# Get network object-groups and return the result as dictionary.
@device_logging_decorator
def get_network_objects(**kwargs):
    with ConnectHandler(**kwargs) as conn:
        # show object-group network has a textfsm module to convert to dictionary.
        result = conn.send_command("show object-group network", use_textfsm=True)
    return result


# show int ip brief, get all interfaces information
# return the result as dictionary.
@device_logging_decorator
def show_firewall_interfaces(**kwargs):
    with ConnectHandler(**kwargs) as conn:
        result = conn.send_command("show interface ip brief")
    processed_result = []
    tmp_result2 = []
    tmp_result1 = result.splitlines()[1:]
    logging.info("Firewall interfaces retrieved.")
    for p in tmp_result1:
        logging.debug("{} is evaluated to check for 'administratively down' keyword.".format(p))
        if re.search("administratively down", p):
            logging.debug("Replacing the word 'administratively down' to 'shutdown'.")
            tmp_result2.append(p.replace("administratively down", "shutdown"))
        else:
            tmp_result2.append(p)
    for r in tmp_result2:
        r_list = r.split()
        processed_result.append(
            {
                "interface": r_list[0],
                "ip_address": r_list[1],
                "is_ok": r_list[2],
                "method": r_list[3],
                "status": r_list[4],
                "protocol": r_list[5]
            }
        )
    logging.debug("Interfaces information collected - variable is 'processed_result.'")
    return processed_result


# Get interfaces status, shutdown, down or up.
@device_logging_decorator
def get_firewall_interfaces(mode="all", **kwargs):
    interface_details = show_firewall_interfaces(**kwargs)
    interfaces = []
    if mode == "all":
        for i in interface_details:
            interfaces.append(i.get("interface"))
        return interfaces
    elif mode == "down":
        for i in interface_details:
            if i.get("protocol") == "down" or i.get("status") == "down" or i.get("status") == "shutdown":
                interfaces.append(i.get("interface"))
        return interfaces
    elif mode == "up":
        for i in interface_details:
            if i.get("protocol") == "up" and i.get("status") == "up":
                interfaces.append(i.get("interface"))
        return interfaces


# SSH hardening configuration
@deploy_config_set
def ssh_hardening_config(network, netmask, mgmt_nameif, mod=2048, **device):
    config_template = template_env.get_template("ssh_base.txt")
    logging.info("Retrieving ssh hardening base configuration.")
    device_config = device
    cmd_set = config_template.render(network=network,
                                     netmask=netmask,
                                     mgmt_nameif=mgmt_nameif,
                                     mod=mod)
    logging.info("SSH hardening configuration generated.")
    return cmd_set, device_config


# check if the interface is down
def is_interface_shutdown(interface, **kwargs):
    logging.info("Checking {} status.".format(interface))
    interfaces = get_firewall_interfaces(mode="down", **kwargs)
    if interface in interfaces:
        logging.info("{} is up.".format(interface))
        return True
    else:
        logging.info("{} is not connected or shutdown.".format(interface))
        return False


@deploy_config_set
def configure_routed_interface(interface, nameif, sec_level, ip_addr, netmask, **device):
    config_template = template_env.get_template("routed_interface_base.txt")
    logging.info("Retrieving base template.")
    device_config = device
    cmd_set = config_template.render(
        intf_id=interface,
        nameif=nameif,
        security_level=sec_level,
        ip_address=ip_addr,
        netmask=netmask
    )
    logging.info("Routed interface configuration generated.")
    return cmd_set, device_config


@deploy_config_set
def create_network_host_object(object_name, host_address, description=None, **device):
    config_template = template_env.get_template("network_object_host.txt")
    device_config = device
    cmd_set = config_template.render(
        object_name=object_name,
        ip_address=host_address,
        description=description
    )
    return cmd_set, device_config


@deploy_config_set
def configure_subif(*args, **kwargs):
    device_config = kwargs
    config_template = template_env.get_template("subif_base.txt")
    user_input_config = args[0]
    cmd_set = config_template.render(
        intf_id=user_input_config.get("interface", None),
        vlan_id=user_input_config.get("vlan", None),
        nameif=user_input_config.get("nameif", None),
        description=user_input_config.get("description", None),
        security_level=user_input_config.get("sec_level", None),
        ip_addr=user_input_config.get("ip", None),
        netmask=user_input_config.get("netmask", None)
    )
    return cmd_set, device_config
Advertisement

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s