!42 backport Retry topic v2 in pop
From: @zhiliatox Reviewed-by: @hu-zongtang Signed-off-by: @hu-zongtang
This commit is contained in:
commit
ffa9e6a51d
845
patch037-backport-Retry-topic-v2-in-pop.patch
Normal file
845
patch037-backport-Retry-topic-v2-in-pop.patch
Normal file
@ -0,0 +1,845 @@
|
||||
From ca721b0145994d7f5e67b4d2fe3b7a4ad7a1c132 Mon Sep 17 00:00:00 2001
|
||||
From: zhanghong <985492783@qq.com>
|
||||
Date: Tue, 21 Nov 2023 14:03:24 +0800
|
||||
Subject: [PATCH 1/3] [ISSUE #7462] Remove deprecated LocalTransactionExecuter
|
||||
(#7463)
|
||||
|
||||
---
|
||||
.../impl/producer/DefaultMQProducerImpl.java | 9 +++----
|
||||
.../client/producer/DefaultMQProducer.java | 16 -----------
|
||||
.../producer/LocalTransactionExecuter.java | 27 -------------------
|
||||
.../rocketmq/client/producer/MQProducer.java | 3 ---
|
||||
.../producer/TransactionMQProducer.java | 16 -----------
|
||||
.../client.producer.DefaultMQProducer.schema | 1 -
|
||||
6 files changed, 4 insertions(+), 68 deletions(-)
|
||||
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
|
||||
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
||||
index b0c212e46..545f17d93 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
||||
@@ -54,7 +54,6 @@ import org.apache.rocketmq.client.latency.MQFaultStrategy;
|
||||
import org.apache.rocketmq.client.latency.Resolver;
|
||||
import org.apache.rocketmq.client.latency.ServiceDetector;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
-import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
|
||||
import org.apache.rocketmq.client.producer.LocalTransactionState;
|
||||
import org.apache.rocketmq.client.producer.MessageQueueSelector;
|
||||
import org.apache.rocketmq.client.producer.RequestCallback;
|
||||
@@ -1379,10 +1378,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
||||
}
|
||||
|
||||
public TransactionSendResult sendMessageInTransaction(final Message msg,
|
||||
- final LocalTransactionExecuter localTransactionExecuter, final Object arg)
|
||||
+ final TransactionListener localTransactionListener, final Object arg)
|
||||
throws MQClientException {
|
||||
TransactionListener transactionListener = getCheckListener();
|
||||
- if (null == localTransactionExecuter && null == transactionListener) {
|
||||
+ if (null == localTransactionListener && null == transactionListener) {
|
||||
throw new MQClientException("tranExecutor is null", null);
|
||||
}
|
||||
|
||||
@@ -1414,8 +1413,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
||||
if (null != transactionId && !"".equals(transactionId)) {
|
||||
msg.setTransactionId(transactionId);
|
||||
}
|
||||
- if (null != localTransactionExecuter) {
|
||||
- localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
|
||||
+ if (null != localTransactionListener) {
|
||||
+ localTransactionState = localTransactionListener.executeLocalTransaction(msg, arg);
|
||||
} else {
|
||||
log.debug("Used new transaction API");
|
||||
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
|
||||
index c5b1b5223..7bd3876f5 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
|
||||
@@ -853,22 +853,6 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
|
||||
this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
|
||||
}
|
||||
|
||||
- /**
|
||||
- * This method is to send transactional messages.
|
||||
- *
|
||||
- * @param msg Transactional message to send.
|
||||
- * @param tranExecuter local transaction executor.
|
||||
- * @param arg Argument used along with local transaction executor.
|
||||
- * @return Transaction result.
|
||||
- * @throws MQClientException if there is any client error.
|
||||
- */
|
||||
- @Override
|
||||
- public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter,
|
||||
- final Object arg)
|
||||
- throws MQClientException {
|
||||
- throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
|
||||
- }
|
||||
-
|
||||
/**
|
||||
* This method is used to send transactional messages.
|
||||
*
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
|
||||
deleted file mode 100644
|
||||
index 267ba10bd..000000000
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
|
||||
+++ /dev/null
|
||||
@@ -1,27 +0,0 @@
|
||||
-/*
|
||||
- * Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
- * contributor license agreements. See the NOTICE file distributed with
|
||||
- * this work for additional information regarding copyright ownership.
|
||||
- * The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
- * (the "License"); you may not use this file except in compliance with
|
||||
- * the License. You may obtain a copy of the License at
|
||||
- *
|
||||
- * http://www.apache.org/licenses/LICENSE-2.0
|
||||
- *
|
||||
- * Unless required by applicable law or agreed to in writing, software
|
||||
- * distributed under the License is distributed on an "AS IS" BASIS,
|
||||
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
- * See the License for the specific language governing permissions and
|
||||
- * limitations under the License.
|
||||
- */
|
||||
-package org.apache.rocketmq.client.producer;
|
||||
-
|
||||
-import org.apache.rocketmq.common.message.Message;
|
||||
-
|
||||
-/**
|
||||
- * @deprecated This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended.
|
||||
- */
|
||||
-@Deprecated
|
||||
-public interface LocalTransactionExecuter {
|
||||
- LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
|
||||
-}
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
|
||||
index 78657e623..8bd30e98d 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
|
||||
@@ -81,9 +81,6 @@ public interface MQProducer extends MQAdmin {
|
||||
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
|
||||
throws MQClientException, RemotingException, InterruptedException;
|
||||
|
||||
- TransactionSendResult sendMessageInTransaction(final Message msg,
|
||||
- final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
|
||||
-
|
||||
TransactionSendResult sendMessageInTransaction(final Message msg,
|
||||
final Object arg) throws MQClientException;
|
||||
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
|
||||
index baa8b4408..d529f3e77 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
|
||||
@@ -67,22 +67,6 @@ public class TransactionMQProducer extends DefaultMQProducer {
|
||||
this.defaultMQProducerImpl.destroyTransactionEnv();
|
||||
}
|
||||
|
||||
- /**
|
||||
- * This method will be removed in the version 5.0.0, method <code>sendMessageInTransaction(Message,Object)</code>}
|
||||
- * is recommended.
|
||||
- */
|
||||
- @Override
|
||||
- @Deprecated
|
||||
- public TransactionSendResult sendMessageInTransaction(final Message msg,
|
||||
- final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
|
||||
- if (null == this.transactionCheckListener) {
|
||||
- throw new MQClientException("localTransactionBranchCheckListener is null", null);
|
||||
- }
|
||||
-
|
||||
- msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
|
||||
- return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
|
||||
- }
|
||||
-
|
||||
@Override
|
||||
public TransactionSendResult sendMessageInTransaction(final Message msg,
|
||||
final Object arg) throws MQClientException {
|
||||
diff --git a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema
|
||||
index 0418c73fe..d1111fb45 100644
|
||||
--- a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema
|
||||
+++ b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema
|
||||
@@ -122,7 +122,6 @@ Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq
|
||||
Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (void)
|
||||
Method send(org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.SendResult)
|
||||
Method send(org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (org.apache.rocketmq.client.producer.SendResult)
|
||||
-Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.client.producer.LocalTransactionExecuter,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult)
|
||||
Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult)
|
||||
Method sendOneway(java.lang.Object,org.apache.rocketmq.client.producer.MessageQueueSelector,org.apache.rocketmq.common.message.Message) : public throws (void)
|
||||
Method sendOneway(org.apache.rocketmq.common.message.Message) : public throws (void)
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From a7d493b2fbc153cc6cbdf2b2ffcbf19cf7cba803 Mon Sep 17 00:00:00 2001
|
||||
From: panzhi <panzhi33@qq.com>
|
||||
Date: Tue, 21 Nov 2023 20:55:35 +0800
|
||||
Subject: [PATCH 2/3] transactionProducer get the topic route before sending
|
||||
the message (#7569)
|
||||
|
||||
---
|
||||
.../impl/producer/DefaultMQProducerImpl.java | 15 +++++
|
||||
.../client/producer/DefaultMQProducer.java | 63 +++++++++++++++++++
|
||||
.../producer/TransactionMQProducer.java | 23 +++++--
|
||||
.../transaction/TransactionProducer.java | 3 +-
|
||||
4 files changed, 98 insertions(+), 6 deletions(-)
|
||||
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
||||
index 545f17d93..088bff089 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
||||
@@ -262,6 +262,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
||||
mQClientFactory.start();
|
||||
}
|
||||
|
||||
+ this.initTopicRoute();
|
||||
+
|
||||
this.mqFaultStrategy.startDetector();
|
||||
|
||||
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
|
||||
@@ -1740,6 +1742,19 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
||||
}
|
||||
}
|
||||
|
||||
+ private void initTopicRoute() {
|
||||
+ List<String> topics = this.defaultMQProducer.getTopics();
|
||||
+ if (topics != null && topics.size() > 0) {
|
||||
+ topics.forEach(topic -> {
|
||||
+ String newTopic = NamespaceUtil.wrapNamespace(this.defaultMQProducer.getNamespace(), topic);
|
||||
+ TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(newTopic);
|
||||
+ if (topicPublishInfo == null || !topicPublishInfo.ok()) {
|
||||
+ log.warn("No route info of this topic: " + newTopic + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO));
|
||||
+ }
|
||||
+ });
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
|
||||
return topicPublishInfoTable;
|
||||
}
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
|
||||
index 7bd3876f5..700e00aac 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
|
||||
@@ -87,6 +87,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
|
||||
*/
|
||||
private String producerGroup;
|
||||
|
||||
+ /**
|
||||
+ * Topics that need to be initialized for transaction producer
|
||||
+ */
|
||||
+ private List<String> topics;
|
||||
+
|
||||
/**
|
||||
* Just for testing or demo program
|
||||
*/
|
||||
@@ -235,6 +240,22 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
|
||||
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
|
||||
}
|
||||
|
||||
+ /**
|
||||
+ * Constructor specifying namespace, producer group, topics and RPC hook.
|
||||
+ *
|
||||
+ * @param namespace Namespace for this MQ Producer instance.
|
||||
+ * @param producerGroup Producer group, see the name-sake field.
|
||||
+ * @param topics Topic that needs to be initialized for routing
|
||||
+ * @param rpcHook RPC hook to execute per each remoting command execution.
|
||||
+ */
|
||||
+ public DefaultMQProducer(final String namespace, final String producerGroup, final List<String> topics, RPCHook rpcHook) {
|
||||
+ this.namespace = namespace;
|
||||
+ this.producerGroup = producerGroup;
|
||||
+ this.topics = topics;
|
||||
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
|
||||
+ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
|
||||
+ }
|
||||
+
|
||||
/**
|
||||
* Constructor specifying producer group and enabled msg trace flag.
|
||||
*
|
||||
@@ -290,6 +311,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
|
||||
}
|
||||
}
|
||||
|
||||
+ /**
|
||||
+ * Constructor specifying namespace, producer group, topics, RPC hook, enabled msgTrace flag and customized trace topic
|
||||
+ * name.
|
||||
+ *
|
||||
+ * @param namespace Namespace for this MQ Producer instance.
|
||||
+ * @param producerGroup Producer group, see the name-sake field.
|
||||
+ * @param topics Topic that needs to be initialized for routing
|
||||
+ * @param rpcHook RPC hook to execute per each remoting command execution.
|
||||
+ * @param enableMsgTrace Switch flag instance for message trace.
|
||||
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
|
||||
+ * trace topic name.
|
||||
+ */
|
||||
+ public DefaultMQProducer(final String namespace, final String producerGroup, final List<String> topics,
|
||||
+ RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
|
||||
+ this.namespace = namespace;
|
||||
+ this.producerGroup = producerGroup;
|
||||
+ this.topics = topics;
|
||||
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
|
||||
+ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
|
||||
+ //if client open the message trace feature
|
||||
+ if (enableMsgTrace) {
|
||||
+ try {
|
||||
+ AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
|
||||
+ dispatcher.setHostProducer(this.defaultMQProducerImpl);
|
||||
+ traceDispatcher = dispatcher;
|
||||
+ this.defaultMQProducerImpl.registerSendMessageHook(
|
||||
+ new SendMessageTraceHookImpl(traceDispatcher));
|
||||
+ this.defaultMQProducerImpl.registerEndTransactionHook(
|
||||
+ new EndTransactionTraceHookImpl(traceDispatcher));
|
||||
+ } catch (Throwable e) {
|
||||
+ logger.error("system mqtrace hook init failed ,maybe can't send msg trace data");
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
@Override
|
||||
public void setUseTLS(boolean useTLS) {
|
||||
super.setUseTLS(useTLS);
|
||||
@@ -1316,4 +1372,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
|
||||
defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize);
|
||||
}
|
||||
|
||||
+ public List<String> getTopics() {
|
||||
+ return topics;
|
||||
+ }
|
||||
+
|
||||
+ public void setTopics(List<String> topics) {
|
||||
+ this.topics = topics;
|
||||
+ }
|
||||
}
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
|
||||
index d529f3e77..2c3b479f7 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
|
||||
@@ -16,6 +16,7 @@
|
||||
*/
|
||||
package org.apache.rocketmq.client.producer;
|
||||
|
||||
+import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
@@ -36,19 +37,31 @@ public class TransactionMQProducer extends DefaultMQProducer {
|
||||
}
|
||||
|
||||
public TransactionMQProducer(final String producerGroup) {
|
||||
- this(null, producerGroup, null);
|
||||
+ this(null, producerGroup, null, null);
|
||||
+ }
|
||||
+
|
||||
+ public TransactionMQProducer(final String producerGroup, final List<String> topics) {
|
||||
+ this(null, producerGroup, topics, null);
|
||||
}
|
||||
|
||||
public TransactionMQProducer(final String namespace, final String producerGroup) {
|
||||
- this(namespace, producerGroup, null);
|
||||
+ this(namespace, producerGroup, null, null);
|
||||
+ }
|
||||
+
|
||||
+ public TransactionMQProducer(final String namespace, final String producerGroup, final List<String> topics) {
|
||||
+ this(namespace, producerGroup, topics, null);
|
||||
}
|
||||
|
||||
public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
|
||||
- this(null, producerGroup, rpcHook);
|
||||
+ this(null, producerGroup, null, rpcHook);
|
||||
+ }
|
||||
+
|
||||
+ public TransactionMQProducer(final String producerGroup, final List<String> topics, RPCHook rpcHook) {
|
||||
+ this(null, producerGroup, topics, rpcHook);
|
||||
}
|
||||
|
||||
- public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
|
||||
- super(namespace, producerGroup, rpcHook);
|
||||
+ public TransactionMQProducer(final String namespace, final String producerGroup, final List<String> topics, RPCHook rpcHook) {
|
||||
+ super(namespace, producerGroup, topics, rpcHook);
|
||||
}
|
||||
|
||||
public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
|
||||
diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
|
||||
index 5973c3c30..d1d57c55e 100644
|
||||
--- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
|
||||
+++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
|
||||
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.remoting.common.RemotingHelper;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
+import java.util.Arrays;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
@@ -39,7 +40,7 @@ public class TransactionProducer {
|
||||
|
||||
public static void main(String[] args) throws MQClientException, InterruptedException {
|
||||
TransactionListener transactionListener = new TransactionListenerImpl();
|
||||
- TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
|
||||
+ TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, Arrays.asList(TOPIC));
|
||||
|
||||
// Uncomment the following line while debugging, namesrvAddr should be set to your local address
|
||||
// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 5b43387be33506e4c19df4783724d06b1dfdc062 Mon Sep 17 00:00:00 2001
|
||||
From: Zhouxiang Zhan <zhouxzhan@apache.org>
|
||||
Date: Thu, 23 Nov 2023 14:53:48 +0800
|
||||
Subject: [PATCH 3/3] [ISSUE #7543] Retry topic v2 in pop (#7544)
|
||||
|
||||
* Implement pop retry topic v2
|
||||
|
||||
* Use pop retry topic v2 to notify the origin topic
|
||||
|
||||
* add parse group
|
||||
|
||||
* retry topic v2 compatibility
|
||||
|
||||
* calculate consumer lag
|
||||
|
||||
* delete retry topic
|
||||
---
|
||||
.../acl/plain/PlainAccessResource.java | 3 +-
|
||||
.../ExpressionForRetryMessageFilter.java | 3 +-
|
||||
.../NotifyMessageArrivingListener.java | 3 +-
|
||||
.../longpolling/PopLongPollingService.java | 10 +++
|
||||
.../broker/metrics/ConsumerLagCalculator.java | 11 ++++
|
||||
.../processor/AdminBrokerProcessor.java | 4 ++
|
||||
.../processor/NotificationProcessor.java | 2 +-
|
||||
.../broker/processor/PopMessageProcessor.java | 24 ++++++-
|
||||
.../broker/processor/PopReviveService.java | 9 ---
|
||||
.../processor/SendMessageProcessor.java | 3 +-
|
||||
.../apache/rocketmq/common/BrokerConfig.java | 10 +++
|
||||
.../apache/rocketmq/common/KeyBuilder.java | 37 +++++++++--
|
||||
.../rocketmq/common/KeyBuilderTest.java | 65 +++++++++++++++++++
|
||||
.../consumer/ConsumerProgressSubCommand.java | 3 +-
|
||||
.../tools/monitor/MonitorService.java | 3 +-
|
||||
15 files changed, 168 insertions(+), 22 deletions(-)
|
||||
create mode 100644 common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
|
||||
|
||||
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
|
||||
index 72aa8ca71..1e185afff 100644
|
||||
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
|
||||
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
|
||||
@@ -48,6 +48,7 @@ import org.apache.rocketmq.acl.common.AuthenticationHeader;
|
||||
import org.apache.rocketmq.acl.common.AuthorizationHeader;
|
||||
import org.apache.rocketmq.acl.common.Permission;
|
||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||
+import org.apache.rocketmq.common.KeyBuilder;
|
||||
import org.apache.rocketmq.common.MQVersion;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.PlainAccessConfig;
|
||||
@@ -341,7 +342,7 @@ public class PlainAccessResource implements AccessResource {
|
||||
if (retryTopic == null) {
|
||||
return null;
|
||||
}
|
||||
- return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
|
||||
+ return KeyBuilder.parseGroup(retryTopic);
|
||||
}
|
||||
|
||||
public static String getRetryTopic(String group) {
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
|
||||
index bc01b21cb..cc3e37bf4 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
|
||||
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.filter;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
+import org.apache.rocketmq.common.KeyBuilder;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.filter.ExpressionType;
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
@@ -62,7 +63,7 @@ public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter {
|
||||
tempProperties = MessageDecoder.decodeProperties(msgBuffer);
|
||||
}
|
||||
String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC);
|
||||
- String group = subscriptionData.getTopic().substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
|
||||
+ String group = KeyBuilder.parseGroup(subscriptionData.getTopic());
|
||||
realFilterData = this.consumerFilterManager.get(realTopic, group);
|
||||
}
|
||||
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
|
||||
index 3c099fe2f..e55ed2778 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
|
||||
@@ -17,12 +17,11 @@
|
||||
|
||||
package org.apache.rocketmq.broker.longpolling;
|
||||
|
||||
+import java.util.Map;
|
||||
import org.apache.rocketmq.broker.processor.NotificationProcessor;
|
||||
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
|
||||
import org.apache.rocketmq.store.MessageArrivingListener;
|
||||
|
||||
-import java.util.Map;
|
||||
-
|
||||
public class NotifyMessageArrivingListener implements MessageArrivingListener {
|
||||
private final PullRequestHoldService pullRequestHoldService;
|
||||
private final PopMessageProcessor popMessageProcessor;
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
|
||||
index 113c91297..f1bc9adc4 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
|
||||
@@ -144,6 +144,16 @@ public class PopLongPollingService extends ServiceThread {
|
||||
}
|
||||
}
|
||||
|
||||
+ public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId) {
|
||||
+ String notifyTopic;
|
||||
+ if (KeyBuilder.isPopRetryTopicV2(topic)) {
|
||||
+ notifyTopic = KeyBuilder.parseNormalTopic(topic);
|
||||
+ } else {
|
||||
+ notifyTopic = topic;
|
||||
+ }
|
||||
+ notifyMessageArriving(notifyTopic, queueId);
|
||||
+ }
|
||||
+
|
||||
public void notifyMessageArriving(final String topic, final int queueId) {
|
||||
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
|
||||
if (cids == null) {
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
|
||||
index af08a83c7..d1f3fffde 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
|
||||
@@ -185,6 +185,17 @@ public class ConsumerLagCalculator {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
+ if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
|
||||
+ String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
|
||||
+ TopicConfig retryTopicConfigV1 = topicConfigManager.selectTopicConfig(retryTopicV1);
|
||||
+ if (retryTopicConfigV1 != null) {
|
||||
+ int retryTopicPerm = retryTopicConfigV1.getPerm() & brokerConfig.getBrokerPermission();
|
||||
+ if (PermName.isReadable(retryTopicPerm) || PermName.isWriteable(retryTopicPerm)) {
|
||||
+ consumer.accept(new ProcessGroupInfo(group, topic, true, retryTopicV1));
|
||||
+ continue;
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
consumer.accept(new ProcessGroupInfo(group, topic, true, null));
|
||||
} else {
|
||||
consumer.accept(new ProcessGroupInfo(group, topic, false, null));
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||||
index fbba6633b..863b275d1 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||||
@@ -548,6 +548,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
||||
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) {
|
||||
deleteTopicInBroker(popRetryTopic);
|
||||
}
|
||||
+ final String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
|
||||
+ if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != null) {
|
||||
+ deleteTopicInBroker(popRetryTopicV1);
|
||||
+ }
|
||||
}
|
||||
// delete topic
|
||||
deleteTopicInBroker(topic);
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
|
||||
index a15340383..91d275dfe 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
|
||||
@@ -58,7 +58,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
|
||||
}
|
||||
|
||||
public void notifyMessageArriving(final String topic, final int queueId) {
|
||||
- popLongPollingService.notifyMessageArriving(topic, queueId);
|
||||
+ popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId);
|
||||
}
|
||||
|
||||
@Override
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
|
||||
index 7ed4d53ab..58baecc05 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
|
||||
@@ -185,7 +185,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
|
||||
}
|
||||
|
||||
public void notifyMessageArriving(final String topic, final int queueId) {
|
||||
- popLongPollingService.notifyMessageArriving(topic, queueId);
|
||||
+ popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId);
|
||||
}
|
||||
|
||||
public boolean notifyMessageArriving(final String topic, final String cid, final int queueId) {
|
||||
@@ -364,6 +364,17 @@ public class PopMessageProcessor implements NettyRequestProcessor {
|
||||
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
|
||||
}
|
||||
}
|
||||
+ if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
|
||||
+ TopicConfig retryTopicConfigV1 =
|
||||
+ this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
|
||||
+ if (retryTopicConfigV1 != null) {
|
||||
+ for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) {
|
||||
+ int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums();
|
||||
+ getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
|
||||
+ startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
}
|
||||
if (requestHeader.getQueueId() < 0) {
|
||||
// read all queue
|
||||
@@ -388,6 +399,17 @@ public class PopMessageProcessor implements NettyRequestProcessor {
|
||||
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
|
||||
}
|
||||
}
|
||||
+ if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
|
||||
+ TopicConfig retryTopicConfigV1 =
|
||||
+ this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
|
||||
+ if (retryTopicConfigV1 != null) {
|
||||
+ for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) {
|
||||
+ int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums();
|
||||
+ getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
|
||||
+ startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
}
|
||||
|
||||
final RemotingCommand finalResponse = response;
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
|
||||
index 3fb689ed6..8d25bc57e 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
|
||||
@@ -142,15 +142,6 @@ public class PopReviveService extends ServiceThread {
|
||||
this.brokerController.getBrokerStatsManager().incBrokerPutNums(popCheckPoint.getTopic(), 1);
|
||||
this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
|
||||
this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
|
||||
- if (brokerController.getPopMessageProcessor() != null) {
|
||||
- brokerController.getPopMessageProcessor().notifyMessageArriving(
|
||||
- KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
|
||||
- popCheckPoint.getCId(),
|
||||
- -1
|
||||
- );
|
||||
- brokerController.getNotificationProcessor().notifyMessageArriving(
|
||||
- KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);
|
||||
- }
|
||||
return true;
|
||||
}
|
||||
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
|
||||
index 956ef43fb..4ec84c146 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
|
||||
@@ -28,6 +28,7 @@ import org.apache.rocketmq.broker.BrokerController;
|
||||
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
|
||||
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
|
||||
import org.apache.rocketmq.common.AbortProcessException;
|
||||
+import org.apache.rocketmq.common.KeyBuilder;
|
||||
import org.apache.rocketmq.common.MQVersion;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.TopicConfig;
|
||||
@@ -178,7 +179,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
|
||||
MessageExt msg, TopicConfig topicConfig, Map<String, String> properties) {
|
||||
String newTopic = requestHeader.getTopic();
|
||||
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
|
||||
- String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
|
||||
+ String groupName = KeyBuilder.parseGroup(newTopic);
|
||||
SubscriptionGroupConfig subscriptionGroupConfig =
|
||||
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
|
||||
if (null == subscriptionGroupConfig) {
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||||
index 0d248c4e1..c186352d1 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||||
@@ -223,6 +223,8 @@ public class BrokerConfig extends BrokerIdentity {
|
||||
private boolean enablePopBatchAck = false;
|
||||
private boolean enableNotifyAfterPopOrderLockRelease = true;
|
||||
private boolean initPopOffsetByCheckMsgInMem = true;
|
||||
+ // read message from pop retry topic v1, for the compatibility, will be removed in the future version
|
||||
+ private boolean retrieveMessageFromPopRetryTopicV1 = true;
|
||||
|
||||
private boolean realTimeNotifyConsumerChange = true;
|
||||
|
||||
@@ -1284,6 +1286,14 @@ public class BrokerConfig extends BrokerIdentity {
|
||||
this.initPopOffsetByCheckMsgInMem = initPopOffsetByCheckMsgInMem;
|
||||
}
|
||||
|
||||
+ public boolean isRetrieveMessageFromPopRetryTopicV1() {
|
||||
+ return retrieveMessageFromPopRetryTopicV1;
|
||||
+ }
|
||||
+
|
||||
+ public void setRetrieveMessageFromPopRetryTopicV1(boolean retrieveMessageFromPopRetryTopicV1) {
|
||||
+ this.retrieveMessageFromPopRetryTopicV1 = retrieveMessageFromPopRetryTopicV1;
|
||||
+ }
|
||||
+
|
||||
public boolean isRealTimeNotifyConsumerChange() {
|
||||
return realTimeNotifyConsumerChange;
|
||||
}
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
|
||||
index e1532d939..f2a8c4089 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
|
||||
@@ -18,24 +18,53 @@ package org.apache.rocketmq.common;
|
||||
|
||||
public class KeyBuilder {
|
||||
public static final int POP_ORDER_REVIVE_QUEUE = 999;
|
||||
+ private static final String POP_RETRY_SEPARATOR_V1 = "_";
|
||||
+ private static final String POP_RETRY_SEPARATOR_V2 = ":";
|
||||
|
||||
public static String buildPopRetryTopic(String topic, String cid) {
|
||||
- return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_" + topic;
|
||||
+ return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2 + topic;
|
||||
+ }
|
||||
+
|
||||
+ public static String buildPopRetryTopicV1(String topic, String cid) {
|
||||
+ return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V1 + topic;
|
||||
}
|
||||
|
||||
public static String parseNormalTopic(String topic, String cid) {
|
||||
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
|
||||
- return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_").length());
|
||||
+ if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2)) {
|
||||
+ return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2).length());
|
||||
+ }
|
||||
+ return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V1).length());
|
||||
} else {
|
||||
return topic;
|
||||
}
|
||||
}
|
||||
|
||||
+ public static String parseNormalTopic(String retryTopic) {
|
||||
+ if (isPopRetryTopicV2(retryTopic)) {
|
||||
+ String[] result = retryTopic.split(POP_RETRY_SEPARATOR_V2);
|
||||
+ if (result.length == 2) {
|
||||
+ return result[1];
|
||||
+ }
|
||||
+ }
|
||||
+ return retryTopic;
|
||||
+ }
|
||||
+
|
||||
+ public static String parseGroup(String retryTopic) {
|
||||
+ if (isPopRetryTopicV2(retryTopic)) {
|
||||
+ String[] result = retryTopic.split(POP_RETRY_SEPARATOR_V2);
|
||||
+ if (result.length == 2) {
|
||||
+ return result[0].substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
|
||||
+ }
|
||||
+ }
|
||||
+ return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
|
||||
+ }
|
||||
+
|
||||
public static String buildPollingKey(String topic, String cid, int queueId) {
|
||||
return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT + queueId;
|
||||
}
|
||||
|
||||
- public static String buildPollingNotificationKey(String topic, int queueId) {
|
||||
- return topic + PopAckConstants.SPLIT + queueId;
|
||||
+ public static boolean isPopRetryTopicV2(String retryTopic) {
|
||||
+ return retryTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && retryTopic.contains(POP_RETRY_SEPARATOR_V2);
|
||||
}
|
||||
}
|
||||
diff --git a/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
|
||||
new file mode 100644
|
||||
index 000000000..f83e0aa14
|
||||
--- /dev/null
|
||||
+++ b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
|
||||
@@ -0,0 +1,65 @@
|
||||
+/*
|
||||
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
+ * contributor license agreements. See the NOTICE file distributed with
|
||||
+ * this work for additional information regarding copyright ownership.
|
||||
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
+ * (the "License"); you may not use this file except in compliance with
|
||||
+ * the License. You may obtain a copy of the License at
|
||||
+ *
|
||||
+ * http://www.apache.org/licenses/LICENSE-2.0
|
||||
+ *
|
||||
+ * Unless required by applicable law or agreed to in writing, software
|
||||
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
||||
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
+ * See the License for the specific language governing permissions and
|
||||
+ * limitations under the License.
|
||||
+ */
|
||||
+
|
||||
+package org.apache.rocketmq.common;
|
||||
+
|
||||
+import org.junit.Test;
|
||||
+
|
||||
+import static org.assertj.core.api.Assertions.assertThat;
|
||||
+
|
||||
+public class KeyBuilderTest {
|
||||
+ String topic = "test-topic";
|
||||
+ String group = "test-group";
|
||||
+
|
||||
+ @Test
|
||||
+ public void buildPopRetryTopic() {
|
||||
+ assertThat(KeyBuilder.buildPopRetryTopic(topic, group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + ":" + topic);
|
||||
+ }
|
||||
+
|
||||
+ @Test
|
||||
+ public void buildPopRetryTopicV1() {
|
||||
+ assertThat(KeyBuilder.buildPopRetryTopicV1(topic, group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + "_" + topic);
|
||||
+ }
|
||||
+
|
||||
+ @Test
|
||||
+ public void parseNormalTopic() {
|
||||
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
|
||||
+ assertThat(KeyBuilder.parseNormalTopic(popRetryTopic, group)).isEqualTo(topic);
|
||||
+ String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
|
||||
+ assertThat(KeyBuilder.parseNormalTopic(popRetryTopicV1, group)).isEqualTo(topic);
|
||||
+ }
|
||||
+
|
||||
+ @Test
|
||||
+ public void testParseNormalTopic() {
|
||||
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
|
||||
+ assertThat(KeyBuilder.parseNormalTopic(popRetryTopic)).isEqualTo(topic);
|
||||
+ }
|
||||
+
|
||||
+ @Test
|
||||
+ public void parseGroup() {
|
||||
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
|
||||
+ assertThat(KeyBuilder.parseGroup(popRetryTopic)).isEqualTo(group);
|
||||
+ }
|
||||
+
|
||||
+ @Test
|
||||
+ public void isPopRetryTopicV2() {
|
||||
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
|
||||
+ assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopic)).isEqualTo(true);
|
||||
+ String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
|
||||
+ assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopicV1)).isEqualTo(false);
|
||||
+ }
|
||||
+}
|
||||
\ No newline at end of file
|
||||
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
|
||||
index 97125b854..c489cad68 100644
|
||||
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
|
||||
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
|
||||
@@ -25,6 +25,7 @@ import java.util.Map;
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.Option;
|
||||
import org.apache.commons.cli.Options;
|
||||
+import org.apache.rocketmq.common.KeyBuilder;
|
||||
import org.apache.rocketmq.common.MQVersion;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.UtilAll;
|
||||
@@ -212,7 +213,7 @@ public class ConsumerProgressSubCommand implements SubCommand {
|
||||
TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
|
||||
for (String topic : topicList.getTopicList()) {
|
||||
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
|
||||
- String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
|
||||
+ String consumerGroup = KeyBuilder.parseGroup(topic);
|
||||
try {
|
||||
ConsumeStats consumeStats = null;
|
||||
try {
|
||||
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
|
||||
index 45dc3a036..b66dfad20 100644
|
||||
--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
|
||||
+++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
|
||||
@@ -34,6 +34,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.client.exception.MQBrokerException;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
+import org.apache.rocketmq.common.KeyBuilder;
|
||||
import org.apache.rocketmq.common.MQVersion;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
||||
@@ -172,7 +173,7 @@ public class MonitorService {
|
||||
TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
|
||||
for (String topic : topicList.getTopicList()) {
|
||||
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
|
||||
- String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
|
||||
+ String consumerGroup = KeyBuilder.parseGroup(topic);
|
||||
|
||||
try {
|
||||
this.reportUndoneMsgs(consumerGroup);
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
Summary: Cloud-Native, Distributed Messaging and Streaming
|
||||
Name: rocketmq
|
||||
Version: 5.1.5
|
||||
Release: 37
|
||||
Release: 38
|
||||
License: Apache-2.0
|
||||
Group: Applications/Message
|
||||
URL: https://rocketmq.apache.org/
|
||||
@ -46,6 +46,7 @@ Patch0033: patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.pat
|
||||
Patch0034: patch034-backport-Let-consumer-be-aware-of-message-queue-assignment-change.patch
|
||||
Patch0035: patch035-backport-fix-some-bugs.patch
|
||||
Patch0036: patch036-backport-RIP65.patch
|
||||
Patch0037: patch037-backport-Retry-topic-v2-in-pop.patch
|
||||
BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git
|
||||
Requires: java-1.8.0-openjdk-devel
|
||||
|
||||
@ -86,6 +87,9 @@ exit 0
|
||||
|
||||
|
||||
%changelog
|
||||
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-38
|
||||
- backport Retry topic v2 in pop
|
||||
|
||||
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-37
|
||||
- backport rip 65
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user