2024 lines
99 KiB
Diff
2024 lines
99 KiB
Diff
From b028277018946868838a82a08211071bc231a175 Mon Sep 17 00:00:00 2001
|
|
From: Ji Juntao <juntao.jjt@alibaba-inc.com>
|
|
Date: Tue, 29 Aug 2023 16:13:38 +0800
|
|
Subject: [PATCH] [ISSUE #6567] [RIP-63] Queue Selection Strategy Optimization
|
|
(#6568)
|
|
|
|
Optimize the proxy's and client's selection strategy for brokers when sending messages, and use multiple selection strategies as a pipeline to filter suitable queues.
|
|
---
|
|
.../apache/rocketmq/client/ClientConfig.java | 54 +++++
|
|
.../client/common/ThreadLocalIndex.java | 8 +
|
|
.../rocketmq/client/impl/MQClientAPIImpl.java | 12 +-
|
|
.../client/impl/factory/MQClientInstance.java | 7 +
|
|
.../impl/producer/DefaultMQProducerImpl.java | 87 ++++++--
|
|
.../impl/producer/TopicPublishInfo.java | 40 ++++
|
|
.../client/latency/LatencyFaultTolerance.java | 66 +++++-
|
|
.../latency/LatencyFaultToleranceImpl.java | 189 ++++++++++++++----
|
|
.../client/latency/MQFaultStrategy.java | 155 ++++++++++----
|
|
.../rocketmq/client/latency/Resolver.java | 17 +-
|
|
.../client/latency/ServiceDetector.java | 30 +++
|
|
.../LatencyFaultToleranceImplTest.java | 36 +++-
|
|
.../processor/DefaultRequestProcessor.java | 24 ---
|
|
.../rocketmq/proxy/config/ProxyConfig.java | 46 +++++
|
|
.../grpc/v2/producer/SendMessageActivity.java | 2 +-
|
|
.../proxy/processor/ProducerProcessor.java | 18 +-
|
|
.../service/route/LocalTopicRouteService.java | 2 +-
|
|
.../service/route/MessageQueueSelector.java | 95 ++++++++-
|
|
.../proxy/service/route/MessageQueueView.java | 18 +-
|
|
.../service/route/TopicRouteService.java | 80 +++++++-
|
|
.../consumer/ReceiveMessageActivityTest.java | 5 +-
|
|
.../v2/producer/SendMessageActivityTest.java | 82 +++++++-
|
|
.../proxy/service/BaseServiceTest.java | 4 +-
|
|
.../route/MessageQueueSelectorTest.java | 8 +-
|
|
.../sysmessage/HeartbeatSyncerTest.java | 2 +-
|
|
.../ClusterTransactionServiceTest.java | 8 +-
|
|
26 files changed, 919 insertions(+), 176 deletions(-)
|
|
rename remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java => client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java (65%)
|
|
create mode 100644 client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java
|
|
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
|
|
index f87450f66..bb0fe3522 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
|
|
@@ -38,6 +38,8 @@ public class ClientConfig {
|
|
public static final String SOCKS_PROXY_CONFIG = "com.rocketmq.socks.proxy.config";
|
|
public static final String DECODE_READ_BODY = "com.rocketmq.read.body";
|
|
public static final String DECODE_DECOMPRESS_BODY = "com.rocketmq.decompress.body";
|
|
+ public static final String SEND_LATENCY_ENABLE = "com.rocketmq.sendLatencyEnable";
|
|
+ public static final String START_DETECTOR_ENABLE = "com.rocketmq.startDetectorEnable";
|
|
public static final String HEART_BEAT_V2 = "com.rocketmq.heartbeat.v2";
|
|
private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
|
|
private String clientIP = NetworkUtil.getLocalAddress();
|
|
@@ -72,6 +74,8 @@ public class ClientConfig {
|
|
private String socksProxyConfig = System.getProperty(SOCKS_PROXY_CONFIG, "{}");
|
|
|
|
private int mqClientApiTimeout = 3 * 1000;
|
|
+ private int detectTimeout = 200;
|
|
+ private int detectInterval = 2 * 1000;
|
|
|
|
private LanguageCode language = LanguageCode.JAVA;
|
|
|
|
@@ -81,6 +85,15 @@ public class ClientConfig {
|
|
*/
|
|
protected boolean enableStreamRequestType = false;
|
|
|
|
+ /**
|
|
+ * Enable the fault tolerance mechanism of the client sending process.
|
|
+ * DO NOT OPEN when ORDER messages are required.
|
|
+ * Turning on will interfere with the queue selection functionality,
|
|
+ * possibly conflicting with the order message.
|
|
+ */
|
|
+ private boolean sendLatencyEnable = Boolean.parseBoolean(System.getProperty(SEND_LATENCY_ENABLE, "false"));
|
|
+ private boolean startDetectorEnable = Boolean.parseBoolean(System.getProperty(START_DETECTOR_ENABLE, "false"));
|
|
+
|
|
public String buildMQClientId() {
|
|
StringBuilder sb = new StringBuilder();
|
|
sb.append(this.getClientIP());
|
|
@@ -186,6 +199,10 @@ public class ClientConfig {
|
|
this.decodeDecompressBody = cc.decodeDecompressBody;
|
|
this.enableStreamRequestType = cc.enableStreamRequestType;
|
|
this.useHeartbeatV2 = cc.useHeartbeatV2;
|
|
+ this.startDetectorEnable = cc.startDetectorEnable;
|
|
+ this.sendLatencyEnable = cc.sendLatencyEnable;
|
|
+ this.detectInterval = cc.detectInterval;
|
|
+ this.detectTimeout = cc.detectTimeout;
|
|
}
|
|
|
|
public ClientConfig cloneClientConfig() {
|
|
@@ -210,6 +227,10 @@ public class ClientConfig {
|
|
cc.decodeDecompressBody = decodeDecompressBody;
|
|
cc.enableStreamRequestType = enableStreamRequestType;
|
|
cc.useHeartbeatV2 = useHeartbeatV2;
|
|
+ cc.startDetectorEnable = startDetectorEnable;
|
|
+ cc.sendLatencyEnable = sendLatencyEnable;
|
|
+ cc.detectInterval = detectInterval;
|
|
+ cc.detectTimeout = detectTimeout;
|
|
return cc;
|
|
}
|
|
|
|
@@ -381,6 +402,38 @@ public class ClientConfig {
|
|
this.enableStreamRequestType = enableStreamRequestType;
|
|
}
|
|
|
|
+ public boolean isSendLatencyEnable() {
|
|
+ return sendLatencyEnable;
|
|
+ }
|
|
+
|
|
+ public void setSendLatencyEnable(boolean sendLatencyEnable) {
|
|
+ this.sendLatencyEnable = sendLatencyEnable;
|
|
+ }
|
|
+
|
|
+ public boolean isStartDetectorEnable() {
|
|
+ return startDetectorEnable;
|
|
+ }
|
|
+
|
|
+ public void setStartDetectorEnable(boolean startDetectorEnable) {
|
|
+ this.startDetectorEnable = startDetectorEnable;
|
|
+ }
|
|
+
|
|
+ public int getDetectTimeout() {
|
|
+ return this.detectTimeout;
|
|
+ }
|
|
+
|
|
+ public void setDetectTimeout(int detectTimeout) {
|
|
+ this.detectTimeout = detectTimeout;
|
|
+ }
|
|
+
|
|
+ public int getDetectInterval() {
|
|
+ return this.detectInterval;
|
|
+ }
|
|
+
|
|
+ public void setDetectInterval(int detectInterval) {
|
|
+ this.detectInterval = detectInterval;
|
|
+ }
|
|
+
|
|
public boolean isUseHeartbeatV2() {
|
|
return useHeartbeatV2;
|
|
}
|
|
@@ -403,6 +456,7 @@ public class ClientConfig {
|
|
+ ", socksProxyConfig=" + socksProxyConfig + ", language=" + language.name()
|
|
+ ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout
|
|
+ ", decodeReadBody=" + decodeReadBody + ", decodeDecompressBody=" + decodeDecompressBody
|
|
+ + ", sendLatencyEnable=" + sendLatencyEnable + ", startDetectorEnable=" + startDetectorEnable
|
|
+ ", enableStreamRequestType=" + enableStreamRequestType + ", useHeartbeatV2=" + useHeartbeatV2 + "]";
|
|
}
|
|
}
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
|
|
index 4a3d90135..3a086c13d 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
|
|
@@ -33,6 +33,14 @@ public class ThreadLocalIndex {
|
|
return index & POSITIVE_MASK;
|
|
}
|
|
|
|
+ public void reset() {
|
|
+ int index = Math.abs(random.nextInt(Integer.MAX_VALUE));
|
|
+ if (index < 0) {
|
|
+ index = 0;
|
|
+ }
|
|
+ this.threadLocalIndex.set(index);
|
|
+ }
|
|
+
|
|
@Override
|
|
public String toString() {
|
|
return "ThreadLocalIndex{" +
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
|
|
index 213c26fd6..3201a493f 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
|
|
@@ -666,7 +666,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
|
|
} catch (Throwable e) {
|
|
}
|
|
|
|
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
|
|
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true);
|
|
return;
|
|
}
|
|
|
|
@@ -684,14 +684,14 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
|
|
} catch (Throwable e) {
|
|
}
|
|
|
|
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
|
|
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true);
|
|
} catch (Exception e) {
|
|
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
|
|
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true);
|
|
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
|
|
retryTimesWhenSendFailed, times, e, context, false, producer);
|
|
}
|
|
} else {
|
|
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
|
|
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true);
|
|
if (!responseFuture.isSendRequestOK()) {
|
|
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
|
|
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
|
|
@@ -711,7 +711,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
|
|
});
|
|
} catch (Exception ex) {
|
|
long cost = System.currentTimeMillis() - beginStartTime;
|
|
- producer.updateFaultItem(brokerName, cost, true);
|
|
+ producer.updateFaultItem(brokerName, cost, true, false);
|
|
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
|
|
retryTimesWhenSendFailed, times, ex, context, true, producer);
|
|
}
|
|
@@ -735,7 +735,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
|
|
if (needRetry && tmp <= timesTotal) {
|
|
String retryBrokerName = brokerName;//by default, it will send to the same broker
|
|
if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
|
|
- MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
|
|
+ MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName, false);
|
|
retryBrokerName = instance.getBrokerNameFromMessageQueue(mqChosen);
|
|
}
|
|
String addr = instance.findBrokerAddressInPublish(retryBrokerName);
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
|
|
index 8851bc815..9484b26f8 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
|
|
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
+import java.util.concurrent.ThreadFactory;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.locks.Lock;
|
|
@@ -125,6 +126,12 @@ public class MQClientInstance {
|
|
private final Set<String/* Broker address */> brokerSupportV2HeartbeatSet = new HashSet();
|
|
private final ConcurrentMap<String, Integer> brokerAddrHeartbeatFingerprintTable = new ConcurrentHashMap();
|
|
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread"));
|
|
+ private final ScheduledExecutorService fetchRemoteConfigExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
|
|
+ @Override
|
|
+ public Thread newThread(Runnable r) {
|
|
+ return new Thread(r, "MQClientFactoryFetchRemoteConfigScheduledThread");
|
|
+ }
|
|
+ });
|
|
private final PullMessageService pullMessageService;
|
|
private final RebalanceService rebalanceService;
|
|
private final DefaultMQProducer defaultMQProducer;
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
|
index 3f4c6e5f7..bbbb17b07 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
|
@@ -33,6 +33,8 @@ import java.util.concurrent.RejectedExecutionException;
|
|
import java.util.concurrent.Semaphore;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
+
|
|
+import com.google.common.base.Optional;
|
|
import org.apache.rocketmq.client.QueryResult;
|
|
import org.apache.rocketmq.client.Validators;
|
|
import org.apache.rocketmq.client.common.ClientErrorCode;
|
|
@@ -49,6 +51,8 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
|
|
import org.apache.rocketmq.client.impl.MQClientManager;
|
|
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
|
|
import org.apache.rocketmq.client.latency.MQFaultStrategy;
|
|
+import org.apache.rocketmq.client.latency.Resolver;
|
|
+import org.apache.rocketmq.client.latency.ServiceDetector;
|
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
|
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
|
|
import org.apache.rocketmq.client.producer.LocalTransactionState;
|
|
@@ -112,7 +116,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
private ServiceState serviceState = ServiceState.CREATE_JUST;
|
|
private MQClientInstance mQClientFactory;
|
|
private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<>();
|
|
- private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
|
|
+ private MQFaultStrategy mqFaultStrategy;
|
|
private ExecutorService asyncSenderExecutor;
|
|
|
|
// compression related
|
|
@@ -153,8 +157,38 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
semaphoreAsyncSendSize = new Semaphore(1024 * 1024, true);
|
|
log.info("semaphoreAsyncSendSize can not be smaller than 1M.");
|
|
}
|
|
- }
|
|
|
|
+ ServiceDetector serviceDetector = new ServiceDetector() {
|
|
+ @Override
|
|
+ public boolean detect(String endpoint, long timeoutMillis) {
|
|
+ Optional<String> candidateTopic = pickTopic();
|
|
+ if (!candidateTopic.isPresent()) {
|
|
+ return false;
|
|
+ }
|
|
+ try {
|
|
+ MessageQueue mq = new MessageQueue(candidateTopic.get(), null, 0);
|
|
+ mQClientFactory.getMQClientAPIImpl()
|
|
+ .getMaxOffset(endpoint, mq, timeoutMillis);
|
|
+ return true;
|
|
+ } catch (Exception e) {
|
|
+ return false;
|
|
+ }
|
|
+ }
|
|
+ };
|
|
+
|
|
+ this.mqFaultStrategy = new MQFaultStrategy(defaultMQProducer.cloneClientConfig(), new Resolver() {
|
|
+ @Override
|
|
+ public String resolve(String name) {
|
|
+ return DefaultMQProducerImpl.this.mQClientFactory.findBrokerAddressInPublish(name);
|
|
+ }
|
|
+ }, serviceDetector);
|
|
+ }
|
|
+ private Optional<String> pickTopic() {
|
|
+ if (topicPublishInfoTable.isEmpty()) {
|
|
+ return Optional.absent();
|
|
+ }
|
|
+ return Optional.of(topicPublishInfoTable.keySet().iterator().next());
|
|
+ }
|
|
public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) {
|
|
this.checkForbiddenHookList.add(checkForbiddenHook);
|
|
log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(),
|
|
@@ -229,6 +263,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
mQClientFactory.start();
|
|
}
|
|
|
|
+ if (this.mqFaultStrategy.isStartDetectorEnable()) {
|
|
+ this.mqFaultStrategy.startDetector();
|
|
+ }
|
|
+
|
|
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
|
|
this.defaultMQProducer.isSendMessageWithVIPChannel());
|
|
this.serviceState = ServiceState.RUNNING;
|
|
@@ -273,6 +311,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
if (shutdownFactory) {
|
|
this.mQClientFactory.shutdown();
|
|
}
|
|
+ if (this.mqFaultStrategy.isStartDetectorEnable()) {
|
|
+ this.mqFaultStrategy.shutdown();
|
|
+ }
|
|
RequestFutureHolder.getInstance().shutdown(this);
|
|
log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
|
|
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
|
|
@@ -574,7 +615,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
}
|
|
|
|
public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg,
|
|
- final long timeout) throws MQClientException, RemotingTooMuchRequestException {
|
|
+ final long timeout) throws MQClientException, RemotingTooMuchRequestException {
|
|
long beginStartTime = System.currentTimeMillis();
|
|
this.makeSureStateOK();
|
|
Validators.checkMessage(msg, this.defaultMQProducer);
|
|
@@ -584,7 +625,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
MessageQueue mq = null;
|
|
try {
|
|
List<MessageQueue> messageQueueList =
|
|
- mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
|
|
+ mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
|
|
Message userMessage = MessageAccessor.cloneMessage(msg);
|
|
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
|
|
userMessage.setTopic(userTopic);
|
|
@@ -609,12 +650,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
|
|
}
|
|
|
|
- public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
|
|
- return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
|
|
+ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
|
|
+ return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName, resetIndex);
|
|
}
|
|
|
|
- public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
|
|
- this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
|
|
+ public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation,
|
|
+ boolean reachable) {
|
|
+ this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable);
|
|
}
|
|
|
|
private void validateNameServerSetting() throws MQClientException {
|
|
@@ -647,9 +689,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
|
|
int times = 0;
|
|
String[] brokersSent = new String[timesTotal];
|
|
+ boolean resetIndex = false;
|
|
for (; times < timesTotal; times++) {
|
|
String lastBrokerName = null == mq ? null : mq.getBrokerName();
|
|
- MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
|
|
+ if (times > 0) {
|
|
+ resetIndex = true;
|
|
+ }
|
|
+ MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);
|
|
if (mqSelected != null) {
|
|
mq = mqSelected;
|
|
brokersSent[times] = mq.getBrokerName();
|
|
@@ -667,7 +713,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
|
|
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
|
|
endTimestamp = System.currentTimeMillis();
|
|
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
|
|
+ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
|
|
switch (communicationMode) {
|
|
case ASYNC:
|
|
return null;
|
|
@@ -684,9 +730,22 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
default:
|
|
break;
|
|
}
|
|
- } catch (RemotingException | MQClientException e) {
|
|
+ } catch (MQClientException e) {
|
|
endTimestamp = System.currentTimeMillis();
|
|
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
|
|
+ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
|
|
+ log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
|
|
+ log.warn(msg.toString());
|
|
+ exception = e;
|
|
+ continue;
|
|
+ } catch (RemotingException e) {
|
|
+ endTimestamp = System.currentTimeMillis();
|
|
+ if (this.mqFaultStrategy.isStartDetectorEnable()) {
|
|
+ // Set this broker unreachable when detecting schedule task is running for RemotingException.
|
|
+ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false);
|
|
+ } else {
|
|
+ // Otherwise, isolate this broker.
|
|
+ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, true);
|
|
+ }
|
|
log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
|
|
if (log.isDebugEnabled()) {
|
|
log.debug(msg.toString());
|
|
@@ -695,7 +754,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
continue;
|
|
} catch (MQBrokerException e) {
|
|
endTimestamp = System.currentTimeMillis();
|
|
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
|
|
+ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false);
|
|
log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
|
|
if (log.isDebugEnabled()) {
|
|
log.debug(msg.toString());
|
|
@@ -712,7 +771,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
}
|
|
} catch (InterruptedException e) {
|
|
endTimestamp = System.currentTimeMillis();
|
|
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
|
|
+ this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
|
|
log.warn("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
|
|
if (log.isDebugEnabled()) {
|
|
log.debug(msg.toString());
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
|
|
index 275ada7ac..37b1f3252 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
|
|
@@ -18,6 +18,8 @@ package org.apache.rocketmq.client.impl.producer;
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
+
|
|
+import com.google.common.base.Preconditions;
|
|
import org.apache.rocketmq.client.common.ThreadLocalIndex;
|
|
import org.apache.rocketmq.common.message.MessageQueue;
|
|
import org.apache.rocketmq.remoting.protocol.route.QueueData;
|
|
@@ -30,6 +32,10 @@ public class TopicPublishInfo {
|
|
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
|
|
private TopicRouteData topicRouteData;
|
|
|
|
+ public interface QueueFilter {
|
|
+ boolean filter(MessageQueue mq);
|
|
+ }
|
|
+
|
|
public boolean isOrderTopic() {
|
|
return orderTopic;
|
|
}
|
|
@@ -66,6 +72,40 @@ public class TopicPublishInfo {
|
|
this.haveTopicRouterInfo = haveTopicRouterInfo;
|
|
}
|
|
|
|
+ public MessageQueue selectOneMessageQueue(QueueFilter ...filter) {
|
|
+ return selectOneMessageQueue(this.messageQueueList, this.sendWhichQueue, filter);
|
|
+ }
|
|
+
|
|
+ private MessageQueue selectOneMessageQueue(List<MessageQueue> messageQueueList, ThreadLocalIndex sendQueue, QueueFilter ...filter) {
|
|
+ if (messageQueueList == null || messageQueueList.isEmpty()) {
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ if (filter != null && filter.length != 0) {
|
|
+ for (int i = 0; i < messageQueueList.size(); i++) {
|
|
+ int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
|
|
+ MessageQueue mq = messageQueueList.get(index);
|
|
+ boolean filterResult = true;
|
|
+ for (QueueFilter f: filter) {
|
|
+ Preconditions.checkNotNull(f);
|
|
+ filterResult &= f.filter(mq);
|
|
+ }
|
|
+ if (filterResult) {
|
|
+ return mq;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
|
|
+ return messageQueueList.get(index);
|
|
+ }
|
|
+
|
|
+ public void resetIndex() {
|
|
+ this.sendWhichQueue.reset();
|
|
+ }
|
|
+
|
|
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
|
|
if (lastBrokerName == null) {
|
|
return selectOneMessageQueue();
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
|
|
index 09a8aa461..72d2f3450 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
|
|
@@ -18,11 +18,75 @@
|
|
package org.apache.rocketmq.client.latency;
|
|
|
|
public interface LatencyFaultTolerance<T> {
|
|
- void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
|
|
+ /**
|
|
+ * Update brokers' states, to decide if they are good or not.
|
|
+ *
|
|
+ * @param name Broker's name.
|
|
+ * @param currentLatency Current message sending process's latency.
|
|
+ * @param notAvailableDuration Corresponding not available time, ms. The broker will be not available until it
|
|
+ * spends such time.
|
|
+ * @param reachable To decide if this broker is reachable or not.
|
|
+ */
|
|
+ void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration,
|
|
+ final boolean reachable);
|
|
|
|
+ /**
|
|
+ * To check if this broker is available.
|
|
+ *
|
|
+ * @param name Broker's name.
|
|
+ * @return boolean variable, if this is true, then the broker is available.
|
|
+ */
|
|
boolean isAvailable(final T name);
|
|
|
|
+ /**
|
|
+ * To check if this broker is reachable.
|
|
+ *
|
|
+ * @param name Broker's name.
|
|
+ * @return boolean variable, if this is true, then the broker is reachable.
|
|
+ */
|
|
+ boolean isReachable(final T name);
|
|
+
|
|
+ /**
|
|
+ * Remove the broker in this fault item table.
|
|
+ *
|
|
+ * @param name broker's name.
|
|
+ */
|
|
void remove(final T name);
|
|
|
|
+ /**
|
|
+ * The worst situation, no broker can be available. Then choose random one.
|
|
+ *
|
|
+ * @return A random mq will be returned.
|
|
+ */
|
|
T pickOneAtLeast();
|
|
+
|
|
+ /**
|
|
+ * Start a new thread, to detect the broker's reachable tag.
|
|
+ */
|
|
+ void startDetector();
|
|
+
|
|
+ /**
|
|
+ * Shutdown threads that started by LatencyFaultTolerance.
|
|
+ */
|
|
+ void shutdown();
|
|
+
|
|
+ /**
|
|
+ * A function reserved, just detect by once, won't create a new thread.
|
|
+ */
|
|
+ void detectByOneRound();
|
|
+
|
|
+ /**
|
|
+ * Use it to set the detect timeout bound.
|
|
+ *
|
|
+ * @param detectTimeout timeout bound
|
|
+ */
|
|
+ void setDetectTimeout(final int detectTimeout);
|
|
+
|
|
+ /**
|
|
+ * Use it to set the detector's detector interval for each broker (each broker will be detected once during this
|
|
+ * time)
|
|
+ *
|
|
+ * @param detectInterval each broker's detecting interval
|
|
+ */
|
|
+ void setDetectInterval(final int detectInterval);
|
|
}
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
|
|
index 93795d957..8af629574 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
|
|
@@ -21,30 +21,97 @@ import java.util.Collections;
|
|
import java.util.Enumeration;
|
|
import java.util.LinkedList;
|
|
import java.util.List;
|
|
+import java.util.Map;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
+import java.util.concurrent.Executors;
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
+import java.util.concurrent.ThreadFactory;
|
|
+import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.client.common.ThreadLocalIndex;
|
|
+import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
|
|
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
|
|
- private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>(16);
|
|
+ private final static Logger log = LoggerFactory.getLogger(MQFaultStrategy.class);
|
|
+ private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
|
|
+ private int detectTimeout = 200;
|
|
+ private int detectInterval = 2000;
|
|
+ private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
|
|
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
|
|
+ @Override
|
|
+ public Thread newThread(Runnable r) {
|
|
+ return new Thread(r, "LatencyFaultToleranceScheduledThread");
|
|
+ }
|
|
+ });
|
|
|
|
- private final ThreadLocalIndex randomItem = new ThreadLocalIndex();
|
|
+ private final Resolver resolver;
|
|
+
|
|
+ private final ServiceDetector serviceDetector;
|
|
+
|
|
+ public LatencyFaultToleranceImpl(Resolver resolver, ServiceDetector serviceDetector) {
|
|
+ this.resolver = resolver;
|
|
+ this.serviceDetector = serviceDetector;
|
|
+ }
|
|
+
|
|
+ public void detectByOneRound() {
|
|
+ for (Map.Entry<String, FaultItem> item : this.faultItemTable.entrySet()) {
|
|
+ FaultItem brokerItem = item.getValue();
|
|
+ if (System.currentTimeMillis() - brokerItem.checkStamp >= 0) {
|
|
+ brokerItem.checkStamp = System.currentTimeMillis() + this.detectInterval;
|
|
+ String brokerAddr = resolver.resolve(brokerItem.getName());
|
|
+ if (brokerAddr == null) {
|
|
+ faultItemTable.remove(item.getKey());
|
|
+ continue;
|
|
+ }
|
|
+ if (null == serviceDetector) {
|
|
+ continue;
|
|
+ }
|
|
+ boolean serviceOK = serviceDetector.detect(brokerAddr, detectTimeout);
|
|
+ if (serviceOK && !brokerItem.reachableFlag) {
|
|
+ log.info(brokerItem.name + " is reachable now, then it can be used.");
|
|
+ brokerItem.reachableFlag = true;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public void startDetector() {
|
|
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
|
|
+ @Override
|
|
+ public void run() {
|
|
+ try {
|
|
+ detectByOneRound();
|
|
+ } catch (Exception e) {
|
|
+ log.warn("Unexpected exception raised while detecting service reachability", e);
|
|
+ }
|
|
+ }
|
|
+ }, 3, 3, TimeUnit.SECONDS);
|
|
+ }
|
|
+
|
|
+ public void shutdown() {
|
|
+ this.scheduledExecutorService.shutdown();
|
|
+ }
|
|
|
|
@Override
|
|
- public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
|
|
+ public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration,
|
|
+ final boolean reachable) {
|
|
FaultItem old = this.faultItemTable.get(name);
|
|
if (null == old) {
|
|
final FaultItem faultItem = new FaultItem(name);
|
|
faultItem.setCurrentLatency(currentLatency);
|
|
- faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
|
|
-
|
|
+ faultItem.updateNotAvailableDuration(notAvailableDuration);
|
|
+ faultItem.setReachable(reachable);
|
|
old = this.faultItemTable.putIfAbsent(name, faultItem);
|
|
- if (old != null) {
|
|
- old.setCurrentLatency(currentLatency);
|
|
- old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
|
|
- }
|
|
- } else {
|
|
+ }
|
|
+
|
|
+ if (null != old) {
|
|
old.setCurrentLatency(currentLatency);
|
|
- old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
|
|
+ old.updateNotAvailableDuration(notAvailableDuration);
|
|
+ old.setReachable(reachable);
|
|
+ }
|
|
+
|
|
+ if (!reachable) {
|
|
+ log.info(name + " is unreachable, it will not be used until it's reachable");
|
|
}
|
|
}
|
|
|
|
@@ -57,6 +124,14 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
|
|
return true;
|
|
}
|
|
|
|
+ public boolean isReachable(final String name) {
|
|
+ final FaultItem faultItem = this.faultItemTable.get(name);
|
|
+ if (faultItem != null) {
|
|
+ return faultItem.isReachable();
|
|
+ }
|
|
+ return true;
|
|
+ }
|
|
+
|
|
@Override
|
|
public void remove(final String name) {
|
|
this.faultItemTable.remove(name);
|
|
@@ -65,68 +140,98 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
|
|
@Override
|
|
public String pickOneAtLeast() {
|
|
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
|
|
- List<FaultItem> tmpList = new LinkedList<>();
|
|
+ List<FaultItem> tmpList = new LinkedList<FaultItem>();
|
|
while (elements.hasMoreElements()) {
|
|
final FaultItem faultItem = elements.nextElement();
|
|
tmpList.add(faultItem);
|
|
}
|
|
+
|
|
if (!tmpList.isEmpty()) {
|
|
- Collections.sort(tmpList);
|
|
- final int half = tmpList.size() / 2;
|
|
- if (half <= 0) {
|
|
- return tmpList.get(0).getName();
|
|
- } else {
|
|
- final int i = this.randomItem.incrementAndGet() % half;
|
|
- return tmpList.get(i).getName();
|
|
+ Collections.shuffle(tmpList);
|
|
+ for (FaultItem faultItem : tmpList) {
|
|
+ if (faultItem.reachableFlag) {
|
|
+ return faultItem.name;
|
|
+ }
|
|
}
|
|
}
|
|
+
|
|
return null;
|
|
}
|
|
|
|
@Override
|
|
public String toString() {
|
|
return "LatencyFaultToleranceImpl{" +
|
|
- "faultItemTable=" + faultItemTable +
|
|
- ", whichItemWorst=" + randomItem +
|
|
- '}';
|
|
+ "faultItemTable=" + faultItemTable +
|
|
+ ", whichItemWorst=" + whichItemWorst +
|
|
+ '}';
|
|
+ }
|
|
+
|
|
+ public void setDetectTimeout(final int detectTimeout) {
|
|
+ this.detectTimeout = detectTimeout;
|
|
}
|
|
|
|
- class FaultItem implements Comparable<FaultItem> {
|
|
+ public void setDetectInterval(final int detectInterval) {
|
|
+ this.detectInterval = detectInterval;
|
|
+ }
|
|
+
|
|
+ public class FaultItem implements Comparable<FaultItem> {
|
|
private final String name;
|
|
private volatile long currentLatency;
|
|
private volatile long startTimestamp;
|
|
+ private volatile long checkStamp;
|
|
+ private volatile boolean reachableFlag;
|
|
|
|
public FaultItem(final String name) {
|
|
this.name = name;
|
|
}
|
|
|
|
+ public void updateNotAvailableDuration(long notAvailableDuration) {
|
|
+ if (notAvailableDuration > 0 && System.currentTimeMillis() + notAvailableDuration > this.startTimestamp) {
|
|
+ this.startTimestamp = System.currentTimeMillis() + notAvailableDuration;
|
|
+ log.info(name + " will be isolated for " + notAvailableDuration + " ms.");
|
|
+ }
|
|
+ }
|
|
+
|
|
@Override
|
|
public int compareTo(final FaultItem other) {
|
|
if (this.isAvailable() != other.isAvailable()) {
|
|
- if (this.isAvailable())
|
|
+ if (this.isAvailable()) {
|
|
return -1;
|
|
+ }
|
|
|
|
- if (other.isAvailable())
|
|
+ if (other.isAvailable()) {
|
|
return 1;
|
|
+ }
|
|
}
|
|
|
|
- if (this.currentLatency < other.currentLatency)
|
|
+ if (this.currentLatency < other.currentLatency) {
|
|
return -1;
|
|
- else if (this.currentLatency > other.currentLatency) {
|
|
+ } else if (this.currentLatency > other.currentLatency) {
|
|
return 1;
|
|
}
|
|
|
|
- if (this.startTimestamp < other.startTimestamp)
|
|
+ if (this.startTimestamp < other.startTimestamp) {
|
|
return -1;
|
|
- else if (this.startTimestamp > other.startTimestamp) {
|
|
+ } else if (this.startTimestamp > other.startTimestamp) {
|
|
return 1;
|
|
}
|
|
-
|
|
return 0;
|
|
}
|
|
|
|
+ public void setReachable(boolean reachableFlag) {
|
|
+ this.reachableFlag = reachableFlag;
|
|
+ }
|
|
+
|
|
+ public void setCheckStamp(long checkStamp) {
|
|
+ this.checkStamp = checkStamp;
|
|
+ }
|
|
+
|
|
public boolean isAvailable() {
|
|
- return (System.currentTimeMillis() - startTimestamp) >= 0;
|
|
+ return reachableFlag && System.currentTimeMillis() >= startTimestamp;
|
|
+ }
|
|
+
|
|
+ public boolean isReachable() {
|
|
+ return reachableFlag;
|
|
}
|
|
|
|
@Override
|
|
@@ -139,28 +244,32 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
|
|
|
|
@Override
|
|
public boolean equals(final Object o) {
|
|
- if (this == o)
|
|
+ if (this == o) {
|
|
return true;
|
|
- if (!(o instanceof FaultItem))
|
|
+ }
|
|
+ if (!(o instanceof FaultItem)) {
|
|
return false;
|
|
+ }
|
|
|
|
final FaultItem faultItem = (FaultItem) o;
|
|
|
|
- if (getCurrentLatency() != faultItem.getCurrentLatency())
|
|
+ if (getCurrentLatency() != faultItem.getCurrentLatency()) {
|
|
return false;
|
|
- if (getStartTimestamp() != faultItem.getStartTimestamp())
|
|
+ }
|
|
+ if (getStartTimestamp() != faultItem.getStartTimestamp()) {
|
|
return false;
|
|
+ }
|
|
return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;
|
|
-
|
|
}
|
|
|
|
@Override
|
|
public String toString() {
|
|
return "FaultItem{" +
|
|
- "name='" + name + '\'' +
|
|
- ", currentLatency=" + currentLatency +
|
|
- ", startTimestamp=" + startTimestamp +
|
|
- '}';
|
|
+ "name='" + name + '\'' +
|
|
+ ", currentLatency=" + currentLatency +
|
|
+ ", startTimestamp=" + startTimestamp +
|
|
+ ", reachableFlag=" + reachableFlag +
|
|
+ '}';
|
|
}
|
|
|
|
public String getName() {
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
|
|
index 1e1953fad..c01490784 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
|
|
@@ -17,25 +17,86 @@
|
|
|
|
package org.apache.rocketmq.client.latency;
|
|
|
|
-import org.apache.commons.lang3.StringUtils;
|
|
+import org.apache.rocketmq.client.ClientConfig;
|
|
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
|
|
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo.QueueFilter;
|
|
import org.apache.rocketmq.common.message.MessageQueue;
|
|
-import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
|
|
public class MQFaultStrategy {
|
|
- private final static Logger log = LoggerFactory.getLogger(MQFaultStrategy.class);
|
|
- private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
|
|
+ private LatencyFaultTolerance<String> latencyFaultTolerance;
|
|
+ private boolean sendLatencyFaultEnable;
|
|
+ private boolean startDetectorEnable;
|
|
+ private long[] latencyMax = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L};
|
|
+ private long[] notAvailableDuration = {0L, 0L, 2000L, 5000L, 6000L, 10000L, 30000L};
|
|
|
|
- private boolean sendLatencyFaultEnable = false;
|
|
+ public static class BrokerFilter implements QueueFilter {
|
|
+ private String lastBrokerName;
|
|
+
|
|
+ public void setLastBrokerName(String lastBrokerName) {
|
|
+ this.lastBrokerName = lastBrokerName;
|
|
+ }
|
|
+
|
|
+ @Override public boolean filter(MessageQueue mq) {
|
|
+ if (lastBrokerName != null) {
|
|
+ return !mq.getBrokerName().equals(lastBrokerName);
|
|
+ }
|
|
+ return true;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private ThreadLocal<BrokerFilter> threadBrokerFilter = new ThreadLocal<BrokerFilter>() {
|
|
+ @Override protected BrokerFilter initialValue() {
|
|
+ return new BrokerFilter();
|
|
+ }
|
|
+ };
|
|
+
|
|
+ private QueueFilter reachableFilter = new QueueFilter() {
|
|
+ @Override public boolean filter(MessageQueue mq) {
|
|
+ return latencyFaultTolerance.isReachable(mq.getBrokerName());
|
|
+ }
|
|
+ };
|
|
+
|
|
+ private QueueFilter availableFilter = new QueueFilter() {
|
|
+ @Override public boolean filter(MessageQueue mq) {
|
|
+ return latencyFaultTolerance.isAvailable(mq.getBrokerName());
|
|
+ }
|
|
+ };
|
|
+
|
|
+
|
|
+ public MQFaultStrategy(ClientConfig cc, Resolver fetcher, ServiceDetector serviceDetector) {
|
|
+ this.setStartDetectorEnable(cc.isStartDetectorEnable());
|
|
+ this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
|
|
+ this.latencyFaultTolerance = new LatencyFaultToleranceImpl(fetcher, serviceDetector);
|
|
+ this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval());
|
|
+ this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout());
|
|
+ }
|
|
+
|
|
+ // For unit test.
|
|
+ public MQFaultStrategy(ClientConfig cc, LatencyFaultTolerance<String> tolerance) {
|
|
+ this.setStartDetectorEnable(cc.isStartDetectorEnable());
|
|
+ this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
|
|
+ this.latencyFaultTolerance = tolerance;
|
|
+ this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval());
|
|
+ this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout());
|
|
+ }
|
|
|
|
- private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
|
|
- private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
|
|
|
|
public long[] getNotAvailableDuration() {
|
|
return notAvailableDuration;
|
|
}
|
|
|
|
+ public QueueFilter getAvailableFilter() {
|
|
+ return availableFilter;
|
|
+ }
|
|
+
|
|
+ public QueueFilter getReachableFilter() {
|
|
+ return reachableFilter;
|
|
+ }
|
|
+
|
|
+ public ThreadLocal<BrokerFilter> getThreadBrokerFilter() {
|
|
+ return threadBrokerFilter;
|
|
+ }
|
|
+
|
|
public void setNotAvailableDuration(final long[] notAvailableDuration) {
|
|
this.notAvailableDuration = notAvailableDuration;
|
|
}
|
|
@@ -56,51 +117,69 @@ public class MQFaultStrategy {
|
|
this.sendLatencyFaultEnable = sendLatencyFaultEnable;
|
|
}
|
|
|
|
- public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
|
|
+ public boolean isStartDetectorEnable() {
|
|
+ return startDetectorEnable;
|
|
+ }
|
|
+
|
|
+ public void setStartDetectorEnable(boolean startDetectorEnable) {
|
|
+ this.startDetectorEnable = startDetectorEnable;
|
|
+ }
|
|
+
|
|
+ public void startDetector() {
|
|
+ // user should start the detector
|
|
+ // and the thread should not be in running state.
|
|
+ if (this.sendLatencyFaultEnable && this.startDetectorEnable) {
|
|
+ // start the detector.
|
|
+ this.latencyFaultTolerance.startDetector();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public void shutdown() {
|
|
+ if (this.sendLatencyFaultEnable && this.startDetectorEnable) {
|
|
+ this.latencyFaultTolerance.shutdown();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
|
|
+ BrokerFilter brokerFilter = threadBrokerFilter.get();
|
|
+ brokerFilter.setLastBrokerName(lastBrokerName);
|
|
if (this.sendLatencyFaultEnable) {
|
|
- try {
|
|
- int index = tpInfo.getSendWhichQueue().incrementAndGet();
|
|
- for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
|
|
- int pos = index++ % tpInfo.getMessageQueueList().size();
|
|
- MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
|
|
- if (!StringUtils.equals(lastBrokerName, mq.getBrokerName()) && latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
|
|
- return mq;
|
|
- }
|
|
- }
|
|
-
|
|
- final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
|
|
- int writeQueueNums = tpInfo.getWriteQueueNumsByBroker(notBestBroker);
|
|
- if (writeQueueNums > 0) {
|
|
- final MessageQueue mq = tpInfo.selectOneMessageQueue();
|
|
- if (notBestBroker != null) {
|
|
- mq.setBrokerName(notBestBroker);
|
|
- mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
|
|
- }
|
|
- return mq;
|
|
- } else {
|
|
- latencyFaultTolerance.remove(notBestBroker);
|
|
- }
|
|
- } catch (Exception e) {
|
|
- log.error("Error occurred when selecting message queue", e);
|
|
+ if (resetIndex) {
|
|
+ tpInfo.resetIndex();
|
|
+ }
|
|
+ MessageQueue mq = tpInfo.selectOneMessageQueue(availableFilter, brokerFilter);
|
|
+ if (mq != null) {
|
|
+ return mq;
|
|
+ }
|
|
+
|
|
+ mq = tpInfo.selectOneMessageQueue(reachableFilter, brokerFilter);
|
|
+ if (mq != null) {
|
|
+ return mq;
|
|
}
|
|
|
|
return tpInfo.selectOneMessageQueue();
|
|
}
|
|
|
|
- return tpInfo.selectOneMessageQueue(lastBrokerName);
|
|
+ MessageQueue mq = tpInfo.selectOneMessageQueue(brokerFilter);
|
|
+ if (mq != null) {
|
|
+ return mq;
|
|
+ }
|
|
+ return tpInfo.selectOneMessageQueue();
|
|
}
|
|
|
|
- public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
|
|
+ public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation,
|
|
+ final boolean reachable) {
|
|
if (this.sendLatencyFaultEnable) {
|
|
- long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
|
|
- this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
|
|
+ long duration = computeNotAvailableDuration(isolation ? 10000 : currentLatency);
|
|
+ this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration, reachable);
|
|
}
|
|
}
|
|
|
|
private long computeNotAvailableDuration(final long currentLatency) {
|
|
for (int i = latencyMax.length - 1; i >= 0; i--) {
|
|
- if (currentLatency >= latencyMax[i])
|
|
+ if (currentLatency >= latencyMax[i]) {
|
|
return this.notAvailableDuration[i];
|
|
+ }
|
|
}
|
|
|
|
return 0;
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java b/client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java
|
|
similarity index 65%
|
|
rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java
|
|
rename to client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java
|
|
index 6aa547047..1c29ba334 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/GetRemoteClientConfigBody.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/Resolver.java
|
|
@@ -14,20 +14,9 @@
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
-package org.apache.rocketmq.remoting.protocol.body;
|
|
+package org.apache.rocketmq.client.latency;
|
|
|
|
-import java.util.ArrayList;
|
|
-import java.util.List;
|
|
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
|
|
+public interface Resolver {
|
|
|
|
-public class GetRemoteClientConfigBody extends RemotingSerializable {
|
|
- private List<String> keys = new ArrayList<>();
|
|
-
|
|
- public List<String> getKeys() {
|
|
- return keys;
|
|
- }
|
|
-
|
|
- public void setKeys(List<String> keys) {
|
|
- this.keys = keys;
|
|
- }
|
|
+ String resolve(String name);
|
|
}
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java b/client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java
|
|
new file mode 100644
|
|
index 000000000..c6ffbad1c
|
|
--- /dev/null
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/ServiceDetector.java
|
|
@@ -0,0 +1,30 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+package org.apache.rocketmq.client.latency;
|
|
+
|
|
+/**
|
|
+ * Detect whether the remote service state is normal.
|
|
+ */
|
|
+public interface ServiceDetector {
|
|
+
|
|
+ /**
|
|
+ * Check if the remote service is normal.
|
|
+ * @param endpoint Service endpoint to check against
|
|
+ * @return true if the service is back to normal; false otherwise.
|
|
+ */
|
|
+ boolean detect(String endpoint, long timeoutMillis);
|
|
+}
|
|
diff --git a/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java b/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java
|
|
index 86690e40b..42ccdae5a 100644
|
|
--- a/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java
|
|
+++ b/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java
|
|
@@ -16,11 +16,14 @@
|
|
*/
|
|
package org.apache.rocketmq.client.latency;
|
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
+import org.awaitility.core.ThrowingRunnable;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
+
|
|
import static org.assertj.core.api.Assertions.assertThat;
|
|
+import static org.awaitility.Awaitility.await;
|
|
|
|
public class LatencyFaultToleranceImplTest {
|
|
private LatencyFaultTolerance<String> latencyFaultTolerance;
|
|
@@ -29,28 +32,31 @@ public class LatencyFaultToleranceImplTest {
|
|
|
|
@Before
|
|
public void init() {
|
|
- latencyFaultTolerance = new LatencyFaultToleranceImpl();
|
|
+ latencyFaultTolerance = new LatencyFaultToleranceImpl(null, null);
|
|
}
|
|
|
|
@Test
|
|
public void testUpdateFaultItem() throws Exception {
|
|
- latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000);
|
|
+ latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000, true);
|
|
assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse();
|
|
assertThat(latencyFaultTolerance.isAvailable(anotherBrokerName)).isTrue();
|
|
}
|
|
|
|
@Test
|
|
public void testIsAvailable() throws Exception {
|
|
- latencyFaultTolerance.updateFaultItem(brokerName, 3000, 50);
|
|
+ latencyFaultTolerance.updateFaultItem(brokerName, 3000, 50, true);
|
|
assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse();
|
|
|
|
- TimeUnit.MILLISECONDS.sleep(70);
|
|
- assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue();
|
|
+ await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(new ThrowingRunnable() {
|
|
+ @Override public void run() throws Throwable {
|
|
+ assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue();
|
|
+ }
|
|
+ });
|
|
}
|
|
|
|
@Test
|
|
public void testRemove() throws Exception {
|
|
- latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000);
|
|
+ latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000, true);
|
|
assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse();
|
|
latencyFaultTolerance.remove(brokerName);
|
|
assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue();
|
|
@@ -58,10 +64,20 @@ public class LatencyFaultToleranceImplTest {
|
|
|
|
@Test
|
|
public void testPickOneAtLeast() throws Exception {
|
|
- latencyFaultTolerance.updateFaultItem(brokerName, 1000, 3000);
|
|
+ latencyFaultTolerance.updateFaultItem(brokerName, 1000, 3000, true);
|
|
assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName);
|
|
|
|
- latencyFaultTolerance.updateFaultItem(anotherBrokerName, 1001, 3000);
|
|
- assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName);
|
|
+ // Bad case, since pickOneAtLeast's behavior becomes random
|
|
+ // latencyFaultTolerance.updateFaultItem(anotherBrokerName, 1001, 3000, "127.0.0.1:12011", true);
|
|
+ // assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName);
|
|
+ }
|
|
+
|
|
+ @Test
|
|
+ public void testIsReachable() throws Exception {
|
|
+ latencyFaultTolerance.updateFaultItem(brokerName, 1000, 3000, true);
|
|
+ assertThat(latencyFaultTolerance.isReachable(brokerName)).isEqualTo(true);
|
|
+
|
|
+ latencyFaultTolerance.updateFaultItem(anotherBrokerName, 1001, 3000, false);
|
|
+ assertThat(latencyFaultTolerance.isReachable(anotherBrokerName)).isEqualTo(false);
|
|
}
|
|
}
|
|
\ No newline at end of file
|
|
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
|
|
index fada0efd7..485b95c42 100644
|
|
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
|
|
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
|
|
@@ -41,7 +41,6 @@ import org.apache.rocketmq.remoting.protocol.RequestCode;
|
|
import org.apache.rocketmq.remoting.protocol.ResponseCode;
|
|
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
|
|
import org.apache.rocketmq.remoting.protocol.body.GetBrokerMemberGroupResponseBody;
|
|
-import org.apache.rocketmq.remoting.protocol.body.GetRemoteClientConfigBody;
|
|
import org.apache.rocketmq.remoting.protocol.body.RegisterBrokerBody;
|
|
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
|
|
import org.apache.rocketmq.remoting.protocol.body.TopicList;
|
|
@@ -132,8 +131,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
|
|
return this.updateConfig(ctx, request);
|
|
case RequestCode.GET_NAMESRV_CONFIG:
|
|
return this.getConfig(ctx, request);
|
|
- case RequestCode.GET_CLIENT_CONFIG:
|
|
- return this.getClientConfigs(ctx, request);
|
|
default:
|
|
String error = " request type " + request.getCode() + " not supported";
|
|
return RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
|
|
@@ -661,25 +658,4 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
|
|
return response;
|
|
}
|
|
|
|
- private RemotingCommand getClientConfigs(ChannelHandlerContext ctx, RemotingCommand request) {
|
|
- final RemotingCommand response = RemotingCommand.createResponseCommand(null);
|
|
- final GetRemoteClientConfigBody body = GetRemoteClientConfigBody.decode(request.getBody(), GetRemoteClientConfigBody.class);
|
|
-
|
|
- String content = this.namesrvController.getConfiguration().getClientConfigsFormatString(body.getKeys());
|
|
- if (StringUtils.isNotBlank(content)) {
|
|
- try {
|
|
- response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
|
|
- } catch (UnsupportedEncodingException e) {
|
|
- log.error("getConfig error, ", e);
|
|
- response.setCode(ResponseCode.SYSTEM_ERROR);
|
|
- response.setRemark("UnsupportedEncodingException " + e);
|
|
- return response;
|
|
- }
|
|
- }
|
|
-
|
|
- response.setCode(ResponseCode.SUCCESS);
|
|
- response.setRemark(null);
|
|
- return response;
|
|
- }
|
|
-
|
|
}
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
|
|
index 2994893d7..b2478fec3 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
|
|
@@ -232,6 +232,12 @@ public class ProxyConfig implements ConfigFile {
|
|
private String remotingAccessAddr = "";
|
|
private int remotingListenPort = 8080;
|
|
|
|
+ // related to proxy's send strategy in cluster mode.
|
|
+ private boolean sendLatencyEnable = false;
|
|
+ private boolean startDetectorEnable = false;
|
|
+ private int detectTimeout = 200;
|
|
+ private int detectInterval = 2 * 1000;
|
|
+
|
|
private int remotingHeartbeatThreadPoolNums = 2 * PROCESSOR_NUMBER;
|
|
private int remotingTopicRouteThreadPoolNums = 2 * PROCESSOR_NUMBER;
|
|
private int remotingSendMessageThreadPoolNums = 4 * PROCESSOR_NUMBER;
|
|
@@ -1409,6 +1415,46 @@ public class ProxyConfig implements ConfigFile {
|
|
this.remotingWaitTimeMillsInDefaultQueue = remotingWaitTimeMillsInDefaultQueue;
|
|
}
|
|
|
|
+ public boolean isSendLatencyEnable() {
|
|
+ return sendLatencyEnable;
|
|
+ }
|
|
+
|
|
+ public boolean isStartDetectorEnable() {
|
|
+ return startDetectorEnable;
|
|
+ }
|
|
+
|
|
+ public void setStartDetectorEnable(boolean startDetectorEnable) {
|
|
+ this.startDetectorEnable = startDetectorEnable;
|
|
+ }
|
|
+
|
|
+ public void setSendLatencyEnable(boolean sendLatencyEnable) {
|
|
+ this.sendLatencyEnable = sendLatencyEnable;
|
|
+ }
|
|
+
|
|
+ public boolean getStartDetectorEnable() {
|
|
+ return this.startDetectorEnable;
|
|
+ }
|
|
+
|
|
+ public boolean getSendLatencyEnable() {
|
|
+ return this.sendLatencyEnable;
|
|
+ }
|
|
+
|
|
+ public int getDetectTimeout() {
|
|
+ return detectTimeout;
|
|
+ }
|
|
+
|
|
+ public void setDetectTimeout(int detectTimeout) {
|
|
+ this.detectTimeout = detectTimeout;
|
|
+ }
|
|
+
|
|
+ public int getDetectInterval() {
|
|
+ return detectInterval;
|
|
+ }
|
|
+
|
|
+ public void setDetectInterval(int detectInterval) {
|
|
+ this.detectInterval = detectInterval;
|
|
+ }
|
|
+
|
|
public boolean isEnableBatchAck() {
|
|
return enableBatchAck;
|
|
}
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
|
|
index 6146c80cd..f670df205 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
|
|
@@ -382,7 +382,7 @@ public class SendMessageActivity extends AbstractMessingActivity {
|
|
int bucket = Hashing.consistentHash(shardingKey.hashCode(), writeQueues.size());
|
|
targetMessageQueue = writeQueues.get(bucket);
|
|
} else {
|
|
- targetMessageQueue = messageQueueView.getWriteSelector().selectOne(false);
|
|
+ targetMessageQueue = messageQueueView.getWriteSelector().selectOneByPipeline(false);
|
|
}
|
|
return targetMessageQueue;
|
|
} catch (Exception e) {
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
|
|
index 0d0c62168..a80f6df0b 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
|
|
@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.processor;
|
|
import java.util.List;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.ExecutorService;
|
|
+
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.rocketmq.client.producer.SendResult;
|
|
import org.apache.rocketmq.client.producer.SendStatus;
|
|
@@ -66,6 +67,8 @@ public class ProducerProcessor extends AbstractProcessor {
|
|
public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, QueueSelector queueSelector,
|
|
String producerGroup, int sysFlag, List<Message> messageList, long timeoutMillis) {
|
|
CompletableFuture<List<SendResult>> future = new CompletableFuture<>();
|
|
+ long beginTimestampFirst = System.currentTimeMillis();
|
|
+ AddressableMessageQueue messageQueue = null;
|
|
try {
|
|
Message message = messageList.get(0);
|
|
String topic = message.getTopic();
|
|
@@ -79,7 +82,7 @@ public class ProducerProcessor extends AbstractProcessor {
|
|
}
|
|
}
|
|
}
|
|
- AddressableMessageQueue messageQueue = queueSelector.select(ctx,
|
|
+ messageQueue = queueSelector.select(ctx,
|
|
this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(ctx, topic));
|
|
if (messageQueue == null) {
|
|
throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no writable queue");
|
|
@@ -90,6 +93,7 @@ public class ProducerProcessor extends AbstractProcessor {
|
|
}
|
|
SendMessageRequestHeader requestHeader = buildSendMessageRequestHeader(messageList, producerGroup, sysFlag, messageQueue.getQueueId());
|
|
|
|
+ AddressableMessageQueue finalMessageQueue = messageQueue;
|
|
future = this.serviceManager.getMessageService().sendMessage(
|
|
ctx,
|
|
messageQueue,
|
|
@@ -102,11 +106,19 @@ public class ProducerProcessor extends AbstractProcessor {
|
|
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus()) &&
|
|
tranType == MessageSysFlag.TRANSACTION_PREPARED_TYPE &&
|
|
StringUtils.isNotBlank(sendResult.getTransactionId())) {
|
|
- fillTransactionData(ctx, producerGroup, messageQueue, sendResult, messageList);
|
|
+ fillTransactionData(ctx, producerGroup, finalMessageQueue, sendResult, messageList);
|
|
}
|
|
}
|
|
return sendResultList;
|
|
- }, this.executor);
|
|
+ }, this.executor)
|
|
+ .whenComplete((result, exception) -> {
|
|
+ long endTimestamp = System.currentTimeMillis();
|
|
+ if (exception != null) {
|
|
+ this.serviceManager.getTopicRouteService().updateFaultItem(finalMessageQueue.getBrokerName(), endTimestamp - beginTimestampFirst, true, false);
|
|
+ } else {
|
|
+ this.serviceManager.getTopicRouteService().updateFaultItem(finalMessageQueue.getBrokerName(),endTimestamp - beginTimestampFirst, false, true);
|
|
+ }
|
|
+ });
|
|
} catch (Throwable t) {
|
|
future.completeExceptionally(t);
|
|
}
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
|
|
index d67b68f38..aced15cee 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
|
|
@@ -54,7 +54,7 @@ public class LocalTopicRouteService extends TopicRouteService {
|
|
@Override
|
|
public MessageQueueView getCurrentMessageQueueView(ProxyContext ctx, String topic) throws Exception {
|
|
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic);
|
|
- return new MessageQueueView(topic, toTopicRouteData(topicConfig));
|
|
+ return new MessageQueueView(topic, toTopicRouteData(topicConfig), null);
|
|
}
|
|
|
|
@Override
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
|
|
index 85cd18d45..f25fb907e 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
|
|
@@ -17,6 +17,7 @@
|
|
package org.apache.rocketmq.proxy.service.route;
|
|
|
|
import com.google.common.base.MoreObjects;
|
|
+import com.google.common.base.Preconditions;
|
|
import com.google.common.math.IntMath;
|
|
import java.util.ArrayList;
|
|
import java.util.Collections;
|
|
@@ -30,6 +31,8 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.stream.Collectors;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
|
|
+import org.apache.rocketmq.client.latency.MQFaultStrategy;
|
|
import org.apache.rocketmq.common.constant.PermName;
|
|
import org.apache.rocketmq.common.message.MessageQueue;
|
|
import org.apache.rocketmq.remoting.protocol.route.QueueData;
|
|
@@ -44,8 +47,9 @@ public class MessageQueueSelector {
|
|
private final Map<String, AddressableMessageQueue> brokerNameQueueMap = new ConcurrentHashMap<>();
|
|
private final AtomicInteger queueIndex;
|
|
private final AtomicInteger brokerIndex;
|
|
+ private MQFaultStrategy mqFaultStrategy;
|
|
|
|
- public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean read) {
|
|
+ public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, MQFaultStrategy mqFaultStrategy, boolean read) {
|
|
if (read) {
|
|
this.queues.addAll(buildRead(topicRouteWrapper));
|
|
} else {
|
|
@@ -55,6 +59,7 @@ public class MessageQueueSelector {
|
|
Random random = new Random();
|
|
this.queueIndex = new AtomicInteger(random.nextInt());
|
|
this.brokerIndex = new AtomicInteger(random.nextInt());
|
|
+ this.mqFaultStrategy = mqFaultStrategy;
|
|
}
|
|
|
|
private static List<AddressableMessageQueue> buildRead(TopicRouteWrapper topicRoute) {
|
|
@@ -154,6 +159,86 @@ public class MessageQueueSelector {
|
|
return selectOneByIndex(nextIndex, onlyBroker);
|
|
}
|
|
|
|
+ public AddressableMessageQueue selectOneByPipeline(boolean onlyBroker) {
|
|
+ if (mqFaultStrategy != null && mqFaultStrategy.isSendLatencyFaultEnable()) {
|
|
+ List<MessageQueue> messageQueueList = null;
|
|
+ MessageQueue messageQueue = null;
|
|
+ if (onlyBroker) {
|
|
+ messageQueueList = transferAddressableQueues(brokerActingQueues);
|
|
+ } else {
|
|
+ messageQueueList = transferAddressableQueues(queues);
|
|
+ }
|
|
+ AddressableMessageQueue addressableMessageQueue = null;
|
|
+
|
|
+ // use both available filter.
|
|
+ messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex,
|
|
+ mqFaultStrategy.getAvailableFilter(), mqFaultStrategy.getReachableFilter());
|
|
+ addressableMessageQueue = transferQueue2Addressable(messageQueue);
|
|
+ if (addressableMessageQueue != null) {
|
|
+ return addressableMessageQueue;
|
|
+ }
|
|
+
|
|
+ // use available filter.
|
|
+ messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex,
|
|
+ mqFaultStrategy.getAvailableFilter());
|
|
+ addressableMessageQueue = transferQueue2Addressable(messageQueue);
|
|
+ if (addressableMessageQueue != null) {
|
|
+ return addressableMessageQueue;
|
|
+ }
|
|
+
|
|
+ // no available filter, then use reachable filter.
|
|
+ messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex,
|
|
+ mqFaultStrategy.getReachableFilter());
|
|
+ addressableMessageQueue = transferQueue2Addressable(messageQueue);
|
|
+ if (addressableMessageQueue != null) {
|
|
+ return addressableMessageQueue;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ // SendLatency is not enabled, or no queue is selected, then select by index.
|
|
+ return selectOne(onlyBroker);
|
|
+ }
|
|
+
|
|
+ private MessageQueue selectOneMessageQueue(List<MessageQueue> messageQueueList, AtomicInteger sendQueue, TopicPublishInfo.QueueFilter...filter) {
|
|
+ if (messageQueueList == null || messageQueueList.isEmpty()) {
|
|
+ return null;
|
|
+ }
|
|
+ if (filter != null && filter.length != 0) {
|
|
+ for (int i = 0; i < messageQueueList.size(); i++) {
|
|
+ int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
|
|
+ MessageQueue mq = messageQueueList.get(index);
|
|
+ boolean filterResult = true;
|
|
+ for (TopicPublishInfo.QueueFilter f: filter) {
|
|
+ Preconditions.checkNotNull(f);
|
|
+ filterResult &= f.filter(mq);
|
|
+ }
|
|
+ if (filterResult) {
|
|
+ return mq;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ public List<MessageQueue> transferAddressableQueues(List<AddressableMessageQueue> addressableMessageQueueList) {
|
|
+ if (addressableMessageQueueList == null) {
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ return addressableMessageQueueList.stream()
|
|
+ .map(AddressableMessageQueue::getMessageQueue)
|
|
+ .collect(Collectors.toList());
|
|
+ }
|
|
+
|
|
+ private AddressableMessageQueue transferQueue2Addressable(MessageQueue messageQueue) {
|
|
+ for (AddressableMessageQueue amq: queues) {
|
|
+ if (amq.getMessageQueue().equals(messageQueue)) {
|
|
+ return amq;
|
|
+ }
|
|
+ }
|
|
+ return null;
|
|
+ }
|
|
+
|
|
public AddressableMessageQueue selectNextOne(AddressableMessageQueue last) {
|
|
boolean onlyBroker = last.getQueueId() < 0;
|
|
AddressableMessageQueue newOne = last;
|
|
@@ -190,6 +275,14 @@ public class MessageQueueSelector {
|
|
return brokerActingQueues;
|
|
}
|
|
|
|
+ public MQFaultStrategy getMQFaultStrategy() {
|
|
+ return mqFaultStrategy;
|
|
+ }
|
|
+
|
|
+ public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) {
|
|
+ this.mqFaultStrategy = mqFaultStrategy;
|
|
+ }
|
|
+
|
|
@Override
|
|
public boolean equals(Object o) {
|
|
if (this == o) {
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
|
|
index fe5387cfd..8b3c2f7c8 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
|
|
@@ -17,20 +17,22 @@
|
|
package org.apache.rocketmq.proxy.service.route;
|
|
|
|
import com.google.common.base.MoreObjects;
|
|
+import org.apache.rocketmq.client.latency.MQFaultStrategy;
|
|
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
|
|
|
|
public class MessageQueueView {
|
|
- public static final MessageQueueView WRAPPED_EMPTY_QUEUE = new MessageQueueView("", new TopicRouteData());
|
|
+ public static final MessageQueueView WRAPPED_EMPTY_QUEUE = new MessageQueueView("", new TopicRouteData(), null);
|
|
|
|
private final MessageQueueSelector readSelector;
|
|
private final MessageQueueSelector writeSelector;
|
|
private final TopicRouteWrapper topicRouteWrapper;
|
|
+ private MQFaultStrategy mqFaultStrategy;
|
|
|
|
- public MessageQueueView(String topic, TopicRouteData topicRouteData) {
|
|
+ public MessageQueueView(String topic, TopicRouteData topicRouteData, MQFaultStrategy mqFaultStrategy) {
|
|
this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic);
|
|
|
|
- this.readSelector = new MessageQueueSelector(topicRouteWrapper, true);
|
|
- this.writeSelector = new MessageQueueSelector(topicRouteWrapper, false);
|
|
+ this.readSelector = new MessageQueueSelector(topicRouteWrapper, mqFaultStrategy, true);
|
|
+ this.writeSelector = new MessageQueueSelector(topicRouteWrapper, mqFaultStrategy, false);
|
|
}
|
|
|
|
public TopicRouteData getTopicRouteData() {
|
|
@@ -65,4 +67,12 @@ public class MessageQueueView {
|
|
.add("topicRouteWrapper", topicRouteWrapper)
|
|
.toString();
|
|
}
|
|
+
|
|
+ public MQFaultStrategy getMQFaultStrategy() {
|
|
+ return mqFaultStrategy;
|
|
+ }
|
|
+
|
|
+ public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) {
|
|
+ this.mqFaultStrategy = mqFaultStrategy;
|
|
+ }
|
|
}
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
|
|
index 84348adc3..74769a423 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
|
|
@@ -25,7 +25,13 @@ import java.util.concurrent.Executors;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
+
|
|
+import com.google.common.base.Optional;
|
|
+import org.apache.rocketmq.client.ClientConfig;
|
|
import org.apache.rocketmq.client.exception.MQClientException;
|
|
+import org.apache.rocketmq.client.latency.MQFaultStrategy;
|
|
+import org.apache.rocketmq.client.latency.Resolver;
|
|
+import org.apache.rocketmq.client.latency.ServiceDetector;
|
|
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
@@ -39,6 +45,7 @@ import org.apache.rocketmq.proxy.common.ProxyContext;
|
|
import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
|
import org.apache.rocketmq.proxy.config.ProxyConfig;
|
|
import org.apache.rocketmq.remoting.protocol.ResponseCode;
|
|
+import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
|
|
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
|
|
import org.checkerframework.checker.nullness.qual.NonNull;
|
|
import org.checkerframework.checker.nullness.qual.Nullable;
|
|
@@ -47,6 +54,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
|
|
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
|
|
|
|
private final MQClientAPIFactory mqClientAPIFactory;
|
|
+ private MQFaultStrategy mqFaultStrategy;
|
|
|
|
protected final LoadingCache<String /* topicName */, MessageQueueView> topicCache;
|
|
protected final ScheduledExecutorService scheduledExecutorService;
|
|
@@ -97,15 +105,83 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
|
|
}
|
|
}
|
|
});
|
|
-
|
|
+ ServiceDetector serviceDetector = new ServiceDetector() {
|
|
+ @Override
|
|
+ public boolean detect(String endpoint, long timeoutMillis) {
|
|
+ Optional<String> candidateTopic = pickTopic();
|
|
+ if (!candidateTopic.isPresent()) {
|
|
+ return false;
|
|
+ }
|
|
+ try {
|
|
+ GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
|
|
+ requestHeader.setTopic(candidateTopic.get());
|
|
+ requestHeader.setQueueId(0);
|
|
+ Long maxOffset = mqClientAPIFactory.getClient().getMaxOffset(endpoint, requestHeader, timeoutMillis).get();
|
|
+ return true;
|
|
+ } catch (Exception e) {
|
|
+ return false;
|
|
+ }
|
|
+ }
|
|
+ };
|
|
+ mqFaultStrategy = new MQFaultStrategy(extractClientConfigFromProxyConfig(config), new Resolver() {
|
|
+ @Override
|
|
+ public String resolve(String name) {
|
|
+ try {
|
|
+ String brokerAddr = getBrokerAddr(null, name);
|
|
+ return brokerAddr;
|
|
+ } catch (Exception e) {
|
|
+ return null;
|
|
+ }
|
|
+ }
|
|
+ }, serviceDetector);
|
|
this.init();
|
|
}
|
|
|
|
+ // pickup one topic in the topic cache
|
|
+ private Optional<String> pickTopic() {
|
|
+ if (topicCache.asMap().isEmpty()) {
|
|
+ return Optional.absent();
|
|
+ }
|
|
+ return Optional.of(topicCache.asMap().keySet().iterator().next());
|
|
+ }
|
|
+
|
|
protected void init() {
|
|
this.appendShutdown(this.scheduledExecutorService::shutdown);
|
|
this.appendStartAndShutdown(this.mqClientAPIFactory);
|
|
}
|
|
|
|
+ @Override
|
|
+ public void shutdown() throws Exception {
|
|
+ if (this.mqFaultStrategy.isStartDetectorEnable()) {
|
|
+ mqFaultStrategy.shutdown();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void start() throws Exception {
|
|
+ if (this.mqFaultStrategy.isStartDetectorEnable()) {
|
|
+ this.mqFaultStrategy.startDetector();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public ClientConfig extractClientConfigFromProxyConfig(ProxyConfig proxyConfig) {
|
|
+ ClientConfig tempClientConfig = new ClientConfig();
|
|
+ tempClientConfig.setSendLatencyEnable(proxyConfig.getSendLatencyEnable());
|
|
+ tempClientConfig.setStartDetectorEnable(proxyConfig.getStartDetectorEnable());
|
|
+ tempClientConfig.setDetectTimeout(proxyConfig.getDetectTimeout());
|
|
+ tempClientConfig.setDetectInterval(proxyConfig.getDetectInterval());
|
|
+ return tempClientConfig;
|
|
+ }
|
|
+
|
|
+ public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation,
|
|
+ boolean reachable) {
|
|
+ this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable);
|
|
+ }
|
|
+
|
|
+ public MQFaultStrategy getMqFaultStrategy() {
|
|
+ return this.mqFaultStrategy;
|
|
+ }
|
|
+
|
|
public MessageQueueView getAllMessageQueueView(ProxyContext ctx, String topicName) throws Exception {
|
|
return getCacheMessageQueueWrapper(this.topicCache, topicName);
|
|
}
|
|
@@ -136,7 +212,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
|
|
|
|
protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData topicRouteData) {
|
|
if (isTopicRouteValid(topicRouteData)) {
|
|
- MessageQueueView tmp = new MessageQueueView(topic, topicRouteData);
|
|
+ MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, TopicRouteService.this.getMqFaultStrategy());
|
|
log.debug("load topic route from namesrv. topic: {}, queue: {}", topic, tmp);
|
|
return tmp;
|
|
}
|
|
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
|
|
index 7fd9a9ffd..77ae5e4d1 100644
|
|
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
|
|
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
|
|
@@ -93,7 +93,6 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
|
|
pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong()))
|
|
.thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList())));
|
|
|
|
-
|
|
ProxyContext context = createContext();
|
|
context.setRemainingMs(1L);
|
|
this.receiveMessageActivity.receiveMessage(
|
|
@@ -274,7 +273,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
|
|
}
|
|
|
|
@Test
|
|
- public void testReceiveMessageQueueSelector() {
|
|
+ public void testReceiveMessageQueueSelector() throws Exception {
|
|
TopicRouteData topicRouteData = new TopicRouteData();
|
|
List<QueueData> queueDatas = new ArrayList<>();
|
|
for (int i = 0; i < 2; i++) {
|
|
@@ -298,7 +297,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
|
|
}
|
|
topicRouteData.setBrokerDatas(brokerDatas);
|
|
|
|
- MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData);
|
|
+ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null);
|
|
ReceiveMessageActivity.ReceiveMessageQueueSelector selector = new ReceiveMessageActivity.ReceiveMessageQueueSelector("");
|
|
|
|
AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView);
|
|
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
|
|
index 588423bb9..4882a5ed8 100644
|
|
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
|
|
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
|
|
@@ -35,6 +35,8 @@ import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
+import org.apache.rocketmq.client.ClientConfig;
|
|
+import org.apache.rocketmq.client.latency.MQFaultStrategy;
|
|
import org.apache.rocketmq.client.producer.SendResult;
|
|
import org.apache.rocketmq.client.producer.SendStatus;
|
|
import org.apache.rocketmq.common.MixAll;
|
|
@@ -49,6 +51,7 @@ import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
|
|
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcProxyException;
|
|
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
|
|
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
|
|
+import org.apache.rocketmq.proxy.service.route.TopicRouteService;
|
|
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
|
|
import org.apache.rocketmq.remoting.protocol.route.QueueData;
|
|
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
|
|
@@ -62,15 +65,19 @@ import static org.junit.Assert.assertThrows;
|
|
import static org.mockito.ArgumentMatchers.any;
|
|
import static org.mockito.ArgumentMatchers.anyInt;
|
|
import static org.mockito.ArgumentMatchers.anyString;
|
|
+import static org.mockito.Mockito.mock;
|
|
import static org.mockito.Mockito.when;
|
|
|
|
public class SendMessageActivityTest extends BaseActivityTest {
|
|
|
|
protected static final String BROKER_NAME = "broker";
|
|
+ protected static final String BROKER_NAME2 = "broker2";
|
|
protected static final String CLUSTER_NAME = "cluster";
|
|
protected static final String BROKER_ADDR = "127.0.0.1:10911";
|
|
+ protected static final String BROKER_ADDR2 = "127.0.0.1:10912";
|
|
private static final String TOPIC = "topic";
|
|
private static final String CONSUMER_GROUP = "consumerGroup";
|
|
+ MQFaultStrategy mqFaultStrategy;
|
|
|
|
private SendMessageActivity sendMessageActivity;
|
|
|
|
@@ -262,7 +269,7 @@ public class SendMessageActivityTest extends BaseActivityTest {
|
|
}
|
|
|
|
@Test
|
|
- public void testSendOrderMessageQueueSelector() {
|
|
+ public void testSendOrderMessageQueueSelector() throws Exception {
|
|
TopicRouteData topicRouteData = new TopicRouteData();
|
|
QueueData queueData = new QueueData();
|
|
BrokerData brokerData = new BrokerData();
|
|
@@ -277,7 +284,7 @@ public class SendMessageActivityTest extends BaseActivityTest {
|
|
brokerData.setBrokerAddrs(brokerAddrs);
|
|
topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData));
|
|
|
|
- MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData);
|
|
+ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null);
|
|
SendMessageActivity.SendMessageQueueSelector selector1 = new SendMessageActivity.SendMessageQueueSelector(
|
|
SendMessageRequest.newBuilder()
|
|
.addMessages(Message.newBuilder()
|
|
@@ -288,6 +295,12 @@ public class SendMessageActivityTest extends BaseActivityTest {
|
|
.build()
|
|
);
|
|
|
|
+ TopicRouteService topicRouteService = mock(TopicRouteService.class);
|
|
+ MQFaultStrategy mqFaultStrategy = mock(MQFaultStrategy.class);
|
|
+ when(topicRouteService.getAllMessageQueueView(any(), any())).thenReturn(messageQueueView);
|
|
+ when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy);
|
|
+ when(mqFaultStrategy.isSendLatencyFaultEnable()).thenReturn(false);
|
|
+
|
|
SendMessageActivity.SendMessageQueueSelector selector2 = new SendMessageActivity.SendMessageQueueSelector(
|
|
SendMessageRequest.newBuilder()
|
|
.addMessages(Message.newBuilder()
|
|
@@ -328,12 +341,17 @@ public class SendMessageActivityTest extends BaseActivityTest {
|
|
brokerData.setBrokerAddrs(brokerAddrs);
|
|
topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData));
|
|
|
|
- MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData);
|
|
+
|
|
SendMessageActivity.SendMessageQueueSelector selector = new SendMessageActivity.SendMessageQueueSelector(
|
|
SendMessageRequest.newBuilder()
|
|
.addMessages(Message.newBuilder().build())
|
|
.build()
|
|
);
|
|
+ TopicRouteService topicRouteService = mock(TopicRouteService.class);
|
|
+ MQFaultStrategy mqFaultStrategy = mock(MQFaultStrategy.class);
|
|
+ when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy);
|
|
+ when(mqFaultStrategy.isSendLatencyFaultEnable()).thenReturn(false);
|
|
+ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, topicRouteService.getMqFaultStrategy());
|
|
|
|
AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView);
|
|
AddressableMessageQueue secondSelect = selector.select(ProxyContext.create(), messageQueueView);
|
|
@@ -343,6 +361,45 @@ public class SendMessageActivityTest extends BaseActivityTest {
|
|
assertNotEquals(firstSelect, secondSelect);
|
|
}
|
|
|
|
+ @Test
|
|
+ public void testSendNormalMessageQueueSelectorPipeLine() throws Exception {
|
|
+ TopicRouteData topicRouteData = new TopicRouteData();
|
|
+ int queueNums = 2;
|
|
+
|
|
+ QueueData queueData = createQueueData(BROKER_NAME, queueNums);
|
|
+ QueueData queueData2 = createQueueData(BROKER_NAME2, queueNums);
|
|
+ topicRouteData.setQueueDatas(Lists.newArrayList(queueData,queueData2));
|
|
+
|
|
+
|
|
+ BrokerData brokerData = createBrokerData(CLUSTER_NAME, BROKER_NAME, BROKER_ADDR);
|
|
+ BrokerData brokerData2 = createBrokerData(CLUSTER_NAME, BROKER_NAME2, BROKER_ADDR2);
|
|
+ topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData, brokerData2));
|
|
+
|
|
+ SendMessageActivity.SendMessageQueueSelector selector = new SendMessageActivity.SendMessageQueueSelector(
|
|
+ SendMessageRequest.newBuilder()
|
|
+ .addMessages(Message.newBuilder().build())
|
|
+ .build()
|
|
+ );
|
|
+
|
|
+ ClientConfig cc = new ClientConfig();
|
|
+ this.mqFaultStrategy = new MQFaultStrategy(cc, null, null);
|
|
+ mqFaultStrategy.setSendLatencyFaultEnable(true);
|
|
+ mqFaultStrategy.updateFaultItem(BROKER_NAME2, 1000, true, true);
|
|
+ mqFaultStrategy.updateFaultItem(BROKER_NAME, 1000, true, false);
|
|
+
|
|
+ TopicRouteService topicRouteService = mock(TopicRouteService.class);
|
|
+ when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy);
|
|
+ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, topicRouteService.getMqFaultStrategy());
|
|
+
|
|
+
|
|
+ AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView);
|
|
+ assertEquals(firstSelect.getBrokerName(), BROKER_NAME2);
|
|
+
|
|
+ mqFaultStrategy.updateFaultItem(BROKER_NAME2, 1000, true, false);
|
|
+ mqFaultStrategy.updateFaultItem(BROKER_NAME, 1000, true, true);
|
|
+ AddressableMessageQueue secondSelect = selector.select(ProxyContext.create(), messageQueueView);
|
|
+ assertEquals(secondSelect.getBrokerName(), BROKER_NAME);
|
|
+ }
|
|
@Test
|
|
public void testParameterValidate() {
|
|
// too large message body
|
|
@@ -850,4 +907,23 @@ public class SendMessageActivityTest extends BaseActivityTest {
|
|
}
|
|
return sb.toString();
|
|
}
|
|
+
|
|
+ private static QueueData createQueueData(String brokerName, int writeQueueNums) {
|
|
+ QueueData queueData = new QueueData();
|
|
+ queueData.setBrokerName(brokerName);
|
|
+ queueData.setWriteQueueNums(writeQueueNums);
|
|
+ queueData.setPerm(PermName.PERM_WRITE);
|
|
+ return queueData;
|
|
+ }
|
|
+
|
|
+ private static BrokerData createBrokerData(String clusterName, String brokerName, String brokerAddrs) {
|
|
+ BrokerData brokerData = new BrokerData();
|
|
+ brokerData.setCluster(clusterName);
|
|
+ brokerData.setBrokerName(brokerName);
|
|
+ HashMap<Long, String> brokerAddrsMap = new HashMap<>();
|
|
+ brokerAddrsMap.put(MixAll.MASTER_ID, brokerAddrs);
|
|
+ brokerData.setBrokerAddrs(brokerAddrsMap);
|
|
+
|
|
+ return brokerData;
|
|
+ }
|
|
}
|
|
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java
|
|
index c97bd5a72..ca6fe909e 100644
|
|
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java
|
|
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java
|
|
@@ -78,7 +78,7 @@ public class BaseServiceTest extends InitConfigTest {
|
|
topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData));
|
|
|
|
when(this.topicRouteService.getAllMessageQueueView(any(), eq(ERR_TOPIC))).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, ""));
|
|
- when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData));
|
|
- when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, topicRouteData));
|
|
+ when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData, null));
|
|
+ when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, topicRouteData, null));
|
|
}
|
|
}
|
|
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java
|
|
index e44ed28f4..d150f87c4 100644
|
|
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java
|
|
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java
|
|
@@ -30,12 +30,12 @@ public class MessageQueueSelectorTest extends BaseServiceTest {
|
|
public void testReadMessageQueue() {
|
|
queueData.setPerm(PermName.PERM_READ);
|
|
queueData.setReadQueueNums(0);
|
|
- MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true);
|
|
+ MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true);
|
|
assertTrue(messageQueueSelector.getQueues().isEmpty());
|
|
|
|
queueData.setPerm(PermName.PERM_READ);
|
|
queueData.setReadQueueNums(3);
|
|
- messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true);
|
|
+ messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true);
|
|
assertEquals(3, messageQueueSelector.getQueues().size());
|
|
assertEquals(1, messageQueueSelector.getBrokerActingQueues().size());
|
|
for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) {
|
|
@@ -58,12 +58,12 @@ public class MessageQueueSelectorTest extends BaseServiceTest {
|
|
public void testWriteMessageQueue() {
|
|
queueData.setPerm(PermName.PERM_WRITE);
|
|
queueData.setReadQueueNums(0);
|
|
- MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false);
|
|
+ MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false);
|
|
assertTrue(messageQueueSelector.getQueues().isEmpty());
|
|
|
|
queueData.setPerm(PermName.PERM_WRITE);
|
|
queueData.setWriteQueueNums(3);
|
|
- messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false);
|
|
+ messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false);
|
|
assertEquals(3, messageQueueSelector.getQueues().size());
|
|
assertEquals(1, messageQueueSelector.getBrokerActingQueues().size());
|
|
for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) {
|
|
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
|
|
index c67f4953d..43fba3d03 100644
|
|
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
|
|
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
|
|
@@ -132,7 +132,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
|
|
brokerAddr.put(0L, "127.0.0.1:10911");
|
|
brokerData.setBrokerAddrs(brokerAddr);
|
|
topicRouteData.getBrokerDatas().add(brokerData);
|
|
- MessageQueueView messageQueueView = new MessageQueueView("foo", topicRouteData);
|
|
+ MessageQueueView messageQueueView = new MessageQueueView("foo", topicRouteData, null);
|
|
when(this.topicRouteService.getAllMessageQueueView(any(), anyString())).thenReturn(messageQueueView);
|
|
}
|
|
}
|
|
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
|
|
index a0063544e..91af74cbe 100644
|
|
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
|
|
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
|
|
@@ -64,7 +64,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest {
|
|
this.clusterTransactionService = new ClusterTransactionService(this.topicRouteService, this.producerManager,
|
|
this.mqClientAPIFactory);
|
|
|
|
- MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData);
|
|
+ MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null);
|
|
when(this.topicRouteService.getAllMessageQueueView(any(), anyString()))
|
|
.thenReturn(messageQueueView);
|
|
|
|
@@ -127,7 +127,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest {
|
|
brokerData.setBrokerAddrs(brokerAddrs);
|
|
topicRouteData.getQueueDatas().add(queueData);
|
|
topicRouteData.getBrokerDatas().add(brokerData);
|
|
- when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData));
|
|
+ when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData, null));
|
|
|
|
TopicRouteData clusterTopicRouteData = new TopicRouteData();
|
|
QueueData clusterQueueData = new QueueData();
|
|
@@ -141,7 +141,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest {
|
|
brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR);
|
|
clusterBrokerData.setBrokerAddrs(brokerAddrs);
|
|
clusterTopicRouteData.setBrokerDatas(Lists.newArrayList(clusterBrokerData));
|
|
- when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, clusterTopicRouteData));
|
|
+ when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, clusterTopicRouteData, null));
|
|
|
|
TopicRouteData clusterTopicRouteData2 = new TopicRouteData();
|
|
QueueData clusterQueueData2 = new QueueData();
|
|
@@ -155,7 +155,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest {
|
|
brokerAddrs.put(MixAll.MASTER_ID, brokerAddr2);
|
|
clusterBrokerData2.setBrokerAddrs(brokerAddrs);
|
|
clusterTopicRouteData2.setBrokerDatas(Lists.newArrayList(clusterBrokerData2));
|
|
- when(this.topicRouteService.getAllMessageQueueView(any(), eq(clusterName2))).thenReturn(new MessageQueueView(clusterName2, clusterTopicRouteData2));
|
|
+ when(this.topicRouteService.getAllMessageQueueView(any(), eq(clusterName2))).thenReturn(new MessageQueueView(clusterName2, clusterTopicRouteData2, null));
|
|
|
|
ConfigurationManager.getProxyConfig().setTransactionHeartbeatBatchNum(2);
|
|
this.clusterTransactionService.start();
|
|
--
|
|
2.32.0.windows.2
|
|
|