AlterIsr and LeaderAndIsr race condition
This commit is contained in:
parent
91901a02a8
commit
61fe48ab84
223
0013-AlterIsr.patch
Normal file
223
0013-AlterIsr.patch
Normal file
@ -0,0 +1,223 @@
|
|||||||
|
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
|
||||||
|
index a58f4238ff..88b337311d 100755
|
||||||
|
--- a/core/src/main/scala/kafka/cluster/Partition.scala
|
||||||
|
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
|
||||||
|
@@ -581,9 +581,6 @@ class Partition(val topicPartition: TopicPartition,
|
||||||
|
leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
|
||||||
|
zkVersion = partitionState.zkVersion
|
||||||
|
|
||||||
|
- // Clear any pending AlterIsr requests and check replica state
|
||||||
|
- alterIsrManager.clearPending(topicPartition)
|
||||||
|
-
|
||||||
|
// In the case of successive leader elections in a short time period, a follower may have
|
||||||
|
// entries in its log from a later epoch than any entry in the new leader's log. In order
|
||||||
|
// to ensure that these followers can truncate to the right offset, we must cache the new
|
||||||
|
@@ -661,9 +658,6 @@ class Partition(val topicPartition: TopicPartition,
|
||||||
|
leaderEpochStartOffsetOpt = None
|
||||||
|
zkVersion = partitionState.zkVersion
|
||||||
|
|
||||||
|
- // Since we might have been a leader previously, still clear any pending AlterIsr requests
|
||||||
|
- alterIsrManager.clearPending(topicPartition)
|
||||||
|
-
|
||||||
|
if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch == oldLeaderEpoch) {
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
@@ -1373,13 +1367,15 @@ class Partition(val topicPartition: TopicPartition,
|
||||||
|
isrState = proposedIsrState
|
||||||
|
|
||||||
|
if (!alterIsrManager.submit(alterIsrItem)) {
|
||||||
|
- // If the ISR manager did not accept our update, we need to revert back to previous state
|
||||||
|
+ // If the ISR manager did not accept our update, we need to revert the proposed state.
|
||||||
|
+ // This can happen if the ISR state was updated by the controller (via LeaderAndIsr in ZK-mode or
|
||||||
|
+ // ChangePartitionRecord in KRaft mode) but we have an AlterIsr request still in-flight.
|
||||||
|
isrState = oldState
|
||||||
|
isrChangeListener.markFailed()
|
||||||
|
- throw new IllegalStateException(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
|
||||||
|
+ warn(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
|
||||||
|
+ } else {
|
||||||
|
+ debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
|
||||||
|
}
|
||||||
|
-
|
||||||
|
- debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala
|
||||||
|
index 9ad734f708..1059a3df3e 100644
|
||||||
|
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
|
||||||
|
+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
|
||||||
|
@@ -49,8 +49,6 @@ trait AlterIsrManager {
|
||||||
|
def shutdown(): Unit = {}
|
||||||
|
|
||||||
|
def submit(alterIsrItem: AlterIsrItem): Boolean
|
||||||
|
-
|
||||||
|
- def clearPending(topicPartition: TopicPartition): Unit
|
||||||
|
}
|
||||||
|
|
||||||
|
case class AlterIsrItem(topicPartition: TopicPartition,
|
||||||
|
@@ -134,9 +132,6 @@ class DefaultAlterIsrManager(
|
||||||
|
enqueued
|
||||||
|
}
|
||||||
|
|
||||||
|
- override def clearPending(topicPartition: TopicPartition): Unit = {
|
||||||
|
- unsentIsrUpdates.remove(topicPartition)
|
||||||
|
- }
|
||||||
|
|
||||||
|
private[server] def maybePropagateIsrChanges(): Unit = {
|
||||||
|
// Send all pending items if there is not already a request in-flight.
|
||||||
|
diff --git a/core/src/main/scala/kafka/server/ZkIsrManager.scala b/core/src/main/scala/kafka/server/ZkIsrManager.scala
|
||||||
|
index 2d88aac6b4..8dffcdf307 100644
|
||||||
|
--- a/core/src/main/scala/kafka/server/ZkIsrManager.scala
|
||||||
|
+++ b/core/src/main/scala/kafka/server/ZkIsrManager.scala
|
||||||
|
@@ -55,12 +55,6 @@ class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) ex
|
||||||
|
period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
|
||||||
|
}
|
||||||
|
|
||||||
|
- override def clearPending(topicPartition: TopicPartition): Unit = {
|
||||||
|
- // Since we always immediately process ZK updates and never actually enqueue anything, there is nothing to
|
||||||
|
- // clear here so this is a no-op. Even if there are changes that have not been propagated, the write to ZK
|
||||||
|
- // has already happened, so we may as well send the notification to the controller.
|
||||||
|
- }
|
||||||
|
-
|
||||||
|
override def submit(alterIsrItem: AlterIsrItem): Boolean = {
|
||||||
|
debug(s"Writing new ISR ${alterIsrItem.leaderAndIsr.isr} to ZooKeeper with version " +
|
||||||
|
s"${alterIsrItem.leaderAndIsr.zkVersion} for partition ${alterIsrItem.topicPartition}")
|
||||||
|
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
|
||||||
|
index 5eedb63ae5..4dbd735753 100644
|
||||||
|
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
|
||||||
|
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
|
||||||
|
@@ -18,10 +18,10 @@
|
||||||
|
package kafka.controller
|
||||||
|
|
||||||
|
import java.util.Properties
|
||||||
|
-import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
|
||||||
|
-
|
||||||
|
+import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingQueue, TimeUnit}
|
||||||
|
import com.yammer.metrics.core.Timer
|
||||||
|
import kafka.api.{ApiVersion, KAFKA_2_6_IV0, KAFKA_2_7_IV0, LeaderAndIsr}
|
||||||
|
+import kafka.controller.KafkaController.AlterIsrCallback
|
||||||
|
import kafka.metrics.KafkaYammerMetrics
|
||||||
|
import kafka.server.{KafkaConfig, KafkaServer}
|
||||||
|
import kafka.utils.{LogCaptureAppender, TestUtils}
|
||||||
|
@@ -849,6 +849,67 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
|
||||||
|
latch.await()
|
||||||
|
}
|
||||||
|
|
||||||
|
+ @Test
|
||||||
|
+ def testAlterIsrErrors(): Unit = {
|
||||||
|
+ servers = makeServers(1)
|
||||||
|
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
|
||||||
|
+ val tp = new TopicPartition("t", 0)
|
||||||
|
+ val assignment = Map(tp.partition -> Seq(controllerId))
|
||||||
|
+ TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
|
||||||
|
+ val controller = getController().kafkaController
|
||||||
|
+ var future = captureAlterIsrError(controllerId, controller.brokerEpoch - 1,
|
||||||
|
+ Map(tp -> LeaderAndIsr(controllerId, List(controllerId))))
|
||||||
|
+ var capturedError = future.get(5, TimeUnit.SECONDS)
|
||||||
|
+ assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
|
||||||
|
+
|
||||||
|
+ future = captureAlterIsrError(99, controller.brokerEpoch,
|
||||||
|
+ Map(tp -> LeaderAndIsr(controllerId, List(controllerId))))
|
||||||
|
+ capturedError = future.get(5, TimeUnit.SECONDS)
|
||||||
|
+ assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
|
||||||
|
+
|
||||||
|
+ val unknownTopicPartition = new TopicPartition("unknown", 99)
|
||||||
|
+ future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
|
||||||
|
+ Map(unknownTopicPartition -> LeaderAndIsr(controllerId, List(controllerId))), unknownTopicPartition)
|
||||||
|
+ capturedError = future.get(5, TimeUnit.SECONDS)
|
||||||
|
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, capturedError)
|
||||||
|
+
|
||||||
|
+ future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
|
||||||
|
+ Map(tp -> LeaderAndIsr(controllerId, 1, List(controllerId), 99)), tp)
|
||||||
|
+ capturedError = future.get(5, TimeUnit.SECONDS)
|
||||||
|
+ assertEquals(Errors.INVALID_UPDATE_VERSION, capturedError)
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ def captureAlterIsrError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr]): CompletableFuture[Errors] = {
|
||||||
|
+ val future = new CompletableFuture[Errors]()
|
||||||
|
+ val controller = getController().kafkaController
|
||||||
|
+ val callback: AlterIsrCallback = {
|
||||||
|
+ case Left(_: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
|
||||||
|
+ future.completeExceptionally(new AssertionError(s"Should have seen top-level error"))
|
||||||
|
+ case Right(error: Errors) =>
|
||||||
|
+ future.complete(error)
|
||||||
|
+ }
|
||||||
|
+ controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback))
|
||||||
|
+ future
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ def captureAlterIsrPartitionError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], tp: TopicPartition): CompletableFuture[Errors] = {
|
||||||
|
+ val future = new CompletableFuture[Errors]()
|
||||||
|
+ val controller = getController().kafkaController
|
||||||
|
+ val callback: AlterIsrCallback = {
|
||||||
|
+ case Left(partitionResults: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
|
||||||
|
+ partitionResults.get(tp) match {
|
||||||
|
+ case Some(Left(error: Errors)) => future.complete(error)
|
||||||
|
+ case Some(Right(_: LeaderAndIsr)) => future.completeExceptionally(new AssertionError(s"Should have seen an error for $tp in result"))
|
||||||
|
+ case None => future.completeExceptionally(new AssertionError(s"Should have seen $tp in result"))
|
||||||
|
+ }
|
||||||
|
+ case Right(_: Errors) =>
|
||||||
|
+ future.completeExceptionally(new AssertionError(s"Should not seen top-level error"))
|
||||||
|
+ }
|
||||||
|
+ controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback))
|
||||||
|
+ future
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+
|
||||||
|
@Test
|
||||||
|
def testTopicIdsAreAdded(): Unit = {
|
||||||
|
servers = makeServers(1)
|
||||||
|
diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
|
||||||
|
index 1074fd3157..1c8c81471f 100644
|
||||||
|
--- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
|
||||||
|
+++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
|
||||||
|
@@ -70,8 +70,10 @@ class AlterIsrManagerTest {
|
||||||
|
@Test
|
||||||
|
def testOverwriteWithinBatch(): Unit = {
|
||||||
|
val capture = EasyMock.newCapture[AbstractRequest.Builder[AlterIsrRequest]]()
|
||||||
|
+ val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]()
|
||||||
|
+
|
||||||
|
EasyMock.expect(brokerToController.start())
|
||||||
|
- EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.anyObject())).once()
|
||||||
|
+ EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.capture(callbackCapture))).times(2)
|
||||||
|
EasyMock.replay(brokerToController)
|
||||||
|
|
||||||
|
val scheduler = new MockScheduler(time)
|
||||||
|
@@ -81,11 +83,21 @@ class AlterIsrManagerTest {
|
||||||
|
// Only send one ISR update for a given topic+partition
|
||||||
|
assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)))
|
||||||
|
assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2), 10), _ => {}, 0)))
|
||||||
|
+
|
||||||
|
+ // Simulate response
|
||||||
|
+ val alterIsrResp = partitionResponse(tp0, Errors.NONE)
|
||||||
|
+ val resp = new ClientResponse(null, null, "", 0L, 0L,
|
||||||
|
+ false, null, null, alterIsrResp)
|
||||||
|
+ callbackCapture.getValue.onComplete(resp)
|
||||||
|
+
|
||||||
|
+ // Now we can submit this partition again
|
||||||
|
+ assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1), 10), _ => {}, 0)))
|
||||||
|
EasyMock.verify(brokerToController)
|
||||||
|
|
||||||
|
+ // Make sure we sent the right request ISR={1}
|
||||||
|
val request = capture.getValue.build()
|
||||||
|
assertEquals(request.data().topics().size(), 1)
|
||||||
|
- assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 3)
|
||||||
|
+ assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
|
||||||
|
index 43df2b97f4..8e52007bc7 100755
|
||||||
|
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
|
||||||
|
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
|
||||||
|
@@ -1106,10 +1106,6 @@ object TestUtils extends Logging {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
- override def clearPending(topicPartition: TopicPartition): Unit = {
|
||||||
|
- inFlight.set(false);
|
||||||
|
- }
|
||||||
|
-
|
||||||
|
def completeIsrUpdate(newZkVersion: Int): Unit = {
|
||||||
|
if (inFlight.compareAndSet(true, false)) {
|
||||||
|
val item = isrUpdates.head
|
||||||
@ -4,7 +4,7 @@
|
|||||||
|
|
||||||
Name: kafka
|
Name: kafka
|
||||||
Version: 2.8.2
|
Version: 2.8.2
|
||||||
Release: 12
|
Release: 13
|
||||||
Summary: A Distributed Streaming Platform.
|
Summary: A Distributed Streaming Platform.
|
||||||
|
|
||||||
License: Apache-2.0
|
License: Apache-2.0
|
||||||
@ -24,6 +24,7 @@ Patch8: 0009-format-RocksDBConfigSetter.patch
|
|||||||
Patch9: 0010-not-update-connection.patch
|
Patch9: 0010-not-update-connection.patch
|
||||||
Patch10: 0011-ConfigEntry.patch
|
Patch10: 0011-ConfigEntry.patch
|
||||||
Patch11: 0012-incorrectly-LeaderElectionCommand.patch
|
Patch11: 0012-incorrectly-LeaderElectionCommand.patch
|
||||||
|
Patch12: 0013-AlterIsr.patch
|
||||||
|
|
||||||
BuildRequires: systemd java-1.8.0-openjdk-devel
|
BuildRequires: systemd java-1.8.0-openjdk-devel
|
||||||
Provides: kafka = %{version}
|
Provides: kafka = %{version}
|
||||||
@ -75,6 +76,8 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses
|
|||||||
rm -rf %{buildroot}
|
rm -rf %{buildroot}
|
||||||
|
|
||||||
%changelog
|
%changelog
|
||||||
|
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-13
|
||||||
|
- AlterIsr and LeaderAndIsr race condition
|
||||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-12
|
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-12
|
||||||
- Fix noop set is incorrectly replaced with succeeded set from LeaderElectionCommand
|
- Fix noop set is incorrectly replaced with succeeded set from LeaderElectionCommand
|
||||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-11
|
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-11
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user