当前位置: 首页 > article >正文

使用paho.mqtt.cpp库实现ssl/tls加密通信

老规矩,贴链接:
源码:https://github.com/eclipse-paho/paho.mqtt.cpp/blob/master/README.md

直接上代码:

// ssl_publish.cpp
//
// This is a Paho MQTT C++ client, sample application.
//
// It's an example of how to connect to an MQTT broker securely, and then
// send messages as an MQTT publisher using the C++ asynchronous client
// interface.
//
// The sample demonstrates:
//  - Connecting to an MQTT server/broker securely
//  - Setting SSL/TLS options
//  - Last will and testament
//  - Publishing messages
//  - Using asynchronous tokens
//  - Implementing callbacks and action listeners
//
// We can test this using mosquitto configured with certificates in the
// Paho C library. The C library has an SSL/TSL test suite, and we can use
// that to test:
//     $ cd paho.mqtt.c
//     $ mosquitto -c test/tls-testing/mosquitto.conf
//
// Then use the files "test-root-ca.crt" and "client.pem" from the
// test/ssl directory (paho.mqtt.c/test/ssl) for the trust store and
// key_store, respectively, for this program.
//

/*******************************************************************************
 * Author luozesong
 * email zesong.luo@seres.cn
 * phone 17815029213
 *******************************************************************************/

#include <iostream>
#include <fstream>
#include <cstdlib>
#include <string>
#include <chrono>
#include <cstring>
#include "async_client.h"
#include "toml.hpp"
//#include "comlog/comlog.h"

//const std::string DFLT_SERVER_ADDRESS	{ "mqtts://localhost:18884" };
const std::string DFLT_SERVER_ADDRESS	{ "mqtts://127.0.0.1:8883" };
const std::string DFLT_CLIENT_ID		{ "ssl_publish_cpp" };

const std::string KEY_STORE				{ "/home/workspace/tmp/openssl_ca/emqx.pem" };
const std::string TRUST_STORE			{ "/home/workspace/tmp/openssl_ca/ca.pem" };

const std::string LWT_TOPIC				{ "events/disconnect" };
const std::string LWT_PAYLOAD			{ "Last will and testament." };

const auto TIMEOUT = std::chrono::seconds(10);

/

/**
 * A callback class for use with the main MQTT client.
 */
class callback : public virtual mqtt::callback
{
public:
	void connection_lost(const std::string& cause) override {
		std::cout << "\nConnection lost" << std::endl;
		if (!cause.empty())
			std::cout << "\tcause: " << cause << std::endl;
	}

	void delivery_complete(mqtt::delivery_token_ptr tok) override {
		std::cout << "\tDelivery complete for token: "
			<< (tok ? tok->get_message_id() : -1) << std::endl;
	}
};

/

using namespace std;

int main()
{
    // conf init
    const auto conf = toml::parse("./cfg/mqtt_pub.toml");
    std::string clientID = conf.at("module").at("name").as_string();
    
    //const auto servers = toml::find<std::map<std::string, std::map<std::string, std::string>>>(conf, "servers");
    std::string address = conf.at("servers").at("ccu_broker").at("addr").as_string();
    std::string ca_crt = conf.at("servers").at("ccu_broker").at("ca_crt").as_string();
    std::string client_crt = conf.at("servers").at("ccu_broker").at("client_crt").as_string();
    std::string client_key = conf.at("servers").at("ccu_broker").at("client_key").as_string();
	cout << ca_crt << ", " << client_crt << ", " << client_key << endl;
    int timeout = conf.at("servers").at("ccu_broker").at("timeout").as_integer();
    const auto TIMEOUT = std::chrono::seconds(timeout);

    int mqtt_qos = conf.at("mqtt").at("qos").as_integer();
    std::string mqtt_topic = conf.at("mqtt").at("topic").as_string();
    
	cout << "Initializing for server '" << address << "'..." << endl;
	mqtt::async_client client(address, clientID);

	callback cb;
	client.set_callback(cb);

	// Build the connect options, including SSL and a LWT message.

	auto sslopts = mqtt::ssl_options_builder()
					   .trust_store(ca_crt)
					   .key_store(client_crt)
					   .private_key(client_key)
					   .error_handler([](const std::string& msg) {
						   std::cerr << "SSL Error: " << msg << std::endl;
					   })
					   .finalize();

	auto willmsg = mqtt::message(LWT_TOPIC, LWT_PAYLOAD, mqtt_qos, true);

	auto connopts = mqtt::connect_options_builder()
					    .will(std::move(willmsg))
						.ssl(std::move(sslopts))
						.finalize();

	cout << "  ...OK" << endl;

	try {
		// Connect using SSL/TLS

		cout << "\nConnecting..." << endl;
		mqtt::token_ptr conntok = client.connect(connopts);
		cout << "Waiting for the connection..." << endl;
		conntok->wait();
		cout << "  ...OK" << endl;

		// Send a message

		cout << "\nSending message..." << endl;
		auto msg = mqtt::make_message("hello", "Hello secure C++ world!", mqtt_qos, false);
		client.publish(msg)->wait_for(TIMEOUT);
		cout << "  ...OK" << endl;

		// Disconnect

		cout << "\nDisconnecting..." << endl;
		client.disconnect()->wait();
		cout << "  ...OK" << endl;
	}
	catch (const mqtt::exception& exc) {
		cerr << exc.what() << endl;
		return 1;
	}

 	return 0;
}


// sync_consume.cpp
//
// This is a Paho MQTT C++ client, sample application.
//
// This application is an MQTT consumer/subscriber using the C++ synchronous
// client interface, which uses the queuing API to receive messages.
//
// The sample demonstrates:
//  - Connecting to an MQTT server/broker
//  - Using a persistent (non-clean) session
//  - Subscribing to multiple topics
//  - Receiving messages through the queueing consumer API
//  - Receiving and acting upon commands via MQTT topics
//  - Auto reconnect
//  - Updating auto-reconnect data
//

/*******************************************************************************
* @author luozesong
* @email zesong.luo@seres.cn
* @phone 17815029213
 *******************************************************************************/

#include <iostream>
#include <cstdlib>
#include <string>
#include <cstring>
#include <cctype>
#include <thread>
#include <chrono>
#include "mqtt/client.h"

using namespace std;
using namespace std::chrono;

const string SERVER_ADDRESS	{ "mqtts://localhost:8883" };
const string CLIENT_ID		{ "paho_cpp_sync_consume" };
const std::string KEY_STORE				{ "/home/workspace/tmp/openssl_ca/client.pem" };
const std::string TRUST_STORE			{ "/home/workspace/tmp/openssl_ca/ca.pem" };


/

int main(int argc, char* argv[])
{
	mqtt::client cli(SERVER_ADDRESS, CLIENT_ID);

	auto sslopts = mqtt::ssl_options_builder()
					   .trust_store(TRUST_STORE)
					   .key_store(KEY_STORE)
					   .private_key("/home/lzs/workspace/tmp/openssl_ca/client.key")
					   .error_handler([](const std::string& msg) {
						   std::cerr << "SSL Error: " << msg << std::endl;
					   })
					   .finalize();

	auto connOpts = mqtt::connect_options_builder()
		.user_name("user")
		.password("passwd")
		.keep_alive_interval(seconds(30))
		.automatic_reconnect(seconds(2), seconds(30))
		.clean_session(false)
		.ssl(std::move(sslopts))
		.finalize();

	// You can install a callback to change some connection data
	// on auto reconnect attempts. To make a change, update the
	// `connect_data` and return 'true'.
	cli.set_update_connection_handler(
		[](mqtt::connect_data& connData) {
			string newUserName { "newuser" };
			if (connData.get_user_name() == newUserName)
				return false;

			cout << "Previous user: '" << connData.get_user_name()
				<< "'" << endl;
			connData.set_user_name(newUserName);
			cout << "New user name: '" << connData.get_user_name()
				<< "'" << endl;
			return true;
		}
	);

	const vector<string> TOPICS { "data/#", "command", "hello" };
	const vector<int> QOS { 0, 1, 2 };

	try {
		cout << "Connecting to the MQTT server..." << flush;
		mqtt::connect_response rsp = cli.connect(connOpts);
		cout << "OK\n" << endl;

		if (!rsp.is_session_present()) {
			std::cout << "Subscribing to topics..." << std::flush;
			cli.subscribe(TOPICS, QOS);
			std::cout << "OK" << std::endl;
		}
		else {
			cli.subscribe(TOPICS, QOS);
			cout << "Session already present. Skipping subscribe." << std::endl;
		}

		// Consume messages

		while (true) {
			auto msg = cli.consume_message();

			if (msg) {
				if (msg->get_topic() == "command" &&
						msg->to_string() == "exit") {
					cout << "Exit command received" << endl;
					break;
				} 
				cout << msg->get_topic() << ": " << msg->to_string() << endl;
			}
			else if (!cli.is_connected()) {
				cout << "Lost connection" << endl;
				while (!cli.is_connected()) {
					this_thread::sleep_for(milliseconds(250));
				}
				cout << "Re-established connection" << endl;
			}
		}

		// Disconnect

		cout << "\nDisconnecting from the MQTT server..." << flush;
		cli.disconnect();
		cout << "OK" << endl;
	}
	catch (const mqtt::exception& exc) {
		cerr << exc.what() << endl;
		return 1;
	}

 	return 0;
}


需要注意的是,如果我们按emqx配置ssl,tls双向验证生成的ca,client端的证书和client端密钥,这三都得带上,否则验证不通过 官网案例需要再加上.private_key(client_key)

另外,suber的写法里面,如果你想再emqx的dashboard中看到连接,需在官网案例中恒定加上cli.subscribe(TOPICS, QOS);如下
if (!rsp.is_session_present()) {
std::cout << “Subscribing to topics…” << std::flush;
cli.subscribe(TOPICS, QOS);
std::cout << “OK” << std::endl;
}
else {
cli.subscribe(TOPICS, QOS);
cout << “Session already present. Skipping subscribe.” << std::endl;
}


http://www.kler.cn/a/429863.html

相关文章:

  • 下载导出Tomcat上的excle文档,浏览器上显示下载
  • 机器学习笔记合集
  • NanoLog起步笔记-6-StaticLogInfo
  • 攻防世界 - Web - Level 1 | file_include
  • springboot的restTemplate发起get请求参数到服务端无法被解析,curl或postman可以正常调用的url。
  • 【JavaWeb后端学习笔记】Spring事务管理
  • ISO45001职业健康安全管理体系涵盖了丰富的内容
  • 【计网笔记】习题
  • 调度系统:使用 Apache Airflow 管理和调度 Couchbase SQL 脚本的实际例子
  • C++ 游戏开发与小程序:跨平台开发与智能化游戏体验的结合
  • SpringBoot | 拦截器 | 统一数据返回格式 | 统一异常处理 | 适配器模式
  • 链式设计模式:装饰模式,职责链模式
  • 一根网线如何用软路由给手机、电脑分配设置不同IP
  • 从watch、watchEffect、useEffect原理到vue、react响应原理
  • keepalived 各模式设置
  • 实时数据开发|Flink状态计算 有状态VS无状态,区别和优劣
  • NanoLog起步笔记-7-log解压过程初探
  • 什么是反向代理?作用、原理和实例详解
  • 反向代理-缓存篇
  • ubuntu22.04 使用可以用的镜像源获取你要的镜像
  • 数据结构与算法学习笔记----树与图的深度优先遍历
  • MACOS M1/M2芯片 Homebrew 安装教程