52 lines
2.8 KiB
Diff
52 lines
2.8 KiB
Diff
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
|
|
index 30491110a3..ce81aa1b95 100644
|
|
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
|
|
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
|
|
@@ -539,10 +539,13 @@ public class SubscriptionState {
|
|
|
|
synchronized Long partitionLag(TopicPartition tp, IsolationLevel isolationLevel) {
|
|
TopicPartitionState topicPartitionState = assignedState(tp);
|
|
- if (isolationLevel == IsolationLevel.READ_COMMITTED)
|
|
+ if (topicPartitionState.position == null) {
|
|
+ return null;
|
|
+ } else if (isolationLevel == IsolationLevel.READ_COMMITTED) {
|
|
return topicPartitionState.lastStableOffset == null ? null : topicPartitionState.lastStableOffset - topicPartitionState.position.offset;
|
|
- else
|
|
+ } else {
|
|
return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position.offset;
|
|
+ }
|
|
}
|
|
|
|
synchronized Long partitionLead(TopicPartition tp) {
|
|
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
|
|
index d6e88008b5..d19234fe8a 100644
|
|
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
|
|
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
|
|
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
|
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
|
|
import org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncation;
|
|
+import org.apache.kafka.common.IsolationLevel;
|
|
import org.apache.kafka.common.Node;
|
|
import org.apache.kafka.common.TopicPartition;
|
|
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
|
|
@@ -794,4 +795,18 @@ public class SubscriptionStateTest {
|
|
assertFalse(state.isOffsetResetNeeded(tp0));
|
|
}
|
|
|
|
+ @Test
|
|
+ public void nullPositionLagOnNoPosition() {
|
|
+ state.assignFromUser(Collections.singleton(tp0));
|
|
+
|
|
+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED));
|
|
+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED));
|
|
+
|
|
+ state.updateHighWatermark(tp0, 1L);
|
|
+ state.updateLastStableOffset(tp0, 1L);
|
|
+
|
|
+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED));
|
|
+ assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED));
|
|
+ }
|
|
+
|
|
}
|