From cfd4e01ea3353fb29c883a36fea15f2021f2bf27 Mon Sep 17 00:00:00 2001 From: shizhili Date: Fri, 8 Dec 2023 17:00:02 +0800 Subject: [PATCH] backport Add the CRC check of commitlog --- ...-backport-Add-CRC-check-of-commitlog.patch | 1044 +++++++++++++++++ rocketmq.spec | 6 +- 2 files changed, 1049 insertions(+), 1 deletion(-) create mode 100644 patch031-backport-Add-CRC-check-of-commitlog.patch diff --git a/patch031-backport-Add-CRC-check-of-commitlog.patch b/patch031-backport-Add-CRC-check-of-commitlog.patch new file mode 100644 index 0000000..02d284f --- /dev/null +++ b/patch031-backport-Add-CRC-check-of-commitlog.patch @@ -0,0 +1,1044 @@ +From 91349f30b96db2e16b71d65a535d81f11b60bda5 Mon Sep 17 00:00:00 2001 +From: guyinyou <36399867+guyinyou@users.noreply.github.com> +Date: Wed, 25 Oct 2023 14:54:00 +0800 +Subject: [PATCH 1/2] [ISSUE #7437] Add the CRC check of commitlog (#7468) + +* Added CRC32 check for full data + +* add unit test + +* add MessageExtBrokerInnerTest + +* fix codestyle + +* fix codestyle + +--------- + +Co-authored-by: guyinyou +--- + .../org/apache/rocketmq/common/UtilAll.java | 14 ++ + .../rocketmq/common/message/MessageConst.java | 2 + + .../common/message/MessageDecoder.java | 32 ++- + .../common/message/MessageExtBrokerInner.java | 49 +++++ + .../common/MessageExtBrokerInnerTest.java | 93 ++++++++ + .../org/apache/rocketmq/store/CommitLog.java | 87 +++++++- + .../rocketmq/store/MessageExtEncoder.java | 38 +++- + .../store/config/MessageStoreConfig.java | 23 ++ + .../rocketmq/store/AppendPropCRCTest.java | 200 ++++++++++++++++++ + .../rocketmq/store/BatchPutMessageTest.java | 2 +- + .../store/MessageExtBrokerInnerTest.java | 105 +++++++++ + .../store/ha/autoswitch/AutoSwitchHATest.java | 2 +- + .../file/CompositeQueueFlatFileTest.java | 2 +- + .../util/MessageBufferUtilTest.java | 2 +- + 14 files changed, 630 insertions(+), 21 deletions(-) + create mode 100644 common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java + create mode 100644 store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java + create mode 100644 store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java + +diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +index d2b7c374b..95b6b09b4 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java ++++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +@@ -307,6 +307,20 @@ public class UtilAll { + return (int) (crc32.getValue() & 0x7FFFFFFF); + } + ++ public static int crc32(ByteBuffer byteBuffer) { ++ CRC32 crc32 = new CRC32(); ++ crc32.update(byteBuffer); ++ return (int) (crc32.getValue() & 0x7FFFFFFF); ++ } ++ ++ public static int crc32(ByteBuffer[] byteBuffers) { ++ CRC32 crc32 = new CRC32(); ++ for (ByteBuffer buffer : byteBuffers) { ++ crc32.update(buffer); ++ } ++ return (int) (crc32.getValue() & 0x7FFFFFFF); ++ } ++ + public static String bytes2string(byte[] src) { + char[] hexChars = new char[src.length * 2]; + for (int j = 0; j < src.length; j++) { +diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +index 87fed7c19..24f7bdb99 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java ++++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +@@ -97,6 +97,7 @@ public class MessageConst { + public static final String PROPERTY_TIMER_DEL_UNIQKEY = "TIMER_DEL_UNIQKEY"; + public static final String PROPERTY_TIMER_DELAY_LEVEL = "TIMER_DELAY_LEVEL"; + public static final String PROPERTY_TIMER_DELAY_MS = "TIMER_DELAY_MS"; ++ public static final String PROPERTY_CRC32 = "__CRC32#"; + + /** + * properties for DLQ +@@ -155,5 +156,6 @@ public class MessageConst { + STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP); + STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_TOPIC); + STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_MESSAGE_ID); ++ STRING_HASH_SET.add(PROPERTY_CRC32); + } + } +diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +index 6de0b69fb..b053f8275 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java ++++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +@@ -16,6 +16,7 @@ + */ + package org.apache.rocketmq.common.message; + ++import io.netty.buffer.ByteBuf; + import java.io.IOException; + import java.net.Inet4Address; + import java.net.InetAddress; +@@ -152,6 +153,34 @@ public class MessageDecoder { + return null; + } + ++ public static void createCrc32(final ByteBuffer input, int crc32) { ++ input.put(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8)); ++ input.put((byte) NAME_VALUE_SEPARATOR); ++ for (int i = 0; i < 10; i++) { ++ byte b = '0'; ++ if (crc32 > 0) { ++ b += (byte) (crc32 % 10); ++ crc32 /= 10; ++ } ++ input.put(b); ++ } ++ input.put((byte) PROPERTY_SEPARATOR); ++ } ++ ++ public static void createCrc32(final ByteBuf input, int crc32) { ++ input.writeBytes(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8)); ++ input.writeByte((byte) NAME_VALUE_SEPARATOR); ++ for (int i = 0; i < 10; i++) { ++ byte b = '0'; ++ if (crc32 > 0) { ++ b += (byte) (crc32 % 10); ++ crc32 /= 10; ++ } ++ input.writeByte(b); ++ } ++ input.writeByte((byte) PROPERTY_SEPARATOR); ++ } ++ + public static MessageExt decode(ByteBuffer byteBuffer) { + return decode(byteBuffer, true, true, false); + } +@@ -601,9 +630,6 @@ public class MessageDecoder { + sb.append(value); + sb.append(PROPERTY_SEPARATOR); + } +- if (sb.length() > 0) { +- sb.deleteCharAt(sb.length() - 1); +- } + return sb.toString(); + } + +diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java +index 91599653c..4e5d3419a 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java ++++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java +@@ -20,6 +20,9 @@ import java.nio.ByteBuffer; + + import org.apache.rocketmq.common.TopicFilterType; + ++import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR; ++import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR; ++ + public class MessageExtBrokerInner extends MessageExt { + private static final long serialVersionUID = 7256001576878700634L; + private String propertiesString; +@@ -55,6 +58,52 @@ public class MessageExtBrokerInner extends MessageExt { + this.propertiesString = propertiesString; + } + ++ ++ public void deleteProperty(String name) { ++ super.clearProperty(name); ++ if (propertiesString != null) { ++ int idx0 = 0; ++ int idx1; ++ int idx2; ++ idx1 = propertiesString.indexOf(name, idx0); ++ if (idx1 != -1) { ++ // cropping may be required ++ StringBuilder stringBuilder = new StringBuilder(propertiesString.length()); ++ while (true) { ++ int startIdx = idx0; ++ while (true) { ++ idx1 = propertiesString.indexOf(name, startIdx); ++ if (idx1 == -1) { ++ break; ++ } ++ startIdx = idx1 + name.length(); ++ if (idx1 == 0 || propertiesString.charAt(idx1 - 1) == PROPERTY_SEPARATOR) { ++ if (propertiesString.length() > idx1 + name.length() ++ && propertiesString.charAt(idx1 + name.length()) == NAME_VALUE_SEPARATOR) { ++ break; ++ } ++ } ++ } ++ if (idx1 == -1) { ++ // there are no characters that need to be skipped. Append all remaining characters. ++ stringBuilder.append(propertiesString, idx0, propertiesString.length()); ++ break; ++ } ++ // there are characters that need to be cropped ++ stringBuilder.append(propertiesString, idx0, idx1); ++ // move idx2 to the end of the cropped character ++ idx2 = propertiesString.indexOf(PROPERTY_SEPARATOR, idx1 + name.length() + 1); ++ // all subsequent characters will be cropped ++ if (idx2 == -1) { ++ break; ++ } ++ idx0 = idx2 + 1; ++ } ++ this.setPropertiesString(stringBuilder.toString()); ++ } ++ } ++ } ++ + public long getTagsCode() { + return tagsCode; + } +diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java +new file mode 100644 +index 000000000..77d69e5ad +--- /dev/null ++++ b/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java +@@ -0,0 +1,93 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.common; ++ ++import org.apache.rocketmq.common.message.MessageExtBrokerInner; ++import org.junit.Test; ++ ++import static org.assertj.core.api.Assertions.assertThat; ++ ++public class MessageExtBrokerInnerTest { ++ @Test ++ public void testDeleteProperty() { ++ MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); ++ String propertiesString = ""; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); ++ ++ propertiesString = "KeyA\u0001ValueA"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); ++ ++ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); ++ ++ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); ++ ++ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002"); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB"); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002"); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001"); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA"); ++ } ++} +diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +index 93102799b..3d3ee86b8 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java ++++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +@@ -73,6 +73,10 @@ public class CommitLog implements Swappable { + protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + // End of file empty MAGIC CODE cbd43194 + public final static int BLANK_MAGIC_CODE = -875286124; ++ /** ++ * CRC32 Format: [PROPERTY_CRC32 + NAME_VALUE_SEPARATOR + 10-digit fixed-length string + PROPERTY_SEPARATOR] ++ */ ++ public static final int CRC32_RESERVED_LEN = MessageConst.PROPERTY_CRC32.length() + 1 + 10 + 1; + protected final MappedFileQueue mappedFileQueue; + protected final DefaultMessageStore defaultMessageStore; + +@@ -96,6 +100,8 @@ public class CommitLog implements Swappable { + + protected int commitLogSize; + ++ private final boolean enabledAppendPropCRC; ++ + public CommitLog(final DefaultMessageStore messageStore) { + String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog(); + if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) { +@@ -117,7 +123,9 @@ public class CommitLog implements Swappable { + putMessageThreadLocal = new ThreadLocal() { + @Override + protected PutMessageThreadLocal initialValue() { +- return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); ++ return new PutMessageThreadLocal( ++ defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(), ++ defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC()); + } + }; + this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); +@@ -127,6 +135,8 @@ public class CommitLog implements Swappable { + this.topicQueueLock = new TopicQueueLock(messageStore.getMessageStoreConfig().getTopicQueueLockNum()); + + this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); ++ ++ this.enabledAppendPropCRC = messageStore.getMessageStoreConfig().isEnabledAppendPropCRC(); + } + + public void setFullStorePaths(Set fullStorePaths) { +@@ -470,10 +480,16 @@ public class CommitLog implements Swappable { + byteBuffer.get(bytesContent, 0, bodyLen); + + if (checkCRC) { +- int crc = UtilAll.crc32(bytesContent, 0, bodyLen); +- if (crc != bodyCRC) { +- log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC); +- return new DispatchRequest(-1, false/* success */); ++ /** ++ * When the forceVerifyPropCRC = false, ++ * use original bodyCrc validation. ++ */ ++ if (!this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) { ++ int crc = UtilAll.crc32(bytesContent, 0, bodyLen); ++ if (crc != bodyCRC) { ++ log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC); ++ return new DispatchRequest(-1, false/* success */); ++ } + } + } + } else { +@@ -531,6 +547,43 @@ public class CommitLog implements Swappable { + } + } + ++ if (checkCRC) { ++ /** ++ * When the forceVerifyPropCRC = true, ++ * Crc verification needs to be performed on the entire message data (excluding the length reserved at the tail) ++ */ ++ if (this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) { ++ int expectedCRC = -1; ++ if (propertiesMap != null) { ++ String crc32Str = propertiesMap.get(MessageConst.PROPERTY_CRC32); ++ if (crc32Str != null) { ++ expectedCRC = 0; ++ for (int i = crc32Str.length() - 1; i >= 0; i--) { ++ int num = crc32Str.charAt(i) - '0'; ++ expectedCRC *= 10; ++ expectedCRC += num; ++ } ++ } ++ } ++ if (expectedCRC > 0) { ++ ByteBuffer tmpBuffer = byteBuffer.duplicate(); ++ tmpBuffer.position(tmpBuffer.position() - totalSize); ++ tmpBuffer.limit(tmpBuffer.position() + totalSize - CommitLog.CRC32_RESERVED_LEN); ++ int crc = UtilAll.crc32(tmpBuffer); ++ if (crc != expectedCRC) { ++ log.warn( ++ "CommitLog#checkAndDispatchMessage: failed to check message CRC, expected " ++ + "CRC={}, actual CRC={}", bodyCRC, crc); ++ return new DispatchRequest(-1, false/* success */); ++ } ++ } else { ++ log.warn( ++ "CommitLog#checkAndDispatchMessage: failed to check message CRC, not found CRC in properties"); ++ return new DispatchRequest(-1, false/* success */); ++ } ++ } ++ } ++ + int readLength = MessageExtEncoder.calMsgLength(messageVersion, sysFlag, bodyLen, topicLen, propertiesLength); + if (totalSize != readLength) { + doNothingForDeadCode(reconsumeTimes); +@@ -846,9 +899,12 @@ public class CommitLog implements Swappable { + if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { + msg.setStoreTimestamp(System.currentTimeMillis()); + } +- + // Set the message body CRC (consider the most appropriate setting on the client) + msg.setBodyCRC(UtilAll.crc32(msg.getBody())); ++ if (enabledAppendPropCRC) { ++ // delete crc32 properties if exist ++ msg.deleteProperty(MessageConst.PROPERTY_CRC32); ++ } + // Back to Results + AppendMessageResult result = null; + +@@ -1764,6 +1820,7 @@ public class CommitLog implements Swappable { + private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4; + // Store the message content + private final ByteBuffer msgStoreItemMemory; ++ private final int crc32ReservedLength = CommitLog.CRC32_RESERVED_LEN; + + DefaultAppendMessageCallback() { + this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH); +@@ -1837,6 +1894,15 @@ public class CommitLog implements Swappable { + pos += 8 + 4 + 8 + ipLen; + // refresh store time stamp in lock + preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp()); ++ if (enabledAppendPropCRC) { ++ // 18 CRC32 ++ int checkSize = msgLen - crc32ReservedLength; ++ ByteBuffer tmpBuffer = preEncodeBuffer.duplicate(); ++ tmpBuffer.limit(tmpBuffer.position() + checkSize); ++ int crc32 = UtilAll.crc32(tmpBuffer); ++ tmpBuffer.limit(tmpBuffer.position() + crc32ReservedLength); ++ MessageDecoder.createCrc32(tmpBuffer, crc32); ++ } + + final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); + CommitLog.this.getMessageStore().getPerfCounter().startTick("WRITE_MEMORY_TIME_MS"); +@@ -1918,6 +1984,15 @@ public class CommitLog implements Swappable { + pos += 8 + 4 + 8 + bornHostLength; + // refresh store time stamp in lock + messagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp()); ++ if (enabledAppendPropCRC) { ++ //append crc32 ++ int checkSize = msgLen - crc32ReservedLength; ++ ByteBuffer tmpBuffer = messagesByteBuff.duplicate(); ++ tmpBuffer.position(msgPos).limit(msgPos + checkSize); ++ int crc32 = UtilAll.crc32(tmpBuffer); ++ messagesByteBuff.position(msgPos + checkSize); ++ MessageDecoder.createCrc32(messagesByteBuff, crc32); ++ } + + putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen; + queueOffset++; +diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +index ee609a337..c1d808728 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java ++++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +@@ -19,6 +19,7 @@ package org.apache.rocketmq.store; + import io.netty.buffer.ByteBuf; + import io.netty.buffer.ByteBufAllocator; + import io.netty.buffer.UnpooledByteBufAllocator; ++import java.nio.ByteBuffer; + import org.apache.rocketmq.common.UtilAll; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.message.MessageDecoder; +@@ -29,8 +30,6 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + +-import java.nio.ByteBuffer; +- + public class MessageExtEncoder { + protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + private ByteBuf byteBuf; +@@ -38,7 +37,13 @@ public class MessageExtEncoder { + private int maxMessageBodySize; + // The maximum length of the full message. + private int maxMessageSize; ++ private final int crc32ReservedLength; ++ + public MessageExtEncoder(final int maxMessageBodySize) { ++ this(maxMessageBodySize, false); ++ } ++ ++ public MessageExtEncoder(final int maxMessageBodySize, boolean enabledAppendPropCRC) { + ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT; + //Reserve 64kb for encoding buffer outside body + int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ? +@@ -46,6 +51,7 @@ public class MessageExtEncoder { + byteBuf = alloc.directBuffer(maxMessageSize); + this.maxMessageBodySize = maxMessageBodySize; + this.maxMessageSize = maxMessageSize; ++ this.crc32ReservedLength = enabledAppendPropCRC ? CommitLog.CRC32_RESERVED_LEN : 0; + } + + public static int calMsgLength(MessageVersion messageVersion, +@@ -81,10 +87,13 @@ public class MessageExtEncoder { + final byte[] propertiesData = + msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); + +- final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; ++ boolean needAppendLastPropertySeparator = crc32ReservedLength > 0 && propertiesData != null && propertiesData.length > 0 ++ && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR; ++ ++ final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength; + + if (propertiesLength > Short.MAX_VALUE) { +- log.warn("putMessage message properties length too long. length={}", propertiesData.length); ++ log.warn("putMessage message properties length too long. length={}", propertiesLength); + return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); + } + +@@ -160,8 +169,14 @@ public class MessageExtEncoder { + + // 17 PROPERTIES + this.byteBuf.writeShort((short) propertiesLength); +- if (propertiesLength > 0) ++ if (propertiesLength > crc32ReservedLength) { + this.byteBuf.writeBytes(propertiesData); ++ } ++ if (needAppendLastPropertySeparator) { ++ this.byteBuf.writeByte((byte) MessageDecoder.PROPERTY_SEPARATOR); ++ } ++ // 18 CRC32 ++ this.byteBuf.writerIndex(this.byteBuf.writerIndex() + crc32ReservedLength); + + return null; + } +@@ -213,10 +228,11 @@ public class MessageExtEncoder { + final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); + + final int topicLength = topicData.length; +- final int topicLengthSize = messageExtBatch.getVersion().getTopicLengthSize(); + int totalPropLen = needAppendLastPropertySeparator ? +- propertiesLen + batchPropLen + topicLengthSize : propertiesLen + batchPropLen; ++ propertiesLen + batchPropLen + 1 : propertiesLen + batchPropLen; + ++ // properties need to add crc32 ++ totalPropLen += crc32ReservedLength; + final int msgLen = calMsgLength( + messageExtBatch.getVersion(), messageExtBatch.getSysFlag(), bodyLen, topicLength, totalPropLen); + +@@ -278,6 +294,7 @@ public class MessageExtEncoder { + } + this.byteBuf.writeBytes(batchPropData, 0, batchPropLen); + } ++ this.byteBuf.writerIndex(this.byteBuf.writerIndex() + crc32ReservedLength); + } + putMessageContext.setBatchSize(batchSize); + putMessageContext.setPhyPos(new long[batchSize]); +@@ -304,8 +321,13 @@ public class MessageExtEncoder { + static class PutMessageThreadLocal { + private final MessageExtEncoder encoder; + private final StringBuilder keyBuilder; ++ + PutMessageThreadLocal(int size) { +- encoder = new MessageExtEncoder(size); ++ this(size, false); ++ } ++ ++ PutMessageThreadLocal(int size, boolean enabledAppendPropCRC) { ++ encoder = new MessageExtEncoder(size, enabledAppendPropCRC); + keyBuilder = new StringBuilder(); + } + +diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +index 028facbdc..8cb3ea6e9 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java ++++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +@@ -270,6 +270,12 @@ public class MessageStoreConfig { + */ + private boolean autoMessageVersionOnTopicLen = true; + ++ /** ++ * It cannot be changed after the broker is started. ++ * Modifications need to be restarted to take effect. ++ */ ++ private boolean enabledAppendPropCRC = false; ++ private boolean forceVerifyPropCRC = false; + private int travelCqFileNumWhenGetMessage = 1; + // Sleep interval between to corrections + private int correctLogicMinOffsetSleepInterval = 1; +@@ -405,6 +411,14 @@ public class MessageStoreConfig { + + private int topicQueueLockNum = 32; + ++ public boolean isEnabledAppendPropCRC() { ++ return enabledAppendPropCRC; ++ } ++ ++ public void setEnabledAppendPropCRC(boolean enabledAppendPropCRC) { ++ this.enabledAppendPropCRC = enabledAppendPropCRC; ++ } ++ + public boolean isDebugLockEnable() { + return debugLockEnable; + } +@@ -640,6 +654,15 @@ public class MessageStoreConfig { + this.checkCRCOnRecover = checkCRCOnRecover; + } + ++ public boolean isForceVerifyPropCRC() { ++ return forceVerifyPropCRC; ++ } ++ ++ public void setForceVerifyPropCRC(boolean forceVerifyPropCRC) { ++ this.forceVerifyPropCRC = forceVerifyPropCRC; ++ } ++ ++ + public String getStorePathCommitLog() { + if (storePathCommitLog == null) { + return storePathRootDir + File.separator + "commitlog"; +diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java +new file mode 100644 +index 000000000..c8ed4d74d +--- /dev/null ++++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java +@@ -0,0 +1,200 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.store; ++ ++import java.io.File; ++import java.net.InetSocketAddress; ++import java.nio.ByteBuffer; ++import java.util.ArrayList; ++import java.util.HashSet; ++import java.util.List; ++import java.util.Set; ++import java.util.concurrent.ConcurrentHashMap; ++import org.apache.rocketmq.common.BrokerConfig; ++import org.apache.rocketmq.common.UtilAll; ++import org.apache.rocketmq.common.message.Message; ++import org.apache.rocketmq.common.message.MessageDecoder; ++import org.apache.rocketmq.common.message.MessageExt; ++import org.apache.rocketmq.common.message.MessageExtBatch; ++import org.apache.rocketmq.common.message.MessageExtBrokerInner; ++import org.apache.rocketmq.store.config.MessageStoreConfig; ++import org.junit.After; ++import org.junit.Before; ++import org.junit.Test; ++ ++import static org.junit.Assert.assertEquals; ++import static org.junit.Assert.assertFalse; ++import static org.junit.Assert.assertTrue; ++ ++public class AppendPropCRCTest { ++ ++ AppendMessageCallback callback; ++ ++ MessageExtEncoder encoder; ++ ++ CommitLog commitLog; ++ ++ @Before ++ public void init() throws Exception { ++ MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); ++ messageStoreConfig.setMappedFileSizeCommitLog(1024 * 8); ++ messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4); ++ messageStoreConfig.setMaxHashSlotNum(100); ++ messageStoreConfig.setMaxIndexNum(100 * 10); ++ messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore"); ++ messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog"); ++ messageStoreConfig.setForceVerifyPropCRC(true); ++ messageStoreConfig.setEnabledAppendPropCRC(true); ++ //too much reference ++ DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>()); ++ commitLog = new CommitLog(messageStore); ++ encoder = new MessageExtEncoder(10 * 1024 * 1024, true); ++ callback = commitLog.new DefaultAppendMessageCallback(); ++ } ++ ++ @After ++ public void destroy() { ++ UtilAll.deleteFile(new File(System.getProperty("user.home") + File.separator + "unitteststore")); ++ } ++ ++ @Test ++ public void testAppendMessageSucc() throws Exception { ++ String topic = "test-topic"; ++ int queue = 0; ++ int msgNum = 10; ++ int propertiesLen = 0; ++ Message msg = new Message(); ++ msg.setBody("body".getBytes()); ++ msg.setTopic(topic); ++ msg.setTags("abc"); ++ msg.putUserProperty("a", "aaaaaaaa"); ++ msg.putUserProperty("b", "bbbbbbbb"); ++ msg.putUserProperty("c", "cccccccc"); ++ msg.putUserProperty("d", "dddddddd"); ++ msg.putUserProperty("e", "eeeeeeee"); ++ msg.putUserProperty("f", "ffffffff"); ++ ++ MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); ++ messageExtBrokerInner.setTopic(topic); ++ messageExtBrokerInner.setQueueId(queue); ++ messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis()); ++ messageExtBrokerInner.setBornHost(new InetSocketAddress("127.0.0.1", 123)); ++ messageExtBrokerInner.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); ++ messageExtBrokerInner.setBody(msg.getBody()); ++ messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); ++ propertiesLen = messageExtBrokerInner.getPropertiesString().length(); ++ ++ ByteBuffer buff = ByteBuffer.allocate(1024 * 10); ++ for (int i = 0; i < msgNum; i++) { ++ encoder.encode(messageExtBrokerInner); ++ messageExtBrokerInner.setEncodedBuff(encoder.getEncoderBuffer()); ++ AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBrokerInner, null); ++ assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); ++ } ++ // Expected to pass when message is not modified ++ buff.flip(); ++ for (int i = 0; i < msgNum - 1; i++) { ++ DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); ++ assertTrue(request.isSuccess()); ++ } ++ // Modify the properties of the last message and expect the verification to fail. ++ int idx = buff.limit() - (propertiesLen / 2); ++ buff.put(idx, (byte) (buff.get(idx) + 1)); ++ DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); ++ assertFalse(request.isSuccess()); ++ } ++ ++ @Test ++ public void testAppendMessageBatchSucc() throws Exception { ++ List messages = new ArrayList<>(); ++ String topic = "test-topic"; ++ int queue = 0; ++ int propertiesLen = 0; ++ for (int i = 0; i < 10; i++) { ++ Message msg = new Message(); ++ msg.setBody("body".getBytes()); ++ msg.setTopic(topic); ++ msg.setTags("abc"); ++ msg.putUserProperty("a", "aaaaaaaa"); ++ msg.putUserProperty("b", "bbbbbbbb"); ++ msg.putUserProperty("c", "cccccccc"); ++ msg.putUserProperty("d", "dddddddd"); ++ msg.putUserProperty("e", "eeeeeeee"); ++ msg.putUserProperty("f", "ffffffff"); ++ String propertiesString = MessageDecoder.messageProperties2String(msg.getProperties()); ++ propertiesLen = propertiesString.length(); ++ messages.add(msg); ++ } ++ MessageExtBatch messageExtBatch = new MessageExtBatch(); ++ messageExtBatch.setTopic(topic); ++ messageExtBatch.setQueueId(queue); ++ messageExtBatch.setBornTimestamp(System.currentTimeMillis()); ++ messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 123)); ++ messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); ++ messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); ++ ++ PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue); ++ messageExtBatch.setEncodedBuff(encoder.encode(messageExtBatch, putMessageContext)); ++ ByteBuffer buff = ByteBuffer.allocate(1024 * 10); ++ //encounter end of file when append half of the data ++ AppendMessageResult allresult = ++ callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext); ++ ++ assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); ++ assertEquals(0, allresult.getWroteOffset()); ++ assertEquals(0, allresult.getLogicsOffset()); ++ assertEquals(buff.position(), allresult.getWroteBytes()); ++ ++ assertEquals(messages.size(), allresult.getMsgNum()); ++ ++ Set msgIds = new HashSet<>(); ++ for (String msgId : allresult.getMsgId().split(",")) { ++ assertEquals(32, msgId.length()); ++ msgIds.add(msgId); ++ } ++ assertEquals(messages.size(), msgIds.size()); ++ ++ List decodeMsgs = MessageDecoder.decodes((ByteBuffer) buff.flip()); ++ assertEquals(decodeMsgs.size(), decodeMsgs.size()); ++ long queueOffset = decodeMsgs.get(0).getQueueOffset(); ++ long storeTimeStamp = decodeMsgs.get(0).getStoreTimestamp(); ++ for (int i = 0; i < messages.size(); i++) { ++ assertEquals(messages.get(i).getTopic(), decodeMsgs.get(i).getTopic()); ++ assertEquals(new String(messages.get(i).getBody()), new String(decodeMsgs.get(i).getBody())); ++ assertEquals(messages.get(i).getTags(), decodeMsgs.get(i).getTags()); ++ ++ assertEquals(messageExtBatch.getBornHostNameString(), decodeMsgs.get(i).getBornHostNameString()); ++ ++ assertEquals(messageExtBatch.getBornTimestamp(), decodeMsgs.get(i).getBornTimestamp()); ++ assertEquals(storeTimeStamp, decodeMsgs.get(i).getStoreTimestamp()); ++ assertEquals(queueOffset++, decodeMsgs.get(i).getQueueOffset()); ++ } ++ ++ // Expected to pass when message is not modified ++ buff.flip(); ++ for (int i = 0; i < messages.size() - 1; i++) { ++ DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); ++ assertTrue(request.isSuccess()); ++ } ++ // Modify the properties of the last message and expect the verification to fail. ++ int idx = buff.limit() - (propertiesLen / 2); ++ buff.put(idx, (byte) (buff.get(idx) + 1)); ++ DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false); ++ assertFalse(request.isSuccess()); ++ } ++} +diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java +index 43ca38eb4..768029ca1 100644 +--- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java ++++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java +@@ -108,7 +108,7 @@ public class BatchPutMessageTest { + short propertiesLength = (short) propertiesBytes.length; + final byte[] topicData = msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); + final int topicLength = topicData.length; +- msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength + batchPropLen + 1) + msgLengthArr[j - 1]; ++ msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength + batchPropLen) + msgLengthArr[j - 1]; + j++; + } + byte[] batchMessageBody = MessageDecoder.encodeMessages(messages); +diff --git a/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java b/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java +new file mode 100644 +index 000000000..415dc3811 +--- /dev/null ++++ b/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java +@@ -0,0 +1,105 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.store; ++ ++import org.apache.rocketmq.common.message.MessageExtBrokerInner; ++import org.junit.Test; ++ ++import static org.assertj.core.api.Assertions.assertThat; ++ ++public class MessageExtBrokerInnerTest { ++ @Test ++ public void testDeleteProperty() { ++ MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); ++ String propertiesString = ""; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); ++ ++ propertiesString = "KeyA\u0001ValueA"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(""); ++ ++ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); ++ ++ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); ++ ++ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002"); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002"); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB"); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002"); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001"); ++ ++ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("KeyA"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA"); ++ ++ propertiesString = "__CRC32#\u0001"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("__CRC32#"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEmpty(); ++ ++ propertiesString = "__CRC32#"; ++ messageExtBrokerInner.setPropertiesString(propertiesString); ++ messageExtBrokerInner.deleteProperty("__CRC32#"); ++ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(propertiesString); ++ } ++ ++} +diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java +index db5c5af4c..7d659d2f6 100644 +--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java ++++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java +@@ -465,7 +465,7 @@ public class AutoSwitchHATest { + + // Step2: check flag SynchronizingSyncStateSet + Assert.assertTrue(masterHAService.isSynchronizingSyncStateSet()); +- Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1570); ++ Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1580); + Set syncStateSet = masterHAService.getSyncStateSet(); + Assert.assertEquals(syncStateSet.size(), 2); + Assert.assertTrue(syncStateSet.contains(1L)); +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java +index 2e028ada3..588424304 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java +@@ -74,7 +74,7 @@ public class CompositeQueueFlatFileTest { + ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer(); + AppendResult result = flatFile.appendCommitLog(message); + Assert.assertEquals(AppendResult.SUCCESS, result); +- Assert.assertEquals(122L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition()); ++ Assert.assertEquals(123L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition()); + Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getCommitPosition()); + + flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq); +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java +index 1f38d4f6c..a413f2113 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java +@@ -47,7 +47,7 @@ public class MessageBufferUtilTest { + + 8 //Prepared Transaction Offset + + 4 + 0 //BODY + + 2 + 0 //TOPIC +- + 2 + 30 //properties ++ + 2 + 31 //properties + + 0; + + public static ByteBuffer buildMockedMessageBuffer() { +-- +2.32.0.windows.2 + + +From 48ef5ced4639699e3ba207b1a648b1fd47649a69 Mon Sep 17 00:00:00 2001 +From: rongtong +Date: Thu, 26 Oct 2023 14:43:24 +0800 +Subject: [PATCH 2/2] [ISSUE #7505] Do not validate the length when deleting a + topic + +--- + .../rocketmq/broker/processor/AdminBrokerProcessor.java | 9 +++++---- + 1 file changed, 5 insertions(+), 4 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +index 0b7a6d206..004bf12ac 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +@@ -518,12 +518,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + String topic = requestHeader.getTopic(); +- TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic); +- if (!result.isValid()) { ++ ++ if (UtilAll.isBlank(topic)) { + response.setCode(ResponseCode.SYSTEM_ERROR); +- response.setRemark(result.getRemark()); ++ response.setRemark("The specified topic is blank."); + return response; + } ++ + if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) { + if (TopicValidator.isSystemTopic(topic)) { + response.setCode(ResponseCode.SYSTEM_ERROR); +@@ -2726,7 +2727,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + return response; + } + final EpochEntryCache entryCache = new EpochEntryCache(brokerConfig.getBrokerClusterName(), +- brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset()); ++ brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset()); + + response.setBody(entryCache.encode()); + response.setCode(ResponseCode.SUCCESS); +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index 47b3b67..a28ac53 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.5 -Release: 31 +Release: 32 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -40,6 +40,7 @@ Patch0027: patch027-backport-Utilizing-cache-to-avoid-duplicate-parsing.patch Patch0028: patch028-backport-Fix-proxy-client-language-error.patch Patch0029: patch029-backport-Introduce-new-event-NettyEventType.ACTIVEr.patch Patch0030: patch030-backport-remove-some-code.patch +Patch0031: patch031-backport-Add-CRC-check-of-commitlog.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -80,6 +81,9 @@ exit 0 %changelog +* Fri Dec 8 2023 ShiZhili - 5.1.3-32 +- backport Add the CRC check of commitlog + * Fri Dec 8 2023 ShiZhili - 5.1.3-31 - backport remove some code