RabbitMQ Dead-Letter Queues: How They Work, with Project Code Examples (2025)


1. Why messages become dead letters

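A message in a queue becomes a dead letter when any of the following happens:

  • A consumer negatively acknowledges it with channel.basicNack or channel.basicReject, and the requeue flag is set to false.
  • The message stays in the queue longer than its time to live (TTL).
  • The queue holds more messages than its configured maximum length.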
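The three conditions above can be sketched as a small decision function. This is a toy model for illustration only, not broker code: the class name DeadLetterTriggers and its parameters are hypothetical. The reason strings are the ones that show up in the x-death header of a dead-lettered message.

```java
import java.util.Optional;

/** Toy model of the three dead-letter triggers; this is not broker code. */
final class DeadLetterTriggers {

    /** Reason strings as they appear in the x-death header. */
    enum Reason {
        REJECTED("rejected"), EXPIRED("expired"), MAXLEN("maxlen");

        final String headerValue;

        Reason(String headerValue) {
            this.headerValue = headerValue;
        }
    }

    /**
     * @param nacked  the consumer called basicNack or basicReject
     * @param requeue the requeue flag passed with that call
     * @param ageMs   how long the message has been in the queue
     * @param ttlMs   TTL in milliseconds, or null if no TTL is set
     * @param depth   number of messages in the queue, including this one
     * @param maxLen  x-max-length, or null if the queue is unbounded
     */
    static Optional<Reason> check(boolean nacked, boolean requeue,
                                  long ageMs, Long ttlMs,
                                  int depth, Integer maxLen) {
        if (nacked && !requeue) {
            return Optional.of(Reason.REJECTED);
        }
        if (ttlMs != null && ageMs >= ttlMs) {
            return Optional.of(Reason.EXPIRED);
        }
        if (maxLen != null && depth > maxLen) {
            return Optional.of(Reason.MAXLEN);
        }
        return Optional.empty(); // the message stays in the queue
    }

    public static void main(String[] args) {
        System.out.println(check(true, false, 0, null, 1, null));      // nack, requeue=false
        System.out.println(check(false, false, 31_000, 30_000L, 1, null)); // older than its TTL
        System.out.println(check(false, false, 0, null, 4, 3));        // queue over its limit
        System.out.println(check(true, true, 0, null, 1, null));       // nack with requeue=true
    }
}
```

A nack with requeue set to true puts the message back in the queue, so the last call returns an empty Optional.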

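A dead-letter queue (DLQ) is just an ordinary queue that is bound to a dead-letter exchange (DLX). To send dead letters there, declare the business queue with the x-dead-letter-exchange argument and, optionally, x-dead-letter-routing-key, which name the dead-letter exchange and the routing key to use.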

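  • x-dead-letter-exchange: the exchange that dead letters are republished to.
  • x-dead-letter-routing-key: the routing key used when a dead letter is republished. The dead-letter queue must be bound to the dead-letter exchange with a matching key. If the argument is omitted, the message keeps its original routing key.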
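As a minimal sketch of how these two arguments are assembled, the helper below builds the argument map in plain Java. The class name DlxArguments and the method forQueue are hypothetical names chosen for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Builds the optional arguments that attach a dead-letter exchange to a queue. */
final class DlxArguments {

    /**
     * @param deadLetterExchange   name of the exchange that receives dead letters
     * @param deadLetterRoutingKey routing key for republished dead letters,
     *                             or null to keep each message's original key
     */
    static Map<String, Object> forQueue(String deadLetterExchange, String deadLetterRoutingKey) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", deadLetterExchange);
        if (deadLetterRoutingKey != null) {
            args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        }
        return args;
    }

    public static void main(String[] args) {
        System.out.println(forQueue("dead.letter.exchange", "dead.letter.routing.key"));
    }
}
```

With the RabbitMQ Java client, a map like this is passed as the last parameter of channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments). With Spring AMQP it goes into QueueBuilder.durable(name).withArguments(args).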
2. Implementation

build.gradle:

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.1.1'
    id 'io.spring.dependency-management' version '1.1.0'
}

group = 'com.kexuexiong'
version = '0.0.1-SNAPSHOT'

java {
    sourceCompatibility = '17'
}

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    // mavenCentral()
    maven {
        url 'https://maven.aliyun.com/repository/public'
    }
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-jdbc'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:3.0.2'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'com.mysql:mysql-connector-j'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter-test:3.0.2'
    // https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'cn.hutool:hutool-all:5.8.16'
}

tasks.named('test') {
    useJUnitPlatform()
}


YAML configuration:
This setup uses a RabbitMQ cluster at 192.168.49.10:5672, 192.168.49.9:5672, and 192.168.49.11:5672.
For the detailed setup steps, see the earlier article on building a RabbitMQ cluster.

server:
  port: 8014
spring:
  rabbitmq:
    username: admin
    password:
    dynamic: true
    # port: 5672
    # host: 192.168.49.9
    addresses: 192.168.49.10:5672,192.168.49.9:5672,192.168.49.11:5672
    publisher-confirm-type: correlated
    publisher-returns: true
  application:
    name: shushan
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://ip/shushan
    username: root
    password:
    hikari:
      minimum-idle: 10
      maximum-pool-size: 20
      idle-timeout: 50000
      max-lifetime:
      connection-test-query: select 1
      connection-timeout:

Constants:

package com.kexuexiong.shushan.common.mq;

public class MqConstant {

    public final static String demoDirectQueue = "demoDirectQueue";
    public final static String demoDirectExchange = "demoDirectExchange";
    public final static String demoDirectRouting = "demoDirectRouting";
    public final static String lonelyDirectExchange = "lonelyDirectExchange";

    public final static String topicExchange = "topicExchange";
    public final static String BIG_CAR_TOPIC = "topic.big_car";
    public final static String SMALL_CAR_TOPIC = "topic.small_car";
    public final static String TOPIC_ALL = "topic.#";

    public final static String FANOUT_A = "fanout.A";
    public final static String FANOUT_B = "fanout_B";
    public final static String FANOUT_C = "fanout_c";
    public final static String FANOUT_EXCHANGE = "fanoutExchange";

    // Dead-letter exchange, queue, and routing key
    public final static String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    public final static String DEAD_LETTER_QUEUE = "dead.letter.queue";
    public final static String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key";

    // Business exchange, queue, and routing key
    public final static String BUSINESS_QUEUE = "business.queue";
    public final static String BUSINESS_ROUTING_KEY = "business.routing.key";
    public final static String BUSINESS_EXCHANGE = "business.exchange";
}

RabbitMQ configuration class:

package com.kexuexiong.shushan.common.mq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

@Configuration
public class DirectDLXRabbitConfig {

    // Business queue: dead letters are republished to the dead-letter exchange
    @Bean
    public Queue businessDirectQueue() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", MqConstant.DEAD_LETTER_EXCHANGE); // dead-letter exchange
        args.put("x-dead-letter-routing-key", MqConstant.DEAD_LETTER_ROUTING_KEY); // dead-letter routing key
        return QueueBuilder.durable(MqConstant.BUSINESS_QUEUE).withArguments(args).build();
    }

    @Bean
    DirectExchange businessDirectExchange() {
        return new DirectExchange(MqConstant.BUSINESS_EXCHANGE, true, false);
    }

    @Bean
    DirectExchange deadLetterDirectExchange() {
        return new DirectExchange(MqConstant.DEAD_LETTER_EXCHANGE, true, false);
    }

    @Bean
    public Queue deadLetterDirectQueue() {
        return new Queue(MqConstant.DEAD_LETTER_QUEUE, true, false, false);
    }

    // Bind the dead-letter queue to the dead-letter exchange
    @Bean
    Binding deadLetterBingingDirect() {
        return BindingBuilder.bind(deadLetterDirectQueue()).to(deadLetterDirectExchange()).with(MqConstant.DEAD_LETTER_ROUTING_KEY);
    }

    // Bind the business queue to the business exchange
    @Bean
    Binding businessBingingDirect() {
        return BindingBuilder.bind(businessDirectQueue()).to(businessDirectExchange()).with(MqConstant.BUSINESS_ROUTING_KEY);
    }
}

[Figure: the business queue bound to the dead-letter exchange]

Producer:

package com.kexuexiong.shushan.controller.mq;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.RandomUtil;
import com.kexuexiong.shushan.common.mq.MqConstant;
import com.kexuexiong.shushan.controller.BaseController;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Slf4j
@RestController
@RequestMapping("/mq/")
public class MqController extends BaseController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // Sends a plain message to the business queue
    @GetMapping("/deadLetterSendDirectMessage")
    public String deadLetterSendDirectMessage() {
        String msgId = UUID.randomUUID().toString();
        String msg = "demo msg ,dead letter!!";
        // Lowercase yyyy is the calendar year; uppercase YYYY would be the week-based year
        String createTime = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", msgId);
        map.put("msg", msg);
        map.put("createTime", createTime);
        rabbitTemplate.convertAndSend(MqConstant.BUSINESS_EXCHANGE, MqConstant.BUSINESS_ROUTING_KEY, map);
        return "ok";
    }

    // Sends a message that carries its own TTL
    @GetMapping("/deadLetterTimeOutSendDirectMessage")
    public String deadLetterTimeOutSendDirectMessage() {
        String msgId = UUID.randomUUID().toString();
        String msg = "demo msg ,dead letter!!";
        String createTime = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
        log.info("msg time :" + createTime);
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", msgId);
        map.put("msg", msg);
        map.put("createTime", createTime);
        Integer randomInt = RandomUtil.randomInt(30000);
        map.put("randomTime", randomInt);
        MessagePostProcessor messagePostProcessor = message -> {
            // Set the message expiration to 30 seconds (the value is in milliseconds)
            message.getMessageProperties().setExpiration("30000");
            return message;
        };
        rabbitTemplate.convertAndSend(MqConstant.BUSINESS_EXCHANGE, MqConstant.BUSINESS_ROUTING_KEY, map, messagePostProcessor);
        return "ok";
    }
}

Consumer:

package com.kexuexiong.shushan.common.mq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageListenerConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private AckReceiver ackReceiver;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        simpleMessageListenerContainer.setConcurrentConsumers(2);
        simpleMessageListenerContainer.setMaxConcurrentConsumers(2);
        // Manual acknowledgement, so the listener decides between ack and nack
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Other queues can be listed here too, e.g. MqConstant.demoDirectQueue, MqConstant.FANOUT_A, MqConstant.BIG_CAR_TOPIC.
        // For the rejection test (test 1), also listen on MqConstant.BUSINESS_QUEUE.
        simpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE);
        simpleMessageListenerContainer.setMessageListener(ackReceiver);
        return simpleMessageListenerContainer;
    }
}

AckReceiver:

package com.kexuexiong.shushan.common.mq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.Objects;

@Slf4j
@Component
public class AckReceiver implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String consumerQueue = message.getMessageProperties().getConsumerQueue();
        byte[] messageBody = message.getBody();
        // The producer sends a HashMap, which the default converter serializes with Java serialization
        try (ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(messageBody))) {
            @SuppressWarnings("unchecked")
            Map<String, Object> msg = (Map<String, Object>) inputStream.readObject();
            log.info(consumerQueue + "-ack Receiver :" + msg);
            log.info("header msg :" + message.getMessageProperties().getHeaders());
            if (Objects.equals(consumerQueue, MqConstant.BUSINESS_QUEUE)) {
                // Reject without requeueing, so the broker dead-letters the message
                channel.basicNack(deliveryTag, false, false);
            } else if (Objects.equals(consumerQueue, MqConstant.DEAD_LETTER_QUEUE)) {
                // Acknowledge this message only (multiple = false)
                channel.basicAck(deliveryTag, false);
            } else {
                channel.basicAck(deliveryTag, false);
            }
        } catch (Exception e) {
            // Processing failed: reject without requeueing
            channel.basicReject(deliveryTag, false);
            log.error("Failed to process message", e);
        }
    }
}

Testing:

1. A message is negatively acknowledged with channel.basicNack or channel.basicReject, and requeue is set to false


2023-10-11T16:18:13.124+08:00 INFO 28104 --- [enerContainer-2] c.k.shushan.common.mq.AckReceiver : business.queue-ack Receiver :{msg=demo msg ,dead letter!!, createTime=2023-10-11 16:18:13, msgId=9b31b4b3-c58f-47bd-8b27-cac4f53ae120}
2023-10-11T16:18:13.125+08:00 INFO 28104 --- [enerContainer-2] c.k.shushan.common.mq.AckReceiver : header msg :{spring_listener_return_correlation=995de3d2-d5a8-42fe-91e6-992fd485d20d}
2023-10-11T16:18:13.125+08:00 INFO 28104 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback data: null
2023-10-11T16:18:13.125+08:00 INFO 28104 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :true
2023-10-11T16:18:13.125+08:00 INFO 28104 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :null
2023-10-11T16:18:13.127+08:00 INFO 28104 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : dead.letter.queue-ack Receiver :{msg=demo msg ,dead letter!!, createTime=2023-10-11 16:18:13, msgId=9b31b4b3-c58f-47bd-8b27-cac4f53ae120}
2023-10-11T16:18:13.127+08:00 INFO 28104 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : header msg :{spring_listener_return_correlation=995de3d2-d5a8-42fe-91e6-992fd485d20d, x-first-death-exchange=business.exchange, x-death=[{reason=rejected, count=1, exchange=business.exchange, time=Wed Oct 11 16:18:14 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=rejected, x-first-death-queue=business.queue}
if (Objects.equals(message.getMessageProperties().getConsumerQueue(), MqConstant.BUSINESS_QUEUE)) {
    // Reject without requeueing
    channel.basicNack(deliveryTag, false, false);
}

The consumer rejects the message, so the broker routes it to the dead-letter queue.

{spring_listener_return_correlation=995de3d2-d5a8-42fe-91e6-992fd485d20d, x-first-death-exchange=business.exchange, x-death=[{reason=rejected, count=1, exchange=business.exchange, time=Wed Oct 11 16:18:14 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=rejected, x-first-death-queue=business.queue}

The reason is rejected, and the header records the original exchange: exchange=business.exchange.
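A consumer on the dead-letter queue can read these fields from the headers map. The sketch below assumes the header layout shown in the log output above (x-death as a list of maps with reason, queue, and count entries) and reads only the first entry. XDeathReader and describe are hypothetical names.

```java
import java.util.List;
import java.util.Map;

/** Reads the first x-death entry from a message's headers. */
final class XDeathReader {

    /** Returns a one-line summary, or "not dead-lettered" if there is no usable x-death header. */
    static String describe(Map<String, Object> headers) {
        Object raw = headers.get("x-death");
        if (!(raw instanceof List<?>)) {
            return "not dead-lettered";
        }
        List<?> deaths = (List<?>) raw;
        if (deaths.isEmpty() || !(deaths.get(0) instanceof Map<?, ?>)) {
            return "not dead-lettered";
        }
        Map<?, ?> first = (Map<?, ?>) deaths.get(0);
        return first.get("reason") + " from " + first.get("queue")
                + " (count=" + first.get("count") + ")";
    }

    public static void main(String[] args) {
        // Values copied from the log output of the rejection test
        Map<String, Object> death = Map.of(
                "reason", "rejected",
                "count", 1L,
                "exchange", "business.exchange",
                "queue", "business.queue");
        Map<String, Object> headers = Map.of("x-death", List.of(death));
        System.out.println(describe(headers));
    }
}
```

In the AckReceiver from this article, the headers map would come from message.getMessageProperties().getHeaders().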

2. The message stays in the queue longer than its time to live (TTL)

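Remove MqConstant.BUSINESS_QUEUE from the consumer's queue list so that nothing consumes the business queue, then run the test.
A TTL can be set on the queue itself by adding the x-message-ttl argument: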

@Bean("businessQueue")
public Queue businessQueue() {
    Map<String, Object> args = new HashMap<>(16);
    // Dead-letter exchange for this queue
    args.put("x-dead-letter-exchange", MqConstant.DEAD_LETTER_EXCHANGE);
    // Routing key used for dead letters from this queue
    args.put("x-dead-letter-routing-key", MqConstant.DEAD_LETTER_ROUTING_KEY);
    // Message TTL for the whole queue, in milliseconds
    args.put("x-message-ttl", 5000);
    return QueueBuilder.durable(MqConstant.BUSINESS_QUEUE).withArguments(args).build();
}

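A TTL can also be set on each individual message. With a per-message TTL, an expired message is only dead-lettered once it reaches the head of the queue.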

@GetMapping("/deadLetterTimeOutSendDirectMessage")
public String deadLetterTimeOutSendDirectMessage() {
    String msgId = UUID.randomUUID().toString();
    String msg = "demo msg ,dead letter!!";
    // Lowercase yyyy is the calendar year; uppercase YYYY would be the week-based year
    String createTime = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
    log.info("msg time :" + createTime);
    Map<String, Object> map = new HashMap<>();
    map.put("msgId", msgId);
    map.put("msg", msg);
    map.put("createTime", createTime);
    Integer randomInt = RandomUtil.randomInt(30000);
    map.put("randomTime", randomInt);
    MessagePostProcessor messagePostProcessor = message -> {
        // Set the message expiration to 30 seconds (the value is in milliseconds)
        message.getMessageProperties().setExpiration("30000");
        return message;
    };
    rabbitTemplate.convertAndSend(MqConstant.BUSINESS_EXCHANGE, MqConstant.BUSINESS_ROUTING_KEY, map, messagePostProcessor);
    return "ok";
}
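When a queue has x-message-ttl and a message also carries its own expiration, RabbitMQ applies the lower of the two values. The helper below is a plain-Java sketch of that rule. EffectiveTtl is a hypothetical name, and the per-message value is taken as a string of milliseconds, matching what setExpiration receives.

```java
import java.util.OptionalLong;

/** Computes which TTL applies when queue-level and per-message TTLs are combined. */
final class EffectiveTtl {

    /**
     * @param queueTtlMs        x-message-ttl of the queue, or null if not set
     * @param messageExpiration per-message expiration in milliseconds as a string, or null
     * @return the TTL that applies, or empty if the message never expires
     */
    static OptionalLong of(Long queueTtlMs, String messageExpiration) {
        Long perMessageMs = (messageExpiration == null || messageExpiration.isBlank())
                ? null
                : Long.valueOf(messageExpiration.trim());
        if (queueTtlMs == null && perMessageMs == null) {
            return OptionalLong.empty();
        }
        if (queueTtlMs == null) {
            return OptionalLong.of(perMessageMs);
        }
        if (perMessageMs == null) {
            return OptionalLong.of(queueTtlMs);
        }
        // Both are set: the lower value wins
        return OptionalLong.of(Math.min(queueTtlMs, perMessageMs));
    }

    public static void main(String[] args) {
        // Queue TTL of 5 s combined with the 30 s expiration used by the producer
        System.out.println(of(5000L, "30000"));
        // Only the per-message expiration is set
        System.out.println(of(null, "30000"));
    }
}
```

So if the queue-level x-message-ttl of 5000 and the per-message expiration of 30000 were both active, the message would expire after 5 seconds.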

Test:

2023-10-11T16:28:49.052+08:00 INFO 25848 --- [nio-8014-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-10-11T16:28:49.052+08:00 INFO 25848 --- [nio-8014-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2023-10-11T16:28:49.053+08:00 INFO 25848 --- [nio-8014-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2023-10-11T16:28:49.099+08:00 INFO 25848 --- [nio-8014-exec-1] c.k.shushan.controller.mq.MqController : msg time :2023-10-11 16:28:49
2023-10-11T16:28:49.120+08:00 INFO 25848 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback data: null
2023-10-11T16:28:49.121+08:00 INFO 25848 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :true
2023-10-11T16:28:49.121+08:00 INFO 25848 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :null
2023-10-11T16:29:19.123+08:00 INFO 25848 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : dead.letter.queue-ack Receiver :{msg=demo msg ,dead letter!!, randomTime=22235, createTime=2023-10-11 16:28:49, msgId=4793dd5c-70c2-4c2f-a6ae-cdc6663e0b05}
2023-10-11T16:29:19.123+08:00 INFO 25848 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : header msg :{spring_listener_return_correlation=22fdc49f-30da-4524-9fc8-d4f69eb4c2c3, x-first-death-exchange=business.exchange, x-death=[{reason=expired, original-expiration=30000, count=1, exchange=business.exchange, time=Wed Oct 11 16:29:20 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=expired, x-first-death-queue=business.queue}

The message expires after 30 seconds, is routed to the dead-letter queue, and is consumed there.
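One consequence of per-message TTL is worth seeing in isolation: a message is only expired when it reaches the head of the queue, so a short-lived message can wait behind a long-lived one. The class below is a toy in-memory model of that behaviour, not broker code, and all names in it are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model: messages with per-message TTL are expired only from the head of the queue. */
final class HeadOfQueueExpiry {

    private static final class Msg {
        final String id;
        final long expiresAtMs;

        Msg(String id, long expiresAtMs) {
            this.id = id;
            this.expiresAtMs = expiresAtMs;
        }
    }

    private final Deque<Msg> queue = new ArrayDeque<>();
    final List<String> deadLettered = new ArrayList<>();

    /** Enqueues a message at time nowMs with its own TTL. */
    void publish(String id, long nowMs, long ttlMs) {
        queue.addLast(new Msg(id, nowMs + ttlMs));
    }

    /** Expires messages from the head and stops at the first one that is still alive. */
    void tick(long nowMs) {
        while (!queue.isEmpty() && queue.peekFirst().expiresAtMs <= nowMs) {
            deadLettered.add(queue.pollFirst().id);
        }
    }

    public static void main(String[] args) {
        HeadOfQueueExpiry q = new HeadOfQueueExpiry();
        q.publish("slow", 0, 30_000); // 30 s TTL, published first
        q.publish("fast", 0, 5_000);  // 5 s TTL, stuck behind "slow"
        q.tick(6_000);
        System.out.println("after 6 s:  " + q.deadLettered);
        q.tick(30_000);
        System.out.println("after 30 s: " + q.deadLettered);
    }
}
```

In this model "fast" is already past its TTL at 6 seconds but is not dead-lettered until "slow" expires at 30 seconds. This matters when per-message TTLs of different lengths share one queue, for example when building delayed messages.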

3. The queue holds more messages than its configured maximum length

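Modify the RabbitMQ configuration class so that businessDirectQueue is declared with the x-max-length argument, which caps the number of messages the queue can hold. The arguments of an existing queue cannot be changed by redeclaring it, so delete the old business.queue first.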

package com.kexuexiong.shushan.common.mq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

@Configuration
public class DirectDLXRabbitConfig {

    @Bean
    public Queue businessDirectQueue() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", MqConstant.DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key", MqConstant.DEAD_LETTER_ROUTING_KEY);
        // Hold at most 3 messages
        args.put("x-max-length", 3);
        return QueueBuilder.durable(MqConstant.BUSINESS_QUEUE).withArguments(args).build();
    }

    @Bean
    DirectExchange businessDirectExchange() {
        return new DirectExchange(MqConstant.BUSINESS_EXCHANGE, true, false);
    }

    @Bean
    DirectExchange deadLetterDirectExchange() {
        return new DirectExchange(MqConstant.DEAD_LETTER_EXCHANGE, true, false);
    }

    @Bean
    public Queue deadLetterDirectQueue() {
        return new Queue(MqConstant.DEAD_LETTER_QUEUE, true, false, false);
    }

    @Bean
    Binding deadLetterBingingDirect() {
        return BindingBuilder.bind(deadLetterDirectQueue()).to(deadLetterDirectExchange()).with(MqConstant.DEAD_LETTER_ROUTING_KEY);
    }

    @Bean
    Binding businessBingingDirect() {
        return BindingBuilder.bind(businessDirectQueue()).to(businessDirectExchange()).with(MqConstant.BUSINESS_ROUTING_KEY);
    }
}

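Test:
Result of the third request: [screenshot not preserved]
Result of the fourth request: [screenshot not preserved]
With nothing consuming the business queue, the fourth message pushes the queue past x-max-length, and the broker dead-letters the message at the head of the queue (the default overflow behaviour). The console output shows the dead-letter queue receiving it.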

2023-10-11T16:46:54.783+08:00 INFO 20444 --- [nectionFactory2] c.k.shushan.common.config.RabbitConfig : confirmCallback data: null
2023-10-11T16:46:54.783+08:00 INFO 20444 --- [nectionFactory2] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :true
2023-10-11T16:46:54.783+08:00 INFO 20444 --- [nectionFactory2] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :null
2023-10-11T16:46:54.787+08:00 INFO 20444 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : dead.letter.queue-ack Receiver :{msg=demo msg ,dead letter!!, createTime=2023-10-11 16:44:35, msgId=db7d2d7f-12a0-4ca4-9e92-81758cda436e}
2023-10-11T16:46:54.787+08:00 INFO 20444 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : header msg :{spring_listener_return_correlation=e2a1871a-7765-43ee-b44c-d80d0ac6323c, x-first-death-exchange=business.exchange, x-death=[{reason=maxlen, count=1, exchange=business.exchange, time=Wed Oct 11 16:46:56 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=maxlen, x-first-death-queue=business.queue}
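Note that the dead-lettered message in the log was created at 16:44:35, more than two minutes before it arrived in the dead-letter queue: it is the oldest message, not the newest one. The toy model below sketches this drop-from-head overflow behaviour in plain Java. It is not broker code, and BoundedQueueModel is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a length-limited queue that dead-letters from the head on overflow. */
final class BoundedQueueModel {

    private final int maxLength;
    private final Deque<String> queue = new ArrayDeque<>();
    final List<String> deadLettered = new ArrayList<>();

    BoundedQueueModel(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Appends a message, then moves the oldest messages out while the queue is over its limit. */
    void publish(String message) {
        queue.addLast(message);
        while (queue.size() > maxLength) {
            deadLettered.add(queue.pollFirst());
        }
    }

    /** Snapshot of the messages still in the queue, oldest first. */
    List<String> contents() {
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        BoundedQueueModel q = new BoundedQueueModel(3); // same limit as x-max-length above
        for (int i = 1; i <= 4; i++) {
            q.publish("m" + i);
        }
        System.out.println("queue:        " + q.contents());
        System.out.println("dead letters: " + q.deadLettered);
    }
}
```

After four publishes the model holds m2, m3, and m4, and m1 has been dead-lettered, which mirrors the fourth request in the test above.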

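That covers all three cases. Dead-letter queues in RabbitMQ are simple to set up, yet they are powerful: they can be used to implement delayed messages, automatic cancellation of orders that are not paid in time, and similar features.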
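As a rough sketch of the order-cancellation idea, assume that each new order publishes its ID to a queue with a TTL and no consumers, and that a listener on the dead-letter queue receives the ID once the TTL has elapsed. The handler below shows only the decision that listener would make. OrderTimeoutHandler, its methods, and the in-memory order store are all hypothetical and stand in for real persistence.

```java
import java.util.HashMap;
import java.util.Map;

/** Decides what to do with an order whose delay message arrived on the dead-letter queue. */
final class OrderTimeoutHandler {

    enum Status { UNPAID, PAID, CANCELLED }

    // In-memory stand-in for an order table
    private final Map<String, Status> orders = new HashMap<>();

    void create(String orderId) {
        orders.put(orderId, Status.UNPAID);
    }

    void pay(String orderId) {
        orders.put(orderId, Status.PAID);
    }

    Status statusOf(String orderId) {
        return orders.get(orderId);
    }

    /**
     * Called with the order ID carried by a dead-lettered delay message.
     *
     * @return true if the order was cancelled, false if it was already paid, cancelled, or unknown
     */
    boolean onDeadLetter(String orderId) {
        if (orders.get(orderId) == Status.UNPAID) {
            orders.put(orderId, Status.CANCELLED);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        OrderTimeoutHandler handler = new OrderTimeoutHandler();
        handler.create("order-1");
        handler.create("order-2");
        handler.pay("order-2");
        System.out.println("order-1 cancelled: " + handler.onDeadLetter("order-1"));
        System.out.println("order-2 cancelled: " + handler.onDeadLetter("order-2"));
    }
}
```

Checking the status before cancelling keeps the handler safe when the same delay message is delivered more than once.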

If you repost this article, please keep the source: https://51itzy.com/kjqy/38874.html