Compare commits
No commits in common. "e9be94c5669947ea3d5b53c649bd007bf46334da" and "91901a02a82f49668b8a091096d5135b3d3224b9" have entirely different histories.
e9be94c566
...
91901a02a8
@@ -1,223 +0,0 @@
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index a58f4238ff..88b337311d 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -581,9 +581,6 @@ class Partition(val topicPartition: TopicPartition,
leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
zkVersion = partitionState.zkVersion

- // Clear any pending AlterIsr requests and check replica state
- alterIsrManager.clearPending(topicPartition)
-
// In the case of successive leader elections in a short time period, a follower may have
// entries in its log from a later epoch than any entry in the new leader's log. In order
// to ensure that these followers can truncate to the right offset, we must cache the new
@@ -661,9 +658,6 @@ class Partition(val topicPartition: TopicPartition,
leaderEpochStartOffsetOpt = None
zkVersion = partitionState.zkVersion

- // Since we might have been a leader previously, still clear any pending AlterIsr requests
- alterIsrManager.clearPending(topicPartition)
-
if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch == oldLeaderEpoch) {
false
} else {
@@ -1373,13 +1367,15 @@ class Partition(val topicPartition: TopicPartition,
isrState = proposedIsrState

if (!alterIsrManager.submit(alterIsrItem)) {
- // If the ISR manager did not accept our update, we need to revert back to previous state
+ // If the ISR manager did not accept our update, we need to revert the proposed state.
+ // This can happen if the ISR state was updated by the controller (via LeaderAndIsr in ZK-mode or
+ // ChangePartitionRecord in KRaft mode) but we have an AlterIsr request still in-flight.
isrState = oldState
isrChangeListener.markFailed()
- throw new IllegalStateException(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
+ warn(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
+ } else {
+ debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
}
-
- debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
}

/**
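
Note: the hunk above turns a failed enqueue from a fatal IllegalStateException into a revert-and-log path. A minimal, self-contained Scala sketch of that pattern (RevertAndWarnSketch and its types are made-up stand-ins, not the real kafka.cluster.Partition or AlterIsrManager APIs):

// Sketch only: placeholder types, not the real Kafka classes.
object RevertAndWarnSketch {
  case class IsrStateSketch(isr: List[Int])
  case class AlterIsrItemSketch(partition: String, proposed: IsrStateSketch)

  class AlterIsrManagerSketch {
    private var pending = Set.empty[String]
    // Mirrors submit(): returns false if an update for this partition is already unsent or in flight.
    def submit(item: AlterIsrItemSketch): Boolean =
      if (pending.contains(item.partition)) false else { pending += item.partition; true }
  }

  // Optimistically apply the proposed state, then revert and warn (rather than throw) when the
  // manager refuses the update, e.g. because the controller changed the ISR while a request was in flight.
  def propose(manager: AlterIsrManagerSketch, partition: String,
              oldState: IsrStateSketch, proposedState: IsrStateSketch): IsrStateSketch = {
    var isrState = proposedState
    if (!manager.submit(AlterIsrItemSketch(partition, proposedState))) {
      isrState = oldState
      println(s"WARN Failed to enqueue ISR change state $proposedState for partition $partition")
    } else {
      println(s"DEBUG Enqueued ISR change to state $proposedState")
    }
    isrState
  }
}
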
diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala
index 9ad734f708..1059a3df3e 100644
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
@@ -49,8 +49,6 @@ trait AlterIsrManager {
def shutdown(): Unit = {}

def submit(alterIsrItem: AlterIsrItem): Boolean
-
- def clearPending(topicPartition: TopicPartition): Unit
}

case class AlterIsrItem(topicPartition: TopicPartition,
@@ -134,9 +132,6 @@ class DefaultAlterIsrManager(
enqueued
}

- override def clearPending(topicPartition: TopicPartition): Unit = {
- unsentIsrUpdates.remove(topicPartition)
- }

private[server] def maybePropagateIsrChanges(): Unit = {
// Send all pending items if there is not already a request in-flight.
diff --git a/core/src/main/scala/kafka/server/ZkIsrManager.scala b/core/src/main/scala/kafka/server/ZkIsrManager.scala
index 2d88aac6b4..8dffcdf307 100644
--- a/core/src/main/scala/kafka/server/ZkIsrManager.scala
+++ b/core/src/main/scala/kafka/server/ZkIsrManager.scala
@@ -55,12 +55,6 @@ class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) ex
period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
}

- override def clearPending(topicPartition: TopicPartition): Unit = {
- // Since we always immediately process ZK updates and never actually enqueue anything, there is nothing to
- // clear here so this is a no-op. Even if there are changes that have not been propagated, the write to ZK
- // has already happened, so we may as well send the notification to the controller.
- }
-
override def submit(alterIsrItem: AlterIsrItem): Boolean = {
debug(s"Writing new ISR ${alterIsrItem.leaderAndIsr.isr} to ZooKeeper with version " +
s"${alterIsrItem.leaderAndIsr.zkVersion} for partition ${alterIsrItem.topicPartition}")
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 5eedb63ae5..4dbd735753 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -18,10 +18,10 @@
package kafka.controller

import java.util.Properties
-import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
-
+import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingQueue, TimeUnit}
import com.yammer.metrics.core.Timer
import kafka.api.{ApiVersion, KAFKA_2_6_IV0, KAFKA_2_7_IV0, LeaderAndIsr}
+import kafka.controller.KafkaController.AlterIsrCallback
import kafka.metrics.KafkaYammerMetrics
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{LogCaptureAppender, TestUtils}
@@ -849,6 +849,67 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
latch.await()
}

+ @Test
+ def testAlterIsrErrors(): Unit = {
+ servers = makeServers(1)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+ val tp = new TopicPartition("t", 0)
+ val assignment = Map(tp.partition -> Seq(controllerId))
+ TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+ val controller = getController().kafkaController
+ var future = captureAlterIsrError(controllerId, controller.brokerEpoch - 1,
+ Map(tp -> LeaderAndIsr(controllerId, List(controllerId))))
+ var capturedError = future.get(5, TimeUnit.SECONDS)
+ assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
+
+ future = captureAlterIsrError(99, controller.brokerEpoch,
+ Map(tp -> LeaderAndIsr(controllerId, List(controllerId))))
+ capturedError = future.get(5, TimeUnit.SECONDS)
+ assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
+
+ val unknownTopicPartition = new TopicPartition("unknown", 99)
+ future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
+ Map(unknownTopicPartition -> LeaderAndIsr(controllerId, List(controllerId))), unknownTopicPartition)
+ capturedError = future.get(5, TimeUnit.SECONDS)
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, capturedError)
+
+ future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
+ Map(tp -> LeaderAndIsr(controllerId, 1, List(controllerId), 99)), tp)
+ capturedError = future.get(5, TimeUnit.SECONDS)
+ assertEquals(Errors.INVALID_UPDATE_VERSION, capturedError)
+ }
+
+ def captureAlterIsrError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr]): CompletableFuture[Errors] = {
+ val future = new CompletableFuture[Errors]()
+ val controller = getController().kafkaController
+ val callback: AlterIsrCallback = {
+ case Left(_: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
+ future.completeExceptionally(new AssertionError(s"Should have seen top-level error"))
+ case Right(error: Errors) =>
+ future.complete(error)
+ }
+ controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback))
+ future
+ }
+
+ def captureAlterIsrPartitionError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], tp: TopicPartition): CompletableFuture[Errors] = {
+ val future = new CompletableFuture[Errors]()
+ val controller = getController().kafkaController
+ val callback: AlterIsrCallback = {
+ case Left(partitionResults: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
+ partitionResults.get(tp) match {
+ case Some(Left(error: Errors)) => future.complete(error)
+ case Some(Right(_: LeaderAndIsr)) => future.completeExceptionally(new AssertionError(s"Should have seen an error for $tp in result"))
+ case None => future.completeExceptionally(new AssertionError(s"Should have seen $tp in result"))
+ }
+ case Right(_: Errors) =>
+ future.completeExceptionally(new AssertionError(s"Should not seen top-level error"))
+ }
+ controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback))
+ future
+ }
+
+
@Test
def testTopicIdsAreAdded(): Unit = {
servers = makeServers(1)
diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
index 1074fd3157..1c8c81471f 100644
--- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
@@ -70,8 +70,10 @@ class AlterIsrManagerTest {
@Test
def testOverwriteWithinBatch(): Unit = {
val capture = EasyMock.newCapture[AbstractRequest.Builder[AlterIsrRequest]]()
+ val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]()
+
EasyMock.expect(brokerToController.start())
- EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.anyObject())).once()
+ EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.capture(callbackCapture))).times(2)
EasyMock.replay(brokerToController)

val scheduler = new MockScheduler(time)
@@ -81,11 +83,21 @@ class AlterIsrManagerTest {
// Only send one ISR update for a given topic+partition
assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)))
assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2), 10), _ => {}, 0)))
+
+ // Simulate response
+ val alterIsrResp = partitionResponse(tp0, Errors.NONE)
+ val resp = new ClientResponse(null, null, "", 0L, 0L,
+ false, null, null, alterIsrResp)
+ callbackCapture.getValue.onComplete(resp)
+
+ // Now we can submit this partition again
+ assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1), 10), _ => {}, 0)))
EasyMock.verify(brokerToController)

+ // Make sure we sent the right request ISR={1}
val request = capture.getValue.build()
assertEquals(request.data().topics().size(), 1)
- assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 3)
+ assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 1)
}

@Test
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 43df2b97f4..8e52007bc7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1106,10 +1106,6 @@ object TestUtils extends Logging {
}
}

- override def clearPending(topicPartition: TopicPartition): Unit = {
- inFlight.set(false);
- }
-
def completeIsrUpdate(newZkVersion: Int): Unit = {
if (inFlight.compareAndSet(true, false)) {
val item = isrUpdates.head
@@ -1,16 +0,0 @@
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 58a9ce3d5a..daa5a0bf70 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -521,6 +521,11 @@ public class WorkerConfig extends AbstractConfig {
}
}
}
+
+ @Override
+ public String toString() {
+ return "List of comma-separated URIs, ex: http://localhost:8080,https://localhost:8443.";
+ }
}

private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
@@ -1,103 +0,0 @@
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 5648e8f0a3..24e9e21ad7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -121,7 +121,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce

final long timestamp = context().timestamp();
observedStreamTime = Math.max(observedStreamTime, timestamp);
- final long closeTime = observedStreamTime - windows.gracePeriodMs();
+ final long closeTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap();

final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
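
Note: the one-line change above delays session expiry by the window's inactivity gap. A quick worked example in plain Scala (gap = 10 ms and grace = 0 ms are assumed here to match the tests that follow; this is not the Streams API itself):

// Assumed values: gap = 10, grace = 0, a session window [0,0] created by a record at t = 0,
// and observed stream time advanced to 11 by a later record.
object SessionCloseTimeExample extends App {
  val gapMs = 10L
  val graceMs = 0L
  val windowEnd = 0L
  val observedStreamTime = 11L

  val oldCloseTime = observedStreamTime - graceMs           // 11: under the old formula the [0,0] session was already droppable at stream time 1
  val newCloseTime = observedStreamTime - graceMs - gapMs   // 1: matches expiration=[1] streamTime=[11] in the test log below

  // A window is treated as expired once its end falls before closeTime, so with the fix the
  // session stays open until stream time has moved past the inactivity gap (plus grace).
  println(s"old closeTime = $oldCloseTime, new closeTime = $newCloseTime, windowEnd = $windowEnd")
}
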
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index ab2adbfbb1..244ea9f4fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -441,9 +441,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
processor.process("OnTime1", "1");

- // dummy record to advance stream time = 1
- context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
- processor.process("dummy", "dummy");
+ // dummy record to advance stream time = 11, 10 for gap time plus 1 to place outside window
+ context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders()));
+ processor.process("dummy", "dummy");

try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
@@ -455,7 +455,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
assertThat(
appender.getMessages(),
hasItem("Skipping record for expired window." +
- " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[1]")
+ " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[11]")
);
}

@@ -542,17 +542,17 @@ public class KStreamSessionWindowAggregateProcessorTest {
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
processor.process("OnTime1", "1");

- // dummy record to advance stream time = 1
- context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
- processor.process("dummy", "dummy");
+ // dummy record to advance stream time = 11, 10 for gap time plus 1 to place at edge of window
+ context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders()));
+ processor.process("dummy", "dummy");

// delayed record arrives on time, should not be skipped
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
processor.process("OnTime2", "1");

- // dummy record to advance stream time = 2
- context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", null));
- processor.process("dummy", "dummy");
+ // dummy record to advance stream time = 12, 10 for gap time plus 2 to place outside window
+ context.setRecordContext(new ProcessorRecordContext(12, -2, -3, "topic", new RecordHeaders()));
+ processor.process("dummy", "dummy");

// delayed record arrives late
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
@@ -561,7 +561,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
assertThat(
appender.getMessages(),
hasItem("Skipping record for expired window." +
- " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[2]")
+ " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[12]")
);
}

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 46a8ab8dcf..e0b7957e01 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -581,7 +581,7 @@ public class SuppressScenarioTest {
// arbitrarily disordered records are admitted, because the *window* is not closed until stream-time > window-end + grace
inputTopic.pipeInput("k1", "v1", 1L);
// any record in the same partition advances stream time (note the key is different)
- inputTopic.pipeInput("k2", "v1", 6L);
+ inputTopic.pipeInput("k2", "v1", 11L);
// late event for first window - this should get dropped from all streams, since the first window is now closed.
inputTopic.pipeInput("k1", "v1", 5L);
// just pushing stream time forward to flush the other events through.
@@ -594,7 +594,7 @@ public class SuppressScenarioTest {
new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L),
new KeyValueTimestamp<>("[k1@0/5]", null, 5L),
new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
- new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L),
+ new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L),
new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L)
)
);
@@ -602,7 +602,7 @@ public class SuppressScenarioTest {
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
- new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L)
+ new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L)
)
);
}
@@ -1,42 +0,0 @@
diff --git a/build.gradle b/build.gradle
index 5e8b82237a..0a181019cb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2296,4 +2296,37 @@ task aggregatedJavadoc(type: Javadoc, dependsOn: compileJava) {
options.links "https://docs.oracle.com/en/java/javase/${JavaVersion.current().majorVersion}/docs/api/"
else
options.links "https://docs.oracle.com/javase/8/docs/api/"
+ // TODO: remove this snippet once JDK >11 is used or https://bugs.openjdk.java.net/browse/JDK-8215291 is backported to JDK11
+ // Patch to include `getURLPrefix` from JDK 12 +
+ // NOTICE: This code was copied from original ORACLE search.js file present in JDK 12 and newer
+ final SEARCH_PATCH_MODULE_LESS_AWARE = "\n\n" +
+ "// Fix for moudle-less aware search\n" +
+ "function getURLPrefix(ui) {\n" +
+ " var urlPrefix=\"\";\n" +
+ " var slash = \"/\";\n" +
+ " if (ui.item.category === catModules) {\n" +
+ " return ui.item.l + slash;\n" +
+ " } else if (ui.item.category === catPackages && ui.item.m) {\n" +
+ " return ui.item.m + slash;\n" +
+ " } else if (ui.item.category === catTypes || ui.item.category === catMembers) {\n" +
+ " if (ui.item.m) {\n" +
+ " urlPrefix = ui.item.m + slash;\n" +
+ " } else {\n" +
+ " \$.each(packageSearchIndex, function(index, item) {\n" +
+ " if (item.m && ui.item.p === item.l) {\n" +
+ " urlPrefix = item.m + slash;\n" +
+ " }\n" +
+ " });\n" +
+ " }\n" +
+ " }\n" +
+ " return urlPrefix;\n" +
+ "}"
+
+ // When all the JavaDoc is generated we proceed to patch the search.js file
+ doLast {
+ def searchJsFile = new File(destinationDir.getAbsolutePath() + '/search.js')
+ // Append the patch to the file. By being defined at a later position, JS will execute that definition instead of
+ // the one provided by default (higher up in the file).
+ searchJsFile.append SEARCH_PATCH_MODULE_LESS_AWARE
+ }
}
@@ -1,74 +0,0 @@
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 7a8a13c6e7..177b460d38 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -840,7 +840,10 @@ private[log] class Cleaner(val id: Int,
logSize + segs.head.size <= maxSize &&
indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize &&
timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
- lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) {
+ //if first segment size is 0, we don't need to do the index offset range check.
+ //this will avoid empty log left every 2^31 message.
+ (segs.head.size == 0 ||
+ lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue)) {
group = segs.head :: group
logSize += segs.head.size
indexSize += segs.head.offsetIndex.sizeInBytes
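
Note: the extra `segs.head.size == 0` clause above restates the patch's own rationale — the offset index stores 4-byte relative offsets, so a group may normally only span Int.MaxValue offsets, but an empty segment has no index entries and can always be folded into the current group, so repeated cleanings no longer leave an empty segment behind every 2^31 messages. A simplified Scala sketch of just that predicate (SegmentSketch is a made-up stand-in for kafka.log.LogSegment; the size and index-size checks of the real condition are omitted):

// Sketch of the relaxed grouping check; not the real Cleaner.groupSegmentsBySize implementation.
case class SegmentSketch(baseOffset: Long, lastOffset: Long, sizeInBytes: Int)

def canJoinGroup(candidate: SegmentSketch, groupBaseOffset: Long): Boolean =
  candidate.sizeInBytes == 0 ||                               // empty segments always group (the fix)
    candidate.lastOffset - groupBaseOffset <= Int.MaxValue    // otherwise the relative-offset span must fit in an Int
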
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 43bc3b9f28..e5984c4f31 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1258,6 +1258,53 @@ class LogCleanerTest {
"All but the last group should be the target size.")
}

+ @Test
+ def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={
+ val cleaner = makeCleaner(Int.MaxValue)
+ val logProps = new Properties()
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ val k="key".getBytes()
+ val v="val".getBytes()
+
+ //create 3 segments
+ for(i <- 0 until 3){
+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
+ //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last message of i-th segment
+ val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 )
+ log.appendAsFollower(records)
+ assertEquals(i + 1, log.numberOfSegments)
+ }
+
+ //4th active segment, not clean
+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
+
+ val totalSegments = 4
+ //last segment not cleanable
+ val firstUncleanableOffset = log.logEndOffset - 1
+ val notCleanableSegments = 1
+
+ assertEquals(totalSegments, log.numberOfSegments)
+ var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
+ //because index file uses 4 byte relative index offset and current segments all none empty,
+ //segments will not group even their size is very small.
+ assertEquals(totalSegments - notCleanableSegments, groups.size)
+ //do clean to clean first 2 segments to empty
+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
+ assertEquals(totalSegments, log.numberOfSegments)
+ assertEquals(0, log.logSegments.head.size)
+
+ //after clean we got 2 empty segment, they will group together this time
+ groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
+ val noneEmptySegment = 1
+ assertEquals(noneEmptySegment + 1, groups.size)
+
+ //trigger a clean and 2 empty segments should cleaned to 1
+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
+ assertEquals(totalSegments - 1, log.numberOfSegments)
+ }
+
+
/**
* Validate the logic for grouping log segments together for cleaning when only a small number of
* messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
kafka.spec
@@ -4,7 +4,7 @@
 
 Name: kafka
 Version: 2.8.2
-Release: 17
+Release: 12
 Summary: A Distributed Streaming Platform.
 
 License: Apache-2.0
@@ -24,11 +24,6 @@ Patch8: 0009-format-RocksDBConfigSetter.patch
 Patch9: 0010-not-update-connection.patch
 Patch10: 0011-ConfigEntry.patch
 Patch11: 0012-incorrectly-LeaderElectionCommand.patch
-Patch12: 0013-AlterIsr.patch
-Patch13: 0014-override-toString.patch
-Patch14: 0015-SessionWindows-closed-early.patch
-Patch15: 0016-non-existent-URL.patch
-Patch16: 0017-fix-log-clean.patch
 
 BuildRequires: systemd java-1.8.0-openjdk-devel
 Provides: kafka = %{version}
@@ -80,16 +75,6 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses
 rm -rf %{buildroot}
 
 %changelog
-* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-17
-- log clean relative index range check of group consider empty log segment to avoid too many empty log segment left
-* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-16
-- Javadocs search sends you to a non-existent URL
-* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-15
-- SessionWindows are closed too early
-* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-14
-- override toString method to show correct value in doc
-* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-13
-- AlterIsr and LeaderAndIsr race condition
 * Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-12
 - Fix noop set is incorrectly replaced with succeeded set from LeaderElectionCommand
 * Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-11