213 lines
11 KiB
Diff
213 lines
11 KiB
Diff
From 2043dd50341e0a4a2f254d72aa3109f4dfc97aac Mon Sep 17 00:00:00 2001
|
|
From: Zhouxiang Zhan <zhouxzhan@apache.org>
|
|
Date: Tue, 24 Oct 2023 10:29:43 +0800
|
|
Subject: [PATCH] [ISSUE #7493] Introduce a new event NettyEventType.ACTIVE
|
|
(#7494)
|
|
|
|
* [ISSUE #7493] Introduce a new event NettyEventType.ACTIVE for ChannelEventListener
|
|
|
|
* Introduce a new event type, NettyEventType.ACTIVE.
|
|
|
|
* Override channelActive in NettyRemotingClient#NettyConnectManageHandler to emit the ACTIVE event.
|
|
|
|
* Add onChannelActive to the ChannelEventListener interface.
|
|
|
|
* Move the heartbeat send from onChannelConnect to onChannelActive.
|
|
---
|
|
.../client/ClientHousekeepingService.java | 5 +++++
|
|
.../client/impl/factory/MQClientInstance.java | 20 +++++++++++--------
|
|
.../ContainerClientHouseKeepingService.java | 11 +++++++++-
|
|
.../controller/BrokerHousekeepingService.java | 5 +++++
|
|
.../routeinfo/BrokerHousekeepingService.java | 5 +++++
|
|
.../remoting/ClientHousekeepingService.java | 4 ++++
|
|
.../remoting/ChannelEventListener.java | 2 ++
|
|
.../remoting/netty/NettyEventType.java | 3 ++-
|
|
.../remoting/netty/NettyRemotingAbstract.java | 3 +++
|
|
.../remoting/netty/NettyRemotingClient.java | 11 ++++++++++
|
|
10 files changed, 59 insertions(+), 10 deletions(-)
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
|
|
index cbb81f632..7878d0eec 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
|
|
@@ -87,4 +87,9 @@ public class ClientHousekeepingService implements ChannelEventListener {
|
|
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
|
|
this.brokerController.getBrokerStatsManager().incChannelIdleNum();
|
|
}
|
|
+
|
|
+ @Override
|
|
+ public void onChannelActive(String remoteAddr, Channel channel) {
|
|
+
|
|
+ }
|
|
}
|
|
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
|
|
index 09534a176..ba72a6dce 100644
|
|
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
|
|
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
|
|
@@ -159,14 +159,6 @@ public class MQClientInstance {
|
|
private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = MQClientInstance.this.brokerAddrTable;
|
|
@Override
|
|
public void onChannelConnect(String remoteAddr, Channel channel) {
|
|
- for (Map.Entry<String, HashMap<Long, String>> addressEntry : brokerAddrTable.entrySet()) {
|
|
- for (String address : addressEntry.getValue().values()) {
|
|
- if (address.equals(remoteAddr)) {
|
|
- sendHeartbeatToAllBrokerWithLockV2(false);
|
|
- break;
|
|
- }
|
|
- }
|
|
- }
|
|
}
|
|
|
|
@Override
|
|
@@ -180,6 +172,18 @@ public class MQClientInstance {
|
|
@Override
|
|
public void onChannelIdle(String remoteAddr, Channel channel) {
|
|
}
|
|
+
|
|
+ @Override
|
|
+ public void onChannelActive(String remoteAddr, Channel channel) {
|
|
+ for (Map.Entry<String, HashMap<Long, String>> addressEntry : brokerAddrTable.entrySet()) {
|
|
+ for (String address : addressEntry.getValue().values()) {
|
|
+ if (address.equals(remoteAddr)) {
|
|
+ sendHeartbeatToAllBrokerWithLockV2(false);
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
};
|
|
} else {
|
|
channelEventListener = null;
|
|
diff --git a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
|
|
index 8bf4b4a33..90c912247 100644
|
|
--- a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
|
|
+++ b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
|
|
@@ -49,6 +49,11 @@ public class ContainerClientHouseKeepingService implements ChannelEventListener
|
|
onChannelOperation(CallbackCode.IDLE, remoteAddr, channel);
|
|
}
|
|
|
|
+ @Override
|
|
+ public void onChannelActive(String remoteAddr, Channel channel) {
|
|
+ onChannelOperation(CallbackCode.ACTIVE, remoteAddr, channel);
|
|
+ }
|
|
+
|
|
private void onChannelOperation(CallbackCode callbackCode, String remoteAddr, Channel channel) {
|
|
Collection<InnerBrokerController> masterBrokers = this.brokerContainer.getMasterBrokers();
|
|
Collection<InnerSalveBrokerController> slaveBrokers = this.brokerContainer.getSlaveBrokers();
|
|
@@ -103,6 +108,10 @@ public class ContainerClientHouseKeepingService implements ChannelEventListener
|
|
/**
|
|
* onChannelIdle
|
|
*/
|
|
- IDLE
|
|
+ IDLE,
|
|
+ /**
|
|
+ * onChannelActive
|
|
+ */
|
|
+ ACTIVE
|
|
}
|
|
}
|
|
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
|
|
index 652a9eeb0..d22d0b606 100644
|
|
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
|
|
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
|
|
@@ -48,4 +48,9 @@ public class BrokerHousekeepingService implements ChannelEventListener {
|
|
public void onChannelIdle(String remoteAddr, Channel channel) {
|
|
this.controllerManager.getHeartbeatManager().onBrokerChannelClose(channel);
|
|
}
|
|
+
|
|
+ @Override
|
|
+ public void onChannelActive(String remoteAddr, Channel channel) {
|
|
+
|
|
+ }
|
|
}
|
|
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
|
|
index 80d993992..b527429f7 100644
|
|
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
|
|
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
|
|
@@ -46,4 +46,9 @@ public class BrokerHousekeepingService implements ChannelEventListener {
|
|
public void onChannelIdle(String remoteAddr, Channel channel) {
|
|
this.namesrvController.getRouteInfoManager().onChannelDestroy(channel);
|
|
}
|
|
+
|
|
+ @Override
|
|
+ public void onChannelActive(String remoteAddr, Channel channel) {
|
|
+
|
|
+ }
|
|
}
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
|
|
index e213ae855..74eb6f2db 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
|
|
@@ -49,5 +49,9 @@ public class ClientHousekeepingService implements ChannelEventListener {
|
|
this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel);
|
|
}
|
|
|
|
+ @Override
|
|
+ public void onChannelActive(String remoteAddr, Channel channel) {
|
|
+
|
|
+ }
|
|
}
|
|
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java
|
|
index c99133b3a..6802e69b9 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java
|
|
@@ -26,4 +26,6 @@ public interface ChannelEventListener {
|
|
void onChannelException(final String remoteAddr, final Channel channel);
|
|
|
|
void onChannelIdle(final String remoteAddr, final Channel channel);
|
|
+
|
|
+ void onChannelActive(final String remoteAddr, final Channel channel);
|
|
}
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java
|
|
index 9ac944aad..4bc9d57dd 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java
|
|
@@ -20,5 +20,6 @@ public enum NettyEventType {
|
|
CONNECT,
|
|
CLOSE,
|
|
IDLE,
|
|
- EXCEPTION
|
|
+ EXCEPTION,
|
|
+ ACTIVE
|
|
}
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
|
|
index 07ace28ea..62a8a7290 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
|
|
@@ -701,6 +701,9 @@ public abstract class NettyRemotingAbstract {
|
|
case EXCEPTION:
|
|
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
|
|
break;
|
|
+ case ACTIVE:
|
|
+ listener.onChannelActive(event.getRemoteAddr(), event.getChannel());
|
|
+ break;
|
|
default:
|
|
break;
|
|
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
|
|
index 340daee67..9f1519130 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
|
|
@@ -1106,6 +1106,17 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
|
|
}
|
|
}
|
|
|
|
+ @Override
|
|
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
|
+ final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
|
|
+ LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}", remoteAddress);
|
|
+ super.channelActive(ctx);
|
|
+
|
|
+ if (NettyRemotingClient.this.channelEventListener != null) {
|
|
+ NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.ACTIVE, remoteAddress, ctx.channel()));
|
|
+ }
|
|
+ }
|
|
+
|
|
@Override
|
|
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
|
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
|
|
--
|
|
2.32.0.windows.2
|
|
|