rabbitmq_delayed_message_exchange

向 RabbitMQ 添加延迟消息(或计划消息)的插件。

下载插件

下载地址:https://www.rabbitmq.com/community-plugins.html

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

安装插件

与所有第三方插件一样,该.ez文件必须放在节点的插件目录中 ,并且可以被 RabbitMQ 进程的有效用户读取。

要找出插件目录是什么,请使用rabbitmq-plugins directories

[root@rabbitmq-1 src]# rabbitmq-plugins directories -s
Plugin archives directory: /usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@rabbitmq-1-plugins-expand
Enabled plugins file: /etc/rabbitmq/enabled_plugins

将出输出目录:将上述下载的plugin放到 Plugin archives directory

RabbitMQ 3.10.x
mv /usr/local/src/rabbitmq_delayed_message_exchange-3.10.2.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
RabbitMQ 3.11.x
mv /usr/local/src/rabbitmq_delayed_message_exchange-3.11.1.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.11.15/plugins
RabbitMQ 3.12.x
mv /usr/local/src/rabbitmq_delayed_message_exchange-3.12.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.12.12/plugins
启用/停用插件

然后运行以下命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq-plugins disable rabbitmq_delayed_message_exchange

用法

要使用延迟消息传递功能,请声明一个交换类型x-delayed-message:

// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...

请注意,我们在路由x-delayed-type部分下传递了一个名为 的额外标头,更多信息。

一旦我们声明了交换,我们就可以发布消息,提供一个标头,告诉插件延迟我们的消息多长时间:

// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

byte[] messageBodyBytes2 = "more delayed payload".getBytes("UTF-8");
Map<String, Object> headers2 = new HashMap<String, Object>();
headers2.put("x-delay", 1000);
AMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2);
channel.basicPublish("my-exchange", "", props2.build(), messageBodyBytes2);
// ... more code ...

在上面的示例中,我们发布了两条消息,并在消息头中指定了延迟时间x-delay。对于这个例子,插件将首先向我们的队列传递带有正文的消息”more delayed payload”,然后是带有正文的消息”delayed payload”。

如果x-delay标头不存在,则插件将立即继续路由消息。

作者:Jeebiz  创建时间:2023-04-06 19:44
最后编辑:Jeebiz  更新时间:2024-11-01 10:06