From fec141481496c53a0db398367006c34264662d18 Mon Sep 17 00:00:00 2001 From: yx9o Date: Wed, 23 Aug 2023 08:22:34 +0800 Subject: [PATCH 1/8] [ISSUE #7166] Optimize the display format of admin (#7210) --- .../java/org/apache/rocketmq/tools/command/MQAdminStartup.java | 2 +- .../command/acl/ClusterAclConfigVersionListSubCommand.java | 2 +- .../tools/command/acl/DeleteAccessConfigSubCommand.java | 2 +- .../rocketmq/tools/command/acl/GetAccessConfigSubCommand.java | 2 +- .../tools/command/acl/UpdateAccessConfigSubCommand.java | 2 +- .../tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java | 2 +- .../tools/command/broker/BrokerConsumeStatsSubCommad.java | 2 +- .../rocketmq/tools/command/broker/BrokerStatusSubCommand.java | 2 +- .../tools/command/broker/CommitLogSetReadAheadSubCommand.java | 2 +- .../tools/command/broker/DeleteExpiredCommitLogSubCommand.java | 2 +- .../rocketmq/tools/command/broker/GetBrokerConfigCommand.java | 2 +- .../rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java | 2 +- .../tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java | 2 +- .../broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java | 2 +- .../tools/command/broker/ResetMasterFlushOffsetSubCommand.java | 2 +- .../tools/command/broker/UpdateBrokerConfigSubCommand.java | 2 +- .../broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java | 2 +- .../rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java | 2 +- .../rocketmq/tools/command/cluster/ClusterListSubCommand.java | 2 +- .../tools/command/connection/ConsumerConnectionSubCommand.java | 2 +- .../tools/command/connection/ProducerConnectionSubCommand.java | 2 +- .../tools/command/consumer/ConsumerStatusSubCommand.java | 2 +- .../tools/command/consumer/GetConsumerConfigSubCommand.java | 2 +- .../tools/command/consumer/StartMonitoringSubCommand.java | 2 +- .../tools/command/consumer/UpdateSubGroupSubCommand.java | 2 +- .../rocketmq/tools/command/container/AddBrokerSubCommand.java | 2 +- .../tools/command/container/RemoveBrokerSubCommand.java | 2 +- .../command/controller/CleanControllerBrokerMetaSubCommand.java | 2 +- .../command/controller/GetControllerMetaDataSubCommand.java | 2 +- .../tools/command/controller/ReElectMasterSubCommand.java | 2 +- .../rocketmq/tools/command/export/ExportConfigsCommand.java | 2 +- .../rocketmq/tools/command/export/ExportMetadataCommand.java | 2 +- .../rocketmq/tools/command/export/ExportMetricsCommand.java | 2 +- .../rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java | 2 +- .../apache/rocketmq/tools/command/ha/HAStatusSubCommand.java | 2 +- .../rocketmq/tools/command/message/CheckMsgSendRTCommand.java | 2 +- .../rocketmq/tools/command/message/ConsumeMessageCommand.java | 2 +- .../tools/command/message/DumpCompactionLogCommand.java | 2 +- .../tools/command/message/PrintMessageByQueueCommand.java | 2 +- .../rocketmq/tools/command/message/PrintMessageSubCommand.java | 2 +- .../rocketmq/tools/command/message/QueryMsgByIdSubCommand.java | 2 +- .../rocketmq/tools/command/message/QueryMsgByKeySubCommand.java | 2 +- .../tools/command/message/QueryMsgByOffsetSubCommand.java | 2 +- .../tools/command/message/QueryMsgByUniqueKeySubCommand.java | 2 +- .../tools/command/message/QueryMsgTraceByIdSubCommand.java | 2 +- .../rocketmq/tools/command/message/SendMessageCommand.java | 2 +- .../rocketmq/tools/command/namesrv/AddWritePermSubCommand.java | 2 +- .../rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java | 2 +- .../tools/command/offset/SkipAccumulationSubCommand.java | 2 +- .../apache/rocketmq/tools/command/stats/StatsAllSubCommand.java | 2 +- .../rocketmq/tools/command/topic/AllocateMQSubCommand.java | 2 +- .../rocketmq/tools/command/topic/TopicClusterSubCommand.java | 2 +- .../rocketmq/tools/command/topic/TopicListSubCommand.java | 2 +- .../rocketmq/tools/command/topic/TopicRouteSubCommand.java | 2 +- .../rocketmq/tools/command/topic/TopicStatusSubCommand.java | 2 +- .../rocketmq/tools/command/topic/UpdateOrderConfCommand.java | 2 +- .../tools/command/topic/UpdateStaticTopicSubCommand.java | 2 +- .../rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java | 2 +- .../rocketmq/tools/command/topic/UpdateTopicSubCommand.java | 2 +- 59 files changed, 59 insertions(+), 59 deletions(-) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index 0c2618e91..890125ca0 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -278,7 +278,7 @@ public class MQAdminStartup { System.out.printf("The most commonly used mqadmin commands are:%n"); for (SubCommand cmd : SUB_COMMANDS) { - System.out.printf(" %-25s %s%n", cmd.commandName(), cmd.commandDesc()); + System.out.printf(" %-35s %s%n", cmd.commandName(), cmd.commandDesc()); } System.out.printf("%nSee 'mqadmin help ' for more information on a specific command.%n"); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java index f8a00b1e0..26ed028fb 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java @@ -47,7 +47,7 @@ public class ClusterAclConfigVersionListSubCommand implements SubCommand { @Override public String commandDesc() { - return "List all of acl config version information in cluster"; + return "List all of acl config version information in cluster."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java index fd3a92fff..a7f3d295a 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java @@ -42,7 +42,7 @@ public class DeleteAccessConfigSubCommand implements SubCommand { @Override public String commandDesc() { - return "Delete Acl Config Account in broker"; + return "Delete Acl Config Account in broker."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java index 25844d6a1..f1c9a1496 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/GetAccessConfigSubCommand.java @@ -49,7 +49,7 @@ public class GetAccessConfigSubCommand implements SubCommand { @Override public String commandDesc() { - return "List all of acl config information in cluster"; + return "List all of acl config information in cluster."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java index 3be40daa1..d8a06f92d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java @@ -40,7 +40,7 @@ public class UpdateAccessConfigSubCommand implements SubCommand { @Override public String commandDesc() { - return "Update acl config yaml file in broker"; + return "Update acl config yaml file in broker."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java index ff662b506..9dacf1fae 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java @@ -37,7 +37,7 @@ public class UpdateGlobalWhiteAddrSubCommand implements SubCommand { @Override public String commandDesc() { - return "Update global white address for acl Config File in broker"; + return "Update global white address for acl Config File in broker."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java index 3f2f90673..7658a2139 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java @@ -61,7 +61,7 @@ public class BrokerConsumeStatsSubCommad implements SubCommand { @Override public String commandDesc() { - return "Fetch broker consume stats data"; + return "Fetch broker consume stats data."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java index 830ff3425..ce934f547 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java @@ -44,7 +44,7 @@ public class BrokerStatusSubCommand implements SubCommand { @Override public String commandDesc() { - return "Fetch broker runtime status data"; + return "Fetch broker runtime status data."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java index b00c7f5f5..4fdabfdf8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java @@ -44,7 +44,7 @@ public class CommitLogSetReadAheadSubCommand implements SubCommand { @Override public String commandDesc() { - return "set read ahead mode for all commitlog files"; + return "Set read ahead mode for all commitlog files."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java index a4b2a51ad..142bb7b3c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/DeleteExpiredCommitLogSubCommand.java @@ -37,7 +37,7 @@ public class DeleteExpiredCommitLogSubCommand implements SubCommand { @Override public String commandDesc() { - return "Delete expired CommitLog files"; + return "Delete expired CommitLog files."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java index 5d86c10e4..c4762a296 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java @@ -45,7 +45,7 @@ public class GetBrokerConfigCommand implements SubCommand { @Override public String commandDesc() { - return "Get broker config by cluster or special broker"; + return "Get broker config by cluster or special broker."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java index abe8fc622..1a8961e04 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochSubCommand.java @@ -38,7 +38,7 @@ public class GetBrokerEpochSubCommand implements SubCommand { @Override public String commandDesc() { - return "Fetch broker epoch entries"; + return "Fetch broker epoch entries."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java index 7c54e650c..34b3ba7d3 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java @@ -47,7 +47,7 @@ public class GetColdDataFlowCtrInfoSubCommand implements SubCommand { @Override public String commandDesc() { - return "get cold data flow ctr info"; + return "Get cold data flow ctr info."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java index b0477924f..f20407480 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java @@ -36,7 +36,7 @@ public class RemoveColdDataFlowCtrGroupConfigSubCommand implements SubCommand { @Override public String commandDesc() { - return "remove consumer from cold ctr config"; + return "Remove consumer from cold ctr config."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java index b2ac48c84..90451b51f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/ResetMasterFlushOffsetSubCommand.java @@ -33,7 +33,7 @@ public class ResetMasterFlushOffsetSubCommand implements SubCommand { @Override public String commandDesc() { - return "Reset master flush offset in slave"; + return "Reset master flush offset in slave."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java index 98abeb6ae..62816ef03 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java @@ -37,7 +37,7 @@ public class UpdateBrokerConfigSubCommand implements SubCommand { @Override public String commandDesc() { - return "Update broker's config"; + return "Update broker's config."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java index d06a24b57..8d1a00077 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java @@ -39,7 +39,7 @@ public class UpdateColdDataFlowCtrGroupConfigSubCommand implements SubCommand { @Override public String commandDesc() { - return "addOrUpdate cold data flow ctr group config"; + return "Add or update cold data flow ctr group config."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java index 7253970bd..d755e9e5d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java @@ -48,7 +48,7 @@ public class CLusterSendMsgRTCommand implements SubCommand { @Override public String commandDesc() { - return "List All clusters Message Send RT"; + return "List All clusters Message Send RT."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java index a7a840a44..ede0fa5cf 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java @@ -41,7 +41,7 @@ public class ClusterListSubCommand implements SubCommand { @Override public String commandDesc() { - return "List cluster infos"; + return "List cluster infos."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java index 630961e31..35f73d8a0 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java @@ -39,7 +39,7 @@ public class ConsumerConnectionSubCommand implements SubCommand { @Override public String commandDesc() { - return "Query consumer's socket connection, client version and subscription"; + return "Query consumer's socket connection, client version and subscription."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java index 2533982c8..bde674ab2 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java @@ -36,7 +36,7 @@ public class ProducerConnectionSubCommand implements SubCommand { @Override public String commandDesc() { - return "Query producer's socket connection and client version"; + return "Query producer's socket connection and client version."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java index 72b9c975e..d8f6f9aa9 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java @@ -47,7 +47,7 @@ public class ConsumerStatusSubCommand implements SubCommand { @Override public String commandDesc() { - return "Query consumer's internal data structure"; + return "Query consumer's internal data structure."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java index 6095e7668..4a8253a02 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java @@ -43,7 +43,7 @@ public class GetConsumerConfigSubCommand implements SubCommand { @Override public String commandDesc() { - return "Get consumer config by subscription group name"; + return "Get consumer config by subscription group name."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java index 2d08d0bd0..f5e140433 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java @@ -34,7 +34,7 @@ public class StartMonitoringSubCommand implements SubCommand { @Override public String commandDesc() { - return "Start Monitoring"; + return "Start Monitoring."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java index f87bafc93..b17da4de4 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java @@ -41,7 +41,7 @@ public class UpdateSubGroupSubCommand implements SubCommand { @Override public String commandDesc() { - return "Update or create subscription group"; + return "Update or create subscription group."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java index e9e5be4a5..007d42ae6 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/container/AddBrokerSubCommand.java @@ -33,7 +33,7 @@ public class AddBrokerSubCommand implements SubCommand { @Override public String commandDesc() { - return "Add a broker to specified container"; + return "Add a broker to specified container."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java index 7c455f858..ab25d8ebe 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java @@ -33,7 +33,7 @@ public class RemoveBrokerSubCommand implements SubCommand { @Override public String commandDesc() { - return "Remove a broker from specified container"; + return "Remove a broker from specified container."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerMetaSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerMetaSubCommand.java index 856e4b426..24ed02566 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerMetaSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerMetaSubCommand.java @@ -37,7 +37,7 @@ public class CleanControllerBrokerMetaSubCommand implements SubCommand { @Override public String commandDesc() { - return "Clean metadata of broker on controller"; + return "Clean metadata of broker on controller."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java index 70bd7f8e9..966443127 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/GetControllerMetaDataSubCommand.java @@ -34,7 +34,7 @@ public class GetControllerMetaDataSubCommand implements SubCommand { @Override public String commandDesc() { - return "Get controller cluster's metadata"; + return "Get controller cluster's metadata."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java index 1affe81f9..a522a903d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java @@ -37,7 +37,7 @@ public class ReElectMasterSubCommand implements SubCommand { @Override public String commandDesc() { - return "Re-elect the specified broker as master"; + return "Re-elect the specified broker as master."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java index b8191296d..03613b29c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java @@ -42,7 +42,7 @@ public class ExportConfigsCommand implements SubCommand { @Override public String commandDesc() { - return "Export configs"; + return "Export configs."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java index 1f9cf7d96..748f7b16e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java @@ -46,7 +46,7 @@ public class ExportMetadataCommand implements SubCommand { @Override public String commandDesc() { - return "Export metadata"; + return "Export metadata."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java index a793b4b84..5d8bb37ba 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java @@ -56,7 +56,7 @@ public class ExportMetricsCommand implements SubCommand { @Override public String commandDesc() { - return "Export metrics"; + return "Export metrics."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java index 44b3ec3e1..b6231e4f9 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java @@ -40,7 +40,7 @@ public class GetSyncStateSetSubCommand implements SubCommand { @Override public String commandDesc() { - return "Fetch syncStateSet for target brokers"; + return "Fetch syncStateSet for target brokers."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java index b1795e046..931658a08 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/HAStatusSubCommand.java @@ -41,7 +41,7 @@ public class HAStatusSubCommand implements SubCommand { @Override public String commandDesc() { - return "Fetch ha runtime status data"; + return "Fetch ha runtime status data."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java index 4c6d5ffb6..b15b59d50 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java @@ -40,7 +40,7 @@ public class CheckMsgSendRTCommand implements SubCommand { @Override public String commandDesc() { - return "Check message send response time"; + return "Check message send response time."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java index 8aed59ea4..02ff53269 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java @@ -70,7 +70,7 @@ public class ConsumeMessageCommand implements SubCommand { @Override public String commandDesc() { - return "Consume message"; + return "Consume message."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java index ae6d9bdcf..eee8f3d4b 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java @@ -38,7 +38,7 @@ import java.nio.file.Paths; public class DumpCompactionLogCommand implements SubCommand { @Override public String commandDesc() { - return "parse compaction log to message"; + return "Parse compaction log to message."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java index 654560167..0418e88a7 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java @@ -108,7 +108,7 @@ public class PrintMessageByQueueCommand implements SubCommand { @Override public String commandDesc() { - return "Print Message Detail"; + return "Print Message Detail by queueId."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java index d01c36d42..bb82f5079 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java @@ -62,7 +62,7 @@ public class PrintMessageSubCommand implements SubCommand { @Override public String commandDesc() { - return "Print Message Detail"; + return "Print Message Detail."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java index 2880477f1..b42612150 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java @@ -186,7 +186,7 @@ public class QueryMsgByIdSubCommand implements SubCommand { @Override public String commandDesc() { - return "Query Message by Id"; + return "Query Message by Id."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java index ba7b00c3b..64627fd19 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java @@ -36,7 +36,7 @@ public class QueryMsgByKeySubCommand implements SubCommand { @Override public String commandDesc() { - return "Query Message by Key"; + return "Query Message by Key."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java index d27313af1..14d0625fd 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java @@ -39,7 +39,7 @@ public class QueryMsgByOffsetSubCommand implements SubCommand { @Override public String commandDesc() { - return "Query Message by offset"; + return "Query Message by offset."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java index 1b28f8be1..b71cee901 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java @@ -141,7 +141,7 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { @Override public String commandDesc() { - return "Query Message by Unique key"; + return "Query Message by Unique key."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java index 2b982efef..2c546ec56 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java @@ -65,7 +65,7 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand { @Override public String commandDesc() { - return "Query a message trace"; + return "Query a message trace."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java index 836ee192b..970da6b16 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java @@ -41,7 +41,7 @@ public class SendMessageCommand implements SubCommand { @Override public String commandDesc() { - return "Send a message"; + return "Send a message."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java index 98542d065..0b0a075bd 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommand.java @@ -34,7 +34,7 @@ public class AddWritePermSubCommand implements SubCommand { @Override public String commandDesc() { - return "Add write perm of broker in all name server you defined in the -n param"; + return "Add write perm of broker in all name server you defined in the -n param."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java index 213931ed8..637dd52c8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java @@ -34,7 +34,7 @@ public class WipeWritePermSubCommand implements SubCommand { @Override public String commandDesc() { - return "Wipe write perm of broker in all name server you defined in the -n param"; + return "Wipe write perm of broker in all name server you defined in the -n param."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java index 139821f9c..b22491a59 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java @@ -41,7 +41,7 @@ public class SkipAccumulationSubCommand implements SubCommand { @Override public String commandDesc() { - return "Skip all messages that are accumulated (not consumed) currently"; + return "Skip all messages that are accumulated (not consumed) currently."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java index 1d49bbe11..96097a93e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java @@ -144,7 +144,7 @@ public class StatsAllSubCommand implements SubCommand { @Override public String commandDesc() { - return "Topic and Consumer tps stats"; + return "Topic and Consumer tps stats."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java index 3fa42f297..6a9b81eb8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java @@ -41,7 +41,7 @@ public class AllocateMQSubCommand implements SubCommand { @Override public String commandDesc() { - return "Allocate MQ"; + return "Allocate MQ."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java index 1dab693d9..098f34ff0 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java @@ -34,7 +34,7 @@ public class TopicClusterSubCommand implements SubCommand { @Override public String commandDesc() { - return "Get cluster info for topic"; + return "Get cluster info for topic."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java index 346bac704..d9a279f80 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java @@ -45,7 +45,7 @@ public class TopicListSubCommand implements SubCommand { @Override public String commandDesc() { - return "Fetch all topic list from name server"; + return "Fetch all topic list from name server."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java index f2dabec4e..70949d388 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java @@ -42,7 +42,7 @@ public class TopicRouteSubCommand implements SubCommand { @Override public String commandDesc() { - return "Examine topic route info"; + return "Examine topic route info."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java index fdb249fab..a1619eced 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java @@ -40,7 +40,7 @@ public class TopicStatusSubCommand implements SubCommand { @Override public String commandDesc() { - return "Examine topic Status info"; + return "Examine topic Status info."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java index bebc646b4..3040d04c2 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java @@ -36,7 +36,7 @@ public class UpdateOrderConfCommand implements SubCommand { @Override public String commandDesc() { - return "Create or update or delete order conf"; + return "Create or update or delete order conf."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index 85a18c654..3daeee86c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -48,7 +48,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { @Override public String commandDesc() { - return "Update or create static topic, which has fixed number of queues"; + return "Update or create static topic, which has fixed number of queues."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java index aaa881538..d27cd1861 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java @@ -44,7 +44,7 @@ public class UpdateTopicPermSubCommand implements SubCommand { @Override public String commandDesc() { - return "Update topic perm"; + return "Update topic perm."; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java index b68463396..298914175 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java @@ -42,7 +42,7 @@ public class UpdateTopicSubCommand implements SubCommand { @Override public String commandDesc() { - return "Update or create topic"; + return "Update or create topic."; } @Override -- 2.32.0.windows.2 From 744167bd01fab6821b4d5ae1794dc845153d5156 Mon Sep 17 00:00:00 2001 From: Ziyi Tan Date: Wed, 23 Aug 2023 08:32:17 +0800 Subject: [PATCH 2/8] [ISSUE #7142] Add command `RocksDBConfigToJson` to inspect rocksdb content (#7180) * feat: add command `RocksDBConfigToJson` to inspect rocksdb content Signed-off-by: Ziy1-Tan * refactor: fix style --------- Signed-off-by: Ziy1-Tan Co-authored-by: Ziy1-Tan --- .../tools/command/MQAdminStartup.java | 2 + .../metadata/RocksDBConfigToJsonCommand.java | 118 ++++++++++++++++++ .../metadata/KvConfigToJsonCommandTest.java | 65 ++++++++++ 3 files changed, 185 insertions(+) create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java create mode 100644 tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index 890125ca0..324aa1856 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -80,6 +80,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand; import org.apache.rocketmq.tools.command.message.SendMessageCommand; +import org.apache.rocketmq.tools.command.metadata.RocksDBConfigToJsonCommand; import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand; import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand; @@ -211,6 +212,7 @@ public class MQAdminStartup { initCommand(new ClusterListSubCommand()); initCommand(new TopicListSubCommand()); + initCommand(new RocksDBConfigToJsonCommand()); initCommand(new UpdateKvConfigCommand()); initCommand(new DeleteKvConfigCommand()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java new file mode 100644 index 000000000..3053f4684 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.metadata; + +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.config.RocksDBConfigManager; +import org.apache.rocketmq.common.utils.DataConverter; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +public class RocksDBConfigToJsonCommand implements SubCommand { + private static final String TOPICS_JSON_CONFIG = "topics"; + private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups"; + + @Override + public String commandName() { + return "rocksDBConfigToJson"; + } + + @Override + public String commandDesc() { + return "Convert RocksDB kv config (topics/subscriptionGroups) to json"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option pathOption = new Option("p", "path", true, + "Absolute path to the metadata directory"); + pathOption.setRequired(true); + options.addOption(pathOption); + + Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " + + "topics/subscriptionGroups"); + configTypeOption.setRequired(true); + options.addOption(configTypeOption); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { + String path = commandLine.getOptionValue("path").trim(); + if (StringUtils.isEmpty(path) || !new File(path).exists()) { + System.out.print("Rocksdb path is invalid.\n"); + return; + } + + String configType = commandLine.getOptionValue("configType").trim().toLowerCase(); + + final long memTableFlushInterval = 60 * 60 * 1000L; + RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(memTableFlushInterval); + try { + if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) { + // for topics.json + final Map topicsJsonConfig = new HashMap<>(); + final Map topicConfigTable = new HashMap<>(); + boolean isLoad = kvConfigManager.load(path, (key, value) -> { + final String topic = new String(key, DataConverter.charset); + final String topicConfig = new String(value, DataConverter.charset); + final JSONObject jsonObject = JSONObject.parseObject(topicConfig); + topicConfigTable.put(topic, jsonObject); + }); + + if (isLoad) { + topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable)); + final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true); + System.out.print(topicsJsonStr + "\n"); + return; + } + } + if (SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) { + // for subscriptionGroup.json + final Map subscriptionGroupJsonConfig = new HashMap<>(); + final Map subscriptionGroupTable = new HashMap<>(); + boolean isLoad = kvConfigManager.load(path, (key, value) -> { + final String subscriptionGroup = new String(key, DataConverter.charset); + final String subscriptionGroupConfig = new String(value, DataConverter.charset); + final JSONObject jsonObject = JSONObject.parseObject(subscriptionGroupConfig); + subscriptionGroupTable.put(subscriptionGroup, jsonObject); + }); + + if (isLoad) { + subscriptionGroupJsonConfig.put("subscriptionGroupTable", + (JSONObject) JSONObject.toJSON(subscriptionGroupTable)); + final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true); + System.out.print(subscriptionGroupJsonStr + "\n"); + return; + } + } + System.out.print("Config type was not recognized, configType=" + configType + "\n"); + } finally { + kvConfigManager.stop(); + } + } +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java new file mode 100644 index 000000000..b2f66c7b0 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/KvConfigToJsonCommandTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.metadata; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.junit.Test; + +import java.io.File; + +import static org.assertj.core.api.Assertions.assertThat; + +public class KvConfigToJsonCommandTest { + private static final String BASE_PATH = System.getProperty("user.home") + File.separator + "store/config/"; + + @Test + public void testExecute() throws SubCommandException { + { + String[] cases = new String[]{"topics", "subscriptionGroups"}; + for (String c : cases) { + RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[]{"-p " + BASE_PATH + c, "-t " + c}; + final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, + cmd.buildCommandlineOptions(options), new DefaultParser()); + cmd.execute(commandLine, options, null); + assertThat(commandLine.getOptionValue("p").trim()).isEqualTo(BASE_PATH + c); + assertThat(commandLine.getOptionValue("t").trim()).isEqualTo(c); + } + } + // invalid cases + { + String[][] cases = new String[][]{ + {"-p " + BASE_PATH + "tmpPath", "-t topics"}, + {"-p ", "-t topics"}, + {"-p " + BASE_PATH + "topics", "-t invalid_type"} + }; + + for (String[] c : cases) { + RocksDBConfigToJsonCommand cmd = new RocksDBConfigToJsonCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), c, + cmd.buildCommandlineOptions(options), new DefaultParser()); + cmd.execute(commandLine, options, null); + } + } + } +} -- 2.32.0.windows.2 From bdede35db365a49b211cdc249c68b0f60a3df46d Mon Sep 17 00:00:00 2001 From: mxsm Date: Wed, 23 Aug 2023 08:34:56 +0800 Subject: [PATCH 3/8] [ISSUE #7124] Fix the typos in the code comments (#7125) --- .../apache/rocketmq/broker/processor/ReplyMessageProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java index b2db356c8..d3bb048f7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java @@ -234,7 +234,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor { } else { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); - //set to zore to avoid client decoding exception + //set to zero to avoid client decoding exception responseHeader.setMsgId("0"); responseHeader.setQueueId(queueIdInt); responseHeader.setQueueOffset(0L); -- 2.32.0.windows.2 From 9bb73b9a38548b99ac5126c40380c3c2e7fc586e Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Wed, 23 Aug 2023 09:46:27 +0800 Subject: [PATCH 4/8] [#ISSUE 7222] Bug fix and refactoring of the Indexfile in tiered storage (#7224) --- .../tieredstore/file/TieredIndexFile.java | 38 +++++++-- .../tieredstore/file/TieredIndexFileTest.java | 84 +++++-------------- 2 files changed, 52 insertions(+), 70 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java index 50beb01ae..eda5e0106 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.tieredstore.file; +import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -99,7 +100,7 @@ public class TieredIndexFile { this::doScheduleTask, 10, 10, TimeUnit.SECONDS); } - private void doScheduleTask() { + protected void doScheduleTask() { try { curFileLock.lock(); try { @@ -145,6 +146,11 @@ public class TieredIndexFile { } } + @VisibleForTesting + public MappedFile getPreMappedFile() { + return preMappedFile; + } + private void initFile() throws IOException { curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize); initIndexFileHeader(curMappedFile); @@ -156,19 +162,26 @@ public class TieredIndexFile { if (isFileSealed(curMappedFile)) { if (preFileExists) { - preFile.delete(); + if (preFile.delete()) { + logger.info("Pre IndexFile deleted success", preFilepath); + } else { + logger.error("Pre IndexFile deleted failed", preFilepath); + } } boolean rename = curMappedFile.renameTo(preFilepath); if (rename) { preMappedFile = curMappedFile; curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize); + initIndexFileHeader(curMappedFile); preFileExists = true; } } + if (preFileExists) { synchronized (TieredIndexFile.class) { if (inflightCompactFuture.isDone()) { - inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit(new CompactTask(storeConfig, preMappedFile, flatFile), null); + inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit( + new CompactTask(storeConfig, preMappedFile, flatFile), null); } } } @@ -261,7 +274,8 @@ public class TieredIndexFile { } } - public CompletableFuture>> queryAsync(String topic, String key, long beginTime, long endTime) { + public CompletableFuture>> queryAsync(String topic, String key, long beginTime, + long endTime) { int hashCode = indexKeyHashMethod(buildKey(topic, key)); int slotPosition = hashCode % maxHashSlotNum; List fileSegmentList = flatFile.getFileListByTime(beginTime, endTime); @@ -355,7 +369,7 @@ public class TieredIndexFile { private final int fileMaxSize; private MappedFile originFile; private TieredFlatFile fileQueue; - private final MappedFile compactFile; + private MappedFile compactFile; public CompactTask(TieredMessageStoreConfig storeConfig, MappedFile originFile, TieredFlatFile fileQueue) throws IOException { @@ -381,6 +395,17 @@ public class TieredIndexFile { } catch (Throwable throwable) { logger.error("TieredIndexFile#compactTask: compact index file failed:", throwable); } + + try { + if (originFile != null) { + originFile.destroy(-1); + } + if (compactFile != null) { + compactFile.destroy(-1); + } + } catch (Throwable throwable) { + logger.error("TieredIndexFile#compactTask: destroy index file failed:", throwable); + } } public void compact() { @@ -396,6 +421,8 @@ public class TieredIndexFile { fileQueue.commit(true); compactFile.destroy(-1); originFile.destroy(-1); + compactFile = null; + originFile = null; } private void buildCompactFile() { @@ -414,6 +441,7 @@ public class TieredIndexFile { if (slotValue != -1) { int indexTotalSize = 0; int indexPosition = slotValue; + while (indexPosition >= 0 && indexPosition < maxIndexNum) { int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE + indexPosition * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE; diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java index 7ef49578d..262d6645b 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java @@ -19,9 +19,8 @@ package org.apache.rocketmq.tieredstore.file; import com.sun.jna.Platform; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.List; -import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; @@ -31,9 +30,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; -import org.junit.Assume; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; public class TieredIndexFileTest { @@ -45,11 +42,12 @@ public class TieredIndexFileTest { @Before public void setUp() { storeConfig = new TieredMessageStoreConfig(); + storeConfig.setBrokerName("IndexFileBroker"); storeConfig.setStorePathRootDir(storePath); - storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment"); - storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2); - storeConfig.setTieredStoreIndexFileMaxIndexNum(3); - mq = new MessageQueue("TieredIndexFileTest", storeConfig.getBrokerName(), 1); + storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment"); + storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5); + storeConfig.setTieredStoreIndexFileMaxIndexNum(20); + mq = new MessageQueue("IndexFileTest", storeConfig.getBrokerName(), 1); TieredStoreUtil.getMetadataStore(storeConfig); TieredStoreExecutor.init(); } @@ -61,77 +59,33 @@ public class TieredIndexFileTest { TieredStoreExecutor.shutdown(); } - @Ignore @Test public void testAppendAndQuery() throws IOException, ClassNotFoundException, NoSuchMethodException { if (Platform.isWindows()) { return; } - // skip this test on windows - Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS); - TieredFileAllocator fileQueueFactory = new TieredFileAllocator(storeConfig); TieredIndexFile indexFile = new TieredIndexFile(fileQueueFactory, storePath); + indexFile.append(mq, 0, "key3", 3, 300, 1000); indexFile.append(mq, 0, "key2", 2, 200, 1100); indexFile.append(mq, 0, "key1", 1, 100, 1200); - Awaitility.waitAtMost(5, TimeUnit.SECONDS) - .until(() -> { - List> indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join(); - if (indexList.size() != 1) { - return false; - } - - ByteBuffer indexBuffer = indexList.get(0).getValue(); - Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 2, indexBuffer.remaining()); - - Assert.assertEquals(1, indexBuffer.getLong(4 + 4 + 4)); - Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8)); - Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8 + 4)); - - Assert.assertEquals(3, indexBuffer.getLong(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4)); - Assert.assertEquals(300, indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4 + 8)); - Assert.assertEquals(0, indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4 + 8 + 4)); - return true; - }); - - indexFile.append(mq, 0, "key4", 4, 400, 1300); - indexFile.append(mq, 0, "key4", 4, 400, 1300); - indexFile.append(mq, 0, "key4", 4, 400, 1300); - - Awaitility.waitAtMost(5, TimeUnit.SECONDS) - .until(() -> { - List> indexList = indexFile.queryAsync(mq.getTopic(), "key4", 1300, 1300).join(); - if (indexList.size() != 1) { - return false; - } - - ByteBuffer indexBuffer = indexList.get(0).getValue(); - Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 3, indexBuffer.remaining()); - Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4)); - Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8)); - Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4)); - return true; - }); - - List> indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1300, 1300).join(); + // do not do schedule task here + TieredStoreExecutor.shutdown(); + List> indexList = + indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join(); Assert.assertEquals(0, indexList.size()); - indexList = indexFile.queryAsync(mq.getTopic(), "key4", 1200, 1300).join(); - Assert.assertEquals(2, indexList.size()); - - ByteBuffer indexBuffer = indexList.get(0).getValue(); - Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 3, indexBuffer.remaining()); - Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4)); - Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8)); - Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4)); + // do compaction once + TieredStoreExecutor.init(); + storeConfig.setTieredStoreIndexFileRollingIdleInterval(0); + indexFile.doScheduleTask(); + Awaitility.await().atMost(Duration.ofSeconds(10)) + .until(() -> !indexFile.getPreMappedFile().getFile().exists()); - indexBuffer = indexList.get(1).getValue(); - Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE, indexBuffer.remaining()); - Assert.assertEquals(2, indexBuffer.getLong(4 + 4 + 4)); - Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8)); - Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8 + 4)); + indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join(); + Assert.assertEquals(1, indexList.size()); } } -- 2.32.0.windows.2 From 69c26d3d29cde7b4484ecd112ab9224f9f42bf45 Mon Sep 17 00:00:00 2001 From: guyinyou <36399867+guyinyou@users.noreply.github.com> Date: Wed, 23 Aug 2023 10:27:52 +0800 Subject: [PATCH 5/8] [ISSUE #7228] Converge the use of some important variables for some class --- .../apache/rocketmq/store/ConsumeQueue.java | 16 ++++++------ .../rocketmq/store/MappedFileQueue.java | 26 +++++++++++-------- .../store/MultiPathMappedFileQueue.java | 4 +-- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index a0b886eb0..56bee2af3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -145,7 +145,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { if (offset >= 0 && size > 0) { mappedFileOffset = i + CQ_STORE_UNIT_SIZE; - this.maxPhysicOffset = offset + size; + this.setMaxPhysicOffset(offset + size); if (isExtAddr(tagsCode)) { maxExtAddr = tagsCode; } @@ -409,7 +409,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { int logicFileSize = this.mappedFileSize; - this.maxPhysicOffset = phyOffset; + this.setMaxPhysicOffset(phyOffset); long maxExtAddr = 1; boolean shouldDeleteFile = false; while (true) { @@ -435,7 +435,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { mappedFile.setWrotePosition(pos); mappedFile.setCommittedPosition(pos); mappedFile.setFlushedPosition(pos); - this.maxPhysicOffset = offset + size; + this.setMaxPhysicOffset(offset + size); // This maybe not take effect, when not every consume queue has extend file. if (isExtAddr(tagsCode)) { maxExtAddr = tagsCode; @@ -453,7 +453,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { mappedFile.setWrotePosition(pos); mappedFile.setCommittedPosition(pos); mappedFile.setFlushedPosition(pos); - this.maxPhysicOffset = offset + size; + this.setMaxPhysicOffset(offset + size); if (isExtAddr(tagsCode)) { maxExtAddr = tagsCode; } @@ -881,8 +881,8 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) { - if (offset + size <= this.maxPhysicOffset) { - log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset); + if (offset + size <= this.getMaxPhysicOffset()) { + log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", this.getMaxPhysicOffset(), offset); return true; } @@ -926,7 +926,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { ); } } - this.maxPhysicOffset = offset + size; + this.setMaxPhysicOffset(offset + size); return mappedFile.appendMessage(this.byteBufferIndex.array()); } return false; @@ -1130,7 +1130,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { @Override public void destroy() { - this.maxPhysicOffset = -1; + this.setMaxPhysicOffset(-1); this.minLogicOffset = 0; this.mappedFileQueue.destroy(); if (isExtReadEnable()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index 0bc70642f..32b90d14f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -285,7 +285,7 @@ public class MappedFileQueue implements Swappable { if (this.mappedFiles.isEmpty()) return 0; - long committed = this.flushedWhere; + long committed = this.getFlushedWhere(); if (committed != 0) { MappedFile mappedFile = this.getLastMappedFile(0, false); if (mappedFile != null) { @@ -442,11 +442,11 @@ public class MappedFileQueue implements Swappable { } public long remainHowManyDataToCommit() { - return getMaxWrotePosition() - committedWhere; + return getMaxWrotePosition() - getCommittedWhere(); } public long remainHowManyDataToFlush() { - return getMaxOffset() - flushedWhere; + return getMaxOffset() - this.getFlushedWhere(); } public void deleteLastMappedFile() { @@ -616,15 +616,15 @@ public class MappedFileQueue implements Swappable { public boolean flush(final int flushLeastPages) { boolean result = true; - MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); + MappedFile mappedFile = this.findMappedFileByOffset(this.getFlushedWhere(), this.getFlushedWhere() == 0); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; - result = where == this.flushedWhere; - this.flushedWhere = where; + result = where == this.getFlushedWhere(); + this.setFlushedWhere(where); if (0 == flushLeastPages) { - this.storeTimestamp = tmpTimeStamp; + this.setStoreTimestamp(tmpTimeStamp); } } @@ -633,12 +633,12 @@ public class MappedFileQueue implements Swappable { public synchronized boolean commit(final int commitLeastPages) { boolean result = true; - MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0); + MappedFile mappedFile = this.findMappedFileByOffset(this.getCommittedWhere(), this.getCommittedWhere() == 0); if (mappedFile != null) { int offset = mappedFile.commit(commitLeastPages); long where = mappedFile.getFileFromOffset() + offset; - result = where == this.committedWhere; - this.committedWhere = where; + result = where == this.getCommittedWhere(); + this.setCommittedWhere(where); } return result; @@ -763,7 +763,7 @@ public class MappedFileQueue implements Swappable { mf.destroy(1000 * 3); } this.mappedFiles.clear(); - this.flushedWhere = 0; + this.setFlushedWhere(0); // delete parent directory File file = new File(storePath); @@ -848,6 +848,10 @@ public class MappedFileQueue implements Swappable { return storeTimestamp; } + public void setStoreTimestamp(long storeTimestamp) { + this.storeTimestamp = storeTimestamp; + } + public List getMappedFiles() { return mappedFiles; } diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java index 8f5af9438..8ff050dfe 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.store; - import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -113,8 +112,7 @@ public class MultiPathMappedFileQueue extends MappedFileQueue { mf.destroy(1000 * 3); } this.mappedFiles.clear(); - this.flushedWhere = 0; - + this.setFlushedWhere(0); Set storePathSet = getPaths(); storePathSet.addAll(getReadonlyPaths()); -- 2.32.0.windows.2 From 3884f595949462044c5cb3c236199bc1d7ad2341 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=B3=E8=87=BB=E8=87=BB=28Steven=20shi=29?= Date: Wed, 23 Aug 2023 11:10:30 +0800 Subject: [PATCH 6/8] [ISSUE #7149] When creating and updating Topic, there will be problems with permission settings (#7151) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [ISSUE #7149] fix bug : When creating and updating Topic, there will be problems with permission settings * [ISSUE #7149] fix bug : When creating and updating Topic, there will be problems with permission settings * [issue#7249] --------- Co-authored-by: 十真 --- .../main/java/org/apache/rocketmq/broker/BrokerController.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 13f9d002b..e8f943702 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1733,7 +1733,8 @@ public class BrokerController { new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), - this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag()); + topicConfig.getPerm() + & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag()); } else { registerTopicConfig = new TopicConfig(topicConfig); } -- 2.32.0.windows.2 From 017ad110475e8024585327b44f47e5e97aabc63b Mon Sep 17 00:00:00 2001 From: echooymxq Date: Wed, 23 Aug 2023 11:11:42 +0800 Subject: [PATCH 7/8] [ISSUE #7219] Fix Concurrent modify syncStateSet and Mark synchronizing frequently when shrink. (#7220) --- .../broker/controller/ReplicasManager.java | 29 ++++++++++--------- .../ha/autoswitch/AutoSwitchHAService.java | 21 ++++++++------ 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index abae7cdb0..37c82e434 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -542,7 +542,7 @@ public class ReplicasManager { this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId()); this.tempBrokerMetadata.clear(); this.brokerControllerId = this.brokerMetadata.getBrokerId(); - this.haService.setBrokerControllerId(this.brokerControllerId); + this.haService.setLocalBrokerId(this.brokerControllerId); return true; } catch (Exception e) { LOGGER.error("fail to create metadata file", e); @@ -594,7 +594,7 @@ public class ReplicasManager { if (this.brokerMetadata.isLoaded()) { this.registerState = RegisterState.CREATE_METADATA_FILE_DONE; this.brokerControllerId = brokerMetadata.getBrokerId(); - this.haService.setBrokerControllerId(this.brokerControllerId); + this.haService.setLocalBrokerId(this.brokerControllerId); return; } // 2. check if temp metadata exist @@ -735,23 +735,26 @@ public class ReplicasManager { if (this.checkSyncStateSetTaskFuture != null) { this.checkSyncStateSetTaskFuture.cancel(false); } - this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(() -> { - checkSyncStateSetAndDoReport(); - }, 3 * 1000, this.brokerConfig.getCheckSyncStateSetPeriod(), TimeUnit.MILLISECONDS); + this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(this::checkSyncStateSetAndDoReport, 3 * 1000, + this.brokerConfig.getCheckSyncStateSetPeriod(), TimeUnit.MILLISECONDS); } private void checkSyncStateSetAndDoReport() { - final Set newSyncStateSet = this.haService.maybeShrinkSyncStateSet(); - newSyncStateSet.add(this.brokerControllerId); - synchronized (this) { - if (this.syncStateSet != null) { - // Check if syncStateSet changed - if (this.syncStateSet.size() == newSyncStateSet.size() && this.syncStateSet.containsAll(newSyncStateSet)) { - return; + try { + final Set newSyncStateSet = this.haService.maybeShrinkSyncStateSet(); + newSyncStateSet.add(this.brokerControllerId); + synchronized (this) { + if (this.syncStateSet != null) { + // Check if syncStateSet changed + if (this.syncStateSet.size() == newSyncStateSet.size() && this.syncStateSet.containsAll(newSyncStateSet)) { + return; + } } } + doReportSyncStateSetChanged(newSyncStateSet); + } catch (Exception e) { + LOGGER.error("Check syncStateSet error", e); } - doReportSyncStateSetChanged(newSyncStateSet); } private void doReportSyncStateSetChanged(Set newSyncStateSet) { diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java index 6dc734e0c..d5393fdca 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java @@ -41,6 +41,7 @@ import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -73,7 +74,7 @@ public class AutoSwitchHAService extends DefaultHAService { private EpochFileCache epochCache; private AutoSwitchHAClient haClient; - private Long brokerControllerId = null; + private Long localBrokerId = null; public AutoSwitchHAService() { } @@ -287,9 +288,11 @@ public class AutoSwitchHAService extends DefaultHAService { // If the slaveBrokerId is in syncStateSet but not in connectionCaughtUpTimeTable, // it means that the broker has not connected. - for (Long slaveBrokerId : newSyncStateSet) { - if (!this.connectionCaughtUpTimeTable.containsKey(slaveBrokerId)) { - newSyncStateSet.remove(slaveBrokerId); + Iterator iterator = newSyncStateSet.iterator(); + while (iterator.hasNext()) { + Long slaveBrokerId = iterator.next(); + if (!Objects.equals(slaveBrokerId, this.localBrokerId) && !this.connectionCaughtUpTimeTable.containsKey(slaveBrokerId)) { + iterator.remove(); isSyncStateSetChanged = true; } } @@ -419,7 +422,7 @@ public class AutoSwitchHAService extends DefaultHAService { // To avoid the syncStateSet is not consistent with connectionList. // Fix issue: https://github.com/apache/rocketmq/issues/6662 for (Long syncId : currentSyncStateSet) { - if (!idList.contains(syncId) && this.brokerControllerId != null && !Objects.equals(syncId, this.brokerControllerId)) { + if (!idList.contains(syncId) && this.localBrokerId != null && !Objects.equals(syncId, this.localBrokerId)) { LOGGER.warn("Slave {} is still in syncStateSet, but has lost its connection. So new offset can't be compute.", syncId); // Without check and re-compute, return the confirmOffset's value directly. return this.defaultMessageStore.getConfirmOffsetDirectly(); @@ -545,12 +548,12 @@ public class AutoSwitchHAService extends DefaultHAService { return this.epochCache.getAllEntries(); } - public Long getBrokerControllerId() { - return brokerControllerId; + public Long getLocalBrokerId() { + return localBrokerId; } - public void setBrokerControllerId(Long brokerControllerId) { - this.brokerControllerId = brokerControllerId; + public void setLocalBrokerId(Long localBrokerId) { + this.localBrokerId = localBrokerId; } class AutoSwitchAcceptSocketService extends AcceptSocketService { -- 2.32.0.windows.2 From 77e8e54b37c3fc3ea0beffc1ace6f5bf20af10d9 Mon Sep 17 00:00:00 2001 From: lk Date: Wed, 23 Aug 2023 15:56:39 +0800 Subject: [PATCH 8/8] [ISSUE #7223] Support batch ack for grpc client in proxy (#7225) --- .../client/impl/mqclient/MQClientAPIExt.java | 26 +++ .../rocketmq/proxy/config/ProxyConfig.java | 10 + .../grpc/v2/consumer/AckMessageActivity.java | 136 ++++++++--- .../proxy/processor/AbstractProcessor.java | 4 +- .../proxy/processor/BatchAckResult.java | 53 +++++ .../proxy/processor/ConsumerProcessor.java | 64 +++++ .../processor/DefaultMessagingProcessor.java | 7 + .../proxy/processor/MessagingProcessor.java | 18 ++ .../message/ClusterMessageService.java | 16 +- .../service/message/LocalMessageService.java | 58 +++++ .../proxy/service/message/MessageService.java | 8 + .../service/message/ReceiptHandleMessage.java | 39 ++++ .../v2/consumer/AckMessageActivityTest.java | 221 +++++++++++++++--- .../proxy/processor/BaseProcessorTest.java | 18 +- .../processor/ConsumerProcessorTest.java | 115 +++++++++ .../service/mqclient/MQClientAPIExtTest.java | 12 + 16 files changed, 728 insertions(+), 77 deletions(-) create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java index fb8f8d11f..d7c8ef8d9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java @@ -306,6 +306,32 @@ public class MQClientAPIExt extends MQClientAPIImpl { return future; } + public CompletableFuture batchAckMessageAsync( + String brokerAddr, + String topic, + String consumerGroup, + List extraInfoList, + long timeoutMillis + ) { + CompletableFuture future = new CompletableFuture<>(); + try { + this.batchAckMessageAsync(brokerAddr, timeoutMillis, new AckCallback() { + @Override + public void onSuccess(AckResult ackResult) { + future.complete(ackResult); + } + + @Override + public void onException(Throwable t) { + future.completeExceptionally(t); + } + }, topic, consumerGroup, extraInfoList); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return future; + } + public CompletableFuture changeInvisibleTimeAsync( String brokerAddr, String brokerName, diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index 39caaa0d9..76a243919 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -250,6 +250,8 @@ public class ProxyConfig implements ConfigFile { private long remotingWaitTimeMillsInTopicRouteQueue = 3 * 1000; private long remotingWaitTimeMillsInDefaultQueue = 3 * 1000; + private boolean enableBatchAck = false; + @Override public void initData() { parseDelayLevel(); @@ -1379,4 +1381,12 @@ public class ProxyConfig implements ConfigFile { public void setRemotingWaitTimeMillsInDefaultQueue(long remotingWaitTimeMillsInDefaultQueue) { this.remotingWaitTimeMillsInDefaultQueue = remotingWaitTimeMillsInDefaultQueue; } + + public boolean isEnableBatchAck() { + return enableBatchAck; + } + + public void setEnableBatchAck(boolean enableBatchAck) { + this.enableBatchAck = enableBatchAck; + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java index 9a3a77201..97c716c8f 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java @@ -31,12 +31,15 @@ import org.apache.rocketmq.client.consumer.AckStatus; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.proxy.common.MessageReceiptHandle; import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; +import org.apache.rocketmq.proxy.processor.BatchAckResult; import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; public class AckMessageActivity extends AbstractMessingActivity { @@ -50,60 +53,98 @@ public class AckMessageActivity extends AbstractMessingActivity { try { validateTopicAndConsumerGroup(request.getTopic(), request.getGroup()); - - CompletableFuture[] futures = new CompletableFuture[request.getEntriesCount()]; - for (int i = 0; i < request.getEntriesCount(); i++) { - futures[i] = processAckMessage(ctx, request, request.getEntries(i)); + String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); + String topic = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()); + if (ConfigurationManager.getProxyConfig().isEnableBatchAck()) { + future = ackMessageInBatch(ctx, group, topic, request); + } else { + future = ackMessageOneByOne(ctx, group, topic, request); } - CompletableFuture.allOf(futures).whenComplete((val, throwable) -> { - if (throwable != null) { - future.completeExceptionally(throwable); - return; - } + } catch (Throwable t) { + future.completeExceptionally(t); + } + return future; + } + + protected CompletableFuture ackMessageInBatch(ProxyContext ctx, String group, String topic, AckMessageRequest request) { + List handleMessageList = new ArrayList<>(request.getEntriesCount()); + for (AckMessageEntry ackMessageEntry : request.getEntriesList()) { + String handleString = getHandleString(ctx, group, request, ackMessageEntry); + handleMessageList.add(new ReceiptHandleMessage(ReceiptHandle.decode(handleString), ackMessageEntry.getMessageId())); + } + return this.messagingProcessor.batchAckMessage(ctx, handleMessageList, group, topic) + .thenApply(batchAckResultList -> { + AckMessageResponse.Builder responseBuilder = AckMessageResponse.newBuilder(); Set responseCodes = new HashSet<>(); - List entryList = new ArrayList<>(); - for (CompletableFuture entryFuture : futures) { - AckMessageResultEntry entryResult = entryFuture.join(); - responseCodes.add(entryResult.getStatus().getCode()); - entryList.add(entryResult); - } - AckMessageResponse.Builder responseBuilder = AckMessageResponse.newBuilder() - .addAllEntries(entryList); - if (responseCodes.size() > 1) { - responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.MULTIPLE_RESULTS, Code.MULTIPLE_RESULTS.name())); - } else if (responseCodes.size() == 1) { - Code code = responseCodes.stream().findAny().get(); - responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(code, code.name())); - } else { - responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack message result is empty")); + for (BatchAckResult batchAckResult : batchAckResultList) { + AckMessageResultEntry entry = convertToAckMessageResultEntry(batchAckResult); + responseBuilder.addEntries(entry); + responseCodes.add(entry.getStatus().getCode()); } - future.complete(responseBuilder.build()); + setAckResponseStatus(responseBuilder, responseCodes); + return responseBuilder.build(); }); - } catch (Throwable t) { - future.completeExceptionally(t); + } + + protected AckMessageResultEntry convertToAckMessageResultEntry(BatchAckResult batchAckResult) { + ReceiptHandleMessage handleMessage = batchAckResult.getReceiptHandleMessage(); + AckMessageResultEntry.Builder resultBuilder = AckMessageResultEntry.newBuilder() + .setMessageId(handleMessage.getMessageId()) + .setReceiptHandle(handleMessage.getReceiptHandle().getReceiptHandle()); + if (batchAckResult.getProxyException() != null) { + resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(batchAckResult.getProxyException())); + } else { + AckResult ackResult = batchAckResult.getAckResult(); + if (AckStatus.OK.equals(ackResult.getStatus())) { + resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name())); + } else { + resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack failed: status is abnormal")); + } } - return future; + return resultBuilder.build(); } - protected CompletableFuture processAckMessage(ProxyContext ctx, AckMessageRequest request, + protected CompletableFuture ackMessageOneByOne(ProxyContext ctx, String group, String topic, AckMessageRequest request) { + CompletableFuture resultFuture = new CompletableFuture<>(); + CompletableFuture[] futures = new CompletableFuture[request.getEntriesCount()]; + for (int i = 0; i < request.getEntriesCount(); i++) { + futures[i] = processAckMessage(ctx, group, topic, request, request.getEntries(i)); + } + CompletableFuture.allOf(futures).whenComplete((val, throwable) -> { + if (throwable != null) { + resultFuture.completeExceptionally(throwable); + return; + } + + Set responseCodes = new HashSet<>(); + List entryList = new ArrayList<>(); + for (CompletableFuture entryFuture : futures) { + AckMessageResultEntry entryResult = entryFuture.join(); + responseCodes.add(entryResult.getStatus().getCode()); + entryList.add(entryResult); + } + AckMessageResponse.Builder responseBuilder = AckMessageResponse.newBuilder() + .addAllEntries(entryList); + setAckResponseStatus(responseBuilder, responseCodes); + resultFuture.complete(responseBuilder.build()); + }); + return resultFuture; + } + + protected CompletableFuture processAckMessage(ProxyContext ctx, String group, String topic, AckMessageRequest request, AckMessageEntry ackMessageEntry) { CompletableFuture future = new CompletableFuture<>(); try { - String handleString = ackMessageEntry.getReceiptHandle(); - - String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); - MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle()); - if (messageReceiptHandle != null) { - handleString = messageReceiptHandle.getReceiptHandleStr(); - } + String handleString = this.getHandleString(ctx, group, request, ackMessageEntry); CompletableFuture ackResultFuture = this.messagingProcessor.ackMessage( ctx, ReceiptHandle.decode(handleString), ackMessageEntry.getMessageId(), group, - GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic())); + topic + ); ackResultFuture.thenAccept(result -> { future.complete(convertToAckMessageResultEntry(ctx, ackMessageEntry, result)); }).exceptionally(t -> { @@ -139,4 +180,25 @@ public class AckMessageActivity extends AbstractMessingActivity { .setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack failed: status is abnormal")) .build(); } + + protected void setAckResponseStatus(AckMessageResponse.Builder responseBuilder, Set responseCodes) { + if (responseCodes.size() > 1) { + responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.MULTIPLE_RESULTS, Code.MULTIPLE_RESULTS.name())); + } else if (responseCodes.size() == 1) { + Code code = responseCodes.stream().findAny().get(); + responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(code, code.name())); + } else { + responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack message result is empty")); + } + } + + protected String getHandleString(ProxyContext ctx, String group, AckMessageRequest request, AckMessageEntry ackMessageEntry) { + String handleString = ackMessageEntry.getReceiptHandle(); + + MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle()); + if (messageReceiptHandle != null) { + handleString = messageReceiptHandle.getReceiptHandleStr(); + } + return handleString; + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java index b61c3df9e..c63212c23 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java @@ -27,6 +27,8 @@ public abstract class AbstractProcessor extends AbstractStartAndShutdown { protected MessagingProcessor messagingProcessor; protected ServiceManager serviceManager; + protected static final ProxyException EXPIRED_HANDLE_PROXY_EXCEPTION = new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is expired"); + public AbstractProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) { this.messagingProcessor = messagingProcessor; @@ -35,7 +37,7 @@ public abstract class AbstractProcessor extends AbstractStartAndShutdown { protected void validateReceiptHandle(ReceiptHandle handle) { if (handle.isExpired()) { - throw new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is expired"); + throw EXPIRED_HANDLE_PROXY_EXCEPTION; } } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java new file mode 100644 index 000000000..dfb9c9b9e --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.processor; + +import org.apache.rocketmq.client.consumer.AckResult; +import org.apache.rocketmq.proxy.common.ProxyException; +import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; + +public class BatchAckResult { + + private final ReceiptHandleMessage receiptHandleMessage; + private AckResult ackResult; + private ProxyException proxyException; + + public BatchAckResult(ReceiptHandleMessage receiptHandleMessage, + AckResult ackResult) { + this.receiptHandleMessage = receiptHandleMessage; + this.ackResult = ackResult; + } + + public BatchAckResult(ReceiptHandleMessage receiptHandleMessage, + ProxyException proxyException) { + this.receiptHandleMessage = receiptHandleMessage; + this.proxyException = proxyException; + } + + public ReceiptHandleMessage getReceiptHandleMessage() { + return receiptHandleMessage; + } + + public AckResult getAckResult() { + return ackResult; + } + + public ProxyException getProxyException() { + return proxyException; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java index 656a6339d..f3522b374 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java @@ -48,6 +48,7 @@ import org.apache.rocketmq.proxy.common.ProxyExceptionCode; import org.apache.rocketmq.proxy.common.utils.FutureUtils; import org.apache.rocketmq.proxy.common.utils.ProxyUtils; import org.apache.rocketmq.proxy.service.ServiceManager; +import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; @@ -241,6 +242,69 @@ public class ConsumerProcessor extends AbstractProcessor { return FutureUtils.addExecutor(future, this.executor); } + public CompletableFuture> batchAckMessage( + ProxyContext ctx, + List handleMessageList, + String consumerGroup, + String topic, + long timeoutMillis + ) { + CompletableFuture> future = new CompletableFuture<>(); + try { + List batchAckResultList = new ArrayList<>(handleMessageList.size()); + Map> brokerHandleListMap = new HashMap<>(); + + for (ReceiptHandleMessage handleMessage : handleMessageList) { + if (handleMessage.getReceiptHandle().isExpired()) { + batchAckResultList.add(new BatchAckResult(handleMessage, EXPIRED_HANDLE_PROXY_EXCEPTION)); + continue; + } + List brokerHandleList = brokerHandleListMap.computeIfAbsent(handleMessage.getReceiptHandle().getBrokerName(), key -> new ArrayList<>()); + brokerHandleList.add(handleMessage); + } + + if (brokerHandleListMap.isEmpty()) { + return FutureUtils.addExecutor(CompletableFuture.completedFuture(batchAckResultList), this.executor); + } + Set>> brokerHandleListMapEntrySet = brokerHandleListMap.entrySet(); + CompletableFuture>[] futures = new CompletableFuture[brokerHandleListMapEntrySet.size()]; + int futureIndex = 0; + for (Map.Entry> entry : brokerHandleListMapEntrySet) { + futures[futureIndex++] = processBrokerHandle(ctx, consumerGroup, topic, entry.getValue(), timeoutMillis); + } + CompletableFuture.allOf(futures).whenComplete((val, throwable) -> { + if (throwable != null) { + future.completeExceptionally(throwable); + } + for (CompletableFuture> resultFuture : futures) { + batchAckResultList.addAll(resultFuture.join()); + } + future.complete(batchAckResultList); + }); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return FutureUtils.addExecutor(future, this.executor); + } + + protected CompletableFuture> processBrokerHandle(ProxyContext ctx, String consumerGroup, String topic, List handleMessageList, long timeoutMillis) { + return this.serviceManager.getMessageService().batchAckMessage(ctx, handleMessageList, consumerGroup, topic, timeoutMillis) + .thenApply(result -> { + List results = new ArrayList<>(); + for (ReceiptHandleMessage handleMessage : handleMessageList) { + results.add(new BatchAckResult(handleMessage, result)); + } + return results; + }) + .exceptionally(throwable -> { + List results = new ArrayList<>(); + for (ReceiptHandleMessage handleMessage : handleMessageList) { + results.add(new BatchAckResult(handleMessage, new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, throwable.getMessage(), throwable))); + } + return results; + }); + } + public CompletableFuture changeInvisibleTime(ProxyContext ctx, ReceiptHandle handle, String messageId, String groupName, String topicName, long invisibleTime, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index 188cb7b9b..ba150051b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -46,6 +46,7 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.service.ServiceManager; import org.apache.rocketmq.proxy.service.ServiceManagerFactory; +import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; import org.apache.rocketmq.proxy.service.metadata.MetadataService; import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData; @@ -183,6 +184,12 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen return this.consumerProcessor.ackMessage(ctx, handle, messageId, consumerGroup, topic, timeoutMillis); } + @Override + public CompletableFuture> batchAckMessage(ProxyContext ctx, + List handleMessageList, String consumerGroup, String topic, long timeoutMillis) { + return this.consumerProcessor.batchAckMessage(ctx, handleMessageList, consumerGroup, topic, timeoutMillis); + } + @Override public CompletableFuture changeInvisibleTime(ProxyContext ctx, ReceiptHandle handle, String messageId, String groupName, String topicName, long invisibleTime, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java index d86be0bd8..2ae7418ba 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.proxy.common.Address; import org.apache.rocketmq.proxy.common.MessageReceiptHandle; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; import org.apache.rocketmq.proxy.service.metadata.MetadataService; import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData; @@ -155,6 +156,23 @@ public interface MessagingProcessor extends StartAndShutdown { long timeoutMillis ); + default CompletableFuture> batchAckMessage( + ProxyContext ctx, + List handleMessageList, + String consumerGroup, + String topic + ) { + return batchAckMessage(ctx, handleMessageList, consumerGroup, topic, DEFAULT_TIMEOUT_MILLS); + } + + CompletableFuture> batchAckMessage( + ProxyContext ctx, + List handleMessageList, + String consumerGroup, + String topic, + long timeoutMillis + ); + default CompletableFuture changeInvisibleTime( ProxyContext ctx, ReceiptHandle handle, diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java index 9f163f1b9..70b72deae 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java @@ -20,9 +20,11 @@ import com.google.common.collect.Lists; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import org.apache.rocketmq.client.consumer.AckResult; import org.apache.rocketmq.client.consumer.PopResult; import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.Message; @@ -31,7 +33,6 @@ import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; import org.apache.rocketmq.proxy.common.utils.FutureUtils; -import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; import org.apache.rocketmq.proxy.service.route.TopicRouteService; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -137,6 +138,19 @@ public class ClusterMessageService implements MessageService { ); } + @Override + public CompletableFuture batchAckMessage(ProxyContext ctx, List handleList, String consumerGroup, + String topic, long timeoutMillis) { + List extraInfoList = handleList.stream().map(message -> message.getReceiptHandle().getReceiptHandle()).collect(Collectors.toList()); + return this.mqClientAPIFactory.getClient().batchAckMessageAsync( + this.resolveBrokerAddrInReceiptHandle(ctx, handleList.get(0).getReceiptHandle()), + topic, + consumerGroup, + extraInfoList, + timeoutMillis + ); + } + @Override public CompletableFuture pullMessage(ProxyContext ctx, AddressableMessageQueue messageQueue, PullMessageRequestHeader requestHeader, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java index eb2c4d9ee..ca7dcc9eb 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.service.message; import io.netty.channel.ChannelHandlerContext; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -54,6 +55,8 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.BatchAck; +import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader; @@ -364,6 +367,61 @@ public class LocalMessageService implements MessageService { }); } + @Override + public CompletableFuture batchAckMessage(ProxyContext ctx, List handleList, + String consumerGroup, String topic, long timeoutMillis) { + SimpleChannel channel = channelManager.createChannel(ctx); + ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); + RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null); + + Map batchAckMap = new HashMap<>(); + for (ReceiptHandleMessage receiptHandleMessage : handleList) { + String extraInfo = receiptHandleMessage.getReceiptHandle().getReceiptHandle(); + String[] extraInfoData = ExtraInfoUtil.split(extraInfo); + String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" + + ExtraInfoUtil.getQueueId(extraInfoData) + "@" + + ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" + + ExtraInfoUtil.getPopTime(extraInfoData); + BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> { + BatchAck newBatchAck = new BatchAck(); + newBatchAck.setConsumerGroup(consumerGroup); + newBatchAck.setTopic(topic); + newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData)); + newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData)); + newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData)); + newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData)); + newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData)); + newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData)); + newBatchAck.setBitSet(new BitSet()); + return newBatchAck; + }); + bAck.getBitSet().set((int) (ExtraInfoUtil.getQueueOffset(extraInfoData) - ExtraInfoUtil.getCkQueueOffset(extraInfoData))); + } + BatchAckMessageRequestBody requestBody = new BatchAckMessageRequestBody(); + requestBody.setBrokerName(brokerController.getBrokerConfig().getBrokerName()); + requestBody.setAcks(new ArrayList<>(batchAckMap.values())); + + command.setBody(requestBody.encode()); + CompletableFuture future = new CompletableFuture<>(); + try { + RemotingCommand response = brokerController.getAckMessageProcessor() + .processRequest(channelHandlerContext, command); + future.complete(response); + } catch (Exception e) { + log.error("Fail to process batchAckMessage command", e); + future.completeExceptionally(e); + } + return future.thenApply(r -> { + AckResult ackResult = new AckResult(); + if (ResponseCode.SUCCESS == r.getCode()) { + ackResult.setStatus(AckStatus.OK); + } else { + ackResult.setStatus(AckStatus.NO_EXIST); + } + return ackResult; + }); + } + @Override public CompletableFuture pullMessage(ProxyContext ctx, AddressableMessageQueue messageQueue, PullMessageRequestHeader requestHeader, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java index 15da17154..58a835adb 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java @@ -91,6 +91,14 @@ public interface MessageService { long timeoutMillis ); + CompletableFuture batchAckMessage( + ProxyContext ctx, + List handleList, + String consumerGroup, + String topic, + long timeoutMillis + ); + CompletableFuture pullMessage( ProxyContext ctx, AddressableMessageQueue messageQueue, diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java new file mode 100644 index 000000000..ae63fed49 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.message; + +import org.apache.rocketmq.common.consumer.ReceiptHandle; + +public class ReceiptHandleMessage { + + private final ReceiptHandle receiptHandle; + private final String messageId; + + public ReceiptHandleMessage(ReceiptHandle receiptHandle, String messageId) { + this.receiptHandle = receiptHandle; + this.messageId = messageId; + } + + public ReceiptHandle getReceiptHandle() { + return receiptHandle; + } + + public String getMessageId() { + return messageId; + } +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java index 49fdfc6a8..3c4746105 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java @@ -20,21 +20,32 @@ package org.apache.rocketmq.proxy.grpc.v2.consumer; import apache.rocketmq.v2.AckMessageEntry; import apache.rocketmq.v2.AckMessageRequest; import apache.rocketmq.v2.AckMessageResponse; +import apache.rocketmq.v2.AckMessageResultEntry; import apache.rocketmq.v2.Code; import apache.rocketmq.v2.Resource; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.client.consumer.AckResult; import org.apache.rocketmq.client.consumer.AckStatus; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; +import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest; +import org.apache.rocketmq.proxy.processor.BatchAckResult; +import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; import org.junit.Before; import org.junit.Test; +import org.mockito.stubbing.Answer; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; public class AckMessageActivityTest extends BaseActivityTest { @@ -52,43 +63,197 @@ public class AckMessageActivityTest extends BaseActivityTest { @Test public void testAckMessage() throws Throwable { - when(this.messagingProcessor.ackMessage(any(), any(), eq("msg1"), anyString(), anyString())) + ConfigurationManager.getProxyConfig().setEnableBatchAck(false); + + String msg1 = "msg1"; + String msg2 = "msg2"; + String msg3 = "msg3"; + + when(this.messagingProcessor.ackMessage(any(), any(), eq(msg1), anyString(), anyString())) .thenThrow(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is expired")); AckResult msg2AckResult = new AckResult(); msg2AckResult.setStatus(AckStatus.OK); - when(this.messagingProcessor.ackMessage(any(), any(), eq("msg2"), anyString(), anyString())) + when(this.messagingProcessor.ackMessage(any(), any(), eq(msg2), anyString(), anyString())) .thenReturn(CompletableFuture.completedFuture(msg2AckResult)); AckResult msg3AckResult = new AckResult(); msg3AckResult.setStatus(AckStatus.NO_EXIST); - when(this.messagingProcessor.ackMessage(any(), any(), eq("msg3"), anyString(), anyString())) + when(this.messagingProcessor.ackMessage(any(), any(), eq(msg3), anyString(), anyString())) .thenReturn(CompletableFuture.completedFuture(msg3AckResult)); - AckMessageResponse response = this.ackMessageActivity.ackMessage( - createContext(), - AckMessageRequest.newBuilder() - .setTopic(Resource.newBuilder().setName(TOPIC).build()) - .setGroup(Resource.newBuilder().setName(GROUP).build()) - .addEntries(AckMessageEntry.newBuilder() - .setMessageId("msg1") - .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000)) - .build()) - .addEntries(AckMessageEntry.newBuilder() - .setMessageId("msg2") - .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) - .build()) - .addEntries(AckMessageEntry.newBuilder() - .setMessageId("msg3") - .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) - .build()) - .build() - ).get(); - - assertEquals(Code.MULTIPLE_RESULTS, response.getStatus().getCode()); - assertEquals(3, response.getEntriesCount()); - assertEquals(Code.INVALID_RECEIPT_HANDLE, response.getEntries(0).getStatus().getCode()); - assertEquals(Code.OK, response.getEntries(1).getStatus().getCode()); - assertEquals(Code.INTERNAL_SERVER_ERROR, response.getEntries(2).getStatus().getCode()); + { + AckMessageResponse response = this.ackMessageActivity.ackMessage( + createContext(), + AckMessageRequest.newBuilder() + .setTopic(Resource.newBuilder().setName(TOPIC).build()) + .setGroup(Resource.newBuilder().setName(GROUP).build()) + .addEntries(AckMessageEntry.newBuilder() + .setMessageId(msg1) + .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000)) + .build()) + .build() + ).get(); + assertEquals(Code.INVALID_RECEIPT_HANDLE, response.getStatus().getCode()); + } + { + AckMessageResponse response = this.ackMessageActivity.ackMessage( + createContext(), + AckMessageRequest.newBuilder() + .setTopic(Resource.newBuilder().setName(TOPIC).build()) + .setGroup(Resource.newBuilder().setName(GROUP).build()) + .addEntries(AckMessageEntry.newBuilder() + .setMessageId(msg2) + .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000)) + .build()) + .build() + ).get(); + assertEquals(Code.OK, response.getStatus().getCode()); + } + { + AckMessageResponse response = this.ackMessageActivity.ackMessage( + createContext(), + AckMessageRequest.newBuilder() + .setTopic(Resource.newBuilder().setName(TOPIC).build()) + .setGroup(Resource.newBuilder().setName(GROUP).build()) + .addEntries(AckMessageEntry.newBuilder() + .setMessageId(msg3) + .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000)) + .build()) + .build() + ).get(); + assertEquals(Code.INTERNAL_SERVER_ERROR, response.getStatus().getCode()); + } + { + AckMessageResponse response = this.ackMessageActivity.ackMessage( + createContext(), + AckMessageRequest.newBuilder() + .setTopic(Resource.newBuilder().setName(TOPIC).build()) + .setGroup(Resource.newBuilder().setName(GROUP).build()) + .addEntries(AckMessageEntry.newBuilder() + .setMessageId(msg1) + .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis() - 10000, 1000)) + .build()) + .addEntries(AckMessageEntry.newBuilder() + .setMessageId(msg2) + .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) + .build()) + .addEntries(AckMessageEntry.newBuilder() + .setMessageId(msg3) + .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) + .build()) + .build() + ).get(); + + assertEquals(Code.MULTIPLE_RESULTS, response.getStatus().getCode()); + assertEquals(3, response.getEntriesCount()); + assertEquals(Code.INVALID_RECEIPT_HANDLE, response.getEntries(0).getStatus().getCode()); + assertEquals(Code.OK, response.getEntries(1).getStatus().getCode()); + assertEquals(Code.INTERNAL_SERVER_ERROR, response.getEntries(2).getStatus().getCode()); + } + } + + @Test + public void testAckMessageInBatch() throws Throwable { + ConfigurationManager.getProxyConfig().setEnableBatchAck(true); + + String successMessageId = "msg1"; + String notOkMessageId = "msg2"; + String exceptionMessageId = "msg3"; + + doAnswer((Answer>>) invocation -> { + List receiptHandleMessageList = invocation.getArgument(1, List.class); + List batchAckResultList = new ArrayList<>(); + for (ReceiptHandleMessage receiptHandleMessage : receiptHandleMessageList) { + BatchAckResult batchAckResult; + if (receiptHandleMessage.getMessageId().equals(successMessageId)) { + AckResult ackResult = new AckResult(); + ackResult.setStatus(AckStatus.OK); + batchAckResult = new BatchAckResult(receiptHandleMessage, ackResult); + } else if (receiptHandleMessage.getMessageId().equals(notOkMessageId)) { + AckResult ackResult = new AckResult(); + ackResult.setStatus(AckStatus.NO_EXIST); + batchAckResult = new BatchAckResult(receiptHandleMessage, ackResult); + } else { + batchAckResult = new BatchAckResult(receiptHandleMessage, new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "")); + } + batchAckResultList.add(batchAckResult); + } + return CompletableFuture.completedFuture(batchAckResultList); + }).when(this.messagingProcessor).batchAckMessage(any(), anyList(), anyString(), anyString()); + + { + AckMessageResponse response = this.ackMessageActivity.ackMessage( + createContext(), + AckMessageRequest.newBuilder() + .setTopic(Resource.newBuilder().setName(TOPIC).build()) + .setGroup(Resource.newBuilder().setName(GROUP).build()) + .addEntries(AckMessageEntry.newBuilder() + .setMessageId(successMessageId) + .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) + .build()) + .build() + ).get(); + assertEquals(Code.OK, response.getStatus().getCode()); + } + { + AckMessageResponse response = this.ackMessageActivity.ackMessage( + createContext(), + AckMessageRequest.newBuilder() + .setTopic(Resource.newBuilder().setName(TOPIC).build()) + .setGroup(Resource.newBuilder().setName(GROUP).build()) + .addEntries(AckMessageEntry.newBuilder() + .setMessageId(notOkMessageId) + .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) + .build()) + .build() + ).get(); + assertEquals(Code.INTERNAL_SERVER_ERROR, response.getStatus().getCode()); + } + { + AckMessageResponse response = this.ackMessageActivity.ackMessage( + createContext(), + AckMessageRequest.newBuilder() + .setTopic(Resource.newBuilder().setName(TOPIC).build()) + .setGroup(Resource.newBuilder().setName(GROUP).build()) + .addEntries(AckMessageEntry.newBuilder() + .setMessageId(exceptionMessageId) + .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) + .build()) + .build() + ).get(); + assertEquals(Code.INVALID_RECEIPT_HANDLE, response.getStatus().getCode()); + } + { + AckMessageResponse response = this.ackMessageActivity.ackMessage( + createContext(), + AckMessageRequest.newBuilder() + .setTopic(Resource.newBuilder().setName(TOPIC).build()) + .setGroup(Resource.newBuilder().setName(GROUP).build()) + .addEntries(AckMessageEntry.newBuilder() + .setMessageId(successMessageId) + .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) + .build()) + .addEntries(AckMessageEntry.newBuilder() + .setMessageId(notOkMessageId) + .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) + .build()) + .addEntries(AckMessageEntry.newBuilder() + .setMessageId(exceptionMessageId) + .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000)) + .build()) + .build() + ).get(); + + assertEquals(Code.MULTIPLE_RESULTS, response.getStatus().getCode()); + assertEquals(3, response.getEntriesCount()); + Map msgCode = new HashMap<>(); + for (AckMessageResultEntry entry : response.getEntriesList()) { + msgCode.put(entry.getMessageId(), entry.getStatus().getCode()); + } + assertEquals(Code.OK, msgCode.get(successMessageId)); + assertEquals(Code.INTERNAL_SERVER_ERROR, msgCode.get(notOkMessageId)); + assertEquals(Code.INVALID_RECEIPT_HANDLE, msgCode.get(exceptionMessageId)); + } } } \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java index 5c1ea9627..072630e39 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java @@ -66,14 +66,6 @@ public class BaseProcessorTest extends InitConfigTest { protected ProxyRelayService proxyRelayService; @Mock protected MetadataService metadataService; - @Mock - protected ProducerProcessor producerProcessor; - @Mock - protected ConsumerProcessor consumerProcessor; - @Mock - protected TransactionProcessor transactionProcessor; - @Mock - protected ClientProcessor clientProcessor; public void before() throws Throwable { super.before(); @@ -92,6 +84,13 @@ public class BaseProcessorTest extends InitConfigTest { } protected static MessageExt createMessageExt(String topic, String tags, int reconsumeTimes, long invisibleTime) { + return createMessageExt(topic, tags, reconsumeTimes, invisibleTime, System.currentTimeMillis(), + RANDOM.nextInt(Integer.MAX_VALUE), RANDOM.nextInt(Integer.MAX_VALUE), RANDOM.nextInt(Integer.MAX_VALUE), + RANDOM.nextInt(Integer.MAX_VALUE), "mockBroker"); + } + + protected static MessageExt createMessageExt(String topic, String tags, int reconsumeTimes, long invisibleTime, long popTime, + long startOffset, int reviveQid, int queueId, long queueOffset, String brokerName) { MessageExt messageExt = new MessageExt(); messageExt.setTopic(topic); messageExt.setTags(tags); @@ -100,8 +99,7 @@ public class BaseProcessorTest extends InitConfigTest { messageExt.setMsgId(MessageClientIDSetter.createUniqID()); messageExt.setCommitLogOffset(RANDOM.nextInt(Integer.MAX_VALUE)); MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_POP_CK, - ExtraInfoUtil.buildExtraInfo(RANDOM.nextInt(Integer.MAX_VALUE), System.currentTimeMillis(), invisibleTime, - RANDOM.nextInt(Integer.MAX_VALUE), topic, "mockBroker", RANDOM.nextInt(Integer.MAX_VALUE), RANDOM.nextInt(Integer.MAX_VALUE))); + ExtraInfoUtil.buildExtraInfo(startOffset, popTime, invisibleTime, reviveQid, topic, brokerName, queueId, queueOffset)); return messageExt; } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java index 717e86fc0..db268a06e 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java @@ -20,8 +20,11 @@ package org.apache.rocketmq.proxy.processor; import com.google.common.collect.Sets; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -39,7 +42,10 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.common.ProxyExceptionCode; +import org.apache.rocketmq.proxy.common.utils.FutureUtils; import org.apache.rocketmq.proxy.common.utils.ProxyUtils; +import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; import org.apache.rocketmq.proxy.service.route.MessageQueueView; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -50,16 +56,22 @@ import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.stubbing.Answer; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerProcessorTest extends BaseProcessorTest { @@ -162,6 +174,109 @@ public class ConsumerProcessorTest extends BaseProcessorTest { assertEquals(handle.getReceiptHandle(), requestHeaderArgumentCaptor.getValue().getExtraInfo()); } + @Test + public void testBatchAckExpireMessage() throws Throwable { + String brokerName1 = "brokerName1"; + + List receiptHandleMessageList = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + MessageExt expireMessage = createMessageExt(TOPIC, "", 0, 3000, System.currentTimeMillis() - 10000, + 0, 0, 0, i, brokerName1); + ReceiptHandle expireHandle = create(expireMessage); + receiptHandleMessageList.add(new ReceiptHandleMessage(expireHandle, expireMessage.getMsgId())); + } + + List batchAckResultList = this.consumerProcessor.batchAckMessage(createContext(), receiptHandleMessageList, CONSUMER_GROUP, TOPIC, 3000).get(); + + verify(this.messageService, never()).batchAckMessage(any(), anyList(), anyString(), anyString(), anyLong()); + assertEquals(receiptHandleMessageList.size(), batchAckResultList.size()); + for (BatchAckResult batchAckResult : batchAckResultList) { + assertNull(batchAckResult.getAckResult()); + assertNotNull(batchAckResult.getProxyException()); + assertNotNull(batchAckResult.getReceiptHandleMessage()); + } + + } + + @Test + public void testBatchAckMessage() throws Throwable { + String brokerName1 = "brokerName1"; + String brokerName2 = "brokerName2"; + String errThrowBrokerName = "errThrowBrokerName"; + MessageExt expireMessage = createMessageExt(TOPIC, "", 0, 3000, System.currentTimeMillis() - 10000, + 0, 0, 0, 0, brokerName1); + ReceiptHandle expireHandle = create(expireMessage); + + List receiptHandleMessageList = new ArrayList<>(); + receiptHandleMessageList.add(new ReceiptHandleMessage(expireHandle, expireMessage.getMsgId())); + List broker1Msg = new ArrayList<>(); + List broker2Msg = new ArrayList<>(); + + long now = System.currentTimeMillis(); + int msgNum = 3; + for (int i = 0; i < msgNum; i++) { + MessageExt brokerMessage = createMessageExt(TOPIC, "", 0, 3000, now, + 0, 0, 0, i + 1, brokerName1); + ReceiptHandle brokerHandle = create(brokerMessage); + receiptHandleMessageList.add(new ReceiptHandleMessage(brokerHandle, brokerMessage.getMsgId())); + broker1Msg.add(brokerMessage.getMsgId()); + } + for (int i = 0; i < msgNum; i++) { + MessageExt brokerMessage = createMessageExt(TOPIC, "", 0, 3000, now, + 0, 0, 0, i + 1, brokerName2); + ReceiptHandle brokerHandle = create(brokerMessage); + receiptHandleMessageList.add(new ReceiptHandleMessage(brokerHandle, brokerMessage.getMsgId())); + broker2Msg.add(brokerMessage.getMsgId()); + } + + // for this message, will throw exception in batchAckMessage + MessageExt errThrowMessage = createMessageExt(TOPIC, "", 0, 3000, now, + 0, 0, 0, 0, errThrowBrokerName); + ReceiptHandle errThrowHandle = create(errThrowMessage); + receiptHandleMessageList.add(new ReceiptHandleMessage(errThrowHandle, errThrowMessage.getMsgId())); + + Collections.shuffle(receiptHandleMessageList); + + doAnswer((Answer>) invocation -> { + List handleMessageList = invocation.getArgument(1, List.class); + AckResult ackResult = new AckResult(); + String brokerName = handleMessageList.get(0).getReceiptHandle().getBrokerName(); + if (brokerName.equals(brokerName1)) { + ackResult.setStatus(AckStatus.OK); + } else if (brokerName.equals(brokerName2)) { + ackResult.setStatus(AckStatus.NO_EXIST); + } else { + return FutureUtils.completeExceptionally(new RuntimeException()); + } + + return CompletableFuture.completedFuture(ackResult); + }).when(this.messageService).batchAckMessage(any(), anyList(), anyString(), anyString(), anyLong()); + + List batchAckResultList = this.consumerProcessor.batchAckMessage(createContext(), receiptHandleMessageList, CONSUMER_GROUP, TOPIC, 3000).get(); + assertEquals(receiptHandleMessageList.size(), batchAckResultList.size()); + + // check ackResult for each msg + Map msgBatchAckResult = new HashMap<>(); + for (BatchAckResult batchAckResult : batchAckResultList) { + msgBatchAckResult.put(batchAckResult.getReceiptHandleMessage().getMessageId(), batchAckResult); + } + for (String msgId : broker1Msg) { + assertEquals(AckStatus.OK, msgBatchAckResult.get(msgId).getAckResult().getStatus()); + assertNull(msgBatchAckResult.get(msgId).getProxyException()); + } + for (String msgId : broker2Msg) { + assertEquals(AckStatus.NO_EXIST, msgBatchAckResult.get(msgId).getAckResult().getStatus()); + assertNull(msgBatchAckResult.get(msgId).getProxyException()); + } + assertNotNull(msgBatchAckResult.get(expireMessage.getMsgId()).getProxyException()); + assertEquals(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, msgBatchAckResult.get(expireMessage.getMsgId()).getProxyException().getCode()); + assertNull(msgBatchAckResult.get(expireMessage.getMsgId()).getAckResult()); + + assertNotNull(msgBatchAckResult.get(errThrowMessage.getMsgId()).getProxyException()); + assertEquals(ProxyExceptionCode.INTERNAL_SERVER_ERROR, msgBatchAckResult.get(errThrowMessage.getMsgId()).getProxyException().getCode()); + assertNull(msgBatchAckResult.get(errThrowMessage.getMsgId()).getAckResult()); + } + @Test public void testChangeInvisibleTime() throws Throwable { ReceiptHandle handle = create(createMessageExt(MixAll.RETRY_GROUP_TOPIC_PREFIX + TOPIC, "", 0, 3000)); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java index 77a119a29..3f3a4ae40 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java @@ -220,6 +220,18 @@ public class MQClientAPIExtTest { assertSame(ackResult, mqClientAPI.ackMessageAsync(BROKER_ADDR, new AckMessageRequestHeader(), TIMEOUT).get()); } + @Test + public void testBatchAckMessageAsync() throws Exception { + AckResult ackResult = new AckResult(); + doAnswer((Answer) mock -> { + AckCallback ackCallback = mock.getArgument(2); + ackCallback.onSuccess(ackResult); + return null; + }).when(mqClientAPI).batchAckMessageAsync(anyString(), anyLong(), any(AckCallback.class), any()); + + assertSame(ackResult, mqClientAPI.batchAckMessageAsync(BROKER_ADDR, TOPIC, CONSUMER_GROUP, new ArrayList<>(), TIMEOUT).get()); + } + @Test public void testChangeInvisibleTimeAsync() throws Exception { AckResult ackResult = new AckResult(); -- 2.32.0.windows.2