当前位置: 首页 > article >正文

Boost Asio TCP异步服务端和客户端

服务端
消息分两次发送,第一次发送head,第二次发送body。接收也是先接收head,然后通过head结构中的body长度字段再接收body。
TcpServer.h

#pragma once
#include <atomic>
#include <vector>
#include <unordered_set>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/system/system_error.hpp>
#include "Connection.h"

using namespace boost::asio;
using namespace boost::asio::ip;
using namespace boost::system;

// TCP server built on Boost.Asio.
//
// The constructor opens an acceptor on the given port and starts a detached
// thread that runs the io_service. Every accepted socket is wrapped in a
// Connection that is stored in `_connections`; complete messages reported by a
// Connection through onDataReceived() are forwarded to the user-supplied Handler.
class TcpServer : public Connection::Listener {
public:
    // Callback invoked with the body and type of every received message.
    using Handler = std::function<void(std::vector<uint8_t>, MessageType)>;

    // port:    local TCP port to listen on.
    // handler: callback that receives every complete message.
    TcpServer(uint16_t port, Handler&& handler);
    ~TcpServer();

    // Opens, configures and binds the acceptor, then starts listening.
    void _startListen();
    // Arms one asynchronous accept unless the server has been asked to stop.
    void _startAccept();
    // Completion handler of async_accept; registers the new connection and re-arms the accept.
    void _handleAccept(const error_code& error, std::shared_ptr<tcp::socket> socket);

    // Connection::Listener implementation: forwards the message to the Handler.
    void onDataReceived(ConnectionPtr connection, const std::vector<uint8_t>& data, MessageType type) override;

    // Sends `size` bytes starting at `data` to every registered connection.
    void send(const char*, int size);

private:
    uint16_t m_localPort;
    // NOTE: m_oAcceptService must stay declared before m_oAcceptWork, because the
    // work object is constructed from the service and members are initialized in
    // declaration order.
    io_service m_oAcceptService;
    io_service::work m_oAcceptWork;
    // Allocated with `new` in the constructor.
    tcp::acceptor *m_pAcceptor = nullptr;
    // Set by the destructor; checked before a new accept is armed.
    std::atomic_bool m_bStop = false;

    // Guards `_connections`.
    mutable boost::shared_mutex _connectionMutex;
    std::unordered_set<ConnectionPtr> _connections;

    Handler m_handler;
};


TcpServer.cpp

#include "TcpServer.h"
#include <boost/asio/buffer.hpp>
#include <fstream>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/asio.hpp>

// Creates the acceptor, starts listening on `port`, arms the first asynchronous
// accept and launches a detached thread that runs the io_service.
//
// port:    local TCP port to listen on.
// handler: callback that receives every complete message; it is moved into the server.
TcpServer::TcpServer(uint16_t port, Handler&& handler)
    // Initializers are listed in member declaration order, which is the order in
    // which they actually run.
    : m_localPort(port)
    , m_oAcceptWork(m_oAcceptService)
    , m_handler(std::move(handler))
{
    m_pAcceptor = new boost::asio::ip::tcp::acceptor(m_oAcceptService);

    _startListen();
    _startAccept();

    // m_oAcceptWork keeps run() blocked until the service is stopped, so a single
    // call is sufficient. Looping around run() would only busy-spin once the
    // service has been stopped, because run() then returns immediately.
    // NOTE(review): the thread is detached and captures `this`; the server object
    // must outlive it.
    std::thread([this]() {
        m_oAcceptService.run();
    }).detach();
}

// Requests shutdown and releases the acceptor allocated by the constructor.
TcpServer::~TcpServer() {
    m_bStop = true;

    if (m_pAcceptor != nullptr) {
        // Closing aborts the pending async_accept and frees the listening port.
        boost::system::error_code ignored;
        m_pAcceptor->close(ignored);
    }

    // Lets run() return on the I/O thread.
    m_oAcceptService.stop();

    // The acceptor was created with `new` in the constructor and was never freed.
    delete m_pAcceptor;
    m_pAcceptor = nullptr;

    // NOTE(review): the I/O thread is detached in the constructor and therefore
    // cannot be joined here; it may still be running while the members are
    // destroyed. Confirm the shutdown order expected by callers.
}

void TcpServer::_startListen() {
    boost::asio::ip::tcp::endpoint localEndpoint(boost::asio::ip::tcp::v4(), m_localPort);
    m_pAcceptor->open(localEndpoint.protocol());

    m_pAcceptor->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
    boost::asio::ip::tcp::no_delay noDelayOption(false);
    m_pAcceptor->set_option(noDelayOption);

    boost::system::error_code ec;
    boost::system::error_code code = m_pAcceptor->bind(localEndpoint, ec);
    if (!ec.value())
    {
        m_pAcceptor->listen();
    }
    else
        std::cout << (std::string("TcpServer start listen exception: ") + ec.message().c_str()) << std::endl;

}

void TcpServer::_startAccept() {

    if (m_bStop)
    {
        return;
    }

    auto socket = std::make_shared<boost::asio::ip::tcp::socket>(m_oAcceptService);
    m_pAcceptor->async_accept(*socket, boost::bind(&TcpServer::_handleAccept
        , this
        , boost::asio::placeholders::error, socket));

}

void TcpServer::_handleAccept(const error_code& error, std::shared_ptr<tcp::socket> socket) {

    if (!error) {
        // read and write.
        std::cout << "_handleAccept" << std::endl;

        auto connection = std::make_shared<Connection>(std::move(*socket)
            , socket->get_io_service()
            , this);

        boost::unique_lock<boost::shared_mutex> lock(_connectionMutex);
        _connections.emplace(connection);
        lock.unlock();
        connection->start();
    }

    _startAccept();

}

// Connection::Listener callback: forwards a received message to the user handler.
//
// connection: the connection that received the message (currently unused).
// data:       message body.
// type:       message type taken from the header.
void TcpServer::onDataReceived(ConnectionPtr connection, const std::vector<uint8_t>& data, MessageType type) {
    (void)connection;

    // Echoing the data back to the sender is intentionally disabled:
    //connection->send(data);

    // Calling an empty std::function throws std::bad_function_call, which would
    // escape from the I/O handler; skip the call when no handler was supplied.
    if (m_handler) {
        m_handler(data, type);
    }
}

void TcpServer::send(const char* data, int size)
{
    for (auto conn : _connections)
    {
        conn->send(data, size);
    }
}


Connection.h

#pragma once
#define BOOST_ASIO_DISABLE_STD_CHRONO
#include <boost/asio.hpp>
#include <boost/date_time/time_duration.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/mutex.hpp>
#include <atomic>
#include <memory>
#include <list>
#include <future>
#include <boost/asio/steady_timer.hpp>
#include "message.h"

namespace pt = boost::posix_time;
namespace placeholders = boost::asio::placeholders;
using boost::asio::buffer;
using boost::asio::const_buffer;

// Connection's main purpose is to hold the client socket accepted by TcpServer
// and to send and receive data on it, either on its own thread or asynchronously.
class Connection : public std::enable_shared_from_this<Connection> {
public:
    class Listener;

    // socket:        connected client socket; ownership is moved into the Connection.
    // oTimerService: io_service passed in by the creator.
    // listener:      receiver of complete messages; stored as a raw, non-owning pointer.
    Connection(boost::asio::ip::tcp::socket&& socket
        , boost::asio::io_service& oTimerService
        , Listener* listener);
    ~Connection();

    // Clears the stop flag and begins reading messages.
    void start();
    // Sets the stop flag.
    void stop();

    // Starts an asynchronous read of the next message header.
    void _ranDataReception();
    // Completion handler for the header read; reads the body announced by the header.
    void _handleReadHeader(const boost::system::error_code& error);
    // Delivers a complete message to the listener and starts reading the next one.
    void _handleReadData(const boost::system::error_code& error, const std::vector<uint8_t>& body, MessageType type);

    // Starts an asynchronous write of `size` bytes starting at `data`.
    void send(const char* data, int size);
    // Starts an asynchronous write of the contents of `data`.
    void send(const std::vector<uint8_t>& data);
    // Completion handler of the asynchronous writes.
    void on_write(const boost::system::error_code & err, size_t bytes);

private:
    // stop() is a public method while the flag is read by the I/O completion
    // handlers, so the flag is atomic to make that access well defined.
    std::atomic<bool> _stopped{false};
    boost::asio::ip::tcp::socket _socket;
    // Receive buffer for the header of the message currently being read.
    MessageHeader _header;
    Listener* _listener;
};

// Shared-ownership handle to a Connection.
using ConnectionPtr = std::shared_ptr<Connection>;

// Interface through which a Connection reports complete messages.
class Connection::Listener {
public:
    virtual ~Listener() = default;

    // connection: the connection that received the message.
    // data:       message body.
    // type:       message type taken from the header.
    virtual void onDataReceived(ConnectionPtr connection, const std::vector<uint8_t>& data, MessageType type) = 0;
};

Connection.cpp

#include "Connection.h"
#include <boost/bind.hpp>
#include <functional>
#include <iostream>

// Takes ownership of the connected socket and remembers the listener.
//
// socket:        connected client socket, moved into the Connection.
// oTimerService: accepted for interface compatibility; not used by this constructor.
// listener:      receiver of complete messages (non-owning).
Connection::Connection(boost::asio::ip::tcp::socket&& socket, boost::asio::io_service& oTimerService, Listener* listener)
    : _socket(std::move(socket))
    , _listener(listener)
{
    (void)oTimerService;
}

// Nothing to release explicitly; the members clean up after themselves.
Connection::~Connection() = default;

// Clears the stop flag and starts reading the first message header.
void Connection::start()
{
    _stopped = false;
    _ranDataReception();
}

void Connection::stop()
{
    _stopped = true;
}

void Connection::_ranDataReception() {
    if (!_stopped)
    {
        memset(&_header, 0, sizeof(MessageHeader));
        boost::system::error_code oError;
        boost::asio::async_read(_socket, boost::asio::buffer(&_header, sizeof(MessageHeader)),
            boost::asio::transfer_exactly(sizeof(MessageHeader)),
            boost::bind(&Connection::_handleReadHeader, shared_from_this(), oError));
    }
}

// Completion handler for the header read started by _ranDataReception().
//
// error: result of the header read.
//
// Reads the body whose length is announced in `_header` and passes the complete
// message to _handleReadData(). The body is read synchronously, so this handler
// blocks the calling thread until the whole body has arrived or the read fails.
// On any failure the connection is stopped.
void Connection::_handleReadHeader(const boost::system::error_code& error) {

    if (_stopped) {
        return;
    }

    if (error) {
        // Header could not be read (for example the peer disconnected).
        stop();
        return;
    }

    const MessageType type = _header.type;
    const int bodyLen = _header.length;

    // The length comes from the peer; a negative value would turn into an
    // enormous allocation request when converted to an unsigned size.
    // NOTE(review): consider enforcing an upper bound that matches the protocol.
    if (bodyLen < 0) {
        stop();
        return;
    }

    std::vector<uint8_t> body(static_cast<size_t>(bodyLen));

    // read() keeps reading until the buffer is full or an error occurs, which
    // replaces the manual read_some() loop. A zero-length body completes at once.
    boost::system::error_code readError;
    const size_t bytesRead = boost::asio::read(_socket, boost::asio::buffer(body), readError);

    if (readError || bytesRead != body.size()) {
        stop();
        return;
    }

    _handleReadData(readError, body, type);
}

// Hands a complete message to the listener and starts reading the next one.
//
// error: result of the body read; the message is dropped when it is set.
// body:  message body.
// type:  message type taken from the header.
void Connection::_handleReadData(const boost::system::error_code& error, const std::vector<uint8_t>& body, MessageType type)
{
    const bool deliverable = !_stopped && !error;
    if (!deliverable)
    {
        return;
    }

    _listener->onDataReceived(shared_from_this(), body, type);
    _ranDataReception();
}

void Connection::send(const char* data, int size)
{
    boost::system::error_code error;
    _socket.async_write_some(boost::asio::buffer(data, size),
        boost::bind(&Connection::on_write, this,
            boost::placeholders::_1,
            boost::placeholders::_2));
}

void Connection::send(const std::vector<uint8_t>& data)
{
    boost::system::error_code error;
    _socket.async_write_some(boost::asio::buffer(data.data(), data.size()), 
        boost::bind(&Connection::on_write, this, 
        boost::placeholders::_1, 
        boost::placeholders::_2));
}


// Completion handler of the asynchronous writes; currently takes no action.
//
// err:   result of the write.
// bytes: number of bytes written.
void Connection::on_write(const boost::system::error_code & err, size_t bytes)
{
    (void)err;
    (void)bytes;
}

客户端
Network.h

#pragma once
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>

namespace sinftech {
namespace tv {
// Blocking TCP client bound to a single remote endpoint.
class Network {
public:
    // ioService: io_service used to construct the socket.
    // address:   remote IP address in textual form.
    // port:      remote TCP port.
    Network(boost::asio::io_service& ioService, const std::string& address, uint16_t port);
    // Calls stop().
    ~Network();
    // Marks the client as running.
    void start();
    // Marks the client as not running and closes the socket.
    void stop();
    // Connects first if the socket is not open, then writes from `data`.
    // Returns the number of bytes written; 0 when not running or on failure.
    size_t send(char* data, size_t size);
    // Connects first if the socket is not open, then reads into `data`.
    // Returns the number of bytes read; 0 when not running or on failure.
    size_t receive(char* data, size_t size);

private:
    // While false, send() and receive() do nothing.
    bool _running;
    boost::asio::ip::tcp::socket _socket;
    // Endpoint that send() and receive() connect to.
    boost::asio::ip::tcp::endpoint _remoteEndpoint;
};
}//namespace tv
}//namespace sinftech

Network.cpp(注意:用setsockopt设置超时时间时,Windows平台传入的是以毫秒为单位的整数,Linux平台传入的是struct timeval结构体)

#include "Network.h"

#include <cstdio>
#include <thread>

#include <boost/asio/buffer.hpp>

#if !defined(_WIN32)
#include <sys/socket.h>
#include <sys/time.h>
#endif

namespace sinftech {
namespace tv {

Network::Network(boost::asio::io_service& ioService, const std::string& address, uint16_t port)
    : _running(false)
    , _socket(ioService)
    , _remoteEndpoint(boost::asio::ip::address::from_string(address), port) 
{
    int timeout = 1000;
    int iRet = setsockopt(_socket.native(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(timeout));
    if (0 != iRet)
    {
        printf("Set rcv time out error\n");
    }

    int iRcvSize = 1024 * 1000;
    iRet = setsockopt(_socket.native(), SOL_SOCKET, SO_RCVBUF, (char *)&iRcvSize, sizeof(iRcvSize));
    if (0 != iRet)
    {
        printf("Set rcv buffer size error\n");
    }

    start();
}

Network::~Network() {
    stop();
}

void Network::start() {
    _running = true;
}

void Network::stop() {
    _running = false;
    boost::system::error_code ec;
    _socket.close(ec);
}

size_t Network::send(char* data, size_t size) {
    size_t bytesSent = 0;
    if (_running) {
        boost::system::error_code ec;
        if (!_socket.is_open()) {
            _socket.connect(_remoteEndpoint, ec);
        }
        if (!ec) {            
            bytesSent = _socket.write_some(boost::asio::buffer(data, size), ec);
        }
        if (ec) {
            _socket.close(ec);
        }
    }
    return bytesSent;
}

size_t Network::receive(char* data, size_t size) {
    size_t bytesRecv = 0;
    if (_running) {
        boost::system::error_code ec;
        if (!_socket.is_open()) {
            _socket.connect(_remoteEndpoint, ec);
        }
        if (!ec) {            
            bytesRecv = _socket.read_some(boost::asio::buffer(data, size), ec);
        }
        if (ec) {
            _socket.close(ec);
        }
    }

    return bytesRecv;
}

}//namespace tv
}//namespace sinftech

注意,Linux和Windows平台使用setsockopt设置超时参数的方式是不同的。在Linux上,可以使用setsockopt来设置套接字选项,包括读取超时和写入超时,对应的选项分别是SO_RCVTIMEO和SO_SNDTIMEO,超时值通过struct timeval结构体传入。

#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>

// Applies the same timeout to receive and send operations on a socket.
//
// sockfd:     socket descriptor.
// timeout_ms: timeout in milliseconds.
//
// Returns 0 on success. If either option cannot be set, the error is reported
// with perror() and -1 is returned.
int set_socket_timeout(int sockfd, int timeout_ms) {
    struct timeval tv;
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;

    // Receive timeout first, then send timeout.
    const int options[] = { SO_RCVTIMEO, SO_SNDTIMEO };
    const size_t optionCount = sizeof(options) / sizeof(options[0]);

    for (size_t i = 0; i < optionCount; ++i) {
        if (setsockopt(sockfd, SOL_SOCKET, options[i], &tv, sizeof(tv)) < 0) {
            perror("setsockopt failed");
            return -1;
        }
    }

    return 0;
}

在Windows上,setsockopt同样用于设置套接字选项,但超时时间是以毫秒为单位的整数,而不是timeval结构体。你需要使用SO_RCVTIMEO和SO_SNDTIMEO选项,并传递一个DWORD类型的值。

#include <winsock2.h>
#include <ws2tcpip.h>

#pragma comment(lib, "Ws2_32.lib")

// Applies the same timeout to receive and send operations on a Winsock socket.
//
// sockfd:     socket handle.
// timeout_ms: timeout in milliseconds, passed to setsockopt as a DWORD.
//
// Returns 0 on success. If either option cannot be set, the Winsock error code
// is printed and -1 is returned.
int set_socket_timeout(SOCKET sockfd, DWORD timeout_ms) {
    // Set the receive timeout.
    if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms)) == SOCKET_ERROR) {
        // WSAGetLastError() returns int, so the matching conversion is %d.
        printf("setsockopt failed with error: %d\n", WSAGetLastError());
        return -1;
    }

    // Set the send timeout.
    if (setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms)) == SOCKET_ERROR) {
        printf("setsockopt failed with error: %d\n", WSAGetLastError());
        return -1;
    }

    return 0;
}

// 在程序开始时需要初始化Winsock库
int main() {
    WSADATA wsaData;
    int result = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (result != 0) {
        printf("WSAStartup failed: %d\n", result);
        return 1;
    }

    // ... 创建并配置套接字 ...

    // 在程序结束前清理Winsock库
    WSACleanup();
    return 0;
}

http://www.kler.cn/a/510123.html

相关文章:

  • 抖音a_bogus,mstoken全参数爬虫逆向补环境2024-06-15最新版
  • 初学stm32 --- CAN
  • Social LSTM:Human Trajectory Prediction in Crowded Spaces | 文献翻译
  • 力扣解题汇总(简单)_JAVA
  • npm ERR! code CERT_HAS_EXPIRED
  • C#中无法在串口serialPort1_DataReceived启动定时器的解决方法
  • 数据库管理-第285期 Oracle 23ai:深入浅出向量索引(20250117)
  • 2025年编程语言热度分析:Python领跑,Go与Rust崛起
  • 考研计算机组成原理——零基础学习的笔记
  • PHP语言的字符串处理
  • 深度学习实战:使用卷积神经网络(CNN)进行图像分类
  • 使用Pytorch完成图像分类任务
  • 数据分析如何正确使用ChatGPT进行辅助?
  • 《多模态语言模型的局限性与生态系统发展现状分析》
  • Phi小模型开发教程:用C#开发本地部署AI聊天工具,只需CPU,不需要GPU,3G内存就可以运行,不输GPT-3.5
  • 拟合算法 (matlab工具箱)
  • 使用Websocket进行前后端实时通信
  • 彻底理解JVM类加载机制
  • 企业可以通过以下方式利用全星QMS软件提高质量管理的效率和准确性
  • PCL 计算点云的最大距离【2025最新版】
  • 蓝桥杯训练—芯片测试
  • 安装httpd
  • CentOS 7.9下安装Docker
  • WEB渗透技术研究与安全防御
  • 乘联会:1月汽车零售预计175万辆 环比暴跌33.6%
  • 构建安全防线:基于视频AI的煤矿管理系统架构创新成果展示