当前位置: 首页 > article >正文

Linux Futex学习笔记

Futex

简介

概述: Futex(Fast Userspace Mutex)是linux的一种特有机制,设计目标是避免传统的线程同步原语(如mutex、条件变量等)在用户空间和内核空间之间频繁的上下文切换。Futex允许在用户空间处理锁定和等待的操作,只有在必要时进入内核,从而减少了不必要的开销。

对比:

  • SpinLock:如果上锁成功,立即进入临界区,开销很小;但是如果上锁失败,CPU空转,浪费资源
  • Mutex:如果上锁失败,内核会切换到其他线程在CPU运行,不用自旋等待锁;但是即使是上锁成功也要进出内核,使用系统调用,会消耗几百个指令
  • Futex:结合上述二者的优点,在用户态尝试上锁,上锁成功既可以不用进入内核态,上锁失败就通过系统调用睡眠,唤醒也要通过系统调用
    • 在用户空间尝试加锁通常通过原子操作来完成,如CAS(Compare-And-Swap)或者其他无锁的原子操作

原理

本质: futex直接通过虚拟地址(是一个用户空间地址,通常是一个32位的锁变量字段,0表示空闲,1表示有线程持有当前锁)来处理锁,而不是像以前一样创建一个需要初始化以及跨进程全局内核句柄

用户空间地址: futex用户空间地址可以通过mmap系统调用分配,而mmap分配的内存有三种:匿名内存、共享内存段、内存映射文件。匿名内存只能用在同一进程的线程之间使用,而共享内存段和内存映射文件可以在多个进程之间使用

主要功能:

  • Futex原子操作:用户空间线程首先执行一些原子操作(如使用atomic函数或cmpxchg指令)来尝试获取锁
  • 短路路径:如果锁在用户空间已经被释放,线程可以直接在用户空间完成同步,无需涉及内核
  • 等待:如果一个线程尝试获取的锁已经被另一个线程占用,它可以调用futex_wait()进入休眠
  • 唤醒:当锁被释放,另一个线程可以调用futex_wake()唤醒等待的线程

涉及的系统调用:

  • futex():允许用户空间线程等待或唤醒其他线程。

    int futex(int *uddr, int op, int val, const struct timespec *timeout, int *uaddr2, int val2);
    
    • uaddr:用户空间地址,表示锁的位置
    • val:如果是FUTEX_WAIT,表示原子性地检查uaddr中的值是否为val,如果是则让进程睡眠,否则返回错误;如果是FUTEX_WAKE,表示最多唤醒val个等待在uaddr上的进程
    • op:操作类型:
      • FUTEX_WAIT:等待操作,线程会阻塞,直到指定的内存地址值发生变化
      • FUTEX_WAKE:唤醒操作,唤醒一个或多个等待该内存地址的线程
    • timeout:可选的等待时间,如果为空,那么调用会无限期睡眠。timeout默认会根据CLOCK_MONOTONIC时钟来计算,从linux4.5开始,可以再futex_op上指定FUTEX_CLOCK_REALTIME来选择CLOCK_REALTIME时钟。
    • uaddr2和val2在某些情况下用于双重条件等待
  • futex_wait()和futex_wake()是更高级的封装,用于在应用程序中更方便地实现等待和唤醒操作

实际应用

  • pthread_mutex:在linux中,pthread_mutex是基于futex实现的。
  • 内存屏障和自旋锁:在一些高效的同步机制中,futex也常常与自旋锁、内存屏障等技术结合使用

代码示例

代码仓库: 1037827920/Futex-Example-Program

futex本身的使用

Rust示例程序:(需要使用到libc crate)

use libc::{syscall, SYS_futex, FUTEX_WAIT, FUTEX_WAKE};
use std::{
    panic,
    sync::{
        atomic::{AtomicU32, Ordering},
        Arc,
    },
    thread,
    time::Duration,
};

const UNLOCKED: u32 = 0;
const LOCKED: u32 = 1;

macro_rules! futex_status {
    ($val:expr) => {
        if $val == 0 {
            "UNLOCKED"
        } else {
            "LOCKED"
        }
    };
}

fn main() {
    test_futex();
}

fn futex_wait(futex: &AtomicU32, thread: &str) {
    loop {
        // 如果当前futex没有被其他线程持有
        if futex
            .compare_exchange(UNLOCKED, LOCKED, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
        {
            // 加锁后直接返回,这样就不用执行系统调用,减少一定开销
            println!(
                "线程{thread}上锁成功, futex状态: {}",
                futex_status!(futex.load(Ordering::SeqCst))
            );
            return;
        }

        // 线程进入等待状态
        println!("线程{thread}正在等待futex");
        let ret = unsafe {
            syscall(
                SYS_futex,
                futex as *const AtomicU32 as *mut u32,
                FUTEX_WAIT,
                futex.load(Ordering::SeqCst),
                0,
                0,
                0,
            )
        };
        if ret == -1 {
            panic!("futex_wait系统调用执行失败");
        }
    }
}

fn futex_wake(futex: &AtomicU32, thread: &str) {
    let ret = unsafe {
        syscall(
            SYS_futex,
            futex as *const AtomicU32 as *mut u32,
            FUTEX_WAKE,
            1,
            0,
            0,
            0,
        )
    };
    if ret == -1 {
        panic!("futex_wake系统调用执行失败");
    }
    println!("线程{thread}释放锁");
    futex.store(UNLOCKED, Ordering::SeqCst);
}

/// 测试基本的futex使用
fn test_futex() {
    // futex用户空间地址
    let futex = Arc::new(AtomicU32::new(0));
    let futex_clone1 = futex.clone();
    let futex_clone2 = futex.clone();

    // 线程1
    let thread1 = thread::spawn(move || {
        // 尝试获取锁
        futex_wait(&futex_clone1, "1");
        // 执行具体的业务逻辑
        thread::sleep(Duration::from_secs(5));
        // 释放锁
        futex_wake(&futex_clone1, "1");
    });

    // 线程2
    let thread2 = thread::spawn(move || {
        // 尝试获取锁
        futex_wait(&futex_clone2, "2");
        // 执行具体的业务逻辑
        thread::sleep(Duration::from_secs(5));
        // 释放锁
        futex_wake(&futex_clone2, "2");
    });

    thread1.join().unwrap();
    thread2.join().unwrap();
}

C示例程序:

#include <stdio.h>
#include <pthread.h>
#include <stdatomic.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/futex.h>

#define UNLOCKED 0
#define LOCKED 1
#define FUTEX_STATUS(val) val == 0 ? "UNLOCKED" : "LOCKED"

// 定义一个结构体来封装参数
typedef struct {
    atomic_uint* futex;
    int thread;
} thread_args;

void* futex_wait(atomic_uint* futex, int thread) {
    while (1) {
        // 如果当前futex没有其他线程持有
        int expected = UNLOCKED;
        if (atomic_compare_exchange_strong(futex, &expected, LOCKED)) {
            // 加锁后直接返回
            printf("线程%d上锁成功. futex状态: %s\n", thread, FUTEX_STATUS(*futex));
            return NULL;
        }

        // 线程进入等待状态
        printf("线程%d正在等待futex\n", thread);
        long ret = syscall(SYS_futex, (unsigned*)futex, FUTEX_WAIT, *futex, 0, 0);
        if (ret == -1) {
            perror("futex_wait系统调用执行失败\n");
            return NULL;
        }
    }
}

void* futex_wake(atomic_uint* futex, int thread) {
    long ret = syscall(SYS_futex, (unsigned*)futex, FUTEX_WAKE, 1, 0, 0, 0);
    if (ret == -1) {
        perror("futex_wake系统调用执行失败\n");
        return NULL;
    }
    atomic_store(futex, UNLOCKED);
    printf("线程%d释放锁\n", thread);
    return NULL;
}

void* thread_task(void* arg) {
    thread_args* args = (thread_args*)arg;
    // futex用户空间地址
    atomic_uint* futex = args->futex;
    // 线程号
    int thread = args->thread;

    // 尝试获取锁
    futex_wait(futex, thread);
    // 执行具体的业务逻辑
    sleep(5);
    // 释放锁
    futex_wake(futex, thread);

    return NULL;
}

int main() {
    // 线程句柄
    pthread_t t1, t2;

    // futex用户空间地址
    atomic_uint futex = 0;

    thread_args args1 = { &futex, 1 };
    thread_args args2 = { &futex, 2 };

    // 创建两个线程同时递增cnt
    pthread_create(&t1, NULL, thread_task, (void*)&args1);
    pthread_create(&t2, NULL, thread_task, (void*)&args2);

    // 等待线程结束
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    return 0;
}

运行结果:

在这里插入图片描述

pthread_mutex的使用

C示例程序:

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

// 共享计数器
int shread_cnt = 0;
// 互斥锁
pthread_mutex_t cnt_mutex = PTHREAD_MUTEX_INITIALIZER;

// 线程执行的任务
void* thread_task(void* arg) {
    // 线程ID
    long tid = (long)arg;

    // 循环5次,每次增加计数器
    for (int i = 0; i < 5; ++i) {
        // 加锁
        pthread_mutex_lock(&cnt_mutex);

        // 临界区:修改共享资源
        int tmp = shread_cnt;
        // 模拟一些处理时间
        usleep(100);
        shread_cnt = tmp + 1;

        printf("线程 %ld: 计数器值 = %d\n", tid, shread_cnt);

        // 解锁
        pthread_mutex_unlock(&cnt_mutex);

        // 模拟一些处理时间
        usleep(200);
    }

    return NULL;
}

int main() {
    // 定义线程句柄数组
    pthread_t threads[3];

    // 创建3个线程
    for (long i = 0;i < 3; ++i) {
        int ret = pthread_create(&threads[i], NULL, thread_task, (void*)i);

        if (ret != 0) {
            perror("线程创建失败");
            return 1;
        }
    }

    // 等待所有线程完成
    for (int i = 0; i < 3; ++i) {
        pthread_join(threads[i], NULL);
    }

    // 打印最终计数器值
    printf("最终计数器值 = %d\n", shread_cnt);

    // 销毁互斥锁
    pthread_mutex_destroy(&cnt_mutex);

    return 0;
}

原理浅析:

pthread_mutex_lock实际上会执行__pthread_mutex_lock ,然后这个函数实现里面会调用LLL_MUTEX_LOCK宏,这个宏会调用__lll_lock宏,在这个宏里面就会先尝试在用户态进行上锁(也就是通过CAS的原子操作进行上锁),然后上锁失败再调用__lll_lock_wait(或_\lll_lock_wait_private),这个函数在追踪下去就会执行futex_wait系统调用。这个流程就跟上面futex本身的使用差不多了。

想要看更详细的讲解可以看这个博客:pthread_mutex_lock实现 - pslydhh

参考

  • ‍‌‌‌‬‬‬‍‬‌‍‬‬‌‍‬‌⁠‌‍futex技术分享
  • 2022年南大操作系统课程——并发控制:互斥
  • pthread_mutex_lock实现 - pslydhh

http://www.kler.cn/a/519487.html

相关文章:

  • 15天基础内容-5
  • AJAX RSS Reader:技术解析与应用场景
  • 对神经网络基础的理解
  • 31、Java集合概述
  • ceph新增节点,OSD设备,标签管理(二)
  • FreeBSD里制作ubuntu22 jammy兼容环境的脚本
  • 第 10 课 Python 内置函数
  • 在 Ubuntu22.04 上安装 Splunk
  • 2025年1月22日(什么是扫频)
  • vue router路由复用及刷新问题研究
  • 从 VJ 拥塞控制到 BBR:ACK 自时钟和 pacing
  • 《Kotlin核心编程》上篇
  • 【动态规划】杨表
  • YOLOv11改进,YOLOv11检测头融合DSConv(动态蛇形卷积),并添加小目标检测层(四头检测),适合目标检测、分割等任务
  • SQL注入漏洞之SQL注入基础知识点 如何检测是否含有sql注入漏洞
  • 【leetcode100】二叉树的层序遍历
  • Elasticsearch中的度量聚合:深度解析与实战应用
  • mock可视化生成前端代码
  • javascript-es6 (一)
  • 【Vim】Vim 中将文件内容复制到系统剪切板的方法
  • 基于Oracle 19C的ADVM与ACFS标准化实施文档
  • Python Pandas数据清洗与处理
  • RabbitMQ---面试题
  • Pyecharts图表交互功能提升
  • vue3+elementPlus之后台管理系统(从0到1)(day4-完结)
  • 在Ubuntu上安装RabbitMQ教程