rocketmq/patch014-backport-Queue-Selection-Strategy-Optimization.patch
2023-10-30 20:40:07 +08:00

2024 lines
99 KiB
Diff

From b028277018946868838a82a08211071bc231a175 Mon Sep 17 00:00:00 2001
From: Ji Juntao <juntao.jjt@alibaba-inc.com>
Date: Tue, 29 Aug 2023 16:13:38 +0800
Subject: [PATCH] [ISSUE #6567] [RIP-63] Queue Selection Strategy Optimization
(#6568)
Optimize the proxy's and client's selection strategy for brokers when sending messages, and use multiple selection strategies as a pipeline to filter suitable queues.
---
.../apache/rocketmq/client/ClientConfig.java | 54 +++++
.../client/common/ThreadLocalIndex.java | 8 +
.../rocketmq/client/impl/MQClientAPIImpl.java | 12 +-
.../client/impl/factory/MQClientInstance.java | 7 +
.../impl/producer/DefaultMQProducerImpl.java | 87 ++++++--
.../impl/producer/TopicPublishInfo.java | 40 ++++
.../client/latency/LatencyFaultTolerance.java | 66 +++++-
.../latency/LatencyFaultToleranceImpl.java | 189 ++++++++++++++----
.../client/latency/MQFaultStrategy.java | 155 ++++++++++----
.../rocketmq/client/latency/Resolver.java | 17 +-
.../client/latency/ServiceDetector.java | 30 +++
.../LatencyFaultToleranceImplTest.java | 36 +++-
.../processor/DefaultRequestProcessor.java | 24 ---
.../rocketmq/proxy/config/ProxyConfig.java | 46 +++++
.../grpc/v2/producer/SendMessageActivity.java | 2 +-
.../proxy/processor/ProducerProcessor.java | 18 +-
.../service/route/LocalTopicRouteService.java | 2 +-
.../service/route/MessageQueueSelector.java | 95 ++++++++-
.../proxy/service/route/MessageQueueView.java | 18 +-
.../service/route/TopicRouteService.java | 80 +++++++-
.../consumer/ReceiveMessageActivityTest.java | 5 +-
.../v2/producer/SendMessageActivityTest.java | 82 +++++++-
.../proxy/service/BaseServiceTest.java | 4 +-
.../route/MessageQueueSelectorTest.java | 8 +-
.../sysmessage/HeartbeatSyncerTest.java | 2 +-
.../ClusterTransactionServiceTest.java | 8 +-
26 files changed, 919 insertions(+), 176 deletions(-)
rename remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java => client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java (65%)
create mode 100644 client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index f87450f66..bb0fe3522 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -38,6 +38,8 @@ public class ClientConfig {
public static final String SOCKS_PROXY_CONFIG = "com.rocketmq.socks.proxy.config";
public static final String DECODE_READ_BODY = "com.rocketmq.read.body";
public static final String DECODE_DECOMPRESS_BODY = "com.rocketmq.decompress.body";
+ public static final String SEND_LATENCY_ENABLE = "com.rocketmq.sendLatencyEnable";
+ public static final String START_DETECTOR_ENABLE = "com.rocketmq.startDetectorEnable";
public static final String HEART_BEAT_V2 = "com.rocketmq.heartbeat.v2";
private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
private String clientIP = NetworkUtil.getLocalAddress();
@@ -72,6 +74,8 @@ public class ClientConfig {
private String socksProxyConfig = System.getProperty(SOCKS_PROXY_CONFIG, "{}");
private int mqClientApiTimeout = 3 * 1000;
+ private int detectTimeout = 200;
+ private int detectInterval = 2 * 1000;
private LanguageCode language = LanguageCode.JAVA;
@@ -81,6 +85,15 @@ public class ClientConfig {
*/
protected boolean enableStreamRequestType = false;
+ /**
+     * Enables the latency fault-tolerance mechanism of the client's sending process.
+     * Do NOT enable it when ordered messages are required:
+     * turning it on interferes with queue selection
+     * and may therefore conflict with message ordering.
+ */
+ private boolean sendLatencyEnable = Boolean.parseBoolean(System.getProperty(SEND_LATENCY_ENABLE, "false"));
+ private boolean startDetectorEnable = Boolean.parseBoolean(System.getProperty(START_DETECTOR_ENABLE, "false"));
+
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
@@ -186,6 +199,10 @@ public class ClientConfig {
this.decodeDecompressBody = cc.decodeDecompressBody;
this.enableStreamRequestType = cc.enableStreamRequestType;
this.useHeartbeatV2 = cc.useHeartbeatV2;
+ this.startDetectorEnable = cc.startDetectorEnable;
+ this.sendLatencyEnable = cc.sendLatencyEnable;
+ this.detectInterval = cc.detectInterval;
+ this.detectTimeout = cc.detectTimeout;
}
public ClientConfig cloneClientConfig() {
@@ -210,6 +227,10 @@ public class ClientConfig {
cc.decodeDecompressBody = decodeDecompressBody;
cc.enableStreamRequestType = enableStreamRequestType;
cc.useHeartbeatV2 = useHeartbeatV2;
+ cc.startDetectorEnable = startDetectorEnable;
+ cc.sendLatencyEnable = sendLatencyEnable;
+ cc.detectInterval = detectInterval;
+ cc.detectTimeout = detectTimeout;
return cc;
}
@@ -381,6 +402,38 @@ public class ClientConfig {
this.enableStreamRequestType = enableStreamRequestType;
}
+ public boolean isSendLatencyEnable() {
+ return sendLatencyEnable;
+ }
+
+ public void setSendLatencyEnable(boolean sendLatencyEnable) {
+ this.sendLatencyEnable = sendLatencyEnable;
+ }
+
+ public boolean isStartDetectorEnable() {
+ return startDetectorEnable;
+ }
+
+ public void setStartDetectorEnable(boolean startDetectorEnable) {
+ this.startDetectorEnable = startDetectorEnable;
+ }
+
+ public int getDetectTimeout() {
+ return this.detectTimeout;
+ }
+
+ public void setDetectTimeout(int detectTimeout) {
+ this.detectTimeout = detectTimeout;
+ }
+
+ public int getDetectInterval() {
+ return this.detectInterval;
+ }
+
+ public void setDetectInterval(int detectInterval) {
+ this.detectInterval = detectInterval;
+ }
+
public boolean isUseHeartbeatV2() {
return useHeartbeatV2;
}
@@ -403,6 +456,7 @@ public class ClientConfig {
+ ", socksProxyConfig=" + socksProxyConfig + ", language=" + language.name()
+ ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout
+ ", decodeReadBody=" + decodeReadBody + ", decodeDecompressBody=" + decodeDecompressBody
+ + ", sendLatencyEnable=" + sendLatencyEnable + ", startDetectorEnable=" + startDetectorEnable
+ ", enableStreamRequestType=" + enableStreamRequestType + ", useHeartbeatV2=" + useHeartbeatV2 + "]";
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
index 4a3d90135..3a086c13d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
@@ -33,6 +33,14 @@ public class ThreadLocalIndex {
return index & POSITIVE_MASK;
}
+    /**
+     * Stores a fresh random start value in threadLocalIndex. Assuming {@code random} is a
+     * java.util.Random, nextInt(bound) is already within [0, bound), so no abs()/clamp is needed.
+     */
+    public void reset() {
+        this.threadLocalIndex.set(random.nextInt(Integer.MAX_VALUE));
+    }
+
@Override
public String toString() {
return "ThreadLocalIndex{" +
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 213c26fd6..3201a493f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -666,7 +666,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
} catch (Throwable e) {
}
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true);
return;
}
@@ -684,14 +684,14 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
} catch (Throwable e) {
}
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true);
} catch (Exception e) {
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true);
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, e, context, false, producer);
}
} else {
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true);
if (!responseFuture.isSendRequestOK()) {
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
@@ -711,7 +711,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
});
} catch (Exception ex) {
long cost = System.currentTimeMillis() - beginStartTime;
- producer.updateFaultItem(brokerName, cost, true);
+ producer.updateFaultItem(brokerName, cost, true, false);
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
}
@@ -735,7 +735,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
if (needRetry && tmp <= timesTotal) {
String retryBrokerName = brokerName;//by default, it will send to the same broker
if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
- MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
+ MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName, false);
retryBrokerName = instance.getBrokerNameFromMessageQueue(mqChosen);
}
String addr = instance.findBrokerAddressInPublish(retryBrokerName);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 8851bc815..9484b26f8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -125,6 +126,12 @@ public class MQClientInstance {
private final Set<String/* Broker address */> brokerSupportV2HeartbeatSet = new HashSet();
private final ConcurrentMap<String, Integer> brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread"));
+ private final ScheduledExecutorService fetchRemoteConfigExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "MQClientFactoryFetchRemoteConfigScheduledThread");
+ }
+ });
private final PullMessageService pullMessageService;
private final RebalanceService rebalanceService;
private final DefaultMQProducer defaultMQProducer;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 3f4c6e5f7..bbbb17b07 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -33,6 +33,8 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Optional;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
@@ -49,6 +51,8 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.latency.MQFaultStrategy;
+import org.apache.rocketmq.client.latency.Resolver;
+import org.apache.rocketmq.client.latency.ServiceDetector;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
@@ -112,7 +116,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<>();
- private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
+ private MQFaultStrategy mqFaultStrategy;
private ExecutorService asyncSenderExecutor;
// compression related
@@ -153,8 +157,38 @@ public class DefaultMQProducerImpl implements MQProducerInner {
semaphoreAsyncSendSize = new Semaphore(1024 * 1024, true);
log.info("semaphoreAsyncSendSize can not be smaller than 1M.");
}
- }
+ ServiceDetector serviceDetector = new ServiceDetector() {
+ @Override
+ public boolean detect(String endpoint, long timeoutMillis) {
+ Optional<String> candidateTopic = pickTopic();
+ if (!candidateTopic.isPresent()) {
+ return false;
+ }
+ try {
+ MessageQueue mq = new MessageQueue(candidateTopic.get(), null, 0);
+ mQClientFactory.getMQClientAPIImpl()
+ .getMaxOffset(endpoint, mq, timeoutMillis);
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+ };
+
+ this.mqFaultStrategy = new MQFaultStrategy(defaultMQProducer.cloneClientConfig(), new Resolver() {
+ @Override
+ public String resolve(String name) {
+ return DefaultMQProducerImpl.this.mQClientFactory.findBrokerAddressInPublish(name);
+ }
+ }, serviceDetector);
+ }
+    private Optional<String> pickTopic() {
+        // Probe with a single iterator: a separate isEmpty() check followed by iterator().next()
+        // could throw NoSuchElementException if the table is emptied between the two calls.
+        final java.util.Iterator<String> topics = topicPublishInfoTable.keySet().iterator();
+        return topics.hasNext() ? Optional.of(topics.next()) : Optional.<String>absent();
+    }
public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) {
this.checkForbiddenHookList.add(checkForbiddenHook);
log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(),
@@ -229,6 +263,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
mQClientFactory.start();
}
+ if (this.mqFaultStrategy.isStartDetectorEnable()) {
+ this.mqFaultStrategy.startDetector();
+ }
+
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
@@ -273,6 +311,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (shutdownFactory) {
this.mQClientFactory.shutdown();
}
+ if (this.mqFaultStrategy.isStartDetectorEnable()) {
+ this.mqFaultStrategy.shutdown();
+ }
RequestFutureHolder.getInstance().shutdown(this);
log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
@@ -574,7 +615,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg,
- final long timeout) throws MQClientException, RemotingTooMuchRequestException {
+ final long timeout) throws MQClientException, RemotingTooMuchRequestException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
@@ -584,7 +625,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
MessageQueue mq = null;
try {
List<MessageQueue> messageQueueList =
- mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
+ mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic);
@@ -609,12 +650,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
- public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
- return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
+ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
+ return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName, resetIndex);
}
- public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
- this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
+ public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation,
+ boolean reachable) {
+ this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable);
}
private void validateNameServerSetting() throws MQClientException {
@@ -647,9 +689,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
+ boolean resetIndex = false;
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
- MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
+ if (times > 0) {
+ resetIndex = true;
+ }
+ MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
@@ -667,7 +713,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
+ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
switch (communicationMode) {
case ASYNC:
return null;
@@ -684,9 +730,22 @@ public class DefaultMQProducerImpl implements MQProducerInner {
default:
break;
}
- } catch (RemotingException | MQClientException e) {
+ } catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
+ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
+ log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
+ log.warn(msg.toString());
+ exception = e;
+ continue;
+ } catch (RemotingException e) {
+ endTimestamp = System.currentTimeMillis();
+ if (this.mqFaultStrategy.isStartDetectorEnable()) {
+                        // On RemotingException, mark this broker unreachable if the scheduled detector is running.
+ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false);
+ } else {
+ // Otherwise, isolate this broker.
+ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, true);
+ }
log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
if (log.isDebugEnabled()) {
log.debug(msg.toString());
@@ -695,7 +754,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
+ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false);
log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
if (log.isDebugEnabled()) {
log.debug(msg.toString());
@@ -712,7 +771,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
+ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
log.warn("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
if (log.isDebugEnabled()) {
log.debug(msg.toString());
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 275ada7ac..37b1f3252 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -18,6 +18,8 @@ package org.apache.rocketmq.client.impl.producer;
import java.util.ArrayList;
import java.util.List;
+
+import com.google.common.base.Preconditions;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
@@ -30,6 +32,10 @@ public class TopicPublishInfo {
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
+ public interface QueueFilter {
+ boolean filter(MessageQueue mq);
+ }
+
public boolean isOrderTopic() {
return orderTopic;
}
@@ -66,6 +72,40 @@ public class TopicPublishInfo {
this.haveTopicRouterInfo = haveTopicRouterInfo;
}
+ public MessageQueue selectOneMessageQueue(QueueFilter ...filter) {
+ return selectOneMessageQueue(this.messageQueueList, this.sendWhichQueue, filter);
+ }
+
+ private MessageQueue selectOneMessageQueue(List<MessageQueue> messageQueueList, ThreadLocalIndex sendQueue, QueueFilter ...filter) {
+ if (messageQueueList == null || messageQueueList.isEmpty()) {
+ return null;
+ }
+
+ if (filter != null && filter.length != 0) {
+ for (int i = 0; i < messageQueueList.size(); i++) {
+ int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
+ MessageQueue mq = messageQueueList.get(index);
+ boolean filterResult = true;
+ for (QueueFilter f: filter) {
+ Preconditions.checkNotNull(f);
+ filterResult &= f.filter(mq);
+ }
+ if (filterResult) {
+ return mq;
+ }
+ }
+
+ return null;
+ }
+
+ int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
+ return messageQueueList.get(index);
+ }
+
+ public void resetIndex() {
+ this.sendWhichQueue.reset();
+ }
+
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
index 09a8aa461..72d2f3450 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
@@ -18,11 +18,75 @@
package org.apache.rocketmq.client.latency;
public interface LatencyFaultTolerance<T> {
- void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
+ /**
+ * Update brokers' states, to decide if they are good or not.
+ *
+ * @param name Broker's name.
+ * @param currentLatency Current message sending process's latency.
+     * @param notAvailableDuration How long, in milliseconds, the broker is to be treated as not available,
+     * counted from the moment of this update.
+     * @param reachable Whether this broker is reachable.
+ */
+ void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration,
+ final boolean reachable);
+ /**
+ * To check if this broker is available.
+ *
+ * @param name Broker's name.
+ * @return boolean variable, if this is true, then the broker is available.
+ */
boolean isAvailable(final T name);
+ /**
+ * To check if this broker is reachable.
+ *
+ * @param name Broker's name.
+ * @return boolean variable, if this is true, then the broker is reachable.
+ */
+ boolean isReachable(final T name);
+
+ /**
+ * Remove the broker in this fault item table.
+ *
+ * @param name broker's name.
+ */
void remove(final T name);
+ /**
+     * Fallback for the worst case, when no broker is available: pick one at random.
+     *
+     * @return The name of a randomly picked broker.
+ */
T pickOneAtLeast();
+
+ /**
+     * Starts a background thread that detects whether brokers are reachable.
+ */
+ void startDetector();
+
+ /**
+ * Shutdown threads that started by LatencyFaultTolerance.
+ */
+ void shutdown();
+
+ /**
+     * Reserved hook: runs a single detection round without creating a new thread.
+ */
+ void detectByOneRound();
+
+ /**
+ * Use it to set the detect timeout bound.
+ *
+ * @param detectTimeout timeout bound
+ */
+ void setDetectTimeout(final int detectTimeout);
+
+ /**
+     * Sets the detection interval applied to each broker (each broker is probed at most once per
+     * interval).
+     *
+     * @param detectInterval per-broker detection interval, in milliseconds
+ */
+ void setDetectInterval(final int detectInterval);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
index 93795d957..8af629574 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
@@ -21,30 +21,97 @@ import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
- private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>(16);
+    private static final Logger log = LoggerFactory.getLogger(LatencyFaultToleranceImpl.class); // name the logger after this class, not MQFaultStrategy
+ private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
+ private int detectTimeout = 200;
+ private int detectInterval = 2000;
+ private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "LatencyFaultToleranceScheduledThread");
+ }
+ });
- private final ThreadLocalIndex randomItem = new ThreadLocalIndex();
+ private final Resolver resolver;
+
+ private final ServiceDetector serviceDetector;
+
+ public LatencyFaultToleranceImpl(Resolver resolver, ServiceDetector serviceDetector) {
+ this.resolver = resolver;
+ this.serviceDetector = serviceDetector;
+ }
+
+ public void detectByOneRound() {
+ for (Map.Entry<String, FaultItem> item : this.faultItemTable.entrySet()) {
+ FaultItem brokerItem = item.getValue();
+ if (System.currentTimeMillis() - brokerItem.checkStamp >= 0) {
+ brokerItem.checkStamp = System.currentTimeMillis() + this.detectInterval;
+ String brokerAddr = resolver.resolve(brokerItem.getName());
+ if (brokerAddr == null) {
+ faultItemTable.remove(item.getKey());
+ continue;
+ }
+ if (null == serviceDetector) {
+ continue;
+ }
+ boolean serviceOK = serviceDetector.detect(brokerAddr, detectTimeout);
+ if (serviceOK && !brokerItem.reachableFlag) {
+ log.info(brokerItem.name + " is reachable now, then it can be used.");
+ brokerItem.reachableFlag = true;
+ }
+ }
+ }
+ }
+
+ public void startDetector() {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ detectByOneRound();
+ } catch (Exception e) {
+ log.warn("Unexpected exception raised while detecting service reachability", e);
+ }
+ }
+ }, 3, 3, TimeUnit.SECONDS);
+ }
+
+ public void shutdown() {
+ this.scheduledExecutorService.shutdown();
+ }
@Override
- public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
+ public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration,
+ final boolean reachable) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
- faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
-
+ faultItem.updateNotAvailableDuration(notAvailableDuration);
+ faultItem.setReachable(reachable);
old = this.faultItemTable.putIfAbsent(name, faultItem);
- if (old != null) {
- old.setCurrentLatency(currentLatency);
- old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
- }
- } else {
+ }
+
+ if (null != old) {
old.setCurrentLatency(currentLatency);
- old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
+ old.updateNotAvailableDuration(notAvailableDuration);
+ old.setReachable(reachable);
+ }
+
+ if (!reachable) {
+ log.info(name + " is unreachable, it will not be used until it's reachable");
}
}
@@ -57,6 +124,14 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
return true;
}
+ public boolean isReachable(final String name) {
+ final FaultItem faultItem = this.faultItemTable.get(name);
+ if (faultItem != null) {
+ return faultItem.isReachable();
+ }
+ return true;
+ }
+
@Override
public void remove(final String name) {
this.faultItemTable.remove(name);
@@ -65,68 +140,98 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
@Override
public String pickOneAtLeast() {
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
- List<FaultItem> tmpList = new LinkedList<>();
+ List<FaultItem> tmpList = new LinkedList<FaultItem>();
while (elements.hasMoreElements()) {
final FaultItem faultItem = elements.nextElement();
tmpList.add(faultItem);
}
+
if (!tmpList.isEmpty()) {
- Collections.sort(tmpList);
- final int half = tmpList.size() / 2;
- if (half <= 0) {
- return tmpList.get(0).getName();
- } else {
- final int i = this.randomItem.incrementAndGet() % half;
- return tmpList.get(i).getName();
+ Collections.shuffle(tmpList);
+ for (FaultItem faultItem : tmpList) {
+ if (faultItem.reachableFlag) {
+ return faultItem.name;
+ }
}
}
+
return null;
}
@Override
public String toString() {
return "LatencyFaultToleranceImpl{" +
- "faultItemTable=" + faultItemTable +
- ", whichItemWorst=" + randomItem +
- '}';
+ "faultItemTable=" + faultItemTable +
+ ", whichItemWorst=" + whichItemWorst +
+ '}';
+ }
+
+ public void setDetectTimeout(final int detectTimeout) {
+ this.detectTimeout = detectTimeout;
}
- class FaultItem implements Comparable<FaultItem> {
+ public void setDetectInterval(final int detectInterval) {
+ this.detectInterval = detectInterval;
+ }
+
+ public class FaultItem implements Comparable<FaultItem> {
private final String name;
private volatile long currentLatency;
private volatile long startTimestamp;
+ private volatile long checkStamp;
+ private volatile boolean reachableFlag;
public FaultItem(final String name) {
this.name = name;
}
+ public void updateNotAvailableDuration(long notAvailableDuration) {
+ if (notAvailableDuration > 0 && System.currentTimeMillis() + notAvailableDuration > this.startTimestamp) {
+ this.startTimestamp = System.currentTimeMillis() + notAvailableDuration;
+ log.info(name + " will be isolated for " + notAvailableDuration + " ms.");
+ }
+ }
+
@Override
public int compareTo(final FaultItem other) {
if (this.isAvailable() != other.isAvailable()) {
- if (this.isAvailable())
+ if (this.isAvailable()) {
return -1;
+ }
- if (other.isAvailable())
+ if (other.isAvailable()) {
return 1;
+ }
}
- if (this.currentLatency < other.currentLatency)
+ if (this.currentLatency < other.currentLatency) {
return -1;
- else if (this.currentLatency > other.currentLatency) {
+ } else if (this.currentLatency > other.currentLatency) {
return 1;
}
- if (this.startTimestamp < other.startTimestamp)
+ if (this.startTimestamp < other.startTimestamp) {
return -1;
- else if (this.startTimestamp > other.startTimestamp) {
+ } else if (this.startTimestamp > other.startTimestamp) {
return 1;
}
-
return 0;
}
+ public void setReachable(boolean reachableFlag) {
+ this.reachableFlag = reachableFlag;
+ }
+
+ public void setCheckStamp(long checkStamp) {
+ this.checkStamp = checkStamp;
+ }
+
public boolean isAvailable() {
- return (System.currentTimeMillis() - startTimestamp) >= 0;
+ return reachableFlag && System.currentTimeMillis() >= startTimestamp;
+ }
+
+ public boolean isReachable() {
+ return reachableFlag;
}
@Override
@@ -139,28 +244,32 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
@Override
public boolean equals(final Object o) {
- if (this == o)
+ if (this == o) {
return true;
- if (!(o instanceof FaultItem))
+ }
+ if (!(o instanceof FaultItem)) {
return false;
+ }
final FaultItem faultItem = (FaultItem) o;
- if (getCurrentLatency() != faultItem.getCurrentLatency())
+ if (getCurrentLatency() != faultItem.getCurrentLatency()) {
return false;
- if (getStartTimestamp() != faultItem.getStartTimestamp())
+ }
+ if (getStartTimestamp() != faultItem.getStartTimestamp()) {
return false;
+ }
return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;
-
}
@Override
public String toString() {
return "FaultItem{" +
- "name='" + name + '\'' +
- ", currentLatency=" + currentLatency +
- ", startTimestamp=" + startTimestamp +
- '}';
+ "name='" + name + '\'' +
+ ", currentLatency=" + currentLatency +
+ ", startTimestamp=" + startTimestamp +
+ ", reachableFlag=" + reachableFlag +
+ '}';
}
public String getName() {
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
index 1e1953fad..c01490784 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
@@ -17,25 +17,86 @@
package org.apache.rocketmq.client.latency;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo.QueueFilter;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public class MQFaultStrategy {
- private final static Logger log = LoggerFactory.getLogger(MQFaultStrategy.class);
- private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
+ private LatencyFaultTolerance<String> latencyFaultTolerance;
+ private boolean sendLatencyFaultEnable;
+ private boolean startDetectorEnable;
+ private long[] latencyMax = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L};
+ private long[] notAvailableDuration = {0L, 0L, 2000L, 5000L, 6000L, 10000L, 30000L};
- private boolean sendLatencyFaultEnable = false;
+ public static class BrokerFilter implements QueueFilter {
+ private String lastBrokerName;
+
+ public void setLastBrokerName(String lastBrokerName) {
+ this.lastBrokerName = lastBrokerName;
+ }
+
+ @Override public boolean filter(MessageQueue mq) {
+ if (lastBrokerName != null) {
+ return !mq.getBrokerName().equals(lastBrokerName);
+ }
+ return true;
+ }
+ }
+
+ private ThreadLocal<BrokerFilter> threadBrokerFilter = new ThreadLocal<BrokerFilter>() {
+ @Override protected BrokerFilter initialValue() {
+ return new BrokerFilter();
+ }
+ };
+
+ private QueueFilter reachableFilter = new QueueFilter() {
+ @Override public boolean filter(MessageQueue mq) {
+ return latencyFaultTolerance.isReachable(mq.getBrokerName());
+ }
+ };
+
+ private QueueFilter availableFilter = new QueueFilter() {
+ @Override public boolean filter(MessageQueue mq) {
+ return latencyFaultTolerance.isAvailable(mq.getBrokerName());
+ }
+ };
+
+
+ public MQFaultStrategy(ClientConfig cc, Resolver fetcher, ServiceDetector serviceDetector) {
+ this.setStartDetectorEnable(cc.isStartDetectorEnable());
+ this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
+ this.latencyFaultTolerance = new LatencyFaultToleranceImpl(fetcher, serviceDetector);
+ this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval());
+ this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout());
+ }
+
+ // For unit test.
+ public MQFaultStrategy(ClientConfig cc, LatencyFaultTolerance<String> tolerance) {
+ this.setStartDetectorEnable(cc.isStartDetectorEnable());
+ this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
+ this.latencyFaultTolerance = tolerance;
+ this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval());
+ this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout());
+ }
- private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
- private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
public long[] getNotAvailableDuration() {
return notAvailableDuration;
}
+ public QueueFilter getAvailableFilter() {
+ return availableFilter;
+ }
+
+ public QueueFilter getReachableFilter() {
+ return reachableFilter;
+ }
+
+ public ThreadLocal<BrokerFilter> getThreadBrokerFilter() {
+ return threadBrokerFilter;
+ }
+
public void setNotAvailableDuration(final long[] notAvailableDuration) {
this.notAvailableDuration = notAvailableDuration;
}
@@ -56,51 +117,69 @@ public class MQFaultStrategy {
this.sendLatencyFaultEnable = sendLatencyFaultEnable;
}
- public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
+ public boolean isStartDetectorEnable() {
+ return startDetectorEnable;
+ }
+
+ public void setStartDetectorEnable(boolean startDetectorEnable) {
+ this.startDetectorEnable = startDetectorEnable;
+ }
+
+ public void startDetector() {
+ // Start the detector only when the user has enabled both latency fault tolerance
+ // and the detector itself; the detector thread is expected not to be running yet.
+ if (this.sendLatencyFaultEnable && this.startDetectorEnable) {
+ // start the detector.
+ this.latencyFaultTolerance.startDetector();
+ }
+ }
+
+ public void shutdown() {
+ if (this.sendLatencyFaultEnable && this.startDetectorEnable) {
+ this.latencyFaultTolerance.shutdown();
+ }
+ }
+
+ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
+ BrokerFilter brokerFilter = threadBrokerFilter.get();
+ brokerFilter.setLastBrokerName(lastBrokerName);
if (this.sendLatencyFaultEnable) {
- try {
- int index = tpInfo.getSendWhichQueue().incrementAndGet();
- for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
- int pos = index++ % tpInfo.getMessageQueueList().size();
- MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
- if (!StringUtils.equals(lastBrokerName, mq.getBrokerName()) && latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
- return mq;
- }
- }
-
- final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
- int writeQueueNums = tpInfo.getWriteQueueNumsByBroker(notBestBroker);
- if (writeQueueNums > 0) {
- final MessageQueue mq = tpInfo.selectOneMessageQueue();
- if (notBestBroker != null) {
- mq.setBrokerName(notBestBroker);
- mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
- }
- return mq;
- } else {
- latencyFaultTolerance.remove(notBestBroker);
- }
- } catch (Exception e) {
- log.error("Error occurred when selecting message queue", e);
+ if (resetIndex) {
+ tpInfo.resetIndex();
+ }
+ MessageQueue mq = tpInfo.selectOneMessageQueue(availableFilter, brokerFilter);
+ if (mq != null) {
+ return mq;
+ }
+
+ mq = tpInfo.selectOneMessageQueue(reachableFilter, brokerFilter);
+ if (mq != null) {
+ return mq;
}
return tpInfo.selectOneMessageQueue();
}
- return tpInfo.selectOneMessageQueue(lastBrokerName);
+ MessageQueue mq = tpInfo.selectOneMessageQueue(brokerFilter);
+ if (mq != null) {
+ return mq;
+ }
+ return tpInfo.selectOneMessageQueue();
}
- public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
+ public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation,
+ final boolean reachable) {
if (this.sendLatencyFaultEnable) {
- long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
- this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
+ long duration = computeNotAvailableDuration(isolation ? 10000 : currentLatency);
+ this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration, reachable);
}
}
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
- if (currentLatency >= latencyMax[i])
+ if (currentLatency >= latencyMax[i]) {
return this.notAvailableDuration[i];
+ }
}
return 0;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java b/client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java
similarity index 65%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java
rename to client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java
index 6aa547047..1c29ba334 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java
@@ -14,20 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.protocol.body;
+package org.apache.rocketmq.client.latency;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+public interface Resolver {
-public class GetRemoteClientConfigBody extends RemotingSerializable {
- private List<String> keys = new ArrayList<>();
-
- public List<String> getKeys() {
- return keys;
- }
-
- public void setKeys(List<String> keys) {
- this.keys = keys;
- }
+ String resolve(String name);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java b/client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java
new file mode 100644
index 000000000..c6ffbad1c
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.latency;
+
+/**
+ * Detect whether the remote service state is normal.
+ */
+public interface ServiceDetector {
+
+ /**
+ * Check if the remote service is normal.
+ * @param endpoint Service endpoint to check against
+ * @return true if the service is back to normal; false otherwise.
+ */
+ boolean detect(String endpoint, long timeoutMillis);
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java b/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java
index 86690e40b..42ccdae5a 100644
--- a/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java
@@ -16,11 +16,14 @@
*/
package org.apache.rocketmq.client.latency;
-import java.util.concurrent.TimeUnit;
+import org.awaitility.core.ThrowingRunnable;
import org.junit.Before;
import org.junit.Test;
+import java.util.concurrent.TimeUnit;
+
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
public class LatencyFaultToleranceImplTest {
private LatencyFaultTolerance<String> latencyFaultTolerance;
@@ -29,28 +32,31 @@ public class LatencyFaultToleranceImplTest {
@Before
public void init() {
- latencyFaultTolerance = new LatencyFaultToleranceImpl();
+ latencyFaultTolerance = new LatencyFaultToleranceImpl(null, null);
}
@Test
public void testUpdateFaultItem() throws Exception {
- latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000);
+ latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000, true);
assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse();
assertThat(latencyFaultTolerance.isAvailable(anotherBrokerName)).isTrue();
}
@Test
public void testIsAvailable() throws Exception {
- latencyFaultTolerance.updateFaultItem(brokerName, 3000, 50);
+ latencyFaultTolerance.updateFaultItem(brokerName, 3000, 50, true);
assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse();
- TimeUnit.MILLISECONDS.sleep(70);
- assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue();
+ await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(new ThrowingRunnable() {
+ @Override public void run() throws Throwable {
+ assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue();
+ }
+ });
}
@Test
public void testRemove() throws Exception {
- latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000);
+ latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000, true);
assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse();
latencyFaultTolerance.remove(brokerName);
assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue();
@@ -58,10 +64,20 @@ public class LatencyFaultToleranceImplTest {
@Test
public void testPickOneAtLeast() throws Exception {
- latencyFaultTolerance.updateFaultItem(brokerName, 1000, 3000);
+ latencyFaultTolerance.updateFaultItem(brokerName, 1000, 3000, true);
assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName);
- latencyFaultTolerance.updateFaultItem(anotherBrokerName, 1001, 3000);
- assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName);
+ // Disabled: pickOneAtLeast now shuffles the candidates, so the picked broker is no longer deterministic.
+ // latencyFaultTolerance.updateFaultItem(anotherBrokerName, 1001, 3000, "127.0.0.1:12011", true);
+ // assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName);
+ }
+
+ @Test
+ public void testIsReachable() throws Exception {
+ latencyFaultTolerance.updateFaultItem(brokerName, 1000, 3000, true);
+ assertThat(latencyFaultTolerance.isReachable(brokerName)).isEqualTo(true);
+
+ latencyFaultTolerance.updateFaultItem(anotherBrokerName, 1001, 3000, false);
+ assertThat(latencyFaultTolerance.isReachable(anotherBrokerName)).isEqualTo(false);
}
}
\ No newline at end of file
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index fada0efd7..485b95c42 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -41,7 +41,6 @@ import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.GetBrokerMemberGroupResponseBody;
-import org.apache.rocketmq.remoting.protocol.body.GetRemoteClientConfigBody;
import org.apache.rocketmq.remoting.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
@@ -132,8 +131,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
- case RequestCode.GET_CLIENT_CONFIG:
- return this.getClientConfigs(ctx, request);
default:
String error = " request type " + request.getCode() + " not supported";
return RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
@@ -661,25 +658,4 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
- private RemotingCommand getClientConfigs(ChannelHandlerContext ctx, RemotingCommand request) {
- final RemotingCommand response = RemotingCommand.createResponseCommand(null);
- final GetRemoteClientConfigBody body = GetRemoteClientConfigBody.decode(request.getBody(), GetRemoteClientConfigBody.class);
-
- String content = this.namesrvController.getConfiguration().getClientConfigsFormatString(body.getKeys());
- if (StringUtils.isNotBlank(content)) {
- try {
- response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
- } catch (UnsupportedEncodingException e) {
- log.error("getConfig error, ", e);
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("UnsupportedEncodingException " + e);
- return response;
- }
- }
-
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
- return response;
- }
-
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 2994893d7..b2478fec3 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -232,6 +232,12 @@ public class ProxyConfig implements ConfigFile {
private String remotingAccessAddr = "";
private int remotingListenPort = 8080;
+ // related to proxy's send strategy in cluster mode.
+ private boolean sendLatencyEnable = false;
+ private boolean startDetectorEnable = false;
+ private int detectTimeout = 200;
+ private int detectInterval = 2 * 1000;
+
private int remotingHeartbeatThreadPoolNums = 2 * PROCESSOR_NUMBER;
private int remotingTopicRouteThreadPoolNums = 2 * PROCESSOR_NUMBER;
private int remotingSendMessageThreadPoolNums = 4 * PROCESSOR_NUMBER;
@@ -1409,6 +1415,46 @@ public class ProxyConfig implements ConfigFile {
this.remotingWaitTimeMillsInDefaultQueue = remotingWaitTimeMillsInDefaultQueue;
}
+ public boolean isSendLatencyEnable() {
+ return sendLatencyEnable;
+ }
+
+ public boolean isStartDetectorEnable() {
+ return startDetectorEnable;
+ }
+
+ public void setStartDetectorEnable(boolean startDetectorEnable) {
+ this.startDetectorEnable = startDetectorEnable;
+ }
+
+ public void setSendLatencyEnable(boolean sendLatencyEnable) {
+ this.sendLatencyEnable = sendLatencyEnable;
+ }
+
+ public boolean getStartDetectorEnable() {
+ return this.startDetectorEnable;
+ }
+
+ public boolean getSendLatencyEnable() {
+ return this.sendLatencyEnable;
+ }
+
+ public int getDetectTimeout() {
+ return detectTimeout;
+ }
+
+ public void setDetectTimeout(int detectTimeout) {
+ this.detectTimeout = detectTimeout;
+ }
+
+ public int getDetectInterval() {
+ return detectInterval;
+ }
+
+ public void setDetectInterval(int detectInterval) {
+ this.detectInterval = detectInterval;
+ }
+
public boolean isEnableBatchAck() {
return enableBatchAck;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
index 6146c80cd..f670df205 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
@@ -382,7 +382,7 @@ public class SendMessageActivity extends AbstractMessingActivity {
int bucket = Hashing.consistentHash(shardingKey.hashCode(), writeQueues.size());
targetMessageQueue = writeQueues.get(bucket);
} else {
- targetMessageQueue = messageQueueView.getWriteSelector().selectOne(false);
+ targetMessageQueue = messageQueueView.getWriteSelector().selectOneByPipeline(false);
}
return targetMessageQueue;
} catch (Exception e) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
index 0d0c62168..a80f6df0b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.processor;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
@@ -66,6 +67,8 @@ public class ProducerProcessor extends AbstractProcessor {
public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, QueueSelector queueSelector,
String producerGroup, int sysFlag, List<Message> messageList, long timeoutMillis) {
CompletableFuture<List<SendResult>> future = new CompletableFuture<>();
+ long beginTimestampFirst = System.currentTimeMillis();
+ AddressableMessageQueue messageQueue = null;
try {
Message message = messageList.get(0);
String topic = message.getTopic();
@@ -79,7 +82,7 @@ public class ProducerProcessor extends AbstractProcessor {
}
}
}
- AddressableMessageQueue messageQueue = queueSelector.select(ctx,
+ messageQueue = queueSelector.select(ctx,
this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(ctx, topic));
if (messageQueue == null) {
throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no writable queue");
@@ -90,6 +93,7 @@ public class ProducerProcessor extends AbstractProcessor {
}
SendMessageRequestHeader requestHeader = buildSendMessageRequestHeader(messageList, producerGroup, sysFlag, messageQueue.getQueueId());
+ AddressableMessageQueue finalMessageQueue = messageQueue;
future = this.serviceManager.getMessageService().sendMessage(
ctx,
messageQueue,
@@ -102,11 +106,19 @@ public class ProducerProcessor extends AbstractProcessor {
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus()) &&
tranType == MessageSysFlag.TRANSACTION_PREPARED_TYPE &&
StringUtils.isNotBlank(sendResult.getTransactionId())) {
- fillTransactionData(ctx, producerGroup, messageQueue, sendResult, messageList);
+ fillTransactionData(ctx, producerGroup, finalMessageQueue, sendResult, messageList);
}
}
return sendResultList;
- }, this.executor);
+ }, this.executor)
+ .whenComplete((result, exception) -> {
+ long endTimestamp = System.currentTimeMillis();
+ if (exception != null) {
+ this.serviceManager.getTopicRouteService().updateFaultItem(finalMessageQueue.getBrokerName(), endTimestamp - beginTimestampFirst, true, false);
+ } else {
+ this.serviceManager.getTopicRouteService().updateFaultItem(finalMessageQueue.getBrokerName(),endTimestamp - beginTimestampFirst, false, true);
+ }
+ });
} catch (Throwable t) {
future.completeExceptionally(t);
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
index d67b68f38..aced15cee 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
@@ -54,7 +54,7 @@ public class LocalTopicRouteService extends TopicRouteService {
@Override
public MessageQueueView getCurrentMessageQueueView(ProxyContext ctx, String topic) throws Exception {
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic);
- return new MessageQueueView(topic, toTopicRouteData(topicConfig));
+ return new MessageQueueView(topic, toTopicRouteData(topicConfig), null);
}
@Override
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
index 85cd18d45..f25fb907e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.proxy.service.route;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
import com.google.common.math.IntMath;
import java.util.ArrayList;
import java.util.Collections;
@@ -30,6 +31,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
@@ -44,8 +47,9 @@ public class MessageQueueSelector {
private final Map<String, AddressableMessageQueue> brokerNameQueueMap = new ConcurrentHashMap<>();
private final AtomicInteger queueIndex;
private final AtomicInteger brokerIndex;
+ private MQFaultStrategy mqFaultStrategy;
- public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean read) {
+ public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, MQFaultStrategy mqFaultStrategy, boolean read) {
if (read) {
this.queues.addAll(buildRead(topicRouteWrapper));
} else {
@@ -55,6 +59,7 @@ public class MessageQueueSelector {
Random random = new Random();
this.queueIndex = new AtomicInteger(random.nextInt());
this.brokerIndex = new AtomicInteger(random.nextInt());
+ this.mqFaultStrategy = mqFaultStrategy;
}
private static List<AddressableMessageQueue> buildRead(TopicRouteWrapper topicRoute) {
@@ -154,6 +159,86 @@ public class MessageQueueSelector {
return selectOneByIndex(nextIndex, onlyBroker);
}
+ public AddressableMessageQueue selectOneByPipeline(boolean onlyBroker) {
+ if (mqFaultStrategy != null && mqFaultStrategy.isSendLatencyFaultEnable()) {
+ List<MessageQueue> messageQueueList = null;
+ MessageQueue messageQueue = null;
+ if (onlyBroker) {
+ messageQueueList = transferAddressableQueues(brokerActingQueues);
+ } else {
+ messageQueueList = transferAddressableQueues(queues);
+ }
+ AddressableMessageQueue addressableMessageQueue = null;
+
+ // use both the available filter and the reachable filter.
+ messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex,
+ mqFaultStrategy.getAvailableFilter(), mqFaultStrategy.getReachableFilter());
+ addressableMessageQueue = transferQueue2Addressable(messageQueue);
+ if (addressableMessageQueue != null) {
+ return addressableMessageQueue;
+ }
+
+ // use available filter.
+ messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex,
+ mqFaultStrategy.getAvailableFilter());
+ addressableMessageQueue = transferQueue2Addressable(messageQueue);
+ if (addressableMessageQueue != null) {
+ return addressableMessageQueue;
+ }
+
+ // no queue passed the available filter, then use the reachable filter only.
+ messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex,
+ mqFaultStrategy.getReachableFilter());
+ addressableMessageQueue = transferQueue2Addressable(messageQueue);
+ if (addressableMessageQueue != null) {
+ return addressableMessageQueue;
+ }
+ }
+
+ // SendLatency is not enabled, or no queue is selected, then select by index.
+ return selectOne(onlyBroker);
+ }
+
+ private MessageQueue selectOneMessageQueue(List<MessageQueue> messageQueueList, AtomicInteger sendQueue, TopicPublishInfo.QueueFilter...filter) {
+ if (messageQueueList == null || messageQueueList.isEmpty()) {
+ return null;
+ }
+ if (filter != null && filter.length != 0) {
+ for (int i = 0; i < messageQueueList.size(); i++) {
+ int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
+ MessageQueue mq = messageQueueList.get(index);
+ boolean filterResult = true;
+ for (TopicPublishInfo.QueueFilter f: filter) {
+ Preconditions.checkNotNull(f);
+ filterResult &= f.filter(mq);
+ }
+ if (filterResult) {
+ return mq;
+ }
+ }
+ }
+ return null;
+ }
+
+ public List<MessageQueue> transferAddressableQueues(List<AddressableMessageQueue> addressableMessageQueueList) {
+ if (addressableMessageQueueList == null) {
+ return null;
+ }
+
+ return addressableMessageQueueList.stream()
+ .map(AddressableMessageQueue::getMessageQueue)
+ .collect(Collectors.toList());
+ }
+
+ private AddressableMessageQueue transferQueue2Addressable(MessageQueue messageQueue) {
+ for (AddressableMessageQueue amq: queues) {
+ if (amq.getMessageQueue().equals(messageQueue)) {
+ return amq;
+ }
+ }
+ return null;
+ }
+
public AddressableMessageQueue selectNextOne(AddressableMessageQueue last) {
boolean onlyBroker = last.getQueueId() < 0;
AddressableMessageQueue newOne = last;
@@ -190,6 +275,14 @@ public class MessageQueueSelector {
return brokerActingQueues;
}
+ public MQFaultStrategy getMQFaultStrategy() {
+ return mqFaultStrategy;
+ }
+
+ public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) {
+ this.mqFaultStrategy = mqFaultStrategy;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
index fe5387cfd..8b3c2f7c8 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
@@ -17,20 +17,25 @@
package org.apache.rocketmq.proxy.service.route;
import com.google.common.base.MoreObjects;
+import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
public class MessageQueueView {
- public static final MessageQueueView WRAPPED_EMPTY_QUEUE = new MessageQueueView("", new TopicRouteData());
+ public static final MessageQueueView WRAPPED_EMPTY_QUEUE = new MessageQueueView("", new TopicRouteData(), null);
private final MessageQueueSelector readSelector;
private final MessageQueueSelector writeSelector;
private final TopicRouteWrapper topicRouteWrapper;
+ private MQFaultStrategy mqFaultStrategy;
- public MessageQueueView(String topic, TopicRouteData topicRouteData) {
+ public MessageQueueView(String topic, TopicRouteData topicRouteData, MQFaultStrategy mqFaultStrategy) {
this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic);
- this.readSelector = new MessageQueueSelector(topicRouteWrapper, true);
- this.writeSelector = new MessageQueueSelector(topicRouteWrapper, false);
+ this.readSelector = new MessageQueueSelector(topicRouteWrapper, mqFaultStrategy, true);
+ this.writeSelector = new MessageQueueSelector(topicRouteWrapper, mqFaultStrategy, false);
+ // Remember the strategy on the view too; without this getMQFaultStrategy() returns null
+ // even though both selectors were built with a strategy.
+ this.mqFaultStrategy = mqFaultStrategy;
}
public TopicRouteData getTopicRouteData() {
@@ -65,4 +70,14 @@ public class MessageQueueView {
.add("topicRouteWrapper", topicRouteWrapper)
.toString();
}
+
+ /** Returns the fault strategy this view was constructed with or last given via the setter; may be null. */
+ public MQFaultStrategy getMQFaultStrategy() {
+ return mqFaultStrategy;
+ }
+
+ /** Replaces the strategy reported by this view; the selectors keep the one they were built with. */
+ public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) {
+ this.mqFaultStrategy = mqFaultStrategy;
+ }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index 84348adc3..74769a423 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -25,7 +25,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Optional;
+import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.latency.MQFaultStrategy;
+import org.apache.rocketmq.client.latency.Resolver;
+import org.apache.rocketmq.client.latency.ServiceDetector;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -39,6 +45,7 @@ import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -47,6 +54,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
private final MQClientAPIFactory mqClientAPIFactory;
+ private MQFaultStrategy mqFaultStrategy;
protected final LoadingCache<String /* topicName */, MessageQueueView> topicCache;
protected final ScheduledExecutorService scheduledExecutorService;
@@ -97,15 +105,112 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
}
}
});
-
+ // Probes an endpoint by requesting the max offset of queue 0 of any cached topic.
+ ServiceDetector serviceDetector = new ServiceDetector() {
+ @Override
+ public boolean detect(String endpoint, long timeoutMillis) {
+ Optional<String> candidateTopic = pickTopic();
+ if (!candidateTopic.isPresent()) {
+ return false;
+ }
+ try {
+ GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
+ requestHeader.setTopic(candidateTopic.get());
+ requestHeader.setQueueId(0);
+ // Only successful completion matters; the returned offset itself is not needed.
+ mqClientAPIFactory.getClient().getMaxOffset(endpoint, requestHeader, timeoutMillis).get();
+ return true;
+ } catch (Exception e) {
+ // Best effort: every failure makes detect() return false.
+ return false;
+ }
+ }
+ };
+ mqFaultStrategy = new MQFaultStrategy(extractClientConfigFromProxyConfig(config), new Resolver() {
+ @Override
+ public String resolve(String name) {
+ try {
+ return getBrokerAddr(null, name);
+ } catch (Exception e) {
+ // A broker name that cannot be resolved is reported as null instead of an exception.
+ return null;
+ }
+ }
+ }, serviceDetector);
this.init();
}
+ /**
+ * Picks one topic from the topic cache.
+ *
+ * @return a cached topic name, or absent when the cache holds no entry
+ */
+ private Optional<String> pickTopic() {
+ // A single iteration replaces isEmpty() followed by iterator().next(): the cache may be
+ // emptied between those two calls, and next() would then throw NoSuchElementException.
+ for (String topic : topicCache.asMap().keySet()) {
+ return Optional.of(topic);
+ }
+ return Optional.absent();
+ }
+
protected void init() {
this.appendShutdown(this.scheduledExecutorService::shutdown);
this.appendStartAndShutdown(this.mqClientAPIFactory);
}
+ /**
+ * Stops the fault detector when it is enabled, then delegates to the parent shutdown.
+ */
+ @Override
+ public void shutdown() throws Exception {
+ try {
+ if (this.mqFaultStrategy.isStartDetectorEnable()) {
+ mqFaultStrategy.shutdown();
+ }
+ } finally {
+ // NOTE(review): init() appends the executor shutdown and mqClientAPIFactory to the parent;
+ // without this call the override would bypass them (assumes the parent runs them - confirm).
+ super.shutdown();
+ }
+ }
+
+ /**
+ * Delegates to the parent start, then starts the fault detector when it is enabled.
+ */
+ @Override
+ public void start() throws Exception {
+ // Parent first: the detector sends requests through mqClientAPIFactory, which init()
+ // registers with the parent (assumes the parent starts registered components - confirm).
+ super.start();
+ if (this.mqFaultStrategy.isStartDetectorEnable()) {
+ this.mqFaultStrategy.startDetector();
+ }
+ }
+
+ /**
+ * Copies the send-latency and detector settings of the proxy config into a fresh ClientConfig.
+ */
+ public ClientConfig extractClientConfigFromProxyConfig(ProxyConfig proxyConfig) {
+ ClientConfig tempClientConfig = new ClientConfig();
+ tempClientConfig.setSendLatencyEnable(proxyConfig.getSendLatencyEnable());
+ tempClientConfig.setStartDetectorEnable(proxyConfig.getStartDetectorEnable());
+ tempClientConfig.setDetectTimeout(proxyConfig.getDetectTimeout());
+ tempClientConfig.setDetectInterval(proxyConfig.getDetectInterval());
+ return tempClientConfig;
+ }
+
+ /** Forwards one latency/isolation/reachability observation for a broker to the fault strategy. */
+ public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation,
+ boolean reachable) {
+ this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable);
+ }
+
+ /** Returns the fault strategy created in the constructor and handed to every MessageQueueView built here. */
+ public MQFaultStrategy getMqFaultStrategy() {
+ return this.mqFaultStrategy;
+ }
+
public MessageQueueView getAllMessageQueueView(ProxyContext ctx, String topicName) throws Exception {
return getCacheMessageQueueWrapper(this.topicCache, topicName);
}
@@ -136,7 +241,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData topicRouteData) {
if (isTopicRouteValid(topicRouteData)) {
- MessageQueueView tmp = new MessageQueueView(topic, topicRouteData);
+ MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, TopicRouteService.this.getMqFaultStrategy());
log.debug("load topic route from namesrv. topic: {}, queue: {}", topic, tmp);
return tmp;
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index 7fd9a9ffd..77ae5e4d1 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -93,7 +93,6 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong()))
.thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList())));
-
ProxyContext context = createContext();
context.setRemainingMs(1L);
this.receiveMessageActivity.receiveMessage(
@@ -274,7 +273,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
}
@Test
- public void testReceiveMessageQueueSelector() {
+ public void testReceiveMessageQueueSelector() throws Exception {
TopicRouteData topicRouteData = new TopicRouteData();
List<QueueData> queueDatas = new ArrayList<>();
for (int i = 0; i < 2; i++) {
@@ -298,7 +297,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
}
topicRouteData.setBrokerDatas(brokerDatas);
- MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData);
+ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null);
ReceiveMessageActivity.ReceiveMessageQueueSelector selector = new ReceiveMessageActivity.ReceiveMessageQueueSelector("");
AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView);
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
index 588423bb9..4882a5ed8 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
@@ -35,6 +35,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
@@ -49,6 +51,7 @@ import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcProxyException;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
@@ -62,15 +65,19 @@ import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class SendMessageActivityTest extends BaseActivityTest {
protected static final String BROKER_NAME = "broker";
+ protected static final String BROKER_NAME2 = "broker2";
protected static final String CLUSTER_NAME = "cluster";
protected static final String BROKER_ADDR = "127.0.0.1:10911";
+ protected static final String BROKER_ADDR2 = "127.0.0.1:10912";
private static final String TOPIC = "topic";
private static final String CONSUMER_GROUP = "consumerGroup";
+ MQFaultStrategy mqFaultStrategy;
private SendMessageActivity sendMessageActivity;
@@ -262,7 +269,7 @@ public class SendMessageActivityTest extends BaseActivityTest {
}
@Test
- public void testSendOrderMessageQueueSelector() {
+ public void testSendOrderMessageQueueSelector() throws Exception {
TopicRouteData topicRouteData = new TopicRouteData();
QueueData queueData = new QueueData();
BrokerData brokerData = new BrokerData();
@@ -277,7 +284,7 @@ public class SendMessageActivityTest extends BaseActivityTest {
brokerData.setBrokerAddrs(brokerAddrs);
topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData));
- MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData);
+ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null);
SendMessageActivity.SendMessageQueueSelector selector1 = new SendMessageActivity.SendMessageQueueSelector(
SendMessageRequest.newBuilder()
.addMessages(Message.newBuilder()
@@ -288,6 +295,12 @@ public class SendMessageActivityTest extends BaseActivityTest {
.build()
);
+ TopicRouteService topicRouteService = mock(TopicRouteService.class);
+ MQFaultStrategy mqFaultStrategy = mock(MQFaultStrategy.class);
+ when(topicRouteService.getAllMessageQueueView(any(), any())).thenReturn(messageQueueView);
+ when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy);
+ when(mqFaultStrategy.isSendLatencyFaultEnable()).thenReturn(false);
+
SendMessageActivity.SendMessageQueueSelector selector2 = new SendMessageActivity.SendMessageQueueSelector(
SendMessageRequest.newBuilder()
.addMessages(Message.newBuilder()
@@ -328,12 +341,17 @@ public class SendMessageActivityTest extends BaseActivityTest {
brokerData.setBrokerAddrs(brokerAddrs);
topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData));
- MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData);
+
SendMessageActivity.SendMessageQueueSelector selector = new SendMessageActivity.SendMessageQueueSelector(
SendMessageRequest.newBuilder()
.addMessages(Message.newBuilder().build())
.build()
);
+ TopicRouteService topicRouteService = mock(TopicRouteService.class);
+ MQFaultStrategy mqFaultStrategy = mock(MQFaultStrategy.class);
+ when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy);
+ when(mqFaultStrategy.isSendLatencyFaultEnable()).thenReturn(false);
+ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, topicRouteService.getMqFaultStrategy());
AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView);
AddressableMessageQueue secondSelect = selector.select(ProxyContext.create(), messageQueueView);
@@ -343,6 +361,45 @@ public class SendMessageActivityTest extends BaseActivityTest {
assertNotEquals(firstSelect, secondSelect);
}
+ @Test
+ public void testSendNormalMessageQueueSelectorPipeLine() throws Exception {
+ TopicRouteData topicRouteData = new TopicRouteData();
+ int queueNums = 2;
+
+ QueueData queueData = createQueueData(BROKER_NAME, queueNums);
+ QueueData queueData2 = createQueueData(BROKER_NAME2, queueNums);
+ topicRouteData.setQueueDatas(Lists.newArrayList(queueData, queueData2));
+
+ BrokerData brokerData = createBrokerData(CLUSTER_NAME, BROKER_NAME, BROKER_ADDR);
+ BrokerData brokerData2 = createBrokerData(CLUSTER_NAME, BROKER_NAME2, BROKER_ADDR2);
+ topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData, brokerData2));
+
+ SendMessageActivity.SendMessageQueueSelector selector = new SendMessageActivity.SendMessageQueueSelector(
+ SendMessageRequest.newBuilder()
+ .addMessages(Message.newBuilder().build())
+ .build()
+ );
+
+ ClientConfig cc = new ClientConfig();
+ this.mqFaultStrategy = new MQFaultStrategy(cc, null, null);
+ mqFaultStrategy.setSendLatencyFaultEnable(true);
+ mqFaultStrategy.updateFaultItem(BROKER_NAME2, 1000, true, true);
+ mqFaultStrategy.updateFaultItem(BROKER_NAME, 1000, true, false);
+
+ TopicRouteService topicRouteService = mock(TopicRouteService.class);
+ when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy);
+ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, topicRouteService.getMqFaultStrategy());
+
+ // Only BROKER_NAME2 is flagged reachable, so the first pick must come from it.
+ AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView);
+ assertEquals(BROKER_NAME2, firstSelect.getBrokerName());
+
+ // Flip the reachable flag of both brokers; the next pick must move to BROKER_NAME.
+ mqFaultStrategy.updateFaultItem(BROKER_NAME2, 1000, true, false);
+ mqFaultStrategy.updateFaultItem(BROKER_NAME, 1000, true, true);
+ AddressableMessageQueue secondSelect = selector.select(ProxyContext.create(), messageQueueView);
+ assertEquals(BROKER_NAME, secondSelect.getBrokerName());
+ }
@Test
public void testParameterValidate() {
// too large message body
@@ -850,4 +907,25 @@ public class SendMessageActivityTest extends BaseActivityTest {
}
return sb.toString();
}
+
+ // Builds queue data for one broker with PERM_WRITE permission and the given number of write queues.
+ private static QueueData createQueueData(String brokerName, int writeQueueNums) {
+ QueueData writable = new QueueData();
+ writable.setPerm(PermName.PERM_WRITE);
+ writable.setWriteQueueNums(writeQueueNums);
+ writable.setBrokerName(brokerName);
+ return writable;
+ }
+
+ // Builds broker data whose only address is registered under MixAll.MASTER_ID.
+ private static BrokerData createBrokerData(String clusterName, String brokerName, String brokerAddrs) {
+ HashMap<Long, String> masterOnlyAddrs = new HashMap<>();
+ masterOnlyAddrs.put(MixAll.MASTER_ID, brokerAddrs);
+
+ BrokerData result = new BrokerData();
+ result.setCluster(clusterName);
+ result.setBrokerName(brokerName);
+ result.setBrokerAddrs(masterOnlyAddrs);
+ return result;
+ }
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java
index c97bd5a72..ca6fe909e 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java
@@ -78,7 +78,7 @@ public class BaseServiceTest extends InitConfigTest {
topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData));
when(this.topicRouteService.getAllMessageQueueView(any(), eq(ERR_TOPIC))).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, ""));
- when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData));
- when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, topicRouteData));
+ when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData, null));
+ when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, topicRouteData, null));
}
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java
index e44ed28f4..d150f87c4 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java
@@ -30,12 +30,12 @@ public class MessageQueueSelectorTest extends BaseServiceTest {
public void testReadMessageQueue() {
queueData.setPerm(PermName.PERM_READ);
queueData.setReadQueueNums(0);
- MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true);
+ MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true);
assertTrue(messageQueueSelector.getQueues().isEmpty());
queueData.setPerm(PermName.PERM_READ);
queueData.setReadQueueNums(3);
- messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true);
+ messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true);
assertEquals(3, messageQueueSelector.getQueues().size());
assertEquals(1, messageQueueSelector.getBrokerActingQueues().size());
for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) {
@@ -58,12 +58,12 @@ public class MessageQueueSelectorTest extends BaseServiceTest {
public void testWriteMessageQueue() {
queueData.setPerm(PermName.PERM_WRITE);
queueData.setReadQueueNums(0);
- MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false);
+ MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false);
assertTrue(messageQueueSelector.getQueues().isEmpty());
queueData.setPerm(PermName.PERM_WRITE);
queueData.setWriteQueueNums(3);
- messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false);
+ messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false);
assertEquals(3, messageQueueSelector.getQueues().size());
assertEquals(1, messageQueueSelector.getBrokerActingQueues().size());
for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) {
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index c67f4953d..43fba3d03 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -132,7 +132,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
brokerAddr.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddr);
topicRouteData.getBrokerDatas().add(brokerData);
- MessageQueueView messageQueueView = new MessageQueueView("foo", topicRouteData);
+ MessageQueueView messageQueueView = new MessageQueueView("foo", topicRouteData, null);
when(this.topicRouteService.getAllMessageQueueView(any(), anyString())).thenReturn(messageQueueView);
}
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
index a0063544e..91af74cbe 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
@@ -64,7 +64,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest {
this.clusterTransactionService = new ClusterTransactionService(this.topicRouteService, this.producerManager,
this.mqClientAPIFactory);
- MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData);
+ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null);
when(this.topicRouteService.getAllMessageQueueView(any(), anyString()))
.thenReturn(messageQueueView);
@@ -127,7 +127,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest {
brokerData.setBrokerAddrs(brokerAddrs);
topicRouteData.getQueueDatas().add(queueData);
topicRouteData.getBrokerDatas().add(brokerData);
- when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData));
+ when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData, null));
TopicRouteData clusterTopicRouteData = new TopicRouteData();
QueueData clusterQueueData = new QueueData();
@@ -141,7 +141,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest {
brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR);
clusterBrokerData.setBrokerAddrs(brokerAddrs);
clusterTopicRouteData.setBrokerDatas(Lists.newArrayList(clusterBrokerData));
- when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, clusterTopicRouteData));
+ when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, clusterTopicRouteData, null));
TopicRouteData clusterTopicRouteData2 = new TopicRouteData();
QueueData clusterQueueData2 = new QueueData();
@@ -155,7 +155,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest {
brokerAddrs.put(MixAll.MASTER_ID, brokerAddr2);
clusterBrokerData2.setBrokerAddrs(brokerAddrs);
clusterTopicRouteData2.setBrokerDatas(Lists.newArrayList(clusterBrokerData2));
- when(this.topicRouteService.getAllMessageQueueView(any(), eq(clusterName2))).thenReturn(new MessageQueueView(clusterName2, clusterTopicRouteData2));
+ when(this.topicRouteService.getAllMessageQueueView(any(), eq(clusterName2))).thenReturn(new MessageQueueView(clusterName2, clusterTopicRouteData2, null));
ConfigurationManager.getProxyConfig().setTransactionHeartbeatBatchNum(2);
this.clusterTransactionService.start();
--
2.32.0.windows.2