Strimzi Introduction
Strimzi simplifies the process of running Apache Kafka within a Kubernetes cluster.
This guide is intended as a starting point for building an understanding of Strimzi. It introduces some of the key concepts behind Kafka, which sits at the heart of Strimzi, and briefly explains the purpose of the Kafka components. Configuration points are outlined, including options to secure and monitor Kafka. The Strimzi distribution provides the files to deploy and manage a Kafka cluster, as well as example files for configuring and monitoring your deployment. A typical Kafka deployment is described, along with the tools used to deploy and manage Kafka.
Deploying Strimzi using the installation files
Create a namespace called kafka:
kubectl create namespace kafka
Apply the Strimzi installation files, including the ClusterRoles, ClusterRoleBindings, and some Custom Resource Definitions (CRDs). The CRDs define the schemas used for the custom resources (CRs, such as Kafka, KafkaTopic, and so on) that you will use to manage Kafka clusters, topics, and users.
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
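Once the files are applied, you can confirm that the operator's CRDs were registered; all Strimzi CRDs belong to the strimzi.io API group:
kubectl get crd | grep strimzi.io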
The YAML files for the ClusterRoles and ClusterRoleBindings downloaded from strimzi.io contain a default namespace of myproject. The query parameter namespace=kafka updates these files to use kafka instead. By specifying -n kafka when running kubectl create, the definitions and configurations that have no namespace reference are also installed in the kafka namespace. If there is a mismatch between the namespaces, the Strimzi Cluster Operator will not have the necessary permissions to perform its operations.
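If you would rather install into a namespace of your own choosing from a locally downloaded release archive, a rough sketch of the equivalent edit (the install/cluster-operator path is an assumption based on the layout of the Strimzi release archive; my-namespace is a placeholder):
sed -i 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml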
Follow the deployment of the Strimzi Cluster Operator:
kubectl get pod -n kafka --watch
You can also follow the operator's log:
kubectl logs deployment/strimzi-cluster-operator -n kafka -f
Once the operator is running, it will watch for new custom resources and create the Kafka clusters, topics, or users that correspond to those custom resources.
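For example, you can list the custom resources the operator manages (this prints "No resources found" until you create some):
kubectl get kafka,kafkatopic,kafkauser -n kafka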
Create an Apache Kafka cluster
Create a new Kafka custom resource to get a small persistent Apache Kafka cluster with one node each for Apache ZooKeeper and Apache Kafka:
# Apply the `Kafka` Cluster CR file
kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka
Wait while Kubernetes starts the required pods, services, and so on:
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka
The above command might time out if you are downloading images over a slow network. If that happens, you can simply run it again.
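If the wait keeps timing out, it usually helps to inspect the pods and their events to see whether images are still being pulled (pod names such as my-cluster-zookeeper-0 follow the Strimzi <cluster>-<component>-<index> naming for the example cluster):
kubectl get pods -n kafka
kubectl describe pod my-cluster-zookeeper-0 -n kafka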
Test sending and receiving messages
With the cluster running, run a simple producer to send messages to a Kafka topic (the topic is created automatically):
kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic
And to receive them in a different terminal, run:
kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
Enjoy your Apache Kafka cluster!
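The topic above was auto-created by the broker. Topics can also be managed declaratively through a KafkaTopic custom resource handled by the Topic Operator (included in the example cluster); a minimal sketch, with the partition and replica counts chosen to match the single-node cluster:
cat <<EOF | kubectl apply -n kafka -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
EOF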
Deploy Kafka Connect
Create the Deployment using the following YAML, taking care to replace the value of the BOOTSTRAP_SERVERS variable:
kind: Deployment
apiVersion: apps/v1
metadata:
  name: mysql-connect-cluster
  namespace: kafka
  labels:
    app: mysql-connect-cluster
  annotations:
    kubesphere.io/alias-name: Kafka MySQL connector
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mysql-connect-cluster
  template:
    metadata:
      labels:
        app: mysql-connect-cluster
    spec:
      volumes:
        - name: host-time
          hostPath:
            path: /etc/localtime
            type: ''
      containers:
        - name: connect
          image: 'quay.io/debezium/connect:1.9'
          ports:
            - name: http-8083
              containerPort: 8083
              protocol: TCP
          env:
            # Kafka brokers to connect to; replace with your cluster's address
            - name: BOOTSTRAP_SERVERS
              value: 'my-cluster-kafka-brokers.kafka:9092'
            - name: REST_HOST_NAME
              value: '0.0.0.0'
            - name: GROUP_ID
              value: '1'
            # Internal topics in which Kafka Connect stores its own state
            - name: CONFIG_STORAGE_TOPIC
              value: my_connect_configs
            - name: OFFSET_STORAGE_TOPIC
              value: my_connect_offsets
            - name: STATUS_STORAGE_TOPIC
              value: my_connect_statuses
          resources: {}
          volumeMounts:
            - name: host-time
              readOnly: true
              mountPath: /etc/localtime
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      serviceAccountName: default
      serviceAccount: default
      securityContext: {}
      schedulerName: default-scheduler
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 25%
      maxSurge: 25%
  revisionHistoryLimit: 10
  progressDeadlineSeconds: 600
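The Connect REST API listens on port 8083 inside the pod. If you want to reach it from other pods by DNS name instead of exec-ing into the container, a minimal ClusterIP Service sketch for the Deployment above (the Service name mysql-connect-cluster is an arbitrary choice):
kind: Service
apiVersion: v1
metadata:
  name: mysql-connect-cluster
  namespace: kafka
spec:
  selector:
    app: mysql-connect-cluster
  ports:
    - name: http-8083
      port: 8083
      targetPort: 8083
      protocol: TCP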
Deploy kafka-ui
Create the Deployment using the following YAML, taking care to replace the value of the KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS variable; the Strimzi bootstrap Service DNS name (my-cluster-kafka-bootstrap.kafka:9092 for the example cluster) is more stable than a hard-coded cluster IP:
kind: Deployment
apiVersion: apps/v1
metadata:
  name: kafka-ui
  namespace: kafka
  labels:
    app: kafka-ui
  annotations:
    kubesphere.io/alias-name: Kafka management UI
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-ui
  template:
    metadata:
      labels:
        app: kafka-ui
    spec:
      volumes:
        - name: host-time
          hostPath:
            path: /etc/localtime
            type: ''
      containers:
        - name: kafka-ui
          image: 'provectuslabs/kafka-ui:latest'
          ports:
            - name: http-8080
              containerPort: 8080
              protocol: TCP
          env:
            - name: KAFKA_CLUSTERS_0_NAME
              value: local
            # Replace with your cluster's bootstrap address
            - name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
              value: '10.233.18.243:9092'
          resources: {}
          volumeMounts:
            - name: host-time
              readOnly: true
              mountPath: /etc/localtime
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      serviceAccountName: default
      serviceAccount: default
      securityContext: {}
      schedulerName: default-scheduler
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 25%
      maxSurge: 25%
  revisionHistoryLimit: 10
  progressDeadlineSeconds: 600
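Without a Service or Ingress in front of the UI, the quickest way to open it is a port-forward to the Deployment; the UI is then available at http://localhost:8080:
kubectl -n kafka port-forward deploy/kafka-ui 8080:8080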
Exec into Connect and create the CDC tasks with curl
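The curl commands below talk to localhost:8083, so first open a shell inside the Connect container:
kubectl -n kafka exec -ti deploy/mysql-connect-cluster -- bash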
Replace the values of database.hostname, database.port, database.user, database.password, database.server.id, and database.exclude.list:
- Create the task
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "data-warehouse-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "192.168.3.2",
    "database.port": "3306",
    "database.user": "x",
    "database.password": "",
    "database.server.id": "1",
    "database.server.name": "dwcdc.test",
    "database.exclude.list": "sys,information_schema,performance_schema,datax_web,dolphinscheduler",
    "database.history.kafka.bootstrap.servers": "my-cluster-kafka-brokers.kafka:9092",
    "database.history.kafka.topic": "dbhistory.data-warehouse",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}'
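- Check the task status
After the POST above returns, you can confirm through the standard Kafka Connect REST API that the connector and its task are RUNNING:
curl -s localhost:8083/connectors/data-warehouse-connector/status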
- Create the business API subscription task
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "dw-admin-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "10.16.4.3",
    "database.port": "4408",
    "database.user": "",
    "database.password": "",
    "database.server.id": "3",
    "database.server.name": "dwcdc.prod",
    "database.include.list": "dw_admin",
    "database.history.kafka.bootstrap.servers": "my-cluster-kafka-brokers.kafka:9092",
    "database.history.kafka.topic": "dbhistory.dw_admin",
    "table.include.list": "test.sys_authz_user,test.t_role",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}'
- List the tasks
curl -i -X GET -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/
- Delete a task
curl -i -X DELETE -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/data-warehouse-connector
Wait for the tasks to start running, then use kafka-ui to check that the topics have been created.
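You can also verify from the command line by listing the topics, using the same pattern as the producer and consumer commands above:
kubectl -n kafka run kafka-topics -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list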