Debezium MongoDB Source Connector

Debezium MongoDB Connector 可以监视 MongoDB 副本集或 MongoDB 分片集群以了解数据库和集合中的文档更改,并将这些更改记录为 Apache Kafka® 主题中的事件。连接器自动处理分片集群中分片的添加或删除、每个副本集成员资格的更改、每个副本集中的选举,并在发生通信问题时进行动态调整。
Debezium MongoDB Connector 使用 MongoDB 的 oplog 来捕获更改。由于它利用 MongoDB 的复制机制,因此该连接器仅适用于 MongoDB 副本集或分片集群。
Debezium MongoDB Connector 无法监视独立 MongoDB 服务器的更改,因为独立服务器没有 oplog。如果独立服务器转换为具有一名成员的副本集,则连接器将起作用。

  • Confluence 支持 MongoDB 连接器版本 0.9.3 及更高版本。
  • Confluence 支持将此连接器与 MongoDB 3.4 或更高版本一起使用。

一、安装 Debezium MongoDB Connector

您可以使用 Confluence Hub 客户端安装 或 手动下载 ZIP 文件来安装此连接器。

方式1:使用 Confluent Hub CLI 安装

Confluence Hub 客户端的安装。默认情况下,它随 Confluence Enterprise 一起安装。

安装最新 (latest) 连接器版本。

要安装latest连接器版本,请导航到 Confluence Platform 安装目录并运行以下命令:

confluent-hub install debezium/debezium-connector-mongodb:latest \
   --component-dir /home/kafka/plugins \
   --worker-configs /usr/local/kafka_2.13-3.6.0/config/connect-distributed-prod.properties

您可以通过替换版本号来安装特定版本,latest如下例所示:

confluent-hub install debezium/debezium-connector-mongodb:<version-number>

Debezium MongoDB Source Connector 有特定的 ACL 要求。请参阅Debezium 源连接器的 ACL 要求,以确保满足指定的要求。

方式2:手动安装连接器

手动下载并解压连接器的压缩文件,然后按照说明进行操作。

https://debezium.io/releases/

https://debezium.io/releases/2.4/#releases

上传 /usr/local/src 目标并解压

cd /usr/local/src && tar -zxvf debezium-connector-mongodb-2.4.2.Final-plugin.tar.gz -C /home/kafka/plugins

方式3:命令安装连接器

通过Linux命令下载并解压连接器的压缩文件,然后按照说明进行操作。

上传 /usr/local/src 目标并解压

$ cd /usr/local/src && wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mongodb/2.4.2.Final/debezium-connector-mongodb-2.4.2.Final-plugin.tar.gz --no-check-certificate
$ tar -zxvf debezium-connector-mongodb-2.4.2.Final-plugin.tar.gz -C  /home/kafka/plugins

二、开启 MySQL 数据库 binlog 功能

log-bin=mysql-bin
binlog_format=row
server-id = 1
binlog_row_image=FULL

三、创建 MySQL Connector 监听

https://debezium.io/documentation/reference/2.4/connectors/mongodb.html

{
    "name": "dw-mysql-connector", 
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
        "database.hostname": "localhost", 
        "database.port": "4408", 
        "database.user": "root", 
        "database.password": "1caaf39927054ccb", 
        "database.server.id": "184054", 
        "database.server.name": "lyexcel",
        "topic.prefix": "mysql.table.name",  
        "database.include.list": "lyexcel", 
        "schema.history.internal.kafka.bootstrap.servers": "localhost:9092", 
        "schema.history.internal.kafka.topic": "mysql.schemahistory.fullfillment", 
        "include.schema.changes": "true" ,
        "table.include.list": "lyexcel.parm_xuenian,lyexcel.yonghu_school",
        "skip.messages.without.change":false,
        "column.include.list": "lyexcel.parm_xuenian.xuenian,lyexcel.parm_xuenian.xueqi,lyexcel.yonghu_school.Name,lyexcel.yonghu_school.Valus"
    }
}

参考资料

https://docs.confluent.io/kafka-connectors/debezium-mysql-source/current/overview.html
https://docs.confluent.io/kafka-connectors/debezium-mysql-source/current/mysql_source_connector_config.html

作者:Jeebiz  创建时间:2023-12-07 13:59
最后编辑:Jeebiz  更新时间:2024-09-23 10:03