消息可以重复消费,多个消费者订阅同一频道即可
一个消费者根据匹配规则订阅多个频道
消费者只能消费订阅之后发布的消息,这意味着,消费者下线再上线这期间发布的消息将会丢失
数据不具有持久化。同样Redis宕机也会数据丢失
消息发布后,是推送到一个缓冲区(内存),消费者从缓冲区拉取消息,当消息堆积,缓冲区溢出,消费者就会被迫下线,同时释放对应的缓冲区;
RedisConfig 中添加监听器
    /**
     * redis消息监听器容器
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅频道,通配符*表示任意多个占位符
        container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
        return container;
    }
订阅者
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
public class MySubscribe implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println("订阅频道:" + new String(message.getChannel()));
        System.out.println("接收数据:" + new String(message.getBody()));
    }
}消息发布
@GetMapping("/publish")
public void publish() {
    redisTemplate.convertAndSend("channel_first", "hello world");
}
另一种发布方式
    /**
     * redis消息监听器容器
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅频道,通配符*表示任意多个占位符
        container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
        // 通配符?:表示一个占位符
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");
        listenerAdapter.afterPropertiesSet();
        container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));
        return container;
    }
public class MySubscribe2 {
    public void getMessage(Object message, String channel) {
        System.out.println("订阅频道2:" + channel);
        System.out.println("接收数据2:" + message);
    }
}
    @GetMapping("/publish2")
    public void publish2() {
        redisTemplate.convertAndSend("channel2", "hello world");
    }
消息是实体对象,进行转换
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
    private static final long serialVersionUID = 5250232737975907491L;
    private Integer id;
    private String username;
}
public class MySubscribe3 implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] bytes) {
        Jackson2JsonRedisSerializer<User> jacksonSerializer = new Jackson2JsonRedisSerializer<>(User.class);
        jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper);
        User user = jacksonSerializer.deserialize(message.getBody());
        System.out.println("订阅频道3:" + new String(message.getChannel()));
        System.out.println("接收数据3:" + user);
    }
}
    /**
     * redis消息监听器容器
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅频道,通配符*:表示任意多个占位符
        container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
        // 通配符?:表示一个占位符
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");
        listenerAdapter.afterPropertiesSet();
        container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));
        container.addMessageListener(new MySubscribe3(), new PatternTopic("user"));
        return container;
    }
    /**
     * redis消息监听器容器
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅频道,通配符*:表示任意多个占位符
        container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
        // 通配符?:表示一个占位符
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");
        listenerAdapter.afterPropertiesSet();
        container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));
        container.addMessageListener(new MySubscribe3(), new PatternTopic("user"));
        return container;
    }
   @GetMapping("/publish3")
    public void publish3() {
        User user = User.builder().id(1).username("yzm").build();
        redisTemplate.convertAndSend("user", user);
    }
作者:Jeebiz  创建时间:2023-01-31 15:51
最后编辑:Jeebiz 更新时间:2025-08-04 17:42
最后编辑:Jeebiz 更新时间:2025-08-04 17:42