rocketmq/patch034-backport-Let-consumer-be-aware-of-message-queue-assignment-change.patch

371 lines
16 KiB
Diff

From 27759f3556c279f63c13bc94fe3ad6ca55558114 Mon Sep 17 00:00:00 2001
From: Allon Murienik <mureinik@gmail.com>
Date: Thu, 9 Nov 2023 06:33:34 +0200
Subject: [PATCH 1/2] Fix unstable UtilAllTest#testCalculateFileSizeInPath on
Windows (#7419)
This patch offers an alternative approach to
5d492c338258d07613103e6ae16df4c6fa5b3838. Instead of manually setting
up the directory #testCalculateFileSizeInPath needs and then
recursively deleting it, it uses JUnit's TemporaryFolder Rule to
handle all of this work, and allows the code to concentrate on the
business logic.
Closes #7418
---
.../apache/rocketmq/common/UtilAllTest.java | 86 ++++++++-----------
1 file changed, 36 insertions(+), 50 deletions(-)
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
index a0653d7fc..94bb390eb 100644
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
@@ -26,7 +26,10 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;
@@ -34,6 +37,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class UtilAllTest {
+ @Rule
+ public TemporaryFolder tempDir = new TemporaryFolder();
@Test
public void testCurrentStackTrace() {
@@ -236,56 +241,37 @@ public class UtilAllTest {
* - file_1_2_0
* - dir_2
*/
- String basePath = System.getProperty("java.io.tmpdir") + File.separator + "testCalculateFileSizeInPath";
- File baseFile = new File(basePath);
- try {
- // test empty path
- assertEquals(0, UtilAll.calculateFileSizeInPath(baseFile));
-
- // create baseDir
- assertTrue(baseFile.mkdirs());
-
- File file0 = new File(baseFile, "file_0");
- assertTrue(file0.createNewFile());
- writeFixedBytesToFile(file0, 1313);
-
- assertEquals(1313, UtilAll.calculateFileSizeInPath(baseFile));
-
- // build a file tree like above
- File dir1 = new File(baseFile, "dir_1");
- dir1.mkdirs();
- File file10 = new File(dir1, "file_1_0");
- File file11 = new File(dir1, "file_1_1");
- File dir12 = new File(dir1, "dir_1_2");
- dir12.mkdirs();
- File file120 = new File(dir12, "file_1_2_0");
- File dir2 = new File(baseFile, "dir_2");
- dir2.mkdirs();
-
- // write all file with 1313 bytes data
- assertTrue(file10.createNewFile());
- writeFixedBytesToFile(file10, 1313);
- assertTrue(file11.createNewFile());
- writeFixedBytesToFile(file11, 1313);
- assertTrue(file120.createNewFile());
- writeFixedBytesToFile(file120, 1313);
-
- assertEquals(1313 * 4, UtilAll.calculateFileSizeInPath(baseFile));
- } finally {
- deleteFolder(baseFile);
- }
- }
-
- public static void deleteFolder(File folder) {
- if (folder.isDirectory()) {
- File[] files = folder.listFiles();
- if (files != null) {
- for (File file : files) {
- deleteFolder(file);
- }
- }
- }
- folder.delete();
+ File baseFile = tempDir.getRoot();
+
+ // test empty path
+ assertEquals(0, UtilAll.calculateFileSizeInPath(baseFile));
+
+ File file0 = new File(baseFile, "file_0");
+ assertTrue(file0.createNewFile());
+ writeFixedBytesToFile(file0, 1313);
+
+ assertEquals(1313, UtilAll.calculateFileSizeInPath(baseFile));
+
+ // build a file tree like above
+ File dir1 = new File(baseFile, "dir_1");
+ dir1.mkdirs();
+ File file10 = new File(dir1, "file_1_0");
+ File file11 = new File(dir1, "file_1_1");
+ File dir12 = new File(dir1, "dir_1_2");
+ dir12.mkdirs();
+ File file120 = new File(dir12, "file_1_2_0");
+ File dir2 = new File(baseFile, "dir_2");
+ dir2.mkdirs();
+
+ // write all file with 1313 bytes data
+ assertTrue(file10.createNewFile());
+ writeFixedBytesToFile(file10, 1313);
+ assertTrue(file11.createNewFile());
+ writeFixedBytesToFile(file11, 1313);
+ assertTrue(file120.createNewFile());
+ writeFixedBytesToFile(file120, 1313);
+
+ assertEquals(1313 * 4, UtilAll.calculateFileSizeInPath(baseFile));
}
private void writeFixedBytesToFile(File file, int size) throws Exception {
--
2.32.0.windows.2
From 2ed27214d84799c62a6f3180d2b01075412e4ef8 Mon Sep 17 00:00:00 2001
From: Zhanhui Li <lizhanhui@apache.org>
Date: Mon, 13 Nov 2023 09:44:25 +0800
Subject: [PATCH 2/2] [ISSUE #7547] Let consumer be aware of message queue
assignment change (#7548)
* let consumer be aware of message queue assignment change
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* add unit test for DefaultMQPushConsumer#setMessageQueueListener
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* fix: bazel build warnings
Signed-off-by: Zhanhui Li <lizhanhui@apache.org>
* fix: set MixCommitlogTest test size as medium
Signed-off-by: Zhanhui Li <lizhanhui@apache.org>
* allow cache bazel test results
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* fix code style issue by removing unused imports
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* fix #7552
Signed-off-by: Zhanhui Li <lizhanhui@apache.org>
---------
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
Signed-off-by: Zhanhui Li <lizhanhui@apache.org>
---
.../client/consumer/DefaultMQPushConsumer.java | 13 +++++++++++++
.../rocketmq/client/consumer/MQConsumer.java | 8 +++++---
.../client/consumer/MessageQueueListener.java | 5 ++---
.../consumer/DefaultMQPushConsumerImpl.java | 10 +++++++++-
.../client/impl/consumer/RebalancePushImpl.java | 8 +++++++-
.../service/message/LocalRemotingCommand.java | 1 +
.../balance/NormalMsgDynamicBalanceIT.java | 17 +++++++++++++++++
7 files changed, 54 insertions(+), 8 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 1afb9113e..e593a17c9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -150,6 +150,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private MessageListener messageListener;
+ /**
+ * Listener to call if message queue assignment is changed.
+ */
+ private MessageQueueListener messageQueueListener;
+
/**
* Offset Storage
*/
@@ -987,4 +992,12 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public void setClientRebalance(boolean clientRebalance) {
this.clientRebalance = clientRebalance;
}
+
+ public MessageQueueListener getMessageQueueListener() {
+ return messageQueueListener;
+ }
+
+ public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
+ this.messageQueueListener = messageQueueListener;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
index f4a8eda23..81e06ee41 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
@@ -29,20 +29,22 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
*/
public interface MQConsumer extends MQAdmin {
/**
- * If consuming failure,message will be send back to the brokers,and delay consuming some time
+ * If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after
+ * interval specified in delay level.
*/
@Deprecated
void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;
/**
- * If consuming failure,message will be send back to the broker,and delay consuming some time
+ * If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after
+ * interval specified in delay level.
*/
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
/**
- * Fetch message queues from consumer cache according to the topic
+ * Fetch message queues from consumer cache pertaining to the given topic.
*
* @param topic message topic
* @return queue set
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
index 63795a6ee..74510f4c3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
@@ -26,8 +26,7 @@ public interface MessageQueueListener {
/**
* @param topic message topic
* @param mqAll all queues in this message topic
- * @param mqDivided collection of queues,assigned to the current consumer
+ * @param mqAssigned collection of queues, assigned to the current consumer
*/
- void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
- final Set<MessageQueue> mqDivided);
+ void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqAssigned);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index e57579321..cfb89b5c8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -35,6 +35,7 @@ import org.apache.rocketmq.client.consumer.AckCallback;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PopCallback;
import org.apache.rocketmq.client.consumer.PopResult;
@@ -132,7 +133,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private long queueMaxSpanFlowControlTimes = 0;
//10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- private int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
+ private final int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
private static final int MAX_POP_INVISIBLE_TIME = 300000;
private static final int MIN_POP_INVISIBLE_TIME = 5000;
@@ -1553,4 +1554,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
int[] getPopDelayLevel() {
return popDelayLevel;
}
+
+ public MessageQueueListener getMessageQueueListener() {
+ if (null == defaultMQPushConsumer) {
+ return null;
+ }
+ return defaultMQPushConsumer.getMessageQueueListener();
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index df509f371..f9cf429c6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -20,6 +20,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -52,7 +53,7 @@ public class RebalancePushImpl extends RebalanceImpl {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
- /**
+ /*
* When rebalance result changed, should update subscription's version to notify broker.
* Fix: inconsistency subscription may lead to consumer miss messages.
*/
@@ -82,6 +83,11 @@ public class RebalancePushImpl extends RebalanceImpl {
// notify broker
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLockV2(true);
+
+ MessageQueueListener messageQueueListener = defaultMQPushConsumerImpl.getMessageQueueListener();
+ if (null != messageQueueListener) {
+ messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
+ }
}
@Override
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
index 915cafcd5..7bf4a1698 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
@@ -32,6 +32,7 @@ public class LocalRemotingCommand extends RemotingCommand {
cmd.writeCustomHeader(customHeader);
cmd.setExtFields(new HashMap<>());
setCmdVersion(cmd);
+ cmd.makeCustomHeaderToNet();
return cmd;
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
index b2c9b0658..684b718ae 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.test.client.consumer.balance;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.test.base.BaseConf;
@@ -112,4 +114,19 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
consumer2.getListener().getAllUndupMsgBody()).size());
assertThat(balance).isEqualTo(true);
}
+
+ @Test
+ public void testMessageQueueListener() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ RMQNormalConsumer consumer1 = getConsumer(NAMESRV_ADDR, topic, "*", new RMQNormalListener());
+ // Register message queue listener
+ consumer1.getConsumer().setMessageQueueListener((topic, mqAll, mqAssigned) -> latch.countDown());
+
+ // Without message queue listener
+ RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR, consumer1.getConsumerGroup(), topic,
+ "*", new RMQNormalListener());
+
+ Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
+ }
}
--
2.32.0.windows.2