Databend Kafka Connect
现在我们也为 Databend 提供了 Kafka Connect Sink Plugin,这篇文章我们将会介绍如何使用 MySQL JDBC Source Connector
和 Databend Sink Connector
构建实时的数据同步管道。
配置 Connector
MySQL Source Connector
1、安装 MySQL Source Connector Plugin
这里我们使用 Confluent 提供的 JDBC Source Connector。
从 Confluent hub 下载 Kafka Connect JDBC 插件并将 zip 文件解压到 /path/kafka/libs 目录下。
2、安装 MySQL JDBC Driver
因为 Connector 需要与数据库进行通信,所以还需要 JDBC 驱动程序。JDBC Connector 插件也没有内置 MySQL 驱动程序,需要我们单独下载驱动程序。MySQL 为许多平台提供了 JDBC 驱动程序。选择 Platform Independent 选项,然后下载压缩的 TAR 文件。该文件包含 JAR 文件和源代码。将此 tar.gz 文件的内容解压到一个临时目录。将 jar 文件(例如,mysql-connector-java-8.0.17.jar
),并且仅将此 JAR 文件复制到与 kafka-connect-jdbc jar 文件相同的 libs 目录下:
cp mysql-connector-j-8.0.32.jar /opt/homebrew/Cellar/kafka/3.4.0/libexec/libs/
3、配置 MySQL Connector
在 /path/kafka/config
下创建 mysql.properties
配置文件,并使用下面的配置:
name=test-source-mysql-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/mydb?useSSL=false
connection.user=root
connection.password=123456
#mode=timestamp+incrementing
mode=incrementing
table.whitelist=mydb.test_kafka
poll.interval.ms=1000
table.poll.interval.ms=3000
incrementing.column.name=id
#timestamp.column.name=tms
topics=test_kafka
针对配置我们这里重点介绍 mode , incrementing.column.name ,和 timestamp.column.name 几个字段。Kafka Connect MySQL JDBC Source 提供了三种增量同步模式:
incrementingtimestamptimestamp+incrementing
4、在 incrementing 模式下,每次都是根据 incrementing.column.name 参数指定的列,查询大于自上次拉取的最大id:
SELECT * FROM mydb.test_kafka
WHERE id > ?
ORDER BY id ASC
这种模式的缺点是无法捕获行上更新操作(例如,UPDATE、DELETE)的变更,因为无法增大该行的 id。
5、timestamp 模式基于表上时间戳列来检测是否是新行或者修改的行。该列最好是随着每次写入而更新,并且值是单调递增的。需要使用 timestamp.column.name 参数指定时间戳列。
需要注意的是时间戳列在数据表中不能设置为 Nullable.
在 timestamp 模式下,每次都是根据 timestamp.column.name 参数指定的列,查询大于自上次拉取成功的 gmt_modified:
SELECT * FROM mydb.test_kafka
WHERE tms > ? AND tms < ?
ORDER BY tms ASC
这种模式可以捕获行上 UPDATE 变更,缺点是可能造成数据的丢失。由于时间戳列不是唯一列字段,可能存在相同时间戳的两列或者多列,假设在导入第二条的过程中发生了崩溃,在恢复重新导入时,拥有相同时间戳的第二条以及后面几条数据都会丢失。这是因为第一条导入成功后,对应的时间戳会被记录已成功消费,恢复后会从大于该时间戳的记录开始同步。此外,也需要确保时间戳列是随着时间递增的,如果人为的修改时间戳列小于当前同步成功的最大时间戳,也会导致该变更不能同步。
- 仅使用 incrementing 或 timestamp 模式都存在缺陷。将 timestamp 和 incrementing 一起使用,可以充分利用 incrementing 模式不丢失数据的优点以及 timestamp 模式捕获更新操作变更的优点。需要使用 incrementing.column.name 参数指定严格递增列、使用 timestamp.column.name 参数指定时间戳列。
SELECT * FROM mydb.test_kafka
WHERE tms < ?
AND ((tms = ? AND id > ?) OR tms > ?)
ORDER BY tms, id ASC
由于 MySQL JDBC Source Connector 是基于 query-based 的数据获取方式,使用 SELECT 查询来检索数据,并没有复杂的机制来检测已删除的行,所以不支持 DELETE 操作。可以使用基于 log-based 的 [Kafka Connect Debezium]。
后面的演示中会分别演示上述模式的效果。更多的配置参数可以参考 MySQL Source Configs 。
Databend Kafka Connector
安装 OR 编译 Databend Kafka Connector
1、可以从源码编译得到 jar 或者从 release 直接下载。
git clone https://github.com/databendcloud/databend-kafka-connect.git & cd databend-kafka-connect
mvn -Passembly -Dmaven.test.skip package
将 databend-kafka-connect.jar
拷贝至 /path/kafka/libs
目录下。
2、安装 Databend JDBC Driver
从 Maven Central 下载最新的 Databend JDBC 并拷贝至 /path/kafka/libs 目录下。
3、配置 Databend Kafka Connector
在 /path/kafka/config
下创建 mysql.properties
配置文件,并使用下面的配置:
name=databend
connector.class=com.databend.kafka.connect.DatabendSinkConnector
connection.url=jdbc:databend://localhost:8000
connection.user=databend
connection.password=databend
connection.attempts=5
connection.backoff.ms=10000
connection.database=default
table.name.format=default.${topic}
max.retries=10
batch.size=1
auto.create=true
auto.evolve=true
insert.mode=upsert
pk.mode=record_value
pk.fields=id
topics=test_kafka
errors.tolerance=all
auto.create
和 auto.evolve
设置成 true
后会自动建表并在源表结构发生变化时同步到目标表。关于更多配置参数的介绍可以参考 Databend Kafka Connect Properties。
测试 Databend Kafka Connect
准备各个组件
1、启动 MySQL
version: '2.1'
services:
postgres:
image: debezium/example-postgres:1.1
ports:
- "5432:5432"
environment:
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
2、启动 Databend
version: '3'
services:
databend:
image: datafuselabs/databend
volumes:
- /Users/hanshanjie/databend/local-test/databend/databend-query.toml:/etc/databend/query.toml
environment:
QUERY_DEFAULT_USER: databend
QUERY_DEFAULT_PASSWORD: databend
MINIO_ENABLED: 'true'
ports:
- '8000:8000'
- '9000:9000'
- '3307:3307'
- '8124:8124'
3、以 standalone 模式启动 Kafka Connect,并加载 MySQL Source Connector
和 Databend Sink Connector
:
./bin/connect-standalone.sh config/connect-standalone.properties config/databend.properties config/mysql.properties
[2023-09-06 17:39:23,128] WARN [databend|task-0] These configurations '[metrics.context.connect.kafka.cluster.id]' were supplied but are not used yet. (org.apache.kafka.clients.consumer.ConsumerConfig:385)
[2023-09-06 17:39:23,128] INFO [databend|task-0] Kafka version: 3.4.0 (org.apache.kafka.common.utils.AppInfoParser:119)
[2023-09-06 17:39:23,128] INFO [databend|task-0] Kafka commitId: 2e1947d240607d53 (org.apache.kafka.common.utils.AppInfoParser:120)
[2023-09-06 17:39:23,128] INFO [databend|task-0] Kafka startTimeMs: 1693993163128 (org.apache.kafka.common.utils.AppInfoParser:121)
[2023-09-06 17:39:23,148] INFO Created connector databend (org.apache.kafka.connect.cli.ConnectStandalone:113)
[2023-09-06 17:39:23,148] INFO [databend|task-0] [Consumer clientId=connector-consumer-databend-0, groupId=connect-databend] Subscribed to topic(s): test_kafka (org.apache.kafka.clients.consumer.KafkaConsumer:969)
[2023-09-06 17:39:23,150] INFO [databend|task-0] Starting Databend Sink task (com.databend.kafka.connect.sink.DatabendSinkConfig:33)
[2023-09-06 17:39:23,150] INFO [databend|task-0] DatabendSinkConfig values:...
Insert
Insert 模式下我们需要使用如下的 MySQL Connector 配置:
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/mydb?useSSL=false
connection.user=root
connection.password=123456
#mode=timestamp+incrementing
mode=incrementing
table.whitelist=mydb.test_kafka
poll.interval.ms=1000
table.poll.interval.ms=3000
incrementing.column.name=id
#timestamp.column.name=tms
topics=test_kafka
在 MySQL 中创建数据库 mydb
和表 test_kafka
:
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE test_kafka (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE test_kafka AUTO_INCREMENT = 10;
在插入数据之前,databend-kafka-connect 并不会收到 event 进行建表和数据写入。
插入数据:
INSERT INTO test_kafka VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");
源表端插入数据后,
Databend 目标端的表就新建出来了:
同时数据也会成功插入:
Support DDL
我们在配置文件中 auto.evolve=true
,所以在源表结构发生变化的时候,会将 DDL 同步至目标表。这里我们正好需要将 MySQL Source Connector 的模式从 incrementing
改成 timestamp+incrementing
,需要新增一个 timestamp 字段并打开 timestamp.column.name=tms
配置。我们在原表中执行:
alter table test_kafka add column tms timestamp;
并插入一条数据:
insert into test_kafka values(20,"new data","from kafka",now());
到目标表中查看:
发现 tms 字段已经同步至 Databend table,并且该条数据也已经插入成功:
Upsert
修改 MySQL Connector 的配置为:
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/mydb?useSSL=false
connection.user=root
connection.password=123456
mode=timestamp+incrementing
#mode=incrementing
table.whitelist=mydb.test_kafka
poll.interval.ms=1000
table.poll.interval.ms=3000
incrementing.column.name=id
timestamp.column.name=tms
topics=test_kafka
主要是将 mode
改为 timestamp+incrementing
并添加 timestamp.column.name
字段。
重启 Kafka Connect。
在源表中更新一条数据:
update test_kafka set name="update from kafka test" where id=20;
到目标表中可以看到更新的数据:
总结
通过上面的内容可以看到 Databend Kafka Connect 具有以下特性:
- Table 和 Column 支持自动创建: auto.create 和 auto-evolve 的配置支持下,可以自动创建 Table 和 Column,Table name是基于 Kafka topic name 创建的;
- Kafka Shemas 支持: Connector 支持 Avro、JSON Schema 和 Protobuf 输入数据格式。必须启用 Schema Registry 才能使用基于 Schema Registry 的格式;
- 多个写入模式: Connector 支持 insert 和 upsert 写入模式;
- 多任务支持: 在 Kafka Connect 的能力下,Connector 支持运行一个或多个任务。增加任务的数量可以提高系统性能;
- 高可用: 分布式模式下可以自动平衡工作负载,并可以动态扩展(或缩减)以及提供容错能力。
同时,Databend Kafka Connect 也能够使用原生 Connect 支持的配置,更多配置参考 Kafka Connect Sink Configuration Properties for Confluent Platform。
相关资料
https://zhuanlan.zhihu.com/p/660303784
最后编辑:Jeebiz 更新时间:2024-11-01 10:06