什么是 InfluxDB?

InfluxDB 是一个用于时间序列数据的数据库,适用于指标、分析、物联网监控等。它速度快、可用且可扩展。更多详情,请访问https://www.influxdata.com/
初始设置

在我们开始之前,我们需要做几件事:

  • 设置 Docker (安装 Docker)
  • 安装 Gradle

建筑学

这是一个 javalin 微服务应用程序,具有 2 个端点:

  • POST /upload:接收事件并将其插入数据库。如果事件已过期,则将其丢弃。
  • GET /statistics:返回事件的摘要(计数、总和、最小值和最大值)

执行

该应用程序由 2 个简单的 kotlin 文件组成:

realtimestatistics.Main:创建端点和应用程序设置

data class Statistic(val count: Int = 0, val timestamp: Long = Date().time)

data class Total(val count: Double, val sum: Double, val min: Double, val max: Double)

val influxHost = System.getenv().getOrDefault("influx.host", "influxdb")!!

val influxDB: InfluxDB by lazy { InfluxDBFactory.connect("http://$influxHost:8086", "root", "root") }

fun main(args: Array<String>) {
    val app = Javalin.create().start(7000)
    val statisticService = StatisticsService(influxDB)
    val controller = Controller(statisticService)

    app.routes {
        get("/statistics", { ctx ->
            controller.get(ctx)
        })
        post("/upload", { ctx ->
            controller.post(ctx)
        })
    }

}

class Controller(private val statisticService: StatisticsService) {
    private val asStatusCode = fun StatisticResult.(): Int {
        return if (this == StatisticResult.OK) {
            201
        } else {
            204
        }
    }

    fun post(ctx: Context) {
        val statistic = ctx.bodyAsClass(Statistic::class.java)
        val result = statisticService.create(statistic)
        ctx.status(result.asStatusCode())
    }

    fun get(ctx: Context) {
        ctx.json(statisticService.aggregated())
    }
}

realtimestatistics.StatisticsService:包含创建和检索指标的业务逻辑以及数据库初始化

private val timeFrameInMillis = 60000

private val aggregateQuery = """
    SELECT  count(s_count) as count,
            sum(s_count) as sum,
            min(s_count) as min,
            max(s_count) as max
    FROM uploads
    where time > now() - 60s
    """

init {
    influxDB.createDatabase(dbName)
}

fun create(statistic: Statistic): StatisticResult {
    val now = Date().time
    if ((statistic.timestamp + timeFrameInMillis) >= now) {
        influxDB.write(dbName, "", Point.measurement("uploads")
                .time(statistic.timestamp, TimeUnit.MILLISECONDS)
                .addField("s_count", statistic.count)
                .addField("s_timestamp", statistic.timestamp)
                .build())
        return StatisticResult.OK
    }
    return StatisticResult.OLD
}

fun aggregated(): Total {
    val query = Query(
            aggregateQuery,
            dbName
    )
    val results = influxDB.query(query)
            .results
    if (results.first().series == null) {
        return Total(0.0, 0.0, 0.0, 0.0)
    }
    return results.first().series.first().values
            .map { mutableList ->
                Total(mutableList[1].toString().toDouble(),
                        mutableList[2].toString().toDouble(),
                        mutableList[3].toString().toDouble(),
                        mutableList[4].toString().toDouble()
                )
            }[0]
}

完整源代码请见 https://github.com/ricardobaumann/real-time-statistics

本地运行

对于本地运行,我使用 docker compose。因此,在根文件夹下,运行 docker-compose up 并检出上面提到的端点 http://localhost:7000/

用法
在服务运行时,尝试使用 POST/upload发送 { “count” : 40 }

然后,从 获取摘要/statistics。

作者:Jeebiz  创建时间:2025-05-04 00:35
最后编辑:Jeebiz  更新时间:2025-05-04 00:55