【C++boost::asio网络编程】有关异步读写api的笔记
异步读写api
- 异步写操作
- async_write_some
- async_send
- 异步读操作
- async_read_some
- async_receive
定义一个Session类,主要是为了服务端专门为客户端服务创建的管理类
class Session {
public:
Session(std::shared_ptr<asio::ip::tcp::socket> socket);
void Connect(const asio::ip::tcp::endpoint& ep);
private:
std::shared_ptr<asio::ip::tcp::socket> _socket;
};
异步写操作
在介绍异步写之前,需要先封装一个Node结构,用来管理发送的数据
class MsgNode
{
friend class Session;
public:
MsgNode(const char* msg, int total_len)
:_total_len(total_len)
,_cur_len(0)
{
_msg = new char[total_len];
memcpy(_msg, msg, total_len);
}
MsgNode(int total_len)
:_total_len(total_len)
,_cur_len(0)
{
_msg = new char[_total_len];
}
~MsgNode()
{
delete[] _msg;
}
private:
char* _msg;
int _total_len;
int _cur_len;
};
其中,_msg
表示要发送的数据,_cur_len
表示已经发送的长度,而_total_len
表示数据的总长度
async_write_some
通过源码可以看出,async_write_some
需要两个参数。第一个参数是buffer
结构的数据,用来放需要发送的数据;第二个参数是一个回调函数,这个回调函数又有两个参数,一个是用来存放错误码的对象,另一个是无符号整数(这个无符号整数代表的就是当前具体发送数据的大小)
当调用完async_write_some之后(即一次异步写操作结束之后),系统会调用这个回调函数。
void Session::WriteCallBackErr(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<MsgNode> node)
{
if (node->_cur_len + bytes_transferred <= node->_total_len)
{
node->_cur_len += bytes_transferred;
this->_socket->async_write_some(boost::asio::buffer(node->_msg + node->_cur_len, node->_total_len - node->_cur_len),
std::bind(&Session::WriteCallBackErr, this, std::placeholders::_1, std::placeholders::_2, _send_node));
}
}
void Session::WriteToSocketErr(const std::string& buf)
{
_send_node = std::make_shared<MsgNode>(buf.c_str(), buf.size());
_socket->async_write_some(boost::asio::buffer(buf.c_str(), buf.size()),
std::bind(&Session::WriteCallBackErr, this, std::placeholders::_1, std::placeholders::_2, _send_node));
}
在以上代码中,先在WriteToSocketErr
函数中创建一个消息结点,然后调用async_write_some
将数据发送出去。当一次写操作结束之后。系统会将错误码和已写入数据的长度作为参数给回调函数。
if (node->_cur_len + bytes_transferred <= node->_total_len)
在回调函数中判断是否已经将数据全部发送出去了,如果没有,则更新_cur_len
,然后继续执行异步发送操作
但是,以上代码逻辑中存在一个漏洞。在异步执行的逻辑中,代码调用的顺序是不确定的。
举个例子,当需要连续两次发送hello world
//连续两次调用
WriteToSocketErr("HelloWorld");
WriteToSocketErr("HelloWorld");
可能会发生第一次进行写入的时候只写入了Hello
,这时按照逻辑需要执行回调函数,当在回调函数中发现数据并没有发送完全,于是再次调用async_write_some
想继续写入World
,但此时第二次调用WriteToSocketErr("HelloWorld");
中,已经提前一步调用了async_write_some
并将数据全部写完,然后才轮到第一次发送时的回调函数将剩下的World
继续发完。这最终导致的结果时对方收到的数据为HelloHelloWorldWorld
.
为了确保发送顺序的问题,可以在Session类中定义一个队列用来管理需要发送的结点和i一个布尔类型变量用来表示当前是否有数据正在被发送(初始化为false)
class Session{
public:
Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket)
:_socket(socket)
,_send_pending(false)
{}
void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
void WriteToSocket(const std::string &buf);
private:
std::queue<std::shared_ptr<MsgNode>> _send_queue;
std::shared_ptr<asio::ip::tcp::socket> _socket;
bool _send_pending;
};
此时再对写操作进行改进
void Session::WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
if (ec.value() != 0)
{
std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
return;
}
std::shared_ptr<MsgNode>&node = _send_queue.front();
node->_cur_len += bytes_transferred;
if (node->_cur_len + bytes_transferred < node->_total_len)//还没有发送完
{
_socket->async_write_some(boost::asio::buffer(node->_msg + node->_cur_len, node->_total_len - bytes_transferred),
std::bind(&WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));
return;
}
_send_queue.pop();
if (_send_queue.empty())
{
_send_pending = false;
}
else
{
std::shared_ptr<MsgNode>& node = _send_queue.front();
_socket->async_write_some(boost::asio::buffer(node->_msg, node->_total_len),
std::bind(&Session::WriteCallBack, std::placeholders::_1, std::placeholders::_2));
}
}
void Session::WriteToSocket(const std::string& buf)
{
_send_queue.push(std::make_shared<MsgNode>(buf.c_str(), buf.size()));
if (_send_pending)//当前有消息正在发
{
return;
}
_socket->async_write_some(boost::asio::buffer(buf.c_str(), buf.size()),
std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));
_send_pending = true;
}
在WriteToSocket
函数中,先不着急将数据立马发送出去,而是将数据节点放入到发送队列中,然后判断当前是否有数据正在发送,如果有就返回避免冲突;没有就直接调用async_write_some
,在回调函数中,永远都是取出队首的结点进行发送,如果判断队首的元素数据已经发送完了就pop
掉,并且检查队列中是否还有需要发送的元素:如果有,继续执行发送逻辑;如果没有就将_send_pending
置为false表示当前已经没有数据正在发送了。
async_send
async_send
的作用是直接将所有数据全部发送完,代码逻辑也比async_write_some
要简单一些
void Session::WriteAllToSocket(const std::string& buf)
{
_send_queue.push(std::make_shared<MsgNode>(buf.c_str(), buf.size()));
if (_send_pending)
{
return;
}
_socket->async_send(boost::asio::buffer(buf.c_str(), buf.size()),
std::bind(&Session::WriteAllCallBck, this, std::placeholders::_1, std::placeholders::_2));
_send_pending = true;
}
void Session::WriteAllCallBck(const boost::system::error_code& ec, std::size_t bytes_tranferred)
{
if (ec.value() != 0)
{
std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
return;
}
_send_queue.pop();
if (_send_queue.empty())
{
_send_pending = false;
}
else
{
std::shared_ptr<MsgNode>& node = _send_queue.front();
_socket->async_send(boost::asio::buffer(node->_msg, node->_total_len),
std::bind(&Session::WriteAllCallBck, this, std::placeholders::_1, std::placeholders::_2));
}
}
注意
async_send
和async_write_some
不要放在一起使用,因为async_send
底层还是多次调用的async_write_some
。如果一起使用,还是会引发数据冲突的问题
异步读操作
为了准备读操作,需要在Session类中添加数据结点_recv_node
和一个布尔变量_recv_pending
class Session
{
public:
Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket)
:_socket(socket)
,_send_pending(false)
,_recv_pending(false)
{}
void Connect(boost::asio::ip::tcp::endpoint& ep);
void WriteCallBackErr(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<MsgNode>);
void WriteToSocketErr(const std::string& buf);
void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
void WriteToSocket(const std::string& buf);
void WriteAllToSocket(const std::string& buf);
void WriteAllCallBck(const boost::system::error_code& ec, std::size_t bytes_tranferred);
void ReadFromSocket();
void ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
void ReadAllFromSocket();
void ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
private:
std::shared_ptr<boost::asio::ip::tcp::socket> _socket;
std::shared_ptr<MsgNode> _send_node;
std::queue<std::shared_ptr<MsgNode>> _send_queue;
std::shared_ptr<MsgNode> _recv_node;
bool _recv_pending;
bool _send_pending;
};
由于接收的数据在TCP缓冲区里面已经是排好序了的,所以并不需要队列来维护顺序
async_read_some
其实异步读和异步写的逻辑类似,这里就不多介绍了
void Session::ReadFromSocket()
{
if (_recv_pending)
{
return;
}
_recv_node = std::make_shared<MsgNode>(RECVSIZE);
_socket->async_read_some(boost::asio::buffer(_recv_node->_msg, _recv_node->_total_len),
std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));
_recv_pending = true;
}
void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
if (ec.value() != 0)
{
std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
return;
}
if (_recv_node->_cur_len + bytes_transferred < _recv_node->_total_len)
{
_recv_node->_cur_len += bytes_transferred;
_socket->async_read_some(boost::asio::buffer(_recv_node->_msg + _recv_node->_cur_len, _recv_node->_total_len - _recv_node->_cur_len),
std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));
return;
}
_recv_pending = false;
}
async_receive
void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
if (ec.value() != 0)
{
std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
return;
}
if (_recv_node->_cur_len + bytes_transferred < _recv_node->_total_len)
{
_recv_node->_cur_len += bytes_transferred;
_socket->async_read_some(boost::asio::buffer(_recv_node->_msg + _recv_node->_cur_len, _recv_node->_total_len - _recv_node->_cur_len),
std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));
return;
}
_recv_pending = false;
}
void Session::ReadAllFromSocket()
{
if (_recv_pending)
{
return;
}
_recv_node = std::make_shared<MsgNode>(RECVSIZE);
_socket->async_receive(boost::asio::buffer(_recv_node->_msg, _recv_node->_total_len),
std::bind(&Session::ReadAllCallBack, this, std::placeholders::_1, std::placeholders::_2));
_recv_pending = true;
}
void Session::ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
if (ec.value() != 0)
{
std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
return;
}
_recv_pending = false;
}