消息可以重复消费,多个消费者订阅同一频道即可
一个消费者根据匹配规则订阅多个频道
消费者只能消费订阅之后发布的消息,这意味着,消费者下线再上线这期间发布的消息将会丢失
数据不具有持久化。同样Redis宕机也会数据丢失
消息发布后,是推送到一个缓冲区(内存),消费者从缓冲区拉取消息,当消息堆积,缓冲区溢出,消费者就会被迫下线,同时释放对应的缓冲区;
RedisConfig 中添加监听器
/**
* redis消息监听器容器
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅频道,通配符*表示任意多个占位符
container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
return container;
}
订阅者
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
public class MySubscribe implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
System.out.println("订阅频道:" + new String(message.getChannel()));
System.out.println("接收数据:" + new String(message.getBody()));
}
}
消息发布
@GetMapping("/publish")
public void publish() {
redisTemplate.convertAndSend("channel_first", "hello world");
}
另一种发布方式
/**
* redis消息监听器容器
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅频道,通配符*表示任意多个占位符
container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
// 通配符?:表示一个占位符
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");
listenerAdapter.afterPropertiesSet();
container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));
return container;
}
public class MySubscribe2 {
public void getMessage(Object message, String channel) {
System.out.println("订阅频道2:" + channel);
System.out.println("接收数据2:" + message);
}
}
@GetMapping("/publish2")
public void publish2() {
redisTemplate.convertAndSend("channel2", "hello world");
}
消息是实体对象,进行转换
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
private static final long serialVersionUID = 5250232737975907491L;
private Integer id;
private String username;
}
public class MySubscribe3 implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
Jackson2JsonRedisSerializer<User> jacksonSerializer = new Jackson2JsonRedisSerializer<>(User.class);
jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper);
User user = jacksonSerializer.deserialize(message.getBody());
System.out.println("订阅频道3:" + new String(message.getChannel()));
System.out.println("接收数据3:" + user);
}
}
/**
* redis消息监听器容器
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅频道,通配符*:表示任意多个占位符
container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
// 通配符?:表示一个占位符
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");
listenerAdapter.afterPropertiesSet();
container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));
container.addMessageListener(new MySubscribe3(), new PatternTopic("user"));
return container;
}
/**
* redis消息监听器容器
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅频道,通配符*:表示任意多个占位符
container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
// 通配符?:表示一个占位符
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");
listenerAdapter.afterPropertiesSet();
container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));
container.addMessageListener(new MySubscribe3(), new PatternTopic("user"));
return container;
}
@GetMapping("/publish3")
public void publish3() {
User user = User.builder().id(1).username("yzm").build();
redisTemplate.convertAndSend("user", user);
}
作者:Jeebiz 创建时间:2023-01-31 15:51
最后编辑:Jeebiz 更新时间:2024-08-16 11:44
最后编辑:Jeebiz 更新时间:2024-08-16 11:44