RocketMQ源码:消息发送流程

RocketMQ客户端的消息发送通常分为以下3层

业务层:通常指直接调用RocketMQ Client发送API的业务代码。

消息处理层:指RocketMQ Client获取业务发送的消息对象后,一系列的参数检查、消息发送准备、参数包装等操作。

通信层:指RocketMQ基于Netty封装的一个RPC通信服务,RocketMQ的各个组件之间的通信全部使用该通信层。
总体上讲,消息发送流程首先是 RocketMQ 客户端接收业务层消息,然后通过DefaultMQProducerImpl发送一个RPC请求给Broker,再由Broker处理请求并保存消息。

下面以DefaultMQProducer.send(Message msg)接口为例讲解发送流程

消息发送流程具体分为3步:
第一步:调用defaultMQProducerImpl.send()方法发送消息。

第二步:通过设置的发送超时时间,调用defaultMQProducerImpl.send()方法发送消息。设置的超时时间可以通过sendMsgTimeout进行变更,其默认值为3s。

第三步:执行defaultMQProducerImpl.sendDefaultImpl()方法。这是一个公共发送方法,我们先看看入参:

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) 

communicationMode:通信模式,同步、异步还是单向。
sendCallback:对于异步模式,需要设置发送完成后的回调

image.png

该方法是发送消息的核心方法,执行过程分为5步:
第一步,两个检查:生产者状态、消息及消息内容。没有运行的生产者不能发送消息。消息检查主要检查消息是否为空,消息的Topic的名字是否为空或者是否符合规范;消息体大小是否符合要求,最大值为4MB,可以通过maxMessageSize进行设置。

第二步,执行tryToFindTopicPublishInfo()方法:获取Topic路由信息,如果本地缓存没有路由信息,就通过Namesrv获取路由信息,更新到本地,再返回。
代码如下:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
      TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
      if (null == topicPublishInfo || !topicPublishInfo.ok()) {
          this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
          this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
          topicPublishInfo = this.topicPublishInfoTable.get(topic);
      }

      if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
          return topicPublishInfo;
      } else {
          this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
          topicPublishInfo = this.topicPublishInfoTable.get(topic);
          return topicPublishInfo;
      }
  }

第三步,计算消息发送的重试次数,同步重试和异步重试的执行方式是不同的。

第四步,执行队列选择方法selectOneMessageQueue()。根据队列对象中保存的上次发送消息的Broker的名字和Topic路由,选择(轮询)一个Queue将消息发送到Broker。我们可以通过 sendLatencyFaultEnable 来设置是否总是发送到延迟级别较低的 Broker,默认值为False。

第五步,执行sendKernelImpl()方法。该方法是发送消息的核心方法,主要用于准备通信层的入参(比如Broker地址、请求体等),将请求传递给通信层,内部实现是基于Netty的,在封装为通信层request对象RemotingCommand前,会设置RequestCode表示当前请求是发送单个消息还是批量消息。
代码如下

if (sendSmartMsg || msg instanceof MessageBatch) {
              SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
              request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
          } else {
              request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
          }


Netty 本身是一个异步的网络通信框架,怎么实现同步的调用呢?我们可以通过org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl()方法来实现同步的调用,实现代码如下

    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

在每次发送同步请求后,程序会执行 waitResponse()方法,直到 Netty接收Broker的返回结果

   public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }

然后,通过putResponse()方法释放锁,让请求线程同步返回。
异步发送时有很多request,每个response返回后怎么与request进行对应呢?这里面有一个关键参数——opaque,RocketMQ每次发送同步请求前都会为一个request分配一个opaque,这是一个原子自增的id,一个response会以opaque作为key保存在responseTable中,这样用opaque就将request和response连接起来了。

注:文章部分内容来自互联网,如有侵权请联系站长。