RocketMQ事务消息详解[源码]

RocketMQ事务消息详解[源码]package com example rocketmq listener import com example rocketmq dto OrderDTO import com example rocketmq entity Order import com example rocketmq entity TransactionL import com example

大家好,我是讯享网,很高兴认识大家。这里提供最前沿的Ai技术和互联网信息。

package com.example.rocketmq.listener;

import com.example.rocketmq.dto.OrderDTO; import com.example.rocketmq.entity.Order; import com.example.rocketmq.entity.TransactionLog; import com.example.rocketmq.repository.OrderRepository; import com.example.rocketmq.repository.TransactionLogRepository; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional;

import java.util.Optional;

/

  • 订单事务监听器 - 执行本地事务和回查 */ @Slf4j @RocketMQTransactionListener @Component public class OrderTransactionListener implements RocketMQLocalTransactionListener , orderId={}", transactionId, dto.getOrderId());
    try ", transactionId); // 4. 返回提交 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("本地事务失败,transactionId={}", transactionId, e); // 更新事务日志状态为失败 Optional 
        
          
          
            existingLog = logRepository.findByTransactionId(transactionId); if (existingLog.isPresent()) return RocketMQLocalTransactionState.ROLLBACK; } 
          

    }

    /

    • 回查事务状态
    • RocketMQ 会定时调用此方法检查未决事务
    • @param msg MQ 消息
    • @return 事务状态 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) ", transactionId);

      // 查询事务日志 Optional logOpt = logRepository.findByTransactionId(transactionId);

      if (!logOpt.isPresent()) {

      // 没有日志,说明事务未执行,回滚 log.warn("未找到事务日志,返回 ROLLBACK,transactionId={}", transactionId); return RocketMQLocalTransactionState.ROLLBACK; 

      }

      TransactionLog log = logOpt.get();

      // 根据日志状态返回 switch (log.getStatus()) {

      case 1: // 成功 log.info("事务状态:COMMIT,transactionId={}", transactionId); return RocketMQLocalTransactionState.COMMIT; case 2: // 失败 log.info("事务状态:ROLLBACK,transactionId={}", transactionId); return RocketMQLocalTransactionState.ROLLBACK; default: // 执行中,等待下次回查 log.info("事务状态:UNKNOWN,等待下次回查,transactionId={}", transactionId); return RocketMQLocalTransactionState.UNKNOWN; 

      } } }

小讯
上一篇 2026-04-15 08:40
下一篇 2026-04-15 08:38

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/258122.html