C++:pthread的使用
pthread 简介
pthread 是 POSIX 线程(POSIX Threads)的简称,它是 POSIX 标准中定义的线程接口规范。pthread 库提供了一系列函数,用于创建、销毁、同步和管理线程。在类 Unix 系统(如 Linux、macOS)中,pthread 库被广泛使用,是实现多线程编程的重要工具。
基本使用方法
线程创建
使用 pthread_create 函数来创建一个新线程。其函数原型如下:
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
thread:指向 pthread_t 类型变量的指针,用于存储新创建线程的标识符。
attr:线程属性,通常设为 NULL,使用默认属性。
start_routine:指向线程函数的指针,线程启动后将执行该函数。
arg:传递给线程函数的参数。
#include <stdio.h>
#include <pthread.h>
// 线程函数
void* thread_function(void* arg) {
printf("This is a thread, argument: %d\n", *(int*)arg);
return NULL;
}
int main() {
pthread_t thread;
int arg = 10;
// 创建线程
if (pthread_create(&thread, NULL, thread_function, &arg)!= 0) {
perror("Failed to create thread");
return 1;
}
printf("Thread created successfully\n");
// 等待线程结束
if (pthread_join(thread, NULL)!= 0) {
perror("Failed to join thread");
return 1;
}
printf("Thread joined successfully\n");
return 0;
}
线程等待
使用 pthread_join 函数等待一个线程结束。函数原型:
int pthread_join(pthread_t thread, void **retval);
thread:要等待的线程标识符。
retval:用于接收线程函数的返回值,通常设为 NULL。
线程销毁
可以使用 pthread_cancel 函数来取消一个线程。函数原型:
int pthread_cancel(pthread_t thread);
thread:要取消的线程标识符。
注意,被取消的线程需要有相应的清理机制,否则可能会导致资源泄漏。
传参实例
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
int j = 2;
void* pthread_fun(void *arg)
{
while(j--)
{
cout << " in pthread_task" << endl;
cout << *(int*)arg << endl;
sleep(1);
}
// 线程返回一个整数值
int* result = new int(42);
return static_cast<void*>(result);//pthread_exit(reinterpret_cast<void*>(result));
}
int main()
{
int ret = 0;
pthread_t tid = 0;
int argsend = 99;
int i = 3;
ret = pthread_create(&tid, NULL, pthread_fun, &argsend);
if(ret != 0)
{
cout << " pthread_create error" << endl;
return -1;
}
while(i--)
{
cout << "pthread _create success" << endl;
sleep(2);
//等待回收资源
}
void* ret1;
// 阻塞式等待线程结束,并获取返回值
if (pthread_join(tid, &ret1) != 0) {
std::cerr << "Failed to join thread." << std::endl;
return 1;
}
// 处理线程的返回值
int* result = static_cast<int*>(ret1);
std::cout << "Thread returned: " << *result << std::endl;
// 释放动态分配的内存
delete result;
return 0;
}
线程同步
多线程编程中,线程同步是至关重要的。pthread 提供了多种同步机制,如互斥锁、条件变量、信号量等。
互斥锁
互斥锁用于保证同一时刻只有一个线程能够访问共享资源。使用 pthread_mutex_t 类型来定义互斥锁,相关函数有:
pthread_mutex_init:初始化互斥锁。
pthread_mutex_lock:加锁,若锁已被占用则线程阻塞。
pthread_mutex_unlock:解锁。
pthread_mutex_destroy:销毁互斥锁。
#include <stdio.h>
#include <pthread.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int shared_variable = 0;
void* increment(void* arg) {
for (int i = 0; i < 1000; ++i) {
pthread_mutex_lock(&mutex);
shared_variable++;
pthread_mutex_unlock(&mutex);
}
return NULL;
}
int main() {
pthread_t thread1, thread2;
// 创建两个线程
if (pthread_create(&thread1, NULL, increment, NULL)!= 0 ||
pthread_create(&thread2, NULL, increment, NULL)!= 0) {
perror("Failed to create thread");
return 1;
}
// 等待线程结束
if (pthread_join(thread1, NULL)!= 0 ||
pthread_join(thread2, NULL)!= 0) {
perror("Failed to join thread");
return 1;
}
printf("Shared variable value: %d\n", shared_variable);
// 销毁互斥锁
pthread_mutex_destroy(&mutex);
return 0;
}
thread_function1线程在条件ready为false时等待,thread_function2线程在工作一段时间后将ready设置为true,并通过pthread_cond_signal唤醒thread_function1线程。
读写锁
#include <iostream>
#include <pthread.h>
#include <unistd.h>
// 定义读写锁
pthread_rwlock_t rwlock;
// 共享数据
int sharedData = 0;
// 读线程函数
void* reader(void* arg) {
int id = *(int*)arg;
while (true) {
// 加读锁
pthread_rwlock_rdlock(&rwlock);
std::cout << "Reader " << id << " reads sharedData: " << sharedData << std::endl;
// 释放读锁
pthread_rwlock_unlock(&rwlock);
// 模拟一些工作
sleep(1);
}
return nullptr;
}
// 写线程函数
void* writer(void* arg) {
int id = *(int*)arg;
while (true) {
// 加写锁
pthread_rwlock_wrlock(&rwlock);
sharedData++;
std::cout << "Writer " << id << " writes sharedData: " << sharedData << std::endl;
// 释放写锁
pthread_rwlock_unlock(&rwlock);
// 模拟一些工作
sleep(2);
}
return nullptr;
}
int main() {
// 初始化读写锁
pthread_rwlock_init(&rwlock, nullptr);
// 创建读线程和写线程
pthread_t readers[3];
pthread_t writers[2];
int readerIds[3] = {1, 2, 3};
int writerIds[2] = {1, 2};
for (int i = 0; i < 3; ++i) {
pthread_create(&readers[i], nullptr, reader, &readerIds[i]);
}
for (int i = 0; i < 2; ++i) {
pthread_create(&writers[i], nullptr, writer, &writerIds[i]);
}
// 等待线程结束(这里实际上不会结束,因为线程是无限循环)
for (int i = 0; i < 3; ++i) {
pthread_join(readers[i], nullptr);
}
for (int i = 0; i < 2; ++i) {
pthread_join(writers[i], nullptr);
}
// 销毁读写锁
pthread_rwlock_destroy(&rwlock);
return 0;
}
读写锁的定义和初始化:
pthread_rwlock_t rwlock;:定义一个读写锁变量 rwlock。
pthread_rwlock_init(&rwlock, nullptr);:在 main 函数中初始化读写锁,第二个参数为 nullptr 表示使用默认的属性。
共享数据:
int sharedData = 0;:定义一个共享的整数变量 sharedData,多个线程将对其进行读写操作。
读线程函数 reader:
pthread_rwlock_rdlock(&rwlock);:调用 pthread_rwlock_rdlock 函数加读锁,允许多个读线程同时持有读锁。
读取共享数据 sharedData 并输出。
pthread_rwlock_unlock(&rwlock);:调用 pthread_rwlock_unlock 函数释放读锁。
写线程函数 writer:
pthread_rwlock_wrlock(&rwlock);:调用 pthread_rwlock_wrlock 函数加写锁,写锁是独占的,同一时间只能有一个线程持有写锁。
对共享数据 sharedData 进行加 1 操作并输出。
pthread_rwlock_unlock(&rwlock);:调用 pthread_rwlock_unlock 函数释放写锁。
线程的创建和等待:
使用 pthread_create 函数创建 3 个读线程和 2 个写线程。
使用 pthread_join 函数等待线程结束,但由于线程是无限循环,实际上不会结束。
读写锁的销毁:
pthread_rwlock_destroy(&rwlock);:在程序结束时销毁读写锁,释放相关资源。
非阻塞式互斥锁
在多线程编程中,常规的互斥锁在获取锁时,如果锁已被占用,线程会进入阻塞状态,一直等待锁的释放。而非阻塞式互斥锁则不同,当线程尝试获取锁时,如果锁当前不可用,它不会阻塞线程,而是立即返回一个表示获取锁失败的状态,线程可以根据这个状态决定后续操作。这种特性在一些对响应时间要求极高,且线程不能长时间等待的场景中非常有用。
在 C++ 中,使用 pthread 库实现非阻塞式互斥锁时,可以利用pthread_mutex_trylock函数
#include <iostream>
#include <pthread.h>
#include <unistd.h>
// 定义互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// 线程函数
void* threadFunction(void* arg) {
// 尝试获取非阻塞式互斥锁
if (pthread_mutex_trylock(&mutex) == 0) {
std::cout << "Thread got the non - blocking mutex." << std::endl;
sleep(1); // 模拟一些工作
pthread_mutex_unlock(&mutex);
std::cout << "Thread released the non - blocking mutex." << std::endl;
} else {
std::cout << "Thread couldn't get the non - blocking mutex." << std::endl;
}
return nullptr;
}
int main() {
pthread_t thread;
// 初始化互斥锁
if (pthread_mutex_init(&mutex, nullptr)!= 0) {
std::cerr << "Failed to initialize mutex" << std::endl;
return 1;
}
// 主线程先获取锁
if (pthread_mutex_lock(&mutex)!= 0) {
std::cerr << "Failed to lock mutex in main thread" << std::endl;
return 1;
}
// 创建线程
if (pthread_create(&thread, nullptr, threadFunction, nullptr)!= 0) {
std::cerr << "Failed to create thread" << std::endl;
pthread_mutex_unlock(&mutex);
return 1;
}
sleep(2); // 主线程保持锁一段时间
pthread_mutex_unlock(&mutex); // 主线程释放锁
// 等待线程结束
if (pthread_join(thread, nullptr)!= 0) {
std::cerr << "Failed to join thread" << std::endl;
return 1;
}
// 销毁互斥锁
if (pthread_mutex_destroy(&mutex)!= 0) {
std::cerr << "Failed to destroy mutex" << std::endl;
return 1;
}
return 0;
}
threadFunction线程尝试使用pthread_mutex_trylock获取互斥锁,如果获取成功则执行相关操作并释放锁,否则输出获取失败的信息。主线程先获取锁并保持一段时间,模拟锁被占用的情况,展示非阻塞式互斥锁的工作机制。
非阻塞式读写锁锁
读写锁区分了读操作和写操作,允许多个线程同时进行读操作,但只允许一个线程进行写操作。非阻塞式读写锁在获取读锁或写锁时,如果锁被占用,不会阻塞线程,而是立即返回获取结果。
在 C++ 中,使用 pthread 库实现非阻塞式读写锁,读锁可以通过pthread_rwlock_tryrdlock函数尝试获取,写锁可以通过pthread_rwlock_trywrlock函数尝试获取。
#include <iostream>
#include <pthread.h>
#include <unistd.h>
// 定义读写锁
pthread_rwlock_t rwLock = PTHREAD_RWLOCK_INITIALIZER;
int sharedData = 0;
// 读线程函数
void* readThreadFunction(void* arg) {
// 尝试获取非阻塞式读锁
if (pthread_rwlock_tryrdlock(&rwLock) == 0) {
std::cout << "Read thread got the non - blocking read lock. Data: " << sharedData << std::endl;
sleep(1); // 模拟读操作
pthread_rwlock_unlock(&rwLock);
std::cout << "Read thread released the non - blocking read lock." << std::endl;
} else {
std::cout << "Read thread couldn't get the non - blocking read lock." << std::endl;
}
return nullptr;
}
// 写线程函数
void* writeThreadFunction(void* arg) {
// 尝试获取非阻塞式写锁
if (pthread_rwlock_trywrlock(&rwLock) == 0) {
sharedData++;
std::cout << "Write thread got the non - blocking write lock. Data updated: " << sharedData << std::endl;
sleep(1); // 模拟写操作
pthread_rwlock_unlock(&rwLock);
std::cout << "Write thread released the non - blocking write lock." << std::endl;
} else {
std::cout << "Write thread couldn't get the non - blocking write lock." << std::endl;
}
return nullptr;
}
int main() {
pthread_t readThread, writeThread;
// 初始化读写锁
if (pthread_rwlock_init(&rwLock, nullptr)!= 0) {
std::cerr << "Failed to initialize rwlock" << std::endl;
return 1;
}
// 创建读线程
if (pthread_create(&readThread, nullptr, readThreadFunction, nullptr)!= 0) {
std::cerr << "Failed to create read thread" << std::endl;
pthread_rwlock_destroy(&rwLock);
return 1;
}
// 创建写线程
if (pthread_create(&writeThread, nullptr, writeThreadFunction, nullptr)!= 0) {
std::cerr << "Failed to create write thread" << std::endl;
pthread_cancel(readThread);
pthread_rwlock_destroy(&rwLock);
return 1;
}
// 等待读线程结束
if (pthread_join(readThread, nullptr)!= 0) {
std::cerr << "Failed to join read thread" << std::endl;
return 1;
}
// 等待写线程结束
if (pthread_join(writeThread, nullptr)!= 0) {
std::cerr << "Failed to join write thread" << std::endl;
return 1;
}
// 销毁读写锁
if (pthread_rwlock_destroy(&rwLock)!= 0) {
std::cerr << "Failed to destroy rwlock" << std::endl;
return 1;
}
return 0;
}
readThreadFunction线程尝试使用pthread_rwlock_tryrdlock获取非阻塞式读锁,writeThreadFunction线程尝试使用pthread_rwlock_trywrlock获取非阻塞式写锁,根据获取结果进行相应操作,展示了非阻塞式读写锁在 pthread 中的使用方法。
条件变量
条件变量用于线程间的同步,一个线程可以在某个条件不满足时等待,另一个线程在条件满足时唤醒等待的线程。使用 pthread_cond_t 类型定义条件变量,相关函数有:
pthread_cond_init:初始化条件变量。
pthread_cond_wait:等待条件变量,调用时会自动释放互斥锁,当被唤醒时会重新获取互斥锁。
pthread_cond_signal:唤醒一个等待的线程。
pthread_cond_broadcast:唤醒所有等待的线程。
pthread_cond_destroy:销毁条件变量。
#include <iostream>
#include <pthread.h>
#include <unistd.h>
// 互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// 条件变量
pthread_cond_t cond_var = PTHREAD_COND_INITIALIZER;
// 共享数据
bool ready = false;
// 线程函数1
void* thread_function1(void* arg) {
// 加锁
pthread_mutex_lock(&mutex);
while (!ready) {
// 等待条件变量,此时会自动释放互斥锁
pthread_cond_wait(&cond_var, &mutex);
}
std::cout << "Thread 1: Condition is met, continuing execution." << std::endl;
// 解锁
pthread_mutex_unlock(&mutex);
return nullptr;
}
// 线程函数2
void* thread_function2(void* arg) {
sleep(2); // 模拟一些工作
// 加锁
pthread_mutex_lock(&mutex);
ready = true;
std::cout << "Thread 2: Setting condition to true and signaling." << std::endl;
// 唤醒一个等待的线程
pthread_cond_signal(&cond_var);
// 解锁
pthread_mutex_unlock(&mutex);
return nullptr;
}
int main() {
pthread_t thread1, thread2;
// 创建线程1
if (pthread_create(&thread1, nullptr, thread_function1, nullptr)!= 0) {
std::cerr << "Failed to create thread 1" << std::endl;
return 1;
}
// 创建线程2
if (pthread_create(&thread2, nullptr, thread_function2, nullptr)!= 0) {
std::cerr << "Failed to create thread 2" << std::endl;
pthread_cancel(thread1);
return 1;
}
// 等待线程1结束
if (pthread_join(thread1, nullptr)!= 0) {
std::cerr << "Failed to join thread 1" << std::endl;
}
// 等待线程2结束
if (pthread_join(thread2, nullptr)!= 0) {
std::cerr << "Failed to join thread 2" << std::endl;
}
// 销毁互斥锁
pthread_mutex_destroy(&mutex);
// 销毁条件变量
pthread_cond_destroy(&cond_var);
return 0;
}
信号量
信号量用于控制对共享资源的访问数量。使用 sem_t 类型定义信号量,相关函数有:
sem_init:初始化信号量。
sem_wait:等待信号量,若信号量值为 0 则线程阻塞。
sem_post:释放信号量,使信号量值加 1。
sem_destroy:销毁信号量。
#include <iostream>
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>
// 定义信号量,初始值设为3,表示最多允许3个线程同时访问共享资源
sem_t semaphore;
// 共享资源
int sharedResource = 0;
// 线程函数
void* threadFunction(void* arg) {
// 等待信号量,获取访问共享资源的许可
sem_wait(&semaphore);
std::cout << "Thread " << *(int*)arg << " is accessing the shared resource." << std::endl;
// 模拟对共享资源的操作
sharedResource++;
sleep(1);
std::cout << "Thread " << *(int*)arg << " is leaving the shared resource." << std::endl;
// 释放信号量,允许其他线程访问共享资源
sem_post(&semaphore);
return nullptr;
}
int main() {
// 初始化信号量,初始值为3
if (sem_init(&semaphore, 0, 3)!= 0) {
std::cerr << "Failed to initialize semaphore" << std::endl;
return 1;
}
pthread_t threads[5];
int threadIds[5] = {1, 2, 3, 4, 5};
// 创建5个线程
for (int i = 0; i < 5; ++i) {
if (pthread_create(&threads[i], nullptr, threadFunction, &threadIds[i])!= 0) {
std::cerr << "Failed to create thread " << i << std::endl;
// 清理已创建的线程
for (int j = 0; j < i; ++j) {
pthread_cancel(threads[j]);
}
// 销毁信号量
sem_destroy(&semaphore);
return 1;
}
}
// 等待所有线程结束
for (int i = 0; i < 5; ++i) {
if (pthread_join(threads[i], nullptr)!= 0) {
std::cerr << "Failed to join thread " << i << std::endl;
}
}
// 销毁信号量
sem_destroy(&semaphore);
std::cout << "Final value of shared resource: " << sharedResource << std::endl;
return 0;
}
定义了一个信号量,初始值为 3,这意味着最多允许 3 个线程同时访问共享资源。每个线程在访问共享资源前,先通过sem_wait等待信号量,获取访问许可;访问结束后,通过sem_post释放信号量,允许其他线程访问。