采坑笔记
笔记
RoekctMq 踩坑
同一个进程消费不同mq集群, 出现问题
// DefaultMQPushConsumerImpl.class public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: /** ignore */ this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); /** ignore */ } }
// MQClientManager.class public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } return instance; }
拿取的 clientId 没有特殊配置, 因此拿的是上一个注册的
需要特别配置instanceName
// ClientConfig.java public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); sb.append("@"); sb.append(this.getInstanceName()); if (!UtilAll.isBlank(this.unitName)) { sb.append("@"); sb.append(this.unitName); } return sb.toString(); } public String getInstanceName() { return instanceName; } public void setInstanceName(String instanceName) { this.instanceName = instanceName; } public void changeInstanceNameToPID() { if (this.instanceName.equals("DEFAULT")) { this.instanceName = String.valueOf(UtilAll.getPid()); } }