基于 Kafka Connect 实现排行数据处理的方案

Kafka Connect 介绍

Kafka Connect 是 Apache Kafka 的一个组件,用于使其它系统,比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka Broker,Source Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。

image

Source Connector 用于把数据实时地从其它系统读出来发送给 Kafka Connect。Sink Connector 用于 从 Kafka Connect 接收数据并写入 其它系统

image

我们使用 Kafka Connector 读取或写入外部系统、管理数据流以及扩展系统,所有这些都无需开发新代码。Kafka Connect 管理与其他系统连接时的所有常见问题( Schema 管理、容错、并行性、延迟、投递语义等),每个 Connector 只关注如何在目标系统和 Kafka 之间复制数据。

Kafka 连接器通常用来构建 data pipeline,一般有两种使用场景:

  • 开始和结束的端点: 例如,将 Kafka 中的数据导出到 Databend 数据库,或者把 Mysql 数据库中的数据导入 Kafka 中。
  • 数据传输的中间媒介: 例如,为了把海量的日志数据存储到 Elasticsearch 中,可以先把这些日志数据传输到 Kafka 中,然后再从 Kafka 中将这些数据导入到 Elasticsearch 中进行存储。Kafka 连接器可以作为数据管道各个阶段的缓冲区,将消费者程序和生产者程序有效地进行解耦。

Kafka Connect 分为两种:

  • Source Connect: 负责将数据导入 Kafka。
  • Sink Connect: 负责将数据从 Kafka 系统中导出到目标表。

Kafka 目前在 Confluent Hub 上提供了上百种 Connector,比如 Elasticsearch Service Sink Connector, Amazon Sink Connector, HDFS Sink 等,用户可以使用这些 Connector 以 Kafka 为中心构建任意系统之间的数据管道。

基于 Kafka Connect 实现 好友邀请成就排行、ROI 成就排行

一、基于 Kafka Connect 实现 好友邀请成就排行

实现逻辑大致如下:

  • 1、部署实施 Kafka Connect ,实现 MySQL 数据订阅,配置要订阅的数据表、消息 topic 等

  • 2、Data Server 服务引入 kafka-clients 组件

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.8.0</version>
    </dependency>

    3、Data Server 服务中创建对应的消费者,实现以下逻辑流程

    Created with Raphaël 2.2.0
    MySQL
    Kafka Connect 订阅分享数据表
    Data Server 消费数据
    缓存 zset:task:ts:invite 对应用户分享次数 +1 并返回累计邀请次数
    判断用户邀请次数所属等级区间 并保存或更新用户成就表数据
    结束
  • 4、重写读取缓存的接口

二、基于 Kafka Connect 实现 ROI 成就排行

与好友邀请成就相比,ROI 排行更加的复杂一些,涉及到的数据会更多一些

实现逻辑大致如下:

  • 1、部署实施 Kafka Connect ,实现 MySQL 数据订阅,配置要订阅的数据表、消息 topic 等

  • 2、Data Server 服务引入 kafka-clients 组件

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.8.0</version>
    </dependency>

    3、Data Server 服务中创建对应的消费者,实现以下逻辑流程,注意 :ROI = profit_loss/(price*quantity)

    MySQL
    Kafka Connect 订阅 t_options_firm_order 数据表
    Data Server 消费数据
    计算此条数据用户的 ROI
    使用 lua 脚本判断并更新缓存 zset:task:rank:global 对应用户的 roi
    判断此用户所属国家是否在白名单 非白名单国家计入Local
    使用 lua 脚本判断并更新国家排行 zset:task:rank:country缓存 对应用户的roi
    如果在国家白名单 使用 lua 脚本判断并更新 zset:task:rank:bestCountry 缓存
    更新用户对应的 task:rank:user:uid 缓存
    结束
  • 4、重写读取缓存的接口

作者:Jeebiz  创建时间:2024-09-20 12:12
最后编辑:Jeebiz  更新时间:2024-10-10 11:21