From 886d62d91ec28913916a8a98449edb4a6f458154 Mon Sep 17 00:00:00 2001 From: shizhili Date: Tue, 19 Sep 2023 18:21:07 +0800 Subject: [PATCH] backport add allow boundary type --- patch008-backport-Allow-BoundaryType.patch | 1418 ++++++++++++++++++++ rocketmq.spec | 6 +- 2 files changed, 1423 insertions(+), 1 deletion(-) create mode 100644 patch008-backport-Allow-BoundaryType.patch diff --git a/patch008-backport-Allow-BoundaryType.patch b/patch008-backport-Allow-BoundaryType.patch new file mode 100644 index 0000000..f3e7564 --- /dev/null +++ b/patch008-backport-Allow-BoundaryType.patch @@ -0,0 +1,1418 @@ +From a1bf49d5d07cf64374bc3dde5ab43add831433ad Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Tue, 1 Aug 2023 15:56:34 +0800 +Subject: [PATCH 1/6] [ISSUE #7093] Avoid dispatch tasks too much cause + dispatch task failed (#7094) + +* Avoid dispatch tasks too much cause dispatch task failed + +* set schedule task async +--- + .../apache/rocketmq/tieredstore/TieredDispatcher.java | 11 ++++++----- + .../tieredstore/common/TieredStoreExecutor.java | 2 +- + 2 files changed, 7 insertions(+), 6 deletions(-) + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +index 523b0c2cd..bb58ea7dd 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +@@ -82,7 +82,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch + TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() -> + tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> { + if (!flatFile.getCompositeFlatFileLock().isLocked()) { +- dispatchFlatFile(flatFile); ++ dispatchFlatFileAsync(flatFile); + } + }), 30, 10, TimeUnit.SECONDS); + } +@@ -180,10 +180,6 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch + message.release(); + flatFile.getCompositeFlatFileLock().unlock(); + } +- } else { +- if (!flatFile.getCompositeFlatFileLock().isLocked()) { +- this.dispatchFlatFileAsync(flatFile); +- } + } + } + +@@ -199,6 +195,11 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch + } + + public void dispatchFlatFileAsync(CompositeQueueFlatFile flatFile, Consumer consumer) { ++ // Avoid dispatch tasks too much ++ if (TieredStoreExecutor.dispatchThreadPoolQueue.size() > ++ TieredStoreExecutor.QUEUE_CAPACITY * 0.75) { ++ return; ++ } + TieredStoreExecutor.dispatchExecutor.execute(() -> { + try { + dispatchFlatFile(flatFile); +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java +index 23f1b01ea..6eb3478b3 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java +@@ -27,7 +27,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl; + + public class TieredStoreExecutor { + +- private static final int QUEUE_CAPACITY = 10000; ++ public static final int QUEUE_CAPACITY = 10000; + + // Visible for monitor + public static BlockingQueue dispatchThreadPoolQueue; +-- +2.32.0.windows.2 + + +From ab61183030f4f230619ea539cbd2cb977234208b Mon Sep 17 00:00:00 2001 +From: Zhouxiang Zhan +Date: Tue, 1 Aug 2023 19:00:15 +0800 +Subject: [PATCH 2/6] [ISSUE #7104] Add ReceiptHandleGroupKey for RenewEvent + (#7105) + +--- + .../proxy/common/ReceiptHandleGroupKey.java | 69 +++++++++++++++++++ + .../rocketmq/proxy/common/RenewEvent.java | 9 ++- + .../processor/ReceiptHandleProcessor.java | 55 ++------------- + .../receipt/DefaultReceiptHandleManager.java | 36 +++++----- + .../DefaultReceiptHandleManagerTest.java | 4 +- + 5 files changed, 101 insertions(+), 72 deletions(-) + create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java + +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java +new file mode 100644 +index 000000000..bd28393e5 +--- /dev/null ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java +@@ -0,0 +1,69 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.proxy.common; ++ ++import com.google.common.base.MoreObjects; ++import com.google.common.base.Objects; ++import io.netty.channel.Channel; ++ ++public class ReceiptHandleGroupKey { ++ protected final Channel channel; ++ protected final String group; ++ ++ public ReceiptHandleGroupKey(Channel channel, String group) { ++ this.channel = channel; ++ this.group = group; ++ } ++ ++ protected String getChannelId() { ++ return channel.id().asLongText(); ++ } ++ ++ public String getGroup() { ++ return group; ++ } ++ ++ public Channel getChannel() { ++ return channel; ++ } ++ ++ @Override ++ public boolean equals(Object o) { ++ if (this == o) { ++ return true; ++ } ++ if (o == null || getClass() != o.getClass()) { ++ return false; ++ } ++ ReceiptHandleGroupKey key = (ReceiptHandleGroupKey) o; ++ return Objects.equal(getChannelId(), key.getChannelId()) && Objects.equal(group, key.group); ++ } ++ ++ @Override ++ public int hashCode() { ++ return Objects.hashCode(getChannelId(), group); ++ } ++ ++ @Override ++ public String toString() { ++ return MoreObjects.toStringHelper(this) ++ .add("channelId", getChannelId()) ++ .add("group", group) ++ .toString(); ++ } ++} +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java +index 0ff65c1cc..8d591560a 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java +@@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture; + import org.apache.rocketmq.client.consumer.AckResult; + + public class RenewEvent { ++ protected ReceiptHandleGroupKey key; + protected MessageReceiptHandle messageReceiptHandle; + protected long renewTime; + protected EventType eventType; +@@ -32,13 +33,19 @@ public class RenewEvent { + CLEAR_GROUP + } + +- public RenewEvent(MessageReceiptHandle messageReceiptHandle, long renewTime, EventType eventType, CompletableFuture future) { ++ public RenewEvent(ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle, long renewTime, ++ EventType eventType, CompletableFuture future) { ++ this.key = key; + this.messageReceiptHandle = messageReceiptHandle; + this.renewTime = renewTime; + this.eventType = eventType; + this.future = future; + } + ++ public ReceiptHandleGroupKey getKey() { ++ return key; ++ } ++ + public MessageReceiptHandle getMessageReceiptHandle() { + return messageReceiptHandle; + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +index 460842a86..5e1be9321 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +@@ -17,19 +17,17 @@ + + package org.apache.rocketmq.proxy.processor; + +-import com.google.common.base.MoreObjects; +-import com.google.common.base.Objects; + import io.netty.channel.Channel; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.consumer.ReceiptHandle; + import org.apache.rocketmq.common.state.StateEventListener; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +-import org.apache.rocketmq.proxy.common.RenewEvent; + import org.apache.rocketmq.proxy.common.MessageReceiptHandle; + import org.apache.rocketmq.proxy.common.ProxyContext; +-import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager; ++import org.apache.rocketmq.proxy.common.RenewEvent; + import org.apache.rocketmq.proxy.service.ServiceManager; ++import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager; + + public class ReceiptHandleProcessor extends AbstractProcessor { + protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); +@@ -38,7 +36,8 @@ public class ReceiptHandleProcessor extends AbstractProcessor { + public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) { + super(messagingProcessor, serviceManager); + StateEventListener eventListener = event -> { +- ProxyContext context = createContext(event.getEventType().name()); ++ ProxyContext context = createContext(event.getEventType().name()) ++ .setChannel(event.getKey().getChannel()); + MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle(); + ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); + messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(), +@@ -66,50 +65,4 @@ public class ReceiptHandleProcessor extends AbstractProcessor { + return receiptHandleManager.removeReceiptHandle(ctx, channel, group, msgID, receiptHandle); + } + +- public static class ReceiptHandleGroupKey { +- protected final Channel channel; +- protected final String group; +- +- public ReceiptHandleGroupKey(Channel channel, String group) { +- this.channel = channel; +- this.group = group; +- } +- +- protected String getChannelId() { +- return channel.id().asLongText(); +- } +- +- public String getGroup() { +- return group; +- } +- +- public Channel getChannel() { +- return channel; +- } +- +- @Override +- public boolean equals(Object o) { +- if (this == o) { +- return true; +- } +- if (o == null || getClass() != o.getClass()) { +- return false; +- } +- ReceiptHandleGroupKey key = (ReceiptHandleGroupKey) o; +- return Objects.equal(getChannelId(), key.getChannelId()) && Objects.equal(group, key.group); +- } +- +- @Override +- public int hashCode() { +- return Objects.hashCode(getChannelId(), group); +- } +- +- @Override +- public String toString() { +- return MoreObjects.toStringHelper(this) +- .add("channelId", getChannelId()) +- .add("group", group) +- .toString(); +- } +- } + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +index 9f35435f0..69f44344a 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +@@ -55,7 +55,7 @@ import org.apache.rocketmq.proxy.common.channel.ChannelHelper; + import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; + import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.config.ProxyConfig; +-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; ++import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey; + import org.apache.rocketmq.proxy.service.metadata.MetadataService; + import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; + import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +@@ -64,7 +64,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + protected final MetadataService metadataService; + protected final ConsumerManager consumerManager; +- protected final ConcurrentMap receiptHandleGroupMap; ++ protected final ConcurrentMap receiptHandleGroupMap; + protected final StateEventListener eventListener; + protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy(); + protected final ScheduledExecutorService scheduledExecutorService = +@@ -96,7 +96,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + // if the channel sync from other proxy is expired, not to clear data of connect to current proxy + return; + } +- clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group)); ++ clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group)); + log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo); + } + } +@@ -125,19 +125,19 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + } + + public void addReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) { +- ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group), ++ ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleGroupKey(channel, group), + k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle); + } + + public MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, String receiptHandle) { +- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group)); ++ ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleGroupKey(channel, group)); + if (handleGroup == null) { + return null; + } + return handleGroup.remove(msgID, receiptHandle); + } + +- protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) { ++ protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) { + return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null; + } + +@@ -145,8 +145,8 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + Stopwatch stopwatch = Stopwatch.createStarted(); + try { + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); +- for (Map.Entry entry : receiptHandleGroupMap.entrySet()) { +- ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey(); ++ for (Map.Entry entry : receiptHandleGroupMap.entrySet()) { ++ ReceiptHandleGroupKey key = entry.getKey(); + if (clientIsOffline(key)) { + clearGroup(key); + continue; +@@ -159,7 +159,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) { + return; + } +- renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr)); ++ renewalWorkerService.submit(() -> renewMessage(key, group, msgID, handleStr)); + }); + } + } catch (Exception e) { +@@ -169,15 +169,15 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis()); + } + +- protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) { ++ protected void renewMessage(ReceiptHandleGroupKey key, ReceiptHandleGroup group, String msgID, String handleStr) { + try { +- group.computeIfPresent(msgID, handleStr, this::startRenewMessage); ++ group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> startRenewMessage(key, messageReceiptHandle)); + } catch (Exception e) { + log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e); + } + } + +- protected CompletableFuture startRenewMessage(MessageReceiptHandle messageReceiptHandle) { ++ protected CompletableFuture startRenewMessage(ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle) { + CompletableFuture resFuture = new CompletableFuture<>(); + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + long current = System.currentTimeMillis(); +@@ -188,7 +188,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + } + if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) { + CompletableFuture future = new CompletableFuture<>(); +- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), RenewEvent.EventType.RENEW, future)); ++ eventListener.fireEvent(new RenewEvent(key, messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), RenewEvent.EventType.RENEW, future)); + future.whenComplete((ackResult, throwable) -> { + if (throwable != null) { + log.error("error when renew. handle:{}", messageReceiptHandle, throwable); +@@ -218,7 +218,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + } + RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy(); + CompletableFuture future = new CompletableFuture<>(); +- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), RenewEvent.EventType.STOP_RENEW, future)); ++ eventListener.fireEvent(new RenewEvent(key, messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), RenewEvent.EventType.STOP_RENEW, future)); + future.whenComplete((ackResult, throwable) -> { + if (throwable != null) { + log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable); +@@ -233,7 +233,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + return resFuture; + } + +- protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) { ++ protected void clearGroup(ReceiptHandleGroupKey key) { + if (key == null) { + return; + } +@@ -246,7 +246,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + try { + handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> { + CompletableFuture future = new CompletableFuture<>(); +- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), RenewEvent.EventType.CLEAR_GROUP, future)); ++ eventListener.fireEvent(new RenewEvent(key, messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), RenewEvent.EventType.CLEAR_GROUP, future)); + return CompletableFuture.completedFuture(null); + }); + } catch (Exception e) { +@@ -257,8 +257,8 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + + protected void clearAllHandle() { + log.info("start clear all handle in receiptHandleProcessor"); +- Set keySet = receiptHandleGroupMap.keySet(); +- for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) { ++ Set keySet = receiptHandleGroupMap.keySet(); ++ for (ReceiptHandleGroupKey key : keySet) { + clearGroup(key); + } + log.info("clear all handle in receiptHandleProcessor done"); +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java +index 7c6943e44..25ae1509a 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java +@@ -45,7 +45,7 @@ import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; + import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.config.ProxyConfig; + import org.apache.rocketmq.proxy.processor.MessagingProcessor; +-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; ++import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey; + import org.apache.rocketmq.proxy.service.BaseServiceTest; + import org.apache.rocketmq.proxy.service.metadata.MetadataService; + import org.apache.rocketmq.remoting.protocol.LanguageCode; +@@ -445,7 +445,7 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { + public void testClearGroup() { + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); +- receiptHandleManager.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP)); ++ receiptHandleManager.clearGroup(new ReceiptHandleGroupKey(channel, GROUP)); + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); + receiptHandleManager.scheduleRenewTask(); +-- +2.32.0.windows.2 + + +From c06facf08939772f81fffde72d14746d3a9384f2 Mon Sep 17 00:00:00 2001 +From: rongtong +Date: Thu, 3 Aug 2023 11:39:24 +0800 +Subject: [PATCH 3/6] [ISSUE #7102] Making perm equal to 0 is valid + +--- + .../main/java/org/apache/rocketmq/common/constant/PermName.java | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java +index d7c76b4c0..d87461d7f 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java ++++ b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java +@@ -62,7 +62,7 @@ public class PermName { + } + + public static boolean isValid(final int perm) { +- return perm >= PERM_INHERIT && perm < PERM_PRIORITY; ++ return perm >= 0 && perm < PERM_PRIORITY; + } + + public static boolean isPriority(final int perm) { +-- +2.32.0.windows.2 + + +From 1fe5d6233455f191d7195fb5f6e27dc46510dd3e Mon Sep 17 00:00:00 2001 +From: koado <34032341+Koado@users.noreply.github.com> +Date: Thu, 3 Aug 2023 11:40:16 +0800 +Subject: [PATCH 4/6] [ISSUE #7074] Allow a BoundaryType to be specified when + retrieving offset based on the timestamp (#7082) + +* add new interface for searching offset with boundary type + +* format code + +* fix failed test + +* unify two BoundaryType class + +* add interface getOffsetInQueueByTime(long timestamp, BoundaryType boundaryTYpe) in ConsumeQueueInterface + +* fix AdminBrokerProcessorTest unnecessary Mockito stubbings +--- + .../processor/AdminBrokerProcessor.java | 4 +- + .../processor/AdminBrokerProcessorTest.java | 3 +- + .../rocketmq/client/impl/MQAdminImpl.java | 9 +++- + .../rocketmq/client/impl/MQClientAPIImpl.java | 10 +++- + .../remoting/protocol/RemotingCommand.java | 4 ++ + .../header/SearchOffsetRequestHeader.java | 13 +++++ + .../apache/rocketmq/store/ConsumeQueue.java | 2 + + .../rocketmq/store/DefaultMessageStore.java | 7 ++- + .../apache/rocketmq/store/MessageStore.java | 12 +++++ + .../store/queue/BatchConsumeQueue.java | 37 +++++++++++--- + .../store/queue/ConsumeQueueInterface.java | 10 ++++ + .../store/queue/SparseConsumeQueue.java | 3 +- + .../tieredstore/MessageStoreFetcher.java | 2 +- + .../tieredstore/TieredMessageFetcher.java | 2 +- + .../tieredstore/TieredMessageStore.java | 3 +- + .../tieredstore/common/BoundaryType.java | 51 ------------------- + .../tieredstore/file/CompositeAccess.java | 2 +- + .../tieredstore/file/CompositeFlatFile.java | 2 +- + .../tieredstore/file/TieredConsumeQueue.java | 2 +- + .../tieredstore/file/TieredFlatFile.java | 2 +- + .../tieredstore/TieredMessageFetcherTest.java | 2 +- + .../tieredstore/TieredMessageStoreTest.java | 2 +- + .../file/CompositeQueueFlatFileTest.java | 2 +- + .../tools/admin/DefaultMQAdminExt.java | 9 ++++ + .../tools/admin/DefaultMQAdminExtImpl.java | 5 ++ + .../tools/admin/DefaultMQAdminExtTest.java | 30 +++++++++++ + 26 files changed, 154 insertions(+), 76 deletions(-) + delete mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +index 569a1c57b..a6ce03dc2 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +@@ -994,7 +994,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + continue; + } + if (mappingDetail.getBname().equals(item.getBname())) { +- offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp); ++ offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp, requestHeader.getBoundaryType()); + if (offset > 0) { + offset = item.computeStaticQueueOffsetStrictly(offset); + break; +@@ -1045,7 +1045,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + } + + long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(), +- requestHeader.getTimestamp()); ++ requestHeader.getTimestamp(), requestHeader.getBoundaryType()); + + responseHeader.setOffset(offset); + +diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +index fa2d929b0..a470c0cf2 100644 +--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java ++++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +@@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.client.ConsumerManager; + import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; + import org.apache.rocketmq.broker.schedule.ScheduleMessageService; + import org.apache.rocketmq.broker.topic.TopicConfigManager; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.BrokerConfig; + import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.TopicConfig; +@@ -311,7 +312,7 @@ public class AdminBrokerProcessorTest { + @Test + public void testSearchOffsetByTimestamp() throws Exception { + messageStore = mock(MessageStore.class); +- when(messageStore.getOffsetInQueueByTime(anyString(), anyInt(), anyLong())).thenReturn(Long.MIN_VALUE); ++ when(messageStore.getOffsetInQueueByTime(anyString(), anyInt(), anyLong(), any(BoundaryType.class))).thenReturn(Long.MIN_VALUE); + when(brokerController.getMessageStore()).thenReturn(messageStore); + SearchOffsetRequestHeader searchOffsetRequestHeader = new SearchOffsetRequestHeader(); + searchOffsetRequestHeader.setTopic("topic"); +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +index 33fc44fd6..1ef3a9483 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +@@ -34,6 +34,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException; + import org.apache.rocketmq.client.exception.MQClientException; + import org.apache.rocketmq.client.impl.factory.MQClientInstance; + import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.TopicConfig; + import org.apache.rocketmq.common.help.FAQUrl; +@@ -184,6 +185,11 @@ public class MQAdminImpl { + } + + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { ++ // default return lower boundary offset when there are more than one offsets. ++ return searchOffset(mq, timestamp, BoundaryType.LOWER); ++ } ++ ++ public long searchOffset(MessageQueue mq, long timestamp, BoundaryType boundaryType) throws MQClientException { + String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq)); + if (null == brokerAddr) { + this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); +@@ -192,7 +198,8 @@ public class MQAdminImpl { + + if (brokerAddr != null) { + try { +- return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp, timeoutMillis); ++ return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp, ++ boundaryType, timeoutMillis); + } catch (Exception e) { + throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); + } +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +index 4c9c3a169..708a6acd1 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +@@ -76,6 +76,7 @@ import org.apache.rocketmq.common.namesrv.NameServerUpdateCallback; + import org.apache.rocketmq.common.namesrv.TopAddressing; + import org.apache.rocketmq.common.sysflag.PullSysFlag; + import org.apache.rocketmq.common.topic.TopicValidator; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.remoting.CommandCustomHeader; + import org.apache.rocketmq.remoting.InvokeCallback; + import org.apache.rocketmq.remoting.RPCHook; +@@ -1237,13 +1238,20 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { + public long searchOffset(final String addr, final MessageQueue messageQueue, final long timestamp, + final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException { ++ // default return lower boundary offset when there are more than one offsets. ++ return searchOffset(addr, messageQueue, timestamp, BoundaryType.LOWER, timeoutMillis); ++ } ++ ++ public long searchOffset(final String addr, final MessageQueue messageQueue, final long timestamp, ++ final BoundaryType boundaryType, final long timeoutMillis) ++ throws RemotingException, MQBrokerException, InterruptedException { + SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); + requestHeader.setTopic(messageQueue.getTopic()); + requestHeader.setQueueId(messageQueue.getQueueId()); + requestHeader.setBname(messageQueue.getBrokerName()); + requestHeader.setTimestamp(timestamp); ++ requestHeader.setBoundaryType(boundaryType); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader); +- + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +index a6ed022ea..d27135132 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +@@ -34,6 +34,7 @@ import java.util.Set; + import java.util.concurrent.atomic.AtomicInteger; + + import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +@@ -63,6 +64,7 @@ public class RemotingCommand { + private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName(); + private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName(); + private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName(); ++ private static final String BOUNDARY_TYPE_CANONICAL_NAME = BoundaryType.class.getCanonicalName(); + private static volatile int configVersion = -1; + private static AtomicInteger requestId = new AtomicInteger(0); + +@@ -311,6 +313,8 @@ public class RemotingCommand { + valueParsed = Boolean.parseBoolean(value); + } else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) { + valueParsed = Double.parseDouble(value); ++ } else if (type.equals(BOUNDARY_TYPE_CANONICAL_NAME)) { ++ valueParsed = BoundaryType.getType(value); + } else { + throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported"); + } +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java +index 0c644d739..79c3d337b 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java +@@ -21,6 +21,7 @@ + package org.apache.rocketmq.remoting.protocol.header; + + import com.google.common.base.MoreObjects; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.remoting.annotation.CFNotNull; + import org.apache.rocketmq.remoting.exception.RemotingCommandException; + import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader; +@@ -33,6 +34,8 @@ public class SearchOffsetRequestHeader extends TopicQueueRequestHeader { + @CFNotNull + private Long timestamp; + ++ private BoundaryType boundaryType; ++ + @Override + public void checkFields() throws RemotingCommandException { + +@@ -66,12 +69,22 @@ public class SearchOffsetRequestHeader extends TopicQueueRequestHeader { + this.timestamp = timestamp; + } + ++ public BoundaryType getBoundaryType() { ++ // default return LOWER ++ return boundaryType == null ? BoundaryType.LOWER : boundaryType; ++ } ++ ++ public void setBoundaryType(BoundaryType boundaryType) { ++ this.boundaryType = boundaryType; ++ } ++ + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("topic", topic) + .add("queueId", queueId) + .add("timestamp", timestamp) ++ .add("boundaryType", boundaryType.getName()) + .toString(); + } + } +diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +index 0c44ad043..a0b886eb0 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ++++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +@@ -204,6 +204,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { + return CQ_STORE_UNIT_SIZE; + } + ++ @Deprecated + @Override + public long getOffsetInQueueByTime(final long timestamp) { + MappedFile mappedFile = this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp, +@@ -211,6 +212,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { + return binarySearchInQueueByTime(mappedFile, timestamp, BoundaryType.LOWER); + } + ++ @Override + public long getOffsetInQueueByTime(final long timestamp, final BoundaryType boundaryType) { + MappedFile mappedFile = this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp, + messageStore.getCommitLog(), boundaryType); +diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +index acc1610e0..25e4a166f 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java ++++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +@@ -82,6 +82,7 @@ import org.apache.rocketmq.common.topic.TopicValidator; + import org.apache.rocketmq.common.utils.CleanupPolicyUtils; + import org.apache.rocketmq.common.utils.QueueTypeUtils; + import org.apache.rocketmq.common.utils.ServiceProvider; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; +@@ -1015,9 +1016,13 @@ public class DefaultMessageStore implements MessageStore { + + @Override + public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { ++ return getOffsetInQueueByTime(topic, queueId, timestamp, BoundaryType.LOWER); ++ } ++ ++ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) { + ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId); + if (logic != null) { +- long resultOffset = logic.getOffsetInQueueByTime(timestamp); ++ long resultOffset = logic.getOffsetInQueueByTime(timestamp, boundaryType); + // Make sure the result offset is in valid range. + resultOffset = Math.max(resultOffset, logic.getMinOffsetInQueue()); + resultOffset = Math.min(resultOffset, logic.getMaxOffsetInQueue()); +diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +index 3db0c18f7..31bbb907f 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java ++++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +@@ -29,6 +29,7 @@ import java.util.Set; + import java.util.concurrent.CompletableFuture; + import java.util.function.Supplier; + ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.Pair; + import org.apache.rocketmq.common.SystemClock; + import org.apache.rocketmq.common.message.MessageExt; +@@ -226,6 +227,17 @@ public interface MessageStore { + */ + long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp); + ++ /** ++ * Look up the physical offset of the message whose store timestamp is as specified with specific boundaryType. ++ * ++ * @param topic Topic of the message. ++ * @param queueId Queue ID. ++ * @param timestamp Timestamp to look up. ++ * @param boundaryType Lower or Upper ++ * @return physical offset which matches. ++ */ ++ long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp, final BoundaryType boundaryType); ++ + /** + * Look up the message by given commit log offset. + * +diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +index 8fec1bf7b..387c233bf 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java ++++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +@@ -24,6 +24,7 @@ import java.util.Map; + import java.util.concurrent.ConcurrentSkipListMap; + import java.util.function.Function; + import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.attribute.CQType; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.message.MessageAccessor; +@@ -708,8 +709,14 @@ public class BatchConsumeQueue implements ConsumeQueueInterface { + * @param timestamp + * @return + */ ++ @Deprecated + @Override + public long getOffsetInQueueByTime(final long timestamp) { ++ return getOffsetInQueueByTime(timestamp, BoundaryType.LOWER); ++ } ++ ++ @Override ++ public long getOffsetInQueueByTime(long timestamp, BoundaryType boundaryType) { + MappedFile targetBcq; + BatchOffsetIndex targetMinOffset; + +@@ -760,7 +767,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface { + if (timestamp >= maxQueueTimestamp) { + return byteBuffer.getLong(right + MSG_BASE_OFFSET_INDEX); + } +- int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp); ++ int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp, boundaryType); + if (mid != -1) { + return byteBuffer.getLong(mid + MSG_BASE_OFFSET_INDEX); + } +@@ -819,11 +826,11 @@ public class BatchConsumeQueue implements ConsumeQueueInterface { + + /** + * Find the offset of which the value is equal or larger than the given targetValue. +- * If there are many values equal to the target, then find the earliest one. ++ * If there are many values equal to the target, then return the lowest offset if boundaryType is LOWER while ++ * return the highest offset if boundaryType is UPPER. + */ + public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, final int unitSize, +- final int unitShift, +- long targetValue) { ++ final int unitShift, long targetValue, BoundaryType boundaryType) { + int mid = -1; + while (left <= right) { + mid = ceil((left + right) / 2); +@@ -844,10 +851,24 @@ public class BatchConsumeQueue implements ConsumeQueueInterface { + } + } else { + //mid is actually in the mid +- if (tmpValue < targetValue) { +- left = mid + unitSize; +- } else { +- right = mid; ++ switch (boundaryType) { ++ case LOWER: ++ if (tmpValue < targetValue) { ++ left = mid + unitSize; ++ } else { ++ right = mid; ++ } ++ break; ++ case UPPER: ++ if (tmpValue <= targetValue) { ++ left = mid; ++ } else { ++ right = mid - unitSize; ++ } ++ break; ++ default: ++ log.warn("Unknown boundary type"); ++ return -1; + } + } + } +diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java +index d7213fa37..55d080829 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java ++++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java +@@ -17,6 +17,7 @@ + + package org.apache.rocketmq.store.queue; + ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.attribute.CQType; + import org.apache.rocketmq.common.message.MessageExtBrokerInner; + import org.apache.rocketmq.store.DispatchRequest; +@@ -93,6 +94,15 @@ public interface ConsumeQueueInterface extends FileQueueLifeCycle { + */ + long getOffsetInQueueByTime(final long timestamp); + ++ /** ++ * Get the message whose timestamp is the smallest, greater than or equal to the given time and when there are more ++ * than one message satisfy the condition, decide which one to return based on boundaryType. ++ * @param timestamp timestamp ++ * @param boundaryType Lower or Upper ++ * @return the offset(index) ++ */ ++ long getOffsetInQueueByTime(final long timestamp, final BoundaryType boundaryType); ++ + /** + * The max physical offset of commitlog has been dispatched to this queue. + * It should be exclusive. +diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java +index 5b397d696..4a5f3a93b 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java ++++ b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java +@@ -16,6 +16,7 @@ + */ + package org.apache.rocketmq.store.queue; + ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.UtilAll; + import org.apache.rocketmq.store.MessageStore; + import org.apache.rocketmq.store.SelectMappedBufferResult; +@@ -148,7 +149,7 @@ public class SparseConsumeQueue extends BatchConsumeQueue { + ByteBuffer byteBuffer = sbr.getByteBuffer(); + int left = minOffset.getIndexPos(); + int right = maxOffset.getIndexPos(); +- int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset); ++ int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset, BoundaryType.LOWER); + if (mid != -1) { + return minOffset.getMappedFile().selectMappedBuffer(mid); + } +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java +index f4d576d29..8ae4dc7f9 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java +@@ -21,7 +21,7 @@ import java.util.concurrent.CompletableFuture; + import org.apache.rocketmq.store.GetMessageResult; + import org.apache.rocketmq.store.MessageFilter; + import org.apache.rocketmq.store.QueryMessageResult; +-import org.apache.rocketmq.tieredstore.common.BoundaryType; ++import org.apache.rocketmq.common.BoundaryType; + + public interface MessageStoreFetcher { + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +index c4fed54bd..9a9a3e5a5 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +@@ -39,7 +39,6 @@ import org.apache.rocketmq.store.GetMessageStatus; + import org.apache.rocketmq.store.MessageFilter; + import org.apache.rocketmq.store.QueryMessageResult; + import org.apache.rocketmq.store.SelectMappedBufferResult; +-import org.apache.rocketmq.tieredstore.common.BoundaryType; + import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture; + import org.apache.rocketmq.tieredstore.common.MessageCacheKey; + import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper; +@@ -59,6 +58,7 @@ import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager; + import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; ++import org.apache.rocketmq.common.BoundaryType; + + public class TieredMessageFetcher implements MessageStoreFetcher { + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +index 115d9640d..1f12410f2 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture; + import java.util.concurrent.TimeUnit; + import java.util.function.Supplier; + import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.Pair; + import org.apache.rocketmq.common.PopAckConstants; +@@ -45,7 +46,6 @@ import org.apache.rocketmq.store.QueryMessageResult; + import org.apache.rocketmq.store.SelectMappedBufferResult; + import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore; + import org.apache.rocketmq.store.plugin.MessageStorePluginContext; +-import org.apache.rocketmq.tieredstore.common.BoundaryType; + import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; + import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; + import org.apache.rocketmq.tieredstore.file.CompositeFlatFile; +@@ -287,6 +287,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { + return getOffsetInQueueByTime(topic, queueId, timestamp, BoundaryType.LOWER); + } + ++ @Override + public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) { + long earliestTimeInNextStore = next.getEarliestMessageTime(); + if (earliestTimeInNextStore <= 0) { +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java +deleted file mode 100644 +index 77e53ec11..000000000 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java ++++ /dev/null +@@ -1,51 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +-package org.apache.rocketmq.tieredstore.common; +- +-/** +- * This enumeration represents the boundary types. +- * It has two constants, lower and upper, which represent the lower and upper boundaries respectively. +- */ +-public enum BoundaryType { +- +- /** +- * Represents the lower boundary. +- */ +- LOWER("lower"), +- +- /** +- * Represents the upper boundary. +- */ +- UPPER("upper"); +- +- private final String name; +- +- BoundaryType(String name) { +- this.name = name; +- } +- +- public String getName() { +- return name; +- } +- +- public static BoundaryType getType(String name) { +- if (BoundaryType.UPPER.getName().equalsIgnoreCase(name)) { +- return UPPER; +- } +- return LOWER; +- } +-} +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java +index bc1062cd0..3d962e40d 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java +@@ -20,7 +20,7 @@ import java.nio.ByteBuffer; + import java.util.concurrent.CompletableFuture; + import org.apache.rocketmq.store.DispatchRequest; + import org.apache.rocketmq.tieredstore.common.AppendResult; +-import org.apache.rocketmq.tieredstore.common.BoundaryType; ++import org.apache.rocketmq.common.BoundaryType; + + interface CompositeAccess { + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java +index fa01382e1..df4baf33f 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java +@@ -37,7 +37,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.store.DispatchRequest; + import org.apache.rocketmq.tieredstore.common.AppendResult; +-import org.apache.rocketmq.tieredstore.common.BoundaryType; + import org.apache.rocketmq.tieredstore.common.FileSegmentType; + import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture; + import org.apache.rocketmq.tieredstore.common.InFlightRequestKey; +@@ -46,6 +45,7 @@ import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; + import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; ++import org.apache.rocketmq.common.BoundaryType; + + public class CompositeFlatFile implements CompositeAccess { + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java +index ff9572af6..35007f8cb 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java +@@ -21,8 +21,8 @@ import java.nio.ByteBuffer; + import java.util.concurrent.CompletableFuture; + import org.apache.commons.lang3.tuple.Pair; + import org.apache.rocketmq.tieredstore.common.AppendResult; +-import org.apache.rocketmq.tieredstore.common.BoundaryType; + import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; ++import org.apache.rocketmq.common.BoundaryType; + + public class TieredConsumeQueue { + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +index 90ca843bf..75ce8d89f 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +@@ -33,7 +33,6 @@ import javax.annotation.Nullable; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.tieredstore.common.AppendResult; +-import org.apache.rocketmq.tieredstore.common.BoundaryType; + import org.apache.rocketmq.tieredstore.common.FileSegmentType; + import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode; + import org.apache.rocketmq.tieredstore.exception.TieredStoreException; +@@ -42,6 +41,7 @@ import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; + import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator; + import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; ++import org.apache.rocketmq.common.BoundaryType; + + public class TieredFlatFile { + +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java +index df3720bab..d75b2f916 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java +@@ -30,7 +30,6 @@ import org.apache.rocketmq.store.GetMessageStatus; + import org.apache.rocketmq.store.QueryMessageResult; + import org.apache.rocketmq.store.SelectMappedBufferResult; + import org.apache.rocketmq.tieredstore.common.AppendResult; +-import org.apache.rocketmq.tieredstore.common.BoundaryType; + import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper; + import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; + import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; +@@ -41,6 +40,7 @@ import org.apache.rocketmq.tieredstore.file.TieredIndexFile; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; ++import org.apache.rocketmq.common.BoundaryType; + import org.awaitility.Awaitility; + import org.junit.After; + import org.junit.Assert; +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +index 58b7a52cc..8601392e7 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +@@ -37,11 +37,11 @@ import org.apache.rocketmq.store.QueryMessageResult; + import org.apache.rocketmq.store.SelectMappedBufferResult; + import org.apache.rocketmq.store.config.MessageStoreConfig; + import org.apache.rocketmq.store.plugin.MessageStorePluginContext; +-import org.apache.rocketmq.tieredstore.common.BoundaryType; + import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; + import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile; + import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; ++import org.apache.rocketmq.common.BoundaryType; + import org.junit.After; + import org.junit.Assert; + import org.junit.Before; +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java +index 8322c72ed..27efe111e 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java +@@ -23,7 +23,6 @@ import org.apache.rocketmq.store.ConsumeQueue; + import org.apache.rocketmq.store.DispatchRequest; + import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; + import org.apache.rocketmq.tieredstore.common.AppendResult; +-import org.apache.rocketmq.tieredstore.common.BoundaryType; + import org.apache.rocketmq.tieredstore.common.FileSegmentType; + import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; + import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; +@@ -33,6 +32,7 @@ import org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; + import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; + import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; ++import org.apache.rocketmq.common.BoundaryType; + import org.junit.After; + import org.junit.Assert; + import org.junit.Before; +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +index dd9c6a9b4..f0a08dfb1 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.message.MessageExt; + import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.common.message.MessageRequestMode; + import org.apache.rocketmq.common.topic.TopicValidator; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.remoting.RPCHook; + import org.apache.rocketmq.remoting.exception.RemotingCommandException; + import org.apache.rocketmq.remoting.exception.RemotingConnectException; +@@ -122,6 +123,14 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { + return defaultMQAdminExtImpl.searchOffset(mq, timestamp); + } + ++ public long searchLowerBoundaryOffset(MessageQueue mq, long timestamp) throws MQClientException { ++ return defaultMQAdminExtImpl.searchOffset(mq, timestamp, BoundaryType.LOWER); ++ } ++ ++ public long searchUpperBoundaryOffset(MessageQueue mq, long timestamp) throws MQClientException { ++ return defaultMQAdminExtImpl.searchOffset(mq, timestamp, BoundaryType.UPPER); ++ } ++ + @Override + public long maxOffset(MessageQueue mq) throws MQClientException { + return defaultMQAdminExtImpl.maxOffset(mq); +diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +index c5c467bf0..fa3596d51 100644 +--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java ++++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +@@ -65,6 +65,7 @@ import org.apache.rocketmq.common.message.MessageRequestMode; + import org.apache.rocketmq.common.namesrv.NamesrvUtil; + import org.apache.rocketmq.common.topic.TopicValidator; + import org.apache.rocketmq.common.utils.NetworkUtil; ++import org.apache.rocketmq.common.BoundaryType; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.remoting.RPCHook; +@@ -1700,6 +1701,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { + return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp); + } + ++ public long searchOffset(MessageQueue mq, long timestamp, BoundaryType boundaryType) throws MQClientException { ++ return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp, boundaryType); ++ } ++ + @Override + public long maxOffset(MessageQueue mq) throws MQClientException { + return this.mqClientInstance.getMQAdminImpl().maxOffset(mq); +diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +index b94754f22..dc5642f88 100644 +--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java ++++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +@@ -512,6 +512,36 @@ public class DefaultMQAdminExtTest { + assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(TOPIC1, BROKER1_NAME, 0), System.currentTimeMillis())).isEqualTo(101L); + } + ++ @Test ++ public void testSearchOffsetWithSpecificBoundaryType() throws Exception { ++ // do mock ++ DefaultMQAdminExt mockDefaultMQAdminExt = mock(DefaultMQAdminExt.class); ++ when(mockDefaultMQAdminExt.minOffset(any(MessageQueue.class))).thenReturn(0L); ++ when(mockDefaultMQAdminExt.maxOffset(any(MessageQueue.class))).thenReturn(101L); ++ when(mockDefaultMQAdminExt.searchLowerBoundaryOffset(any(MessageQueue.class), anyLong())).thenReturn(0L); ++ when(mockDefaultMQAdminExt.searchUpperBoundaryOffset(any(MessageQueue.class), anyLong())).thenReturn(100L); ++ when(mockDefaultMQAdminExt.queryConsumeTimeSpan(anyString(), anyString())).thenReturn(mockQueryConsumeTimeSpan()); ++ ++ for (QueueTimeSpan timeSpan: mockDefaultMQAdminExt.queryConsumeTimeSpan(TOPIC1, "group_one")) { ++ MessageQueue mq = timeSpan.getMessageQueue(); ++ long maxOffset = mockDefaultMQAdminExt.maxOffset(mq); ++ long minOffset = mockDefaultMQAdminExt.minOffset(mq); ++ // if there is at least one message in queue, the maxOffset returns the queue's latest offset + 1 ++ assertThat((maxOffset == 0 ? 0 : maxOffset - 1) == mockDefaultMQAdminExt.searchUpperBoundaryOffset(mq, timeSpan.getMaxTimeStamp())).isTrue(); ++ assertThat(minOffset == mockDefaultMQAdminExt.searchLowerBoundaryOffset(mq, timeSpan.getMinTimeStamp())).isTrue(); ++ } ++ } ++ ++ private List mockQueryConsumeTimeSpan() { ++ List spanSet = new ArrayList<>(); ++ QueueTimeSpan timeSpan = new QueueTimeSpan(); ++ timeSpan.setMessageQueue(new MessageQueue(TOPIC1, BROKER1_NAME, 0)); ++ timeSpan.setMinTimeStamp(1690421253000L); ++ timeSpan.setMaxTimeStamp(1690507653000L); ++ spanSet.add(timeSpan); ++ return spanSet; ++ } ++ + @Test + public void testExamineTopicConfig() throws MQBrokerException, RemotingException, InterruptedException { + TopicConfig topicConfig = defaultMQAdminExt.examineTopicConfig("127.0.0.1:10911", "topic_test_examine_topicConfig"); +-- +2.32.0.windows.2 + + +From 3bdabf703b883fea6181df8889c08d1e91202291 Mon Sep 17 00:00:00 2001 +From: ShuangxiDing +Date: Thu, 3 Aug 2023 16:10:24 +0800 +Subject: [PATCH 5/6] [ISSUE #7109] support the mixed topic type (#7110) + +* Add mixed message type. + +* support mixed topic type for grpc server + +* remove the unnecessary parentheses around expression + +* format the javadoc +--- + .../common/attribute/TopicMessageType.java | 5 +++-- + .../proxy/grpc/v2/route/RouteActivity.java | 22 +++++++++++-------- + .../DefaultTopicMessageTypeValidator.java | 7 +++--- + .../validator/TopicMessageTypeValidator.java | 6 ++--- + 4 files changed, 23 insertions(+), 17 deletions(-) + +diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java +index 77629e4c9..9680acec7 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java ++++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java +@@ -27,7 +27,8 @@ public enum TopicMessageType { + NORMAL("NORMAL"), + FIFO("FIFO"), + DELAY("DELAY"), +- TRANSACTION("TRANSACTION"); ++ TRANSACTION("TRANSACTION"), ++ MIXED("MIXED"); + + private final String value; + TopicMessageType(String value) { +@@ -35,7 +36,7 @@ public enum TopicMessageType { + } + + public static Set topicMessageTypeSet() { +- return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value); ++ return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value, MIXED.value); + } + + public String getValue() { +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java +index c5d485691..02dea0cda 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java +@@ -32,6 +32,8 @@ import apache.rocketmq.v2.QueryRouteResponse; + import apache.rocketmq.v2.Resource; + import com.google.common.net.HostAndPort; + import java.util.ArrayList; ++import java.util.Arrays; ++import java.util.Collections; + import java.util.HashMap; + import java.util.List; + import java.util.Map; +@@ -259,7 +261,7 @@ public class RouteActivity extends AbstractMessingActivity { + MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) + .setId(queueIdIndex++) + .setPermission(Permission.READ) +- .addAcceptMessageTypes(parseTopicMessageType(topicMessageType)) ++ .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .build(); + messageQueueList.add(messageQueue); + } +@@ -268,7 +270,7 @@ public class RouteActivity extends AbstractMessingActivity { + MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) + .setId(queueIdIndex++) + .setPermission(Permission.WRITE) +- .addAcceptMessageTypes(parseTopicMessageType(topicMessageType)) ++ .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .build(); + messageQueueList.add(messageQueue); + } +@@ -277,7 +279,7 @@ public class RouteActivity extends AbstractMessingActivity { + MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) + .setId(queueIdIndex++) + .setPermission(Permission.READ_WRITE) +- .addAcceptMessageTypes(parseTopicMessageType(topicMessageType)) ++ .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .build(); + messageQueueList.add(messageQueue); + } +@@ -285,18 +287,20 @@ public class RouteActivity extends AbstractMessingActivity { + return messageQueueList; + } + +- private MessageType parseTopicMessageType(TopicMessageType topicMessageType) { ++ private List parseTopicMessageType(TopicMessageType topicMessageType) { + switch (topicMessageType) { + case NORMAL: +- return MessageType.NORMAL; ++ return Collections.singletonList(MessageType.NORMAL); + case FIFO: +- return MessageType.FIFO; ++ return Collections.singletonList(MessageType.FIFO); + case TRANSACTION: +- return MessageType.TRANSACTION; ++ return Collections.singletonList(MessageType.TRANSACTION); + case DELAY: +- return MessageType.DELAY; ++ return Collections.singletonList(MessageType.DELAY); ++ case MIXED: ++ return Arrays.asList(MessageType.NORMAL, MessageType.FIFO, MessageType.DELAY, MessageType.TRANSACTION); + default: +- return MessageType.MESSAGE_TYPE_UNSPECIFIED; ++ return Collections.singletonList(MessageType.MESSAGE_TYPE_UNSPECIFIED); + } + } + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java +index bc2fcf30f..83588f110 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java +@@ -23,9 +23,10 @@ import org.apache.rocketmq.proxy.common.ProxyExceptionCode; + + public class DefaultTopicMessageTypeValidator implements TopicMessageTypeValidator { + +- public void validate(TopicMessageType topicMessageType, TopicMessageType messageType) { +- if (messageType.equals(TopicMessageType.UNSPECIFIED) || !messageType.equals(topicMessageType)) { +- String errorInfo = String.format("TopicMessageType validate failed, topic type is %s, message type is %s", topicMessageType, messageType); ++ public void validate(TopicMessageType expectedType, TopicMessageType actualType) { ++ if (actualType.equals(TopicMessageType.UNSPECIFIED) ++ || !actualType.equals(expectedType) && !expectedType.equals(TopicMessageType.MIXED)) { ++ String errorInfo = String.format("TopicMessageType validate failed, the expected type is %s, but actual type is %s", expectedType, actualType); + throw new ProxyException(ProxyExceptionCode.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE, errorInfo); + } + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java +index 137be9095..32758da50 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java +@@ -23,8 +23,8 @@ public interface TopicMessageTypeValidator { + /** + * Will throw {@link org.apache.rocketmq.proxy.common.ProxyException} if validate failed. + * +- * @param topicMessageType Target topic +- * @param messageType Message's type ++ * @param expectedType Target topic ++ * @param actualType Message's type + */ +- void validate(TopicMessageType topicMessageType, TopicMessageType messageType); ++ void validate(TopicMessageType expectedType, TopicMessageType actualType); + } +-- +2.32.0.windows.2 + + +From c73d8ee346035aec3d548b8b29c64c626a34e68b Mon Sep 17 00:00:00 2001 +From: haolinkong <110664176+haolinkong@users.noreply.github.com> +Date: Fri, 4 Aug 2023 10:41:41 +0800 +Subject: [PATCH 6/6] [ISSUE #6962]operation.md Format adjustment + +--- + docs/cn/operation.md | 4 ++-- + 1 file changed, 2 insertions(+), 2 deletions(-) + +diff --git a/docs/cn/operation.md b/docs/cn/operation.md +index 4310da570..9f04ce1d3 100644 +--- a/docs/cn/operation.md ++++ b/docs/cn/operation.md +@@ -569,9 +569,9 @@ RocketMQ 5.0 开始支持自动主从切换的模式,可参考以下文档 + NameServer 服务地址,格式 ip:port + + +- wipeWritePerm +- 从NameServer上清除 Broker写权限 + -b + BrokerName +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index 128f3ed..cd83a3e 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.3 -Release: 8 +Release: 9 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -17,6 +17,7 @@ Patch0004: patch004-backport-Support-Proxy-Protocol-for-gRPC-and-Remoting-Server Patch0005: patch005-backport-fix-some-bugs.patch Patch0006: patch006-backport-auto-batch-producer.patch Patch0007: patch007-backport-fix-some-bugs.patch +Patch0008: patch008-backport-Allow-BoundaryType.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -51,6 +52,9 @@ exit 0 %changelog +* Tue Sep 19 2023 ShiZhili - 5.1.3-9 +- backport add allow boundary type + * Tue Sep 19 2023 ShiZhili - 5.1.3-8 - backport fix some docs error