rocketmq/patch008-backport-Allow-BoundaryType.patch
2023-09-20 10:04:36 +08:00

1419 lines
75 KiB
Diff

From a1bf49d5d07cf64374bc3dde5ab43add831433ad Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Tue, 1 Aug 2023 15:56:34 +0800
Subject: [PATCH 1/6] [ISSUE #7093] Avoid dispatch tasks too much cause
dispatch task failed (#7094)
* Avoid dispatch tasks too much cause dispatch task failed
* set schedule task async
---
.../apache/rocketmq/tieredstore/TieredDispatcher.java | 11 ++++++-----
.../tieredstore/common/TieredStoreExecutor.java | 2 +-
2 files changed, 7 insertions(+), 6 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 523b0c2cd..bb58ea7dd 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -82,7 +82,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() ->
tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> {
if (!flatFile.getCompositeFlatFileLock().isLocked()) {
- dispatchFlatFile(flatFile);
+ dispatchFlatFileAsync(flatFile);
}
}), 30, 10, TimeUnit.SECONDS);
}
@@ -180,10 +180,6 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
message.release();
flatFile.getCompositeFlatFileLock().unlock();
}
- } else {
- if (!flatFile.getCompositeFlatFileLock().isLocked()) {
- this.dispatchFlatFileAsync(flatFile);
- }
}
}
@@ -199,6 +195,11 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
}
public void dispatchFlatFileAsync(CompositeQueueFlatFile flatFile, Consumer<Long> consumer) {
+ // Avoid dispatch tasks too much
+ if (TieredStoreExecutor.dispatchThreadPoolQueue.size() >
+ TieredStoreExecutor.QUEUE_CAPACITY * 0.75) {
+ return;
+ }
TieredStoreExecutor.dispatchExecutor.execute(() -> {
try {
dispatchFlatFile(flatFile);
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
index 23f1b01ea..6eb3478b3 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
@@ -27,7 +27,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl;
public class TieredStoreExecutor {
- private static final int QUEUE_CAPACITY = 10000;
+ public static final int QUEUE_CAPACITY = 10000;
// Visible for monitor
public static BlockingQueue<Runnable> dispatchThreadPoolQueue;
--
2.32.0.windows.2
From ab61183030f4f230619ea539cbd2cb977234208b Mon Sep 17 00:00:00 2001
From: Zhouxiang Zhan <zhouxzhan@apache.org>
Date: Tue, 1 Aug 2023 19:00:15 +0800
Subject: [PATCH 2/6] [ISSUE #7104] Add ReceiptHandleGroupKey for RenewEvent
(#7105)
---
.../proxy/common/ReceiptHandleGroupKey.java | 69 +++++++++++++++++++
.../rocketmq/proxy/common/RenewEvent.java | 9 ++-
.../processor/ReceiptHandleProcessor.java | 55 ++-------------
.../receipt/DefaultReceiptHandleManager.java | 36 +++++-----
.../DefaultReceiptHandleManagerTest.java | 4 +-
5 files changed, 101 insertions(+), 72 deletions(-)
create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java
new file mode 100644
index 000000000..bd28393e5
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import io.netty.channel.Channel;
+
+public class ReceiptHandleGroupKey {
+ protected final Channel channel;
+ protected final String group;
+
+ public ReceiptHandleGroupKey(Channel channel, String group) {
+ this.channel = channel;
+ this.group = group;
+ }
+
+ protected String getChannelId() {
+ return channel.id().asLongText();
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public Channel getChannel() {
+ return channel;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ReceiptHandleGroupKey key = (ReceiptHandleGroupKey) o;
+ return Objects.equal(getChannelId(), key.getChannelId()) && Objects.equal(group, key.group);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(getChannelId(), group);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("channelId", getChannelId())
+ .add("group", group)
+ .toString();
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
index 0ff65c1cc..8d591560a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.consumer.AckResult;
public class RenewEvent {
+ protected ReceiptHandleGroupKey key;
protected MessageReceiptHandle messageReceiptHandle;
protected long renewTime;
protected EventType eventType;
@@ -32,13 +33,19 @@ public class RenewEvent {
CLEAR_GROUP
}
- public RenewEvent(MessageReceiptHandle messageReceiptHandle, long renewTime, EventType eventType, CompletableFuture<AckResult> future) {
+ public RenewEvent(ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle, long renewTime,
+ EventType eventType, CompletableFuture<AckResult> future) {
+ this.key = key;
this.messageReceiptHandle = messageReceiptHandle;
this.renewTime = renewTime;
this.eventType = eventType;
this.future = future;
}
+ public ReceiptHandleGroupKey getKey() {
+ return key;
+ }
+
public MessageReceiptHandle getMessageReceiptHandle() {
return messageReceiptHandle;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 460842a86..5e1be9321 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -17,19 +17,17 @@
package org.apache.rocketmq.proxy.processor;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
import io.netty.channel.Channel;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.state.StateEventListener;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.proxy.common.RenewEvent;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager;
+import org.apache.rocketmq.proxy.common.RenewEvent;
import org.apache.rocketmq.proxy.service.ServiceManager;
+import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager;
public class ReceiptHandleProcessor extends AbstractProcessor {
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -38,7 +36,8 @@ public class ReceiptHandleProcessor extends AbstractProcessor {
public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) {
super(messagingProcessor, serviceManager);
StateEventListener<RenewEvent> eventListener = event -> {
- ProxyContext context = createContext(event.getEventType().name());
+ ProxyContext context = createContext(event.getEventType().name())
+ .setChannel(event.getKey().getChannel());
MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle();
ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
@@ -66,50 +65,4 @@ public class ReceiptHandleProcessor extends AbstractProcessor {
return receiptHandleManager.removeReceiptHandle(ctx, channel, group, msgID, receiptHandle);
}
- public static class ReceiptHandleGroupKey {
- protected final Channel channel;
- protected final String group;
-
- public ReceiptHandleGroupKey(Channel channel, String group) {
- this.channel = channel;
- this.group = group;
- }
-
- protected String getChannelId() {
- return channel.id().asLongText();
- }
-
- public String getGroup() {
- return group;
- }
-
- public Channel getChannel() {
- return channel;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ReceiptHandleGroupKey key = (ReceiptHandleGroupKey) o;
- return Objects.equal(getChannelId(), key.getChannelId()) && Objects.equal(group, key.group);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(getChannelId(), group);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("channelId", getChannelId())
- .add("group", group)
- .toString();
- }
- }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index 9f35435f0..69f44344a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -55,7 +55,7 @@ import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
@@ -64,7 +64,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected final MetadataService metadataService;
protected final ConsumerManager consumerManager;
- protected final ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap;
+ protected final ConcurrentMap<ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap;
protected final StateEventListener<RenewEvent> eventListener;
protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy();
protected final ScheduledExecutorService scheduledExecutorService =
@@ -96,7 +96,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
// if the channel sync from other proxy is expired, not to clear data of connect to current proxy
return;
}
- clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
+ clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
}
}
@@ -125,19 +125,19 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
}
public void addReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) {
- ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group),
+ ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleGroupKey(channel, group),
k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
}
public MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, String receiptHandle) {
- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group));
+ ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleGroupKey(channel, group));
if (handleGroup == null) {
return null;
}
return handleGroup.remove(msgID, receiptHandle);
}
- protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) {
+ protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) {
return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null;
}
@@ -145,8 +145,8 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
Stopwatch stopwatch = Stopwatch.createStarted();
try {
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
- ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey();
+ for (Map.Entry<ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
+ ReceiptHandleGroupKey key = entry.getKey();
if (clientIsOffline(key)) {
clearGroup(key);
continue;
@@ -159,7 +159,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) {
return;
}
- renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr));
+ renewalWorkerService.submit(() -> renewMessage(key, group, msgID, handleStr));
});
}
} catch (Exception e) {
@@ -169,15 +169,15 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis());
}
- protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) {
+ protected void renewMessage(ReceiptHandleGroupKey key, ReceiptHandleGroup group, String msgID, String handleStr) {
try {
- group.computeIfPresent(msgID, handleStr, this::startRenewMessage);
+ group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> startRenewMessage(key, messageReceiptHandle));
} catch (Exception e) {
log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e);
}
}
- protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
+ protected CompletableFuture<MessageReceiptHandle> startRenewMessage(ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle) {
CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>();
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
long current = System.currentTimeMillis();
@@ -188,7 +188,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
}
if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
CompletableFuture<AckResult> future = new CompletableFuture<>();
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), RenewEvent.EventType.RENEW, future));
+ eventListener.fireEvent(new RenewEvent(key, messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), RenewEvent.EventType.RENEW, future));
future.whenComplete((ackResult, throwable) -> {
if (throwable != null) {
log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
@@ -218,7 +218,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
}
RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
CompletableFuture<AckResult> future = new CompletableFuture<>();
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), RenewEvent.EventType.STOP_RENEW, future));
+ eventListener.fireEvent(new RenewEvent(key, messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), RenewEvent.EventType.STOP_RENEW, future));
future.whenComplete((ackResult, throwable) -> {
if (throwable != null) {
log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable);
@@ -233,7 +233,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
return resFuture;
}
- protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) {
+ protected void clearGroup(ReceiptHandleGroupKey key) {
if (key == null) {
return;
}
@@ -246,7 +246,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
try {
handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> {
CompletableFuture<AckResult> future = new CompletableFuture<>();
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), RenewEvent.EventType.CLEAR_GROUP, future));
+ eventListener.fireEvent(new RenewEvent(key, messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), RenewEvent.EventType.CLEAR_GROUP, future));
return CompletableFuture.completedFuture(null);
});
} catch (Exception e) {
@@ -257,8 +257,8 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
protected void clearAllHandle() {
log.info("start clear all handle in receiptHandleProcessor");
- Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet();
- for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) {
+ Set<ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet();
+ for (ReceiptHandleGroupKey key : keySet) {
clearGroup(key);
}
log.info("clear all handle in receiptHandleProcessor done");
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
index 7c6943e44..25ae1509a 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
@@ -45,7 +45,7 @@ import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
import org.apache.rocketmq.proxy.service.BaseServiceTest;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
@@ -445,7 +445,7 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest {
public void testClearGroup() {
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
- receiptHandleManager.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP));
+ receiptHandleManager.clearGroup(new ReceiptHandleGroupKey(channel, GROUP));
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
receiptHandleManager.scheduleRenewTask();
--
2.32.0.windows.2
From c06facf08939772f81fffde72d14746d3a9384f2 Mon Sep 17 00:00:00 2001
From: rongtong <jinrongtong5@163.com>
Date: Thu, 3 Aug 2023 11:39:24 +0800
Subject: [PATCH 3/6] [ISSUE #7102] Making perm equal to 0 is valid
---
.../main/java/org/apache/rocketmq/common/constant/PermName.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
index d7c76b4c0..d87461d7f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
@@ -62,7 +62,7 @@ public class PermName {
}
public static boolean isValid(final int perm) {
- return perm >= PERM_INHERIT && perm < PERM_PRIORITY;
+ return perm >= 0 && perm < PERM_PRIORITY;
}
public static boolean isPriority(final int perm) {
--
2.32.0.windows.2
From 1fe5d6233455f191d7195fb5f6e27dc46510dd3e Mon Sep 17 00:00:00 2001
From: koado <34032341+Koado@users.noreply.github.com>
Date: Thu, 3 Aug 2023 11:40:16 +0800
Subject: [PATCH 4/6] [ISSUE #7074] Allow a BoundaryType to be specified when
retrieving offset based on the timestamp (#7082)
* add new interface for searching offset with boundary type
* format code
* fix failed test
* unify two BoundaryType class
* add interface getOffsetInQueueByTime(long timestamp, BoundaryType boundaryType) in ConsumeQueueInterface
* fix AdminBrokerProcessorTest unnecessary Mockito stubbings
---
.../processor/AdminBrokerProcessor.java | 4 +-
.../processor/AdminBrokerProcessorTest.java | 3 +-
.../rocketmq/client/impl/MQAdminImpl.java | 9 +++-
.../rocketmq/client/impl/MQClientAPIImpl.java | 10 +++-
.../remoting/protocol/RemotingCommand.java | 4 ++
.../header/SearchOffsetRequestHeader.java | 13 +++++
.../apache/rocketmq/store/ConsumeQueue.java | 2 +
.../rocketmq/store/DefaultMessageStore.java | 7 ++-
.../apache/rocketmq/store/MessageStore.java | 12 +++++
.../store/queue/BatchConsumeQueue.java | 37 +++++++++++---
.../store/queue/ConsumeQueueInterface.java | 10 ++++
.../store/queue/SparseConsumeQueue.java | 3 +-
.../tieredstore/MessageStoreFetcher.java | 2 +-
.../tieredstore/TieredMessageFetcher.java | 2 +-
.../tieredstore/TieredMessageStore.java | 3 +-
.../tieredstore/common/BoundaryType.java | 51 -------------------
.../tieredstore/file/CompositeAccess.java | 2 +-
.../tieredstore/file/CompositeFlatFile.java | 2 +-
.../tieredstore/file/TieredConsumeQueue.java | 2 +-
.../tieredstore/file/TieredFlatFile.java | 2 +-
.../tieredstore/TieredMessageFetcherTest.java | 2 +-
.../tieredstore/TieredMessageStoreTest.java | 2 +-
.../file/CompositeQueueFlatFileTest.java | 2 +-
.../tools/admin/DefaultMQAdminExt.java | 9 ++++
.../tools/admin/DefaultMQAdminExtImpl.java | 5 ++
.../tools/admin/DefaultMQAdminExtTest.java | 30 +++++++++++
26 files changed, 154 insertions(+), 76 deletions(-)
delete mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 569a1c57b..a6ce03dc2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -994,7 +994,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
continue;
}
if (mappingDetail.getBname().equals(item.getBname())) {
- offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp);
+ offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp, requestHeader.getBoundaryType());
if (offset > 0) {
offset = item.computeStaticQueueOffsetStrictly(offset);
break;
@@ -1045,7 +1045,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(),
- requestHeader.getTimestamp());
+ requestHeader.getTimestamp(), requestHeader.getBoundaryType());
responseHeader.setOffset(offset);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index fa2d929b0..a470c0cf2 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -311,7 +312,7 @@ public class AdminBrokerProcessorTest {
@Test
public void testSearchOffsetByTimestamp() throws Exception {
messageStore = mock(MessageStore.class);
- when(messageStore.getOffsetInQueueByTime(anyString(), anyInt(), anyLong())).thenReturn(Long.MIN_VALUE);
+ when(messageStore.getOffsetInQueueByTime(anyString(), anyInt(), anyLong(), any(BoundaryType.class))).thenReturn(Long.MIN_VALUE);
when(brokerController.getMessageStore()).thenReturn(messageStore);
SearchOffsetRequestHeader searchOffsetRequestHeader = new SearchOffsetRequestHeader();
searchOffsetRequestHeader.setTopic("topic");
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 33fc44fd6..1ef3a9483 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.help.FAQUrl;
@@ -184,6 +185,11 @@ public class MQAdminImpl {
}
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+ // default return lower boundary offset when there are more than one offsets.
+ return searchOffset(mq, timestamp, BoundaryType.LOWER);
+ }
+
+ public long searchOffset(MessageQueue mq, long timestamp, BoundaryType boundaryType) throws MQClientException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
@@ -192,7 +198,8 @@ public class MQAdminImpl {
if (brokerAddr != null) {
try {
- return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp, timeoutMillis);
+ return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp,
+ boundaryType, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 4c9c3a169..708a6acd1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -76,6 +76,7 @@ import org.apache.rocketmq.common.namesrv.NameServerUpdateCallback;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
@@ -1237,13 +1238,20 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
public long searchOffset(final String addr, final MessageQueue messageQueue, final long timestamp,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
+ // default return lower boundary offset when there are more than one offsets.
+ return searchOffset(addr, messageQueue, timestamp, BoundaryType.LOWER, timeoutMillis);
+ }
+
+ public long searchOffset(final String addr, final MessageQueue messageQueue, final long timestamp,
+ final BoundaryType boundaryType, final long timeoutMillis)
+ throws RemotingException, MQBrokerException, InterruptedException {
SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
requestHeader.setTopic(messageQueue.getTopic());
requestHeader.setQueueId(messageQueue.getQueueId());
requestHeader.setBname(messageQueue.getBrokerName());
requestHeader.setTimestamp(timestamp);
+ requestHeader.setBoundaryType(boundaryType);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);
-
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index a6ed022ea..d27135132 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -34,6 +34,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -63,6 +64,7 @@ public class RemotingCommand {
private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
+ private static final String BOUNDARY_TYPE_CANONICAL_NAME = BoundaryType.class.getCanonicalName();
private static volatile int configVersion = -1;
private static AtomicInteger requestId = new AtomicInteger(0);
@@ -311,6 +313,8 @@ public class RemotingCommand {
valueParsed = Boolean.parseBoolean(value);
} else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
valueParsed = Double.parseDouble(value);
+ } else if (type.equals(BOUNDARY_TYPE_CANONICAL_NAME)) {
+ valueParsed = BoundaryType.getType(value);
} else {
throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java
index 0c644d739..79c3d337b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java
@@ -21,6 +21,7 @@
package org.apache.rocketmq.remoting.protocol.header;
import com.google.common.base.MoreObjects;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader;
@@ -33,6 +34,8 @@ public class SearchOffsetRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private Long timestamp;
+ private BoundaryType boundaryType;
+
@Override
public void checkFields() throws RemotingCommandException {
@@ -66,12 +69,22 @@ public class SearchOffsetRequestHeader extends TopicQueueRequestHeader {
this.timestamp = timestamp;
}
+ public BoundaryType getBoundaryType() {
+ // default return LOWER
+ return boundaryType == null ? BoundaryType.LOWER : boundaryType;
+ }
+
+ public void setBoundaryType(BoundaryType boundaryType) {
+ this.boundaryType = boundaryType;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("topic", topic)
.add("queueId", queueId)
.add("timestamp", timestamp)
+ .add("boundaryType", getBoundaryType().getName()) // field may be null; the getter defaults to LOWER
.toString();
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 0c44ad043..a0b886eb0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -204,6 +204,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
return CQ_STORE_UNIT_SIZE;
}
+ @Deprecated
@Override
public long getOffsetInQueueByTime(final long timestamp) {
MappedFile mappedFile = this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp,
@@ -211,6 +212,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
return binarySearchInQueueByTime(mappedFile, timestamp, BoundaryType.LOWER);
}
+ @Override
public long getOffsetInQueueByTime(final long timestamp, final BoundaryType boundaryType) {
MappedFile mappedFile = this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp,
messageStore.getCommitLog(), boundaryType);
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index acc1610e0..25e4a166f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -82,6 +82,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.common.utils.ServiceProvider;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -1015,9 +1016,13 @@ public class DefaultMessageStore implements MessageStore {
@Override
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
+ return getOffsetInQueueByTime(topic, queueId, timestamp, BoundaryType.LOWER);
+ }
+
+ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) {
ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
- long resultOffset = logic.getOffsetInQueueByTime(timestamp);
+ long resultOffset = logic.getOffsetInQueueByTime(timestamp, boundaryType);
// Make sure the result offset is in valid range.
resultOffset = Math.max(resultOffset, logic.getMinOffsetInQueue());
resultOffset = Math.min(resultOffset, logic.getMaxOffsetInQueue());
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 3db0c18f7..31bbb907f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.message.MessageExt;
@@ -226,6 +227,17 @@ public interface MessageStore {
*/
long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp);
+ /**
+ * Look up the offset in the consume queue of the message whose store timestamp is as specified, using the given boundaryType.
+ *
+ * @param topic Topic of the message.
+ * @param queueId Queue ID.
+ * @param timestamp Timestamp to look up.
+ * @param boundaryType Lower or Upper
+ * @return offset in the consume queue which matches.
+ */
+ long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp, final BoundaryType boundaryType);
+
/**
* Look up the message by given commit log offset.
*
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 8fec1bf7b..387c233bf 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -708,8 +709,14 @@ public class BatchConsumeQueue implements ConsumeQueueInterface {
* @param timestamp
* @return
*/
+ @Deprecated
@Override
public long getOffsetInQueueByTime(final long timestamp) {
+ return getOffsetInQueueByTime(timestamp, BoundaryType.LOWER);
+ }
+
+ @Override
+ public long getOffsetInQueueByTime(long timestamp, BoundaryType boundaryType) {
MappedFile targetBcq;
BatchOffsetIndex targetMinOffset;
@@ -760,7 +767,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface {
if (timestamp >= maxQueueTimestamp) {
return byteBuffer.getLong(right + MSG_BASE_OFFSET_INDEX);
}
- int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp);
+ int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp, boundaryType);
if (mid != -1) {
return byteBuffer.getLong(mid + MSG_BASE_OFFSET_INDEX);
}
@@ -819,11 +826,11 @@ public class BatchConsumeQueue implements ConsumeQueueInterface {
/**
* Find the offset of which the value is equal or larger than the given targetValue.
- * If there are many values equal to the target, then find the earliest one.
+ * If there are many values equal to the target, then return the lowest offset if boundaryType is LOWER, or
+ * the highest offset if boundaryType is UPPER.
*/
public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, final int unitSize,
- final int unitShift,
- long targetValue) {
+ final int unitShift, long targetValue, BoundaryType boundaryType) {
int mid = -1;
while (left <= right) {
mid = ceil((left + right) / 2);
@@ -844,10 +851,24 @@ public class BatchConsumeQueue implements ConsumeQueueInterface {
}
} else {
//mid is actually in the mid
- if (tmpValue < targetValue) {
- left = mid + unitSize;
- } else {
- right = mid;
+ switch (boundaryType) {
+ case LOWER:
+ if (tmpValue < targetValue) {
+ left = mid + unitSize;
+ } else {
+ right = mid;
+ }
+ break;
+ case UPPER:
+ if (tmpValue <= targetValue) {
+ left = mid;
+ } else {
+ right = mid - unitSize;
+ }
+ break;
+ default:
+ log.warn("Unknown boundary type");
+ return -1;
}
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index d7213fa37..55d080829 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.store.queue;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.DispatchRequest;
@@ -93,6 +94,15 @@ public interface ConsumeQueueInterface extends FileQueueLifeCycle {
*/
long getOffsetInQueueByTime(final long timestamp);
+ /**
+ * Get the message whose timestamp is the smallest one greater than or equal to the given time; when more
+ * than one message satisfies the condition, decide which one to return based on boundaryType.
+ * @param timestamp timestamp
+ * @param boundaryType Lower or Upper
+ * @return the offset(index)
+ */
+ long getOffsetInQueueByTime(final long timestamp, final BoundaryType boundaryType);
+
/**
* The max physical offset of commitlog has been dispatched to this queue.
* It should be exclusive.
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
index 5b397d696..4a5f3a93b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.store.queue;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
@@ -148,7 +149,7 @@ public class SparseConsumeQueue extends BatchConsumeQueue {
ByteBuffer byteBuffer = sbr.getByteBuffer();
int left = minOffset.getIndexPos();
int right = maxOffset.getIndexPos();
- int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset);
+ int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset, BoundaryType.LOWER);
if (mid != -1) {
return minOffset.getMappedFile().selectMappedBuffer(mid);
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
index f4d576d29..8ae4dc7f9 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
@@ -21,7 +21,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.QueryMessageResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
+import org.apache.rocketmq.common.BoundaryType;
public interface MessageStoreFetcher {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index c4fed54bd..9a9a3e5a5 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -39,7 +39,6 @@ import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture;
import org.apache.rocketmq.tieredstore.common.MessageCacheKey;
import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
@@ -59,6 +58,7 @@ import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
public class TieredMessageFetcher implements MessageStoreFetcher {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 115d9640d..1f12410f2 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PopAckConstants;
@@ -45,7 +46,6 @@ import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
import org.apache.rocketmq.store.plugin.MessageStorePluginContext;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
@@ -287,6 +287,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
return getOffsetInQueueByTime(topic, queueId, timestamp, BoundaryType.LOWER);
}
+ @Override
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) {
long earliestTimeInNextStore = next.getEarliestMessageTime();
if (earliestTimeInNextStore <= 0) {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java
deleted file mode 100644
index 77e53ec11..000000000
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.tieredstore.common;
-
-/**
- * This enumeration represents the boundary types.
- * It has two constants, lower and upper, which represent the lower and upper boundaries respectively.
- */
-public enum BoundaryType {
-
- /**
- * Represents the lower boundary.
- */
- LOWER("lower"),
-
- /**
- * Represents the upper boundary.
- */
- UPPER("upper");
-
- private final String name;
-
- BoundaryType(String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-
- public static BoundaryType getType(String name) {
- if (BoundaryType.UPPER.getName().equalsIgnoreCase(name)) {
- return UPPER;
- }
- return LOWER;
- }
-}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java
index bc1062cd0..3d962e40d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
+import org.apache.rocketmq.common.BoundaryType;
interface CompositeAccess {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
index fa01382e1..df4baf33f 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
@@ -37,7 +37,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture;
import org.apache.rocketmq.tieredstore.common.InFlightRequestKey;
@@ -46,6 +45,7 @@ import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
public class CompositeFlatFile implements CompositeAccess {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
index ff9572af6..35007f8cb 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
@@ -21,8 +21,8 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.common.BoundaryType;
public class TieredConsumeQueue {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
index 90ca843bf..75ce8d89f 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
@@ -42,6 +41,7 @@ import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
public class TieredFlatFile {
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index df3720bab..d75b2f916 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -30,7 +30,6 @@ import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
@@ -41,6 +40,7 @@ import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index 58b7a52cc..8601392e7 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -37,11 +37,11 @@ import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.plugin.MessageStorePluginContext;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
index 8322c72ed..27efe111e 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
@@ -23,7 +23,6 @@ import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
@@ -33,6 +32,7 @@ import org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index dd9c6a9b4..f0a08dfb1 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -122,6 +123,14 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return defaultMQAdminExtImpl.searchOffset(mq, timestamp);
}
+ public long searchLowerBoundaryOffset(MessageQueue mq, long timestamp) throws MQClientException {
+ return defaultMQAdminExtImpl.searchOffset(mq, timestamp, BoundaryType.LOWER);
+ }
+
+ public long searchUpperBoundaryOffset(MessageQueue mq, long timestamp) throws MQClientException {
+ return defaultMQAdminExtImpl.searchOffset(mq, timestamp, BoundaryType.UPPER);
+ }
+
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return defaultMQAdminExtImpl.maxOffset(mq);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index c5c467bf0..fa3596d51 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -65,6 +65,7 @@ import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
@@ -1700,6 +1701,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
}
+ public long searchOffset(MessageQueue mq, long timestamp, BoundaryType boundaryType) throws MQClientException {
+ return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp, boundaryType);
+ }
+
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().maxOffset(mq);
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index b94754f22..dc5642f88 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -512,6 +512,36 @@ public class DefaultMQAdminExtTest {
assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(TOPIC1, BROKER1_NAME, 0), System.currentTimeMillis())).isEqualTo(101L);
}
+ @Test
+ public void testSearchOffsetWithSpecificBoundaryType() throws Exception {
+ // stub the admin client; NOTE(review): the class under test is itself mocked, so only stubbed values are compared
+ DefaultMQAdminExt mockDefaultMQAdminExt = mock(DefaultMQAdminExt.class);
+ when(mockDefaultMQAdminExt.minOffset(any(MessageQueue.class))).thenReturn(0L);
+ when(mockDefaultMQAdminExt.maxOffset(any(MessageQueue.class))).thenReturn(101L);
+ when(mockDefaultMQAdminExt.searchLowerBoundaryOffset(any(MessageQueue.class), anyLong())).thenReturn(0L);
+ when(mockDefaultMQAdminExt.searchUpperBoundaryOffset(any(MessageQueue.class), anyLong())).thenReturn(100L);
+ when(mockDefaultMQAdminExt.queryConsumeTimeSpan(anyString(), anyString())).thenReturn(mockQueryConsumeTimeSpan());
+
+ for (QueueTimeSpan timeSpan : mockDefaultMQAdminExt.queryConsumeTimeSpan(TOPIC1, "group_one")) {
+ MessageQueue mq = timeSpan.getMessageQueue();
+ long maxOffset = mockDefaultMQAdminExt.maxOffset(mq);
+ long minOffset = mockDefaultMQAdminExt.minOffset(mq);
+ // if there is at least one message in queue, the maxOffset returns the queue's latest offset + 1
+ assertThat(mockDefaultMQAdminExt.searchUpperBoundaryOffset(mq, timeSpan.getMaxTimeStamp())).isEqualTo(maxOffset == 0 ? 0 : maxOffset - 1);
+ assertThat(mockDefaultMQAdminExt.searchLowerBoundaryOffset(mq, timeSpan.getMinTimeStamp())).isEqualTo(minOffset);
+ }
+ }
+
+ private List<QueueTimeSpan> mockQueryConsumeTimeSpan() {
+ List<QueueTimeSpan> spanSet = new ArrayList<>();
+ QueueTimeSpan timeSpan = new QueueTimeSpan();
+ timeSpan.setMessageQueue(new MessageQueue(TOPIC1, BROKER1_NAME, 0));
+ timeSpan.setMinTimeStamp(1690421253000L);
+ timeSpan.setMaxTimeStamp(1690507653000L);
+ spanSet.add(timeSpan);
+ return spanSet;
+ }
+
@Test
public void testExamineTopicConfig() throws MQBrokerException, RemotingException, InterruptedException {
TopicConfig topicConfig = defaultMQAdminExt.examineTopicConfig("127.0.0.1:10911", "topic_test_examine_topicConfig");
--
2.32.0.windows.2
From 3bdabf703b883fea6181df8889c08d1e91202291 Mon Sep 17 00:00:00 2001
From: ShuangxiDing <dingshuangxi888@gmail.com>
Date: Thu, 3 Aug 2023 16:10:24 +0800
Subject: [PATCH 5/6] [ISSUE #7109] support the mixed topic type (#7110)
* Add mixed message type.
* support mixed topic type for grpc server
* remove the unnecessary parentheses around expression
* format the javadoc
---
.../common/attribute/TopicMessageType.java | 5 +++--
.../proxy/grpc/v2/route/RouteActivity.java | 22 +++++++++++--------
.../DefaultTopicMessageTypeValidator.java | 7 +++---
.../validator/TopicMessageTypeValidator.java | 6 ++---
4 files changed, 23 insertions(+), 17 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
index 77629e4c9..9680acec7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
@@ -27,7 +27,8 @@ public enum TopicMessageType {
NORMAL("NORMAL"),
FIFO("FIFO"),
DELAY("DELAY"),
- TRANSACTION("TRANSACTION");
+ TRANSACTION("TRANSACTION"),
+ MIXED("MIXED");
private final String value;
TopicMessageType(String value) {
@@ -35,7 +36,7 @@ public enum TopicMessageType {
}
public static Set<String> topicMessageTypeSet() {
- return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value);
+ return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value, MIXED.value);
}
public String getValue() {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index c5d485691..02dea0cda 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -32,6 +32,8 @@ import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.Resource;
import com.google.common.net.HostAndPort;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -259,7 +261,7 @@ public class RouteActivity extends AbstractMessingActivity {
MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
.setId(queueIdIndex++)
.setPermission(Permission.READ)
- .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
+ .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType))
.build();
messageQueueList.add(messageQueue);
}
@@ -268,7 +270,7 @@ public class RouteActivity extends AbstractMessingActivity {
MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
.setId(queueIdIndex++)
.setPermission(Permission.WRITE)
- .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
+ .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType))
.build();
messageQueueList.add(messageQueue);
}
@@ -277,7 +279,7 @@ public class RouteActivity extends AbstractMessingActivity {
MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
.setId(queueIdIndex++)
.setPermission(Permission.READ_WRITE)
- .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
+ .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType))
.build();
messageQueueList.add(messageQueue);
}
@@ -285,18 +287,20 @@ public class RouteActivity extends AbstractMessingActivity {
return messageQueueList;
}
- private MessageType parseTopicMessageType(TopicMessageType topicMessageType) {
+ private List<MessageType> parseTopicMessageType(TopicMessageType topicMessageType) {
switch (topicMessageType) {
case NORMAL:
- return MessageType.NORMAL;
+ return Collections.singletonList(MessageType.NORMAL);
case FIFO:
- return MessageType.FIFO;
+ return Collections.singletonList(MessageType.FIFO);
case TRANSACTION:
- return MessageType.TRANSACTION;
+ return Collections.singletonList(MessageType.TRANSACTION);
case DELAY:
- return MessageType.DELAY;
+ return Collections.singletonList(MessageType.DELAY);
+ case MIXED:
+ return Arrays.asList(MessageType.NORMAL, MessageType.FIFO, MessageType.DELAY, MessageType.TRANSACTION);
default:
- return MessageType.MESSAGE_TYPE_UNSPECIFIED;
+ return Collections.singletonList(MessageType.MESSAGE_TYPE_UNSPECIFIED);
}
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java
index bc2fcf30f..83588f110 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java
@@ -23,9 +23,10 @@ import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
public class DefaultTopicMessageTypeValidator implements TopicMessageTypeValidator {
- public void validate(TopicMessageType topicMessageType, TopicMessageType messageType) {
- if (messageType.equals(TopicMessageType.UNSPECIFIED) || !messageType.equals(topicMessageType)) {
- String errorInfo = String.format("TopicMessageType validate failed, topic type is %s, message type is %s", topicMessageType, messageType);
+ public void validate(TopicMessageType expectedType, TopicMessageType actualType) {
+ if (actualType.equals(TopicMessageType.UNSPECIFIED)
+ || !actualType.equals(expectedType) && !expectedType.equals(TopicMessageType.MIXED)) {
+ String errorInfo = String.format("TopicMessageType validate failed, the expected type is %s, but actual type is %s", expectedType, actualType);
throw new ProxyException(ProxyExceptionCode.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE, errorInfo);
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java
index 137be9095..32758da50 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java
@@ -23,8 +23,8 @@ public interface TopicMessageTypeValidator {
/**
* Will throw {@link org.apache.rocketmq.proxy.common.ProxyException} if validate failed.
*
- * @param topicMessageType Target topic
- * @param messageType Message's type
+ * @param expectedType Target topic
+ * @param actualType Message's type
*/
- void validate(TopicMessageType topicMessageType, TopicMessageType messageType);
+ void validate(TopicMessageType expectedType, TopicMessageType actualType);
}
--
2.32.0.windows.2
From c73d8ee346035aec3d548b8b29c64c626a34e68b Mon Sep 17 00:00:00 2001
From: haolinkong <110664176+haolinkong@users.noreply.github.com>
Date: Fri, 4 Aug 2023 10:41:41 +0800
Subject: [PATCH 6/6] [ISSUE #6962] operation.md format adjustment
---
docs/cn/operation.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/docs/cn/operation.md b/docs/cn/operation.md
index 4310da570..9f04ce1d3 100644
--- a/docs/cn/operation.md
+++ b/docs/cn/operation.md
@@ -569,9 +569,9 @@ RocketMQ 5.0 开始支持自动主从切换的模式,可参考以下文档
<td class=xl68 width=87 style='width:65pt'>NameServer 服务地址,格式 ip:port</td>
</tr>
<tr height=57 style='height:43.0pt'>
- <td rowspan=1 height=137 class=xl69 width=191 style='border-bottom:1.0pt;
+ <td rowspan=3 height=137 class=xl69 width=191 style='border-bottom:1.0pt;
height:103.0pt;border-top:none;width:143pt'>wipeWritePerm</td>
- <td rowspan=1 class=xl72 width=87 style='border-bottom:1.0pt
+ <td rowspan=3 class=xl72 width=87 style='border-bottom:1.0pt
border-top:none;width:65pt'>从NameServer上清除 Broker写权限</td>
<td class=xl67 width=87 style='width:65pt'>-b</td>
<td class=xl68 width=87 style='width:65pt'>BrokerName</td>
--
2.32.0.windows.2