!39 backport Let consumer be aware of message queue assignment change
From: @zhiliatox Reviewed-by: @hu-zongtang Signed-off-by: @hu-zongtang
This commit is contained in:
commit
c1684d0f54
@ -0,0 +1,370 @@
|
||||
From 27759f3556c279f63c13bc94fe3ad6ca55558114 Mon Sep 17 00:00:00 2001
|
||||
From: Allon Murienik <mureinik@gmail.com>
|
||||
Date: Thu, 9 Nov 2023 06:33:34 +0200
|
||||
Subject: [PATCH 1/2] Fix unstable UtilAllTest#testCalculateFileSizeInPath on
|
||||
Windows (#7419)
|
||||
|
||||
This patch offers an alternative approach to
|
||||
5d492c338258d07613103e6ae16df4c6fa5b3838. Instead of manually setting
|
||||
up the directory #testCalculateFileSizeInPath needs and then
|
||||
recursively deleting it, it uses JUnit's TemporaryFolder Rule to
|
||||
handle all of this work, and allows the code to concentrate on the
|
||||
business logic.
|
||||
|
||||
Closes #7418
|
||||
---
|
||||
.../apache/rocketmq/common/UtilAllTest.java | 86 ++++++++-----------
|
||||
1 file changed, 36 insertions(+), 50 deletions(-)
|
||||
|
||||
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
|
||||
index a0653d7fc..94bb390eb 100644
|
||||
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
|
||||
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
|
||||
@@ -26,7 +26,10 @@ import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
+
|
||||
+import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
+import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.within;
|
||||
@@ -34,6 +37,8 @@ import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class UtilAllTest {
|
||||
+ @Rule
|
||||
+ public TemporaryFolder tempDir = new TemporaryFolder();
|
||||
|
||||
@Test
|
||||
public void testCurrentStackTrace() {
|
||||
@@ -236,56 +241,37 @@ public class UtilAllTest {
|
||||
* - file_1_2_0
|
||||
* - dir_2
|
||||
*/
|
||||
- String basePath = System.getProperty("java.io.tmpdir") + File.separator + "testCalculateFileSizeInPath";
|
||||
- File baseFile = new File(basePath);
|
||||
- try {
|
||||
- // test empty path
|
||||
- assertEquals(0, UtilAll.calculateFileSizeInPath(baseFile));
|
||||
-
|
||||
- // create baseDir
|
||||
- assertTrue(baseFile.mkdirs());
|
||||
-
|
||||
- File file0 = new File(baseFile, "file_0");
|
||||
- assertTrue(file0.createNewFile());
|
||||
- writeFixedBytesToFile(file0, 1313);
|
||||
-
|
||||
- assertEquals(1313, UtilAll.calculateFileSizeInPath(baseFile));
|
||||
-
|
||||
- // build a file tree like above
|
||||
- File dir1 = new File(baseFile, "dir_1");
|
||||
- dir1.mkdirs();
|
||||
- File file10 = new File(dir1, "file_1_0");
|
||||
- File file11 = new File(dir1, "file_1_1");
|
||||
- File dir12 = new File(dir1, "dir_1_2");
|
||||
- dir12.mkdirs();
|
||||
- File file120 = new File(dir12, "file_1_2_0");
|
||||
- File dir2 = new File(baseFile, "dir_2");
|
||||
- dir2.mkdirs();
|
||||
-
|
||||
- // write all file with 1313 bytes data
|
||||
- assertTrue(file10.createNewFile());
|
||||
- writeFixedBytesToFile(file10, 1313);
|
||||
- assertTrue(file11.createNewFile());
|
||||
- writeFixedBytesToFile(file11, 1313);
|
||||
- assertTrue(file120.createNewFile());
|
||||
- writeFixedBytesToFile(file120, 1313);
|
||||
-
|
||||
- assertEquals(1313 * 4, UtilAll.calculateFileSizeInPath(baseFile));
|
||||
- } finally {
|
||||
- deleteFolder(baseFile);
|
||||
- }
|
||||
- }
|
||||
-
|
||||
- public static void deleteFolder(File folder) {
|
||||
- if (folder.isDirectory()) {
|
||||
- File[] files = folder.listFiles();
|
||||
- if (files != null) {
|
||||
- for (File file : files) {
|
||||
- deleteFolder(file);
|
||||
- }
|
||||
- }
|
||||
- }
|
||||
- folder.delete();
|
||||
+ File baseFile = tempDir.getRoot();
|
||||
+
|
||||
+ // test empty path
|
||||
+ assertEquals(0, UtilAll.calculateFileSizeInPath(baseFile));
|
||||
+
|
||||
+ File file0 = new File(baseFile, "file_0");
|
||||
+ assertTrue(file0.createNewFile());
|
||||
+ writeFixedBytesToFile(file0, 1313);
|
||||
+
|
||||
+ assertEquals(1313, UtilAll.calculateFileSizeInPath(baseFile));
|
||||
+
|
||||
+ // build a file tree like above
|
||||
+ File dir1 = new File(baseFile, "dir_1");
|
||||
+ dir1.mkdirs();
|
||||
+ File file10 = new File(dir1, "file_1_0");
|
||||
+ File file11 = new File(dir1, "file_1_1");
|
||||
+ File dir12 = new File(dir1, "dir_1_2");
|
||||
+ dir12.mkdirs();
|
||||
+ File file120 = new File(dir12, "file_1_2_0");
|
||||
+ File dir2 = new File(baseFile, "dir_2");
|
||||
+ dir2.mkdirs();
|
||||
+
|
||||
+ // write all file with 1313 bytes data
|
||||
+ assertTrue(file10.createNewFile());
|
||||
+ writeFixedBytesToFile(file10, 1313);
|
||||
+ assertTrue(file11.createNewFile());
|
||||
+ writeFixedBytesToFile(file11, 1313);
|
||||
+ assertTrue(file120.createNewFile());
|
||||
+ writeFixedBytesToFile(file120, 1313);
|
||||
+
|
||||
+ assertEquals(1313 * 4, UtilAll.calculateFileSizeInPath(baseFile));
|
||||
}
|
||||
|
||||
private void writeFixedBytesToFile(File file, int size) throws Exception {
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 2ed27214d84799c62a6f3180d2b01075412e4ef8 Mon Sep 17 00:00:00 2001
|
||||
From: Zhanhui Li <lizhanhui@apache.org>
|
||||
Date: Mon, 13 Nov 2023 09:44:25 +0800
|
||||
Subject: [PATCH 2/2] [ISSUE #7547] Let consumer be aware of message queue
|
||||
assignment change (#7548)
|
||||
|
||||
* let consumer be aware of message queue assignment change
|
||||
|
||||
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
|
||||
|
||||
* add unit test for DefaultMQPushConsumer#setMessageQueueListener
|
||||
|
||||
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
|
||||
|
||||
* fix: bazel build warnings
|
||||
|
||||
Signed-off-by: Zhanhui Li <lizhanhui@apache.org>
|
||||
|
||||
* fix: set MixCommitlogTest test size as medium
|
||||
|
||||
Signed-off-by: Zhanhui Li <lizhanhui@apache.org>
|
||||
|
||||
* allow cache bazel test results
|
||||
|
||||
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
|
||||
|
||||
* fix code style issue by removing unused imports
|
||||
|
||||
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
|
||||
|
||||
* fix #7552
|
||||
|
||||
Signed-off-by: Zhanhui Li <lizhanhui@apache.org>
|
||||
|
||||
---------
|
||||
|
||||
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
|
||||
Signed-off-by: Zhanhui Li <lizhanhui@apache.org>
|
||||
---
|
||||
.../client/consumer/DefaultMQPushConsumer.java | 13 +++++++++++++
|
||||
.../rocketmq/client/consumer/MQConsumer.java | 8 +++++---
|
||||
.../client/consumer/MessageQueueListener.java | 5 ++---
|
||||
.../consumer/DefaultMQPushConsumerImpl.java | 10 +++++++++-
|
||||
.../client/impl/consumer/RebalancePushImpl.java | 8 +++++++-
|
||||
.../service/message/LocalRemotingCommand.java | 1 +
|
||||
.../balance/NormalMsgDynamicBalanceIT.java | 17 +++++++++++++++++
|
||||
7 files changed, 54 insertions(+), 8 deletions(-)
|
||||
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
|
||||
index 1afb9113e..e593a17c9 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
|
||||
@@ -150,6 +150,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
|
||||
*/
|
||||
private MessageListener messageListener;
|
||||
|
||||
+ /**
|
||||
+ * Listener to call if message queue assignment is changed.
|
||||
+ */
|
||||
+ private MessageQueueListener messageQueueListener;
|
||||
+
|
||||
/**
|
||||
* Offset Storage
|
||||
*/
|
||||
@@ -987,4 +992,12 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
|
||||
public void setClientRebalance(boolean clientRebalance) {
|
||||
this.clientRebalance = clientRebalance;
|
||||
}
|
||||
+
|
||||
+ public MessageQueueListener getMessageQueueListener() {
|
||||
+ return messageQueueListener;
|
||||
+ }
|
||||
+
|
||||
+ public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
|
||||
+ this.messageQueueListener = messageQueueListener;
|
||||
+ }
|
||||
}
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
|
||||
index f4a8eda23..81e06ee41 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
|
||||
@@ -29,20 +29,22 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
|
||||
*/
|
||||
public interface MQConsumer extends MQAdmin {
|
||||
/**
|
||||
- * If consuming failure,message will be send back to the brokers,and delay consuming some time
|
||||
+ * If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after
|
||||
+ * interval specified in delay level.
|
||||
*/
|
||||
@Deprecated
|
||||
void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException,
|
||||
MQBrokerException, InterruptedException, MQClientException;
|
||||
|
||||
/**
|
||||
- * If consuming failure,message will be send back to the broker,and delay consuming some time
|
||||
+ * If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after
|
||||
+ * interval specified in delay level.
|
||||
*/
|
||||
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
|
||||
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
|
||||
|
||||
/**
|
||||
- * Fetch message queues from consumer cache according to the topic
|
||||
+ * Fetch message queues from consumer cache pertaining to the given topic.
|
||||
*
|
||||
* @param topic message topic
|
||||
* @return queue set
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
|
||||
index 63795a6ee..74510f4c3 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
|
||||
@@ -26,8 +26,7 @@ public interface MessageQueueListener {
|
||||
/**
|
||||
* @param topic message topic
|
||||
* @param mqAll all queues in this message topic
|
||||
- * @param mqDivided collection of queues,assigned to the current consumer
|
||||
+ * @param mqAssigned collection of queues, assigned to the current consumer
|
||||
*/
|
||||
- void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
|
||||
- final Set<MessageQueue> mqDivided);
|
||||
+ void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqAssigned);
|
||||
}
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
|
||||
index e57579321..cfb89b5c8 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
|
||||
@@ -35,6 +35,7 @@ import org.apache.rocketmq.client.consumer.AckCallback;
|
||||
import org.apache.rocketmq.client.consumer.AckResult;
|
||||
import org.apache.rocketmq.client.consumer.AckStatus;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
|
||||
import org.apache.rocketmq.client.consumer.MessageSelector;
|
||||
import org.apache.rocketmq.client.consumer.PopCallback;
|
||||
import org.apache.rocketmq.client.consumer.PopResult;
|
||||
@@ -132,7 +133,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
|
||||
private long queueMaxSpanFlowControlTimes = 0;
|
||||
|
||||
//10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
|
||||
- private int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
|
||||
+ private final int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
|
||||
|
||||
private static final int MAX_POP_INVISIBLE_TIME = 300000;
|
||||
private static final int MIN_POP_INVISIBLE_TIME = 5000;
|
||||
@@ -1553,4 +1554,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
|
||||
int[] getPopDelayLevel() {
|
||||
return popDelayLevel;
|
||||
}
|
||||
+
|
||||
+ public MessageQueueListener getMessageQueueListener() {
|
||||
+ if (null == defaultMQPushConsumer) {
|
||||
+ return null;
|
||||
+ }
|
||||
+ return defaultMQPushConsumer.getMessageQueueListener();
|
||||
+ }
|
||||
}
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
|
||||
index df509f371..f9cf429c6 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
|
||||
@@ -20,6 +20,7 @@ import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
|
||||
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
|
||||
import org.apache.rocketmq.client.consumer.store.OffsetStore;
|
||||
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
@@ -52,7 +53,7 @@ public class RebalancePushImpl extends RebalanceImpl {
|
||||
|
||||
@Override
|
||||
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
|
||||
- /**
|
||||
+ /*
|
||||
* When rebalance result changed, should update subscription's version to notify broker.
|
||||
* Fix: inconsistency subscription may lead to consumer miss messages.
|
||||
*/
|
||||
@@ -82,6 +83,11 @@ public class RebalancePushImpl extends RebalanceImpl {
|
||||
|
||||
// notify broker
|
||||
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLockV2(true);
|
||||
+
|
||||
+ MessageQueueListener messageQueueListener = defaultMQPushConsumerImpl.getMessageQueueListener();
|
||||
+ if (null != messageQueueListener) {
|
||||
+ messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
|
||||
+ }
|
||||
}
|
||||
|
||||
@Override
|
||||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
|
||||
index 915cafcd5..7bf4a1698 100644
|
||||
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
|
||||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
|
||||
@@ -32,6 +32,7 @@ public class LocalRemotingCommand extends RemotingCommand {
|
||||
cmd.writeCustomHeader(customHeader);
|
||||
cmd.setExtFields(new HashMap<>());
|
||||
setCmdVersion(cmd);
|
||||
+ cmd.makeCustomHeaderToNet();
|
||||
return cmd;
|
||||
}
|
||||
|
||||
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
|
||||
index b2c9b0658..684b718ae 100644
|
||||
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
|
||||
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
|
||||
@@ -17,6 +17,8 @@
|
||||
|
||||
package org.apache.rocketmq.test.client.consumer.balance;
|
||||
|
||||
+import java.util.concurrent.CountDownLatch;
|
||||
+import java.util.concurrent.TimeUnit;
|
||||
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||||
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||||
import org.apache.rocketmq.test.base.BaseConf;
|
||||
@@ -112,4 +114,19 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
|
||||
consumer2.getListener().getAllUndupMsgBody()).size());
|
||||
assertThat(balance).isEqualTo(true);
|
||||
}
|
||||
+
|
||||
+ @Test
|
||||
+ public void testMessageQueueListener() throws InterruptedException {
|
||||
+ final CountDownLatch latch = new CountDownLatch(1);
|
||||
+
|
||||
+ RMQNormalConsumer consumer1 = getConsumer(NAMESRV_ADDR, topic, "*", new RMQNormalListener());
|
||||
+ // Register message queue listener
|
||||
+ consumer1.getConsumer().setMessageQueueListener((topic, mqAll, mqAssigned) -> latch.countDown());
|
||||
+
|
||||
+ // Without message queue listener
|
||||
+ RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR, consumer1.getConsumerGroup(), topic,
|
||||
+ "*", new RMQNormalListener());
|
||||
+
|
||||
+ Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
|
||||
+ }
|
||||
}
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
Summary: Cloud-Native, Distributed Messaging and Streaming
|
||||
Name: rocketmq
|
||||
Version: 5.1.5
|
||||
Release: 34
|
||||
Release: 35
|
||||
License: Apache-2.0
|
||||
Group: Applications/Message
|
||||
URL: https://rocketmq.apache.org/
|
||||
@ -43,6 +43,7 @@ Patch0030: patch030-backport-remove-some-code.patch
|
||||
Patch0031: patch031-backport-Add-CRC-check-of-commitlog.patch
|
||||
Patch0032: patch032-backport-Clear-POP_CK-when-sending-messages.patch
|
||||
Patch0033: patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.patch
|
||||
Patch0034: patch034-backport-Let-consumer-be-aware-of-message-queue-assignment-change.patch
|
||||
BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git
|
||||
Requires: java-1.8.0-openjdk-devel
|
||||
|
||||
@ -83,6 +84,9 @@ exit 0
|
||||
|
||||
|
||||
%changelog
|
||||
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-35
|
||||
- backport Let consumer be aware of message queue assignment change
|
||||
|
||||
* Fri Dec 8 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-34
|
||||
- backport Lock granularity issue causing LMQ message loss
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user