521 lines
25 KiB
Diff
521 lines
25 KiB
Diff
From e11e29419f6e2d1d9673d0329e57b824ebf3da47 Mon Sep 17 00:00:00 2001
|
|
From: lizhimins <707364882@qq.com>
|
|
Date: Wed, 6 Sep 2023 20:42:24 +0800
|
|
Subject: [PATCH 1/3] [ISSUE #7308] Adding topic blacklist and filter in tiered
|
|
storage module (#7310)
|
|
|
|
---
|
|
.../tieredstore/TieredDispatcher.java | 21 +++++++--
|
|
.../tieredstore/TieredMessageStore.java | 1 +
|
|
.../file/TieredFlatFileManager.java | 17 ++++---
|
|
.../TieredStoreTopicBlackListFilter.java | 45 +++++++++++++++++++
|
|
.../provider/TieredStoreTopicFilter.java | 25 +++++++++++
|
|
.../TieredStoreTopicBlackListFilterTest.java | 36 +++++++++++++++
|
|
6 files changed, 136 insertions(+), 9 deletions(-)
|
|
create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java
|
|
create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java
|
|
create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
|
index 430c2b62e..766c559e9 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
|
@@ -48,6 +48,8 @@ import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
|
|
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
|
|
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
|
|
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
|
|
+import org.apache.rocketmq.tieredstore.provider.TieredStoreTopicBlackListFilter;
|
|
+import org.apache.rocketmq.tieredstore.provider.TieredStoreTopicFilter;
|
|
import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
|
|
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
|
|
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
|
@@ -56,6 +58,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
|
|
|
|
+ private TieredStoreTopicFilter topicFilter;
|
|
private final String brokerName;
|
|
private final MessageStore defaultStore;
|
|
private final TieredMessageStoreConfig storeConfig;
|
|
@@ -70,15 +73,15 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
|
|
this.defaultStore = defaultStore;
|
|
this.storeConfig = storeConfig;
|
|
this.brokerName = storeConfig.getBrokerName();
|
|
+ this.topicFilter = new TieredStoreTopicBlackListFilter();
|
|
this.tieredFlatFileManager = TieredFlatFileManager.getInstance(storeConfig);
|
|
this.dispatchRequestReadMap = new ConcurrentHashMap<>();
|
|
this.dispatchRequestWriteMap = new ConcurrentHashMap<>();
|
|
this.dispatchTaskLock = new ReentrantLock();
|
|
this.dispatchWriteLock = new ReentrantLock();
|
|
- this.initScheduleTask();
|
|
}
|
|
|
|
- private void initScheduleTask() {
|
|
+ protected void initScheduleTask() {
|
|
TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() ->
|
|
tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> {
|
|
if (!flatFile.getCompositeFlatFileLock().isLocked()) {
|
|
@@ -87,6 +90,14 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
|
|
}), 30, 10, TimeUnit.SECONDS);
|
|
}
|
|
|
|
+ public TieredStoreTopicFilter getTopicFilter() {
|
|
+ return topicFilter;
|
|
+ }
|
|
+
|
|
+ public void setTopicFilter(TieredStoreTopicFilter topicFilter) {
|
|
+ this.topicFilter = topicFilter;
|
|
+ }
|
|
+
|
|
@Override
|
|
public void dispatch(DispatchRequest request) {
|
|
if (stopped) {
|
|
@@ -94,7 +105,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
|
|
}
|
|
|
|
String topic = request.getTopic();
|
|
- if (TieredStoreUtil.isSystemTopic(topic)) {
|
|
+ if (topicFilter != null && topicFilter.filterTopic(topic)) {
|
|
return;
|
|
}
|
|
|
|
@@ -219,6 +230,10 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
|
|
return;
|
|
}
|
|
|
|
+ if (topicFilter != null && topicFilter.filterTopic(flatFile.getMessageQueue().getTopic())) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
if (flatFile.getDispatchOffset() == -1L) {
|
|
return;
|
|
}
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
index 78e855f36..9fb1b2f01 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
@@ -90,6 +90,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
boolean loadNextStore = next.load();
|
|
boolean result = loadFlatFile && loadNextStore;
|
|
if (result) {
|
|
+ dispatcher.initScheduleTask();
|
|
dispatcher.start();
|
|
}
|
|
return result;
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
|
|
index e9ae4a5a5..7c744af3b 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
|
|
@@ -134,21 +134,21 @@ public class TieredFlatFileManager {
|
|
public void doCleanExpiredFile() {
|
|
long expiredTimeStamp = System.currentTimeMillis() -
|
|
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
|
|
- Random random = new Random();
|
|
for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) {
|
|
- int delay = random.nextInt(storeConfig.getMaxCommitJitter());
|
|
- TieredStoreExecutor.cleanExpiredFileExecutor.schedule(() -> {
|
|
+ TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> {
|
|
flatFile.getCompositeFlatFileLock().lock();
|
|
try {
|
|
flatFile.cleanExpiredFile(expiredTimeStamp);
|
|
flatFile.destroyExpiredFile();
|
|
if (flatFile.getConsumeQueueBaseOffset() == -1) {
|
|
+ logger.info("Clean flatFile because file not initialized, topic={}, queueId={}",
|
|
+ flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId());
|
|
destroyCompositeFile(flatFile.getMessageQueue());
|
|
}
|
|
} finally {
|
|
flatFile.getCompositeFlatFileLock().unlock();
|
|
}
|
|
- }, delay, TimeUnit.MILLISECONDS);
|
|
+ });
|
|
}
|
|
if (indexFile != null) {
|
|
indexFile.cleanExpiredFile(expiredTimeStamp);
|
|
@@ -218,8 +218,13 @@ public class TieredFlatFileManager {
|
|
storeConfig.getBrokerName(), queueMetadata.getQueue().getQueueId()));
|
|
queueCount.incrementAndGet();
|
|
});
|
|
- logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms",
|
|
- topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS));
|
|
+
|
|
+ if (queueCount.get() == 0L) {
|
|
+ metadataStore.deleteTopic(topicMetadata.getTopic());
|
|
+ } else {
|
|
+ logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms",
|
|
+ topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS));
|
|
+ }
|
|
} catch (Exception e) {
|
|
logger.error("Recover TopicFlatFile error, topic: {}", topicMetadata.getTopic(), e);
|
|
} finally {
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java
|
|
new file mode 100644
|
|
index 000000000..50adbb713
|
|
--- /dev/null
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java
|
|
@@ -0,0 +1,45 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+
|
|
+package org.apache.rocketmq.tieredstore.provider;
|
|
+
|
|
+import java.util.HashSet;
|
|
+import java.util.Set;
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
|
+
|
|
+public class TieredStoreTopicBlackListFilter implements TieredStoreTopicFilter {
|
|
+
|
|
+ private final Set<String> topicBlackSet;
|
|
+
|
|
+ public TieredStoreTopicBlackListFilter() {
|
|
+ this.topicBlackSet = new HashSet<>();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean filterTopic(String topicName) {
|
|
+ if (StringUtils.isBlank(topicName)) {
|
|
+ return true;
|
|
+ }
|
|
+ return TieredStoreUtil.isSystemTopic(topicName) || topicBlackSet.contains(topicName);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void addTopicToWhiteList(String topicName) {
|
|
+ this.topicBlackSet.add(topicName);
|
|
+ }
|
|
+}
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java
|
|
new file mode 100644
|
|
index 000000000..3f26b8b02
|
|
--- /dev/null
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java
|
|
@@ -0,0 +1,25 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+
|
|
+package org.apache.rocketmq.tieredstore.provider;
|
|
+
|
|
+public interface TieredStoreTopicFilter {
|
|
+
|
|
+ boolean filterTopic(String topicName);
|
|
+
|
|
+ void addTopicToWhiteList(String topicName);
|
|
+}
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java
|
|
new file mode 100644
|
|
index 000000000..2bf48173c
|
|
--- /dev/null
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java
|
|
@@ -0,0 +1,36 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+package org.apache.rocketmq.tieredstore.provider;
|
|
+
|
|
+import org.apache.rocketmq.common.topic.TopicValidator;
|
|
+import org.junit.Assert;
|
|
+import org.junit.Test;
|
|
+
|
|
+public class TieredStoreTopicBlackListFilterTest {
|
|
+
|
|
+ @Test
|
|
+ public void filterTopicTest() {
|
|
+ TieredStoreTopicFilter topicFilter = new TieredStoreTopicBlackListFilter();
|
|
+ Assert.assertTrue(topicFilter.filterTopic(""));
|
|
+ Assert.assertTrue(topicFilter.filterTopic(TopicValidator.SYSTEM_TOPIC_PREFIX + "_Topic"));
|
|
+
|
|
+ String topicName = "WhiteTopic";
|
|
+ Assert.assertFalse(topicFilter.filterTopic(topicName));
|
|
+ topicFilter.addTopicToWhiteList(topicName);
|
|
+ Assert.assertTrue(topicFilter.filterTopic(topicName));
|
|
+ }
|
|
+}
|
|
\ No newline at end of file
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 628020537fa7035226bc8dcde9fa33d9d5df30ff Mon Sep 17 00:00:00 2001
|
|
From: rongtong <jinrongtong5@163.com>
|
|
Date: Thu, 7 Sep 2023 16:17:47 +0800
|
|
Subject: [PATCH 2/3] [ISSUE #7293] Fix NPE when alter sync state set
|
|
|
|
---
|
|
.../rocketmq/controller/impl/manager/ReplicasInfoManager.java | 2 +-
|
|
1 file changed, 1 insertion(+), 1 deletion(-)
|
|
|
|
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
|
|
index b0a67531d..d83a690f9 100644
|
|
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
|
|
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
|
|
@@ -104,7 +104,7 @@ public class ReplicasInfoManager {
|
|
}
|
|
|
|
// Check master
|
|
- if (!syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) {
|
|
+ if (syncStateInfo.getMasterBrokerId() == null || !syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) {
|
|
String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}",
|
|
syncStateInfo.getMasterBrokerId(), request.getMasterBrokerId());
|
|
LOGGER.error("{}", err);
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 6fd0073d6475c539e8f4c30dc4f104a56a21d724 Mon Sep 17 00:00:00 2001
|
|
From: Ji Juntao <juntao.jjt@alibaba-inc.com>
|
|
Date: Thu, 7 Sep 2023 20:21:16 +0800
|
|
Subject: [PATCH 3/3] [ISSUE #7319] Optimize fault-tolerant mechanism for
|
|
sending messages and hot update switch (#7320)
|
|
|
|
---
|
|
.../impl/producer/DefaultMQProducerImpl.java | 8 ++------
|
|
.../client/latency/LatencyFaultTolerance.java | 14 +++++++++++++
|
|
.../latency/LatencyFaultToleranceImpl.java | 13 +++++++++++-
|
|
.../client/latency/MQFaultStrategy.java | 20 +++++++------------
|
|
.../proxy/service/route/MessageQueueView.java | 9 ---------
|
|
.../service/route/TopicRouteService.java | 10 +++++++++-
|
|
6 files changed, 44 insertions(+), 30 deletions(-)
|
|
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
|
index 2d6b83ac2..b0c212e46 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
|
@@ -263,9 +263,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
mQClientFactory.start();
|
|
}
|
|
|
|
- if (this.mqFaultStrategy.isStartDetectorEnable()) {
|
|
- this.mqFaultStrategy.startDetector();
|
|
- }
|
|
+ this.mqFaultStrategy.startDetector();
|
|
|
|
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
|
|
this.defaultMQProducer.isSendMessageWithVIPChannel());
|
|
@@ -311,9 +309,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
if (shutdownFactory) {
|
|
this.mQClientFactory.shutdown();
|
|
}
|
|
- if (this.mqFaultStrategy.isStartDetectorEnable()) {
|
|
- this.mqFaultStrategy.shutdown();
|
|
- }
|
|
+ this.mqFaultStrategy.shutdown();
|
|
RequestFutureHolder.getInstance().shutdown(this);
|
|
log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
|
|
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
|
|
index 72d2f3450..17aaa266a 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
|
|
@@ -89,4 +89,18 @@ public interface LatencyFaultTolerance<T> {
|
|
* @param detectInterval each broker's detecting interval
|
|
*/
|
|
void setDetectInterval(final int detectInterval);
|
|
+
|
|
+ /**
|
|
+ * Use it to set whether the detector should work or not.
|
|
+ *
|
|
+ * @param startDetectorEnable set the detector's work status
|
|
+ */
|
|
+ void setStartDetectorEnable(final boolean startDetectorEnable);
|
|
+
|
|
+ /**
|
|
+ * Use it to judge whether the detector is enabled.
|
|
+ *
|
|
+ * @return whether the detector should be started.
|
|
+ */
|
|
+ boolean isStartDetectorEnable();
|
|
}
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
|
|
index 8af629574..d3ff7eb45 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
|
|
@@ -37,6 +37,8 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
|
|
private int detectTimeout = 200;
|
|
private int detectInterval = 2000;
|
|
private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
|
|
+
|
|
+ private volatile boolean startDetectorEnable = false;
|
|
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
|
|
@Override
|
|
public Thread newThread(Runnable r) {
|
|
@@ -80,7 +82,9 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
|
|
@Override
|
|
public void run() {
|
|
try {
|
|
- detectByOneRound();
|
|
+ if (startDetectorEnable) {
|
|
+ detectByOneRound();
|
|
+ }
|
|
} catch (Exception e) {
|
|
log.warn("Unexpected exception raised while detecting service reachability", e);
|
|
}
|
|
@@ -137,6 +141,13 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
|
|
this.faultItemTable.remove(name);
|
|
}
|
|
|
|
+ public boolean isStartDetectorEnable() {
|
|
+ return startDetectorEnable;
|
|
+ }
|
|
+
|
|
+ public void setStartDetectorEnable(boolean startDetectorEnable) {
|
|
+ this.startDetectorEnable = startDetectorEnable;
|
|
+ }
|
|
@Override
|
|
public String pickOneAtLeast() {
|
|
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
|
|
index c01490784..69fb533e5 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
|
|
@@ -24,8 +24,8 @@ import org.apache.rocketmq.common.message.MessageQueue;
|
|
|
|
public class MQFaultStrategy {
|
|
private LatencyFaultTolerance<String> latencyFaultTolerance;
|
|
- private boolean sendLatencyFaultEnable;
|
|
- private boolean startDetectorEnable;
|
|
+ private volatile boolean sendLatencyFaultEnable;
|
|
+ private volatile boolean startDetectorEnable;
|
|
private long[] latencyMax = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L};
|
|
private long[] notAvailableDuration = {0L, 0L, 2000L, 5000L, 6000L, 10000L, 30000L};
|
|
|
|
@@ -64,11 +64,11 @@ public class MQFaultStrategy {
|
|
|
|
|
|
public MQFaultStrategy(ClientConfig cc, Resolver fetcher, ServiceDetector serviceDetector) {
|
|
- this.setStartDetectorEnable(cc.isStartDetectorEnable());
|
|
- this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
|
|
this.latencyFaultTolerance = new LatencyFaultToleranceImpl(fetcher, serviceDetector);
|
|
this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval());
|
|
this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout());
|
|
+ this.setStartDetectorEnable(cc.isStartDetectorEnable());
|
|
+ this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
|
|
}
|
|
|
|
// For unit test.
|
|
@@ -123,21 +123,15 @@ public class MQFaultStrategy {
|
|
|
|
public void setStartDetectorEnable(boolean startDetectorEnable) {
|
|
this.startDetectorEnable = startDetectorEnable;
|
|
+ this.latencyFaultTolerance.setStartDetectorEnable(startDetectorEnable);
|
|
}
|
|
|
|
public void startDetector() {
|
|
- // user should start the detector
|
|
- // and the thread should not be in running state.
|
|
- if (this.sendLatencyFaultEnable && this.startDetectorEnable) {
|
|
- // start the detector.
|
|
- this.latencyFaultTolerance.startDetector();
|
|
- }
|
|
+ this.latencyFaultTolerance.startDetector();
|
|
}
|
|
|
|
public void shutdown() {
|
|
- if (this.sendLatencyFaultEnable && this.startDetectorEnable) {
|
|
- this.latencyFaultTolerance.shutdown();
|
|
- }
|
|
+ this.latencyFaultTolerance.shutdown();
|
|
}
|
|
|
|
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
|
|
index 8b3c2f7c8..898e529f8 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
|
|
@@ -26,7 +26,6 @@ public class MessageQueueView {
|
|
private final MessageQueueSelector readSelector;
|
|
private final MessageQueueSelector writeSelector;
|
|
private final TopicRouteWrapper topicRouteWrapper;
|
|
- private MQFaultStrategy mqFaultStrategy;
|
|
|
|
public MessageQueueView(String topic, TopicRouteData topicRouteData, MQFaultStrategy mqFaultStrategy) {
|
|
this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic);
|
|
@@ -67,12 +66,4 @@ public class MessageQueueView {
|
|
.add("topicRouteWrapper", topicRouteWrapper)
|
|
.toString();
|
|
}
|
|
-
|
|
- public MQFaultStrategy getMQFaultStrategy() {
|
|
- return mqFaultStrategy;
|
|
- }
|
|
-
|
|
- public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) {
|
|
- this.mqFaultStrategy = mqFaultStrategy;
|
|
- }
|
|
}
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
|
|
index 74769a423..caf62a1e0 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
|
|
@@ -127,7 +127,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
|
|
@Override
|
|
public String resolve(String name) {
|
|
try {
|
|
- String brokerAddr = getBrokerAddr(null, name);
|
|
+ String brokerAddr = getBrokerAddr(ProxyContext.createForInner("MQFaultStrategy"), name);
|
|
return brokerAddr;
|
|
} catch (Exception e) {
|
|
return null;
|
|
@@ -175,9 +175,17 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
|
|
|
|
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation,
|
|
boolean reachable) {
|
|
+ checkSendFaultToleranceEnable();
|
|
this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable);
|
|
}
|
|
|
|
+ public void checkSendFaultToleranceEnable() {
|
|
+ boolean hotLatencySwitch = ConfigurationManager.getProxyConfig().isSendLatencyEnable();
|
|
+ boolean hotDetectorSwitch = ConfigurationManager.getProxyConfig().isStartDetectorEnable();
|
|
+ this.mqFaultStrategy.setSendLatencyFaultEnable(hotLatencySwitch);
|
|
+ this.mqFaultStrategy.setStartDetectorEnable(hotDetectorSwitch);
|
|
+ }
|
|
+
|
|
public MQFaultStrategy getMqFaultStrategy() {
|
|
return this.mqFaultStrategy;
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|