Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是从队列中发送消息的。)通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。
消息生产者
- 1.引入依赖
- 2.配置application.yml文件
- 3.发送消息的话,定义一个通道接口,通过接口中内置的messagechannel springcloudstream中内置接口 Source
- 4.@EnableBinding : 绑定对应通道
- 5.发送消息的话,通过MessageChannel发送消息
- 如果需要MessageChannel –> 通过绑定的内置接口获取
1、pom.xml 引入依赖
<dependencies>
<!-- For Spring Cloud Stream With RabbitMQ -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
2、配置文件
配置 RabbitMQ 的安装地址、端口以及账户信息
spring:
# Rabbitmq 配置
rabbitmq:
host: 192.168.31.100
port: 5672
username: admin
password: admin
virtual-host: /
publisher-confirm-type: simple
#启动消息失败返回,比如路由不到队列时触发回调
publisher-returns: true
listener:
simple:
#NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
acknowledge-mode: auto
direct:
#NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
acknowledge-mode: auto
cloud:
stream:
bindings:
output:
#指定消息发送的目的地,在rabbitmq中,发送到一个rbt-default的exchange中
destination: rbt-default
myoutput:
destination: rbt-custom-output
producer:
#分区关键字 对象中的id,对象
partition-key-expression: payload
#分区大小
partition-count: 2
binders:
#配置绑定器
defaultRabbit:
type: rabbit
- contentType:用于指定消息的类型。具体可以参考 spring cloud stream docs
- destination:指定了消息发送的目的地,对应 RabbitMQ,会发送到 exchange 是 itcastdefault 的所有消息队列中。
3、测试发送消息
/**
* 入门案例:
* 1.引入依赖
* 2.配置application.yml文件
* 3.发送消息的话,定义一个通道接口,通过接口中内置的messagechannel
* springcloudstream中内置接口 Source
* 4.@EnableBinding : 绑定对应通道
* 5.发送消息的话,通过MessageChannel发送消息
* * 如果需要MessageChannel --> 通过绑定的内置接口获取
*/
@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplication implements CommandLineRunner{
@Autowired
private MessageChannel output;
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class);
}
@Override
public void run(String... args) throws Exception {
//发送消息
//通过工具类messageBuilder:创建消息
output.send(MessageBuilder.withPayload("hello老六").build());
}
}
运行启动类:成功效果如下:
消息消费者
- 1.引入依赖
- 2.配置application.yml文件
- 3.需要配置一个通道的接口
- 内置获取消息的通道接口 sink
- 4.绑定通道
- 5.配置一个监听的方法:当程序从中间件获取数据后,需要在监听方法上配置一个StreamListener
1、pom.xml 引入依赖
<dependencies>
<!-- For Spring Cloud Stream With RabbitMQ -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
2、配置文件
配置 RabbitMQ 的安装地址、端口以及账户信息
spring:
# Rabbitmq 配置
rabbitmq:
host: 192.168.31.100
port: 5672
username: admin
password: admin
virtual-host: /
publisher-confirm-type: simple
#启动消息失败返回,比如路由不到队列时触发回调
publisher-returns: true
listener:
simple:
#NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
acknowledge-mode: auto
direct:
#NONE:自动确认;AUTO:根据情况确认;MANUAL:手动确认
acknowledge-mode: auto
cloud:
stream:
#instanceCount: 2 #消费者总数
#instanceIndex: 0 #当前消费者的索引
bindings:
input:
#内置的获取消息的通道 , 从rbt-default中获取消息
destination: rbt-default
binders:
#配置绑定器
defaultRabbit:
type: rabbit
- contentType:用于指定消息的类型。具体可以参考 spring cloud stream docs
- destination:指定了消息发送的目的地,对应 RabbitMQ,会发送到 exchange 是 itcastdefault 的所有消息队列中。
3、测试接收消息
/**
* 入门案例:
* 1.引入依赖
* 2.配置application.yml文件
* 3.需要配置一个通道的接口
* 内置获取消息的通道接口 sink
* 4.绑定通道
* 5.配置一个监听的方法:当程序从中间件获取数据后,窒息感的业务逻辑方法
* 需要在监听方法上配置一个StreamListener
*/
@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplication{
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class);
}
@StreamListener(Sink.INPUT)
public void input(String message){
System.out.println("获取到的消息是:"+message);
}
}
参考资料
https://www.cnblogs.com/coderArron/p/16207926.html#Spring_Cloud_Stream__7
https://blog.csdn.net/weixin_43638238/article/details/126188294
作者:Jeebiz 创建时间:2023-03-30 10:35
最后编辑:Jeebiz 更新时间:2024-11-01 10:06
最后编辑:Jeebiz 更新时间:2024-11-01 10:06