backport enhance admin output
This commit is contained in:
parent
e3f42198a2
commit
69115550b1
892
patch013-backport-enhance-admin-output.patch
Normal file
892
patch013-backport-enhance-admin-output.patch
Normal file
@ -0,0 +1,892 @@
|
||||
From 7e018520ef707a841c66c55d621f6560d03b631b Mon Sep 17 00:00:00 2001
|
||||
From: Zhouxiang Zhan <zhouxzhan@apache.org>
|
||||
Date: Fri, 25 Aug 2023 09:49:22 +0800
|
||||
Subject: [PATCH 1/6] Add expireAfterAccess for cache (#7247)
|
||||
|
||||
Add expireAfterAccess for cache
|
||||
---
|
||||
.../rocketmq/proxy/config/ProxyConfig.java | 59 ++++++++++++++-----
|
||||
.../metadata/ClusterMetadataService.java | 6 +-
|
||||
.../service/route/TopicRouteService.java | 14 +++--
|
||||
3 files changed, 56 insertions(+), 23 deletions(-)
|
||||
|
||||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
|
||||
index 76a243919..2994893d7 100644
|
||||
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
|
||||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
|
||||
@@ -155,14 +155,17 @@ public class ProxyConfig implements ConfigFile {
|
||||
private int consumerProcessorThreadPoolQueueCapacity = 10000;
|
||||
|
||||
private boolean useEndpointPortFromRequest = false;
|
||||
- private int topicRouteServiceCacheExpiredInSeconds = 20;
|
||||
+
|
||||
+ private int topicRouteServiceCacheExpiredSeconds = 300;
|
||||
+ private int topicRouteServiceCacheRefreshSeconds = 20;
|
||||
private int topicRouteServiceCacheMaxNum = 20000;
|
||||
private int topicRouteServiceThreadPoolNums = PROCESSOR_NUMBER;
|
||||
private int topicRouteServiceThreadPoolQueueCapacity = 5000;
|
||||
-
|
||||
- private int topicConfigCacheExpiredInSeconds = 20;
|
||||
+ private int topicConfigCacheExpiredSeconds = 300;
|
||||
+ private int topicConfigCacheRefreshSeconds = 20;
|
||||
private int topicConfigCacheMaxNum = 20000;
|
||||
- private int subscriptionGroupConfigCacheExpiredInSeconds = 20;
|
||||
+ private int subscriptionGroupConfigCacheExpiredSeconds = 300;
|
||||
+ private int subscriptionGroupConfigCacheRefreshSeconds = 20;
|
||||
private int subscriptionGroupConfigCacheMaxNum = 20000;
|
||||
private int metadataThreadPoolNums = 3;
|
||||
private int metadataThreadPoolQueueCapacity = 100000;
|
||||
@@ -794,12 +797,20 @@ public class ProxyConfig implements ConfigFile {
|
||||
this.consumerProcessorThreadPoolQueueCapacity = consumerProcessorThreadPoolQueueCapacity;
|
||||
}
|
||||
|
||||
- public int getTopicRouteServiceCacheExpiredInSeconds() {
|
||||
- return topicRouteServiceCacheExpiredInSeconds;
|
||||
+ public int getTopicRouteServiceCacheExpiredSeconds() {
|
||||
+ return topicRouteServiceCacheExpiredSeconds;
|
||||
+ }
|
||||
+
|
||||
+ public void setTopicRouteServiceCacheExpiredSeconds(int topicRouteServiceCacheExpiredSeconds) {
|
||||
+ this.topicRouteServiceCacheExpiredSeconds = topicRouteServiceCacheExpiredSeconds;
|
||||
}
|
||||
|
||||
- public void setTopicRouteServiceCacheExpiredInSeconds(int topicRouteServiceCacheExpiredInSeconds) {
|
||||
- this.topicRouteServiceCacheExpiredInSeconds = topicRouteServiceCacheExpiredInSeconds;
|
||||
+ public int getTopicRouteServiceCacheRefreshSeconds() {
|
||||
+ return topicRouteServiceCacheRefreshSeconds;
|
||||
+ }
|
||||
+
|
||||
+ public void setTopicRouteServiceCacheRefreshSeconds(int topicRouteServiceCacheRefreshSeconds) {
|
||||
+ this.topicRouteServiceCacheRefreshSeconds = topicRouteServiceCacheRefreshSeconds;
|
||||
}
|
||||
|
||||
public int getTopicRouteServiceCacheMaxNum() {
|
||||
@@ -826,12 +837,20 @@ public class ProxyConfig implements ConfigFile {
|
||||
this.topicRouteServiceThreadPoolQueueCapacity = topicRouteServiceThreadPoolQueueCapacity;
|
||||
}
|
||||
|
||||
- public int getTopicConfigCacheExpiredInSeconds() {
|
||||
- return topicConfigCacheExpiredInSeconds;
|
||||
+ public int getTopicConfigCacheRefreshSeconds() {
|
||||
+ return topicConfigCacheRefreshSeconds;
|
||||
+ }
|
||||
+
|
||||
+ public void setTopicConfigCacheRefreshSeconds(int topicConfigCacheRefreshSeconds) {
|
||||
+ this.topicConfigCacheRefreshSeconds = topicConfigCacheRefreshSeconds;
|
||||
+ }
|
||||
+
|
||||
+ public int getTopicConfigCacheExpiredSeconds() {
|
||||
+ return topicConfigCacheExpiredSeconds;
|
||||
}
|
||||
|
||||
- public void setTopicConfigCacheExpiredInSeconds(int topicConfigCacheExpiredInSeconds) {
|
||||
- this.topicConfigCacheExpiredInSeconds = topicConfigCacheExpiredInSeconds;
|
||||
+ public void setTopicConfigCacheExpiredSeconds(int topicConfigCacheExpiredSeconds) {
|
||||
+ this.topicConfigCacheExpiredSeconds = topicConfigCacheExpiredSeconds;
|
||||
}
|
||||
|
||||
public int getTopicConfigCacheMaxNum() {
|
||||
@@ -842,12 +861,20 @@ public class ProxyConfig implements ConfigFile {
|
||||
this.topicConfigCacheMaxNum = topicConfigCacheMaxNum;
|
||||
}
|
||||
|
||||
- public int getSubscriptionGroupConfigCacheExpiredInSeconds() {
|
||||
- return subscriptionGroupConfigCacheExpiredInSeconds;
|
||||
+ public int getSubscriptionGroupConfigCacheRefreshSeconds() {
|
||||
+ return subscriptionGroupConfigCacheRefreshSeconds;
|
||||
+ }
|
||||
+
|
||||
+ public void setSubscriptionGroupConfigCacheRefreshSeconds(int subscriptionGroupConfigCacheRefreshSeconds) {
|
||||
+ this.subscriptionGroupConfigCacheRefreshSeconds = subscriptionGroupConfigCacheRefreshSeconds;
|
||||
+ }
|
||||
+
|
||||
+ public int getSubscriptionGroupConfigCacheExpiredSeconds() {
|
||||
+ return subscriptionGroupConfigCacheExpiredSeconds;
|
||||
}
|
||||
|
||||
- public void setSubscriptionGroupConfigCacheExpiredInSeconds(int subscriptionGroupConfigCacheExpiredInSeconds) {
|
||||
- this.subscriptionGroupConfigCacheExpiredInSeconds = subscriptionGroupConfigCacheExpiredInSeconds;
|
||||
+ public void setSubscriptionGroupConfigCacheExpiredSeconds(int subscriptionGroupConfigCacheExpiredSeconds) {
|
||||
+ this.subscriptionGroupConfigCacheExpiredSeconds = subscriptionGroupConfigCacheExpiredSeconds;
|
||||
}
|
||||
|
||||
public int getSubscriptionGroupConfigCacheMaxNum() {
|
||||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
|
||||
index bc9582ad8..d34a0efd9 100644
|
||||
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
|
||||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
|
||||
@@ -69,11 +69,13 @@ public class ClusterMetadataService extends AbstractStartAndShutdown implements
|
||||
);
|
||||
this.topicConfigCache = CacheBuilder.newBuilder()
|
||||
.maximumSize(config.getTopicConfigCacheMaxNum())
|
||||
- .refreshAfterWrite(config.getTopicConfigCacheExpiredInSeconds(), TimeUnit.SECONDS)
|
||||
+ .expireAfterAccess(config.getTopicConfigCacheExpiredSeconds(), TimeUnit.SECONDS)
|
||||
+ .refreshAfterWrite(config.getTopicConfigCacheRefreshSeconds(), TimeUnit.SECONDS)
|
||||
.build(new ClusterTopicConfigCacheLoader());
|
||||
this.subscriptionGroupConfigCache = CacheBuilder.newBuilder()
|
||||
.maximumSize(config.getSubscriptionGroupConfigCacheMaxNum())
|
||||
- .refreshAfterWrite(config.getSubscriptionGroupConfigCacheExpiredInSeconds(), TimeUnit.SECONDS)
|
||||
+ .expireAfterAccess(config.getSubscriptionGroupConfigCacheExpiredSeconds(), TimeUnit.SECONDS)
|
||||
+ .refreshAfterWrite(config.getSubscriptionGroupConfigCacheRefreshSeconds(), TimeUnit.SECONDS)
|
||||
.build(new ClusterSubscriptionGroupConfigCacheLoader());
|
||||
|
||||
this.init();
|
||||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
|
||||
index e012a5465..84348adc3 100644
|
||||
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
|
||||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
|
||||
@@ -68,10 +68,13 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
|
||||
);
|
||||
this.mqClientAPIFactory = mqClientAPIFactory;
|
||||
|
||||
- this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()).
|
||||
- refreshAfterWrite(config.getTopicRouteServiceCacheExpiredInSeconds(), TimeUnit.SECONDS).
|
||||
- executor(cacheRefreshExecutor).build(new CacheLoader<String, MessageQueueView>() {
|
||||
- @Override public @Nullable MessageQueueView load(String topic) throws Exception {
|
||||
+ this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum())
|
||||
+ .expireAfterAccess(config.getTopicRouteServiceCacheExpiredSeconds(), TimeUnit.SECONDS)
|
||||
+ .refreshAfterWrite(config.getTopicRouteServiceCacheRefreshSeconds(), TimeUnit.SECONDS)
|
||||
+ .executor(cacheRefreshExecutor)
|
||||
+ .build(new CacheLoader<String, MessageQueueView>() {
|
||||
+ @Override
|
||||
+ public @Nullable MessageQueueView load(String topic) throws Exception {
|
||||
try {
|
||||
TopicRouteData topicRouteData = mqClientAPIFactory.getClient().getTopicRouteInfoFromNameServer(topic, Duration.ofSeconds(3).toMillis());
|
||||
return buildMessageQueueView(topic, topicRouteData);
|
||||
@@ -83,7 +86,8 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
|
||||
}
|
||||
}
|
||||
|
||||
- @Override public @Nullable MessageQueueView reload(@NonNull String key,
|
||||
+ @Override
|
||||
+ public @Nullable MessageQueueView reload(@NonNull String key,
|
||||
@NonNull MessageQueueView oldValue) throws Exception {
|
||||
try {
|
||||
return load(key);
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 5f6dc90f9dab35809fcb0407d4d5cc2737d2335e Mon Sep 17 00:00:00 2001
|
||||
From: Ziyi Tan <tanziyi0925@gmail.com>
|
||||
Date: Fri, 25 Aug 2023 11:17:23 +0800
|
||||
Subject: [PATCH 2/6] [ISSUE #7250] Beautify command rocksDBConfigToJson output
|
||||
|
||||
Co-authored-by: Ziy1-Tan <ajb4596984460@gmail.com>
|
||||
---
|
||||
.../metadata/RocksDBConfigToJsonCommand.java | 32 +++++++++++--------
|
||||
1 file changed, 18 insertions(+), 14 deletions(-)
|
||||
|
||||
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
|
||||
index 3053f4684..3fc63e4dd 100644
|
||||
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
|
||||
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
|
||||
@@ -21,13 +21,13 @@ import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.Option;
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
+import org.apache.rocketmq.common.UtilAll;
|
||||
import org.apache.rocketmq.common.config.RocksDBConfigManager;
|
||||
import org.apache.rocketmq.common.utils.DataConverter;
|
||||
import org.apache.rocketmq.remoting.RPCHook;
|
||||
import org.apache.rocketmq.tools.command.SubCommand;
|
||||
import org.apache.rocketmq.tools.command.SubCommandException;
|
||||
|
||||
-import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@@ -48,7 +48,7 @@ public class RocksDBConfigToJsonCommand implements SubCommand {
|
||||
@Override
|
||||
public Options buildCommandlineOptions(Options options) {
|
||||
Option pathOption = new Option("p", "path", true,
|
||||
- "Absolute path to the metadata directory");
|
||||
+ "Absolute path for the metadata directory");
|
||||
pathOption.setRequired(true);
|
||||
options.addOption(pathOption);
|
||||
|
||||
@@ -63,15 +63,14 @@ public class RocksDBConfigToJsonCommand implements SubCommand {
|
||||
@Override
|
||||
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
|
||||
String path = commandLine.getOptionValue("path").trim();
|
||||
- if (StringUtils.isEmpty(path) || !new File(path).exists()) {
|
||||
+ if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) {
|
||||
System.out.print("Rocksdb path is invalid.\n");
|
||||
return;
|
||||
}
|
||||
|
||||
String configType = commandLine.getOptionValue("configType").trim().toLowerCase();
|
||||
|
||||
- final long memTableFlushInterval = 60 * 60 * 1000L;
|
||||
- RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(memTableFlushInterval);
|
||||
+ RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(60 * 60 * 1000L);
|
||||
try {
|
||||
if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) {
|
||||
// for topics.json
|
||||
@@ -84,13 +83,16 @@ public class RocksDBConfigToJsonCommand implements SubCommand {
|
||||
topicConfigTable.put(topic, jsonObject);
|
||||
});
|
||||
|
||||
- if (isLoad) {
|
||||
- topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable));
|
||||
- final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true);
|
||||
- System.out.print(topicsJsonStr + "\n");
|
||||
+ if (!isLoad) {
|
||||
+ System.out.print("RocksDB load error, path=" + path);
|
||||
return;
|
||||
}
|
||||
+ topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable));
|
||||
+ final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true);
|
||||
+ System.out.print(topicsJsonStr + "\n");
|
||||
+ return;
|
||||
}
|
||||
+
|
||||
if (SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) {
|
||||
// for subscriptionGroup.json
|
||||
final Map<String, JSONObject> subscriptionGroupJsonConfig = new HashMap<>();
|
||||
@@ -102,13 +104,15 @@ public class RocksDBConfigToJsonCommand implements SubCommand {
|
||||
subscriptionGroupTable.put(subscriptionGroup, jsonObject);
|
||||
});
|
||||
|
||||
- if (isLoad) {
|
||||
- subscriptionGroupJsonConfig.put("subscriptionGroupTable",
|
||||
- (JSONObject) JSONObject.toJSON(subscriptionGroupTable));
|
||||
- final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true);
|
||||
- System.out.print(subscriptionGroupJsonStr + "\n");
|
||||
+ if (!isLoad) {
|
||||
+ System.out.print("RocksDB load error, path=" + path);
|
||||
return;
|
||||
}
|
||||
+ subscriptionGroupJsonConfig.put("subscriptionGroupTable",
|
||||
+ (JSONObject) JSONObject.toJSON(subscriptionGroupTable));
|
||||
+ final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true);
|
||||
+ System.out.print(subscriptionGroupJsonStr + "\n");
|
||||
+ return;
|
||||
}
|
||||
System.out.print("Config type was not recognized, configType=" + configType + "\n");
|
||||
} finally {
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From b4f73e2aabc1b141cec98431899e4090340adf0f Mon Sep 17 00:00:00 2001
|
||||
From: mxsm <ljbmxsm@gmail.com>
|
||||
Date: Sun, 27 Aug 2023 20:58:58 +0800
|
||||
Subject: [PATCH 3/6] [ISSUE #7271] Optimize the configuration for setting the
|
||||
quantity of TimerDequeuePutMessageService (#7272)
|
||||
|
||||
---
|
||||
.../java/org/apache/rocketmq/store/timer/TimerMessageStore.java | 2 +-
|
||||
1 file changed, 1 insertion(+), 1 deletion(-)
|
||||
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
||||
index 690f4863e..181f7087a 100644
|
||||
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
||||
@@ -222,7 +222,7 @@ public class TimerMessageStore {
|
||||
dequeueGetMessageServices[i] = new TimerDequeueGetMessageService();
|
||||
}
|
||||
|
||||
- int putThreadNum = Math.max(storeConfig.getTimerGetMessageThreadNum(), 1);
|
||||
+ int putThreadNum = Math.max(storeConfig.getTimerPutMessageThreadNum(), 1);
|
||||
dequeuePutMessageServices = new TimerDequeuePutMessageService[putThreadNum];
|
||||
for (int i = 0; i < dequeuePutMessageServices.length; i++) {
|
||||
dequeuePutMessageServices[i] = new TimerDequeuePutMessageService();
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 3e100103af68588528bf32f3752a85e8023f46f8 Mon Sep 17 00:00:00 2001
|
||||
From: Ziyi Tan <tanziyi0925@gmail.com>
|
||||
Date: Tue, 29 Aug 2023 13:48:51 +0800
|
||||
Subject: [PATCH 4/6] [ISSUE #7277] Enhance rocksDBConfigToJson to support
|
||||
metadata counting (#7276)
|
||||
|
||||
---
|
||||
.../common/config/AbstractRocksDBStorage.java | 4 +-
|
||||
.../common/config/ConfigRocksDBStorage.java | 6 +
|
||||
.../tools/command/MQAdminStartup.java | 4 +-
|
||||
.../ExportMetadataInRocksDBCommand.java | 138 ++++++++++++++++++
|
||||
.../metadata/RocksDBConfigToJsonCommand.java | 122 ----------------
|
||||
...> ExportMetadataInRocksDBCommandTest.java} | 38 +++--
|
||||
6 files changed, 173 insertions(+), 139 deletions(-)
|
||||
create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
|
||||
delete mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
|
||||
rename tools/src/test/java/org/apache/rocketmq/tools/command/metadata/{KvConfigToJsonCommandTest.java => ExportMetadataInRocksDBCommandTest.java} (62%)
|
||||
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
|
||||
index e3673baad..a720a5be3 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
|
||||
@@ -385,8 +385,10 @@ public abstract class AbstractRocksDBStorage {
|
||||
this.options.close();
|
||||
}
|
||||
//4. close db.
|
||||
- if (db != null) {
|
||||
+ if (db != null && !this.readOnly) {
|
||||
this.db.syncWal();
|
||||
+ }
|
||||
+ if (db != null) {
|
||||
this.db.closeE();
|
||||
}
|
||||
//5. help gc.
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
|
||||
index 9d05ed282..463bd8fed 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
|
||||
@@ -60,6 +60,12 @@ public class ConfigRocksDBStorage extends AbstractRocksDBStorage {
|
||||
this.readOnly = false;
|
||||
}
|
||||
|
||||
+ public ConfigRocksDBStorage(final String dbPath, boolean readOnly) {
|
||||
+ super();
|
||||
+ this.dbPath = dbPath;
|
||||
+ this.readOnly = readOnly;
|
||||
+ }
|
||||
+
|
||||
private void initOptions() {
|
||||
this.options = createConfigDBOptions();
|
||||
|
||||
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
|
||||
index 324aa1856..788fa83c2 100644
|
||||
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
|
||||
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
|
||||
@@ -80,7 +80,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand;
|
||||
import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
|
||||
import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand;
|
||||
import org.apache.rocketmq.tools.command.message.SendMessageCommand;
|
||||
-import org.apache.rocketmq.tools.command.metadata.RocksDBConfigToJsonCommand;
|
||||
+import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand;
|
||||
import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand;
|
||||
import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
|
||||
import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
|
||||
@@ -212,7 +212,6 @@ public class MQAdminStartup {
|
||||
|
||||
initCommand(new ClusterListSubCommand());
|
||||
initCommand(new TopicListSubCommand());
|
||||
- initCommand(new RocksDBConfigToJsonCommand());
|
||||
|
||||
initCommand(new UpdateKvConfigCommand());
|
||||
initCommand(new DeleteKvConfigCommand());
|
||||
@@ -257,6 +256,7 @@ public class MQAdminStartup {
|
||||
initCommand(new ExportMetadataCommand());
|
||||
initCommand(new ExportConfigsCommand());
|
||||
initCommand(new ExportMetricsCommand());
|
||||
+ initCommand(new ExportMetadataInRocksDBCommand());
|
||||
|
||||
initCommand(new HAStatusSubCommand());
|
||||
|
||||
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
|
||||
new file mode 100644
|
||||
index 000000000..2a7d3fba4
|
||||
--- /dev/null
|
||||
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java
|
||||
@@ -0,0 +1,138 @@
|
||||
+/*
|
||||
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
+ * contributor license agreements. See the NOTICE file distributed with
|
||||
+ * this work for additional information regarding copyright ownership.
|
||||
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
+ * (the "License"); you may not use this file except in compliance with
|
||||
+ * the License. You may obtain a copy of the License at
|
||||
+ *
|
||||
+ * http://www.apache.org/licenses/LICENSE-2.0
|
||||
+ *
|
||||
+ * Unless required by applicable law or agreed to in writing, software
|
||||
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
||||
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
+ * See the License for the specific language governing permissions and
|
||||
+ * limitations under the License.
|
||||
+ */
|
||||
+package org.apache.rocketmq.tools.command.export;
|
||||
+
|
||||
+import com.alibaba.fastjson.JSONObject;
|
||||
+import org.apache.commons.cli.CommandLine;
|
||||
+import org.apache.commons.cli.Option;
|
||||
+import org.apache.commons.cli.Options;
|
||||
+import org.apache.commons.lang3.StringUtils;
|
||||
+import org.apache.rocketmq.common.UtilAll;
|
||||
+import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
|
||||
+import org.apache.rocketmq.common.utils.DataConverter;
|
||||
+import org.apache.rocketmq.remoting.RPCHook;
|
||||
+import org.apache.rocketmq.tools.command.SubCommand;
|
||||
+import org.apache.rocketmq.tools.command.SubCommandException;
|
||||
+import org.rocksdb.RocksIterator;
|
||||
+
|
||||
+import java.util.HashMap;
|
||||
+import java.util.Map;
|
||||
+import java.util.concurrent.atomic.AtomicLong;
|
||||
+import java.util.function.BiConsumer;
|
||||
+
|
||||
+public class ExportMetadataInRocksDBCommand implements SubCommand {
|
||||
+ private static final String TOPICS_JSON_CONFIG = "topics";
|
||||
+ private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups";
|
||||
+
|
||||
+ @Override
|
||||
+ public String commandName() {
|
||||
+ return "exportMetadataInRocksDB";
|
||||
+ }
|
||||
+
|
||||
+ @Override
|
||||
+ public String commandDesc() {
|
||||
+ return "export RocksDB kv config (topics/subscriptionGroups)";
|
||||
+ }
|
||||
+
|
||||
+ @Override
|
||||
+ public Options buildCommandlineOptions(Options options) {
|
||||
+ Option pathOption = new Option("p", "path", true,
|
||||
+ "Absolute path for the metadata directory");
|
||||
+ pathOption.setRequired(true);
|
||||
+ options.addOption(pathOption);
|
||||
+
|
||||
+ Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " +
|
||||
+ "topics/subscriptionGroups");
|
||||
+ configTypeOption.setRequired(true);
|
||||
+ options.addOption(configTypeOption);
|
||||
+
|
||||
+ Option jsonEnableOption = new Option("j", "jsonEnable", true,
|
||||
+ "Json format enable, Default: false");
|
||||
+ jsonEnableOption.setRequired(false);
|
||||
+ options.addOption(jsonEnableOption);
|
||||
+
|
||||
+ return options;
|
||||
+ }
|
||||
+
|
||||
+ @Override
|
||||
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
|
||||
+ String path = commandLine.getOptionValue("path").trim();
|
||||
+ if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) {
|
||||
+ System.out.print("RocksDB path is invalid.\n");
|
||||
+ return;
|
||||
+ }
|
||||
+
|
||||
+ String configType = commandLine.getOptionValue("configType").trim().toLowerCase();
|
||||
+
|
||||
+ boolean jsonEnable = false;
|
||||
+ if (commandLine.hasOption("jsonEnable")) {
|
||||
+ jsonEnable = Boolean.parseBoolean(commandLine.getOptionValue("jsonEnable").trim());
|
||||
+ }
|
||||
+
|
||||
+
|
||||
+ ConfigRocksDBStorage kvStore = new ConfigRocksDBStorage(path, true /* readOnly */);
|
||||
+ if (!kvStore.start()) {
|
||||
+ System.out.print("RocksDB load error, path=" + path + "\n");
|
||||
+ return;
|
||||
+ }
|
||||
+
|
||||
+ try {
|
||||
+ if (TOPICS_JSON_CONFIG.equalsIgnoreCase(configType) || SUBSCRIPTION_GROUP_JSON_CONFIG.equalsIgnoreCase(configType)) {
|
||||
+ handleExportMetadata(kvStore, configType, jsonEnable);
|
||||
+ } else {
|
||||
+ System.out.printf("Invalid config type=%s, Options: topics,subscriptionGroups\n", configType);
|
||||
+ }
|
||||
+ } finally {
|
||||
+ kvStore.shutdown();
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ private static void handleExportMetadata(ConfigRocksDBStorage kvStore, String configType, boolean jsonEnable) {
|
||||
+ if (jsonEnable) {
|
||||
+ final Map<String, JSONObject> jsonConfig = new HashMap<>();
|
||||
+ final Map<String, JSONObject> configTable = new HashMap<>();
|
||||
+ iterateKvStore(kvStore, (key, value) -> {
|
||||
+ final String configKey = new String(key, DataConverter.charset);
|
||||
+ final String configValue = new String(value, DataConverter.charset);
|
||||
+ final JSONObject jsonObject = JSONObject.parseObject(configValue);
|
||||
+ configTable.put(configKey, jsonObject);
|
||||
+ }
|
||||
+ );
|
||||
+
|
||||
+ jsonConfig.put(configType.equalsIgnoreCase(TOPICS_JSON_CONFIG) ? "topicConfigTable" : "subscriptionGroupTable",
|
||||
+ (JSONObject) JSONObject.toJSON(configTable));
|
||||
+ final String jsonConfigStr = JSONObject.toJSONString(jsonConfig, true);
|
||||
+ System.out.print(jsonConfigStr + "\n");
|
||||
+ } else {
|
||||
+ AtomicLong count = new AtomicLong(0);
|
||||
+ iterateKvStore(kvStore, (key, value) -> {
|
||||
+ final String configKey = new String(key, DataConverter.charset);
|
||||
+ final String configValue = new String(value, DataConverter.charset);
|
||||
+ System.out.printf("%d, Key: %s, Value: %s%n", count.incrementAndGet(), configKey, configValue);
|
||||
+ });
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ private static void iterateKvStore(ConfigRocksDBStorage kvStore, BiConsumer<byte[], byte[]> biConsumer) {
|
||||
+ try (RocksIterator iterator = kvStore.iterator()) {
|
||||
+ iterator.seekToFirst();
|
||||
+ for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
|
||||
+ biConsumer.accept(iterator.key(), iterator.value());
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+}
|
||||
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
|
||||
deleted file mode 100644
|
||||
index 3fc63e4dd..000000000
|
||||
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
|
||||
+++ /dev/null
|
||||
@@ -1,122 +0,0 @@
|
||||
-/*
|
||||
- * Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
- * contributor license agreements. See the NOTICE file distributed with
|
||||
- * this work for additional information regarding copyright ownership.
|
||||
- * The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
- * (the "License"); you may not use this file except in compliance with
|
||||
- * the License. You may obtain a copy of the License at
|
||||
- *
|
||||
- * http://www.apache.org/licenses/LICENSE-2.0
|
||||
- *
|
||||
- * Unless required by applicable law or agreed to in writing, software
|
||||
- * distributed under the License is distributed on an "AS IS" BASIS,
|
||||
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
- * See the License for the specific language governing permissions and
|
||||
- * limitations under the License.
|
||||
- */
|
||||
-package org.apache.rocketmq.tools.command.metadata;
|
||||
-
|
||||
-import com.alibaba.fastjson.JSONObject;
|
||||
-import org.apache.commons.cli.CommandLine;
|
||||
-import org.apache.commons.cli.Option;
|
||||
-import org.apache.commons.cli.Options;
|
||||
-import org.apache.commons.lang3.StringUtils;
|
||||
-import org.apache.rocketmq.common.UtilAll;
|
||||
-import org.apache.rocketmq.common.config.RocksDBConfigManager;
|
||||
-import org.apache.rocketmq.common.utils.DataConverter;
|
||||
-import org.apache.rocketmq.remoting.RPCHook;
|
||||
-import org.apache.rocketmq.tools.command.SubCommand;
|
||||
-import org.apache.rocketmq.tools.command.SubCommandException;
|
||||
-
|
||||
-import java.util.HashMap;
|
||||
-import java.util.Map;
|
||||
-
|
||||
-public class RocksDBConfigToJsonCommand implements SubCommand {
|
||||
- private static final String TOPICS_JSON_CONFIG = "topics";
|
||||
- private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups";
|
||||
-
|
||||
- @Override
|
||||
- public String commandName() {
|
||||
- return "rocksDBConfigToJson";
|
||||
- }
|
||||
-
|
||||
- @Override
|
||||
- public String commandDesc() {
|
||||
- return "Convert RocksDB kv config (topics/subscriptionGroups) to json";
|
||||
- }
|
||||
-
|
||||
- @Override
|
||||
- public Options buildCommandlineOptions(Options options) {
|
||||
- Option pathOption = new Option("p", "path", true,
|
||||
- "Absolute path for the metadata directory");
|
||||
- pathOption.setRequired(true);
|
||||
- options.addOption(pathOption);
|
||||
-
|
||||
- Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " +
|
||||
- "topics/subscriptionGroups");
|
||||
- configTypeOption.setRequired(true);
|
||||
- options.addOption(configTypeOption);
|
||||
-
|
||||
- return options;
|
||||
- }
|
||||
-
|
||||
- @Override
|
||||
- public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
|
||||
- String path = commandLine.getOptionValue("path").trim();
|
||||
- if (StringUtils.isEmpty(path) || !UtilAll.isPathExists(path)) {
|
||||
- System.out.print("Rocksdb path is invalid.\n");
|
||||
- return;
|
||||
- }
|
||||
-
|
||||
- String configType = commandLine.getOptionValue("configType").trim().toLowerCase();
|
||||
-
|
||||
- RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(60 * 60 * 1000L);
|
||||
- try {
|
||||
- if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) {
|
||||
- // for topics.json
|
||||
- final Map<String, JSONObject> topicsJsonConfig = new HashMap<>();
|
||||
- final Map<String, JSONObject> topicConfigTable = new HashMap<>();
|
||||
- boolean isLoad = kvConfigManager.load(path, (key, value) -> {
|
||||
- final String topic = new String(key, DataConverter.charset);
|
||||
- final String topicConfig = new String(value, DataConverter.charset);
|
||||
- final JSONObject jsonObject = JSONObject.parseObject(topicConfig);
|
||||
- topicConfigTable.put(topic, jsonObject);
|
||||
- });
|
||||
-
|
||||
- if (!isLoad) {
|
||||
- System.out.print("RocksDB load error, path=" + path);
|
||||
- return;
|
||||
- }
|
||||
- topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable));
|
||||
- final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true);
|
||||
- System.out.print(topicsJsonStr + "\n");
|
||||
- return;
|
||||
- }
|
||||
-
|
||||
- if (SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) {
|
||||
- // for subscriptionGroup.json
|
||||
- final Map<String, JSONObject> subscriptionGroupJsonConfig = new HashMap<>();
|
||||
- final Map<String, JSONObject> subscriptionGroupTable = new HashMap<>();
|
||||
- boolean isLoad = kvConfigManager.load(path, (key, value) -> {
|
||||
- final String subscriptionGroup = new String(key, DataConverter.charset);
|
||||
- final String subscriptionGroupConfig = new String(value, DataConverter.charset);
|
||||
- final JSONObject jsonObject = JSONObject.parseObject(subscriptionGroupConfig);
|
||||
- subscriptionGroupTable.put(subscriptionGroup, jsonObject);
|
||||
- });
|
||||
-
|
||||
- if (!isLoad) {
|
||||
- System.out.print("RocksDB load error, path=" + path);
|
||||
- return;
|
||||
- }
|
||||
- subscriptionGroupJsonConfig.put("subscriptionGroupTable",
|
||||
- (JSONObject) JSONObject.toJSON(subscriptionGroupTable));
|
||||
- final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true);
|
||||
- System.out.print(subscriptionGroupJsonStr + "\n");
|
||||
- return;
|
||||
- }
|
||||
- System.out.print("Config type was not recognized, configType=" + configType + "\n");
|
||||
- } finally {
|
||||
- kvConfigManager.stop();
|
||||
- }
|
||||
- }
|
||||
-}
|
||||
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java
|
||||
similarity index 62%
|
||||
rename from tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java
|
||||
rename to tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java
|
||||
index b2f66c7b0..2b938c90f 100644
|
||||
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java
|
||||
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java
|
||||
@@ -21,43 +21,53 @@ import org.apache.commons.cli.DefaultParser;
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.rocketmq.srvutil.ServerUtil;
|
||||
import org.apache.rocketmq.tools.command.SubCommandException;
|
||||
+import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
-public class KvConfigToJsonCommandTest {
|
||||
+public class ExportMetadataInRocksDBCommandTest {
|
||||
private static final String BASE_PATH = System.getProperty("user.home") + File.separator + "store/config/";
|
||||
|
||||
@Test
|
||||
public void testExecute() throws SubCommandException {
|
||||
{
|
||||
- String[] cases = new String[]{"topics", "subscriptionGroups"};
|
||||
- for (String c : cases) {
|
||||
- RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand();
|
||||
+ String[][] cases = new String[][] {
|
||||
+ {"topics", "false"},
|
||||
+ {"topics", "false1"},
|
||||
+ {"topics", "true"},
|
||||
+ {"subscriptionGroups", "false"},
|
||||
+ {"subscriptionGroups", "false2"},
|
||||
+ {"subscriptionGroups", "true"}
|
||||
+ };
|
||||
+
|
||||
+ for (String[] c : cases) {
|
||||
+ ExportMetadataInRocksDBCommand cmd = new ExportMetadataInRocksDBCommand();
|
||||
Options options = ServerUtil.buildCommandlineOptions(new Options());
|
||||
- String[] subargs = new String[]{"-p " + BASE_PATH + c, "-t " + c};
|
||||
+ String[] subargs = new String[] {"-p " + BASE_PATH + c[0], "-t " + c[0], "-j " + c[1]};
|
||||
final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs,
|
||||
- cmd.buildCommandlineOptions(options), new DefaultParser());
|
||||
+ cmd.buildCommandlineOptions(options), new DefaultParser());
|
||||
cmd.execute(commandLine, options, null);
|
||||
- assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c);
|
||||
- assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c);
|
||||
+ assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c[0]);
|
||||
+ assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c[0]);
|
||||
+ assertThat(commandLine.getOptionValue("j").trim()).isEqualTo(c[1]);
|
||||
}
|
||||
}
|
||||
// invalid cases
|
||||
{
|
||||
- String[][] cases = new String[][]{
|
||||
- {"-p " + BASE_PATH + "tmpPath", "-t topics"},
|
||||
- {"-p ", "-t topics"},
|
||||
- {"-p " + BASE_PATH + "topics", "-t invalid_type"}
|
||||
+ String[][] cases = new String[][] {
|
||||
+ {"-p " + BASE_PATH + "tmpPath", "-t topics", "-j true"},
|
||||
+ {"-p ", "-t topics", "-j true"},
|
||||
+ {"-p " + BASE_PATH + "topics", "-t invalid_type", "-j true"}
|
||||
};
|
||||
|
||||
for (String[] c : cases) {
|
||||
- RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand();
|
||||
+ ExportMetadataInRocksDBCommand cmd = new ExportMetadataInRocksDBCommand();
|
||||
Options options = ServerUtil.buildCommandlineOptions(new Options());
|
||||
final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), c,
|
||||
- cmd.buildCommandlineOptions(options), new DefaultParser());
|
||||
+ cmd.buildCommandlineOptions(options), new DefaultParser());
|
||||
cmd.execute(commandLine, options, null);
|
||||
}
|
||||
}
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From fa549154370cb866a90e37c13a90d2c598d6b1f6 Mon Sep 17 00:00:00 2001
|
||||
From: yuz10 <845238369@qq.com>
|
||||
Date: Tue, 29 Aug 2023 15:22:09 +0800
|
||||
Subject: [PATCH 5/6] [ISSUE #7261] Slave high CPU usage when
|
||||
enableScheduleAsyncDeliver=true (#7262)
|
||||
|
||||
* [ISSUE #6390] Add break to the exception of WHEEL_TIMER_NOT_ENABLE.
|
||||
|
||||
* fix broker start fail if mapped file size is 0
|
||||
|
||||
* log
|
||||
|
||||
* only delete the last empty file
|
||||
|
||||
* change dataReadAheadEnable default to true
|
||||
|
||||
* fix endless loop when master change to slave.
|
||||
---
|
||||
.../rocketmq/broker/schedule/ScheduleMessageService.java | 7 ++++++-
|
||||
1 file changed, 6 insertions(+), 1 deletion(-)
|
||||
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
|
||||
index aed0ee19f..297b14207 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
|
||||
@@ -566,7 +566,8 @@ public class ScheduleMessageService extends ConfigManager {
|
||||
pendingQueue.remove();
|
||||
break;
|
||||
case RUNNING:
|
||||
- break;
|
||||
+ scheduleNextTask();
|
||||
+ return;
|
||||
case EXCEPTION:
|
||||
if (!isStarted()) {
|
||||
log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString());
|
||||
@@ -586,6 +587,10 @@ public class ScheduleMessageService extends ConfigManager {
|
||||
}
|
||||
}
|
||||
|
||||
+ scheduleNextTask();
|
||||
+ }
|
||||
+
|
||||
+ private void scheduleNextTask() {
|
||||
if (isStarted()) {
|
||||
ScheduleMessageService.this.handleExecutorService
|
||||
.schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS);
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 9f34f55e1dac495730c9cd5469f2ab3225b8f0b9 Mon Sep 17 00:00:00 2001
|
||||
From: ShuangxiDing <dingshuangxi888@gmail.com>
|
||||
Date: Tue, 29 Aug 2023 15:48:46 +0800
|
||||
Subject: [PATCH 6/6] [ISSUE #7226] Filter tlvs in ppv2 which contents not are
|
||||
spec-compliant ASCII characters and space (#7227)
|
||||
|
||||
Filter tlvs in ppv2 which not are spec-compliant ASCII characters and space
|
||||
---
|
||||
.../rocketmq/common/utils/BinaryUtil.java | 17 +++++++++++++++++
|
||||
.../grpc/ProxyAndTlsProtocolNegotiator.java | 8 +++++++-
|
||||
.../remoting/netty/NettyRemotingServer.java | 8 +++++++-
|
||||
3 files changed, 31 insertions(+), 2 deletions(-)
|
||||
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java
|
||||
index 421adaca4..7b4b24819 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/BinaryUtil.java
|
||||
@@ -43,4 +43,21 @@ public class BinaryUtil {
|
||||
byte[] bytes = calculateMd5(content);
|
||||
return Hex.encodeHexString(bytes, false);
|
||||
}
|
||||
+
|
||||
+ /**
|
||||
+ * Returns true if subject contains only bytes that are spec-compliant ASCII characters.
|
||||
+ * @param subject
|
||||
+ * @return
|
||||
+ */
|
||||
+ public static boolean isAscii(byte[] subject) {
|
||||
+ if (subject == null) {
|
||||
+ return false;
|
||||
+ }
|
||||
+ for (byte b : subject) {
|
||||
+ if ((b & 0x80) != 0) {
|
||||
+ return false;
|
||||
+ }
|
||||
+ }
|
||||
+ return true;
|
||||
+ }
|
||||
}
|
||||
\ No newline at end of file
|
||||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
|
||||
index ee167bd7b..b584ddfbd 100644
|
||||
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
|
||||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
|
||||
@@ -24,6 +24,7 @@ import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator;
|
||||
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators;
|
||||
import io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiationEvent;
|
||||
import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
|
||||
+import io.grpc.netty.shaded.io.netty.buffer.ByteBufUtil;
|
||||
import io.grpc.netty.shaded.io.netty.channel.ChannelHandler;
|
||||
import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
|
||||
import io.grpc.netty.shaded.io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
@@ -44,6 +45,7 @@ import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.common.constant.HAProxyConstants;
|
||||
import org.apache.rocketmq.common.constant.LoggerName;
|
||||
+import org.apache.rocketmq.common.utils.BinaryUtil;
|
||||
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||||
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||||
import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
||||
@@ -191,9 +193,13 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator
|
||||
}
|
||||
if (CollectionUtils.isNotEmpty(msg.tlvs())) {
|
||||
msg.tlvs().forEach(tlv -> {
|
||||
+ byte[] valueBytes = ByteBufUtil.getBytes(tlv.content());
|
||||
+ if (!BinaryUtil.isAscii(valueBytes)) {
|
||||
+ return;
|
||||
+ }
|
||||
Attributes.Key<String> key = AttributeKeys.valueOf(
|
||||
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
|
||||
- String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
|
||||
+ String value = StringUtils.trim(new String(valueBytes, CharsetUtil.UTF_8));
|
||||
builder.set(key, value);
|
||||
});
|
||||
}
|
||||
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
||||
index 17f138f86..e626260c9 100644
|
||||
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
||||
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
||||
@@ -18,6 +18,7 @@ package org.apache.rocketmq.remoting.netty;
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
+import io.netty.buffer.ByteBufUtil;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelDuplexHandler;
|
||||
@@ -58,6 +59,7 @@ import org.apache.rocketmq.common.Pair;
|
||||
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
||||
import org.apache.rocketmq.common.constant.HAProxyConstants;
|
||||
import org.apache.rocketmq.common.constant.LoggerName;
|
||||
+import org.apache.rocketmq.common.utils.BinaryUtil;
|
||||
import org.apache.rocketmq.common.utils.NetworkUtil;
|
||||
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||||
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||||
@@ -787,9 +789,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
||||
}
|
||||
if (CollectionUtils.isNotEmpty(msg.tlvs())) {
|
||||
msg.tlvs().forEach(tlv -> {
|
||||
+ byte[] valueBytes = ByteBufUtil.getBytes(tlv.content());
|
||||
+ if (!BinaryUtil.isAscii(valueBytes)) {
|
||||
+ return;
|
||||
+ }
|
||||
AttributeKey<String> key = AttributeKeys.valueOf(
|
||||
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
|
||||
- String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
|
||||
+ String value = StringUtils.trim(new String(valueBytes, CharsetUtil.UTF_8));
|
||||
channel.attr(key).set(value);
|
||||
});
|
||||
}
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
Summary: Cloud-Native, Distributed Messaging and Streaming
|
||||
Name: rocketmq
|
||||
Version: 5.1.3
|
||||
Release: 13
|
||||
Release: 14
|
||||
License: Apache-2.0
|
||||
Group: Applications/Message
|
||||
URL: https://rocketmq.apache.org/
|
||||
@ -22,6 +22,7 @@ Patch0009: patch009-backport-Support-KV-Storage.patch
|
||||
Patch0010: patch010-backport-add-some-fixes.patch
|
||||
Patch0011: patch011-backport-optimize-config.patch
|
||||
Patch0012: patch012-backport-enhance-rockdbconfigtojson.patch
|
||||
Patch0013: patch013-backport-enhance-admin-output.patch
|
||||
BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git
|
||||
Requires: java-1.8.0-openjdk-devel
|
||||
|
||||
@ -56,6 +57,9 @@ exit 0
|
||||
|
||||
|
||||
%changelog
|
||||
* Fri Oct 6 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-14
|
||||
- backport enhance admin output
|
||||
|
||||
* Wed Oct 2 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-13
|
||||
- backport enhance medata to json
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user