Debezium Cassandra Connector

Debezium Cassandra Source Connector 是一个可以抓取 Cassanadra 数据库中现有数据的快照,然后监视并记录对该数据的所有后续行级更改的连接器。连接器第一次连接到 Cassandra 节点时,它会对所有键空间中所有启用 CDC 的表执行快照。连接器还将读取写入 Cassandra 提交日志的更改并生成相应的插入、更新和删除事件。每个表的所有事件都记录在单独的 kafka 主题中,应用程序和服务可以轻松使用它们。

概述

Cassandra 是一个开源 NoSQL 数据库。与大多数数据库类似,Cassandra 的写入路径从立即将更改记录到其提交日志中开始。提交日志驻留在每个节点本地,记录对该节点进行的每次写入。

从 Cassandra 3.0 开始,引入了变更数据捕获 (CDC) 功能。可以通过设置表属性在表级别启用 CDC 功能cdc=true,此后包含启用 CDC 的表的数据的任何提交日志都将移动到在cassandra.yaml丢弃时指定的 CDC 目录。

Cassandra 连接器驻留在每个 Cassandra 节点上并监视cdc_raw目录的更改。它处理检测到的所有本地提交日志段,为提交日志中的每个行级插入、更新和删除操作生成更改事件,在单独的 Kafka 主题中发布每个表的所有更改事件,最后删除从目录提交日志cdc_raw。最后一步很重要,因为一旦启用 CDC,Cassandra 本身就无法清除提交日志。如果cdc_free_space_in_mb已满,则对启用 CDC 的表的写入将被拒绝。

连接器可以容忍故障。当连接器读取提交日志并生成事件时,它会记录每个提交日志段的文件名和位置以及每个事件。如果连接器因任何原因(包括通信故障、网络问题或崩溃)而停止,则重新启动后,它只会继续读取上次停止的提交日志。这包括快照:如果连接器停止时快照未完成,则重新启动后它将开始新的快照。稍后我们将讨论出现问题时连接器的行为方式。

一、安装 Debezium Cassandra Connector

你可以手动下载压缩文件来安装此连接器,也可以通过Linux命令来安装。

方式1:手动安装连接器

手动下载并解压连接器的压缩文件,然后按照说明进行操作。

https://debezium.io/releases/

https://debezium.io/releases/2.4/#releases

上传 /usr/local/src 目标并解压

# Cassandra 3
$ cd /usr/local/src && tar -zxvf debezium-connector-cassandra-3-2.4.2.Final-plugin.tar.gz -C  /home/kafka/plugins
# Cassandra 4
$ cd /usr/local/src && tar -zxvf debezium-connector-cassandra-4-2.4.2.Final-plugin.tar.gz -C /home/kafka/plugins

方式2:命令安装连接器

通过Linux命令下载并解压连接器的压缩文件,然后按照说明进行操作。

上传 /usr/local/src 目标并解压

# Cassandra 3
$ cd /usr/local/src && wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-cassandra-3/2.4.2.Final/debezium-connector-cassandra-3-2.4.2.Final-plugin.tar.gz --no-check-certificate
$ tar -zxvf debezium-connector-cassandra-3-2.4.2.Final-plugin.tar.gz -C  /home/kafka/plugins
# Cassandra 4
$ cd /usr/local/src && wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-cassandra-4/2.4.2.Final/debezium-connector-cassandra-4-2.4.2.Final-plugin.tar.gz --no-check-certificate
$ tar -zxvf debezium-connector-cassandra-4-2.4.2.Final-plugin.tar.gz -C  /home/kafka/plugins

二、开启 MySQL 数据库 binlog 功能

log-bin=mysql-bin
binlog_format=row
server-id = 1
binlog_row_image=FULL

三、创建 MySQL Connector 监听

https://debezium.io/documentation/reference/2.4/connectors/mysql.html

{
    "name": "dw-mysql-connector", 
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
        "database.hostname": "localhost", 
        "database.port": "4408", 
        "database.user": "root", 
        "database.password": "1caaf39927054ccb", 
        "database.server.id": "184054", 
        "database.server.name": "lyexcel",
        "topic.prefix": "mysql.table.name",  
        "database.include.list": "lyexcel", 
        "schema.history.internal.kafka.bootstrap.servers": "localhost:9092", 
        "schema.history.internal.kafka.topic": "mysql.schemahistory.fullfillment", 
        "include.schema.changes": "true" ,
        "table.include.list": "lyexcel.parm_xuenian,lyexcel.yonghu_school",
        "skip.messages.without.change":false,
        "column.include.list": "lyexcel.parm_xuenian.xuenian,lyexcel.parm_xuenian.xueqi,lyexcel.yonghu_school.Name,lyexcel.yonghu_school.Valus"
    }
}

参考资料

https://debezium.io/documentation/reference/2.4/connectors/cassandra.html

作者:Jeebiz  创建时间:2023-12-14 20:48
最后编辑:Jeebiz  更新时间:2024-09-23 10:03