1197 lines
66 KiB
Diff
1197 lines
66 KiB
Diff
From aec1055830e78f7e710e32ebd467f9f7d208855d Mon Sep 17 00:00:00 2001
|
|
From: lizhimins <707364882@qq.com>
|
|
Date: Mon, 4 Dec 2023 16:12:42 +0800
|
|
Subject: [PATCH] [ISSUE #7585] Support message filtering in rocketmq tiered
|
|
storage (#7594)
|
|
|
|
---
|
|
.../tieredstore/TieredMessageFetcher.java | 325 ++++++++----------
|
|
.../tieredstore/TieredMessageStore.java | 6 +-
|
|
.../common/GetMessageResultExt.java | 76 ++++
|
|
.../common/SelectBufferResult.java | 51 +++
|
|
...er.java => SelectBufferResultWrapper.java} | 53 +--
|
|
.../common/TieredMessageStoreConfig.java | 9 +
|
|
.../metrics/TieredStoreMetricsManager.java | 4 +-
|
|
.../provider/TieredFileSegment.java | 2 +-
|
|
.../tieredstore/util/MessageBufferUtil.java | 71 ++--
|
|
.../tieredstore/TieredMessageFetcherTest.java | 9 +-
|
|
.../common/GetMessageResultExtTest.java | 65 ++++
|
|
.../common/SelectBufferResultTest.java | 37 ++
|
|
.../util/MessageBufferUtilTest.java | 19 +-
|
|
13 files changed, 478 insertions(+), 249 deletions(-)
|
|
create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java
|
|
create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
|
|
rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/{SelectMappedBufferResultWrapper.java => SelectBufferResultWrapper.java} (55%)
|
|
create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java
|
|
create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
|
index f739773eb..7b0c47c59 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
|
@@ -19,17 +19,14 @@ package org.apache.rocketmq.tieredstore;
|
|
import com.github.benmanes.caffeine.cache.Cache;
|
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
|
import com.github.benmanes.caffeine.cache.Scheduler;
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.base.Stopwatch;
|
|
-import com.google.common.collect.Sets;
|
|
import io.opentelemetry.api.common.Attributes;
|
|
import java.nio.ByteBuffer;
|
|
import java.util.ArrayList;
|
|
-import java.util.HashSet;
|
|
import java.util.List;
|
|
-import java.util.Set;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.TimeUnit;
|
|
-import javax.annotation.Nullable;
|
|
import org.apache.commons.lang3.tuple.Pair;
|
|
import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.common.message.MessageQueue;
|
|
@@ -40,12 +37,13 @@ import org.apache.rocketmq.store.GetMessageStatus;
|
|
import org.apache.rocketmq.store.MessageFilter;
|
|
import org.apache.rocketmq.store.QueryMessageResult;
|
|
import org.apache.rocketmq.store.SelectMappedBufferResult;
|
|
+import org.apache.rocketmq.tieredstore.common.GetMessageResultExt;
|
|
import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture;
|
|
import org.apache.rocketmq.tieredstore.common.MessageCacheKey;
|
|
-import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
|
|
+import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
|
|
+import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper;
|
|
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
|
|
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
|
|
-import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
|
|
import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
|
|
import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
|
|
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
|
|
@@ -66,10 +64,10 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
private static final Logger LOGGER = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
|
|
|
|
private final String brokerName;
|
|
- private final TieredMessageStoreConfig storeConfig;
|
|
private final TieredMetadataStore metadataStore;
|
|
+ private final TieredMessageStoreConfig storeConfig;
|
|
private final TieredFlatFileManager flatFileManager;
|
|
- private final Cache<MessageCacheKey, SelectMappedBufferResultWrapper> readAheadCache;
|
|
+ private final Cache<MessageCacheKey, SelectBufferResultWrapper> readAheadCache;
|
|
|
|
public TieredMessageFetcher(TieredMessageStoreConfig storeConfig) {
|
|
this.storeConfig = storeConfig;
|
|
@@ -79,7 +77,7 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
this.readAheadCache = this.initCache(storeConfig);
|
|
}
|
|
|
|
- private Cache<MessageCacheKey, SelectMappedBufferResultWrapper> initCache(TieredMessageStoreConfig storeConfig) {
|
|
+ private Cache<MessageCacheKey, SelectBufferResultWrapper> initCache(TieredMessageStoreConfig storeConfig) {
|
|
long memoryMaxSize =
|
|
(long) (Runtime.getRuntime().maxMemory() * storeConfig.getReadAheadCacheSizeThresholdRate());
|
|
|
|
@@ -88,60 +86,35 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
.expireAfterWrite(storeConfig.getReadAheadCacheExpireDuration(), TimeUnit.MILLISECONDS)
|
|
.maximumWeight(memoryMaxSize)
|
|
// Using the buffer size of messages to calculate memory usage
|
|
- .weigher((MessageCacheKey key, SelectMappedBufferResultWrapper msg) -> msg.getDuplicateResult().getSize())
|
|
+ .weigher((MessageCacheKey key, SelectBufferResultWrapper msg) -> msg.getBufferSize())
|
|
.recordStats()
|
|
.build();
|
|
}
|
|
|
|
- protected SelectMappedBufferResultWrapper putMessageToCache(CompositeFlatFile flatFile,
|
|
- long queueOffset, SelectMappedBufferResult result, long minOffset, long maxOffset, int size) {
|
|
-
|
|
- return putMessageToCache(flatFile, queueOffset, result, minOffset, maxOffset, size, false);
|
|
- }
|
|
-
|
|
- protected SelectMappedBufferResultWrapper putMessageToCache(CompositeFlatFile flatFile,
|
|
- long queueOffset, SelectMappedBufferResult result, long minOffset, long maxOffset, int size, boolean used) {
|
|
-
|
|
- SelectMappedBufferResultWrapper wrapper =
|
|
- new SelectMappedBufferResultWrapper(result, queueOffset, minOffset, maxOffset, size);
|
|
- if (used) {
|
|
- wrapper.addAccessCount();
|
|
- }
|
|
- readAheadCache.put(new MessageCacheKey(flatFile, queueOffset), wrapper);
|
|
- return wrapper;
|
|
- }
|
|
-
|
|
- // Visible for metrics monitor
|
|
- public Cache<MessageCacheKey, SelectMappedBufferResultWrapper> getMessageCache() {
|
|
+ @VisibleForTesting
|
|
+ public Cache<MessageCacheKey, SelectBufferResultWrapper> getMessageCache() {
|
|
return readAheadCache;
|
|
}
|
|
|
|
- // Waiting for the request in transit to complete
|
|
- protected CompletableFuture<GetMessageResult> getMessageFromCacheAsync(
|
|
- CompositeQueueFlatFile flatFile, String group, long queueOffset, int maxCount) {
|
|
-
|
|
- return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, true);
|
|
+ protected void putMessageToCache(CompositeFlatFile flatFile, SelectBufferResultWrapper result) {
|
|
+ readAheadCache.put(new MessageCacheKey(flatFile, result.getOffset()), result);
|
|
}
|
|
|
|
- @Nullable
|
|
- protected SelectMappedBufferResultWrapper getMessageFromCache(CompositeFlatFile flatFile, long queueOffset) {
|
|
- MessageCacheKey cacheKey = new MessageCacheKey(flatFile, queueOffset);
|
|
- return readAheadCache.getIfPresent(cacheKey);
|
|
+ protected SelectBufferResultWrapper getMessageFromCache(CompositeFlatFile flatFile, long offset) {
|
|
+ return readAheadCache.getIfPresent(new MessageCacheKey(flatFile, offset));
|
|
}
|
|
|
|
- protected void recordCacheAccess(CompositeFlatFile flatFile, String group, long queueOffset,
|
|
- List<SelectMappedBufferResultWrapper> resultWrapperList) {
|
|
- if (resultWrapperList.size() > 0) {
|
|
- queueOffset = resultWrapperList.get(resultWrapperList.size() - 1).getCurOffset();
|
|
+ protected void recordCacheAccess(CompositeFlatFile flatFile,
|
|
+ String group, long offset, List<SelectBufferResultWrapper> resultWrapperList) {
|
|
+ if (!resultWrapperList.isEmpty()) {
|
|
+ offset = resultWrapperList.get(resultWrapperList.size() - 1).getOffset();
|
|
}
|
|
- flatFile.recordGroupAccess(group, queueOffset);
|
|
- for (SelectMappedBufferResultWrapper wrapper : resultWrapperList) {
|
|
- wrapper.addAccessCount();
|
|
- if (wrapper.getAccessCount() >= flatFile.getActiveGroupCount()) {
|
|
- MessageCacheKey cacheKey = new MessageCacheKey(flatFile, wrapper.getCurOffset());
|
|
- readAheadCache.invalidate(cacheKey);
|
|
+ flatFile.recordGroupAccess(group, offset);
|
|
+ resultWrapperList.forEach(wrapper -> {
|
|
+ if (wrapper.incrementAndGet() >= flatFile.getActiveGroupCount()) {
|
|
+ readAheadCache.invalidate(new MessageCacheKey(flatFile, wrapper.getOffset()));
|
|
}
|
|
- }
|
|
+ });
|
|
}
|
|
|
|
private void prefetchMessage(CompositeQueueFlatFile flatFile, String group, int maxCount, long nextBeginOffset) {
|
|
@@ -149,7 +122,6 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
return;
|
|
}
|
|
|
|
- MessageQueue mq = flatFile.getMessageQueue();
|
|
// make sure there is only one request per group and request range
|
|
int prefetchBatchSize = Math.min(maxCount * flatFile.getReadAheadFactor(), storeConfig.getReadAheadMessageCountThreshold());
|
|
InFlightRequestFuture inflightRequest = flatFile.getInflightRequest(group, nextBeginOffset, prefetchBatchSize);
|
|
@@ -166,13 +138,8 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
long maxOffsetOfLastRequest = inflightRequest.getLastFuture().join();
|
|
boolean lastRequestIsExpired = getMessageFromCache(flatFile, nextBeginOffset) == null;
|
|
|
|
- // if message fetch by last request is expired, we need to prefetch the message from tiered store
|
|
- int cacheRemainCount = (int) (maxOffsetOfLastRequest - nextBeginOffset);
|
|
- LOGGER.debug("TieredMessageFetcher#preFetchMessage: group={}, nextBeginOffset={}, maxOffsetOfLastRequest={}, lastRequestIsExpired={}, cacheRemainCount={}",
|
|
- group, nextBeginOffset, maxOffsetOfLastRequest, lastRequestIsExpired, cacheRemainCount);
|
|
-
|
|
- if (lastRequestIsExpired
|
|
- || maxOffsetOfLastRequest != -1L && nextBeginOffset >= inflightRequest.getStartOffset()) {
|
|
+ if (lastRequestIsExpired ||
|
|
+ maxOffsetOfLastRequest != -1L && nextBeginOffset >= inflightRequest.getStartOffset()) {
|
|
|
|
long queueOffset;
|
|
if (lastRequestIsExpired) {
|
|
@@ -196,12 +163,12 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
long nextQueueOffset = queueOffset;
|
|
if (flag == 1) {
|
|
int firstBatchSize = factor % storeConfig.getReadAheadBatchSizeFactorThreshold() * maxCount;
|
|
- CompletableFuture<Long> future = prefetchMessageThenPutToCache(flatFile, mq, nextQueueOffset, firstBatchSize);
|
|
+ CompletableFuture<Long> future = prefetchMessageThenPutToCache(flatFile, nextQueueOffset, firstBatchSize);
|
|
futureList.add(Pair.of(firstBatchSize, future));
|
|
nextQueueOffset += firstBatchSize;
|
|
}
|
|
for (long i = 0; i < concurrency - flag; i++) {
|
|
- CompletableFuture<Long> future = prefetchMessageThenPutToCache(flatFile, mq, nextQueueOffset + i * requestBatchSize, requestBatchSize);
|
|
+ CompletableFuture<Long> future = prefetchMessageThenPutToCache(flatFile, nextQueueOffset + i * requestBatchSize, requestBatchSize);
|
|
futureList.add(Pair.of(requestBatchSize, future));
|
|
}
|
|
flatFile.putInflightRequest(group, queueOffset, maxCount * factor, futureList);
|
|
@@ -211,52 +178,52 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
}
|
|
}
|
|
|
|
- private CompletableFuture<Long> prefetchMessageThenPutToCache(CompositeQueueFlatFile flatFile, MessageQueue mq,
|
|
- long queueOffset, int batchSize) {
|
|
+ private CompletableFuture<Long> prefetchMessageThenPutToCache(
|
|
+ CompositeQueueFlatFile flatFile, long queueOffset, int batchSize) {
|
|
+
|
|
+ MessageQueue mq = flatFile.getMessageQueue();
|
|
return getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize)
|
|
- .thenApplyAsync(result -> {
|
|
- if (result.getStatus() != GetMessageStatus.FOUND) {
|
|
- LOGGER.warn("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed: topic: {}, queue: {}, queue offset: {}, batch size: {}, result: {}",
|
|
- mq.getTopic(), mq.getQueueId(), queueOffset, batchSize, result.getStatus());
|
|
- return -1L;
|
|
- }
|
|
- // put message into cache
|
|
- List<Long> offsetList = result.getMessageQueueOffset();
|
|
- List<SelectMappedBufferResult> msgList = result.getMessageMapedList();
|
|
- if (offsetList.size() != msgList.size()) {
|
|
- LOGGER.error("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed, result is illegal: topic: {}, queue: {}, queue offset: {}, batch size: {}, offsetList size: {}, msgList size: {}",
|
|
- mq.getTopic(), mq.getQueueId(), queueOffset, batchSize, offsetList.size(), msgList.size());
|
|
+ .thenApply(result -> {
|
|
+ if (result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_ONE ||
|
|
+ result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
|
|
return -1L;
|
|
}
|
|
- if (offsetList.isEmpty()) {
|
|
- LOGGER.error("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed, result is FOUND but msgList is empty: topic: {}, queue: {}, queue offset: {}, batch size: {}",
|
|
- mq.getTopic(), mq.getQueueId(), queueOffset, batchSize);
|
|
+ if (result.getStatus() != GetMessageStatus.FOUND) {
|
|
+ LOGGER.warn("MessageFetcher prefetch message then put to cache failed, result: {}, " +
|
|
+ "topic: {}, queue: {}, queue offset: {}, batch size: {}",
|
|
+ result.getStatus(), mq.getTopic(), mq.getQueueId(), queueOffset, batchSize);
|
|
return -1L;
|
|
}
|
|
- Long minOffset = offsetList.get(0);
|
|
- Long maxOffset = offsetList.get(offsetList.size() - 1);
|
|
- int size = offsetList.size();
|
|
- for (int n = 0; n < offsetList.size(); n++) {
|
|
- putMessageToCache(flatFile, offsetList.get(n), msgList.get(n), minOffset, maxOffset, size);
|
|
- }
|
|
- if (size != batchSize || maxOffset != queueOffset + batchSize - 1) {
|
|
- LOGGER.warn("TieredMessageFetcher#prefetchAndPutMsgToCache: size not match: except: {}, actual: {}, queue offset: {}, min offset: {}, except offset: {}, max offset: {}",
|
|
- batchSize, size, queueOffset, minOffset, queueOffset + batchSize - 1, maxOffset);
|
|
+ try {
|
|
+ List<Long> offsetList = result.getMessageQueueOffset();
|
|
+ List<Long> tagCodeList = result.getTagCodeList();
|
|
+ List<SelectMappedBufferResult> msgList = result.getMessageMapedList();
|
|
+ for (int i = 0; i < offsetList.size(); i++) {
|
|
+ SelectMappedBufferResult msg = msgList.get(i);
|
|
+ SelectBufferResultWrapper bufferResult = new SelectBufferResultWrapper(
|
|
+ msg, offsetList.get(i), tagCodeList.get(i), false);
|
|
+ this.putMessageToCache(flatFile, bufferResult);
|
|
+ }
|
|
+ return offsetList.get(offsetList.size() - 1);
|
|
+ } catch (Exception e) {
|
|
+ LOGGER.error("MessageFetcher prefetch message then put to cache failed, " +
|
|
+ "topic: {}, queue: {}, queue offset: {}, batch size: {}",
|
|
+ mq.getTopic(), mq.getQueueId(), queueOffset, batchSize, e);
|
|
}
|
|
- return maxOffset;
|
|
- }, TieredStoreExecutor.fetchDataExecutor);
|
|
+ return -1L;
|
|
+ });
|
|
}
|
|
|
|
- public CompletableFuture<GetMessageResult> getMessageFromCacheAsync(CompositeQueueFlatFile flatFile,
|
|
+ public CompletableFuture<GetMessageResultExt> getMessageFromCacheAsync(CompositeQueueFlatFile flatFile,
|
|
String group, long queueOffset, int maxCount, boolean waitInflightRequest) {
|
|
|
|
MessageQueue mq = flatFile.getMessageQueue();
|
|
|
|
long lastGetOffset = queueOffset - 1;
|
|
- List<SelectMappedBufferResultWrapper> resultWrapperList = new ArrayList<>(maxCount);
|
|
+ List<SelectBufferResultWrapper> resultWrapperList = new ArrayList<>(maxCount);
|
|
for (int i = 0; i < maxCount; i++) {
|
|
lastGetOffset++;
|
|
- SelectMappedBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset);
|
|
+ SelectBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset);
|
|
if (wrapper == null) {
|
|
lastGetOffset--;
|
|
break;
|
|
@@ -281,19 +248,19 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
flatFile.getInflightRequest(group, queueOffset, maxCount).getFuture(queueOffset);
|
|
if (!future.isDone()) {
|
|
Stopwatch stopwatch = Stopwatch.createStarted();
|
|
- // to prevent starvation issues, only allow waiting for inflight request once
|
|
- return future.thenCompose(v -> {
|
|
+ // to prevent starvation issues, only allow waiting for processing request once
|
|
+ return future.thenComposeAsync(v -> {
|
|
LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: wait for response cost: {}ms",
|
|
stopwatch.elapsed(TimeUnit.MILLISECONDS));
|
|
return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, false);
|
|
- });
|
|
+ }, TieredStoreExecutor.fetchDataExecutor);
|
|
}
|
|
}
|
|
|
|
// try to get message from cache again when prefetch request is done
|
|
for (int i = 0; i < maxCount - resultWrapperList.size(); i++) {
|
|
lastGetOffset++;
|
|
- SelectMappedBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset);
|
|
+ SelectBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset);
|
|
if (wrapper == null) {
|
|
lastGetOffset--;
|
|
break;
|
|
@@ -303,74 +270,94 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
|
|
recordCacheAccess(flatFile, group, queueOffset, resultWrapperList);
|
|
|
|
- // if cache hit, result will be returned immediately and asynchronously prefetch messages for later requests
|
|
- if (!resultWrapperList.isEmpty()) {
|
|
- LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: " +
|
|
- "topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}",
|
|
- mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size());
|
|
- prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
|
|
+ if (resultWrapperList.isEmpty()) {
|
|
+ // If cache miss, pull messages immediately
|
|
+ LOGGER.info("MessageFetcher cache miss, group: {}, topic: {}, queueId: {}, offset: {}, maxCount: {}",
|
|
+ group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
|
|
+ } else {
|
|
+ // If cache hit, return buffer result immediately and asynchronously prefetch messages
|
|
+ LOGGER.debug("MessageFetcher cache hit, group: {}, topic: {}, queueId: {}, offset: {}, maxCount: {}, resultSize: {}",
|
|
+ group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size());
|
|
|
|
- GetMessageResult result = new GetMessageResult();
|
|
+ GetMessageResultExt result = new GetMessageResultExt();
|
|
result.setStatus(GetMessageStatus.FOUND);
|
|
result.setMinOffset(flatFile.getConsumeQueueMinOffset());
|
|
result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
|
|
result.setNextBeginOffset(queueOffset + resultWrapperList.size());
|
|
- resultWrapperList.forEach(wrapper -> result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset()));
|
|
+ resultWrapperList.forEach(wrapper -> result.addMessageExt(
|
|
+ wrapper.getDuplicateResult(), wrapper.getOffset(), wrapper.getTagCode()));
|
|
+
|
|
+ if (lastGetOffset < result.getMaxOffset()) {
|
|
+ this.prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
|
|
+ }
|
|
return CompletableFuture.completedFuture(result);
|
|
}
|
|
|
|
- // if cache is miss, immediately pull messages
|
|
- LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " +
|
|
- "topic: {}, queue: {}, queue offset: {}, max message num: {}",
|
|
- mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
|
|
-
|
|
- CompletableFuture<GetMessageResult> resultFuture;
|
|
+ CompletableFuture<GetMessageResultExt> resultFuture;
|
|
synchronized (flatFile) {
|
|
int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
|
|
resultFuture = getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize)
|
|
- .thenApplyAsync(result -> {
|
|
+ .thenApply(result -> {
|
|
if (result.getStatus() != GetMessageStatus.FOUND) {
|
|
return result;
|
|
}
|
|
- GetMessageResult newResult = new GetMessageResult();
|
|
- newResult.setStatus(GetMessageStatus.FOUND);
|
|
- newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
|
|
- newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
|
|
|
|
+ GetMessageResultExt newResult = new GetMessageResultExt();
|
|
List<Long> offsetList = result.getMessageQueueOffset();
|
|
+ List<Long> tagCodeList = result.getTagCodeList();
|
|
List<SelectMappedBufferResult> msgList = result.getMessageMapedList();
|
|
- Long minOffset = offsetList.get(0);
|
|
- Long maxOffset = offsetList.get(offsetList.size() - 1);
|
|
- int size = offsetList.size();
|
|
+
|
|
for (int i = 0; i < offsetList.size(); i++) {
|
|
- Long offset = offsetList.get(i);
|
|
SelectMappedBufferResult msg = msgList.get(i);
|
|
- // put message into cache
|
|
- SelectMappedBufferResultWrapper resultWrapper = putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true);
|
|
- // try to meet maxCount
|
|
+ SelectBufferResultWrapper bufferResult = new SelectBufferResultWrapper(
|
|
+ msg, offsetList.get(i), tagCodeList.get(i), true);
|
|
+ this.putMessageToCache(flatFile, bufferResult);
|
|
if (newResult.getMessageMapedList().size() < maxCount) {
|
|
- newResult.addMessage(resultWrapper.getDuplicateResult(), offset);
|
|
+ newResult.addMessageExt(msg, offsetList.get(i), tagCodeList.get(i));
|
|
}
|
|
}
|
|
+
|
|
+ newResult.setStatus(GetMessageStatus.FOUND);
|
|
+ newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
|
|
+ newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
|
|
newResult.setNextBeginOffset(queueOffset + newResult.getMessageMapedList().size());
|
|
return newResult;
|
|
- }, TieredStoreExecutor.fetchDataExecutor);
|
|
+ });
|
|
|
|
List<Pair<Integer, CompletableFuture<Long>>> futureList = new ArrayList<>();
|
|
CompletableFuture<Long> inflightRequestFuture = resultFuture.thenApply(result ->
|
|
- result.getStatus() == GetMessageStatus.FOUND ? result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) : -1L);
|
|
+ result.getStatus() == GetMessageStatus.FOUND ?
|
|
+ result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) : -1L);
|
|
futureList.add(Pair.of(batchSize, inflightRequestFuture));
|
|
flatFile.putInflightRequest(group, queueOffset, batchSize, futureList);
|
|
}
|
|
return resultFuture;
|
|
}
|
|
|
|
- public CompletableFuture<GetMessageResult> getMessageFromTieredStoreAsync(CompositeQueueFlatFile flatFile,
|
|
- long queueOffset, int batchSize) {
|
|
+ public CompletableFuture<GetMessageResultExt> getMessageFromTieredStoreAsync(
|
|
+ CompositeQueueFlatFile flatFile, long queueOffset, int batchSize) {
|
|
|
|
- GetMessageResult result = new GetMessageResult();
|
|
+ GetMessageResultExt result = new GetMessageResultExt();
|
|
result.setMinOffset(flatFile.getConsumeQueueMinOffset());
|
|
result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
|
|
+
|
|
+ if (queueOffset < result.getMaxOffset()) {
|
|
+ batchSize = Math.min(batchSize, (int) Math.min(result.getMaxOffset() - queueOffset, Integer.MAX_VALUE));
|
|
+ } else if (queueOffset == result.getMaxOffset()) {
|
|
+ result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE);
|
|
+ result.setNextBeginOffset(queueOffset);
|
|
+ return CompletableFuture.completedFuture(result);
|
|
+ } else if (queueOffset > result.getMaxOffset()) {
|
|
+ result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
|
|
+ result.setNextBeginOffset(result.getMaxOffset());
|
|
+ return CompletableFuture.completedFuture(result);
|
|
+ }
|
|
+
|
|
+ LOGGER.info("MessageFetcher#getMessageFromTieredStoreAsync, " +
|
|
+ "topic: {}, queueId: {}, broker offset: {}-{}, offset: {}, expect: {}",
|
|
+ flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(),
|
|
+ result.getMinOffset(), result.getMaxOffset(), queueOffset, batchSize);
|
|
+
|
|
CompletableFuture<ByteBuffer> readConsumeQueueFuture;
|
|
try {
|
|
readConsumeQueueFuture = flatFile.getConsumeQueueAsync(queueOffset, batchSize);
|
|
@@ -389,66 +376,56 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
}
|
|
}
|
|
|
|
- CompletableFuture<ByteBuffer> readCommitLogFuture = readConsumeQueueFuture.thenComposeAsync(cqBuffer -> {
|
|
+ CompletableFuture<ByteBuffer> readCommitLogFuture = readConsumeQueueFuture.thenCompose(cqBuffer -> {
|
|
long firstCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
|
|
cqBuffer.position(cqBuffer.remaining() - TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
|
|
long lastCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
|
|
if (lastCommitLogOffset < firstCommitLogOffset) {
|
|
- MessageQueue mq = flatFile.getMessageQueue();
|
|
- LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: message is not in order, try to fetch data in next store, topic: {}, queueId: {}, batch size: {}, queue offset {}",
|
|
- mq.getTopic(), mq.getQueueId(), batchSize, queueOffset);
|
|
- throw new TieredStoreException(TieredStoreErrorCode.ILLEGAL_OFFSET, "message is not in order");
|
|
+ LOGGER.error("MessageFetcher#getMessageFromTieredStoreAsync, " +
|
|
+ "last offset is smaller than first offset, " +
|
|
+ "topic: {} queueId: {}, offset: {}, firstOffset: {}, lastOffset: {}",
|
|
+ flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), queueOffset,
|
|
+ firstCommitLogOffset, lastCommitLogOffset);
|
|
+ return CompletableFuture.completedFuture(ByteBuffer.allocate(0));
|
|
}
|
|
- long length = lastCommitLogOffset - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer);
|
|
|
|
- // prevent OOM
|
|
- long originLength = length;
|
|
- while (cqBuffer.limit() > TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE && length > storeConfig.getReadAheadMessageSizeThreshold()) {
|
|
+ // Get the total size of the data by reducing the length limit of cq to prevent OOM
|
|
+ long length = lastCommitLogOffset - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer);
|
|
+ while (cqBuffer.limit() > TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE &&
|
|
+ length > storeConfig.getReadAheadMessageSizeThreshold()) {
|
|
cqBuffer.limit(cqBuffer.position());
|
|
cqBuffer.position(cqBuffer.limit() - TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
|
|
- length = CQItemBufferUtil.getCommitLogOffset(cqBuffer) - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer);
|
|
- }
|
|
-
|
|
- if (originLength != length) {
|
|
- MessageQueue mq = flatFile.getMessageQueue();
|
|
- LOGGER.info("TieredMessageFetcher#getMessageFromTieredStoreAsync: msg data is too large, topic: {}, queueId: {}, batch size: {}, fix it from {} to {}",
|
|
- mq.getTopic(), mq.getQueueId(), batchSize, originLength, length);
|
|
+ length = CQItemBufferUtil.getCommitLogOffset(cqBuffer)
|
|
+ - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer);
|
|
}
|
|
|
|
return flatFile.getCommitLogAsync(firstCommitLogOffset, (int) length);
|
|
- }, TieredStoreExecutor.fetchDataExecutor);
|
|
+ });
|
|
|
|
- return readConsumeQueueFuture.thenCombineAsync(readCommitLogFuture, (cqBuffer, msgBuffer) -> {
|
|
- List<Pair<Integer, Integer>> msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
|
|
- if (!msgList.isEmpty()) {
|
|
- int requestSize = cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
|
|
+ int finalBatchSize = batchSize;
|
|
+ return readConsumeQueueFuture.thenCombine(readCommitLogFuture, (cqBuffer, msgBuffer) -> {
|
|
+ List<SelectBufferResult> bufferList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
|
|
+ int requestSize = cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
|
|
+ if (bufferList.isEmpty()) {
|
|
+ result.setStatus(GetMessageStatus.NO_MATCHED_MESSAGE);
|
|
+ result.setNextBeginOffset(queueOffset + requestSize);
|
|
+ } else {
|
|
result.setStatus(GetMessageStatus.FOUND);
|
|
- result.setNextBeginOffset(queueOffset + msgList.size());
|
|
- msgList.forEach(pair -> {
|
|
- msgBuffer.position(pair.getLeft());
|
|
- ByteBuffer slice = msgBuffer.slice();
|
|
- slice.limit(pair.getRight());
|
|
- result.addMessage(new SelectMappedBufferResult(pair.getLeft(), slice, pair.getRight(), null), MessageBufferUtil.getQueueOffset(slice));
|
|
- });
|
|
- if (requestSize != msgList.size()) {
|
|
- Set<Long> requestOffsetSet = new HashSet<>();
|
|
- for (int i = 0; i < requestSize; i++) {
|
|
- requestOffsetSet.add(queueOffset + i);
|
|
- }
|
|
- LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: split message buffer failed, batch size: {}, request message count: {}, actual message count: {}, these messages may lost: {}", batchSize, requestSize, msgList.size(), Sets.difference(requestOffsetSet, Sets.newHashSet(result.getMessageQueueOffset())));
|
|
- } else if (requestSize != batchSize) {
|
|
- LOGGER.debug("TieredMessageFetcher#getMessageFromTieredStoreAsync: message count does not meet batch size, maybe dispatch delay: batch size: {}, request message count: {}", batchSize, requestSize);
|
|
+ result.setNextBeginOffset(queueOffset + requestSize);
|
|
+
|
|
+ for (SelectBufferResult bufferResult : bufferList) {
|
|
+ ByteBuffer slice = bufferResult.getByteBuffer().slice();
|
|
+ slice.limit(bufferResult.getSize());
|
|
+ SelectMappedBufferResult msg = new SelectMappedBufferResult(bufferResult.getStartOffset(),
|
|
+ bufferResult.getByteBuffer(), bufferResult.getSize(), null);
|
|
+ result.addMessageExt(msg, MessageBufferUtil.getQueueOffset(slice), bufferResult.getTagCode());
|
|
}
|
|
- return result;
|
|
}
|
|
- long nextBeginOffset = queueOffset + cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
|
|
- LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: split message buffer failed, consume queue buffer size: {}, message buffer size: {}, change offset from {} to {}", cqBuffer.remaining(), msgBuffer.remaining(), queueOffset, nextBeginOffset);
|
|
- result.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
|
|
- result.setNextBeginOffset(nextBeginOffset);
|
|
return result;
|
|
- }, TieredStoreExecutor.fetchDataExecutor).exceptionally(e -> {
|
|
+ }).exceptionally(e -> {
|
|
MessageQueue mq = flatFile.getMessageQueue();
|
|
- LOGGER.warn("TieredMessageFetcher#getMessageFromTieredStoreAsync: get message failed: topic: {} queueId: {}", mq.getTopic(), mq.getQueueId(), e);
|
|
+ LOGGER.warn("MessageFetcher#getMessageFromTieredStoreAsync failed, " +
|
|
+ "topic: {} queueId: {}, offset: {}, batchSize: {}", mq.getTopic(), mq.getQueueId(), queueOffset, finalBatchSize, e);
|
|
result.setStatus(GetMessageStatus.OFFSET_FOUND_NULL);
|
|
result.setNextBeginOffset(queueOffset);
|
|
return result;
|
|
@@ -498,7 +475,8 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
return CompletableFuture.completedFuture(result);
|
|
}
|
|
|
|
- return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount);
|
|
+ return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, true)
|
|
+ .thenApply(messageResultExt -> messageResultExt.doFilterMessage(messageFilter));
|
|
}
|
|
|
|
@Override
|
|
@@ -546,7 +524,7 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
return flatFile.getOffsetInConsumeQueueByTime(timestamp, type);
|
|
} catch (Exception e) {
|
|
LOGGER.error("TieredMessageFetcher#getOffsetInQueueByTime: " +
|
|
- "get offset in queue by time failed: topic: {}, queue: {}, timestamp: {}, type: {}",
|
|
+ "get offset in queue by time failed: topic: {}, queue: {}, timestamp: {}, type: {}",
|
|
topic, queueId, timestamp, type, e);
|
|
}
|
|
return -1L;
|
|
@@ -598,7 +576,8 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> result);
|
|
}).whenComplete((result, throwable) -> {
|
|
if (result != null) {
|
|
- LOGGER.info("MessageFetcher#queryMessageAsync, query result: {}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}",
|
|
+ LOGGER.info("MessageFetcher#queryMessageAsync, " +
|
|
+ "query result: {}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}",
|
|
result.getMessageBufferList().size(), topic, topicId, key, maxCount, begin, end);
|
|
}
|
|
});
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
index edaa5d19f..015c27efa 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
@@ -213,8 +213,10 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
// so there is no need to update the maximum offset to the local cq offset here,
|
|
// otherwise it will cause repeated consumption after next begin offset over commit offset.
|
|
|
|
- logger.trace("GetMessageAsync result, group: {}, topic: {}, queueId: {}, offset: {}, count:{}, {}",
|
|
- group, topic, queueId, offset, maxMsgNums, result);
|
|
+ if (storeConfig.isRecordGetMessageResult()) {
|
|
+ logger.info("GetMessageAsync result, {}, group: {}, topic: {}, queueId: {}, offset: {}, count:{}",
|
|
+ result, group, topic, queueId, offset, maxMsgNums);
|
|
+ }
|
|
|
|
return result;
|
|
}).exceptionally(e -> {
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java
|
|
new file mode 100644
|
|
index 000000000..52462b5dc
|
|
--- /dev/null
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java
|
|
@@ -0,0 +1,76 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+
|
|
+package org.apache.rocketmq.tieredstore.common;
|
|
+
|
|
+import java.util.ArrayList;
|
|
+import java.util.List;
|
|
+import org.apache.rocketmq.store.GetMessageResult;
|
|
+import org.apache.rocketmq.store.GetMessageStatus;
|
|
+import org.apache.rocketmq.store.MessageFilter;
|
|
+import org.apache.rocketmq.store.SelectMappedBufferResult;
|
|
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
|
|
+
|
|
+public class GetMessageResultExt extends GetMessageResult {
|
|
+
|
|
+ private final List<Long> tagCodeList;
|
|
+
|
|
+ public GetMessageResultExt() {
|
|
+ this.tagCodeList = new ArrayList<>();
|
|
+ }
|
|
+
|
|
+ public void addMessageExt(SelectMappedBufferResult bufferResult, long queueOffset, long tagCode) {
|
|
+ super.addMessage(bufferResult, queueOffset);
|
|
+ this.tagCodeList.add(tagCode);
|
|
+ }
|
|
+
|
|
+ public List<Long> getTagCodeList() {
|
|
+ return tagCodeList;
|
|
+ }
|
|
+
|
|
+ public GetMessageResult doFilterMessage(MessageFilter messageFilter) {
|
|
+ if (GetMessageStatus.FOUND != super.getStatus() || messageFilter == null) {
|
|
+ return this;
|
|
+ }
|
|
+
|
|
+ GetMessageResult result = new GetMessageResult();
|
|
+ result.setStatus(GetMessageStatus.FOUND);
|
|
+ result.setMinOffset(this.getMinOffset());
|
|
+ result.setMaxOffset(this.getMaxOffset());
|
|
+ result.setNextBeginOffset(this.getNextBeginOffset());
|
|
+
|
|
+ for (int i = 0; i < this.getMessageMapedList().size(); i++) {
|
|
+ if (!messageFilter.isMatchedByConsumeQueue(this.tagCodeList.get(i), null)) {
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ SelectMappedBufferResult bufferResult = this.getMessageMapedList().get(i);
|
|
+ if (!messageFilter.isMatchedByCommitLog(bufferResult.getByteBuffer().slice(), null)) {
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ result.addMessage(new SelectMappedBufferResult(bufferResult.getStartOffset(),
|
|
+ bufferResult.getByteBuffer(), bufferResult.getSize(), null),
|
|
+ MessageBufferUtil.getQueueOffset(bufferResult.getByteBuffer()));
|
|
+ }
|
|
+
|
|
+ if (result.getBufferTotalSize() == 0) {
|
|
+ result.setStatus(GetMessageStatus.NO_MATCHED_MESSAGE);
|
|
+ }
|
|
+ return result;
|
|
+ }
|
|
+}
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
|
|
new file mode 100644
|
|
index 000000000..d265ed0fc
|
|
--- /dev/null
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
|
|
@@ -0,0 +1,51 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+
|
|
+package org.apache.rocketmq.tieredstore.common;
|
|
+
|
|
+import java.nio.ByteBuffer;
|
|
+
|
|
+public class SelectBufferResult {
|
|
+
|
|
+ private final ByteBuffer byteBuffer;
|
|
+ private final long startOffset;
|
|
+ private final int size;
|
|
+ private final long tagCode;
|
|
+
|
|
+ public SelectBufferResult(ByteBuffer byteBuffer, long startOffset, int size, long tagCode) {
|
|
+ this.startOffset = startOffset;
|
|
+ this.byteBuffer = byteBuffer;
|
|
+ this.size = size;
|
|
+ this.tagCode = tagCode;
|
|
+ }
|
|
+
|
|
+ public ByteBuffer getByteBuffer() {
|
|
+ return byteBuffer;
|
|
+ }
|
|
+
|
|
+ public long getStartOffset() {
|
|
+ return startOffset;
|
|
+ }
|
|
+
|
|
+ public int getSize() {
|
|
+ return size;
|
|
+ }
|
|
+
|
|
+ public long getTagCode() {
|
|
+ return tagCode;
|
|
+ }
|
|
+}
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java
|
|
similarity index 55%
|
|
rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java
|
|
rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java
|
|
index af0785f71..4f9f00a07 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java
|
|
@@ -16,32 +16,21 @@
|
|
*/
|
|
package org.apache.rocketmq.tieredstore.common;
|
|
|
|
-import java.util.concurrent.atomic.LongAdder;
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
import org.apache.rocketmq.store.SelectMappedBufferResult;
|
|
|
|
-public class SelectMappedBufferResultWrapper {
|
|
+public class SelectBufferResultWrapper {
|
|
|
|
private final SelectMappedBufferResult result;
|
|
- private final LongAdder accessCount;
|
|
-
|
|
- private final long curOffset;
|
|
- private final long minOffset;
|
|
- private final long maxOffset;
|
|
- private final long size;
|
|
-
|
|
- public SelectMappedBufferResultWrapper(
|
|
- SelectMappedBufferResult result, long curOffset, long minOffset, long maxOffset, long size) {
|
|
+ private final long offset;
|
|
+ private final long tagCode;
|
|
+ private final AtomicInteger accessCount;
|
|
|
|
+ public SelectBufferResultWrapper(SelectMappedBufferResult result, long offset, long tagCode, boolean used) {
|
|
this.result = result;
|
|
- this.accessCount = new LongAdder();
|
|
- this.curOffset = curOffset;
|
|
- this.minOffset = minOffset;
|
|
- this.maxOffset = maxOffset;
|
|
- this.size = size;
|
|
- }
|
|
-
|
|
- public SelectMappedBufferResult getResult() {
|
|
- return result;
|
|
+ this.offset = offset;
|
|
+ this.tagCode = tagCode;
|
|
+ this.accessCount = new AtomicInteger(used ? 1 : 0);
|
|
}
|
|
|
|
public SelectMappedBufferResult getDuplicateResult() {
|
|
@@ -53,27 +42,23 @@ public class SelectMappedBufferResultWrapper {
|
|
result.getMappedFile());
|
|
}
|
|
|
|
- public long getCurOffset() {
|
|
- return curOffset;
|
|
- }
|
|
-
|
|
- public long getMinOffset() {
|
|
- return minOffset;
|
|
+ public long getOffset() {
|
|
+ return offset;
|
|
}
|
|
|
|
- public long getMaxOffset() {
|
|
- return maxOffset;
|
|
+ public int getBufferSize() {
|
|
+ return this.result.getSize();
|
|
}
|
|
|
|
- public long getSize() {
|
|
- return size;
|
|
+ public long getTagCode() {
|
|
+ return tagCode;
|
|
}
|
|
|
|
- public void addAccessCount() {
|
|
- accessCount.increment();
|
|
+ public int incrementAndGet() {
|
|
+ return accessCount.incrementAndGet();
|
|
}
|
|
|
|
- public long getAccessCount() {
|
|
- return accessCount.sum();
|
|
+ public int getAccessCount() {
|
|
+ return accessCount.get();
|
|
}
|
|
}
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
|
|
index 595db6b86..b0750e550 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
|
|
@@ -82,6 +82,7 @@ public class TieredMessageStoreConfig {
|
|
|
|
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
|
|
private boolean messageIndexEnable = true;
|
|
+ private boolean recordGetMessageResult = false;
|
|
|
|
// CommitLog file size, default is 1G
|
|
private long tieredStoreCommitLogMaxSize = 1024 * 1024 * 1024;
|
|
@@ -182,6 +183,14 @@ public class TieredMessageStoreConfig {
|
|
this.messageIndexEnable = messageIndexEnable;
|
|
}
|
|
|
|
+ public boolean isRecordGetMessageResult() {
|
|
+ return recordGetMessageResult;
|
|
+ }
|
|
+
|
|
+ public void setRecordGetMessageResult(boolean recordGetMessageResult) {
|
|
+ this.recordGetMessageResult = recordGetMessageResult;
|
|
+ }
|
|
+
|
|
public long getTieredStoreCommitLogMaxSize() {
|
|
return tieredStoreCommitLogMaxSize;
|
|
}
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
|
|
index d8a07f0a7..2b9fc59d8 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
|
|
@@ -46,7 +46,7 @@ import org.apache.rocketmq.store.MessageStore;
|
|
import org.apache.rocketmq.tieredstore.TieredMessageFetcher;
|
|
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
|
|
import org.apache.rocketmq.tieredstore.common.MessageCacheKey;
|
|
-import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
|
|
+import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper;
|
|
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
|
|
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
|
|
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
|
|
@@ -265,7 +265,7 @@ public class TieredStoreMetricsManager {
|
|
.setUnit("bytes")
|
|
.ofLongs()
|
|
.buildWithCallback(measurement -> {
|
|
- Optional<Policy.Eviction<MessageCacheKey, SelectMappedBufferResultWrapper>> eviction = fetcher.getMessageCache().policy().eviction();
|
|
+ Optional<Policy.Eviction<MessageCacheKey, SelectBufferResultWrapper>> eviction = fetcher.getMessageCache().policy().eviction();
|
|
eviction.ifPresent(resultEviction -> measurement.record(resultEviction.weightedSize().orElse(0), newAttributesBuilder().build()));
|
|
});
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
|
|
index aad42de98..5e3d8c562 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
|
|
@@ -295,7 +295,7 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
|
|
return future;
|
|
}
|
|
if (position + length > commitPosition) {
|
|
- logger.warn("TieredFileSegment#readAsync request position + length is greater than commit position," +
|
|
+ logger.debug("TieredFileSegment#readAsync request position + length is greater than commit position," +
|
|
" correct length using commit position, file: {}, request position: {}, commit position:{}, change length from {} to {}",
|
|
getPath(), position, commitPosition, length, commitPosition - position);
|
|
length = (int) (commitPosition - position);
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java
|
|
index 6db45a747..2c4a6e578 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java
|
|
@@ -20,11 +20,11 @@ import java.nio.ByteBuffer;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
-import org.apache.commons.lang3.tuple.Pair;
|
|
import org.apache.rocketmq.common.UtilAll;
|
|
import org.apache.rocketmq.common.message.MessageDecoder;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
+import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
|
|
import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
|
|
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
|
|
|
|
@@ -113,53 +113,72 @@ public class MessageBufferUtil {
|
|
return MessageDecoder.decodeProperties(slice);
|
|
}
|
|
|
|
- public static List<Pair<Integer/* offset of msgBuffer */, Integer/* msg size */>> splitMessageBuffer(
|
|
- ByteBuffer cqBuffer, ByteBuffer msgBuffer) {
|
|
+ public static List<SelectBufferResult> splitMessageBuffer(ByteBuffer cqBuffer, ByteBuffer msgBuffer) {
|
|
+
|
|
cqBuffer.rewind();
|
|
msgBuffer.rewind();
|
|
- List<Pair<Integer, Integer>> messageList = new ArrayList<>(cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
|
|
+
|
|
+ List<SelectBufferResult> bufferResultList = new ArrayList<>(
|
|
+ cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
|
|
+
|
|
+ if (msgBuffer.remaining() == 0) {
|
|
+ logger.error("MessageBufferUtil#splitMessage, msg buffer length is zero");
|
|
+ return bufferResultList;
|
|
+ }
|
|
+
|
|
if (cqBuffer.remaining() % TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE != 0) {
|
|
- logger.warn("MessageBufferUtil#splitMessage: consume queue buffer size {} is not an integer multiple of CONSUME_QUEUE_STORE_UNIT_SIZE {}",
|
|
- cqBuffer.remaining(), TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
|
|
- return messageList;
|
|
+ logger.error("MessageBufferUtil#splitMessage, consume queue buffer size incorrect, {}", cqBuffer.remaining());
|
|
+ return bufferResultList;
|
|
}
|
|
+
|
|
try {
|
|
- long startCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
|
|
- for (int pos = cqBuffer.position(); pos < cqBuffer.limit(); pos += TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE) {
|
|
- cqBuffer.position(pos);
|
|
- int diff = (int) (CQItemBufferUtil.getCommitLogOffset(cqBuffer) - startCommitLogOffset);
|
|
- int size = CQItemBufferUtil.getSize(cqBuffer);
|
|
- if (diff + size > msgBuffer.limit()) {
|
|
- logger.error("MessageBufferUtil#splitMessage: message buffer size is incorrect: record in consume queue: {}, actual: {}", diff + size, msgBuffer.remaining());
|
|
- return messageList;
|
|
+ long firstCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
|
|
+
|
|
+ for (int position = cqBuffer.position(); position < cqBuffer.limit();
|
|
+ position += TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE) {
|
|
+
|
|
+ cqBuffer.position(position);
|
|
+ long logOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
|
|
+ int bufferSize = CQItemBufferUtil.getSize(cqBuffer);
|
|
+ long tagCode = CQItemBufferUtil.getTagCode(cqBuffer);
|
|
+
|
|
+ int offset = (int) (logOffset - firstCommitLogOffset);
|
|
+ if (offset + bufferSize > msgBuffer.limit()) {
|
|
+ logger.error("MessageBufferUtil#splitMessage, message buffer size incorrect. " +
|
|
+ "Expect length in consume queue: {}, actual length: {}", offset + bufferSize, msgBuffer.limit());
|
|
+ break;
|
|
}
|
|
- msgBuffer.position(diff);
|
|
|
|
+ msgBuffer.position(offset);
|
|
int magicCode = getMagicCode(msgBuffer);
|
|
if (magicCode == TieredCommitLog.BLANK_MAGIC_CODE) {
|
|
- logger.warn("MessageBufferUtil#splitMessage: message decode error: blank magic code, this message may be coda, try to fix offset");
|
|
- diff = diff + TieredCommitLog.CODA_SIZE;
|
|
- msgBuffer.position(diff);
|
|
+ offset += TieredCommitLog.CODA_SIZE;
|
|
+ msgBuffer.position(offset);
|
|
magicCode = getMagicCode(msgBuffer);
|
|
}
|
|
- if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
|
|
- logger.warn("MessageBufferUtil#splitMessage: message decode error: unknown magic code");
|
|
+ if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE &&
|
|
+ magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
|
|
+ logger.warn("MessageBufferUtil#splitMessage, found unknown magic code. " +
|
|
+ "Message offset: {}, wrong magic code: {}", offset, magicCode);
|
|
continue;
|
|
}
|
|
|
|
- if (getTotalSize(msgBuffer) != size) {
|
|
- logger.warn("MessageBufferUtil#splitMessage: message size is not right: except: {}, actual: {}", size, getTotalSize(msgBuffer));
|
|
+ if (bufferSize != getTotalSize(msgBuffer)) {
|
|
+ logger.warn("MessageBufferUtil#splitMessage, message length in commitlog incorrect. " +
|
|
+ "Except length in commitlog: {}, actual: {}", getTotalSize(msgBuffer), bufferSize);
|
|
continue;
|
|
}
|
|
|
|
- messageList.add(Pair.of(diff, size));
|
|
+ ByteBuffer sliceBuffer = msgBuffer.slice();
|
|
+ sliceBuffer.limit(bufferSize);
|
|
+ bufferResultList.add(new SelectBufferResult(sliceBuffer, offset, bufferSize, tagCode));
|
|
}
|
|
} catch (Exception e) {
|
|
- logger.error("MessageBufferUtil#splitMessage: split message failed, maybe decode consume queue item failed", e);
|
|
+ logger.error("MessageBufferUtil#splitMessage, split message buffer error", e);
|
|
} finally {
|
|
cqBuffer.rewind();
|
|
msgBuffer.rewind();
|
|
}
|
|
- return messageList;
|
|
+ return bufferResultList;
|
|
}
|
|
}
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
|
|
index 4e0d7e697..4e8287533 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
|
|
@@ -31,7 +31,7 @@ import org.apache.rocketmq.store.GetMessageStatus;
|
|
import org.apache.rocketmq.store.QueryMessageResult;
|
|
import org.apache.rocketmq.store.SelectMappedBufferResult;
|
|
import org.apache.rocketmq.tieredstore.common.AppendResult;
|
|
-import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
|
|
+import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper;
|
|
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
|
|
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
|
|
import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
|
|
@@ -143,17 +143,18 @@ public class TieredMessageFetcherTest {
|
|
|
|
fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, new ArrayList<>());
|
|
Assert.assertEquals(0, fetcher.getMessageCache().estimatedSize());
|
|
- fetcher.putMessageToCache(flatFile, 0, new SelectMappedBufferResult(0, msg1, msg1.remaining(), null), 0, 0, 1);
|
|
+ SelectMappedBufferResult bufferResult = new SelectMappedBufferResult(0, msg1, msg1.remaining(), null);
|
|
+ fetcher.putMessageToCache(flatFile, new SelectBufferResultWrapper(bufferResult, 0, 0, false));
|
|
Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize());
|
|
|
|
- GetMessageResult getMessageResult = fetcher.getMessageFromCacheAsync(flatFile, "group", 0, 32).join();
|
|
+ GetMessageResult getMessageResult = fetcher.getMessageFromCacheAsync(flatFile, "group", 0, 32, true).join();
|
|
Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
|
|
Assert.assertEquals(1, getMessageResult.getMessageBufferList().size());
|
|
Assert.assertEquals(msg1, getMessageResult.getMessageBufferList().get(0));
|
|
|
|
Awaitility.waitAtMost(3, TimeUnit.SECONDS)
|
|
.until(() -> fetcher.getMessageCache().estimatedSize() == 2);
|
|
- ArrayList<SelectMappedBufferResultWrapper> wrapperList = new ArrayList<>();
|
|
+ ArrayList<SelectBufferResultWrapper> wrapperList = new ArrayList<>();
|
|
wrapperList.add(fetcher.getMessageFromCache(flatFile, 0));
|
|
fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, wrapperList);
|
|
Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize());
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java
|
|
new file mode 100644
|
|
index 000000000..deb8770d2
|
|
--- /dev/null
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java
|
|
@@ -0,0 +1,65 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+package org.apache.rocketmq.tieredstore.common;
|
|
+
|
|
+import java.nio.ByteBuffer;
|
|
+import java.util.Map;
|
|
+import org.apache.rocketmq.store.ConsumeQueueExt;
|
|
+import org.apache.rocketmq.store.GetMessageResult;
|
|
+import org.apache.rocketmq.store.GetMessageStatus;
|
|
+import org.apache.rocketmq.store.MessageFilter;
|
|
+import org.apache.rocketmq.store.SelectMappedBufferResult;
|
|
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
|
|
+import org.junit.Assert;
|
|
+import org.junit.Test;
|
|
+
|
|
+import static org.junit.Assert.assertEquals;
|
|
+
|
|
+public class GetMessageResultExtTest {
|
|
+
|
|
+ @Test
|
|
+ public void doFilterTest() {
|
|
+ GetMessageResultExt resultExt = new GetMessageResultExt();
|
|
+ Assert.assertEquals(0, resultExt.doFilterMessage(null).getMessageCount());
|
|
+ resultExt.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE);
|
|
+ Assert.assertEquals(0, resultExt.doFilterMessage(null).getMessageCount());
|
|
+ resultExt.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
|
|
+ Assert.assertEquals(0, resultExt.doFilterMessage(null).getMessageCount());
|
|
+
|
|
+ resultExt.addMessageExt(new SelectMappedBufferResult(
|
|
+ 1000L, MessageBufferUtilTest.buildMockedMessageBuffer(), 100, null),
|
|
+ 0, "TagA".hashCode());
|
|
+ resultExt.addMessageExt(new SelectMappedBufferResult(
|
|
+ 2000L, MessageBufferUtilTest.buildMockedMessageBuffer(), 100, null),
|
|
+ 0, "TagB".hashCode());
|
|
+ assertEquals(2, resultExt.getMessageCount());
|
|
+
|
|
+ resultExt.setStatus(GetMessageStatus.FOUND);
|
|
+ GetMessageResult getMessageResult = resultExt.doFilterMessage(new MessageFilter() {
|
|
+ @Override
|
|
+ public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
|
|
+ return false;
|
|
+ }
|
|
+ });
|
|
+ Assert.assertEquals(0, getMessageResult.getMessageCount());
|
|
+ }
|
|
+}
|
|
\ No newline at end of file
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java
|
|
new file mode 100644
|
|
index 000000000..b7e6e639f
|
|
--- /dev/null
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java
|
|
@@ -0,0 +1,37 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+package org.apache.rocketmq.tieredstore.common;
|
|
+
|
|
+import java.nio.ByteBuffer;
|
|
+import org.junit.Assert;
|
|
+import org.junit.Test;
|
|
+
|
|
+public class SelectBufferResultTest {
|
|
+ @Test
|
|
+ public void testSelectBufferResult() {
|
|
+ ByteBuffer buffer = ByteBuffer.allocate(10);
|
|
+ long startOffset = 5L;
|
|
+ int size = 10;
|
|
+ long tagCode = 1L;
|
|
+
|
|
+ SelectBufferResult result = new SelectBufferResult(buffer, startOffset, size, tagCode);
|
|
+ Assert.assertEquals(buffer, result.getByteBuffer());
|
|
+ Assert.assertEquals(startOffset, result.getStartOffset());
|
|
+ Assert.assertEquals(size, result.getSize());
|
|
+ Assert.assertEquals(tagCode, result.getTagCode());
|
|
+ }
|
|
+}
|
|
\ No newline at end of file
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
|
|
index 68277cacc..a0b438948 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
|
|
@@ -22,9 +22,9 @@ import java.nio.charset.StandardCharsets;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
-import org.apache.commons.lang3.tuple.Pair;
|
|
import org.apache.rocketmq.common.message.MessageConst;
|
|
import org.apache.rocketmq.common.message.MessageDecoder;
|
|
+import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
|
|
import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
|
|
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
|
|
import org.junit.Assert;
|
|
@@ -206,10 +206,12 @@ public class MessageBufferUtilTest {
|
|
cqBuffer.flip();
|
|
cqBuffer1.rewind();
|
|
cqBuffer2.rewind();
|
|
- List<Pair<Integer, Integer>> msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
|
|
+ List<SelectBufferResult> msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
|
|
Assert.assertEquals(2, msgList.size());
|
|
- Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
|
|
- Assert.assertEquals(Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, MSG_LEN), msgList.get(1));
|
|
+ Assert.assertEquals(0, msgList.get(0).getStartOffset());
|
|
+ Assert.assertEquals(MSG_LEN, msgList.get(0).getSize());
|
|
+ Assert.assertEquals(MSG_LEN + TieredCommitLog.CODA_SIZE, msgList.get(1).getStartOffset());
|
|
+ Assert.assertEquals(MSG_LEN, msgList.get(1).getSize());
|
|
|
|
cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 2);
|
|
cqBuffer.put(cqBuffer1);
|
|
@@ -219,7 +221,8 @@ public class MessageBufferUtilTest {
|
|
cqBuffer4.rewind();
|
|
msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
|
|
Assert.assertEquals(1, msgList.size());
|
|
- Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
|
|
+ Assert.assertEquals(0, msgList.get(0).getStartOffset());
|
|
+ Assert.assertEquals(MSG_LEN, msgList.get(0).getSize());
|
|
|
|
cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 3);
|
|
cqBuffer.put(cqBuffer1);
|
|
@@ -227,8 +230,10 @@ public class MessageBufferUtilTest {
|
|
cqBuffer.flip();
|
|
msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
|
|
Assert.assertEquals(2, msgList.size());
|
|
- Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
|
|
- Assert.assertEquals(Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, MSG_LEN), msgList.get(1));
|
|
+ Assert.assertEquals(0, msgList.get(0).getStartOffset());
|
|
+ Assert.assertEquals(MSG_LEN, msgList.get(0).getSize());
|
|
+ Assert.assertEquals(MSG_LEN + TieredCommitLog.CODA_SIZE, msgList.get(1).getStartOffset());
|
|
+ Assert.assertEquals(MSG_LEN, msgList.get(1).getSize());
|
|
|
|
cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
|
|
cqBuffer.put(cqBuffer5);
|
|
--
|
|
2.32.0.windows.2
|
|
|