rocketmq/patch012-backport-enhance-rockdbconfigtojson.patch
2023-10-06 23:43:03 +08:00

2921 lines
148 KiB
Diff

From fec141481496c53a0db398367006c34264662d18 Mon Sep 17 00:00:00 2001
From: yx9o <yangx_soft@163.com>
Date: Wed, 23 Aug 2023 08:22:34 +0800
Subject: [PATCH 1/8] [ISSUE #7166] Optimize the display format of admin
(#7210)
---
.../java/org/apache/rocketmq/tools/command/MQAdminStartup.java | 2 +-
.../command/acl/ClusterAclConfigVersionListSubCommand.java | 2 +-
.../tools/command/acl/DeleteAccessConfigSubCommand.java | 2 +-
.../rocketmq/tools/command/acl/GetAccessConfigSubCommand.java | 2 +-
.../tools/command/acl/UpdateAccessConfigSubCommand.java | 2 +-
.../tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java | 2 +-
.../tools/command/broker/BrokerConsumeStatsSubCommad.java | 2 +-
.../rocketmq/tools/command/broker/BrokerStatusSubCommand.java | 2 +-
.../tools/command/broker/CommitLogSetReadAheadSubCommand.java | 2 +-
.../tools/command/broker/DeleteExpiredCommitLogSubCommand.java | 2 +-
.../rocketmq/tools/command/broker/GetBrokerConfigCommand.java | 2 +-
.../rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java | 2 +-
.../tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java | 2 +-
.../broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java | 2 +-
.../tools/command/broker/ResetMasterFlushOffsetSubCommand.java | 2 +-
.../tools/command/broker/UpdateBrokerConfigSubCommand.java | 2 +-
.../broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java | 2 +-
.../rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java | 2 +-
.../rocketmq/tools/command/cluster/ClusterListSubCommand.java | 2 +-
.../tools/command/connection/ConsumerConnectionSubCommand.java | 2 +-
.../tools/command/connection/ProducerConnectionSubCommand.java | 2 +-
.../tools/command/consumer/ConsumerStatusSubCommand.java | 2 +-
.../tools/command/consumer/GetConsumerConfigSubCommand.java | 2 +-
.../tools/command/consumer/StartMonitoringSubCommand.java | 2 +-
.../tools/command/consumer/UpdateSubGroupSubCommand.java | 2 +-
.../rocketmq/tools/command/container/AddBrokerSubCommand.java | 2 +-
.../tools/command/container/RemoveBrokerSubCommand.java | 2 +-
.../command/controller/CleanControllerBrokerMetaSubCommand.java | 2 +-
.../command/controller/GetControllerMetaDataSubCommand.java | 2 +-
.../tools/command/controller/ReElectMasterSubCommand.java | 2 +-
.../rocketmq/tools/command/export/ExportConfigsCommand.java | 2 +-
.../rocketmq/tools/command/export/ExportMetadataCommand.java | 2 +-
.../rocketmq/tools/command/export/ExportMetricsCommand.java | 2 +-
.../rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java | 2 +-
.../apache/rocketmq/tools/command/ha/HAStatusSubCommand.java | 2 +-
.../rocketmq/tools/command/message/CheckMsgSendRTCommand.java | 2 +-
.../rocketmq/tools/command/message/ConsumeMessageCommand.java | 2 +-
.../tools/command/message/DumpCompactionLogCommand.java | 2 +-
.../tools/command/message/PrintMessageByQueueCommand.java | 2 +-
.../rocketmq/tools/command/message/PrintMessageSubCommand.java | 2 +-
.../rocketmq/tools/command/message/QueryMsgByIdSubCommand.java | 2 +-
.../rocketmq/tools/command/message/QueryMsgByKeySubCommand.java | 2 +-
.../tools/command/message/QueryMsgByOffsetSubCommand.java | 2 +-
.../tools/command/message/QueryMsgByUniqueKeySubCommand.java | 2 +-
.../tools/command/message/QueryMsgTraceByIdSubCommand.java | 2 +-
.../rocketmq/tools/command/message/SendMessageCommand.java | 2 +-
.../rocketmq/tools/command/namesrv/AddWritePermSubCommand.java | 2 +-
.../rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java | 2 +-
.../tools/command/offset/SkipAccumulationSubCommand.java | 2 +-
.../apache/rocketmq/tools/command/stats/StatsAllSubCommand.java | 2 +-
.../rocketmq/tools/command/topic/AllocateMQSubCommand.java | 2 +-
.../rocketmq/tools/command/topic/TopicClusterSubCommand.java | 2 +-
.../rocketmq/tools/command/topic/TopicListSubCommand.java | 2 +-
.../rocketmq/tools/command/topic/TopicRouteSubCommand.java | 2 +-
.../rocketmq/tools/command/topic/TopicStatusSubCommand.java | 2 +-
.../rocketmq/tools/command/topic/UpdateOrderConfCommand.java | 2 +-
.../tools/command/topic/UpdateStaticTopicSubCommand.java | 2 +-
.../rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java | 2 +-
.../rocketmq/tools/command/topic/UpdateTopicSubCommand.java | 2 +-
59 files changed, 59 insertions(+), 59 deletions(-)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 0c2618e91..890125ca0 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -278,7 +278,7 @@ public class MQAdminStartup {
System.out.printf("The most commonly used mqadmin commands are:%n");
for (SubCommand cmd : SUB_COMMANDS) {
- System.out.printf(" %-25s %s%n", cmd.commandName(), cmd.commandDesc());
+ System.out.printf(" %-35s %s%n", cmd.commandName(), cmd.commandDesc());
}
System.out.printf("%nSee 'mqadmin help <command>' for more information on a specific command.%n");
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java
index f8a00b1e0..26ed028fb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java
@@ -47,7 +47,7 @@ public class ClusterAclConfigVersionListSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "List all of acl config version information in cluster";
+ return "List all of acl config version information in cluster.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java
index fd3a92fff..a7f3d295a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java
@@ -42,7 +42,7 @@ public class DeleteAccessConfigSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Delete Acl Config Account in broker";
+ return "Delete Acl Config Account in broker.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java
index 25844d6a1..f1c9a1496 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java
@@ -49,7 +49,7 @@ public class GetAccessConfigSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "List all of acl config information in cluster";
+ return "List all of acl config information in cluster.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
index 3be40daa1..d8a06f92d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
@@ -40,7 +40,7 @@ public class UpdateAccessConfigSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Update acl config yaml file in broker";
+ return "Update acl config yaml file in broker.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java
index ff662b506..9dacf1fae 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java
@@ -37,7 +37,7 @@ public class UpdateGlobalWhiteAddrSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Update global white address for acl Config File in broker";
+ return "Update global white address for acl Config File in broker.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java
index 3f2f90673..7658a2139 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java
@@ -61,7 +61,7 @@ public class BrokerConsumeStatsSubCommad implements SubCommand {
@Override
public String commandDesc() {
- return "Fetch broker consume stats data";
+ return "Fetch broker consume stats data.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java
index 830ff3425..ce934f547 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java
@@ -44,7 +44,7 @@ public class BrokerStatusSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Fetch broker runtime status data";
+ return "Fetch broker runtime status data.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java
index b00c7f5f5..4fdabfdf8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java
@@ -44,7 +44,7 @@ public class CommitLogSetReadAheadSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "set read ahead mode for all commitlog files";
+ return "Set read ahead mode for all commitlog files.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java
index a4b2a51ad..142bb7b3c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java
@@ -37,7 +37,7 @@ public class DeleteExpiredCommitLogSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Delete expired CommitLog files";
+ return "Delete expired CommitLog files.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
index 5d86c10e4..c4762a296 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
@@ -45,7 +45,7 @@ public class GetBrokerConfigCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Get broker config by cluster or special broker";
+ return "Get broker config by cluster or special broker.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java
index abe8fc622..1a8961e04 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java
@@ -38,7 +38,7 @@ public class GetBrokerEpochSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Fetch broker epoch entries";
+ return "Fetch broker epoch entries.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java
index 7c54e650c..34b3ba7d3 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java
@@ -47,7 +47,7 @@ public class GetColdDataFlowCtrInfoSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "get cold data flow ctr info";
+ return "Get cold data flow ctr info.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java
index b0477924f..f20407480 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java
@@ -36,7 +36,7 @@ public class RemoveColdDataFlowCtrGroupConfigSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "remove consumer from cold ctr config";
+ return "Remove consumer from cold ctr config.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java
index b2ac48c84..90451b51f 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java
@@ -33,7 +33,7 @@ public class ResetMasterFlushOffsetSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Reset master flush offset in slave";
+ return "Reset master flush offset in slave.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
index 98abeb6ae..62816ef03 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
@@ -37,7 +37,7 @@ public class UpdateBrokerConfigSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Update broker's config";
+ return "Update broker's config.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java
index d06a24b57..8d1a00077 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java
@@ -39,7 +39,7 @@ public class UpdateColdDataFlowCtrGroupConfigSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "addOrUpdate cold data flow ctr group config";
+ return "Add or update cold data flow ctr group config.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
index 7253970bd..d755e9e5d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
@@ -48,7 +48,7 @@ public class CLusterSendMsgRTCommand implements SubCommand {
@Override
public String commandDesc() {
- return "List All clusters Message Send RT";
+ return "List All clusters Message Send RT.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
index a7a840a44..ede0fa5cf 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
@@ -41,7 +41,7 @@ public class ClusterListSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "List cluster infos";
+ return "List cluster infos.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java
index 630961e31..35f73d8a0 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java
@@ -39,7 +39,7 @@ public class ConsumerConnectionSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Query consumer's socket connection, client version and subscription";
+ return "Query consumer's socket connection, client version and subscription.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java
index 2533982c8..bde674ab2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java
@@ -36,7 +36,7 @@ public class ProducerConnectionSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Query producer's socket connection and client version";
+ return "Query producer's socket connection and client version.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java
index 72b9c975e..d8f6f9aa9 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java
@@ -47,7 +47,7 @@ public class ConsumerStatusSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Query consumer's internal data structure";
+ return "Query consumer's internal data structure.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
index 6095e7668..4a8253a02 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
@@ -43,7 +43,7 @@ public class GetConsumerConfigSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Get consumer config by subscription group name";
+ return "Get consumer config by subscription group name.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java
index 2d08d0bd0..f5e140433 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java
@@ -34,7 +34,7 @@ public class StartMonitoringSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Start Monitoring";
+ return "Start Monitoring.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
index f87bafc93..b17da4de4 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
@@ -41,7 +41,7 @@ public class UpdateSubGroupSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Update or create subscription group";
+ return "Update or create subscription group.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java
index e9e5be4a5..007d42ae6 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java
@@ -33,7 +33,7 @@ public class AddBrokerSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Add a broker to specified container";
+ return "Add a broker to specified container.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java
index 7c455f858..ab25d8ebe 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java
@@ -33,7 +33,7 @@ public class RemoveBrokerSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Remove a broker from specified container";
+ return "Remove a broker from specified container.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerMetaSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerMetaSubCommand.java
index 856e4b426..24ed02566 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerMetaSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerMetaSubCommand.java
@@ -37,7 +37,7 @@ public class CleanControllerBrokerMetaSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Clean metadata of broker on controller";
+ return "Clean metadata of broker on controller.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java
index 70bd7f8e9..966443127 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java
@@ -34,7 +34,7 @@ public class GetControllerMetaDataSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Get controller cluster's metadata";
+ return "Get controller cluster's metadata.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
index 1affe81f9..a522a903d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
@@ -37,7 +37,7 @@ public class ReElectMasterSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Re-elect the specified broker as master";
+ return "Re-elect the specified broker as master.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java
index b8191296d..03613b29c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java
@@ -42,7 +42,7 @@ public class ExportConfigsCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Export configs";
+ return "Export configs.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java
index 1f9cf7d96..748f7b16e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java
@@ -46,7 +46,7 @@ public class ExportMetadataCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Export metadata";
+ return "Export metadata.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java
index a793b4b84..5d8bb37ba 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java
@@ -56,7 +56,7 @@ public class ExportMetricsCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Export metrics";
+ return "Export metrics.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java
index 44b3ec3e1..b6231e4f9 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java
@@ -40,7 +40,7 @@ public class GetSyncStateSetSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Fetch syncStateSet for target brokers";
+ return "Fetch syncStateSet for target brokers.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java
index b1795e046..931658a08 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java
@@ -41,7 +41,7 @@ public class HAStatusSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Fetch ha runtime status data";
+ return "Fetch ha runtime status data.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java
index 4c6d5ffb6..b15b59d50 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java
@@ -40,7 +40,7 @@ public class CheckMsgSendRTCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Check message send response time";
+ return "Check message send response time.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
index 8aed59ea4..02ff53269 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
@@ -70,7 +70,7 @@ public class ConsumeMessageCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Consume message";
+ return "Consume message.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
index ae6d9bdcf..eee8f3d4b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
@@ -38,7 +38,7 @@ import java.nio.file.Paths;
public class DumpCompactionLogCommand implements SubCommand {
@Override
public String commandDesc() {
- return "parse compaction log to message";
+ return "Parse compaction log to message.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
index 654560167..0418e88a7 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
@@ -108,7 +108,7 @@ public class PrintMessageByQueueCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Print Message Detail";
+ return "Print Message Detail by queueId.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
index d01c36d42..bb82f5079 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
@@ -62,7 +62,7 @@ public class PrintMessageSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Print Message Detail";
+ return "Print Message Detail.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
index 2880477f1..b42612150 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
@@ -186,7 +186,7 @@ public class QueryMsgByIdSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Query Message by Id";
+ return "Query Message by Id.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
index ba7b00c3b..64627fd19 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
@@ -36,7 +36,7 @@ public class QueryMsgByKeySubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Query Message by Key";
+ return "Query Message by Key.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java
index d27313af1..14d0625fd 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java
@@ -39,7 +39,7 @@ public class QueryMsgByOffsetSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Query Message by offset";
+ return "Query Message by offset.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
index 1b28f8be1..b71cee901 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
@@ -141,7 +141,7 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Query Message by Unique key";
+ return "Query Message by Unique key.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java
index 2b982efef..2c546ec56 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java
@@ -65,7 +65,7 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Query a message trace";
+ return "Query a message trace.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java
index 836ee192b..970da6b16 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java
@@ -41,7 +41,7 @@ public class SendMessageCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Send a message";
+ return "Send a message.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java
index 98542d065..0b0a075bd 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java
@@ -34,7 +34,7 @@ public class AddWritePermSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Add write perm of broker in all name server you defined in the -n param";
+ return "Add write perm of broker in all name server you defined in the -n param.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java
index 213931ed8..637dd52c8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java
@@ -34,7 +34,7 @@ public class WipeWritePermSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Wipe write perm of broker in all name server you defined in the -n param";
+ return "Wipe write perm of broker in all name server you defined in the -n param.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java
index 139821f9c..b22491a59 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java
@@ -41,7 +41,7 @@ public class SkipAccumulationSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Skip all messages that are accumulated (not consumed) currently";
+ return "Skip all messages that are accumulated (not consumed) currently.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java
index 1d49bbe11..96097a93e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java
@@ -144,7 +144,7 @@ public class StatsAllSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Topic and Consumer tps stats";
+ return "Topic and Consumer tps stats.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
index 3fa42f297..6a9b81eb8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
@@ -41,7 +41,7 @@ public class AllocateMQSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Allocate MQ";
+ return "Allocate MQ.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java
index 1dab693d9..098f34ff0 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java
@@ -34,7 +34,7 @@ public class TopicClusterSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Get cluster info for topic";
+ return "Get cluster info for topic.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java
index 346bac704..d9a279f80 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java
@@ -45,7 +45,7 @@ public class TopicListSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Fetch all topic list from name server";
+ return "Fetch all topic list from name server.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java
index f2dabec4e..70949d388 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java
@@ -42,7 +42,7 @@ public class TopicRouteSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Examine topic route info";
+ return "Examine topic route info.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
index fdb249fab..a1619eced 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
@@ -40,7 +40,7 @@ public class TopicStatusSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Examine topic Status info";
+ return "Examine topic Status info.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java
index bebc646b4..3040d04c2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java
@@ -36,7 +36,7 @@ public class UpdateOrderConfCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Create or update or delete order conf";
+ return "Create or update or delete order conf.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 85a18c654..3daeee86c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -48,7 +48,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Update or create static topic, which has fixed number of queues";
+ return "Update or create static topic, which has fixed number of queues.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java
index aaa881538..d27cd1861 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java
@@ -44,7 +44,7 @@ public class UpdateTopicPermSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Update topic perm";
+ return "Update topic perm.";
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
index b68463396..298914175 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
@@ -42,7 +42,7 @@ public class UpdateTopicSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Update or create topic";
+ return "Update or create topic.";
}
@Override
--
2.32.0.windows.2
From 744167bd01fab6821b4d5ae1794dc845153d5156 Mon Sep 17 00:00:00 2001
From: Ziyi Tan <tanziyi0925@gmail.com>
Date: Wed, 23 Aug 2023 08:32:17 +0800
Subject: [PATCH 2/8] [ISSUE #7142] Add command `RocksDBConfigToJson` to
inspect rocksdb content (#7180)
* feat: add command `RocksDBConfigToJson` to inspect rocksdb content
Signed-off-by: Ziy1-Tan <ajb4596984460@gmail.com>
* refactor: fix style
---------
Signed-off-by: Ziy1-Tan <ajb4596984460@gmail.com>
Co-authored-by: Ziy1-Tan <ajb4596984460@gmail.com>
---
.../tools/command/MQAdminStartup.java | 2 +
.../metadata/RocksDBConfigToJsonCommand.java | 118 ++++++++++++++++++
.../metadata/KvConfigToJsonCommandTest.java | 65 ++++++++++
3 files changed, 185 insertions(+)
create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
create mode 100644 tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 890125ca0..324aa1856 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -80,6 +80,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand;
import org.apache.rocketmq.tools.command.message.SendMessageCommand;
+import org.apache.rocketmq.tools.command.metadata.RocksDBConfigToJsonCommand;
import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand;
import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
@@ -211,6 +212,7 @@ public class MQAdminStartup {
initCommand(new ClusterListSubCommand());
initCommand(new TopicListSubCommand());
+ initCommand(new RocksDBConfigToJsonCommand());
initCommand(new UpdateKvConfigCommand());
initCommand(new DeleteKvConfigCommand());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
new file mode 100644
index 000000000..3053f4684
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.metadata;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.config.RocksDBConfigManager;
+import org.apache.rocketmq.common.utils.DataConverter;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Admin sub-command that dumps the topic or subscription-group metadata kept in RocksDB as JSON.
+ */
+public class RocksDBConfigToJsonCommand implements SubCommand {
+    private static final String TOPICS_JSON_CONFIG = "topics";
+    private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups";
+
+    @Override
+    public String commandName() {
+        return "rocksDBConfigToJson";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Convert RocksDB kv config (topics/subscriptionGroups) to json.";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option pathOption = new Option("p", "path", true,
+            "Absolute path to the metadata directory");
+        pathOption.setRequired(true);
+        options.addOption(pathOption);
+
+        Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " +
+            "topics/subscriptionGroups");
+        configTypeOption.setRequired(true);
+        options.addOption(configTypeOption);
+
+        return options;
+    }
+
+    /**
+     * Loads the RocksDB store under {@code -p} and prints the table selected by {@code -t} as pretty JSON.
+     * An invalid path, an unknown config type and a failed load each print their own message.
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+        String path = commandLine.getOptionValue("path").trim();
+        if (StringUtils.isEmpty(path) || !new File(path).exists()) {
+            System.out.print("Rocksdb path is invalid.\n");
+            return;
+        }
+
+        // Locale.ROOT keeps the match stable under locales with special casing rules (e.g. Turkish dotless i).
+        String configType = commandLine.getOptionValue("configType").trim().toLowerCase(java.util.Locale.ROOT);
+        final String tableName;
+        if (TOPICS_JSON_CONFIG.toLowerCase(java.util.Locale.ROOT).equals(configType)) {
+            tableName = "topicConfigTable";
+        } else if (SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase(java.util.Locale.ROOT).equals(configType)) {
+            tableName = "subscriptionGroupTable";
+        } else {
+            System.out.print("Config type was not recognized, configType=" + configType + "\n");
+            return;
+        }
+
+        final long memTableFlushInterval = 60 * 60 * 1000L;
+        RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(memTableFlushInterval);
+        try {
+            printTableAsJson(kvConfigManager, path, tableName);
+        } finally {
+            kvConfigManager.stop();
+        }
+    }
+
+    /**
+     * Reads every key/value pair under {@code path} and prints them as {@code {tableName: {key: config}}}.
+     */
+    private static void printTableAsJson(RocksDBConfigManager kvConfigManager, String path, String tableName) {
+        final Map<String, JSONObject> table = new HashMap<>();
+        // Keys are names and values are JSON documents, both decoded with DataConverter.charset.
+        boolean isLoad = kvConfigManager.load(path, (key, value) ->
+            table.put(new String(key, DataConverter.charset),
+                JSONObject.parseObject(new String(value, DataConverter.charset))));
+        if (!isLoad) {
+            // Report the load failure explicitly instead of mislabelling it as an unknown config type.
+            System.out.print("RocksDB load error, path=" + path + "\n");
+            return;
+        }
+
+        final Map<String, JSONObject> jsonConfig = new HashMap<>();
+        jsonConfig.put(tableName, (JSONObject) JSONObject.toJSON(table));
+        System.out.print(JSONObject.toJSONString(jsonConfig, true) + "\n");
+    }
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java
new file mode 100644
index 000000000..b2f66c7b0
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.metadata;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Smoke test for {@link RocksDBConfigToJsonCommand}: runs it with supported and invalid arguments. */
+public class KvConfigToJsonCommandTest {
+    private static final String BASE_PATH = System.getProperty("user.home") + File.separator + "store/config/";
+
+    @Test
+    public void testExecute() throws SubCommandException {
+        // supported config types, each looked up in the directory of the same name below BASE_PATH
+        for (String configType : new String[]{"topics", "subscriptionGroups"}) {
+            String[] subargs = new String[]{"-p " + BASE_PATH + configType, "-t " + configType};
+            final CommandLine commandLine = runCommand(subargs);
+            assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + configType);
+            assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(configType);
+        }
+
+        // invalid cases (path not expected to exist, blank path, unsupported config type)
+        String[][] invalidCases = new String[][]{
+            {"-p " + BASE_PATH + "tmpPath", "-t topics"},
+            {"-p ", "-t topics"},
+            {"-p " + BASE_PATH + "topics", "-t invalid_type"}
+        };
+
+        for (String[] subargs : invalidCases) {
+            runCommand(subargs);
+        }
+    }
+
+    /** Parses {@code subargs} with the command's options, executes the command and returns the parsed line. */
+    private static CommandLine runCommand(String[] subargs) throws SubCommandException {
+        RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs,
+            cmd.buildCommandlineOptions(options), new DefaultParser());
+        cmd.execute(commandLine, options, null);
+        return commandLine;
+    }
+}
--
2.32.0.windows.2
From bdede35db365a49b211cdc249c68b0f60a3df46d Mon Sep 17 00:00:00 2001
From: mxsm <ljbmxsm@gmail.com>
Date: Wed, 23 Aug 2023 08:34:56 +0800
Subject: [PATCH 3/8] [ISSUE #7124] Fix the typos in the code comments (#7125)
---
.../apache/rocketmq/broker/processor/ReplyMessageProcessor.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index b2db356c8..d3bb048f7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -234,7 +234,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
} else {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
- //set to zore to avoid client decoding exception
+ //set to zero to avoid client decoding exception
responseHeader.setMsgId("0");
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(0L);
--
2.32.0.windows.2
From 9bb73b9a38548b99ac5126c40380c3c2e7fc586e Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Wed, 23 Aug 2023 09:46:27 +0800
Subject: [PATCH 4/8] [#ISSUE 7222] Bug fix and refactoring of the Indexfile in
tiered storage (#7224)
---
.../tieredstore/file/TieredIndexFile.java | 38 +++++++--
.../tieredstore/file/TieredIndexFileTest.java | 84 +++++--------------
2 files changed, 52 insertions(+), 70 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
index 50beb01ae..eda5e0106 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.tieredstore.file;
+import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -99,7 +100,7 @@ public class TieredIndexFile {
this::doScheduleTask, 10, 10, TimeUnit.SECONDS);
}
- private void doScheduleTask() {
+ protected void doScheduleTask() {
try {
curFileLock.lock();
try {
@@ -145,6 +146,11 @@ public class TieredIndexFile {
}
}
+ @VisibleForTesting
+ public MappedFile getPreMappedFile() {
+ return preMappedFile;
+ }
+
private void initFile() throws IOException {
curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
initIndexFileHeader(curMappedFile);
@@ -156,19 +162,26 @@ public class TieredIndexFile {
if (isFileSealed(curMappedFile)) {
if (preFileExists) {
- preFile.delete();
+ if (preFile.delete()) { // the sealed current file is renamed to preFilepath right after this
+ logger.info("Pre IndexFile deleted success, path: {}", preFilepath);
+ } else {
+ logger.error("Pre IndexFile deleted failed, path: {}", preFilepath);
+ }
}
boolean rename = curMappedFile.renameTo(preFilepath);
if (rename) {
preMappedFile = curMappedFile;
curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
+ initIndexFileHeader(curMappedFile);
preFileExists = true;
}
}
+
if (preFileExists) {
synchronized (TieredIndexFile.class) {
if (inflightCompactFuture.isDone()) {
- inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit(new CompactTask(storeConfig, preMappedFile, flatFile), null);
+ inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit(
+ new CompactTask(storeConfig, preMappedFile, flatFile), null);
}
}
}
@@ -261,7 +274,8 @@ public class TieredIndexFile {
}
}
- public CompletableFuture<List<Pair<Long, ByteBuffer>>> queryAsync(String topic, String key, long beginTime, long endTime) {
+ public CompletableFuture<List<Pair<Long, ByteBuffer>>> queryAsync(String topic, String key, long beginTime,
+ long endTime) {
int hashCode = indexKeyHashMethod(buildKey(topic, key));
int slotPosition = hashCode % maxHashSlotNum;
List<TieredFileSegment> fileSegmentList = flatFile.getFileListByTime(beginTime, endTime);
@@ -355,7 +369,7 @@ public class TieredIndexFile {
private final int fileMaxSize;
private MappedFile originFile;
private TieredFlatFile fileQueue;
- private final MappedFile compactFile;
+ private MappedFile compactFile;
public CompactTask(TieredMessageStoreConfig storeConfig, MappedFile originFile,
TieredFlatFile fileQueue) throws IOException {
@@ -381,6 +395,17 @@ public class TieredIndexFile {
} catch (Throwable throwable) {
logger.error("TieredIndexFile#compactTask: compact index file failed:", throwable);
}
+
+ try {
+ if (originFile != null) {
+ originFile.destroy(-1);
+ }
+ if (compactFile != null) {
+ compactFile.destroy(-1);
+ }
+ } catch (Throwable throwable) {
+ logger.error("TieredIndexFile#compactTask: destroy index file failed:", throwable);
+ }
}
public void compact() {
@@ -396,6 +421,8 @@ public class TieredIndexFile {
fileQueue.commit(true);
compactFile.destroy(-1);
originFile.destroy(-1);
+ compactFile = null;
+ originFile = null;
}
private void buildCompactFile() {
@@ -414,6 +441,7 @@ public class TieredIndexFile {
if (slotValue != -1) {
int indexTotalSize = 0;
int indexPosition = slotValue;
+
while (indexPosition >= 0 && indexPosition < maxIndexNum) {
int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
+ indexPosition * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE;
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
index 7ef49578d..262d6645b 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
@@ -19,9 +19,8 @@ package org.apache.rocketmq.tieredstore.file;
import com.sun.jna.Platform;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
@@ -31,9 +30,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
-import org.junit.Assume;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
public class TieredIndexFileTest {
@@ -45,11 +42,12 @@ public class TieredIndexFileTest {
@Before
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
+ storeConfig.setBrokerName("IndexFileBroker");
storeConfig.setStorePathRootDir(storePath);
- storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment");
- storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2);
- storeConfig.setTieredStoreIndexFileMaxIndexNum(3);
- mq = new MessageQueue("TieredIndexFileTest", storeConfig.getBrokerName(), 1);
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
+ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
+ storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
+ mq = new MessageQueue("IndexFileTest", storeConfig.getBrokerName(), 1);
TieredStoreUtil.getMetadataStore(storeConfig);
TieredStoreExecutor.init();
}
@@ -61,77 +59,33 @@ public class TieredIndexFileTest {
TieredStoreExecutor.shutdown();
}
- @Ignore
@Test
public void testAppendAndQuery() throws IOException, ClassNotFoundException, NoSuchMethodException {
if (Platform.isWindows()) {
return;
}
- // skip this test on windows
- Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS);
-
TieredFileAllocator fileQueueFactory = new TieredFileAllocator(storeConfig);
TieredIndexFile indexFile = new TieredIndexFile(fileQueueFactory, storePath);
+
indexFile.append(mq, 0, "key3", 3, 300, 1000);
indexFile.append(mq, 0, "key2", 2, 200, 1100);
indexFile.append(mq, 0, "key1", 1, 100, 1200);
- Awaitility.waitAtMost(5, TimeUnit.SECONDS)
- .until(() -> {
- List<Pair<Long, ByteBuffer>> indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
- if (indexList.size() != 1) {
- return false;
- }
-
- ByteBuffer indexBuffer = indexList.get(0).getValue();
- Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 2, indexBuffer.remaining());
-
- Assert.assertEquals(1, indexBuffer.getLong(4 + 4 + 4));
- Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8));
- Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
-
- Assert.assertEquals(3, indexBuffer.getLong(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4));
- Assert.assertEquals(300, indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4 + 8));
- Assert.assertEquals(0, indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4 + 8 + 4));
- return true;
- });
-
- indexFile.append(mq, 0, "key4", 4, 400, 1300);
- indexFile.append(mq, 0, "key4", 4, 400, 1300);
- indexFile.append(mq, 0, "key4", 4, 400, 1300);
-
- Awaitility.waitAtMost(5, TimeUnit.SECONDS)
- .until(() -> {
- List<Pair<Long, ByteBuffer>> indexList = indexFile.queryAsync(mq.getTopic(), "key4", 1300, 1300).join();
- if (indexList.size() != 1) {
- return false;
- }
-
- ByteBuffer indexBuffer = indexList.get(0).getValue();
- Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 3, indexBuffer.remaining());
- Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4));
- Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8));
- Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
- return true;
- });
-
- List<Pair<Long, ByteBuffer>> indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1300, 1300).join();
+ // do not do schedule task here
+ TieredStoreExecutor.shutdown();
+ List<Pair<Long, ByteBuffer>> indexList =
+ indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
Assert.assertEquals(0, indexList.size());
- indexList = indexFile.queryAsync(mq.getTopic(), "key4", 1200, 1300).join();
- Assert.assertEquals(2, indexList.size());
-
- ByteBuffer indexBuffer = indexList.get(0).getValue();
- Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 3, indexBuffer.remaining());
- Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4));
- Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8));
- Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
+ // do compaction once
+ TieredStoreExecutor.init();
+ storeConfig.setTieredStoreIndexFileRollingIdleInterval(0);
+ indexFile.doScheduleTask();
+ Awaitility.await().atMost(Duration.ofSeconds(10))
+ .until(() -> !indexFile.getPreMappedFile().getFile().exists());
- indexBuffer = indexList.get(1).getValue();
- Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE, indexBuffer.remaining());
- Assert.assertEquals(2, indexBuffer.getLong(4 + 4 + 4));
- Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8));
- Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
+ indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
+ Assert.assertEquals(1, indexList.size());
}
}
--
2.32.0.windows.2
From 69c26d3d29cde7b4484ecd112ab9224f9f42bf45 Mon Sep 17 00:00:00 2001
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
Date: Wed, 23 Aug 2023 10:27:52 +0800
Subject: [PATCH 5/8] [ISSUE #7228] Converge the use of some important
variables for some class
---
.../apache/rocketmq/store/ConsumeQueue.java | 16 ++++++------
.../rocketmq/store/MappedFileQueue.java | 26 +++++++++++--------
.../store/MultiPathMappedFileQueue.java | 4 +--
3 files changed, 24 insertions(+), 22 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index a0b886eb0..56bee2af3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -145,7 +145,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
if (offset >= 0 && size > 0) {
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
- this.maxPhysicOffset = offset + size;
+ this.setMaxPhysicOffset(offset + size);
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
@@ -409,7 +409,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
int logicFileSize = this.mappedFileSize;
- this.maxPhysicOffset = phyOffset;
+ this.setMaxPhysicOffset(phyOffset);
long maxExtAddr = 1;
boolean shouldDeleteFile = false;
while (true) {
@@ -435,7 +435,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
mappedFile.setWrotePosition(pos);
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
- this.maxPhysicOffset = offset + size;
+ this.setMaxPhysicOffset(offset + size);
// This maybe not take effect, when not every consume queue has extend file.
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
@@ -453,7 +453,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
mappedFile.setWrotePosition(pos);
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
- this.maxPhysicOffset = offset + size;
+ this.setMaxPhysicOffset(offset + size);
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
@@ -881,8 +881,8 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
- if (offset + size <= this.maxPhysicOffset) {
- log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
+ if (offset + size <= this.getMaxPhysicOffset()) {
+ log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", this.getMaxPhysicOffset(), offset);
return true;
}
@@ -926,7 +926,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
);
}
}
- this.maxPhysicOffset = offset + size;
+ this.setMaxPhysicOffset(offset + size);
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
@@ -1130,7 +1130,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
@Override
public void destroy() {
- this.maxPhysicOffset = -1;
+ this.setMaxPhysicOffset(-1);
this.minLogicOffset = 0;
this.mappedFileQueue.destroy();
if (isExtReadEnable()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index 0bc70642f..32b90d14f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -285,7 +285,7 @@ public class MappedFileQueue implements Swappable {
if (this.mappedFiles.isEmpty())
return 0;
- long committed = this.flushedWhere;
+ long committed = this.getFlushedWhere();
if (committed != 0) {
MappedFile mappedFile = this.getLastMappedFile(0, false);
if (mappedFile != null) {
@@ -442,11 +442,11 @@ public class MappedFileQueue implements Swappable {
}
public long remainHowManyDataToCommit() {
- return getMaxWrotePosition() - committedWhere;
+ return getMaxWrotePosition() - getCommittedWhere();
}
public long remainHowManyDataToFlush() {
- return getMaxOffset() - flushedWhere;
+ return getMaxOffset() - this.getFlushedWhere();
}
public void deleteLastMappedFile() {
@@ -616,15 +616,15 @@ public class MappedFileQueue implements Swappable {
public boolean flush(final int flushLeastPages) {
boolean result = true;
- MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
+ MappedFile mappedFile = this.findMappedFileByOffset(this.getFlushedWhere(), this.getFlushedWhere() == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
int offset = mappedFile.flush(flushLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
- result = where == this.flushedWhere;
- this.flushedWhere = where;
+ result = where == this.getFlushedWhere();
+ this.setFlushedWhere(where);
if (0 == flushLeastPages) {
- this.storeTimestamp = tmpTimeStamp;
+ this.setStoreTimestamp(tmpTimeStamp);
}
}
@@ -633,12 +633,12 @@ public class MappedFileQueue implements Swappable {
public synchronized boolean commit(final int commitLeastPages) {
boolean result = true;
- MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
+ MappedFile mappedFile = this.findMappedFileByOffset(this.getCommittedWhere(), this.getCommittedWhere() == 0);
if (mappedFile != null) {
int offset = mappedFile.commit(commitLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
- result = where == this.committedWhere;
- this.committedWhere = where;
+ result = where == this.getCommittedWhere();
+ this.setCommittedWhere(where);
}
return result;
@@ -763,7 +763,7 @@ public class MappedFileQueue implements Swappable {
mf.destroy(1000 * 3);
}
this.mappedFiles.clear();
- this.flushedWhere = 0;
+ this.setFlushedWhere(0);
// delete parent directory
File file = new File(storePath);
@@ -848,6 +848,10 @@ public class MappedFileQueue implements Swappable {
return storeTimestamp;
}
+ public void setStoreTimestamp(long storeTimestamp) {
+ this.storeTimestamp = storeTimestamp;
+ }
+
public List<MappedFile> getMappedFiles() {
return mappedFiles;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
index 8f5af9438..8ff050dfe 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.store;
-
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
@@ -113,8 +112,7 @@ public class MultiPathMappedFileQueue extends MappedFileQueue {
mf.destroy(1000 * 3);
}
this.mappedFiles.clear();
- this.flushedWhere = 0;
-
+ this.setFlushedWhere(0);
Set<String> storePathSet = getPaths();
storePathSet.addAll(getReadonlyPaths());
--
2.32.0.windows.2
From 3884f595949462044c5cb3c236199bc1d7ad2341 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=9F=B3=E8=87=BB=E8=87=BB=28Steven=20shi=29?=
<shirenchuang@users.noreply.github.com>
Date: Wed, 23 Aug 2023 11:10:30 +0800
Subject: [PATCH 6/8] [ISSUE #7149] When creating and updating Topic, there
will be problems with permission settings (#7151)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
* [ISSUE #7149] fix bug : When creating and updating Topic, there will be problems with permission settings
* [ISSUE #7149] fix bug : When creating and updating Topic, there will be problems with permission settings
* [issue#7249]
---------
Co-authored-by: 十真 <shirenchuang.src@cainiao.com>
---
.../main/java/org/apache/rocketmq/broker/BrokerController.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 13f9d002b..e8f943702 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1733,7 +1733,8 @@ public class BrokerController {
new TopicConfig(topicConfig.getTopicName(),
topicConfig.getReadQueueNums(),
topicConfig.getWriteQueueNums(),
- this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());
+ topicConfig.getPerm()
+ & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());
} else {
registerTopicConfig = new TopicConfig(topicConfig);
}
--
2.32.0.windows.2
From 017ad110475e8024585327b44f47e5e97aabc63b Mon Sep 17 00:00:00 2001
From: echooymxq <echooy.mxq@gmail.com>
Date: Wed, 23 Aug 2023 11:11:42 +0800
Subject: [PATCH 7/8] [ISSUE #7219] Fix Concurrent modify syncStateSet and Mark
synchronizing frequently when shrink. (#7220)
---
.../broker/controller/ReplicasManager.java | 29 ++++++++++---------
.../ha/autoswitch/AutoSwitchHAService.java | 21 ++++++++------
2 files changed, 28 insertions(+), 22 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index abae7cdb0..37c82e434 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -542,7 +542,7 @@ public class ReplicasManager {
this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
this.tempBrokerMetadata.clear();
this.brokerControllerId = this.brokerMetadata.getBrokerId();
- this.haService.setBrokerControllerId(this.brokerControllerId);
+ this.haService.setLocalBrokerId(this.brokerControllerId);
return true;
} catch (Exception e) {
LOGGER.error("fail to create metadata file", e);
@@ -594,7 +594,7 @@ public class ReplicasManager {
if (this.brokerMetadata.isLoaded()) {
this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
this.brokerControllerId = brokerMetadata.getBrokerId();
- this.haService.setBrokerControllerId(this.brokerControllerId);
+ this.haService.setLocalBrokerId(this.brokerControllerId);
return;
}
// 2. check if temp metadata exist
@@ -735,23 +735,26 @@ public class ReplicasManager {
if (this.checkSyncStateSetTaskFuture != null) {
this.checkSyncStateSetTaskFuture.cancel(false);
}
- this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(() -> {
- checkSyncStateSetAndDoReport();
- }, 3 * 1000, this.brokerConfig.getCheckSyncStateSetPeriod(), TimeUnit.MILLISECONDS);
+ this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(this::checkSyncStateSetAndDoReport, 3 * 1000,
+ this.brokerConfig.getCheckSyncStateSetPeriod(), TimeUnit.MILLISECONDS);
}
private void checkSyncStateSetAndDoReport() {
- final Set<Long> newSyncStateSet = this.haService.maybeShrinkSyncStateSet();
- newSyncStateSet.add(this.brokerControllerId);
- synchronized (this) {
- if (this.syncStateSet != null) {
- // Check if syncStateSet changed
- if (this.syncStateSet.size() == newSyncStateSet.size() && this.syncStateSet.containsAll(newSyncStateSet)) {
- return;
+ try {
+ final Set<Long> newSyncStateSet = this.haService.maybeShrinkSyncStateSet();
+ newSyncStateSet.add(this.brokerControllerId);
+ synchronized (this) {
+ if (this.syncStateSet != null) {
+ // Check if syncStateSet changed
+ if (this.syncStateSet.size() == newSyncStateSet.size() && this.syncStateSet.containsAll(newSyncStateSet)) {
+ return;
+ }
}
}
+ doReportSyncStateSetChanged(newSyncStateSet);
+ } catch (Exception e) {
+ LOGGER.error("Check syncStateSet error", e);
}
- doReportSyncStateSetChanged(newSyncStateSet);
}
private void doReportSyncStateSetChanged(Set<Long> newSyncStateSet) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 6dc734e0c..d5393fdca 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -41,6 +41,7 @@ import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -73,7 +74,7 @@ public class AutoSwitchHAService extends DefaultHAService {
private EpochFileCache epochCache;
private AutoSwitchHAClient haClient;
- private Long brokerControllerId = null;
+ private Long localBrokerId = null;
public AutoSwitchHAService() {
}
@@ -287,9 +288,11 @@ public class AutoSwitchHAService extends DefaultHAService {
// If the slaveBrokerId is in syncStateSet but not in connectionCaughtUpTimeTable,
// it means that the broker has not connected.
- for (Long slaveBrokerId : newSyncStateSet) {
- if (!this.connectionCaughtUpTimeTable.containsKey(slaveBrokerId)) {
- newSyncStateSet.remove(slaveBrokerId);
+ Iterator<Long> iterator = newSyncStateSet.iterator();
+ while (iterator.hasNext()) {
+ Long slaveBrokerId = iterator.next();
+ if (!Objects.equals(slaveBrokerId, this.localBrokerId) && !this.connectionCaughtUpTimeTable.containsKey(slaveBrokerId)) {
+ iterator.remove();
isSyncStateSetChanged = true;
}
}
@@ -419,7 +422,7 @@ public class AutoSwitchHAService extends DefaultHAService {
// To avoid the syncStateSet is not consistent with connectionList.
// Fix issue: https://github.com/apache/rocketmq/issues/6662
for (Long syncId : currentSyncStateSet) {
- if (!idList.contains(syncId) && this.brokerControllerId != null && !Objects.equals(syncId, this.brokerControllerId)) {
+ if (!idList.contains(syncId) && this.localBrokerId != null && !Objects.equals(syncId, this.localBrokerId)) {
LOGGER.warn("Slave {} is still in syncStateSet, but has lost its connection. So new offset can't be compute.", syncId);
// Without check and re-compute, return the confirmOffset's value directly.
return this.defaultMessageStore.getConfirmOffsetDirectly();
@@ -545,12 +548,12 @@ public class AutoSwitchHAService extends DefaultHAService {
return this.epochCache.getAllEntries();
}
- public Long getBrokerControllerId() {
- return brokerControllerId;
+ public Long getLocalBrokerId() {
+ return localBrokerId;
}
- public void setBrokerControllerId(Long brokerControllerId) {
- this.brokerControllerId = brokerControllerId;
+ public void setLocalBrokerId(Long localBrokerId) {
+ this.localBrokerId = localBrokerId;
}
class AutoSwitchAcceptSocketService extends AcceptSocketService {
--
2.32.0.windows.2
From 77e8e54b37c3fc3ea0beffc1ace6f5bf20af10d9 Mon Sep 17 00:00:00 2001
From: lk <xdkxlk@outlook.com>
Date: Wed, 23 Aug 2023 15:56:39 +0800
Subject: [PATCH 8/8] [ISSUE #7223] Support batch ack for grpc client in proxy
(#7225)
---
.../client/impl/mqclient/MQClientAPIExt.java | 26 +++
.../rocketmq/proxy/config/ProxyConfig.java | 10 +
.../grpc/v2/consumer/AckMessageActivity.java | 136 ++++++++---
.../proxy/processor/AbstractProcessor.java | 4 +-
.../proxy/processor/BatchAckResult.java | 53 +++++
.../proxy/processor/ConsumerProcessor.java | 64 +++++
.../processor/DefaultMessagingProcessor.java | 7 +
.../proxy/processor/MessagingProcessor.java | 18 ++
.../message/ClusterMessageService.java | 16 +-
.../service/message/LocalMessageService.java | 58 +++++
.../proxy/service/message/MessageService.java | 8 +
.../service/message/ReceiptHandleMessage.java | 39 ++++
.../v2/consumer/AckMessageActivityTest.java | 221 +++++++++++++++---
.../proxy/processor/BaseProcessorTest.java | 18 +-
.../processor/ConsumerProcessorTest.java | 115 +++++++++
.../service/mqclient/MQClientAPIExtTest.java | 12 +
16 files changed, 728 insertions(+), 77 deletions(-)
create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java
create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
index fb8f8d11f..d7c8ef8d9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
@@ -306,6 +306,32 @@ public class MQClientAPIExt extends MQClientAPIImpl {
return future;
}
+ public CompletableFuture<AckResult> batchAckMessageAsync(
+ String brokerAddr,
+ String topic,
+ String consumerGroup,
+ List<String> extraInfoList,
+ long timeoutMillis
+ ) {
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
+ try {
+ this.batchAckMessageAsync(brokerAddr, timeoutMillis, new AckCallback() {
+ @Override
+ public void onSuccess(AckResult ackResult) {
+ future.complete(ackResult);
+ }
+
+ @Override
+ public void onException(Throwable t) {
+ future.completeExceptionally(t);
+ }
+ }, topic, consumerGroup, extraInfoList);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ return future;
+ }
+
public CompletableFuture<AckResult> changeInvisibleTimeAsync(
String brokerAddr,
String brokerName,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 39caaa0d9..76a243919 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -250,6 +250,8 @@ public class ProxyConfig implements ConfigFile {
private long remotingWaitTimeMillsInTopicRouteQueue = 3 * 1000;
private long remotingWaitTimeMillsInDefaultQueue = 3 * 1000;
+ private boolean enableBatchAck = false;
+
@Override
public void initData() {
parseDelayLevel();
@@ -1379,4 +1381,12 @@ public class ProxyConfig implements ConfigFile {
public void setRemotingWaitTimeMillsInDefaultQueue(long remotingWaitTimeMillsInDefaultQueue) {
this.remotingWaitTimeMillsInDefaultQueue = remotingWaitTimeMillsInDefaultQueue;
}
+
+ public boolean isEnableBatchAck() {
+ return enableBatchAck;
+ }
+
+ public void setEnableBatchAck(boolean enableBatchAck) {
+ this.enableBatchAck = enableBatchAck;
+ }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
index 9a3a77201..97c716c8f 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
@@ -31,12 +31,15 @@ import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
+import org.apache.rocketmq.proxy.processor.BatchAckResult;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
public class AckMessageActivity extends AbstractMessingActivity {
@@ -50,60 +53,98 @@ public class AckMessageActivity extends AbstractMessingActivity {
try {
validateTopicAndConsumerGroup(request.getTopic(), request.getGroup());
-
- CompletableFuture<AckMessageResultEntry>[] futures = new CompletableFuture[request.getEntriesCount()];
- for (int i = 0; i < request.getEntriesCount(); i++) {
- futures[i] = processAckMessage(ctx, request, request.getEntries(i));
+ String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
+ String topic = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic());
+ if (ConfigurationManager.getProxyConfig().isEnableBatchAck()) {
+ future = ackMessageInBatch(ctx, group, topic, request);
+ } else {
+ future = ackMessageOneByOne(ctx, group, topic, request);
}
- CompletableFuture.allOf(futures).whenComplete((val, throwable) -> {
- if (throwable != null) {
- future.completeExceptionally(throwable);
- return;
- }
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ return future;
+ }
+
+ protected CompletableFuture<AckMessageResponse> ackMessageInBatch(ProxyContext ctx, String group, String topic, AckMessageRequest request) {
+ List<ReceiptHandleMessage> handleMessageList = new ArrayList<>(request.getEntriesCount());
+ for (AckMessageEntry ackMessageEntry : request.getEntriesList()) {
+ String handleString = getHandleString(ctx, group, request, ackMessageEntry);
+ handleMessageList.add(new ReceiptHandleMessage(ReceiptHandle.decode(handleString), ackMessageEntry.getMessageId()));
+ }
+ return this.messagingProcessor.batchAckMessage(ctx, handleMessageList, group, topic)
+ .thenApply(batchAckResultList -> {
+ AckMessageResponse.Builder responseBuilder = AckMessageResponse.newBuilder();
Set<Code> responseCodes = new HashSet<>();
- List<AckMessageResultEntry> entryList = new ArrayList<>();
- for (CompletableFuture<AckMessageResultEntry> entryFuture : futures) {
- AckMessageResultEntry entryResult = entryFuture.join();
- responseCodes.add(entryResult.getStatus().getCode());
- entryList.add(entryResult);
- }
- AckMessageResponse.Builder responseBuilder = AckMessageResponse.newBuilder()
- .addAllEntries(entryList);
- if (responseCodes.size() > 1) {
- responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.MULTIPLE_RESULTS, Code.MULTIPLE_RESULTS.name()));
- } else if (responseCodes.size() == 1) {
- Code code = responseCodes.stream().findAny().get();
- responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(code, code.name()));
- } else {
- responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack message result is empty"));
+ for (BatchAckResult batchAckResult : batchAckResultList) {
+ AckMessageResultEntry entry = convertToAckMessageResultEntry(batchAckResult);
+ responseBuilder.addEntries(entry);
+ responseCodes.add(entry.getStatus().getCode());
}
- future.complete(responseBuilder.build());
+ setAckResponseStatus(responseBuilder, responseCodes);
+ return responseBuilder.build();
});
- } catch (Throwable t) {
- future.completeExceptionally(t);
+ }
+
+ protected AckMessageResultEntry convertToAckMessageResultEntry(BatchAckResult batchAckResult) {
+ ReceiptHandleMessage handleMessage = batchAckResult.getReceiptHandleMessage();
+ AckMessageResultEntry.Builder resultBuilder = AckMessageResultEntry.newBuilder()
+ .setMessageId(handleMessage.getMessageId())
+ .setReceiptHandle(handleMessage.getReceiptHandle().getReceiptHandle());
+ if (batchAckResult.getProxyException() != null) {
+ resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(batchAckResult.getProxyException()));
+ } else {
+ AckResult ackResult = batchAckResult.getAckResult();
+ if (AckStatus.OK.equals(ackResult.getStatus())) {
+ resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name()));
+ } else {
+ resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack failed: status is abnormal"));
+ }
}
- return future;
+ return resultBuilder.build();
}
- protected CompletableFuture<AckMessageResultEntry> processAckMessage(ProxyContext ctx, AckMessageRequest request,
+ protected CompletableFuture<AckMessageResponse> ackMessageOneByOne(ProxyContext ctx, String group, String topic, AckMessageRequest request) {
+ CompletableFuture<AckMessageResponse> resultFuture = new CompletableFuture<>();
+ CompletableFuture<AckMessageResultEntry>[] futures = new CompletableFuture[request.getEntriesCount()];
+ for (int i = 0; i < request.getEntriesCount(); i++) {
+ futures[i] = processAckMessage(ctx, group, topic, request, request.getEntries(i));
+ }
+ CompletableFuture.allOf(futures).whenComplete((val, throwable) -> {
+ if (throwable != null) {
+ resultFuture.completeExceptionally(throwable);
+ return;
+ }
+
+ Set<Code> responseCodes = new HashSet<>();
+ List<AckMessageResultEntry> entryList = new ArrayList<>();
+ for (CompletableFuture<AckMessageResultEntry> entryFuture : futures) {
+ AckMessageResultEntry entryResult = entryFuture.join();
+ responseCodes.add(entryResult.getStatus().getCode());
+ entryList.add(entryResult);
+ }
+ AckMessageResponse.Builder responseBuilder = AckMessageResponse.newBuilder()
+ .addAllEntries(entryList);
+ setAckResponseStatus(responseBuilder, responseCodes);
+ resultFuture.complete(responseBuilder.build());
+ });
+ return resultFuture;
+ }
+
+ protected CompletableFuture<AckMessageResultEntry> processAckMessage(ProxyContext ctx, String group, String topic, AckMessageRequest request,
AckMessageEntry ackMessageEntry) {
CompletableFuture<AckMessageResultEntry> future = new CompletableFuture<>();
try {
- String handleString = ackMessageEntry.getReceiptHandle();
-
- String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
- MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
- if (messageReceiptHandle != null) {
- handleString = messageReceiptHandle.getReceiptHandleStr();
- }
+ String handleString = this.getHandleString(ctx, group, request, ackMessageEntry);
CompletableFuture<AckResult> ackResultFuture = this.messagingProcessor.ackMessage(
ctx,
ReceiptHandle.decode(handleString),
ackMessageEntry.getMessageId(),
group,
- GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()));
+ topic
+ );
ackResultFuture.thenAccept(result -> {
future.complete(convertToAckMessageResultEntry(ctx, ackMessageEntry, result));
}).exceptionally(t -> {
@@ -139,4 +180,25 @@ public class AckMessageActivity extends AbstractMessingActivity {
.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack failed: status is abnormal"))
.build();
}
+
+ protected void setAckResponseStatus(AckMessageResponse.Builder responseBuilder, Set<Code> responseCodes) {
+ if (responseCodes.size() > 1) {
+ responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.MULTIPLE_RESULTS, Code.MULTIPLE_RESULTS.name()));
+ } else if (responseCodes.size() == 1) {
+ Code code = responseCodes.stream().findAny().get();
+ responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(code, code.name()));
+ } else {
+ responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack message result is empty"));
+ }
+ }
+
+ protected String getHandleString(ProxyContext ctx, String group, AckMessageRequest request, AckMessageEntry ackMessageEntry) {
+ String handleString = ackMessageEntry.getReceiptHandle();
+
+ MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
+ if (messageReceiptHandle != null) {
+ handleString = messageReceiptHandle.getReceiptHandleStr();
+ }
+ return handleString;
+ }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
index b61c3df9e..c63212c23 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
@@ -27,6 +27,8 @@ public abstract class AbstractProcessor extends AbstractStartAndShutdown {
protected MessagingProcessor messagingProcessor;
protected ServiceManager serviceManager;
+ protected static final ProxyException EXPIRED_HANDLE_PROXY_EXCEPTION = new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is expired");
+
public AbstractProcessor(MessagingProcessor messagingProcessor,
ServiceManager serviceManager) {
this.messagingProcessor = messagingProcessor;
@@ -35,7 +37,7 @@ public abstract class AbstractProcessor extends AbstractStartAndShutdown {
protected void validateReceiptHandle(ReceiptHandle handle) {
if (handle.isExpired()) {
- throw new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is expired");
+ throw EXPIRED_HANDLE_PROXY_EXCEPTION;
}
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java
new file mode 100644
index 000000000..dfb9c9b9e
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor;
+
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
+
+public class BatchAckResult {
+
+ private final ReceiptHandleMessage receiptHandleMessage;
+ private AckResult ackResult;
+ private ProxyException proxyException;
+
+ public BatchAckResult(ReceiptHandleMessage receiptHandleMessage,
+ AckResult ackResult) {
+ this.receiptHandleMessage = receiptHandleMessage;
+ this.ackResult = ackResult;
+ }
+
+ public BatchAckResult(ReceiptHandleMessage receiptHandleMessage,
+ ProxyException proxyException) {
+ this.receiptHandleMessage = receiptHandleMessage;
+ this.proxyException = proxyException;
+ }
+
+ public ReceiptHandleMessage getReceiptHandleMessage() {
+ return receiptHandleMessage;
+ }
+
+ public AckResult getAckResult() {
+ return ackResult;
+ }
+
+ public ProxyException getProxyException() {
+ return proxyException;
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index 656a6339d..f3522b374 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -48,6 +48,7 @@ import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
import org.apache.rocketmq.proxy.service.ServiceManager;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
@@ -241,6 +242,69 @@ public class ConsumerProcessor extends AbstractProcessor {
return FutureUtils.addExecutor(future, this.executor);
}
+    /**
+     * Acks a batch of messages with one batch-ack request per broker name.
+     * Expired handles are not sent; each is reported with EXPIRED_HANDLE_PROXY_EXCEPTION.
+     *
+     * @return future of the expired-handle results followed by the results of every per-broker request
+     */
+    public CompletableFuture<List<BatchAckResult>> batchAckMessage(ProxyContext ctx,
+        List<ReceiptHandleMessage> handleMessageList, String consumerGroup, String topic, long timeoutMillis) {
+        CompletableFuture<List<BatchAckResult>> future = new CompletableFuture<>();
+        try {
+            List<BatchAckResult> batchAckResultList = new ArrayList<>(handleMessageList.size());
+            Map<String, List<ReceiptHandleMessage>> brokerHandleListMap = new HashMap<>();
+
+            for (ReceiptHandleMessage handleMessage : handleMessageList) {
+                if (handleMessage.getReceiptHandle().isExpired()) {
+                    batchAckResultList.add(new BatchAckResult(handleMessage, EXPIRED_HANDLE_PROXY_EXCEPTION));
+                    continue;
+                }
+                brokerHandleListMap.computeIfAbsent(handleMessage.getReceiptHandle().getBrokerName(), key -> new ArrayList<>()).add(handleMessage);
+            }
+
+            if (brokerHandleListMap.isEmpty()) {
+                return FutureUtils.addExecutor(CompletableFuture.completedFuture(batchAckResultList), this.executor);
+            }
+            CompletableFuture<List<BatchAckResult>>[] futures = new CompletableFuture[brokerHandleListMap.size()];
+            int futureIndex = 0;
+            for (List<ReceiptHandleMessage> brokerHandleList : brokerHandleListMap.values()) {
+                futures[futureIndex++] = processBrokerHandle(ctx, consumerGroup, topic, brokerHandleList, timeoutMillis);
+            }
+            CompletableFuture.allOf(futures).whenComplete((val, throwable) -> {
+                if (throwable != null) {
+                    future.completeExceptionally(throwable);
+                    return; // a failed future makes join() below throw; stop once the failure is reported
+                }
+                for (CompletableFuture<List<BatchAckResult>> resultFuture : futures) {
+                    batchAckResultList.addAll(resultFuture.join());
+                }
+                future.complete(batchAckResultList);
+            });
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
+        }
+        return FutureUtils.addExecutor(future, this.executor);
+    }
+
+ protected CompletableFuture<List<BatchAckResult>> processBrokerHandle(ProxyContext ctx, String consumerGroup, String topic, List<ReceiptHandleMessage> handleMessageList, long timeoutMillis) {
+ return this.serviceManager.getMessageService().batchAckMessage(ctx, handleMessageList, consumerGroup, topic, timeoutMillis) // one batch-ack call for the whole list
+ .thenApply(result -> { // success: the single AckResult is attached to every handle of the list
+ List<BatchAckResult> results = new ArrayList<>();
+ for (ReceiptHandleMessage handleMessage : handleMessageList) {
+ results.add(new BatchAckResult(handleMessage, result));
+ }
+ return results;
+ })
+ .exceptionally(throwable -> { // failure: every handle is reported as INTERNAL_SERVER_ERROR wrapping the throwable
+ List<BatchAckResult> results = new ArrayList<>();
+ for (ReceiptHandleMessage handleMessage : handleMessageList) {
+ results.add(new BatchAckResult(handleMessage, new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, throwable.getMessage(), throwable)));
+ }
+ return results;
+ });
+ }
+
public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx, ReceiptHandle handle,
String messageId, String groupName, String topicName, long invisibleTime, long timeoutMillis) {
CompletableFuture<AckResult> future = new CompletableFuture<>();
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 188cb7b9b..ba150051b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.ServiceManager;
import org.apache.rocketmq.proxy.service.ServiceManagerFactory;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
@@ -183,6 +184,12 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen
return this.consumerProcessor.ackMessage(ctx, handle, messageId, consumerGroup, topic, timeoutMillis);
}
+ @Override // Batch ack: forwards all arguments unchanged to the ConsumerProcessor.
+ public CompletableFuture<List<BatchAckResult>> batchAckMessage(ProxyContext ctx,
+ List<ReceiptHandleMessage> handleMessageList, String consumerGroup, String topic, long timeoutMillis) {
+ return this.consumerProcessor.batchAckMessage(ctx, handleMessageList, consumerGroup, topic, timeoutMillis);
+ }
+
@Override
public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx, ReceiptHandle handle, String messageId,
String groupName, String topicName, long invisibleTime, long timeoutMillis) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index d86be0bd8..2ae7418ba 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
@@ -155,6 +156,23 @@ public interface MessagingProcessor extends StartAndShutdown {
long timeoutMillis
);
+ default CompletableFuture<List<BatchAckResult>> batchAckMessage( // convenience overload: same call with DEFAULT_TIMEOUT_MILLS as timeout
+ ProxyContext ctx,
+ List<ReceiptHandleMessage> handleMessageList,
+ String consumerGroup,
+ String topic
+ ) {
+ return batchAckMessage(ctx, handleMessageList, consumerGroup, topic, DEFAULT_TIMEOUT_MILLS);
+ }
+
+ CompletableFuture<List<BatchAckResult>> batchAckMessage( // acks several receipt handles of one topic and consumer group in a single call
+ ProxyContext ctx,
+ List<ReceiptHandleMessage> handleMessageList,
+ String consumerGroup,
+ String topic,
+ long timeoutMillis
+ );
+
default CompletableFuture<AckResult> changeInvisibleTime(
ProxyContext ctx,
ReceiptHandle handle,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
index 9f163f1b9..70b72deae 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
@@ -20,9 +20,11 @@ import com.google.common.collect.Lists;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.Message;
@@ -31,7 +33,6 @@ import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.common.utils.FutureUtils;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -137,6 +138,19 @@ public class ClusterMessageService implements MessageService {
);
}
+ @Override // Sends one batch-ack request through the MQ client, carrying the raw receipt-handle string of every entry.
+ public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx, List<ReceiptHandleMessage> handleList, String consumerGroup,
+ String topic, long timeoutMillis) {
+ List<String> extraInfoList = handleList.stream().map(message -> message.getReceiptHandle().getReceiptHandle()).collect(Collectors.toList());
+ return this.mqClientAPIFactory.getClient().batchAckMessageAsync(
+ this.resolveBrokerAddrInReceiptHandle(ctx, handleList.get(0).getReceiptHandle()), // address from the first handle only; NOTE(review): assumes a non-empty list whose handles share one broker
+ topic,
+ consumerGroup,
+ extraInfoList,
+ timeoutMillis
+ );
+ }
+
@Override
public CompletableFuture<PullResult> pullMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
PullMessageRequestHeader requestHeader, long timeoutMillis) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index eb2c4d9ee..ca7dcc9eb 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.service.message;
import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -54,6 +55,8 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.BatchAck;
+import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
@@ -364,6 +367,61 @@ public class LocalMessageService implements MessageService {
});
}
+ @Override // Batch ack handled in-process: builds a BATCH_ACK_MESSAGE command and passes it to the broker's ack processor.
+ public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx, List<ReceiptHandleMessage> handleList,
+ String consumerGroup, String topic, long timeoutMillis) { // timeoutMillis is not used in this method
+ SimpleChannel channel = channelManager.createChannel(ctx);
+ ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
+ RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+
+ Map<String, BatchAck> batchAckMap = new HashMap<>(); // one BatchAck per merge key
+ for (ReceiptHandleMessage receiptHandleMessage : handleList) {
+ String extraInfo = receiptHandleMessage.getReceiptHandle().getReceiptHandle();
+ String[] extraInfoData = ExtraInfoUtil.split(extraInfo);
+ String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" + // handles with equal retry, queueId, ckQueueOffset and popTime share one BatchAck
+ ExtraInfoUtil.getQueueId(extraInfoData) + "@" +
+ ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" +
+ ExtraInfoUtil.getPopTime(extraInfoData);
+ BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> {
+ BatchAck newBatchAck = new BatchAck();
+ newBatchAck.setConsumerGroup(consumerGroup);
+ newBatchAck.setTopic(topic);
+ newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData));
+ newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData));
+ newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData));
+ newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData));
+ newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData));
+ newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData));
+ newBatchAck.setBitSet(new BitSet());
+ return newBatchAck;
+ });
+ bAck.getBitSet().set((int) (ExtraInfoUtil.getQueueOffset(extraInfoData) - ExtraInfoUtil.getCkQueueOffset(extraInfoData))); // bit index = queue offset relative to the BatchAck start offset
+ }
+ BatchAckMessageRequestBody requestBody = new BatchAckMessageRequestBody();
+ requestBody.setBrokerName(brokerController.getBrokerConfig().getBrokerName());
+ requestBody.setAcks(new ArrayList<>(batchAckMap.values()));
+
+ command.setBody(requestBody.encode());
+ CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+ try {
+ RemotingCommand response = brokerController.getAckMessageProcessor() // direct call; the future is completed before this method returns
+ .processRequest(channelHandlerContext, command);
+ future.complete(response);
+ } catch (Exception e) {
+ log.error("Fail to process batchAckMessage command", e);
+ future.completeExceptionally(e);
+ }
+ return future.thenApply(r -> {
+ AckResult ackResult = new AckResult();
+ if (ResponseCode.SUCCESS == r.getCode()) {
+ ackResult.setStatus(AckStatus.OK);
+ } else {
+ ackResult.setStatus(AckStatus.NO_EXIST); // every non-SUCCESS response code is reported as NO_EXIST
+ }
+ return ackResult;
+ });
+ }
+
@Override
public CompletableFuture<PullResult> pullMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
PullMessageRequestHeader requestHeader, long timeoutMillis) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
index 15da17154..58a835adb 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
@@ -91,6 +91,14 @@ public interface MessageService {
long timeoutMillis
);
+ CompletableFuture<AckResult> batchAckMessage( // acks all handles in handleList; the future yields a single AckResult for the whole batch
+ ProxyContext ctx,
+ List<ReceiptHandleMessage> handleList,
+ String consumerGroup,
+ String topic,
+ long timeoutMillis
+ );
+
CompletableFuture<PullResult> pullMessage(
ProxyContext ctx,
AddressableMessageQueue messageQueue,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java
new file mode 100644
index 000000000..ae63fed49
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.message;
+
+import org.apache.rocketmq.common.consumer.ReceiptHandle;
+
+public class ReceiptHandleMessage { // Pair of a ReceiptHandle and a message id; both fields are final.
+
+ private final ReceiptHandle receiptHandle;
+ private final String messageId;
+
+ public ReceiptHandleMessage(ReceiptHandle receiptHandle, String messageId) {
+ this.receiptHandle = receiptHandle;
+ this.messageId = messageId;
+ }
+
+ public ReceiptHandle getReceiptHandle() {
+ return receiptHandle;
+ }
+
+ public String getMessageId() {
+ return messageId;
+ }
+}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
index 49fdfc6a8..3c4746105 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
@@ -20,21 +20,32 @@ package org.apache.rocketmq.proxy.grpc.v2.consumer;
import apache.rocketmq.v2.AckMessageEntry;
import apache.rocketmq.v2.AckMessageRequest;
import apache.rocketmq.v2.AckMessageResponse;
+import apache.rocketmq.v2.AckMessageResultEntry;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.Resource;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
+import org.apache.rocketmq.proxy.processor.BatchAckResult;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.stubbing.Answer;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
public class AckMessageActivityTest extends BaseActivityTest {
@@ -52,43 +63,197 @@ public class AckMessageActivityTest extends BaseActivityTest {
@Test
public void testAckMessage() throws Throwable {
- when(this.messagingProcessor.ackMessage(any(), any(), eq("msg1"), anyString(), anyString()))
+ ConfigurationManager.getProxyConfig().setEnableBatchAck(false); // batch ack off; this test stubs the per-message ackMessage calls
+
+ String msg1 = "msg1";
+ String msg2 = "msg2";
+ String msg3 = "msg3";
+
+ when(this.messagingProcessor.ackMessage(any(), any(), eq(msg1), anyString(), anyString()))
.thenThrow(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is expired"));
AckResult msg2AckResult = new AckResult();
msg2AckResult.setStatus(AckStatus.OK);
- when(this.messagingProcessor.ackMessage(any(), any(), eq("msg2"), anyString(), anyString()))
+ when(this.messagingProcessor.ackMessage(any(), any(), eq(msg2), anyString(), anyString()))
.thenReturn(CompletableFuture.completedFuture(msg2AckResult));
AckResult msg3AckResult = new AckResult();
msg3AckResult.setStatus(AckStatus.NO_EXIST);
- when(this.messagingProcessor.ackMessage(any(), any(), eq("msg3"), anyString(), anyString()))
+ when(this.messagingProcessor.ackMessage(any(), any(), eq(msg3), anyString(), anyString()))
.thenReturn(CompletableFuture.completedFuture(msg3AckResult));
- AckMessageResponse response = this.ackMessageActivity.ackMessage(
- createContext(),
- AckMessageRequest.newBuilder()
- .setTopic(Resource.newBuilder().setName(TOPIC).build())
- .setGroup(Resource.newBuilder().setName(GROUP).build())
- .addEntries(AckMessageEntry.newBuilder()
- .setMessageId("msg1")
- .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000))
- .build())
- .addEntries(AckMessageEntry.newBuilder()
- .setMessageId("msg2")
- .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000))
- .build())
- .addEntries(AckMessageEntry.newBuilder()
- .setMessageId("msg3")
- .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000))
- .build())
- .build()
- ).get();
-
- assertEquals(Code.MULTIPLE_RESULTS, response.getStatus().getCode());
- assertEquals(3, response.getEntriesCount());
- assertEquals(Code.INVALID_RECEIPT_HANDLE, response.getEntries(0).getStatus().getCode());
- assertEquals(Code.OK, response.getEntries(1).getStatus().getCode());
- assertEquals(Code.INTERNAL_SERVER_ERROR, response.getEntries(2).getStatus().getCode());
+ { // single entry msg1: the stubbed ackMessage throws INVALID_RECEIPT_HANDLE
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(msg1)
+ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000))
+ .build())
+ .build()
+ ).get();
+ assertEquals(Code.INVALID_RECEIPT_HANDLE, response.getStatus().getCode());
+ }
+ { // single entry msg2: stub returns OK; NOTE(review): handle built with the same pop/invisible times as the msg1 entry
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(msg2)
+ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000))
+ .build())
+ .build()
+ ).get();
+ assertEquals(Code.OK, response.getStatus().getCode());
+ }
+ { // single entry msg3: stub returns NO_EXIST, expected to surface as INTERNAL_SERVER_ERROR
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(msg3)
+ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000))
+ .build())
+ .build()
+ ).get();
+ assertEquals(Code.INTERNAL_SERVER_ERROR, response.getStatus().getCode());
+ }
+ { // all three entries in one request: MULTIPLE_RESULTS with one code per entry, checked in request order
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(msg1)
+ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000))
+ .build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(msg2)
+ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000))
+ .build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(msg3)
+ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000))
+ .build())
+ .build()
+ ).get();
+
+ assertEquals(Code.MULTIPLE_RESULTS, response.getStatus().getCode());
+ assertEquals(3, response.getEntriesCount());
+ assertEquals(Code.INVALID_RECEIPT_HANDLE, response.getEntries(0).getStatus().getCode());
+ assertEquals(Code.OK, response.getEntries(1).getStatus().getCode());
+ assertEquals(Code.INTERNAL_SERVER_ERROR, response.getEntries(2).getStatus().getCode());
+ }
+ }
+
+ @Test
+ public void testAckMessageInBatch() throws Throwable {
+ ConfigurationManager.getProxyConfig().setEnableBatchAck(true); // batch ack on; this test stubs batchAckMessage instead of ackMessage
+
+ String successMessageId = "msg1";
+ String notOkMessageId = "msg2";
+ String exceptionMessageId = "msg3";
+
+ doAnswer((Answer<CompletableFuture<List<BatchAckResult>>>) invocation -> { // stub decides per message id: msg1 -> OK, msg2 -> NO_EXIST, any other id -> INVALID_RECEIPT_HANDLE exception
+ List<ReceiptHandleMessage> receiptHandleMessageList = invocation.getArgument(1, List.class);
+ List<BatchAckResult> batchAckResultList = new ArrayList<>();
+ for (ReceiptHandleMessage receiptHandleMessage : receiptHandleMessageList) {
+ BatchAckResult batchAckResult;
+ if (receiptHandleMessage.getMessageId().equals(successMessageId)) {
+ AckResult ackResult = new AckResult();
+ ackResult.setStatus(AckStatus.OK);
+ batchAckResult = new BatchAckResult(receiptHandleMessage, ackResult);
+ } else if (receiptHandleMessage.getMessageId().equals(notOkMessageId)) {
+ AckResult ackResult = new AckResult();
+ ackResult.setStatus(AckStatus.NO_EXIST);
+ batchAckResult = new BatchAckResult(receiptHandleMessage, ackResult);
+ } else {
+ batchAckResult = new BatchAckResult(receiptHandleMessage, new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, ""));
+ }
+ batchAckResultList.add(batchAckResult);
+ }
+ return CompletableFuture.completedFuture(batchAckResultList);
+ }).when(this.messagingProcessor).batchAckMessage(any(), anyList(), anyString(), anyString());
+
+ { // single entry acked with OK
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(successMessageId)
+ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000))
+ .build())
+ .build()
+ ).get();
+ assertEquals(Code.OK, response.getStatus().getCode());
+ }
+ { // single entry whose AckResult is NO_EXIST, expected to surface as INTERNAL_SERVER_ERROR
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(notOkMessageId)
+ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000))
+ .build())
+ .build()
+ ).get();
+ assertEquals(Code.INTERNAL_SERVER_ERROR, response.getStatus().getCode());
+ }
+ { // single entry carrying a ProxyException, expected to surface with the exception's code
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(exceptionMessageId)
+ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000))
+ .build())
+ .build()
+ ).get();
+ assertEquals(Code.INVALID_RECEIPT_HANDLE, response.getStatus().getCode());
+ }
+ { // all three entries in one request: MULTIPLE_RESULTS with one code per entry
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(successMessageId)
+ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000))
+ .build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(notOkMessageId)
+ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000))
+ .build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(exceptionMessageId)
+ .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000))
+ .build())
+ .build()
+ ).get();
+
+ assertEquals(Code.MULTIPLE_RESULTS, response.getStatus().getCode());
+ assertEquals(3, response.getEntriesCount());
+ Map<String, Code> msgCode = new HashMap<>(); // looked up by message id, so the assertions do not depend on entry order
+ for (AckMessageResultEntry entry : response.getEntriesList()) {
+ msgCode.put(entry.getMessageId(), entry.getStatus().getCode());
+ }
+ assertEquals(Code.OK, msgCode.get(successMessageId));
+ assertEquals(Code.INTERNAL_SERVER_ERROR, msgCode.get(notOkMessageId));
+ assertEquals(Code.INVALID_RECEIPT_HANDLE, msgCode.get(exceptionMessageId));
+ }
}
}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java
index 5c1ea9627..072630e39 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java
@@ -66,14 +66,6 @@ public class BaseProcessorTest extends InitConfigTest {
protected ProxyRelayService proxyRelayService;
@Mock
protected MetadataService metadataService;
- @Mock
- protected ProducerProcessor producerProcessor;
- @Mock
- protected ConsumerProcessor consumerProcessor;
- @Mock
- protected TransactionProcessor transactionProcessor;
- @Mock
- protected ClientProcessor clientProcessor;
public void before() throws Throwable {
super.before();
@@ -92,6 +84,13 @@ public class BaseProcessorTest extends InitConfigTest {
}
protected static MessageExt createMessageExt(String topic, String tags, int reconsumeTimes, long invisibleTime) {
+ return createMessageExt(topic, tags, reconsumeTimes, invisibleTime, System.currentTimeMillis(), // pop time = now
+ RANDOM.nextInt(Integer.MAX_VALUE), RANDOM.nextInt(Integer.MAX_VALUE), RANDOM.nextInt(Integer.MAX_VALUE), // random startOffset, reviveQid, queueId
+ RANDOM.nextInt(Integer.MAX_VALUE), "mockBroker"); // random queueOffset, fixed broker name
+ }
+
+ protected static MessageExt createMessageExt(String topic, String tags, int reconsumeTimes, long invisibleTime, long popTime,
+ long startOffset, int reviveQid, int queueId, long queueOffset, String brokerName) {
MessageExt messageExt = new MessageExt();
messageExt.setTopic(topic);
messageExt.setTags(tags);
@@ -100,8 +99,7 @@ public class BaseProcessorTest extends InitConfigTest {
messageExt.setMsgId(MessageClientIDSetter.createUniqID());
messageExt.setCommitLogOffset(RANDOM.nextInt(Integer.MAX_VALUE));
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_POP_CK,
- ExtraInfoUtil.buildExtraInfo(RANDOM.nextInt(Integer.MAX_VALUE), System.currentTimeMillis(), invisibleTime,
- RANDOM.nextInt(Integer.MAX_VALUE), topic, "mockBroker", RANDOM.nextInt(Integer.MAX_VALUE), RANDOM.nextInt(Integer.MAX_VALUE)));
+ ExtraInfoUtil.buildExtraInfo(startOffset, popTime, invisibleTime, reviveQid, topic, brokerName, queueId, queueOffset));
return messageExt;
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
index 717e86fc0..db268a06e 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
@@ -20,8 +20,11 @@ package org.apache.rocketmq.proxy.processor;
import com.google.common.collect.Sets;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
@@ -39,7 +42,10 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -50,16 +56,22 @@ import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ConsumerProcessorTest extends BaseProcessorTest {
@@ -162,6 +174,109 @@ public class ConsumerProcessorTest extends BaseProcessorTest {
assertEquals(handle.getReceiptHandle(), requestHeaderArgumentCaptor.getValue().getExtraInfo());
}
+ @Test
+ public void testBatchAckExpireMessage() throws Throwable {
+ String brokerName1 = "brokerName1";
+ // NOTE(review): every message is built with a timestamp 10s in the past; presumably that makes its handle already expired - confirm against createMessageExt.
+ List<ReceiptHandleMessage> receiptHandleMessageList = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ MessageExt expireMessage = createMessageExt(TOPIC, "", 0, 3000, System.currentTimeMillis() - 10000,
+ 0, 0, 0, i, brokerName1);
+ ReceiptHandle expireHandle = create(expireMessage);
+ receiptHandleMessageList.add(new ReceiptHandleMessage(expireHandle, expireMessage.getMsgId()));
+ }
+ // Ack the whole batch through the processor and wait for the combined result list.
+ List<BatchAckResult> batchAckResultList = this.consumerProcessor.batchAckMessage(createContext(), receiptHandleMessageList, CONSUMER_GROUP, TOPIC, 3000).get();
+ // The message service must never be called; each input gets one result that carries an exception and no AckResult.
+ verify(this.messageService, never()).batchAckMessage(any(), anyList(), anyString(), anyString(), anyLong());
+ assertEquals(receiptHandleMessageList.size(), batchAckResultList.size());
+ for (BatchAckResult batchAckResult : batchAckResultList) {
+ assertNull(batchAckResult.getAckResult());
+ assertNotNull(batchAckResult.getProxyException());
+ assertNotNull(batchAckResult.getReceiptHandleMessage());
+ }
+
+ }
+
+ @Test
+ public void testBatchAckMessage() throws Throwable {
+ String brokerName1 = "brokerName1";
+ String brokerName2 = "brokerName2";
+ String errThrowBrokerName = "errThrowBrokerName";
+ MessageExt expireMessage = createMessageExt(TOPIC, "", 0, 3000, System.currentTimeMillis() - 10000,
+ 0, 0, 0, 0, brokerName1);
+ ReceiptHandle expireHandle = create(expireMessage);
+ // Input batch: the (presumably expired) handle above, msgNum handles for each of two brokers, and one handle for a broker whose ack fails.
+ List<ReceiptHandleMessage> receiptHandleMessageList = new ArrayList<>();
+ receiptHandleMessageList.add(new ReceiptHandleMessage(expireHandle, expireMessage.getMsgId()));
+ List<String> broker1Msg = new ArrayList<>();
+ List<String> broker2Msg = new ArrayList<>();
+
+ long now = System.currentTimeMillis();
+ int msgNum = 3;
+ for (int i = 0; i < msgNum; i++) {
+ MessageExt brokerMessage = createMessageExt(TOPIC, "", 0, 3000, now,
+ 0, 0, 0, i + 1, brokerName1);
+ ReceiptHandle brokerHandle = create(brokerMessage);
+ receiptHandleMessageList.add(new ReceiptHandleMessage(brokerHandle, brokerMessage.getMsgId()));
+ broker1Msg.add(brokerMessage.getMsgId());
+ }
+ for (int i = 0; i < msgNum; i++) {
+ MessageExt brokerMessage = createMessageExt(TOPIC, "", 0, 3000, now,
+ 0, 0, 0, i + 1, brokerName2);
+ ReceiptHandle brokerHandle = create(brokerMessage);
+ receiptHandleMessageList.add(new ReceiptHandleMessage(brokerHandle, brokerMessage.getMsgId()));
+ broker2Msg.add(brokerMessage.getMsgId());
+ }
+
+ // This message's broker name matches neither stubbed broker, so the mocked batchAckMessage below returns an exceptionally completed future for it.
+ MessageExt errThrowMessage = createMessageExt(TOPIC, "", 0, 3000, now,
+ 0, 0, 0, 0, errThrowBrokerName);
+ ReceiptHandle errThrowHandle = create(errThrowMessage);
+ receiptHandleMessageList.add(new ReceiptHandleMessage(errThrowHandle, errThrowMessage.getMsgId()));
+ // Randomize the order of the handles before they are acked.
+ Collections.shuffle(receiptHandleMessageList);
+ // Stub the message service by the first handle's broker name: brokerName1 -> OK, brokerName2 -> NO_EXIST, anything else -> failed future.
+ doAnswer((Answer<CompletableFuture<AckResult>>) invocation -> {
+ List<ReceiptHandleMessage> handleMessageList = invocation.getArgument(1, List.class);
+ AckResult ackResult = new AckResult();
+ String brokerName = handleMessageList.get(0).getReceiptHandle().getBrokerName();
+ if (brokerName.equals(brokerName1)) {
+ ackResult.setStatus(AckStatus.OK);
+ } else if (brokerName.equals(brokerName2)) {
+ ackResult.setStatus(AckStatus.NO_EXIST);
+ } else {
+ return FutureUtils.completeExceptionally(new RuntimeException());
+ }
+
+ return CompletableFuture.completedFuture(ackResult);
+ }).when(this.messageService).batchAckMessage(any(), anyList(), anyString(), anyString(), anyLong());
+
+ List<BatchAckResult> batchAckResultList = this.consumerProcessor.batchAckMessage(createContext(), receiptHandleMessageList, CONSUMER_GROUP, TOPIC, 3000).get();
+ assertEquals(receiptHandleMessageList.size(), batchAckResultList.size());
+
+ // Index the results by message id, then check the outcome recorded for each message.
+ Map<String, BatchAckResult> msgBatchAckResult = new HashMap<>();
+ for (BatchAckResult batchAckResult : batchAckResultList) {
+ msgBatchAckResult.put(batchAckResult.getReceiptHandleMessage().getMessageId(), batchAckResult);
+ }
+ for (String msgId : broker1Msg) {
+ assertEquals(AckStatus.OK, msgBatchAckResult.get(msgId).getAckResult().getStatus());
+ assertNull(msgBatchAckResult.get(msgId).getProxyException());
+ }
+ for (String msgId : broker2Msg) {
+ assertEquals(AckStatus.NO_EXIST, msgBatchAckResult.get(msgId).getAckResult().getStatus());
+ assertNull(msgBatchAckResult.get(msgId).getProxyException());
+ }
+ assertNotNull(msgBatchAckResult.get(expireMessage.getMsgId()).getProxyException());
+ assertEquals(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, msgBatchAckResult.get(expireMessage.getMsgId()).getProxyException().getCode());
+ assertNull(msgBatchAckResult.get(expireMessage.getMsgId()).getAckResult());
+ // The failed broker call must be reported as INTERNAL_SERVER_ERROR with no AckResult attached.
+ assertNotNull(msgBatchAckResult.get(errThrowMessage.getMsgId()).getProxyException());
+ assertEquals(ProxyExceptionCode.INTERNAL_SERVER_ERROR, msgBatchAckResult.get(errThrowMessage.getMsgId()).getProxyException().getCode());
+ assertNull(msgBatchAckResult.get(errThrowMessage.getMsgId()).getAckResult());
+ }
+
@Test
public void testChangeInvisibleTime() throws Throwable {
ReceiptHandle handle = create(createMessageExt(MixAll.RETRY_GROUP_TOPIC_PREFIX + TOPIC, "", 0, 3000));
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
index 77a119a29..3f3a4ae40 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
@@ -220,6 +220,18 @@ public class MQClientAPIExtTest {
assertSame(ackResult, mqClientAPI.ackMessageAsync(BROKER_ADDR, new AckMessageRequestHeader(), TIMEOUT).get());
}
+ @Test
+ public void testBatchAckMessageAsync() throws Exception {
+ AckResult ackResult = new AckResult();
+ doAnswer((Answer<Void>) mock -> {
+ AckCallback ackCallback = mock.getArgument(2);
+ ackCallback.onSuccess(ackResult);
+ return null;
+ }).when(mqClientAPI).batchAckMessageAsync(anyString(), anyLong(), any(AckCallback.class), any());
+ // With the callback overload stubbed to report success, the future-returning overload must complete with the very same AckResult instance.
+ assertSame(ackResult, mqClientAPI.batchAckMessageAsync(BROKER_ADDR, TOPIC, CONSUMER_GROUP, new ArrayList<>(), TIMEOUT).get());
+ }
+
@Test
public void testChangeInvisibleTimeAsync() throws Exception {
AckResult ackResult = new AckResult();
--
2.32.0.windows.2