From 2043dd50341e0a4a2f254d72aa3109f4dfc97aac Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan Date: Tue, 24 Oct 2023 10:29:43 +0800 Subject: [PATCH] [ISSUE #7493] Introduce a new event NettyEventType.ACTIVE (#7494) * [ISSUE #7493] Introduce a new event NettyEventType.ACTIVE for ChannelEventListener * introduce a new event NettyEventType.ACTIVE, * implement channelActive interface for NettyRemotingClient#NettyConnectManageHandler * add onChannelActive for ChannelEventListener interface. * Move send heartbeat to onChannelActive --- .../client/ClientHousekeepingService.java | 5 +++++ .../client/impl/factory/MQClientInstance.java | 20 +++++++++++-------- .../ContainerClientHouseKeepingService.java | 11 +++++++++- .../controller/BrokerHousekeepingService.java | 5 +++++ .../routeinfo/BrokerHousekeepingService.java | 5 +++++ .../remoting/ClientHousekeepingService.java | 4 ++++ .../remoting/ChannelEventListener.java | 2 ++ .../remoting/netty/NettyEventType.java | 3 ++- .../remoting/netty/NettyRemotingAbstract.java | 3 +++ .../remoting/netty/NettyRemotingClient.java | 11 ++++++++++ 10 files changed, 59 insertions(+), 10 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java index cbb81f632..7878d0eec 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java @@ -87,4 +87,9 @@ public class ClientHousekeepingService implements ChannelEventListener { this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getBrokerStatsManager().incChannelIdleNum(); } + + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 09534a176..ba72a6dce 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -159,14 +159,6 @@ public class MQClientInstance { private final ConcurrentMap> brokerAddrTable = MQClientInstance.this.brokerAddrTable; @Override public void onChannelConnect(String remoteAddr, Channel channel) { - for (Map.Entry> addressEntry : brokerAddrTable.entrySet()) { - for (String address : addressEntry.getValue().values()) { - if (address.equals(remoteAddr)) { - sendHeartbeatToAllBrokerWithLockV2(false); - break; - } - } - } } @Override @@ -180,6 +172,18 @@ public class MQClientInstance { @Override public void onChannelIdle(String remoteAddr, Channel channel) { } + + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + for (Map.Entry> addressEntry : brokerAddrTable.entrySet()) { + for (String address : addressEntry.getValue().values()) { + if (address.equals(remoteAddr)) { + sendHeartbeatToAllBrokerWithLockV2(false); + break; + } + } + } + } }; } else { channelEventListener = null; diff --git a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java index 8bf4b4a33..90c912247 100644 --- a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java +++ b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java @@ -49,6 +49,11 @@ public class ContainerClientHouseKeepingService implements ChannelEventListener onChannelOperation(CallbackCode.IDLE, remoteAddr, channel); } + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + onChannelOperation(CallbackCode.ACTIVE, remoteAddr, channel); + } + private void onChannelOperation(CallbackCode callbackCode, String remoteAddr, Channel channel) { Collection masterBrokers = this.brokerContainer.getMasterBrokers(); Collection slaveBrokers = this.brokerContainer.getSlaveBrokers(); @@ -103,6 +108,10 @@ public class ContainerClientHouseKeepingService implements ChannelEventListener /** * onChannelIdle */ - IDLE + IDLE, + /** + * onChannelActive + */ + ACTIVE } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java index 652a9eeb0..d22d0b606 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java @@ -48,4 +48,9 @@ public class BrokerHousekeepingService implements ChannelEventListener { public void onChannelIdle(String remoteAddr, Channel channel) { this.controllerManager.getHeartbeatManager().onBrokerChannelClose(channel); } + + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + + } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java index 80d993992..b527429f7 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java @@ -46,4 +46,9 @@ public class BrokerHousekeepingService implements ChannelEventListener { public void onChannelIdle(String remoteAddr, Channel channel) { this.namesrvController.getRouteInfoManager().onChannelDestroy(channel); } + + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java index e213ae855..74eb6f2db 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java @@ -49,5 +49,9 @@ public class ClientHousekeepingService implements ChannelEventListener { this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel); } + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java index c99133b3a..6802e69b9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java @@ -26,4 +26,6 @@ public interface ChannelEventListener { void onChannelException(final String remoteAddr, final Channel channel); void onChannelIdle(final String remoteAddr, final Channel channel); + + void onChannelActive(final String remoteAddr, final Channel channel); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java index 9ac944aad..4bc9d57dd 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java @@ -20,5 +20,6 @@ public enum NettyEventType { CONNECT, CLOSE, IDLE, - EXCEPTION + EXCEPTION, + ACTIVE } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 07ace28ea..62a8a7290 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -701,6 +701,9 @@ public abstract class NettyRemotingAbstract { case EXCEPTION: listener.onChannelException(event.getRemoteAddr(), event.getChannel()); break; + case ACTIVE: + listener.onChannelActive(event.getRemoteAddr(), event.getChannel()); + break; default: break; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 340daee67..9f1519130 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -1106,6 +1106,17 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}", remoteAddress); + super.channelActive(ctx); + + if (NettyRemotingClient.this.channelEventListener != null) { + NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.ACTIVE, remoteAddress, ctx.channel())); + } + } + @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); -- 2.32.0.windows.2