ACE之ACE_Message_Queue
简介
ACE_Message_Queue是ACE中通用消息队列,其是ACE_Task以及并发中ACE_Activation_Queue的基础
结构
ACE_Message_Queue_Base
ACE_Message_Queue_Base
是消息队列的抽象类,与其相关的枚举有队列的状态以及高位,低位大小
enum
{
DEFAULT_HWM = 16 * 1024,
DEFAULT_LWM = 16 * 1024,
ACTIVATED = 1,
DEACTIVATED = 2,
PULSED = 3
};
包含一个表示状态的成员
protected:
/// <ACTIVATED>, <DEACTIVATED>, or <PULSED>.
int state_;
相关的抽象方法
virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0) = 0;
virtual int enqueue_tail (ACE_Message_Block *new_item,
ACE_Time_Value *timeout = 0) = 0;
virtual int enqueue (ACE_Message_Block *new_item,
ACE_Time_Value *timeout = 0) = 0;
virtual int dequeue_head (ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0) = 0;
virtual int dequeue (ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0) = 0;
virtual bool is_full (void) = 0;
virtual bool is_empty (void) = 0;
virtual size_t message_bytes (void) = 0;
virtual size_t message_length (void) = 0;
virtual size_t message_count (void) = 0;
virtual void message_bytes (size_t new_size) = 0;
virtual void message_length (size_t new_length) = 0;
virtual int deactivate (void) = 0;
virtual int activate (void) = 0;
virtual int pulse (void) = 0;
virtual int state (void);
virtual int deactivated (void) = 0;
virtual ACE_Notification_Strategy *notification_strategy (void) = 0;
virtual void notification_strategy (ACE_Notification_Strategy *s) = 0;
virtual void dump (void) const = 0;
队列实现类
ACE_Message_Queue是队列的实现类模板,支持同步策略和时间策略,通过ACE_Message_Block
实现队列
ACE_Dynamic_Message_Queue:其将队列中的消息分为三个部分
- PENDING:当前时间小于消息的到期时间
- LATE:当前时间大于消息的到期时间
- BEYOND_LATE:当前时间大于消息的到期时间并且大于最大超时时间
另外其依赖于ACE_Dynamic_Message_Strategy
抽象,支持两个策略,一个是基于最后期限的ACE_Deadline_Message_Strategy
,另一个是基于松弛度的ACE_Laxity_Message_Strategy
ACE_Deadline_Message_Strategy
计算优先级是与消息的最后期限作比较,ACE_Laxity_Message_Strategy
会加上执行时间
void
ACE_Deadline_Message_Strategy::convert_priority (ACE_Time_Value & priority,
const ACE_Message_Block & mb)
{
priority -= mb.msg_deadline_time ();
}
void
ACE_Laxity_Message_Strategy::convert_priority (ACE_Time_Value & priority,
const ACE_Message_Block & mb)
{
priority += mb.msg_execution_time ();
priority -= mb.msg_deadline_time ();
}
ACE_Message_Queue_Ex:消息类型为类的第一个模板参数ACE_MESSAGE_TYPE
ACE_Message_Queue_Ex_N:在入队列前,将多个ACE_MESSAGE_TYPE拼接成一个由ACE_Message_Block构成的链表再放入队列,要求ACE_MESSAGE_TYPE类有next()方法
template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL, class TIME_POLICY> ACE_Message_Block *
ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::wrap_with_mbs_i
(ACE_MESSAGE_TYPE *new_item)
{
ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::wrap_with_mbs_i");
// We need to keep a reference to the head of the chain
ACE_Message_Block *mb_head = 0;
ACE_NEW_RETURN (mb_head,
ACE_Message_Block ((char *) new_item,
sizeof (*new_item),
ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::DEFAULT_PRIORITY),
0);
// mb_tail will point to the last ACE_Message_Block
ACE_Message_Block *mb_tail = mb_head;
// Run through rest of the messages and wrap them
for (ACE_MESSAGE_TYPE *pobj = new_item->next (); pobj; pobj = pobj->next ())
{
ACE_Message_Block *mb_temp = 0;
ACE_NEW_NORETURN (mb_temp,
ACE_Message_Block ((char *) pobj,
sizeof (*pobj),
ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE, TIME_POLICY>::DEFAULT_PRIORITY));
if (mb_temp == 0)
{
mb_head->release ();
mb_head = 0;
break;
}
mb_tail->next (mb_temp);
mb_tail = mb_temp;
}
return mb_head;
}
队列创建
ACE_Message_Queue_Factory:消息队列工厂模板类
- create_static_message_queue:创建静态消息队列
- create_deadline_message_queue:创建最终期限的动态消息队列
- create_laxity_message_queue:创建松弛度的动态消息队列
- create_Vx_message_queue:创建vx消息队列
- create_NT_message_queue:创建nt消息队列