非阻塞队列有哪些(非阻塞队列有哪些组成)

非阻塞队列有哪些(非阻塞队列有哪些组成)svg xmlns http www w3 org 2000 svg style display none svg

大家好,我是讯享网,很高兴认识大家。



 <svg xmlns="http://www.w3.org/2000/svg" style="display: none;"> <path stroke-linecap="round" d="M5,0 0,2.5 5,5z" id="raphael-marker-block" style="-webkit-tap-highlight-color: rgba(0, 0, 0, 0);"></path> </svg> <p>相信不少小伙伴在面试过程中都被问到过共享内存。通过阅读本文您可以学习到一套比较完整的共享内存使用流程&#xff0c;以及共享内存在使用过程中常见问题的处理方法。</p> 

讯享网

本文介绍一个基于共享内存SHM实现的消息队列,通过该队列能够实现多进程数据通信。在工程项目中,也具有一定的参考价值。

具体内容如下:

  1. SHM循环队列需求说明
  2. SHM循环队列整体设计
  3. SHM Manager设计
  4. SHM Mutex设计
  5. Circular Queue结构设计
  6. Circular Queue入队设计
  7. Circular Queue出队设计
  8. SHM Queue设计
  9. 总结
  • 需要基于共享内存,实现多个进程(非父子关系)之间相互通信的消息队列。
  • 每个进程都可以写入消息和读出消息。
  • 共享内存的消息队列使用循环队列。

其中某个进程崩溃时,要尽可能不影响其他进程访问共享内存。
在这里插入图片描述
讯享网

为了实现上述的SHM消息队列,需要完成如下一些模块的设计:

  1. 一个管理SHM对象的模块,SHM Manager,用于创建/销毁或打开/关闭SHM对象。
  2. 一个维护循环队列的模块,Circular Queue。
  3. 一个保证多个进程安全访问共享内存的同步模块,SHM Mutex。
  4. 一个用户调用接口模块,SHM Queue。
    在这里插入图片描述

为了简化设计,我们将进程分为两类:

  1. 一个manager进程,用于创建和销毁SHM资源,同时也能读写消息。
  2. 多个Normal进程,不需要创建和销毁SHM资源,只需要打开和关闭由manager进程创建的SHM资源,并通过循环队列读写消息。

程序运行流程如下:
图片
在Manager进程启动,并创建SHM资源后,SHM进入可用状态。在此状态中,Normal进程才能启动并读写SHM循环消息队列。

这里我们使用POSIX SHM接口(使用其他SHM方式也可以)。

ShmManager接口定义如下:

讯享网

创建/打开SHM资源的代码如下:

 

上面的代码产生了一个共享内存的区域,共享内存的首地址为start_ptr_,大小为shm_size_。

对于manager进程来说,调用shm_open时,需要使用O_RDWR | O_CREAT参数,以确保能新建SHM对象。对于normal进程,需要使用O_RDWR参数,以确保打开已有的SHM对象。

normal进程不需要调用ftruncate函数,来修改共享内存大小。

另外,manager进程需要在调用shm_open前,调用shm_unlink函数,以防止之前由于进程崩溃等原因,残留的shm文件能够被清理掉。以增加程序稳定性。

销毁/关闭SHM资源的代码如下:

讯享网

对于normal进程,在退出过程中不需要删除shm文件。

对于SHM文件,如果在调用shm_unlink函数之前,其它进程已经打开了该文件的文件描述,那么此文件描述一直有效,即使某些进程通过shm_unlink函数删除了该文件。

GetMemroy接口实现如下:

 

这个接口用来将SHM内存区传递给Circular Queue。

进程间的同步方式由很多种,其中不依赖SHM的简单方式有文件锁和信号量。

由于我们这里是对共享内存中的循环队列访问进行同步,使用同步对象是SHM对象已经被创建出来了,因此,这里可以采用需要依赖SHM的同步方式。

这里使用linux的互斥锁。

SHM Mutex接口定义如下:

讯享网

初始化接口Init的实现如下:

 

将pthread_mutex_t对象放在SHM内存区,并且设置PTHREAD_PROCESS_SHARED属性,就可以使得pthread_mutex_t对象能够完成进程之间的同步。

另外,为了防止单一进程在获取到锁之后,还没来得及解锁就出现进程崩溃,从而导致其它进程无法获取到锁而卡死(stalled)。这里为pthread_mutex_t对象设置了PTHREAD_MUTEX_ROBUST属性。

设置PTHREAD_MUTEX_ROBUST属性后,如果某个进程崩溃了,其它进程调用pthread_mutex_lock函数时,会有一个进程获得锁并返回EOWNERDEAD,这时需要该进程调用pthread_mutex_consistent函数以修复该pthread_mutex_t对象。

加锁接口lock的实现如下:

讯享网

解锁接口unlock的实现如下:

 

释放接口Release的实现如下:

讯享网

SHMMutex释放时,不应该调用pthread_mutex_destory。

基于SHM Manager产生的共享内存的首地址start_ptr和大小shm_size,我们需要设计一个循环队列,队列的多进程同步使用前面的SHM Mutex。

队列的数据结构如下:

 

共享内存区和循环队列的内存布局如下:
在这里插入图片描述
单个队列消息的内存布局如下:
在这里插入图片描述

由于这里的Message的data长度可变,为了使得共享内存空间得到充分的利用,循环队列的对头指针head和队尾指针tail以字节为单位记录队列数据状态。

Circular Queue接口设计如下:

讯享网

Init接口实现如下:

 

由上层的SHM Queue模块调用,将SHM Manager模块产生的内存区域传递给Circular Queue模块。调用代码如下:

讯享网

这样使得SHM Manager和Circular Queue没有直接模块依赖关系。

队列写入可以分为如下几个步骤:

  1. 数据校验:校验数据有效性,数据不能过大或为空。
  2. 同步加锁:使用SHMMutex加锁,保证操作过程时互斥的。
  3. 队列溢出检查:判断当前数据写入是否会导致超过队列缓存大小。
  4. 数据拷贝:在之前队尾指针指向的内存位置,向后分配一块足够大的内存空间,并将数据拷贝到该内存空间中。
  5. 修改队尾指针:将新的队尾指针写入tail。
  6. 解锁:解锁后其它进程才能加锁操作队列。
    在这里插入图片描述

Push接口实现代码如下:

 

在上述代码中,最复杂的是队列溢出检查和数据拷贝。

队列溢出检查种有两种情况出现队列溢出:

  1. 第一种情况:写入数据之前,队列头head和队列尾old_tail之间的数据跨越了buffer的末尾,新数据Message在old_tail基础上向后移动到new_tail,当head<=new_tail时,队列溢出。
    在这里插入图片描述
  2. 第二种情况:写入数据之前,队列头head和队列尾old_tail之间的数据没有跨越buffer的尾部,新数据Message在old_tail基础上向后移动到buffer的尾部后,长度不够,需要将剩下的数据从buffer的开始位置继续向后写,新的队列尾为new_tail_offset,如果head<=new_tail_offset,则队列溢出。
    在这里插入图片描述

数据拷贝分为三种情况:

  1. 一般情况的数据拷贝:源数据直接拷贝到tail指定的内存区域。
    在这里插入图片描述
  2. 队列缓存区尾部内存太小,不足以存放源数据data。
    在这里插入图片描述
  3. 队列缓存区尾部内存太小,不足以存放没有data的Message结构体(length字段)。
    在这里插入图片描述

如果在加锁后解锁前崩溃,不会对其它进程访问队列产生负面影响。原因如下:

  1. 在加锁后到解锁前这期间,进行了数据拷贝和修改队尾指针两个操作。
  2. 如果在数据拷贝过程中进程崩溃了,由于没有修改队尾指针,即使拷贝数据只进行了一部分,其它进程完全不会感觉到曾经有进程进行过该操作。
  3. 如果在修改队尾指针指针的过程中崩溃了,原子变量操作保证该操作的原子性。
  4. 在SHM Mutex中,使用PTHREAD_MUTEX_ROBUST属性,并配合pthread_mutex_consistent函数,修复了由于崩溃导致的锁状态混乱问题。

队列的出队操作,是写入操作的逆过程,基本流程如下:

  1. 同步加锁
  2. 检查队列是否为空
  3. 数据读出拷贝
  4. 修改队头指针:将新的队尾指针写入head。
  5. 解锁:解锁后其它进程才能加锁操作队列。
    在这里插入图片描述

队列读取的代码如下:

讯享网

这里将head==tail作为队列为空的条件。

在稳定性方面,Pop和前面的Push一样,具有较好的稳定性。在完成修改队头指针之前,如果有进程崩溃,并不会对其他进程有任何影响。如果在完成修改队头指针之后,进程崩溃,其它进程会认为被读走了一个Message,即已出队列。

上面的流程中,数据读出拷贝是最复杂的。和数据写入时的拷贝相对应,也需要对Push中三种情况分别进行处理。这里就不专门分开分析了。

SHM Queue直接调用SHM Manager和SHM Mutex的接口。

接口定义如下:

 

接口实现如下:

讯享网

SHM Queue逻辑比较简单。

本文设计的SHM循环队列,主要分成三个功能子模块SHM Manager、SHM Mutex、Circular Queue,和一个用户调用接口模块SHM Queue:

  1. SHM Manager管理SHM资源的创建和回收。
  2. SHM Mutex封装pthread_mutex_t对象,实现跨进程同步,并具备一定的鲁棒性。
  3. Circular Queue是基于SHM内存块的循环队列,逻辑比较复杂。
  4. SHM Queue提供用户接口。

SHM在使用过程中,一般需要重点关注如下一些问题:

  1. 创建和销毁SHM对象,本文通过单个Manager进程完成该项工作,避免了多进程同时创建和销毁的产生的多进程竞争问题。在某些工程实践场景下,需要处理多进程竞争问题。甚至需要考虑SHM是残留资源,还是正常使用的资源。要处理得很好还是很有挑战的。
  2. 共享内存通信的同步,本文通过封装pthread_mutex_t实现。当然还有很多其它方式,如:文件锁、信号量、读写锁、原子变量、自旋锁、条件变量,甚至socket等。
  3. 管理SHM内存块,本文通过循环队列以字节为单位管理内存区域。这部分很灵活,由很多设计思路,需要根据项目场景来设计。比较常见的方法是将大内存区域划分为多个小的内存块,然后按块分配内存。
  4. 对于通信资源是否可用的判断,通常有两种思路,轮询和通知。轮询的方式,在需要读数据或写数据时,通过不断尝试来检查通信资源是否可读或可写;通知的方式,通过注册回调接口对象或函数,当数据可写或可读时,由底层通知用户。一般情况下,通知的方式会比轮询的方式更好,可以有效避免忙等。但如果时对实时性要求较高的场景,轮询则更合适。也并不是绝对的,需要根据实际情况来判断。对于本文暴露的接口来说,只能用轮询的方式,轮询调用Pop函数或者Push函数。
  5. 稳定性问题,多进程的稳定性问题比单进程要复杂得多。一般考虑如何让一部分进程崩溃后,不影响另外一部分进程的运行。本文的互斥锁封装和循环队列的设计都在尽力考虑这个问题。

关注微信公众号“程序员小阳”,相互交流更多软件开发技术。

小讯
上一篇 2025-05-13 23:30
下一篇 2025-05-28 09:33

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/181621.html