Background
I noticed that my earlier scripts were one-to-one: each script did a specific thing to a single target. That becomes impractical when there is more than one target and each needs the same function applied. Hence I am exploring threading.
Threading
My first use of threading was to import Thread from the threading module and write a function to be executed by individual threads. I created a Thread object, passed in the function with its arguments, and started the thread. Scanning TCP ports 1 to 100 worked for the first few hundred IP addresses, but then a RuntimeError was raised because no more threads could be created.
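One way to avoid running out of threads is to cap how many are running at any moment. The following is only a minimal sketch using a semaphore; the names run_limited and MAX_CONCURRENT, and the cap of 50, are assumptions for illustration and not part of my scripts.

```python
import threading

MAX_CONCURRENT = 50  # hypothetical cap, tune for your system
_slots = threading.BoundedSemaphore(MAX_CONCURRENT)


def run_limited(func, args_list):
    """Call func(*args) in its own thread for each args tuple,
    with at most MAX_CONCURRENT calls in progress at once."""
    threads = []

    def worker(args):
        try:
            func(*args)
        finally:
            _slots.release()  # free the slot for the next thread

    for args in args_list:
        _slots.acquire()  # blocks while MAX_CONCURRENT calls are running
        t = threading.Thread(target=worker, args=(args,))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()
```

With this, something like run_limited(connect_tcp_host, [(host, port) for port in range(1, 101)]) would start a new thread only when a slot is free, instead of creating every thread up front.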
Join method of Thread class
After some study and research I realized I cannot just fire off threads and expect them all to run without problems. Calling join() on a thread makes the main thread wait until that thread has finished executing, so joining every thread lets all of them finish before the main thread carries on.
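A small sketch of the start-then-join pattern, using only the standard library. The function names work and run_and_wait are made up for this example.

```python
import threading
import time


def work(n, log):
    time.sleep(0.01)  # stand-in for real work such as a connection attempt
    log.append(n)     # list.append is safe to call from several threads in CPython


def run_and_wait(count):
    log = []
    threads = [threading.Thread(target=work, args=(n, log)) for n in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # the main thread blocks here until t has finished
    # every worker is done by the time we get here
    return log
```

Starting all threads first and joining afterwards keeps them running in parallel. Calling join() right after each start() would run them one at a time.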
I do not know how to retrieve results from functions executed by threads
After I added the join() calls, the script could port scan every IP address in two subnets between TCP 1 and TCP 1024. It ran to completion, which took 17 minutes (much slower than nmap). However, this is not what I need: I need the thread workers to collect values for me, and I do not know how to…
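One common way to get values back from plain Thread objects is to have each worker put its result on a queue.Queue, which is thread-safe, and read the queue after joining. This is a sketch under that assumption; check_port and scan are hypothetical names, not functions from my scripts.

```python
import queue
import socket
import threading


def check_port(host, port, results, timeout=2):
    """Put (host, port, is_open) on the results queue instead of printing."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        is_open = sock.connect_ex((host, port)) == 0  # 0 means the connect succeeded
    results.put((host, port, is_open))


def scan(host, ports):
    results = queue.Queue()
    threads = [
        threading.Thread(target=check_port, args=(host, port, results))
        for port in ports
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # all workers have finished, so the queue now holds every result
    collected = []
    while not results.empty():
        collected.append(results.get())
    return sorted(collected)
```

Calling scan('127.0.0.1', range(20, 26)) would return a sorted list of (host, port, is_open) tuples that the caller can filter or store.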
Scripts that run a port status scan across an unknown number of subnets
The port range has to be specified. The subnets are determined by network_discovery.py, scanner.py scans the ports between a given minimum and maximum, and start_now.py runs the whole thing for testing.
Code for network_discovery.py
import netifaces
from ipaddress import IPv4Interface
import re

pattern = r'127\.\d+\.\d+\.\d+/\d+'
loopback_regex = re.compile(pattern)


def collect_interface_ip_addresses(interface):
    return {'intf_id': interface,
            'ip_address': netifaces.ifaddresses(interface).get(2)[0]['addr'],
            'netmask': netifaces.ifaddresses(interface).get(2)[0]['netmask']}


def netmask_to_cidr(netmask):
    return (sum(bin(int(octet)).count('1') for octet in netmask.split('.')))


def get_host_network():
    host_collection = []
    host_network_collection = []
    for interface in netifaces.interfaces():
        host_collection.append(collect_interface_ip_addresses(interface))
    for host in host_collection:
        host['netmask'] = str(netmask_to_cidr(host['netmask']))
        # only interested in valid and non-loopback address
        if not loopback_regex.match(host['ip_address'] + "/" + host['netmask']):
            host_network_collection.append(str(IPv4Interface(host['ip_address'] + "/" + host['netmask']).network))
    return host_network_collection
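As a quick check of the netmask helper, here is a standalone sketch with a made-up address (192.168.1.10 with mask 255.255.255.0). It also shows that ipaddress accepts the dotted netmask directly, so the manual bit counting may not be needed.

```python
from ipaddress import IPv4Interface


def netmask_to_cidr(netmask):
    # count the 1 bits in each octet of the dotted netmask
    return sum(bin(int(octet)).count('1') for octet in netmask.split('.'))


prefix = netmask_to_cidr('255.255.255.0')
network = IPv4Interface('192.168.1.10/{}'.format(prefix)).network
print(prefix, network)  # 24 192.168.1.0/24

# ipaddress also parses "address/netmask" without the conversion step
same_network = IPv4Interface('192.168.1.10/255.255.255.0').network
print(same_network == network)  # True
```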
scanner.py
from socket import *
from threading import *

threads = []


def connect_tcp_host(host, port):
    with socket(AF_INET, SOCK_STREAM) as sock:
        if sock.connect_ex((host, port)):
            print("{}: {}/tcp closed".format(host, port))
        else:
            print("{}: {}/tcp open".format(host, port))


def scan_tcp_ports(host, min_port, max_port):
    setdefaulttimeout(2)
    for port in range(min_port, max_port):
        port_thread = Thread(target=connect_tcp_host, args=(host, port))
        port_thread.start()
        threads.append(port_thread)
    for thread in threads:
        thread.join()
start_now.py
from network_discovery import get_host_network
from time import time
from scanner import scan_tcp_ports
from ipaddress import IPv4Network

start_time = time()

if __name__ == '__main__':
    result_bucket = []
    for subnet in get_host_network():
        for addr in IPv4Network(subnet).hosts():
            scan_tcp_ports(str(addr), 443, 1025)
    print("{} seconds to complete.".format(time() - start_time))
I usually use the concurrent.futures module, which is basically a wrapper on top of the threading and multiprocessing modules and lets you do something like this:
```
from concurrent.futures import ThreadPoolExecutor, as_completed
from pprint import pprint
from datetime import datetime
import time

import yaml
from netmiko import ConnectHandler

start_msg = '===> {} Connection to device: {}'
received_msg = '<=== {} Received result from device: {}'


def connect_ssh(device_dict, command):
    print(start_msg.format(datetime.now().time(), device_dict['ip']))
    # artificial delay for one device, to show results arriving out of order
    if device_dict['ip'] == '192.168.100.1':
        time.sleep(10)
    with ConnectHandler(**device_dict) as ssh:
        ssh.enable()
        result = ssh.send_command(command)
        print(received_msg.format(datetime.now().time(), device_dict['ip']))
    return {device_dict['ip']: result}


def threads_conn(function, devices, limit=2, command=''):
    all_results = {}
    with ThreadPoolExecutor(max_workers=limit) as executor:
        future_ssh = []
        for device in devices:
            future = executor.submit(function, device, command)
            future_ssh.append(future)
            print('Future: {} for device {}'.format(future, device['ip']))
        # as_completed yields each future as soon as its result is ready
        for f in as_completed(future_ssh):
            result = f.result()
            print('Future done {}'.format(f))
            all_results.update(result)
    return all_results


if __name__ == '__main__':
    # safe_load avoids the Loader argument that newer PyYAML requires for load()
    with open('devices.yaml') as f:
        devices = yaml.safe_load(f)
    all_done = threads_conn(connect_ssh,
                            devices['routers'],
                            command='sh clock')
    pprint(all_done)
```
Hi, thanks. The ThreadPoolExecutor class has a max_workers argument; is this for defining the number of threads? I will try the ThreadPoolExecutor class with a specific range. Thanks!
That is correct and I see you use it in the new post =)
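To see that max_workers is an upper bound on the number of threads, here is a small sketch that records which worker thread handled each task. The helper names which_thread and distinct_workers are invented for this example.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time


def which_thread(_):
    time.sleep(0.01)  # keep the worker busy so other workers pick up tasks
    return threading.current_thread().name


def distinct_workers(tasks, max_workers):
    """Run `tasks` jobs and return the set of thread names that ran them."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        names = list(executor.map(which_thread, range(tasks)))
    return set(names)
```

For example, distinct_workers(50, 4) returns at most four names no matter how many tasks are submitted. The remaining tasks wait in the executor's queue until a worker is free.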
I have instead imported the multiprocessing.pool.ThreadPool class to help me return values from the function that the thread is running. I just found out that a Thread does not pass back the return value of the function it executes…
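Roughly what that looks like, as a sketch rather than my exact script: check_port and scan are illustrative names, and the pool size of 20 is an arbitrary assumption.

```python
from multiprocessing.pool import ThreadPool
import socket


def check_port(host, port, timeout=2):
    """Return (host, port, is_open) so the pool can hand the value back."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        return host, port, sock.connect_ex((host, port)) == 0


def scan(host, ports, workers=20):
    targets = [(host, port) for port in ports]
    with ThreadPool(processes=workers) as pool:
        # starmap unpacks each tuple into check_port(host, port) and
        # returns the results in the same order as targets
        return pool.starmap(check_port, targets)
```

Unlike Thread, the pool collects whatever the function returns, so scan('127.0.0.1', range(20, 26)) gives back a list of tuples with no shared list or queue needed.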