前言
offset顾名思义,即偏移量,我们知道消息从生产者发送到kafka的topic之后,是进入到不同的分区,在consumer未对消息进行消费之前,消息是有序存储在各个分区中;
offset内部原理
在之前我们了解了kafka的消费者原理之后,提出这样一个疑问,kafka怎么知道某个消费组中的消费者消费消息的进度呢?
1、从0.9版本开始,consumer默认将offset保存在Kafka ,一个内置的topic中,该topic为__consumer_offsets;
2、 Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中;
这也就是说,kafka是通过 offset这个值来管理消费组消费进度的,下面是一张关于kafka的offset的原理图;
关于offset做下面几点补充:
- __consumer_offsets 主题里面采用 key 和 value 的方式存储数据;
- key 是 group.id+topic+ 分区号,value 就是当前 offset 的值;
- 每隔一段时间,kafka 内部会对这个 topic 进行 compact,也就是每个 group.id+topic+分区号就保留最新数据;
默认情况下,保存offset数据的系统主题是看不到的,为了查看该系统主题数据,要将下面这个参数修改为false
exclude.internal.topics=false【在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false】
自动提交 offset
为了使我们能够专注于自己的业务逻辑, Kafka 提供了自动提交 offset的功能,自动提交 offset 的相关参数:
1、enable.auto.commit : 是否开启自动提交 offset 功能,默认是 true;
默认值为 true ,消费者会自动周期性地向服务器提交偏移量
2、auto.commit.interval.ms : 自动提交 offset 的时间间隔,默认是 5s;
如果设置了 enable.auto.commit 的值为 true , 则该值定义了消 费者偏移量向 Kafka 提交的频率, 默认 5s

代码展示
producer 端代码
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class OffsetProducer1 { public static void main(String[] args) throws Exception { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 3. 创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); System.out.println("开始发送数据"); // 4. 调用 send 方法,发送消息 for (int i = 0; i < 15; i++) { kafkaProducer.send(new ProducerRecord<>("zcy234","congge " + i)); } // 5. 关闭资源 kafkaProducer.close(); } }
讯享网
consumer 端代码
讯享网import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Properties; public class OffsetConsumer1 { public static void main(String[] args) { // 1. 创建 kafka 消费者配置类 Properties properties = new Properties(); // 2. 添加配置参数 // 添加连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092"); // 配置序列化 必须 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group2"); // 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 修改 提交 offset 的时间周期 1000ms,默认情况下为 5s properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); //3. 创建 kafka 消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); //4. 设置消费主题 形参是列表 consumer.subscribe(Arrays.asList("zcy234")); System.out.println("准备开始消费数据"); //5. 消费数据 while (true){ // 读取消息 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); // 输出消息 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.value()); } } } }
核心的代码即添加下面这两行配置
运行上面的程序,效果上面和之前差不多,


手动提交 offset
虽然自动提交 offset 十分简单便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因 此Kafka 还提供了手动提交 offset 的 API,关于手动提交offset,做如下几点说明:
- 手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交);
- 两者的相 同点是,都会将本次提交的一批数据最高的偏移量提交;
- 不同点是,同步提交阻塞当前线程,一直到提交成 功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故 有可能提交失败;
commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据;
commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了;


同步提交 offset
由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提
交的效率比较低。
下面看同步提交offset的consumer的完整代码:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class SyncConsumer1 { public static void main(String[] args) { // 1. 创建 kafka 消费者配置类 Properties properties = new Properties(); // 2. 添加配置参数 // 添加连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092"); // 配置序列化 必须 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group3"); // 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 修改 提交 offset 的时间周期 1000ms,默认情况下为 5s properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); //3. 创建 kafka 消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //3. 创建 kafka 消费者 //4. 设置消费主题 形参是列表 consumer.subscribe(Arrays.asList("zcy234")); System.out.println("准备开始消费数据"); //5. 消费数据 while (true){ // 读取消息 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); // 输出消息 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.value()); } // 同步提交 offset consumer.commitSync(); } } }
仍然使用上面的producer向zcy234这个topic发送几条消息,观察消费端控制台输出情况,仍然可以正常消费到消息;

异步提交 offset
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此 吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
下面是完整的代码
讯享网import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class AsyncConsumer1 { public static void main(String[] args) { // 1. 创建 kafka 消费者配置类 Properties properties = new Properties(); // 2. 添加配置参数 // 添加连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092"); // 配置序列化 必须 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 配置消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group5"); // 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); //3. 创建 Kafka 消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); //4. 设置消费主题 形参是列表 consumer.subscribe(Arrays.asList("zcy234")); //5. 消费数据 while (true) { // 读取消息 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); // 输出消息 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.value()); } // 异步提交 offset consumer.commitAsync(); } } }
仍然使用上面的producer向zcy234这个topic发送几条消息,观察消费端控制台输出情况,仍然可以正常消费到消息;

指定 Offset 消费
kafka中消费者在消费数据时的offset的机制有3种,默认情况下为latest,即从最近的那一次的位置开始消费;
auto.offset.reset = earliest | latest | none 默认是 latest
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量
时(例如该数据已被删除),该怎么办?
1、 earliest :自动将偏移量重置为最早的偏移量, --from-beginning;
2、latest (默认值) :自动将偏移量重置为最新偏移量;
3、none :如果未找到消费者组的先前偏移量,则向消费者抛出异常;

于是在实际业务中可能会遇到这么一种场景,即新的消费者并不想消费最早的那一批消息,而是指定从某个offset位置开始消费;
下面看具体的consumer端代码:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.*; public class SpecialOffsetConsumer1 { public static void main(String[] args) { // 0 配置信息 Properties properties = new Properties(); // 连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092"); // key value 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group6"); // 1 创建一个消费者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 订阅一个主题 ArrayList<String> topics = new ArrayList<>(); topics.add("zcy234"); kafkaConsumer.subscribe(topics); Set<TopicPartition> assignment = new HashSet<>(); // 获取消费者分区分配信息(有了分区分配信息才能开始消费),避免开始消费的时候分区信息还未就绪 while (assignment.size() == 0) { kafkaConsumer.poll(Duration.ofSeconds(1)); assignment = kafkaConsumer.assignment(); } // 遍历所有分区,并指定 offset 从 5 的位置开始消费 for (TopicPartition tp : assignment) { kafkaConsumer.seek(tp, 5); } System.out.println("准备开始消费数据"); // 3 消费该主题数据 while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }
运行这段代码,然后再次使用上面的producer发送消息,观察控制台输出效果,可以看到,数据消费的offset的位置从5开始

指定时间消费
需求:在生产环境中,比如说遇到最近消费的某一段时间的数据有异常,想重新按照时间消费?或者要求按照时间消费前一天的数据,怎么处理?
下面看具体的代码处理
讯享网import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.*; public class SpecialTimeConsumer1 { public static void main(String[] args) { // 0 配置信息 Properties properties = new Properties(); // 连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "101.34.23.80:9092"); // key value 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group7"); // 创建一个消费者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 订阅主题 ArrayList<String> topics = new ArrayList<>(); topics.add("zcy234"); kafkaConsumer.subscribe(topics); Set<TopicPartition> assignment = new HashSet<>(); while (assignment.size() == 0) { kafkaConsumer.poll(Duration.ofSeconds(1)); // 获取消费者分区分配信息(有了分区分配信息才能开始消费) assignment = kafkaConsumer.assignment(); } HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>(); // 封装集合存储,每个分区对应一天前的数据 for (TopicPartition topicPartition : assignment) { //用当前时间减去业务上需要回退的时间,比如这里想重新消费24个小时之前的数据 timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000); } // 获取从 1 天前开始消费的每个分区的 offset Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch); // 遍历每个分区,对每个分区设置消费时间。 for (TopicPartition topicPartition : assignment) { OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition); // 根据时间指定开始消费的位置 if (offsetAndTimestamp != null) { kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset()); } } } }

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/39635.html