Debezium Server

Debezium 提供了一款即用型应用程序,能够将变更事件从源数据库流式传输至消息基础设施,如 Amazon KinesisGoogle Cloud Pub/SubApache PulsarRedis(Stream)NATS JetStream。若需将变更事件流式传输至 Apache Kafka,建议通过 Kafka Connect 部署 Debezium 连接器。

安装

要安装服务器,请下载并解压服务器分发压缩包:

将创建一个名为 debezium-server 的目录,其内容如下:

debezium-server/
|-- CHANGELOG.md
|-- config
|-- CONTRIBUTE.md
|-- COPYRIGHT.txt
|-- debezium-server-3.2.0.Final-runner.jar
|-- lib
|-- LICENSE-3rd-PARTIES.txt
|-- LICENSE.txt
|-- README.md
`-- run.sh

服务器通过 run.sh 脚本启动,依赖项存储在 lib 目录中,而 config 目录则包含配置文件。

使用 Oracle 连接器时,需将 ORACLE JDBC 驱动(若使用 XStream,还需包括 XStream API 文件)添加至 lib 目录, 详情参见: 获取 Oracle JDBC 驱动及 XStream API 文件

配置

主配置文件位于 config/application.properties,其中配置了多个部分:

  • debezium.source 用于源连接器配置;每个 Debezium Server 实例仅运行一个连接器。
  • debezium.sink 用于配置接收系统
  • debezium.format 用于配置输出序列化格式
  • debezium.transforms 用于配置消息转换
  • debezium.predicates 用于配置消息转换断言

一个示例配置文件可能如下所示:

debezium.sink.type=kinesis
debezium.sink.kinesis.region=eu-central-1
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=inventory

在此配置文件的示例中:

  • 该接收器配置为在 eu-central-1 区域使用 AWS Kinesis。
  • 源连接器已配置为使用默认的 Debezium decoderbufs 插件来连接 PostgreSQL。 如果使用 PostgreSQL 内置的 pgoutput 插件,请设置 debezium.source.plugin.name=pgoutput
  • 源连接器设置为从名为 “inventory” 的架构中捕获事件。若需捕获数据库中的所有更改,请删除此行。否则,请更新此行以对应您偏好的架构或表。
  • 源偏移量将存储在数据目录下名为 offsets.dat 的文件中。请注意,可能需要创建此目录以避免启动时出现错误。

服务器启动时,会生成如下一系列日志消息:

__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2020-05-15 11:33:12,189 INFO  [io.deb.ser.kin.KinesisChangeConsumer] (main) Using 'io.debezium.server.kinesis.KinesisChangeConsumer$$Lambda$119/0x0000000840130c40@f58853c' stream name mapper
2020-05-15 11:33:12,628 INFO  [io.deb.ser.kin.KinesisChangeConsumer] (main) Using default KinesisClient 'software.amazon.awssdk.services.kinesis.DefaultKinesisClient@d1f74b8'
2020-05-15 11:33:12,628 INFO  [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.kinesis.KinesisChangeConsumer' instantiated
2020-05-15 11:33:12,754 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true

2020-05-15 11:33:12,757 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false

2020-05-15 11:33:12,763 INFO  [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values:
    access.control.allow.methods =
    access.control.allow.origin =
    admin.listeners = null
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = default
    config.providers = []
    connector.client.config.override.policy = None
    header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
    internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
    internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    listeners = null
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    offset.flush.interval.ms = 0
    offset.flush.timeout.ms = 5000
    offset.storage.file.filename = data/offsets.dat
    offset.storage.partitions = null
    offset.storage.replication.factor = null
    offset.storage.topic =
    plugin.path = null
    rest.advertised.host.name = null
    rest.advertised.listener = null
    rest.advertised.port = null
    rest.extension.classes = []
    rest.host.name = null
    rest.port = 8083
    ssl.client.auth = none
    task.shutdown.graceful.timeout.ms = 5000
    topic.tracking.allow.reset = true
    topic.tracking.enable = true
    value.converter = class org.apache.kafka.connect.json.JsonConverter

2020-05-15 11:33:12,763 INFO  [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,763 INFO  [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,765 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true

2020-05-15 11:33:12,765 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true

2020-05-15 11:33:12,767 INFO  [io.deb.ser.DebeziumServer] (main) Engine executor started
2020-05-15 11:33:12,773 INFO  [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Starting FileOffsetBackingStore with file data/offsets.dat
2020-05-15 11:33:12,835 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Starting PostgresConnectorTask with configuration:
2020-05-15 11:33:12,837 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    connector.class = io.debezium.connector.postgresql.PostgresConnector
2020-05-15 11:33:12,837 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    offset.flush.interval.ms = 0
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.user = postgres
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.dbname = postgres
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    offset.storage.file.filename = data/offsets.dat
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.hostname = localhost
2020-05-15 11:33:12,838 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.password = ********
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    name = kinesis
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    topic.prefix = tutorial
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.port = 5432
2020-05-15 11:33:12,839 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    schema.include.list = inventory
2020-05-15 11:33:12,908 INFO  [io.quarkus] (main) debezium-server 1.2.0-SNAPSHOT (powered by Quarkus 1.4.1.Final) started in 1.198s. Listening on: http://0.0.0.0:8080
2020-05-15 11:33:12,911 INFO  [io.quarkus] (main) Profile prod activated.
2020-05-15 11:33:12,911 INFO  [io.quarkus] (main) Installed features: [cdi, smallrye-health]

源配置

源配置采用了与特定连接器文档页面上描述的配置属性相同的设置(仅需添加 debezium.source 前缀),同时包含了一些在 Kafka Connect 外部运行时所需的额外特定属性。

格式配置

消息输出格式可以分别针对键和值进行配置。默认情况下,输出采用 JSON 格式,但也可以使用 Kafka Connect 的 Converter 的任意实现。

Predicates 配置

Predicates 可与转换相关联,以使该转换成为可选项。 服务器支持由 Kafka Connect 定义的 过滤与条件 SMTs。 配置中需包含谓词列表、每个谓词的实现类以及各谓词的配置选项。

异步引擎属性

默认情况下,Debezium 服务器采用异步嵌入式引擎(AsyncEmbeddedEngine)作为其处理引擎。您可以为该异步嵌入式引擎配置以下选项:

额外配置

Debezium 服务器构建于 Quarkus 框架之上。Quarkus 所暴露的所有配置选项在 Debezium 服务器中同样可用。最常用的包括:

通过在 config/application.properties 文件中设置 quarkus.log.console.json=false,可以禁用 JSON 日志记录,如 config/application.properties.example 文件所示。

启用消息过滤

Debezium 服务器提供了过滤 SMT(单消息转换)功能。详情请参阅 消息过滤 部分。 然而,出于安全考虑,此功能默认未启用,需在启动 Debezium 服务器时明确开启。 要启用该功能,需将环境变量 ‘ENABLE_DEBEZIUM_SCRIPTING‘ 设置为 true。 这将把 debezium-scripting jar 文件以及 JSR 223 实现(目前包括 Groovy 和 graalvm.js)的 jar 文件添加到服务器的类路径中。 这些 jar 文件包含在 Debezium 服务器发行版的 opt_lib 目录下。

接收器配置

每种接收器类型的配置都是特定的。

通过配置属性‘debezium.sink.type‘来选择接收器。

作者:Jeebiz  创建时间:2025-08-12 17:02
最后编辑:Jeebiz  更新时间:2025-08-12 20:14