Compare commits

...

20 Commits

Author SHA1 Message Date
openeuler-ci-bot
915cd31a13
!49 add readme
From: @zhiliatox 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-12 01:59:54 +00:00
openeuler-ci-bot
5c945d3503
!48 backport fix some buges
From: @zhiliatox 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-12 01:53:54 +00:00
openeuler-ci-bot
0b032b76c8
!47 backport message filter
From: @zhiliatox 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-12 01:34:42 +00:00
openeuler-ci-bot
e0aa47ed26
!46 backport add improve performance
From: @zhiliatox 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-12 01:33:20 +00:00
openeuler-ci-bot
68f74a867a
!45 backport some test cases
From: @zhiliatox 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-12 01:32:57 +00:00
openeuler-ci-bot
2bce6cb470
!44 backport add some validations
From: @zhiliatox 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-12 01:32:13 +00:00
openeuler-ci-bot
60e7ad1bed
!43 slaveActingMaster Timer Message retry without escape logic
From: @zhiliatox 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-12 01:29:35 +00:00
openeuler-ci-bot
ffa9e6a51d
!42 backport Retry topic v2 in pop
From: @zhiliatox 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-12 01:28:07 +00:00
openeuler-ci-bot
4638651c5a
!41 backport rip 65
From: @zhiliatox 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-12 01:26:38 +00:00
openeuler-ci-bot
dba808d5c6
!40 backport some bugfixes
From: @zhiliatox 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-12 01:26:00 +00:00
shizhili
8df0259da2 add readme 2023-12-11 17:24:25 +08:00
shizhili
df52d543eb backport fix some bugs 2023-12-11 16:13:04 +08:00
shizhili
b3b0264e00 backport support message filter 2023-12-11 16:08:37 +08:00
shizhili
a4ade66792 backport add improve performance 2023-12-11 16:01:12 +08:00
shizhili
0881216d3c backport add some test cases 2023-12-11 15:57:18 +08:00
shizhili
4c1bdb337e backport add some validations 2023-12-11 15:54:33 +08:00
shizhili
15f14248c4 backport slaveActingMaster Timer Message retry without escape logic 2023-12-11 15:45:46 +08:00
shizhili
cd83ae768b backport Retry topic v2 in pop 2023-12-11 15:30:10 +08:00
shizhili
e9afdd81f6 backport rip 65 2023-12-11 15:26:18 +08:00
shizhili
98aba19773 backport fix some bugs 2023-12-11 15:20:10 +08:00
11 changed files with 6604 additions and 9 deletions

View File

@ -1,7 +1,7 @@
# rocketmq
#### 介绍
A Distributed Messaging Platform
RocketMQ是一个分布式的消息和流处理平台具有低延迟、高性能、高可靠性、百亿级容量和灵活扩缩容能力。
#### 软件架构
软件架构说明
@ -9,15 +9,54 @@ A Distributed Messaging Platform
#### 安装教程
1. xxxx
2. xxxx
3. xxxx
1. 切换yum源
```shell
cd /etc/yum.repos.d/ && curl -OL https://eur.openeuler.openatom.cn/coprs/zhiliatox/example/repo/openeuler-22.03_LTS_SP2/zhiliatox-example-openeuler-22.03_LTS_SP2.repo
```
2. 安装rocketmq
```shell
dnf install rocketmq
```
3. 使用RocketMQ
```shell
cd /opt/rocketmq
```
#### 使用说明
1. 启动Namesrv
1. xxxx
2. xxxx
3. xxxx
对于Mac或者Linux用户
```shell
### 启动Namesrv
$ nohup sh mqnamesrv &
### 检查日志确定Namesrv是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
```
对于Windows用户
```shell
$ nohup mqnamesrv.cmd &
The Name Server boot success...
```
2. 启动Broker
对于Mac或者Linux用户
```shell
### start Broker
$ nohup sh bin/mqbroker -n localhost:9876 &
### check whether Broker is successfully started, eg: Broker's IP is 192.168.1.2, Broker's name is broker-a
$ tail -f ~/logs/rocketmqlogs/broker.log
The broker[broker-a, 192.169.1.2:10911] boot success...
```
对于Windows用户
```shell
$ mqbroker.cmd -n localhost:9876
The broker[broker-a, 192.169.1.2:10911] boot success...
```
#### 参与贡献

View File

@ -0,0 +1,394 @@
From 1be5ebc7363e4bc6503c80688160a354f5a12f78 Mon Sep 17 00:00:00 2001
From: Zhanhui Li <lizhanhui@apache.org>
Date: Mon, 13 Nov 2023 09:45:37 +0800
Subject: [PATCH 1/5] [ISSUE #7551] Reuse helper methods from Netty to free
direct byte buffer (#7550)
* Reuse helper methods from Netty to free direct byte buffer, making codebase JDK 9+ compatible
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* Guard against null
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* fix #7552
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
---------
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
---
.../org/apache/rocketmq/common/UtilAll.java | 60 +------------------
.../apache/rocketmq/common/UtilAllTest.java | 10 ----
2 files changed, 3 insertions(+), 67 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index 95b6b09b4..2808f106a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -16,21 +16,18 @@
*/
package org.apache.rocketmq.common;
+import io.netty.util.internal.PlatformDependent;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.ByteBuffer;
import java.nio.file.Files;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
import java.text.NumberFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -46,15 +43,11 @@ import java.util.function.Supplier;
import java.util.zip.CRC32;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
-import org.apache.commons.lang3.JavaVersion;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
public class UtilAll {
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
@@ -707,57 +700,10 @@ public class UtilAll {
}
public static void cleanBuffer(final ByteBuffer buffer) {
- if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0) {
+ if (null == buffer) {
return;
}
- if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
- try {
- Field field = Unsafe.class.getDeclaredField("theUnsafe");
- field.setAccessible(true);
- Unsafe unsafe = (Unsafe) field.get(null);
- Method cleaner = method(unsafe, "invokeCleaner", new Class[] {ByteBuffer.class});
- cleaner.invoke(unsafe, viewed(buffer));
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- } else {
- invoke(invoke(viewed(buffer), "cleaner"), "clean");
- }
- }
-
- public static Object invoke(final Object target, final String methodName, final Class<?>... args) {
- return AccessController.doPrivileged(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- try {
- Method method = method(target, methodName, args);
- method.setAccessible(true);
- return method.invoke(target);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- });
- }
-
- public static Method method(Object target, String methodName, Class<?>[] args) throws NoSuchMethodException {
- try {
- return target.getClass().getMethod(methodName, args);
- } catch (NoSuchMethodException e) {
- return target.getClass().getDeclaredMethod(methodName, args);
- }
- }
-
- private static ByteBuffer viewed(ByteBuffer buffer) {
- if (!buffer.isDirect()) {
- throw new IllegalArgumentException("buffer is non-direct");
- }
- ByteBuffer viewedBuffer = (ByteBuffer) ((DirectBuffer) buffer).attachment();
- if (viewedBuffer == null) {
- return buffer;
- } else {
- return viewed(viewedBuffer);
- }
+ PlatformDependent.freeDirectBuffer(buffer);
}
public static void ensureDirOK(final String dirName) {
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
index 94bb390eb..cb288578c 100644
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
@@ -219,16 +219,6 @@ public class UtilAllTest {
UtilAll.cleanBuffer(ByteBuffer.allocate(0));
}
- @Test(expected = NoSuchMethodException.class)
- public void testMethod() throws NoSuchMethodException {
- UtilAll.method(new Object(), "noMethod", null);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testInvoke() throws Exception {
- UtilAll.invoke(new Object(), "noMethod");
- }
-
@Test
public void testCalculateFileSizeInPath() throws Exception {
/**
--
2.32.0.windows.2
From 4791d9a1f1a7c39e005da15f228473c04eafd007 Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Tue, 14 Nov 2023 17:59:47 +0800
Subject: [PATCH 2/5] [ISSUE #5923] Revert "Fix tiered store README.md error
about Configuration (#7436)" (#7557)
This reverts commit 70dc93abbcb9bf161378d66fcaca55bedc78b905.
---
.../tieredstore/common/TieredMessageStoreConfig.java | 10 +++++-----
.../tieredstore/provider/posix/PosixFileSegment.java | 4 ++--
.../rocketmq/tieredstore/file/TieredCommitLogTest.java | 2 +-
.../provider/posix/PosixFileSegmentTest.java | 2 +-
4 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
index a112ea6b1..595db6b86 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
@@ -115,7 +115,7 @@ public class TieredMessageStoreConfig {
private long readAheadCacheExpireDuration = 10 * 1000;
private double readAheadCacheSizeThresholdRate = 0.3;
- private String tieredStoreFilepath = "";
+ private String tieredStoreFilePath = "";
private String objectStoreEndpoint = "";
@@ -350,12 +350,12 @@ public class TieredMessageStoreConfig {
this.readAheadCacheSizeThresholdRate = rate;
}
- public String getTieredStoreFilepath() {
- return tieredStoreFilepath;
+ public String getTieredStoreFilePath() {
+ return tieredStoreFilePath;
}
- public void setTieredStoreFilepath(String tieredStoreFilepath) {
- this.tieredStoreFilepath = tieredStoreFilepath;
+ public void setTieredStoreFilePath(String tieredStoreFilePath) {
+ this.tieredStoreFilePath = tieredStoreFilePath;
}
public void setObjectStoreEndpoint(String objectStoreEndpoint) {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
index 708ce33f9..7e949cb28 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -66,8 +66,8 @@ public class PosixFileSegment extends TieredFileSegment {
super(storeConfig, fileType, filePath, baseOffset);
// basePath
- String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilepath(),
- StringUtils.appendIfMissing(storeConfig.getTieredStoreFilepath(), File.separator));
+ String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilePath(),
+ StringUtils.appendIfMissing(storeConfig.getTieredStoreFilePath(), File.separator));
// fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset
String brokerClusterName = storeConfig.getBrokerClusterName();
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
index 80cdba977..6693d3cb7 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
@@ -49,7 +49,7 @@ public class TieredCommitLogTest {
TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig();
storeConfig.setBrokerName("brokerName");
storeConfig.setStorePathRootDir(storePath);
- storeConfig.setTieredStoreFilepath(storePath + File.separator);
+ storeConfig.setTieredStoreFilePath(storePath + File.separator);
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
storeConfig.setCommitLogRollingInterval(0);
storeConfig.setTieredStoreCommitLogMaxSize(1000);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
index ede62b8ce..db33ae847 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
@@ -42,7 +42,7 @@ public class PosixFileSegmentTest {
@Before
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
- storeConfig.setTieredStoreFilepath(storePath);
+ storeConfig.setTieredStoreFilePath(storePath);
mq = new MessageQueue("OSSFileSegmentTest", "broker", 0);
TieredStoreExecutor.init();
}
--
2.32.0.windows.2
From 651a5ca992988b90c7e4884e9975db0938557def Mon Sep 17 00:00:00 2001
From: Jixiang Jin <lollipop@apache.org>
Date: Thu, 16 Nov 2023 10:16:16 +0800
Subject: [PATCH 3/5] [ISSUE #7562] BugFix for estimating message accumulation
correctly (#7563)
---
.../broker/metrics/ConsumerLagCalculator.java | 11 +++++---
.../proxy/common/utils/FilterUtilTest.java | 25 +++++++++++++++++++
.../remoting/protocol/filter/FilterAPI.java | 8 ++++++
3 files changed, 40 insertions(+), 4 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index 7a5f1f765..af08a83c7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.subscription.SimpleSubscriptionData;
@@ -435,10 +436,12 @@ public class ConsumerLagCalculator {
if (subscriptionGroupConfig != null) {
for (SimpleSubscriptionData simpleSubscriptionData : subscriptionGroupConfig.getSubscriptionDataSet()) {
if (topic.equals(simpleSubscriptionData.getTopic())) {
- subscriptionData = new SubscriptionData();
- subscriptionData.setTopic(simpleSubscriptionData.getTopic());
- subscriptionData.setExpressionType(simpleSubscriptionData.getExpressionType());
- subscriptionData.setSubString(simpleSubscriptionData.getExpression());
+ try {
+ subscriptionData = FilterAPI.buildSubscriptionData(simpleSubscriptionData.getTopic(),
+ simpleSubscriptionData.getExpression(), simpleSubscriptionData.getExpressionType());
+ } catch (Exception e) {
+ LOGGER.error("Try to build subscription for group:{}, topic:{} exception.", group, topic, e);
+ }
break;
}
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
index 23389e9d3..7c9d84015 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
@@ -48,4 +48,29 @@ public class FilterUtilTest {
assertThat(FilterUtils.isTagMatched(subscriptionData.getTagsSet(), null)).isFalse();
}
+ @Test
+ public void testBuildSubscriptionData() throws Exception {
+ // Test case 1: expressionType is null, will use TAG as default.
+ String topic = "topic";
+ String subString = "substring";
+ String expressionType = null;
+ SubscriptionData result = FilterAPI.buildSubscriptionData(topic, subString, expressionType);
+ assertThat(result).isNotNull();
+ assertThat(topic).isEqualTo(result.getTopic());
+ assertThat(subString).isEqualTo(result.getSubString());
+ assertThat(result.getExpressionType()).isEqualTo("TAG");
+ assertThat(result.getCodeSet().size()).isEqualTo(1);
+
+ // Test case 2: expressionType is not null
+ topic = "topic";
+ subString = "substring1||substring2";
+ expressionType = "SQL92";
+ result = FilterAPI.buildSubscriptionData(topic, subString, expressionType);
+ assertThat(result).isNotNull();
+ assertThat(topic).isEqualTo(result.getTopic());
+ assertThat(subString).isEqualTo(result.getSubString());
+ assertThat(result.getExpressionType()).isEqualTo(expressionType);
+ assertThat(result.getCodeSet().size()).isEqualTo(2);
+ }
+
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
index 10a6bb463..f291bfccf 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
@@ -46,6 +46,14 @@ public class FilterAPI {
return subscriptionData;
}
+ public static SubscriptionData buildSubscriptionData(String topic, String subString, String expressionType) throws Exception {
+ final SubscriptionData subscriptionData = buildSubscriptionData(topic, subString);
+ if (StringUtils.isNotBlank(expressionType)) {
+ subscriptionData.setExpressionType(expressionType);
+ }
+ return subscriptionData;
+ }
+
public static SubscriptionData build(final String topic, final String subString,
final String type) throws Exception {
if (ExpressionType.TAG.equals(type) || type == null) {
--
2.32.0.windows.2
From 01a2aef96bdfb17c5f82415141ef421efb4e3bc7 Mon Sep 17 00:00:00 2001
From: cnScarb <jjhfen00@163.com>
Date: Fri, 17 Nov 2023 15:58:14 +0800
Subject: [PATCH 4/5] [ISSUE #7570] Add default value for lastPopTimestamp
(#7571)
---
.../apache/rocketmq/client/impl/consumer/PopProcessQueue.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java
index 3b39b86cc..50827545b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java
@@ -26,7 +26,7 @@ public class PopProcessQueue {
private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
- private long lastPopTimestamp;
+ private long lastPopTimestamp = System.currentTimeMillis();
private AtomicInteger waitAckCounter = new AtomicInteger(0);
private volatile boolean dropped = false;
--
2.32.0.windows.2
From 8e7e2b5f50e0db14b77462ef1574d4020c0fd986 Mon Sep 17 00:00:00 2001
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
Date: Mon, 20 Nov 2023 19:32:57 +0800
Subject: [PATCH 5/5] [ISSUE #7574] Fix RunningFlags conflict
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
---
store/src/main/java/org/apache/rocketmq/store/RunningFlags.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
index 91fcb155a..88b398a77 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
@@ -30,7 +30,7 @@ public class RunningFlags {
private static final int FENCED_BIT = 1 << 5;
- private static final int LOGIC_DISK_FULL_BIT = 1 << 5;
+ private static final int LOGIC_DISK_FULL_BIT = 1 << 6;
private volatile int flagBits = 0;
--
2.32.0.windows.2

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,845 @@
From ca721b0145994d7f5e67b4d2fe3b7a4ad7a1c132 Mon Sep 17 00:00:00 2001
From: zhanghong <985492783@qq.com>
Date: Tue, 21 Nov 2023 14:03:24 +0800
Subject: [PATCH 1/3] [ISSUE #7462] Remove deprecated LocalTransactionExecuter
(#7463)
---
.../impl/producer/DefaultMQProducerImpl.java | 9 +++----
.../client/producer/DefaultMQProducer.java | 16 -----------
.../producer/LocalTransactionExecuter.java | 27 -------------------
.../rocketmq/client/producer/MQProducer.java | 3 ---
.../producer/TransactionMQProducer.java | 16 -----------
.../client.producer.DefaultMQProducer.schema | 1 -
6 files changed, 4 insertions(+), 68 deletions(-)
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index b0c212e46..545f17d93 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -54,7 +54,6 @@ import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.client.latency.Resolver;
import org.apache.rocketmq.client.latency.ServiceDetector;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.RequestCallback;
@@ -1379,10 +1378,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public TransactionSendResult sendMessageInTransaction(final Message msg,
- final LocalTransactionExecuter localTransactionExecuter, final Object arg)
+ final TransactionListener localTransactionListener, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
- if (null == localTransactionExecuter && null == transactionListener) {
+ if (null == localTransactionListener && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
@@ -1414,8 +1413,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
- if (null != localTransactionExecuter) {
- localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
+ if (null != localTransactionListener) {
+ localTransactionState = localTransactionListener.executeLocalTransaction(msg, arg);
} else {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index c5b1b5223..7bd3876f5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -853,22 +853,6 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
}
- /**
- * This method is to send transactional messages.
- *
- * @param msg Transactional message to send.
- * @param tranExecuter local transaction executor.
- * @param arg Argument used along with local transaction executor.
- * @return Transaction result.
- * @throws MQClientException if there is any client error.
- */
- @Override
- public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter,
- final Object arg)
- throws MQClientException {
- throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
- }
-
/**
* This method is used to send transactional messages.
*
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
deleted file mode 100644
index 267ba10bd..000000000
--- a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.producer;
-
-import org.apache.rocketmq.common.message.Message;
-
-/**
- * @deprecated This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended.
- */
-@Deprecated
-public interface LocalTransactionExecuter {
- LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index 78657e623..8bd30e98d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -81,9 +81,6 @@ public interface MQProducer extends MQAdmin {
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, InterruptedException;
- TransactionSendResult sendMessageInTransaction(final Message msg,
- final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
-
TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException;
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
index baa8b4408..d529f3e77 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
@@ -67,22 +67,6 @@ public class TransactionMQProducer extends DefaultMQProducer {
this.defaultMQProducerImpl.destroyTransactionEnv();
}
- /**
- * This method will be removed in the version 5.0.0, method <code>sendMessageInTransaction(Message,Object)</code>}
- * is recommended.
- */
- @Override
- @Deprecated
- public TransactionSendResult sendMessageInTransaction(final Message msg,
- final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
- if (null == this.transactionCheckListener) {
- throw new MQClientException("localTransactionBranchCheckListener is null", null);
- }
-
- msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
- return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
- }
-
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
diff --git a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema
index 0418c73fe..d1111fb45 100644
--- a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema
+++ b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema
@@ -122,7 +122,6 @@ Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq
Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (void)
Method send(org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.SendResult)
Method send(org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (org.apache.rocketmq.client.producer.SendResult)
-Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.client.producer.LocalTransactionExecuter,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult)
Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult)
Method sendOneway(java.lang.Object,org.apache.rocketmq.client.producer.MessageQueueSelector,org.apache.rocketmq.common.message.Message) : public throws (void)
Method sendOneway(org.apache.rocketmq.common.message.Message) : public throws (void)
--
2.32.0.windows.2
From a7d493b2fbc153cc6cbdf2b2ffcbf19cf7cba803 Mon Sep 17 00:00:00 2001
From: panzhi <panzhi33@qq.com>
Date: Tue, 21 Nov 2023 20:55:35 +0800
Subject: [PATCH 2/3] transactionProducer get the topic route before sending
the message (#7569)
---
.../impl/producer/DefaultMQProducerImpl.java | 15 +++++
.../client/producer/DefaultMQProducer.java | 63 +++++++++++++++++++
.../producer/TransactionMQProducer.java | 23 +++++--
.../transaction/TransactionProducer.java | 3 +-
4 files changed, 98 insertions(+), 6 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 545f17d93..088bff089 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -262,6 +262,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
mQClientFactory.start();
}
+ this.initTopicRoute();
+
this.mqFaultStrategy.startDetector();
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
@@ -1740,6 +1742,19 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
}
+ private void initTopicRoute() {
+ List<String> topics = this.defaultMQProducer.getTopics();
+ if (topics != null && topics.size() > 0) {
+ topics.forEach(topic -> {
+ String newTopic = NamespaceUtil.wrapNamespace(this.defaultMQProducer.getNamespace(), topic);
+ TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(newTopic);
+ if (topicPublishInfo == null || !topicPublishInfo.ok()) {
+ log.warn("No route info of this topic: " + newTopic + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO));
+ }
+ });
+ }
+ }
+
public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
return topicPublishInfoTable;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 7bd3876f5..700e00aac 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -87,6 +87,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
private String producerGroup;
+ /**
+ * Topics that need to be initialized for transaction producer
+ */
+ private List<String> topics;
+
/**
* Just for testing or demo program
*/
@@ -235,6 +240,22 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
+ /**
+ * Constructor specifying namespace, producer group, topics and RPC hook.
+ *
+ * @param namespace Namespace for this MQ Producer instance.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param topics Topic that needs to be initialized for routing
+ * @param rpcHook RPC hook to execute per each remoting command execution.
+ */
+ public DefaultMQProducer(final String namespace, final String producerGroup, final List<String> topics, RPCHook rpcHook) {
+ this.namespace = namespace;
+ this.producerGroup = producerGroup;
+ this.topics = topics;
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
+ }
+
/**
* Constructor specifying producer group and enabled msg trace flag.
*
@@ -290,6 +311,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
}
+ /**
+ * Constructor specifying namespace, producer group, topics, RPC hook, enabled msgTrace flag and customized trace topic
+ * name.
+ *
+ * @param namespace Namespace for this MQ Producer instance.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param topics Topic that needs to be initialized for routing
+ * @param rpcHook RPC hook to execute per each remoting command execution.
+ * @param enableMsgTrace Switch flag instance for message trace.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+ * trace topic name.
+ */
+ public DefaultMQProducer(final String namespace, final String producerGroup, final List<String> topics,
+ RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
+ this.namespace = namespace;
+ this.producerGroup = producerGroup;
+ this.topics = topics;
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
+ //if client open the message trace feature
+ if (enableMsgTrace) {
+ try {
+ AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
+ dispatcher.setHostProducer(this.defaultMQProducerImpl);
+ traceDispatcher = dispatcher;
+ this.defaultMQProducerImpl.registerSendMessageHook(
+ new SendMessageTraceHookImpl(traceDispatcher));
+ this.defaultMQProducerImpl.registerEndTransactionHook(
+ new EndTransactionTraceHookImpl(traceDispatcher));
+ } catch (Throwable e) {
+ logger.error("system mqtrace hook init failed ,maybe can't send msg trace data");
+ }
+ }
+ }
+
@Override
public void setUseTLS(boolean useTLS) {
super.setUseTLS(useTLS);
@@ -1316,4 +1372,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize);
}
+ public List<String> getTopics() {
+ return topics;
+ }
+
+ public void setTopics(List<String> topics) {
+ this.topics = topics;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
index d529f3e77..2c3b479f7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.producer;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
@@ -36,19 +37,31 @@ public class TransactionMQProducer extends DefaultMQProducer {
}
public TransactionMQProducer(final String producerGroup) {
- this(null, producerGroup, null);
+ this(null, producerGroup, null, null);
+ }
+
+ public TransactionMQProducer(final String producerGroup, final List<String> topics) {
+ this(null, producerGroup, topics, null);
}
public TransactionMQProducer(final String namespace, final String producerGroup) {
- this(namespace, producerGroup, null);
+ this(namespace, producerGroup, null, null);
+ }
+
+ public TransactionMQProducer(final String namespace, final String producerGroup, final List<String> topics) {
+ this(namespace, producerGroup, topics, null);
}
public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
- this(null, producerGroup, rpcHook);
+ this(null, producerGroup, null, rpcHook);
+ }
+
+ public TransactionMQProducer(final String producerGroup, final List<String> topics, RPCHook rpcHook) {
+ this(null, producerGroup, topics, rpcHook);
}
- public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
- super(namespace, producerGroup, rpcHook);
+ public TransactionMQProducer(final String namespace, final String producerGroup, final List<String> topics, RPCHook rpcHook) {
+ super(namespace, producerGroup, topics, rpcHook);
}
public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
index 5973c3c30..d1d57c55e 100644
--- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -39,7 +40,7 @@ public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
- TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
+ TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, Arrays.asList(TOPIC));
// Uncomment the following line while debugging, namesrvAddr should be set to your local address
// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
--
2.32.0.windows.2
From 5b43387be33506e4c19df4783724d06b1dfdc062 Mon Sep 17 00:00:00 2001
From: Zhouxiang Zhan <zhouxzhan@apache.org>
Date: Thu, 23 Nov 2023 14:53:48 +0800
Subject: [PATCH 3/3] [ISSUE #7543] Retry topic v2 in pop (#7544)
* Implement pop retry topic v2
* Use pop retry topic v2 to notify the origin topic
* add parse group
* retry topic v2 compatibility
* calculate consumer lag
* delete retry topic
---
.../acl/plain/PlainAccessResource.java | 3 +-
.../ExpressionForRetryMessageFilter.java | 3 +-
.../NotifyMessageArrivingListener.java | 3 +-
.../longpolling/PopLongPollingService.java | 10 +++
.../broker/metrics/ConsumerLagCalculator.java | 11 ++++
.../processor/AdminBrokerProcessor.java | 4 ++
.../processor/NotificationProcessor.java | 2 +-
.../broker/processor/PopMessageProcessor.java | 24 ++++++-
.../broker/processor/PopReviveService.java | 9 ---
.../processor/SendMessageProcessor.java | 3 +-
.../apache/rocketmq/common/BrokerConfig.java | 10 +++
.../apache/rocketmq/common/KeyBuilder.java | 37 +++++++++--
.../rocketmq/common/KeyBuilderTest.java | 65 +++++++++++++++++++
.../consumer/ConsumerProgressSubCommand.java | 3 +-
.../tools/monitor/MonitorService.java | 3 +-
15 files changed, 168 insertions(+), 22 deletions(-)
create mode 100644 common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
index 72aa8ca71..1e185afff 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
@@ -48,6 +48,7 @@ import org.apache.rocketmq.acl.common.AuthenticationHeader;
import org.apache.rocketmq.acl.common.AuthorizationHeader;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
@@ -341,7 +342,7 @@ public class PlainAccessResource implements AccessResource {
if (retryTopic == null) {
return null;
}
- return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ return KeyBuilder.parseGroup(retryTopic);
}
public static String getRetryTopic(String group) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
index bc01b21cb..cc3e37bf4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.filter;
import java.nio.ByteBuffer;
import java.util.Map;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageConst;
@@ -62,7 +63,7 @@ public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter {
tempProperties = MessageDecoder.decodeProperties(msgBuffer);
}
String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC);
- String group = subscriptionData.getTopic().substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ String group = KeyBuilder.parseGroup(subscriptionData.getTopic());
realFilterData = this.consumerFilterManager.get(realTopic, group);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
index 3c099fe2f..e55ed2778 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
@@ -17,12 +17,11 @@
package org.apache.rocketmq.broker.longpolling;
+import java.util.Map;
import org.apache.rocketmq.broker.processor.NotificationProcessor;
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
import org.apache.rocketmq.store.MessageArrivingListener;
-import java.util.Map;
-
public class NotifyMessageArrivingListener implements MessageArrivingListener {
private final PullRequestHoldService pullRequestHoldService;
private final PopMessageProcessor popMessageProcessor;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index 113c91297..f1bc9adc4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -144,6 +144,16 @@ public class PopLongPollingService extends ServiceThread {
}
}
+ public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId) {
+ String notifyTopic;
+ if (KeyBuilder.isPopRetryTopicV2(topic)) {
+ notifyTopic = KeyBuilder.parseNormalTopic(topic);
+ } else {
+ notifyTopic = topic;
+ }
+ notifyMessageArriving(notifyTopic, queueId);
+ }
+
public void notifyMessageArriving(final String topic, final int queueId) {
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
if (cids == null) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index af08a83c7..d1f3fffde 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -185,6 +185,17 @@ public class ConsumerLagCalculator {
continue;
}
}
+ if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
+ String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
+ TopicConfig retryTopicConfigV1 = topicConfigManager.selectTopicConfig(retryTopicV1);
+ if (retryTopicConfigV1 != null) {
+ int retryTopicPerm = retryTopicConfigV1.getPerm() & brokerConfig.getBrokerPermission();
+ if (PermName.isReadable(retryTopicPerm) || PermName.isWriteable(retryTopicPerm)) {
+ consumer.accept(new ProcessGroupInfo(group, topic, true, retryTopicV1));
+ continue;
+ }
+ }
+ }
consumer.accept(new ProcessGroupInfo(group, topic, true, null));
} else {
consumer.accept(new ProcessGroupInfo(group, topic, false, null));
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index fbba6633b..863b275d1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -548,6 +548,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) {
deleteTopicInBroker(popRetryTopic);
}
+ final String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
+ if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != null) {
+ deleteTopicInBroker(popRetryTopicV1);
+ }
}
// delete topic
deleteTopicInBroker(topic);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index a15340383..91d275dfe 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -58,7 +58,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
}
public void notifyMessageArriving(final String topic, final int queueId) {
- popLongPollingService.notifyMessageArriving(topic, queueId);
+ popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId);
}
@Override
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 7ed4d53ab..58baecc05 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -185,7 +185,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
}
public void notifyMessageArriving(final String topic, final int queueId) {
- popLongPollingService.notifyMessageArriving(topic, queueId);
+ popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId);
}
public boolean notifyMessageArriving(final String topic, final String cid, final int queueId) {
@@ -364,6 +364,17 @@ public class PopMessageProcessor implements NettyRequestProcessor {
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
}
+ if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
+ TopicConfig retryTopicConfigV1 =
+ this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
+ if (retryTopicConfigV1 != null) {
+ for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) {
+ int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums();
+ getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
+ startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
+ }
+ }
+ }
}
if (requestHeader.getQueueId() < 0) {
// read all queue
@@ -388,6 +399,17 @@ public class PopMessageProcessor implements NettyRequestProcessor {
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
}
+ if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
+ TopicConfig retryTopicConfigV1 =
+ this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
+ if (retryTopicConfigV1 != null) {
+ for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) {
+ int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums();
+ getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
+ startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
+ }
+ }
+ }
}
final RemotingCommand finalResponse = response;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 3fb689ed6..8d25bc57e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -142,15 +142,6 @@ public class PopReviveService extends ServiceThread {
this.brokerController.getBrokerStatsManager().incBrokerPutNums(popCheckPoint.getTopic(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
- if (brokerController.getPopMessageProcessor() != null) {
- brokerController.getPopMessageProcessor().notifyMessageArriving(
- KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
- popCheckPoint.getCId(),
- -1
- );
- brokerController.getNotificationProcessor().notifyMessageArriving(
- KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);
- }
return true;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 956ef43fb..4ec84c146 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.AbortProcessException;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -178,7 +179,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
MessageExt msg, TopicConfig topicConfig, Map<String, String> properties) {
String newTopic = requestHeader.getTopic();
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ String groupName = KeyBuilder.parseGroup(newTopic);
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
if (null == subscriptionGroupConfig) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 0d248c4e1..c186352d1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -223,6 +223,8 @@ public class BrokerConfig extends BrokerIdentity {
private boolean enablePopBatchAck = false;
private boolean enableNotifyAfterPopOrderLockRelease = true;
private boolean initPopOffsetByCheckMsgInMem = true;
+ // read message from pop retry topic v1, for the compatibility, will be removed in the future version
+ private boolean retrieveMessageFromPopRetryTopicV1 = true;
private boolean realTimeNotifyConsumerChange = true;
@@ -1284,6 +1286,14 @@ public class BrokerConfig extends BrokerIdentity {
this.initPopOffsetByCheckMsgInMem = initPopOffsetByCheckMsgInMem;
}
+ public boolean isRetrieveMessageFromPopRetryTopicV1() {
+ return retrieveMessageFromPopRetryTopicV1;
+ }
+
+ public void setRetrieveMessageFromPopRetryTopicV1(boolean retrieveMessageFromPopRetryTopicV1) {
+ this.retrieveMessageFromPopRetryTopicV1 = retrieveMessageFromPopRetryTopicV1;
+ }
+
public boolean isRealTimeNotifyConsumerChange() {
return realTimeNotifyConsumerChange;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
index e1532d939..f2a8c4089 100644
--- a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
@@ -18,24 +18,53 @@ package org.apache.rocketmq.common;
public class KeyBuilder {
public static final int POP_ORDER_REVIVE_QUEUE = 999;
+ private static final String POP_RETRY_SEPARATOR_V1 = "_";
+ private static final String POP_RETRY_SEPARATOR_V2 = ":";
public static String buildPopRetryTopic(String topic, String cid) {
- return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_" + topic;
+ return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2 + topic;
+ }
+
+ public static String buildPopRetryTopicV1(String topic, String cid) {
+ return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V1 + topic;
}
public static String parseNormalTopic(String topic, String cid) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_").length());
+ if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2)) {
+ return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2).length());
+ }
+ return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V1).length());
} else {
return topic;
}
}
+ public static String parseNormalTopic(String retryTopic) {
+ if (isPopRetryTopicV2(retryTopic)) {
+ String[] result = retryTopic.split(POP_RETRY_SEPARATOR_V2);
+ if (result.length == 2) {
+ return result[1];
+ }
+ }
+ return retryTopic;
+ }
+
+ public static String parseGroup(String retryTopic) {
+ if (isPopRetryTopicV2(retryTopic)) {
+ String[] result = retryTopic.split(POP_RETRY_SEPARATOR_V2);
+ if (result.length == 2) {
+ return result[0].substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ }
+ }
+ return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ }
+
public static String buildPollingKey(String topic, String cid, int queueId) {
return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT + queueId;
}
- public static String buildPollingNotificationKey(String topic, int queueId) {
- return topic + PopAckConstants.SPLIT + queueId;
+ public static boolean isPopRetryTopicV2(String retryTopic) {
+ return retryTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && retryTopic.contains(POP_RETRY_SEPARATOR_V2);
}
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
new file mode 100644
index 000000000..f83e0aa14
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class KeyBuilderTest {
+ String topic = "test-topic";
+ String group = "test-group";
+
+ @Test
+ public void buildPopRetryTopic() {
+ assertThat(KeyBuilder.buildPopRetryTopic(topic, group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + ":" + topic);
+ }
+
+ @Test
+ public void buildPopRetryTopicV1() {
+ assertThat(KeyBuilder.buildPopRetryTopicV1(topic, group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + "_" + topic);
+ }
+
+ @Test
+ public void parseNormalTopic() {
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+ assertThat(KeyBuilder.parseNormalTopic(popRetryTopic, group)).isEqualTo(topic);
+ String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
+ assertThat(KeyBuilder.parseNormalTopic(popRetryTopicV1, group)).isEqualTo(topic);
+ }
+
+ @Test
+ public void testParseNormalTopic() {
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+ assertThat(KeyBuilder.parseNormalTopic(popRetryTopic)).isEqualTo(topic);
+ }
+
+ @Test
+ public void parseGroup() {
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+ assertThat(KeyBuilder.parseGroup(popRetryTopic)).isEqualTo(group);
+ }
+
+ @Test
+ public void isPopRetryTopicV2() {
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+ assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopic)).isEqualTo(true);
+ String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
+ assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopicV1)).isEqualTo(false);
+ }
+}
\ No newline at end of file
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
index 97125b854..c489cad68 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
@@ -212,7 +213,7 @@ public class ConsumerProgressSubCommand implements SubCommand {
TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
for (String topic : topicList.getTopicList()) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ String consumerGroup = KeyBuilder.parseGroup(topic);
try {
ConsumeStats consumeStats = null;
try {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
index 45dc3a036..b66dfad20 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -172,7 +173,7 @@ public class MonitorService {
TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
for (String topic : topicList.getTopicList()) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ String consumerGroup = KeyBuilder.parseGroup(topic);
try {
this.reportUndoneMsgs(consumerGroup);
--
2.32.0.windows.2

View File

@ -0,0 +1,31 @@
From e955e4399ceed5b5a1fbadc400883cfc5f99e726 Mon Sep 17 00:00:00 2001
From: AYue <40812847+AYue-94@users.noreply.github.com>
Date: Fri, 24 Nov 2023 10:47:08 +0800
Subject: [PATCH] [ISSUE #7577] SlaveActingMaster Timer Message retry without
escape logic (#7578)
Co-authored-by: ayue <ericyu0421@163.com>
---
.../org/apache/rocketmq/store/timer/TimerMessageStore.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 3ab51a26d..d796e4467 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -1105,7 +1105,11 @@ public class TimerMessageStore {
}
}
Thread.sleep(50);
- putMessageResult = messageStore.putMessage(message);
+ if (escapeBridgeHook != null) {
+ putMessageResult = escapeBridgeHook.apply(message);
+ } else {
+ putMessageResult = messageStore.putMessage(message);
+ }
LOGGER.warn("Retrying to do put timer msg retryNum:{} putRes:{} msg:{}", retryNum, putMessageResult, message);
}
return PUT_NO_RETRY;
--
2.32.0.windows.2

View File

@ -0,0 +1,562 @@
From 9cfe724e6a188ea444c90ee00f2453da1b807bfa Mon Sep 17 00:00:00 2001
From: dinglei <libya_003@163.com>
Date: Tue, 28 Nov 2023 10:04:17 +0800
Subject: [PATCH 1/3] Add validation in broker/namesrv configure updating
command (#7584)
* Add validation for keys in black list in mqadmin command.
* Cancel validation for keys in black list in putKV command.
---
.../processor/AdminBrokerProcessor.java | 25 ++++++++++++++--
.../processor/AdminBrokerProcessorTest.java | 12 +++++++-
.../apache/rocketmq/common/BrokerConfig.java | 11 +++++++
.../rocketmq/common/ControllerConfig.java | 11 +++++++
.../common/namesrv/NamesrvConfig.java | 10 +++++++
.../processor/ControllerRequestProcessor.java | 27 +++++++++++++----
.../ControllerRequestProcessorTest.java | 23 +++++++++++++-
.../processor/DefaultRequestProcessor.java | 30 +++++++++++++++++--
.../processor/RequestProcessorTest.java | 15 ++++++++--
9 files changed, 149 insertions(+), 15 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 863b275d1..978c2e81d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -25,6 +25,7 @@ import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -193,9 +194,19 @@ import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorRe
public class AdminBrokerProcessor implements NettyRequestProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
protected final BrokerController brokerController;
+ protected Set<String> configBlackList = new HashSet<>();
public AdminBrokerProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
+ initConfigBlackList();
+ }
+
+ private void initConfigBlackList() {
+ configBlackList.add("brokerConfigPath");
+ configBlackList.add("rocketmqHome");
+ configBlackList.add("configBlackList");
+ String[] configArray = brokerController.getBrokerConfig().getConfigBlackList().split(";");
+ configBlackList.addAll(Arrays.asList(configArray));
}
@Override
@@ -919,10 +930,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
Properties properties = MixAll.string2Properties(bodyStr);
if (properties != null) {
LOGGER.info("updateBrokerConfig, new config: [{}] client: {} ", properties, callerAddress);
-
- if (properties.containsKey("brokerConfigPath")) {
+ if (validateBlackListConfigExist(properties)) {
response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark("Can not update config path");
+ response.setRemark("Can not update config in black list.");
return response;
}
@@ -2796,4 +2806,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
return false;
}
+
+ private boolean validateBlackListConfigExist(Properties properties) {
+ for (String blackConfig:configBlackList) {
+ if (properties.containsKey(blackConfig)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index ec252cece..c6b889bae 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -370,8 +370,18 @@ public class AdminBrokerProcessorTest {
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
- assertThat(response.getRemark()).contains("Can not update config path");
+ assertThat(response.getRemark()).contains("Can not update config in black list.");
+ //update disallowed value
+ properties.clear();
+ properties.setProperty("configBlackList", "test;path");
+ updateConfigRequest.setBody(MixAll.properties2String(properties).getBytes(StandardCharsets.UTF_8));
+
+ response = adminBrokerProcessor.processRequest(ctx, updateConfigRequest);
+
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
+ assertThat(response.getRemark()).contains("Can not update config in black list.");
}
@Test
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index c186352d1..96e0f8e91 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -406,6 +406,17 @@ public class BrokerConfig extends BrokerIdentity {
private int splitRegistrationSize = 800;
+ /**
+ * Config in this black list will be not allowed to update by command.
+ * Try to update this config black list by restart process.
+ * Try to update configures in black list by restart process.
+ */
+ private String configBlackList = "configBlackList;brokerConfigPath";
+
+ public String getConfigBlackList() {
+ return configBlackList;
+ }
+
public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
index 1e9c80b22..55854cfd2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
@@ -83,6 +83,17 @@ public class ControllerConfig {
private boolean metricsInDelta = false;
+ /**
+ * Config in this black list will be not allowed to update by command.
+ * Try to update this config black list by restart process.
+ * Try to update configures in black list by restart process.
+ */
+ private String configBlackList = "configBlackList;configStorePath";
+
+ public String getConfigBlackList() {
+ return configBlackList;
+ }
+
public String getRocketmqHome() {
return rocketmqHome;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
index 5b8a6dedb..b82d1b8f8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
@@ -90,6 +90,16 @@ public class NamesrvConfig {
* 2. This flag does not support static topic currently.
*/
private boolean deleteTopicWithBrokerRegistration = false;
+ /**
+ * Config in this black list will be not allowed to update by command.
+ * Try to update this config black list by restart process.
+ * Try to update configures in black list by restart process.
+ */
+ private String configBlackList = "configBlackList;configStorePath;kvConfigPath";
+
+ public String getConfigBlackList() {
+ return configBlackList;
+ }
public boolean isOrderMessageEnable() {
return orderMessageEnable;
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 93ecbbd9d..a8a3d2587 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -20,8 +20,11 @@ import com.google.common.base.Stopwatch;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.common.Attributes;
import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -73,12 +76,20 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
private static final int WAIT_TIMEOUT_OUT = 5;
private final ControllerManager controllerManager;
private final BrokerHeartbeatManager heartbeatManager;
+ protected Set<String> configBlackList = new HashSet<>();
public ControllerRequestProcessor(final ControllerManager controllerManager) {
this.controllerManager = controllerManager;
this.heartbeatManager = controllerManager.getHeartbeatManager();
+ initConfigBlackList();
+ }
+ private void initConfigBlackList() {
+ configBlackList.add("configBlackList");
+ configBlackList.add("configStorePath");
+ configBlackList.add("rocketmqHome");
+ String[] configArray = controllerManager.getControllerConfig().getConfigBlackList().split(";");
+ configBlackList.addAll(Arrays.asList(configArray));
}
-
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
if (ctx != null) {
@@ -280,10 +291,9 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
response.setRemark("string2Properties error");
return response;
}
-
- if (properties.containsKey("configStorePath")) {
+ if (validateBlackListConfigExist(properties)) {
response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark("Can not update config path");
+ response.setRemark("Can not update config in black list.");
return response;
}
@@ -319,5 +329,12 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
public boolean rejectRequest() {
return false;
}
-
+ private boolean validateBlackListConfigExist(Properties properties) {
+ for (String blackConfig : configBlackList) {
+ if (properties.containsKey(blackConfig)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/ControllerRequestProcessorTest.java b/controller/src/test/java/org/apache/rocketmq/controller/ControllerRequestProcessorTest.java
index ede6ca36a..46f86ad32 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/ControllerRequestProcessorTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/ControllerRequestProcessorTest.java
@@ -64,7 +64,28 @@ public class ControllerRequestProcessorTest {
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
- assertThat(response.getRemark()).contains("Can not update config path");
+ assertThat(response.getRemark()).contains("Can not update config in black list.");
+ // Update disallowed value
+ properties.clear();
+ properties.setProperty("rocketmqHome", "test/path");
+ updateConfigRequest.setBody(MixAll.properties2String(properties).getBytes(StandardCharsets.UTF_8));
+
+ response = controllerRequestProcessor.processRequest(null, updateConfigRequest);
+
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
+ assertThat(response.getRemark()).contains("Can not update config in black list.");
+
+ // Update disallowed value
+ properties.clear();
+ properties.setProperty("configBlackList", "test;path");
+ updateConfigRequest.setBody(MixAll.properties2String(properties).getBytes(StandardCharsets.UTF_8));
+
+ response = controllerRequestProcessor.processRequest(null, updateConfigRequest);
+
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
+ assertThat(response.getRemark()).contains("Can not update config in black list.");
}
}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 485b95c42..2daa95b9b 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -18,8 +18,11 @@ package org.apache.rocketmq.namesrv.processor;
import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MQVersion;
@@ -71,8 +74,20 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
protected final NamesrvController namesrvController;
+ protected Set<String> configBlackList = new HashSet<>();
+
public DefaultRequestProcessor(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
+ initConfigBlackList();
+ }
+
+ private void initConfigBlackList() {
+ configBlackList.add("configBlackList");
+ configBlackList.add("configStorePath");
+ configBlackList.add("kvConfigPath");
+ configBlackList.add("rocketmqHome");
+ String[] configArray = namesrvController.getNamesrvConfig().getConfigBlackList().split(";");
+ configBlackList.addAll(Arrays.asList(configArray));
}
@Override
@@ -153,6 +168,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
response.setRemark("namespace or key is null");
return response;
}
+
this.namesrvController.getKvConfigManager().putKVConfig(
requestHeader.getNamespace(),
requestHeader.getKey(),
@@ -623,10 +639,9 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
response.setRemark("string2Properties error");
return response;
}
-
- if (properties.containsKey("kvConfigPath") || properties.containsKey("configStorePath")) {
+ if (validateBlackListConfigExist(properties)) {
response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark("Can not update config path");
+ response.setRemark("Can not update config in black list.");
return response;
}
@@ -658,4 +673,13 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
+ private boolean validateBlackListConfigExist(Properties properties) {
+ for (String blackConfig : configBlackList) {
+ if (properties.containsKey(blackConfig)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
}
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
index 5bdf96d9d..2b2cf6294 100644
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
@@ -203,7 +203,7 @@ public class RequestProcessorTest {
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
- assertThat(response.getRemark()).contains("Can not update config path");
+ assertThat(response.getRemark()).contains("Can not update config in black list.");
//update disallowed values
properties.clear();
@@ -214,7 +214,18 @@ public class RequestProcessorTest {
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
- assertThat(response.getRemark()).contains("Can not update config path");
+ assertThat(response.getRemark()).contains("Can not update config in black list");
+
+ //update disallowed values
+ properties.clear();
+ properties.setProperty("configBlackList", "test;path");
+ updateConfigRequest.setBody(MixAll.properties2String(properties).getBytes(StandardCharsets.UTF_8));
+
+ response = defaultRequestProcessor.processRequest(null, updateConfigRequest);
+
+ assertThat(response).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
+ assertThat(response.getRemark()).contains("Can not update config in black list");
}
@Test
--
2.32.0.windows.2
From 430ee0a755daf867de31e37b12df417f64811b3a Mon Sep 17 00:00:00 2001
From: rongtong <jinrongtong5@163.com>
Date: Tue, 28 Nov 2023 16:11:14 +0800
Subject: [PATCH 2/3] Add validation in broker container configure updating
command. (#7587)
---
.../container/BrokerContainerConfig.java | 16 ++++++++
.../container/BrokerContainerProcessor.java | 40 +++++++++++++++++--
2 files changed, 52 insertions(+), 4 deletions(-)
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
index e03b10c34..03b4b263f 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
@@ -49,6 +49,14 @@ public class BrokerContainerConfig {
*/
private long updateNamesrvAddrInterval = 60 * 2 * 1000;
+
+ /**
+ * Config in this black list will be not allowed to update by command.
+ * Try to update this config black list by restart process.
+ * Try to update configures in black list by restart process.
+ */
+ private String configBlackList = "configBlackList;brokerConfigPaths";
+
public String getRocketmqHome() {
return rocketmqHome;
}
@@ -108,4 +116,12 @@ public class BrokerContainerConfig {
public void setUpdateNamesrvAddrInterval(long updateNamesrvAddrInterval) {
this.updateNamesrvAddrInterval = updateNamesrvAddrInterval;
}
+
+ public String getConfigBlackList() {
+ return configBlackList;
+ }
+
+ public void setConfigBlackList(String configBlackList) {
+ this.configBlackList = configBlackList;
+ }
}
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
index 5b825fe81..5ced08257 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
@@ -19,6 +19,9 @@ package org.apache.rocketmq.container;
import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
import java.util.List;
import java.util.Properties;
import org.apache.rocketmq.broker.BrokerController;
@@ -45,8 +48,19 @@ public class BrokerContainerProcessor implements NettyRequestProcessor {
private final BrokerContainer brokerContainer;
private List<BrokerBootHook> brokerBootHookList;
+ private final Set<String> configBlackList = new HashSet<>();
+
public BrokerContainerProcessor(BrokerContainer brokerContainer) {
this.brokerContainer = brokerContainer;
+ initConfigBlackList();
+ }
+
+ private void initConfigBlackList() {
+ configBlackList.add("brokerConfigPaths");
+ configBlackList.add("rocketmqHome");
+ configBlackList.add("configBlackList");
+ String[] configArray = brokerContainer.getBrokerContainerConfig().getConfigBlackList().split(";");
+ configBlackList.addAll(Arrays.asList(configArray));
}
@Override
@@ -232,15 +246,24 @@ public class BrokerContainerProcessor implements NettyRequestProcessor {
try {
String bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
Properties properties = MixAll.string2Properties(bodyStr);
- if (properties != null) {
- LOGGER.info("updateSharedBrokerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress());
- this.brokerContainer.getConfiguration().update(properties);
- } else {
+
+ if (properties == null) {
LOGGER.error("string2Properties error");
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("string2Properties error");
return response;
}
+
+ if (validateBlackListConfigExist(properties)) {
+ response.setCode(ResponseCode.NO_PERMISSION);
+ response.setRemark("Can not update config in black list.");
+ return response;
+ }
+
+
+ LOGGER.info("updateBrokerContainerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress());
+ this.brokerContainer.getConfiguration().update(properties);
+
} catch (UnsupportedEncodingException e) {
LOGGER.error("", e);
response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -254,6 +277,15 @@ public class BrokerContainerProcessor implements NettyRequestProcessor {
return response;
}
+ private boolean validateBlackListConfigExist(Properties properties) {
+ for (String blackConfig : configBlackList) {
+ if (properties.containsKey(blackConfig)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private RemotingCommand getBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerConfigResponseHeader.class);
--
2.32.0.windows.2
From a194e1eb9a12e08c43a0da65cd0a048ff849e04d Mon Sep 17 00:00:00 2001
From: dinglei <libya_003@163.com>
Date: Tue, 28 Nov 2023 20:18:53 +0800
Subject: [PATCH 3/3] Add set method for config black list. (#7586)
---
.../main/java/org/apache/rocketmq/common/BrokerConfig.java | 4 ++++
.../java/org/apache/rocketmq/common/ControllerConfig.java | 4 ++++
.../org/apache/rocketmq/common/namesrv/NamesrvConfig.java | 4 ++++
3 files changed, 12 insertions(+)
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 96e0f8e91..a4a553ad5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -417,6 +417,10 @@ public class BrokerConfig extends BrokerIdentity {
return configBlackList;
}
+ public void setConfigBlackList(String configBlackList) {
+ this.configBlackList = configBlackList;
+ }
+
public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
index 55854cfd2..1364754a0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
@@ -94,6 +94,10 @@ public class ControllerConfig {
return configBlackList;
}
+ public void setConfigBlackList(String configBlackList) {
+ this.configBlackList = configBlackList;
+ }
+
public String getRocketmqHome() {
return rocketmqHome;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
index b82d1b8f8..d1cdc7631 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
@@ -101,6 +101,10 @@ public class NamesrvConfig {
return configBlackList;
}
+ public void setConfigBlackList(String configBlackList) {
+ this.configBlackList = configBlackList;
+ }
+
public boolean isOrderMessageEnable() {
return orderMessageEnable;
}
--
2.32.0.windows.2

View File

@ -0,0 +1,80 @@
From 56e886bf70669befd7b9e7380e68751fe67f05b2 Mon Sep 17 00:00:00 2001
From: YASH PATEL <121890726+yp969803@users.noreply.github.com>
Date: Wed, 29 Nov 2023 10:02:38 +0530
Subject: [PATCH] =?UTF-8?q?[ISSUE=20#7592]=20testCleanBuffer=20unit=20test?=
=?UTF-8?q?=20modifies,=20changed=20non-direct=20=E2=80=A6=20(#7593)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
* [ISSUE #7592] testCleanBuffer unit test modifies, changed non-direct to direct buffer allocation
* fix: consolidate UtilAll#cleanBuffer by checking if the given buffer is direct or not
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
---------
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
Co-authored-by: Li Zhanhui <lizhanhui@gmail.com>
---
.../main/java/org/apache/rocketmq/common/UtilAll.java | 9 +++++++++
.../java/org/apache/rocketmq/common/UtilAllTest.java | 3 ++-
.../rocketmq/store/timer/TimerMessageStoreTest.java | 2 +-
3 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index 2808f106a..19efa9aa9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -699,10 +699,19 @@ public class UtilAll {
}
}
+ /**
+ * Free direct-buffer's memory actively.
+ * @param buffer Direct buffer to free.
+ */
public static void cleanBuffer(final ByteBuffer buffer) {
if (null == buffer) {
return;
}
+
+ if (!buffer.isDirect()) {
+ return;
+ }
+
PlatformDependent.freeDirectBuffer(buffer);
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
index cb288578c..2d22d5254 100644
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
@@ -215,8 +215,9 @@ public class UtilAllTest {
@Test
public void testCleanBuffer() {
UtilAll.cleanBuffer(null);
+ UtilAll.cleanBuffer(ByteBuffer.allocateDirect(10));
+ UtilAll.cleanBuffer(ByteBuffer.allocateDirect(0));
UtilAll.cleanBuffer(ByteBuffer.allocate(10));
- UtilAll.cleanBuffer(ByteBuffer.allocate(0));
}
@Test
diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index 63ec97cdb..02ff35681 100644
--- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -387,7 +387,7 @@ public class TimerMessageStoreTest {
assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
}
- // Wait until messages have wrote to TimerLog and currReadTimeMs catches up current time.
+ // Wait until messages have written to TimerLog and currReadTimeMs catches up current time.
await().atMost(5000, TimeUnit.MILLISECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() {
--
2.32.0.windows.2

View File

@ -0,0 +1,53 @@
From 65faea22fd54fd9875f2ca9d3088b4dc46d31cce Mon Sep 17 00:00:00 2001
From: keranbingaa <397294722@qq.com>
Date: Fri, 1 Dec 2023 10:05:16 +0800
Subject: [PATCH] [ISSUE #7534] Use high performance concurrent set to replace
copyonwriteset (#7583)
* fix ISSUE #7534
* reformat code
* Remove the useless unit test
---------
Co-authored-by: RongtongJin <jinrongtong16@mails.ucas.ac.cn>
---
.../rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java | 1 -
.../org/apache/rocketmq/remoting/protocol/body/TopicList.java | 4 ++--
2 files changed, 2 insertions(+), 3 deletions(-)
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
index 6002d1f5a..b52cf5074 100644
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
@@ -130,7 +130,6 @@ public class RouteInfoManagerNewTest {
topicList = TopicList.decode(content, TopicList.class);
assertThat(topicList.getTopicList()).contains("TestTopic", "TestTopic1", "TestTopic2");
}
-
@Test
public void registerBroker() {
// Register master broker
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/TopicList.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/TopicList.java
index 30edfb5a9..0de0bae7e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/TopicList.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/TopicList.java
@@ -17,11 +17,11 @@
package org.apache.rocketmq.remoting.protocol.body;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class TopicList extends RemotingSerializable {
- private Set<String> topicList = new CopyOnWriteArraySet<>();
+ private Set<String> topicList = ConcurrentHashMap.newKeySet();
private String brokerAddr;
public Set<String> getTopicList() {
--
2.32.0.windows.2

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,186 @@
From c2c29c2435e0626cfe4f49830fbdc0d9421d82b5 Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Mon, 4 Dec 2023 16:13:07 +0800
Subject: [PATCH 1/2] [ISSUE #7545] Fix set mapped file to null cause file can
not destroy (#7612)
---
.../rocketmq/tieredstore/index/IndexStoreFile.java | 2 --
.../rocketmq/tieredstore/index/IndexStoreService.java | 10 ++++++++++
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index 52a686f68..def5c8f2d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -457,11 +457,9 @@ public class IndexStoreFile implements IndexFile {
this.fileStatus.set(IndexStatusEnum.SHUTDOWN);
if (this.mappedFile != null) {
this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
- this.mappedFile = null;
}
if (this.compactMappedFile != null) {
this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
- this.compactMappedFile = null;
}
} catch (Exception e) {
log.error("IndexStoreFile shutdown failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e);
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 14608aa58..e99ea0de1 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
@@ -101,6 +102,10 @@ public class IndexStoreService extends ServiceThread implements IndexService {
private void recover() {
Stopwatch stopwatch = Stopwatch.createStarted();
+ // delete compact file directory
+ UtilAll.deleteFile(new File(Paths.get(storeConfig.getStorePathRootDir(),
+ FILE_DIRECTORY_NAME, FILE_COMPACTED_DIRECTORY_NAME).toString()));
+
// recover local
File dir = new File(Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME).toString());
this.doConvertOldFormatFile(Paths.get(dir.getPath(), "0000").toString());
@@ -141,6 +146,10 @@ public class IndexStoreService extends ServiceThread implements IndexService {
for (TieredFileSegment fileSegment : flatFile.getFileSegmentList()) {
IndexFile indexFile = new IndexStoreFile(storeConfig, fileSegment);
+ IndexFile localFile = timeStoreTable.get(indexFile.getTimestamp());
+ if (localFile != null) {
+ localFile.destroy();
+ }
timeStoreTable.put(indexFile.getTimestamp(), indexFile);
log.info("IndexStoreService recover load remote file, timestamp: {}", indexFile.getTimestamp());
}
@@ -248,6 +257,7 @@ public class IndexStoreService extends ServiceThread implements IndexService {
if (IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) {
log.error("IndexStoreService file status not correct, so skip, timestamp: {}, status: {}",
indexFile.getTimestamp(), indexFile.getFileStatus());
+ indexFile.destroy();
return;
}
--
2.32.0.windows.2
From faae64715d917bb5d64b8d72581172d26ebe9501 Mon Sep 17 00:00:00 2001
From: gaoyf <gaoyf@users.noreply.github.com>
Date: Thu, 7 Dec 2023 11:25:22 +0800
Subject: [PATCH 2/2] [ISSUE #7601] Fix slave acting master bug (#7603)
* fix NullPointerException when message escape to remote
* fix NumberFormatException when message retry to escape to remote
* fix timerCheckPoint of the master is not updated, causing the timer message to be replayed after master is restarted
* Use properties copies instead of referencing the same map when converting message
---
.../org/apache/rocketmq/broker/BrokerController.java | 1 +
.../rocketmq/broker/slave/SlaveSynchronize.java | 4 +++-
.../rocketmq/common/message/MessageAccessor.java | 7 +++++++
.../rocketmq/store/timer/TimerMessageStore.java | 12 +++++++++---
4 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9f1fd0ad0..8d29d4438 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -2108,6 +2108,7 @@ public class BrokerController {
isScheduleServiceStart = shouldStart;
if (timerMessageStore != null) {
+ timerMessageStore.syncLastReadTimeMs();
timerMessageStore.setShouldRunningDequeue(shouldStart);
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index 53cdecdf8..7f802adb9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -215,11 +215,13 @@ public class SlaveSynchronize {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
- if (null != brokerController.getMessageStore().getTimerMessageStore()) {
+ if (null != brokerController.getMessageStore().getTimerMessageStore() &&
+ !brokerController.getTimerMessageStore().isShouldRunningDequeue()) {
TimerCheckpoint checkpoint = this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(masterAddrBak);
if (null != this.brokerController.getTimerCheckpoint()) {
this.brokerController.getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs());
this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset());
+ this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion());
}
}
} catch (Exception e) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
index 1b7e2bba3..62e3bbd7e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.common.message;
+import java.util.HashMap;
import java.util.Map;
public class MessageAccessor {
@@ -96,4 +97,10 @@ public class MessageAccessor {
return newMsg;
}
+ public static Map<String, String> deepCopyProperties(Map<String, String> properties) {
+ if (properties == null) {
+ return null;
+ }
+ return new HashMap<>(properties);
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index d796e4467..872cd7105 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -602,6 +602,10 @@ public class TimerMessageStore {
this.shouldRunningDequeue = shouldRunningDequeue;
}
+ public boolean isShouldRunningDequeue() {
+ return shouldRunningDequeue;
+ }
+
public void addMetric(MessageExt msg, int value) {
try {
if (null == msg || null == msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) {
@@ -1084,8 +1088,10 @@ public class TimerMessageStore {
case PUT_OK:
if (brokerStatsManager != null) {
this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
- this.brokerStatsManager.incTopicPutSize(message.getTopic(),
- putMessageResult.getAppendMessageResult().getWroteBytes());
+ if (putMessageResult.getAppendMessageResult() != null) {
+ this.brokerStatsManager.incTopicPutSize(message.getTopic(),
+ putMessageResult.getAppendMessageResult().getWroteBytes());
+ }
this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
}
return PUT_OK;
@@ -1119,7 +1125,7 @@ public class TimerMessageStore {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
- MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+ MessageAccessor.setProperties(msgInner, MessageAccessor.deepCopyProperties(msgExt.getProperties()));
TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
long tagsCodeValue =
MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
--
2.32.0.windows.2

View File

@ -5,7 +5,7 @@
Summary: Cloud-Native, Distributed Messaging and Streaming
Name: rocketmq
Version: 5.1.5
Release: 35
Release: 44
License: Apache-2.0
Group: Applications/Message
URL: https://rocketmq.apache.org/
@ -44,6 +44,15 @@ Patch0031: patch031-backport-Add-CRC-check-of-commitlog.patch
Patch0032: patch032-backport-Clear-POP_CK-when-sending-messages.patch
Patch0033: patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.patch
Patch0034: patch034-backport-Let-consumer-be-aware-of-message-queue-assignment-change.patch
Patch0035: patch035-backport-fix-some-bugs.patch
Patch0036: patch036-backport-RIP65.patch
Patch0037: patch037-backport-Retry-topic-v2-in-pop.patch
Patch0038: patch038-backport-SlaveActingMaster-Timer-Message-retry-without-escape-logic.patch
Patch0039: patch039-backport-add-some-validations.patch
Patch0040: patch040-backport-add-some-test-cases.patch
Patch0041: patch041-backport-improve-performance.patch
Patch0042: patch042-backport-Support-message-filtering.patch
Patch0043: patch043-backport-fix-some-bugs.patch
BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git
Requires: java-1.8.0-openjdk-devel
@ -84,6 +93,33 @@ exit 0
%changelog
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-44
- backport fix some bugs
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-43
- backport Support message filtering
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-42
- backport add improve performance
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-41
- backport add some test cases
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-40
- backport add some validations
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-39
- backport SlaveActingMaster Timer Message retry without escape logic
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-38
- backport Retry topic v2 in pop
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-37
- backport rip 65
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-36
- backport fix some bugs
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-35
- backport Let consumer be aware of message queue assignment change