Linux 内核学习(5) --- Linux 内核底半部机制
目录
- 中断底半部
- 软中断
- tasklet
- 工作队列
- 使用工作队列
中断底半部
当产生一个中断时,会进入中断处理程序,但中断处理程序必须快速、异步、简单的对硬件做出迅速响应并完成那些时间要求很严格的操作,因此,对于那些其他的、对时间要求相对宽松的任务,就应该推后到中断被激活以后再去运行
这样,整个中断处理流程就被分为了两个部分:
第一个部分是中断处理程序上半部(top half
),内核通过对它的异步执行完成对硬件中断的即时响应(完成清楚中断标志等操作)
下半部(bottom half
) 下半部的任务主要是执行与中断相关的工作,这些工作没有被中断服务程序本身完成
下半部并不需要指明一个确切时间,只要把这些任务推迟一点,让它们在系统不太繁忙并且中断恢复后执行就可以了
上半部和下半部的主要区别:
- 上半部指的是中断处理程序,下半部则指的是一些虽然与中断有相关性但是可以延后执行的任务
- 上半部中断不能被相同类型的中断打断,而下半部依然可以被中断打断
- 通常下半部在中断处理程序一返回就会马上运行
- 上半部分简单快速,执行的时候禁止一些或者全部中断,下半部分稍后执行,而且执行期间可以响应所有的中断
linux
内核中,对中断下半部实现的方式有下面几种:
tasklet
- 软中断
- 工作队列
软中断
softirq
即软中断,代码位于 kernel/softirq.c
文件中, 每个软中断的处理函数用 softirq_action
表示:
// 软中断处理函数
struct softirq_action
{
void (*action)(struct softirq_action *);
};
// 软中断处理向量表
static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;
enum
{
HI_SOFTIRQ=0,
TIMER_SOFTIRQ,
NET_TX_SOFTIRQ,
NET_RX_SOFTIRQ,
BLOCK_SOFTIRQ,
IRQ_POLL_SOFTIRQ,
TASKLET_SOFTIRQ,
SCHED_SOFTIRQ,
HRTIMER_SOFTIRQ, /* Unused, but kept as tools rely on the
numbering. Sigh! */
RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq */
NR_SOFTIRQS
};
数组的成员数由 NR_SOFTIRQS
决定,是一个枚举常量。
新增一个软中断时,需要在文件 include/linux/interrupt.h
中添加一个枚举常量
软中断使用的几个要点:
- 一个软中断不会抢占另外一个软中断。
- 惟一可以抢占软中断的是中断处理程序。
- 其他的软中断可以在其他处理器上同时执行
注册软中断的接口:
void open_softirq(int nr, void (*action)(struct softirq_action *));
// kernel/soft_irq.c
void open_softirq(int nr, void (*action)(struct softirq_action *))
{
softirq_vec[nr].action = action;
}
使用 open_softirq
即注册对应类型的处理函数到全局数组 softirq_vec
中
触发软件中断的接口:
void raise_softirq(unsigned int nr);
实际上即以软中断类型 nr
作为偏移量会置位 irq_stat[cpu_id]
的成员变量 __softirq_pending.
__softirq_pending
字段中的每一个 bit
,对应着某一个软中断,某个 bit
被置位,说明有相应的软中断等待处理
这也是同一类型软中断可以在多个 cpu
上并行运行的根本原因
可以看到,使用软中断是需要修改内核,添加一个枚举的,有些繁琐
所以,通常我们不建议擅自增加软中断的数量,如果需要新的软中断,尽可能把它们实现为基于软中断的 tasklet
形式
软中断在内核中的处理是通过专门的内核线程来完成的,这些内核线程通常与 CPU
核心绑定,并且在一个称为ksoftirqd
的线程中运行。
每个 CPU
核心都有一个对应的 ksoftirqd
线程,例如,对于 CPU
核心 0,线程名为 ksoftirqd/0
,对于核心1,则是ksoftirqd/1
,以此类推
当软中断被触发时,它会被标记为待处理,并在适当的时候由 ksoftirqd
线程进行处理。这些线程会周期性地检查是否有待处理的软中断,并执行相应的处理函数
软中断的处理函数通常是原子的,并且不能睡眠。这意味着它们不能调用任何可能导致当前线程睡眠的函数,例如那些可能引起阻塞的 API
,注意这里的软中断和系统调用产生的软件中断不是一个含义
tasklet
tasklet 是利用软中断实现的一种下半部机制,关于软中断的和基于软中断实现的 tasklet
的选择:通常你应该用 tasklet
,就像我们在前面看到的,软中断资源有限,也麻烦,而且软中断的使用者屈指可数,它只在那些执行频率很高和连续性要求很高的情况下才需要
而 tasklet
却有更广泛的用途,大多数情况下用tasklet
效果都不错,而且它们还非常容易使用
因为 tasklet
是通过软中断实现的,所以它们本身也是软中断
创建 tasklet
:
#define DECLARE_TASKLET(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }
#define DECLARE_TASKLET_DISABLED(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data }
本质上可以静态的创建出一个 tasklet_struct
结构,当 tasklet
被调度之后,对应的函数就会执行,参数由 data
给出
这两个宏之间的区别是引用计数的初始值不同,前一个将引用计数初始值设置 0,tasklet
处于激活状态,
另一个设置为 1,tasklet
处于禁止状态
tasklet struct
的 定义如下图所示:
// include/linux/interrupts.h
struct tasklet_struct {
struct tasklet_struct *next;
unsigned long state;
atomic_t count;
void (*func)(unsigned long);
unsigned long data;
};
还可以通过指针的方式动态创建一个 tasklet
:
struct tasklet_struct {
struct tasklet_struct *next;
unsigned long state;
atomic_t count;
void (*func)(unsigned long);
unsigned long data;
};
tasklet
的调度方式: 使用 tasklet_shedule
的方式调度 tasklet
static inline void tasklet_schedule(struct tasklet_struct *t);
下面是使用 tasklet
的一个实例:
// tasklet bottom half handle function
void btn_tasklet_func(unsigned long data)
{
printk("tasklet: btn interrupt handle\n");
wake_up_interruptible(&btn_wq);
}
DECLARE_TASKLET(btn_tasklet, btn_tasklet_func, 0)
static irqreturn_t btn_interrupt(int irq, void *dev)
{
struct pin_t *p = (struct pin_t*)dev;
key_values = p-> key_val;
tasklet_shedule(&btn_tasklet);
return IRQ_HANDLED;
}
先使用 DECLARE_TASKLET
静态声明一个 tasklet
,指定其下半部函数为 btn_tasklet_func
,在中断服务函数(上半部)获取按键值后,调用 tasklet_schedule
调度
工作队列
work queue
即工作队列,也是中断下半部的一种, work queue
将下半部工作推迟给一个内核线程去执行 ==> work 总是会在进程的上下文执行,重要的是 workqueue
允许重新调度甚至睡眠
两个关键的点:
- 如果推迟的工作需要睡眠,则使用
work queues
,否则使用softirq
或tasklets
work queues
适用于需要分配大量的内存,获得一个信号量,或者执行阻塞的I/O的情况
工作队列创建的内核线程称为 worker
线程(work thread
),工作队列子系统创建了一个缺省的工作者线程来处理这些推后的工作,一般都是使用缺省的工作线程
workqueue
允许在两个主要类型的线程中执行工作:
- 普通的内核线程:这些线程可以执行任何类型的工作,并且可以在多个
CPU
上并行执行。它们适用于通用的、非CPU
亲和性的工作。 - 绑定到特定
CPU
的内核线程:这些线程与特定的CPU
核心绑定,并专门用于在该核心上执行工作。这适用于需要与特定硬件交互或者需要保持数据局部性的任务
内核中存在两种类型的工作者线程:
默认工作队列(default workqueue
): 这是最常见的工作队列,它由内核自动创建和管理工作线程。默认工作队列中的工作可以在系统的任何一个CPU上执行
自定义工作队列(custom workqueue
): 用户可以创建自己的工作队列,并指定它们的工作线程是否绑定到特定的CPU核心上,如果绑定到特定的CPU
,那么工作只会在这个CPU
上执行,这有助于提高缓存亲和性
对于默认工作队列,内核会为每个CPU
核心创建一个工作者线程,这些线程的名字通常以 kworker
开头,后面跟着CPU编号,例如,如果一个系统有4个CPU
核心,那么可能会有名为 kworker/0:0
、kworker/1:0
、kworker/2:0
和kworker/3:0
的线程
对于自定义工作队列,如果它们是绑定到CPU
的,那么线程的名字会反映这一点,例如 kworker/u2:0
表示这是一个绑定到CPU
核心0的用户创建的工作队列的工作者线程。
使用工作队列
// include/linux/workqueue.h n for name f for function
#define DECLARE_WORK(n, f) struct work_struct n = __WORK_INITIALIZER(n, f)
DECLARE_WORK(name, void (*func)(void* ));
DECLARE_WORK
会静态的创建一个名称为 name
,处理函数为 func
的 work_struct
结构,这个 work_struct
函数每个工作队列的成员都会带一个(创建 work_struct
结构)
也可以在运行时通过指针创建一个 work
,传入的 work_struct
的指针
函数原型: INIT_WORK(struct work_struct *work, void(*func)(void*))
#define INIT_WORK(_work, _func) __INIT_WORK((_work), (_func), 0)
这样会动态的初始化一个 work
指向的工作,处理函数为 func
工作队列的处理函数原型:
void work_handler(void *data)
这样的函数会有一个工作者线程执行,默认的情况下,允许响应中断,并且不持有任何的锁,如何需要,函数可以睡眠
对工作队列进行调度:
static inline bool schedule_work(struct work_struct *work);
static inline bool schedule_delayed_work(struct delayed_work *dwork,unsigned long delay);
schedule_work
把给定的处理函数提交给缺省 events
的工作线程,work 马上会进行调度,一旦其所在处理器上的工作者线程被唤醒,它就会执行
schedule_delayed_work
经过一段时间后延时执行
刷新工作队列:
排入工作队列的工作会在 work thread
下一次被唤醒时执行,有时,在下一步工作之前,必须保证一些操作已经执行完毕了,卸载之前,有可能需要调用下面的函数,在内核的其他部分,为了防止竞争条件的出现,也有可能需要确保不再有待处理的工作:
static inline void flush_scheduled_work(void);
函数会一直等待,直到队列中所有的对象都被执行以后才会返回,在等待所有待处理的工作执行的时候,该函数会进入休眠状态,所以 只能在进程上下文使用
也可以创建新的工作队列,不使用缺省的工作队列函数:
// 返回一个 workqueue_struct 结构
#define create_workqueue(name) alloc_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM, 1, (name))
static inline bool queue_work(struct workqueue_struct *wq, struct work_struct *work)
static inline bool queue_delayed_work(struct workqueue_struct *wq,struct delayed_work *dwork, unsigned long delay);
// 刷新指定工作队列的函数
void flush_workqueue(struct workqueue_struct *wq);
参考代码:
struct drm_atomic_state {
......
struct work_struct commit_work;
}
// 在 drm_atomic_helper.c 中实现的 commit workqueue
INIT_WORK(&state->commit_work, commit_work);
static void commit_work(struct work_struct *work)
{
struct drm_atomic_state *state = container_of(work,
struct drm_atomic_state,
commit_work);
commit_tail(state);
}
if (nonblock)
queue_work(system_unbound_wq, &state->commit_work);
else
commit_tail(state);
任务队列和工作队列的区别:
任务队列(tasklet
):
-
任务队列是基于软中断的机制,它们是轻量级的,用于处理短小的、不需要睡眠的底半部任务。
-
任务队列保证在任意时刻在同一个
CPU
核心上只能执行一个特定的任务队列,因此它们不需要处理并发执行的问题,简化了同步 -
任务队列的执行上下文是中断上下文,这意味着任务队列中的代码不能睡眠
注意 tasklet
虽然是在中断上下文执行,但是不是在中断中执行的
工作队列(workqueue
):
工作队列是用于处理需要较长时间或者可以睡眠的底半部任务的机制。
-
工作队列中的任务会在内核线程中执行,这意味着它们可以睡眠,可以调用可能导致阻塞的函数,如 kmalloc、msleep等
-
工作队列提供了更多的灵活性,可以创建自定义的队列,并且可以控制任务的并发性和执行顺序
-
工作队列通常用于处理那些对延迟不太敏感的底半部任务,或者需要较长时间处理的任务