Compare commits
20 Commits
c1684d0f54
...
915cd31a13
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
915cd31a13 | ||
|
|
5c945d3503 | ||
|
|
0b032b76c8 | ||
|
|
e0aa47ed26 | ||
|
|
68f74a867a | ||
|
|
2bce6cb470 | ||
|
|
60e7ad1bed | ||
|
|
ffa9e6a51d | ||
|
|
4638651c5a | ||
|
|
dba808d5c6 | ||
|
|
8df0259da2 | ||
|
|
df52d543eb | ||
|
|
b3b0264e00 | ||
|
|
a4ade66792 | ||
|
|
0881216d3c | ||
|
|
4c1bdb337e | ||
|
|
15f14248c4 | ||
|
|
cd83ae768b | ||
|
|
e9afdd81f6 | ||
|
|
98aba19773 |
53
README.md
53
README.md
@ -1,7 +1,7 @@
|
||||
# rocketmq
|
||||
|
||||
#### 介绍
|
||||
A Distributed Messaging Platform
|
||||
RocketMQ是一个分布式的消息和流处理平台,具有低延迟、高性能、高可靠性、百亿级容量和灵活扩缩容能力。
|
||||
|
||||
#### 软件架构
|
||||
软件架构说明
|
||||
@ -9,15 +9,54 @@ A Distributed Messaging Platform
|
||||
|
||||
#### 安装教程
|
||||
|
||||
1. xxxx
|
||||
2. xxxx
|
||||
3. xxxx
|
||||
1. 切换yum源
|
||||
```shell
|
||||
cd /etc/yum.repos.d/ && curl -OL https://eur.openeuler.openatom.cn/coprs/zhiliatox/example/repo/openeuler-22.03_LTS_SP2/zhiliatox-example-openeuler-22.03_LTS_SP2.repo
|
||||
```
|
||||
2. 安装rocketmq
|
||||
```shell
|
||||
dnf install rocketmq
|
||||
```
|
||||
3. 使用RocketMQ
|
||||
```shell
|
||||
cd /opt/rocketmq
|
||||
```
|
||||
|
||||
#### 使用说明
|
||||
1. 启动Namesrv
|
||||
|
||||
1. xxxx
|
||||
2. xxxx
|
||||
3. xxxx
|
||||
对于Mac或者Linux用户:
|
||||
```shell
|
||||
### 启动Namesrv
|
||||
$ nohup sh mqnamesrv &
|
||||
|
||||
### 检查日志确定Namesrv是否启动成功
|
||||
$ tail -f ~/logs/rocketmqlogs/namesrv.log
|
||||
The Name Server boot success...
|
||||
```
|
||||
对于Windows用户:
|
||||
```shell
|
||||
$ nohup mqnamesrv.cmd &
|
||||
The Name Server boot success...
|
||||
```
|
||||
|
||||
2. 启动Broker
|
||||
|
||||
对于Mac或者Linux用户:
|
||||
```shell
|
||||
### start Broker
|
||||
$ nohup sh bin/mqbroker -n localhost:9876 &
|
||||
|
||||
### check whether Broker is successfully started, eg: Broker's IP is 192.168.1.2, Broker's name is broker-a
|
||||
$ tail -f ~/logs/rocketmqlogs/broker.log
|
||||
The broker[broker-a, 192.169.1.2:10911] boot success...
|
||||
```
|
||||
|
||||
对于Windows用户:
|
||||
```shell
|
||||
$ mqbroker.cmd -n localhost:9876
|
||||
The broker[broker-a, 192.169.1.2:10911] boot success...
|
||||
```
|
||||
|
||||
#### 参与贡献
|
||||
|
||||
|
||||
394
patch035-backport-fix-some-bugs.patch
Normal file
394
patch035-backport-fix-some-bugs.patch
Normal file
@ -0,0 +1,394 @@
|
||||
From 1be5ebc7363e4bc6503c80688160a354f5a12f78 Mon Sep 17 00:00:00 2001
|
||||
From: Zhanhui Li <lizhanhui@apache.org>
|
||||
Date: Mon, 13 Nov 2023 09:45:37 +0800
|
||||
Subject: [PATCH 1/5] [ISSUE #7551] Reuse helper methods from Netty to free
|
||||
direct byte buffer (#7550)
|
||||
|
||||
* Reuse helper methods from Netty to free direct byte buffer, making codebase JDK 9+ compatible
|
||||
|
||||
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
|
||||
|
||||
* Guard against null
|
||||
|
||||
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
|
||||
|
||||
* fix #7552
|
||||
|
||||
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
|
||||
|
||||
---------
|
||||
|
||||
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
|
||||
---
|
||||
.../org/apache/rocketmq/common/UtilAll.java | 60 +------------------
|
||||
.../apache/rocketmq/common/UtilAllTest.java | 10 ----
|
||||
2 files changed, 3 insertions(+), 67 deletions(-)
|
||||
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
|
||||
index 95b6b09b4..2808f106a 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
|
||||
@@ -16,21 +16,18 @@
|
||||
*/
|
||||
package org.apache.rocketmq.common;
|
||||
|
||||
+import io.netty.util.internal.PlatformDependent;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
-import java.lang.reflect.Field;
|
||||
-import java.lang.reflect.Method;
|
||||
import java.net.Inet4Address;
|
||||
import java.net.Inet6Address;
|
||||
import java.net.InetAddress;
|
||||
import java.net.NetworkInterface;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Files;
|
||||
-import java.security.AccessController;
|
||||
-import java.security.PrivilegedAction;
|
||||
import java.text.NumberFormat;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
@@ -46,15 +43,11 @@ import java.util.function.Supplier;
|
||||
import java.util.zip.CRC32;
|
||||
import java.util.zip.DeflaterOutputStream;
|
||||
import java.util.zip.InflaterInputStream;
|
||||
-import org.apache.commons.lang3.JavaVersion;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
-import org.apache.commons.lang3.SystemUtils;
|
||||
import org.apache.commons.validator.routines.InetAddressValidator;
|
||||
import org.apache.rocketmq.common.constant.LoggerName;
|
||||
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||||
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||||
-import sun.misc.Unsafe;
|
||||
-import sun.nio.ch.DirectBuffer;
|
||||
|
||||
public class UtilAll {
|
||||
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
|
||||
@@ -707,57 +700,10 @@ public class UtilAll {
|
||||
}
|
||||
|
||||
public static void cleanBuffer(final ByteBuffer buffer) {
|
||||
- if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0) {
|
||||
+ if (null == buffer) {
|
||||
return;
|
||||
}
|
||||
- if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
|
||||
- try {
|
||||
- Field field = Unsafe.class.getDeclaredField("theUnsafe");
|
||||
- field.setAccessible(true);
|
||||
- Unsafe unsafe = (Unsafe) field.get(null);
|
||||
- Method cleaner = method(unsafe, "invokeCleaner", new Class[] {ByteBuffer.class});
|
||||
- cleaner.invoke(unsafe, viewed(buffer));
|
||||
- } catch (Exception e) {
|
||||
- throw new IllegalStateException(e);
|
||||
- }
|
||||
- } else {
|
||||
- invoke(invoke(viewed(buffer), "cleaner"), "clean");
|
||||
- }
|
||||
- }
|
||||
-
|
||||
- public static Object invoke(final Object target, final String methodName, final Class<?>... args) {
|
||||
- return AccessController.doPrivileged(new PrivilegedAction<Object>() {
|
||||
- @Override
|
||||
- public Object run() {
|
||||
- try {
|
||||
- Method method = method(target, methodName, args);
|
||||
- method.setAccessible(true);
|
||||
- return method.invoke(target);
|
||||
- } catch (Exception e) {
|
||||
- throw new IllegalStateException(e);
|
||||
- }
|
||||
- }
|
||||
- });
|
||||
- }
|
||||
-
|
||||
- public static Method method(Object target, String methodName, Class<?>[] args) throws NoSuchMethodException {
|
||||
- try {
|
||||
- return target.getClass().getMethod(methodName, args);
|
||||
- } catch (NoSuchMethodException e) {
|
||||
- return target.getClass().getDeclaredMethod(methodName, args);
|
||||
- }
|
||||
- }
|
||||
-
|
||||
- private static ByteBuffer viewed(ByteBuffer buffer) {
|
||||
- if (!buffer.isDirect()) {
|
||||
- throw new IllegalArgumentException("buffer is non-direct");
|
||||
- }
|
||||
- ByteBuffer viewedBuffer = (ByteBuffer) ((DirectBuffer) buffer).attachment();
|
||||
- if (viewedBuffer == null) {
|
||||
- return buffer;
|
||||
- } else {
|
||||
- return viewed(viewedBuffer);
|
||||
- }
|
||||
+ PlatformDependent.freeDirectBuffer(buffer);
|
||||
}
|
||||
|
||||
public static void ensureDirOK(final String dirName) {
|
||||
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
|
||||
index 94bb390eb..cb288578c 100644
|
||||
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
|
||||
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
|
||||
@@ -219,16 +219,6 @@ public class UtilAllTest {
|
||||
UtilAll.cleanBuffer(ByteBuffer.allocate(0));
|
||||
}
|
||||
|
||||
- @Test(expected = NoSuchMethodException.class)
|
||||
- public void testMethod() throws NoSuchMethodException {
|
||||
- UtilAll.method(new Object(), "noMethod", null);
|
||||
- }
|
||||
-
|
||||
- @Test(expected = IllegalStateException.class)
|
||||
- public void testInvoke() throws Exception {
|
||||
- UtilAll.invoke(new Object(), "noMethod");
|
||||
- }
|
||||
-
|
||||
@Test
|
||||
public void testCalculateFileSizeInPath() throws Exception {
|
||||
/**
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 4791d9a1f1a7c39e005da15f228473c04eafd007 Mon Sep 17 00:00:00 2001
|
||||
From: lizhimins <707364882@qq.com>
|
||||
Date: Tue, 14 Nov 2023 17:59:47 +0800
|
||||
Subject: [PATCH 2/5] [ISSUE #5923] Revert "Fix tiered store README.md error
|
||||
about Configuration (#7436)" (#7557)
|
||||
|
||||
This reverts commit 70dc93abbcb9bf161378d66fcaca55bedc78b905.
|
||||
---
|
||||
.../tieredstore/common/TieredMessageStoreConfig.java | 10 +++++-----
|
||||
.../tieredstore/provider/posix/PosixFileSegment.java | 4 ++--
|
||||
.../rocketmq/tieredstore/file/TieredCommitLogTest.java | 2 +-
|
||||
.../provider/posix/PosixFileSegmentTest.java | 2 +-
|
||||
4 files changed, 9 insertions(+), 9 deletions(-)
|
||||
|
||||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
|
||||
index a112ea6b1..595db6b86 100644
|
||||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
|
||||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
|
||||
@@ -115,7 +115,7 @@ public class TieredMessageStoreConfig {
|
||||
private long readAheadCacheExpireDuration = 10 * 1000;
|
||||
private double readAheadCacheSizeThresholdRate = 0.3;
|
||||
|
||||
- private String tieredStoreFilepath = "";
|
||||
+ private String tieredStoreFilePath = "";
|
||||
|
||||
private String objectStoreEndpoint = "";
|
||||
|
||||
@@ -350,12 +350,12 @@ public class TieredMessageStoreConfig {
|
||||
this.readAheadCacheSizeThresholdRate = rate;
|
||||
}
|
||||
|
||||
- public String getTieredStoreFilepath() {
|
||||
- return tieredStoreFilepath;
|
||||
+ public String getTieredStoreFilePath() {
|
||||
+ return tieredStoreFilePath;
|
||||
}
|
||||
|
||||
- public void setTieredStoreFilepath(String tieredStoreFilepath) {
|
||||
- this.tieredStoreFilepath = tieredStoreFilepath;
|
||||
+ public void setTieredStoreFilePath(String tieredStoreFilePath) {
|
||||
+ this.tieredStoreFilePath = tieredStoreFilePath;
|
||||
}
|
||||
|
||||
public void setObjectStoreEndpoint(String objectStoreEndpoint) {
|
||||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
|
||||
index 708ce33f9..7e949cb28 100644
|
||||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
|
||||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
|
||||
@@ -66,8 +66,8 @@ public class PosixFileSegment extends TieredFileSegment {
|
||||
super(storeConfig, fileType, filePath, baseOffset);
|
||||
|
||||
// basePath
|
||||
- String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilepath(),
|
||||
- StringUtils.appendIfMissing(storeConfig.getTieredStoreFilepath(), File.separator));
|
||||
+ String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilePath(),
|
||||
+ StringUtils.appendIfMissing(storeConfig.getTieredStoreFilePath(), File.separator));
|
||||
|
||||
// fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset
|
||||
String brokerClusterName = storeConfig.getBrokerClusterName();
|
||||
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
|
||||
index 80cdba977..6693d3cb7 100644
|
||||
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
|
||||
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
|
||||
@@ -49,7 +49,7 @@ public class TieredCommitLogTest {
|
||||
TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig();
|
||||
storeConfig.setBrokerName("brokerName");
|
||||
storeConfig.setStorePathRootDir(storePath);
|
||||
- storeConfig.setTieredStoreFilepath(storePath + File.separator);
|
||||
+ storeConfig.setTieredStoreFilePath(storePath + File.separator);
|
||||
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
|
||||
storeConfig.setCommitLogRollingInterval(0);
|
||||
storeConfig.setTieredStoreCommitLogMaxSize(1000);
|
||||
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
|
||||
index ede62b8ce..db33ae847 100644
|
||||
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
|
||||
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
|
||||
@@ -42,7 +42,7 @@ public class PosixFileSegmentTest {
|
||||
@Before
|
||||
public void setUp() {
|
||||
storeConfig = new TieredMessageStoreConfig();
|
||||
- storeConfig.setTieredStoreFilepath(storePath);
|
||||
+ storeConfig.setTieredStoreFilePath(storePath);
|
||||
mq = new MessageQueue("OSSFileSegmentTest", "broker", 0);
|
||||
TieredStoreExecutor.init();
|
||||
}
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 651a5ca992988b90c7e4884e9975db0938557def Mon Sep 17 00:00:00 2001
|
||||
From: Jixiang Jin <lollipop@apache.org>
|
||||
Date: Thu, 16 Nov 2023 10:16:16 +0800
|
||||
Subject: [PATCH 3/5] [ISSUE #7562] BugFix for estimating message accumulation
|
||||
correctly (#7563)
|
||||
|
||||
---
|
||||
.../broker/metrics/ConsumerLagCalculator.java | 11 +++++---
|
||||
.../proxy/common/utils/FilterUtilTest.java | 25 +++++++++++++++++++
|
||||
.../remoting/protocol/filter/FilterAPI.java | 8 ++++++
|
||||
3 files changed, 40 insertions(+), 4 deletions(-)
|
||||
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
|
||||
index 7a5f1f765..af08a83c7 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
|
||||
@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.constant.PermName;
|
||||
import org.apache.rocketmq.common.filter.ExpressionType;
|
||||
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||||
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||||
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
|
||||
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
|
||||
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
|
||||
import org.apache.rocketmq.remoting.protocol.subscription.SimpleSubscriptionData;
|
||||
@@ -435,10 +436,12 @@ public class ConsumerLagCalculator {
|
||||
if (subscriptionGroupConfig != null) {
|
||||
for (SimpleSubscriptionData simpleSubscriptionData : subscriptionGroupConfig.getSubscriptionDataSet()) {
|
||||
if (topic.equals(simpleSubscriptionData.getTopic())) {
|
||||
- subscriptionData = new SubscriptionData();
|
||||
- subscriptionData.setTopic(simpleSubscriptionData.getTopic());
|
||||
- subscriptionData.setExpressionType(simpleSubscriptionData.getExpressionType());
|
||||
- subscriptionData.setSubString(simpleSubscriptionData.getExpression());
|
||||
+ try {
|
||||
+ subscriptionData = FilterAPI.buildSubscriptionData(simpleSubscriptionData.getTopic(),
|
||||
+ simpleSubscriptionData.getExpression(), simpleSubscriptionData.getExpressionType());
|
||||
+ } catch (Exception e) {
|
||||
+ LOGGER.error("Try to build subscription for group:{}, topic:{} exception.", group, topic, e);
|
||||
+ }
|
||||
break;
|
||||
}
|
||||
}
|
||||
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
|
||||
index 23389e9d3..7c9d84015 100644
|
||||
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
|
||||
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
|
||||
@@ -48,4 +48,29 @@ public class FilterUtilTest {
|
||||
assertThat(FilterUtils.isTagMatched(subscriptionData.getTagsSet(), null)).isFalse();
|
||||
}
|
||||
|
||||
+ @Test
|
||||
+ public void testBuildSubscriptionData() throws Exception {
|
||||
+ // Test case 1: expressionType is null, will use TAG as default.
|
||||
+ String topic = "topic";
|
||||
+ String subString = "substring";
|
||||
+ String expressionType = null;
|
||||
+ SubscriptionData result = FilterAPI.buildSubscriptionData(topic, subString, expressionType);
|
||||
+ assertThat(result).isNotNull();
|
||||
+ assertThat(topic).isEqualTo(result.getTopic());
|
||||
+ assertThat(subString).isEqualTo(result.getSubString());
|
||||
+ assertThat(result.getExpressionType()).isEqualTo("TAG");
|
||||
+ assertThat(result.getCodeSet().size()).isEqualTo(1);
|
||||
+
|
||||
+ // Test case 2: expressionType is not null
|
||||
+ topic = "topic";
|
||||
+ subString = "substring1||substring2";
|
||||
+ expressionType = "SQL92";
|
||||
+ result = FilterAPI.buildSubscriptionData(topic, subString, expressionType);
|
||||
+ assertThat(result).isNotNull();
|
||||
+ assertThat(topic).isEqualTo(result.getTopic());
|
||||
+ assertThat(subString).isEqualTo(result.getSubString());
|
||||
+ assertThat(result.getExpressionType()).isEqualTo(expressionType);
|
||||
+ assertThat(result.getCodeSet().size()).isEqualTo(2);
|
||||
+ }
|
||||
+
|
||||
}
|
||||
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
|
||||
index 10a6bb463..f291bfccf 100644
|
||||
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
|
||||
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
|
||||
@@ -46,6 +46,14 @@ public class FilterAPI {
|
||||
return subscriptionData;
|
||||
}
|
||||
|
||||
+ public static SubscriptionData buildSubscriptionData(String topic, String subString, String expressionType) throws Exception {
|
||||
+ final SubscriptionData subscriptionData = buildSubscriptionData(topic, subString);
|
||||
+ if (StringUtils.isNotBlank(expressionType)) {
|
||||
+ subscriptionData.setExpressionType(expressionType);
|
||||
+ }
|
||||
+ return subscriptionData;
|
||||
+ }
|
||||
+
|
||||
public static SubscriptionData build(final String topic, final String subString,
|
||||
final String type) throws Exception {
|
||||
if (ExpressionType.TAG.equals(type) || type == null) {
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 01a2aef96bdfb17c5f82415141ef421efb4e3bc7 Mon Sep 17 00:00:00 2001
|
||||
From: cnScarb <jjhfen00@163.com>
|
||||
Date: Fri, 17 Nov 2023 15:58:14 +0800
|
||||
Subject: [PATCH 4/5] [ISSUE #7570] Add default value for lastPopTimestamp
|
||||
(#7571)
|
||||
|
||||
---
|
||||
.../apache/rocketmq/client/impl/consumer/PopProcessQueue.java | 2 +-
|
||||
1 file changed, 1 insertion(+), 1 deletion(-)
|
||||
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java
|
||||
index 3b39b86cc..50827545b 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java
|
||||
@@ -26,7 +26,7 @@ public class PopProcessQueue {
|
||||
|
||||
private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
|
||||
|
||||
- private long lastPopTimestamp;
|
||||
+ private long lastPopTimestamp = System.currentTimeMillis();
|
||||
private AtomicInteger waitAckCounter = new AtomicInteger(0);
|
||||
private volatile boolean dropped = false;
|
||||
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 8e7e2b5f50e0db14b77462ef1574d4020c0fd986 Mon Sep 17 00:00:00 2001
|
||||
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
|
||||
Date: Mon, 20 Nov 2023 19:32:57 +0800
|
||||
Subject: [PATCH 5/5] [ISSUE #7574] Fix RunningFlags conflict
|
||||
|
||||
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
|
||||
---
|
||||
store/src/main/java/org/apache/rocketmq/store/RunningFlags.java | 2 +-
|
||||
1 file changed, 1 insertion(+), 1 deletion(-)
|
||||
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
|
||||
index 91fcb155a..88b398a77 100644
|
||||
--- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
|
||||
@@ -30,7 +30,7 @@ public class RunningFlags {
|
||||
|
||||
private static final int FENCED_BIT = 1 << 5;
|
||||
|
||||
- private static final int LOGIC_DISK_FULL_BIT = 1 << 5;
|
||||
+ private static final int LOGIC_DISK_FULL_BIT = 1 << 6;
|
||||
|
||||
private volatile int flagBits = 0;
|
||||
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
3173
patch036-backport-RIP65.patch
Normal file
3173
patch036-backport-RIP65.patch
Normal file
File diff suppressed because it is too large
Load Diff
845
patch037-backport-Retry-topic-v2-in-pop.patch
Normal file
845
patch037-backport-Retry-topic-v2-in-pop.patch
Normal file
@ -0,0 +1,845 @@
|
||||
From ca721b0145994d7f5e67b4d2fe3b7a4ad7a1c132 Mon Sep 17 00:00:00 2001
|
||||
From: zhanghong <985492783@qq.com>
|
||||
Date: Tue, 21 Nov 2023 14:03:24 +0800
|
||||
Subject: [PATCH 1/3] [ISSUE #7462] Remove deprecated LocalTransactionExecuter
|
||||
(#7463)
|
||||
|
||||
---
|
||||
.../impl/producer/DefaultMQProducerImpl.java | 9 +++----
|
||||
.../client/producer/DefaultMQProducer.java | 16 -----------
|
||||
.../producer/LocalTransactionExecuter.java | 27 -------------------
|
||||
.../rocketmq/client/producer/MQProducer.java | 3 ---
|
||||
.../producer/TransactionMQProducer.java | 16 -----------
|
||||
.../client.producer.DefaultMQProducer.schema | 1 -
|
||||
6 files changed, 4 insertions(+), 68 deletions(-)
|
||||
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
|
||||
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
||||
index b0c212e46..545f17d93 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
||||
@@ -54,7 +54,6 @@ import org.apache.rocketmq.client.latency.MQFaultStrategy;
|
||||
import org.apache.rocketmq.client.latency.Resolver;
|
||||
import org.apache.rocketmq.client.latency.ServiceDetector;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
-import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
|
||||
import org.apache.rocketmq.client.producer.LocalTransactionState;
|
||||
import org.apache.rocketmq.client.producer.MessageQueueSelector;
|
||||
import org.apache.rocketmq.client.producer.RequestCallback;
|
||||
@@ -1379,10 +1378,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
||||
}
|
||||
|
||||
public TransactionSendResult sendMessageInTransaction(final Message msg,
|
||||
- final LocalTransactionExecuter localTransactionExecuter, final Object arg)
|
||||
+ final TransactionListener localTransactionListener, final Object arg)
|
||||
throws MQClientException {
|
||||
TransactionListener transactionListener = getCheckListener();
|
||||
- if (null == localTransactionExecuter && null == transactionListener) {
|
||||
+ if (null == localTransactionListener && null == transactionListener) {
|
||||
throw new MQClientException("tranExecutor is null", null);
|
||||
}
|
||||
|
||||
@@ -1414,8 +1413,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
||||
if (null != transactionId && !"".equals(transactionId)) {
|
||||
msg.setTransactionId(transactionId);
|
||||
}
|
||||
- if (null != localTransactionExecuter) {
|
||||
- localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
|
||||
+ if (null != localTransactionListener) {
|
||||
+ localTransactionState = localTransactionListener.executeLocalTransaction(msg, arg);
|
||||
} else {
|
||||
log.debug("Used new transaction API");
|
||||
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
|
||||
index c5b1b5223..7bd3876f5 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
|
||||
@@ -853,22 +853,6 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
|
||||
this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
|
||||
}
|
||||
|
||||
- /**
|
||||
- * This method is to send transactional messages.
|
||||
- *
|
||||
- * @param msg Transactional message to send.
|
||||
- * @param tranExecuter local transaction executor.
|
||||
- * @param arg Argument used along with local transaction executor.
|
||||
- * @return Transaction result.
|
||||
- * @throws MQClientException if there is any client error.
|
||||
- */
|
||||
- @Override
|
||||
- public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter,
|
||||
- final Object arg)
|
||||
- throws MQClientException {
|
||||
- throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
|
||||
- }
|
||||
-
|
||||
/**
|
||||
* This method is used to send transactional messages.
|
||||
*
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
|
||||
deleted file mode 100644
|
||||
index 267ba10bd..000000000
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
|
||||
+++ /dev/null
|
||||
@@ -1,27 +0,0 @@
|
||||
-/*
|
||||
- * Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
- * contributor license agreements. See the NOTICE file distributed with
|
||||
- * this work for additional information regarding copyright ownership.
|
||||
- * The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
- * (the "License"); you may not use this file except in compliance with
|
||||
- * the License. You may obtain a copy of the License at
|
||||
- *
|
||||
- * http://www.apache.org/licenses/LICENSE-2.0
|
||||
- *
|
||||
- * Unless required by applicable law or agreed to in writing, software
|
||||
- * distributed under the License is distributed on an "AS IS" BASIS,
|
||||
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
- * See the License for the specific language governing permissions and
|
||||
- * limitations under the License.
|
||||
- */
|
||||
-package org.apache.rocketmq.client.producer;
|
||||
-
|
||||
-import org.apache.rocketmq.common.message.Message;
|
||||
-
|
||||
-/**
|
||||
- * @deprecated This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended.
|
||||
- */
|
||||
-@Deprecated
|
||||
-public interface LocalTransactionExecuter {
|
||||
- LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
|
||||
-}
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
|
||||
index 78657e623..8bd30e98d 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
|
||||
@@ -81,9 +81,6 @@ public interface MQProducer extends MQAdmin {
|
||||
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
|
||||
throws MQClientException, RemotingException, InterruptedException;
|
||||
|
||||
- TransactionSendResult sendMessageInTransaction(final Message msg,
|
||||
- final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
|
||||
-
|
||||
TransactionSendResult sendMessageInTransaction(final Message msg,
|
||||
final Object arg) throws MQClientException;
|
||||
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
|
||||
index baa8b4408..d529f3e77 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
|
||||
@@ -67,22 +67,6 @@ public class TransactionMQProducer extends DefaultMQProducer {
|
||||
this.defaultMQProducerImpl.destroyTransactionEnv();
|
||||
}
|
||||
|
||||
- /**
|
||||
- * This method will be removed in the version 5.0.0, method <code>sendMessageInTransaction(Message,Object)</code>}
|
||||
- * is recommended.
|
||||
- */
|
||||
- @Override
|
||||
- @Deprecated
|
||||
- public TransactionSendResult sendMessageInTransaction(final Message msg,
|
||||
- final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
|
||||
- if (null == this.transactionCheckListener) {
|
||||
- throw new MQClientException("localTransactionBranchCheckListener is null", null);
|
||||
- }
|
||||
-
|
||||
- msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
|
||||
- return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
|
||||
- }
|
||||
-
|
||||
@Override
|
||||
public TransactionSendResult sendMessageInTransaction(final Message msg,
|
||||
final Object arg) throws MQClientException {
|
||||
diff --git a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema
|
||||
index 0418c73fe..d1111fb45 100644
|
||||
--- a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema
|
||||
+++ b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema
|
||||
@@ -122,7 +122,6 @@ Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq
|
||||
Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (void)
|
||||
Method send(org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.SendResult)
|
||||
Method send(org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (org.apache.rocketmq.client.producer.SendResult)
|
||||
-Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.client.producer.LocalTransactionExecuter,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult)
|
||||
Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult)
|
||||
Method sendOneway(java.lang.Object,org.apache.rocketmq.client.producer.MessageQueueSelector,org.apache.rocketmq.common.message.Message) : public throws (void)
|
||||
Method sendOneway(org.apache.rocketmq.common.message.Message) : public throws (void)
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From a7d493b2fbc153cc6cbdf2b2ffcbf19cf7cba803 Mon Sep 17 00:00:00 2001
|
||||
From: panzhi <panzhi33@qq.com>
|
||||
Date: Tue, 21 Nov 2023 20:55:35 +0800
|
||||
Subject: [PATCH 2/3] transactionProducer get the topic route before sending
|
||||
the message (#7569)
|
||||
|
||||
---
|
||||
.../impl/producer/DefaultMQProducerImpl.java | 15 +++++
|
||||
.../client/producer/DefaultMQProducer.java | 63 +++++++++++++++++++
|
||||
.../producer/TransactionMQProducer.java | 23 +++++--
|
||||
.../transaction/TransactionProducer.java | 3 +-
|
||||
4 files changed, 98 insertions(+), 6 deletions(-)
|
||||
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
||||
index 545f17d93..088bff089 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
||||
@@ -262,6 +262,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
||||
mQClientFactory.start();
|
||||
}
|
||||
|
||||
+ this.initTopicRoute();
|
||||
+
|
||||
this.mqFaultStrategy.startDetector();
|
||||
|
||||
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
|
||||
@@ -1740,6 +1742,19 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
||||
}
|
||||
}
|
||||
|
||||
+ private void initTopicRoute() {
|
||||
+ List<String> topics = this.defaultMQProducer.getTopics();
|
||||
+ if (topics != null && topics.size() > 0) {
|
||||
+ topics.forEach(topic -> {
|
||||
+ String newTopic = NamespaceUtil.wrapNamespace(this.defaultMQProducer.getNamespace(), topic);
|
||||
+ TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(newTopic);
|
||||
+ if (topicPublishInfo == null || !topicPublishInfo.ok()) {
|
||||
+ log.warn("No route info of this topic: " + newTopic + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO));
|
||||
+ }
|
||||
+ });
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
|
||||
return topicPublishInfoTable;
|
||||
}
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
|
||||
index 7bd3876f5..700e00aac 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
|
||||
@@ -87,6 +87,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
|
||||
*/
|
||||
private String producerGroup;
|
||||
|
||||
+ /**
|
||||
+ * Topics that need to be initialized for transaction producer
|
||||
+ */
|
||||
+ private List<String> topics;
|
||||
+
|
||||
/**
|
||||
* Just for testing or demo program
|
||||
*/
|
||||
@@ -235,6 +240,22 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
|
||||
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
|
||||
}
|
||||
|
||||
+ /**
|
||||
+ * Constructor specifying namespace, producer group, topics and RPC hook.
|
||||
+ *
|
||||
+ * @param namespace Namespace for this MQ Producer instance.
|
||||
+ * @param producerGroup Producer group, see the name-sake field.
|
||||
+ * @param topics Topic that needs to be initialized for routing
|
||||
+ * @param rpcHook RPC hook to execute per each remoting command execution.
|
||||
+ */
|
||||
+ public DefaultMQProducer(final String namespace, final String producerGroup, final List<String> topics, RPCHook rpcHook) {
|
||||
+ this.namespace = namespace;
|
||||
+ this.producerGroup = producerGroup;
|
||||
+ this.topics = topics;
|
||||
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
|
||||
+ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
|
||||
+ }
|
||||
+
|
||||
/**
|
||||
* Constructor specifying producer group and enabled msg trace flag.
|
||||
*
|
||||
@@ -290,6 +311,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
|
||||
}
|
||||
}
|
||||
|
||||
+ /**
|
||||
+ * Constructor specifying namespace, producer group, topics, RPC hook, enabled msgTrace flag and customized trace topic
|
||||
+ * name.
|
||||
+ *
|
||||
+ * @param namespace Namespace for this MQ Producer instance.
|
||||
+ * @param producerGroup Producer group, see the name-sake field.
|
||||
+ * @param topics Topic that needs to be initialized for routing
|
||||
+ * @param rpcHook RPC hook to execute per each remoting command execution.
|
||||
+ * @param enableMsgTrace Switch flag instance for message trace.
|
||||
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
|
||||
+ * trace topic name.
|
||||
+ */
|
||||
+ public DefaultMQProducer(final String namespace, final String producerGroup, final List<String> topics,
|
||||
+ RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
|
||||
+ this.namespace = namespace;
|
||||
+ this.producerGroup = producerGroup;
|
||||
+ this.topics = topics;
|
||||
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
|
||||
+ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
|
||||
+ //if client open the message trace feature
|
||||
+ if (enableMsgTrace) {
|
||||
+ try {
|
||||
+ AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
|
||||
+ dispatcher.setHostProducer(this.defaultMQProducerImpl);
|
||||
+ traceDispatcher = dispatcher;
|
||||
+ this.defaultMQProducerImpl.registerSendMessageHook(
|
||||
+ new SendMessageTraceHookImpl(traceDispatcher));
|
||||
+ this.defaultMQProducerImpl.registerEndTransactionHook(
|
||||
+ new EndTransactionTraceHookImpl(traceDispatcher));
|
||||
+ } catch (Throwable e) {
|
||||
+ logger.error("system mqtrace hook init failed ,maybe can't send msg trace data");
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
@Override
|
||||
public void setUseTLS(boolean useTLS) {
|
||||
super.setUseTLS(useTLS);
|
||||
@@ -1316,4 +1372,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
|
||||
defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize);
|
||||
}
|
||||
|
||||
+ public List<String> getTopics() {
|
||||
+ return topics;
|
||||
+ }
|
||||
+
|
||||
+ public void setTopics(List<String> topics) {
|
||||
+ this.topics = topics;
|
||||
+ }
|
||||
}
|
||||
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
|
||||
index d529f3e77..2c3b479f7 100644
|
||||
--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
|
||||
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
|
||||
@@ -16,6 +16,7 @@
|
||||
*/
|
||||
package org.apache.rocketmq.client.producer;
|
||||
|
||||
+import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
@@ -36,19 +37,31 @@ public class TransactionMQProducer extends DefaultMQProducer {
|
||||
}
|
||||
|
||||
public TransactionMQProducer(final String producerGroup) {
|
||||
- this(null, producerGroup, null);
|
||||
+ this(null, producerGroup, null, null);
|
||||
+ }
|
||||
+
|
||||
+ public TransactionMQProducer(final String producerGroup, final List<String> topics) {
|
||||
+ this(null, producerGroup, topics, null);
|
||||
}
|
||||
|
||||
public TransactionMQProducer(final String namespace, final String producerGroup) {
|
||||
- this(namespace, producerGroup, null);
|
||||
+ this(namespace, producerGroup, null, null);
|
||||
+ }
|
||||
+
|
||||
+ public TransactionMQProducer(final String namespace, final String producerGroup, final List<String> topics) {
|
||||
+ this(namespace, producerGroup, topics, null);
|
||||
}
|
||||
|
||||
public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
|
||||
- this(null, producerGroup, rpcHook);
|
||||
+ this(null, producerGroup, null, rpcHook);
|
||||
+ }
|
||||
+
|
||||
+ public TransactionMQProducer(final String producerGroup, final List<String> topics, RPCHook rpcHook) {
|
||||
+ this(null, producerGroup, topics, rpcHook);
|
||||
}
|
||||
|
||||
- public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
|
||||
- super(namespace, producerGroup, rpcHook);
|
||||
+ public TransactionMQProducer(final String namespace, final String producerGroup, final List<String> topics, RPCHook rpcHook) {
|
||||
+ super(namespace, producerGroup, topics, rpcHook);
|
||||
}
|
||||
|
||||
public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
|
||||
diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
|
||||
index 5973c3c30..d1d57c55e 100644
|
||||
--- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
|
||||
+++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
|
||||
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.remoting.common.RemotingHelper;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
+import java.util.Arrays;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
@@ -39,7 +40,7 @@ public class TransactionProducer {
|
||||
|
||||
public static void main(String[] args) throws MQClientException, InterruptedException {
|
||||
TransactionListener transactionListener = new TransactionListenerImpl();
|
||||
- TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
|
||||
+ TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, Arrays.asList(TOPIC));
|
||||
|
||||
// Uncomment the following line while debugging, namesrvAddr should be set to your local address
|
||||
// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 5b43387be33506e4c19df4783724d06b1dfdc062 Mon Sep 17 00:00:00 2001
|
||||
From: Zhouxiang Zhan <zhouxzhan@apache.org>
|
||||
Date: Thu, 23 Nov 2023 14:53:48 +0800
|
||||
Subject: [PATCH 3/3] [ISSUE #7543] Retry topic v2 in pop (#7544)
|
||||
|
||||
* Implement pop retry topic v2
|
||||
|
||||
* Use pop retry topic v2 to notify the origin topic
|
||||
|
||||
* add parse group
|
||||
|
||||
* retry topic v2 compatibility
|
||||
|
||||
* calculate consumer lag
|
||||
|
||||
* delete retry topic
|
||||
---
|
||||
.../acl/plain/PlainAccessResource.java | 3 +-
|
||||
.../ExpressionForRetryMessageFilter.java | 3 +-
|
||||
.../NotifyMessageArrivingListener.java | 3 +-
|
||||
.../longpolling/PopLongPollingService.java | 10 +++
|
||||
.../broker/metrics/ConsumerLagCalculator.java | 11 ++++
|
||||
.../processor/AdminBrokerProcessor.java | 4 ++
|
||||
.../processor/NotificationProcessor.java | 2 +-
|
||||
.../broker/processor/PopMessageProcessor.java | 24 ++++++-
|
||||
.../broker/processor/PopReviveService.java | 9 ---
|
||||
.../processor/SendMessageProcessor.java | 3 +-
|
||||
.../apache/rocketmq/common/BrokerConfig.java | 10 +++
|
||||
.../apache/rocketmq/common/KeyBuilder.java | 37 +++++++++--
|
||||
.../rocketmq/common/KeyBuilderTest.java | 65 +++++++++++++++++++
|
||||
.../consumer/ConsumerProgressSubCommand.java | 3 +-
|
||||
.../tools/monitor/MonitorService.java | 3 +-
|
||||
15 files changed, 168 insertions(+), 22 deletions(-)
|
||||
create mode 100644 common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
|
||||
|
||||
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
|
||||
index 72aa8ca71..1e185afff 100644
|
||||
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
|
||||
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
|
||||
@@ -48,6 +48,7 @@ import org.apache.rocketmq.acl.common.AuthenticationHeader;
|
||||
import org.apache.rocketmq.acl.common.AuthorizationHeader;
|
||||
import org.apache.rocketmq.acl.common.Permission;
|
||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||
+import org.apache.rocketmq.common.KeyBuilder;
|
||||
import org.apache.rocketmq.common.MQVersion;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.PlainAccessConfig;
|
||||
@@ -341,7 +342,7 @@ public class PlainAccessResource implements AccessResource {
|
||||
if (retryTopic == null) {
|
||||
return null;
|
||||
}
|
||||
- return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
|
||||
+ return KeyBuilder.parseGroup(retryTopic);
|
||||
}
|
||||
|
||||
public static String getRetryTopic(String group) {
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
|
||||
index bc01b21cb..cc3e37bf4 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
|
||||
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.filter;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
+import org.apache.rocketmq.common.KeyBuilder;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.filter.ExpressionType;
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
@@ -62,7 +63,7 @@ public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter {
|
||||
tempProperties = MessageDecoder.decodeProperties(msgBuffer);
|
||||
}
|
||||
String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC);
|
||||
- String group = subscriptionData.getTopic().substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
|
||||
+ String group = KeyBuilder.parseGroup(subscriptionData.getTopic());
|
||||
realFilterData = this.consumerFilterManager.get(realTopic, group);
|
||||
}
|
||||
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
|
||||
index 3c099fe2f..e55ed2778 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
|
||||
@@ -17,12 +17,11 @@
|
||||
|
||||
package org.apache.rocketmq.broker.longpolling;
|
||||
|
||||
+import java.util.Map;
|
||||
import org.apache.rocketmq.broker.processor.NotificationProcessor;
|
||||
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
|
||||
import org.apache.rocketmq.store.MessageArrivingListener;
|
||||
|
||||
-import java.util.Map;
|
||||
-
|
||||
public class NotifyMessageArrivingListener implements MessageArrivingListener {
|
||||
private final PullRequestHoldService pullRequestHoldService;
|
||||
private final PopMessageProcessor popMessageProcessor;
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
|
||||
index 113c91297..f1bc9adc4 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
|
||||
@@ -144,6 +144,16 @@ public class PopLongPollingService extends ServiceThread {
|
||||
}
|
||||
}
|
||||
|
||||
+ public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId) {
|
||||
+ String notifyTopic;
|
||||
+ if (KeyBuilder.isPopRetryTopicV2(topic)) {
|
||||
+ notifyTopic = KeyBuilder.parseNormalTopic(topic);
|
||||
+ } else {
|
||||
+ notifyTopic = topic;
|
||||
+ }
|
||||
+ notifyMessageArriving(notifyTopic, queueId);
|
||||
+ }
|
||||
+
|
||||
public void notifyMessageArriving(final String topic, final int queueId) {
|
||||
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
|
||||
if (cids == null) {
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
|
||||
index af08a83c7..d1f3fffde 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
|
||||
@@ -185,6 +185,17 @@ public class ConsumerLagCalculator {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
+ if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
|
||||
+ String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
|
||||
+ TopicConfig retryTopicConfigV1 = topicConfigManager.selectTopicConfig(retryTopicV1);
|
||||
+ if (retryTopicConfigV1 != null) {
|
||||
+ int retryTopicPerm = retryTopicConfigV1.getPerm() & brokerConfig.getBrokerPermission();
|
||||
+ if (PermName.isReadable(retryTopicPerm) || PermName.isWriteable(retryTopicPerm)) {
|
||||
+ consumer.accept(new ProcessGroupInfo(group, topic, true, retryTopicV1));
|
||||
+ continue;
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
consumer.accept(new ProcessGroupInfo(group, topic, true, null));
|
||||
} else {
|
||||
consumer.accept(new ProcessGroupInfo(group, topic, false, null));
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||||
index fbba6633b..863b275d1 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||||
@@ -548,6 +548,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
||||
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) {
|
||||
deleteTopicInBroker(popRetryTopic);
|
||||
}
|
||||
+ final String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
|
||||
+ if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != null) {
|
||||
+ deleteTopicInBroker(popRetryTopicV1);
|
||||
+ }
|
||||
}
|
||||
// delete topic
|
||||
deleteTopicInBroker(topic);
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
|
||||
index a15340383..91d275dfe 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
|
||||
@@ -58,7 +58,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
|
||||
}
|
||||
|
||||
public void notifyMessageArriving(final String topic, final int queueId) {
|
||||
- popLongPollingService.notifyMessageArriving(topic, queueId);
|
||||
+ popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId);
|
||||
}
|
||||
|
||||
@Override
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
|
||||
index 7ed4d53ab..58baecc05 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
|
||||
@@ -185,7 +185,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
|
||||
}
|
||||
|
||||
public void notifyMessageArriving(final String topic, final int queueId) {
|
||||
- popLongPollingService.notifyMessageArriving(topic, queueId);
|
||||
+ popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId);
|
||||
}
|
||||
|
||||
public boolean notifyMessageArriving(final String topic, final String cid, final int queueId) {
|
||||
@@ -364,6 +364,17 @@ public class PopMessageProcessor implements NettyRequestProcessor {
|
||||
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
|
||||
}
|
||||
}
|
||||
+ if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
|
||||
+ TopicConfig retryTopicConfigV1 =
|
||||
+ this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
|
||||
+ if (retryTopicConfigV1 != null) {
|
||||
+ for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) {
|
||||
+ int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums();
|
||||
+ getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
|
||||
+ startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
}
|
||||
if (requestHeader.getQueueId() < 0) {
|
||||
// read all queue
|
||||
@@ -388,6 +399,17 @@ public class PopMessageProcessor implements NettyRequestProcessor {
|
||||
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
|
||||
}
|
||||
}
|
||||
+ if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
|
||||
+ TopicConfig retryTopicConfigV1 =
|
||||
+ this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
|
||||
+ if (retryTopicConfigV1 != null) {
|
||||
+ for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) {
|
||||
+ int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums();
|
||||
+ getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
|
||||
+ startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
}
|
||||
|
||||
final RemotingCommand finalResponse = response;
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
|
||||
index 3fb689ed6..8d25bc57e 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
|
||||
@@ -142,15 +142,6 @@ public class PopReviveService extends ServiceThread {
|
||||
this.brokerController.getBrokerStatsManager().incBrokerPutNums(popCheckPoint.getTopic(), 1);
|
||||
this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
|
||||
this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
|
||||
- if (brokerController.getPopMessageProcessor() != null) {
|
||||
- brokerController.getPopMessageProcessor().notifyMessageArriving(
|
||||
- KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
|
||||
- popCheckPoint.getCId(),
|
||||
- -1
|
||||
- );
|
||||
- brokerController.getNotificationProcessor().notifyMessageArriving(
|
||||
- KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);
|
||||
- }
|
||||
return true;
|
||||
}
|
||||
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
|
||||
index 956ef43fb..4ec84c146 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
|
||||
@@ -28,6 +28,7 @@ import org.apache.rocketmq.broker.BrokerController;
|
||||
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
|
||||
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
|
||||
import org.apache.rocketmq.common.AbortProcessException;
|
||||
+import org.apache.rocketmq.common.KeyBuilder;
|
||||
import org.apache.rocketmq.common.MQVersion;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.TopicConfig;
|
||||
@@ -178,7 +179,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
|
||||
MessageExt msg, TopicConfig topicConfig, Map<String, String> properties) {
|
||||
String newTopic = requestHeader.getTopic();
|
||||
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
|
||||
- String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
|
||||
+ String groupName = KeyBuilder.parseGroup(newTopic);
|
||||
SubscriptionGroupConfig subscriptionGroupConfig =
|
||||
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
|
||||
if (null == subscriptionGroupConfig) {
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||||
index 0d248c4e1..c186352d1 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||||
@@ -223,6 +223,8 @@ public class BrokerConfig extends BrokerIdentity {
|
||||
private boolean enablePopBatchAck = false;
|
||||
private boolean enableNotifyAfterPopOrderLockRelease = true;
|
||||
private boolean initPopOffsetByCheckMsgInMem = true;
|
||||
+ // read message from pop retry topic v1, for the compatibility, will be removed in the future version
|
||||
+ private boolean retrieveMessageFromPopRetryTopicV1 = true;
|
||||
|
||||
private boolean realTimeNotifyConsumerChange = true;
|
||||
|
||||
@@ -1284,6 +1286,14 @@ public class BrokerConfig extends BrokerIdentity {
|
||||
this.initPopOffsetByCheckMsgInMem = initPopOffsetByCheckMsgInMem;
|
||||
}
|
||||
|
||||
+ public boolean isRetrieveMessageFromPopRetryTopicV1() {
|
||||
+ return retrieveMessageFromPopRetryTopicV1;
|
||||
+ }
|
||||
+
|
||||
+ public void setRetrieveMessageFromPopRetryTopicV1(boolean retrieveMessageFromPopRetryTopicV1) {
|
||||
+ this.retrieveMessageFromPopRetryTopicV1 = retrieveMessageFromPopRetryTopicV1;
|
||||
+ }
|
||||
+
|
||||
public boolean isRealTimeNotifyConsumerChange() {
|
||||
return realTimeNotifyConsumerChange;
|
||||
}
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
|
||||
index e1532d939..f2a8c4089 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
|
||||
@@ -18,24 +18,53 @@ package org.apache.rocketmq.common;
|
||||
|
||||
public class KeyBuilder {
|
||||
public static final int POP_ORDER_REVIVE_QUEUE = 999;
|
||||
+ private static final String POP_RETRY_SEPARATOR_V1 = "_";
|
||||
+ private static final String POP_RETRY_SEPARATOR_V2 = ":";
|
||||
|
||||
public static String buildPopRetryTopic(String topic, String cid) {
|
||||
- return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_" + topic;
|
||||
+ return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2 + topic;
|
||||
+ }
|
||||
+
|
||||
+ public static String buildPopRetryTopicV1(String topic, String cid) {
|
||||
+ return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V1 + topic;
|
||||
}
|
||||
|
||||
public static String parseNormalTopic(String topic, String cid) {
|
||||
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
|
||||
- return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_").length());
|
||||
+ if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2)) {
|
||||
+ return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2).length());
|
||||
+ }
|
||||
+ return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V1).length());
|
||||
} else {
|
||||
return topic;
|
||||
}
|
||||
}
|
||||
|
||||
+ public static String parseNormalTopic(String retryTopic) {
|
||||
+ if (isPopRetryTopicV2(retryTopic)) {
|
||||
+ String[] result = retryTopic.split(POP_RETRY_SEPARATOR_V2);
|
||||
+ if (result.length == 2) {
|
||||
+ return result[1];
|
||||
+ }
|
||||
+ }
|
||||
+ return retryTopic;
|
||||
+ }
|
||||
+
|
||||
+ public static String parseGroup(String retryTopic) {
|
||||
+ if (isPopRetryTopicV2(retryTopic)) {
|
||||
+ String[] result = retryTopic.split(POP_RETRY_SEPARATOR_V2);
|
||||
+ if (result.length == 2) {
|
||||
+ return result[0].substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
|
||||
+ }
|
||||
+ }
|
||||
+ return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
|
||||
+ }
|
||||
+
|
||||
public static String buildPollingKey(String topic, String cid, int queueId) {
|
||||
return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT + queueId;
|
||||
}
|
||||
|
||||
- public static String buildPollingNotificationKey(String topic, int queueId) {
|
||||
- return topic + PopAckConstants.SPLIT + queueId;
|
||||
+ public static boolean isPopRetryTopicV2(String retryTopic) {
|
||||
+ return retryTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && retryTopic.contains(POP_RETRY_SEPARATOR_V2);
|
||||
}
|
||||
}
|
||||
diff --git a/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
|
||||
new file mode 100644
|
||||
index 000000000..f83e0aa14
|
||||
--- /dev/null
|
||||
+++ b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
|
||||
@@ -0,0 +1,65 @@
|
||||
+/*
|
||||
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
+ * contributor license agreements. See the NOTICE file distributed with
|
||||
+ * this work for additional information regarding copyright ownership.
|
||||
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
+ * (the "License"); you may not use this file except in compliance with
|
||||
+ * the License. You may obtain a copy of the License at
|
||||
+ *
|
||||
+ * http://www.apache.org/licenses/LICENSE-2.0
|
||||
+ *
|
||||
+ * Unless required by applicable law or agreed to in writing, software
|
||||
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
||||
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
+ * See the License for the specific language governing permissions and
|
||||
+ * limitations under the License.
|
||||
+ */
|
||||
+
|
||||
+package org.apache.rocketmq.common;
|
||||
+
|
||||
+import org.junit.Test;
|
||||
+
|
||||
+import static org.assertj.core.api.Assertions.assertThat;
|
||||
+
|
||||
+public class KeyBuilderTest {
|
||||
+ String topic = "test-topic";
|
||||
+ String group = "test-group";
|
||||
+
|
||||
+ @Test
|
||||
+ public void buildPopRetryTopic() {
|
||||
+ assertThat(KeyBuilder.buildPopRetryTopic(topic, group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + ":" + topic);
|
||||
+ }
|
||||
+
|
||||
+ @Test
|
||||
+ public void buildPopRetryTopicV1() {
|
||||
+ assertThat(KeyBuilder.buildPopRetryTopicV1(topic, group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + "_" + topic);
|
||||
+ }
|
||||
+
|
||||
+ @Test
|
||||
+ public void parseNormalTopic() {
|
||||
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
|
||||
+ assertThat(KeyBuilder.parseNormalTopic(popRetryTopic, group)).isEqualTo(topic);
|
||||
+ String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
|
||||
+ assertThat(KeyBuilder.parseNormalTopic(popRetryTopicV1, group)).isEqualTo(topic);
|
||||
+ }
|
||||
+
|
||||
+ @Test
|
||||
+ public void testParseNormalTopic() {
|
||||
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
|
||||
+ assertThat(KeyBuilder.parseNormalTopic(popRetryTopic)).isEqualTo(topic);
|
||||
+ }
|
||||
+
|
||||
+ @Test
|
||||
+ public void parseGroup() {
|
||||
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
|
||||
+ assertThat(KeyBuilder.parseGroup(popRetryTopic)).isEqualTo(group);
|
||||
+ }
|
||||
+
|
||||
+ @Test
|
||||
+ public void isPopRetryTopicV2() {
|
||||
+ String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
|
||||
+ assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopic)).isEqualTo(true);
|
||||
+ String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
|
||||
+ assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopicV1)).isEqualTo(false);
|
||||
+ }
|
||||
+}
|
||||
\ No newline at end of file
|
||||
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
|
||||
index 97125b854..c489cad68 100644
|
||||
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
|
||||
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
|
||||
@@ -25,6 +25,7 @@ import java.util.Map;
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.Option;
|
||||
import org.apache.commons.cli.Options;
|
||||
+import org.apache.rocketmq.common.KeyBuilder;
|
||||
import org.apache.rocketmq.common.MQVersion;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.UtilAll;
|
||||
@@ -212,7 +213,7 @@ public class ConsumerProgressSubCommand implements SubCommand {
|
||||
TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
|
||||
for (String topic : topicList.getTopicList()) {
|
||||
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
|
||||
- String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
|
||||
+ String consumerGroup = KeyBuilder.parseGroup(topic);
|
||||
try {
|
||||
ConsumeStats consumeStats = null;
|
||||
try {
|
||||
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
|
||||
index 45dc3a036..b66dfad20 100644
|
||||
--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
|
||||
+++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
|
||||
@@ -34,6 +34,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.client.exception.MQBrokerException;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
+import org.apache.rocketmq.common.KeyBuilder;
|
||||
import org.apache.rocketmq.common.MQVersion;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
||||
@@ -172,7 +173,7 @@ public class MonitorService {
|
||||
TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
|
||||
for (String topic : topicList.getTopicList()) {
|
||||
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
|
||||
- String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
|
||||
+ String consumerGroup = KeyBuilder.parseGroup(topic);
|
||||
|
||||
try {
|
||||
this.reportUndoneMsgs(consumerGroup);
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
@ -0,0 +1,31 @@
|
||||
From e955e4399ceed5b5a1fbadc400883cfc5f99e726 Mon Sep 17 00:00:00 2001
|
||||
From: AYue <40812847+AYue-94@users.noreply.github.com>
|
||||
Date: Fri, 24 Nov 2023 10:47:08 +0800
|
||||
Subject: [PATCH] [ISSUE #7577] SlaveActingMaster Timer Message retry without
|
||||
escape logic (#7578)
|
||||
|
||||
Co-authored-by: ayue <ericyu0421@163.com>
|
||||
---
|
||||
.../org/apache/rocketmq/store/timer/TimerMessageStore.java | 6 +++++-
|
||||
1 file changed, 5 insertions(+), 1 deletion(-)
|
||||
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
||||
index 3ab51a26d..d796e4467 100644
|
||||
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
||||
@@ -1105,7 +1105,11 @@ public class TimerMessageStore {
|
||||
}
|
||||
}
|
||||
Thread.sleep(50);
|
||||
- putMessageResult = messageStore.putMessage(message);
|
||||
+ if (escapeBridgeHook != null) {
|
||||
+ putMessageResult = escapeBridgeHook.apply(message);
|
||||
+ } else {
|
||||
+ putMessageResult = messageStore.putMessage(message);
|
||||
+ }
|
||||
LOGGER.warn("Retrying to do put timer msg retryNum:{} putRes:{} msg:{}", retryNum, putMessageResult, message);
|
||||
}
|
||||
return PUT_NO_RETRY;
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
562
patch039-backport-add-some-validations.patch
Normal file
562
patch039-backport-add-some-validations.patch
Normal file
@ -0,0 +1,562 @@
|
||||
From 9cfe724e6a188ea444c90ee00f2453da1b807bfa Mon Sep 17 00:00:00 2001
|
||||
From: dinglei <libya_003@163.com>
|
||||
Date: Tue, 28 Nov 2023 10:04:17 +0800
|
||||
Subject: [PATCH 1/3] Add validation in broker/namesrv configure updating
|
||||
command (#7584)
|
||||
|
||||
* Add validation for keys in black list in mqadmin command.
|
||||
|
||||
* Cancel validation for keys in black list in putKV command.
|
||||
---
|
||||
.../processor/AdminBrokerProcessor.java | 25 ++++++++++++++--
|
||||
.../processor/AdminBrokerProcessorTest.java | 12 +++++++-
|
||||
.../apache/rocketmq/common/BrokerConfig.java | 11 +++++++
|
||||
.../rocketmq/common/ControllerConfig.java | 11 +++++++
|
||||
.../common/namesrv/NamesrvConfig.java | 10 +++++++
|
||||
.../processor/ControllerRequestProcessor.java | 27 +++++++++++++----
|
||||
.../ControllerRequestProcessorTest.java | 23 +++++++++++++-
|
||||
.../processor/DefaultRequestProcessor.java | 30 +++++++++++++++++--
|
||||
.../processor/RequestProcessorTest.java | 15 ++++++++--
|
||||
9 files changed, 149 insertions(+), 15 deletions(-)
|
||||
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||||
index 863b275d1..978c2e81d 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||||
@@ -25,6 +25,7 @@ import java.io.UnsupportedEncodingException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
+import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
@@ -193,9 +194,19 @@ import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorRe
|
||||
public class AdminBrokerProcessor implements NettyRequestProcessor {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
|
||||
protected final BrokerController brokerController;
|
||||
+ protected Set<String> configBlackList = new HashSet<>();
|
||||
|
||||
public AdminBrokerProcessor(final BrokerController brokerController) {
|
||||
this.brokerController = brokerController;
|
||||
+ initConfigBlackList();
|
||||
+ }
|
||||
+
|
||||
+ private void initConfigBlackList() {
|
||||
+ configBlackList.add("brokerConfigPath");
|
||||
+ configBlackList.add("rocketmqHome");
|
||||
+ configBlackList.add("configBlackList");
|
||||
+ String[] configArray = brokerController.getBrokerConfig().getConfigBlackList().split(";");
|
||||
+ configBlackList.addAll(Arrays.asList(configArray));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -919,10 +930,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
||||
Properties properties = MixAll.string2Properties(bodyStr);
|
||||
if (properties != null) {
|
||||
LOGGER.info("updateBrokerConfig, new config: [{}] client: {} ", properties, callerAddress);
|
||||
-
|
||||
- if (properties.containsKey("brokerConfigPath")) {
|
||||
+ if (validateBlackListConfigExist(properties)) {
|
||||
response.setCode(ResponseCode.NO_PERMISSION);
|
||||
- response.setRemark("Can not update config path");
|
||||
+ response.setRemark("Can not update config in black list.");
|
||||
return response;
|
||||
}
|
||||
|
||||
@@ -2796,4 +2806,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
+
|
||||
+ private boolean validateBlackListConfigExist(Properties properties) {
|
||||
+ for (String blackConfig:configBlackList) {
|
||||
+ if (properties.containsKey(blackConfig)) {
|
||||
+ return true;
|
||||
+ }
|
||||
+ }
|
||||
+ return false;
|
||||
+ }
|
||||
}
|
||||
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
||||
index ec252cece..c6b889bae 100644
|
||||
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
||||
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
||||
@@ -370,8 +370,18 @@ public class AdminBrokerProcessorTest {
|
||||
|
||||
assertThat(response).isNotNull();
|
||||
assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
|
||||
- assertThat(response.getRemark()).contains("Can not update config path");
|
||||
+ assertThat(response.getRemark()).contains("Can not update config in black list.");
|
||||
|
||||
+ //update disallowed value
|
||||
+ properties.clear();
|
||||
+ properties.setProperty("configBlackList", "test;path");
|
||||
+ updateConfigRequest.setBody(MixAll.properties2String(properties).getBytes(StandardCharsets.UTF_8));
|
||||
+
|
||||
+ response = adminBrokerProcessor.processRequest(ctx, updateConfigRequest);
|
||||
+
|
||||
+ assertThat(response).isNotNull();
|
||||
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
|
||||
+ assertThat(response.getRemark()).contains("Can not update config in black list.");
|
||||
}
|
||||
|
||||
@Test
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||||
index c186352d1..96e0f8e91 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||||
@@ -406,6 +406,17 @@ public class BrokerConfig extends BrokerIdentity {
|
||||
|
||||
private int splitRegistrationSize = 800;
|
||||
|
||||
+ /**
|
||||
+ * Config in this black list will be not allowed to update by command.
|
||||
+ * Try to update this config black list by restart process.
|
||||
+ * Try to update configures in black list by restart process.
|
||||
+ */
|
||||
+ private String configBlackList = "configBlackList;brokerConfigPath";
|
||||
+
|
||||
+ public String getConfigBlackList() {
|
||||
+ return configBlackList;
|
||||
+ }
|
||||
+
|
||||
public long getMaxPopPollingSize() {
|
||||
return maxPopPollingSize;
|
||||
}
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
|
||||
index 1e9c80b22..55854cfd2 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
|
||||
@@ -83,6 +83,17 @@ public class ControllerConfig {
|
||||
|
||||
private boolean metricsInDelta = false;
|
||||
|
||||
+ /**
|
||||
+ * Config in this black list will be not allowed to update by command.
|
||||
+ * Try to update this config black list by restart process.
|
||||
+ * Try to update configures in black list by restart process.
|
||||
+ */
|
||||
+ private String configBlackList = "configBlackList;configStorePath";
|
||||
+
|
||||
+ public String getConfigBlackList() {
|
||||
+ return configBlackList;
|
||||
+ }
|
||||
+
|
||||
public String getRocketmqHome() {
|
||||
return rocketmqHome;
|
||||
}
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
|
||||
index 5b8a6dedb..b82d1b8f8 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
|
||||
@@ -90,6 +90,16 @@ public class NamesrvConfig {
|
||||
* 2. This flag does not support static topic currently.
|
||||
*/
|
||||
private boolean deleteTopicWithBrokerRegistration = false;
|
||||
+ /**
|
||||
+ * Config in this black list will be not allowed to update by command.
|
||||
+ * Try to update this config black list by restart process.
|
||||
+ * Try to update configures in black list by restart process.
|
||||
+ */
|
||||
+ private String configBlackList = "configBlackList;configStorePath;kvConfigPath";
|
||||
+
|
||||
+ public String getConfigBlackList() {
|
||||
+ return configBlackList;
|
||||
+ }
|
||||
|
||||
public boolean isOrderMessageEnable() {
|
||||
return orderMessageEnable;
|
||||
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
|
||||
index 93ecbbd9d..a8a3d2587 100644
|
||||
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
|
||||
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
|
||||
@@ -20,8 +20,11 @@ import com.google.common.base.Stopwatch;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
+import java.util.Arrays;
|
||||
+import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
+import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@@ -73,12 +76,20 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
|
||||
private static final int WAIT_TIMEOUT_OUT = 5;
|
||||
private final ControllerManager controllerManager;
|
||||
private final BrokerHeartbeatManager heartbeatManager;
|
||||
+ protected Set<String> configBlackList = new HashSet<>();
|
||||
|
||||
public ControllerRequestProcessor(final ControllerManager controllerManager) {
|
||||
this.controllerManager = controllerManager;
|
||||
this.heartbeatManager = controllerManager.getHeartbeatManager();
|
||||
+ initConfigBlackList();
|
||||
+ }
|
||||
+ private void initConfigBlackList() {
|
||||
+ configBlackList.add("configBlackList");
|
||||
+ configBlackList.add("configStorePath");
|
||||
+ configBlackList.add("rocketmqHome");
|
||||
+ String[] configArray = controllerManager.getControllerConfig().getConfigBlackList().split(";");
|
||||
+ configBlackList.addAll(Arrays.asList(configArray));
|
||||
}
|
||||
-
|
||||
@Override
|
||||
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
|
||||
if (ctx != null) {
|
||||
@@ -280,10 +291,9 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
|
||||
response.setRemark("string2Properties error");
|
||||
return response;
|
||||
}
|
||||
-
|
||||
- if (properties.containsKey("configStorePath")) {
|
||||
+ if (validateBlackListConfigExist(properties)) {
|
||||
response.setCode(ResponseCode.NO_PERMISSION);
|
||||
- response.setRemark("Can not update config path");
|
||||
+ response.setRemark("Can not update config in black list.");
|
||||
return response;
|
||||
}
|
||||
|
||||
@@ -319,5 +329,12 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
|
||||
public boolean rejectRequest() {
|
||||
return false;
|
||||
}
|
||||
-
|
||||
+ private boolean validateBlackListConfigExist(Properties properties) {
|
||||
+ for (String blackConfig : configBlackList) {
|
||||
+ if (properties.containsKey(blackConfig)) {
|
||||
+ return true;
|
||||
+ }
|
||||
+ }
|
||||
+ return false;
|
||||
+ }
|
||||
}
|
||||
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/ControllerRequestProcessorTest.java b/controller/src/test/java/org/apache/rocketmq/controller/ControllerRequestProcessorTest.java
|
||||
index ede6ca36a..46f86ad32 100644
|
||||
--- a/controller/src/test/java/org/apache/rocketmq/controller/ControllerRequestProcessorTest.java
|
||||
+++ b/controller/src/test/java/org/apache/rocketmq/controller/ControllerRequestProcessorTest.java
|
||||
@@ -64,7 +64,28 @@ public class ControllerRequestProcessorTest {
|
||||
|
||||
assertThat(response).isNotNull();
|
||||
assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
|
||||
- assertThat(response.getRemark()).contains("Can not update config path");
|
||||
+ assertThat(response.getRemark()).contains("Can not update config in black list.");
|
||||
|
||||
+ // Update disallowed value
|
||||
+ properties.clear();
|
||||
+ properties.setProperty("rocketmqHome", "test/path");
|
||||
+ updateConfigRequest.setBody(MixAll.properties2String(properties).getBytes(StandardCharsets.UTF_8));
|
||||
+
|
||||
+ response = controllerRequestProcessor.processRequest(null, updateConfigRequest);
|
||||
+
|
||||
+ assertThat(response).isNotNull();
|
||||
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
|
||||
+ assertThat(response.getRemark()).contains("Can not update config in black list.");
|
||||
+
|
||||
+ // Update disallowed value
|
||||
+ properties.clear();
|
||||
+ properties.setProperty("configBlackList", "test;path");
|
||||
+ updateConfigRequest.setBody(MixAll.properties2String(properties).getBytes(StandardCharsets.UTF_8));
|
||||
+
|
||||
+ response = controllerRequestProcessor.processRequest(null, updateConfigRequest);
|
||||
+
|
||||
+ assertThat(response).isNotNull();
|
||||
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
|
||||
+ assertThat(response.getRemark()).contains("Can not update config in black list.");
|
||||
}
|
||||
}
|
||||
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
|
||||
index 485b95c42..2daa95b9b 100644
|
||||
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
|
||||
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
|
||||
@@ -18,8 +18,11 @@ package org.apache.rocketmq.namesrv.processor;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
+import java.util.Arrays;
|
||||
+import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
+import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.common.MQVersion;
|
||||
@@ -71,8 +74,20 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
|
||||
|
||||
protected final NamesrvController namesrvController;
|
||||
|
||||
+ protected Set<String> configBlackList = new HashSet<>();
|
||||
+
|
||||
public DefaultRequestProcessor(NamesrvController namesrvController) {
|
||||
this.namesrvController = namesrvController;
|
||||
+ initConfigBlackList();
|
||||
+ }
|
||||
+
|
||||
+ private void initConfigBlackList() {
|
||||
+ configBlackList.add("configBlackList");
|
||||
+ configBlackList.add("configStorePath");
|
||||
+ configBlackList.add("kvConfigPath");
|
||||
+ configBlackList.add("rocketmqHome");
|
||||
+ String[] configArray = namesrvController.getNamesrvConfig().getConfigBlackList().split(";");
|
||||
+ configBlackList.addAll(Arrays.asList(configArray));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -153,6 +168,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
|
||||
response.setRemark("namespace or key is null");
|
||||
return response;
|
||||
}
|
||||
+
|
||||
this.namesrvController.getKvConfigManager().putKVConfig(
|
||||
requestHeader.getNamespace(),
|
||||
requestHeader.getKey(),
|
||||
@@ -623,10 +639,9 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
|
||||
response.setRemark("string2Properties error");
|
||||
return response;
|
||||
}
|
||||
-
|
||||
- if (properties.containsKey("kvConfigPath") || properties.containsKey("configStorePath")) {
|
||||
+ if (validateBlackListConfigExist(properties)) {
|
||||
response.setCode(ResponseCode.NO_PERMISSION);
|
||||
- response.setRemark("Can not update config path");
|
||||
+ response.setRemark("Can not update config in black list.");
|
||||
return response;
|
||||
}
|
||||
|
||||
@@ -658,4 +673,13 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
|
||||
return response;
|
||||
}
|
||||
|
||||
+ private boolean validateBlackListConfigExist(Properties properties) {
|
||||
+ for (String blackConfig : configBlackList) {
|
||||
+ if (properties.containsKey(blackConfig)) {
|
||||
+ return true;
|
||||
+ }
|
||||
+ }
|
||||
+ return false;
|
||||
+ }
|
||||
+
|
||||
}
|
||||
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
|
||||
index 5bdf96d9d..2b2cf6294 100644
|
||||
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
|
||||
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
|
||||
@@ -203,7 +203,7 @@ public class RequestProcessorTest {
|
||||
|
||||
assertThat(response).isNotNull();
|
||||
assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
|
||||
- assertThat(response.getRemark()).contains("Can not update config path");
|
||||
+ assertThat(response.getRemark()).contains("Can not update config in black list.");
|
||||
|
||||
//update disallowed values
|
||||
properties.clear();
|
||||
@@ -214,7 +214,18 @@ public class RequestProcessorTest {
|
||||
|
||||
assertThat(response).isNotNull();
|
||||
assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
|
||||
- assertThat(response.getRemark()).contains("Can not update config path");
|
||||
+ assertThat(response.getRemark()).contains("Can not update config in black list");
|
||||
+
|
||||
+ //update disallowed values
|
||||
+ properties.clear();
|
||||
+ properties.setProperty("configBlackList", "test;path");
|
||||
+ updateConfigRequest.setBody(MixAll.properties2String(properties).getBytes(StandardCharsets.UTF_8));
|
||||
+
|
||||
+ response = defaultRequestProcessor.processRequest(null, updateConfigRequest);
|
||||
+
|
||||
+ assertThat(response).isNotNull();
|
||||
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
|
||||
+ assertThat(response.getRemark()).contains("Can not update config in black list");
|
||||
}
|
||||
|
||||
@Test
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 430ee0a755daf867de31e37b12df417f64811b3a Mon Sep 17 00:00:00 2001
|
||||
From: rongtong <jinrongtong5@163.com>
|
||||
Date: Tue, 28 Nov 2023 16:11:14 +0800
|
||||
Subject: [PATCH 2/3] Add validation in broker container configure updating
|
||||
command. (#7587)
|
||||
|
||||
---
|
||||
.../container/BrokerContainerConfig.java | 16 ++++++++
|
||||
.../container/BrokerContainerProcessor.java | 40 +++++++++++++++++--
|
||||
2 files changed, 52 insertions(+), 4 deletions(-)
|
||||
|
||||
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
|
||||
index e03b10c34..03b4b263f 100644
|
||||
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
|
||||
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
|
||||
@@ -49,6 +49,14 @@ public class BrokerContainerConfig {
|
||||
*/
|
||||
private long updateNamesrvAddrInterval = 60 * 2 * 1000;
|
||||
|
||||
+
|
||||
+ /**
|
||||
+ * Config in this black list will be not allowed to update by command.
|
||||
+ * Try to update this config black list by restart process.
|
||||
+ * Try to update configures in black list by restart process.
|
||||
+ */
|
||||
+ private String configBlackList = "configBlackList;brokerConfigPaths";
|
||||
+
|
||||
public String getRocketmqHome() {
|
||||
return rocketmqHome;
|
||||
}
|
||||
@@ -108,4 +116,12 @@ public class BrokerContainerConfig {
|
||||
public void setUpdateNamesrvAddrInterval(long updateNamesrvAddrInterval) {
|
||||
this.updateNamesrvAddrInterval = updateNamesrvAddrInterval;
|
||||
}
|
||||
+
|
||||
+ public String getConfigBlackList() {
|
||||
+ return configBlackList;
|
||||
+ }
|
||||
+
|
||||
+ public void setConfigBlackList(String configBlackList) {
|
||||
+ this.configBlackList = configBlackList;
|
||||
+ }
|
||||
}
|
||||
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
|
||||
index 5b825fe81..5ced08257 100644
|
||||
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
|
||||
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
|
||||
@@ -19,6 +19,9 @@ package org.apache.rocketmq.container;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
+import java.util.Arrays;
|
||||
+import java.util.HashSet;
|
||||
+import java.util.Set;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import org.apache.rocketmq.broker.BrokerController;
|
||||
@@ -45,8 +48,19 @@ public class BrokerContainerProcessor implements NettyRequestProcessor {
|
||||
private final BrokerContainer brokerContainer;
|
||||
private List<BrokerBootHook> brokerBootHookList;
|
||||
|
||||
+ private final Set<String> configBlackList = new HashSet<>();
|
||||
+
|
||||
public BrokerContainerProcessor(BrokerContainer brokerContainer) {
|
||||
this.brokerContainer = brokerContainer;
|
||||
+ initConfigBlackList();
|
||||
+ }
|
||||
+
|
||||
+ private void initConfigBlackList() {
|
||||
+ configBlackList.add("brokerConfigPaths");
|
||||
+ configBlackList.add("rocketmqHome");
|
||||
+ configBlackList.add("configBlackList");
|
||||
+ String[] configArray = brokerContainer.getBrokerContainerConfig().getConfigBlackList().split(";");
|
||||
+ configBlackList.addAll(Arrays.asList(configArray));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -232,15 +246,24 @@ public class BrokerContainerProcessor implements NettyRequestProcessor {
|
||||
try {
|
||||
String bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
|
||||
Properties properties = MixAll.string2Properties(bodyStr);
|
||||
- if (properties != null) {
|
||||
- LOGGER.info("updateSharedBrokerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress());
|
||||
- this.brokerContainer.getConfiguration().update(properties);
|
||||
- } else {
|
||||
+
|
||||
+ if (properties == null) {
|
||||
LOGGER.error("string2Properties error");
|
||||
response.setCode(ResponseCode.SYSTEM_ERROR);
|
||||
response.setRemark("string2Properties error");
|
||||
return response;
|
||||
}
|
||||
+
|
||||
+ if (validateBlackListConfigExist(properties)) {
|
||||
+ response.setCode(ResponseCode.NO_PERMISSION);
|
||||
+ response.setRemark("Can not update config in black list.");
|
||||
+ return response;
|
||||
+ }
|
||||
+
|
||||
+
|
||||
+ LOGGER.info("updateBrokerContainerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress());
|
||||
+ this.brokerContainer.getConfiguration().update(properties);
|
||||
+
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
LOGGER.error("", e);
|
||||
response.setCode(ResponseCode.SYSTEM_ERROR);
|
||||
@@ -254,6 +277,15 @@ public class BrokerContainerProcessor implements NettyRequestProcessor {
|
||||
return response;
|
||||
}
|
||||
|
||||
+ private boolean validateBlackListConfigExist(Properties properties) {
|
||||
+ for (String blackConfig : configBlackList) {
|
||||
+ if (properties.containsKey(blackConfig)) {
|
||||
+ return true;
|
||||
+ }
|
||||
+ }
|
||||
+ return false;
|
||||
+ }
|
||||
+
|
||||
private RemotingCommand getBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
|
||||
|
||||
final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerConfigResponseHeader.class);
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From a194e1eb9a12e08c43a0da65cd0a048ff849e04d Mon Sep 17 00:00:00 2001
|
||||
From: dinglei <libya_003@163.com>
|
||||
Date: Tue, 28 Nov 2023 20:18:53 +0800
|
||||
Subject: [PATCH 3/3] Add set method for config black list. (#7586)
|
||||
|
||||
---
|
||||
.../main/java/org/apache/rocketmq/common/BrokerConfig.java | 4 ++++
|
||||
.../java/org/apache/rocketmq/common/ControllerConfig.java | 4 ++++
|
||||
.../org/apache/rocketmq/common/namesrv/NamesrvConfig.java | 4 ++++
|
||||
3 files changed, 12 insertions(+)
|
||||
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||||
index 96e0f8e91..a4a553ad5 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||||
@@ -417,6 +417,10 @@ public class BrokerConfig extends BrokerIdentity {
|
||||
return configBlackList;
|
||||
}
|
||||
|
||||
+ public void setConfigBlackList(String configBlackList) {
|
||||
+ this.configBlackList = configBlackList;
|
||||
+ }
|
||||
+
|
||||
public long getMaxPopPollingSize() {
|
||||
return maxPopPollingSize;
|
||||
}
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
|
||||
index 55854cfd2..1364754a0 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
|
||||
@@ -94,6 +94,10 @@ public class ControllerConfig {
|
||||
return configBlackList;
|
||||
}
|
||||
|
||||
+ public void setConfigBlackList(String configBlackList) {
|
||||
+ this.configBlackList = configBlackList;
|
||||
+ }
|
||||
+
|
||||
public String getRocketmqHome() {
|
||||
return rocketmqHome;
|
||||
}
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
|
||||
index b82d1b8f8..d1cdc7631 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
|
||||
@@ -101,6 +101,10 @@ public class NamesrvConfig {
|
||||
return configBlackList;
|
||||
}
|
||||
|
||||
+ public void setConfigBlackList(String configBlackList) {
|
||||
+ this.configBlackList = configBlackList;
|
||||
+ }
|
||||
+
|
||||
public boolean isOrderMessageEnable() {
|
||||
return orderMessageEnable;
|
||||
}
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
80
patch040-backport-add-some-test-cases.patch
Normal file
80
patch040-backport-add-some-test-cases.patch
Normal file
@ -0,0 +1,80 @@
|
||||
From 56e886bf70669befd7b9e7380e68751fe67f05b2 Mon Sep 17 00:00:00 2001
|
||||
From: YASH PATEL <121890726+yp969803@users.noreply.github.com>
|
||||
Date: Wed, 29 Nov 2023 10:02:38 +0530
|
||||
Subject: [PATCH] =?UTF-8?q?[ISSUE=20#7592]=20testCleanBuffer=20unit=20test?=
|
||||
=?UTF-8?q?=20modifies,=20changed=20non-direct=20=E2=80=A6=20(#7593)?=
|
||||
MIME-Version: 1.0
|
||||
Content-Type: text/plain; charset=UTF-8
|
||||
Content-Transfer-Encoding: 8bit
|
||||
|
||||
* [ISSUE #7592] testCleanBuffer unit test modifies, changed non-direct to direct buffer allocation
|
||||
|
||||
* fix: consolidate UtilAll#cleanBuffer by checking if the given buffer is direct or not
|
||||
|
||||
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
|
||||
|
||||
---------
|
||||
|
||||
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
|
||||
Co-authored-by: Li Zhanhui <lizhanhui@gmail.com>
|
||||
---
|
||||
.../main/java/org/apache/rocketmq/common/UtilAll.java | 9 +++++++++
|
||||
.../java/org/apache/rocketmq/common/UtilAllTest.java | 3 ++-
|
||||
.../rocketmq/store/timer/TimerMessageStoreTest.java | 2 +-
|
||||
3 files changed, 12 insertions(+), 2 deletions(-)
|
||||
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
|
||||
index 2808f106a..19efa9aa9 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
|
||||
@@ -699,10 +699,19 @@ public class UtilAll {
|
||||
}
|
||||
}
|
||||
|
||||
+ /**
|
||||
+ * Free direct-buffer's memory actively.
|
||||
+ * @param buffer Direct buffer to free.
|
||||
+ */
|
||||
public static void cleanBuffer(final ByteBuffer buffer) {
|
||||
if (null == buffer) {
|
||||
return;
|
||||
}
|
||||
+
|
||||
+ if (!buffer.isDirect()) {
|
||||
+ return;
|
||||
+ }
|
||||
+
|
||||
PlatformDependent.freeDirectBuffer(buffer);
|
||||
}
|
||||
|
||||
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
|
||||
index cb288578c..2d22d5254 100644
|
||||
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
|
||||
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
|
||||
@@ -215,8 +215,9 @@ public class UtilAllTest {
|
||||
@Test
|
||||
public void testCleanBuffer() {
|
||||
UtilAll.cleanBuffer(null);
|
||||
+ UtilAll.cleanBuffer(ByteBuffer.allocateDirect(10));
|
||||
+ UtilAll.cleanBuffer(ByteBuffer.allocateDirect(0));
|
||||
UtilAll.cleanBuffer(ByteBuffer.allocate(10));
|
||||
- UtilAll.cleanBuffer(ByteBuffer.allocate(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
|
||||
index 63ec97cdb..02ff35681 100644
|
||||
--- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
|
||||
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
|
||||
@@ -387,7 +387,7 @@ public class TimerMessageStoreTest {
|
||||
assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
|
||||
}
|
||||
|
||||
- // Wait until messages have wrote to TimerLog and currReadTimeMs catches up current time.
|
||||
+ // Wait until messages have written to TimerLog and currReadTimeMs catches up current time.
|
||||
await().atMost(5000, TimeUnit.MILLISECONDS).until(new Callable<Boolean>() {
|
||||
@Override
|
||||
public Boolean call() {
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
53
patch041-backport-improve-performance.patch
Normal file
53
patch041-backport-improve-performance.patch
Normal file
@ -0,0 +1,53 @@
|
||||
From 65faea22fd54fd9875f2ca9d3088b4dc46d31cce Mon Sep 17 00:00:00 2001
|
||||
From: keranbingaa <397294722@qq.com>
|
||||
Date: Fri, 1 Dec 2023 10:05:16 +0800
|
||||
Subject: [PATCH] [ISSUE #7534] Use high performance concurrent set to replace
|
||||
copyonwriteset (#7583)
|
||||
|
||||
* fix ISSUE #7534
|
||||
|
||||
* reformat code
|
||||
|
||||
* Remove the useless unit test
|
||||
|
||||
---------
|
||||
|
||||
Co-authored-by: RongtongJin <jinrongtong16@mails.ucas.ac.cn>
|
||||
---
|
||||
.../rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java | 1 -
|
||||
.../org/apache/rocketmq/remoting/protocol/body/TopicList.java | 4 ++--
|
||||
2 files changed, 2 insertions(+), 3 deletions(-)
|
||||
|
||||
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
|
||||
index 6002d1f5a..b52cf5074 100644
|
||||
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
|
||||
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
|
||||
@@ -130,7 +130,6 @@ public class RouteInfoManagerNewTest {
|
||||
topicList = TopicList.decode(content, TopicList.class);
|
||||
assertThat(topicList.getTopicList()).contains("TestTopic", "TestTopic1", "TestTopic2");
|
||||
}
|
||||
-
|
||||
@Test
|
||||
public void registerBroker() {
|
||||
// Register master broker
|
||||
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/TopicList.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/TopicList.java
|
||||
index 30edfb5a9..0de0bae7e 100644
|
||||
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/TopicList.java
|
||||
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/TopicList.java
|
||||
@@ -17,11 +17,11 @@
|
||||
package org.apache.rocketmq.remoting.protocol.body;
|
||||
|
||||
import java.util.Set;
|
||||
-import java.util.concurrent.CopyOnWriteArraySet;
|
||||
+import java.util.concurrent.ConcurrentHashMap;
|
||||
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
|
||||
|
||||
public class TopicList extends RemotingSerializable {
|
||||
- private Set<String> topicList = new CopyOnWriteArraySet<>();
|
||||
+ private Set<String> topicList = ConcurrentHashMap.newKeySet();
|
||||
private String brokerAddr;
|
||||
|
||||
public Set<String> getTopicList() {
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
1196
patch042-backport-Support-message-filtering.patch
Normal file
1196
patch042-backport-Support-message-filtering.patch
Normal file
File diff suppressed because it is too large
Load Diff
186
patch043-backport-fix-some-bugs.patch
Normal file
186
patch043-backport-fix-some-bugs.patch
Normal file
@ -0,0 +1,186 @@
|
||||
From c2c29c2435e0626cfe4f49830fbdc0d9421d82b5 Mon Sep 17 00:00:00 2001
|
||||
From: lizhimins <707364882@qq.com>
|
||||
Date: Mon, 4 Dec 2023 16:13:07 +0800
|
||||
Subject: [PATCH 1/2] [ISSUE #7545] Fix set mapped file to null cause file can
|
||||
not destroy (#7612)
|
||||
|
||||
---
|
||||
.../rocketmq/tieredstore/index/IndexStoreFile.java | 2 --
|
||||
.../rocketmq/tieredstore/index/IndexStoreService.java | 10 ++++++++++
|
||||
2 files changed, 10 insertions(+), 2 deletions(-)
|
||||
|
||||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
|
||||
index 52a686f68..def5c8f2d 100644
|
||||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
|
||||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
|
||||
@@ -457,11 +457,9 @@ public class IndexStoreFile implements IndexFile {
|
||||
this.fileStatus.set(IndexStatusEnum.SHUTDOWN);
|
||||
if (this.mappedFile != null) {
|
||||
this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
|
||||
- this.mappedFile = null;
|
||||
}
|
||||
if (this.compactMappedFile != null) {
|
||||
this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
|
||||
- this.compactMappedFile = null;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("IndexStoreFile shutdown failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e);
|
||||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
|
||||
index 14608aa58..e99ea0de1 100644
|
||||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
|
||||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
|
||||
@@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.common.ServiceThread;
|
||||
+import org.apache.rocketmq.common.UtilAll;
|
||||
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||||
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||||
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
|
||||
@@ -101,6 +102,10 @@ public class IndexStoreService extends ServiceThread implements IndexService {
|
||||
private void recover() {
|
||||
Stopwatch stopwatch = Stopwatch.createStarted();
|
||||
|
||||
+ // delete compact file directory
|
||||
+ UtilAll.deleteFile(new File(Paths.get(storeConfig.getStorePathRootDir(),
|
||||
+ FILE_DIRECTORY_NAME, FILE_COMPACTED_DIRECTORY_NAME).toString()));
|
||||
+
|
||||
// recover local
|
||||
File dir = new File(Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME).toString());
|
||||
this.doConvertOldFormatFile(Paths.get(dir.getPath(), "0000").toString());
|
||||
@@ -141,6 +146,10 @@ public class IndexStoreService extends ServiceThread implements IndexService {
|
||||
|
||||
for (TieredFileSegment fileSegment : flatFile.getFileSegmentList()) {
|
||||
IndexFile indexFile = new IndexStoreFile(storeConfig, fileSegment);
|
||||
+ IndexFile localFile = timeStoreTable.get(indexFile.getTimestamp());
|
||||
+ if (localFile != null) {
|
||||
+ localFile.destroy();
|
||||
+ }
|
||||
timeStoreTable.put(indexFile.getTimestamp(), indexFile);
|
||||
log.info("IndexStoreService recover load remote file, timestamp: {}", indexFile.getTimestamp());
|
||||
}
|
||||
@@ -248,6 +257,7 @@ public class IndexStoreService extends ServiceThread implements IndexService {
|
||||
if (IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) {
|
||||
log.error("IndexStoreService file status not correct, so skip, timestamp: {}, status: {}",
|
||||
indexFile.getTimestamp(), indexFile.getFileStatus());
|
||||
+ indexFile.destroy();
|
||||
return;
|
||||
}
|
||||
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From faae64715d917bb5d64b8d72581172d26ebe9501 Mon Sep 17 00:00:00 2001
|
||||
From: gaoyf <gaoyf@users.noreply.github.com>
|
||||
Date: Thu, 7 Dec 2023 11:25:22 +0800
|
||||
Subject: [PATCH 2/2] [ISSUE #7601] Fix slave acting master bug (#7603)
|
||||
|
||||
* fix NullPointerException when message escape to remote
|
||||
|
||||
* fix NumberFormatException when message retry to escape to remote
|
||||
|
||||
* fix timerCheckPoint of the master is not updated, causing the timer message to be replayed after master is restarted
|
||||
|
||||
* Use properties copies instead of referencing the same map when converting message
|
||||
---
|
||||
.../org/apache/rocketmq/broker/BrokerController.java | 1 +
|
||||
.../rocketmq/broker/slave/SlaveSynchronize.java | 4 +++-
|
||||
.../rocketmq/common/message/MessageAccessor.java | 7 +++++++
|
||||
.../rocketmq/store/timer/TimerMessageStore.java | 12 +++++++++---
|
||||
4 files changed, 20 insertions(+), 4 deletions(-)
|
||||
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
||||
index 9f1fd0ad0..8d29d4438 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
||||
@@ -2108,6 +2108,7 @@ public class BrokerController {
|
||||
isScheduleServiceStart = shouldStart;
|
||||
|
||||
if (timerMessageStore != null) {
|
||||
+ timerMessageStore.syncLastReadTimeMs();
|
||||
timerMessageStore.setShouldRunningDequeue(shouldStart);
|
||||
}
|
||||
}
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
|
||||
index 53cdecdf8..7f802adb9 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
|
||||
@@ -215,11 +215,13 @@ public class SlaveSynchronize {
|
||||
String masterAddrBak = this.masterAddr;
|
||||
if (masterAddrBak != null) {
|
||||
try {
|
||||
- if (null != brokerController.getMessageStore().getTimerMessageStore()) {
|
||||
+ if (null != brokerController.getMessageStore().getTimerMessageStore() &&
|
||||
+ !brokerController.getTimerMessageStore().isShouldRunningDequeue()) {
|
||||
TimerCheckpoint checkpoint = this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(masterAddrBak);
|
||||
if (null != this.brokerController.getTimerCheckpoint()) {
|
||||
this.brokerController.getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs());
|
||||
this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset());
|
||||
+ this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion());
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
|
||||
index 1b7e2bba3..62e3bbd7e 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
|
||||
@@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.rocketmq.common.message;
|
||||
|
||||
+import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class MessageAccessor {
|
||||
@@ -96,4 +97,10 @@ public class MessageAccessor {
|
||||
return newMsg;
|
||||
}
|
||||
|
||||
+ public static Map<String, String> deepCopyProperties(Map<String, String> properties) {
|
||||
+ if (properties == null) {
|
||||
+ return null;
|
||||
+ }
|
||||
+ return new HashMap<>(properties);
|
||||
+ }
|
||||
}
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
||||
index d796e4467..872cd7105 100644
|
||||
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
||||
@@ -602,6 +602,10 @@ public class TimerMessageStore {
|
||||
this.shouldRunningDequeue = shouldRunningDequeue;
|
||||
}
|
||||
|
||||
+ public boolean isShouldRunningDequeue() {
|
||||
+ return shouldRunningDequeue;
|
||||
+ }
|
||||
+
|
||||
public void addMetric(MessageExt msg, int value) {
|
||||
try {
|
||||
if (null == msg || null == msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) {
|
||||
@@ -1084,8 +1088,10 @@ public class TimerMessageStore {
|
||||
case PUT_OK:
|
||||
if (brokerStatsManager != null) {
|
||||
this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
|
||||
- this.brokerStatsManager.incTopicPutSize(message.getTopic(),
|
||||
- putMessageResult.getAppendMessageResult().getWroteBytes());
|
||||
+ if (putMessageResult.getAppendMessageResult() != null) {
|
||||
+ this.brokerStatsManager.incTopicPutSize(message.getTopic(),
|
||||
+ putMessageResult.getAppendMessageResult().getWroteBytes());
|
||||
+ }
|
||||
this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
|
||||
}
|
||||
return PUT_OK;
|
||||
@@ -1119,7 +1125,7 @@ public class TimerMessageStore {
|
||||
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
|
||||
msgInner.setBody(msgExt.getBody());
|
||||
msgInner.setFlag(msgExt.getFlag());
|
||||
- MessageAccessor.setProperties(msgInner, msgExt.getProperties());
|
||||
+ MessageAccessor.setProperties(msgInner, MessageAccessor.deepCopyProperties(msgExt.getProperties()));
|
||||
TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
|
||||
long tagsCodeValue =
|
||||
MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
Summary: Cloud-Native, Distributed Messaging and Streaming
|
||||
Name: rocketmq
|
||||
Version: 5.1.5
|
||||
Release: 35
|
||||
Release: 44
|
||||
License: Apache-2.0
|
||||
Group: Applications/Message
|
||||
URL: https://rocketmq.apache.org/
|
||||
@ -44,6 +44,15 @@ Patch0031: patch031-backport-Add-CRC-check-of-commitlog.patch
|
||||
Patch0032: patch032-backport-Clear-POP_CK-when-sending-messages.patch
|
||||
Patch0033: patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.patch
|
||||
Patch0034: patch034-backport-Let-consumer-be-aware-of-message-queue-assignment-change.patch
|
||||
Patch0035: patch035-backport-fix-some-bugs.patch
|
||||
Patch0036: patch036-backport-RIP65.patch
|
||||
Patch0037: patch037-backport-Retry-topic-v2-in-pop.patch
|
||||
Patch0038: patch038-backport-SlaveActingMaster-Timer-Message-retry-without-escape-logic.patch
|
||||
Patch0039: patch039-backport-add-some-validations.patch
|
||||
Patch0040: patch040-backport-add-some-test-cases.patch
|
||||
Patch0041: patch041-backport-improve-performance.patch
|
||||
Patch0042: patch042-backport-Support-message-filtering.patch
|
||||
Patch0043: patch043-backport-fix-some-bugs.patch
|
||||
BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git
|
||||
Requires: java-1.8.0-openjdk-devel
|
||||
|
||||
@ -84,6 +93,33 @@ exit 0
|
||||
|
||||
|
||||
%changelog
|
||||
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-44
|
||||
- backport fix some bugs
|
||||
|
||||
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-43
|
||||
- backport Support message filtering
|
||||
|
||||
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-42
|
||||
- backport add improve performance
|
||||
|
||||
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-41
|
||||
- backport add some test cases
|
||||
|
||||
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-40
|
||||
- backport add some validations
|
||||
|
||||
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-39
|
||||
- backport SlaveActingMaster Timer Message retry without escape logic
|
||||
|
||||
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-38
|
||||
- backport Retry topic v2 in pop
|
||||
|
||||
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-37
|
||||
- backport rip 65
|
||||
|
||||
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-36
|
||||
- backport fix some bugs
|
||||
|
||||
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-35
|
||||
- backport Let consumer be aware of message queue assignment change
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user