多线程同步
多线程
程序中默认只有一个线程,pthread_create()
函数调用后就有2个线程。
pthread_create()
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
using namespace std;
//线程函数
void * callback(void * arg) {
cout <<"pthread arg:"<< *(int *)arg << endl;
return NULL;
}
int main() {
pthread_t tid;
// 创建一个子线程
int num = 10;//传入参数
int ret = pthread_create(&tid, NULL, callback, (void *)&num);
if(ret != 0) {//不等于0说明失败了
char * errstr = strerror(ret);
cout << errstr << endl;
}
//主进程运行
cout << "child pthread:" << tid << endl;
cout << "main pthread:" << pthread_self() << endl;
for (int i = 0; i < 50; i++)
{
cout << i << endl;
}
//主线程退出不会影响其他进程
pthread_exit(NULL);
return 0;
}
pthread_exit()
是退出线程
pthread_join()
子线程的资源可以由任意线程通过:pthread_join()
回收资源,一般由主线程负责回收。
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
using namespace std;
//线程函数
int val = 10;//全局变量
void *callback(void *arg)
{
cout << "child thread:" << pthread_self() << endl;
// 返回值是void*,作用相当return
pthread_exit((void *)&val);
}
int main() {
pthread_t tid;
// 创建一个子线程
pthread_create(&tid, NULL, callback, NULL);
//主进程运行
for (int i = 0; i < 10; i++)
{
cout << i << endl;
}
//主线程回收子线程资源
//返回值是二级指针
int *pth_ret;
pthread_join(tid, (void **)&pth_ret);
cout << "exit data:" << *pth_ret << endl;
pthread_exit(NULL);
return 0;
}
pthread_join()
是一个阻塞函数用全局变量才能保持值稳定
pthread_death()
分离一个线程,被分离的线程的资源会被系统自动释放
- 不可以多次分离线程
- 不能分离一个已经分离的线程
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
using namespace std;
//线程函数
void* callback(void *arg){
pthread_exit(NULL);
}
int main() {
pthread_t tid;
// 创建一个子线程
pthread_create(&tid, NULL, callback, NULL);
// 输出线程号
cout << "main:" << pthread_self() << ",child:" << tid << endl;
//设置子线程分离
pthread_detach(tid);
//释放主线程
pthread_exit(NULL);
return 0;
}
pthread_cancel()
向线程发送一个取消的线程,说白了就是终止线程
不是立即终止,而是执行到某个取消点才会终止
- 取消点是系统规定好的一些系统调用
- 可以把取消点理解为从用户区到内核区的切换点
//线程函数 void* callback(void *arg){ for (int i = 0; i < 6;i++){ cout << "child pthread:" << i << endl; } return NULL; } int main() { pthread_t tid; // 创建一个子线程 pthread_create(&tid, NULL, callback, NULL); // 输出线程号 cout << "main:" << pthread_self() << ",child:" << tid << endl; /*取消线程*/ pthread_cancel(tid); for (int i = 0; i < 6; i++) { cout << "main pthread:" << i << endl; } // 释放主线程 pthread_exit(NULL); return 0; }
说明cancel的线程不是立即取消的
线程属性attr
//线程属性类型 pthread_attr_t
int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_destroy(pthread_attr_t *attr);
int pthread_attr_getdetachstate(const pthread_attr_t *attr, int *detachstate);
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
还有很多的线程属性相关函数
//线程函数
void* callback(void *arg){
for (int i = 0; i < 6;i++){
cout << "child pthread:" << i << endl;
}
return NULL;
}
int main() {
//创建线程属性变量
pthread_attr_t attr;
//初始化线程属性
pthread_attr_init(&attr);
//设置线程属性
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_t tid;
//第二个参数是自己设置好的线程属性
pthread_create(&tid, &attr, callback, NULL);
cout << "main:" << pthread_self() << ",child:" << tid << endl;
//释放线程属性资源
pthread_attr_destroy(&attr);
pthread_exit(NULL);
return 0;
}
获取栈的大小
int main() { //创建线程属性变量 pthread_attr_t attr; //初始化线程属性 pthread_attr_init(&attr); //设置线程属性 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_t tid; //第二个参数是自己设置好的线程属性 pthread_create(&tid, &attr, callback, NULL); //获取栈的大小 size_t size; pthread_attr_getstacksize(&attr,&size); cout << "pthread size:" << size << endl; // 释放线程属性资源 pthread_attr_destroy(&attr); pthread_exit(NULL); return 0; }
线程同步
多线程编程的重点
#include <pthread.h> #include <string.h> #include <unistd.h> #include <iostream> using namespace std; /* 三个窗口一起卖100张票 */ //线程函数是卖票 int ticket = 100;//全局 void* sellTicket(void* arg){ usleep(100); while (ticket > 0) { cout << pthread_self() << "在卖第" << ticket << "张票" << endl; ticket--; } return NULL; } int main() { //创建3个子线程,主线程只进行线程资源回收 pthread_t tid1, tid2, tid3; pthread_create(&tid1, NULL, sellTicket, NULL); pthread_create(&tid2, NULL, sellTicket, NULL); pthread_create(&tid3, NULL, sellTicket, NULL); //回收子线程资源 pthread_join(tid1, NULL); pthread_join(tid2, NULL); pthread_join(tid3, NULL); //退出主线程 pthread_exit(NULL); return 0; }
线程的优势就是可以通过全局变量共享信息,不过共享就存在了竞争
多个线程不会同时修改同一变量;某一线程不会读取其他线程正在修改的变量
这个代码的问题在于:
同一张票可以被多个线程卖;线程执行时间问题导致买了负票
临界资源
临界区是指访问某一共享资源的代码片段,并且这段代码的执行应为原子操作,也就是同时访问同一共享资源的其他线程不应终端该片段的执行。
线程同步:
即当有一个线程在对内存进行操作时,其他线程都不可以对这个内存地址进行操作.直到该线程完成操作,其他线程才能对该内存地址进行操作,而其他线程则处于等待状态。
线程同步影响并发效率,但是为了数据安全是必须的
原子操作就是执行一段代码只能完整执行,不能被中断,防止并发执行带来的问题
互斥锁
mutex
--------- mutual exclusion的缩写
- 确保某时刻仅有一个线程可以访问某项共享资源。
- 可以使用互斥量来保证对任意共享资源的原子访问。
互斥量有两种状态:
- 已锁定(locked)
- 未锁定(unlocked)
每一线程在访问同一资源时将采用如下协议:
- 针对共享资源锁定互斥量
- 访问共享资源
- 对互斥量解锁
只有一个线程能对共享资源加锁,其他线程加锁会被阻塞
//互斥量的类型 pthread_mutex_t mt;
//相关函数 //restrict--c语言修饰符 int pthread_mutex_init(pthread_mutex_t *restrict mutex, constpthread_mutexattr_t *restrict attr); int pthread_mutex_destroy(pthread_mutex_t *mutex); int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);
代码使用:
#include <pthread.h> #include <string.h> #include <unistd.h> #include <iostream> using namespace std; /* 三个窗口一起卖1000张票 */ int ticket = 1000;//全局 /*互斥量:全局变量*/ pthread_mutex_t mutex; void *sellTicket(void *arg){ while(1){ /*加锁*/ pthread_mutex_lock(&mutex); if (ticket > 0) { usleep(200); cout << pthread_self() << "在卖第" << ticket << "张票" << endl; ticket--; }else{ /*解锁*/ pthread_mutex_unlock(&mutex); break; } //解锁 pthread_mutex_unlock(&mutex); } return NULL; } int main() { //初始化互斥量 pthread_mutex_init(&mutex, NULL); // 创建3个子线程,主线程只进行线程资源回收 pthread_t tid1, tid2, tid3; pthread_create(&tid1, NULL, sellTicket, NULL); pthread_create(&tid2, NULL, sellTicket, NULL); pthread_create(&tid3, NULL, sellTicket, NULL); //回收子线程资源 pthread_join(tid1, NULL); pthread_join(tid2, NULL); pthread_join(tid3, NULL); //退出主线程 pthread_exit(NULL); //释放互斥量 pthread_mutex_destroy(&mutex); return 0; }
死锁
什么是死锁
两个线程都在等待对方释放锁,在没有外力的作用下,这些线程会一直相互等待,就没办法继续运行,这种情况就是发生了死锁。
线程B锁住了资源2但是等待线程A释放资源1
线程A锁住资源1同时又在等待线程B释放资源2
死锁发生的4个条件
-
互斥条件
多个线程不能同时使用同一个资源
-
持有并等待条件
线程 在等待资源 1的同时并不会释放自己已经持有的资源 2。
-
不可剥夺条件
当线程已经持有了资源 ,在自己使用完之前不能被其他线程获取
-
环路等待条件
在死锁发生的时候,两个线程获取资源的顺序构成了环形链
死锁的例子
#include <pthread.h> #include <string.h> #include <unistd.h> #include <iostream> using namespace std; //初始化两个锁 pthread_mutex_t mutex1, mutex2; //线程函数 void* workA(void* arg){ pthread_mutex_lock(&mutex1); sleep(1); pthread_mutex_lock(&mutex2); cout << "workA,,,," << endl; pthread_mutex_unlock(&mutex2); pthread_mutex_unlock(&mutex1); return NULL; } void* workB(void* arg){ pthread_mutex_lock(&mutex2); sleep(1); pthread_mutex_lock(&mutex1); cout << "workB,,,," << endl; pthread_mutex_unlock(&mutex1); pthread_mutex_unlock(&mutex2); return NULL; } int main(){ //初始化锁 pthread_mutex_init(&mutex1,NULL); pthread_mutex_init(&mutex2, NULL); //创建两个线程 pthread_t tid1, tid2; pthread_create(&tid1, NULL,workA, NULL); pthread_create(&tid1, NULL,workB, NULL); //退出主线程 pthread_exit(NULL); //释放两把锁 pthread_mutex_destroy(&mutex1); pthread_mutex_destroy(&mutex2); return 0; }
程序卡死
读写锁
实际上多个线程同时读访问共享资源并不会导致问题。
在对数据的读写操作中,更多的是读操作,写操作较少
读写锁的特点:
-
如果有其它线程读数据,则允许其它线程执行读操作,但不允许写操作
-
如果有其它线程写数据,则其它线程都不允许读、写操作
-
写是独占的,写的优先级高
同样有读锁和写锁请求资源,优先满足写
//读写锁的类型
pthread_rwlock_t
//函数
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
读写锁也是一把锁,只是可以指定锁类型
#include <pthread.h> #include <string.h> #include <unistd.h> #include <iostream> using namespace std; /*8个线程,3个写,5个读*/ //共享数据 int nums = 100; // 初始化锁 pthread_rwlock_t rw; //线程函数 void* writeNum(void* arg){ while(1){ pthread_rwlock_wrlock(&rw); nums++; cout << pthread_self() << "is write,nums=" << nums << endl; pthread_rwlock_unlock(&rw); usleep(100); } return NULL; } void* readNum(void* arg){ while(1){ pthread_rwlock_rdlock(&rw); cout << pthread_self() << "is read,nums=" << nums << endl; pthread_rwlock_unlock(&rw); usleep(100); } return NULL; } int main(){ //初始化锁 pthread_rwlock_init(&rw,NULL); // 创建线程 pthread_t wtid[3],rtid[5]; for (int i = 0; i < 3;i++) pthread_create(&wtid[i], NULL, writeNum, NULL); for (int i = 0; i < 5;i++) pthread_create(&rtid[i], NULL, readNum, NULL); //设置线程分离 for (int i = 0; i < 3;i++) pthread_detach(wtid[i]); for (int i = 0; i < 5;i++) pthread_detach(rtid[i]); pthread_exit(NULL); //释放锁 pthread_rwlock_destroy(&rw); return 0; }
生产者消费者模型
什么是生产者-消费者模式
比如有两个进程A和B,它们共享一个固定大小的缓冲区,A进程产生数据放入缓冲区,B进程从缓冲区中取出数据进行计算,那么这里其实就是一个生产者和消费者的模式,A相当于生产者,B相当于消费者
该模式的对象:
- 生产者
- 消费者
- 缓冲区
为了达到生产者和消费者生产数据和消费数据之间的平衡,那么就需要一个缓冲区用来存储生产者生产的数据
生产者-消费者模式的特点
- 保证生产者不会在缓冲区满的时候继续向缓冲区放入数据,而消费者也不会在缓冲区空的时候,消耗数据
- 当缓冲区满的时候,生产者会进入休眠状态,当下次消费者开始消耗缓冲区的数据时,生产者才会被唤醒,开始往缓冲区中添加数据;
- 当缓冲区空的时候,消费者也会进入休眠状态,直到生产者往缓冲区中添加数据时才会被唤醒
#include <pthread.h> #include <string.h> #include <unistd.h> #include <iostream> using namespace std; //创建一个互斥量 pthread_mutex_t mutex; /*生产者消费者模型*/ struct node{ int num; node *next; node(int val,node* next=nullptr):num(val),next(next){} }; node *head = NULL; void *product(void *) { //不断创建新节点到链表中 while(1){ pthread_mutex_lock(&mutex); node *newNode = new node(rand() % 100, head); head = newNode; cout << pthread_self << " create node" << newNode->num << endl; pthread_mutex_unlock(&mutex); usleep(100); } return NULL; } void* custom(void*){ //不断取出头节点 while(1){ pthread_mutex_lock(&mutex); node *tmp = head; if(head){ head = head->next; cout << pthread_self() << " consume node" << tmp->num << endl; delete (tmp); pthread_mutex_unlock(&mutex); usleep(100); }else{ pthread_mutex_unlock(&mutex); } } return NULL; } int main(){ //创建互斥量 pthread_mutex_init(&mutex,NULL); // 创建5个生产者和5个消费者 pthread_t pTids[5], cTids[5]; for (int i = 0; i < 5;i++){ pthread_create(&pTids[i], NULL, product, NULL); pthread_create(&cTids[i], NULL, custom, NULL); } //设置分离 for (int i = 0; i < 5;i++){ pthread_detach(pTids[i]); pthread_detach(cTids[i]); } /*保证程序执行*/ while(1){ sleep(10); } //释放互斥量 pthread_mutex_destroy(&mutex); // 退出主线程 pthread_exit(NULL); return 0; }
有可能发生死锁,没有解决消费者没数据要通知生产者生产数据
条件变量
condition variables
不是锁,只是阻塞和解除阻塞
分为一直等待和限时等待
wait,timewait
可以唤醒1个或者多个线程
signal一个,broadcast多个
//条件变量的类型
//pthread_cond_t
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t*restrict attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t*restrict mutex, const struct timespec *restrict abstime);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
加了条件变量
#include <pthread.h> #include <string.h> #include <unistd.h> #include <iostream> using namespace std; pthread_mutex_t mutex; /*创建条件变量*/ pthread_cond_t cond; struct node { int num; node *next; node(int val,node* next=nullptr):num(val),next(next){} }; node *head = NULL; void *product(void *){ while(1){ pthread_mutex_lock(&mutex); node *newNode = new node(rand() % 100, head); head = newNode; cout << pthread_self << " create node" << newNode->num << endl; /*只要生产了,就通知消费者消费*/ pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); usleep(100); } return NULL; } void* custom(void*){ while(1){ pthread_mutex_lock(&mutex); node *tmp = head; if(head){ head = head->next; cout << pthread_self() << " consume node" << tmp->num << endl; delete (tmp); pthread_mutex_unlock(&mutex); usleep(100); }else{ /*没有数据,等待*/ //阻塞会释放互斥锁 pthread_cond_wait(&cond, &mutex); pthread_mutex_unlock(&mutex);//没数据也要释放,不然死锁 } } return NULL; } int main(){ pthread_mutex_init(&mutex,NULL); /*创建条件变量*/ pthread_cond_init(&cond, NULL); pthread_t pTids[5], cTids[5]; for (int i = 0; i < 5;i++){ pthread_create(&pTids[i], NULL, product, NULL); pthread_create(&cTids[i], NULL, custom, NULL); } for (int i = 0; i < 5;i++){ pthread_detach(pTids[i]); pthread_detach(cTids[i]); } while(1){ sleep(10); } pthread_mutex_destroy(&mutex); /*释放条件变量*/ pthread_cond_destroy(&cond); pthread_exit(NULL); return 0; }
信号量
信号量是一个计数器,用于控制多个进程对共享资源的访问。
信号量是一种特殊的变量,它可以被增加或减少,但对其的关键访问被保证是原子操作。
//信号量的类型
//sem_t
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
int sem_post(sem_t *sem);
int sem_getvalue(sem_t *sem, int *sval);
- 初始化:第二个变量区分进程和线程(0是线程)
- wait阻塞,把信号量值-1
- post,把信号量值+1,唤醒wait
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
#include <semaphore.h>
using namespace std;
pthread_mutex_t mutex;
//创建2个信号量
sem_t psem, csem;
struct node
{
int num;
node *next;
node(int val,node* next=nullptr):num(val),next(next){}
};
node *head = NULL;
void *product(void *){
while(1){
//生产者-1
sem_wait(&psem);
pthread_mutex_lock(&mutex);
node *newNode = new node(rand() % 100, head);
head = newNode;
cout << pthread_self << " create node" << newNode->num << endl;
pthread_mutex_unlock(&mutex);
//消费者+1
sem_post(&csem);
usleep(100);
}
return NULL;
}
void* custom(void*){
while(1){
//消费者-1
sem_wait(&csem);
pthread_mutex_lock(&mutex);
node *tmp = head;
head = head->next;
cout << pthread_self() << " consume node" << tmp->num << endl;
delete (tmp);
pthread_mutex_unlock(&mutex);
//生产者+1
sem_post(&psem);
}
return NULL;
}
int main(){
pthread_mutex_init(&mutex,NULL);
/*创建信号量*/
sem_init(&psem, 0, 8);
sem_init(&csem, 0, 0);
pthread_t pTids[5], cTids[5];
for (int i = 0; i < 5;i++){
pthread_create(&pTids[i], NULL, product, NULL);
pthread_create(&cTids[i], NULL, custom, NULL);
}
for (int i = 0; i < 5;i++){
pthread_detach(pTids[i]);
pthread_detach(cTids[i]);
}
while(1){
sleep(10);
}
pthread_mutex_destroy(&mutex);
/*释放信号量*/
sem_destroy(&psem);
sem_destroy(&csem);
pthread_exit(NULL);
return 0;
}