rocketmq/patch015-backport-fix-some-bugs.patch
2023-10-30 20:47:50 +08:00

1895 lines
96 KiB
Diff

From bd0e9c09db9748f7f74a0c707579142dccf30afc Mon Sep 17 00:00:00 2001
From: PiteXChen <44110731+RapperCL@users.noreply.github.com>
Date: Tue, 29 Aug 2023 19:39:27 +0800
Subject: [PATCH 1/7] [ISSUE #7111] Remove responseFuture from the
responseTable when exception occurs (#7112)
* remove responseFuture when exception
* Empty-Commit
---------
Co-authored-by: chenyong152 <chenyong152@midea.com>
---
.../apache/rocketmq/remoting/netty/NettyRemotingAbstract.java | 1 +
1 file changed, 1 insertion(+)
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 44d6a3df4..fce2de267 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -529,6 +529,7 @@ public abstract class NettyRemotingAbstract {
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
});
} catch (Exception e) {
+ responseTable.remove(opaque);
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
--
2.32.0.windows.2
From c78061bf6ca5f35452510ec4107c46735c51c316 Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Wed, 30 Aug 2023 22:29:51 +0800
Subject: [PATCH 2/7] [ISSUE#7280] Fix and refactor handle commit exception in
tiered storage (#7281)
* refactor handle commit exception
* refactor handle commit exception
* fix handle commit exception
---
.../tieredstore/TieredDispatcher.java | 3 +-
.../tieredstore/TieredMessageFetcher.java | 57 ++--
.../tieredstore/TieredMessageStore.java | 26 +-
.../provider/TieredFileSegment.java | 291 ++++++++++--------
.../provider/TieredStoreProvider.java | 8 +-
.../provider/posix/PosixFileSegment.java | 4 +-
.../CommitLogInputStream.java} | 30 +-
.../FileSegmentInputStream.java} | 49 ++-
.../FileSegmentInputStreamFactory.java} | 26 +-
.../tieredstore/TieredMessageStoreTest.java | 14 +-
.../tieredstore/file/TieredFlatFileTest.java | 2 +
.../tieredstore/file/TieredIndexFileTest.java | 2 +
...m.java => MockFileSegmentInputStream.java} | 8 +-
.../TieredFileSegmentInputStreamTest.java | 24 +-
.../provider/TieredFileSegmentTest.java | 89 +++++-
.../provider/memory/MemoryFileSegment.java | 27 +-
.../memory/MemoryFileSegmentWithoutCheck.java | 4 +-
17 files changed, 427 insertions(+), 237 deletions(-)
rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/{inputstream/TieredCommitLogInputStream.java => stream/CommitLogInputStream.java} (88%)
rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/{inputstream/TieredFileSegmentInputStream.java => stream/FileSegmentInputStream.java} (77%)
rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/{inputstream/TieredFileSegmentInputStreamFactory.java => stream/FileSegmentInputStreamFactory.java} (54%)
rename tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/{MockTieredFileSegmentInputStream.java => MockFileSegmentInputStream.java} (82%)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 1746190cd..430c2b62e 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -318,8 +318,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
continue;
case FILE_CLOSED:
tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue());
- logger.info("TieredDispatcher#dispatchFlatFile: file has been close and destroy, " +
- "topic: {}, queueId: {}", topic, queueId);
+ logger.info("File has been closed and destroy, topic: {}, queueId: {}", topic, queueId);
return;
default:
dispatchOffset--;
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index 9a9a3e5a5..766ff64f6 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -273,15 +273,17 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
TieredStoreMetricsManager.cacheHit.add(resultWrapperList.size(), attributes);
}
- // if no cached message found and there is currently an inflight request, wait for the request to end before continuing
+ // If there are no messages in the cache and a request is currently in flight,
+ // we need to wait for that request to return before continuing.
if (resultWrapperList.isEmpty() && waitInflightRequest) {
- CompletableFuture<Long> future = flatFile.getInflightRequest(group, queueOffset, maxCount)
- .getFuture(queueOffset);
+ CompletableFuture<Long> future =
+ flatFile.getInflightRequest(group, queueOffset, maxCount).getFuture(queueOffset);
if (!future.isDone()) {
Stopwatch stopwatch = Stopwatch.createStarted();
// to prevent starvation issues, only allow waiting for inflight request once
return future.thenCompose(v -> {
- LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: wait for inflight request cost: {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: wait for response cost: {}ms",
+ stopwatch.elapsed(TimeUnit.MILLISECONDS));
return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, false);
});
}
@@ -302,7 +304,8 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
// if cache hit, result will be returned immediately and asynchronously prefetch messages for later requests
if (!resultWrapperList.isEmpty()) {
- LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: cache hit: topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}",
+ LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: " +
+ "topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}",
mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size());
prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
@@ -316,8 +319,10 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
}
// if cache is miss, immediately pull messages
- LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: topic: {}, queue: {}, queue offset: {}, max message num: {}",
+ LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " +
+ "topic: {}, queue: {}, queue offset: {}, max message num: {}",
mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
+
CompletableFuture<GetMessageResult> resultFuture;
synchronized (flatFile) {
int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
@@ -453,42 +458,42 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
public CompletableFuture<GetMessageResult> getMessageAsync(
String group, String topic, int queueId, long queueOffset, int maxCount, final MessageFilter messageFilter) {
+ GetMessageResult result = new GetMessageResult();
CompositeQueueFlatFile flatFile = flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId));
+
if (flatFile == null) {
- GetMessageResult result = new GetMessageResult();
result.setNextBeginOffset(queueOffset);
result.setStatus(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE);
return CompletableFuture.completedFuture(result);
}
- GetMessageResult result = new GetMessageResult();
- long minQueueOffset = flatFile.getConsumeQueueMinOffset();
- long maxQueueOffset = flatFile.getConsumeQueueCommitOffset();
- result.setMinOffset(minQueueOffset);
- result.setMaxOffset(maxQueueOffset);
+ // Max queue offset means next message put position
+ result.setMinOffset(flatFile.getConsumeQueueMinOffset());
+ result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
+
+ // Fill the result according to the file offset.
+ // Offset range | Result | Fix to
+ // (-oo, 0] | no message | current offset
+ // (0, min) | too small | min offset
+ // [min, max) | correct |
+ // [max, max] | overflow one | max offset
+ // (max, +oo) | overflow badly | max offset
- if (flatFile.getConsumeQueueCommitOffset() <= 0) {
+ if (result.getMaxOffset() <= 0) {
result.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
result.setNextBeginOffset(queueOffset);
return CompletableFuture.completedFuture(result);
- }
-
- // request range | result
- // (0, min) | too small
- // [min, max) | correct
- // [max, max] | overflow one
- // (max, +oo) | overflow badly
- if (queueOffset < minQueueOffset) {
+ } else if (queueOffset < result.getMinOffset()) {
result.setStatus(GetMessageStatus.OFFSET_TOO_SMALL);
- result.setNextBeginOffset(flatFile.getConsumeQueueMinOffset());
+ result.setNextBeginOffset(result.getMinOffset());
return CompletableFuture.completedFuture(result);
- } else if (queueOffset == maxQueueOffset) {
+ } else if (queueOffset == result.getMaxOffset()) {
result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE);
- result.setNextBeginOffset(flatFile.getConsumeQueueCommitOffset());
+ result.setNextBeginOffset(result.getMaxOffset());
return CompletableFuture.completedFuture(result);
- } else if (queueOffset > maxQueueOffset) {
+ } else if (queueOffset > result.getMaxOffset()) {
result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
- result.setNextBeginOffset(flatFile.getConsumeQueueCommitOffset());
+ result.setNextBeginOffset(result.getMaxOffset());
return CompletableFuture.completedFuture(result);
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 5240ac8e9..78e855f36 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -99,11 +99,11 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
return storeConfig;
}
- public boolean viaTieredStorage(String topic, int queueId, long offset) {
- return viaTieredStorage(topic, queueId, offset, 1);
+ public boolean fetchFromCurrentStore(String topic, int queueId, long offset) {
+ return fetchFromCurrentStore(topic, queueId, offset, 1);
}
- public boolean viaTieredStorage(String topic, int queueId, long offset, int batchSize) {
+ public boolean fetchFromCurrentStore(String topic, int queueId, long offset, int batchSize) {
TieredMessageStoreConfig.TieredStorageLevel deepStorageLevel = storeConfig.getTieredStorageLevel();
if (deepStorageLevel.check(TieredMessageStoreConfig.TieredStorageLevel.FORCE)) {
@@ -146,8 +146,10 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
public CompletableFuture<GetMessageResult> getMessageAsync(String group, String topic,
int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) {
- if (!viaTieredStorage(topic, queueId, offset, maxMsgNums)) {
- logger.trace("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset);
+ if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
+ logger.trace("GetMessageAsync from current store, topic: {}, queue: {}, offset: {}", topic, queueId, offset);
+ } else {
+ logger.trace("GetMessageAsync from next store, topic: {}, queue: {}, offset: {}", topic, queueId, offset);
return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter);
}
@@ -168,14 +170,14 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
if (next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
TieredStoreMetricsManager.fallbackTotal.add(1, latencyAttributes);
- logger.debug("GetMessageAsync not found then try back to next store, result: {}, " +
+ logger.debug("GetMessageAsync not found, then back to next store, result: {}, " +
"topic: {}, queue: {}, queue offset: {}, offset range: {}-{}",
result.getStatus(), topic, queueId, offset, result.getMinOffset(), result.getMaxOffset());
return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
}
}
- // system topic
+ // Fetch system topic data from the broker when using the force level.
if (result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) {
if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) {
return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
@@ -198,7 +200,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
TieredStoreMetricsManager.messagesOutTotal.add(result.getMessageCount(), messagesOutAttributes);
}
- // fix min or max offset according next store
+ // Finally, fix the min or max offset according to the next store
long minOffsetInQueue = next.getMinOffsetInQueue(topic, queueId);
if (minOffsetInQueue >= 0 && minOffsetInQueue < result.getMinOffset()) {
result.setMinOffset(minOffsetInQueue);
@@ -209,7 +211,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
}
return result;
}).exceptionally(e -> {
- logger.error("GetMessageAsync from tiered store failed: ", e);
+ logger.error("GetMessageAsync from tiered store failed", e);
return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
});
}
@@ -251,7 +253,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
.build();
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
if (time < 0) {
- logger.debug("TieredMessageStore#getEarliestMessageTimeAsync: get earliest message time failed, try to get earliest message time from next store: topic: {}, queue: {}",
+ logger.debug("GetEarliestMessageTimeAsync failed, try to get earliest message time from next store: topic: {}, queue: {}",
topic, queueId);
return finalNextEarliestMessageTime != Long.MAX_VALUE ? finalNextEarliestMessageTime : -1;
}
@@ -262,7 +264,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
@Override
public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, int queueId,
long consumeQueueOffset) {
- if (viaTieredStorage(topic, queueId, consumeQueueOffset)) {
+ if (fetchFromCurrentStore(topic, queueId, consumeQueueOffset)) {
Stopwatch stopwatch = Stopwatch.createStarted();
return fetcher.getMessageStoreTimeStampAsync(topic, queueId, consumeQueueOffset)
.thenApply(time -> {
@@ -272,7 +274,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
.build();
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
if (time == -1) {
- logger.debug("TieredMessageStore#getMessageStoreTimeStampAsync: get message time failed, try to get message time from next store: topic: {}, queue: {}, queue offset: {}",
+ logger.debug("GetMessageStoreTimeStampAsync failed, try to get message time from next store, topic: {}, queue: {}, queue offset: {}",
topic, queueId, consumeQueueOffset);
return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset);
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
index 5062c7d9e..32911a6e8 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
@@ -16,14 +16,11 @@
*/
package org.apache.rocketmq.tieredstore.provider;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -35,8 +32,8 @@ import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -50,22 +47,23 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
protected final TieredMessageStoreConfig storeConfig;
private final long maxSize;
- private final ReentrantLock bufferLock;
- private final Semaphore commitLock;
+ private final ReentrantLock bufferLock = new ReentrantLock();
+ private final Semaphore commitLock = new Semaphore(1);
- private volatile boolean full;
- private volatile boolean closed;
+ private volatile boolean full = false;
+ private volatile boolean closed = false;
- private volatile long minTimestamp;
- private volatile long maxTimestamp;
- private volatile long commitPosition;
- private volatile long appendPosition;
+ private volatile long minTimestamp = Long.MAX_VALUE;
+ private volatile long maxTimestamp = Long.MAX_VALUE;
+ private volatile long commitPosition = 0L;
+ private volatile long appendPosition = 0L;
// only used in commitLog
- private volatile long dispatchCommitOffset = 0;
+ private volatile long dispatchCommitOffset = 0L;
private ByteBuffer codaBuffer;
- private List<ByteBuffer> uploadBufferList = new ArrayList<>();
+ private List<ByteBuffer> bufferList = new ArrayList<>();
+ private FileSegmentInputStream fileSegmentInputStream;
private CompletableFuture<Boolean> flightCommitRequest = CompletableFuture.completedFuture(false);
public TieredFileSegment(TieredMessageStoreConfig storeConfig,
@@ -75,21 +73,13 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
this.fileType = fileType;
this.filePath = filePath;
this.baseOffset = baseOffset;
-
- this.closed = false;
- this.bufferLock = new ReentrantLock();
- this.commitLock = new Semaphore(1);
-
- this.commitPosition = 0L;
- this.appendPosition = 0L;
- this.minTimestamp = Long.MAX_VALUE;
- this.maxTimestamp = Long.MAX_VALUE;
-
- // The max segment size of a file is determined by the file type
- this.maxSize = getMaxSizeAccordingFileType(storeConfig);
+ this.maxSize = getMaxSizeByFileType();
}
- private long getMaxSizeAccordingFileType(TieredMessageStoreConfig storeConfig) {
+ /**
+ * The max segment size of a file is determined by the file type
+ */
+ protected long getMaxSizeByFileType() {
switch (fileType) {
case COMMIT_LOG:
return storeConfig.getTieredStoreCommitLogMaxSize();
@@ -184,39 +174,23 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
this.appendPosition = pos;
}
- private List<ByteBuffer> rollingUploadBuffer() {
+ private List<ByteBuffer> borrowBuffer() {
bufferLock.lock();
try {
- List<ByteBuffer> tmp = uploadBufferList;
- uploadBufferList = new ArrayList<>();
+ List<ByteBuffer> tmp = bufferList;
+ bufferList = new ArrayList<>();
return tmp;
} finally {
bufferLock.unlock();
}
}
- private void sendBackBuffer(TieredFileSegmentInputStream inputStream) {
- bufferLock.lock();
- try {
- List<ByteBuffer> tmpBufferList = inputStream.getUploadBufferList();
- for (ByteBuffer buffer : tmpBufferList) {
- buffer.rewind();
- }
- tmpBufferList.addAll(uploadBufferList);
- uploadBufferList = tmpBufferList;
- if (inputStream.getCodaBuffer() != null) {
- codaBuffer.rewind();
- }
- } finally {
- bufferLock.unlock();
- }
- }
-
@SuppressWarnings("NonAtomicOperationOnVolatileField")
- public AppendResult append(ByteBuffer byteBuf, long timeStamp) {
+ public AppendResult append(ByteBuffer byteBuf, long timestamp) {
if (closed) {
return AppendResult.FILE_CLOSED;
}
+
bufferLock.lock();
try {
if (full || codaBuffer != null) {
@@ -227,7 +201,8 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
minTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION);
maxTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_END_TIME_STAMP_POSITION);
appendPosition += byteBuf.remaining();
- uploadBufferList.add(byteBuf);
+ // IndexFile is large and does not change after compaction, so no deep copy is needed
+ bufferList.add(byteBuf);
setFull();
return AppendResult.SUCCESS;
}
@@ -236,23 +211,34 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
setFull();
return AppendResult.FILE_FULL;
}
- if (uploadBufferList.size() > storeConfig.getTieredStoreGroupCommitCount()
+
+ if (bufferList.size() > storeConfig.getTieredStoreGroupCommitCount()
|| appendPosition - commitPosition > storeConfig.getTieredStoreGroupCommitSize()) {
commitAsync();
}
- if (uploadBufferList.size() > storeConfig.getTieredStoreMaxGroupCommitCount()) {
- logger.debug("TieredFileSegment#append: buffer full: file: {}, upload buffer size: {}",
- getPath(), uploadBufferList.size());
+
+ if (bufferList.size() > storeConfig.getTieredStoreMaxGroupCommitCount()) {
+ logger.debug("File segment append buffer full, file: {}, buffer size: {}, pending bytes: {}",
+ getPath(), bufferList.size(), appendPosition - commitPosition);
return AppendResult.BUFFER_FULL;
}
- if (timeStamp != Long.MAX_VALUE) {
- maxTimestamp = timeStamp;
+
+ if (timestamp != Long.MAX_VALUE) {
+ maxTimestamp = timestamp;
if (minTimestamp == Long.MAX_VALUE) {
- minTimestamp = timeStamp;
+ minTimestamp = timestamp;
}
}
+
appendPosition += byteBuf.remaining();
- uploadBufferList.add(byteBuf);
+
+ // deep copy buffer
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(byteBuf.remaining());
+ byteBuffer.put(byteBuf);
+ byteBuffer.flip();
+ byteBuf.rewind();
+
+ bufferList.add(byteBuffer);
return AppendResult.SUCCESS;
} finally {
bufferLock.unlock();
@@ -267,7 +253,6 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
return appendPosition;
}
- @VisibleForTesting
public void setAppendPosition(long appendPosition) {
this.appendPosition = appendPosition;
}
@@ -333,6 +318,8 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
if (closed) {
return false;
}
+ // result is false when we send real commit request
+ // use join to wait for the in-flight request to finish
Boolean result = commitAsync().join();
if (!result) {
result = flightCommitRequest.join();
@@ -340,92 +327,156 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
return result;
}
+ private void releaseCommitLock() {
+ if (commitLock.availablePermits() == 0) {
+ commitLock.release();
+ } else {
+ logger.error("[Bug] FileSegmentCommitAsync, lock is already released: available permits: {}",
+ commitLock.availablePermits());
+ }
+ }
+
+ private void updateDispatchCommitOffset(List<ByteBuffer> bufferList) {
+ if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) {
+ dispatchCommitOffset =
+ MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1));
+ }
+ }
+
+ /**
+ * @return future of true if nothing needed committing or the commit succeeded; false if closed, a commit is already in flight, or the commit failed
+ */
@SuppressWarnings("NonAtomicOperationOnVolatileField")
public CompletableFuture<Boolean> commitAsync() {
if (closed) {
return CompletableFuture.completedFuture(false);
}
- Stopwatch stopwatch = Stopwatch.createStarted();
+
if (!needCommit()) {
return CompletableFuture.completedFuture(true);
}
- try {
- int permits = commitLock.drainPermits();
- if (permits <= 0) {
- return CompletableFuture.completedFuture(false);
- }
- } catch (Exception e) {
+
+ if (commitLock.drainPermits() <= 0) {
return CompletableFuture.completedFuture(false);
}
- List<ByteBuffer> bufferList = rollingUploadBuffer();
- int bufferSize = 0;
- for (ByteBuffer buffer : bufferList) {
- bufferSize += buffer.remaining();
- }
- if (codaBuffer != null) {
- bufferSize += codaBuffer.remaining();
- }
- if (bufferSize == 0) {
- return CompletableFuture.completedFuture(true);
- }
- TieredFileSegmentInputStream inputStream = TieredFileSegmentInputStreamFactory.build(
- fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize);
- int finalBufferSize = bufferSize;
+
try {
- flightCommitRequest = commit0(inputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX)
+ if (fileSegmentInputStream != null) {
+ long fileSize = this.getSize();
+ if (fileSize == -1L) {
+ logger.error("Get commit position error before commit, Commit: {}, Expect: {}, Current Max: {}, FileName: {}",
+ commitPosition, commitPosition + fileSegmentInputStream.getContentLength(), appendPosition, getPath());
+ releaseCommitLock();
+ return CompletableFuture.completedFuture(false);
+ } else {
+ if (correctPosition(fileSize, null)) {
+ updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
+ fileSegmentInputStream = null;
+ }
+ }
+ }
+
+ int bufferSize;
+ if (fileSegmentInputStream != null) {
+ bufferSize = fileSegmentInputStream.available();
+ } else {
+ List<ByteBuffer> bufferList = borrowBuffer();
+ bufferSize = bufferList.stream().mapToInt(ByteBuffer::remaining).sum()
+ + (codaBuffer != null ? codaBuffer.remaining() : 0);
+ if (bufferSize == 0) {
+ releaseCommitLock();
+ return CompletableFuture.completedFuture(true);
+ }
+ fileSegmentInputStream = FileSegmentInputStreamFactory.build(
+ fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize);
+ }
+
+ return flightCommitRequest = this
+ .commit0(fileSegmentInputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX)
.thenApply(result -> {
if (result) {
- if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) {
- dispatchCommitOffset = MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1));
- }
- commitPosition += finalBufferSize;
+ updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
+ commitPosition += bufferSize;
+ fileSegmentInputStream = null;
return true;
- }
- sendBackBuffer(inputStream);
- return false;
- })
- .exceptionally(e -> handleCommitException(inputStream, e))
- .whenComplete((result, e) -> {
- if (commitLock.availablePermits() == 0) {
- logger.debug("TieredFileSegment#commitAsync: commit cost: {}ms, file: {}, item count: {}, buffer size: {}", stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(), finalBufferSize);
- commitLock.release();
} else {
- logger.error("[Bug]TieredFileSegment#commitAsync: commit lock is already released: available permits: {}", commitLock.availablePermits());
+ fileSegmentInputStream.rewind();
+ return false;
}
- });
- return flightCommitRequest;
+ })
+ .exceptionally(this::handleCommitException)
+ .whenComplete((result, e) -> releaseCommitLock());
+
} catch (Exception e) {
- handleCommitException(inputStream, e);
- if (commitLock.availablePermits() == 0) {
- logger.debug("TieredFileSegment#commitAsync: commit cost: {}ms, file: {}, item count: {}, buffer size: {}", stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(), finalBufferSize);
- commitLock.release();
- } else {
- logger.error("[Bug]TieredFileSegment#commitAsync: commit lock is already released: available permits: {}", commitLock.availablePermits());
- }
+ handleCommitException(e);
+ releaseCommitLock();
}
return CompletableFuture.completedFuture(false);
}
- private boolean handleCommitException(TieredFileSegmentInputStream inputStream, Throwable e) {
+ private long getCorrectFileSize(Throwable throwable) {
+ if (throwable instanceof TieredStoreException) {
+ long fileSize = ((TieredStoreException) throwable).getPosition();
+ if (fileSize > 0) {
+ return fileSize;
+ }
+ }
+ return getSize();
+ }
+
+ private boolean handleCommitException(Throwable e) {
+ // Get root cause here
Throwable cause = e.getCause() != null ? e.getCause() : e;
- sendBackBuffer(inputStream);
- long realSize = 0;
- if (cause instanceof TieredStoreException && ((TieredStoreException) cause).getPosition() > 0) {
- realSize = ((TieredStoreException) cause).getPosition();
+ long fileSize = this.getCorrectFileSize(cause);
+
+ if (fileSize == -1L) {
+ logger.error("Get commit position error, Commit: {}, Expect: {}, Current Max: {}, FileName: {}",
+ commitPosition, commitPosition + fileSegmentInputStream.getContentLength(), appendPosition, getPath());
+ fileSegmentInputStream.rewind();
+ return false;
+ }
+
+ if (correctPosition(fileSize, cause)) {
+ updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
+ fileSegmentInputStream = null;
+ return true;
+ } else {
+ fileSegmentInputStream.rewind();
+ return false;
}
- if (realSize <= 0) {
- realSize = getSize();
+ }
+
+ /**
+ * return true to clear buffer
+ */
+ private boolean correctPosition(long fileSize, Throwable throwable) {
+
+ // Currently there are three offsets here: commit offset, expected offset, and file size.
+ // We guarantee that the commit offset is less than or equal to the expected offset.
+ // Max offset will increase because we can continuously put in new buffers
+ String handleInfo = throwable == null ? "before commit" : "after commit";
+ long expectPosition = commitPosition + fileSegmentInputStream.getContentLength();
+
+ String offsetInfo = String.format("Correct Commit Position, %s, result=[{}], " +
+ "Commit: %d, Expect: %d, Current Max: %d, FileSize: %d, FileName: %s",
+ handleInfo, commitPosition, expectPosition, appendPosition, fileSize, this.getPath());
+
+ // We trust that the file size returned by the server is correct,
+ // so the commit offset can be reset to the file size reported by the storage system.
+ if (fileSize == expectPosition) {
+ logger.info(offsetInfo, "Success", throwable);
+ commitPosition = fileSize;
+ return true;
}
- if (realSize > 0 && realSize > commitPosition) {
- logger.error("TieredFileSegment#handleCommitException: commit failed: file: {}, try to fix position: origin: {}, real: {}", getPath(), commitPosition, realSize, cause);
- // TODO check if this diff part is uploaded to backend storage
- long diff = appendPosition - commitPosition;
- commitPosition = realSize;
- appendPosition = realSize + diff;
- // TODO check if appendPosition is large than maxOffset
- } else if (realSize < commitPosition) {
- logger.error("[Bug]TieredFileSegment#handleCommitException: commit failed: file: {}, can not fix position: origin: {}, real: {}", getPath(), commitPosition, realSize, cause);
+
+ if (fileSize < commitPosition) {
+ logger.error(offsetInfo, "FileSizeIncorrect", throwable);
+ } else if (fileSize == commitPosition) {
+ logger.warn(offsetInfo, "CommitFailed", throwable);
+ } else if (fileSize > commitPosition) {
+ logger.warn(offsetInfo, "PartialSuccess", throwable);
}
+ commitPosition = fileSize;
return false;
}
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
index 5a0ca25f5..0db3eaf8f 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.tieredstore.provider;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
public interface TieredStoreProvider {
@@ -30,7 +30,9 @@ public interface TieredStoreProvider {
String getPath();
/**
- * Get file size in backend file system
+ * Get the real length of the file.
+ * Returns 0 if the file does not exist,
+ * or -1 if the size could not be retrieved.
*
* @return file real size
*/
@@ -71,5 +73,5 @@ public interface TieredStoreProvider {
* @param append try to append or create a new file
* @return put result, <code>true</code> if data successfully write; <code>false</code> otherwise
*/
- CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream,long position, int length, boolean append);
+ CompletableFuture<Boolean> commit0(FileSegmentInputStream inputStream,long position, int length, boolean append);
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
index 52be90b1d..7e949cb28 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -36,7 +36,7 @@ import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
@@ -184,7 +184,7 @@ public class PosixFileSegment extends TieredFileSegment {
@Override
public CompletableFuture<Boolean> commit0(
- TieredFileSegmentInputStream inputStream, long position, int length, boolean append) {
+ FileSegmentInputStream inputStream, long position, int length, boolean append) {
Stopwatch stopwatch = Stopwatch.createStarted();
AttributesBuilder attributesBuilder = newAttributesBuilder()
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java
similarity index 88%
rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java
index c70bb7656..13b6e0ef9 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.tieredstore.provider.inputstream;
+package org.apache.rocketmq.tieredstore.provider.stream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -23,20 +23,23 @@ import java.util.List;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
-public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
+public class CommitLogInputStream extends FileSegmentInputStream {
/**
* commitLogOffset is the real physical offset of the commitLog buffer which is being read
*/
+ private final long startCommitLogOffset;
+
private long commitLogOffset;
private final ByteBuffer codaBuffer;
private long markCommitLogOffset = -1;
- public TieredCommitLogInputStream(FileSegmentType fileType, long startOffset,
+ public CommitLogInputStream(FileSegmentType fileType, long startOffset,
List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) {
super(fileType, uploadBufferList, contentLength);
+ this.startCommitLogOffset = startOffset;
this.commitLogOffset = startOffset;
this.codaBuffer = codaBuffer;
}
@@ -53,6 +56,15 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
this.commitLogOffset = markCommitLogOffset;
}
+ @Override
+ public synchronized void rewind() {
+ super.rewind();
+ this.commitLogOffset = this.startCommitLogOffset;
+ if (this.codaBuffer != null) {
+ this.codaBuffer.rewind();
+ }
+ }
+
@Override
public ByteBuffer getCodaBuffer() {
return this.codaBuffer;
@@ -64,17 +76,17 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
return -1;
}
readPosition++;
- if (curReadBufferIndex >= uploadBufferList.size()) {
+ if (curReadBufferIndex >= bufferList.size()) {
return readCoda();
}
int res;
if (readPosInCurBuffer >= curBuffer.remaining()) {
curReadBufferIndex++;
- if (curReadBufferIndex >= uploadBufferList.size()) {
+ if (curReadBufferIndex >= bufferList.size()) {
readPosInCurBuffer = 0;
return readCoda();
}
- curBuffer = uploadBufferList.get(curReadBufferIndex);
+ curBuffer = bufferList.get(curReadBufferIndex);
commitLogOffset += readPosInCurBuffer;
readPosInCurBuffer = 0;
}
@@ -119,9 +131,9 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
int posInCurBuffer = readPosInCurBuffer;
long curCommitLogOffset = commitLogOffset;
ByteBuffer curBuf = curBuffer;
- while (needRead > 0 && bufIndex <= uploadBufferList.size()) {
+ while (needRead > 0 && bufIndex <= bufferList.size()) {
int readLen, remaining, realReadLen = 0;
- if (bufIndex == uploadBufferList.size()) {
+ if (bufIndex == bufferList.size()) {
// read from coda buffer
remaining = codaBuffer.remaining() - posInCurBuffer;
readLen = Math.min(remaining, needRead);
@@ -137,7 +149,7 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
}
remaining = curBuf.remaining() - posInCurBuffer;
readLen = Math.min(remaining, needRead);
- curBuf = uploadBufferList.get(bufIndex);
+ curBuf = bufferList.get(bufIndex);
if (posInCurBuffer < MessageBufferUtil.PHYSICAL_OFFSET_POSITION) {
realReadLen = Math.min(MessageBufferUtil.PHYSICAL_OFFSET_POSITION - posInCurBuffer, readLen);
// read from commitLog buffer
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java
similarity index 77%
rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java
index e1758ca93..9e9d5135c 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java
@@ -15,15 +15,16 @@
* limitations under the License.
*/
-package org.apache.rocketmq.tieredstore.provider.inputstream;
+package org.apache.rocketmq.tieredstore.provider.stream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
-public class TieredFileSegmentInputStream extends InputStream {
+public class FileSegmentInputStream extends InputStream {
/**
* file type, can be commitlog, consume queue or indexfile now
@@ -33,7 +34,7 @@ public class TieredFileSegmentInputStream extends InputStream {
/**
* hold bytebuffer
*/
- protected final List<ByteBuffer> uploadBufferList;
+ protected final List<ByteBuffer> bufferList;
/**
* total remaining of bytebuffer list
@@ -65,13 +66,13 @@ public class TieredFileSegmentInputStream extends InputStream {
private int markReadPosInCurBuffer = -1;
- public TieredFileSegmentInputStream(FileSegmentType fileType, List<ByteBuffer> uploadBufferList,
- int contentLength) {
+ public FileSegmentInputStream(
+ FileSegmentType fileType, List<ByteBuffer> bufferList, int contentLength) {
this.fileType = fileType;
this.contentLength = contentLength;
- this.uploadBufferList = uploadBufferList;
- if (uploadBufferList != null && uploadBufferList.size() > 0) {
- this.curBuffer = uploadBufferList.get(curReadBufferIndex);
+ this.bufferList = bufferList;
+ if (bufferList != null && bufferList.size() > 0) {
+ this.curBuffer = bufferList.get(curReadBufferIndex);
}
}
@@ -95,18 +96,34 @@ public class TieredFileSegmentInputStream extends InputStream {
this.readPosition = markReadPosition;
this.curReadBufferIndex = markCurReadBufferIndex;
this.readPosInCurBuffer = markReadPosInCurBuffer;
- if (this.curReadBufferIndex < uploadBufferList.size()) {
- this.curBuffer = uploadBufferList.get(curReadBufferIndex);
+ if (this.curReadBufferIndex < bufferList.size()) {
+ this.curBuffer = bufferList.get(curReadBufferIndex);
}
}
+ public synchronized void rewind() {
+ this.readPosition = 0;
+ this.curReadBufferIndex = 0;
+ this.readPosInCurBuffer = 0;
+ if (CollectionUtils.isNotEmpty(bufferList)) {
+ this.curBuffer = bufferList.get(0);
+ for (ByteBuffer buffer : bufferList) {
+ buffer.rewind();
+ }
+ }
+ }
+
+ public int getContentLength() {
+ return contentLength;
+ }
+
@Override
public int available() {
return contentLength - readPosition;
}
- public List<ByteBuffer> getUploadBufferList() {
- return uploadBufferList;
+ public List<ByteBuffer> getBufferList() {
+ return bufferList;
}
public ByteBuffer getCodaBuffer() {
@@ -121,10 +138,10 @@ public class TieredFileSegmentInputStream extends InputStream {
readPosition++;
if (readPosInCurBuffer >= curBuffer.remaining()) {
curReadBufferIndex++;
- if (curReadBufferIndex >= uploadBufferList.size()) {
+ if (curReadBufferIndex >= bufferList.size()) {
return -1;
}
- curBuffer = uploadBufferList.get(curReadBufferIndex);
+ curBuffer = bufferList.get(curReadBufferIndex);
readPosInCurBuffer = 0;
}
return curBuffer.get(readPosInCurBuffer++) & 0xff;
@@ -153,8 +170,8 @@ public class TieredFileSegmentInputStream extends InputStream {
int bufIndex = curReadBufferIndex;
int posInCurBuffer = readPosInCurBuffer;
ByteBuffer curBuf = curBuffer;
- while (needRead > 0 && bufIndex < uploadBufferList.size()) {
- curBuf = uploadBufferList.get(bufIndex);
+ while (needRead > 0 && bufIndex < bufferList.size()) {
+ curBuf = bufferList.get(bufIndex);
int remaining = curBuf.remaining() - posInCurBuffer;
int readLen = Math.min(remaining, needRead);
// read from curBuf
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java
similarity index 54%
rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java
index d0c983fd4..a90baff3a 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java
@@ -15,30 +15,34 @@
* limitations under the License.
*/
-package org.apache.rocketmq.tieredstore.provider.inputstream;
+package org.apache.rocketmq.tieredstore.provider.stream;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
-public class TieredFileSegmentInputStreamFactory {
+public class FileSegmentInputStreamFactory {
- public static TieredFileSegmentInputStream build(FileSegmentType fileType,
- long startOffset, List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) {
+ public static FileSegmentInputStream build(
+ FileSegmentType fileType, long offset, List<ByteBuffer> bufferList, ByteBuffer byteBuffer, int length) {
+
+ if (bufferList == null) {
+ throw new IllegalArgumentException("bufferList is null");
+ }
switch (fileType) {
case COMMIT_LOG:
- return new TieredCommitLogInputStream(
- fileType, startOffset, uploadBufferList, codaBuffer, contentLength);
+ return new CommitLogInputStream(
+ fileType, offset, bufferList, byteBuffer, length);
case CONSUME_QUEUE:
- return new TieredFileSegmentInputStream(fileType, uploadBufferList, contentLength);
+ return new FileSegmentInputStream(fileType, bufferList, length);
case INDEX:
- if (uploadBufferList.size() != 1) {
- throw new IllegalArgumentException("uploadBufferList size in INDEX type input stream must be 1");
+ if (bufferList.size() != 1) {
+ throw new IllegalArgumentException("buffer block size must be 1 when file type is IndexFile");
}
- return new TieredFileSegmentInputStream(fileType, uploadBufferList, contentLength);
+ return new FileSegmentInputStream(fileType, bufferList, length);
default:
- throw new IllegalArgumentException("fileType is not supported");
+ throw new IllegalArgumentException("file type is not supported");
}
}
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index 8601392e7..2451199c2 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -130,36 +130,36 @@ public class TieredMessageStoreTest {
// TieredStorageLevel.DISABLE
properties.setProperty("tieredStorageLevel", "0");
configuration.update(properties);
- Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0));
+ Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0));
// TieredStorageLevel.NOT_IN_DISK
properties.setProperty("tieredStorageLevel", "1");
configuration.update(properties);
when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(false);
- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0));
+ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0));
when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true);
- Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0));
+ Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0));
// TieredStorageLevel.NOT_IN_MEM
properties.setProperty("tieredStorageLevel", "2");
configuration.update(properties);
Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(false);
Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(true);
- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0));
+ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0));
Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true);
Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(false);
- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0));
+ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0));
Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true);
Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(true);
- Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0));
+ Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0));
// TieredStorageLevel.FORCE
properties.setProperty("tieredStorageLevel", "3");
configuration.update(properties);
- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0));
+ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0));
}
@Test
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
index cc39cfbfc..7a4d05969 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.metadata.FileSegmentMetadata;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
@@ -55,6 +56,7 @@ public class TieredFlatFileTest {
public void tearDown() throws IOException {
TieredStoreTestUtil.destroyMetadataStore();
TieredStoreTestUtil.destroyTempDir(storePath);
+ TieredStoreExecutor.shutdown();
}
private List<FileSegmentMetadata> getSegmentMetadataList(TieredMetadataStore metadataStore) {
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
index 262d6645b..2da72bc7a 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
@@ -87,5 +87,7 @@ public class TieredIndexFileTest {
indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
Assert.assertEquals(1, indexList.size());
+
+ indexFile.destroy();
}
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java
similarity index 82%
rename from tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java
rename to tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java
index a6566b7de..3bbe41dd4 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java
@@ -20,13 +20,13 @@ package org.apache.rocketmq.tieredstore.provider;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
-public class MockTieredFileSegmentInputStream extends TieredFileSegmentInputStream {
+public class MockFileSegmentInputStream extends FileSegmentInputStream {
private final InputStream inputStream;
- public MockTieredFileSegmentInputStream(InputStream inputStream) {
+ public MockFileSegmentInputStream(InputStream inputStream) {
super(null, null, Integer.MAX_VALUE);
this.inputStream = inputStream;
}
@@ -43,7 +43,7 @@ public class MockTieredFileSegmentInputStream extends TieredFileSegmentInputStre
}
@Override
- public List<ByteBuffer> getUploadBufferList() {
+ public List<ByteBuffer> getBufferList() {
return null;
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
index a2554ba3d..743d9182c 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
@@ -28,8 +28,8 @@ import java.util.Random;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
import org.junit.Assert;
@@ -57,7 +57,7 @@ public class TieredFileSegmentInputStreamTest {
bufferSize += byteBuffer.remaining();
}
- // build expected byte buffer for verifying the TieredFileSegmentInputStream
+ // build expected byte buffer for verifying the FileSegmentInputStream
ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
for (ByteBuffer byteBuffer : uploadBufferList) {
expectedByteBuffer.put(byteBuffer);
@@ -74,7 +74,7 @@ public class TieredFileSegmentInputStreamTest {
int[] batchReadSizeTestSet = {
MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 1, MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtil.PHYSICAL_OFFSET_POSITION + 1, MSG_LEN - 1, MSG_LEN, MSG_LEN + 1
};
- verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
+ verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build(
FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), finalBufferSize, batchReadSizeTestSet);
}
@@ -98,7 +98,7 @@ public class TieredFileSegmentInputStreamTest {
int codaBufferSize = codaBuffer.remaining();
bufferSize += codaBufferSize;
- // build expected byte buffer for verifying the TieredFileSegmentInputStream
+ // build expected byte buffer for verifying the FileSegmentInputStream
ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
for (ByteBuffer byteBuffer : uploadBufferList) {
expectedByteBuffer.put(byteBuffer);
@@ -119,7 +119,7 @@ public class TieredFileSegmentInputStreamTest {
MSG_LEN - 1, MSG_LEN, MSG_LEN + 1,
bufferSize - 1, bufferSize, bufferSize + 1
};
- verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
+ verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build(
FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, uploadBufferList, codaBuffer, finalBufferSize), finalBufferSize, batchReadSizeTestSet);
}
@@ -134,7 +134,7 @@ public class TieredFileSegmentInputStreamTest {
bufferSize += byteBuffer.remaining();
}
- // build expected byte buffer for verifying the TieredFileSegmentInputStream
+ // build expected byte buffer for verifying the FileSegmentInputStream
ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
for (ByteBuffer byteBuffer : uploadBufferList) {
expectedByteBuffer.put(byteBuffer);
@@ -143,7 +143,7 @@ public class TieredFileSegmentInputStreamTest {
int finalBufferSize = bufferSize;
int[] batchReadSizeTestSet = {TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE - 1, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE + 1};
- verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
+ verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build(
FileSegmentType.CONSUME_QUEUE, COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), bufferSize, batchReadSizeTestSet);
}
@@ -156,16 +156,16 @@ public class TieredFileSegmentInputStreamTest {
byteBuffer.flip();
List<ByteBuffer> uploadBufferList = Arrays.asList(byteBuffer);
- // build expected byte buffer for verifying the TieredFileSegmentInputStream
+ // build expected byte buffer for verifying the FileSegmentInputStream
ByteBuffer expectedByteBuffer = byteBuffer.slice();
- verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
+ verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build(
FileSegmentType.INDEX, COMMIT_LOG_START_OFFSET, uploadBufferList, null, byteBuffer.limit()), byteBuffer.limit(), new int[] {23, 24, 25});
}
- private void verifyReadAndReset(ByteBuffer expectedByteBuffer, Supplier<TieredFileSegmentInputStream> constructor,
+ private void verifyReadAndReset(ByteBuffer expectedByteBuffer, Supplier<FileSegmentInputStream> constructor,
int bufferSize, int[] readBatchSizeTestSet) {
- TieredFileSegmentInputStream inputStream = constructor.get();
+ FileSegmentInputStream inputStream = constructor.get();
// verify
verifyInputStream(inputStream, expectedByteBuffer);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
index 4cd83e0d2..a655710a5 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
@@ -116,13 +116,22 @@ public class TieredFileSegmentTest {
}
@Test
- public void testCommitFailed() {
+ public void testCommitFailedThenSuccess() {
long startTime = System.currentTimeMillis();
MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(FileSegmentType.COMMIT_LOG);
long lastSize = segment.getSize();
- segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
- segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
-
+ segment.setCheckSize(false);
+ segment.initPosition(lastSize);
+ segment.setSize((int) lastSize);
+
+ ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize);
+ ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN);
+ segment.append(buffer1, 0);
+ segment.append(buffer2, 0);
+
+ // Mock the arrival of a new message
segment.blocker = new CompletableFuture<>();
new Thread(() -> {
try {
@@ -131,20 +140,88 @@ public class TieredFileSegmentTest {
Assert.fail(e.getMessage());
}
ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
+ buffer.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN * 2);
buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime);
segment.append(buffer, 0);
segment.blocker.complete(false);
}).start();
+ // Commit failed
segment.commit();
segment.blocker.join();
+ segment.blocker = null;
+
+ // Copy data and assume commit success
+ segment.getMemStore().put(buffer1);
+ segment.getMemStore().put(buffer2);
+ segment.setSize((int) (lastSize + MessageBufferUtilTest.MSG_LEN * 2));
- segment.blocker = new CompletableFuture<>();
- segment.blocker.complete(true);
segment.commit();
+ Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitPosition());
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset());
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
+
+ ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN);
+ Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1));
+
+ ByteBuffer msg2 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN);
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtil.getCommitLogOffset(msg2));
+
+ ByteBuffer msg3 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN);
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtil.getCommitLogOffset(msg3));
+ }
+
+ @Test
+ public void testCommitFailed3Times() {
+ long startTime = System.currentTimeMillis();
+ MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(FileSegmentType.COMMIT_LOG);
+ long lastSize = segment.getSize();
+ segment.setCheckSize(false);
+ segment.initPosition(lastSize);
+ segment.setSize((int) lastSize);
+
+ ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize);
+ ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN);
+ segment.append(buffer1, 0);
+ segment.append(buffer2, 0);
+
+ // Mock the arrival of a new message
+ segment.blocker = new CompletableFuture<>();
+ new Thread(() -> {
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ Assert.fail(e.getMessage());
+ }
+ ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
+ buffer.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN * 2);
+ buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime);
+ segment.append(buffer, 0);
+ segment.blocker.complete(false);
+ }).start();
+
+ for (int i = 0; i < 3; i++) {
+ segment.commit();
+ }
+
+ Assert.assertEquals(lastSize, segment.getCommitPosition());
+ Assert.assertEquals(baseOffset + lastSize, segment.getCommitOffset());
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
+
+ segment.blocker.join();
+ segment.blocker = null;
+ segment.commit();
+ Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 2, segment.getCommitPosition());
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, segment.getCommitOffset());
Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
+
+ segment.commit();
+ Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitPosition());
Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset());
+ Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN);
Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1));
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
index cb155cf8f..80ad41f68 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.junit.Assert;
@@ -33,6 +33,8 @@ public class MemoryFileSegment extends TieredFileSegment {
public CompletableFuture<Boolean> blocker;
+ protected int size = 0;
+
protected boolean checkSize = true;
public MemoryFileSegment(FileSegmentType fileType, MessageQueue messageQueue, long baseOffset,
@@ -56,6 +58,18 @@ public class MemoryFileSegment extends TieredFileSegment {
memStore.position((int) getSize());
}
+ public boolean isCheckSize() { // accessor for the checkSize flag of this in-memory test segment
+ return checkSize;
+ }
+
+ public void setCheckSize(boolean checkSize) { // true selects the branch that reports a fixed size of 1000 instead of the size field
+ this.checkSize = checkSize;
+ }
+
+ public ByteBuffer getMemStore() { // hands out the backing buffer itself, not a copy
+ return memStore;
+ }
+
@Override
public String getPath() {
return filePath;
@@ -66,7 +80,11 @@ public class MemoryFileSegment extends TieredFileSegment {
if (checkSize) {
return 1000;
}
- return 0;
+ return size;
+ }
+
+ public void setSize(int size) { // the stored value is only reported while checkSize is false; otherwise 1000 is returned
+ this.size = size;
}
@Override
@@ -85,11 +103,11 @@ public class MemoryFileSegment extends TieredFileSegment {
@Override
public CompletableFuture<Boolean> commit0(
- TieredFileSegmentInputStream inputStream, long position, int length, boolean append) {
+ FileSegmentInputStream inputStream, long position, int length, boolean append) {
try {
if (blocker != null && !blocker.get()) {
- throw new IllegalStateException();
+ throw new IllegalStateException("Commit Exception for Memory Test");
}
} catch (InterruptedException | ExecutionException e) {
Assert.fail(e.getMessage());
@@ -98,7 +116,6 @@ public class MemoryFileSegment extends TieredFileSegment {
Assert.assertTrue(!checkSize || position >= getSize());
byte[] buffer = new byte[1024];
-
int startPos = memStore.position();
try {
int len;
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
index 8ac330b37..630fd2223 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
@@ -22,7 +22,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.junit.Assert;
@@ -46,7 +46,7 @@ public class MemoryFileSegmentWithoutCheck extends MemoryFileSegment {
}
@Override
- public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length,
+ public CompletableFuture<Boolean> commit0(FileSegmentInputStream inputStream, long position, int length,
boolean append) {
try {
if (blocker != null && !blocker.get()) {
--
2.32.0.windows.2
From d000ef947d7c99918ceba0fa451c1e29fd84ba07 Mon Sep 17 00:00:00 2001
From: yuz10 <845238369@qq.com>
Date: Thu, 31 Aug 2023 09:41:33 +0800
Subject: [PATCH 3/7] [ISSUE #7283] Incorrect dledger commitlog min offset
after mappedFile re delete failed (#7284)
---
.../apache/rocketmq/store/dledger/DLedgerCommitLog.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index ec5e86d70..d5f6acdc0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -162,7 +162,12 @@ public class DLedgerCommitLog extends CommitLog {
if (!mappedFileQueue.getMappedFiles().isEmpty()) {
return mappedFileQueue.getMinOffset();
}
- return dLedgerFileList.getMinOffset();
+ for (MmapFile file : dLedgerFileList.getMappedFiles()) {
+ if (file.isAvailable()) {
+ return file.getFileFromOffset() + file.getStartPosition();
+ }
+ }
+ return 0;
}
@Override
--
2.32.0.windows.2
From f82718ae3b77a16b553c03f672dc971a2d5d48fa Mon Sep 17 00:00:00 2001
From: cnScarb <jjhfen00@163.com>
Date: Thu, 31 Aug 2023 15:50:10 +0800
Subject: [PATCH 4/7] [ISSUE #7208] fix: when deleting topic also delete its
pop retry topic (#7209)
---
.../processor/AdminBrokerProcessor.java | 24 ++++++++++---
.../processor/AdminBrokerProcessorTest.java | 36 +++++++++++++++++++
2 files changed, 55 insertions(+), 5 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index bbddcec2d..8fbcd3c94 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -51,6 +51,7 @@ import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
@@ -542,16 +543,29 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
}
- this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
- this.brokerController.getTopicQueueMappingManager().delete(requestHeader.getTopic());
- this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic());
- this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic());
- this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(requestHeader.getTopic()));
+ final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
+ // delete pop retry topics first
+ for (String group : groups) {
+ final String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+ if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) {
+ deleteTopicInBroker(popRetryTopic);
+ }
+ }
+ // delete topic
+ deleteTopicInBroker(topic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
+ private void deleteTopicInBroker(String topic) { // runs the per-topic cleanup calls below; invoked for each pop retry topic and for the topic itself
+ this.brokerController.getTopicConfigManager().deleteTopicConfig(topic); // topic config entry
+ this.brokerController.getTopicQueueMappingManager().delete(topic); // topic queue mapping
+ this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic); // consumer offsets recorded for the topic
+ this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic); // pop in-flight counters
+ this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic)); // asks the message store to delete the topic
+ }
+
private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index d33a217f7..9d17011b6 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import org.apache.rocketmq.broker.BrokerController;
@@ -41,6 +42,7 @@ import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
@@ -90,8 +92,11 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -321,6 +326,37 @@ public class AdminBrokerProcessorTest {
"please execute it from master broker.");
}
+ @Test
+ public void testDeleteWithPopRetryTopic() throws Exception { // deleting a topic must also delete the pop retry topic of its consumer group
+ String topic = "topicA";
+ String anotherTopic = "another_topicA";
+ // Config table holds topicA and another_topicA, each with one pop retry topic.
+ topicConfigManager = mock(TopicConfigManager.class);
+ when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+ final ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
+ topicConfigTable.put(topic, new TopicConfig());
+ topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid1"), new TopicConfig());
+
+ topicConfigTable.put(anotherTopic, new TopicConfig());
+ topicConfigTable.put(KeyBuilder.buildPopRetryTopic(anotherTopic, "cid2"), new TopicConfig());
+ when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable);
+ when(topicConfigManager.selectTopicConfig(anyString())).thenAnswer(invocation -> { // answer lookups from the table above
+ final String selectTopic = invocation.getArgument(0);
+ return topicConfigManager.getTopicConfigTable().get(selectTopic);
+ });
+ // Only "cid1" is reported as a consumer group of topicA.
+ when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+ when(consumerOffsetManager.whichGroupByTopic(topic)).thenReturn(Sets.newHashSet("cid1"));
+ // Send the delete-topic request for topicA through the admin processor.
+ RemotingCommand request = buildDeleteTopicRequest(topic);
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ // Expect config deletion for topicA and its cid1 retry topic, and two deleteTopics calls on the store.
+ verify(topicConfigManager).deleteTopicConfig(topic);
+ verify(topicConfigManager).deleteTopicConfig(KeyBuilder.buildPopRetryTopic(topic, "cid1"));
+ verify(messageStore, times(2)).deleteTopics(anySet());
+ }
+
@Test
public void testGetAllTopicConfigInRocksdb() throws Exception {
if (notToBeExecuted()) {
--
2.32.0.windows.2
From 31d10385d1616445478104ce9ef463a8c4852ba2 Mon Sep 17 00:00:00 2001
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
Date: Mon, 4 Sep 2023 14:09:32 +0800
Subject: [PATCH 5/7] [ISSUE #7289] Fixed asynchronous send backpressure
capability
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
---
.../impl/producer/DefaultMQProducerImpl.java | 77 +++++++++++++------
1 file changed, 53 insertions(+), 24 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index bbbb17b07..2d6b83ac2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -547,6 +547,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Deprecated
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
+ BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback);
+
final long beginStartTime = System.currentTimeMillis();
Runnable runnable = new Runnable() {
@Override
@@ -554,20 +556,53 @@ public class DefaultMQProducerImpl implements MQProducerInner {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try {
- sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
+ sendDefaultImpl(msg, CommunicationMode.ASYNC, newCallBack, timeout - costTime);
} catch (Exception e) {
- sendCallback.onException(e);
+ newCallBack.onException(e);
}
} else {
- sendCallback.onException(
+ newCallBack.onException(
new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
}
}
};
- executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime);
+ executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
}
- public void executeAsyncMessageSend(Runnable runnable, final Message msg, final SendCallback sendCallback,
+ class BackpressureSendCallBack implements SendCallback { // wraps a SendCallback and releases backpressure semaphore permits when a result arrives
+ public boolean isSemaphoreAsyncSizeAquired = false; // assigned by executeAsyncMessageSend just before the task is submitted
+ public boolean isSemaphoreAsyncNumAquired = false; // assigned by executeAsyncMessageSend just before the task is submitted
+ public int msgLen; // number of permits handed back to semaphoreAsyncSendSize
+ private final SendCallback sendCallback; // wrapped callback that every result is forwarded to
+ // NOTE(review): nothing here guards against a double release if both onSuccess and onException fire for one send — confirm callers invoke exactly one.
+ public BackpressureSendCallBack(final SendCallback sendCallback) {
+ this.sendCallback = sendCallback;
+ }
+ // Success path: release whichever permits were recorded as acquired, then forward the result.
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ if (isSemaphoreAsyncSizeAquired) {
+ semaphoreAsyncSendSize.release(msgLen);
+ }
+ if (isSemaphoreAsyncNumAquired) {
+ semaphoreAsyncSendNum.release();
+ }
+ sendCallback.onSuccess(sendResult);
+ }
+ // Failure path: same release-then-forward sequence as onSuccess.
+ @Override
+ public void onException(Throwable e) {
+ if (isSemaphoreAsyncSizeAquired) {
+ semaphoreAsyncSendSize.release(msgLen);
+ }
+ if (isSemaphoreAsyncNumAquired) {
+ semaphoreAsyncSendNum.release();
+ }
+ sendCallback.onException(e);
+ }
+ }
+
+ public void executeAsyncMessageSend(Runnable runnable, final Message msg, final BackpressureSendCallBack sendCallback,
final long timeout, final long beginStartTime)
throws MQClientException, InterruptedException {
ExecutorService executor = this.getAsyncSenderExecutor();
@@ -595,7 +630,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return;
}
}
-
+ sendCallback.isSemaphoreAsyncSizeAquired = isSemaphoreAsyncSizeAquired;
+ sendCallback.isSemaphoreAsyncNumAquired = isSemaphoreAsyncNumAquired;
+ sendCallback.msgLen = msgLen;
executor.submit(runnable);
} catch (RejectedExecutionException e) {
if (isEnableBackpressureForAsyncMode) {
@@ -603,15 +640,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} else {
throw new MQClientException("executor rejected ", e);
}
- } finally {
- if (isSemaphoreAsyncSizeAquired) {
- semaphoreAsyncSendSize.release(msgLen);
- }
- if (isSemaphoreAsyncNumAquired) {
- semaphoreAsyncSendNum.release();
- }
}
-
}
public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg,
@@ -1188,7 +1217,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Deprecated
public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
-
+ BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback);
final long beginStartTime = System.currentTimeMillis();
Runnable runnable = new Runnable() {
@Override
@@ -1203,22 +1232,22 @@ public class DefaultMQProducerImpl implements MQProducerInner {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try {
- sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null,
+ sendKernelImpl(msg, mq, CommunicationMode.ASYNC, newCallBack, null,
timeout - costTime);
} catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e);
}
} else {
- sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
+ newCallBack.onException(new RemotingTooMuchRequestException("call timeout"));
}
} catch (Exception e) {
- sendCallback.onException(e);
+ newCallBack.onException(e);
}
}
};
- executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime);
+ executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
}
/**
@@ -1315,7 +1344,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
-
+ BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback);
final long beginStartTime = System.currentTimeMillis();
Runnable runnable = new Runnable() {
@Override
@@ -1324,21 +1353,21 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (timeout > costTime) {
try {
try {
- sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
+ sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, newCallBack,
timeout - costTime);
} catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e);
}
} catch (Exception e) {
- sendCallback.onException(e);
+ newCallBack.onException(e);
}
} else {
- sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
+ newCallBack.onException(new RemotingTooMuchRequestException("call timeout"));
}
}
};
- executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime);
+ executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
}
/**
--
2.32.0.windows.2
From d67b9d64cbd53824798af57ba18770e0fcefa37a Mon Sep 17 00:00:00 2001
From: yuz10 <845238369@qq.com>
Date: Wed, 6 Sep 2023 14:07:23 +0800
Subject: [PATCH 6/7] [ISSUE #7302] Fix singleTopicRegister code deleted in
merge
---
.../apache/rocketmq/broker/topic/TopicConfigManager.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 1c3b9711f..4e3c1736c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -330,7 +330,11 @@ public class TopicConfigManager extends ConfigManager {
log.error("createTopicIfAbsent ", e);
}
if (createNew && register) {
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+ this.brokerController.registerSingleTopicAll(topicConfig);
+ } else {
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
+ }
}
return getTopicConfig(topicConfig.getTopicName());
}
--
2.32.0.windows.2
From 37017dbaec5c521fd529ef4aecf3658092884f84 Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Wed, 6 Sep 2023 15:23:15 +0800
Subject: [PATCH 7/7] [ISSUE #7305] Fix metrics and transactional module not
shutdown while broker offline cause coredump(#7307)
---
.../java/org/apache/rocketmq/broker/BrokerController.java | 8 ++++++++
.../queue/TransactionalMessageServiceImpl.java | 4 +++-
2 files changed, 11 insertions(+), 1 deletion(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index e8f943702..6aba70cb2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1302,6 +1302,10 @@ public class BrokerController {
this.fastRemotingServer.shutdown();
}
+ if (this.brokerMetricsManager != null) {
+ this.brokerMetricsManager.shutdown();
+ }
+
if (this.brokerStatsManager != null) {
this.brokerStatsManager.shutdown();
}
@@ -1324,6 +1328,10 @@ public class BrokerController {
this.ackMessageProcessor.shutdownPopReviveService();
}
+ if (this.transactionalMessageService != null) {
+ this.transactionalMessageService.close();
+ }
+
if (this.notificationProcessor != null) {
this.notificationProcessor.getPopLongPollingService().shutdown();
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
index 93fa725a9..48db828e0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
@@ -629,7 +629,9 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
@Override
public void close() {
-
+ if (this.transactionalOpBatchService != null) { // close() now shuts down the op batch service when one exists
+ this.transactionalOpBatchService.shutdown();
+ }
}
public Message getOpMessage(int queueId, String moreData) {
--
2.32.0.windows.2