rocketmq/patch013-backport-enhance-admin-output.patch
2023-10-06 23:51:02 +08:00

893 lines
44 KiB
Diff

From 7e018520ef707a841c66c55d621f6560d03b631b Mon Sep 17 00:00:00 2001
From: Zhouxiang Zhan <zhouxzhan@apache.org>
Date: Fri, 25 Aug 2023 09:49:22 +0800
Subject: [PATCH 1/6] Add expireAfterAccess for cache (#7247)
Add expireAfterAccess for cache
---
.../rocketmq/proxy/config/ProxyConfig.java | 59 ++++++++++++++-----
.../metadata/ClusterMetadataService.java | 6 +-
.../service/route/TopicRouteService.java | 14 +++--
3 files changed, 56 insertions(+), 23 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 76a243919..2994893d7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -155,14 +155,17 @@ public class ProxyConfig implements ConfigFile {
private int consumerProcessorThreadPoolQueueCapacity = 10000;
private boolean useEndpointPortFromRequest = false;
- private int topicRouteServiceCacheExpiredInSeconds = 20;
+
+ private int topicRouteServiceCacheExpiredSeconds = 300;
+ private int topicRouteServiceCacheRefreshSeconds = 20;
private int topicRouteServiceCacheMaxNum = 20000;
private int topicRouteServiceThreadPoolNums = PROCESSOR_NUMBER;
private int topicRouteServiceThreadPoolQueueCapacity = 5000;
-
- private int topicConfigCacheExpiredInSeconds = 20;
+ private int topicConfigCacheExpiredSeconds = 300;
+ private int topicConfigCacheRefreshSeconds = 20;
private int topicConfigCacheMaxNum = 20000;
- private int subscriptionGroupConfigCacheExpiredInSeconds = 20;
+ private int subscriptionGroupConfigCacheExpiredSeconds = 300;
+ private int subscriptionGroupConfigCacheRefreshSeconds = 20;
private int subscriptionGroupConfigCacheMaxNum = 20000;
private int metadataThreadPoolNums = 3;
private int metadataThreadPoolQueueCapacity = 100000;
@@ -794,12 +797,20 @@ public class ProxyConfig implements ConfigFile {
this.consumerProcessorThreadPoolQueueCapacity = consumerProcessorThreadPoolQueueCapacity;
}
- public int getTopicRouteServiceCacheExpiredInSeconds() {
- return topicRouteServiceCacheExpiredInSeconds;
+ public int getTopicRouteServiceCacheExpiredSeconds() {
+ return topicRouteServiceCacheExpiredSeconds;
+ }
+
+ public void setTopicRouteServiceCacheExpiredSeconds(int topicRouteServiceCacheExpiredSeconds) {
+ this.topicRouteServiceCacheExpiredSeconds = topicRouteServiceCacheExpiredSeconds;
}
- public void setTopicRouteServiceCacheExpiredInSeconds(int topicRouteServiceCacheExpiredInSeconds) {
- this.topicRouteServiceCacheExpiredInSeconds = topicRouteServiceCacheExpiredInSeconds;
+ public int getTopicRouteServiceCacheRefreshSeconds() {
+ return topicRouteServiceCacheRefreshSeconds;
+ }
+
+ public void setTopicRouteServiceCacheRefreshSeconds(int topicRouteServiceCacheRefreshSeconds) {
+ this.topicRouteServiceCacheRefreshSeconds = topicRouteServiceCacheRefreshSeconds;
}
public int getTopicRouteServiceCacheMaxNum() {
@@ -826,12 +837,20 @@ public class ProxyConfig implements ConfigFile {
this.topicRouteServiceThreadPoolQueueCapacity = topicRouteServiceThreadPoolQueueCapacity;
}
- public int getTopicConfigCacheExpiredInSeconds() {
- return topicConfigCacheExpiredInSeconds;
+ public int getTopicConfigCacheRefreshSeconds() {
+ return topicConfigCacheRefreshSeconds;
+ }
+
+ public void setTopicConfigCacheRefreshSeconds(int topicConfigCacheRefreshSeconds) {
+ this.topicConfigCacheRefreshSeconds = topicConfigCacheRefreshSeconds;
+ }
+
+ public int getTopicConfigCacheExpiredSeconds() {
+ return topicConfigCacheExpiredSeconds;
}
- public void setTopicConfigCacheExpiredInSeconds(int topicConfigCacheExpiredInSeconds) {
- this.topicConfigCacheExpiredInSeconds = topicConfigCacheExpiredInSeconds;
+ public void setTopicConfigCacheExpiredSeconds(int topicConfigCacheExpiredSeconds) {
+ this.topicConfigCacheExpiredSeconds = topicConfigCacheExpiredSeconds;
}
public int getTopicConfigCacheMaxNum() {
@@ -842,12 +861,20 @@ public class ProxyConfig implements ConfigFile {
this.topicConfigCacheMaxNum = topicConfigCacheMaxNum;
}
- public int getSubscriptionGroupConfigCacheExpiredInSeconds() {
- return subscriptionGroupConfigCacheExpiredInSeconds;
+ public int getSubscriptionGroupConfigCacheRefreshSeconds() {
+ return subscriptionGroupConfigCacheRefreshSeconds;
+ }
+
+ public void setSubscriptionGroupConfigCacheRefreshSeconds(int subscriptionGroupConfigCacheRefreshSeconds) {
+ this.subscriptionGroupConfigCacheRefreshSeconds = subscriptionGroupConfigCacheRefreshSeconds;
+ }
+
+ public int getSubscriptionGroupConfigCacheExpiredSeconds() {
+ return subscriptionGroupConfigCacheExpiredSeconds;
}
- public void setSubscriptionGroupConfigCacheExpiredInSeconds(int subscriptionGroupConfigCacheExpiredInSeconds) {
- this.subscriptionGroupConfigCacheExpiredInSeconds = subscriptionGroupConfigCacheExpiredInSeconds;
+ public void setSubscriptionGroupConfigCacheExpiredSeconds(int subscriptionGroupConfigCacheExpiredSeconds) {
+ this.subscriptionGroupConfigCacheExpiredSeconds = subscriptionGroupConfigCacheExpiredSeconds;
}
public int getSubscriptionGroupConfigCacheMaxNum() {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
index bc9582ad8..d34a0efd9 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
@@ -69,11 +69,13 @@ public class ClusterMetadataService extends AbstractStartAndShutdown implements
);
this.topicConfigCache = CacheBuilder.newBuilder()
.maximumSize(config.getTopicConfigCacheMaxNum())
- .refreshAfterWrite(config.getTopicConfigCacheExpiredInSeconds(), TimeUnit.SECONDS)
+ .expireAfterAccess(config.getTopicConfigCacheExpiredSeconds(), TimeUnit.SECONDS)
+ .refreshAfterWrite(config.getTopicConfigCacheRefreshSeconds(), TimeUnit.SECONDS)
.build(new ClusterTopicConfigCacheLoader());
this.subscriptionGroupConfigCache = CacheBuilder.newBuilder()
.maximumSize(config.getSubscriptionGroupConfigCacheMaxNum())
- .refreshAfterWrite(config.getSubscriptionGroupConfigCacheExpiredInSeconds(), TimeUnit.SECONDS)
+ .expireAfterAccess(config.getSubscriptionGroupConfigCacheExpiredSeconds(), TimeUnit.SECONDS)
+ .refreshAfterWrite(config.getSubscriptionGroupConfigCacheRefreshSeconds(), TimeUnit.SECONDS)
.build(new ClusterSubscriptionGroupConfigCacheLoader());
this.init();
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index e012a5465..84348adc3 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -68,10 +68,13 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
);
this.mqClientAPIFactory = mqClientAPIFactory;
- this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()).
- refreshAfterWrite(config.getTopicRouteServiceCacheExpiredInSeconds(), TimeUnit.SECONDS).
- executor(cacheRefreshExecutor).build(new CacheLoader<String, MessageQueueView>() {
- @Override public @Nullable MessageQueueView load(String topic) throws Exception {
+ this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum())
+ .expireAfterAccess(config.getTopicRouteServiceCacheExpiredSeconds(), TimeUnit.SECONDS)
+ .refreshAfterWrite(config.getTopicRouteServiceCacheRefreshSeconds(), TimeUnit.SECONDS)
+ .executor(cacheRefreshExecutor)
+ .build(new CacheLoader<String, MessageQueueView>() {
+ @Override
+ public @Nullable MessageQueueView load(String topic) throws Exception {
try {
TopicRouteData topicRouteData = mqClientAPIFactory.getClient().getTopicRouteInfoFromNameServer(topic, Duration.ofSeconds(3).toMillis());
return buildMessageQueueView(topic, topicRouteData);
@@ -83,7 +86,8 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
}
}
- @Override public @Nullable MessageQueueView reload(@NonNull String key,
+ @Override
+ public @Nullable MessageQueueView reload(@NonNull String key,
@NonNull MessageQueueView oldValue) throws Exception {
try {
return load(key);
--
2.32.0.windows.2
From 5f6dc90f9dab35809fcb0407d4d5cc2737d2335e Mon Sep 17 00:00:00 2001
From: Ziyi Tan <tanziyi0925@gmail.com>
Date: Fri, 25 Aug 2023 11:17:23 +0800
Subject: [PATCH 2/6] [ISSUE #7250] Beautify command rocksDBConfigToJson output
Co-authored-by: Ziy1-Tan <ajb4596984460@gmail.com>
---
.../metadata/RocksDBConfigToJsonCommand.java | 32 +++++++++++--------
1 file changed, 18 insertions(+), 14 deletions(-)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
index 3053f4684..3fc63e4dd 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
@@ -21,13 +21,13 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.config.RocksDBConfigManager;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
-import java.io.File;
import java.util.HashMap;
import java.util.Map;
@@ -48,7 +48,7 @@ public class RocksDBConfigToJsonCommand implements SubCommand {
@Override
public Options buildCommandlineOptions(Options options) {
Option pathOption = new Option("p", "path", true,
- "Absolute path to the metadata directory");
+ "Absolute path for the metadata directory");
pathOption.setRequired(true);
options.addOption(pathOption);
@@ -63,15 +63,14 @@ public class RocksDBConfigToJsonCommand implements SubCommand {
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
String path = commandLine.getOptionValue("path").trim();
- if (StringUtils.isEmpty(path) || !new File(path).exists()) {
+ if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) {
System.out.print("Rocksdb path is invalid.\n");
return;
}
String configType = commandLine.getOptionValue("configType").trim().toLowerCase();
- final long memTableFlushInterval = 60 * 60 * 1000L;
- RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(memTableFlushInterval);
+ RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(60 * 60 * 1000L);
try {
if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) {
// for topics.json
@@ -84,13 +83,16 @@ public class RocksDBConfigToJsonCommand implements SubCommand {
topicConfigTable.put(topic, jsonObject);
});
- if (isLoad) {
- topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable));
- final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true);
- System.out.print(topicsJsonStr + "\n");
+ if (!isLoad) {
+ System.out.print("RocksDB load error, path=" + path);
return;
}
+ topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable));
+ final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true);
+ System.out.print(topicsJsonStr + "\n");
+ return;
}
+
if (SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) {
// for subscriptionGroup.json
final Map<String, JSONObject> subscriptionGroupJsonConfig = new HashMap<>();
@@ -102,13 +104,15 @@ public class RocksDBConfigToJsonCommand implements SubCommand {
subscriptionGroupTable.put(subscriptionGroup, jsonObject);
});
- if (isLoad) {
- subscriptionGroupJsonConfig.put("subscriptionGroupTable",
- (JSONObject) JSONObject.toJSON(subscriptionGroupTable));
- final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true);
- System.out.print(subscriptionGroupJsonStr + "\n");
+ if (!isLoad) {
+ System.out.print("RocksDB load error, path=" + path);
return;
}
+ subscriptionGroupJsonConfig.put("subscriptionGroupTable",
+ (JSONObject) JSONObject.toJSON(subscriptionGroupTable));
+ final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true);
+ System.out.print(subscriptionGroupJsonStr + "\n");
+ return;
}
System.out.print("Config type was not recognized, configType=" + configType + "\n");
} finally {
--
2.32.0.windows.2
From b4f73e2aabc1b141cec98431899e4090340adf0f Mon Sep 17 00:00:00 2001
From: mxsm <ljbmxsm@gmail.com>
Date: Sun, 27 Aug 2023 20:58:58 +0800
Subject: [PATCH 3/6] [ISSUE #7271] Optimize the configuration for setting the
quantity of TimerDequeuePutMessageService (#7272)
---
.../java/org/apache/rocketmq/store/timer/TimerMessageStore.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 690f4863e..181f7087a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -222,7 +222,7 @@ public class TimerMessageStore {
dequeueGetMessageServices[i] = new TimerDequeueGetMessageService();
}
- int putThreadNum = Math.max(storeConfig.getTimerGetMessageThreadNum(), 1);
+ int putThreadNum = Math.max(storeConfig.getTimerPutMessageThreadNum(), 1);
dequeuePutMessageServices = new TimerDequeuePutMessageService[putThreadNum];
for (int i = 0; i < dequeuePutMessageServices.length; i++) {
dequeuePutMessageServices[i] = new TimerDequeuePutMessageService();
--
2.32.0.windows.2
From 3e100103af68588528bf32f3752a85e8023f46f8 Mon Sep 17 00:00:00 2001
From: Ziyi Tan <tanziyi0925@gmail.com>
Date: Tue, 29 Aug 2023 13:48:51 +0800
Subject: [PATCH 4/6] [ISSUE #7277] Enhance rocksDBConfigToJson to support
metadata counting (#7276)
---
.../common/config/AbstractRocksDBStorage.java | 4 +-
.../common/config/ConfigRocksDBStorage.java | 6 +
.../tools/command/MQAdminStartup.java | 4 +-
.../ExportMetadataInRocksDBCommand.java | 138 ++++++++++++++++++
.../metadata/RocksDBConfigToJsonCommand.java | 122 ----------------
...> ExportMetadataInRocksDBCommandTest.java} | 38 +++--
6 files changed, 173 insertions(+), 139 deletions(-)
create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
delete mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
rename tools/src/test/java/org/apache/rocketmq/tools/command/metadata/{KvConfigToJsonCommandTest.java => ExportMetadataInRocksDBCommandTest.java} (62%)
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index e3673baad..a720a5be3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -385,8 +385,10 @@ public abstract class AbstractRocksDBStorage {
this.options.close();
}
//4. close db.
- if (db != null) {
+ if (db != null && !this.readOnly) {
this.db.syncWal();
+ }
+ if (db != null) {
this.db.closeE();
}
//5. help gc.
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
index 9d05ed282..463bd8fed 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
@@ -60,6 +60,12 @@ public class ConfigRocksDBStorage extends AbstractRocksDBStorage {
this.readOnly = false;
}
+ public ConfigRocksDBStorage(final String dbPath, boolean readOnly) {
+ super();
+ this.dbPath = dbPath;
+ this.readOnly = readOnly;
+ }
+
private void initOptions() {
this.options = createConfigDBOptions();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 324aa1856..788fa83c2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -80,7 +80,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand;
import org.apache.rocketmq.tools.command.message.SendMessageCommand;
-import org.apache.rocketmq.tools.command.metadata.RocksDBConfigToJsonCommand;
+import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand;
import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand;
import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
@@ -212,7 +212,6 @@ public class MQAdminStartup {
initCommand(new ClusterListSubCommand());
initCommand(new TopicListSubCommand());
- initCommand(new RocksDBConfigToJsonCommand());
initCommand(new UpdateKvConfigCommand());
initCommand(new DeleteKvConfigCommand());
@@ -257,6 +256,7 @@ public class MQAdminStartup {
initCommand(new ExportMetadataCommand());
initCommand(new ExportConfigsCommand());
initCommand(new ExportMetricsCommand());
+ initCommand(new ExportMetadataInRocksDBCommand());
initCommand(new HAStatusSubCommand());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
new file mode 100644
index 000000000..2a7d3fba4
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.export;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
+import org.apache.rocketmq.common.utils.DataConverter;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.rocksdb.RocksIterator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+public class ExportMetadataInRocksDBCommand implements SubCommand {
+ private static final String TOPICS_JSON_CONFIG = "topics";
+ private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups";
+
+ @Override
+ public String commandName() {
+ return "exportMetadataInRocksDB";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "export RocksDB kv config (topics/subscriptionGroups)";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option pathOption = new Option("p", "path", true,
+ "Absolute path for the metadata directory");
+ pathOption.setRequired(true);
+ options.addOption(pathOption);
+
+ Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " +
+ "topics/subscriptionGroups");
+ configTypeOption.setRequired(true);
+ options.addOption(configTypeOption);
+
+ Option jsonEnableOption = new Option("j", "jsonEnable", true,
+ "Json format enable, Default: false");
+ jsonEnableOption.setRequired(false);
+ options.addOption(jsonEnableOption);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+ String path = commandLine.getOptionValue("path").trim();
+ if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) {
+ System.out.print("RocksDB path is invalid.\n");
+ return;
+ }
+
+ String configType = commandLine.getOptionValue("configType").trim().toLowerCase();
+
+ boolean jsonEnable = false;
+ if (commandLine.hasOption("jsonEnable")) {
+ jsonEnable = Boolean.parseBoolean(commandLine.getOptionValue("jsonEnable").trim());
+ }
+
+
+ ConfigRocksDBStorage kvStore = new ConfigRocksDBStorage(path, true /* readOnly */);
+ if (!kvStore.start()) {
+ System.out.print("RocksDB load error, path=" + path + "\n");
+ return;
+ }
+
+ try {
+ if (TOPICS_JSON_CONFIG.equalsIgnoreCase(configType) || SUBSCRIPTION_GROUP_JSON_CONFIG.equalsIgnoreCase(configType)) {
+ handleExportMetadata(kvStore, configType, jsonEnable);
+ } else {
+ System.out.printf("Invalid config type=%s, Options: topics,subscriptionGroups\n", configType);
+ }
+ } finally {
+ kvStore.shutdown();
+ }
+ }
+
+ private static void handleExportMetadata(ConfigRocksDBStorage kvStore, String configType, boolean jsonEnable) {
+ if (jsonEnable) {
+ final Map<String, JSONObject> jsonConfig = new HashMap<>();
+ final Map<String, JSONObject> configTable = new HashMap<>();
+ iterateKvStore(kvStore, (key, value) -> {
+ final String configKey = new String(key, DataConverter.charset);
+ final String configValue = new String(value, DataConverter.charset);
+ final JSONObject jsonObject = JSONObject.parseObject(configValue);
+ configTable.put(configKey, jsonObject);
+ }
+ );
+
+ jsonConfig.put(configType.equalsIgnoreCase(TOPICS_JSON_CONFIG) ? "topicConfigTable" : "subscriptionGroupTable",
+ (JSONObject) JSONObject.toJSON(configTable));
+ final String jsonConfigStr = JSONObject.toJSONString(jsonConfig, true);
+ System.out.print(jsonConfigStr + "\n");
+ } else {
+ AtomicLong count = new AtomicLong(0);
+ iterateKvStore(kvStore, (key, value) -> {
+ final String configKey = new String(key, DataConverter.charset);
+ final String configValue = new String(value, DataConverter.charset);
+ System.out.printf("%d, Key: %s, Value: %s%n", count.incrementAndGet(), configKey, configValue);
+ });
+ }
+ }
+
+    private static void iterateKvStore(ConfigRocksDBStorage kvStore, BiConsumer<byte[], byte[]> biConsumer) {
+        try (RocksIterator iterator = kvStore.iterator()) {
+            // Feed every key/value pair to biConsumer; the for-loop initializer seeks to the first entry.
+            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
+                biConsumer.accept(iterator.key(), iterator.value());
+            }
+        }
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
deleted file mode 100644
index 3fc63e4dd..000000000
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.tools.command.metadata;
-
-import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.config.RocksDBConfigManager;
-import org.apache.rocketmq.common.utils.DataConverter;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.rocketmq.tools.command.SubCommandException;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class RocksDBConfigToJsonCommand implements SubCommand {
- private static final String TOPICS_JSON_CONFIG = "topics";
- private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups";
-
- @Override
- public String commandName() {
- return "rocksDBConfigToJson";
- }
-
- @Override
- public String commandDesc() {
- return "Convert RocksDB kv config (topics/subscriptionGroups) to json";
- }
-
- @Override
- public Options buildCommandlineOptions(Options options) {
- Option pathOption = new Option("p", "path", true,
- "Absolute path for the metadata directory");
- pathOption.setRequired(true);
- options.addOption(pathOption);
-
- Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " +
- "topics/subscriptionGroups");
- configTypeOption.setRequired(true);
- options.addOption(configTypeOption);
-
- return options;
- }
-
- @Override
- public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
- String path = commandLine.getOptionValue("path").trim();
- if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) {
- System.out.print("Rocksdb path is invalid.\n");
- return;
- }
-
- String configType = commandLine.getOptionValue("configType").trim().toLowerCase();
-
- RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(60 * 60 * 1000L);
- try {
- if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) {
- // for topics.json
- final Map<String, JSONObject> topicsJsonConfig = new HashMap<>();
- final Map<String, JSONObject> topicConfigTable = new HashMap<>();
- boolean isLoad = kvConfigManager.load(path, (key, value) -> {
- final String topic = new String(key, DataConverter.charset);
- final String topicConfig = new String(value, DataConverter.charset);
- final JSONObject jsonObject = JSONObject.parseObject(topicConfig);
- topicConfigTable.put(topic, jsonObject);
- });
-
- if (!isLoad) {
- System.out.print("RocksDB load error, path=" + path);
- return;
- }
- topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable));
- final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true);
- System.out.print(topicsJsonStr + "\n");
- return;
- }
-
- if (SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) {
- // for subscriptionGroup.json
- final Map<String, JSONObject> subscriptionGroupJsonConfig = new HashMap<>();
- final Map<String, JSONObject> subscriptionGroupTable = new HashMap<>();
- boolean isLoad = kvConfigManager.load(path, (key, value) -> {
- final String subscriptionGroup = new String(key, DataConverter.charset);
- final String subscriptionGroupConfig = new String(value, DataConverter.charset);
- final JSONObject jsonObject = JSONObject.parseObject(subscriptionGroupConfig);
- subscriptionGroupTable.put(subscriptionGroup, jsonObject);
- });
-
- if (!isLoad) {
- System.out.print("RocksDB load error, path=" + path);
- return;
- }
- subscriptionGroupJsonConfig.put("subscriptionGroupTable",
- (JSONObject) JSONObject.toJSON(subscriptionGroupTable));
- final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true);
- System.out.print(subscriptionGroupJsonStr + "\n");
- return;
- }
- System.out.print("Config type was not recognized, configType=" + configType + "\n");
- } finally {
- kvConfigManager.stop();
- }
- }
-}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java
similarity index 62%
rename from tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java
rename to tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java
index b2f66c7b0..2b938c90f 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java
@@ -21,43 +21,53 @@ import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.command.SubCommandException;
+import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand;
import org.junit.Test;
import java.io.File;
import static org.assertj.core.api.Assertions.assertThat;
-public class KvConfigToJsonCommandTest {
+public class ExportMetadataInRocksDBCommandTest {
private static final String BASE_PATH = System.getProperty("user.home") + File.separator + "store/config/";
@Test
public void testExecute() throws SubCommandException {
{
- String[] cases = new String[]{"topics", "subscriptionGroups"};
- for (String c : cases) {
- RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand();
+ String[][] cases = new String[][] {
+ {"topics", "false"},
+ {"topics", "false1"},
+ {"topics", "true"},
+ {"subscriptionGroups", "false"},
+ {"subscriptionGroups", "false2"},
+ {"subscriptionGroups", "true"}
+ };
+
+ for (String[] c : cases) {
+ ExportMetadataInRocksDBCommand cmd = new ExportMetadataInRocksDBCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] subargs = new String[]{"-p " + BASE_PATH + c, "-t " + c};
+ String[] subargs = new String[] {"-p " + BASE_PATH + c[0], "-t " + c[0], "-j " + c[1]};
final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs,
- cmd.buildCommandlineOptions(options), new DefaultParser());
+ cmd.buildCommandlineOptions(options), new DefaultParser());
cmd.execute(commandLine, options, null);
- assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c);
- assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c);
+ assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c[0]);
+ assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c[0]);
+ assertThat(commandLine.getOptionValue("j").trim()).isEqualTo(c[1]);
}
}
// invalid cases
{
- String[][] cases = new String[][]{
- {"-p " + BASE_PATH + "tmpPath", "-t topics"},
- {"-p ", "-t topics"},
- {"-p " + BASE_PATH + "topics", "-t invalid_type"}
+ String[][] cases = new String[][] {
+ {"-p " + BASE_PATH + "tmpPath", "-t topics", "-j true"},
+ {"-p ", "-t topics", "-j true"},
+ {"-p " + BASE_PATH + "topics", "-t invalid_type", "-j true"}
};
for (String[] c : cases) {
- RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand();
+ ExportMetadataInRocksDBCommand cmd = new ExportMetadataInRocksDBCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), c,
- cmd.buildCommandlineOptions(options), new DefaultParser());
+ cmd.buildCommandlineOptions(options), new DefaultParser());
cmd.execute(commandLine, options, null);
}
}
--
2.32.0.windows.2
From fa549154370cb866a90e37c13a90d2c598d6b1f6 Mon Sep 17 00:00:00 2001
From: yuz10 <845238369@qq.com>
Date: Tue, 29 Aug 2023 15:22:09 +0800
Subject: [PATCH 5/6] [ISSUE #7261] Slave high CPU usage when
enableScheduleAsyncDeliver=true (#7262)
* [ISSUE #6390] Add break to the exception of WHEEL_TIMER_NOT_ENABLE.
* fix broker start fail if mapped file size is 0
* log
* only delete the last empty file
* change dataReadAheadEnable default to true
* fix endless loop when master change to slave.
---
.../rocketmq/broker/schedule/ScheduleMessageService.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index aed0ee19f..297b14207 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -566,7 +566,8 @@ public class ScheduleMessageService extends ConfigManager {
pendingQueue.remove();
break;
case RUNNING:
- break;
+ scheduleNextTask();
+ return;
case EXCEPTION:
if (!isStarted()) {
log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString());
@@ -586,6 +587,10 @@ public class ScheduleMessageService extends ConfigManager {
}
}
+ scheduleNextTask();
+ }
+
+ private void scheduleNextTask() {
if (isStarted()) {
ScheduleMessageService.this.handleExecutorService
.schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS);
--
2.32.0.windows.2
From 9f34f55e1dac495730c9cd5469f2ab3225b8f0b9 Mon Sep 17 00:00:00 2001
From: ShuangxiDing <dingshuangxi888@gmail.com>
Date: Tue, 29 Aug 2023 15:48:46 +0800
Subject: [PATCH 6/6] [ISSUE #7226] Filter tlvs in ppv2 whose contents are not
spec-compliant ASCII characters and space (#7227)
Filter tlvs in ppv2 whose contents are not spec-compliant ASCII characters and space
---
.../rocketmq/common/utils/BinaryUtil.java | 17 +++++++++++++++++
.../grpc/ProxyAndTlsProtocolNegotiator.java | 8 +++++++-
.../remoting/netty/NettyRemotingServer.java | 8 +++++++-
3 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java
index 421adaca4..7b4b24819 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java
@@ -43,4 +43,21 @@ public class BinaryUtil {
byte[] bytes = calculateMd5(content);
return Hex.encodeHexString(bytes, false);
}
+
+ /**
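+ * Returns true if subject is non-null and contains only 7-bit ASCII bytes (no byte has bit 0x80 set).
+ * @param subject the bytes to check; null yields false, an empty array yields true
+ * @return true if subject is non-null and every byte has its high bit clear, false otherwise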
+ */
+ public static boolean isAscii(byte[] subject) {
+ if (subject == null) {
+ return false;
+ }
+ for (byte b : subject) {
+ if ((b & 0x80) != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
}
\ No newline at end of file
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
index ee167bd7b..b584ddfbd 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
@@ -24,6 +24,7 @@ import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators;
import io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiationEvent;
import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
+import io.grpc.netty.shaded.io.netty.buffer.ByteBufUtil;
import io.grpc.netty.shaded.io.netty.channel.ChannelHandler;
import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
import io.grpc.netty.shaded.io.netty.channel.ChannelInboundHandlerAdapter;
@@ -44,6 +45,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.BinaryUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
@@ -191,9 +193,13 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator
}
if (CollectionUtils.isNotEmpty(msg.tlvs())) {
msg.tlvs().forEach(tlv -> {
+ byte[] valueBytes = ByteBufUtil.getBytes(tlv.content());
+ if (!BinaryUtil.isAscii(valueBytes)) {
+ return;
+ }
Attributes.Key<String> key = AttributeKeys.valueOf(
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
- String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+ String value = StringUtils.trim(new String(valueBytes, CharsetUtil.UTF_8));
builder.set(key, value);
});
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 17f138f86..e626260c9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.remoting.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
@@ -58,6 +59,7 @@ import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.BinaryUtil;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -787,9 +789,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
if (CollectionUtils.isNotEmpty(msg.tlvs())) {
msg.tlvs().forEach(tlv -> {
+ byte[] valueBytes = ByteBufUtil.getBytes(tlv.content());
+ if (!BinaryUtil.isAscii(valueBytes)) {
+ return;
+ }
AttributeKey<String> key = AttributeKeys.valueOf(
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
- String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+ String value = StringUtils.trim(new String(valueBytes, CharsetUtil.UTF_8));
channel.attr(key).set(value);
});
}
--
2.32.0.windows.2