!38 Lock granularity issue causing LMQ message loss
From: @zhiliatox Reviewed-by: @hu-zongtang Signed-off-by: @hu-zongtang
This commit is contained in:
commit
4eee4c0e83
@ -0,0 +1,986 @@
|
||||
From ead3d905016d9db4785a46beaa555c7fafd4f9bb Mon Sep 17 00:00:00 2001
|
||||
From: Dongyuan Pan <dongyuanpan0@gmail.com>
|
||||
Date: Wed, 8 Nov 2023 10:40:52 +0800
|
||||
Subject: [PATCH 1/2] [ISSUE #7511] Lock granularity issue causing LMQ message
|
||||
loss (#7525)
|
||||
|
||||
* bug fix: assignOffset and increaseOffset in LMQ has concurrency issues in topicQueueLock, should be in putMessageLock
|
||||
|
||||
* fix MultiDispatchTest
|
||||
|
||||
* fix MultiDispatchTest
|
||||
|
||||
* fix unit test
|
||||
---
|
||||
.../common/message/MessageExtBrokerInner.java | 10 ++
|
||||
.../org/apache/rocketmq/store/CommitLog.java | 94 ++++++++++++--
|
||||
.../apache/rocketmq/store/ConsumeQueue.java | 44 +------
|
||||
.../rocketmq/store/DefaultMessageStore.java | 1 -
|
||||
.../rocketmq/store/MessageExtEncoder.java | 118 ++++++++++++++++--
|
||||
.../apache/rocketmq/store/MultiDispatch.java | 77 ++++++++++++
|
||||
.../queue/AbstractConsumeQueueStore.java | 10 ++
|
||||
.../store/queue/ConsumeQueueInterface.java | 1 -
|
||||
.../queue/ConsumeQueueStoreInterface.java | 14 +++
|
||||
...iDispatch.java => MultiDispatchUtils.java} | 17 +--
|
||||
.../store/queue/QueueOffsetOperator.java | 6 +-
|
||||
.../store/queue/RocksDBConsumeQueue.java | 42 -------
|
||||
.../rocketmq/store/AppendCallbackTest.java | 6 +-
|
||||
.../rocketmq/store/AppendPropCRCTest.java | 5 +-
|
||||
.../rocketmq/store/MultiDispatchTest.java | 12 +-
|
||||
.../rocketmq/store/kv/CompactionLogTest.java | 2 +-
|
||||
16 files changed, 322 insertions(+), 137 deletions(-)
|
||||
create mode 100644 store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
|
||||
rename store/src/main/java/org/apache/rocketmq/store/queue/{MultiDispatch.java => MultiDispatchUtils.java} (78%)
|
||||
|
||||
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
|
||||
index 52501dbca..147f23f12 100644
|
||||
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
|
||||
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
|
||||
@@ -28,6 +28,8 @@ public class MessageExtBrokerInner extends MessageExt {
|
||||
|
||||
private ByteBuffer encodedBuff;
|
||||
|
||||
+ private volatile boolean encodeCompleted;
|
||||
+
|
||||
private MessageVersion version = MessageVersion.MESSAGE_VERSION_V1;
|
||||
|
||||
public ByteBuffer getEncodedBuff() {
|
||||
@@ -92,4 +94,12 @@ public class MessageExtBrokerInner extends MessageExt {
|
||||
this.setPropertiesString(MessageDecoder.messageProperties2String(this.getProperties()));
|
||||
}
|
||||
}
|
||||
+
|
||||
+ public boolean isEncodeCompleted() {
|
||||
+ return encodeCompleted;
|
||||
+ }
|
||||
+
|
||||
+ public void setEncodeCompleted(boolean encodeCompleted) {
|
||||
+ this.encodeCompleted = encodeCompleted;
|
||||
+ }
|
||||
}
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
||||
index 6c3afde70..35c1d0e2d 100644
|
||||
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
||||
@@ -35,6 +35,7 @@ import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import com.sun.jna.NativeLong;
|
||||
import com.sun.jna.Pointer;
|
||||
+import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.ServiceThread;
|
||||
import org.apache.rocketmq.common.SystemClock;
|
||||
@@ -56,6 +57,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||||
import org.apache.rocketmq.store.MessageExtEncoder.PutMessageThreadLocal;
|
||||
import org.apache.rocketmq.store.config.BrokerRole;
|
||||
import org.apache.rocketmq.store.config.FlushDiskType;
|
||||
+import org.apache.rocketmq.store.config.MessageStoreConfig;
|
||||
import org.apache.rocketmq.store.ha.HAService;
|
||||
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
|
||||
import org.apache.rocketmq.store.logfile.MappedFile;
|
||||
@@ -101,6 +103,7 @@ public class CommitLog implements Swappable {
|
||||
protected int commitLogSize;
|
||||
|
||||
private final boolean enabledAppendPropCRC;
|
||||
+ protected final MultiDispatch multiDispatch;
|
||||
|
||||
public CommitLog(final DefaultMessageStore messageStore) {
|
||||
String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog();
|
||||
@@ -119,13 +122,11 @@ public class CommitLog implements Swappable {
|
||||
this.flushManager = new DefaultFlushManager();
|
||||
this.coldDataCheckService = new ColdDataCheckService();
|
||||
|
||||
- this.appendMessageCallback = new DefaultAppendMessageCallback();
|
||||
+ this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig());
|
||||
putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
|
||||
@Override
|
||||
protected PutMessageThreadLocal initialValue() {
|
||||
- return new PutMessageThreadLocal(
|
||||
- defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(),
|
||||
- defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC());
|
||||
+ return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig());
|
||||
}
|
||||
};
|
||||
this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
|
||||
@@ -137,6 +138,8 @@ public class CommitLog implements Swappable {
|
||||
this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
|
||||
|
||||
this.enabledAppendPropCRC = messageStore.getMessageStoreConfig().isEnabledAppendPropCRC();
|
||||
+
|
||||
+ this.multiDispatch = new MultiDispatch(defaultMessageStore);
|
||||
}
|
||||
|
||||
public void setFullStorePaths(Set<String> fullStorePaths) {
|
||||
@@ -1830,15 +1833,84 @@ public class CommitLog implements Swappable {
|
||||
// Store the message content
|
||||
private final ByteBuffer msgStoreItemMemory;
|
||||
private final int crc32ReservedLength = CommitLog.CRC32_RESERVED_LEN;
|
||||
+ private final MessageStoreConfig messageStoreConfig;
|
||||
|
||||
- DefaultAppendMessageCallback() {
|
||||
+ DefaultAppendMessageCallback(MessageStoreConfig messageStoreConfig) {
|
||||
this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
|
||||
+ this.messageStoreConfig = messageStoreConfig;
|
||||
+ }
|
||||
+
|
||||
+ public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer, final MessageExtBrokerInner msgInner) {
|
||||
+ if (msgInner.isEncodeCompleted()) {
|
||||
+ return null;
|
||||
+ }
|
||||
+
|
||||
+ multiDispatch.wrapMultiDispatch(msgInner);
|
||||
+
|
||||
+ msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
|
||||
+
|
||||
+ final byte[] propertiesData =
|
||||
+ msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
|
||||
+
|
||||
+ boolean needAppendLastPropertySeparator = enabledAppendPropCRC && propertiesData != null && propertiesData.length > 0
|
||||
+ && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR;
|
||||
+
|
||||
+ final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength;
|
||||
+
|
||||
+ if (propertiesLength > Short.MAX_VALUE) {
|
||||
+ log.warn("putMessage message properties length too long. length={}", propertiesData.length);
|
||||
+ return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
|
||||
+ }
|
||||
+
|
||||
+ int msgLenWithoutProperties = preEncodeBuffer.getInt(0);
|
||||
+
|
||||
+ int msgLen = msgLenWithoutProperties + 2 + propertiesLength;
|
||||
+
|
||||
+ // Exceeds the maximum message
|
||||
+ if (msgLen > this.messageStoreConfig.getMaxMessageSize()) {
|
||||
+ log.warn("message size exceeded, msg total size: " + msgLen + ", maxMessageSize: " + this.messageStoreConfig.getMaxMessageSize());
|
||||
+ return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
|
||||
+ }
|
||||
+
|
||||
+ // Back filling total message length
|
||||
+ preEncodeBuffer.putInt(0, msgLen);
|
||||
+ // Modify position to msgLenWithoutProperties
|
||||
+ preEncodeBuffer.position(msgLenWithoutProperties);
|
||||
+
|
||||
+ preEncodeBuffer.putShort((short) propertiesLength);
|
||||
+
|
||||
+ if (propertiesLength > crc32ReservedLength) {
|
||||
+ preEncodeBuffer.put(propertiesData);
|
||||
+ }
|
||||
+
|
||||
+ if (needAppendLastPropertySeparator) {
|
||||
+ preEncodeBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR);
|
||||
+ }
|
||||
+ // 18 CRC32
|
||||
+ preEncodeBuffer.position(preEncodeBuffer.position() + crc32ReservedLength);
|
||||
+
|
||||
+ msgInner.setEncodeCompleted(true);
|
||||
+
|
||||
+ return null;
|
||||
}
|
||||
|
||||
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
|
||||
final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
|
||||
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
|
||||
|
||||
+ ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
|
||||
+ boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner);
|
||||
+ if (isMultiDispatchMsg) {
|
||||
+ AppendMessageResult appendMessageResult = handlePropertiesForLmqMsg(preEncodeBuffer, msgInner);
|
||||
+ if (appendMessageResult != null) {
|
||||
+ return appendMessageResult;
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ final int msgLen = preEncodeBuffer.getInt(0);
|
||||
+ preEncodeBuffer.position(0);
|
||||
+ preEncodeBuffer.limit(msgLen);
|
||||
+
|
||||
// PHY OFFSET
|
||||
long wroteOffset = fileFromOffset + byteBuffer.position();
|
||||
|
||||
@@ -1872,9 +1944,6 @@ public class CommitLog implements Swappable {
|
||||
break;
|
||||
}
|
||||
|
||||
- ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
|
||||
- final int msgLen = preEncodeBuffer.getInt(0);
|
||||
-
|
||||
// Determines whether there is sufficient free space
|
||||
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
|
||||
this.msgStoreItemMemory.clear();
|
||||
@@ -1919,6 +1988,11 @@ public class CommitLog implements Swappable {
|
||||
byteBuffer.put(preEncodeBuffer);
|
||||
CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS");
|
||||
msgInner.setEncodedBuff(null);
|
||||
+
|
||||
+ if (isMultiDispatchMsg) {
|
||||
+ CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
|
||||
+ }
|
||||
+
|
||||
return new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
|
||||
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum);
|
||||
}
|
||||
@@ -2159,6 +2233,10 @@ public class CommitLog implements Swappable {
|
||||
return flushManager;
|
||||
}
|
||||
|
||||
+ public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) {
|
||||
+ return StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
|
||||
+ }
|
||||
+
|
||||
private boolean isCloseReadAhead() {
|
||||
return !MixAll.isWindows() && !defaultMessageStore.getMessageStoreConfig().isDataReadAheadEnable();
|
||||
}
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
|
||||
index 623509c8b..453c9d1dc 100644
|
||||
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
|
||||
@@ -27,7 +27,6 @@ import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.Pair;
|
||||
import org.apache.rocketmq.common.attribute.CQType;
|
||||
import org.apache.rocketmq.common.constant.LoggerName;
|
||||
-import org.apache.rocketmq.common.message.MessageAccessor;
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
||||
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||||
@@ -38,7 +37,7 @@ import org.apache.rocketmq.store.logfile.MappedFile;
|
||||
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
|
||||
import org.apache.rocketmq.store.queue.CqUnit;
|
||||
import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
|
||||
-import org.apache.rocketmq.store.queue.MultiDispatch;
|
||||
+import org.apache.rocketmq.store.queue.MultiDispatchUtils;
|
||||
import org.apache.rocketmq.store.queue.QueueOffsetOperator;
|
||||
import org.apache.rocketmq.store.queue.ReferredIterator;
|
||||
|
||||
@@ -702,7 +701,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
|
||||
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
|
||||
}
|
||||
this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
|
||||
- if (MultiDispatch.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) {
|
||||
+ if (MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) {
|
||||
multiDispatchLmqQueue(request, maxRetries);
|
||||
}
|
||||
return;
|
||||
@@ -776,28 +775,6 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
|
||||
String topicQueueKey = getTopic() + "-" + getQueueId();
|
||||
long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey);
|
||||
msg.setQueueOffset(queueOffset);
|
||||
-
|
||||
-
|
||||
- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28),
|
||||
- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue.
|
||||
- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) {
|
||||
- return;
|
||||
- }
|
||||
- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
|
||||
- if (StringUtils.isBlank(multiDispatchQueue)) {
|
||||
- return;
|
||||
- }
|
||||
- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
|
||||
- Long[] queueOffsets = new Long[queues.length];
|
||||
- for (int i = 0; i < queues.length; i++) {
|
||||
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) {
|
||||
- String key = MultiDispatch.lmqQueueKey(queues[i]);
|
||||
- queueOffsets[i] = queueOffsetOperator.getLmqOffset(key);
|
||||
- }
|
||||
- }
|
||||
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
|
||||
- StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
|
||||
- msg.removeWaitStorePropertyString();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -805,23 +782,6 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
|
||||
short messageNum) {
|
||||
String topicQueueKey = getTopic() + "-" + getQueueId();
|
||||
queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
|
||||
-
|
||||
- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28),
|
||||
- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue.
|
||||
- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) {
|
||||
- return;
|
||||
- }
|
||||
- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
|
||||
- if (StringUtils.isBlank(multiDispatchQueue)) {
|
||||
- return;
|
||||
- }
|
||||
- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
|
||||
- for (int i = 0; i < queues.length; i++) {
|
||||
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) {
|
||||
- String key = MultiDispatch.lmqQueueKey(queues[i]);
|
||||
- queueOffsetOperator.increaseLmqOffset(key, (short) 1);
|
||||
- }
|
||||
- }
|
||||
}
|
||||
|
||||
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
||||
index 99a54e2d7..dc5f312e5 100644
|
||||
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
||||
@@ -2112,7 +2112,6 @@ public class DefaultMessageStore implements MessageStore {
|
||||
}
|
||||
}
|
||||
|
||||
-
|
||||
@Override
|
||||
public void increaseOffset(MessageExtBrokerInner msg, short messageNum) {
|
||||
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
|
||||
index c1d808728..20e9a652b 100644
|
||||
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
|
||||
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageVersion;
|
||||
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
|
||||
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||||
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||||
+import org.apache.rocketmq.store.config.MessageStoreConfig;
|
||||
|
||||
public class MessageExtEncoder {
|
||||
protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
|
||||
@@ -38,20 +39,22 @@ public class MessageExtEncoder {
|
||||
// The maximum length of the full message.
|
||||
private int maxMessageSize;
|
||||
private final int crc32ReservedLength;
|
||||
+ private MessageStoreConfig messageStoreConfig;
|
||||
|
||||
- public MessageExtEncoder(final int maxMessageBodySize) {
|
||||
- this(maxMessageBodySize, false);
|
||||
+ public MessageExtEncoder(final int maxMessageBodySize, final MessageStoreConfig messageStoreConfig) {
|
||||
+ this(messageStoreConfig);
|
||||
}
|
||||
|
||||
- public MessageExtEncoder(final int maxMessageBodySize, boolean enabledAppendPropCRC) {
|
||||
+ public MessageExtEncoder(final MessageStoreConfig messageStoreConfig) {
|
||||
ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
|
||||
+ this.messageStoreConfig = messageStoreConfig;
|
||||
+ this.maxMessageBodySize = messageStoreConfig.getMaxMessageSize();
|
||||
//Reserve 64kb for encoding buffer outside body
|
||||
int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ?
|
||||
maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
|
||||
byteBuf = alloc.directBuffer(maxMessageSize);
|
||||
- this.maxMessageBodySize = maxMessageBodySize;
|
||||
this.maxMessageSize = maxMessageSize;
|
||||
- this.crc32ReservedLength = enabledAppendPropCRC ? CommitLog.CRC32_RESERVED_LEN : 0;
|
||||
+ this.crc32ReservedLength = messageStoreConfig.isEnabledAppendPropCRC() ? CommitLog.CRC32_RESERVED_LEN : 0;
|
||||
}
|
||||
|
||||
public static int calMsgLength(MessageVersion messageVersion,
|
||||
@@ -79,8 +82,103 @@ public class MessageExtEncoder {
|
||||
+ 2 + (Math.max(propertiesLength, 0)); //propertiesLength
|
||||
}
|
||||
|
||||
+ public static int calMsgLengthNoProperties(MessageVersion messageVersion,
|
||||
+ int sysFlag, int bodyLength, int topicLength) {
|
||||
+
|
||||
+ int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
|
||||
+ int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
|
||||
+
|
||||
+ return 4 //TOTALSIZE
|
||||
+ + 4 //MAGICCODE
|
||||
+ + 4 //BODYCRC
|
||||
+ + 4 //QUEUEID
|
||||
+ + 4 //FLAG
|
||||
+ + 8 //QUEUEOFFSET
|
||||
+ + 8 //PHYSICALOFFSET
|
||||
+ + 4 //SYSFLAG
|
||||
+ + 8 //BORNTIMESTAMP
|
||||
+ + bornhostLength //BORNHOST
|
||||
+ + 8 //STORETIMESTAMP
|
||||
+ + storehostAddressLength //STOREHOSTADDRESS
|
||||
+ + 4 //RECONSUMETIMES
|
||||
+ + 8 //Prepared Transaction Offset
|
||||
+ + 4 + (Math.max(bodyLength, 0)) //BODY
|
||||
+ + messageVersion.getTopicLengthSize() + topicLength; //TOPIC
|
||||
+ }
|
||||
+
|
||||
+ public PutMessageResult encodeWithoutProperties(MessageExtBrokerInner msgInner) {
|
||||
+
|
||||
+ final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
|
||||
+ final int topicLength = topicData.length;
|
||||
+
|
||||
+ final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
|
||||
+
|
||||
+ // Exceeds the maximum message body
|
||||
+ if (bodyLength > this.maxMessageBodySize) {
|
||||
+ CommitLog.log.warn("message body size exceeded, msg body size: " + bodyLength
|
||||
+ + ", maxMessageSize: " + this.maxMessageBodySize);
|
||||
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
|
||||
+ }
|
||||
+
|
||||
+ final int msgLenNoProperties = calMsgLengthNoProperties(msgInner.getVersion(), msgInner.getSysFlag(), bodyLength, topicLength);
|
||||
+
|
||||
+ // 1 TOTALSIZE
|
||||
+ this.byteBuf.writeInt(msgLenNoProperties);
|
||||
+ // 2 MAGICCODE
|
||||
+ this.byteBuf.writeInt(msgInner.getVersion().getMagicCode());
|
||||
+ // 3 BODYCRC
|
||||
+ this.byteBuf.writeInt(msgInner.getBodyCRC());
|
||||
+ // 4 QUEUEID
|
||||
+ this.byteBuf.writeInt(msgInner.getQueueId());
|
||||
+ // 5 FLAG
|
||||
+ this.byteBuf.writeInt(msgInner.getFlag());
|
||||
+ // 6 QUEUEOFFSET, need update later
|
||||
+ this.byteBuf.writeLong(0);
|
||||
+ // 7 PHYSICALOFFSET, need update later
|
||||
+ this.byteBuf.writeLong(0);
|
||||
+ // 8 SYSFLAG
|
||||
+ this.byteBuf.writeInt(msgInner.getSysFlag());
|
||||
+ // 9 BORNTIMESTAMP
|
||||
+ this.byteBuf.writeLong(msgInner.getBornTimestamp());
|
||||
+
|
||||
+ // 10 BORNHOST
|
||||
+ ByteBuffer bornHostBytes = msgInner.getBornHostBytes();
|
||||
+ this.byteBuf.writeBytes(bornHostBytes.array());
|
||||
+
|
||||
+ // 11 STORETIMESTAMP
|
||||
+ this.byteBuf.writeLong(msgInner.getStoreTimestamp());
|
||||
+
|
||||
+ // 12 STOREHOSTADDRESS
|
||||
+ ByteBuffer storeHostBytes = msgInner.getStoreHostBytes();
|
||||
+ this.byteBuf.writeBytes(storeHostBytes.array());
|
||||
+
|
||||
+ // 13 RECONSUMETIMES
|
||||
+ this.byteBuf.writeInt(msgInner.getReconsumeTimes());
|
||||
+ // 14 Prepared Transaction Offset
|
||||
+ this.byteBuf.writeLong(msgInner.getPreparedTransactionOffset());
|
||||
+ // 15 BODY
|
||||
+ this.byteBuf.writeInt(bodyLength);
|
||||
+ if (bodyLength > 0)
|
||||
+ this.byteBuf.writeBytes(msgInner.getBody());
|
||||
+
|
||||
+ // 16 TOPIC
|
||||
+ if (MessageVersion.MESSAGE_VERSION_V2.equals(msgInner.getVersion())) {
|
||||
+ this.byteBuf.writeShort((short) topicLength);
|
||||
+ } else {
|
||||
+ this.byteBuf.writeByte((byte) topicLength);
|
||||
+ }
|
||||
+ this.byteBuf.writeBytes(topicData);
|
||||
+
|
||||
+ return null;
|
||||
+ }
|
||||
+
|
||||
public PutMessageResult encode(MessageExtBrokerInner msgInner) {
|
||||
this.byteBuf.clear();
|
||||
+
|
||||
+ if (messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner)) {
|
||||
+ return encodeWithoutProperties(msgInner);
|
||||
+ }
|
||||
+
|
||||
/**
|
||||
* Serialize message
|
||||
*/
|
||||
@@ -303,7 +401,7 @@ public class MessageExtEncoder {
|
||||
}
|
||||
|
||||
public ByteBuffer getEncoderBuffer() {
|
||||
- return this.byteBuf.nioBuffer();
|
||||
+ return this.byteBuf.nioBuffer(0, this.byteBuf.capacity());
|
||||
}
|
||||
|
||||
public int getMaxMessageBodySize() {
|
||||
@@ -322,12 +420,8 @@ public class MessageExtEncoder {
|
||||
private final MessageExtEncoder encoder;
|
||||
private final StringBuilder keyBuilder;
|
||||
|
||||
- PutMessageThreadLocal(int size) {
|
||||
- this(size, false);
|
||||
- }
|
||||
-
|
||||
- PutMessageThreadLocal(int size, boolean enabledAppendPropCRC) {
|
||||
- encoder = new MessageExtEncoder(size, enabledAppendPropCRC);
|
||||
+ PutMessageThreadLocal(MessageStoreConfig messageStoreConfig) {
|
||||
+ encoder = new MessageExtEncoder(messageStoreConfig);
|
||||
keyBuilder = new StringBuilder();
|
||||
}
|
||||
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
|
||||
new file mode 100644
|
||||
index 000000000..5bc587a8e
|
||||
--- /dev/null
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
|
||||
@@ -0,0 +1,77 @@
|
||||
+/*
|
||||
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
+ * contributor license agreements. See the NOTICE file distributed with
|
||||
+ * this work for additional information regarding copyright ownership.
|
||||
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
+ * (the "License"); you may not use this file except in compliance with
|
||||
+ * the License. You may obtain a copy of the License at
|
||||
+ *
|
||||
+ * http://www.apache.org/licenses/LICENSE-2.0
|
||||
+ *
|
||||
+ * Unless required by applicable law or agreed to in writing, software
|
||||
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
||||
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
+ * See the License for the specific language governing permissions and
|
||||
+ * limitations under the License.
|
||||
+ */
|
||||
+package org.apache.rocketmq.store;
|
||||
+
|
||||
+import org.apache.commons.lang3.StringUtils;
|
||||
+import org.apache.rocketmq.common.MixAll;
|
||||
+import org.apache.rocketmq.common.message.MessageAccessor;
|
||||
+import org.apache.rocketmq.common.message.MessageConst;
|
||||
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
||||
+
|
||||
+/**
|
||||
+ * MultiDispatch for lmq, not-thread-safe
|
||||
+ */
|
||||
+public class MultiDispatch {
|
||||
+ private final StringBuilder keyBuilder = new StringBuilder();
|
||||
+ private final DefaultMessageStore messageStore;
|
||||
+ private static final short VALUE_OF_EACH_INCREMENT = 1;
|
||||
+
|
||||
+ public MultiDispatch(DefaultMessageStore messageStore) {
|
||||
+ this.messageStore = messageStore;
|
||||
+ }
|
||||
+
|
||||
+ public String queueKey(String queueName, MessageExtBrokerInner msgInner) {
|
||||
+ keyBuilder.delete(0, keyBuilder.length());
|
||||
+ keyBuilder.append(queueName);
|
||||
+ keyBuilder.append('-');
|
||||
+ int queueId = msgInner.getQueueId();
|
||||
+ if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
|
||||
+ queueId = 0;
|
||||
+ }
|
||||
+ keyBuilder.append(queueId);
|
||||
+ return keyBuilder.toString();
|
||||
+ }
|
||||
+
|
||||
+ public void wrapMultiDispatch(final MessageExtBrokerInner msg) {
|
||||
+
|
||||
+ String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
|
||||
+ String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
|
||||
+ Long[] queueOffsets = new Long[queues.length];
|
||||
+ if (messageStore.getMessageStoreConfig().isEnableLmq()) {
|
||||
+ for (int i = 0; i < queues.length; i++) {
|
||||
+ String key = queueKey(queues[i], msg);
|
||||
+ if (MixAll.isLmq(key)) {
|
||||
+ queueOffsets[i] = messageStore.getQueueStore().getLmqQueueOffset(key);
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
|
||||
+ StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
|
||||
+ msg.removeWaitStorePropertyString();
|
||||
+ }
|
||||
+
|
||||
+ public void updateMultiQueueOffset(final MessageExtBrokerInner msgInner) {
|
||||
+ String multiDispatchQueue = msgInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
|
||||
+ String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
|
||||
+ for (String queue : queues) {
|
||||
+ String key = queueKey(queue, msgInner);
|
||||
+ if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) {
|
||||
+ messageStore.getQueueStore().increaseLmqOffset(key, VALUE_OF_EACH_INCREMENT);
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+}
|
||||
\ No newline at end of file
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
|
||||
index 30054fa50..d76b05577 100644
|
||||
--- a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
|
||||
@@ -74,6 +74,16 @@ public abstract class AbstractConsumeQueueStore implements ConsumeQueueStoreInte
|
||||
consumeQueue.increaseQueueOffset(this.queueOffsetOperator, msg, messageNum);
|
||||
}
|
||||
|
||||
+ @Override
|
||||
+ public void increaseLmqOffset(String queueKey, short messageNum) {
|
||||
+ queueOffsetOperator.increaseLmqOffset(queueKey, messageNum);
|
||||
+ }
|
||||
+
|
||||
+ @Override
|
||||
+ public long getLmqQueueOffset(String queueKey) {
|
||||
+ return queueOffsetOperator.getLmqOffset(queueKey);
|
||||
+ }
|
||||
+
|
||||
@Override
|
||||
public void removeTopicQueueTable(String topic, Integer queueId) {
|
||||
this.queueOffsetOperator.remove(topic, queueId);
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
|
||||
index c65f2a68b..768c782b1 100644
|
||||
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
|
||||
@@ -181,7 +181,6 @@ public interface ConsumeQueueInterface extends FileQueueLifeCycle {
|
||||
*/
|
||||
void assignQueueOffset(QueueOffsetOperator queueOffsetAssigner, MessageExtBrokerInner msg) throws RocksDBException;
|
||||
|
||||
-
|
||||
/**
|
||||
* Increase queue offset.
|
||||
* @param queueOffsetAssigner the delegated queue offset assigner
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
|
||||
index 268803dcc..e68880a82 100644
|
||||
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
|
||||
@@ -183,6 +183,20 @@ public interface ConsumeQueueStoreInterface {
|
||||
*/
|
||||
void increaseQueueOffset(MessageExtBrokerInner msg, short messageNum);
|
||||
|
||||
+ /**
|
||||
+ * Increase lmq offset
|
||||
+ * @param queueKey
|
||||
+ * @param messageNum
|
||||
+ */
|
||||
+ void increaseLmqOffset(String queueKey, short messageNum);
|
||||
+
|
||||
+ /**
|
||||
+ * get lmq queue offset
|
||||
+ * @param queueKey
|
||||
+ * @return
|
||||
+ */
|
||||
+ long getLmqQueueOffset(String queueKey);
|
||||
+
|
||||
/**
|
||||
* recover topicQueue table by minPhyOffset
|
||||
* @param minPhyOffset
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
|
||||
similarity index 78%
|
||||
rename from store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java
|
||||
rename to store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
|
||||
index d6291d908..44397a2fc 100644
|
||||
--- a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
|
||||
@@ -16,8 +16,6 @@
|
||||
*/
|
||||
package org.apache.rocketmq.store.queue;
|
||||
|
||||
-import java.util.ArrayList;
|
||||
-import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@@ -27,7 +25,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
|
||||
import org.apache.rocketmq.store.DispatchRequest;
|
||||
import org.apache.rocketmq.store.config.MessageStoreConfig;
|
||||
|
||||
-public class MultiDispatch {
|
||||
+public class MultiDispatchUtils {
|
||||
|
||||
public static String lmqQueueKey(String queueName) {
|
||||
StringBuilder keyBuilder = new StringBuilder();
|
||||
@@ -60,17 +58,4 @@ public class MultiDispatch {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
-
|
||||
- public static List<DispatchRequest> checkMultiDispatchQueue(MessageStoreConfig messageStoreConfig, List<DispatchRequest> dispatchRequests) {
|
||||
- if (!messageStoreConfig.isEnableMultiDispatch() || dispatchRequests == null || dispatchRequests.size() == 0) {
|
||||
- return null;
|
||||
- }
|
||||
- List<DispatchRequest> result = new ArrayList<>();
|
||||
- for (DispatchRequest dispatchRequest : dispatchRequests) {
|
||||
- if (checkMultiDispatchQueue(messageStoreConfig, dispatchRequest)) {
|
||||
- result.add(dispatchRequest);
|
||||
- }
|
||||
- }
|
||||
- return dispatchRequests;
|
||||
- }
|
||||
}
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
|
||||
index 8da374828..5b4bf994e 100644
|
||||
--- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
|
||||
@@ -71,9 +71,9 @@ public class QueueOffsetOperator {
|
||||
return this.lmqTopicQueueTable.get(topicQueueKey);
|
||||
}
|
||||
|
||||
- public void increaseLmqOffset(String topicQueueKey, short messageNum) {
|
||||
- Long lmqOffset = ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey, k -> 0L);
|
||||
- this.lmqTopicQueueTable.put(topicQueueKey, lmqOffset + messageNum);
|
||||
+ public void increaseLmqOffset(String queueKey, short messageNum) {
|
||||
+ Long lmqOffset = ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, queueKey, k -> 0L);
|
||||
+ this.lmqTopicQueueTable.put(queueKey, lmqOffset + messageNum);
|
||||
}
|
||||
|
||||
public long currentQueueOffset(String topicQueueKey) {
|
||||
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
|
||||
index 759be395d..5a981bb4d 100644
|
||||
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
|
||||
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
|
||||
@@ -19,14 +19,10 @@ package org.apache.rocketmq.store.queue;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
|
||||
-import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.common.BoundaryType;
|
||||
-import org.apache.rocketmq.common.MixAll;
|
||||
import org.apache.rocketmq.common.Pair;
|
||||
import org.apache.rocketmq.common.attribute.CQType;
|
||||
import org.apache.rocketmq.common.constant.LoggerName;
|
||||
-import org.apache.rocketmq.common.message.MessageAccessor;
|
||||
-import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
||||
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||||
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||||
@@ -217,50 +213,12 @@ public class RocksDBConsumeQueue implements ConsumeQueueInterface {
|
||||
queueOffsetOperator.updateQueueOffset(topicQueueKey, queueOffset);
|
||||
}
|
||||
msg.setQueueOffset(queueOffset);
|
||||
-
|
||||
- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28),
|
||||
- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue.
|
||||
- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) {
|
||||
- return;
|
||||
- }
|
||||
- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
|
||||
- if (StringUtils.isBlank(multiDispatchQueue)) {
|
||||
- return;
|
||||
- }
|
||||
- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
|
||||
- Long[] queueOffsets = new Long[queues.length];
|
||||
- for (int i = 0; i < queues.length; i++) {
|
||||
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) {
|
||||
- String key = MultiDispatch.lmqQueueKey(queues[i]);
|
||||
- queueOffsets[i] = queueOffsetOperator.getLmqTopicQueueNextOffset(key);
|
||||
- }
|
||||
- }
|
||||
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
|
||||
- StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
|
||||
- msg.removeWaitStorePropertyString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg, short messageNum) {
|
||||
String topicQueueKey = getTopic() + "-" + getQueueId();
|
||||
queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
|
||||
-
|
||||
- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28),
|
||||
- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue.
|
||||
- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) {
|
||||
- return;
|
||||
- }
|
||||
- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
|
||||
- if (StringUtils.isBlank(multiDispatchQueue)) {
|
||||
- return;
|
||||
- }
|
||||
- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
|
||||
- for (int i = 0; i < queues.length; i++) {
|
||||
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) {
|
||||
- String key = MultiDispatch.lmqQueueKey(queues[i]);
|
||||
- queueOffsetOperator.increaseLmqOffset(key, (short) 1);
|
||||
- }
|
||||
- }
|
||||
}
|
||||
|
||||
@Override
|
||||
diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
|
||||
index 87bfe85da..374857149 100644
|
||||
--- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
|
||||
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
|
||||
@@ -44,7 +44,7 @@ public class AppendCallbackTest {
|
||||
|
||||
AppendMessageCallback callback;
|
||||
|
||||
- MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024);
|
||||
+ MessageExtEncoder batchEncoder;
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
@@ -53,12 +53,14 @@ public class AppendCallbackTest {
|
||||
messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4);
|
||||
messageStoreConfig.setMaxHashSlotNum(100);
|
||||
messageStoreConfig.setMaxIndexNum(100 * 10);
|
||||
+ messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024);
|
||||
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore");
|
||||
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog");
|
||||
//too much reference
|
||||
DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>());
|
||||
CommitLog commitLog = new CommitLog(messageStore);
|
||||
- callback = commitLog.new DefaultAppendMessageCallback();
|
||||
+ callback = commitLog.new DefaultAppendMessageCallback(messageStoreConfig);
|
||||
+ batchEncoder = new MessageExtEncoder(messageStoreConfig);
|
||||
}
|
||||
|
||||
@After
|
||||
diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
|
||||
index c8ed4d74d..d882fc9d9 100644
|
||||
--- a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
|
||||
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
|
||||
@@ -56,6 +56,7 @@ public class AppendPropCRCTest {
|
||||
messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4);
|
||||
messageStoreConfig.setMaxHashSlotNum(100);
|
||||
messageStoreConfig.setMaxIndexNum(100 * 10);
|
||||
+ messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024);
|
||||
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore");
|
||||
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog");
|
||||
messageStoreConfig.setForceVerifyPropCRC(true);
|
||||
@@ -63,8 +64,8 @@ public class AppendPropCRCTest {
|
||||
//too much reference
|
||||
DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>());
|
||||
commitLog = new CommitLog(messageStore);
|
||||
- encoder = new MessageExtEncoder(10 * 1024 * 1024, true);
|
||||
- callback = commitLog.new DefaultAppendMessageCallback();
|
||||
+ encoder = new MessageExtEncoder(messageStoreConfig);
|
||||
+ callback = commitLog.new DefaultAppendMessageCallback(messageStoreConfig);
|
||||
}
|
||||
|
||||
@After
|
||||
diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
|
||||
index 2447bbf68..eae5eaa07 100644
|
||||
--- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
|
||||
+++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
|
||||
@@ -28,20 +28,19 @@ import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.apache.rocketmq.common.message.MessageDecoder;
|
||||
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
||||
import org.apache.rocketmq.store.config.MessageStoreConfig;
|
||||
-import org.apache.rocketmq.store.queue.MultiDispatch;
|
||||
+import org.apache.rocketmq.store.queue.MultiDispatchUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.rocksdb.RocksDBException;
|
||||
|
||||
-import static org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathConsumeQueue;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class MultiDispatchTest {
|
||||
|
||||
- private ConsumeQueue consumeQueue;
|
||||
+ private MultiDispatch multiDispatch;
|
||||
|
||||
private DefaultMessageStore messageStore;
|
||||
|
||||
@@ -61,8 +60,7 @@ public class MultiDispatchTest {
|
||||
BrokerConfig brokerConfig = new BrokerConfig();
|
||||
//too much reference
|
||||
messageStore = new DefaultMessageStore(messageStoreConfig, null, null, brokerConfig, new ConcurrentHashMap<>());
|
||||
- consumeQueue = new ConsumeQueue("xxx", 0,
|
||||
- getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()), messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore);
|
||||
+ multiDispatch = new MultiDispatch(messageStore);
|
||||
}
|
||||
|
||||
@After
|
||||
@@ -74,14 +72,14 @@ public class MultiDispatchTest {
|
||||
public void lmqQueueKey() {
|
||||
MessageExtBrokerInner messageExtBrokerInner = mock(MessageExtBrokerInner.class);
|
||||
when(messageExtBrokerInner.getQueueId()).thenReturn(2);
|
||||
- String ret = MultiDispatch.lmqQueueKey("%LMQ%lmq123");
|
||||
+ String ret = MultiDispatchUtils.lmqQueueKey("%LMQ%lmq123");
|
||||
assertEquals(ret, "%LMQ%lmq123-0");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void wrapMultiDispatch() throws RocksDBException {
|
||||
MessageExtBrokerInner messageExtBrokerInner = buildMessageMultiQueue();
|
||||
- messageStore.assignOffset(messageExtBrokerInner);
|
||||
+ multiDispatch.wrapMultiDispatch(messageExtBrokerInner);
|
||||
assertEquals(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET), "0,0");
|
||||
}
|
||||
|
||||
diff --git a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
|
||||
index df3c31c6e..e113b18f1 100644
|
||||
--- a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
|
||||
+++ b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
|
||||
@@ -86,7 +86,7 @@ public class CompactionLogTest {
|
||||
int compactionCqFileSize = 1024;
|
||||
|
||||
|
||||
- private static MessageExtEncoder encoder = new MessageExtEncoder(1024);
|
||||
+ private static MessageExtEncoder encoder = new MessageExtEncoder(1024, new MessageStoreConfig());
|
||||
private static SocketAddress storeHost;
|
||||
private static SocketAddress bornHost;
|
||||
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 70dc93abbcb9bf161378d66fcaca55bedc78b905 Mon Sep 17 00:00:00 2001
|
||||
From: yangguodong <1174533476@qq.com>
|
||||
Date: Wed, 8 Nov 2023 21:14:54 -0600
|
||||
Subject: [PATCH 2/2] Fix tiered store README.md error about Configuration
|
||||
(#7436)
|
||||
|
||||
* Fix tiered store README.md error about Configuration
|
||||
|
||||
* Fix change tieredStoreFilePath to tieredStoreFilepath
|
||||
|
||||
* revert README.md change
|
||||
|
||||
---------
|
||||
|
||||
Co-authored-by: yangguodong.cn <yangguodong.cn@bytedance.com>
|
||||
---
|
||||
.../tieredstore/common/TieredMessageStoreConfig.java | 10 +++++-----
|
||||
.../tieredstore/provider/posix/PosixFileSegment.java | 4 ++--
|
||||
.../rocketmq/tieredstore/file/TieredCommitLogTest.java | 2 +-
|
||||
.../provider/posix/PosixFileSegmentTest.java | 2 +-
|
||||
4 files changed, 9 insertions(+), 9 deletions(-)
|
||||
|
||||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
|
||||
index 595db6b86..a112ea6b1 100644
|
||||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
|
||||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
|
||||
@@ -115,7 +115,7 @@ public class TieredMessageStoreConfig {
|
||||
private long readAheadCacheExpireDuration = 10 * 1000;
|
||||
private double readAheadCacheSizeThresholdRate = 0.3;
|
||||
|
||||
- private String tieredStoreFilePath = "";
|
||||
+ private String tieredStoreFilepath = "";
|
||||
|
||||
private String objectStoreEndpoint = "";
|
||||
|
||||
@@ -350,12 +350,12 @@ public class TieredMessageStoreConfig {
|
||||
this.readAheadCacheSizeThresholdRate = rate;
|
||||
}
|
||||
|
||||
- public String getTieredStoreFilePath() {
|
||||
- return tieredStoreFilePath;
|
||||
+ public String getTieredStoreFilepath() {
|
||||
+ return tieredStoreFilepath;
|
||||
}
|
||||
|
||||
- public void setTieredStoreFilePath(String tieredStoreFilePath) {
|
||||
- this.tieredStoreFilePath = tieredStoreFilePath;
|
||||
+ public void setTieredStoreFilepath(String tieredStoreFilepath) {
|
||||
+ this.tieredStoreFilepath = tieredStoreFilepath;
|
||||
}
|
||||
|
||||
public void setObjectStoreEndpoint(String objectStoreEndpoint) {
|
||||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
|
||||
index 7e949cb28..708ce33f9 100644
|
||||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
|
||||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
|
||||
@@ -66,8 +66,8 @@ public class PosixFileSegment extends TieredFileSegment {
|
||||
super(storeConfig, fileType, filePath, baseOffset);
|
||||
|
||||
// basePath
|
||||
- String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilePath(),
|
||||
- StringUtils.appendIfMissing(storeConfig.getTieredStoreFilePath(), File.separator));
|
||||
+ String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilepath(),
|
||||
+ StringUtils.appendIfMissing(storeConfig.getTieredStoreFilepath(), File.separator));
|
||||
|
||||
// fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset
|
||||
String brokerClusterName = storeConfig.getBrokerClusterName();
|
||||
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
|
||||
index 6693d3cb7..80cdba977 100644
|
||||
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
|
||||
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
|
||||
@@ -49,7 +49,7 @@ public class TieredCommitLogTest {
|
||||
TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig();
|
||||
storeConfig.setBrokerName("brokerName");
|
||||
storeConfig.setStorePathRootDir(storePath);
|
||||
- storeConfig.setTieredStoreFilePath(storePath + File.separator);
|
||||
+ storeConfig.setTieredStoreFilepath(storePath + File.separator);
|
||||
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
|
||||
storeConfig.setCommitLogRollingInterval(0);
|
||||
storeConfig.setTieredStoreCommitLogMaxSize(1000);
|
||||
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
|
||||
index db33ae847..ede62b8ce 100644
|
||||
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
|
||||
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
|
||||
@@ -42,7 +42,7 @@ public class PosixFileSegmentTest {
|
||||
@Before
|
||||
public void setUp() {
|
||||
storeConfig = new TieredMessageStoreConfig();
|
||||
- storeConfig.setTieredStoreFilePath(storePath);
|
||||
+ storeConfig.setTieredStoreFilepath(storePath);
|
||||
mq = new MessageQueue("OSSFileSegmentTest", "broker", 0);
|
||||
TieredStoreExecutor.init();
|
||||
}
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
Summary: Cloud-Native, Distributed Messaging and Streaming
|
||||
Name: rocketmq
|
||||
Version: 5.1.5
|
||||
Release: 33
|
||||
Release: 34
|
||||
License: Apache-2.0
|
||||
Group: Applications/Message
|
||||
URL: https://rocketmq.apache.org/
|
||||
@ -42,6 +42,7 @@ Patch0029: patch029-backport-Introduce-new-event-NettyEventType.ACTIVEr.patch
|
||||
Patch0030: patch030-backport-remove-some-code.patch
|
||||
Patch0031: patch031-backport-Add-CRC-check-of-commitlog.patch
|
||||
Patch0032: patch032-backport-Clear-POP_CK-when-sending-messages.patch
|
||||
Patch0033: patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.patch
|
||||
BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git
|
||||
Requires: java-1.8.0-openjdk-devel
|
||||
|
||||
@ -82,6 +83,9 @@ exit 0
|
||||
|
||||
|
||||
%changelog
|
||||
* Fri Dec 8 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-34
|
||||
- backport Lock granularity issue causing LMQ message loss
|
||||
|
||||
* Fri Dec 8 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-33
|
||||
- backport Clear POP_CK when sending messages
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user