在上篇文章中我们介绍了使用TiDB Binlog将数据同步至下游的Mysql 中,本篇我们学习下使用TiDB Binlog工具将数据同步至Kafka中自定义业务逻辑,比如可以做TIDB和ES、MongoDB 或 Redis的数据同步,这功能就和Canal解析Mysql的binlog功能相差不大。如果还不了解TiDB Binlog工具的也可以参考我的上篇博客:
注意:在做实验前,请确保已经配置好Kafka环境:不了解的可以参考下面一篇我的博客:
消息中间件KafKa集群搭建与使用:
在上篇文章中,我们使用tiup 扩容出了一个pump 和 一个 drainer,我们先看下现在的集群架构:

下面我们再讲下扩容的方式,没有安装pump 和 drainer的就用看下面的方式:
编写扩容配置
写入以下内容:
注意 storage.stop-write-at-available-space 这个参数表示存储空间低于指定值时不再接收 binlog 写入请求,默认为10G ,如果硬盘没这么大,就调小一点。
开始扩容:
等待一会就可以看到集群中已经有pump 和 drainer了:

修改 server_configs 的配制:

使用mysql 客户端连接tidb,查看bnlog是否已经开启:

看下pump和drainer的状态:



下载官方demo
https://github.com/pingcap/tidb-tools/tree/master/tidb-binlog/driver/example/kafkaReader
官方demo是直接用的Java Kafka Api,本篇我们使用SpringBoot 的 spring-kafka 。

POM文件引入的主要依赖:
application配制信息:
注意consumer.value-deserializer这个要使用ByteArrayDeserializer,主要发送端就是byte[],我们只能配合:
日志监听:

添加数据:
BinLogInfo.Binlog toString信息:



BinLogInfo.Binlog toString信息:


版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/140691.html