1940 lines
99 KiB
Diff
1940 lines
99 KiB
Diff
From 955428278ccd9bfa0f15e21a8d3040c5213358bd Mon Sep 17 00:00:00 2001
|
|
From: Dongyuan Pan <dongyuanpan0@gmail.com>
|
|
Date: Tue, 4 Jul 2023 18:01:48 +0800
|
|
Subject: [PATCH 1/5] [ISSUE #6991] Delete rocketmq.client.logUseSlf4j=true in
|
|
JAVA_OPT
|
|
|
|
---
|
|
distribution/bin/runbroker.cmd | 1 -
|
|
distribution/bin/runbroker.sh | 1 -
|
|
2 files changed, 2 deletions(-)
|
|
|
|
diff --git a/distribution/bin/runbroker.cmd b/distribution/bin/runbroker.cmd
|
|
index 15f676aa8..77a0d1ff8 100644
|
|
--- a/distribution/bin/runbroker.cmd
|
|
+++ b/distribution/bin/runbroker.cmd
|
|
@@ -36,7 +36,6 @@ set "JAVA_OPT=%JAVA_OPT% -XX:-OmitStackTraceInFastThrow"
|
|
set "JAVA_OPT=%JAVA_OPT% -XX:+AlwaysPreTouch"
|
|
set "JAVA_OPT=%JAVA_OPT% -XX:MaxDirectMemorySize=15g"
|
|
set "JAVA_OPT=%JAVA_OPT% -XX:-UseLargePages -XX:-UseBiasedLocking"
|
|
-set "JAVA_OPT=%JAVA_OPT% -Drocketmq.client.logUseSlf4j=true"
|
|
set "JAVA_OPT=%JAVA_OPT% %JAVA_OPT_EXT% -cp %CLASSPATH%"
|
|
|
|
"%JAVA%" %JAVA_OPT% %*
|
|
\ No newline at end of file
|
|
diff --git a/distribution/bin/runbroker.sh b/distribution/bin/runbroker.sh
|
|
index a081df79e..e6e2132ab 100644
|
|
--- a/distribution/bin/runbroker.sh
|
|
+++ b/distribution/bin/runbroker.sh
|
|
@@ -106,7 +106,6 @@ JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
|
|
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
|
|
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
|
|
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
|
|
-JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true"
|
|
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
|
|
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
|
|
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 00fc42b8be848fc3f5c550cbab007b92f128dc38 Mon Sep 17 00:00:00 2001
|
|
From: ShuangxiDing <dingshuangxi888@gmail.com>
|
|
Date: Tue, 4 Jul 2023 18:02:16 +0800
|
|
Subject: [PATCH 2/5] [ISSUE #6957] Support Proxy Protocol for gRPC and
|
|
Remoting Server (#6958)
|
|
|
|
---
|
|
WORKSPACE | 1 +
|
|
.../common/constant/HAProxyConstants.java | 28 ++++
|
|
pom.xml | 5 +
|
|
proxy/BUILD.bazel | 2 +
|
|
proxy/pom.xml | 4 +
|
|
.../proxy/grpc/GrpcServerBuilder.java | 2 +-
|
|
...ava => ProxyAndTlsProtocolNegotiator.java} | 139 ++++++++++++++++--
|
|
.../proxy/grpc/constant/AttributeKeys.java | 44 ++++++
|
|
.../grpc/interceptor/HeaderInterceptor.java | 32 +++-
|
|
.../remoting/MultiProtocolRemotingServer.java | 5 +-
|
|
.../remoting/common/RemotingHelper.java | 42 ++++--
|
|
.../remoting/netty/AttributeKeys.java | 45 ++++++
|
|
.../remoting/netty/NettyRemotingServer.java | 129 ++++++++++++++--
|
|
.../rocketmq/remoting/ProxyProtocolTest.java | 116 +++++++++++++++
|
|
.../org/apache/rocketmq/remoting/TlsTest.java | 28 ++--
|
|
15 files changed, 563 insertions(+), 59 deletions(-)
|
|
create mode 100644 common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
|
|
rename proxy/src/main/java/org/apache/rocketmq/proxy/grpc/{OptionalSSLProtocolNegotiator.java => ProxyAndTlsProtocolNegotiator.java} (51%)
|
|
create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
|
|
create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
|
|
create mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java
|
|
|
|
diff --git a/WORKSPACE b/WORKSPACE
|
|
index fbb694efe..e3a8f37dc 100644
|
|
--- a/WORKSPACE
|
|
+++ b/WORKSPACE
|
|
@@ -104,6 +104,7 @@ maven_install(
|
|
"software.amazon.awssdk:s3:2.20.29",
|
|
"com.fasterxml.jackson.core:jackson-databind:2.13.4.2",
|
|
"com.adobe.testing:s3mock-junit4:2.11.0",
|
|
+ "io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0",
|
|
],
|
|
fetch_sources = True,
|
|
repositories = [
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java b/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
|
|
new file mode 100644
|
|
index 000000000..c1ae0cca1
|
|
--- /dev/null
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
|
|
@@ -0,0 +1,28 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+
|
|
+package org.apache.rocketmq.common.constant;
|
|
+
|
|
+public class HAProxyConstants {
|
|
+
|
|
+ public static final String PROXY_PROTOCOL_PREFIX = "proxy_protocol_";
|
|
+ public static final String PROXY_PROTOCOL_ADDR = PROXY_PROTOCOL_PREFIX + "addr";
|
|
+ public static final String PROXY_PROTOCOL_PORT = PROXY_PROTOCOL_PREFIX + "port";
|
|
+ public static final String PROXY_PROTOCOL_SERVER_ADDR = PROXY_PROTOCOL_PREFIX + "server_addr";
|
|
+ public static final String PROXY_PROTOCOL_SERVER_PORT = PROXY_PROTOCOL_PREFIX + "server_port";
|
|
+ public static final String PROXY_PROTOCOL_TLV_PREFIX = PROXY_PROTOCOL_PREFIX + "tlv_0x";
|
|
+}
|
|
diff --git a/pom.xml b/pom.xml
|
|
index a3b474602..12bc2dbd5 100644
|
|
--- a/pom.xml
|
|
+++ b/pom.xml
|
|
@@ -888,6 +888,11 @@
|
|
</exclusion>
|
|
</exclusions>
|
|
</dependency>
|
|
+ <dependency>
|
|
+ <groupId>io.github.aliyunmq</groupId>
|
|
+ <artifactId>rocketmq-grpc-netty-codec-haproxy</artifactId>
|
|
+ <version>1.0.0</version>
|
|
+ </dependency>
|
|
<dependency>
|
|
<groupId>com.conversantmedia</groupId>
|
|
<artifactId>disruptor</artifactId>
|
|
diff --git a/proxy/BUILD.bazel b/proxy/BUILD.bazel
|
|
index fcb85e46f..b4f3c16e2 100644
|
|
--- a/proxy/BUILD.bazel
|
|
+++ b/proxy/BUILD.bazel
|
|
@@ -46,6 +46,7 @@ java_library(
|
|
"@maven//:io_grpc_grpc_services",
|
|
"@maven//:io_grpc_grpc_stub",
|
|
"@maven//:io_netty_netty_all",
|
|
+ "@maven//:io_github_aliyunmq_rocketmq_grpc_netty_codec_haproxy",
|
|
"@maven//:io_openmessaging_storage_dledger",
|
|
"@maven//:io_opentelemetry_opentelemetry_api",
|
|
"@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
|
|
@@ -94,6 +95,7 @@ java_library(
|
|
"@maven//:io_grpc_grpc_netty_shaded",
|
|
"@maven//:io_grpc_grpc_stub",
|
|
"@maven//:io_netty_netty_all",
|
|
+ "@maven//:io_github_aliyunmq_rocketmq_grpc_netty_codec_haproxy",
|
|
"@maven//:org_apache_commons_commons_lang3",
|
|
"@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
|
|
"@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
|
|
diff --git a/proxy/pom.xml b/proxy/pom.xml
|
|
index f14155737..3fbea107a 100644
|
|
--- a/proxy/pom.xml
|
|
+++ b/proxy/pom.xml
|
|
@@ -75,6 +75,10 @@
|
|
<groupId>com.google.protobuf</groupId>
|
|
<artifactId>protobuf-java-util</artifactId>
|
|
</dependency>
|
|
+ <dependency>
|
|
+ <groupId>io.github.aliyunmq</groupId>
|
|
+ <artifactId>rocketmq-grpc-netty-codec-haproxy</artifactId>
|
|
+ </dependency>
|
|
<dependency>
|
|
<groupId>org.apache.commons</groupId>
|
|
<artifactId>commons-lang3</artifactId>
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
|
|
index 0ca6a1fcb..437b9216b 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
|
|
@@ -50,7 +50,7 @@ public class GrpcServerBuilder {
|
|
protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
|
|
serverBuilder = NettyServerBuilder.forPort(port);
|
|
|
|
- serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator());
|
|
+ serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());
|
|
|
|
// build server
|
|
int bossLoopNum = ConfigurationManager.getProxyConfig().getGrpcBossLoopNum();
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
|
|
similarity index 51%
|
|
rename from proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
|
|
rename to proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
|
|
index 670e1c1a2..ceb9becc0 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
|
|
@@ -16,36 +16,53 @@
|
|
*/
|
|
package org.apache.rocketmq.proxy.grpc;
|
|
|
|
+import io.grpc.Attributes;
|
|
import io.grpc.netty.shaded.io.grpc.netty.GrpcHttp2ConnectionHandler;
|
|
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
|
|
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiationEvent;
|
|
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator;
|
|
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators;
|
|
+import io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiationEvent;
|
|
import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
|
|
import io.grpc.netty.shaded.io.netty.channel.ChannelHandler;
|
|
import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
|
|
+import io.grpc.netty.shaded.io.netty.channel.ChannelInboundHandlerAdapter;
|
|
import io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder;
|
|
+import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionResult;
|
|
+import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionState;
|
|
+import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessage;
|
|
+import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
|
|
+import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
|
|
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
|
|
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
|
|
import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
|
|
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
|
|
import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
|
|
import io.grpc.netty.shaded.io.netty.util.AsciiString;
|
|
-import java.io.InputStream;
|
|
-import java.nio.file.Files;
|
|
-import java.nio.file.Paths;
|
|
-import java.util.List;
|
|
+import io.grpc.netty.shaded.io.netty.util.CharsetUtil;
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
+import org.apache.rocketmq.common.constant.HAProxyConstants;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
|
import org.apache.rocketmq.proxy.config.ProxyConfig;
|
|
+import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
|
|
import org.apache.rocketmq.remoting.common.TlsMode;
|
|
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
|
|
|
|
-public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator {
|
|
+import java.io.InputStream;
|
|
+import java.nio.file.Files;
|
|
+import java.nio.file.Paths;
|
|
+import java.util.List;
|
|
+
|
|
+public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator {
|
|
protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
|
|
|
|
+ private static final String HA_PROXY_DECODER = "HAProxyDecoder";
|
|
+ private static final String HA_PROXY_HANDLER = "HAProxyHandler";
|
|
+ private static final String TLS_MODE_HANDLER = "TlsModeHandler";
|
|
/**
|
|
* the length of the ssl record header (in bytes)
|
|
*/
|
|
@@ -53,7 +70,7 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
|
|
|
|
private static SslContext sslContext;
|
|
|
|
- public OptionalSSLProtocolNegotiator() {
|
|
+ public ProxyAndTlsProtocolNegotiator() {
|
|
sslContext = loadSslContext();
|
|
}
|
|
|
|
@@ -64,11 +81,12 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
|
|
|
|
@Override
|
|
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
|
|
- return new PortUnificationServerHandler(grpcHandler);
|
|
+ return new ProxyAndTlsProtocolHandler(grpcHandler);
|
|
}
|
|
|
|
@Override
|
|
- public void close() {}
|
|
+ public void close() {
|
|
+ }
|
|
|
|
private static SslContext loadSslContext() {
|
|
try {
|
|
@@ -85,8 +103,8 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
|
|
String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath();
|
|
try (InputStream serverKeyInputStream = Files.newInputStream(
|
|
Paths.get(tlsKeyPath));
|
|
- InputStream serverCertificateStream = Files.newInputStream(
|
|
- Paths.get(tlsCertPath))) {
|
|
+ InputStream serverCertificateStream = Files.newInputStream(
|
|
+ Paths.get(tlsCertPath))) {
|
|
SslContext res = GrpcSslContexts.forServer(serverCertificateStream,
|
|
serverKeyInputStream)
|
|
.trustManager(InsecureTrustManagerFactory.INSTANCE)
|
|
@@ -102,12 +120,95 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
|
|
}
|
|
}
|
|
|
|
- public static class PortUnificationServerHandler extends ByteToMessageDecoder {
|
|
+ private static class ProxyAndTlsProtocolHandler extends ByteToMessageDecoder {
|
|
+
|
|
+ private final GrpcHttp2ConnectionHandler grpcHandler;
|
|
+
|
|
+ public ProxyAndTlsProtocolHandler(GrpcHttp2ConnectionHandler grpcHandler) {
|
|
+ this.grpcHandler = grpcHandler;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
|
|
+ try {
|
|
+ ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(
|
|
+ in);
|
|
+ if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
|
|
+ return;
|
|
+ }
|
|
+ if (ha.state() == ProtocolDetectionState.DETECTED) {
|
|
+ ctx.pipeline().addAfter(ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
|
|
+ .addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
|
|
+ .addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
|
|
+ } else {
|
|
+ ctx.pipeline().addAfter(ctx.name(), TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
|
|
+ }
|
|
+
|
|
+ ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
|
|
+ ctx.pipeline().remove(this);
|
|
+ } catch (Exception e) {
|
|
+ log.error("process proxy protocol negotiator failed.", e);
|
|
+ throw e;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
|
|
+
|
|
+ private ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault();
|
|
+
|
|
+ @Override
|
|
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
+ if (msg instanceof HAProxyMessage) {
|
|
+ replaceEventWithMessage((HAProxyMessage) msg);
|
|
+ ctx.fireUserEventTriggered(pne);
|
|
+ } else {
|
|
+ super.channelRead(ctx, msg);
|
|
+ }
|
|
+ ctx.pipeline().remove(this);
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * The definition of key refers to the implementation of nginx
|
|
+ * <a href="https://nginx.org/en/docs/http/ngx_http_core_module.html#var_proxy_protocol_addr">ngx_http_core_module</a>
|
|
+ *
|
|
+ * @param msg
|
|
+ */
|
|
+ private void replaceEventWithMessage(HAProxyMessage msg) {
|
|
+ Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
|
|
+ if (StringUtils.isNotBlank(msg.sourceAddress())) {
|
|
+ builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress());
|
|
+ }
|
|
+ if (msg.sourcePort() > 0) {
|
|
+ builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort()));
|
|
+ }
|
|
+ if (StringUtils.isNotBlank(msg.destinationAddress())) {
|
|
+ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress());
|
|
+ }
|
|
+ if (msg.destinationPort() > 0) {
|
|
+ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort()));
|
|
+ }
|
|
+ if (CollectionUtils.isNotEmpty(msg.tlvs())) {
|
|
+ msg.tlvs().forEach(tlv -> {
|
|
+ Attributes.Key<String> key = AttributeKeys.valueOf(
|
|
+ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
|
|
+ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
|
|
+ builder.set(key, value);
|
|
+ });
|
|
+ }
|
|
+ pne = InternalProtocolNegotiationEvent
|
|
+ .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static class TlsModeHandler extends ByteToMessageDecoder {
|
|
+
|
|
+ private ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault();
|
|
|
|
private final ChannelHandler ssl;
|
|
private final ChannelHandler plaintext;
|
|
|
|
- public PortUnificationServerHandler(GrpcHttp2ConnectionHandler grpcHandler) {
|
|
+ public TlsModeHandler(GrpcHttp2ConnectionHandler grpcHandler) {
|
|
this.ssl = InternalProtocolNegotiators.serverTls(sslContext)
|
|
.newHandler(grpcHandler);
|
|
this.plaintext = InternalProtocolNegotiators.serverPlaintext()
|
|
@@ -115,8 +216,7 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
|
|
}
|
|
|
|
@Override
|
|
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
|
|
- throws Exception {
|
|
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
|
|
try {
|
|
TlsMode tlsMode = TlsSystemConfig.tlsMode;
|
|
if (TlsMode.ENFORCING.equals(tlsMode)) {
|
|
@@ -134,12 +234,21 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
|
|
ctx.pipeline().addAfter(ctx.name(), null, this.plaintext);
|
|
}
|
|
}
|
|
- ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
|
|
+ ctx.fireUserEventTriggered(pne);
|
|
ctx.pipeline().remove(this);
|
|
} catch (Exception e) {
|
|
log.error("process ssl protocol negotiator failed.", e);
|
|
throw e;
|
|
}
|
|
}
|
|
+
|
|
+ @Override
|
|
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
|
+ if (evt instanceof ProtocolNegotiationEvent) {
|
|
+ pne = (ProtocolNegotiationEvent) evt;
|
|
+ } else {
|
|
+ super.userEventTriggered(ctx, evt);
|
|
+ }
|
|
+ }
|
|
}
|
|
}
|
|
\ No newline at end of file
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
|
|
new file mode 100644
|
|
index 000000000..096a5ba3d
|
|
--- /dev/null
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
|
|
@@ -0,0 +1,44 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+package org.apache.rocketmq.proxy.grpc.constant;
|
|
+
|
|
+import io.grpc.Attributes;
|
|
+import org.apache.rocketmq.common.constant.HAProxyConstants;
|
|
+
|
|
+import java.util.Map;
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
+
|
|
+public class AttributeKeys {
|
|
+
|
|
+ public static final Attributes.Key<String> PROXY_PROTOCOL_ADDR =
|
|
+ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_ADDR);
|
|
+
|
|
+ public static final Attributes.Key<String> PROXY_PROTOCOL_PORT =
|
|
+ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_PORT);
|
|
+
|
|
+ public static final Attributes.Key<String> PROXY_PROTOCOL_SERVER_ADDR =
|
|
+ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_SERVER_ADDR);
|
|
+
|
|
+ public static final Attributes.Key<String> PROXY_PROTOCOL_SERVER_PORT =
|
|
+ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_SERVER_PORT);
|
|
+
|
|
+ private static final Map<String, Attributes.Key<String>> ATTRIBUTES_KEY_MAP = new ConcurrentHashMap<>();
|
|
+
|
|
+ public static Attributes.Key<String> valueOf(String name) {
|
|
+ return ATTRIBUTES_KEY_MAP.computeIfAbsent(name, key -> Attributes.Key.create(name));
|
|
+ }
|
|
+}
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
|
|
index 1cbb00361..13893e5ed 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
|
|
@@ -18,11 +18,16 @@
|
|
package org.apache.rocketmq.proxy.grpc.interceptor;
|
|
|
|
import com.google.common.net.HostAndPort;
|
|
+import io.grpc.Attributes;
|
|
import io.grpc.Grpc;
|
|
import io.grpc.Metadata;
|
|
import io.grpc.ServerCall;
|
|
import io.grpc.ServerCallHandler;
|
|
import io.grpc.ServerInterceptor;
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
+import org.apache.rocketmq.common.constant.HAProxyConstants;
|
|
+import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
|
|
+
|
|
import java.net.InetSocketAddress;
|
|
import java.net.SocketAddress;
|
|
|
|
@@ -33,13 +38,27 @@ public class HeaderInterceptor implements ServerInterceptor {
|
|
Metadata headers,
|
|
ServerCallHandler<R, W> next
|
|
) {
|
|
- SocketAddress remoteSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
|
|
- String remoteAddress = parseSocketAddress(remoteSocketAddress);
|
|
+ String remoteAddress = getProxyProtocolAddress(call.getAttributes());
|
|
+ if (StringUtils.isBlank(remoteAddress)) {
|
|
+ SocketAddress remoteSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
|
|
+ remoteAddress = parseSocketAddress(remoteSocketAddress);
|
|
+ }
|
|
headers.put(InterceptorConstants.REMOTE_ADDRESS, remoteAddress);
|
|
|
|
SocketAddress localSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
|
|
String localAddress = parseSocketAddress(localSocketAddress);
|
|
headers.put(InterceptorConstants.LOCAL_ADDRESS, localAddress);
|
|
+
|
|
+ for (Attributes.Key<?> key : call.getAttributes().keys()) {
|
|
+ if (!StringUtils.startsWith(key.toString(), HAProxyConstants.PROXY_PROTOCOL_PREFIX)) {
|
|
+ continue;
|
|
+ }
|
|
+ Metadata.Key<String> headerKey
|
|
+ = Metadata.Key.of(key.toString(), Metadata.ASCII_STRING_MARSHALLER);
|
|
+ String headerValue = String.valueOf(call.getAttributes().get(key));
|
|
+ headers.put(headerKey, headerValue);
|
|
+ }
|
|
+
|
|
return next.startCall(call, headers);
|
|
}
|
|
|
|
@@ -55,4 +74,13 @@ public class HeaderInterceptor implements ServerInterceptor {
|
|
|
|
return "";
|
|
}
|
|
+
|
|
+ private String getProxyProtocolAddress(Attributes attributes) {
|
|
+ String proxyProtocolAddr = attributes.get(AttributeKeys.PROXY_PROTOCOL_ADDR);
|
|
+ String proxyProtocolPort = attributes.get(AttributeKeys.PROXY_PROTOCOL_PORT);
|
|
+ if (StringUtils.isBlank(proxyProtocolAddr) || StringUtils.isBlank(proxyProtocolPort)) {
|
|
+ return null;
|
|
+ }
|
|
+ return proxyProtocolAddr + ":" + proxyProtocolPort;
|
|
+ }
|
|
}
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
|
|
index 1142132b7..858b1f022 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
|
|
@@ -20,8 +20,6 @@ package org.apache.rocketmq.proxy.remoting;
|
|
import io.netty.channel.ChannelPipeline;
|
|
import io.netty.channel.socket.SocketChannel;
|
|
import io.netty.handler.timeout.IdleStateHandler;
|
|
-import java.io.IOException;
|
|
-import java.security.cert.CertificateException;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
@@ -36,6 +34,9 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
|
|
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
|
|
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
|
|
|
|
+import java.io.IOException;
|
|
+import java.security.cert.CertificateException;
|
|
+
|
|
/**
|
|
* support remoting and http2 protocol at one port
|
|
*/
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
|
|
index 75e25a83a..d0750b678 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
|
|
@@ -21,14 +21,8 @@ import io.netty.channel.ChannelFuture;
|
|
import io.netty.channel.ChannelFutureListener;
|
|
import io.netty.util.Attribute;
|
|
import io.netty.util.AttributeKey;
|
|
-import java.io.IOException;
|
|
-import java.lang.reflect.Field;
|
|
-import java.net.InetSocketAddress;
|
|
-import java.net.SocketAddress;
|
|
-import java.nio.ByteBuffer;
|
|
-import java.nio.channels.SocketChannel;
|
|
-import java.util.HashMap;
|
|
-import java.util.Map;
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
+import org.apache.rocketmq.common.constant.HAProxyConstants;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.utils.NetworkUtil;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
@@ -43,6 +37,15 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
|
import org.apache.rocketmq.remoting.protocol.RequestCode;
|
|
import org.apache.rocketmq.remoting.protocol.ResponseCode;
|
|
|
|
+import java.io.IOException;
|
|
+import java.lang.reflect.Field;
|
|
+import java.net.InetSocketAddress;
|
|
+import java.net.SocketAddress;
|
|
+import java.nio.ByteBuffer;
|
|
+import java.nio.channels.SocketChannel;
|
|
+import java.util.HashMap;
|
|
+import java.util.Map;
|
|
+
|
|
public class RemotingHelper {
|
|
public static final String DEFAULT_CHARSET = "UTF-8";
|
|
public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0";
|
|
@@ -50,6 +53,9 @@ public class RemotingHelper {
|
|
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
|
|
private static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr");
|
|
|
|
+ private static final AttributeKey<String> PROXY_PROTOCOL_ADDR = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
|
|
+ private static final AttributeKey<String> PROXY_PROTOCOL_PORT = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
|
|
+
|
|
public static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf("ClientId");
|
|
|
|
public static final AttributeKey<Integer> VERSION_KEY = AttributeKey.valueOf("Version");
|
|
@@ -203,12 +209,16 @@ public class RemotingHelper {
|
|
if (null == channel) {
|
|
return "";
|
|
}
|
|
+ String addr = getProxyProtocolAddress(channel);
|
|
+ if (StringUtils.isNotBlank(addr)) {
|
|
+ return addr;
|
|
+ }
|
|
Attribute<String> att = channel.attr(REMOTE_ADDR_KEY);
|
|
if (att == null) {
|
|
// mocked in unit test
|
|
return parseChannelRemoteAddr0(channel);
|
|
}
|
|
- String addr = att.get();
|
|
+ addr = att.get();
|
|
if (addr == null) {
|
|
addr = parseChannelRemoteAddr0(channel);
|
|
att.set(addr);
|
|
@@ -216,6 +226,18 @@ public class RemotingHelper {
|
|
return addr;
|
|
}
|
|
|
|
+ private static String getProxyProtocolAddress(Channel channel) {
|
|
+ if (!channel.hasAttr(PROXY_PROTOCOL_ADDR)) {
|
|
+ return null;
|
|
+ }
|
|
+ String proxyProtocolAddr = getAttributeValue(PROXY_PROTOCOL_ADDR, channel);
|
|
+ String proxyProtocolPort = getAttributeValue(PROXY_PROTOCOL_PORT, channel);
|
|
+ if (StringUtils.isBlank(proxyProtocolAddr) || proxyProtocolPort == null) {
|
|
+ return null;
|
|
+ }
|
|
+ return proxyProtocolAddr + ":" + proxyProtocolPort;
|
|
+ }
|
|
+
|
|
private static String parseChannelRemoteAddr0(final Channel channel) {
|
|
SocketAddress remote = channel.remoteAddress();
|
|
final String addr = remote != null ? remote.toString() : "";
|
|
@@ -255,7 +277,7 @@ public class RemotingHelper {
|
|
return "";
|
|
}
|
|
|
|
- public static int parseSocketAddressPort(SocketAddress socketAddress) {
|
|
+ public static Integer parseSocketAddressPort(SocketAddress socketAddress) {
|
|
if (socketAddress instanceof InetSocketAddress) {
|
|
return ((InetSocketAddress) socketAddress).getPort();
|
|
}
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
|
|
new file mode 100644
|
|
index 000000000..4e69ab82d
|
|
--- /dev/null
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
|
|
@@ -0,0 +1,45 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+package org.apache.rocketmq.remoting.netty;
|
|
+
|
|
+
|
|
+import io.netty.util.AttributeKey;
|
|
+import org.apache.rocketmq.common.constant.HAProxyConstants;
|
|
+
|
|
+import java.util.Map;
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
+
|
|
+public class AttributeKeys {
|
|
+
|
|
+ public static final AttributeKey<String> PROXY_PROTOCOL_ADDR =
|
|
+ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
|
|
+
|
|
+ public static final AttributeKey<String> PROXY_PROTOCOL_PORT =
|
|
+ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
|
|
+
|
|
+ public static final AttributeKey<String> PROXY_PROTOCOL_SERVER_ADDR =
|
|
+ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_SERVER_ADDR);
|
|
+
|
|
+ public static final AttributeKey<String> PROXY_PROTOCOL_SERVER_PORT =
|
|
+ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_SERVER_PORT);
|
|
+
|
|
+ private static final Map<String, AttributeKey<String>> ATTRIBUTE_KEY_MAP = new ConcurrentHashMap<>();
|
|
+
|
|
+ public static AttributeKey<String> valueOf(String name) {
|
|
+ return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKeys::valueOf);
|
|
+ }
|
|
+}
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
index 9f39d672e..94ffd8d07 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
@@ -24,6 +24,7 @@ import io.netty.channel.ChannelDuplexHandler;
|
|
import io.netty.channel.ChannelFuture;
|
|
import io.netty.channel.ChannelHandler;
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
+import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
import io.netty.channel.ChannelInitializer;
|
|
import io.netty.channel.ChannelOption;
|
|
import io.netty.channel.ChannelPipeline;
|
|
@@ -36,27 +37,25 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
|
|
import io.netty.channel.nio.NioEventLoopGroup;
|
|
import io.netty.channel.socket.SocketChannel;
|
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
|
+import io.netty.handler.codec.ProtocolDetectionResult;
|
|
+import io.netty.handler.codec.ProtocolDetectionState;
|
|
+import io.netty.handler.codec.haproxy.HAProxyMessage;
|
|
+import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
|
|
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
|
|
import io.netty.handler.timeout.IdleState;
|
|
import io.netty.handler.timeout.IdleStateEvent;
|
|
import io.netty.handler.timeout.IdleStateHandler;
|
|
+import io.netty.util.AttributeKey;
|
|
+import io.netty.util.CharsetUtil;
|
|
import io.netty.util.HashedWheelTimer;
|
|
import io.netty.util.Timeout;
|
|
import io.netty.util.TimerTask;
|
|
import io.netty.util.concurrent.DefaultEventExecutorGroup;
|
|
-import java.io.IOException;
|
|
-import java.net.InetSocketAddress;
|
|
-import java.security.cert.CertificateException;
|
|
-import java.util.NoSuchElementException;
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
-import java.util.concurrent.ConcurrentMap;
|
|
-import java.util.concurrent.ExecutorService;
|
|
-import java.util.concurrent.Executors;
|
|
-import java.util.concurrent.ScheduledExecutorService;
|
|
-import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
-import java.util.concurrent.TimeUnit;
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.rocketmq.common.Pair;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
+import org.apache.rocketmq.common.constant.HAProxyConstants;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.utils.NetworkUtil;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
@@ -71,6 +70,19 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
|
|
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
|
|
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
|
|
|
+import java.io.IOException;
|
|
+import java.net.InetSocketAddress;
|
|
+import java.security.cert.CertificateException;
|
|
+import java.util.NoSuchElementException;
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
+import java.util.concurrent.ExecutorService;
|
|
+import java.util.concurrent.Executors;
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
+import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
+import java.util.concurrent.TimeUnit;
|
|
+
|
|
@SuppressWarnings("NullableProblems")
|
|
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
|
|
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
|
|
@@ -96,6 +108,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
private final ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract> remotingServerTable = new ConcurrentHashMap<>();
|
|
|
|
public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
|
|
+ public static final String HA_PROXY_DECODER = "HAProxyDecoder";
|
|
+ public static final String HA_PROXY_HANDLER = "HAProxyHandler";
|
|
+ public static final String TLS_MODE_HANDLER = "TlsModeHandler";
|
|
public static final String TLS_HANDLER_NAME = "sslHandler";
|
|
public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
|
|
|
|
@@ -387,7 +402,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
}
|
|
|
|
private void prepareSharableHandlers() {
|
|
- handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
|
|
+ handshakeHandler = new HandshakeHandler();
|
|
encoder = new NettyEncoder();
|
|
connectionManageHandler = new NettyConnectManageHandler();
|
|
serverHandler = new NettyServerHandler();
|
|
@@ -437,11 +452,51 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
@ChannelHandler.Sharable
|
|
public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
|
|
|
+ private final TlsModeHandler tlsModeHandler;
|
|
+
|
|
+ public HandshakeHandler() {
|
|
+ tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
|
|
+ try {
|
|
+ ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(in);
|
|
+ if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
|
|
+ return;
|
|
+ }
|
|
+ if (ha.state() == ProtocolDetectionState.DETECTED) {
|
|
+ ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
|
|
+ .addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
|
|
+ .addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler);
|
|
+ } else {
|
|
+ ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), TLS_MODE_HANDLER, tlsModeHandler);
|
|
+ }
|
|
+
|
|
+ try {
|
|
+ // Remove this handler
|
|
+ ctx.pipeline().remove(this);
|
|
+ } catch (NoSuchElementException e) {
|
|
+ log.error("Error while removing HandshakeHandler", e);
|
|
+ }
|
|
+
|
|
+ // Hand over this message to the next .
|
|
+ ctx.fireChannelRead(in.retain());
|
|
+ } catch (Exception e) {
|
|
+ log.error("process proxy protocol negotiator failed.", e);
|
|
+ throw e;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ @ChannelHandler.Sharable
|
|
+ public class TlsModeHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
|
+
|
|
private final TlsMode tlsMode;
|
|
|
|
private static final byte HANDSHAKE_MAGIC_CODE = 0x16;
|
|
|
|
- HandshakeHandler(TlsMode tlsMode) {
|
|
+ TlsModeHandler(TlsMode tlsMode) {
|
|
this.tlsMode = tlsMode;
|
|
}
|
|
|
|
@@ -461,7 +516,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
case ENFORCING:
|
|
if (null != sslContext) {
|
|
ctx.pipeline()
|
|
- .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
|
|
+ .addAfter(defaultEventExecutorGroup, TLS_MODE_HANDLER, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
|
|
.addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
|
|
log.info("Handlers prepended to channel pipeline to establish SSL connection");
|
|
} else {
|
|
@@ -483,7 +538,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
// Remove this handler
|
|
ctx.pipeline().remove(this);
|
|
} catch (NoSuchElementException e) {
|
|
- log.error("Error while removing HandshakeHandler", e);
|
|
+ log.error("Error while removing TlsModeHandler", e);
|
|
}
|
|
|
|
// Hand over this message to the next .
|
|
@@ -706,4 +761,46 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
return NettyRemotingServer.this.getCallbackExecutor();
|
|
}
|
|
}
|
|
+
|
|
+ public static class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
|
|
+
|
|
+ @Override
|
|
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
+ if (msg instanceof HAProxyMessage) {
|
|
+ fillChannelWithMessage((HAProxyMessage) msg, ctx.channel());
|
|
+ } else {
|
|
+ super.channelRead(ctx, msg);
|
|
+ }
|
|
+ ctx.pipeline().remove(this);
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * The definition of key refers to the implementation of nginx
|
|
+ * <a href="https://nginx.org/en/docs/http/ngx_http_core_module.html#var_proxy_protocol_addr">ngx_http_core_module</a>
|
|
+ * @param msg
|
|
+ * @param channel
|
|
+ */
|
|
+ private void fillChannelWithMessage(HAProxyMessage msg, Channel channel) {
|
|
+ if (StringUtils.isNotBlank(msg.sourceAddress())) {
|
|
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
|
|
+ }
|
|
+ if (msg.sourcePort() > 0) {
|
|
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
|
|
+ }
|
|
+ if (StringUtils.isNotBlank(msg.destinationAddress())) {
|
|
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
|
|
+ }
|
|
+ if (msg.destinationPort() > 0) {
|
|
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
|
|
+ }
|
|
+ if (CollectionUtils.isNotEmpty(msg.tlvs())) {
|
|
+ msg.tlvs().forEach(tlv -> {
|
|
+ AttributeKey<String> key = AttributeKeys.valueOf(
|
|
+ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
|
|
+ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
|
|
+ channel.attr(key).set(value);
|
|
+ });
|
|
+ }
|
|
+ }
|
|
+ }
|
|
}
|
|
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java
|
|
new file mode 100644
|
|
index 000000000..c39fd2132
|
|
--- /dev/null
|
|
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java
|
|
@@ -0,0 +1,116 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+
|
|
+package org.apache.rocketmq.remoting;
|
|
+
|
|
+import io.netty.buffer.ByteBuf;
|
|
+import io.netty.buffer.Unpooled;
|
|
+import io.netty.channel.Channel;
|
|
+import io.netty.handler.codec.haproxy.HAProxyCommand;
|
|
+import io.netty.handler.codec.haproxy.HAProxyMessage;
|
|
+import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
|
|
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
|
|
+import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
|
|
+import org.apache.rocketmq.common.utils.NetworkUtil;
|
|
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
|
|
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
|
|
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
|
|
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
|
+import org.junit.Before;
|
|
+import org.junit.Test;
|
|
+import org.junit.runner.RunWith;
|
|
+import org.mockito.junit.MockitoJUnitRunner;
|
|
+
|
|
+import java.io.IOException;
|
|
+import java.lang.reflect.Method;
|
|
+import java.net.Socket;
|
|
+import java.time.Duration;
|
|
+import java.util.concurrent.TimeUnit;
|
|
+
|
|
+import static org.assertj.core.api.Assertions.assertThat;
|
|
+import static org.awaitility.Awaitility.await;
|
|
+import static org.junit.Assert.assertNotNull;
|
|
+
|
|
+@RunWith(MockitoJUnitRunner.class)
|
|
+public class ProxyProtocolTest {
|
|
+
|
|
+ private RemotingServer remotingServer;
|
|
+ private RemotingClient remotingClient;
|
|
+
|
|
+ @Before
|
|
+ public void setUp() throws Exception {
|
|
+ NettyClientConfig clientConfig = new NettyClientConfig();
|
|
+ clientConfig.setUseTLS(false);
|
|
+
|
|
+ remotingServer = RemotingServerTest.createRemotingServer();
|
|
+ remotingClient = RemotingServerTest.createRemotingClient(clientConfig);
|
|
+
|
|
+ await().pollDelay(Duration.ofMillis(10))
|
|
+ .pollInterval(Duration.ofMillis(10))
|
|
+ .atMost(20, TimeUnit.SECONDS).until(() -> isHostConnectable(getServerAddress()));
|
|
+ }
|
|
+
|
|
+ @Test
|
|
+ public void testProxyProtocol() throws Exception {
|
|
+ sendHAProxyMessage(remotingClient);
|
|
+ requestThenAssertResponse(remotingClient);
|
|
+ }
|
|
+
|
|
+ private void requestThenAssertResponse(RemotingClient remotingClient) throws Exception {
|
|
+ RemotingCommand response = remotingClient.invokeSync(getServerAddress(), createRequest(), 10000 * 3);
|
|
+ assertNotNull(response);
|
|
+ assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
|
|
+ assertThat(response.getExtFields()).hasSize(2);
|
|
+ assertThat(response.getExtFields().get("messageTitle")).isEqualTo("Welcome");
|
|
+ }
|
|
+
|
|
+ private void sendHAProxyMessage(RemotingClient remotingClient) throws Exception {
|
|
+ Method getAndCreateChannel = NettyRemotingClient.class.getDeclaredMethod("getAndCreateChannel", String.class);
|
|
+ getAndCreateChannel.setAccessible(true);
|
|
+ NettyRemotingClient nettyRemotingClient = (NettyRemotingClient) remotingClient;
|
|
+ Channel channel = (Channel) getAndCreateChannel.invoke(nettyRemotingClient, getServerAddress());
|
|
+ HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY,
|
|
+ HAProxyProxiedProtocol.TCP4, "127.0.0.1", "127.0.0.2", 8000, 9000);
|
|
+
|
|
+ ByteBuf byteBuf = Unpooled.directBuffer();
|
|
+ Method encode = HAProxyMessageEncoder.class.getDeclaredMethod("encodeV2", HAProxyMessage.class, ByteBuf.class);
|
|
+ encode.setAccessible(true);
|
|
+ encode.invoke(HAProxyMessageEncoder.INSTANCE, message, byteBuf);
|
|
+ channel.writeAndFlush(byteBuf).sync();
|
|
+ }
|
|
+
|
|
+ private static RemotingCommand createRequest() {
|
|
+ RequestHeader requestHeader = new RequestHeader();
|
|
+ requestHeader.setCount(1);
|
|
+ requestHeader.setMessageTitle("Welcome");
|
|
+ return RemotingCommand.createRequestCommand(0, requestHeader);
|
|
+ }
|
|
+
|
|
+
|
|
+ private String getServerAddress() {
|
|
+ return "localhost:" + remotingServer.localListenPort();
|
|
+ }
|
|
+
|
|
+ private boolean isHostConnectable(String addr) {
|
|
+ try (Socket socket = new Socket()) {
|
|
+ socket.connect(NetworkUtil.string2SocketAddress(addr));
|
|
+ return true;
|
|
+ } catch (IOException ignored) {
|
|
+ }
|
|
+ return false;
|
|
+ }
|
|
+}
|
|
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
|
|
index 3da7abf57..de7edbbfb 100644
|
|
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
|
|
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
|
|
@@ -17,19 +17,6 @@
|
|
|
|
package org.apache.rocketmq.remoting;
|
|
|
|
-import java.io.BufferedInputStream;
|
|
-import java.io.BufferedOutputStream;
|
|
-import java.io.BufferedWriter;
|
|
-import java.io.File;
|
|
-import java.io.FileOutputStream;
|
|
-import java.io.FileWriter;
|
|
-import java.io.IOException;
|
|
-import java.io.InputStream;
|
|
-import java.io.PrintWriter;
|
|
-import java.net.Socket;
|
|
-import java.time.Duration;
|
|
-import java.util.UUID;
|
|
-import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.common.utils.NetworkUtil;
|
|
import org.apache.rocketmq.remoting.common.TlsMode;
|
|
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
|
|
@@ -47,6 +34,20 @@ import org.junit.rules.TestName;
|
|
import org.junit.runner.RunWith;
|
|
import org.mockito.junit.MockitoJUnitRunner;
|
|
|
|
+import java.io.BufferedInputStream;
|
|
+import java.io.BufferedOutputStream;
|
|
+import java.io.BufferedWriter;
|
|
+import java.io.File;
|
|
+import java.io.FileOutputStream;
|
|
+import java.io.FileWriter;
|
|
+import java.io.IOException;
|
|
+import java.io.InputStream;
|
|
+import java.io.PrintWriter;
|
|
+import java.net.Socket;
|
|
+import java.time.Duration;
|
|
+import java.util.UUID;
|
|
+import java.util.concurrent.TimeUnit;
|
|
+
|
|
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_AUTHSERVER;
|
|
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_CERTPATH;
|
|
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_KEYPASSWORD;
|
|
@@ -234,6 +235,7 @@ public class TlsTest {
|
|
@Test
|
|
public void serverAcceptsUntrustedClientCert() throws Exception {
|
|
requestThenAssertResponse();
|
|
+// Thread.sleep(1000000L);
|
|
}
|
|
|
|
/**
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 4f840afcb04f5cc328795896198c6fba96ff37ec Mon Sep 17 00:00:00 2001
|
|
From: mxsm <ljbmxsm@gmail.com>
|
|
Date: Wed, 5 Jul 2023 11:03:52 +0800
|
|
Subject: [PATCH 3/5] [ISSUE #6960] Added Slot formatting sketch comments
|
|
(#6961)
|
|
|
|
---
|
|
.../java/org/apache/rocketmq/store/timer/Slot.java | 10 +++++++++-
|
|
1 file changed, 9 insertions(+), 1 deletion(-)
|
|
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java b/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java
|
|
index b91193b94..2da846cee 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java
|
|
@@ -16,9 +16,17 @@
|
|
*/
|
|
package org.apache.rocketmq.store.timer;
|
|
|
|
+/**
|
|
+ * Represents a slot of timing wheel. Format:
|
|
+ * ┌────────────┬───────────┬───────────┬───────────┬───────────┐
|
|
+ * │delayed time│ first pos │ last pos │ num │ magic │
|
|
+ * ├────────────┼───────────┼───────────┼───────────┼───────────┤
|
|
+ * │ 8bytes │ 8bytes │ 8bytes │ 4bytes │ 4bytes │
|
|
+ * └────────────┴───────────┴───────────┴───────────┴───────────┘
|
|
+ */
|
|
public class Slot {
|
|
public static final short SIZE = 32;
|
|
- public final long timeMs;
|
|
+ public final long timeMs; //delayed time
|
|
public final long firstPos;
|
|
public final long lastPos;
|
|
public final int num;
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 58550f074ec101c0a158ede0df1839950e08837a Mon Sep 17 00:00:00 2001
|
|
From: rongtong <jinrongtong5@163.com>
|
|
Date: Mon, 10 Jul 2023 14:13:18 +0800
|
|
Subject: [PATCH 4/5] [ISSUE #7008] Fix the issue of protocol parsing failure
|
|
when using haproxy and tls together (#7009)
|
|
|
|
---
|
|
.../remoting/netty/NettyRemotingServer.java | 14 +++++++-------
|
|
1 file changed, 7 insertions(+), 7 deletions(-)
|
|
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
index 94ffd8d07..445f06cc6 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
@@ -459,13 +459,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
}
|
|
|
|
@Override
|
|
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
|
|
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
|
|
try {
|
|
- ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(in);
|
|
- if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
|
|
+ ProtocolDetectionResult<HAProxyProtocolVersion> detectionResult = HAProxyMessageDecoder.detectProtocol(byteBuf);
|
|
+ if (detectionResult.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
|
|
return;
|
|
}
|
|
- if (ha.state() == ProtocolDetectionState.DETECTED) {
|
|
+ if (detectionResult.state() == ProtocolDetectionState.DETECTED) {
|
|
ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
|
|
.addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
|
|
.addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler);
|
|
@@ -481,7 +481,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
}
|
|
|
|
// Hand over this message to the next .
|
|
- ctx.fireChannelRead(in.retain());
|
|
+ ctx.fireChannelRead(byteBuf.retain());
|
|
} catch (Exception e) {
|
|
log.error("process proxy protocol negotiator failed.", e);
|
|
throw e;
|
|
@@ -503,8 +503,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
@Override
|
|
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
|
|
|
|
- // Peek the first byte to determine if the content is starting with TLS handshake
|
|
- byte b = msg.getByte(0);
|
|
+ // Peek the current read index byte to determine if the content is starting with TLS handshake
|
|
+ byte b = msg.getByte(msg.readerIndex());
|
|
|
|
if (b == HANDSHAKE_MAGIC_CODE) {
|
|
switch (tlsMode) {
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 8e6b5e62bd4da78c0a7d265891c52685fcffd08a Mon Sep 17 00:00:00 2001
|
|
From: Zhouxiang Zhan <zhouxzhan@apache.org>
|
|
Date: Mon, 10 Jul 2023 20:14:17 +0800
|
|
Subject: [PATCH 5/5] [ISSUE #6999] Add interface ReceiptHandleManager (#7000)
|
|
|
|
* Add interface ReceiptHandleManager
|
|
|
|
* fix unit test
|
|
|
|
* fix
|
|
---
|
|
.../processor/ReceiptHandleProcessor.java | 10 +-
|
|
.../receipt/DefaultReceiptHandleManager.java | 282 ++++++++++++++++++
|
|
.../service/receipt/ReceiptHandleManager.java | 260 +---------------
|
|
...a => DefaultReceiptHandleManagerTest.java} | 34 +--
|
|
4 files changed, 307 insertions(+), 279 deletions(-)
|
|
create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
|
|
rename proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/{ReceiptHandleManagerTest.java => DefaultReceiptHandleManagerTest.java} (93%)
|
|
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
|
|
index 9c7e8dea9..fc49e7622 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
|
|
@@ -28,12 +28,12 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.proxy.common.RenewEvent;
|
|
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
|
|
import org.apache.rocketmq.proxy.common.ProxyContext;
|
|
-import org.apache.rocketmq.proxy.service.receipt.ReceiptHandleManager;
|
|
+import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager;
|
|
import org.apache.rocketmq.proxy.service.ServiceManager;
|
|
|
|
public class ReceiptHandleProcessor extends AbstractProcessor {
|
|
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
|
|
- protected ReceiptHandleManager receiptHandleManager;
|
|
+ protected DefaultReceiptHandleManager receiptHandleManager;
|
|
|
|
public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) {
|
|
super(messagingProcessor, serviceManager);
|
|
@@ -51,7 +51,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor {
|
|
event.getFuture().complete(v);
|
|
});
|
|
};
|
|
- this.receiptHandleManager = new ReceiptHandleManager(serviceManager.getMetadataService(), serviceManager.getConsumerManager(), eventListener);
|
|
+ this.receiptHandleManager = new DefaultReceiptHandleManager(serviceManager.getMetadataService(), serviceManager.getConsumerManager(), eventListener);
|
|
}
|
|
|
|
protected ProxyContext createContext(String actionName) {
|
|
@@ -59,11 +59,11 @@ public class ReceiptHandleProcessor extends AbstractProcessor {
|
|
}
|
|
|
|
public void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) {
|
|
- receiptHandleManager.addReceiptHandle(channel, group, msgID, messageReceiptHandle);
|
|
+ receiptHandleManager.addReceiptHandle(ctx, channel, group, msgID, messageReceiptHandle);
|
|
}
|
|
|
|
public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle) {
|
|
- return receiptHandleManager.removeReceiptHandle(channel, group, msgID, receiptHandle);
|
|
+ return receiptHandleManager.removeReceiptHandle(ctx, channel, group, msgID, receiptHandle);
|
|
}
|
|
|
|
public static class ReceiptHandleGroupKey {
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
|
|
new file mode 100644
|
|
index 000000000..c7633d658
|
|
--- /dev/null
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
|
|
@@ -0,0 +1,282 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+
|
|
+package org.apache.rocketmq.proxy.service.receipt;
|
|
+
|
|
+import com.google.common.base.Stopwatch;
|
|
+import io.netty.channel.Channel;
|
|
+import java.util.Map;
|
|
+import java.util.Set;
|
|
+import java.util.concurrent.CompletableFuture;
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
+import java.util.concurrent.Executors;
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
+import java.util.concurrent.TimeUnit;
|
|
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
|
|
+import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
|
|
+import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
|
|
+import org.apache.rocketmq.broker.client.ConsumerManager;
|
|
+import org.apache.rocketmq.client.consumer.AckResult;
|
|
+import org.apache.rocketmq.client.consumer.AckStatus;
|
|
+import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
+import org.apache.rocketmq.common.constant.LoggerName;
|
|
+import org.apache.rocketmq.common.consumer.ReceiptHandle;
|
|
+import org.apache.rocketmq.common.state.StateEventListener;
|
|
+import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
|
|
+import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
|
|
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
|
|
+import org.apache.rocketmq.common.utils.StartAndShutdown;
|
|
+import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
+import org.apache.rocketmq.proxy.common.RenewEvent;
|
|
+import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
|
|
+import org.apache.rocketmq.proxy.common.ProxyContext;
|
|
+import org.apache.rocketmq.proxy.common.ProxyException;
|
|
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
|
|
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
|
|
+import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
|
|
+import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
|
|
+import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
|
|
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
|
+import org.apache.rocketmq.proxy.config.ProxyConfig;
|
|
+import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
|
|
+import org.apache.rocketmq.proxy.service.metadata.MetadataService;
|
|
+import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
|
|
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
|
|
+
|
|
+public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implements ReceiptHandleManager {
|
|
+ protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
|
|
+ protected final MetadataService metadataService;
|
|
+ protected final ConsumerManager consumerManager;
|
|
+ protected final ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap;
|
|
+ protected final StateEventListener<RenewEvent> eventListener;
|
|
+ protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy();
|
|
+ protected final ScheduledExecutorService scheduledExecutorService =
|
|
+ Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
|
|
+ protected final ThreadPoolExecutor renewalWorkerService;
|
|
+
|
|
+ public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
|
|
+ this.metadataService = metadataService;
|
|
+ this.consumerManager = consumerManager;
|
|
+ this.eventListener = eventListener;
|
|
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
|
|
+ this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
|
|
+ proxyConfig.getRenewThreadPoolNums(),
|
|
+ proxyConfig.getRenewMaxThreadPoolNums(),
|
|
+ 1, TimeUnit.MINUTES,
|
|
+ "RenewalWorkerThread",
|
|
+ proxyConfig.getRenewThreadPoolQueueCapacity()
|
|
+ );
|
|
+ consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
|
|
+ @Override
|
|
+ public void handle(ConsumerGroupEvent event, String group, Object... args) {
|
|
+ if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) {
|
|
+ if (args == null || args.length < 1) {
|
|
+ return;
|
|
+ }
|
|
+ if (args[0] instanceof ClientChannelInfo) {
|
|
+ ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
|
|
+ if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
|
|
+ // if the channel sync from other proxy is expired, not to clear data of connect to current proxy
|
|
+ return;
|
|
+ }
|
|
+ clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
|
|
+ log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void shutdown() {
|
|
+
|
|
+ }
|
|
+ });
|
|
+ this.receiptHandleGroupMap = new ConcurrentHashMap<>();
|
|
+ this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
|
|
+ this.appendStartAndShutdown(new StartAndShutdown() {
|
|
+ @Override
|
|
+ public void start() throws Exception {
|
|
+ scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0,
|
|
+ ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void shutdown() throws Exception {
|
|
+ scheduledExecutorService.shutdown();
|
|
+ clearAllHandle();
|
|
+ }
|
|
+ });
|
|
+ }
|
|
+
|
|
+ public void addReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) {
|
|
+ ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group),
|
|
+ k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
|
|
+ }
|
|
+
|
|
+ public MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, String receiptHandle) {
|
|
+ ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group));
|
|
+ if (handleGroup == null) {
|
|
+ return null;
|
|
+ }
|
|
+ return handleGroup.remove(msgID, receiptHandle);
|
|
+ }
|
|
+
|
|
+ protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) {
|
|
+ return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null;
|
|
+ }
|
|
+
|
|
+ protected void scheduleRenewTask() {
|
|
+ Stopwatch stopwatch = Stopwatch.createStarted();
|
|
+ try {
|
|
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
|
|
+ for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
|
|
+ ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey();
|
|
+ if (clientIsOffline(key)) {
|
|
+ clearGroup(key);
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ ReceiptHandleGroup group = entry.getValue();
|
|
+ group.scan((msgID, handleStr, v) -> {
|
|
+ long current = System.currentTimeMillis();
|
|
+ ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr());
|
|
+ if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) {
|
|
+ return;
|
|
+ }
|
|
+ renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr));
|
|
+ });
|
|
+ }
|
|
+ } catch (Exception e) {
|
|
+ log.error("unexpect error when schedule renew task", e);
|
|
+ }
|
|
+
|
|
+ log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis());
|
|
+ }
|
|
+
|
|
+ protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) {
|
|
+ try {
|
|
+ group.computeIfPresent(msgID, handleStr, this::startRenewMessage);
|
|
+ } catch (Exception e) {
|
|
+ log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
|
|
+ CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>();
|
|
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
|
|
+ long current = System.currentTimeMillis();
|
|
+ try {
|
|
+ if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) {
|
|
+ log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle);
|
|
+ return CompletableFuture.completedFuture(null);
|
|
+ }
|
|
+ if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
|
|
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
|
|
+ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future));
|
|
+ future.whenComplete((ackResult, throwable) -> {
|
|
+ if (throwable != null) {
|
|
+ log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
|
|
+ if (renewExceptionNeedRetry(throwable)) {
|
|
+ messageReceiptHandle.incrementAndGetRenewRetryTimes();
|
|
+ resFuture.complete(messageReceiptHandle);
|
|
+ } else {
|
|
+ resFuture.complete(null);
|
|
+ }
|
|
+ } else if (AckStatus.OK.equals(ackResult.getStatus())) {
|
|
+ messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo());
|
|
+ messageReceiptHandle.resetRenewRetryTimes();
|
|
+ messageReceiptHandle.incrementRenewTimes();
|
|
+ resFuture.complete(messageReceiptHandle);
|
|
+ } else {
|
|
+ log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle);
|
|
+ resFuture.complete(null);
|
|
+ }
|
|
+ });
|
|
+ } else {
|
|
+ ProxyContext context = createContext("RenewMessage");
|
|
+ SubscriptionGroupConfig subscriptionGroupConfig =
|
|
+ metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup());
|
|
+ if (subscriptionGroupConfig == null) {
|
|
+ log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle);
|
|
+ return CompletableFuture.completedFuture(null);
|
|
+ }
|
|
+ RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
|
|
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
|
|
+ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future));
|
|
+ future.whenComplete((ackResult, throwable) -> {
|
|
+ if (throwable != null) {
|
|
+ log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable);
|
|
+ }
|
|
+ resFuture.complete(null);
|
|
+ });
|
|
+ }
|
|
+ } catch (Throwable t) {
|
|
+ log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t);
|
|
+ resFuture.complete(null);
|
|
+ }
|
|
+ return resFuture;
|
|
+ }
|
|
+
|
|
+ protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) {
|
|
+ if (key == null) {
|
|
+ return;
|
|
+ }
|
|
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
|
|
+ ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key);
|
|
+ if (handleGroup == null) {
|
|
+ return;
|
|
+ }
|
|
+ handleGroup.scan((msgID, handle, v) -> {
|
|
+ try {
|
|
+ handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> {
|
|
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
|
|
+ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future));
|
|
+ return CompletableFuture.completedFuture(null);
|
|
+ });
|
|
+ } catch (Exception e) {
|
|
+ log.error("error when clear handle for group. key:{}", key, e);
|
|
+ }
|
|
+ });
|
|
+ }
|
|
+
|
|
+ protected void clearAllHandle() {
|
|
+ log.info("start clear all handle in receiptHandleProcessor");
|
|
+ Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet();
|
|
+ for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) {
|
|
+ clearGroup(key);
|
|
+ }
|
|
+ log.info("clear all handle in receiptHandleProcessor done");
|
|
+ }
|
|
+
|
|
+ protected boolean renewExceptionNeedRetry(Throwable t) {
|
|
+ t = ExceptionUtils.getRealException(t);
|
|
+ if (t instanceof ProxyException) {
|
|
+ ProxyException proxyException = (ProxyException) t;
|
|
+ if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) ||
|
|
+ ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) {
|
|
+ return false;
|
|
+ }
|
|
+ }
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ protected ProxyContext createContext(String actionName) {
|
|
+ return ProxyContext.createForInner(this.getClass().getSimpleName() + actionName);
|
|
+ }
|
|
+}
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
|
|
index f3b805624..6a8888e97 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
|
|
@@ -17,266 +17,12 @@
|
|
|
|
package org.apache.rocketmq.proxy.service.receipt;
|
|
|
|
-import com.google.common.base.Stopwatch;
|
|
import io.netty.channel.Channel;
|
|
-import java.util.Map;
|
|
-import java.util.Set;
|
|
-import java.util.concurrent.CompletableFuture;
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
-import java.util.concurrent.ConcurrentMap;
|
|
-import java.util.concurrent.Executors;
|
|
-import java.util.concurrent.ScheduledExecutorService;
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
-import java.util.concurrent.TimeUnit;
|
|
-import org.apache.rocketmq.broker.client.ClientChannelInfo;
|
|
-import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
|
|
-import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
|
|
-import org.apache.rocketmq.broker.client.ConsumerManager;
|
|
-import org.apache.rocketmq.client.consumer.AckResult;
|
|
-import org.apache.rocketmq.client.consumer.AckStatus;
|
|
-import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
-import org.apache.rocketmq.common.constant.LoggerName;
|
|
-import org.apache.rocketmq.common.consumer.ReceiptHandle;
|
|
-import org.apache.rocketmq.common.state.StateEventListener;
|
|
-import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
|
|
-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
|
|
-import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
|
|
-import org.apache.rocketmq.common.utils.StartAndShutdown;
|
|
-import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
-import org.apache.rocketmq.proxy.common.RenewEvent;
|
|
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
|
|
import org.apache.rocketmq.proxy.common.ProxyContext;
|
|
-import org.apache.rocketmq.proxy.common.ProxyException;
|
|
-import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
|
|
-import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
|
|
-import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
|
|
-import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
|
|
-import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
|
|
-import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
|
-import org.apache.rocketmq.proxy.config.ProxyConfig;
|
|
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
|
|
-import org.apache.rocketmq.proxy.service.metadata.MetadataService;
|
|
-import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
|
|
-import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
|
|
|
|
-public class ReceiptHandleManager extends AbstractStartAndShutdown {
|
|
- protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
|
|
- protected final MetadataService metadataService;
|
|
- protected final ConsumerManager consumerManager;
|
|
- protected final ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap;
|
|
- protected final StateEventListener<RenewEvent> eventListener;
|
|
- protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy();
|
|
- protected final ScheduledExecutorService scheduledExecutorService =
|
|
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
|
|
- protected final ThreadPoolExecutor renewalWorkerService;
|
|
+public interface ReceiptHandleManager {
|
|
+ void addReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle);
|
|
|
|
- public ReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
|
|
- this.metadataService = metadataService;
|
|
- this.consumerManager = consumerManager;
|
|
- this.eventListener = eventListener;
|
|
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
|
|
- this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
|
|
- proxyConfig.getRenewThreadPoolNums(),
|
|
- proxyConfig.getRenewMaxThreadPoolNums(),
|
|
- 1, TimeUnit.MINUTES,
|
|
- "RenewalWorkerThread",
|
|
- proxyConfig.getRenewThreadPoolQueueCapacity()
|
|
- );
|
|
- consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
|
|
- @Override
|
|
- public void handle(ConsumerGroupEvent event, String group, Object... args) {
|
|
- if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) {
|
|
- if (args == null || args.length < 1) {
|
|
- return;
|
|
- }
|
|
- if (args[0] instanceof ClientChannelInfo) {
|
|
- ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
|
|
- if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
|
|
- // if the channel sync from other proxy is expired, not to clear data of connect to current proxy
|
|
- return;
|
|
- }
|
|
- clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
|
|
- log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
|
|
- }
|
|
- }
|
|
- }
|
|
-
|
|
- @Override
|
|
- public void shutdown() {
|
|
-
|
|
- }
|
|
- });
|
|
- this.receiptHandleGroupMap = new ConcurrentHashMap<>();
|
|
- this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
|
|
- this.appendStartAndShutdown(new StartAndShutdown() {
|
|
- @Override
|
|
- public void start() throws Exception {
|
|
- scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0,
|
|
- ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS);
|
|
- }
|
|
-
|
|
- @Override
|
|
- public void shutdown() throws Exception {
|
|
- scheduledExecutorService.shutdown();
|
|
- clearAllHandle();
|
|
- }
|
|
- });
|
|
- }
|
|
-
|
|
- public void addReceiptHandle(Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) {
|
|
- ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group),
|
|
- k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
|
|
- }
|
|
-
|
|
- public MessageReceiptHandle removeReceiptHandle(Channel channel, String group, String msgID, String receiptHandle) {
|
|
- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group));
|
|
- if (handleGroup == null) {
|
|
- return null;
|
|
- }
|
|
- return handleGroup.remove(msgID, receiptHandle);
|
|
- }
|
|
-
|
|
- protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) {
|
|
- return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null;
|
|
- }
|
|
-
|
|
- public void scheduleRenewTask() {
|
|
- Stopwatch stopwatch = Stopwatch.createStarted();
|
|
- try {
|
|
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
|
|
- for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
|
|
- ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey();
|
|
- if (clientIsOffline(key)) {
|
|
- clearGroup(key);
|
|
- continue;
|
|
- }
|
|
-
|
|
- ReceiptHandleGroup group = entry.getValue();
|
|
- group.scan((msgID, handleStr, v) -> {
|
|
- long current = System.currentTimeMillis();
|
|
- ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr());
|
|
- if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) {
|
|
- return;
|
|
- }
|
|
- renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr));
|
|
- });
|
|
- }
|
|
- } catch (Exception e) {
|
|
- log.error("unexpect error when schedule renew task", e);
|
|
- }
|
|
-
|
|
- log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis());
|
|
- }
|
|
-
|
|
- protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) {
|
|
- try {
|
|
- group.computeIfPresent(msgID, handleStr, this::startRenewMessage);
|
|
- } catch (Exception e) {
|
|
- log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e);
|
|
- }
|
|
- }
|
|
-
|
|
- protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
|
|
- CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>();
|
|
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
|
|
- long current = System.currentTimeMillis();
|
|
- try {
|
|
- if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) {
|
|
- log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle);
|
|
- return CompletableFuture.completedFuture(null);
|
|
- }
|
|
- if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
|
|
- CompletableFuture<AckResult> future = new CompletableFuture<>();
|
|
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future));
|
|
- future.whenComplete((ackResult, throwable) -> {
|
|
- if (throwable != null) {
|
|
- log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
|
|
- if (renewExceptionNeedRetry(throwable)) {
|
|
- messageReceiptHandle.incrementAndGetRenewRetryTimes();
|
|
- resFuture.complete(messageReceiptHandle);
|
|
- } else {
|
|
- resFuture.complete(null);
|
|
- }
|
|
- } else if (AckStatus.OK.equals(ackResult.getStatus())) {
|
|
- messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo());
|
|
- messageReceiptHandle.resetRenewRetryTimes();
|
|
- messageReceiptHandle.incrementRenewTimes();
|
|
- resFuture.complete(messageReceiptHandle);
|
|
- } else {
|
|
- log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle);
|
|
- resFuture.complete(null);
|
|
- }
|
|
- });
|
|
- } else {
|
|
- ProxyContext context = createContext("RenewMessage");
|
|
- SubscriptionGroupConfig subscriptionGroupConfig =
|
|
- metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup());
|
|
- if (subscriptionGroupConfig == null) {
|
|
- log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle);
|
|
- return CompletableFuture.completedFuture(null);
|
|
- }
|
|
- RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
|
|
- CompletableFuture<AckResult> future = new CompletableFuture<>();
|
|
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future));
|
|
- future.whenComplete((ackResult, throwable) -> {
|
|
- if (throwable != null) {
|
|
- log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable);
|
|
- }
|
|
- resFuture.complete(null);
|
|
- });
|
|
- }
|
|
- } catch (Throwable t) {
|
|
- log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t);
|
|
- resFuture.complete(null);
|
|
- }
|
|
- return resFuture;
|
|
- }
|
|
-
|
|
- protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) {
|
|
- if (key == null) {
|
|
- return;
|
|
- }
|
|
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
|
|
- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key);
|
|
- if (handleGroup == null) {
|
|
- return;
|
|
- }
|
|
- handleGroup.scan((msgID, handle, v) -> {
|
|
- try {
|
|
- handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> {
|
|
- CompletableFuture<AckResult> future = new CompletableFuture<>();
|
|
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future));
|
|
- return CompletableFuture.completedFuture(null);
|
|
- });
|
|
- } catch (Exception e) {
|
|
- log.error("error when clear handle for group. key:{}", key, e);
|
|
- }
|
|
- });
|
|
- }
|
|
-
|
|
- public void clearAllHandle() {
|
|
- log.info("start clear all handle in receiptHandleProcessor");
|
|
- Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet();
|
|
- for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) {
|
|
- clearGroup(key);
|
|
- }
|
|
- log.info("clear all handle in receiptHandleProcessor done");
|
|
- }
|
|
-
|
|
- protected boolean renewExceptionNeedRetry(Throwable t) {
|
|
- t = ExceptionUtils.getRealException(t);
|
|
- if (t instanceof ProxyException) {
|
|
- ProxyException proxyException = (ProxyException) t;
|
|
- if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) ||
|
|
- ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) {
|
|
- return false;
|
|
- }
|
|
- }
|
|
- return true;
|
|
- }
|
|
-
|
|
- protected ProxyContext createContext(String actionName) {
|
|
- return ProxyContext.createForInner(this.getClass().getSimpleName() + actionName);
|
|
- }
|
|
+ MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, String receiptHandle);
|
|
}
|
|
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
|
|
similarity index 93%
|
|
rename from proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java
|
|
rename to proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
|
|
index 877c9fd6f..7c6943e44 100644
|
|
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java
|
|
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
|
|
@@ -62,8 +62,8 @@ import static org.awaitility.Awaitility.await;
|
|
import static org.junit.Assert.assertEquals;
|
|
import static org.junit.Assert.assertTrue;
|
|
|
|
-public class ReceiptHandleManagerTest extends BaseServiceTest {
|
|
- private ReceiptHandleManager receiptHandleManager;
|
|
+public class DefaultReceiptHandleManagerTest extends BaseServiceTest {
|
|
+ private DefaultReceiptHandleManager receiptHandleManager;
|
|
@Mock
|
|
protected MessagingProcessor messagingProcessor;
|
|
@Mock
|
|
@@ -87,7 +87,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
|
|
|
|
@Before
|
|
public void setup() {
|
|
- receiptHandleManager = new ReceiptHandleManager(metadataService, consumerManager, new StateEventListener<RenewEvent>() {
|
|
+ receiptHandleManager = new DefaultReceiptHandleManager(metadataService, consumerManager, new StateEventListener<RenewEvent>() {
|
|
@Override
|
|
public void fireEvent(RenewEvent event) {
|
|
MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle();
|
|
@@ -125,7 +125,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
|
|
@Test
|
|
public void testAddReceiptHandle() {
|
|
Channel channel = new LocalChannel();
|
|
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
|
|
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
|
|
receiptHandleManager.scheduleRenewTask();
|
|
@@ -152,9 +152,9 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
|
|
.build().encode();
|
|
MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
|
|
RECONSUME_TIMES);
|
|
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
}
|
|
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
|
|
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
|
|
receiptHandleManager.scheduleRenewTask();
|
|
@@ -170,7 +170,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
|
|
public void testRenewReceiptHandle() {
|
|
ProxyConfig config = ConfigurationManager.getProxyConfig();
|
|
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
|
|
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
|
|
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
|
|
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
|
|
@@ -216,7 +216,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
|
|
public void testRenewExceedMaxRenewTimes() {
|
|
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
|
|
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
|
|
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
|
|
CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
|
|
ackResultFuture.completeExceptionally(new MQClientException(0, "error"));
|
|
@@ -246,7 +246,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
|
|
public void testRenewWithInvalidHandle() {
|
|
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
|
|
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
|
|
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
|
|
CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
|
|
ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
|
|
@@ -270,7 +270,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
|
|
ProxyConfig config = ConfigurationManager.getProxyConfig();
|
|
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
|
|
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
|
|
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
|
|
AtomicInteger count = new AtomicInteger(0);
|
|
List<CompletableFuture<AckResult>> futureList = new ArrayList<>();
|
|
@@ -348,7 +348,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
|
|
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
|
|
RECONSUME_TIMES);
|
|
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
|
|
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
|
|
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
|
|
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
|
|
@@ -382,7 +382,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
|
|
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
|
|
RECONSUME_TIMES);
|
|
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
|
|
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
|
|
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(null);
|
|
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()))
|
|
@@ -418,7 +418,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
|
|
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
|
|
RECONSUME_TIMES);
|
|
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
|
|
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
|
|
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
|
|
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
|
|
@@ -431,8 +431,8 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
|
|
@Test
|
|
public void testRemoveReceiptHandle() {
|
|
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
|
|
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
- receiptHandleManager.removeReceiptHandle(channel, GROUP, MSG_ID, receiptHandle);
|
|
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
+ receiptHandleManager.removeReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle);
|
|
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
|
|
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
|
|
receiptHandleManager.scheduleRenewTask();
|
|
@@ -444,7 +444,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
|
|
@Test
|
|
public void testClearGroup() {
|
|
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
|
|
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
receiptHandleManager.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP));
|
|
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
|
|
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
|
|
@@ -459,7 +459,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
|
|
ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor = ArgumentCaptor.forClass(ConsumerIdsChangeListener.class);
|
|
Mockito.verify(consumerManager, Mockito.times(1)).appendConsumerIdsChangeListener(listenerArgumentCaptor.capture());
|
|
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
|
|
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
|
|
listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0));
|
|
assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty());
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|