当前位置: 首页 > article >正文

【RabbitMQ 项目】服务端:连接模块

文章目录

  • 一.概念辨析
    • 1.什么是连接?为什么要自己封装 Connection
  • 二.实现思路
  • 三.代码实践

一.概念辨析

1.什么是连接?为什么要自己封装 Connection

连接就是对 TCP 连接的封装,Connection 和 TcpConnectionPtr 是一一对应的关系。我们之所以要自己封装 Connenction,因为多个信道共用一个 TCP 连接,我们要管理这种从属关系,便于查找和删除信道

二.实现思路

成员变量:

  1. muduo 库中的 TCP 连接
  2. 虚拟机句柄
  3. 协议处理句柄
  4. 消费者管理句柄
  5. 线程池句柄
    以上这 5 个成员都用于初始化信道
  6. 信道管理句柄(每个 Connection 都私有一份)
    成员方法:
  7. 创建信道
  8. 删除信道
  9. 根据信道 id 获取信道

三.代码实践

#pragma once
#include "muduo/protobuf/codec.h"
#include "muduo/protobuf/dispatcher.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/base/CountDownLatch.h"
#include "../common/ThreadPool.hpp"
#include "Channel.hpp"
#include <functional>
#include <iostream>
namespace ns_connection
{
    using ThreadPoolPtr = std::shared_ptr<ns_tp::ThreadPool>;
    using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
    using ProtobufDispatcherPtr = std::shared_ptr<ProtobufDispatcher>;
    using ChannelPtr = std::shared_ptr<ns_channel::Channel>;
    using ThreadPoolPtr = std::shared_ptr<ns_tp::ThreadPool>;
    using CommonResponsePtr = std::shared_ptr<ns_protocol::CommomResponse>;
    using PushMessageResonsePtr = std::shared_ptr<ns_protocol::PushMessageResponse>;

    /***********
     * Connection是对底层用于通信的TCP套接字封装(muduo库中的TcpConnectionPtr)
     * 一个Connection中包含多个信道,当Connection关闭,信道也会销毁
     * ******************/
    class Connection
    {
    private:
        muduo::net::EventLoopThread _loopThread;
        muduo::CountDownLatch _latch;
        muduo::net::TcpClient _client;
        muduo::net::TcpConnectionPtr _connPtr;
        ProtobufDispatcherPtr _distpatcherPtr;
        ProtobufCodecPtr _codecPtr;
        ns_channel::ChannelManager _channelManager;
        ThreadPoolPtr _threadPoolPtr;

    public:
        Connection(const std::string &serverIp, int serverPort, const ThreadPoolPtr &threadPoolPtr)
            : _loopThread(),
              _latch(1),
              _client(_loopThread.startLoop(), muduo::net::InetAddress(serverIp, serverPort), "client"),
              _connPtr(),
              _channelManager(),
              _threadPoolPtr(threadPoolPtr)

        {
            // 构造成员
            _distpatcherPtr = std::make_shared<ProtobufDispatcher>((std::bind(&Connection::onUnknownMessage,
                                                                              this,
                                                                              std::placeholders::_1,
                                                                              std::placeholders::_2,
                                                                              std::placeholders::_3)));
            _codecPtr = std::make_shared<ProtobufCodec>((std::bind(&ProtobufDispatcher::onProtobufMessage,
                                                                   _distpatcherPtr.get(),
                                                                   std::placeholders::_1,
                                                                   std::placeholders::_2,
                                                                   std::placeholders::_3)));

            // 给Client注册两个回调函数
            _client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));
            _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codecPtr.get(), std::placeholders::_1,
                                                 std::placeholders::_2, std::placeholders::_3));

            _distpatcherPtr->registerMessageCallback<ns_protocol::CommomResponse>(std::bind(&Connection::onCommonResponse,
                                                                                            this, std::placeholders::_1,
                                                                                            std::placeholders::_2,
                                                                                            std::placeholders::_3));
            _distpatcherPtr->registerMessageCallback<ns_protocol::PushMessageResponse>(std::bind(&Connection::onRecvMessage,
                                                                                            this, std::placeholders::_1,
                                                                                            std::placeholders::_2,
                                                                                            std::placeholders::_3));

            connect();
        }

        void connect()
        {
            _client.connect(); // 非阻塞
            _latch.wait();
        }

        ChannelPtr openChannel()
        {
            // 只是在本地建立了信道
            auto channelPtr = _channelManager.openChannel(_connPtr, _codecPtr, _threadPoolPtr);
            // 通过该信道发送建立信道的请求,要服务端也建立对应的信道

            if (!channelPtr->openChannel())
            {
                LOG(WARNING) << "打开信道失败" << endl;
                // 关闭本地的信道,防止内存泄漏
                _channelManager.closeChannel(channelPtr->_id);
            }
            
            return channelPtr;
        }

        void closeChannel(const ChannelPtr& channelPtr)
        {
            // 发送关闭信道的请求,让服务端关闭信道
            channelPtr->closeOpenChannel();
            // 把本地信道关掉
            _channelManager.closeChannel(channelPtr->_id);
        }

    private:
        // 给_client设置的回调
        void onConnection(muduo::net::TcpConnectionPtr connPtr)
        {
            if (connPtr->connected())
            {
                _connPtr = connPtr;
                _latch.countDown();
            }
            else
            {
                _connPtr.reset();
            }
        }

        void onUnknownMessage(const muduo::net::TcpConnectionPtr &connPtr,
                              const MessagePtr &resp, muduo::Timestamp time)
        {
            LOG(WARNING) << "未知响应" << endl;
        }

        // 业务处理函数
        void onCommonResponse(const muduo::net::TcpConnectionPtr &connPtr,
                              const CommonResponsePtr &respPtr, muduo::Timestamp time)
        {
            //LOG(DEBUG) << "收到CommonResponse, respId: " << respPtr->response_id() << endl;
            std::string channeId = respPtr->channel_id();
            auto channelPtr = _channelManager.getChannel(channeId);
            channelPtr->putCommonResponse(respPtr);
        }

        void onRecvMessage(const muduo::net::TcpConnectionPtr &connPtr,
                           const PushMessageResonsePtr &respPtr, muduo::Timestamp time)
        {
            //LOG(DEBUG) << "收到消息, body: " << respPtr->msg().saved_info().body() << endl;
            std::string channeId = respPtr->channel_id();
            auto channelPtr = _channelManager.getChannel(channeId);
            // 把处理消息的任务交给线程池来做
            _threadPoolPtr->push(std::bind(&ns_channel::Channel::consumeMessage, channelPtr.get(),
                                           respPtr->qname(), respPtr->msg()));
        }
    };
}

http://www.kler.cn/a/322348.html

相关文章:

  • 基于Java Springboot甘肃旅游管理系统
  • 1+X应急响应(网络)网络流量分析技术:
  • 云原生周刊:Kubernetes v1.32 要来了
  • 【时间之外】IT人求职和创业应知【36】-肖申克的救赎
  • django-ninja 实现cors跨域请求
  • 一文说清:c++标准库
  • redis常用五种数据类型的常用指令
  • 技术干货|热门仿真平台HyperMesh CFD功能详解:几何和网格(Part 1)
  • 每日英语听力 Day3
  • 【玩转贪心算法专题】763. 划分字母区间【中等】
  • 【React】事件绑定
  • 分享开源且强大的HTML5网页视频播放器
  • 一文教你看懂什么是Hadoop
  • Techub专访顾荣辉教授:解密CertiK的安全战略路线
  • 对抗攻击方法详解:梯度攻击、转移攻击与模型集成攻击
  • 用5款AI帮你写论文,只需10分钟(附详细工具)
  • [CKA]CKA简介
  • 【Spring】Spring Aop基础入门
  • QML使用Qt自带软键盘例子
  • 深度学习-图像处理篇1.1-1.2神经网络
  • 【Prometheus】实战二:Prometheus数据监控自定义组件Pushgateway
  • Day101 代码随想录打卡|动态规划篇--- 分割等和子集
  • 帆软通过JavaScript注入sql,实现数据动态查询
  • [Linux#55][网络协议] 序列化与反序列化 | TcpCalculate为例
  • 微信小程序加载H5页面及与H5页面通信的实战教程
  • 代码随想录算法训练营 | 二叉树理论基础