Qt网络编程-ZMQ的使用
不同主机或者相同主机中不同进程之间可以借助网络通信相互进行数据交互,网络通信实现了进程之间的通信。比如两个进程之间需要借助UDP进行单播通信,则双方需要知道对方的IP和端口,假设两者不在同一主机中,如下示意图:
主机1中的进程A想要与主机2中的进程B进行通信,则首先他们之间需要知道对方所在主机的IP地质以及对方所绑定的端口,好比A和B住在同一个小区(同一个局域网内),A派人给B送信则需要制度B所住的楼栋(主机IP地址)以及B的门牌号(绑定的IP地址)。
不同主机之间的不同进程可以借助网络套接字进行通信,同主机内也是如此,关于进程间通信我之前也写过对应博客:
linux消息队列的简单使用_linux消息队列发送消息为0-CSDN博客
QLocalSocket、QLocalServer实现本地进程通信简单实例_qt vs2015 进程间通信-CSDN博客
这里使用TCP或者UDP进行同一主机内通信也是没有问题的,比如同主机内不同进程使用UDP套接字进行通信,两者可以根据都绑定本机IP127.0.0.1,然后各自根据配置文件绑定不同端口,最后通过QUdpSocket进行数据交互。使用QTcp也是类似,只不过各进程需要指定对应的服务器和客户端,本地进程间通信如果自己实现我还是推进使用QUdpSocket,因为UDP面向报文,无需建立链接,除了单播还可以广播、组播,而且因为处于同一主机,所以基本不用考虑整个报文丢失的问题。第三方库可以采用ZMQ进行进程间通信
第三方库ZeroMQ(也称为ZMQ)是一个流行的消息传递库,它提供了简单而高效的消息传递模式,用于在应用程序之间进行通信。ZeroMQ(ZMQ)可以基于多种传输协议进行通信,包括TCP、UDP、in-process(进程内通信)等。它提供了不同的传输协议来满足不同的通信需求。
ZMQ下载:
The ZeroMQ project · GitHub
下载之后直接解压:
ZMQ编译:
环境:Windows,Qt515.2,VS2022
使用cmake打开:
完成之后打开对应生成的目录:
使用VS打开对应工程:
编译对应Debug版和release版的库:
等待编译完成:
同样方法编译生成Debug版:
生成动态库对应位置:
动态导入库位置:
在工程引入,文件结构:
ZMQ通信主要有REP/REQ、PUB/SUB、PUSH/PULL三种类型,本文主要介绍第二种PUB/SUB即发布/订阅模式 。
工程文件中引入ZMQ头文件以及库文件:
#引入libzmq头文件和库
INCLUDEPATH += $PWD/zmq/include
win32 {
LIBS += -L$$PWD/zmq/lib
CONFIG(release, debug|release) {
LIBS += -llibzmq-v143-mt-4_3_6
}
CONFIG(debug, debug|release) {
LIBS += -llibzmq-v143-mt-gd-4_3_6
}
}
发布类ZmqPublisher
头文件:
#ifndef ZMQPUBLISHER_H
#define ZMQPUBLISHER_H
#include <QObject>
class ZmqPublisher : public QObject {
Q_OBJECT
public:
explicit ZmqPublisher(QObject *parent = nullptr);
~ZmqPublisher();
bool bind(quint16 port);
signals:
public slots:
void publishData(const QByteArray &data);
private:
void *context;
void *socket;
};
#endif // ZMQPUBLISHER_H
源文件:
#include "zmqpublisher.h"
#include "zmq/include/zmq.h"
#include <QDebug>
ZmqPublisher::ZmqPublisher(QObject *parent) : QObject{parent} {
context = zmq_ctx_new();
socket = zmq_socket(context, ZMQ_PUB);
}
ZmqPublisher::~ZmqPublisher() {
zmq_close(socket);
zmq_ctx_destroy(context);
}
bool ZmqPublisher::bind(quint16 port) {
QString arg = QString("tcp://*:%1").arg(port);
int rc = zmq_bind(socket, arg.toUtf8().constData());
return rc == 0;
}
void ZmqPublisher::publishData(const QByteArray &data) {
zmq_send(socket, data, data.size(), 0);
}
订阅类ZmqSubscriber
头文件:
#ifndef ZMQSUBSCRIBER_H
#define ZMQSUBSCRIBER_H
#include <QObject>
class ZmqSubscriber : public QObject {
Q_OBJECT
public:
explicit ZmqSubscriber(QObject *parent = nullptr);
~ZmqSubscriber();
bool connectTo(quint16 port);
public slots:
void procesMessage();
signals:
void dataReceived(const QByteArray &data);
private:
void *context;
void *socket;
};
#endif // ZMQSUBSCRIBER_H
源文件:
#include "zmqsubscriber.h"
#include "zmq/include/zmq.h"
ZmqSubscriber::ZmqSubscriber(QObject *parent) : QObject{parent} {
context = zmq_ctx_new();
socket = zmq_socket(context, ZMQ_SUB);
zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
}
ZmqSubscriber::~ZmqSubscriber() {
zmq_close(socket);
zmq_ctx_destroy(context);
}
bool ZmqSubscriber::connectTo(quint16 port) {
QString arg = QString("tcp://localhost:%1").arg(port);
int rc = zmq_connect(socket, arg.toUtf8().constData());
return rc == 0;
}
void ZmqSubscriber::procesMessage() {
while (true) {
zmq_msg_t message;
zmq_msg_init(&message);
zmq_recvmsg(socket, &message, 0);
QByteArray data(static_cast<const char *>(zmq_msg_data(&message)),
zmq_msg_size(&message));
emit dataReceived(data);
zmq_msg_close(&message);
}
}
调用示例:
新建对应发布类以及线程,将发布类移入线程:
QThread *m_PubThread;
QThread *m_SubThread;
ZmqPublisher *m_Publisher;
ZmqSubscriber *m_Subscriber;
使用线程调用:
m_PubThread = new QThread;
m_SubThread = new QThread;
m_Publisher = new ZmqPublisher;
m_Subscriber = new ZmqSubscriber;
m_Publisher->moveToThread(m_PubThread);
m_Subscriber->moveToThread(m_SubThread);
connect(this, &MainWindow::publishData, m_Publisher,
&ZmqPublisher::publishData);
connect(m_Subscriber, &ZmqSubscriber::dataReceived,
[=](const QByteArray &data) {
//TODO 处理zmq接收数据
});
m_PubThread->start();
m_SubThread->start();
QMetaObject::invokeMethod(m_Subscriber, &ZmqSubscriber::procesMessage);
运行效果: