rocketmq/patch031-backport-Add-CRC-check-of-commitlog.patch
2023-12-08 17:00:02 +08:00

1045 lines
51 KiB
Diff

From 91349f30b96db2e16b71d65a535d81f11b60bda5 Mon Sep 17 00:00:00 2001
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
Date: Wed, 25 Oct 2023 14:54:00 +0800
Subject: [PATCH 1/2] [ISSUE #7437] Add the CRC check of commitlog (#7468)
* Added CRC32 check for full data
* add unit test
* add MessageExtBrokerInnerTest
* fix codestyle
* fix codestyle
---------
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
---
.../org/apache/rocketmq/common/UtilAll.java | 14 ++
.../rocketmq/common/message/MessageConst.java | 2 +
.../common/message/MessageDecoder.java | 32 ++-
.../common/message/MessageExtBrokerInner.java | 49 +++++
.../common/MessageExtBrokerInnerTest.java | 93 ++++++++
.../org/apache/rocketmq/store/CommitLog.java | 87 +++++++-
.../rocketmq/store/MessageExtEncoder.java | 38 +++-
.../store/config/MessageStoreConfig.java | 23 ++
.../rocketmq/store/AppendPropCRCTest.java | 200 ++++++++++++++++++
.../rocketmq/store/BatchPutMessageTest.java | 2 +-
.../store/MessageExtBrokerInnerTest.java | 105 +++++++++
.../store/ha/autoswitch/AutoSwitchHATest.java | 2 +-
.../file/CompositeQueueFlatFileTest.java | 2 +-
.../util/MessageBufferUtilTest.java | 2 +-
14 files changed, 630 insertions(+), 21 deletions(-)
create mode 100644 common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java
create mode 100644 store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
create mode 100644 store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index d2b7c374b..95b6b09b4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -307,6 +307,20 @@ public class UtilAll {
return (int) (crc32.getValue() & 0x7FFFFFFF);
}
+ public static int crc32(ByteBuffer byteBuffer) {
+ CRC32 crc32 = new CRC32();
+ crc32.update(byteBuffer);
+ return (int) (crc32.getValue() & 0x7FFFFFFF);
+ }
+
+ public static int crc32(ByteBuffer[] byteBuffers) {
+ CRC32 crc32 = new CRC32();
+ for (ByteBuffer buffer : byteBuffers) {
+ crc32.update(buffer);
+ }
+ return (int) (crc32.getValue() & 0x7FFFFFFF);
+ }
+
public static String bytes2string(byte[] src) {
char[] hexChars = new char[src.length * 2];
for (int j = 0; j < src.length; j++) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 87fed7c19..24f7bdb99 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -97,6 +97,7 @@ public class MessageConst {
public static final String PROPERTY_TIMER_DEL_UNIQKEY = "TIMER_DEL_UNIQKEY";
public static final String PROPERTY_TIMER_DELAY_LEVEL = "TIMER_DELAY_LEVEL";
public static final String PROPERTY_TIMER_DELAY_MS = "TIMER_DELAY_MS";
+ public static final String PROPERTY_CRC32 = "__CRC32#";
/**
* properties for DLQ
@@ -155,5 +156,6 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP);
STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_TOPIC);
STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
+ STRING_HASH_SET.add(PROPERTY_CRC32);
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index 6de0b69fb..b053f8275 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.common.message;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
@@ -152,6 +153,34 @@ public class MessageDecoder {
return null;
}
+ public static void createCrc32(final ByteBuffer input, int crc32) {
+ input.put(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8));
+ input.put((byte) NAME_VALUE_SEPARATOR);
+ for (int i = 0; i < 10; i++) {
+ byte b = '0';
+ if (crc32 > 0) {
+ b += (byte) (crc32 % 10);
+ crc32 /= 10;
+ }
+ input.put(b);
+ }
+ input.put((byte) PROPERTY_SEPARATOR);
+ }
+
+ public static void createCrc32(final ByteBuf input, int crc32) {
+ input.writeBytes(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8));
+ input.writeByte((byte) NAME_VALUE_SEPARATOR);
+ for (int i = 0; i < 10; i++) {
+ byte b = '0';
+ if (crc32 > 0) {
+ b += (byte) (crc32 % 10);
+ crc32 /= 10;
+ }
+ input.writeByte(b);
+ }
+ input.writeByte((byte) PROPERTY_SEPARATOR);
+ }
+
public static MessageExt decode(ByteBuffer byteBuffer) {
return decode(byteBuffer, true, true, false);
}
@@ -601,9 +630,6 @@ public class MessageDecoder {
sb.append(value);
sb.append(PROPERTY_SEPARATOR);
}
- if (sb.length() > 0) {
- sb.deleteCharAt(sb.length() - 1);
- }
return sb.toString();
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
index 91599653c..4e5d3419a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
@@ -20,6 +20,9 @@ import java.nio.ByteBuffer;
import org.apache.rocketmq.common.TopicFilterType;
+import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
+import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
+
public class MessageExtBrokerInner extends MessageExt {
private static final long serialVersionUID = 7256001576878700634L;
private String propertiesString;
@@ -55,6 +58,52 @@ public class MessageExtBrokerInner extends MessageExt {
this.propertiesString = propertiesString;
}
+
+ public void deleteProperty(String name) {
+ super.clearProperty(name);
+ if (propertiesString != null) {
+ int idx0 = 0;
+ int idx1;
+ int idx2;
+ idx1 = propertiesString.indexOf(name, idx0);
+ if (idx1 != -1) {
+ // cropping may be required
+ StringBuilder stringBuilder = new StringBuilder(propertiesString.length());
+ while (true) {
+ int startIdx = idx0;
+ while (true) {
+ idx1 = propertiesString.indexOf(name, startIdx);
+ if (idx1 == -1) {
+ break;
+ }
+ startIdx = idx1 + name.length();
+ if (idx1 == 0 || propertiesString.charAt(idx1 - 1) == PROPERTY_SEPARATOR) {
+ if (propertiesString.length() > idx1 + name.length()
+ && propertiesString.charAt(idx1 + name.length()) == NAME_VALUE_SEPARATOR) {
+ break;
+ }
+ }
+ }
+ if (idx1 == -1) {
+ // there are no characters that need to be skipped. Append all remaining characters.
+ stringBuilder.append(propertiesString, idx0, propertiesString.length());
+ break;
+ }
+ // there are characters that need to be cropped
+ stringBuilder.append(propertiesString, idx0, idx1);
+ // move idx2 to the end of the cropped character
+ idx2 = propertiesString.indexOf(PROPERTY_SEPARATOR, idx1 + name.length() + 1);
+ // all subsequent characters will be cropped
+ if (idx2 == -1) {
+ break;
+ }
+ idx0 = idx2 + 1;
+ }
+ this.setPropertiesString(stringBuilder.toString());
+ }
+ }
+ }
+
public long getTagsCode() {
return tagsCode;
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java
new file mode 100644
index 000000000..77d69e5ad
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MessageExtBrokerInnerTest {
+ @Test
+ public void testDeleteProperty() {
+ MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
+ String propertiesString = "";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+ propertiesString = "KeyA\u0001ValueA";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+ propertiesString = "KeyA\u0001ValueA\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+ propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+ propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+
+ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+
+ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002");
+
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB");
+
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002");
+
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001");
+
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA");
+ }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 93102799b..3d3ee86b8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -73,6 +73,10 @@ public class CommitLog implements Swappable {
protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// End of file empty MAGIC CODE cbd43194
public final static int BLANK_MAGIC_CODE = -875286124;
+ /**
+ * CRC32 Format: [PROPERTY_CRC32 + NAME_VALUE_SEPARATOR + 10-digit fixed-length string + PROPERTY_SEPARATOR]
+ */
+ public static final int CRC32_RESERVED_LEN = MessageConst.PROPERTY_CRC32.length() + 1 + 10 + 1;
protected final MappedFileQueue mappedFileQueue;
protected final DefaultMessageStore defaultMessageStore;
@@ -96,6 +100,8 @@ public class CommitLog implements Swappable {
protected int commitLogSize;
+ private final boolean enabledAppendPropCRC;
+
public CommitLog(final DefaultMessageStore messageStore) {
String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog();
if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) {
@@ -117,7 +123,9 @@ public class CommitLog implements Swappable {
putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
@Override
protected PutMessageThreadLocal initialValue() {
- return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
+ return new PutMessageThreadLocal(
+ defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(),
+ defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC());
}
};
this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
@@ -127,6 +135,8 @@ public class CommitLog implements Swappable {
this.topicQueueLock = new TopicQueueLock(messageStore.getMessageStoreConfig().getTopicQueueLockNum());
this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
+
+ this.enabledAppendPropCRC = messageStore.getMessageStoreConfig().isEnabledAppendPropCRC();
}
public void setFullStorePaths(Set<String> fullStorePaths) {
@@ -470,10 +480,16 @@ public class CommitLog implements Swappable {
byteBuffer.get(bytesContent, 0, bodyLen);
if (checkCRC) {
- int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
- if (crc != bodyCRC) {
- log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
- return new DispatchRequest(-1, false/* success */);
+ /**
+ * When the forceVerifyPropCRC = false,
+ * use original bodyCrc validation.
+ */
+ if (!this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) {
+ int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
+ if (crc != bodyCRC) {
+ log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
+ return new DispatchRequest(-1, false/* success */);
+ }
}
}
} else {
@@ -531,6 +547,43 @@ public class CommitLog implements Swappable {
}
}
+ if (checkCRC) {
+ /**
+ * When the forceVerifyPropCRC = true,
+ * Crc verification needs to be performed on the entire message data (excluding the length reserved at the tail)
+ */
+ if (this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) {
+ int expectedCRC = -1;
+ if (propertiesMap != null) {
+ String crc32Str = propertiesMap.get(MessageConst.PROPERTY_CRC32);
+ if (crc32Str != null) {
+ expectedCRC = 0;
+ for (int i = crc32Str.length() - 1; i >= 0; i--) {
+ int num = crc32Str.charAt(i) - '0';
+ expectedCRC *= 10;
+ expectedCRC += num;
+ }
+ }
+ }
+ if (expectedCRC > 0) {
+ ByteBuffer tmpBuffer = byteBuffer.duplicate();
+ tmpBuffer.position(tmpBuffer.position() - totalSize);
+ tmpBuffer.limit(tmpBuffer.position() + totalSize - CommitLog.CRC32_RESERVED_LEN);
+ int crc = UtilAll.crc32(tmpBuffer);
+ if (crc != expectedCRC) {
+ log.warn(
+ "CommitLog#checkAndDispatchMessage: failed to check message CRC, expected "
+ + "CRC={}, actual CRC={}", bodyCRC, crc);
+ return new DispatchRequest(-1, false/* success */);
+ }
+ } else {
+ log.warn(
+ "CommitLog#checkAndDispatchMessage: failed to check message CRC, not found CRC in properties");
+ return new DispatchRequest(-1, false/* success */);
+ }
+ }
+ }
+
int readLength = MessageExtEncoder.calMsgLength(messageVersion, sysFlag, bodyLen, topicLen, propertiesLength);
if (totalSize != readLength) {
doNothingForDeadCode(reconsumeTimes);
@@ -846,9 +899,12 @@ public class CommitLog implements Swappable {
if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
msg.setStoreTimestamp(System.currentTimeMillis());
}
-
// Set the message body CRC (consider the most appropriate setting on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
+ if (enabledAppendPropCRC) {
+ // delete crc32 properties if exist
+ msg.deleteProperty(MessageConst.PROPERTY_CRC32);
+ }
// Back to Results
AppendMessageResult result = null;
@@ -1764,6 +1820,7 @@ public class CommitLog implements Swappable {
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
// Store the message content
private final ByteBuffer msgStoreItemMemory;
+ private final int crc32ReservedLength = CommitLog.CRC32_RESERVED_LEN;
DefaultAppendMessageCallback() {
this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
@@ -1837,6 +1894,15 @@ public class CommitLog implements Swappable {
pos += 8 + 4 + 8 + ipLen;
// refresh store time stamp in lock
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
+ if (enabledAppendPropCRC) {
+ // 18 CRC32
+ int checkSize = msgLen - crc32ReservedLength;
+ ByteBuffer tmpBuffer = preEncodeBuffer.duplicate();
+ tmpBuffer.limit(tmpBuffer.position() + checkSize);
+ int crc32 = UtilAll.crc32(tmpBuffer);
+ tmpBuffer.limit(tmpBuffer.position() + crc32ReservedLength);
+ MessageDecoder.createCrc32(tmpBuffer, crc32);
+ }
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
CommitLog.this.getMessageStore().getPerfCounter().startTick("WRITE_MEMORY_TIME_MS");
@@ -1918,6 +1984,15 @@ public class CommitLog implements Swappable {
pos += 8 + 4 + 8 + bornHostLength;
// refresh store time stamp in lock
messagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp());
+ if (enabledAppendPropCRC) {
+ //append crc32
+ int checkSize = msgLen - crc32ReservedLength;
+ ByteBuffer tmpBuffer = messagesByteBuff.duplicate();
+ tmpBuffer.position(msgPos).limit(msgPos + checkSize);
+ int crc32 = UtilAll.crc32(tmpBuffer);
+ messagesByteBuff.position(msgPos + checkSize);
+ MessageDecoder.createCrc32(messagesByteBuff, crc32);
+ }
putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen;
queueOffset++;
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
index ee609a337..c1d808728 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
+import java.nio.ByteBuffer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -29,8 +30,6 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-
public class MessageExtEncoder {
protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private ByteBuf byteBuf;
@@ -38,7 +37,13 @@ public class MessageExtEncoder {
private int maxMessageBodySize;
// The maximum length of the full message.
private int maxMessageSize;
+ private final int crc32ReservedLength;
+
public MessageExtEncoder(final int maxMessageBodySize) {
+ this(maxMessageBodySize, false);
+ }
+
+ public MessageExtEncoder(final int maxMessageBodySize, boolean enabledAppendPropCRC) {
ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
//Reserve 64kb for encoding buffer outside body
int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ?
@@ -46,6 +51,7 @@ public class MessageExtEncoder {
byteBuf = alloc.directBuffer(maxMessageSize);
this.maxMessageBodySize = maxMessageBodySize;
this.maxMessageSize = maxMessageSize;
+ this.crc32ReservedLength = enabledAppendPropCRC ? CommitLog.CRC32_RESERVED_LEN : 0;
}
public static int calMsgLength(MessageVersion messageVersion,
@@ -81,10 +87,13 @@ public class MessageExtEncoder {
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
- final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
+ boolean needAppendLastPropertySeparator = crc32ReservedLength > 0 && propertiesData != null && propertiesData.length > 0
+ && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR;
+
+ final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength;
if (propertiesLength > Short.MAX_VALUE) {
- log.warn("putMessage message properties length too long. length={}", propertiesData.length);
+ log.warn("putMessage message properties length too long. length={}", propertiesLength);
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
@@ -160,8 +169,14 @@ public class MessageExtEncoder {
// 17 PROPERTIES
this.byteBuf.writeShort((short) propertiesLength);
- if (propertiesLength > 0)
+ if (propertiesLength > crc32ReservedLength) {
this.byteBuf.writeBytes(propertiesData);
+ }
+ if (needAppendLastPropertySeparator) {
+ this.byteBuf.writeByte((byte) MessageDecoder.PROPERTY_SEPARATOR);
+ }
+ // 18 CRC32
+ this.byteBuf.writerIndex(this.byteBuf.writerIndex() + crc32ReservedLength);
return null;
}
@@ -213,10 +228,11 @@ public class MessageExtEncoder {
final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
- final int topicLengthSize = messageExtBatch.getVersion().getTopicLengthSize();
int totalPropLen = needAppendLastPropertySeparator ?
- propertiesLen + batchPropLen + topicLengthSize : propertiesLen + batchPropLen;
+ propertiesLen + batchPropLen + 1 : propertiesLen + batchPropLen;
+ // properties need to add crc32
+ totalPropLen += crc32ReservedLength;
final int msgLen = calMsgLength(
messageExtBatch.getVersion(), messageExtBatch.getSysFlag(), bodyLen, topicLength, totalPropLen);
@@ -278,6 +294,7 @@ public class MessageExtEncoder {
}
this.byteBuf.writeBytes(batchPropData, 0, batchPropLen);
}
+ this.byteBuf.writerIndex(this.byteBuf.writerIndex() + crc32ReservedLength);
}
putMessageContext.setBatchSize(batchSize);
putMessageContext.setPhyPos(new long[batchSize]);
@@ -304,8 +321,13 @@ public class MessageExtEncoder {
static class PutMessageThreadLocal {
private final MessageExtEncoder encoder;
private final StringBuilder keyBuilder;
+
PutMessageThreadLocal(int size) {
- encoder = new MessageExtEncoder(size);
+ this(size, false);
+ }
+
+ PutMessageThreadLocal(int size, boolean enabledAppendPropCRC) {
+ encoder = new MessageExtEncoder(size, enabledAppendPropCRC);
keyBuilder = new StringBuilder();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 028facbdc..8cb3ea6e9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -270,6 +270,12 @@ public class MessageStoreConfig {
*/
private boolean autoMessageVersionOnTopicLen = true;
+ /**
+ * It cannot be changed after the broker is started.
+ * Modifications need to be restarted to take effect.
+ */
+ private boolean enabledAppendPropCRC = false;
+ private boolean forceVerifyPropCRC = false;
private int travelCqFileNumWhenGetMessage = 1;
// Sleep interval between to corrections
private int correctLogicMinOffsetSleepInterval = 1;
@@ -405,6 +411,14 @@ public class MessageStoreConfig {
private int topicQueueLockNum = 32;
+ public boolean isEnabledAppendPropCRC() {
+ return enabledAppendPropCRC;
+ }
+
+ public void setEnabledAppendPropCRC(boolean enabledAppendPropCRC) {
+ this.enabledAppendPropCRC = enabledAppendPropCRC;
+ }
+
public boolean isDebugLockEnable() {
return debugLockEnable;
}
@@ -640,6 +654,15 @@ public class MessageStoreConfig {
this.checkCRCOnRecover = checkCRCOnRecover;
}
+ public boolean isForceVerifyPropCRC() {
+ return forceVerifyPropCRC;
+ }
+
+ public void setForceVerifyPropCRC(boolean forceVerifyPropCRC) {
+ this.forceVerifyPropCRC = forceVerifyPropCRC;
+ }
+
+
public String getStorePathCommitLog() {
if (storePathCommitLog == null) {
return storePathRootDir + File.separator + "commitlog";
diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
new file mode 100644
index 000000000..c8ed4d74d
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AppendPropCRCTest {
+
+ AppendMessageCallback callback;
+
+ MessageExtEncoder encoder;
+
+ CommitLog commitLog;
+
+ @Before
+ public void init() throws Exception {
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setMappedFileSizeCommitLog(1024 * 8);
+ messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4);
+ messageStoreConfig.setMaxHashSlotNum(100);
+ messageStoreConfig.setMaxIndexNum(100 * 10);
+ messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore");
+ messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog");
+ messageStoreConfig.setForceVerifyPropCRC(true);
+ messageStoreConfig.setEnabledAppendPropCRC(true);
+ //too much reference
+ DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>());
+ commitLog = new CommitLog(messageStore);
+ encoder = new MessageExtEncoder(10 * 1024 * 1024, true);
+ callback = commitLog.new DefaultAppendMessageCallback();
+ }
+
+ @After
+ public void destroy() {
+ UtilAll.deleteFile(new File(System.getProperty("user.home") + File.separator + "unitteststore"));
+ }
+
+ @Test
+ public void testAppendMessageSucc() throws Exception {
+ String topic = "test-topic";
+ int queue = 0;
+ int msgNum = 10;
+ int propertiesLen = 0;
+ Message msg = new Message();
+ msg.setBody("body".getBytes());
+ msg.setTopic(topic);
+ msg.setTags("abc");
+ msg.putUserProperty("a", "aaaaaaaa");
+ msg.putUserProperty("b", "bbbbbbbb");
+ msg.putUserProperty("c", "cccccccc");
+ msg.putUserProperty("d", "dddddddd");
+ msg.putUserProperty("e", "eeeeeeee");
+ msg.putUserProperty("f", "ffffffff");
+
+ MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
+ messageExtBrokerInner.setTopic(topic);
+ messageExtBrokerInner.setQueueId(queue);
+ messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis());
+ messageExtBrokerInner.setBornHost(new InetSocketAddress("127.0.0.1", 123));
+ messageExtBrokerInner.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
+ messageExtBrokerInner.setBody(msg.getBody());
+ messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+ propertiesLen = messageExtBrokerInner.getPropertiesString().length();
+
+ ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
+ for (int i = 0; i < msgNum; i++) {
+ encoder.encode(messageExtBrokerInner);
+ messageExtBrokerInner.setEncodedBuff(encoder.getEncoderBuffer());
+ AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBrokerInner, null);
+ assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
+ }
+ // Expected to pass when message is not modified
+ buff.flip();
+ for (int i = 0; i < msgNum - 1; i++) {
+ DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false);
+ assertTrue(request.isSuccess());
+ }
+ // Modify the properties of the last message and expect the verification to fail.
+ int idx = buff.limit() - (propertiesLen / 2);
+ buff.put(idx, (byte) (buff.get(idx) + 1));
+ DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false);
+ assertFalse(request.isSuccess());
+ }
+
+ @Test
+ public void testAppendMessageBatchSucc() throws Exception {
+ List<Message> messages = new ArrayList<>();
+ String topic = "test-topic";
+ int queue = 0;
+ int propertiesLen = 0;
+ for (int i = 0; i < 10; i++) {
+ Message msg = new Message();
+ msg.setBody("body".getBytes());
+ msg.setTopic(topic);
+ msg.setTags("abc");
+ msg.putUserProperty("a", "aaaaaaaa");
+ msg.putUserProperty("b", "bbbbbbbb");
+ msg.putUserProperty("c", "cccccccc");
+ msg.putUserProperty("d", "dddddddd");
+ msg.putUserProperty("e", "eeeeeeee");
+ msg.putUserProperty("f", "ffffffff");
+ String propertiesString = MessageDecoder.messageProperties2String(msg.getProperties());
+ propertiesLen = propertiesString.length();
+ messages.add(msg);
+ }
+ MessageExtBatch messageExtBatch = new MessageExtBatch();
+ messageExtBatch.setTopic(topic);
+ messageExtBatch.setQueueId(queue);
+ messageExtBatch.setBornTimestamp(System.currentTimeMillis());
+ messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 123));
+ messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
+ messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
+
+ PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
+ messageExtBatch.setEncodedBuff(encoder.encode(messageExtBatch, putMessageContext));
+ ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
+ //encounter end of file when append half of the data
+ AppendMessageResult allresult =
+ callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext);
+
+ assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
+ assertEquals(0, allresult.getWroteOffset());
+ assertEquals(0, allresult.getLogicsOffset());
+ assertEquals(buff.position(), allresult.getWroteBytes());
+
+ assertEquals(messages.size(), allresult.getMsgNum());
+
+ Set<String> msgIds = new HashSet<>();
+ for (String msgId : allresult.getMsgId().split(",")) {
+ assertEquals(32, msgId.length());
+ msgIds.add(msgId);
+ }
+ assertEquals(messages.size(), msgIds.size());
+
+ List<MessageExt> decodeMsgs = MessageDecoder.decodes((ByteBuffer) buff.flip());
+ assertEquals(decodeMsgs.size(), decodeMsgs.size());
+ long queueOffset = decodeMsgs.get(0).getQueueOffset();
+ long storeTimeStamp = decodeMsgs.get(0).getStoreTimestamp();
+ for (int i = 0; i < messages.size(); i++) {
+ assertEquals(messages.get(i).getTopic(), decodeMsgs.get(i).getTopic());
+ assertEquals(new String(messages.get(i).getBody()), new String(decodeMsgs.get(i).getBody()));
+ assertEquals(messages.get(i).getTags(), decodeMsgs.get(i).getTags());
+
+ assertEquals(messageExtBatch.getBornHostNameString(), decodeMsgs.get(i).getBornHostNameString());
+
+ assertEquals(messageExtBatch.getBornTimestamp(), decodeMsgs.get(i).getBornTimestamp());
+ assertEquals(storeTimeStamp, decodeMsgs.get(i).getStoreTimestamp());
+ assertEquals(queueOffset++, decodeMsgs.get(i).getQueueOffset());
+ }
+
+ // Expected to pass when message is not modified
+ buff.flip();
+ for (int i = 0; i < messages.size() - 1; i++) {
+ DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false);
+ assertTrue(request.isSuccess());
+ }
+ // Modify the properties of the last message and expect the verification to fail.
+ int idx = buff.limit() - (propertiesLen / 2);
+ buff.put(idx, (byte) (buff.get(idx) + 1));
+ DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false);
+ assertFalse(request.isSuccess());
+ }
+}
diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
index 43ca38eb4..768029ca1 100644
--- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
@@ -108,7 +108,7 @@ public class BatchPutMessageTest {
short propertiesLength = (short) propertiesBytes.length;
final byte[] topicData = msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
- msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength + batchPropLen + 1) + msgLengthArr[j - 1];
+ msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength + batchPropLen) + msgLengthArr[j - 1];
j++;
}
byte[] batchMessageBody = MessageDecoder.encodeMessages(messages);
diff --git a/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java b/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java
new file mode 100644
index 000000000..415dc3811
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MessageExtBrokerInnerTest {
+ @Test
+ public void testDeleteProperty() {
+ MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
+ String propertiesString = "";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+ propertiesString = "KeyA\u0001ValueA";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+ propertiesString = "KeyA\u0001ValueA\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+ propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+ propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+
+ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+
+ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002");
+
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB");
+
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002");
+
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001");
+
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA");
+
+ propertiesString = "__CRC32#\u0001";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("__CRC32#");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEmpty();
+
+ propertiesString = "__CRC32#";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("__CRC32#");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(propertiesString);
+ }
+
+}
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index db5c5af4c..7d659d2f6 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -465,7 +465,7 @@ public class AutoSwitchHATest {
// Step2: check flag SynchronizingSyncStateSet
Assert.assertTrue(masterHAService.isSynchronizingSyncStateSet());
- Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1570);
+ Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1580);
Set<Long> syncStateSet = masterHAService.getSyncStateSet();
Assert.assertEquals(syncStateSet.size(), 2);
Assert.assertTrue(syncStateSet.contains(1L));
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
index 2e028ada3..588424304 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
@@ -74,7 +74,7 @@ public class CompositeQueueFlatFileTest {
ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer();
AppendResult result = flatFile.appendCommitLog(message);
Assert.assertEquals(AppendResult.SUCCESS, result);
- Assert.assertEquals(122L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
+ Assert.assertEquals(123L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getCommitPosition());
flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
index 1f38d4f6c..a413f2113 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
@@ -47,7 +47,7 @@ public class MessageBufferUtilTest {
+ 8 //Prepared Transaction Offset
+ 4 + 0 //BODY
+ 2 + 0 //TOPIC
- + 2 + 30 //properties
+ + 2 + 31 //properties
+ 0;
public static ByteBuffer buildMockedMessageBuffer() {
--
2.32.0.windows.2
From 48ef5ced4639699e3ba207b1a648b1fd47649a69 Mon Sep 17 00:00:00 2001
From: rongtong <jinrongtong5@163.com>
Date: Thu, 26 Oct 2023 14:43:24 +0800
Subject: [PATCH 2/2] [ISSUE #7505] Do not validate the length when deleting a
topic
---
.../rocketmq/broker/processor/AdminBrokerProcessor.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 0b7a6d206..004bf12ac 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -518,12 +518,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
String topic = requestHeader.getTopic();
- TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic);
- if (!result.isValid()) {
+
+ if (UtilAll.isBlank(topic)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark(result.getRemark());
+ response.setRemark("The specified topic is blank.");
return response;
}
+
if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) {
if (TopicValidator.isSystemTopic(topic)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -2726,7 +2727,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
final EpochEntryCache entryCache = new EpochEntryCache(brokerConfig.getBrokerClusterName(),
- brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset());
+ brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset());
response.setBody(entryCache.encode());
response.setCode(ResponseCode.SUCCESS);
--
2.32.0.windows.2