2025年阻塞队列和普通队列(阻塞队列和普通队列哪个好)

阻塞队列和普通队列(阻塞队列和普通队列哪个好)以下的相关的示例代码基于当前使用 pika 的版本是 1 2 最新版本的 以下的相关的示例代码基于当前使用 pika 的版本是 1 2 最新版本的 以下的相关的示例代码基于当前使用 pika 的版本是 1 2 最新版本的 不同的版本的之间客户端的差异挺大的 以前的旧的版本很多的函数和传参再新版里面几乎是木有了 如何保证我们的消息正确的发生到了 MQ 其实这个消息可靠性的问题

大家好,我是讯享网,很高兴认识大家。



以下的相关的示例代码基于当前使用pika的版本是1.2最新版本的!

以下的相关的示例代码基于当前使用pika的版本是1.2最新版本的!

以下的相关的示例代码基于当前使用pika的版本是1.2最新版本的!

不同的版本的之间客户端的差异挺大的!以前的旧的版本很多的函数和传参再新版里面几乎是木有了!

如何保证我们的消息正确的发生到了MQ?

其实这个消息可靠性的问题,在业务处理上某些业务场景还是非常的重要,对于强依赖性的消息的话,我们必须要确保我们的消息正确的投递到我们的MQ上,甚至于需要确保我们的消息被正确的进行消费了,避免重复消费等问题,甚至有些时候我们还不可规避的需要处理消息消费的幂等性的问题.

PS:消息消费的幂等性其实是指我的消息就算多次的消息同一个消息,我的到的结果都是一样的!

所以关于消息可靠性的问题,其实我们的可以分为两侧去分析:

之前的几个示例的中,其实关注消息投递可靠性是没有涉及到,我们的前几个示例只管消息的发送出去,都不在乎它是不是真的意见正确的发送到我们的MQ上面。这一小篇主要来思考一些这些问题。

首先一个消息正确的投递到MD 需要经过的步骤有:

消息投递的过程:消息—》交换机—-》消息队列—》消息持久化存贮

上面的过程中其实每一步都涉及到我们的MQ消息处理可靠性的确认.如何确保我们的消息不丢失,或如何监测到我们的消息投递失败的的监听,这是我们的需要考虑的问题.

所以首先我们需要认清的问题点是消息丢失的可能性有哪几种?

1:消息投递到交换机的时候,就出现了异常—》消息丢失 2:消息已正确的进入到了我们的交换机—-》但进入队列时异常了,或者说是routing_key匹配错了—消息丢失 3:消息正确的进入到我们的消息队列的时候,开启了持久化的时候,再持久化的时候出现问题—消息丢失

针对上述的几种,那我们的需要解决的问题点有:

1:我们的需要确保我们的消息到MQ是正常,成功了或错误了应该有回执通知 2:我们需要确保消息路由到正确的队列上不能出现匹配错误的情况,有错误的情况,应该有回执通知生产者 3:需要确保我们的消息在队列里面正确的存贮—


讯享网
image.png

官网提供的方案其实是有两种的:

使用事务机制

image.png

使用事务虽然可以保证消息的准确达到,但是它极大地牺牲了性能,因此我们为了性能上的要求,可以采用另一种高效的解决方案——通过使用Confirm模式来保证消息的准确性。

使用Confirm模式根据官网pika的文档

第1步是需要开启确认模式:



讯享网

第2步:进行消息发布的时候设置强制标志和处理异常来检查消息是否已传递:

首先是设置强制标志:

讯享网

然后对我们的发布的消息进行一步的捕获处理


下面的在管理端的UI进行关闭:

image.png

点击先的关闭的话,会触发 pika.exceptions.ConnectionClosedByBroker,这种情况下,你客户端只能重启或尝试重新创建新的链接进行处理!

另一种错误的示例:路由到不正确的key

一个完整的错误的示例,就是当我们的发布的消息路由到不存在的,或者说是不对的routing_key的时候的一个情况:

image.png
讯享网

这个是我们的运行发布我们的消息的时候,我们的会正常的接收到异常的信息!

image.png

但是上面的返回的错误的信息,如果我们的想要了解的清除的话,似乎都不知道是意思,并且具体的错误的原因都无法了解!分析到内部的源码的时候发现:

image.png

它抛出其实一个_puback_return!儿这个对应的其实是一个ReturnedMessage:

image.png

来自:from pika.adapters.blocking_connection import ReturnedMessage


所以其实我们的可以从这里获取对应的返回上面相关的信息:我们可以从上面获取到相关的信息如:错误原因:

讯享网

然后查看我们的打印输出的信息:


上面提示reply_text=NO_ROUTE:意思是我们的找到这个队列的路由key!!!


PS:对于捕获到我的异常的地方,我们的可以再次进行消息的确认发布的成功的确认,不过感觉上面那种方式还是一种阻塞的方式,后续使用协程一部的aio-pika应该可以处理提供这些发布消息的阻塞的问题!后续有时间再看看!

消息端的消息消费的确认,通常如果对于我们的一些不太紧要的消息的,我们可以设置回消息的自动的确认机制。但是一些特殊的消息的话,则最好是建议开启我们的手动ack的模式,进行消息消费的完成的确认。(翻车纠正)

这个手动的ack的模式之前示例其实也有涉及过:

讯享网

主要的地方如图示:

如果对消息开启了手动的ack确认的模式,当消费者处理完消息不发送ack回执,此时我们的队列的消息,不会被删除,因为没有收到ACK的确认的消息,此时消息的状态会转变为:Unacked,那么消息会被重新放入队列(状态从Unacked变成Ready),有其他消费者存在时,消息会发送给其他消费者。

如下示例:主要是看消费端:


上面的代码我开始了手动的确认的模式,并且只预取了一个消息,再我们的一直没有回复我们的消息ack的情况下:

image.png

此时消息状态就转为了:Unacked,并且一直阻塞了!!!

此时断开消费者的链接:过些时间后,我们的消息从从 unacked的消息状态会重新变为ready等待消费,又回到我们的原来的队列里面去了!

image.png

MQ的持久化,其实可以分为:

  • 交换机的持久化
讯享网
  • 队列的持久化

  • 讯享网
    消息的持久化

对于如果都开启的话,对MQ的性能肯定是有所影响滴,比较要处理的东西多了!

这里断线主要是针对发送的过程中,有可能出现的异常的问题!

官网基于retry的重试机制:


幂等性简单来说就是用户对于同一个行为的操作发起的一次请求或者多次请求的处理结果都是一致,也就是一次和多次的请求整个过程的相关的数据的变化是一致!不能存在不同的处理逻辑!

消息重试触发的原因多种:

  • MQ Broker与消费端传输消息过程出现网络抖动导致的延迟传输
  • 消费者消息消费过程的异常
  • 定时消息的重复分发
  • ack确认时网络闪断

这种场景的话,主要是消费者的处理消费的异常的情况,如何进行消息的重试消费的处理的问题。目前除了PY以后,好像其他的都有客户端都实现了重试的机制,就是PY木有实现!可怜兮兮的需要自己实现!

此时可以使用第三方库的重试机制来处理这种重试!

如:

讯享网

对于这个tenacity的使用可以参考官网的文档。

我这里不做过多的介绍!

这里的重试其实因为我需要针对某些错误的异常进行重试!一些验证性的异常的话!我觉得你再重试也都是没意义!所以我们在重试的时候强调的是合理重试比如导致你程序出现bug一直无法启动的那种!你就没必要重试了!重试通常主要是针对一些网络异常抖动之类的发引发的错误,重试可能会有机会再挽回的!

消息过程中异常导致消息重复消费完整重试示例(仅供参考):


ps:上面作为演示,只是对应ZeroDivisionError进行异常尝试!这个其实没意义!仅做测试演示!

另外关于消息的取消处理的时候:

讯享网
  • requeue=False:则会消息会直接的被丢弃!
  • requeue=True: 则会一直循环重试重复消费这个消息,只有消费端断开后,这个消息会从unacked的消息状态会重新变为ready

既然有消息重复消费机制,那就可能存在消费被多次消费的可能性,而如果消息被多次的消费的,某些业务场景是不允许!比如转钱!哈哈 所以如何保证消息幂等性呐

通常其实处理机制主要是再消息消费之前检验全局唯一的消息的ID是否被消费过!根据全局唯一消息ID或其他标志来去重而实现。

这种机制基于数据库的实现的方式的话,根据业务逻辑实现的话,其实可以分两种:

  • 如果业务是处理插入操作的话,可以通过数据库表的唯一主键约束来实现,确保表中有且只有这个一个主键值
  • 如果业务是其他操作如更新之类的话,可以使用数据库的乐观锁机制来实现。

还可以基于Redis的原子性实现,消费者在接收到消息的时候,可以根据消息ID或其他全局唯一的ID作为key执行 setnx 命令,如果执行成功就表示没有处理过这条消息,可以进行消费了,如果执行失败那么则表示消息之前已经被消费过了!不需要再进行消费!对于这个redis锁的有效期,则根据自己的业务来决定!

延伸扩展笔记:

乐观锁的意思是:假设数据一般情况下不会造成冲突,在数据进行提交更新处理的时候,才会正式对数据是否冲突进行检测,如果发现冲突,就会让返回用户错误的信息,让用户决定如何进行处理。(错误的概率可能相对不会那么高,对错误的发生保持乐观状态!)

悲观锁:相对于悲观锁而言,悲观锁的话,就是不管怎么样的你想修改我的所数据先要获取上锁,上锁之后其他人别想再来,需要等我处理完我的这个修改后,释放锁了再来获取,悲观锁它是直接对数据进行加锁的方式来以防止并发!(对错误的发生保持悲观状态,觉得有可能真的会发生,为安全,我只能自我保护上锁处理!!使用数据库的锁的方式,效率低,但是安全系数高点)

关于乐观锁一些补充:

乐观锁并未真正加锁,效率高。

方案1:记录数据版本。每次在执行数据的修改操作时,都会带上一个版本号,一旦版本号和数据的版本号一致就可以执行修改操作并对版本号执行+1操作,否则就执行失败,每次操作的版本号都会随之增加。[递增方案,可以多种]

上面这种版本号递增的方式对于高并发场景下的话,是存在一定的问题的!

方案2:更新的的时候使用原子操作

Rabbitma的队列按照不同维度来分,可以分为

  • 排他性队列
  • 普通队列
  • 延迟队列(死信队列)
  • 惰性队列
  • 发布订阅队列

所有的优先队列,其实就是对消息设置一个优先级的编号,用于消息的优先消息的排序。关于优先消息的场景,根据业务来订,如果比如紧急消息之类的通知处理的,需要优先被消费!

通过后端管理UI界面设置创建我们的优先级队列:

其他可选参数可以通过管理界面查看:

image.png

通过代码的方式设置创建我们的优先级队列和消息优先级:

image.png

首先启动生产端:发布20调测试信息:

image.png

然后再启动我们的客户端查看消费结果:

优先级越高,它就越先消费了~

image.png

普通队列和惰性队列的区别:

  • 普通队列消息是存在MQ的内存中,消息会占MQ的内存
  • 惰性队列的消息是存在磁盘中,消息会占磁盘的空间,但是数据会比较小(只是写入磁盘,但是不代表不会丢失,如果没启动持久化,重启MQ一样会丢失,所以和持久化队列有所区别,当然也可以惰性+持久化双核混搭!)

惰性队列其实关注的点是:消息存贮的方式它是存在内存还是存在磁盘。

惰性队列是把消息存在在磁盘中,当消息到MQ的时候,MQ把消息写入磁盘,而消费者需要获取消息的的时候,MQ需要先从磁盘读取消息到MQ的内存,再分发给我们的消费者!但是这个这个过程是一个耗时的过程。

惰性队列的应用场景:

  • 消费端异常,导致MQ消息积压的时候,为避免MQ内存爆满,把消息存在磁盘中!

通过后端管理UI界面设置创建我们的惰性队列:

image.png
  • Lazy mode(x-queue-mode=lazy):Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中

通过代码的方式设置创建惰性队列:


验证持久化和惰性的:发送消息到一个没有持久化的队列中:

然后重启的MQ:

image.png
讯享网

观察队列没了!

更换另一种方式,此时我们的也把惰性+队列持久化一起启用,然后再重启MQ的话:

image.png

委屈了!!忘记了消息也需要持久化了!!!我们的队列是不会消息了!但是消息没了!此时再加上我们的消息也持久化再测试!

image.png

观察重启后(惰性+队列持久化—消息持久化):

image.png

惰性和普通队列的取舍:

  • 惰性是为了减少消息对MQ内存的占用,避免相关内存不足而产生换页操作(内存和磁盘之前一种空间置换)
  • 处理效率普通队列消息直接从内存获取,效率比惰性高
  • 如果对效率执行要求不是很高,使用惰性的话,可以减少消息占用占用MQ内存的问题

但是这里,结合上面的惰性的话,如果你的死信队列也上了一个量级的话!其实可以进一步优化我们的死信队列也是一个惰性队列,这样其实即可以减小内存占用,又可以实现消息的延迟消费!这样也是可以考虑的一种方案!

关于延迟队列!上一小节已有讲述,这里复制过来!

3.2.1.1 死信消息和死信队列定义

关于死信说明的官方文档地址为:https://www.rabbitmq.com/ttl.html#per-queue-message-ttl

需要了解的:Dead Letter Exchange 死信队列(DLX)队列的简称。

另外对于死信消息:通常如果我们的一个消息存在以下的情况下的话则这消息被称为死信消息:

  • 1:消息被消费端拒绝,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false
  • 2:消息在队列的存活时间超过设置的TTL时间。
  • 3:消息队列的消息数量已经超过最大队列长度,无法再继续新增消息到MQ中
  • 4:一个队列中的消息的TTL对其他队列中同一条消息的TTL没有影响

对于死信消息的处理,Rabbitmq会依据是否配置死信队列的配置来决定消息的去留!如果开启了配置死信队列信息,则消息会被转移到这个 死信队列(DLX)中,如果没有配置,则此消息会被丢弃!

3.2.1.2 死信队列配置

官网文档:https://www.rabbitmq.com/dlx.html

  • 可以为每一个需要使用死信业务的队列配置一个死信交换机
  • 每个队列都可以配置专属自己的死信队列,相关消息的进入死信队列需要经过死信交换机来进程归纳处理
  • 死信交换机也只是一个普通的交换机,只是它是用来专门处理死信的交换机
  • 创建队列时可以给这个队列附带一个死信的交换机,在这个队列里因各自情况出现问题的作废的消息会被重新发到附带的交换机,然后让这个交换机重新路由这条消息。

具体的图示:

image.png

若要使用策略指定DLX,请将键“死信交换”添加到策略定义中。例如:


上面的策略将DLX队列“my-dlx”应用于所有队列。上面只是一个例子,实际上不同的队列可能会使用不同的死字设置(或者根本不使用)。

其他配置死信队里的方式有:

讯享网

PS:当指定了死信交换机后时,除了通常对声明队列的配置权限外,用户还需要对该队列具有读取权限,并对死信交换机具有写权限。权限在队列声明时进行验证。

完整的一个简单的示例:

下面的示例主要是演示里:1:设置消息的过期的时间为2s,2s之后就变为我们的死信

2:变为死信的消息,会被转移到我们的另一个死信交换机的队列上


运行上面的生产者的代码后观察我们的输出:中国发出了8个消息,

讯享网

结果这个8个消息都没有人去消费的时候:最后都转移到了死信的队列里面:

image.png
image.png

关于死信队列需要注意的点(来自官网的说明):

消息在发布到死信队列后DLX目标队列后会立即从原始队列中删除。这确保没有可能出现过多的消息积累,从而耗尽代理资源,但这确实意味着,如果目标队列无法接受消息,消息可能会丢失。

3.2.1.3 死信队列里面的死信的消费

当我们的死信消费者去消费死信消息时候,需要注意点有:

我们的“死信”消息消息的properties里面的header字段信息中增加一个叫做“x-death”的数组内容,包含了以下字段内容:


其中我们的’x-death’内容为::

讯享网

具体每个字段的意思是:

  • queue :进入死信队列之前来自于哪个的消息队列名称
  • reason:这个消息变为死信的原因?expired 表示是因为过期!变为死信!
  • count:这个消息在这个队列中被死了多少次
  • time:该消息发布时间
  • exchange :消息已发布到哪些交换机上,PS:如果这个消息是多次变为死信的话,这个地方最后就是死信的交换机
  • routing-keys 消息发不来来源的路由keys
  • original-expiration:原消息的过期时间属性,PS:(如果消息是死信的话)每条消息ttl):。这个过期属性将从死信中删除,以防止它在被路由到的任何队列中再次过期。
  • x-first-death-exchange:第一次变成死死信的时候来源的交换机
  • x-first-death-queue:第一次变成死信的时候来源队列
  • x-first-death-reason:第一次变成死信的原因:expired 表示是因为过期!

其他变为死信的原因的说明:


3.2.1.4 延迟队列

RabbitMQ本身没有直接支持延迟队列功能,但是通过对死信队列和过期时间的使用,其实我们可以综合起上面的两个特性来实现一个所谓的延迟队列,延迟队列的意思就是:某个消息再某个固定的时间后失效后,则进入到死信队列里面,其他死信的消费者实时的处理这些过期的消息,这个就可以起到一个延迟处理的效果!

延迟队列加上惰性队列的组合,即可以减小内存占用,又可以实现消息的延迟处理

在开发启动调试阶段,需要看到相关内置日志信息的话,通常需要开启一下日志配置(只需要在启动前配置即可):

讯享网

开启后可以看到具体的链接过程信息:


还有错误时候的日志信息:

讯享网

以上是大部门代码是来自官网提供的一些简单案例,结合自己的实践做的简单的笔记!如有笔误!欢迎批评指正!感谢各位大佬!

简单小笔记!仅供参考!

简书:https://www.jianshu.com/u/db087

掘金:https://juejin.cn/user/25608

公众号:微信搜【小儿来一壶枸杞酒泡茶】

小钟同学 | 文  【原创】【欢迎一起学习交流】| :

小讯
上一篇 2025-06-11 07:09
下一篇 2025-05-27 18:17

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/174835.html