602 lines
31 KiB
Diff
602 lines
31 KiB
Diff
From 1a8e7cb17cb29ed33b0196b52e452a6e76ade781 Mon Sep 17 00:00:00 2001
|
|
From: yuz10 <845238369@qq.com>
|
|
Date: Tue, 12 Sep 2023 19:33:41 +0800
|
|
Subject: [PATCH 1/5] [ISSUE #7345] Fix wrong result of searchOffset in tiered
|
|
storage
|
|
|
|
---
|
|
.../tieredstore/file/TieredFlatFile.java | 5 +-
|
|
.../tieredstore/file/TieredFlatFileTest.java | 46 +++++++++++++++++--
|
|
2 files changed, 46 insertions(+), 5 deletions(-)
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
|
index 426c4e09d..d973179ee 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
|
@@ -365,7 +365,10 @@ public class TieredFlatFile {
|
|
if (!segmentList.isEmpty()) {
|
|
return boundaryType == BoundaryType.UPPER ? segmentList.get(0) : segmentList.get(segmentList.size() - 1);
|
|
}
|
|
- return fileSegmentList.isEmpty() ? null : fileSegmentList.get(fileSegmentList.size() - 1);
|
|
+ if (fileSegmentList.isEmpty()) {
|
|
+ return null;
|
|
+ }
|
|
+ return boundaryType == BoundaryType.UPPER ? fileSegmentList.get(fileSegmentList.size() - 1) : fileSegmentList.get(0);
|
|
} finally {
|
|
fileSegmentLock.readLock().unlock();
|
|
}
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
|
|
index 7a4d05969..7e2fbf201 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
|
|
@@ -16,10 +16,7 @@
|
|
*/
|
|
package org.apache.rocketmq.tieredstore.file;
|
|
|
|
-import java.io.IOException;
|
|
-import java.nio.ByteBuffer;
|
|
-import java.util.ArrayList;
|
|
-import java.util.List;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.common.message.MessageQueue;
|
|
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
|
|
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
|
|
@@ -35,6 +32,11 @@ import org.junit.Assert;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
|
|
+import java.io.IOException;
|
|
+import java.nio.ByteBuffer;
|
|
+import java.util.ArrayList;
|
|
+import java.util.List;
|
|
+
|
|
public class TieredFlatFileTest {
|
|
|
|
private final String storePath = TieredStoreTestUtil.getRandomStorePath();
|
|
@@ -301,4 +303,40 @@ public class TieredFlatFileTest {
|
|
fileQueue.rollingNewFile();
|
|
Assert.assertEquals(2, fileQueue.getFileSegmentCount());
|
|
}
|
|
+
|
|
+ @Test
|
|
+ public void testGetFileByTime() {
|
|
+ String filePath = TieredStoreUtil.toPath(queue);
|
|
+ TieredFlatFile tieredFlatFile = fileQueueFactory.createFlatFileForCommitLog(filePath);
|
|
+ TieredFileSegment fileSegment1 = new MemoryFileSegment(FileSegmentType.CONSUME_QUEUE, queue, 1100, storeConfig);
|
|
+ fileSegment1.setMinTimestamp(100);
|
|
+ fileSegment1.setMaxTimestamp(200);
|
|
+
|
|
+ TieredFileSegment fileSegment2 = new MemoryFileSegment(FileSegmentType.CONSUME_QUEUE, queue, 1100, storeConfig);
|
|
+ fileSegment2.setMinTimestamp(200);
|
|
+ fileSegment2.setMaxTimestamp(300);
|
|
+
|
|
+ tieredFlatFile.getFileSegmentList().add(fileSegment1);
|
|
+ tieredFlatFile.getFileSegmentList().add(fileSegment2);
|
|
+
|
|
+ TieredFileSegment segmentUpper = tieredFlatFile.getFileByTime(400, BoundaryType.UPPER);
|
|
+ Assert.assertEquals(fileSegment2, segmentUpper);
|
|
+
|
|
+ TieredFileSegment segmentLower = tieredFlatFile.getFileByTime(400, BoundaryType.LOWER);
|
|
+ Assert.assertEquals(fileSegment2, segmentLower);
|
|
+
|
|
+
|
|
+ TieredFileSegment segmentUpper2 = tieredFlatFile.getFileByTime(0, BoundaryType.UPPER);
|
|
+ Assert.assertEquals(fileSegment1, segmentUpper2);
|
|
+
|
|
+ TieredFileSegment segmentLower2 = tieredFlatFile.getFileByTime(0, BoundaryType.LOWER);
|
|
+ Assert.assertEquals(fileSegment1, segmentLower2);
|
|
+
|
|
+
|
|
+ TieredFileSegment segmentUpper3 = tieredFlatFile.getFileByTime(200, BoundaryType.UPPER);
|
|
+ Assert.assertEquals(fileSegment1, segmentUpper3);
|
|
+
|
|
+ TieredFileSegment segmentLower3 = tieredFlatFile.getFileByTime(200, BoundaryType.LOWER);
|
|
+ Assert.assertEquals(fileSegment2, segmentLower3);
|
|
+ }
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From fd32dae2ab59f86dd215eeec405bf4fa6212bcb3 Mon Sep 17 00:00:00 2001
|
|
From: lizhimins <707364882@qq.com>
|
|
Date: Tue, 12 Sep 2023 19:58:08 +0800
|
|
Subject: [PATCH 2/5] [ISSUE #6633] Not clear uninitialized files and fix
|
|
metadata recover (#7342)
|
|
|
|
---
|
|
.../tieredstore/file/TieredFlatFile.java | 53 +++++++------------
|
|
.../file/TieredFlatFileManager.java | 10 ++--
|
|
2 files changed, 22 insertions(+), 41 deletions(-)
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
|
index d973179ee..d96eb6e8f 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
|
@@ -16,7 +16,6 @@
|
|
*/
|
|
package org.apache.rocketmq.tieredstore.file;
|
|
|
|
-import com.alibaba.fastjson.JSON;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import java.nio.ByteBuffer;
|
|
import java.util.ArrayList;
|
|
@@ -25,13 +24,13 @@ import java.util.Comparator;
|
|
import java.util.HashSet;
|
|
import java.util.LinkedList;
|
|
import java.util.List;
|
|
-import java.util.Objects;
|
|
import java.util.Set;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
import java.util.stream.Collectors;
|
|
import javax.annotation.Nullable;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.tieredstore.common.AppendResult;
|
|
@@ -43,7 +42,6 @@ import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
|
|
import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator;
|
|
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
|
|
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
|
-import org.apache.rocketmq.common.BoundaryType;
|
|
|
|
public class TieredFlatFile {
|
|
|
|
@@ -177,7 +175,10 @@ public class TieredFlatFile {
|
|
}
|
|
}
|
|
|
|
- private FileSegmentMetadata getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) {
|
|
+ /**
|
|
+ * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full
|
|
+ */
|
|
+ public void updateFileSegment(TieredFileSegment fileSegment) {
|
|
|
|
FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
|
|
this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset());
|
|
@@ -186,45 +187,24 @@ public class TieredFlatFile {
|
|
if (metadata == null) {
|
|
metadata = new FileSegmentMetadata(
|
|
this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType());
|
|
- metadata.setCreateTimestamp(fileSegment.getMinTimestamp());
|
|
- metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
|
|
- metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
|
|
- if (fileSegment.isClosed()) {
|
|
- metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
|
|
- }
|
|
- this.tieredMetadataStore.updateFileSegment(metadata);
|
|
+ metadata.setCreateTimestamp(System.currentTimeMillis());
|
|
}
|
|
- return metadata;
|
|
- }
|
|
-
|
|
- /**
|
|
- * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full
|
|
- */
|
|
- public void updateFileSegment(TieredFileSegment fileSegment) {
|
|
- FileSegmentMetadata segmentMetadata = getOrCreateFileSegmentMetadata(fileSegment);
|
|
|
|
- if (segmentMetadata.getStatus() == FileSegmentMetadata.STATUS_NEW
|
|
- && fileSegment.isFull()
|
|
- && !fileSegment.needCommit()) {
|
|
+ metadata.setSize(fileSegment.getCommitPosition());
|
|
+ metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
|
|
+ metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
|
|
|
|
- segmentMetadata.markSealed();
|
|
+ if (fileSegment.isFull() && !fileSegment.needCommit()) {
|
|
+ if (metadata.getStatus() == FileSegmentMetadata.STATUS_NEW) {
|
|
+ metadata.markSealed();
|
|
+ }
|
|
}
|
|
|
|
if (fileSegment.isClosed()) {
|
|
- segmentMetadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
|
|
+ metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
|
|
}
|
|
|
|
- segmentMetadata.setSize(fileSegment.getCommitPosition());
|
|
- segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp());
|
|
-
|
|
- FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
|
|
- this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset());
|
|
-
|
|
- if (!Objects.equals(metadata, segmentMetadata)) {
|
|
- this.tieredMetadataStore.updateFileSegment(segmentMetadata);
|
|
- logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {}, content: {}",
|
|
- segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata));
|
|
- }
|
|
+ this.tieredMetadataStore.updateFileSegment(metadata);
|
|
}
|
|
|
|
private void checkAndFixFileSize() {
|
|
@@ -598,6 +578,9 @@ public class TieredFlatFile {
|
|
logger.error("TieredFlatFile#destroy: mark file segment: {} is deleted failed", fileSegment.getPath(), e);
|
|
}
|
|
fileSegment.destroyFile();
|
|
+ if (!fileSegment.exists()) {
|
|
+ tieredMetadataStore.deleteFileSegment(filePath, fileType, fileSegment.getBaseOffset());
|
|
+ }
|
|
}
|
|
fileSegmentList.clear();
|
|
needCommitFileSegmentList.clear();
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
|
|
index 7c744af3b..087ea8c9c 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
|
|
@@ -136,15 +136,13 @@ public class TieredFlatFileManager {
|
|
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
|
|
for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) {
|
|
TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> {
|
|
- flatFile.getCompositeFlatFileLock().lock();
|
|
try {
|
|
+ flatFile.getCompositeFlatFileLock().lock();
|
|
flatFile.cleanExpiredFile(expiredTimeStamp);
|
|
flatFile.destroyExpiredFile();
|
|
- if (flatFile.getConsumeQueueBaseOffset() == -1) {
|
|
- logger.info("Clean flatFile because file not initialized, topic={}, queueId={}",
|
|
- flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId());
|
|
- destroyCompositeFile(flatFile.getMessageQueue());
|
|
- }
|
|
+ } catch (Throwable t) {
|
|
+ logger.error("Do Clean expired file error, topic={}, queueId={}",
|
|
+ flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), t);
|
|
} finally {
|
|
flatFile.getCompositeFlatFileLock().unlock();
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 4a8e0d5b851d1f9573cda79b7d2e42ee498809da Mon Sep 17 00:00:00 2001
|
|
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
|
|
Date: Wed, 13 Sep 2023 16:08:03 +0800
|
|
Subject: [PATCH 3/5] [ISSUE #7351] Allow mqadmin to operate slave nodes
|
|
|
|
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
|
|
---
|
|
.../processor/AdminBrokerProcessor.java | 12 --
|
|
.../processor/AdminBrokerProcessorTest.java | 106 ------------------
|
|
2 files changed, 118 deletions(-)
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
|
index 8fbcd3c94..9e48431be 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
|
@@ -406,9 +406,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
|
private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
|
|
RemotingCommand request) throws RemotingCommandException {
|
|
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
|
|
- if (validateSlave(response)) {
|
|
- return response;
|
|
- }
|
|
final CreateTopicRequestHeader requestHeader =
|
|
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
|
|
|
|
@@ -519,9 +516,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
|
private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
|
|
RemotingCommand request) throws RemotingCommandException {
|
|
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
|
|
- if (validateSlave(response)) {
|
|
- return response;
|
|
- }
|
|
DeleteTopicRequestHeader requestHeader =
|
|
(DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
|
|
|
|
@@ -1413,9 +1407,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
|
private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request)
|
|
throws RemotingCommandException {
|
|
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
|
|
- if (validateSlave(response)) {
|
|
- return response;
|
|
- }
|
|
|
|
LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroup called by {}",
|
|
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
|
|
@@ -1480,9 +1471,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
|
private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx,
|
|
RemotingCommand request) throws RemotingCommandException {
|
|
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
|
|
- if (validateSlave(response)) {
|
|
- return response;
|
|
- }
|
|
DeleteSubscriptionGroupRequestHeader requestHeader =
|
|
(DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);
|
|
|
|
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
|
index 9d17011b6..ec252cece 100644
|
|
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
|
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
|
@@ -76,7 +76,6 @@ import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfi
|
|
import org.apache.rocketmq.store.DefaultMessageStore;
|
|
import org.apache.rocketmq.store.MessageStore;
|
|
import org.apache.rocketmq.store.SelectMappedBufferResult;
|
|
-import org.apache.rocketmq.store.config.BrokerRole;
|
|
import org.apache.rocketmq.store.config.MessageStoreConfig;
|
|
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
|
|
import org.apache.rocketmq.store.stats.BrokerStats;
|
|
@@ -250,32 +249,6 @@ public class AdminBrokerProcessorTest {
|
|
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
|
|
}
|
|
|
|
- @Test
|
|
- public void testUpdateAndCreateTopicOnSlaveInRocksdb() throws Exception {
|
|
- if (notToBeExecuted()) {
|
|
- return;
|
|
- }
|
|
- initRocksdbTopicManager();
|
|
- testUpdateAndCreateTopicOnSlave();
|
|
- }
|
|
-
|
|
- @Test
|
|
- public void testUpdateAndCreateTopicOnSlave() throws Exception {
|
|
- // setup
|
|
- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
|
|
- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
|
|
- defaultMessageStore = mock(DefaultMessageStore.class);
|
|
- when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
|
|
-
|
|
- // test on slave
|
|
- String topic = "TEST_CREATE_TOPIC";
|
|
- RemotingCommand request = buildCreateTopicRequest(topic);
|
|
- RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
|
|
- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
|
|
- assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " +
|
|
- "please execute it from master broker.");
|
|
- }
|
|
-
|
|
@Test
|
|
public void testDeleteTopicInRocksdb() throws Exception {
|
|
if (notToBeExecuted()) {
|
|
@@ -301,31 +274,6 @@ public class AdminBrokerProcessorTest {
|
|
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
|
|
}
|
|
|
|
- @Test
|
|
- public void testDeleteTopicOnSlaveInRocksdb() throws Exception {
|
|
- if (notToBeExecuted()) {
|
|
- return;
|
|
- }
|
|
- initRocksdbTopicManager();
|
|
- testDeleteTopicOnSlave();
|
|
- }
|
|
-
|
|
- @Test
|
|
- public void testDeleteTopicOnSlave() throws Exception {
|
|
- // setup
|
|
- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
|
|
- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
|
|
- defaultMessageStore = mock(DefaultMessageStore.class);
|
|
- when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
|
|
-
|
|
- String topic = "TEST_DELETE_TOPIC";
|
|
- RemotingCommand request = buildDeleteTopicRequest(topic);
|
|
- RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
|
|
- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
|
|
- assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " +
|
|
- "please execute it from master broker.");
|
|
- }
|
|
-
|
|
@Test
|
|
public void testDeleteWithPopRetryTopic() throws Exception {
|
|
String topic = "topicA";
|
|
@@ -538,36 +486,6 @@ public class AdminBrokerProcessorTest {
|
|
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
|
|
}
|
|
|
|
- @Test
|
|
- public void testUpdateAndCreateSubscriptionGroupOnSlaveInRocksdb() throws Exception {
|
|
- initRocksdbSubscriptionManager();
|
|
- testUpdateAndCreateSubscriptionGroupOnSlave();
|
|
- }
|
|
-
|
|
- @Test
|
|
- public void testUpdateAndCreateSubscriptionGroupOnSlave() throws RemotingCommandException {
|
|
- // Setup
|
|
- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
|
|
- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
|
|
- defaultMessageStore = mock(DefaultMessageStore.class);
|
|
- when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
|
|
-
|
|
- // Test
|
|
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
|
|
- SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
|
|
- subscriptionGroupConfig.setBrokerId(1);
|
|
- subscriptionGroupConfig.setGroupName("groupId");
|
|
- subscriptionGroupConfig.setConsumeEnable(Boolean.TRUE);
|
|
- subscriptionGroupConfig.setConsumeBroadcastEnable(Boolean.TRUE);
|
|
- subscriptionGroupConfig.setRetryMaxTimes(111);
|
|
- subscriptionGroupConfig.setConsumeFromMinEnable(Boolean.TRUE);
|
|
- request.setBody(JSON.toJSON(subscriptionGroupConfig).toString().getBytes());
|
|
- RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
|
|
- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
|
|
- assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " +
|
|
- "please execute it from master broker.");
|
|
- }
|
|
-
|
|
@Test
|
|
public void testGetAllSubscriptionGroupInRocksdb() throws Exception {
|
|
initRocksdbSubscriptionManager();
|
|
@@ -596,30 +514,6 @@ public class AdminBrokerProcessorTest {
|
|
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
|
|
}
|
|
|
|
- @Test
|
|
- public void testDeleteSubscriptionGroupOnSlaveInRocksdb() throws Exception {
|
|
- initRocksdbSubscriptionManager();
|
|
- testDeleteSubscriptionGroupOnSlave();
|
|
- }
|
|
-
|
|
- @Test
|
|
- public void testDeleteSubscriptionGroupOnSlave() throws RemotingCommandException {
|
|
- // Setup
|
|
- MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
|
|
- when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
|
|
- defaultMessageStore = mock(DefaultMessageStore.class);
|
|
- when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
|
|
-
|
|
- // Test
|
|
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, null);
|
|
- request.addExtField("groupName", "GID-Group-Name");
|
|
- request.addExtField("removeOffset", "true");
|
|
- RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
|
|
- assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
|
|
- assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " +
|
|
- "please execute it from master broker.");
|
|
- }
|
|
-
|
|
@Test
|
|
public void testGetTopicStatsInfo() throws RemotingCommandException {
|
|
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, null);
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 831fcc76cd7cd362bb6c136c287c624bb7eaf40a Mon Sep 17 00:00:00 2001
|
|
From: lizhimins <707364882@qq.com>
|
|
Date: Tue, 19 Sep 2023 10:04:04 +0800
|
|
Subject: [PATCH 4/5] [ISSUE #7363] Fix get message from tiered storage return
|
|
incorrect next pull offset (#7365)
|
|
|
|
---
|
|
.../tieredstore/TieredMessageFetcher.java | 2 +-
|
|
.../tieredstore/TieredMessageStore.java | 29 ++++++++++---------
|
|
.../tieredstore/TieredMessageStoreTest.java | 5 ++--
|
|
3 files changed, 20 insertions(+), 16 deletions(-)
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
|
index 766ff64f6..c948fa3fa 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
|
@@ -319,7 +319,7 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
}
|
|
|
|
// if cache is miss, immediately pull messages
|
|
- LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " +
|
|
+ LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " +
|
|
"topic: {}, queue: {}, queue offset: {}, max message num: {}",
|
|
mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
index 9fb1b2f01..d7d13d61e 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
@@ -147,6 +147,11 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
public CompletableFuture<GetMessageResult> getMessageAsync(String group, String topic,
|
|
int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) {
|
|
|
|
+ // For system topic, force reading from local store
|
|
+ if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) {
|
|
+ return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter);
|
|
+ }
|
|
+
|
|
if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
|
|
logger.trace("GetMessageAsync from current store, topic: {}, queue: {}, offset: {}", topic, queueId, offset);
|
|
} else {
|
|
@@ -158,6 +163,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
return fetcher
|
|
.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter)
|
|
.thenApply(result -> {
|
|
+
|
|
Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder()
|
|
.put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_GET_MESSAGE)
|
|
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
|
|
@@ -166,8 +172,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
|
|
|
|
if (result.getStatus() == GetMessageStatus.OFFSET_FOUND_NULL ||
|
|
- result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_ONE ||
|
|
- result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
|
|
+ result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) {
|
|
|
|
if (next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
|
|
TieredStoreMetricsManager.fallbackTotal.add(1, latencyAttributes);
|
|
@@ -178,14 +183,8 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
}
|
|
}
|
|
|
|
- // Fetch system topic data from the broker when using the force level.
|
|
- if (result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) {
|
|
- if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) {
|
|
- return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
|
|
- }
|
|
- }
|
|
-
|
|
if (result.getStatus() != GetMessageStatus.FOUND &&
|
|
+ result.getStatus() != GetMessageStatus.NO_MATCHED_LOGIC_QUEUE &&
|
|
result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_ONE &&
|
|
result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
|
|
logger.warn("GetMessageAsync not found and message is not in next store, result: {}, " +
|
|
@@ -206,10 +205,14 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
if (minOffsetInQueue >= 0 && minOffsetInQueue < result.getMinOffset()) {
|
|
result.setMinOffset(minOffsetInQueue);
|
|
}
|
|
- long maxOffsetInQueue = next.getMaxOffsetInQueue(topic, queueId);
|
|
- if (maxOffsetInQueue >= 0 && maxOffsetInQueue > result.getMaxOffset()) {
|
|
- result.setMaxOffset(maxOffsetInQueue);
|
|
- }
|
|
+
|
|
+ // In general, the local cq offset is slightly greater than the commit offset when reading messages,
|
|
+ // so the maximum offset must not be raised to the local cq offset here;
|
|
+ // otherwise messages would be consumed repeatedly once the next begin offset exceeds the commit offset.
|
|
+
|
|
+ logger.trace("GetMessageAsync result, group: {}, topic: {}, queueId: {}, offset: {}, count:{}, {}",
|
|
+ group, topic, queueId, offset, maxMsgNums, result);
|
|
+
|
|
return result;
|
|
}).exceptionally(e -> {
|
|
logger.error("GetMessageAsync from tiered store failed", e);
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
|
|
index 2451199c2..07af1fc8b 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
|
|
@@ -168,7 +168,7 @@ public class TieredMessageStoreTest {
|
|
GetMessageResult result1 = new GetMessageResult();
|
|
result1.setStatus(GetMessageStatus.FOUND);
|
|
GetMessageResult result2 = new GetMessageResult();
|
|
- result2.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
|
|
+ result2.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
|
|
|
|
when(fetcher.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(CompletableFuture.completedFuture(result1));
|
|
when(nextStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(result2);
|
|
@@ -188,7 +188,8 @@ public class TieredMessageStoreTest {
|
|
properties.setProperty("tieredStorageLevel", "3");
|
|
configuration.update(properties);
|
|
when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true);
|
|
- Assert.assertSame(result2, store.getMessage("group", mq.getTopic(), mq.getQueueId(), 0, 0, null));
|
|
+ Assert.assertEquals(result2.getStatus(),
|
|
+ store.getMessage("group", mq.getTopic(), mq.getQueueId(), 0, 0, null).getStatus());
|
|
}
|
|
|
|
@Test
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From f05a8da760dfade411ad56ef874f477988479cf9 Mon Sep 17 00:00:00 2001
|
|
From: rongtong <jinrongtong5@163.com>
|
|
Date: Wed, 20 Sep 2023 15:06:21 +0800
|
|
Subject: [PATCH 5/5] Print admin queue watermark in log (#7372)
|
|
|
|
---
|
|
.../main/java/org/apache/rocketmq/broker/BrokerController.java | 1 +
|
|
1 file changed, 1 insertion(+)
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
index 13a3feb4e..53e2e1b62 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
@@ -1182,6 +1182,7 @@ public class BrokerController {
|
|
LOG_WATER_MARK.info("[WATERMARK] ClientManager Queue Size: {} SlowTimeMills: {}", this.clientManagerThreadPoolQueue.size(), this.headSlowTimeMills(this.clientManagerThreadPoolQueue));
|
|
LOG_WATER_MARK.info("[WATERMARK] Heartbeat Queue Size: {} SlowTimeMills: {}", this.heartbeatThreadPoolQueue.size(), this.headSlowTimeMills(this.heartbeatThreadPoolQueue));
|
|
LOG_WATER_MARK.info("[WATERMARK] Ack Queue Size: {} SlowTimeMills: {}", this.ackThreadPoolQueue.size(), headSlowTimeMills(this.ackThreadPoolQueue));
|
|
+ LOG_WATER_MARK.info("[WATERMARK] Admin Queue Size: {} SlowTimeMills: {}", this.adminBrokerThreadPoolQueue.size(), headSlowTimeMills(this.adminBrokerThreadPoolQueue));
|
|
}
|
|
|
|
public MessageStore getMessageStore() {
|
|
--
|
|
2.32.0.windows.2
|
|
|