Common Kafka Operations
1. List the topics stored in ZooKeeper
kafka-topics.sh --zookeeper localhost:2181/myKafka --list
2. Produce messages to the broker
kafka-console-producer.sh --broker-list 172.16.131.130:9000 --topic topic_1
(The address 172.16.131.130:9000 is used because config/server.properties sets listeners=PLAINTEXT://172.16.131.130:9000.)
3. Consume messages from the broker
kafka-console-consumer.sh --bootstrap-server 172.16.131.130:9000 --topic topic_1 --from-beginning
(Again, 172.16.131.130:9000 comes from listeners=PLAINTEXT://172.16.131.130:9000 in config/server.properties.)
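Both console tools have to point at the address the broker actually listens on, so it can help to read that address from the broker configuration instead of typing it by hand. The following is only a sketch: listener_address is a hypothetical helper, not a Kafka tool, and it assumes a single-line listeners= entry like the one quoted above.

```shell
# Hypothetical helper (not part of Kafka): read server.properties on stdin
# and print the host:port of the first configured listener.
listener_address() {
  # Match only lines that start with "listeners=" (commented-out lines and
  # advertised.listeners are ignored), drop the protocol prefix, and stop
  # at the first comma in case several listeners are configured.
  sed -n 's|^listeners=[A-Za-z_0-9]*://\([^,]*\).*|\1|p' | head -n 1
}

# Usage sketch (path taken from the text above):
#   addr=$(listener_address < config/server.properties)
#   kafka-console-producer.sh --broker-list "$addr" --topic topic_1
```

If the listeners line is missing or commented out, the helper prints nothing, so the caller should check for an empty result before using it.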
4. Describe a topic
[root@Linux122 bin]# kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic tp_user_01
Topic:tp_user_01 PartitionCount:3 ReplicationFactor:1 Configs:
    Topic: tp_user_01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
    Topic: tp_user_01 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
    Topic: tp_user_01 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
5. Inspect a created topic in ZooKeeper
The configuration of a created topic is stored in ZooKeeper under /myKafka/config/topics:
[zk: localhost:2181(CONNECTED) 6] ls /myKafka/config/topics/tp_user_01
[]
[zk: localhost:2181(CONNECTED) 7] get /myKafka/config/topics/tp_user_01
{"version":1,"config":{}}
cZxid = 0x236
ctime = Sat Nov 20 21:38:30 CST 2021
mZxid = 0x236
mtime = Sat Nov 20 21:38:30 CST 2021
pZxid = 0x236
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 25
numChildren = 0
6. Create a topic with configuration overrides
[root@Linux122 topic_1-0]# kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic tp_test02 --partitions 2 --replication-factor 1 --config cleanup.policy=compact
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "tp_test02".
The override is then visible in the topic's node in ZooKeeper:
[zk: localhost:2181(CONNECTED) 11] get /myKafka/config/topics/tp_test02
{"version":1,"config":{"cleanup.policy":"compact"}}
cZxid = 0x2e1
ctime = Tue Nov 23 22:51:43 CST 2021
mZxid = 0x2e1
mtime = Tue Nov 23 22:51:43 CST 2021
pZxid = 0x2e1
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 51
numChildren = 0
Create a topic with several configuration overrides:
[root@Linux122 kafka-logs]# kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic tp_test03 --partitions 5 --replication-factor 1 --config compression.type=gzip --config max.message.bytes=512
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "tp_test03".
[zk: localhost:2181(CONNECTED) 12] get /myKafka/config/topics/tp_test03
{"version":1,"config":{"max.message.bytes":"512","compression.type":"gzip"}}
cZxid = 0x2ef
ctime = Tue Nov 23 23:00:15 CST 2021
mZxid = 0x2ef
mtime = Tue Nov 23 23:00:15 CST 2021
pZxid = 0x2ef
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 76
numChildren = 0
7. List the topics that override the default configuration
[root@Linux122 kafka-logs]# kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topics-with-overrides
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=,cleanup.policy=compact,compression.type=producer
Topic:tp_test02 PartitionCount:2 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic:tp_test03 PartitionCount:5 ReplicationFactor:1 Configs:compression.type=gzip,max.message.bytes=512
8. Add another override to a topic that already has overrides
[root@Linux122 kafka-logs]# kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic tp_test02 --config segment.bytes=
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "tp_test02".
[zk: localhost:2181(CONNECTED) 13] get /myKafka/config/topics/tp_test02
{"version":1,"config":{"segment.bytes":"","cleanup.policy":"compact"}}
cZxid = 0x2e1
ctime = Tue Nov 23 22:51:43 CST 2021
mZxid = 0x306
mtime = Tue Nov 23 23:12:53 CST 2021
pZxid = 0x2e1
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 79
numChildren = 0
9. Delete an existing override
[root@Linux122 kafka-logs]# kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config max.message.bytes --topic tp_test03
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "tp_test03".
[root@Linux122 kafka-logs]#
tp_test03 no longer has the max.message.bytes override:
[root@Linux122 kafka-logs]# kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topics-with-overrides
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=,cleanup.policy=compact,compression.type=producer
Topic:tp_test02 PartitionCount:2 ReplicationFactor:1 Configs:segment.bytes=,cleanup.policy=compact
Topic:tp_test03 PartitionCount:5 ReplicationFactor:1 Configs:compression.type=gzip
10. Delete a topic (before Kafka 1.0.0, delete.topic.enable defaulted to false, so a topic was only marked for deletion unless that setting was enabled; with it enabled, the topic is first marked for deletion and then actually removed)
[root@Linux122 kafka-logs]# kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic tp_test03
Topic tp_test03 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@Linux122 kafka-logs]# ls
cleaner-offset-checkpoint __consumer_offsets-45 __consumer_offsets-0 __consumer_offsets-46 __consumer_offsets-1 __consumer_offsets-47 __consumer_offsets-10 __consumer_offsets-48 __consumer_offsets-11 __consumer_offsets-49 __consumer_offsets-12 __consumer_offsets-5 __consumer_offsets-13 __consumer_offsets-6 __consumer_offsets-14 __consumer_offsets-7 __consumer_offsets-15 __consumer_offsets-8 __consumer_offsets-16 __consumer_offsets-9 __consumer_offsets-17 log-start-offset-checkpoint __consumer_offsets-18 meta.properties __consumer_offsets-19 nptc-01-0 __consumer_offsets-2 nptc-02-0 __consumer_offsets-20 nptc-02-1 __consumer_offsets-21 nptc-02-2 __consumer_offsets-22 nptc-02-3 __consumer_offsets-23 nptc-02-4 __consumer_offsets-24 recovery-point-offset-checkpoint __consumer_offsets-25 replication-offset-checkpoint __consumer_offsets-26 topic_1-0 __consumer_offsets-27 topic_2-0 __consumer_offsets-28 topic_2-1 __consumer_offsets-29 topic_2-2 __consumer_offsets-3 topic_2-3 __consumer_offsets-30 topic_2-4 __consumer_offsets-31 topic-spring-01-0 __consumer_offsets-32 tp_demo_01-0 __consumer_offsets-33 tp_test02-0 __consumer_offsets-34 tp_test02-1 __consumer_offsets-35 tp_test03-0.a1a10bd94eaf46c58f2b60f5068f2fd9-delete __consumer_offsets-36 tp_test03-1.2a6a4a2749b94789ba32718ac80e050e-delete __consumer_offsets-37 tp_test03-2.69301aa491eea24c1400d7b6-delete __consumer_offsets-38 tp_test03-3.53d1a7e0e8ac4079b40688dd72-delete __consumer_offsets-39 tp_test03-4.a57daa78676d409da4b1ba9d705e98fd-delete __consumer_offsets-4 tp_user_01-0 __consumer_offsets-40 tp_user_01-1 __consumer_offsets-41 tp_user_01-2 __consumer_offsets-42 tp_user_02-0 __consumer_offsets-43 tp_user_03-0 __consumer_offsets-44
The tp_test03 partition directories are first renamed with a -delete suffix; a second ls shortly afterwards shows that they are gone:
[root@Linux122 kafka-logs]# ls
cleaner-offset-checkpoint __consumer_offsets-42 __consumer_offsets-0 __consumer_offsets-43 __consumer_offsets-1 __consumer_offsets-44 __consumer_offsets-10 __consumer_offsets-45 __consumer_offsets-11 __consumer_offsets-46 __consumer_offsets-12 __consumer_offsets-47 __consumer_offsets-13 __consumer_offsets-48 __consumer_offsets-14 __consumer_offsets-49 __consumer_offsets-15 __consumer_offsets-5 __consumer_offsets-16 __consumer_offsets-6 __consumer_offsets-17 __consumer_offsets-7 __consumer_offsets-18 __consumer_offsets-8 __consumer_offsets-19 __consumer_offsets-9 __consumer_offsets-2 log-start-offset-checkpoint __consumer_offsets-20 meta.properties __consumer_offsets-21 nptc-01-0 __consumer_offsets-22 nptc-02-0 __consumer_offsets-23 nptc-02-1 __consumer_offsets-24 nptc-02-2 __consumer_offsets-25 nptc-02-3 __consumer_offsets-26 nptc-02-4 __consumer_offsets-27 recovery-point-offset-checkpoint __consumer_offsets-28 replication-offset-checkpoint __consumer_offsets-29 topic_1-0 __consumer_offsets-3 topic_2-0 __consumer_offsets-30 topic_2-1 __consumer_offsets-31 topic_2-2 __consumer_offsets-32 topic_2-3 __consumer_offsets-33 topic_2-4 __consumer_offsets-34 topic-spring-01-0 __consumer_offsets-35 tp_demo_01-0 __consumer_offsets-36 tp_test02-0 __consumer_offsets-37 tp_test02-1 __consumer_offsets-38 tp_user_01-0 __consumer_offsets-39 tp_user_01-1 __consumer_offsets-4 tp_user_01-2 __consumer_offsets-40 tp_user_02-0 __consumer_offsets-41 tp_user_03-0 [root@Linux122 kafka-logs]#
The corresponding topic node is removed from ZooKeeper as well:
[zk: localhost:2181(CONNECTED) 17] get /myKafka/config/topics/tp_test03
Node does not exist: /myKafka/config/topics/tp_test03
[zk: localhost:2181(CONNECTED) 18]
11. Change the partition count of an existing topic (tp_test03 here is the topic recreated with one partition by the command in step 12)
[root@Linux122 kafka-logs]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --alter --topic tp_test03 --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@Linux122 kafka-logs]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --describe --topic tp_test03
Topic:tp_test03 PartitionCount:3 ReplicationFactor:1 Configs:
    Topic: tp_test03 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
    Topic: tp_test03 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
    Topic: tp_test03 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Trying to set a partition count lower than the current one fails:
[root@Linux122 kafka-logs]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --alter --topic tp_test03 --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic tp_test03 currently has 3 partitions, 2 would not be an increase.
[2021-11-23 23:47:20,218] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic tp_test03 currently has 3 partitions, 2 would not be an increase.
 (kafka.admin.TopicCommand$)
In short: the number of partitions for a topic can only be increased.
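Because a decrease is rejected only after the command has been sent, a script can check the current count first. The sketch below assumes the describe output format shown above; current_partitions and is_partition_increase are hypothetical helper names, not Kafka commands.

```shell
# Hypothetical helpers, not part of Kafka.

# Read `kafka-topics.sh --describe` output on stdin and print the value of
# PartitionCount from the summary line.
current_partitions() {
  sed -n 's/.*PartitionCount:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Succeed only if the requested count ($2) is larger than the current one ($1).
is_partition_increase() {
  [ "$2" -gt "$1" ]
}

# Usage sketch (connection string and topic taken from the text above):
#   now=$(kafka-topics.sh --zookeeper Linux122:2181/myKafka --describe --topic tp_test03 | current_partitions)
#   is_partition_increase "$now" 5 && echo "safe to run --alter --partitions 5"
```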
12. Create a topic
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic tp_test03 --partitions 1 --replication-factor 1
13. Working with consumer offsets
1. List the consumer groups known to the Kafka broker
[root@Linux122 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server Linux122:9092 --list
Note: This will not show information about old Zookeeper-based consumers.
consumer1
The following steps inspect the consumer group named group.
2. Describe the consumer group (offsets, lag and so on)
[root@Linux122 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server Linux122:9092 --describe --group group
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'group' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tp_demo_02 1 0 0 0 - - -
tp_demo_02 0 0 0 0 - - -
tp_demo_02 2 0 0 0 - - -
[root@Linux122 kafka-logs]#
While a consumer in the group is running, the same command also shows the member that owns each partition:
[root@Linux122 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server Linux122:9092 --describe --group group
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tp_demo_02 0 1291 1302 11 consumer-group-1-45a37552-e0a3-4cc1-beee-88f7a0540a6b/172.16.131.1 consumer-group-1
tp_demo_02 1 1265 1272 7 consumer-group-1-45a37552-e0a3-4cc1-beee-88f7a0540a6b/172.16.131.1 consumer-group-1
tp_demo_02 2 1288 1299 11 consumer-group-1-45a37552-e0a3-4cc1-beee-88f7a0540a6b/172.16.131.1 consumer-group-1
[root@Linux122 kafka-logs]#
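LAG is simply LOG-END-OFFSET minus CURRENT-OFFSET for each partition (1302 - 1291 = 11 for partition 0 above). To get one backlog figure for the whole group, the LAG column can be summed. This is a minimal sketch that assumes the column layout shown above, with LAG as the fifth column; total_lag is a hypothetical helper name.

```shell
# Hypothetical helper: read `kafka-consumer-groups.sh --describe` output on
# stdin and print the sum of the LAG column.
total_lag() {
  awk '
    $1 == "TOPIC" { in_table = 1; next }        # the header row starts the table
    in_table && $5 ~ /^[0-9]+$/ { sum += $5 }   # add numeric LAG values only
    END { print sum + 0 }                       # prints 0 when no rows matched
  '
}

# Usage sketch (server and group taken from the text above):
#   kafka-consumer-groups.sh --bootstrap-server Linux122:9092 --describe --group group | total_lag
```

Rows whose LAG is not a number (for example a dash) are skipped rather than treated as zero.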
3. Reset the group's offsets for a topic to the earliest position
[root@Linux122 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server Linux122:9092 --reset-offsets --group group --topic tp_demo_02 --to-earliest
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION NEW-OFFSET
tp_demo_02 2 0
tp_demo_02 1 0
tp_demo_02 0 0
Note that this does not actually reset the offsets; like an execution plan, it only previews the result.
To apply the reset, add --execute:
[root@Linux122 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server Linux122:9092 --reset-offsets --group group --topic tp_demo_02 --to-earliest --execute
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION NEW-OFFSET
tp_demo_02 2 0
tp_demo_02 1 0
tp_demo_02 0 0
[root@Linux122 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server Linux122:9092 --describe --group group
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'group' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tp_demo_02 1 0 1904 1904 - - -
tp_demo_02 0 0 1958 1958 - - -
tp_demo_02 2 0 1977 1977 - - -
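The preview-then-apply pattern is easy to get wrong when the two commands are typed separately. One way to keep them identical is to build both from the same arguments. The following is only a sketch: reset_offsets_cmd is a hypothetical helper that prints the command line instead of running it, and the "apply" keyword is an invented convention.

```shell
# Hypothetical helper: print the kafka-consumer-groups.sh command for a reset.
#   $1 bootstrap server   $2 group   $3 topic (optionally topic:partitions)
#   $4 target option such as --to-earliest or --to-latest
#   $5 pass "apply" to append --execute; anything else yields the preview form
reset_offsets_cmd() {
  cmd="kafka-consumer-groups.sh --bootstrap-server $1 --reset-offsets --group $2 --topic $3 $4"
  if [ "${5:-}" = "apply" ]; then
    cmd="$cmd --execute"
  fi
  printf '%s\n' "$cmd"
}

# Usage sketch: review the preview first, then print the applying form.
#   reset_offsets_cmd Linux122:9092 group tp_demo_02 --to-earliest
#   reset_offsets_cmd Linux122:9092 group tp_demo_02 --to-earliest apply
```

Printing rather than executing keeps the helper side-effect free; the printed line can be reviewed and then pasted into the shell.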
4. Shift the group's offset for one partition of a topic forward or backward by a number of positions
[root@Linux122 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server Linux122:9092 --reset-offsets --group group --topic tp_demo_02:0 --shift-by -10 --execute
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION NEW-OFFSET
tp_demo_02 0 1948
[root@Linux122 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server Linux122:9092 --describe --group group
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'group' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tp_demo_02 1 1904 1904 0 - - -
tp_demo_02 0 1948 1958 10 - - -
tp_demo_02 2 1977 1977 0 - - -
5. Move the group's offsets for a topic to the latest position
[root@Linux122 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server Linux122:9092 --reset-offsets --group group --topic tp_demo_02 --to-latest --execute
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION NEW-OFFSET
tp_demo_02 2 1977
tp_demo_02 1 1904
tp_demo_02 0 1958
[root@Linux122 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server Linux122:9092 --describe --group group
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'group' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tp_demo_02 1 1904 1904 0 - - -
tp_demo_02 0 1958 1958 0 - - -
tp_demo_02 2 1977 1977 0 - - -
6. Shift the group's offsets for several partitions of a topic at once
[root@Linux122 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server Linux122:9092 --reset-offsets --group group --topic tp_demo_02:0,2 --shift-by -100 --execute
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION NEW-OFFSET
tp_demo_02 0 1858
tp_demo_02 2 1877
[root@Linux122 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server Linux122:9092 --describe --group group
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'group' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tp_demo_02 1 1904 1904 0 - - -
tp_demo_02 0 1858 1958 100 - - -
tp_demo_02 2 1877 1977 100 - - -
14. Move a topic's partitions to a broker newly added to the cluster
First create a JSON file that names the topics whose partitions should be reassigned. (The file name has no special meaning and the .json suffix is optional; it is usually added for convenience, since only the content matters.)
vim topics-to-move.json
{"topics":[{"topic":"tp_demo_02"}],"version":1}
[root@Linux122 ~]# kafka-reassign-partitions.sh --zookeeper Linux122:2181/myKafka --topics-to-move-json-file topics-to-move.json --broker-list "0,1" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"tp_demo_02","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_demo_02","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_demo_02","partition":2,"replicas":[0],"log_dirs":["any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"tp_demo_02","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"tp_demo_02","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_demo_02","partition":2,"replicas":[0],"log_dirs":["any"]}]}
The current partition layout of tp_demo_02:
[root@Linux122 ~]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --describe --topic tp_demo_02
Topic:tp_demo_02 PartitionCount:3 ReplicationFactor:1 Configs:
    Topic: tp_demo_02 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
    Topic: tp_demo_02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
    Topic: tp_demo_02 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
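When several topics have to be moved, the topics-to-move.json file passed to --generate can be produced by a small function instead of being edited by hand. This is a sketch under the assumption that topic names contain no characters needing JSON escaping; topics_to_move_json is a hypothetical helper name.

```shell
# Hypothetical helper: print a topics-to-move JSON document for the topic
# names given as arguments. Names are inserted verbatim, so they must not
# contain quotes or backslashes.
topics_to_move_json() {
  sep=''
  printf '{"topics":['
  for topic in "$@"; do
    printf '%s{"topic":"%s"}' "$sep" "$topic"
    sep=','
  done
  printf '],"version":1}\n'
}

# Usage sketch (file name taken from the text above):
#   topics_to_move_json tp_demo_02 > topics-to-move.json
```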
Save the proposed reassignment generated by the tool into a file (topics-to-move-execute.json here) and execute it:
[root@Linux122 ~]# kafka-reassign-partitions.sh --zookeeper Linux122:2181/myKafka --reassignment-json-file topics-to-move-execute.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"tp_demo_02","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_demo_02","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_demo_02","partition":2,"replicas":[0],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
The partitions are now laid out as follows; partition 1 has been moved to the new broker:
[root@Linux122 ~]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --describe --topic tp_demo_02
Topic:tp_demo_02 PartitionCount:3 ReplicationFactor:1 Configs:
    Topic: tp_demo_02 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
    Topic: tp_demo_02 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
    Topic: tp_demo_02 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
The assignment file can also be written by hand, for example:
[root@Linux122 ~]# cat my-topics-to-move-execute.json
{"version":1,"partitions":[{"topic":"tp_demo_02","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_demo_02","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"tp_demo_02","partition":2,"replicas":[1],"log_dirs":["any"]}]}
Run the reassignment:
[root@Linux122 ~]# kafka-reassign-partitions.sh --zookeeper Linux122:2181/myKafka --reassignment-json-file my-topics-to-move-execute.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"tp_demo_02","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"tp_demo_02","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_demo_02","partition":2,"replicas":[0],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
[root@Linux122 ~]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --describe --topic tp_demo_02
Topic:tp_demo_02 PartitionCount:3 ReplicationFactor:1 Configs:
    Topic: tp_demo_02 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
    Topic: tp_demo_02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
    Topic: tp_demo_02 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
In production, moving partitions is a time-consuming operation, so the reassignment should also be verified:
[root@Linux122 ~]# kafka-reassign-partitions.sh --zookeeper Linux122:2181/myKafka --reassignment-json-file my-topics-to-move-execute.json --verify
Status of partition reassignment:
Reassignment of partition tp_demo_02-1 completed successfully
Reassignment of partition tp_demo_02-0 completed successfully
Reassignment of partition tp_demo_02-2 completed successfully
Look closely at what the execute command reports: "Successfully started reassignment of partitions." It only confirms that the move has started.
In production, --verify may therefore report some partitions as still in progress.
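Since --execute only starts the move, a script usually has to poll --verify until every partition is done. The sketch below is hedged: it assumes that unfinished partitions are reported with the words "in progress" and failed ones with "failed", which should be checked against the output of the Kafka version in use. reassignment_done and wait_for_reassignment are hypothetical helper names.

```shell
# Hypothetical helper: read `kafka-reassign-partitions.sh --verify` output on
# stdin; succeed only if at least one partition completed and none is
# reported as in progress or failed (wording assumed, see the note above).
reassignment_done() {
  report=$(cat)
  if printf '%s\n' "$report" | grep -qiE 'in[ -]progress|failed'; then
    return 1
  fi
  printf '%s\n' "$report" | grep -q 'completed successfully'
}

# Poll every 5 seconds until the reassignment has finished.
#   $1 ZooKeeper connect string   $2 reassignment JSON file
# Note: this loops forever if a partition keeps reporting failure.
wait_for_reassignment() {
  until kafka-reassign-partitions.sh --zookeeper "$1" \
          --reassignment-json-file "$2" --verify | reassignment_done; do
    sleep 5
  done
}

# Usage sketch (values taken from the text above):
#   wait_for_reassignment Linux122:2181/myKafka my-topics-to-move-execute.json
```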
15. Automatic rebalancing
Even when the partition assignment is specified at topic creation, brokers go down repeatedly over time, and the leader partitions can end up concentrated on a few brokers (the ones that happened to be up at that moment).
When that happens, leadership needs to be rebalanced across the brokers.
To simulate the situation:
Create a topic
[root@Linux122 ~]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --create --topic tp_demo_03 --replica-assignment "0:1,1:0,0:1"
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "tp_demo_03".
[root@Linux122 ~]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --describe --topic tp_demo_03
Topic:tp_demo_03 PartitionCount:3 ReplicationFactor:2 Configs:
    Topic: tp_demo_03 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: tp_demo_03 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
    Topic: tp_demo_03 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
Kill the Kafka process of broker 0
[root@Linux122 ~]# jps
3202 Kafka
11346 Jps
1235 -- process information unavailable
3077 QuorumPeerMain
3719 ZooKeeperMain
[root@Linux122 ~]# kill -9 3202
The follower replicas have now taken over as leader for every partition:
[root@Linux122 ~]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --describe --topic tp_demo_03
Topic:tp_demo_03 PartitionCount:3 ReplicationFactor:2 Configs:
    Topic: tp_demo_03 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
    Topic: tp_demo_03 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1
    Topic: tp_demo_03 Partition: 2 Leader: 1 Replicas: 0,1 Isr: 1
[root@Linux122 ~]#
Restart the Kafka process of broker 0
The partition leaders do not change; the earlier state is not restored.
The following command restores all topic partitions to the leaders they had when the topics were created:
[root@Linux122 ~]# kafka-preferred-replica-election.sh --zookeeper Linux122:2181/myKafka
Created preferred replica election path with {"version":1,"partitions":[{"topic":"nptc-02","partition":4},{"topic":"__consumer_offsets","partition":34},{"topic":"nptc-01","partition":0},{"topic":"__consumer_offsets","partition":36},{"topic":"topic_2","partition":4},{"topic":"__consumer_offsets","partition":27},{"topic":"tp_user_01","partition":0},{"topic":"__consumer_offsets","partition":1},{"topic":"__consumer_offsets","partition":20},{"topic":"__consumer_offsets","partition":7},{"topic":"__consumer_offsets","partition":42},{"topic":"tp_user_02","partition":0},{"topic":"tp_demo_02","partition":2},{"topic":"__consumer_offsets","partition":49},{"topic":"tp_demo_66","partition":0},{"topic":"topic_1","partition":0},{"topic":"nptc-02","partition":0},{"topic":"topic_2","partition":0},{"topic":"topic-spring-01","partition":0},{"topic":"__consumer_offsets","partition":4},{"topic":"__consumer_offsets","partition":33},{"topic":"tp_demo_03","partition":0},{"topic":"__consumer_offsets","partition":14},{"topic":"__consumer_offsets","partition":46},{"topic":"tp_test03","partition":0},{"topic":"__consumer_offsets","partition":24},{"topic":"__consumer_offsets","partition":28},{"topic":"tp_demo_02","partition":1},{"topic":"__consumer_offsets","partition":6},{"topic":"__consumer_offsets","partition":37},{"topic":"__consumer_offsets","partition":43},{"topic":"topic_2","partition":3},{"topic":"__consumer_offsets","partition":21},{"topic":"__consumer_offsets","partition":15},{"topic":"__consumer_offsets","partition":11},{"topic":"tp_demo_03","partition":1},{"topic":"__consumer_offsets","partition":30},{"topic":"nptc-02","partition":1},{"topic":"__consumer_offsets","partition":2},{"topic":"__consumer_offsets","partition":47},{"topic":"tp_demo_66","partition":2},{"topic":"__consumer_offsets","partition":25},{"topic":"tp_user_01","partition":2},{"topic":"__consumer_offsets","partition":29},{"topic":"tp_test02","partition":0},{"topic":"tp_test03","partition":1},{"topic":"tp_demo_66","partition":4},{"topic":"__consumer_offsets","partition":8},{"topic":"__consumer_offsets","partition":23},{"topic":"tp_demo_01","partition":0},{"topic":"__consumer_offsets","partition":40},{"topic":"__consumer_offsets","partition":31},{"topic":"tp_demo_03","partition":2},{"topic":"tp_demo_66","partition":3},{"topic":"__consumer_offsets","partition":19},{"topic":"__consumer_offsets","partition":16},{"topic":"__consumer_offsets","partition":38},{"topic":"tp_demo_02","partition":0},{"topic":"nptc-02","partition":2},{"topic":"topic_2","partition":2},{"topic":"__consumer_offsets","partition":44},{"topic":"tp_test03","partition":2},{"topic":"__consumer_offsets","partition":10},{"topic":"__consumer_offsets","partition":3},{"topic":"__consumer_offsets","partition":35},{"topic":"tp_user_03","partition":0},{"topic":"tp_test02","partition":1},{"topic":"__consumer_offsets","partition":26},{"topic":"__consumer_offsets","partition":39},{"topic":"tp_user_01","partition":1},{"topic":"__consumer_offsets","partition":13},{"topic":"__consumer_offsets","partition":17},{"topic":"tp_demo_66","partition":1},{"topic":"__consumer_offsets","partition":22},{"topic":"__consumer_offsets","partition":9},{"topic":"__consumer_offsets","partition":0},{"topic":"__consumer_offsets","partition":41},{"topic":"__consumer_offsets","partition":48},{"topic":"__consumer_offsets","partition":18},{"topic":"__consumer_offsets","partition":32},{"topic":"__consumer_offsets","partition":12},{"topic":"topic_2","partition":1},{"topic":"nptc-02","partition":3},{"topic":"__consumer_offsets","partition":45},{"topic":"__consumer_offsets","partition":5}]}
Successfully started preferred replica election for partitions Set(__consumer_offsets-32, __consumer_offsets-16, __consumer_offsets-49, __consumer_offsets-44, __consumer_offsets-28, tp_demo_02-2, tp_demo_66-2, topic_2-1, tp_user_01-1, __consumer_offsets-17, __consumer_offsets-23, __consumer_offsets-7, topic_2-2, nptc-02-2, nptc-02-3, __consumer_offsets-4, __consumer_offsets-29, __consumer_offsets-35, __consumer_offsets-3, topic_1-0, __consumer_offsets-24, __consumer_offsets-41, __consumer_offsets-0, __consumer_offsets-38, __consumer_offsets-13, __consumer_offsets-8, tp_demo_66-0, topic_2-4, __consumer_offsets-5, topic_2-3, tp_user_02-0, __consumer_offsets-39, __consumer_offsets-36, tp_user_01-0, tp_demo_66-4, __consumer_offsets-40, __consumer_offsets-45, __consumer_offsets-15, __consumer_offsets-33, __consumer_offsets-37, nptc-01-0, __consumer_offsets-21, tp_test03-1, tp_demo_03-1, tp_test02-0, __consumer_offsets-6, tp_test03-0, tp_demo_66-3, nptc-02-0, __consumer_offsets-11, tp_user_01-2, __consumer_offsets-20, tp_demo_03-0, __consumer_offsets-47, tp_demo_03-2, __consumer_offsets-2, __consumer_offsets-27, nptc-02-4, tp_demo_01-0, __consumer_offsets-34, nptc-02-1, __consumer_offsets-9, __consumer_offsets-22, __consumer_offsets-42, tp_demo_66-1, __consumer_offsets-14, tp_test03-2, __consumer_offsets-25, __consumer_offsets-10, __consumer_offsets-48, __consumer_offsets-31, __consumer_offsets-18, topic_2-0, __consumer_offsets-19, tp_user_03-0, topic-spring-01-0, __consumer_offsets-12, __consumer_offsets-46, __consumer_offsets-43, __consumer_offsets-1, __consumer_offsets-26, tp_demo_02-0, __consumer_offsets-30, tp_test02-1, tp_demo_02-1)
The topic is now back in the state it had when it was created (rebalanced):
[root@Linux122 ~]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --describe --topic tp_demo_03
Topic:tp_demo_03 PartitionCount:3 ReplicationFactor:2 Configs:
    Topic: tp_demo_03 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 1,0
    Topic: tp_demo_03 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
    Topic: tp_demo_03 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 1,0
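The preferred replica of a partition is the first broker in its Replicas list, so partitions that need an election are exactly those whose Leader differs from that first entry. The sketch below finds them in describe output; it assumes the "Topic: ... Partition: ... Leader: ... Replicas: ..." row format shown above, and non_preferred_leaders is a hypothetical helper name.

```shell
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin and
# print topic-partition for every partition whose leader is not the first
# (preferred) replica.
non_preferred_leaders() {
  awk '
    {
      topic = partition = leader = replicas = ""
      # Pick up the value that follows each label on a partition row.
      for (i = 1; i < NF; i++) {
        if ($i == "Topic:")     topic = $(i + 1)
        if ($i == "Partition:") partition = $(i + 1)
        if ($i == "Leader:")    leader = $(i + 1)
        if ($i == "Replicas:")  replicas = $(i + 1)
      }
      split(replicas, r, ",")
      # Summary lines carry no "Partition:" label and are skipped.
      if (partition != "" && leader != r[1]) print topic "-" partition
    }
  '
}

# Usage sketch (connection string and topic taken from the text above):
#   kafka-topics.sh --zookeeper Linux122:2181/myKafka --describe --topic tp_demo_03 | non_preferred_leaders
```

Fed the describe output taken right after broker 0 was killed, this lists tp_demo_03-0 and tp_demo_03-2, the two partitions whose preferred replica is broker 0.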
In addition, as the script's usage text shows, the election can be restricted to specific topics and partitions:
[root@Linux122 ~]# kafka-preferred-replica-election.sh
This tool causes leadership for each partition to be transferred back to the 'preferred replica', it can be used to balance leadership among the servers.
Option / Description
--path-to-json-file <String: list of partitions for which preferred replica leader election needs to be triggered>
    The JSON file with the list of partitions for which preferred replica leader election should be done, in the following format -
    {"partitions": [{"topic": "foo", "partition": 1}, {"topic": "foobar", "partition": 2}]}
    Defaults to all existing partitions
--zookeeper <String: urls>
    REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
At this point broker 0 leads every partition of tp_demo_03 and is the only member of each Isr:
[root@Linux122 ~]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --describe --topic tp_demo_03
Topic:tp_demo_03 PartitionCount:3 ReplicationFactor:2 Configs:
    Topic: tp_demo_03 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0
    Topic: tp_demo_03 Partition: 1 Leader: 0 Replicas: 1,0 Isr: 0
    Topic: tp_demo_03 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0
Create a file that names the partitions to re-elect
[root@Linux122 ~]# vim preferred-replica.json
[root@Linux122 ~]# cat preferred-replica.json
{"partitions": [{"topic": "tp_demo_03", "partition": 1}]}
Start the Kafka broker on Linux121 and describe the topic again:
[root@Linux121 bin]# kafka-server-start.sh -daemon ../config/server.properties
[root@Linux122 ~]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --describe --topic tp_demo_03
Topic:tp_demo_03 PartitionCount:3 ReplicationFactor:2 Configs:
    Topic: tp_demo_03 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: tp_demo_03 Partition: 1 Leader: 0 Replicas: 1,0 Isr: 0,1
    Topic: tp_demo_03 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
[root@Linux122 ~]#
The Isr has recovered and includes broker 1 again.
Run the election with the partition file prepared above:
[root@Linux122 ~]# kafka-preferred-replica-election.sh --zookeeper Linux122:2181/myKafka --path-to-json-file preferred-replica.json
Created preferred replica election path with {"version":1,"partitions":[{"topic":"tp_demo_03","partition":1}]}
Successfully started preferred replica election for partitions Set(tp_demo_03-1)
[root@Linux122 ~]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --describe --topic tp_demo_03
Topic:tp_demo_03 PartitionCount:3 ReplicationFactor:2 Configs:
    Topic: tp_demo_03 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: tp_demo_03 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1
    Topic: tp_demo_03 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
The replication factor cannot be changed with --alter:
kafka-topics.sh --zookeeper Linux122:2181/myKafka --alter --topic tp_demo_03 --replication-factor 3
Option "[replication-factor]" can't be used with option"[alter]"
Create a topic with 3 partitions and a replication factor of 1:
[root@Linux122 ~]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --describe --topic tp_demo_04
Topic:tp_demo_04 PartitionCount:3 ReplicationFactor:1 Configs:
    Topic: tp_demo_04 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
    Topic: tp_demo_04 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
    Topic: tp_demo_04 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
[root@Linux122 ~]#
The topic above has a replication factor of 1.
We now change the replication factor to 2.
Create a file that describes the new replica assignment (if you have no specific assignment in mind, the reassignment script can propose one):
[root@Linux122 ~]# cat increment-replication-factor.json
{"version":1,"partitions":[{"topic":"tp_demo_04","partition":0,"replicas":[0,1]},{"topic":"tp_demo_04","partition":1,"replicas":[0,1]},{"topic":"tp_demo_04","partition":2,"replicas":[1,0]}]}
[root@Linux122 ~]# kafka-reassign-partitions.sh --zookeeper Linux122:2181/myKafka --reassignment-json-file increment-replication-factor.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"tp_demo_04","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_demo_04","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"tp_demo_04","partition":0,"replicas":[1],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
The replication factor of each partition has increased:
[root@Linux122 ~]# kafka-topics.sh --zookeeper Linux122:2181/myKafka --describe --topic tp_demo_04
Topic:tp_demo_04 PartitionCount:3 ReplicationFactor:2 Configs:
    Topic: tp_demo_04 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1,0
    Topic: tp_demo_04 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: tp_demo_04 Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0
[root@Linux122 ~]#
