使用 influxDB 收集实时分析数据
什么是 InfluxDB?
InfluxDB 是一个用于时间序列数据的数据库,适用于指标、分析、物联网监控等。它速度快、可用且可扩展。更多详情,请访问https://www.influxdata.com/
初始设置
在我们开始之前,我们需要做几件事:
- 设置 Docker (安装 Docker)
- 安装 Gradle
建筑学
这是一个 javalin 微服务应用程序,具有 2 个端点:
- POST /upload:接收事件并将其插入数据库。如果事件已过期,则将其丢弃。
- GET /statistics:返回事件的摘要(计数、总和、最小值和最大值)
执行
该应用程序由 2 个简单的 kotlin 文件组成:
realtimestatistics.Main:创建端点和应用程序设置
data class Statistic(val count: Int = 0, val timestamp: Long = Date().time)
data class Total(val count: Double, val sum: Double, val min: Double, val max: Double)
val influxHost = System.getenv().getOrDefault("influx.host", "influxdb")!!
val influxDB: InfluxDB by lazy { InfluxDBFactory.connect("http://$influxHost:8086", "root", "root") }
fun main(args: Array<String>) {
val app = Javalin.create().start(7000)
val statisticService = StatisticsService(influxDB)
val controller = Controller(statisticService)
app.routes {
get("/statistics", { ctx ->
controller.get(ctx)
})
post("/upload", { ctx ->
controller.post(ctx)
})
}
}
class Controller(private val statisticService: StatisticsService) {
private val asStatusCode = fun StatisticResult.(): Int {
return if (this == StatisticResult.OK) {
201
} else {
204
}
}
fun post(ctx: Context) {
val statistic = ctx.bodyAsClass(Statistic::class.java)
val result = statisticService.create(statistic)
ctx.status(result.asStatusCode())
}
fun get(ctx: Context) {
ctx.json(statisticService.aggregated())
}
}
realtimestatistics.StatisticsService:包含创建和检索指标的业务逻辑以及数据库初始化
private val timeFrameInMillis = 60000
private val aggregateQuery = """
SELECT count(s_count) as count,
sum(s_count) as sum,
min(s_count) as min,
max(s_count) as max
FROM uploads
where time > now() - 60s
"""
init {
influxDB.createDatabase(dbName)
}
fun create(statistic: Statistic): StatisticResult {
val now = Date().time
if ((statistic.timestamp + timeFrameInMillis) >= now) {
influxDB.write(dbName, "", Point.measurement("uploads")
.time(statistic.timestamp, TimeUnit.MILLISECONDS)
.addField("s_count", statistic.count)
.addField("s_timestamp", statistic.timestamp)
.build())
return StatisticResult.OK
}
return StatisticResult.OLD
}
fun aggregated(): Total {
val query = Query(
aggregateQuery,
dbName
)
val results = influxDB.query(query)
.results
if (results.first().series == null) {
return Total(0.0, 0.0, 0.0, 0.0)
}
return results.first().series.first().values
.map { mutableList ->
Total(mutableList[1].toString().toDouble(),
mutableList[2].toString().toDouble(),
mutableList[3].toString().toDouble(),
mutableList[4].toString().toDouble()
)
}[0]
}
完整源代码请见 https://github.com/ricardobaumann/real-time-statistics
本地运行
对于本地运行,我使用 docker compose。因此,在根文件夹下,运行 docker-compose up 并检出上面提到的端点 http://localhost:7000/
用法
在服务运行时,尝试使用 POST/upload发送 { “count” : 40 }
然后,从 获取摘要/statistics。
作者:Jeebiz 创建时间:2025-05-04 00:35
最后编辑:Jeebiz 更新时间:2025-05-04 00:55
最后编辑:Jeebiz 更新时间:2025-05-04 00:55