基于 Kafka Connect 实现排行数据处理的方案
基于 Kafka Connect 实现排行数据处理的方案
Kafka Connect 介绍
Kafka Connect 是 Apache Kafka 的一个组件,用于使其它系统,比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka Broker,Source Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。
Source Connector 用于把数据实时地从其它系统
读出来发送给 Kafka Connect。Sink Connector 用于 从 Kafka Connect 接收数据并写入 其它系统
。
我们使用 Kafka Connector 读取或写入外部系统、管理数据流以及扩展系统,所有这些都无需开发新代码。Kafka Connect 管理与其他系统连接时的所有常见问题( Schema 管理、容错、并行性、延迟、投递语义等),每个 Connector 只关注如何在目标系统和 Kafka 之间复制数据。
Kafka 连接器通常用来构建 data pipeline,一般有两种使用场景:
- 开始和结束的端点: 例如,将 Kafka 中的数据导出到 Databend 数据库,或者把 Mysql 数据库中的数据导入 Kafka 中。
- 数据传输的中间媒介: 例如,为了把海量的日志数据存储到 Elasticsearch 中,可以先把这些日志数据传输到 Kafka 中,然后再从 Kafka 中将这些数据导入到 Elasticsearch 中进行存储。Kafka 连接器可以作为数据管道各个阶段的缓冲区,将消费者程序和生产者程序有效地进行解耦。
Kafka Connect 分为两种:
- Source Connect: 负责将数据导入 Kafka。
- Sink Connect: 负责将数据从 Kafka 系统中导出到目标表。
Kafka 目前在 Confluent Hub 上提供了上百种 Connector,比如 Elasticsearch Service Sink Connector, Amazon Sink Connector, HDFS Sink 等,用户可以使用这些 Connector 以 Kafka 为中心构建任意系统之间的数据管道。
基于 Kafka Connect 实现 好友邀请成就排行、ROI 成就排行
一、基于 Kafka Connect 实现 好友邀请成就排行
实现逻辑大致如下:
1、部署实施 Kafka Connect ,实现 MySQL 数据订阅,配置要订阅的数据表、消息 topic 等
2、Data Server 服务引入 kafka-clients 组件
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.8.0</version> </dependency>
3、Data Server 服务中创建对应的消费者,实现以下逻辑流程
4、重写读取缓存的接口
二、基于 Kafka Connect 实现 ROI 成就排行
与好友邀请成就相比,ROI 排行更加的复杂一些,涉及到的数据会更多一些
实现逻辑大致如下:
1、部署实施 Kafka Connect ,实现 MySQL 数据订阅,配置要订阅的数据表、消息 topic 等
2、Data Server 服务引入 kafka-clients 组件
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.8.0</version> </dependency>
3、Data Server 服务中创建对应的消费者,实现以下逻辑流程,注意 :ROI = profit_loss/(price*quantity)
MySQL Kafka Connect 订阅 t_options_firm_order 数据表 Data Server 消费数据 计算此条数据用户的 ROI 使用 lua 脚本判断并更新缓存 zset:task:rank:global 对应用户的 roi 判断此用户所属国家是否在白名单 非白名单国家计入Local 使用 lua 脚本判断并更新国家排行 zset:task:rank:country缓存 对应用户的roi 如果在国家白名单 使用 lua 脚本判断并更新 zset:task:rank:bestCountry 缓存 更新用户对应的 task:rank:user:uid 缓存 结束4、重写读取缓存的接口
最后编辑:Jeebiz 更新时间:2024-10-10 11:21