Introduction to Strimzi

Strimzi simplifies the process of running Apache Kafka within a Kubernetes cluster.

This guide is intended as a starting point for building an understanding of Strimzi. It introduces some of the key concepts behind Kafka, which sits at the core of Strimzi, and briefly explains the purpose of each Kafka component. It also outlines the main configuration points, including the options for securing and monitoring Kafka. The Strimzi distribution provides the files for deploying and managing a Kafka cluster, along with example files for configuring and monitoring your deployment.

It describes a typical Kafka deployment, as well as the tools used to deploy and manage Kafka.

(Figure: Kafka component interaction diagram)

Deploy Strimzi using the installation files

Apply the Strimzi installation files, which include the ClusterRoles, ClusterRoleBindings, and a number of Custom Resource Definitions (CRDs). The CRDs define the schemas of the custom resources (CRs, such as Kafka and KafkaTopic) that you will use to manage Kafka clusters, topics, and users.

kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

The ClusterRoles and ClusterRoleBindings YAML files downloaded from strimzi.io reference a default namespace of myproject. The query parameter namespace=kafka updates these files to use the kafka namespace instead. By also specifying -n kafka when running kubectl create, the definitions and configurations that do not carry a namespace reference are likewise installed into the kafka namespace. If the namespaces do not match, the Strimzi cluster operator will not have the permissions it needs to perform its operations.
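
As a quick sanity check, you can confirm that the CRDs and RBAC objects were installed. This is a minimal sketch; the exact resource list varies by Strimzi version:

# List the Strimzi CRDs registered in the cluster
kubectl get crd | grep strimzi.io

# Confirm the operator's ClusterRoles and ClusterRoleBindings exist
kubectl get clusterroles,clusterrolebindings | grep strimzi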

Follow the deployment of the Strimzi cluster operator:

kubectl get pod -n kafka --watch

You can also follow the operator's log:

kubectl logs deployment/strimzi-cluster-operator -n kafka -f

Once the operator is running, it watches for new custom resources and creates the Kafka clusters, topics, or users that those resources describe.
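
For example, once a cluster named my-cluster exists (as created in the next step), applying a KafkaTopic custom resource makes the operator create the matching topic in Kafka. The topic name and sizing below are illustrative:

# A minimal KafkaTopic CR; the strimzi.io/cluster label ties it to the cluster
kubectl apply -n kafka -f - <<EOF
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
EOF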

Create an Apache Kafka cluster

Create a new Kafka custom resource to get a small, persistent Apache Kafka cluster with one node each for Apache ZooKeeper and Apache Kafka:

# Apply the `Kafka` Cluster CR file
kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka

Wait while Kubernetes starts the required pods, services, and so on:

kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka

The above command may time out if you are downloading images over a slow network. If that happens, you can run it again.
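
While waiting, you can check on progress by listing the pods and inspecting the status conditions that the operator reports on the Kafka resource:

# Watch the cluster pods come up
kubectl get pods -n kafka

# Inspect the status conditions on the Kafka resource
kubectl describe kafka/my-cluster -n kafka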

Send and receive messages

With the cluster running, start a simple producer to send messages to a Kafka topic (the topic is created automatically):

kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic

And to receive them, run the following in a different terminal:

kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning

Enjoy your Apache Kafka cluster!

Deploy Kafka Connect

Create the Kafka Connect Deployment with the following YAML (this example uses the Debezium Connect image), replacing the value of the BOOTSTRAP_SERVERS environment variable with your own cluster's bootstrap address:

kind: Deployment
apiVersion: apps/v1
metadata:
  name: mysql-connect-cluster
  namespace: kafka
  labels:
    app: mysql-connect-cluster
  annotations:
    kubesphere.io/alias-name: Kafka MySQL connector
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mysql-connect-cluster
  template:
    metadata:
      labels:
        app: mysql-connect-cluster
    spec:
      volumes:
        - name: host-time
          hostPath:
            path: /etc/localtime
            type: ''
      containers:
        - name: connect
          image: 'quay.io/debezium/connect:1.9'
          ports:
            - name: http-8083
              containerPort: 8083
              protocol: TCP
          env:
            - name: BOOTSTRAP_SERVERS
              value: 'my-cluster-kafka-brokers.kafka:9092'
            - name: REST_HOST_NAME
              value: 0.0.0.0
            - name: GROUP_ID
              value: '1'
            - name: CONFIG_STORAGE_TOPIC
              value: my_connect_configs
            - name: OFFSET_STORAGE_TOPIC
              value: my_connect_offsets
            - name: STATUS_STORAGE_TOPIC
              value: my_connect_statuses
          resources: { }
          volumeMounts:
            - name: host-time
              readOnly: true
              mountPath: /etc/localtime
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      serviceAccountName: default
      serviceAccount: default
      securityContext: { }
      schedulerName: default-scheduler
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 25%
      maxSurge: 25%
  revisionHistoryLimit: 10
  progressDeadlineSeconds: 600
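
After saving the manifest (for example as connect-deployment.yaml; the filename is illustrative), apply it and verify that the Connect REST API responds. The last command assumes curl is available inside the image:

kubectl apply -f connect-deployment.yaml

# Wait for the rollout to complete
kubectl rollout status deployment/mysql-connect-cluster -n kafka

# The root endpoint reports the Connect worker version
kubectl exec -n kafka deployment/mysql-connect-cluster -- curl -s localhost:8083/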

Deploy kafka-ui

Create the kafka-ui Deployment with the following YAML, replacing the value of the KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS environment variable with your own cluster's bootstrap address (the example below uses the my-cluster-kafka-bootstrap service created by Strimzi):

kind: Deployment
apiVersion: apps/v1
metadata:
  name: kafka-ui
  namespace: kafka
  labels:
    app: kafka-ui
  annotations:
    kubesphere.io/alias-name: Kafka management UI
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-ui
  template:
    metadata:
      labels:
        app: kafka-ui
    spec:
      volumes:
        - name: host-time
          hostPath:
            path: /etc/localtime
            type: ''
      containers:
        - name: kafka-ui
          image: 'provectuslabs/kafka-ui:latest'
          ports:
            - name: http-8080
              containerPort: 8080
              protocol: TCP
          env:
            - name: KAFKA_CLUSTERS_0_NAME
              value: local
            - name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
              value: 'my-cluster-kafka-bootstrap.kafka:9092'
          resources: { }
          volumeMounts:
            - name: host-time
              readOnly: true
              mountPath: /etc/localtime
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      serviceAccountName: default
      serviceAccount: default
      securityContext: { }
      schedulerName: default-scheduler
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 25%
      maxSurge: 25%
  revisionHistoryLimit: 10
  progressDeadlineSeconds: 600
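
The Deployment alone does not expose the UI outside the cluster. For a quick look you can port-forward to it locally (a temporary approach; for regular use you would add a Service and Ingress):

# Forward local port 8080 to the kafka-ui pod, then open http://localhost:8080
kubectl port-forward deployment/kafka-ui 8080:8080 -n kafka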

Exec into the Connect container and use curl to create the CDC connectors.

Replace the values of database.hostname, database.port, database.user, database.password, database.server.id, and database.exclude.list with your own.
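
To get a shell inside the Connect worker, where localhost:8083 resolves to its REST API, exec into the Deployment's pod:

kubectl exec -it deployment/mysql-connect-cluster -n kafka -- /bin/bash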

  • Create a connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
    "name": "data-warehouse-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "192.168.3.40",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "NS^qB&o*SVHvu5",
        "database.server.id": "1",
        "database.server.name": "dwcdc.test",
        "database.exclude.list": "sys,information_schema,performance_schema,datax_web,dolphinscheduler",
        "database.history.kafka.bootstrap.servers": "my-cluster-kafka-brokers.kafka:9092",
        "database.history.kafka.topic": "dbhistory.data-warehouse",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false"
    }
}'
  • Create the business-interface subscription connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
    "name": "dw-admin-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "10.16.8.3",
        "database.port": "4408",
        "database.user": "xLf_01zzzs",
        "database.password": "HzgsZZ+2022-",
        "database.server.id": "3",
        "database.server.name": "dwcdc.prod",
        "database.include.list": "dw_admin",
        "database.history.kafka.bootstrap.servers": "my-cluster-kafka-brokers.kafka:9092",
        "database.history.kafka.topic": "dbhistory.dw_admin",
        "table.include.list": "dw_admin.sys_authz_user,dw_admin.sj_zxxx_xxjbsjlb,dw_admin.sj_zxjz_jzgjbsjzlb,dw_admin.sj_zxxs_xsjbsjzlb,dw_admin.t_role",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false"
    }
}'
  • List connectors
curl -i -X GET -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/
  • Delete a connector
curl -i -X DELETE -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/data-warehouse-connector

Wait for the connector tasks to start running, then use kafka-ui to check that the expected topics have been created.
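
You can also confirm a connector's state directly through the Connect REST API. This queries the status endpoint for the first connector created above:

curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/data-warehouse-connector/status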
