rocketmq/patch037-backport-Retry-topic-v2-in-pop.patch
2023-12-11 15:30:10 +08:00

846 lines
45 KiB
Diff

From ca721b0145994d7f5e67b4d2fe3b7a4ad7a1c132 Mon Sep 17 00:00:00 2001
From: zhanghong <985492783@qq.com>
Date: Tue, 21 Nov 2023 14:03:24 +0800
Subject: [PATCH 1/3] [ISSUE #7462] Remove deprecated LocalTransactionExecuter
(#7463)
---
.../impl/producer/DefaultMQProducerImpl.java | 9 +++----
.../client/producer/DefaultMQProducer.java | 16 -----------
.../producer/LocalTransactionExecuter.java | 27 -------------------
.../rocketmq/client/producer/MQProducer.java | 3 ---
.../producer/TransactionMQProducer.java | 16 -----------
.../client.producer.DefaultMQProducer.schema | 1 -
6 files changed, 4 insertions(+), 68 deletions(-)
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index b0c212e46..545f17d93 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -54,7 +54,6 @@ import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.client.latency.Resolver;
import org.apache.rocketmq.client.latency.ServiceDetector;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.RequestCallback;
@@ -1379,10 +1378,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public TransactionSendResult sendMessageInTransaction(final Message msg,
- final LocalTransactionExecuter localTransactionExecuter, final Object arg)
+ final TransactionListener localTransactionListener, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
- if (null == localTransactionExecuter && null == transactionListener) {
+ if (null == localTransactionListener && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
@@ -1414,8 +1413,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
- if (null != localTransactionExecuter) {
- localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
+ if (null != localTransactionListener) {
+ localTransactionState = localTransactionListener.executeLocalTransaction(msg, arg);
} else {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index c5b1b5223..7bd3876f5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -853,22 +853,6 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
}
- /**
- * This method is to send transactional messages.
- *
- * @param msg Transactional message to send.
- * @param tranExecuter local transaction executor.
- * @param arg Argument used along with local transaction executor.
- * @return Transaction result.
- * @throws MQClientException if there is any client error.
- */
- @Override
- public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter,
- final Object arg)
- throws MQClientException {
- throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
- }
-
/**
* This method is used to send transactional messages.
*
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
deleted file mode 100644
index 267ba10bd..000000000
--- a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.producer;
-
-import org.apache.rocketmq.common.message.Message;
-
-/**
- * @deprecated This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended.
- */
-@Deprecated
-public interface LocalTransactionExecuter {
- LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index 78657e623..8bd30e98d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -81,9 +81,6 @@ public interface MQProducer extends MQAdmin {
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, InterruptedException;
- TransactionSendResult sendMessageInTransaction(final Message msg,
- final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
-
TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException;
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
index baa8b4408..d529f3e77 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
@@ -67,22 +67,6 @@ public class TransactionMQProducer extends DefaultMQProducer {
this.defaultMQProducerImpl.destroyTransactionEnv();
}
- /**
- * This method will be removed in the version 5.0.0, method <code>sendMessageInTransaction(Message,Object)</code>}
- * is recommended.
- */
- @Override
- @Deprecated
- public TransactionSendResult sendMessageInTransaction(final Message msg,
- final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
- if (null == this.transactionCheckListener) {
- throw new MQClientException("localTransactionBranchCheckListener is null", null);
- }
-
- msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
- return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
- }
-
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
diff --git a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema
index 0418c73fe..d1111fb45 100644
--- a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema
+++ b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema
@@ -122,7 +122,6 @@ Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq
Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (void)
Method send(org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.SendResult)
Method send(org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (org.apache.rocketmq.client.producer.SendResult)
-Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.client.producer.LocalTransactionExecuter,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult)
Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult)
Method sendOneway(java.lang.Object,org.apache.rocketmq.client.producer.MessageQueueSelector,org.apache.rocketmq.common.message.Message) : public throws (void)
Method sendOneway(org.apache.rocketmq.common.message.Message) : public throws (void)
--
2.32.0.windows.2
From a7d493b2fbc153cc6cbdf2b2ffcbf19cf7cba803 Mon Sep 17 00:00:00 2001
From: panzhi <panzhi33@qq.com>
Date: Tue, 21 Nov 2023 20:55:35 +0800
Subject: [PATCH 2/3] transactionProducer get the topic route before sending
the message (#7569)
---
.../impl/producer/DefaultMQProducerImpl.java | 15 +++++
.../client/producer/DefaultMQProducer.java | 63 +++++++++++++++++++
.../producer/TransactionMQProducer.java | 23 +++++--
.../transaction/TransactionProducer.java | 3 +-
4 files changed, 98 insertions(+), 6 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 545f17d93..088bff089 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -262,6 +262,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
mQClientFactory.start();
}
+ this.initTopicRoute();
+
this.mqFaultStrategy.startDetector();
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
@@ -1740,6 +1742,19 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
}
+ private void initTopicRoute() {
+ List<String> topics = this.defaultMQProducer.getTopics();
+ if (topics != null && topics.size() > 0) {
+ topics.forEach(topic -> {
+ String newTopic = NamespaceUtil.wrapNamespace(this.defaultMQProducer.getNamespace(), topic);
+ TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(newTopic);
+ if (topicPublishInfo == null || !topicPublishInfo.ok()) {
+ log.warn("No route info of this topic: " + newTopic + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO));
+ }
+ });
+ }
+ }
+
public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
return topicPublishInfoTable;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 7bd3876f5..700e00aac 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -87,6 +87,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
private String producerGroup;
+ /**
+ * Topics that need to be initialized for transaction producer
+ */
+ private List<String> topics;
+
/**
* Just for testing or demo program
*/
@@ -235,6 +240,22 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
+ /**
+ * Constructor specifying namespace, producer group, topics and RPC hook.
+ *
+ * @param namespace Namespace for this MQ Producer instance.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param topics Topic that needs to be initialized for routing
+ * @param rpcHook RPC hook to execute per each remoting command execution.
+ */
+ public DefaultMQProducer(final String namespace, final String producerGroup, final List<String> topics, RPCHook rpcHook) {
+ this.namespace = namespace;
+ this.producerGroup = producerGroup;
+ this.topics = topics;
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
+ }
+
/**
* Constructor specifying producer group and enabled msg trace flag.
*
@@ -290,6 +311,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
}
+ /**
+ * Constructor specifying namespace, producer group, topics, RPC hook, enabled msgTrace flag and customized trace topic
+ * name.
+ *
+ * @param namespace Namespace for this MQ Producer instance.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param topics Topic that needs to be initialized for routing
+ * @param rpcHook RPC hook to execute per each remoting command execution.
+ * @param enableMsgTrace Switch flag instance for message trace.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+ * trace topic name.
+ */
+ public DefaultMQProducer(final String namespace, final String producerGroup, final List<String> topics,
+ RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
+ this.namespace = namespace;
+ this.producerGroup = producerGroup;
+ this.topics = topics;
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
+ //if client open the message trace feature
+ if (enableMsgTrace) {
+ try {
+ AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
+ dispatcher.setHostProducer(this.defaultMQProducerImpl);
+ traceDispatcher = dispatcher;
+ this.defaultMQProducerImpl.registerSendMessageHook(
+ new SendMessageTraceHookImpl(traceDispatcher));
+ this.defaultMQProducerImpl.registerEndTransactionHook(
+ new EndTransactionTraceHookImpl(traceDispatcher));
+ } catch (Throwable e) {
+ logger.error("system mqtrace hook init failed ,maybe can't send msg trace data");
+ }
+ }
+ }
+
@Override
public void setUseTLS(boolean useTLS) {
super.setUseTLS(useTLS);
@@ -1316,4 +1372,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize);
}
+ public List<String> getTopics() {
+ return topics;
+ }
+
+ public void setTopics(List<String> topics) {
+ this.topics = topics;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
index d529f3e77..2c3b479f7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.producer;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
@@ -36,19 +37,31 @@ public class TransactionMQProducer extends DefaultMQProducer {
}
public TransactionMQProducer(final String producerGroup) {
- this(null, producerGroup, null);
+ this(null, producerGroup, null, null);
+ }
+
+ public TransactionMQProducer(final String producerGroup, final List<String> topics) {
+ this(null, producerGroup, topics, null);
}
public TransactionMQProducer(final String namespace, final String producerGroup) {
- this(namespace, producerGroup, null);
+ this(namespace, producerGroup, null, null);
+ }
+
+ public TransactionMQProducer(final String namespace, final String producerGroup, final List<String> topics) {
+ this(namespace, producerGroup, topics, null);
}
public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
- this(null, producerGroup, rpcHook);
+ this(null, producerGroup, null, rpcHook);
+ }
+
+ public TransactionMQProducer(final String producerGroup, final List<String> topics, RPCHook rpcHook) {
+ this(null, producerGroup, topics, rpcHook);
}
- public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
- super(namespace, producerGroup, rpcHook);
+ public TransactionMQProducer(final String namespace, final String producerGroup, final List<String> topics, RPCHook rpcHook) {
+ super(namespace, producerGroup, topics, rpcHook);
}
public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
index 5973c3c30..d1d57c55e 100644
--- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -39,7 +40,7 @@ public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
- TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
+ TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, Arrays.asList(TOPIC));
// Uncomment the following line while debugging, namesrvAddr should be set to your local address
// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
--
2.32.0.windows.2
From 5b43387be33506e4c19df4783724d06b1dfdc062 Mon Sep 17 00:00:00 2001
From: Zhouxiang Zhan <zhouxzhan@apache.org>
Date: Thu, 23 Nov 2023 14:53:48 +0800
Subject: [PATCH 3/3] [ISSUE #7543] Retry topic v2 in pop (#7544)
* Implement pop retry topic v2
* Use pop retry topic v2 to notify the origin topic
* add parse group
* retry topic v2 compatibility
* calculate consumer lag
* delete retry topic
---
.../acl/plain/PlainAccessResource.java | 3 +-
.../ExpressionForRetryMessageFilter.java | 3 +-
.../NotifyMessageArrivingListener.java | 3 +-
.../longpolling/PopLongPollingService.java | 10 +++
.../broker/metrics/ConsumerLagCalculator.java | 11 ++++
.../processor/AdminBrokerProcessor.java | 4 ++
.../processor/NotificationProcessor.java | 2 +-
.../broker/processor/PopMessageProcessor.java | 24 ++++++-
.../broker/processor/PopReviveService.java | 9 ---
.../processor/SendMessageProcessor.java | 3 +-
.../apache/rocketmq/common/BrokerConfig.java | 10 +++
.../apache/rocketmq/common/KeyBuilder.java | 37 +++++++++--
.../rocketmq/common/KeyBuilderTest.java | 65 +++++++++++++++++++
.../consumer/ConsumerProgressSubCommand.java | 3 +-
.../tools/monitor/MonitorService.java | 3 +-
15 files changed, 168 insertions(+), 22 deletions(-)
create mode 100644 common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
index 72aa8ca71..1e185afff 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
@@ -48,6 +48,7 @@ import org.apache.rocketmq.acl.common.AuthenticationHeader;
import org.apache.rocketmq.acl.common.AuthorizationHeader;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
@@ -341,7 +342,7 @@ public class PlainAccessResource implements AccessResource {
if (retryTopic == null) {
return null;
}
- return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ return KeyBuilder.parseGroup(retryTopic);
}
public static String getRetryTopic(String group) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
index bc01b21cb..cc3e37bf4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.filter;
import java.nio.ByteBuffer;
import java.util.Map;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageConst;
@@ -62,7 +63,7 @@ public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter {
tempProperties = MessageDecoder.decodeProperties(msgBuffer);
}
String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC);
- String group = subscriptionData.getTopic().substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ String group = KeyBuilder.parseGroup(subscriptionData.getTopic());
realFilterData = this.consumerFilterManager.get(realTopic, group);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
index 3c099fe2f..e55ed2778 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
@@ -17,12 +17,11 @@
package org.apache.rocketmq.broker.longpolling;
+import java.util.Map;
import org.apache.rocketmq.broker.processor.NotificationProcessor;
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
import org.apache.rocketmq.store.MessageArrivingListener;
-import java.util.Map;
-
public class NotifyMessageArrivingListener implements MessageArrivingListener {
private final PullRequestHoldService pullRequestHoldService;
private final PopMessageProcessor popMessageProcessor;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index 113c91297..f1bc9adc4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -144,6 +144,16 @@ public class PopLongPollingService extends ServiceThread {
}
}
+ public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId) {
+ String notifyTopic;
+ if (KeyBuilder.isPopRetryTopicV2(topic)) {
+ notifyTopic = KeyBuilder.parseNormalTopic(topic);
+ } else {
+ notifyTopic = topic;
+ }
+ notifyMessageArriving(notifyTopic, queueId);
+ }
+
public void notifyMessageArriving(final String topic, final int queueId) {
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
if (cids == null) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index af08a83c7..d1f3fffde 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -185,6 +185,17 @@ public class ConsumerLagCalculator {
continue;
}
}
+ if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
+ String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
+ TopicConfig retryTopicConfigV1 = topicConfigManager.selectTopicConfig(retryTopicV1);
+ if (retryTopicConfigV1 != null) {
+ int retryTopicPerm = retryTopicConfigV1.getPerm() & brokerConfig.getBrokerPermission();
+ if (PermName.isReadable(retryTopicPerm) || PermName.isWriteable(retryTopicPerm)) {
+ consumer.accept(new ProcessGroupInfo(group, topic, true, retryTopicV1));
+ continue;
+ }
+ }
+ }
consumer.accept(new ProcessGroupInfo(group, topic, true, null));
} else {
consumer.accept(new ProcessGroupInfo(group, topic, false, null));
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index fbba6633b..863b275d1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -548,6 +548,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) {
deleteTopicInBroker(popRetryTopic);
}
+ final String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
+ if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != null) {
+ deleteTopicInBroker(popRetryTopicV1);
+ }
}
// delete topic
deleteTopicInBroker(topic);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index a15340383..91d275dfe 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -58,7 +58,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
}
public void notifyMessageArriving(final String topic, final int queueId) {
- popLongPollingService.notifyMessageArriving(topic, queueId);
+ popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId);
}
@Override
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 7ed4d53ab..58baecc05 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -185,7 +185,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
}
public void notifyMessageArriving(final String topic, final int queueId) {
- popLongPollingService.notifyMessageArriving(topic, queueId);
+ popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId);
}
public boolean notifyMessageArriving(final String topic, final String cid, final int queueId) {
@@ -364,6 +364,17 @@ public class PopMessageProcessor implements NettyRequestProcessor {
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
}
+ if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
+ TopicConfig retryTopicConfigV1 =
+ this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
+ if (retryTopicConfigV1 != null) {
+ for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) {
+ int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums();
+ getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
+ startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
+ }
+ }
+ }
}
if (requestHeader.getQueueId() < 0) {
// read all queue
@@ -388,6 +399,17 @@ public class PopMessageProcessor implements NettyRequestProcessor {
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
}
+ if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
+ TopicConfig retryTopicConfigV1 =
+ this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
+ if (retryTopicConfigV1 != null) {
+ for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) {
+ int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums();
+ getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
+ startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
+ }
+ }
+ }
}
final RemotingCommand finalResponse = response;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 3fb689ed6..8d25bc57e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -142,15 +142,6 @@ public class PopReviveService extends ServiceThread {
this.brokerController.getBrokerStatsManager().incBrokerPutNums(popCheckPoint.getTopic(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
- if (brokerController.getPopMessageProcessor() != null) {
- brokerController.getPopMessageProcessor().notifyMessageArriving(
- KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
- popCheckPoint.getCId(),
- -1
- );
- brokerController.getNotificationProcessor().notifyMessageArriving(
- KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);
- }
return true;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 956ef43fb..4ec84c146 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.AbortProcessException;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -178,7 +179,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
MessageExt msg, TopicConfig topicConfig, Map<String, String> properties) {
String newTopic = requestHeader.getTopic();
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ String groupName = KeyBuilder.parseGroup(newTopic);
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
if (null == subscriptionGroupConfig) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 0d248c4e1..c186352d1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -223,6 +223,8 @@ public class BrokerConfig extends BrokerIdentity {
private boolean enablePopBatchAck = false;
private boolean enableNotifyAfterPopOrderLockRelease = true;
private boolean initPopOffsetByCheckMsgInMem = true;
+ // read message from pop retry topic v1, for the compatibility, will be removed in the future version
+ private boolean retrieveMessageFromPopRetryTopicV1 = true;
private boolean realTimeNotifyConsumerChange = true;
@@ -1284,6 +1286,14 @@ public class BrokerConfig extends BrokerIdentity {
this.initPopOffsetByCheckMsgInMem = initPopOffsetByCheckMsgInMem;
}
+ public boolean isRetrieveMessageFromPopRetryTopicV1() {
+ return retrieveMessageFromPopRetryTopicV1;
+ }
+
+ public void setRetrieveMessageFromPopRetryTopicV1(boolean retrieveMessageFromPopRetryTopicV1) {
+ this.retrieveMessageFromPopRetryTopicV1 = retrieveMessageFromPopRetryTopicV1;
+ }
+
public boolean isRealTimeNotifyConsumerChange() {
return realTimeNotifyConsumerChange;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
index e1532d939..f2a8c4089 100644
--- a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
@@ -18,24 +18,53 @@ package org.apache.rocketmq.common;
public class KeyBuilder {
public static final int POP_ORDER_REVIVE_QUEUE = 999;
+ private static final String POP_RETRY_SEPARATOR_V1 = "_";
+ private static final String POP_RETRY_SEPARATOR_V2 = ":";
public static String buildPopRetryTopic(String topic, String cid) {
- return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_" + topic;
+ return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2 + topic;
+ }
+
+ public static String buildPopRetryTopicV1(String topic, String cid) {
+ return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V1 + topic;
}
public static String parseNormalTopic(String topic, String cid) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_").length());
+ if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2)) {
+ return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2).length());
+ }
+ return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V1).length());
} else {
return topic;
}
}
+ public static String parseNormalTopic(String retryTopic) {
+ if (isPopRetryTopicV2(retryTopic)) {
+ String[] result = retryTopic.split(POP_RETRY_SEPARATOR_V2);
+ if (result.length == 2) {
+ return result[1];
+ }
+ }
+ return retryTopic;
+ }
+
+ public static String parseGroup(String retryTopic) {
+ if (isPopRetryTopicV2(retryTopic)) {
+ String[] result = retryTopic.split(POP_RETRY_SEPARATOR_V2);
+ if (result.length == 2) {
+ return result[0].substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ }
+ }
+ return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ }
+
public static String buildPollingKey(String topic, String cid, int queueId) {
return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT + queueId;
}
- public static String buildPollingNotificationKey(String topic, int queueId) {
- return topic + PopAckConstants.SPLIT + queueId;
+ public static boolean isPopRetryTopicV2(String retryTopic) {
+ return retryTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && retryTopic.contains(POP_RETRY_SEPARATOR_V2);
}
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
new file mode 100644
index 000000000..f83e0aa14
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class KeyBuilderTest {
+ String topic = "test-topic";
+ String group = "test-group";
+
+ @Test
+ public void buildPopRetryTopic() {
+ assertThat(KeyBuilder.buildPopRetryTopic(topic, group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + ":" + topic);
+ }
+
+ @Test
+ public void buildPopRetryTopicV1() {
+ assertThat(KeyBuilder.buildPopRetryTopicV1(topic, group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + "_" + topic);
+ }
+
+ @Test
+ public void parseNormalTopic() {
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+ assertThat(KeyBuilder.parseNormalTopic(popRetryTopic, group)).isEqualTo(topic);
+ String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
+ assertThat(KeyBuilder.parseNormalTopic(popRetryTopicV1, group)).isEqualTo(topic);
+ }
+
+ @Test
+ public void testParseNormalTopic() {
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+ assertThat(KeyBuilder.parseNormalTopic(popRetryTopic)).isEqualTo(topic);
+ }
+
+ @Test
+ public void parseGroup() {
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+ assertThat(KeyBuilder.parseGroup(popRetryTopic)).isEqualTo(group);
+ }
+
+ @Test
+ public void isPopRetryTopicV2() {
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+ assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopic)).isEqualTo(true);
+ String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
+ assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopicV1)).isEqualTo(false);
+ }
+}
\ No newline at end of file
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
index 97125b854..c489cad68 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
@@ -212,7 +213,7 @@ public class ConsumerProgressSubCommand implements SubCommand {
TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
for (String topic : topicList.getTopicList()) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ String consumerGroup = KeyBuilder.parseGroup(topic);
try {
ConsumeStats consumeStats = null;
try {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
index 45dc3a036..b66dfad20 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -172,7 +173,7 @@ public class MonitorService {
TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
for (String topic : topicList.getTopicList()) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ String consumerGroup = KeyBuilder.parseGroup(topic);
try {
this.reportUndoneMsgs(consumerGroup);
--
2.32.0.windows.2