2025年ReplyingKafkaTemplate的使用——Spring结合Kafka实现同步调用

ReplyingKafkaTemplate的使用——Spring结合Kafka实现同步调用参考 https docs spring io spring kafka reference html replying template 使用 Spring Request Reply 实现基于 Kafka 的同步请求响应 一 应用场景 使用消息转发可以实现业务解耦 这种转发基于 kafka 的同步请求方式 二 使用实践 1 版本要求

大家好,我是讯享网,很高兴认识大家。

一、应用场景

使用消息转发可以实现业务解耦,这种转发基于kafka的同步请求方式。

二、使用实践

1. 版本要求

需要支持ReplyingKafkaTemplate的spring版本,即 2.1.3 及以上版本,kafka版本无要求

2. 注意事项

暂不支持配置文件方式(yml\properties),需要java代码实现config

3. Producer
3.1 producer配置

核心点:

配置ReplyingKafkaTemplate,其repliesContainer中设置REPLY_TOPIC:“REPLY_ASYN_MESSAGE”

配置示例:

@Configuration @EnableKafka public class KafkaProducerConfig { 
    / * 同步的kafka需要ReplyingKafkaTemplate,指定repliesContainer * @param producerFactory * @param repliesContainer * @return */ @Bean public ReplyingKafkaTemplate<String, String, String> replyingTemplate( ProducerFactory<String, String> producerFactory, ConcurrentMessageListenerContainer<String, String> repliesContainer) { 
    ReplyingKafkaTemplate template = new ReplyingKafkaTemplate<>(producerFactory, repliesContainer); //同步相应超时时间:10s template.setReplyTimeout(10000); return template; } @Bean public ProducerFactory<String,String> producerFactory() { 
    return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { 
    Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.128.100.100:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 3); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, ); //发送一次message最大大小,默认是1M,这里设置为20M //props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, ); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } / * 指定consumer返回数据到指定的topic * @return */ @Bean public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) { 
    ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("REPLY_ASYN_MESSAGE"); repliesContainer.getContainerProperties().setGroupId("replies_message_group"); repliesContainer.setAutoStartup(false); return repliesContainer; } } 

讯享网
3.2 producer发送

核心点:

设置ProducerRecord,其包含发送topic,header中设置reply_topic


讯享网

发送消息代码:

讯享网@Component @Slf4j public class AsynchronousMessageProducer { 
    @Autowired private ReplyingKafkaTemplate replyingKafkaTemplate; / * 发送消息数据,同步返回结果 * @param paraMessageBO */ public String sendMessage(MessageBO paraMessageBO){ 
    String returnValue = null; String message = null; try { 
    message = new ObjectMapper().writeValueAsString(paraMessageBO); log.info("同步发送消息数据start:" + message); //发送topic ProducerRecord<String, String> record = new ProducerRecord<>("ASYN_MESSAGE", message); //回复topic record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "REPLY_ASYN_MESSAGE".getBytes())); RequestReplyFuture<String, String, String> replyFuture = replyingKafkaTemplate.sendAndReceive(record); SendResult<String, String> sendResult = replyFuture.getSendFuture().get(); log.info("Sent ok: " + sendResult.getRecordMetadata()); ConsumerRecord<String, String> consumerRecord = replyFuture.get(); returnValue = consumerRecord.value(); log.info("Return value: " + returnValue); log.info("同步发送消息数据end。"); }catch (Exception e){ 
    log.error("同步发送消息失败 MESSAGE:"+message,e.getMessage()); } return returnValue; } } 
4. Consumer
4.1 consumer配置

核心点:

containerFactory中设置kafkaTemplate

配置代码示例:

@Configuration @EnableKafka public class KafkaConsumerConfig { 
    @Autowired private KafkaTemplate kafkaTemplate; @Bean ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); //设置kafkaTemplate支持sendTo factory.setReplyTemplate(kafkaTemplate); return factory; } @Bean public Map<String, Object> consumerConfigs() { 
    Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.128.100.100:9092"); //默认的group_id props.put(ConsumerConfig.GROUP_ID_CONFIG, "message-group"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } } 
4.2 consumer消费

核心点:

指定containerFactory,否则无法收到此条message,并加入@SendTo,否则无法返回给producer

消费消息并返回:

讯享网@Component @Slf4j public class MessageConsumer { 
    @KafkaListener(topics = "ASYN_MESSAGE",containerFactory = "containerFactory") @SendTo public String consumerAsyn(String receiveMessage){ 
    return "I GOT IT!"; } } 

三、实践说明

1. Consumer测试说明
监听了containerFactory 未监听containerFactory
加了@SendTo 则会只选择其中一个consumer进行消费,且作同步返回 spring kafka会报错
未加@SendTo 则会只选择其中一个consumer进行消费,无法返回,producer会有超时的提醒 无法消费,producer会有超时提醒

所以要监听containerFactory同时加了@SendTo,并且containerFactory中要设置kafkaTemplate

2.遇到问题
  • 如若提示kafkaListenerContainerFactory无法注入(问题见:https://stackoverflow.com/questions//spring-kafka-consumerfactory-bean-not-found),需要new DefaultKafkaProducerFactory<>(producerConfigs())
  • 如若提示consumer确少group-id,需要在每个listener中指定groupId,eg;
    @KafkaListener(topics = “IMAGE_MESSAGE”,groupId = “image-message-group”)
3. 注意事项

kafka应用中,通常会有多个consumer监听同一个topic(无论是不同应用作为消费端,还是同一个应用集群部署造成的多consumer),但是在同步消息返回这块,为了consumer的同步处理,需要有一样的逻辑处理代码,来保证producer得到一致的结果

小讯
上一篇 2025-01-11 19:06
下一篇 2025-02-15 22:05

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/123852.html