kafka/0006-NPE-subscriptionState.patch
2023-11-28 05:33:35 +00:00

52 lines
2.8 KiB
Diff

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 30491110a3..ce81aa1b95 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -539,10 +539,13 @@ public class SubscriptionState {
synchronized Long partitionLag(TopicPartition tp, IsolationLevel isolationLevel) {
TopicPartitionState topicPartitionState = assignedState(tp);
- if (isolationLevel == IsolationLevel.READ_COMMITTED)
+ if (topicPartitionState.position == null) { // guard: both branches below dereference position.offset
+ return null; // no position, so no lag can be computed
+ } else if (isolationLevel == IsolationLevel.READ_COMMITTED) { // lag is measured against lastStableOffset
return topicPartitionState.lastStableOffset == null ? null : topicPartitionState.lastStableOffset - topicPartitionState.position.offset;
- else
+ } else { // any other isolation level: lag is measured against highWatermark
return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position.offset;
+ }
}
synchronized Long partitionLead(TopicPartition tp) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index d6e88008b5..d19234fe8a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncation;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
@@ -794,4 +795,18 @@ public class SubscriptionStateTest {
assertFalse(state.isOffsetResetNeeded(tp0));
}
+ @Test
+ public void nullPositionLagOnNoPosition() { // partitionLag must return null, not throw, while no position is set
+ state.assignFromUser(Collections.singleton(tp0)); // tp0 is assigned; this test never sets a position for it
+
+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED));
+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED));
+
+ state.updateHighWatermark(tp0, 1L); // high watermark now known
+ state.updateLastStableOffset(tp0, 1L); // last stable offset now known
+
+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED)); // still null: only the end offsets were updated
+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED)); // still null: only the end offsets were updated
+ }
+
}