当前位置: 首页 > article >正文

线程概念、操作

一、背景知识

1、地址空间进一步理解

在父子进程对同一变量进行修改时发生写时拷贝,这时候拷贝的基本单位是4KB,会将该变量所在的页框全拷贝一份,这是因为修改该变量很有可能会修改其周围的变量(局部性原理),这是一种以空间换时间的做法;malloc和new其实对申请内存做了封装,申请的也是4KB的整数倍。

OS如何管理页框?先描述再组织。描述:用一个结构体struct page,管理采用数组struct page memory[n] 等数据结构。一个4GB的内存有1048576(1MB)(计算方式:4*1024*1024*1024 / (4*1024))个页框,那么管理的数组有1048576个元素,每一个page有了下标,这里的下标*4KB就是对应的每个page的下标。

2、页表的深入理解

如果页表是真的如下图所示,左边是虚拟地址,右边是物理地址,那么每一个条目就要有8个字节,加上表示位置的一个字节,就是9个字节,一个进程的虚拟地址空间一般是4GB,那么一张页表就有4GB*9=36GB大小,这明显是不合适的。

物理地址一定是在物理内存中的某个页的。只要找到该物理地址在该页内的偏移量,就可以找到该物理地址对应的那个字节。

对于一个物理地址,其实就是一个32位的二进制数字。真实的页表是将这32位数字划分为10+10+12三类。

3、虚拟地址的本质

函数是有地址的,函数的地址一般指函数入口的地址,而函数这一段代码块是一段连续的地址。函数的本质就是一段连续的代码地址。

虚拟地址是一种资源。

二、线程的概念与代码实现

1、概念

线程是进程内部运行的CPU调度的基本单位。

线程是在进程的PCB中运行的,一个进程中可能有多个PCB(对应在LINUX中就是多个task_struct)

Linux中的线程:

复用pcb,用pcb统一表示执行流,这样就不需要为线程单独设计数据结构和调度算法了。

windows中的线程:对线程先描述再组织,有一个struct tcb结构体,内部有线程ID、优先级、状态、上下文、连接属性等属性。每一个线程都要与进程相连接。最后设计成的示意图如下:

Linux系统实现线程的方式是更优的,维护成本低,更简单也就更不容易出错。

Windows中是真的有进程,而Linux中叫轻量级进程。

2、代码实现

创建线程的函数:

#include <iostream>
#include <pthread.h>
#include <unistd.h>

using namespace std;

void *newthread_process(void *str)
{
    while (1)
    {
        sleep(1);
        cout<<"new thread processing... pid:"<<getpid()<<endl;
    }
}

int main()
{
    pthread_t mythread1;
    int thread_ret = pthread_create(&mythread1, nullptr, newthread_process, (void *)"new thread");
    //主线程
    while(1)
    {
        cout<<"main thread processing... pid:"<<getpid()<<endl;
        sleep(1);
    }
    return 0;
}
.PHONY: all
all:test_thread

test_thread : test_thread.cpp
	g++ -o $@ $^ -std=c++11 -lpthread

.PHONY: clean
clean: 
	rm -f test_thread

在编译时需要带入pthread动态库,即-lpthread

ps-aL查看轻量级进程(ps axj查看进程)其中LWP表示的就是轻量级进程的ID

3、关于线程的几个问题:

(1)已经有多进程,为何要有多线程?

① 进程创建成本非常高,要创建PCB、地址空间、页表,并建立映射等;创建线程只需要创建PCB,然后将资源分配给该PCB。

② 运行时线程相对于进程,切换时不需要切换地址空间和页表。线程调度成本低。

线程调度成本为何低?

实际页表对应一个寄存器,仅仅是这个寄存器的切换不会有太大的效率影响。

CPU内部有一个硬件cache,根据局部性原理存储热数据;如果是进程间切换,cache中的热数据需要切换;而线程切换的cache热数据无需变化。

③ 删除时线程也成本低。

(2)那为何要有进程?

线程也有劣势。在一个进程中的多个线程,当一个线程出现野指针、除0等类似的错误时,整个进程都要崩溃回收,其他线程也就崩溃了,线程的健壮性比较差。一个线程出异常,可能会改变其他线程的数据,这样其他线程的正确性无法保证,所以全部崩溃。

(3)不同的OS实现线程的方式不一样?

Windows实现线程是先描述再组织;Linux则是通过复用代码。尽管实现方式不一样,但都满足线程的统一定义:进程内部运行的CPU调度的基本单位。只是具体的如何实现在进程内部,如何实现其是CPU调度的基本单位,是不一样的。线程的原理都是正确的。

(4)线程的页表划分

每一个线程有自己的代码,对应在页表中不同的区域。也就是说,对于同一张页表,不同的线程对应着页表上不同的区域。这样就是线程对页表的划分。

(5)OS与进程的关系

OS其实就是一个进程。虚拟机的原理就是这样,虚拟机中一个OS挂掉了不会影响其他OS。

(6)如何分几个线程?

对于计算密集型应用,对于一个单核CPU,分再多的线程,效率只会变低,因为线程切换需要成本。最好分的线程个数与CPU核数一样合适。

对于I/O密集型应用,可以多创建几个线程,因为IO操作的大部分时间都在等待,多分几个线程这样等待的时间重叠。

4、代码验证线程的健壮性较低

代码:

#include <iostream>
#include <pthread.h>
#include <unistd.h>

using namespace std;

void *newthread_process(void *str)
{
    while (1)
    {
        int x=rand()%5;
        cout<<"new thread processing... pid:"<<getpid()<<endl;
        sleep(1);
        if(x==0)
        {
            int *p=nullptr;
            *p=100;
        }

    }
}

int main()
{   
    srand(time(nullptr));
    pthread_t mythread1,mythread2,mythread3;
    int thread_ret1 = pthread_create(&mythread1, nullptr, newthread_process, (void *)"new thread");
    int thread_ret2 = pthread_create(&mythread2, nullptr, newthread_process, (void *)"new thread");
    int thread_ret3 = pthread_create(&mythread3, nullptr, newthread_process, (void *)"new thread");


    //主线程
    while(1)
    {
        sleep(1);
        cout<<"main thread processing... pid:"<<getpid()<<endl;
    }
    return 0;
}

验证结果:当一个进程出现报错,所有进程都退出。

不仅健壮性较低,由于多线程共享地址空间的大部分资源,所以其缺乏访问控制。

5、进程与线程对比

 

线程独有的数据,比较重要的是:一组寄存器和栈。

寄存器:硬件中的上下文数据,反映了线程可以动态运行的;

栈:每个线程都要有自己独立的栈结构,因为函数执行要是独立的。线程在运行的时候,会形成各种临时变量,临时变量会被每个线程保存在自己的栈区。

三、线程控制

在编译以上代码时加上了-lpthread。由于linux中没有线程只有轻量级进程,所以系统调用的接口只会给用户提供创建轻量级进程的接口;而我们在写代码时直接使用的是线程的相关接口,这是通过pthread动态库实现的。

1、pthread_create函数详解

第一个参数是输出型参数,其实际是Linux对 unsigned long int 类型的一个封装,是一个地址;第二个参数是要修改的线程的属性,可以直接设置为nullptr,第三个参数是一个返回值为void*且参数也为void*的函数指针,第四个参数是回调的函数参数,传给第三个参数作为函数参数。

返回值为0,表示正确创建了线程,否则返回错误码,thread中的内容将会是未定义。

2、代码验证

(1)等待线程的函数

第一个参数即为pthread_create函数中的第一个参数,表示要等待哪一个线程;第二个参数是得到pthread_create函数中的第三个参数的函数最终的返回值.

(2)代码

①两个线程的代码
#include <iostream>
#include <pthread.h>
#include <unistd.h>

using namespace std;


class ThreadData
{
public:
    int x;
    int y;
    string name;
    int Add()
    {
       return x+y;
    }

private:


};
class ThreadResult
{
public:
    int x;
    int y;
    int result;
    


private:

};

void* threadRun(void* args)
{
    /*int cnt=5;
    while(cnt)
    {
        cout<<"new thread run,cnt:"<<cnt--<<endl;
        sleep(1);
    }*/
    auto td=static_cast<ThreadData*>(args);// 安全强转
    //cout<<"id: "<<td->id<<" name:"<<td->name<<endl;
    ThreadResult* ret=new ThreadResult();
    ret->x=td->x;
    ret->y=td->y;
    ret->result=td->Add();
    delete td;
    return (void*)ret;


}

string PrintToHex(pthread_t& tid)
{
    char cache[64];
    snprintf(cache,sizeof(cache),"0x%lx",tid);
    return cache;
}



int main()
{
    pthread_t tid;
    ThreadData* td=new ThreadData();//推荐在堆空间上开辟
    td->x=10;
    td->y=20;
    td->name="thread-1";
    int n=pthread_create(&tid,nullptr,threadRun,(void*)td);
    if(n!=0)
    {
        cerr<<"create thread"<<endl;
        return 1;
    }
    string tid_hex=PrintToHex(tid);
    cout<<"tid的16进制形式为:"<<tid_hex<<endl;//验证tid
    //join等待
    cout<<"pthread join begin..."<<endl;
    ThreadResult* ret=nullptr;
    pthread_join(tid,(void**)&ret);//会阻塞到这等待 类似于wait
    //如果不join,当主线程退出,所有的线程都退出了,因为进程都退出了 ,因为当main线程不退出,而new线程退出时,不join会造成类似僵尸线程的问题
    cout<<"ret:"<<ret->result<<endl;
    cout<<"pthread join success"<<endl;

    return 0;
}
②多线程代码
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <vector>

using namespace std;

string PrintToHex(pthread_t& tid)
{
 char cache[64];
 snprintf(cache,sizeof(cache),"0x%lx",tid);
 return cache;
}

void *ThreadRun(void *args)
{
    string name = static_cast<char *>(args);
    while (1)
    {
        cout << "new thread running,name:" << name << endl;
        sleep(1);
        //break;
    }
    //pthread_exit(args);//等价于return args
    

    //return args;
}

int main()
{
    int num = 10;
    vector<pthread_t> tids;

    for (int i = 0; i < num; i++)
    {
        pthread_t tid;
        /*char name[64];*///这样写是在栈空间上开辟,会有问题——线程覆盖问题
        char* name=new char[64];//这样在堆空间上开辟
        sprintf(name,  "thread-%d", i + 1);
        pthread_create(&tid, nullptr, ThreadRun, (void *)name);
        tids.emplace_back(tid);
    }

    //join等待
    sleep(5);

    for(auto tid:tids)
    {
        pthread_cancel(tid);
        void* ret=nullptr;
        pthread_join(tid,(void**)&ret);

        cout<<PrintToHex(tid)<<" quit ... ret:"<<(long long int)ret<<endl;
       // delete ret;

    }

    //sleep(100);

    return 0;
}

(3)几个问题

①main线程和new线程,谁先运行是不确定的。
②我们期望main线程是最后退出,如何做到?

main线程需要对new线程进行回收,所以我们期望main线程最后退出。做到这一点是通过join函数做到的。如果new线程退出了,main线程还没退出,且main线程没有join,那么此时new线程会进入类似僵尸进程的状态。

③tid到底是什么?

虚拟地址,可以以16进制的形式打印出来方便观察。

打印tid的代码:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include<string>

using namespace std;

string PrintToHex(pthread_t& tid)
{
 char cache[64];
 snprintf(cache,sizeof(cache),"0x%lx",tid);
 return cache;
}

void* ThreadRun(void* args)
{
    string name=static_cast<const char*>(args);
    while(1)
    {
        sleep(1);
        pthread_t tid=pthread_self();
        cout<<"new thread running...name:"<<name<<" tid:"<<PrintToHex(tid)<<endl;
    }

}


int main()
{
    pthread_t tid;
    pthread_create(&tid,nullptr,ThreadRun,(void*)"thread-1");
    
    cout<<"new thread tid:"<<PrintToHex(tid)<<endl;
    pthread_join(tid,nullptr);
    return 0;
}

运行结果:

而查到的LWP:

也就是说,给用户提供的线程ID,不是内核中的LWP,而是pthread维护的一个唯一值。库内部也要承担对线程的管理。(首先便是对线程ID的赋值)

要理解线程ID是一个地址,首先理解pthread库。pthread库本质是一个文件,进程未开启时是在磁盘上的。

多线程在创建之前,首先是一个进程;最开始加载时磁盘上的可执行程序加载到内存,然后建立pcb、虚拟地址空间、页表,然后页表在内存中作映射,这时候一个进程被创建好,然后才有多线程。

当进程创建好之后,要创建新线程,调用pthread_create方法,这是就需要将pthread库加载到内存中。同时要将被加载的库映射到地址空间的堆栈之间的共享区。此时就可以创建新线程了。这是加载的pthread库叫做共享库。原因是,当有新进程创建时,不需要再从磁盘中加载pthread库了,而只需要在新进程的共享区建立与内存中 pthread库的映射。这和以前的动态库加载是一样的。

总结:线程ID就是线程属性集合的起始虚拟地址,其是在pthread库中维护的。

④全面看待对线程运行函数传参

给线程运行函数传参是穿一个void*,即一个地址,我们可以通过这个地址传整数、字符串、甚至类对象地址。传递一个类对象,那么可以给线程传递多个参数、方法。

如果直接在主线程定义一个结构体,那么这是在主线程的栈上开辟的空间存储该变量,这与线程要有自己的独立栈空间矛盾了,更重要的是,如果有多个新线程,其中某个新线程对该栈空间上的变量做修改,那么就会导致全部都会变化。

所以一般采用new的写法,在堆空间上开辟空间。

⑤线程运行函数的返回值

pthread_join函数的第二个参数是输出型参数,用于获取线程运行函数的返回值。返回值void* ret是指针变量,意味着其是有空间来接收返回值的。

与进程退出的区别是,进程异常退出时会有退出信号,但线程异常退出时意味着整个进程退出(信号是发给进程的),其余线程也就退出了。所以线程退出时只关注正确退出的情况。

线程返回一个void*,即一个地址,我们可以通过这个地址传整数、字符串、甚至类对象地址。传递一个类对象,那么可以让线程返回多个参数、方法。

⑥如何创建多线程?

见(2)②多线程代码

⑦新线程如何终止?

a、线程函数return

b、exit() 不可以,其表示的是进程终止;return表示函数退出,只有main函数的return表示进程退出,exit则是进程退出

c、pthread_exit()专门用来终止一个线程,是新线程自己主动调用

d、pthread_cancel取消一个线程 一般都用主线程取消一个新线程;新线程退出结果是-1

注:main线程结束,表示进程结束,所以要尽量保证主线程最后终止。

⑧如何不join线程,而是让其结束后直接退出

使用线程分离。一个线程被创建,默认是必须要被 join的。如果一个线程被分离,那么该线程的工作状态为分离状态,不需要也不能被join;但被分离的进程依旧属于进程内部。

可以新线程自己把自己分离pthread_detach(pthread_self()),也可以主线程将新线程分离。

四、C++中的多线程

以上说的是linux中的原生线程库的操作;C++标准中的线程库其实是对linux中的原生线程库的封装,这意味着在编译时也要加上-lpthread.

真实情况是,C++标准对每个环境下的线程都做了封装,所以语言具有跨平台性。

文件操作也是类似的。

类似C++标准库,简单封装一下线程库

hpp文件代码:

#pragma once

#include <pthread.h>
#include <iostream>
#include <string>

class mythread
{
    typedef void (*fun_t)(const std::string &name);

public:
    mythread(const std::string name,fun_t func)
    :_name(name),_func(func)
    {
        _isrunning=false;

    }
    static void *threadRun(void *args)
    {
        mythread *thread = static_cast<mythread *>(args);
        thread->_func(thread->_name);
        return nullptr;
    }

    bool Start()
    {
        int n = pthread_create(&_tid, nullptr, threadRun, (void *)this);
        if (n != 0)
            return false;
        else
        {
            _isrunning = true;
            return true;
        }
    }
    void Stop()
    {
        if(_isrunning)
        {
            _isrunning=false;
            ::pthread_cancel(_tid);
        }
    }
    void Join()
    {
   
            pthread_join(_tid,nullptr);
        
    }

    ~mythread()
    {
        if(_isrunning)
        {
            Stop();
            Join();
        }
    }

private:
    pthread_t _tid;
    std::string _name;
    bool _isrunning;
    fun_t _func;
};

注:这里的线程运行函数要注意,写在类内部默认有this指针参数,所以要加上static使其成为静态的。然后传入this指针调用类内部的函数fun_t.

#include<iostream>
#include<unistd.h>
#include<string>
#include"mythreadlib.hpp"

void mythreadrun(const std::string& name)
{
    while(1)
    {
        std::cout<<"new thread is running,name:"<<name<<std::endl;
        sleep(1);
    }
}

int main()
{
    std::string name="thread1";
    mythread thread1(name,mythreadrun);
    thread1.Start();
    sleep(5);
    thread1.Stop();
    thread1.Join();
    return 0;
}

以上就是我们自己模拟的对原生线程的管理。描述是通过这个类描述的,管理可以通过一个vector进行管理。

五、线程互斥

线程之间天然就很容易看到同一份资源,所以通信很容易,但容易造成数据不一致问题;对于多个线程能看到的资源,我们称之为共享资源,我们需要对这部分资源进行保护。保护资源的方式分为互斥和同步。

以下是一个抢票的代码,体现了数据不一致的问题。

#include <iostream>
#include <unistd.h>
#include <string>
#include "mythreadlib.hpp"

int g_tickets = 10000;

void mythreadrun(const std::string &name)
{
    while (1)
    {
        if (g_tickets > 0)
        {

            usleep(1000); // 休息1000微秒 1ms ->抢票花费的时间
            //std::cout << "here success" << std::endl;

            std::cout << name << " get ticket:" << g_tickets << std::endl;
            g_tickets--;
        }
        else
        {
            break;
        }
    }
}

int main()
{
    // std::string name="thread1";
    mythread thread1("thread1", mythreadrun);
    mythread thread2("thread2", mythreadrun);
    mythread thread3("thread3", mythreadrun);
    mythread thread4("thread4", mythreadrun);

    thread1.Start();
    thread2.Start();
    thread3.Start();
    thread4.Start();

    thread1.Join();
    thread2.Join();
    thread3.Join();
    thread4.Join();

    return 0;
}

抢到最后,程序运行结果:

会发现出现抢票负数的问题,但在代码逻辑中是对g_tickets做了是否大于0的判断的。

解释原因:

直接原因:首先,对g_tickets做是否大于0的判断是一种计算。该计算是通过CPU进行调度的。判断tickets是否大于0要通过三步:

CPU寄存器内部的数据可以有多套,属于线程私有,看起来放在了一套公共寄存器,但是属于线程私有,当线程切换时要带走自己的数据;回来的时候会恢复。tickets只剩一张时,线程1被调度,判断其是满足条件的,但当①步骤做完了,②步骤还没做时,线程被切换到线程2,此时全局变量tickets还没变化还是1,那么线程2也判断其是满足条件的,也可以抢这张票。tickets--和判断tickets是否大于0是两个独立的操作,其也需要分三步:重读数据,--数据,写回数据。那么在重读数据时就会发生tickets变为负数的情况。

1、相关接口使用

如果锁是全局的或者静态的,那么直接init即可,由于此时锁的生命周期与整个程序的生命周期是一致的,则不需要destory;如果锁是动态开辟的,那么则需要初始化函数对其初始化,且需要destory

锁被创建出来、初始化之后,需要加锁、解锁

我们使用锁对临界资源进行保护,本质是对临界区代码进行保护。我们对所有资源进行访问,本质都是通过代码进行访问的。所以我们保护资源,就是把访问资源的代码保护起来。

                

①加锁的原则是加锁的范围,粒度一定要尽量小(临界区包含的代码要尽量少)

②任何线程,要进行抢票,都要先申请锁,原则上不该有例外

③所有线程申请锁,前提是所有线程都得看到这把锁,锁本身也是共享资源;那么就要求加锁的过程,必须是原子的。

④原子性:要么不做,要么做完,没有中间状态,就是原子性。

⑤如果线程申请锁失败,那么该线程要被阻塞

⑥如果线程申请锁成功,那么该线程继续向后运行

⑦线程申请锁成功,执行临界区的代码期间,可以切换。(线程切换在任意时刻都可能做)但其他线程无法进入临界区,因为申请锁成功的线程未释放锁。也就是说,申请锁成功的线程可以放心的执行临界区代码,没有其他线程可以进入临界区。

这对于其他线程,要么本线程没有申请锁,要么释放了锁,对其他线程才有意义。这意味着,本线程访问临界区,对其他线程是原子的。

相关代码:

#include <iostream>
#include <unistd.h>
#include <string>
#include<vector>
#include "mythreadlib.hpp"
#include"LockGuard.hpp"

int g_tickets = 10000;
//pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER; // 定义全局锁并初始化

/*void mythreadrun(const std::string &name)
{
    while (1)
    {
        //pthread_mutex_lock(&gmutex);
        if (g_tickets > 0)
        {

            usleep(1000); // 休息1000微秒 1ms ->抢票花费的时间
            // std::cout << "here success" << std::endl;

            std::cout << name << " get ticket:" << g_tickets << std::endl;
            g_tickets--;
            //pthread_mutex_unlock(&gmutex);
        }
        else
        {
            //pthread_mutex_unlock(&gmutex);
            break;
        }
    }
}*/ //全局变量锁的写法

void myRoute (ThreadData* td)
{
    while(1)
    {
        lockguard lock(td->_lock); //临时变量 此段代码执行完时生命周期结束
        if (g_tickets > 0)
        {

            usleep(1000); // 休息1000微秒 1ms ->抢票花费的时间
            // std::cout << "here success" << std::endl;

            std::cout << td->_name << " get ticket:" << g_tickets << std::endl;
            g_tickets--;
            //pthread_mutex_unlock(td->_lock);
        }
        else
        {
            //pthread_mutex_unlock(&gmutex);
            //pthread_mutex_unlock(td->_lock);

            break;
        }
    }

}

static int thread_num=4;

int main()
{
    std::vector<mythread> threads;
    pthread_mutex_t lock;
    pthread_mutex_init(&lock,nullptr);

    for(int i=0;i<thread_num;i++)
    {
        std::string name="thread-"+std::to_string(i+1); 
        ThreadData* td=new ThreadData(name,&lock);
        threads.emplace_back(name,myRoute,td);

    }

    for(auto& thread:threads)
    {
        thread.Start();
    }
    for(auto& thread:threads)
    {
        thread.Join();
    }
    return 0;
}
#pragma once
//hpp代码一
#include <pthread.h>
#include <iostream>
#include <string>

class ThreadData
{
public:
    ThreadData(const std::string &name, pthread_mutex_t *lock)
        : _name(name), _lock(lock)
    {
    }

public:
    std::string _name;
    pthread_mutex_t *_lock;
};

class mythread
{

    typedef void (*fun_t)(ThreadData *td);

public:
    mythread(const std::string name, fun_t func, ThreadData *td)
        : _name(name), _func(func), _td(td)
    {
        _isrunning = false;
    }
    static void *threadRun(void *args)
    {
        mythread *thread = static_cast<mythread *>(args);
        thread->_func(thread->_td);
        thread->_isrunning = false; // 运行结束
        return nullptr;
    }

    bool Start()
    {
        int n = pthread_create(&_tid, nullptr, threadRun, (void *)this);
        if (n != 0)
            return false;
        else
        {
            _isrunning = true;
            return true;
        }
    }
    void Stop()
    {
        if (_isrunning)
        {
            _isrunning = false;
            ::pthread_cancel(_tid);
        }
    }
    void Join()
    {
        pthread_join(_tid, nullptr);
    }

    ~mythread()
    {
        if (_isrunning)
        {
            Stop();
            Join();
        }
    }

private:
    pthread_t _tid;
    std::string _name;
    bool _isrunning;
    fun_t _func;
    ThreadData *_td;
};
#pragma once
//hpp代码2
#include<pthread.h>

class lockguard
{
public:
    lockguard(pthread_mutex_t* lock)
    :_lock(lock)
    {
        pthread_mutex_lock(_lock);
    }
    ~lockguard()
    {
        pthread_mutex_unlock(_lock);
    }
private:
    pthread_mutex_t* _lock;
};

2、从原理角度理解锁

申请锁成功,允许进入临界区的本质是pthread_mutex_lock()函数会返回;反之,申请锁失败(表示锁没有就绪),pthread_mutex_lock()函数不返回,线程阻塞了。当申请锁成功的线程pthread_mutex_unlock()之后,库中的线程会被唤醒,从而重新申请锁。

3、从实现角度理解锁

前提:

①CPU的寄存器只有一套,被所有的线程共享。但是寄存器里面的数据,属于执行流的上下文,属于执行流私有的数据。

②CPU在执行代码的时候,一定要有对应的执行载体。进程&&线程。

③数据在内存中,是被所有线程共享的。

结论:把数据移动到寄存器,本质是把数据从共享的状态变为线程私有的状态。

六、线程同步

仅仅是线程互斥的话,可能会导致一个线程在某段时间内一直获取资源,因为某线程在第一次获取临界资源的时候,下一次其再次获取临界资源的可能性是更大的。我们为了让线程获取临界资源更合理、让其具有顺序性,这里的顺序性就是同步。需要注意的是,这里的顺序可以是严格的顺序性,也可以是相对的顺序性。

1、条件变量

(1)相关接口

(2)理解条件变量

一个线程队列+通知机制(唤醒一个或者唤醒全部)

用一个测试代码测试:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>
#include <vector>

const int threadnum = 4;
int gtickets = 10000;
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;

void *thread_Run(void *args)
{
    char* name=static_cast<char*>(args);
    while (1)
    {
        //加锁、同步
        usleep(1000);
        pthread_mutex_lock(&lock);
        pthread_cond_wait(&cond,&lock);
        
        std::cout<<"I am "<<name<<",running..."<<std::endl;
        pthread_mutex_unlock(&lock);
    }
}

int main()
{
    // 对线程同步进行测试
    std::vector<pthread_t> threads;
    for (int i = 0; i < threadnum; i++)
    {
        pthread_t tid;
        char *name = new char[128];
        snprintf(name, 128, "thread-%d", i + 1);
        pthread_create(&tid, nullptr, thread_Run, (void *)name);
        threads.emplace_back(tid);
    }
    // 线程等待
    while(1)
    {
        //唤醒线程
        //pthread_cond_broadcast(&cond);
        pthread_cond_signal(&cond);
        sleep(1);
    }
    // sleep(10);
    for (auto& thread : threads)
    {
        pthread_join(thread, nullptr);
    }

    return 0;
}

测试结果:

可以发现是按照一定的顺序打印,这就保证了线程同步。

条件变量的使用需要配合互斥锁、等待以及唤醒函数。

2、生产消费模型

生产消费模型有一个三原则:

①一个交易场所:(特定数据结构形式存在的一段内存空间)

②两种角色:生产者和消费者(对应的就是生产线程和消费线程)

③三种关系:生产者和生产者,消费者和消费者,生产者和消费者,以上的三种关系全都是互斥关系。

3、代码实现生产消费模型

首先是利用阻塞队列来实现生产消费模型,什么是阻塞队列:

hpp代码:

#pragma once

#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <queue>

#define DEFAULT_CAP 4

template <typename T>
class Prod_Cons_Model
{
public:
    Prod_Cons_Model() {}

    Prod_Cons_Model(int max_cap = DEFAULT_CAP)
        : _max_cap(max_cap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_cons_cond,nullptr);
        pthread_cond_init(&_prod_cond,nullptr);
    }

    ~Prod_Cons_Model()
    {
        pthread_mutex_destroy(&_mutex);
        // pthread_cond_destroy(&_cons_cond);
        // pthread_cond_destroy(&_prod_cond);
        pthread_cond_destroy(&_cons_cond);
        pthread_cond_destroy(&_prod_cond);

    }

    bool Is_Full()
    {
        return _block_queue.size() == _max_cap;
    }
    bool Is_Empty()
    {
        return _block_queue.size() == 0;
    }

    void Productor(const T &data)
    {
        // 临界资源
        pthread_mutex_lock(&_mutex);
        while (Is_Full()) // 是while而不是if
        {
            pthread_cond_wait(&_prod_cond, &_mutex);
        }
        _block_queue.push(data);

        pthread_mutex_unlock(&_mutex);//和signal函数的顺序对调也可以
        pthread_cond_signal(&_cons_cond);

    }

    // 消费数据
    void Consume(T *ret)
    {
        pthread_mutex_lock(&_mutex);
        while (Is_Empty())
        {
            pthread_cond_wait(&_cons_cond, &_mutex);
        }
        *ret = _block_queue.front();
        _block_queue.pop();
        // 唤醒生产者
        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_prod_cond);

    }

private:
    std::queue<T> _block_queue;
    int _max_cap;
    pthread_mutex_t _mutex;
    pthread_cond_t _cons_cond;
    pthread_cond_t _prod_cond;
};

cpp代码:

#include "model.hpp"
#include <vector>
#include <cstdlib>
#include <pthread.h>
#include <ctime>
#include <iostream>
void *myconsume(void *args)
{

    auto model = static_cast<Prod_Cons_Model<int> *>(args);
    while (1)
    {
        int ret;
        model->Consume(&ret);
        std::cout << "I am customer,get data:" << ret << std::endl;
    }

    return nullptr;
}

void *myproductor(void *args)
{
    auto model = static_cast<Prod_Cons_Model<int> *>(args);

    while (1)
    {
        sleep(2);
        int data = rand() % 5; // 生成5以内的随机数
        model->Productor(data);
        std::cout << "I am productor,prodece data:" << data << std::endl;
    }

    return nullptr;
}

int main()
{

    Prod_Cons_Model<int> *Int_Model = new Prod_Cons_Model<int>(4);
    pthread_t cons1;
    pthread_t cons2;

    pthread_t prod1;
    pthread_t prod2;
    pthread_t prod3;

    srand(time(nullptr) ^ getpid());

    pthread_create(&cons1, nullptr, myconsume, Int_Model);
    pthread_create(&cons2, nullptr, myconsume, Int_Model);
    pthread_create(&prod1, nullptr, myproductor, Int_Model);
    pthread_create(&prod2, nullptr, myproductor, Int_Model);
    pthread_create(&prod3, nullptr, myproductor, Int_Model);

    pthread_join(cons1, nullptr);
    pthread_join(cons2, nullptr);
    pthread_join(prod1, nullptr);
    pthread_join(prod2, nullptr);
    pthread_join(prod3, nullptr);

    return 0;
}

①pthread_cond_wait函数详解:

当pthread_cond_wait函数调用的时候,不仅让自己这个线程进入等待状态(停在该函数内部),还释放获取的锁,当被唤醒时再次进入队列中竞争锁。当再次竞争到锁时,函数才返回,这就是该函数第二个参数还要传入一个锁的原因。

pthread_cond_wait函数一定要在临界代码中,原因是等待函数要检测队列中数据是否满足条件,这个检测的过程需要在临界代码中。而信号量是不需要判断。

②用while而不是if

这里注意,在消费函数和生产函数内部需要等待的时候,要用while而不是if,原因是:当有2个线程进入等待状态,而唤醒用的是broad_cast的话,2个线程同时被唤醒,其中一个竞争到锁,另一个被阻塞在锁里而不是wait里。竞争到锁的线程对队列中的内容进行操作(以消耗数据为例),如果此时将队列中的数据全消耗完了,然后释放锁,而另一个线程竞争到锁之后,往后走会直接消耗队列中的数据,而队列中的数据已经为空了,这就会导致问题。所以要用while而不是if.

③解锁和唤醒函数的顺序

在生产函数和消费函数内部,解锁和唤醒两个函数的顺序是可以对调的。因为无论哪种顺序,唤醒线程之后,被唤醒的线程还是要竞争同一把锁,所以在解锁前和解锁后都一样。但是唤醒的操作一定要在对队列进行操作之后。

④分配任务

注意:这里生产消费模型,还可以用于任务的分配执行。例如,生产出一个任务交给消费者去执行。

⑤代码适用性

以上的代码不仅适合单生产单消费情景,也适合多生产多消费情景。具体在实际应用中的如何选择则要根据需要。

4、生产消费模型的特点

(1)协调忙闲不均

通过线程同步,实现对忙闲不均的协调

(2)生产者代码和消费者代码解耦

生产者代码和消费者代码互不影响。

(3)效率高

尽管在临界资源中永远只有一个线程在执行,但是生产任务、处理任务也是需要花费时间的,对于消费者来说,获取任务和处理任务是并发的;对于生产者来说,发送任务和产生任务是并发的;这里的并发对于整个工作流程是效率高的。

5、信号量

(1)信号量概念

见《共享内存与信号量》一文。

(2)信号量相关接口

初始化:

value表示的是多少信号量。

信号量的P操作:

该操作的意思是,申请信号量,申请成功时信号量--;申请失败时则会阻塞在这个函数中。

信号量的V操作:

(3)环形队列配合信号量实现生产消费模型

环形队列的特点:当head==end时,队列为空或者队列为满。如何判断空还是满:引入一个计数器。

多线程如何在环形队列中生产和消费:

①当队列为空,让生产者先生产

②当队列为满时,让消费者先消费

③为空为满是少数情况,大部分情况是既不为空,也不为满;此时head(生产者下标)和end(消费者下标)一定不指向一个位置,此时允许生产和消费同时进行。此时可以看出环形队列一定是比阻塞队列实现的生产消费模型快的。

以上的这些条件,可以直接使用信号量实现。所以说信号量是用来实现互斥与同步的。

对于消费者来说,数据资源是他真正的资源;而对于生产者而言,空间资源是其真正的资源。所以我们得设置两个信号量,一个是对应数据资源,一个是对应空间资源。在初始化时设置数据资源为空,空间资源为满(等于环形队列的容量)。

对于生产者,要申请空间资源(P一个空间资源),但释放的是一个数据资源(V一个空间资源),因为生产者申请到一个空间资源后,是向这个空间资源中放数据的,放完数据后空间资源并未增多,而是数据资源增多了一个;对于消费者,要申请数据资源,而要释放一个空间资源,因为消费者申请一个数据资源后,是拿到该数据,拿到之后该数据已经没有用了,所以是释放空间资源,可以让生产者再在这个空间生产数据。

(4)代码实现

先写上层调用逻辑:

对于单生产单消费模型,让生产者生产,消费者消费,让两者看到同一份环形队列资源,生产者不断地生产,消费者不断地消费(对于队列空和满的情况在.hpp文件中实现即可)。

Main.cc代码:

#include "RingQueue.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>

void *Consume(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
    while (1)
    {
        // 消费
        int out;

        rq->Pop(&out);

        // 处理数据
        std::cout << "得到的数据为:" << out << std::endl;
        sleep(1);
    }
}

void *Productor(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
    sleep(2);

    while (1)
    {
        // 构造数据
        int in = rand() % 10 + 1;

        // 生产
        rq->Push(in);

        std::cout << "构建的数据为:" << in << std::endl;
        sleep(2);
    }
}

int main()
{
    srand(time(nullptr) ^ getpid());
    RingQueue<int> *rq = new RingQueue<int>();
    pthread_t c, p;
    pthread_create(&c, nullptr, Consume, rq);
    pthread_create(&p, nullptr, Productor, rq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    return 0;
}

hpp代码:

#pragma once

#include<pthread.h>
#include<iostream>
#include<string>
#include<vector>
#include <semaphore.h>

#define DEFAULT_SIZE 5

template<typename t>
class RingQueue
{
private:
    void P(sem_t& sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t& sem)
    {
        sem_post(&sem);
    }
public:
    RingQueue(int size=DEFAULT_SIZE)
    :_max_cap(size),
    _head(0),
    _end(0)    
    {
        _queue.resize(_max_cap);
        sem_init(&_data_sem,0,0);
        sem_init(&_space_sem,0,_max_cap);
        pthread_mutex_init(&_c_mutex,nullptr);
        pthread_mutex_init(&_p_mutex,nullptr);

    }

    ~RingQueue()
    {
        sem_destroy(&_data_sem);
        sem_destroy(&_space_sem);
        pthread_mutex_destroy(&_c_mutex);
        pthread_mutex_destroy(&_p_mutex);

    }

    void Push(const t& data)//生产
    {
        P(_space_sem);
        pthread_mutex_lock(&_p_mutex);
        _queue[_head]=data;
        _head++;
        _head%=_max_cap;
        pthread_mutex_unlock(&_p_mutex);

        V(_data_sem);
    }

    void Pop(t* out)//消费
    {
        P(_data_sem);
        pthread_mutex_lock(&_c_mutex);

        *out=_queue[_end];
        _end++;
        _end%=_max_cap;
        pthread_mutex_unlock(&_c_mutex);

        V(_space_sem);

    }

private:
    std::vector<t> _queue;
    int _max_cap;
    int _head;//生产者下标
    int _end;//消费者下标
    // int _data_num; //不太需要
    sem_t _data_sem;
    sem_t _space_sem;
    pthread_mutex_t _c_mutex;
    pthread_mutex_t _p_mutex;



};

代码验证结果:(保持了同步以及互斥)

但如果要实现多生产多消费,则要加上锁。


http://www.kler.cn/a/524430.html

相关文章:

  • 2025春招 SpringCloud 面试题汇总
  • Baklib引领企业内容中台建设的新思路与应用案例
  • olloama下载deepseek-r1大模型本地部署
  • 8639 折半插入排序
  • 马尔科夫模型和隐马尔科夫模型区别
  • 【PyTorch】5.张量索引操作
  • Python NumPy(6):修改数组形状、翻转数组、修改数组维度
  • MySQL查询优化(三):深度解读 MySQL客户端和服务端协议
  • 网站如何正式上线(运维详解)
  • 解决 pip install 出现 error: subprocess-exited-with-error 错误的方法
  • 小黑日常积累:学习了CROSS APPLY字段,将sqlserver中字段通过分隔符拆分并统计
  • “爱”之浅谈(一)
  • 混合专家模型MoE的全面详解
  • MybatisX插件快速创建项目
  • [C语言日寄] <stdio.h> 头文件功能介绍
  • Go学习:字符、字符串需注意的点
  • MotionLCM 部署笔记
  • 基于最近邻数据进行分类
  • 蓝牙技术在物联网中的应用有哪些
  • xclode版本
  • AI大模型开发原理篇-1:语言模型雏形之N-Gram模型
  • 【Pandas】pandas Series cummax
  • JavaScript_03 超简计算器
  • 深入理解指针(2)
  • Apple M1 ARM MacBook 安装 Apache TVM
  • VScode 插件开发 国际化帮助工具