rocketmq/patch007-backport-fix-some-bugs.patch

1426 lines
70 KiB
Diff
Raw Permalink Normal View History

2023-09-19 16:10:33 +08:00
From 90c5382aee07879a80309f257f04114201ccaac6 Mon Sep 17 00:00:00 2001
From: ShuangxiDing <dingshuangxi888@gmail.com>
Date: Fri, 21 Jul 2023 20:28:58 +0800
Subject: [PATCH 01/10] [ISSUE #7061] Support forward HAProxyMessage for Multi
Protocol server. (#7062)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator
* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC and Remoting server.
* 回滚netty的升级
* Support proxy protocol for gRPC and Remoting server.
* Support proxy protocol for gRPC and Remoting server.
* Support proxy protocol for gRPC and Remoting server.
* add grpc-netty-codec-haproxy in bazel
* add grpc-netty-codec-haproxy in bazel
* Support proxy protocol for gRPC and Remoting server.
* Fix Test
* add grpc-netty-codec-haproxy in bazel
* add ProxyProtocolTest for Remoting
* Support HAProxyMessage forward for multi protocol server.
---------
Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com>
---
.../http2proxy/HAProxyMessageForwarder.java | 129 ++++++++++++++++++
.../http2proxy/Http2ProtocolProxyHandler.java | 23 +++-
.../http2proxy/Http2ProxyBackendHandler.java | 2 +
.../http2proxy/Http2ProxyFrontendHandler.java | 28 ++--
4 files changed, 164 insertions(+), 18 deletions(-)
create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
new file mode 100644
index 000000000..8f139d3d9
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.haproxy.HAProxyCommand;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
+import io.netty.handler.codec.haproxy.HAProxyTLV;
+import io.netty.util.Attribute;
+import io.netty.util.DefaultAttributeMap;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.acl.common.AclUtils;
+import org.apache.rocketmq.common.constant.HAProxyConstants;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
+
+import java.lang.reflect.Field;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * One-shot inbound handler: on the first read it rebuilds an {@link HAProxyMessage} from the PROXY protocol attributes
+ * stored on the inbound channel (TLV values encoded as UTF-8), writes it to the outbound channel and removes itself.
+ */
+public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter {
+
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+
+ private static final Field FIELD_ATTRIBUTE = FieldUtils.getField(DefaultAttributeMap.class, "attributes", true);
+ private static final Charset TLV_CHARSET = java.nio.charset.StandardCharsets.UTF_8;
+
+ private final Channel outboundChannel;
+
+ public HAProxyMessageForwarder(final Channel outboundChannel) {
+ this.outboundChannel = outboundChannel;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ try {
+ forwardHAProxyMessage(ctx.channel(), outboundChannel);
+ ctx.fireChannelRead(msg);
+ } catch (Exception e) {
+ log.error("Forward HAProxyMessage from Remoting to gRPC server error.", e);
+ io.netty.util.ReferenceCountUtil.release(msg); // never handed to the next handler, release to avoid a leak
+ throw e;
+ } finally {
+ ctx.pipeline().remove(this);
+ }
+ }
+
+ /** Writes a PROXY v2 message built from the inbound channel's proxy attributes; no-op without PROXY_PROTOCOL_ADDR. */
+ private void forwardHAProxyMessage(Channel inboundChannel, Channel outboundChannel) throws Exception {
+ if (!inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)
+ || !(inboundChannel instanceof DefaultAttributeMap)) {
+ return;
+ }
+
+ Attribute<?>[] attributes = (Attribute<?>[]) FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel);
+ if (ArrayUtils.isEmpty(attributes)) {
+ return;
+ }
+
+ String sourceAddress = null, destinationAddress = null;
+ int sourcePort = 0, destinationPort = 0;
+ List<HAProxyTLV> haProxyTLVs = new ArrayList<>();
+
+ for (Attribute<?> attribute : attributes) {
+ String attributeKey = attribute.key().name();
+ if (!StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_PREFIX)) {
+ continue;
+ }
+ String attributeValue = (String) attribute.get();
+ if (StringUtils.isEmpty(attributeValue)) {
+ continue;
+ }
+ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_ADDR) {
+ sourceAddress = attributeValue;
+ }
+ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_PORT) {
+ sourcePort = Integer.parseInt(attributeValue);
+ }
+ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR) {
+ destinationAddress = attributeValue;
+ }
+ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_PORT) {
+ destinationPort = Integer.parseInt(attributeValue);
+ }
+ if (StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) {
+ String typeString = StringUtils.substringAfter(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX);
+ ByteBuf content = Unpooled.wrappedBuffer(attributeValue.getBytes(TLV_CHARSET));
+ haProxyTLVs.add(new HAProxyTLV(Hex.decodeHex(typeString)[0], content));
+ }
+ }
+
+ HAProxyProxiedProtocol proxiedProtocol = AclUtils.isColon(sourceAddress) ? HAProxyProxiedProtocol.TCP6 :
+ HAProxyProxiedProtocol.TCP4;
+ HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY,
+ proxiedProtocol, sourceAddress, destinationAddress, sourcePort, destinationPort, haProxyTLVs);
+ outboundChannel.writeAndFlush(message).sync();
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
index 913f35c93..c37db92af 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
@@ -24,13 +24,14 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
-import javax.net.ssl.SSLException;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -38,8 +39,11 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler;
import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import javax.net.ssl.SSLException;
+
public class Http2ProtocolProxyHandler implements ProtocolHandler {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
private static final String LOCAL_HOST = "127.0.0.1";
@@ -101,11 +105,8 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler {
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
- if (sslContext != null) {
- ch.pipeline()
- .addLast(sslContext.newHandler(ch.alloc(), LOCAL_HOST, config.getGrpcServerPort()));
- }
- ch.pipeline().addLast(new Http2ProxyBackendHandler(inboundChannel));
+ ch.pipeline().addLast(null, Http2ProxyBackendHandler.HANDLER_NAME,
+ new Http2ProxyBackendHandler(inboundChannel));
}
})
.option(ChannelOption.AUTO_READ, false)
@@ -120,7 +121,15 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler {
}
final Channel outboundChannel = f.channel();
+ if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
+ ctx.pipeline().addLast(new HAProxyMessageForwarder(outboundChannel));
+ outboundChannel.pipeline().addFirst(HAProxyMessageEncoder.INSTANCE);
+ }
- ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel));
+ SslHandler sslHandler = null;
+ if (sslContext != null) {
+ sslHandler = sslContext.newHandler(outboundChannel.alloc(), LOCAL_HOST, config.getGrpcServerPort());
+ }
+ ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel, sslHandler));
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
index 0195b0c1c..fd5408fae 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
@@ -29,6 +29,8 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public class Http2ProxyBackendHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+ public static final String HANDLER_NAME = "Http2ProxyBackendHandler";
+
private final Channel inboundChannel;
public Http2ProxyBackendHandler(Channel inboundChannel) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
index 87147a322..9b37e85e5 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
@@ -19,36 +19,42 @@ package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.ssl.SslHandler;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public class Http2ProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+
+ public static final String HANDLER_NAME = "SslHandler";
+
// As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as
// the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel.
private final Channel outboundChannel;
+ private final SslHandler sslHandler;
- public Http2ProxyFrontendHandler(final Channel outboundChannel) {
+ public Http2ProxyFrontendHandler(final Channel outboundChannel, final SslHandler sslHandler) {
this.outboundChannel = outboundChannel;
+ this.sslHandler = sslHandler;
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (outboundChannel.isActive()) {
- outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) {
- if (future.isSuccess()) {
- // was able to flush out data, start to read the next chunk
- ctx.channel().read();
- } else {
- future.channel().close();
- }
+ if (sslHandler != null && outboundChannel.pipeline().get(HANDLER_NAME) == null) {
+ outboundChannel.pipeline().addBefore(Http2ProxyBackendHandler.HANDLER_NAME, HANDLER_NAME, sslHandler);
+ }
+
+ outboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
+ if (future.isSuccess()) {
+ // was able to flush out data, start to read the next chunk
+ ctx.channel().read();
+ } else {
+ future.channel().close();
}
});
}
--
2.32.0.windows.2
From 8027cfc7cbb6c120d2fc045e0caa8debe1028a31 Mon Sep 17 00:00:00 2001
From: maclong1989 <814742806@qq.com>
Date: Sun, 23 Jul 2023 09:15:05 +0800
Subject: [PATCH 02/10] [ISSUE #7063] doc: fix typo in user_guide.md
Signed-off-by: jiangyl3 <jiangyl3@asiainfo.com>
Co-authored-by: jiangyl3 <jiangyl3@asiainfo.com>
---
docs/cn/msg_trace/user_guide.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/cn/msg_trace/user_guide.md b/docs/cn/msg_trace/user_guide.md
index d8314052b..9cf139fd3 100644
--- a/docs/cn/msg_trace/user_guide.md
+++ b/docs/cn/msg_trace/user_guide.md
@@ -35,7 +35,7 @@ namesrvAddr=XX.XX.XX.XX:9876
RocketMQ集群中每一个Broker节点均用于存储Client端收集并发送过来的消息轨迹数据。因此对于RocketMQ集群中的Broker节点数量并无要求和限制。
### 2.3 物理IO隔离模式
-对于消息轨迹数据量较大的场景可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离互不影响。在该模式下RockeMQ集群中至少有两个Broker节点其中一个Broker节点定义为存储消息轨迹数据的服务端。
+对于消息轨迹数据量较大的场景可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离互不影响。在该模式下RocketMQ集群中至少有两个Broker节点其中一个Broker节点定义为存储消息轨迹数据的服务端。
### 2.4 启动开启消息轨迹的Broker
`nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &`
--
2.32.0.windows.2
From 3102758487f3e21e977424d7f1b7187eb6c069cb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=90=B4=E6=98=9F=E7=81=BF?=
<37405937+wuyoudexiao@users.noreply.github.com>
Date: Tue, 25 Jul 2023 13:47:53 +0800
Subject: [PATCH 03/10] fix: npe in lockBatchMQ and unlockBatchMQ (#7078)
Co-authored-by: wxc <wuxingcan666@foxmail.com>
---
.../rocketmq/proxy/processor/ConsumerProcessor.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index cc973813b..656a6339d 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.processor;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -425,13 +426,15 @@ public class ConsumerProcessor extends AbstractProcessor {
}
protected Set<AddressableMessageQueue> buildAddressableSet(ProxyContext ctx, Set<MessageQueue> mqSet) {
- return mqSet.stream().map(mq -> {
+ Set<AddressableMessageQueue> addressableMessageQueueSet = new HashSet<>(mqSet.size());
+ for (MessageQueue mq:mqSet) {
try {
- return serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq);
+ addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq)) ;
} catch (Exception e) {
- return null;
+ log.error("build addressable message queue fail, messageQueue = {}", mq, e);
}
- }).collect(Collectors.toSet());
+ }
+ return addressableMessageQueueSet;
}
protected HashMap<String, List<AddressableMessageQueue>> buildAddressableMapByBrokerName(
--
2.32.0.windows.2
From 047ef7498f2203a2234052603a99a114d8a65e17 Mon Sep 17 00:00:00 2001
From: rongtong <jinrongtong5@163.com>
Date: Tue, 25 Jul 2023 14:00:22 +0800
Subject: [PATCH 04/10] Ensuring consistency between broker and nameserver data
when deleting a topic (#7066)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Co-authored-by: 尘央 <xinyuzhou.zxy@alibaba-inc.com>
---
.../rocketmq/broker/BrokerController.java | 11 ++
.../rocketmq/broker/out/BrokerOuterAPI.java | 62 ++++++++++
.../processor/AdminBrokerProcessor.java | 26 ++--
.../broker/topic/TopicConfigManager.java | 6 +-
.../apache/rocketmq/common/BrokerConfig.java | 14 +++
.../common/namesrv/NamesrvConfig.java | 17 +++
.../namesrv/routeinfo/RouteInfoManager.java | 64 ++++++++--
.../routeinfo/RouteInfoManagerNewTest.java | 99 +++++++++++++++
.../rocketmq/test/util/MQAdminTestUtils.java | 37 ++++++
.../dledger/DLedgerProduceAndConsumeIT.java | 2 +-
.../test/route/CreateAndUpdateTopicIT.java | 114 ++++++++++++++++++
11 files changed, 429 insertions(+), 23 deletions(-)
rename test/src/test/java/org/apache/rocketmq/test/{base => }/dledger/DLedgerProduceAndConsumeIT.java (99%)
create mode 100644 test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 196401e26..972457194 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1678,6 +1678,17 @@ public class BrokerController {
}, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS));
}
+ public synchronized void registerSingleTopicAll(final TopicConfig topicConfig) {
+ final int brokerPerm = this.getBrokerConfig().getBrokerPermission();
+ TopicConfig registered = topicConfig;
+ if (!(PermName.isWriteable(brokerPerm) && PermName.isReadable(brokerPerm))) {
+ // Register a copy whose perm is masked by the broker perm; the passed-in config is left untouched
+ registered = new TopicConfig(topicConfig);
+ registered.setPerm(topicConfig.getPerm() & brokerPerm);
+ }
+ this.brokerOuterAPI.registerSingleTopicAll(this.brokerConfig.getBrokerName(), registered, 3000);
+ }
+
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
this.registerIncrementBrokerData(Collections.singletonList(topicConfig), dataVersion);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index b6273e9ed..1793a83c0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UnlockCallback;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -120,12 +121,14 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionRequ
import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.rpc.ClientMetadata;
import org.apache.rocketmq.remoting.rpc.RpcClient;
@@ -614,6 +617,65 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
}
+ /**
+ * Register the topic route info of single topic to all name server nodes.
+ * This method is used to replace incremental broker registration feature.
+ *
+ * @param brokerName broker name written into the queue data of the route
+ * @param topicConfig source of the topic name, perm, read/write queue numbers and topic sys flag
+ * @param timeoutMills timeout in milliseconds, used for each name server call and for the overall wait
+ */
+ public void registerSingleTopicAll(final String brokerName, final TopicConfig topicConfig, final int timeoutMills) {
+ String topic = topicConfig.getTopicName();
+ RegisterTopicRequestHeader requestHeader = new RegisterTopicRequestHeader();
+ requestHeader.setTopic(topic);
+
+ TopicRouteData topicRouteData = new TopicRouteData();
+ List<QueueData> queueDatas = new ArrayList<>();
+ topicRouteData.setQueueDatas(queueDatas);
+
+ final QueueData queueData = new QueueData();
+ queueData.setBrokerName(brokerName);
+ queueData.setPerm(topicConfig.getPerm());
+ queueData.setReadQueueNums(topicConfig.getReadQueueNums());
+ queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
+ queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());
+ queueDatas.add(queueData);
+ final byte[] topicRouteBody = topicRouteData.encode();
+
+ List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
+ final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
+ for (final String namesrvAddr : nameServerAddressList) {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_TOPIC_IN_NAMESRV, requestHeader);
+ request.setBody(topicRouteBody);
+
+ try {
+ brokerOuterExecutor.execute(() -> {
+ try {
+ RemotingCommand response = BrokerOuterAPI.this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
+ assert response != null;
+ LOGGER.info("Register single topic {} to broker {} with response code {}", topic, brokerName, response.getCode());
+ } catch (Exception e) {
+ LOGGER.warn("Register single topic {} to broker {} exception", topic, brokerName, e);
+ } finally {
+ countDownLatch.countDown();
+ }
+ });
+ } catch (Exception e) {
+ LOGGER.warn("Execute single topic registration task failed, topic {}, broker name {}", topic, brokerName, e);
+ countDownLatch.countDown();
+ }
+ }
+
+ try {
+ if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
+ LOGGER.warn("Registration single topic to one or more name servers timeout. Timeout threshold: {}ms", timeoutMills);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
+ }
+ }
+
public List<Boolean> needRegister(
final String clusterName,
final String brokerAddr,
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 892a71330..569a1c57b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -441,13 +441,18 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
try {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
- this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+ this.brokerController.registerSingleTopicAll(topicConfig);
+ } else {
+ this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
+ }
response.setCode(ResponseCode.SUCCESS);
} catch (Exception e) {
LOGGER.error("Update / create topic failed for [{}]", request, e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage());
}
+
return response;
}
@@ -769,7 +774,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
- private synchronized RemotingCommand updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+ private synchronized RemotingCommand updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx,
+ RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
LOGGER.info("updateColdDataFlowCtrGroupConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
@@ -876,7 +882,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
MessageStore messageStore = this.brokerController.getMessageStore();
if (messageStore instanceof DefaultMessageStore) {
- DefaultMessageStore defaultMessageStore = (DefaultMessageStore)messageStore;
+ DefaultMessageStore defaultMessageStore = (DefaultMessageStore) messageStore;
if (mode == LibC.MADV_NORMAL) {
defaultMessageStore.getMessageStoreConfig().setDataReadAheadEnable(true);
} else {
@@ -1835,13 +1841,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
/**
* Reset consumer offset.
*
- * @param topic Required, not null.
- * @param group Required, not null.
- * @param queueId if target queue ID is negative, all message queues will be reset;
- * otherwise, only the target queue would get reset.
- * @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being;
- * otherwise, binary search is performed to locate target offset.
- * @param offset Target offset to reset to if target queue ID is properly provided.
+ * @param topic Required, not null.
+ * @param group Required, not null.
+ * @param queueId if target queue ID is negative, all message queues will be reset; otherwise, only the target queue
+ * would get reset.
+ * @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being; otherwise,
+ * binary search is performed to locate target offset.
+ * @param offset Target offset to reset to if target queue ID is properly provided.
* @return Affected queues and their new offset
*/
private RemotingCommand resetOffsetInner(String topic, String group, int queueId, long timestamp, Long offset) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index e5fdd8675..e90530512 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -305,7 +305,11 @@ public class TopicConfigManager extends ConfigManager {
log.error("createTopicIfAbsent ", e);
}
if (createNew && register) {
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+ this.brokerController.registerSingleTopicAll(topicConfig);
+ } else {
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
+ }
}
return this.topicConfigTable.get(topicConfig.getTopicName());
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index a4d82d1c5..02c692e2b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -386,6 +386,12 @@ public class BrokerConfig extends BrokerIdentity {
*/
private boolean popResponseReturnActualRetryTopic = false;
+ /**
+ * If both the deleteTopicWithBrokerRegistration flag in the NameServer configuration and this flag are set to true,
+ * it guarantees the eventual consistency of data between the broker and the nameserver during topic deletion.
+ */
+ private boolean enableSingleTopicRegister = false;
+
public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
@@ -1689,4 +1695,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setPopResponseReturnActualRetryTopic(boolean popResponseReturnActualRetryTopic) {
this.popResponseReturnActualRetryTopic = popResponseReturnActualRetryTopic;
}
+
+ public boolean isEnableSingleTopicRegister() {
+ return enableSingleTopicRegister;
+ }
+
+ public void setEnableSingleTopicRegister(boolean enableSingleTopicRegister) {
+ this.enableSingleTopicRegister = enableSingleTopicRegister;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
index 700febfe2..5b8a6dedb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
@@ -82,6 +82,15 @@ public class NamesrvConfig {
private int waitSecondsForService = 45;
+ /**
+ * If this flag is enabled, topics that don't exist in the broker registration payload will be deleted from the name server.
+ *
+ * WARNING:
+ * 1. Enable this flag together with "enableSingleTopicRegister" in the broker config to avoid losing topic route info unexpectedly.
+ * 2. This flag does not support static topics currently.
+ */
+ private boolean deleteTopicWithBrokerRegistration = false;
+
public boolean isOrderMessageEnable() {
return orderMessageEnable;
}
@@ -241,4 +250,12 @@ public class NamesrvConfig {
public void setWaitSecondsForService(int waitSecondsForService) {
this.waitSecondsForService = waitSecondsForService;
}
+
+ public boolean isDeleteTopicWithBrokerRegistration() {
+ return deleteTopicWithBrokerRegistration;
+ }
+
+ public void setDeleteTopicWithBrokerRegistration(boolean deleteTopicWithBrokerRegistration) {
+ this.deleteTopicWithBrokerRegistration = deleteTopicWithBrokerRegistration;
+ }
}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index ac27d76ce..0055a1cc8 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -121,9 +121,18 @@ public class RouteInfoManager {
if (queueDatas == null || queueDatas.isEmpty()) {
return;
}
+
try {
this.lock.writeLock().lockInterruptibly();
if (this.topicQueueTable.containsKey(topic)) {
+ Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
+ for (QueueData queueData : queueDatas) {
+ if (!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
+ log.warn("Register topic contains illegal broker, {}, {}", topic, queueData);
+ return;
+ }
+ queueDataMap.put(queueData.getBrokerName(), queueData);
+ }
log.info("Topic route already exist.{}, {}", topic, this.topicQueueTable.get(topic));
} else {
// check and construct queue data map
@@ -299,7 +308,32 @@ public class RouteInfoManager {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
+
if (tcTable != null) {
+
+ TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
+ Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
+
+ // Delete the topics that don't exist in tcTable from the current broker
+ // Static topic is not supported currently
+ if (namesrvConfig.isDeleteTopicWithBrokerRegistration() && topicQueueMappingInfoMap.isEmpty()) {
+ final Set<String> oldTopicSet = topicSetOfBrokerName(brokerName);
+ final Set<String> newTopicSet = tcTable.keySet();
+ final Sets.SetView<String> toDeleteTopics = Sets.difference(oldTopicSet, newTopicSet);
+ for (final String toDeleteTopic : toDeleteTopics) {
+ Map<String, QueueData> queueDataMap = topicQueueTable.get(toDeleteTopic);
+ final QueueData removedQD = queueDataMap.remove(brokerName);
+ if (removedQD != null) {
+ log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, toDeleteTopic, removedQD);
+ }
+
+ if (queueDataMap.isEmpty()) {
+ log.info("deleteTopic, remove the topic all queue {}", toDeleteTopic);
+ topicQueueTable.remove(toDeleteTopic);
+ }
+ }
+ }
+
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
topicConfigWrapper.getDataVersion(), brokerName,
@@ -312,19 +346,17 @@ public class RouteInfoManager {
this.createAndUpdateQueueData(brokerName, topicConfig);
}
}
- }
- if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
- TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
- Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
- //the topicQueueMappingInfoMap should never be null, but can be empty
- for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
- if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
- topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
+ if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
+ //the topicQueueMappingInfoMap should never be null, but can be empty
+ for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
+ if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
+ topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
+ }
+ //Note asset brokerName equal entry.getValue().getBname()
+ //here use the mappingDetail.bname
+ topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
}
- //Note asset brokerName equal entry.getValue().getBname()
- //here use the mappingDetail.bname
- topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
}
}
}
@@ -374,6 +406,16 @@ public class RouteInfoManager {
return result;
}
+    private Set<String> topicSetOfBrokerName(final String brokerName) {
+        final Set<String> servedTopics = new HashSet<>(); // topics whose per-broker queue data has a brokerName entry
+        this.topicQueueTable.forEach((topic, queueDataByBroker) -> {
+            if (queueDataByBroker.containsKey(brokerName)) {
+                servedTopics.add(topic);
+            }
+        });
+        return servedTopics;
+    }
+
public BrokerMemberGroup getBrokerMemberGroup(String clusterName, String brokerName) {
BrokerMemberGroup groupMember = new BrokerMemberGroup(clusterName, brokerName);
try {
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
index b53519e5f..6002d1f5a 100644
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
@@ -22,6 +22,7 @@ import io.netty.channel.Channel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -37,6 +38,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.junit.After;
import org.junit.Before;
@@ -624,6 +626,92 @@ public class RouteInfoManagerNewTest {
.containsValues(BrokerBasicInfo.defaultBroker().brokerAddr, BrokerBasicInfo.slaveBroker().brokerAddr);
}
+    @Test
+    public void keepTopicWithBrokerRegistration() {
+        registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic", "TestTopic1");
+        assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+        assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+        // The delete flag is not switched on in this test, so re-registering without TestTopic must keep its route
+        registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic1");
+        assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+        assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+    }
+
+ @Test
+ public void deleteTopicWithBrokerRegistration() {
+ config.setDeleteTopicWithBrokerRegistration(true);
+ registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic", "TestTopic1");
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+ // With the delete flag on, re-registering the same broker without TestTopic must drop that topic's route
+ registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic1");
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+ }
+
+ @Test
+ public void deleteTopicWithBrokerRegistration2() {
+ // Register two brokers with the same topics, then drop TestTopic from each broker in turn
+ config.setDeleteTopicWithBrokerRegistration(true);
+ final BrokerBasicInfo master1 = BrokerBasicInfo.defaultBroker();
+ final BrokerBasicInfo master2 = BrokerBasicInfo.defaultBroker().name(DEFAULT_BROKER + 1).addr(DEFAULT_ADDR + 9);
+
+ registerBrokerWithNormalTopic(master1, "TestTopic", "TestTopic1");
+ registerBrokerWithNormalTopic(master2, "TestTopic", "TestTopic1");
+
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas()).hasSize(2);
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
+
+ // master1 re-registers without TestTopic: only master2 should still serve it
+ registerBrokerWithNormalTopic(master1, "TestTopic1");
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas()).hasSize(1);
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas().get(0).getBrokerName())
+ .isEqualTo(master2.brokerName);
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
+ // master2 drops TestTopic as well: no broker serves it any more, so the route lookup returns null
+ registerBrokerWithNormalTopic(master2, "TestTopic1");
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
+ }
+
+ @Test
+ public void registerSingleTopicWithBrokerRegistration() {
+ config.setDeleteTopicWithBrokerRegistration(true);
+ final BrokerBasicInfo master1 = BrokerBasicInfo.defaultBroker();
+
+ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic");
+
+ // Single-topic registration has no effect here because the broker itself has not been registered yet
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
+
+ // Register broker with TestTopic first and then register single topic TestTopic1
+ registerBrokerWithNormalTopic(master1, "TestTopic");
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+
+ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic1");
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+
+ // Register the two topics to keep the route info
+ registerBrokerWithNormalTopic(master1, "TestTopic", "TestTopic1");
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+
+ // Cancel the TestTopic1 with broker registration
+ registerBrokerWithNormalTopic(master1, "TestTopic");
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull();
+
+ // Add TestTopic1 and cancel all the topics with broker un-registration
+ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic1");
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+
+ routeInfoManager.unregisterBroker(master1.clusterName, master1.brokerAddr, master1.brokerName, 0);
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull();
+
+
+ }
+
private RegisterBrokerResult registerBrokerWithNormalTopic(BrokerBasicInfo brokerInfo, String... topics) {
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
TopicConfig baseTopic = new TopicConfig("baseTopic");
@@ -711,6 +799,17 @@ public class RouteInfoManagerNewTest {
return registerBrokerResult;
}
+    private void registerSingleTopicWithBrokerName(String brokerName, String... topics) {
+        Arrays.asList(topics).forEach(topicName -> {
+            final QueueData route = new QueueData(); // a fresh instance per topic, registered on its own
+            route.setBrokerName(brokerName);
+            route.setPerm(6);
+            route.setReadQueueNums(8);
+            route.setWriteQueueNums(8);
+            routeInfoManager.registerTopic(topicName, Collections.singletonList(route));
+        });
+    }
+
static class BrokerBasicInfo {
String clusterName;
String brokerName;
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 11b00a72c..d3d5de9e2 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.test.util;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -38,6 +39,7 @@ import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
@@ -319,4 +321,39 @@ public class MQAdminTestUtils {
}
return consumeStats;
}
+
+ /**
+ * Deletes the topic on the given broker only; the route info held by the name server is not proactively cleaned up.
+ *
+ * @param nameSrvAddr the namesrv addr to connect
+ * @param brokerName the specific broker
+ * @param topic the specific topic to delete
+ */
+ public static void deleteTopicFromBrokerOnly(String nameSrvAddr, String brokerName, String topic) {
+ DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+ mqAdminExt.setNamesrvAddr(nameSrvAddr);
+
+ try {
+ mqAdminExt.start();
+ String brokerAddr = CommandUtil.fetchMasterAddrByBrokerName(mqAdminExt, brokerName);
+ mqAdminExt.deleteTopicInBroker(Collections.singleton(brokerAddr), topic);
+ } catch (Exception ignored) { // best effort: any failure is dropped silently, so callers must verify the outcome themselves
+ } finally {
+ mqAdminExt.shutdown();
+ }
+ }
+
+ public static TopicRouteData examineTopicRouteInfo(String nameSrvAddr, String topicName) { // one-shot lookup of topicName's route through a throwaway admin client
+ DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+ mqAdminExt.setNamesrvAddr(nameSrvAddr);
+ TopicRouteData route = null; // stays null (and is returned as null) when start() or the lookup throws
+ try {
+ mqAdminExt.start();
+ route = mqAdminExt.examineTopicRouteInfo(topicName);
+ } catch (Exception ignored) { // swallowed: the caller only sees a null route
+ } finally {
+ mqAdminExt.shutdown();
+ }
+ return route;
+ }
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java b/test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
similarity index 99%
rename from test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
rename to test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
index 9e142eb61..43fefd616 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.test.base.dledger;
+package org.apache.rocketmq.test.dledger;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
diff --git a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
new file mode 100644
index 000000000..7e3c7b871
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.route;
+
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.util.MQAdminTestUtils;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for the broker option "enableSingleTopicRegister" and the name server option
+ * "deleteTopicWithBrokerRegistration". Every test restores the flags it changed in a finally block.
+ */
+public class CreateAndUpdateTopicIT extends BaseConf {
+
+    @Test
+    public void testCreateOrUpdateTopic_EnableSingleTopicRegistration() {
+        String topic = "test-topic-without-broker-registration";
+        setSingleTopicRegister(true);
+        try {
+            final boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, topic, 8, null);
+            assertThat(createResult).isTrue();
+
+            TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, topic);
+            assertThat(route).isNotNull();
+            assertThat(route.getBrokerDatas()).hasSize(3);
+            assertThat(route.getQueueDatas()).hasSize(3);
+        } finally {
+            setSingleTopicRegister(false);
+        }
+    }
+
+    @Test
+    public void testDeleteTopicFromNameSrvWithBrokerRegistration() {
+        namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true);
+        setSingleTopicRegister(true);
+        try {
+            String testTopic1 = "test-topic-keep-route";
+            String testTopic2 = "test-topic-delete-route";
+
+            boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic1, 8, null);
+            assertThat(createResult).isTrue();
+
+            createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic2, 8, null);
+            assertThat(createResult).isTrue();
+
+            TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2);
+            assertThat(route).isNotNull();
+            assertThat(route.getBrokerDatas()).hasSize(3);
+
+            MQAdminTestUtils.deleteTopicFromBrokerOnly(NAMESRV_ADDR, BROKER1_NAME, testTopic2);
+
+            // Deletion is lazy, trigger broker registration
+            brokerController1.registerBrokerAll(false, false, true);
+
+            // The route info of testTopic2 will be removed from broker1 after the registration
+            route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2);
+            assertThat(route).isNotNull();
+            assertThat(route.getBrokerDatas()).hasSize(2);
+            assertThat(route.getQueueDatas().get(0).getBrokerName()).isEqualTo(BROKER2_NAME);
+            assertThat(route.getQueueDatas().get(1).getBrokerName()).isEqualTo(BROKER3_NAME);
+        } finally {
+            setSingleTopicRegister(false);
+            namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
+        }
+    }
+
+    @Test
+    public void testStaticTopicNotAffected() throws Exception {
+        namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true);
+        setSingleTopicRegister(true);
+        try {
+            String testTopic = "test-topic-not-affected";
+            String testStaticTopic = "test-static-topic";
+
+            boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic, 8, null);
+            assertThat(createResult).isTrue();
+
+            TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic);
+            assertThat(route).isNotNull();
+            assertThat(route.getBrokerDatas()).hasSize(3);
+            assertThat(route.getQueueDatas()).hasSize(3);
+
+            MQAdminTestUtils.createStaticTopicWithCommand(testStaticTopic, 10, null, CLUSTER_NAME, NAMESRV_ADDR);
+
+            // Re-read the route: asserting on the snapshot taken before the static topic existed would prove nothing
+            route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic);
+            assertThat(route).isNotNull();
+            assertThat(route.getBrokerDatas()).hasSize(3);
+            assertThat(route.getQueueDatas()).hasSize(3);
+        } finally {
+            setSingleTopicRegister(false);
+            namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
+        }
+    }
+
+    /** Sets "enableSingleTopicRegister" to the given value on the three broker controllers used by these tests. */
+    private void setSingleTopicRegister(boolean enabled) {
+        brokerController1.getBrokerConfig().setEnableSingleTopicRegister(enabled);
+        brokerController2.getBrokerConfig().setEnableSingleTopicRegister(enabled);
+        brokerController3.getBrokerConfig().setEnableSingleTopicRegister(enabled);
+    }
+}
--
2.32.0.windows.2
From 32eb1d55570af81641a4a40d96ff5554329b93cb Mon Sep 17 00:00:00 2001
From: gaoyf <gaoyf@users.noreply.github.com>
Date: Tue, 25 Jul 2023 15:26:20 +0800
Subject: [PATCH 05/10] [ISSUE #7068] Fix failed to create syncer topic when
the proxy was just started (#7076)
---
.../apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java | 1 +
1 file changed, 1 insertion(+)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
index f7d9b11ba..c68859b28 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
@@ -104,6 +104,7 @@ public class MQClientAPIFactory implements StartAndShutdown {
rpcHook);
if (!mqClientAPIExt.updateNameServerAddressList()) {
+ mqClientAPIExt.fetchNameServerAddr();
this.scheduledExecutorService.scheduleAtFixedRate(
mqClientAPIExt::fetchNameServerAddr,
Duration.ofSeconds(10).toMillis(),
--
2.32.0.windows.2
From d79737788078707168c0258c4af0d800de32c137 Mon Sep 17 00:00:00 2001
From: Vincent Lee <cool8511@gmail.com>
Date: Thu, 27 Jul 2023 10:51:51 +0800
Subject: [PATCH 06/10] [ISSUE #7056] Avoid close success channel if invokeSync
most time cost on get connection for channel (#7057)
* fix: avoid close success channel if invokeSync most time cost on get channel
Change-Id: I29741cf55ac6333bfa30fef755357b78a22b1325
* fix: ci style
Change-Id: I8c9b86e9cb6f1463bf213e64c9b8c139afa794c8
---
.../rocketmq/remoting/netty/NettyRemotingClient.java | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 9715b918a..8491f4354 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -88,6 +88,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
+ private static final long MIN_CLOSE_TIMEOUT_MILLIS = 100;
private final NettyClientConfig nettyClientConfig;
private final Bootstrap bootstrap = new Bootstrap();
@@ -524,13 +525,15 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
final Channel channel = this.getAndCreateChannel(addr);
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
if (channel != null && channel.isActive()) {
+ long left = timeoutMillis;
try {
doBeforeRpcHooks(channelRemoteAddr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis < costTime) {
+ left -= costTime;
+ if (left <= 0) {
throw new RemotingTimeoutException("invokeSync call the addr[" + channelRemoteAddr + "] timeout");
}
- RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
+ RemotingCommand response = this.invokeSyncImpl(channel, request, left);
doAfterRpcHooks(channelRemoteAddr, request, response);
this.updateChannelLastResponseTime(addr);
return response;
@@ -539,7 +542,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
- if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
+ // Avoid closing a healthy channel when little time was left for the read: most of the timeout may have been spent obtaining the channel rather than waiting for the response
+ boolean shouldClose = left > MIN_CLOSE_TIMEOUT_MILLIS || left > timeoutMillis / 4;
+ if (nettyClientConfig.isClientCloseSocketIfTimeout() && shouldClose) {
this.closeChannel(addr, channel);
LOGGER.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, channelRemoteAddr);
}
--
2.32.0.windows.2
From d0a69be563785ca815dc31ef1aab4c1bc5588c01 Mon Sep 17 00:00:00 2001
From: zd46319 <zd46319@163.com>
Date: Thu, 27 Jul 2023 16:56:41 +0800
Subject: [PATCH 07/10] [ISSUE #6810] Fix the bug of mistakenly deleting data
in clientChannelTable when the channel expire (#7073)
---
.../broker/client/ProducerManager.java | 5 ++-
.../broker/client/ProducerManagerTest.java | 34 +++++++++++++++++++
2 files changed, 38 insertions(+), 1 deletion(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 52d67bf28..f9fe1193e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -112,7 +112,8 @@ public class ProducerManager {
long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
it.remove();
- clientChannelTable.remove(info.getClientId());
+ // Conditional remove: drop the clientId mapping only while it still points at the expired channel, so a newer channel registered under the same clientId is kept
+ clientChannelTable.remove(info.getClientId(), info.getChannel());
log.warn(
"ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
index dac5468c8..3d6091e02 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
@@ -27,6 +27,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
@@ -79,6 +80,39 @@ public class ProducerManagerTest {
assertThat(producerManager.findChannel("clientId")).isNull();
}
+ @Test
+ public void scanNotActiveChannelWithSameClientId() throws Exception {
+ producerManager.registerProducer(group, clientInfo);
+ Channel channel1 = Mockito.mock(Channel.class);
+ ClientChannelInfo clientInfo1 = new ClientChannelInfo(channel1, clientInfo.getClientId(), LanguageCode.JAVA, 0); // second channel registered under the same clientId
+ producerManager.registerProducer(group, clientInfo1);
+ AtomicReference<String> groupRef = new AtomicReference<>();
+ AtomicReference<ClientChannelInfo> clientChannelInfoRef = new AtomicReference<>();
+ producerManager.appendProducerChangeListener((event, group, clientChannelInfo) -> { // NOTE(review): the refs filled by this listener are never asserted below
+ switch (event) {
+ case GROUP_UNREGISTER:
+ groupRef.set(group);
+ break;
+ case CLIENT_UNREGISTER:
+ clientChannelInfoRef.set(clientChannelInfo);
+ break;
+ default:
+ break;
+ }
+ });
+ assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
+ assertThat(producerManager.getGroupChannelTable().get(group).get(channel1)).isNotNull();
+ assertThat(producerManager.findChannel("clientId")).isNotNull();
+ Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT"); // read the private timeout reflectively to age the first channel past it
+ field.setAccessible(true);
+ long channelExpiredTimeout = field.getLong(producerManager);
+ clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - channelExpiredTimeout - 10); // only the first registration is made to look expired
+ when(channel.close()).thenReturn(mock(ChannelFuture.class));
+ producerManager.scanNotActiveChannel();
+ assertThat(producerManager.getGroupChannelTable().get(group).get(channel1)).isNotNull(); // the second channel must survive the scan
+ assertThat(producerManager.findChannel("clientId")).isNotNull(); // and the clientId lookup must still resolve
+ }
+
@Test
public void doChannelCloseEvent() throws Exception {
producerManager.registerProducer(group, clientInfo);
--
2.32.0.windows.2
From d429bd72dfae0901f4325c8e9c6ce631286e40d4 Mon Sep 17 00:00:00 2001
From: cnScarb <jjhfen00@163.com>
Date: Fri, 28 Jul 2023 09:46:39 +0800
Subject: [PATCH 08/10] [ISSUE #7039] Fix retry message filter when subtype is
TAG (#7040)
---
.../broker/filter/ExpressionForRetryMessageFilter.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
index d2d1087ef..bc01b21cb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
@@ -45,12 +45,12 @@ public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter {
return true;
}
- boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
-
- if (!isRetryTopic && ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+ if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
return true;
}
+ boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
+
ConsumerFilterData realFilterData = this.consumerFilterData;
Map<String, String> tempProperties = properties;
boolean decoded = false;
--
2.32.0.windows.2
From 8baa51e85e569429293720b2ba7fcaee745abecc Mon Sep 17 00:00:00 2001
From: Zack_Aayush <60972989+AayushSaini101@users.noreply.github.com>
Date: Sun, 30 Jul 2023 09:02:02 +0530
Subject: [PATCH 09/10] [ISSUE #7091] Update the cd command in README (#7096)
* Update the cd command
* Removed extra space
---------
Co-authored-by: Aayush <aaayush@redhat.com>
---
README.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/README.md b/README.md
index 393ef88e6..56d253ce1 100644
--- a/README.md
+++ b/README.md
@@ -63,7 +63,7 @@ $ unzip rocketmq-all-5.1.3-bin-release.zip
Prepare a terminal and change to the extracted `bin` directory:
```shell
-$ cd rocketmq-all-5.1.3/bin
+$ cd rocketmq-all-5.1.3-bin-release/bin
```
**1) Start NameServer**
--
2.32.0.windows.2
From 8bcc94829d2ef2597a8eeab3c6b7099432a0bea1 Mon Sep 17 00:00:00 2001
From: weihubeats <weihubeats@163.com>
Date: Tue, 1 Aug 2023 10:15:07 +0800
Subject: [PATCH 10/10] [ISSUE #7077] Schedule CQ offset invalid. offset=77,
cqMinOffset=0, cqMaxOffset=74, queueId=1 (#7084)
* Adding null does not update
* delete slave put correctDelayOffset
* Remove duplicate delayOffset file loading
* add loadWhenSyncDelayOffset
* add method
* add method
---
.../rocketmq/broker/schedule/ScheduleMessageService.java | 6 ++++++
.../org/apache/rocketmq/broker/slave/SlaveSynchronize.java | 2 +-
2 files changed, 7 insertions(+), 1 deletion(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index 2a4ace098..26f09dcd0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -223,6 +223,12 @@ public class ScheduleMessageService extends ConfigManager {
result = result && this.correctDelayOffset();
return result;
}
+
+    public boolean loadWhenSyncDelayOffset() {
+        // Reload through super.load(), then re-parse the delay levels; parsing is skipped when the reload fails
+        final boolean loaded = super.load();
+        return loaded && this.parseDelayLevel();
+    }
public boolean correctDelayOffset() {
try {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index b9de5173b..53cdecdf8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -152,7 +152,7 @@ public class SlaveSynchronize {
.getMessageStoreConfig().getStorePathRootDir());
try {
MixAll.string2File(delayOffset, fileName);
- this.brokerController.getScheduleMessageService().load();
+ this.brokerController.getScheduleMessageService().loadWhenSyncDelayOffset();
} catch (IOException e) {
LOGGER.error("Persist file Exception, {}", fileName, e);
}
--
2.32.0.windows.2