!59 log clean relative index range check of group consider empty log segment to avoid too many empty log segment left
From: @sundapeng001 Reviewed-by: @hu-zongtang Signed-off-by: @hu-zongtang
This commit is contained in:
commit
e9be94c566
74
0017-fix-log-clean.patch
Normal file
74
0017-fix-log-clean.patch
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
|
||||||
|
index 7a8a13c6e7..177b460d38 100644
|
||||||
|
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
|
||||||
|
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
|
||||||
|
@@ -840,7 +840,10 @@ private[log] class Cleaner(val id: Int,
|
||||||
|
logSize + segs.head.size <= maxSize &&
|
||||||
|
indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize &&
|
||||||
|
timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
|
||||||
|
- lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) {
|
||||||
|
+ //if first segment size is 0, we don't need to do the index offset range check.
|
||||||
|
+ //this will avoid empty log left every 2^31 message.
|
||||||
|
+ (segs.head.size == 0 ||
|
||||||
|
+ lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue)) {
|
||||||
|
group = segs.head :: group
|
||||||
|
logSize += segs.head.size
|
||||||
|
indexSize += segs.head.offsetIndex.sizeInBytes
|
||||||
|
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
|
||||||
|
index 43bc3b9f28..e5984c4f31 100755
|
||||||
|
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
|
||||||
|
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
|
||||||
|
@@ -1258,6 +1258,53 @@ class LogCleanerTest {
|
||||||
|
"All but the last group should be the target size.")
|
||||||
|
}
|
||||||
|
|
||||||
|
+ @Test
|
||||||
|
+ def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={
|
||||||
|
+ val cleaner = makeCleaner(Int.MaxValue)
|
||||||
|
+ val logProps = new Properties()
|
||||||
|
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
|
||||||
|
+
|
||||||
|
+ val k="key".getBytes()
|
||||||
|
+ val v="val".getBytes()
|
||||||
|
+
|
||||||
|
+ //create 3 segments
|
||||||
|
+ for(i <- 0 until 3){
|
||||||
|
+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
|
||||||
|
+ //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last message of i-th segment
|
||||||
|
+ val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 )
|
||||||
|
+ log.appendAsFollower(records)
|
||||||
|
+ assertEquals(i + 1, log.numberOfSegments)
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ //4th active segment, not clean
|
||||||
|
+ log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
|
||||||
|
+
|
||||||
|
+ val totalSegments = 4
|
||||||
|
+ //last segment not cleanable
|
||||||
|
+ val firstUncleanableOffset = log.logEndOffset - 1
|
||||||
|
+ val notCleanableSegments = 1
|
||||||
|
+
|
||||||
|
+ assertEquals(totalSegments, log.numberOfSegments)
|
||||||
|
+ var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
|
||||||
|
+ //because index file uses 4 byte relative index offset and current segments all none empty,
|
||||||
|
+ //segments will not group even their size is very small.
|
||||||
|
+ assertEquals(totalSegments - notCleanableSegments, groups.size)
|
||||||
|
+ //do clean to clean first 2 segments to empty
|
||||||
|
+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
|
||||||
|
+ assertEquals(totalSegments, log.numberOfSegments)
|
||||||
|
+ assertEquals(0, log.logSegments.head.size)
|
||||||
|
+
|
||||||
|
+ //after clean we got 2 empty segment, they will group together this time
|
||||||
|
+ groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
|
||||||
|
+ val noneEmptySegment = 1
|
||||||
|
+ assertEquals(noneEmptySegment + 1, groups.size)
|
||||||
|
+
|
||||||
|
+ //trigger a clean and 2 empty segments should cleaned to 1
|
||||||
|
+ cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
|
||||||
|
+ assertEquals(totalSegments - 1, log.numberOfSegments)
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+
|
||||||
|
/**
|
||||||
|
* Validate the logic for grouping log segments together for cleaning when only a small number of
|
||||||
|
* messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
|
||||||
@ -4,7 +4,7 @@
|
|||||||
|
|
||||||
Name: kafka
|
Name: kafka
|
||||||
Version: 2.8.2
|
Version: 2.8.2
|
||||||
Release: 16
|
Release: 17
|
||||||
Summary: A Distributed Streaming Platform.
|
Summary: A Distributed Streaming Platform.
|
||||||
|
|
||||||
License: Apache-2.0
|
License: Apache-2.0
|
||||||
@ -28,6 +28,7 @@ Patch12: 0013-AlterIsr.patch
|
|||||||
Patch13: 0014-override-toString.patch
|
Patch13: 0014-override-toString.patch
|
||||||
Patch14: 0015-SessionWindows-closed-early.patch
|
Patch14: 0015-SessionWindows-closed-early.patch
|
||||||
Patch15: 0016-non-existent-URL.patch
|
Patch15: 0016-non-existent-URL.patch
|
||||||
|
Patch16: 0017-fix-log-clean.patch
|
||||||
|
|
||||||
BuildRequires: systemd java-1.8.0-openjdk-devel
|
BuildRequires: systemd java-1.8.0-openjdk-devel
|
||||||
Provides: kafka = %{version}
|
Provides: kafka = %{version}
|
||||||
@ -79,6 +80,8 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses
|
|||||||
rm -rf %{buildroot}
|
rm -rf %{buildroot}
|
||||||
|
|
||||||
%changelog
|
%changelog
|
||||||
|
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-17
|
||||||
|
- log clean relative index range check of group consider empty log segment to avoid too many empty log segment left
|
||||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-16
|
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-16
|
||||||
- Javadocs search sends you to a non-existent URL
|
- Javadocs search sends you to a non-existent URL
|
||||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-15
|
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-15
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user