1539 lines
78 KiB
Diff
1539 lines
78 KiB
Diff
From 15c6889bb0abd014c06ef1452f791db9daa1ea08 Mon Sep 17 00:00:00 2001
|
|
From: lizhimins <707364882@qq.com>
|
|
Date: Tue, 11 Jul 2023 17:04:00 +0800
|
|
Subject: [PATCH 1/8] fix receive message activity attempt id not correct
|
|
(#7012)
|
|
|
|
Fix incorrect attempt id passed by ReceiveMessageActivity
|
|
---
|
|
.../proxy/grpc/v2/consumer/ReceiveMessageActivity.java | 2 +-
|
|
.../proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java | 6 +++---
|
|
2 files changed, 4 insertions(+), 4 deletions(-)
|
|
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
|
|
index a504179a9..cf58bb87a 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
|
|
@@ -130,7 +130,7 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
|
|
subscriptionData,
|
|
fifo,
|
|
new PopMessageResultFilterImpl(maxAttempts),
|
|
- request.getAttemptId(),
|
|
+ request.hasAttemptId() ? request.getAttemptId() : null,
|
|
timeRemaining
|
|
).thenAccept(popResult -> {
|
|
if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) {
|
|
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
|
|
index 2e562504a..7fd9a9ffd 100644
|
|
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
|
|
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
|
|
@@ -57,6 +57,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
|
|
import static org.mockito.ArgumentMatchers.anyInt;
|
|
import static org.mockito.ArgumentMatchers.anyLong;
|
|
import static org.mockito.ArgumentMatchers.anyString;
|
|
+import static org.mockito.ArgumentMatchers.isNull;
|
|
import static org.mockito.Mockito.doNothing;
|
|
import static org.mockito.Mockito.mock;
|
|
import static org.mockito.Mockito.when;
|
|
@@ -89,7 +90,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
|
|
.setRequestTimeout(Durations.fromSeconds(3))
|
|
.build());
|
|
when(this.messagingProcessor.popMessage(any(), any(), anyString(), anyString(), anyInt(), anyLong(),
|
|
- pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(), anyString(), anyLong()))
|
|
+ pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong()))
|
|
.thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList())));
|
|
|
|
|
|
@@ -223,7 +224,6 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
|
|
assertEquals(Code.ILLEGAL_INVISIBLE_TIME, getResponseCodeFromReceiveMessageResponseList(responseArgumentCaptor.getAllValues()));
|
|
}
|
|
|
|
-
|
|
@Test
|
|
public void testReceiveMessage() {
|
|
StreamObserver<ReceiveMessageResponse> receiveStreamObserver = mock(ServerCallStreamObserver.class);
|
|
@@ -245,7 +245,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
|
|
any(),
|
|
anyBoolean(),
|
|
any(),
|
|
- anyString(),
|
|
+ isNull(),
|
|
anyLong())).thenReturn(CompletableFuture.completedFuture(popResult));
|
|
|
|
this.receiveMessageActivity.receiveMessage(
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From b4496be68705c1c0b282a07a1adeab4fffd670fe Mon Sep 17 00:00:00 2001
|
|
From: ShuangxiDing <dingshuangxi888@gmail.com>
|
|
Date: Tue, 11 Jul 2023 19:09:09 +0800
|
|
Subject: [PATCH 2/8] [ISSUE #7010] Fix the HandshakeHandler returns when
|
|
detect haproxy version need more data (#7011)
|
|
MIME-Version: 1.0
|
|
Content-Type: text/plain; charset=UTF-8
|
|
Content-Transfer-Encoding: 8bit
|
|
|
|
* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator
|
|
|
|
* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator
|
|
|
|
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
|
|
|
|
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
|
|
|
|
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
|
|
|
|
* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
|
|
|
|
* Support proxy protocol for gRPC server.
|
|
|
|
* Support proxy protocol for gRPC server.
|
|
|
|
* Support proxy protocol for gRPC server.
|
|
|
|
* Support proxy protocol for gRPC server.
|
|
|
|
* Support proxy protocol for gRPC server.
|
|
|
|
* Support proxy protocol for gRPC and Remoting server.
|
|
|
|
* Roll back the netty upgrade
|
|
|
|
* Support proxy protocol for gRPC and Remoting server.
|
|
|
|
* Support proxy protocol for gRPC and Remoting server.
|
|
|
|
* Support proxy protocol for gRPC and Remoting server.
|
|
|
|
* add grpc-netty-codec-haproxy in bazel
|
|
|
|
* add grpc-netty-codec-haproxy in bazel
|
|
|
|
* Support proxy protocol for gRPC and Remoting server.
|
|
|
|
* Fix Test
|
|
|
|
* add grpc-netty-codec-haproxy in bazel
|
|
|
|
* add ProxyProtocolTest for Remoting
|
|
|
|
* Move AttributeKey constants from RemotingHelper to AttributeKeys.
|
|
|
|
* Fix the needs more data for HandshakeHandler.
|
|
|
|
* Fix the needs more data for HandshakeHandler.
|
|
|
|
* Fix the needs more data for HandshakeHandler.
|
|
|
|
* Fix the needs more data for HandshakeHandler.
|
|
|
|
---------
|
|
|
|
Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com>
|
|
---
|
|
.../remoting/MultiProtocolRemotingServer.java | 2 +-
|
|
.../activity/AbstractRemotingActivity.java | 16 +++++++------
|
|
.../activity/ClientManagerActivity.java | 24 ++++++++++---------
|
|
.../AbstractRemotingActivityTest.java | 10 ++++----
|
|
.../remoting/common/RemotingHelper.java | 21 ++++------------
|
|
.../remoting/netty/AttributeKeys.java | 11 ++++++++-
|
|
.../remoting/netty/NettyRemotingServer.java | 23 ++++++------------
|
|
7 files changed, 51 insertions(+), 56 deletions(-)
|
|
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
|
|
index 858b1f022..12d728fff 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
|
|
@@ -78,7 +78,7 @@ public class MultiProtocolRemotingServer extends NettyRemotingServer {
|
|
@Override
|
|
protected ChannelPipeline configChannel(SocketChannel ch) {
|
|
return ch.pipeline()
|
|
- .addLast(this.getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, this.getHandshakeHandler())
|
|
+ .addLast(this.getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
|
|
.addLast(this.getDefaultEventExecutorGroup(),
|
|
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
|
|
new ProtocolNegotiationHandler(this.remotingProtocolHandler)
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
|
|
index 78cd203ec..ce4a63397 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
|
|
@@ -19,9 +19,6 @@ package org.apache.rocketmq.proxy.remoting.activity;
|
|
|
|
import io.netty.channel.Channel;
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
-import java.util.HashMap;
|
|
-import java.util.Map;
|
|
-import java.util.Optional;
|
|
import org.apache.rocketmq.acl.common.AclException;
|
|
import org.apache.rocketmq.client.exception.MQBrokerException;
|
|
import org.apache.rocketmq.client.exception.MQClientException;
|
|
@@ -41,11 +38,16 @@ import org.apache.rocketmq.proxy.processor.MessagingProcessor;
|
|
import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
|
|
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
|
|
import org.apache.rocketmq.remoting.common.RemotingHelper;
|
|
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
|
|
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
|
|
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
|
import org.apache.rocketmq.remoting.protocol.RequestCode;
|
|
import org.apache.rocketmq.remoting.protocol.ResponseCode;
|
|
|
|
+import java.util.HashMap;
|
|
+import java.util.Map;
|
|
+import java.util.Optional;
|
|
+
|
|
public abstract class AbstractRemotingActivity implements NettyRequestProcessor {
|
|
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
|
|
protected final MessagingProcessor messagingProcessor;
|
|
@@ -126,13 +128,13 @@ public abstract class AbstractRemotingActivity implements NettyRequestProcessor
|
|
.setProtocolType(ChannelProtocolType.REMOTING.getName())
|
|
.setChannel(channel)
|
|
.setLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress()))
|
|
- .setRemoteAddress(NetworkUtil.socketAddress2String(ctx.channel().remoteAddress()));
|
|
+ .setRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
|
|
|
|
- Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.LANGUAGE_CODE_KEY, channel))
|
|
+ Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY, channel))
|
|
.ifPresent(language -> context.setLanguage(language.name()));
|
|
- Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.CLIENT_ID_KEY, channel))
|
|
+ Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY, channel))
|
|
.ifPresent(context::setClientID);
|
|
- Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.VERSION_KEY, channel))
|
|
+ Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY, channel))
|
|
.ifPresent(version -> context.setClientVersion(MQVersion.getVersionDesc(version)));
|
|
|
|
return context;
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
|
|
index 1eb81ce92..c671593a3 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
|
|
@@ -19,13 +19,20 @@ package org.apache.rocketmq.proxy.remoting.activity;
|
|
|
|
import io.netty.channel.Channel;
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
-import java.util.Set;
|
|
import org.apache.rocketmq.broker.client.ClientChannelInfo;
|
|
import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
|
|
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
|
|
import org.apache.rocketmq.broker.client.ProducerChangeListener;
|
|
import org.apache.rocketmq.broker.client.ProducerGroupEvent;
|
|
+import org.apache.rocketmq.proxy.common.ProxyContext;
|
|
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
|
|
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
|
|
+import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
|
|
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
|
|
import org.apache.rocketmq.remoting.common.RemotingHelper;
|
|
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
|
|
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
|
|
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
|
import org.apache.rocketmq.remoting.protocol.RequestCode;
|
|
import org.apache.rocketmq.remoting.protocol.ResponseCode;
|
|
import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
|
|
@@ -33,13 +40,8 @@ import org.apache.rocketmq.remoting.protocol.header.UnregisterClientResponseHead
|
|
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
|
|
import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
|
|
import org.apache.rocketmq.remoting.protocol.heartbeat.ProducerData;
|
|
-import org.apache.rocketmq.proxy.common.ProxyContext;
|
|
-import org.apache.rocketmq.proxy.processor.MessagingProcessor;
|
|
-import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
|
|
-import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
|
|
-import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
|
|
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
|
|
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
|
+
|
|
+import java.util.Set;
|
|
|
|
public class ClientManagerActivity extends AbstractRemotingActivity {
|
|
|
|
@@ -108,9 +110,9 @@ public class ClientManagerActivity extends AbstractRemotingActivity {
|
|
if (channel instanceof RemotingChannel) {
|
|
RemotingChannel remotingChannel = (RemotingChannel) channel;
|
|
Channel parent = remotingChannel.parent();
|
|
- RemotingHelper.setPropertyToAttr(parent, RemotingHelper.CLIENT_ID_KEY, clientChannelInfo.getClientId());
|
|
- RemotingHelper.setPropertyToAttr(parent, RemotingHelper.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage());
|
|
- RemotingHelper.setPropertyToAttr(parent, RemotingHelper.VERSION_KEY, clientChannelInfo.getVersion());
|
|
+ RemotingHelper.setPropertyToAttr(parent, AttributeKeys.CLIENT_ID_KEY, clientChannelInfo.getClientId());
|
|
+ RemotingHelper.setPropertyToAttr(parent, AttributeKeys.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage());
|
|
+ RemotingHelper.setPropertyToAttr(parent, AttributeKeys.VERSION_KEY, clientChannelInfo.getVersion());
|
|
}
|
|
|
|
}
|
|
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
|
|
index 663a83e3c..b2bd3a35f 100644
|
|
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
|
|
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
|
|
@@ -21,7 +21,6 @@ import io.netty.channel.Channel;
|
|
import io.netty.channel.ChannelFuture;
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
import io.netty.channel.ChannelPromise;
|
|
-import java.util.concurrent.CompletableFuture;
|
|
import org.apache.rocketmq.acl.common.AclException;
|
|
import org.apache.rocketmq.client.exception.MQBrokerException;
|
|
import org.apache.rocketmq.client.exception.MQClientException;
|
|
@@ -35,6 +34,7 @@ import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
|
|
import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
|
|
import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
|
|
import org.apache.rocketmq.remoting.common.RemotingHelper;
|
|
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
|
|
import org.apache.rocketmq.remoting.protocol.LanguageCode;
|
|
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
|
import org.apache.rocketmq.remoting.protocol.RequestCode;
|
|
@@ -48,6 +48,8 @@ import org.mockito.Mock;
|
|
import org.mockito.Spy;
|
|
import org.mockito.junit.MockitoJUnitRunner;
|
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
+
|
|
import static org.assertj.core.api.Assertions.assertThat;
|
|
import static org.mockito.ArgumentMatchers.any;
|
|
import static org.mockito.ArgumentMatchers.anyLong;
|
|
@@ -82,9 +84,9 @@ public class AbstractRemotingActivityTest extends InitConfigTest {
|
|
}
|
|
};
|
|
Channel channel = ctx.channel();
|
|
- RemotingHelper.setPropertyToAttr(channel, RemotingHelper.CLIENT_ID_KEY, CLIENT_ID);
|
|
- RemotingHelper.setPropertyToAttr(channel, RemotingHelper.LANGUAGE_CODE_KEY, LanguageCode.JAVA);
|
|
- RemotingHelper.setPropertyToAttr(channel, RemotingHelper.VERSION_KEY, MQVersion.CURRENT_VERSION);
|
|
+ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.CLIENT_ID_KEY, CLIENT_ID);
|
|
+ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.LANGUAGE_CODE_KEY, LanguageCode.JAVA);
|
|
+ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.VERSION_KEY, MQVersion.CURRENT_VERSION);
|
|
}
|
|
|
|
@Test
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
|
|
index d0750b678..363b22eac 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
|
|
@@ -22,7 +22,6 @@ import io.netty.channel.ChannelFutureListener;
|
|
import io.netty.util.Attribute;
|
|
import io.netty.util.AttributeKey;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
-import org.apache.rocketmq.common.constant.HAProxyConstants;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.utils.NetworkUtil;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
@@ -31,8 +30,8 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
|
|
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
|
|
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
|
|
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
|
|
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
|
|
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
|
|
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
|
|
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
|
import org.apache.rocketmq.remoting.protocol.RequestCode;
|
|
import org.apache.rocketmq.remoting.protocol.ResponseCode;
|
|
@@ -51,16 +50,6 @@ public class RemotingHelper {
|
|
public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0";
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
|
|
- private static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr");
|
|
-
|
|
- private static final AttributeKey<String> PROXY_PROTOCOL_ADDR = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
|
|
- private static final AttributeKey<String> PROXY_PROTOCOL_PORT = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
|
|
-
|
|
- public static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf("ClientId");
|
|
-
|
|
- public static final AttributeKey<Integer> VERSION_KEY = AttributeKey.valueOf("Version");
|
|
-
|
|
- public static final AttributeKey<LanguageCode> LANGUAGE_CODE_KEY = AttributeKey.valueOf("LanguageCode");
|
|
|
|
public static final Map<Integer, String> REQUEST_CODE_MAP = new HashMap<Integer, String>() {
|
|
{
|
|
@@ -213,7 +202,7 @@ public class RemotingHelper {
|
|
if (StringUtils.isNotBlank(addr)) {
|
|
return addr;
|
|
}
|
|
- Attribute<String> att = channel.attr(REMOTE_ADDR_KEY);
|
|
+ Attribute<String> att = channel.attr(AttributeKeys.REMOTE_ADDR_KEY);
|
|
if (att == null) {
|
|
// mocked in unit test
|
|
return parseChannelRemoteAddr0(channel);
|
|
@@ -227,11 +216,11 @@ public class RemotingHelper {
|
|
}
|
|
|
|
private static String getProxyProtocolAddress(Channel channel) {
|
|
- if (!channel.hasAttr(PROXY_PROTOCOL_ADDR)) {
|
|
+ if (!channel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
|
|
return null;
|
|
}
|
|
- String proxyProtocolAddr = getAttributeValue(PROXY_PROTOCOL_ADDR, channel);
|
|
- String proxyProtocolPort = getAttributeValue(PROXY_PROTOCOL_PORT, channel);
|
|
+ String proxyProtocolAddr = getAttributeValue(AttributeKeys.PROXY_PROTOCOL_ADDR, channel);
|
|
+ String proxyProtocolPort = getAttributeValue(AttributeKeys.PROXY_PROTOCOL_PORT, channel);
|
|
if (StringUtils.isBlank(proxyProtocolAddr) || proxyProtocolPort == null) {
|
|
return null;
|
|
}
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
|
|
index 4e69ab82d..ebdde31f4 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
|
|
@@ -19,12 +19,21 @@ package org.apache.rocketmq.remoting.netty;
|
|
|
|
import io.netty.util.AttributeKey;
|
|
import org.apache.rocketmq.common.constant.HAProxyConstants;
|
|
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
|
|
|
|
import java.util.Map;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
public class AttributeKeys {
|
|
|
|
+ public static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr");
|
|
+
|
|
+ public static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf("ClientId");
|
|
+
|
|
+ public static final AttributeKey<Integer> VERSION_KEY = AttributeKey.valueOf("Version");
|
|
+
|
|
+ public static final AttributeKey<LanguageCode> LANGUAGE_CODE_KEY = AttributeKey.valueOf("LanguageCode");
|
|
+
|
|
public static final AttributeKey<String> PROXY_PROTOCOL_ADDR =
|
|
AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
|
|
|
|
@@ -40,6 +49,6 @@ public class AttributeKeys {
|
|
private static final Map<String, AttributeKey<String>> ATTRIBUTE_KEY_MAP = new ConcurrentHashMap<>();
|
|
|
|
public static AttributeKey<String> valueOf(String name) {
|
|
- return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKeys::valueOf);
|
|
+ return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKey::valueOf);
|
|
}
|
|
}
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
index 445f06cc6..8ae87a6fa 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
@@ -37,6 +37,7 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
|
|
import io.netty.channel.nio.NioEventLoopGroup;
|
|
import io.netty.channel.socket.SocketChannel;
|
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
|
+import io.netty.handler.codec.ByteToMessageDecoder;
|
|
import io.netty.handler.codec.ProtocolDetectionResult;
|
|
import io.netty.handler.codec.ProtocolDetectionState;
|
|
import io.netty.handler.codec.haproxy.HAProxyMessage;
|
|
@@ -73,6 +74,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
|
import java.io.IOException;
|
|
import java.net.InetSocketAddress;
|
|
import java.security.cert.CertificateException;
|
|
+import java.util.List;
|
|
import java.util.NoSuchElementException;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
@@ -115,7 +117,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
|
|
|
|
// sharable handlers
|
|
- private HandshakeHandler handshakeHandler;
|
|
+ private TlsModeHandler tlsModeHandler;
|
|
private NettyEncoder encoder;
|
|
private NettyConnectManageHandler connectionManageHandler;
|
|
private NettyServerHandler serverHandler;
|
|
@@ -265,7 +267,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
*/
|
|
protected ChannelPipeline configChannel(SocketChannel ch) {
|
|
return ch.pipeline()
|
|
- .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
|
|
+ .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
|
|
.addLast(defaultEventExecutorGroup,
|
|
encoder,
|
|
new NettyDecoder(),
|
|
@@ -402,7 +404,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
}
|
|
|
|
private void prepareSharableHandlers() {
|
|
- handshakeHandler = new HandshakeHandler();
|
|
+ tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
|
|
encoder = new NettyEncoder();
|
|
connectionManageHandler = new NettyConnectManageHandler();
|
|
serverHandler = new NettyServerHandler();
|
|
@@ -429,10 +431,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
return defaultEventExecutorGroup;
|
|
}
|
|
|
|
- public HandshakeHandler getHandshakeHandler() {
|
|
- return handshakeHandler;
|
|
- }
|
|
-
|
|
public NettyEncoder getEncoder() {
|
|
return encoder;
|
|
}
|
|
@@ -449,17 +447,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
return distributionHandler;
|
|
}
|
|
|
|
- @ChannelHandler.Sharable
|
|
- public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
|
-
|
|
- private final TlsModeHandler tlsModeHandler;
|
|
+ public class HandshakeHandler extends ByteToMessageDecoder {
|
|
|
|
public HandshakeHandler() {
|
|
- tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
|
|
}
|
|
|
|
@Override
|
|
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
|
|
+ protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
|
|
try {
|
|
ProtocolDetectionResult<HAProxyProtocolVersion> detectionResult = HAProxyMessageDecoder.detectProtocol(byteBuf);
|
|
if (detectionResult.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
|
|
@@ -479,9 +473,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
} catch (NoSuchElementException e) {
|
|
log.error("Error while removing HandshakeHandler", e);
|
|
}
|
|
-
|
|
- // Hand over this message to the next .
|
|
- ctx.fireChannelRead(byteBuf.retain());
|
|
} catch (Exception e) {
|
|
log.error("process proxy protocol negotiator failed.", e);
|
|
throw e;
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 1f0f3b2d6d16de7b315c702d33f7d3557c0fc25c Mon Sep 17 00:00:00 2001
|
|
From: Ji Juntao <juntao.jjt@alibaba-inc.com>
|
|
Date: Tue, 11 Jul 2023 21:13:06 +0800
|
|
Subject: [PATCH 3/8] [ISSUE #7013] Polish ColdDataCheckService's logic (#7014)
|
|
|
|
* polish coldCtrl
|
|
|
|
* remove the catch.
|
|
---
|
|
.../src/main/java/org/apache/rocketmq/store/CommitLog.java | 7 ++++++-
|
|
1 file changed, 6 insertions(+), 1 deletion(-)
|
|
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
index 5a5c90c5a..e6ee3bacc 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
@@ -2089,6 +2089,11 @@ public class CommitLog implements Swappable {
|
|
} else {
|
|
this.waitForRunning(defaultMessageStore.getMessageStoreConfig().getTimerColdDataCheckIntervalMs());
|
|
}
|
|
+
|
|
+ if (pageSize < 0) {
|
|
+ initPageSize();
|
|
+ }
|
|
+
|
|
long beginClockTimestamp = this.systemClock.now();
|
|
scanFilesInPageCache();
|
|
long costTime = this.systemClock.now() - beginClockTimestamp;
|
|
@@ -2182,7 +2187,7 @@ public class CommitLog implements Swappable {
|
|
}
|
|
|
|
private void initPageSize() {
|
|
- if (pageSize < 0) {
|
|
+ if (pageSize < 0 && defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) {
|
|
try {
|
|
if (!MixAll.isWindows()) {
|
|
pageSize = LibC.INSTANCE.getpagesize();
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From d206590692bfdffca6bc58327e9533bc4bb68122 Mon Sep 17 00:00:00 2001
|
|
From: Lei Zhiyuan <leizhiyuan@gmail.com>
|
|
Date: Thu, 13 Jul 2023 11:17:38 +0800
|
|
Subject: [PATCH 4/8] [ISSUE #6979] Fix opaque will be duplicate in multi
|
|
client scene (#6985)
|
|
|
|
---
|
|
.../proxy/processor/DefaultMessagingProcessor.java | 14 ++++++++++++--
|
|
1 file changed, 12 insertions(+), 2 deletions(-)
|
|
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
|
|
index 1b3f0af4e..188cb7b9b 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
|
|
@@ -235,13 +235,23 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen
|
|
@Override
|
|
public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
|
|
long timeoutMillis) {
|
|
- return this.requestBrokerProcessor.request(ctx, brokerName, request, timeoutMillis);
|
|
+ int originalRequestOpaque = request.getOpaque();
|
|
+ request.setOpaque(RemotingCommand.createNewRequestId());
|
|
+ return this.requestBrokerProcessor.request(ctx, brokerName, request, timeoutMillis).thenApply(r -> {
|
|
+ request.setOpaque(originalRequestOpaque);
|
|
+ return r;
|
|
+ });
|
|
}
|
|
|
|
@Override
|
|
public CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request,
|
|
long timeoutMillis) {
|
|
- return this.requestBrokerProcessor.requestOneway(ctx, brokerName, request, timeoutMillis);
|
|
+ int originalRequestOpaque = request.getOpaque();
|
|
+ request.setOpaque(RemotingCommand.createNewRequestId());
|
|
+ return this.requestBrokerProcessor.requestOneway(ctx, brokerName, request, timeoutMillis).thenApply(r -> {
|
|
+ request.setOpaque(originalRequestOpaque);
|
|
+ return r;
|
|
+ });
|
|
}
|
|
|
|
@Override
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 33cb22e1c0fa7ba980567117230fe443ff5dbd62 Mon Sep 17 00:00:00 2001
|
|
From: lizhimins <707364882@qq.com>
|
|
Date: Thu, 13 Jul 2023 15:37:24 +0800
|
|
Subject: [PATCH 5/8] [ISSUE #7018] fix append in tiered storage when message
|
|
offset incorrect (#7019)
|
|
|
|
* fix append in tiered storage when message offset incorrect
|
|
---
|
|
.../tieredstore/TieredDispatcher.java | 25 ++++++++++------
|
|
.../tieredstore/file/CompositeFlatFile.java | 30 +++++++++----------
|
|
.../file/CompositeQueueFlatFile.java | 2 +-
|
|
.../file/CompositeQueueFlatFileTest.java | 4 +--
|
|
4 files changed, 34 insertions(+), 27 deletions(-)
|
|
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
|
index 2a8e2ed71..6584b0e89 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
|
|
@@ -308,9 +308,18 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
|
|
doRedispatchRequestToWriteMap(
|
|
result, flatFile, dispatchOffset, newCommitLogOffset, size, tagCode, message.getByteBuffer());
|
|
message.release();
|
|
- if (result != AppendResult.SUCCESS) {
|
|
- dispatchOffset--;
|
|
- break;
|
|
+
|
|
+ switch (result) {
|
|
+ case SUCCESS:
|
|
+ continue;
|
|
+ case FILE_CLOSED:
|
|
+ tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue());
|
|
+ logger.info("TieredDispatcher#dispatchFlatFile: file has been close and destroy, " +
|
|
+ "topic: {}, queueId: {}", topic, queueId);
|
|
+ return;
|
|
+ default:
|
|
+ dispatchOffset--;
|
|
+ break;
|
|
}
|
|
}
|
|
|
|
@@ -341,15 +350,13 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
|
|
|
|
switch (result) {
|
|
case SUCCESS:
|
|
- break;
|
|
- case OFFSET_INCORRECT:
|
|
long offset = MessageBufferUtil.getQueueOffset(message);
|
|
if (queueOffset != offset) {
|
|
- logger.error("[Bug] Commitlog offset incorrect, " +
|
|
- "result={}, topic={}, queueId={}, offset={}, msg offset={}",
|
|
- result, topic, queueId, queueOffset, offset);
|
|
+ logger.error("Message cq offset in commitlog does not meet expectations, " +
|
|
+ "result={}, topic={}, queueId={}, cq offset={}, msg offset={}",
|
|
+ AppendResult.OFFSET_INCORRECT, topic, queueId, queueOffset, offset);
|
|
}
|
|
- return;
|
|
+ break;
|
|
case BUFFER_FULL:
|
|
logger.debug("Commitlog buffer full, result={}, topic={}, queueId={}, offset={}",
|
|
result, topic, queueId, queueOffset);
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
|
|
index 1243f7721..8f8ba98b1 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
|
|
@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.TimeUnit;
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.tuple.Pair;
|
|
@@ -58,7 +59,7 @@ public class CompositeFlatFile implements CompositeAccess {
|
|
* dispatched to the current chunk, indicating the progress of the message distribution.
|
|
* It's consume queue current offset.
|
|
*/
|
|
- protected volatile long dispatchOffset;
|
|
+ protected final AtomicLong dispatchOffset;
|
|
|
|
protected final ReentrantLock compositeFlatFileLock;
|
|
protected final TieredMessageStoreConfig storeConfig;
|
|
@@ -75,6 +76,7 @@ public class CompositeFlatFile implements CompositeAccess {
|
|
this.storeConfig = fileQueueFactory.getStoreConfig();
|
|
this.readAheadFactor = this.storeConfig.getReadAheadMinFactor();
|
|
this.metadataStore = TieredStoreUtil.getMetadataStore(this.storeConfig);
|
|
+ this.dispatchOffset = new AtomicLong();
|
|
this.compositeFlatFileLock = new ReentrantLock();
|
|
this.inFlightRequestMap = new ConcurrentHashMap<>();
|
|
this.commitLog = new TieredCommitLog(fileQueueFactory, filePath);
|
|
@@ -83,8 +85,8 @@ public class CompositeFlatFile implements CompositeAccess {
|
|
}
|
|
|
|
protected void recoverMetadata() {
|
|
- if (!consumeQueue.isInitialized() && this.dispatchOffset != -1) {
|
|
- consumeQueue.setBaseOffset(this.dispatchOffset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
|
|
+ if (!consumeQueue.isInitialized() && this.dispatchOffset.get() != -1) {
|
|
+ consumeQueue.setBaseOffset(this.dispatchOffset.get() * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
|
|
}
|
|
}
|
|
|
|
@@ -144,7 +146,7 @@ public class CompositeFlatFile implements CompositeAccess {
|
|
}
|
|
|
|
public long getDispatchOffset() {
|
|
- return dispatchOffset;
|
|
+ return dispatchOffset.get();
|
|
}
|
|
|
|
@Override
|
|
@@ -309,7 +311,7 @@ public class CompositeFlatFile implements CompositeAccess {
|
|
if (!consumeQueue.isInitialized()) {
|
|
consumeQueue.setBaseOffset(offset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
|
|
}
|
|
- dispatchOffset = offset;
|
|
+ dispatchOffset.set(offset);
|
|
}
|
|
|
|
@Override
|
|
@@ -323,14 +325,9 @@ public class CompositeFlatFile implements CompositeAccess {
|
|
return AppendResult.FILE_CLOSED;
|
|
}
|
|
|
|
- long queueOffset = MessageBufferUtil.getQueueOffset(message);
|
|
- if (dispatchOffset != queueOffset) {
|
|
- return AppendResult.OFFSET_INCORRECT;
|
|
- }
|
|
-
|
|
AppendResult result = commitLog.append(message, commit);
|
|
if (result == AppendResult.SUCCESS) {
|
|
- dispatchOffset = queueOffset + 1;
|
|
+ dispatchOffset.incrementAndGet();
|
|
}
|
|
return result;
|
|
}
|
|
@@ -483,14 +480,17 @@ public class CompositeFlatFile implements CompositeAccess {
|
|
}
|
|
|
|
public void destroy() {
|
|
- closed = true;
|
|
- commitLog.destroy();
|
|
- consumeQueue.destroy();
|
|
try {
|
|
+ closed = true;
|
|
+ compositeFlatFileLock.lock();
|
|
+ commitLog.destroy();
|
|
+ consumeQueue.destroy();
|
|
metadataStore.deleteFileSegment(filePath, FileSegmentType.COMMIT_LOG);
|
|
metadataStore.deleteFileSegment(filePath, FileSegmentType.CONSUME_QUEUE);
|
|
} catch (Exception e) {
|
|
- LOGGER.error("CompositeFlatFile#destroy: clean metadata failed: ", e);
|
|
+ LOGGER.error("CompositeFlatFile#destroy: delete file failed", e);
|
|
+ } finally {
|
|
+ compositeFlatFileLock.unlock();
|
|
}
|
|
}
|
|
}
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
|
|
index c0cf79069..f6c0afed0 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
|
|
@@ -64,7 +64,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
|
|
if (queueMetadata.getMaxOffset() < queueMetadata.getMinOffset()) {
|
|
queueMetadata.setMaxOffset(queueMetadata.getMinOffset());
|
|
}
|
|
- this.dispatchOffset = queueMetadata.getMaxOffset();
|
|
+ this.dispatchOffset.set(queueMetadata.getMaxOffset());
|
|
}
|
|
|
|
public void persistMetadata() {
|
|
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
|
|
index 9735a535e..8322c72ed 100644
|
|
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
|
|
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
|
|
@@ -73,8 +73,8 @@ public class CompositeQueueFlatFileTest {
|
|
CompositeQueueFlatFile flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq);
|
|
ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer();
|
|
AppendResult result = flatFile.appendCommitLog(message);
|
|
- Assert.assertEquals(AppendResult.OFFSET_INCORRECT, result);
|
|
- Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
|
|
+ Assert.assertEquals(AppendResult.SUCCESS, result);
|
|
+ Assert.assertEquals(122L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
|
|
Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getCommitPosition());
|
|
|
|
flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq);
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 70a66eda2c08eb5fca38356659cb6de1ac75e25e Mon Sep 17 00:00:00 2001
|
|
From: ShuangxiDing <dingshuangxi888@gmail.com>
|
|
Date: Fri, 14 Jul 2023 16:46:40 +0800
|
|
Subject: [PATCH 6/8] Fix LEAK: HAProxyMessage.release() was not called before
|
|
it's garbage-collected (#7025)
|
|
MIME-Version: 1.0
|
|
Content-Type: text/plain; charset=UTF-8
|
|
Content-Transfer-Encoding: 8bit
|
|
|
|
Call HAProxyMessage.release() after reading it.
|
|
|
|
---------
|
|
|
|
Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com>
|
|
---
|
|
.../grpc/ProxyAndTlsProtocolNegotiator.java | 52 ++++++++++---------
|
|
.../remoting/netty/NettyRemotingServer.java | 46 ++++++++--------
|
|
2 files changed, 53 insertions(+), 45 deletions(-)
|
|
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
|
|
index ceb9becc0..ee167bd7b 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
|
|
@@ -160,7 +160,7 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator
|
|
@Override
|
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
if (msg instanceof HAProxyMessage) {
|
|
- replaceEventWithMessage((HAProxyMessage) msg);
|
|
+ handleWithMessage((HAProxyMessage) msg);
|
|
ctx.fireUserEventTriggered(pne);
|
|
} else {
|
|
super.channelRead(ctx, msg);
|
|
@@ -174,30 +174,34 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator
|
|
*
|
|
* @param msg
|
|
*/
|
|
- private void replaceEventWithMessage(HAProxyMessage msg) {
|
|
- Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
|
|
- if (StringUtils.isNotBlank(msg.sourceAddress())) {
|
|
- builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress());
|
|
- }
|
|
- if (msg.sourcePort() > 0) {
|
|
- builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort()));
|
|
- }
|
|
- if (StringUtils.isNotBlank(msg.destinationAddress())) {
|
|
- builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress());
|
|
- }
|
|
- if (msg.destinationPort() > 0) {
|
|
- builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort()));
|
|
- }
|
|
- if (CollectionUtils.isNotEmpty(msg.tlvs())) {
|
|
- msg.tlvs().forEach(tlv -> {
|
|
- Attributes.Key<String> key = AttributeKeys.valueOf(
|
|
- HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
|
|
- String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
|
|
- builder.set(key, value);
|
|
- });
|
|
+ private void handleWithMessage(HAProxyMessage msg) {
|
|
+ try {
|
|
+ Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
|
|
+ if (StringUtils.isNotBlank(msg.sourceAddress())) {
|
|
+ builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress());
|
|
+ }
|
|
+ if (msg.sourcePort() > 0) {
|
|
+ builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort()));
|
|
+ }
|
|
+ if (StringUtils.isNotBlank(msg.destinationAddress())) {
|
|
+ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress());
|
|
+ }
|
|
+ if (msg.destinationPort() > 0) {
|
|
+ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort()));
|
|
+ }
|
|
+ if (CollectionUtils.isNotEmpty(msg.tlvs())) {
|
|
+ msg.tlvs().forEach(tlv -> {
|
|
+ Attributes.Key<String> key = AttributeKeys.valueOf(
|
|
+ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
|
|
+ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
|
|
+ builder.set(key, value);
|
|
+ });
|
|
+ }
|
|
+ pne = InternalProtocolNegotiationEvent
|
|
+ .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
|
|
+ } finally {
|
|
+ msg.release();
|
|
}
|
|
- pne = InternalProtocolNegotiationEvent
|
|
- .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
|
|
}
|
|
}
|
|
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
index 8ae87a6fa..90e358ce3 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
@@ -758,7 +758,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
@Override
|
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
if (msg instanceof HAProxyMessage) {
|
|
- fillChannelWithMessage((HAProxyMessage) msg, ctx.channel());
|
|
+ handleWithMessage((HAProxyMessage) msg, ctx.channel());
|
|
} else {
|
|
super.channelRead(ctx, msg);
|
|
}
|
|
@@ -771,26 +771,30 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
* @param msg
|
|
* @param channel
|
|
*/
|
|
- private void fillChannelWithMessage(HAProxyMessage msg, Channel channel) {
|
|
- if (StringUtils.isNotBlank(msg.sourceAddress())) {
|
|
- channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
|
|
- }
|
|
- if (msg.sourcePort() > 0) {
|
|
- channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
|
|
- }
|
|
- if (StringUtils.isNotBlank(msg.destinationAddress())) {
|
|
- channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
|
|
- }
|
|
- if (msg.destinationPort() > 0) {
|
|
- channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
|
|
- }
|
|
- if (CollectionUtils.isNotEmpty(msg.tlvs())) {
|
|
- msg.tlvs().forEach(tlv -> {
|
|
- AttributeKey<String> key = AttributeKeys.valueOf(
|
|
- HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
|
|
- String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
|
|
- channel.attr(key).set(value);
|
|
- });
|
|
+ private void handleWithMessage(HAProxyMessage msg, Channel channel) {
|
|
+ try {
|
|
+ if (StringUtils.isNotBlank(msg.sourceAddress())) {
|
|
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
|
|
+ }
|
|
+ if (msg.sourcePort() > 0) {
|
|
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
|
|
+ }
|
|
+ if (StringUtils.isNotBlank(msg.destinationAddress())) {
|
|
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
|
|
+ }
|
|
+ if (msg.destinationPort() > 0) {
|
|
+ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
|
|
+ }
|
|
+ if (CollectionUtils.isNotEmpty(msg.tlvs())) {
|
|
+ msg.tlvs().forEach(tlv -> {
|
|
+ AttributeKey<String> key = AttributeKeys.valueOf(
|
|
+ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
|
|
+ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
|
|
+ channel.attr(key).set(value);
|
|
+ });
|
|
+ }
|
|
+ } finally {
|
|
+ msg.release();
|
|
}
|
|
}
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 5914ff8dbb9d37e2cb48ef9c4f0256c6185b4659 Mon Sep 17 00:00:00 2001
|
|
From: lyx <56945247+lyx2000@users.noreply.github.com>
|
|
Date: Sat, 15 Jul 2023 18:15:57 +0800
|
|
Subject: [PATCH 7/8] [ISSUE #6968] fix grpc acl bug (#6969)
|
|
|
|
* feat(acl): fix acl bug
|
|
|
|
Signed-off-by: lyx <1419360299@qq.com>
|
|
|
|
# Conflicts:
|
|
# proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
|
|
|
|
* add access test for two client
|
|
|
|
Signed-off-by: lyx <1419360299@qq.com>
|
|
|
|
* use specific acl config
|
|
|
|
Signed-off-by: lyx <1419360299@qq.com>
|
|
|
|
* Recovering unchange file
|
|
|
|
Signed-off-by: lyx <1419360299@qq.com>
|
|
|
|
* let test pass
|
|
|
|
Signed-off-by: lyx <1419360299@qq.com>
|
|
|
|
---------
|
|
|
|
Signed-off-by: lyx <1419360299@qq.com>
|
|
---
|
|
.../acl/plain/PlainAccessResource.java | 21 +-
|
|
.../acl/RemotingClientAccessTest.java | 189 ++++++++++++++++++
|
|
.../access_acl_conf/acl/plain_acl.yml | 31 +++
|
|
acl/src/test/resources/conf/acl/plain_acl.yml | 1 -
|
|
pom.xml | 1 +
|
|
.../apache/rocketmq/proxy/ProxyStartup.java | 17 +-
|
|
.../proxy/grpc/GrpcServerBuilder.java | 10 +-
|
|
.../remoting/RemotingProtocolServer.java | 15 +-
|
|
8 files changed, 255 insertions(+), 30 deletions(-)
|
|
create mode 100644 acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java
|
|
create mode 100644 acl/src/test/resources/access_acl_conf/acl/plain_acl.yml
|
|
|
|
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
|
|
index cdbd9ea9b..72aa8ca71 100644
|
|
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
|
|
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
|
|
@@ -223,7 +223,7 @@ public class PlainAccessResource implements AccessResource {
|
|
if (!request.hasGroup()) {
|
|
throw new AclException("Consumer heartbeat doesn't have group");
|
|
} else {
|
|
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
|
|
+ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
|
|
}
|
|
}
|
|
} else if (SendMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
|
|
@@ -240,15 +240,15 @@ public class PlainAccessResource implements AccessResource {
|
|
accessResource.addResourceAndPerm(topic, Permission.PUB);
|
|
} else if (ReceiveMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
|
|
ReceiveMessageRequest request = (ReceiveMessageRequest) messageV3;
|
|
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
|
|
+ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
|
|
accessResource.addResourceAndPerm(request.getMessageQueue().getTopic(), Permission.SUB);
|
|
} else if (AckMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
|
|
AckMessageRequest request = (AckMessageRequest) messageV3;
|
|
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
|
|
+ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
|
|
accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
|
|
} else if (ForwardMessageToDeadLetterQueueRequest.getDescriptor().getFullName().equals(rpcFullName)) {
|
|
ForwardMessageToDeadLetterQueueRequest request = (ForwardMessageToDeadLetterQueueRequest) messageV3;
|
|
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
|
|
+ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
|
|
accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
|
|
} else if (EndTransactionRequest.getDescriptor().getFullName().equals(rpcFullName)) {
|
|
EndTransactionRequest request = (EndTransactionRequest) messageV3;
|
|
@@ -264,7 +264,7 @@ public class PlainAccessResource implements AccessResource {
|
|
}
|
|
if (command.getSettings().hasSubscription()) {
|
|
Subscription subscription = command.getSettings().getSubscription();
|
|
- accessResource.addResourceAndPerm(subscription.getGroup(), Permission.SUB);
|
|
+ accessResource.addGroupResourceAndPerm(subscription.getGroup(), Permission.SUB);
|
|
for (SubscriptionEntry entry : subscription.getSubscriptionsList()) {
|
|
accessResource.addResourceAndPerm(entry.getTopic(), Permission.SUB);
|
|
}
|
|
@@ -275,17 +275,17 @@ public class PlainAccessResource implements AccessResource {
|
|
}
|
|
} else if (NotifyClientTerminationRequest.getDescriptor().getFullName().equals(rpcFullName)) {
|
|
NotifyClientTerminationRequest request = (NotifyClientTerminationRequest) messageV3;
|
|
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
|
|
+ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
|
|
} else if (QueryRouteRequest.getDescriptor().getFullName().equals(rpcFullName)) {
|
|
QueryRouteRequest request = (QueryRouteRequest) messageV3;
|
|
accessResource.addResourceAndPerm(request.getTopic(), Permission.ANY);
|
|
} else if (QueryAssignmentRequest.getDescriptor().getFullName().equals(rpcFullName)) {
|
|
QueryAssignmentRequest request = (QueryAssignmentRequest) messageV3;
|
|
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
|
|
+ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
|
|
accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
|
|
} else if (ChangeInvisibleDurationRequest.getDescriptor().getFullName().equals(rpcFullName)) {
|
|
ChangeInvisibleDurationRequest request = (ChangeInvisibleDurationRequest) messageV3;
|
|
- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
|
|
+ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
|
|
accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
|
|
}
|
|
} catch (Throwable t) {
|
|
@@ -299,6 +299,11 @@ public class PlainAccessResource implements AccessResource {
|
|
addResourceAndPerm(resourceName, permission);
|
|
}
|
|
|
|
+ private void addGroupResourceAndPerm(Resource resource, byte permission) {
|
|
+ String resourceName = NamespaceUtil.wrapNamespace(resource.getResourceNamespace(), resource.getName());
|
|
+ addResourceAndPerm(getRetryTopic(resourceName), permission);
|
|
+ }
|
|
+
|
|
public static PlainAccessResource build(PlainAccessConfig plainAccessConfig, RemoteAddressStrategy remoteAddressStrategy) {
|
|
PlainAccessResource plainAccessResource = new PlainAccessResource();
|
|
plainAccessResource.setAccessKey(plainAccessConfig.getAccessKey());
|
|
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java b/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java
|
|
new file mode 100644
|
|
index 000000000..88c5e09a9
|
|
--- /dev/null
|
|
+++ b/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java
|
|
@@ -0,0 +1,189 @@
|
|
+/*
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+ * contributor license agreements. See the NOTICE file distributed with
|
|
+ * this work for additional information regarding copyright ownership.
|
|
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+ * (the "License"); you may not use this file except in compliance with
|
|
+ * the License. You may obtain a copy of the License at
|
|
+ *
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
+ *
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+ * See the License for the specific language governing permissions and
|
|
+ * limitations under the License.
|
|
+ */
|
|
+package org.apache.rocketmq.acl;
|
|
+
|
|
+import java.io.File;
|
|
+import java.io.IOException;
|
|
+import java.nio.ByteBuffer;
|
|
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
|
+import org.apache.rocketmq.acl.common.AclException;
|
|
+import org.apache.rocketmq.acl.common.SessionCredentials;
|
|
+import org.apache.rocketmq.acl.plain.AclTestHelper;
|
|
+import org.apache.rocketmq.acl.plain.PlainAccessResource;
|
|
+import org.apache.rocketmq.acl.plain.PlainAccessValidator;
|
|
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
|
|
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
|
+import org.apache.rocketmq.remoting.protocol.RequestCode;
|
|
+import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
|
|
+import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
|
|
+import org.junit.After;
|
|
+import org.junit.Assert;
|
|
+import org.junit.Before;
|
|
+import org.junit.Test;
|
|
+
|
|
+public class RemotingClientAccessTest {
|
|
+
|
|
+ private PlainAccessValidator plainAccessValidator;
|
|
+ private AclClientRPCHook aclClient;
|
|
+ private SessionCredentials sessionCredentials;
|
|
+
|
|
+ private File confHome;
|
|
+
|
|
+ private String clientAddress = "10.7.1.3";
|
|
+
|
|
+ @Before
|
|
+ public void init() throws IOException {
|
|
+ String folder = "access_acl_conf";
|
|
+ confHome = AclTestHelper.copyResources(folder, true);
|
|
+ System.setProperty("rocketmq.home.dir", confHome.getAbsolutePath());
|
|
+ System.setProperty("rocketmq.acl.plain.file", "/access_acl_conf/acl/plain_acl.yml".replace("/", File.separator));
|
|
+
|
|
+ plainAccessValidator = new PlainAccessValidator();
|
|
+ sessionCredentials = new SessionCredentials();
|
|
+ sessionCredentials.setAccessKey("rocketmq3");
|
|
+ sessionCredentials.setSecretKey("12345678");
|
|
+ aclClient = new AclClientRPCHook(sessionCredentials);
|
|
+ }
|
|
+
|
|
+ @After
|
|
+ public void cleanUp() {
|
|
+ AclTestHelper.recursiveDelete(confHome);
|
|
+ }
|
|
+
|
|
+ @Test(expected = AclException.class)
|
|
+ public void testProduceDenyTopic() {
|
|
+ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
|
|
+ messageRequestHeader.setTopic("topicD");
|
|
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
|
|
+ aclClient.doBeforeRequest(clientAddress, remotingCommand);
|
|
+
|
|
+ ByteBuffer buf = remotingCommand.encodeHeader();
|
|
+ buf.getInt();
|
|
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
|
|
+ buf.position(0);
|
|
+ try {
|
|
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), clientAddress);
|
|
+ plainAccessValidator.validate(accessResource);
|
|
+ } catch (RemotingCommandException e) {
|
|
+ e.printStackTrace();
|
|
+ Assert.fail("Should not throw IOException");
|
|
+ }
|
|
+ }
|
|
+
|
|
+ @Test
|
|
+ public void testProduceAuthorizedTopic() {
|
|
+ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
|
|
+ messageRequestHeader.setTopic("topicA");
|
|
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
|
|
+ aclClient.doBeforeRequest(clientAddress, remotingCommand);
|
|
+
|
|
+ ByteBuffer buf = remotingCommand.encodeHeader();
|
|
+ buf.getInt();
|
|
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
|
|
+ buf.position(0);
|
|
+ try {
|
|
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), clientAddress);
|
|
+ plainAccessValidator.validate(accessResource);
|
|
+ } catch (RemotingCommandException e) {
|
|
+ e.printStackTrace();
|
|
+ Assert.fail("Should not throw IOException");
|
|
+ }
|
|
+ }
|
|
+
|
|
+
|
|
+ @Test(expected = AclException.class)
|
|
+ public void testConsumeDenyTopic() {
|
|
+ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
|
|
+ pullMessageRequestHeader.setTopic("topicD");
|
|
+ pullMessageRequestHeader.setConsumerGroup("groupB");
|
|
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
|
|
+ aclClient.doBeforeRequest("", remotingCommand);
|
|
+ ByteBuffer buf = remotingCommand.encodeHeader();
|
|
+ buf.getInt();
|
|
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
|
|
+ buf.position(0);
|
|
+ try {
|
|
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
|
|
+ plainAccessValidator.validate(accessResource);
|
|
+ } catch (RemotingCommandException e) {
|
|
+ e.printStackTrace();
|
|
+ Assert.fail("Should not throw IOException");
|
|
+ }
|
|
+
|
|
+ }
|
|
+
|
|
+ @Test
|
|
+ public void testConsumeAuthorizedTopic() {
|
|
+ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
|
|
+ pullMessageRequestHeader.setTopic("topicB");
|
|
+ pullMessageRequestHeader.setConsumerGroup("groupB");
|
|
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
|
|
+ aclClient.doBeforeRequest("", remotingCommand);
|
|
+ ByteBuffer buf = remotingCommand.encodeHeader();
|
|
+ buf.getInt();
|
|
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
|
|
+ buf.position(0);
|
|
+ try {
|
|
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
|
|
+ plainAccessValidator.validate(accessResource);
|
|
+ } catch (RemotingCommandException e) {
|
|
+ e.printStackTrace();
|
|
+ Assert.fail("Should not throw IOException");
|
|
+ }
|
|
+ }
|
|
+
|
|
+ @Test(expected = AclException.class)
|
|
+ public void testConsumeInDeniedGroup() {
|
|
+ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
|
|
+ pullMessageRequestHeader.setTopic("topicB");
|
|
+ pullMessageRequestHeader.setConsumerGroup("groupD");
|
|
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
|
|
+ aclClient.doBeforeRequest("", remotingCommand);
|
|
+ ByteBuffer buf = remotingCommand.encodeHeader();
|
|
+ buf.getInt();
|
|
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
|
|
+ buf.position(0);
|
|
+ try {
|
|
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
|
|
+ plainAccessValidator.validate(accessResource);
|
|
+ } catch (RemotingCommandException e) {
|
|
+ e.printStackTrace();
|
|
+ Assert.fail("Should not throw IOException");
|
|
+ }
|
|
+ }
|
|
+
|
|
+ @Test
|
|
+ public void testConsumeInAuthorizedGroup() {
|
|
+ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
|
|
+ pullMessageRequestHeader.setTopic("topicB");
|
|
+ pullMessageRequestHeader.setConsumerGroup("groupB");
|
|
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
|
|
+ aclClient.doBeforeRequest("", remotingCommand);
|
|
+ ByteBuffer buf = remotingCommand.encodeHeader();
|
|
+ buf.getInt();
|
|
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
|
|
+ buf.position(0);
|
|
+ try {
|
|
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
|
|
+ plainAccessValidator.validate(accessResource);
|
|
+ } catch (RemotingCommandException e) {
|
|
+ e.printStackTrace();
|
|
+ Assert.fail("Should not throw IOException");
|
|
+ }
|
|
+ }
|
|
+
|
|
+}
|
|
diff --git a/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml b/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml
|
|
new file mode 100644
|
|
index 000000000..28a8c4888
|
|
--- /dev/null
|
|
+++ b/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml
|
|
@@ -0,0 +1,31 @@
|
|
+# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
+# contributor license agreements. See the NOTICE file distributed with
|
|
+# this work for additional information regarding copyright ownership.
|
|
+# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
+# (the "License"); you may not use this file except in compliance with
|
|
+# the License. You may obtain a copy of the License at
|
|
+#
|
|
+# http://www.apache.org/licenses/LICENSE-2.0
|
|
+#
|
|
+# Unless required by applicable law or agreed to in writing, software
|
|
+# distributed under the License is distributed on an "AS IS" BASIS,
|
|
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
+# See the License for the specific language governing permissions and
|
|
+# limitations under the License.
|
|
+
|
|
+accounts:
|
|
+ - accessKey: rocketmq3
|
|
+ secretKey: 12345678
|
|
+ admin: false
|
|
+ defaultTopicPerm: DENY
|
|
+ defaultGroupPerm: DENY
|
|
+ topicPerms:
|
|
+ - topicA=PUB
|
|
+ - topicB=SUB
|
|
+ - topicC=PUB|SUB
|
|
+ - topicD=DENY
|
|
+ groupPerms:
|
|
+ - groupB=SUB
|
|
+ - groupC=PUB|SUB
|
|
+ - groupD=DENY
|
|
+
|
|
diff --git a/acl/src/test/resources/conf/acl/plain_acl.yml b/acl/src/test/resources/conf/acl/plain_acl.yml
|
|
index 5641a94bf..34e46696d 100644
|
|
--- a/acl/src/test/resources/conf/acl/plain_acl.yml
|
|
+++ b/acl/src/test/resources/conf/acl/plain_acl.yml
|
|
@@ -41,4 +41,3 @@ accounts:
|
|
whiteRemoteAddress: 192.168.1.*
|
|
# if it is admin, it could access all resources
|
|
admin: true
|
|
-
|
|
diff --git a/pom.xml b/pom.xml
|
|
index 12bc2dbd5..4d5dd1dec 100644
|
|
--- a/pom.xml
|
|
+++ b/pom.xml
|
|
@@ -147,6 +147,7 @@
|
|
<awaitility.version>4.1.0</awaitility.version>
|
|
<truth.version>0.30</truth.version>
|
|
<s3mock-junit4.version>2.11.0</s3mock-junit4.version>
|
|
+ <rocketmq-client-java.version>5.0.5</rocketmq-client-java.version>
|
|
|
|
<!-- Build plugin dependencies -->
|
|
<versions-maven-plugin.version>2.2</versions-maven-plugin.version>
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
|
|
index ea13bb808..06d5f4525 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
|
|
@@ -29,11 +29,14 @@ import org.apache.commons.cli.DefaultParser;
|
|
import org.apache.commons.cli.Option;
|
|
import org.apache.commons.cli.Options;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
+import org.apache.rocketmq.acl.AccessValidator;
|
|
+import org.apache.rocketmq.acl.plain.PlainAccessValidator;
|
|
import org.apache.rocketmq.broker.BrokerController;
|
|
import org.apache.rocketmq.broker.BrokerStartup;
|
|
import org.apache.rocketmq.common.MixAll;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
|
|
+import org.apache.rocketmq.common.utils.ServiceProvider;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
|
|
@@ -75,16 +78,17 @@ public class ProxyStartup {
|
|
|
|
MessagingProcessor messagingProcessor = createMessagingProcessor();
|
|
|
|
+ List<AccessValidator> accessValidators = loadAccessValidators();
|
|
// create grpcServer
|
|
GrpcServer grpcServer = GrpcServerBuilder.newBuilder(executor, ConfigurationManager.getProxyConfig().getGrpcServerPort())
|
|
.addService(createServiceProcessor(messagingProcessor))
|
|
.addService(ChannelzService.newInstance(100))
|
|
.addService(ProtoReflectionService.newInstance())
|
|
- .configInterceptor()
|
|
+ .configInterceptor(accessValidators)
|
|
.build();
|
|
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer);
|
|
|
|
- RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor);
|
|
+ RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor, accessValidators);
|
|
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(remotingServer);
|
|
|
|
// start servers one by one.
|
|
@@ -109,6 +113,15 @@ public class ProxyStartup {
|
|
log.info(new Date() + " rocketmq-proxy startup successfully");
|
|
}
|
|
|
|
+ protected static List<AccessValidator> loadAccessValidators() {
|
|
+ List<AccessValidator> accessValidators = ServiceProvider.load(AccessValidator.class);
|
|
+ if (accessValidators.isEmpty()) {
|
|
+ log.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator");
|
|
+ accessValidators.add(new PlainAccessValidator());
|
|
+ }
|
|
+ return accessValidators;
|
|
+ }
|
|
+
|
|
protected static void initConfiguration(CommandLineArgument commandLineArgument) throws Exception {
|
|
if (StringUtils.isNotBlank(commandLineArgument.getProxyConfigPath())) {
|
|
System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath());
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
|
|
index 437b9216b..9cddd3013 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
|
|
@@ -28,9 +28,7 @@ import java.util.List;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.acl.AccessValidator;
|
|
-import org.apache.rocketmq.acl.plain.PlainAccessValidator;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
-import org.apache.rocketmq.common.utils.ServiceProvider;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
|
@@ -98,14 +96,8 @@ public class GrpcServerBuilder {
|
|
return new GrpcServer(this.serverBuilder.build());
|
|
}
|
|
|
|
- public GrpcServerBuilder configInterceptor() {
|
|
+ public GrpcServerBuilder configInterceptor(List<AccessValidator> accessValidators) {
|
|
// grpc interceptors, including acl, logging etc.
|
|
- List<AccessValidator> accessValidators = ServiceProvider.load(AccessValidator.class);
|
|
- if (accessValidators.isEmpty()) {
|
|
- log.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator");
|
|
- accessValidators.add(new PlainAccessValidator());
|
|
- }
|
|
-
|
|
this.serverBuilder.intercept(new AuthenticationInterceptor(accessValidators));
|
|
|
|
this.serverBuilder
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
|
|
index f08094c16..bcc9edd09 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
|
|
@@ -19,7 +19,6 @@ package org.apache.rocketmq.proxy.remoting;
|
|
|
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
import io.netty.channel.Channel;
|
|
-import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.CompletableFuture;
|
|
@@ -28,15 +27,14 @@ import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.acl.AccessValidator;
|
|
-import org.apache.rocketmq.acl.plain.PlainAccessValidator;
|
|
import org.apache.rocketmq.client.exception.MQClientException;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.future.FutureTaskExt;
|
|
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
|
|
import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor;
|
|
+import org.apache.rocketmq.common.utils.StartAndShutdown;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
-import org.apache.rocketmq.common.utils.StartAndShutdown;
|
|
import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
|
import org.apache.rocketmq.proxy.config.ProxyConfig;
|
|
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
|
|
@@ -86,11 +84,11 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
|
|
protected final ThreadPoolExecutor defaultExecutor;
|
|
protected final ScheduledExecutorService timerExecutor;
|
|
|
|
- public RemotingProtocolServer(MessagingProcessor messagingProcessor) {
|
|
+ public RemotingProtocolServer(MessagingProcessor messagingProcessor, List<AccessValidator> accessValidators) {
|
|
this.messagingProcessor = messagingProcessor;
|
|
this.remotingChannelManager = new RemotingChannelManager(this, messagingProcessor.getProxyRelayService());
|
|
|
|
- RequestPipeline pipeline = createRequestPipeline();
|
|
+ RequestPipeline pipeline = createRequestPipeline(accessValidators);
|
|
this.getTopicRouteActivity = new GetTopicRouteActivity(pipeline, messagingProcessor);
|
|
this.clientManagerActivity = new ClientManagerActivity(pipeline, messagingProcessor, remotingChannelManager);
|
|
this.consumerManagerActivity = new ConsumerManagerActivity(pipeline, messagingProcessor);
|
|
@@ -254,15 +252,12 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
|
|
return future;
|
|
}
|
|
|
|
- protected RequestPipeline createRequestPipeline() {
|
|
+ protected RequestPipeline createRequestPipeline(List<AccessValidator> accessValidators) {
|
|
RequestPipeline pipeline = (ctx, request, context) -> {
|
|
};
|
|
-
|
|
- List<AccessValidator> accessValidatorList = new ArrayList<>();
|
|
- accessValidatorList.add(new PlainAccessValidator());
|
|
// add pipeline
|
|
// the last pipe add will execute at the first
|
|
- return pipeline.pipe(new AuthenticationPipeline(accessValidatorList));
|
|
+ return pipeline.pipe(new AuthenticationPipeline(accessValidators));
|
|
}
|
|
|
|
protected class ThreadPoolHeadSlowTimeMillsMonitor implements ThreadPoolStatusMonitor {
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 440be1ed4ce2af0ab58af6c3019de7075c09c20f Mon Sep 17 00:00:00 2001
|
|
From: fuyou001 <yubao.fyb@alibaba-inc.com>
|
|
Date: Mon, 17 Jul 2023 19:23:23 +0800
|
|
Subject: [PATCH 8/8] [ISSUE #7031] fix Pop caused broker memory leak bug
|
|
(#7032)
|
|
|
|
---
|
|
.../broker/processor/PopBufferMergeService.java | 17 ++++++++++++++++-
|
|
.../broker/processor/PopMessageProcessor.java | 11 ++++++-----
|
|
2 files changed, 22 insertions(+), 6 deletions(-)
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
|
|
index d7bc7c694..b7ba8ad4a 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
|
|
@@ -429,9 +429,16 @@ public class PopBufferMergeService extends ServiceThread {
|
|
* @param nextBeginOffset
|
|
* @return
|
|
*/
|
|
- public void addCkJustOffset(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {
|
|
+ public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {
|
|
PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset, true);
|
|
|
|
+ if (this.buffer.containsKey(pointWrapper.getMergeKey())) {
|
|
+ // when mergeKey conflict
|
|
+ // will cause PopBufferMergeService.scanCommitOffset cannot poll PopCheckPointWrapper
|
|
+ POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add ckJustOffset. ck:{}, mergeKey:{}", pointWrapper, pointWrapper.getMergeKey());
|
|
+ return false;
|
|
+ }
|
|
+
|
|
this.putCkToStore(pointWrapper, !checkQueueOk(pointWrapper));
|
|
|
|
putOffsetQueue(pointWrapper);
|
|
@@ -440,6 +447,7 @@ public class PopBufferMergeService extends ServiceThread {
|
|
if (brokerController.getBrokerConfig().isEnablePopLog()) {
|
|
POP_LOGGER.info("[PopBuffer]add ck just offset, {}", pointWrapper);
|
|
}
|
|
+ return true;
|
|
}
|
|
|
|
public void addCkMock(String group, String topic, int queueId, long startOffset, long invisibleTime,
|
|
@@ -492,6 +500,13 @@ public class PopBufferMergeService extends ServiceThread {
|
|
return false;
|
|
}
|
|
|
|
+ if (this.buffer.containsKey(pointWrapper.getMergeKey())) {
|
|
+ // when mergeKey conflict
|
|
+ // will cause PopBufferMergeService.scanCommitOffset cannot poll PopCheckPointWrapper
|
|
+ POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add ck. ck:{}, mergeKey:{}", pointWrapper, pointWrapper.getMergeKey());
|
|
+ return false;
|
|
+ }
|
|
+
|
|
putOffsetQueue(pointWrapper);
|
|
this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);
|
|
this.counter.incrementAndGet();
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
|
|
index 28549bfed..464f8f4fd 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
|
|
@@ -570,7 +570,9 @@ public class PopMessageProcessor implements NettyRequestProcessor {
|
|
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
|
|
requestHeader.getConsumerGroup(), topic, queueId, finalOffset);
|
|
} else {
|
|
- appendCheckPoint(requestHeader, topic, reviveQid, queueId, finalOffset, result, popTime, this.brokerController.getBrokerConfig().getBrokerName());
|
|
+ if (!appendCheckPoint(requestHeader, topic, reviveQid, queueId, finalOffset, result, popTime, this.brokerController.getBrokerConfig().getBrokerName())) {
|
|
+ return atomicRestNum.get() + result.getMessageCount();
|
|
+ }
|
|
}
|
|
ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, isRetry, queueId, finalOffset);
|
|
ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, isRetry, queueId,
|
|
@@ -685,7 +687,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
|
|
return msgInner;
|
|
}
|
|
|
|
- private void appendCheckPoint(final PopMessageRequestHeader requestHeader,
|
|
+ private boolean appendCheckPoint(final PopMessageRequestHeader requestHeader,
|
|
final String topic, final int reviveQid, final int queueId, final long offset,
|
|
final GetMessageResult getMessageTmpResult, final long popTime, final String brokerName) {
|
|
// add check point msg to revive log
|
|
@@ -708,10 +710,9 @@ public class PopMessageProcessor implements NettyRequestProcessor {
|
|
);
|
|
|
|
if (addBufferSuc) {
|
|
- return;
|
|
+ return true;
|
|
}
|
|
-
|
|
- this.popBufferMergeService.addCkJustOffset(
|
|
+ return this.popBufferMergeService.addCkJustOffset(
|
|
ck, reviveQid, -1, getMessageTmpResult.getNextBeginOffset()
|
|
);
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|