580 lines
22 KiB
Diff
580 lines
22 KiB
Diff
From ce4b154e7b48f66bd98858626347747cd2514311 Mon Sep 17 00:00:00 2001
|
|
From: Mark Thomas <markt@apache.org>
|
|
Date: Thu, 18 Feb 2021 16:41:57 +0000
|
|
Subject: [PATCH] Ensure ReadListener.onError() is fired if client drops the
|
|
connection
|
|
|
|
Origin:
|
|
https://github.com/apache/tomcat/commit/659b28c00d94e2a9049e0a8ac1e02bd4d36dd005
|
|
https://github.com/apache/tomcat/commit/f562edd3302866f34c0ca9fa97f6ff414450f1ae
|
|
https://github.com/apache/tomcat/commit/918146f9d04af67d904b47c440acaab14380521b
|
|
https://github.com/apache/tomcat/commit/504445cd2c618fb1edbfeda62e07e1c29b4d285c
|
|
https://github.com/apache/tomcat/commit/ce4b154e7b48f66bd98858626347747cd2514311
|
|
|
|
---
|
|
.../catalina/core/StandardWrapperValve.java | 2 +
|
|
.../coyote/http11/Http11InputBuffer.java | 43 +++-
|
|
.../coyote/http11/Http11OutputBuffer.java | 15 +-
|
|
.../catalina/core/TestAsyncContextImpl.java | 172 +++++++++++++++-
|
|
.../nonblocking/TestNonBlockingAPI.java | 192 ++++++++++++++++++
|
|
5 files changed, 412 insertions(+), 12 deletions(-)
|
|
|
|
diff --git a/java/org/apache/catalina/core/StandardWrapperValve.java b/java/org/apache/catalina/core/StandardWrapperValve.java
|
|
index 27f136a..89f5915 100644
|
|
--- a/java/org/apache/catalina/core/StandardWrapperValve.java
|
|
+++ b/java/org/apache/catalina/core/StandardWrapperValve.java
|
|
@@ -29,6 +29,7 @@ import javax.servlet.ServletException;
|
|
import javax.servlet.UnavailableException;
|
|
import javax.servlet.http.HttpServletResponse;
|
|
|
|
+import org.apache.catalina.Container;
|
|
import org.apache.catalina.Context;
|
|
import org.apache.catalina.Globals;
|
|
import org.apache.catalina.LifecycleException;
|
|
@@ -174,6 +175,7 @@ final class StandardWrapperValve
|
|
|
|
// Call the filter chain for this request
|
|
// NOTE: This also calls the servlet's service() method
|
|
+ Container container = this.container;
|
|
try {
|
|
if ((servlet != null) && (filterChain != null)) {
|
|
// Swallow output if needed
|
|
diff --git a/java/org/apache/coyote/http11/Http11InputBuffer.java b/java/org/apache/coyote/http11/Http11InputBuffer.java
|
|
index 27392d4..db596b4 100644
|
|
--- a/java/org/apache/coyote/http11/Http11InputBuffer.java
|
|
+++ b/java/org/apache/coyote/http11/Http11InputBuffer.java
|
|
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
|
|
import java.nio.charset.StandardCharsets;
|
|
import java.util.Arrays;
|
|
|
|
+import org.apache.coyote.CloseNowException;
|
|
import org.apache.coyote.InputBuffer;
|
|
import org.apache.coyote.Request;
|
|
import org.apache.juli.logging.Log;
|
|
@@ -382,10 +383,6 @@ public class Http11InputBuffer implements InputBuffer, ApplicationBufferHandler
|
|
|
|
parsingRequestLineStart = byteBuffer.position();
|
|
parsingRequestLinePhase = 2;
|
|
- if (log.isDebugEnabled()) {
|
|
- log.debug("Received ["
|
|
- + new String(byteBuffer.array(), byteBuffer.position(), byteBuffer.remaining(), StandardCharsets.ISO_8859_1) + "]");
|
|
- }
|
|
}
|
|
if (parsingRequestLinePhase == 2) {
|
|
//
|
|
@@ -709,6 +706,16 @@ public class Http11InputBuffer implements InputBuffer, ApplicationBufferHandler
|
|
*/
|
|
private boolean fill(boolean block) throws IOException {
|
|
|
|
+ if (log.isDebugEnabled()) {
|
|
+ log.debug("Before fill(): parsingHeader: [" + parsingHeader +
|
|
+ "], parsingRequestLine: [" + parsingRequestLine +
|
|
+ "], parsingRequestLinePhase: [" + parsingRequestLinePhase +
|
|
+ "], parsingRequestLineStart: [" + parsingRequestLineStart +
|
|
+ "], byteBuffer.position(): [" + byteBuffer.position() +
|
|
+ "], byteBuffer.limit(): [" + byteBuffer.limit() +
|
|
+ "], end: [" + end + "]");
|
|
+ }
|
|
+
|
|
if (parsingHeader) {
|
|
if (byteBuffer.limit() >= headerBufferSize) {
|
|
if (parsingRequestLine) {
|
|
@@ -721,13 +728,31 @@ public class Http11InputBuffer implements InputBuffer, ApplicationBufferHandler
|
|
byteBuffer.limit(end).position(end);
|
|
}
|
|
|
|
+ int nRead = -1;
|
|
byteBuffer.mark();
|
|
- if (byteBuffer.position() < byteBuffer.limit()) {
|
|
- byteBuffer.position(byteBuffer.limit());
|
|
+ try {
|
|
+ if (byteBuffer.position() < byteBuffer.limit()) {
|
|
+ byteBuffer.position(byteBuffer.limit());
|
|
+ }
|
|
+ byteBuffer.limit(byteBuffer.capacity());
|
|
+ SocketWrapperBase<?> socketWrapper = this.wrapper;
|
|
+ if (socketWrapper != null) {
|
|
+ nRead = socketWrapper.read(block, byteBuffer);
|
|
+ } else {
|
|
+ throw new CloseNowException(sm.getString("iib.eof.error"));
|
|
+ }
|
|
+ } finally {
|
|
+ // Ensure that the buffer limit and position are returned to a
|
|
+            // consistent "ready for read" state if an error occurs during
|
|
+ // the above code block.
|
|
+ byteBuffer.limit(byteBuffer.position()).reset();
|
|
}
|
|
- byteBuffer.limit(byteBuffer.capacity());
|
|
- int nRead = wrapper.read(block, byteBuffer);
|
|
- byteBuffer.limit(byteBuffer.position()).reset();
|
|
+
|
|
+ if (log.isDebugEnabled()) {
|
|
+ log.debug("Received ["
|
|
+ + new String(byteBuffer.array(), byteBuffer.position(), byteBuffer.remaining(), StandardCharsets.ISO_8859_1) + "]");
|
|
+ }
|
|
+
|
|
if (nRead > 0) {
|
|
return true;
|
|
} else if (nRead == -1) {
|
|
diff --git a/java/org/apache/coyote/http11/Http11OutputBuffer.java b/java/org/apache/coyote/http11/Http11OutputBuffer.java
|
|
index aa5ad48..c369837 100644
|
|
--- a/java/org/apache/coyote/http11/Http11OutputBuffer.java
|
|
+++ b/java/org/apache/coyote/http11/Http11OutputBuffer.java
|
|
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
|
|
import java.util.Arrays;
|
|
|
|
import org.apache.coyote.ActionCode;
|
|
+import org.apache.coyote.CloseNowException;
|
|
import org.apache.coyote.Response;
|
|
import org.apache.tomcat.util.buf.ByteChunk;
|
|
import org.apache.tomcat.util.buf.MessageBytes;
|
|
@@ -303,7 +304,12 @@ public class Http11OutputBuffer implements HttpOutputBuffer {
|
|
// Sending the response header buffer
|
|
headerBuffer.flip();
|
|
try {
|
|
- socketWrapper.write(isBlocking(), headerBuffer);
|
|
+ SocketWrapperBase<?> socketWrapper = this.socketWrapper;
|
|
+ if (socketWrapper != null) {
|
|
+ socketWrapper.write(isBlocking(), headerBuffer);
|
|
+ } else {
|
|
+ throw new CloseNowException(sm.getString("iob.failedwrite"));
|
|
+ }
|
|
} finally {
|
|
headerBuffer.position(0).limit(headerBuffer.capacity());
|
|
}
|
|
@@ -527,7 +533,12 @@ public class Http11OutputBuffer implements HttpOutputBuffer {
|
|
public int doWrite(ByteBuffer chunk) throws IOException {
|
|
try {
|
|
int len = chunk.remaining();
|
|
- socketWrapper.write(isBlocking(), chunk);
|
|
+ SocketWrapperBase<?> socketWrapper = Http11OutputBuffer.this.socketWrapper;
|
|
+ if (socketWrapper != null) {
|
|
+ socketWrapper.write(isBlocking(), chunk);
|
|
+ } else {
|
|
+ throw new CloseNowException(sm.getString("iob.failedwrite"));
|
|
+ }
|
|
len -= chunk.remaining();
|
|
byteCount += len;
|
|
return len;
|
|
diff --git a/test/org/apache/catalina/core/TestAsyncContextImpl.java b/test/org/apache/catalina/core/TestAsyncContextImpl.java
|
|
index 3f6524b..4023a74 100644
|
|
--- a/test/org/apache/catalina/core/TestAsyncContextImpl.java
|
|
+++ b/test/org/apache/catalina/core/TestAsyncContextImpl.java
|
|
@@ -17,6 +17,7 @@
|
|
package org.apache.catalina.core;
|
|
|
|
import java.io.IOException;
|
|
+import java.io.InputStream;
|
|
import java.io.PrintWriter;
|
|
import java.net.URI;
|
|
import java.net.URISyntaxException;
|
|
@@ -819,7 +820,7 @@ public class TestAsyncContextImpl extends TomcatBaseTest {
|
|
}
|
|
}
|
|
|
|
- private static class TrackingListener implements AsyncListener {
|
|
+ public static class TrackingListener implements AsyncListener {
|
|
|
|
private final boolean completeOnError;
|
|
private final boolean completeOnTimeout;
|
|
@@ -2653,4 +2654,173 @@ public class TestAsyncContextImpl extends TomcatBaseTest {
|
|
}
|
|
|
|
}
|
|
+
|
|
+
|
|
+
|
|
+ /*
|
|
+ * Tests an error on an async thread when the client closes the connection
|
|
+ * before fully writing the request body.
|
|
+ *
|
|
+ * Required sequence is:
|
|
+ * - enter Servlet's service() method
|
|
+ * - startAsync()
|
|
+ * - start async thread
|
|
+ * - read partial body
|
|
+ * - close client connection
|
|
+ * - read on async thread -> I/O error
|
|
+ * - exit Servlet's service() method
|
|
+ *
|
|
+ * This test makes extensive use of instance fields in the Servlet that
|
|
+ * would normally be considered very poor practice. It is only safe in this
|
|
+ * test as the Servlet only processes a single request.
|
|
+ */
|
|
+ @Test
|
|
+ public void testCanceledPost() throws Exception {
|
|
+ CountDownLatch partialReadLatch = new CountDownLatch(1);
|
|
+ CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
|
+ CountDownLatch threadCompleteLatch = new CountDownLatch(1);
|
|
+
|
|
+ AtomicBoolean testFailed = new AtomicBoolean(true);
|
|
+
|
|
+ // Setup Tomcat instance
|
|
+ Tomcat tomcat = getTomcatInstance();
|
|
+
|
|
+ // No file system docBase required
|
|
+ Context ctx = tomcat.addContext("", null);
|
|
+
|
|
+ PostServlet postServlet = new PostServlet(partialReadLatch, clientCloseLatch, threadCompleteLatch, testFailed);
|
|
+ Wrapper wrapper = Tomcat.addServlet(ctx, "postServlet", postServlet);
|
|
+ wrapper.setAsyncSupported(true);
|
|
+ ctx.addServletMappingDecoded("/*", "postServlet");
|
|
+
|
|
+ tomcat.start();
|
|
+
|
|
+ PostClient client = new PostClient();
|
|
+ client.setPort(getPort());
|
|
+ client.setRequest(new String[] { "POST / HTTP/1.1" + SimpleHttpClient.CRLF +
|
|
+ "Host: localhost:" + SimpleHttpClient.CRLF +
|
|
+ "Content-Length: 100" + SimpleHttpClient.CRLF +
|
|
+ SimpleHttpClient.CRLF +
|
|
+ "This is 16 bytes"
|
|
+ });
|
|
+ client.connect();
|
|
+ client.sendRequest();
|
|
+
|
|
+        // Wait for the server to read the partial request body
|
|
+ partialReadLatch.await();
|
|
+
|
|
+ client.disconnect();
|
|
+
|
|
+ clientCloseLatch.countDown();
|
|
+
|
|
+ threadCompleteLatch.await();
|
|
+
|
|
+ Assert.assertFalse(testFailed.get());
|
|
+ }
|
|
+
|
|
+
|
|
+ private static final class PostClient extends SimpleHttpClient {
|
|
+
|
|
+ @Override
|
|
+ public boolean isResponseBodyOK() {
|
|
+ return true;
|
|
+ }
|
|
+ }
|
|
+
|
|
+
|
|
+ private static final class PostServlet extends HttpServlet {
|
|
+
|
|
+ private static final long serialVersionUID = 1L;
|
|
+
|
|
+ private final transient CountDownLatch partialReadLatch;
|
|
+ private final transient CountDownLatch clientCloseLatch;
|
|
+ private final transient CountDownLatch threadCompleteLatch;
|
|
+ private final AtomicBoolean testFailed;
|
|
+
|
|
+ public PostServlet(CountDownLatch doPostLatch, CountDownLatch clientCloseLatch,
|
|
+ CountDownLatch threadCompleteLatch, AtomicBoolean testFailed) {
|
|
+ this.partialReadLatch = doPostLatch;
|
|
+ this.clientCloseLatch = clientCloseLatch;
|
|
+ this.threadCompleteLatch = threadCompleteLatch;
|
|
+ this.testFailed = testFailed;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ protected void doPost(HttpServletRequest req, HttpServletResponse resp)
|
|
+ throws ServletException, IOException {
|
|
+
|
|
+ AsyncContext ac = req.startAsync();
|
|
+ Thread t = new PostServletThread(ac, partialReadLatch, clientCloseLatch, threadCompleteLatch, testFailed);
|
|
+ t.start();
|
|
+
|
|
+ try {
|
|
+ threadCompleteLatch.await();
|
|
+ } catch (InterruptedException e) {
|
|
+ // Ignore
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+
|
|
+ private static final class PostServletThread extends Thread {
|
|
+
|
|
+ private final AsyncContext ac;
|
|
+ private final CountDownLatch partialReadLatch;
|
|
+ private final CountDownLatch clientCloseLatch;
|
|
+ private final CountDownLatch threadCompleteLatch;
|
|
+ private final AtomicBoolean testFailed;
|
|
+
|
|
+ public PostServletThread(AsyncContext ac, CountDownLatch partialReadLatch, CountDownLatch clientCloseLatch,
|
|
+ CountDownLatch threadCompleteLatch, AtomicBoolean testFailed) {
|
|
+ this.ac = ac;
|
|
+ this.partialReadLatch = partialReadLatch;
|
|
+ this.clientCloseLatch = clientCloseLatch;
|
|
+ this.threadCompleteLatch = threadCompleteLatch;
|
|
+ this.testFailed = testFailed;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void run() {
|
|
+ try {
|
|
+ int bytesRead = 0;
|
|
+ byte[] buffer = new byte[32];
|
|
+ InputStream is = null;
|
|
+
|
|
+ try {
|
|
+ is = ac.getRequest().getInputStream();
|
|
+
|
|
+ // Read the partial request body
|
|
+ while (bytesRead < 16) {
|
|
+ int read = is.read(buffer);
|
|
+ if (read == -1) {
|
|
+ // Error condition
|
|
+ return;
|
|
+ }
|
|
+ bytesRead += read;
|
|
+ }
|
|
+ } catch (IOException ioe) {
|
|
+ // Error condition
|
|
+ return;
|
|
+ } finally {
|
|
+ partialReadLatch.countDown();
|
|
+ }
|
|
+
|
|
+ // Wait for client to close connection
|
|
+ clientCloseLatch.await();
|
|
+
|
|
+ // Read again
|
|
+ try {
|
|
+ is.read();
|
|
+ } catch (IOException e) {
|
|
+ e.printStackTrace();
|
|
+ // Required. Clear the error marker.
|
|
+ testFailed.set(false);
|
|
+ }
|
|
+ } catch (InterruptedException e) {
|
|
+ // Ignore
|
|
+ } finally {
|
|
+ threadCompleteLatch.countDown();
|
|
+ }
|
|
+ }
|
|
+ }
|
|
}
|
|
diff --git a/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java b/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
|
|
index 7130b11..6868375 100644
|
|
--- a/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
|
|
+++ b/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
|
|
@@ -32,6 +32,9 @@ import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.TimeUnit;
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
+import java.util.logging.Level;
|
|
+import java.util.logging.LogManager;
|
|
|
|
import javax.net.SocketFactory;
|
|
import javax.servlet.AsyncContext;
|
|
@@ -44,6 +47,7 @@ import javax.servlet.ServletInputStream;
|
|
import javax.servlet.ServletOutputStream;
|
|
import javax.servlet.WriteListener;
|
|
import javax.servlet.annotation.WebServlet;
|
|
+import javax.servlet.http.HttpServlet;
|
|
import javax.servlet.http.HttpServletRequest;
|
|
import javax.servlet.http.HttpServletResponse;
|
|
|
|
@@ -52,7 +56,9 @@ import org.junit.Ignore;
|
|
import org.junit.Test;
|
|
|
|
import org.apache.catalina.Context;
|
|
+import org.apache.catalina.Wrapper;
|
|
import org.apache.catalina.startup.BytesStreamer;
|
|
+import org.apache.catalina.startup.SimpleHttpClient;
|
|
import org.apache.catalina.startup.TesterServlet;
|
|
import org.apache.catalina.startup.Tomcat;
|
|
import org.apache.catalina.startup.TomcatBaseTest;
|
|
@@ -997,4 +1003,190 @@ public class TestNonBlockingAPI extends TomcatBaseTest {
|
|
|
|
}
|
|
}
|
|
+
|
|
+
|
|
+ /*
|
|
+     * Tests an error on a non-blocking read when the client closes the
|
|
+ * connection before fully writing the request body.
|
|
+ *
|
|
+ * Required sequence is:
|
|
+ * - enter Servlet's service() method
|
|
+ * - startAsync()
|
|
+ * - configure non-blocking read
|
|
+ * - read partial body
|
|
+ * - close client connection
|
|
+ * - error is triggered
|
|
+ * - exit Servlet's service() method
|
|
+ *
|
|
+ * This test makes extensive use of instance fields in the Servlet that
|
|
+ * would normally be considered very poor practice. It is only safe in this
|
|
+ * test as the Servlet only processes a single request.
|
|
+ */
|
|
+ @Test
|
|
+ public void testCanceledPost() throws Exception {
|
|
+
|
|
+ LogManager.getLogManager().getLogger("org.apache.coyote").setLevel(Level.ALL);
|
|
+ LogManager.getLogManager().getLogger("org.apache.tomcat.util.net").setLevel(Level.ALL);
|
|
+
|
|
+ CountDownLatch partialReadLatch = new CountDownLatch(1);
|
|
+ CountDownLatch completeLatch = new CountDownLatch(1);
|
|
+
|
|
+ AtomicBoolean testFailed = new AtomicBoolean(true);
|
|
+
|
|
+ // Setup Tomcat instance
|
|
+ Tomcat tomcat = getTomcatInstance();
|
|
+
|
|
+ // No file system docBase required
|
|
+ Context ctx = tomcat.addContext("", null);
|
|
+
|
|
+ PostServlet postServlet = new PostServlet(partialReadLatch, completeLatch, testFailed);
|
|
+ Wrapper wrapper = Tomcat.addServlet(ctx, "postServlet", postServlet);
|
|
+ wrapper.setAsyncSupported(true);
|
|
+ ctx.addServletMappingDecoded("/*", "postServlet");
|
|
+
|
|
+ tomcat.start();
|
|
+
|
|
+ PostClient client = new PostClient();
|
|
+ client.setPort(getPort());
|
|
+ client.setRequest(new String[] { "POST / HTTP/1.1" + SimpleHttpClient.CRLF +
|
|
+ "Host: localhost:" + SimpleHttpClient.CRLF +
|
|
+ "Content-Length: 100" + SimpleHttpClient.CRLF +
|
|
+ SimpleHttpClient.CRLF +
|
|
+ "This is 16 bytes"
|
|
+ });
|
|
+ client.connect();
|
|
+ client.sendRequest();
|
|
+
|
|
+        // Wait for the server to read the partial request body
|
|
+ partialReadLatch.await();
|
|
+
|
|
+ client.disconnect();
|
|
+
|
|
+ completeLatch.await();
|
|
+
|
|
+ Assert.assertFalse(testFailed.get());
|
|
+ }
|
|
+
|
|
+
|
|
+ private static final class PostClient extends SimpleHttpClient {
|
|
+
|
|
+ @Override
|
|
+ public boolean isResponseBodyOK() {
|
|
+ return true;
|
|
+ }
|
|
+ }
|
|
+
|
|
+
|
|
+ private static final class PostServlet extends HttpServlet {
|
|
+
|
|
+ private static final long serialVersionUID = 1L;
|
|
+
|
|
+ private final transient CountDownLatch partialReadLatch;
|
|
+ private final transient CountDownLatch completeLatch;
|
|
+ private final AtomicBoolean testFailed;
|
|
+
|
|
+ public PostServlet(CountDownLatch doPostLatch, CountDownLatch completeLatch, AtomicBoolean testFailed) {
|
|
+ this.partialReadLatch = doPostLatch;
|
|
+ this.completeLatch = completeLatch;
|
|
+ this.testFailed = testFailed;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ protected void doPost(HttpServletRequest req, HttpServletResponse resp)
|
|
+ throws ServletException, IOException {
|
|
+
|
|
+ AsyncContext ac = req.startAsync();
|
|
+ ac.setTimeout(-1);
|
|
+ CanceledPostAsyncListener asyncListener = new CanceledPostAsyncListener(completeLatch);
|
|
+ ac.addListener(asyncListener);
|
|
+
|
|
+ CanceledPostReadListener readListener = new CanceledPostReadListener(ac, partialReadLatch, testFailed);
|
|
+ req.getInputStream().setReadListener(readListener);
|
|
+ }
|
|
+ }
|
|
+
|
|
+
|
|
+ private static final class CanceledPostAsyncListener implements AsyncListener {
|
|
+
|
|
+ private final transient CountDownLatch completeLatch;
|
|
+
|
|
+ public CanceledPostAsyncListener(CountDownLatch completeLatch) {
|
|
+ this.completeLatch = completeLatch;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void onComplete(AsyncEvent event) throws IOException {
|
|
+ System.out.println("complete");
|
|
+ completeLatch.countDown();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void onTimeout(AsyncEvent event) throws IOException {
|
|
+ System.out.println("onTimeout");
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void onError(AsyncEvent event) throws IOException {
|
|
+ System.out.println("onError-async");
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void onStartAsync(AsyncEvent event) throws IOException {
|
|
+ System.out.println("onStartAsync");
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static final class CanceledPostReadListener implements ReadListener {
|
|
+
|
|
+ private final AsyncContext ac;
|
|
+ private final CountDownLatch partialReadLatch;
|
|
+ private final AtomicBoolean testFailed;
|
|
+ private int totalRead = 0;
|
|
+
|
|
+ public CanceledPostReadListener(AsyncContext ac, CountDownLatch partialReadLatch, AtomicBoolean testFailed) {
|
|
+ this.ac = ac;
|
|
+ this.partialReadLatch = partialReadLatch;
|
|
+ this.testFailed = testFailed;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void onDataAvailable() throws IOException {
|
|
+ ServletInputStream sis = ac.getRequest().getInputStream();
|
|
+ boolean isReady;
|
|
+
|
|
+ byte[] buffer = new byte[32];
|
|
+ do {
|
|
+ if (partialReadLatch.getCount() == 0) {
|
|
+ System.out.println("debug");
|
|
+ }
|
|
+ int bytesRead = sis.read(buffer);
|
|
+
|
|
+ if (bytesRead == -1) {
|
|
+ return;
|
|
+ }
|
|
+ totalRead += bytesRead;
|
|
+ isReady = sis.isReady();
|
|
+ System.out.println("Read [" + bytesRead +
|
|
+ "], buffer [" + new String(buffer, 0, bytesRead, StandardCharsets.UTF_8) +
|
|
+ "], total read [" + totalRead +
|
|
+ "], isReady [" + isReady + "]");
|
|
+ } while (isReady);
|
|
+ if (totalRead == 16) {
|
|
+ partialReadLatch.countDown();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void onAllDataRead() throws IOException {
|
|
+ ac.complete();
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void onError(Throwable throwable) {
|
|
+ throwable.printStackTrace();
|
|
+ // This is the expected behaviour so clear the failed flag.
|
|
+ testFailed.set(false);
|
|
+ ac.complete();
|
|
+ }
|
|
+ }
|
|
}
|
|
--
|
|
2.33.0
|
|
|