Compare commits
18 Commits
91901a02a8
...
e9be94c566
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9be94c566 | ||
|
|
0159004f7d | ||
|
|
433e5a8631 | ||
|
|
1253478f93 | ||
|
|
74842eb6db | ||
|
|
d8872eb8e2 | ||
|
|
ea0e4fe3aa | ||
|
|
f157f4693f | ||
|
|
a51319836f | ||
|
|
61fe48ab84 | ||
|
|
c06ac6e7b7 | ||
|
|
a0e33e9d4b | ||
|
|
2bffa809d2 | ||
|
|
e3bce27258 | ||
|
|
026ed38997 | ||
|
|
e21e1d8199 | ||
|
|
9a3bdfcb54 | ||
|
|
48aaa7caa0 |
223
0013-AlterIsr.patch
Normal file
223
0013-AlterIsr.patch
Normal file
@ -0,0 +1,223 @@
|
||||
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
|
||||
index a58f4238ff..88b337311d 100755
|
||||
--- a/core/src/main/scala/kafka/cluster/Partition.scala
|
||||
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
|
||||
@@ -581,9 +581,6 @@ class Partition(val topicPartition: TopicPartition,
|
||||
leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
|
||||
zkVersion = partitionState.zkVersion
|
||||
|
||||
- // Clear any pending AlterIsr requests and check replica state
|
||||
- alterIsrManager.clearPending(topicPartition)
|
||||
-
|
||||
// In the case of successive leader elections in a short time period, a follower may have
|
||||
// entries in its log from a later epoch than any entry in the new leader's log. In order
|
||||
// to ensure that these followers can truncate to the right offset, we must cache the new
|
||||
@@ -661,9 +658,6 @@ class Partition(val topicPartition: TopicPartition,
|
||||
leaderEpochStartOffsetOpt = None
|
||||
zkVersion = partitionState.zkVersion
|
||||
|
||||
- // Since we might have been a leader previously, still clear any pending AlterIsr requests
|
||||
- alterIsrManager.clearPending(topicPartition)
|
||||
-
|
||||
if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch == oldLeaderEpoch) {
|
||||
false
|
||||
} else {
|
||||
@@ -1373,13 +1367,15 @@ class Partition(val topicPartition: TopicPartition,
|
||||
isrState = proposedIsrState
|
||||
|
||||
if (!alterIsrManager.submit(alterIsrItem)) {
|
||||
- // If the ISR manager did not accept our update, we need to revert back to previous state
|
||||
+ // If the ISR manager did not accept our update, we need to revert the proposed state.
|
||||
+ // This can happen if the ISR state was updated by the controller (via LeaderAndIsr in ZK-mode or
|
||||
+ // ChangePartitionRecord in KRaft mode) but we have an AlterIsr request still in-flight.
|
||||
isrState = oldState
|
||||
isrChangeListener.markFailed()
|
||||
- throw new IllegalStateException(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
|
||||
+ warn(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
|
||||
+ } else {
|
||||
+ debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
|
||||
}
|
||||
-
|
||||
- debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
|
||||
}
|
||||
|
||||
/**
|
||||
diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala
|
||||
index 9ad734f708..1059a3df3e 100644
|
||||
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
|
||||
+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
|
||||
@@ -49,8 +49,6 @@ trait AlterIsrManager {
|
||||
def shutdown(): Unit = {}
|
||||
|
||||
def submit(alterIsrItem: AlterIsrItem): Boolean
|
||||
-
|
||||
- def clearPending(topicPartition: TopicPartition): Unit
|
||||
}
|
||||
|
||||
case class AlterIsrItem(topicPartition: TopicPartition,
|
||||
@@ -134,9 +132,6 @@ class DefaultAlterIsrManager(
|
||||
enqueued
|
||||
}
|
||||
|
||||
- override def clearPending(topicPartition: TopicPartition): Unit = {
|
||||
- unsentIsrUpdates.remove(topicPartition)
|
||||
- }
|
||||
|
||||
private[server] def maybePropagateIsrChanges(): Unit = {
|
||||
// Send all pending items if there is not already a request in-flight.
|
||||
diff --git a/core/src/main/scala/kafka/server/ZkIsrManager.scala b/core/src/main/scala/kafka/server/ZkIsrManager.scala
|
||||
index 2d88aac6b4..8dffcdf307 100644
|
||||
--- a/core/src/main/scala/kafka/server/ZkIsrManager.scala
|
||||
+++ b/core/src/main/scala/kafka/server/ZkIsrManager.scala
|
||||
@@ -55,12 +55,6 @@ class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) ex
|
||||
period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
|
||||
}
|
||||
|
||||
- override def clearPending(topicPartition: TopicPartition): Unit = {
|
||||
- // Since we always immediately process ZK updates and never actually enqueue anything, there is nothing to
|
||||
- // clear here so this is a no-op. Even if there are changes that have not been propagated, the write to ZK
|
||||
- // has already happened, so we may as well send the notification to the controller.
|
||||
- }
|
||||
-
|
||||
override def submit(alterIsrItem: AlterIsrItem): Boolean = {
|
||||
debug(s"Writing new ISR ${alterIsrItem.leaderAndIsr.isr} to ZooKeeper with version " +
|
||||
s"${alterIsrItem.leaderAndIsr.zkVersion} for partition ${alterIsrItem.topicPartition}")
|
||||
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
|
||||
index 5eedb63ae5..4dbd735753 100644
|
||||
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
|
||||
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
|
||||
@@ -18,10 +18,10 @@
|
||||
package kafka.controller
|
||||
|
||||
import java.util.Properties
|
||||
-import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
|
||||
-
|
||||
+import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingQueue, TimeUnit}
|
||||
import com.yammer.metrics.core.Timer
|
||||
import kafka.api.{ApiVersion, KAFKA_2_6_IV0, KAFKA_2_7_IV0, LeaderAndIsr}
|
||||
+import kafka.controller.KafkaController.AlterIsrCallback
|
||||
import kafka.metrics.KafkaYammerMetrics
|
||||
import kafka.server.{KafkaConfig, KafkaServer}
|
||||
import kafka.utils.{LogCaptureAppender, TestUtils}
|
||||
@@ -849,6 +849,67 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
|
||||
latch.await()
|
||||
}
|
||||
|
||||
+ @Test
|
||||
+ def testAlterIsrErrors(): Unit = {
|
||||
+ servers = makeServers(1)
|
||||
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
|
||||
+ val tp = new TopicPartition("t", 0)
|
||||
+ val assignment = Map(tp.partition -> Seq(controllerId))
|
||||
+ TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
|
||||
+ val controller = getController().kafkaController
|
||||
+ var future = captureAlterIsrError(controllerId, controller.brokerEpoch - 1,
|
||||
+ Map(tp -> LeaderAndIsr(controllerId, List(controllerId))))
|
||||
+ var capturedError = future.get(5, TimeUnit.SECONDS)
|
||||
+ assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
|
||||
+
|
||||
+ future = captureAlterIsrError(99, controller.brokerEpoch,
|
||||
+ Map(tp -> LeaderAndIsr(controllerId, List(controllerId))))
|
||||
+ capturedError = future.get(5, TimeUnit.SECONDS)
|
||||
+ assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
|
||||
+
|
||||
+ val unknownTopicPartition = new TopicPartition("unknown", 99)
|
||||
+ future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
|
||||
+ Map(unknownTopicPartition -> LeaderAndIsr(controllerId, List(controllerId))), unknownTopicPartition)
|
||||
+ capturedError = future.get(5, TimeUnit.SECONDS)
|
||||
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, capturedError)
|
||||
+
|
||||
+ future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
|
||||
+ Map(tp -> LeaderAndIsr(controllerId, 1, List(controllerId), 99)), tp)
|
||||
+ capturedError = future.get(5, TimeUnit.SECONDS)
|
||||
+ assertEquals(Errors.INVALID_UPDATE_VERSION, capturedError)
|
||||
+ }
|
||||
+
|
||||
+ def captureAlterIsrError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr]): CompletableFuture[Errors] = {
|
||||
+ val future = new CompletableFuture[Errors]()
|
||||
+ val controller = getController().kafkaController
|
||||
+ val callback: AlterIsrCallback = {
|
||||
+ case Left(_: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
|
||||
+ future.completeExceptionally(new AssertionError(s"Should have seen top-level error"))
|
||||
+ case Right(error: Errors) =>
|
||||
+ future.complete(error)
|
||||
+ }
|
||||
+ controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback))
|
||||
+ future
|
||||
+ }
|
||||
+
|
||||
+ def captureAlterIsrPartitionError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], tp: TopicPartition): CompletableFuture[Errors] = {
|
||||
+ val future = new CompletableFuture[Errors]()
|
||||
+ val controller = getController().kafkaController
|
||||
+ val callback: AlterIsrCallback = {
|
||||
+ case Left(partitionResults: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
|
||||
+ partitionResults.get(tp) match {
|
||||
+ case Some(Left(error: Errors)) => future.complete(error)
|
||||
+ case Some(Right(_: LeaderAndIsr)) => future.completeExceptionally(new AssertionError(s"Should have seen an error for $tp in result"))
|
||||
+ case None => future.completeExceptionally(new AssertionError(s"Should have seen $tp in result"))
|
||||
+ }
|
||||
+ case Right(_: Errors) =>
|
||||
+ future.completeExceptionally(new AssertionError(s"Should not seen top-level error"))
|
||||
+ }
|
||||
+ controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback))
|
||||
+ future
|
||||
+ }
|
||||
+
|
||||
+
|
||||
@Test
|
||||
def testTopicIdsAreAdded(): Unit = {
|
||||
servers = makeServers(1)
|
||||
diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
|
||||
index 1074fd3157..1c8c81471f 100644
|
||||
--- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
|
||||
+++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
|
||||
@@ -70,8 +70,10 @@ class AlterIsrManagerTest {
|
||||
@Test
|
||||
def testOverwriteWithinBatch(): Unit = {
|
||||
val capture = EasyMock.newCapture[AbstractRequest.Builder[AlterIsrRequest]]()
|
||||
+ val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]()
|
||||
+
|
||||
EasyMock.expect(brokerToController.start())
|
||||
- EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.anyObject())).once()
|
||||
+ EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.capture(callbackCapture))).times(2)
|
||||
EasyMock.replay(brokerToController)
|
||||
|
||||
val scheduler = new MockScheduler(time)
|
||||
@@ -81,11 +83,21 @@ class AlterIsrManagerTest {
|
||||
// Only send one ISR update for a given topic+partition
|
||||
assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)))
|
||||
assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2), 10), _ => {}, 0)))
|
||||
+
|
||||
+ // Simulate response
|
||||
+ val alterIsrResp = partitionResponse(tp0, Errors.NONE)
|
||||
+ val resp = new ClientResponse(null, null, "", 0L, 0L,
|
||||
+ false, null, null, alterIsrResp)
|
||||
+ callbackCapture.getValue.onComplete(resp)
|
||||
+
|
||||
+ // Now we can submit this partition again
|
||||
+ assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1), 10), _ => {}, 0)))
|
||||
EasyMock.verify(brokerToController)
|
||||
|
||||
+ // Make sure we sent the right request ISR={1}
|
||||
val request = capture.getValue.build()
|
||||
assertEquals(request.data().topics().size(), 1)
|
||||
- assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 3)
|
||||
+ assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 1)
|
||||
}
|
||||
|
||||
@Test
|
||||
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
|
||||
index 43df2b97f4..8e52007bc7 100755
|
||||
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
|
||||
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
|
||||
@@ -1106,10 +1106,6 @@ object TestUtils extends Logging {
|
||||
}
|
||||
}
|
||||
|
||||
- override def clearPending(topicPartition: TopicPartition): Unit = {
|
||||
- inFlight.set(false);
|
||||
- }
|
||||
-
|
||||
def completeIsrUpdate(newZkVersion: Int): Unit = {
|
||||
if (inFlight.compareAndSet(true, false)) {
|
||||
val item = isrUpdates.head
|
||||
16
0014-override-toString.patch
Normal file
16
0014-override-toString.patch
Normal file
@ -0,0 +1,16 @@
|
||||
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
|
||||
index 58a9ce3d5a..daa5a0bf70 100644
|
||||
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
|
||||
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
|
||||
@@ -521,6 +521,11 @@ public class WorkerConfig extends AbstractConfig {
|
||||
}
|
||||
}
|
||||
}
|
||||
+
|
||||
+ @Override
|
||||
+ public String toString() {
|
||||
+ return "List of comma-separated URIs, ex: http://localhost:8080,https://localhost:8443.";
|
||||
+ }
|
||||
}
|
||||
|
||||
private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
|
||||
103
0015-SessionWindows-closed-early.patch
Normal file
103
0015-SessionWindows-closed-early.patch
Normal file
@ -0,0 +1,103 @@
|
||||
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
|
||||
index 5648e8f0a3..24e9e21ad7 100644
|
||||
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
|
||||
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
|
||||
@@ -121,7 +121,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
|
||||
|
||||
final long timestamp = context().timestamp();
|
||||
observedStreamTime = Math.max(observedStreamTime, timestamp);
|
||||
- final long closeTime = observedStreamTime - windows.gracePeriodMs();
|
||||
+ final long closeTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap();
|
||||
|
||||
final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
|
||||
final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
|
||||
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
|
||||
index ab2adbfbb1..244ea9f4fe 100644
|
||||
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
|
||||
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
|
||||
@@ -441,9 +441,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
|
||||
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
|
||||
processor.process("OnTime1", "1");
|
||||
|
||||
- // dummy record to advance stream time = 1
|
||||
- context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
|
||||
- processor.process("dummy", "dummy");
|
||||
+ // dummy record to advance stream time = 11, 10 for gap time plus 1 to place outside window
|
||||
+ context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders()));
|
||||
+ processor.process("dummy", "dummy");
|
||||
|
||||
try (final LogCaptureAppender appender =
|
||||
LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
|
||||
@@ -455,7 +455,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
|
||||
assertThat(
|
||||
appender.getMessages(),
|
||||
hasItem("Skipping record for expired window." +
|
||||
- " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[1]")
|
||||
+ " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[11]")
|
||||
);
|
||||
}
|
||||
|
||||
@@ -542,17 +542,17 @@ public class KStreamSessionWindowAggregateProcessorTest {
|
||||
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
|
||||
processor.process("OnTime1", "1");
|
||||
|
||||
- // dummy record to advance stream time = 1
|
||||
- context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
|
||||
- processor.process("dummy", "dummy");
|
||||
+ // dummy record to advance stream time = 11, 10 for gap time plus 1 to place at edge of window
|
||||
+ context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders()));
|
||||
+ processor.process("dummy", "dummy");
|
||||
|
||||
// delayed record arrives on time, should not be skipped
|
||||
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
|
||||
processor.process("OnTime2", "1");
|
||||
|
||||
- // dummy record to advance stream time = 2
|
||||
- context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", null));
|
||||
- processor.process("dummy", "dummy");
|
||||
+ // dummy record to advance stream time = 12, 10 for gap time plus 2 to place outside window
|
||||
+ context.setRecordContext(new ProcessorRecordContext(12, -2, -3, "topic", new RecordHeaders()));
|
||||
+ processor.process("dummy", "dummy");
|
||||
|
||||
// delayed record arrives late
|
||||
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
|
||||
@@ -561,7 +561,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
|
||||
assertThat(
|
||||
appender.getMessages(),
|
||||
hasItem("Skipping record for expired window." +
|
||||
- " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[2]")
|
||||
+ " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[12]")
|
||||
);
|
||||
}
|
||||
|
||||
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
|
||||
index 46a8ab8dcf..e0b7957e01 100644
|
||||
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
|
||||
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
|
||||
@@ -581,7 +581,7 @@ public class SuppressScenarioTest {
|
||||
// arbitrarily disordered records are admitted, because the *window* is not closed until stream-time > window-end + grace
|
||||
inputTopic.pipeInput("k1", "v1", 1L);
|
||||
// any record in the same partition advances stream time (note the key is different)
|
||||
- inputTopic.pipeInput("k2", "v1", 6L);
|
||||
+ inputTopic.pipeInput("k2", "v1", 11L);
|
||||
// late event for first window - this should get dropped from all streams, since the first window is now closed.
|
||||
inputTopic.pipeInput("k1", "v1", 5L);
|
||||
// just pushing stream time forward to flush the other events through.
|
||||
@@ -594,7 +594,7 @@ public class SuppressScenarioTest {
|
||||
new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L),
|
||||
new KeyValueTimestamp<>("[k1@0/5]", null, 5L),
|
||||
new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
|
||||
- new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L),
|
||||
+ new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L),
|
||||
new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L)
|
||||
)
|
||||
);
|
||||
@@ -602,7 +602,7 @@ public class SuppressScenarioTest {
|
||||
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
|
||||
asList(
|
||||
new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
|
||||
- new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L)
|
||||
+ new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L)
|
||||
)
|
||||
);
|
||||
}
|
||||
42
0016-non-existent-URL.patch
Normal file
42
0016-non-existent-URL.patch
Normal file
@ -0,0 +1,42 @@
|
||||
diff --git a/build.gradle b/build.gradle
|
||||
index 5e8b82237a..0a181019cb 100644
|
||||
--- a/build.gradle
|
||||
+++ b/build.gradle
|
||||
@@ -2296,4 +2296,37 @@ task aggregatedJavadoc(type: Javadoc, dependsOn: compileJava) {
|
||||
options.links "https://docs.oracle.com/en/java/javase/${JavaVersion.current().majorVersion}/docs/api/"
|
||||
else
|
||||
options.links "https://docs.oracle.com/javase/8/docs/api/"
|
||||
+ // TODO: remove this snippet once JDK >11 is used or https://bugs.openjdk.java.net/browse/JDK-8215291 is backported to JDK11
|
||||
+ // Patch to include `getURLPrefix` from JDK 12 +
|
||||
+ // NOTICE: This code was copied from original ORACLE search.js file present in JDK 12 and newer
|
||||
+ final SEARCH_PATCH_MODULE_LESS_AWARE = "\n\n" +
|
||||
+ "// Fix for moudle-less aware search\n" +
|
||||
+ "function getURLPrefix(ui) {\n" +
|
||||
+ " var urlPrefix=\"\";\n" +
|
||||
+ " var slash = \"/\";\n" +
|
||||
+ " if (ui.item.category === catModules) {\n" +
|
||||
+ " return ui.item.l + slash;\n" +
|
||||
+ " } else if (ui.item.category === catPackages && ui.item.m) {\n" +
|
||||
+ " return ui.item.m + slash;\n" +
|
||||
+ " } else if (ui.item.category === catTypes || ui.item.category === catMembers) {\n" +
|
||||
+ " if (ui.item.m) {\n" +
|
||||
+ " urlPrefix = ui.item.m + slash;\n" +
|
||||
+ " } else {\n" +
|
||||
+ " \$.each(packageSearchIndex, function(index, item) {\n" +
|
||||
+ " if (item.m && ui.item.p === item.l) {\n" +
|
||||
+ " urlPrefix = item.m + slash;\n" +
|
||||
+ " }\n" +
|
||||
+ " });\n" +
|
||||
+ " }\n" +
|
||||
+ " }\n" +
|
||||
+ " return urlPrefix;\n" +
|
||||
+ "}"
|
||||
+
|
||||
+ // When all the JavaDoc is generated we proceed to patch the search.js file
|
||||
+ doLast {
|
||||
+ def searchJsFile = new File(destinationDir.getAbsolutePath() + '/search.js')
|
||||
+ // Append the patch to the file. By being defined at a later position, JS will execute that definition instead of
|
||||
+ // the one provided by default (higher up in the file).
|
||||
+ searchJsFile.append SEARCH_PATCH_MODULE_LESS_AWARE
|
||||
+ }
|
||||
}
|
||||
74
0017-fix-log-clean.patch
Normal file
74
0017-fix-log-clean.patch
Normal file
@ -0,0 +1,74 @@
|
||||
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
|
||||
index 7a8a13c6e7..177b460d38 100644
|
||||
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
|
||||
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
|
||||
@@ -840,7 +840,10 @@ private[log] class Cleaner(val id: Int,
|
||||
logSize + segs.head.size <= maxSize &&
|
||||
indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize &&
|
||||
timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
|
||||
- lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) {
|
||||
+ //if first segment size is 0, we don't need to do the index offset range check.
|
||||
+ //this will avoid empty log left every 2^31 message.
|
||||
+ (segs.head.size == 0 ||
|
||||
+ lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue)) {
|
||||
group = segs.head :: group
|
||||
logSize += segs.head.size
|
||||
indexSize += segs.head.offsetIndex.sizeInBytes
|
||||
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
|
||||
index 43bc3b9f28..e5984c4f31 100755
|
||||
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
|
||||
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
|
||||
@@ -1258,6 +1258,53 @@ class LogCleanerTest {
|
||||
"All but the last group should be the target size.")
|
||||
}
|
||||
|
||||
+ @Test
|
||||
+ def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={
|
||||
+ val cleaner = makeCleaner(Int.MaxValue)
|
||||
+ val logProps = new Properties()
|
||||
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
|
||||
+
|
||||
+ val k="key".getBytes()
|
||||
+ val v="val".getBytes()
|
||||
+
|
||||
+ //create 3 segments
|
||||
+ for(i <- 0 until 3){
|
||||
+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
|
||||
+ //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last message of i-th segment
|
||||
+ val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 )
|
||||
+ log.appendAsFollower(records)
|
||||
+ assertEquals(i + 1, log.numberOfSegments)
|
||||
+ }
|
||||
+
|
||||
+ //4th active segment, not clean
|
||||
+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
|
||||
+
|
||||
+ val totalSegments = 4
|
||||
+ //last segment not cleanable
|
||||
+ val firstUncleanableOffset = log.logEndOffset - 1
|
||||
+ val notCleanableSegments = 1
|
||||
+
|
||||
+ assertEquals(totalSegments, log.numberOfSegments)
|
||||
+ var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
|
||||
+ //because index file uses 4 byte relative index offset and current segments all none empty,
|
||||
+ //segments will not group even their size is very small.
|
||||
+ assertEquals(totalSegments - notCleanableSegments, groups.size)
|
||||
+ //do clean to clean first 2 segments to empty
|
||||
+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
|
||||
+ assertEquals(totalSegments, log.numberOfSegments)
|
||||
+ assertEquals(0, log.logSegments.head.size)
|
||||
+
|
||||
+ //after clean we got 2 empty segment, they will group together this time
|
||||
+ groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
|
||||
+ val noneEmptySegment = 1
|
||||
+ assertEquals(noneEmptySegment + 1, groups.size)
|
||||
+
|
||||
+ //trigger a clean and 2 empty segments should cleaned to 1
|
||||
+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
|
||||
+ assertEquals(totalSegments - 1, log.numberOfSegments)
|
||||
+ }
|
||||
+
|
||||
+
|
||||
/**
|
||||
* Validate the logic for grouping log segments together for cleaning when only a small number of
|
||||
* messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
|
||||
17
kafka.spec
17
kafka.spec
@ -4,7 +4,7 @@
|
||||
|
||||
Name: kafka
|
||||
Version: 2.8.2
|
||||
Release: 12
|
||||
Release: 17
|
||||
Summary: A Distributed Streaming Platform.
|
||||
|
||||
License: Apache-2.0
|
||||
@ -24,6 +24,11 @@ Patch8: 0009-format-RocksDBConfigSetter.patch
|
||||
Patch9: 0010-not-update-connection.patch
|
||||
Patch10: 0011-ConfigEntry.patch
|
||||
Patch11: 0012-incorrectly-LeaderElectionCommand.patch
|
||||
Patch12: 0013-AlterIsr.patch
|
||||
Patch13: 0014-override-toString.patch
|
||||
Patch14: 0015-SessionWindows-closed-early.patch
|
||||
Patch15: 0016-non-existent-URL.patch
|
||||
Patch16: 0017-fix-log-clean.patch
|
||||
|
||||
BuildRequires: systemd java-1.8.0-openjdk-devel
|
||||
Provides: kafka = %{version}
|
||||
@ -75,6 +80,16 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses
|
||||
rm -rf %{buildroot}
|
||||
|
||||
%changelog
|
||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-17
|
||||
- log clean relative index range check of group consider empty log segment to avoid too many empty log segment left
|
||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-16
|
||||
- Javadocs search sends you to a non-existent URL
|
||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-15
|
||||
- SessionWindows are closed too early
|
||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-14
|
||||
- override toString method to show correct value in doc
|
||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-13
|
||||
- AlterIsr and LeaderAndIsr race condition
|
||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-12
|
||||
- Fix noop set is incorrectly replaced with succeeded set from LeaderElectionCommand
|
||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-11
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user