生产消息,score = 时间戳 + 60s 内的随机数
/** Redis key of the sorted set that the producer adds messages to. */
public static final String MESSAGE_ZKEY = "message:ZSetqueue";
/**
 * Sequence counter: each producer thread takes the next value and embeds it in its message.
 * NOTE(review): AtomicInteger already gives atomic, visible updates; {@code volatile} only
 * affects reassignment of the reference itself — confirm whether this field could be final.
 */
public volatile AtomicInteger count = new AtomicInteger();
/**
 * Starts ten producer threads. Each thread takes the next value of {@code count}, logs it,
 * and adds one member to the sorted set stored under {@code MESSAGE_ZKEY}. The member's
 * score is the current epoch time in milliseconds plus a random offset in [0, 60000) ms.
 * The threads are started and not joined, so this method returns without waiting for them.
 */
public void zAdd() {
    Runnable producer = () -> {
        int sequence = count.getAndIncrement();
        String threadName = Thread.currentThread().getName();
        log.info(threadName + ":put message to zset = " + sequence);
        // Read the clock first, then draw the random delay (up to one minute).
        long now = System.currentTimeMillis();
        double dueAt = now + new Random().nextInt(60 * 1000);
        String payload = Thread.currentThread().getName() + " hello zset:" + sequence;
        redisTemplate.opsForZSet().add(MESSAGE_ZKEY, payload, dueAt);
    };
    int started = 0;
    while (started < 10) {
        new Thread(producer).start();
        started++;
    }
}
消费者:定时任务,每秒执行一次
/** Redis key of the sorted set that the consumer polls. */
public static final String MESSAGE_ZKEY = "message:ZSetqueue";
/**
 * Formatter (default pattern and locale) used to render a message's score as a date in the log.
 * NOTE(review): SimpleDateFormat is not thread-safe; this is fine only while a single thread
 * uses it — confirm nothing else touches this public field concurrently.
 */
public SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
/**
 * Scheduled consumer of the delay queue: first run 5 seconds after startup, then at a fixed
 * rate of once per second.
 *
 * <p>Each run reads the members of the sorted set {@code MESSAGE_ZKEY} whose score lies between
 * 0 and the current epoch milliseconds (the messages that are due), removes each one and logs
 * it together with its score formatted as a date.
 *
 * <p>A message is only logged as consumed when this call actually removed it. If the member
 * has already disappeared (its score is {@code null}, or the remove count is not positive)
 * it is skipped, instead of failing on {@code format(null)} or reporting it as consumed.
 */
@Scheduled(initialDelay = 5 * 1000, fixedRate = 1000)
public void zrangebysocre() {
    log.info("延时队列消费。。。");
    // Fetch the messages whose score is no greater than the current timestamp.
    Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(MESSAGE_ZKEY, 0, System.currentTimeMillis());
    if (messages == null) {
        return;
    }
    for (Object message : messages) {
        // The score has to be read before removal; afterwards it no longer exists.
        Double score = redisTemplate.opsForZSet().score(MESSAGE_ZKEY, message);
        if (score == null) {
            // Member vanished between the range query and now; nothing to consume.
            continue;
        }
        // The remove count decides ownership: only a positive count means we took the message.
        Long removed = redisTemplate.opsForZSet().remove(MESSAGE_ZKEY, message);
        if (removed == null || removed <= 0) {
            continue;
        }
        log.info("消费了:" + message + "消费时间为:" + simpleDateFormat.format(score));
    }
}
/**
 * GET endpoint mapped to {@code /zadd}: triggers message production by delegating to
 * {@code messageProducer.zAdd()}. Returns no response body.
 */
@GetMapping("/zadd")
public void zadd() {
messageProducer.zAdd();
}
作者:Jeebiz 创建时间:2023-01-31 15:56
最后编辑:Jeebiz 更新时间:2024-08-16 11:44
最后编辑:Jeebiz 更新时间:2024-08-16 11:44