当前位置: 首页 > article >正文

Pyhon中串口通信详解

前言:

pyserial 是一个用于串口通信的 Python 库,它为在 Python 中与串口设备通信提供了简单易用的接口。这个库支持多个操作系统,包括 Windows、Linux 和 Mac OS。

安装 PySerial

首先,你需要确保已安装 pyserial。可以通过以下命令安装:

pip install pyserial

功能简介

pyserial 主要支持以下功能:

打开和关闭串口
读取和写入数据
配置串口参数(如波特率、数据位、停止位等)
支持超时和读取方式
检查可用串口
基本使用方法
以下是 pyserial 的一些基本使用示例。

1. 导入库和打开串口

import serial

# 打开串口, 参数包括设备名称和波特率
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)  # Linux
# ser = serial.Serial('COM3', 9600, timeout=1)  # Windows

2. 配置串口参数

你可以设置串口的参数,如波特率、数据位、停止位等:

ser.baudrate = 115200  # 设置波特率
ser.bytesize = serial.EIGHTBITS  # 数据位
ser.parity = serial.PARITY_NONE  # 校验位
ser.stopbits = serial.STOPBITS_ONE  # 停止位

3. 读取数据

从串口读取数据的几种常用方法:

# 读取一行
line = ser.readline()
print(line.decode('utf-8'))  # 转换为str

# 读取固定字节数
data = ser.read(10)  # 读取10字节
print(data)

# 读取所有可用的数据
all_data = ser.read_all()
print(all_data.decode('utf-8'))

4. 写入数据

向串口写入数据:

ser.write(b'hello world')  # 发送字节串

5. 关闭串口

在完成串口操作后,记得关闭串口:

ser.close()

6.完整示例

以下是一个完整的封装好的方法,展示了如何使用 pyserial 进行基本的串口通信:

from time import sleep, time

import serial


class SerialCom:

    def __init__(self, port, baud_rate, timeout=5):
        """
        :param port: 端口
        :param baud_rate: 波特率9600,19200,38400,57600,115200
        :param timeout: 超时时间,默认为5
        """
        self.port = port
        self.baud_rate = baud_rate
        self.timeout = timeout if timeout else 5
        self.password = ''
        self.login_head = ''
        self.ser = None

    @property
    def is_connected(self):
        if self.ser is None:
            return False
        return self.ser.is_open

    def connect(self):
        try:
            self.ser = serial.Serial(port=self.port, baudrate=self.baud_rate, timeout=self.timeout,
                                     write_timeout=self.timeout)
            self._login_head_fresh()
        except Exception as e:
            raise f'f"Serial open failed...{e}"'
        return self.is_connected

    def cmd_send(self, cmd: str, ret_str='', wait4ret_str=None, timeout=None, wait4res=None, delay_recv=1):
        """
        向串口发送命令,并取回命令的打印信息
        :param cmd: 待发送的命令字符串
        :param wait4ret_str:
        :param ret_str: 遇到该字符串就立即返回,不再继续读取数据
        :param timeout: 超过该时间,即使没有遇到ret_str,也要立即返回
        :return: ret: 返回的信息
        """
        ret = ""
        if not self.is_connected:
            print('Com未连接或已断开')
            return ""
        timeout = timeout if timeout else self.timeout

        tmp = self._msg_read()  # 发送命令前,检查缓存,清空缓存
        if tmp:
            print('清理缓存残留数据')

        # 发送命令数据
        cmd = cmd + '\r'
        try:
            self.ser.write(cmd.encode())
            sleep(1 + delay_recv)  # 等待in_waiting更新
        except Exception as e:
            print(f'Sendcmd failed:{e}')
            return ret
        if not wait4res:
            return ret
        ret = self._msg_read(ret_str=ret_str, timeout=timeout, wait4ret_str=wait4ret_str)
        ret = ret.replace(cmd.strip(), '').replace(self.login_head, '')
        return ret

    def close(self) -> bool:
        """关闭串口"""
        self.ser.close()
        return not self.is_connected

    def _msg_read(self, ret_str='', timeout=0, wait4ret_str=False) -> str:
        """
        读回所有信息(包括cmd以及login_head)
        :param ret_str: 遇到该字符串就立即返回,不再继续读取数据
        :param timeout: 超过该时间,即使没有遇到ret_str,也要立即返回
        :return: 所有信息(包括cmd以及login_head)
        """
        ret = ''
        timeout = timeout if timeout else self.timeout
        t = time()
        while self.ser.in_waiting:
            tmp = self.ser.read(self.ser.in_waiting).decode()
            ret += tmp
            if wait4ret_str and ret_str in ret:
                break
            if time() - t > timeout:
                print('message read timeout!')
                break
        ret = ret.replace('\r\r', '\r')
        return ret

    def _login_head_fresh(self) -> None:
        self.login_head = self.cmd_send('')
        print(f"login head change to {self.login_head}")


if __name__ == '__main__':
    com6 = SerialCom('com6', 115200)
    com6.connect()
    ret = com6.cmd_send("ls")
    print(ret)

7.常用方法:

ser.isOpen():查看端口是否被打开。
ser.open() :打开端口‘。
ser.close():关闭端口。
ser.read():从端口读字节数据。默认1个字节。
ser.read_all():从端口接收全部数据。
ser.write("hello"):向端口写数据。
ser.readline():读一行数据。
ser.readlines():读多行数据。
in_waiting():返回接收缓存中的字节数。
flush():等待所有数据写出。
flushInput():丢弃接收缓存中的所有数据。
flushOutput():终止当前写操作,并丢弃发送缓存中的数据。


http://www.kler.cn/a/373435.html

相关文章:

  • 如何在小红书发布笔记时显示外地IP地址
  • pip使用
  • Three.js 快速入门构建你的第一个 3D 应用
  • 实验干货|电流型霍尔传感器采样设计03-信号调理
  • k8s部署redis远程连接示例
  • gitlab不同账号间·仓库转移
  • 【Nginx系列】499错误
  • word试题转excel(一键转写excel,无格式要求)
  • 【C++】哈希表模拟:闭散列技术与哈希冲突处理
  • HTML入门教程18:HTML类
  • ef core $ 附近有语法错误_ef core contains $符近语法错
  • 「Mac畅玩鸿蒙与硬件5」鸿蒙开发环境配置篇5 - 熟悉 DevEco Studio 界面
  • 力扣每日一题 冗余连接 并查集
  • (前瞻篇)机器学习与深度学习对比
  • 知识融合是什么? - 给小学生的人工智能科普
  • golang flag介绍和使用
  • 一文带你全面了解Android 虚拟化框架(AVF)
  • 理解 CSS 中的绝对定位与 Flex 布局混用
  • 电子电气架构 --- 车载芯片现状
  • 在Vue 3项目中集成normalize.scss
  • 通过Promise和async/await解决异步操作 - 2024最新版前端秋招面试短期突击面试题
  • Vue中Axios和VantUI的基础使用
  • Vue3+element-ui 实现可编辑表格,鼠标右键自定义菜单(复制行列,粘贴行列,插入删除等)
  • 我自己的资料整理导引(一):概论
  • webpack+react中问题解决
  • 大模型,多模态大模型面试问题记录【时序,Qformer,卷积,感受野,ControlNet,IP-adapter】