From 0588847e899975fc6a9b61ccafdae25de4824bab Mon Sep 17 00:00:00 2001 From: shizhili Date: Fri, 8 Dec 2023 16:54:02 +0800 Subject: [PATCH] backport Introduce a new event NettyEventType.ACTIVE --- ...uce-new-event-NettyEventType.ACTIVEr.patch | 212 ++++++++++++++++++ rocketmq.spec | 6 +- 2 files changed, 217 insertions(+), 1 deletion(-) create mode 100644 patch029-backport-Introduce-new-event-NettyEventType.ACTIVEr.patch diff --git a/patch029-backport-Introduce-new-event-NettyEventType.ACTIVEr.patch b/patch029-backport-Introduce-new-event-NettyEventType.ACTIVEr.patch new file mode 100644 index 0000000..b352d47 --- /dev/null +++ b/patch029-backport-Introduce-new-event-NettyEventType.ACTIVEr.patch @@ -0,0 +1,212 @@ +From 2043dd50341e0a4a2f254d72aa3109f4dfc97aac Mon Sep 17 00:00:00 2001 +From: Zhouxiang Zhan +Date: Tue, 24 Oct 2023 10:29:43 +0800 +Subject: [PATCH] [ISSUE #7493] Introduce a new event NettyEventType.ACTIVE + (#7494) + +* [ISSUE #7493] Introduce a new event NettyEventType.ACTIVE for ChannelEventListener + +* introduce a new event NettyEventType.ACTIVE, + +* implement channelActive interface for NettyRemotingClient#NettyConnectManageHandler + +* add onChannelActive for ChannelEventListener interface. + +* Move send heartbeat to onChannelActive +--- + .../client/ClientHousekeepingService.java | 5 +++++ + .../client/impl/factory/MQClientInstance.java | 20 +++++++++++-------- + .../ContainerClientHouseKeepingService.java | 11 +++++++++- + .../controller/BrokerHousekeepingService.java | 5 +++++ + .../routeinfo/BrokerHousekeepingService.java | 5 +++++ + .../remoting/ClientHousekeepingService.java | 4 ++++ + .../remoting/ChannelEventListener.java | 2 ++ + .../remoting/netty/NettyEventType.java | 3 ++- + .../remoting/netty/NettyRemotingAbstract.java | 3 +++ + .../remoting/netty/NettyRemotingClient.java | 11 ++++++++++ + 10 files changed, 59 insertions(+), 10 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java +index cbb81f632..7878d0eec 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java +@@ -87,4 +87,9 @@ public class ClientHousekeepingService implements ChannelEventListener { + this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); + this.brokerController.getBrokerStatsManager().incChannelIdleNum(); + } ++ ++ @Override ++ public void onChannelActive(String remoteAddr, Channel channel) { ++ ++ } + } +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +index 09534a176..ba72a6dce 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +@@ -159,14 +159,6 @@ public class MQClientInstance { + private final ConcurrentMap> brokerAddrTable = MQClientInstance.this.brokerAddrTable; + @Override + public void onChannelConnect(String remoteAddr, Channel channel) { +- for (Map.Entry> addressEntry : brokerAddrTable.entrySet()) { +- for (String address : addressEntry.getValue().values()) { +- if (address.equals(remoteAddr)) { +- sendHeartbeatToAllBrokerWithLockV2(false); +- break; +- } +- } +- } + } + + @Override +@@ -180,6 +172,18 @@ public class MQClientInstance { + @Override + public void onChannelIdle(String remoteAddr, Channel channel) { + } ++ ++ @Override ++ public void onChannelActive(String remoteAddr, Channel channel) { ++ for (Map.Entry> addressEntry : brokerAddrTable.entrySet()) { ++ for (String address : addressEntry.getValue().values()) { ++ if (address.equals(remoteAddr)) { ++ sendHeartbeatToAllBrokerWithLockV2(false); ++ break; ++ } ++ } ++ } ++ } + }; + } else { + channelEventListener = null; +diff --git a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java +index 8bf4b4a33..90c912247 100644 +--- a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java ++++ b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java +@@ -49,6 +49,11 @@ public class ContainerClientHouseKeepingService implements ChannelEventListener + onChannelOperation(CallbackCode.IDLE, remoteAddr, channel); + } + ++ @Override ++ public void onChannelActive(String remoteAddr, Channel channel) { ++ onChannelOperation(CallbackCode.ACTIVE, remoteAddr, channel); ++ } ++ + private void onChannelOperation(CallbackCode callbackCode, String remoteAddr, Channel channel) { + Collection masterBrokers = this.brokerContainer.getMasterBrokers(); + Collection slaveBrokers = this.brokerContainer.getSlaveBrokers(); +@@ -103,6 +108,10 @@ public class ContainerClientHouseKeepingService implements ChannelEventListener + /** + * onChannelIdle + */ +- IDLE ++ IDLE, ++ /** ++ * onChannelActive ++ */ ++ ACTIVE + } + } +diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java +index 652a9eeb0..d22d0b606 100644 +--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java ++++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java +@@ -48,4 +48,9 @@ public class BrokerHousekeepingService implements ChannelEventListener { + public void onChannelIdle(String remoteAddr, Channel channel) { + this.controllerManager.getHeartbeatManager().onBrokerChannelClose(channel); + } ++ ++ @Override ++ public void onChannelActive(String remoteAddr, Channel channel) { ++ ++ } + } +diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java +index 80d993992..b527429f7 100644 +--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java ++++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java +@@ -46,4 +46,9 @@ public class BrokerHousekeepingService implements ChannelEventListener { + public void onChannelIdle(String remoteAddr, Channel channel) { + this.namesrvController.getRouteInfoManager().onChannelDestroy(channel); + } ++ ++ @Override ++ public void onChannelActive(String remoteAddr, Channel channel) { ++ ++ } + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java +index e213ae855..74eb6f2db 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java +@@ -49,5 +49,9 @@ public class ClientHousekeepingService implements ChannelEventListener { + this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel); + } + ++ @Override ++ public void onChannelActive(String remoteAddr, Channel channel) { ++ ++ } + } + +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java +index c99133b3a..6802e69b9 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java +@@ -26,4 +26,6 @@ public interface ChannelEventListener { + void onChannelException(final String remoteAddr, final Channel channel); + + void onChannelIdle(final String remoteAddr, final Channel channel); ++ ++ void onChannelActive(final String remoteAddr, final Channel channel); + } +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java +index 9ac944aad..4bc9d57dd 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java +@@ -20,5 +20,6 @@ public enum NettyEventType { + CONNECT, + CLOSE, + IDLE, +- EXCEPTION ++ EXCEPTION, ++ ACTIVE + } +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +index 07ace28ea..62a8a7290 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +@@ -701,6 +701,9 @@ public abstract class NettyRemotingAbstract { + case EXCEPTION: + listener.onChannelException(event.getRemoteAddr(), event.getChannel()); + break; ++ case ACTIVE: ++ listener.onChannelActive(event.getRemoteAddr(), event.getChannel()); ++ break; + default: + break; + +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +index 340daee67..9f1519130 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +@@ -1106,6 +1106,17 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti + } + } + ++ @Override ++ public void channelActive(ChannelHandlerContext ctx) throws Exception { ++ final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); ++ LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}", remoteAddress); ++ super.channelActive(ctx); ++ ++ if (NettyRemotingClient.this.channelEventListener != null) { ++ NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.ACTIVE, remoteAddress, ctx.channel())); ++ } ++ } ++ + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); +-- +2.32.0.windows.2 + diff --git a/rocketmq.spec b/rocketmq.spec index 5f7825a..4e6d934 100644 --- a/rocketmq.spec +++ b/rocketmq.spec @@ -5,7 +5,7 @@ Summary: Cloud-Native, Distributed Messaging and Streaming Name: rocketmq Version: 5.1.5 -Release: 29 +Release: 30 License: Apache-2.0 Group: Applications/Message URL: https://rocketmq.apache.org/ @@ -38,6 +38,7 @@ Patch0025: patch025-backport-Fix-channel-connect-issue.patch Patch0026: patch026-backport-AddBroker-removes-parsing-configuration-from-body.patch Patch0027: patch027-backport-Utilizing-cache-to-avoid-duplicate-parsing.patch Patch0028: patch028-backport-Fix-proxy-client-language-error.patch +Patch0029: patch029-backport-Introduce-new-event-NettyEventType.ACTIVEr.patch BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git Requires: java-1.8.0-openjdk-devel @@ -78,6 +79,9 @@ exit 0 %changelog +* Fri Dec 8 2023 ShiZhili - 5.1.3-30 +- backport Introduce a new event NettyEventType.ACTIVE + * Fri Dec 8 2023 ShiZhili - 5.1.3-29 - backport fix proxy client language error