rocketmq/patch010-backport-add-some-fixes.patch

1287 lines
61 KiB
Diff
Raw Permalink Normal View History

2023-10-01 10:27:45 +08:00
From b2deef179dbc6a9eb1a2b6dd7b652d95cb768295 Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Thu, 10 Aug 2023 10:38:47 +0800
Subject: [PATCH 01/12] [ISSUE #7144] Accelerate the recovery speed of the
tiered storage module (#7145)
---
.../tieredstore/TieredDispatcher.java | 3 +
.../tieredstore/TieredMessageStore.java | 2 +-
.../common/TieredStoreExecutor.java | 25 ++--
.../tieredstore/file/CompositeFlatFile.java | 15 +-
.../file/CompositeQueueFlatFile.java | 20 ++-
.../tieredstore/file/TieredCommitLog.java | 24 +++-
.../tieredstore/file/TieredFlatFile.java | 42 +++---
.../file/TieredFlatFileManager.java | 135 ++++++++++--------
.../metadata/FileSegmentMetadata.java | 26 +++-
.../tieredstore/TieredDispatcherTest.java | 15 +-
.../tieredstore/TieredMessageFetcherTest.java | 2 +-
.../file/CompositeQueueFlatFileTest.java | 2 +-
.../file/TieredFlatFileManagerTest.java | 7 +-
13 files changed, 194 insertions(+), 124 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index bb58ea7dd..1746190cd 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -279,6 +279,9 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
long upperBound = Math.min(dispatchOffset + maxCount, maxOffsetInQueue);
ConsumeQueue consumeQueue = (ConsumeQueue) defaultStore.getConsumeQueue(topic, queueId);
+ logger.debug("DispatchFlatFile race, topic={}, queueId={}, cq range={}-{}, dispatch offset={}-{}",
+ topic, queueId, minOffsetInQueue, maxOffsetInQueue, dispatchOffset, upperBound - 1);
+
for (; dispatchOffset < upperBound; dispatchOffset++) {
// get consume queue
SelectMappedBufferResult cqItem = consumeQueue.getIndexBuffer(dispatchOffset);
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 1f12410f2..ced1fb818 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -147,7 +147,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) {
if (!viaTieredStorage(topic, queueId, offset, maxMsgNums)) {
- logger.debug("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset);
+ logger.trace("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset);
return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter);
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
index 6eb3478b3..6dd0e8846 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
@@ -43,18 +43,9 @@ public class TieredStoreExecutor {
public static ExecutorService compactIndexFileExecutor;
public static void init() {
- dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
- dispatchExecutor = new ThreadPoolExecutor(
- Math.max(2, Runtime.getRuntime().availableProcessors()),
- Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- dispatchThreadPoolQueue,
- new ThreadFactoryImpl("TieredCommonExecutor_"));
-
commonScheduledExecutor = new ScheduledThreadPoolExecutor(
Math.max(4, Runtime.getRuntime().availableProcessors()),
- new ThreadFactoryImpl("TieredCommonScheduledExecutor_"));
+ new ThreadFactoryImpl("TieredCommonExecutor_"));
commitExecutor = new ScheduledThreadPoolExecutor(
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
@@ -62,7 +53,17 @@ public class TieredStoreExecutor {
cleanExpiredFileExecutor = new ScheduledThreadPoolExecutor(
Math.max(4, Runtime.getRuntime().availableProcessors()),
- new ThreadFactoryImpl("TieredCleanExpiredFileExecutor_"));
+ new ThreadFactoryImpl("TieredCleanFileExecutor_"));
+
+ dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
+ dispatchExecutor = new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors()),
+ Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ dispatchThreadPoolQueue,
+ new ThreadFactoryImpl("TieredDispatchExecutor_"),
+ new ThreadPoolExecutor.DiscardOldestPolicy());
fetchDataThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
fetchDataExecutor = new ThreadPoolExecutor(
@@ -71,7 +72,7 @@ public class TieredStoreExecutor {
1000 * 60,
TimeUnit.MILLISECONDS,
fetchDataThreadPoolQueue,
- new ThreadFactoryImpl("TieredFetchDataExecutor_"));
+ new ThreadFactoryImpl("TieredFetchExecutor_"));
compactIndexFileThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
compactIndexFileExecutor = new ThreadPoolExecutor(
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
index df4baf33f..5ad3a6ff3 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
@@ -76,20 +76,15 @@ public class CompositeFlatFile implements CompositeAccess {
this.storeConfig = fileQueueFactory.getStoreConfig();
this.readAheadFactor = this.storeConfig.getReadAheadMinFactor();
this.metadataStore = TieredStoreUtil.getMetadataStore(this.storeConfig);
- this.dispatchOffset = new AtomicLong();
this.compositeFlatFileLock = new ReentrantLock();
this.inFlightRequestMap = new ConcurrentHashMap<>();
this.commitLog = new TieredCommitLog(fileQueueFactory, filePath);
this.consumeQueue = new TieredConsumeQueue(fileQueueFactory, filePath);
+ this.dispatchOffset = new AtomicLong(
+ this.consumeQueue.isInitialized() ? this.getConsumeQueueCommitOffset() : -1L);
this.groupOffsetCache = this.initOffsetCache();
}
- protected void recoverMetadata() {
- if (!consumeQueue.isInitialized() && this.dispatchOffset.get() != -1) {
- consumeQueue.setBaseOffset(this.dispatchOffset.get() * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
- }
- }
-
private Cache<String, Long> initOffsetCache() {
return Caffeine.newBuilder()
.expireAfterWrite(2, TimeUnit.MINUTES)
@@ -310,10 +305,12 @@ public class CompositeFlatFile implements CompositeAccess {
@Override
public void initOffset(long offset) {
- if (!consumeQueue.isInitialized()) {
+ if (consumeQueue.isInitialized()) {
+ dispatchOffset.set(this.getConsumeQueueCommitOffset());
+ } else {
consumeQueue.setBaseOffset(offset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ dispatchOffset.set(offset);
}
- dispatchOffset.set(offset);
}
@Override
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
index f6c0afed0..0a797f465 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
@@ -36,8 +36,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
public CompositeQueueFlatFile(TieredFileAllocator fileQueueFactory, MessageQueue messageQueue) {
super(fileQueueFactory, TieredStoreUtil.toPath(messageQueue));
this.messageQueue = messageQueue;
- this.recoverTopicMetadata();
- super.recoverMetadata();
+ this.recoverQueueMetadata();
this.indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
}
@@ -46,11 +45,12 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
if (!consumeQueue.isInitialized()) {
queueMetadata.setMinOffset(offset);
queueMetadata.setMaxOffset(offset);
+ metadataStore.updateQueue(queueMetadata);
}
super.initOffset(offset);
}
- public void recoverTopicMetadata() {
+ public void recoverQueueMetadata() {
TopicMetadata topicMetadata = this.metadataStore.getTopic(messageQueue.getTopic());
if (topicMetadata == null) {
topicMetadata = this.metadataStore.addTopic(messageQueue.getTopic(), -1L);
@@ -64,18 +64,16 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
if (queueMetadata.getMaxOffset() < queueMetadata.getMinOffset()) {
queueMetadata.setMaxOffset(queueMetadata.getMinOffset());
}
- this.dispatchOffset.set(queueMetadata.getMaxOffset());
}
- public void persistMetadata() {
+ public void flushMetadata() {
try {
- if (consumeQueue.getCommitOffset() < queueMetadata.getMinOffset()) {
- return;
- }
- queueMetadata.setMaxOffset(consumeQueue.getCommitOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ queueMetadata.setMinOffset(super.getConsumeQueueMinOffset());
+ queueMetadata.setMaxOffset(super.getConsumeQueueMaxOffset());
metadataStore.updateQueue(queueMetadata);
} catch (Exception e) {
- LOGGER.error("CompositeFlatFile#flushMetadata: update queue metadata failed: topic: {}, queue: {}", messageQueue.getTopic(), messageQueue.getQueueId(), e);
+ LOGGER.error("CompositeFlatFile#flushMetadata error, topic: {}, queue: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId(), e);
}
}
@@ -114,7 +112,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
@Override
public void shutdown() {
super.shutdown();
- metadataStore.updateQueue(queueMetadata);
+ this.flushMetadata();
}
@Override
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
index 80e1bce50..0e5f79132 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
@@ -50,7 +50,7 @@ public class TieredCommitLog {
this.storeConfig = fileQueueFactory.getStoreConfig();
this.flatFile = fileQueueFactory.createFlatFileForCommitLog(filePath);
this.minConsumeQueueOffset = new AtomicLong(NOT_EXIST_MIN_OFFSET);
- this.correctMinOffset();
+ this.correctMinOffsetAsync();
}
@VisibleForTesting
@@ -91,17 +91,26 @@ public class TieredCommitLog {
return flatFile.getFileToWrite().getMaxTimestamp();
}
- public synchronized long correctMinOffset() {
+ public long correctMinOffset() {
+ try {
+ return correctMinOffsetAsync().get();
+ } catch (Exception e) {
+ log.error("Correct min offset failed in clean expired file", e);
+ }
+ return NOT_EXIST_MIN_OFFSET;
+ }
+
+ public synchronized CompletableFuture<Long> correctMinOffsetAsync() {
if (flatFile.getFileSegmentCount() == 0) {
this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
- return NOT_EXIST_MIN_OFFSET;
+ return CompletableFuture.completedFuture(NOT_EXIST_MIN_OFFSET);
}
// queue offset field length is 8
int length = MessageBufferUtil.QUEUE_OFFSET_POSITION + 8;
if (flatFile.getCommitOffset() - flatFile.getMinOffset() < length) {
this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
- return NOT_EXIST_MIN_OFFSET;
+ return CompletableFuture.completedFuture(NOT_EXIST_MIN_OFFSET);
}
try {
@@ -109,7 +118,8 @@ public class TieredCommitLog {
.thenApply(buffer -> {
long offset = MessageBufferUtil.getQueueOffset(buffer);
minConsumeQueueOffset.set(offset);
- log.info("Correct commitlog min cq offset success, filePath={}, min cq offset={}, range={}-{}",
+ log.debug("Correct commitlog min cq offset success, " +
+ "filePath={}, min cq offset={}, commitlog range={}-{}",
flatFile.getFilePath(), offset, flatFile.getMinOffset(), flatFile.getCommitOffset());
return offset;
})
@@ -117,11 +127,11 @@ public class TieredCommitLog {
log.warn("Correct commitlog min cq offset error, filePath={}, range={}-{}",
flatFile.getFilePath(), flatFile.getMinOffset(), flatFile.getCommitOffset(), throwable);
return minConsumeQueueOffset.get();
- }).get();
+ });
} catch (Exception e) {
log.error("Correct commitlog min cq offset error, filePath={}", flatFile.getFilePath(), e);
}
- return minConsumeQueueOffset.get();
+ return CompletableFuture.completedFuture(minConsumeQueueOffset.get());
}
public AppendResult append(ByteBuffer byteBuf) {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
index 75ce8d89f..426c4e09d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.tieredstore.file;
+import com.alibaba.fastjson.JSON;
import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -24,6 +25,7 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -178,32 +180,26 @@ public class TieredFlatFile {
private FileSegmentMetadata getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) {
FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
- fileSegment.getPath(), fileSegment.getFileType(), fileSegment.getBaseOffset());
-
- if (metadata != null) {
- return metadata;
- }
+ this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset());
// Note: file segment path may not the same as file base path, use base path here.
- metadata = new FileSegmentMetadata(
- this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType());
-
- if (fileSegment.isClosed()) {
- metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
+ if (metadata == null) {
+ metadata = new FileSegmentMetadata(
+ this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType());
+ metadata.setCreateTimestamp(fileSegment.getMinTimestamp());
+ metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
+ metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
+ if (fileSegment.isClosed()) {
+ metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
+ }
+ this.tieredMetadataStore.updateFileSegment(metadata);
}
-
- metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
- metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
-
- // Submit to persist
- this.tieredMetadataStore.updateFileSegment(metadata);
return metadata;
}
/**
* FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full
*/
- @VisibleForTesting
public void updateFileSegment(TieredFileSegment fileSegment) {
FileSegmentMetadata segmentMetadata = getOrCreateFileSegmentMetadata(fileSegment);
@@ -219,9 +215,16 @@ public class TieredFlatFile {
}
segmentMetadata.setSize(fileSegment.getCommitPosition());
- segmentMetadata.setBeginTimestamp(fileSegment.getMinTimestamp());
segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp());
- this.tieredMetadataStore.updateFileSegment(segmentMetadata);
+
+ FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
+ this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset());
+
+ if (!Objects.equals(metadata, segmentMetadata)) {
+ this.tieredMetadataStore.updateFileSegment(segmentMetadata);
+ logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {}, content: {}",
+ segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata));
+ }
}
private void checkAndFixFileSize() {
@@ -257,6 +260,7 @@ public class TieredFlatFile {
logger.warn("TieredFlatFile#checkAndFixFileSize: fix last file {} size: origin: {}, actual: {}",
lastFile.getPath(), lastFile.getCommitOffset() - lastFile.getBaseOffset(), lastFileSize);
lastFile.initPosition(lastFileSize);
+ this.updateFileSegment(lastFile);
}
}
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
index aeca44b8c..e9ae4a5a5 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
@@ -16,16 +16,19 @@
*/
package org.apache.rocketmq.tieredstore.file;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -36,6 +39,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
public class TieredFlatFileManager {
+ private static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
private static volatile TieredFlatFileManager instance;
@@ -44,7 +48,7 @@ public class TieredFlatFileManager {
private final TieredMetadataStore metadataStore;
private final TieredMessageStoreConfig storeConfig;
private final TieredFileAllocator tieredFileAllocator;
- private final ConcurrentMap<MessageQueue, CompositeQueueFlatFile> queueFlatFileMap;
+ private final ConcurrentMap<MessageQueue, CompositeQueueFlatFile> flatFileConcurrentMap;
public TieredFlatFileManager(TieredMessageStoreConfig storeConfig)
throws ClassNotFoundException, NoSuchMethodException {
@@ -52,23 +56,20 @@ public class TieredFlatFileManager {
this.storeConfig = storeConfig;
this.metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
this.tieredFileAllocator = new TieredFileAllocator(storeConfig);
- this.queueFlatFileMap = new ConcurrentHashMap<>();
+ this.flatFileConcurrentMap = new ConcurrentHashMap<>();
this.doScheduleTask();
}
public static TieredFlatFileManager getInstance(TieredMessageStoreConfig storeConfig) {
- if (storeConfig == null) {
+ if (storeConfig == null || instance != null) {
return instance;
}
-
- if (instance == null) {
- synchronized (TieredFlatFileManager.class) {
- if (instance == null) {
- try {
- instance = new TieredFlatFileManager(storeConfig);
- } catch (Exception e) {
- logger.error("TieredFlatFileManager#getInstance: create flat file manager failed", e);
- }
+ synchronized (TieredFlatFileManager.class) {
+ if (instance == null) {
+ try {
+ instance = new TieredFlatFileManager(storeConfig);
+ } catch (Exception e) {
+ logger.error("Construct FlatFileManager instance error", e);
}
}
}
@@ -88,7 +89,7 @@ public class TieredFlatFileManager {
TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, storeConfig.getBrokerName(), 0));
indexFile = new TieredIndexFile(new TieredFileAllocator(storeConfig), filePath);
} catch (Exception e) {
- logger.error("TieredFlatFileManager#getIndexFile: create index file failed", e);
+ logger.error("Construct FlatFileManager indexFile error", e);
}
}
}
@@ -105,7 +106,7 @@ public class TieredFlatFileManager {
flatFile.commitCommitLog();
} catch (Throwable e) {
MessageQueue mq = flatFile.getMessageQueue();
- logger.error("commit commitLog periodically failed: topic: {}, queue: {}",
+ logger.error("Commit commitLog periodically failed: topic: {}, queue: {}",
mq.getTopic(), mq.getQueueId(), e);
}
}, delay, TimeUnit.MILLISECONDS);
@@ -114,7 +115,7 @@ public class TieredFlatFileManager {
flatFile.commitConsumeQueue();
} catch (Throwable e) {
MessageQueue mq = flatFile.getMessageQueue();
- logger.error("commit consumeQueue periodically failed: topic: {}, queue: {}",
+ logger.error("Commit consumeQueue periodically failed: topic: {}, queue: {}",
mq.getTopic(), mq.getQueueId(), e);
}
}, delay, TimeUnit.MILLISECONDS);
@@ -125,7 +126,7 @@ public class TieredFlatFileManager {
indexFile.commit(true);
}
} catch (Throwable e) {
- logger.error("commit indexFile periodically failed", e);
+ logger.error("Commit indexFile periodically failed", e);
}
}, 0, TimeUnit.MILLISECONDS);
}
@@ -160,7 +161,7 @@ public class TieredFlatFileManager {
try {
doCommit();
} catch (Throwable e) {
- logger.error("commit flat file periodically failed: ", e);
+ logger.error("Commit flat file periodically failed: ", e);
}
}, 60, 60, TimeUnit.SECONDS);
@@ -168,49 +169,73 @@ public class TieredFlatFileManager {
try {
doCleanExpiredFile();
} catch (Throwable e) {
- logger.error("clean expired flat file failed: ", e);
+ logger.error("Clean expired flat file failed: ", e);
}
}, 30, 30, TimeUnit.SECONDS);
}
public boolean load() {
+ Stopwatch stopwatch = Stopwatch.createStarted();
try {
- AtomicLong topicSequenceNumber = new AtomicLong();
- List<Future<?>> futureList = new ArrayList<>();
- queueFlatFileMap.clear();
- metadataStore.iterateTopic(topicMetadata -> {
+ flatFileConcurrentMap.clear();
+ this.recoverSequenceNumber();
+ this.recoverTieredFlatFile();
+ logger.info("Message store recover end, total cost={}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ } catch (Exception e) {
+ long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ logger.info("Message store recover error, total cost={}ms", costTime);
+ BROKER_LOG.error("Message store recover error, total cost={}ms", costTime, e);
+ return false;
+ }
+ return true;
+ }
+
+ public void recoverSequenceNumber() {
+ AtomicLong topicSequenceNumber = new AtomicLong();
+ metadataStore.iterateTopic(topicMetadata -> {
+ if (topicMetadata != null && topicMetadata.getTopicId() > 0) {
topicSequenceNumber.set(Math.max(topicSequenceNumber.get(), topicMetadata.getTopicId()));
- Future<?> future = TieredStoreExecutor.dispatchExecutor.submit(() -> {
- if (topicMetadata.getStatus() != 0) {
- return;
- }
+ }
+ });
+ metadataStore.setTopicSequenceNumber(topicSequenceNumber.incrementAndGet());
+ }
+
+ public void recoverTieredFlatFile() {
+ Semaphore semaphore = new Semaphore((int) (TieredStoreExecutor.QUEUE_CAPACITY * 0.75));
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ metadataStore.iterateTopic(topicMetadata -> {
+ try {
+ semaphore.acquire();
+ CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
- metadataStore.iterateQueue(topicMetadata.getTopic(),
- queueMetadata -> getOrCreateFlatFileIfAbsent(
- new MessageQueue(topicMetadata.getTopic(),
- storeConfig.getBrokerName(),
- queueMetadata.getQueue().getQueueId())));
+ Stopwatch subWatch = Stopwatch.createStarted();
+ if (topicMetadata.getStatus() != 0) {
+ return;
+ }
+ AtomicLong queueCount = new AtomicLong();
+ metadataStore.iterateQueue(topicMetadata.getTopic(), queueMetadata -> {
+ this.getOrCreateFlatFileIfAbsent(new MessageQueue(topicMetadata.getTopic(),
+ storeConfig.getBrokerName(), queueMetadata.getQueue().getQueueId()));
+ queueCount.incrementAndGet();
+ });
+ logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms",
+ topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS));
} catch (Exception e) {
- logger.error("load mq composite flat file from metadata failed", e);
+ logger.error("Recover TopicFlatFile error, topic: {}", topicMetadata.getTopic(), e);
+ } finally {
+ semaphore.release();
}
- });
- futureList.add(future);
- });
-
- // Wait for load all metadata done
- for (Future<?> future : futureList) {
- future.get();
+ }, TieredStoreExecutor.commitExecutor);
+ futures.add(future);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- metadataStore.setTopicSequenceNumber(topicSequenceNumber.incrementAndGet());
- } catch (Exception e) {
- logger.error("load mq composite flat file from metadata failed", e);
- return false;
- }
- return true;
+ });
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
public void cleanup() {
- queueFlatFileMap.clear();
+ flatFileConcurrentMap.clear();
cleanStaticReference();
}
@@ -221,27 +246,25 @@ public class TieredFlatFileManager {
@Nullable
public CompositeQueueFlatFile getOrCreateFlatFileIfAbsent(MessageQueue messageQueue) {
- return queueFlatFileMap.computeIfAbsent(messageQueue, mq -> {
+ return flatFileConcurrentMap.computeIfAbsent(messageQueue, mq -> {
try {
- logger.debug("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
- "try to create new flat file: topic: {}, queueId: {}",
+ logger.debug("Create new TopicFlatFile, topic: {}, queueId: {}",
messageQueue.getTopic(), messageQueue.getQueueId());
return new CompositeQueueFlatFile(tieredFileAllocator, mq);
} catch (Exception e) {
- logger.error("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
- "create new flat file: topic: {}, queueId: {}",
+ logger.debug("Create new TopicFlatFile failed, topic: {}, queueId: {}",
messageQueue.getTopic(), messageQueue.getQueueId(), e);
- return null;
}
+ return null;
});
}
public CompositeQueueFlatFile getFlatFile(MessageQueue messageQueue) {
- return queueFlatFileMap.get(messageQueue);
+ return flatFileConcurrentMap.get(messageQueue);
}
public ImmutableList<CompositeQueueFlatFile> deepCopyFlatFileToList() {
- return ImmutableList.copyOf(queueFlatFileMap.values());
+ return ImmutableList.copyOf(flatFileConcurrentMap.values());
}
public void shutdown() {
@@ -270,7 +293,7 @@ public class TieredFlatFileManager {
}
// delete memory reference
- CompositeQueueFlatFile flatFile = queueFlatFileMap.remove(mq);
+ CompositeQueueFlatFile flatFile = flatFileConcurrentMap.remove(mq);
if (flatFile != null) {
MessageQueue messageQueue = flatFile.getMessageQueue();
logger.info("TieredFlatFileManager#destroyCompositeFile: " +
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
index 1b232fc75..2f0fd71de 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.tieredstore.metadata;
+import java.util.Objects;
+
public class FileSegmentMetadata {
public static final int STATUS_NEW = 0;
@@ -43,7 +45,6 @@ public class FileSegmentMetadata {
this.baseOffset = baseOffset;
this.type = type;
this.status = STATUS_NEW;
- this.createTimestamp = System.currentTimeMillis();
}
public void markSealed() {
@@ -122,4 +123,27 @@ public class FileSegmentMetadata {
public void setSealTimestamp(long sealTimestamp) {
this.sealTimestamp = sealTimestamp;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ FileSegmentMetadata metadata = (FileSegmentMetadata) o;
+ return size == metadata.size
+ && baseOffset == metadata.baseOffset
+ && status == metadata.status
+ && path.equals(metadata.path)
+ && type == metadata.type
+ && createTimestamp == metadata.createTimestamp
+ && beginTimestamp == metadata.beginTimestamp
+ && endTimestamp == metadata.endTimestamp
+ && sealTimestamp == metadata.sealTimestamp;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type, path, baseOffset, status, size, createTimestamp, beginTimestamp, endTimestamp, sealTimestamp);
+ }
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
index e6adef1d1..5791dc9a4 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
@@ -116,19 +116,20 @@ public class TieredDispatcherTest {
buffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 9);
flatFile.appendCommitLog(buffer3);
flatFile.commitCommitLog();
- Assert.assertEquals(10, flatFile.getDispatchOffset());
+ Assert.assertEquals(9 + 1, flatFile.getDispatchOffset());
+ Assert.assertEquals(9, flatFile.getCommitLogDispatchCommitOffset());
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 8, 8, 0, 0, buffer1);
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 9, 9, 0, 0, buffer2);
dispatcher.buildConsumeQueueAndIndexFile();
Assert.assertEquals(7, flatFile.getConsumeQueueMaxOffset());
- Assert.assertEquals(7, flatFile.getDispatchOffset());
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 7, 7, 0, 0, buffer1);
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 8, 8, 0, 0, buffer2);
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 9, 9, 0, 0, buffer3);
dispatcher.buildConsumeQueueAndIndexFile();
- Assert.assertEquals(10, flatFile.getConsumeQueueMaxOffset());
+ Assert.assertEquals(6, flatFile.getConsumeQueueMinOffset());
+ Assert.assertEquals(9 + 1, flatFile.getConsumeQueueMaxOffset());
}
@Test
@@ -142,6 +143,7 @@ public class TieredDispatcherTest {
Mockito.when(defaultStore.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId())).thenReturn(0L);
Mockito.when(defaultStore.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId())).thenReturn(9L);
+ // mock cq item, position = 7
ByteBuffer cqItem = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
cqItem.putLong(7);
cqItem.putInt(MessageBufferUtilTest.MSG_LEN);
@@ -150,13 +152,13 @@ public class TieredDispatcherTest {
SelectMappedBufferResult mockResult = new SelectMappedBufferResult(0, cqItem, ConsumeQueue.CQ_STORE_UNIT_SIZE, null);
Mockito.when(((ConsumeQueue) defaultStore.getConsumeQueue(mq.getTopic(), mq.getQueueId())).getIndexBuffer(6)).thenReturn(mockResult);
+ // mock cq item, position = 8
cqItem = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
cqItem.putLong(8);
cqItem.putInt(MessageBufferUtilTest.MSG_LEN);
cqItem.putLong(1);
cqItem.flip();
mockResult = new SelectMappedBufferResult(0, cqItem, ConsumeQueue.CQ_STORE_UNIT_SIZE, null);
-
Mockito.when(((ConsumeQueue) defaultStore.getConsumeQueue(mq.getTopic(), mq.getQueueId())).getIndexBuffer(7)).thenReturn(mockResult);
mockResult = new SelectMappedBufferResult(0, MessageBufferUtilTest.buildMockedMessageBuffer(), MessageBufferUtilTest.MSG_LEN, null);
@@ -167,7 +169,10 @@ public class TieredDispatcherTest {
mockResult = new SelectMappedBufferResult(0, msg, MessageBufferUtilTest.MSG_LEN, null);
Mockito.when(defaultStore.selectOneMessageByOffset(8, MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult);
- dispatcher.dispatchFlatFile(flatFileManager.getOrCreateFlatFileIfAbsent(mq));
+ CompositeQueueFlatFile flatFile = flatFileManager.getOrCreateFlatFileIfAbsent(mq);
+ Assert.assertNotNull(flatFile);
+ flatFile.initOffset(7);
+ dispatcher.dispatchFlatFile(flatFile);
Assert.assertEquals(8, flatFileManager.getFlatFile(mq).getDispatchOffset());
}
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index d75b2f916..774c6cf64 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -23,6 +23,7 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.tuple.Triple;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.GetMessageResult;
@@ -40,7 +41,6 @@ import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
-import org.apache.rocketmq.common.BoundaryType;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
index 27efe111e..2e028ada3 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
@@ -119,7 +119,7 @@ public class CompositeQueueFlatFileTest {
Assert.assertEquals(AppendResult.SUCCESS, result);
file.commit(true);
- file.persistMetadata();
+ file.flushMetadata();
QueueMetadata queueMetadata = metadataStore.getQueue(mq);
Assert.assertEquals(53, queueMetadata.getMaxOffset());
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
index b7528c5e4..20fe4dd70 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
@@ -72,10 +72,15 @@ public class TieredFlatFileManagerTest {
CompositeFlatFile flatFile = flatFileManager.getFlatFile(mq);
Assert.assertNotNull(flatFile);
- Assert.assertEquals(100, flatFile.getDispatchOffset());
+ Assert.assertEquals(-1L, flatFile.getDispatchOffset());
+ flatFile.initOffset(100L);
+ Assert.assertEquals(100L, flatFile.getDispatchOffset());
+ flatFile.initOffset(200L);
+ Assert.assertEquals(100L, flatFile.getDispatchOffset());
CompositeFlatFile flatFile1 = flatFileManager.getFlatFile(mq1);
Assert.assertNotNull(flatFile1);
+ flatFile1.initOffset(200L);
Assert.assertEquals(200, flatFile1.getDispatchOffset());
flatFileManager.destroyCompositeFile(mq);
--
2.32.0.windows.2
From 99b39a35f29e491862296d56b7938a995d153974 Mon Sep 17 00:00:00 2001
From: ShuangxiDing <dingshuangxi888@gmail.com>
Date: Thu, 10 Aug 2023 11:28:39 +0800
Subject: [PATCH 02/12] [ISSUE #7115] Fix grpc response message NPE (#7116)
---
.../apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java
index 0b3c85ea6..efa879a9c 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java
@@ -92,7 +92,7 @@ public class ResponseBuilder {
public Status buildStatus(Code code, String message) {
return Status.newBuilder()
.setCode(code)
- .setMessage(message)
+ .setMessage(message != null ? message : code.name())
.build();
}
--
2.32.0.windows.2
From c0ba453f38183266cf9a69be754e620311e1923b Mon Sep 17 00:00:00 2001
From: caigy <csgytsai@163.com>
Date: Thu, 10 Aug 2023 14:08:17 +0800
Subject: [PATCH 03/12] [ISSUE #7129] Fix resource collisions in acl tests
(#7130)
* run acl tests sequentially to avoid collision
* disable reuseForks for acl like broker
* Revert "[ISSUE #7135] Temporarily ignoring plainAccessValidator test (#7135)"
This reverts commit 6bc2c8474a0ce1e2833c82dffea7b1d8f718fcd7.
---
acl/pom.xml | 13 +++++++++++++
.../acl/plain/PlainAccessControlFlowTest.java | 5 -----
.../acl/plain/PlainAccessValidatorTest.java | 3 ---
.../acl/plain/PlainPermissionManagerTest.java | 3 ---
4 files changed, 13 insertions(+), 11 deletions(-)
diff --git a/acl/pom.xml b/acl/pom.xml
index 67bfcb8d2..989c0cf77 100644
--- a/acl/pom.xml
+++ b/acl/pom.xml
@@ -74,4 +74,17 @@
<scope>test</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${maven-surefire-plugin.version}</version>
+ <configuration>
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java
index e7fd0932f..519345714 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java
@@ -31,7 +31,6 @@ import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
@@ -44,7 +43,6 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-
/**
* <p> In this class, we'll test the following scenarios, each containing several consecutive operations on ACL,
* <p> like updating and deleting ACL, changing config files and checking validations.
@@ -52,9 +50,6 @@ import java.util.List;
* <p> Case 2: Only conf/acl/plain_acl.yml exists;
* <p> Case 3: Both conf/plain_acl.yml and conf/acl/plain_acl.yml exists.
*/
-
-// Ignore this test case as it is currently unable to pass on ubuntu workflow
-@Ignore
public class PlainAccessControlFlowTest {
public static final String DEFAULT_TOPIC = "topic-acl";
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
index a3a925758..ef0cffbdc 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
@@ -56,11 +56,8 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
-// Ignore this test case as it is currently unable to pass on ubuntu workflow
-@Ignore
public class PlainAccessValidatorTest {
private PlainAccessValidator plainAccessValidator;
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
index aa7539f3a..941d8c779 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
@@ -29,7 +29,6 @@ import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
@@ -42,8 +41,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-// Ignore this test case as it is currently unable to pass on ubuntu workflow
-@Ignore
public class PlainPermissionManagerTest {
PlainPermissionManager plainPermissionManager;
--
2.32.0.windows.2
From 8741ff8c9b3bdbfc97976285affa7ea35c81243c Mon Sep 17 00:00:00 2001
From: ShuangxiDing <dingshuangxi888@gmail.com>
Date: Thu, 10 Aug 2023 17:41:15 +0800
Subject: [PATCH 04/12] [ISSUE #7153] Add switch for MIXED message type (#7154)
Add a switch for MIXED message type when creating a Topic in the Broker.
---
.../broker/processor/AdminBrokerProcessor.java | 8 ++++++++
.../java/org/apache/rocketmq/common/BrokerConfig.java | 10 ++++++++++
2 files changed, 18 insertions(+)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index a6ce03dc2..bbddcec2d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -59,6 +59,7 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UnlockCallback;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.AttributeParser;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.constant.FIleReadaheadMode;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -439,6 +440,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
String attributesModification = requestHeader.getAttributes();
topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification));
+ if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
+ && !brokerController.getBrokerConfig().isEnableMixedMessageType()) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("MIXED message type is not supported.");
+ return response;
+ }
+
try {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index a815636b1..99a5db5ad 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -393,6 +393,8 @@ public class BrokerConfig extends BrokerIdentity {
*/
private boolean enableSingleTopicRegister = false;
+ private boolean enableMixedMessageType = false;
+
public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
@@ -1712,4 +1714,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setEnableSingleTopicRegister(boolean enableSingleTopicRegister) {
this.enableSingleTopicRegister = enableSingleTopicRegister;
}
+
+ public boolean isEnableMixedMessageType() {
+ return enableMixedMessageType;
+ }
+
+ public void setEnableMixedMessageType(boolean enableMixedMessageType) {
+ this.enableMixedMessageType = enableMixedMessageType;
+ }
}
--
2.32.0.windows.2
From f534501855f8edbcb58f5b856973bf1027b5cf3a Mon Sep 17 00:00:00 2001
From: Steven <shirenchuang@users.noreply.github.com>
Date: Fri, 11 Aug 2023 10:25:48 +0800
Subject: [PATCH 05/12] [Feature 7155] add errlog when cmd err (#7157)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Co-authored-by: 十真 <shirenchuang.src@cainiao.com>
---
.../src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java | 1 +
1 file changed, 1 insertion(+)
diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
index b00bad3c5..5a8a7cd54 100644
--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
+++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
@@ -52,6 +52,7 @@ public class ServerUtil {
System.exit(0);
}
} catch (ParseException e) {
+ System.err.println(e.getMessage());
hf.printHelp(appName, options, true);
System.exit(1);
}
--
2.32.0.windows.2
From db58f00c0fe0f129611d654291f2177de55dc9ff Mon Sep 17 00:00:00 2001
From: Zhouxiang Zhan <zhouxzhan@apache.org>
Date: Fri, 11 Aug 2023 19:18:30 +0800
Subject: [PATCH 06/12] [ISSUE #7169] Change metadataThreadPoolQueueCapacity to
100000 (#7170)
---
.../main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 4f57a7052..39caaa0d9 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -165,7 +165,7 @@ public class ProxyConfig implements ConfigFile {
private int subscriptionGroupConfigCacheExpiredInSeconds = 20;
private int subscriptionGroupConfigCacheMaxNum = 20000;
private int metadataThreadPoolNums = 3;
- private int metadataThreadPoolQueueCapacity = 1000;
+ private int metadataThreadPoolQueueCapacity = 100000;
private int transactionHeartbeatThreadPoolNums = 20;
private int transactionHeartbeatThreadPoolQueueCapacity = 200;
--
2.32.0.windows.2
From 1f04e68a2e331ab035b791280c5a91b60fe0c85f Mon Sep 17 00:00:00 2001
From: yx9o <yangx_soft@163.com>
Date: Sat, 12 Aug 2023 21:12:22 +0800
Subject: [PATCH 07/12] [ISSUE #7172] Unified Chinese for Name Server (#7173)
---
docs/cn/concept.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/cn/concept.md b/docs/cn/concept.md
index cb2c863bd..3d67e9371 100644
--- a/docs/cn/concept.md
+++ b/docs/cn/concept.md
@@ -17,7 +17,7 @@ RocketMQ主要由 Producer、Broker、Consumer 三部分组成其中Producer
消息中转角色负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据包括消费者组、消费进度偏移和主题和队列消息等。
## 6 名字服务Name Server
- 名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群但相互独立没有信息交换。
+名字服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群但相互独立没有信息交换。
## 7 拉取式消费Pull Consumer
Consumer消费的一种类型应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息应用就会启动消费过程。
--
2.32.0.windows.2
From 25005060bbace477eeaaf4c0142cece5213efbbf Mon Sep 17 00:00:00 2001
From: yx9o <yangx_soft@163.com>
Date: Sun, 13 Aug 2023 20:52:17 +0800
Subject: [PATCH 08/12] [ISSUE #7176] Correct mismatched logs (#7177)
---
.../org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 0055a1cc8..f7a95f0a6 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -522,7 +522,7 @@ public class RouteInfoManager {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
- log.error("wipeWritePermOfBrokerByLock Exception", e);
+ log.error("addWritePermOfBrokerByLock Exception", e);
}
return 0;
}
--
2.32.0.windows.2
From ac411daa27117e9115a8fc5e2d5753085f009ed9 Mon Sep 17 00:00:00 2001
From: yx9o <yangx_soft@163.com>
Date: Tue, 15 Aug 2023 08:31:00 +0800
Subject: [PATCH 09/12] [ISSUE #7183] Correct mismatched commandDesc (#7184)
---
.../tools/command/topic/RemappingStaticTopicSubCommand.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 849f680d0..2a08fdb5b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -47,7 +47,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Update or create static topic, which has fixed number of queues";
+ return "Remapping static topic.";
}
@Override
--
2.32.0.windows.2
From 55e0cdb2af3ab75a6d892f919d60797f17a99fda Mon Sep 17 00:00:00 2001
From: redlsz <szliu0927@gmail.com>
Date: Tue, 15 Aug 2023 19:19:45 +0800
Subject: [PATCH 10/12] fix: IndexOutOfBoundsException when process pop
response (#7003)
---
.../org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 5 ++++-
.../rocketmq/proxy/service/message/LocalMessageService.java | 5 ++++-
.../rocketmq/remoting/protocol/header/ExtraInfoUtil.java | 4 ++++
3 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 708a6acd1..5101ffc8e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1174,7 +1174,10 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
continue;
}
- key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
+ // Value of POP_CK is used to determine whether it is a pop retry,
+ // cause topic could be rewritten by broker.
+ key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
+ messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getQueueId());
if (!sortMap.containsKey(key)) {
sortMap.put(key, new ArrayList<>(4));
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index 115c140ff..eb2c4d9ee 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -249,7 +249,10 @@ public class LocalMessageService implements MessageService {
// <topicMark@queueId, msg queueOffset>
Map<String, List<Long>> sortMap = new HashMap<>(16);
for (MessageExt messageExt : messageExtList) {
- String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
+ // Value of POP_CK is used to determine whether it is a pop retry,
+ // cause topic could be rewritten by broker.
+ String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
+ messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getQueueId());
if (!sortMap.containsKey(key)) {
sortMap.put(key, new ArrayList<>(4));
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
index 9a5fa89ab..13094331e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
@@ -282,6 +282,10 @@ public class ExtraInfoUtil {
return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key;
}
+ public static String getStartOffsetInfoMapKey(String topic, String popCk, long key) {
+ return ((topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || popCk != null) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key;
+ }
+
public static String getQueueOffsetKeyValueKey(long queueId, long queueOffset) {
return QUEUE_OFFSET + queueId + "%" + queueOffset;
}
--
2.32.0.windows.2
From a9c0b43f7f6ce5acfc4f2f3069553071fa93dfee Mon Sep 17 00:00:00 2001
From: yx9o <yangx_soft@163.com>
Date: Wed, 16 Aug 2023 18:45:00 +0800
Subject: [PATCH 11/12] [ISSUE #7192] Correct typos (#7193)
---
.../tools/command/consumer/ConsumerProgressSubCommand.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
index f51a24673..97125b854 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -54,7 +54,7 @@ public class ConsumerProgressSubCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Query consumers's progress, speed";
+ return "Query consumer's progress, speed.";
}
@Override
--
2.32.0.windows.2
From 5a3de926b816db5a121c1d788430072a3bc942ae Mon Sep 17 00:00:00 2001
From: Zhouxiang Zhan <zhouxzhan@apache.org>
Date: Wed, 16 Aug 2023 20:52:53 +0800
Subject: [PATCH 12/12] Optimize updateSubscription check exist loop (#7190)
---
.../broker/client/ConsumerGroupInfo.java | 17 ++++++-----------
1 file changed, 6 insertions(+), 11 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
index 867b9c720..1ea58c125 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
@@ -172,7 +173,7 @@ public class ConsumerGroupInfo {
*/
public boolean updateSubscription(final Set<SubscriptionData> subList) {
boolean updated = false;
-
+ Set<String> topicSet = new HashSet<>();
for (SubscriptionData sub : subList) {
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
if (old == null) {
@@ -194,22 +195,16 @@ public class ConsumerGroupInfo {
this.subscriptionTable.put(sub.getTopic(), sub);
}
+ // Add all new topics to the HashSet
+ topicSet.add(sub.getTopic());
}
Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, SubscriptionData> next = it.next();
String oldTopic = next.getKey();
-
- boolean exist = false;
- for (SubscriptionData sub : subList) {
- if (sub.getTopic().equals(oldTopic)) {
- exist = true;
- break;
- }
- }
-
- if (!exist) {
+ // Check HashSet with O(1) time complexity
+ if (!topicSet.contains(oldTopic)) {
log.warn("subscription changed, group: {} remove topic {} {}",
this.groupName,
oldTopic,
--
2.32.0.windows.2