当前位置: 首页 > article >正文

ACE之ACE_Message_Queue

简介

ACE_Message_Queue是ACE中通用消息队列,其是ACE_Task以及并发中ACE_Activation_Queue的基础

结构

ACE_Message_Queue_Base
ACE_Message_Queue<SYNC, TIME_POLICY>

ACE_Message_Queue_Base

ACE_Message_Queue_Base 是消息队列的抽象类,与其相关的枚举有队列的状态以及高位,低位大小

enum
 {
   DEFAULT_HWM = 16 * 1024,
   DEFAULT_LWM = 16 * 1024,

   ACTIVATED = 1,
   DEACTIVATED = 2,
   PULSED = 3
 };

包含一个表示状态的成员

protected:
  /// <ACTIVATED>, <DEACTIVATED>, or <PULSED>.
  int state_;

相关的抽象方法

virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
                                 ACE_Time_Value *timeout = 0) = 0;
virtual int enqueue_tail (ACE_Message_Block *new_item,
                          ACE_Time_Value *timeout = 0) = 0;
virtual int enqueue (ACE_Message_Block *new_item,
                     ACE_Time_Value *timeout = 0) = 0;
virtual int dequeue_head (ACE_Message_Block *&first_item,
                          ACE_Time_Value *timeout = 0) = 0;
virtual int dequeue (ACE_Message_Block *&first_item,
                     ACE_Time_Value *timeout = 0) = 0;
virtual bool is_full (void) = 0;
virtual bool is_empty (void) = 0;
virtual size_t message_bytes (void) = 0;
virtual size_t message_length (void) = 0;
virtual size_t message_count (void) = 0;
virtual void message_bytes (size_t new_size) = 0;
virtual void message_length (size_t new_length) = 0;
virtual int deactivate (void) = 0;
virtual int activate (void) = 0;
virtual int pulse (void) = 0;
virtual int state (void);
virtual int deactivated (void) = 0;
virtual ACE_Notification_Strategy *notification_strategy (void) = 0;
virtual void notification_strategy (ACE_Notification_Strategy *s) = 0;
virtual void dump (void) const = 0;

队列实现类

ACE_Message_Queue<SYNC, TIME_POLICY>
# ACE_Message_Block *head_
# ACE_Message_Block *tail_
# size_t low_water_mark_
# size_t high_water_mark_
# size_t cur_bytes_
# size_t cur_length_
# size_t cur_count_
# ACE_Notification_Strategy *notification_strategy_
# ACE_SYNCH_MUTEX_T lock_
# ACE_SYNCH_CONDITION_T not_empty_cond_
# ACE_SYNCH_CONDITION_T not_full_cond_
# TIME_POLICY time_policy_
ACE_Dynamic_Message_Queue<SYNC, TIME_POLICY>
# ACE_Message_Block *pending_head_
# ACE_Message_Block *pending_tail_
# ACE_Message_Block *late_head_
# ACE_Message_Block *late_tail_
# ACE_Message_Block *beyond_late_head_
# ACE_Message_Block *beyond_late_tail_
# ACE_Dynamic_Message_Strategy &message_strategy_
ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, SYNC, TIME_POLICY>
ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, SYNC, TIME_POLICY>
ACE_Message_Queue_Vx
ACE_Message_Queue_Base
ACE_Message_Queue_NT

ACE_Message_Queue是队列的实现类模板,支持同步策略和时间策略,通过ACE_Message_Block实现队列
ACE_Dynamic_Message_Queue:其将队列中的消息分为三个部分

  • PENDING:当前时间小于消息的到期时间
  • LATE:当前时间大于消息的到期时间
  • BEYOND_LATE:当前时间大于消息的到期时间并且大于最大超时时间
    在这里插入图片描述

另外其依赖于ACE_Dynamic_Message_Strategy 抽象,支持两个策略,一个是基于最后期限的ACE_Deadline_Message_Strategy,另一个是基于松弛度的ACE_Laxity_Message_Strategy

ACE_Dynamic_Message_Queue
«abstract»
ACE_Dynamic_Message_Strategy
ACE_Deadline_Message_Strategy
ACE_Laxity_Message_Strategy

ACE_Deadline_Message_Strategy计算优先级是与消息的最后期限作比较,ACE_Laxity_Message_Strategy会加上执行时间

void
ACE_Deadline_Message_Strategy::convert_priority (ACE_Time_Value & priority,
                                                 const ACE_Message_Block & mb)
{
  priority -= mb.msg_deadline_time ();
}
void
ACE_Laxity_Message_Strategy::convert_priority (ACE_Time_Value & priority,
                                               const ACE_Message_Block & mb)
{
  priority += mb.msg_execution_time ();
  priority -= mb.msg_deadline_time ();
}

ACE_Message_Queue_Ex:消息类型为类的第一个模板参数ACE_MESSAGE_TYPE
ACE_Message_Queue_Ex_N:在入队列前,将多个ACE_MESSAGE_TYPE拼接成一个由ACE_Message_Block构成的链表再放入队列,要求ACE_MESSAGE_TYPE类有next()方法

template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> ACE_Message_Block *
ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::wrap_with_mbs_i
  (ACE_MESSAGE_TYPE *new_item)
{
  ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::wrap_with_mbs_i");

  // We need to keep a reference to the head of the chain
  ACE_Message_Block *mb_head = 0;

  ACE_NEW_RETURN (mb_head,
                  ACE_Message_Block ((char *) new_item,
                                     sizeof (*new_item),
                                     ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::DEFAULT_PRIORITY),
                  0);

  // mb_tail will point to the last ACE_Message_Block
  ACE_Message_Block *mb_tail = mb_head;

  // Run through rest of the messages and wrap them
  for (ACE_MESSAGE_TYPE *pobj = new_item->next (); pobj; pobj = pobj->next ())
    {
      ACE_Message_Block *mb_temp = 0;
      ACE_NEW_NORETURN (mb_temp,
                        ACE_Message_Block ((char *) pobj,
                                           sizeof (*pobj),
                                           ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::DEFAULT_PRIORITY));
      if (mb_temp == 0)
        {
          mb_head->release ();
          mb_head = 0;
          break;
        }

      mb_tail->next (mb_temp);
      mb_tail = mb_temp;
    }

  return mb_head;
}

队列创建

ACE_Message_Queue_Factory<SYNC, TIME_POLICY>
ACE_Message_Queue
ACE_Dynamic_Message_Queue
ACE_Message_Queue_Vx
ACE_Message_Queue_NT

ACE_Message_Queue_Factory:消息队列工厂模板类

  • create_static_message_queue:创建静态消息队列
  • create_deadline_message_queue:创建最终期限的动态消息队列
  • create_laxity_message_queue:创建松弛度的动态消息队列
  • create_Vx_message_queue:创建vx消息队列
  • create_NT_message_queue:创建nt消息队列

http://www.kler.cn/a/460830.html

相关文章:

  • 大模型 LangChain 开发框架:Runable 与 LCEL 初探
  • LeetCode:106.从中序与后序遍历序列构造二叉树
  • 小波滤波器处理一维信号-附Matlab源代码
  • leetcode 149. 直线上最多的点数
  • Leetcode 3405. Count the Number of Arrays with K Matching Adjacent Elements
  • Java重要面试名词整理(二十):GatewaySkyWalking
  • 《Java核心技术II》抽取子流和组合流
  • 攻破 kioprix level 4 靶机
  • C++语言编程————C++数据类型与表达式
  • 期权懂|国内场外期权都有哪些种类?
  • MybatisPlus查询更so easy
  • 数据结构与算法之动态规划: LeetCode 62. 不同路径 (Ts版)
  • 非常简单实用的前后端分离项目-仓库管理系统(Springboot+Vue)part 5(未实现预期目标)
  • Pytest 高级用法:间接参数化
  • 25考研希望渺茫,工作 VS 二战,怎么选?
  • 2024年RAG:回顾与展望
  • KEGG大更新:开启生物研究新纪元
  • 物联网技术在电商API接口中的应用实践
  • Spring Boot中使用Zookeeper实现分布式锁的案例
  • SQL相关子查询
  • 【数据分析】基于GEE的2000-2023年逐年归一化差值水分指数(NDMI)获取-以成都市为例
  • neo4j图数据库简介
  • AI的未来?华为仓颉编程语言与人工智能的接轨
  • 【网络协议】什么是 BGP? | 解释 BGP 路由
  • 【算法题解】B. President‘s Office - Python实现
  • 如何利用小程序高效获客,小程序引流怎么样