1287 lines
61 KiB
Diff
1287 lines
61 KiB
Diff
From b2deef179dbc6a9eb1a2b6dd7b652d95cb768295 Mon Sep 17 00:00:00 2001
|
||
From: lizhimins <707364882@qq.com>
|
||
Date: Thu, 10 Aug 2023 10:38:47 +0800
|
||
Subject: [PATCH 01/12] [ISSUE #7144] Accelerate the recovery speed of the
|
||
tiered storage module (#7145)
|
||
|
||
---
|
||
.../tieredstore/TieredDispatcher.java | 3 +
|
||
.../tieredstore/TieredMessageStore.java | 2 +-
|
||
.../common/TieredStoreExecutor.java | 25 ++--
|
||
.../tieredstore/file/CompositeFlatFile.java | 15 +-
|
||
.../file/CompositeQueueFlatFile.java | 20 ++-
|
||
.../tieredstore/file/TieredCommitLog.java | 24 +++-
|
||
.../tieredstore/file/TieredFlatFile.java | 42 +++---
|
||
.../file/TieredFlatFileManager.java | 135 ++++++++++--------
|
||
.../metadata/FileSegmentMetadata.java | 26 +++-
|
||
.../tieredstore/TieredDispatcherTest.java | 15 +-
|
||
.../tieredstore/TieredMessageFetcherTest.java | 2 +-
|
||
.../file/CompositeQueueFlatFileTest.java | 2 +-
|
||
.../file/TieredFlatFileManagerTest.java | 7 +-
|
||
13 files changed, 194 insertions(+), 124 deletions(-)
|
||
|
||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
||
index bb58ea7dd..1746190cd 100644
|
||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
||
@@ -279,6 +279,9 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
|
||
long upperBound = Math.min(dispatchOffset + maxCount, maxOffsetInQueue);
|
||
ConsumeQueue consumeQueue = (ConsumeQueue) defaultStore.getConsumeQueue(topic, queueId);
|
||
|
||
+ logger.debug("DispatchFlatFile race, topic={}, queueId={}, cq range={}-{}, dispatch offset={}-{}",
|
||
+ topic, queueId, minOffsetInQueue, maxOffsetInQueue, dispatchOffset, upperBound - 1);
|
||
+
|
||
for (; dispatchOffset < upperBound; dispatchOffset++) {
|
||
// get consume queue
|
||
SelectMappedBufferResult cqItem = consumeQueue.getIndexBuffer(dispatchOffset);
|
||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
||
index 1f12410f2..ced1fb818 100644
|
||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
||
@@ -147,7 +147,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
||
int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) {
|
||
|
||
if (!viaTieredStorage(topic, queueId, offset, maxMsgNums)) {
|
||
- logger.debug("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset);
|
||
+ logger.trace("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset);
|
||
return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter);
|
||
}
|
||
|
||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
|
||
index 6eb3478b3..6dd0e8846 100644
|
||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
|
||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
|
||
@@ -43,18 +43,9 @@ public class TieredStoreExecutor {
|
||
public static ExecutorService compactIndexFileExecutor;
|
||
|
||
public static void init() {
|
||
- dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
|
||
- dispatchExecutor = new ThreadPoolExecutor(
|
||
- Math.max(2, Runtime.getRuntime().availableProcessors()),
|
||
- Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
|
||
- 1000 * 60,
|
||
- TimeUnit.MILLISECONDS,
|
||
- dispatchThreadPoolQueue,
|
||
- new ThreadFactoryImpl("TieredCommonExecutor_"));
|
||
-
|
||
commonScheduledExecutor = new ScheduledThreadPoolExecutor(
|
||
Math.max(4, Runtime.getRuntime().availableProcessors()),
|
||
- new ThreadFactoryImpl("TieredCommonScheduledExecutor_"));
|
||
+ new ThreadFactoryImpl("TieredCommonExecutor_"));
|
||
|
||
commitExecutor = new ScheduledThreadPoolExecutor(
|
||
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
|
||
@@ -62,7 +53,17 @@ public class TieredStoreExecutor {
|
||
|
||
cleanExpiredFileExecutor = new ScheduledThreadPoolExecutor(
|
||
Math.max(4, Runtime.getRuntime().availableProcessors()),
|
||
- new ThreadFactoryImpl("TieredCleanExpiredFileExecutor_"));
|
||
+ new ThreadFactoryImpl("TieredCleanFileExecutor_"));
|
||
+
|
||
+ dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
|
||
+ dispatchExecutor = new ThreadPoolExecutor(
|
||
+ Math.max(2, Runtime.getRuntime().availableProcessors()),
|
||
+ Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
|
||
+ 1000 * 60,
|
||
+ TimeUnit.MILLISECONDS,
|
||
+ dispatchThreadPoolQueue,
|
||
+ new ThreadFactoryImpl("TieredDispatchExecutor_"),
|
||
+ new ThreadPoolExecutor.DiscardOldestPolicy());
|
||
|
||
fetchDataThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
|
||
fetchDataExecutor = new ThreadPoolExecutor(
|
||
@@ -71,7 +72,7 @@ public class TieredStoreExecutor {
|
||
1000 * 60,
|
||
TimeUnit.MILLISECONDS,
|
||
fetchDataThreadPoolQueue,
|
||
- new ThreadFactoryImpl("TieredFetchDataExecutor_"));
|
||
+ new ThreadFactoryImpl("TieredFetchExecutor_"));
|
||
|
||
compactIndexFileThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
|
||
compactIndexFileExecutor = new ThreadPoolExecutor(
|
||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
|
||
index df4baf33f..5ad3a6ff3 100644
|
||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
|
||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
|
||
@@ -76,20 +76,15 @@ public class CompositeFlatFile implements CompositeAccess {
|
||
this.storeConfig = fileQueueFactory.getStoreConfig();
|
||
this.readAheadFactor = this.storeConfig.getReadAheadMinFactor();
|
||
this.metadataStore = TieredStoreUtil.getMetadataStore(this.storeConfig);
|
||
- this.dispatchOffset = new AtomicLong();
|
||
this.compositeFlatFileLock = new ReentrantLock();
|
||
this.inFlightRequestMap = new ConcurrentHashMap<>();
|
||
this.commitLog = new TieredCommitLog(fileQueueFactory, filePath);
|
||
this.consumeQueue = new TieredConsumeQueue(fileQueueFactory, filePath);
|
||
+ this.dispatchOffset = new AtomicLong(
|
||
+ this.consumeQueue.isInitialized() ? this.getConsumeQueueCommitOffset() : -1L);
|
||
this.groupOffsetCache = this.initOffsetCache();
|
||
}
|
||
|
||
- protected void recoverMetadata() {
|
||
- if (!consumeQueue.isInitialized() && this.dispatchOffset.get() != -1) {
|
||
- consumeQueue.setBaseOffset(this.dispatchOffset.get() * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
|
||
- }
|
||
- }
|
||
-
|
||
private Cache<String, Long> initOffsetCache() {
|
||
return Caffeine.newBuilder()
|
||
.expireAfterWrite(2, TimeUnit.MINUTES)
|
||
@@ -310,10 +305,12 @@ public class CompositeFlatFile implements CompositeAccess {
|
||
|
||
@Override
|
||
public void initOffset(long offset) {
|
||
- if (!consumeQueue.isInitialized()) {
|
||
+ if (consumeQueue.isInitialized()) {
|
||
+ dispatchOffset.set(this.getConsumeQueueCommitOffset());
|
||
+ } else {
|
||
consumeQueue.setBaseOffset(offset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
|
||
+ dispatchOffset.set(offset);
|
||
}
|
||
- dispatchOffset.set(offset);
|
||
}
|
||
|
||
@Override
|
||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
|
||
index f6c0afed0..0a797f465 100644
|
||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
|
||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
|
||
@@ -36,8 +36,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
|
||
public CompositeQueueFlatFile(TieredFileAllocator fileQueueFactory, MessageQueue messageQueue) {
|
||
super(fileQueueFactory, TieredStoreUtil.toPath(messageQueue));
|
||
this.messageQueue = messageQueue;
|
||
- this.recoverTopicMetadata();
|
||
- super.recoverMetadata();
|
||
+ this.recoverQueueMetadata();
|
||
this.indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
|
||
}
|
||
|
||
@@ -46,11 +45,12 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
|
||
if (!consumeQueue.isInitialized()) {
|
||
queueMetadata.setMinOffset(offset);
|
||
queueMetadata.setMaxOffset(offset);
|
||
+ metadataStore.updateQueue(queueMetadata);
|
||
}
|
||
super.initOffset(offset);
|
||
}
|
||
|
||
- public void recoverTopicMetadata() {
|
||
+ public void recoverQueueMetadata() {
|
||
TopicMetadata topicMetadata = this.metadataStore.getTopic(messageQueue.getTopic());
|
||
if (topicMetadata == null) {
|
||
topicMetadata = this.metadataStore.addTopic(messageQueue.getTopic(), -1L);
|
||
@@ -64,18 +64,16 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
|
||
if (queueMetadata.getMaxOffset() < queueMetadata.getMinOffset()) {
|
||
queueMetadata.setMaxOffset(queueMetadata.getMinOffset());
|
||
}
|
||
- this.dispatchOffset.set(queueMetadata.getMaxOffset());
|
||
}
|
||
|
||
- public void persistMetadata() {
|
||
+ public void flushMetadata() {
|
||
try {
|
||
- if (consumeQueue.getCommitOffset() < queueMetadata.getMinOffset()) {
|
||
- return;
|
||
- }
|
||
- queueMetadata.setMaxOffset(consumeQueue.getCommitOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
|
||
+ queueMetadata.setMinOffset(super.getConsumeQueueMinOffset());
|
||
+ queueMetadata.setMaxOffset(super.getConsumeQueueMaxOffset());
|
||
metadataStore.updateQueue(queueMetadata);
|
||
} catch (Exception e) {
|
||
- LOGGER.error("CompositeFlatFile#flushMetadata: update queue metadata failed: topic: {}, queue: {}", messageQueue.getTopic(), messageQueue.getQueueId(), e);
|
||
+ LOGGER.error("CompositeFlatFile#flushMetadata error, topic: {}, queue: {}",
|
||
+ messageQueue.getTopic(), messageQueue.getQueueId(), e);
|
||
}
|
||
}
|
||
|
||
@@ -114,7 +112,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
|
||
@Override
|
||
public void shutdown() {
|
||
super.shutdown();
|
||
- metadataStore.updateQueue(queueMetadata);
|
||
+ this.flushMetadata();
|
||
}
|
||
|
||
@Override
|
||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
|
||
index 80e1bce50..0e5f79132 100644
|
||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
|
||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
|
||
@@ -50,7 +50,7 @@ public class TieredCommitLog {
|
||
this.storeConfig = fileQueueFactory.getStoreConfig();
|
||
this.flatFile = fileQueueFactory.createFlatFileForCommitLog(filePath);
|
||
this.minConsumeQueueOffset = new AtomicLong(NOT_EXIST_MIN_OFFSET);
|
||
- this.correctMinOffset();
|
||
+ this.correctMinOffsetAsync();
|
||
}
|
||
|
||
@VisibleForTesting
|
||
@@ -91,17 +91,26 @@ public class TieredCommitLog {
|
||
return flatFile.getFileToWrite().getMaxTimestamp();
|
||
}
|
||
|
||
- public synchronized long correctMinOffset() {
|
||
+ public long correctMinOffset() {
|
||
+ try {
|
||
+ return correctMinOffsetAsync().get();
|
||
+ } catch (Exception e) {
|
||
+ log.error("Correct min offset failed in clean expired file", e);
|
||
+ }
|
||
+ return NOT_EXIST_MIN_OFFSET;
|
||
+ }
|
||
+
|
||
+ public synchronized CompletableFuture<Long> correctMinOffsetAsync() {
|
||
if (flatFile.getFileSegmentCount() == 0) {
|
||
this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
|
||
- return NOT_EXIST_MIN_OFFSET;
|
||
+ return CompletableFuture.completedFuture(NOT_EXIST_MIN_OFFSET);
|
||
}
|
||
|
||
// queue offset field length is 8
|
||
int length = MessageBufferUtil.QUEUE_OFFSET_POSITION + 8;
|
||
if (flatFile.getCommitOffset() - flatFile.getMinOffset() < length) {
|
||
this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
|
||
- return NOT_EXIST_MIN_OFFSET;
|
||
+ return CompletableFuture.completedFuture(NOT_EXIST_MIN_OFFSET);
|
||
}
|
||
|
||
try {
|
||
@@ -109,7 +118,8 @@ public class TieredCommitLog {
|
||
.thenApply(buffer -> {
|
||
long offset = MessageBufferUtil.getQueueOffset(buffer);
|
||
minConsumeQueueOffset.set(offset);
|
||
- log.info("Correct commitlog min cq offset success, filePath={}, min cq offset={}, range={}-{}",
|
||
+ log.debug("Correct commitlog min cq offset success, " +
|
||
+ "filePath={}, min cq offset={}, commitlog range={}-{}",
|
||
flatFile.getFilePath(), offset, flatFile.getMinOffset(), flatFile.getCommitOffset());
|
||
return offset;
|
||
})
|
||
@@ -117,11 +127,11 @@ public class TieredCommitLog {
|
||
log.warn("Correct commitlog min cq offset error, filePath={}, range={}-{}",
|
||
flatFile.getFilePath(), flatFile.getMinOffset(), flatFile.getCommitOffset(), throwable);
|
||
return minConsumeQueueOffset.get();
|
||
- }).get();
|
||
+ });
|
||
} catch (Exception e) {
|
||
log.error("Correct commitlog min cq offset error, filePath={}", flatFile.getFilePath(), e);
|
||
}
|
||
- return minConsumeQueueOffset.get();
|
||
+ return CompletableFuture.completedFuture(minConsumeQueueOffset.get());
|
||
}
|
||
|
||
public AppendResult append(ByteBuffer byteBuf) {
|
||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
||
index 75ce8d89f..426c4e09d 100644
|
||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
||
@@ -16,6 +16,7 @@
|
||
*/
|
||
package org.apache.rocketmq.tieredstore.file;
|
||
|
||
+import com.alibaba.fastjson.JSON;
|
||
import com.google.common.annotations.VisibleForTesting;
|
||
import java.nio.ByteBuffer;
|
||
import java.util.ArrayList;
|
||
@@ -24,6 +25,7 @@ import java.util.Comparator;
|
||
import java.util.HashSet;
|
||
import java.util.LinkedList;
|
||
import java.util.List;
|
||
+import java.util.Objects;
|
||
import java.util.Set;
|
||
import java.util.concurrent.CompletableFuture;
|
||
import java.util.concurrent.CopyOnWriteArrayList;
|
||
@@ -178,32 +180,26 @@ public class TieredFlatFile {
|
||
private FileSegmentMetadata getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) {
|
||
|
||
FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
|
||
- fileSegment.getPath(), fileSegment.getFileType(), fileSegment.getBaseOffset());
|
||
-
|
||
- if (metadata != null) {
|
||
- return metadata;
|
||
- }
|
||
+ this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset());
|
||
|
||
// Note: file segment path may not the same as file base path, use base path here.
|
||
- metadata = new FileSegmentMetadata(
|
||
- this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType());
|
||
-
|
||
- if (fileSegment.isClosed()) {
|
||
- metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
|
||
+ if (metadata == null) {
|
||
+ metadata = new FileSegmentMetadata(
|
||
+ this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType());
|
||
+ metadata.setCreateTimestamp(fileSegment.getMinTimestamp());
|
||
+ metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
|
||
+ metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
|
||
+ if (fileSegment.isClosed()) {
|
||
+ metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
|
||
+ }
|
||
+ this.tieredMetadataStore.updateFileSegment(metadata);
|
||
}
|
||
-
|
||
- metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
|
||
- metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
|
||
-
|
||
- // Submit to persist
|
||
- this.tieredMetadataStore.updateFileSegment(metadata);
|
||
return metadata;
|
||
}
|
||
|
||
/**
|
||
* FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full
|
||
*/
|
||
- @VisibleForTesting
|
||
public void updateFileSegment(TieredFileSegment fileSegment) {
|
||
FileSegmentMetadata segmentMetadata = getOrCreateFileSegmentMetadata(fileSegment);
|
||
|
||
@@ -219,9 +215,16 @@ public class TieredFlatFile {
|
||
}
|
||
|
||
segmentMetadata.setSize(fileSegment.getCommitPosition());
|
||
- segmentMetadata.setBeginTimestamp(fileSegment.getMinTimestamp());
|
||
segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp());
|
||
- this.tieredMetadataStore.updateFileSegment(segmentMetadata);
|
||
+
|
||
+ FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
|
||
+ this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset());
|
||
+
|
||
+ if (!Objects.equals(metadata, segmentMetadata)) {
|
||
+ this.tieredMetadataStore.updateFileSegment(segmentMetadata);
|
||
+ logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {}, content: {}",
|
||
+ segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata));
|
||
+ }
|
||
}
|
||
|
||
private void checkAndFixFileSize() {
|
||
@@ -257,6 +260,7 @@ public class TieredFlatFile {
|
||
logger.warn("TieredFlatFile#checkAndFixFileSize: fix last file {} size: origin: {}, actual: {}",
|
||
lastFile.getPath(), lastFile.getCommitOffset() - lastFile.getBaseOffset(), lastFileSize);
|
||
lastFile.initPosition(lastFileSize);
|
||
+ this.updateFileSegment(lastFile);
|
||
}
|
||
}
|
||
}
|
||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
|
||
index aeca44b8c..e9ae4a5a5 100644
|
||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
|
||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
|
||
@@ -16,16 +16,19 @@
|
||
*/
|
||
package org.apache.rocketmq.tieredstore.file;
|
||
|
||
+import com.google.common.base.Stopwatch;
|
||
import com.google.common.collect.ImmutableList;
|
||
import java.util.ArrayList;
|
||
import java.util.List;
|
||
import java.util.Random;
|
||
+import java.util.concurrent.CompletableFuture;
|
||
import java.util.concurrent.ConcurrentHashMap;
|
||
import java.util.concurrent.ConcurrentMap;
|
||
-import java.util.concurrent.Future;
|
||
+import java.util.concurrent.Semaphore;
|
||
import java.util.concurrent.TimeUnit;
|
||
import java.util.concurrent.atomic.AtomicLong;
|
||
import javax.annotation.Nullable;
|
||
+import org.apache.rocketmq.common.constant.LoggerName;
|
||
import org.apache.rocketmq.common.message.MessageQueue;
|
||
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||
@@ -36,6 +39,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
||
|
||
public class TieredFlatFileManager {
|
||
|
||
+ private static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
|
||
private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
|
||
|
||
private static volatile TieredFlatFileManager instance;
|
||
@@ -44,7 +48,7 @@ public class TieredFlatFileManager {
|
||
private final TieredMetadataStore metadataStore;
|
||
private final TieredMessageStoreConfig storeConfig;
|
||
private final TieredFileAllocator tieredFileAllocator;
|
||
- private final ConcurrentMap<MessageQueue, CompositeQueueFlatFile> queueFlatFileMap;
|
||
+ private final ConcurrentMap<MessageQueue, CompositeQueueFlatFile> flatFileConcurrentMap;
|
||
|
||
public TieredFlatFileManager(TieredMessageStoreConfig storeConfig)
|
||
throws ClassNotFoundException, NoSuchMethodException {
|
||
@@ -52,23 +56,20 @@ public class TieredFlatFileManager {
|
||
this.storeConfig = storeConfig;
|
||
this.metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
|
||
this.tieredFileAllocator = new TieredFileAllocator(storeConfig);
|
||
- this.queueFlatFileMap = new ConcurrentHashMap<>();
|
||
+ this.flatFileConcurrentMap = new ConcurrentHashMap<>();
|
||
this.doScheduleTask();
|
||
}
|
||
|
||
public static TieredFlatFileManager getInstance(TieredMessageStoreConfig storeConfig) {
|
||
- if (storeConfig == null) {
|
||
+ if (storeConfig == null || instance != null) {
|
||
return instance;
|
||
}
|
||
-
|
||
- if (instance == null) {
|
||
- synchronized (TieredFlatFileManager.class) {
|
||
- if (instance == null) {
|
||
- try {
|
||
- instance = new TieredFlatFileManager(storeConfig);
|
||
- } catch (Exception e) {
|
||
- logger.error("TieredFlatFileManager#getInstance: create flat file manager failed", e);
|
||
- }
|
||
+ synchronized (TieredFlatFileManager.class) {
|
||
+ if (instance == null) {
|
||
+ try {
|
||
+ instance = new TieredFlatFileManager(storeConfig);
|
||
+ } catch (Exception e) {
|
||
+ logger.error("Construct FlatFileManager instance error", e);
|
||
}
|
||
}
|
||
}
|
||
@@ -88,7 +89,7 @@ public class TieredFlatFileManager {
|
||
TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, storeConfig.getBrokerName(), 0));
|
||
indexFile = new TieredIndexFile(new TieredFileAllocator(storeConfig), filePath);
|
||
} catch (Exception e) {
|
||
- logger.error("TieredFlatFileManager#getIndexFile: create index file failed", e);
|
||
+ logger.error("Construct FlatFileManager indexFile error", e);
|
||
}
|
||
}
|
||
}
|
||
@@ -105,7 +106,7 @@ public class TieredFlatFileManager {
|
||
flatFile.commitCommitLog();
|
||
} catch (Throwable e) {
|
||
MessageQueue mq = flatFile.getMessageQueue();
|
||
- logger.error("commit commitLog periodically failed: topic: {}, queue: {}",
|
||
+ logger.error("Commit commitLog periodically failed: topic: {}, queue: {}",
|
||
mq.getTopic(), mq.getQueueId(), e);
|
||
}
|
||
}, delay, TimeUnit.MILLISECONDS);
|
||
@@ -114,7 +115,7 @@ public class TieredFlatFileManager {
|
||
flatFile.commitConsumeQueue();
|
||
} catch (Throwable e) {
|
||
MessageQueue mq = flatFile.getMessageQueue();
|
||
- logger.error("commit consumeQueue periodically failed: topic: {}, queue: {}",
|
||
+ logger.error("Commit consumeQueue periodically failed: topic: {}, queue: {}",
|
||
mq.getTopic(), mq.getQueueId(), e);
|
||
}
|
||
}, delay, TimeUnit.MILLISECONDS);
|
||
@@ -125,7 +126,7 @@ public class TieredFlatFileManager {
|
||
indexFile.commit(true);
|
||
}
|
||
} catch (Throwable e) {
|
||
- logger.error("commit indexFile periodically failed", e);
|
||
+ logger.error("Commit indexFile periodically failed", e);
|
||
}
|
||
}, 0, TimeUnit.MILLISECONDS);
|
||
}
|
||
@@ -160,7 +161,7 @@ public class TieredFlatFileManager {
|
||
try {
|
||
doCommit();
|
||
} catch (Throwable e) {
|
||
- logger.error("commit flat file periodically failed: ", e);
|
||
+ logger.error("Commit flat file periodically failed: ", e);
|
||
}
|
||
}, 60, 60, TimeUnit.SECONDS);
|
||
|
||
@@ -168,49 +169,73 @@ public class TieredFlatFileManager {
|
||
try {
|
||
doCleanExpiredFile();
|
||
} catch (Throwable e) {
|
||
- logger.error("clean expired flat file failed: ", e);
|
||
+ logger.error("Clean expired flat file failed: ", e);
|
||
}
|
||
}, 30, 30, TimeUnit.SECONDS);
|
||
}
|
||
|
||
public boolean load() {
|
||
+ Stopwatch stopwatch = Stopwatch.createStarted();
|
||
try {
|
||
- AtomicLong topicSequenceNumber = new AtomicLong();
|
||
- List<Future<?>> futureList = new ArrayList<>();
|
||
- queueFlatFileMap.clear();
|
||
- metadataStore.iterateTopic(topicMetadata -> {
|
||
+ flatFileConcurrentMap.clear();
|
||
+ this.recoverSequenceNumber();
|
||
+ this.recoverTieredFlatFile();
|
||
+ logger.info("Message store recover end, total cost={}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
|
||
+ } catch (Exception e) {
|
||
+ long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
|
||
+ logger.info("Message store recover error, total cost={}ms", costTime);
|
||
+ BROKER_LOG.error("Message store recover error, total cost={}ms", costTime, e);
|
||
+ return false;
|
||
+ }
|
||
+ return true;
|
||
+ }
|
||
+
|
||
+ public void recoverSequenceNumber() {
|
||
+ AtomicLong topicSequenceNumber = new AtomicLong();
|
||
+ metadataStore.iterateTopic(topicMetadata -> {
|
||
+ if (topicMetadata != null && topicMetadata.getTopicId() > 0) {
|
||
topicSequenceNumber.set(Math.max(topicSequenceNumber.get(), topicMetadata.getTopicId()));
|
||
- Future<?> future = TieredStoreExecutor.dispatchExecutor.submit(() -> {
|
||
- if (topicMetadata.getStatus() != 0) {
|
||
- return;
|
||
- }
|
||
+ }
|
||
+ });
|
||
+ metadataStore.setTopicSequenceNumber(topicSequenceNumber.incrementAndGet());
|
||
+ }
|
||
+
|
||
+ public void recoverTieredFlatFile() {
|
||
+ Semaphore semaphore = new Semaphore((int) (TieredStoreExecutor.QUEUE_CAPACITY * 0.75));
|
||
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
|
||
+ metadataStore.iterateTopic(topicMetadata -> {
|
||
+ try {
|
||
+ semaphore.acquire();
|
||
+ CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
||
try {
|
||
- metadataStore.iterateQueue(topicMetadata.getTopic(),
|
||
- queueMetadata -> getOrCreateFlatFileIfAbsent(
|
||
- new MessageQueue(topicMetadata.getTopic(),
|
||
- storeConfig.getBrokerName(),
|
||
- queueMetadata.getQueue().getQueueId())));
|
||
+ Stopwatch subWatch = Stopwatch.createStarted();
|
||
+ if (topicMetadata.getStatus() != 0) {
|
||
+ return;
|
||
+ }
|
||
+ AtomicLong queueCount = new AtomicLong();
|
||
+ metadataStore.iterateQueue(topicMetadata.getTopic(), queueMetadata -> {
|
||
+ this.getOrCreateFlatFileIfAbsent(new MessageQueue(topicMetadata.getTopic(),
|
||
+ storeConfig.getBrokerName(), queueMetadata.getQueue().getQueueId()));
|
||
+ queueCount.incrementAndGet();
|
||
+ });
|
||
+ logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms",
|
||
+ topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS));
|
||
} catch (Exception e) {
|
||
- logger.error("load mq composite flat file from metadata failed", e);
|
||
+ logger.error("Recover TopicFlatFile error, topic: {}", topicMetadata.getTopic(), e);
|
||
+ } finally {
|
||
+ semaphore.release();
|
||
}
|
||
- });
|
||
- futureList.add(future);
|
||
- });
|
||
-
|
||
- // Wait for load all metadata done
|
||
- for (Future<?> future : futureList) {
|
||
- future.get();
|
||
+ }, TieredStoreExecutor.commitExecutor);
|
||
+ futures.add(future);
|
||
+ } catch (Exception e) {
|
||
+ throw new RuntimeException(e);
|
||
}
|
||
- metadataStore.setTopicSequenceNumber(topicSequenceNumber.incrementAndGet());
|
||
- } catch (Exception e) {
|
||
- logger.error("load mq composite flat file from metadata failed", e);
|
||
- return false;
|
||
- }
|
||
- return true;
|
||
+ });
|
||
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||
}
|
||
|
||
public void cleanup() {
|
||
- queueFlatFileMap.clear();
|
||
+ flatFileConcurrentMap.clear();
|
||
cleanStaticReference();
|
||
}
|
||
|
||
@@ -221,27 +246,25 @@ public class TieredFlatFileManager {
|
||
|
||
@Nullable
|
||
public CompositeQueueFlatFile getOrCreateFlatFileIfAbsent(MessageQueue messageQueue) {
|
||
- return queueFlatFileMap.computeIfAbsent(messageQueue, mq -> {
|
||
+ return flatFileConcurrentMap.computeIfAbsent(messageQueue, mq -> {
|
||
try {
|
||
- logger.debug("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
|
||
- "try to create new flat file: topic: {}, queueId: {}",
|
||
+ logger.debug("Create new TopicFlatFile, topic: {}, queueId: {}",
|
||
messageQueue.getTopic(), messageQueue.getQueueId());
|
||
return new CompositeQueueFlatFile(tieredFileAllocator, mq);
|
||
} catch (Exception e) {
|
||
- logger.error("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
|
||
- "create new flat file: topic: {}, queueId: {}",
|
||
+ logger.debug("Create new TopicFlatFile failed, topic: {}, queueId: {}",
|
||
messageQueue.getTopic(), messageQueue.getQueueId(), e);
|
||
- return null;
|
||
}
|
||
+ return null;
|
||
});
|
||
}
|
||
|
||
public CompositeQueueFlatFile getFlatFile(MessageQueue messageQueue) {
|
||
- return queueFlatFileMap.get(messageQueue);
|
||
+ return flatFileConcurrentMap.get(messageQueue);
|
||
}
|
||
|
||
public ImmutableList<CompositeQueueFlatFile> deepCopyFlatFileToList() {
|
||
- return ImmutableList.copyOf(queueFlatFileMap.values());
|
||
+ return ImmutableList.copyOf(flatFileConcurrentMap.values());
|
||
}
|
||
|
||
public void shutdown() {
|
||
@@ -270,7 +293,7 @@ public class TieredFlatFileManager {
|
||
}
|
||
|
||
// delete memory reference
|
||
- CompositeQueueFlatFile flatFile = queueFlatFileMap.remove(mq);
|
||
+ CompositeQueueFlatFile flatFile = flatFileConcurrentMap.remove(mq);
|
||
if (flatFile != null) {
|
||
MessageQueue messageQueue = flatFile.getMessageQueue();
|
||
logger.info("TieredFlatFileManager#destroyCompositeFile: " +
|
||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
|
||
index 1b232fc75..2f0fd71de 100644
|
||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
|
||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
|
||
@@ -16,6 +16,8 @@
|
||
*/
|
||
package org.apache.rocketmq.tieredstore.metadata;
|
||
|
||
+import java.util.Objects;
|
||
+
|
||
public class FileSegmentMetadata {
|
||
|
||
public static final int STATUS_NEW = 0;
|
||
@@ -43,7 +45,6 @@ public class FileSegmentMetadata {
|
||
this.baseOffset = baseOffset;
|
||
this.type = type;
|
||
this.status = STATUS_NEW;
|
||
- this.createTimestamp = System.currentTimeMillis();
|
||
}
|
||
|
||
public void markSealed() {
|
||
@@ -122,4 +123,27 @@ public class FileSegmentMetadata {
|
||
public void setSealTimestamp(long sealTimestamp) {
|
||
this.sealTimestamp = sealTimestamp;
|
||
}
|
||
+
|
||
+ @Override
|
||
+ public boolean equals(Object o) {
|
||
+ if (this == o)
|
||
+ return true;
|
||
+ if (o == null || getClass() != o.getClass())
|
||
+ return false;
|
||
+ FileSegmentMetadata metadata = (FileSegmentMetadata) o;
|
||
+ return size == metadata.size
|
||
+ && baseOffset == metadata.baseOffset
|
||
+ && status == metadata.status
|
||
+ && path.equals(metadata.path)
|
||
+ && type == metadata.type
|
||
+ && createTimestamp == metadata.createTimestamp
|
||
+ && beginTimestamp == metadata.beginTimestamp
|
||
+ && endTimestamp == metadata.endTimestamp
|
||
+ && sealTimestamp == metadata.sealTimestamp;
|
||
+ }
|
||
+
|
||
+ @Override
|
||
+ public int hashCode() {
|
||
+ return Objects.hash(type, path, baseOffset, status, size, createTimestamp, beginTimestamp, endTimestamp, sealTimestamp);
|
||
+ }
|
||
}
|
||
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
|
||
index e6adef1d1..5791dc9a4 100644
|
||
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
|
||
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
|
||
@@ -116,19 +116,20 @@ public class TieredDispatcherTest {
|
||
buffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 9);
|
||
flatFile.appendCommitLog(buffer3);
|
||
flatFile.commitCommitLog();
|
||
- Assert.assertEquals(10, flatFile.getDispatchOffset());
|
||
+ Assert.assertEquals(9 + 1, flatFile.getDispatchOffset());
|
||
+ Assert.assertEquals(9, flatFile.getCommitLogDispatchCommitOffset());
|
||
|
||
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 8, 8, 0, 0, buffer1);
|
||
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 9, 9, 0, 0, buffer2);
|
||
dispatcher.buildConsumeQueueAndIndexFile();
|
||
Assert.assertEquals(7, flatFile.getConsumeQueueMaxOffset());
|
||
- Assert.assertEquals(7, flatFile.getDispatchOffset());
|
||
|
||
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 7, 7, 0, 0, buffer1);
|
||
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 8, 8, 0, 0, buffer2);
|
||
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 9, 9, 0, 0, buffer3);
|
||
dispatcher.buildConsumeQueueAndIndexFile();
|
||
- Assert.assertEquals(10, flatFile.getConsumeQueueMaxOffset());
|
||
+ Assert.assertEquals(6, flatFile.getConsumeQueueMinOffset());
|
||
+ Assert.assertEquals(9 + 1, flatFile.getConsumeQueueMaxOffset());
|
||
}
|
||
|
||
@Test
|
||
@@ -142,6 +143,7 @@ public class TieredDispatcherTest {
|
||
Mockito.when(defaultStore.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId())).thenReturn(0L);
|
||
Mockito.when(defaultStore.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId())).thenReturn(9L);
|
||
|
||
+ // mock cq item, position = 7
|
||
ByteBuffer cqItem = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
|
||
cqItem.putLong(7);
|
||
cqItem.putInt(MessageBufferUtilTest.MSG_LEN);
|
||
@@ -150,13 +152,13 @@ public class TieredDispatcherTest {
|
||
SelectMappedBufferResult mockResult = new SelectMappedBufferResult(0, cqItem, ConsumeQueue.CQ_STORE_UNIT_SIZE, null);
|
||
Mockito.when(((ConsumeQueue) defaultStore.getConsumeQueue(mq.getTopic(), mq.getQueueId())).getIndexBuffer(6)).thenReturn(mockResult);
|
||
|
||
+ // mock cq item, position = 8
|
||
cqItem = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
|
||
cqItem.putLong(8);
|
||
cqItem.putInt(MessageBufferUtilTest.MSG_LEN);
|
||
cqItem.putLong(1);
|
||
cqItem.flip();
|
||
mockResult = new SelectMappedBufferResult(0, cqItem, ConsumeQueue.CQ_STORE_UNIT_SIZE, null);
|
||
-
|
||
Mockito.when(((ConsumeQueue) defaultStore.getConsumeQueue(mq.getTopic(), mq.getQueueId())).getIndexBuffer(7)).thenReturn(mockResult);
|
||
|
||
mockResult = new SelectMappedBufferResult(0, MessageBufferUtilTest.buildMockedMessageBuffer(), MessageBufferUtilTest.MSG_LEN, null);
|
||
@@ -167,7 +169,10 @@ public class TieredDispatcherTest {
|
||
mockResult = new SelectMappedBufferResult(0, msg, MessageBufferUtilTest.MSG_LEN, null);
|
||
Mockito.when(defaultStore.selectOneMessageByOffset(8, MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult);
|
||
|
||
- dispatcher.dispatchFlatFile(flatFileManager.getOrCreateFlatFileIfAbsent(mq));
|
||
+ CompositeQueueFlatFile flatFile = flatFileManager.getOrCreateFlatFileIfAbsent(mq);
|
||
+ Assert.assertNotNull(flatFile);
|
||
+ flatFile.initOffset(7);
|
||
+ dispatcher.dispatchFlatFile(flatFile);
|
||
Assert.assertEquals(8, flatFileManager.getFlatFile(mq).getDispatchOffset());
|
||
}
|
||
}
|
||
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
|
||
index d75b2f916..774c6cf64 100644
|
||
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
|
||
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
|
||
@@ -23,6 +23,7 @@ import java.util.Objects;
|
||
import java.util.concurrent.TimeUnit;
|
||
import org.apache.commons.lang3.SystemUtils;
|
||
import org.apache.commons.lang3.tuple.Triple;
|
||
+import org.apache.rocketmq.common.BoundaryType;
|
||
import org.apache.rocketmq.common.message.MessageQueue;
|
||
import org.apache.rocketmq.store.DispatchRequest;
|
||
import org.apache.rocketmq.store.GetMessageResult;
|
||
@@ -40,7 +41,6 @@ import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
|
||
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
|
||
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
|
||
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
||
-import org.apache.rocketmq.common.BoundaryType;
|
||
import org.awaitility.Awaitility;
|
||
import org.junit.After;
|
||
import org.junit.Assert;
|
||
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
|
||
index 27efe111e..2e028ada3 100644
|
||
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
|
||
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
|
||
@@ -119,7 +119,7 @@ public class CompositeQueueFlatFileTest {
|
||
Assert.assertEquals(AppendResult.SUCCESS, result);
|
||
|
||
file.commit(true);
|
||
- file.persistMetadata();
|
||
+ file.flushMetadata();
|
||
|
||
QueueMetadata queueMetadata = metadataStore.getQueue(mq);
|
||
Assert.assertEquals(53, queueMetadata.getMaxOffset());
|
||
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
|
||
index b7528c5e4..20fe4dd70 100644
|
||
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
|
||
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
|
||
@@ -72,10 +72,15 @@ public class TieredFlatFileManagerTest {
|
||
|
||
CompositeFlatFile flatFile = flatFileManager.getFlatFile(mq);
|
||
Assert.assertNotNull(flatFile);
|
||
- Assert.assertEquals(100, flatFile.getDispatchOffset());
|
||
+ Assert.assertEquals(-1L, flatFile.getDispatchOffset());
|
||
+ flatFile.initOffset(100L);
|
||
+ Assert.assertEquals(100L, flatFile.getDispatchOffset());
|
||
+ flatFile.initOffset(200L);
|
||
+ Assert.assertEquals(100L, flatFile.getDispatchOffset());
|
||
|
||
CompositeFlatFile flatFile1 = flatFileManager.getFlatFile(mq1);
|
||
Assert.assertNotNull(flatFile1);
|
||
+ flatFile1.initOffset(200L);
|
||
Assert.assertEquals(200, flatFile1.getDispatchOffset());
|
||
|
||
flatFileManager.destroyCompositeFile(mq);
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 99b39a35f29e491862296d56b7938a995d153974 Mon Sep 17 00:00:00 2001
|
||
From: ShuangxiDing <dingshuangxi888@gmail.com>
|
||
Date: Thu, 10 Aug 2023 11:28:39 +0800
|
||
Subject: [PATCH 02/12] [ISSUE #7115] Fix grpc response message NPE (#7116)
|
||
|
||
---
|
||
.../apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java | 2 +-
|
||
1 file changed, 1 insertion(+), 1 deletion(-)
|
||
|
||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java
|
||
index 0b3c85ea6..efa879a9c 100644
|
||
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java
|
||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/ResponseBuilder.java
|
||
@@ -92,7 +92,7 @@ public class ResponseBuilder {
|
||
public Status buildStatus(Code code, String message) {
|
||
return Status.newBuilder()
|
||
.setCode(code)
|
||
- .setMessage(message)
|
||
+ .setMessage(message != null ? message : code.name())
|
||
.build();
|
||
}
|
||
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From c0ba453f38183266cf9a69be754e620311e1923b Mon Sep 17 00:00:00 2001
|
||
From: caigy <csgytsai@163.com>
|
||
Date: Thu, 10 Aug 2023 14:08:17 +0800
|
||
Subject: [PATCH 03/12] [ISSUE #7129] Fix resource collisions in acl tests
|
||
(#7130)
|
||
|
||
* run acl tests sequencially to avoid collision
|
||
|
||
* disable reuseForks for acl like broker
|
||
|
||
* Revert "[ISSUE #7135] Temporarily ignoring plainAccessValidator test (#7135)"
|
||
|
||
This reverts commit 6bc2c8474a0ce1e2833c82dffea7b1d8f718fcd7.
|
||
---
|
||
acl/pom.xml | 13 +++++++++++++
|
||
.../acl/plain/PlainAccessControlFlowTest.java | 5 -----
|
||
.../acl/plain/PlainAccessValidatorTest.java | 3 ---
|
||
.../acl/plain/PlainPermissionManagerTest.java | 3 ---
|
||
4 files changed, 13 insertions(+), 11 deletions(-)
|
||
|
||
diff --git a/acl/pom.xml b/acl/pom.xml
|
||
index 67bfcb8d2..989c0cf77 100644
|
||
--- a/acl/pom.xml
|
||
+++ b/acl/pom.xml
|
||
@@ -74,4 +74,17 @@
|
||
<scope>test</scope>
|
||
</dependency>
|
||
</dependencies>
|
||
+
|
||
+ <build>
|
||
+ <plugins>
|
||
+ <plugin>
|
||
+ <artifactId>maven-surefire-plugin</artifactId>
|
||
+ <version>${maven-surefire-plugin.version}</version>
|
||
+ <configuration>
|
||
+ <forkCount>1</forkCount>
|
||
+ <reuseForks>false</reuseForks>
|
||
+ </configuration>
|
||
+ </plugin>
|
||
+ </plugins>
|
||
+ </build>
|
||
</project>
|
||
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java
|
||
index e7fd0932f..519345714 100644
|
||
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java
|
||
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java
|
||
@@ -31,7 +31,6 @@ import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
|
||
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
|
||
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
|
||
import org.junit.Assert;
|
||
-import org.junit.Ignore;
|
||
import org.junit.Test;
|
||
|
||
import java.io.File;
|
||
@@ -44,7 +43,6 @@ import java.util.Collections;
|
||
import java.util.LinkedList;
|
||
import java.util.List;
|
||
|
||
-
|
||
/**
|
||
* <p> In this class, we'll test the following scenarios, each containing several consecutive operations on ACL,
|
||
* <p> like updating and deleting ACL, changing config files and checking validations.
|
||
@@ -52,9 +50,6 @@ import java.util.List;
|
||
* <p> Case 2: Only conf/acl/plain_acl.yml exists;
|
||
* <p> Case 3: Both conf/plain_acl.yml and conf/acl/plain_acl.yml exists.
|
||
*/
|
||
-
|
||
-// Ignore this test case as it is currently unable to pass on ubuntu workflow
|
||
-@Ignore
|
||
public class PlainAccessControlFlowTest {
|
||
public static final String DEFAULT_TOPIC = "topic-acl";
|
||
|
||
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
|
||
index a3a925758..ef0cffbdc 100644
|
||
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
|
||
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
|
||
@@ -56,11 +56,8 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
|
||
import org.junit.After;
|
||
import org.junit.Assert;
|
||
import org.junit.Before;
|
||
-import org.junit.Ignore;
|
||
import org.junit.Test;
|
||
|
||
-// Ignore this test case as it is currently unable to pass on ubuntu workflow
|
||
-@Ignore
|
||
public class PlainAccessValidatorTest {
|
||
|
||
private PlainAccessValidator plainAccessValidator;
|
||
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
|
||
index aa7539f3a..941d8c779 100644
|
||
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
|
||
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
|
||
@@ -29,7 +29,6 @@ import org.assertj.core.api.Assertions;
|
||
import org.assertj.core.util.Lists;
|
||
import org.junit.Assert;
|
||
import org.junit.Before;
|
||
-import org.junit.Ignore;
|
||
import org.junit.Test;
|
||
|
||
import java.io.File;
|
||
@@ -42,8 +41,6 @@ import java.util.List;
|
||
import java.util.Map;
|
||
import java.util.Set;
|
||
|
||
-// Ignore this test case as it is currently unable to pass on ubuntu workflow
|
||
-@Ignore
|
||
public class PlainPermissionManagerTest {
|
||
|
||
PlainPermissionManager plainPermissionManager;
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 8741ff8c9b3bdbfc97976285affa7ea35c81243c Mon Sep 17 00:00:00 2001
|
||
From: ShuangxiDing <dingshuangxi888@gmail.com>
|
||
Date: Thu, 10 Aug 2023 17:41:15 +0800
|
||
Subject: [PATCH 04/12] [ISSUE #7153] Add switch for MIXED message type (#7154)
|
||
|
||
Add a switch for MIXED message type when creating a Topic in the Broker.
|
||
---
|
||
.../broker/processor/AdminBrokerProcessor.java | 8 ++++++++
|
||
.../java/org/apache/rocketmq/common/BrokerConfig.java | 10 ++++++++++
|
||
2 files changed, 18 insertions(+)
|
||
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||
index a6ce03dc2..bbddcec2d 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||
@@ -59,6 +59,7 @@ import org.apache.rocketmq.common.TopicConfig;
|
||
import org.apache.rocketmq.common.UnlockCallback;
|
||
import org.apache.rocketmq.common.UtilAll;
|
||
import org.apache.rocketmq.common.attribute.AttributeParser;
|
||
+import org.apache.rocketmq.common.attribute.TopicMessageType;
|
||
import org.apache.rocketmq.common.constant.ConsumeInitMode;
|
||
import org.apache.rocketmq.common.constant.FIleReadaheadMode;
|
||
import org.apache.rocketmq.common.constant.LoggerName;
|
||
@@ -439,6 +440,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
||
String attributesModification = requestHeader.getAttributes();
|
||
topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification));
|
||
|
||
+ if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
|
||
+ && !brokerController.getBrokerConfig().isEnableMixedMessageType()) {
|
||
+ response.setCode(ResponseCode.SYSTEM_ERROR);
|
||
+ response.setRemark("MIXED message type is not supported.");
|
||
+ return response;
|
||
+ }
|
||
+
|
||
try {
|
||
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
|
||
if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
||
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||
index a815636b1..99a5db5ad 100644
|
||
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||
@@ -393,6 +393,8 @@ public class BrokerConfig extends BrokerIdentity {
|
||
*/
|
||
private boolean enableSingleTopicRegister = false;
|
||
|
||
+ private boolean enableMixedMessageType = false;
|
||
+
|
||
public long getMaxPopPollingSize() {
|
||
return maxPopPollingSize;
|
||
}
|
||
@@ -1712,4 +1714,12 @@ public class BrokerConfig extends BrokerIdentity {
|
||
public void setEnableSingleTopicRegister(boolean enableSingleTopicRegister) {
|
||
this.enableSingleTopicRegister = enableSingleTopicRegister;
|
||
}
|
||
+
|
||
+ public boolean isEnableMixedMessageType() {
|
||
+ return enableMixedMessageType;
|
||
+ }
|
||
+
|
||
+ public void setEnableMixedMessageType(boolean enableMixedMessageType) {
|
||
+ this.enableMixedMessageType = enableMixedMessageType;
|
||
+ }
|
||
}
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From f534501855f8edbcb58f5b856973bf1027b5cf3a Mon Sep 17 00:00:00 2001
|
||
From: Steven <shirenchuang@users.noreply.github.com>
|
||
Date: Fri, 11 Aug 2023 10:25:48 +0800
|
||
Subject: [PATCH 05/12] [Feature 7155] add errlog when cmd err (#7157)
|
||
MIME-Version: 1.0
|
||
Content-Type: text/plain; charset=UTF-8
|
||
Content-Transfer-Encoding: 8bit
|
||
|
||
Co-authored-by: 十真 <shirenchuang.src@cainiao.com>
|
||
---
|
||
.../src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java | 1 +
|
||
1 file changed, 1 insertion(+)
|
||
|
||
diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
|
||
index b00bad3c5..5a8a7cd54 100644
|
||
--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
|
||
+++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
|
||
@@ -52,6 +52,7 @@ public class ServerUtil {
|
||
System.exit(0);
|
||
}
|
||
} catch (ParseException e) {
|
||
+ System.err.println(e.getMessage());
|
||
hf.printHelp(appName, options, true);
|
||
System.exit(1);
|
||
}
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From db58f00c0fe0f129611d654291f2177de55dc9ff Mon Sep 17 00:00:00 2001
|
||
From: Zhouxiang Zhan <zhouxzhan@apache.org>
|
||
Date: Fri, 11 Aug 2023 19:18:30 +0800
|
||
Subject: [PATCH 06/12] [ISSUE #7169] Change metadataThreadPoolQueueCapacity to
|
||
100000 (#7170)
|
||
|
||
---
|
||
.../main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java | 2 +-
|
||
1 file changed, 1 insertion(+), 1 deletion(-)
|
||
|
||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
|
||
index 4f57a7052..39caaa0d9 100644
|
||
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
|
||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
|
||
@@ -165,7 +165,7 @@ public class ProxyConfig implements ConfigFile {
|
||
private int subscriptionGroupConfigCacheExpiredInSeconds = 20;
|
||
private int subscriptionGroupConfigCacheMaxNum = 20000;
|
||
private int metadataThreadPoolNums = 3;
|
||
- private int metadataThreadPoolQueueCapacity = 1000;
|
||
+ private int metadataThreadPoolQueueCapacity = 100000;
|
||
|
||
private int transactionHeartbeatThreadPoolNums = 20;
|
||
private int transactionHeartbeatThreadPoolQueueCapacity = 200;
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 1f04e68a2e331ab035b791280c5a91b60fe0c85f Mon Sep 17 00:00:00 2001
|
||
From: yx9o <yangx_soft@163.com>
|
||
Date: Sat, 12 Aug 2023 21:12:22 +0800
|
||
Subject: [PATCH 07/12] [ISSUE #7172] Unified Chinese for Name Server (#7173)
|
||
|
||
---
|
||
docs/cn/concept.md | 2 +-
|
||
1 file changed, 1 insertion(+), 1 deletion(-)
|
||
|
||
diff --git a/docs/cn/concept.md b/docs/cn/concept.md
|
||
index cb2c863bd..3d67e9371 100644
|
||
--- a/docs/cn/concept.md
|
||
+++ b/docs/cn/concept.md
|
||
@@ -17,7 +17,7 @@ RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer
|
||
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
|
||
|
||
## 6 名字服务(Name Server)
|
||
- 名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
|
||
+名字服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
|
||
|
||
## 7 拉取式消费(Pull Consumer)
|
||
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 25005060bbace477eeaaf4c0142cece5213efbbf Mon Sep 17 00:00:00 2001
|
||
From: yx9o <yangx_soft@163.com>
|
||
Date: Sun, 13 Aug 2023 20:52:17 +0800
|
||
Subject: [PATCH 08/12] [ISSUE #7176] Correct mismatched logs (#7177)
|
||
|
||
---
|
||
.../org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java | 2 +-
|
||
1 file changed, 1 insertion(+), 1 deletion(-)
|
||
|
||
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
|
||
index 0055a1cc8..f7a95f0a6 100644
|
||
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
|
||
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
|
||
@@ -522,7 +522,7 @@ public class RouteInfoManager {
|
||
this.lock.writeLock().unlock();
|
||
}
|
||
} catch (Exception e) {
|
||
- log.error("wipeWritePermOfBrokerByLock Exception", e);
|
||
+ log.error("addWritePermOfBrokerByLock Exception", e);
|
||
}
|
||
return 0;
|
||
}
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From ac411daa27117e9115a8fc5e2d5753085f009ed9 Mon Sep 17 00:00:00 2001
|
||
From: yx9o <yangx_soft@163.com>
|
||
Date: Tue, 15 Aug 2023 08:31:00 +0800
|
||
Subject: [PATCH 09/12] [ISSUE #7183] Correct mismatched commandDesc (#7184)
|
||
|
||
---
|
||
.../tools/command/topic/RemappingStaticTopicSubCommand.java | 2 +-
|
||
1 file changed, 1 insertion(+), 1 deletion(-)
|
||
|
||
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
|
||
index 849f680d0..2a08fdb5b 100644
|
||
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
|
||
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
|
||
@@ -47,7 +47,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
|
||
|
||
@Override
|
||
public String commandDesc() {
|
||
- return "Update or create static topic, which has fixed number of queues";
|
||
+ return "Remapping static topic.";
|
||
}
|
||
|
||
@Override
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 55e0cdb2af3ab75a6d892f919d60797f17a99fda Mon Sep 17 00:00:00 2001
|
||
From: redlsz <szliu0927@gmail.com>
|
||
Date: Tue, 15 Aug 2023 19:19:45 +0800
|
||
Subject: [PATCH 10/12] fix: IndexOutOfBoundsException when process pop
|
||
response (#7003)
|
||
|
||
---
|
||
.../org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 5 ++++-
|
||
.../rocketmq/proxy/service/message/LocalMessageService.java | 5 ++++-
|
||
.../rocketmq/remoting/protocol/header/ExtraInfoUtil.java | 4 ++++
|
||
3 files changed, 12 insertions(+), 2 deletions(-)
|
||
|
||
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
|
||
index 708a6acd1..5101ffc8e 100644
|
||
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
|
||
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
|
||
@@ -1174,7 +1174,10 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
|
||
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
|
||
continue;
|
||
}
|
||
- key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
|
||
+ // Value of POP_CK is used to determine whether it is a pop retry,
|
||
+ // cause topic could be rewritten by broker.
|
||
+ key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
|
||
+ messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getQueueId());
|
||
if (!sortMap.containsKey(key)) {
|
||
sortMap.put(key, new ArrayList<>(4));
|
||
}
|
||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
|
||
index 115c140ff..eb2c4d9ee 100644
|
||
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
|
||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
|
||
@@ -249,7 +249,10 @@ public class LocalMessageService implements MessageService {
|
||
// <topicMark@queueId, msg queueOffset>
|
||
Map<String, List<Long>> sortMap = new HashMap<>(16);
|
||
for (MessageExt messageExt : messageExtList) {
|
||
- String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
|
||
+ // Value of POP_CK is used to determine whether it is a pop retry,
|
||
+ // cause topic could be rewritten by broker.
|
||
+ String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
|
||
+ messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getQueueId());
|
||
if (!sortMap.containsKey(key)) {
|
||
sortMap.put(key, new ArrayList<>(4));
|
||
}
|
||
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
|
||
index 9a5fa89ab..13094331e 100644
|
||
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
|
||
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
|
||
@@ -282,6 +282,10 @@ public class ExtraInfoUtil {
|
||
return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key;
|
||
}
|
||
|
||
+ public static String getStartOffsetInfoMapKey(String topic, String popCk, long key) {
|
||
+ return ((topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || popCk != null) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key;
|
||
+ }
|
||
+
|
||
public static String getQueueOffsetKeyValueKey(long queueId, long queueOffset) {
|
||
return QUEUE_OFFSET + queueId + "%" + queueOffset;
|
||
}
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From a9c0b43f7f6ce5acfc4f2f3069553071fa93dfee Mon Sep 17 00:00:00 2001
|
||
From: yx9o <yangx_soft@163.com>
|
||
Date: Wed, 16 Aug 2023 18:45:00 +0800
|
||
Subject: [PATCH 11/12] [ISSUE #7192] Correct typos (#7193)
|
||
|
||
---
|
||
.../tools/command/consumer/ConsumerProgressSubCommand.java | 2 +-
|
||
1 file changed, 1 insertion(+), 1 deletion(-)
|
||
|
||
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
|
||
index f51a24673..97125b854 100644
|
||
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
|
||
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
|
||
@@ -54,7 +54,7 @@ public class ConsumerProgressSubCommand implements SubCommand {
|
||
|
||
@Override
|
||
public String commandDesc() {
|
||
- return "Query consumers's progress, speed";
|
||
+ return "Query consumer's progress, speed.";
|
||
}
|
||
|
||
@Override
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 5a3de926b816db5a121c1d788430072a3bc942ae Mon Sep 17 00:00:00 2001
|
||
From: Zhouxiang Zhan <zhouxzhan@apache.org>
|
||
Date: Wed, 16 Aug 2023 20:52:53 +0800
|
||
Subject: [PATCH 12/12] Optimize updateSubscription check exist loop (#7190)
|
||
|
||
---
|
||
.../broker/client/ConsumerGroupInfo.java | 17 ++++++-----------
|
||
1 file changed, 6 insertions(+), 11 deletions(-)
|
||
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
|
||
index 867b9c720..1ea58c125 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
|
||
@@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.client;
|
||
|
||
import io.netty.channel.Channel;
|
||
import java.util.ArrayList;
|
||
+import java.util.HashSet;
|
||
import java.util.Iterator;
|
||
import java.util.List;
|
||
import java.util.Map.Entry;
|
||
@@ -172,7 +173,7 @@ public class ConsumerGroupInfo {
|
||
*/
|
||
public boolean updateSubscription(final Set<SubscriptionData> subList) {
|
||
boolean updated = false;
|
||
-
|
||
+ Set<String> topicSet = new HashSet<>();
|
||
for (SubscriptionData sub : subList) {
|
||
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
|
||
if (old == null) {
|
||
@@ -194,22 +195,16 @@ public class ConsumerGroupInfo {
|
||
|
||
this.subscriptionTable.put(sub.getTopic(), sub);
|
||
}
|
||
+ // Add all new topics to the HashSet
|
||
+ topicSet.add(sub.getTopic());
|
||
}
|
||
|
||
Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
|
||
while (it.hasNext()) {
|
||
Entry<String, SubscriptionData> next = it.next();
|
||
String oldTopic = next.getKey();
|
||
-
|
||
- boolean exist = false;
|
||
- for (SubscriptionData sub : subList) {
|
||
- if (sub.getTopic().equals(oldTopic)) {
|
||
- exist = true;
|
||
- break;
|
||
- }
|
||
- }
|
||
-
|
||
- if (!exist) {
|
||
+ // Check HashSet with O(1) time complexity
|
||
+ if (!topicSet.contains(oldTopic)) {
|
||
log.warn("subscription changed, group: {} remove topic {} {}",
|
||
this.groupName,
|
||
oldTopic,
|
||
--
|
||
2.32.0.windows.2
|
||
|