生产消息,score = 当前时间戳 + 60s 以内的随机延迟

    /** Redis key of the sorted set that the producer adds delayed messages to. */
    public static final String MESSAGE_ZKEY = "message:ZSetqueue";
    /**
     * Sequence counter; each producer thread takes the next value to number its message.
     * NOTE(review): {@code volatile} looks redundant if this reference is never reassigned
     * (AtomicInteger already provides atomic, visible updates) - confirm before making it final.
     */
    public volatile AtomicInteger count =  new AtomicInteger();
    /**
     * Starts ten threads; each one takes the next sequence number from {@code count},
     * logs it, and adds one message to the sorted set {@code MESSAGE_ZKEY}.
     * The score is the current time in milliseconds plus a random delay below 60 seconds.
     */
    public void zAdd() {
        // One stateless task shared by all producer threads.
        Runnable producer = () -> {
            int sequence = count.getAndIncrement();
            String threadName = Thread.currentThread().getName();
            log.info(threadName + ":put message to zset = " + sequence);

            // Score = now + random delay in [0, 60000) ms.
            long now = System.currentTimeMillis();
            int delayMillis = new Random().nextInt(60 * 1000);
            double dueAt = now + delayMillis;

            String payload = Thread.currentThread().getName() + " hello zset:" + sequence;
            redisTemplate.opsForZSet().add(MESSAGE_ZKEY, payload, dueAt);
        };

        int started = 0;
        while (started < 10) {
            new Thread(producer).start();
            started++;
        }
    }

消费者:定时任务,每秒执行一次

   /** Redis key of the sorted set that the consumer polls for due messages. */
   public static final String MESSAGE_ZKEY = "message:ZSetqueue";
    /**
     * Formatter used to render a message's score (epoch milliseconds) in the consume log line.
     * Created with the no-arg constructor, i.e. the JVM default pattern and locale.
     * NOTE(review): SimpleDateFormat is not thread-safe - confirm the scheduled method
     * is the only user and never runs concurrently.
     */
    public SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
    /**
     * Polls the delay queue: reads every member of {@code MESSAGE_ZKEY} whose score is
     * between 0 and the current time in milliseconds, and consumes each one.
     *
     * <p>Scheduled to start 5 seconds after startup and then run at a fixed rate of one second.
     *
     * <p>A message is only treated as consumed when this caller's removal actually deleted it.
     * Reading and removing are separate Redis commands, so another consumer polling the same
     * key may take a member in between; such members are skipped instead of being logged twice
     * or failing on a missing score.
     */
    @Scheduled(initialDelay = 5 * 1000, fixedRate = 1000)
    public void zrangebysocre() {
        log.info("延时队列消费。。。");
        // Fetch the messages whose score is not greater than the current timestamp (the due ones).
        Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(MESSAGE_ZKEY, 0, System.currentTimeMillis());
        if (messages == null || messages.isEmpty()) {
            return;
        }
        for (Object message : messages) {
            // Read the score first: once the member is removed it can no longer be looked up.
            Double score = redisTemplate.opsForZSet().score(MESSAGE_ZKEY, message);
            if (score == null) {
                // The member disappeared after the range query; formatting null would throw.
                continue;
            }
            // The removal count is the claim: only the caller that deleted the member consumes it.
            Long removed = redisTemplate.opsForZSet().remove(MESSAGE_ZKEY, message);
            if (removed == null || removed == 0L) {
                continue;
            }
            log.info("消费了:" + message + "消费时间为:" + simpleDateFormat.format(score));
        }
    }
/**
 * HTTP GET {@code /zadd}: delegates to {@code messageProducer.zAdd()} and returns no body.
 * NOTE(review): this GET request triggers the producer's side effects - confirm that is intended.
 */
@GetMapping("/zadd")
public void zadd() {
    messageProducer.zAdd();
}

作者:Jeebiz  创建时间:2023-01-31 15:56
最后编辑:Jeebiz  更新时间:2024-08-16 11:44