Debezium SQL Server Source Connector
Debezium SQL Server Source Connector 是一个可以抓取 SQL Server 数据库中现有数据的快照,然后监视并记录对该数据的所有后续行级更改的连接器。每个表的所有事件都记录在单独的 Apache Kafka® 主题中,应用程序和服务可以轻松使用它们。
- Confluence 支持 Debezium SQL Server Source Connector 0.9.3 版及更高版本。
- Azure SQL 托管实例支持 CDC,并受 SQL Server 源连接器支持。有关详细信息,请参阅功能比较:Azure SQL 数据库和 Azure SQL 托管实例。
- Debezium SQL Server 源连接器需要 CDC 功能才能运行。CDC 功能由 SQL Server 标准版(2016 SP1 及更高版本)或 SQL Server 企业版提供。
一、安装 Debezium SQL Server Connector
您可以使用 Confluence Hub 客户端安装 或 手动下载 ZIP 文件来安装此连接器。
方式1:使用 Confluent Hub CLI 安装
特别说明,因为 Debezium SQL Server Connector 的 Jar 需要 JDK 1.11+ 版本才能正常运行。
Confluence Hub 客户端的安装。默认情况下,它随 Confluence Enterprise 一起安装。
安装最新 ( latest) 连接器版本。
要安装latest连接器版本,请导航到 Confluence Platform 安装目录并运行以下命令:
confluent-hub install debezium/debezium-connector-sqlserver:latest \
--component-dir /home/kafka/plugins \
--worker-configs /usr/local/kafka_2.13-3.6.0/config/connect-distributed-prod.properties
您可以通过替换版本号来安装特定版本,latest如下例所示:
confluent-hub install debezium/debezium-connector-sqlserver:<version-number>
Debezium PostgreSQL 源连接器有特定的 ACL 要求。请参阅Debezium 源连接器的 ACL 要求,以确保满足指定的要求。
方式2:手动安装连接器
手动下载并解压连接器的压缩文件,然后按照说明进行操作。
https://debezium.io/releases/2.4/#releases
上传 /usr/local/src
目标并解压
cd /usr/local/src && tar -zxvf debezium-connector-sqlserver-2.4.2.Final-plugin.tar.gz -C /home/kafka/plugins
方式3:命令安装连接器
通过Linux命令下载并解压连接器的压缩文件,然后按照说明进行操作。
上传 /usr/local/src
目标并解压
$ cd /usr/local/src && wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.4.2.Final/debezium-connector-sqlserver-2.4.2.Final-plugin.tar.gz --no-check-certificate
$ tar -zxvf debezium-connector-sqlserver-2.4.2.Final-plugin.tar.gz -C /home/kafka/plugins
二、开启 SQL Server 数据库 CDC 功能
-- 开启cdc
USE YSPT
GO
EXEC sys.sp_cdc_enable_db
GO
-- 配置具体的表
USE YSPT
GO
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'xueqi_info',
@role_name = NULL,
@supports_net_changes = 1
GO
-- 查询映射表
EXEC sys.sp_cdc_help_change_data_capture
-- 删除表映射
EXEC sys.sp_cdc_disable_table
@source_schema = 'dbo',
@source_name = 'xueqi_info',
@capture_instance = 'dbo.xueqi_info_CT'
GO
-- 删除所有映射
USE YSPT
GO
EXEC sys.sp_cdc_disable_db
GO
三、创建 SQL Server Connector 监听
https://debezium.io/documentation/reference/2.4/connectors/sqlserver.html
{
"name": "sst-sqlserver-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "192.168.0.65",
"database.port": "1433",
"database.user": "sa",
"database.password": "sa",
"database.names": "YSPT",
"topic.prefix": "sqlserver.table.name",
"table.include.list": "dbo.yonghu_school,dbo.yonghu_info,dbo.jiaoshi_info,dbo.xuesheng_info,dbo.jiazhang_info,dbo.nianji_info,dbo.xuenianbanji_info,dbo.xueshengxuenian_info",
"column.include.list":"dbo.yonghu_school.Valus,dbo.yonghu_school.Name,dbo.yonghu_school.Brief,dbo.yonghu_school.Intrd,dbo.yonghu_school.city,dbo.yonghu_school.tel,dbo.yonghu_info.Type,dbo.yonghu_info.School,dbo.yonghu_info.Mail_num,dbo.yonghu_info.Mail_pwd,dbo.yonghu_info.UserName,dbo.jiaoshi_info.School,dbo.jiaoshi_info.code,dbo.jiaoshi_info.Name,dbo.jiaoshi_info.tel1,dbo.jiaoshi_info.tel2,dbo.jiaoshi_info.sex,dbo.jiaoshi_info.Dept,dbo.jiaoshi_info.Duty,dbo.jiaoshi_info.Birthday,dbo.jiaoshi_info.Native",
"skip.messages.without.change":true,
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "sqlserver.schemahistory.fullfillment",
"database.encrypt":false
}
}
参考资料
https://docs.confluent.io/kafka-connectors/debezium-sqlserver-source/current/overview.html
https://docs.confluent.io/kafka-connectors/debezium-sqlserver-source/current/sqlserver_source_connector_config.html
最后编辑:Jeebiz 更新时间:2024-11-01 10:06