75 lines
3.8 KiB
Diff
75 lines
3.8 KiB
Diff
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
|
|
index 7a8a13c6e7..177b460d38 100644
|
|
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
|
|
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
|
|
@@ -840,7 +840,10 @@ private[log] class Cleaner(val id: Int,
|
|
logSize + segs.head.size <= maxSize &&
|
|
indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize &&
|
|
timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
|
|
- lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) {
|
|
+ // If the first segment's size is 0, we don't need to do the index offset range check.
|
|
+ // This avoids an empty log segment being left behind every 2^31 messages.
|
|
+ (segs.head.size == 0 ||
|
|
+ lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue)) {
|
|
group = segs.head :: group
|
|
logSize += segs.head.size
|
|
indexSize += segs.head.offsetIndex.sizeInBytes
|
|
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
|
|
index 43bc3b9f28..e5984c4f31 100755
|
|
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
|
|
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
|
|
@@ -1258,6 +1258,53 @@ class LogCleanerTest {
|
|
"All but the last group should be the target size.")
|
|
}
|
|
|
|
+ @Test
|
|
+ def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={
|
|
+ val cleaner = makeCleaner(Int.MaxValue)
|
|
+ val logProps = new Properties()
|
|
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
|
|
+
|
|
+ val k="key".getBytes()
|
|
+ val v="val".getBytes()
|
|
+
|
|
+ //create 3 segments
|
|
+ for(i <- 0 until 3){
|
|
+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
|
|
+ // Offsets 0 to Int.MaxValue are Int.MaxValue+1 messages; the -1 makes this the last message of the i-th segment.
|
|
+ val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 )
|
|
+ log.appendAsFollower(records)
|
|
+ assertEquals(i + 1, log.numberOfSegments)
|
|
+ }
|
|
+
|
|
+ // The 4th segment is the active segment and will not be cleaned.
|
|
+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
|
|
+
|
|
+ val totalSegments = 4
|
|
+ // The last segment is not cleanable.
|
|
+ val firstUncleanableOffset = log.logEndOffset - 1
|
|
+ val notCleanableSegments = 1
|
|
+
|
|
+ assertEquals(totalSegments, log.numberOfSegments)
|
|
+ var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
|
|
+ // Because the index file uses 4-byte relative offsets and the current segments are all non-empty,
|
|
+ // the segments will not be grouped even though their sizes are very small.
|
|
+ assertEquals(totalSegments - notCleanableSegments, groups.size)
|
|
+ // Run a clean to shrink the first 2 segments down to empty.
|
|
+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
|
|
+ assertEquals(totalSegments, log.numberOfSegments)
|
|
+ assertEquals(0, log.logSegments.head.size)
|
|
+
|
|
+ // After cleaning we have 2 empty segments; they will be grouped together this time.
|
|
+ groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
|
|
+ val noneEmptySegment = 1
|
|
+ assertEquals(noneEmptySegment + 1, groups.size)
|
|
+
|
|
+ // Trigger a clean; the 2 empty segments should be merged into 1.
|
|
+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
|
|
+ assertEquals(totalSegments - 1, log.numberOfSegments)
|
|
+ }
|
|
+
|
|
+
|
|
/**
|
|
* Validate the logic for grouping log segments together for cleaning when only a small number of
|
|
* messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
|