Compare commits

...

18 Commits

Author SHA1 Message Date
openeuler-ci-bot
e9be94c566
!59 log clean relative index range check of group consider empty log segment to avoid too many empty log segment left
From: @sundapeng001 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-08 08:05:13 +00:00
openeuler-ci-bot
0159004f7d
!58 Javadocs search sends you to a non-existent URL
From: @sundapeng001 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-08 08:04:34 +00:00
sundapeng
433e5a8631 log clean relative index range check of group consider empty log segment to avoid too many empty log segment left 2023-12-08 07:38:22 +00:00
openeuler-ci-bot
1253478f93
!57 SessionWindows are closed too early
From: @sundapeng001 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-08 07:13:07 +00:00
sundapeng
74842eb6db Javadocs search sends you to a non-existent URL 2023-12-08 07:06:46 +00:00
sundapeng
d8872eb8e2 SessionWindows are closed too early 2023-12-08 06:51:04 +00:00
openeuler-ci-bot
ea0e4fe3aa
!56 override toString method to show correct value in doc
From: @sundapeng001 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-08 06:41:13 +00:00
openeuler-ci-bot
f157f4693f
!55 AlterIsr and LeaderAndIsr race condition
From: @sundapeng001 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-08 06:39:32 +00:00
sundapeng
a51319836f override toString method to show correct value in doc 2023-12-08 06:25:55 +00:00
sundapeng
61fe48ab84 AlterIsr and LeaderAndIsr race condition 2023-12-08 06:16:52 +00:00
openeuler-ci-bot
c06ac6e7b7
!54 Fix noop set is incorrectly replaced with succeeded set from LeaderElectionCommand
From: @sundapeng001 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-08 06:12:01 +00:00
openeuler-ci-bot
a0e33e9d4b
!53 ConfigEntry#equal does not compare other fields when value is NOT null
From: @sundapeng001 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-08 05:41:56 +00:00
openeuler-ci-bot
2bffa809d2
!51 Don't update connection idle time for muted connections
From: @sundapeng001 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-08 02:58:46 +00:00
openeuler-ci-bot
e3bce27258
!50 Fix the formatting of example RocksDBConfigSetter
From: @sundapeng001 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-08 02:55:41 +00:00
openeuler-ci-bot
026ed38997
!49 Cast SMT should allow null value records to pass through
From: @sundapeng001 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-08 02:39:42 +00:00
openeuler-ci-bot
e21e1d8199
!48 Fix using random payload in ProducerPerformance incorrectly
From: @sundapeng001 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-08 02:38:52 +00:00
openeuler-ci-bot
9a3bdfcb54
!47 Upgrade Guava to 32.0.0 to address CVE-2023-2976
From: @sundapeng001 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-07 07:02:47 +00:00
openeuler-ci-bot
48aaa7caa0
!46 rocksdb 升级到6.19.3,修复CVE-2016-3189
From: @sundapeng001 
Reviewed-by: @hu-zongtang 
Signed-off-by: @hu-zongtang
2023-12-04 06:55:59 +00:00
6 changed files with 474 additions and 1 deletions

223
0013-AlterIsr.patch Normal file
View File

@ -0,0 +1,223 @@
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index a58f4238ff..88b337311d 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -581,9 +581,6 @@ class Partition(val topicPartition: TopicPartition,
leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
zkVersion = partitionState.zkVersion
- // Clear any pending AlterIsr requests and check replica state
- alterIsrManager.clearPending(topicPartition)
-
// In the case of successive leader elections in a short time period, a follower may have
// entries in its log from a later epoch than any entry in the new leader's log. In order
// to ensure that these followers can truncate to the right offset, we must cache the new
@@ -661,9 +658,6 @@ class Partition(val topicPartition: TopicPartition,
leaderEpochStartOffsetOpt = None
zkVersion = partitionState.zkVersion
- // Since we might have been a leader previously, still clear any pending AlterIsr requests
- alterIsrManager.clearPending(topicPartition)
-
if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch == oldLeaderEpoch) {
false
} else {
@@ -1373,13 +1367,15 @@ class Partition(val topicPartition: TopicPartition,
isrState = proposedIsrState
if (!alterIsrManager.submit(alterIsrItem)) {
- // If the ISR manager did not accept our update, we need to revert back to previous state
+ // If the ISR manager did not accept our update, we need to revert the proposed state.
+ // This can happen if the ISR state was updated by the controller (via LeaderAndIsr in ZK-mode or
+ // ChangePartitionRecord in KRaft mode) but we have an AlterIsr request still in-flight.
isrState = oldState
isrChangeListener.markFailed()
- throw new IllegalStateException(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
+ warn(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
+ } else {
+ debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
}
-
- debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
}
/**
diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala
index 9ad734f708..1059a3df3e 100644
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
@@ -49,8 +49,6 @@ trait AlterIsrManager {
def shutdown(): Unit = {}
def submit(alterIsrItem: AlterIsrItem): Boolean
-
- def clearPending(topicPartition: TopicPartition): Unit
}
case class AlterIsrItem(topicPartition: TopicPartition,
@@ -134,9 +132,6 @@ class DefaultAlterIsrManager(
enqueued
}
- override def clearPending(topicPartition: TopicPartition): Unit = {
- unsentIsrUpdates.remove(topicPartition)
- }
private[server] def maybePropagateIsrChanges(): Unit = {
// Send all pending items if there is not already a request in-flight.
diff --git a/core/src/main/scala/kafka/server/ZkIsrManager.scala b/core/src/main/scala/kafka/server/ZkIsrManager.scala
index 2d88aac6b4..8dffcdf307 100644
--- a/core/src/main/scala/kafka/server/ZkIsrManager.scala
+++ b/core/src/main/scala/kafka/server/ZkIsrManager.scala
@@ -55,12 +55,6 @@ class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) ex
period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
}
- override def clearPending(topicPartition: TopicPartition): Unit = {
- // Since we always immediately process ZK updates and never actually enqueue anything, there is nothing to
- // clear here so this is a no-op. Even if there are changes that have not been propagated, the write to ZK
- // has already happened, so we may as well send the notification to the controller.
- }
-
override def submit(alterIsrItem: AlterIsrItem): Boolean = {
debug(s"Writing new ISR ${alterIsrItem.leaderAndIsr.isr} to ZooKeeper with version " +
s"${alterIsrItem.leaderAndIsr.zkVersion} for partition ${alterIsrItem.topicPartition}")
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 5eedb63ae5..4dbd735753 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -18,10 +18,10 @@
package kafka.controller
import java.util.Properties
-import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
-
+import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingQueue, TimeUnit}
import com.yammer.metrics.core.Timer
import kafka.api.{ApiVersion, KAFKA_2_6_IV0, KAFKA_2_7_IV0, LeaderAndIsr}
+import kafka.controller.KafkaController.AlterIsrCallback
import kafka.metrics.KafkaYammerMetrics
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{LogCaptureAppender, TestUtils}
@@ -849,6 +849,67 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
latch.await()
}
+ @Test
+ def testAlterIsrErrors(): Unit = {
+ servers = makeServers(1)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+ val tp = new TopicPartition("t", 0)
+ val assignment = Map(tp.partition -> Seq(controllerId))
+ TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+ val controller = getController().kafkaController
+ var future = captureAlterIsrError(controllerId, controller.brokerEpoch - 1,
+ Map(tp -> LeaderAndIsr(controllerId, List(controllerId))))
+ var capturedError = future.get(5, TimeUnit.SECONDS)
+ assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
+
+ future = captureAlterIsrError(99, controller.brokerEpoch,
+ Map(tp -> LeaderAndIsr(controllerId, List(controllerId))))
+ capturedError = future.get(5, TimeUnit.SECONDS)
+ assertEquals(Errors.STALE_BROKER_EPOCH, capturedError)
+
+ val unknownTopicPartition = new TopicPartition("unknown", 99)
+ future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
+ Map(unknownTopicPartition -> LeaderAndIsr(controllerId, List(controllerId))), unknownTopicPartition)
+ capturedError = future.get(5, TimeUnit.SECONDS)
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, capturedError)
+
+ future = captureAlterIsrPartitionError(controllerId, controller.brokerEpoch,
+ Map(tp -> LeaderAndIsr(controllerId, 1, List(controllerId), 99)), tp)
+ capturedError = future.get(5, TimeUnit.SECONDS)
+ assertEquals(Errors.INVALID_UPDATE_VERSION, capturedError)
+ }
+
+ def captureAlterIsrError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr]): CompletableFuture[Errors] = {
+ val future = new CompletableFuture[Errors]()
+ val controller = getController().kafkaController
+ val callback: AlterIsrCallback = {
+ case Left(_: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
+ future.completeExceptionally(new AssertionError(s"Should have seen top-level error"))
+ case Right(error: Errors) =>
+ future.complete(error)
+ }
+ controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback))
+ future
+ }
+
+ def captureAlterIsrPartitionError(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], tp: TopicPartition): CompletableFuture[Errors] = {
+ val future = new CompletableFuture[Errors]()
+ val controller = getController().kafkaController
+ val callback: AlterIsrCallback = {
+ case Left(partitionResults: Map[TopicPartition, Either[Errors, LeaderAndIsr]]) =>
+ partitionResults.get(tp) match {
+ case Some(Left(error: Errors)) => future.complete(error)
+ case Some(Right(_: LeaderAndIsr)) => future.completeExceptionally(new AssertionError(s"Should have seen an error for $tp in result"))
+ case None => future.completeExceptionally(new AssertionError(s"Should have seen $tp in result"))
+ }
+ case Right(_: Errors) =>
+ future.completeExceptionally(new AssertionError(s"Should not seen top-level error"))
+ }
+ controller.eventManager.put(AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback))
+ future
+ }
+
+
@Test
def testTopicIdsAreAdded(): Unit = {
servers = makeServers(1)
diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
index 1074fd3157..1c8c81471f 100644
--- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
@@ -70,8 +70,10 @@ class AlterIsrManagerTest {
@Test
def testOverwriteWithinBatch(): Unit = {
val capture = EasyMock.newCapture[AbstractRequest.Builder[AlterIsrRequest]]()
+ val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]()
+
EasyMock.expect(brokerToController.start())
- EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.anyObject())).once()
+ EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.capture(callbackCapture))).times(2)
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
@@ -81,11 +83,21 @@ class AlterIsrManagerTest {
// Only send one ISR update for a given topic+partition
assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)))
assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2), 10), _ => {}, 0)))
+
+ // Simulate response
+ val alterIsrResp = partitionResponse(tp0, Errors.NONE)
+ val resp = new ClientResponse(null, null, "", 0L, 0L,
+ false, null, null, alterIsrResp)
+ callbackCapture.getValue.onComplete(resp)
+
+ // Now we can submit this partition again
+ assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1), 10), _ => {}, 0)))
EasyMock.verify(brokerToController)
+ // Make sure we sent the right request ISR={1}
val request = capture.getValue.build()
assertEquals(request.data().topics().size(), 1)
- assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 3)
+ assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(), 1)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 43df2b97f4..8e52007bc7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1106,10 +1106,6 @@ object TestUtils extends Logging {
}
}
- override def clearPending(topicPartition: TopicPartition): Unit = {
- inFlight.set(false);
- }
-
def completeIsrUpdate(newZkVersion: Int): Unit = {
if (inFlight.compareAndSet(true, false)) {
val item = isrUpdates.head

View File

@ -0,0 +1,16 @@
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 58a9ce3d5a..daa5a0bf70 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -521,6 +521,11 @@ public class WorkerConfig extends AbstractConfig {
}
}
}
+
+ @Override
+ public String toString() {
+ return "List of comma-separated URIs, ex: http://localhost:8080,https://localhost:8443.";
+ }
}
private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {

View File

@ -0,0 +1,103 @@
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 5648e8f0a3..24e9e21ad7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -121,7 +121,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
final long timestamp = context().timestamp();
observedStreamTime = Math.max(observedStreamTime, timestamp);
- final long closeTime = observedStreamTime - windows.gracePeriodMs();
+ final long closeTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap();
final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index ab2adbfbb1..244ea9f4fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -441,9 +441,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
processor.process("OnTime1", "1");
- // dummy record to advance stream time = 1
- context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
- processor.process("dummy", "dummy");
+ // dummy record to advance stream time = 11, 10 for gap time plus 1 to place outside window
+ context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders()));
+ processor.process("dummy", "dummy");
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
@@ -455,7 +455,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
assertThat(
appender.getMessages(),
hasItem("Skipping record for expired window." +
- " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[1]")
+ " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[11]")
);
}
@@ -542,17 +542,17 @@ public class KStreamSessionWindowAggregateProcessorTest {
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
processor.process("OnTime1", "1");
- // dummy record to advance stream time = 1
- context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
- processor.process("dummy", "dummy");
+ // dummy record to advance stream time = 11, 10 for gap time plus 1 to place at edge of window
+ context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders()));
+ processor.process("dummy", "dummy");
// delayed record arrives on time, should not be skipped
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
processor.process("OnTime2", "1");
- // dummy record to advance stream time = 2
- context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", null));
- processor.process("dummy", "dummy");
+ // dummy record to advance stream time = 12, 10 for gap time plus 2 to place outside window
+ context.setRecordContext(new ProcessorRecordContext(12, -2, -3, "topic", new RecordHeaders()));
+ processor.process("dummy", "dummy");
// delayed record arrives late
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
@@ -561,7 +561,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
assertThat(
appender.getMessages(),
hasItem("Skipping record for expired window." +
- " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[2]")
+ " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[12]")
);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 46a8ab8dcf..e0b7957e01 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -581,7 +581,7 @@ public class SuppressScenarioTest {
// arbitrarily disordered records are admitted, because the *window* is not closed until stream-time > window-end + grace
inputTopic.pipeInput("k1", "v1", 1L);
// any record in the same partition advances stream time (note the key is different)
- inputTopic.pipeInput("k2", "v1", 6L);
+ inputTopic.pipeInput("k2", "v1", 11L);
// late event for first window - this should get dropped from all streams, since the first window is now closed.
inputTopic.pipeInput("k1", "v1", 5L);
// just pushing stream time forward to flush the other events through.
@@ -594,7 +594,7 @@ public class SuppressScenarioTest {
new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L),
new KeyValueTimestamp<>("[k1@0/5]", null, 5L),
new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
- new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L),
+ new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L),
new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L)
)
);
@@ -602,7 +602,7 @@ public class SuppressScenarioTest {
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
- new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L)
+ new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L)
)
);
}

View File

@ -0,0 +1,42 @@
diff --git a/build.gradle b/build.gradle
index 5e8b82237a..0a181019cb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2296,4 +2296,37 @@ task aggregatedJavadoc(type: Javadoc, dependsOn: compileJava) {
options.links "https://docs.oracle.com/en/java/javase/${JavaVersion.current().majorVersion}/docs/api/"
else
options.links "https://docs.oracle.com/javase/8/docs/api/"
+ // TODO: remove this snippet once JDK >11 is used or https://bugs.openjdk.java.net/browse/JDK-8215291 is backported to JDK11
+ // Patch to include `getURLPrefix` from JDK 12 +
+ // NOTICE: This code was copied from original ORACLE search.js file present in JDK 12 and newer
+ final SEARCH_PATCH_MODULE_LESS_AWARE = "\n\n" +
+ "// Fix for moudle-less aware search\n" +
+ "function getURLPrefix(ui) {\n" +
+ " var urlPrefix=\"\";\n" +
+ " var slash = \"/\";\n" +
+ " if (ui.item.category === catModules) {\n" +
+ " return ui.item.l + slash;\n" +
+ " } else if (ui.item.category === catPackages && ui.item.m) {\n" +
+ " return ui.item.m + slash;\n" +
+ " } else if (ui.item.category === catTypes || ui.item.category === catMembers) {\n" +
+ " if (ui.item.m) {\n" +
+ " urlPrefix = ui.item.m + slash;\n" +
+ " } else {\n" +
+ " \$.each(packageSearchIndex, function(index, item) {\n" +
+ " if (item.m && ui.item.p === item.l) {\n" +
+ " urlPrefix = item.m + slash;\n" +
+ " }\n" +
+ " });\n" +
+ " }\n" +
+ " }\n" +
+ " return urlPrefix;\n" +
+ "}"
+
+ // When all the JavaDoc is generated we proceed to patch the search.js file
+ doLast {
+ def searchJsFile = new File(destinationDir.getAbsolutePath() + '/search.js')
+ // Append the patch to the file. By being defined at a later position, JS will execute that definition instead of
+ // the one provided by default (higher up in the file).
+ searchJsFile.append SEARCH_PATCH_MODULE_LESS_AWARE
+ }
}

74
0017-fix-log-clean.patch Normal file
View File

@ -0,0 +1,74 @@
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 7a8a13c6e7..177b460d38 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -840,7 +840,10 @@ private[log] class Cleaner(val id: Int,
logSize + segs.head.size <= maxSize &&
indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize &&
timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
- lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) {
+ //if first segment size is 0, we don't need to do the index offset range check.
+ //this will avoid empty log left every 2^31 message.
+ (segs.head.size == 0 ||
+ lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue)) {
group = segs.head :: group
logSize += segs.head.size
indexSize += segs.head.offsetIndex.sizeInBytes
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 43bc3b9f28..e5984c4f31 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1258,6 +1258,53 @@ class LogCleanerTest {
"All but the last group should be the target size.")
}
+ @Test
+ def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={
+ val cleaner = makeCleaner(Int.MaxValue)
+ val logProps = new Properties()
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ val k="key".getBytes()
+ val v="val".getBytes()
+
+ //create 3 segments
+ for(i <- 0 until 3){
+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
+ //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last message of i-th segment
+ val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 )
+ log.appendAsFollower(records)
+ assertEquals(i + 1, log.numberOfSegments)
+ }
+
+ //4th active segment, not clean
+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
+
+ val totalSegments = 4
+ //last segment not cleanable
+ val firstUncleanableOffset = log.logEndOffset - 1
+ val notCleanableSegments = 1
+
+ assertEquals(totalSegments, log.numberOfSegments)
+ var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
+ //because index file uses 4 byte relative index offset and current segments all none empty,
+ //segments will not group even their size is very small.
+ assertEquals(totalSegments - notCleanableSegments, groups.size)
+ //do clean to clean first 2 segments to empty
+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
+ assertEquals(totalSegments, log.numberOfSegments)
+ assertEquals(0, log.logSegments.head.size)
+
+ //after clean we got 2 empty segment, they will group together this time
+ groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
+ val noneEmptySegment = 1
+ assertEquals(noneEmptySegment + 1, groups.size)
+
+ //trigger a clean and 2 empty segments should cleaned to 1
+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
+ assertEquals(totalSegments - 1, log.numberOfSegments)
+ }
+
+
/**
* Validate the logic for grouping log segments together for cleaning when only a small number of
* messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not

View File

@ -4,7 +4,7 @@
Name: kafka
Version: 2.8.2
Release: 12
Release: 17
Summary: A Distributed Streaming Platform.
License: Apache-2.0
@ -24,6 +24,11 @@ Patch8: 0009-format-RocksDBConfigSetter.patch
Patch9: 0010-not-update-connection.patch
Patch10: 0011-ConfigEntry.patch
Patch11: 0012-incorrectly-LeaderElectionCommand.patch
Patch12: 0013-AlterIsr.patch
Patch13: 0014-override-toString.patch
Patch14: 0015-SessionWindows-closed-early.patch
Patch15: 0016-non-existent-URL.patch
Patch16: 0017-fix-log-clean.patch
BuildRequires: systemd java-1.8.0-openjdk-devel
Provides: kafka = %{version}
@ -75,6 +80,16 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses
rm -rf %{buildroot}
%changelog
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-17
- log clean relative index range check of group consider empty log segment to avoid too many empty log segment left
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-16
- Javadocs search sends you to a non-existent URL
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-15
- SessionWindows are closed too early
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-14
- override toString method to show correct value in doc
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-13
- AlterIsr and LeaderAndIsr race condition
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-12
- Fix noop set is incorrectly replaced with succeeded set from LeaderElectionCommand
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-11