您好,欢迎来到站长目录(28sn.com)!


RocketMQ的发送模式和消费模式

来源:网络整理 浏览:183次 时间:2021-09-20
前言

小伙伴们大家好啊,王子又来和大家一起闲谈MQ技术了。

通过之前文章的学习,我们已经对RocketMQ的基本架构有了初步的了解,那今天王子就和大家一起来点实际的,用代码和大家一起看看RocketMQ的几种发送模式和消费模式。好了,让我们开始吧。

 RocketMQ的环境准备

本次我们的环境搭建是基于docker的,需要小伙伴们准备一台CentOS7的虚拟机(虚拟机的安装这里就不做介绍了)。

王子使用的是安装在VMware中的CentOS7虚拟机系统。正式安装环境之前,为了测试方便,我们关闭了centos系统的防火墙。然后我们需要在这台虚拟机中安装docke和docker-compose,具体的安装步骤大家可以参考https://www.runoob.com/docker/centos-docker-install.html,一些常用的docker命令也可以在这个网站里查看,还是很方便的,王子在这里分享给大家。

安装过docker和docker-compose后,我们就要开始安装RocketMQ的环境了,具体的安装步骤可以参考https://gitee.com/lm970585581/docker-rocketmq中的一键部署docker-compose部分。

步骤都很简单,基本上算是一键式傻瓜安装了,但是,王子在安装的过程中却被一个问题困扰住了很久。

下边给大家说一下遇到的问题和解决的办法,这是重点哦。

王子按照步骤一步一步的执行,到最后一步执行start.sh的时候悲剧发生了。

如果一切正常,执行了这一步骤后,docker中一共会启动三个容器,分别是nameserver、broker、和rocketMQ的管控台,但是王子执行docker ps查看正在运行的容器的时候发现,居然只有两个容器在运行,broker不见了。

于是打开localhost:8180(管控台地址),发现管控台中是空的。

于是我们开始查看broker容器的启动日志,使用的命令是:docker logs -f -t --tail 行数 容器名 

发现的报错信息是 java.io.FileNotFoundException: /etc/rocketmq/broker.conf (Permission denied)

到这里真实丈二的和尚摸不着头脑,一脸懵逼有木有。

进过一番努力的查找,终于找到了原因,由于centos7中安全模块selinux把权限禁掉了,所以导致不能读取broker.conf文件,具体这个安全模块是做什么的王子也没有仔细研究,我们目前是用不到,于是在linux中使用setenforce 0命令把它关闭了。

再重新启动broker容器,发现问题解决了。

好了环境我们部署完成之后,进入我们今天的主要内容,RocketMQ的发送模式和消费模式有哪几种。

 

RocketMQ的发送

 

同步发送

我们先来看一段生产者的代码:

import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.remoting.common.RemotingHelper;public class RocketMQProducer {    // RocketMQ的生产者类    private static DefaultMQProducer producer;    static {        // 构建生产者对象,指定生产组        producer=new DefaultMQProducer("test_group");        // 设置NameServer的地址,拉取路由信息        producer.setNamesrvAddr("192.168.220.110:9876");        try {            // 启动生产者            producer.start();        } catch (MQClientException e) {            e.printStackTrace();        }    }    public static void send(String topic,String message) throws Exception {        // 构建消息对象        Message msg=new Message(topic,                "",//这里存放的Tag 我们之后会讲解                message.getBytes(RemotingHelper.DEFAULT_CHARSET));        SendResult send = producer.send(msg);        System.out.println(send);    }    public static void main(String[] args) {        try {            send("test","hello world!");        } catch (Exception e) {            e.printStackTrace();        }    }}

上面的代码片段就是生产者发送消息到RocketMQ里去的代码,其实这种方式就是所谓的同步发送消息到MQ

那么什么叫同步发送消息到MQ里去?

所谓同步,意思就是你通过这行代码发送消息到MQ去,SendResult sendResult = producer.send(msg),然后你会卡在这里,代码不能往下走了

你要一直等待MQ返回一个结果给你,你拿到了结果之后,你的程序才会继续向下运行。

这个就是所谓的同步发送模式。

 

异步发送

接着我们来看一下异步发送的代码:

import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendCallback;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.remoting.common.RemotingHelper;public class RocketMQProducer {    // RocketMQ的生产者类    private static DefaultMQProducer producer;    static {        // 构建生产者对象,指定生产组        producer=new DefaultMQProducer("test_group");        // 设置NameServer的地址,拉取路由信息        producer.setNamesrvAddr("192.168.220.110:9876");        try {            // 启动生产者            producer.start();        } catch (MQClientException e) {            e.printStackTrace();        }        // 设置异步发送失败的时候不重试        producer.setRetryTimesWhenSendAsyncFailed(0);    }    public static void send(String topic,String message) throws Exception {        // 构建消息对象        Message msg=new Message(topic,                "",//这里存放的Tag 我们之后会讲解                message.getBytes(RemotingHelper.DEFAULT_CHARSET));        producer.send(msg, new SendCallback() {            public void onSuccess(SendResult sendResult) {                System.out.println(sendResult);            }            public void onException(Throwable throwable) {                System.out.println(throwable);            }        });    }    public static void main(String[] args) {        try {            send("test","hello world!");        } catch (Exception e) {            e.printStackTrace();        }    }}

意思就是消息发送后,代码继续向下运行,等到mq返回结果的时候,如果返回成功,就会调用回调函数onSuccess方法,返回失败就会调用onException方法。

这就是异步发送,它的特点就是不会阻塞程序,消息返回结果后再调用回调函数。

 

单向发送

还有一种发送方式,叫做单向发送,那么什么是单向发送呢?

代码如下:

producer.sendOneway(msg);

它的意思就是生产者发送消息给MQ,发送后程序继续向下运行,不会阻塞,而且不会再管MQ的返回值。

也就是说发送过后就不关它的事了。

RocketMQ的发送方式就介绍到这里,关于具体的使用场景我们之后的文章再讨论,现在只要清楚有这些方式就可以了。

 

RocketMQ的消费

Push消费

我们来看一下push消费的代码

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException;import java.util.List;import static com.alibaba.rocketmq.remoting.common.RemotingHelper.DEFAULT_CHARSET;public class RocketMQConsumer {    public static void main(String[] args) throws MQClientException {        // 创建push消费者实例,指定消费者组        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_group");        // 设置NameServer的地址,拉取路由信息        consumer.setNamesrvAddr("192.168.220.110:9876");        // 订阅test Topic , 第二个参数是Tag        consumer.subscribe("test",null);        // 注册消费者监听器,接收到消息就会调用这个方法        consumer.registerMessageListener(new MessageListenerConcurrently() {            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {                // 在这里进行消息的处理                for (MessageExt t : msgs) {                    try {                        System.out.println(new String(t.getBody(), DEFAULT_CHARSET));                    } catch (UnsupportedEncodingException e) {                        e.printStackTrace();                    }                }                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }        });        // 启动消费者实例        consumer.start();        System.out.println("----------Consumer Start-----------");    }}

大家注意里面Consumer的类名:DefaultMQPushConsumer。

从类名中我们可以提取出来一个关键的信息:Push。其实从这里我们就能看出来,当前我们使用的消息消费实际上是Push模式。

那么什么是Push消费模式呢?

简单来讲就是Broker会主动把消息发送给你的消费者,你的消费者是被动的接收Broker推送给过来的消息,然后进行处理。

这个就是所谓的Push模式,意思就是Broker主动推送消息给消费者。

Pull消费

接下来我们简单说一下Pull消费的方式。

Pull消费其实理解起来也很容易,就是Broker不在主动推送消息给消费者了,而是消费者主动发送请求从Broker中拉取消息。

至于什么时候用Push模式,什么时候用Pull模式,我们以后再聊这个话题。

 

总结

今天我们一起使用Docker快速部署了RocketMQ的环境,又为小伙伴们分享了部署环境时遇到的坑和解决办法,希望小伙伴们不要再遇到这个问题的时候无从下手。

之后我们又一起写了生产者和消费者的代码,聊了聊RocketMQ的几种发送模式和消费模式。相信小伙伴们应该会有所收获。

那今天的分享就到这里,希望小伙伴们继续支持我,那么下次的分享不见不散。

推荐站点

  • 我爱发烧音乐我爱发烧音乐

    我爱发烧音乐囊括了从流行音乐到古典音乐多个类型的音乐作品,专栏推荐最新的音乐,提供音乐排名榜单!可供免费线上收听音乐,歌曲流畅,音效极佳! 网站提供的钢琴以及二胡专栏,可供收听者,陶冶情操,改善心情,是难得的轻音乐典藏!

    www.520fs.com
  • 世纪音乐网世纪音乐网

    世纪音乐网是专业的在线音乐试听MP3下载网站。歌曲总计30余万首,收录了网上最新歌曲和流行音乐,DJ舞曲,非主流音乐,经典老歌,劲舞团歌曲,搞笑歌曲,儿童歌曲,英文歌曲等。是您上网听歌的最佳网站。

    www.ssjj.com
  • 杭州网杭州网

      杭州网是杭州地区唯一的新闻门户网站,由中共杭州市委宣传部、杭州日报报业集团和杭州广播电视集团共同组建的杭州网络传媒有限公司运营。

    www.hangzhou.com.cn
  • 深圳在线深圳在线

      深圳在线 www.szol.net是深圳本地最大、最早的地方生活资讯网站之一,网站名“深圳在线www.szol.net”由南方报业传媒集团编辑委员会总编辑、南方日报社总编辑、南方都市报总编辑、南方书画院名誉院长王春芙亲笔题名,深圳在线www.szol.net团队与深圳热线www.szonline.net、奥一网www.oeeee.com都源于全国最早成立于1996年的知名网络公司——深圳万用网。

    www.szol.net
  • 今题网今题网

     今题网- 中国领先的社区服务网,提供社区服务, 在线交友和商家推广服务,于2004年创建上线,公司现有员工超过百名。今题网自成立以来,凭借其独特的定位和丰富的社区交友功能, 凭借其团队超强的搜索引擎优化技术吸引超过千万的用户成为今题网的注册会员。

    www.jinti.com

鄂公网安备 42062502000001号