rocketmq/patch005-backport-fix-some-bugs.patch
2023-09-15 15:23:50 +08:00

1539 lines
78 KiB
Diff

From 15c6889bb0abd014c06ef1452f791db9daa1ea08 Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Tue, 11 Jul 2023 17:04:00 +0800
Subject: [PATCH 1/8] fix receive message activity attempt id not correct
(#7012)
fix receive message activity attempt id not correct
---
.../proxy/grpc/v2/consumer/ReceiveMessageActivity.java | 2 +-
.../proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java | 6 +++---
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index a504179a9..cf58bb87a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -130,7 +130,7 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
subscriptionData,
fifo,
new PopMessageResultFilterImpl(maxAttempts),
- request.getAttemptId(),
+ request.hasAttemptId() ? request.getAttemptId() : null,
timeRemaining
).thenAccept(popResult -> {
if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) {
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index 2e562504a..7fd9a9ffd 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -57,6 +57,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -89,7 +90,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
.setRequestTimeout(Durations.fromSeconds(3))
.build());
when(this.messagingProcessor.popMessage(any(), any(), anyString(), anyString(), anyInt(), anyLong(),
- pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(), anyString(), anyLong()))
+ pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong()))
.thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList())));
@@ -223,7 +224,6 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
assertEquals(Code.ILLEGAL_INVISIBLE_TIME, getResponseCodeFromReceiveMessageResponseList(responseArgumentCaptor.getAllValues()));
}
-
@Test
public void testReceiveMessage() {
StreamObserver<ReceiveMessageResponse> receiveStreamObserver = mock(ServerCallStreamObserver.class);
@@ -245,7 +245,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
any(),
anyBoolean(),
any(),
- anyString(),
+ isNull(),
anyLong())).thenReturn(CompletableFuture.completedFuture(popResult));
this.receiveMessageActivity.receiveMessage(
--
2.32.0.windows.2
From b4496be68705c1c0b282a07a1adeab4fffd670fe Mon Sep 17 00:00:00 2001
From: ShuangxiDing <dingshuangxi888@gmail.com>
Date: Tue, 11 Jul 2023 19:09:09 +0800
Subject: [PATCH 2/8] [ISSUE #7010] Fix the HandshakeHandler returns when
detect haproxy version need more data (#7011)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator
* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC server.
* Support proxy protocol for gRPC and Remoting server.
* 回滚netty的升级
* Support proxy protocol for gRPC and Remoting server.
* Support proxy protocol for gRPC and Remoting server.
* Support proxy protocol for gRPC and Remoting server.
* add grpc-netty-codec-haproxy in bazel
* add grpc-netty-codec-haproxy in bazel
* Support proxy protocol for gRPC and Remoting server.
* Fix Test
* add grpc-netty-codec-haproxy in bazel
* add ProxyProtocolTest for Remoting
* Move AttributeKey from RemotingHelper to AttributeKey.
* Fix the needs more data for HandshakeHandler.
* Fix the needs more data for HandshakeHandler.
* Fix the needs more data for HandshakeHandler.
* Fix the needs more data for HandshakeHandler.
---------
Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com>
---
.../remoting/MultiProtocolRemotingServer.java | 2 +-
.../activity/AbstractRemotingActivity.java | 16 +++++++------
.../activity/ClientManagerActivity.java | 24 ++++++++++---------
.../AbstractRemotingActivityTest.java | 10 ++++----
.../remoting/common/RemotingHelper.java | 21 ++++------------
.../remoting/netty/AttributeKeys.java | 11 ++++++++-
.../remoting/netty/NettyRemotingServer.java | 23 ++++++------------
7 files changed, 51 insertions(+), 56 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
index 858b1f022..12d728fff 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
@@ -78,7 +78,7 @@ public class MultiProtocolRemotingServer extends NettyRemotingServer {
@Override
protected ChannelPipeline configChannel(SocketChannel ch) {
return ch.pipeline()
- .addLast(this.getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, this.getHandshakeHandler())
+ .addLast(this.getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
.addLast(this.getDefaultEventExecutorGroup(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new ProtocolNegotiationHandler(this.remotingProtocolHandler)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
index 78cd203ec..ce4a63397 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
@@ -19,9 +19,6 @@ package org.apache.rocketmq.proxy.remoting.activity;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -41,11 +38,16 @@ import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
public abstract class AbstractRemotingActivity implements NettyRequestProcessor {
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected final MessagingProcessor messagingProcessor;
@@ -126,13 +128,13 @@ public abstract class AbstractRemotingActivity implements NettyRequestProcessor
.setProtocolType(ChannelProtocolType.REMOTING.getName())
.setChannel(channel)
.setLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress()))
- .setRemoteAddress(NetworkUtil.socketAddress2String(ctx.channel().remoteAddress()));
+ .setRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
- Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.LANGUAGE_CODE_KEY, channel))
+ Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY, channel))
.ifPresent(language -> context.setLanguage(language.name()));
- Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.CLIENT_ID_KEY, channel))
+ Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY, channel))
.ifPresent(context::setClientID);
- Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.VERSION_KEY, channel))
+ Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY, channel))
.ifPresent(version -> context.setClientVersion(MQVersion.getVersionDesc(version)));
return context;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
index 1eb81ce92..c671593a3 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
@@ -19,13 +19,20 @@ package org.apache.rocketmq.proxy.remoting.activity;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import java.util.Set;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerChangeListener;
import org.apache.rocketmq.broker.client.ProducerGroupEvent;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
@@ -33,13 +40,8 @@ import org.apache.rocketmq.remoting.protocol.header.UnregisterClientResponseHead
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.remoting.protocol.heartbeat.ProducerData;
-import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.processor.MessagingProcessor;
-import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
-import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
-import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import java.util.Set;
public class ClientManagerActivity extends AbstractRemotingActivity {
@@ -108,9 +110,9 @@ public class ClientManagerActivity extends AbstractRemotingActivity {
if (channel instanceof RemotingChannel) {
RemotingChannel remotingChannel = (RemotingChannel) channel;
Channel parent = remotingChannel.parent();
- RemotingHelper.setPropertyToAttr(parent, RemotingHelper.CLIENT_ID_KEY, clientChannelInfo.getClientId());
- RemotingHelper.setPropertyToAttr(parent, RemotingHelper.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage());
- RemotingHelper.setPropertyToAttr(parent, RemotingHelper.VERSION_KEY, clientChannelInfo.getVersion());
+ RemotingHelper.setPropertyToAttr(parent, AttributeKeys.CLIENT_ID_KEY, clientChannelInfo.getClientId());
+ RemotingHelper.setPropertyToAttr(parent, AttributeKeys.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage());
+ RemotingHelper.setPropertyToAttr(parent, AttributeKeys.VERSION_KEY, clientChannelInfo.getVersion());
}
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
index 663a83e3c..b2bd3a35f 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
@@ -21,7 +21,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
-import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -35,6 +34,7 @@ import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
@@ -48,6 +48,8 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
+import java.util.concurrent.CompletableFuture;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -82,9 +84,9 @@ public class AbstractRemotingActivityTest extends InitConfigTest {
}
};
Channel channel = ctx.channel();
- RemotingHelper.setPropertyToAttr(channel, RemotingHelper.CLIENT_ID_KEY, CLIENT_ID);
- RemotingHelper.setPropertyToAttr(channel, RemotingHelper.LANGUAGE_CODE_KEY, LanguageCode.JAVA);
- RemotingHelper.setPropertyToAttr(channel, RemotingHelper.VERSION_KEY, MQVersion.CURRENT_VERSION);
+ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.CLIENT_ID_KEY, CLIENT_ID);
+ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.LANGUAGE_CODE_KEY, LanguageCode.JAVA);
+ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.VERSION_KEY, MQVersion.CURRENT_VERSION);
}
@Test
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index d0750b678..363b22eac 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -22,7 +22,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -31,8 +30,8 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
@@ -51,16 +50,6 @@ public class RemotingHelper {
public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0";
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
- private static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr");
-
- private static final AttributeKey<String> PROXY_PROTOCOL_ADDR = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
- private static final AttributeKey<String> PROXY_PROTOCOL_PORT = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
-
- public static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf("ClientId");
-
- public static final AttributeKey<Integer> VERSION_KEY = AttributeKey.valueOf("Version");
-
- public static final AttributeKey<LanguageCode> LANGUAGE_CODE_KEY = AttributeKey.valueOf("LanguageCode");
public static final Map<Integer, String> REQUEST_CODE_MAP = new HashMap<Integer, String>() {
{
@@ -213,7 +202,7 @@ public class RemotingHelper {
if (StringUtils.isNotBlank(addr)) {
return addr;
}
- Attribute<String> att = channel.attr(REMOTE_ADDR_KEY);
+ Attribute<String> att = channel.attr(AttributeKeys.REMOTE_ADDR_KEY);
if (att == null) {
// mocked in unit test
return parseChannelRemoteAddr0(channel);
@@ -227,11 +216,11 @@ public class RemotingHelper {
}
private static String getProxyProtocolAddress(Channel channel) {
- if (!channel.hasAttr(PROXY_PROTOCOL_ADDR)) {
+ if (!channel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
return null;
}
- String proxyProtocolAddr = getAttributeValue(PROXY_PROTOCOL_ADDR, channel);
- String proxyProtocolPort = getAttributeValue(PROXY_PROTOCOL_PORT, channel);
+ String proxyProtocolAddr = getAttributeValue(AttributeKeys.PROXY_PROTOCOL_ADDR, channel);
+ String proxyProtocolPort = getAttributeValue(AttributeKeys.PROXY_PROTOCOL_PORT, channel);
if (StringUtils.isBlank(proxyProtocolAddr) || proxyProtocolPort == null) {
return null;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
index 4e69ab82d..ebdde31f4 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
@@ -19,12 +19,21 @@ package org.apache.rocketmq.remoting.netty;
import io.netty.util.AttributeKey;
import org.apache.rocketmq.common.constant.HAProxyConstants;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class AttributeKeys {
+ public static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr");
+
+ public static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf("ClientId");
+
+ public static final AttributeKey<Integer> VERSION_KEY = AttributeKey.valueOf("Version");
+
+ public static final AttributeKey<LanguageCode> LANGUAGE_CODE_KEY = AttributeKey.valueOf("LanguageCode");
+
public static final AttributeKey<String> PROXY_PROTOCOL_ADDR =
AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
@@ -40,6 +49,6 @@ public class AttributeKeys {
private static final Map<String, AttributeKey<String>> ATTRIBUTE_KEY_MAP = new ConcurrentHashMap<>();
public static AttributeKey<String> valueOf(String name) {
- return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKeys::valueOf);
+ return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKey::valueOf);
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 445f06cc6..8ae87a6fa 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -37,6 +37,7 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.ProtocolDetectionResult;
import io.netty.handler.codec.ProtocolDetectionState;
import io.netty.handler.codec.haproxy.HAProxyMessage;
@@ -73,6 +74,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
+import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -115,7 +117,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
// sharable handlers
- private HandshakeHandler handshakeHandler;
+ private TlsModeHandler tlsModeHandler;
private NettyEncoder encoder;
private NettyConnectManageHandler connectionManageHandler;
private NettyServerHandler serverHandler;
@@ -265,7 +267,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
*/
protected ChannelPipeline configChannel(SocketChannel ch) {
return ch.pipeline()
- .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
+ .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
@@ -402,7 +404,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
private void prepareSharableHandlers() {
- handshakeHandler = new HandshakeHandler();
+ tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
encoder = new NettyEncoder();
connectionManageHandler = new NettyConnectManageHandler();
serverHandler = new NettyServerHandler();
@@ -429,10 +431,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
return defaultEventExecutorGroup;
}
- public HandshakeHandler getHandshakeHandler() {
- return handshakeHandler;
- }
-
public NettyEncoder getEncoder() {
return encoder;
}
@@ -449,17 +447,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
return distributionHandler;
}
- @ChannelHandler.Sharable
- public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
-
- private final TlsModeHandler tlsModeHandler;
+ public class HandshakeHandler extends ByteToMessageDecoder {
public HandshakeHandler() {
- tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
}
@Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
+ protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
try {
ProtocolDetectionResult<HAProxyProtocolVersion> detectionResult = HAProxyMessageDecoder.detectProtocol(byteBuf);
if (detectionResult.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
@@ -479,9 +473,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
} catch (NoSuchElementException e) {
log.error("Error while removing HandshakeHandler", e);
}
-
- // Hand over this message to the next .
- ctx.fireChannelRead(byteBuf.retain());
} catch (Exception e) {
log.error("process proxy protocol negotiator failed.", e);
throw e;
--
2.32.0.windows.2
From 1f0f3b2d6d16de7b315c702d33f7d3557c0fc25c Mon Sep 17 00:00:00 2001
From: Ji Juntao <juntao.jjt@alibaba-inc.com>
Date: Tue, 11 Jul 2023 21:13:06 +0800
Subject: [PATCH 3/8] [ISSUE #7013] Polish ColdDataCheckService's logic (#7014)
* polish coldCtrl
* remove the catch.
---
.../src/main/java/org/apache/rocketmq/store/CommitLog.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 5a5c90c5a..e6ee3bacc 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -2089,6 +2089,11 @@ public class CommitLog implements Swappable {
} else {
this.waitForRunning(defaultMessageStore.getMessageStoreConfig().getTimerColdDataCheckIntervalMs());
}
+
+ if (pageSize < 0) {
+ initPageSize();
+ }
+
long beginClockTimestamp = this.systemClock.now();
scanFilesInPageCache();
long costTime = this.systemClock.now() - beginClockTimestamp;
@@ -2182,7 +2187,7 @@ public class CommitLog implements Swappable {
}
private void initPageSize() {
- if (pageSize < 0) {
+ if (pageSize < 0 && defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) {
try {
if (!MixAll.isWindows()) {
pageSize = LibC.INSTANCE.getpagesize();
--
2.32.0.windows.2
From d206590692bfdffca6bc58327e9533bc4bb68122 Mon Sep 17 00:00:00 2001
From: Lei Zhiyuan <leizhiyuan@gmail.com>
Date: Thu, 13 Jul 2023 11:17:38 +0800
Subject: [PATCH 4/8] [ISSUE #6979] Fix opaque will be duplicate in multi
client scene (#6985)
---
.../proxy/processor/DefaultMessagingProcessor.java | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 1b3f0af4e..188cb7b9b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -235,13 +235,23 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen
@Override
public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
long timeoutMillis) {
- return this.requestBrokerProcessor.request(ctx, brokerName, request, timeoutMillis);
+ int originalRequestOpaque = request.getOpaque();
+ request.setOpaque(RemotingCommand.createNewRequestId());
+ return this.requestBrokerProcessor.request(ctx, brokerName, request, timeoutMillis).thenApply(r -> {
+ request.setOpaque(originalRequestOpaque);
+ return r;
+ });
}
@Override
public CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request,
long timeoutMillis) {
- return this.requestBrokerProcessor.requestOneway(ctx, brokerName, request, timeoutMillis);
+ int originalRequestOpaque = request.getOpaque();
+ request.setOpaque(RemotingCommand.createNewRequestId());
+ return this.requestBrokerProcessor.requestOneway(ctx, brokerName, request, timeoutMillis).thenApply(r -> {
+ request.setOpaque(originalRequestOpaque);
+ return r;
+ });
}
@Override
--
2.32.0.windows.2
From 33cb22e1c0fa7ba980567117230fe443ff5dbd62 Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Thu, 13 Jul 2023 15:37:24 +0800
Subject: [PATCH 5/8] [ISSUE #7018] fix append in tiered storage when message
offset incorrect (#7019)
* fix append in tiered storage when message offset incorrect
---
.../tieredstore/TieredDispatcher.java | 25 ++++++++++------
.../tieredstore/file/CompositeFlatFile.java | 30 +++++++++----------
.../file/CompositeQueueFlatFile.java | 2 +-
.../file/CompositeQueueFlatFileTest.java | 4 +--
4 files changed, 34 insertions(+), 27 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 2a8e2ed71..6584b0e89 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -308,9 +308,18 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
doRedispatchRequestToWriteMap(
result, flatFile, dispatchOffset, newCommitLogOffset, size, tagCode, message.getByteBuffer());
message.release();
- if (result != AppendResult.SUCCESS) {
- dispatchOffset--;
- break;
+
+ switch (result) {
+ case SUCCESS:
+ continue;
+ case FILE_CLOSED:
+ tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue());
+ logger.info("TieredDispatcher#dispatchFlatFile: file has been close and destroy, " +
+ "topic: {}, queueId: {}", topic, queueId);
+ return;
+ default:
+ dispatchOffset--;
+ break;
}
}
@@ -341,15 +350,13 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
switch (result) {
case SUCCESS:
- break;
- case OFFSET_INCORRECT:
long offset = MessageBufferUtil.getQueueOffset(message);
if (queueOffset != offset) {
- logger.error("[Bug] Commitlog offset incorrect, " +
- "result={}, topic={}, queueId={}, offset={}, msg offset={}",
- result, topic, queueId, queueOffset, offset);
+ logger.error("Message cq offset in commitlog does not meet expectations, " +
+ "result={}, topic={}, queueId={}, cq offset={}, msg offset={}",
+ AppendResult.OFFSET_INCORRECT, topic, queueId, queueOffset, offset);
}
- return;
+ break;
case BUFFER_FULL:
logger.debug("Commitlog buffer full, result={}, topic={}, queueId={}, offset={}",
result, topic, queueId, queueOffset);
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
index 1243f7721..8f8ba98b1 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -58,7 +59,7 @@ public class CompositeFlatFile implements CompositeAccess {
* dispatched to the current chunk, indicating the progress of the message distribution.
* It's consume queue current offset.
*/
- protected volatile long dispatchOffset;
+ protected final AtomicLong dispatchOffset;
protected final ReentrantLock compositeFlatFileLock;
protected final TieredMessageStoreConfig storeConfig;
@@ -75,6 +76,7 @@ public class CompositeFlatFile implements CompositeAccess {
this.storeConfig = fileQueueFactory.getStoreConfig();
this.readAheadFactor = this.storeConfig.getReadAheadMinFactor();
this.metadataStore = TieredStoreUtil.getMetadataStore(this.storeConfig);
+ this.dispatchOffset = new AtomicLong();
this.compositeFlatFileLock = new ReentrantLock();
this.inFlightRequestMap = new ConcurrentHashMap<>();
this.commitLog = new TieredCommitLog(fileQueueFactory, filePath);
@@ -83,8 +85,8 @@ public class CompositeFlatFile implements CompositeAccess {
}
protected void recoverMetadata() {
- if (!consumeQueue.isInitialized() && this.dispatchOffset != -1) {
- consumeQueue.setBaseOffset(this.dispatchOffset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ if (!consumeQueue.isInitialized() && this.dispatchOffset.get() != -1) {
+ consumeQueue.setBaseOffset(this.dispatchOffset.get() * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
}
}
@@ -144,7 +146,7 @@ public class CompositeFlatFile implements CompositeAccess {
}
public long getDispatchOffset() {
- return dispatchOffset;
+ return dispatchOffset.get();
}
@Override
@@ -309,7 +311,7 @@ public class CompositeFlatFile implements CompositeAccess {
if (!consumeQueue.isInitialized()) {
consumeQueue.setBaseOffset(offset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
}
- dispatchOffset = offset;
+ dispatchOffset.set(offset);
}
@Override
@@ -323,14 +325,9 @@ public class CompositeFlatFile implements CompositeAccess {
return AppendResult.FILE_CLOSED;
}
- long queueOffset = MessageBufferUtil.getQueueOffset(message);
- if (dispatchOffset != queueOffset) {
- return AppendResult.OFFSET_INCORRECT;
- }
-
AppendResult result = commitLog.append(message, commit);
if (result == AppendResult.SUCCESS) {
- dispatchOffset = queueOffset + 1;
+ dispatchOffset.incrementAndGet();
}
return result;
}
@@ -483,14 +480,17 @@ public class CompositeFlatFile implements CompositeAccess {
}
public void destroy() {
- closed = true;
- commitLog.destroy();
- consumeQueue.destroy();
try {
+ closed = true;
+ compositeFlatFileLock.lock();
+ commitLog.destroy();
+ consumeQueue.destroy();
metadataStore.deleteFileSegment(filePath, FileSegmentType.COMMIT_LOG);
metadataStore.deleteFileSegment(filePath, FileSegmentType.CONSUME_QUEUE);
} catch (Exception e) {
- LOGGER.error("CompositeFlatFile#destroy: clean metadata failed: ", e);
+ LOGGER.error("CompositeFlatFile#destroy: delete file failed", e);
+ } finally {
+ compositeFlatFileLock.unlock();
}
}
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
index c0cf79069..f6c0afed0 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
@@ -64,7 +64,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
if (queueMetadata.getMaxOffset() < queueMetadata.getMinOffset()) {
queueMetadata.setMaxOffset(queueMetadata.getMinOffset());
}
- this.dispatchOffset = queueMetadata.getMaxOffset();
+ this.dispatchOffset.set(queueMetadata.getMaxOffset());
}
public void persistMetadata() {
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
index 9735a535e..8322c72ed 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
@@ -73,8 +73,8 @@ public class CompositeQueueFlatFileTest {
CompositeQueueFlatFile flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq);
ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer();
AppendResult result = flatFile.appendCommitLog(message);
- Assert.assertEquals(AppendResult.OFFSET_INCORRECT, result);
- Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
+ Assert.assertEquals(AppendResult.SUCCESS, result);
+ Assert.assertEquals(122L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getCommitPosition());
flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq);
--
2.32.0.windows.2
From 70a66eda2c08eb5fca38356659cb6de1ac75e25e Mon Sep 17 00:00:00 2001
From: ShuangxiDing <dingshuangxi888@gmail.com>
Date: Fri, 14 Jul 2023 16:46:40 +0800
Subject: [PATCH 6/8] Fix LEAK: HAProxyMessage.release() was not called before
it's garbage-collected (#7025)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Call HAProxyMessage.release() after reading it.
---------
Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com>
---
.../grpc/ProxyAndTlsProtocolNegotiator.java | 52 ++++++++++---------
.../remoting/netty/NettyRemotingServer.java | 46 ++++++++--------
2 files changed, 53 insertions(+), 45 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
index ceb9becc0..ee167bd7b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
@@ -160,7 +160,7 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HAProxyMessage) {
- replaceEventWithMessage((HAProxyMessage) msg);
+ handleWithMessage((HAProxyMessage) msg);
ctx.fireUserEventTriggered(pne);
} else {
super.channelRead(ctx, msg);
@@ -174,30 +174,34 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator
*
* @param msg
*/
- private void replaceEventWithMessage(HAProxyMessage msg) {
- Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
- if (StringUtils.isNotBlank(msg.sourceAddress())) {
- builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress());
- }
- if (msg.sourcePort() > 0) {
- builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort()));
- }
- if (StringUtils.isNotBlank(msg.destinationAddress())) {
- builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress());
- }
- if (msg.destinationPort() > 0) {
- builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort()));
- }
- if (CollectionUtils.isNotEmpty(msg.tlvs())) {
- msg.tlvs().forEach(tlv -> {
- Attributes.Key<String> key = AttributeKeys.valueOf(
- HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
- String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
- builder.set(key, value);
- });
+ private void handleWithMessage(HAProxyMessage msg) {
+ try {
+ Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
+ if (StringUtils.isNotBlank(msg.sourceAddress())) {
+ builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress());
+ }
+ if (msg.sourcePort() > 0) {
+ builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort()));
+ }
+ if (StringUtils.isNotBlank(msg.destinationAddress())) {
+ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress());
+ }
+ if (msg.destinationPort() > 0) {
+ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort()));
+ }
+ if (CollectionUtils.isNotEmpty(msg.tlvs())) {
+ msg.tlvs().forEach(tlv -> {
+ Attributes.Key<String> key = AttributeKeys.valueOf(
+ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
+ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+ builder.set(key, value);
+ });
+ }
+ pne = InternalProtocolNegotiationEvent
+ .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
+ } finally {
+ msg.release();
}
- pne = InternalProtocolNegotiationEvent
- .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 8ae87a6fa..90e358ce3 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -758,7 +758,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HAProxyMessage) {
- fillChannelWithMessage((HAProxyMessage) msg, ctx.channel());
+ handleWithMessage((HAProxyMessage) msg, ctx.channel());
} else {
super.channelRead(ctx, msg);
}
@@ -771,26 +771,30 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
* @param msg
* @param channel
*/
- private void fillChannelWithMessage(HAProxyMessage msg, Channel channel) {
- if (StringUtils.isNotBlank(msg.sourceAddress())) {
- channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
- }
- if (msg.sourcePort() > 0) {
- channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
- }
- if (StringUtils.isNotBlank(msg.destinationAddress())) {
- channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
- }
- if (msg.destinationPort() > 0) {
- channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
- }
- if (CollectionUtils.isNotEmpty(msg.tlvs())) {
- msg.tlvs().forEach(tlv -> {
- AttributeKey<String> key = AttributeKeys.valueOf(
- HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
- String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
- channel.attr(key).set(value);
- });
+ private void handleWithMessage(HAProxyMessage msg, Channel channel) {
+ try {
+ if (StringUtils.isNotBlank(msg.sourceAddress())) {
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
+ }
+ if (msg.sourcePort() > 0) {
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
+ }
+ if (StringUtils.isNotBlank(msg.destinationAddress())) {
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
+ }
+ if (msg.destinationPort() > 0) {
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
+ }
+ if (CollectionUtils.isNotEmpty(msg.tlvs())) {
+ msg.tlvs().forEach(tlv -> {
+ AttributeKey<String> key = AttributeKeys.valueOf(
+ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
+ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+ channel.attr(key).set(value);
+ });
+ }
+ } finally {
+ msg.release();
}
}
}
--
2.32.0.windows.2
From 5914ff8dbb9d37e2cb48ef9c4f0256c6185b4659 Mon Sep 17 00:00:00 2001
From: lyx <56945247+lyx2000@users.noreply.github.com>
Date: Sat, 15 Jul 2023 18:15:57 +0800
Subject: [PATCH 7/8] [ISSUE #6968] fix grpc acl bug (#6969)
* feat(acl): fix acl bug
Signed-off-by: lyx <1419360299@qq.com>
# Conflicts:
# proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
* add access test for two client
Signed-off-by: lyx <1419360299@qq.com>
* use specific acl config
Signed-off-by: lyx <1419360299@qq.com>
* Recovering unchange file
Signed-off-by: lyx <1419360299@qq.com>
* let test pass
Signed-off-by: lyx <1419360299@qq.com>
---------
Signed-off-by: lyx <1419360299@qq.com>
---
.../acl/plain/PlainAccessResource.java | 21 +-
.../acl/RemotingClientAccessTest.java | 189 ++++++++++++++++++
.../access_acl_conf/acl/plain_acl.yml | 31 +++
acl/src/test/resources/conf/acl/plain_acl.yml | 1 -
pom.xml | 1 +
.../apache/rocketmq/proxy/ProxyStartup.java | 17 +-
.../proxy/grpc/GrpcServerBuilder.java | 10 +-
.../remoting/RemotingProtocolServer.java | 15 +-
8 files changed, 255 insertions(+), 30 deletions(-)
create mode 100644 acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java
create mode 100644 acl/src/test/resources/access_acl_conf/acl/plain_acl.yml
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
index cdbd9ea9b..72aa8ca71 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
@@ -223,7 +223,7 @@ public class PlainAccessResource implements AccessResource {
if (!request.hasGroup()) {
throw new AclException("Consumer heartbeat doesn't have group");
} else {
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
}
}
} else if (SendMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
@@ -240,15 +240,15 @@ public class PlainAccessResource implements AccessResource {
accessResource.addResourceAndPerm(topic, Permission.PUB);
} else if (ReceiveMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
ReceiveMessageRequest request = (ReceiveMessageRequest) messageV3;
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
accessResource.addResourceAndPerm(request.getMessageQueue().getTopic(), Permission.SUB);
} else if (AckMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
AckMessageRequest request = (AckMessageRequest) messageV3;
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
} else if (ForwardMessageToDeadLetterQueueRequest.getDescriptor().getFullName().equals(rpcFullName)) {
ForwardMessageToDeadLetterQueueRequest request = (ForwardMessageToDeadLetterQueueRequest) messageV3;
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
} else if (EndTransactionRequest.getDescriptor().getFullName().equals(rpcFullName)) {
EndTransactionRequest request = (EndTransactionRequest) messageV3;
@@ -264,7 +264,7 @@ public class PlainAccessResource implements AccessResource {
}
if (command.getSettings().hasSubscription()) {
Subscription subscription = command.getSettings().getSubscription();
- accessResource.addResourceAndPerm(subscription.getGroup(), Permission.SUB);
+ accessResource.addGroupResourceAndPerm(subscription.getGroup(), Permission.SUB);
for (SubscriptionEntry entry : subscription.getSubscriptionsList()) {
accessResource.addResourceAndPerm(entry.getTopic(), Permission.SUB);
}
@@ -275,17 +275,17 @@ public class PlainAccessResource implements AccessResource {
}
} else if (NotifyClientTerminationRequest.getDescriptor().getFullName().equals(rpcFullName)) {
NotifyClientTerminationRequest request = (NotifyClientTerminationRequest) messageV3;
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
} else if (QueryRouteRequest.getDescriptor().getFullName().equals(rpcFullName)) {
QueryRouteRequest request = (QueryRouteRequest) messageV3;
accessResource.addResourceAndPerm(request.getTopic(), Permission.ANY);
} else if (QueryAssignmentRequest.getDescriptor().getFullName().equals(rpcFullName)) {
QueryAssignmentRequest request = (QueryAssignmentRequest) messageV3;
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
} else if (ChangeInvisibleDurationRequest.getDescriptor().getFullName().equals(rpcFullName)) {
ChangeInvisibleDurationRequest request = (ChangeInvisibleDurationRequest) messageV3;
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
+ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
}
} catch (Throwable t) {
@@ -299,6 +299,11 @@ public class PlainAccessResource implements AccessResource {
addResourceAndPerm(resourceName, permission);
}
+ private void addGroupResourceAndPerm(Resource resource, byte permission) {
+ String resourceName = NamespaceUtil.wrapNamespace(resource.getResourceNamespace(), resource.getName());
+ addResourceAndPerm(getRetryTopic(resourceName), permission);
+ }
+
public static PlainAccessResource build(PlainAccessConfig plainAccessConfig, RemoteAddressStrategy remoteAddressStrategy) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setAccessKey(plainAccessConfig.getAccessKey());
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java b/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java
new file mode 100644
index 000000000..88c5e09a9
--- /dev/null
+++ b/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.AclException;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.acl.plain.AclTestHelper;
+import org.apache.rocketmq.acl.plain.PlainAccessResource;
+import org.apache.rocketmq.acl.plain.PlainAccessValidator;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RemotingClientAccessTest {
+
+ private PlainAccessValidator plainAccessValidator;
+ private AclClientRPCHook aclClient;
+ private SessionCredentials sessionCredentials;
+
+ private File confHome;
+
+ private String clientAddress = "10.7.1.3";
+
+ @Before
+ public void init() throws IOException {
+ String folder = "access_acl_conf";
+ confHome = AclTestHelper.copyResources(folder, true);
+ System.setProperty("rocketmq.home.dir", confHome.getAbsolutePath());
+ System.setProperty("rocketmq.acl.plain.file", "/access_acl_conf/acl/plain_acl.yml".replace("/", File.separator));
+
+ plainAccessValidator = new PlainAccessValidator();
+ sessionCredentials = new SessionCredentials();
+ sessionCredentials.setAccessKey("rocketmq3");
+ sessionCredentials.setSecretKey("12345678");
+ aclClient = new AclClientRPCHook(sessionCredentials);
+ }
+
+ @After
+ public void cleanUp() {
+ AclTestHelper.recursiveDelete(confHome);
+ }
+
+ @Test(expected = AclException.class)
+ public void testProduceDenyTopic() {
+ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
+ messageRequestHeader.setTopic("topicD");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
+ aclClient.doBeforeRequest(clientAddress, remotingCommand);
+
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), clientAddress);
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+ Assert.fail("Should not throw IOException");
+ }
+ }
+
+ @Test
+ public void testProduceAuthorizedTopic() {
+ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
+ messageRequestHeader.setTopic("topicA");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
+ aclClient.doBeforeRequest(clientAddress, remotingCommand);
+
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), clientAddress);
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+ Assert.fail("Should not throw IOException");
+ }
+ }
+
+
+ @Test(expected = AclException.class)
+ public void testConsumeDenyTopic() {
+ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
+ pullMessageRequestHeader.setTopic("topicD");
+ pullMessageRequestHeader.setConsumerGroup("groupB");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
+ aclClient.doBeforeRequest("", remotingCommand);
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+ Assert.fail("Should not throw IOException");
+ }
+
+ }
+
+ @Test
+ public void testConsumeAuthorizedTopic() {
+ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
+ pullMessageRequestHeader.setTopic("topicB");
+ pullMessageRequestHeader.setConsumerGroup("groupB");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
+ aclClient.doBeforeRequest("", remotingCommand);
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+ Assert.fail("Should not throw IOException");
+ }
+ }
+
+ @Test(expected = AclException.class)
+ public void testConsumeInDeniedGroup() {
+ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
+ pullMessageRequestHeader.setTopic("topicB");
+ pullMessageRequestHeader.setConsumerGroup("groupD");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
+ aclClient.doBeforeRequest("", remotingCommand);
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+ Assert.fail("Should not throw IOException");
+ }
+ }
+
+ @Test
+ public void testConsumeInAuthorizedGroup() {
+ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
+ pullMessageRequestHeader.setTopic("topicB");
+ pullMessageRequestHeader.setConsumerGroup("groupB");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
+ aclClient.doBeforeRequest("", remotingCommand);
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+ Assert.fail("Should not throw IOException");
+ }
+ }
+
+}
diff --git a/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml b/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml
new file mode 100644
index 000000000..28a8c4888
--- /dev/null
+++ b/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+accounts:
+ - accessKey: rocketmq3
+ secretKey: 12345678
+ admin: false
+ defaultTopicPerm: DENY
+ defaultGroupPerm: DENY
+ topicPerms:
+ - topicA=PUB
+ - topicB=SUB
+ - topicC=PUB|SUB
+ - topicD=DENY
+ groupPerms:
+ - groupB=SUB
+ - groupC=PUB|SUB
+ - groupD=DENY
+
diff --git a/acl/src/test/resources/conf/acl/plain_acl.yml b/acl/src/test/resources/conf/acl/plain_acl.yml
index 5641a94bf..34e46696d 100644
--- a/acl/src/test/resources/conf/acl/plain_acl.yml
+++ b/acl/src/test/resources/conf/acl/plain_acl.yml
@@ -41,4 +41,3 @@ accounts:
whiteRemoteAddress: 192.168.1.*
# if it is admin, it could access all resources
admin: true
-
diff --git a/pom.xml b/pom.xml
index 12bc2dbd5..4d5dd1dec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,6 +147,7 @@
<awaitility.version>4.1.0</awaitility.version>
<truth.version>0.30</truth.version>
<s3mock-junit4.version>2.11.0</s3mock-junit4.version>
+ <rocketmq-client-java.version>5.0.5</rocketmq-client-java.version>
<!-- Build plugin dependencies -->
<versions-maven-plugin.version>2.2</versions-maven-plugin.version>
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
index ea13bb808..06d5f4525 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
@@ -29,11 +29,14 @@ import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.AccessValidator;
+import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.common.utils.ServiceProvider;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
@@ -75,16 +78,17 @@ public class ProxyStartup {
MessagingProcessor messagingProcessor = createMessagingProcessor();
+ List<AccessValidator> accessValidators = loadAccessValidators();
// create grpcServer
GrpcServer grpcServer = GrpcServerBuilder.newBuilder(executor, ConfigurationManager.getProxyConfig().getGrpcServerPort())
.addService(createServiceProcessor(messagingProcessor))
.addService(ChannelzService.newInstance(100))
.addService(ProtoReflectionService.newInstance())
- .configInterceptor()
+ .configInterceptor(accessValidators)
.build();
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer);
- RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor);
+ RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor, accessValidators);
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(remotingServer);
// start servers one by one.
@@ -109,6 +113,15 @@ public class ProxyStartup {
log.info(new Date() + " rocketmq-proxy startup successfully");
}
+ protected static List<AccessValidator> loadAccessValidators() {
+ List<AccessValidator> accessValidators = ServiceProvider.load(AccessValidator.class);
+ if (accessValidators.isEmpty()) {
+ log.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator");
+ accessValidators.add(new PlainAccessValidator());
+ }
+ return accessValidators;
+ }
+
protected static void initConfiguration(CommandLineArgument commandLineArgument) throws Exception {
if (StringUtils.isNotBlank(commandLineArgument.getProxyConfigPath())) {
System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath());
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
index 437b9216b..9cddd3013 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
@@ -28,9 +28,7 @@ import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
-import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.utils.ServiceProvider;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
@@ -98,14 +96,8 @@ public class GrpcServerBuilder {
return new GrpcServer(this.serverBuilder.build());
}
- public GrpcServerBuilder configInterceptor() {
+ public GrpcServerBuilder configInterceptor(List<AccessValidator> accessValidators) {
// grpc interceptors, including acl, logging etc.
- List<AccessValidator> accessValidators = ServiceProvider.load(AccessValidator.class);
- if (accessValidators.isEmpty()) {
- log.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator");
- accessValidators.add(new PlainAccessValidator());
- }
-
this.serverBuilder.intercept(new AuthenticationInterceptor(accessValidators));
this.serverBuilder
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index f08094c16..bcc9edd09 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.proxy.remoting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.Channel;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
@@ -28,15 +27,14 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
-import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor;
+import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
@@ -86,11 +84,11 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
protected final ThreadPoolExecutor defaultExecutor;
protected final ScheduledExecutorService timerExecutor;
- public RemotingProtocolServer(MessagingProcessor messagingProcessor) {
+ public RemotingProtocolServer(MessagingProcessor messagingProcessor, List<AccessValidator> accessValidators) {
this.messagingProcessor = messagingProcessor;
this.remotingChannelManager = new RemotingChannelManager(this, messagingProcessor.getProxyRelayService());
- RequestPipeline pipeline = createRequestPipeline();
+ RequestPipeline pipeline = createRequestPipeline(accessValidators);
this.getTopicRouteActivity = new GetTopicRouteActivity(pipeline, messagingProcessor);
this.clientManagerActivity = new ClientManagerActivity(pipeline, messagingProcessor, remotingChannelManager);
this.consumerManagerActivity = new ConsumerManagerActivity(pipeline, messagingProcessor);
@@ -254,15 +252,12 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
return future;
}
- protected RequestPipeline createRequestPipeline() {
+ protected RequestPipeline createRequestPipeline(List<AccessValidator> accessValidators) {
RequestPipeline pipeline = (ctx, request, context) -> {
};
-
- List<AccessValidator> accessValidatorList = new ArrayList<>();
- accessValidatorList.add(new PlainAccessValidator());
// add pipeline
// the last pipe add will execute at the first
- return pipeline.pipe(new AuthenticationPipeline(accessValidatorList));
+ return pipeline.pipe(new AuthenticationPipeline(accessValidators));
}
protected class ThreadPoolHeadSlowTimeMillsMonitor implements ThreadPoolStatusMonitor {
--
2.32.0.windows.2
From 440be1ed4ce2af0ab58af6c3019de7075c09c20f Mon Sep 17 00:00:00 2001
From: fuyou001 <yubao.fyb@alibaba-inc.com>
Date: Mon, 17 Jul 2023 19:23:23 +0800
Subject: [PATCH 8/8] [ISSUE #7031] fix Pop caused broker memory leak bug
(#7032)
---
.../broker/processor/PopBufferMergeService.java | 17 ++++++++++++++++-
.../broker/processor/PopMessageProcessor.java | 11 ++++++-----
2 files changed, 22 insertions(+), 6 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index d7bc7c694..b7ba8ad4a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -429,9 +429,16 @@ public class PopBufferMergeService extends ServiceThread {
* @param nextBeginOffset
* @return
*/
- public void addCkJustOffset(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {
+ public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {
PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset, true);
+ if (this.buffer.containsKey(pointWrapper.getMergeKey())) {
+ // when mergeKey conflict
+ // will cause PopBufferMergeService.scanCommitOffset cannot poll PopCheckPointWrapper
+ POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add ckJustOffset. ck:{}, mergeKey:{}", pointWrapper, pointWrapper.getMergeKey());
+ return false;
+ }
+
this.putCkToStore(pointWrapper, !checkQueueOk(pointWrapper));
putOffsetQueue(pointWrapper);
@@ -440,6 +447,7 @@ public class PopBufferMergeService extends ServiceThread {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]add ck just offset, {}", pointWrapper);
}
+ return true;
}
public void addCkMock(String group, String topic, int queueId, long startOffset, long invisibleTime,
@@ -492,6 +500,13 @@ public class PopBufferMergeService extends ServiceThread {
return false;
}
+ if (this.buffer.containsKey(pointWrapper.getMergeKey())) {
+ // when mergeKey conflict
+ // will cause PopBufferMergeService.scanCommitOffset cannot poll PopCheckPointWrapper
+ POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add ck. ck:{}, mergeKey:{}", pointWrapper, pointWrapper.getMergeKey());
+ return false;
+ }
+
putOffsetQueue(pointWrapper);
this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);
this.counter.incrementAndGet();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 28549bfed..464f8f4fd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -570,7 +570,9 @@ public class PopMessageProcessor implements NettyRequestProcessor {
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
requestHeader.getConsumerGroup(), topic, queueId, finalOffset);
} else {
- appendCheckPoint(requestHeader, topic, reviveQid, queueId, finalOffset, result, popTime, this.brokerController.getBrokerConfig().getBrokerName());
+ if (!appendCheckPoint(requestHeader, topic, reviveQid, queueId, finalOffset, result, popTime, this.brokerController.getBrokerConfig().getBrokerName())) {
+ return atomicRestNum.get() + result.getMessageCount();
+ }
}
ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, isRetry, queueId, finalOffset);
ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, isRetry, queueId,
@@ -685,7 +687,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
return msgInner;
}
- private void appendCheckPoint(final PopMessageRequestHeader requestHeader,
+ private boolean appendCheckPoint(final PopMessageRequestHeader requestHeader,
final String topic, final int reviveQid, final int queueId, final long offset,
final GetMessageResult getMessageTmpResult, final long popTime, final String brokerName) {
// add check point msg to revive log
@@ -708,10 +710,9 @@ public class PopMessageProcessor implements NettyRequestProcessor {
);
if (addBufferSuc) {
- return;
+ return true;
}
-
- this.popBufferMergeService.addCkJustOffset(
+ return this.popBufferMergeService.addCkJustOffset(
ck, reviveQid, -1, getMessageTmpResult.getNextBeginOffset()
);
}
--
2.32.0.windows.2