Kafka 监控

一、开启 JMX 监控

JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架。JMX是一套标准的代理和服务,实际上,用户能够在任何Java应用程序中使用这些代理和服务实现管理。用人话说,就是对外暴露更多数据,方便某些监控之类的插件来使用

修改 kafka 启动脚本

$ vi /usr/local/kafka_2.13-3.6.0/bin/kafka-server-start.sh
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
fi
...
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.rmi.server.hostname=xxx.xxx.xxx.xxx kafka.Kafka "$@"

重启 Kafka

$ systemctl restart kafka.service

二、下载安装

1、下载并安装 Kafka-Eagle

官方地址:http://www.kafka-eagle.org

所有的主机,下载安装最新的 Kafka-Eagle 版本

$ cd /usr/local/src && wget https://github.com/smartloli/kafka-eagle-bin/archive/v3.0.1.tar.gz -O kafka-eagle-bin-3.0.1.tar.gz --no-check-certificate
$ tar -xzf kafka-eagle-bin-3.0.1.tar.gz -C /usr/local
$ cd /usr/local && tar -xzf kafka-eagle-bin-3.0.1/efak-web-3.0.1-bin.tar.gz -C /usr/local
$ rm -rf kafka-eagle-bin-3.0.1/
$ echo 'export KE_HOME=/usr/local/efak-web-3.0.1' >>/etc/profile
$ echo 'export PATH="$PATH:$KE_HOME/bin"' >>/etc/profile
$ source /etc/profile

2、创建 Kafka-Eagle 使用的 MySQL 数据库

Kafka-Eagle 支持 SQLLite、MySQL 作为数据库,这里我选择使用 MySQL, 首先需要创建对应的数据库

数据库:kafka_eagle
账 号:kafka_eagle
密 码:mpjLHANcHk5T6EKX

3、修改配置文件

根据Kafka集群的实际情况配置EFAK,例如zookeeper地址、Kafka集群的版本类型(低版本为zk、高版本为kafka)、开启安全认证的Kafka集群等。

cd ${KE_HOME}/conf
vi system-config.properties

修改后的配置

######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
#efak.zk.cluster.alias=cluster1,cluster2
#cluster1.zk.list=tdn1:2181,tdn2:2181,tdn3:2181
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
efak.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.3.73:2181,192.168.3.74:2181,192.168.3.75:2181

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.efak.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=16

######################################
# EFAK webui port
######################################
efak.webui.port=8048

######################################
# EFAK enable distributed
######################################
efak.distributed.enable=false
efak.cluster.mode.status=master
efak.worknode.master.host=localhost
efak.worknode.port=8085

######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456

######################################
# kafka offset storage
######################################
#cluster1.efak.offset.storage=kafka
#cluster2.efak.offset.storage=zk
# 使用 zkCli.sh 脚本,ls / 查询
cluster1.efak.offset.storage=config

######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.charts=true
efak.metrics.retain=15

######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10

######################################
# delete kafka topic token
######################################
efak.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=

######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=

######################################
# kafka sqlite jdbc driver address
######################################
#efak.driver=org.sqlite.JDBC
#efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#efak.username=root
#efak.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address
######################################
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://192.168.3.91:13306/kafka_eagle?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=kafka_eagle
efak.password=mpjLHANcHk5T6EKX

3、启动 EFAK 服务

3.1、启动 EFAK 服务(独立模式)

$KE_HOME/bin目录下有一个ke.sh脚本文件。执行启动命令如下:

cd ${KE_HOME}/bin
chmod +x ke.sh
ke.sh start

之后,当 EFAK 服务器重新启动或停止时,执行以下命令:

ke.sh restart
ke.sh stop

如下图所示:

3.2、启动EFAK服务器(分布式)

$KE_HOME/bin目录下有一个ke.sh脚本文件。执行启动命令如下:

cd ${KE_HOME}/bin
# sync efak package to other worknode node
# if $KE_HOME is /data/soft/new/efak
for i in `cat $KE_HOME/conf/works`;do scp -r $KE_HOME $i:/data/soft/new;done
# sync efak server .bash_profile environment
for i in `cat $KE_HOME/conf/works`;do scp -r ~/.bash_profile $i:~/;done
chmod +x ke.sh
ke.sh cluster start

之后,当 EFAK 服务器重新启动或停止时,执行以下命令:

ke.sh cluster restart
ke.sh cluster stop

如下图所示:

4、Kafka-Eagle 开机启动

[root@kafka kafka]# vi /lib/systemd/system/kafka-eagle.service

脚本内容:

[Unit]
Description=Kafka Eagle Server
Documentation=https://docs.kafka-eagle.org/2.installation/2.installonlinuxmac
Requires=network.target

[Service]
Type=forking
User=root
Environment="JAVA_HOME=/usr/local/java"
Environment="KE_HOME=/usr/local/efak-web-3.0.1"
ExecStart=/usr/local/efak-web-3.0.1/bin/ke.sh start
ExecStop=/usr/local/efak-web-3.0.1/bin/ke.sh stop
Restart=/usr/local/efak-web-3.0.1/bin/ke.sh restart
StartLimitInterval=60
StartLimitBurst=50
[Install]
WantedBy=multi-user.target

设置随机启动:

[root@kafka kafka]# systemctl daemon-reload
[root@kafka kafka]# systemctl enable kafka-eagle.service
[root@kafka kafka]# systemctl start kafka-eagle.service
[root@kafka kafka]# systemctl restart kafka-eagle.service
[root@kafka kafka]# systemctl status kafka-eagle.service

管理界面首页

作者:Jeebiz  创建时间:2023-12-06 21:38
最后编辑:Jeebiz  更新时间:2024-09-23 10:03