当前位置: 首页 > article >正文

Python 封装 socket 为 [TCP/UDP/MULTICAST] 客户端

发送 TCP/UDP/MULTICAST 数据并接收响应。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket


class ClientSocket:
    def __init__(self, *, protocol: str, ip: str, port: int, recv_timeout: float = 1.5):
        """客户端套接字

        发送 TCP/UDP/MULTICAST 数据并接收响应。

        Args:
            protocol (str): 协议
            ip (str): ip 地址
            port (int): 端口号
            recv_timeout (float, optional): 接收超时时间. Defaults to 1.5.

        Raises:
            ValueError: 无效的端口号, 应为 [1-65535]
            ValueError: 无效的协议类型, 应为 [TCP, UDP, MULTICAST]
        """
        if port < 1 or port > 65535:
            raise ValueError(f'ServerSocket 无效的端口号 "{port}"')
        if protocol not in ['TCP', 'UDP', 'MULTICAST']:
            raise ValueError(f'ServerSocket 无效的协议类型 "{protocol}"')

        self.protocol = protocol
        self.ip = ip
        self.port = port
        self.recv_timeout = recv_timeout
        self.sock: socket.socket | None = None
        self.__connected = False

    def __str__(self) -> str:
        return f"ClientSocket({self.ip}:{self.port})"

    def connect(self) -> bool:
        if not self.__connected:
            try:
                match self.protocol:
                    case 'TCP':
                        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                        self.sock.connect((self.ip, self.port))
                    case 'UDP':
                        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                        if self.ip.endswith('.255'): # 设置 SO_BROADCAST 为 1, 允许发送广播数据包
                            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                    case 'MULTICAST':
                        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
                        self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
                        self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1)
                        self.sock.bind(('', self.port))
                        self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(self.ip) + socket.inet_aton('0.0.0.0'))
                self.sock.settimeout(self.recv_timeout)
                self.__connected = True
            except ConnectionRefusedError:
                print(f'{self} 无法连接: {self.ip}:{self.port}')
            except Exception as e:
                print(f'{self} 创建失败: {e}')
        return self.__connected

    def send(self, data: bytes) -> None:
        try:
            print(f'{self} 发送数据: {data}')

            if self.protocol == 'TCP':
                self.sock.sendall(data)
            else:
                self.sock.sendto(data, (self.ip, self.port))
        except OSError as e:
            if e.errno == 10057:
                print(f'{self} 发送失败连接未建立')
            else:
                print(f'{self} 发送失败: {e}')

    def recv(self) -> bytes | None:
        try:
            data, _ = self.sock.recvfrom(1024)
            print(f'{self} 收到数据: {data}')
            return data
        except TimeoutError:
            return None
        except OSError as e:
            if e.errno == 10057:
                print(f'{self} 接收失败连接未建立')
            else:
                print(f'{self} 接收失败: {e}')

    def close(self) -> bool:
        if self.__connected:
            try:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                self.__connected = False
            except Exception as e:
                print(f'{self} 关闭失败: {e}')
        return not self.__connected


if __name__ == '__main__':
    from time import sleep
    # client = ClientSocket(protocol='TCP', ip='127.0.0.1', port=60000)
    # client = ClientSocket(protocol='UDP', ip='127.0.0.1', port=60000)
    client = ClientSocket(protocol='MULTICAST', ip='224.1.1.1', port=65000)
    client.connect()

    times = 3
    while times > 0:
        times -= 1
        client.send(b'hello')
        client.recv()
        sleep(1)
    client.send(b'q')
    client.recv()
    client.close()


http://www.kler.cn/news/328085.html

相关文章:

  • powerbi计算销售额同比增长率
  • MySql Explain优化命令使用
  • Vue实战教程:如何用JS封装一个可复用的Loading组件
  • 基于php的律所管理系统
  • leetcode 513 找到左下角的值
  • SQLite3模块使用详解
  • 使用WebClient 快速发起请求(不使用WebClientUtils工具类)
  • 测试面试题:pytest断言时,数据是符点类型,如何断言?
  • 【Python|接口自动化测试】使用requests发送http请求时添加headers
  • 【LeetCode】每日一题 2024_9_27 每种字符至少取 K 个(双指针)
  • Android 安装应用-提交阶段之后剩下的操作
  • uniapp生物识别示例(人脸识别、指纹识别)
  • 【docker】docker常见命令
  • 动态分配内存
  • Gin框架简易搭建(3)--Grom与数据库
  • 归并排序【C语言版-笔记】
  • Unreal 实现建造游戏|地面交互shader
  • 06.C/C++内存管理
  • 【数据库】MongoDB 用户权限与数据之间的关系详解
  • Android studio配置AVD虚拟机
  • 【60天备战2024年11月软考高级系统架构设计师——第33天:云计算与大数据架构——大数据处理框架的应用场景】
  • 关于Java中的List<User>如何进行深拷贝
  • 贝锐蒲公英工业物联方案:助力美的智慧楼宇全球布局
  • Leetcode 611. 有效三角形的个数
  • 前端面试题(八)
  • 音视频入门基础:FLV专题(7)——Tag header简介
  • 【STM32单片机_(HAL库)】4-1【定时器TIM】定时器中断点灯实验
  • 【漏洞复现】JeecgBoot 积木报表 queryFieldBySql sql注入漏洞
  • 【进阶OpenCV】 (2)--Harris角点检测
  • 衡水中学资料大全-重构版(状元、学霸笔记)