187 lines
9.9 KiB
Diff
187 lines
9.9 KiB
Diff
From c2c29c2435e0626cfe4f49830fbdc0d9421d82b5 Mon Sep 17 00:00:00 2001
|
|
From: lizhimins <707364882@qq.com>
|
|
Date: Mon, 4 Dec 2023 16:13:07 +0800
|
|
Subject: [PATCH 1/2] [ISSUE #7545] Fix set mapped file to null cause file can
|
|
not destroy (#7612)
|
|
|
|
---
|
|
.../rocketmq/tieredstore/index/IndexStoreFile.java | 2 --
|
|
.../rocketmq/tieredstore/index/IndexStoreService.java | 10 ++++++++++
|
|
2 files changed, 10 insertions(+), 2 deletions(-)
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
|
|
index 52a686f68..def5c8f2d 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
|
|
@@ -457,11 +457,9 @@ public class IndexStoreFile implements IndexFile {
|
|
this.fileStatus.set(IndexStatusEnum.SHUTDOWN);
|
|
if (this.mappedFile != null) {
|
|
this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
|
|
- this.mappedFile = null;
|
|
}
|
|
if (this.compactMappedFile != null) {
|
|
this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
|
|
- this.compactMappedFile = null;
|
|
}
|
|
} catch (Exception e) {
|
|
log.error("IndexStoreFile shutdown failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e);
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
|
|
index 14608aa58..e99ea0de1 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
|
|
@@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReadWriteLock;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.rocketmq.common.ServiceThread;
|
|
+import org.apache.rocketmq.common.UtilAll;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
|
|
@@ -101,6 +102,10 @@ public class IndexStoreService extends ServiceThread implements IndexService {
|
|
private void recover() {
|
|
Stopwatch stopwatch = Stopwatch.createStarted();
|
|
|
|
+ // delete compact file directory
|
|
+ UtilAll.deleteFile(new File(Paths.get(storeConfig.getStorePathRootDir(),
|
|
+ FILE_DIRECTORY_NAME, FILE_COMPACTED_DIRECTORY_NAME).toString()));
|
|
+
|
|
// recover local
|
|
File dir = new File(Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME).toString());
|
|
this.doConvertOldFormatFile(Paths.get(dir.getPath(), "0000").toString());
|
|
@@ -141,6 +146,10 @@ public class IndexStoreService extends ServiceThread implements IndexService {
|
|
|
|
for (TieredFileSegment fileSegment : flatFile.getFileSegmentList()) {
|
|
IndexFile indexFile = new IndexStoreFile(storeConfig, fileSegment);
|
|
+ IndexFile localFile = timeStoreTable.get(indexFile.getTimestamp());
|
|
+ if (localFile != null) {
|
|
+ localFile.destroy();
|
|
+ }
|
|
timeStoreTable.put(indexFile.getTimestamp(), indexFile);
|
|
log.info("IndexStoreService recover load remote file, timestamp: {}", indexFile.getTimestamp());
|
|
}
|
|
@@ -248,6 +257,7 @@ public class IndexStoreService extends ServiceThread implements IndexService {
|
|
if (IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) {
|
|
log.error("IndexStoreService file status not correct, so skip, timestamp: {}, status: {}",
|
|
indexFile.getTimestamp(), indexFile.getFileStatus());
|
|
+ indexFile.destroy();
|
|
return;
|
|
}
|
|
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From faae64715d917bb5d64b8d72581172d26ebe9501 Mon Sep 17 00:00:00 2001
|
|
From: gaoyf <gaoyf@users.noreply.github.com>
|
|
Date: Thu, 7 Dec 2023 11:25:22 +0800
|
|
Subject: [PATCH 2/2] [ISSUE #7601] Fix slave acting master bug (#7603)
|
|
|
|
* fix NullPointerException when message escape to remote
|
|
|
|
* fix NumberFormatException when message retry to escape to remote
|
|
|
|
* fix timerCheckPoint of the master is not updated, causing the timer message to be replayed after master is restarted
|
|
|
|
* Use properties copies instead of referencing the same map when converting message
|
|
---
|
|
.../org/apache/rocketmq/broker/BrokerController.java | 1 +
|
|
.../rocketmq/broker/slave/SlaveSynchronize.java | 4 +++-
|
|
.../rocketmq/common/message/MessageAccessor.java | 7 +++++++
|
|
.../rocketmq/store/timer/TimerMessageStore.java | 12 +++++++++---
|
|
4 files changed, 20 insertions(+), 4 deletions(-)
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
index 9f1fd0ad0..8d29d4438 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
@@ -2108,6 +2108,7 @@ public class BrokerController {
|
|
isScheduleServiceStart = shouldStart;
|
|
|
|
if (timerMessageStore != null) {
|
|
+ timerMessageStore.syncLastReadTimeMs();
|
|
timerMessageStore.setShouldRunningDequeue(shouldStart);
|
|
}
|
|
}
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
|
|
index 53cdecdf8..7f802adb9 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
|
|
@@ -215,11 +215,13 @@ public class SlaveSynchronize {
|
|
String masterAddrBak = this.masterAddr;
|
|
if (masterAddrBak != null) {
|
|
try {
|
|
- if (null != brokerController.getMessageStore().getTimerMessageStore()) {
|
|
+ if (null != brokerController.getMessageStore().getTimerMessageStore() &&
|
|
+ !brokerController.getTimerMessageStore().isShouldRunningDequeue()) {
|
|
TimerCheckpoint checkpoint = this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(masterAddrBak);
|
|
if (null != this.brokerController.getTimerCheckpoint()) {
|
|
this.brokerController.getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs());
|
|
this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset());
|
|
+ this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion());
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
|
|
index 1b7e2bba3..62e3bbd7e 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
|
|
@@ -17,6 +17,7 @@
|
|
|
|
package org.apache.rocketmq.common.message;
|
|
|
|
+import java.util.HashMap;
|
|
import java.util.Map;
|
|
|
|
public class MessageAccessor {
|
|
@@ -96,4 +97,10 @@ public class MessageAccessor {
|
|
return newMsg;
|
|
}
|
|
|
|
+ public static Map<String, String> deepCopyProperties(Map<String, String> properties) {
|
|
+ if (properties == null) {
|
|
+ return null;
|
|
+ }
|
|
+ return new HashMap<>(properties);
|
|
+ }
|
|
}
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
|
index d796e4467..872cd7105 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
|
@@ -602,6 +602,10 @@ public class TimerMessageStore {
|
|
this.shouldRunningDequeue = shouldRunningDequeue;
|
|
}
|
|
|
|
+ public boolean isShouldRunningDequeue() {
|
|
+ return shouldRunningDequeue;
|
|
+ }
|
|
+
|
|
public void addMetric(MessageExt msg, int value) {
|
|
try {
|
|
if (null == msg || null == msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) {
|
|
@@ -1084,8 +1088,10 @@ public class TimerMessageStore {
|
|
case PUT_OK:
|
|
if (brokerStatsManager != null) {
|
|
this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
|
|
- this.brokerStatsManager.incTopicPutSize(message.getTopic(),
|
|
- putMessageResult.getAppendMessageResult().getWroteBytes());
|
|
+ if (putMessageResult.getAppendMessageResult() != null) {
|
|
+ this.brokerStatsManager.incTopicPutSize(message.getTopic(),
|
|
+ putMessageResult.getAppendMessageResult().getWroteBytes());
|
|
+ }
|
|
this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
|
|
}
|
|
return PUT_OK;
|
|
@@ -1119,7 +1125,7 @@ public class TimerMessageStore {
|
|
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
|
|
msgInner.setBody(msgExt.getBody());
|
|
msgInner.setFlag(msgExt.getFlag());
|
|
- MessageAccessor.setProperties(msgInner, msgExt.getProperties());
|
|
+ MessageAccessor.setProperties(msgInner, MessageAccessor.deepCopyProperties(msgExt.getProperties()));
|
|
TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
|
|
long tagsCodeValue =
|
|
MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
|
|
--
|
|
2.32.0.windows.2
|
|
|