在 Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部和尾部添加新的元素

优势:

  • 顺序排序,保证先进先出
  • 队列为空时,自动从Redis数据库删除
  • 在队列的两头插入或删除元素,效率极高,即使队列中元素达到百万级
  • List中可以包含的最大元素数量是4294967295

定时器监听队列

生产者

@Slf4j
@Component
public class MessageProducer {

    public static final String MESSAGE_KEY = "message:queue";

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    public void lPush() {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                Long size = redisTemplate.opsForList().leftPush(MESSAGE_KEY, Thread.currentThread().getName() + ":hello world");
                log.info(Thread.currentThread().getName() + ":put message size = " + size);
            }).start();
        }
    }
}

消费者:消费消息,定时器以达到监听队列功能

@Slf4j
@Component
@EnableScheduling
public class MessageConsumer {

    public static final String MESSAGE_KEY = "message:queue";

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    @Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000)
    public void rPop() {
        String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY);
        log.info(message);
    }
}
@RestController
public class RedisController {

    @Autowired
    private MessageProducer messageProducer;

    @GetMapping("/lPush")
    public void lPush() {
        messageProducer.lPush();
    }
}

测试

http://localhost:8080/lPush

可能出现的问题:

  • 1.通过定时器监听List中是否有待处理消息,每执行一次都会发起一次连接,这会造成不必要的浪费。
  • 2.生产速度大于消费速度,队列堆积,消息时效性差,占用内存。

运行即监控队列

修改消息消费者代码。

当队列没有元素时,会阻塞10秒,然后再次监听队列,
需要注意的是,阻塞时间必须小于连接超时时间

@Slf4j
@Component
@EnableScheduling
public class MessageConsumer {

    public static final String MESSAGE_KEY = "message:queue";

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    //@Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000)
    public void rPop() {
        String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY);
        log.info(message);
    }

    @PostConstruct
    public void brPop() {
        new Thread(() -> {
            while (true) {
                String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY, 10, TimeUnit.SECONDS);
                log.info(message);
            }
        }).start();
    }
}

阻塞时间不能为负,直接报错超时为负
阻塞时间为零,此时阻塞时间等于超时时间,最后报错连接超时
阻塞时间大于超时时间,报错连接超时

测试:

  • 消息不可重复消费,因为消息从队列POP之后就被移除了,即不支持多个消费者消费同一批数据
  • 消息丢失,消费期间发生异常,消息未能正常消费
作者:Jeebiz  创建时间:2023-01-31 15:46
最后编辑:Jeebiz  更新时间:2024-08-16 11:14