Go To My HomePage

RocketMQ的使用

一、RocketMQ介绍

MQ作用

MQ-worker

MQ是一种”“先进先出”的数据结构,主要包含以下三个作用

  • 应用解耦

​ 系统的耦合性越高,容错性就越低。例如下订单如果RPC调用仓储系统,如果仓储系统出库模块异常将导致订单无法创建,这基本是我们无法容忍的。所以可以采用MQ的方式,接收创建订单成功的消息调用仓储出库。

  • 流量削峰

​ 应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。

  • 异步执行

​ 通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可。

各种MQ产品的比较

特性 ActiveMQ RabbitMQ RocketMQ kafka
开发语言 java erlang java scala
单机吞吐量 万级 万级 10万级 10万级
时效性 ms级 us级 ms级 ms级以内
可用性 高(主从架构) 高(主从架构) 非常高(分布式架构) 非常高(分布式架构)
功能特性 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富 MQ功能比较完备,扩展性佳 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广

RocketMQ各角色介绍

角色 含义
Producer 消息的发送者
Consumer 消息接收者
Broker 暂存和传输消息
NameServer 管理Broker,发送者和消费者从NameServer找到对应的Broker
Topic 消息主题
Message Queue 相当于是Topic的分区

RocketMQ-role

二、RocketMQ集群搭建

前置条件:JDK1.8

RocketMQ下载地址、RocketMQ-Externals(GUI管理界面)下载地址

各角色之间的关系

  • NameServer是一个几乎无状态节点。可集群部署,节点之间无任何信息同步。

  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。
  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

2Master-2Slave的RocketMQ服务搭建

rmq-basic-arc

角色介绍

IP 角色 架构模式
192.168.22.162 nameserver、brokerserver Master1、Slave2、NameServer
192.168.22.163 nameserver、brokerserver Master2、Slave1、NameServer

第一步:两台机器都解压

unzip rocketmq-all-[version]-bin-release.zip
mv rocketmq-[version] /opt/module
mv /opt/module/rocketmq-[version] /opt/module/rocketmq

第二步:两台机器都创建消息存储目录

# 192.168.22.162
mkdir -p /opt/module/rocketmq/store/broker-a
mkdir -p /opt/module/rocketmq/store/broker-b-s

# 192.168.22.163
mkdir -p /opt/module/rocketmq/store/broker-b
mkdir -p /opt/module/rocketmq/store/broker-a-s

第三步:

  • 配置192.168.22.162master1,编辑/opt/module/rocketmq/conf/2m-2s-sync/broker-a.properties (可重命名),删除 broker-a-s.properties
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0表示Master,1表示Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=192.168.22.162:9876;192.168.22.163:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker对外服务的监听端口,不可重复
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# consumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/opt/module/rocketmq/store/broker-a
# 限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
# Broker 的角色
#  - ASYNC_MASTER 异步复制Master
#  - SYNC_MASTER 同步双写Master
#  - SLAVE
brokerRole=SYNC_MASTER
# 刷盘方式
#  - ASYNC_FLUSH 异步刷盘
#  - SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
# 发消息线程池数量
#sendMessageThreadPoolNums=128
# 拉消息线程池数量
#pullMessageThreadPoolNums=128
  • 配置192.168.22.162slave2,编辑/opt/module/rocketmq/conf/2m-2s-sync/broker-b-s.properties (可重命名),删除 broker-b.properties
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
# 0表示Master,1表示Slave
brokerId=1
# nameServer地址,分号分割
namesrvAddr=192.168.22.162:9876;192.168.22.163:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=11011
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/opt/module/rocketmq/store/broker-b-s
# 限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
# Broker 的角色
#  - ASYNC_MASTER 异步复制Master
#  - SYNC_MASTER 同步双写Master
#  - SLAVE
brokerRole=SLAVE
# 刷盘方式
#  - ASYNC_FLUSH 异步刷盘
#  - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
# 发消息线程池数量
#sendMessageThreadPoolNums=128
# 拉消息线程池数量
#pullMessageThreadPoolNums=128
  • 配置192.168.22.163master2,编辑/opt/module/rocketmq/conf/2m-2s-sync/broker-b.properties (可重命名),删除 broker-b-s.properties
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
# 0表示Master,1表示Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=192.168.22.162:9876;192.168.22.163:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker对外服务的监听端口,不可重复
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# consumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/opt/module/rocketmq/store/broker-b
# 限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
# Broker 的角色
#  - ASYNC_MASTER 异步复制Master
#  - SYNC_MASTER 同步双写Master
#  - SLAVE
brokerRole=SYNC_MASTER
# 刷盘方式
#  - ASYNC_FLUSH 异步刷盘
#  - SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
# 发消息线程池数量
#sendMessageThreadPoolNums=128
# 拉消息线程池数量
#pullMessageThreadPoolNums=128
  • 配置192.168.22.163slave1,编辑/opt/module/rocketmq/conf/2m-2s-sync/broker-a-s.properties (可重命名),删除 broker-a.properties
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0表示Master,1表示Slave
brokerId=1
# nameServer地址,分号分割
namesrvAddr=192.168.22.162:9876;192.168.22.163:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=11011
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/opt/module/rocketmq/store/broker-a-s
# 限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
# Broker 的角色
#  - ASYNC_MASTER 异步复制Master
#  - SYNC_MASTER 同步双写Master
#  - SLAVE
brokerRole=SLAVE
# 刷盘方式
#  - ASYNC_FLUSH 异步刷盘
#  - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
# 发消息线程池数量
#sendMessageThreadPoolNums=128
# 拉消息线程池数量
#pullMessageThreadPoolNums=128

第四步:修改启动脚本文件

修改/opt/module/rocketmq/bin/runbroker.sh

#===================================================
# 开发环境配置 JVM Configuration
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

修改/opt/module/rocketmq/bin/runserver.sh

#===================================================
# 开发环境配置 JVM Configuration
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

第五步:启动NameServe集群,分别在两台机器都启动NameServer

# 1.启动NameServer
nohup sh mqnamesrv > mqnamesrv.log  2>&1 &
# 查看进程
jps

第六步:在192.168.22.162上启动master1和slave2

nohup sh mqbroker -c /opt/module/rocketmq/conf/2m-2s-sync/broker-a.properties > broker-a.log  2>&1 &
nohup sh mqbroker -c /opt/module/rocketmq/conf/2m-2s-sync/broker-b-s.properties > broker-b-s.log  2>&1 &
jps

第七步:在192.168.22.163上启动master2和slave1

nohup sh mqbroker -c /opt/module/rocketmq/conf/2m-2s-sync/broker-b.properties > broker-b.log  2>&1 &
nohup sh mqbroker -c /opt/module/rocketmq/conf/2m-2s-sync/broker-a-s.properties > broker-a-s.log  2>&1 &
jps

关闭broker命令:sh mqshutdown broker

集群监控平台搭建

下载源码包,修改application.properties文件

rocketmq.config.namesrvAddr=192.168.22.162:9876;192.168.22.163:9876

打包编译运行即可

mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-dashboard-[version].jar

三、RocketMQ基本使用

发送同步消息

可靠性的同步反馈发送结果

@Test
public void syncSendMessageTest() throws Exception {
  // 实例化消息生产者Producer
  DefaultMQProducer producer = new DefaultMQProducer("rocketmq-cluster", "sync_send_message_producer_group");
  // 设置NameServer的地址
  producer.setNamesrvAddr("192.168.22.162:9876;192.168.22.163:9876");
  // 启动producer
  producer.start();

  for (int i = 0; i < 10; i++) {
    // 创建消息,并指定Topic,Tag和消息体
    Message message = new Message("sync_product", JSON.toJSONBytes("Hello RocketMQ Sync Send Message " + i));
    // 发送消息
    SendResult sendResult = producer.send(message);
    System.out.println("sendResult = " + JSON.toJSONString(sendResult));
  }

  TimeUnit.SECONDS.sleep(5);
  // 如果不再发送消息,关闭Producer实例。
  producer.shutdown();
}

发送异步消息

异步接收发送的结果,不阻塞,快速响应

@Test
public void asyncSendMessageTest() throws Exception {
  // 实例化消息生产者Producer
  DefaultMQProducer producer = new DefaultMQProducer("rocketmq-cluster", "async_send_message_producer_group");
  // 设置NameServer的地址
  producer.setNamesrvAddr("192.168.22.162:9876;192.168.22.163:9876");
  // 启动producer
  producer.start();

  for (int i = 0; i < 10; i++) {
    // 创建消息,并指定Topic,Tag和消息体
    Message message = new Message("async_product", JSON.toJSONBytes("Hello RocketMQ Async Send Message " + i));
    // 发送消息
    producer.send(message, new SendCallback() {

      // 成功处理
      @Override
      public void onSuccess(SendResult sendResult) {
        System.out.println("act=onSuccess sendResult = " + JSON.toJSONString(sendResult));
      }

      // 异常处理
      @Override
      public void onException(Throwable throwable) {
        System.out.println("act=onException throwable = " + JSON.toJSONString(throwable));
      }
    });

  }

  TimeUnit.SECONDS.sleep(5);
  // 如果不再发送消息,关闭Producer实例。
  producer.shutdown();
}

单向发送消息

发送消息,对发送结果不关心

@Test
public void onewaySendMessageTest() throws Exception {
  // 实例化消息生产者Producer
  DefaultMQProducer producer = new DefaultMQProducer("rocketmq-cluster", "oneway_send_message_producer_group");
  // 设置NameServer的地址
  producer.setNamesrvAddr("192.168.22.162:9876;192.168.22.163:9876");
  // 启动producer
  producer.start();

  for (int i = 0; i < 10; i++) {
    // 创建消息,并指定Topic,Tag和消息体
    Message message = new Message("oneway_product", JSON.toJSONBytes("Hello RocketMQ Oneway Send Message " + i));
    // 发送消息, 不关心发送结果直接返回
    producer.sendOneway(message);
  }

  TimeUnit.SECONDS.sleep(5);
  // 如果不再发送消息,关闭Producer实例。
  producer.shutdown();
}

批量发送消息

批量发送消息,消息一次不能超过4M

@Test
public void batchSendMessageTest() throws Exception {
  // 实例化消息生产者Producer
  DefaultMQProducer producer = new DefaultMQProducer("rocketmq-cluster", "batch_send_message_producer_group");
  // 设置NameServer的地址
  producer.setNamesrvAddr("192.168.22.162:9876;192.168.22.163:9876");
  // 启动producer
  producer.start();

  // 构建消息
  List<Message> messages = new ArrayList<>();
  messages.add(new Message("batch_topic", JSON.toJSONBytes("Batch Message 0")));
  messages.add(new Message("batch_topic", JSON.toJSONBytes("Batch Message 1")));
  messages.add(new Message("batch_topic", JSON.toJSONBytes("Batch Message 2")));

  // 发送消息, 不关心发送结果直接返回
  SendResult sendResult = producer.send(messages);
  System.out.println("sendResult = " + JSON.toJSONString(sendResult));

  TimeUnit.SECONDS.sleep(5);
  // 如果不再发送消息,关闭Producer实例。
  producer.shutdown();
}

如果消息超过了4M,可以用如下工具类切分

public class ListSplitter implements Iterator<List<Message>> {
  private final int SIZE_LIMIT = 1024 * 1024 * 4;
  private final List<Message> messages;
  private int currIndex;
  public ListSplitter(List<Message> messages) {
    this.messages = messages;
  }
  @Override 
  public boolean hasNext() {
    return currIndex < messages.size();
  }
  @Override 
  public List<Message> next() {
    int nextIndex = currIndex;
    int totalSize = 0;
    for (; nextIndex < messages.size(); nextIndex++) {
      Message message = messages.get(nextIndex);
      int tmpSize = message.getTopic().length() + message.getBody().length;
      Map<String, String> properties = message.getProperties();
      for (Map.Entry<String, String> entry : properties.entrySet()) {
        tmpSize += entry.getKey().length() + entry.getValue().length();
      }
      tmpSize = tmpSize + 20; // 增加日志的开销20字节
      if (tmpSize > SIZE_LIMIT) {
        // 单个消息超过了最大的限制
        // 忽略,否则会阻塞分裂的进程
        if (nextIndex - currIndex == 0) {
          // 假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
          nextIndex++;
        }
        break;
      }
      if (tmpSize + totalSize > SIZE_LIMIT) {
        break;
      } else {
        totalSize += tmpSize;
      }

    }
    List<Message> subList = messages.subList(currIndex, nextIndex);
    currIndex = nextIndex;
    return subList;
  }
}

集群模式消费消息

消费者采用集群方式消费消息,同一个组多个消费者共同消费队列消息(即发送10个消息,两个不同组,每组都部署了N台,每组都消费10个消息),这是最常用的消费模式

@Test
public void clusterConsumerTest() throws Exception {
  // 实例化消息生产者,指定组名
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketmq-cluster","clusterConsumerGroup");
  // 设置NameServer的地址
  consumer.setNamesrvAddr("192.168.22.162:9876;192.168.22.163:9876");
  // 订阅Topic
  consumer.subscribe("sync_product", "*");
  // 集群模式消费
  consumer.setMessageModel(MessageModel.CLUSTERING);
  // 从队列尾部消费
  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  // 注册回调函数,处理消息
  consumer.registerMessageListener(new MessageListenerConcurrently() {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
      for (MessageExt msg : msgs) {
        System.out.printf("topic=%s tag=%s msgId=%s msgKey=%s msgBody=%s \n",
                          msg.getTopic(), msg.getTags(), msg.getMsgId(), msg.getKeys(), 
                          JSON.parseObject(msg.getBody(), String.class));
      }
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
  });
  //启动消息者
  consumer.start();
  TimeUnit.SECONDS.sleep(1000);
}

广播模式消费消息

消费者采用广播方式消费消息,每个部署实例都消费队列的消息(即只要有实例都消费,每个实例会消费到相同的消息)

@Test
public void broadcastConsumerTest() throws Exception {
  // 实例化消息生产者,指定组名
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketmq-cluster","broadcastConsumerGroup");
  // 设置NameServer的地址
  consumer.setNamesrvAddr("192.168.22.162:9876;192.168.22.163:9876");
  // 订阅Topic
  consumer.subscribe("sync_product", "*");
  // 广播模式消费
  consumer.setMessageModel(MessageModel.BROADCASTING);
  // 从队列尾部消费
  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  // 注册回调函数,处理消息
  consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
      for (MessageExt msg : msgs) {
        System.out.printf("topic=%s tag=%s msgId=%s msgKey=%s msgBody=%s \n",
                          msg.getTopic(), msg.getTags(), msg.getMsgId(), msg.getKeys(), 
                          JSON.parseObject(msg.getBody(), String.class));
      }
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
  });
  //启动消息者
  consumer.start();
  TimeUnit.SECONDS.sleep(1000);
}

顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列),而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

@Test
public void sendOrderMessageTest() throws Exception {
  // 实例化消息生产者Producer
  DefaultMQProducer producer = new DefaultMQProducer("rocketmq-cluster", "order_message_producer_group");
  // 设置NameServer的地址
  producer.setNamesrvAddr("192.168.22.162:9876;192.168.22.163:9876");
  // 启动producer
  producer.start();

  for (int i = 0; i < 10; i++) {
    // 创建消息,并指定Topic,Tag和消息体
    Message message = new Message("order_product", JSON.toJSONBytes("order Message " + i));
    // 发送消息
    SendResult sendResult = producer.send(message,new MessageQueueSelector() {

      @Override
      public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer argInt = (Integer) arg;
        return mqs.get(argInt % mqs.size());
      }
    }, 101);
    System.out.println("sendResult = " + JSON.toJSONString(sendResult));
  }

  TimeUnit.SECONDS.sleep(10);
  // 如果不再发送消息,关闭Producer实例。
  producer.shutdown();
}


@Test
public void orderConsumerTest3() throws Exception {
  // 实例化消息生产者,指定组名
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketmq-cluster", "orderConsumerGroup");
  // 设置NameServer的地址
  consumer.setNamesrvAddr("192.168.22.162:9876;192.168.22.163:9876");
  // 订阅Topic
  consumer.subscribe("order_product", "*");
  // 集群模式消费
  consumer.setMessageModel(MessageModel.CLUSTERING);
  // 从队列尾部消费
  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  // 注册回调函数,处理消息
  consumer.registerMessageListener(new MessageListenerOrderly() {

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
      for (MessageExt msg : msgs) {
        System.out.printf("topic=%s tag=%s msgId=%s msgKey=%s msgBody=%s \n",
                          msg.getTopic(), msg.getTags(), msg.getMsgId(), msg.getKeys(),
                          JSON.parseObject(msg.getBody(), String.class));
      }
      return ConsumeOrderlyStatus.SUCCESS;
    }

  });
  //启动消息者
  consumer.start();
  TimeUnit.SECONDS.sleep(1000);
}

延时消息

延迟消息可见,RocketMQ支持如下时间点的延迟1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h, 等级从左到右从1开始

@Test
public void testDelayMessage() throws Exception {
  // 实例化消息生产者Producer
  DefaultMQProducer producer = new DefaultMQProducer("rocketmq-cluster", "delay_message_producer_group");
  // 设置NameServer的地址
  producer.setNamesrvAddr("192.168.22.162:9876;192.168.22.163:9876");
  // 启动producer
  producer.start();

  for (int i = 0; i < 10; i++) {
    // 创建消息,并指定Topic,Tag和消息体
    Message message = new Message("delay_product", JSON.toJSONBytes("Hello RocketMQ Delay Send Message " + i));
    // 设置消息延迟等级
    message.setDelayTimeLevel(4);
    // 发送消息
    SendResult sendResult = producer.send(message);
    System.out.println("sendResult = " + JSON.toJSONString(sendResult));
  }

  TimeUnit.SECONDS.sleep(10);
  // 如果不再发送消息,关闭Producer实例。
  producer.shutdown();
}

apache没有定时消息的概念,阿里云有定时消息的概念:指定时间转成时间戳,表示定时消息

过滤消息消费

  • tag过滤
// 利用Tag进行过滤,接收所有Tag消息为 *
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
  • SQL过滤消息消费
// a取自消息的属性,即Message.putUserProperty("a", String.valueOf(num))
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");

事务消息

事务消息只保证发送端的事务,不保证消费端的事务,消费失败时发送并不会任务补偿回滚

transaction-message

1)事务消息发送及提交

  1. 发送消息(half消息,不可见)
  2. 服务端响应消息写入结果
  3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
  4. 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

2)事务补偿

  1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
  2. Producer收到回查消息,检查回查消息对应的本地事务的状态
  3. 根据本地事务状态,重新Commit或者Rollback

3)事务消息状态

  1. TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息
  2. TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费
  3. TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态

发送事务消息

@Test
public void sendTransactionMessageTest() throws Exception {
  // 实例化消息生产者Producer
  TransactionMQProducer producer = new TransactionMQProducer("rocketmq-cluster", "transaction_message_producer_group");
  // 设置NameServer的地址
  producer.setNamesrvAddr("192.168.22.162:9876;192.168.22.163:9876");
  producer.setTransactionListener(new OrderTransactionListenerImpl());
  // 启动producer
  producer.start();

  for (int i = 0; i < 10; i++) {
    // 创建消息,并指定Topic,Tag和消息体
    Message message = new Message("transaction_product", JSON.toJSONBytes("Order  Transaction Message"));
    // 发送消息
    SendResult sendResult = producer.sendMessageInTransaction(message, null);
    System.out.println("sendResult = " + JSON.toJSONString(sendResult));
  }

  TimeUnit.SECONDS.sleep(1000);
  // 如果不再发送消息,关闭Producer实例。
  producer.shutdown();
}

事务消息的Listener

public class OrderTransactionListenerImpl implements TransactionListener {

  // 这里不应该是本地缓存,应该使用redis等
  private static ConcurrentHashMap<String, Integer> countCheckNumMap = new ConcurrentHashMap<>();

  // 最大检查次数
  private static final int MAX_CHECK = 5;

  @Override
  public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    try {
      System.out.println("transactionId = " + msg.getTransactionId());
      //执行本地事务
      System.out.println("执行本地事务并保存TransactionId到一张表中");
      // 模拟执行过程中发送异常
      boolean exeException = RandomUtil.randomBoolean();
      if (exeException) {
        System.out.println("transactionId = " + msg.getTransactionId() + " 执行过程中遇到了异常");
        throw new RuntimeException("执行过程中遇到了异常");
      }
      // 模拟执行过程宕机,回滚
      boolean unknowException = RandomUtil.randomBoolean();
      if (unknowException) {
        System.out.println("transactionId = " + msg.getTransactionId() + " 执行过程中遇到了中断执行");
        return LocalTransactionState.UNKNOW;
      }
    } catch (Throwable e) {
      return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    System.out.println("transactionId = " + msg.getTransactionId() + " 提交");
    return LocalTransactionState.COMMIT_MESSAGE;
  }

  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    System.out.println("checkLocalTransaction transactionId = " + msg.getTransactionId());
    // 从数据库中查找是否有此transactionId,有返回commit,没有返回unknow,示例代码用随机了
    boolean findTransaction = RandomUtil.randomBoolean();
    if (findTransaction) {
      System.out.println("提交 transactionId = " + msg.getTransactionId());
      return LocalTransactionState.COMMIT_MESSAGE;
    }
    return rollbackOrUnknow(msg.getTransactionId());
  }

  private LocalTransactionState rollbackOrUnknow(String transactionId) {
    Integer countCheckNum = countCheckNumMap.get(transactionId);

    if (countCheckNum != null && countCheckNum > MAX_CHECK) {
      System.out.println("回滚 transactionId = " + transactionId);
      return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    if (countCheckNum == null) {
      countCheckNumMap.put(transactionId, 1);
    }

    System.out.println("未知 transactionId = " + transactionId);
    return LocalTransactionState.UNKNOW;
  }
}

事务消息注意点

  • 事务消息不支持延时消息和批量消息。
  • 为了避免单个消息被检查太多次而导致半队列消息累积,默认单个消息的检查次数限制为15次,可以修改Broker配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话(N =transactionCheckMax ) 则Broker将丢弃此消息,并在默认情况下同时打印错误日志。可以重写AbstractTransactionCheckListener修改这个行为

  • 事务消息将在Broker配置文件中的参数transactionMsgTimeout这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS来改变这个限制,默认为1分钟

  • 事务性消息可能不止一次被检查或消费

  • 事务消息的生产者ID不能与其他类型消息的生产者ID共享。事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者

四、SpringBoot集成RocketMQ

前置条件POM引入依赖

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>[version]</version>
</dependency>

配置application.properties文件

# nameServer服务器地址
rocketmq.name-server=192.168.22.162:9876;192.168.22.163:9876
# 发送者组名称
rocketmq.producer.group=myApp-producer-group
# 发送消息的超时时间
rocketmq.producer.send-message-timeout=3000
# 压缩消息发送的阈值,单位字节
rocketmq.producer.compress-message-body-threshold=4096
# 消息体允许的最大大小
rocketmq.producer.max-message-size=4194304
# 异步发送消息失败的重试次数。默认2次
rocketmq.producer.retry-times-when-send-async-failed=0
# 发送消息给Broker时,如果发送失败,是否重试另外一台Broker,默认为false
rocketmq.producer.retry-next-server=false
# 同步发送消息失败的重试次数,默认2次
rocketmq.producer.retry-times-when-send-failed=2
# 是否开启消息轨迹功能,默认为false关闭
rocketmq.producer.enable-msg-trace=true
# 是否开启消息轨迹功能,默认为false关闭
rocketmq.consumer.enable-msg-trace=true
# 自定义消息轨迹的Topic,默认为 RMQ_SYS_TRACE_TOPIC
rocketmq.producer.customized-trace-topic=RMQ_SYS_TRACE_TOPIC

发送消息

  • 同步发送消息

      // 第一种方式
      rocketMQTemplate.convertAndSend("userTopic:tagA", user);
      // 第二种方式
      rocketMQTemplate.send("userTopic:tagB", MessageBuilder.withPayload(user).build());
    
  • 异步发送消息

      rocketMQTemplate.asyncSend("userTopic:tagC", user, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
          System.out.println("async onSuccess SendResult=" + JSON.toJSONString(sendResult));
        }
        
        @Override
        public void onException(Throwable throwable) {
          System.out.println("async onException Throwable=" + JSON.toJSONString(throwable));
        }
      });
    
  • 发送顺序消息

      // 第三个参数为以哪个字段作为hashkey发送到对应队列
      rocketMQTemplate.syncSendOrderly("orderly_user_topic", user, "name");
    
  • 发送事务消息[以下采用分离rocketTemplate方式]

    • 创建新的专属的RocketTemplate
      @Configuration
      @AutoConfigureAfter({ RocketMQAutoConfiguration.class})
      @AutoConfigureBefore({ RocketMQTransactionConfiguration.class})
      public class MqTransactionConfigure implements ApplicationContextAware {
        
        public static final String PRODUCER_BEAN_NAME = "defaultMQProducer";
        public static final String CONSUMER_BEAN_NAME = "defaultLitePullConsumer";
        
        private ApplicationContext applicationContext;
        
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
          this.applicationContext = applicationContext;
        }
        
        @Bean(name = "userTransactionRocketMqTemplate", destroyMethod = "destroy")
        public RocketMQTemplate userTransactionRocketMqTemplate(RocketMQMessageConverter rocketMqMessageConverter) {
          RocketMQTemplate userTransactionRocketMqTemplate = new RocketMQTemplate(){
            @Override
            public void afterPropertiesSet() throws Exception {
              return;
            }
          };
          if (applicationContext.containsBean(PRODUCER_BEAN_NAME)) {
            userTransactionRocketMqTemplate.setProducer((DefaultMQProducer) applicationContext.getBean(PRODUCER_BEAN_NAME));
          }
          if (applicationContext.containsBean(CONSUMER_BEAN_NAME)) {
            userTransactionRocketMqTemplate.setConsumer((DefaultLitePullConsumer) applicationContext.getBean(CONSUMER_BEAN_NAME));
          }
          userTransactionRocketMqTemplate.setMessageConverter(rocketMqMessageConverter.getMessageConverter());
          return userTransactionRocketMqTemplate;
        }
      }
    
    • 事务执行Listener
      @Service
      @RocketMQTransactionListener(corePoolSize=5, maximumPoolSize = 20, rocketMQTemplateBeanName="userTransactionRocketMqTemplate")
      public class UserTransactionListener implements RocketMQLocalTransactionListener {
        
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
          System.out.println("executeLocalTransaction");
          return RocketMQLocalTransactionState.COMMIT;
        }
        
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
          System.out.println("checkLocalTransaction");
          return RocketMQLocalTransactionState.COMMIT;
        }
      }
    
    • 发送事务消息
      TransactionSendResult sendResult = userTransactionRocketMqTemplate.sendMessageInTransaction(
        "test_transaction", MessageBuilder.withPayload(user).build(), null);
    

消费消息

  • 只需要消息的有效负载

      @Service
      @RocketMQMessageListener(
        consumerGroup = "myApp-userTopic-consumer",
        topic = "userTopic",
        selectorExpression = "*", 
        messageModel = MessageModel.CLUSTERING)
      public class UserConsumer implements RocketMQListener<User> {
        
        @Override
        public void onMessage(User user) {
          System.out.println("user = " + JSON.toJSONString(user));
        }
      }
    
  • 需要消息本身的属性

      @Service
      @RocketMQMessageListener(
        topic = "userTopic", 
        selectorExpression = "tagB||tagC", 
        consumerGroup = "myApp-userTopic-ext-consumer")
      public class UserMessageExtConsumer implements RocketMQListener<MessageExt> {
        
        @Override
        public void onMessage(MessageExt message) {
          System.out.println("topic=" + message.getTopic());
          System.out.println("tags=" + message.getTags());
          System.out.println("msgId=" + message.getMsgId());
          System.out.println("msgKey=" + message.getKeys());
          System.out.println("body=" + JSON.parseObject(message.getBody(), User.class));
          System.out.println("重试次数=" + message.getReconsumeTimes());
        }
      }
    
  • 消费顺序消息

      @Service
      @RocketMQMessageListener(
        consumerGroup = "myApp-userTopic-orderly-consumer",
        topic = "orderly_user_topic", 
        selectorExpression = "*",
        consumeMode = ConsumeMode.ORDERLY)
      public class OrderlyUserConsumer implements RocketMQListener<User> {
        
        @Override
        public void onMessage(User user) {
          System.out.println("orderly user = " + JSON.toJSONString(user));
        }
      }
    

五、高级功能

消息存储

RocketMQ使用文件顺序存储和零拷贝的方法存储数据,目前的高性能磁盘,顺序写速度可以达到600MB/s,而随机写的速度只有大概100KB/s。

message-store

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分别是同步刷盘和异步刷盘

  • 同步刷盘

    在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。

  • 异步刷盘

    在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE。当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个

高可用性机制

消息消费高可用

在Consumer的配置文件中,并不需要设置是从Master读还是从Slave读。当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。

消息发送高可用

在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。目前还不支持Slave自动转成Master,如果机器资源不足需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文件,用新的配置文件启动Broker。

消息主从复制

如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。

  • 同步复制

    同步复制方式是等Master和Slave均写成功后才反馈给客户端写成功状态在同步复制方式下,如果Master出故障, Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量

  • 异步复制

    异步复制方式是只要Master写成功即可反馈给客户端写成功状态。在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失

配置:同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE中的一个

实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低性能。通常情况下,应该把Master的Save配置成ASYNC_FLUSH的刷盘方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。

负载均衡

Producer负载均衡

Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下

Producer负载均衡


Consumer负载均衡

  • 集群模式

    在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均

分配queue给每个实例。有如下分配策略

策略名称 策略
平均分配策略(默认) AllocateMessageQueueAveragely
环形分配策略 AllocateMessageQueueAveragelyByCircle
手动配置分配策略 AllocateMessageQueueByConfig
机房分配策略 AllocateMessageQueueByMachineRoom
一致性哈希分配策略 AllocateMessageQueueConsistentHash
  • 广播模式

    由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。

消息重试

顺序消息的重试

对于顺序消息,当消费者消费消息失败后,会自动不断进行消息重试(每次间隔时间为1秒),这时应用会出现消息消费被阻塞的情况。因此在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

无序消息的重试

对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,可以通过设置返回状态达到消息重试的结果

消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下

第几次重试 与上次重试的间隔时间 第几次重试 与上次重试的间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时

如果消息重试16次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16次重试,超过这个时间范围消息将不再重试投递。可通过Message.getReconsumeTimes()获取当前重试次数。

注意: 一条消息无论重试多少次,这些重试消息的 Message ID不会改变。

广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息

死信队列

当一条消息初次消费失败,RocketMQ会自动进行消息重试。达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

在RocketMQ中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)

死信队列的特点

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

  • 一个死信队列对应一个Group ID,而不是对应单个消费者实例。
  • 如果一个Group ID未产生死信消息,不会为其创建相应的死信队列。
  • 一个死信队列包含了对应Group ID产生的所有死信消息,不论该消息属于哪个Topic。

消费幂等

在网络不稳定的情况下,消息有可能会出现重复投递,或者消费失败会进行重新消费。一般情况下应用都是允许消息重复投递和消费的,所以要在消费端做幂等。

因为Message ID有可能出现冲突(重复),所以真正安全的幂等处理,不建议以Message ID作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Message Key进行设置。Message Key做幂等可以使用Redis、或者Etcd的方式存储