pulsar/0018-return-when-AbstractDispatcherSingleActiveConsumer-closed.patch

168 lines
9.9 KiB
Diff
Raw Normal View History

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 8cab06be11..17a6d1dbfb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -140,6 +140,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.topicName, consumer);
consumer.disconnect();
+ return;
}
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 9694584025..1d74d00776 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -47,7 +47,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
protected final Subscription subscription;
private CompletableFuture<Void> closeFuture = null;
- private final String name;
+ protected final String name;
protected final Rate msgDrop;
protected static final AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers>
TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index e5e5349651..da7fe56bde 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -39,6 +39,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.protocol.Commands;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
@@ -84,6 +86,11 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
+ if (IS_CLOSED_UPDATER.get(this) == TRUE) {
+ log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer);
+ consumer.disconnect();
+ return;
+ }
super.addConsumer(consumer);
try {
selector.addConsumer(consumer);
@@ -168,4 +175,6 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
return (ksm.getKeySharedMode() == this.keySharedMode);
}
+
+ private static final Logger log = LoggerFactory.getLogger(NonPersistentStickyKeyDispatcherMultipleConsumers.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 90db639fde..e5b6f68bdf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
@@ -99,8 +100,18 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
}
+ @VisibleForTesting
+ public StickyKeyConsumerSelector getSelector() {
+ return selector; // test-only accessor: lets tests inspect this dispatcher's consumer selector
+ }
+
@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
+ if (IS_CLOSED_UPDATER.get(this) == TRUE) {
+ log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer);
+ consumer.disconnect();
+ return;
+ }
super.addConsumer(consumer);
try {
selector.addConsumer(consumer);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 31e6f5579b..2b58ddfa88 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -293,6 +293,7 @@ public class PersistentDispatcherFailoverConsumerTest {
assertEquals(isActive, change.isIsActive());
}
+
@Test
public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index f319b7ce4a..b21f80ce1f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
@@ -67,6 +68,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
private ServiceConfiguration configMock;
private NonPersistentStickyKeyDispatcherMultipleConsumers nonpersistentDispatcher;
+ private StickyKeyConsumerSelector selector;
final String topicName = "non-persistent://public/default/testTopic";
@@ -93,6 +95,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies();
subscriptionMock = mock(NonPersistentSubscription.class);
+ selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
try (MockedStatic<DispatchRateLimiter> rateLimiterMockedStatic = mockStatic(DispatchRateLimiter.class);) {
rateLimiterMockedStatic.when(() -> DispatchRateLimiter.isDispatchRateNeeded(
@@ -102,8 +105,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
any(DispatchRateLimiter.Type.class)))
.thenReturn(false);
nonpersistentDispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(
- topicMock, subscriptionMock,
- new HashRangeAutoSplitStickyKeyConsumerSelector());
+ topicMock, subscriptionMock, selector);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 99a66f44ac..587ef122ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -156,6 +157,16 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
}
}
+ @Test(timeOut = 10000)
+ public void testAddConsumerWhenClosed() throws Exception {
+ persistentDispatcher.close().get(); // block until close() completes so the closed-dispatcher guard applies
+ Consumer consumer = mock(Consumer.class);
+ persistentDispatcher.addConsumer(consumer); // expected to disconnect the consumer and return early
+ verify(consumer, times(1)).disconnect();
+ assertEquals(persistentDispatcher.getConsumers().size(), 0); // TestNG argument order is (actual, expected)
+ assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty());
+ }
+
@Test
public void testSendMarkerMessage() {
try {