84 lines
4.1 KiB
Diff
84 lines
4.1 KiB
Diff
|
|
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
|
||
|
|
index 1e710fdf6a..33f587f480 100644
|
||
|
|
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
|
||
|
|
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
|
||
|
|
@@ -576,7 +576,7 @@ public class Selector implements Selectable, AutoCloseable {
|
||
|
|
attemptRead(channel);
|
||
|
|
}
|
||
|
|
|
||
|
|
- if (channel.hasBytesBuffered()) {
|
||
|
|
+ if (channel.hasBytesBuffered() && !explicitlyMutedChannels.contains(channel)) {
|
||
|
|
//this channel has bytes enqueued in intermediary buffers that we could not read
|
||
|
|
//(possibly because no memory). it may be the case that the underlying socket will
|
||
|
|
//not come up in the next poll() and so we need to remember this channel for the
|
||
|
|
@@ -742,6 +742,7 @@ public class Selector implements Selectable, AutoCloseable {
|
||
|
|
private void mute(KafkaChannel channel) {
|
||
|
|
channel.mute();
|
||
|
|
explicitlyMutedChannels.add(channel);
|
||
|
|
+ keysWithBufferedRead.remove(channel.selectionKey());
|
||
|
|
}
|
||
|
|
|
||
|
|
@Override
|
||
|
|
@@ -754,6 +755,9 @@ public class Selector implements Selectable, AutoCloseable {
|
||
|
|
// Remove the channel from explicitlyMutedChannels only if the channel has been actually unmuted.
|
||
|
|
if (channel.maybeUnmute()) {
|
||
|
|
explicitlyMutedChannels.remove(channel);
|
||
|
|
+ if (channel.hasBytesBuffered()) {
|
||
|
|
+ keysWithBufferedRead.add(channel.selectionKey());
|
||
|
|
+ }
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
|
||
|
|
index 293614432c..fbc5563392 100644
|
||
|
|
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
|
||
|
|
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
|
||
|
|
@@ -1560,8 +1560,15 @@ class SocketServerTest {
|
||
|
|
val testableSelector = testableServer.testableSelector
|
||
|
|
testableSelector.updateMinWakeup(2)
|
||
|
|
|
||
|
|
+ val sleepTimeMs = idleTimeMs / 2 + 1
|
||
|
|
val (socket, request) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer)
|
||
|
|
- time.sleep(idleTimeMs + 1)
|
||
|
|
+ // advance mock time in increments to verify that muted sockets with buffered data don't have their idle time updated
|
||
|
|
+ // additional calls to poll() should not update the channel's last idle time
|
||
|
|
+ for (_ <- 0 to 3) {
|
||
|
|
+ time.sleep(sleepTimeMs)
|
||
|
|
+ testableSelector.operationCounts.clear()
|
||
|
|
+ testableSelector.waitForOperations(SelectorOperation.Poll, 1)
|
||
|
|
+ }
|
||
|
|
testableServer.waitForChannelClose(request.context.connectionId, locallyClosed = false)
|
||
|
|
|
||
|
|
val otherSocket = sslConnect(testableServer)
|
||
|
|
@@ -1574,6 +1581,30 @@ class SocketServerTest {
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
+ @Test
|
||
|
|
+ def testUnmuteChannelWithBufferedReceives(): Unit = {
|
||
|
|
+ val time = new MockTime()
|
||
|
|
+ props ++= sslServerProps
|
||
|
|
+ val testableServer = new TestableSocketServer(time = time)
|
||
|
|
+ testableServer.startup()
|
||
|
|
+ val proxyServer = new ProxyServer(testableServer)
|
||
|
|
+ try {
|
||
|
|
+ val testableSelector = testableServer.testableSelector
|
||
|
|
+ val (socket, request) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer)
|
||
|
|
+ testableSelector.operationCounts.clear()
|
||
|
|
+ testableSelector.waitForOperations(SelectorOperation.Poll, 1)
|
||
|
|
+ val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
|
||
|
|
+ assertEquals(Set.empty, keysWithBufferedRead.asScala)
|
||
|
|
+ processRequest(testableServer.dataPlaneRequestChannel, request)
|
||
|
|
+ // buffered requests should be processed after channel is unmuted
|
||
|
|
+ receiveRequest(testableServer.dataPlaneRequestChannel)
|
||
|
|
+ socket.close()
|
||
|
|
+ } finally {
|
||
|
|
+ proxyServer.close()
|
||
|
|
+ shutdownServerAndMetrics(testableServer)
|
||
|
|
+ }
|
||
|
|
+ }
|
||
|
|
+
|
||
|
|
/**
|
||
|
|
* Tests exception handling in [[Processor.processCompletedReceives]]. Exception is
|
||
|
|
* injected into [[Selector.mute]] which is used to mute the channel when a receive is complete.
|