diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index 1e1245ed36..cf1603788f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -192,10 +192,14 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap()); } else { // Subscription doesn't exist. We need to force the creation of the subscription in this cluster, because - log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription", + log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subscription", topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), pos); - topic.createSubscription(update.getSubscriptionName(), - InitialPosition.Latest, true /* replicateSubscriptionState */, null); + topic.createSubscription(update.getSubscriptionName(), InitialPosition.Earliest, + true /* replicateSubscriptionState */, Collections.emptyMap()) + .thenAccept(subscriptionCreated -> { + subscriptionCreated.acknowledgeMessage(Collections.singletonList(pos), + AckType.Cumulative, Collections.emptyMap()); + }); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index 046adaa5ec..fb5bae08f6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -25,9 +25,11 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; +import static org.testng.Assert.fail; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.Map; @@ -41,6 +43,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -48,6 +51,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,6 +158,94 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase { "messages don't match."); } + @Test + public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws Exception { + final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_"); + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_"); + final String subscriptionName = "s1"; + final boolean isReplicatedSubscription = true; + final int messagesCount = 20; + final LinkedHashSet sentMessages = new LinkedHashSet<>(); + final Set receivedMessages = Collections.synchronizedSet(new LinkedHashSet<>()); + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + admin1.topics().createNonPartitionedTopic(topicName); + admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest, isReplicatedSubscription); + final PersistentTopic topic1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + + // Send messages + // Wait for the topic created on the cluster2. + // Wait for the snapshot created. + final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).build(); + Producer producer1 = client1.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create(); + Consumer consumer1 = client1.newConsumer(Schema.STRING).topic(topicName) + .subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe(); + for (int i = 0; i < messagesCount / 2; i++) { + String msg = i + ""; + producer1.send(msg); + sentMessages.add(msg); + } + Awaitility.await().untilAsserted(() -> { + ConcurrentOpenHashMap replicators = topic1.getReplicators(); + assertTrue(replicators != null && replicators.size() == 1, "Replicator should started"); + assertTrue(replicators.values().iterator().next().isConnected(), "Replicator should be connected"); + assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(), + "One snapshot should be finished"); + }); + final PersistentTopic topic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + Awaitility.await().untilAsserted(() -> { + assertTrue(topic2.getReplicatedSubscriptionController().isPresent(), + "Replicated subscription controller should created"); + }); + for (int i = messagesCount / 2; i < messagesCount; i++) { + String msg = i + ""; + producer1.send(msg); + sentMessages.add(msg); + } + + // Consume half messages and wait the subscription created on the cluster2. + for (int i = 0; i < messagesCount / 2; i++){ + Message message = consumer1.receive(2, TimeUnit.SECONDS); + if (message == null) { + fail("Should not receive null."); + } + receivedMessages.add(message.getValue()); + consumer1.acknowledge(message); + } + Awaitility.await().untilAsserted(() -> { + assertNotNull(topic2.getSubscriptions().get(subscriptionName), "Subscription should created"); + }); + + // Switch client to cluster2. + // Since the cluster1 was not crash, all messages will be replicated to the cluster2. + consumer1.close(); + final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).build(); + final Consumer consumer2 = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName) + .subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe(); + + // Verify all messages will be consumed. + Awaitility.await().untilAsserted(() -> { + while (true) { + Message message = consumer2.receive(2, TimeUnit.SECONDS); + if (message != null) { + receivedMessages.add(message.getValue().toString()); + consumer2.acknowledge(message); + } else { + break; + } + } + assertEquals(receivedMessages.size(), sentMessages.size()); + }); + + consumer2.close(); + producer1.close(); + client1.close(); + client2.close(); + } + + /** * If there's no traffic, the snapshot creation should stop and then resume when traffic comes back */