Python采集modBus协议数据
使用python 进行采集 modbus协议的数据
# -*- coding: utf-8 -*-
import logging
from pymodbus.client.sync import ModbusTcpClient
import struct
from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime, timedelta
import time
import json
import paho.mqtt.client as mqtt
import base64
from Crypto.Util.Padding import pad
from Crypto.Cipher import AES
# 配置日志记录
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("./getData.log"),
logging.StreamHandler()
])
# 配置 Modbus 服务器的 IP 地址和端口
MODBUS_SERVER_IP = '192.168.0.1'
MODBUS_SERVER_PORT = 502
DEVICE_ID = 1 # 设备地址
# 配置 InfluxDB 连接信息
FLUXDB_URL = "http://192.168.0.1:8086"
FLUXDB_TOKEN = "abcdabcdabcdbacd=="
FLUXDB_ORG = "testOrg"
FLUXDB_BUCKET = "data_bucket"
# 创建 InfluxDB 客户端
fluxdb_client = InfluxDBClient(url=FLUXDB_URL, token=FLUXDB_TOKEN, org=FLUXDB_ORG)
write_api = fluxdb_client.write_api()
# 寄存器信息列表
registers = [
{"instance_name": "电弧炉水冷炉壁", "name": "电弧炉水冷炉壁冷却水出水温度", "address": 24, "type": "Real", "tag": "电弧炉", "identifier": "abcd123", "attr_id": "24"},
{"instance_name": "电弧炉水冷炉壁", "name": "电弧炉水冷炉壁冷却水进出水流量差", "address": 26, "type": "Real", "tag": "电弧炉", "identifier": "edfr456", "attr_id": "26"},
]
# 数据存储
data_buffer = []
mqtt_data_buffer = []
def hex_string_to_byte(s):
"""将十六进制字符串转换为字节数组"""
return bytes.fromhex(s)
def encrypt_aes(data, key):
"""使用 AES 加密数据"""
try:
# 将十六进制格式的秘钥转换为字节数组
key_bytes = hex_string_to_byte(key)
# 创建 AES 加密器,使用 ECB 模式和 PKCS7 填充
cipher = AES.new(key_bytes, AES.MODE_ECB)
# 将数据转换为字节数组并进行 PKCS7 填充
padded_data = pad(data.encode('utf-8'), AES.block_size)
# 加密数据
encrypted_data = cipher.encrypt(padded_data)
# 将加密后的字节数组转换为 Base64 格式
return base64.b64encode(encrypted_data).decode('utf-8')
except Exception as e:
print(f"加密过程中发生异常: {e}")
return None
# 数据读取与上传
upload_interval = timedelta(seconds=15) # 上传间隔设置为15秒
while True:
try:
# 创建 Modbus TCP 客户端
client = ModbusTcpClient(MODBUS_SERVER_IP, MODBUS_SERVER_PORT)
if not client.connect():
logging.error("无法连接到Modbus服务器")
time.sleep(60) # 等待60秒后重试
continue
# 读取数据
for register in registers:
try:
if register["type"] == "Real":
result = client.read_holding_registers(register["address"], 2, unit=DEVICE_ID)
if not result.isError():
value = struct.unpack('>f', struct.pack('>HH', result.registers[0], result.registers[1]))[0]
# 写入 InfluxDB
point_time = datetime.utcnow() # 获取当前写入时间
point = (Point("monitorData").tag("attr_id", register["attr_id"]).tag("cn_name", register["name"])
.tag("instance_name", register["instance_name"]).tag("tag", register["tag"]).tag("name", register["name"]).tag("eng_name", register["name"])
.field("value", value).tag("id", register["attr_id"])
.time(point_time, WritePrecision.NS))
logging.info("register[type] == Real , register = %s", register)
logging.info("value = %s, type = %s", value, type(value))
write_api.write(bucket=FLUXDB_BUCKET, org=FLUXDB_ORG, record=point)
# 存储值和写入时间
data_buffer.append({"name": register["name"], "value": value, "identifier": register["identifier"], "collectTime": point_time})
elif register["type"] == "Bool":
result = client.read_holding_registers(register["address"], 2, unit=DEVICE_ID)
if not result.isError():
value = struct.unpack('>f', struct.pack('>HH', result.registers[0], result.registers[1]))[0]
value = 0.0 if value else 1.0
# 写入 InfluxDB
point_time = datetime.utcnow() # 获取当前写入时间
point = (Point("monitorData").tag("attr_id", register["attr_id"]).tag("cn_name", register["name"])
.tag("instance_name", register["instance_name"]).tag("tag", register["tag"]).tag("name", register["name"]).tag("eng_name", register["name"])
.field("value", value).tag("id", register["attr_id"])
.time(point_time, WritePrecision.NS))
logging.info("register[type] == Bool , register = %s", register)
logging.info("value = %s, type = %s", value, type(value))
write_api.write(bucket=FLUXDB_BUCKET, org=FLUXDB_ORG, record=point)
# 存储值和写入时间
data_buffer.append(
{"name": register["name"], "value": value, "identifier": register["identifier"],
"collectTime": point_time})
except Exception as e:
logging.error(f"处理寄存器 {register['name']} 时发生异常: {e}")
client.close()
# 每15秒保存到InfluxDB并上传
time.sleep(upload_interval.total_seconds())
except Exception as e:
logging.error(f"主循环发生异常: {e}")
time.sleep(60) # 等待60秒后重试