rabbitmq承接MES客户端服务器
文章目录
- 背景
- 整体架构概述
- 方案详细步骤
- 1. 数据库选型与搭建
- 2. 设备端数据上传至数据库
- 3. 搭建 RabbitMQ 服务器
- 4. 数据同步模块(数据库到 RabbitMQ)
- 5. MES 服务器从 RabbitMQ 接收数据
- 6. 指令接收模块(RabbitMQ 到设备端)
- 7. MES 服务器发送指令到 RabbitMQ
- 方案优势
- 注意事项
背景
在非标设备里,经常需要用到mes上传数据给客户服务器,然而不同的客户需求不同,即便是同一个客户,也经常改动接口,随之而来的就需要经常改动设备代码,为了降低设备软件和客户mes服务器之间的强耦合,在非标设备和客户 MES 服务器之间添加 RabbitMQ 中间层,实现生产数据上传和指令接收,同时避免对设备代码进行大幅修改。但是不能完全避免修改设备代码。
整体架构概述
此方案的核心是在非标设备软件和客户 MES 服务器之间引入 RabbitMQ 作为中间层。非标设备将生产数据上传至本地数据库,然后由数据同步模块从数据库中读取数据并发送到 RabbitMQ。客户 MES 服务器从 RabbitMQ 接收数据。同时,MES 服务器可以将启动、暂停等指令发送到 RabbitMQ,设备端的指令接收模块从 RabbitMQ 接收这些指令并执行相应操作。
方案详细步骤
1. 数据库选型与搭建
- 选型:根据设备数据量和性能需求,选择合适的数据库。如果数据量较小且对实时性要求不高,可以选择 SQLite;如果数据量较大且需要高并发处理,建议选择 MySQL 或 PostgreSQL,sqlserver。
- 搭建:安装并配置所选数据库,创建相应的数据库和表来存储生产数据。以下是一个使用 MySQL 存储生产数据的示例 SQL 脚本:
-- 创建数据库
CREATE DATABASE device_data;
-- 使用数据库
USE device_data;
-- 创建产能表
CREATE TABLE production_capacity (
id INT AUTO_INCREMENT PRIMARY KEY,
device_id VARCHAR(50),
capacity INT,
record_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 创建报警信息表
CREATE TABLE alarm_info (
id INT AUTO_INCREMENT PRIMARY KEY,
device_id VARCHAR(50),
alarm_message TEXT,
alarm_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 创建运行状态表
CREATE TABLE running_status (
id INT AUTO_INCREMENT PRIMARY KEY,
device_id VARCHAR(50),
status VARCHAR(20),
status_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
2. 设备端数据上传至数据库
- 修改设备代码:在不进行大幅修改的前提下,在设备代码中添加将生产数据插入数据库的逻辑。以下是一个使用 Python 和 MySQL 数据库的示例代码:
import mysql.connector
# 连接数据库
mydb = mysql.connector.connect(
host="localhost",
user="your_username",
password="your_password",
database="device_data"
)
mycursor = mydb.cursor()
# 模拟设备生产数据
device_id = "device_001"
capacity = 100
alarm_message = "温度过高"
status = "运行中"
# 插入产能数据
sql = "INSERT INTO production_capacity (device_id, capacity) VALUES (%s, %s)"
val = (device_id, capacity)
mycursor.execute(sql, val)
mydb.commit()
# 插入报警信息
sql = "INSERT INTO alarm_info (device_id, alarm_message) VALUES (%s, %s)"
val = (device_id, alarm_message)
mycursor.execute(sql, val)
mydb.commit()
# 插入运行状态数据
sql = "INSERT INTO running_status (device_id, status) VALUES (%s, %s)"
val = (device_id, status)
mycursor.execute(sql, val)
mydb.commit()
mydb.close()
3. 搭建 RabbitMQ 服务器
- 安装:根据操作系统类型,选择合适的方式安装 RabbitMQ 服务器。例如,在 Ubuntu 系统上可以使用以下命令进行安装:
sudo apt-get update
sudo apt-get install rabbitmq-server
- 配置:启动 RabbitMQ 服务器,并进行必要的配置,如创建用户、虚拟主机等。以下是创建一个新用户并授予权限的示例命令:
sudo rabbitmqctl add_user your_username your_password
sudo rabbitmqctl add_vhost your_vhost
sudo rabbitmqctl set_permissions -p your_vhost your_username ".*" ".*" ".*"
4. 数据同步模块(数据库到 RabbitMQ)
- 开发思路:编写一个数据同步脚本,定期从数据库中读取最新的生产数据,并将其发送到 RabbitMQ 的相应队列中。
- 示例代码:以下是一个使用 Python 和
pika
库实现的示例代码:
import mysql.connector
import pika
# 连接数据库
mydb = mysql.connector.connect(
host="localhost",
user="your_username",
password="your_password",
database="device_data"
)
mycursor = mydb.cursor()
# 连接 RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='production_capacity_queue')
channel.queue_declare(queue='alarm_info_queue')
channel.queue_declare(queue='running_status_queue')
# 同步产能数据
mycursor.execute("SELECT * FROM production_capacity WHERE id > (SELECT IFNULL(MAX(id), 0) FROM synced_production_capacity)")
results = mycursor.fetchall()
for row in results:
message = f"Device ID: {row[1]}, Capacity: {row[2]}, Record Time: {row[3]}"
channel.basic_publish(exchange='', routing_key='production_capacity_queue', body=message)
# 标记已同步的数据
sql = "INSERT INTO synced_production_capacity (id) VALUES (%s)"
val = (row[0],)
mycursor.execute(sql, val)
mydb.commit()
# 同步报警信息和运行状态数据的逻辑类似
connection.close()
mydb.close()
5. MES 服务器从 RabbitMQ 接收数据
- 开发思路:在客户 MES 服务器端编写代码,连接到 RabbitMQ 并从相应队列中接收生产数据。
- 示例代码:以下是一个使用 Python 和
pika
库实现的示例代码:
import pika
def callback(ch, method, properties, body):
print(f"Received: {body.decode()}")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='production_capacity_queue')
channel.queue_declare(queue='alarm_info_queue')
channel.queue_declare(queue='running_status_queue')
# 消费产能数据
channel.basic_consume(queue='production_capacity_queue', on_message_callback=callback, auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
6. 指令接收模块(RabbitMQ 到设备端)
- 开发思路:在设备端编写一个指令接收脚本,连接到 RabbitMQ 并从相应队列中接收启动、暂停等指令,然后执行相应操作。
- 示例代码:以下是一个使用 Python 和
pika
库实现的示例代码:
import pika
def callback(ch, method, properties, body):
command = body.decode()
if command == "start":
# 执行启动设备的操作
print("Starting the device...")
elif command == "pause":
# 执行暂停设备的操作
print("Pausing the device...")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明指令队列
channel.queue_declare(queue='device_command_queue')
# 消费指令
channel.basic_consume(queue='device_command_queue', on_message_callback=callback, auto_ack=True)
print('Waiting for commands. To exit press CTRL+C')
channel.start_consuming()
7. MES 服务器发送指令到 RabbitMQ
- 开发思路:在客户 MES 服务器端编写代码,连接到 RabbitMQ 并将启动、暂停等指令发送到相应队列中。
- 示例代码:以下是一个使用 Python 和
pika
库实现的示例代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明指令队列
channel.queue_declare(queue='device_command_queue')
# 发送启动指令
channel.basic_publish(exchange='', routing_key='device_command_queue', body='start')
connection.close()
方案优势
- 降低耦合度:通过引入 RabbitMQ 中间层,将设备端和 MES 服务器解耦,减少了对设备代码的修改。
- 提高可靠性:RabbitMQ 具有消息持久化和重试机制,确保数据传输的可靠性。
- 易于扩展:可以方便地添加新的设备或修改数据格式,而不会影响整个系统的稳定性。
注意事项
-
数据一致性:确保数据库和 RabbitMQ 之间的数据同步一致性,避免数据丢失或重复。
-
性能优化:根据实际需求对数据库和 RabbitMQ 进行性能优化,如调整队列大小、设置合适的并发数等。
-
安全性:对 RabbitMQ 进行安全配置,如设置访问权限、使用 SSL 加密等,确保数据传输的安全性。
-
假如是一条生产线,那么只需要将rabbitmq部署到局域网,专门开发一个软件负责rabbitmq的数据维护,其他设备只需要按照要求,上传需要的数据,然后根据客户要求在合适的地方触发数据上传,比如将数据库某个字段设置成1 , rabbitmq维护软件读取到1后,开始将数据库里的数据进行组合封装,发送给rabbitmq , mes服务器从rabbitmq接收数据,最好采用rabbitmq的订阅模式,以上整体方案已经用python 和C++跑通,暂未正式发布,后续肯定会遇到其他问题,到时再更新