Kafka 服务验证
在上个文章中,介绍了简单的Kafka服务安装,下面我们对安装好的服务进行环境验证,检查是否可正常使用!
1、创建一个主题来存储您的事件
Kafka 是一个分布式事件流平台,可让您跨多台计算机 读取、写入、存储和处理 事件(在文档中也称为记录或 消息)。
示例事件包括支付交易、来自手机的地理位置更新、运输订单、来自物联网设备或医疗设备的传感器测量等等。这些事件被组织并存储在 主题中。非常简单,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。
因此,在您编写第一个事件之前,您必须创建一个主题。打开另一个终端会话并运行:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Kafka 的所有命令行工具都有额外的选项:运行 kafka-topics.sh
不带任何参数的命令以显示使用信息。例如,它还可以向您显示 新主题 的分区计数等详细信息:
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events TopicId: NPmZHyhbR9y00wMglMH2sg PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
2、将一些事件写入主题
Kafka 客户端通过网络与 Kafka 代理通信以写入(或读取)事件。一旦收到,代理将以持久和容错的方式存储事件,只要您需要——甚至永远。
运行控制台生产者客户端以将一些事件写入您的主题。默认情况下,您输入的每一行都会导致一个单独的事件被写入主题。
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
您可以使用Ctrl-C
随时停止生产者客户端。
3、读取事件
打开另一个终端会话并运行控制台消费者客户端以读取您刚刚创建的事件:
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
您可以使用Ctrl-C
随时停止消费者客户端。
随意尝试:例如,切换回您的生产者终端(上一步)以编写其他事件,并查看这些事件如何立即显示在您的消费者终端中。
因为事件持久存储在 Kafka 中,所以它们可以被任意多次读取,并且可以被任意多的消费者读取。您可以通过打开另一个终端会话并再次重新运行之前的命令来轻松验证这一点。
4、使用 Kafka Connect 将数据导入/导出为事件流
您可能在关系数据库或传统消息传递系统等现有系统中拥有大量数据,以及许多已经在使用这些系统的应用程序。 Kafka Connect允许您不断地将数据从外部系统提取到 Kafka 中,反之亦然。它是一个运行 连接器的可扩展工具,连接器实现了与外部系统交互的自定义逻辑。因此很容易将现有系统与 Kafka 集成。为了使这个过程更容易,有数百个这样的连接器随时可用。
在本快速入门中,我们将了解如何使用简单的连接器运行 Kafka Connect,这些连接器将数据从文件导入到 Kafka 主题并将数据从 Kafka 主题导出到文件。
首先,确保添加connect-file-3.3.1.jar
到 plugin.path
Connect worker 配置中的属性。出于本快速入门的目的,我们将使用相对路径并将连接器的包视为超级 jar,当从安装目录运行快速入门命令时,它会起作用。但是,值得注意的是,对于生产部署,使用绝对路径始终是可取的。有关如何设置此配置的详细说明, 请参阅plugin.path 。
编辑 config/connect-standalone.properties
文件,添加或更改 plugin.path
与以下匹配的配置属性,然后保存文件:
> echo "plugin.path=libs/connect-file-3.3.1.jar"
然后,首先创建一些种子数据进行测试:
> echo -e "foo\nbar" > test.txt
或者在 Windows 上:
> echo foo> test.txt
> echo bar>> test.txt
接下来,我们将启动两个以独立模式运行的连接器,这意味着它们在单个本地专用进程中运行。我们提供三个配置文件作为参数。第一个始终是 Kafka Connect 进程的配置,包含常见配置,例如要连接的 Kafka 代理和数据的序列化格式。其余配置文件分别指定要创建的连接器。这些文件包括唯一的连接器名称、要实例化的连接器类以及连接器所需的任何其他配置。
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
这些示例配置文件包含在 Kafka 中,使用您之前启动的默认本地集群配置并创建两个连接器:第一个是源连接器,它从输入文件中读取行并将每个行生成到 Kafka 主题,第二个是接收器连接器它从 Kafka 主题读取消息并在输出文件中将每条消息生成为一行。
在启动过程中,您会看到许多日志消息,包括一些表明正在实例化连接器的消息。一旦 Kafka Connect 进程启动,源连接器应该开始从主题读取行
test.txt
并将它们生成到主题connect-test
,而接收器连接器应该开始从主题读取消息connect-test
并将它们写入文件test.sink.txt
。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传送:
> more test.sink.txt
foo
bar
请注意,数据存储在 Kafka 主题
connect-test
中,因此我们还可以运行控制台消费者来查看主题中的数据(或使用自定义消费者代码来处理它):
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
连接器继续处理数据,因此我们可以将数据添加到文件并查看它在管道中的移动:
> echo Another line>> test.txt
您应该看到该行出现在控制台消费者输出和接收器文件中。
5、使用 Kafka Streams 处理您的事件
一旦您的数据作为事件存储在 Kafka 中,您就可以使用适用于 Java/Scala 的 Kafka Streams客户端库处理数据。它允许您实施任务关键型实时应用程序和微服务,其中输入和/或输出数据存储在 Kafka 主题中。Kafka Streams 将在客户端编写和部署标准 Java 和 Scala 应用程序的简单性与 Kafka 的服务器端集群技术的优势相结合,使这些应用程序具有高度可扩展性、弹性、容错性和分布式。该库支持恰好一次处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。
为了给你一个初步的体验,下面是如何实现流行的WordCount算法:
KStream<String, String> textLines = builder.stream("quickstart-events");
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((keyIgnored, word) -> word)
.count();
wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
Kafka Streams 示例 和应用程序开发教程演示 了 如何从头到尾编写和运行此类流应用程序。
最后编辑:Jeebiz 更新时间:2024-11-01 10:06