Debezium MySQL Source Connector
Debezium MySQL Source Connector 是一个可以抓取 MySQL 数据库中现有数据的快照,然后监视并记录对该数据的所有后续行级更改的连接器。第一次连接到 MySQL 服务器时,它会读取所有数据库的一致快照。该快照完成后,连接器会持续读取提交到 MySQL 的更改并生成相应的插入、更新和删除事件。每个表的所有事件都记录在单独的 Apache Kafka® 主题中,应用程序和服务可以轻松使用它们。
- Confluence 支持 MySQL 连接器版本 0.9.3 及更高版本。
- Confluence 支持将此连接器与 MySQL 5.6 或更高版本一起使用。
一、安装 Debezium MySQL Connector
您可以使用 Confluence Hub 客户端安装 或 手动下载 ZIP 文件来安装此连接器。
方式1:使用 Confluent Hub CLI 安装
Confluence Hub 客户端的安装。默认情况下,它随 Confluence Enterprise 一起安装。
安装最新 (latest) 连接器版本。
要安装latest连接器版本,请导航到 Confluence Platform 安装目录并运行以下命令:
confluent-hub install debezium/debezium-connector-mysql:latest \
--component-dir /home/kafka/plugins \
--worker-configs /usr/local/kafka_2.13-3.6.0/config/connect-distributed-prod.properties
您可以通过替换版本号来安装特定版本,latest如下例所示:
confluent-hub install debezium/debezium-connector-mysql:<version-number>
Debezium MySQL Source Connector 有特定的 ACL 要求。请参阅Debezium 源连接器的 ACL 要求,以确保满足指定的要求。
方式2:手动安装连接器
手动下载并解压连接器的压缩文件,然后按照说明进行操作。
https://debezium.io/releases/2.4/#releases
上传 /usr/local/src
目标并解压
cd /usr/local/src && tar -zxvf debezium-connector-mysql-2.4.2.Final-plugin.tar.gz -C /home/kafka/plugins
方式3:命令安装连接器
通过Linux命令下载并解压连接器的压缩文件,然后按照说明进行操作。
上传 /usr/local/src
目标并解压
$ cd /usr/local/src && wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.4.2.Final/debezium-connector-mysql-2.4.2.Final-plugin.tar.gz --no-check-certificate
$ tar -zxvf debezium-connector-mysql-2.4.2.Final-plugin.tar.gz -C /home/kafka/plugins
二、开启 MySQL 数据库 binlog 功能
log-bin=mysql-bin
binlog_format=row
server-id = 1
binlog_row_image=FULL
三、创建 MySQL Connector 监听
https://debezium.io/documentation/reference/2.4/connectors/mysql.html
{
"name": "dw-mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "4408",
"database.user": "root",
"database.password": "1caaf39927054ccb",
"database.server.id": "184054",
"database.server.name": "lyexcel",
"topic.prefix": "mysql.table.name",
"database.include.list": "lyexcel",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "mysql.schemahistory.fullfillment",
"include.schema.changes": "true" ,
"table.include.list": "lyexcel.parm_xuenian,lyexcel.yonghu_school",
"skip.messages.without.change":false,
"column.include.list": "lyexcel.parm_xuenian.xuenian,lyexcel.parm_xuenian.xueqi,lyexcel.yonghu_school.Name,lyexcel.yonghu_school.Valus"
}
}
参考资料
https://docs.confluent.io/kafka-connectors/debezium-mysql-source/current/overview.html
https://docs.confluent.io/kafka-connectors/debezium-mysql-source/current/mysql_source_connector_config.html
最后编辑:Jeebiz 更新时间:2024-11-01 10:06