345 lines
15 KiB
Diff
345 lines
15 KiB
Diff
From dc3f22ffe9eb83ace991b68921076093c7c0da5f Mon Sep 17 00:00:00 2001
|
|
From: LetLetMe <43874697+LetLetMe@users.noreply.github.com>
|
|
Date: Tue, 10 Oct 2023 17:39:23 +0800
|
|
Subject: [PATCH 1/6] Add getters for class Message, fix JSON serialization bug
|
|
(#7439)
|
|
|
|
Co-authored-by: LetLetMe <allen.hyt@alibaba-inc.com>
|
|
---
|
|
.../rocketmq/common/message/Message.java | 24 ++++++++++++++++++-
|
|
1 file changed, 23 insertions(+), 1 deletion(-)
|
|
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
|
|
index e02b526a1..c7997c473 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
|
|
@@ -218,14 +218,36 @@ public class Message implements Serializable {
|
|
public void setDelayTimeSec(long sec) {
|
|
this.putProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC, String.valueOf(sec));
|
|
}
|
|
+
|
|
+ public long getDelayTimeSec() {
|
|
+ String t = this.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC);
|
|
+ if (t != null) {
|
|
+ return Long.parseLong(t);
|
|
+ }
|
|
+ return 0;
|
|
+ }
|
|
+
|
|
public void setDelayTimeMs(long timeMs) {
|
|
this.putProperty(MessageConst.PROPERTY_TIMER_DELAY_MS, String.valueOf(timeMs));
|
|
}
|
|
+
|
|
+ public long getDelayTimeMs() {
|
|
+ String t = this.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS);
|
|
+ if (t != null) {
|
|
+ return Long.parseLong(t);
|
|
+ }
|
|
+ return 0;
|
|
+ }
|
|
+
|
|
public void setDeliverTimeMs(long timeMs) {
|
|
this.putProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(timeMs));
|
|
}
|
|
|
|
public long getDeliverTimeMs() {
|
|
- return Long.parseLong(this.getUserProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS));
|
|
+ String t = this.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS);
|
|
+ if (t != null) {
|
|
+ return Long.parseLong(t);
|
|
+ }
|
|
+ return 0;
|
|
}
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 7e4879a3bc120d6289aabc8354a2811f349ac8a6 Mon Sep 17 00:00:00 2001
|
|
From: fujian-zfj <2573259572@qq.com>
|
|
Date: Wed, 11 Oct 2023 14:45:07 +0800
|
|
Subject: [PATCH 2/6] [ISSUE #7441] Fix log "Init the confirmOffset" keep
|
|
printing error in controller mode (#7442)
|
|
|
|
* Fix typo in README [ecosystem]
|
|
|
|
* fix keep printing log problem
|
|
---
|
|
store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 2 +-
|
|
1 file changed, 1 insertion(+), 1 deletion(-)
|
|
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
index 456bf2b86..f98e9a284 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
@@ -580,7 +580,7 @@ public class CommitLog implements Swappable {
|
|
return this.defaultMessageStore.getMaxPhyOffset();
|
|
}
|
|
// First time it will compute the confirmOffset.
|
|
- if (this.confirmOffset <= 0) {
|
|
+ if (this.confirmOffset < 0) {
|
|
setConfirmOffset(((AutoSwitchHAService) this.defaultMessageStore.getHaService()).computeConfirmOffset());
|
|
log.info("Init the confirmOffset to {}.", this.confirmOffset);
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 5d492c338258d07613103e6ae16df4c6fa5b3838 Mon Sep 17 00:00:00 2001
|
|
From: rongtong <jinrongtong5@163.com>
|
|
Date: Fri, 13 Oct 2023 11:23:30 +0800
|
|
Subject: [PATCH 3/6] [ISSUE #7444] Fix testCalculateFileSizeInPath test can
|
|
not rerun in same environment (#7445)
|
|
|
|
* Fix testCalculateFileSizeInPath test can not rerun in same environment
|
|
|
|
* Ensure that files are always deleted
|
|
---
|
|
.../apache/rocketmq/common/UtilAllTest.java | 83 +++++++++++--------
|
|
1 file changed, 48 insertions(+), 35 deletions(-)
|
|
|
|
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
|
|
index f568a65f4..a0653d7fc 100644
|
|
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
|
|
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
|
|
@@ -238,41 +238,54 @@ public class UtilAllTest {
|
|
*/
|
|
String basePath = System.getProperty("java.io.tmpdir") + File.separator + "testCalculateFileSizeInPath";
|
|
File baseFile = new File(basePath);
|
|
- // test empty path
|
|
- assertEquals(0, UtilAll.calculateFileSizeInPath(baseFile));
|
|
-
|
|
- // create baseDir
|
|
- assertTrue(baseFile.mkdirs());
|
|
-
|
|
- File file0 = new File(baseFile, "file_0");
|
|
- assertTrue(file0.createNewFile());
|
|
- writeFixedBytesToFile(file0, 1313);
|
|
-
|
|
- assertEquals(1313, UtilAll.calculateFileSizeInPath(baseFile));
|
|
-
|
|
- // build a file tree like above
|
|
- File dir1 = new File(baseFile, "dir_1");
|
|
- dir1.mkdirs();
|
|
- File file10 = new File(dir1, "file_1_0");
|
|
- File file11 = new File(dir1, "file_1_1");
|
|
- File dir12 = new File(dir1, "dir_1_2");
|
|
- dir12.mkdirs();
|
|
- File file120 = new File(dir12, "file_1_2_0");
|
|
- File dir2 = new File(baseFile, "dir_2");
|
|
- dir2.mkdirs();
|
|
-
|
|
- // write all file with 1313 bytes data
|
|
- assertTrue(file10.createNewFile());
|
|
- writeFixedBytesToFile(file10, 1313);
|
|
- assertTrue(file11.createNewFile());
|
|
- writeFixedBytesToFile(file11, 1313);
|
|
- assertTrue(file120.createNewFile());
|
|
- writeFixedBytesToFile(file120, 1313);
|
|
-
|
|
- assertEquals(1313 * 4, UtilAll.calculateFileSizeInPath(baseFile));
|
|
-
|
|
- // clear all file
|
|
- baseFile.deleteOnExit();
|
|
+ try {
|
|
+ // test empty path
|
|
+ assertEquals(0, UtilAll.calculateFileSizeInPath(baseFile));
|
|
+
|
|
+ // create baseDir
|
|
+ assertTrue(baseFile.mkdirs());
|
|
+
|
|
+ File file0 = new File(baseFile, "file_0");
|
|
+ assertTrue(file0.createNewFile());
|
|
+ writeFixedBytesToFile(file0, 1313);
|
|
+
|
|
+ assertEquals(1313, UtilAll.calculateFileSizeInPath(baseFile));
|
|
+
|
|
+ // build a file tree like above
|
|
+ File dir1 = new File(baseFile, "dir_1");
|
|
+ dir1.mkdirs();
|
|
+ File file10 = new File(dir1, "file_1_0");
|
|
+ File file11 = new File(dir1, "file_1_1");
|
|
+ File dir12 = new File(dir1, "dir_1_2");
|
|
+ dir12.mkdirs();
|
|
+ File file120 = new File(dir12, "file_1_2_0");
|
|
+ File dir2 = new File(baseFile, "dir_2");
|
|
+ dir2.mkdirs();
|
|
+
|
|
+ // write all file with 1313 bytes data
|
|
+ assertTrue(file10.createNewFile());
|
|
+ writeFixedBytesToFile(file10, 1313);
|
|
+ assertTrue(file11.createNewFile());
|
|
+ writeFixedBytesToFile(file11, 1313);
|
|
+ assertTrue(file120.createNewFile());
|
|
+ writeFixedBytesToFile(file120, 1313);
|
|
+
|
|
+ assertEquals(1313 * 4, UtilAll.calculateFileSizeInPath(baseFile));
|
|
+ } finally {
|
|
+ deleteFolder(baseFile);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public static void deleteFolder(File folder) {
|
|
+ if (folder.isDirectory()) {
|
|
+ File[] files = folder.listFiles();
|
|
+ if (files != null) {
|
|
+ for (File file : files) {
|
|
+ deleteFolder(file);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ folder.delete();
|
|
}
|
|
|
|
private void writeFixedBytesToFile(File file, int size) throws Exception {
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 28427d40129e3aa0c6f951535617e5cac0a8211b Mon Sep 17 00:00:00 2001
|
|
From: Lei Sun <yuncun.sl@alibaba-inc.com>
|
|
Date: Fri, 13 Oct 2023 13:42:27 +0800
|
|
Subject: [PATCH 4/6] [ISSUE #7425] Add RocketmqControllerConsole log to fix
|
|
bug (#7458)
|
|
|
|
---
|
|
.../org/apache/rocketmq/common/constant/LoggerName.java | 1 +
|
|
.../org/apache/rocketmq/controller/ControllerStartup.java | 7 ++++---
|
|
controller/src/main/resources/rmq.controller.logback.xml | 4 ++++
|
|
3 files changed, 9 insertions(+), 3 deletions(-)
|
|
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
|
|
index cb04b00b3..61310893f 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
|
|
@@ -21,6 +21,7 @@ public class LoggerName {
|
|
public static final String NAMESRV_LOGGER_NAME = "RocketmqNamesrv";
|
|
public static final String NAMESRV_CONSOLE_LOGGER_NAME = "RocketmqNamesrvConsole";
|
|
public static final String CONTROLLER_LOGGER_NAME = "RocketmqController";
|
|
+ public static final String CONTROLLER_CONSOLE_NAME = "RocketmqControllerConsole";
|
|
public static final String NAMESRV_WATER_MARK_LOGGER_NAME = "RocketmqNamesrvWaterMark";
|
|
public static final String BROKER_LOGGER_NAME = "RocketmqBroker";
|
|
public static final String BROKER_CONSOLE_NAME = "RocketmqConsole";
|
|
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java
|
|
index 401720d05..9e96a704d 100644
|
|
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java
|
|
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java
|
|
@@ -94,9 +94,10 @@ public class ControllerStartup {
|
|
}
|
|
|
|
if (commandLine.hasOption('p')) {
|
|
- MixAll.printObjectProperties(null, controllerConfig);
|
|
- MixAll.printObjectProperties(null, nettyServerConfig);
|
|
- MixAll.printObjectProperties(null, nettyClientConfig);
|
|
+ Logger console = LoggerFactory.getLogger(LoggerName.CONTROLLER_CONSOLE_NAME);
|
|
+ MixAll.printObjectProperties(console, controllerConfig);
|
|
+ MixAll.printObjectProperties(console, nettyServerConfig);
|
|
+ MixAll.printObjectProperties(console, nettyClientConfig);
|
|
System.exit(0);
|
|
}
|
|
|
|
diff --git a/controller/src/main/resources/rmq.controller.logback.xml b/controller/src/main/resources/rmq.controller.logback.xml
|
|
index bb158213a..18083e8f9 100644
|
|
--- a/controller/src/main/resources/rmq.controller.logback.xml
|
|
+++ b/controller/src/main/resources/rmq.controller.logback.xml
|
|
@@ -116,6 +116,10 @@
|
|
<appender-ref ref="RocketmqControllerAppender"/>
|
|
</logger>
|
|
|
|
+ <logger name="RocketmqControllerConsole" additivity="false" level="INFO">
|
|
+ <appender-ref ref="STDOUT"/>
|
|
+ </logger>
|
|
+
|
|
<logger name="RocketmqCommon" additivity="false" level="INFO">
|
|
<appender-ref ref="RocketmqControllerAppender"/>
|
|
</logger>
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From dc62d7f2e1ce4f99364599f8e23d65fd88eb1cd4 Mon Sep 17 00:00:00 2001
|
|
From: LetLetMe <43874697+LetLetMe@users.noreply.github.com>
|
|
Date: Fri, 13 Oct 2023 13:45:48 +0800
|
|
Subject: [PATCH 5/6] [ISSUE #7451] Override toString for
|
|
TopicConfigAndQueueMapping
|
|
|
|
---
|
|
.../statictopic/TopicConfigAndQueueMapping.java | 10 ++++++++++
|
|
1 file changed, 10 insertions(+)
|
|
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicConfigAndQueueMapping.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicConfigAndQueueMapping.java
|
|
index c937fec23..d13692735 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicConfigAndQueueMapping.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicConfigAndQueueMapping.java
|
|
@@ -16,6 +16,7 @@
|
|
*/
|
|
package org.apache.rocketmq.remoting.protocol.statictopic;
|
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.builder.EqualsBuilder;
|
|
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
|
import org.apache.rocketmq.common.TopicConfig;
|
|
@@ -60,4 +61,13 @@ public class TopicConfigAndQueueMapping extends TopicConfig {
|
|
.append(mappingDetail)
|
|
.toHashCode();
|
|
}
|
|
+
|
|
+ @Override
|
|
+ public String toString() {
|
|
+ String string = super.toString();
|
|
+ if (StringUtils.isNotBlank(string)) {
|
|
+ string = string.substring(0, string.length() - 1) + ", mappingDetail=" + mappingDetail + "]";
|
|
+ }
|
|
+ return string;
|
|
+ }
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 2113fa371b9c2bf7c512f8ad234e51c616f1362c Mon Sep 17 00:00:00 2001
|
|
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
|
|
Date: Fri, 13 Oct 2023 13:47:09 +0800
|
|
Subject: [PATCH 6/6] [ISSUE #7453] Fix the problem in constructing the
|
|
GetMessageResult (#7456)
|
|
|
|
* Fix the problem in constructing the GetMessageResult
|
|
|
|
* Optimize the initialization size of GetMessageResult
|
|
---
|
|
.../apache/rocketmq/broker/processor/PeekMessageProcessor.java | 3 +--
|
|
.../apache/rocketmq/broker/processor/PopMessageProcessor.java | 3 +--
|
|
2 files changed, 2 insertions(+), 4 deletions(-)
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
|
|
index a8358c4ff..e1e0e13e5 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
|
|
@@ -129,8 +129,7 @@ public class PeekMessageProcessor implements NettyRequestProcessor {
|
|
}
|
|
int randomQ = random.nextInt(100);
|
|
int reviveQid = randomQ % this.brokerController.getBrokerConfig().getReviveQueueNum();
|
|
- int commercialSizePerMsg = this.brokerController.getBrokerConfig().getCommercialSizePerMsg();
|
|
- GetMessageResult getMessageResult = new GetMessageResult(commercialSizePerMsg);
|
|
+ GetMessageResult getMessageResult = new GetMessageResult(requestHeader.getMaxMsgNums());
|
|
boolean needRetry = randomQ % 5 == 0;
|
|
long popTime = System.currentTimeMillis();
|
|
long restNum = 0;
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
|
|
index 441f7de08..0d9bdf143 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
|
|
@@ -347,8 +347,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
|
|
reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum());
|
|
}
|
|
|
|
- int commercialSizePerMsg = this.brokerController.getBrokerConfig().getCommercialSizePerMsg();
|
|
- GetMessageResult getMessageResult = new GetMessageResult(commercialSizePerMsg);
|
|
+ GetMessageResult getMessageResult = new GetMessageResult(requestHeader.getMaxMsgNums());
|
|
ExpressionMessageFilter finalMessageFilter = messageFilter;
|
|
StringBuilder finalOrderCountInfo = orderCountInfo;
|
|
|
|
--
|
|
2.32.0.windows.2
|
|
|