2025年多线程编程

多线程编程线程概述 线程模型 按照线程运行环境和调度者的身份 线程可以分为内核线程 和用户线程 内核线程 运行在内核空间 由内核来调度 当进程的一个内核线程获得 cpu 使用权时 他就加载并运行一个用户线程 用户线程 运行在用户空间 由线程库来调用 一个进程可以拥有 M 个内核线程和 N 个用户线程

大家好,我是讯享网,很高兴认识大家。

线程概述

线程模型

按照线程运行环境和调度者的身份,线程可以分为内核线程用户线程

内核线程运行在内核空间,由内核来调度。当进程的一个内核线程获得cpu使用权时,他就加载并运行一个用户线程。

用户线程运行在用户空间,由线程库来调用。

一个进程可以拥有M个内核线程和N个用户线程, M ≤ N M \le N MN。并且在一个sys里的所有进程中,系统线程和用户线程的比值都是固定的:完全在用户空间实现、完全由内核实现、双层调度。

完全在用户空间实现(和协程其实有点像):无需内核支持,线程库管理并执行所有的线程,比如线程的优先级、时间片等。线程利用longjump来切换线程,但是实际上内核依然将整个进程作为最小单位来调度,对外表现出相同的优先级。优点是创建和调度无需内核干预,因而速度很快。缺点是对于多处理器系统,一个进程的多个线程无法在多个处理机上运行。此外线程优先级只对同一个进程中的其他线程有效,对不同进程中的线程无效。
完全由内核实现将创建调度任务都交给了内核。线程库无需自行管理任务,优缺点与上面相反。M:N = 1:1
双层调度在这里插入图片描述
讯享网

Linux线程库

LinuxThreads和NPTL,都是按照1:1模式实现的(完全由内核实现)。Linux默认使用的是NPTL

创建和结束线程

定义在ptherad.h

pthread_create

#include<pthread.h> int pthread_create(pthread_t *thread, const pthread_attr_t* attr, void*(*start_routine)(void*), void* arg); typedef unsigned long int pthread_t; 

讯享网

thread参数是新线程的标识符,后续pthread_*系列函数通过它来引用新线程。其类型其实是unsigned long int
attr参数用来设定新线程的属性,设置为NULL表示使用默认属性。start_routinearg表示线程将运行的函数以及参数列表。
pthread_create调用成功返回0,失败返回错误码(和fork区别)。

pthread_exit

线程函数结束时最好调用如下函数保证安全干净地退出

讯享网#include<pthread.h> void pthread_exit(void* retval); 

函数通过retval向线程的回收者传递其退出信息,执行完后不会回到调用者且不会执行失败。

pthread_join

一个进程中的线程可以调用pthread_join函数来回收其他线程(若其他线程是可回收的),即等待其他线程结束。类似于回收进程的wait和waitpid机制。

int pthread_join(pthread_t thread, void** retval); 

thread是目标线程id,retval是目标线程的退出信息。该函数会阻塞至目标线程结束为止。成功返回0,失败:
在这里插入图片描述

pthread_cancel

讯享网int pthread_cancel(pthread_t thread); 

用来异常终止一个线程。thread是目标线程的标识符。

接收到取消请求的线程可以决定是否取消以及如何取消。

int pthread_setcancelstate(int state, int* oldstate); int pthread_setcanceltype(int type, int* oldtype); 

state设置取消状态(是否允许取消),oldstate记录原来的取消状态(和setnonblocking里返回oldopt一个意思)。
type设置取消类型(如何取消),oldtype记录原来的取消类型。
在这里插入图片描述

线程属性

讯享网#include<bits/pthreadtypes.h> #define __SIZEOF_PTHREAD_ATTR_T 36 typedef union{ 
    char __size[__SIZEOF_PTHREAD_ATTR_T]; long int __align; } pthread_attr_t; 

可见线程的属性都包含在一个字符数组中,线程库定义了一系列函数来操作pthread_attr_t类型的变量。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

POSIX信号量

Linux上有两组信号量API,多进程里的System V IPC信号量(semget…等),以及线程里的POSIX信号量。接口很接近但是不保证能互换。
POSIX信号量格式是sem_*

#include<semaphore.h> int sem_init(sem_t* sem, int pshared, unsigned int value); int sem_destroy(sem_t* sem); // P操作 int sem_wait(sem_t* sem); int sem_trywait(sem_t* sem); // V操作 int sem_post(sem_t* sem); 

sem指向操作的信号量。
在这里插入图片描述

互斥锁

基础API

讯享网#include<pthread.h> // init int pthread_mutex_init(pthread_mutex_t* mutex, const pthread_mutexattr_t* mutexattr); // init pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; int pthread_mutex_destroy(pthread_mutex_t* mutex); int pthread_mutex_lock(pthread_mutex_t* mutex); int pthread_mutex_trylock(pthread_mutex_t* mutex); int pthread_mutex_unlock(pthread_mutex_t* mutex); 

mutex指向要操作的目标互斥锁,互斥锁类型是struct pthread_mutex_t
pthread_mutex_init初始化一个互斥锁,mutexattr指定互斥锁的属性,NULL表示默认属性。
另外可以使用宏初始化,其实是默认将互斥锁的每个字段设置为0.
在这里插入图片描述

互斥锁属性

# define __SIZEOF_PTHREAD_MUTEX_T 40 typedef union { 
    struct __pthread_mutex_s __data; char __size[__SIZEOF_PTHREAD_MUTEX_T]; long int __align; } pthread_mutex_t; 

线程库提供了一系列函数来操作pthread_mutex_t

讯享网// 初始化互斥锁对象 int pthread_mutexattr_init(pthread_mutexattr__t* attr); // 销毁互斥锁属性对象 int pthread_mutexattr_destroy(pthread_mutexattr_t* attr); // 获取和设置互斥锁的pshared属性 int pthread_mutexattr_getpshared(const pthread_mutexattr__t* attr, int* pshared); int pthread_mutexattr_setpshared(const pthread_mutexattr_t* attr, int* pshared); // 获取和设置互斥锁的type属性 int pthread_mutexattr_gettype(const pthread_mutexattr__t* attr, int* type); int pthread_mutexattr_settype(const pthread_mutexattr_t* attr, int* type); 

pshared指定是否允许跨进程共享互斥锁

  • PSHARED_PROCESS_SHARED 互斥锁可以被跨进程共享
  • PTHREAD_PROCESS_PRIVATE 互斥锁只能被和锁的初始化线程隶属的同一个进程的线程共享

type指定互斥锁的类型
在这里插入图片描述

Example

使用pthread_mutex_tpthread_t求和(采取累加方式),来测试多线程加速计算。
示例了如何编写多线程程序以及如何传递参数。

单线程:test1.cpp

int main(){ 
    unsigned long long int ans = 0; for(unsigned long long i = 0; i < ; i++){ 
    ans += i; } cout<<ans<<endl; } 

运行台输入 time ./test1:

讯享网real 0m0.017s user 0m0.017s sys 0m0.001s 

多线程计算:test2.cpp

#include<time.h> #include<iostream> #include<unistd.h> #include<pthread.h> #include<sys/time.h> using std::cout; using std::endl; // 使用struct往线程函数里传值 struct parameter{ 
    int begin, end; parameter(int a, int b) : begin(a), end(b) { 
   } }; // 临界值和对应的锁 unsigned long long int ans = 0; pthread_mutex_t ans_mutex; void* func(void* argc){ 
    parameter* val = (parameter*)argc; unsigned long long int v = 0; for(unsigned long long i = val->begin; i < val->end; i++){ 
    v += i; } pthread_mutex_lock(&ans_mutex); ans += v; pthread_mutex_unlock(&ans_mutex); pthread_exit(NULL); } int main(){ 
    // 初始化锁 pthread_mutex_init(&ans_mutex, NULL); pthread_t id1; pthread_t id2; pthread_create(&id1, NULL, func, new parameter(0, )); pthread_create(&id2, NULL, func, new parameter(, )); pthread_join(id1, NULL); pthread_join(id2, NULL); cout<<ans<<endl; pthread_mutex_destroy(&ans_mutex); } 

注意:原生Linux函数里并不带pthread,所以要在编译指令里加 -lpthread
g++ test2.cpp -o test2 -lpthread
测试:
time ./test2

讯享网real 0m0.010s user 0m0.015s sys 0m0.000s 

条件变量

互斥锁一般用于同步线程对共享数据的访问,条件变量一般用于在线程间同步共享变量的值。条件变量是线程间通知机制,当某个共享数据的值达到某个范围时,唤醒等待这个共享数据的线程

#include<pthread.h> // 初始化条件变量 int pthread_cond_init(pthread_cond__t* cond, const pthread_condattr_t* cond_attr); pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 销毁条件变量,如果销毁一个正在被等待的条件变量会返回EBUSY int pthread_cond_destroy(pthread_cond_t* cond); // 以广播的方式唤醒所有等待目标条件变量的线程 int pthread_cond_broadcast(pthread_cond_t* cond); // 唤醒一个等待目标条件变量的线程 int pthread_cond_signal(pthread_cond_t* cond); // 等待目标条件变量 int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex); 

如何唤醒特定的线程:
在这里插入图片描述
pthread_cond_wait:等待目标条件变量,mutex参数是用于保护条件变量的互斥锁,以确保pthread_cond_wait操作的原子性。在调用pthread_cond_wait前,必须确保互斥锁mutex已经加锁,否则会导致不可预期的后果。
pthread_cond_wait函数执行时,首先把调用线程放进条件变量的等待队列中,然后将互斥锁mutex解锁。可见,在pthread_cond_wait开始执行到其调用线程被放入条件变量的等待队列间,pthread_cond_broadcast和pthread_cond_signal等函数不会修改条件变量。
所以,pthread_cond_wait函数不会错过目标条件变量的任何变化,pthread_cond_wait成功返回时,mutex将被再次锁上。

封装成类

讯享网#ifndef LOCKER_H #define LOCKER_H #include<exception> #include<pthread.h> #include<semaphore.h> class sem { 
    private: sem_t m_sem; public: sem(int v) { 
    if (sem_init(&m_sem, 0, v) != 0) { 
    throw std::exception(); } } sem(int v = 0) { 
    if (sem_init(&m_sem, 0, v) != 0) { 
    throw std::exception(); } } ~sem() { 
    sem_destroy(&m_sem); } bool wait() { 
    return sem_wait(&m_sem) == 0; } bool post() { 
    return sem_post(&m_sem) == 0; } }; class locker { 
    private: pthread_mutex_t mutex; public: locker() { 
    if (pthread_mutex_init(&mutex, NULL) != 0) { 
    throw std::exception(); } } ~locker() { 
    pthread_mutex_destroy(&mutex); } bool lock() { 
    return pthread_mutex_lock(&mutex) == 0; } bool unlock() { 
    return pthread_mutex_unlock(&mutex) == 0; } }; class cond { 
    private: pthread_cond_t m_cond; pthread_mutex_t cond_mutex; public: cond() { 
    if (pthread_mutex_init(&cond_mutex) != 0) { 
    throw std::exception(); } if (pthread_cond_init(&m_cond, NULL) != 0) { 
    throw std::exception(); } } ~cond() { 
    pthread_cond_destroy(&m_cond); pthread_mutex_destroy(&cond_mutex); } bool wait() { 
    int ret = 0; pthread_mutex_lock(&cond_mutex); ret = pthread_cond_wait(&m_cond, &cond_mutex); pthread_mutex_unlock(&cond_mutex); return ret == 0; } bool signal() { 
    return pthread_cond_signal(&m_cond) == 0; } }; #endif // !LOCKER_H 

多线程环境

可重入函数

如果一个函数能被多个线程同时被调用且不发生竞态条件,就称之为线程安全的,或者它是可重入函数
Linux大多库函数都是可重入的,不可重入的主要是因为内部使用了静态变量,但是一般都有可重入版本:在函数名尾部加_r
在多线程程序中调用一定要使用可重入版本。

线程和进程

Problem:在多线程程序的某个线程调用fork函数,那么新创建的子进程是否会自动创建和父进程相同数量的线程?
子进程只拥有一个执行线程,是调用fork的哪个线程的完整赋值。

void* func(void *argc){ 
    int v = (*((int*)argc)); cout<<v<<endl; pthread_exit(NULL); } int main(){ 
    for(int i = 0; i < 5; i++){ 
    pthread_t id; int v = i; pthread_create(&id, NULL, func, &v); } pid_t id = fork(); if(id == 0){ 
    cout<<"Child Over"<<endl; }else{ 
    cout<<"P Over"<<endl; } } /* 0 1 3 3 4 P Over Child Over */ 
讯享网pthread_mutex_t mutex; void* func(void *argc){ 
    cout<<"In thread, wanna lock the locker"<<endl; pthread_mutex_lock(&mutex); cout<<"In thread, got the locker"<<endl; sleep(5); pthread_mutex_unlock(&mutex); cout<<"In thread, give up the locker"<<endl; pthread_exit(NULL); } int main(){ 
    pthread_mutex_init(&mutex, NULL); pthread_t id; pthread_create(&id, NULL, func, NULL); sleep(1); pid_t pid = fork(); if(pid == 0){ 
    cout<<"Child wanna get the locker"<<endl; pthread_mutex_lock(&mutex); cout<<"Child wanna got the locker"<<endl; pthread_mutex_unlock(&mutex); exit(0); }else{ 
    waitpid(-1, NULL, 0); } pthread_join(id, NULL); pthread_mutex_destroy(&mutex); return 0; } /* In thread, wanna lock the locker In thread, got the locker Child wanna get the locker In thread, give up the locker */ 

pthread提供了专门的函数pthread_atfork,确保fork调用后父进程和子进程都有一个清楚的锁状态

int pthread_atfork(void(*prepare)(void), void(*parent)(void), boid (*child)(void)); 

该函数建立三个fork句柄来清理互斥锁的状态。
prepare句柄将在fork调用创建出子进程之前被执行,它可以锁住所有父进程中的互斥锁。
parent句柄则是在fork调用创建出子进程之后,fork返回之前在父进程中执行。作用是释放所有在prepare句柄中被锁住的互斥锁。
child句柄是在fork返回前,在子进程中执行,用于释放所有在prepare中被锁住的互斥锁。

所以在之前的代码fork()前加上:

讯享网void pre(){ 
    pthread_mutex_lock(&mutex); } void infork(){ 
    pthread_mutex_unlock(&mutex); } pthread_atfork(pre, infork, infork); pid_t pid = fork(); // ...... 

线程和信号

每个线程都可以独立设置信号掩码。

#include<pthread.h> #include<signal.h> int pthread_sigmask(int how, const sigset_t* newmask, sigset_t* oldmask); 

newmasl参数指定新的信号掩码,oldmask输出保存以前的旧的信号掩码(与set_nonblocking里面return oldopt一个意思),how参数指定设置进程信号掩码的方式

进程中的所有线程都共享该进程的信号,所以线程库根据线程掩码决定将信号发送给具体的哪个线程。
所以如果我们在每个子线程中都单独设置信号掩码,就很容易导致逻辑错误。
所有线程共享信号处理函数,所以在一个线程中设置了信号处理函数,其他线程的对应信号处理函数也会被覆盖。所以我们应该单独设置一个线程来处理所有的信号:

  • ①在主线程创建出其他子线程前就调用pthread_sigmask设置好信号掩码,所有新的子线程都将自动继承这个信号掩码。那么之后的所有子线程都不会响应被屏蔽的信号了。
  • ②在线程中调用如下函数并等待信号并处理之:
讯享网int sigwait(const sigset_t* set, int* sig); 

set参数指定等待的信号集,我们可以将其指定为第一步中创建的信号掩码,那么就只有这个线程会响应对应的信号了。sig这个整数参数存储该函数返回的信号值。当sigwait正确返回,就可以对接收到的信号进行处理。
当我们使用sigwait时,就不应该再为信号设置信号处理函数了,因为当程序接收到信号时,二者中只有一个能起作用

example

inline void handle_err_en(int en, const char* msg) { 
    errno = en; perror(msg); exit(EXIT_FAILURE); } void* sig_thread(void* arg) { 
    // 仅在该线程处理如下信号 sigset_t* sigs = (sigset_t*)arg; int s, sig; for (;;) { 
    s = sigwait(sigs, &sig); if (s != 0) { 
    handle_err_en(s, "sigwait"); } printf("Signal handling thread got signal %d", sig); } } int main() { 
    pthread_t th; sigset_t sigs; int s; sigemptyset(&sigs); sigaddset(&sigs, SIGQUIT); sigaddset(&sigs, SIGUSR1); // 其他线程屏蔽该信号集 s = pthread_sigmask(SIG_BLOCK, &sigs, NULL); if (s != 0) { 
    handle_err_en(s, "pthread_sigmask"); } s = pthread_create(&th, NULL, sig_thread, &sigs); if (s != 0) { 
    handle_err_en(s, "pthread_create"); } return 0; } 

pthread_kill

发送信号给指定线程
int pthread_kill(pthread_t th, int sig);
sig指定待发送的信号,如果为0,则不发送信号,但是仍然执行错误检测,可以利用此来完成线程是否存在的检测。

讯享网#include<pthread.h> #include<signal.h> #include<stdio.h> #include<errno.h> #include<unistd.h> void* func(void* arg){ 
    sigset_t sigs; sigaddset(&sigs, SIGALRM); int i = 0; bool run = true; while(run){ 
    printf("wait for signal\n"); int sig; sigwait(&sigs, &sig); if(sig == SIGALRM){ 
    if(i++ == 1) run = false; printf("Recv sig %d\n", sig); } } pthread_exit(NULL); } int main(){ 
    int ret = -1; pthread_t th1; pthread_create(&th1, NULL, func, NULL); sleep(5); for(int i = 0; i < 4; i++){ 
    // 发送两次后,子线程会退出,后续会失败 sleep(3); ret = pthread_kill(th1, SIGALRM); if(ret != 0) printf("Error code %d\n", errno); } pthread_join(th1, NULL); printf("Over\n"); return 0; } /* wait for signal Recv sig 14 wait for signal Recv sig 14 Error code 0 Error code 0 Over */ 
小讯
上一篇 2025-01-26 17:21
下一篇 2025-02-22 09:50

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/47339.html