信息收集ip测活-Python脚本编写
编写一个python脚本,下面为实现的功能点:
文件读取与IP提取:读取指定文件夹下的所有txt文件,提取其中的IP地址。
端口号添加:为每个IP地址添加多个指定的端口号。
测活功能:使用socket库进行连接测试,判断IP:端口号是否存活。
多个验证URL,包含国内和国外常见网站,用于更严格地验证代理IP是否可用,
结果保存:将存活的IP:端口号组合保存到新的txt文件中。
测活过程展示:在测活过程中,实时展示每一条测活的结果。
强制终止:设置一个信号处理机制,允许用户强制终止脚本。
多线程/多进程:使用多线程或多进程技术提高并发处理能力。
结果保存:将存活的IP:端口号组合保存到新的txt文件中。
文件读取与IP提取
首先,我们需要读取指定文件夹下的所有txt文件,并从中提取IP地址。可以使用Python的os模块来遍历文件夹,re模块来提取IP地址。
import os
import re
def extract_ips_from_files(folder_path):
ip_pattern = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')
ips = []
for filename in os.listdir(folder_path):
if filename.endswith('.txt'):
with open(os.path.join(folder_path, filename), 'r') as file:
content = file.read()
ips.extend(ip_pattern.findall(content))
return ips
端口号添加
为每个IP地址添加多个指定的端口号。
def add_ports_to_ips(ips, ports):
ip_port_pairs = [(ip, port) for ip in ips for port in ports]
return ip_port_pairs
测活功能
使用socket库进行连接测试,判断IP:端口号是否存活。
import socket
def is_ip_port_alive(ip, port):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(2)
s.connect((ip, port))
return True
except (socket.timeout, ConnectionRefusedError):
return False
多个验证URL
包含国内和国外常见网站,用于更严格地验证代理IP是否可用。
import requests
def validate_proxy_with_url(ip, port, url):
proxies = {
'http': f'http://{ip}:{port}',
'https': f'http://{ip}:{port}'
}
try:
response = requests.get(url, proxies=proxies, timeout=5)
return response.status_code == 200
except requests.RequestException:
return False
结果保存
将存活的IP:端口号组合保存到新的txt文件中。
python
复制
def save_results(results, output_file):
with open(output_file, 'w') as file:
for ip, port in results:
file.write(f'{ip}:{port}\n')
测活过程展示
在测活过程中,实时展示每一条测活的结果。
def display_live_results(ip, port, is_alive):
status = 'Alive' if is_alive else 'Dead'
print(f'Testing {ip}:{port} - {status}')
强制终止
设置一个信号处理机制,允许用户强制终止脚本。
import signal
import sys
def signal_handler(sig, frame):
print('Script terminated by user.')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
多线程/多进程
使用多线程或多进程技术提高并发处理能力。
from concurrent.futures import ThreadPoolExecutor
def test_ip_port_pairs(ip_port_pairs, urls):
alive_pairs = []
with ThreadPoolExecutor(max_workers=10) as executor:
futures = []
for ip, port in ip_port_pairs:
futures.append(executor.submit(is_ip_port_alive, ip, port))
for (ip, port), future in zip(ip_port_pairs, futures):
is_alive = future.result()
display_live_results(ip, port, is_alive)
if is_alive:
for url in urls:
if validate_proxy_with_url(ip, port, url):
alive_pairs.append((ip, port))
break
return alive_pairs
主函数
将以上功能整合到主函数中。
def main():
folder_path = 'path/to/your/folder'
output_file = 'alive_proxies.txt'
ports = [80, 443, 8080]
urls = ['http://www.google.com', 'http://www.baidu.com']
ips = extract_ips_from_files(folder_path)
ip_port_pairs = add_ports_to_ips(ips, ports)
alive_pairs = test_ip_port_pairs(ip_port_pairs, urls)
save_results(alive_pairs, output_file)
if __name__ == '__main__':
main()