Don't update connection idle time for muted connections
This commit is contained in:
parent
d32c54e67c
commit
20910bdc7d
83
0010-not-update-connection.patch
Normal file
83
0010-not-update-connection.patch
Normal file
@ -0,0 +1,83 @@
|
||||
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
|
||||
index 1e710fdf6a..33f587f480 100644
|
||||
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
|
||||
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
|
||||
@@ -576,7 +576,7 @@ public class Selector implements Selectable, AutoCloseable {
|
||||
attemptRead(channel);
|
||||
}
|
||||
|
||||
- if (channel.hasBytesBuffered()) {
|
||||
+ if (channel.hasBytesBuffered() && !explicitlyMutedChannels.contains(channel)) {
|
||||
//this channel has bytes enqueued in intermediary buffers that we could not read
|
||||
//(possibly because no memory). it may be the case that the underlying socket will
|
||||
//not come up in the next poll() and so we need to remember this channel for the
|
||||
@@ -742,6 +742,7 @@ public class Selector implements Selectable, AutoCloseable {
|
||||
private void mute(KafkaChannel channel) {
|
||||
channel.mute();
|
||||
explicitlyMutedChannels.add(channel);
|
||||
+ keysWithBufferedRead.remove(channel.selectionKey());
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -754,6 +755,9 @@ public class Selector implements Selectable, AutoCloseable {
|
||||
// Remove the channel from explicitlyMutedChannels only if the channel has been actually unmuted.
|
||||
if (channel.maybeUnmute()) {
|
||||
explicitlyMutedChannels.remove(channel);
|
||||
+ if (channel.hasBytesBuffered()) {
|
||||
+ keysWithBufferedRead.add(channel.selectionKey());
|
||||
+ }
|
||||
}
|
||||
}
|
||||
|
||||
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
|
||||
index 293614432c..fbc5563392 100644
|
||||
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
|
||||
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
|
||||
@@ -1560,8 +1560,15 @@ class SocketServerTest {
|
||||
val testableSelector = testableServer.testableSelector
|
||||
testableSelector.updateMinWakeup(2)
|
||||
|
||||
+ val sleepTimeMs = idleTimeMs / 2 + 1
|
||||
val (socket, request) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer)
|
||||
- time.sleep(idleTimeMs + 1)
|
||||
+ // advance mock time in increments to verify that muted sockets with buffered data dont have their idle time updated
|
||||
+ // additional calls to poll() should not update the channel last idle time
|
||||
+ for (_ <- 0 to 3) {
|
||||
+ time.sleep(sleepTimeMs)
|
||||
+ testableSelector.operationCounts.clear()
|
||||
+ testableSelector.waitForOperations(SelectorOperation.Poll, 1)
|
||||
+ }
|
||||
testableServer.waitForChannelClose(request.context.connectionId, locallyClosed = false)
|
||||
|
||||
val otherSocket = sslConnect(testableServer)
|
||||
@@ -1574,6 +1581,30 @@ class SocketServerTest {
|
||||
}
|
||||
}
|
||||
|
||||
+ @Test
|
||||
+ def testUnmuteChannelWithBufferedReceives(): Unit = {
|
||||
+ val time = new MockTime()
|
||||
+ props ++= sslServerProps
|
||||
+ val testableServer = new TestableSocketServer(time = time)
|
||||
+ testableServer.startup()
|
||||
+ val proxyServer = new ProxyServer(testableServer)
|
||||
+ try {
|
||||
+ val testableSelector = testableServer.testableSelector
|
||||
+ val (socket, request) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer)
|
||||
+ testableSelector.operationCounts.clear()
|
||||
+ testableSelector.waitForOperations(SelectorOperation.Poll, 1)
|
||||
+ val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
|
||||
+ assertEquals(Set.empty, keysWithBufferedRead.asScala)
|
||||
+ processRequest(testableServer.dataPlaneRequestChannel, request)
|
||||
+ // buffered requests should be processed after channel is unmuted
|
||||
+ receiveRequest(testableServer.dataPlaneRequestChannel)
|
||||
+ socket.close()
|
||||
+ } finally {
|
||||
+ proxyServer.close()
|
||||
+ shutdownServerAndMetrics(testableServer)
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
/**
|
||||
* Tests exception handling in [[Processor.processCompletedReceives]]. Exception is
|
||||
* injected into [[Selector.mute]] which is used to mute the channel when a receive is complete.
|
||||
@ -4,7 +4,7 @@
|
||||
|
||||
Name: kafka
|
||||
Version: 2.8.2
|
||||
Release: 9
|
||||
Release: 10
|
||||
Summary: A Distributed Streaming Platform.
|
||||
|
||||
License: Apache-2.0
|
||||
@ -21,6 +21,7 @@ Patch5: 0006-NPE-subscriptionState.patch
|
||||
Patch6: 0007-fix-payload-incorrectly.patch
|
||||
Patch7: 0008-Cast-SMT-allow-null.patch
|
||||
Patch8: 0009-format-RocksDBConfigSetter.patch
|
||||
Patch9: 0010-not-update-connection.patch
|
||||
|
||||
BuildRequires: systemd java-1.8.0-openjdk-devel
|
||||
Provides: kafka = %{version}
|
||||
@ -72,6 +73,8 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses
|
||||
rm -rf %{buildroot}
|
||||
|
||||
%changelog
|
||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-10
|
||||
- Don't update connection idle time for muted connections
|
||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-9
|
||||
- Fix the formatting of example RocksDBConfigSetter
|
||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-8
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user