当前位置: 首页 > article >正文

Linux-Robust-Futex学习笔记

robust futex

简介

概述: 为了保证futex的robustness,添加了一种能够用于处理进程异常终止时锁状态的机制,就是当拥有锁的线程异常终止时,该线程能够将锁进行释放

基本原理: 每个线程都有一个私有的robust list,这个list在线程生命周期只能注册一次。在线程退出时,kernel会检查这个list,并进行相应的清理操作。

线程在执行do_exit()时常见的情况以及相应的处理方式:

  • 常见情况:在大多数情况下,在线程执行do_exit函数时,kernel并没有注册任何list,因此对于robust list的处理成本仅仅是进行一个简单的比较,即检查当前线程的robust list是否为空
  • 已注册list的情况:如果线程已经注册了list,通常这个list也大概率是空的,如果是正常退出的
  • 线程异常退出的情况:如果注册的list不为空,则kernel会遍历这个list,并将所有由该线程拥有的锁标记为FUTEX_OWNER_DIED,然后唤醒一个waiter

潜在的竞争条件: 由于添加和删除list是在获取锁之后进行的,所以给线程留下了一个小窗口,在此期间可能导致线程异常退出,从而使锁被悬挂,为了防止这种可能性,还需要多维护一个list_op_pending字段,并在完成列表的添加或删除操作后将其清除【存在疑惑】

实现细节

新系统调用:

  • sys_set_robust_list:注册robust list,接收一个指向robust list head的指针和list head的长度作为参数

    long get_robust_list(int pid, struct robust_list_head **head_ptr, size_t *len_ptr);
    
  • sys_get_robust_list:查询已注册的robust list,接收一个PID作为参数,并返回指向robust list head的指针和len指针。

    • 如果传入的PID为0,则返回调用线程自身的robust list head指针

    • long set_robust_list(struct robust_list_head *head, size_t len);
      
  • 当操作成功,返回0,否则返回错误码:

    • set_robust_list():
      • EINVAL:len和kernel预期的结构体robust_list_head的大小不匹配
    • get_robust_list():
      • ERERM:调用进程无权查看TID为PID的线程的robust list,并且不具有CAP_SYS_PTRACE功能(这是一个linux中的特权能力,用于允许进程进行调试和trace其他进程,具有该能力的成可以使用ptrace()系统调用来观察和控制其他进程的执行,包括读取和修改其内存、寄存器和指令流)
      • ESRCH:找不到TID为PID的线程
      • EFALUT:robust list head不能存储在location head中

注册list: robust list指针简单存储在robust_list字段中,如果robus futex比较普通,可以通过扩展sys_clone来为新线程注册robust list head,而无需通过sys_set_robust_list注册

系统调用开销:

  • 不使用robust futex的任务:几乎没有额外开销
  • 使用robust futex的任务:每个线程的生命周期只会增加一个额外的系统调用,清理futex的操作时快速且简单的

Futex字段值

  • 如果未获取锁,futex值的后面30位(即TID位)为0,所以只需要看后面30位是否有值就知道该futex是否被上锁
  • 如果锁被获取,futex值的后面30位(即TID位)为拥有该锁的线程的TID
  • 如果锁时拥有的,并且有线程争用锁,则应在futex中标识FUTEX_WAITERS位,即FUTEX_WAITERS | TID(FUTEX_WAITERS为0x8000_0000,也就是第一位)
  • 如果线程退出时发现某个robust futex仍被持有,kernel会设置相应的futex字段的FUTEX_OWNER_DIED位(0x4000_0000,也就是第二位),然后唤醒下一个waiter(如果存在)。其余清理工作由用户空间完成

代码示例

使用示例程序

代码仓库: 1037827920/Futex-Example-Program

Rust示例程序:

use libc::{
    syscall, SYS_futex, SYS_get_robust_list, SYS_gettid, SYS_set_robust_list, FUTEX_WAIT,
    FUTEX_WAKE,
};
use std::{
    io, mem, panic, ptr,
    sync::{
        atomic::{AtomicU32, Ordering},
        Arc, Mutex,
    },
    thread,
    time::Duration,
};

const FUTEX_INIT: u32 = 0x0000_0000;
const FUTEX_WAITERS: u32 = 0x8000_0000;
const FUTEX_TID_MASK: u32 = 0x3fff_ffff;

#[repr(C)]
#[derive(Debug, Clone)]
struct RobustList {
    next: *mut RobustList,
    // futex: u32, (通过偏移量访问)
}

#[repr(C)]
#[derive(Debug, Clone)]
struct RobustListHead {
    list: RobustList,
    /// 这个偏移量的作用是为了能够通过(list项的地址+偏移量)得到futex用户空间地址
    /// futex_offset的单位为字节,即1表示一个字节的偏移量
    /// 在用户空间添加这个字段还能进行灵活编码偏移量而不是在内核硬编码
    futex_offset: i64,
    list_op_pending: *mut RobustList,
}

impl RobustListHead {
    fn new(futex_offset: i64) -> RobustListHead {
        RobustListHead {
            list: RobustList {
                next: ptr::null_mut(),
            },
            futex_offset,
            list_op_pending: ptr::null_mut(),
        }
    }

    fn push(&mut self, futex: AtomicU32) {
        let new_node = Box::into_raw(Box::new(RobustList {
            next: self.list.next,
        }));
        unsafe {
            ((new_node as *mut u8).offset(self.futex_offset as isize) as *mut AtomicU32)
                .write(futex);
        }
        self.list.next = new_node;
    }

    fn print_robust_list(&self) {
        let mut current = self.list.next;
        while !current.is_null() {
            unsafe {
                print!(
                    "{:?}(futex: {:?}, futex_val: {:?}) -> ",
                    current,
                    (current as *const u8).add(self.futex_offset as usize),
                    ((current as *const u8).add(self.futex_offset as usize) as *const AtomicU32)
                        .read(),
                );
                current = (*current).next;
            }
        }
        println!("NULL");
    }

    fn get_futex(&self, index: usize) -> *const AtomicU32 {
        let mut current = self.list.next;
        for _ in 0..index {
            if current.is_null() {
                return ptr::null_mut();
            }
            unsafe {
                current = (*current).next;
            }
        }
        if current.is_null() {
            return ptr::null_mut();
        }
        unsafe { (current as *const u8).add(self.futex_offset as usize) as *mut AtomicU32 }
    }
}

unsafe impl Send for RobustListHead {}
unsafe impl Sync for RobustListHead {}

fn main() {
    test_set_and_get_robust_list();
    test_robust_futex();
}

fn futex_wait(futex: *const AtomicU32, thread: &str, tid: i64) {
    let futex_ref = unsafe { &*futex };
    loop {
        // 如果当前futex没有被其他线程持有
        if (futex_ref.load(Ordering::SeqCst) & FUTEX_TID_MASK) == 0 {
            futex_ref.swap(tid as u32, Ordering::SeqCst);
            // 加锁后直接返回,这样就不用执行系统调用,减少一定开销
            println!(
                "线程{thread}上锁成功, futex值: {:#x}",
                (*futex_ref).load(Ordering::SeqCst)
            );
            return;
        }

        // 标识futex的FUTEX_WAITERS位
        futex_ref.fetch_or(FUTEX_WAITERS, Ordering::SeqCst);
        println!(
            "线程{thread}正在等待futex, futex值: {:#x}",
            (*futex_ref).load(Ordering::SeqCst)
        );
        let ret = unsafe {
            syscall(
                SYS_futex,
                futex_ref as *const AtomicU32 as *mut u32,
                FUTEX_WAIT,
                futex_ref.load(Ordering::SeqCst),
                0,
                0,
                0,
            )
        };
        if ret == -1 {
            panic!("futex_wait系统调用执行失败");
        }
    }
}

fn futex_wake(futex: *const AtomicU32, thread: &str) {
    let futex_ref = unsafe { &*futex };
    let ret = unsafe {
        syscall(
            SYS_futex,
            futex_ref as *const AtomicU32 as *mut u32,
            FUTEX_WAKE,
            1,
            0,
            0,
            0,
        )
    };
    if ret == -1 {
        panic!("futex_wake系统调用执行失败");
    }
    futex_ref.store(FUTEX_INIT, Ordering::SeqCst);
    println!("线程{thread}释放锁");
}

/// 向kernel注册一个robust list
fn set_robust_list(robust_list_head_ptr: *const RobustListHead) {
    let ret = unsafe { syscall(SYS_set_robust_list, robust_list_head_ptr, mem::size_of::<RobustListHead>()) };
    if ret == -1 {
        panic!(
            "set_robust_list系统调用执行失败, Err: {:?}",
            io::Error::last_os_error()
        );
    }
}

/// 获取kernel注册的robust list
/// 当pid=0时,表示获取当前进程的robust list
fn get_robust_list(pid: i32, robust_list_head_ptr: &mut *mut RobustListHead, len: &mut usize) {
    let ret = unsafe { syscall(SYS_get_robust_list, pid, robust_list_head_ptr, len) };
    if ret == -1 {
        panic!(
            "get_robust_list系统调用执行失败, Err: {:?}",
            io::Error::last_os_error()
        );
    }
}

/// 测试set_robust_list系统调用和get_robust_list系统调用
fn test_set_and_get_robust_list() {
    // 初始化robust list head
    // 8字节为RobustList的大小
    let mut robust_list_head = RobustListHead::new(8);

    // 初始化robust list
    robust_list_head.push(AtomicU32::new(FUTEX_INIT));
    robust_list_head.print_robust_list();

    // 设置robust list
    let robust_list_head_ptr = &robust_list_head as *const RobustListHead;
    println!(
        "robust_list_head_ptr: {:?}, len: {:?}",
        robust_list_head_ptr, mem::size_of::<RobustListHead>()
    );
    set_robust_list(robust_list_head_ptr);

    // 获取robust list
    let mut robust_list_head_ptr_geted: *mut RobustListHead = ptr::null_mut();
    let mut len_geted: usize = 0;
    get_robust_list(0, &mut robust_list_head_ptr_geted, &mut len_geted);
    println!(
        "robust_list_head_ptr_geted: {:?}, robust_list_head: {:?} len_geted: {:?}",
        robust_list_head_ptr_geted,
        unsafe { &*robust_list_head_ptr_geted },
        len_geted
    );
}

/// 测试一个线程异常退出时,robust list的表现
fn test_robust_futex() {
    // 创建robust list head
    // 8字节为RobustList的大小
    let robust_list_head = Arc::new(Mutex::new(RobustListHead::new(8)));

    // 初始化robust list
    let mut robust_list_head_guard = robust_list_head.lock().unwrap();
    (*robust_list_head_guard).push(AtomicU32::new(FUTEX_INIT));
    drop(robust_list_head_guard);

    // 线程1
    let robust_list_head_clone1 = robust_list_head.clone();
    let thread1 = thread::spawn(move || {
        let tid = unsafe { syscall(SYS_gettid) };
        println!("线程1的线程号: {tid}");

        // 向kernel注册robust list
        let robust_list_head_guard = robust_list_head_clone1.lock().unwrap();
        let robust_list_head_ptr = &(*robust_list_head_guard) as *const RobustListHead;
        set_robust_list(robust_list_head_ptr);

        // 尝试获取锁
        let futex = robust_list_head_guard.get_futex(0);
        futex_wait(futex, "1", tid);
        // 执行具体的业务逻辑
        thread::sleep(Duration::from_secs(5));

        // 模拟线程异常退出,是否会正常把未释放的锁释放掉
        println!("thread1异常退出");
        return;
    });

    thread::sleep(Duration::from_secs(3));

    // 线程2
    let robust_list_head_clone2 = robust_list_head.clone();
    let thread2 = thread::spawn(move || {
        let tid = unsafe { syscall(SYS_gettid) };
        println!("线程2的线程号: {tid}");

        // 向kernel注册robust list
        let robust_list_head_guard = robust_list_head_clone2.lock().unwrap();
        let robust_list_head_ptr = &(*robust_list_head_guard) as *const RobustListHead;
        set_robust_list(robust_list_head_ptr);

        // 尝试获取锁
        let futex = robust_list_head_guard.get_futex(0);
        println!(
            "线程2尝试获取锁, futex值: {:#x}",
            unsafe { &*futex }.load(Ordering::SeqCst)
        );
        futex_wait(futex, "2", tid);
        // 执行具体的业务逻辑
        thread::sleep(Duration::from_secs(3));
        // 释放锁
        futex_wake(futex, "2");
    });

    thread1.join().unwrap();
    thread2.join().unwrap();
}

C示例程序:

#include <stdio.h>
#include <linux/futex.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <pthread.h>

#define FUTEX_INIT 0x00000000
#define FUTEX_WAITERS 0x80000000
#define FUTEX_TID_MASK 0x3fffffff

// 定义一个结构体来封装线程任务参数
typedef struct {
    struct robust_list_head* rlh;
    int thread;
} thread_task_args;

// 向robust list插入新节点
void robust_list_push(struct robust_list_head* rlh, atomic_uint futex) {
    struct robust_list* new_robust_list = (struct robust_list*)malloc(sizeof(struct robust_list));
    if (new_robust_list == NULL) {
        perror("new robust list分配内存失败");
    }

    // 通过偏移量将futex写入新robust list
    atomic_uint* futex_ptr = (atomic_uint*)((uint8_t*)new_robust_list + rlh->futex_offset);
    *futex_ptr = futex;

    rlh->list.next = new_robust_list;
}

// 获取索引为index的futex地址
atomic_uint* robust_list_get_futex(struct robust_list_head* rlh, size_t index) {
    struct robust_list* current = rlh->list.next;
    size_t i = 0;
    while (current != NULL) {
        if (i == index) {
            return (atomic_uint*)((uint8_t*)current + rlh->futex_offset);
        }
        current = current->next;
        i++;
    }
    return NULL;
}

// 打印robust list
void robust_list_print(struct robust_list_head* rlh) {
    struct robust_list* current = rlh->list.next;
    while (current != NULL) {
        atomic_uint* futex_ptr = (atomic_uint*)((uint8_t*)current + rlh->futex_offset);
        atomic_int futex = *futex_ptr;

        printf("%p(futex: %p, futex_val: %u) -> ", (void*)current, (void*)futex_ptr, futex);

        current = current->next;
    }
    printf("NULL\n");
}

// 向kernel注册robust list
void set_robust_list(struct robust_list_head* rlh) {
    long ret = syscall(SYS_set_robust_list, rlh, sizeof(struct robust_list_head));
    if (ret == -1) {
        perror("set_robust_list系统调用执行失败\n");
    }
}

// 从kernel获取robust list
void get_robust_list(int pid, struct robust_list_head** rlh, size_t* len) {
    long ret = syscall(SYS_get_robust_list, pid, rlh, len);
    if (ret == -1) {
        perror("get_robust_list系统调用执行失败\n");
    }
}

// futex_wait函数
void* futex_wait(atomic_uint* futex, int thread, long tid) {
    while (1) {
        // 如果当前futex没有其他线程持有
        if ((*futex & FUTEX_TID_MASK) == 0) {
            atomic_exchange(futex, (unsigned int)tid);
            // 加锁后直接返回
            printf("线程%d上锁成功. futex值: 0x%x\n", thread, *futex);
            return NULL;
        }

        // 线程进入等待状态
        atomic_fetch_or(futex, FUTEX_WAITERS);
        printf("线程%d正在等待futex, futex值: 0x%x\n", thread, *futex);
        long ret = syscall(SYS_futex, (unsigned*)futex, FUTEX_WAIT, *futex, 0, 0, 0);
        if (ret == -1) {
            perror("futex_wait系统调用执行失败\n");
            return NULL;
        }
    }
}

// futex_wake函数
void* futex_wake(atomic_uint* futex, int thread) {
    long ret = syscall(SYS_futex, (unsigned*)futex, FUTEX_WAKE, 1, 0, 0, 0);
    if (ret == -1) {
        perror("futex_wake系统调用执行失败\n");
        return NULL;
    }
    atomic_store(futex, FUTEX_INIT);
    printf("线程%d释放锁\n", thread);
    return NULL;
}

// 测试set_robust_list系统调用和get_robust_list系统调用
void test_set_and_get_robust_list() {
    // 创建一个robust list head
    struct robust_list_head* rlh = (struct robust_list_head*)malloc(sizeof(struct robust_list_head));
    rlh->futex_offset = 8;

    // 初始化robust list
    robust_list_push(rlh, 0x00000000);
    robust_list_print(rlh);

    // 设置robust list
    printf("robust_list_head_ptr: %p, len: %ld\n", rlh, sizeof(struct robust_list_head));
    set_robust_list(rlh);

    // 获取robust list
    struct robust_list_head* rlh_geted = (struct robust_list_head*)malloc(sizeof(struct robust_list_head));
    size_t len_geted;
    get_robust_list(0, &rlh_geted, &len_geted);
    printf("robust_list_head_geted_ptr: %p, len_geted: %ld\n", rlh_geted, len_geted);
}

void* thread1_task(void* arg) {
    thread_task_args* args = (thread_task_args*)arg;
    // robust list head指针
    struct robust_list_head* rlh = args->rlh;
    // 线程号
    int thread = args->thread;
    // TID
    long tid = syscall(SYS_gettid);

    // 注册robust list
    set_robust_list(rlh);

    // 尝试获取锁
    atomic_uint* futex = robust_list_get_futex(rlh, 0);
    futex_wait(futex, thread, tid);
    // 执行具体的业务逻辑
    sleep(5);

    // 线程异常退出
    printf("线程%d异常退出\n", thread);
    pthread_exit(NULL);
}

void* thread2_task(void* arg) {
    thread_task_args* args = (thread_task_args*)arg;
    // robust list head指针
    struct robust_list_head* rlh = args->rlh;
    // 线程号
    int thread = args->thread;
    // TID
    long tid = syscall(SYS_gettid);

    // 注册robust list
    set_robust_list(rlh);

    // 尝试获取锁
    atomic_uint* futex = robust_list_get_futex(rlh, 0);
    futex_wait(futex, thread, tid);
    // 执行具体的业务逻辑
    sleep(5);
    // 释放锁
    futex_wake(futex, thread);
    return NULL;
}

// 测试一个线程异常退出时,robust list的表现
void test_robust_futex() {
    // 线程句柄
    pthread_t t1, t2;

    // 创建robust list head
    struct robust_list_head* rlh = (struct robust_list_head*)malloc(sizeof(struct robust_list_head));

    // 初始化robust list
    robust_list_push(rlh, 0x00000000);

    // 构建线程参数
    thread_task_args args1 = { rlh, 1 };
    thread_task_args args2 = { rlh, 2 };

    // 线程1
    pthread_create(&t1, NULL, thread1_task, (void*)&args1);

    // 等线程1先获取futex锁
    sleep(3);

    // 线程2
    pthread_create(&t2, NULL, thread2_task, (void*)&args2);

    // 等待线程结束
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
}

int main() {
    test_set_and_get_robust_list();
    test_robust_futex();
}

运行结果:
在这里插入图片描述

实现示例程序

【待完成】

参考

  • futex(2) - Linux 手册页 — futex(2) - Linux manual page

http://www.kler.cn/a/526392.html

相关文章:

  • OpenFeign的工作原理是什么?它第一次加载的时候为什么慢?
  • 12.udp
  • 详解python的修饰符
  • JavaScript系列(50)--编译器实现详解
  • SQL注入漏洞之绕过[前端 服务端 waf]限制 以及 防御手法 一篇文章给你搞定
  • pytorch实现半监督学习
  • Timer计时器
  • 从0到1:C++ 开启游戏开发奇幻之旅(二)
  • 我的求职面经:(1)C++里指针和数组的区别
  • 【异或和之差——Trie,DP】
  • 基于Springboot的社区药房管理系统
  • 【练习】PAT 乙 1023 组个最小数
  • 镭速大文件传输自动选择压缩算法原理
  • 学技术学英语:elasticsearch查询的两阶段queryingfetching
  • 如何监控ubuntu系统某个程序的运行状态,如果程序出现异常,对其自动重启。
  • MATLAB的数据类型和各类数据类型转化示例
  • 基于新一代电子电器架构的SOA服务设计方法
  • DeepSeek R1与OpenAI o1深度对比
  • 智能家居监控系统数据收集积压优化
  • games101-作业3
  • 【漫话机器学习系列】069.哈达马乘积(Hadamard Product)
  • llama.cpp LLM_CHAT_TEMPLATE_DEEPSEEK_3
  • NLP模型大对比:Transformer >Seq2Seq > LSTM > RNN > n-gram
  • 能源行业区块链相关的书籍
  • 【Linux】线程互斥与同步
  • Python标准库 - os (2) 进程管理