导语欢迎关注我的编程栈,在这里你将循序渐进的学到python相关知识,如:爬虫、web框架,数据库,web前端等。不需要你转发+评论什么的发资料,我也没有什么资料给你,你跟着文章敲代码即可。我目前是51CTO微职位辅导老师,如果您想系统的学习也可以联系我,当然,你也可以忽视这条广告如果你觉得文章对你有帮助:感谢关注我。我们常常说python无所不能,今天教大家使用多线程写一个端口扫描工具依赖socket或telnetlib,程序默认使用telnetlib,可选socket知识点:多线程、网络、简单正则、另一种错误处理(见check_ip方法)废话不多说,上代码import socket,threading,re
import telnetlib
# python的内置模块telnetlib也可以完成端口检测任务
# 强大的第三方包python-nmap也能完成
from concurrent.futures import ThreadPoolExecutor,wait
class CheckPort(object):
 ”’
 检查网段端口
 ”’
 def __init__(self,conn_type=’telent’,scan_type=’custom’):
 ”’
 conn_type 扫描时,连接端口的方式,可选:socket telent
 scan_type 扫描方式,custom 自定义(自行输入网段和端口),auto(输入网段后,自动扫描常用端口)
 ”’
 self.open_port_dict = {} # 保存扫描到的开放端口
 if scan_type == ‘custom':
 self.check_port_list = []
 elif scan_type == ‘auto':
 self.check_port_list = [21,22,23,25,42,69,79,80,88,110,113,119,135,220,389,443,636,993,1433,4000,8000,8080]
 elif scan_type == ‘easy':
 self.check_port_list = [21,22,23,80,443,1433,4000,8000,8080]
 else:
 print(f'[405]{self.errmsg(405)}’)
 exit()
 if conn_type == ‘telent':
 self.scan_func = self.port_by_telent
 elif conn_type == ‘socket':
 self.scan_func = self.port_by_socket
 else:
 print(f'[404]{self.errmsg(404)}’)
 exit()
 self.scan_type = scan_type
 self.star_ip = ”
 self.end_ip = ”
 self.ip_list = []
 self.input_ip()
 def input_ip(self):
 flag = True
 while flag:
 self.star_ip = input(‘请输入网段起始IP:’)
 if self.star_ip.upper() == ‘Q':
 exit()
 self.end_ip = input(‘请输入网段结束IP:’)
 if self.end_ip.upper() == ‘Q':
 exit()
 check = self.check_ip(self.star_ip,self.end_ip)
 if check == 0:
 flag = False
 if self.scan_type == ‘custom':
 self.input_port()
 else:
 print(f'[{check}]{self.errmsg(check)}’)
 def input_port(self):
 flag = True
 while flag:
 port = input(‘请输入要检查的端口:’).strip()
 port = port.replace(‘ ‘,’,’)
 port = port.replace(‘,’,’,’)
 port_list = self.re_port(port.split(‘,’))
 if port_list == 203:
 continue
 else:
 self.check_port_list = port_list
 flag = False
 def re_port(self,port_list):
 temp_port = []
 for port in port_list:
 if port.strip().isdigit():
 if int(port.strip()) < 65535 and int(port.strip()) > 1:
 temp_port.append(int(port.strip()))
 else:
 print(f'[201]{self.errmsg(201)}-去除[{port}]端口检测’)
 else:
 print(f'[202]{self.errmsg(202)}-去除[{port}]端口检测’)
 if len(temp_port) < 1:
 res = 203
 else:
 res = temp_port
 return res
 def check_ip(self,star_ip,end_ip):
 ”’
 检查IP格式和网段是否正确
 @return errcode
 ”’
 errcode = 0
 pattern = “^(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|[1-9])\\.(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)\\.(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)\\.(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)$”
 if re.match(pattern,star_ip) == None:
 errcode = 100
 return errcode
 if re.match(pattern,end_ip) == None:
 errcode = 100
 return errcode
 star_seg = star_ip.split(‘.’)
 end_seg = end_ip.split(‘.’)
 if star_seg[0] == end_seg[0] and star_seg[1] == end_seg[1] and star_seg[2] == end_seg[2]:
 self.ip_list = [f'{star_seg[0]}.{star_seg[1]}.{star_seg[2]}’,int(star_seg[3]),int(end_seg[3])]
 return errcode
 else:
 errcode = 101
 return errcode
 def port_by_socket(self,ip,port):
 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 # s.timeout = 5 # 设置超时
 try:
 s.connect((ip,port))
 except Exception as e:
 print(f'[{ip}]:[{port}]未开启’)
 # print(e)
 else:
 print(f'{ip}:{port} 开启’)
 self.open_port_dict[ip].append(port)
 finally:
 s.close()
 def port_by_telent(self,ip,port):
 server = telnetlib.Telnet() # 创建一个Telnet对象
 try:
 server.open(ip,port) # 利用Telnet对象的open方法进行tcp链接
 print(f'{ip}:{port} 开启’)
 self.open_port_dict[ip].append(port)
 except Exception as err:
 print(f'[{ip}]:[{port}]未开启’)
 finally:
 server.close()
 def multi_run(self,threads = 5):
 ”’
 threads 线程数量
 ”’
 pool = ThreadPoolExecutor(threads)
 futures = []
 for i in range(self.ip_list[1],self.ip_list[2] + 1):
 ip = f'{self.ip_list[0]}.{i}’
 self.open_port_dict[ip] = []
 for port in self.check_port_list:
 f = pool.submit(self.scan_func,ip,port)
 futures.append(f)
 # print(f)
 wait(futures,timeout=None,return_when=’ALL_COMPLETED’)
 # self.show_port()
 def show_port(self):
 print(self.open_port_dict)
 for key in self.open_port_dict:
 print(f'{key}开放如下端口’)
 for port in self.open_port_dict[key]:
 print(port)
 def errmsg(self,errcode):
 err_dict = {
 0:’ok’,
 100:’IP格式错误’,
 101:’IP网段错误’,
 201:’超出端口范围’,
 202:’端口格式错误,应为整数’,
 203:’无可检测端口,请重新输入’,
 404:’扫描方式错误’,
 405:’扫描类型错误’
 }
 return err_dict[errcode]
cp = CheckPort(conn_type=’socket’) # 实例化工具
cp.multi_run() # 多线程扫描
cp.show_port()
结语到这里,如果你认真阅读这篇文章,且认真的自己敲了一遍,你已经学会了一个python高级脚本啦看不懂?没关系,我的系列教程还没开始讲多线程的东西,这篇是给基础稍好的同学热热手,持续关注我,你会学到更多哦如果你觉得我的文章对你有用,请关注+收藏+分享哦你有python学习过程中的问题可以评论留言或私信我共同探讨~