Java RocketMQ2:生产者、消费者

1969-任同学

发表文章数:77

热门标签

,
首页 » 算法 » 正文

1. 创建工程

RocketMQ有四个角色,分别是Producer、Consumer、Broker和NameServer。 它们各自的作用如下:

Producer:消息生产者,负责消息的生产和发送。

Consumer:消息消费者,负责消息的接收和使用。

Broker:用于接收生产者发送消息,或者消费者消费消息的请求,负责消息的传输和临时存储。

NameServer:提供服务注册、服务剔除、服务发现等功能,负责协调整个消息队列,维护配置信息和状态信息。

Java RocketMQ2:生产者、消费者

创建一个Spring Boot工程,在pom.xml中添加配置:


    org.apache.rocketmq
    rocketmq-client
    4.4.0

2. 生产者

生产者将数据写入消息队列。创建一个Producer类:

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

public class Producer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        //1. 创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
        //2. 指定Nameserver地址
        producer.setNamesrvAddr("192.168.195.128:9876");
        //3. 启动producer
        producer.start();
        //4. 创建消息
        //参数1:主题 参数2:用于消息过滤 参数3:消息的唯一值 参数4:消息信息,需要转编码格式
        Message msg = new Message("TopicDemo",
                "Tag1",
                "Key1",
                "HelloWorld".getBytes(RemotingHelper.DEFAULT_CHARSET));
        //5. 发送消息
        SendResult result = producer.send(msg);
        System.out.println(result);
    }
}

输出:

SendResult [sendStatus=SEND_OK, msgId=C0A8017A590818B4AAC2509F26690000, offsetMsgId=C0A8C38000002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicDemo, brokerName=rocketmq-nameserver1, queueId=0], queueOffset=0]

在控制台message中可以看到这条消息。

3. 消费者

消费者负责读取已发送的数据。创建一个Consumer类:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1. 创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_producer_group");
        //2. 指定Nameserver地址
        consumer.setNamesrvAddr("192.168.195.128:9876");
        //设置消息拉取最大数
        consumer.setConsumeMessageBatchMaxSize(2);
        //3. 订阅主题Topic和Tag
        consumer.subscribe("TopicDemo", //要消费的主题
                "*"); //过滤规则
        //4. 创建消息监听
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                //迭代消息信息
                for (MessageExt msg : msgs) {
                    //获取主题
                    String topic = msg.getTopic();
                    //获取标签
                    String tags = msg.getTags();
                    //获取信息
                    byte[] body = msg.getBody();
                    try {
                        String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("Consumer消费信息——Topic: " + topic + ", Tags: " + tags + ", Result: " + result);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                //消息消费完成
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
    }
}

输出:

Consumer消费信息——Topic: TopicDemo, Tags: Tag1, Result: HelloWorld

未经允许不得转载:作者:1969-任同学, 转载或复制请以 超链接形式 并注明出处 拜师资源博客
原文地址:《Java RocketMQ2:生产者、消费者》 发布于2021-09-18

分享到:
赞(0) 打赏

评论 抢沙发

评论前必须登录!

  注册



长按图片转发给朋友

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

Vieu3.3主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录