diff --git a/patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.patch b/patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.patch new file mode 100644 index 0000000..2f3e05a --- /dev/null +++ b/patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.patch @@ -0,0 +1,986 @@ +From ead3d905016d9db4785a46beaa555c7fafd4f9bb Mon Sep 17 00:00:00 2001 +From: Dongyuan Pan +Date: Wed, 8 Nov 2023 10:40:52 +0800 +Subject: [PATCH 1/2] [ISSUE #7511] Lock granularity issue causing LMQ message + loss (#7525) + +* bug fix: assignOffset and increaseOffset in LMQ has concurrency issues in topicQueueLock, should be in putMessageLock + +* fix MultiDispatchTest + +* fix MultiDispatchTest + +* fix unit test +--- + .../common/message/MessageExtBrokerInner.java | 10 ++ + .../org/apache/rocketmq/store/CommitLog.java | 94 ++++++++++++-- + .../apache/rocketmq/store/ConsumeQueue.java | 44 +------ + .../rocketmq/store/DefaultMessageStore.java | 1 - + .../rocketmq/store/MessageExtEncoder.java | 118 ++++++++++++++++-- + .../apache/rocketmq/store/MultiDispatch.java | 77 ++++++++++++ + .../queue/AbstractConsumeQueueStore.java | 10 ++ + .../store/queue/ConsumeQueueInterface.java | 1 - + .../queue/ConsumeQueueStoreInterface.java | 14 +++ + ...iDispatch.java => MultiDispatchUtils.java} | 17 +-- + .../store/queue/QueueOffsetOperator.java | 6 +- + .../store/queue/RocksDBConsumeQueue.java | 42 ------- + .../rocketmq/store/AppendCallbackTest.java | 6 +- + .../rocketmq/store/AppendPropCRCTest.java | 5 +- + .../rocketmq/store/MultiDispatchTest.java | 12 +- + .../rocketmq/store/kv/CompactionLogTest.java | 2 +- + 16 files changed, 322 insertions(+), 137 deletions(-) + create mode 100644 store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java + rename store/src/main/java/org/apache/rocketmq/store/queue/{MultiDispatch.java => MultiDispatchUtils.java} (78%) + +diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java +index 52501dbca..147f23f12 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java ++++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java +@@ -28,6 +28,8 @@ public class MessageExtBrokerInner extends MessageExt { + + private ByteBuffer encodedBuff; + ++ private volatile boolean encodeCompleted; ++ + private MessageVersion version = MessageVersion.MESSAGE_VERSION_V1; + + public ByteBuffer getEncodedBuff() { +@@ -92,4 +94,12 @@ public class MessageExtBrokerInner extends MessageExt { + this.setPropertiesString(MessageDecoder.messageProperties2String(this.getProperties())); + } + } ++ ++ public boolean isEncodeCompleted() { ++ return encodeCompleted; ++ } ++ ++ public void setEncodeCompleted(boolean encodeCompleted) { ++ this.encodeCompleted = encodeCompleted; ++ } + } +diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +index 6c3afde70..35c1d0e2d 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java ++++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +@@ -35,6 +35,7 @@ import java.util.function.Supplier; + import java.util.stream.Collectors; + import com.sun.jna.NativeLong; + import com.sun.jna.Pointer; ++import org.apache.commons.lang3.StringUtils; + import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.ServiceThread; + import org.apache.rocketmq.common.SystemClock; +@@ -56,6 +57,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.store.MessageExtEncoder.PutMessageThreadLocal; + import org.apache.rocketmq.store.config.BrokerRole; + import org.apache.rocketmq.store.config.FlushDiskType; ++import org.apache.rocketmq.store.config.MessageStoreConfig; + import org.apache.rocketmq.store.ha.HAService; + import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; + import org.apache.rocketmq.store.logfile.MappedFile; +@@ -101,6 +103,7 @@ public class CommitLog implements Swappable { + protected int commitLogSize; + + private final boolean enabledAppendPropCRC; ++ protected final MultiDispatch multiDispatch; + + public CommitLog(final DefaultMessageStore messageStore) { + String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog(); +@@ -119,13 +122,11 @@ public class CommitLog implements Swappable { + this.flushManager = new DefaultFlushManager(); + this.coldDataCheckService = new ColdDataCheckService(); + +- this.appendMessageCallback = new DefaultAppendMessageCallback(); ++ this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig()); + putMessageThreadLocal = new ThreadLocal() { + @Override + protected PutMessageThreadLocal initialValue() { +- return new PutMessageThreadLocal( +- defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(), +- defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC()); ++ return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig()); + } + }; + this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); +@@ -137,6 +138,8 @@ public class CommitLog implements Swappable { + this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); + + this.enabledAppendPropCRC = messageStore.getMessageStoreConfig().isEnabledAppendPropCRC(); ++ ++ this.multiDispatch = new MultiDispatch(defaultMessageStore); + } + + public void setFullStorePaths(Set fullStorePaths) { +@@ -1830,15 +1833,84 @@ public class CommitLog implements Swappable { + // Store the message content + private final ByteBuffer msgStoreItemMemory; + private final int crc32ReservedLength = CommitLog.CRC32_RESERVED_LEN; ++ private final MessageStoreConfig messageStoreConfig; + +- DefaultAppendMessageCallback() { ++ DefaultAppendMessageCallback(MessageStoreConfig messageStoreConfig) { + this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH); ++ this.messageStoreConfig = messageStoreConfig; ++ } ++ ++ public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer, final MessageExtBrokerInner msgInner) { ++ if (msgInner.isEncodeCompleted()) { ++ return null; ++ } ++ ++ multiDispatch.wrapMultiDispatch(msgInner); ++ ++ msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); ++ ++ final byte[] propertiesData = ++ msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); ++ ++ boolean needAppendLastPropertySeparator = enabledAppendPropCRC && propertiesData != null && propertiesData.length > 0 ++ && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR; ++ ++ final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength; ++ ++ if (propertiesLength > Short.MAX_VALUE) { ++ log.warn("putMessage message properties length too long. length={}", propertiesData.length); ++ return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); ++ } ++ ++ int msgLenWithoutProperties = preEncodeBuffer.getInt(0); ++ ++ int msgLen = msgLenWithoutProperties + 2 + propertiesLength; ++ ++ // Exceeds the maximum message ++ if (msgLen > this.messageStoreConfig.getMaxMessageSize()) { ++ log.warn("message size exceeded, msg total size: " + msgLen + ", maxMessageSize: " + this.messageStoreConfig.getMaxMessageSize()); ++ return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); ++ } ++ ++ // Back filling total message length ++ preEncodeBuffer.putInt(0, msgLen); ++ // Modify position to msgLenWithoutProperties ++ preEncodeBuffer.position(msgLenWithoutProperties); ++ ++ preEncodeBuffer.putShort((short) propertiesLength); ++ ++ if (propertiesLength > crc32ReservedLength) { ++ preEncodeBuffer.put(propertiesData); ++ } ++ ++ if (needAppendLastPropertySeparator) { ++ preEncodeBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR); ++ } ++ // 18 CRC32 ++ preEncodeBuffer.position(preEncodeBuffer.position() + crc32ReservedLength); ++ ++ msgInner.setEncodeCompleted(true); ++ ++ return null; + } + + public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, + final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) { + // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
+ ++ ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); ++ boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner); ++ if (isMultiDispatchMsg) { ++ AppendMessageResult appendMessageResult = handlePropertiesForLmqMsg(preEncodeBuffer, msgInner); ++ if (appendMessageResult != null) { ++ return appendMessageResult; ++ } ++ } ++ ++ final int msgLen = preEncodeBuffer.getInt(0); ++ preEncodeBuffer.position(0); ++ preEncodeBuffer.limit(msgLen); ++ + // PHY OFFSET + long wroteOffset = fileFromOffset + byteBuffer.position(); + +@@ -1872,9 +1944,6 @@ public class CommitLog implements Swappable { + break; + } + +- ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); +- final int msgLen = preEncodeBuffer.getInt(0); +- + // Determines whether there is sufficient free space + if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { + this.msgStoreItemMemory.clear(); +@@ -1919,6 +1988,11 @@ public class CommitLog implements Swappable { + byteBuffer.put(preEncodeBuffer); + CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS"); + msgInner.setEncodedBuff(null); ++ ++ if (isMultiDispatchMsg) { ++ CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner); ++ } ++ + return new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier, + msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum); + } +@@ -2159,6 +2233,10 @@ public class CommitLog implements Swappable { + return flushManager; + } + ++ public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) { ++ return StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); ++ } ++ + private boolean isCloseReadAhead() { + return !MixAll.isWindows() && !defaultMessageStore.getMessageStoreConfig().isDataReadAheadEnable(); + } +diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +index 623509c8b..453c9d1dc 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ++++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +@@ -27,7 +27,6 @@ import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.Pair; + import org.apache.rocketmq.common.attribute.CQType; + import org.apache.rocketmq.common.constant.LoggerName; +-import org.apache.rocketmq.common.message.MessageAccessor; + import org.apache.rocketmq.common.message.MessageConst; + import org.apache.rocketmq.common.message.MessageExtBrokerInner; + import org.apache.rocketmq.logging.org.slf4j.Logger; +@@ -38,7 +37,7 @@ import org.apache.rocketmq.store.logfile.MappedFile; + import org.apache.rocketmq.store.queue.ConsumeQueueInterface; + import org.apache.rocketmq.store.queue.CqUnit; + import org.apache.rocketmq.store.queue.FileQueueLifeCycle; +-import org.apache.rocketmq.store.queue.MultiDispatch; ++import org.apache.rocketmq.store.queue.MultiDispatchUtils; + import org.apache.rocketmq.store.queue.QueueOffsetOperator; + import org.apache.rocketmq.store.queue.ReferredIterator; + +@@ -702,7 +701,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { + this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp()); + } + this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); +- if (MultiDispatch.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) { ++ if (MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) { + multiDispatchLmqQueue(request, maxRetries); + } + return; +@@ -776,28 +775,6 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { + String topicQueueKey = getTopic() + "-" + getQueueId(); + long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey); + msg.setQueueOffset(queueOffset); +- +- +- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28), +- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue. +- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) { +- return; +- } +- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); +- if (StringUtils.isBlank(multiDispatchQueue)) { +- return; +- } +- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); +- Long[] queueOffsets = new Long[queues.length]; +- for (int i = 0; i < queues.length; i++) { +- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) { +- String key = MultiDispatch.lmqQueueKey(queues[i]); +- queueOffsets[i] = queueOffsetOperator.getLmqOffset(key); +- } +- } +- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, +- StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER)); +- msg.removeWaitStorePropertyString(); + } + + @Override +@@ -805,23 +782,6 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { + short messageNum) { + String topicQueueKey = getTopic() + "-" + getQueueId(); + queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum); +- +- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28), +- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue. +- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) { +- return; +- } +- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); +- if (StringUtils.isBlank(multiDispatchQueue)) { +- return; +- } +- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); +- for (int i = 0; i < queues.length; i++) { +- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) { +- String key = MultiDispatch.lmqQueueKey(queues[i]); +- queueOffsetOperator.increaseLmqOffset(key, (short) 1); +- } +- } + } + + private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, +diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +index 99a54e2d7..dc5f312e5 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java ++++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +@@ -2112,7 +2112,6 @@ public class DefaultMessageStore implements MessageStore { + } + } + +- + @Override + public void increaseOffset(MessageExtBrokerInner msg, short messageNum) { + final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); +diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +index c1d808728..20e9a652b 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java ++++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageVersion; + import org.apache.rocketmq.common.sysflag.MessageSysFlag; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; ++import org.apache.rocketmq.store.config.MessageStoreConfig; + + public class MessageExtEncoder { + protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); +@@ -38,20 +39,22 @@ public class MessageExtEncoder { + // The maximum length of the full message. + private int maxMessageSize; + private final int crc32ReservedLength; ++ private MessageStoreConfig messageStoreConfig; + +- public MessageExtEncoder(final int maxMessageBodySize) { +- this(maxMessageBodySize, false); ++ public MessageExtEncoder(final int maxMessageBodySize, final MessageStoreConfig messageStoreConfig) { ++ this(messageStoreConfig); + } + +- public MessageExtEncoder(final int maxMessageBodySize, boolean enabledAppendPropCRC) { ++ public MessageExtEncoder(final MessageStoreConfig messageStoreConfig) { + ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT; ++ this.messageStoreConfig = messageStoreConfig; ++ this.maxMessageBodySize = messageStoreConfig.getMaxMessageSize(); + //Reserve 64kb for encoding buffer outside body + int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ? + maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE; + byteBuf = alloc.directBuffer(maxMessageSize); +- this.maxMessageBodySize = maxMessageBodySize; + this.maxMessageSize = maxMessageSize; +- this.crc32ReservedLength = enabledAppendPropCRC ? CommitLog.CRC32_RESERVED_LEN : 0; ++ this.crc32ReservedLength = messageStoreConfig.isEnabledAppendPropCRC() ? CommitLog.CRC32_RESERVED_LEN : 0; + } + + public static int calMsgLength(MessageVersion messageVersion, +@@ -79,8 +82,103 @@ public class MessageExtEncoder { + + 2 + (Math.max(propertiesLength, 0)); //propertiesLength + } + ++ public static int calMsgLengthNoProperties(MessageVersion messageVersion, ++ int sysFlag, int bodyLength, int topicLength) { ++ ++ int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20; ++ int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20; ++ ++ return 4 //TOTALSIZE ++ + 4 //MAGICCODE ++ + 4 //BODYCRC ++ + 4 //QUEUEID ++ + 4 //FLAG ++ + 8 //QUEUEOFFSET ++ + 8 //PHYSICALOFFSET ++ + 4 //SYSFLAG ++ + 8 //BORNTIMESTAMP ++ + bornhostLength //BORNHOST ++ + 8 //STORETIMESTAMP ++ + storehostAddressLength //STOREHOSTADDRESS ++ + 4 //RECONSUMETIMES ++ + 8 //Prepared Transaction Offset ++ + 4 + (Math.max(bodyLength, 0)) //BODY ++ + messageVersion.getTopicLengthSize() + topicLength; //TOPIC ++ } ++ ++ public PutMessageResult encodeWithoutProperties(MessageExtBrokerInner msgInner) { ++ ++ final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); ++ final int topicLength = topicData.length; ++ ++ final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; ++ ++ // Exceeds the maximum message body ++ if (bodyLength > this.maxMessageBodySize) { ++ CommitLog.log.warn("message body size exceeded, msg body size: " + bodyLength ++ + ", maxMessageSize: " + this.maxMessageBodySize); ++ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); ++ } ++ ++ final int msgLenNoProperties = calMsgLengthNoProperties(msgInner.getVersion(), msgInner.getSysFlag(), bodyLength, topicLength); ++ ++ // 1 TOTALSIZE ++ this.byteBuf.writeInt(msgLenNoProperties); ++ // 2 MAGICCODE ++ this.byteBuf.writeInt(msgInner.getVersion().getMagicCode()); ++ // 3 BODYCRC ++ this.byteBuf.writeInt(msgInner.getBodyCRC()); ++ // 4 QUEUEID ++ this.byteBuf.writeInt(msgInner.getQueueId()); ++ // 5 FLAG ++ this.byteBuf.writeInt(msgInner.getFlag()); ++ // 6 QUEUEOFFSET, need update later ++ this.byteBuf.writeLong(0); ++ // 7 PHYSICALOFFSET, need update later ++ this.byteBuf.writeLong(0); ++ // 8 SYSFLAG ++ this.byteBuf.writeInt(msgInner.getSysFlag()); ++ // 9 BORNTIMESTAMP ++ this.byteBuf.writeLong(msgInner.getBornTimestamp()); ++ ++ // 10 BORNHOST ++ ByteBuffer bornHostBytes = msgInner.getBornHostBytes(); ++ this.byteBuf.writeBytes(bornHostBytes.array()); ++ ++ // 11 STORETIMESTAMP ++ this.byteBuf.writeLong(msgInner.getStoreTimestamp()); ++ ++ // 12 STOREHOSTADDRESS ++ ByteBuffer storeHostBytes = msgInner.getStoreHostBytes(); ++ this.byteBuf.writeBytes(storeHostBytes.array()); ++ ++ // 13 RECONSUMETIMES ++ this.byteBuf.writeInt(msgInner.getReconsumeTimes()); ++ // 14 Prepared Transaction Offset ++ this.byteBuf.writeLong(msgInner.getPreparedTransactionOffset()); ++ // 15 BODY ++ this.byteBuf.writeInt(bodyLength); ++ if (bodyLength > 0) ++ this.byteBuf.writeBytes(msgInner.getBody()); ++ ++ // 16 TOPIC ++ if (MessageVersion.MESSAGE_VERSION_V2.equals(msgInner.getVersion())) { ++ this.byteBuf.writeShort((short) topicLength); ++ } else { ++ this.byteBuf.writeByte((byte) topicLength); ++ } ++ this.byteBuf.writeBytes(topicData); ++ ++ return null; ++ } ++ + public PutMessageResult encode(MessageExtBrokerInner msgInner) { + this.byteBuf.clear(); ++ ++ if (messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner)) { ++ return encodeWithoutProperties(msgInner); ++ } ++ + /** + * Serialize message + */ +@@ -303,7 +401,7 @@ public class MessageExtEncoder { + } + + public ByteBuffer getEncoderBuffer() { +- return this.byteBuf.nioBuffer(); ++ return this.byteBuf.nioBuffer(0, this.byteBuf.capacity()); + } + + public int getMaxMessageBodySize() { +@@ -322,12 +420,8 @@ public class MessageExtEncoder { + private final MessageExtEncoder encoder; + private final StringBuilder keyBuilder; + +- PutMessageThreadLocal(int size) { +- this(size, false); +- } +- +- PutMessageThreadLocal(int size, boolean enabledAppendPropCRC) { +- encoder = new MessageExtEncoder(size, enabledAppendPropCRC); ++ PutMessageThreadLocal(MessageStoreConfig messageStoreConfig) { ++ encoder = new MessageExtEncoder(messageStoreConfig); + keyBuilder = new StringBuilder(); + } + +diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java +new file mode 100644 +index 000000000..5bc587a8e +--- /dev/null ++++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java +@@ -0,0 +1,77 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.store; ++ ++import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.common.MixAll; ++import org.apache.rocketmq.common.message.MessageAccessor; ++import org.apache.rocketmq.common.message.MessageConst; ++import org.apache.rocketmq.common.message.MessageExtBrokerInner; ++ ++/** ++ * MultiDispatch for lmq, not-thread-safe ++ */ ++public class MultiDispatch { ++ private final StringBuilder keyBuilder = new StringBuilder(); ++ private final DefaultMessageStore messageStore; ++ private static final short VALUE_OF_EACH_INCREMENT = 1; ++ ++ public MultiDispatch(DefaultMessageStore messageStore) { ++ this.messageStore = messageStore; ++ } ++ ++ public String queueKey(String queueName, MessageExtBrokerInner msgInner) { ++ keyBuilder.delete(0, keyBuilder.length()); ++ keyBuilder.append(queueName); ++ keyBuilder.append('-'); ++ int queueId = msgInner.getQueueId(); ++ if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) { ++ queueId = 0; ++ } ++ keyBuilder.append(queueId); ++ return keyBuilder.toString(); ++ } ++ ++ public void wrapMultiDispatch(final MessageExtBrokerInner msg) { ++ ++ String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); ++ String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); ++ Long[] queueOffsets = new Long[queues.length]; ++ if (messageStore.getMessageStoreConfig().isEnableLmq()) { ++ for (int i = 0; i < queues.length; i++) { ++ String key = queueKey(queues[i], msg); ++ if (MixAll.isLmq(key)) { ++ queueOffsets[i] = messageStore.getQueueStore().getLmqQueueOffset(key); ++ } ++ } ++ } ++ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, ++ StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER)); ++ msg.removeWaitStorePropertyString(); ++ } ++ ++ public void updateMultiQueueOffset(final MessageExtBrokerInner msgInner) { ++ String multiDispatchQueue = msgInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); ++ String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); ++ for (String queue : queues) { ++ String key = queueKey(queue, msgInner); ++ if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { ++ messageStore.getQueueStore().increaseLmqOffset(key, VALUE_OF_EACH_INCREMENT); ++ } ++ } ++ } ++} +\ No newline at end of file +diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java +index 30054fa50..d76b05577 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java ++++ b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java +@@ -74,6 +74,16 @@ public abstract class AbstractConsumeQueueStore implements ConsumeQueueStoreInte + consumeQueue.increaseQueueOffset(this.queueOffsetOperator, msg, messageNum); + } + ++ @Override ++ public void increaseLmqOffset(String queueKey, short messageNum) { ++ queueOffsetOperator.increaseLmqOffset(queueKey, messageNum); ++ } ++ ++ @Override ++ public long getLmqQueueOffset(String queueKey) { ++ return queueOffsetOperator.getLmqOffset(queueKey); ++ } ++ + @Override + public void removeTopicQueueTable(String topic, Integer queueId) { + this.queueOffsetOperator.remove(topic, queueId); +diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java +index c65f2a68b..768c782b1 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java ++++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java +@@ -181,7 +181,6 @@ public interface ConsumeQueueInterface extends FileQueueLifeCycle { + */ + void assignQueueOffset(QueueOffsetOperator queueOffsetAssigner, MessageExtBrokerInner msg) throws RocksDBException; + +- + /** + * Increase queue offset. + * @param queueOffsetAssigner the delegated queue offset assigner +diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java +index 268803dcc..e68880a82 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java ++++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java +@@ -183,6 +183,20 @@ public interface ConsumeQueueStoreInterface { + */ + void increaseQueueOffset(MessageExtBrokerInner msg, short messageNum); + ++ /** ++ * Increase lmq offset ++ * @param queueKey ++ * @param messageNum ++ */ ++ void increaseLmqOffset(String queueKey, short messageNum); ++ ++ /** ++ * get lmq queue offset ++ * @param queueKey ++ * @return ++ */ ++ long getLmqQueueOffset(String queueKey); ++ + /** + * recover topicQueue table by minPhyOffset + * @param minPhyOffset +diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java +similarity index 78% +rename from store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java +rename to store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java +index d6291d908..44397a2fc 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java ++++ b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java +@@ -16,8 +16,6 @@ + */ + package org.apache.rocketmq.store.queue; + +-import java.util.ArrayList; +-import java.util.List; + import java.util.Map; + + import org.apache.commons.lang3.StringUtils; +@@ -27,7 +25,7 @@ import org.apache.rocketmq.common.topic.TopicValidator; + import org.apache.rocketmq.store.DispatchRequest; + import org.apache.rocketmq.store.config.MessageStoreConfig; + +-public class MultiDispatch { ++public class MultiDispatchUtils { + + public static String lmqQueueKey(String queueName) { + StringBuilder keyBuilder = new StringBuilder(); +@@ -60,17 +58,4 @@ public class MultiDispatch { + } + return true; + } +- +- public static List checkMultiDispatchQueue(MessageStoreConfig messageStoreConfig, List dispatchRequests) { +- if (!messageStoreConfig.isEnableMultiDispatch() || dispatchRequests == null || dispatchRequests.size() == 0) { +- return null; +- } +- List result = new ArrayList<>(); +- for (DispatchRequest dispatchRequest : dispatchRequests) { +- if (checkMultiDispatchQueue(messageStoreConfig, dispatchRequest)) { +- result.add(dispatchRequest); +- } +- } +- return dispatchRequests; +- } + } +diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java +index 8da374828..5b4bf994e 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java ++++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java +@@ -71,9 +71,9 @@ public class QueueOffsetOperator { + return this.lmqTopicQueueTable.get(topicQueueKey); + } + +- public void increaseLmqOffset(String topicQueueKey, short messageNum) { +- Long lmqOffset = ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey, k -> 0L); +- this.lmqTopicQueueTable.put(topicQueueKey, lmqOffset + messageNum); ++ public void increaseLmqOffset(String queueKey, short messageNum) { ++ Long lmqOffset = ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, queueKey, k -> 0L); ++ this.lmqTopicQueueTable.put(queueKey, lmqOffset + messageNum); + } + + public long currentQueueOffset(String topicQueueKey) { +diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java +index 759be395d..5a981bb4d 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java ++++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java +@@ -19,14 +19,10 @@ package org.apache.rocketmq.store.queue; + import java.nio.ByteBuffer; + import java.util.List; + +-import org.apache.commons.lang3.StringUtils; + import org.apache.rocketmq.common.BoundaryType; +-import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.Pair; + import org.apache.rocketmq.common.attribute.CQType; + import org.apache.rocketmq.common.constant.LoggerName; +-import org.apache.rocketmq.common.message.MessageAccessor; +-import org.apache.rocketmq.common.message.MessageConst; + import org.apache.rocketmq.common.message.MessageExtBrokerInner; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +@@ -217,50 +213,12 @@ public class RocksDBConsumeQueue implements ConsumeQueueInterface { + queueOffsetOperator.updateQueueOffset(topicQueueKey, queueOffset); + } + msg.setQueueOffset(queueOffset); +- +- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28), +- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue. +- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) { +- return; +- } +- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); +- if (StringUtils.isBlank(multiDispatchQueue)) { +- return; +- } +- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); +- Long[] queueOffsets = new Long[queues.length]; +- for (int i = 0; i < queues.length; i++) { +- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) { +- String key = MultiDispatch.lmqQueueKey(queues[i]); +- queueOffsets[i] = queueOffsetOperator.getLmqTopicQueueNextOffset(key); +- } +- } +- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, +- StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER)); +- msg.removeWaitStorePropertyString(); + } + + @Override + public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg, short messageNum) { + String topicQueueKey = getTopic() + "-" + getQueueId(); + queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum); +- +- // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28), +- // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue. +- if (!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(), msg.getTopic())) { +- return; +- } +- String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); +- if (StringUtils.isBlank(multiDispatchQueue)) { +- return; +- } +- String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); +- for (int i = 0; i < queues.length; i++) { +- if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queues[i])) { +- String key = MultiDispatch.lmqQueueKey(queues[i]); +- queueOffsetOperator.increaseLmqOffset(key, (short) 1); +- } +- } + } + + @Override +diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java +index 87bfe85da..374857149 100644 +--- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java ++++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java +@@ -44,7 +44,7 @@ public class AppendCallbackTest { + + AppendMessageCallback callback; + +- MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024); ++ MessageExtEncoder batchEncoder; + + @Before + public void init() throws Exception { +@@ -53,12 +53,14 @@ public class AppendCallbackTest { + messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); ++ messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024); + messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore"); + messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog"); + //too much reference + DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>()); + CommitLog commitLog = new CommitLog(messageStore); +- callback = commitLog.new DefaultAppendMessageCallback(); ++ callback = commitLog.new DefaultAppendMessageCallback(messageStoreConfig); ++ batchEncoder = new MessageExtEncoder(messageStoreConfig); + } + + @After +diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java +index c8ed4d74d..d882fc9d9 100644 +--- a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java ++++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java +@@ -56,6 +56,7 @@ public class AppendPropCRCTest { + messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); ++ messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024); + messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore"); + messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog"); + messageStoreConfig.setForceVerifyPropCRC(true); +@@ -63,8 +64,8 @@ public class AppendPropCRCTest { + //too much reference + DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>()); + commitLog = new CommitLog(messageStore); +- encoder = new MessageExtEncoder(10 * 1024 * 1024, true); +- callback = commitLog.new DefaultAppendMessageCallback(); ++ encoder = new MessageExtEncoder(messageStoreConfig); ++ callback = commitLog.new DefaultAppendMessageCallback(messageStoreConfig); + } + + @After +diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java +index 2447bbf68..eae5eaa07 100644 +--- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java ++++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java +@@ -28,20 +28,19 @@ import org.apache.rocketmq.common.message.MessageConst; + import org.apache.rocketmq.common.message.MessageDecoder; + import org.apache.rocketmq.common.message.MessageExtBrokerInner; + import org.apache.rocketmq.store.config.MessageStoreConfig; +-import org.apache.rocketmq.store.queue.MultiDispatch; ++import org.apache.rocketmq.store.queue.MultiDispatchUtils; + import org.junit.After; + import org.junit.Before; + import org.junit.Test; + import org.rocksdb.RocksDBException; + +-import static org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathConsumeQueue; + import static org.junit.Assert.assertEquals; + import static org.mockito.Mockito.mock; + import static org.mockito.Mockito.when; + + public class MultiDispatchTest { + +- private ConsumeQueue consumeQueue; ++ private MultiDispatch multiDispatch; + + private DefaultMessageStore messageStore; + +@@ -61,8 +60,7 @@ public class MultiDispatchTest { + BrokerConfig brokerConfig = new BrokerConfig(); + //too much reference + messageStore = new DefaultMessageStore(messageStoreConfig, null, null, brokerConfig, new ConcurrentHashMap<>()); +- consumeQueue = new ConsumeQueue("xxx", 0, +- getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()), messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore); ++ multiDispatch = new MultiDispatch(messageStore); + } + + @After +@@ -74,14 +72,14 @@ public class MultiDispatchTest { + public void lmqQueueKey() { + MessageExtBrokerInner messageExtBrokerInner = mock(MessageExtBrokerInner.class); + when(messageExtBrokerInner.getQueueId()).thenReturn(2); +- String ret = MultiDispatch.lmqQueueKey("%LMQ%lmq123"); ++ String ret = MultiDispatchUtils.lmqQueueKey("%LMQ%lmq123"); + assertEquals(ret, "%LMQ%lmq123-0"); + } + + @Test + public void wrapMultiDispatch() throws RocksDBException { + MessageExtBrokerInner messageExtBrokerInner = buildMessageMultiQueue(); +- messageStore.assignOffset(messageExtBrokerInner); ++ multiDispatch.wrapMultiDispatch(messageExtBrokerInner); + assertEquals(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET), "0,0"); + } + +diff --git a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java +index df3c31c6e..e113b18f1 100644 +--- a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java ++++ b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java +@@ -86,7 +86,7 @@ public class CompactionLogTest { + int compactionCqFileSize = 1024; + + +- private static MessageExtEncoder encoder = new MessageExtEncoder(1024); ++ private static MessageExtEncoder encoder = new MessageExtEncoder(1024, new MessageStoreConfig()); + private static SocketAddress storeHost; + private static SocketAddress bornHost; + +-- +2.32.0.windows.2 + + +From 70dc93abbcb9bf161378d66fcaca55bedc78b905 Mon Sep 17 00:00:00 2001 +From: yangguodong <1174533476@qq.com> +Date: Wed, 8 Nov 2023 21:14:54 -0600 +Subject: [PATCH 2/2] Fix tiered store README.md error about Configuration + (#7436) + +* Fix tiered store README.md error about Configuration + +* Fix change tieredStoreFilePath to tieredStoreFilepath + +* revert README.md change + +--------- + +Co-authored-by: yangguodong.cn +--- + .../tieredstore/common/TieredMessageStoreConfig.java | 10 +++++----- + .../tieredstore/provider/posix/PosixFileSegment.java | 4 ++-- + .../rocketmq/tieredstore/file/TieredCommitLogTest.java | 2 +- + .../provider/posix/PosixFileSegmentTest.java | 2 +- + 4 files changed, 9 insertions(+), 9 deletions(-) + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java +index 595db6b86..a112ea6b1 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java +@@ -115,7 +115,7 @@ public class TieredMessageStoreConfig { + private long readAheadCacheExpireDuration = 10 * 1000; + private double readAheadCacheSizeThresholdRate = 0.3; + +- private String tieredStoreFilePath = ""; ++ private String tieredStoreFilepath = ""; + + private String objectStoreEndpoint = ""; + +@@ -350,12 +350,12 @@ public class TieredMessageStoreConfig { + this.readAheadCacheSizeThresholdRate = rate; + } + +- public String getTieredStoreFilePath() { +- return tieredStoreFilePath; ++ public String getTieredStoreFilepath() { ++ return tieredStoreFilepath; + } + +- public void setTieredStoreFilePath(String tieredStoreFilePath) { +- this.tieredStoreFilePath = tieredStoreFilePath; ++ public void setTieredStoreFilepath(String tieredStoreFilepath) { ++ this.tieredStoreFilepath = tieredStoreFilepath; + } + + public void setObjectStoreEndpoint(String objectStoreEndpoint) { +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +index 7e949cb28..708ce33f9 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +@@ -66,8 +66,8 @@ public class PosixFileSegment extends TieredFileSegment { + super(storeConfig, fileType, filePath, baseOffset); + + // basePath +- String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilePath(), +- StringUtils.appendIfMissing(storeConfig.getTieredStoreFilePath(), File.separator)); ++ String basePath = StringUtils.defaultString(storeConfig.getTieredStoreFilepath(), ++ StringUtils.appendIfMissing(storeConfig.getTieredStoreFilepath(), File.separator)); + + // fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset + String brokerClusterName = storeConfig.getBrokerClusterName(); +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java +index 6693d3cb7..80cdba977 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java +@@ -49,7 +49,7 @@ public class TieredCommitLogTest { + TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig(); + storeConfig.setBrokerName("brokerName"); + storeConfig.setStorePathRootDir(storePath); +- storeConfig.setTieredStoreFilePath(storePath + File.separator); ++ storeConfig.setTieredStoreFilepath(storePath + File.separator); + storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment"); + storeConfig.setCommitLogRollingInterval(0); + storeConfig.setTieredStoreCommitLogMaxSize(1000); +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java +index db33ae847..ede62b8ce 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java +@@ -42,7 +42,7 @@ public class PosixFileSegmentTest { + @Before + public void setUp() { + storeConfig = new TieredMessageStoreConfig(); +- storeConfig.setTieredStoreFilePath(storePath); ++ storeConfig.setTieredStoreFilepath(storePath); + mq = new MessageQueue("OSSFileSegmentTest", "broker", 0); + TieredStoreExecutor.init(); + } +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index 887a59d..4ff218d 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.5 -Release: 33 +Release: 34 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -42,6 +42,7 @@ Patch0029: patch029-backport-Introduce-new-event-NettyEventType.ACTIVEr.patch Patch0030: patch030-backport-remove-some-code.patch Patch0031: patch031-backport-Add-CRC-check-of-commitlog.patch Patch0032: patch032-backport-Clear-POP_CK-when-sending-messages.patch +Patch0033: patch033-backport-Lock-granularity-issue-causing-LMQ-message-loss.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -82,6 +83,9 @@ exit 0 %changelog +* Fri Dec 8 2023 ShiZhili - 5.1.3-34 +- backport Lock granularity issue causing LMQ message loss + * Fri Dec 8 2023 ShiZhili - 5.1.3-33 - backport Clear POP_CK when sending messages