!57 SessionWindows are closed too early
From: @sundapeng001 Reviewed-by: @hu-zongtang Signed-off-by: @hu-zongtang
This commit is contained in:
commit
1253478f93
103
0015-SessionWindows-closed-early.patch
Normal file
103
0015-SessionWindows-closed-early.patch
Normal file
@ -0,0 +1,103 @@
|
||||
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
|
||||
index 5648e8f0a3..24e9e21ad7 100644
|
||||
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
|
||||
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
|
||||
@@ -121,7 +121,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
|
||||
|
||||
final long timestamp = context().timestamp();
|
||||
observedStreamTime = Math.max(observedStreamTime, timestamp);
|
||||
- final long closeTime = observedStreamTime - windows.gracePeriodMs();
|
||||
+ final long closeTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap();
|
||||
|
||||
final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
|
||||
final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
|
||||
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
|
||||
index ab2adbfbb1..244ea9f4fe 100644
|
||||
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
|
||||
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
|
||||
@@ -441,9 +441,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
|
||||
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
|
||||
processor.process("OnTime1", "1");
|
||||
|
||||
- // dummy record to advance stream time = 1
|
||||
- context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
|
||||
- processor.process("dummy", "dummy");
|
||||
+ // dummy record to advance stream time = 11, 10 for gap time plus 1 to place outside window
|
||||
+ context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders()));
|
||||
+ processor.process("dummy", "dummy");
|
||||
|
||||
try (final LogCaptureAppender appender =
|
||||
LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
|
||||
@@ -455,7 +455,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
|
||||
assertThat(
|
||||
appender.getMessages(),
|
||||
hasItem("Skipping record for expired window." +
|
||||
- " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[1]")
|
||||
+ " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[11]")
|
||||
);
|
||||
}
|
||||
|
||||
@@ -542,17 +542,17 @@ public class KStreamSessionWindowAggregateProcessorTest {
|
||||
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
|
||||
processor.process("OnTime1", "1");
|
||||
|
||||
- // dummy record to advance stream time = 1
|
||||
- context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
|
||||
- processor.process("dummy", "dummy");
|
||||
+ // dummy record to advance stream time = 11, 10 for gap time plus 1 to place at edge of window
|
||||
+ context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders()));
|
||||
+ processor.process("dummy", "dummy");
|
||||
|
||||
// delayed record arrives on time, should not be skipped
|
||||
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
|
||||
processor.process("OnTime2", "1");
|
||||
|
||||
- // dummy record to advance stream time = 2
|
||||
- context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", null));
|
||||
- processor.process("dummy", "dummy");
|
||||
+ // dummy record to advance stream time = 12, 10 for gap time plus 2 to place outside window
|
||||
+ context.setRecordContext(new ProcessorRecordContext(12, -2, -3, "topic", new RecordHeaders()));
|
||||
+ processor.process("dummy", "dummy");
|
||||
|
||||
// delayed record arrives late
|
||||
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
|
||||
@@ -561,7 +561,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
|
||||
assertThat(
|
||||
appender.getMessages(),
|
||||
hasItem("Skipping record for expired window." +
|
||||
- " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[2]")
|
||||
+ " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[12]")
|
||||
);
|
||||
}
|
||||
|
||||
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
|
||||
index 46a8ab8dcf..e0b7957e01 100644
|
||||
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
|
||||
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
|
||||
@@ -581,7 +581,7 @@ public class SuppressScenarioTest {
|
||||
// arbitrarily disordered records are admitted, because the *window* is not closed until stream-time > window-end + grace
|
||||
inputTopic.pipeInput("k1", "v1", 1L);
|
||||
// any record in the same partition advances stream time (note the key is different)
|
||||
- inputTopic.pipeInput("k2", "v1", 6L);
|
||||
+ inputTopic.pipeInput("k2", "v1", 11L);
|
||||
// late event for first window - this should get dropped from all streams, since the first window is now closed.
|
||||
inputTopic.pipeInput("k1", "v1", 5L);
|
||||
// just pushing stream time forward to flush the other events through.
|
||||
@@ -594,7 +594,7 @@ public class SuppressScenarioTest {
|
||||
new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L),
|
||||
new KeyValueTimestamp<>("[k1@0/5]", null, 5L),
|
||||
new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
|
||||
- new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L),
|
||||
+ new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L),
|
||||
new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L)
|
||||
)
|
||||
);
|
||||
@@ -602,7 +602,7 @@ public class SuppressScenarioTest {
|
||||
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
|
||||
asList(
|
||||
new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
|
||||
- new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L)
|
||||
+ new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L)
|
||||
)
|
||||
);
|
||||
}
|
||||
@ -4,7 +4,7 @@
|
||||
|
||||
Name: kafka
|
||||
Version: 2.8.2
|
||||
Release: 14
|
||||
Release: 15
|
||||
Summary: A Distributed Streaming Platform.
|
||||
|
||||
License: Apache-2.0
|
||||
@ -26,6 +26,7 @@ Patch10: 0011-ConfigEntry.patch
|
||||
Patch11: 0012-incorrectly-LeaderElectionCommand.patch
|
||||
Patch12: 0013-AlterIsr.patch
|
||||
Patch13: 0014-override-toString.patch
|
||||
Patch14: 0015-SessionWindows-closed-early.patch
|
||||
|
||||
BuildRequires: systemd java-1.8.0-openjdk-devel
|
||||
Provides: kafka = %{version}
|
||||
@ -77,6 +78,8 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses
|
||||
rm -rf %{buildroot}
|
||||
|
||||
%changelog
|
||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-15
|
||||
- SessionWindows are closed too early
|
||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-14
|
||||
- override toString method to show correct value in doc
|
||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-13
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user