Kafka Connect Service Installation

Make sure Apache Kafka is already installed on the host. If it is not, refer to the earlier installation instructions.

1. Create the plugin directory

mkdir -p /home/kafka/plugins;
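Connector plugins installed later are placed in this directory, each in its own sub-directory. A hypothetical example, where my-connector.zip stands in for whatever connector archive you download:

# Unpack a downloaded connector into its own sub-directory under the plugin path
unzip my-connector.zip -d /home/kafka/plugins/my-connector
# The sub-directory should contain the connector jar and its dependency jars
ls /home/kafka/plugins/my-connector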

2. Edit the configuration files

Kafka Connect currently supports two modes of execution: standalone mode and distributed mode.

Standalone mode

In standalone mode, all work is performed in a single process. This mode is easier to configure and to get started with, but it does not benefit from some of Kafka Connect's important features, such as fault tolerance.

connect-standalone.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
# Address and port of the Kafka broker
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
# Format of data in Kafka and how to convert it to Connect data
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# File used to store offset data
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
# Offset flush interval
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Directory where Kafka Connect plugins are stored
#plugin.path=

Copy it to a new configuration file, connect-standalone-prod.properties:

$ cat >/usr/local/kafka_2.13-3.6.0/config/connect-standalone-prod.properties <<'EOF'
# Address and port of the Kafka broker
bootstrap.servers=localhost:9092
# Format of data in Kafka and how to convert it to Connect data
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# File used to store offset data
offset.storage.file.filename=/home/kafka/connect.offsets
# Offset flush interval
offset.flush.interval.ms=10000
# Directory where Kafka Connect plugins are stored
plugin.path=/home/kafka/plugins
EOF

We can start the standalone process with the following commands:

cd ${KAFKA_HOME}
bin/connect-standalone.sh config/connect-standalone-prod.properties connector1.properties [connector2.properties ...]

The first parameter, config/connect-standalone-prod.properties, is the worker configuration. It includes settings such as the Kafka connection parameters, the serialization format, and how frequently offsets are committed.

The remaining parameters are the configuration files of the connectors to start, as illustrated in the example further below. The defaults shown above are suitable for a local cluster running with the default configuration from config/server.properties. If you use a different configuration, or deploy to production, you will need to adjust the defaults. In any case, all workers (standalone and distributed) require a few settings:

  • bootstrap.servers: The brokers that Connect will work with; connectors write data to or read data from these brokers. You do not need to list every broker in the cluster, but it is recommended to specify at least three.
  • key.converter and value.converter: The converters used for message keys and values respectively, which translate between the Kafka Connect format and the serialized form written to Kafka. This controls the format of the keys and values of messages written to or read from Kafka. Because this is independent of connectors, any connector can be used with any serialization format. The default is the JsonConverter provided with Kafka. Some converters have additional configuration parameters; for example, key.converter.schemas.enable can be set to true or false to control whether JSON messages include a schema.
  • offset.storage.file.filename: The file used to store offset data.

These configuration parameters allow Kafka Connect's producers and consumers to access the config, offset, and status topics. To configure the producers used by Kafka source tasks and the consumers used by Kafka sink tasks, the same parameters can be used, prefixed with 'producer.' and 'consumer.' respectively. bootstrap.servers is the only Kafka client parameter that does not need a prefix.
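As an illustration of the connector configuration files passed on the command line, the following is a minimal sketch of a connector1.properties using the FileStreamSource connector shipped with Kafka; the file path and topic name are placeholders. Note that in recent Kafka releases the file connector jar (libs/connect-file-*.jar) is not on the worker's default classpath and may need to be added to plugin.path.

# connector1.properties (illustrative): tail a local file and write each line to a topic
name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/test-source.txt
topic=connect-test

Started with bin/connect-standalone.sh config/connect-standalone-prod.properties connector1.properties, the worker reads new lines from /tmp/test-source.txt and produces each line to the connect-test topic.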

Distributed mode

Distributed mode balances the workload automatically, allows dynamic scale-up (or scale-down), and provides fault tolerance. Running in distributed mode is very similar to standalone mode:

connect-distributed.properties

##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
# Address and port of the Kafka brokers
bootstrap.servers=localhost:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
# Unique name for the cluster, used to form the Connect cluster group; must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
# Format of data in Kafka and how to convert it to Connect data
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
# Topic used to store offsets; defaults to connect-offsets
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
# Topic used to store connector and task configurations; defaults to connect-configs
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
# Topic used to store statuses; defaults to connect-status
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

# Flush much faster than normal, which is useful for testing/debugging
# Offset flush interval
offset.flush.interval.ms=10000

# List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.
# Specify hostname as 0.0.0.0 to bind to all interfaces.
# Leave hostname empty to bind to default interface.
# Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084"
#listeners=HTTP://:8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
# If not set, it uses the value for "listeners" if configured.
#rest.advertised.host.name=
#rest.advertised.port=
#rest.advertised.listener=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=

Copy it to a new configuration file, connect-distributed-prod.properties:

$ cat >/usr/local/kafka_2.13-3.6.0/config/connect-distributed-prod.properties <<'EOF'
# Address and port of the Kafka brokers
bootstrap.servers=192.168.3.73:9092,192.168.3.74:9092,192.168.3.75:9092
# Unique name for the cluster, used to form the Connect cluster group; must not conflict with consumer group IDs
group.id=connect-cluster
# Format of data in Kafka and how to convert it to Connect data
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# Topic used to store offsets; defaults to connect-offsets
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
#offset.storage.partitions=25
# Topic used to store connector and task configurations; defaults to connect-configs
config.storage.topic=connect-configs
config.storage.replication.factor=3
# Topic used to store statuses; defaults to connect-status
status.storage.topic=connect-status
status.storage.replication.factor=3
#status.storage.partitions=5
# Offset flush interval
offset.flush.interval.ms=10000
# Directory where Kafka Connect plugins are stored
plugin.path=/home/kafka/plugins
EOF

We can start the distributed process with the following commands:

cd ${KAFKA_HOME}
bin/connect-distributed.sh config/connect-distributed-prod.properties

The differences are the startup script and the configuration parameters. In distributed mode, connect-distributed.sh is used instead of connect-standalone.sh, and the first worker configuration parameter is the config/connect-distributed-prod.properties file. Connector configurations are not passed on the command line in this mode; they are submitted through the REST API described below.

Kafka Connect stores offsets, configurations, and task statuses in Kafka topics. It is recommended to create the offset, config, and status topics manually, with the desired number of partitions and replication factor. If the topics have not been created when Kafka Connect starts, they will be created automatically with default partition counts and replication factors, which may not suit our deployment. It is essential to configure the following parameters before starting the cluster; an example of creating these topics manually is shown after the list:

  • group.id: A unique name for the Connect cluster; defaults to connect-cluster. Workers with the same group id belong to the same Connect cluster. Note that this must not conflict with consumer group IDs.
  • config.storage.topic: The topic used to store connector and task configurations; defaults to connect-configs. Note that this should be a single-partition, highly replicated, compacted topic. You may need to create it manually to ensure the correct configuration, because an automatically created topic may have multiple partitions or be configured for deletion rather than compaction.
  • offset.storage.topic: The topic used to store offsets; defaults to connect-offsets. This topic can have multiple partitions.
  • status.storage.topic: The topic used to store statuses; defaults to connect-status. This topic can have multiple partitions.
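As an example, the three internal topics could be pre-created before starting the workers. The following is a sketch that assumes the three-broker cluster from the configuration above; the partition counts for the offset and status topics are illustrative, while the config topic must have exactly one partition, and all three should be compacted:

bin/kafka-topics.sh --create --bootstrap-server 192.168.3.73:9092 --topic connect-configs --partitions 1 --replication-factor 3 --config cleanup.policy=compact
bin/kafka-topics.sh --create --bootstrap-server 192.168.3.73:9092 --topic connect-offsets --partitions 25 --replication-factor 3 --config cleanup.policy=compact
bin/kafka-topics.sh --create --bootstrap-server 192.168.3.73:9092 --topic connect-status --partitions 5 --replication-factor 3 --config cleanup.policy=compact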

Note that in distributed mode, connectors must be managed through the REST API.

The currently supported REST API endpoints are:

  • GET /connectors: Return the list of active connectors
  • POST /connectors: Create a new connector; the request body should be a JSON object containing a string name field and a config object field with the connector's configuration parameters
  • GET /connectors/{name}: Get information about a specific connector
  • GET /connectors/{name}/config: Get the configuration parameters of a specific connector
  • PUT /connectors/{name}/config: Update the configuration parameters of a specific connector
  • GET /connectors/{name}/status: Get the current status of the connector, including whether it is running, failed, or paused
  • GET /connectors/{name}/tasks: Get the list of tasks currently running for the connector
  • GET /connectors/{name}/tasks/{taskid}/status: Get the current status of a task, including whether it is running or has failed
  • PUT /connectors/{name}/pause: Pause the connector and its tasks; message processing stops until the connector is resumed
  • PUT /connectors/{name}/resume: Resume a paused connector (does nothing if the connector is not paused)
  • POST /connectors/{name}/restart: Restart the connector (typically because it has failed)
  • POST /connectors/{name}/tasks/{taskId}/restart: Restart an individual task (typically because it has failed)
  • DELETE /connectors/{name}: Delete the connector, halting all tasks and removing its configuration
  • GET /connectors/{name}/topics: Get the set of topics a connector has been using since it was created or since a request to reset its set of active topics was issued
  • PUT /connectors/{name}/topics/reset: Send a request to empty the set of active topics of a connector
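As a sketch of managing a connector through the REST API, assuming a worker whose REST listener is on the default port 8083 and reusing the illustrative file-source connector from the standalone example:

# Register a new connector
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  -d '{"name":"local-file-source","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","file":"/tmp/test-source.txt","topic":"connect-test"}}'
# Check its status
curl http://localhost:8083/connectors/local-file-source/status
# Delete it
curl -X DELETE http://localhost:8083/connectors/local-file-source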

3. Start on boot

[root@kafka kafka]# vi /lib/systemd/system/kafka-connect.service

Unit file contents:

[Unit]
Description=Apache Kafka Connect
Documentation=https://kafka.apache.org/documentation/#connect
Requires=network.target
After=network.target kafka.service

[Service]
Type=forking
User=kafka
Group=kafka
Environment="JAVA_HOME=/usr/local/java"
ExecStart=/usr/local/kafka_2.13-3.6.0/bin/connect-distributed.sh -daemon /usr/local/kafka_2.13-3.6.0/config/connect-distributed-prod.properties
ExecReload=/bin/kill -s HUP $MAINPID
ExecStop=/bin/kill -s TERM $MAINPID
Restart=on-failure
LimitNOFILE=65536
TimeoutStopSec=20
StartLimitInterval=60
StartLimitBurst=3000
[Install]
WantedBy=multi-user.target

Enable the service to start on boot, and manage it with systemctl:

[root@kafka kafka]# systemctl daemon-reload
[root@kafka kafka]# systemctl enable kafka-connect.service
[root@kafka kafka]# systemctl start kafka-connect.service
[root@kafka kafka]# systemctl status kafka-connect.service
[root@kafka kafka]# systemctl stop kafka-connect.service
[root@kafka kafka]# systemctl restart kafka-connect.service
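
Once the service is running, the worker can be checked through its REST interface (a quick sketch, assuming the default REST port 8083):

[root@kafka kafka]# curl http://localhost:8083/
[root@kafka kafka]# curl http://localhost:8083/connector-plugins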