1895 lines
96 KiB
Diff
1895 lines
96 KiB
Diff
From bd0e9c09db9748f7f74a0c707579142dccf30afc Mon Sep 17 00:00:00 2001
|
|
From: PiteXChen <44110731+RapperCL@users.noreply.github.com>
|
|
Date: Tue, 29 Aug 2023 19:39:27 +0800
|
|
Subject: [PATCH 1/7] [ISSUE #7111] Remove responseFuture from the
|
|
responseTable when exception occurs (#7112)
|
|
|
|
* remove responseFuture when exception
|
|
* Empty-Commit
|
|
|
|
---------
|
|
Co-authored-by: chenyong152 <chenyong152@midea.com>
|
|
---
|
|
.../apache/rocketmq/remoting/netty/NettyRemotingAbstract.java | 1 +
|
|
1 file changed, 1 insertion(+)
|
|
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
|
|
index 44d6a3df4..fce2de267 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
|
|
@@ -529,6 +529,7 @@ public abstract class NettyRemotingAbstract {
|
|
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
|
|
});
|
|
} catch (Exception e) {
|
|
+ responseTable.remove(opaque);
|
|
responseFuture.release();
|
|
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
|
|
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From c78061bf6ca5f35452510ec4107c46735c51c316 Mon Sep 17 00:00:00 2001
|
|
From: lizhimins <707364882@qq.com>
|
|
Date: Wed, 30 Aug 2023 22:29:51 +0800
|
|
Subject: [PATCH 2/7] [ISSUE#7280] Fix and refactor handle commit exception in
|
|
tiered storage (#7281)
|
|
|
|
* refactor handle commit exception
|
|
|
|
* refactor handle commit exception
|
|
|
|
* fix handle commit exception
|
|
---
|
|
.../tieredstore/TieredDispatcher.java | 3 +-
|
|
.../tieredstore/TieredMessageFetcher.java | 57 ++--
|
|
.../tieredstore/TieredMessageStore.java | 26 +-
|
|
.../provider/TieredFileSegment.java | 291 ++++++++++--------
|
|
.../provider/TieredStoreProvider.java | 8 +-
|
|
.../provider/posix/PosixFileSegment.java | 4 +-
|
|
.../CommitLogInputStream.java} | 30 +-
|
|
.../FileSegmentInputStream.java} | 49 ++-
|
|
.../FileSegmentInputStreamFactory.java} | 26 +-
|
|
.../tieredstore/TieredMessageStoreTest.java | 14 +-
|
|
.../tieredstore/file/TieredFlatFileTest.java | 2 +
|
|
.../tieredstore/file/TieredIndexFileTest.java | 2 +
|
|
...m.java => MockFileSegmentInputStream.java} | 8 +-
|
|
.../TieredFileSegmentInputStreamTest.java | 24 +-
|
|
.../provider/TieredFileSegmentTest.java | 89 +++++-
|
|
.../provider/memory/MemoryFileSegment.java | 27 +-
|
|
.../memory/MemoryFileSegmentWithoutCheck.java | 4 +-
|
|
17 files changed, 427 insertions(+), 237 deletions(-)
|
|
rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/{inputstream/TieredCommitLogInputStream.java => stream/CommitLogInputStream.java} (88%)
|
|
rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/{inputstream/TieredFileSegmentInputStream.java => stream/FileSegmentInputStream.java} (77%)
|
|
rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/{inputstream/TieredFileSegmentInputStreamFactory.java => stream/FileSegmentInputStreamFactory.java} (54%)
|
|
rename tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/{MockTieredFileSegmentInputStream.java => MockFileSegmentInputStream.java} (82%)
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
|
index 1746190cd..430c2b62e 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
|
@@ -318,8 +318,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
|
|
continue;
|
|
case FILE_CLOSED:
|
|
tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue());
|
|
- logger.info("TieredDispatcher#dispatchFlatFile: file has been close and destroy, " +
|
|
- "topic: {}, queueId: {}", topic, queueId);
|
|
+ logger.info("File has been closed and destroy, topic: {}, queueId: {}", topic, queueId);
|
|
return;
|
|
default:
|
|
dispatchOffset--;
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
|
index 9a9a3e5a5..766ff64f6 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
|
@@ -273,15 +273,17 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
TieredStoreMetricsManager.cacheHit.add(resultWrapperList.size(), attributes);
|
|
}
|
|
|
|
- // if no cached message found and there is currently an inflight request, wait for the request to end before continuing
|
|
+ // If there are no messages in the cache and a pull request is currently in flight,
|
|
+ // we need to wait for that request to return before continuing.
|
|
if (resultWrapperList.isEmpty() && waitInflightRequest) {
|
|
- CompletableFuture<Long> future = flatFile.getInflightRequest(group, queueOffset, maxCount)
|
|
- .getFuture(queueOffset);
|
|
+ CompletableFuture<Long> future =
|
|
+ flatFile.getInflightRequest(group, queueOffset, maxCount).getFuture(queueOffset);
|
|
if (!future.isDone()) {
|
|
Stopwatch stopwatch = Stopwatch.createStarted();
|
|
// to prevent starvation issues, only allow waiting for inflight request once
|
|
return future.thenCompose(v -> {
|
|
- LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: wait for inflight request cost: {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
|
|
+ LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: wait for response cost: {}ms",
|
|
+ stopwatch.elapsed(TimeUnit.MILLISECONDS));
|
|
return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, false);
|
|
});
|
|
}
|
|
@@ -302,7 +304,8 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
|
|
// if cache hit, result will be returned immediately and asynchronously prefetch messages for later requests
|
|
if (!resultWrapperList.isEmpty()) {
|
|
- LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: cache hit: topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}",
|
|
+ LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: " +
|
|
+ "topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}",
|
|
mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size());
|
|
prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
|
|
|
|
@@ -316,8 +319,10 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
}
|
|
|
|
// if cache is miss, immediately pull messages
|
|
- LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: topic: {}, queue: {}, queue offset: {}, max message num: {}",
|
|
+ LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " +
|
|
+ "topic: {}, queue: {}, queue offset: {}, max message num: {}",
|
|
mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
|
|
+
|
|
CompletableFuture<GetMessageResult> resultFuture;
|
|
synchronized (flatFile) {
|
|
int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
|
|
@@ -453,42 +458,42 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
public CompletableFuture<GetMessageResult> getMessageAsync(
|
|
String group, String topic, int queueId, long queueOffset, int maxCount, final MessageFilter messageFilter) {
|
|
|
|
+ GetMessageResult result = new GetMessageResult();
|
|
CompositeQueueFlatFile flatFile = flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId));
|
|
+
|
|
if (flatFile == null) {
|
|
- GetMessageResult result = new GetMessageResult();
|
|
result.setNextBeginOffset(queueOffset);
|
|
result.setStatus(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE);
|
|
return CompletableFuture.completedFuture(result);
|
|
}
|
|
|
|
- GetMessageResult result = new GetMessageResult();
|
|
- long minQueueOffset = flatFile.getConsumeQueueMinOffset();
|
|
- long maxQueueOffset = flatFile.getConsumeQueueCommitOffset();
|
|
- result.setMinOffset(minQueueOffset);
|
|
- result.setMaxOffset(maxQueueOffset);
|
|
+ // Max queue offset means next message put position
|
|
+ result.setMinOffset(flatFile.getConsumeQueueMinOffset());
|
|
+ result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
|
|
+
|
|
+ // Fill result according to the file offset.
|
|
+ // Offset range | Result | Fix to
|
|
+ // (-oo, 0] | no message | current offset
|
|
+ // (0, min) | too small | min offset
|
|
+ // [min, max) | correct |
|
|
+ // [max, max] | overflow one | max offset
|
|
+ // (max, +oo) | overflow badly | max offset
|
|
|
|
- if (flatFile.getConsumeQueueCommitOffset() <= 0) {
|
|
+ if (result.getMaxOffset() <= 0) {
|
|
result.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
|
|
result.setNextBeginOffset(queueOffset);
|
|
return CompletableFuture.completedFuture(result);
|
|
- }
|
|
-
|
|
- // request range | result
|
|
- // (0, min) | too small
|
|
- // [min, max) | correct
|
|
- // [max, max] | overflow one
|
|
- // (max, +oo) | overflow badly
|
|
- if (queueOffset < minQueueOffset) {
|
|
+ } else if (queueOffset < result.getMinOffset()) {
|
|
result.setStatus(GetMessageStatus.OFFSET_TOO_SMALL);
|
|
- result.setNextBeginOffset(flatFile.getConsumeQueueMinOffset());
|
|
+ result.setNextBeginOffset(result.getMinOffset());
|
|
return CompletableFuture.completedFuture(result);
|
|
- } else if (queueOffset == maxQueueOffset) {
|
|
+ } else if (queueOffset == result.getMaxOffset()) {
|
|
result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE);
|
|
- result.setNextBeginOffset(flatFile.getConsumeQueueCommitOffset());
|
|
+ result.setNextBeginOffset(result.getMaxOffset());
|
|
return CompletableFuture.completedFuture(result);
|
|
- } else if (queueOffset > maxQueueOffset) {
|
|
+ } else if (queueOffset > result.getMaxOffset()) {
|
|
result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
|
|
- result.setNextBeginOffset(flatFile.getConsumeQueueCommitOffset());
|
|
+ result.setNextBeginOffset(result.getMaxOffset());
|
|
return CompletableFuture.completedFuture(result);
|
|
}
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
index 5240ac8e9..78e855f36 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
@@ -99,11 +99,11 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
return storeConfig;
|
|
}
|
|
|
|
- public boolean viaTieredStorage(String topic, int queueId, long offset) {
|
|
- return viaTieredStorage(topic, queueId, offset, 1);
|
|
+ public boolean fetchFromCurrentStore(String topic, int queueId, long offset) {
|
|
+ return fetchFromCurrentStore(topic, queueId, offset, 1);
|
|
}
|
|
|
|
- public boolean viaTieredStorage(String topic, int queueId, long offset, int batchSize) {
|
|
+ public boolean fetchFromCurrentStore(String topic, int queueId, long offset, int batchSize) {
|
|
TieredMessageStoreConfig.TieredStorageLevel deepStorageLevel = storeConfig.getTieredStorageLevel();
|
|
|
|
if (deepStorageLevel.check(TieredMessageStoreConfig.TieredStorageLevel.FORCE)) {
|
|
@@ -146,8 +146,10 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
public CompletableFuture<GetMessageResult> getMessageAsync(String group, String topic,
|
|
int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) {
|
|
|
|
- if (!viaTieredStorage(topic, queueId, offset, maxMsgNums)) {
|
|
- logger.trace("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset);
|
|
+ if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
|
|
+ logger.trace("GetMessageAsync from current store, topic: {}, queue: {}, offset: {}", topic, queueId, offset);
|
|
+ } else {
|
|
+ logger.trace("GetMessageAsync from next store, topic: {}, queue: {}, offset: {}", topic, queueId, offset);
|
|
return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter);
|
|
}
|
|
|
|
@@ -168,14 +170,14 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
|
|
if (next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
|
|
TieredStoreMetricsManager.fallbackTotal.add(1, latencyAttributes);
|
|
- logger.debug("GetMessageAsync not found then try back to next store, result: {}, " +
|
|
+ logger.debug("GetMessageAsync not found, then back to next store, result: {}, " +
|
|
"topic: {}, queue: {}, queue offset: {}, offset range: {}-{}",
|
|
result.getStatus(), topic, queueId, offset, result.getMinOffset(), result.getMaxOffset());
|
|
return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
|
|
}
|
|
}
|
|
|
|
- // system topic
|
|
+ // Fetch system topic data from the broker when using the force level.
|
|
if (result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) {
|
|
if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) {
|
|
return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
|
|
@@ -198,7 +200,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
TieredStoreMetricsManager.messagesOutTotal.add(result.getMessageCount(), messagesOutAttributes);
|
|
}
|
|
|
|
- // fix min or max offset according next store
|
|
+ // Finally, fix the min or max offset according to the next store
|
|
long minOffsetInQueue = next.getMinOffsetInQueue(topic, queueId);
|
|
if (minOffsetInQueue >= 0 && minOffsetInQueue < result.getMinOffset()) {
|
|
result.setMinOffset(minOffsetInQueue);
|
|
@@ -209,7 +211,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
}
|
|
return result;
|
|
}).exceptionally(e -> {
|
|
- logger.error("GetMessageAsync from tiered store failed: ", e);
|
|
+ logger.error("GetMessageAsync from tiered store failed", e);
|
|
return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
|
|
});
|
|
}
|
|
@@ -251,7 +253,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
.build();
|
|
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
|
|
if (time < 0) {
|
|
- logger.debug("TieredMessageStore#getEarliestMessageTimeAsync: get earliest message time failed, try to get earliest message time from next store: topic: {}, queue: {}",
|
|
+ logger.debug("GetEarliestMessageTimeAsync failed, try to get earliest message time from next store: topic: {}, queue: {}",
|
|
topic, queueId);
|
|
return finalNextEarliestMessageTime != Long.MAX_VALUE ? finalNextEarliestMessageTime : -1;
|
|
}
|
|
@@ -262,7 +264,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
@Override
|
|
public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, int queueId,
|
|
long consumeQueueOffset) {
|
|
- if (viaTieredStorage(topic, queueId, consumeQueueOffset)) {
|
|
+ if (fetchFromCurrentStore(topic, queueId, consumeQueueOffset)) {
|
|
Stopwatch stopwatch = Stopwatch.createStarted();
|
|
return fetcher.getMessageStoreTimeStampAsync(topic, queueId, consumeQueueOffset)
|
|
.thenApply(time -> {
|
|
@@ -272,7 +274,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
.build();
|
|
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
|
|
if (time == -1) {
|
|
- logger.debug("TieredMessageStore#getMessageStoreTimeStampAsync: get message time failed, try to get message time from next store: topic: {}, queue: {}, queue offset: {}",
|
|
+ logger.debug("GetMessageStoreTimeStampAsync failed, try to get message time from next store, topic: {}, queue: {}, queue offset: {}",
|
|
topic, queueId, consumeQueueOffset);
|
|
return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset);
|
|
}
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
|
|
index 5062c7d9e..32911a6e8 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
|
|
@@ -16,14 +16,11 @@
|
|
*/
|
|
package org.apache.rocketmq.tieredstore.provider;
|
|
|
|
-import com.google.common.annotations.VisibleForTesting;
|
|
-import com.google.common.base.Stopwatch;
|
|
import java.nio.ByteBuffer;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.Semaphore;
|
|
-import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
@@ -35,8 +32,8 @@ import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
|
|
import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
|
|
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
|
|
import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
|
|
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
|
|
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
|
|
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
|
|
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory;
|
|
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
|
|
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
|
|
|
@@ -50,22 +47,23 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
|
|
protected final TieredMessageStoreConfig storeConfig;
|
|
|
|
private final long maxSize;
|
|
- private final ReentrantLock bufferLock;
|
|
- private final Semaphore commitLock;
|
|
+ private final ReentrantLock bufferLock = new ReentrantLock();
|
|
+ private final Semaphore commitLock = new Semaphore(1);
|
|
|
|
- private volatile boolean full;
|
|
- private volatile boolean closed;
|
|
+ private volatile boolean full = false;
|
|
+ private volatile boolean closed = false;
|
|
|
|
- private volatile long minTimestamp;
|
|
- private volatile long maxTimestamp;
|
|
- private volatile long commitPosition;
|
|
- private volatile long appendPosition;
|
|
+ private volatile long minTimestamp = Long.MAX_VALUE;
|
|
+ private volatile long maxTimestamp = Long.MAX_VALUE;
|
|
+ private volatile long commitPosition = 0L;
|
|
+ private volatile long appendPosition = 0L;
|
|
|
|
// only used in commitLog
|
|
- private volatile long dispatchCommitOffset = 0;
|
|
+ private volatile long dispatchCommitOffset = 0L;
|
|
|
|
private ByteBuffer codaBuffer;
|
|
- private List<ByteBuffer> uploadBufferList = new ArrayList<>();
|
|
+ private List<ByteBuffer> bufferList = new ArrayList<>();
|
|
+ private FileSegmentInputStream fileSegmentInputStream;
|
|
private CompletableFuture<Boolean> flightCommitRequest = CompletableFuture.completedFuture(false);
|
|
|
|
public TieredFileSegment(TieredMessageStoreConfig storeConfig,
|
|
@@ -75,21 +73,13 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
|
|
this.fileType = fileType;
|
|
this.filePath = filePath;
|
|
this.baseOffset = baseOffset;
|
|
-
|
|
- this.closed = false;
|
|
- this.bufferLock = new ReentrantLock();
|
|
- this.commitLock = new Semaphore(1);
|
|
-
|
|
- this.commitPosition = 0L;
|
|
- this.appendPosition = 0L;
|
|
- this.minTimestamp = Long.MAX_VALUE;
|
|
- this.maxTimestamp = Long.MAX_VALUE;
|
|
-
|
|
- // The max segment size of a file is determined by the file type
|
|
- this.maxSize = getMaxSizeAccordingFileType(storeConfig);
|
|
+ this.maxSize = getMaxSizeByFileType();
|
|
}
|
|
|
|
- private long getMaxSizeAccordingFileType(TieredMessageStoreConfig storeConfig) {
|
|
+ /**
|
|
+ * The max segment size of a file is determined by the file type
|
|
+ */
|
|
+ protected long getMaxSizeByFileType() {
|
|
switch (fileType) {
|
|
case COMMIT_LOG:
|
|
return storeConfig.getTieredStoreCommitLogMaxSize();
|
|
@@ -184,39 +174,23 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
|
|
this.appendPosition = pos;
|
|
}
|
|
|
|
- private List<ByteBuffer> rollingUploadBuffer() {
|
|
+ private List<ByteBuffer> borrowBuffer() {
|
|
bufferLock.lock();
|
|
try {
|
|
- List<ByteBuffer> tmp = uploadBufferList;
|
|
- uploadBufferList = new ArrayList<>();
|
|
+ List<ByteBuffer> tmp = bufferList;
|
|
+ bufferList = new ArrayList<>();
|
|
return tmp;
|
|
} finally {
|
|
bufferLock.unlock();
|
|
}
|
|
}
|
|
|
|
- private void sendBackBuffer(TieredFileSegmentInputStream inputStream) {
|
|
- bufferLock.lock();
|
|
- try {
|
|
- List<ByteBuffer> tmpBufferList = inputStream.getUploadBufferList();
|
|
- for (ByteBuffer buffer : tmpBufferList) {
|
|
- buffer.rewind();
|
|
- }
|
|
- tmpBufferList.addAll(uploadBufferList);
|
|
- uploadBufferList = tmpBufferList;
|
|
- if (inputStream.getCodaBuffer() != null) {
|
|
- codaBuffer.rewind();
|
|
- }
|
|
- } finally {
|
|
- bufferLock.unlock();
|
|
- }
|
|
- }
|
|
-
|
|
@SuppressWarnings("NonAtomicOperationOnVolatileField")
|
|
- public AppendResult append(ByteBuffer byteBuf, long timeStamp) {
|
|
+ public AppendResult append(ByteBuffer byteBuf, long timestamp) {
|
|
if (closed) {
|
|
return AppendResult.FILE_CLOSED;
|
|
}
|
|
+
|
|
bufferLock.lock();
|
|
try {
|
|
if (full || codaBuffer != null) {
|
|
@@ -227,7 +201,8 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
|
|
minTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION);
|
|
maxTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_END_TIME_STAMP_POSITION);
|
|
appendPosition += byteBuf.remaining();
|
|
- uploadBufferList.add(byteBuf);
|
|
+ // IndexFile is large and does not change after compaction, so no deep copy is needed
|
|
+ bufferList.add(byteBuf);
|
|
setFull();
|
|
return AppendResult.SUCCESS;
|
|
}
|
|
@@ -236,23 +211,34 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
|
|
setFull();
|
|
return AppendResult.FILE_FULL;
|
|
}
|
|
- if (uploadBufferList.size() > storeConfig.getTieredStoreGroupCommitCount()
|
|
+
|
|
+ if (bufferList.size() > storeConfig.getTieredStoreGroupCommitCount()
|
|
|| appendPosition - commitPosition > storeConfig.getTieredStoreGroupCommitSize()) {
|
|
commitAsync();
|
|
}
|
|
- if (uploadBufferList.size() > storeConfig.getTieredStoreMaxGroupCommitCount()) {
|
|
- logger.debug("TieredFileSegment#append: buffer full: file: {}, upload buffer size: {}",
|
|
- getPath(), uploadBufferList.size());
|
|
+
|
|
+ if (bufferList.size() > storeConfig.getTieredStoreMaxGroupCommitCount()) {
|
|
+ logger.debug("File segment append buffer full, file: {}, buffer size: {}, pending bytes: {}",
|
|
+ getPath(), bufferList.size(), appendPosition - commitPosition);
|
|
return AppendResult.BUFFER_FULL;
|
|
}
|
|
- if (timeStamp != Long.MAX_VALUE) {
|
|
- maxTimestamp = timeStamp;
|
|
+
|
|
+ if (timestamp != Long.MAX_VALUE) {
|
|
+ maxTimestamp = timestamp;
|
|
if (minTimestamp == Long.MAX_VALUE) {
|
|
- minTimestamp = timeStamp;
|
|
+ minTimestamp = timestamp;
|
|
}
|
|
}
|
|
+
|
|
appendPosition += byteBuf.remaining();
|
|
- uploadBufferList.add(byteBuf);
|
|
+
|
|
+ // deep copy buffer
|
|
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(byteBuf.remaining());
|
|
+ byteBuffer.put(byteBuf);
|
|
+ byteBuffer.flip();
|
|
+ byteBuf.rewind();
|
|
+
|
|
+ bufferList.add(byteBuffer);
|
|
return AppendResult.SUCCESS;
|
|
} finally {
|
|
bufferLock.unlock();
|
|
@@ -267,7 +253,6 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
|
|
return appendPosition;
|
|
}
|
|
|
|
- @VisibleForTesting
|
|
public void setAppendPosition(long appendPosition) {
|
|
this.appendPosition = appendPosition;
|
|
}
|
|
@@ -333,6 +318,8 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
|
|
if (closed) {
|
|
return false;
|
|
}
|
|
+ // result is false when we send real commit request
|
|
+ // use join to wait for the in-flight request to finish
|
|
Boolean result = commitAsync().join();
|
|
if (!result) {
|
|
result = flightCommitRequest.join();
|
|
@@ -340,92 +327,156 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
|
|
return result;
|
|
}
|
|
|
|
+ private void releaseCommitLock() {
|
|
+ if (commitLock.availablePermits() == 0) {
|
|
+ commitLock.release();
|
|
+ } else {
|
|
+ logger.error("[Bug] FileSegmentCommitAsync, lock is already released: available permits: {}",
|
|
+ commitLock.availablePermits());
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private void updateDispatchCommitOffset(List<ByteBuffer> bufferList) {
|
|
+ if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) {
|
|
+ dispatchCommitOffset =
|
|
+ MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1));
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * @return false: commit, true: no commit operation
|
|
+ */
|
|
@SuppressWarnings("NonAtomicOperationOnVolatileField")
|
|
public CompletableFuture<Boolean> commitAsync() {
|
|
if (closed) {
|
|
return CompletableFuture.completedFuture(false);
|
|
}
|
|
- Stopwatch stopwatch = Stopwatch.createStarted();
|
|
+
|
|
if (!needCommit()) {
|
|
return CompletableFuture.completedFuture(true);
|
|
}
|
|
- try {
|
|
- int permits = commitLock.drainPermits();
|
|
- if (permits <= 0) {
|
|
- return CompletableFuture.completedFuture(false);
|
|
- }
|
|
- } catch (Exception e) {
|
|
+
|
|
+ if (commitLock.drainPermits() <= 0) {
|
|
return CompletableFuture.completedFuture(false);
|
|
}
|
|
- List<ByteBuffer> bufferList = rollingUploadBuffer();
|
|
- int bufferSize = 0;
|
|
- for (ByteBuffer buffer : bufferList) {
|
|
- bufferSize += buffer.remaining();
|
|
- }
|
|
- if (codaBuffer != null) {
|
|
- bufferSize += codaBuffer.remaining();
|
|
- }
|
|
- if (bufferSize == 0) {
|
|
- return CompletableFuture.completedFuture(true);
|
|
- }
|
|
- TieredFileSegmentInputStream inputStream = TieredFileSegmentInputStreamFactory.build(
|
|
- fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize);
|
|
- int finalBufferSize = bufferSize;
|
|
+
|
|
try {
|
|
- flightCommitRequest = commit0(inputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX)
|
|
+ if (fileSegmentInputStream != null) {
|
|
+ long fileSize = this.getSize();
|
|
+ if (fileSize == -1L) {
|
|
+ logger.error("Get commit position error before commit, Commit: {}, Expect: {}, Current Max: {}, FileName: {}",
|
|
+ commitPosition, commitPosition + fileSegmentInputStream.getContentLength(), appendPosition, getPath());
|
|
+ releaseCommitLock();
|
|
+ return CompletableFuture.completedFuture(false);
|
|
+ } else {
|
|
+ if (correctPosition(fileSize, null)) {
|
|
+ updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
|
|
+ fileSegmentInputStream = null;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ int bufferSize;
|
|
+ if (fileSegmentInputStream != null) {
|
|
+ bufferSize = fileSegmentInputStream.available();
|
|
+ } else {
|
|
+ List<ByteBuffer> bufferList = borrowBuffer();
|
|
+ bufferSize = bufferList.stream().mapToInt(ByteBuffer::remaining).sum()
|
|
+ + (codaBuffer != null ? codaBuffer.remaining() : 0);
|
|
+ if (bufferSize == 0) {
|
|
+ releaseCommitLock();
|
|
+ return CompletableFuture.completedFuture(true);
|
|
+ }
|
|
+ fileSegmentInputStream = FileSegmentInputStreamFactory.build(
|
|
+ fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize);
|
|
+ }
|
|
+
|
|
+ return flightCommitRequest = this
|
|
+ .commit0(fileSegmentInputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX)
|
|
.thenApply(result -> {
|
|
if (result) {
|
|
- if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) {
|
|
- dispatchCommitOffset = MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1));
|
|
- }
|
|
- commitPosition += finalBufferSize;
|
|
+ updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
|
|
+ commitPosition += bufferSize;
|
|
+ fileSegmentInputStream = null;
|
|
return true;
|
|
- }
|
|
- sendBackBuffer(inputStream);
|
|
- return false;
|
|
- })
|
|
- .exceptionally(e -> handleCommitException(inputStream, e))
|
|
- .whenComplete((result, e) -> {
|
|
- if (commitLock.availablePermits() == 0) {
|
|
- logger.debug("TieredFileSegment#commitAsync: commit cost: {}ms, file: {}, item count: {}, buffer size: {}", stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(), finalBufferSize);
|
|
- commitLock.release();
|
|
} else {
|
|
- logger.error("[Bug]TieredFileSegment#commitAsync: commit lock is already released: available permits: {}", commitLock.availablePermits());
|
|
+ fileSegmentInputStream.rewind();
|
|
+ return false;
|
|
}
|
|
- });
|
|
- return flightCommitRequest;
|
|
+ })
|
|
+ .exceptionally(this::handleCommitException)
|
|
+ .whenComplete((result, e) -> releaseCommitLock());
|
|
+
|
|
} catch (Exception e) {
|
|
- handleCommitException(inputStream, e);
|
|
- if (commitLock.availablePermits() == 0) {
|
|
- logger.debug("TieredFileSegment#commitAsync: commit cost: {}ms, file: {}, item count: {}, buffer size: {}", stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(), finalBufferSize);
|
|
- commitLock.release();
|
|
- } else {
|
|
- logger.error("[Bug]TieredFileSegment#commitAsync: commit lock is already released: available permits: {}", commitLock.availablePermits());
|
|
- }
|
|
+ handleCommitException(e);
|
|
+ releaseCommitLock();
|
|
}
|
|
return CompletableFuture.completedFuture(false);
|
|
}
|
|
|
|
- private boolean handleCommitException(TieredFileSegmentInputStream inputStream, Throwable e) {
|
|
+ private long getCorrectFileSize(Throwable throwable) {
|
|
+ if (throwable instanceof TieredStoreException) {
|
|
+ long fileSize = ((TieredStoreException) throwable).getPosition();
|
|
+ if (fileSize > 0) {
|
|
+ return fileSize;
|
|
+ }
|
|
+ }
|
|
+ return getSize();
|
|
+ }
|
|
+
|
|
+ private boolean handleCommitException(Throwable e) {
|
|
+ // Get root cause here
|
|
Throwable cause = e.getCause() != null ? e.getCause() : e;
|
|
- sendBackBuffer(inputStream);
|
|
- long realSize = 0;
|
|
- if (cause instanceof TieredStoreException && ((TieredStoreException) cause).getPosition() > 0) {
|
|
- realSize = ((TieredStoreException) cause).getPosition();
|
|
+ long fileSize = this.getCorrectFileSize(cause);
|
|
+
|
|
+ if (fileSize == -1L) {
|
|
+ logger.error("Get commit position error, Commit: {}, Expect: {}, Current Max: {}, FileName: {}",
|
|
+ commitPosition, commitPosition + fileSegmentInputStream.getContentLength(), appendPosition, getPath());
|
|
+ fileSegmentInputStream.rewind();
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ if (correctPosition(fileSize, cause)) {
|
|
+ updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
|
|
+ fileSegmentInputStream = null;
|
|
+ return true;
|
|
+ } else {
|
|
+ fileSegmentInputStream.rewind();
|
|
+ return false;
|
|
}
|
|
- if (realSize <= 0) {
|
|
- realSize = getSize();
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * return true to clear buffer
|
|
+ */
|
|
+ private boolean correctPosition(long fileSize, Throwable throwable) {
|
|
+
|
|
+ // Currently we have three offsets here: commit offset, expect offset, file size.
|
|
+ // We guarantee that the commit offset is less than or equal to the expect offset.
|
|
+ // Max offset will increase because we can continuously put in new buffers
|
|
+ String handleInfo = throwable == null ? "before commit" : "after commit";
|
|
+ long expectPosition = commitPosition + fileSegmentInputStream.getContentLength();
|
|
+
|
|
+ String offsetInfo = String.format("Correct Commit Position, %s, result=[{}], " +
|
|
+ "Commit: %d, Expect: %d, Current Max: %d, FileSize: %d, FileName: %s",
|
|
+ handleInfo, commitPosition, expectPosition, appendPosition, fileSize, this.getPath());
|
|
+
|
|
+ // We trust that the file size returned by the server is correct,
|
|
+ // so we can reset the commit offset to the file size reported by the storage system.
|
|
+ if (fileSize == expectPosition) {
|
|
+ logger.info(offsetInfo, "Success", throwable);
|
|
+ commitPosition = fileSize;
|
|
+ return true;
|
|
}
|
|
- if (realSize > 0 && realSize > commitPosition) {
|
|
- logger.error("TieredFileSegment#handleCommitException: commit failed: file: {}, try to fix position: origin: {}, real: {}", getPath(), commitPosition, realSize, cause);
|
|
- // TODO check if this diff part is uploaded to backend storage
|
|
- long diff = appendPosition - commitPosition;
|
|
- commitPosition = realSize;
|
|
- appendPosition = realSize + diff;
|
|
- // TODO check if appendPosition is large than maxOffset
|
|
- } else if (realSize < commitPosition) {
|
|
- logger.error("[Bug]TieredFileSegment#handleCommitException: commit failed: file: {}, can not fix position: origin: {}, real: {}", getPath(), commitPosition, realSize, cause);
|
|
+
|
|
+ if (fileSize < commitPosition) {
|
|
+ logger.error(offsetInfo, "FileSizeIncorrect", throwable);
|
|
+ } else if (fileSize == commitPosition) {
|
|
+ logger.warn(offsetInfo, "CommitFailed", throwable);
|
|
+ } else if (fileSize > commitPosition) {
|
|
+ logger.warn(offsetInfo, "PartialSuccess", throwable);
|
|
}
|
|
+ commitPosition = fileSize;
|
|
return false;
|
|
}
|
|
}
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
|
|
index 5a0ca25f5..0db3eaf8f 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
|
|
@@ -18,7 +18,7 @@ package org.apache.rocketmq.tieredstore.provider;
|
|
|
|
import java.nio.ByteBuffer;
|
|
import java.util.concurrent.CompletableFuture;
|
|
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
|
|
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
|
|
|
|
public interface TieredStoreProvider {
|
|
|
|
@@ -30,7 +30,9 @@ public interface TieredStoreProvider {
|
|
String getPath();
|
|
|
|
/**
|
|
- * Get file size in backend file system
|
|
+ * Get the real length of the file.
|
|
+ * Return 0 if the file does not exist,
|
|
+     * Return -1 if getting the file size failed.
|
|
*
|
|
* @return file real size
|
|
*/
|
|
@@ -71,5 +73,5 @@ public interface TieredStoreProvider {
|
|
* @param append try to append or create a new file
|
|
* @return put result, <code>true</code> if data successfully write; <code>false</code> otherwise
|
|
*/
|
|
- CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream,long position, int length, boolean append);
|
|
+ CompletableFuture<Boolean> commit0(FileSegmentInputStream inputStream,long position, int length, boolean append);
|
|
}
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
|
|
index 52be90b1d..7e949cb28 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
|
|
@@ -36,7 +36,7 @@ import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
|
|
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
|
|
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
|
|
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
|
|
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
|
|
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
|
|
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
|
|
|
import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
|
|
@@ -184,7 +184,7 @@ public class PosixFileSegment extends TieredFileSegment {
|
|
|
|
@Override
|
|
public CompletableFuture<Boolean> commit0(
|
|
- TieredFileSegmentInputStream inputStream, long position, int length, boolean append) {
|
|
+ FileSegmentInputStream inputStream, long position, int length, boolean append) {
|
|
|
|
Stopwatch stopwatch = Stopwatch.createStarted();
|
|
AttributesBuilder attributesBuilder = newAttributesBuilder()
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java
|
|
similarity index 88%
|
|
rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
|
|
rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java
|
|
index c70bb7656..13b6e0ef9 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java
|
|
@@ -15,7 +15,7 @@
|
|
* limitations under the License.
|
|
*/
|
|
|
|
-package org.apache.rocketmq.tieredstore.provider.inputstream;
|
|
+package org.apache.rocketmq.tieredstore.provider.stream;
|
|
|
|
import java.io.IOException;
|
|
import java.nio.ByteBuffer;
|
|
@@ -23,20 +23,23 @@ import java.util.List;
|
|
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
|
|
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
|
|
|
|
-public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
|
|
+public class CommitLogInputStream extends FileSegmentInputStream {
|
|
|
|
/**
|
|
* commitLogOffset is the real physical offset of the commitLog buffer which is being read
|
|
*/
|
|
+ private final long startCommitLogOffset;
|
|
+
|
|
private long commitLogOffset;
|
|
|
|
private final ByteBuffer codaBuffer;
|
|
|
|
private long markCommitLogOffset = -1;
|
|
|
|
- public TieredCommitLogInputStream(FileSegmentType fileType, long startOffset,
|
|
+ public CommitLogInputStream(FileSegmentType fileType, long startOffset,
|
|
List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) {
|
|
super(fileType, uploadBufferList, contentLength);
|
|
+ this.startCommitLogOffset = startOffset;
|
|
this.commitLogOffset = startOffset;
|
|
this.codaBuffer = codaBuffer;
|
|
}
|
|
@@ -53,6 +56,15 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
|
|
this.commitLogOffset = markCommitLogOffset;
|
|
}
|
|
|
|
+ @Override
|
|
+ public synchronized void rewind() {
|
|
+ super.rewind();
|
|
+ this.commitLogOffset = this.startCommitLogOffset;
|
|
+ if (this.codaBuffer != null) {
|
|
+ this.codaBuffer.rewind();
|
|
+ }
|
|
+ }
|
|
+
|
|
@Override
|
|
public ByteBuffer getCodaBuffer() {
|
|
return this.codaBuffer;
|
|
@@ -64,17 +76,17 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
|
|
return -1;
|
|
}
|
|
readPosition++;
|
|
- if (curReadBufferIndex >= uploadBufferList.size()) {
|
|
+ if (curReadBufferIndex >= bufferList.size()) {
|
|
return readCoda();
|
|
}
|
|
int res;
|
|
if (readPosInCurBuffer >= curBuffer.remaining()) {
|
|
curReadBufferIndex++;
|
|
- if (curReadBufferIndex >= uploadBufferList.size()) {
|
|
+ if (curReadBufferIndex >= bufferList.size()) {
|
|
readPosInCurBuffer = 0;
|
|
return readCoda();
|
|
}
|
|
- curBuffer = uploadBufferList.get(curReadBufferIndex);
|
|
+ curBuffer = bufferList.get(curReadBufferIndex);
|
|
commitLogOffset += readPosInCurBuffer;
|
|
readPosInCurBuffer = 0;
|
|
}
|
|
@@ -119,9 +131,9 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
|
|
int posInCurBuffer = readPosInCurBuffer;
|
|
long curCommitLogOffset = commitLogOffset;
|
|
ByteBuffer curBuf = curBuffer;
|
|
- while (needRead > 0 && bufIndex <= uploadBufferList.size()) {
|
|
+ while (needRead > 0 && bufIndex <= bufferList.size()) {
|
|
int readLen, remaining, realReadLen = 0;
|
|
- if (bufIndex == uploadBufferList.size()) {
|
|
+ if (bufIndex == bufferList.size()) {
|
|
// read from coda buffer
|
|
remaining = codaBuffer.remaining() - posInCurBuffer;
|
|
readLen = Math.min(remaining, needRead);
|
|
@@ -137,7 +149,7 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
|
|
}
|
|
remaining = curBuf.remaining() - posInCurBuffer;
|
|
readLen = Math.min(remaining, needRead);
|
|
- curBuf = uploadBufferList.get(bufIndex);
|
|
+ curBuf = bufferList.get(bufIndex);
|
|
if (posInCurBuffer < MessageBufferUtil.PHYSICAL_OFFSET_POSITION) {
|
|
realReadLen = Math.min(MessageBufferUtil.PHYSICAL_OFFSET_POSITION - posInCurBuffer, readLen);
|
|
// read from commitLog buffer
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java
|
|
similarity index 77%
|
|
rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
|
|
rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java
|
|
index e1758ca93..9e9d5135c 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java
|
|
@@ -15,15 +15,16 @@
|
|
* limitations under the License.
|
|
*/
|
|
|
|
-package org.apache.rocketmq.tieredstore.provider.inputstream;
|
|
+package org.apache.rocketmq.tieredstore.provider.stream;
|
|
|
|
import java.io.IOException;
|
|
import java.io.InputStream;
|
|
import java.nio.ByteBuffer;
|
|
import java.util.List;
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
|
|
|
|
-public class TieredFileSegmentInputStream extends InputStream {
|
|
+public class FileSegmentInputStream extends InputStream {
|
|
|
|
/**
|
|
* file type, can be commitlog, consume queue or indexfile now
|
|
@@ -33,7 +34,7 @@ public class TieredFileSegmentInputStream extends InputStream {
|
|
/**
|
|
* hold bytebuffer
|
|
*/
|
|
- protected final List<ByteBuffer> uploadBufferList;
|
|
+ protected final List<ByteBuffer> bufferList;
|
|
|
|
/**
|
|
* total remaining of bytebuffer list
|
|
@@ -65,13 +66,13 @@ public class TieredFileSegmentInputStream extends InputStream {
|
|
|
|
private int markReadPosInCurBuffer = -1;
|
|
|
|
- public TieredFileSegmentInputStream(FileSegmentType fileType, List<ByteBuffer> uploadBufferList,
|
|
- int contentLength) {
|
|
+ public FileSegmentInputStream(
|
|
+ FileSegmentType fileType, List<ByteBuffer> bufferList, int contentLength) {
|
|
this.fileType = fileType;
|
|
this.contentLength = contentLength;
|
|
- this.uploadBufferList = uploadBufferList;
|
|
- if (uploadBufferList != null && uploadBufferList.size() > 0) {
|
|
- this.curBuffer = uploadBufferList.get(curReadBufferIndex);
|
|
+ this.bufferList = bufferList;
|
|
+ if (bufferList != null && bufferList.size() > 0) {
|
|
+ this.curBuffer = bufferList.get(curReadBufferIndex);
|
|
}
|
|
}
|
|
|
|
@@ -95,18 +96,34 @@ public class TieredFileSegmentInputStream extends InputStream {
|
|
this.readPosition = markReadPosition;
|
|
this.curReadBufferIndex = markCurReadBufferIndex;
|
|
this.readPosInCurBuffer = markReadPosInCurBuffer;
|
|
- if (this.curReadBufferIndex < uploadBufferList.size()) {
|
|
- this.curBuffer = uploadBufferList.get(curReadBufferIndex);
|
|
+ if (this.curReadBufferIndex < bufferList.size()) {
|
|
+ this.curBuffer = bufferList.get(curReadBufferIndex);
|
|
}
|
|
}
|
|
|
|
+ public synchronized void rewind() {
|
|
+ this.readPosition = 0;
|
|
+ this.curReadBufferIndex = 0;
|
|
+ this.readPosInCurBuffer = 0;
|
|
+ if (CollectionUtils.isNotEmpty(bufferList)) {
|
|
+ this.curBuffer = bufferList.get(0);
|
|
+ for (ByteBuffer buffer : bufferList) {
|
|
+ buffer.rewind();
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public int getContentLength() {
|
|
+ return contentLength;
|
|
+ }
|
|
+
|
|
@Override
|
|
public int available() {
|
|
return contentLength - readPosition;
|
|
}
|
|
|
|
- public List<ByteBuffer> getUploadBufferList() {
|
|
- return uploadBufferList;
|
|
+ public List<ByteBuffer> getBufferList() {
|
|
+ return bufferList;
|
|
}
|
|
|
|
public ByteBuffer getCodaBuffer() {
|
|
@@ -121,10 +138,10 @@ public class TieredFileSegmentInputStream extends InputStream {
|
|
readPosition++;
|
|
if (readPosInCurBuffer >= curBuffer.remaining()) {
|
|
curReadBufferIndex++;
|
|
- if (curReadBufferIndex >= uploadBufferList.size()) {
|
|
+ if (curReadBufferIndex >= bufferList.size()) {
|
|
return -1;
|
|
}
|
|
- curBuffer = uploadBufferList.get(curReadBufferIndex);
|
|
+ curBuffer = bufferList.get(curReadBufferIndex);
|
|
readPosInCurBuffer = 0;
|
|
}
|
|
return curBuffer.get(readPosInCurBuffer++) & 0xff;
|
|
@@ -153,8 +170,8 @@ public class TieredFileSegmentInputStream extends InputStream {
|
|
int bufIndex = curReadBufferIndex;
|
|
int posInCurBuffer = readPosInCurBuffer;
|
|
ByteBuffer curBuf = curBuffer;
|
|
- while (needRead > 0 && bufIndex < uploadBufferList.size()) {
|
|
- curBuf = uploadBufferList.get(bufIndex);
|
|
+ while (needRead > 0 && bufIndex < bufferList.size()) {
|
|
+ curBuf = bufferList.get(bufIndex);
|
|
int remaining = curBuf.remaining() - posInCurBuffer;
|
|
int readLen = Math.min(remaining, needRead);
|
|
// read from curBuf
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java
|
|
similarity index 54%
|
|
rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
|
|
rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java
|
|
index d0c983fd4..a90baff3a 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java
|
|
@@ -15,30 +15,34 @@
|
|
* limitations under the License.
|
|
*/
|
|
|
|
-package org.apache.rocketmq.tieredstore.provider.inputstream;
|
|
+package org.apache.rocketmq.tieredstore.provider.stream;
|
|
|
|
import java.nio.ByteBuffer;
|
|
import java.util.List;
|
|
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
|
|
|
|
-public class TieredFileSegmentInputStreamFactory {
|
|
+public class FileSegmentInputStreamFactory {
|
|
|
|
- public static TieredFileSegmentInputStream build(FileSegmentType fileType,
|
|
- long startOffset, List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) {
|
|
+ public static FileSegmentInputStream build(
|
|
+ FileSegmentType fileType, long offset, List<ByteBuffer> bufferList, ByteBuffer byteBuffer, int length) {
|
|
+
|
|
+ if (bufferList == null) {
|
|
+ throw new IllegalArgumentException("bufferList is null");
|
|
+ }
|
|
|
|
switch (fileType) {
|
|
case COMMIT_LOG:
|
|
- return new TieredCommitLogInputStream(
|
|
- fileType, startOffset, uploadBufferList, codaBuffer, contentLength);
|
|
+ return new CommitLogInputStream(
|
|
+ fileType, offset, bufferList, byteBuffer, length);
|
|
case CONSUME_QUEUE:
|
|
- return new TieredFileSegmentInputStream(fileType, uploadBufferList, contentLength);
|
|
+ return new FileSegmentInputStream(fileType, bufferList, length);
|
|
case INDEX:
|
|
- if (uploadBufferList.size() != 1) {
|
|
- throw new IllegalArgumentException("uploadBufferList size in INDEX type input stream must be 1");
|
|
+ if (bufferList.size() != 1) {
|
|
+ throw new IllegalArgumentException("buffer block size must be 1 when file type is IndexFile");
|
|
}
|
|
- return new TieredFileSegmentInputStream(fileType, uploadBufferList, contentLength);
|
|
+ return new FileSegmentInputStream(fileType, bufferList, length);
|
|
default:
|
|
- throw new IllegalArgumentException("fileType is not supported");
|
|
+ throw new IllegalArgumentException("file type is not supported");
|
|
}
|
|
}
|
|
}
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
|
|
index 8601392e7..2451199c2 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
|
|
@@ -130,36 +130,36 @@ public class TieredMessageStoreTest {
|
|
// TieredStorageLevel.DISABLE
|
|
properties.setProperty("tieredStorageLevel", "0");
|
|
configuration.update(properties);
|
|
- Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0));
|
|
+ Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0));
|
|
|
|
// TieredStorageLevel.NOT_IN_DISK
|
|
properties.setProperty("tieredStorageLevel", "1");
|
|
configuration.update(properties);
|
|
when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(false);
|
|
- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0));
|
|
+ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0));
|
|
|
|
when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true);
|
|
- Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0));
|
|
+ Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0));
|
|
|
|
// TieredStorageLevel.NOT_IN_MEM
|
|
properties.setProperty("tieredStorageLevel", "2");
|
|
configuration.update(properties);
|
|
Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(false);
|
|
Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(true);
|
|
- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0));
|
|
+ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0));
|
|
|
|
Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true);
|
|
Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(false);
|
|
- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0));
|
|
+ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0));
|
|
|
|
Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true);
|
|
Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(true);
|
|
- Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0));
|
|
+ Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0));
|
|
|
|
// TieredStorageLevel.FORCE
|
|
properties.setProperty("tieredStorageLevel", "3");
|
|
configuration.update(properties);
|
|
- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0));
|
|
+ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0));
|
|
}
|
|
|
|
@Test
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
|
|
index cc39cfbfc..7a4d05969 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
|
|
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
|
|
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
|
|
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
|
|
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
|
|
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
|
|
import org.apache.rocketmq.tieredstore.metadata.FileSegmentMetadata;
|
|
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
|
|
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
|
|
@@ -55,6 +56,7 @@ public class TieredFlatFileTest {
|
|
public void tearDown() throws IOException {
|
|
TieredStoreTestUtil.destroyMetadataStore();
|
|
TieredStoreTestUtil.destroyTempDir(storePath);
|
|
+ TieredStoreExecutor.shutdown();
|
|
}
|
|
|
|
private List<FileSegmentMetadata> getSegmentMetadataList(TieredMetadataStore metadataStore) {
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
|
|
index 262d6645b..2da72bc7a 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
|
|
@@ -87,5 +87,7 @@ public class TieredIndexFileTest {
|
|
|
|
indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
|
|
Assert.assertEquals(1, indexList.size());
|
|
+
|
|
+ indexFile.destroy();
|
|
}
|
|
}
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java
|
|
similarity index 82%
|
|
rename from tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java
|
|
rename to tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java
|
|
index a6566b7de..3bbe41dd4 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java
|
|
@@ -20,13 +20,13 @@ package org.apache.rocketmq.tieredstore.provider;
|
|
import java.io.InputStream;
|
|
import java.nio.ByteBuffer;
|
|
import java.util.List;
|
|
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
|
|
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
|
|
|
|
-public class MockTieredFileSegmentInputStream extends TieredFileSegmentInputStream {
|
|
+public class MockFileSegmentInputStream extends FileSegmentInputStream {
|
|
|
|
private final InputStream inputStream;
|
|
|
|
- public MockTieredFileSegmentInputStream(InputStream inputStream) {
|
|
+ public MockFileSegmentInputStream(InputStream inputStream) {
|
|
super(null, null, Integer.MAX_VALUE);
|
|
this.inputStream = inputStream;
|
|
}
|
|
@@ -43,7 +43,7 @@ public class MockTieredFileSegmentInputStream extends TieredFileSegmentInputStre
|
|
}
|
|
|
|
@Override
|
|
- public List<ByteBuffer> getUploadBufferList() {
|
|
+ public List<ByteBuffer> getBufferList() {
|
|
return null;
|
|
}
|
|
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
|
|
index a2554ba3d..743d9182c 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
|
|
@@ -28,8 +28,8 @@ import java.util.Random;
|
|
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
|
|
import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
|
|
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
|
|
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
|
|
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
|
|
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
|
|
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory;
|
|
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
|
|
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
|
|
import org.junit.Assert;
|
|
@@ -57,7 +57,7 @@ public class TieredFileSegmentInputStreamTest {
|
|
bufferSize += byteBuffer.remaining();
|
|
}
|
|
|
|
- // build expected byte buffer for verifying the TieredFileSegmentInputStream
|
|
+ // build expected byte buffer for verifying the FileSegmentInputStream
|
|
ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
|
|
for (ByteBuffer byteBuffer : uploadBufferList) {
|
|
expectedByteBuffer.put(byteBuffer);
|
|
@@ -74,7 +74,7 @@ public class TieredFileSegmentInputStreamTest {
|
|
int[] batchReadSizeTestSet = {
|
|
MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 1, MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtil.PHYSICAL_OFFSET_POSITION + 1, MSG_LEN - 1, MSG_LEN, MSG_LEN + 1
|
|
};
|
|
- verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
|
|
+ verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build(
|
|
FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), finalBufferSize, batchReadSizeTestSet);
|
|
|
|
}
|
|
@@ -98,7 +98,7 @@ public class TieredFileSegmentInputStreamTest {
|
|
int codaBufferSize = codaBuffer.remaining();
|
|
bufferSize += codaBufferSize;
|
|
|
|
- // build expected byte buffer for verifying the TieredFileSegmentInputStream
|
|
+ // build expected byte buffer for verifying the FileSegmentInputStream
|
|
ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
|
|
for (ByteBuffer byteBuffer : uploadBufferList) {
|
|
expectedByteBuffer.put(byteBuffer);
|
|
@@ -119,7 +119,7 @@ public class TieredFileSegmentInputStreamTest {
|
|
MSG_LEN - 1, MSG_LEN, MSG_LEN + 1,
|
|
bufferSize - 1, bufferSize, bufferSize + 1
|
|
};
|
|
- verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
|
|
+ verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build(
|
|
FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, uploadBufferList, codaBuffer, finalBufferSize), finalBufferSize, batchReadSizeTestSet);
|
|
|
|
}
|
|
@@ -134,7 +134,7 @@ public class TieredFileSegmentInputStreamTest {
|
|
bufferSize += byteBuffer.remaining();
|
|
}
|
|
|
|
- // build expected byte buffer for verifying the TieredFileSegmentInputStream
|
|
+ // build expected byte buffer for verifying the FileSegmentInputStream
|
|
ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
|
|
for (ByteBuffer byteBuffer : uploadBufferList) {
|
|
expectedByteBuffer.put(byteBuffer);
|
|
@@ -143,7 +143,7 @@ public class TieredFileSegmentInputStreamTest {
|
|
|
|
int finalBufferSize = bufferSize;
|
|
int[] batchReadSizeTestSet = {TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE - 1, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE + 1};
|
|
- verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
|
|
+ verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build(
|
|
FileSegmentType.CONSUME_QUEUE, COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), bufferSize, batchReadSizeTestSet);
|
|
}
|
|
|
|
@@ -156,16 +156,16 @@ public class TieredFileSegmentInputStreamTest {
|
|
byteBuffer.flip();
|
|
List<ByteBuffer> uploadBufferList = Arrays.asList(byteBuffer);
|
|
|
|
- // build expected byte buffer for verifying the TieredFileSegmentInputStream
|
|
+ // build expected byte buffer for verifying the FileSegmentInputStream
|
|
ByteBuffer expectedByteBuffer = byteBuffer.slice();
|
|
|
|
- verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
|
|
+ verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build(
|
|
FileSegmentType.INDEX, COMMIT_LOG_START_OFFSET, uploadBufferList, null, byteBuffer.limit()), byteBuffer.limit(), new int[] {23, 24, 25});
|
|
}
|
|
|
|
- private void verifyReadAndReset(ByteBuffer expectedByteBuffer, Supplier<TieredFileSegmentInputStream> constructor,
|
|
+ private void verifyReadAndReset(ByteBuffer expectedByteBuffer, Supplier<FileSegmentInputStream> constructor,
|
|
int bufferSize, int[] readBatchSizeTestSet) {
|
|
- TieredFileSegmentInputStream inputStream = constructor.get();
|
|
+ FileSegmentInputStream inputStream = constructor.get();
|
|
|
|
// verify
|
|
verifyInputStream(inputStream, expectedByteBuffer);
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
|
|
index 4cd83e0d2..a655710a5 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
|
|
@@ -116,13 +116,22 @@ public class TieredFileSegmentTest {
|
|
}
|
|
|
|
@Test
|
|
- public void testCommitFailed() {
|
|
+ public void testCommitFailedThenSuccess() {
|
|
long startTime = System.currentTimeMillis();
|
|
MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(FileSegmentType.COMMIT_LOG);
|
|
long lastSize = segment.getSize();
|
|
- segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
|
|
- segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
|
|
-
|
|
+ segment.setCheckSize(false);
|
|
+ segment.initPosition(lastSize);
|
|
+ segment.setSize((int) lastSize);
|
|
+
|
|
+ ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
|
|
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize);
|
|
+ ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
|
|
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN);
|
|
+ segment.append(buffer1, 0);
|
|
+ segment.append(buffer2, 0);
|
|
+
|
|
+        // Mock a new message arriving
|
|
segment.blocker = new CompletableFuture<>();
|
|
new Thread(() -> {
|
|
try {
|
|
@@ -131,20 +140,88 @@ public class TieredFileSegmentTest {
|
|
Assert.fail(e.getMessage());
|
|
}
|
|
ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
|
|
+ buffer.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN * 2);
|
|
buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime);
|
|
segment.append(buffer, 0);
|
|
segment.blocker.complete(false);
|
|
}).start();
|
|
|
|
+ // Commit failed
|
|
segment.commit();
|
|
segment.blocker.join();
|
|
+ segment.blocker = null;
|
|
+
|
|
+ // Copy data and assume commit success
|
|
+ segment.getMemStore().put(buffer1);
|
|
+ segment.getMemStore().put(buffer2);
|
|
+ segment.setSize((int) (lastSize + MessageBufferUtilTest.MSG_LEN * 2));
|
|
|
|
- segment.blocker = new CompletableFuture<>();
|
|
- segment.blocker.complete(true);
|
|
segment.commit();
|
|
+ Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitPosition());
|
|
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset());
|
|
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
|
|
+
|
|
+ ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN);
|
|
+ Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1));
|
|
+
|
|
+ ByteBuffer msg2 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN);
|
|
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtil.getCommitLogOffset(msg2));
|
|
+
|
|
+ ByteBuffer msg3 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN);
|
|
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtil.getCommitLogOffset(msg3));
|
|
+ }
|
|
+
|
|
+ @Test
|
|
+ public void testCommitFailed3Times() {
|
|
+ long startTime = System.currentTimeMillis();
|
|
+ MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(FileSegmentType.COMMIT_LOG);
|
|
+ long lastSize = segment.getSize();
|
|
+ segment.setCheckSize(false);
|
|
+ segment.initPosition(lastSize);
|
|
+ segment.setSize((int) lastSize);
|
|
+
|
|
+ ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
|
|
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize);
|
|
+ ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
|
|
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN);
|
|
+ segment.append(buffer1, 0);
|
|
+ segment.append(buffer2, 0);
|
|
+
|
|
+        // Mock a new message arriving
|
|
+ segment.blocker = new CompletableFuture<>();
|
|
+ new Thread(() -> {
|
|
+ try {
|
|
+ Thread.sleep(3000);
|
|
+ } catch (InterruptedException e) {
|
|
+ Assert.fail(e.getMessage());
|
|
+ }
|
|
+ ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
|
|
+ buffer.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN * 2);
|
|
+ buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime);
|
|
+ segment.append(buffer, 0);
|
|
+ segment.blocker.complete(false);
|
|
+ }).start();
|
|
+
|
|
+ for (int i = 0; i < 3; i++) {
|
|
+ segment.commit();
|
|
+ }
|
|
+
|
|
+ Assert.assertEquals(lastSize, segment.getCommitPosition());
|
|
+ Assert.assertEquals(baseOffset + lastSize, segment.getCommitOffset());
|
|
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
|
|
+
|
|
+ segment.blocker.join();
|
|
+ segment.blocker = null;
|
|
|
|
+ segment.commit();
|
|
+ Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 2, segment.getCommitPosition());
|
|
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, segment.getCommitOffset());
|
|
Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
|
|
+
|
|
+ segment.commit();
|
|
+ Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitPosition());
|
|
Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset());
|
|
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
|
|
|
|
ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN);
|
|
Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1));
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
|
|
index cb155cf8f..80ad41f68 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
|
|
@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
|
|
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
|
|
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
|
|
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
|
|
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
|
|
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
|
|
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
|
import org.junit.Assert;
|
|
|
|
@@ -33,6 +33,8 @@ public class MemoryFileSegment extends TieredFileSegment {
|
|
|
|
public CompletableFuture<Boolean> blocker;
|
|
|
|
+ protected int size = 0;
|
|
+
|
|
protected boolean checkSize = true;
|
|
|
|
public MemoryFileSegment(FileSegmentType fileType, MessageQueue messageQueue, long baseOffset,
|
|
@@ -56,6 +58,18 @@ public class MemoryFileSegment extends TieredFileSegment {
|
|
memStore.position((int) getSize());
|
|
}
|
|
|
|
+ public boolean isCheckSize() {
|
|
+ return checkSize;
|
|
+ }
|
|
+
|
|
+ public void setCheckSize(boolean checkSize) {
|
|
+ this.checkSize = checkSize;
|
|
+ }
|
|
+
|
|
+ public ByteBuffer getMemStore() {
|
|
+ return memStore;
|
|
+ }
|
|
+
|
|
@Override
|
|
public String getPath() {
|
|
return filePath;
|
|
@@ -66,7 +80,11 @@ public class MemoryFileSegment extends TieredFileSegment {
|
|
if (checkSize) {
|
|
return 1000;
|
|
}
|
|
- return 0;
|
|
+ return size;
|
|
+ }
|
|
+
|
|
+ public void setSize(int size) {
|
|
+ this.size = size;
|
|
}
|
|
|
|
@Override
|
|
@@ -85,11 +103,11 @@ public class MemoryFileSegment extends TieredFileSegment {
|
|
|
|
@Override
|
|
public CompletableFuture<Boolean> commit0(
|
|
- TieredFileSegmentInputStream inputStream, long position, int length, boolean append) {
|
|
+ FileSegmentInputStream inputStream, long position, int length, boolean append) {
|
|
|
|
try {
|
|
if (blocker != null && !blocker.get()) {
|
|
- throw new IllegalStateException();
|
|
+ throw new IllegalStateException("Commit Exception for Memory Test");
|
|
}
|
|
} catch (InterruptedException | ExecutionException e) {
|
|
Assert.fail(e.getMessage());
|
|
@@ -98,7 +116,6 @@ public class MemoryFileSegment extends TieredFileSegment {
|
|
Assert.assertTrue(!checkSize || position >= getSize());
|
|
|
|
byte[] buffer = new byte[1024];
|
|
-
|
|
int startPos = memStore.position();
|
|
try {
|
|
int len;
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
|
|
index 8ac330b37..630fd2223 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
|
|
@@ -22,7 +22,7 @@ import java.util.concurrent.ExecutionException;
|
|
import org.apache.rocketmq.common.message.MessageQueue;
|
|
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
|
|
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
|
|
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
|
|
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
|
|
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
|
import org.junit.Assert;
|
|
|
|
@@ -46,7 +46,7 @@ public class MemoryFileSegmentWithoutCheck extends MemoryFileSegment {
|
|
}
|
|
|
|
@Override
|
|
- public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length,
|
|
+ public CompletableFuture<Boolean> commit0(FileSegmentInputStream inputStream, long position, int length,
|
|
boolean append) {
|
|
try {
|
|
if (blocker != null && !blocker.get()) {
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From d000ef947d7c99918ceba0fa451c1e29fd84ba07 Mon Sep 17 00:00:00 2001
|
|
From: yuz10 <845238369@qq.com>
|
|
Date: Thu, 31 Aug 2023 09:41:33 +0800
|
|
Subject: [PATCH 3/7] [ISSUE #7283] Incorrect dledger commitlog min offset
|
|
after mappedFile re delete failed (#7284)
|
|
|
|
---
|
|
.../apache/rocketmq/store/dledger/DLedgerCommitLog.java | 7 ++++++-
|
|
1 file changed, 6 insertions(+), 1 deletion(-)
|
|
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
|
|
index ec5e86d70..d5f6acdc0 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
|
|
@@ -162,7 +162,12 @@ public class DLedgerCommitLog extends CommitLog {
|
|
if (!mappedFileQueue.getMappedFiles().isEmpty()) {
|
|
return mappedFileQueue.getMinOffset();
|
|
}
|
|
- return dLedgerFileList.getMinOffset();
|
|
+ for (MmapFile file : dLedgerFileList.getMappedFiles()) {
|
|
+ if (file.isAvailable()) {
|
|
+ return file.getFileFromOffset() + file.getStartPosition();
|
|
+ }
|
|
+ }
|
|
+ return 0;
|
|
}
|
|
|
|
@Override
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From f82718ae3b77a16b553c03f672dc971a2d5d48fa Mon Sep 17 00:00:00 2001
|
|
From: cnScarb <jjhfen00@163.com>
|
|
Date: Thu, 31 Aug 2023 15:50:10 +0800
|
|
Subject: [PATCH 4/7] [ISSUE #7208] fix: when deleting topic also delete its
|
|
pop retry topic (#7209)
|
|
|
|
---
|
|
.../processor/AdminBrokerProcessor.java | 24 ++++++++++---
|
|
.../processor/AdminBrokerProcessorTest.java | 36 +++++++++++++++++++
|
|
2 files changed, 55 insertions(+), 5 deletions(-)
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
|
index bbddcec2d..8fbcd3c94 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
|
@@ -51,6 +51,7 @@ import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
|
|
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
|
|
import org.apache.rocketmq.common.AclConfig;
|
|
import org.apache.rocketmq.common.BrokerConfig;
|
|
+import org.apache.rocketmq.common.KeyBuilder;
|
|
import org.apache.rocketmq.common.LockCallback;
|
|
import org.apache.rocketmq.common.MQVersion;
|
|
import org.apache.rocketmq.common.MixAll;
|
|
@@ -542,16 +543,29 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
|
}
|
|
}
|
|
|
|
- this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
|
|
- this.brokerController.getTopicQueueMappingManager().delete(requestHeader.getTopic());
|
|
- this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic());
|
|
- this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic());
|
|
- this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(requestHeader.getTopic()));
|
|
+ final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
|
|
+ // delete pop retry topics first
|
|
+ for (String group : groups) {
|
|
+ final String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
|
|
+ if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) {
|
|
+ deleteTopicInBroker(popRetryTopic);
|
|
+ }
|
|
+ }
|
|
+ // delete topic
|
|
+ deleteTopicInBroker(topic);
|
|
response.setCode(ResponseCode.SUCCESS);
|
|
response.setRemark(null);
|
|
return response;
|
|
}
|
|
|
|
+ private void deleteTopicInBroker(String topic) {
|
|
+ this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
|
|
+ this.brokerController.getTopicQueueMappingManager().delete(topic);
|
|
+ this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic);
|
|
+ this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic);
|
|
+ this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic));
|
|
+ }
|
|
+
|
|
private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx,
|
|
RemotingCommand request) throws RemotingCommandException {
|
|
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
|
|
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
|
index d33a217f7..9d17011b6 100644
|
|
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
|
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
|
@@ -29,6 +29,7 @@ import java.util.HashMap;
|
|
import java.util.Map;
|
|
import java.util.Properties;
|
|
import java.util.Set;
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.atomic.LongAdder;
|
|
import org.apache.rocketmq.broker.BrokerController;
|
|
@@ -41,6 +42,7 @@ import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager;
|
|
import org.apache.rocketmq.broker.topic.TopicConfigManager;
|
|
import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.common.BrokerConfig;
|
|
+import org.apache.rocketmq.common.KeyBuilder;
|
|
import org.apache.rocketmq.common.MixAll;
|
|
import org.apache.rocketmq.common.TopicConfig;
|
|
import org.apache.rocketmq.common.TopicFilterType;
|
|
@@ -90,8 +92,11 @@ import static org.assertj.core.api.Assertions.assertThat;
|
|
import static org.mockito.ArgumentMatchers.any;
|
|
import static org.mockito.ArgumentMatchers.anyInt;
|
|
import static org.mockito.ArgumentMatchers.anyLong;
|
|
+import static org.mockito.ArgumentMatchers.anySet;
|
|
import static org.mockito.ArgumentMatchers.anyString;
|
|
import static org.mockito.Mockito.mock;
|
|
+import static org.mockito.Mockito.times;
|
|
+import static org.mockito.Mockito.verify;
|
|
import static org.mockito.Mockito.when;
|
|
|
|
@RunWith(MockitoJUnitRunner.class)
|
|
@@ -321,6 +326,37 @@ public class AdminBrokerProcessorTest {
|
|
"please execute it from master broker.");
|
|
}
|
|
|
|
+ @Test
|
|
+ public void testDeleteWithPopRetryTopic() throws Exception {
|
|
+ String topic = "topicA";
|
|
+ String anotherTopic = "another_topicA";
|
|
+
|
|
+ topicConfigManager = mock(TopicConfigManager.class);
|
|
+ when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
|
|
+ final ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
|
|
+ topicConfigTable.put(topic, new TopicConfig());
|
|
+ topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid1"), new TopicConfig());
|
|
+
|
|
+ topicConfigTable.put(anotherTopic, new TopicConfig());
|
|
+ topicConfigTable.put(KeyBuilder.buildPopRetryTopic(anotherTopic, "cid2"), new TopicConfig());
|
|
+ when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable);
|
|
+ when(topicConfigManager.selectTopicConfig(anyString())).thenAnswer(invocation -> {
|
|
+ final String selectTopic = invocation.getArgument(0);
|
|
+ return topicConfigManager.getTopicConfigTable().get(selectTopic);
|
|
+ });
|
|
+
|
|
+ when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
|
|
+ when(consumerOffsetManager.whichGroupByTopic(topic)).thenReturn(Sets.newHashSet("cid1"));
|
|
+
|
|
+ RemotingCommand request = buildDeleteTopicRequest(topic);
|
|
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
|
|
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
|
|
+
|
|
+ verify(topicConfigManager).deleteTopicConfig(topic);
|
|
+ verify(topicConfigManager).deleteTopicConfig(KeyBuilder.buildPopRetryTopic(topic, "cid1"));
|
|
+ verify(messageStore, times(2)).deleteTopics(anySet());
|
|
+ }
|
|
+
|
|
@Test
|
|
public void testGetAllTopicConfigInRocksdb() throws Exception {
|
|
if (notToBeExecuted()) {
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 31d10385d1616445478104ce9ef463a8c4852ba2 Mon Sep 17 00:00:00 2001
|
|
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
|
|
Date: Mon, 4 Sep 2023 14:09:32 +0800
|
|
Subject: [PATCH 5/7] [ISSUE #7289] Fixed asynchronous send backpressure
|
|
capability
|
|
|
|
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
|
|
---
|
|
.../impl/producer/DefaultMQProducerImpl.java | 77 +++++++++++++------
|
|
1 file changed, 53 insertions(+), 24 deletions(-)
|
|
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
|
index bbbb17b07..2d6b83ac2 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
|
|
@@ -547,6 +547,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
@Deprecated
|
|
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
|
|
throws MQClientException, RemotingException, InterruptedException {
|
|
+ BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback);
|
|
+
|
|
final long beginStartTime = System.currentTimeMillis();
|
|
Runnable runnable = new Runnable() {
|
|
@Override
|
|
@@ -554,20 +556,53 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
long costTime = System.currentTimeMillis() - beginStartTime;
|
|
if (timeout > costTime) {
|
|
try {
|
|
- sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
|
|
+ sendDefaultImpl(msg, CommunicationMode.ASYNC, newCallBack, timeout - costTime);
|
|
} catch (Exception e) {
|
|
- sendCallback.onException(e);
|
|
+ newCallBack.onException(e);
|
|
}
|
|
} else {
|
|
- sendCallback.onException(
|
|
+ newCallBack.onException(
|
|
new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
|
|
}
|
|
}
|
|
};
|
|
- executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime);
|
|
+ executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
|
|
}
|
|
|
|
- public void executeAsyncMessageSend(Runnable runnable, final Message msg, final SendCallback sendCallback,
|
|
+ class BackpressureSendCallBack implements SendCallback {
|
|
+ public boolean isSemaphoreAsyncSizeAquired = false;
|
|
+ public boolean isSemaphoreAsyncNumAquired = false;
|
|
+ public int msgLen;
|
|
+ private final SendCallback sendCallback;
|
|
+
|
|
+ public BackpressureSendCallBack(final SendCallback sendCallback) {
|
|
+ this.sendCallback = sendCallback;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void onSuccess(SendResult sendResult) {
|
|
+ if (isSemaphoreAsyncSizeAquired) {
|
|
+ semaphoreAsyncSendSize.release(msgLen);
|
|
+ }
|
|
+ if (isSemaphoreAsyncNumAquired) {
|
|
+ semaphoreAsyncSendNum.release();
|
|
+ }
|
|
+ sendCallback.onSuccess(sendResult);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void onException(Throwable e) {
|
|
+ if (isSemaphoreAsyncSizeAquired) {
|
|
+ semaphoreAsyncSendSize.release(msgLen);
|
|
+ }
|
|
+ if (isSemaphoreAsyncNumAquired) {
|
|
+ semaphoreAsyncSendNum.release();
|
|
+ }
|
|
+ sendCallback.onException(e);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public void executeAsyncMessageSend(Runnable runnable, final Message msg, final BackpressureSendCallBack sendCallback,
|
|
final long timeout, final long beginStartTime)
|
|
throws MQClientException, InterruptedException {
|
|
ExecutorService executor = this.getAsyncSenderExecutor();
|
|
@@ -595,7 +630,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
return;
|
|
}
|
|
}
|
|
-
|
|
+ sendCallback.isSemaphoreAsyncSizeAquired = isSemaphoreAsyncSizeAquired;
|
|
+ sendCallback.isSemaphoreAsyncNumAquired = isSemaphoreAsyncNumAquired;
|
|
+ sendCallback.msgLen = msgLen;
|
|
executor.submit(runnable);
|
|
} catch (RejectedExecutionException e) {
|
|
if (isEnableBackpressureForAsyncMode) {
|
|
@@ -603,15 +640,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
} else {
|
|
throw new MQClientException("executor rejected ", e);
|
|
}
|
|
- } finally {
|
|
- if (isSemaphoreAsyncSizeAquired) {
|
|
- semaphoreAsyncSendSize.release(msgLen);
|
|
- }
|
|
- if (isSemaphoreAsyncNumAquired) {
|
|
- semaphoreAsyncSendNum.release();
|
|
- }
|
|
}
|
|
-
|
|
}
|
|
|
|
public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg,
|
|
@@ -1188,7 +1217,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
@Deprecated
|
|
public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout)
|
|
throws MQClientException, RemotingException, InterruptedException {
|
|
-
|
|
+ BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback);
|
|
final long beginStartTime = System.currentTimeMillis();
|
|
Runnable runnable = new Runnable() {
|
|
@Override
|
|
@@ -1203,22 +1232,22 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
long costTime = System.currentTimeMillis() - beginStartTime;
|
|
if (timeout > costTime) {
|
|
try {
|
|
- sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null,
|
|
+ sendKernelImpl(msg, mq, CommunicationMode.ASYNC, newCallBack, null,
|
|
timeout - costTime);
|
|
} catch (MQBrokerException e) {
|
|
throw new MQClientException("unknown exception", e);
|
|
}
|
|
} else {
|
|
- sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
|
|
+ newCallBack.onException(new RemotingTooMuchRequestException("call timeout"));
|
|
}
|
|
} catch (Exception e) {
|
|
- sendCallback.onException(e);
|
|
+ newCallBack.onException(e);
|
|
}
|
|
}
|
|
|
|
};
|
|
|
|
- executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime);
|
|
+ executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
|
|
}
|
|
|
|
/**
|
|
@@ -1315,7 +1344,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
public void send(final Message msg, final MessageQueueSelector selector, final Object arg,
|
|
final SendCallback sendCallback, final long timeout)
|
|
throws MQClientException, RemotingException, InterruptedException {
|
|
-
|
|
+ BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback);
|
|
final long beginStartTime = System.currentTimeMillis();
|
|
Runnable runnable = new Runnable() {
|
|
@Override
|
|
@@ -1324,21 +1353,21 @@ public class DefaultMQProducerImpl implements MQProducerInner {
|
|
if (timeout > costTime) {
|
|
try {
|
|
try {
|
|
- sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
|
|
+ sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, newCallBack,
|
|
timeout - costTime);
|
|
} catch (MQBrokerException e) {
|
|
throw new MQClientException("unknown exception", e);
|
|
}
|
|
} catch (Exception e) {
|
|
- sendCallback.onException(e);
|
|
+ newCallBack.onException(e);
|
|
}
|
|
} else {
|
|
- sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
|
|
+ newCallBack.onException(new RemotingTooMuchRequestException("call timeout"));
|
|
}
|
|
}
|
|
|
|
};
|
|
- executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime);
|
|
+ executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
|
|
}
|
|
|
|
/**
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From d67b9d64cbd53824798af57ba18770e0fcefa37a Mon Sep 17 00:00:00 2001
|
|
From: yuz10 <845238369@qq.com>
|
|
Date: Wed, 6 Sep 2023 14:07:23 +0800
|
|
Subject: [PATCH 6/7] [ISSUE #7302] Fix singleTopicRegister code deleted in
|
|
merge
|
|
|
|
---
|
|
.../apache/rocketmq/broker/topic/TopicConfigManager.java | 6 +++++-
|
|
1 file changed, 5 insertions(+), 1 deletion(-)
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
|
index 1c3b9711f..4e3c1736c 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
|
@@ -330,7 +330,11 @@ public class TopicConfigManager extends ConfigManager {
|
|
log.error("createTopicIfAbsent ", e);
|
|
}
|
|
if (createNew && register) {
|
|
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
|
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
|
+ this.brokerController.registerSingleTopicAll(topicConfig);
|
|
+ } else {
|
|
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
|
+ }
|
|
}
|
|
return getTopicConfig(topicConfig.getTopicName());
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 37017dbaec5c521fd529ef4aecf3658092884f84 Mon Sep 17 00:00:00 2001
|
|
From: lizhimins <707364882@qq.com>
|
|
Date: Wed, 6 Sep 2023 15:23:15 +0800
|
|
Subject: [PATCH 7/7] [ISSUE #7305] Fix metrics and transactional module not
|
|
shutdown while broker offline cause coredump(#7307)
|
|
|
|
---
|
|
.../java/org/apache/rocketmq/broker/BrokerController.java | 8 ++++++++
|
|
.../queue/TransactionalMessageServiceImpl.java | 4 +++-
|
|
2 files changed, 11 insertions(+), 1 deletion(-)
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
index e8f943702..6aba70cb2 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
@@ -1302,6 +1302,10 @@ public class BrokerController {
|
|
this.fastRemotingServer.shutdown();
|
|
}
|
|
|
|
+ if (this.brokerMetricsManager != null) {
|
|
+ this.brokerMetricsManager.shutdown();
|
|
+ }
|
|
+
|
|
if (this.brokerStatsManager != null) {
|
|
this.brokerStatsManager.shutdown();
|
|
}
|
|
@@ -1324,6 +1328,10 @@ public class BrokerController {
|
|
this.ackMessageProcessor.shutdownPopReviveService();
|
|
}
|
|
|
|
+ if (this.transactionalMessageService != null) {
|
|
+ this.transactionalMessageService.close();
|
|
+ }
|
|
+
|
|
if (this.notificationProcessor != null) {
|
|
this.notificationProcessor.getPopLongPollingService().shutdown();
|
|
}
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
|
|
index 93fa725a9..48db828e0 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
|
|
@@ -629,7 +629,9 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
|
|
|
|
@Override
|
|
public void close() {
|
|
-
|
|
+ if (this.transactionalOpBatchService != null) {
|
|
+ this.transactionalOpBatchService.shutdown();
|
|
+ }
|
|
}
|
|
|
|
public Message getOpMessage(int queueId, String moreData) {
|
|
--
|
|
2.32.0.windows.2
|
|
|