[python]Multithreading to scan for port status

Background
I noticed that my earlier scripts were one-to-one: each did a specific job against a single target. That becomes impractical when the same function has to run against many targets, so I am exploring threading.

Threading
My first attempt used the threading.Thread class. I wrote the function that each thread should execute, created a Thread object with that function and its arguments, and started the thread. Scanning TCP ports 1 to 100 worked for the first few hundred IP addresses, but then a RuntimeError was raised because no further threads could be created.
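One possible way to avoid running out of threads is to cap how many run at once. The sketch below is not what my script does; it uses threading.BoundedSemaphore, and MAX_THREADS, worker and run_all are made-up names for illustration. The worker only sleeps in place of a real connection attempt.

```python
import threading
import time

MAX_THREADS = 50  # assumed cap; tune to what the machine tolerates
slots = threading.BoundedSemaphore(MAX_THREADS)
lock = threading.Lock()
running = 0
peak = 0


def worker(port):
    """Stand-in for a port check; only tracks how many workers overlap."""
    global running, peak
    try:
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.01)  # placeholder for the real connection attempt
        with lock:
            running -= 1
    finally:
        slots.release()  # free the slot so the main loop can start another


def run_all(ports):
    threads = []
    for port in ports:
        slots.acquire()  # blocks while MAX_THREADS workers hold a slot
        t = threading.Thread(target=worker, args=(port,))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    return peak  # highest number of workers seen running together


if __name__ == '__main__':
    print("peak concurrent workers:", run_all(range(1, 501)))
```

The main loop pauses at acquire() whenever all slots are taken, so the number of workers doing real work at the same moment never goes above MAX_THREADS, no matter how many ports are queued.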

Join method of Thread class
After some study and research I realized I cannot just fire off threads and expect them all to run without problems. Calling join() on a Thread makes the calling thread (here the main thread) wait until that thread has finished executing. Joining every thread after each host means the main thread only moves on to the next host once the current host's threads are done, so the number of live threads stays bounded by the size of the port range.
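A small sketch of what join() does, using a sleep in place of real work. The names task and run_and_wait are only for illustration.

```python
import threading
import time


def task(name, delay, done):
    time.sleep(delay)   # simulate slow work such as a connection attempt
    done.append(name)   # list.append is thread-safe in CPython


def run_and_wait():
    done = []
    threads = [threading.Thread(target=task, args=("t{}".format(i), 0.05 * i, done))
               for i in range(1, 4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # the main thread blocks here until this thread has finished
    # every thread has been joined, so all three names are present
    return sorted(done)


if __name__ == '__main__':
    print(run_and_wait())
```

Without the join() loop, run_and_wait() could return before the slower threads had appended anything.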

I do not know how to retrieve results from functions executed by threads
After adding join(), the script could port scan every IP address in two subnets between TCP 1 and TCP 1024. It ran to completion, which took 17 minutes (much slower than nmap). However, this is not what I need: I need the thread workers to collect values for me, and I do not know how to…
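One common way to get values back from plain Thread workers is to have each worker put its result on a shared queue.Queue, then drain the queue after joining. This is a minimal sketch under that assumption, not the approach used in the scripts below; check_port and scan are hypothetical names.

```python
import queue
import socket
import threading


def check_port(host, port, results, timeout=1.0):
    """Try a TCP connect and put (port, is_open) on the shared queue."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 when the connection succeeds
        is_open = sock.connect_ex((host, port)) == 0
    results.put((port, is_open))


def scan(host, ports):
    results = queue.Queue()
    threads = [threading.Thread(target=check_port, args=(host, p, results))
               for p in ports]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # every worker has finished, so the queue holds one entry per port
    status = {}
    while not results.empty():
        port, is_open = results.get()
        status[port] = is_open
    return status


if __name__ == '__main__':
    print(scan("127.0.0.1", range(20, 26)))
```

The Queue handles the locking, so the workers never touch the result dictionary directly; only the main thread builds it after all joins have returned.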

Scripts that execute port status scan for unknown number of subnets
The port range has to be specified. The subnets are determined by network_discovery.py, scanner.py scans each host between a given minimum and maximum port, and start_now.py runs the whole thing for testing.

Code for network_discovery.py

import netifaces
from ipaddress import IPv4Interface
import re

pattern = r'127\.\d+\.\d+\.\d+/\d+'
loopback_regex = re.compile(pattern)

def collect_interface_ip_addresses(interface):
    # AF_INET entries hold the IPv4 details; an interface without IPv4 has none
    ipv4 = netifaces.ifaddresses(interface).get(netifaces.AF_INET)
    if not ipv4:
        return None
    return {'intf_id': interface,
            'ip_address': ipv4[0]['addr'],
            'netmask': ipv4[0]['netmask']}


def netmask_to_cidr(netmask):
    # count the set bits in each octet, e.g. 255.255.255.0 -> 24
    return sum(bin(int(octet)).count('1') for octet in netmask.split('.'))


def get_host_network():
    host_collection = []
    host_network_collection = []
    for interface in netifaces.interfaces():
        host = collect_interface_ip_addresses(interface)
        # skip interfaces that have no IPv4 address
        if host is not None:
            host_collection.append(host)
    for host in host_collection:
        host['netmask'] = str(netmask_to_cidr(host['netmask']))
        # only interested in valid and non-loopback address
        if not loopback_regex.match(host['ip_address'] + "/" + host['netmask']):
            host_network_collection.append(str(IPv4Interface(host['ip_address'] + "/" + host['netmask']).network))
    return host_network_collection
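As a side note, the ipaddress module can do the netmask conversion and the loopback check on its own, which might replace the regex and the bit counting above. A small sketch; to_network is a hypothetical helper name and the addresses are example values.

```python
from ipaddress import IPv4Interface


def to_network(ip_address, netmask):
    """Return the network in CIDR form for an address and dotted netmask."""
    return str(IPv4Interface("{}/{}".format(ip_address, netmask)).network)


if __name__ == '__main__':
    print(to_network("192.168.1.10", "255.255.255.0"))        # 192.168.1.0/24
    print(IPv4Interface("127.0.0.1/255.0.0.0").is_loopback)   # True
```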

scanner.py

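from socket import socket, AF_INET, SOCK_STREAM, setdefaulttimeout
from threading import Thread


def connect_tcp_host(host, port):
    with socket(AF_INET, SOCK_STREAM) as sock:
        # connect_ex returns 0 on success and an error code otherwise
        if sock.connect_ex((host, port)):
            print("{}: {}/tcp closed".format(host, port))
        else:
            print("{}: {}/tcp open".format(host, port))


def scan_tcp_ports(host, min_port, max_port):
    setdefaulttimeout(2)
    # keep the list local so threads from earlier hosts are not joined again
    threads = []
    # range() excludes max_port, so pass the last port plus one
    for port in range(min_port, max_port):
        port_thread = Thread(target=connect_tcp_host, args=(host, port))
        port_thread.start()
        threads.append(port_thread)

    # wait for every port check on this host before returning
    for thread in threads:
        thread.join()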

start_now.py

from network_discovery import get_host_network
from time import time
from scanner import scan_tcp_ports
from ipaddress import IPv4Network


if __name__ == '__main__':
    start_time = time()
    result_bucket = []  # not filled yet: the threads only print their results
    for subnet in get_host_network():
        for addr in IPv4Network(subnet).hosts():
            scan_tcp_ports(str(addr), 443, 1025)

    print("{} seconds to complete.".format(time() - start_time))

5 thoughts on “[python]Multithreading to scan for port status”

  1. I usually use the concurrent.futures module, which is basically a wrapper on top of the threading and multiprocessing modules but allows you to do something like this:
    ```
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from pprint import pprint
    from datetime import datetime
    import time

    import yaml
    from netmiko import ConnectHandler

    start_msg = '===> {} Connection to device: {}'
    received_msg = '<=== {} Received result from device: {}'

    def connect_ssh(device_dict, command):
        print(start_msg.format(datetime.now().time(), device_dict['ip']))
        # artificial delay on one device to show results arriving out of order
        if device_dict['ip'] == '192.168.100.1':
            time.sleep(10)
        with ConnectHandler(**device_dict) as ssh:
            ssh.enable()
            result = ssh.send_command(command)
            print(received_msg.format(datetime.now().time(), device_dict['ip']))
        return {device_dict['ip']: result}

    def threads_conn(function, devices, limit=2, command=''):
        all_results = {}
        with ThreadPoolExecutor(max_workers=limit) as executor:
            future_ssh = []
            for device in devices:
                future = executor.submit(function, device, command)
                future_ssh.append(future)
                print('Future: {} for device {}'.format(future, device['ip']))
            # as_completed yields each future as soon as its result is ready
            for f in as_completed(future_ssh):
                result = f.result()
                print('Future done {}'.format(f))
                all_results.update(result)
        return all_results

    if __name__ == '__main__':
        # safe_load avoids the Loader argument that plain yaml.load requires
        with open('devices.yaml') as f:
            devices = yaml.safe_load(f)
        all_done = threads_conn(connect_ssh,
                                devices['routers'],
                                command='sh clock')
        pprint(all_done)

    ```

  3. Hi, thanks. The ThreadPoolExecutor class has a max_workers argument; is this for defining the number of threads? I will try a specific range with the ThreadPoolExecutor class. Thanks!

  4. I have instead imported the multiprocessing.pool.ThreadPool class to help me return values from the function that the thread is running. I just found out that a plain Thread does not pass back the return value of the function it executes…
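A minimal sketch of the ThreadPool idea from the last comment, assuming a simple connect check. port_is_open, scan_with_pool and the workers default are hypothetical; ThreadPool and its starmap method come from the standard library.

```python
from multiprocessing.pool import ThreadPool
import socket


def port_is_open(host, port, timeout=1.0):
    """Return (port, True) if a TCP connection succeeds, else (port, False)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        return port, sock.connect_ex((host, port)) == 0


def scan_with_pool(host, ports, workers=20):
    # the pool reuses a fixed number of threads instead of one per port
    with ThreadPool(processes=workers) as pool:
        # starmap unpacks each (host, port) tuple into the function arguments
        # and hands back the return values in input order
        return dict(pool.starmap(port_is_open, [(host, p) for p in ports]))


if __name__ == '__main__':
    print(scan_with_pool("127.0.0.1", range(20, 26)))
```

Here the return values come straight back from starmap, so no shared list or queue is needed, and the processes argument caps how many threads exist at once.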
