Debezium SQL Server Source Connector

Debezium SQL Server Source Connector 是一个可以抓取 SQL Server 数据库中现有数据的快照,然后监视并记录对该数据的所有后续行级更改的连接器。每个表的所有事件都记录在单独的 Apache Kafka® 主题中,应用程序和服务可以轻松使用它们。

  • Confluence 支持 Debezium SQL Server Source Connector 0.9.3 版及更高版本。
  • Azure SQL 托管实例支持 CDC,并受 SQL Server 源连接器支持。有关详细信息,请参阅功能比较:Azure SQL 数据库和 Azure SQL 托管实例
  • Debezium SQL Server 源连接器需要 CDC 功能才能运行。CDC 功能由 SQL Server 标准版(2016 SP1 及更高版本)或 SQL Server 企业版提供。

一、安装 Debezium SQL Server Connector

您可以使用 Confluence Hub 客户端安装 或 手动下载 ZIP 文件来安装此连接器。

方式1:使用 Confluent Hub CLI 安装

特别说明,因为 Debezium SQL Server Connector 的 Jar 需要 JDK 1.11+ 版本才能正常运行。

Confluence Hub 客户端的安装。默认情况下,它随 Confluence Enterprise 一起安装。

安装最新 ( latest) 连接器版本。

要安装latest连接器版本,请导航到 Confluence Platform 安装目录并运行以下命令:

confluent-hub install debezium/debezium-connector-sqlserver:latest \
   --component-dir /home/kafka/plugins \
   --worker-configs /usr/local/kafka_2.13-3.6.0/config/connect-distributed-prod.properties

您可以通过替换版本号来安装特定版本,latest如下例所示:

confluent-hub install debezium/debezium-connector-sqlserver:<version-number>

Debezium PostgreSQL 源连接器有特定的 ACL 要求。请参阅Debezium 源连接器的 ACL 要求,以确保满足指定的要求。

方式2:手动安装连接器

手动下载并解压连接器的压缩文件,然后按照说明进行操作。

https://debezium.io/releases/

https://debezium.io/releases/2.4/#releases

上传 /usr/local/src 目标并解压

cd /usr/local/src && tar -zxvf debezium-connector-sqlserver-2.4.2.Final-plugin.tar.gz -C  /home/kafka/plugins

方式3:命令安装连接器

通过Linux命令下载并解压连接器的压缩文件,然后按照说明进行操作。

上传 /usr/local/src 目标并解压

$ cd /usr/local/src && wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.4.2.Final/debezium-connector-sqlserver-2.4.2.Final-plugin.tar.gz --no-check-certificate
$ tar -zxvf debezium-connector-sqlserver-2.4.2.Final-plugin.tar.gz -C  /home/kafka/plugins

二、开启 SQL Server 数据库 CDC 功能

-- 开启cdc
USE YSPT
GO
EXEC sys.sp_cdc_enable_db
GO
--  配置具体的表
USE YSPT
GO
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name   = 'xueqi_info',
@role_name     = NULL,
@supports_net_changes = 1
GO
--  查询映射表
EXEC sys.sp_cdc_help_change_data_capture
-- 删除表映射
EXEC sys.sp_cdc_disable_table
    @source_schema = 'dbo',
    @source_name   =  'xueqi_info',
    @capture_instance = 'dbo.xueqi_info_CT'
    GO
-- 删除所有映射
USE YSPT
GO
EXEC sys.sp_cdc_disable_db
GO

三、创建 SQL Server Connector 监听

https://debezium.io/documentation/reference/2.4/connectors/sqlserver.html

{
    "name": "sst-sqlserver-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "database.hostname": "192.168.0.65",
        "database.port": "1433",
        "database.user": "sa",
        "database.password": "sa",
        "database.names": "YSPT",
        "topic.prefix": "sqlserver.table.name",
        "table.include.list": "dbo.yonghu_school,dbo.yonghu_info,dbo.jiaoshi_info,dbo.xuesheng_info,dbo.jiazhang_info,dbo.nianji_info,dbo.xuenianbanji_info,dbo.xueshengxuenian_info",
        "column.include.list":"dbo.yonghu_school.Valus,dbo.yonghu_school.Name,dbo.yonghu_school.Brief,dbo.yonghu_school.Intrd,dbo.yonghu_school.city,dbo.yonghu_school.tel,dbo.yonghu_info.Type,dbo.yonghu_info.School,dbo.yonghu_info.Mail_num,dbo.yonghu_info.Mail_pwd,dbo.yonghu_info.UserName,dbo.jiaoshi_info.School,dbo.jiaoshi_info.code,dbo.jiaoshi_info.Name,dbo.jiaoshi_info.tel1,dbo.jiaoshi_info.tel2,dbo.jiaoshi_info.sex,dbo.jiaoshi_info.Dept,dbo.jiaoshi_info.Duty,dbo.jiaoshi_info.Birthday,dbo.jiaoshi_info.Native",
        "skip.messages.without.change":true,
        "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
        "schema.history.internal.kafka.topic": "sqlserver.schemahistory.fullfillment",
        "database.encrypt":false
    }
}

参考资料

https://docs.confluent.io/kafka-connectors/debezium-sqlserver-source/current/overview.html
https://docs.confluent.io/kafka-connectors/debezium-sqlserver-source/current/sqlserver_source_connector_config.html

作者:Jeebiz  创建时间:2023-12-07 12:28
最后编辑:Jeebiz  更新时间:2024-09-23 10:03