Compare commits
20 Commits
bdc879bf71
...
f6ad76f7d5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f6ad76f7d5 | ||
|
|
5a8314b95c | ||
|
|
237ddade55 | ||
|
|
a01313bb0d | ||
|
|
0a4fdb2251 | ||
|
|
75e1752c32 | ||
|
|
a63291e2c9 | ||
|
|
c1f21e9862 | ||
|
|
5a82bbf475 | ||
|
|
847a97c5f5 | ||
|
|
88d1e053bd | ||
|
|
0c79ca3252 | ||
|
|
2317def9fd | ||
|
|
ce43bfb69d | ||
|
|
0a577090c5 | ||
|
|
457fc30205 | ||
|
|
fb253ca687 | ||
|
|
c322d313a4 | ||
|
|
d439d964f7 | ||
|
|
158a304f32 |
149
0015-fix-no-messages.patch
Normal file
149
0015-fix-no-messages.patch
Normal file
@ -0,0 +1,149 @@
|
||||
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
|
||||
index 1e1245ed36..cf1603788f 100644
|
||||
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
|
||||
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
|
||||
@@ -192,10 +192,14 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
|
||||
sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap());
|
||||
} else {
|
||||
// Subscription doesn't exist. We need to force the creation of the subscription in this cluster, because
|
||||
- log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription",
|
||||
+ log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subscription",
|
||||
topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), pos);
|
||||
- topic.createSubscription(update.getSubscriptionName(),
|
||||
- InitialPosition.Latest, true /* replicateSubscriptionState */, null);
|
||||
+ topic.createSubscription(update.getSubscriptionName(), InitialPosition.Earliest,
|
||||
+ true /* replicateSubscriptionState */, Collections.emptyMap())
|
||||
+ .thenAccept(subscriptionCreated -> {
|
||||
+ subscriptionCreated.acknowledgeMessage(Collections.singletonList(pos),
|
||||
+ AckType.Cumulative, Collections.emptyMap());
|
||||
+ });
|
||||
}
|
||||
}
|
||||
|
||||
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
|
||||
index 046adaa5ec..fb5bae08f6 100644
|
||||
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
|
||||
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
|
||||
@@ -25,9 +25,11 @@ import static org.testng.Assert.assertNotNull;
|
||||
import static org.testng.Assert.assertNull;
|
||||
import static org.testng.Assert.assertTrue;
|
||||
import com.google.common.collect.Sets;
|
||||
+import static org.testng.Assert.fail;
|
||||
import java.lang.reflect.Method;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
+import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Map;
|
||||
@@ -41,6 +43,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
|
||||
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
|
||||
import org.apache.pulsar.client.api.Consumer;
|
||||
import org.apache.pulsar.client.api.Message;
|
||||
+import org.apache.pulsar.client.api.MessageId;
|
||||
import org.apache.pulsar.client.api.MessageRoutingMode;
|
||||
import org.apache.pulsar.client.api.Producer;
|
||||
import org.apache.pulsar.client.api.PulsarClient;
|
||||
@@ -48,6 +51,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
|
||||
import org.apache.pulsar.client.api.Schema;
|
||||
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
|
||||
import org.apache.pulsar.common.policies.data.TopicStats;
|
||||
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
|
||||
import org.awaitility.Awaitility;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -154,6 +158,94 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
|
||||
"messages don't match.");
|
||||
}
|
||||
|
||||
+ @Test
|
||||
+ public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws Exception {
|
||||
+ final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_");
|
||||
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_");
|
||||
+ final String subscriptionName = "s1";
|
||||
+ final boolean isReplicatedSubscription = true;
|
||||
+ final int messagesCount = 20;
|
||||
+ final LinkedHashSet<String> sentMessages = new LinkedHashSet<>();
|
||||
+ final Set<String> receivedMessages = Collections.synchronizedSet(new LinkedHashSet<>());
|
||||
+ admin1.namespaces().createNamespace(namespace);
|
||||
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
|
||||
+ admin1.topics().createNonPartitionedTopic(topicName);
|
||||
+ admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest, isReplicatedSubscription);
|
||||
+ final PersistentTopic topic1 =
|
||||
+ (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
|
||||
+
|
||||
+ // Send messages
|
||||
+ // Wait for the topic created on the cluster2.
|
||||
+ // Wait for the snapshot created.
|
||||
+ final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).build();
|
||||
+ Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
|
||||
+ Consumer<String> consumer1 = client1.newConsumer(Schema.STRING).topic(topicName)
|
||||
+ .subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
|
||||
+ for (int i = 0; i < messagesCount / 2; i++) {
|
||||
+ String msg = i + "";
|
||||
+ producer1.send(msg);
|
||||
+ sentMessages.add(msg);
|
||||
+ }
|
||||
+ Awaitility.await().untilAsserted(() -> {
|
||||
+ ConcurrentOpenHashMap<String, ? extends Replicator> replicators = topic1.getReplicators();
|
||||
+ assertTrue(replicators != null && replicators.size() == 1, "Replicator should started");
|
||||
+ assertTrue(replicators.values().iterator().next().isConnected(), "Replicator should be connected");
|
||||
+ assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(),
|
||||
+ "One snapshot should be finished");
|
||||
+ });
|
||||
+ final PersistentTopic topic2 =
|
||||
+ (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
|
||||
+ Awaitility.await().untilAsserted(() -> {
|
||||
+ assertTrue(topic2.getReplicatedSubscriptionController().isPresent(),
|
||||
+ "Replicated subscription controller should created");
|
||||
+ });
|
||||
+ for (int i = messagesCount / 2; i < messagesCount; i++) {
|
||||
+ String msg = i + "";
|
||||
+ producer1.send(msg);
|
||||
+ sentMessages.add(msg);
|
||||
+ }
|
||||
+
|
||||
+ // Consume half messages and wait the subscription created on the cluster2.
|
||||
+ for (int i = 0; i < messagesCount / 2; i++){
|
||||
+ Message<String> message = consumer1.receive(2, TimeUnit.SECONDS);
|
||||
+ if (message == null) {
|
||||
+ fail("Should not receive null.");
|
||||
+ }
|
||||
+ receivedMessages.add(message.getValue());
|
||||
+ consumer1.acknowledge(message);
|
||||
+ }
|
||||
+ Awaitility.await().untilAsserted(() -> {
|
||||
+ assertNotNull(topic2.getSubscriptions().get(subscriptionName), "Subscription should created");
|
||||
+ });
|
||||
+
|
||||
+ // Switch client to cluster2.
|
||||
+ // Since the cluster1 was not crash, all messages will be replicated to the cluster2.
|
||||
+ consumer1.close();
|
||||
+ final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).build();
|
||||
+ final Consumer consumer2 = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName)
|
||||
+ .subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
|
||||
+
|
||||
+ // Verify all messages will be consumed.
|
||||
+ Awaitility.await().untilAsserted(() -> {
|
||||
+ while (true) {
|
||||
+ Message message = consumer2.receive(2, TimeUnit.SECONDS);
|
||||
+ if (message != null) {
|
||||
+ receivedMessages.add(message.getValue().toString());
|
||||
+ consumer2.acknowledge(message);
|
||||
+ } else {
|
||||
+ break;
|
||||
+ }
|
||||
+ }
|
||||
+ assertEquals(receivedMessages.size(), sentMessages.size());
|
||||
+ });
|
||||
+
|
||||
+ consumer2.close();
|
||||
+ producer1.close();
|
||||
+ client1.close();
|
||||
+ client2.close();
|
||||
+ }
|
||||
+
|
||||
+
|
||||
/**
|
||||
* If there's no traffic, the snapshot creation should stop and then resume when traffic comes back
|
||||
*/
|
||||
35
0016-handle-exception.patch
Normal file
35
0016-handle-exception.patch
Normal file
@ -0,0 +1,35 @@
|
||||
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
|
||||
index fe1e60d201..b98a111455 100644
|
||||
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
|
||||
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
|
||||
@@ -470,18 +470,20 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
|
||||
concat(locator.getIndexList(), newArrayList(info))
|
||||
).build(), locatorEntry.version
|
||||
).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> {
|
||||
- Throwable cause = FutureUtil.unwrapCompletionException(ex);
|
||||
- log.warn("[{}] Failed to update schema locator with position {}", schemaId, position, cause);
|
||||
- if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) {
|
||||
- bookKeeper.asyncDeleteLedger(position.getLedgerId(), new AsyncCallback.DeleteCallback() {
|
||||
- @Override
|
||||
- public void deleteComplete(int rc, Object ctx) {
|
||||
- if (rc != BKException.Code.OK) {
|
||||
- log.warn("[{}] Failed to delete ledger {} after updating schema locator failed, rc: {}",
|
||||
+ if (ex != null) {
|
||||
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
|
||||
+ log.warn("[{}] Failed to update schema locator with position {}", schemaId, position, cause);
|
||||
+ if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) {
|
||||
+ bookKeeper.asyncDeleteLedger(position.getLedgerId(), new AsyncCallback.DeleteCallback() {
|
||||
+ @Override
|
||||
+ public void deleteComplete(int rc, Object ctx) {
|
||||
+ if (rc != BKException.Code.OK) {
|
||||
+ log.warn("[{}] Failed to delete ledger {} after updating schema locator failed, rc: {}",
|
||||
schemaId, position.getLedgerId(), rc);
|
||||
+ }
|
||||
}
|
||||
- }
|
||||
- }, null);
|
||||
+ }, null);
|
||||
+ }
|
||||
}
|
||||
});
|
||||
}
|
||||
132
0017-return-earliest-position.patch
Normal file
132
0017-return-earliest-position.patch
Normal file
@ -0,0 +1,132 @@
|
||||
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
|
||||
index 46ca0f1400..0653e40cb3 100644
|
||||
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
|
||||
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
|
||||
@@ -532,6 +532,23 @@ public interface ManagedCursor {
|
||||
void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
|
||||
FindEntryCallback callback, Object ctx);
|
||||
|
||||
+ /**
|
||||
+ * Find the newest entry that matches the given predicate.
|
||||
+ *
|
||||
+ * @param constraint
|
||||
+ * search only active entries or all entries
|
||||
+ * @param condition
|
||||
+ * predicate that reads an entry an applies a condition
|
||||
+ * @param callback
|
||||
+ * callback object returning the resultant position
|
||||
+ * @param ctx
|
||||
+ * opaque context
|
||||
+ * @param isFindFromLedger
|
||||
+ * find the newest entry from ledger
|
||||
+ */
|
||||
+ void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
|
||||
+ FindEntryCallback callback, Object ctx, boolean isFindFromLedger);
|
||||
+
|
||||
/**
|
||||
* reset the cursor to specified position to enable replay of messages.
|
||||
*
|
||||
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
|
||||
index 011d3df77f..6045412398 100644
|
||||
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
|
||||
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
|
||||
@@ -1078,6 +1078,12 @@ public class ManagedCursorImpl implements ManagedCursor {
|
||||
@Override
|
||||
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
|
||||
FindEntryCallback callback, Object ctx) {
|
||||
+ asyncFindNewestMatching(constraint, condition, callback, ctx, false);
|
||||
+ }
|
||||
+
|
||||
+ @Override
|
||||
+ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
|
||||
+ FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
|
||||
OpFindNewest op;
|
||||
PositionImpl startPosition = null;
|
||||
long max = 0;
|
||||
@@ -1099,7 +1105,11 @@ public class ManagedCursorImpl implements ManagedCursor {
|
||||
Optional.empty(), ctx);
|
||||
return;
|
||||
}
|
||||
- op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
|
||||
+ if (isFindFromLedger) {
|
||||
+ op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx);
|
||||
+ } else {
|
||||
+ op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
|
||||
+ }
|
||||
op.find();
|
||||
}
|
||||
|
||||
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
|
||||
index 18fe4dba31..675f28e2d2 100644
|
||||
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
|
||||
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
|
||||
@@ -241,6 +241,11 @@ public class ManagedCursorContainerTest {
|
||||
AsyncCallbacks.FindEntryCallback callback, Object ctx) {
|
||||
}
|
||||
|
||||
+ @Override
|
||||
+ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
|
||||
+ AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
|
||||
+ }
|
||||
+
|
||||
@Override
|
||||
public void asyncResetCursor(final Position position, boolean forceReset,
|
||||
AsyncCallbacks.ResetCursorCallback callback) {
|
||||
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
|
||||
index 825bc546f4..838771e6d3 100644
|
||||
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
|
||||
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
|
||||
@@ -71,7 +71,7 @@ public class PersistentMessageFinder implements AsyncCallbacks.FindEntryCallback
|
||||
entry.release();
|
||||
}
|
||||
return false;
|
||||
- }, this, callback);
|
||||
+ }, this, callback, true);
|
||||
} else {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("[{}][{}] Ignore message position find scheduled task, last find is still running", topicName,
|
||||
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
|
||||
index e70d0dc2b5..da78a0411d 100644
|
||||
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
|
||||
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
|
||||
@@ -243,6 +243,40 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
|
||||
factory.shutdown();
|
||||
}
|
||||
|
||||
+ @Test
|
||||
+ void testPersistentMessageFinderWhenLastMessageDelete() throws Exception {
|
||||
+ final String ledgerAndCursorName = "testPersistentMessageFinderWhenLastMessageDelete";
|
||||
+
|
||||
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
|
||||
+ config.setRetentionSizeInMB(10);
|
||||
+ config.setMaxEntriesPerLedger(10);
|
||||
+ config.setRetentionTime(1, TimeUnit.HOURS);
|
||||
+ ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
|
||||
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
|
||||
+
|
||||
+ ledger.addEntry(createMessageWrittenToLedger("msg1"));
|
||||
+ ledger.addEntry(createMessageWrittenToLedger("msg2"));
|
||||
+ ledger.addEntry(createMessageWrittenToLedger("msg3"));
|
||||
+ Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("last-message"));
|
||||
+
|
||||
+ long endTimestamp = System.currentTimeMillis() + 1000;
|
||||
+
|
||||
+ Result result = new Result();
|
||||
+ // delete last position message
|
||||
+ cursor.delete(lastPosition);
|
||||
+ CompletableFuture<Void> future = findMessage(result, cursor, endTimestamp);
|
||||
+ future.get();
|
||||
+ assertNull(result.exception);
|
||||
+ assertNotEquals(result.position, null);
|
||||
+ assertEquals(result.position, lastPosition);
|
||||
+
|
||||
+ result.reset();
|
||||
+ cursor.close();
|
||||
+ ledger.close();
|
||||
+ factory.shutdown();
|
||||
+ }
|
||||
+
|
||||
+
|
||||
@Test
|
||||
void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception {
|
||||
|
||||
@ -0,0 +1,167 @@
|
||||
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
|
||||
index 8cab06be11..17a6d1dbfb 100644
|
||||
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
|
||||
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
|
||||
@@ -140,6 +140,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
|
||||
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
|
||||
log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.topicName, consumer);
|
||||
consumer.disconnect();
|
||||
+ return;
|
||||
}
|
||||
|
||||
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
|
||||
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
|
||||
index 9694584025..1d74d00776 100644
|
||||
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
|
||||
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
|
||||
@@ -47,7 +47,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
|
||||
protected final Subscription subscription;
|
||||
|
||||
private CompletableFuture<Void> closeFuture = null;
|
||||
- private final String name;
|
||||
+ protected final String name;
|
||||
protected final Rate msgDrop;
|
||||
protected static final AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers>
|
||||
TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
|
||||
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
|
||||
index e5e5349651..da7fe56bde 100644
|
||||
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
|
||||
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
|
||||
@@ -39,6 +39,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
|
||||
import org.apache.pulsar.common.api.proto.KeySharedMeta;
|
||||
import org.apache.pulsar.common.api.proto.KeySharedMode;
|
||||
import org.apache.pulsar.common.protocol.Commands;
|
||||
+import org.slf4j.Logger;
|
||||
+import org.slf4j.LoggerFactory;
|
||||
|
||||
public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
|
||||
|
||||
@@ -84,6 +86,11 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
|
||||
|
||||
@Override
|
||||
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
|
||||
+ if (IS_CLOSED_UPDATER.get(this) == TRUE) {
|
||||
+ log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer);
|
||||
+ consumer.disconnect();
|
||||
+ return;
|
||||
+ }
|
||||
super.addConsumer(consumer);
|
||||
try {
|
||||
selector.addConsumer(consumer);
|
||||
@@ -168,4 +175,6 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
|
||||
public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
|
||||
return (ksm.getKeySharedMode() == this.keySharedMode);
|
||||
}
|
||||
+
|
||||
+ private static final Logger log = LoggerFactory.getLogger(NonPersistentStickyKeyDispatcherMultipleConsumers.class);
|
||||
}
|
||||
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
|
||||
index 90db639fde..e5b6f68bdf 100644
|
||||
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
|
||||
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
|
||||
@@ -18,6 +18,7 @@
|
||||
*/
|
||||
package org.apache.pulsar.broker.service.persistent;
|
||||
|
||||
+import com.google.common.annotations.VisibleForTesting;
|
||||
import io.netty.util.concurrent.FastThreadLocal;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
@@ -99,8 +100,18 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
|
||||
}
|
||||
}
|
||||
|
||||
+ @VisibleForTesting
|
||||
+ public StickyKeyConsumerSelector getSelector() {
|
||||
+ return selector;
|
||||
+ }
|
||||
+
|
||||
@Override
|
||||
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
|
||||
+ if (IS_CLOSED_UPDATER.get(this) == TRUE) {
|
||||
+ log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer);
|
||||
+ consumer.disconnect();
|
||||
+ return;
|
||||
+ }
|
||||
super.addConsumer(consumer);
|
||||
try {
|
||||
selector.addConsumer(consumer);
|
||||
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
|
||||
index 31e6f5579b..2b58ddfa88 100644
|
||||
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
|
||||
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
|
||||
@@ -293,6 +293,7 @@ public class PersistentDispatcherFailoverConsumerTest {
|
||||
assertEquals(isActive, change.isIsActive());
|
||||
}
|
||||
|
||||
+
|
||||
@Test
|
||||
public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {
|
||||
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
|
||||
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
|
||||
index f319b7ce4a..b21f80ce1f 100644
|
||||
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
|
||||
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
|
||||
@@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.EntryBatchSizes;
|
||||
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
|
||||
import org.apache.pulsar.broker.service.RedeliveryTracker;
|
||||
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
|
||||
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
|
||||
import org.apache.pulsar.common.api.proto.MessageMetadata;
|
||||
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
|
||||
import org.apache.pulsar.common.protocol.Commands;
|
||||
@@ -67,6 +68,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
|
||||
private ServiceConfiguration configMock;
|
||||
|
||||
private NonPersistentStickyKeyDispatcherMultipleConsumers nonpersistentDispatcher;
|
||||
+ private StickyKeyConsumerSelector selector;
|
||||
|
||||
final String topicName = "non-persistent://public/default/testTopic";
|
||||
|
||||
@@ -93,6 +95,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
|
||||
doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies();
|
||||
|
||||
subscriptionMock = mock(NonPersistentSubscription.class);
|
||||
+ selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
|
||||
|
||||
try (MockedStatic<DispatchRateLimiter> rateLimiterMockedStatic = mockStatic(DispatchRateLimiter.class);) {
|
||||
rateLimiterMockedStatic.when(() -> DispatchRateLimiter.isDispatchRateNeeded(
|
||||
@@ -102,8 +105,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
|
||||
any(DispatchRateLimiter.Type.class)))
|
||||
.thenReturn(false);
|
||||
nonpersistentDispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(
|
||||
- topicMock, subscriptionMock,
|
||||
- new HashRangeAutoSplitStickyKeyConsumerSelector());
|
||||
+ topicMock, subscriptionMock, selector);
|
||||
}
|
||||
}
|
||||
|
||||
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
|
||||
index 99a66f44ac..587ef122ec 100644
|
||||
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
|
||||
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
|
||||
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.testng.Assert.assertEquals;
|
||||
+import static org.testng.Assert.assertTrue;
|
||||
import static org.testng.Assert.fail;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
@@ -156,6 +157,16 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
|
||||
}
|
||||
}
|
||||
|
||||
+ @Test(timeOut = 10000)
|
||||
+ public void testAddConsumerWhenClosed() throws Exception {
|
||||
+ persistentDispatcher.close().get();
|
||||
+ Consumer consumer = mock(Consumer.class);
|
||||
+ persistentDispatcher.addConsumer(consumer);
|
||||
+ verify(consumer, times(1)).disconnect();
|
||||
+ assertEquals(0, persistentDispatcher.getConsumers().size());
|
||||
+ assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty());
|
||||
+ }
|
||||
+
|
||||
@Test
|
||||
public void testSendMarkerMessage() {
|
||||
try {
|
||||
37
0019-clean-inactive-bundle.patch
Normal file
37
0019-clean-inactive-bundle.patch
Normal file
@ -0,0 +1,37 @@
|
||||
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
|
||||
index d81f6949f4..8bf8a73ff1 100644
|
||||
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
|
||||
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
|
||||
@@ -523,6 +523,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
|
||||
// load management decisions may be made.
|
||||
private void updateBundleData() {
|
||||
final Map<String, BundleData> bundleData = loadData.getBundleData();
|
||||
+ final Set<String> activeBundles = new HashSet<>();
|
||||
// Iterate over the broker data.
|
||||
for (Map.Entry<String, BrokerData> brokerEntry : loadData.getBrokerData().entrySet()) {
|
||||
final String broker = brokerEntry.getKey();
|
||||
@@ -534,6 +535,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
|
||||
for (Map.Entry<String, NamespaceBundleStats> entry : statsMap.entrySet()) {
|
||||
final String bundle = entry.getKey();
|
||||
final NamespaceBundleStats stats = entry.getValue();
|
||||
+ activeBundles.add(bundle);
|
||||
if (bundleData.containsKey(bundle)) {
|
||||
// If we recognize the bundle, add these stats as a new sample.
|
||||
bundleData.get(bundle).update(stats);
|
||||
@@ -545,6 +547,16 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
|
||||
bundleData.put(bundle, currentBundleData);
|
||||
}
|
||||
}
|
||||
+ //Remove not active bundle from loadData
|
||||
+ for (String bundle : bundleData.keySet()) {
|
||||
+ if (!activeBundles.contains(bundle)){
|
||||
+ bundleData.remove(bundle);
|
||||
+ if (pulsar.getLeaderElectionService().isLeader()){
|
||||
+ deleteBundleDataFromMetadataStore(bundle);
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
|
||||
// Remove all loaded bundles from the preallocated maps.
|
||||
final Map<String, BundleData> preallocatedBundleData = brokerData.getPreallocatedBundleData();
|
||||
22
pulsar.spec
22
pulsar.spec
@ -1,6 +1,6 @@
|
||||
%define debug_package %{nil}
|
||||
%define pulsar_ver 2.10.4
|
||||
%define pkg_ver 14
|
||||
%define pkg_ver 19
|
||||
%define _prefix /opt/pulsar
|
||||
Summary: Cloud-Native, Distributed Messaging and Streaming
|
||||
Name: pulsar
|
||||
@ -24,6 +24,11 @@ Patch0011: 0011-CVE-2023-25194.patch
|
||||
Patch0012: 0012-CVE-2023-2976.patch
|
||||
Patch0013: 0013-fix-deadlock.patch
|
||||
Patch0014: 0014-CVE-2023-32732.patch
|
||||
Patch0015: 0015-fix-no-messages.patch
|
||||
Patch0016: 0016-handle-exception.patch
|
||||
Patch0017: 0017-return-earliest-position.patch
|
||||
Patch0018: 0018-return-when-AbstractDispatcherSingleActiveConsumer-closed.patch
|
||||
Patch0019: 0019-clean-inactive-bundle.patch
|
||||
BuildRoot: /root/rpmbuild/BUILDROOT/
|
||||
BuildRequires: java-1.8.0-openjdk-devel,maven,systemd
|
||||
Requires: java-1.8.0-openjdk,systemd
|
||||
@ -50,6 +55,11 @@ Pulsar is a distributed pub-sub messaging platform with a very flexible messagin
|
||||
%patch0012 -p1
|
||||
%patch0013 -p1
|
||||
%patch0014 -p1
|
||||
%patch0015 -p1
|
||||
%patch0016 -p1
|
||||
%patch0017 -p1
|
||||
%patch0018 -p1
|
||||
%patch0019 -p1
|
||||
|
||||
%build
|
||||
mvn clean install -Pcore-modules,-main -DskipTests
|
||||
@ -75,6 +85,16 @@ getent passwd pulsar >/dev/null || useradd -r -g pulsar -d / -s /sbin/nologin pu
|
||||
exit 0
|
||||
|
||||
%changelog
|
||||
* Fri Dec 8 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-19
|
||||
- clean inactive bundle from bundleData in loadData and bundlesCache
|
||||
* Fri Dec 8 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-18
|
||||
- Return if AbstractDispatcherSingleActiveConsumer closed
|
||||
* Fri Dec 8 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-17
|
||||
- Fix return the earliest position when query position by timestamp.
|
||||
* Fri Dec 8 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-16
|
||||
- Only handle exception when there has
|
||||
* Thu Dec 7 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-15
|
||||
- fix Can not receive any messages after switch to standby cluster
|
||||
* Thu Dec 7 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-14
|
||||
- resolve cve-2023-32732
|
||||
* Thu Dec 7 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-13
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user