RocketMQ源码:生产者启动流程

程序员阿牛
2021-05-29 / 0 评论 / 207 阅读
温馨提示:
本文最后更新于2021-05-29,若内容或图片失效,请留言反馈。

前言

本文的初衷是带着大家一起了解源码以及关键步骤、方法、属性;并不是把源码的各个细节都过一遍,各位coder可以结合文章自己去阅读源码,这样效果更加。

生产者继承关系

DefaultMQProducer是RocketMQ中默认的生产者实现,
DefaultMQProducer的类之间的继承关系如下图

image.png
可以看到这个生产者在实现时包含生产者的操作和配置属性,这是典型的类对象设计。
下面我们将介绍类对象的一些核心属性和方法。以下是一些核心属性:namesrvAddr:继承自 ClientConfig,表示 RocketMQ 集群的 Namesrv 地址,如果是多个则用分号分开。比如:127.0.0.1:9876;127.0.0.2:9876。

clientIP:使用的客户端程序所在机器的 IP地址。支持 IPv4和 IPv6,IPv4 排除了本地的环回地址(127.0.xxx.xxx)和私有内网地址(192.168.xxx.xxx)。这里需要注意的是,如果 Client 运行在 Docker 容器中,获取的 IP 地址是容器所在的 IP地址,而非宿主机的IP地址。

instanceName:实例名,每个实例都需要取唯一的名字,因为有时我们会在同一个机器上部署多个程序进程,如果名字有重复就会导致启动失败。

vipChannelEnabled:这是一个 boolean 值,表示是否开启 VIP 通道。VIP 通道和非VIP通道的区别是:在通信过程中使用的端口号不同。

clientCallbackExecutorThreads:客户端回调线程数。该参数表示 Netty 通信层回调线程的个数,默认值Runtime.getRuntime().availableProcessors()表示当前CPU的有效个数。

pollNameServerInterval:获取 Topic 路由信息的间隔时长,单位为 ms,默认为30 000ms。

heartbeatBrokerInterval:与Broker心跳间隔的时长,单位为ms,默认为30000ms。

defaultMQProducerImpl:默认生产者的实现类,其中封装了Broker的各种API(启动及关闭生产者的接口)。如果你想自己实现一个生产者,可以添加一个新的实现,保持DefaultMQProducer对外接口不变,用户完全没有感知。

producerGroup:生产者组名,这是一个必须传递的参数。RocketMQ-way表示同一个生产者组中的生产者实例行为需要一致。

sendMsgTimeout:发送超时时间,单位为ms。

compressMsgBodyOverHowmuch:消息体的容量上限,超过该上限时消息体会通过ZIP进行压缩,该值默认为4MB。该参数在Client中是如何生效的呢?具体实现代码如下:

private boolean tryToCompressMessage(final Message msg) {
        if (msg instanceof MessageBatch) {
            //batch dose not support compressing right now
            return false;
        }
        byte[] body = msg.getBody();
        if (body != null) {
            if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
                try {
                    byte[] data = UtilAll.compress(body, zipCompressLevel);
                    if (data != null) {
                        msg.setBody(data);
                        return true;
                    }
                } catch (IOException e) {
                    log.error("tryToCompressMessage exception", e);
                    log.warn(msg.toString());
                }
            }
        }

        return false;
    }


retryTimesWhenSendFailed:同步发送失败后重试的次数。默认为2次,也就是说,一共有3次发送机会。

retryTimesWhenSendAsyncFailed:异步发送失败后重试的次数。默认为 2次。异步重试是有条件的重试,并不是每次发送失败后都重试。源代码可以查看org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync()方法。每次发送失败抛出异常后,通过执行onExceptionImpl()方法来决定什么场景进行重试。

以下是一些核心方法:

start():这是启动整个生产者实例的入口,主要负责校验生产者的配置参数是否正确,并启动通信通道、各种定时计划任务、Pull服务、Rebalance服务、注册生产者到Broker等操作。

shutdown():关闭本地已注册的生产者,关闭已注册到Broker的客户端。

fetchPublishMessageQueues(Topic):获取一个Topic有哪些Queue。在发送消息、Pull消息时都需要调用。

send(Message msg):同步发送普通消息。

send(Message msg,long timeout):同步发送普通消息(超时设置)。

send(Message msg,SendCallback sendCallback):异步发送普通消息。

send(Message msg,SendCallback sendCallback,long timeout):异步发送普通消息(超时设置)。

sendOneway(Message msg):发送单向消息。只负责发送消息,不管发送结果。

send(Message msg,MessageQueue mq):同步向指定队列发送消息。

send(Message msg,MessageQueue mq,long timeout):同步向指定队列发送消息(超时设置)。

同步向指定队列发送消息时,如果只有一个发送线程,在发送到某个指定队列中时,这个指定队列中的消息是有顺序的,那么就按照发送时间排序;如果某个Topic的队列都是这种情况,那么我们称该Topic的全部消息是分区有序的。

send(Message msg,MessageQueue mq,SendCallback sendCallback):异步发送消息到指定队列。

send(Message msg,MessageQueue mq,SendCallback sendCallback,long timeout):异步发送消息到指定队列(超时设置)。

send(Message msg,MessageQueueSelector selector,Object arg,SendCallback sendCallback):自定义消息发送到指定队列。通过实现MessageQueueSelector接口来选择将消息发送到哪个队列。

send(Collection<Message>msgs):批量发送消息。

下面介绍两个核心管理接口:

createTopic(String key,String newTopic,int queueNum):创建Topic。

viewMessage(String offsetMsgId):根据消息id查询消息内容。生产者启动的流程比消费者启动的流程更加简单,一般用户使用DefaultMQProducer的构造函数构造一个生产者实例,并设置各种参数。比如Namesrv地址、生产者组名等,调用start()方法启动生产者实例,start()方法调用了生产者默认实现类的start()方法启动,这里我们主要讲实现类的start()方法内部是怎么实现的,其流程如下图所示。

image.png

第一步:通过 switch-case 判断当前生产者的服务状态,创建时默认状态是CREATE_JUST。设置默认启动状态为启动失败。

第二步:执行checkConfig()方法。校验生产者实例设置的各种参数。比如生产者组名是否为空、是否满足命名规则、长度是否满足等。

第三步:执行changeInstanceNameToPID()方法。校验instance name,如果是默认名字则将其修改为进程id。

第四步:执行getOrCreateMQClientInstance()方法。根据生产者组名获取或者初始化一个MQClientInstance。

初始化代码如下:

   public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }


由此可见,MQClientInstance实例与clientId是一一对应的,而clientId是由clientIP、instanceName 及 unitName 构成的。一般来讲,为了减少客户端的使用资源,如果将所有的 instanceName和 unitName设置为同样的值,就会只创建一个 MQClientInstance实例,
具体实现代码如下:

    public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());

        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }

        return sb.toString();
    }

MQClientInstance 实例的功能是管理本实例中全部生产者与消费者的生产和消费行为。下面我们来看一下org.apache.rocketmq.client.impl.factory.MQClientInstance 类的核心属性(删除了部分不重要的),具体代码如下:

public class MQClientInstance {

    private final String clientId;

    //producerTable:当前client实例的全部生产者的内部实例。
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();

    //consumerTable:当前client实例的全部消费者的内部实例
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();

    //adminExtTable:当前client实例的全部管理实例
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();

    //mQClientAPIImpl:其实每个client也是一个Netty Server,也会支持Broker访问,这里实现了全部client支持的接口。
    private final MQClientAPIImpl mQClientAPIImpl;

    //mQAdminImpl:管理接口的本地实现类
    private final MQAdminImpl mQAdminImpl;
  
    //topicRouteTable:当前生产者、消费者中全部Topic的本地缓存路由信息
    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();


    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();

    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
        new ConcurrentHashMap<String, HashMap<String, Integer>>();

    //scheduledExecutorService:本地定时任务,比如定期获取当前 Namesrv 地址、定期同步Namesrv信息、定期更新Topic路由信息、定期发送心跳信息给Broker、定期清理已下线的Broker、定期持久化消费位点、定期调整消费线程数
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });

     //clientRemotingProcessor:请求的处理器,从处理方法processRequest()中我们可以知道目前支持哪些功能接口
    private final ClientRemotingProcessor clientRemotingProcessor;

    //pullMessageService:Pull服务
    private final PullMessageService pullMessageService;
    
    //rebalanceService:重新平衡服务。定期执行重新平衡方法this.mqClientFactory.doRebalance()。
    这里的 mqClientFactory 就是MQClientInstance 实例,
    通过依次调用MQClientInstance中保存的消费者实例的doRebalance()方法,
    来感知订阅关系的变化、集群变化等,以达到重新平衡
    private final RebalanceService rebalanceService;

    private final DefaultMQProducer defaultMQProducer;

    //consumerStatsManager:消费监控。比如拉取RT(Response Time,响应时间)、拉取TPS(Transactions Per Second,每秒处理消息数)、消费RT等都可以统计。 
    private final ConsumerStatsManager consumerStatsManager;


MQClientInstance中还有一些核心方法如下:

//从多个Namesrv中获取最新Topic路由信息,更新本地缓存。
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer);

//清理已经下线的Broker
private void cleanOfflineBroker();

//检查Client是否在Broker中有效
public void checkClientInBroker(final String brokerAddr, final String consumerGroup,
        final String clientId, final SubscriptionData subscriptionData,
        final long timeoutMillis);

//发送客户端的心跳信息给所有的Broker
public void sendHeartbeatToAllBrokerWithLock();

//在本地注册一个消费者
public boolean registerConsumer(final String group, final MQConsumerInner consumer);

//取消本地注册
public void unregisterConsumer(final String group);

//在本地注册一个生产者
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer);

//取消本地注册
public void unregisterProducer(final String group);

//注册一个管理实例
public boolean registerAdminExt(final String group, final MQAdminExtInner admin);

//立即执行一次 Rebalance。该操作是通过 RocketMQ 的一个CountDownLatch2锁来实现的
 public void rebalanceImmediately() ;

//对于所有已经注册的消费者实例,执行一次Rebalance
public void doRebalance() ;

//在本地缓存中查找Master或者Slave Broker信息
public FindBrokerResult findBrokerAddressInAdmin(final String brokerName);

//在本地缓存中查找Slave Broker信息
public FindBrokerResult findBrokerAddressInSubscribe(
        final String brokerName,
        final long brokerId,
        final boolean onlyThisBroker
    );

//在本地缓存中查找Master Broker地址
public String findBrokerAddressInPublish(final String brokerName);

//通过Topic名字查找Broker地址
 public String findBrokerAddrByTopic(final String topic) ;

//获取一个订阅关系中每个队列的消费进度
public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) ;

//直接将消息发送给指定的消费者消费,和正常投递不同的是,指定了已经订阅的消费者组中的一个,而不是全部已经订阅的消费者。一般适用于在消费消息后,某一个消费者组想再消费一次的场景
public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg,
        final String consumerGroup,
        final String brokerName);

//获取消费者的消费统计信息。包含消费RT、消费TPS等。
public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup);

声明:文章内容部分来自于互联网,如有侵权,请联系作者进行删除