rocketmq/patch042-backport-Support-message-filtering.patch
2023-12-11 16:08:37 +08:00

1197 lines
66 KiB
Diff

From aec1055830e78f7e710e32ebd467f9f7d208855d Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Mon, 4 Dec 2023 16:12:42 +0800
Subject: [PATCH] [ISSUE #7585] Support message filtering in rocketmq tiered
storage (#7594)
---
.../tieredstore/TieredMessageFetcher.java | 325 ++++++++----------
.../tieredstore/TieredMessageStore.java | 6 +-
.../common/GetMessageResultExt.java | 76 ++++
.../common/SelectBufferResult.java | 51 +++
...er.java => SelectBufferResultWrapper.java} | 53 +--
.../common/TieredMessageStoreConfig.java | 9 +
.../metrics/TieredStoreMetricsManager.java | 4 +-
.../provider/TieredFileSegment.java | 2 +-
.../tieredstore/util/MessageBufferUtil.java | 71 ++--
.../tieredstore/TieredMessageFetcherTest.java | 9 +-
.../common/GetMessageResultExtTest.java | 65 ++++
.../common/SelectBufferResultTest.java | 37 ++
.../util/MessageBufferUtilTest.java | 19 +-
13 files changed, 478 insertions(+), 249 deletions(-)
create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java
create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/{SelectMappedBufferResultWrapper.java => SelectBufferResultWrapper.java} (55%)
create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java
create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index f739773eb..7b0c47c59 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -19,17 +19,14 @@ package org.apache.rocketmq.tieredstore;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -40,12 +37,13 @@ import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.tieredstore.common.GetMessageResultExt;
import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture;
import org.apache.rocketmq.tieredstore.common.MessageCacheKey;
-import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
+import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
+import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
-import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
@@ -66,10 +64,10 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
private static final Logger LOGGER = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
private final String brokerName;
- private final TieredMessageStoreConfig storeConfig;
private final TieredMetadataStore metadataStore;
+ private final TieredMessageStoreConfig storeConfig;
private final TieredFlatFileManager flatFileManager;
- private final Cache<MessageCacheKey, SelectMappedBufferResultWrapper> readAheadCache;
+ private final Cache<MessageCacheKey, SelectBufferResultWrapper> readAheadCache;
public TieredMessageFetcher(TieredMessageStoreConfig storeConfig) {
this.storeConfig = storeConfig;
@@ -79,7 +77,7 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
this.readAheadCache = this.initCache(storeConfig);
}
- private Cache<MessageCacheKey, SelectMappedBufferResultWrapper> initCache(TieredMessageStoreConfig storeConfig) {
+ private Cache<MessageCacheKey, SelectBufferResultWrapper> initCache(TieredMessageStoreConfig storeConfig) {
long memoryMaxSize =
(long) (Runtime.getRuntime().maxMemory() * storeConfig.getReadAheadCacheSizeThresholdRate());
@@ -88,60 +86,35 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
.expireAfterWrite(storeConfig.getReadAheadCacheExpireDuration(), TimeUnit.MILLISECONDS)
.maximumWeight(memoryMaxSize)
// Using the buffer size of messages to calculate memory usage
- .weigher((MessageCacheKey key, SelectMappedBufferResultWrapper msg) -> msg.getDuplicateResult().getSize())
+ .weigher((MessageCacheKey key, SelectBufferResultWrapper msg) -> msg.getBufferSize())
.recordStats()
.build();
}
- protected SelectMappedBufferResultWrapper putMessageToCache(CompositeFlatFile flatFile,
- long queueOffset, SelectMappedBufferResult result, long minOffset, long maxOffset, int size) {
-
- return putMessageToCache(flatFile, queueOffset, result, minOffset, maxOffset, size, false);
- }
-
- protected SelectMappedBufferResultWrapper putMessageToCache(CompositeFlatFile flatFile,
- long queueOffset, SelectMappedBufferResult result, long minOffset, long maxOffset, int size, boolean used) {
-
- SelectMappedBufferResultWrapper wrapper =
- new SelectMappedBufferResultWrapper(result, queueOffset, minOffset, maxOffset, size);
- if (used) {
- wrapper.addAccessCount();
- }
- readAheadCache.put(new MessageCacheKey(flatFile, queueOffset), wrapper);
- return wrapper;
- }
-
- // Visible for metrics monitor
- public Cache<MessageCacheKey, SelectMappedBufferResultWrapper> getMessageCache() {
+ @VisibleForTesting
+ public Cache<MessageCacheKey, SelectBufferResultWrapper> getMessageCache() {
return readAheadCache;
}
- // Waiting for the request in transit to complete
- protected CompletableFuture<GetMessageResult> getMessageFromCacheAsync(
- CompositeQueueFlatFile flatFile, String group, long queueOffset, int maxCount) {
-
- return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, true);
+ protected void putMessageToCache(CompositeFlatFile flatFile, SelectBufferResultWrapper result) {
+ readAheadCache.put(new MessageCacheKey(flatFile, result.getOffset()), result);
}
- @Nullable
- protected SelectMappedBufferResultWrapper getMessageFromCache(CompositeFlatFile flatFile, long queueOffset) {
- MessageCacheKey cacheKey = new MessageCacheKey(flatFile, queueOffset);
- return readAheadCache.getIfPresent(cacheKey);
+ protected SelectBufferResultWrapper getMessageFromCache(CompositeFlatFile flatFile, long offset) {
+ return readAheadCache.getIfPresent(new MessageCacheKey(flatFile, offset));
}
- protected void recordCacheAccess(CompositeFlatFile flatFile, String group, long queueOffset,
- List<SelectMappedBufferResultWrapper> resultWrapperList) {
- if (resultWrapperList.size() > 0) {
- queueOffset = resultWrapperList.get(resultWrapperList.size() - 1).getCurOffset();
+ protected void recordCacheAccess(CompositeFlatFile flatFile,
+ String group, long offset, List<SelectBufferResultWrapper> resultWrapperList) {
+ if (!resultWrapperList.isEmpty()) {
+ offset = resultWrapperList.get(resultWrapperList.size() - 1).getOffset();
}
- flatFile.recordGroupAccess(group, queueOffset);
- for (SelectMappedBufferResultWrapper wrapper : resultWrapperList) {
- wrapper.addAccessCount();
- if (wrapper.getAccessCount() >= flatFile.getActiveGroupCount()) {
- MessageCacheKey cacheKey = new MessageCacheKey(flatFile, wrapper.getCurOffset());
- readAheadCache.invalidate(cacheKey);
+ flatFile.recordGroupAccess(group, offset);
+ resultWrapperList.forEach(wrapper -> {
+ if (wrapper.incrementAndGet() >= flatFile.getActiveGroupCount()) {
+ readAheadCache.invalidate(new MessageCacheKey(flatFile, wrapper.getOffset()));
}
- }
+ });
}
private void prefetchMessage(CompositeQueueFlatFile flatFile, String group, int maxCount, long nextBeginOffset) {
@@ -149,7 +122,6 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
return;
}
- MessageQueue mq = flatFile.getMessageQueue();
// make sure there is only one request per group and request range
int prefetchBatchSize = Math.min(maxCount * flatFile.getReadAheadFactor(), storeConfig.getReadAheadMessageCountThreshold());
InFlightRequestFuture inflightRequest = flatFile.getInflightRequest(group, nextBeginOffset, prefetchBatchSize);
@@ -166,13 +138,8 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
long maxOffsetOfLastRequest = inflightRequest.getLastFuture().join();
boolean lastRequestIsExpired = getMessageFromCache(flatFile, nextBeginOffset) == null;
- // if message fetch by last request is expired, we need to prefetch the message from tiered store
- int cacheRemainCount = (int) (maxOffsetOfLastRequest - nextBeginOffset);
- LOGGER.debug("TieredMessageFetcher#preFetchMessage: group={}, nextBeginOffset={}, maxOffsetOfLastRequest={}, lastRequestIsExpired={}, cacheRemainCount={}",
- group, nextBeginOffset, maxOffsetOfLastRequest, lastRequestIsExpired, cacheRemainCount);
-
- if (lastRequestIsExpired
- || maxOffsetOfLastRequest != -1L && nextBeginOffset >= inflightRequest.getStartOffset()) {
+ if (lastRequestIsExpired ||
+ maxOffsetOfLastRequest != -1L && nextBeginOffset >= inflightRequest.getStartOffset()) {
long queueOffset;
if (lastRequestIsExpired) {
@@ -196,12 +163,12 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
long nextQueueOffset = queueOffset;
if (flag == 1) {
int firstBatchSize = factor % storeConfig.getReadAheadBatchSizeFactorThreshold() * maxCount;
- CompletableFuture<Long> future = prefetchMessageThenPutToCache(flatFile, mq, nextQueueOffset, firstBatchSize);
+ CompletableFuture<Long> future = prefetchMessageThenPutToCache(flatFile, nextQueueOffset, firstBatchSize);
futureList.add(Pair.of(firstBatchSize, future));
nextQueueOffset += firstBatchSize;
}
for (long i = 0; i < concurrency - flag; i++) {
- CompletableFuture<Long> future = prefetchMessageThenPutToCache(flatFile, mq, nextQueueOffset + i * requestBatchSize, requestBatchSize);
+ CompletableFuture<Long> future = prefetchMessageThenPutToCache(flatFile, nextQueueOffset + i * requestBatchSize, requestBatchSize);
futureList.add(Pair.of(requestBatchSize, future));
}
flatFile.putInflightRequest(group, queueOffset, maxCount * factor, futureList);
@@ -211,52 +178,52 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
}
}
- private CompletableFuture<Long> prefetchMessageThenPutToCache(CompositeQueueFlatFile flatFile, MessageQueue mq,
- long queueOffset, int batchSize) {
+ private CompletableFuture<Long> prefetchMessageThenPutToCache(
+ CompositeQueueFlatFile flatFile, long queueOffset, int batchSize) {
+
+ MessageQueue mq = flatFile.getMessageQueue();
return getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize)
- .thenApplyAsync(result -> {
- if (result.getStatus() != GetMessageStatus.FOUND) {
- LOGGER.warn("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed: topic: {}, queue: {}, queue offset: {}, batch size: {}, result: {}",
- mq.getTopic(), mq.getQueueId(), queueOffset, batchSize, result.getStatus());
- return -1L;
- }
- // put message into cache
- List<Long> offsetList = result.getMessageQueueOffset();
- List<SelectMappedBufferResult> msgList = result.getMessageMapedList();
- if (offsetList.size() != msgList.size()) {
- LOGGER.error("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed, result is illegal: topic: {}, queue: {}, queue offset: {}, batch size: {}, offsetList size: {}, msgList size: {}",
- mq.getTopic(), mq.getQueueId(), queueOffset, batchSize, offsetList.size(), msgList.size());
+ .thenApply(result -> {
+ if (result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_ONE ||
+ result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
return -1L;
}
- if (offsetList.isEmpty()) {
- LOGGER.error("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed, result is FOUND but msgList is empty: topic: {}, queue: {}, queue offset: {}, batch size: {}",
- mq.getTopic(), mq.getQueueId(), queueOffset, batchSize);
+ if (result.getStatus() != GetMessageStatus.FOUND) {
+ LOGGER.warn("MessageFetcher prefetch message then put to cache failed, result: {}, " +
+ "topic: {}, queue: {}, queue offset: {}, batch size: {}",
+ result.getStatus(), mq.getTopic(), mq.getQueueId(), queueOffset, batchSize);
return -1L;
}
- Long minOffset = offsetList.get(0);
- Long maxOffset = offsetList.get(offsetList.size() - 1);
- int size = offsetList.size();
- for (int n = 0; n < offsetList.size(); n++) {
- putMessageToCache(flatFile, offsetList.get(n), msgList.get(n), minOffset, maxOffset, size);
- }
- if (size != batchSize || maxOffset != queueOffset + batchSize - 1) {
- LOGGER.warn("TieredMessageFetcher#prefetchAndPutMsgToCache: size not match: except: {}, actual: {}, queue offset: {}, min offset: {}, except offset: {}, max offset: {}",
- batchSize, size, queueOffset, minOffset, queueOffset + batchSize - 1, maxOffset);
+ try {
+ List<Long> offsetList = result.getMessageQueueOffset();
+ List<Long> tagCodeList = result.getTagCodeList();
+ List<SelectMappedBufferResult> msgList = result.getMessageMapedList();
+ for (int i = 0; i < offsetList.size(); i++) {
+ SelectMappedBufferResult msg = msgList.get(i);
+ SelectBufferResultWrapper bufferResult = new SelectBufferResultWrapper(
+ msg, offsetList.get(i), tagCodeList.get(i), false);
+ this.putMessageToCache(flatFile, bufferResult);
+ }
+ return offsetList.get(offsetList.size() - 1);
+ } catch (Exception e) {
+ LOGGER.error("MessageFetcher prefetch message then put to cache failed, " +
+ "topic: {}, queue: {}, queue offset: {}, batch size: {}",
+ mq.getTopic(), mq.getQueueId(), queueOffset, batchSize, e);
}
- return maxOffset;
- }, TieredStoreExecutor.fetchDataExecutor);
+ return -1L;
+ });
}
- public CompletableFuture<GetMessageResult> getMessageFromCacheAsync(CompositeQueueFlatFile flatFile,
+ public CompletableFuture<GetMessageResultExt> getMessageFromCacheAsync(CompositeQueueFlatFile flatFile,
String group, long queueOffset, int maxCount, boolean waitInflightRequest) {
MessageQueue mq = flatFile.getMessageQueue();
long lastGetOffset = queueOffset - 1;
- List<SelectMappedBufferResultWrapper> resultWrapperList = new ArrayList<>(maxCount);
+ List<SelectBufferResultWrapper> resultWrapperList = new ArrayList<>(maxCount);
for (int i = 0; i < maxCount; i++) {
lastGetOffset++;
- SelectMappedBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset);
+ SelectBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset);
if (wrapper == null) {
lastGetOffset--;
break;
@@ -281,19 +248,19 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
flatFile.getInflightRequest(group, queueOffset, maxCount).getFuture(queueOffset);
if (!future.isDone()) {
Stopwatch stopwatch = Stopwatch.createStarted();
- // to prevent starvation issues, only allow waiting for inflight request once
- return future.thenCompose(v -> {
+ // to prevent starvation issues, only allow waiting for processing request once
+ return future.thenComposeAsync(v -> {
LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: wait for response cost: {}ms",
stopwatch.elapsed(TimeUnit.MILLISECONDS));
return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, false);
- });
+ }, TieredStoreExecutor.fetchDataExecutor);
}
}
// try to get message from cache again when prefetch request is done
for (int i = 0; i < maxCount - resultWrapperList.size(); i++) {
lastGetOffset++;
- SelectMappedBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset);
+ SelectBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset);
if (wrapper == null) {
lastGetOffset--;
break;
@@ -303,74 +270,94 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
recordCacheAccess(flatFile, group, queueOffset, resultWrapperList);
- // if cache hit, result will be returned immediately and asynchronously prefetch messages for later requests
- if (!resultWrapperList.isEmpty()) {
- LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: " +
- "topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}",
- mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size());
- prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
+ if (resultWrapperList.isEmpty()) {
+ // If cache miss, pull messages immediately
+ LOGGER.info("MessageFetcher cache miss, group: {}, topic: {}, queueId: {}, offset: {}, maxCount: {}",
+ group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
+ } else {
+ // If cache hit, return buffer result immediately and asynchronously prefetch messages
+ LOGGER.debug("MessageFetcher cache hit, group: {}, topic: {}, queueId: {}, offset: {}, maxCount: {}, resultSize: {}",
+ group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size());
- GetMessageResult result = new GetMessageResult();
+ GetMessageResultExt result = new GetMessageResultExt();
result.setStatus(GetMessageStatus.FOUND);
result.setMinOffset(flatFile.getConsumeQueueMinOffset());
result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
result.setNextBeginOffset(queueOffset + resultWrapperList.size());
- resultWrapperList.forEach(wrapper -> result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset()));
+ resultWrapperList.forEach(wrapper -> result.addMessageExt(
+ wrapper.getDuplicateResult(), wrapper.getOffset(), wrapper.getTagCode()));
+
+ if (lastGetOffset < result.getMaxOffset()) {
+ this.prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
+ }
return CompletableFuture.completedFuture(result);
}
- // if cache is miss, immediately pull messages
- LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " +
- "topic: {}, queue: {}, queue offset: {}, max message num: {}",
- mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
-
- CompletableFuture<GetMessageResult> resultFuture;
+ CompletableFuture<GetMessageResultExt> resultFuture;
synchronized (flatFile) {
int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
resultFuture = getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize)
- .thenApplyAsync(result -> {
+ .thenApply(result -> {
if (result.getStatus() != GetMessageStatus.FOUND) {
return result;
}
- GetMessageResult newResult = new GetMessageResult();
- newResult.setStatus(GetMessageStatus.FOUND);
- newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
- newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
+ GetMessageResultExt newResult = new GetMessageResultExt();
List<Long> offsetList = result.getMessageQueueOffset();
+ List<Long> tagCodeList = result.getTagCodeList();
List<SelectMappedBufferResult> msgList = result.getMessageMapedList();
- Long minOffset = offsetList.get(0);
- Long maxOffset = offsetList.get(offsetList.size() - 1);
- int size = offsetList.size();
+
for (int i = 0; i < offsetList.size(); i++) {
- Long offset = offsetList.get(i);
SelectMappedBufferResult msg = msgList.get(i);
- // put message into cache
- SelectMappedBufferResultWrapper resultWrapper = putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true);
- // try to meet maxCount
+ SelectBufferResultWrapper bufferResult = new SelectBufferResultWrapper(
+ msg, offsetList.get(i), tagCodeList.get(i), true);
+ this.putMessageToCache(flatFile, bufferResult);
if (newResult.getMessageMapedList().size() < maxCount) {
- newResult.addMessage(resultWrapper.getDuplicateResult(), offset);
+ newResult.addMessageExt(msg, offsetList.get(i), tagCodeList.get(i));
}
}
+
+ newResult.setStatus(GetMessageStatus.FOUND);
+ newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
+ newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
newResult.setNextBeginOffset(queueOffset + newResult.getMessageMapedList().size());
return newResult;
- }, TieredStoreExecutor.fetchDataExecutor);
+ });
List<Pair<Integer, CompletableFuture<Long>>> futureList = new ArrayList<>();
CompletableFuture<Long> inflightRequestFuture = resultFuture.thenApply(result ->
- result.getStatus() == GetMessageStatus.FOUND ? result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) : -1L);
+ result.getStatus() == GetMessageStatus.FOUND ?
+ result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) : -1L);
futureList.add(Pair.of(batchSize, inflightRequestFuture));
flatFile.putInflightRequest(group, queueOffset, batchSize, futureList);
}
return resultFuture;
}
- public CompletableFuture<GetMessageResult> getMessageFromTieredStoreAsync(CompositeQueueFlatFile flatFile,
- long queueOffset, int batchSize) {
+ public CompletableFuture<GetMessageResultExt> getMessageFromTieredStoreAsync(
+ CompositeQueueFlatFile flatFile, long queueOffset, int batchSize) {
- GetMessageResult result = new GetMessageResult();
+ GetMessageResultExt result = new GetMessageResultExt();
result.setMinOffset(flatFile.getConsumeQueueMinOffset());
result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
+
+ if (queueOffset < result.getMaxOffset()) {
+ batchSize = Math.min(batchSize, (int) Math.min(result.getMaxOffset() - queueOffset, Integer.MAX_VALUE));
+ } else if (queueOffset == result.getMaxOffset()) {
+ result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE);
+ result.setNextBeginOffset(queueOffset);
+ return CompletableFuture.completedFuture(result);
+ } else if (queueOffset > result.getMaxOffset()) {
+ result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
+ result.setNextBeginOffset(result.getMaxOffset());
+ return CompletableFuture.completedFuture(result);
+ }
+
+ LOGGER.info("MessageFetcher#getMessageFromTieredStoreAsync, " +
+ "topic: {}, queueId: {}, broker offset: {}-{}, offset: {}, expect: {}",
+ flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(),
+ result.getMinOffset(), result.getMaxOffset(), queueOffset, batchSize);
+
CompletableFuture<ByteBuffer> readConsumeQueueFuture;
try {
readConsumeQueueFuture = flatFile.getConsumeQueueAsync(queueOffset, batchSize);
@@ -389,66 +376,56 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
}
}
- CompletableFuture<ByteBuffer> readCommitLogFuture = readConsumeQueueFuture.thenComposeAsync(cqBuffer -> {
+ CompletableFuture<ByteBuffer> readCommitLogFuture = readConsumeQueueFuture.thenCompose(cqBuffer -> {
long firstCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
cqBuffer.position(cqBuffer.remaining() - TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
long lastCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
if (lastCommitLogOffset < firstCommitLogOffset) {
- MessageQueue mq = flatFile.getMessageQueue();
- LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: message is not in order, try to fetch data in next store, topic: {}, queueId: {}, batch size: {}, queue offset {}",
- mq.getTopic(), mq.getQueueId(), batchSize, queueOffset);
- throw new TieredStoreException(TieredStoreErrorCode.ILLEGAL_OFFSET, "message is not in order");
+ LOGGER.error("MessageFetcher#getMessageFromTieredStoreAsync, " +
+ "last offset is smaller than first offset, " +
+ "topic: {} queueId: {}, offset: {}, firstOffset: {}, lastOffset: {}",
+ flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), queueOffset,
+ firstCommitLogOffset, lastCommitLogOffset);
+ return CompletableFuture.completedFuture(ByteBuffer.allocate(0));
}
- long length = lastCommitLogOffset - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer);
- // prevent OOM
- long originLength = length;
- while (cqBuffer.limit() > TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE && length > storeConfig.getReadAheadMessageSizeThreshold()) {
+ // Get the total size of the data by reducing the length limit of cq to prevent OOM
+ long length = lastCommitLogOffset - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer);
+ while (cqBuffer.limit() > TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE &&
+ length > storeConfig.getReadAheadMessageSizeThreshold()) {
cqBuffer.limit(cqBuffer.position());
cqBuffer.position(cqBuffer.limit() - TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
- length = CQItemBufferUtil.getCommitLogOffset(cqBuffer) - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer);
- }
-
- if (originLength != length) {
- MessageQueue mq = flatFile.getMessageQueue();
- LOGGER.info("TieredMessageFetcher#getMessageFromTieredStoreAsync: msg data is too large, topic: {}, queueId: {}, batch size: {}, fix it from {} to {}",
- mq.getTopic(), mq.getQueueId(), batchSize, originLength, length);
+ length = CQItemBufferUtil.getCommitLogOffset(cqBuffer)
+ - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer);
}
return flatFile.getCommitLogAsync(firstCommitLogOffset, (int) length);
- }, TieredStoreExecutor.fetchDataExecutor);
+ });
- return readConsumeQueueFuture.thenCombineAsync(readCommitLogFuture, (cqBuffer, msgBuffer) -> {
- List<Pair<Integer, Integer>> msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
- if (!msgList.isEmpty()) {
- int requestSize = cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
+ int finalBatchSize = batchSize;
+ return readConsumeQueueFuture.thenCombine(readCommitLogFuture, (cqBuffer, msgBuffer) -> {
+ List<SelectBufferResult> bufferList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
+ int requestSize = cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
+ if (bufferList.isEmpty()) {
+ result.setStatus(GetMessageStatus.NO_MATCHED_MESSAGE);
+ result.setNextBeginOffset(queueOffset + requestSize);
+ } else {
result.setStatus(GetMessageStatus.FOUND);
- result.setNextBeginOffset(queueOffset + msgList.size());
- msgList.forEach(pair -> {
- msgBuffer.position(pair.getLeft());
- ByteBuffer slice = msgBuffer.slice();
- slice.limit(pair.getRight());
- result.addMessage(new SelectMappedBufferResult(pair.getLeft(), slice, pair.getRight(), null), MessageBufferUtil.getQueueOffset(slice));
- });
- if (requestSize != msgList.size()) {
- Set<Long> requestOffsetSet = new HashSet<>();
- for (int i = 0; i < requestSize; i++) {
- requestOffsetSet.add(queueOffset + i);
- }
- LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: split message buffer failed, batch size: {}, request message count: {}, actual message count: {}, these messages may lost: {}", batchSize, requestSize, msgList.size(), Sets.difference(requestOffsetSet, Sets.newHashSet(result.getMessageQueueOffset())));
- } else if (requestSize != batchSize) {
- LOGGER.debug("TieredMessageFetcher#getMessageFromTieredStoreAsync: message count does not meet batch size, maybe dispatch delay: batch size: {}, request message count: {}", batchSize, requestSize);
+ result.setNextBeginOffset(queueOffset + requestSize);
+
+ for (SelectBufferResult bufferResult : bufferList) {
+ ByteBuffer slice = bufferResult.getByteBuffer().slice();
+ slice.limit(bufferResult.getSize());
+ SelectMappedBufferResult msg = new SelectMappedBufferResult(bufferResult.getStartOffset(),
+ bufferResult.getByteBuffer(), bufferResult.getSize(), null);
+ result.addMessageExt(msg, MessageBufferUtil.getQueueOffset(slice), bufferResult.getTagCode());
}
- return result;
}
- long nextBeginOffset = queueOffset + cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
- LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: split message buffer failed, consume queue buffer size: {}, message buffer size: {}, change offset from {} to {}", cqBuffer.remaining(), msgBuffer.remaining(), queueOffset, nextBeginOffset);
- result.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
- result.setNextBeginOffset(nextBeginOffset);
return result;
- }, TieredStoreExecutor.fetchDataExecutor).exceptionally(e -> {
+ }).exceptionally(e -> {
MessageQueue mq = flatFile.getMessageQueue();
- LOGGER.warn("TieredMessageFetcher#getMessageFromTieredStoreAsync: get message failed: topic: {} queueId: {}", mq.getTopic(), mq.getQueueId(), e);
+ LOGGER.warn("MessageFetcher#getMessageFromTieredStoreAsync failed, " +
+ "topic: {} queueId: {}, offset: {}, batchSize: {}", mq.getTopic(), mq.getQueueId(), queueOffset, finalBatchSize, e);
result.setStatus(GetMessageStatus.OFFSET_FOUND_NULL);
result.setNextBeginOffset(queueOffset);
return result;
@@ -498,7 +475,8 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
return CompletableFuture.completedFuture(result);
}
- return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount);
+ return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, true)
+ .thenApply(messageResultExt -> messageResultExt.doFilterMessage(messageFilter));
}
@Override
@@ -546,7 +524,7 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
return flatFile.getOffsetInConsumeQueueByTime(timestamp, type);
} catch (Exception e) {
LOGGER.error("TieredMessageFetcher#getOffsetInQueueByTime: " +
- "get offset in queue by time failed: topic: {}, queue: {}, timestamp: {}, type: {}",
+ "get offset in queue by time failed: topic: {}, queue: {}, timestamp: {}, type: {}",
topic, queueId, timestamp, type, e);
}
return -1L;
@@ -598,7 +576,8 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> result);
}).whenComplete((result, throwable) -> {
if (result != null) {
- LOGGER.info("MessageFetcher#queryMessageAsync, query result: {}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}",
+ LOGGER.info("MessageFetcher#queryMessageAsync, " +
+ "query result: {}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}",
result.getMessageBufferList().size(), topic, topicId, key, maxCount, begin, end);
}
});
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index edaa5d19f..015c27efa 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -213,8 +213,10 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
// so there is no need to update the maximum offset to the local cq offset here,
// otherwise it will cause repeated consumption after next begin offset over commit offset.
- logger.trace("GetMessageAsync result, group: {}, topic: {}, queueId: {}, offset: {}, count:{}, {}",
- group, topic, queueId, offset, maxMsgNums, result);
+ if (storeConfig.isRecordGetMessageResult()) {
+ logger.info("GetMessageAsync result, {}, group: {}, topic: {}, queueId: {}, offset: {}, count:{}",
+ result, group, topic, queueId, offset, maxMsgNums);
+ }
return result;
}).exceptionally(e -> {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java
new file mode 100644
index 000000000..52462b5dc
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
+
+/**
+ * A {@link GetMessageResult} that also records one tag code for every message added through
+ * {@link #addMessageExt}, so that {@link #doFilterMessage} can filter the messages afterwards.
+ */
+public class GetMessageResultExt extends GetMessageResult {
+
+    // Tag codes in insertion order; entry i is the tag code passed with the i-th addMessageExt call.
+    private final List<Long> tagCodeList;
+
+    public GetMessageResultExt() {
+        this.tagCodeList = new ArrayList<>();
+    }
+
+    /**
+     * Adds a message to this result and records its tag code.
+     *
+     * @param bufferResult buffer holding the message
+     * @param queueOffset  queue offset forwarded to {@code addMessage}
+     * @param tagCode      tag code that {@link #doFilterMessage} passes to the filter
+     */
+    public void addMessageExt(SelectMappedBufferResult bufferResult, long queueOffset, long tagCode) {
+        super.addMessage(bufferResult, queueOffset);
+        this.tagCodeList.add(tagCode);
+    }
+
+    /**
+     * @return the list of recorded tag codes (the internal list, not a copy)
+     */
+    public List<Long> getTagCodeList() {
+        return tagCodeList;
+    }
+
+    /**
+     * Applies the given filter to the collected messages.
+     *
+     * <p>This object is returned unchanged when its status is not {@code FOUND} or when no filter
+     * is given. Otherwise a new {@link GetMessageResult} is built that copies the min, max and
+     * next begin offsets and holds only the messages accepted by both filter checks. Its status
+     * becomes {@code NO_MATCHED_MESSAGE} when its total buffer size is zero.
+     *
+     * @param messageFilter filter to apply, may be {@code null}
+     * @return this object, or a newly built result holding the accepted messages
+     */
+    public GetMessageResult doFilterMessage(MessageFilter messageFilter) {
+        if (messageFilter == null || GetMessageStatus.FOUND != super.getStatus()) {
+            return this;
+        }
+
+        GetMessageResult filtered = new GetMessageResult();
+        filtered.setStatus(GetMessageStatus.FOUND);
+        filtered.setMinOffset(this.getMinOffset());
+        filtered.setMaxOffset(this.getMaxOffset());
+        filtered.setNextBeginOffset(this.getNextBeginOffset());
+
+        List<SelectMappedBufferResult> candidates = this.getMessageMapedList();
+        for (int index = 0; index < candidates.size(); index++) {
+            // First check: the tag code recorded for this message.
+            Long tagCode = this.tagCodeList.get(index);
+            if (!messageFilter.isMatchedByConsumeQueue(tagCode, null)) {
+                continue;
+            }
+
+            // Second check: the message bytes; slice() gives the filter its own position and limit.
+            SelectMappedBufferResult candidate = candidates.get(index);
+            if (!messageFilter.isMatchedByCommitLog(candidate.getByteBuffer().slice(), null)) {
+                continue;
+            }
+
+            SelectMappedBufferResult accepted = new SelectMappedBufferResult(
+                candidate.getStartOffset(), candidate.getByteBuffer(), candidate.getSize(), null);
+            filtered.addMessage(accepted, MessageBufferUtil.getQueueOffset(candidate.getByteBuffer()));
+        }
+
+        if (filtered.getBufferTotalSize() == 0) {
+            filtered.setStatus(GetMessageStatus.NO_MATCHED_MESSAGE);
+        }
+        return filtered;
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
new file mode 100644
index 000000000..d265ed0fc
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.common;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Value holder for a buffer together with its start offset, size and tag code; all fields are final.
+ */
+public class SelectBufferResult {
+
+    private final ByteBuffer byteBuffer;
+    private final long startOffset;
+    private final int size;
+    private final long tagCode;
+
+    /**
+     * @param byteBuffer  buffer to hold; the reference is stored as given, no copy is made
+     * @param startOffset value returned by {@link #getStartOffset()}
+     * @param size        value returned by {@link #getSize()}
+     * @param tagCode     value returned by {@link #getTagCode()}
+     */
+    public SelectBufferResult(ByteBuffer byteBuffer, long startOffset, int size, long tagCode) {
+        this.byteBuffer = byteBuffer;
+        this.startOffset = startOffset;
+        this.size = size;
+        this.tagCode = tagCode;
+    }
+
+    public ByteBuffer getByteBuffer() {
+        return this.byteBuffer;
+    }
+
+    public long getStartOffset() {
+        return this.startOffset;
+    }
+
+    public int getSize() {
+        return this.size;
+    }
+
+    public long getTagCode() {
+        return this.tagCode;
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java
similarity index 55%
rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java
rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java
index af0785f71..4f9f00a07 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java
@@ -16,32 +16,21 @@
*/
package org.apache.rocketmq.tieredstore.common;
-import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.store.SelectMappedBufferResult;
-public class SelectMappedBufferResultWrapper {
+public class SelectBufferResultWrapper {
private final SelectMappedBufferResult result;
- private final LongAdder accessCount;
-
- private final long curOffset;
- private final long minOffset;
- private final long maxOffset;
- private final long size;
-
- public SelectMappedBufferResultWrapper(
- SelectMappedBufferResult result, long curOffset, long minOffset, long maxOffset, long size) {
+ private final long offset;
+ private final long tagCode;
+ private final AtomicInteger accessCount;
+ public SelectBufferResultWrapper(SelectMappedBufferResult result, long offset, long tagCode, boolean used) {
this.result = result;
- this.accessCount = new LongAdder();
- this.curOffset = curOffset;
- this.minOffset = minOffset;
- this.maxOffset = maxOffset;
- this.size = size;
- }
-
- public SelectMappedBufferResult getResult() {
- return result;
+ this.offset = offset;
+ this.tagCode = tagCode;
+ this.accessCount = new AtomicInteger(used ? 1 : 0);
}
public SelectMappedBufferResult getDuplicateResult() {
@@ -53,27 +42,23 @@ public class SelectMappedBufferResultWrapper {
result.getMappedFile());
}
- public long getCurOffset() {
- return curOffset;
- }
-
- public long getMinOffset() {
- return minOffset;
+ public long getOffset() {
+ return offset;
}
- public long getMaxOffset() {
- return maxOffset;
+ public int getBufferSize() {
+ return this.result.getSize();
}
- public long getSize() {
- return size;
+ public long getTagCode() {
+ return tagCode;
}
- public void addAccessCount() {
- accessCount.increment();
+ public int incrementAndGet() {
+ return accessCount.incrementAndGet();
}
- public long getAccessCount() {
- return accessCount.sum();
+ public int getAccessCount() {
+ return accessCount.get();
}
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
index 595db6b86..b0750e550 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
@@ -82,6 +82,7 @@ public class TieredMessageStoreConfig {
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
private boolean messageIndexEnable = true;
+ private boolean recordGetMessageResult = false;
// CommitLog file size, default is 1G
private long tieredStoreCommitLogMaxSize = 1024 * 1024 * 1024;
@@ -182,6 +183,14 @@ public class TieredMessageStoreConfig {
this.messageIndexEnable = messageIndexEnable;
}
+ public boolean isRecordGetMessageResult() {
+ return recordGetMessageResult;
+ }
+
+ public void setRecordGetMessageResult(boolean recordGetMessageResult) {
+ this.recordGetMessageResult = recordGetMessageResult;
+ }
+
public long getTieredStoreCommitLogMaxSize() {
return tieredStoreCommitLogMaxSize;
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
index d8a07f0a7..2b9fc59d8 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
@@ -46,7 +46,7 @@ import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.tieredstore.TieredMessageFetcher;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.MessageCacheKey;
-import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
+import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
@@ -265,7 +265,7 @@ public class TieredStoreMetricsManager {
.setUnit("bytes")
.ofLongs()
.buildWithCallback(measurement -> {
- Optional<Policy.Eviction<MessageCacheKey, SelectMappedBufferResultWrapper>> eviction = fetcher.getMessageCache().policy().eviction();
+ Optional<Policy.Eviction<MessageCacheKey, SelectBufferResultWrapper>> eviction = fetcher.getMessageCache().policy().eviction();
eviction.ifPresent(resultEviction -> measurement.record(resultEviction.weightedSize().orElse(0), newAttributesBuilder().build()));
});
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
index aad42de98..5e3d8c562 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
@@ -295,7 +295,7 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
return future;
}
if (position + length > commitPosition) {
- logger.warn("TieredFileSegment#readAsync request position + length is greater than commit position," +
+ logger.debug("TieredFileSegment#readAsync request position + length is greater than commit position," +
" correct length using commit position, file: {}, request position: {}, commit position:{}, change length from {} to {}",
getPath(), position, commitPosition, length, commitPosition - position);
length = (int) (commitPosition - position);
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java
index 6db45a747..2c4a6e578 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java
@@ -20,11 +20,11 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
@@ -113,53 +113,72 @@ public class MessageBufferUtil {
return MessageDecoder.decodeProperties(slice);
}
- public static List<Pair<Integer/* offset of msgBuffer */, Integer/* msg size */>> splitMessageBuffer(
- ByteBuffer cqBuffer, ByteBuffer msgBuffer) {
+ public static List<SelectBufferResult> splitMessageBuffer(ByteBuffer cqBuffer, ByteBuffer msgBuffer) {
+
cqBuffer.rewind();
msgBuffer.rewind();
- List<Pair<Integer, Integer>> messageList = new ArrayList<>(cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+
+ List<SelectBufferResult> bufferResultList = new ArrayList<>(
+ cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+
+ if (msgBuffer.remaining() == 0) {
+ logger.error("MessageBufferUtil#splitMessage, msg buffer length is zero");
+ return bufferResultList;
+ }
+
if (cqBuffer.remaining() % TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE != 0) {
- logger.warn("MessageBufferUtil#splitMessage: consume queue buffer size {} is not an integer multiple of CONSUME_QUEUE_STORE_UNIT_SIZE {}",
- cqBuffer.remaining(), TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
- return messageList;
+ logger.error("MessageBufferUtil#splitMessage, consume queue buffer size incorrect, {}", cqBuffer.remaining());
+ return bufferResultList;
}
+
try {
- long startCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
- for (int pos = cqBuffer.position(); pos < cqBuffer.limit(); pos += TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE) {
- cqBuffer.position(pos);
- int diff = (int) (CQItemBufferUtil.getCommitLogOffset(cqBuffer) - startCommitLogOffset);
- int size = CQItemBufferUtil.getSize(cqBuffer);
- if (diff + size > msgBuffer.limit()) {
- logger.error("MessageBufferUtil#splitMessage: message buffer size is incorrect: record in consume queue: {}, actual: {}", diff + size, msgBuffer.remaining());
- return messageList;
+ long firstCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
+
+ for (int position = cqBuffer.position(); position < cqBuffer.limit();
+ position += TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE) {
+
+ cqBuffer.position(position);
+ long logOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
+ int bufferSize = CQItemBufferUtil.getSize(cqBuffer);
+ long tagCode = CQItemBufferUtil.getTagCode(cqBuffer);
+
+ int offset = (int) (logOffset - firstCommitLogOffset);
+ if (offset + bufferSize > msgBuffer.limit()) {
+ logger.error("MessageBufferUtil#splitMessage, message buffer size incorrect. " +
+ "Expect length in consume queue: {}, actual length: {}", offset + bufferSize, msgBuffer.limit());
+ break;
}
- msgBuffer.position(diff);
+ msgBuffer.position(offset);
int magicCode = getMagicCode(msgBuffer);
if (magicCode == TieredCommitLog.BLANK_MAGIC_CODE) {
- logger.warn("MessageBufferUtil#splitMessage: message decode error: blank magic code, this message may be coda, try to fix offset");
- diff = diff + TieredCommitLog.CODA_SIZE;
- msgBuffer.position(diff);
+ offset += TieredCommitLog.CODA_SIZE;
+ msgBuffer.position(offset);
magicCode = getMagicCode(msgBuffer);
}
- if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
- logger.warn("MessageBufferUtil#splitMessage: message decode error: unknown magic code");
+ if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE &&
+ magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
+ logger.warn("MessageBufferUtil#splitMessage, found unknown magic code. " +
+ "Message offset: {}, wrong magic code: {}", offset, magicCode);
continue;
}
- if (getTotalSize(msgBuffer) != size) {
- logger.warn("MessageBufferUtil#splitMessage: message size is not right: except: {}, actual: {}", size, getTotalSize(msgBuffer));
+ if (bufferSize != getTotalSize(msgBuffer)) {
+ logger.warn("MessageBufferUtil#splitMessage, message length in commitlog incorrect. " +
+ "Except length in commitlog: {}, actual: {}", getTotalSize(msgBuffer), bufferSize);
continue;
}
- messageList.add(Pair.of(diff, size));
+ ByteBuffer sliceBuffer = msgBuffer.slice();
+ sliceBuffer.limit(bufferSize);
+ bufferResultList.add(new SelectBufferResult(sliceBuffer, offset, bufferSize, tagCode));
}
} catch (Exception e) {
- logger.error("MessageBufferUtil#splitMessage: split message failed, maybe decode consume queue item failed", e);
+ logger.error("MessageBufferUtil#splitMessage, split message buffer error", e);
} finally {
cqBuffer.rewind();
msgBuffer.rewind();
}
- return messageList;
+ return bufferResultList;
}
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index 4e0d7e697..4e8287533 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -31,7 +31,7 @@ import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
+import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
@@ -143,17 +143,18 @@ public class TieredMessageFetcherTest {
fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, new ArrayList<>());
Assert.assertEquals(0, fetcher.getMessageCache().estimatedSize());
- fetcher.putMessageToCache(flatFile, 0, new SelectMappedBufferResult(0, msg1, msg1.remaining(), null), 0, 0, 1);
+ SelectMappedBufferResult bufferResult = new SelectMappedBufferResult(0, msg1, msg1.remaining(), null);
+ fetcher.putMessageToCache(flatFile, new SelectBufferResultWrapper(bufferResult, 0, 0, false));
Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize());
- GetMessageResult getMessageResult = fetcher.getMessageFromCacheAsync(flatFile, "group", 0, 32).join();
+ GetMessageResult getMessageResult = fetcher.getMessageFromCacheAsync(flatFile, "group", 0, 32, true).join();
Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
Assert.assertEquals(1, getMessageResult.getMessageBufferList().size());
Assert.assertEquals(msg1, getMessageResult.getMessageBufferList().get(0));
Awaitility.waitAtMost(3, TimeUnit.SECONDS)
.until(() -> fetcher.getMessageCache().estimatedSize() == 2);
- ArrayList<SelectMappedBufferResultWrapper> wrapperList = new ArrayList<>();
+ ArrayList<SelectBufferResultWrapper> wrapperList = new ArrayList<>();
wrapperList.add(fetcher.getMessageFromCache(flatFile, 0));
fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, wrapperList);
Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize());
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java
new file mode 100644
index 000000000..deb8770d2
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.common;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.rocketmq.store.ConsumeQueueExt;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class GetMessageResultExtTest {
+
+    /**
+     * Covers {@link GetMessageResultExt#doFilterMessage}: a null filter on an empty result under
+     * several statuses, then a filter that rejects every message of a result in status FOUND.
+     */
+    @Test
+    public void doFilterTest() {
+        GetMessageResultExt resultExt = new GetMessageResultExt();
+
+        // No filter given: the still empty result is expected to report zero messages.
+        Assert.assertEquals(0, resultExt.doFilterMessage(null).getMessageCount());
+        resultExt.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE);
+        Assert.assertEquals(0, resultExt.doFilterMessage(null).getMessageCount());
+        resultExt.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
+        Assert.assertEquals(0, resultExt.doFilterMessage(null).getMessageCount());
+
+        SelectMappedBufferResult messageA = new SelectMappedBufferResult(
+            1000L, MessageBufferUtilTest.buildMockedMessageBuffer(), 100, null);
+        resultExt.addMessageExt(messageA, 0, "TagA".hashCode());
+        SelectMappedBufferResult messageB = new SelectMappedBufferResult(
+            2000L, MessageBufferUtilTest.buildMockedMessageBuffer(), 100, null);
+        resultExt.addMessageExt(messageB, 0, "TagB".hashCode());
+        assertEquals(2, resultExt.getMessageCount());
+
+        // A filter that rejects everything must leave no message in the filtered result.
+        MessageFilter rejectAll = new MessageFilter() {
+            @Override
+            public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                return false;
+            }
+
+            @Override
+            public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+                return false;
+            }
+        };
+
+        resultExt.setStatus(GetMessageStatus.FOUND);
+        GetMessageResult getMessageResult = resultExt.doFilterMessage(rejectAll);
+        Assert.assertEquals(0, getMessageResult.getMessageCount());
+    }
+}
\ No newline at end of file
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java
new file mode 100644
index 000000000..b7e6e639f
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.common;
+
+import java.nio.ByteBuffer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SelectBufferResultTest {
+
+    /**
+     * Every getter must return the value that was passed to the constructor.
+     */
+    @Test
+    public void testSelectBufferResult() {
+        final long startOffset = 5L;
+        final int size = 10;
+        final long tagCode = 1L;
+        final ByteBuffer buffer = ByteBuffer.allocate(10);
+
+        SelectBufferResult result = new SelectBufferResult(buffer, startOffset, size, tagCode);
+
+        Assert.assertEquals(buffer, result.getByteBuffer());
+        Assert.assertEquals(startOffset, result.getStartOffset());
+        Assert.assertEquals(size, result.getSize());
+        Assert.assertEquals(tagCode, result.getTagCode());
+    }
+}
\ No newline at end of file
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
index 68277cacc..a0b438948 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
@@ -22,9 +22,9 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
import org.junit.Assert;
@@ -206,10 +206,12 @@ public class MessageBufferUtilTest {
cqBuffer.flip();
cqBuffer1.rewind();
cqBuffer2.rewind();
- List<Pair<Integer, Integer>> msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
+ List<SelectBufferResult> msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
Assert.assertEquals(2, msgList.size());
- Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
- Assert.assertEquals(Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, MSG_LEN), msgList.get(1));
+ Assert.assertEquals(0, msgList.get(0).getStartOffset());
+ Assert.assertEquals(MSG_LEN, msgList.get(0).getSize());
+ Assert.assertEquals(MSG_LEN + TieredCommitLog.CODA_SIZE, msgList.get(1).getStartOffset());
+ Assert.assertEquals(MSG_LEN, msgList.get(1).getSize());
cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 2);
cqBuffer.put(cqBuffer1);
@@ -219,7 +221,8 @@ public class MessageBufferUtilTest {
cqBuffer4.rewind();
msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
Assert.assertEquals(1, msgList.size());
- Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
+ Assert.assertEquals(0, msgList.get(0).getStartOffset());
+ Assert.assertEquals(MSG_LEN, msgList.get(0).getSize());
cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 3);
cqBuffer.put(cqBuffer1);
@@ -227,8 +230,10 @@ public class MessageBufferUtilTest {
cqBuffer.flip();
msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
Assert.assertEquals(2, msgList.size());
- Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
- Assert.assertEquals(Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, MSG_LEN), msgList.get(1));
+ Assert.assertEquals(0, msgList.get(0).getStartOffset());
+ Assert.assertEquals(MSG_LEN, msgList.get(0).getSize());
+ Assert.assertEquals(MSG_LEN + TieredCommitLog.CODA_SIZE, msgList.get(1).getStartOffset());
+ Assert.assertEquals(MSG_LEN, msgList.get(1).getSize());
cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
cqBuffer.put(cqBuffer5);
--
2.32.0.windows.2