asio中的异步accept分析
简介
主要分析linux平台下的,即reactive_socket_service_base
和reactive_socket_service
发起
由basic_socket_acceptor
调用async_accept
,前提是需要调用open
创建socket添加到reactor中。其定义为
// Initiates an asynchronous accept on this acceptor. The call is forwarded to
// the underlying service's async_accept together with this object's
// implementation, the peer socket, the address of peer_endpoint, and the
// handler. The return type is produced by BOOST_ASIO_INITFN_RESULT_TYPE for a
// handler with signature void(boost::system::error_code).
template <typename SocketService, typename AcceptHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(AcceptHandler,
void (boost::system::error_code))
async_accept(basic_socket<protocol_type, SocketService>& peer,
endpoint_type& peer_endpoint, BOOST_ASIO_MOVE_ARG(AcceptHandler) handler)
{
// If you get an error on the following line it means that your handler does
// not meet the documented type requirements for a AcceptHandler.
BOOST_ASIO_ACCEPT_HANDLER_CHECK(AcceptHandler, handler) type_check;
// Delegate to the service. Note that peer_endpoint is passed by pointer here
// even though this function receives it by reference.
return this->get_service().async_accept(this->get_implementation(), peer,
&peer_endpoint, BOOST_ASIO_MOVE_CAST(AcceptHandler)(handler));
}
peer
:为连接后的socket
peer_endpoint
:对端的端点信息(地址和端口)
handler
:连接成功后的处理器
其底层是调用reactive_socket_service
的async_accept
// Service-level asynchronous accept. Wraps the handler in a
// reactive_socket_accept_op and passes that operation to start_accept_op.
//   impl          - implementation data of the listening socket
//   peer          - socket object that will receive the accepted connection
//   peer_endpoint - optional output for the remote endpoint (pointer, may be
//                   null as far as this function is concerned)
//   handler       - completion handler stored inside the operation
template <typename Socket, typename Handler>
void async_accept(implementation_type& impl, Socket& peer,
endpoint_type* peer_endpoint, Handler& handler)
{
// Query the handler's continuation hint before the handler is wrapped.
bool is_continuation =
boost_asio_handler_cont_helpers::is_continuation(handler);
// Allocate and construct an operation to wrap the handler.
// p is initialised with the handler's address, raw storage of sizeof(op)
// obtained through the handler allocation helper, and a null op pointer.
typedef reactive_socket_accept_op<Socket, Protocol, Handler> op;
typename op::ptr p = { boost::asio::detail::addressof(handler),
boost_asio_handler_alloc_helpers::allocate(
sizeof(op), handler), 0 };
// Placement-new the operation into the storage allocated above.
p.p = new (p.v) op(impl.socket_, impl.state_, peer,
impl.protocol_, peer_endpoint, handler);
BOOST_ASIO_HANDLER_CREATION((p.p, "socket", &impl, "async_accept"));
// peer.is_open() is forwarded so that start_accept_op can reject a peer
// socket that is already open.
start_accept_op(impl, p.p, is_continuation, peer.is_open());
// Clear both pointers held by p. Presumably this stops p's destructor from
// releasing an operation that has been handed over - confirm against op::ptr.
p.v = p.p = 0;
}
首先创建reactive_socket_accept_op
,其内存通过handler的分配辅助函数动态分配,如果开启了小内存回收,分配策略是使用thread_info::allocate
,否则是使用::operator new
// Allocation hook taking the requested size plus variadic arguments that are
// ignored here. Returns a pointer to storage of at least `size` bytes.
// Unless BOOST_ASIO_DISABLE_SMALL_BLOCK_RECYCLING is defined, the request is
// served by thread_info::allocate using the top entry of the call stack;
// otherwise it falls back to the global ::operator new.
void* asio_handler_allocate(std::size_t size, ...)
{
#if !defined(BOOST_ASIO_DISABLE_SMALL_BLOCK_RECYCLING)
// Select the io_service implementation and its per-thread info type
// depending on whether the IOCP backend is in use.
# if defined(BOOST_ASIO_HAS_IOCP)
typedef detail::win_iocp_io_service io_service_impl;
typedef detail::win_iocp_thread_info thread_info;
# else // defined(BOOST_ASIO_HAS_IOCP)
typedef detail::task_io_service io_service_impl;
typedef detail::task_io_service_thread_info thread_info;
# endif // defined(BOOST_ASIO_HAS_IOCP)
typedef detail::call_stack<io_service_impl, thread_info> call_stack;
// call_stack::top() supplies the thread_info used for the allocation.
// NOTE(review): it presumably may be null when the caller is not running
// inside the io_service - confirm how thread_info::allocate handles that.
return thread_info::allocate(call_stack::top(), size);
#else // !defined(BOOST_ASIO_DISABLE_SMALL_BLOCK_RECYCLING)
return ::operator new(size);
#endif // !defined(BOOST_ASIO_DISABLE_SMALL_BLOCK_RECYCLING)
}
然后调用reactive_socket_service_base
的start_accept_op
将reactive_socket_accept_op添加到per_descriptor_data
的队列中
// Starts an accept operation on the listening socket described by impl.
//   op              - the reactor operation to run
//   is_continuation - continuation hint taken from the handler
//   peer_is_open    - whether the peer socket that should receive the new
//                     connection is already open
// If the peer is not open the operation is started as a read_op; otherwise it
// completes immediately with error::already_open.
void reactive_socket_service_base::start_accept_op(
reactive_socket_service_base::base_implementation_type& impl,
reactor_op* op, bool is_continuation, bool peer_is_open)
{
if (!peer_is_open)
// NOTE(review): start_op is declared as (impl, op_type, op,
// is_continuation, is_non_blocking, noop). Positionally, this call passes
// `true` as is_continuation and the caller's is_continuation as
// is_non_blocking - confirm whether that ordering is intended.
start_op(impl, reactor::read_op, op, true, is_continuation, false);
else
{
// The peer socket is already in use: report the error through the
// operation's error code and complete without touching the reactor queue.
op->ec_ = boost::asio::error::already_open;
reactor_.post_immediate_completion(op, is_continuation);
}
}
其内部是调用start_op
// Common entry point for starting a reactor operation on a socket.
//   op_type         - reactor operation queue to use (e.g. reactor::read_op)
//   op              - the operation to start
//   is_continuation - forwarded to the reactor / immediate completion
//   is_non_blocking - forwarded to reactor_.start_op as its last argument
//   noop            - when true the reactor is bypassed and the operation is
//                     posted for immediate completion
void reactive_socket_service_base::start_op(
reactive_socket_service_base::base_implementation_type& impl,
int op_type, reactor_op* op, bool is_continuation,
bool is_non_blocking, bool noop)
{
if (!noop)
{
// Use the reactor only if the socket is already flagged non-blocking, or if
// it can be switched to internal non-blocking mode now. op->ec_ is passed
// to set_internal_non_blocking as its error output.
if ((impl.state_ & socket_ops::non_blocking)
|| socket_ops::set_internal_non_blocking(
impl.socket_, impl.state_, true, op->ec_))
{
reactor_.start_op(op_type, impl.socket_,
impl.reactor_data_, op, is_continuation, is_non_blocking);
return;
}
}
// Reached when noop is true or the socket could not be made non-blocking:
// complete the operation right away.
reactor_.post_immediate_completion(op, is_continuation);
}
对于非阻塞socket,会调用reactor的start_op
,以epoll_reactor
为例,添加到descriptor_data的read队列中,等待io的回调
// Starts a reactor operation on a descriptor registered with this epoll
// reactor.
//   op_type           - index of the per-descriptor operation queue
//   descriptor        - the native descriptor, used for epoll_ctl
//   descriptor_data   - per-descriptor state (mutex, queues, event mask)
//   op                - the operation to run or enqueue
//   is_continuation   - forwarded when the operation completes immediately
//   allow_speculative - whether op->perform() may be attempted right away
// The operation either completes immediately (error, shutdown, or successful
// speculative perform) or is pushed onto op_queue_[op_type] and counted as
// outstanding work.
void epoll_reactor::start_op(int op_type, socket_type descriptor,
epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
bool is_continuation, bool allow_speculative)
{
// No per-descriptor data: fail the operation with bad_descriptor.
if (!descriptor_data)
{
op->ec_ = boost::asio::error::bad_descriptor;
post_immediate_completion(op, is_continuation);
return;
}
// Everything below touches the descriptor's queues and event mask under its
// mutex.
mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
// Descriptor is shut down: complete the operation immediately. op->ec_ is
// not modified on this path.
if (descriptor_data->shutdown_)
{
post_immediate_completion(op, is_continuation);
return;
}
// Only when the target queue is empty is a speculative attempt or an epoll
// re-registration considered; otherwise the operation is simply appended.
if (descriptor_data->op_queue_[op_type].empty())
{
// A speculative read is skipped while operations are queued on except_op.
if (allow_speculative
&& (op_type != read_op
|| descriptor_data->op_queue_[except_op].empty()))
{
// Try the operation now; a true result means it is finished.
if (op->perform())
{
descriptor_lock.unlock();
io_service_.post_immediate_completion(op, is_continuation);
return;
}
// For writes, make sure EPOLLOUT is part of the registered event mask.
if (op_type == write_op)
{
if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
{
epoll_event ev = { 0, { 0 } };
ev.events = descriptor_data->registered_events_ | EPOLLOUT;
ev.data.ptr = descriptor_data;
if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
{
descriptor_data->registered_events_ |= ev.events;
}
else
{
// Re-registration failed: report errno through the operation.
op->ec_ = boost::system::error_code(errno,
boost::asio::error::get_system_category());
io_service_.post_immediate_completion(op, is_continuation);
return;
}
}
}
}
else
{
// No speculative attempt: re-issue EPOLL_CTL_MOD with the current event
// mask (plus EPOLLOUT for writes). The epoll_ctl result is not checked on
// this path.
if (op_type == write_op)
{
descriptor_data->registered_events_ |= EPOLLOUT;
}
epoll_event ev = { 0, { 0 } };
ev.events = descriptor_data->registered_events_;
ev.data.ptr = descriptor_data;
epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
}
}
// Enqueue the operation and record it as outstanding work on the io_service.
descriptor_data->op_queue_[op_type].push(op);
io_service_.work_started();
}
回调处理
当listen套接字有连接到来时,会触发reactive_socket_accept_op
的完成操作,其先调用其父类reactive_socket_accept_op_base
的do_perform
方法执行非阻塞accept操作
// Performs the accept for a reactive_socket_accept_op_base.
// `base` is downcast to the accept operation; a non-blocking accept is tried
// on the listening socket and, if it yields a valid socket, that socket is
// assigned to the peer object. Returns the result of non_blocking_accept
// (true when the operation is finished, false when it should be retried).
static bool do_perform(reactor_op* base)
{
reactive_socket_accept_op_base* o(
static_cast<reactive_socket_accept_op_base*>(base));
// Address buffer size is only meaningful when the caller supplied an
// endpoint to fill in; otherwise 0 and null pointers are passed below.
std::size_t addrlen = o->peer_endpoint_ ? o->peer_endpoint_->capacity() : 0;
socket_type new_socket = invalid_socket;
bool result = socket_ops::non_blocking_accept(o->socket_,
o->state_, o->peer_endpoint_ ? o->peer_endpoint_->data() : 0,
o->peer_endpoint_ ? &addrlen : 0, o->ec_, new_socket);
// On success, assign new connection to peer socket object.
if (new_socket != invalid_socket)
{
// Hold the new socket while it is being handed to the peer.
// NOTE(review): socket_holder presumably closes the socket on destruction
// unless release() is called - confirm against its definition.
socket_holder new_socket_holder(new_socket);
// Shrink the endpoint to the address length reported by accept.
if (o->peer_endpoint_)
o->peer_endpoint_->resize(addrlen);
// assign() yields an error code; a false (no-error) result means the peer
// accepted the socket, so the holder gives it up.
if (!o->peer_.assign(o->protocol_, new_socket, o->ec_))
new_socket_holder.release();
}
return result;
}
// Attempts to accept a connection on socket s without blocking.
//   state      - socket state flags consulted for user_set_non_blocking and
//                enable_connection_aborted
//   addr       - passed through to socket_ops::accept
//   addrlen    - passed through to socket_ops::accept
//   ec         - receives the error from socket_ops::accept
//   new_socket - receives the accepted socket, or invalid_socket
// Returns true when the operation is complete (success, or an error that is
// reported to the caller) and false when the caller should try again later.
// The loop only repeats when accept was interrupted by a signal.
bool non_blocking_accept(socket_type s,
state_type state, socket_addr_type* addr, std::size_t* addrlen,
boost::system::error_code& ec, socket_type& new_socket)
{
for (;;)
{
// Accept the waiting connection.
new_socket = socket_ops::accept(s, addr, addrlen, ec);
// Check if operation succeeded.
if (new_socket != invalid_socket)
return true;
// Retry operation if interrupted by signal.
if (ec == boost::asio::error::interrupted)
continue;
// Operation failed.
if (ec == boost::asio::error::would_block
|| ec == boost::asio::error::try_again)
{
// If the user put the socket into non-blocking mode, would_block is
// reported to them as the final result.
if (state & user_set_non_blocking)
return true;
// Fall through to retry operation.
}
else if (ec == boost::asio::error::connection_aborted)
{
// connection_aborted is only reported when the state flag asks for it.
if (state & enable_connection_aborted)
return true;
// Fall through to retry operation.
}
#if defined(EPROTO)
else if (ec.value() == EPROTO)
{
// EPROTO is handled the same way as connection_aborted.
if (state & enable_connection_aborted)
return true;
// Fall through to retry operation.
}
#endif // defined(EPROTO)
else
return true;
// "Retry" here means returning false: this function does not loop again,
// the caller is told the operation has not completed yet.
return false;
}
}
当non_blocking_accept
操作因错误码为would_block或者try_again而未能接受到连接时,此时new_socket
为invalid_socket
,此时peer_的值为
impl.socket_ = invalid_socket;
impl.state_ = 0;
针对新的套接字不会作任何操作,会对状态以及socket的合法性进行判断
if ((impl.state_ & socket_ops::non_blocking)
|| socket_ops::set_internal_non_blocking(
impl.socket_, impl.state_, true, op->ec_))
{
reactor_.start_op(op_type, impl.socket_,
impl.reactor_data_, op, is_continuation, is_non_blocking);
return;
}
对于接受连接成功的,会调用basic_socket
的assign
方法,最终会调用reactive_socket_service
的assign
将新的socket注册到reactor中
// Assigns an existing native socket to impl.
// Delegates to do_assign with the protocol's socket type; only when do_assign
// reports no error is the protocol stored in impl. Returns ec, which
// do_assign also fills in.
boost::system::error_code assign(implementation_type& impl,
const protocol_type& protocol, const native_handle_type& native_socket,
boost::system::error_code& ec)
{
// do_assign returns an error code, so a false value here means success.
if (!do_assign(impl, protocol.type(), native_socket, ec))
impl.protocol_ = protocol;
return ec;
}
// Attaches native_socket to impl and registers it with the reactor.
//   type - socket type (SOCK_STREAM, SOCK_DGRAM, or other), used to pick the
//          initial state flags
//   ec   - set to already_open, to the registration error, or cleared on
//          success; the same value is returned
boost::system::error_code reactive_socket_service_base::do_assign(
reactive_socket_service_base::base_implementation_type& impl, int type,
const reactive_socket_service_base::native_handle_type& native_socket,
boost::system::error_code& ec)
{
// Refuse to overwrite an implementation that already holds an open socket.
if (is_open(impl))
{
ec = boost::asio::error::already_open;
return ec;
}
// Register the descriptor with the reactor; a non-zero return value is
// treated as a system error number.
if (int err = reactor_.register_descriptor(
native_socket, impl.reactor_data_))
{
ec = boost::system::error_code(err,
boost::asio::error::get_system_category());
return ec;
}
// Registration succeeded: record the socket and derive its state flags from
// the socket type.
impl.socket_ = native_socket;
switch (type)
{
case SOCK_STREAM: impl.state_ = socket_ops::stream_oriented; break;
case SOCK_DGRAM: impl.state_ = socket_ops::datagram_oriented; break;
default: impl.state_ = 0; break;
}
// Always add possible_dup for an assigned socket.
// NOTE(review): presumably because the caller may hold duplicates of a
// descriptor created outside this service - confirm.
impl.state_ |= socket_ops::possible_dup;
ec = boost::system::error_code();
return ec;
}