Debezium Engine

Debezium 连接器通常通过部署到 Kafka Connect 服务来运行,并配置一个或多个连接器来监控上游数据库,并为其在上游数据库中观察到的所有变化生成数据变更事件。这些数据变更事件被写入 Kafka,在那里它们可以被许多不同的应用程序独立消费。Kafka Connect 提供了出色的容错性和可扩展性,因为它作为分布式服务运行,并确保所有注册和配置的连接器始终处于运行状态。例如,即使集群中的一个 Kafka Connect 端点发生故障,其余的 Kafka Connect 端点也会重新启动之前在该现已终止的端点上运行的任何连接器,从而最大限度地减少停机时间并消除管理活动。

并非所有应用都需要如此高水平的容错和可靠性,它们可能也不愿依赖于外部的 Kafka 代理集群及 Kafka Connect 服务。相反,一些应用更倾向于将 Debezium 连接器直接嵌入应用空间内。它们依然期望获取相同的数据变更事件,但更希望连接器将这些事件直接发送给应用,而非持久化存储于 Kafka 中。

该 debezium-api 模块定义了一个简洁的 API,使应用程序能够轻松配置并运行基于 Debezium Engine 的 Debezium 连接器。

自 2.6.0 版本起,Debezium 提供了两种 DebeziumEngine 接口的实现。较早的 EmbeddedEngine 实现运行单一连接器,仅使用一个任务。该连接器顺序地发出所有记录。

EmbeddedEngine 是 Debezium 3.1.0 最终版及更早版本中的默认实现。从 Debezium 3.2.0 Alpha1 开始,默认实现改为 AsyncEmbeddedEngine,而 EmbeddedEngine 实现不再可用。

自 2.6.0 版本起,新的 AsyncEmbeddedEngine 实现已可供使用。该实现同样仅运行单一连接器,但若连接器支持(目前仅 SQL Server 和 MongoDB 的连接器支持在单个连接器内运行多个任务),它能够在多线程中处理记录,并执行多项任务。由于这两种引擎实现了相同的接口并共享同一 API,后续的代码示例对两种引擎均适用。两种实现均支持相同的配置选项。

然而,新的 AsyncEmbeddedEngine 提供了几项新的配置选项,用于设置和微调并行处理。 有关这些新配置选项的信息,请参阅 异步引擎属性 。 欲深入了解 AsyncEmbeddedEngine 开发的动机及其实现细节,请参阅 异步嵌入式引擎设计文档 。

依赖项

要使用 Debezium Engine 模块,需将 debezium-api 模块添加至应用程序的依赖项中。该 API 的一个开箱即用实现位于 debezium-embedded 模块,也应一并加入依赖。对于 Maven 用户,这意味着需在应用的 POM 文件中添加如下内容:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>

其中,${version.debezium} 是您正在使用的 Debezium 版本,或者是一个包含 Debezium 版本字符串的 Maven 属性值。

同样地,为应用程序将使用的每个 Debezium 连接器添加依赖项。例如,以下内容可以添加到应用程序的 Maven POM 文件中,以便应用程序能够使用 MySQL 连接器:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>

或者对于 MongoDB 连接器:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mongodb</artifactId>
    <version>${version.debezium}</version>
</dependency>

本文档的剩余部分将阐述如何将 MySQL 连接器嵌入您的应用程序中。其他连接器的使用方式与此类似,只是配置、主题和事件方面需针对具体连接器进行调整。

打包您的项目

Debezium 通过 ServiceLoader 使用 SPI 加载实现。该实现可基于连接器类型,也可以是自定义实现。

某些接口拥有多种实现。例如,io.debezium.snapshot.spi.SnapshotLock 在核心中有一个默认实现,同时每个连接器又有其特定的实现。为了确保 Debezium 能够定位到所需的实现,您必须明确配置构建工具以合并 META-INF/services 文件。

例如,如果您正在使用 Maven shade 插件 , 添加 ServicesResourceTransformer 转换器,如下例所示:

...
<configuration>
 <transformers>
    ...
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
    ...
 </transformers>
...
</configuration>

或者,若您采用 Maven Assembly 插件 ,则可以利用 metaInf-services 容器描述符处理器 。

在代码中

您的应用程序需要为每个要运行的连接器实例设置一个嵌入式引擎。io.debezium.engine.DebeziumEngine<R> 类作为一个易于使用的包装器,围绕任何 Debezium 连接器,并完全管理连接器的生命周期。您可以使用其构建器 API 创建 DebeziumEngine 实例,并提供以下内容:

  • 您希望接收消息的格式,例如 JSON、Avro 或作为 Kafka Connect SourceRecord
  • 配置属性(可能从属性文件中加载),用于定义引擎和连接器的运行环境
  • 每当连接器产生数据变更事件时将被调用的方法

以下是一个配置并运行嵌入式 MySQL 连接器 的代码示例:

// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
/* begin connector properties */
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "mysqluser");
props.setProperty("database.password", "mysqlpw");
props.setProperty("database.server.id", "85744");
props.setProperty("topic.prefix", "my-app-connector");
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat");

// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> {
            System.out.println(record);
        }).build()
    ) {
    // Run the engine asynchronously ...
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);

    // Do something else or wait for a signal or an event
}
// Engine is stopped when the main code is finished

让我们更详细地查看这段代码,从我们在此重复的前几行开始:

// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");

这将创建一个新的标准 Properties 对象,用于设置引擎所需的多个字段,无论使用哪种连接器。首先是引擎的名称,它将被用于连接器生成的源记录及其内部状态中,因此在您的应用程序中应使用有意义的名称。connector.class 字段定义了扩展自 Kafka Connect org.apache.kafka.connect.source.SourceConnector 抽象类的类名;在此示例中,我们指定了 Debezium 的 MySqlConnector 类。

当 Kafka Connect 连接器运行时,它会从源读取信息,并定期记录 “偏移量”,以确定已处理信息的范围。如果连接器需要重启,它将使用最后记录的偏移量来了解应从源信息的哪个位置继续读取。由于连接器并不知晓或关心偏移量的存储方式,因此由引擎负责提供存储和恢复这些偏移量的方法。我们配置中的接下来几个字段指定了我们的引擎应使用 FileOffsetBackingStore 类,将偏移量存储在本地文件系统的 /path/to/storage/offset.dat 文件中(该文件可以任意命名并存储在任何位置)。此外,尽管连接器在生成每条源记录时都会记录偏移量,但引擎会定期(在我们的案例中,每分钟一次)将这些偏移量刷新到后端存储中。这些字段可以根据您的应用程序需求进行定制。

接下来的几行定义了特定于连接器的字段(详见各连接器文档),在我们的示例中,即 MySqlConnector 连接器:

 /* begin connector properties */
    props.setProperty("database.hostname", "localhost");
    props.setProperty("database.port", "3306");
    props.setProperty("database.user", "mysqluser");
    props.setProperty("database.password", "mysqlpw");
    props.setProperty("database.server.id", "85744");
    props.setProperty("topic.prefix", "my-app-connector");
    props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
    props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat");

在此,我们设定运行 MySQL 数据库服务器的主机名称和端口号,并定义用于连接 MySQL 数据库的用户名和密码。需要注意的是,MySQL 的用户名和密码应对应于已被授予以下 MySQL 权限的数据库用户:

  • SELECT
  • RELOAD
  • SHOW DATABASES
  • REPLICATION SLAVE
  • REPLICATION CLIENT

前三个权限在读取数据库的一致性快照时是必需的。后两个权限允许数据库读取服务器的二进制日志,该日志通常用于 MySQL 的复制。

配置中还包含一个用于 server.id 的数字标识符。由于 MySQL 的 binlog 是 MySQL 复制机制的一部分,为了读取 binlog,MySqlConnector 实例必须加入 MySQL 服务器组,这意味着此服务器 ID 必须在 构成 MySQL 服务器组的所有进程中唯一 ,且为 1 至 2^32-1 之间的任意整数。在我们的代码中,我们将其设置为一个较大且相对随机的值,仅供我们的应用程序使用。

配置中还指定了 MySQL 服务器的逻辑名称。连接器在其生成的每条源记录的 “主题” 字段中包含此逻辑名称,使您的应用程序能够识别这些记录的来源。我们的示例中使用了 “products” 作为服务器名称,大概是因为该数据库包含产品信息。当然,您可以根据应用程序的需求为其命名任何有意义的名称。

MySqlConnector 类运行时,它会读取 MySQL 服务器的 binlog,其中包含对服务器上托管数据库所做的所有数据变更和模式变更。由于所有数据变更都是以变更记录时所属表的模式来结构化的,因此连接器需要跟踪所有模式变更,以便正确解码变更事件。连接器记录了模式信息,以便在连接器重启并从最后记录的偏移量恢复读取时,能够准确了解该偏移量处数据库模式的状态。连接器如何记录数据库模式历史,由我们配置中的最后两个字段定义,即我们的连接器应使用 FileSchemaHistory 类来存储数据库模式历史变更,这些变更保存在本地文件系统的 /path/to/storage/schemahistory.dat 文件中(再次说明,此文件可以任意命名并存储在任何位置)。

最终,不可变的配置通过‘build ()‘方法构建完成。(顺便提一下,与其通过编程方式构建,我们本可以使用‘Configuration.read (...)‘方法之一从属性文件中读取配置。)

现在我们有了配置,便可以创建我们的引擎。以下是相关代码行:

// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> {
            System.out.println(record);
        })
        .build()) {
}

所有变更事件都将传递给指定的处理方法,该方法必须与‘java.util.function.Consumer<R>‘函数式接口的签名相匹配,其中‘<R>‘必须与调用‘create ()‘时指定的格式类型一致。需要注意的是,应用程序的处理函数不应抛出任何异常;如果确实抛出异常,引擎将记录该方法抛出的任何异常,并继续处理下一条源记录,但应用程序将无法再次处理引发该异常的特定源记录,这意味着应用程序可能会与数据库状态不一致。

此时,我们已有一个配置完毕且准备运行的现有 DebeziumEngine 对象,但它尚未执行任何操作。DebeziumEngine 设计为由 ExecutorExecutorService 异步执行:

// Run the engine asynchronously ...
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);

// Do something else or wait for a signal or an event

您的应用程序可以通过调用其 close () 方法来安全且优雅地停止引擎:

// At some later time ...
engine.close();

或者,由于引擎支持 Closeable 接口,当离开 try 代码块时,它会自动被调用。

引擎的连接器将停止从源系统读取信息,将所有剩余的变更事件转发给您的处理函数,并将最新的偏移量刷新到偏移量存储中。只有在所有这些操作完成后,引擎的 run () 方法才会返回。如果您的应用程序需要等待引擎完全停止后再退出,您可以通过 ExecutorServiceshutdownawaitTermination 方法来实现这一点:

try {
    executor.shutdown();
    while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        logger.info("Waiting another 5 seconds for the embedded engine to shut down");
    }
}
catch ( InterruptedException e ) {
    Thread.currentThread().interrupt();
}

或者,您可以在创建 DebeziumEngine 时注册 CompletionCallback 作为回调,以便在引擎终止时得到通知。

请记住,当 JVM 关闭时,它只会等待非守护线程。因此,当你在守护线程上运行引擎时,如果应用程序退出,请确保等待引擎进程完成。

您的应用程序应始终正确停止引擎,以确保优雅且完整的关闭,并确保每条源记录仅被发送至应用程序一次。例如,不要依赖于关闭 ExecutorService,因为这会中断正在运行的线程。尽管 DebeziumEngine 确实会在其线程被中断时终止,但引擎可能无法干净地终止,当您的应用程序重新启动时,可能会看到一些在关闭前已处理过的相同源记录。

如前所述,DebeziumEngine 接口有两个实现。这两个实现使用相同的 API,前面的代码示例对两个版本都有效。唯一的例外是 DebeziumEngine 实例的创建。正如引言中也提到的,默认情况下,使用 AsyncEmbeddedEngine 实现。因此,DebeziumEngine.create (Json.class)方法在内部导致使用 AsyncEmbeddedEngine 实例。

输出消息格式

DebeziumEngine#create ()可接受多种不同的参数,这些参数会影响消费者接收消息的格式。允许的值为:

  • Connect.class - 输出值为封装了 Kafka Connect 的 SourceRecord 的变更事件
  • Json.class - 输出值为一对键值,编码为 JSON 字符串
  • JsonByteArray.class - 输出值为键值对,格式化为 JSON 并以 UTF-8 字节数组编码
  • Avro.class - 输出值为键值对,编码为 Avro 序列化记录(更多详情请参阅 Avro 序列化 )
  • CloudEvents.class - 输出值为键值对,编码为 Cloud Events 消息

调用 DebeziumEngine#create ()时,也可以指定头格式。允许的值为:

  • Json.class - 头部值被编码为 JSON 字符串
  • JsonByteArray.class - 标头值以 JSON 格式编码,并转换为 UTF-8 字节数组

在内部,引擎将数据转换任务委托给 Kafka ConnectApicurio 转换器实现,采用最适合执行转换的算法。通过引擎属性可以对转换器进行参数配置,以调整其行为。

JSON 输出格式的一个示例是:

final Properties props = new Properties();
...
props.setProperty("converter.schemas.enable", "false"); // don't include schema in message
...
final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
    .using(props)
    .notifying((records, committer) -> {

        for (ChangeEvent<String, String> r : records) {
            System.out.println("Key = '" + r.key() + "' value = '" + r.value() + "'");
            committer.markProcessed(r);
        }
...

其中,ChangeEvent 数据类型为 key/value 对。

消息转换

在消息被传递给处理程序之前,可以通过 Kafka Connect 的 简单消息转换 (SMT) 管道进行处理。 每个 SMT 可以选择原封不动地传递消息、修改消息或过滤掉消息。 该链通过属性 transforms 进行配置。 该属性包含一个逗号分隔的列表,列出了要应用的转换的逻辑名称。 属性进行转换。.type 然后为每个转换定义实现类的名称并进行转换。.* 传递给转换的配置选项。

配置示例如下:

final Properties props = new Properties();
...
props.setProperty("transforms", "filter, router");                                               // (1)
props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");  // (2)
props.setProperty("transforms.router.regex", "(.*)");                                            // (3)
props.setProperty("transforms.router.replacement", "trf$1");                                     // (3)
props.setProperty("transforms.filter.type", "io.debezium.embedded.ExampleFilterTransform");      // (4)
  1. 定义了两种变换 —— 过滤器和路由器
  2. 路由器转换的实现是 org.apache.kafka.connect.transforms.RegexRouter
  3. 路由器转换有两个配置选项 ——regex 和 replacement
  4. 过滤器变换的实现是 io.debezium.embedded.ExampleFilterTransform

消息转换 Predicates

Predicates 可应用于转换,使转换成为可选项。

配置示例如下:

final Properties props = new Properties();
...
props.setProperty("transforms", "filter");                                                 // (1)
props.setProperty("predicates", "headerExists");                                           // (2)
props.setProperty("predicates.headerExists.type", "org.apache.kafka.connect.transforms.predicates.HasHeaderKey"); //(3)
props.setProperty("predicates.headerExists.name", "header.name");                          // (4)
props.setProperty("transforms.filter.type", "io.debezium.embedded.ExampleFilterTransform");// (5)
props.setProperty("transforms.filter.predicate", "headerExists");                          // (6)
props.setProperty("transforms.filter.negate", "true");                                     // (7)
  1. 定义了一个转换操作 —— 过滤器。
  2. 定义了一个predicate ——headerExists
  3. ‘headerExists‘ predicate 的实现是 ‘org.apache.kafka.connect.transforms.predicates.HasHeaderKey‘。
  4. HeaderExists 谓词有一个配置选项 ——name。
  5. 过滤器变换的实现是 io.debezium.embedded.ExampleFilterTransform
  6. 过滤转换需要谓词 headerExists。
  7. 过滤转换期望谓词的值被否定,使得谓词能够判断头部是否不存在。

高级记录消费(Advanced Record Consuming)

在某些应用场景下,例如尝试批量写入记录或针对异步 API 操作时,上述描述的功能接口可能会面临挑战。在此类情况下,使用‘io.debezium.engine.DebeziumEngine.ChangeConsumer<R>‘接口或许更为便捷。

此接口具备单一功能,其签名如下:

/**
  * Handles a batch of records, calling the {@link RecordCommitter#markProcessed(Object)}
  * for each record and {@link RecordCommitter#markBatchFinished()} when this batch is finished.
  * @param records the records to be processed
  * @param committer the committer that indicates to the system that we are finished
  */
 void handleBatch(List<R> records, RecordCommitter<R> committer) throws InterruptedException;

如 Javadoc 中所述,RecordCommitter 对象应在处理每条记录时调用,并在每批次完成后调用一次。RecordCommitter 接口是线程安全的,这使得记录的处理更加灵活。

您可以选择性地覆盖已处理记录的偏移量。具体操作是,首先通过调用‘RecordCommitter#buildOffsets ()‘构建一个新的‘Offsets‘对象,接着使用‘Offsets#set (String key,Object value)‘更新偏移量,然后调用‘RecordCommitter#markProcessed (SourceRecord record,Offsets sourceOffsets)‘,并传入更新后的‘Offsets‘

要使用 ChangeConsumer API,您必须将接口的实现传递给通知 API,如下所示:

class MyChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {
  public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
    ...
  }
}
// Create the engine with this configuration ...
DebeziumEngine<RecordChangeEvent<SourceRecord>> engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(props)
        .notifying(new MyChangeConsumer())
        .build();

如果使用 JSON 格式(其他格式的等效方法也同样适用),代码将如下所示:

class JsonChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {
  public void handleBatch(List<ChangeEvent<String, String>> records,
    RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException {
    ...
  }
}
// Create the engine with this configuration ...
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(new JsonChangeConsumer())
        .build();

引擎属性

以下配置属性为必需项,除非已有默认值(为文本格式考虑,Java 类的包名已替换为 <>)。

异步引擎属性

数据库模式历史属性

部分连接器还需要配置数据库模式历史记录的额外属性集:

  • MySQL
  • SQL Server
  • Oracle
  • Db2

若未正确配置数据库模式历史记录,连接器将拒绝启动。默认配置要求有可用的 Kafka 集群。对于其他部署场景,提供了一种基于文件的数据库模式历史记录存储实现方案。

故障处理

当引擎执行时,其连接器会主动记录每个源记录中的源偏移量,并且引擎会定期将这些偏移量刷新到持久存储中。当应用程序和引擎正常关闭或崩溃时,在它们重新启动后,引擎及其连接器将从最后记录的偏移量处继续读取源信息。

那么,当您的应用程序在嵌入式引擎运行时发生故障,会发生什么呢?其净效应是,应用程序重启后可能会接收到一些在崩溃前已经处理过的源记录。具体数量取决于引擎向存储刷新偏移量的频率(通过 offset.flush.interval.ms 属性设置)以及特定连接器一次返回的源记录数量。最理想的情况是每次偏移量都被刷新(例如,offset.flush.interval.ms 设置为 0),但即便如此,嵌入式引擎也仅在从连接器接收到每批源记录后才会刷新偏移量。

例如,MySQL 连接器使用 max.batch.size 来指定一个批次中可以出现的最多源记录数。即使将 offset.flush.interval.ms 设置为 0,当应用程序在崩溃后重启时,可能会看到最多 n 个重复记录,其中 n 是批次的大小。如果 offset.flush.interval.ms 属性设置得更高,则应用程序可能会看到最多 n * m 个重复记录,其中 n 是批次的最大大小,m 是在单个 offset flush 间隔期间可能累积的批次数量。(显然,可以配置嵌入式连接器不使用批处理并始终刷新偏移量,这样应用程序就永远不会收到任何重复的源记录。然而,这会显著增加开销并降低连接器的吞吐量。)

关键在于,在使用嵌入式连接器时,应用程序在正常操作期间(包括优雅关机后的重启)将准确无误地接收每条源记录一次,但在崩溃或非正常关机后重启时,需具备容忍接收重复事件的能力。若应用程序需要更严格的一次性处理保证,则应采用完整的 Debezium 平台,该平台即使在崩溃和重启后也能确保记录仅被处理一次。

作者:Jeebiz  创建时间:2025-08-12 16:19
最后编辑:Jeebiz  更新时间:2025-08-12 17:10