https://blog.csdn.net/hhl18730252820/article/details/114826366
https://blog.csdn.net/qq_41466892/article/details/121334650
Spring Data Redis Stream的使用
1、引入jar包
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>
</dependencies>
2、配置RedisTemplate依赖
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
// 这个地方不可使用 json 序列化,如果使用的是ObjectRecord传输对象时,可能会有问题,会出现一个 java.lang.IllegalArgumentException: Value must not be null! 错误
redisTemplate.setHashValueSerializer(RedisSerializer.string());
return redisTemplate;
}
}
注意:
此处需要注意 setHashValueSerializer 的序列化的方式,具体注意事项后期再说。
3、准备一个实体对象
这个实体对象是需要发送到Stream中的对象。
@Getter
@Setter
@ToString
public class Book {
private String title;
private String author;
public static Book create() {
com.github.javafaker.Book fakerBook = Faker.instance().book();
Book book = new Book();
book.setTitle(fakerBook.title());
book.setAuthor(fakerBook.author());
return book;
}
}
每次调用create方法时,会自动产生一个Book的对象,对象模拟数据是使用javafaker来模拟生成的。
4、编写一个常量类,配置Stream的名称
/**
* 常量
*
*/
public class Cosntants {
public static final String STREAM_KEY_001 = "stream-001";
}
5、编写一个生产者,向Stream中生产数据
5.1、编写一个生产者,向Stream中产生ObjectRecord类型的数据
/**
* 消息生产者
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class StreamProducer {
private final RedisTemplate<String, Object> redisTemplate;
public void sendRecord(String streamKey) {
Book book = Book.create();
log.info("产生一本书的信息:[{}]", book);
ObjectRecord<String, Book> record = StreamRecords.newRecord()
.in(streamKey)
.ofObject(book)
.withId(RecordId.autoGenerate());
RecordId recordId = redisTemplate.opsForStream()
.add(record);
log.info("返回的record-id:[{}]", recordId);
}
}
5.2、每隔5s就生产一个数据到Stream中
/**
* 周期性的向流中产生消息
*/
@Component
@AllArgsConstructor
public class CycleGeneratorStreamMessageRunner implements ApplicationRunner {
private final StreamProducer streamProducer;
@Override
public void run(ApplicationArguments args) {
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(() -> streamProducer.sendRecord(STREAM_KEY_001),
0, 5, TimeUnit.SECONDS);
}
}
三、独立消费
独立消费指的是脱离消费组的直接消费Stream中的消息,是使用 xread方法读取流中的数据,流中的数据在读取后并不会被删除,还是存在的。如果多个程序同时使用xread读取,都是可以读取到消息的。
1、实现从头开始消费-xread实现
此处实现的是从Stream的第一个消息开始消费
import com.huan.study.redis.constan.Cosntants;
import com.huan.study.redis.entity.Book;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 脱离消费组-直接消费Stream中的数据,可以获取到Stream中所有的消息
*/
@Component
@Slf4j
public class XreadNonBlockConsumer01 implements InitializingBean, DisposableBean {
private ThreadPoolExecutor threadPoolExecutor;
@Resource
private RedisTemplate<String, Object> redisTemplate;
private volatile boolean stop = false;
@Override
public void afterPropertiesSet() {
// 初始化线程池
threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("xread-nonblock-01");
return thread;
});
StreamReadOptions streamReadOptions = StreamReadOptions.empty()
// 如果没有数据,则阻塞1s 阻塞时间需要小于`spring.redis.timeout`配置的时间
.block(Duration.ofMillis(1000))
// 一直阻塞直到获取数据,可能会报超时异常
// .block(Duration.ofMillis(0))
// 1次获取10个数据
.count(10);
StringBuilder readOffset = new StringBuilder("0-0");
threadPoolExecutor.execute(() -> {
while (!stop) {
// 使用xread读取数据时,需要记录下最后一次读取到offset,然后当作下次读取的offset,否则读取出来的数据会有问题
List<ObjectRecord<String, Book>> objectRecords = redisTemplate.opsForStream()
.read(Book.class, streamReadOptions, StreamOffset.create(Cosntants.STREAM_KEY_001, ReadOffset.from(readOffset.toString())));
if (CollectionUtils.isEmpty(objectRecords)) {
log.warn("没有获取到数据");
continue;
}
for (ObjectRecord<String, Book> objectRecord : objectRecords) {
log.info("获取到的数据信息 id:[{}] book:[{}]", objectRecord.getId(), objectRecord.getValue());
readOffset.setLength(0);
readOffset.append(objectRecord.getId());
}
}
});
}
@Override
public void destroy() throws Exception {
stop = true;
threadPoolExecutor.shutdown();
threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
}
}
注意:
下一次读取数据时,offset 是上一次最后获取到的id的值,否则可能会出现漏数据。
2、StreamMessageListenerContainer实现独立消费
见下方的消费组消费的代码
四、消费组消费
1、实现StreamListener接口
实现这个接口的目的是为了,消费Stream中的数据。需要注意在注册时使用的是
streamMessageListenerContainer.receiveAutoAck()还是streamMessageListenerContainer.receive()方法,如果是第二个,则需要手动ack,手动ack的代码:redisTemplate.opsForStream().acknowledge(“key”,”group”,”recordId”);
/**
* 通过监听器异步消费
*
*/
@Slf4j
@Getter
@Setter
public class AsyncConsumeStreamListener implements StreamListener<String, ObjectRecord<String, Book>> {
/**
* 消费者类型:独立消费、消费组消费
*/
private String consumerType;
/**
* 消费组
*/
private String group;
/**
* 消费组中的某个消费者
*/
private String consumerName;
public AsyncConsumeStreamListener(String consumerType, String group, String consumerName) {
this.consumerType = consumerType;
this.group = group;
this.consumerName = consumerName;
}
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(ObjectRecord<String, Book> message) {
String stream = message.getStream();
RecordId id = message.getId();
Book value = message.getValue();
if (StringUtils.isBlank(group)) {
log.info("[{}]: 接收到一个消息 stream:[{}],id:[{}],value:[{}]", consumerType, stream, id, value);
} else {
log.info("[{}] group:[{}] consumerName:[{}] 接收到一个消息 stream:[{}],id:[{}],value:[{}]", consumerType,
group, consumerName, stream, id, value);
}
// 当是消费组消费时,如果不是自动ack,则需要在这个地方手动ack
// redisTemplate.opsForStream()
// .acknowledge("key","group","recordId");
}
}
2、获取消费或消费消息过程中错误的处理
/**
* StreamPollTask 获取消息或对应的listener消费消息过程中发生了异常
* */
@Slf4j
public class CustomErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
log.error("发生了异常", t);
}
}
3、消费组配置
/**
* redis stream 消费组配置
*/
@Configuration
public class RedisStreamConfiguration {
@Resource
private RedisConnectionFactory redisConnectionFactory;
/**
* 可以同时支持 独立消费 和 消费者组 消费
* <p>
* 可以支持动态的 增加和删除 消费者
* <p>
* 消费组需要预先创建出来
*
* @return StreamMessageListenerContainer
*/
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer() {
AtomicInteger index = new AtomicInteger(1);
int processors = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), r -> {
Thread thread = new Thread(r);
thread.setName("async-stream-consumer-" + index.getAndIncrement());
thread.setDaemon(true);
return thread;
});
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Book>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 一次最多获取多少条消息
.batchSize(10)
// 运行 Stream 的 poll task
.executor(executor)
// 可以理解为 Stream Key 的序列化方式
.keySerializer(RedisSerializer.string())
// 可以理解为 Stream 后方的字段的 key 的序列化方式
.hashKeySerializer(RedisSerializer.string())
// 可以理解为 Stream 后方的字段的 value 的序列化方式
.hashValueSerializer(RedisSerializer.string())
// Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小
.pollTimeout(Duration.ofSeconds(1))
// ObjectRecord 时,将 对象的 filed 和 value 转换成一个 Map 比如:将Book对象转换成map
.objectMapper(new ObjectHashMapper())
// 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理
.errorHandler(new CustomErrorHandler())
// 将发送到Stream中的Record转换成ObjectRecord,转换成具体的类型是这个地方指定的类型
.targetType(Book.class)
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer =
StreamMessageListenerContainer.create(redisConnectionFactory, options);
// 独立消费
String streamKey = Cosntants.STREAM_KEY_001;
streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
new AsyncConsumeStreamListener("独立消费", null, null));
// 消费组A,不自动ack
// 从消费组中没有分配给消费者的消息开始消费
streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-a"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消费组消费", "group-a", "consumer-a"));
// 从消费组中没有分配给消费者的消息开始消费
streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消费组消费A", "group-a", "consumer-b"));
// 消费组B,自动ack
streamMessageListenerContainer.receiveAutoAck(Consumer.from("group-b", "consumer-a"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消费组消费B", "group-b", "consumer-bb"));
// 如果需要对某个消费者进行个性化配置在调用register方法的时候传递`StreamReadRequest`对象
return streamMessageListenerContainer;
}
}
注意:
提前建立好消费组
127.0.0.1:6379> xgroup create stream-001 group-a $
OK
127.0.0.1:6379> xgroup create stream-001 group-b $
3.1、独有消费配置
streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey), new AsyncConsumeStreamListener("独立消费", null, null));
不传递Consumer即可。
3.2、配置消费组-不自动ack消息
streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消费组消费A", "group-a", "consumer-b"));
- 1、需要注意ReadOffset的取值。
- 2、需要注意group需要提前创建好。
3.3、配置消费组-自动ack消息
streamMessageListenerContainer.receiveAutoAck()
五、序列化策略
Stream Property | Serializer | Description |
---|---|---|
key | keySerializer | used for Record#getStream() |
field | hashKeySerializer | used for each map key in the payload |
value | hashValueSerializer | used for each map value in the payload |
六、ReadOffset策略
消费消息时的Read Offset 策略
Read offset | Standalone | Consumer Group |
---|---|---|
Latest | Read latest message(读取最新的消息) | Read latest message(读取最新的消息) |
Specific Message Id | Use last seen message as the next MessageId | |
(读取大于指定的消息id的消息) | Use last seen message as the next MessageId | |
(读取大于指定的消息id的消息) | ||
Last Consumed | Use last seen message as the next MessageId | |
(读取大于指定的消息id的消息) | Last consumed message as per consumer group | |
(读取还未分配给消费组中的消费组的消息) |
————————————————
版权声明:本文为CSDN博主「普通网友」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_41466892/article/details/121334650
最后编辑:Jeebiz 更新时间:2024-08-16 11:44