rocketmq/patch035-backport-fix-some-bugs.patch
2023-12-11 15:20:10 +08:00

395 lines
18 KiB
Diff

From 1be5ebc7363e4bc6503c80688160a354f5a12f78 Mon Sep 17 00:00:00 2001
From: Zhanhui Li <lizhanhui@apache.org>
Date: Mon, 13 Nov 2023 09:45:37 +0800
Subject: [PATCH 1/5] [ISSUE #7551] Reuse helper methods from Netty to free
direct byte buffer (#7550)
* Reuse helper methods from Netty to free direct byte buffer, making codebase JDK 9+ compatible
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* Guard against null
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* fix #7552
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
---------
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
---
.../org/apache/rocketmq/common/UtilAll.java | 60 +------------------
.../apache/rocketmq/common/UtilAllTest.java | 10 ----
2 files changed, 3 insertions(+), 67 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index 95b6b09b4..2808f106a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -16,21 +16,18 @@
*/
package org.apache.rocketmq.common;
+import io.netty.util.internal.PlatformDependent;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.ByteBuffer;
import java.nio.file.Files;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
import java.text.NumberFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -46,15 +43,11 @@ import java.util.function.Supplier;
import java.util.zip.CRC32;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
-import org.apache.commons.lang3.JavaVersion;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
public class UtilAll {
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
@@ -707,57 +700,10 @@ public class UtilAll {
}
public static void cleanBuffer(final ByteBuffer buffer) {
- if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0) {
+ if (null == buffer || !buffer.isDirect()) { // heap buffers stay a no-op, as before; freeDirectBuffer expects a direct buffer
return;
}
- if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
- try {
- Field field = Unsafe.class.getDeclaredField("theUnsafe");
- field.setAccessible(true);
- Unsafe unsafe = (Unsafe) field.get(null);
- Method cleaner = method(unsafe, "invokeCleaner", new Class[] {ByteBuffer.class});
- cleaner.invoke(unsafe, viewed(buffer));
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- } else {
- invoke(invoke(viewed(buffer), "cleaner"), "clean");
- }
- }
-
- public static Object invoke(final Object target, final String methodName, final Class<?>... args) {
- return AccessController.doPrivileged(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- try {
- Method method = method(target, methodName, args);
- method.setAccessible(true);
- return method.invoke(target);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- });
- }
-
- public static Method method(Object target, String methodName, Class<?>[] args) throws NoSuchMethodException {
- try {
- return target.getClass().getMethod(methodName, args);
- } catch (NoSuchMethodException e) {
- return target.getClass().getDeclaredMethod(methodName, args);
- }
- }
-
- private static ByteBuffer viewed(ByteBuffer buffer) {
- if (!buffer.isDirect()) {
- throw new IllegalArgumentException("buffer is non-direct");
- }
- ByteBuffer viewedBuffer = (ByteBuffer) ((DirectBuffer) buffer).attachment();
- if (viewedBuffer == null) {
- return buffer;
- } else {
- return viewed(viewedBuffer);
- }
+ PlatformDependent.freeDirectBuffer(buffer);
}
public static void ensureDirOK(final String dirName) {
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
index 94bb390eb..cb288578c 100644
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
@@ -219,16 +219,6 @@ public class UtilAllTest {
UtilAll.cleanBuffer(ByteBuffer.allocate(0));
}
- @Test(expected = NoSuchMethodException.class)
- public void testMethod() throws NoSuchMethodException {
- UtilAll.method(new Object(), "noMethod", null);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testInvoke() throws Exception {
- UtilAll.invoke(new Object(), "noMethod");
- }
-
@Test
public void testCalculateFileSizeInPath() throws Exception {
/**
--
2.32.0.windows.2
From 4791d9a1f1a7c39e005da15f228473c04eafd007 Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Tue, 14 Nov 2023 17:59:47 +0800
Subject: [PATCH 2/5] [ISSUE #5923] Revert "Fix tiered store README.md error
about Configuration (#7436)" (#7557)
This reverts commit 70dc93abbcb9bf161378d66fcaca55bedc78b905.
---
.../tieredstore/common/TieredMessageStoreConfig.java | 10 +++++-----
.../tieredstore/provider/posix/PosixFileSegment.java | 4 ++--
.../rocketmq/tieredstore/file/TieredCommitLogTest.java | 2 +-
.../provider/posix/PosixFileSegmentTest.java | 2 +-
4 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
index a112ea6b1..595db6b86 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
@@ -115,7 +115,7 @@ public class TieredMessageStoreConfig {
private long readAheadCacheExpireDuration = 10 * 1000;
private double readAheadCacheSizeThresholdRate = 0.3;
- private String tieredStoreFilepath = "";
+ private String tieredStoreFilePath = "";
private String objectStoreEndpoint = "";
@@ -350,12 +350,12 @@ public class TieredMessageStoreConfig {
this.readAheadCacheSizeThresholdRate = rate;
}
- public String getTieredStoreFilepath() {
- return tieredStoreFilepath;
+ public String getTieredStoreFilePath() {
+ return tieredStoreFilePath;
}
- public void setTieredStoreFilepath(String tieredStoreFilepath) {
- this.tieredStoreFilepath = tieredStoreFilepath;
+ public void setTieredStoreFilePath(String tieredStoreFilePath) {
+ this.tieredStoreFilePath = tieredStoreFilePath;
}
public void setObjectStoreEndpoint(String objectStoreEndpoint) {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
index 708ce33f9..7e949cb28 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -66,8 +66,8 @@ public class PosixFileSegment extends TieredFileSegment {
super(storeConfig, fileType, filePath, baseOffset);
// basePath
- String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilepath(),
- StringUtils.appendIfMissing(storeConfig.getTieredStoreFilepath(), File.separator));
+ String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilePath(),
+ StringUtils.appendIfMissing(storeConfig.getTieredStoreFilePath(), File.separator));
// fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset
String brokerClusterName = storeConfig.getBrokerClusterName();
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
index 80cdba977..6693d3cb7 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
@@ -49,7 +49,7 @@ public class TieredCommitLogTest {
TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig();
storeConfig.setBrokerName("brokerName");
storeConfig.setStorePathRootDir(storePath);
- storeConfig.setTieredStoreFilepath(storePath + File.separator);
+ storeConfig.setTieredStoreFilePath(storePath + File.separator);
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
storeConfig.setCommitLogRollingInterval(0);
storeConfig.setTieredStoreCommitLogMaxSize(1000);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
index ede62b8ce..db33ae847 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
@@ -42,7 +42,7 @@ public class PosixFileSegmentTest {
@Before
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
- storeConfig.setTieredStoreFilepath(storePath);
+ storeConfig.setTieredStoreFilePath(storePath);
mq = new MessageQueue("OSSFileSegmentTest", "broker", 0);
TieredStoreExecutor.init();
}
--
2.32.0.windows.2
From 651a5ca992988b90c7e4884e9975db0938557def Mon Sep 17 00:00:00 2001
From: Jixiang Jin <lollipop@apache.org>
Date: Thu, 16 Nov 2023 10:16:16 +0800
Subject: [PATCH 3/5] [ISSUE #7562] BugFix for estimating message accumulation
correctly (#7563)
---
.../broker/metrics/ConsumerLagCalculator.java | 11 +++++---
.../proxy/common/utils/FilterUtilTest.java | 25 +++++++++++++++++++
.../remoting/protocol/filter/FilterAPI.java | 8 ++++++
3 files changed, 40 insertions(+), 4 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index 7a5f1f765..af08a83c7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.subscription.SimpleSubscriptionData;
@@ -435,10 +436,12 @@ public class ConsumerLagCalculator {
if (subscriptionGroupConfig != null) {
for (SimpleSubscriptionData simpleSubscriptionData : subscriptionGroupConfig.getSubscriptionDataSet()) {
if (topic.equals(simpleSubscriptionData.getTopic())) {
- subscriptionData = new SubscriptionData();
- subscriptionData.setTopic(simpleSubscriptionData.getTopic());
- subscriptionData.setExpressionType(simpleSubscriptionData.getExpressionType());
- subscriptionData.setSubString(simpleSubscriptionData.getExpression());
+ try {
+ subscriptionData = FilterAPI.buildSubscriptionData(simpleSubscriptionData.getTopic(),
+ simpleSubscriptionData.getExpression(), simpleSubscriptionData.getExpressionType());
+ } catch (Exception e) {
+ LOGGER.error("Try to build subscription for group:{}, topic:{} exception.", group, topic, e);
+ }
break;
}
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
index 23389e9d3..7c9d84015 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
@@ -48,4 +48,29 @@ public class FilterUtilTest {
assertThat(FilterUtils.isTagMatched(subscriptionData.getTagsSet(), null)).isFalse();
}
+ @Test
+ public void testBuildSubscriptionData() throws Exception {
+ // Test case 1: expressionType is null, will use TAG as default.
+ String topic = "topic";
+ String subString = "substring";
+ String expressionType = null;
+ SubscriptionData result = FilterAPI.buildSubscriptionData(topic, subString, expressionType);
+ assertThat(result).isNotNull();
+ assertThat(topic).isEqualTo(result.getTopic());
+ assertThat(subString).isEqualTo(result.getSubString());
+ assertThat(result.getExpressionType()).isEqualTo("TAG");
+ assertThat(result.getCodeSet().size()).isEqualTo(1);
+
+ // Test case 2: expressionType is not null
+ topic = "topic";
+ subString = "substring1||substring2";
+ expressionType = "SQL92";
+ result = FilterAPI.buildSubscriptionData(topic, subString, expressionType);
+ assertThat(result).isNotNull();
+ assertThat(topic).isEqualTo(result.getTopic());
+ assertThat(subString).isEqualTo(result.getSubString());
+ assertThat(result.getExpressionType()).isEqualTo(expressionType);
+ assertThat(result.getCodeSet().size()).isEqualTo(2);
+ }
+
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
index 10a6bb463..f291bfccf 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
@@ -46,6 +46,14 @@ public class FilterAPI {
return subscriptionData;
}
+ public static SubscriptionData buildSubscriptionData(String topic, String subString, String expressionType) throws Exception {
+ final SubscriptionData subscriptionData = buildSubscriptionData(topic, subString);
+ if (StringUtils.isNotBlank(expressionType)) {
+ subscriptionData.setExpressionType(expressionType);
+ }
+ return subscriptionData;
+ }
+
public static SubscriptionData build(final String topic, final String subString,
final String type) throws Exception {
if (ExpressionType.TAG.equals(type) || type == null) {
--
2.32.0.windows.2
From 01a2aef96bdfb17c5f82415141ef421efb4e3bc7 Mon Sep 17 00:00:00 2001
From: cnScarb <jjhfen00@163.com>
Date: Fri, 17 Nov 2023 15:58:14 +0800
Subject: [PATCH 4/5] [ISSUE #7570] Add default value for lastPopTimestamp
(#7571)
---
.../apache/rocketmq/client/impl/consumer/PopProcessQueue.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java
index 3b39b86cc..50827545b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java
@@ -26,7 +26,7 @@ public class PopProcessQueue {
private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
- private long lastPopTimestamp;
+ private long lastPopTimestamp = System.currentTimeMillis();
private AtomicInteger waitAckCounter = new AtomicInteger(0);
private volatile boolean dropped = false;
--
2.32.0.windows.2
From 8e7e2b5f50e0db14b77462ef1574d4020c0fd986 Mon Sep 17 00:00:00 2001
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
Date: Mon, 20 Nov 2023 19:32:57 +0800
Subject: [PATCH 5/5] [ISSUE #7574] Fix RunningFlags conflict
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
---
store/src/main/java/org/apache/rocketmq/store/RunningFlags.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
index 91fcb155a..88b398a77 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
@@ -30,7 +30,7 @@ public class RunningFlags {
private static final int FENCED_BIT = 1 << 5;
- private static final int LOGIC_DISK_FULL_BIT = 1 << 5;
+ private static final int LOGIC_DISK_FULL_BIT = 1 << 6;
private volatile int flagBits = 0;
--
2.32.0.windows.2