1、产生死信消息的原因
当在消费消息时,如果队列里的消息出现以下情况,那么该消息将成为一条死信消息:
- 当一条消息被使用 channel.basicNack方法 或 channel.basicReject方法所nack响应 ,并且此时requeue 属性被设置为false。
- 消息在队列的存活时间超过设置的生存时间(TTL)时间。
- 消息队列的消息数量超过了设置的最大队列长度。
死信队列(DLQ)非常简单,就一个普通的队列,只不过它是和死信交换机绑定的而已,在声明队列的时候,通过x-dead-letter-exchange参数和x-dead-letter-routing-key指定死信交换机以及死信路由键即可。
- 参数dead-letter-exchange:指定死信交换机。
- 参数dead-letter-routing-key:指定死信路由键,用于绑定死信交换机和死信队列。
2、代码实现
pom.xml
plugins {
id 'java' id 'org.springframework.boot' version '3.1.1' id 'io.spring.dependency-management' version '1.1.0' } group = 'com.kexuexiong' version = '0.0.1-SNAPSHOT' java {
sourceCompatibility = '17' } configurations {
compileOnly {
extendsFrom annotationProcessor } } repositories {
// mavenCentral() maven {
url 'https://maven.aliyun.com/repository/public' } } dependencies {
implementation 'org.springframework.boot:spring-boot-starter-jdbc' implementation 'org.springframework.boot:spring-boot-starter-validation' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:3.0.2' compileOnly 'org.projectlombok:lombok' runtimeOnly 'com.mysql:mysql-connector-j' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter-test:3.0.2' // https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp implementation 'org.springframework.boot:spring-boot-starter-amqp' implementation 'cn.hutool:hutool-all:5.8.16' } tasks.named('test') {
useJUnitPlatform() }
讯享网
yml配置文件:
搭建的是RabbitMQ集群:192.168.49.10:5672,192.168.49.9:5672,192.168.49.11:5672
详细搭建过程可以参考往期中的RabbitMQ集群搭建。
讯享网server: port: 8014 spring: rabbitmq: username: admin password: dynamic: true # port: 5672 # host: 192.168.49.9 addresses: 192.168.49.10:5672,192.168.49.9:5672,192.168.49.11:5672 publisher-confirm-type: correlated publisher-returns: true application: name: shushan datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://ip/shushan username: root password: hikari: minimum-idle: 10 maximum-pool-size: 20 idle-timeout: 50000 max-lifetime: connection-test-query: select 1 connection-timeout:
常量文件:
package com.kexuexiong.shushan.common.mq; import javax.swing.plaf.PanelUI; public class MqConstant {
public final static String demoDirectQueue = "demoDirectQueue"; public final static String demoDirectExchange = "demoDirectExchange"; public final static String demoDirectRouting = "demoDirectRouting"; public final static String lonelyDirectExchange = "lonelyDirectExchange"; public final static String topicExchange = "topicExchange"; public final static String BIG_CAR_TOPIC = "topic.big_car"; public final static String SMALL_CAR_TOPIC = "topic.small_car"; public final static String TOPIC_ALL = "topic.#"; public final static String FANOUT_A = "fanout.A"; public final static String FANOUT_B = "fanout_B"; public final static String FANOUT_C = "fanout_c"; public final static String FANOUT_EXCHANGE = "fanoutExchange"; public final static String DEAD_LETTER_EXCHANGE = "dead.letter.exchange"; public final static String DEAD_LETTER_QUEUE = "dead.letter.queue"; public final static String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key"; public final static String BUSINESS_QUEUE = "business.queue"; public final static String BUSINESS_ROUTING_KEY = "business.routing.key"; public final static String BUSINESS_EXCHANGE = "business.exchange"; }
RabbitMQ配置文件:
讯享网package com.kexuexiong.shushan.common.mq; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; @Configuration public class DirectDLXRabbitConfig {
@Bean public Queue businessDirectQueue() {
HashMap<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange",MqConstant.DEAD_LETTER_EXCHANGE);//设置死信交换机 args.put("x-dead-letter-routing-key",MqConstant.DEAD_LETTER_ROUTING_KEY); return QueueBuilder.durable(MqConstant.BUSINESS_QUEUE).withArguments(args).build(); } @Bean DirectExchange businessDirectExchange() {
return new DirectExchange(MqConstant.BUSINESS_EXCHANGE, true, false); } @Bean DirectExchange deadLetterDirectExchange() {
return new DirectExchange(MqConstant.DEAD_LETTER_EXCHANGE, true, false); } @Bean public Queue deadLetterDirectQueue() {
return new Queue(MqConstant.DEAD_LETTER_QUEUE, true, false, false); } @Bean Binding deadLetterBingingDirect() {
return BindingBuilder.bind(deadLetterDirectQueue()).to(deadLetterDirectExchange()).with(MqConstant.DEAD_LETTER_ROUTING_KEY); } @Bean Binding businessBingingDirect() {
return BindingBuilder.bind(businessDirectQueue()).to(businessDirectExchange()).with(MqConstant.BUSINESS_ROUTING_KEY); } }
队列绑定死信交换机:

讯享网
生产者:
package com.kexuexiong.shushan.controller.mq; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.RandomUtil; import com.kexuexiong.shushan.common.mq.MqConstant; import com.kexuexiong.shushan.controller.BaseController; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.UUID; @Slf4j @RestController @RequestMapping("/mq/") public class MqController extends BaseController {
@Autowired RabbitTemplate rabbitTemplate; @GetMapping("/deadLetterSendDirectMessage") public String deadLetterSendDirectMessage(){
String msgId = UUID.randomUUID().toString(); String msg = "demo msg ,dead letter!!"; String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss"); Map<String,Object> map = new HashMap(); map.put("msgId",msgId); map.put("msg",msg); map.put("createTime",createTime); rabbitTemplate.convertAndSend(MqConstant.BUSINESS_EXCHANGE,MqConstant.BUSINESS_ROUTING_KEY,map); return "ok"; } @GetMapping("/deadLetterTimeOutSendDirectMessage") public String deadLetterTimeOutSendDirectMessage(){
String msgId = UUID.randomUUID().toString(); String msg = "demo msg ,dead letter!!"; String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss"); log.info("msg time :" +createTime); Map<String,Object> map = new HashMap(); map.put("msgId",msgId); map.put("msg",msg); map.put("createTime",createTime); Integer randomInt = RandomUtil.randomInt(30000); map.put("randomTime",randomInt); MessagePostProcessor messagePostProcessor = message -> {
// 设置消息过期时间为5秒 message.getMessageProperties().setExpiration("30000"); return message; }; rabbitTemplate.convertAndSend(MqConstant.BUSINESS_EXCHANGE,MqConstant.BUSINESS_ROUTING_KEY,map,messagePostProcessor); return "ok"; } }
消费者:
讯享网package com.kexuexiong.shushan.common.mq; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MessageListenerConfig {
@Autowired private CachingConnectionFactory connectionFactory; @Autowired private AckReceiver ackReceiver; @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); simpleMessageListenerContainer.setConcurrentConsumers(2); simpleMessageListenerContainer.setMaxConcurrentConsumers(2); simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //,MqConstant.demoDirectQueue, MqConstant.FANOUT_A, MqConstant.BIG_CAR_TOPIC simpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE); simpleMessageListenerContainer.setMessageListener(ackReceiver); return simpleMessageListenerContainer; } }
AckReceiver:

package com.kexuexiong.shushan.common.mq; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.util.Map; import java.util.Objects; @Slf4j @Component public class AckReceiver implements ChannelAwareMessageListener {
@Override public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag(); byte[] messageBody = message.getBody(); try (ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(messageBody));) {
Map<String, String> msg = (Map<String, String>) inputStream.readObject(); log.info(message.getMessageProperties().getConsumerQueue()+"-ack Receiver :" + msg); log.info("header msg :"+message.getMessageProperties().getHeaders()); if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.BUSINESS_QUEUE)){
//拒绝 channel.basicNack(deliveryTag,false,false); }else if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.DEAD_LETTER_QUEUE)){
channel.basicAck(deliveryTag, true); }else {
channel.basicAck(deliveryTag, true); } } catch (Exception e) {
channel.basicReject(deliveryTag, false); log.error(e.getMessage()); } } }
测试:
1、当一条消息被使用 channel.basicNack方法 或 channel.basicReject方法所nack响应 ,并且此时requeue 属性被设置为false

讯享网2023-10-11T16:18:13.124+08:00 INFO 28104 --- [enerContainer-2] c.k.shushan.common.mq.AckReceiver : business.queue-ack Receiver :{
msg=demo msg ,dead letter!!, createTime=2023-10-11 16:18:13, msgId=9b31b4b3-c58f-47bd-8b27-cac4f53ae120} 2023-10-11T16:18:13.125+08:00 INFO 28104 --- [enerContainer-2] c.k.shushan.common.mq.AckReceiver : header msg :{
spring_listener_return_correlation=995de3d2-d5a8-42fe-91e6-992fd485d20d} 2023-10-11T16:18:13.125+08:00 INFO 28104 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback data: null 2023-10-11T16:18:13.125+08:00 INFO 28104 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :true 2023-10-11T16:18:13.125+08:00 INFO 28104 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :null 2023-10-11T16:18:13.127+08:00 INFO 28104 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : dead.letter.queue-ack Receiver :{
msg=demo msg ,dead letter!!, createTime=2023-10-11 16:18:13, msgId=9b31b4b3-c58f-47bd-8b27-cac4f53ae120} 2023-10-11T16:18:13.127+08:00 INFO 28104 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : header msg :{
spring_listener_return_correlation=995de3d2-d5a8-42fe-91e6-992fd485d20d, x-first-death-exchange=business.exchange, x-death=[{
reason=rejected, count=1, exchange=business.exchange, time=Wed Oct 11 16:18:14 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=rejected, x-first-death-queue=business.queue}
if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.BUSINESS_QUEUE)){
//拒绝 channel.basicNack(deliveryTag,false,false); }
消费者代码中拒绝消费,然后消息被路由到死信队列中。
讯享网{
spring_listener_return_correlation=995de3d2-d5a8-42fe-91e6-992fd485d20d, x-first-death-exchange=business.exchange, x-death=[{
reason=rejected, count=1, exchange=business.exchange, time=Wed Oct 11 16:18:14 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=rejected, x-first-death-queue=business.queue}
reason为rejected,并记录原交换机 exchange=business.exchange。
2、消息在队列的存活时间超过设置的生存时间(TTL)时间
将消费者中的MqConstant.BUSINESS_QUEUE去掉,后测试。
设置消息过期时间可以在队列中设置:增加参数x-message-ttl;
@Bean("businessQueue") public Queue businessQueue() {
Map<String, Object> args = new HashMap<>(16); // 设置当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // 设置当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY); // 设置消息的过期时间 单位:ms(毫秒) args.put("x-message-ttl", 5000); return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build(); }
也可以针对每个消息进行设置过期时间:
讯享网 @GetMapping("/deadLetterTimeOutSendDirectMessage") public String deadLetterTimeOutSendDirectMessage(){
String msgId = UUID.randomUUID().toString(); String msg = "demo msg ,dead letter!!"; String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss"); log.info("msg time :" +createTime); Map<String,Object> map = new HashMap(); map.put("msgId",msgId); map.put("msg",msg); map.put("createTime",createTime); Integer randomInt = RandomUtil.randomInt(30000); map.put("randomTime",randomInt); MessagePostProcessor messagePostProcessor = message -> {
// 设置消息过期时间为5秒 message.getMessageProperties().setExpiration("30000"); return message; }; rabbitTemplate.convertAndSend(MqConstant.BUSINESS_EXCHANGE,MqConstant.BUSINESS_ROUTING_KEY,map,messagePostProcessor); return "ok"; }

测试:

2023-10-11T16:28:49.052+08:00 INFO 25848 --- [nio-8014-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet' 2023-10-11T16:28:49.052+08:00 INFO 25848 --- [nio-8014-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet' 2023-10-11T16:28:49.053+08:00 INFO 25848 --- [nio-8014-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms 2023-10-11T16:28:49.099+08:00 INFO 25848 --- [nio-8014-exec-1] c.k.shushan.controller.mq.MqController : msg time :2023-10-11 16:28:49 2023-10-11T16:28:49.120+08:00 INFO 25848 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback data: null 2023-10-11T16:28:49.121+08:00 INFO 25848 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :true 2023-10-11T16:28:49.121+08:00 INFO 25848 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :null 2023-10-11T16:29:19.123+08:00 INFO 25848 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : dead.letter.queue-ack Receiver :{
msg=demo msg ,dead letter!!, randomTime=22235, createTime=2023-10-11 16:28:49, msgId=4793dd5c-70c2-4c2f-a6ae-cdc6663e0b05} 2023-10-11T16:29:19.123+08:00 INFO 25848 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : header msg :{
spring_listener_return_correlation=22fdc49f-30da-4524-9fc8-d4f69eb4c2c3, x-first-death-exchange=business.exchange, x-death=[{
reason=expired, original-expiration=30000, count=1, exchange=business.exchange, time=Wed Oct 11 16:29:20 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=expired, x-first-death-queue=business.queue}

30秒过期,进入死信队列,然后被消费。
3、 消息队列的消息数量超过了设置的最大队列长度
修改RabbitMQ配置文件,创建businessDirectQueue时增加x-max-length设置容量的参数。
讯享网package com.kexuexiong.shushan.common.mq; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; @Configuration public class DirectDLXRabbitConfig {
@Bean public Queue businessDirectQueue() {
HashMap<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange",MqConstant.DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key",MqConstant.DEAD_LETTER_ROUTING_KEY); args.put("x-max-length", 3); return QueueBuilder.durable(MqConstant.BUSINESS_QUEUE).withArguments(args).build(); } @Bean DirectExchange businessDirectExchange() {
return new DirectExchange(MqConstant.BUSINESS_EXCHANGE, true, false); } @Bean DirectExchange deadLetterDirectExchange() {
return new DirectExchange(MqConstant.DEAD_LETTER_EXCHANGE, true, false); } @Bean public Queue deadLetterDirectQueue() {
return new Queue(MqConstant.DEAD_LETTER_QUEUE, true, false, false); } @Bean Binding deadLetterBingingDirect() {
return BindingBuilder.bind(deadLetterDirectQueue()).to(deadLetterDirectExchange()).with(MqConstant.DEAD_LETTER_ROUTING_KEY); } @Bean Binding businessBingingDirect() {
return BindingBuilder.bind(businessDirectQueue()).to(businessDirectExchange()).with(MqConstant.BUSINESS_ROUTING_KEY); } }

测试:


第三次请求的结果:

第四次请求结果:

控制台输出,死信队列接收到消息。
2023-10-11T16:46:54.783+08:00 INFO 20444 --- [nectionFactory2] c.k.shushan.common.config.RabbitConfig : confirmCallback data: null 2023-10-11T16:46:54.783+08:00 INFO 20444 --- [nectionFactory2] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :true 2023-10-11T16:46:54.783+08:00 INFO 20444 --- [nectionFactory2] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :null 2023-10-11T16:46:54.787+08:00 INFO 20444 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : dead.letter.queue-ack Receiver :{
msg=demo msg ,dead letter!!, createTime=2023-10-11 16:44:35, msgId=db7d2d7f-12a0-4ca4-9e92-81758cda436e} 2023-10-11T16:46:54.787+08:00 INFO 20444 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver : header msg :{
spring_listener_return_correlation=e2a1871a-7765-43ee-b44c-d80d0ac6323c, x-first-death-exchange=business.exchange, x-death=[{
reason=maxlen, count=1, exchange=business.exchange, time=Wed Oct 11 16:46:56 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=maxlen, x-first-death-queue=business.queue}
到这里三种情况都介绍完了,总体来讲RabbitMQ的死信队列还是很简单的。但是它作用还是很强大的,可以用于实现延时消息,订单到时取消等。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/38874.html