!48 backport fix some buges

From: @zhiliatox 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
This commit is contained in:
openeuler-ci-bot 2023-12-12 01:53:54 +00:00 committed by Gitee
commit 5c945d3503
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
2 changed files with 191 additions and 1 deletions

View File

@ -0,0 +1,186 @@
From c2c29c2435e0626cfe4f49830fbdc0d9421d82b5 Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Mon, 4 Dec 2023 16:13:07 +0800
Subject: [PATCH 1/2] [ISSUE #7545] Fix set mapped file to null cause file can
not destroy (#7612)
---
.../rocketmq/tieredstore/index/IndexStoreFile.java | 2 --
.../rocketmq/tieredstore/index/IndexStoreService.java | 10 ++++++++++
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index 52a686f68..def5c8f2d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -457,11 +457,9 @@ public class IndexStoreFile implements IndexFile {
this.fileStatus.set(IndexStatusEnum.SHUTDOWN);
if (this.mappedFile != null) {
this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
- this.mappedFile = null;
}
if (this.compactMappedFile != null) {
this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
- this.compactMappedFile = null;
}
} catch (Exception e) {
log.error("IndexStoreFile shutdown failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e);
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 14608aa58..e99ea0de1 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
@@ -101,6 +102,10 @@ public class IndexStoreService extends ServiceThread implements IndexService {
private void recover() {
Stopwatch stopwatch = Stopwatch.createStarted();
+ // delete compact file directory
+ UtilAll.deleteFile(new File(Paths.get(storeConfig.getStorePathRootDir(),
+ FILE_DIRECTORY_NAME, FILE_COMPACTED_DIRECTORY_NAME).toString()));
+
// recover local
File dir = new File(Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME).toString());
this.doConvertOldFormatFile(Paths.get(dir.getPath(), "0000").toString());
@@ -141,6 +146,10 @@ public class IndexStoreService extends ServiceThread implements IndexService {
for (TieredFileSegment fileSegment : flatFile.getFileSegmentList()) {
IndexFile indexFile = new IndexStoreFile(storeConfig, fileSegment);
+ IndexFile localFile = timeStoreTable.get(indexFile.getTimestamp());
+ if (localFile != null) {
+ localFile.destroy();
+ }
timeStoreTable.put(indexFile.getTimestamp(), indexFile);
log.info("IndexStoreService recover load remote file, timestamp: {}", indexFile.getTimestamp());
}
@@ -248,6 +257,7 @@ public class IndexStoreService extends ServiceThread implements IndexService {
if (IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) {
log.error("IndexStoreService file status not correct, so skip, timestamp: {}, status: {}",
indexFile.getTimestamp(), indexFile.getFileStatus());
+ indexFile.destroy();
return;
}
--
2.32.0.windows.2
From faae64715d917bb5d64b8d72581172d26ebe9501 Mon Sep 17 00:00:00 2001
From: gaoyf <gaoyf@users.noreply.github.com>
Date: Thu, 7 Dec 2023 11:25:22 +0800
Subject: [PATCH 2/2] [ISSUE #7601] Fix slave acting master bug (#7603)
* fix NullPointerException when message escape to remote
* fix NumberFormatException when message retry to escape to remote
* fix timerCheckPoint of the master is not updated, causing the timer message to be replayed after master is restarted
* Use properties copies instead of referencing the same map when converting message
---
.../org/apache/rocketmq/broker/BrokerController.java | 1 +
.../rocketmq/broker/slave/SlaveSynchronize.java | 4 +++-
.../rocketmq/common/message/MessageAccessor.java | 7 +++++++
.../rocketmq/store/timer/TimerMessageStore.java | 12 +++++++++---
4 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9f1fd0ad0..8d29d4438 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -2108,6 +2108,7 @@ public class BrokerController {
isScheduleServiceStart = shouldStart;
if (timerMessageStore != null) {
+ timerMessageStore.syncLastReadTimeMs();
timerMessageStore.setShouldRunningDequeue(shouldStart);
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index 53cdecdf8..7f802adb9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -215,11 +215,13 @@ public class SlaveSynchronize {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
- if (null != brokerController.getMessageStore().getTimerMessageStore()) {
+ if (null != brokerController.getMessageStore().getTimerMessageStore() &&
+ !brokerController.getTimerMessageStore().isShouldRunningDequeue()) {
TimerCheckpoint checkpoint = this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(masterAddrBak);
if (null != this.brokerController.getTimerCheckpoint()) {
this.brokerController.getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs());
this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset());
+ this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion());
}
}
} catch (Exception e) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
index 1b7e2bba3..62e3bbd7e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.common.message;
+import java.util.HashMap;
import java.util.Map;
public class MessageAccessor {
@@ -96,4 +97,10 @@ public class MessageAccessor {
return newMsg;
}
+ public static Map<String, String> deepCopyProperties(Map<String, String> properties) {
+ if (properties == null) {
+ return null;
+ }
+ return new HashMap<>(properties);
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index d796e4467..872cd7105 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -602,6 +602,10 @@ public class TimerMessageStore {
this.shouldRunningDequeue = shouldRunningDequeue;
}
+ public boolean isShouldRunningDequeue() {
+ return shouldRunningDequeue;
+ }
+
public void addMetric(MessageExt msg, int value) {
try {
if (null == msg || null == msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) {
@@ -1084,8 +1088,10 @@ public class TimerMessageStore {
case PUT_OK:
if (brokerStatsManager != null) {
this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
- this.brokerStatsManager.incTopicPutSize(message.getTopic(),
- putMessageResult.getAppendMessageResult().getWroteBytes());
+ if (putMessageResult.getAppendMessageResult() != null) {
+ this.brokerStatsManager.incTopicPutSize(message.getTopic(),
+ putMessageResult.getAppendMessageResult().getWroteBytes());
+ }
this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
}
return PUT_OK;
@@ -1119,7 +1125,7 @@ public class TimerMessageStore {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
- MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+ MessageAccessor.setProperties(msgInner, MessageAccessor.deepCopyProperties(msgExt.getProperties()));
TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
long tagsCodeValue =
MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
--
2.32.0.windows.2

View File

@ -5,7 +5,7 @@
Summary: Cloud-Native, Distributed Messaging and Streaming
Name: rocketmq
Version: 5.1.5
Release: 43
Release: 44
License: Apache-2.0
Group: Applications/Message
URL: https://rocketmq.apache.org/
@ -52,6 +52,7 @@ Patch0039: patch039-backport-add-some-validations.patch
Patch0040: patch040-backport-add-some-test-cases.patch
Patch0041: patch041-backport-improve-performance.patch
Patch0042: patch042-backport-Support-message-filtering.patch
Patch0043: patch043-backport-fix-some-bugs.patch
BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git
Requires: java-1.8.0-openjdk-devel
@ -92,6 +93,9 @@ exit 0
%changelog
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-44
- backport fix some bugs
* Mon Dec 11 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-43
- backport Support message filtering