1419 lines
75 KiB
Diff
1419 lines
75 KiB
Diff
From a1bf49d5d07cf64374bc3dde5ab43add831433ad Mon Sep 17 00:00:00 2001
|
|
From: lizhimins <707364882@qq.com>
|
|
Date: Tue, 1 Aug 2023 15:56:34 +0800
|
|
Subject: [PATCH 1/6] [ISSUE #7093] Avoid dispatch tasks too much cause
|
|
dispatch task failed (#7094)
|
|
|
|
* Avoid dispatch tasks too much cause dispatch task failed
|
|
|
|
* set schedule task async
|
|
---
|
|
.../apache/rocketmq/tieredstore/TieredDispatcher.java | 11 ++++++-----
|
|
.../tieredstore/common/TieredStoreExecutor.java | 2 +-
|
|
2 files changed, 7 insertions(+), 6 deletions(-)
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
|
index 523b0c2cd..bb58ea7dd 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
|
@@ -82,7 +82,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
|
|
TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() ->
|
|
tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> {
|
|
if (!flatFile.getCompositeFlatFileLock().isLocked()) {
|
|
- dispatchFlatFile(flatFile);
|
|
+ dispatchFlatFileAsync(flatFile);
|
|
}
|
|
}), 30, 10, TimeUnit.SECONDS);
|
|
}
|
|
@@ -180,10 +180,6 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
|
|
message.release();
|
|
flatFile.getCompositeFlatFileLock().unlock();
|
|
}
|
|
- } else {
|
|
- if (!flatFile.getCompositeFlatFileLock().isLocked()) {
|
|
- this.dispatchFlatFileAsync(flatFile);
|
|
- }
|
|
}
|
|
}
|
|
|
|
@@ -199,6 +195,11 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
|
|
}
|
|
|
|
public void dispatchFlatFileAsync(CompositeQueueFlatFile flatFile, Consumer<Long> consumer) {
|
|
+ // Avoid dispatch tasks too much
|
|
+ if (TieredStoreExecutor.dispatchThreadPoolQueue.size() >
|
|
+ TieredStoreExecutor.QUEUE_CAPACITY * 0.75) {
|
|
+ return;
|
|
+ }
|
|
TieredStoreExecutor.dispatchExecutor.execute(() -> {
|
|
try {
|
|
dispatchFlatFile(flatFile);
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
|
|
index 23f1b01ea..6eb3478b3 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
|
|
@@ -27,7 +27,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
|
|
public class TieredStoreExecutor {
|
|
|
|
- private static final int QUEUE_CAPACITY = 10000;
|
|
+ public static final int QUEUE_CAPACITY = 10000;
|
|
|
|
// Visible for monitor
|
|
public static BlockingQueue<Runnable> dispatchThreadPoolQueue;
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From ab61183030f4f230619ea539cbd2cb977234208b Mon Sep 17 00:00:00 2001
|
|
From: Zhouxiang Zhan <zhouxzhan@apache.org>
|
|
Date: Tue, 1 Aug 2023 19:00:15 +0800
|
|
Subject: [PATCH 2/6] [ISSUE #7104] Add ReceiptHandleGroupKey for RenewEvent
|
|
(#7105)
|
|
|
|
---
|
|
.../proxy/common/ReceiptHandleGroupKey.java | 69 +++++++++++++++++++
|
|
.../rocketmq/proxy/common/RenewEvent.java | 9 ++-
|
|
.../processor/ReceiptHandleProcessor.java | 55 ++-------------
|
|
.../receipt/DefaultReceiptHandleManager.java | 36 +++++-----
|
|
.../DefaultReceiptHandleManagerTest.java | 4 +-
|
|
5 files changed, 101 insertions(+), 72 deletions(-)
|
|
create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java
|
|
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java
|
|
new file mode 100644
|
|
index 000000000..bd28393e5
|
|
--- /dev/null
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java
|
|
@@ -0,0 +1,69 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+
|
|
+package org.apache.rocketmq.proxy.common;
|
|
+
|
|
+import com.google.common.base.MoreObjects;
|
|
+import com.google.common.base.Objects;
|
|
+import io.netty.channel.Channel;
|
|
+
|
|
+public class ReceiptHandleGroupKey {
|
|
+ protected final Channel channel;
|
|
+ protected final String group;
|
|
+
|
|
+ public ReceiptHandleGroupKey(Channel channel, String group) {
|
|
+ this.channel = channel;
|
|
+ this.group = group;
|
|
+ }
|
|
+
|
|
+ protected String getChannelId() {
|
|
+ return channel.id().asLongText();
|
|
+ }
|
|
+
|
|
+ public String getGroup() {
|
|
+ return group;
|
|
+ }
|
|
+
|
|
+ public Channel getChannel() {
|
|
+ return channel;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean equals(Object o) {
|
|
+ if (this == o) {
|
|
+ return true;
|
|
+ }
|
|
+ if (o == null || getClass() != o.getClass()) {
|
|
+ return false;
|
|
+ }
|
|
+ ReceiptHandleGroupKey key = (ReceiptHandleGroupKey) o;
|
|
+ return Objects.equal(getChannelId(), key.getChannelId()) && Objects.equal(group, key.group);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public int hashCode() {
|
|
+ return Objects.hashCode(getChannelId(), group);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public String toString() {
|
|
+ return MoreObjects.toStringHelper(this)
|
|
+ .add("channelId", getChannelId())
|
|
+ .add("group", group)
|
|
+ .toString();
|
|
+ }
|
|
+}
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
|
|
index 0ff65c1cc..8d591560a 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
|
|
@@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture;
|
|
import org.apache.rocketmq.client.consumer.AckResult;
|
|
|
|
public class RenewEvent {
|
|
+ protected ReceiptHandleGroupKey key;
|
|
protected MessageReceiptHandle messageReceiptHandle;
|
|
protected long renewTime;
|
|
protected EventType eventType;
|
|
@@ -32,13 +33,19 @@ public class RenewEvent {
|
|
CLEAR_GROUP
|
|
}
|
|
|
|
- public RenewEvent(MessageReceiptHandle messageReceiptHandle, long renewTime, EventType eventType, CompletableFuture<AckResult> future) {
|
|
+ public RenewEvent(ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle, long renewTime,
|
|
+ EventType eventType, CompletableFuture<AckResult> future) {
|
|
+ this.key = key;
|
|
this.messageReceiptHandle = messageReceiptHandle;
|
|
this.renewTime = renewTime;
|
|
this.eventType = eventType;
|
|
this.future = future;
|
|
}
|
|
|
|
+ public ReceiptHandleGroupKey getKey() {
|
|
+ return key;
|
|
+ }
|
|
+
|
|
public MessageReceiptHandle getMessageReceiptHandle() {
|
|
return messageReceiptHandle;
|
|
}
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
|
|
index 460842a86..5e1be9321 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
|
|
@@ -17,19 +17,17 @@
|
|
|
|
package org.apache.rocketmq.proxy.processor;
|
|
|
|
-import com.google.common.base.MoreObjects;
|
|
-import com.google.common.base.Objects;
|
|
import io.netty.channel.Channel;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.consumer.ReceiptHandle;
|
|
import org.apache.rocketmq.common.state.StateEventListener;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
-import org.apache.rocketmq.proxy.common.RenewEvent;
|
|
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
|
|
import org.apache.rocketmq.proxy.common.ProxyContext;
|
|
-import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager;
|
|
+import org.apache.rocketmq.proxy.common.RenewEvent;
|
|
import org.apache.rocketmq.proxy.service.ServiceManager;
|
|
+import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager;
|
|
|
|
public class ReceiptHandleProcessor extends AbstractProcessor {
|
|
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
|
|
@@ -38,7 +36,8 @@ public class ReceiptHandleProcessor extends AbstractProcessor {
|
|
public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) {
|
|
super(messagingProcessor, serviceManager);
|
|
StateEventListener<RenewEvent> eventListener = event -> {
|
|
- ProxyContext context = createContext(event.getEventType().name());
|
|
+ ProxyContext context = createContext(event.getEventType().name())
|
|
+ .setChannel(event.getKey().getChannel());
|
|
MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle();
|
|
ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
|
|
messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
|
|
@@ -66,50 +65,4 @@ public class ReceiptHandleProcessor extends AbstractProcessor {
|
|
return receiptHandleManager.removeReceiptHandle(ctx, channel, group, msgID, receiptHandle);
|
|
}
|
|
|
|
- public static class ReceiptHandleGroupKey {
|
|
- protected final Channel channel;
|
|
- protected final String group;
|
|
-
|
|
- public ReceiptHandleGroupKey(Channel channel, String group) {
|
|
- this.channel = channel;
|
|
- this.group = group;
|
|
- }
|
|
-
|
|
- protected String getChannelId() {
|
|
- return channel.id().asLongText();
|
|
- }
|
|
-
|
|
- public String getGroup() {
|
|
- return group;
|
|
- }
|
|
-
|
|
- public Channel getChannel() {
|
|
- return channel;
|
|
- }
|
|
-
|
|
- @Override
|
|
- public boolean equals(Object o) {
|
|
- if (this == o) {
|
|
- return true;
|
|
- }
|
|
- if (o == null || getClass() != o.getClass()) {
|
|
- return false;
|
|
- }
|
|
- ReceiptHandleGroupKey key = (ReceiptHandleGroupKey) o;
|
|
- return Objects.equal(getChannelId(), key.getChannelId()) && Objects.equal(group, key.group);
|
|
- }
|
|
-
|
|
- @Override
|
|
- public int hashCode() {
|
|
- return Objects.hashCode(getChannelId(), group);
|
|
- }
|
|
-
|
|
- @Override
|
|
- public String toString() {
|
|
- return MoreObjects.toStringHelper(this)
|
|
- .add("channelId", getChannelId())
|
|
- .add("group", group)
|
|
- .toString();
|
|
- }
|
|
- }
|
|
}
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
|
|
index 9f35435f0..69f44344a 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
|
|
@@ -55,7 +55,7 @@ import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
|
|
import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
|
|
import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
|
import org.apache.rocketmq.proxy.config.ProxyConfig;
|
|
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
|
|
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
|
|
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
|
|
import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
|
|
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
|
|
@@ -64,7 +64,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
|
|
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
|
|
protected final MetadataService metadataService;
|
|
protected final ConsumerManager consumerManager;
|
|
- protected final ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap;
|
|
+ protected final ConcurrentMap<ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap;
|
|
protected final StateEventListener<RenewEvent> eventListener;
|
|
protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy();
|
|
protected final ScheduledExecutorService scheduledExecutorService =
|
|
@@ -96,7 +96,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
|
|
// if the channel sync from other proxy is expired, not to clear data of connect to current proxy
|
|
return;
|
|
}
|
|
- clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
|
|
+ clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
|
|
log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
|
|
}
|
|
}
|
|
@@ -125,19 +125,19 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
|
|
}
|
|
|
|
public void addReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) {
|
|
- ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group),
|
|
+ ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleGroupKey(channel, group),
|
|
k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
|
|
}
|
|
|
|
public MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, String receiptHandle) {
|
|
- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group));
|
|
+ ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleGroupKey(channel, group));
|
|
if (handleGroup == null) {
|
|
return null;
|
|
}
|
|
return handleGroup.remove(msgID, receiptHandle);
|
|
}
|
|
|
|
- protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) {
|
|
+ protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) {
|
|
return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null;
|
|
}
|
|
|
|
@@ -145,8 +145,8 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
|
|
Stopwatch stopwatch = Stopwatch.createStarted();
|
|
try {
|
|
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
|
|
- for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
|
|
- ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey();
|
|
+ for (Map.Entry<ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
|
|
+ ReceiptHandleGroupKey key = entry.getKey();
|
|
if (clientIsOffline(key)) {
|
|
clearGroup(key);
|
|
continue;
|
|
@@ -159,7 +159,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
|
|
if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) {
|
|
return;
|
|
}
|
|
- renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr));
|
|
+ renewalWorkerService.submit(() -> renewMessage(key, group, msgID, handleStr));
|
|
});
|
|
}
|
|
} catch (Exception e) {
|
|
@@ -169,15 +169,15 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
|
|
log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis());
|
|
}
|
|
|
|
- protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) {
|
|
+ protected void renewMessage(ReceiptHandleGroupKey key, ReceiptHandleGroup group, String msgID, String handleStr) {
|
|
try {
|
|
- group.computeIfPresent(msgID, handleStr, this::startRenewMessage);
|
|
+ group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> startRenewMessage(key, messageReceiptHandle));
|
|
} catch (Exception e) {
|
|
log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e);
|
|
}
|
|
}
|
|
|
|
- protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
|
|
+ protected CompletableFuture<MessageReceiptHandle> startRenewMessage(ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle) {
|
|
CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>();
|
|
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
|
|
long current = System.currentTimeMillis();
|
|
@@ -188,7 +188,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
|
|
}
|
|
if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
|
|
CompletableFuture<AckResult> future = new CompletableFuture<>();
|
|
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), RenewEvent.EventType.RENEW, future));
|
|
+ eventListener.fireEvent(new RenewEvent(key, messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), RenewEvent.EventType.RENEW, future));
|
|
future.whenComplete((ackResult, throwable) -> {
|
|
if (throwable != null) {
|
|
log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
|
|
@@ -218,7 +218,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
|
|
}
|
|
RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
|
|
CompletableFuture<AckResult> future = new CompletableFuture<>();
|
|
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), RenewEvent.EventType.STOP_RENEW, future));
|
|
+ eventListener.fireEvent(new RenewEvent(key, messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), RenewEvent.EventType.STOP_RENEW, future));
|
|
future.whenComplete((ackResult, throwable) -> {
|
|
if (throwable != null) {
|
|
log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable);
|
|
@@ -233,7 +233,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
|
|
return resFuture;
|
|
}
|
|
|
|
- protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) {
|
|
+ protected void clearGroup(ReceiptHandleGroupKey key) {
|
|
if (key == null) {
|
|
return;
|
|
}
|
|
@@ -246,7 +246,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
|
|
try {
|
|
handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> {
|
|
CompletableFuture<AckResult> future = new CompletableFuture<>();
|
|
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), RenewEvent.EventType.CLEAR_GROUP, future));
|
|
+ eventListener.fireEvent(new RenewEvent(key, messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), RenewEvent.EventType.CLEAR_GROUP, future));
|
|
return CompletableFuture.completedFuture(null);
|
|
});
|
|
} catch (Exception e) {
|
|
@@ -257,8 +257,8 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
|
|
|
|
protected void clearAllHandle() {
|
|
log.info("start clear all handle in receiptHandleProcessor");
|
|
- Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet();
|
|
- for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) {
|
|
+ Set<ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet();
|
|
+ for (ReceiptHandleGroupKey key : keySet) {
|
|
clearGroup(key);
|
|
}
|
|
log.info("clear all handle in receiptHandleProcessor done");
|
|
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
|
|
index 7c6943e44..25ae1509a 100644
|
|
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
|
|
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
|
|
@@ -45,7 +45,7 @@ import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
|
|
import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
|
import org.apache.rocketmq.proxy.config.ProxyConfig;
|
|
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
|
|
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
|
|
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
|
|
import org.apache.rocketmq.proxy.service.BaseServiceTest;
|
|
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
|
|
import org.apache.rocketmq.remoting.protocol.LanguageCode;
|
|
@@ -445,7 +445,7 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest {
|
|
public void testClearGroup() {
|
|
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
|
|
receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
- receiptHandleManager.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP));
|
|
+ receiptHandleManager.clearGroup(new ReceiptHandleGroupKey(channel, GROUP));
|
|
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
|
|
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
|
|
receiptHandleManager.scheduleRenewTask();
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From c06facf08939772f81fffde72d14746d3a9384f2 Mon Sep 17 00:00:00 2001
|
|
From: rongtong <jinrongtong5@163.com>
|
|
Date: Thu, 3 Aug 2023 11:39:24 +0800
|
|
Subject: [PATCH 3/6] [ISSUE #7102] Making perm equal to 0 is valid
|
|
|
|
---
|
|
.../main/java/org/apache/rocketmq/common/constant/PermName.java | 2 +-
|
|
1 file changed, 1 insertion(+), 1 deletion(-)
|
|
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
|
|
index d7c76b4c0..d87461d7f 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
|
|
@@ -62,7 +62,7 @@ public class PermName {
|
|
}
|
|
|
|
public static boolean isValid(final int perm) {
|
|
- return perm >= PERM_INHERIT && perm < PERM_PRIORITY;
|
|
+ return perm >= 0 && perm < PERM_PRIORITY;
|
|
}
|
|
|
|
public static boolean isPriority(final int perm) {
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 1fe5d6233455f191d7195fb5f6e27dc46510dd3e Mon Sep 17 00:00:00 2001
|
|
From: koado <34032341+Koado@users.noreply.github.com>
|
|
Date: Thu, 3 Aug 2023 11:40:16 +0800
|
|
Subject: [PATCH 4/6] [ISSUE #7074] Allow a BoundaryType to be specified when
|
|
retrieving offset based on the timestamp (#7082)
|
|
|
|
* add new interface for searching offset with boundary type
|
|
|
|
* format code
|
|
|
|
* fix failed test
|
|
|
|
* unify two BoundaryType class
|
|
|
|
* add interface getOffsetInQueueByTime(long timestamp, BoundaryType boundaryTYpe) in ConsumeQueueInterface
|
|
|
|
* fix AdminBrokerProcessorTest unnecessary Mockito stubbings
|
|
---
|
|
.../processor/AdminBrokerProcessor.java | 4 +-
|
|
.../processor/AdminBrokerProcessorTest.java | 3 +-
|
|
.../rocketmq/client/impl/MQAdminImpl.java | 9 +++-
|
|
.../rocketmq/client/impl/MQClientAPIImpl.java | 10 +++-
|
|
.../remoting/protocol/RemotingCommand.java | 4 ++
|
|
.../header/SearchOffsetRequestHeader.java | 13 +++++
|
|
.../apache/rocketmq/store/ConsumeQueue.java | 2 +
|
|
.../rocketmq/store/DefaultMessageStore.java | 7 ++-
|
|
.../apache/rocketmq/store/MessageStore.java | 12 +++++
|
|
.../store/queue/BatchConsumeQueue.java | 37 +++++++++++---
|
|
.../store/queue/ConsumeQueueInterface.java | 10 ++++
|
|
.../store/queue/SparseConsumeQueue.java | 3 +-
|
|
.../tieredstore/MessageStoreFetcher.java | 2 +-
|
|
.../tieredstore/TieredMessageFetcher.java | 2 +-
|
|
.../tieredstore/TieredMessageStore.java | 3 +-
|
|
.../tieredstore/common/BoundaryType.java | 51 -------------------
|
|
.../tieredstore/file/CompositeAccess.java | 2 +-
|
|
.../tieredstore/file/CompositeFlatFile.java | 2 +-
|
|
.../tieredstore/file/TieredConsumeQueue.java | 2 +-
|
|
.../tieredstore/file/TieredFlatFile.java | 2 +-
|
|
.../tieredstore/TieredMessageFetcherTest.java | 2 +-
|
|
.../tieredstore/TieredMessageStoreTest.java | 2 +-
|
|
.../file/CompositeQueueFlatFileTest.java | 2 +-
|
|
.../tools/admin/DefaultMQAdminExt.java | 9 ++++
|
|
.../tools/admin/DefaultMQAdminExtImpl.java | 5 ++
|
|
.../tools/admin/DefaultMQAdminExtTest.java | 30 +++++++++++
|
|
26 files changed, 154 insertions(+), 76 deletions(-)
|
|
delete mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
|
index 569a1c57b..a6ce03dc2 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
|
@@ -994,7 +994,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
|
continue;
|
|
}
|
|
if (mappingDetail.getBname().equals(item.getBname())) {
|
|
- offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp);
|
|
+ offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp, requestHeader.getBoundaryType());
|
|
if (offset > 0) {
|
|
offset = item.computeStaticQueueOffsetStrictly(offset);
|
|
break;
|
|
@@ -1045,7 +1045,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
|
}
|
|
|
|
long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(),
|
|
- requestHeader.getTimestamp());
|
|
+ requestHeader.getTimestamp(), requestHeader.getBoundaryType());
|
|
|
|
responseHeader.setOffset(offset);
|
|
|
|
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
|
index fa2d929b0..a470c0cf2 100644
|
|
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
|
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
|
|
@@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.client.ConsumerManager;
|
|
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
|
|
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
|
|
import org.apache.rocketmq.broker.topic.TopicConfigManager;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.common.BrokerConfig;
|
|
import org.apache.rocketmq.common.MixAll;
|
|
import org.apache.rocketmq.common.TopicConfig;
|
|
@@ -311,7 +312,7 @@ public class AdminBrokerProcessorTest {
|
|
@Test
|
|
public void testSearchOffsetByTimestamp() throws Exception {
|
|
messageStore = mock(MessageStore.class);
|
|
- when(messageStore.getOffsetInQueueByTime(anyString(), anyInt(), anyLong())).thenReturn(Long.MIN_VALUE);
|
|
+ when(messageStore.getOffsetInQueueByTime(anyString(), anyInt(), anyLong(), any(BoundaryType.class))).thenReturn(Long.MIN_VALUE);
|
|
when(brokerController.getMessageStore()).thenReturn(messageStore);
|
|
SearchOffsetRequestHeader searchOffsetRequestHeader = new SearchOffsetRequestHeader();
|
|
searchOffsetRequestHeader.setTopic("topic");
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
|
|
index 33fc44fd6..1ef3a9483 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
|
|
@@ -34,6 +34,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
|
|
import org.apache.rocketmq.client.exception.MQClientException;
|
|
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
|
|
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.common.MixAll;
|
|
import org.apache.rocketmq.common.TopicConfig;
|
|
import org.apache.rocketmq.common.help.FAQUrl;
|
|
@@ -184,6 +185,11 @@ public class MQAdminImpl {
|
|
}
|
|
|
|
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
|
|
+ // default return lower boundary offset when there are more than one offsets.
|
|
+ return searchOffset(mq, timestamp, BoundaryType.LOWER);
|
|
+ }
|
|
+
|
|
+ public long searchOffset(MessageQueue mq, long timestamp, BoundaryType boundaryType) throws MQClientException {
|
|
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
|
|
if (null == brokerAddr) {
|
|
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
|
|
@@ -192,7 +198,8 @@ public class MQAdminImpl {
|
|
|
|
if (brokerAddr != null) {
|
|
try {
|
|
- return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp, timeoutMillis);
|
|
+ return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp,
|
|
+ boundaryType, timeoutMillis);
|
|
} catch (Exception e) {
|
|
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
|
|
}
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
|
|
index 4c9c3a169..708a6acd1 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
|
|
@@ -76,6 +76,7 @@ import org.apache.rocketmq.common.namesrv.NameServerUpdateCallback;
|
|
import org.apache.rocketmq.common.namesrv.TopAddressing;
|
|
import org.apache.rocketmq.common.sysflag.PullSysFlag;
|
|
import org.apache.rocketmq.common.topic.TopicValidator;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.remoting.CommandCustomHeader;
|
|
import org.apache.rocketmq.remoting.InvokeCallback;
|
|
import org.apache.rocketmq.remoting.RPCHook;
|
|
@@ -1237,13 +1238,20 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
|
|
public long searchOffset(final String addr, final MessageQueue messageQueue, final long timestamp,
|
|
final long timeoutMillis)
|
|
throws RemotingException, MQBrokerException, InterruptedException {
|
|
+ // default return lower boundary offset when there are more than one offsets.
|
|
+ return searchOffset(addr, messageQueue, timestamp, BoundaryType.LOWER, timeoutMillis);
|
|
+ }
|
|
+
|
|
+ public long searchOffset(final String addr, final MessageQueue messageQueue, final long timestamp,
|
|
+ final BoundaryType boundaryType, final long timeoutMillis)
|
|
+ throws RemotingException, MQBrokerException, InterruptedException {
|
|
SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
|
|
requestHeader.setTopic(messageQueue.getTopic());
|
|
requestHeader.setQueueId(messageQueue.getQueueId());
|
|
requestHeader.setBname(messageQueue.getBrokerName());
|
|
requestHeader.setTimestamp(timestamp);
|
|
+ requestHeader.setBoundaryType(boundaryType);
|
|
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);
|
|
-
|
|
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
|
|
request, timeoutMillis);
|
|
assert response != null;
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
|
|
index a6ed022ea..d27135132 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
|
|
@@ -34,6 +34,7 @@ import java.util.Set;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
@@ -63,6 +64,7 @@ public class RemotingCommand {
|
|
private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
|
|
private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
|
|
private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
|
|
+ private static final String BOUNDARY_TYPE_CANONICAL_NAME = BoundaryType.class.getCanonicalName();
|
|
private static volatile int configVersion = -1;
|
|
private static AtomicInteger requestId = new AtomicInteger(0);
|
|
|
|
@@ -311,6 +313,8 @@ public class RemotingCommand {
|
|
valueParsed = Boolean.parseBoolean(value);
|
|
} else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
|
|
valueParsed = Double.parseDouble(value);
|
|
+ } else if (type.equals(BOUNDARY_TYPE_CANONICAL_NAME)) {
|
|
+ valueParsed = BoundaryType.getType(value);
|
|
} else {
|
|
throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
|
|
}
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java
|
|
index 0c644d739..79c3d337b 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java
|
|
@@ -21,6 +21,7 @@
|
|
package org.apache.rocketmq.remoting.protocol.header;
|
|
|
|
import com.google.common.base.MoreObjects;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.remoting.annotation.CFNotNull;
|
|
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
|
|
import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader;
|
|
@@ -33,6 +34,8 @@ public class SearchOffsetRequestHeader extends TopicQueueRequestHeader {
|
|
@CFNotNull
|
|
private Long timestamp;
|
|
|
|
+ private BoundaryType boundaryType;
|
|
+
|
|
@Override
|
|
public void checkFields() throws RemotingCommandException {
|
|
|
|
@@ -66,12 +69,22 @@ public class SearchOffsetRequestHeader extends TopicQueueRequestHeader {
|
|
this.timestamp = timestamp;
|
|
}
|
|
|
|
+ public BoundaryType getBoundaryType() {
|
|
+ // default return LOWER
|
|
+ return boundaryType == null ? BoundaryType.LOWER : boundaryType;
|
|
+ }
|
|
+
|
|
+ public void setBoundaryType(BoundaryType boundaryType) {
|
|
+ this.boundaryType = boundaryType;
|
|
+ }
|
|
+
|
|
@Override
|
|
public String toString() {
|
|
return MoreObjects.toStringHelper(this)
|
|
.add("topic", topic)
|
|
.add("queueId", queueId)
|
|
.add("timestamp", timestamp)
|
|
+ .add("boundaryType", boundaryType.getName())
|
|
.toString();
|
|
}
|
|
}
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
|
|
index 0c44ad043..a0b886eb0 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
|
|
@@ -204,6 +204,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
|
|
return CQ_STORE_UNIT_SIZE;
|
|
}
|
|
|
|
+ @Deprecated
|
|
@Override
|
|
public long getOffsetInQueueByTime(final long timestamp) {
|
|
MappedFile mappedFile = this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp,
|
|
@@ -211,6 +212,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
|
|
return binarySearchInQueueByTime(mappedFile, timestamp, BoundaryType.LOWER);
|
|
}
|
|
|
|
+ @Override
|
|
public long getOffsetInQueueByTime(final long timestamp, final BoundaryType boundaryType) {
|
|
MappedFile mappedFile = this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp,
|
|
messageStore.getCommitLog(), boundaryType);
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
|
index acc1610e0..25e4a166f 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
|
@@ -82,6 +82,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
|
|
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
|
|
import org.apache.rocketmq.common.utils.QueueTypeUtils;
|
|
import org.apache.rocketmq.common.utils.ServiceProvider;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
|
|
@@ -1015,9 +1016,13 @@ public class DefaultMessageStore implements MessageStore {
|
|
|
|
@Override
|
|
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
|
|
+ return getOffsetInQueueByTime(topic, queueId, timestamp, BoundaryType.LOWER);
|
|
+ }
|
|
+
|
|
+ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) {
|
|
ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId);
|
|
if (logic != null) {
|
|
- long resultOffset = logic.getOffsetInQueueByTime(timestamp);
|
|
+ long resultOffset = logic.getOffsetInQueueByTime(timestamp, boundaryType);
|
|
// Make sure the result offset is in valid range.
|
|
resultOffset = Math.max(resultOffset, logic.getMinOffsetInQueue());
|
|
resultOffset = Math.min(resultOffset, logic.getMaxOffsetInQueue());
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
|
|
index 3db0c18f7..31bbb907f 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
|
|
@@ -29,6 +29,7 @@ import java.util.Set;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.function.Supplier;
|
|
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.common.Pair;
|
|
import org.apache.rocketmq.common.SystemClock;
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
@@ -226,6 +227,17 @@ public interface MessageStore {
|
|
*/
|
|
long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp);
|
|
|
|
+ /**
|
|
+ * Look up the physical offset of the message whose store timestamp is as specified with specific boundaryType.
|
|
+ *
|
|
+ * @param topic Topic of the message.
|
|
+ * @param queueId Queue ID.
|
|
+ * @param timestamp Timestamp to look up.
|
|
+ * @param boundaryType Lower or Upper
|
|
+ * @return physical offset which matches.
|
|
+ */
|
|
+ long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp, final BoundaryType boundaryType);
|
|
+
|
|
/**
|
|
* Look up the message by given commit log offset.
|
|
*
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
|
|
index 8fec1bf7b..387c233bf 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
|
|
@@ -24,6 +24,7 @@ import java.util.Map;
|
|
import java.util.concurrent.ConcurrentSkipListMap;
|
|
import java.util.function.Function;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.common.attribute.CQType;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.message.MessageAccessor;
|
|
@@ -708,8 +709,14 @@ public class BatchConsumeQueue implements ConsumeQueueInterface {
|
|
* @param timestamp
|
|
* @return
|
|
*/
|
|
+ @Deprecated
|
|
@Override
|
|
public long getOffsetInQueueByTime(final long timestamp) {
|
|
+ return getOffsetInQueueByTime(timestamp, BoundaryType.LOWER);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public long getOffsetInQueueByTime(long timestamp, BoundaryType boundaryType) {
|
|
MappedFile targetBcq;
|
|
BatchOffsetIndex targetMinOffset;
|
|
|
|
@@ -760,7 +767,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface {
|
|
if (timestamp >= maxQueueTimestamp) {
|
|
return byteBuffer.getLong(right + MSG_BASE_OFFSET_INDEX);
|
|
}
|
|
- int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp);
|
|
+ int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp, boundaryType);
|
|
if (mid != -1) {
|
|
return byteBuffer.getLong(mid + MSG_BASE_OFFSET_INDEX);
|
|
}
|
|
@@ -819,11 +826,11 @@ public class BatchConsumeQueue implements ConsumeQueueInterface {
|
|
|
|
/**
|
|
* Find the offset of which the value is equal or larger than the given targetValue.
|
|
- * If there are many values equal to the target, then find the earliest one.
|
|
+ * If there are many values equal to the target, then return the lowest offset if boundaryType is LOWER while
|
|
+ * return the highest offset if boundaryType is UPPER.
|
|
*/
|
|
public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, final int unitSize,
|
|
- final int unitShift,
|
|
- long targetValue) {
|
|
+ final int unitShift, long targetValue, BoundaryType boundaryType) {
|
|
int mid = -1;
|
|
while (left <= right) {
|
|
mid = ceil((left + right) / 2);
|
|
@@ -844,10 +851,24 @@ public class BatchConsumeQueue implements ConsumeQueueInterface {
|
|
}
|
|
} else {
|
|
//mid is actually in the mid
|
|
- if (tmpValue < targetValue) {
|
|
- left = mid + unitSize;
|
|
- } else {
|
|
- right = mid;
|
|
+ switch (boundaryType) {
|
|
+ case LOWER:
|
|
+ if (tmpValue < targetValue) {
|
|
+ left = mid + unitSize;
|
|
+ } else {
|
|
+ right = mid;
|
|
+ }
|
|
+ break;
|
|
+ case UPPER:
|
|
+ if (tmpValue <= targetValue) {
|
|
+ left = mid;
|
|
+ } else {
|
|
+ right = mid - unitSize;
|
|
+ }
|
|
+ break;
|
|
+ default:
|
|
+ log.warn("Unknown boundary type");
|
|
+ return -1;
|
|
}
|
|
}
|
|
}
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
|
|
index d7213fa37..55d080829 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
|
|
@@ -17,6 +17,7 @@
|
|
|
|
package org.apache.rocketmq.store.queue;
|
|
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.common.attribute.CQType;
|
|
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
|
import org.apache.rocketmq.store.DispatchRequest;
|
|
@@ -93,6 +94,15 @@ public interface ConsumeQueueInterface extends FileQueueLifeCycle {
|
|
*/
|
|
long getOffsetInQueueByTime(final long timestamp);
|
|
|
|
+ /**
|
|
+ * Get the message whose timestamp is the smallest, greater than or equal to the given time and when there are more
|
|
+ * than one message satisfy the condition, decide which one to return based on boundaryType.
|
|
+ * @param timestamp timestamp
|
|
+ * @param boundaryType Lower or Upper
|
|
+ * @return the offset(index)
|
|
+ */
|
|
+ long getOffsetInQueueByTime(final long timestamp, final BoundaryType boundaryType);
|
|
+
|
|
/**
|
|
* The max physical offset of commitlog has been dispatched to this queue.
|
|
* It should be exclusive.
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
|
|
index 5b397d696..4a5f3a93b 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
|
|
@@ -16,6 +16,7 @@
|
|
*/
|
|
package org.apache.rocketmq.store.queue;
|
|
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.common.UtilAll;
|
|
import org.apache.rocketmq.store.MessageStore;
|
|
import org.apache.rocketmq.store.SelectMappedBufferResult;
|
|
@@ -148,7 +149,7 @@ public class SparseConsumeQueue extends BatchConsumeQueue {
|
|
ByteBuffer byteBuffer = sbr.getByteBuffer();
|
|
int left = minOffset.getIndexPos();
|
|
int right = maxOffset.getIndexPos();
|
|
- int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset);
|
|
+ int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset, BoundaryType.LOWER);
|
|
if (mid != -1) {
|
|
return minOffset.getMappedFile().selectMappedBuffer(mid);
|
|
}
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
|
|
index f4d576d29..8ae4dc7f9 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
|
|
@@ -21,7 +21,7 @@ import java.util.concurrent.CompletableFuture;
|
|
import org.apache.rocketmq.store.GetMessageResult;
|
|
import org.apache.rocketmq.store.MessageFilter;
|
|
import org.apache.rocketmq.store.QueryMessageResult;
|
|
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
|
|
public interface MessageStoreFetcher {
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
|
index c4fed54bd..9a9a3e5a5 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
|
|
@@ -39,7 +39,6 @@ import org.apache.rocketmq.store.GetMessageStatus;
|
|
import org.apache.rocketmq.store.MessageFilter;
|
|
import org.apache.rocketmq.store.QueryMessageResult;
|
|
import org.apache.rocketmq.store.SelectMappedBufferResult;
|
|
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
|
|
import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture;
|
|
import org.apache.rocketmq.tieredstore.common.MessageCacheKey;
|
|
import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
|
|
@@ -59,6 +58,7 @@ import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
|
|
import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
|
|
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
|
|
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
|
|
public class TieredMessageFetcher implements MessageStoreFetcher {
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
index 115d9640d..1f12410f2 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
|
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.function.Supplier;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.common.MixAll;
|
|
import org.apache.rocketmq.common.Pair;
|
|
import org.apache.rocketmq.common.PopAckConstants;
|
|
@@ -45,7 +46,6 @@ import org.apache.rocketmq.store.QueryMessageResult;
|
|
import org.apache.rocketmq.store.SelectMappedBufferResult;
|
|
import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
|
|
import org.apache.rocketmq.store.plugin.MessageStorePluginContext;
|
|
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
|
|
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
|
|
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
|
|
import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
|
|
@@ -287,6 +287,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
|
return getOffsetInQueueByTime(topic, queueId, timestamp, BoundaryType.LOWER);
|
|
}
|
|
|
|
+ @Override
|
|
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) {
|
|
long earliestTimeInNextStore = next.getEarliestMessageTime();
|
|
if (earliestTimeInNextStore <= 0) {
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java
|
|
deleted file mode 100644
|
|
index 77e53ec11..000000000
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java
|
|
+++ /dev/null
|
|
@@ -1,51 +0,0 @@
|
|
-/*
|
|
- * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
- * contributor license agreements. See the NOTICE file distributed with
|
|
- * this work for additional information regarding copyright ownership.
|
|
- * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
- * (the "License"); you may not use this file except in compliance with
|
|
- * the License. You may obtain a copy of the License at
|
|
- *
|
|
- * http://www.apache.org/licenses/LICENSE-2.0
|
|
- *
|
|
- * Unless required by applicable law or agreed to in writing, software
|
|
- * distributed under the License is distributed on an "AS IS" BASIS,
|
|
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
- * See the License for the specific language governing permissions and
|
|
- * limitations under the License.
|
|
- */
|
|
-package org.apache.rocketmq.tieredstore.common;
|
|
-
|
|
-/**
|
|
- * This enumeration represents the boundary types.
|
|
- * It has two constants, lower and upper, which represent the lower and upper boundaries respectively.
|
|
- */
|
|
-public enum BoundaryType {
|
|
-
|
|
- /**
|
|
- * Represents the lower boundary.
|
|
- */
|
|
- LOWER("lower"),
|
|
-
|
|
- /**
|
|
- * Represents the upper boundary.
|
|
- */
|
|
- UPPER("upper");
|
|
-
|
|
- private final String name;
|
|
-
|
|
- BoundaryType(String name) {
|
|
- this.name = name;
|
|
- }
|
|
-
|
|
- public String getName() {
|
|
- return name;
|
|
- }
|
|
-
|
|
- public static BoundaryType getType(String name) {
|
|
- if (BoundaryType.UPPER.getName().equalsIgnoreCase(name)) {
|
|
- return UPPER;
|
|
- }
|
|
- return LOWER;
|
|
- }
|
|
-}
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java
|
|
index bc1062cd0..3d962e40d 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java
|
|
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import org.apache.rocketmq.store.DispatchRequest;
|
|
import org.apache.rocketmq.tieredstore.common.AppendResult;
|
|
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
|
|
interface CompositeAccess {
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
|
|
index fa01382e1..df4baf33f 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
|
|
@@ -37,7 +37,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.store.DispatchRequest;
|
|
import org.apache.rocketmq.tieredstore.common.AppendResult;
|
|
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
|
|
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
|
|
import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture;
|
|
import org.apache.rocketmq.tieredstore.common.InFlightRequestKey;
|
|
@@ -46,6 +45,7 @@ import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
|
|
import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
|
|
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
|
|
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
|
|
public class CompositeFlatFile implements CompositeAccess {
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
|
|
index ff9572af6..35007f8cb 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
|
|
@@ -21,8 +21,8 @@ import java.nio.ByteBuffer;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import org.apache.commons.lang3.tuple.Pair;
|
|
import org.apache.rocketmq.tieredstore.common.AppendResult;
|
|
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
|
|
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
|
|
public class TieredConsumeQueue {
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
|
index 90ca843bf..75ce8d89f 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
|
|
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.tieredstore.common.AppendResult;
|
|
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
|
|
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
|
|
import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
|
|
import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
|
|
@@ -42,6 +41,7 @@ import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
|
|
import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator;
|
|
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
|
|
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
|
|
public class TieredFlatFile {
|
|
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
|
|
index df3720bab..d75b2f916 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
|
|
@@ -30,7 +30,6 @@ import org.apache.rocketmq.store.GetMessageStatus;
|
|
import org.apache.rocketmq.store.QueryMessageResult;
|
|
import org.apache.rocketmq.store.SelectMappedBufferResult;
|
|
import org.apache.rocketmq.tieredstore.common.AppendResult;
|
|
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
|
|
import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
|
|
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
|
|
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
|
|
@@ -41,6 +40,7 @@ import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
|
|
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
|
|
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
|
|
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.awaitility.Awaitility;
|
|
import org.junit.After;
|
|
import org.junit.Assert;
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
|
|
index 58b7a52cc..8601392e7 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
|
|
@@ -37,11 +37,11 @@ import org.apache.rocketmq.store.QueryMessageResult;
|
|
import org.apache.rocketmq.store.SelectMappedBufferResult;
|
|
import org.apache.rocketmq.store.config.MessageStoreConfig;
|
|
import org.apache.rocketmq.store.plugin.MessageStorePluginContext;
|
|
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
|
|
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
|
|
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
|
|
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
|
|
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.junit.After;
|
|
import org.junit.Assert;
|
|
import org.junit.Before;
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
|
|
index 8322c72ed..27efe111e 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
|
|
@@ -23,7 +23,6 @@ import org.apache.rocketmq.store.ConsumeQueue;
|
|
import org.apache.rocketmq.store.DispatchRequest;
|
|
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
|
|
import org.apache.rocketmq.tieredstore.common.AppendResult;
|
|
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
|
|
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
|
|
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
|
|
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
|
|
@@ -33,6 +32,7 @@ import org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment;
|
|
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
|
|
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
|
|
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.junit.After;
|
|
import org.junit.Assert;
|
|
import org.junit.Before;
|
|
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
|
|
index dd9c6a9b4..f0a08dfb1 100644
|
|
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
|
|
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
|
|
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.message.MessageExt;
|
|
import org.apache.rocketmq.common.message.MessageQueue;
|
|
import org.apache.rocketmq.common.message.MessageRequestMode;
|
|
import org.apache.rocketmq.common.topic.TopicValidator;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.remoting.RPCHook;
|
|
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
|
|
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
|
|
@@ -122,6 +123,14 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
|
|
return defaultMQAdminExtImpl.searchOffset(mq, timestamp);
|
|
}
|
|
|
|
+ public long searchLowerBoundaryOffset(MessageQueue mq, long timestamp) throws MQClientException {
|
|
+ return defaultMQAdminExtImpl.searchOffset(mq, timestamp, BoundaryType.LOWER);
|
|
+ }
|
|
+
|
|
+ public long searchUpperBoundaryOffset(MessageQueue mq, long timestamp) throws MQClientException {
|
|
+ return defaultMQAdminExtImpl.searchOffset(mq, timestamp, BoundaryType.UPPER);
|
|
+ }
|
|
+
|
|
@Override
|
|
public long maxOffset(MessageQueue mq) throws MQClientException {
|
|
return defaultMQAdminExtImpl.maxOffset(mq);
|
|
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
|
|
index c5c467bf0..fa3596d51 100644
|
|
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
|
|
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
|
|
@@ -65,6 +65,7 @@ import org.apache.rocketmq.common.message.MessageRequestMode;
|
|
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
|
|
import org.apache.rocketmq.common.topic.TopicValidator;
|
|
import org.apache.rocketmq.common.utils.NetworkUtil;
|
|
+import org.apache.rocketmq.common.BoundaryType;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.RPCHook;
|
|
@@ -1700,6 +1701,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
|
|
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
|
|
}
|
|
|
|
+ public long searchOffset(MessageQueue mq, long timestamp, BoundaryType boundaryType) throws MQClientException {
|
|
+ return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp, boundaryType);
|
|
+ }
|
|
+
|
|
@Override
|
|
public long maxOffset(MessageQueue mq) throws MQClientException {
|
|
return this.mqClientInstance.getMQAdminImpl().maxOffset(mq);
|
|
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
|
|
index b94754f22..dc5642f88 100644
|
|
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
|
|
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
|
|
@@ -512,6 +512,36 @@ public class DefaultMQAdminExtTest {
|
|
assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(TOPIC1, BROKER1_NAME, 0), System.currentTimeMillis())).isEqualTo(101L);
|
|
}
|
|
|
|
+ @Test
|
|
+ public void testSearchOffsetWithSpecificBoundaryType() throws Exception {
|
|
+ // do mock
|
|
+ DefaultMQAdminExt mockDefaultMQAdminExt = mock(DefaultMQAdminExt.class);
|
|
+ when(mockDefaultMQAdminExt.minOffset(any(MessageQueue.class))).thenReturn(0L);
|
|
+ when(mockDefaultMQAdminExt.maxOffset(any(MessageQueue.class))).thenReturn(101L);
|
|
+ when(mockDefaultMQAdminExt.searchLowerBoundaryOffset(any(MessageQueue.class), anyLong())).thenReturn(0L);
|
|
+ when(mockDefaultMQAdminExt.searchUpperBoundaryOffset(any(MessageQueue.class), anyLong())).thenReturn(100L);
|
|
+ when(mockDefaultMQAdminExt.queryConsumeTimeSpan(anyString(), anyString())).thenReturn(mockQueryConsumeTimeSpan());
|
|
+
|
|
+ for (QueueTimeSpan timeSpan: mockDefaultMQAdminExt.queryConsumeTimeSpan(TOPIC1, "group_one")) {
|
|
+ MessageQueue mq = timeSpan.getMessageQueue();
|
|
+ long maxOffset = mockDefaultMQAdminExt.maxOffset(mq);
|
|
+ long minOffset = mockDefaultMQAdminExt.minOffset(mq);
|
|
+ // if there is at least one message in queue, the maxOffset returns the queue's latest offset + 1
|
|
+ assertThat((maxOffset == 0 ? 0 : maxOffset - 1) == mockDefaultMQAdminExt.searchUpperBoundaryOffset(mq, timeSpan.getMaxTimeStamp())).isTrue();
|
|
+ assertThat(minOffset == mockDefaultMQAdminExt.searchLowerBoundaryOffset(mq, timeSpan.getMinTimeStamp())).isTrue();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private List<QueueTimeSpan> mockQueryConsumeTimeSpan() {
|
|
+ List<QueueTimeSpan> spanSet = new ArrayList<>();
|
|
+ QueueTimeSpan timeSpan = new QueueTimeSpan();
|
|
+ timeSpan.setMessageQueue(new MessageQueue(TOPIC1, BROKER1_NAME, 0));
|
|
+ timeSpan.setMinTimeStamp(1690421253000L);
|
|
+ timeSpan.setMaxTimeStamp(1690507653000L);
|
|
+ spanSet.add(timeSpan);
|
|
+ return spanSet;
|
|
+ }
|
|
+
|
|
@Test
|
|
public void testExamineTopicConfig() throws MQBrokerException, RemotingException, InterruptedException {
|
|
TopicConfig topicConfig = defaultMQAdminExt.examineTopicConfig("127.0.0.1:10911", "topic_test_examine_topicConfig");
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 3bdabf703b883fea6181df8889c08d1e91202291 Mon Sep 17 00:00:00 2001
|
|
From: ShuangxiDing <dingshuangxi888@gmail.com>
|
|
Date: Thu, 3 Aug 2023 16:10:24 +0800
|
|
Subject: [PATCH 5/6] [ISSUE #7109] support the mixed topic type (#7110)
|
|
|
|
* Add mixed message type.
|
|
|
|
* support mixed topic type for grpc server
|
|
|
|
* remove the unnecessary parentheses around expression
|
|
|
|
* format the javadoc
|
|
---
|
|
.../common/attribute/TopicMessageType.java | 5 +++--
|
|
.../proxy/grpc/v2/route/RouteActivity.java | 22 +++++++++++--------
|
|
.../DefaultTopicMessageTypeValidator.java | 7 +++---
|
|
.../validator/TopicMessageTypeValidator.java | 6 ++---
|
|
4 files changed, 23 insertions(+), 17 deletions(-)
|
|
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
|
|
index 77629e4c9..9680acec7 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
|
|
@@ -27,7 +27,8 @@ public enum TopicMessageType {
|
|
NORMAL("NORMAL"),
|
|
FIFO("FIFO"),
|
|
DELAY("DELAY"),
|
|
- TRANSACTION("TRANSACTION");
|
|
+ TRANSACTION("TRANSACTION"),
|
|
+ MIXED("MIXED");
|
|
|
|
private final String value;
|
|
TopicMessageType(String value) {
|
|
@@ -35,7 +36,7 @@ public enum TopicMessageType {
|
|
}
|
|
|
|
public static Set<String> topicMessageTypeSet() {
|
|
- return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value);
|
|
+ return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value, MIXED.value);
|
|
}
|
|
|
|
public String getValue() {
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
|
|
index c5d485691..02dea0cda 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
|
|
@@ -32,6 +32,8 @@ import apache.rocketmq.v2.QueryRouteResponse;
|
|
import apache.rocketmq.v2.Resource;
|
|
import com.google.common.net.HostAndPort;
|
|
import java.util.ArrayList;
|
|
+import java.util.Arrays;
|
|
+import java.util.Collections;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
@@ -259,7 +261,7 @@ public class RouteActivity extends AbstractMessingActivity {
|
|
MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
|
|
.setId(queueIdIndex++)
|
|
.setPermission(Permission.READ)
|
|
- .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
|
|
+ .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType))
|
|
.build();
|
|
messageQueueList.add(messageQueue);
|
|
}
|
|
@@ -268,7 +270,7 @@ public class RouteActivity extends AbstractMessingActivity {
|
|
MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
|
|
.setId(queueIdIndex++)
|
|
.setPermission(Permission.WRITE)
|
|
- .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
|
|
+ .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType))
|
|
.build();
|
|
messageQueueList.add(messageQueue);
|
|
}
|
|
@@ -277,7 +279,7 @@ public class RouteActivity extends AbstractMessingActivity {
|
|
MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
|
|
.setId(queueIdIndex++)
|
|
.setPermission(Permission.READ_WRITE)
|
|
- .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
|
|
+ .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType))
|
|
.build();
|
|
messageQueueList.add(messageQueue);
|
|
}
|
|
@@ -285,18 +287,20 @@ public class RouteActivity extends AbstractMessingActivity {
|
|
return messageQueueList;
|
|
}
|
|
|
|
- private MessageType parseTopicMessageType(TopicMessageType topicMessageType) {
|
|
+ private List<MessageType> parseTopicMessageType(TopicMessageType topicMessageType) {
|
|
switch (topicMessageType) {
|
|
case NORMAL:
|
|
- return MessageType.NORMAL;
|
|
+ return Collections.singletonList(MessageType.NORMAL);
|
|
case FIFO:
|
|
- return MessageType.FIFO;
|
|
+ return Collections.singletonList(MessageType.FIFO);
|
|
case TRANSACTION:
|
|
- return MessageType.TRANSACTION;
|
|
+ return Collections.singletonList(MessageType.TRANSACTION);
|
|
case DELAY:
|
|
- return MessageType.DELAY;
|
|
+ return Collections.singletonList(MessageType.DELAY);
|
|
+ case MIXED:
|
|
+ return Arrays.asList(MessageType.NORMAL, MessageType.FIFO, MessageType.DELAY, MessageType.TRANSACTION);
|
|
default:
|
|
- return MessageType.MESSAGE_TYPE_UNSPECIFIED;
|
|
+ return Collections.singletonList(MessageType.MESSAGE_TYPE_UNSPECIFIED);
|
|
}
|
|
}
|
|
}
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java
|
|
index bc2fcf30f..83588f110 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java
|
|
@@ -23,9 +23,10 @@ import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
|
|
|
|
public class DefaultTopicMessageTypeValidator implements TopicMessageTypeValidator {
|
|
|
|
- public void validate(TopicMessageType topicMessageType, TopicMessageType messageType) {
|
|
- if (messageType.equals(TopicMessageType.UNSPECIFIED) || !messageType.equals(topicMessageType)) {
|
|
- String errorInfo = String.format("TopicMessageType validate failed, topic type is %s, message type is %s", topicMessageType, messageType);
|
|
+ public void validate(TopicMessageType expectedType, TopicMessageType actualType) {
|
|
+ if (actualType.equals(TopicMessageType.UNSPECIFIED)
|
|
+ || !actualType.equals(expectedType) && !expectedType.equals(TopicMessageType.MIXED)) {
|
|
+ String errorInfo = String.format("TopicMessageType validate failed, the expected type is %s, but actual type is %s", expectedType, actualType);
|
|
throw new ProxyException(ProxyExceptionCode.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE, errorInfo);
|
|
}
|
|
}
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java
|
|
index 137be9095..32758da50 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java
|
|
@@ -23,8 +23,8 @@ public interface TopicMessageTypeValidator {
|
|
/**
|
|
* Will throw {@link org.apache.rocketmq.proxy.common.ProxyException} if validate failed.
|
|
*
|
|
- * @param topicMessageType Target topic
|
|
- * @param messageType Message's type
|
|
+ * @param expectedType Target topic
|
|
+ * @param actualType Message's type
|
|
*/
|
|
- void validate(TopicMessageType topicMessageType, TopicMessageType messageType);
|
|
+ void validate(TopicMessageType expectedType, TopicMessageType actualType);
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From c73d8ee346035aec3d548b8b29c64c626a34e68b Mon Sep 17 00:00:00 2001
|
|
From: haolinkong <110664176+haolinkong@users.noreply.github.com>
|
|
Date: Fri, 4 Aug 2023 10:41:41 +0800
|
|
Subject: [PATCH 6/6] [ISSUE #6962]operation.md Format adjustment
|
|
|
|
---
|
|
docs/cn/operation.md | 4 ++--
|
|
1 file changed, 2 insertions(+), 2 deletions(-)
|
|
|
|
diff --git a/docs/cn/operation.md b/docs/cn/operation.md
|
|
index 4310da570..9f04ce1d3 100644
|
|
--- a/docs/cn/operation.md
|
|
+++ b/docs/cn/operation.md
|
|
@@ -569,9 +569,9 @@ RocketMQ 5.0 开始支持自动主从切换的模式,可参考以下文档
|
|
<td class=xl68 width=87 style='width:65pt'>NameServer 服务地址,格式 ip:port</td>
|
|
</tr>
|
|
<tr height=57 style='height:43.0pt'>
|
|
- <td rowspan=1 height=137 class=xl69 width=191 style='border-bottom:1.0pt;
|
|
+ <td rowspan=3 height=137 class=xl69 width=191 style='border-bottom:1.0pt;
|
|
height:103.0pt;border-top:none;width:143pt'>wipeWritePerm</td>
|
|
- <td rowspan=1 class=xl72 width=87 style='border-bottom:1.0pt
|
|
+ <td rowspan=3 class=xl72 width=87 style='border-bottom:1.0pt
|
|
border-top:none;width:65pt'>从NameServer上清除 Broker写权限</td>
|
|
<td class=xl67 width=87 style='width:65pt'>-b</td>
|
|
<td class=xl68 width=87 style='width:65pt'>BrokerName</td>
|
|
--
|
|
2.32.0.windows.2
|
|
|