From 7e018520ef707a841c66c55d621f6560d03b631b Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan Date: Fri, 25 Aug 2023 09:49:22 +0800 Subject: [PATCH 1/6] Add expireAfterAccess for cache (#7247) Add expireAfterAccess for cache --- .../rocketmq/proxy/config/ProxyConfig.java | 59 ++++++++++++++----- .../metadata/ClusterMetadataService.java | 6 +- .../service/route/TopicRouteService.java | 14 +++-- 3 files changed, 56 insertions(+), 23 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index 76a243919..2994893d7 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -155,14 +155,17 @@ public class ProxyConfig implements ConfigFile { private int consumerProcessorThreadPoolQueueCapacity = 10000; private boolean useEndpointPortFromRequest = false; - private int topicRouteServiceCacheExpiredInSeconds = 20; + + private int topicRouteServiceCacheExpiredSeconds = 300; + private int topicRouteServiceCacheRefreshSeconds = 20; private int topicRouteServiceCacheMaxNum = 20000; private int topicRouteServiceThreadPoolNums = PROCESSOR_NUMBER; private int topicRouteServiceThreadPoolQueueCapacity = 5000; - - private int topicConfigCacheExpiredInSeconds = 20; + private int topicConfigCacheExpiredSeconds = 300; + private int topicConfigCacheRefreshSeconds = 20; private int topicConfigCacheMaxNum = 20000; - private int subscriptionGroupConfigCacheExpiredInSeconds = 20; + private int subscriptionGroupConfigCacheExpiredSeconds = 300; + private int subscriptionGroupConfigCacheRefreshSeconds = 20; private int subscriptionGroupConfigCacheMaxNum = 20000; private int metadataThreadPoolNums = 3; private int metadataThreadPoolQueueCapacity = 100000; @@ -794,12 +797,20 @@ public class ProxyConfig implements ConfigFile { this.consumerProcessorThreadPoolQueueCapacity = consumerProcessorThreadPoolQueueCapacity; } - public int getTopicRouteServiceCacheExpiredInSeconds() { - return topicRouteServiceCacheExpiredInSeconds; + public int getTopicRouteServiceCacheExpiredSeconds() { + return topicRouteServiceCacheExpiredSeconds; + } + + public void setTopicRouteServiceCacheExpiredSeconds(int topicRouteServiceCacheExpiredSeconds) { + this.topicRouteServiceCacheExpiredSeconds = topicRouteServiceCacheExpiredSeconds; } - public void setTopicRouteServiceCacheExpiredInSeconds(int topicRouteServiceCacheExpiredInSeconds) { - this.topicRouteServiceCacheExpiredInSeconds = topicRouteServiceCacheExpiredInSeconds; + public int getTopicRouteServiceCacheRefreshSeconds() { + return topicRouteServiceCacheRefreshSeconds; + } + + public void setTopicRouteServiceCacheRefreshSeconds(int topicRouteServiceCacheRefreshSeconds) { + this.topicRouteServiceCacheRefreshSeconds = topicRouteServiceCacheRefreshSeconds; } public int getTopicRouteServiceCacheMaxNum() { @@ -826,12 +837,20 @@ public class ProxyConfig implements ConfigFile { this.topicRouteServiceThreadPoolQueueCapacity = topicRouteServiceThreadPoolQueueCapacity; } - public int getTopicConfigCacheExpiredInSeconds() { - return topicConfigCacheExpiredInSeconds; + public int getTopicConfigCacheRefreshSeconds() { + return topicConfigCacheRefreshSeconds; + } + + public void setTopicConfigCacheRefreshSeconds(int topicConfigCacheRefreshSeconds) { + this.topicConfigCacheRefreshSeconds = topicConfigCacheRefreshSeconds; + } + + public int getTopicConfigCacheExpiredSeconds() { + return topicConfigCacheExpiredSeconds; } - public void setTopicConfigCacheExpiredInSeconds(int topicConfigCacheExpiredInSeconds) { - this.topicConfigCacheExpiredInSeconds = topicConfigCacheExpiredInSeconds; + public void setTopicConfigCacheExpiredSeconds(int topicConfigCacheExpiredSeconds) { + this.topicConfigCacheExpiredSeconds = topicConfigCacheExpiredSeconds; } public int getTopicConfigCacheMaxNum() { @@ -842,12 +861,20 @@ public class ProxyConfig implements ConfigFile { this.topicConfigCacheMaxNum = topicConfigCacheMaxNum; } - public int getSubscriptionGroupConfigCacheExpiredInSeconds() { - return subscriptionGroupConfigCacheExpiredInSeconds; + public int getSubscriptionGroupConfigCacheRefreshSeconds() { + return subscriptionGroupConfigCacheRefreshSeconds; + } + + public void setSubscriptionGroupConfigCacheRefreshSeconds(int subscriptionGroupConfigCacheRefreshSeconds) { + this.subscriptionGroupConfigCacheRefreshSeconds = subscriptionGroupConfigCacheRefreshSeconds; + } + + public int getSubscriptionGroupConfigCacheExpiredSeconds() { + return subscriptionGroupConfigCacheExpiredSeconds; } - public void setSubscriptionGroupConfigCacheExpiredInSeconds(int subscriptionGroupConfigCacheExpiredInSeconds) { - this.subscriptionGroupConfigCacheExpiredInSeconds = subscriptionGroupConfigCacheExpiredInSeconds; + public void setSubscriptionGroupConfigCacheExpiredSeconds(int subscriptionGroupConfigCacheExpiredSeconds) { + this.subscriptionGroupConfigCacheExpiredSeconds = subscriptionGroupConfigCacheExpiredSeconds; } public int getSubscriptionGroupConfigCacheMaxNum() { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java index bc9582ad8..d34a0efd9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java @@ -69,11 +69,13 @@ public class ClusterMetadataService extends AbstractStartAndShutdown implements ); this.topicConfigCache = CacheBuilder.newBuilder() .maximumSize(config.getTopicConfigCacheMaxNum()) - .refreshAfterWrite(config.getTopicConfigCacheExpiredInSeconds(), TimeUnit.SECONDS) + .expireAfterAccess(config.getTopicConfigCacheExpiredSeconds(), TimeUnit.SECONDS) + .refreshAfterWrite(config.getTopicConfigCacheRefreshSeconds(), TimeUnit.SECONDS) .build(new ClusterTopicConfigCacheLoader()); this.subscriptionGroupConfigCache = CacheBuilder.newBuilder() .maximumSize(config.getSubscriptionGroupConfigCacheMaxNum()) - .refreshAfterWrite(config.getSubscriptionGroupConfigCacheExpiredInSeconds(), TimeUnit.SECONDS) + .expireAfterAccess(config.getSubscriptionGroupConfigCacheExpiredSeconds(), TimeUnit.SECONDS) + .refreshAfterWrite(config.getSubscriptionGroupConfigCacheRefreshSeconds(), TimeUnit.SECONDS) .build(new ClusterSubscriptionGroupConfigCacheLoader()); this.init(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index e012a5465..84348adc3 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -68,10 +68,13 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { ); this.mqClientAPIFactory = mqClientAPIFactory; - this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()). - refreshAfterWrite(config.getTopicRouteServiceCacheExpiredInSeconds(), TimeUnit.SECONDS). - executor(cacheRefreshExecutor).build(new CacheLoader() { - @Override public @Nullable MessageQueueView load(String topic) throws Exception { + this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()) + .expireAfterAccess(config.getTopicRouteServiceCacheExpiredSeconds(), TimeUnit.SECONDS) + .refreshAfterWrite(config.getTopicRouteServiceCacheRefreshSeconds(), TimeUnit.SECONDS) + .executor(cacheRefreshExecutor) + .build(new CacheLoader() { + @Override + public @Nullable MessageQueueView load(String topic) throws Exception { try { TopicRouteData topicRouteData = mqClientAPIFactory.getClient().getTopicRouteInfoFromNameServer(topic, Duration.ofSeconds(3).toMillis()); return buildMessageQueueView(topic, topicRouteData); @@ -83,7 +86,8 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { } } - @Override public @Nullable MessageQueueView reload(@NonNull String key, + @Override + public @Nullable MessageQueueView reload(@NonNull String key, @NonNull MessageQueueView oldValue) throws Exception { try { return load(key); -- 2.32.0.windows.2 From 5f6dc90f9dab35809fcb0407d4d5cc2737d2335e Mon Sep 17 00:00:00 2001 From: Ziyi Tan Date: Fri, 25 Aug 2023 11:17:23 +0800 Subject: [PATCH 2/6] [ISSUE #7250] Beautify command rocksDBConfigToJson output Co-authored-by: Ziy1-Tan --- .../metadata/RocksDBConfigToJsonCommand.java | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java index 3053f4684..3fc63e4dd 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java @@ -21,13 +21,13 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.config.RocksDBConfigManager; import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; -import java.io.File; import java.util.HashMap; import java.util.Map; @@ -48,7 +48,7 @@ public class RocksDBConfigToJsonCommand implements SubCommand { @Override public Options buildCommandlineOptions(Options options) { Option pathOption = new Option("p", "path", true, - "Absolute path to the metadata directory"); + "Absolute path for the metadata directory"); pathOption.setRequired(true); options.addOption(pathOption); @@ -63,15 +63,14 @@ public class RocksDBConfigToJsonCommand implements SubCommand { @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { String path = commandLine.getOptionValue("path").trim(); - if (StringUtils.isEmpty(path) || !new File(path).exists()) { + if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) { System.out.print("Rocksdb path is invalid.\n"); return; } String configType = commandLine.getOptionValue("configType").trim().toLowerCase(); - final long memTableFlushInterval = 60 * 60 * 1000L; - RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(memTableFlushInterval); + RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(60 * 60 * 1000L); try { if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) { // for topics.json @@ -84,13 +83,16 @@ public class RocksDBConfigToJsonCommand implements SubCommand { topicConfigTable.put(topic, jsonObject); }); - if (isLoad) { - topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable)); - final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true); - System.out.print(topicsJsonStr + "\n"); + if (!isLoad) { + System.out.print("RocksDB load error, path=" + path); return; } + topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable)); + final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true); + System.out.print(topicsJsonStr + "\n"); + return; } + if (SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) { // for subscriptionGroup.json final Map subscriptionGroupJsonConfig = new HashMap<>(); @@ -102,13 +104,15 @@ public class RocksDBConfigToJsonCommand implements SubCommand { subscriptionGroupTable.put(subscriptionGroup, jsonObject); }); - if (isLoad) { - subscriptionGroupJsonConfig.put("subscriptionGroupTable", - (JSONObject) JSONObject.toJSON(subscriptionGroupTable)); - final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true); - System.out.print(subscriptionGroupJsonStr + "\n"); + if (!isLoad) { + System.out.print("RocksDB load error, path=" + path); return; } + subscriptionGroupJsonConfig.put("subscriptionGroupTable", + (JSONObject) JSONObject.toJSON(subscriptionGroupTable)); + final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true); + System.out.print(subscriptionGroupJsonStr + "\n"); + return; } System.out.print("Config type was not recognized, configType=" + configType + "\n"); } finally { -- 2.32.0.windows.2 From b4f73e2aabc1b141cec98431899e4090340adf0f Mon Sep 17 00:00:00 2001 From: mxsm Date: Sun, 27 Aug 2023 20:58:58 +0800 Subject: [PATCH 3/6] [ISSUE #7271] Optimize the configuration for setting the quantity of TimerDequeuePutMessageService (#7272) --- .../java/org/apache/rocketmq/store/timer/TimerMessageStore.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index 690f4863e..181f7087a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -222,7 +222,7 @@ public class TimerMessageStore { dequeueGetMessageServices[i] = new TimerDequeueGetMessageService(); } - int putThreadNum = Math.max(storeConfig.getTimerGetMessageThreadNum(), 1); + int putThreadNum = Math.max(storeConfig.getTimerPutMessageThreadNum(), 1); dequeuePutMessageServices = new TimerDequeuePutMessageService[putThreadNum]; for (int i = 0; i < dequeuePutMessageServices.length; i++) { dequeuePutMessageServices[i] = new TimerDequeuePutMessageService(); -- 2.32.0.windows.2 From 3e100103af68588528bf32f3752a85e8023f46f8 Mon Sep 17 00:00:00 2001 From: Ziyi Tan Date: Tue, 29 Aug 2023 13:48:51 +0800 Subject: [PATCH 4/6] [ISSUE #7277] Enhance rocksDBConfigToJson to support metadata counting (#7276) --- .../common/config/AbstractRocksDBStorage.java | 4 +- .../common/config/ConfigRocksDBStorage.java | 6 + .../tools/command/MQAdminStartup.java | 4 +- .../ExportMetadataInRocksDBCommand.java | 138 ++++++++++++++++++ .../metadata/RocksDBConfigToJsonCommand.java | 122 ---------------- ...> ExportMetadataInRocksDBCommandTest.java} | 38 +++-- 6 files changed, 173 insertions(+), 139 deletions(-) create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java delete mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java rename tools/src/test/java/org/apache/rocketmq/tools/command/metadata/{KvConfigToJsonCommandTest.java => ExportMetadataInRocksDBCommandTest.java} (62%) diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index e3673baad..a720a5be3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -385,8 +385,10 @@ public abstract class AbstractRocksDBStorage { this.options.close(); } //4. close db. - if (db != null) { + if (db != null && !this.readOnly) { this.db.syncWal(); + } + if (db != null) { this.db.closeE(); } //5. help gc. diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java index 9d05ed282..463bd8fed 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java @@ -60,6 +60,12 @@ public class ConfigRocksDBStorage extends AbstractRocksDBStorage { this.readOnly = false; } + public ConfigRocksDBStorage(final String dbPath, boolean readOnly) { + super(); + this.dbPath = dbPath; + this.readOnly = readOnly; + } + private void initOptions() { this.options = createConfigDBOptions(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index 324aa1856..788fa83c2 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -80,7 +80,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand; import org.apache.rocketmq.tools.command.message.SendMessageCommand; -import org.apache.rocketmq.tools.command.metadata.RocksDBConfigToJsonCommand; +import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand; import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand; import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand; @@ -212,7 +212,6 @@ public class MQAdminStartup { initCommand(new ClusterListSubCommand()); initCommand(new TopicListSubCommand()); - initCommand(new RocksDBConfigToJsonCommand()); initCommand(new UpdateKvConfigCommand()); initCommand(new DeleteKvConfigCommand()); @@ -257,6 +256,7 @@ public class MQAdminStartup { initCommand(new ExportMetadataCommand()); initCommand(new ExportConfigsCommand()); initCommand(new ExportMetricsCommand()); + initCommand(new ExportMetadataInRocksDBCommand()); initCommand(new HAStatusSubCommand()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java new file mode 100644 index 000000000..2a7d3fba4 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.export; + +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.config.ConfigRocksDBStorage; +import org.apache.rocketmq.common.utils.DataConverter; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.rocksdb.RocksIterator; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +public class ExportMetadataInRocksDBCommand implements SubCommand { + private static final String TOPICS_JSON_CONFIG = "topics"; + private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups"; + + @Override + public String commandName() { + return "exportMetadataInRocksDB"; + } + + @Override + public String commandDesc() { + return "export RocksDB kv config (topics/subscriptionGroups)"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option pathOption = new Option("p", "path", true, + "Absolute path for the metadata directory"); + pathOption.setRequired(true); + options.addOption(pathOption); + + Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " + + "topics/subscriptionGroups"); + configTypeOption.setRequired(true); + options.addOption(configTypeOption); + + Option jsonEnableOption = new Option("j", "jsonEnable", true, + "Json format enable, Default: false"); + jsonEnableOption.setRequired(false); + options.addOption(jsonEnableOption); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { + String path = commandLine.getOptionValue("path").trim(); + if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) { + System.out.print("RocksDB path is invalid.\n"); + return; + } + + String configType = commandLine.getOptionValue("configType").trim().toLowerCase(); + + boolean jsonEnable = false; + if (commandLine.hasOption("jsonEnable")) { + jsonEnable = Boolean.parseBoolean(commandLine.getOptionValue("jsonEnable").trim()); + } + + + ConfigRocksDBStorage kvStore = new ConfigRocksDBStorage(path, true /* readOnly */); + if (!kvStore.start()) { + System.out.print("RocksDB load error, path=" + path + "\n"); + return; + } + + try { + if (TOPICS_JSON_CONFIG.equalsIgnoreCase(configType) || SUBSCRIPTION_GROUP_JSON_CONFIG.equalsIgnoreCase(configType)) { + handleExportMetadata(kvStore, configType, jsonEnable); + } else { + System.out.printf("Invalid config type=%s, Options: topics,subscriptionGroups\n", configType); + } + } finally { + kvStore.shutdown(); + } + } + + private static void handleExportMetadata(ConfigRocksDBStorage kvStore, String configType, boolean jsonEnable) { + if (jsonEnable) { + final Map jsonConfig = new HashMap<>(); + final Map configTable = new HashMap<>(); + iterateKvStore(kvStore, (key, value) -> { + final String configKey = new String(key, DataConverter.charset); + final String configValue = new String(value, DataConverter.charset); + final JSONObject jsonObject = JSONObject.parseObject(configValue); + configTable.put(configKey, jsonObject); + } + ); + + jsonConfig.put(configType.equalsIgnoreCase(TOPICS_JSON_CONFIG) ? "topicConfigTable" : "subscriptionGroupTable", + (JSONObject) JSONObject.toJSON(configTable)); + final String jsonConfigStr = JSONObject.toJSONString(jsonConfig, true); + System.out.print(jsonConfigStr + "\n"); + } else { + AtomicLong count = new AtomicLong(0); + iterateKvStore(kvStore, (key, value) -> { + final String configKey = new String(key, DataConverter.charset); + final String configValue = new String(value, DataConverter.charset); + System.out.printf("%d, Key: %s, Value: %s%n", count.incrementAndGet(), configKey, configValue); + }); + } + } + + private static void iterateKvStore(ConfigRocksDBStorage kvStore, BiConsumer biConsumer) { + try (RocksIterator iterator = kvStore.iterator()) { + iterator.seekToFirst(); + for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) { + biConsumer.accept(iterator.key(), iterator.value()); + } + } + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java deleted file mode 100644 index 3fc63e4dd..000000000 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.tools.command.metadata; - -import com.alibaba.fastjson.JSONObject; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.config.RocksDBConfigManager; -import org.apache.rocketmq.common.utils.DataConverter; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.tools.command.SubCommand; -import org.apache.rocketmq.tools.command.SubCommandException; - -import java.util.HashMap; -import java.util.Map; - -public class RocksDBConfigToJsonCommand implements SubCommand { - private static final String TOPICS_JSON_CONFIG = "topics"; - private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups"; - - @Override - public String commandName() { - return "rocksDBConfigToJson"; - } - - @Override - public String commandDesc() { - return "Convert RocksDB kv config (topics/subscriptionGroups) to json"; - } - - @Override - public Options buildCommandlineOptions(Options options) { - Option pathOption = new Option("p", "path", true, - "Absolute path for the metadata directory"); - pathOption.setRequired(true); - options.addOption(pathOption); - - Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " + - "topics/subscriptionGroups"); - configTypeOption.setRequired(true); - options.addOption(configTypeOption); - - return options; - } - - @Override - public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { - String path = commandLine.getOptionValue("path").trim(); - if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) { - System.out.print("Rocksdb path is invalid.\n"); - return; - } - - String configType = commandLine.getOptionValue("configType").trim().toLowerCase(); - - RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(60 * 60 * 1000L); - try { - if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) { - // for topics.json - final Map topicsJsonConfig = new HashMap<>(); - final Map topicConfigTable = new HashMap<>(); - boolean isLoad = kvConfigManager.load(path, (key, value) -> { - final String topic = new String(key, DataConverter.charset); - final String topicConfig = new String(value, DataConverter.charset); - final JSONObject jsonObject = JSONObject.parseObject(topicConfig); - topicConfigTable.put(topic, jsonObject); - }); - - if (!isLoad) { - System.out.print("RocksDB load error, path=" + path); - return; - } - topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable)); - final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true); - System.out.print(topicsJsonStr + "\n"); - return; - } - - if (SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) { - // for subscriptionGroup.json - final Map subscriptionGroupJsonConfig = new HashMap<>(); - final Map subscriptionGroupTable = new HashMap<>(); - boolean isLoad = kvConfigManager.load(path, (key, value) -> { - final String subscriptionGroup = new String(key, DataConverter.charset); - final String subscriptionGroupConfig = new String(value, DataConverter.charset); - final JSONObject jsonObject = JSONObject.parseObject(subscriptionGroupConfig); - subscriptionGroupTable.put(subscriptionGroup, jsonObject); - }); - - if (!isLoad) { - System.out.print("RocksDB load error, path=" + path); - return; - } - subscriptionGroupJsonConfig.put("subscriptionGroupTable", - (JSONObject) JSONObject.toJSON(subscriptionGroupTable)); - final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true); - System.out.print(subscriptionGroupJsonStr + "\n"); - return; - } - System.out.print("Config type was not recognized, configType=" + configType + "\n"); - } finally { - kvConfigManager.stop(); - } - } -} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java similarity index 62% rename from tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java rename to tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java index b2f66c7b0..2b938c90f 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java @@ -21,43 +21,53 @@ import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.command.SubCommandException; +import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand; import org.junit.Test; import java.io.File; import static org.assertj.core.api.Assertions.assertThat; -public class KvConfigToJsonCommandTest { +public class ExportMetadataInRocksDBCommandTest { private static final String BASE_PATH = System.getProperty("user.home") + File.separator + "store/config/"; @Test public void testExecute() throws SubCommandException { { - String[] cases = new String[]{"topics", "subscriptionGroups"}; - for (String c : cases) { - RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand(); + String[][] cases = new String[][] { + {"topics", "false"}, + {"topics", "false1"}, + {"topics", "true"}, + {"subscriptionGroups", "false"}, + {"subscriptionGroups", "false2"}, + {"subscriptionGroups", "true"} + }; + + for (String[] c : cases) { + ExportMetadataInRocksDBCommand cmd = new ExportMetadataInRocksDBCommand(); Options options = ServerUtil.buildCommandlineOptions(new Options()); - String[] subargs = new String[]{"-p " + BASE_PATH + c, "-t " + c}; + String[] subargs = new String[] {"-p " + BASE_PATH + c[0], "-t " + c[0], "-j " + c[1]}; final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, - cmd.buildCommandlineOptions(options), new DefaultParser()); + cmd.buildCommandlineOptions(options), new DefaultParser()); cmd.execute(commandLine, options, null); - assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c); - assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c); + assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c[0]); + assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c[0]); + assertThat(commandLine.getOptionValue("j").trim()).isEqualTo(c[1]); } } // invalid cases { - String[][] cases = new String[][]{ - {"-p " + BASE_PATH + "tmpPath", "-t topics"}, - {"-p ", "-t topics"}, - {"-p " + BASE_PATH + "topics", "-t invalid_type"} + String[][] cases = new String[][] { + {"-p " + BASE_PATH + "tmpPath", "-t topics", "-j true"}, + {"-p ", "-t topics", "-j true"}, + {"-p " + BASE_PATH + "topics", "-t invalid_type", "-j true"} }; for (String[] c : cases) { - RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand(); + ExportMetadataInRocksDBCommand cmd = new ExportMetadataInRocksDBCommand(); Options options = ServerUtil.buildCommandlineOptions(new Options()); final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), c, - cmd.buildCommandlineOptions(options), new DefaultParser()); + cmd.buildCommandlineOptions(options), new DefaultParser()); cmd.execute(commandLine, options, null); } } -- 2.32.0.windows.2 From fa549154370cb866a90e37c13a90d2c598d6b1f6 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Tue, 29 Aug 2023 15:22:09 +0800 Subject: [PATCH 5/6] [ISSUE #7261] Slave high CPU usage when enableScheduleAsyncDeliver=true (#7262) * [ISSUE #6390] Add break to the exception of WHEEL_TIMER_NOT_ENABLE. * fix broker start fail if mapped file size is 0 * log * only delete the last empty file * change dataReadAheadEnable default to true * fix endless loop when master change to slave. --- .../rocketmq/broker/schedule/ScheduleMessageService.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java index aed0ee19f..297b14207 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java @@ -566,7 +566,8 @@ public class ScheduleMessageService extends ConfigManager { pendingQueue.remove(); break; case RUNNING: - break; + scheduleNextTask(); + return; case EXCEPTION: if (!isStarted()) { log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString()); @@ -586,6 +587,10 @@ public class ScheduleMessageService extends ConfigManager { } } + scheduleNextTask(); + } + + private void scheduleNextTask() { if (isStarted()) { ScheduleMessageService.this.handleExecutorService .schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS); -- 2.32.0.windows.2 From 9f34f55e1dac495730c9cd5469f2ab3225b8f0b9 Mon Sep 17 00:00:00 2001 From: ShuangxiDing Date: Tue, 29 Aug 2023 15:48:46 +0800 Subject: [PATCH 6/6] [ISSUE #7226] Filter tlvs in ppv2 which contents not are spec-compliant ASCII characters and space (#7227) Filter tlvs in ppv2 which not are spec-compliant ASCII characters and space --- .../rocketmq/common/utils/BinaryUtil.java | 17 +++++++++++++++++ .../grpc/ProxyAndTlsProtocolNegotiator.java | 8 +++++++- .../remoting/netty/NettyRemotingServer.java | 8 +++++++- 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java index 421adaca4..7b4b24819 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java @@ -43,4 +43,21 @@ public class BinaryUtil { byte[] bytes = calculateMd5(content); return Hex.encodeHexString(bytes, false); } + + /** + * Returns true if subject contains only bytes that are spec-compliant ASCII characters. + * @param subject + * @return + */ + public static boolean isAscii(byte[] subject) { + if (subject == null) { + return false; + } + for (byte b : subject) { + if ((b & 0x80) != 0) { + return false; + } + } + return true; + } } \ No newline at end of file diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java index ee167bd7b..b584ddfbd 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java @@ -24,6 +24,7 @@ import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator; import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators; import io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiationEvent; import io.grpc.netty.shaded.io.netty.buffer.ByteBuf; +import io.grpc.netty.shaded.io.netty.buffer.ByteBufUtil; import io.grpc.netty.shaded.io.netty.channel.ChannelHandler; import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext; import io.grpc.netty.shaded.io.netty.channel.ChannelInboundHandlerAdapter; @@ -44,6 +45,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.constant.HAProxyConstants; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.BinaryUtil; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.config.ConfigurationManager; @@ -191,9 +193,13 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator } if (CollectionUtils.isNotEmpty(msg.tlvs())) { msg.tlvs().forEach(tlv -> { + byte[] valueBytes = ByteBufUtil.getBytes(tlv.content()); + if (!BinaryUtil.isAscii(valueBytes)) { + return; + } Attributes.Key key = AttributeKeys.valueOf( HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); - String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); + String value = StringUtils.trim(new String(valueBytes, CharsetUtil.UTF_8)); builder.set(key, value); }); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 17f138f86..e626260c9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.remoting.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; @@ -58,6 +59,7 @@ import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.HAProxyConstants; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.BinaryUtil; import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -787,9 +789,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } if (CollectionUtils.isNotEmpty(msg.tlvs())) { msg.tlvs().forEach(tlv -> { + byte[] valueBytes = ByteBufUtil.getBytes(tlv.content()); + if (!BinaryUtil.isAscii(valueBytes)) { + return; + } AttributeKey key = AttributeKeys.valueOf( HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); - String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); + String value = StringUtils.trim(new String(valueBytes, CharsetUtil.UTF_8)); channel.attr(key).set(value); }); } -- 2.32.0.windows.2