kafka保证数据有序性小结

kafka保证数据有序性小结最近 项目中使用过 kafka 但是不太理解 然后各种搜博客补习 然后对 kafka 如何保证数据的有序性很感兴趣 于是乎 又疯狂找博客学习 现在可以说是小有心得 在这里记录一下 怕忘记 也作为给大家的一个分享 本文内容为集多家之长 根据自己的理解就诞生了这篇内容 开始

大家好,我是讯享网,很高兴认识大家。

最近,项目中使用过kafka但是不太理解,然后各种搜博客补习。然后对kafka如何保证数据的有序性很感兴趣,于是乎,又疯狂找博客学习,现在可以说是小有心得,在这里记录一下,怕忘记。也作为给大家的一个分享。本文内容为集多家之长,根据自己的理解就诞生了这篇内容,开始。

 

自己在学习的过程中,看完博客结合自己理解的小结如下:

研究如何保障kafka消费的顺序性,宗旨就是通过将消息绑定到定向的分区或者队列来保证顺序性,通过增加分区或者线程来提升消费能力。

   1.要保证生产者发消息发的是顺序性的消息,这个好解决,发消息的时候指定一下key相同的key会发送到一个分区中,而分区时有序的在发消息的时候多个操作(下单,支付)保证顺序的话,保证这些操作在一个topic下的同一个分区,topic好指定,分区也好指定发送的时候指定key相同的key会去到同一个分区中,kafkaTemplate.send(topic,partition,key,data)有参数可以指定key。意思就是保证发消息的时候有序,发到同一个分区中,那么消费的时候也是有序的,因为分区可以理解为一个队列,消息先进先出。

2.单线程有序的话,发的时候有序你消费的时候直接消费也是有序的,但是这样当消息量很大的时候,单线程消费的肯定很慢,这个时候你只能扩展分区,扩展消费者(这个时候就是指定分区消费了),效率会高一些,但是高到一定程度的时候你还想扩展只能加机器了,做集群,这样成本就提升了

     3.第三种就是老板不让加机器,要从技术层面解决问题,那只能开多线程消费了,生产者还是按照key发消息不变,消费者拿到消息之后开个多线程进行消费,虽然是拿到消息的时候是有序的,但是线程有快慢,线程处理的速度不同,会导致有序性打乱了。这时候想办法解决这个问题,模仿kafka分区的方法,消费者拿到消息之后,消费之前把消息按照key的hash值取模,放到队列中,这个时候保证了队列中的消息有序

然后,让线程池消费队列中的消息即可。但是第三种没有跑通,还得思考。8.1号跑通了,搜了其他的文章,知道了是那个类实现了ApplicationRunner接口,重写public void run(ApplicationArguments args) {方法,在这个带参数的run方法中,开了两个线程去执行消费的任务,作用就是初始化内容的,实在applicaion的main方法执行完之后要立即执行这个ApplicationRunner接口的实现方法的run方法。在这个方法中可以初始化你想提前加载的信息。

理论知识结束,上代码:

实现顺序性原理:消息发送的时候保证有序,设置相同的key会把消息投递到同一个分区的topic中,再由一个消费者来消费该分区topic

topic: "topic_query_p3r1" 分配了三个partition分区


讯享网

producer 投递顺序消息

   同一组行为设置相同的key,会把这组数据投递到同一分区topic中。

/ * 投递顺序性消息,根据用户id做取模推送到不同分区的topic中 * 相同的key推送到同一分区中 * 第一个参数:topic * 第二个参数:key * 第三个参数:发送的消息内容 * 三个参数全部是sring类型 */ @RequestMapping("/kafka2") public String testKafka2() { for (int userId = 0; userId < 300; userId++) { kafkaTemplate.send("topic_query_p3r1", userId + "", "insert" + userId); kafkaTemplate.send("topic_query_p3r1", userId + "", "update" + userId); kafkaTemplate.send("topic_query_p3r1", userId + "", "delete" + userId); } return null; } 

讯享网

consumer 消费顺序消息

前提是生产者发消息的时候指定key了

方式1 - 直接进行消费

   因为投递的相同行为的消息是有序的,所以直接消费也不会有问题。

讯享网/ * 消费topic_query_p3r1主题,ConsumerGroupId1消费组 */ @KafkaListener(topics = "topic_query_p3r1", groupId = "ConsumerGroupId1") public void p3r2ConsumerGroupId0(ConsumerRecord<?, ?> consumer) throws InterruptedException { System.out.println("消费者A topic名称:" + consumer.topic() + ", key:" + consumer.key() + ", value:" + consumer.value() + ", 分区位置:" + consumer.partition() + ", 下标" + consumer.offset()+" "+Thread.currentThread().getId()); Thread.sleep(10); } 

方式2.1 - 一个消费者来指定具体分区进行消费

   指定具体分区来进行消费。

 / * 消费者,解决消息顺序性 * 注解参数:partitions=0表示:只消费该主题中0分区的数据。 */ @KafkaListener(topicPartitions = {@TopicPartition(topic = "topic_query_p3r1", partitions = {"0"})}, groupId = "ConsumerGroupId1") public void receive(ConsumerRecord<?, ?> consumer) { System.out.println("消费者C topic名称:" + consumer.topic() + ",key:" + consumer.key() + "," + ",value:" + consumer.value() + "," + "分区位置:" + consumer.partition() + ", 下标" + consumer.offset()); } 

方式2.2 - 多个消费者来指定不同分区进行消费。

   写多个消费者方法来分别指向不同分区,提高消费速度,但是此方法不灵活。

讯享网/ * 消费0分区的topic_query_p3r1主题消费者,ConsumerGroupId1消费组 */ @KafkaListener(topicPartitions = {@TopicPartition(topic = "topic_query_p3r1", partitions = {"0"})}, groupId = "ConsumerGroupId1") public void p3r2ConsumerGroupId0(ConsumerRecord<?, ?> consumer) throws InterruptedException { System.out.println("消费者A topic名称:" + consumer.topic() + ", key:" + consumer.key() + ", value:" + consumer.value() + ", 分区位置:" + consumer.partition() + ", 下标" + consumer.offset()+" "+Thread.currentThread().getId()); Thread.sleep(10); } / * 消费1分区的topic_query_p3r1主题消费者,ConsumerGroupId1消费组 */ @KafkaListener(topicPartitions = {@TopicPartition(topic = "topic_query_p3r1", partitions = {"1"})}, groupId = "ConsumerGroupId1") public void p3r2ConsumerGroupId1(ConsumerRecord<?, ?> consumer) throws InterruptedException { System.out.println("消费者A topic名称:" + consumer.topic() + ", key:" + consumer.key() + ", value:" + consumer.value() + ", 分区位置:" + consumer.partition() + ", 下标" + consumer.offset()+" "+Thread.currentThread().getId()); Thread.sleep(10); } / * 消费2分区的topic_query_p3r1主题消费者,ConsumerGroupId1消费组 */ @KafkaListener(topicPartitions = {@TopicPartition(topic = "topic_query_p3r1", partitions = {"2"})}, groupId = "ConsumerGroupId1") public void p3r2ConsumerGroupId2(ConsumerRecord<?, ?> consumer) throws InterruptedException { System.out.println("消费者A topic名称:" + consumer.topic() + ", key:" + consumer.key() + ", value:" + consumer.value() + ", 分区位置:" + consumer.partition() + ", 下标" + consumer.offset()+" "+Thread.currentThread().getId()); Thread.sleep(10); } 

多线程顺序消费

可以看出图都是拿别人的,但是这张图很棒,描述的很清楚

 这段代码把我坑死了,原文是没有implements ApplicationRunner {这段代码的直接开始内容,我弄到我本地根本跑不起来,还报错,最后多方查,看到重写的

run(ApplicationArguments args) { 这个方法带参数,才反推出来时实现了ApplicationRunner 接口,不过兄弟们,这个雷我踩过了,放心食用吧。
讯享网@RestController @Slf4j public class ShunXuConsumerMoreThread implements ApplicationRunner { @Autowired private KafkaTemplate<String, String> kafkaTemplate; // 使用两个内存队列 final int queueLingth = 2; // 创建两个内存队列 Queue<Map> queueA = new ConcurrentLinkedQueue<>(); Queue<Map> queueB = new ConcurrentLinkedQueue<>(); / * 投递顺序性消息,根据用户id做取模推送到不同分区的topic中 * 相同的key推送到相同的分区中 */ @RequestMapping("/kafka2") public String testKafka2() { for (int userId = 0; userId < 300; userId++) { kafkaTemplate.send("topic_query_2", userId + "", "insert" + userId); kafkaTemplate.send("topic_query_2", userId + "", "update" + userId); kafkaTemplate.send("topic_query_2", userId + "", "delete" + userId); } return null; } / * 主题消费者-把相同行为的数据放到同一内存队列中 */ @KafkaListener(topics = "topic_query_2", groupId = "ConsumerGroupId1") public void p3r2ConsumerGroupId0(ConsumerRecord<?, ?> consumer){ // 1.封装消息参数 Map param = new HashMap(); param.put("topic", consumer.topic()); param.put("key", consumer.key()); param.put("value", consumer.value()); param.put("p", consumer.partition()); // 2.把相同行为(key)数据添加到同一内存队列中 int queueHash = consumer.key().hashCode() % queueLingth; if (queueHash == 0) { queueA.add(param); } if (queueHash == 1) { queueB.add(param); } } // 开启两个线程消费内存队列中的消息 ApplicationRunner接口常用于项目启动后,(也就是ApringApplication.run()执行结束),立马执行某些逻辑。 //这里是立即启动两个线程 @Override public void run(ApplicationArguments args) { new Thread() { @Override public void run() { while (true) { if (queueA.size() > 0) { Map poll = queueA.poll(); //业务逻辑 System.out.println("Thrend-Id: " + Thread.currentThread().getId() + " topic:" + poll.get("topic") + " key:" + poll.get("key") + " value:" + poll.get("value") + " partition:" + poll.get("p")); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } } }.start(); new Thread() { @Override public void run() { while (true) { if (queueB.size() > 0) { Map poll = queueB.poll(); //业务逻辑 System.out.println("Thrend-Id: " + Thread.currentThread().getId() + " topic:" + poll.get("topic") + " key:" + poll.get("key") + " value:" + poll.get("value") + " partition:" + poll.get("p")); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } } }.start(); } } 

打印:insertupdatedelete都是有序的。相同行为都在同一线程下执行。

消费是按照顺序的,正常!!!

参考博客:kafka顺序性投递,顺序性消费代码_祁_z的博客-CSDN博客_kafka顺序消费代码 

小讯
上一篇 2025-02-23 20:54
下一篇 2025-03-15 11:47

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/116316.html