rocketmq/patch016-backport-Optimize-fault-tolerant-mechanism.patch
2023-10-30 21:00:21 +08:00

521 lines
25 KiB
Diff

From e11e29419f6e2d1d9673d0329e57b824ebf3da47 Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Wed, 6 Sep 2023 20:42:24 +0800
Subject: [PATCH 1/3] [ISSUE #7308] Adding topic blacklist and filter in tiered
storage module (#7310)
---
.../tieredstore/TieredDispatcher.java | 21 +++++++--
.../tieredstore/TieredMessageStore.java | 1 +
.../file/TieredFlatFileManager.java | 17 ++++---
.../TieredStoreTopicBlackListFilter.java | 45 +++++++++++++++++++
.../provider/TieredStoreTopicFilter.java | 25 +++++++++++
.../TieredStoreTopicBlackListFilterTest.java | 36 +++++++++++++++
6 files changed, 136 insertions(+), 9 deletions(-)
create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java
create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java
create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 430c2b62e..766c559e9 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -48,6 +48,8 @@ import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
+import org.apache.rocketmq.tieredstore.provider.TieredStoreTopicBlackListFilter;
+import org.apache.rocketmq.tieredstore.provider.TieredStoreTopicFilter;
import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -56,6 +58,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+ private TieredStoreTopicFilter topicFilter;
private final String brokerName;
private final MessageStore defaultStore;
private final TieredMessageStoreConfig storeConfig;
@@ -70,15 +73,15 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
this.defaultStore = defaultStore;
this.storeConfig = storeConfig;
this.brokerName = storeConfig.getBrokerName();
+ this.topicFilter = new TieredStoreTopicBlackListFilter();
this.tieredFlatFileManager = TieredFlatFileManager.getInstance(storeConfig);
this.dispatchRequestReadMap = new ConcurrentHashMap<>();
this.dispatchRequestWriteMap = new ConcurrentHashMap<>();
this.dispatchTaskLock = new ReentrantLock();
this.dispatchWriteLock = new ReentrantLock();
- this.initScheduleTask();
}
- private void initScheduleTask() {
+ protected void initScheduleTask() {
TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() ->
tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> {
if (!flatFile.getCompositeFlatFileLock().isLocked()) {
@@ -87,6 +90,14 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
}), 30, 10, TimeUnit.SECONDS);
}
+ public TieredStoreTopicFilter getTopicFilter() {
+ return topicFilter;
+ }
+
+ public void setTopicFilter(TieredStoreTopicFilter topicFilter) {
+ this.topicFilter = topicFilter;
+ }
+
@Override
public void dispatch(DispatchRequest request) {
if (stopped) {
@@ -94,7 +105,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
}
String topic = request.getTopic();
- if (TieredStoreUtil.isSystemTopic(topic)) {
+ if (topicFilter != null && topicFilter.filterTopic(topic)) {
return;
}
@@ -219,6 +230,10 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
return;
}
+ if (topicFilter != null && topicFilter.filterTopic(flatFile.getMessageQueue().getTopic())) {
+ return;
+ }
+
if (flatFile.getDispatchOffset() == -1L) {
return;
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 78e855f36..9fb1b2f01 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -90,6 +90,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
boolean loadNextStore = next.load();
boolean result = loadFlatFile && loadNextStore;
if (result) {
+ dispatcher.initScheduleTask();
dispatcher.start();
}
return result;
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
index e9ae4a5a5..7c744af3b 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
@@ -134,21 +134,21 @@ public class TieredFlatFileManager {
public void doCleanExpiredFile() {
long expiredTimeStamp = System.currentTimeMillis() -
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
- Random random = new Random();
for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) {
- int delay = random.nextInt(storeConfig.getMaxCommitJitter());
- TieredStoreExecutor.cleanExpiredFileExecutor.schedule(() -> {
+ TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> {
flatFile.getCompositeFlatFileLock().lock();
try {
flatFile.cleanExpiredFile(expiredTimeStamp);
flatFile.destroyExpiredFile();
if (flatFile.getConsumeQueueBaseOffset() == -1) {
+ logger.info("Clean flatFile because file not initialized, topic={}, queueId={}",
+ flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId());
destroyCompositeFile(flatFile.getMessageQueue());
}
} finally {
flatFile.getCompositeFlatFileLock().unlock();
}
- }, delay, TimeUnit.MILLISECONDS);
+ });
}
if (indexFile != null) {
indexFile.cleanExpiredFile(expiredTimeStamp);
@@ -218,8 +218,13 @@ public class TieredFlatFileManager {
storeConfig.getBrokerName(), queueMetadata.getQueue().getQueueId()));
queueCount.incrementAndGet();
});
- logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms",
- topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS));
+
+ if (queueCount.get() == 0L) {
+ metadataStore.deleteTopic(topicMetadata.getTopic());
+ } else {
+ logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms",
+ topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS));
+ }
} catch (Exception e) {
logger.error("Recover TopicFlatFile error, topic: {}", topicMetadata.getTopic(), e);
} finally {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java
new file mode 100644
index 000000000..50adbb713
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+
+public class TieredStoreTopicBlackListFilter implements TieredStoreTopicFilter {
+
+ private final Set<String> topicBlackSet;
+
+ public TieredStoreTopicBlackListFilter() {
+ this.topicBlackSet = new HashSet<>();
+ }
+
+ @Override
+ public boolean filterTopic(String topicName) {
+ if (StringUtils.isBlank(topicName)) {
+ return true;
+ }
+ return TieredStoreUtil.isSystemTopic(topicName) || topicBlackSet.contains(topicName);
+ }
+
+ @Override
+ public void addTopicToWhiteList(String topicName) {
+ this.topicBlackSet.add(topicName);
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java
new file mode 100644
index 000000000..3f26b8b02
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider;
+
+public interface TieredStoreTopicFilter {
+
+ boolean filterTopic(String topicName);
+
+ void addTopicToWhiteList(String topicName);
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java
new file mode 100644
index 000000000..2bf48173c
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.provider;
+
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TieredStoreTopicBlackListFilterTest {
+
+ @Test
+ public void filterTopicTest() {
+ TieredStoreTopicFilter topicFilter = new TieredStoreTopicBlackListFilter();
+ Assert.assertTrue(topicFilter.filterTopic(""));
+ Assert.assertTrue(topicFilter.filterTopic(TopicValidator.SYSTEM_TOPIC_PREFIX + "_Topic"));
+
+ String topicName = "WhiteTopic";
+ Assert.assertFalse(topicFilter.filterTopic(topicName));
+ topicFilter.addTopicToWhiteList(topicName);
+ Assert.assertTrue(topicFilter.filterTopic(topicName));
+ }
+}
\ No newline at end of file
--
2.32.0.windows.2
From 628020537fa7035226bc8dcde9fa33d9d5df30ff Mon Sep 17 00:00:00 2001
From: rongtong <jinrongtong5@163.com>
Date: Thu, 7 Sep 2023 16:17:47 +0800
Subject: [PATCH 2/3] [ISSUE #7293] Fix NPE when alter sync state set
---
.../rocketmq/controller/impl/manager/ReplicasInfoManager.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index b0a67531d..d83a690f9 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -104,7 +104,7 @@ public class ReplicasInfoManager {
}
// Check master
- if (!syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) {
+ if (syncStateInfo.getMasterBrokerId() == null || !syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) {
String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}",
syncStateInfo.getMasterBrokerId(), request.getMasterBrokerId());
LOGGER.error("{}", err);
--
2.32.0.windows.2
From 6fd0073d6475c539e8f4c30dc4f104a56a21d724 Mon Sep 17 00:00:00 2001
From: Ji Juntao <juntao.jjt@alibaba-inc.com>
Date: Thu, 7 Sep 2023 20:21:16 +0800
Subject: [PATCH 3/3] [ISSUE #7319] Optimize fault-tolerant mechanism for
sending messages and hot update switch (#7320)
---
.../impl/producer/DefaultMQProducerImpl.java | 8 ++------
.../client/latency/LatencyFaultTolerance.java | 14 +++++++++++++
.../latency/LatencyFaultToleranceImpl.java | 13 +++++++++++-
.../client/latency/MQFaultStrategy.java | 20 +++++++------------
.../proxy/service/route/MessageQueueView.java | 9 ---------
.../service/route/TopicRouteService.java | 10 +++++++++-
6 files changed, 44 insertions(+), 30 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 2d6b83ac2..b0c212e46 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -263,9 +263,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
mQClientFactory.start();
}
- if (this.mqFaultStrategy.isStartDetectorEnable()) {
- this.mqFaultStrategy.startDetector();
- }
+ this.mqFaultStrategy.startDetector();
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
@@ -311,9 +309,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (shutdownFactory) {
this.mQClientFactory.shutdown();
}
- if (this.mqFaultStrategy.isStartDetectorEnable()) {
- this.mqFaultStrategy.shutdown();
- }
+ this.mqFaultStrategy.shutdown();
RequestFutureHolder.getInstance().shutdown(this);
log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
index 72d2f3450..17aaa266a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
@@ -89,4 +89,18 @@ public interface LatencyFaultTolerance<T> {
* @param detectInterval each broker's detecting interval
*/
void setDetectInterval(final int detectInterval);
+
+ /**
+ * Use it to set the detector work or not.
+ *
+ * @param startDetectorEnable set the detector's work status
+ */
+ void setStartDetectorEnable(final boolean startDetectorEnable);
+
+ /**
+ * Use it to judge if the detector enabled.
+ *
+ * @return is the detector should be started.
+ */
+ boolean isStartDetectorEnable();
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
index 8af629574..d3ff7eb45 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
@@ -37,6 +37,8 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
private int detectTimeout = 200;
private int detectInterval = 2000;
private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
+
+ private volatile boolean startDetectorEnable = false;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@@ -80,7 +82,9 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
@Override
public void run() {
try {
- detectByOneRound();
+ if (startDetectorEnable) {
+ detectByOneRound();
+ }
} catch (Exception e) {
log.warn("Unexpected exception raised while detecting service reachability", e);
}
@@ -137,6 +141,13 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
this.faultItemTable.remove(name);
}
+ public boolean isStartDetectorEnable() {
+ return startDetectorEnable;
+ }
+
+ public void setStartDetectorEnable(boolean startDetectorEnable) {
+ this.startDetectorEnable = startDetectorEnable;
+ }
@Override
public String pickOneAtLeast() {
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
index c01490784..69fb533e5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
@@ -24,8 +24,8 @@ import org.apache.rocketmq.common.message.MessageQueue;
public class MQFaultStrategy {
private LatencyFaultTolerance<String> latencyFaultTolerance;
- private boolean sendLatencyFaultEnable;
- private boolean startDetectorEnable;
+ private volatile boolean sendLatencyFaultEnable;
+ private volatile boolean startDetectorEnable;
private long[] latencyMax = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 2000L, 5000L, 6000L, 10000L, 30000L};
@@ -64,11 +64,11 @@ public class MQFaultStrategy {
public MQFaultStrategy(ClientConfig cc, Resolver fetcher, ServiceDetector serviceDetector) {
- this.setStartDetectorEnable(cc.isStartDetectorEnable());
- this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
this.latencyFaultTolerance = new LatencyFaultToleranceImpl(fetcher, serviceDetector);
this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval());
this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout());
+ this.setStartDetectorEnable(cc.isStartDetectorEnable());
+ this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
}
// For unit test.
@@ -123,21 +123,15 @@ public class MQFaultStrategy {
public void setStartDetectorEnable(boolean startDetectorEnable) {
this.startDetectorEnable = startDetectorEnable;
+ this.latencyFaultTolerance.setStartDetectorEnable(startDetectorEnable);
}
public void startDetector() {
- // user should start the detector
- // and the thread should not be in running state.
- if (this.sendLatencyFaultEnable && this.startDetectorEnable) {
- // start the detector.
- this.latencyFaultTolerance.startDetector();
- }
+ this.latencyFaultTolerance.startDetector();
}
public void shutdown() {
- if (this.sendLatencyFaultEnable && this.startDetectorEnable) {
- this.latencyFaultTolerance.shutdown();
- }
+ this.latencyFaultTolerance.shutdown();
}
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
index 8b3c2f7c8..898e529f8 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
@@ -26,7 +26,6 @@ public class MessageQueueView {
private final MessageQueueSelector readSelector;
private final MessageQueueSelector writeSelector;
private final TopicRouteWrapper topicRouteWrapper;
- private MQFaultStrategy mqFaultStrategy;
public MessageQueueView(String topic, TopicRouteData topicRouteData, MQFaultStrategy mqFaultStrategy) {
this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic);
@@ -67,12 +66,4 @@ public class MessageQueueView {
.add("topicRouteWrapper", topicRouteWrapper)
.toString();
}
-
- public MQFaultStrategy getMQFaultStrategy() {
- return mqFaultStrategy;
- }
-
- public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) {
- this.mqFaultStrategy = mqFaultStrategy;
- }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index 74769a423..caf62a1e0 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -127,7 +127,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
@Override
public String resolve(String name) {
try {
- String brokerAddr = getBrokerAddr(null, name);
+ String brokerAddr = getBrokerAddr(ProxyContext.createForInner("MQFaultStrategy"), name);
return brokerAddr;
} catch (Exception e) {
return null;
@@ -175,9 +175,17 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation,
boolean reachable) {
+ checkSendFaultToleranceEnable();
this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable);
}
+ public void checkSendFaultToleranceEnable() {
+ boolean hotLatencySwitch = ConfigurationManager.getProxyConfig().isSendLatencyEnable();
+ boolean hotDetectorSwitch = ConfigurationManager.getProxyConfig().isStartDetectorEnable();
+ this.mqFaultStrategy.setSendLatencyFaultEnable(hotLatencySwitch);
+ this.mqFaultStrategy.setStartDetectorEnable(hotDetectorSwitch);
+ }
+
public MQFaultStrategy getMqFaultStrategy() {
return this.mqFaultStrategy;
}
--
2.32.0.windows.2