python 组播udp诊断
测试代码
import socket
import struct
MULTICAST_IP = "239.1.1.5" # 替换为你的多播IP
MULTICAST_PORT = 9200 # 替换为你的多播端口
LOCAL_IP = "172.20.10.3" # 替换为你的本地IP
def diagnose_multicast():
try:
# 检查本地IP是否有效
try:
local_ip_valid = False
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as test_sock:
test_sock.bind((LOCAL_IP, 0))
local_ip_valid = True
if local_ip_valid:
print(f"本地IP有效: {LOCAL_IP}")
except OSError:
print(f"错误:未找到有效的本地IP {LOCAL_IP}")
return
# 创建Socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
# 绑定到本地地址
try:
sock.bind((LOCAL_IP, MULTICAST_PORT))
print(f"成功绑定到本地地址: {LOCAL_IP}:{MULTICAST_PORT}")
except OSError as e:
print(f"绑定失败: {e}")
return
# 设置 IP_TOS
try:
sock.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, 4)
print("IP_TOS 设置成功")
except OSError as e:
print(f"IP_TOS 设置失败: {e}")
# 加入多播组
try:
mreq = struct.pack("=4sl", socket.inet_aton(MULTICAST_IP), socket.inet_aton(LOCAL_IP))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
print(f"成功加入多播组: {MULTICAST_IP}")
except OSError as e:
print(f"加入多播组失败: {e}")
except Exception as e:
print(f"诊断失败: {e}")
if __name__ == "__main__":
diagnose_multicast()