Python编写BC260Y TCP数据收发压力测试脚本
Python编写BC260Y TCP数据收发压力测试脚本
使用BC260Y的TCP AT命令发送数据时,能够在数据中带有’\r\n’(回车换行),而其他模组会将’\r\n’当做AT命令处理的结束符,例如EC800E,为了验证TCP数据中带有’\r\n’时数据发收的稳定性,决定进行压力测试。
- 编写一个Python脚本周期向TCP服务器发送1024字节数据(数据中带有不止一个’\r\n’),在超时时间内判断是否发送成功和接收成功。
- TCP服务器同样以’\r\n’为结束符,收到结束符后将收到的所有数据原封不动回发。
- 所有的数据收发log保存到文件,便于出错后分析。
使用串口工具测试步骤如下:
编写Python脚本:
#########################################
## AT测试脚本:根据BC260Y模组TCP模块编写TCP压力测试脚本,模组使用直吐模式连接TCP服务器,每隔5s向服务器发送1024字节数据,服务器以'\r\n'(回车换行)作为数据接结束标志,将接收到的数据返回
## pyhton版本:3.6.5
## 需要安装的第三方库:pip install pyserial
#########################################
## -- coding: utf-8 --
import os
import sys
import threading
import time
import datetime
import msvcrt
import serial
import logging
import signal
import json
import hmac
import base64
from hashlib import sha256
############# 宏定义(开始) ####################
ATCOM = 'COM38'
ATBaud = 9600
LOGNamePrefix = 'LOG-NAME'
g_ExitFlag = 0 ## 0--未退出;1--退出
g_RecvResult = 0 ## 0--未接收;1--接收成功
g_RecvData = "" ## 接收数据缓存
g_SendSuccCount = 0 ## 成功次数
g_SendFailCount = 0 ## 失败次数
g_SendTotleCount = 0 ## 测试总次数
g_TimeOut = 0 ## 0--未超时;1--超时
g_Cereg = 0 ## 0--未驻网;1--已驻网
############# 宏定义(结束) ####################
###******************************************************************************
## 日志格式,输出到文件,同时输出到屏幕
def logInit(log_dir, logFileName) :
## 日志格式,输出到log_dir路径下logFileName文件
if not os.path.exists(log_dir):
os.makedirs(log_dir)
log_file_path = os.path.join(log_dir, logFileName)
logging.basicConfig(
level=logging.DEBUG,
format='[%(asctime)s][%(levelname)s]: %(message)s',
filename=log_file_path,
## 写入模式,覆盖现有的日志文件,如果文件已经存在,它的内容将被清空
filemode='w'
)
## 日志格式,输出到屏幕
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
## %(asctime)s:表示日志事件发生的时间。默认格式为YYYY-MM-DD HH:MM:SS,mmm,其中mmm表示毫秒。
## %(levelname)s:表示日志记录的级别,如DEBUG, INFO, WARNING, ERROR, CRITICAL等。
## %(message)s:表示日志记录的消息内容。
formatter = logging.Formatter('[%(asctime)s][%(levelname)s]: %(message)s')
console.setFormatter(formatter)
logging.getLogger().addHandler(console)
###******************************************************************************
## 打开串口
## 端口,GNU/Linux上的/dev/ttyUSB0等 或 Windows上的COM3等
## 波特率,标准值之一:50,75,110,134,150,200,300,600,1200,1800,2400,4800,9600,19200,38400,57600,115200
## 超时设置,None:永远等待操作,0为立即返回请求结果,其他值为等待超时时间(单位为秒)
def openComPort(portx, bps, timeout):
ret = False
ser = ""
try:
ser = serial.Serial(portx, bps, timeout=timeout)
if(ser.is_open):
ret = True
except Exception as e:
logging.error("openComPort faild:",e)
return ser,ret
###******************************************************************************
## 关闭串口
def closeComPort(ser):
if(ser.is_open):
ser.close()
return 0
return -1
###******************************************************************************
## 串口发送AT命令并接收返回值
def ATSend(atcmd):
global g_RecvData
## 发送AT命令前先将接收缓存g_RecvData清空
g_RecvData = ""
if(len(atcmd) > 0):
logging.info(">>>>>>Send AT cmd:[%s]" %(atcmd+'\r\n'))
bytes_written = ser.write((atcmd+'\r\n').encode('utf-8'))
return bytes_written
###******************************************************************************
## 接收数据线程
def threadFunComRecv():
global g_RecvResult, g_RecvData
rxDataTmp = ""
rxData = ""
while(0 == g_ExitFlag):
count = ser.inWaiting()
if(count > 0):
tmp = ser.read(ser.in_waiting)
try:
rxDataTmp = tmp.decode('utf-8')
except:
logging.info(tmp)
else:
rxData = rxData + rxDataTmp
if(ser.in_waiting > 0):
continue
## 以'\n'为结束符将数据存入接收缓存g_RecvResult
if('\n' == rxData[-1]):
g_RecvData = g_RecvData + rxData
rxData = ""
rxDataTmp = ""
## 将接收标志为置位
g_RecvResult = 1
###******************************************************************************
## 按键终止
def signal_Handler(sig, frame):
global g_ExitFlag
g_ExitFlag = 1
logging.info("!!!Exit!!!")
sys.exit(0)
###******************************************************************************
## 定时器超时函数
def timeOut():
global g_TimeOut
g_TimeOut = 1
logging.info("!!!ERROR:Time out!!!\r\n")
###******************************************************************************
## 开启定时器
def start_timer(delay):
timer = threading.Timer(delay, timeOut)
timer.start()
return timer
###******************************************************************************
## 检查接收数据函数
def checkRecvData(strings, timeout):
global g_TimeOut
## 启动定时器,在超时时间timeout内判断是否接收到指定数据strings
timer = start_timer(timeout)
while(1 != g_TimeOut):
if(str(g_RecvData).find(strings) >= 0):
g_TimeOut = 0
timer.cancel()
## 波特率9600时,1024字节数据传输需要1.06s,加2s延时确保在收到指定数据strings后的其他数据都能接收完全
time.sleep(2)
logging.info("<<<<<<g_RecvData:[%s] \r\n" %(g_RecvData))
return 1
if(1 == g_TimeOut):
g_TimeOut = 0
timer.cancel()
time.sleep(2)
logging.info("!!!ERROR: Time out!!!")
return -1
###******************************************************************************
## 主函数
if __name__ == "__main__":
## 将当前本地时间格式化为'%Y%m%d%H%M%S'这样的格式
t = time.strftime('%Y%m%d%H%M%S', time.localtime())
logFileName = "%s-%s.log" %(LOGNamePrefix, t)
log_dir = ".\\log"
# log_dir = "C:\\Users\\Administrator\\Desktop\\TCPRecvTest\\log"
## 初始化log输出
logInit(log_dir, logFileName)
## ctrl+c终止程序
signal.signal(signal.SIGTERM, signal_Handler)
signal.signal(signal.SIGINT, signal_Handler)
## 打开串口
ser,ret = openComPort(ATCOM, ATBaud, 5)
if(False == ret):
exit
else:
logging.info("ATCOM:%s Open Success!" %ATCOM)
## 开启接收线程
trdComRecv = threading.Thread(target=threadFunComRecv)
trdComRecv.start()
## 唤醒模组,模组在睡眠模式中时,第一条命令会唤醒模组,但是无法响应该AT命令,因此先发一条AT命令唤醒模组
ser.write(('AT\r\n').encode('utf-8'))
logging.info("Wake up module!")
time.sleep(1)
## 退出睡眠模式 AT+QSCLK=0
ATCMD = "AT+QSCLK=0"
bytes_written = ATSend(ATCMD)
## 长度加2是因为多了'\r\n'
if(bytes_written == len(ATCMD) + 2):
## 5s内检查是否收到'OK'
ret = checkRecvData("OK", 5)
if(1 == ret):
logging.info(">>>>>>[%s] OK \r\n" %(ATCMD))
else:
logging.info(">>>>>>[%s] ERROR \r\n" %(ATCMD))
logging.info("!!!Exit!!!")
sys.exit(0)
## 等待模组驻网 AT+CEREG?
while(0 == g_Cereg):
ATCMD = "AT+CEREG?"
bytes_written = ATSend(ATCMD)
if(bytes_written == len(ATCMD) + 2):
## 5s内检查是否收到'+CEREG: 0,1'
ret = checkRecvData("+CEREG: 0,1", 5)
if(1 == ret):
logging.info(">>>>>>[%s] OK \r\n" %(ATCMD))
g_Cereg = 1
else:
logging.info(">>>>>>[%s] ERROR \r\n" %(ATCMD))
logging.info("!!!Exit!!!")
sys.exit(0)
## 连接TCP服务器 AT+QIOPEN=0,0,"TCP","112.91.141.248",8000,0,1
ATCMD = "AT+QIOPEN=0,0,\"TCP\",\"112.91.141.248\",8000,0,1"
bytes_written = ATSend(ATCMD)
if(bytes_written == len(ATCMD) + 2):
## 5s内检查是否收到'+QIOPEN: 0'
ret = checkRecvData("+QIOPEN: 0,", 5)
if(1 == ret):
## 模组上电第一次连接TCP服务器时返回+QIOPEN: 0,0
if(str(g_RecvData).find("+QIOPEN: 0,0") >= 0):
logging.info(">>>>>>[%s] OK Firts connection \r\n" %(ATCMD))
## 若该TCP连接已存在则回复+QIOPEN: 0,563
elif(str(g_RecvData).find("+QIOPEN: 0,563") >= 0):
logging.info(">>>>>>[%s] OK Existing connection \r\n" %(ATCMD))
else:
logging.info(">>>>>>[%s] ERROR \r\n" %(ATCMD))
logging.info("!!!Exit!!!")
sys.exit(0)
else:
logging.info(">>>>>>[%s] ERROR \r\n" %(ATCMD))
logging.info("!!!Exit!!!")
sys.exit(0)
## 主线程
while(0 == g_ExitFlag):
time.sleep(5)
## 退出睡眠模式 AT+QISEND=0,1024,"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789A\r\nA456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234B\r\nB9012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901C\r\nC678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123D\r\nD89012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456E\r\nE12345678901234567890123456789012345678901234567890123456789012345678901\r\n"
ATCMD="AT+QISEND=0,1024,\"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789A\r\nA456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234B\r\nB9012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901C\r\nC678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123D\r\nD89012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456E\r\nE12345678901234567890123456789012345678901234567890123456789012345678901\r\n\""
bytes_written = ATSend(ATCMD)
if(bytes_written == len(ATCMD) + 2):
## 5s内检查是否收到'"recv"',表示模组接收到TCP服务器回复的数据
ret = checkRecvData("+QIURC: \"recv\",", 5)
## 记录测试测试、成功次数、失败次数
if(1 == ret):
g_SendSuccCount = g_SendSuccCount + 1
g_SendTotleCount = g_SendTotleCount + 1
logging.info("######SEND SUCC! TOTAL COUNT:%d PASS COUNT:%d FAIL COUNT:%d\r\n" %(g_SendTotleCount, g_SendSuccCount, g_SendFailCount))
else:
g_SendFailCount = g_SendFailCount + 1
g_SendTotleCount = g_SendTotleCount + 1
logging.info("######SEND SUCC! TOTAL COUNT:%d PASS COUNT:%d FAIL COUNT:%d\r\n" %(g_SendTotleCount, g_SendSuccCount, g_SendFailCount))
time.sleep(5)
使用Python脚本测试:
保存的log:
VScode包(包含运行时环境配置)