1、读取消息的超时时间

当我们使用 StreamReadOptions.empty().block(Duration.ofMillis(1000)) 配置阻塞时间时,这个配置的阻塞时间必须要比 spring.redis.timeout 配置的时间短,否则可能会报超时异常。

2、ObjectRecord反序列化错误

如果我们在读取消息时发生如下异常,那么排查思路如下:

    java.lang.IllegalArgumentException: Value must not be null!
        at org.springframework.util.Assert.notNull(Assert.java:201)
        at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
        at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
        at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138)
        at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecords(StreamObjectMapper.java:164)
        at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:594)
        at org.springframework.data.redis.core.StreamOperations.read(StreamOperations.java:413)
        at com.huan.study.redis.stream.consumer.xread.XreadNonBlockConsumer02.lambda$afterPropertiesSet$1(XreadNonBlockConsumer02.java:61)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
  • 1、检测 RedisTemplate的HashValueSerializer的序列化方式,最好不要使用json可以使用RedisSerializer.string()。

  • 2、检查

redisTemplate.opsForStream()中配置的HashMapper,默认是ObjectHashMapper这个是把对象字段和值序列化成byte[]格式。

提供一个可用的配置
# RedisTemplate的hash value 使用string类型的序列化方式
redisTemplate.setHashValueSerializer(RedisSerializer.string());
# 这个方法opsForStream()里面使用默认的ObjectHashMapper
redisTemplate.opsForStream()

3、使用xread顺序读取数据漏数据

如果我们使用xread读取数据发现有写数据漏掉了,这个时候我们需要检查第二次读取时配置的StreamOffset是否合法,这个值需要是上一次读取的最后一个值。

举例说明:

  • 1、SteamOffset传递的是 $ 表示读取最新的一个数据。

  • 2、处理上一步读取到的数据,此时另外的生产者又向Stream中插入了几个数据,这个时候读取到的数据还没有处理完。

  • 3、再次读取Stream中的数据,还是传递的$,那么表示还是读取最新的数据。那么在上一步流入到Stream中的数据,这个消费者就读取不到了,因为它读取的是最新的数据。

4、StreamMessageListenerContainer的使用

  • 1、可以动态的添加和删除消费者

  • 2、可以进行消费组消费

  • 3、可以直接独立消费

  • 4、如果传输ObjectRecord的时候,需要注意一下序列化方式。参考上面的代码。

作者:Jeebiz  创建时间:2023-01-31 19:05
最后编辑:Jeebiz  更新时间:2024-08-16 11:14