Pyhon中串口通信详解
前言:
pyserial 是一个用于串口通信的 Python 库,它为在 Python 中与串口设备通信提供了简单易用的接口。这个库支持多个操作系统,包括 Windows、Linux 和 Mac OS。
安装 PySerial
首先,你需要确保已安装 pyserial。可以通过以下命令安装:
pip install pyserial
功能简介
pyserial 主要支持以下功能:
打开和关闭串口
读取和写入数据
配置串口参数(如波特率、数据位、停止位等)
支持超时和读取方式
检查可用串口
基本使用方法
以下是 pyserial 的一些基本使用示例。
1. 导入库和打开串口
import serial
# 打开串口, 参数包括设备名称和波特率
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1) # Linux
# ser = serial.Serial('COM3', 9600, timeout=1) # Windows
2. 配置串口参数
你可以设置串口的参数,如波特率、数据位、停止位等:
ser.baudrate = 115200 # 设置波特率
ser.bytesize = serial.EIGHTBITS # 数据位
ser.parity = serial.PARITY_NONE # 校验位
ser.stopbits = serial.STOPBITS_ONE # 停止位
3. 读取数据
从串口读取数据的几种常用方法:
# 读取一行
line = ser.readline()
print(line.decode('utf-8')) # 转换为str
# 读取固定字节数
data = ser.read(10) # 读取10字节
print(data)
# 读取所有可用的数据
all_data = ser.read_all()
print(all_data.decode('utf-8'))
4. 写入数据
向串口写入数据:
ser.write(b'hello world') # 发送字节串
5. 关闭串口
在完成串口操作后,记得关闭串口:
ser.close()
6.完整示例
以下是一个完整的封装好的方法,展示了如何使用 pyserial 进行基本的串口通信:
from time import sleep, time
import serial
class SerialCom:
def __init__(self, port, baud_rate, timeout=5):
"""
:param port: 端口
:param baud_rate: 波特率9600,19200,38400,57600,115200
:param timeout: 超时时间,默认为5
"""
self.port = port
self.baud_rate = baud_rate
self.timeout = timeout if timeout else 5
self.password = ''
self.login_head = ''
self.ser = None
@property
def is_connected(self):
if self.ser is None:
return False
return self.ser.is_open
def connect(self):
try:
self.ser = serial.Serial(port=self.port, baudrate=self.baud_rate, timeout=self.timeout,
write_timeout=self.timeout)
self._login_head_fresh()
except Exception as e:
raise f'f"Serial open failed...{e}"'
return self.is_connected
def cmd_send(self, cmd: str, ret_str='', wait4ret_str=None, timeout=None, wait4res=None, delay_recv=1):
"""
向串口发送命令,并取回命令的打印信息
:param cmd: 待发送的命令字符串
:param wait4ret_str:
:param ret_str: 遇到该字符串就立即返回,不再继续读取数据
:param timeout: 超过该时间,即使没有遇到ret_str,也要立即返回
:return: ret: 返回的信息
"""
ret = ""
if not self.is_connected:
print('Com未连接或已断开')
return ""
timeout = timeout if timeout else self.timeout
tmp = self._msg_read() # 发送命令前,检查缓存,清空缓存
if tmp:
print('清理缓存残留数据')
# 发送命令数据
cmd = cmd + '\r'
try:
self.ser.write(cmd.encode())
sleep(1 + delay_recv) # 等待in_waiting更新
except Exception as e:
print(f'Sendcmd failed:{e}')
return ret
if not wait4res:
return ret
ret = self._msg_read(ret_str=ret_str, timeout=timeout, wait4ret_str=wait4ret_str)
ret = ret.replace(cmd.strip(), '').replace(self.login_head, '')
return ret
def close(self) -> bool:
"""关闭串口"""
self.ser.close()
return not self.is_connected
def _msg_read(self, ret_str='', timeout=0, wait4ret_str=False) -> str:
"""
读回所有信息(包括cmd以及login_head)
:param ret_str: 遇到该字符串就立即返回,不再继续读取数据
:param timeout: 超过该时间,即使没有遇到ret_str,也要立即返回
:return: 所有信息(包括cmd以及login_head)
"""
ret = ''
timeout = timeout if timeout else self.timeout
t = time()
while self.ser.in_waiting:
tmp = self.ser.read(self.ser.in_waiting).decode()
ret += tmp
if wait4ret_str and ret_str in ret:
break
if time() - t > timeout:
print('message read timeout!')
break
ret = ret.replace('\r\r', '\r')
return ret
def _login_head_fresh(self) -> None:
self.login_head = self.cmd_send('')
print(f"login head change to {self.login_head}")
if __name__ == '__main__':
com6 = SerialCom('com6', 115200)
com6.connect()
ret = com6.cmd_send("ls")
print(ret)
7.常用方法:
ser.isOpen():查看端口是否被打开。
ser.open() :打开端口‘。
ser.close():关闭端口。
ser.read():从端口读字节数据。默认1个字节。
ser.read_all():从端口接收全部数据。
ser.write("hello"):向端口写数据。
ser.readline():读一行数据。
ser.readlines():读多行数据。
in_waiting():返回接收缓存中的字节数。
flush():等待所有数据写出。
flushInput():丢弃接收缓存中的所有数据。
flushOutput():终止当前写操作,并丢弃发送缓存中的数据。