1426 lines
70 KiB
Diff
1426 lines
70 KiB
Diff
From 90c5382aee07879a80309f257f04114201ccaac6 Mon Sep 17 00:00:00 2001
|
||
From: ShuangxiDing <dingshuangxi888@gmail.com>
|
||
Date: Fri, 21 Jul 2023 20:28:58 +0800
|
||
Subject: [PATCH 01/10] [ISSUE #7061] Support forward HAProxyMessage for Multi
|
||
Protocol server. (#7062)
|
||
MIME-Version: 1.0
|
||
Content-Type: text/plain; charset=UTF-8
|
||
Content-Transfer-Encoding: 8bit
|
||
|
||
* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator
|
||
|
||
* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator
|
||
|
||
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
|
||
|
||
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
|
||
|
||
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
|
||
|
||
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
|
||
|
||
* Support proxy protocol for gRPC server.
|
||
|
||
* Support proxy protocol for gRPC server.
|
||
|
||
* Support proxy protocol for gRPC server.
|
||
|
||
* Support proxy protocol for gRPC server.
|
||
|
||
* Support proxy protocol for gRPC server.
|
||
|
||
* Support proxy protocol for gRPC and Remoting server.
|
||
|
||
* 回滚netty的升级
|
||
|
||
* Support proxy protocol for gRPC and Remoting server.
|
||
|
||
* Support proxy protocol for gRPC and Remoting server.
|
||
|
||
* Support proxy protocol for gRPC and Remoting server.
|
||
|
||
* add grpc-netty-codec-haproxy in bazel
|
||
|
||
* add grpc-netty-codec-haproxy in bazel
|
||
|
||
* Support proxy protocol for gRPC and Remoting server.
|
||
|
||
* Fix Test
|
||
|
||
* add grpc-netty-codec-haproxy in bazel
|
||
|
||
* add ProxyProtocolTest for Remoting
|
||
|
||
* Support HAProxyMessage forward for multi protocol server.
|
||
|
||
---------
|
||
|
||
Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com>
|
||
---
|
||
.../http2proxy/HAProxyMessageForwarder.java | 129 ++++++++++++++++++
|
||
.../http2proxy/Http2ProtocolProxyHandler.java | 23 +++-
|
||
.../http2proxy/Http2ProxyBackendHandler.java | 2 +
|
||
.../http2proxy/Http2ProxyFrontendHandler.java | 28 ++--
|
||
4 files changed, 164 insertions(+), 18 deletions(-)
|
||
create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
|
||
|
||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
|
||
new file mode 100644
|
||
index 000000000..8f139d3d9
|
||
--- /dev/null
|
||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
|
||
@@ -0,0 +1,129 @@
|
||
+/*
|
||
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
||
+ * contributor license agreements. See the NOTICE file distributed with
|
||
+ * this work for additional information regarding copyright ownership.
|
||
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
||
+ * (the "License"); you may not use this file except in compliance with
|
||
+ * the License. You may obtain a copy of the License at
|
||
+ *
|
||
+ * http://www.apache.org/licenses/LICENSE-2.0
|
||
+ *
|
||
+ * Unless required by applicable law or agreed to in writing, software
|
||
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
||
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
+ * See the License for the specific language governing permissions and
|
||
+ * limitations under the License.
|
||
+ */
|
||
+
|
||
+package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
|
||
+
|
||
+import io.netty.buffer.ByteBuf;
|
||
+import io.netty.buffer.Unpooled;
|
||
+import io.netty.channel.Channel;
|
||
+import io.netty.channel.ChannelHandlerContext;
|
||
+import io.netty.channel.ChannelInboundHandlerAdapter;
|
||
+import io.netty.handler.codec.haproxy.HAProxyCommand;
|
||
+import io.netty.handler.codec.haproxy.HAProxyMessage;
|
||
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
|
||
+import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
|
||
+import io.netty.handler.codec.haproxy.HAProxyTLV;
|
||
+import io.netty.util.Attribute;
|
||
+import io.netty.util.DefaultAttributeMap;
|
||
+import org.apache.commons.codec.binary.Hex;
|
||
+import org.apache.commons.lang3.ArrayUtils;
|
||
+import org.apache.commons.lang3.StringUtils;
|
||
+import org.apache.commons.lang3.reflect.FieldUtils;
|
||
+import org.apache.rocketmq.acl.common.AclUtils;
|
||
+import org.apache.rocketmq.common.constant.HAProxyConstants;
|
||
+import org.apache.rocketmq.common.constant.LoggerName;
|
||
+import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
|
||
+
|
||
+import java.lang.reflect.Field;
|
||
+import java.nio.charset.Charset;
|
||
+import java.util.ArrayList;
|
||
+import java.util.List;
|
||
+
|
||
+public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter {
|
||
+
|
||
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
|
||
+
|
||
+ private static final Field FIELD_ATTRIBUTE =
|
||
+ FieldUtils.getField(DefaultAttributeMap.class, "attributes", true);
|
||
+
|
||
+ private final Channel outboundChannel;
|
||
+
|
||
+ public HAProxyMessageForwarder(final Channel outboundChannel) {
|
||
+ this.outboundChannel = outboundChannel;
|
||
+ }
|
||
+
|
||
+ @Override
|
||
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||
+ try {
|
||
+ forwardHAProxyMessage(ctx.channel(), outboundChannel);
|
||
+ ctx.fireChannelRead(msg);
|
||
+ } catch (Exception e) {
|
||
+ log.error("Forward HAProxyMessage from Remoting to gRPC server error.", e);
|
||
+ throw e;
|
||
+ } finally {
|
||
+ ctx.pipeline().remove(this);
|
||
+ }
|
||
+ }
|
||
+
|
||
+ private void forwardHAProxyMessage(Channel inboundChannel, Channel outboundChannel) throws Exception {
|
||
+ if (!inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
|
||
+ return;
|
||
+ }
|
||
+
|
||
+ if (!(inboundChannel instanceof DefaultAttributeMap)) {
|
||
+ return;
|
||
+ }
|
||
+
|
||
+ Attribute<?>[] attributes = (Attribute<?>[]) FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel);
|
||
+ if (ArrayUtils.isEmpty(attributes)) {
|
||
+ return;
|
||
+ }
|
||
+
|
||
+ String sourceAddress = null, destinationAddress = null;
|
||
+ int sourcePort = 0, destinationPort = 0;
|
||
+ List<HAProxyTLV> haProxyTLVs = new ArrayList<>();
|
||
+
|
||
+ for (Attribute<?> attribute : attributes) {
|
||
+ String attributeKey = attribute.key().name();
|
||
+ if (!StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_PREFIX)) {
|
||
+ continue;
|
||
+ }
|
||
+ String attributeValue = (String) attribute.get();
|
||
+ if (StringUtils.isEmpty(attributeValue)) {
|
||
+ continue;
|
||
+ }
|
||
+ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_ADDR) {
|
||
+ sourceAddress = attributeValue;
|
||
+ }
|
||
+ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_PORT) {
|
||
+ sourcePort = Integer.parseInt(attributeValue);
|
||
+ }
|
||
+ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR) {
|
||
+ destinationAddress = attributeValue;
|
||
+ }
|
||
+ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_PORT) {
|
||
+ destinationPort = Integer.parseInt(attributeValue);
|
||
+ }
|
||
+ if (StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) {
|
||
+ String typeString = StringUtils.substringAfter(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX);
|
||
+ ByteBuf byteBuf = Unpooled.buffer();
|
||
+ byteBuf.writeBytes(attributeValue.getBytes(Charset.defaultCharset()));
|
||
+ HAProxyTLV haProxyTLV = new HAProxyTLV(Hex.decodeHex(typeString)[0], byteBuf);
|
||
+ haProxyTLVs.add(haProxyTLV);
|
||
+ }
|
||
+ }
|
||
+
|
||
+ HAProxyProxiedProtocol proxiedProtocol = AclUtils.isColon(sourceAddress) ? HAProxyProxiedProtocol.TCP6 :
|
||
+ HAProxyProxiedProtocol.TCP4;
|
||
+
|
||
+ HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY,
|
||
+ proxiedProtocol, sourceAddress, destinationAddress, sourcePort, destinationPort, haProxyTLVs);
|
||
+ outboundChannel.writeAndFlush(message).sync();
|
||
+ }
|
||
+}
|
||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
|
||
index 913f35c93..c37db92af 100644
|
||
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
|
||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
|
||
@@ -24,13 +24,14 @@ import io.netty.channel.ChannelFuture;
|
||
import io.netty.channel.ChannelHandlerContext;
|
||
import io.netty.channel.ChannelInitializer;
|
||
import io.netty.channel.ChannelOption;
|
||
+import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
|
||
import io.netty.handler.ssl.ApplicationProtocolConfig;
|
||
import io.netty.handler.ssl.ApplicationProtocolNames;
|
||
import io.netty.handler.ssl.SslContext;
|
||
import io.netty.handler.ssl.SslContextBuilder;
|
||
+import io.netty.handler.ssl.SslHandler;
|
||
import io.netty.handler.ssl.SslProvider;
|
||
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
|
||
-import javax.net.ssl.SSLException;
|
||
import org.apache.rocketmq.common.constant.LoggerName;
|
||
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||
@@ -38,8 +39,11 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
||
import org.apache.rocketmq.proxy.config.ProxyConfig;
|
||
import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler;
|
||
import org.apache.rocketmq.remoting.common.TlsMode;
|
||
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
|
||
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
|
||
|
||
+import javax.net.ssl.SSLException;
|
||
+
|
||
public class Http2ProtocolProxyHandler implements ProtocolHandler {
|
||
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
|
||
private static final String LOCAL_HOST = "127.0.0.1";
|
||
@@ -101,11 +105,8 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler {
|
||
.handler(new ChannelInitializer<Channel>() {
|
||
@Override
|
||
protected void initChannel(Channel ch) throws Exception {
|
||
- if (sslContext != null) {
|
||
- ch.pipeline()
|
||
- .addLast(sslContext.newHandler(ch.alloc(), LOCAL_HOST, config.getGrpcServerPort()));
|
||
- }
|
||
- ch.pipeline().addLast(new Http2ProxyBackendHandler(inboundChannel));
|
||
+ ch.pipeline().addLast(null, Http2ProxyBackendHandler.HANDLER_NAME,
|
||
+ new Http2ProxyBackendHandler(inboundChannel));
|
||
}
|
||
})
|
||
.option(ChannelOption.AUTO_READ, false)
|
||
@@ -120,7 +121,15 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler {
|
||
}
|
||
|
||
final Channel outboundChannel = f.channel();
|
||
+ if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
|
||
+ ctx.pipeline().addLast(new HAProxyMessageForwarder(outboundChannel));
|
||
+ outboundChannel.pipeline().addFirst(HAProxyMessageEncoder.INSTANCE);
|
||
+ }
|
||
|
||
- ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel));
|
||
+ SslHandler sslHandler = null;
|
||
+ if (sslContext != null) {
|
||
+ sslHandler = sslContext.newHandler(outboundChannel.alloc(), LOCAL_HOST, config.getGrpcServerPort());
|
||
+ }
|
||
+ ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel, sslHandler));
|
||
}
|
||
}
|
||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
|
||
index 0195b0c1c..fd5408fae 100644
|
||
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
|
||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
|
||
@@ -29,6 +29,8 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||
public class Http2ProxyBackendHandler extends ChannelInboundHandlerAdapter {
|
||
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
|
||
|
||
+ public static final String HANDLER_NAME = "Http2ProxyBackendHandler";
|
||
+
|
||
private final Channel inboundChannel;
|
||
|
||
public Http2ProxyBackendHandler(Channel inboundChannel) {
|
||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
|
||
index 87147a322..9b37e85e5 100644
|
||
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
|
||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
|
||
@@ -19,36 +19,42 @@ package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
|
||
|
||
import io.netty.buffer.Unpooled;
|
||
import io.netty.channel.Channel;
|
||
-import io.netty.channel.ChannelFuture;
|
||
import io.netty.channel.ChannelFutureListener;
|
||
import io.netty.channel.ChannelHandlerContext;
|
||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||
+import io.netty.handler.ssl.SslHandler;
|
||
import org.apache.rocketmq.common.constant.LoggerName;
|
||
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||
|
||
public class Http2ProxyFrontendHandler extends ChannelInboundHandlerAdapter {
|
||
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
|
||
+
|
||
+ public static final String HANDLER_NAME = "SslHandler";
|
||
+
|
||
// As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as
|
||
// the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel.
|
||
private final Channel outboundChannel;
|
||
+ private final SslHandler sslHandler;
|
||
|
||
- public Http2ProxyFrontendHandler(final Channel outboundChannel) {
|
||
+ public Http2ProxyFrontendHandler(final Channel outboundChannel, final SslHandler sslHandler) {
|
||
this.outboundChannel = outboundChannel;
|
||
+ this.sslHandler = sslHandler;
|
||
}
|
||
|
||
@Override
|
||
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
|
||
if (outboundChannel.isActive()) {
|
||
- outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
|
||
- @Override
|
||
- public void operationComplete(ChannelFuture future) {
|
||
- if (future.isSuccess()) {
|
||
- // was able to flush out data, start to read the next chunk
|
||
- ctx.channel().read();
|
||
- } else {
|
||
- future.channel().close();
|
||
- }
|
||
+ if (sslHandler != null && outboundChannel.pipeline().get(HANDLER_NAME) == null) {
|
||
+ outboundChannel.pipeline().addBefore(Http2ProxyBackendHandler.HANDLER_NAME, HANDLER_NAME, sslHandler);
|
||
+ }
|
||
+
|
||
+ outboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
|
||
+ if (future.isSuccess()) {
|
||
+ // was able to flush out data, start to read the next chunk
|
||
+ ctx.channel().read();
|
||
+ } else {
|
||
+ future.channel().close();
|
||
}
|
||
});
|
||
}
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 8027cfc7cbb6c120d2fc045e0caa8debe1028a31 Mon Sep 17 00:00:00 2001
|
||
From: maclong1989 <814742806@qq.com>
|
||
Date: Sun, 23 Jul 2023 09:15:05 +0800
|
||
Subject: [PATCH 02/10] [ISSUE #7063] doc: fix typo in user_guide.md
|
||
|
||
Signed-off-by: jiangyl3 <jiangyl3@asiainfo.com>
|
||
Co-authored-by: jiangyl3 <jiangyl3@asiainfo.com>
|
||
---
|
||
docs/cn/msg_trace/user_guide.md | 2 +-
|
||
1 file changed, 1 insertion(+), 1 deletion(-)
|
||
|
||
diff --git a/docs/cn/msg_trace/user_guide.md b/docs/cn/msg_trace/user_guide.md
|
||
index d8314052b..9cf139fd3 100644
|
||
--- a/docs/cn/msg_trace/user_guide.md
|
||
+++ b/docs/cn/msg_trace/user_guide.md
|
||
@@ -35,7 +35,7 @@ namesrvAddr=XX.XX.XX.XX:9876
|
||
RocketMQ集群中每一个Broker节点均用于存储Client端收集并发送过来的消息轨迹数据。因此,对于RocketMQ集群中的Broker节点数量并无要求和限制。
|
||
|
||
### 2.3 物理IO隔离模式
|
||
-对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。在该模式下,RockeMQ集群中至少有两个Broker节点,其中一个Broker节点定义为存储消息轨迹数据的服务端。
|
||
+对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。在该模式下,RocketMQ集群中至少有两个Broker节点,其中一个Broker节点定义为存储消息轨迹数据的服务端。
|
||
|
||
### 2.4 启动开启消息轨迹的Broker
|
||
`nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &`
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 3102758487f3e21e977424d7f1b7187eb6c069cb Mon Sep 17 00:00:00 2001
|
||
From: =?UTF-8?q?=E5=90=B4=E6=98=9F=E7=81=BF?=
|
||
<37405937+wuyoudexiao@users.noreply.github.com>
|
||
Date: Tue, 25 Jul 2023 13:47:53 +0800
|
||
Subject: [PATCH 03/10] fix: npe in lockBatchMQ and unlockBatchMQ (#7078)
|
||
|
||
Co-authored-by: wxc <wuxingcan666@foxmail.com>
|
||
---
|
||
.../rocketmq/proxy/processor/ConsumerProcessor.java | 11 +++++++----
|
||
1 file changed, 7 insertions(+), 4 deletions(-)
|
||
|
||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
|
||
index cc973813b..656a6339d 100644
|
||
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
|
||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
|
||
@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.processor;
|
||
|
||
import java.util.ArrayList;
|
||
import java.util.HashMap;
|
||
+import java.util.HashSet;
|
||
import java.util.List;
|
||
import java.util.Map;
|
||
import java.util.Set;
|
||
@@ -425,13 +426,15 @@ public class ConsumerProcessor extends AbstractProcessor {
|
||
}
|
||
|
||
protected Set<AddressableMessageQueue> buildAddressableSet(ProxyContext ctx, Set<MessageQueue> mqSet) {
|
||
- return mqSet.stream().map(mq -> {
|
||
+ Set<AddressableMessageQueue> addressableMessageQueueSet = new HashSet<>(mqSet.size());
|
||
+ for (MessageQueue mq:mqSet) {
|
||
try {
|
||
- return serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq);
|
||
+ addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq)) ;
|
||
} catch (Exception e) {
|
||
- return null;
|
||
+ log.error("build addressable message queue fail, messageQueue = {}", mq, e);
|
||
}
|
||
- }).collect(Collectors.toSet());
|
||
+ }
|
||
+ return addressableMessageQueueSet;
|
||
}
|
||
|
||
protected HashMap<String, List<AddressableMessageQueue>> buildAddressableMapByBrokerName(
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 047ef7498f2203a2234052603a99a114d8a65e17 Mon Sep 17 00:00:00 2001
|
||
From: rongtong <jinrongtong5@163.com>
|
||
Date: Tue, 25 Jul 2023 14:00:22 +0800
|
||
Subject: [PATCH 04/10] Ensuring consistency between broker and nameserver data
|
||
when deleting a topic (#7066)
|
||
MIME-Version: 1.0
|
||
Content-Type: text/plain; charset=UTF-8
|
||
Content-Transfer-Encoding: 8bit
|
||
|
||
Co-authored-by: 尘央 <xinyuzhou.zxy@alibaba-inc.com>
|
||
---
|
||
.../rocketmq/broker/BrokerController.java | 11 ++
|
||
.../rocketmq/broker/out/BrokerOuterAPI.java | 62 ++++++++++
|
||
.../processor/AdminBrokerProcessor.java | 26 ++--
|
||
.../broker/topic/TopicConfigManager.java | 6 +-
|
||
.../apache/rocketmq/common/BrokerConfig.java | 14 +++
|
||
.../common/namesrv/NamesrvConfig.java | 17 +++
|
||
.../namesrv/routeinfo/RouteInfoManager.java | 64 ++++++++--
|
||
.../routeinfo/RouteInfoManagerNewTest.java | 99 +++++++++++++++
|
||
.../rocketmq/test/util/MQAdminTestUtils.java | 37 ++++++
|
||
.../dledger/DLedgerProduceAndConsumeIT.java | 2 +-
|
||
.../test/route/CreateAndUpdateTopicIT.java | 114 ++++++++++++++++++
|
||
11 files changed, 429 insertions(+), 23 deletions(-)
|
||
rename test/src/test/java/org/apache/rocketmq/test/{base => }/dledger/DLedgerProduceAndConsumeIT.java (99%)
|
||
create mode 100644 test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
|
||
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
||
index 196401e26..972457194 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
||
@@ -1678,6 +1678,17 @@ public class BrokerController {
|
||
}, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS));
|
||
}
|
||
|
||
+ public synchronized void registerSingleTopicAll(final TopicConfig topicConfig) {
|
||
+ TopicConfig tmpTopic = topicConfig;
|
||
+ if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|
||
+ || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
|
||
+ // Copy the topic config and modify the perm
|
||
+ tmpTopic = new TopicConfig(topicConfig);
|
||
+ tmpTopic.setPerm(topicConfig.getPerm() & this.brokerConfig.getBrokerPermission());
|
||
+ }
|
||
+ this.brokerOuterAPI.registerSingleTopicAll(this.brokerConfig.getBrokerName(), tmpTopic, 3000);
|
||
+ }
|
||
+
|
||
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
|
||
this.registerIncrementBrokerData(Collections.singletonList(topicConfig), dataVersion);
|
||
}
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
|
||
index b6273e9ed..1793a83c0 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
|
||
@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.LockCallback;
|
||
import org.apache.rocketmq.common.MixAll;
|
||
import org.apache.rocketmq.common.Pair;
|
||
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
||
+import org.apache.rocketmq.common.TopicConfig;
|
||
import org.apache.rocketmq.common.UnlockCallback;
|
||
import org.apache.rocketmq.common.UtilAll;
|
||
import org.apache.rocketmq.common.constant.LoggerName;
|
||
@@ -120,12 +121,14 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionRequ
|
||
import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionResponseHeader;
|
||
import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerRequestHeader;
|
||
import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader;
|
||
+import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterTopicRequestHeader;
|
||
import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
|
||
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
|
||
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
|
||
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
|
||
import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
|
||
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
|
||
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
|
||
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
|
||
import org.apache.rocketmq.remoting.rpc.ClientMetadata;
|
||
import org.apache.rocketmq.remoting.rpc.RpcClient;
|
||
@@ -614,6 +617,65 @@ public class BrokerOuterAPI {
|
||
throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
|
||
}
|
||
|
||
+ /**
|
||
+ * Register the topic route info of single topic to all name server nodes, waiting at most
|
||
+ * timeoutMills for the replies. Failures against individual name servers are logged, not thrown.
|
||
+ * This method is used to replace incremental broker registration feature.
|
||
+ */
|
||
+ public void registerSingleTopicAll(
|
||
+ final String brokerName,
|
||
+ final TopicConfig topicConfig,
|
||
+ final int timeoutMills) {
|
||
+ String topic = topicConfig.getTopicName();
|
||
+ RegisterTopicRequestHeader requestHeader = new RegisterTopicRequestHeader();
|
||
+ requestHeader.setTopic(topic);
|
||
+
|
||
+ TopicRouteData topicRouteData = new TopicRouteData();
|
||
+ List<QueueData> queueDatas = new ArrayList<>();
|
||
+ topicRouteData.setQueueDatas(queueDatas);
|
||
+
|
||
+ final QueueData queueData = new QueueData();
|
||
+ queueData.setBrokerName(brokerName);
|
||
+ queueData.setPerm(topicConfig.getPerm());
|
||
+ queueData.setReadQueueNums(topicConfig.getReadQueueNums());
|
||
+ queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
|
||
+ queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());
|
||
+ queueDatas.add(queueData);
|
||
+ final byte[] topicRouteBody = topicRouteData.encode();
|
||
+
|
||
+ List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
|
||
+ final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
|
||
+ for (final String namesrvAddr : nameServerAddressList) {
|
||
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_TOPIC_IN_NAMESRV, requestHeader);
|
||
+ request.setBody(topicRouteBody);
|
||
+ try {
|
||
+ brokerOuterExecutor.execute(() -> {
|
||
+ try {
|
||
+ RemotingCommand response = BrokerOuterAPI.this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
|
||
+ assert response != null;
|
||
+ // SLF4J substitutes {} placeholders only; printf-style %s would be logged literally.
|
||
+ LOGGER.info("Register single topic {} to broker {} with response code {}", topic, brokerName, response.getCode());
|
||
+ } catch (Exception e) {
|
||
+ LOGGER.warn(String.format("Register single topic %s to broker %s exception", topic, brokerName), e);
|
||
+ } finally {
|
||
+ countDownLatch.countDown();
|
||
+ }
|
||
+ });
|
||
+ } catch (Exception e) {
|
||
+ LOGGER.warn("Execute single topic registration task failed, topic {}, broker name {}", topic, brokerName, e);
|
||
+ countDownLatch.countDown();
|
||
+ }
|
||
+ }
|
||
+
|
||
+ try {
|
||
+ if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
|
||
+ LOGGER.warn("Registration single topic to one or more name servers timeout. Timeout threshold: {}ms", timeoutMills);
|
||
+ }
|
||
+ } catch (InterruptedException e) {
|
||
+ Thread.currentThread().interrupt(); // keep the interruption visible to the caller
|
||
+ }
|
||
+ }
|
||
+
|
||
public List<Boolean> needRegister(
|
||
final String clusterName,
|
||
final String brokerAddr,
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||
index 892a71330..569a1c57b 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
|
||
@@ -441,13 +441,18 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
||
|
||
try {
|
||
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
|
||
- this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
|
||
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
||
+ this.brokerController.registerSingleTopicAll(topicConfig);
|
||
+ } else {
|
||
+ this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
|
||
+ }
|
||
response.setCode(ResponseCode.SUCCESS);
|
||
} catch (Exception e) {
|
||
LOGGER.error("Update / create topic failed for [{}]", request, e);
|
||
response.setCode(ResponseCode.SYSTEM_ERROR);
|
||
response.setRemark(e.getMessage());
|
||
}
|
||
+
|
||
return response;
|
||
}
|
||
|
||
@@ -769,7 +774,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
||
return response;
|
||
}
|
||
|
||
- private synchronized RemotingCommand updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx, RemotingCommand request) {
|
||
+ private synchronized RemotingCommand updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx,
|
||
+ RemotingCommand request) {
|
||
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
|
||
LOGGER.info("updateColdDataFlowCtrGroupConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
|
||
|
||
@@ -876,7 +882,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
||
}
|
||
MessageStore messageStore = this.brokerController.getMessageStore();
|
||
if (messageStore instanceof DefaultMessageStore) {
|
||
- DefaultMessageStore defaultMessageStore = (DefaultMessageStore)messageStore;
|
||
+ DefaultMessageStore defaultMessageStore = (DefaultMessageStore) messageStore;
|
||
if (mode == LibC.MADV_NORMAL) {
|
||
defaultMessageStore.getMessageStoreConfig().setDataReadAheadEnable(true);
|
||
} else {
|
||
@@ -1835,13 +1841,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
|
||
/**
|
||
* Reset consumer offset.
|
||
*
|
||
- * @param topic Required, not null.
|
||
- * @param group Required, not null.
|
||
- * @param queueId if target queue ID is negative, all message queues will be reset;
|
||
- * otherwise, only the target queue would get reset.
|
||
- * @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being;
|
||
- * otherwise, binary search is performed to locate target offset.
|
||
- * @param offset Target offset to reset to if target queue ID is properly provided.
|
||
+ * @param topic Required, not null.
|
||
+ * @param group Required, not null.
|
||
+ * @param queueId if target queue ID is negative, all message queues will be reset; otherwise, only the target queue
|
||
+ * would get reset.
|
||
+ * @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being; otherwise,
|
||
+ * binary search is performed to locate target offset.
|
||
+ * @param offset Target offset to reset to if target queue ID is properly provided.
|
||
* @return Affected queues and their new offset
|
||
*/
|
||
private RemotingCommand resetOffsetInner(String topic, String group, int queueId, long timestamp, Long offset) {
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
||
index e5fdd8675..e90530512 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
||
@@ -305,7 +305,11 @@ public class TopicConfigManager extends ConfigManager {
|
||
log.error("createTopicIfAbsent ", e);
|
||
}
|
||
if (createNew && register) {
|
||
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
||
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
||
+ this.brokerController.registerSingleTopicAll(topicConfig);
|
||
+ } else {
|
||
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
||
+ }
|
||
}
|
||
return this.topicConfigTable.get(topicConfig.getTopicName());
|
||
}
|
||
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||
index a4d82d1c5..02c692e2b 100644
|
||
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||
@@ -386,6 +386,12 @@ public class BrokerConfig extends BrokerIdentity {
|
||
*/
|
||
private boolean popResponseReturnActualRetryTopic = false;
|
||
|
||
+ /**
|
||
+ * If both the deleteTopicWithBrokerRegistration flag in the NameServer configuration and this flag are set to true,
|
||
+     * it guarantees the eventual consistency of data between the broker and the nameserver during topic deletion.
|
||
+ */
|
||
+ private boolean enableSingleTopicRegister = false;
|
||
+
|
||
public long getMaxPopPollingSize() {
|
||
return maxPopPollingSize;
|
||
}
|
||
@@ -1689,4 +1695,12 @@ public class BrokerConfig extends BrokerIdentity {
|
||
public void setPopResponseReturnActualRetryTopic(boolean popResponseReturnActualRetryTopic) {
|
||
this.popResponseReturnActualRetryTopic = popResponseReturnActualRetryTopic;
|
||
}
|
||
+
|
||
+ public boolean isEnableSingleTopicRegister() {
|
||
+ return enableSingleTopicRegister;
|
||
+ }
|
||
+
|
||
+ public void setEnableSingleTopicRegister(boolean enableSingleTopicRegister) {
|
||
+ this.enableSingleTopicRegister = enableSingleTopicRegister;
|
||
+ }
|
||
}
|
||
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
|
||
index 700febfe2..5b8a6dedb 100644
|
||
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
|
||
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
|
||
@@ -82,6 +82,15 @@ public class NamesrvConfig {
|
||
|
||
private int waitSecondsForService = 45;
|
||
|
||
+ /**
|
||
+     * If this flag is enabled, topics that don't exist in the broker registration payload will be deleted from the name server.
|
||
+ *
|
||
+ * WARNING:
|
||
+     * 1. Enable this flag together with "enableSingleTopicRegister" in the broker config to avoid losing topic route info unexpectedly.
|
||
+ * 2. This flag does not support static topic currently.
|
||
+ */
|
||
+ private boolean deleteTopicWithBrokerRegistration = false;
|
||
+
|
||
public boolean isOrderMessageEnable() {
|
||
return orderMessageEnable;
|
||
}
|
||
@@ -241,4 +250,12 @@ public class NamesrvConfig {
|
||
public void setWaitSecondsForService(int waitSecondsForService) {
|
||
this.waitSecondsForService = waitSecondsForService;
|
||
}
|
||
+
|
||
+ public boolean isDeleteTopicWithBrokerRegistration() {
|
||
+ return deleteTopicWithBrokerRegistration;
|
||
+ }
|
||
+
|
||
+ public void setDeleteTopicWithBrokerRegistration(boolean deleteTopicWithBrokerRegistration) {
|
||
+ this.deleteTopicWithBrokerRegistration = deleteTopicWithBrokerRegistration;
|
||
+ }
|
||
}
|
||
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
|
||
index ac27d76ce..0055a1cc8 100644
|
||
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
|
||
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
|
||
@@ -121,9 +121,18 @@ public class RouteInfoManager {
|
||
if (queueDatas == null || queueDatas.isEmpty()) {
|
||
return;
|
||
}
|
||
+
|
||
try {
|
||
this.lock.writeLock().lockInterruptibly();
|
||
if (this.topicQueueTable.containsKey(topic)) {
|
||
+ Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
|
||
+ for (QueueData queueData : queueDatas) {
|
||
+ if (!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
|
||
+ log.warn("Register topic contains illegal broker, {}, {}", topic, queueData);
|
||
+ return;
|
||
+ }
|
||
+ queueDataMap.put(queueData.getBrokerName(), queueData);
|
||
+ }
|
||
log.info("Topic route already exist.{}, {}", topic, this.topicQueueTable.get(topic));
|
||
} else {
|
||
// check and construct queue data map
|
||
@@ -299,7 +308,32 @@ public class RouteInfoManager {
|
||
|
||
ConcurrentMap<String, TopicConfig> tcTable =
|
||
topicConfigWrapper.getTopicConfigTable();
|
||
+
|
||
if (tcTable != null) {
|
||
+
|
||
+ TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
|
||
+ Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
|
||
+
|
||
+ // Delete the topics that don't exist in tcTable from the current broker
|
||
+ // Static topic is not supported currently
|
||
+ if (namesrvConfig.isDeleteTopicWithBrokerRegistration() && topicQueueMappingInfoMap.isEmpty()) {
|
||
+ final Set<String> oldTopicSet = topicSetOfBrokerName(brokerName);
|
||
+ final Set<String> newTopicSet = tcTable.keySet();
|
||
+ final Sets.SetView<String> toDeleteTopics = Sets.difference(oldTopicSet, newTopicSet);
|
||
+ for (final String toDeleteTopic : toDeleteTopics) {
|
||
+ Map<String, QueueData> queueDataMap = topicQueueTable.get(toDeleteTopic);
|
||
+ final QueueData removedQD = queueDataMap.remove(brokerName);
|
||
+ if (removedQD != null) {
|
||
+ log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, toDeleteTopic, removedQD);
|
||
+ }
|
||
+
|
||
+ if (queueDataMap.isEmpty()) {
|
||
+ log.info("deleteTopic, remove the topic all queue {}", toDeleteTopic);
|
||
+ topicQueueTable.remove(toDeleteTopic);
|
||
+ }
|
||
+ }
|
||
+ }
|
||
+
|
||
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
|
||
if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
|
||
topicConfigWrapper.getDataVersion(), brokerName,
|
||
@@ -312,19 +346,17 @@ public class RouteInfoManager {
|
||
this.createAndUpdateQueueData(brokerName, topicConfig);
|
||
}
|
||
}
|
||
- }
|
||
|
||
- if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
|
||
- TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
|
||
- Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
|
||
- //the topicQueueMappingInfoMap should never be null, but can be empty
|
||
- for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
|
||
- if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
|
||
- topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
|
||
+ if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
|
||
+ //the topicQueueMappingInfoMap should never be null, but can be empty
|
||
+ for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
|
||
+ if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
|
||
+ topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
|
||
+ }
|
||
+ //Note asset brokerName equal entry.getValue().getBname()
|
||
+ //here use the mappingDetail.bname
|
||
+ topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
|
||
}
|
||
- //Note asset brokerName equal entry.getValue().getBname()
|
||
- //here use the mappingDetail.bname
|
||
- topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
|
||
}
|
||
}
|
||
}
|
||
@@ -374,6 +406,16 @@ public class RouteInfoManager {
|
||
return result;
|
||
}
|
||
|
||
+ private Set<String> topicSetOfBrokerName(final String brokerName) {
|
||
+ Set<String> topicOfBroker = new HashSet<>();
|
||
+ for (final Entry<String, Map<String, QueueData>> entry : this.topicQueueTable.entrySet()) {
|
||
+ if (entry.getValue().containsKey(brokerName)) {
|
||
+ topicOfBroker.add(entry.getKey());
|
||
+ }
|
||
+ }
|
||
+ return topicOfBroker;
|
||
+ }
|
||
+
|
||
public BrokerMemberGroup getBrokerMemberGroup(String clusterName, String brokerName) {
|
||
BrokerMemberGroup groupMember = new BrokerMemberGroup(clusterName, brokerName);
|
||
try {
|
||
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
|
||
index b53519e5f..6002d1f5a 100644
|
||
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
|
||
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
|
||
@@ -22,6 +22,7 @@ import io.netty.channel.Channel;
|
||
import java.time.Duration;
|
||
import java.util.ArrayList;
|
||
import java.util.Arrays;
|
||
+import java.util.Collections;
|
||
import java.util.List;
|
||
import java.util.concurrent.ConcurrentHashMap;
|
||
import java.util.concurrent.ConcurrentMap;
|
||
@@ -37,6 +38,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList;
|
||
import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
|
||
import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
|
||
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
|
||
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
|
||
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
|
||
import org.junit.After;
|
||
import org.junit.Before;
|
||
@@ -624,6 +626,92 @@ public class RouteInfoManagerNewTest {
|
||
.containsValues(BrokerBasicInfo.defaultBroker().brokerAddr, BrokerBasicInfo.slaveBroker().brokerAddr);
|
||
}
|
||
|
||
+ @Test
|
||
+ public void keepTopicWithBrokerRegistration() {
|
||
+ RegisterBrokerResult masterResult = registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic", "TestTopic1");
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
|
||
+
|
||
+ masterResult = registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic1");
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
|
||
+ }
|
||
+
|
||
+ @Test
|
||
+ public void deleteTopicWithBrokerRegistration() {
|
||
+ config.setDeleteTopicWithBrokerRegistration(true);
|
||
+ registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic", "TestTopic1");
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
|
||
+
|
||
+ registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic1");
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
|
||
+ }
|
||
+
|
||
+ @Test
|
||
+ public void deleteTopicWithBrokerRegistration2() {
|
||
+        // Register two brokers, then remove the topic from each broker one at a time
|
||
+ config.setDeleteTopicWithBrokerRegistration(true);
|
||
+ final BrokerBasicInfo master1 = BrokerBasicInfo.defaultBroker();
|
||
+ final BrokerBasicInfo master2 = BrokerBasicInfo.defaultBroker().name(DEFAULT_BROKER + 1).addr(DEFAULT_ADDR + 9);
|
||
+
|
||
+ registerBrokerWithNormalTopic(master1, "TestTopic", "TestTopic1");
|
||
+ registerBrokerWithNormalTopic(master2, "TestTopic", "TestTopic1");
|
||
+
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas()).hasSize(2);
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
|
||
+
|
||
+
|
||
+ registerBrokerWithNormalTopic(master1, "TestTopic1");
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas()).hasSize(1);
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas().get(0).getBrokerName())
|
||
+ .isEqualTo(master2.brokerName);
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
|
||
+
|
||
+ registerBrokerWithNormalTopic(master2, "TestTopic1");
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
|
||
+ }
|
||
+
|
||
+ @Test
|
||
+ public void registerSingleTopicWithBrokerRegistration() {
|
||
+ config.setDeleteTopicWithBrokerRegistration(true);
|
||
+ final BrokerBasicInfo master1 = BrokerBasicInfo.defaultBroker();
|
||
+
|
||
+ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic");
|
||
+
|
||
+        // Single topic registration fails because no broker connection exists
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
|
||
+
|
||
+ // Register broker with TestTopic first and then register single topic TestTopic1
|
||
+ registerBrokerWithNormalTopic(master1, "TestTopic");
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
|
||
+
|
||
+ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic1");
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
|
||
+
|
||
+ // Register the two topics to keep the route info
|
||
+ registerBrokerWithNormalTopic(master1, "TestTopic", "TestTopic1");
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
|
||
+
|
||
+ // Cancel the TestTopic1 with broker registration
|
||
+ registerBrokerWithNormalTopic(master1, "TestTopic");
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull();
|
||
+
|
||
+ // Add TestTopic1 and cancel all the topics with broker un-registration
|
||
+ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic1");
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
|
||
+
|
||
+ routeInfoManager.unregisterBroker(master1.clusterName, master1.brokerAddr, master1.brokerName, 0);
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
|
||
+ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull();
|
||
+
|
||
+
|
||
+ }
|
||
+
|
||
private RegisterBrokerResult registerBrokerWithNormalTopic(BrokerBasicInfo brokerInfo, String... topics) {
|
||
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
|
||
TopicConfig baseTopic = new TopicConfig("baseTopic");
|
||
@@ -711,6 +799,17 @@ public class RouteInfoManagerNewTest {
|
||
return registerBrokerResult;
|
||
}
|
||
|
||
+ private void registerSingleTopicWithBrokerName(String brokerName, String... topics) {
|
||
+ for (final String topic : topics) {
|
||
+ QueueData queueData = new QueueData();
|
||
+ queueData.setBrokerName(brokerName);
|
||
+ queueData.setReadQueueNums(8);
|
||
+ queueData.setWriteQueueNums(8);
|
||
+ queueData.setPerm(6);
|
||
+ routeInfoManager.registerTopic(topic, Collections.singletonList(queueData));
|
||
+ }
|
||
+ }
|
||
+
|
||
static class BrokerBasicInfo {
|
||
String clusterName;
|
||
String brokerName;
|
||
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
|
||
index 11b00a72c..d3d5de9e2 100644
|
||
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
|
||
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
|
||
@@ -17,6 +17,7 @@
|
||
|
||
package org.apache.rocketmq.test.util;
|
||
|
||
+import java.util.Collections;
|
||
import java.util.HashMap;
|
||
import java.util.Map;
|
||
import java.util.Set;
|
||
@@ -38,6 +39,7 @@ import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
|
||
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
|
||
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
|
||
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
|
||
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
|
||
import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
|
||
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingOne;
|
||
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
|
||
@@ -319,4 +321,39 @@ public class MQAdminTestUtils {
|
||
}
|
||
return consumeStats;
|
||
}
|
||
+
|
||
+ /**
|
||
+     * Delete the topic from the broker only, without proactively cleaning its route info from the name server
|
||
+ *
|
||
+ * @param nameSrvAddr the namesrv addr to connect
|
||
+ * @param brokerName the specific broker
|
||
+ * @param topic the specific topic to delete
|
||
+ */
|
||
+ public static void deleteTopicFromBrokerOnly(String nameSrvAddr, String brokerName, String topic) {
|
||
+ DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
|
||
+ mqAdminExt.setNamesrvAddr(nameSrvAddr);
|
||
+
|
||
+ try {
|
||
+ mqAdminExt.start();
|
||
+ String brokerAddr = CommandUtil.fetchMasterAddrByBrokerName(mqAdminExt, brokerName);
|
||
+ mqAdminExt.deleteTopicInBroker(Collections.singleton(brokerAddr), topic);
|
||
+ } catch (Exception ignored) {
|
||
+ } finally {
|
||
+ mqAdminExt.shutdown();
|
||
+ }
|
||
+ }
|
||
+
|
||
+ public static TopicRouteData examineTopicRouteInfo(String nameSrvAddr, String topicName) {
|
||
+ DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
|
||
+ mqAdminExt.setNamesrvAddr(nameSrvAddr);
|
||
+ TopicRouteData route = null;
|
||
+ try {
|
||
+ mqAdminExt.start();
|
||
+ route = mqAdminExt.examineTopicRouteInfo(topicName);
|
||
+ } catch (Exception ignored) {
|
||
+ } finally {
|
||
+ mqAdminExt.shutdown();
|
||
+ }
|
||
+ return route;
|
||
+ }
|
||
}
|
||
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java b/test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
|
||
similarity index 99%
|
||
rename from test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
|
||
rename to test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
|
||
index 9e142eb61..43fefd616 100644
|
||
--- a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
|
||
+++ b/test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
|
||
@@ -14,7 +14,7 @@
|
||
* See the License for the specific language governing permissions and
|
||
* limitations under the License.
|
||
*/
|
||
-package org.apache.rocketmq.test.base.dledger;
|
||
+package org.apache.rocketmq.test.dledger;
|
||
|
||
import java.util.UUID;
|
||
import org.apache.rocketmq.broker.BrokerController;
|
||
diff --git a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
|
||
new file mode 100644
|
||
index 000000000..7e3c7b871
|
||
--- /dev/null
|
||
+++ b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
|
||
@@ -0,0 +1,114 @@
|
||
+/*
|
||
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
||
+ * contributor license agreements. See the NOTICE file distributed with
|
||
+ * this work for additional information regarding copyright ownership.
|
||
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
||
+ * (the "License"); you may not use this file except in compliance with
|
||
+ * the License. You may obtain a copy of the License at
|
||
+ *
|
||
+ * http://www.apache.org/licenses/LICENSE-2.0
|
||
+ *
|
||
+ * Unless required by applicable law or agreed to in writing, software
|
||
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
||
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
+ * See the License for the specific language governing permissions and
|
||
+ * limitations under the License.
|
||
+ */
|
||
+
|
||
+package org.apache.rocketmq.test.route;
|
||
+
|
||
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
|
||
+import org.apache.rocketmq.test.base.BaseConf;
|
||
+import org.apache.rocketmq.test.util.MQAdminTestUtils;
|
||
+import org.junit.Test;
|
||
+
|
||
+import static org.assertj.core.api.Assertions.assertThat;
|
||
+
|
||
+public class CreateAndUpdateTopicIT extends BaseConf {
|
||
+
|
||
+ @Test
|
||
+ public void testCreateOrUpdateTopic_EnableSingleTopicRegistration() {
|
||
+ String topic = "test-topic-without-broker-registration";
|
||
+ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true);
|
||
+ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true);
|
||
+ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true);
|
||
+
|
||
+ final boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, topic, 8, null);
|
||
+ assertThat(createResult).isTrue();
|
||
+
|
||
+ TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, topic);
|
||
+ assertThat(route.getBrokerDatas()).hasSize(3);
|
||
+ assertThat(route.getQueueDatas()).hasSize(3);
|
||
+
|
||
+ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false);
|
||
+ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false);
|
||
+ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
|
||
+
|
||
+ }
|
||
+
|
||
+ @Test
|
||
+ public void testDeleteTopicFromNameSrvWithBrokerRegistration() {
|
||
+ namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true);
|
||
+ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true);
|
||
+ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true);
|
||
+ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true);
|
||
+
|
||
+ String testTopic1 = "test-topic-keep-route";
|
||
+ String testTopic2 = "test-topic-delete-route";
|
||
+
|
||
+ boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic1, 8, null);
|
||
+ assertThat(createResult).isTrue();
|
||
+
|
||
+
|
||
+ createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic2, 8, null);
|
||
+ assertThat(createResult).isTrue();
|
||
+
|
||
+
|
||
+ TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2);
|
||
+ assertThat(route.getBrokerDatas()).hasSize(3);
|
||
+
|
||
+ MQAdminTestUtils.deleteTopicFromBrokerOnly(NAMESRV_ADDR, BROKER1_NAME, testTopic2);
|
||
+
|
||
+ // Deletion is lazy, trigger broker registration
|
||
+ brokerController1.registerBrokerAll(false, false, true);
|
||
+
|
||
+ // The route info of testTopic2 will be removed from broker1 after the registration
|
||
+ route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2);
|
||
+ assertThat(route.getBrokerDatas()).hasSize(2);
|
||
+ assertThat(route.getQueueDatas().get(0).getBrokerName()).isEqualTo(BROKER2_NAME);
|
||
+ assertThat(route.getQueueDatas().get(1).getBrokerName()).isEqualTo(BROKER3_NAME);
|
||
+
|
||
+ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false);
|
||
+ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false);
|
||
+ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
|
||
+ namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
|
||
+ }
|
||
+
|
||
+ @Test
|
||
+ public void testStaticTopicNotAffected() throws Exception {
|
||
+ namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true);
|
||
+ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true);
|
||
+ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true);
|
||
+ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true);
|
||
+
|
||
+ String testTopic = "test-topic-not-affected";
|
||
+ String testStaticTopic = "test-static-topic";
|
||
+
|
||
+ boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic, 8, null);
|
||
+ assertThat(createResult).isTrue();
|
||
+
|
||
+ TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic);
|
||
+ assertThat(route.getBrokerDatas()).hasSize(3);
|
||
+ assertThat(route.getQueueDatas()).hasSize(3);
|
||
+
|
||
+ MQAdminTestUtils.createStaticTopicWithCommand(testStaticTopic, 10, null, CLUSTER_NAME, NAMESRV_ADDR);
|
||
+
|
||
+ assertThat(route.getBrokerDatas()).hasSize(3);
|
||
+ assertThat(route.getQueueDatas()).hasSize(3);
|
||
+
|
||
+ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false);
|
||
+ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false);
|
||
+ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
|
||
+ namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
|
||
+ }
|
||
+}
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 32eb1d55570af81641a4a40d96ff5554329b93cb Mon Sep 17 00:00:00 2001
|
||
From: gaoyf <gaoyf@users.noreply.github.com>
|
||
Date: Tue, 25 Jul 2023 15:26:20 +0800
|
||
Subject: [PATCH 05/10] [ISSUE #7068] Fix failed to create syncer topic when
|
||
the proxy was just started (#7076)
|
||
|
||
---
|
||
.../apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java | 1 +
|
||
1 file changed, 1 insertion(+)
|
||
|
||
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
|
||
index f7d9b11ba..c68859b28 100644
|
||
--- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
|
||
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
|
||
@@ -104,6 +104,7 @@ public class MQClientAPIFactory implements StartAndShutdown {
|
||
rpcHook);
|
||
|
||
if (!mqClientAPIExt.updateNameServerAddressList()) {
|
||
+ mqClientAPIExt.fetchNameServerAddr();
|
||
this.scheduledExecutorService.scheduleAtFixedRate(
|
||
mqClientAPIExt::fetchNameServerAddr,
|
||
Duration.ofSeconds(10).toMillis(),
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From d79737788078707168c0258c4af0d800de32c137 Mon Sep 17 00:00:00 2001
|
||
From: Vincent Lee <cool8511@gmail.com>
|
||
Date: Thu, 27 Jul 2023 10:51:51 +0800
|
||
Subject: [PATCH 06/10] [ISSUE #7056] Avoid close success channel if invokeSync
|
||
most time cost on get connection for channel (#7057)
|
||
|
||
* fix: avoid close success channel if invokeSync most time cost on get channel
|
||
|
||
Change-Id: I29741cf55ac6333bfa30fef755357b78a22b1325
|
||
|
||
* fix: ci style
|
||
|
||
Change-Id: I8c9b86e9cb6f1463bf213e64c9b8c139afa794c8
|
||
---
|
||
.../rocketmq/remoting/netty/NettyRemotingClient.java | 11 ++++++++---
|
||
1 file changed, 8 insertions(+), 3 deletions(-)
|
||
|
||
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
|
||
index 9715b918a..8491f4354 100644
|
||
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
|
||
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
|
||
@@ -88,6 +88,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
|
||
|
||
private static final long LOCK_TIMEOUT_MILLIS = 3000;
|
||
+ private static final long MIN_CLOSE_TIMEOUT_MILLIS = 100;
|
||
|
||
private final NettyClientConfig nettyClientConfig;
|
||
private final Bootstrap bootstrap = new Bootstrap();
|
||
@@ -524,13 +525,15 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
|
||
final Channel channel = this.getAndCreateChannel(addr);
|
||
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
|
||
if (channel != null && channel.isActive()) {
|
||
+ long left = timeoutMillis;
|
||
try {
|
||
doBeforeRpcHooks(channelRemoteAddr, request);
|
||
long costTime = System.currentTimeMillis() - beginStartTime;
|
||
- if (timeoutMillis < costTime) {
|
||
+ left -= costTime;
|
||
+ if (left <= 0) {
|
||
throw new RemotingTimeoutException("invokeSync call the addr[" + channelRemoteAddr + "] timeout");
|
||
}
|
||
- RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
|
||
+ RemotingCommand response = this.invokeSyncImpl(channel, request, left);
|
||
doAfterRpcHooks(channelRemoteAddr, request, response);
|
||
this.updateChannelLastResponseTime(addr);
|
||
return response;
|
||
@@ -539,7 +542,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
|
||
this.closeChannel(addr, channel);
|
||
throw e;
|
||
} catch (RemotingTimeoutException e) {
|
||
- if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
|
||
+                // Avoid closing a healthy channel when the remaining timeout is small: most of the budget may have been spent acquiring the channel, leaving little time for the read
|
||
+ boolean shouldClose = left > MIN_CLOSE_TIMEOUT_MILLIS || left > timeoutMillis / 4;
|
||
+ if (nettyClientConfig.isClientCloseSocketIfTimeout() && shouldClose) {
|
||
this.closeChannel(addr, channel);
|
||
LOGGER.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, channelRemoteAddr);
|
||
}
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From d0a69be563785ca815dc31ef1aab4c1bc5588c01 Mon Sep 17 00:00:00 2001
|
||
From: zd46319 <zd46319@163.com>
|
||
Date: Thu, 27 Jul 2023 16:56:41 +0800
|
||
Subject: [PATCH 07/10] [ISSUE #6810] Fix the bug of mistakenly deleting data
|
||
in clientChannelTable when the channel expire (#7073)
|
||
|
||
---
|
||
.../broker/client/ProducerManager.java | 5 ++-
|
||
.../broker/client/ProducerManagerTest.java | 34 +++++++++++++++++++
|
||
2 files changed, 38 insertions(+), 1 deletion(-)
|
||
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
|
||
index 52d67bf28..f9fe1193e 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
|
||
@@ -112,7 +112,10 @@ public class ProducerManager {
|
||
long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
|
||
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
|
||
it.remove();
|
||
- clientChannelTable.remove(info.getClientId());
|
||
+ Channel channelInClientTable = clientChannelTable.get(info.getClientId());
|
||
+ if (channelInClientTable != null && channelInClientTable.equals(info.getChannel())) {
|
||
+ clientChannelTable.remove(info.getClientId());
|
||
+ }
|
||
log.warn(
|
||
"ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
|
||
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
|
||
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
|
||
index dac5468c8..3d6091e02 100644
|
||
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
|
||
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
|
||
@@ -27,6 +27,7 @@ import org.junit.Before;
|
||
import org.junit.Test;
|
||
import org.junit.runner.RunWith;
|
||
import org.mockito.Mock;
|
||
+import org.mockito.Mockito;
|
||
import org.mockito.junit.MockitoJUnitRunner;
|
||
|
||
import static org.assertj.core.api.Assertions.assertThat;
|
||
@@ -79,6 +80,39 @@ public class ProducerManagerTest {
|
||
assertThat(producerManager.findChannel("clientId")).isNull();
|
||
}
|
||
|
||
+ @Test
|
||
+ public void scanNotActiveChannelWithSameClientId() throws Exception {
|
||
+ producerManager.registerProducer(group, clientInfo);
|
||
+ Channel channel1 = Mockito.mock(Channel.class);
|
||
+ ClientChannelInfo clientInfo1 = new ClientChannelInfo(channel1, clientInfo.getClientId(), LanguageCode.JAVA, 0);
|
||
+ producerManager.registerProducer(group, clientInfo1);
|
||
+ AtomicReference<String> groupRef = new AtomicReference<>();
|
||
+ AtomicReference<ClientChannelInfo> clientChannelInfoRef = new AtomicReference<>();
|
||
+ producerManager.appendProducerChangeListener((event, group, clientChannelInfo) -> {
|
||
+ switch (event) {
|
||
+ case GROUP_UNREGISTER:
|
||
+ groupRef.set(group);
|
||
+ break;
|
||
+ case CLIENT_UNREGISTER:
|
||
+ clientChannelInfoRef.set(clientChannelInfo);
|
||
+ break;
|
||
+ default:
|
||
+ break;
|
||
+ }
|
||
+ });
|
||
+ assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
|
||
+ assertThat(producerManager.getGroupChannelTable().get(group).get(channel1)).isNotNull();
|
||
+ assertThat(producerManager.findChannel("clientId")).isNotNull();
|
||
+ Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT");
|
||
+ field.setAccessible(true);
|
||
+ long channelExpiredTimeout = field.getLong(producerManager);
|
||
+ clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - channelExpiredTimeout - 10);
|
||
+ when(channel.close()).thenReturn(mock(ChannelFuture.class));
|
||
+ producerManager.scanNotActiveChannel();
|
||
+ assertThat(producerManager.getGroupChannelTable().get(group).get(channel1)).isNotNull();
|
||
+ assertThat(producerManager.findChannel("clientId")).isNotNull();
|
||
+ }
|
||
+
|
||
@Test
|
||
public void doChannelCloseEvent() throws Exception {
|
||
producerManager.registerProducer(group, clientInfo);
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From d429bd72dfae0901f4325c8e9c6ce631286e40d4 Mon Sep 17 00:00:00 2001
|
||
From: cnScarb <jjhfen00@163.com>
|
||
Date: Fri, 28 Jul 2023 09:46:39 +0800
|
||
Subject: [PATCH 08/10] [ISSUE #7039] Fix retry message filter when subtype is
|
||
TAG (#7040)
|
||
|
||
---
|
||
.../broker/filter/ExpressionForRetryMessageFilter.java | 6 +++---
|
||
1 file changed, 3 insertions(+), 3 deletions(-)
|
||
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
|
||
index d2d1087ef..bc01b21cb 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
|
||
@@ -45,12 +45,12 @@ public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter {
|
||
return true;
|
||
}
|
||
|
||
- boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
|
||
-
|
||
- if (!isRetryTopic && ExpressionType.isTagType(subscriptionData.getExpressionType())) {
|
||
+ if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
|
||
return true;
|
||
}
|
||
|
||
+ boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
|
||
+
|
||
ConsumerFilterData realFilterData = this.consumerFilterData;
|
||
Map<String, String> tempProperties = properties;
|
||
boolean decoded = false;
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 8baa51e85e569429293720b2ba7fcaee745abecc Mon Sep 17 00:00:00 2001
|
||
From: Zack_Aayush <60972989+AayushSaini101@users.noreply.github.com>
|
||
Date: Sun, 30 Jul 2023 09:02:02 +0530
|
||
Subject: [PATCH 09/10] [ISSUE #7091] Update the cd command in README (#7096)
|
||
|
||
* Update the cd command
|
||
|
||
* Removed extra space
|
||
|
||
---------
|
||
|
||
Co-authored-by: Aayush <aaayush@redhat.com>
|
||
---
|
||
README.md | 2 +-
|
||
1 file changed, 1 insertion(+), 1 deletion(-)
|
||
|
||
diff --git a/README.md b/README.md
|
||
index 393ef88e6..56d253ce1 100644
|
||
--- a/README.md
|
||
+++ b/README.md
|
||
@@ -63,7 +63,7 @@ $ unzip rocketmq-all-5.1.3-bin-release.zip
|
||
|
||
Prepare a terminal and change to the extracted `bin` directory:
|
||
```shell
|
||
-$ cd rocketmq-all-5.1.3/bin
|
||
+$ cd rocketmq-all-5.1.3-bin-release/bin
|
||
```
|
||
|
||
**1) Start NameServer**
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 8bcc94829d2ef2597a8eeab3c6b7099432a0bea1 Mon Sep 17 00:00:00 2001
|
||
From: weihubeats <weihubeats@163.com>
|
||
Date: Tue, 1 Aug 2023 10:15:07 +0800
|
||
Subject: [PATCH 10/10] [ISSUE #7077] Schedule CQ offset invalid. offset=77,
|
||
cqMinOffset=0, cqMaxOffset=74, queueId=1 (#7084)
|
||
|
||
* Adding null does not update
|
||
|
||
* delete slave put correctDelayOffset
|
||
|
||
* Remove duplicate delayOffset file loading
|
||
|
||
* add loadWhenSyncDelayOffset
|
||
|
||
* add method
|
||
|
||
* add method
|
||
---
|
||
.../rocketmq/broker/schedule/ScheduleMessageService.java | 6 ++++++
|
||
.../org/apache/rocketmq/broker/slave/SlaveSynchronize.java | 2 +-
|
||
2 files changed, 7 insertions(+), 1 deletion(-)
|
||
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
|
||
index 2a4ace098..26f09dcd0 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
|
||
@@ -223,6 +223,12 @@ public class ScheduleMessageService extends ConfigManager {
|
||
result = result && this.correctDelayOffset();
|
||
return result;
|
||
}
|
||
+
|
||
+ public boolean loadWhenSyncDelayOffset() {
|
||
+ boolean result = super.load();
|
||
+ result = result && this.parseDelayLevel();
|
||
+ return result;
|
||
+ }
|
||
|
||
public boolean correctDelayOffset() {
|
||
try {
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
|
||
index b9de5173b..53cdecdf8 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
|
||
@@ -152,7 +152,7 @@ public class SlaveSynchronize {
|
||
.getMessageStoreConfig().getStorePathRootDir());
|
||
try {
|
||
MixAll.string2File(delayOffset, fileName);
|
||
- this.brokerController.getScheduleMessageService().load();
|
||
+ this.brokerController.getScheduleMessageService().loadWhenSyncDelayOffset();
|
||
} catch (IOException e) {
|
||
LOGGER.error("Persist file Exception, {}", fileName, e);
|
||
}
|
||
--
|
||
2.32.0.windows.2
|
||
|