当前位置: 首页 > article >正文

asio中的异步accept分析

简介

主要分析linux平台下的,即reactive_socket_service_basereactive_socket_service

发起

basic_socket_acceptor调用async_accept,前提是需要调用open创建socket添加到reactor中。其定义为

template <typename SocketService, typename AcceptHandler>
  BOOST_ASIO_INITFN_RESULT_TYPE(AcceptHandler,
      void (boost::system::error_code))
  async_accept(basic_socket<protocol_type, SocketService>& peer,
      endpoint_type& peer_endpoint, BOOST_ASIO_MOVE_ARG(AcceptHandler) handler)
  {
    // If you get an error on the following line it means that your handler does
    // not meet the documented type requirements for a AcceptHandler.
    BOOST_ASIO_ACCEPT_HANDLER_CHECK(AcceptHandler, handler) type_check;

    return this->get_service().async_accept(this->get_implementation(), peer,
        &peer_endpoint, BOOST_ASIO_MOVE_CAST(AcceptHandler)(handler));
  }

peer:为连接后的socket
peer_endpoint:对端的端口信息
handler:连接成功后的处理器
其底层是调用reactive_socket_serviceasync_accept

template <typename Socket, typename Handler>
  void async_accept(implementation_type& impl, Socket& peer,
      endpoint_type* peer_endpoint, Handler& handler)
  {
    bool is_continuation =
      boost_asio_handler_cont_helpers::is_continuation(handler);

    // Allocate and construct an operation to wrap the handler.
    typedef reactive_socket_accept_op<Socket, Protocol, Handler> op;
    typename op::ptr p = { boost::asio::detail::addressof(handler),
      boost_asio_handler_alloc_helpers::allocate(
        sizeof(op), handler), 0 };
    p.p = new (p.v) op(impl.socket_, impl.state_, peer,
        impl.protocol_, peer_endpoint, handler);

    BOOST_ASIO_HANDLER_CREATION((p.p, "socket", &impl, "async_accept"));

    start_accept_op(impl, p.p, is_continuation, peer.is_open());
    p.v = p.p = 0;
  }

首先创建reactive_socket_accept_op,其会动态分配内存handler,如果开启了小内存回收,分配策略是使用thread_info::allocate,否则是使用new

void* asio_handler_allocate(std::size_t size, ...)
{
#if !defined(BOOST_ASIO_DISABLE_SMALL_BLOCK_RECYCLING)
# if defined(BOOST_ASIO_HAS_IOCP)
  typedef detail::win_iocp_io_service io_service_impl;
  typedef detail::win_iocp_thread_info thread_info;
# else // defined(BOOST_ASIO_HAS_IOCP)
  typedef detail::task_io_service io_service_impl;
  typedef detail::task_io_service_thread_info thread_info;
# endif // defined(BOOST_ASIO_HAS_IOCP)
  typedef detail::call_stack<io_service_impl, thread_info> call_stack;
  return thread_info::allocate(call_stack::top(), size);
#else // !defined(BOOST_ASIO_DISABLE_SMALL_BLOCK_RECYCLING)
  return ::operator new(size);
#endif // !defined(BOOST_ASIO_DISABLE_SMALL_BLOCK_RECYCLING)
}

然后调用reactive_socket_service_basestart_accept_op将reactive_socket_accept_op添加到per_descriptor_data的队列中

void reactive_socket_service_base::start_accept_op(
    reactive_socket_service_base::base_implementation_type& impl,
    reactor_op* op, bool is_continuation, bool peer_is_open)
{
  if (!peer_is_open)
    start_op(impl, reactor::read_op, op, true, is_continuation, false);
  else
  {
    op->ec_ = boost::asio::error::already_open;
    reactor_.post_immediate_completion(op, is_continuation);
  }
}

其内部是调用start_op

void reactive_socket_service_base::start_op(
    reactive_socket_service_base::base_implementation_type& impl,
    int op_type, reactor_op* op, bool is_continuation,
    bool is_non_blocking, bool noop)
{
  if (!noop)
  {
    if ((impl.state_ & socket_ops::non_blocking)
        || socket_ops::set_internal_non_blocking(
          impl.socket_, impl.state_, true, op->ec_))
    {
      reactor_.start_op(op_type, impl.socket_,
          impl.reactor_data_, op, is_continuation, is_non_blocking);
      return;
    }
  }

  reactor_.post_immediate_completion(op, is_continuation);
}

对于非阻塞socket 是调用 reactor的start_op,以epoll_reactor为例,添加到descriptor_data的read队列中,等待io的回调

void epoll_reactor::start_op(int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
  if (!descriptor_data)
  {
    op->ec_ = boost::asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (descriptor_data->op_queue_[op_type].empty())
  {
    if (allow_speculative
        && (op_type != read_op
          || descriptor_data->op_queue_[except_op].empty()))
    {
      if (op->perform())
      {
        descriptor_lock.unlock();
        io_service_.post_immediate_completion(op, is_continuation);
        return;
      }

      if (op_type == write_op)
      {
        if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
        {
          epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
          {
            descriptor_data->registered_events_ |= ev.events;
          }
          else
          {
            op->ec_ = boost::system::error_code(errno,
                boost::asio::error::get_system_category());
            io_service_.post_immediate_completion(op, is_continuation);
            return;
          }
        }
      }
    }
    else
    {
      if (op_type == write_op)
      {
        descriptor_data->registered_events_ |= EPOLLOUT;
      }

      epoll_event ev = { 0, { 0 } };
      ev.events = descriptor_data->registered_events_;
      ev.data.ptr = descriptor_data;
      epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  io_service_.work_started();
}

回调处理

当listen套接字有连接到来时,会触发reactive_socket_accept_op的完成操作,其先调用其父类reactive_socket_accept_op_basedo_perform方法执行非阻塞accept操作

static bool do_perform(reactor_op* base)
  {
    reactive_socket_accept_op_base* o(
        static_cast<reactive_socket_accept_op_base*>(base));

    std::size_t addrlen = o->peer_endpoint_ ? o->peer_endpoint_->capacity() : 0;
    socket_type new_socket = invalid_socket;
    bool result = socket_ops::non_blocking_accept(o->socket_,
          o->state_, o->peer_endpoint_ ? o->peer_endpoint_->data() : 0,
          o->peer_endpoint_ ? &addrlen : 0, o->ec_, new_socket);

    // On success, assign new connection to peer socket object.
    if (new_socket != invalid_socket)
    {
      socket_holder new_socket_holder(new_socket);
      if (o->peer_endpoint_)
        o->peer_endpoint_->resize(addrlen);
      if (!o->peer_.assign(o->protocol_, new_socket, o->ec_))
        new_socket_holder.release();
    }

    return result;
  }

bool non_blocking_accept(socket_type s,
    state_type state, socket_addr_type* addr, std::size_t* addrlen,
    boost::system::error_code& ec, socket_type& new_socket)
{
  for (;;)
  {
    // Accept the waiting connection.
    new_socket = socket_ops::accept(s, addr, addrlen, ec);

    // Check if operation succeeded.
    if (new_socket != invalid_socket)
      return true;

    // Retry operation if interrupted by signal.
    if (ec == boost::asio::error::interrupted)
      continue;

    // Operation failed.
    if (ec == boost::asio::error::would_block
        || ec == boost::asio::error::try_again)
    {
      if (state & user_set_non_blocking)
        return true;
      // Fall through to retry operation.
    }
    else if (ec == boost::asio::error::connection_aborted)
    {
      if (state & enable_connection_aborted)
        return true;
      // Fall through to retry operation.
    }
#if defined(EPROTO)
    else if (ec.value() == EPROTO)
    {
      if (state & enable_connection_aborted)
        return true;
      // Fall through to retry operation.
    }
#endif // defined(EPROTO)
    else
      return true;

    return false;
  }
}

non_blocking_accept操作由于错误码为would_block或者try_again时,此时new_socket invalid_socket,此时`peer_的值为

impl.socket_ = invalid_socket;
impl.state_ = 0;

针对新的套接字不会作任何操作,会对 状态以及socket合法性判断

if ((impl.state_ & socket_ops::non_blocking)
        || socket_ops::set_internal_non_blocking(
          impl.socket_, impl.state_, true, op->ec_))
    {
      reactor_.start_op(op_type, impl.socket_,
          impl.reactor_data_, op, is_continuation, is_non_blocking);
      return;
    }

对于接受连接成功的,会调用basic_socketassign方法,会最终是调用reactive_socket_serviceassign将新的socket注册到reactor中

boost::system::error_code assign(implementation_type& impl,
      const protocol_type& protocol, const native_handle_type& native_socket,
      boost::system::error_code& ec)
  {
    if (!do_assign(impl, protocol.type(), native_socket, ec))
      impl.protocol_ = protocol;
    return ec;
  }

boost::system::error_code reactive_socket_service_base::do_assign(
    reactive_socket_service_base::base_implementation_type& impl, int type,
    const reactive_socket_service_base::native_handle_type& native_socket,
    boost::system::error_code& ec)
{
  if (is_open(impl))
  {
    ec = boost::asio::error::already_open;
    return ec;
  }

  if (int err = reactor_.register_descriptor(
        native_socket, impl.reactor_data_))
  {
    ec = boost::system::error_code(err,
        boost::asio::error::get_system_category());
    return ec;
  }

  impl.socket_ = native_socket;
  switch (type)
  {
  case SOCK_STREAM: impl.state_ = socket_ops::stream_oriented; break;
  case SOCK_DGRAM: impl.state_ = socket_ops::datagram_oriented; break;
  default: impl.state_ = 0; break;
  }
  impl.state_ |= socket_ops::possible_dup;
  ec = boost::system::error_code();
  return ec;
}

http://www.kler.cn/a/301889.html

相关文章:

  • 记录一次面试中被问到的问题 (HR面)
  • docker+ffmpeg+nginx+rtmp 拉取摄像机视频
  • 数据结构(1~10)
  • Linux pget 下载命令详解
  • 微信小程序map组件所有markers展示在视野范围内
  • 晨辉面试抽签和评分管理系统之一:考生信息管理和编排
  • 如何将 Electron 项目上架 Apple Store
  • 【时时三省】c语言例题----华为机试题<进制转换>
  • Android-10分区存储介绍及百度APP适配实践(1)
  • google vr 入门之制作简易的VR播放器(二)
  • 基于OpenCV与MQTT的停车场车牌识别系统:结合SQLite和Flask的设计流程
  • 基于Qt的自定制WPS
  • 垃圾回收
  • Flutter集成Firebase中的Realtime Analytics
  • 初学者指南:MyBatis 入门教程
  • 【开源免费】基于SpringBoot+Vue.JS房产销售系统(JAVA毕业设计)
  • Redis Key的过期策略
  • 18 Python如何操作文件?
  • docker和docker-compose安装脚本
  • Ajax 揭秘:异步 Web 交互的艺术
  • Transformer学习(1):注意力机制
  • Linux——网络基础Socket编程
  • 10个Python办公自动化案例
  • Unity3D 服务器AStar寻路客户端位置同步显示验证详解
  • C语言学习笔记
  • 运维学习————运维日志分析系统es——Elasticsearch