Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流,而在我们实际使用中,往往是需要定义各种输入输出流。使用方法也很简单。
下面我们以发短信为例,描述使用过程!
消息生产者
自定义消息输出通道 SmsSink
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* SMS消息通道
*/
public interface SmsSink {
/**
* 消息生产者的配置
*/
String OUTPUT = "sms-output";
@Output(SmsSink.OUTPUT)
MessageChannel smsOutput();
}
工具类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* 工具类:负责向中间件发送数据
*/
@EnableBinding(SmsSink.class)
@Component
public class SmsMessageSender {
@Autowired
@Qualifier(SmsSink.OUTPUT)
private MessageChannel smsOutput;
//发送消息
public void send(Object obj){
smsOutput.send(MessageBuilder.withPayload(obj).build());
}
}
yml 配置文件
spring:
# Rabbitmq 配置
rabbitmq:
host: 192.168.31.100
port: 5672
username: admin
password: admin
virtual-host: /
publisher-confirm-type: simple
#启动消息失败返回,比如路由不到队列时触发回调
publisher-returns: true
listener:
simple:
#NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
acknowledge-mode: auto
direct:
#NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
acknowledge-mode: auto
cloud:
stream:
# 消息中间件绑定配置:BindingProperties
bindings:
sms-output:
# 指定要使用的 Exchange 名称
destination: ramq.sms.topic
#设置消息类型
content-type: application/json
binders:
defaultRabbit:
type: rabbit
消息的消费者
自定义消息通道 SmsSource
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* SMS消息通道
*/
public interface SmsSource {
/**
* 消息消费者的配置
*/
String INPUT = "sms-input";
@Input(SmsSource.INPUT)
SubscribableChannel smsInput();
}
SmsMessageListener
@Component
@EnableBinding(SmsSource.class)
public class SmsMessageListener {
@StreamListener(SmsSource.INPUT)
public void input(String message){
System.out.println("获取到的消息是:"+message);
}
}
配置文件
spring:
# Rabbitmq 配置
rabbitmq:
host: 192.168.31.100
port: 5672
username: admin
password: admin
virtual-host: /
publisher-confirm-type: simple
#启动消息失败返回,比如路由不到队列时触发回调
publisher-returns: true
listener:
simple:
#NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
acknowledge-mode: auto
direct:
#NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
acknowledge-mode: auto
cloud:
stream:
# 消息中间件绑定配置:BindingProperties
bindings:
sms-input:
# 指定要使用的 Exchange 名称
destination: ramq.sms.topic
#设置消息类型
content-type: application/json
binders:
defaultRabbit:
type: rabbit
作者:Jeebiz 创建时间:2023-03-30 11:50
最后编辑:Jeebiz 更新时间:2024-11-01 10:06
最后编辑:Jeebiz 更新时间:2024-11-01 10:06