Boost Asio TCP异步服务端和客户端
服务端
消息分两次发送,第一次发送head,第二次发送body。接收也是先接收head,然后通过head结构中的body长度字段再接收body。
TcpServer.h
#pragma once
#include <atomic>
#include <vector>
#include <unordered_set>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/system/system_error.hpp>
#include "Connection.h"
using namespace boost::asio;
using namespace boost::asio::ip;
using namespace boost::system;
class TcpServer : public Connection::Listener {
public:
using Handler = std::function<void(std::vector<uint8_t>, MessageType)>;
TcpServer(uint16_t port, Handler&& handler);
~TcpServer();
void _startListen();
void _startAccept();
void _handleAccept(const error_code& error, std::shared_ptr<tcp::socket> socket);
virtual void onDataReceived(ConnectionPtr connection, const std::vector<uint8_t>& data, MessageType type);
void send(const char*, int size);
private:
uint16_t m_localPort;
io_service m_oAcceptService;
io_service::work m_oAcceptWork;
tcp::acceptor *m_pAcceptor = nullptr;
std::atomic_bool m_bStop = false;
mutable boost::shared_mutex _connectionMutex;
std::unordered_set<ConnectionPtr> _connections;
Handler m_handler;
};
TcpServer.cpp
#include "TcpServer.h"
#include <boost/asio/buffer.hpp>
#include <fstream>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/asio.hpp>
TcpServer::TcpServer(uint16_t port, Handler&& handler)
: m_oAcceptWork(m_oAcceptService)
, m_localPort(port)
, m_handler(handler)
{
m_pAcceptor = new boost::asio::ip::tcp::acceptor(m_oAcceptService);
_startListen();
_startAccept();
std::thread([&]() {
while (1)
{
m_oAcceptService.run();
}
}).detach();
}
TcpServer::~TcpServer() {
m_bStop = true;
}
void TcpServer::_startListen() {
boost::asio::ip::tcp::endpoint localEndpoint(boost::asio::ip::tcp::v4(), m_localPort);
m_pAcceptor->open(localEndpoint.protocol());
m_pAcceptor->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
boost::asio::ip::tcp::no_delay noDelayOption(false);
m_pAcceptor->set_option(noDelayOption);
boost::system::error_code ec;
boost::system::error_code code = m_pAcceptor->bind(localEndpoint, ec);
if (!ec.value())
{
m_pAcceptor->listen();
}
else
std::cout << (std::string("TcpServer start listen exception: ") + ec.message().c_str()) << std::endl;
}
void TcpServer::_startAccept() {
if (m_bStop)
{
return;
}
auto socket = std::make_shared<boost::asio::ip::tcp::socket>(m_oAcceptService);
m_pAcceptor->async_accept(*socket, boost::bind(&TcpServer::_handleAccept
, this
, boost::asio::placeholders::error, socket));
}
void TcpServer::_handleAccept(const error_code& error, std::shared_ptr<tcp::socket> socket) {
if (!error) {
// read and write.
std::cout << "_handleAccept" << std::endl;
auto connection = std::make_shared<Connection>(std::move(*socket)
, socket->get_io_service()
, this);
boost::unique_lock<boost::shared_mutex> lock(_connectionMutex);
_connections.emplace(connection);
lock.unlock();
connection->start();
}
_startAccept();
}
void TcpServer::onDataReceived(ConnectionPtr connection, const std::vector<uint8_t>& data, MessageType type) {
//connection->send(data);
m_handler(data, type);
}
void TcpServer::send(const char* data, int size)
{
for (auto conn : _connections)
{
conn->send(data, size);
}
}
Connection.h
#pragma once
#define BOOST_ASIO_DISABLE_STD_CHRONO
#include <boost/asio.hpp>
#include <boost/date_time/time_duration.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/mutex.hpp>
#include <atomic>
#include <memory>
#include <list>
#include <future>
#include <boost/asio/steady_timer.hpp>
#include "message.h"
namespace pt = boost::posix_time;
namespace placeholders = boost::asio::placeholders;
using boost::asio::buffer;
using boost::asio::const_buffer;
// Connection的最大作用是保存TcpServer连接的客户端socket,以及单独启动线程或异步进行数据收发
class Connection : public std::enable_shared_from_this<Connection> {
public:
class Listener;
Connection(boost::asio::ip::tcp::socket&& socket
, boost::asio::io_service& oTimerService
, Listener* listener);
~Connection();
void start();
void stop();
void _ranDataReception();
void _handleReadHeader(const boost::system::error_code& error);
void _handleReadData(const boost::system::error_code& error, const std::vector<uint8_t>& body, MessageType type);
void send(const char* data, int size);
void send(const std::vector<uint8_t>& data);
void on_write(const boost::system::error_code & err, size_t bytes);
private:
bool _stopped = false;
boost::asio::ip::tcp::socket _socket;
MessageHeader _header;
Listener* _listener;
};
typedef std::shared_ptr<Connection> ConnectionPtr;
class Connection::Listener {
public:
virtual ~Listener() {}
virtual void onDataReceived(ConnectionPtr connection, const std::vector<uint8_t>& data, MessageType type) = 0;
};
Connection.cpp
#include "Connection.h"
#include <boost/bind.hpp>
#include <functional>
#include <iostream>
Connection::Connection(boost::asio::ip::tcp::socket&& socket, boost::asio::io_service& oTimerService, Listener* listener)
: _socket(std::move(socket))
, _listener(listener)
{
}
Connection::~Connection()
{
}
void Connection::start()
{
_stopped = false;
_ranDataReception();
}
void Connection::stop()
{
_stopped = true;
}
void Connection::_ranDataReception() {
if (!_stopped)
{
memset(&_header, 0, sizeof(MessageHeader));
boost::system::error_code oError;
boost::asio::async_read(_socket, boost::asio::buffer(&_header, sizeof(MessageHeader)),
boost::asio::transfer_exactly(sizeof(MessageHeader)),
boost::bind(&Connection::_handleReadHeader, shared_from_this(), oError));
}
}
void Connection::_handleReadHeader(const boost::system::error_code& error) {
if (!_stopped) {
if (!error) {
MessageType type = _header.type;
int bodyLen = _header.length;
//std::string strBody;
std::vector<uint8_t> strBody;
strBody.resize(bodyLen);
//
boost::system::error_code error;
size_t iReadSize = _socket.read_some(boost::asio::buffer(strBody.data(), bodyLen), error);
while (!error)
{
if (iReadSize < bodyLen)
{
iReadSize += _socket.read_some(boost::asio::buffer(strBody.data() + iReadSize
, bodyLen - iReadSize), error);
}
else
{
break;
}
}
if (!error && iReadSize == bodyLen)
{
_handleReadData(error, strBody, type);
}
else
{
}
}
}
}
void Connection::_handleReadData(const boost::system::error_code& error, const std::vector<uint8_t>& body, MessageType type)
{
//
if (!_stopped)
{
if (!error)
{
_listener->onDataReceived(shared_from_this(), body, type);
_ranDataReception();
}
}
}
void Connection::send(const char* data, int size)
{
boost::system::error_code error;
_socket.async_write_some(boost::asio::buffer(data, size),
boost::bind(&Connection::on_write, this,
boost::placeholders::_1,
boost::placeholders::_2));
}
void Connection::send(const std::vector<uint8_t>& data)
{
boost::system::error_code error;
_socket.async_write_some(boost::asio::buffer(data.data(), data.size()),
boost::bind(&Connection::on_write, this,
boost::placeholders::_1,
boost::placeholders::_2));
}
void Connection::on_write(const boost::system::error_code & err, size_t bytes)
{
}
客户端
Network.h
#pragma once
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
namespace sinftech {
namespace tv {
class Network {
public:
Network(boost::asio::io_service& ioService, const std::string& address, uint16_t port);
~Network();
void start();
void stop();
size_t send(char* data, size_t size);
size_t receive(char* data, size_t size);
private:
bool _running;
boost::asio::ip::tcp::socket _socket;
boost::asio::ip::tcp::endpoint _remoteEndpoint;
};
}//namespace tv
}//namespace sinftech
Network.cpp (windows平台setopt设置超时时间使用整数,Linux平台使用结构体struct timeval)
#include "Network.h"
#include <boost/asio/buffer.hpp>
#include <thread>
namespace sinftech {
namespace tv {
Network::Network(boost::asio::io_service& ioService, const std::string& address, uint16_t port)
: _running(false)
, _socket(ioService)
, _remoteEndpoint(boost::asio::ip::address::from_string(address), port)
{
int timeout = 1000;
int iRet = setsockopt(_socket.native(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(timeout));
if (0 != iRet)
{
printf("Set rcv time out error\n");
}
int iRcvSize = 1024 * 1000;
iRet = setsockopt(_socket.native(), SOL_SOCKET, SO_RCVBUF, (char *)&iRcvSize, sizeof(iRcvSize));
if (0 != iRet)
{
printf("Set rcv buffer size error\n");
}
start();
}
Network::~Network() {
stop();
}
void Network::start() {
_running = true;
}
void Network::stop() {
_running = false;
boost::system::error_code ec;
_socket.close(ec);
}
size_t Network::send(char* data, size_t size) {
size_t bytesSent = 0;
if (_running) {
boost::system::error_code ec;
if (!_socket.is_open()) {
_socket.connect(_remoteEndpoint, ec);
}
if (!ec) {
bytesSent = _socket.write_some(boost::asio::buffer(data, size), ec);
}
if (ec) {
_socket.close(ec);
}
}
return bytesSent;
}
size_t Network::receive(char* data, size_t size) {
size_t bytesRecv = 0;
if (_running) {
boost::system::error_code ec;
if (!_socket.is_open()) {
_socket.connect(_remoteEndpoint, ec);
}
if (!ec) {
bytesRecv = _socket.read_some(boost::asio::buffer(data, size), ec);
}
if (ec) {
_socket.close(ec);
}
}
return bytesRecv;
}
}//namespace tv
}//namespace sinftech
注意,Linux和Windows平台使用setopt设置超时参数的方式是不同的。在Linux上,你可以使用setsockopt来设置套接字选项,包括读取和写入超时。具体的选项是SO_RCVTIMEO和SO_SNDTIMEO。
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
int set_socket_timeout(int sockfd, int timeout_ms) {
struct timeval timeout;
timeout.tv_sec = timeout_ms / 1000;
timeout.tv_usec = (timeout_ms % 1000) * 1000;
// 设置接收超时
if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)) < 0) {
perror("setsockopt failed");
return -1;
}
// 设置发送超时
if (setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0) {
perror("setsockopt failed");
return -1;
}
return 0;
}
在Windows上,setsockopt同样用于设置套接字选项,但超时时间是以毫秒为单位的整数,而不是timeval结构体。你需要使用SO_RCVTIMEO和SO_SNDTIMEO选项,并传递一个DWORD类型的值。
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma comment(lib, "Ws2_32.lib")
int set_socket_timeout(SOCKET sockfd, DWORD timeout_ms) {
// 设置接收超时
if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout_ms, sizeof(timeout_ms)) == SOCKET_ERROR) {
printf("setsockopt failed with error: %ld\n", WSAGetLastError());
return -1;
}
// 设置发送超时
if (setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char*)&timeout_ms, sizeof(timeout_ms)) == SOCKET_ERROR) {
printf("setsockopt failed with error: %ld\n", WSAGetLastError());
return -1;
}
return 0;
}
// 在程序开始时需要初始化Winsock库
int main() {
WSADATA wsaData;
int result = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (result != 0) {
printf("WSAStartup failed: %d\n", result);
return 1;
}
// ... 创建并配置套接字 ...
// 在程序结束前清理Winsock库
WSACleanup();
return 0;
}