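Over the past six months, every service at our company has been going through a multi-region active-active retrofit. Because the services are at different stages of that work, some temporary transitional solutions are needed.
For example, while retrofitting one service over the last few days, I needed a single service to send messages to different RocketMQ clusters, or to consume messages from different RocketMQ clusters.
Taking sending as the example, I configured two producer instances. The sketch looks like this: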

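```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;

// environment: the application's configuration source (e.g. Spring's Environment)

// Producer for the first cluster
DefaultMQProducer producer1 = new DefaultMQProducer();
producer1.setNamesrvAddr(environment.getProperty("rocketmq.nameServerAddr1"));
producer1.setProducerGroup(environment.getProperty("rocketmq.producerGroupName1"));
producer1.start();

// Producer for the second cluster; note that neither producer sets an instanceName
DefaultMQProducer producer2 = new DefaultMQProducer();
producer2.setNamesrvAddr(environment.getProperty("rocketmq.nameServerAddr2"));
producer2.setProducerGroup(environment.getProperty("rocketmq.producerGroupName2"));
producer2.start();
```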

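During testing I found a problem: messages sent through producer2 also ended up in the first RocketMQ cluster. So I followed the source code to find the cause.
The startup logic is the start method in DefaultMQProducerImpl.java: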

```java
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            // Any user-defined group (i.e. not the internal one) triggers the PID rename
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            // Look up, or create, the MQClientInstance keyed by this producer's clientId
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                // What actually gets started is the (possibly shared) MQClientInstance
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "//
                + this.serviceState//
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
```

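serviceState defaults to CREATE_JUST. Because we set our own producerGroup, which differs from MixAll.CLIENT_INNER_PRODUCER_GROUP, execution reaches defaultMQProducer.changeInstanceNameToPID():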

```java
public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = String.valueOf(UtilAll.getPid());
    }
}
```

If the user has not set instanceName, it keeps its initial value DEFAULT. changeInstanceNameToPID() then replaces it with the current process ID.
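The following minimal Java model isolates just this rename step to show its consequence. ClientConfigModel is a hypothetical name, not RocketMQ's class, and ProcessHandle.current().pid() (Java 9+) stands in for UtilAll.getPid(). It shows that every client in the same JVM that was left at DEFAULT ends up with the same instanceName.

```java
// Hypothetical, simplified model of the ClientConfig rename logic shown above;
// it is not RocketMQ's actual class.
class ClientConfigModel {
    // Starts from the literal "DEFAULT", as when no name is configured
    String instanceName = "DEFAULT";

    // Mirrors changeInstanceNameToPID(): only an untouched "DEFAULT" is replaced,
    // and it is replaced by the PID of the current JVM.
    void changeInstanceNameToPID() {
        if ("DEFAULT".equals(instanceName)) {
            instanceName = String.valueOf(ProcessHandle.current().pid());
        }
    }
}
```

Two models that are left at DEFAULT in one JVM both end up with the same PID string. A model given instanceName = "cluster2" before the call keeps that name.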

```java
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
```

This line obtains an MQClientInstance, and that instance is what ultimately gets started (mQClientFactory.start()). Let's focus on getAndCreateMQClientInstance:

```java
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    return instance;
}
```

It first builds a clientId, a value that shows up frequently in the logs.

```java
public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());

    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }

    return sb.toString();
}
```

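Without a custom instanceName, buildMQClientId yields a clientId of the form ip@pid. With a custom instanceName it yields ip@instanceName, followed by @unitName if a unitName is set. So if the user does not set instanceName, different producers in the same process get the same clientId. What problem does that cause? Let's keep going.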
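A hypothetical standalone version of the same string-building logic shows the collision directly. Two producers on the same host and in the same process produce identical clientIds unless one of them has its own instanceName. The class name and the IP and PID values below are made up for illustration.

```java
// Hypothetical re-implementation of the buildMQClientId() format: ip@instanceName[@unitName]
class ClientIdModel {
    static String buildClientId(String clientIp, String instanceName, String unitName) {
        StringBuilder sb = new StringBuilder();
        sb.append(clientIp).append('@').append(instanceName);
        // Skip a null or blank unitName, roughly like UtilAll.isBlank
        if (unitName != null && !unitName.trim().isEmpty()) {
            sb.append('@').append(unitName);
        }
        return sb.toString();
    }
}
```

Assume IP 10.0.0.5 and PID 12345. Producer1 and producer2 both yield 10.0.0.5@12345. Setting instanceName "cluster2" on producer2 yields 10.0.0.5@cluster2 instead.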

```java
MQClientInstance instance = this.factoryTable.get(clientId);
```

This looks up an MQClientInstance in factoryTable by clientId. factoryTable is a map whose key is the clientId and whose value is the MQClientInstance. It is declared as follows:

```java
private ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable =
    new ConcurrentHashMap<String, MQClientInstance>();
```

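When producer1 defined above calls start, factoryTable is still empty, so a new MQClientInstance is created and put into factoryTable. That instance is built from cloneClientConfig() of producer1's config, so it carries producer1's nameSrv address.

In the sample code at the top, producer2 starts next. Neither producer set an instanceName, so the analysis above shows that both have the same clientId. producer2 therefore does not create a new MQClientInstance. It gets the existing one from factoryTable instead, and producer1 and producer2 end up sharing one MQClientInstance. producer2 did set its own nameSrv, but that setting never takes effect.

Back in DefaultMQProducerImpl#start: after obtaining the MQClientInstance mQClientFactory, it registers the producerGroup and the producer instance with mQClientFactory: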
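A small hypothetical model of factoryTable reproduces the sharing. ClientManagerModel and ClientInstanceModel are invented names. The instance model keeps only the nameserver address it was created with, standing in for the cloned ClientConfig. The get-then-putIfAbsent pattern is the one getAndCreateMQClientInstance uses.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for MQClientInstance: it only remembers the nameserver
// address taken from the config of whichever client created it.
class ClientInstanceModel {
    final String namesrvAddr;

    ClientInstanceModel(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }
}

// Hypothetical stand-in for MQClientManager and its factoryTable.
class ClientManagerModel {
    private final ConcurrentMap<String, ClientInstanceModel> factoryTable = new ConcurrentHashMap<>();

    // Same get-then-putIfAbsent pattern as getAndCreateMQClientInstance: the first
    // caller for a clientId decides which nameserver the shared instance uses.
    ClientInstanceModel getAndCreate(String clientId, String namesrvAddr) {
        ClientInstanceModel instance = factoryTable.get(clientId);
        if (instance == null) {
            instance = new ClientInstanceModel(namesrvAddr);
            ClientInstanceModel prev = factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev; // another thread registered this clientId first
            }
        }
        return instance;
    }
}
```

First call getAndCreate("10.0.0.5@12345", "ns1:9876"), then call getAndCreate("10.0.0.5@12345", "ns2:9876"). Both calls return the same object, and its namesrvAddr is still ns1:9876. A different clientId such as 10.0.0.5@cluster2 gets its own instance with ns2:9876.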

```java
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    if (null == group || null == producer) {
        return false;
    }

    MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
    if (prev != null) {
        log.warn("the producer group[{}] exist already.", group);
        return false;
    }

    return true;
}
```

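Here producerTable is a map whose key is the producerGroup and whose value is the MQProducerInner instance. The table belongs to the shared MQClientInstance. Our two producers presumably use different group names, given the separate producerGroupName1 and producerGroupName2 properties. If so, both registrations succeed and start() raises no error, and the problem only shows up when messages arrive in the wrong cluster.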
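The registration step can be modelled the same way. In this hypothetical sketch a plain String stands in for MQProducerInner. It shows that distinct groups register without complaint on one shared instance. Only a repeated group is rejected, which is the case where start throws "has been created before".

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of producerTable on one MQClientInstance;
// a String producer name stands in for MQProducerInner.
class ProducerRegistryModel {
    private final ConcurrentMap<String, String> producerTable = new ConcurrentHashMap<>();

    // Mirrors registerProducer(): false on null arguments or an already-used group
    boolean registerProducer(String group, String producer) {
        if (group == null || producer == null) {
            return false;
        }
        String prev = producerTable.putIfAbsent(group, producer);
        return prev == null;
    }
}
```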

Conclusion

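This analysis is enough to resolve the problem raised at the beginning of the article. The conclusion:
If one application needs to send messages to different RocketMQ clusters, or consume messages from different RocketMQ clusters, each producer or consumer must get a distinct instanceName. Set it with setInstanceName before calling start(). Otherwise, messages will not be sent to the intended cluster, or messages from the intended cluster will not be consumed correctly.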
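One option that follows from this conclusion is to derive the instanceName from the target cluster. The helper below is hypothetical and not part of RocketMQ. Producers or consumers pointing at the same nameserver address share a name, and therefore an MQClientInstance, while different clusters get different names. The PID prefix is a design choice. changeInstanceNameToPID() only rewrites the literal DEFAULT, so a custom name is used as-is. Without the PID, two JVMs on the same host would report the same clientId.

```java
// Hypothetical helper, not a RocketMQ API: one instanceName per target cluster.
final class InstanceNames {
    private InstanceNames() {
    }

    // The PID prefix keeps names unique across JVMs on the same host, since a custom
    // instanceName is never replaced by the PID; the address keeps clusters apart.
    static String forCluster(String namesrvAddr) {
        return ProcessHandle.current().pid() + "#" + namesrvAddr;
    }
}
```

In the sample at the top, producer2 would call producer2.setInstanceName(InstanceNames.forCluster(environment.getProperty("rocketmq.nameServerAddr2"))) before producer2.start(). Producer1 and any consumers would do the same with their own addresses.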