Data Server 服务可优化点整理

Redis 连接池配置优化

protected JedisPooled provideRedis() {
    var poolConfig = new GenericObjectPoolConfig<Connection>();
    // 设置获取连接的最大等待时间
    poolConfig.setMaxWaitMillis(config.getMaxWaitMillis());
    // 设置最大连接数
    poolConfig.setMaxTotal(config.getMaxTotal());
    // 设置最大空闲连接数
    poolConfig.setMaxIdle(config.getMaxIdle());
    // 设置最小空闲连接数
    poolConfig.setMinIdle(config.getMinIdle());
    // 设置获取连接时不进行连接验证(通过 PoolableObjectFactory.validateObject() 验证连接是否有效)
    poolConfig.setTestOnBorrow(config.isTestOnBorrow());
    // 设置退还连接时不进行连接验证(通过 PoolableObjectFactory.validateObject() 验证连接是否有效)
    poolConfig.setTestOnReturn(config.isTestOnReturn());
    // 设置连接空闲时进行连接验证
    poolConfig.setTestWhileIdle(config.isTestWhileIdle());
    // 设置连接被回收前的最大空闲时间
    poolConfig.setMinEvictableIdleTimeMillis(config.getMinEvictableIdleTimeMillis());
    // 设置检测线程的运行时间间隔
    poolConfig.setTimeBetweenEvictionRunsMillis(config.getTimeBetweenEvictionRunsMillis());
    // 设置检测线程每次检测的对象数
    poolConfig.setNumTestsPerEvictionRun(-1);

    var clientConfig = DefaultJedisClientConfig.builder()
        .user(config.getUser())
        .password(config.getPassword())
        .database(config.getDatabase())
        .build();
    var hostPort = new HostAndPort(config.getAddress(), config.getPort());
    return new JedisPooled(poolConfig, hostPort, clientConfig);
}
  • 1、开启连接空闲时的有效性检查,降低连接使用时可能出现连接不可用的情况
  • 2、设置连接被回收前的最大空闲时间,及时释放连接资源

Hikari 连接池配置优化

private DataSource createDatasource(JdbcConfig conf) {
    var config = new HikariConfig();
    config.setJdbcUrl(conf.getUrl());
    config.setUsername(conf.getUser());
    config.setPassword(conf.getPass());
    //config.setThreadFactory(Thread.ofVirtual().factory());
    config.setConnectionTestQuery(conf.getConnectionTestQuery());
    config.setMaxLifetime(conf.getMaxLifetime());
    config.setMaximumPoolSize(conf.getMaxPoolSize());
    config.setMinimumIdle(conf.getMinIdle());
    config.setIdleTimeout(conf.getIdleTimeout());
    return new HikariDataSource(config);
}
  • 增加连接测试查询配置,及时发现无效了解,降低数据库连接失效引起的程序异常概率
  • 增加其他参数,用于生产环境参数配置

缓存逻辑优化

1、好友邀请成就,逻辑优化,改为批量保存

假设“好友邀请成就”每分钟执行一次,下面的逻辑中 inviteDao.aggInvites 获取到的数据量不可控,可能是几条,也可能是几千条,与 Const.INVITE_LEVELS 进行双循环后,逐个进行数据库的行更新,对连接的获取频率会比较高,此次可改为批量保存

public void run() {
    var nowTs = DateUtil.current();
    var lastTs = redisDao.getLong(RedisKey.inviteTaskTs(), null);

    var result = inviteDao.aggInvites(lastTs);
    var levels = Const.INVITE_LEVELS;
    for (var it : result.entrySet()) {
        var inviteCount = it.getValue().getCount();
        for (var n : levels) {
            if (inviteCount < n) continue;
            //大于等于n, 表示已达到, 记录入库
            achievementDao.add(it.getKey(), Const.ACH_INVITE, n.toString(), inviteCount.toString());
            break;
        }
    }

    redisDao.setLong(RedisKey.inviteTaskTs(), nowTs);
}

2、ROI 成就排行,逻辑优化,改为批量保存

roiRank24h 查询是对 t_options_firm_order、t_user 两个数据量会比较多的表进行连表查询,最后返回了与用户量同等数量的数据,当用户量达到一定量级后,该查询将会降低查询效率,并创建大量的对象,占用内存空间

SELECT * FROM
(
    SELECT
        {0}.id,
        {0}.user_id uid,
        {1}.user_name un,
        {1}.country ct,
        (profit_loss/(price*quantity)) roi,
        @rn := IF(@prev = user_id, @rn + 1, 1) AS rn,
        @prev := user_id
    FROM {0}
    JOIN (SELECT @prev := NULL, @rn := 0) AS vars
    LEFT JOIN {1} on {0}.user_id = {1}.id
    WHERE {2}
    ORDER BY {0}.user_id, roi desc
) AS T1
WHERE rn = 1
order by roi desc

与 roiRank24h 方法 配合使用的 cacheRoiRank24hInRedis 方法,为了解决用户多的情况,使用了固定线程数的线程池进行处理,当用户量达到一定量级后,会出现以下几种情况

  • 会占用更多的内存空间外
  • 对Redis连接的消耗也会增加
  • 线程池处理也会需要一定的时间,整体上不是比较优的解决方法。
// 线程方法
public void run() {
    var nowTs = DateUtil.current();
    var ranks = orderDao.roiRank24h(nowTs-Const.DAY_MS, config.getBaseToken(), config.getQuoteToken());
    cacheDao.cacheRoiRank24hInRedis(ranks, 200);
}

// 排行并保存的方法
public void cacheRoiRank24hInRedis(List<RankingVo> ranks, int limit) {
    var countries = countryDao.all();
    var counter = new Counter();
    var countryMap = new HashMap<String, List<RankingVo>>();
    var bested = new HashSet<String>();
    var best = new ArrayList<RankingVo>();
    var global = new ArrayList<RankingVo>();
    for (var i = 0; i < ranks.size(); i++) {
        var it = ranks.get(i);
        it.setGr(i+1); //全球排名
        if(i < limit) global.add(it);

        var isWhite = countries.containsKey(it.getCt());
        var country = isWhite ? it.getCt() : "Local";

        var countryRank = counter.incr(country);
        it.setLr(countryRank);//在本国排名
        if(!countryMap.containsKey(country)) {
            countryMap.put(country, new ArrayList<>());
        }

        var local = countryMap.get(country);
        if(local.size() <= limit) local.add(it);

        //best country
        if(isWhite && !bested.contains(country)) {
            bested.add(country);
            best.add(it);
        }

        worker.submit(() -> {
            redisDao.setJson(RedisKey.userRank(it.getUid()), it, config.getRankExpiresSec(), false);
        });
    }

    redisDao.setJson(RedisKey.globalRank(), global, config.getRankExpiresSec(), true);
    redisDao.setJson(RedisKey.bestCountry(), best, config.getRankExpiresSec(), true);
    countryMap.forEach((name, rank) -> {
        redisDao.setJson(RedisKey.countryRank(name), rank, config.getRankExpiresSec(), true);
    });
}

可考虑数据订阅,结合 zset 缓存结构进行重写,方案有:

  • 基于 Kafka Connect 实现 MySQL 数据库中现有数据的快照,然后监听和消费 Kafka 消息,实现数据从 MySQL >> Kafka >> Redis 的处理过程
  • 基于 RocketMQ Connect 实现 MySQL 数据库中现有数据的快照,然后监听和消费 RocketMQ 消息,实现数据从 MySQL >> Kafka >> Redis 的处理过程
  • 基于 Canal 实现 MySQL 数据日志订阅,然后监听消费 binlog 日志,实现数据从 MySQL >> Canal >> Redis 的处理过程

缓存结构优化

程序中多处使用 redisDao.setJson() 方法,调用 redis 的 set 命令 , 将排行数据保存为 json 格式,这种方式使用简单,但不够灵活高效

  • 比如 取出全球排行中的前 50,按现在的方式则需要:先取出数据 >> 反序列化 >> 内存排序 >> 取出top50, 如果缓存改为 zset 结构,只需要使用 zset 的 命令即可快速取回 top50, 并在返回数据时自动完成反序列化;
  • 依此考虑,其他排行均可优化为zset格式
作者:Jeebiz  创建时间:2024-09-20 12:13
最后编辑:Jeebiz  更新时间:2024-10-10 11:21