backport some enhancement of tiered storage
This commit is contained in:
parent
3260b3ca94
commit
d79c72c4c5
601
patch018-backport-enhancement-of-tiered-storage.patch
Normal file
601
patch018-backport-enhancement-of-tiered-storage.patch
Normal file
@ -0,0 +1,601 @@
|
||||
From 1a8e7cb17cb29ed33b0196b52e452a6e76ade781 Mon Sep 17 00:00:00 2001
|
||||
From: yuz10 <845238369@qq.com>
|
||||
Date: Tue, 12 Sep 2023 19:33:41 +0800
|
||||
Subject: [PATCH 1/5] [ISSUE #7345] Fix wrong result of searchOffset in tiered
|
||||
storage
|
||||
|
||||
---
|
||||
.../tieredstore/file/TieredFlatFile.java | 5 +-
|
||||
.../tieredstore/file/TieredFlatFileTest.java | 46 +++++++++++++++++--
|
||||
2 files changed, 46 insertions(+), 5 deletions(-)
|
||||
|
||||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
||||
index 426c4e09d..d973179ee 100644
|
||||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
||||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
||||
@@ -365,7 +365,10 @@ public class TieredFlatFile {
|
||||
if (!segmentList.isEmpty()) {
|
||||
return boundaryType == BoundaryType.UPPER ? segmentList.get(0) : segmentList.get(segmentList.size() - 1);
|
||||
}
|
||||
- return fileSegmentList.isEmpty() ? null : fileSegmentList.get(fileSegmentList.size() - 1);
|
||||
+ if (fileSegmentList.isEmpty()) {
|
||||
+ return null;
|
||||
+ }
|
||||
+ return boundaryType == BoundaryType.UPPER ? fileSegmentList.get(fileSegmentList.size() - 1) : fileSegmentList.get(0);
|
||||
} finally {
|
||||
fileSegmentLock.readLock().unlock();
|
||||
}
|
||||
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
|
||||
index 7a4d05969..7e2fbf201 100644
|
||||
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
|
||||
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
|
||||
@@ -16,10 +16,7 @@
|
||||
*/
|
||||
package org.apache.rocketmq.tieredstore.file;
|
||||
|
||||
-import java.io.IOException;
|
||||
-import java.nio.ByteBuffer;
|
||||
-import java.util.ArrayList;
|
||||
-import java.util.List;
|
||||
+import org.apache.rocketmq.common.BoundaryType;
|
||||
import org.apache.rocketmq.common.message.MessageQueue;
|
||||
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
|
||||
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
|
||||
@@ -35,6 +32,11 @@ import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
+import java.io.IOException;
|
||||
+import java.nio.ByteBuffer;
|
||||
+import java.util.ArrayList;
|
||||
+import java.util.List;
|
||||
+
|
||||
public class TieredFlatFileTest {
|
||||
|
||||
private final String storePath = TieredStoreTestUtil.getRandomStorePath();
|
||||
@@ -301,4 +303,40 @@ public class TieredFlatFileTest {
|
||||
fileQueue.rollingNewFile();
|
||||
Assert.assertEquals(2, fileQueue.getFileSegmentCount());
|
||||
}
|
||||
+
|
||||
+ @Test
|
||||
+ public void testGetFileByTime() {
|
||||
+ String filePath = TieredStoreUtil.toPath(queue);
|
||||
+ TieredFlatFile tieredFlatFile = fileQueueFactory.createFlatFileForCommitLog(filePath);
|
||||
+ TieredFileSegment fileSegment1 = new MemoryFileSegment(FileSegmentType.CONSUME_QUEUE, queue, 1100, storeConfig);
|
||||
+ fileSegment1.setMinTimestamp(100);
|
||||
+ fileSegment1.setMaxTimestamp(200);
|
||||
+
|
||||
+ TieredFileSegment fileSegment2 = new MemoryFileSegment(FileSegmentType.CONSUME_QUEUE, queue, 1100, storeConfig);
|
||||
+ fileSegment2.setMinTimestamp(200);
|
||||
+ fileSegment2.setMaxTimestamp(300);
|
||||
+
|
||||
+ tieredFlatFile.getFileSegmentList().add(fileSegment1);
|
||||
+ tieredFlatFile.getFileSegmentList().add(fileSegment2);
|
||||
+
|
||||
+ TieredFileSegment segmentUpper = tieredFlatFile.getFileByTime(400, BoundaryType.UPPER);
|
||||
+ Assert.assertEquals(fileSegment2, segmentUpper);
|
||||
+
|
||||
+ TieredFileSegment segmentLower = tieredFlatFile.getFileByTime(400, BoundaryType.LOWER);
|
||||
+ Assert.assertEquals(fileSegment2, segmentLower);
|
||||
+
|
||||
+
|
||||
+ TieredFileSegment segmentUpper2 = tieredFlatFile.getFileByTime(0, BoundaryType.UPPER);
|
||||
+ Assert.assertEquals(fileSegment1, segmentUpper2);
|
||||
+
|
||||
+ TieredFileSegment segmentLower2 = tieredFlatFile.getFileByTime(0, BoundaryType.LOWER);
|
||||
+ Assert.assertEquals(fileSegment1, segmentLower2);
|
||||
+
|
||||
+
|
||||
+ TieredFileSegment segmentUpper3 = tieredFlatFile.getFileByTime(200, BoundaryType.UPPER);
|
||||
+ Assert.assertEquals(fileSegment1, segmentUpper3);
|
||||
+
|
||||
+ TieredFileSegment segmentLower3 = tieredFlatFile.getFileByTime(200, BoundaryType.LOWER);
|
||||
+ Assert.assertEquals(fileSegment2, segmentLower3);
|
||||
+ }
|
||||
}
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From fd32dae2ab59f86dd215eeec405bf4fa6212bcb3 Mon Sep 17 00:00:00 2001
|
||||
From: lizhimins <707364882@qq.com>
|
||||
Date: Tue, 12 Sep 2023 19:58:08 +0800
|
||||
Subject: [PATCH 2/5] [ISSUE #6633] Not clear uninitialized files and fix
|
||||
metadata recover (#7342)
|
||||
|
||||
---
|
||||
.../tieredstore/file/TieredFlatFile.java | 53 +++++++------------
|
||||
.../file/TieredFlatFileManager.java | 10 ++--
|
||||
2 files changed, 22 insertions(+), 41 deletions(-)
|
||||
|
||||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
||||
index d973179ee..d96eb6e8f 100644
|
||||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
||||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
||||
@@ -16,7 +16,6 @@
|
||||
*/
|
||||
package org.apache.rocketmq.tieredstore.file;
|
||||
|
||||
-import com.alibaba.fastjson.JSON;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
@@ -25,13 +24,13 @@ import java.util.Comparator;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
-import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nullable;
|
||||
+import org.apache.rocketmq.common.BoundaryType;
|
||||
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||||
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||||
import org.apache.rocketmq.tieredstore.common.AppendResult;
|
||||
@@ -43,7 +42,6 @@ import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
|
||||
import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator;
|
||||
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
|
||||
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
||||
-import org.apache.rocketmq.common.BoundaryType;
|
||||
|
||||
public class TieredFlatFile {
|
||||
|
||||
@@ -177,7 +175,10 @@ public class TieredFlatFile {
|
||||
}
|
||||
}
|
||||
|
||||
- private FileSegmentMetadata getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) {
|
||||
+ /**
|
||||
+ * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full
|
||||
+ */
|
||||
+ public void updateFileSegment(TieredFileSegment fileSegment) {
|
||||
|
||||
FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
|
||||
this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset());
|
||||
@@ -186,45 +187,24 @@ public class TieredFlatFile {
|
||||
if (metadata == null) {
|
||||
metadata = new FileSegmentMetadata(
|
||||
this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType());
|
||||
- metadata.setCreateTimestamp(fileSegment.getMinTimestamp());
|
||||
- metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
|
||||
- metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
|
||||
- if (fileSegment.isClosed()) {
|
||||
- metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
|
||||
- }
|
||||
- this.tieredMetadataStore.updateFileSegment(metadata);
|
||||
+ metadata.setCreateTimestamp(System.currentTimeMillis());
|
||||
}
|
||||
- return metadata;
|
||||
- }
|
||||
-
|
||||
- /**
|
||||
- * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full
|
||||
- */
|
||||
- public void updateFileSegment(TieredFileSegment fileSegment) {
|
||||
- FileSegmentMetadata segmentMetadata = getOrCreateFileSegmentMetadata(fileSegment);
|
||||
|
||||
- if (segmentMetadata.getStatus() == FileSegmentMetadata.STATUS_NEW
|
||||
- && fileSegment.isFull()
|
||||
- && !fileSegment.needCommit()) {
|
||||
+ metadata.setSize(fileSegment.getCommitPosition());
|
||||
+ metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
|
||||
+ metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
|
||||
|
||||
- segmentMetadata.markSealed();
|
||||
+ if (fileSegment.isFull() && !fileSegment.needCommit()) {
|
||||
+ if (metadata.getStatus() == FileSegmentMetadata.STATUS_NEW) {
|
||||
+ metadata.markSealed();
|
||||
+ }
|
||||
}
|
||||
|
||||
if (fileSegment.isClosed()) {
|
||||
- segmentMetadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
|
||||
+ metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
|
||||
}
|
||||
|
||||
- segmentMetadata.setSize(fileSegment.getCommitPosition());
|
||||
- segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp());
|
||||
-
|
||||
- FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
|
||||
- this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset());
|
||||
-
|
||||
- if (!Objects.equals(metadata, segmentMetadata)) {
|
||||
- this.tieredMetadataStore.updateFileSegment(segmentMetadata);
|
||||
- logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {}, content: {}",
|
||||
- segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata));
|
||||
- }
|
||||
+ this.tieredMetadataStore.updateFileSegment(metadata);
|
||||
}
|
||||
|
||||
private void checkAndFixFileSize() {
|
||||
@@ -598,6 +578,9 @@ public class TieredFlatFile {
|
||||
logger.error("TieredFlatFile#destroy: mark file segment: {} is deleted failed", fileSegment.getPath(), e);
|
||||
}
|
||||
fileSegment.destroyFile();
|
||||
+ if (!fileSegment.exists()) {
|
||||
+ tieredMetadataStore.deleteFileSegment(filePath, fileType, fileSegment.getBaseOffset());
|
||||
+ }
|
||||
}
|
||||
fileSegmentList.clear();
|
||||
needCommitFileSegmentList.clear();
|
||||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
|
||||
index 7c744af3b..087ea8c9c 100644
|
||||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
|
||||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
|
||||
@@ -136,15 +136,13 @@ public class TieredFlatFileManager {
|
||||
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
|
||||
for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) {
|
||||
TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> {
|
||||
- flatFile.getCompositeFlatFileLock().lock();
|
||||
try {
|
||||
+ flatFile.getCompositeFlatFileLock().lock();
|
||||
flatFile.cleanExpiredFile(expiredTimeStamp);
|
||||
flatFile.destroyExpiredFile();
|
||||
- if (flatFile.getConsumeQueueBaseOffset() == -1) {
|
||||
- logger.info("Clean flatFile because file not initialized, topic={}, queueId={}",
|
||||
- flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId());
|
||||
- destroyCompositeFile(flatFile.getMessageQueue());
|
||||
- }
|
||||
+ } catch (Throwable t) {
|
||||
+ logger.error("Do Clean expired file error, topic={}, queueId={}",
|
||||
+ flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), t);
|
||||
} finally {
|
||||
flatFile.getCompositeFlatFileLock().unlock();
|
||||
}
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 4a8e0d5b851d1f9573cda79b7d2e42ee498809da Mon Sep 17 00:00:00 2001
|
||||
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
|
||||
Date: Wed, 13 Sep 2023 16:08:03 +0800
|
||||
Subject: [PATCH 3/5] [ISSUE #7351] Allow mqadmin to operate slave nodes
|
||||
|
||||
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
|
||||
---
|
||||
.../processor/AdminBrokerProcessor.java | 12 --
|
||||
.../processor/AdminBrokerProcessorTest.java | 106 ------------------
|
||||
2 files changed, 118 deletions(-)
|
||||
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||||
index 8fbcd3c94..9e48431be 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||||
@@ -406,9 +406,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
||||
private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
|
||||
RemotingCommand request) throws RemotingCommandException {
|
||||
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
|
||||
- if (validateSlave(response)) {
|
||||
- return response;
|
||||
- }
|
||||
final CreateTopicRequestHeader requestHeader =
|
||||
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
|
||||
|
||||
@@ -519,9 +516,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
||||
private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
|
||||
RemotingCommand request) throws RemotingCommandException {
|
||||
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
|
||||
- if (validateSlave(response)) {
|
||||
- return response;
|
||||
- }
|
||||
DeleteTopicRequestHeader requestHeader =
|
||||
(DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
|
||||
|
||||
@@ -1413,9 +1407,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
||||
private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request)
|
||||
throws RemotingCommandException {
|
||||
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
|
||||
- if (validateSlave(response)) {
|
||||
- return response;
|
||||
- }
|
||||
|
||||
LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroup called by {}",
|
||||
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
|
||||
@@ -1480,9 +1471,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
||||
private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx,
|
||||
RemotingCommand request) throws RemotingCommandException {
|
||||
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
|
||||
- if (validateSlave(response)) {
|
||||
- return response;
|
||||
- }
|
||||
DeleteSubscriptionGroupRequestHeader requestHeader =
|
||||
(DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);
|
||||
|
||||
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
||||
index 9d17011b6..ec252cece 100644
|
||||
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
||||
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
||||
@@ -76,7 +76,6 @@ import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfi
|
||||
import org.apache.rocketmq.store.DefaultMessageStore;
|
||||
import org.apache.rocketmq.store.MessageStore;
|
||||
import org.apache.rocketmq.store.SelectMappedBufferResult;
|
||||
-import org.apache.rocketmq.store.config.BrokerRole;
|
||||
import org.apache.rocketmq.store.config.MessageStoreConfig;
|
||||
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
|
||||
import org.apache.rocketmq.store.stats.BrokerStats;
|
||||
@@ -250,32 +249,6 @@ public class AdminBrokerProcessorTest {
|
||||
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
|
||||
}
|
||||
|
||||
- @Test
|
||||
- public void testUpdateAndCreateTopicOnSlaveInRocksdb() throws Exception {
|
||||
- if (notToBeExecuted()) {
|
||||
- return;
|
||||
- }
|
||||
- initRocksdbTopicManager();
|
||||
- testUpdateAndCreateTopicOnSlave();
|
||||
- }
|
||||
-
|
||||
- @Test
|
||||
- public void testUpdateAndCreateTopicOnSlave() throws Exception {
|
||||
- // setup
|
||||
- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
|
||||
- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
|
||||
- defaultMessageStore = mock(DefaultMessageStore.class);
|
||||
- when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
|
||||
-
|
||||
- // test on slave
|
||||
- String topic = "TEST_CREATE_TOPIC";
|
||||
- RemotingCommand request = buildCreateTopicRequest(topic);
|
||||
- RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
|
||||
- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
|
||||
- assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " +
|
||||
- "please execute it from master broker.");
|
||||
- }
|
||||
-
|
||||
@Test
|
||||
public void testDeleteTopicInRocksdb() throws Exception {
|
||||
if (notToBeExecuted()) {
|
||||
@@ -301,31 +274,6 @@ public class AdminBrokerProcessorTest {
|
||||
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
|
||||
}
|
||||
|
||||
- @Test
|
||||
- public void testDeleteTopicOnSlaveInRocksdb() throws Exception {
|
||||
- if (notToBeExecuted()) {
|
||||
- return;
|
||||
- }
|
||||
- initRocksdbTopicManager();
|
||||
- testDeleteTopicOnSlave();
|
||||
- }
|
||||
-
|
||||
- @Test
|
||||
- public void testDeleteTopicOnSlave() throws Exception {
|
||||
- // setup
|
||||
- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
|
||||
- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
|
||||
- defaultMessageStore = mock(DefaultMessageStore.class);
|
||||
- when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
|
||||
-
|
||||
- String topic = "TEST_DELETE_TOPIC";
|
||||
- RemotingCommand request = buildDeleteTopicRequest(topic);
|
||||
- RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
|
||||
- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
|
||||
- assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " +
|
||||
- "please execute it from master broker.");
|
||||
- }
|
||||
-
|
||||
@Test
|
||||
public void testDeleteWithPopRetryTopic() throws Exception {
|
||||
String topic = "topicA";
|
||||
@@ -538,36 +486,6 @@ public class AdminBrokerProcessorTest {
|
||||
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
|
||||
}
|
||||
|
||||
- @Test
|
||||
- public void testUpdateAndCreateSubscriptionGroupOnSlaveInRocksdb() throws Exception {
|
||||
- initRocksdbSubscriptionManager();
|
||||
- testUpdateAndCreateSubscriptionGroupOnSlave();
|
||||
- }
|
||||
-
|
||||
- @Test
|
||||
- public void testUpdateAndCreateSubscriptionGroupOnSlave() throws RemotingCommandException {
|
||||
- // Setup
|
||||
- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
|
||||
- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
|
||||
- defaultMessageStore = mock(DefaultMessageStore.class);
|
||||
- when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
|
||||
-
|
||||
- // Test
|
||||
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
|
||||
- SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
|
||||
- subscriptionGroupConfig.setBrokerId(1);
|
||||
- subscriptionGroupConfig.setGroupName("groupId");
|
||||
- subscriptionGroupConfig.setConsumeEnable(Boolean.TRUE);
|
||||
- subscriptionGroupConfig.setConsumeBroadcastEnable(Boolean.TRUE);
|
||||
- subscriptionGroupConfig.setRetryMaxTimes(111);
|
||||
- subscriptionGroupConfig.setConsumeFromMinEnable(Boolean.TRUE);
|
||||
- request.setBody(JSON.toJSON(subscriptionGroupConfig).toString().getBytes());
|
||||
- RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
|
||||
- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
|
||||
- assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " +
|
||||
- "please execute it from master broker.");
|
||||
- }
|
||||
-
|
||||
@Test
|
||||
public void testGetAllSubscriptionGroupInRocksdb() throws Exception {
|
||||
initRocksdbSubscriptionManager();
|
||||
@@ -596,30 +514,6 @@ public class AdminBrokerProcessorTest {
|
||||
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
|
||||
}
|
||||
|
||||
- @Test
|
||||
- public void testDeleteSubscriptionGroupOnSlaveInRocksdb() throws Exception {
|
||||
- initRocksdbSubscriptionManager();
|
||||
- testDeleteSubscriptionGroupOnSlave();
|
||||
- }
|
||||
-
|
||||
- @Test
|
||||
- public void testDeleteSubscriptionGroupOnSlave() throws RemotingCommandException {
|
||||
- // Setup
|
||||
- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
|
||||
- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
|
||||
- defaultMessageStore = mock(DefaultMessageStore.class);
|
||||
- when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
|
||||
-
|
||||
- // Test
|
||||
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, null);
|
||||
- request.addExtField("groupName", "GID-Group-Name");
|
||||
- request.addExtField("removeOffset", "true");
|
||||
- RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
|
||||
- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
|
||||
- assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " +
|
||||
- "please execute it from master broker.");
|
||||
- }
|
||||
-
|
||||
@Test
|
||||
public void testGetTopicStatsInfo() throws RemotingCommandException {
|
||||
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, null);
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 831fcc76cd7cd362bb6c136c287c624bb7eaf40a Mon Sep 17 00:00:00 2001
|
||||
From: lizhimins <707364882@qq.com>
|
||||
Date: Tue, 19 Sep 2023 10:04:04 +0800
|
||||
Subject: [PATCH 4/5] [ISSUE #7363] Fix get message from tiered storage return
|
||||
incorrect next pull offset (#7365)
|
||||
|
||||
---
|
||||
.../tieredstore/TieredMessageFetcher.java | 2 +-
|
||||
.../tieredstore/TieredMessageStore.java | 29 ++++++++++---------
|
||||
.../tieredstore/TieredMessageStoreTest.java | 5 ++--
|
||||
3 files changed, 20 insertions(+), 16 deletions(-)
|
||||
|
||||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
||||
index 766ff64f6..c948fa3fa 100644
|
||||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
||||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
||||
@@ -319,7 +319,7 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
||||
}
|
||||
|
||||
// if cache is miss, immediately pull messages
|
||||
- LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " +
|
||||
+ LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " +
|
||||
"topic: {}, queue: {}, queue offset: {}, max message num: {}",
|
||||
mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
|
||||
|
||||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
||||
index 9fb1b2f01..d7d13d61e 100644
|
||||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
||||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
||||
@@ -147,6 +147,11 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
||||
public CompletableFuture<GetMessageResult> getMessageAsync(String group, String topic,
|
||||
int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) {
|
||||
|
||||
+ // For system topic, force reading from local store
|
||||
+ if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) {
|
||||
+ return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter);
|
||||
+ }
|
||||
+
|
||||
if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
|
||||
logger.trace("GetMessageAsync from current store, topic: {}, queue: {}, offset: {}", topic, queueId, offset);
|
||||
} else {
|
||||
@@ -158,6 +163,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
||||
return fetcher
|
||||
.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter)
|
||||
.thenApply(result -> {
|
||||
+
|
||||
Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder()
|
||||
.put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_GET_MESSAGE)
|
||||
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
|
||||
@@ -166,8 +172,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
||||
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
|
||||
|
||||
if (result.getStatus() == GetMessageStatus.OFFSET_FOUND_NULL ||
|
||||
- result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_ONE ||
|
||||
- result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
|
||||
+ result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) {
|
||||
|
||||
if (next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
|
||||
TieredStoreMetricsManager.fallbackTotal.add(1, latencyAttributes);
|
||||
@@ -178,14 +183,8 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
||||
}
|
||||
}
|
||||
|
||||
- // Fetch system topic data from the broker when using the force level.
|
||||
- if (result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) {
|
||||
- if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) {
|
||||
- return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
|
||||
- }
|
||||
- }
|
||||
-
|
||||
if (result.getStatus() != GetMessageStatus.FOUND &&
|
||||
+ result.getStatus() != GetMessageStatus.NO_MATCHED_LOGIC_QUEUE &&
|
||||
result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_ONE &&
|
||||
result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
|
||||
logger.warn("GetMessageAsync not found and message is not in next store, result: {}, " +
|
||||
@@ -206,10 +205,14 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
||||
if (minOffsetInQueue >= 0 && minOffsetInQueue < result.getMinOffset()) {
|
||||
result.setMinOffset(minOffsetInQueue);
|
||||
}
|
||||
- long maxOffsetInQueue = next.getMaxOffsetInQueue(topic, queueId);
|
||||
- if (maxOffsetInQueue >= 0 && maxOffsetInQueue > result.getMaxOffset()) {
|
||||
- result.setMaxOffset(maxOffsetInQueue);
|
||||
- }
|
||||
+
|
||||
+ // In general, the local cq offset is slightly greater than the commit offset in read message,
|
||||
+ // so there is no need to update the maximum offset to the local cq offset here,
|
||||
+ // otherwise it will cause repeated consumption after next begin offset over commit offset.
|
||||
+
|
||||
+ logger.trace("GetMessageAsync result, group: {}, topic: {}, queueId: {}, offset: {}, count:{}, {}",
|
||||
+ group, topic, queueId, offset, maxMsgNums, result);
|
||||
+
|
||||
return result;
|
||||
}).exceptionally(e -> {
|
||||
logger.error("GetMessageAsync from tiered store failed", e);
|
||||
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
|
||||
index 2451199c2..07af1fc8b 100644
|
||||
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
|
||||
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
|
||||
@@ -168,7 +168,7 @@ public class TieredMessageStoreTest {
|
||||
GetMessageResult result1 = new GetMessageResult();
|
||||
result1.setStatus(GetMessageStatus.FOUND);
|
||||
GetMessageResult result2 = new GetMessageResult();
|
||||
- result2.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
|
||||
+ result2.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
|
||||
|
||||
when(fetcher.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(CompletableFuture.completedFuture(result1));
|
||||
when(nextStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(result2);
|
||||
@@ -188,7 +188,8 @@ public class TieredMessageStoreTest {
|
||||
properties.setProperty("tieredStorageLevel", "3");
|
||||
configuration.update(properties);
|
||||
when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true);
|
||||
- Assert.assertSame(result2, store.getMessage("group", mq.getTopic(), mq.getQueueId(), 0, 0, null));
|
||||
+ Assert.assertEquals(result2.getStatus(),
|
||||
+ store.getMessage("group", mq.getTopic(), mq.getQueueId(), 0, 0, null).getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From f05a8da760dfade411ad56ef874f477988479cf9 Mon Sep 17 00:00:00 2001
|
||||
From: rongtong <jinrongtong5@163.com>
|
||||
Date: Wed, 20 Sep 2023 15:06:21 +0800
|
||||
Subject: [PATCH 5/5] Print admin queue watermark in log (#7372)
|
||||
|
||||
---
|
||||
.../main/java/org/apache/rocketmq/broker/BrokerController.java | 1 +
|
||||
1 file changed, 1 insertion(+)
|
||||
|
||||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
||||
index 13a3feb4e..53e2e1b62 100644
|
||||
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
||||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
||||
@@ -1182,6 +1182,7 @@ public class BrokerController {
|
||||
LOG_WATER_MARK.info("[WATERMARK] ClientManager Queue Size: {} SlowTimeMills: {}", this.clientManagerThreadPoolQueue.size(), this.headSlowTimeMills(this.clientManagerThreadPoolQueue));
|
||||
LOG_WATER_MARK.info("[WATERMARK] Heartbeat Queue Size: {} SlowTimeMills: {}", this.heartbeatThreadPoolQueue.size(), this.headSlowTimeMills(this.heartbeatThreadPoolQueue));
|
||||
LOG_WATER_MARK.info("[WATERMARK] Ack Queue Size: {} SlowTimeMills: {}", this.ackThreadPoolQueue.size(), headSlowTimeMills(this.ackThreadPoolQueue));
|
||||
+ LOG_WATER_MARK.info("[WATERMARK] Admin Queue Size: {} SlowTimeMills: {}", this.adminBrokerThreadPoolQueue.size(), headSlowTimeMills(this.adminBrokerThreadPoolQueue));
|
||||
}
|
||||
|
||||
public MessageStore getMessageStore() {
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
Summary: Cloud-Native, Distributed Messaging and Streaming
|
||||
Name: rocketmq
|
||||
Version: 5.1.3
|
||||
Release: 18
|
||||
Release: 19
|
||||
License: Apache-2.0
|
||||
Group: Applications/Message
|
||||
URL: https://rocketmq.apache.org/
|
||||
@ -27,6 +27,7 @@ Patch0014: patch014-backport-Queue-Selection-Strategy-Optimization.patch
|
||||
Patch0015: patch015-backport-fix-some-bugs.patch
|
||||
Patch0016: patch016-backport-Optimize-fault-tolerant-mechanism.patch
|
||||
Patch0017: patch017-backport-Convergent-thread-pool-creation.patch
|
||||
Patch0018: patch018-backport-enhancement-of-tiered-storage.patch
|
||||
BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git
|
||||
Requires: java-1.8.0-openjdk-devel
|
||||
|
||||
@ -61,6 +62,9 @@ exit 0
|
||||
|
||||
|
||||
%changelog
|
||||
* Mon Nov 20 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-19
|
||||
- backport some enhancement of tiered storage
|
||||
|
||||
* Mon Nov 20 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-18
|
||||
- backport-Convergent-thread-pool-creation
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user