diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 1e710fdf6a..33f587f480 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -576,7 +576,7 @@ public class Selector implements Selectable, AutoCloseable { attemptRead(channel); } - if (channel.hasBytesBuffered()) { + if (channel.hasBytesBuffered() && !explicitlyMutedChannels.contains(channel)) { //this channel has bytes enqueued in intermediary buffers that we could not read //(possibly because no memory). it may be the case that the underlying socket will //not come up in the next poll() and so we need to remember this channel for the @@ -742,6 +742,7 @@ public class Selector implements Selectable, AutoCloseable { private void mute(KafkaChannel channel) { channel.mute(); explicitlyMutedChannels.add(channel); + keysWithBufferedRead.remove(channel.selectionKey()); } @Override @@ -754,6 +755,9 @@ public class Selector implements Selectable, AutoCloseable { // Remove the channel from explicitlyMutedChannels only if the channel has been actually unmuted. if (channel.maybeUnmute()) { explicitlyMutedChannels.remove(channel); + if (channel.hasBytesBuffered()) { + keysWithBufferedRead.add(channel.selectionKey()); + } } } diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 293614432c..fbc5563392 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -1560,8 +1560,15 @@ class SocketServerTest { val testableSelector = testableServer.testableSelector testableSelector.updateMinWakeup(2) + val sleepTimeMs = idleTimeMs / 2 + 1 val (socket, request) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer) - time.sleep(idleTimeMs + 1) + // advance mock time in increments to verify that muted sockets with buffered data dont have their idle time updated + // additional calls to poll() should not update the channel last idle time + for (_ <- 0 to 3) { + time.sleep(sleepTimeMs) + testableSelector.operationCounts.clear() + testableSelector.waitForOperations(SelectorOperation.Poll, 1) + } testableServer.waitForChannelClose(request.context.connectionId, locallyClosed = false) val otherSocket = sslConnect(testableServer) @@ -1574,6 +1581,30 @@ class SocketServerTest { } } + @Test + def testUnmuteChannelWithBufferedReceives(): Unit = { + val time = new MockTime() + props ++= sslServerProps + val testableServer = new TestableSocketServer(time = time) + testableServer.startup() + val proxyServer = new ProxyServer(testableServer) + try { + val testableSelector = testableServer.testableSelector + val (socket, request) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer) + testableSelector.operationCounts.clear() + testableSelector.waitForOperations(SelectorOperation.Poll, 1) + val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead") + assertEquals(Set.empty, keysWithBufferedRead.asScala) + processRequest(testableServer.dataPlaneRequestChannel, request) + // buffered requests should be processed after channel is unmuted + receiveRequest(testableServer.dataPlaneRequestChannel) + socket.close() + } finally { + proxyServer.close() + shutdownServerAndMetrics(testableServer) + } + } + /** * Tests exception handling in [[Processor.processCompletedReceives]]. Exception is * injected into [[Selector.mute]] which is used to mute the channel when a receive is complete.