diff --git a/patch034-backport-Let-consumer-be-aware-of-message-queue-assignment-change.patch b/patch034-backport-Let-consumer-be-aware-of-message-queue-assignment-change.patch new file mode 100644 index 0000000..d170ae0 --- /dev/null +++ b/patch034-backport-Let-consumer-be-aware-of-message-queue-assignment-change.patch @@ -0,0 +1,370 @@ +From 27759f3556c279f63c13bc94fe3ad6ca55558114 Mon Sep 17 00:00:00 2001 +From: Allon Murienik +Date: Thu, 9 Nov 2023 06:33:34 +0200 +Subject: [PATCH 1/2] Fix unstable UtilAllTest#testCalculateFileSizeInPath on + Windows (#7419) + +This patch offers an alternative approach to +5d492c338258d07613103e6ae16df4c6fa5b3838. Instead of manually setting +up the directory #testCalculateFileSizeInPath needs and then +recursively deleting it, it uses JUnit's TemporaryFolder Rule to +handle all of this work, and allows the code to concentrate on the +business logic. + +Closes #7418 +--- + .../apache/rocketmq/common/UtilAllTest.java | 86 ++++++++----------- + 1 file changed, 36 insertions(+), 50 deletions(-) + +diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java +index a0653d7fc..94bb390eb 100644 +--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java ++++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java +@@ -26,7 +26,10 @@ import java.util.Arrays; + import java.util.Collections; + import java.util.List; + import java.util.Properties; ++ ++import org.junit.Rule; + import org.junit.Test; ++import org.junit.rules.TemporaryFolder; + + import static org.assertj.core.api.Assertions.assertThat; + import static org.assertj.core.api.Assertions.within; +@@ -34,6 +37,8 @@ import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertTrue; + + public class UtilAllTest { ++ @Rule ++ public TemporaryFolder tempDir = new TemporaryFolder(); + + @Test + public void testCurrentStackTrace() { +@@ -236,56 +241,37 @@ public class UtilAllTest { + * - file_1_2_0 + * - dir_2 + */ +- String basePath = System.getProperty("java.io.tmpdir") + File.separator + "testCalculateFileSizeInPath"; +- File baseFile = new File(basePath); +- try { +- // test empty path +- assertEquals(0, UtilAll.calculateFileSizeInPath(baseFile)); +- +- // create baseDir +- assertTrue(baseFile.mkdirs()); +- +- File file0 = new File(baseFile, "file_0"); +- assertTrue(file0.createNewFile()); +- writeFixedBytesToFile(file0, 1313); +- +- assertEquals(1313, UtilAll.calculateFileSizeInPath(baseFile)); +- +- // build a file tree like above +- File dir1 = new File(baseFile, "dir_1"); +- dir1.mkdirs(); +- File file10 = new File(dir1, "file_1_0"); +- File file11 = new File(dir1, "file_1_1"); +- File dir12 = new File(dir1, "dir_1_2"); +- dir12.mkdirs(); +- File file120 = new File(dir12, "file_1_2_0"); +- File dir2 = new File(baseFile, "dir_2"); +- dir2.mkdirs(); +- +- // write all file with 1313 bytes data +- assertTrue(file10.createNewFile()); +- writeFixedBytesToFile(file10, 1313); +- assertTrue(file11.createNewFile()); +- writeFixedBytesToFile(file11, 1313); +- assertTrue(file120.createNewFile()); +- writeFixedBytesToFile(file120, 1313); +- +- assertEquals(1313 * 4, UtilAll.calculateFileSizeInPath(baseFile)); +- } finally { +- deleteFolder(baseFile); +- } +- } +- +- public static void deleteFolder(File folder) { +- if (folder.isDirectory()) { +- File[] files = folder.listFiles(); +- if (files != null) { +- for (File file : files) { +- deleteFolder(file); +- } +- } +- } +- folder.delete(); ++ File baseFile = tempDir.getRoot(); ++ ++ // test empty path ++ assertEquals(0, UtilAll.calculateFileSizeInPath(baseFile)); ++ ++ File file0 = new File(baseFile, "file_0"); ++ assertTrue(file0.createNewFile()); ++ writeFixedBytesToFile(file0, 1313); ++ ++ assertEquals(1313, UtilAll.calculateFileSizeInPath(baseFile)); ++ ++ // build a file tree like above ++ File dir1 = new File(baseFile, "dir_1"); ++ dir1.mkdirs(); ++ File file10 = new File(dir1, "file_1_0"); ++ File file11 = new File(dir1, "file_1_1"); ++ File dir12 = new File(dir1, "dir_1_2"); ++ dir12.mkdirs(); ++ File file120 = new File(dir12, "file_1_2_0"); ++ File dir2 = new File(baseFile, "dir_2"); ++ dir2.mkdirs(); ++ ++ // write all file with 1313 bytes data ++ assertTrue(file10.createNewFile()); ++ writeFixedBytesToFile(file10, 1313); ++ assertTrue(file11.createNewFile()); ++ writeFixedBytesToFile(file11, 1313); ++ assertTrue(file120.createNewFile()); ++ writeFixedBytesToFile(file120, 1313); ++ ++ assertEquals(1313 * 4, UtilAll.calculateFileSizeInPath(baseFile)); + } + + private void writeFixedBytesToFile(File file, int size) throws Exception { +-- +2.32.0.windows.2 + + +From 2ed27214d84799c62a6f3180d2b01075412e4ef8 Mon Sep 17 00:00:00 2001 +From: Zhanhui Li +Date: Mon, 13 Nov 2023 09:44:25 +0800 +Subject: [PATCH 2/2] [ISSUE #7547] Let consumer be aware of message queue + assignment change (#7548) + +* let consumer be aware of message queue assignment change + +Signed-off-by: Li Zhanhui + +* add unit test for DefaultMQPushConsumer#setMessageQueueListener + +Signed-off-by: Li Zhanhui + +* fix: bazel build warnings + +Signed-off-by: Zhanhui Li + +* fix: set MixCommitlogTest test size as medium + +Signed-off-by: Zhanhui Li + +* allow cache bazel test results + +Signed-off-by: Li Zhanhui + +* fix code style issue by removing unused imports + +Signed-off-by: Li Zhanhui + +* fix #7552 + +Signed-off-by: Zhanhui Li + +--------- + +Signed-off-by: Li Zhanhui +Signed-off-by: Zhanhui Li +--- + .../client/consumer/DefaultMQPushConsumer.java | 13 +++++++++++++ + .../rocketmq/client/consumer/MQConsumer.java | 8 +++++--- + .../client/consumer/MessageQueueListener.java | 5 ++--- + .../consumer/DefaultMQPushConsumerImpl.java | 10 +++++++++- + .../client/impl/consumer/RebalancePushImpl.java | 8 +++++++- + .../service/message/LocalRemotingCommand.java | 1 + + .../balance/NormalMsgDynamicBalanceIT.java | 17 +++++++++++++++++ + 7 files changed, 54 insertions(+), 8 deletions(-) + +diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +index 1afb9113e..e593a17c9 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java ++++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +@@ -150,6 +150,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume + */ + private MessageListener messageListener; + ++ /** ++ * Listener to call if message queue assignment is changed. ++ */ ++ private MessageQueueListener messageQueueListener; ++ + /** + * Offset Storage + */ +@@ -987,4 +992,12 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume + public void setClientRebalance(boolean clientRebalance) { + this.clientRebalance = clientRebalance; + } ++ ++ public MessageQueueListener getMessageQueueListener() { ++ return messageQueueListener; ++ } ++ ++ public void setMessageQueueListener(MessageQueueListener messageQueueListener) { ++ this.messageQueueListener = messageQueueListener; ++ } + } +diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java +index f4a8eda23..81e06ee41 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java ++++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java +@@ -29,20 +29,22 @@ import org.apache.rocketmq.remoting.exception.RemotingException; + */ + public interface MQConsumer extends MQAdmin { + /** +- * If consuming failure,message will be send back to the brokers,and delay consuming some time ++ * If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after ++ * interval specified in delay level. + */ + @Deprecated + void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException; + + /** +- * If consuming failure,message will be send back to the broker,and delay consuming some time ++ * If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after ++ * interval specified in delay level. + */ + void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + + /** +- * Fetch message queues from consumer cache according to the topic ++ * Fetch message queues from consumer cache pertaining to the given topic. + * + * @param topic message topic + * @return queue set +diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java +index 63795a6ee..74510f4c3 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java ++++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java +@@ -26,8 +26,7 @@ public interface MessageQueueListener { + /** + * @param topic message topic + * @param mqAll all queues in this message topic +- * @param mqDivided collection of queues,assigned to the current consumer ++ * @param mqAssigned collection of queues, assigned to the current consumer + */ +- void messageQueueChanged(final String topic, final Set mqAll, +- final Set mqDivided); ++ void messageQueueChanged(final String topic, final Set mqAll, final Set mqAssigned); + } +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +index e57579321..cfb89b5c8 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +@@ -35,6 +35,7 @@ import org.apache.rocketmq.client.consumer.AckCallback; + import org.apache.rocketmq.client.consumer.AckResult; + import org.apache.rocketmq.client.consumer.AckStatus; + import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; ++import org.apache.rocketmq.client.consumer.MessageQueueListener; + import org.apache.rocketmq.client.consumer.MessageSelector; + import org.apache.rocketmq.client.consumer.PopCallback; + import org.apache.rocketmq.client.consumer.PopResult; +@@ -132,7 +133,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { + private long queueMaxSpanFlowControlTimes = 0; + + //10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h +- private int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200}; ++ private final int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200}; + + private static final int MAX_POP_INVISIBLE_TIME = 300000; + private static final int MIN_POP_INVISIBLE_TIME = 5000; +@@ -1553,4 +1554,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { + int[] getPopDelayLevel() { + return popDelayLevel; + } ++ ++ public MessageQueueListener getMessageQueueListener() { ++ if (null == defaultMQPushConsumer) { ++ return null; ++ } ++ return defaultMQPushConsumer.getMessageQueueListener(); ++ } + } +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +index df509f371..f9cf429c6 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +@@ -20,6 +20,7 @@ import java.util.List; + import java.util.Set; + import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; ++import org.apache.rocketmq.client.consumer.MessageQueueListener; + import org.apache.rocketmq.client.consumer.store.OffsetStore; + import org.apache.rocketmq.client.consumer.store.ReadOffsetType; + import org.apache.rocketmq.client.exception.MQClientException; +@@ -52,7 +53,7 @@ public class RebalancePushImpl extends RebalanceImpl { + + @Override + public void messageQueueChanged(String topic, Set mqAll, Set mqDivided) { +- /** ++ /* + * When rebalance result changed, should update subscription's version to notify broker. + * Fix: inconsistency subscription may lead to consumer miss messages. + */ +@@ -82,6 +83,11 @@ public class RebalancePushImpl extends RebalanceImpl { + + // notify broker + this.getmQClientFactory().sendHeartbeatToAllBrokerWithLockV2(true); ++ ++ MessageQueueListener messageQueueListener = defaultMQPushConsumerImpl.getMessageQueueListener(); ++ if (null != messageQueueListener) { ++ messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided); ++ } + } + + @Override +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java +index 915cafcd5..7bf4a1698 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java +@@ -32,6 +32,7 @@ public class LocalRemotingCommand extends RemotingCommand { + cmd.writeCustomHeader(customHeader); + cmd.setExtFields(new HashMap<>()); + setCmdVersion(cmd); ++ cmd.makeCustomHeaderToNet(); + return cmd; + } + +diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java +index b2c9b0658..684b718ae 100644 +--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java ++++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java +@@ -17,6 +17,8 @@ + + package org.apache.rocketmq.test.client.consumer.balance; + ++import java.util.concurrent.CountDownLatch; ++import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.test.base.BaseConf; +@@ -112,4 +114,19 @@ public class NormalMsgDynamicBalanceIT extends BaseConf { + consumer2.getListener().getAllUndupMsgBody()).size()); + assertThat(balance).isEqualTo(true); + } ++ ++ @Test ++ public void testMessageQueueListener() throws InterruptedException { ++ final CountDownLatch latch = new CountDownLatch(1); ++ ++ RMQNormalConsumer consumer1 = getConsumer(NAMESRV_ADDR, topic, "*", new RMQNormalListener()); ++ // Register message queue listener ++ consumer1.getConsumer().setMessageQueueListener((topic, mqAll, mqAssigned) -> latch.countDown()); ++ ++ // Without message queue listener ++ RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR, consumer1.getConsumerGroup(), topic, ++ "*", new RMQNormalListener()); ++ ++ Assert.assertTrue(latch.await(30, TimeUnit.SECONDS)); ++ } + } +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index 4ff218d..de1e58a 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.5 -Release: 34 +Release: 35 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -43,6 +43,7 @@ Patch0030: patch030-backport-remove-some-code.patch Patch0031: patch031-backport-Add-CRC-check-of-commitlog.patch Patch0032: patch032-backport-Clear-POP_CK-when-sending-messages.patch Patch0033: patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.patch +Patch0034: patch034-backport-Let-consumer-be-aware-of-message-queue-assignment-change.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -83,6 +84,9 @@ exit 0 %changelog +* Mon Dec 11 2023 ShiZhili - 5.1.3-35 +- backport Let consumer be aware of message queue assignment change + * Fri Dec 8 2023 ShiZhili - 5.1.3-34 - backport Lock granularity issue causing LMQ message loss