package com.example.rocketmq.listener;
import com.example.rocketmq.dto.OrderDTO; import com.example.rocketmq.entity.Order; import com.example.rocketmq.entity.TransactionLog; import com.example.rocketmq.repository.OrderRepository; import com.example.rocketmq.repository.TransactionLogRepository; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional;
import java.util.Optional;
/
- 订单事务监听器 - 执行本地事务和回查 */ @Slf4j @RocketMQTransactionListener @Component public class OrderTransactionListener implements RocketMQLocalTransactionListener , orderId={}", transactionId, dto.getOrderId());
try ", transactionId); // 4. 返回提交 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("本地事务失败,transactionId={}", transactionId, e); // 更新事务日志状态为失败 OptionalexistingLog = logRepository.findByTransactionId(transactionId); if (existingLog.isPresent()) return RocketMQLocalTransactionState.ROLLBACK; } }
/
- 回查事务状态
- RocketMQ 会定时调用此方法检查未决事务
- @param msg MQ 消息
- @return 事务状态 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) ", transactionId);
// 查询事务日志 Optional
logOpt = logRepository.findByTransactionId(transactionId); if (!logOpt.isPresent()) {
// 没有日志,说明事务未执行,回滚 log.warn("未找到事务日志,返回 ROLLBACK,transactionId={}", transactionId); return RocketMQLocalTransactionState.ROLLBACK;}
TransactionLog log = logOpt.get();
// 根据日志状态返回 switch (log.getStatus()) {
case 1: // 成功 log.info("事务状态:COMMIT,transactionId={}", transactionId); return RocketMQLocalTransactionState.COMMIT; case 2: // 失败 log.info("事务状态:ROLLBACK,transactionId={}", transactionId); return RocketMQLocalTransactionState.ROLLBACK; default: // 执行中,等待下次回查 log.info("事务状态:UNKNOWN,等待下次回查,transactionId={}", transactionId); return RocketMQLocalTransactionState.UNKNOWN;} } }
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/258122.html