Scan every possible port in a host using Python

I am writing a program that should scan all 65535 ports on a host, looking for those that are open. This is what I have so far. It works, but it produces different results every time I run the script. Why is this happening?

    import socket
    from concurrent.futures import ThreadPoolExecutor

    def check_open_port(host, port):
        s = socket.socket()
        s.settimeout(0.1)
        # the SO_REUSEADDR flag tells the kernel to reuse a local
        # socket in TIME_WAIT state, without waiting for its natural
        # timeout to expire.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            code = s.connect_ex((host, port))
            s.close()
            if code == 0:
                return True
            else:
                return False
        except socket.error:
            return False

    def get_open_ports(host, max_port=65535):
        open_ports = []

        def worker(port):
            if check_open_port(host, port):
                open_ports.append(port)

        pool = ThreadPoolExecutor(max_workers=10000)
        [pool.submit(worker, port) for port in range(1, max_port + 1)]
        pool.shutdown(wait=True)
        return open_ports

For example, on a host with ports 22, 80, and 443 open, I sometimes get this answer:

[22, 80]

and sometimes I get:

[22, 80, 443]

or even:

[22]

Hosts with more open ports produce even more combinations.

I played with the max_workers and settimeout() values, but I can't get it to work reliably. The only time it worked was without threads, but then it obviously took a long time to complete, so I have to use them.

Am I missing something? Is there any other way to implement this?

+5
3 answers

There are two questions here:

  • Am I missing something?
  • Is there any other way to implement this?

Am I missing something?

I think it is worth checking the error codes here:

    if code == 0:
        return True
    else:
        return False

Given that you are trying to start a pool of 10K threads, all kinds of errors may occur, e.g. hitting limits set for the system or for your user (check ulimit -a), and you will treat such errors as a closed port without any notification. This may explain the erratic results you are seeing.
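A minimal sketch of what checking the error code could look like. The helper name probe_port and its return strings are assumptions made for illustration, not part of the original code. Only ECONNREFUSED is treated as a definite "closed"; any other code (for example EMFILE when the process runs out of file descriptors) is reported instead of being silently folded into "closed".

```python
import errno
import socket

def probe_port(host, port, timeout=1.0):
    """Return 'open', 'closed', or 'error: <reason>' for one TCP port.

    probe_port is a hypothetical helper used only for illustration.
    """
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except OSError as exc:
        # Creating the socket can already fail, e.g. too many open files
        return "error: {}".format(exc)
    try:
        s.settimeout(timeout)
        code = s.connect_ex((host, port))
    except OSError as exc:
        # connect_ex can still raise for problems such as a failed host lookup
        return "error: {}".format(exc)
    finally:
        s.close()
    if code == 0:
        return "open"
    if code == errno.ECONNREFUSED:
        return "closed"
    # Anything else (timeout, resource limits, unreachable network, ...)
    return "error: {}".format(errno.errorcode.get(code, code))
```

With this shape, a worker can log or retry ports that come back as "error: ..." rather than dropping them from the result.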

By the way, on my MacBook the results are consistent (checked against my live server on VPS hosting).

I'd also pick fewer threads; 10K is excessive. For example, here is the default suggested in the Python docs:

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
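A rough sketch of the same kind of scan with a much smaller pool. The names is_open and scan_ports and the default of 100 workers are assumptions chosen for illustration, not values from the question or the docs. Results are collected from executor.map rather than appended to a shared list from many threads.

```python
import socket
from concurrent.futures import ThreadPoolExecutor

def is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        # For brevity, any nonzero code counts as "not open" here
        return s.connect_ex((host, port)) == 0

def scan_ports(host, ports, max_workers=100):
    """Scan the given ports with a bounded thread pool (hypothetical helper)."""
    ports = list(ports)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() yields results in the same order as the input ports
        results = executor.map(lambda p: is_open(host, p), ports)
        return [p for p, ok in zip(ports, results) if ok]
```

For example, scan_ports('127.0.0.1', range(1, 1025)) would check the first 1024 ports using at most 100 threads at a time.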

Is there any other way to implement this?

First of all, there is no need to use threads or processes: non-blocking sockets plus event multiplexers such as epoll have existed for many years, so you can get by without any additional threads or processes.
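One way to try this idea without writing the epoll loop by hand is asyncio, whose event loop multiplexes non-blocking sockets in a single thread. The sketch below is only an illustration: the names check_port and scan, the 1-second timeout, and the concurrency limit of 500 are assumptions, not values from the question. Note that it still performs a full TCP connect for each port.

```python
import asyncio

async def check_port(host, port, timeout, limit):
    """Return the port number if a TCP connection succeeds, else None."""
    async with limit:  # cap the number of connection attempts in flight
        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port), timeout)
        except (OSError, asyncio.TimeoutError):
            return None
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass  # the peer may reset the connection while we close it
        return port

async def scan(host, ports, timeout=1.0, concurrency=500):
    """Scan ports concurrently in one thread (hypothetical helper)."""
    limit = asyncio.Semaphore(concurrency)
    results = await asyncio.gather(
        *(check_port(host, p, timeout, limit) for p in ports))
    return [p for p in results if p is not None]

# Example usage: asyncio.run(scan("127.0.0.1", range(1, 1025)))
```

The semaphore plays the role that max_workers plays for a thread pool: it bounds how many sockets are open at once, which keeps the scan under the file-descriptor limit.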

The connect / close approach is also suboptimal: you only need to check whether the port is open, so you do not need a full TCP connection here.

In the simplest case, you just need to send a SYN segment and check how the server responds.

Here's a good article with dozens of methods using scapy:

Scapy is a powerful interactive packet manipulation program. It is able to forge or decode packets of a wide number of protocols, send them on the wire, capture them, match requests and replies, and much more. It can easily handle most classical tasks like scanning, tracerouting, probing, unit tests, attacks or network discovery (it can replace hping, 85% of nmap, arpspoof, arp-sk, arping, tcpdump, tethereal, p0f, etc.).

Here is one of the method descriptions ("TCP connect scan"):

The client sends the first handshake using the SYN flag and port to connect to the server in a TCP packet. If the server responds with a RST instead of a SYN-ACK, then that particular port is closed on the server.

And another method ("TCP stealth scan"):

This technique is similar to the TCP connect scan. The client sends a TCP packet with the SYN flag set and the port number to connect to. If the port is open, the server responds with the SYN and ACK flags inside a TCP packet. But this time the client sends a RST flag in a TCP packet and not RST+ACK, which was the case in the TCP connect scan. This technique is used to avoid port scanning detection by firewalls.
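The decision rule in both descriptions comes down to reading the flags of the reply to the SYN probe. The sketch below shows only that rule, using the standard TCP flag bit values; it does not send any packets. The function name classify_syn_reply and the "filtered" and "unknown" labels are assumptions for illustration (treating a missing reply as "filtered" follows nmap's terminology, not the quoted article).

```python
# Standard TCP flag bits
SYN = 0x02
RST = 0x04
ACK = 0x10

def classify_syn_reply(flags):
    """Map the TCP flags of the reply to a SYN probe onto a port state.

    flags is the flags byte of the reply, or None if nothing came back.
    classify_syn_reply is a hypothetical helper used only for illustration.
    """
    if flags is None:
        return "filtered"  # assumption: no reply at all is labeled filtered
    if flags & RST:
        return "closed"    # RST (usually RST+ACK) means nothing is listening
    if flags & SYN and flags & ACK:
        return "open"      # SYN-ACK means the server accepted the handshake
    return "unknown"
```

With scapy, the flags byte would come from the TCP layer of the captured reply; that part is left out here because it needs raw-socket privileges.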

Of course, if you just want to play with sockets / threads, your approach is fine too, even without pcap / scapy.

+7

I tried your code in a Jupyter notebook and I always get the same set of ports:

 get_open_ports('127.0.0.1') 

Output:

[ 133, 200, 144...60700]

Is it possible that the set of open ports on the host actually differs from one request to the next?

To check a smaller set of ports, I reduced max_port to 10000, and I still get the same set of ports every time:

    def get_open_ports(host, max_port=10000):
        open_ports = []

        def worker(port):
            if check_open_port(host, port):
                open_ports.append(port)

        with ThreadPoolExecutor(max_workers=10000) as executor:
            [executor.submit(worker, port) for port in range(1, max_port + 1)]
            executor.shutdown(wait=True)
        return open_ports

    get_open_ports('127.0.0.1')

Output: [150, 900, 1035, 7789]

Note: I changed the port numbers for security.

EDIT:

    def get_open_ports(host, max_port=65535):
        open_ports = []

        def worker(port):
            if check_open_port(host, port):
                open_ports.append(port)

        # We can use a with statement to ensure threads are cleaned up promptly
        with ThreadPoolExecutor(max_workers=100) as executor:
            print('main:starting')
            wait_for = [executor.submit(worker, port) for port in range(1, max_port + 1)]
            for f in as_completed(wait_for):
                print('main: result: {}'.format(f.result()))  # check result on each thread execution
            # executor.shutdown(wait=True)  # not required when using the 'with' statement
        return len(open_ports)

    test = get_open_ports('45.60.112.163')  # host for www.indracompany.com

    # max_workers not defined & max_port=10000
    # len(test)  # test1: 148
    # len(test)  # test2: 79

    # max_workers = 10000 & max_port=65535
    # len(test)  # test1: 1
    # len(test)  # test2: 1
    # len(test)  # test3: 1

    # max_workers = 20000 & max_port=65535
    # len(test)  # test1: 14
    # len(test)  # test2: 1
    # len(test)  # test3: 1
    # len(test)  # test4: 1

    # max_workers not defined & max_port=65535 (quite time-consuming)
    # len(test)  # test1: 63

EDIT 2: More Reliable Solution

As @Tarun suggested, the python-nmap Python library does a better job when scanning hosts.

The solution below gives an accurate result, but I did see a significant performance cost as the range of ports to scan increased. Perhaps threads could be added to the code to improve performance. I also imported the time library to print the execution time at the end, which can be used for comparison when testing performance.

    # The python-nmap library helps to programmatically manipulate nmap scan
    # results in order to automate port-scanning tasks.
    # To use this library you must have the Nmap software installed. It can be
    # installed from https://nmap.org/download.html.
    # Network Mapper (Nmap) is a free and open-source tool used for network
    # discovery and security auditing. It runs on all major operating systems,
    # and official binary packages are available for Linux, Windows, and Mac OS X.
    # For Windows 7 and later, you must also install Npcap from https://nmap.org/npcap/
    # For Windows, make sure nmap.exe is added to PATH.
    # When you're ready, pip install python-nmap
    import time
    import nmap

    nm = nmap.PortScanner()  # initialize PortScanner object
    host = '45.60.112.163'   # specify host
    nm.scan(host, '1-100')   # run the scan, specify host and range of ports to scan

    # Optional steps for verification:
    # Output: nmap -oX - -p 1-100 -sV 45.60.112.163
    print(nm.command_line())  # command line that was passed to nmap
    # Output: {'tcp': {'method': 'syn', 'services': '1-100'}}
    print(nm.scaninfo())      # nmap scan information

    # Now we can list all scanned hosts
    # From the official documentation at https://xael.org/pages/python-nmap-en.html
    start_time = time.time()  # started after nm.scan(), so this times only the report loop
    for host in nm.all_hosts():
        print('----------------------------------------------------')
        print('Host : %s (%s)' % (host, nm[host].hostname()))
        print('State : %s' % nm[host].state())
        for proto in nm[host].all_protocols():
            print('----------')
            print('Protocol : %s' % proto)
            # Iterate over the sorted ports once; nesting a second loop here
            # would print the whole port list once per port
            for port in sorted(nm[host][proto].keys()):
                print('port : %s\tstate : %s' % (port, nm[host][proto][port]['state']))
    print('Execution time: %s seconds' % (time.time() - start_time))

Output:

    ----------------------------------------------------
    Host : 45.60.112.163 ()
    State : up
    ----------
    Protocol : tcp
    port : 25	state : open
    port : 51	state : open
    port : 53	state : open
    port : 80	state : open
    port : 81	state : open
    port : 85	state : open
    port : 91	state : open
    Execution time: 0.015624761581420898 seconds

To convert the output to csv, use:

 print(nm.csv()) 

As a result of this research, Nmap is now installed on my computer. Just for fun, I also ran a scan from the command line, shown in the screenshot below. This scan covered the range "1-1000" and took more than 15 minutes (I did not sit through the whole session!).

[Screenshot: Nmap command-line scan of ports 1-1000]

+5

I came across this port scanner a while ago, while working with sockets in Python:

    import socket
    import threading
    from queue import Queue

    print_lock = threading.Lock()

    target = input("Enter website or IP address to scan: ")
    minPort = int(input("Enter minimum port to scan (1 is the smallest): "))
    maxPort = int(input("Enter maximum port to scan: "))
    threadNo = int(input("Enter number of threads to use (500 is a good all-around number): "))

    def portscan(port):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect((target, port))  # raises OSError if the connection fails
            with print_lock:
                print("Port", port, "is open!")
        except OSError:
            pass
        finally:
            s.close()  # connect() returns None, so close the socket itself

    def threader():
        # Each worker thread keeps pulling port numbers off the queue
        while True:
            worker = q.get()
            portscan(worker)
            q.task_done()

    q = Queue()

    for x in range(threadNo):
        t = threading.Thread(target=threader)
        t.daemon = True
        t.start()

    # range() excludes its end value, so add 1 to include maxPort
    for worker in range(minPort, maxPort + 1):
        q.put(worker)

    q.join()

It works quite well and can be easily adapted :)

+1

Source: https://habr.com/ru/post/1275721/

