RocketMQ中生产者通过DefaultProducer来创建。
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
private String producerGroup;private String createTopicKey = MixAll.DEFAULT_TOPIC;
private volatile int defaultTopicQueueNums = 4;
private int sendMsgTimeout = 3000;
private int compressMsgBodyOverHowmuch = 1024 * 4;
private int retryTimesWhenSendFailed = 2;
private boolean retryAnotherBrokerWhenNotStoreOK = false;
private int maxMessageSize = 1024 * 128;
defaultMQProducerImpl作为生产者的具体实现,而被DefaultProducer来包装在内部。
ProductGroup代表该生产者具体位于哪个生产者集群底下。
接下里是一些在实际运用当中需要的参数,而在这底下虽然已经被被配置过相应的值,但主要还是用于test以及demo,具体还是需要相应的配置。
从上往下分别是该生产者相应的topic,默认的消息队数量,发送消息的timeout,消息超过多大容量需要压缩的大小,消息发送失败后重发次数的上限,如果发送给一个broker失败是否选择换一个broker发送,最大的消息大小。这些都有相应的getter,setter方法在DefaultProducer当中可以设置。
DefaultProducer继承了ClientConfig类,可以说这个类就是生产者客户端的配置管理类。
在ClientConfig中存放了更底层的配置信息。
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));private String clientIP = RemotingUtil.getLocalAddress();
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
private int pollNameServerInteval = 1000 * 30;private int heartbeatBrokerInterval = 1000 * 30;
其中namesrvAddr则是最重要的名称服务的服务器地址。
后两者则是向名称服务器更新的频率,以及心跳数据发送的频率。
public static final String SendMessageWithVIPChannelProperty = "com.rocketmq.sendMessageWithVIPChannel";private boolean sendMessageWithVIPChannel = Boolean.parseBoolean(System.getProperty(SendMessageWithVIPChannelProperty, "false"));
上面的参数用来设置是否选择VIP消息队列(高优先级)。
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {this.producerGroup = producerGroup;defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
在构造方法中设置相应的producerGroup设置相应的生产者组名,以及传入rpcHook实现具体的DefaultMQProducer的构造方法。
public void start() throws MQClientException {this.defaultMQProducerImpl.start();
}
defaultMQProducer通过start()直接调用DefaultMQProducerImpl的start()方法完成生产者的具体开启。
下面具体说DefaultMQProducerImpl
在一开始的构造方法的实现很简单。
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {this.defaultMQProducer = defaultMQProducer;this.rpcHook = rpcHook;
}
具体的开启在start()方法中。
public void start() throws MQClientException {this.start(true);
}public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;this.checkConfig();if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}this.mQClientFactory =MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer,rpcHook);boolean registerOK =mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please."+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);}this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {mQClientFactory.start();}log.info("the producer [{}] start OK", this.defaultMQProducer.getProducerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The producer service state not OK, maybe started once, "//+ this.serviceState//+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);default:break;}this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
在一开始的状态量为CREAT_JUST,可以顺利进入具体的开始实现,通过swith结构巧妙的保证了start()的单一实现。
在start()实现的一开始先将状态量改变,保证配置不会重复。
之后调用checkConfig()方法,以保证生产者组名被正确配置,并没有和生产者的默认组名冲突。
private void checkConfig() throws MQClientException {Validators.checkGroup(this.defaultMQProducer.getProducerGroup());if (null == this.defaultMQProducer.getProducerGroup()) {throw new MQClientException("producerGroup is null", null);}if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP+ ", please specify another one.", null);}
}
在接下来就要完成具体生产者客户端的配置。调用客户端管理MQClientManager的getCAndCreateMQClientInstance()来获取客户端实例的获取。
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {String clientId = clientConfig.buildMQClientId();MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;} else {// TODO log}}return instance;
}
首先在DefaultProducer的超类ClientConfig当中设置客户端id,除了ip之外还有相应的实例名称。
public String buildMQClientId() {StringBuilder sb = new StringBuilder();sb.append(this.getClientIP());sb.append("@");sb.append(this.getInstanceName());if(!UtilAll.isBlank(this.unitName)) {sb.append("@");sb.append(this.unitName);}return sb.toString();
}
在获取客户端ip之后,ClinetManager中有一个map管理着相应的客户端实例,可以先从当中根据id获取,如果没有,则重新创建。
private ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable =new ConcurrentHashMap<String, MQClientInstance>();
为了保证内存安全,传入客户端实例构造方法的ClientConfig需要完全复制一份,而不是直接传入。
在成功获取了生产者客户端实例之后,需要向客户端实例注册相应的生产者组名以及该生产者。具体的实现也只是将组名与生产者实例的键值对存放在客户端实例下面的生产者map里。
在defaultMQProducerImpl中有一个map来缓存具体的路由队列信息。
private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =new ConcurrentHashMap<String, TopicPublishInfo>();
下面是TopicPublishInfo的结构。
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
private AtomicInteger sendWhichQueue = new AtomicInteger(0);
其中的list具体存放该topic的消息队列,而下面则表示消息发送的具体哪条队列。
在之前注册完毕之后也需要将DefaultProducer的topic与新创建的TopicPublishInfo放入map中,以便接下来的使用。
之后如果startFactory为true(默认为true),则调用客户端实例的start()方法,来正式启动该生产者的客户端。
接下里看客户端在之前为了获取实例就调用的构造方法。
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {this.clientConfig = clientConfig;this.instanceIndex = instanceIndex;this.nettyClientConfig = new NettyClientConfig();this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());this.clientRemotingProcessor = new ClientRemotingProcessor(this);this.mQClientAPIImpl =new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig.getUnitName());if (this.clientConfig.getNamesrvAddr() != null) {this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());}this.clientId = clientId;this.mQAdminImpl = new MQAdminImpl(this);this.pullMessageService = new PullMessageService(this);this.rebalanceService = new RebalanceService(this);this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);this.defaultMQProducer.resetClientConfig(clientConfig);this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}",//this.instanceIndex, //this.clientId, //this.clientConfig, //MQVersion.getVersionDesc(MQVersion.CurrentVersion), RemotingCommand.getSerializeTypeConfigInThisServer());
}
可以看到在一开始先将配置ClientConfig中的配置属性。接下来配置nettyClient的服务配置。
接下来完成了MQClientAPIImpl客户端API的配置创建,在MQClientAPIImpl的构造方法里完成了Netty远程客户端的创建,用以管理具体的网络访问。
同时在MQClientAPIImpl在构造方法中完成了TopAddressing的创建用来完成名称服务寻址功能的实现。
如果一开始就配置了名称服务的服务器地址,就需要通过MQClientAPIImpl的updateNameServerAddressList()方法来更新具体的名称服务的地址在客户端上。
public void updateNameServerAddressList(final String addrs) {List<String> lst = new ArrayList<String>();String[] addrArray = addrs.split(";");if (addrArray != null) {for (String addr : addrArray) {lst.add(addr);}this.remotingClient.updateNameServerAddressList(lst);}
}
将名称服务字符串解析为数组后调用了netty客户端的方法。
public void updateNameServerAddressList(List<String> addrs) {List<String> old = this.namesrvAddrList.get();boolean update = false;if (!addrs.isEmpty()) {if (null == old) {update = true;}else if (addrs.size() != old.size()) {update = true;}else {for (int i = 0; i < addrs.size() && !update; i++) {if (!old.contains(addrs.get(i))) {update = true;}}}if (update) {Collections.shuffle(addrs);this.namesrvAddrList.set(addrs);}}
}
在这里将名称服务的地址正式配置在客户端上面。
接下来MQClientInstance的构造方法的成员创建将在下面具体发挥作用的时候解释。
在构造方法完毕后,不久之后DefaultProducerImpl将会调用MQClientInstance的start()方法。
public void start() throws MQClientException {PackageConflictDetect.detectFastjson();synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}
}
这里的start()是非常让人激动的。
如果一开始就没有配置相应的名称服务服务器地址,这里将会通过MQClientAPIImpl的fetchNameServerAddr()方法主动寻址。
public String fetchNameServerAddr() {try {String addrs = this.topAddressing.fetchNSAddr();if (addrs != null) {if (!addrs.equals(this.nameSrvAddr)) {log.info("name server address changed, old: " + this.nameSrvAddr + " new: " + addrs);this.updateNameServerAddressList(addrs);this.nameSrvAddr = addrs;return nameSrvAddr;}}}catch (Exception e) {log.error("fetchNameServerAddr Exception", e);}return nameSrvAddr;
}
在topAdressing中通过fetchNSAddr()来在默认配置的
http://jmenv.tbsite.net:8080/rocketmq/nsaddr
地址来试图获取名称服务的服务器地址。
在尝试取址之后,将会调用DefaultProducerImpl的start()方法,在MQClientAPIImpl的start()方法中只是简单的调用了netty客户端的start()方法,也就是说在这里,生产者关于服务器的启动正式开始。
之后将会startSchedulTask()开启一些定时任务(依赖secheduledExecutorService实现)。
private void startScheduledTask() {if (null == this.clientConfig.getNamesrvAddr()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();}catch (Exception e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.updateTopicRouteInfoFromNameServer();}catch (Exception e) {log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);}}}, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.cleanOfflineBroker();MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();}catch (Exception e) {log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);}}}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.persistAllConsumerOffset();}catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);}}}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.adjustThreadPool();}catch (Exception e) {log.error("ScheduledTask adjustThreadPool exception", e);}}}, 1, 1, TimeUnit.MINUTES);
}
从上往下分别是
1.如果不存在名称服务器的地址 则会定时调用之前的寻址方法。
2.定时从名称服务更新生产者消费者路由信息
public void updateTopicRouteInfoFromNameServer() {Set<String> topicList = new HashSet<String>();// Consumer{Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {Set<SubscriptionData> subList = impl.subscriptions();if (subList != null) {for (SubscriptionData subData : subList) {topicList.add(subData.getTopic());}}}}}// Producer{Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {Set<String> lst = impl.getPublishTopicList();topicList.addAll(lst);}}}for (String topic : topicList) {this.updateTopicRouteInfoFromNameServer(topic);}
}
将所有的生产者消费者路由信息打包通过调用updateTopicRouteInfoFronnameServer()方法更新本地客户端的所有路由信息。
public boolean updateTopicRouteInfoFromNameServer(final String topic) {return updateTopicRouteInfoFromNameServer(topic, false, null);
}public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {try {if (this.lockNamesrv.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {try {TopicRouteData topicRouteData;if (isDefault && defaultMQProducer != null) {topicRouteData =this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);if (topicRouteData != null) {for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}}else {topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);}if (topicRouteData != null) {TopicRouteData old = this.topicRouteTable.get(topic);boolean changed = topicRouteDataIsChange(old, topicRouteData);if (!changed) {changed = this.isNeedUpdateTopicRouteInfo(topic);}else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}if (changed) {TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info{TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info{Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put TopicRouteData[{}]", cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}}else {log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);}}catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}}finally {this.lockNamesrv.unlock();}}else {log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LockTimeoutMillis);}}catch (InterruptedException e) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}return false;
}
在第一步先通过MQClientAPIImpl来封装发送更新路由信息请求给名称服务器来取得最新的路由信息,得到新的信息将会与现有的信息进行比较,如果发生了改变,依次更新Broker,生产消费者的路由信息。
3.定期清除已经离线的Broker服务器(在从名称服务获取的路由信息中该Broker的地址已经不存在),以及向所有仍在线的Broker发送心跳信息。
4.定期持久化各消费者队列消费进度。
5.定期根据消费者数量调整线程池大小。
在这之后rocketMQ生产者的客户端正式启动,但在结束之前将会试图给所有Broker发送心跳信息。
Producer的启动宣告结束。