Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流,而在我们实际使用中,往往是需要定义各种输入输出流。使用方法也很简单。

下面我们以发短信为例,描述使用过程!

消息生产者

自定义消息输出通道 SmsSink

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * SMS消息通道
 */
public interface SmsSink {

    /**
     * 消息生产者的配置
     */
    String OUTPUT = "sms-output";

    @Output(SmsSink.OUTPUT)
    MessageChannel smsOutput();

}

工具类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * 工具类:负责向中间件发送数据
 */
@EnableBinding(SmsSink.class)
@Component
public class SmsMessageSender {

    @Autowired
    @Qualifier(SmsSink.OUTPUT)
    private MessageChannel smsOutput;

    //发送消息
    public void send(Object obj){
        smsOutput.send(MessageBuilder.withPayload(obj).build());
    }

}

yml 配置文件

spring:
  # Rabbitmq 配置
  rabbitmq:
    host: 192.168.31.100
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    publisher-confirm-type: simple
    #启动消息失败返回,比如路由不到队列时触发回调
    publisher-returns: true
    listener:
      simple:
        #NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
        acknowledge-mode: auto
      direct:
        #NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
        acknowledge-mode: auto
  cloud:
    stream:
      # 消息中间件绑定配置:BindingProperties
      bindings:
        sms-output:
          # 指定要使用的 Exchange 名称
          destination: ramq.sms.topic
          #设置消息类型
          content-type: application/json
      binders:
        defaultRabbit:
          type: rabbit

消息的消费者

自定义消息通道 SmsSource

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * SMS消息通道
 */
public interface SmsSource {

    /**
     * 消息消费者的配置
     */
    String INPUT = "sms-input";

    @Input(SmsSource.INPUT)
    SubscribableChannel smsInput();

}

SmsMessageListener

@Component
@EnableBinding(SmsSource.class)
public class SmsMessageListener {

    @StreamListener(SmsSource.INPUT)
    public void input(String message){
        System.out.println("获取到的消息是:"+message);
    }
}

配置文件

spring:
  # Rabbitmq 配置
  rabbitmq:
    host: 192.168.31.100
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    publisher-confirm-type: simple
    #启动消息失败返回,比如路由不到队列时触发回调
    publisher-returns: true
    listener:
      simple:
        #NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
        acknowledge-mode: auto
      direct:
        #NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
        acknowledge-mode: auto
  cloud:
    stream:
      # 消息中间件绑定配置:BindingProperties
      bindings:
        sms-input:
          # 指定要使用的 Exchange 名称
          destination: ramq.sms.topic
          #设置消息类型
          content-type: application/json
      binders:
        defaultRabbit:
          type: rabbit
作者:Jeebiz  创建时间:2023-03-30 11:50
最后编辑:Jeebiz  更新时间:2024-11-01 10:06