Return if AbstractDispatcherSingleActiveConsumer closed
This commit is contained in:
parent
a63291e2c9
commit
a01313bb0d
@ -0,0 +1,167 @@
|
|||||||
|
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
|
||||||
|
index 8cab06be11..17a6d1dbfb 100644
|
||||||
|
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
|
||||||
|
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
|
||||||
|
@@ -140,6 +140,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
|
||||||
|
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
|
||||||
|
log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.topicName, consumer);
|
||||||
|
consumer.disconnect();
|
||||||
|
+ return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
|
||||||
|
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
|
||||||
|
index 9694584025..1d74d00776 100644
|
||||||
|
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
|
||||||
|
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
|
||||||
|
@@ -47,7 +47,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
|
||||||
|
protected final Subscription subscription;
|
||||||
|
|
||||||
|
private CompletableFuture<Void> closeFuture = null;
|
||||||
|
- private final String name;
|
||||||
|
+ protected final String name;
|
||||||
|
protected final Rate msgDrop;
|
||||||
|
protected static final AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers>
|
||||||
|
TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
|
||||||
|
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
|
||||||
|
index e5e5349651..da7fe56bde 100644
|
||||||
|
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
|
||||||
|
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
|
||||||
|
@@ -39,6 +39,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
|
||||||
|
import org.apache.pulsar.common.api.proto.KeySharedMeta;
|
||||||
|
import org.apache.pulsar.common.api.proto.KeySharedMode;
|
||||||
|
import org.apache.pulsar.common.protocol.Commands;
|
||||||
|
+import org.slf4j.Logger;
|
||||||
|
+import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
|
||||||
|
|
||||||
|
@@ -84,6 +86,11 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
|
||||||
|
+ if (IS_CLOSED_UPDATER.get(this) == TRUE) {
|
||||||
|
+ log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer);
|
||||||
|
+ consumer.disconnect();
|
||||||
|
+ return;
|
||||||
|
+ }
|
||||||
|
super.addConsumer(consumer);
|
||||||
|
try {
|
||||||
|
selector.addConsumer(consumer);
|
||||||
|
@@ -168,4 +175,6 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
|
||||||
|
public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
|
||||||
|
return (ksm.getKeySharedMode() == this.keySharedMode);
|
||||||
|
}
|
||||||
|
+
|
||||||
|
+ private static final Logger log = LoggerFactory.getLogger(NonPersistentStickyKeyDispatcherMultipleConsumers.class);
|
||||||
|
}
|
||||||
|
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
|
||||||
|
index 90db639fde..e5b6f68bdf 100644
|
||||||
|
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
|
||||||
|
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
|
||||||
|
@@ -18,6 +18,7 @@
|
||||||
|
*/
|
||||||
|
package org.apache.pulsar.broker.service.persistent;
|
||||||
|
|
||||||
|
+import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import io.netty.util.concurrent.FastThreadLocal;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
@@ -99,8 +100,18 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
+ @VisibleForTesting
|
||||||
|
+ public StickyKeyConsumerSelector getSelector() {
|
||||||
|
+ return selector;
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
@Override
|
||||||
|
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
|
||||||
|
+ if (IS_CLOSED_UPDATER.get(this) == TRUE) {
|
||||||
|
+ log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer);
|
||||||
|
+ consumer.disconnect();
|
||||||
|
+ return;
|
||||||
|
+ }
|
||||||
|
super.addConsumer(consumer);
|
||||||
|
try {
|
||||||
|
selector.addConsumer(consumer);
|
||||||
|
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
|
||||||
|
index 31e6f5579b..2b58ddfa88 100644
|
||||||
|
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
|
||||||
|
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
|
||||||
|
@@ -293,6 +293,7 @@ public class PersistentDispatcherFailoverConsumerTest {
|
||||||
|
assertEquals(isActive, change.isIsActive());
|
||||||
|
}
|
||||||
|
|
||||||
|
+
|
||||||
|
@Test
|
||||||
|
public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {
|
||||||
|
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
|
||||||
|
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
|
||||||
|
index f319b7ce4a..b21f80ce1f 100644
|
||||||
|
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
|
||||||
|
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
|
||||||
|
@@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.EntryBatchSizes;
|
||||||
|
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
|
||||||
|
import org.apache.pulsar.broker.service.RedeliveryTracker;
|
||||||
|
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
|
||||||
|
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
|
||||||
|
import org.apache.pulsar.common.api.proto.MessageMetadata;
|
||||||
|
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
|
||||||
|
import org.apache.pulsar.common.protocol.Commands;
|
||||||
|
@@ -67,6 +68,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
|
||||||
|
private ServiceConfiguration configMock;
|
||||||
|
|
||||||
|
private NonPersistentStickyKeyDispatcherMultipleConsumers nonpersistentDispatcher;
|
||||||
|
+ private StickyKeyConsumerSelector selector;
|
||||||
|
|
||||||
|
final String topicName = "non-persistent://public/default/testTopic";
|
||||||
|
|
||||||
|
@@ -93,6 +95,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
|
||||||
|
doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies();
|
||||||
|
|
||||||
|
subscriptionMock = mock(NonPersistentSubscription.class);
|
||||||
|
+ selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
|
||||||
|
|
||||||
|
try (MockedStatic<DispatchRateLimiter> rateLimiterMockedStatic = mockStatic(DispatchRateLimiter.class);) {
|
||||||
|
rateLimiterMockedStatic.when(() -> DispatchRateLimiter.isDispatchRateNeeded(
|
||||||
|
@@ -102,8 +105,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
|
||||||
|
any(DispatchRateLimiter.Type.class)))
|
||||||
|
.thenReturn(false);
|
||||||
|
nonpersistentDispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(
|
||||||
|
- topicMock, subscriptionMock,
|
||||||
|
- new HashRangeAutoSplitStickyKeyConsumerSelector());
|
||||||
|
+ topicMock, subscriptionMock, selector);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
|
||||||
|
index 99a66f44ac..587ef122ec 100644
|
||||||
|
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
|
||||||
|
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
|
||||||
|
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
import static org.testng.Assert.assertEquals;
|
||||||
|
+import static org.testng.Assert.assertTrue;
|
||||||
|
import static org.testng.Assert.fail;
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
@@ -156,6 +157,16 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
+ @Test(timeOut = 10000)
|
||||||
|
+ public void testAddConsumerWhenClosed() throws Exception {
|
||||||
|
+ persistentDispatcher.close().get();
|
||||||
|
+ Consumer consumer = mock(Consumer.class);
|
||||||
|
+ persistentDispatcher.addConsumer(consumer);
|
||||||
|
+ verify(consumer, times(1)).disconnect();
|
||||||
|
+ assertEquals(0, persistentDispatcher.getConsumers().size());
|
||||||
|
+ assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty());
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
@Test
|
||||||
|
public void testSendMarkerMessage() {
|
||||||
|
try {
|
||||||
@ -1,6 +1,6 @@
|
|||||||
%define debug_package %{nil}
|
%define debug_package %{nil}
|
||||||
%define pulsar_ver 2.10.4
|
%define pulsar_ver 2.10.4
|
||||||
%define pkg_ver 17
|
%define pkg_ver 18
|
||||||
%define _prefix /opt/pulsar
|
%define _prefix /opt/pulsar
|
||||||
Summary: Cloud-Native, Distributed Messaging and Streaming
|
Summary: Cloud-Native, Distributed Messaging and Streaming
|
||||||
Name: pulsar
|
Name: pulsar
|
||||||
@ -27,6 +27,7 @@ Patch0014: 0014-CVE-2023-32732.patch
|
|||||||
Patch0015: 0015-fix-no-messages.patch
|
Patch0015: 0015-fix-no-messages.patch
|
||||||
Patch0016: 0016-handle-exception.patch
|
Patch0016: 0016-handle-exception.patch
|
||||||
Patch0017: 0017-return-earliest-position.patch
|
Patch0017: 0017-return-earliest-position.patch
|
||||||
|
Patch0018: 0018-return-when-AbstractDispatcherSingleActiveConsumer-closed.patch
|
||||||
BuildRoot: /root/rpmbuild/BUILDROOT/
|
BuildRoot: /root/rpmbuild/BUILDROOT/
|
||||||
BuildRequires: java-1.8.0-openjdk-devel,maven,systemd
|
BuildRequires: java-1.8.0-openjdk-devel,maven,systemd
|
||||||
Requires: java-1.8.0-openjdk,systemd
|
Requires: java-1.8.0-openjdk,systemd
|
||||||
@ -56,6 +57,7 @@ Pulsar is a distributed pub-sub messaging platform with a very flexible messagin
|
|||||||
%patch0015 -p1
|
%patch0015 -p1
|
||||||
%patch0016 -p1
|
%patch0016 -p1
|
||||||
%patch0017 -p1
|
%patch0017 -p1
|
||||||
|
%patch0018 -p1
|
||||||
|
|
||||||
%build
|
%build
|
||||||
mvn clean install -Pcore-modules,-main -DskipTests
|
mvn clean install -Pcore-modules,-main -DskipTests
|
||||||
@ -81,6 +83,8 @@ getent passwd pulsar >/dev/null || useradd -r -g pulsar -d / -s /sbin/nologin pu
|
|||||||
exit 0
|
exit 0
|
||||||
|
|
||||||
%changelog
|
%changelog
|
||||||
|
* Fri Dec 8 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-18
|
||||||
|
- Return if AbstractDispatcherSingleActiveConsumer closed
|
||||||
* Fri Dec 8 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-17
|
* Fri Dec 8 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-17
|
||||||
- Fix return the earliest position when query position by timestamp.
|
- Fix return the earliest position when query position by timestamp.
|
||||||
* Fri Dec 8 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-16
|
* Fri Dec 8 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-16
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user