[python]Remove duplicates from a list

I have learned something interesting… removing duplicates in a list is actually quite easily achieved without using iteration…

from collections import OrderedDict

a = ["1", 1, 2, 3, "3", "3", "4"]
b = list(set(a))

c = list(OrderedDict.fromkeys(a))
print("Unordered non-duplicated list {}\n".format(b))
print("Ordered non-duplicated list {}, preserved the list of the original a".format(c))

The output is self explanatory, you use set if you do not bother about the order, you use Ordereddict if you need to preserve the original order.

Unordered non-duplicated list [1, 2, 3, '4', '3', '1']

Ordered non-duplicated list ['1', 1, 2, 3, '3', '4'], preserved the list of the original a

Advertisements
Posted in Python, Scripting | Tagged , , | Leave a comment

[python]List comprehension and map method

I want this output in string:
[‘1’, ‘2’, ‘3’, ‘4’, ‘5’, ‘6’, ‘7’, ‘8’, ‘9’, ’10’, ’11’, ’12’, ’13’, ’14’, ’15’, ’16’, ’17’, ’18’, ’19’, ’20’]

Method one: List comprehension:

print([str(i) for i in range(1,21)])

Method two: Map method.
map(function, iterable)

print(list(map(str, range(1,21))))
Posted in Python, Scripting | Tagged , , | Leave a comment

[python]Sending access request to Tufin SecureChange

Required improvements/Bugs

  • IP address checking required, situation such as this 192.168.1.1/24.
  • Dynamically update the action field, right now all request is assumed Accept.
  • Dynamically update comment field on each access request, right now the script does not check comment
  • Optimize the code by writing methods, there are codes inside which are repetitive hence causing the entire code to be 200++ long…
  • Use logging module to replace the print output, place the logger on different events for future troubleshooting. (I normally do this, but i am too tired…
  • Use the os.path module for checking if rules.xlsx exist and also to check if the base.xml exist.

Pre-requisite to understand the code

  • Try out xml module library to appreciate how xml document is created, try not to use beautifulsoup, beautifulsoup is more human in my opinion i.e. more flavours for human to consume. Xml module is rather raw… like sushi…
  • Try out the openpyxl module, fantasy module to read and write Excel and CSV, it is designed to be more human to be honest I prefer cooked meal to sushi…
  • Try out mako template, to be honest, at first i was quite intimidated by it, i read the documentation impatiently and I got nothing at first here’s a tip for myself and people who are reading this:
    To insert python control statements (if, elif, else, for loop, while loop) use % % eg.

    % for i in range(0, 6)
    can be anything
    % endfor

    if it is if statement, use %endif, I have not tried while loop yet hence i am not sure.
    If you need to insert a block of module calling first import the module using
    Screen Shot 2018-05-12 at 4.16.24 AM
    if you want to use the modules, classes or other non-control statement you need
    Screen Shot 2018-05-12 at 4.16.55 AM
    eg.
    Screen Shot 2018-05-12 at 4.15.31 AM.png
    The ${} is for variable, you should use this if it is out of % % or
    Screen Shot 2018-05-12 at 4.16.55 AM

  • Use a template engine makes your life easier, I had problems joining the xml documents pieces by pieces to be sustainable always use a template engine eg. Jinja2 or Mako.
  • Try out requests module if you have not, this is use for calling REST APIs.
  • Install Ipython! I use this to test my syntax before I actually write them down!
  • Use postman to test the output to have an idea of the requirement to open an access request.

Base xml template
Screen Shot 2018-05-12 at 4.14.30 AM

This is not a fully automated template. I think would be good if I make the namespace as variable.

Tufin SecureChange workflow
This is the workflow I designed for testing my script, the script submits request to step one, if it is not step one you need to do some adjustment on the template, the field names must follow exactly the same (including cases) as your workflow design as it will not work.

Screen Shot 2018-05-12 at 4.03.33 AM.png

Screen Shot 2018-05-12 at 4.03.53 AM.png

Screen Shot 2018-05-12 at 4.04.21 AM.png

rule.xlsx
This is an excel sheet which user will submit.
The code has to be adjusted to satisfy your need, learn about openpyxl before changing the code.
Screen Shot 2018-05-12 at 4.06.50 AM.png

Screen Shot 2018-05-12 at 4.07.01 AM.png

Screen Shot 2018-05-12 at 4.07.09 AM.png

The outcome
This is the outcome after the script reads the workbook and uses the datas to generate access request.
Screen Shot 2018-05-12 at 4.09.44 AM

The python code
Hardwork has been spent to work this out… the motivation for me to do another script in python because the invoke-restmethod command in powershell version 3 is buggy, very often I will encountered content length error which stops the script…

# For opening workbook from xlsx and csv
from openpyxl import load_workbook
# Easy to learn and use template engine, not limited to only html/xml can be any documents
from mako.template import Template
# A module easy to get the subnet mask bit, subnet mask, subnet and ip address
from netaddr import IPNetwork, IPAddress
# For creating xml document
import xml.etree.ElementTree as ET
# For calling REST APIs, very easy to learn and use
import requests
# For using the gethostbyname() method to resolve hostname to IP address
# gethostbyname() does not throw exception when the input is an ip address
# use with caution!
import socket
import re
from requests.packages.urllib3.exceptions import InsecureRequestWarning

# To suppress the certificate warning from the script output.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Credential for Tufin SC
USERNAME = 'cyrus'
PASSWORD = '121278'

# Reference: Regular Expression Cookbook by Steven Levithan; Jan Goyvaerts
# Published by O'Reilly Media, Inc., 2009
# 7.16. Matching IPv4 Addresses
IPV4_REGEX = "^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"


# use row=2 if you are using MS Excel, row=3 because I was using Mac Numbers.
row = 3
# Column numbers, you need to adjust according to your excel sheet format
source_id = 1
destination_id = 2
service_id = 3
action_id = 4
comment_id = 5
# Initialize all to zeroes
sources = []
destinations = []
services = []
source_blocks = []
destination_blocks = []
service_blocks = []
access_requests = []

# Use to accept the subject string from an external ticketing system
SUBJECT = "Test XML2"

# Collect sources, destinations and services
def get_rule_items(ws, row, column):
    items = []
    while ws.cell(row, column).value is not None:
        items.append(ws.cell(row, column).value)
        row += 1
    return items


# Create source, destination and service xml trees
def construct_rule_items_template(item, item_id):
    if item_id is source_id:
        if "-" in item:
            first_ip = item.split('-')[0]
            second_ip = item.split('-')[1]
            if re.match(IPV4_REGEX, first_ip) and re.match(IPV4_REGEX, second_ip):
                source_block = ET.Element('source')
                source_block.set("type", "RANGE")
                ET.SubElement(source_block, 'range_first_ip').text = first_ip
                ET.SubElement(source_block, 'range_last_ip').text = second_ip
                print(ET.tostring(source_block, encoding='utf-8').decode('utf-8'))
                print(item)
                return source_block
            else:
                try:
                    print(item)
                    answer = socket.gethostbyname(item)
                    source_block = ET.Element('source')
                    source_block.set("type", "IP")
                    ET.SubElement(source_block, 'ip_address').text = str(IPAddress(answer))
                    ET.SubElement(source_block, 'cidr').text = str(IPAddress(answer).netmask_bits())
                    print(ET.tostring(source_block, encoding='utf-8').decode('utf-8'))
                    return source_block
                except Exception as e:
                    print(e)
                    pass
        elif "/" in item:
            try:
                network_id = str(IPNetwork(item).ip)
                network_mask = str(IPNetwork(item).netmask)
                prefix_length = str(IPNetwork(item).prefixlen)
                source_block = ET.Element('source')
                source_block.set("type", "IP")
                ET.SubElement(source_block, 'ip_address').text = network_id
                ET.SubElement(source_block, 'netmask').text = network_mask
                ET.SubElement(source_block, 'cidr').text = prefix_length
                print(ET.tostring(source_block, encoding='utf-8').decode('utf-8'))
                return source_block
            except Exception as e:
                print(e)
                pass
        elif re.match(IPV4_REGEX, item):
            source_block = ET.Element('source')
            source_block.set("type", "IP")
            ET.SubElement(source_block, 'ip_address').text = str(IPAddress(item))
            ET.SubElement(source_block, 'cidr').text = str(IPAddress(item).netmask_bits())
            print(ET.tostring(source_block, encoding='utf-8').decode('utf-8'))
            return source_block
        else:
            answer = socket.gethostbyname(item)
            source_block = ET.Element('source')
            source_block.set("type", "IP")
            ET.SubElement(source_block, 'ip_address').text = str(IPAddress(answer))
            ET.SubElement(source_block, 'cidr').text = str(IPAddress(answer).netmask_bits())
            print(ET.tostring(source_block, encoding='utf-8').decode('utf-8'))
            return source_block
    elif item_id is destination_id:
        if "-" in item:
            first_ip = item.split('-')[0]
            second_ip = item.split('-')[1]
            if re.match(IPV4_REGEX, first_ip) and re.match(IPV4_REGEX, second_ip):
                destination_block = ET.Element('destination')
                destination_block.set("type", "RANGE")
                ET.SubElement(destination_block, 'range_first_ip').text = first_ip
                ET.SubElement(destination_block, 'range_last_ip').text = second_ip
                print(ET.tostring(destination_block, encoding='utf-8').decode('utf-8'))
                print(item)
                return destination_block
            else:
                try:
                    print(item)
                    answer = socket.gethostbyname(item)
                    destination_block = ET.Element('destination')
                    destination_block.set("type", "IP")
                    ET.SubElement(destination_block, 'ip_address').text = str(IPAddress(answer))
                    ET.SubElement(destination_block, 'cidr').text = str(IPAddress(answer).netmask_bits())
                    print(ET.tostring(destination_block, encoding='utf-8').decode('utf-8'))
                    return destination_block
                except Exception as e:
                    print(e)
                    pass
        elif "/" in item:
            try:
                network_id = str(IPNetwork(item).ip)
                network_mask = str(IPNetwork(item).netmask)
                prefix_length = str(IPNetwork(item).prefixlen)
                destination_block = ET.Element('destination')
                destination_block.set("type", "IP")
                ET.SubElement(destination_block, 'ip_address').text = network_id
                ET.SubElement(destination_block, 'netmask').text = network_mask
                ET.SubElement(destination_block, 'cidr').text = prefix_length
                print(ET.tostring(destination_block, encoding='utf-8').decode('utf-8'))
                return destination_block
            except Exception as e:
                print(e)
                pass
        elif re.match(IPV4_REGEX, item):
            destination_block = ET.Element('destination')
            destination_block.set("type", "IP")
            ET.SubElement(destination_block, 'ip_address').text = str(IPAddress(item))
            ET.SubElement(destination_block, 'cidr').text = str(IPAddress(item).netmask_bits())
            print(ET.tostring(destination_block, encoding='utf-8').decode('utf-8'))
            return destination_block
        else:
            answer = socket.gethostbyname(item)
            destination_block = ET.Element('destination')
            destination_block.set("type", "IP")
            ET.SubElement(destination_block, 'ip_address').text = str(IPAddress(answer))
            ET.SubElement(destination_block, 'cidr').text = str(IPAddress(answer).netmask_bits())
            print(ET.tostring(destination_block, encoding='utf-8').decode('utf-8'))
            return destination_block
    elif item_id is service_id:
        service_block = ET.Element('service')
        service_block.set("type", "PROTOCOL")
        if 'tcp' in item.lower():
            ET.SubElement(service_block, 'protocol').text = 'TCP'
            ET.SubElement(service_block, 'port').text = item.lower().split('tcp')[1]
            return service_block
        elif 'udp' in item.lower():
            ET.SubElement(service_block, 'protocol').text = 'UDP'
            ET.SubElement(service_block, 'port').text = item.lower().split('udp')[1]
            return service_block
        else:
            print("Error, the item is not supported.")
    else:
        print("Unknown item, not ip address, subnet or service.")


# Create the access request xml tree
def contruct_access_request_template():
    ar = ET.Element('access_request')
    ET.SubElement(ar, 'use_topology').text = 'true'
    targets = ET.Element('targets')
    target = ET.Element('target')
    target.set("type", "ANY")
    targets.append(target)
    ar.append(targets)
    src = ET.Element('sources')
    dst = ET.Element('destinations')
    svc = ET.Element('services')
    return {'ar': ar, 'sources': src, 'destinations': dst, 'services': svc}


# Prepare for REST API request session
def initialize_requests(username, password):
    tufin_sc = requests.session()
    tufin_sc.headers.update({'Content-type': 'application/xml'})
    tufin_sc.verify = False
    tufin_sc.auth = requests.auth.HTTPBasicAuth(username, password)
    return tufin_sc

# Open the MS Excel workbook
rulebook = load_workbook("rules.xlsx")

# Save the worksheet name for pointing the cells in the correct worksheet
rules = rulebook.sheetnames

if __name__ == '__main__':
    # Initialize the ar fields: source, destination, service
    field = {}
    # Start the template
    template = Template(filename='templates/base.xml')
    # On every worksheet do below
    for rule in rules:
        # Create access request xml for each worksheets
        field = contruct_access_request_template()
        # Assume no more access request if the next worksheet has no data on the data rows
        if rulebook[rule].cell(row, source_id).value is not None or rulebook[rule].cell(row, destination_id).value \
                is not None or rulebook[rule].cell(row, service_id).value is not None:
            # Collect source addresses, destination addresses and services
            sources = get_rule_items(rulebook[rule], row, source_id)
            destinations = get_rule_items(rulebook[rule], row, destination_id)
            services = get_rule_items(rulebook[rule], row, service_id)
            # Collect all source xml blocks
            for source in sources:
                source_blocks.append(construct_rule_items_template(source, source_id))
            # Collect all destination xml blocks
            for destination in destinations:
                destination_blocks.append(construct_rule_items_template(destination, destination_id))
            # Collect all service xml blocks
            for service in services:
                service_blocks.append(construct_rule_items_template(service, service_id))
            # Reconcile the source xml blocks to parent node - sources tag
            for source_block in source_blocks:
                field['sources'].append(source_block)
            # Reconcile the destination xml blocks to parent node - destinations tag
            for destination_block in destination_blocks:
                field['destinations'].append(destination_block)
            # Reconcile the service xml blocks to parent node - services tag
            for service_block in service_blocks:
                field['services'].append(service_block)
        # Reconcile all sources xml blocks, destinations xml blocks and services xml blocks
        # back to grand daddy access request
        field['ar'].append(field['sources'])
        field['ar'].append(field['destinations'])
        field['ar'].append(field['services'])
        # Each access request will have an action field, this one assume Accept.
        # Tufin SC can only do accept. Although Tufin SC has remove and deny option,
        # those are for documentation only, Tufin SC can only remove Cisco ACL (routers)
        # Tufin SC definitely cannot do deny rule.
        ET.SubElement(field['ar'], 'action').text = 'Accept'
        ET.SubElement(field['ar'], 'labels')
        # Initialize all for the next worksheet, to create fresh access request xml block
        sources = []
        destinations = []
        services = []
        source_blocks = []
        destination_blocks = []
        service_blocks = []
        # Collect access request xml blocks
        access_requests.append(ET.tostring(field['ar'], encoding='utf-8').decode('utf-8'))
    # Get the entire xml body from template
    body = template.render(SUBJECT=SUBJECT, access_requests=access_requests)
    # Can disable, for me to see the results for troubleshooting only.
    print(body)
    # Prepare the session
    tufin_sc = initialize_requests(USERNAME, PASSWORD)
    # Start calling the REST API POST method, the payload has to be encoded in utf-8
    # payload is byte type which will be sent over to Tufin SC
    try:
        response = tufin_sc.post("https://192.168.1.19/securechangeworkflow/api/securechange/tickets",
                                 data=body.encode('utf-8'))
        # Can disable, but for troubleshooting, the response from the server is clearer than postman's response.
        print(response.text)
    except Exception as e:
        print(response.text, e)
Posted in Python, Scripting | Tagged , , , , | Leave a comment

[tufin]Illegal processing instruction target, xml (case insensitive) is reserved by the specs. at [row,col {unknown-source}]: [2,5]

When I submit the xml body to Tufin SecureChange, Tufin’s response is xml (case insensitive) is reserved by the specs. at [row,col {unknown-source}]: [2,5] this is an exception threw by the Java app in tufin, the reason is because I put a declaration in my mako template, I remove the utf-8 declaration and submit the body with utf-8 the problem resolves.

I used postman to test the mako template output, postman only returned http 400 bad syntax, the reason was not as clear as the response received by requests module. Kudos to python’s requests module!

Referene: https://stackoverflow.com/questions/23758823/error-processing-incoming-soapmessage-with-xml-data, See helderdarocha’s answer.

Posted in Python, Scripting | Tagged , | Leave a comment

[python]Collecting values from xlsx

This script collects data from each column on each worksheet. The script has to be modified if used in real MS Excel.
Screen Shot 2018-05-10 at 10.16.35 PM.png
Screen Shot 2018-05-10 at 10.16.45 PM

from openpyxl import load_workbook

row = 3
source_id = 1
destination_id = 2
service_id = 3
sources = []
destinations = []
services = []


def get_rule_items(ws, row, column):
    items = []
    while ws.cell(row, column).value is not None:
        items.append(ws.cell(row, column).value)
        row += 1
    return items


rulebook = load_workbook("rules.xlsx")

rules = rulebook.sheetnames

for rule in rules:
    if rulebook[rule].cell(row, source_id).value is not None or rulebook[rule].cell(row, destination_id).value \
        is not None or rulebook[rule].cell(row, service_id).value is not None:

        sources = get_rule_items(rulebook[rule], row, source_id)
        destinations = get_rule_items(rulebook[rule], row, destination_id)
        services = get_rule_items(rulebook[rule], row, service_id)

        print(rule)
        print(sources, destinations, services)

This is the result:

Sheet 1
['192.168.1.1', '10.1.1.1'] ['10.1.1.1', '192.168.1.100'] ['tcp80']
Sheet 2
['1.1.1.1'] ['2.2.2.2'] ['Tcp80']

Posted in Python, Scripting | Tagged , , | Leave a comment

[python]Dynamically store key value pair from API response

This example code demonstrate to store device(key) and id(value) pair in a dictionary.

import requests
from bs4 import BeautifulSoup
USERNAME_SC = 'admin'
PASSWORD_SC = 'p@ssw0rd'
TUFIN_ST_API_BASE = "https://secure_track/securetrack/api/"
TUFIN_SC_API_BASE = "https://secure_change/securechangeworkflow/api/securechange/"
devices = {}
devices_id = []
i = 0
tufin_sc = requests.session()
tufin_sc.verify = False
tufin_sc.auth = requests.auth.HTTPBasicAuth(USERNAME_SC, PASSWORD_SC)
tufin_sc.headers.update({'Content-type': 'application/xml'})


response = tufin_sc.get(TUFIN_SC_API_BASE + "devices")
soup = BeautifulSoup(response.text, 'html.parser')
for id in soup.find_all('id'):
    devices_id.append(id.text)

for name in soup.find_all('name'):
    devices[name.text] = devices_id[i]
    i += 1

print(devices)
Posted in Python, Scripting | Tagged | Leave a comment

[python] caution on using socket module to check for hostname

I had a problem which socket threw an exception when i was testing whether an object is a hostname or not.

if socket.gethostbyname(source) and "-" in address:
.....
.....
.....

the error is this socket.gaierror: [errno 11004] getaddrinfo failed this is because if the source is not a hostname but is an ip address the testing of socket.gethostbyname() will still be true.

A solution is to check if source is not an ip address this is the code snippet:

def is_ipv4_address(address):
    if "-" in address:
        ip_addr = address.split("-")
        #print(ip_addr[0], ip_addr[1])
        if IPAddress(ip_addr[0]) and IPAddress(ip_addr[1]):
            return True
        else:
            return False
    elif "/" in address:
        if IPNetwork(address):
            return True
        else:
            return False
    elif IPAddress(address):
        return True
    else:
        return False

if not is_ipv4_address(source) and "-" in source:
            # print(str(is_ipv4_address(source)), source)
            try:
                answers.append(socket.gethostbyname(source))
            except Exception as e:
                logging.error("Unknown element in the request, ignore...", e)
                pass
            source_address_field.append(collect_resolved_ip_addr(answers))
Posted in Python, Scripting | Tagged | Leave a comment