Data Server 服务可优化点整理
Data Server 服务可优化点整理
Redis 连接池配置优化
protected JedisPooled provideRedis() {
var poolConfig = new GenericObjectPoolConfig<Connection>();
// 设置获取连接的最大等待时间
poolConfig.setMaxWaitMillis(config.getMaxWaitMillis());
// 设置最大连接数
poolConfig.setMaxTotal(config.getMaxTotal());
// 设置最大空闲连接数
poolConfig.setMaxIdle(config.getMaxIdle());
// 设置最小空闲连接数
poolConfig.setMinIdle(config.getMinIdle());
// 设置获取连接时不进行连接验证(通过 PoolableObjectFactory.validateObject() 验证连接是否有效)
poolConfig.setTestOnBorrow(config.isTestOnBorrow());
// 设置退还连接时不进行连接验证(通过 PoolableObjectFactory.validateObject() 验证连接是否有效)
poolConfig.setTestOnReturn(config.isTestOnReturn());
// 设置连接空闲时进行连接验证
poolConfig.setTestWhileIdle(config.isTestWhileIdle());
// 设置连接被回收前的最大空闲时间
poolConfig.setMinEvictableIdleTimeMillis(config.getMinEvictableIdleTimeMillis());
// 设置检测线程的运行时间间隔
poolConfig.setTimeBetweenEvictionRunsMillis(config.getTimeBetweenEvictionRunsMillis());
// 设置检测线程每次检测的对象数
poolConfig.setNumTestsPerEvictionRun(-1);
var clientConfig = DefaultJedisClientConfig.builder()
.user(config.getUser())
.password(config.getPassword())
.database(config.getDatabase())
.build();
var hostPort = new HostAndPort(config.getAddress(), config.getPort());
return new JedisPooled(poolConfig, hostPort, clientConfig);
}
- 1、开启连接空闲时的有效性检查,降低连接使用时可能出现连接不可用的情况
- 2、设置连接被回收前的最大空闲时间,及时释放连接资源
Hikari 连接池配置优化
private DataSource createDatasource(JdbcConfig conf) {
var config = new HikariConfig();
config.setJdbcUrl(conf.getUrl());
config.setUsername(conf.getUser());
config.setPassword(conf.getPass());
//config.setThreadFactory(Thread.ofVirtual().factory());
config.setConnectionTestQuery(conf.getConnectionTestQuery());
config.setMaxLifetime(conf.getMaxLifetime());
config.setMaximumPoolSize(conf.getMaxPoolSize());
config.setMinimumIdle(conf.getMinIdle());
config.setIdleTimeout(conf.getIdleTimeout());
return new HikariDataSource(config);
}
- 增加连接测试查询配置,及时发现无效了解,降低数据库连接失效引起的程序异常概率
- 增加其他参数,用于生产环境参数配置
缓存逻辑优化
1、好友邀请成就,逻辑优化,改为批量保存
假设“好友邀请成就”每分钟执行一次,下面的逻辑中 inviteDao.aggInvites 获取到的数据量不可控,可能是几条,也可能是几千条,与 Const.INVITE_LEVELS 进行双循环后,逐个进行数据库的行更新,对连接的获取频率会比较高,此次可改为批量保存
public void run() {
var nowTs = DateUtil.current();
var lastTs = redisDao.getLong(RedisKey.inviteTaskTs(), null);
var result = inviteDao.aggInvites(lastTs);
var levels = Const.INVITE_LEVELS;
for (var it : result.entrySet()) {
var inviteCount = it.getValue().getCount();
for (var n : levels) {
if (inviteCount < n) continue;
//大于等于n, 表示已达到, 记录入库
achievementDao.add(it.getKey(), Const.ACH_INVITE, n.toString(), inviteCount.toString());
break;
}
}
redisDao.setLong(RedisKey.inviteTaskTs(), nowTs);
}
2、ROI 成就排行,逻辑优化,改为批量保存
roiRank24h 查询是对 t_options_firm_order、t_user 两个数据量会比较多的表进行连表查询,最后返回了与用户量同等数量的数据,当用户量达到一定量级后,该查询将会降低查询效率,并创建大量的对象,占用内存空间
SELECT * FROM
(
SELECT
{0}.id,
{0}.user_id uid,
{1}.user_name un,
{1}.country ct,
(profit_loss/(price*quantity)) roi,
@rn := IF(@prev = user_id, @rn + 1, 1) AS rn,
@prev := user_id
FROM {0}
JOIN (SELECT @prev := NULL, @rn := 0) AS vars
LEFT JOIN {1} on {0}.user_id = {1}.id
WHERE {2}
ORDER BY {0}.user_id, roi desc
) AS T1
WHERE rn = 1
order by roi desc
与 roiRank24h 方法 配合使用的 cacheRoiRank24hInRedis 方法,为了解决用户多的情况,使用了固定线程数的线程池进行处理,当用户量达到一定量级后,会出现以下几种情况:
- 会占用更多的内存空间外
- 对Redis连接的消耗也会增加
- 线程池处理也会需要一定的时间,整体上不是比较优的解决方法。
// 线程方法
public void run() {
var nowTs = DateUtil.current();
var ranks = orderDao.roiRank24h(nowTs-Const.DAY_MS, config.getBaseToken(), config.getQuoteToken());
cacheDao.cacheRoiRank24hInRedis(ranks, 200);
}
// 排行并保存的方法
public void cacheRoiRank24hInRedis(List<RankingVo> ranks, int limit) {
var countries = countryDao.all();
var counter = new Counter();
var countryMap = new HashMap<String, List<RankingVo>>();
var bested = new HashSet<String>();
var best = new ArrayList<RankingVo>();
var global = new ArrayList<RankingVo>();
for (var i = 0; i < ranks.size(); i++) {
var it = ranks.get(i);
it.setGr(i+1); //全球排名
if(i < limit) global.add(it);
var isWhite = countries.containsKey(it.getCt());
var country = isWhite ? it.getCt() : "Local";
var countryRank = counter.incr(country);
it.setLr(countryRank);//在本国排名
if(!countryMap.containsKey(country)) {
countryMap.put(country, new ArrayList<>());
}
var local = countryMap.get(country);
if(local.size() <= limit) local.add(it);
//best country
if(isWhite && !bested.contains(country)) {
bested.add(country);
best.add(it);
}
worker.submit(() -> {
redisDao.setJson(RedisKey.userRank(it.getUid()), it, config.getRankExpiresSec(), false);
});
}
redisDao.setJson(RedisKey.globalRank(), global, config.getRankExpiresSec(), true);
redisDao.setJson(RedisKey.bestCountry(), best, config.getRankExpiresSec(), true);
countryMap.forEach((name, rank) -> {
redisDao.setJson(RedisKey.countryRank(name), rank, config.getRankExpiresSec(), true);
});
}
可考虑数据订阅,结合 zset 缓存结构进行重写,方案有:
- 基于 Kafka Connect 实现 MySQL 数据库中现有数据的快照,然后监听和消费 Kafka 消息,实现数据从 MySQL >> Kafka >> Redis 的处理过程
- 基于 RocketMQ Connect 实现 MySQL 数据库中现有数据的快照,然后监听和消费 RocketMQ 消息,实现数据从 MySQL >> Kafka >> Redis 的处理过程
- 基于 Canal 实现 MySQL 数据日志订阅,然后监听消费 binlog 日志,实现数据从 MySQL >> Canal >> Redis 的处理过程
缓存结构优化
程序中多处使用 redisDao.setJson() 方法,调用 redis 的 set 命令 , 将排行数据保存为 json 格式,这种方式使用简单,但不够灵活高效
- 比如 取出全球排行中的前 50,按现在的方式则需要:先取出数据 >> 反序列化 >> 内存排序 >> 取出top50, 如果缓存改为 zset 结构,只需要使用 zset 的 命令即可快速取回 top50, 并在返回数据时自动完成反序列化;
- 依此考虑,其他排行均可优化为zset格式
作者:Jeebiz 创建时间:2024-09-20 12:13
最后编辑:Jeebiz 更新时间:2024-10-10 11:21
最后编辑:Jeebiz 更新时间:2024-10-10 11:21