一、应用场景
使用消息转发可以实现业务解耦,这种转发基于kafka的同步请求方式。
二、使用实践
1. 版本要求
需要支持ReplyingKafkaTemplate的spring版本,即 2.1.3 及以上版本,kafka版本无要求
2. 注意事项
暂不支持配置文件方式(yml\properties),需要java代码实现config
3. Producer
3.1 producer配置
核心点:
配置ReplyingKafkaTemplate,其repliesContainer中设置REPLY_TOPIC:“REPLY_ASYN_MESSAGE”
配置示例:
@Configuration @EnableKafka public class KafkaProducerConfig {
/ * 同步的kafka需要ReplyingKafkaTemplate,指定repliesContainer * @param producerFactory * @param repliesContainer * @return */ @Bean public ReplyingKafkaTemplate<String, String, String> replyingTemplate( ProducerFactory<String, String> producerFactory, ConcurrentMessageListenerContainer<String, String> repliesContainer) {
ReplyingKafkaTemplate template = new ReplyingKafkaTemplate<>(producerFactory, repliesContainer); //同步相应超时时间:10s template.setReplyTimeout(10000); return template; } @Bean public ProducerFactory<String,String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.128.100.100:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 3); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, ); //发送一次message最大大小,默认是1M,这里设置为20M //props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, ); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } / * 指定consumer返回数据到指定的topic * @return */ @Bean public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("REPLY_ASYN_MESSAGE"); repliesContainer.getContainerProperties().setGroupId("replies_message_group"); repliesContainer.setAutoStartup(false); return repliesContainer; } }
讯享网
3.2 producer发送
核心点:
设置ProducerRecord,其包含发送topic,header中设置reply_topic
发送消息代码:
讯享网@Component @Slf4j public class AsynchronousMessageProducer {
@Autowired private ReplyingKafkaTemplate replyingKafkaTemplate; / * 发送消息数据,同步返回结果 * @param paraMessageBO */ public String sendMessage(MessageBO paraMessageBO){
String returnValue = null; String message = null; try {
message = new ObjectMapper().writeValueAsString(paraMessageBO); log.info("同步发送消息数据start:" + message); //发送topic ProducerRecord<String, String> record = new ProducerRecord<>("ASYN_MESSAGE", message); //回复topic record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "REPLY_ASYN_MESSAGE".getBytes())); RequestReplyFuture<String, String, String> replyFuture = replyingKafkaTemplate.sendAndReceive(record); SendResult<String, String> sendResult = replyFuture.getSendFuture().get(); log.info("Sent ok: " + sendResult.getRecordMetadata()); ConsumerRecord<String, String> consumerRecord = replyFuture.get(); returnValue = consumerRecord.value(); log.info("Return value: " + returnValue); log.info("同步发送消息数据end。"); }catch (Exception e){
log.error("同步发送消息失败 MESSAGE:"+message,e.getMessage()); } return returnValue; } }
4. Consumer
4.1 consumer配置
核心点:
containerFactory中设置kafkaTemplate
配置代码示例:
@Configuration @EnableKafka public class KafkaConsumerConfig {
@Autowired private KafkaTemplate kafkaTemplate; @Bean ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); //设置kafkaTemplate支持sendTo factory.setReplyTemplate(kafkaTemplate); return factory; } @Bean public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.128.100.100:9092"); //默认的group_id props.put(ConsumerConfig.GROUP_ID_CONFIG, "message-group"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } }
4.2 consumer消费
核心点:
指定containerFactory,否则无法收到此条message,并加入@SendTo,否则无法返回给producer
消费消息并返回:
讯享网@Component @Slf4j public class MessageConsumer {
@KafkaListener(topics = "ASYN_MESSAGE",containerFactory = "containerFactory") @SendTo public String consumerAsyn(String receiveMessage){
return "I GOT IT!"; } }
三、实践说明
1. Consumer测试说明
| 监听了containerFactory | 未监听containerFactory | |
|---|---|---|
| 加了@SendTo | 则会只选择其中一个consumer进行消费,且作同步返回 | spring kafka会报错 |
| 未加@SendTo | 则会只选择其中一个consumer进行消费,无法返回,producer会有超时的提醒 | 无法消费,producer会有超时提醒 |
所以要监听containerFactory同时加了@SendTo,并且containerFactory中要设置kafkaTemplate
2.遇到问题
- 如若提示kafkaListenerContainerFactory无法注入(问题见:https://stackoverflow.com/questions//spring-kafka-consumerfactory-bean-not-found),需要new DefaultKafkaProducerFactory<>(producerConfigs())
- 如若提示consumer确少group-id,需要在每个listener中指定groupId,eg;
@KafkaListener(topics = “IMAGE_MESSAGE”,groupId = “image-message-group”)
3. 注意事项
kafka应用中,通常会有多个consumer监听同一个topic(无论是不同应用作为消费端,还是同一个应用集群部署造成的多consumer),但是在同步消息返回这块,为了consumer的同步处理,需要有一样的逻辑处理代码,来保证producer得到一致的结果。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/123852.html