Background
This is an upgraded script that checks for open ports on every address found in the directly connected subnets. The earlier upgrade used `ProcessPoolExecutor`; this minor rewrite uses `ProcessPoolExecutor` and `ThreadPoolExecutor` together.
network_discovery.py – obtains directly connected subnets
import re
from ipaddress import IPv4Interface

import netifaces

# Matches an "<address>/<prefix>" string whose address starts with 127.
pattern = r'127\.\d+\.\d+\.\d+/\d+'
loopback_regex = re.compile(pattern)


def collect_interface_ip_addresses(interface):
    """Return the first IPv4 address configured on *interface*.

    Args:
        interface: Interface name as returned by ``netifaces.interfaces()``.

    Returns:
        A dict with the keys ``intf_id``, ``ip_address`` and ``netmask``
        (the netmask in dotted form, e.g. ``'255.255.255.0'``), or ``None``
        when the interface has no IPv4 entry.
    """
    # Query the interface once so the address and the netmask come from the
    # same snapshot instead of three separate lookups.
    ipv4_entries = netifaces.ifaddresses(interface).get(netifaces.AF_INET)
    if not ipv4_entries:
        return None
    first = ipv4_entries[0]
    return {
        'intf_id': interface,
        'ip_address': first['addr'],
        'netmask': first['netmask'],
    }


def netmask_to_cidr(netmask):
    """Convert a dotted netmask such as ``'255.255.255.0'`` to a prefix length.

    The prefix length is the number of set bits across all octets.
    """
    return sum(bin(int(octet)).count('1') for octet in netmask.split('.'))


def get_host_network():
    """Return the non-loopback IPv4 networks this host is attached to.

    Returns:
        A list of network strings in CIDR notation (for example
        ``'192.168.1.0/24'``), in interface order and without duplicates.
    """
    networks = []
    for interface in netifaces.interfaces():
        host = collect_interface_ip_addresses(interface)
        if not host:
            continue
        # Build the CIDR string locally rather than rewriting host['netmask'].
        cidr = f"{host['ip_address']}/{netmask_to_cidr(host['netmask'])}"
        if loopback_regex.match(cidr):
            continue
        network = str(IPv4Interface(cidr).network)
        # Two interfaces on the same subnet must not cause a double scan.
        if network not in networks:
            networks.append(network)
    return networks
scanner.py
This script does the port scanning based on the subnets obtained from network_discovery.py.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from ipaddress import IPv4Network
from socket import AF_INET, SOCK_STREAM, setdefaulttimeout, socket

from network_discovery import get_host_network

# Results of the most recent start_scan() call (one dict per scanned host).
hosts = []

# Every socket created in this module gives up after 2 seconds.
setdefaulttimeout(2)


def connect_tcp_host(host, port):
    """Try a TCP connection to ``host:port``.

    Returns:
        *port* when the connection succeeds, otherwise ``None``.
    """
    with socket(AF_INET, SOCK_STREAM) as sock:
        # connect_ex() returns 0 on success and an error code otherwise.
        if sock.connect_ex((host, port)) == 0:
            return port
    return None


def start_scan_per_addr(addr, minport, maxport):
    """Scan ports ``minport`` .. ``maxport - 1`` on one address.

    One thread is used per port. Each probe result is printed as
    ``"<addr>: <port or None>"`` as soon as it completes.

    Returns:
        A sorted list of the ports that accepted a connection; an empty list
        when the port range is empty.
    """
    ports = range(minport, maxport)
    if not ports:
        # ThreadPoolExecutor rejects max_workers <= 0.
        return []

    open_ports = []
    with ThreadPoolExecutor(max_workers=len(ports)) as ex:
        futures = [ex.submit(connect_tcp_host, str(addr), port) for port in ports]
        for future in as_completed(futures):
            result = future.result()
            print(f"{addr}: {result}")
            if result is not None:
                open_ports.append(result)
    # as_completed() yields in completion order; sort for stable output.
    return sorted(open_ports)


def start_scan(minport, maxport):
    """Scan every host address of every directly connected subnet.

    Each address is handed to a worker process, which in turn probes the
    ports with a thread pool (see ``start_scan_per_addr``).

    Returns:
        A list of ``{'host': <address string>, 'ports': [<open ports>]}``
        dicts, limited to hosts with at least one open port. The complete
        per-host results of this call are also stored in the module-level
        ``hosts`` list, replacing any earlier contents.
    """
    with ProcessPoolExecutor() as ex:
        # Submit everything before waiting on any result; calling .result()
        # right after submit() would run the hosts one at a time.
        pending = [
            (str(addr), ex.submit(start_scan_per_addr, addr, minport, maxport))
            for subnet in get_host_network()
            for addr in IPv4Network(subnet).hosts()
        ]
        results = [
            {'host': host, 'ports': list(future.result())}
            for host, future in pending
        ]

    # Replace instead of append so repeated calls do not accumulate old data.
    hosts[:] = results
    return [entry for entry in results if entry.get('ports')]
start_now.py – main script to test the scanner.py
from pprint import pprint
from time import time

from scanner import start_scan


def main():
    """Scan ports 1-1024 on all connected subnets and report the duration."""
    start_time = time()
    pprint(start_scan(1, 1025))
    print("the script took {} seconds to complete.".format(time() - start_time))


# Keep all work behind the guard: worker processes started with the "spawn"
# method re-import this module and would otherwise print a bogus timing line.
if __name__ == '__main__':
    main()
Results
Processes for python were spawned.
These are the final results after 1024 ports were scanned; the script is still slower than nmap.
For comparison, this is an nmap scan against Metasploitable, used to check whether the ports collected by the script are accurate.