当前位置: 首页 > article >正文

Python采集modBus协议数据

使用python 进行采集 modbus协议的数据

# -*- coding: utf-8 -*-
import logging
from pymodbus.client.sync import ModbusTcpClient
import struct
from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime, timedelta
import time
import json
import paho.mqtt.client as mqtt
import base64
from Crypto.Util.Padding import pad
from Crypto.Cipher import AES

# 配置日志记录
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    handlers=[
                        logging.FileHandler("./getData.log"),
                        logging.StreamHandler()
                    ])

# 配置 Modbus 服务器的 IP 地址和端口
MODBUS_SERVER_IP = '192.168.0.1'
MODBUS_SERVER_PORT = 502
DEVICE_ID = 1  # 设备地址

# 配置 InfluxDB 连接信息
FLUXDB_URL = "http://192.168.0.1:8086"
FLUXDB_TOKEN = "abcdabcdabcdbacd=="
FLUXDB_ORG = "testOrg"
FLUXDB_BUCKET = "data_bucket"

# 创建 InfluxDB 客户端
fluxdb_client = InfluxDBClient(url=FLUXDB_URL, token=FLUXDB_TOKEN, org=FLUXDB_ORG)
write_api = fluxdb_client.write_api()

# 寄存器信息列表
registers = [
    {"instance_name": "电弧炉水冷炉壁", "name": "电弧炉水冷炉壁冷却水出水温度", "address": 24, "type": "Real", "tag": "电弧炉", "identifier": "abcd123", "attr_id": "24"},
    {"instance_name": "电弧炉水冷炉壁", "name": "电弧炉水冷炉壁冷却水进出水流量差", "address": 26, "type": "Real", "tag": "电弧炉", "identifier": "edfr456", "attr_id": "26"},
]

# 数据存储
data_buffer = []
mqtt_data_buffer = []

def hex_string_to_byte(s):
    """将十六进制字符串转换为字节数组"""
    return bytes.fromhex(s)

def encrypt_aes(data, key):
    """使用 AES 加密数据"""
    try:
        # 将十六进制格式的秘钥转换为字节数组
        key_bytes = hex_string_to_byte(key)
        # 创建 AES 加密器,使用 ECB 模式和 PKCS7 填充
        cipher = AES.new(key_bytes, AES.MODE_ECB)
        # 将数据转换为字节数组并进行 PKCS7 填充
        padded_data = pad(data.encode('utf-8'), AES.block_size)
        # 加密数据
        encrypted_data = cipher.encrypt(padded_data)
        # 将加密后的字节数组转换为 Base64 格式
        return base64.b64encode(encrypted_data).decode('utf-8')
    except Exception as e:
        print(f"加密过程中发生异常: {e}")
        return None

# 数据读取与上传
upload_interval = timedelta(seconds=15)  # 上传间隔设置为15while True:
    try:
        # 创建 Modbus TCP 客户端
        client = ModbusTcpClient(MODBUS_SERVER_IP, MODBUS_SERVER_PORT)
        if not client.connect():
            logging.error("无法连接到Modbus服务器")
            time.sleep(60)  # 等待60秒后重试
            continue

        # 读取数据
        for register in registers:
            try:
                if register["type"] == "Real":
                    result = client.read_holding_registers(register["address"], 2, unit=DEVICE_ID)
                    if not result.isError():
                        value = struct.unpack('>f', struct.pack('>HH', result.registers[0], result.registers[1]))[0]
                        # 写入 InfluxDB
                        point_time = datetime.utcnow()  # 获取当前写入时间
                        point = (Point("monitorData").tag("attr_id", register["attr_id"]).tag("cn_name", register["name"])
                                 .tag("instance_name", register["instance_name"]).tag("tag", register["tag"]).tag("name", register["name"]).tag("eng_name", register["name"])
                                 .field("value", value).tag("id", register["attr_id"])
                                 .time(point_time, WritePrecision.NS))
                        logging.info("register[type] == Real , register = %s", register)
                        logging.info("value = %s, type = %s", value, type(value))

                        write_api.write(bucket=FLUXDB_BUCKET, org=FLUXDB_ORG, record=point)
                        # 存储值和写入时间
                        data_buffer.append({"name": register["name"], "value": value, "identifier": register["identifier"], "collectTime": point_time})

                elif register["type"] == "Bool":
                    result = client.read_holding_registers(register["address"], 2, unit=DEVICE_ID)
                    if not result.isError():
                        value = struct.unpack('>f', struct.pack('>HH', result.registers[0], result.registers[1]))[0]
                        value = 0.0 if value else 1.0
                        # 写入 InfluxDB
                        point_time = datetime.utcnow()  # 获取当前写入时间
                        point = (Point("monitorData").tag("attr_id", register["attr_id"]).tag("cn_name", register["name"])
                                 .tag("instance_name", register["instance_name"]).tag("tag", register["tag"]).tag("name", register["name"]).tag("eng_name", register["name"])
                                 .field("value", value).tag("id", register["attr_id"])
                                 .time(point_time, WritePrecision.NS))
                        logging.info("register[type] == Bool , register = %s", register)
                        logging.info("value = %s, type = %s", value, type(value))
                        write_api.write(bucket=FLUXDB_BUCKET, org=FLUXDB_ORG, record=point)
                        # 存储值和写入时间
                        data_buffer.append(
                            {"name": register["name"], "value": value, "identifier": register["identifier"],
                             "collectTime": point_time})

            except Exception as e:
                logging.error(f"处理寄存器 {register['name']} 时发生异常: {e}")

        client.close()

        # 每15秒保存到InfluxDB并上传
        time.sleep(upload_interval.total_seconds())


    except Exception as e:
        logging.error(f"主循环发生异常: {e}")
        time.sleep(60)  # 等待60秒后重试


http://www.kler.cn/a/512193.html

相关文章:

  • 【环境搭建】Metersphere v2.x 容器部署教程踩坑总结
  • Python文本处理:LDA主题聚类模型
  • MySQL 很重要的库 - 信息字典
  • JavaFx + SpringBoot 快速开始脚手架
  • python助力WRF自动化运行
  • C++的auto_ptr智能指针:从诞生到被弃用的历程
  • Linux网络IOv1.1介绍-纯PDF版
  • MySQL 中单独获取已知日期的年月日
  • 直驱式风电储能制氢仿真模型matlab/simulink
  • Type-C充电与智能家居的结合
  • 【王树森搜索引擎技术】概要01:搜索引擎的基本概念
  • MySQL 事务及MVCC机制详解
  • TypeScript - 利用GPT辅助学习
  • SparkSQL数据模型综合实践
  • 电路研究9——GPRS用的AT命令手册
  • Javascript IndexedDB 数据库
  • Golang学习笔记_28——工厂方法模式(实例)
  • 【开源免费】基于SpringBoot+Vue.JS夕阳红公寓管理系统(JAVA毕业设计)
  • 告别手动编辑:如何用Python快速创建Ansible hosts文件?
  • MyBatis与Hibernate的全面对比
  • 软件测试 —— Postman(断言)
  • Windows FileZila Server共享电脑文件夹 映射21端口外网连接
  • centos设置开机自启的几种方案(frp为例)
  • leetcode——无重复字符的最长字串(java)
  • 网站HTTP改成HTTPS
  • 一种简单又强势的Js-Forward脚本编写方式