1045 lines
51 KiB
Diff
1045 lines
51 KiB
Diff
From 91349f30b96db2e16b71d65a535d81f11b60bda5 Mon Sep 17 00:00:00 2001
|
|
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
|
|
Date: Wed, 25 Oct 2023 14:54:00 +0800
|
|
Subject: [PATCH 1/2] [ISSUE #7437] Add the CRC check of commitlog (#7468)
|
|
|
|
* Added CRC32 check for full data
|
|
|
|
* add unit test
|
|
|
|
* add MessageExtBrokerInnerTest
|
|
|
|
* fix codestyle
|
|
|
|
* fix codestyle
|
|
|
|
---------
|
|
|
|
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
|
|
---
|
|
.../org/apache/rocketmq/common/UtilAll.java | 14 ++
|
|
.../rocketmq/common/message/MessageConst.java | 2 +
|
|
.../common/message/MessageDecoder.java | 32 ++-
|
|
.../common/message/MessageExtBrokerInner.java | 49 +++++
|
|
.../common/MessageExtBrokerInnerTest.java | 93 ++++++++
|
|
.../org/apache/rocketmq/store/CommitLog.java | 87 +++++++-
|
|
.../rocketmq/store/MessageExtEncoder.java | 38 +++-
|
|
.../store/config/MessageStoreConfig.java | 23 ++
|
|
.../rocketmq/store/AppendPropCRCTest.java | 200 ++++++++++++++++++
|
|
.../rocketmq/store/BatchPutMessageTest.java | 2 +-
|
|
.../store/MessageExtBrokerInnerTest.java | 105 +++++++++
|
|
.../store/ha/autoswitch/AutoSwitchHATest.java | 2 +-
|
|
.../file/CompositeQueueFlatFileTest.java | 2 +-
|
|
.../util/MessageBufferUtilTest.java | 2 +-
|
|
14 files changed, 630 insertions(+), 21 deletions(-)
|
|
create mode 100644 common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java
|
|
create mode 100644 store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
|
|
create mode 100644 store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java
|
|
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
|
|
index d2b7c374b..95b6b09b4 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
|
|
@@ -307,6 +307,20 @@ public class UtilAll {
|
|
return (int) (crc32.getValue() & 0x7FFFFFFF);
|
|
}
|
|
|
|
+ public static int crc32(ByteBuffer byteBuffer) {
|
|
+ CRC32 crc32 = new CRC32();
|
|
+ crc32.update(byteBuffer);
|
|
+ return (int) (crc32.getValue() & 0x7FFFFFFF);
|
|
+ }
|
|
+
|
|
+ public static int crc32(ByteBuffer[] byteBuffers) {
|
|
+ CRC32 crc32 = new CRC32();
|
|
+ for (ByteBuffer buffer : byteBuffers) {
|
|
+ crc32.update(buffer);
|
|
+ }
|
|
+ return (int) (crc32.getValue() & 0x7FFFFFFF);
|
|
+ }
|
|
+
|
|
public static String bytes2string(byte[] src) {
|
|
char[] hexChars = new char[src.length * 2];
|
|
for (int j = 0; j < src.length; j++) {
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
|
|
index 87fed7c19..24f7bdb99 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
|
|
@@ -97,6 +97,7 @@ public class MessageConst {
|
|
public static final String PROPERTY_TIMER_DEL_UNIQKEY = "TIMER_DEL_UNIQKEY";
|
|
public static final String PROPERTY_TIMER_DELAY_LEVEL = "TIMER_DELAY_LEVEL";
|
|
public static final String PROPERTY_TIMER_DELAY_MS = "TIMER_DELAY_MS";
|
|
+ public static final String PROPERTY_CRC32 = "__CRC32#";
|
|
|
|
/**
|
|
* properties for DLQ
|
|
@@ -155,5 +156,6 @@ public class MessageConst {
|
|
STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP);
|
|
STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_TOPIC);
|
|
STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
|
|
+ STRING_HASH_SET.add(PROPERTY_CRC32);
|
|
}
|
|
}
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
|
|
index 6de0b69fb..b053f8275 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
|
|
@@ -16,6 +16,7 @@
|
|
*/
|
|
package org.apache.rocketmq.common.message;
|
|
|
|
+import io.netty.buffer.ByteBuf;
|
|
import java.io.IOException;
|
|
import java.net.Inet4Address;
|
|
import java.net.InetAddress;
|
|
@@ -152,6 +153,34 @@ public class MessageDecoder {
|
|
return null;
|
|
}
|
|
|
|
+ public static void createCrc32(final ByteBuffer input, int crc32) {
|
|
+ input.put(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8));
|
|
+ input.put((byte) NAME_VALUE_SEPARATOR);
|
|
+ for (int i = 0; i < 10; i++) {
|
|
+ byte b = '0';
|
|
+ if (crc32 > 0) {
|
|
+ b += (byte) (crc32 % 10);
|
|
+ crc32 /= 10;
|
|
+ }
|
|
+ input.put(b);
|
|
+ }
|
|
+ input.put((byte) PROPERTY_SEPARATOR);
|
|
+ }
|
|
+
|
|
+ public static void createCrc32(final ByteBuf input, int crc32) {
|
|
+ input.writeBytes(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8));
|
|
+ input.writeByte((byte) NAME_VALUE_SEPARATOR);
|
|
+ for (int i = 0; i < 10; i++) {
|
|
+ byte b = '0';
|
|
+ if (crc32 > 0) {
|
|
+ b += (byte) (crc32 % 10);
|
|
+ crc32 /= 10;
|
|
+ }
|
|
+ input.writeByte(b);
|
|
+ }
|
|
+ input.writeByte((byte) PROPERTY_SEPARATOR);
|
|
+ }
|
|
+
|
|
public static MessageExt decode(ByteBuffer byteBuffer) {
|
|
return decode(byteBuffer, true, true, false);
|
|
}
|
|
@@ -601,9 +630,6 @@ public class MessageDecoder {
|
|
sb.append(value);
|
|
sb.append(PROPERTY_SEPARATOR);
|
|
}
|
|
- if (sb.length() > 0) {
|
|
- sb.deleteCharAt(sb.length() - 1);
|
|
- }
|
|
return sb.toString();
|
|
}
|
|
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
|
|
index 91599653c..4e5d3419a 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
|
|
@@ -20,6 +20,9 @@ import java.nio.ByteBuffer;
|
|
|
|
import org.apache.rocketmq.common.TopicFilterType;
|
|
|
|
+import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
|
|
+import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
|
|
+
|
|
public class MessageExtBrokerInner extends MessageExt {
|
|
private static final long serialVersionUID = 7256001576878700634L;
|
|
private String propertiesString;
|
|
@@ -55,6 +58,52 @@ public class MessageExtBrokerInner extends MessageExt {
|
|
this.propertiesString = propertiesString;
|
|
}
|
|
|
|
+
|
|
+ public void deleteProperty(String name) {
|
|
+ super.clearProperty(name);
|
|
+ if (propertiesString != null) {
|
|
+ int idx0 = 0;
|
|
+ int idx1;
|
|
+ int idx2;
|
|
+ idx1 = propertiesString.indexOf(name, idx0);
|
|
+ if (idx1 != -1) {
|
|
+ // cropping may be required
|
|
+ StringBuilder stringBuilder = new StringBuilder(propertiesString.length());
|
|
+ while (true) {
|
|
+ int startIdx = idx0;
|
|
+ while (true) {
|
|
+ idx1 = propertiesString.indexOf(name, startIdx);
|
|
+ if (idx1 == -1) {
|
|
+ break;
|
|
+ }
|
|
+ startIdx = idx1 + name.length();
|
|
+ if (idx1 == 0 || propertiesString.charAt(idx1 - 1) == PROPERTY_SEPARATOR) {
|
|
+ if (propertiesString.length() > idx1 + name.length()
|
|
+ && propertiesString.charAt(idx1 + name.length()) == NAME_VALUE_SEPARATOR) {
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ if (idx1 == -1) {
|
|
+ // there are no characters that need to be skipped. Append all remaining characters.
|
|
+ stringBuilder.append(propertiesString, idx0, propertiesString.length());
|
|
+ break;
|
|
+ }
|
|
+ // there are characters that need to be cropped
|
|
+ stringBuilder.append(propertiesString, idx0, idx1);
|
|
+ // move idx2 to the end of the cropped character
|
|
+ idx2 = propertiesString.indexOf(PROPERTY_SEPARATOR, idx1 + name.length() + 1);
|
|
+ // all subsequent characters will be cropped
|
|
+ if (idx2 == -1) {
|
|
+ break;
|
|
+ }
|
|
+ idx0 = idx2 + 1;
|
|
+ }
|
|
+ this.setPropertiesString(stringBuilder.toString());
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
public long getTagsCode() {
|
|
return tagsCode;
|
|
}
|
|
diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java
|
|
new file mode 100644
|
|
index 000000000..77d69e5ad
|
|
--- /dev/null
|
|
+++ b/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java
|
|
@@ -0,0 +1,93 @@
|
|
+/**
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+package org.apache.rocketmq.common;
|
|
+
|
|
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
|
+import org.junit.Test;
|
|
+
|
|
+import static org.assertj.core.api.Assertions.assertThat;
|
|
+
|
|
+public class MessageExtBrokerInnerTest {
|
|
+ @Test
|
|
+ public void testDeleteProperty() {
|
|
+ MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
|
|
+ String propertiesString = "";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
|
|
+
|
|
+ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
|
|
+
|
|
+ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
|
|
+
|
|
+ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA");
|
|
+ }
|
|
+}
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
index 93102799b..3d3ee86b8 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
@@ -73,6 +73,10 @@ public class CommitLog implements Swappable {
|
|
protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
|
|
// End of file empty MAGIC CODE cbd43194
|
|
public final static int BLANK_MAGIC_CODE = -875286124;
|
|
+ /**
|
|
+ * CRC32 Format: [PROPERTY_CRC32 + NAME_VALUE_SEPARATOR + 10-digit fixed-length string + PROPERTY_SEPARATOR]
|
|
+ */
|
|
+ public static final int CRC32_RESERVED_LEN = MessageConst.PROPERTY_CRC32.length() + 1 + 10 + 1;
|
|
protected final MappedFileQueue mappedFileQueue;
|
|
protected final DefaultMessageStore defaultMessageStore;
|
|
|
|
@@ -96,6 +100,8 @@ public class CommitLog implements Swappable {
|
|
|
|
protected int commitLogSize;
|
|
|
|
+ private final boolean enabledAppendPropCRC;
|
|
+
|
|
public CommitLog(final DefaultMessageStore messageStore) {
|
|
String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog();
|
|
if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) {
|
|
@@ -117,7 +123,9 @@ public class CommitLog implements Swappable {
|
|
putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
|
|
@Override
|
|
protected PutMessageThreadLocal initialValue() {
|
|
- return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
|
|
+ return new PutMessageThreadLocal(
|
|
+ defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(),
|
|
+ defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC());
|
|
}
|
|
};
|
|
this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
|
|
@@ -127,6 +135,8 @@ public class CommitLog implements Swappable {
|
|
this.topicQueueLock = new TopicQueueLock(messageStore.getMessageStoreConfig().getTopicQueueLockNum());
|
|
|
|
this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
|
|
+
|
|
+ this.enabledAppendPropCRC = messageStore.getMessageStoreConfig().isEnabledAppendPropCRC();
|
|
}
|
|
|
|
public void setFullStorePaths(Set<String> fullStorePaths) {
|
|
@@ -470,10 +480,16 @@ public class CommitLog implements Swappable {
|
|
byteBuffer.get(bytesContent, 0, bodyLen);
|
|
|
|
if (checkCRC) {
|
|
- int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
|
|
- if (crc != bodyCRC) {
|
|
- log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
|
|
- return new DispatchRequest(-1, false/* success */);
|
|
+ /**
|
|
+ * When the forceVerifyPropCRC = false,
|
|
+ * use original bodyCrc validation.
|
|
+ */
|
|
+ if (!this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) {
|
|
+ int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
|
|
+ if (crc != bodyCRC) {
|
|
+ log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
|
|
+ return new DispatchRequest(-1, false/* success */);
|
|
+ }
|
|
}
|
|
}
|
|
} else {
|
|
@@ -531,6 +547,43 @@ public class CommitLog implements Swappable {
|
|
}
|
|
}
|
|
|
|
+ if (checkCRC) {
|
|
+ /**
|
|
+ * When the forceVerifyPropCRC = true,
|
|
+ * Crc verification needs to be performed on the entire message data (excluding the length reserved at the tail)
|
|
+ */
|
|
+ if (this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) {
|
|
+ int expectedCRC = -1;
|
|
+ if (propertiesMap != null) {
|
|
+ String crc32Str = propertiesMap.get(MessageConst.PROPERTY_CRC32);
|
|
+ if (crc32Str != null) {
|
|
+ expectedCRC = 0;
|
|
+ for (int i = crc32Str.length() - 1; i >= 0; i--) {
|
|
+ int num = crc32Str.charAt(i) - '0';
|
|
+ expectedCRC *= 10;
|
|
+ expectedCRC += num;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ if (expectedCRC > 0) {
|
|
+ ByteBuffer tmpBuffer = byteBuffer.duplicate();
|
|
+ tmpBuffer.position(tmpBuffer.position() - totalSize);
|
|
+ tmpBuffer.limit(tmpBuffer.position() + totalSize - CommitLog.CRC32_RESERVED_LEN);
|
|
+ int crc = UtilAll.crc32(tmpBuffer);
|
|
+ if (crc != expectedCRC) {
|
|
+ log.warn(
|
|
+ "CommitLog#checkAndDispatchMessage: failed to check message CRC, expected "
|
|
+ + "CRC={}, actual CRC={}", bodyCRC, crc);
|
|
+ return new DispatchRequest(-1, false/* success */);
|
|
+ }
|
|
+ } else {
|
|
+ log.warn(
|
|
+ "CommitLog#checkAndDispatchMessage: failed to check message CRC, not found CRC in properties");
|
|
+ return new DispatchRequest(-1, false/* success */);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
int readLength = MessageExtEncoder.calMsgLength(messageVersion, sysFlag, bodyLen, topicLen, propertiesLength);
|
|
if (totalSize != readLength) {
|
|
doNothingForDeadCode(reconsumeTimes);
|
|
@@ -846,9 +899,12 @@ public class CommitLog implements Swappable {
|
|
if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
|
|
msg.setStoreTimestamp(System.currentTimeMillis());
|
|
}
|
|
-
|
|
// Set the message body CRC (consider the most appropriate setting on the client)
|
|
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
|
|
+ if (enabledAppendPropCRC) {
|
|
+ // delete crc32 properties if exist
|
|
+ msg.deleteProperty(MessageConst.PROPERTY_CRC32);
|
|
+ }
|
|
// Back to Results
|
|
AppendMessageResult result = null;
|
|
|
|
@@ -1764,6 +1820,7 @@ public class CommitLog implements Swappable {
|
|
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
|
|
// Store the message content
|
|
private final ByteBuffer msgStoreItemMemory;
|
|
+ private final int crc32ReservedLength = CommitLog.CRC32_RESERVED_LEN;
|
|
|
|
DefaultAppendMessageCallback() {
|
|
this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
|
|
@@ -1837,6 +1894,15 @@ public class CommitLog implements Swappable {
|
|
pos += 8 + 4 + 8 + ipLen;
|
|
// refresh store time stamp in lock
|
|
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
|
|
+ if (enabledAppendPropCRC) {
|
|
+ // 18 CRC32
|
|
+ int checkSize = msgLen - crc32ReservedLength;
|
|
+ ByteBuffer tmpBuffer = preEncodeBuffer.duplicate();
|
|
+ tmpBuffer.limit(tmpBuffer.position() + checkSize);
|
|
+ int crc32 = UtilAll.crc32(tmpBuffer);
|
|
+ tmpBuffer.limit(tmpBuffer.position() + crc32ReservedLength);
|
|
+ MessageDecoder.createCrc32(tmpBuffer, crc32);
|
|
+ }
|
|
|
|
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
|
|
CommitLog.this.getMessageStore().getPerfCounter().startTick("WRITE_MEMORY_TIME_MS");
|
|
@@ -1918,6 +1984,15 @@ public class CommitLog implements Swappable {
|
|
pos += 8 + 4 + 8 + bornHostLength;
|
|
// refresh store time stamp in lock
|
|
messagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp());
|
|
+ if (enabledAppendPropCRC) {
|
|
+ //append crc32
|
|
+ int checkSize = msgLen - crc32ReservedLength;
|
|
+ ByteBuffer tmpBuffer = messagesByteBuff.duplicate();
|
|
+ tmpBuffer.position(msgPos).limit(msgPos + checkSize);
|
|
+ int crc32 = UtilAll.crc32(tmpBuffer);
|
|
+ messagesByteBuff.position(msgPos + checkSize);
|
|
+ MessageDecoder.createCrc32(messagesByteBuff, crc32);
|
|
+ }
|
|
|
|
putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen;
|
|
queueOffset++;
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
|
|
index ee609a337..c1d808728 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
|
|
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store;
|
|
import io.netty.buffer.ByteBuf;
|
|
import io.netty.buffer.ByteBufAllocator;
|
|
import io.netty.buffer.UnpooledByteBufAllocator;
|
|
+import java.nio.ByteBuffer;
|
|
import org.apache.rocketmq.common.UtilAll;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.message.MessageDecoder;
|
|
@@ -29,8 +30,6 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
|
|
-import java.nio.ByteBuffer;
|
|
-
|
|
public class MessageExtEncoder {
|
|
protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
|
|
private ByteBuf byteBuf;
|
|
@@ -38,7 +37,13 @@ public class MessageExtEncoder {
|
|
private int maxMessageBodySize;
|
|
// The maximum length of the full message.
|
|
private int maxMessageSize;
|
|
+ private final int crc32ReservedLength;
|
|
+
|
|
public MessageExtEncoder(final int maxMessageBodySize) {
|
|
+ this(maxMessageBodySize, false);
|
|
+ }
|
|
+
|
|
+ public MessageExtEncoder(final int maxMessageBodySize, boolean enabledAppendPropCRC) {
|
|
ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
|
|
//Reserve 64kb for encoding buffer outside body
|
|
int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ?
|
|
@@ -46,6 +51,7 @@ public class MessageExtEncoder {
|
|
byteBuf = alloc.directBuffer(maxMessageSize);
|
|
this.maxMessageBodySize = maxMessageBodySize;
|
|
this.maxMessageSize = maxMessageSize;
|
|
+ this.crc32ReservedLength = enabledAppendPropCRC ? CommitLog.CRC32_RESERVED_LEN : 0;
|
|
}
|
|
|
|
public static int calMsgLength(MessageVersion messageVersion,
|
|
@@ -81,10 +87,13 @@ public class MessageExtEncoder {
|
|
final byte[] propertiesData =
|
|
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
|
|
|
|
- final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
|
|
+ boolean needAppendLastPropertySeparator = crc32ReservedLength > 0 && propertiesData != null && propertiesData.length > 0
|
|
+ && propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR;
|
|
+
|
|
+ final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength;
|
|
|
|
if (propertiesLength > Short.MAX_VALUE) {
|
|
- log.warn("putMessage message properties length too long. length={}", propertiesData.length);
|
|
+ log.warn("putMessage message properties length too long. length={}", propertiesLength);
|
|
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
|
|
}
|
|
|
|
@@ -160,8 +169,14 @@ public class MessageExtEncoder {
|
|
|
|
// 17 PROPERTIES
|
|
this.byteBuf.writeShort((short) propertiesLength);
|
|
- if (propertiesLength > 0)
|
|
+ if (propertiesLength > crc32ReservedLength) {
|
|
this.byteBuf.writeBytes(propertiesData);
|
|
+ }
|
|
+ if (needAppendLastPropertySeparator) {
|
|
+ this.byteBuf.writeByte((byte) MessageDecoder.PROPERTY_SEPARATOR);
|
|
+ }
|
|
+ // 18 CRC32
|
|
+ this.byteBuf.writerIndex(this.byteBuf.writerIndex() + crc32ReservedLength);
|
|
|
|
return null;
|
|
}
|
|
@@ -213,10 +228,11 @@ public class MessageExtEncoder {
|
|
final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
|
|
|
|
final int topicLength = topicData.length;
|
|
- final int topicLengthSize = messageExtBatch.getVersion().getTopicLengthSize();
|
|
int totalPropLen = needAppendLastPropertySeparator ?
|
|
- propertiesLen + batchPropLen + topicLengthSize : propertiesLen + batchPropLen;
|
|
+ propertiesLen + batchPropLen + 1 : propertiesLen + batchPropLen;
|
|
|
|
+ // properties need to add crc32
|
|
+ totalPropLen += crc32ReservedLength;
|
|
final int msgLen = calMsgLength(
|
|
messageExtBatch.getVersion(), messageExtBatch.getSysFlag(), bodyLen, topicLength, totalPropLen);
|
|
|
|
@@ -278,6 +294,7 @@ public class MessageExtEncoder {
|
|
}
|
|
this.byteBuf.writeBytes(batchPropData, 0, batchPropLen);
|
|
}
|
|
+ this.byteBuf.writerIndex(this.byteBuf.writerIndex() + crc32ReservedLength);
|
|
}
|
|
putMessageContext.setBatchSize(batchSize);
|
|
putMessageContext.setPhyPos(new long[batchSize]);
|
|
@@ -304,8 +321,13 @@ public class MessageExtEncoder {
|
|
static class PutMessageThreadLocal {
|
|
private final MessageExtEncoder encoder;
|
|
private final StringBuilder keyBuilder;
|
|
+
|
|
PutMessageThreadLocal(int size) {
|
|
- encoder = new MessageExtEncoder(size);
|
|
+ this(size, false);
|
|
+ }
|
|
+
|
|
+ PutMessageThreadLocal(int size, boolean enabledAppendPropCRC) {
|
|
+ encoder = new MessageExtEncoder(size, enabledAppendPropCRC);
|
|
keyBuilder = new StringBuilder();
|
|
}
|
|
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
|
|
index 028facbdc..8cb3ea6e9 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
|
|
@@ -270,6 +270,12 @@ public class MessageStoreConfig {
|
|
*/
|
|
private boolean autoMessageVersionOnTopicLen = true;
|
|
|
|
+ /**
|
|
+ * It cannot be changed after the broker is started.
|
|
+ * Modifications need to be restarted to take effect.
|
|
+ */
|
|
+ private boolean enabledAppendPropCRC = false;
|
|
+ private boolean forceVerifyPropCRC = false;
|
|
private int travelCqFileNumWhenGetMessage = 1;
|
|
// Sleep interval between to corrections
|
|
private int correctLogicMinOffsetSleepInterval = 1;
|
|
@@ -405,6 +411,14 @@ public class MessageStoreConfig {
|
|
|
|
private int topicQueueLockNum = 32;
|
|
|
|
+ public boolean isEnabledAppendPropCRC() {
|
|
+ return enabledAppendPropCRC;
|
|
+ }
|
|
+
|
|
+ public void setEnabledAppendPropCRC(boolean enabledAppendPropCRC) {
|
|
+ this.enabledAppendPropCRC = enabledAppendPropCRC;
|
|
+ }
|
|
+
|
|
public boolean isDebugLockEnable() {
|
|
return debugLockEnable;
|
|
}
|
|
@@ -640,6 +654,15 @@ public class MessageStoreConfig {
|
|
this.checkCRCOnRecover = checkCRCOnRecover;
|
|
}
|
|
|
|
+ public boolean isForceVerifyPropCRC() {
|
|
+ return forceVerifyPropCRC;
|
|
+ }
|
|
+
|
|
+ public void setForceVerifyPropCRC(boolean forceVerifyPropCRC) {
|
|
+ this.forceVerifyPropCRC = forceVerifyPropCRC;
|
|
+ }
|
|
+
|
|
+
|
|
public String getStorePathCommitLog() {
|
|
if (storePathCommitLog == null) {
|
|
return storePathRootDir + File.separator + "commitlog";
|
|
diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
|
|
new file mode 100644
|
|
index 000000000..c8ed4d74d
|
|
--- /dev/null
|
|
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
|
|
@@ -0,0 +1,200 @@
|
|
+/**
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+
|
|
+package org.apache.rocketmq.store;
|
|
+
|
|
+import java.io.File;
|
|
+import java.net.InetSocketAddress;
|
|
+import java.nio.ByteBuffer;
|
|
+import java.util.ArrayList;
|
|
+import java.util.HashSet;
|
|
+import java.util.List;
|
|
+import java.util.Set;
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
+import org.apache.rocketmq.common.BrokerConfig;
|
|
+import org.apache.rocketmq.common.UtilAll;
|
|
+import org.apache.rocketmq.common.message.Message;
|
|
+import org.apache.rocketmq.common.message.MessageDecoder;
|
|
+import org.apache.rocketmq.common.message.MessageExt;
|
|
+import org.apache.rocketmq.common.message.MessageExtBatch;
|
|
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
|
+import org.apache.rocketmq.store.config.MessageStoreConfig;
|
|
+import org.junit.After;
|
|
+import org.junit.Before;
|
|
+import org.junit.Test;
|
|
+
|
|
+import static org.junit.Assert.assertEquals;
|
|
+import static org.junit.Assert.assertFalse;
|
|
+import static org.junit.Assert.assertTrue;
|
|
+
|
|
+public class AppendPropCRCTest {
|
|
+
|
|
+ AppendMessageCallback callback;
|
|
+
|
|
+ MessageExtEncoder encoder;
|
|
+
|
|
+ CommitLog commitLog;
|
|
+
|
|
+ @Before
|
|
+ public void init() throws Exception {
|
|
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
|
|
+ messageStoreConfig.setMappedFileSizeCommitLog(1024 * 8);
|
|
+ messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4);
|
|
+ messageStoreConfig.setMaxHashSlotNum(100);
|
|
+ messageStoreConfig.setMaxIndexNum(100 * 10);
|
|
+ messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore");
|
|
+ messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog");
|
|
+ messageStoreConfig.setForceVerifyPropCRC(true);
|
|
+ messageStoreConfig.setEnabledAppendPropCRC(true);
|
|
+ //too much reference
|
|
+ DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>());
|
|
+ commitLog = new CommitLog(messageStore);
|
|
+ encoder = new MessageExtEncoder(10 * 1024 * 1024, true);
|
|
+ callback = commitLog.new DefaultAppendMessageCallback();
|
|
+ }
|
|
+
|
|
+ @After
|
|
+ public void destroy() {
|
|
+ UtilAll.deleteFile(new File(System.getProperty("user.home") + File.separator + "unitteststore"));
|
|
+ }
|
|
+
|
|
+ @Test
|
|
+ public void testAppendMessageSucc() throws Exception {
|
|
+ String topic = "test-topic";
|
|
+ int queue = 0;
|
|
+ int msgNum = 10;
|
|
+ int propertiesLen = 0;
|
|
+ Message msg = new Message();
|
|
+ msg.setBody("body".getBytes());
|
|
+ msg.setTopic(topic);
|
|
+ msg.setTags("abc");
|
|
+ msg.putUserProperty("a", "aaaaaaaa");
|
|
+ msg.putUserProperty("b", "bbbbbbbb");
|
|
+ msg.putUserProperty("c", "cccccccc");
|
|
+ msg.putUserProperty("d", "dddddddd");
|
|
+ msg.putUserProperty("e", "eeeeeeee");
|
|
+ msg.putUserProperty("f", "ffffffff");
|
|
+
|
|
+ MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
|
|
+ messageExtBrokerInner.setTopic(topic);
|
|
+ messageExtBrokerInner.setQueueId(queue);
|
|
+ messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis());
|
|
+ messageExtBrokerInner.setBornHost(new InetSocketAddress("127.0.0.1", 123));
|
|
+ messageExtBrokerInner.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
|
|
+ messageExtBrokerInner.setBody(msg.getBody());
|
|
+ messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
|
|
+ propertiesLen = messageExtBrokerInner.getPropertiesString().length();
|
|
+
|
|
+ ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
|
|
+ for (int i = 0; i < msgNum; i++) {
|
|
+ encoder.encode(messageExtBrokerInner);
|
|
+ messageExtBrokerInner.setEncodedBuff(encoder.getEncoderBuffer());
|
|
+ AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBrokerInner, null);
|
|
+ assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
|
|
+ }
|
|
+ // Expected to pass when message is not modified
|
|
+ buff.flip();
|
|
+ for (int i = 0; i < msgNum - 1; i++) {
|
|
+ DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false);
|
|
+ assertTrue(request.isSuccess());
|
|
+ }
|
|
+ // Modify the properties of the last message and expect the verification to fail.
|
|
+ int idx = buff.limit() - (propertiesLen / 2);
|
|
+ buff.put(idx, (byte) (buff.get(idx) + 1));
|
|
+ DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false);
|
|
+ assertFalse(request.isSuccess());
|
|
+ }
|
|
+
|
|
+ @Test
|
|
+ public void testAppendMessageBatchSucc() throws Exception {
|
|
+ List<Message> messages = new ArrayList<>();
|
|
+ String topic = "test-topic";
|
|
+ int queue = 0;
|
|
+ int propertiesLen = 0;
|
|
+ for (int i = 0; i < 10; i++) {
|
|
+ Message msg = new Message();
|
|
+ msg.setBody("body".getBytes());
|
|
+ msg.setTopic(topic);
|
|
+ msg.setTags("abc");
|
|
+ msg.putUserProperty("a", "aaaaaaaa");
|
|
+ msg.putUserProperty("b", "bbbbbbbb");
|
|
+ msg.putUserProperty("c", "cccccccc");
|
|
+ msg.putUserProperty("d", "dddddddd");
|
|
+ msg.putUserProperty("e", "eeeeeeee");
|
|
+ msg.putUserProperty("f", "ffffffff");
|
|
+ String propertiesString = MessageDecoder.messageProperties2String(msg.getProperties());
|
|
+ propertiesLen = propertiesString.length();
|
|
+ messages.add(msg);
|
|
+ }
|
|
+ MessageExtBatch messageExtBatch = new MessageExtBatch();
|
|
+ messageExtBatch.setTopic(topic);
|
|
+ messageExtBatch.setQueueId(queue);
|
|
+ messageExtBatch.setBornTimestamp(System.currentTimeMillis());
|
|
+ messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 123));
|
|
+ messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
|
|
+ messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
|
|
+
|
|
+ PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
|
|
+ messageExtBatch.setEncodedBuff(encoder.encode(messageExtBatch, putMessageContext));
|
|
+ ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
|
|
+ //encounter end of file when append half of the data
|
|
+ AppendMessageResult allresult =
|
|
+ callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext);
|
|
+
|
|
+ assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
|
|
+ assertEquals(0, allresult.getWroteOffset());
|
|
+ assertEquals(0, allresult.getLogicsOffset());
|
|
+ assertEquals(buff.position(), allresult.getWroteBytes());
|
|
+
|
|
+ assertEquals(messages.size(), allresult.getMsgNum());
|
|
+
|
|
+ Set<String> msgIds = new HashSet<>();
|
|
+ for (String msgId : allresult.getMsgId().split(",")) {
|
|
+ assertEquals(32, msgId.length());
|
|
+ msgIds.add(msgId);
|
|
+ }
|
|
+ assertEquals(messages.size(), msgIds.size());
|
|
+
|
|
+ List<MessageExt> decodeMsgs = MessageDecoder.decodes((ByteBuffer) buff.flip());
|
|
+ assertEquals(decodeMsgs.size(), decodeMsgs.size());
|
|
+ long queueOffset = decodeMsgs.get(0).getQueueOffset();
|
|
+ long storeTimeStamp = decodeMsgs.get(0).getStoreTimestamp();
|
|
+ for (int i = 0; i < messages.size(); i++) {
|
|
+ assertEquals(messages.get(i).getTopic(), decodeMsgs.get(i).getTopic());
|
|
+ assertEquals(new String(messages.get(i).getBody()), new String(decodeMsgs.get(i).getBody()));
|
|
+ assertEquals(messages.get(i).getTags(), decodeMsgs.get(i).getTags());
|
|
+
|
|
+ assertEquals(messageExtBatch.getBornHostNameString(), decodeMsgs.get(i).getBornHostNameString());
|
|
+
|
|
+ assertEquals(messageExtBatch.getBornTimestamp(), decodeMsgs.get(i).getBornTimestamp());
|
|
+ assertEquals(storeTimeStamp, decodeMsgs.get(i).getStoreTimestamp());
|
|
+ assertEquals(queueOffset++, decodeMsgs.get(i).getQueueOffset());
|
|
+ }
|
|
+
|
|
+ // Expected to pass when message is not modified
|
|
+ buff.flip();
|
|
+ for (int i = 0; i < messages.size() - 1; i++) {
|
|
+ DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false);
|
|
+ assertTrue(request.isSuccess());
|
|
+ }
|
|
+ // Modify the properties of the last message and expect the verification to fail.
|
|
+ int idx = buff.limit() - (propertiesLen / 2);
|
|
+ buff.put(idx, (byte) (buff.get(idx) + 1));
|
|
+ DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false);
|
|
+ assertFalse(request.isSuccess());
|
|
+ }
|
|
+}
|
|
diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
|
|
index 43ca38eb4..768029ca1 100644
|
|
--- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
|
|
+++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
|
|
@@ -108,7 +108,7 @@ public class BatchPutMessageTest {
|
|
short propertiesLength = (short) propertiesBytes.length;
|
|
final byte[] topicData = msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
|
|
final int topicLength = topicData.length;
|
|
- msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength + batchPropLen + 1) + msgLengthArr[j - 1];
|
|
+ msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength + batchPropLen) + msgLengthArr[j - 1];
|
|
j++;
|
|
}
|
|
byte[] batchMessageBody = MessageDecoder.encodeMessages(messages);
|
|
diff --git a/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java b/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java
|
|
new file mode 100644
|
|
index 000000000..415dc3811
|
|
--- /dev/null
|
|
+++ b/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java
|
|
@@ -0,0 +1,105 @@
|
|
+/**
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+
|
|
+package org.apache.rocketmq.store;
|
|
+
|
|
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
|
+import org.junit.Test;
|
|
+
|
|
+import static org.assertj.core.api.Assertions.assertThat;
|
|
+
|
|
+public class MessageExtBrokerInnerTest {
|
|
+ @Test
|
|
+ public void testDeleteProperty() {
|
|
+ MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
|
|
+ String propertiesString = "";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
|
|
+
|
|
+ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
|
|
+
|
|
+ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
|
|
+
|
|
+ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001");
|
|
+
|
|
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("KeyA");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA");
|
|
+
|
|
+ propertiesString = "__CRC32#\u0001";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("__CRC32#");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEmpty();
|
|
+
|
|
+ propertiesString = "__CRC32#";
|
|
+ messageExtBrokerInner.setPropertiesString(propertiesString);
|
|
+ messageExtBrokerInner.deleteProperty("__CRC32#");
|
|
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(propertiesString);
|
|
+ }
|
|
+
|
|
+}
|
|
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
|
|
index db5c5af4c..7d659d2f6 100644
|
|
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
|
|
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
|
|
@@ -465,7 +465,7 @@ public class AutoSwitchHATest {
|
|
|
|
// Step2: check flag SynchronizingSyncStateSet
|
|
Assert.assertTrue(masterHAService.isSynchronizingSyncStateSet());
|
|
- Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1570);
|
|
+ Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1580);
|
|
Set<Long> syncStateSet = masterHAService.getSyncStateSet();
|
|
Assert.assertEquals(syncStateSet.size(), 2);
|
|
Assert.assertTrue(syncStateSet.contains(1L));
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
|
|
index 2e028ada3..588424304 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
|
|
@@ -74,7 +74,7 @@ public class CompositeQueueFlatFileTest {
|
|
ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer();
|
|
AppendResult result = flatFile.appendCommitLog(message);
|
|
Assert.assertEquals(AppendResult.SUCCESS, result);
|
|
- Assert.assertEquals(122L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
|
|
+ Assert.assertEquals(123L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
|
|
Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getCommitPosition());
|
|
|
|
flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq);
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
|
|
index 1f38d4f6c..a413f2113 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
|
|
@@ -47,7 +47,7 @@ public class MessageBufferUtilTest {
|
|
+ 8 //Prepared Transaction Offset
|
|
+ 4 + 0 //BODY
|
|
+ 2 + 0 //TOPIC
|
|
- + 2 + 30 //properties
|
|
+ + 2 + 31 //properties
|
|
+ 0;
|
|
|
|
public static ByteBuffer buildMockedMessageBuffer() {
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 48ef5ced4639699e3ba207b1a648b1fd47649a69 Mon Sep 17 00:00:00 2001
|
|
From: rongtong <jinrongtong5@163.com>
|
|
Date: Thu, 26 Oct 2023 14:43:24 +0800
|
|
Subject: [PATCH 2/2] [ISSUE #7505] Do not validate the length when deleting a
|
|
topic
|
|
|
|
---
|
|
.../rocketmq/broker/processor/AdminBrokerProcessor.java | 9 +++++----
|
|
1 file changed, 5 insertions(+), 4 deletions(-)
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
|
index 0b7a6d206..004bf12ac 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
|
@@ -518,12 +518,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
|
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
|
|
|
|
String topic = requestHeader.getTopic();
|
|
- TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic);
|
|
- if (!result.isValid()) {
|
|
+
|
|
+ if (UtilAll.isBlank(topic)) {
|
|
response.setCode(ResponseCode.SYSTEM_ERROR);
|
|
- response.setRemark(result.getRemark());
|
|
+ response.setRemark("The specified topic is blank.");
|
|
return response;
|
|
}
|
|
+
|
|
if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) {
|
|
if (TopicValidator.isSystemTopic(topic)) {
|
|
response.setCode(ResponseCode.SYSTEM_ERROR);
|
|
@@ -2726,7 +2727,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
|
return response;
|
|
}
|
|
final EpochEntryCache entryCache = new EpochEntryCache(brokerConfig.getBrokerClusterName(),
|
|
- brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset());
|
|
+ brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset());
|
|
|
|
response.setBody(entryCache.encode());
|
|
response.setCode(ResponseCode.SUCCESS);
|
|
--
|
|
2.32.0.windows.2
|
|
|