987 lines
48 KiB
Diff
987 lines
48 KiB
Diff
From ead3d905016d9db4785a46beaa555c7fafd4f9bb Mon Sep 17 00:00:00 2001
|
|
From: Dongyuan Pan <dongyuanpan0@gmail.com>
|
|
Date: Wed, 8 Nov 2023 10:40:52 +0800
|
|
Subject: [PATCH 1/2] [ISSUE #7511] Lock granularity issue causing LMQ message
|
|
loss (#7525)
|
|
|
|
* bug fix: assignOffset and increaseOffset for LMQ have concurrency issues when run under topicQueueLock; they should run under putMessageLock
|
|
|
|
* fix MultiDispatchTest
|
|
|
|
* fix MultiDispatchTest
|
|
|
|
* fix unit test
|
|
---
|
|
.../common/message/MessageExtBrokerInner.java | 10 ++
|
|
.../org/apache/rocketmq/store/CommitLog.java | 94 ++++++++++++--
|
|
.../apache/rocketmq/store/ConsumeQueue.java | 44 +------
|
|
.../rocketmq/store/DefaultMessageStore.java | 1 -
|
|
.../rocketmq/store/MessageExtEncoder.java | 118 ++++++++++++++++--
|
|
.../apache/rocketmq/store/MultiDispatch.java | 77 ++++++++++++
|
|
.../queue/AbstractConsumeQueueStore.java | 10 ++
|
|
.../store/queue/ConsumeQueueInterface.java | 1 -
|
|
.../queue/ConsumeQueueStoreInterface.java | 14 +++
|
|
...iDispatch.java => MultiDispatchUtils.java} | 17 +--
|
|
.../store/queue/QueueOffsetOperator.java | 6 +-
|
|
.../store/queue/RocksDBConsumeQueue.java | 42 -------
|
|
.../rocketmq/store/AppendCallbackTest.java | 6 +-
|
|
.../rocketmq/store/AppendPropCRCTest.java | 5 +-
|
|
.../rocketmq/store/MultiDispatchTest.java | 12 +-
|
|
.../rocketmq/store/kv/CompactionLogTest.java | 2 +-
|
|
16 files changed, 322 insertions(+), 137 deletions(-)
|
|
create mode 100644 store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
|
|
rename store/src/main/java/org/apache/rocketmq/store/queue/{MultiDispatch.java => MultiDispatchUtils.java} (78%)
|
|
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
|
|
index 52501dbca..147f23f12 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
|
|
@@ -28,6 +28,8 @@ public class MessageExtBrokerInner extends MessageExt {
|
|
|
|
private ByteBuffer encodedBuff;
|
|
|
|
+ private volatile boolean encodeCompleted;
|
|
+
|
|
private MessageVersion version = MessageVersion.MESSAGE_VERSION_V1;
|
|
|
|
public ByteBuffer getEncodedBuff() {
|
|
@@ -92,4 +94,12 @@ public class MessageExtBrokerInner extends MessageExt {
|
|
this.setPropertiesString(MessageDecoder.messageProperties2String(this.getProperties()));
|
|
}
|
|
}
|
|
+
|
|
+ public boolean isEncodeCompleted() {
|
|
+ return encodeCompleted;
|
|
+ }
|
|
+
|
|
+ public void setEncodeCompleted(boolean encodeCompleted) {
|
|
+ this.encodeCompleted = encodeCompleted;
|
|
+ }
|
|
}
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
index 6c3afde70..35c1d0e2d 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
@@ -35,6 +35,7 @@ import java.util.function.Supplier;
|
|
import java.util.stream.Collectors;
|
|
import com.sun.jna.NativeLong;
|
|
import com.sun.jna.Pointer;
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.rocketmq.common.MixAll;
|
|
import org.apache.rocketmq.common.ServiceThread;
|
|
import org.apache.rocketmq.common.SystemClock;
|
|
@@ -56,6 +57,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.store.MessageExtEncoder.PutMessageThreadLocal;
|
|
import org.apache.rocketmq.store.config.BrokerRole;
|
|
import org.apache.rocketmq.store.config.FlushDiskType;
|
|
+import org.apache.rocketmq.store.config.MessageStoreConfig;
|
|
import org.apache.rocketmq.store.ha.HAService;
|
|
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
|
|
import org.apache.rocketmq.store.logfile.MappedFile;
|
|
@@ -101,6 +103,7 @@ public class CommitLog implements Swappable {
|
|
protected int commitLogSize;
|
|
|
|
private final boolean enabledAppendPropCRC;
|
|
+ protected final MultiDispatch multiDispatch;
|
|
|
|
public CommitLog(final DefaultMessageStore messageStore) {
|
|
String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog();
|
|
@@ -119,13 +122,11 @@ public class CommitLog implements Swappable {
|
|
this.flushManager = new DefaultFlushManager();
|
|
this.coldDataCheckService = new ColdDataCheckService();
|
|
|
|
- this.appendMessageCallback = new DefaultAppendMessageCallback();
|
|
+ this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig());
|
|
putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
|
|
@Override
|
|
protected PutMessageThreadLocal initialValue() {
|
|
- return new PutMessageThreadLocal(
|
|
- defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(),
|
|
- defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC());
|
|
+ return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig());
|
|
}
|
|
};
|
|
this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
|
|
@@ -137,6 +138,8 @@ public class CommitLog implements Swappable {
|
|
this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
|
|
|
|
this.enabledAppendPropCRC = messageStore.getMessageStoreConfig().isEnabledAppendPropCRC();
|
|
+
|
|
+ this.multiDispatch = new MultiDispatch(defaultMessageStore);
|
|
}
|
|
|
|
public void setFullStorePaths(Set<String> fullStorePaths) {
|
|
@@ -1830,15 +1833,84 @@ public class CommitLog implements Swappable {
|
|
// Store the message content
|
|
private final ByteBuffer msgStoreItemMemory;
|
|
private final int crc32ReservedLength = CommitLog.CRC32_RESERVED_LEN;
|
|
+ private final MessageStoreConfig messageStoreConfig;
|
|
|
|
- DefaultAppendMessageCallback() {
|
|
+ DefaultAppendMessageCallback(MessageStoreConfig messageStoreConfig) {
|
|
this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
|
|
+ this.messageStoreConfig = messageStoreConfig;
|
|
+ }
|
|
+
|
|
+ public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer, final MessageExtBrokerInner msgInner) {
|
|
+ if (msgInner.isEncodeCompleted()) {
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ multiDispatch.wrapMultiDispatch(msgInner);
|
|
+
|
|
+ msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
|
|
+
|
|
+ final byte[] propertiesData =
|
|
+ msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
|
|
+
|
|
+ boolean needAppendLastPropertySeparator = enabledAppendPropCRC && propertiesData != null && propertiesData.length > 0
|
|
+ && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR;
|
|
+
|
|
+ final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength;
|
|
+
|
|
+ if (propertiesLength > Short.MAX_VALUE) {
|
|
+ log.warn("putMessage message properties length too long. length={}", propertiesData.length);
|
|
+ return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
|
|
+ }
|
|
+
|
|
+ int msgLenWithoutProperties = preEncodeBuffer.getInt(0);
|
|
+
|
|
+ int msgLen = msgLenWithoutProperties + 2 + propertiesLength;
|
|
+
|
|
+ // Exceeds the maximum message size
|
|
+ if (msgLen > this.messageStoreConfig.getMaxMessageSize()) {
|
|
+ log.warn("message size exceeded, msg total size: " + msgLen + ", maxMessageSize: " + this.messageStoreConfig.getMaxMessageSize());
|
|
+ return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
|
|
+ }
|
|
+
|
|
+ // Back filling total message length
|
|
+ preEncodeBuffer.putInt(0, msgLen);
|
|
+ // Modify position to msgLenWithoutProperties
|
|
+ preEncodeBuffer.position(msgLenWithoutProperties);
|
|
+
|
|
+ preEncodeBuffer.putShort((short) propertiesLength);
|
|
+
|
|
+ if (propertiesLength > crc32ReservedLength) {
|
|
+ preEncodeBuffer.put(propertiesData);
|
|
+ }
|
|
+
|
|
+ if (needAppendLastPropertySeparator) {
|
|
+ preEncodeBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR);
|
|
+ }
|
|
+ // 18 CRC32
|
|
+ preEncodeBuffer.position(preEncodeBuffer.position() + crc32ReservedLength);
|
|
+
|
|
+ msgInner.setEncodeCompleted(true);
|
|
+
|
|
+ return null;
|
|
}
|
|
|
|
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
|
|
final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
|
|
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
|
|
|
|
+ ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
|
|
+ boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner);
|
|
+ if (isMultiDispatchMsg) {
|
|
+ AppendMessageResult appendMessageResult = handlePropertiesForLmqMsg(preEncodeBuffer, msgInner);
|
|
+ if (appendMessageResult != null) {
|
|
+ return appendMessageResult;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ final int msgLen = preEncodeBuffer.getInt(0);
|
|
+ preEncodeBuffer.position(0);
|
|
+ preEncodeBuffer.limit(msgLen);
|
|
+
|
|
// PHY OFFSET
|
|
long wroteOffset = fileFromOffset + byteBuffer.position();
|
|
|
|
@@ -1872,9 +1944,6 @@ public class CommitLog implements Swappable {
|
|
break;
|
|
}
|
|
|
|
- ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
|
|
- final int msgLen = preEncodeBuffer.getInt(0);
|
|
-
|
|
// Determines whether there is sufficient free space
|
|
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
|
|
this.msgStoreItemMemory.clear();
|
|
@@ -1919,6 +1988,11 @@ public class CommitLog implements Swappable {
|
|
byteBuffer.put(preEncodeBuffer);
|
|
CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS");
|
|
msgInner.setEncodedBuff(null);
|
|
+
|
|
+ if (isMultiDispatchMsg) {
|
|
+ CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
|
|
+ }
|
|
+
|
|
return new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
|
|
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum);
|
|
}
|
|
@@ -2159,6 +2233,10 @@ public class CommitLog implements Swappable {
|
|
return flushManager;
|
|
}
|
|
|
|
+ public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) {
|
|
+ return StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
|
|
+ }
|
|
+
|
|
private boolean isCloseReadAhead() {
|
|
return !MixAll.isWindows() && !defaultMessageStore.getMessageStoreConfig().isDataReadAheadEnable();
|
|
}
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
|
|
index 623509c8b..453c9d1dc 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
|
|
@@ -27,7 +27,6 @@ import org.apache.rocketmq.common.MixAll;
|
|
import org.apache.rocketmq.common.Pair;
|
|
import org.apache.rocketmq.common.attribute.CQType;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
-import org.apache.rocketmq.common.message.MessageAccessor;
|
|
import org.apache.rocketmq.common.message.MessageConst;
|
|
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
@@ -38,7 +37,7 @@ import org.apache.rocketmq.store.logfile.MappedFile;
|
|
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
|
|
import org.apache.rocketmq.store.queue.CqUnit;
|
|
import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
|
|
-import org.apache.rocketmq.store.queue.MultiDispatch;
|
|
+import org.apache.rocketmq.store.queue.MultiDispatchUtils;
|
|
import org.apache.rocketmq.store.queue.QueueOffsetOperator;
|
|
import org.apache.rocketmq.store.queue.ReferredIterator;
|
|
|
|
@@ -702,7 +701,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
|
|
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
|
|
}
|
|
this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
|
|
- if (MultiDispatch.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) {
|
|
+ if (MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) {
|
|
multiDispatchLmqQueue(request, maxRetries);
|
|
}
|
|
return;
|
|
@@ -776,28 +775,6 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
|
|
String topicQueueKey = getTopic() + "-" + getQueueId();
|
|
long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey);
|
|
msg.setQueueOffset(queueOffset);
|
|
-
|
|
-
|
|
- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28),
|
|
- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue.
|
|
- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) {
|
|
- return;
|
|
- }
|
|
- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
|
|
- if (StringUtils.isBlank(multiDispatchQueue)) {
|
|
- return;
|
|
- }
|
|
- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
|
|
- Long[] queueOffsets = new Long[queues.length];
|
|
- for (int i = 0; i < queues.length; i++) {
|
|
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) {
|
|
- String key = MultiDispatch.lmqQueueKey(queues[i]);
|
|
- queueOffsets[i] = queueOffsetOperator.getLmqOffset(key);
|
|
- }
|
|
- }
|
|
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
|
|
- StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
|
|
- msg.removeWaitStorePropertyString();
|
|
}
|
|
|
|
@Override
|
|
@@ -805,23 +782,6 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
|
|
short messageNum) {
|
|
String topicQueueKey = getTopic() + "-" + getQueueId();
|
|
queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
|
|
-
|
|
- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28),
|
|
- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue.
|
|
- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) {
|
|
- return;
|
|
- }
|
|
- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
|
|
- if (StringUtils.isBlank(multiDispatchQueue)) {
|
|
- return;
|
|
- }
|
|
- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
|
|
- for (int i = 0; i < queues.length; i++) {
|
|
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) {
|
|
- String key = MultiDispatch.lmqQueueKey(queues[i]);
|
|
- queueOffsetOperator.increaseLmqOffset(key, (short) 1);
|
|
- }
|
|
- }
|
|
}
|
|
|
|
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
|
index 99a54e2d7..dc5f312e5 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
|
@@ -2112,7 +2112,6 @@ public class DefaultMessageStore implements MessageStore {
|
|
}
|
|
}
|
|
|
|
-
|
|
@Override
|
|
public void increaseOffset(MessageExtBrokerInner msg, short messageNum) {
|
|
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
|
|
index c1d808728..20e9a652b 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
|
|
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageVersion;
|
|
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
+import org.apache.rocketmq.store.config.MessageStoreConfig;
|
|
|
|
public class MessageExtEncoder {
|
|
protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
|
|
@@ -38,20 +39,22 @@ public class MessageExtEncoder {
|
|
// The maximum length of the full message.
|
|
private int maxMessageSize;
|
|
private final int crc32ReservedLength;
|
|
+ private MessageStoreConfig messageStoreConfig;
|
|
|
|
- public MessageExtEncoder(final int maxMessageBodySize) {
|
|
- this(maxMessageBodySize, false);
|
|
+ public MessageExtEncoder(final int maxMessageBodySize, final MessageStoreConfig messageStoreConfig) {
|
|
+ this(messageStoreConfig);
|
|
}
|
|
|
|
- public MessageExtEncoder(final int maxMessageBodySize, boolean enabledAppendPropCRC) {
|
|
+ public MessageExtEncoder(final MessageStoreConfig messageStoreConfig) {
|
|
ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
|
|
+ this.messageStoreConfig = messageStoreConfig;
|
|
+ this.maxMessageBodySize = messageStoreConfig.getMaxMessageSize();
|
|
//Reserve 64kb for encoding buffer outside body
|
|
int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ?
|
|
maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
|
|
byteBuf = alloc.directBuffer(maxMessageSize);
|
|
- this.maxMessageBodySize = maxMessageBodySize;
|
|
this.maxMessageSize = maxMessageSize;
|
|
- this.crc32ReservedLength = enabledAppendPropCRC ? CommitLog.CRC32_RESERVED_LEN : 0;
|
|
+ this.crc32ReservedLength = messageStoreConfig.isEnabledAppendPropCRC() ? CommitLog.CRC32_RESERVED_LEN : 0;
|
|
}
|
|
|
|
public static int calMsgLength(MessageVersion messageVersion,
|
|
@@ -79,8 +82,103 @@ public class MessageExtEncoder {
|
|
+ 2 + (Math.max(propertiesLength, 0)); //propertiesLength
|
|
}
|
|
|
|
+ public static int calMsgLengthNoProperties(MessageVersion messageVersion,
|
|
+ int sysFlag, int bodyLength, int topicLength) {
|
|
+
|
|
+ int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
|
|
+ int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
|
|
+
|
|
+ return 4 //TOTALSIZE
|
|
+ + 4 //MAGICCODE
|
|
+ + 4 //BODYCRC
|
|
+ + 4 //QUEUEID
|
|
+ + 4 //FLAG
|
|
+ + 8 //QUEUEOFFSET
|
|
+ + 8 //PHYSICALOFFSET
|
|
+ + 4 //SYSFLAG
|
|
+ + 8 //BORNTIMESTAMP
|
|
+ + bornhostLength //BORNHOST
|
|
+ + 8 //STORETIMESTAMP
|
|
+ + storehostAddressLength //STOREHOSTADDRESS
|
|
+ + 4 //RECONSUMETIMES
|
|
+ + 8 //Prepared Transaction Offset
|
|
+ + 4 + (Math.max(bodyLength, 0)) //BODY
|
|
+ + messageVersion.getTopicLengthSize() + topicLength; //TOPIC
|
|
+ }
|
|
+
|
|
+ public PutMessageResult encodeWithoutProperties(MessageExtBrokerInner msgInner) {
|
|
+
|
|
+ final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
|
|
+ final int topicLength = topicData.length;
|
|
+
|
|
+ final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
|
|
+
|
|
+ // Exceeds the maximum message body
|
|
+ if (bodyLength > this.maxMessageBodySize) {
|
|
+ CommitLog.log.warn("message body size exceeded, msg body size: " + bodyLength
|
|
+ + ", maxMessageSize: " + this.maxMessageBodySize);
|
|
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
|
|
+ }
|
|
+
|
|
+ final int msgLenNoProperties = calMsgLengthNoProperties(msgInner.getVersion(), msgInner.getSysFlag(), bodyLength, topicLength);
|
|
+
|
|
+ // 1 TOTALSIZE
|
|
+ this.byteBuf.writeInt(msgLenNoProperties);
|
|
+ // 2 MAGICCODE
|
|
+ this.byteBuf.writeInt(msgInner.getVersion().getMagicCode());
|
|
+ // 3 BODYCRC
|
|
+ this.byteBuf.writeInt(msgInner.getBodyCRC());
|
|
+ // 4 QUEUEID
|
|
+ this.byteBuf.writeInt(msgInner.getQueueId());
|
|
+ // 5 FLAG
|
|
+ this.byteBuf.writeInt(msgInner.getFlag());
|
|
+ // 6 QUEUEOFFSET, need update later
|
|
+ this.byteBuf.writeLong(0);
|
|
+ // 7 PHYSICALOFFSET, need update later
|
|
+ this.byteBuf.writeLong(0);
|
|
+ // 8 SYSFLAG
|
|
+ this.byteBuf.writeInt(msgInner.getSysFlag());
|
|
+ // 9 BORNTIMESTAMP
|
|
+ this.byteBuf.writeLong(msgInner.getBornTimestamp());
|
|
+
|
|
+ // 10 BORNHOST
|
|
+ ByteBuffer bornHostBytes = msgInner.getBornHostBytes();
|
|
+ this.byteBuf.writeBytes(bornHostBytes.array());
|
|
+
|
|
+ // 11 STORETIMESTAMP
|
|
+ this.byteBuf.writeLong(msgInner.getStoreTimestamp());
|
|
+
|
|
+ // 12 STOREHOSTADDRESS
|
|
+ ByteBuffer storeHostBytes = msgInner.getStoreHostBytes();
|
|
+ this.byteBuf.writeBytes(storeHostBytes.array());
|
|
+
|
|
+ // 13 RECONSUMETIMES
|
|
+ this.byteBuf.writeInt(msgInner.getReconsumeTimes());
|
|
+ // 14 Prepared Transaction Offset
|
|
+ this.byteBuf.writeLong(msgInner.getPreparedTransactionOffset());
|
|
+ // 15 BODY
|
|
+ this.byteBuf.writeInt(bodyLength);
|
|
+ if (bodyLength > 0)
|
|
+ this.byteBuf.writeBytes(msgInner.getBody());
|
|
+
|
|
+ // 16 TOPIC
|
|
+ if (MessageVersion.MESSAGE_VERSION_V2.equals(msgInner.getVersion())) {
|
|
+ this.byteBuf.writeShort((short) topicLength);
|
|
+ } else {
|
|
+ this.byteBuf.writeByte((byte) topicLength);
|
|
+ }
|
|
+ this.byteBuf.writeBytes(topicData);
|
|
+
|
|
+ return null;
|
|
+ }
|
|
+
|
|
public PutMessageResult encode(MessageExtBrokerInner msgInner) {
|
|
this.byteBuf.clear();
|
|
+
|
|
+ if (messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner)) {
|
|
+ return encodeWithoutProperties(msgInner);
|
|
+ }
|
|
+
|
|
/**
|
|
* Serialize message
|
|
*/
|
|
@@ -303,7 +401,7 @@ public class MessageExtEncoder {
|
|
}
|
|
|
|
public ByteBuffer getEncoderBuffer() {
|
|
- return this.byteBuf.nioBuffer();
|
|
+ return this.byteBuf.nioBuffer(0, this.byteBuf.capacity());
|
|
}
|
|
|
|
public int getMaxMessageBodySize() {
|
|
@@ -322,12 +420,8 @@ public class MessageExtEncoder {
|
|
private final MessageExtEncoder encoder;
|
|
private final StringBuilder keyBuilder;
|
|
|
|
- PutMessageThreadLocal(int size) {
|
|
- this(size, false);
|
|
- }
|
|
-
|
|
- PutMessageThreadLocal(int size, boolean enabledAppendPropCRC) {
|
|
- encoder = new MessageExtEncoder(size, enabledAppendPropCRC);
|
|
+ PutMessageThreadLocal(MessageStoreConfig messageStoreConfig) {
|
|
+ encoder = new MessageExtEncoder(messageStoreConfig);
|
|
keyBuilder = new StringBuilder();
|
|
}
|
|
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
|
|
new file mode 100644
|
|
index 000000000..5bc587a8e
|
|
--- /dev/null
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
|
|
@@ -0,0 +1,77 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+package org.apache.rocketmq.store;
|
|
+
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
+import org.apache.rocketmq.common.MixAll;
|
|
+import org.apache.rocketmq.common.message.MessageAccessor;
|
|
+import org.apache.rocketmq.common.message.MessageConst;
|
|
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
|
+
|
|
+/**
|
|
+ * Multi-dispatch helper for LMQ (light message queue) offsets; not thread-safe.
|
|
+ */
|
|
+public class MultiDispatch {
|
|
+ private final StringBuilder keyBuilder = new StringBuilder();
|
|
+ private final DefaultMessageStore messageStore;
|
|
+ private static final short VALUE_OF_EACH_INCREMENT = 1;
|
|
+
|
|
+ public MultiDispatch(DefaultMessageStore messageStore) {
|
|
+ this.messageStore = messageStore;
|
|
+ }
|
|
+
|
|
+ public String queueKey(String queueName, MessageExtBrokerInner msgInner) {
|
|
+ keyBuilder.delete(0, keyBuilder.length());
|
|
+ keyBuilder.append(queueName);
|
|
+ keyBuilder.append('-');
|
|
+ int queueId = msgInner.getQueueId();
|
|
+ if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
|
|
+ queueId = 0;
|
|
+ }
|
|
+ keyBuilder.append(queueId);
|
|
+ return keyBuilder.toString();
|
|
+ }
|
|
+
|
|
+ public void wrapMultiDispatch(final MessageExtBrokerInner msg) {
|
|
+
|
|
+ String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
|
|
+ String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
|
|
+ Long[] queueOffsets = new Long[queues.length];
|
|
+ if (messageStore.getMessageStoreConfig().isEnableLmq()) {
|
|
+ for (int i = 0; i < queues.length; i++) {
|
|
+ String key = queueKey(queues[i], msg);
|
|
+ if (MixAll.isLmq(key)) {
|
|
+ queueOffsets[i] = messageStore.getQueueStore().getLmqQueueOffset(key);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
|
|
+ StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
|
|
+ msg.removeWaitStorePropertyString();
|
|
+ }
|
|
+
|
|
+ public void updateMultiQueueOffset(final MessageExtBrokerInner msgInner) {
|
|
+ String multiDispatchQueue = msgInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
|
|
+ String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
|
|
+ for (String queue : queues) {
|
|
+ String key = queueKey(queue, msgInner);
|
|
+ if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) {
|
|
+ messageStore.getQueueStore().increaseLmqOffset(key, VALUE_OF_EACH_INCREMENT);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+}
|
|
\ No newline at end of file
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
|
|
index 30054fa50..d76b05577 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
|
|
@@ -74,6 +74,16 @@ public abstract class AbstractConsumeQueueStore implements ConsumeQueueStoreInte
|
|
consumeQueue.increaseQueueOffset(this.queueOffsetOperator, msg, messageNum);
|
|
}
|
|
|
|
+ @Override
|
|
+ public void increaseLmqOffset(String queueKey, short messageNum) {
|
|
+ queueOffsetOperator.increaseLmqOffset(queueKey, messageNum);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public long getLmqQueueOffset(String queueKey) {
|
|
+ return queueOffsetOperator.getLmqOffset(queueKey);
|
|
+ }
|
|
+
|
|
@Override
|
|
public void removeTopicQueueTable(String topic, Integer queueId) {
|
|
this.queueOffsetOperator.remove(topic, queueId);
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
|
|
index c65f2a68b..768c782b1 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
|
|
@@ -181,7 +181,6 @@ public interface ConsumeQueueInterface extends FileQueueLifeCycle {
|
|
*/
|
|
void assignQueueOffset(QueueOffsetOperator queueOffsetAssigner, MessageExtBrokerInner msg) throws RocksDBException;
|
|
|
|
-
|
|
/**
|
|
* Increase queue offset.
|
|
* @param queueOffsetAssigner the delegated queue offset assigner
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
|
|
index 268803dcc..e68880a82 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
|
|
@@ -183,6 +183,20 @@ public interface ConsumeQueueStoreInterface {
|
|
*/
|
|
void increaseQueueOffset(MessageExtBrokerInner msg, short messageNum);
|
|
|
|
+ /**
|
|
+ * Increases the LMQ offset recorded for the given queue key by messageNum.
|
|
+ * @param queueKey
|
|
+ * @param messageNum
|
|
+ */
|
|
+ void increaseLmqOffset(String queueKey, short messageNum);
|
|
+
|
|
+ /**
|
|
+ * Gets the current LMQ offset recorded for the given queue key.
|
|
+ * @param queueKey
|
|
+ * @return
|
|
+ */
|
|
+ long getLmqQueueOffset(String queueKey);
|
|
+
|
|
/**
|
|
* recover topicQueue table by minPhyOffset
|
|
* @param minPhyOffset
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
|
|
similarity index 78%
|
|
rename from store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java
|
|
rename to store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
|
|
index d6291d908..44397a2fc 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
|
|
@@ -16,8 +16,6 @@
|
|
*/
|
|
package org.apache.rocketmq.store.queue;
|
|
|
|
-import java.util.ArrayList;
|
|
-import java.util.List;
|
|
import java.util.Map;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
@@ -27,7 +25,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
|
|
import org.apache.rocketmq.store.DispatchRequest;
|
|
import org.apache.rocketmq.store.config.MessageStoreConfig;
|
|
|
|
-public class MultiDispatch {
|
|
+public class MultiDispatchUtils {
|
|
|
|
public static String lmqQueueKey(String queueName) {
|
|
StringBuilder keyBuilder = new StringBuilder();
|
|
@@ -60,17 +58,4 @@ public class MultiDispatch {
|
|
}
|
|
return true;
|
|
}
|
|
-
|
|
- public static List<DispatchRequest> checkMultiDispatchQueue(MessageStoreConfig messageStoreConfig, List<DispatchRequest> dispatchRequests) {
|
|
- if (!messageStoreConfig.isEnableMultiDispatch() || dispatchRequests == null || dispatchRequests.size() == 0) {
|
|
- return null;
|
|
- }
|
|
- List<DispatchRequest> result = new ArrayList<>();
|
|
- for (DispatchRequest dispatchRequest : dispatchRequests) {
|
|
- if (checkMultiDispatchQueue(messageStoreConfig, dispatchRequest)) {
|
|
- result.add(dispatchRequest);
|
|
- }
|
|
- }
|
|
- return dispatchRequests;
|
|
- }
|
|
}
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
|
|
index 8da374828..5b4bf994e 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
|
|
@@ -71,9 +71,9 @@ public class QueueOffsetOperator {
|
|
return this.lmqTopicQueueTable.get(topicQueueKey);
|
|
}
|
|
|
|
- public void increaseLmqOffset(String topicQueueKey, short messageNum) {
|
|
- Long lmqOffset = ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey, k -> 0L);
|
|
- this.lmqTopicQueueTable.put(topicQueueKey, lmqOffset + messageNum);
|
|
+ public void increaseLmqOffset(String queueKey, short messageNum) {
|
|
+ Long lmqOffset = ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, queueKey, k -> 0L);
|
|
+ this.lmqTopicQueueTable.put(queueKey, lmqOffset + messageNum);
|
|
}
|
|
|
|
public long currentQueueOffset(String topicQueueKey) {
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
|
|
index 759be395d..5a981bb4d 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
|
|
@@ -19,14 +19,10 @@ package org.apache.rocketmq.store.queue;
|
|
import java.nio.ByteBuffer;
|
|
import java.util.List;
|
|
|
|
-import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.rocketmq.common.BoundaryType;
|
|
-import org.apache.rocketmq.common.MixAll;
|
|
import org.apache.rocketmq.common.Pair;
|
|
import org.apache.rocketmq.common.attribute.CQType;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
-import org.apache.rocketmq.common.message.MessageAccessor;
|
|
-import org.apache.rocketmq.common.message.MessageConst;
|
|
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
@@ -217,50 +213,12 @@ public class RocksDBConsumeQueue implements ConsumeQueueInterface {
|
|
queueOffsetOperator.updateQueueOffset(topicQueueKey, queueOffset);
|
|
}
|
|
msg.setQueueOffset(queueOffset);
|
|
-
|
|
- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28),
|
|
- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue.
|
|
- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) {
|
|
- return;
|
|
- }
|
|
- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
|
|
- if (StringUtils.isBlank(multiDispatchQueue)) {
|
|
- return;
|
|
- }
|
|
- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
|
|
- Long[] queueOffsets = new Long[queues.length];
|
|
- for (int i = 0; i < queues.length; i++) {
|
|
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) {
|
|
- String key = MultiDispatch.lmqQueueKey(queues[i]);
|
|
- queueOffsets[i] = queueOffsetOperator.getLmqTopicQueueNextOffset(key);
|
|
- }
|
|
- }
|
|
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
|
|
- StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
|
|
- msg.removeWaitStorePropertyString();
|
|
}
|
|
|
|
@Override
|
|
public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg, short messageNum) {
|
|
String topicQueueKey = getTopic() + "-" + getQueueId();
|
|
queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
|
|
-
|
|
- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28),
|
|
- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue.
|
|
- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) {
|
|
- return;
|
|
- }
|
|
- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
|
|
- if (StringUtils.isBlank(multiDispatchQueue)) {
|
|
- return;
|
|
- }
|
|
- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
|
|
- for (int i = 0; i < queues.length; i++) {
|
|
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) {
|
|
- String key = MultiDispatch.lmqQueueKey(queues[i]);
|
|
- queueOffsetOperator.increaseLmqOffset(key, (short) 1);
|
|
- }
|
|
- }
|
|
}
|
|
|
|
@Override
|
|
diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
|
|
index 87bfe85da..374857149 100644
|
|
--- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
|
|
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
|
|
@@ -44,7 +44,7 @@ public class AppendCallbackTest {
|
|
|
|
AppendMessageCallback callback;
|
|
|
|
- MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024);
|
|
+ MessageExtEncoder batchEncoder;
|
|
|
|
@Before
|
|
public void init() throws Exception {
|
|
@@ -53,12 +53,14 @@ public class AppendCallbackTest {
|
|
messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4);
|
|
messageStoreConfig.setMaxHashSlotNum(100);
|
|
messageStoreConfig.setMaxIndexNum(100 * 10);
|
|
+ messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024);
|
|
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore");
|
|
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog");
|
|
//too much reference
|
|
DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>());
|
|
CommitLog commitLog = new CommitLog(messageStore);
|
|
- callback = commitLog.new DefaultAppendMessageCallback();
|
|
+ callback = commitLog.new DefaultAppendMessageCallback(messageStoreConfig);
|
|
+ batchEncoder = new MessageExtEncoder(messageStoreConfig);
|
|
}
|
|
|
|
@After
|
|
diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
|
|
index c8ed4d74d..d882fc9d9 100644
|
|
--- a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
|
|
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
|
|
@@ -56,6 +56,7 @@ public class AppendPropCRCTest {
|
|
messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4);
|
|
messageStoreConfig.setMaxHashSlotNum(100);
|
|
messageStoreConfig.setMaxIndexNum(100 * 10);
|
|
+ messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024);
|
|
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore");
|
|
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog");
|
|
messageStoreConfig.setForceVerifyPropCRC(true);
|
|
@@ -63,8 +64,8 @@ public class AppendPropCRCTest {
|
|
//too much reference
|
|
DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>());
|
|
commitLog = new CommitLog(messageStore);
|
|
- encoder = new MessageExtEncoder(10 * 1024 * 1024, true);
|
|
- callback = commitLog.new DefaultAppendMessageCallback();
|
|
+ encoder = new MessageExtEncoder(messageStoreConfig);
|
|
+ callback = commitLog.new DefaultAppendMessageCallback(messageStoreConfig);
|
|
}
|
|
|
|
@After
|
|
diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
|
|
index 2447bbf68..eae5eaa07 100644
|
|
--- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
|
|
+++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
|
|
@@ -28,20 +28,19 @@ import org.apache.rocketmq.common.message.MessageConst;
|
|
import org.apache.rocketmq.common.message.MessageDecoder;
|
|
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
|
import org.apache.rocketmq.store.config.MessageStoreConfig;
|
|
-import org.apache.rocketmq.store.queue.MultiDispatch;
|
|
+import org.apache.rocketmq.store.queue.MultiDispatchUtils;
|
|
import org.junit.After;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.rocksdb.RocksDBException;
|
|
|
|
-import static org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathConsumeQueue;
|
|
import static org.junit.Assert.assertEquals;
|
|
import static org.mockito.Mockito.mock;
|
|
import static org.mockito.Mockito.when;
|
|
|
|
public class MultiDispatchTest {
|
|
|
|
- private ConsumeQueue consumeQueue;
|
|
+ private MultiDispatch multiDispatch;
|
|
|
|
private DefaultMessageStore messageStore;
|
|
|
|
@@ -61,8 +60,7 @@ public class MultiDispatchTest {
|
|
BrokerConfig brokerConfig = new BrokerConfig();
|
|
//too much reference
|
|
messageStore = new DefaultMessageStore(messageStoreConfig, null, null, brokerConfig, new ConcurrentHashMap<>());
|
|
- consumeQueue = new ConsumeQueue("xxx", 0,
|
|
- getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()), messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore);
|
|
+ multiDispatch = new MultiDispatch(messageStore);
|
|
}
|
|
|
|
@After
|
|
@@ -74,14 +72,14 @@ public class MultiDispatchTest {
|
|
public void lmqQueueKey() {
|
|
MessageExtBrokerInner messageExtBrokerInner = mock(MessageExtBrokerInner.class);
|
|
when(messageExtBrokerInner.getQueueId()).thenReturn(2);
|
|
- String ret = MultiDispatch.lmqQueueKey("%LMQ%lmq123");
|
|
+ String ret = MultiDispatchUtils.lmqQueueKey("%LMQ%lmq123");
|
|
assertEquals(ret, "%LMQ%lmq123-0");
|
|
}
|
|
|
|
@Test
|
|
public void wrapMultiDispatch() throws RocksDBException {
|
|
MessageExtBrokerInner messageExtBrokerInner = buildMessageMultiQueue();
|
|
- messageStore.assignOffset(messageExtBrokerInner);
|
|
+ multiDispatch.wrapMultiDispatch(messageExtBrokerInner);
|
|
assertEquals(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET), "0,0");
|
|
}
|
|
|
|
diff --git a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
|
|
index df3c31c6e..e113b18f1 100644
|
|
--- a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
|
|
+++ b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
|
|
@@ -86,7 +86,7 @@ public class CompactionLogTest {
|
|
int compactionCqFileSize = 1024;
|
|
|
|
|
|
- private static MessageExtEncoder encoder = new MessageExtEncoder(1024);
|
|
+ private static MessageExtEncoder encoder = new MessageExtEncoder(1024, new MessageStoreConfig());
|
|
private static SocketAddress storeHost;
|
|
private static SocketAddress bornHost;
|
|
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 70dc93abbcb9bf161378d66fcaca55bedc78b905 Mon Sep 17 00:00:00 2001
|
|
From: yangguodong <1174533476@qq.com>
|
|
Date: Wed, 8 Nov 2023 21:14:54 -0600
|
|
Subject: [PATCH 2/2] Fix tiered store README.md error about Configuration
|
|
(#7436)
|
|
|
|
* Fix tiered store README.md error about Configuration
|
|
|
|
* Fix change tieredStoreFilePath to tieredStoreFilepath
|
|
|
|
* revert README.md change
|
|
|
|
---------
|
|
|
|
Co-authored-by: yangguodong.cn <yangguodong.cn@bytedance.com>
|
|
---
|
|
.../tieredstore/common/TieredMessageStoreConfig.java | 10 +++++-----
|
|
.../tieredstore/provider/posix/PosixFileSegment.java | 4 ++--
|
|
.../rocketmq/tieredstore/file/TieredCommitLogTest.java | 2 +-
|
|
.../provider/posix/PosixFileSegmentTest.java | 2 +-
|
|
4 files changed, 9 insertions(+), 9 deletions(-)
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
|
|
index 595db6b86..a112ea6b1 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
|
|
@@ -115,7 +115,7 @@ public class TieredMessageStoreConfig {
|
|
private long readAheadCacheExpireDuration = 10 * 1000;
|
|
private double readAheadCacheSizeThresholdRate = 0.3;
|
|
|
|
- private String tieredStoreFilePath = "";
|
|
+ private String tieredStoreFilepath = "";
|
|
|
|
private String objectStoreEndpoint = "";
|
|
|
|
@@ -350,12 +350,12 @@ public class TieredMessageStoreConfig {
|
|
this.readAheadCacheSizeThresholdRate = rate;
|
|
}
|
|
|
|
- public String getTieredStoreFilePath() {
|
|
- return tieredStoreFilePath;
|
|
+ public String getTieredStoreFilepath() {
|
|
+ return tieredStoreFilepath;
|
|
}
|
|
|
|
- public void setTieredStoreFilePath(String tieredStoreFilePath) {
|
|
- this.tieredStoreFilePath = tieredStoreFilePath;
|
|
+ public void setTieredStoreFilepath(String tieredStoreFilepath) {
|
|
+ this.tieredStoreFilepath = tieredStoreFilepath;
|
|
}
|
|
|
|
public void setObjectStoreEndpoint(String objectStoreEndpoint) {
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
|
|
index 7e949cb28..708ce33f9 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
|
|
@@ -66,8 +66,8 @@ public class PosixFileSegment extends TieredFileSegment {
|
|
super(storeConfig, fileType, filePath, baseOffset);
|
|
|
|
// basePath
|
|
- String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilePath(),
|
|
- StringUtils.appendIfMissing(storeConfig.getTieredStoreFilePath(), File.separator));
|
|
+ String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilepath(),
|
|
+ StringUtils.appendIfMissing(storeConfig.getTieredStoreFilepath(), File.separator));
|
|
|
|
// fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset
|
|
String brokerClusterName = storeConfig.getBrokerClusterName();
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
|
|
index 6693d3cb7..80cdba977 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
|
|
@@ -49,7 +49,7 @@ public class TieredCommitLogTest {
|
|
TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig();
|
|
storeConfig.setBrokerName("brokerName");
|
|
storeConfig.setStorePathRootDir(storePath);
|
|
- storeConfig.setTieredStoreFilePath(storePath + File.separator);
|
|
+ storeConfig.setTieredStoreFilepath(storePath + File.separator);
|
|
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
|
|
storeConfig.setCommitLogRollingInterval(0);
|
|
storeConfig.setTieredStoreCommitLogMaxSize(1000);
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
|
|
index db33ae847..ede62b8ce 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
|
|
@@ -42,7 +42,7 @@ public class PosixFileSegmentTest {
|
|
@Before
|
|
public void setUp() {
|
|
storeConfig = new TieredMessageStoreConfig();
|
|
- storeConfig.setTieredStoreFilePath(storePath);
|
|
+ storeConfig.setTieredStoreFilepath(storePath);
|
|
mq = new MessageQueue("OSSFileSegmentTest", "broker", 0);
|
|
TieredStoreExecutor.init();
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|