diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index 8cab06be11..17a6d1dbfb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -140,6 +140,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas if (IS_CLOSED_UPDATER.get(this) == TRUE) { log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.topicName, consumer); consumer.disconnect(); + return; } if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java index 9694584025..1d74d00776 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java @@ -47,7 +47,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher protected final Subscription subscription; private CompletableFuture closeFuture = null; - private final String name; + protected final String name; protected final Rate msgDrop; protected static final AtomicIntegerFieldUpdater TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java index e5e5349651..da7fe56bde 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java @@ -39,6 +39,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; import org.apache.pulsar.common.protocol.Commands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers { @@ -84,6 +86,11 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis @Override public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { + if (IS_CLOSED_UPDATER.get(this) == TRUE) { + log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer); + consumer.disconnect(); + return; + } super.addConsumer(consumer); try { selector.addConsumer(consumer); @@ -168,4 +175,6 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) { return (ksm.getKeySharedMode() == this.keySharedMode); } + + private static final Logger log = LoggerFactory.getLogger(NonPersistentStickyKeyDispatcherMultipleConsumers.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 90db639fde..e5b6f68bdf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import com.google.common.annotations.VisibleForTesting; import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; import java.util.Collections; @@ -99,8 +100,18 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi } } + @VisibleForTesting + public StickyKeyConsumerSelector getSelector() { + return selector; + } + @Override public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { + if (IS_CLOSED_UPDATER.get(this) == TRUE) { + log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer); + consumer.disconnect(); + return; + } super.addConsumer(consumer); try { selector.addConsumer(consumer); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 31e6f5579b..2b58ddfa88 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -293,6 +293,7 @@ public class PersistentDispatcherFailoverConsumerTest { assertEquals(isActive, change.isIsActive()); } + @Test public void testConsumerGroupChangesWithOldNewConsumers() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java index f319b7ce4a..b21f80ce1f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.EntryBatchSizes; import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector; import org.apache.pulsar.broker.service.RedeliveryTracker; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; +import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; import org.apache.pulsar.common.protocol.Commands; @@ -67,6 +68,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest { private ServiceConfiguration configMock; private NonPersistentStickyKeyDispatcherMultipleConsumers nonpersistentDispatcher; + private StickyKeyConsumerSelector selector; final String topicName = "non-persistent://public/default/testTopic"; @@ -93,6 +95,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest { doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies(); subscriptionMock = mock(NonPersistentSubscription.class); + selector = new HashRangeAutoSplitStickyKeyConsumerSelector(); try (MockedStatic rateLimiterMockedStatic = mockStatic(DispatchRateLimiter.class);) { rateLimiterMockedStatic.when(() -> DispatchRateLimiter.isDispatchRateNeeded( @@ -102,8 +105,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest { any(DispatchRateLimiter.Type.class))) .thenReturn(false); nonpersistentDispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers( - topicMock, subscriptionMock, - new HashRangeAutoSplitStickyKeyConsumerSelector()); + topicMock, subscriptionMock, selector); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index 99a66f44ac..587ef122ec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -37,6 +37,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -156,6 +157,16 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { } } + @Test(timeOut = 10000) + public void testAddConsumerWhenClosed() throws Exception { + persistentDispatcher.close().get(); + Consumer consumer = mock(Consumer.class); + persistentDispatcher.addConsumer(consumer); + verify(consumer, times(1)).disconnect(); + assertEquals(0, persistentDispatcher.getConsumers().size()); + assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty()); + } + @Test public void testSendMarkerMessage() { try {