From a1bf49d5d07cf64374bc3dde5ab43add831433ad Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Tue, 1 Aug 2023 15:56:34 +0800 Subject: [PATCH 1/6] [ISSUE #7093] Avoid dispatch tasks too much cause dispatch task failed (#7094) * Avoid dispatch tasks too much cause dispatch task failed * set schedule task async --- .../apache/rocketmq/tieredstore/TieredDispatcher.java | 11 ++++++----- .../tieredstore/common/TieredStoreExecutor.java | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java index 523b0c2cd..bb58ea7dd 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java @@ -82,7 +82,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() -> tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> { if (!flatFile.getCompositeFlatFileLock().isLocked()) { - dispatchFlatFile(flatFile); + dispatchFlatFileAsync(flatFile); } }), 30, 10, TimeUnit.SECONDS); } @@ -180,10 +180,6 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch message.release(); flatFile.getCompositeFlatFileLock().unlock(); } - } else { - if (!flatFile.getCompositeFlatFileLock().isLocked()) { - this.dispatchFlatFileAsync(flatFile); - } } } @@ -199,6 +195,11 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch } public void dispatchFlatFileAsync(CompositeQueueFlatFile flatFile, Consumer consumer) { + // Avoid dispatch tasks too much + if (TieredStoreExecutor.dispatchThreadPoolQueue.size() > + TieredStoreExecutor.QUEUE_CAPACITY * 0.75) { + return; + } TieredStoreExecutor.dispatchExecutor.execute(() -> { try { 
dispatchFlatFile(flatFile); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java index 23f1b01ea..6eb3478b3 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java @@ -27,7 +27,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl; public class TieredStoreExecutor { - private static final int QUEUE_CAPACITY = 10000; + public static final int QUEUE_CAPACITY = 10000; // Visible for monitor public static BlockingQueue dispatchThreadPoolQueue; -- 2.32.0.windows.2 From ab61183030f4f230619ea539cbd2cb977234208b Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan Date: Tue, 1 Aug 2023 19:00:15 +0800 Subject: [PATCH 2/6] [ISSUE #7104] Add ReceiptHandleGroupKey for RenewEvent (#7105) --- .../proxy/common/ReceiptHandleGroupKey.java | 69 +++++++++++++++++++ .../rocketmq/proxy/common/RenewEvent.java | 9 ++- .../processor/ReceiptHandleProcessor.java | 55 ++------------- .../receipt/DefaultReceiptHandleManager.java | 36 +++++----- .../DefaultReceiptHandleManagerTest.java | 4 +- 5 files changed, 101 insertions(+), 72 deletions(-) create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java new file mode 100644 index 000000000..bd28393e5 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.common; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; +import io.netty.channel.Channel; + +public class ReceiptHandleGroupKey { + protected final Channel channel; + protected final String group; + + public ReceiptHandleGroupKey(Channel channel, String group) { + this.channel = channel; + this.group = group; + } + + protected String getChannelId() { + return channel.id().asLongText(); + } + + public String getGroup() { + return group; + } + + public Channel getChannel() { + return channel; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReceiptHandleGroupKey key = (ReceiptHandleGroupKey) o; + return Objects.equal(getChannelId(), key.getChannelId()) && Objects.equal(group, key.group); + } + + @Override + public int hashCode() { + return Objects.hashCode(getChannelId(), group); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("channelId", getChannelId()) + .add("group", group) + .toString(); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java index 0ff65c1cc..8d591560a 100644 --- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java @@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.client.consumer.AckResult; public class RenewEvent { + protected ReceiptHandleGroupKey key; protected MessageReceiptHandle messageReceiptHandle; protected long renewTime; protected EventType eventType; @@ -32,13 +33,19 @@ public class RenewEvent { CLEAR_GROUP } - public RenewEvent(MessageReceiptHandle messageReceiptHandle, long renewTime, EventType eventType, CompletableFuture future) { + public RenewEvent(ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle, long renewTime, + EventType eventType, CompletableFuture future) { + this.key = key; this.messageReceiptHandle = messageReceiptHandle; this.renewTime = renewTime; this.eventType = eventType; this.future = future; } + public ReceiptHandleGroupKey getKey() { + return key; + } + public MessageReceiptHandle getMessageReceiptHandle() { return messageReceiptHandle; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java index 460842a86..5e1be9321 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java @@ -17,19 +17,17 @@ package org.apache.rocketmq.proxy.processor; -import com.google.common.base.MoreObjects; -import com.google.common.base.Objects; import io.netty.channel.Channel; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.state.StateEventListener; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.proxy.common.RenewEvent; 
import org.apache.rocketmq.proxy.common.MessageReceiptHandle; import org.apache.rocketmq.proxy.common.ProxyContext; -import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager; +import org.apache.rocketmq.proxy.common.RenewEvent; import org.apache.rocketmq.proxy.service.ServiceManager; +import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager; public class ReceiptHandleProcessor extends AbstractProcessor { protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); @@ -38,7 +36,8 @@ public class ReceiptHandleProcessor extends AbstractProcessor { public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) { super(messagingProcessor, serviceManager); StateEventListener eventListener = event -> { - ProxyContext context = createContext(event.getEventType().name()); + ProxyContext context = createContext(event.getEventType().name()) + .setChannel(event.getKey().getChannel()); MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle(); ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(), @@ -66,50 +65,4 @@ public class ReceiptHandleProcessor extends AbstractProcessor { return receiptHandleManager.removeReceiptHandle(ctx, channel, group, msgID, receiptHandle); } - public static class ReceiptHandleGroupKey { - protected final Channel channel; - protected final String group; - - public ReceiptHandleGroupKey(Channel channel, String group) { - this.channel = channel; - this.group = group; - } - - protected String getChannelId() { - return channel.id().asLongText(); - } - - public String getGroup() { - return group; - } - - public Channel getChannel() { - return channel; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - 
} - ReceiptHandleGroupKey key = (ReceiptHandleGroupKey) o; - return Objects.equal(getChannelId(), key.getChannelId()) && Objects.equal(group, key.group); - } - - @Override - public int hashCode() { - return Objects.hashCode(getChannelId(), group); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("channelId", getChannelId()) - .add("group", group) - .toString(); - } - } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java index 9f35435f0..69f44344a 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java @@ -55,7 +55,7 @@ import org.apache.rocketmq.proxy.common.channel.ChannelHelper; import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; -import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; +import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey; import org.apache.rocketmq.proxy.service.metadata.MetadataService; import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; @@ -64,7 +64,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); protected final MetadataService metadataService; protected final ConsumerManager consumerManager; - protected final ConcurrentMap receiptHandleGroupMap; + protected final ConcurrentMap receiptHandleGroupMap; protected final StateEventListener eventListener; protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy(); 
protected final ScheduledExecutorService scheduledExecutorService = @@ -96,7 +96,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem // if the channel sync from other proxy is expired, not to clear data of connect to current proxy return; } - clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group)); + clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group)); log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo); } } @@ -125,19 +125,19 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem } public void addReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) { - ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group), + ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleGroupKey(channel, group), k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle); } public MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, String receiptHandle) { - ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group)); + ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleGroupKey(channel, group)); if (handleGroup == null) { return null; } return handleGroup.remove(msgID, receiptHandle); } - protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) { + protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) { return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null; } @@ -145,8 +145,8 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem Stopwatch stopwatch = 
Stopwatch.createStarted(); try { ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); - for (Map.Entry entry : receiptHandleGroupMap.entrySet()) { - ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey(); + for (Map.Entry entry : receiptHandleGroupMap.entrySet()) { + ReceiptHandleGroupKey key = entry.getKey(); if (clientIsOffline(key)) { clearGroup(key); continue; @@ -159,7 +159,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) { return; } - renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr)); + renewalWorkerService.submit(() -> renewMessage(key, group, msgID, handleStr)); }); } } catch (Exception e) { @@ -169,15 +169,15 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis()); } - protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) { + protected void renewMessage(ReceiptHandleGroupKey key, ReceiptHandleGroup group, String msgID, String handleStr) { try { - group.computeIfPresent(msgID, handleStr, this::startRenewMessage); + group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> startRenewMessage(key, messageReceiptHandle)); } catch (Exception e) { log.error("error when renew message. 
msgID:{}, handleStr:{}", msgID, handleStr, e); } } - protected CompletableFuture startRenewMessage(MessageReceiptHandle messageReceiptHandle) { + protected CompletableFuture startRenewMessage(ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle) { CompletableFuture resFuture = new CompletableFuture<>(); ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); long current = System.currentTimeMillis(); @@ -188,7 +188,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem } if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) { CompletableFuture future = new CompletableFuture<>(); - eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), RenewEvent.EventType.RENEW, future)); + eventListener.fireEvent(new RenewEvent(key, messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), RenewEvent.EventType.RENEW, future)); future.whenComplete((ackResult, throwable) -> { if (throwable != null) { log.error("error when renew. handle:{}", messageReceiptHandle, throwable); @@ -218,7 +218,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem } RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy(); CompletableFuture future = new CompletableFuture<>(); - eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), RenewEvent.EventType.STOP_RENEW, future)); + eventListener.fireEvent(new RenewEvent(key, messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), RenewEvent.EventType.STOP_RENEW, future)); future.whenComplete((ackResult, throwable) -> { if (throwable != null) { log.error("error when nack in renew. 
handle:{}", messageReceiptHandle, throwable); @@ -233,7 +233,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem return resFuture; } - protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) { + protected void clearGroup(ReceiptHandleGroupKey key) { if (key == null) { return; } @@ -246,7 +246,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem try { handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> { CompletableFuture future = new CompletableFuture<>(); - eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), RenewEvent.EventType.CLEAR_GROUP, future)); + eventListener.fireEvent(new RenewEvent(key, messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), RenewEvent.EventType.CLEAR_GROUP, future)); return CompletableFuture.completedFuture(null); }); } catch (Exception e) { @@ -257,8 +257,8 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem protected void clearAllHandle() { log.info("start clear all handle in receiptHandleProcessor"); - Set keySet = receiptHandleGroupMap.keySet(); - for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) { + Set keySet = receiptHandleGroupMap.keySet(); + for (ReceiptHandleGroupKey key : keySet) { clearGroup(key); } log.info("clear all handle in receiptHandleProcessor done"); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java index 7c6943e44..25ae1509a 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java @@ -45,7 +45,7 @@ import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; import 
org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; +import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey; import org.apache.rocketmq.proxy.service.BaseServiceTest; import org.apache.rocketmq.proxy.service.metadata.MetadataService; import org.apache.rocketmq.remoting.protocol.LanguageCode; @@ -445,7 +445,7 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { public void testClearGroup() { Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); - receiptHandleManager.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP)); + receiptHandleManager.clearGroup(new ReceiptHandleGroupKey(channel, GROUP)); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); receiptHandleManager.scheduleRenewTask(); -- 2.32.0.windows.2 From c06facf08939772f81fffde72d14746d3a9384f2 Mon Sep 17 00:00:00 2001 From: rongtong Date: Thu, 3 Aug 2023 11:39:24 +0800 Subject: [PATCH 3/6] [ISSUE #7102] Making perm equal to 0 is valid --- .../main/java/org/apache/rocketmq/common/constant/PermName.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java index d7c76b4c0..d87461d7f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java +++ b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java @@ -62,7 +62,7 @@ public class PermName { } public static boolean isValid(final int perm) { - return perm >= PERM_INHERIT && perm < 
PERM_PRIORITY; + return perm >= 0 && perm < PERM_PRIORITY; } public static boolean isPriority(final int perm) { -- 2.32.0.windows.2 From 1fe5d6233455f191d7195fb5f6e27dc46510dd3e Mon Sep 17 00:00:00 2001 From: koado <34032341+Koado@users.noreply.github.com> Date: Thu, 3 Aug 2023 11:40:16 +0800 Subject: [PATCH 4/6] [ISSUE #7074] Allow a BoundaryType to be specified when retrieving offset based on the timestamp (#7082) * add new interface for searching offset with boundary type * format code * fix failed test * unify two BoundaryType class * add interface getOffsetInQueueByTime(long timestamp, BoundaryType boundaryTYpe) in ConsumeQueueInterface * fix AdminBrokerProcessorTest unnecessary Mockito stubbings --- .../processor/AdminBrokerProcessor.java | 4 +- .../processor/AdminBrokerProcessorTest.java | 3 +- .../rocketmq/client/impl/MQAdminImpl.java | 9 +++- .../rocketmq/client/impl/MQClientAPIImpl.java | 10 +++- .../remoting/protocol/RemotingCommand.java | 4 ++ .../header/SearchOffsetRequestHeader.java | 13 +++++ .../apache/rocketmq/store/ConsumeQueue.java | 2 + .../rocketmq/store/DefaultMessageStore.java | 7 ++- .../apache/rocketmq/store/MessageStore.java | 12 +++++ .../store/queue/BatchConsumeQueue.java | 37 +++++++++++--- .../store/queue/ConsumeQueueInterface.java | 10 ++++ .../store/queue/SparseConsumeQueue.java | 3 +- .../tieredstore/MessageStoreFetcher.java | 2 +- .../tieredstore/TieredMessageFetcher.java | 2 +- .../tieredstore/TieredMessageStore.java | 3 +- .../tieredstore/common/BoundaryType.java | 51 ------------------- .../tieredstore/file/CompositeAccess.java | 2 +- .../tieredstore/file/CompositeFlatFile.java | 2 +- .../tieredstore/file/TieredConsumeQueue.java | 2 +- .../tieredstore/file/TieredFlatFile.java | 2 +- .../tieredstore/TieredMessageFetcherTest.java | 2 +- .../tieredstore/TieredMessageStoreTest.java | 2 +- .../file/CompositeQueueFlatFileTest.java | 2 +- .../tools/admin/DefaultMQAdminExt.java | 9 ++++ .../tools/admin/DefaultMQAdminExtImpl.java | 5 ++ 
.../tools/admin/DefaultMQAdminExtTest.java | 30 +++++++++++ 26 files changed, 154 insertions(+), 76 deletions(-) delete mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 569a1c57b..a6ce03dc2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -994,7 +994,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { continue; } if (mappingDetail.getBname().equals(item.getBname())) { - offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp); + offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp, requestHeader.getBoundaryType()); if (offset > 0) { offset = item.computeStaticQueueOffsetStrictly(offset); break; @@ -1045,7 +1045,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(), - requestHeader.getTimestamp()); + requestHeader.getTimestamp(), requestHeader.getBoundaryType()); responseHeader.setOffset(offset); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index fa2d929b0..a470c0cf2 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.client.ConsumerManager; import 
org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.schedule.ScheduleMessageService; import org.apache.rocketmq.broker.topic.TopicConfigManager; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; @@ -311,7 +312,7 @@ public class AdminBrokerProcessorTest { @Test public void testSearchOffsetByTimestamp() throws Exception { messageStore = mock(MessageStore.class); - when(messageStore.getOffsetInQueueByTime(anyString(), anyInt(), anyLong())).thenReturn(Long.MIN_VALUE); + when(messageStore.getOffsetInQueueByTime(anyString(), anyInt(), anyLong(), any(BoundaryType.class))).thenReturn(Long.MIN_VALUE); when(brokerController.getMessageStore()).thenReturn(messageStore); SearchOffsetRequestHeader searchOffsetRequestHeader = new SearchOffsetRequestHeader(); searchOffsetRequestHeader.setTopic("topic"); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 33fc44fd6..1ef3a9483 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -34,6 +34,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.help.FAQUrl; @@ -184,6 +185,11 @@ public class MQAdminImpl { } public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + // default return lower boundary offset when there are more than one offsets. 
+ return searchOffset(mq, timestamp, BoundaryType.LOWER); + } + + public long searchOffset(MessageQueue mq, long timestamp, BoundaryType boundaryType) throws MQClientException { String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq)); if (null == brokerAddr) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); @@ -192,7 +198,8 @@ public class MQAdminImpl { if (brokerAddr != null) { try { - return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp, timeoutMillis); + return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp, + boundaryType, timeoutMillis); } catch (Exception e) { throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 4c9c3a169..708a6acd1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -76,6 +76,7 @@ import org.apache.rocketmq.common.namesrv.NameServerUpdateCallback; import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; @@ -1237,13 +1238,20 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { public long searchOffset(final String addr, final MessageQueue messageQueue, final long timestamp, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { + // default return lower boundary offset when there are more than one offsets. 
+ return searchOffset(addr, messageQueue, timestamp, BoundaryType.LOWER, timeoutMillis); + } + + public long searchOffset(final String addr, final MessageQueue messageQueue, final long timestamp, + final BoundaryType boundaryType, final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException { SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); requestHeader.setTopic(messageQueue.getTopic()); requestHeader.setQueueId(messageQueue.getQueueId()); requestHeader.setBname(messageQueue.getBrokerName()); requestHeader.setTimestamp(timestamp); + requestHeader.setBoundaryType(boundaryType); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader); - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); assert response != null; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index a6ed022ea..d27135132 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -34,6 +34,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -63,6 +64,7 @@ public class RemotingCommand { private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName(); private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName(); private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName(); + 
private static final String BOUNDARY_TYPE_CANONICAL_NAME = BoundaryType.class.getCanonicalName(); private static volatile int configVersion = -1; private static AtomicInteger requestId = new AtomicInteger(0); @@ -311,6 +313,8 @@ public class RemotingCommand { valueParsed = Boolean.parseBoolean(value); } else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) { valueParsed = Double.parseDouble(value); + } else if (type.equals(BOUNDARY_TYPE_CANONICAL_NAME)) { + valueParsed = BoundaryType.getType(value); } else { throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported"); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java index 0c644d739..79c3d337b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java @@ -21,6 +21,7 @@ package org.apache.rocketmq.remoting.protocol.header; import com.google.common.base.MoreObjects; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader; @@ -33,6 +34,8 @@ public class SearchOffsetRequestHeader extends TopicQueueRequestHeader { @CFNotNull private Long timestamp; + private BoundaryType boundaryType; + @Override public void checkFields() throws RemotingCommandException { @@ -66,12 +69,22 @@ public class SearchOffsetRequestHeader extends TopicQueueRequestHeader { this.timestamp = timestamp; } + public BoundaryType getBoundaryType() { + // default return LOWER + return boundaryType == null ? 
BoundaryType.LOWER : boundaryType; + } + + public void setBoundaryType(BoundaryType boundaryType) { + this.boundaryType = boundaryType; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) .add("topic", topic) .add("queueId", queueId) .add("timestamp", timestamp) + .add("boundaryType", getBoundaryType().getName()) /* getter maps a null field to LOWER, so toString cannot NPE */ .toString(); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 0c44ad043..a0b886eb0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -204,6 +204,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { return CQ_STORE_UNIT_SIZE; } + @Deprecated @Override public long getOffsetInQueueByTime(final long timestamp) { MappedFile mappedFile = this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp, @@ -211,6 +212,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { return binarySearchInQueueByTime(mappedFile, timestamp, BoundaryType.LOWER); } + @Override public long getOffsetInQueueByTime(final long timestamp, final BoundaryType boundaryType) { MappedFile mappedFile = this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp, messageStore.getCommitLog(), boundaryType); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index acc1610e0..25e4a166f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -82,6 +82,7 @@ import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.utils.CleanupPolicyUtils; import org.apache.rocketmq.common.utils.QueueTypeUtils; import org.apache.rocketmq.common.utils.ServiceProvider; +import
org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -1015,9 +1016,13 @@ public class DefaultMessageStore implements MessageStore { @Override public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { + return getOffsetInQueueByTime(topic, queueId, timestamp, BoundaryType.LOWER); + } + + public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) { ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId); if (logic != null) { - long resultOffset = logic.getOffsetInQueueByTime(timestamp); + long resultOffset = logic.getOffsetInQueueByTime(timestamp, boundaryType); // Make sure the result offset is in valid range. resultOffset = Math.max(resultOffset, logic.getMinOffsetInQueue()); resultOffset = Math.min(resultOffset, logic.getMaxOffsetInQueue()); diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 3db0c18f7..31bbb907f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.common.message.MessageExt; @@ -226,6 +227,17 @@ public interface MessageStore { */ long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp); + /** + * Look up the physical offset of the message whose store timestamp is as specified with specific boundaryType. + * + * @param topic Topic of the message. + * @param queueId Queue ID. 
+ * @param timestamp Timestamp to look up. + * @param boundaryType Lower or Upper + * @return physical offset which matches. + */ + long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp, final BoundaryType boundaryType); + /** * Look up the message by given commit log offset. * diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java index 8fec1bf7b..387c233bf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import java.util.function.Function; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageAccessor; @@ -708,8 +709,14 @@ public class BatchConsumeQueue implements ConsumeQueueInterface { * @param timestamp * @return */ + @Deprecated @Override public long getOffsetInQueueByTime(final long timestamp) { + return getOffsetInQueueByTime(timestamp, BoundaryType.LOWER); + } + + @Override + public long getOffsetInQueueByTime(long timestamp, BoundaryType boundaryType) { MappedFile targetBcq; BatchOffsetIndex targetMinOffset; @@ -760,7 +767,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface { if (timestamp >= maxQueueTimestamp) { return byteBuffer.getLong(right + MSG_BASE_OFFSET_INDEX); } - int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp); + int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp, boundaryType); if (mid != -1) { return byteBuffer.getLong(mid + MSG_BASE_OFFSET_INDEX); } @@ -819,11 
+826,11 @@ public class BatchConsumeQueue implements ConsumeQueueInterface { /** * Find the offset of which the value is equal or larger than the given targetValue. - * If there are many values equal to the target, then find the earliest one. + * If there are many values equal to the target, then return the lowest offset if boundaryType is LOWER while + * return the highest offset if boundaryType is UPPER. */ public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, final int unitSize, - final int unitShift, - long targetValue) { + final int unitShift, long targetValue, BoundaryType boundaryType) { int mid = -1; while (left <= right) { mid = ceil((left + right) / 2); @@ -844,10 +851,24 @@ public class BatchConsumeQueue implements ConsumeQueueInterface { } } else { //mid is actually in the mid - if (tmpValue < targetValue) { - left = mid + unitSize; - } else { - right = mid; + switch (boundaryType) { + case LOWER: + if (tmpValue < targetValue) { + left = mid + unitSize; + } else { + right = mid; + } + break; + case UPPER: + if (tmpValue <= targetValue) { + left = mid; + } else { + right = mid - unitSize; + } + break; + default: + log.warn("Unknown boundary type"); + return -1; } } } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java index d7213fa37..55d080829 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.store.queue; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.store.DispatchRequest; @@ -93,6 +94,15 @@ public interface ConsumeQueueInterface extends FileQueueLifeCycle { */ long getOffsetInQueueByTime(final 
long timestamp); + /** + * Get the message whose timestamp is the smallest, greater than or equal to the given time and when there are more + * than one message satisfy the condition, decide which one to return based on boundaryType. + * @param timestamp timestamp + * @param boundaryType Lower or Upper + * @return the offset(index) + */ + long getOffsetInQueueByTime(final long timestamp, final BoundaryType boundaryType); + /** * The max physical offset of commitlog has been dispatched to this queue. * It should be exclusive. diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java index 5b397d696..4a5f3a93b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.store.queue; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; @@ -148,7 +149,7 @@ public class SparseConsumeQueue extends BatchConsumeQueue { ByteBuffer byteBuffer = sbr.getByteBuffer(); int left = minOffset.getIndexPos(); int right = maxOffset.getIndexPos(); - int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset); + int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset, BoundaryType.LOWER); if (mid != -1) { return minOffset.getMappedFile().selectMappedBuffer(mid); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java index f4d576d29..8ae4dc7f9 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java +++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java @@ -21,7 +21,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.QueryMessageResult; -import org.apache.rocketmq.tieredstore.common.BoundaryType; +import org.apache.rocketmq.common.BoundaryType; public interface MessageStoreFetcher { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java index c4fed54bd..9a9a3e5a5 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java @@ -39,7 +39,6 @@ import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; -import org.apache.rocketmq.tieredstore.common.BoundaryType; import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture; import org.apache.rocketmq.tieredstore.common.MessageCacheKey; import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper; @@ -59,6 +58,7 @@ import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager; import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +import org.apache.rocketmq.common.BoundaryType; public class TieredMessageFetcher implements MessageStoreFetcher { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 115d9640d..1f12410f2 100644 --- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PopAckConstants; @@ -45,7 +46,6 @@ import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore; import org.apache.rocketmq.store.plugin.MessageStorePluginContext; -import org.apache.rocketmq.tieredstore.common.BoundaryType; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; import org.apache.rocketmq.tieredstore.file.CompositeFlatFile; @@ -287,6 +287,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { return getOffsetInQueueByTime(topic, queueId, timestamp, BoundaryType.LOWER); } + @Override public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) { long earliestTimeInNextStore = next.getEarliestMessageTime(); if (earliestTimeInNextStore <= 0) { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java deleted file mode 100644 index 77e53ec11..000000000 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.tieredstore.common; - -/** - * This enumeration represents the boundary types. - * It has two constants, lower and upper, which represent the lower and upper boundaries respectively. - */ -public enum BoundaryType { - - /** - * Represents the lower boundary. - */ - LOWER("lower"), - - /** - * Represents the upper boundary. 
- */ - UPPER("upper"); - - private final String name; - - BoundaryType(String name) { - this.name = name; - } - - public String getName() { - return name; - } - - public static BoundaryType getType(String name) { - if (BoundaryType.UPPER.getName().equalsIgnoreCase(name)) { - return UPPER; - } - return LOWER; - } -} diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java index bc1062cd0..3d962e40d 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java @@ -20,7 +20,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.tieredstore.common.AppendResult; -import org.apache.rocketmq.tieredstore.common.BoundaryType; +import org.apache.rocketmq.common.BoundaryType; interface CompositeAccess { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java index fa01382e1..df4baf33f 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java @@ -37,7 +37,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.tieredstore.common.AppendResult; -import org.apache.rocketmq.tieredstore.common.BoundaryType; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture; import org.apache.rocketmq.tieredstore.common.InFlightRequestKey; @@ -46,6 +45,7 @@ import 
org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +import org.apache.rocketmq.common.BoundaryType; public class CompositeFlatFile implements CompositeAccess { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java index ff9572af6..35007f8cb 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java @@ -21,8 +21,8 @@ import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.tieredstore.common.AppendResult; -import org.apache.rocketmq.tieredstore.common.BoundaryType; import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; +import org.apache.rocketmq.common.BoundaryType; public class TieredConsumeQueue { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java index 90ca843bf..75ce8d89f 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java @@ -33,7 +33,6 @@ import javax.annotation.Nullable; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.tieredstore.common.AppendResult; -import org.apache.rocketmq.tieredstore.common.BoundaryType; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode; import 
org.apache.rocketmq.tieredstore.exception.TieredStoreException; @@ -42,6 +41,7 @@ import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator; import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +import org.apache.rocketmq.common.BoundaryType; public class TieredFlatFile { diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java index df3720bab..d75b2f916 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java @@ -30,7 +30,6 @@ import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.tieredstore.common.AppendResult; -import org.apache.rocketmq.tieredstore.common.BoundaryType; import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; @@ -41,6 +40,7 @@ import org.apache.rocketmq.tieredstore.file.TieredIndexFile; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +import org.apache.rocketmq.common.BoundaryType; import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java index 58b7a52cc..8601392e7 100644 --- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java @@ -37,11 +37,11 @@ import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.plugin.MessageStorePluginContext; -import org.apache.rocketmq.tieredstore.common.BoundaryType; import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile; import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +import org.apache.rocketmq.common.BoundaryType; import org.junit.After; import org.junit.Assert; import org.junit.Before; diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java index 8322c72ed..27efe111e 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java @@ -23,7 +23,6 @@ import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; import org.apache.rocketmq.tieredstore.common.AppendResult; -import org.apache.rocketmq.tieredstore.common.BoundaryType; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; @@ -33,6 +32,7 @@ import org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; import 
org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +import org.apache.rocketmq.common.BoundaryType; import org.junit.After; import org.junit.Assert; import org.junit.Before; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index dd9c6a9b4..f0a08dfb1 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -33,6 +33,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageRequestMode; import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingConnectException; @@ -122,6 +123,14 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { return defaultMQAdminExtImpl.searchOffset(mq, timestamp); } + public long searchLowerBoundaryOffset(MessageQueue mq, long timestamp) throws MQClientException { + return defaultMQAdminExtImpl.searchOffset(mq, timestamp, BoundaryType.LOWER); + } + + public long searchUpperBoundaryOffset(MessageQueue mq, long timestamp) throws MQClientException { + return defaultMQAdminExtImpl.searchOffset(mq, timestamp, BoundaryType.UPPER); + } + @Override public long maxOffset(MessageQueue mq) throws MQClientException { return defaultMQAdminExtImpl.maxOffset(mq); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index c5c467bf0..fa3596d51 100644 --- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -65,6 +65,7 @@ import org.apache.rocketmq.common.message.MessageRequestMode; import org.apache.rocketmq.common.namesrv.NamesrvUtil; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.utils.NetworkUtil; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.RPCHook; @@ -1700,6 +1701,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp); } + public long searchOffset(MessageQueue mq, long timestamp, BoundaryType boundaryType) throws MQClientException { + return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp, boundaryType); + } + @Override public long maxOffset(MessageQueue mq) throws MQClientException { return this.mqClientInstance.getMQAdminImpl().maxOffset(mq); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index b94754f22..dc5642f88 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -512,6 +512,36 @@ public class DefaultMQAdminExtTest { assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(TOPIC1, BROKER1_NAME, 0), System.currentTimeMillis())).isEqualTo(101L); } + @Test + public void testSearchOffsetWithSpecificBoundaryType() throws Exception { + // do mock + DefaultMQAdminExt mockDefaultMQAdminExt = mock(DefaultMQAdminExt.class); + when(mockDefaultMQAdminExt.minOffset(any(MessageQueue.class))).thenReturn(0L); + 
when(mockDefaultMQAdminExt.maxOffset(any(MessageQueue.class))).thenReturn(101L); + when(mockDefaultMQAdminExt.searchLowerBoundaryOffset(any(MessageQueue.class), anyLong())).thenReturn(0L); + when(mockDefaultMQAdminExt.searchUpperBoundaryOffset(any(MessageQueue.class), anyLong())).thenReturn(100L); + when(mockDefaultMQAdminExt.queryConsumeTimeSpan(anyString(), anyString())).thenReturn(mockQueryConsumeTimeSpan()); + + for (QueueTimeSpan timeSpan: mockDefaultMQAdminExt.queryConsumeTimeSpan(TOPIC1, "group_one")) { + MessageQueue mq = timeSpan.getMessageQueue(); + long maxOffset = mockDefaultMQAdminExt.maxOffset(mq); + long minOffset = mockDefaultMQAdminExt.minOffset(mq); + // if there is at least one message in queue, the maxOffset returns the queue's latest offset + 1 + assertThat((maxOffset == 0 ? 0 : maxOffset - 1) == mockDefaultMQAdminExt.searchUpperBoundaryOffset(mq, timeSpan.getMaxTimeStamp())).isTrue(); + assertThat(minOffset == mockDefaultMQAdminExt.searchLowerBoundaryOffset(mq, timeSpan.getMinTimeStamp())).isTrue(); + } + } + + private List mockQueryConsumeTimeSpan() { + List spanSet = new ArrayList<>(); + QueueTimeSpan timeSpan = new QueueTimeSpan(); + timeSpan.setMessageQueue(new MessageQueue(TOPIC1, BROKER1_NAME, 0)); + timeSpan.setMinTimeStamp(1690421253000L); + timeSpan.setMaxTimeStamp(1690507653000L); + spanSet.add(timeSpan); + return spanSet; + } + @Test public void testExamineTopicConfig() throws MQBrokerException, RemotingException, InterruptedException { TopicConfig topicConfig = defaultMQAdminExt.examineTopicConfig("127.0.0.1:10911", "topic_test_examine_topicConfig"); -- 2.32.0.windows.2 From 3bdabf703b883fea6181df8889c08d1e91202291 Mon Sep 17 00:00:00 2001 From: ShuangxiDing Date: Thu, 3 Aug 2023 16:10:24 +0800 Subject: [PATCH 5/6] [ISSUE #7109] support the mixed topic type (#7110) * Add mixed message type. 
* support mixed topic type for grpc server * remove the unnecessary parentheses around expression * format the javadoc --- .../common/attribute/TopicMessageType.java | 5 +++-- .../proxy/grpc/v2/route/RouteActivity.java | 22 +++++++++++-------- .../DefaultTopicMessageTypeValidator.java | 7 +++--- .../validator/TopicMessageTypeValidator.java | 6 ++--- 4 files changed, 23 insertions(+), 17 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java index 77629e4c9..9680acec7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java +++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java @@ -27,7 +27,8 @@ public enum TopicMessageType { NORMAL("NORMAL"), FIFO("FIFO"), DELAY("DELAY"), - TRANSACTION("TRANSACTION"); + TRANSACTION("TRANSACTION"), + MIXED("MIXED"); private final String value; TopicMessageType(String value) { @@ -35,7 +36,7 @@ public enum TopicMessageType { } public static Set topicMessageTypeSet() { - return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value); + return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value, MIXED.value); } public String getValue() { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java index c5d485691..02dea0cda 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java @@ -32,6 +32,8 @@ import apache.rocketmq.v2.QueryRouteResponse; import apache.rocketmq.v2.Resource; import com.google.common.net.HostAndPort; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import 
java.util.List; import java.util.Map; @@ -259,7 +261,7 @@ public class RouteActivity extends AbstractMessingActivity { MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) .setId(queueIdIndex++) .setPermission(Permission.READ) - .addAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) .build(); messageQueueList.add(messageQueue); } @@ -268,7 +270,7 @@ public class RouteActivity extends AbstractMessingActivity { MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) .setId(queueIdIndex++) .setPermission(Permission.WRITE) - .addAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) .build(); messageQueueList.add(messageQueue); } @@ -277,7 +279,7 @@ public class RouteActivity extends AbstractMessingActivity { MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) .setId(queueIdIndex++) .setPermission(Permission.READ_WRITE) - .addAcceptMessageTypes(parseTopicMessageType(topicMessageType)) + .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) .build(); messageQueueList.add(messageQueue); } @@ -285,18 +287,20 @@ public class RouteActivity extends AbstractMessingActivity { return messageQueueList; } - private MessageType parseTopicMessageType(TopicMessageType topicMessageType) { + private List parseTopicMessageType(TopicMessageType topicMessageType) { switch (topicMessageType) { case NORMAL: - return MessageType.NORMAL; + return Collections.singletonList(MessageType.NORMAL); case FIFO: - return MessageType.FIFO; + return Collections.singletonList(MessageType.FIFO); case TRANSACTION: - return MessageType.TRANSACTION; + return Collections.singletonList(MessageType.TRANSACTION); case DELAY: - return MessageType.DELAY; + return Collections.singletonList(MessageType.DELAY); + case MIXED: + return 
Arrays.asList(MessageType.NORMAL, MessageType.FIFO, MessageType.DELAY, MessageType.TRANSACTION); default: - return MessageType.MESSAGE_TYPE_UNSPECIFIED; + return Collections.singletonList(MessageType.MESSAGE_TYPE_UNSPECIFIED); } } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java index bc2fcf30f..83588f110 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java @@ -23,9 +23,10 @@ import org.apache.rocketmq.proxy.common.ProxyExceptionCode; public class DefaultTopicMessageTypeValidator implements TopicMessageTypeValidator { - public void validate(TopicMessageType topicMessageType, TopicMessageType messageType) { - if (messageType.equals(TopicMessageType.UNSPECIFIED) || !messageType.equals(topicMessageType)) { - String errorInfo = String.format("TopicMessageType validate failed, topic type is %s, message type is %s", topicMessageType, messageType); + public void validate(TopicMessageType expectedType, TopicMessageType actualType) { + if (actualType.equals(TopicMessageType.UNSPECIFIED) + || !actualType.equals(expectedType) && !expectedType.equals(TopicMessageType.MIXED)) { + String errorInfo = String.format("TopicMessageType validate failed, the expected type is %s, but actual type is %s", expectedType, actualType); throw new ProxyException(ProxyExceptionCode.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE, errorInfo); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java index 137be9095..32758da50 100644 --- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java @@ -23,8 +23,8 @@ public interface TopicMessageTypeValidator { /** * Will throw {@link org.apache.rocketmq.proxy.common.ProxyException} if validate failed. * - * @param topicMessageType Target topic - * @param messageType Message's type + * @param expectedType Target topic + * @param actualType Message's type */ - void validate(TopicMessageType topicMessageType, TopicMessageType messageType); + void validate(TopicMessageType expectedType, TopicMessageType actualType); } -- 2.32.0.windows.2 From c73d8ee346035aec3d548b8b29c64c626a34e68b Mon Sep 17 00:00:00 2001 From: haolinkong <110664176+haolinkong@users.noreply.github.com> Date: Fri, 4 Aug 2023 10:41:41 +0800 Subject: [PATCH 6/6] [ISSUE #6962]operation.md Format adjustment --- docs/cn/operation.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/cn/operation.md b/docs/cn/operation.md index 4310da570..9f04ce1d3 100644 --- a/docs/cn/operation.md +++ b/docs/cn/operation.md @@ -569,9 +569,9 @@ RocketMQ 5.0 开始支持自动主从切换的模式,可参考以下文档 NameServer 服务地址,格式 ip:port - wipeWritePerm - 从NameServer上清除 Broker写权限 -b BrokerName -- 2.32.0.windows.2