使用 Debezium SQL Server 连接器将 Apache Kafka 连接到 SQL Server

Apache Kafka 支持借助各种内置连接器与 Microsoft SQL Server 和众多其他数据库/数据仓库连接。这些连接器有助于将数据从您选择的来源引入Apache Kafka,然后将其从 Kafka 主题流式传输到您选择的目的地。

可以通过以下步骤实现:

  • 步骤 1:配置 Microsoft SQL Server 以启用 CDC
  • 步骤 2:在工作站上安装 Apache Kafka
  • 步骤 3:为 Apache Kafka 安装 Debezium Microsoft SQL Server 连接器
  • 步骤 4:启动 Apache Kafka、Zookeeper 和 Connect 服务器
  • 步骤 5:将 Apache Kafka 集成到 SQL Server 以开始提取数据

步骤 1:配置 Microsoft SQL Server 以启用 CDC

要配置您的 Microsoft SQL Server 实例,请在系统上启动Microsoft SQL Server Management Studio,然后通过右键单击对象资源管理器中的服务器选项来选择属性选项。

单击它后,屏幕上将打开一个新窗口,其中显示服务器属性。要允许 Debezium 的 SQL Server 连接器从 Microsoft SQL Server 实例实时获取数据,请单击“安全”选项并选择“SQL Server 和 Windows 身份验证”作为所需的身份验证机制。

现在重新启动 Microsoft SQL Server 实例以使更改生效。现在 Microsoft SQL Server Management Studio 已启动并运行,展开对象资源管理器中的“安全”文件夹,然后右键单击“登录”文件夹。

现在您需要为 Microsoft SQL Server 创建新的登录名。为此,请添加新用户名“testuser”以及您选择的密码。

单击左侧面板中的服务器角色选项,然后从屏幕上可用的选项列表中选择“sysadmin”,然后单击确定。

完成所有必要的配置后,您现在需要启用 CDC 功能。为此,使用 Windows Powershell 启动记事本应用程序并创建一个新的 SQL 文件“Test_DB.sql”。将以下代码行添加到您的文件中并保存:

# Creating a temporary test database 
CREATE DATABASE Test_DB;
GO
USE Test_DB;
EXEC sys.sp_cdc_enable_db;ALTER DATABASE Test_DB
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)

# Creating a temporary table & enabling CDC for the same
CREATE TABLE prospects (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO prospects(first_name,last_name,email)
  VALUES ('Jeff','Thomas','thomas.jeff@afer.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Sasha','Bailey','s.bailey@foobar.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Edward','Hart','edhart@walker.com');

EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'fruit', @role_name = NULL, @supports_net_changes = 0;
GO

现在您的 SQL 文件已准备就绪,请使用以下代码行重新启动您的 Microsoft SQL Server 实例:

net stop MSSQLSERVER
net start MSSQLSERVER

启动 Microsoft SQL 服务代理并使用以下代码行执行“Test_DB.sql”文件:

net start SQLSERVERAGENT
sqlcmd -U testuser -P {your password} -i {location of Test_DB.sql}

这是配置 Microsoft SQL Server 并启用 CDC 功能以将 Apache Kafka 连接到 SQL Server 的方法。

步骤 2:在工作站上安装 Apache Kafka

现在您的 Microsoft SQL Server 已准备就绪,您需要下载并安装 Apache Kafka,无论是独立模式还是分布式模式。您可以查看以下链接并遵循 Apache Kafka 的官方文档,这将帮助您开始安装过程:

步骤 3:为 Apache Kafka 安装 Debezium Microsoft SQL Server 连接器

Confluent 为用户提供了一组多样化的内置连接器,这些连接器可充当源或接收器,并允许用户通过 Apache Kafka 将数据从所需的数据源(例如 Microsoft SQL Server)传输到他们选择的目的地。允许用户将 Apache Kafka 连接到 SQL Server 的一个连接器是 Apache Kafka 的 Debezium SQL Server 连接器。

要安装 Debezium SQL Server 连接器,请转到 Confluent Hub 的 官方网站 ,并使用屏幕顶部的搜索栏搜索 Microsoft SQL Server。如果您发现很难找到正确的连接器,您可以 单击此处 直接转到连接器的页面。

找到所需的 Debezium SQL Server 连接器后,您现在可以通过在 Confluent CLI 上执行以下命令来下载连接器:

confluent-hub install debezium/debezium-connector-sqlserver:1.2.2

如果您的系统上没有安装 Confluent CLI,您可以 单击此处 并按照 Confluent 的官方文档进行操作,它将帮助您安装和配置 Confluent CLI。

您也可以单击下载按钮来安装连接器。单击后,系统将开始下载一个 zip 文件。解压 zip 文件并将 lib 文件夹中的所有 jar 文件复制到您的 Confluent 安装中。

这就是安装 Debezium SQL Server 连接器的方法,它将帮助您将 Apache Kafka 连接到 SQL Server。

步骤 4:启动 Apache Kafka、Zookeeper 和 Connect 服务器

安装 Apache Kafka 的 Debezium SQL Server 连接器后,现在需要启动 Zookeeper、Kafka 和模式注册表。为此,请在不同的终端上执行以下代码行:

/bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
./bin/kafka-server-start  ./etc/kafka/server.properties
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

如果您运行的是 Confluent Platform v5.3 或更高版本,您也可以使用以下代码行:

confluent local start connect

启用 Apache Kafka 服务器后,您必须为 Kafka Connect 创建一个 JSON 配置文件,您可以使用 POST REST API 调用加载该文件。它将指示 Debezium 连接器使用特定端口号和主机 IP 监视您的 Microsoft SQL Server 实例。您可以使用以下代码行进行 API 调用:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "test-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "{Private IP Address}",
"database.port": "1433",
"database.user": "testuser",
 "database.password": "password!",
"database.dbname": "TestDB",
"database.server.name": "TestDB",
"table.whitelist": "dbo.topic_name",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "dbhistory.fulfillment" }
}';

现在,您可以使用以下 curl 命令验证 Debezium Connector 是否正常运行:

curl -k https://localhost:8083/connectors/sqlserver-posts-connector/status

这是启动 Apache Kafka、Zookeeper 和 Connect 服务器的方法。

步骤 5:将 Apache Kafka 集成到 SQL Server 以开始提取数据

现在 Apache Kafka 服务器已在您的系统上启动并运行,您可以开始从 Microsoft SQL Server 提取数据。为此,您必须为 Apache Kafka 服务器创建一个消费者,该消费者将从 Test_DB 的所需主题中读取数据。您可以使用以下代码行来创建 Kafka 消费者:

/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Test_DB.dbo.topic_name --from-beginning

现在您将能够看到您的 Kafka 消费者开始以 JSON 格式从您的 Kafka 主题获取数据。

Kafka 消费者正在获取数据。

图片来源:Medium

您还可以通过修改数据库然后检查 Kafka Consumer 输出来验证 Apache Kafka 是否正确使用数据。为此,请在系统上启动 Windows Powershell 并使用以下代码行创建一个新的 SQL 文件“new_insert.sql”:

use TestDB;
INSERT INTO customers(first_name,last_name,email)
VALUES ('Helen','Keyler','keyler.helen@wr.com');

保存文件并使用以下命令执行:

sqlcmd -U testuser -P password! -i {location of new_insert.sql}

现在您将能够看到 Apache Kafka 使用来自 Microsoft SQL Server 表的新记录,如下所示:

Kafka 正在消费新记录。

图片来源:Medium

这就是如何使用 Debezium SQL Server 连接器将 Apache Kafka 连接到 SQL Server,以开始无缝传输数据。

使用 Debezium SQL Server Connector 的限制:

  • 使用 Debezium SQL Server 连接器需要您编写自定义代码并进行 API 调用,因此您必须具备强大的技术知识。
  • Apache Kafka 主题区分大小写,因此在进行 API 调用以连接 Debezium SQL Server 连接器时必须小心,以避免在流式传输数据时出现任何故障/异常。
  • Debezium SQL Server 连接器会监控并流式传输对源表所做的任何数据更改;但是,它无法将任何基于结构的更改复制到源表。因此,每当源表结构发生变化时,它都需要用户手动修改捕获表的结构。
  • 如果该系统中的任何组件发生故障,则存在数据丢失或交易完整性受损的风险。开发团队有责任确保数据完整性并尽量降低任何潜在问题的风险。
  • 尽管 Debezium 提供了多个连接器,但其中一些连接器存在可扩展性问题。即使使用 Postgres,在将预写日志输出转换为 JSON 时,诸如 wal2json 之类的插件也会导致内存不足异常。另一个问题是无法在仍对传入事件开放的情况下对表进行快照。对于大型表,这意味着在快照期间表可能长时间不可用。
  • 某些数据连接器在支持特定数据类型方面存在限制。例如,Debezium 的 Oracle 连接器在处理 BLOB 数据类型方面存在限制,因为它们仍处于孵化状态。因此,不建议在用于生产的基于 Debezium 的模块中使用 BLOB 数据类型。

参考

作者:Jeebiz  创建时间:2024-10-11 18:12
最后编辑:Jeebiz  更新时间:2024-11-01 10:06