rocketmq/patch010-backport-add-some-fixes.patch
2023-10-01 10:27:45 +08:00

1287 lines
61 KiB
Diff
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

From b2deef179dbc6a9eb1a2b6dd7b652d95cb768295 Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Thu, 10 Aug 2023 10:38:47 +0800
Subject: [PATCH 01/12] [ISSUE #7144] Accelerate the recovery speed of the
tiered storage module (#7145)
---
.../tieredstore/TieredDispatcher.java | 3 +
.../tieredstore/TieredMessageStore.java | 2 +-
.../common/TieredStoreExecutor.java | 25 ++--
.../tieredstore/file/CompositeFlatFile.java | 15 +-
.../file/CompositeQueueFlatFile.java | 20 ++-
.../tieredstore/file/TieredCommitLog.java | 24 +++-
.../tieredstore/file/TieredFlatFile.java | 42 +++---
.../file/TieredFlatFileManager.java | 135 ++++++++++--------
.../metadata/FileSegmentMetadata.java | 26 +++-
.../tieredstore/TieredDispatcherTest.java | 15 +-
.../tieredstore/TieredMessageFetcherTest.java | 2 +-
.../file/CompositeQueueFlatFileTest.java | 2 +-
.../file/TieredFlatFileManagerTest.java | 7 +-
13 files changed, 194 insertions(+), 124 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index bb58ea7dd..1746190cd 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -279,6 +279,9 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
long upperBound = Math.min(dispatchOffset + maxCount, maxOffsetInQueue);
ConsumeQueue consumeQueue = (ConsumeQueue) defaultStore.getConsumeQueue(topic, queueId);
+ logger.debug("DispatchFlatFile race, topic={}, queueId={}, cq range={}-{}, dispatch offset={}-{}",
+ topic, queueId, minOffsetInQueue, maxOffsetInQueue, dispatchOffset, upperBound - 1);
+
for (; dispatchOffset < upperBound; dispatchOffset++) {
// get consume queue
SelectMappedBufferResult cqItem = consumeQueue.getIndexBuffer(dispatchOffset);
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 1f12410f2..ced1fb818 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -147,7 +147,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) {
if (!viaTieredStorage(topic, queueId, offset, maxMsgNums)) {
- logger.debug("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset);
+ logger.trace("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset);
return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter);
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
index 6eb3478b3..6dd0e8846 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
@@ -43,18 +43,9 @@ public class TieredStoreExecutor {
public static ExecutorService compactIndexFileExecutor;
public static void init() {
- dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
- dispatchExecutor = new ThreadPoolExecutor(
- Math.max(2, Runtime.getRuntime().availableProcessors()),
- Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- dispatchThreadPoolQueue,
- new ThreadFactoryImpl("TieredCommonExecutor_"));
-
commonScheduledExecutor = new ScheduledThreadPoolExecutor(
Math.max(4, Runtime.getRuntime().availableProcessors()),
- new ThreadFactoryImpl("TieredCommonScheduledExecutor_"));
+ new ThreadFactoryImpl("TieredCommonExecutor_"));
commitExecutor = new ScheduledThreadPoolExecutor(
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
@@ -62,7 +53,17 @@ public class TieredStoreExecutor {
cleanExpiredFileExecutor = new ScheduledThreadPoolExecutor(
Math.max(4, Runtime.getRuntime().availableProcessors()),
- new ThreadFactoryImpl("TieredCleanExpiredFileExecutor_"));
+ new ThreadFactoryImpl("TieredCleanFileExecutor_"));
+
+ dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
+ dispatchExecutor = new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors()),
+ Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ dispatchThreadPoolQueue,
+ new ThreadFactoryImpl("TieredDispatchExecutor_"),
+ new ThreadPoolExecutor.DiscardOldestPolicy());
fetchDataThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
fetchDataExecutor = new ThreadPoolExecutor(
@@ -71,7 +72,7 @@ public class TieredStoreExecutor {
1000 * 60,
TimeUnit.MILLISECONDS,
fetchDataThreadPoolQueue,
- new ThreadFactoryImpl("TieredFetchDataExecutor_"));
+ new ThreadFactoryImpl("TieredFetchExecutor_"));
compactIndexFileThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
compactIndexFileExecutor = new ThreadPoolExecutor(
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
index df4baf33f..5ad3a6ff3 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
@@ -76,20 +76,15 @@ public class CompositeFlatFile implements CompositeAccess {
this.storeConfig = fileQueueFactory.getStoreConfig();
this.readAheadFactor = this.storeConfig.getReadAheadMinFactor();
this.metadataStore = TieredStoreUtil.getMetadataStore(this.storeConfig);
- this.dispatchOffset = new AtomicLong();
this.compositeFlatFileLock = new ReentrantLock();
this.inFlightRequestMap = new ConcurrentHashMap<>();
this.commitLog = new TieredCommitLog(fileQueueFactory, filePath);
this.consumeQueue = new TieredConsumeQueue(fileQueueFactory, filePath);
+ this.dispatchOffset = new AtomicLong(
+ this.consumeQueue.isInitialized() ? this.getConsumeQueueCommitOffset() : -1L);
this.groupOffsetCache = this.initOffsetCache();
}
- protected void recoverMetadata() {
- if (!consumeQueue.isInitialized() && this.dispatchOffset.get() != -1) {
- consumeQueue.setBaseOffset(this.dispatchOffset.get() * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
- }
- }
-
private Cache<String, Long> initOffsetCache() {
return Caffeine.newBuilder()
.expireAfterWrite(2, TimeUnit.MINUTES)
@@ -310,10 +305,12 @@ public class CompositeFlatFile implements CompositeAccess {
@Override
public void initOffset(long offset) {
- if (!consumeQueue.isInitialized()) {
+ if (consumeQueue.isInitialized()) {
+ dispatchOffset.set(this.getConsumeQueueCommitOffset());
+ } else {
consumeQueue.setBaseOffset(offset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ dispatchOffset.set(offset);
}
- dispatchOffset.set(offset);
}
@Override
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
index f6c0afed0..0a797f465 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
@@ -36,8 +36,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
public CompositeQueueFlatFile(TieredFileAllocator fileQueueFactory, MessageQueue messageQueue) {
super(fileQueueFactory, TieredStoreUtil.toPath(messageQueue));
this.messageQueue = messageQueue;
- this.recoverTopicMetadata();
- super.recoverMetadata();
+ this.recoverQueueMetadata();
this.indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
}
@@ -46,11 +45,12 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
if (!consumeQueue.isInitialized()) {
queueMetadata.setMinOffset(offset);
queueMetadata.setMaxOffset(offset);
+ metadataStore.updateQueue(queueMetadata);
}
super.initOffset(offset);
}
- public void recoverTopicMetadata() {
+ public void recoverQueueMetadata() {
TopicMetadata topicMetadata = this.metadataStore.getTopic(messageQueue.getTopic());
if (topicMetadata == null) {
topicMetadata = this.metadataStore.addTopic(messageQueue.getTopic(), -1L);
@@ -64,18 +64,16 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
if (queueMetadata.getMaxOffset() < queueMetadata.getMinOffset()) {
queueMetadata.setMaxOffset(queueMetadata.getMinOffset());
}
- this.dispatchOffset.set(queueMetadata.getMaxOffset());
}
- public void persistMetadata() {
+ public void flushMetadata() {
try {
- if (consumeQueue.getCommitOffset() < queueMetadata.getMinOffset()) {
- return;
- }
- queueMetadata.setMaxOffset(consumeQueue.getCommitOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ queueMetadata.setMinOffset(super.getConsumeQueueMinOffset());
+ queueMetadata.setMaxOffset(super.getConsumeQueueMaxOffset());
metadataStore.updateQueue(queueMetadata);
} catch (Exception e) {
- LOGGER.error("CompositeFlatFile#flushMetadata: update queue metadata failed: topic: {}, queue: {}", messageQueue.getTopic(), messageQueue.getQueueId(), e);
+ LOGGER.error("CompositeFlatFile#flushMetadata error, topic: {}, queue: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId(), e);
}
}
@@ -114,7 +112,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
@Override
public void shutdown() {
super.shutdown();
- metadataStore.updateQueue(queueMetadata);
+ this.flushMetadata();
}
@Override
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
index 80e1bce50..0e5f79132 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
@@ -50,7 +50,7 @@ public class TieredCommitLog {
this.storeConfig = fileQueueFactory.getStoreConfig();
this.flatFile = fileQueueFactory.createFlatFileForCommitLog(filePath);
this.minConsumeQueueOffset = new AtomicLong(NOT_EXIST_MIN_OFFSET);
- this.correctMinOffset();
+ this.correctMinOffsetAsync();
}
@VisibleForTesting
@@ -91,17 +91,26 @@ public class TieredCommitLog {
return flatFile.getFileToWrite().getMaxTimestamp();
}
- public synchronized long correctMinOffset() {
+ public long correctMinOffset() {
+ try {
+ return correctMinOffsetAsync().get();
+ } catch (Exception e) {
+ log.error("Correct min offset failed in clean expired file", e);
+ }
+ return NOT_EXIST_MIN_OFFSET;
+ }
+
+ public synchronized CompletableFuture<Long> correctMinOffsetAsync() {
if (flatFile.getFileSegmentCount() == 0) {
this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
- return NOT_EXIST_MIN_OFFSET;
+ return CompletableFuture.completedFuture(NOT_EXIST_MIN_OFFSET);
}
// queue offset field length is 8
int length = MessageBufferUtil.QUEUE_OFFSET_POSITION + 8;
if (flatFile.getCommitOffset() - flatFile.getMinOffset() < length) {
this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
- return NOT_EXIST_MIN_OFFSET;
+ return CompletableFuture.completedFuture(NOT_EXIST_MIN_OFFSET);
}
try {
@@ -109,7 +118,8 @@ public class TieredCommitLog {
.thenApply(buffer -> {
long offset = MessageBufferUtil.getQueueOffset(buffer);
minConsumeQueueOffset.set(offset);
- log.info("Correct commitlog min cq offset success, filePath={}, min cq offset={}, range={}-{}",
+ log.debug("Correct commitlog min cq offset success, " +
+ "filePath={}, min cq offset={}, commitlog range={}-{}",
flatFile.getFilePath(), offset, flatFile.getMinOffset(), flatFile.getCommitOffset());
return offset;
})
@@ -117,11 +127,11 @@ public class TieredCommitLog {
log.warn("Correct commitlog min cq offset error, filePath={}, range={}-{}",
flatFile.getFilePath(), flatFile.getMinOffset(), flatFile.getCommitOffset(), throwable);
return minConsumeQueueOffset.get();
- }).get();
+ });
} catch (Exception e) {
log.error("Correct commitlog min cq offset error, filePath={}", flatFile.getFilePath(), e);
}
- return minConsumeQueueOffset.get();
+ return CompletableFuture.completedFuture(minConsumeQueueOffset.get());
}
public AppendResult append(ByteBuffer byteBuf) {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
index 75ce8d89f..426c4e09d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.tieredstore.file;
+import com.alibaba.fastjson.JSON;
import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -24,6 +25,7 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -178,32 +180,26 @@ public class TieredFlatFile {
private FileSegmentMetadata getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) {
FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
- fileSegment.getPath(), fileSegment.getFileType(), fileSegment.getBaseOffset());
-
- if (metadata != null) {
- return metadata;
- }
+ this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset());
// Note: file segment path may not the same as file base path, use base path here.
- metadata = new FileSegmentMetadata(
- this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType());
-
- if (fileSegment.isClosed()) {
- metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
+ if (metadata == null) {
+ metadata = new FileSegmentMetadata(
+ this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType());
+ metadata.setCreateTimestamp(fileSegment.getMinTimestamp());
+ metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
+ metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
+ if (fileSegment.isClosed()) {
+ metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
+ }
+ this.tieredMetadataStore.updateFileSegment(metadata);
}
-
- metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
- metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
-
- // Submit to persist
- this.tieredMetadataStore.updateFileSegment(metadata);
return metadata;
}
/**
* FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full
*/
- @VisibleForTesting
public void updateFileSegment(TieredFileSegment fileSegment) {
FileSegmentMetadata segmentMetadata = getOrCreateFileSegmentMetadata(fileSegment);
@@ -219,9 +215,16 @@ public class TieredFlatFile {
}
segmentMetadata.setSize(fileSegment.getCommitPosition());
- segmentMetadata.setBeginTimestamp(fileSegment.getMinTimestamp());
segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp());
- this.tieredMetadataStore.updateFileSegment(segmentMetadata);
+
+ FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
+ this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset());
+
+ if (!Objects.equals(metadata, segmentMetadata)) {
+ this.tieredMetadataStore.updateFileSegment(segmentMetadata);
+ logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {}, content: {}",
+ segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata));
+ }
}
private void checkAndFixFileSize() {
@@ -257,6 +260,7 @@ public class TieredFlatFile {
logger.warn("TieredFlatFile#checkAndFixFileSize: fix last file {} size: origin: {}, actual: {}",
lastFile.getPath(), lastFile.getCommitOffset() - lastFile.getBaseOffset(), lastFileSize);
lastFile.initPosition(lastFileSize);
+ this.updateFileSegment(lastFile);
}
}
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
index aeca44b8c..e9ae4a5a5 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
@@ -16,16 +16,19 @@
*/
package org.apache.rocketmq.tieredstore.file;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -36,6 +39,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
public class TieredFlatFileManager {
+ private static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
private static volatile TieredFlatFileManager instance;
@@ -44,7 +48,7 @@ public class TieredFlatFileManager {
private final TieredMetadataStore metadataStore;
private final TieredMessageStoreConfig storeConfig;
private final TieredFileAllocator tieredFileAllocator;
- private final ConcurrentMap<MessageQueue, CompositeQueueFlatFile> queueFlatFileMap;
+ private final ConcurrentMap<MessageQueue, CompositeQueueFlatFile> flatFileConcurrentMap;
public TieredFlatFileManager(TieredMessageStoreConfig storeConfig)
throws ClassNotFoundException, NoSuchMethodException {
@@ -52,23 +56,20 @@ public class TieredFlatFileManager {
this.storeConfig = storeConfig;
this.metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
this.tieredFileAllocator = new TieredFileAllocator(storeConfig);
- this.queueFlatFileMap = new ConcurrentHashMap<>();
+ this.flatFileConcurrentMap = new ConcurrentHashMap<>();
this.doScheduleTask();
}
public static TieredFlatFileManager getInstance(TieredMessageStoreConfig storeConfig) {
- if (storeConfig == null) {
+ if (storeConfig == null || instance != null) {
return instance;
}
-
- if (instance == null) {
- synchronized (TieredFlatFileManager.class) {
- if (instance == null) {
- try {
- instance = new TieredFlatFileManager(storeConfig);
- } catch (Exception e) {
- logger.error("TieredFlatFileManager#getInstance: create flat file manager failed", e);
- }
+ synchronized (TieredFlatFileManager.class) {
+ if (instance == null) {
+ try {
+ instance = new TieredFlatFileManager(storeConfig);
+ } catch (Exception e) {
+ logger.error("Construct FlatFileManager instance error", e);
}
}
}
@@ -88,7 +89,7 @@ public class TieredFlatFileManager {
TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, storeConfig.getBrokerName(), 0));
indexFile = new TieredIndexFile(new TieredFileAllocator(storeConfig), filePath);
} catch (Exception e) {
- logger.error("TieredFlatFileManager#getIndexFile: create index file failed", e);
+ logger.error("Construct FlatFileManager indexFile error", e);
}
}
}
@@ -105,7 +106,7 @@ public class TieredFlatFileManager {
flatFile.commitCommitLog();
} catch (Throwable e) {
MessageQueue mq = flatFile.getMessageQueue();
- logger.error("commit commitLog periodically failed: topic: {}, queue: {}",
+ logger.error("Commit commitLog periodically failed: topic: {}, queue: {}",
mq.getTopic(), mq.getQueueId(), e);
}
}, delay, TimeUnit.MILLISECONDS);
@@ -114,7 +115,7 @@ public class TieredFlatFileManager {
flatFile.commitConsumeQueue();
} catch (Throwable e) {
MessageQueue mq = flatFile.getMessageQueue();
- logger.error("commit consumeQueue periodically failed: topic: {}, queue: {}",
+ logger.error("Commit consumeQueue periodically failed: topic: {}, queue: {}",
mq.getTopic(), mq.getQueueId(), e);
}
}, delay, TimeUnit.MILLISECONDS);
@@ -125,7 +126,7 @@ public class TieredFlatFileManager {
indexFile.commit(true);
}
} catch (Throwable e) {
- logger.error("commit indexFile periodically failed", e);
+ logger.error("Commit indexFile periodically failed", e);
}
}, 0, TimeUnit.MILLISECONDS);
}
@@ -160,7 +161,7 @@ public class TieredFlatFileManager {
try {
doCommit();
} catch (Throwable e) {
- logger.error("commit flat file periodically failed: ", e);
+ logger.error("Commit flat file periodically failed: ", e);
}
}, 60, 60, TimeUnit.SECONDS);
@@ -168,49 +169,73 @@ public class TieredFlatFileManager {
try {
doCleanExpiredFile();
} catch (Throwable e) {
- logger.error("clean expired flat file failed: ", e);
+ logger.error("Clean expired flat file failed: ", e);
}
}, 30, 30, TimeUnit.SECONDS);
}
public boolean load() {
+ Stopwatch stopwatch = Stopwatch.createStarted();
try {
- AtomicLong topicSequenceNumber = new AtomicLong();
- List<Future<?>> futureList = new ArrayList<>();
- queueFlatFileMap.clear();
- metadataStore.iterateTopic(topicMetadata -> {
+ flatFileConcurrentMap.clear();
+ this.recoverSequenceNumber();
+ this.recoverTieredFlatFile();
+ logger.info("Message store recover end, total cost={}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ } catch (Exception e) {
+ long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ logger.error("Message store recover error, total cost={}ms", costTime, e); // failure: error level, keep the cause
+ BROKER_LOG.error("Message store recover error, total cost={}ms", costTime, e);
+ return false;
+ }
+ return true;
+ }
+
+ public void recoverSequenceNumber() {
+ AtomicLong topicSequenceNumber = new AtomicLong();
+ metadataStore.iterateTopic(topicMetadata -> {
+ if (topicMetadata != null && topicMetadata.getTopicId() > 0) {
topicSequenceNumber.set(Math.max(topicSequenceNumber.get(), topicMetadata.getTopicId()));
- Future<?> future = TieredStoreExecutor.dispatchExecutor.submit(() -> {
- if (topicMetadata.getStatus() != 0) {
- return;
- }
+ }
+ });
+ metadataStore.setTopicSequenceNumber(topicSequenceNumber.incrementAndGet());
+ }
+
+ public void recoverTieredFlatFile() {
+ Semaphore semaphore = new Semaphore((int) (TieredStoreExecutor.QUEUE_CAPACITY * 0.75));
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ metadataStore.iterateTopic(topicMetadata -> {
+ try {
+ semaphore.acquire();
+ CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
- metadataStore.iterateQueue(topicMetadata.getTopic(),
- queueMetadata -> getOrCreateFlatFileIfAbsent(
- new MessageQueue(topicMetadata.getTopic(),
- storeConfig.getBrokerName(),
- queueMetadata.getQueue().getQueueId())));
+ Stopwatch subWatch = Stopwatch.createStarted();
+ if (topicMetadata.getStatus() != 0) {
+ return;
+ }
+ AtomicLong queueCount = new AtomicLong();
+ metadataStore.iterateQueue(topicMetadata.getTopic(), queueMetadata -> {
+ this.getOrCreateFlatFileIfAbsent(new MessageQueue(topicMetadata.getTopic(),
+ storeConfig.getBrokerName(), queueMetadata.getQueue().getQueueId()));
+ queueCount.incrementAndGet();
+ });
+ logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms",
+ topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS));
} catch (Exception e) {
- logger.error("load mq composite flat file from metadata failed", e);
+ logger.error("Recover TopicFlatFile error, topic: {}", topicMetadata.getTopic(), e);
+ } finally {
+ semaphore.release();
}
- });
- futureList.add(future);
- });
-
- // Wait for load all metadata done
- for (Future<?> future : futureList) {
- future.get();
+ }, TieredStoreExecutor.commitExecutor);
+ futures.add(future);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- metadataStore.setTopicSequenceNumber(topicSequenceNumber.incrementAndGet());
- } catch (Exception e) {
- logger.error("load mq composite flat file from metadata failed", e);
- return false;
- }
- return true;
+ });
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
public void cleanup() {
- queueFlatFileMap.clear();
+ flatFileConcurrentMap.clear();
cleanStaticReference();
}
@@ -221,27 +246,25 @@ public class TieredFlatFileManager {
@Nullable
public CompositeQueueFlatFile getOrCreateFlatFileIfAbsent(MessageQueue messageQueue) {
- return queueFlatFileMap.computeIfAbsent(messageQueue, mq -> {
+ return flatFileConcurrentMap.computeIfAbsent(messageQueue, mq -> {
try {
- logger.debug("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
- "try to create new flat file: topic: {}, queueId: {}",
+ logger.debug("Create new TopicFlatFile, topic: {}, queueId: {}",
messageQueue.getTopic(), messageQueue.getQueueId());
return new CompositeQueueFlatFile(tieredFileAllocator, mq);
} catch (Exception e) {
- logger.error("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
- "create new flat file: topic: {}, queueId: {}",
+ logger.error("Create new TopicFlatFile failed, topic: {}, queueId: {}",
messageQueue.getTopic(), messageQueue.getQueueId(), e);
- return null;
}
+ return null; // construction failed (logged above); computeIfAbsent records no mapping for a null result
});
}
public CompositeQueueFlatFile getFlatFile(MessageQueue messageQueue) {
- return queueFlatFileMap.get(messageQueue);
+ return flatFileConcurrentMap.get(messageQueue);
}
public ImmutableList<CompositeQueueFlatFile> deepCopyFlatFileToList() {
- return ImmutableList.copyOf(queueFlatFileMap.values());
+ return ImmutableList.copyOf(flatFileConcurrentMap.values());
}
public void shutdown() {
@@ -270,7 +293,7 @@ public class TieredFlatFileManager {
}
// delete memory reference
- CompositeQueueFlatFile flatFile = queueFlatFileMap.remove(mq);
+ CompositeQueueFlatFile flatFile = flatFileConcurrentMap.remove(mq);
if (flatFile != null) {
MessageQueue messageQueue = flatFile.getMessageQueue();
logger.info("TieredFlatFileManager#destroyCompositeFile: " +
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
index 1b232fc75..2f0fd71de 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.tieredstore.metadata;
+import java.util.Objects;
+
public class FileSegmentMetadata {
public static final int STATUS_NEW = 0;
@@ -43,7 +45,6 @@ public class FileSegmentMetadata {
this.baseOffset = baseOffset;
this.type = type;
this.status = STATUS_NEW;
- this.createTimestamp = System.currentTimeMillis();
}
public void markSealed() {
@@ -122,4 +123,27 @@ public class FileSegmentMetadata {
public void setSealTimestamp(long sealTimestamp) {
this.sealTimestamp = sealTimestamp;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ FileSegmentMetadata metadata = (FileSegmentMetadata) o;
+ return size == metadata.size
+ && baseOffset == metadata.baseOffset
+ && status == metadata.status
+ && Objects.equals(path, metadata.path) // null-safe, consistent with Objects.hash(...) in hashCode()
+ && type == metadata.type
+ && createTimestamp == metadata.createTimestamp
+ && beginTimestamp == metadata.beginTimestamp
+ && endTimestamp == metadata.endTimestamp
+ && sealTimestamp == metadata.sealTimestamp;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type, path, baseOffset, status, size, createTimestamp, beginTimestamp, endTimestamp, sealTimestamp);
+ }
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
index e6adef1d1..5791dc9a4 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
@@ -116,19 +116,20 @@ public class TieredDispatcherTest {
buffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 9);
flatFile.appendCommitLog(buffer3);
flatFile.commitCommitLog();
- Assert.assertEquals(10, flatFile.getDispatchOffset());
+ Assert.assertEquals(9 + 1, flatFile.getDispatchOffset());
+ Assert.assertEquals(9, flatFile.getCommitLogDispatchCommitOffset());
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 8, 8, 0, 0, buffer1);
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 9, 9, 0, 0, buffer2);
dispatcher.buildConsumeQueueAndIndexFile();
Assert.assertEquals(7, flatFile.getConsumeQueueMaxOffset());
- Assert.assertEquals(7, flatFile.getDispatchOffset());
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 7, 7, 0, 0, buffer1);
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 8, 8, 0, 0, buffer2);
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 9, 9, 0, 0, buffer3);
dispatcher.buildConsumeQueueAndIndexFile();
- Assert.assertEquals(10, flatFile.getConsumeQueueMaxOffset());
+ Assert.assertEquals(6, flatFile.getConsumeQueueMinOffset());
+ Assert.assertEquals(9 + 1, flatFile.getConsumeQueueMaxOffset());
}
@Test
@@ -142,6 +143,7 @@ public class TieredDispatcherTest {
Mockito.when(defaultStore.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId())).thenReturn(0L);
Mockito.when(defaultStore.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId())).thenReturn(9L);
+ // mock cq item, position = 7
ByteBuffer cqItem = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
cqItem.putLong(7);
cqItem.putInt(MessageBufferUtilTest.MSG_LEN);
@@ -150,13 +152,13 @@ public class TieredDispatcherTest {
SelectMappedBufferResult mockResult = new SelectMappedBufferResult(0, cqItem, ConsumeQueue.CQ_STORE_UNIT_SIZE, null);
Mockito.when(((ConsumeQueue) defaultStore.getConsumeQueue(mq.getTopic(), mq.getQueueId())).getIndexBuffer(6)).thenReturn(mockResult);
+ // mock cq item, position = 8
cqItem = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
cqItem.putLong(8);
cqItem.putInt(MessageBufferUtilTest.MSG_LEN);
cqItem.putLong(1);
cqItem.flip();
mockResult = new SelectMappedBufferResult(0, cqItem, ConsumeQueue.CQ_STORE_UNIT_SIZE, null);
-
Mockito.when(((ConsumeQueue) defaultStore.getConsumeQueue(mq.getTopic(), mq.getQueueId())).getIndexBuffer(7)).thenReturn(mockResult);
mockResult = new SelectMappedBufferResult(0, MessageBufferUtilTest.buildMockedMessageBuffer(), MessageBufferUtilTest.MSG_LEN, null);
@@ -167,7 +169,10 @@ public class TieredDispatcherTest {
mockResult = new SelectMappedBufferResult(0, msg, MessageBufferUtilTest.MSG_LEN, null);
Mockito.when(defaultStore.selectOneMessageByOffset(8, MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult);
- dispatcher.dispatchFlatFile(flatFileManager.getOrCreateFlatFileIfAbsent(mq));
+ CompositeQueueFlatFile flatFile = flatFileManager.getOrCreateFlatFileIfAbsent(mq);
+ Assert.assertNotNull(flatFile);
+ flatFile.initOffset(7);
+ dispatcher.dispatchFlatFile(flatFile);
Assert.assertEquals(8, flatFileManager.getFlatFile(mq).getDispatchOffset());
}
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index d75b2f916..774c6cf64 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -23,6 +23,7 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.tuple.Triple;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.GetMessageResult;
@@ -40,7 +41,6 @@ import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
-import org.apache.rocketmq.common.BoundaryType;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
index 27efe111e..2e028ada3 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
@@ -119,7 +119,7 @@ public class CompositeQueueFlatFileTest {
Assert.assertEquals(AppendResult.SUCCESS, result);
file.commit(true);
- file.persistMetadata();
+ file.flushMetadata();
QueueMetadata queueMetadata = metadataStore.getQueue(mq);
Assert.assertEquals(53, queueMetadata.getMaxOffset());
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
index b7528c5e4..20fe4dd70 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
@@ -72,10 +72,15 @@ public class TieredFlatFileManagerTest {
CompositeFlatFile flatFile = flatFileManager.getFlatFile(mq);
Assert.assertNotNull(flatFile);
- Assert.assertEquals(100, flatFile.getDispatchOffset());
+ Assert.assertEquals(-1L, flatFile.getDispatchOffset());
+ flatFile.initOffset(100L);
+ Assert.assertEquals(100L, flatFile.getDispatchOffset());
+ flatFile.initOffset(200L);
+ Assert.assertEquals(100L, flatFile.getDispatchOffset());
CompositeFlatFile flatFile1 = flatFileManager.getFlatFile(mq1);
Assert.assertNotNull(flatFile1);
+ flatFile1.initOffset(200L);
Assert.assertEquals(200, flatFile1.getDispatchOffset());
flatFileManager.destroyCompositeFile(mq);
--
2.32.0.windows.2
From 99b39a35f29e491862296d56b7938a995d153974 Mon Sep 17 00:00:00 2001
From: ShuangxiDing <dingshuangxi888@gmail.com>
Date: Thu, 10 Aug 2023 11:28:39 +0800
Subject: [PATCH 02/12] [ISSUE #7115] Fix grpc response message NPE (#7116)
---
.../apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java
index 0b3c85ea6..efa879a9c 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java
@@ -92,7 +92,7 @@ public class ResponseBuilder {
public Status buildStatus(Code code, String message) {
return Status.newBuilder()
.setCode(code)
- .setMessage(message)
+ .setMessage(message != null ? message : code.name())
.build();
}
--
2.32.0.windows.2
From c0ba453f38183266cf9a69be754e620311e1923b Mon Sep 17 00:00:00 2001
From: caigy <csgytsai@163.com>
Date: Thu, 10 Aug 2023 14:08:17 +0800
Subject: [PATCH 03/12] [ISSUE #7129] Fix resource collisions in acl tests
(#7130)
* run acl tests sequentially to avoid collision
* disable reuseForks for acl like broker
* Revert "[ISSUE #7135] Temporarily ignoring plainAccessValidator test (#7135)"
This reverts commit 6bc2c8474a0ce1e2833c82dffea7b1d8f718fcd7.
---
acl/pom.xml | 13 +++++++++++++
.../acl/plain/PlainAccessControlFlowTest.java | 5 -----
.../acl/plain/PlainAccessValidatorTest.java | 3 ---
.../acl/plain/PlainPermissionManagerTest.java | 3 ---
4 files changed, 13 insertions(+), 11 deletions(-)
diff --git a/acl/pom.xml b/acl/pom.xml
index 67bfcb8d2..989c0cf77 100644
--- a/acl/pom.xml
+++ b/acl/pom.xml
@@ -74,4 +74,17 @@
<scope>test</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${maven-surefire-plugin.version}</version>
+ <configuration>
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java
index e7fd0932f..519345714 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java
@@ -31,7 +31,6 @@ import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
@@ -44,7 +43,6 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-
/**
* <p> In this class, we'll test the following scenarios, each containing several consecutive operations on ACL,
* <p> like updating and deleting ACL, changing config files and checking validations.
@@ -52,9 +50,6 @@ import java.util.List;
* <p> Case 2: Only conf/acl/plain_acl.yml exists;
* <p> Case 3: Both conf/plain_acl.yml and conf/acl/plain_acl.yml exists.
*/
-
-// Ignore this test case as it is currently unable to pass on ubuntu workflow
-@Ignore
public class PlainAccessControlFlowTest {
public static final String DEFAULT_TOPIC = "topic-acl";
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
index a3a925758..ef0cffbdc 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
@@ -56,11 +56,8 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
-// Ignore this test case as it is currently unable to pass on ubuntu workflow
-@Ignore
public class PlainAccessValidatorTest {
private PlainAccessValidator plainAccessValidator;
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
index aa7539f3a..941d8c779 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
@@ -29,7 +29,6 @@ import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
@@ -42,8 +41,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-// Ignore this test case as it is currently unable to pass on ubuntu workflow
-@Ignore
public class PlainPermissionManagerTest {
PlainPermissionManager plainPermissionManager;
--
2.32.0.windows.2
From 8741ff8c9b3bdbfc97976285affa7ea35c81243c Mon Sep 17 00:00:00 2001
From: ShuangxiDing <dingshuangxi888@gmail.com>
Date: Thu, 10 Aug 2023 17:41:15 +0800
Subject: [PATCH 04/12] [ISSUE #7153] Add switch for MIXED message type (#7154)
Add a switch for MIXED message type when creating a Topic in the Broker.
---
.../broker/processor/AdminBrokerProcessor.java | 8 ++++++++
.../java/org/apache/rocketmq/common/BrokerConfig.java | 10 ++++++++++
2 files changed, 18 insertions(+)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index a6ce03dc2..bbddcec2d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -59,6 +59,7 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UnlockCallback;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.AttributeParser;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.constant.FIleReadaheadMode;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -439,6 +440,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
String attributesModification = requestHeader.getAttributes();
topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification));
+ if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
+ && !brokerController.getBrokerConfig().isEnableMixedMessageType()) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("MIXED message type is not supported.");
+ return response;
+ }
+
try {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index a815636b1..99a5db5ad 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -393,6 +393,8 @@ public class BrokerConfig extends BrokerIdentity {
*/
private boolean enableSingleTopicRegister = false;
+ private boolean enableMixedMessageType = false;
+
public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
@@ -1712,4 +1714,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setEnableSingleTopicRegister(boolean enableSingleTopicRegister) {
this.enableSingleTopicRegister = enableSingleTopicRegister;
}
+
+ public boolean isEnableMixedMessageType() {
+ return enableMixedMessageType;
+ }
+
+ public void setEnableMixedMessageType(boolean enableMixedMessageType) {
+ this.enableMixedMessageType = enableMixedMessageType;
+ }
}
--
2.32.0.windows.2
From f534501855f8edbcb58f5b856973bf1027b5cf3a Mon Sep 17 00:00:00 2001
From: Steven <shirenchuang@users.noreply.github.com>
Date: Fri, 11 Aug 2023 10:25:48 +0800
Subject: [PATCH 05/12] [Feature 7155] add errlog when cmd err (#7157)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Co-authored-by: 十真 <shirenchuang.src@cainiao.com>
---
.../src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java | 1 +
1 file changed, 1 insertion(+)
diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
index b00bad3c5..5a8a7cd54 100644
--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
+++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
@@ -52,6 +52,7 @@ public class ServerUtil {
System.exit(0);
}
} catch (ParseException e) {
+ System.err.println(e.getMessage());
hf.printHelp(appName, options, true);
System.exit(1);
}
--
2.32.0.windows.2
From db58f00c0fe0f129611d654291f2177de55dc9ff Mon Sep 17 00:00:00 2001
From: Zhouxiang Zhan <zhouxzhan@apache.org>
Date: Fri, 11 Aug 2023 19:18:30 +0800
Subject: [PATCH 06/12] [ISSUE #7169] Change metadataThreadPoolQueueCapacity to
100000 (#7170)
---
.../main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 4f57a7052..39caaa0d9 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -165,7 +165,7 @@ public class ProxyConfig implements ConfigFile {
private int subscriptionGroupConfigCacheExpiredInSeconds = 20;
private int subscriptionGroupConfigCacheMaxNum = 20000;
private int metadataThreadPoolNums = 3;
- private int metadataThreadPoolQueueCapacity = 1000;
+ private int metadataThreadPoolQueueCapacity = 100000;
private int transactionHeartbeatThreadPoolNums = 20;
private int transactionHeartbeatThreadPoolQueueCapacity = 200;
--
2.32.0.windows.2
From 1f04e68a2e331ab035b791280c5a91b60fe0c85f Mon Sep 17 00:00:00 2001
From: yx9o <yangx_soft@163.com>
Date: Sat, 12 Aug 2023 21:12:22 +0800
Subject: [PATCH 07/12] [ISSUE #7172] Unified Chinese for Name Server (#7173)
---
docs/cn/concept.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/cn/concept.md b/docs/cn/concept.md
index cb2c863bd..3d67e9371 100644
--- a/docs/cn/concept.md
+++ b/docs/cn/concept.md
@@ -17,7 +17,7 @@ RocketMQ主要由 Producer、Broker、Consumer 三部分组成其中Producer
消息中转角色负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据包括消费者组、消费进度偏移和主题和队列消息等。
## 6 名字服务Name Server
- 名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群但相互独立没有信息交换。
+名字服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群但相互独立没有信息交换。
## 7 拉取式消费Pull Consumer
Consumer消费的一种类型应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息应用就会启动消费过程。
--
2.32.0.windows.2
From 25005060bbace477eeaaf4c0142cece5213efbbf Mon Sep 17 00:00:00 2001
From: yx9o <yangx_soft@163.com>
Date: Sun, 13 Aug 2023 20:52:17 +0800
Subject: [PATCH 08/12] [ISSUE #7176] Correct mismatched logs (#7177)
---
.../org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 0055a1cc8..f7a95f0a6 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -522,7 +522,7 @@ public class RouteInfoManager {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
- log.error("wipeWritePermOfBrokerByLock Exception", e);
+ log.error("addWritePermOfBrokerByLock Exception", e);
}
return 0;
}
--
2.32.0.windows.2
From ac411daa27117e9115a8fc5e2d5753085f009ed9 Mon Sep 17 00:00:00 2001
From: yx9o <yangx_soft@163.com>
Date: Tue, 15 Aug 2023 08:31:00 +0800
Subject: [PATCH 09/12] [ISSUE #7183] Correct mismatched commandDesc (#7184)
---
.../tools/command/topic/RemappingStaticTopicSubCommand.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 849f680d0..2a08fdb5b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -47,7 +47,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Update or create static topic, which has fixed number of queues";
+ return "Remapping static topic.";
}
@Override
--
2.32.0.windows.2
From 55e0cdb2af3ab75a6d892f919d60797f17a99fda Mon Sep 17 00:00:00 2001
From: redlsz <szliu0927@gmail.com>
Date: Tue, 15 Aug 2023 19:19:45 +0800
Subject: [PATCH 10/12] fix: IndexOutOfBoundsException when process pop
response (#7003)
---
.../org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 5 ++++-
.../rocketmq/proxy/service/message/LocalMessageService.java | 5 ++++-
.../rocketmq/remoting/protocol/header/ExtraInfoUtil.java | 4 ++++
3 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 708a6acd1..5101ffc8e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1174,7 +1174,10 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
continue;
}
- key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
+ // Value of POP_CK is used to determine whether it is a pop retry,
+ // cause topic could be rewritten by broker.
+ key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
+ messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getQueueId());
if (!sortMap.containsKey(key)) {
sortMap.put(key, new ArrayList<>(4));
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index 115c140ff..eb2c4d9ee 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -249,7 +249,10 @@ public class LocalMessageService implements MessageService {
// <topicMark@queueId, msg queueOffset>
Map<String, List<Long>> sortMap = new HashMap<>(16);
for (MessageExt messageExt : messageExtList) {
- String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
+ // Value of POP_CK is used to determine whether it is a pop retry,
+ // cause topic could be rewritten by broker.
+ String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
+ messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getQueueId());
if (!sortMap.containsKey(key)) {
sortMap.put(key, new ArrayList<>(4));
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
index 9a5fa89ab..13094331e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
@@ -282,6 +282,10 @@ public class ExtraInfoUtil {
return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key;
}
+ public static String getStartOffsetInfoMapKey(String topic, String popCk, long key) {
+ return ((topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || popCk != null) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key;
+ }
+
public static String getQueueOffsetKeyValueKey(long queueId, long queueOffset) {
return QUEUE_OFFSET + queueId + "%" + queueOffset;
}
--
2.32.0.windows.2
From a9c0b43f7f6ce5acfc4f2f3069553071fa93dfee Mon Sep 17 00:00:00 2001
From: yx9o <yangx_soft@163.com>
Date: Wed, 16 Aug 2023 18:45:00 +0800
Subject: [PATCH 11/12] [ISSUE #7192] Correct typos (#7193)
---
.../tools/command/consumer/ConsumerProgressSubCommand.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
index f51a24673..97125b854 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -54,7 +54,7 @@ public class ConsumerProgressSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Query consumers's progress, speed";
+ return "Query consumer's progress, speed.";
}
@Override
--
2.32.0.windows.2
From 5a3de926b816db5a121c1d788430072a3bc942ae Mon Sep 17 00:00:00 2001
From: Zhouxiang Zhan <zhouxzhan@apache.org>
Date: Wed, 16 Aug 2023 20:52:53 +0800
Subject: [PATCH 12/12] Optimize updateSubscription check exist loop (#7190)
---
.../broker/client/ConsumerGroupInfo.java | 17 ++++++-----------
1 file changed, 6 insertions(+), 11 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
index 867b9c720..1ea58c125 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
@@ -172,7 +173,7 @@ public class ConsumerGroupInfo {
*/
public boolean updateSubscription(final Set<SubscriptionData> subList) {
boolean updated = false;
-
+ Set<String> topicSet = new HashSet<>();
for (SubscriptionData sub : subList) {
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
if (old == null) {
@@ -194,22 +195,16 @@ public class ConsumerGroupInfo {
this.subscriptionTable.put(sub.getTopic(), sub);
}
+ // Add all new topics to the HashSet
+ topicSet.add(sub.getTopic());
}
Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, SubscriptionData> next = it.next();
String oldTopic = next.getKey();
-
- boolean exist = false;
- for (SubscriptionData sub : subList) {
- if (sub.getTopic().equals(oldTopic)) {
- exist = true;
- break;
- }
- }
-
- if (!exist) {
+ // Check HashSet with O(1) time complexity
+ if (!topicSet.contains(oldTopic)) {
log.warn("subscription changed, group: {} remove topic {} {}",
this.groupName,
oldTopic,
--
2.32.0.windows.2