Can not receive any messages after switch to standby cluster
This commit is contained in:
parent
bdc879bf71
commit
847a97c5f5
149
0015-fix-no-messages.patch
Normal file
149
0015-fix-no-messages.patch
Normal file
@ -0,0 +1,149 @@
|
|||||||
|
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
|
||||||
|
index 1e1245ed36..cf1603788f 100644
|
||||||
|
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
|
||||||
|
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
|
||||||
|
@@ -192,10 +192,14 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
|
||||||
|
sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap());
|
||||||
|
} else {
|
||||||
|
// Subscription doesn't exist. We need to force the creation of the subscription in this cluster, because
|
||||||
|
- log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription",
|
||||||
|
+ log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subscription",
|
||||||
|
topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), pos);
|
||||||
|
- topic.createSubscription(update.getSubscriptionName(),
|
||||||
|
- InitialPosition.Latest, true /* replicateSubscriptionState */, null);
|
||||||
|
+ topic.createSubscription(update.getSubscriptionName(), InitialPosition.Earliest,
|
||||||
|
+ true /* replicateSubscriptionState */, Collections.emptyMap())
|
||||||
|
+ .thenAccept(subscriptionCreated -> {
|
||||||
|
+ subscriptionCreated.acknowledgeMessage(Collections.singletonList(pos),
|
||||||
|
+ AckType.Cumulative, Collections.emptyMap());
|
||||||
|
+ });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
|
||||||
|
index 046adaa5ec..fb5bae08f6 100644
|
||||||
|
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
|
||||||
|
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
|
||||||
|
@@ -25,9 +25,11 @@ import static org.testng.Assert.assertNotNull;
|
||||||
|
import static org.testng.Assert.assertNull;
|
||||||
|
import static org.testng.Assert.assertTrue;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
+import static org.testng.Assert.fail;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
+import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
@@ -41,6 +43,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
|
||||||
|
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
|
||||||
|
import org.apache.pulsar.client.api.Consumer;
|
||||||
|
import org.apache.pulsar.client.api.Message;
|
||||||
|
+import org.apache.pulsar.client.api.MessageId;
|
||||||
|
import org.apache.pulsar.client.api.MessageRoutingMode;
|
||||||
|
import org.apache.pulsar.client.api.Producer;
|
||||||
|
import org.apache.pulsar.client.api.PulsarClient;
|
||||||
|
@@ -48,6 +51,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
|
||||||
|
import org.apache.pulsar.client.api.Schema;
|
||||||
|
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
|
||||||
|
import org.apache.pulsar.common.policies.data.TopicStats;
|
||||||
|
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
|
||||||
|
import org.awaitility.Awaitility;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
@@ -154,6 +158,94 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
|
||||||
|
"messages don't match.");
|
||||||
|
}
|
||||||
|
|
||||||
|
+ @Test
|
||||||
|
+ public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws Exception {
|
||||||
|
+ final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_");
|
||||||
|
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_");
|
||||||
|
+ final String subscriptionName = "s1";
|
||||||
|
+ final boolean isReplicatedSubscription = true;
|
||||||
|
+ final int messagesCount = 20;
|
||||||
|
+ final LinkedHashSet<String> sentMessages = new LinkedHashSet<>();
|
||||||
|
+ final Set<String> receivedMessages = Collections.synchronizedSet(new LinkedHashSet<>());
|
||||||
|
+ admin1.namespaces().createNamespace(namespace);
|
||||||
|
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
|
||||||
|
+ admin1.topics().createNonPartitionedTopic(topicName);
|
||||||
|
+ admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest, isReplicatedSubscription);
|
||||||
|
+ final PersistentTopic topic1 =
|
||||||
|
+ (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
|
||||||
|
+
|
||||||
|
+ // Send messages
|
||||||
|
+ // Wait for the topic created on the cluster2.
|
||||||
|
+ // Wait for the snapshot created.
|
||||||
|
+ final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).build();
|
||||||
|
+ Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
|
||||||
|
+ Consumer<String> consumer1 = client1.newConsumer(Schema.STRING).topic(topicName)
|
||||||
|
+ .subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
|
||||||
|
+ for (int i = 0; i < messagesCount / 2; i++) {
|
||||||
|
+ String msg = i + "";
|
||||||
|
+ producer1.send(msg);
|
||||||
|
+ sentMessages.add(msg);
|
||||||
|
+ }
|
||||||
|
+ Awaitility.await().untilAsserted(() -> {
|
||||||
|
+ ConcurrentOpenHashMap<String, ? extends Replicator> replicators = topic1.getReplicators();
|
||||||
|
+ assertTrue(replicators != null && replicators.size() == 1, "Replicator should started");
|
||||||
|
+ assertTrue(replicators.values().iterator().next().isConnected(), "Replicator should be connected");
|
||||||
|
+ assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(),
|
||||||
|
+ "One snapshot should be finished");
|
||||||
|
+ });
|
||||||
|
+ final PersistentTopic topic2 =
|
||||||
|
+ (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
|
||||||
|
+ Awaitility.await().untilAsserted(() -> {
|
||||||
|
+ assertTrue(topic2.getReplicatedSubscriptionController().isPresent(),
|
||||||
|
+ "Replicated subscription controller should created");
|
||||||
|
+ });
|
||||||
|
+ for (int i = messagesCount / 2; i < messagesCount; i++) {
|
||||||
|
+ String msg = i + "";
|
||||||
|
+ producer1.send(msg);
|
||||||
|
+ sentMessages.add(msg);
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ // Consume half messages and wait the subscription created on the cluster2.
|
||||||
|
+ for (int i = 0; i < messagesCount / 2; i++){
|
||||||
|
+ Message<String> message = consumer1.receive(2, TimeUnit.SECONDS);
|
||||||
|
+ if (message == null) {
|
||||||
|
+ fail("Should not receive null.");
|
||||||
|
+ }
|
||||||
|
+ receivedMessages.add(message.getValue());
|
||||||
|
+ consumer1.acknowledge(message);
|
||||||
|
+ }
|
||||||
|
+ Awaitility.await().untilAsserted(() -> {
|
||||||
|
+ assertNotNull(topic2.getSubscriptions().get(subscriptionName), "Subscription should created");
|
||||||
|
+ });
|
||||||
|
+
|
||||||
|
+ // Switch client to cluster2.
|
||||||
|
+ // Since the cluster1 was not crash, all messages will be replicated to the cluster2.
|
||||||
|
+ consumer1.close();
|
||||||
|
+ final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).build();
|
||||||
|
+ final Consumer consumer2 = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName)
|
||||||
|
+ .subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
|
||||||
|
+
|
||||||
|
+ // Verify all messages will be consumed.
|
||||||
|
+ Awaitility.await().untilAsserted(() -> {
|
||||||
|
+ while (true) {
|
||||||
|
+ Message message = consumer2.receive(2, TimeUnit.SECONDS);
|
||||||
|
+ if (message != null) {
|
||||||
|
+ receivedMessages.add(message.getValue().toString());
|
||||||
|
+ consumer2.acknowledge(message);
|
||||||
|
+ } else {
|
||||||
|
+ break;
|
||||||
|
+ }
|
||||||
|
+ }
|
||||||
|
+ assertEquals(receivedMessages.size(), sentMessages.size());
|
||||||
|
+ });
|
||||||
|
+
|
||||||
|
+ consumer2.close();
|
||||||
|
+ producer1.close();
|
||||||
|
+ client1.close();
|
||||||
|
+ client2.close();
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+
|
||||||
|
/**
|
||||||
|
* If there's no traffic, the snapshot creation should stop and then resume when traffic comes back
|
||||||
|
*/
|
||||||
@ -1,6 +1,6 @@
|
|||||||
%define debug_package %{nil}
|
%define debug_package %{nil}
|
||||||
%define pulsar_ver 2.10.4
|
%define pulsar_ver 2.10.4
|
||||||
%define pkg_ver 14
|
%define pkg_ver 15
|
||||||
%define _prefix /opt/pulsar
|
%define _prefix /opt/pulsar
|
||||||
Summary: Cloud-Native, Distributed Messaging and Streaming
|
Summary: Cloud-Native, Distributed Messaging and Streaming
|
||||||
Name: pulsar
|
Name: pulsar
|
||||||
@ -24,6 +24,7 @@ Patch0011: 0011-CVE-2023-25194.patch
|
|||||||
Patch0012: 0012-CVE-2023-2976.patch
|
Patch0012: 0012-CVE-2023-2976.patch
|
||||||
Patch0013: 0013-fix-deadlock.patch
|
Patch0013: 0013-fix-deadlock.patch
|
||||||
Patch0014: 0014-CVE-2023-32732.patch
|
Patch0014: 0014-CVE-2023-32732.patch
|
||||||
|
Patch0015: 0015-fix-no-messages.patch
|
||||||
BuildRoot: /root/rpmbuild/BUILDROOT/
|
BuildRoot: /root/rpmbuild/BUILDROOT/
|
||||||
BuildRequires: java-1.8.0-openjdk-devel,maven,systemd
|
BuildRequires: java-1.8.0-openjdk-devel,maven,systemd
|
||||||
Requires: java-1.8.0-openjdk,systemd
|
Requires: java-1.8.0-openjdk,systemd
|
||||||
@ -50,6 +51,7 @@ Pulsar is a distributed pub-sub messaging platform with a very flexible messagin
|
|||||||
%patch0012 -p1
|
%patch0012 -p1
|
||||||
%patch0013 -p1
|
%patch0013 -p1
|
||||||
%patch0014 -p1
|
%patch0014 -p1
|
||||||
|
%patch0015 -p1
|
||||||
|
|
||||||
%build
|
%build
|
||||||
mvn clean install -Pcore-modules,-main -DskipTests
|
mvn clean install -Pcore-modules,-main -DskipTests
|
||||||
@ -75,6 +77,8 @@ getent passwd pulsar >/dev/null || useradd -r -g pulsar -d / -s /sbin/nologin pu
|
|||||||
exit 0
|
exit 0
|
||||||
|
|
||||||
%changelog
|
%changelog
|
||||||
|
* Thu Dec 7 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-15
|
||||||
|
- fix Can not receive any messages after switch to standby cluster
|
||||||
* Thu Dec 7 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-14
|
* Thu Dec 7 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-14
|
||||||
- resolve cve-2023-32732
|
- resolve cve-2023-32732
|
||||||
* Thu Dec 7 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-13
|
* Thu Dec 7 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-13
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user