Debezium Server
Debezium 提供了一款即用型应用程序,能够将变更事件从源数据库流式传输至消息基础设施,如 Amazon Kinesis
、Google Cloud Pub/Sub
、Apache Pulsar
、Redis(Stream)
或 NATS JetStream
。若需将变更事件流式传输至 Apache Kafka
,建议通过 Kafka Connect 部署 Debezium 连接器。
安装
要安装服务器,请下载并解压服务器分发压缩包:
将创建一个名为 debezium-server 的目录,其内容如下:
debezium-server/
|-- CHANGELOG.md
|-- config
|-- CONTRIBUTE.md
|-- COPYRIGHT.txt
|-- debezium-server-3.2.0.Final-runner.jar
|-- lib
|-- LICENSE-3rd-PARTIES.txt
|-- LICENSE.txt
|-- README.md
`-- run.sh
服务器通过 run.sh
脚本启动,依赖项存储在 lib
目录中,而 config
目录则包含配置文件。
使用 Oracle 连接器时,需将 ORACLE JDBC 驱动(若使用 XStream,还需包括 XStream API 文件)添加至 lib 目录, 详情参见: 获取 Oracle JDBC 驱动及 XStream API 文件
配置
主配置文件位于 config/application.properties
,其中配置了多个部分:
debezium.source
用于源连接器配置;每个 Debezium Server 实例仅运行一个连接器。debezium.sink
用于配置接收系统debezium.format
用于配置输出序列化格式debezium.transforms
用于配置消息转换debezium.predicates
用于配置消息转换断言
一个示例配置文件可能如下所示:
debezium.sink.type=kinesis
debezium.sink.kinesis.region=eu-central-1
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=inventory
在此配置文件的示例中:
- 该接收器配置为在 eu-central-1 区域使用 AWS Kinesis。
- 源连接器已配置为使用默认的 Debezium decoderbufs 插件来连接 PostgreSQL。 如果使用 PostgreSQL 内置的 pgoutput 插件,请设置
debezium.source.plugin.name=pgoutput
- 源连接器设置为从名为 “inventory” 的架构中捕获事件。若需捕获数据库中的所有更改,请删除此行。否则,请更新此行以对应您偏好的架构或表。
- 源偏移量将存储在数据目录下名为 offsets.dat 的文件中。请注意,可能需要创建此目录以避免启动时出现错误。
服务器启动时,会生成如下一系列日志消息:
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2020-05-15 11:33:12,189 INFO [io.deb.ser.kin.KinesisChangeConsumer] (main) Using 'io.debezium.server.kinesis.KinesisChangeConsumer$$Lambda$119/0x0000000840130c40@f58853c' stream name mapper
2020-05-15 11:33:12,628 INFO [io.deb.ser.kin.KinesisChangeConsumer] (main) Using default KinesisClient 'software.amazon.awssdk.services.kinesis.DefaultKinesisClient@d1f74b8'
2020-05-15 11:33:12,628 INFO [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.kinesis.KinesisChangeConsumer' instantiated
2020-05-15 11:33:12,754 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-05-15 11:33:12,757 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = false
2020-05-15 11:33:12,763 INFO [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values:
access.control.allow.methods =
access.control.allow.origin =
admin.listeners = null
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
config.providers = []
connector.client.config.override.policy = None
header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
listeners = null
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 0
offset.flush.timeout.ms = 5000
offset.storage.file.filename = data/offsets.dat
offset.storage.partitions = null
offset.storage.replication.factor = null
offset.storage.topic =
plugin.path = null
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.extension.classes = []
rest.host.name = null
rest.port = 8083
ssl.client.auth = none
task.shutdown.graceful.timeout.ms = 5000
topic.tracking.allow.reset = true
topic.tracking.enable = true
value.converter = class org.apache.kafka.connect.json.JsonConverter
2020-05-15 11:33:12,763 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,763 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,765 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-05-15 11:33:12,765 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-05-15 11:33:12,767 INFO [io.deb.ser.DebeziumServer] (main) Engine executor started
2020-05-15 11:33:12,773 INFO [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Starting FileOffsetBackingStore with file data/offsets.dat
2020-05-15 11:33:12,835 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Starting PostgresConnectorTask with configuration:
2020-05-15 11:33:12,837 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) connector.class = io.debezium.connector.postgresql.PostgresConnector
2020-05-15 11:33:12,837 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) offset.flush.interval.ms = 0
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.user = postgres
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.dbname = postgres
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) offset.storage.file.filename = data/offsets.dat
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.hostname = localhost
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.password = ********
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) name = kinesis
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) topic.prefix = tutorial
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.port = 5432
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) schema.include.list = inventory
2020-05-15 11:33:12,908 INFO [io.quarkus] (main) debezium-server 1.2.0-SNAPSHOT (powered by Quarkus 1.4.1.Final) started in 1.198s. Listening on: http://0.0.0.0:8080
2020-05-15 11:33:12,911 INFO [io.quarkus] (main) Profile prod activated.
2020-05-15 11:33:12,911 INFO [io.quarkus] (main) Installed features: [cdi, smallrye-health]
源配置
源配置采用了与特定连接器文档页面上描述的配置属性相同的设置(仅需添加 debezium.source 前缀),同时包含了一些在 Kafka Connect 外部运行时所需的额外特定属性。
格式配置
消息输出格式可以分别针对键和值进行配置。默认情况下,输出采用 JSON 格式,但也可以使用 Kafka Connect 的 Converter 的任意实现。
Predicates 配置
Predicates 可与转换相关联,以使该转换成为可选项。 服务器支持由 Kafka Connect 定义的 过滤与条件 SMTs。 配置中需包含谓词列表、每个谓词的实现类以及各谓词的配置选项。
异步引擎属性
默认情况下,Debezium 服务器采用异步嵌入式引擎(AsyncEmbeddedEngine)作为其处理引擎。您可以为该异步嵌入式引擎配置以下选项:
额外配置
Debezium 服务器构建于 Quarkus 框架之上。Quarkus 所暴露的所有配置选项在 Debezium 服务器中同样可用。最常用的包括:
通过在 config/application.properties 文件中设置 quarkus.log.console.json=false,可以禁用 JSON 日志记录,如 config/application.properties.example 文件所示。
启用消息过滤
Debezium 服务器提供了过滤 SMT(单消息转换)功能。详情请参阅 消息过滤 部分。 然而,出于安全考虑,此功能默认未启用,需在启动 Debezium 服务器时明确开启。 要启用该功能,需将环境变量 ‘ENABLE_DEBEZIUM_SCRIPTING‘ 设置为 true。 这将把 debezium-scripting jar 文件以及 JSR 223 实现(目前包括 Groovy 和 graalvm.js)的 jar 文件添加到服务器的类路径中。 这些 jar 文件包含在 Debezium 服务器发行版的 opt_lib 目录下。
接收器配置
每种接收器类型的配置都是特定的。
通过配置属性‘debezium.sink.type‘来选择接收器。
最后编辑:Jeebiz 更新时间:2025-08-12 20:14