测试消息收发

工具测试消息收发

在进行工具测试消息收发之前,我们需要告诉客户端NameServer的地址,RocketMQ有多种方式在客户端中设置NameServer地址,这里我们利用环境变量NAMESRV_ADDR

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...

SDK测试消息收发

工具测试完成后,我们可以尝试使用 SDK 收发消息。这里以 Java SDK 为例介绍一下消息收发过程,可以从 rocketmq-clients 中参阅更多细节。

在IDEA中创建一个Java工程。

在 pom.xml 文件中添加以下依赖引入Java依赖库,将 rocketmq-client-java-version 替换成 最新的版本.

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client-java</artifactId>
   <version>${rocketmq-client-java-version}</version>
</dependency>

通过mqadmin创建 Topic。

$ sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster

在已创建的Java工程中,创建发送普通消息程序并运行,示例代码如下:

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.time.Duration;
import java.util.List;
public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        //接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoint = "localhost:8081";
        //消息发送的目标Topic名称,需要提前创建。
        String topic = "TestTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        //初始化Producer时需要设置通信配置以及预绑定的Topic。
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        //普通消息发送。
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                //设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                //设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                //消息体。
                .setBody("messageBody".getBytes())
                .build();
        try {
            //发送消息,需要关注发送结果,并捕获失败等异常。
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}

在已创建的Java工程中,创建订阅普通消息程序并运行。Apache RocketMQ 支持SimpleConsumer和PushConsumer两种消费者类型,您可以选择以下任意一种方式订阅消息。

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;

import java.io.IOException;
import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        //接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoints = "localhost:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        //订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        //为消费者指定所属的消费者分组,Group需要提前创建。
        String consumerGroup = "Your ConsumerGroup";
        //指定需要订阅哪个目标Topic,Topic需要提前创建。
        String topic = "TestTopic";
        //初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                //设置消费者分组。
                .setConsumerGroup(consumerGroup)
                //设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                //设置消费监听器。
                .setMessageListener(messageView -> {
                    //处理消息并返回消费结果。
                    // LOGGER.info("Consume message={}", messageView);
                    System.out.println("Consume message!!");
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        //如果不需要再使用PushConsumer,可关闭该进程。
        //pushConsumer.close();
    }
}
作者:Jeebiz  创建时间:2023-01-30 11:20
最后编辑:Jeebiz  更新时间:2024-11-01 10:06