rabbitmq_delayed_message_exchange
向 RabbitMQ 添加延迟消息(或计划消息)的插件。
下载插件
下载地址:https://www.rabbitmq.com/community-plugins.html
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
安装插件
与所有第三方插件一样,该.ez
文件必须放在节点的插件目录
中 ,并且可以被 RabbitMQ 进程的有效用户读取。
要找出插件目录是什么,请使用rabbitmq-plugins directories
[root@rabbitmq-1 src]# rabbitmq-plugins directories -s
Plugin archives directory: /usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@rabbitmq-1-plugins-expand
Enabled plugins file: /etc/rabbitmq/enabled_plugins
将出输出目录:将上述下载的plugin放到 Plugin archives directory
下
RabbitMQ 3.10.x
mv /usr/local/src/rabbitmq_delayed_message_exchange-3.10.2.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
RabbitMQ 3.11.x
mv /usr/local/src/rabbitmq_delayed_message_exchange-3.11.1.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.11.15/plugins
RabbitMQ 3.12.x
mv /usr/local/src/rabbitmq_delayed_message_exchange-3.12.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.12.12/plugins
启用/停用插件
然后运行以下命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
用法
要使用延迟消息传递功能,请声明一个交换类型x-delayed-message:
// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...
请注意,我们在路由x-delayed-type
部分下传递了一个名为 的额外标头,更多信息。
一旦我们声明了交换,我们就可以发布消息,提供一个标头,告诉插件延迟我们的消息多长时间:
// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
byte[] messageBodyBytes2 = "more delayed payload".getBytes("UTF-8");
Map<String, Object> headers2 = new HashMap<String, Object>();
headers2.put("x-delay", 1000);
AMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2);
channel.basicPublish("my-exchange", "", props2.build(), messageBodyBytes2);
// ... more code ...
在上面的示例中,我们发布了两条消息,并在消息头中指定了延迟时间x-delay
。对于这个例子,插件将首先向我们的队列传递带有正文的消息”more delayed payload”,然后是带有正文的消息”delayed payload”。
如果x-delay
标头不存在,则插件将立即继续路由消息。
作者:Jeebiz 创建时间:2023-04-06 19:44
最后编辑:Jeebiz 更新时间:2024-11-01 10:06
最后编辑:Jeebiz 更新时间:2024-11-01 10:06