trafficserver/CVE-2024-38311.patch
starlet-dx 0af0d27de3 Fix CVE-2024-38311,CVE-2024-56195 and CVE-2024-56202
(cherry picked from commit 1be79f85e0ecdd6927504eaa16b594d42a1beba1)
2025-03-07 10:53:04 +08:00

1581 lines
62 KiB
Diff

From a16c4b6bb0b126047c68dafbdf6311ac1586fc0b Mon Sep 17 00:00:00 2001
From: Bryan Call <bcall@apache.org>
Date: Tue, 4 Mar 2025 11:39:44 -0800
Subject: [PATCH] Fix chunked pipelined requests (#12071)
* Fix pipelined request for chunked bodies
This addresses a variety of bugs concerning pipelined requests. In
particular, the HttpTunnel logic had fundamentally assumed that it could
consume all bytes available in the producer's reader. If a request was
pipelined after a previous request that had a chunked body, this would
result in the second request being unparsed and either sent along to the
origin or dropped on the floor, depening on configuration. This adds an
explicit autest for pipelined requests and addresses these issues.
This patch largely does the following:
1. Updates the copy_partial_post_data data to take the number of bytes
it consumes rather than consuming all bytes in the reader. It also
now returns the number of bytes it consumes, which the tunnel needs
to keep track of the number of bytes it processes.
2. Previous to this patch, the HttpTunnel assumed that it could consume
all bytes in the reader originally passed to it (all bytes in
init_bytes_done). This simply will not work for pipelined requests.
This addresses this issue by adding a new variable to the tunnel:
bytes_consumed. This way the tunnel can keep track of how many bytes
it consumed while processing the request body, which allows the
HttpSM to later process just the right number of bytes from its
reader rather than eating into any pipelined requests that follow it.
3. The HttpSM must not consume bytes from its client reader that are
pipelined requests. It now uses the tunnel's processing
bytes_consumed to process bytes from its reader rather than simply
consuming all read_available() bytes from it.
* Fix bytes consumed chunk computation
Fix a possible miscalculation of bytes consumed while parsing chunked
content.
* is_read_closed fix
* Verify expected responses are received.
* Updated formatting
---------
Co-authored-by: Brian Neradt <brian.neradt@gmail.com>
---
proxy/ProxyTransaction.cc | 6 +
proxy/ProxyTransaction.h | 1 +
proxy/http/HttpSM.cc | 70 ++++--
proxy/http/HttpSM.h | 14 +-
proxy/http/HttpTunnel.cc | 171 +++++++++------
proxy/http/HttpTunnel.h | 151 +++++++++++--
proxy/http2/Http2Stream.cc | 6 +
proxy/http2/Http2Stream.h | 1 +
src/traffic_server/FetchSM.cc | 3 +-
tests/gold_tests/pipeline/pipeline.test.py | 137 ++++++++++++
tests/gold_tests/pipeline/pipeline_client.py | 104 +++++++++
tests/gold_tests/pipeline/pipeline_server.py | 201 ++++++++++++++++++
.../gold_tests/redirect/redirect_post.test.py | 1 +
.../timeout/inactive_client_timeout.test.py | 9 +-
tests/gold_tests/timeout/slow_server.yaml | 37 +++-
15 files changed, 796 insertions(+), 116 deletions(-)
create mode 100644 tests/gold_tests/pipeline/pipeline.test.py
create mode 100644 tests/gold_tests/pipeline/pipeline_client.py
create mode 100644 tests/gold_tests/pipeline/pipeline_server.py
diff --git a/proxy/ProxyTransaction.cc b/proxy/ProxyTransaction.cc
index c56ddb348d3..dfe5e565130 100644
--- a/proxy/ProxyTransaction.cc
+++ b/proxy/ProxyTransaction.cc
@@ -238,6 +238,12 @@ ProxyTransaction::get_version(HTTPHdr &hdr) const
return hdr.version_get();
}
+bool
+ProxyTransaction::is_read_closed() const
+{
+ return false;
+}
+
bool
ProxyTransaction::allow_half_open() const
{
diff --git a/proxy/ProxyTransaction.h b/proxy/ProxyTransaction.h
index 261af6829bd..1117880a04f 100644
--- a/proxy/ProxyTransaction.h
+++ b/proxy/ProxyTransaction.h
@@ -49,6 +49,7 @@ class ProxyTransaction : public VConnection
virtual void set_inactivity_timeout(ink_hrtime timeout_in);
virtual void cancel_inactivity_timeout();
virtual void cancel_active_timeout();
+ virtual bool is_read_closed() const;
// Implement VConnection interface.
VIO *do_io_read(Continuation *c, int64_t nbytes = INT64_MAX, MIOBuffer *buf = nullptr) override;
diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc
index 4e09795f036..cdc05461320 100644
--- a/proxy/http/HttpSM.cc
+++ b/proxy/http/HttpSM.cc
@@ -973,10 +973,8 @@ HttpSM::wait_for_full_body()
// Next order of business if copy the remaining data from the
// header buffer into new buffer
int64_t post_bytes = chunked ? INT64_MAX : t_state.hdr_info.request_content_length;
- client_request_body_bytes =
- post_buffer->write(ua_txn->get_remote_reader(), chunked ? ua_txn->get_remote_reader()->read_avail() : post_bytes);
+ post_buffer->write(ua_txn->get_remote_reader(), chunked ? ua_txn->get_remote_reader()->read_avail() : post_bytes);
- ua_txn->get_remote_reader()->consume(client_request_body_bytes);
p = tunnel.add_producer(ua_entry->vc, post_bytes, buf_start, &HttpSM::tunnel_handler_post_ua, HT_BUFFER_READ, "ua post buffer");
if (chunked) {
bool const drop_chunked_trailers = t_state.http_config_param->oride.http_drop_chunked_trailers == 1;
@@ -3633,7 +3631,24 @@ int
HttpSM::tunnel_handler_post_ua(int event, HttpTunnelProducer *p)
{
STATE_ENTER(&HttpSM::tunnel_handler_post_ua, event);
- client_request_body_bytes = p->init_bytes_done + p->bytes_read;
+
+ // Now that the tunnel is done, it can tell us how many bytes were in the
+ // body.
+ if (client_request_body_bytes == 0) {
+ // This is invoked multiple times for a transaction when buffering request
+ // body data, so we only call this the first time when
+ // client_request_body_bytes is 0.
+ client_request_body_bytes = p->bytes_consumed;
+ IOBufferReader *client_reader = ua_txn->get_remote_reader();
+ // p->bytes_consumed represents the number of body bytes the tunnel parsed
+ // and consumed from the client. However, not all those bytes may have been
+ // written to our _ua client transaction reader. We must not consume past
+ // the number of bytes available.
+ int64_t const bytes_to_consume = std::min(p->bytes_consumed, client_reader->read_avail());
+ SMDebug("http_tunnel", "Consuming %" PRId64 " bytes from client reader with p->bytes_consumed: %" PRId64 " available: %" PRId64,
+ bytes_to_consume, p->bytes_consumed, client_reader->read_avail());
+ client_reader->consume(bytes_to_consume);
+ }
switch (event) {
case VC_EVENT_INACTIVITY_TIMEOUT:
@@ -6095,8 +6110,8 @@ HttpSM::do_setup_post_tunnel(HttpVC_t to_vc_type)
IOBufferReader *postdata_producer_reader = postdata_producer_buffer->alloc_reader();
postdata_producer_buffer->write(this->_postbuf.postdata_copy_buffer_start);
- int64_t post_bytes = postdata_producer_reader->read_avail();
- transfered_bytes = post_bytes;
+ int64_t post_bytes = chunked ? INT64_MAX : t_state.hdr_info.request_content_length;
+ transferred_bytes = post_bytes;
p = tunnel.add_producer(HTTP_TUNNEL_STATIC_PRODUCER, post_bytes, postdata_producer_reader, (HttpProducerHandler) nullptr,
HT_STATIC, "redirect static agent post");
} else {
@@ -6125,12 +6140,27 @@ HttpSM::do_setup_post_tunnel(HttpVC_t to_vc_type)
// Next order of business if copy the remaining data from the
// header buffer into new buffer
- client_request_body_bytes =
- post_buffer->write(ua_txn->get_remote_reader(), chunked ? ua_txn->get_remote_reader()->read_avail() : post_bytes);
- ua_txn->get_remote_reader()->consume(client_request_body_bytes);
- p = tunnel.add_producer(ua_entry->vc, post_bytes - transfered_bytes, buf_start, &HttpSM::tunnel_handler_post_ua, HT_HTTP_CLIENT,
- "user agent post");
+ int64_t num_body_bytes = 0;
+ // If is_using_post_buffer has been used, then client_request_body_bytes
+ // will have already been sent in wait_for_full_body and there will be
+ // zero bytes in this user agent buffer. We don't want to clobber
+ // client_request_body_bytes with a zero value here in those cases.
+ if (client_request_body_bytes > 0) {
+ num_body_bytes = client_request_body_bytes;
+ } else {
+ num_body_bytes =
+ post_buffer->write(ua_txn->get_remote_reader(), chunked ? ua_txn->get_remote_reader()->read_avail() : post_bytes);
+ }
+ // Don't consume post_bytes here from ua_txn->get_remote_reader() since
+ // we are not sure how many bytes the tunnel will use yet. Wait until
+ // HttpSM::tunnel_handler_post_ua to consume the bytes.
+ // The user agent has already sent all it has
+ if (ua_txn->is_read_closed()) {
+ post_bytes = num_body_bytes;
+ }
+ p = tunnel.add_producer(ua_entry->vc, post_bytes - transferred_bytes, buf_start, &HttpSM::tunnel_handler_post_ua,
+ HT_HTTP_CLIENT, "user agent post");
}
ua_entry->in_tunnel = true;
@@ -6847,6 +6877,8 @@ HttpSM::server_transfer_init(MIOBuffer *buf, int hdr_size)
// we'll get is already in the buffer
nbytes = server_txn->get_remote_reader()->read_avail() + hdr_size;
} else if (t_state.hdr_info.response_content_length == HTTP_UNDEFINED_CL) {
+ // Chunked or otherwise, no length is defined. Pass -1 to tell the
+ // tunnel that the size is unknown.
nbytes = -1;
} else {
// Set to copy to the number of bytes we want to write as
@@ -8564,16 +8596,18 @@ HttpSM::rewind_state_machine()
// YTS Team, yamsat Plugin
// Function to copy the partial Post data while tunnelling
-void
-PostDataBuffers::copy_partial_post_data()
+int64_t
+PostDataBuffers::copy_partial_post_data(int64_t consumed_bytes)
{
if (post_data_buffer_done) {
- return;
+ return 0;
}
- Debug("http_redirect", "[PostDataBuffers::copy_partial_post_data] wrote %" PRId64 " bytes to buffers %" PRId64 "",
- this->ua_buffer_reader->read_avail(), this->postdata_copy_buffer_start->read_avail());
- this->postdata_copy_buffer->write(this->ua_buffer_reader);
- this->ua_buffer_reader->consume(this->ua_buffer_reader->read_avail());
+ int64_t const bytes_to_copy = std::min(consumed_bytes, this->ua_buffer_reader->read_avail());
+ Debug("http_redirect", "given %" PRId64 " bytes consumed, copying %" PRId64 " bytes to buffers with %" PRId64 " available bytes",
+ consumed_bytes, bytes_to_copy, this->ua_buffer_reader->read_avail());
+ this->postdata_copy_buffer->write(this->ua_buffer_reader, bytes_to_copy);
+ this->ua_buffer_reader->consume(bytes_to_copy);
+ return bytes_to_copy;
}
IOBufferReader *
diff --git a/proxy/http/HttpSM.h b/proxy/http/HttpSM.h
index 91e0504ce35..05e15f5a7f5 100644
--- a/proxy/http/HttpSM.h
+++ b/proxy/http/HttpSM.h
@@ -179,7 +179,7 @@ class PostDataBuffers
void clear();
void init(IOBufferReader *ua_reader);
- void copy_partial_post_data();
+ int64_t copy_partial_post_data(int64_t consumed_bytes);
IOBufferReader *get_post_data_buffer_clone_reader();
void
set_post_data_buffer_done(bool done)
@@ -313,8 +313,8 @@ class HttpSM : public Continuation, public PluginUserArgs<TS_USER_ARGS_TXN>
bool debug_on = false; // Transaction specific debug flag
char *redirect_url = nullptr; // url for force redirect (provide users a functionality to redirect to another url when needed)
int redirect_url_len = 0;
- int redirection_tries = 0; // To monitor number of redirections
- int64_t transfered_bytes = 0; // Added to calculate POST data
+ int redirection_tries = 0; // To monitor number of redirections
+ int64_t transferred_bytes = 0; // Added to calculate POST data
// Tunneling request to plugin
HttpPluginTunnel_t plugin_tunnel_type = HTTP_NO_PLUGIN_TUNNEL;
@@ -331,7 +331,7 @@ class HttpSM : public Continuation, public PluginUserArgs<TS_USER_ARGS_TXN>
int64_t postbuf_buffer_avail();
void postbuf_clear();
void disable_redirect();
- void postbuf_copy_partial_data();
+ int64_t postbuf_copy_partial_data(int64_t consumed_bytes);
void postbuf_init(IOBufferReader *ua_reader);
void set_postbuf_done(bool done);
IOBufferReader *get_postbuf_clone_reader();
@@ -750,10 +750,10 @@ HttpSM::disable_redirect()
this->_postbuf.clear();
}
-inline void
-HttpSM::postbuf_copy_partial_data()
+inline int64_t
+HttpSM::postbuf_copy_partial_data(int64_t consumed_bytes)
{
- this->_postbuf.copy_partial_post_data();
+ return this->_postbuf.copy_partial_post_data(consumed_bytes);
}
inline void
diff --git a/proxy/http/HttpTunnel.cc b/proxy/http/HttpTunnel.cc
index adb3cd9bc98..1508179e6b5 100644
--- a/proxy/http/HttpTunnel.cc
+++ b/proxy/http/HttpTunnel.cc
@@ -38,6 +38,9 @@
#include "tscore/ParseRules.h"
#include "tscore/ink_memory.h"
+#include <algorithm>
+#include <cstdint>
+
static const int min_block_transfer_bytes = 256;
static const char *const CHUNK_HEADER_FMT = "%" PRIx64 "\r\n";
// This should be as small as possible because it will only hold the
@@ -131,19 +134,19 @@ ChunkedHandler::set_max_chunk_size(int64_t size)
max_chunk_header_len = snprintf(max_chunk_header, sizeof(max_chunk_header), CHUNK_HEADER_FMT, max_chunk_size);
}
-void
+int64_t
ChunkedHandler::read_size()
{
- int64_t bytes_used;
- bool done = false;
- int cr = 0;
+ int64_t bytes_consumed = 0;
+ bool done = false;
+ int cr = 0;
- while (chunked_reader->read_avail() > 0 && !done) {
+ while (chunked_reader->is_read_avail_more_than(0) && !done) {
const char *tmp = chunked_reader->start();
int64_t data_size = chunked_reader->block_read_avail();
ink_assert(data_size > 0);
- bytes_used = 0;
+ int64_t bytes_used = 0;
while (data_size > 0) {
bytes_used++;
@@ -218,7 +221,9 @@ ChunkedHandler::read_size()
chunked_size += bytes_used;
}
chunked_reader->consume(bytes_used);
+ bytes_consumed += bytes_used;
}
+ return bytes_consumed;
}
// int ChunkedHandler::transfer_bytes()
@@ -240,7 +245,7 @@ ChunkedHandler::transfer_bytes()
chunked_size += moved;
}
chunked_reader->consume(moved);
- cur_chunk_bytes_left = cur_chunk_bytes_left - moved;
+ cur_chunk_bytes_left -= moved;
return moved;
}
@@ -274,10 +279,10 @@ ChunkedHandler::transfer_bytes()
return total_moved;
}
-void
+int64_t
ChunkedHandler::read_chunk()
{
- int64_t b = transfer_bytes();
+ int64_t transferred_bytes = transfer_bytes();
ink_assert(cur_chunk_bytes_left >= 0);
if (cur_chunk_bytes_left == 0) {
@@ -285,21 +290,23 @@ ChunkedHandler::read_chunk()
state = CHUNK_READ_SIZE_START;
} else if (cur_chunk_bytes_left > 0) {
- Debug("http_chunk", "read %" PRId64 " bytes of an %" PRId64 " chunk", b, cur_chunk_size);
+ Debug("http_chunk", "read %" PRId64 " bytes of an %" PRId64 " chunk", transferred_bytes, cur_chunk_size);
}
+ return transferred_bytes;
}
-void
+int64_t
ChunkedHandler::read_trailer()
{
- int64_t bytes_used;
- bool done = false;
+ int64_t bytes_consumed = 0;
+ bool done = false;
while (chunked_reader->is_read_avail_more_than(0) && !done) {
const char *tmp = chunked_reader->start();
int64_t data_size = chunked_reader->block_read_avail();
ink_assert(data_size > 0);
+ int64_t bytes_used = 0;
for (bytes_used = 0; data_size > 0; data_size--) {
bytes_used++;
@@ -337,43 +344,48 @@ ChunkedHandler::read_trailer()
tmp++;
}
chunked_reader->consume(bytes_used);
+ bytes_consumed += bytes_used;
}
+ return bytes_consumed;
}
-bool
+std::pair<int64_t, bool>
ChunkedHandler::process_chunked_content()
{
+ int64_t bytes_read = 0;
while (chunked_reader->is_read_avail_more_than(0) && state != CHUNK_READ_DONE && state != CHUNK_READ_ERROR) {
switch (state) {
case CHUNK_READ_SIZE:
case CHUNK_READ_SIZE_CRLF:
case CHUNK_READ_SIZE_START:
- read_size();
+ bytes_read += read_size();
break;
case CHUNK_READ_CHUNK:
- read_chunk();
+ bytes_read += read_chunk();
break;
case CHUNK_READ_TRAILER_BLANK:
case CHUNK_READ_TRAILER_CR:
case CHUNK_READ_TRAILER_LINE:
- read_trailer();
+ bytes_read += read_trailer();
break;
case CHUNK_FLOW_CONTROL:
- return false;
+ return std::make_pair(bytes_read, false);
default:
ink_release_assert(0);
break;
}
}
- return (state == CHUNK_READ_DONE || state == CHUNK_READ_ERROR);
+ auto const done = (state == CHUNK_READ_DONE || state == CHUNK_READ_ERROR);
+ return std::make_pair(bytes_read, done);
}
-bool
+std::pair<int64_t, bool>
ChunkedHandler::generate_chunked_content()
{
char tmp[16];
bool server_done = false;
int64_t r_avail;
+ int64_t consumed_bytes = 0;
ink_assert(max_chunk_header_len);
@@ -414,6 +426,7 @@ ChunkedHandler::generate_chunked_content()
chunked_buffer->write(dechunked_reader, write_val);
chunked_size += write_val;
dechunked_reader->consume(write_val);
+ consumed_bytes += write_val;
// Output the trailing CRLF.
chunked_buffer->write("\r\n", 2);
@@ -426,9 +439,9 @@ ChunkedHandler::generate_chunked_content()
// Add the chunked transfer coding trailer.
chunked_buffer->write("0\r\n\r\n", 5);
chunked_size += 5;
- return true;
+ return std::make_pair(consumed_bytes, true);
}
- return false;
+ return std::make_pair(consumed_bytes, false);
}
HttpTunnelProducer::HttpTunnelProducer() : consumer_list() {}
@@ -678,7 +691,7 @@ HttpTunnel::add_producer(VConnection *vc, int64_t nbytes_arg, IOBufferReader *re
ink_assert(reader_start->mbuf);
if ((p = alloc_producer()) != nullptr) {
p->vc = vc;
- p->nbytes = nbytes_arg;
+ p->total_bytes = nbytes_arg;
p->buffer_start = reader_start;
p->read_buffer = reader_start->mbuf;
p->vc_handler = sm_handler;
@@ -690,22 +703,23 @@ HttpTunnel::add_producer(VConnection *vc, int64_t nbytes_arg, IOBufferReader *re
p->do_dechunking = false;
p->do_chunked_passthru = false;
- p->init_bytes_done = reader_start->read_avail();
- if (p->nbytes < 0) {
- p->ntodo = p->nbytes;
+ p->init_bytes_done = p->buffer_start->read_avail();
+ if (p->total_bytes < 0 || p->total_bytes == INT64_MAX) {
+ p->total_bytes = INT64_MAX; // A negative nbytes_arg is a synonym for INT64_MAX.
+ p->ntodo = INT64_MAX;
} else { // The byte count given us includes bytes
// that already may be in the buffer.
// ntodo represents the number of bytes
// the tunneling mechanism needs to read
// for the producer
- p->ntodo = p->nbytes - p->init_bytes_done;
+ p->ntodo = std::max(p->total_bytes - p->init_bytes_done, INT64_C(0));
ink_assert(p->ntodo >= 0);
}
// We are static, the producer is never "alive"
// It just has data in the buffer
if (vc == HTTP_TUNNEL_STATIC_PRODUCER) {
- ink_assert(p->ntodo == 0);
+ ink_assert(p->ntodo >= 0);
p->alive = false;
p->read_success = true;
} else {
@@ -844,9 +858,15 @@ HttpTunnel::producer_run(HttpTunnelProducer *p)
}
}
}
+ if (!p->is_handling_chunked_content()) {
+ // If we are not handling chunked content, then we will be consuming all the
+ // bytes available in the reader, up until the total bytes that the tunnel
+ // will be processing.
+ p->bytes_consumed += std::min(p->total_bytes, p->init_bytes_done);
+ }
- int64_t consumer_n;
- int64_t producer_n;
+ int64_t consumer_n = 0;
+ int64_t producer_n = 0;
ink_assert(p->vc != nullptr);
active = true;
@@ -854,7 +874,10 @@ HttpTunnel::producer_run(HttpTunnelProducer *p)
IOBufferReader *chunked_buffer_start = nullptr;
IOBufferReader *dechunked_buffer_start = nullptr;
IOBufferReader *passthrough_buffer_start = nullptr;
- if (p->do_chunking || p->do_dechunking || p->do_chunked_passthru) {
+ if (p->is_handling_chunked_content()) {
+ // For all the chunking cases, we must only copy bytes as we process them.
+ body_bytes_to_copy = 0;
+
p->chunked_handler.init(p->buffer_start, p, this->http_drop_chunked_trailers);
// Copy the header into the chunked/dechunked buffers.
@@ -888,17 +911,23 @@ HttpTunnel::producer_run(HttpTunnelProducer *p)
int64_t read_start_pos = 0;
if (p->vc_type == HT_CACHE_READ && sm->t_state.range_setup == HttpTransact::RANGE_NOT_TRANSFORM_REQUESTED) {
- ink_assert(sm->t_state.num_range_fields == 1); // we current just support only one range entry
+ ink_assert(sm->t_state.num_range_fields == 1); // we currently just support only one range entry
read_start_pos = sm->t_state.ranges[0]._start;
producer_n = (sm->t_state.ranges[0]._end - sm->t_state.ranges[0]._start) + 1;
consumer_n = (producer_n + sm->client_response_hdr_bytes);
- } else if (p->nbytes >= 0) {
- consumer_n = p->nbytes;
+ } else if (p->total_bytes >= 0) {
+ consumer_n = p->total_bytes;
producer_n = p->ntodo;
} else {
consumer_n = (producer_n = INT64_MAX);
}
+ if (!p->is_handling_chunked_content()) {
+ // No chunking being handled, so the user specified a number of bytes
+ // described by Content-Length. Use that value.
+ body_bytes_to_copy = producer_n - body_bytes_copied;
+ }
+
// At least set up the consumer readers first so the data
// doesn't disappear out from under the tunnel
for (c = p->consumer_list.head; c; c = c->link.next) {
@@ -933,9 +962,9 @@ HttpTunnel::producer_run(HttpTunnelProducer *p)
c->buffer_reader = p->read_buffer->clone_reader(p->buffer_start);
}
- // Consume bytes of the reader if we skipping bytes
+ // Consume bytes from the reader if we are skipping bytes.
if (c->skip_bytes > 0) {
- ink_assert(c->skip_bytes <= c->buffer_reader->read_avail());
+ ink_release_assert(c->skip_bytes <= c->buffer_reader->read_avail());
c->buffer_reader->consume(c->skip_bytes);
}
}
@@ -958,7 +987,8 @@ HttpTunnel::producer_run(HttpTunnelProducer *p)
return;
}
} else {
- sm->postbuf_copy_partial_data();
+ body_bytes_copied += sm->postbuf_copy_partial_data(body_bytes_to_copy);
+ body_bytes_to_copy = 0;
}
} // end of added logic for partial POST
@@ -966,6 +996,7 @@ HttpTunnel::producer_run(HttpTunnelProducer *p)
// remove the chunked reader marker so that it doesn't act like a buffer guard
p->chunked_handler.chunked_buffer->dealloc_reader(chunked_buffer_start);
p->chunked_handler.dechunked_reader->consume(p->chunked_handler.skip_bytes);
+ p->bytes_consumed += p->chunked_handler.skip_bytes;
// If there is data to process in the buffer, do it now
producer_handler(VC_EVENT_READ_READY, p);
@@ -984,12 +1015,10 @@ HttpTunnel::producer_run(HttpTunnelProducer *p)
p->chunked_handler.chunked_reader->read_avail());
if (!transform_consumer && (p->chunked_handler.chunked_reader->read_avail() >= p->chunked_handler.skip_bytes)) {
p->chunked_handler.chunked_reader->consume(p->chunked_handler.skip_bytes);
+ p->bytes_consumed += p->chunked_handler.skip_bytes;
Debug("http_tunnel", "[producer_run] do_dechunking p->chunked_handler.skip_bytes = %" PRId64 "",
p->chunked_handler.skip_bytes);
}
- // if(p->chunked_handler.chunked_reader->read_avail() > 0)
- // p->chunked_handler.chunked_reader->consume(
- // p->chunked_handler.skip_bytes);
producer_handler(VC_EVENT_READ_READY, p);
if (sm->get_postbuf_done() && p->vc_type == HT_HTTP_CLIENT) { // read_avail() == 0
@@ -1104,7 +1133,10 @@ HttpTunnel::producer_handler_dechunked(int event, HttpTunnelProducer *p)
/* fallthrough */
case VC_EVENT_READ_READY:
p->last_event = p->chunked_handler.last_server_event = event;
- if (p->chunked_handler.generate_chunked_content()) { // We are done, make sure the consumer is activated
+ auto const [consumed_bytes, done] = p->chunked_handler.generate_chunked_content();
+ p->bytes_consumed += consumed_bytes;
+ body_bytes_to_copy = consumed_bytes;
+ if (done) { // We are done, make sure the consumer is activated
HttpTunnelConsumer *c;
for (c = p->consumer_list.head; c; c = c->link.next) {
if (c->alive) {
@@ -1147,7 +1179,9 @@ HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer *p)
}
p->last_event = p->chunked_handler.last_server_event = event;
- bool done = p->chunked_handler.process_chunked_content();
+ auto const [bytes_consumed, done] = p->chunked_handler.process_chunked_content();
+ p->bytes_consumed += bytes_consumed;
+ body_bytes_to_copy = bytes_consumed;
// If we couldn't understand the encoding, return
// an error
@@ -1201,16 +1235,10 @@ HttpTunnel::producer_handler(int event, HttpTunnelProducer *p)
// Handle chunking/dechunking/chunked-passthrough if necessary.
if (p->do_chunking) {
+ // This will update body_bytes_to_copy with the number of bytes copied.
event = producer_handler_dechunked(event, p);
-
- // If we were in PRECOMPLETE when this function was called
- // and we are doing chunking, then we just wrote the last
- // chunk in the function call above. We are done with the
- // tunnel.
- if (event == HTTP_TUNNEL_EVENT_PRECOMPLETE) {
- event = VC_EVENT_EOS;
- }
} else if (p->do_dechunking || p->do_chunked_passthru) {
+ // This will update body_bytes_to_copy with the number of bytes copied.
event = producer_handler_chunked(event, p);
} else {
p->last_event = event;
@@ -1233,7 +1261,12 @@ HttpTunnel::producer_handler(int event, HttpTunnelProducer *p)
event = VC_EVENT_ERROR;
}
} else {
- sm->postbuf_copy_partial_data();
+ if (!p->is_handling_chunked_content()) {
+ // The chunk handlers didn't consume bytes. Pull bytes as needed.
+ body_bytes_to_copy = p->total_bytes - body_bytes_copied;
+ }
+ body_bytes_copied += sm->postbuf_copy_partial_data(body_bytes_to_copy);
+ body_bytes_to_copy = 0;
if (event == VC_EVENT_READ_COMPLETE || event == HTTP_TUNNEL_EVENT_PRECOMPLETE || event == VC_EVENT_EOS) {
sm->set_postbuf_done(true);
}
@@ -1268,6 +1301,9 @@ HttpTunnel::producer_handler(int event, HttpTunnelProducer *p)
p->alive = false;
if (p->read_vio) {
p->bytes_read = p->read_vio->ndone;
+ if (!p->is_handling_chunked_content()) {
+ p->bytes_consumed += p->bytes_read;
+ }
} else {
// If we are chunked, we can receive the whole document
// along with the header without knowing it (due to
@@ -1305,6 +1341,9 @@ HttpTunnel::producer_handler(int event, HttpTunnelProducer *p)
p->alive = false;
if (p->read_vio) {
p->bytes_read = p->read_vio->ndone;
+ if (!p->is_handling_chunked_content()) {
+ p->bytes_consumed += p->bytes_read;
+ }
} else {
p->bytes_read = 0;
}
@@ -1526,6 +1565,9 @@ HttpTunnel::chain_abort_all(HttpTunnelProducer *p)
p->alive = false;
if (p->read_vio) {
p->bytes_read = p->read_vio->ndone;
+ if (!p->is_handling_chunked_content()) {
+ p->bytes_consumed += p->bytes_read;
+ }
}
if (p->self_consumer) {
p->self_consumer->alive = false;
@@ -1543,8 +1585,8 @@ HttpTunnel::chain_abort_all(HttpTunnelProducer *p)
int64_t
HttpTunnel::final_consumer_bytes_to_write(HttpTunnelProducer *p, HttpTunnelConsumer *c)
{
- int64_t total_bytes = 0;
- int64_t consumer_n = 0;
+ int64_t bytes_to_write = 0;
+ int64_t consumer_n = 0;
if (p->alive) {
consumer_n = INT64_MAX;
} else {
@@ -1554,25 +1596,25 @@ HttpTunnel::final_consumer_bytes_to_write(HttpTunnelProducer *p, HttpTunnelConsu
switch (action) {
case TCA_CHUNK_CONTENT:
case TCA_PASSTHRU_DECHUNKED_CONTENT:
- total_bytes = p->bytes_read + p->init_bytes_done;
+ bytes_to_write = p->bytes_consumed;
break;
case TCA_DECHUNK_CONTENT:
case TCA_PASSTHRU_CHUNKED_CONTENT:
- total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
+ bytes_to_write = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
break;
default:
break;
}
} else if (action == TCA_CHUNK_CONTENT) {
- total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.chunked_size;
+ bytes_to_write = p->chunked_handler.skip_bytes + p->chunked_handler.chunked_size;
} else if (action == TCA_DECHUNK_CONTENT) {
- total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
+ bytes_to_write = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
} else if (action == TCA_PASSTHRU_CHUNKED_CONTENT) {
- total_bytes = p->bytes_read + p->init_bytes_done;
+ bytes_to_write = p->bytes_consumed;
} else {
- total_bytes = p->bytes_read + p->init_bytes_done;
+ bytes_to_write = p->bytes_consumed;
}
- consumer_n = total_bytes - c->skip_bytes;
+ consumer_n = bytes_to_write - c->skip_bytes;
}
}
return consumer_n;
@@ -1594,12 +1636,10 @@ HttpTunnel::finish_all_internal(HttpTunnelProducer *p, bool chain)
TunnelChunkingAction_t action = p->chunking_action;
if (action == TCA_PASSTHRU_CHUNKED_CONTENT) {
- // if the only chunked data was in the initial read, make sure we don't consume too much
+ // Verify that we consumed the number of bytes we accounted for via p->bytes_consumed.
if (p->bytes_read == 0 && p->buffer_start != nullptr) {
int num_read = p->buffer_start->read_avail() - p->chunked_handler.chunked_reader->read_avail();
- if (num_read < p->init_bytes_done) {
- p->init_bytes_done = num_read;
- }
+ ink_release_assert(num_read == p->bytes_consumed);
}
}
@@ -1694,6 +1734,9 @@ HttpTunnel::close_vc(HttpTunnelConsumer *c)
p->alive = false;
if (p->read_vio) {
p->bytes_read = p->read_vio->ndone;
+ if (!p->is_handling_chunked_content()) {
+ p->bytes_consumed += p->bytes_read;
+ }
}
}
diff --git a/proxy/http/HttpTunnel.h b/proxy/http/HttpTunnel.h
index 3ce994b11ed..3aac38aca68 100644
--- a/proxy/http/HttpTunnel.h
+++ b/proxy/http/HttpTunnel.h
@@ -112,7 +112,14 @@ struct ChunkedHandler {
*/
bool drop_chunked_trailers = false;
- bool truncation = false;
+ bool truncation = false;
+
+ /** The number of bytes to skip from the reader because they are not body bytes.
+ *
+ * These skipped bytes are generally header bytes. We copy these in for any
+ * internal buffers we'll send to consumers, but skip them when parsing body
+ * bytes.
+ */
int64_t skip_bytes = 0;
ChunkedState state = CHUNK_READ_CHUNK;
@@ -145,14 +152,54 @@ struct ChunkedHandler {
/// If @a size is zero it is set to @c DEFAULT_MAX_CHUNK_SIZE.
void set_max_chunk_size(int64_t size);
- // Returns true if complete, false otherwise
- bool process_chunked_content();
- bool generate_chunked_content();
+ /** Consumes and processes chunked content.
+ *
+ * This consumes data from @a chunked_reader and, if dechunking, writes the
+ * dechunked body to @a dechunked_buffer.
+ *
+ * @return The number of bytes consumed from the chunked reader and true if
+ * the entire chunked content is processed, false otherwise.
+ */
+ std::pair<int64_t, bool> process_chunked_content();
+
+ /** Writes chunked content.
+ *
+ * This reads from @a dechunked_reader and writes chunked formatted content to
+ * @a chunked_buffer.
+ *
+ * @return The number of bytes consumed from the dechunked reader and true if
+ * the entire chunked content is written, false otherwise.
+ */
+ std::pair<int64_t, bool> generate_chunked_content();
private:
- void read_size();
- void read_chunk();
- void read_trailer();
+ /** Read a chunk header containing the size of the chunk.
+ *
+ * @return The number of bytes consumed from the chunked buffer reader.
+ */
+ int64_t read_size();
+
+ /** Read a chunk body.
+ *
+ * This is called after read_size so that the chunk size is known.
+ *
+ * @return The number of bytes consumed from the chunked buffer reader.
+ */
+ int64_t read_chunk();
+
+ /** Read a chunk trailer.
+ *
+ * @return The number of bytes consumed from the chunked buffer reader.
+ */
+ int64_t read_trailer();
+
+ /** Transfer body bytes from the chunked reader.
+ *
+ * This will either simply consume the chunked body bytes in the case of
+ * chunked passthrough, or transfer the chunked body to the dechunked buffer.
+ *
+ * @return The number of bytes consumed from the chunked buffer reader.
+ */
int64_t transfer_bytes();
constexpr static std::string_view FINAL_CRLF = "\r\n";
@@ -209,12 +256,51 @@ struct HttpTunnelProducer {
bool do_dechunking = false;
bool do_chunked_passthru = false;
- int64_t init_bytes_done = 0; // bytes passed in buffer
- int64_t nbytes = 0; // total bytes (client's perspective)
- int64_t ntodo = 0; // what this vc needs to do
- int64_t bytes_read = 0; // total bytes read from the vc
- int handler_state = 0; // state used the handlers
- int last_event = 0; ///< Tracking for flow control restarts.
+ /** The number of bytes available in the reader at the start of the tunnel.
+ *
+ * @note In the case of pipelined requests, not all these bytes should be used.
+ */
+ int64_t init_bytes_done = 0;
+
+ /** The total number of bytes read from the reader, including any @a skip_bytes.
+ *
+ * In straightforward cases where @a total_bytes is specified (non-INT64_MAX),
+ * these should wind up being the same as @a total_bytes. For unspecified,
+ * generally chunked content, this will be the number of bytes actually
+ * consumed from the reader.
+ *
+ * @note that in the case of pipelined requests, this may be less than @a
+ * init_bytes_done because some of those bytes might be for a future request
+ * rather than for this body/tunnel.
+ */
+ int64_t bytes_consumed = 0;
+
+ /** The total number of bytes to be transferred through the tunnel.
+ *
+ * This will include any bytes skipped at the start of the tunnel.
+ *
+ * @note This is set by the creator of the tunnel and in the simple case the
+ * value is precisely known via a Content-Length header value. However, a user
+ * may set this to INT64_MAX or any negative value to indicate that the total
+ * is unknown at the start of the tunnel (such as is the case with chunked
+ * encoded content).
+ */
+ int64_t total_bytes = 0;
+
+ /** The number of bytes still to read after @a init_bytes_done to reach @a
+ * total_bytes.
+ *
+ * A value of zero indicates that all the required bytes have already been
+ * read off the socket. @a ntodo will be used to indicate how much more we
+ * have to read.
+ */
+ int64_t ntodo = 0;
+
+ /** The number of bytes read from the vc since the start of the tunnel. */
+ int64_t bytes_read = 0;
+
+ int handler_state = 0; // state used the handlers
+ int last_event = 0; ///< Tracking for flow control restarts.
int num_consumers = 0;
@@ -232,6 +318,12 @@ struct HttpTunnelProducer {
*/
uint64_t backlog(uint64_t limit = UINT64_MAX ///< More than this is irrelevant
);
+ /** Indicate whether this producer is handling some kind of chunked content.
+ *
+ * @return True if this producer is handling chunked content, false
+ * otherwise.
+ */
+ bool is_handling_chunked_content() const;
/// Check if producer is original (to ATS) source of data.
/// @return @c true if this producer is the source of bytes from outside ATS.
bool is_source() const;
@@ -301,10 +393,12 @@ class HttpTunnel : public Continuation
/// A named variable for the @a drop_chunked_trailers parameter to @a set_producer_chunking_action.
static constexpr bool DROP_CHUNKED_TRAILERS = true;
- /** Configure how the producer should behave with chunked content.
- * @param[in] p Producer to configure.
- * @param[in] skip_bytes Number of bytes to skip at the beginning of the stream (typically the headers).
- * @param[in] action Action to take with the chunked content.
+ /** Designate chunking behavior to the producer.
+ *
+ * @param[in] producer The producer being configured.
+ * @param[in] skip_bytes The number of bytes to consume off the stream before
+ * any chunked data is encountered. These are generally header bytes, if any.
+ * @param[in] action The chunking behavior to enact on incoming bytes.
* @param[in] drop_chunked_trailers If @c true, chunked trailers are filtered
* out. Logically speaking, this is only applicable when proxying chunked
* content, thus only when @a action is @c TCA_PASSTHRU_CHUNKED_CONTENT.
@@ -388,6 +482,21 @@ class HttpTunnel : public Continuation
/// Corresponds to proxy.config.http.drop_chunked_trailers having a value of 1.
bool http_drop_chunked_trailers = false;
+
+ /** The number of body bytes processed in this last execution of the tunnel.
+ *
+ * This accounting is used to determine how many bytes to copy into the body
+ * buffer via HttpSM::postbuf_copy_partial_data.
+ */
+ int64_t body_bytes_to_copy = 0;
+
+ /** The cumulative number of bytes that all calls to
+ * HttpSM::post_copy_partial_data have copied.
+ *
+ * This is recorded so we don't copy more bytes than the creator of the tunnel
+ * told us to via nbytes.
+ */
+ int64_t body_bytes_copied = 0;
};
////
@@ -542,7 +651,7 @@ HttpTunnel::append_message_to_producer_buffer(HttpTunnelProducer *p, const char
}
p->read_buffer->write(msg, msg_len);
- p->nbytes += msg_len;
+ p->total_bytes += msg_len;
p->bytes_read += msg_len;
}
@@ -609,6 +718,12 @@ HttpTunnelConsumer::is_sink() const
return HT_HTTP_CLIENT == vc_type || HT_CACHE_WRITE == vc_type;
}
+inline bool
+HttpTunnelProducer::is_handling_chunked_content() const
+{
+ return do_chunking || do_dechunking || do_chunked_passthru;
+}
+
inline bool
HttpTunnelProducer::is_source() const
{
diff --git a/proxy/http2/Http2Stream.cc b/proxy/http2/Http2Stream.cc
index 6a4d29584a6..cd5f0a3704c 100644
--- a/proxy/http2/Http2Stream.cc
+++ b/proxy/http2/Http2Stream.cc
@@ -1033,6 +1033,12 @@ Http2Stream::get_transaction_priority_dependence() const
}
}
+bool
+Http2Stream::is_read_closed() const
+{
+ return this->recv_end_stream;
+}
+
int64_t
Http2Stream::read_vio_read_avail()
{
diff --git a/proxy/http2/Http2Stream.h b/proxy/http2/Http2Stream.h
index 03d9decea09..e3e9f583232 100644
--- a/proxy/http2/Http2Stream.h
+++ b/proxy/http2/Http2Stream.h
@@ -106,6 +106,7 @@ class Http2Stream : public ProxyTransaction
int get_transaction_id() const override;
int get_transaction_priority_weight() const override;
int get_transaction_priority_dependence() const override;
+ bool is_read_closed() const override;
void clear_io_events();
diff --git a/src/traffic_server/FetchSM.cc b/src/traffic_server/FetchSM.cc
index 788d5335ac8..19303b7e03d 100644
--- a/src/traffic_server/FetchSM.cc
+++ b/src/traffic_server/FetchSM.cc
@@ -23,6 +23,7 @@
#include "tscore/ink_config.h"
#include "FetchSM.h"
+#include <cstdint>
#include <cstdio>
#include "HTTP.h"
#include "PluginVC.h"
@@ -229,7 +230,7 @@ FetchSM::dechunk_body()
// - TS_FETCH_EVENT_EXT_BODY_READY.
// - TS_FETCH_EVENT_EXT_BODY_DONE.
//
- if (chunked_handler.process_chunked_content()) {
+ if (auto const [_, done] = chunked_handler.process_chunked_content(); done) {
return TS_FETCH_EVENT_EXT_BODY_DONE;
}
diff --git a/tests/gold_tests/pipeline/pipeline.test.py b/tests/gold_tests/pipeline/pipeline.test.py
new file mode 100644
index 00000000000..dea95511fb4
--- /dev/null
+++ b/tests/gold_tests/pipeline/pipeline.test.py
@@ -0,0 +1,137 @@
+'''Test a pipelined requests.'''
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ports import get_port
+import sys
+
+Test.Summary = '''Test pipelined requests.'''
+
+IP_ALLOW_CONTENT = '''
+ip_allow:
+ - apply: in
+ ip_addrs: 0/0
+ action: deny
+ methods:
+ - DELETE
+'''
+
+
+class TestPipelining:
+ """Verify that a set of pipelined requests is handled correctly."""
+
+ _client_script: str = 'pipeline_client.py'
+ _server_script: str = 'pipeline_server.py'
+
+ _dns_counter: int = 0
+ _server_counter: int = 0
+ _ts_counter: int = 0
+
+ def __init__(self, buffer_requests: bool) -> None:
+ """Configure the TestRun.
+
+ :param buffer_requests: Whether to configure ATS to buffer client requests.
+ """
+ tr = Test.AddTestRun('Test a pipelined chunked encoded request.')
+ tr.TimeOut = 10
+ self._configure_dns(tr)
+ self._configure_server(tr)
+ self._configure_traffic_server(tr, buffer_requests)
+ self._configure_client(tr)
+
+ def _configure_dns(self, tr: 'TestRun') -> 'Process':
+ """Configure the DNS.
+
+ :param tr: The test run to associate with the DNS process with.
+ :return: The DNS process.
+ """
+ name = f'dns_{self._dns_counter}'
+ TestPipelining._dns_counter += 1
+ dns = tr.MakeDNServer(name, default='127.0.0.1')
+ self._dns = dns
+ return dns
+
+ def _configure_server(self, tr: 'TestRun') -> 'Process':
+ """Configure the origin server.
+
+ :param tr: The test run to associate with the origin server with.
+ :return: The origin server process.
+ """
+ name = f'server_{self._server_counter}'
+ TestPipelining._server_counter += 1
+ server = tr.Processes.Process(name)
+ tr.Setup.Copy(self._server_script)
+ port = get_port(server, "http_port")
+ server.Command = f'{sys.executable} {self._server_script} 127.0.0.1 {port} '
+ server.ReturnCode = 0
+ server.Ready = When.PortOpenv4(port)
+ server.Streams.All += Testers.ContainsExpression('/first', 'Should receive the first request')
+ server.Streams.All += Testers.ContainsExpression('/second', 'Should receive the second request')
+
+ # The third request should be denied due to the ip_allow.yaml rule.
+ server.Streams.All += Testers.ExcludesExpression('/third', 'Should not receive the third request')
+ self._server = server
+ return server
+
+ def _configure_traffic_server(self, tr: 'TestRun', buffer_requests: bool) -> 'Process':
+ """Configure ATS.
+
+ :param tr: The test run to associate with the ATS process with.
+ :param buffer_requests: Whether to enable request_buffer_enabled.
+ :return: The ATS process.
+ """
+ name = f'ts_{self._ts_counter}'
+ TestPipelining._ts_counter += 1
+ ts = tr.MakeATSProcess(name, enable_cache=False)
+ self._ts = ts
+ ts.Disk.remap_config.AddLine(f'map / http://backend.server.com:{self._server.Variables.http_port}')
+ ts.Disk.records_config.update(
+ {
+ 'proxy.config.diags.debug.enabled': 1,
+ 'proxy.config.diags.debug.tags': 'http|ip_allow',
+ 'proxy.config.dns.nameservers': f'127.0.0.1:{self._dns.Variables.Port}',
+ 'proxy.config.dns.resolv_conf': 'NULL',
+ })
+ if buffer_requests:
+ ts.Disk.records_config.update({
+ 'proxy.config.http.request_buffer_enabled': 1,
+ })
+ ts.Disk.ip_allow_yaml.AddLines(IP_ALLOW_CONTENT.split("\n"))
+ return ts
+
+ def _configure_client(self, tr: 'TestRun') -> 'Process':
+ """Configure the client.
+
+ :param tr: The test run to associate with the client process with.
+ :return: The client process.
+ """
+ client = tr.Processes.Default
+ tr.Setup.Copy(self._client_script)
+ client.Command = (f'{sys.executable} {self._client_script} 127.0.0.1 {self._ts.Variables.port} '
+ 'server.com server.com')
+ client.ReturnCode = 0
+ client.Streams.All += Testers.ContainsExpression('X-Response: first', "Should receive the origin's first response.")
+ client.Streams.All += Testers.ContainsExpression('X-Response: second', "Should receive the origin's second response.")
+ client.Streams.All += Testers.ExcludesExpression('X-Response: third', "Should not receive the origin's third response.")
+ client.Streams.All += Testers.ContainsExpression('403', 'Should receive the ATS-generated rejection of the DELETE request.')
+ client.StartBefore(self._dns)
+ client.StartBefore(self._server)
+ client.StartBefore(self._ts)
+
+
+TestPipelining(buffer_requests=False)
+TestPipelining(buffer_requests=True)
diff --git a/tests/gold_tests/pipeline/pipeline_client.py b/tests/gold_tests/pipeline/pipeline_client.py
new file mode 100644
index 00000000000..6716d8a1ff4
--- /dev/null
+++ b/tests/gold_tests/pipeline/pipeline_client.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python3
+"""A client that sends three pipelined GET requests."""
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import socket
+import sys
+
+
+def parse_args() -> argparse.Namespace:
+ """Parse the command line arguments."""
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument("proxy_address", metavar="proxy-address", help="Address of the proxy to connect to.")
+ parser.add_argument("proxy_port", metavar="proxy-port", type=int, help="The port of the proxy to connect to.")
+ parser.add_argument('first_hostname', metavar='first-hostname', help='The Host header field value of the first request.')
+ parser.add_argument('second_hostname', metavar='second-hostname', help='The Host header field value of the second request.')
+ return parser.parse_args()
+
+
+def open_connection(address: str, port: int) -> socket.socket:
+ """Open a connection to the desired host.
+
+ :param address: The address of the host to connect to.
+ :param port: The port of the host to connect to.
+ :return: The socket connected to the host.
+ """
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((address, port))
+ print(f'Connected to {address}:{port}')
+ return sock
+
+
+def write_pipelined_requests(sock: socket.socket, first_hostname: str, second_hostname: str) -> None:
+ """Write three pipelined requests to the socket.
+
+ :param sock: The socket to write to.
+ :param first_hostname: The Host header field value of the first request.
+ :param second_hostname: The Host header field value of the second request.
+ """
+ first_request = f'GET /first HTTP/1.1\r\nHost: {first_hostname}\r\nTransfer-Encoding: chunked\r\n\r\n0\r\n\r\n'
+ # For if we want to test CL first. Leave this out of the final commit.
+ #first_request = f'GET /first HTTP/1.1\r\nHost: {first_hostname}\r\nContent-Length: 5\r\n\r\n67891'
+ second_request = f'GET /second HTTP/1.1\r\nHost: {first_hostname}\r\nContent-Length: 5\r\n\r\n12345'
+ third_request = f'DELETE /third HTTP/1.1\r\nHost: {second_hostname}\r\nContent-Length: 0\r\n\r\n'
+ pipelined_requests = first_request + second_request + third_request
+ total = len(first_request) + len(second_request) + len(third_request)
+ print(
+ f'Sending three pipelined requests: {len(first_request)} bytes, '
+ f'{len(second_request)} bytes, and {len(third_request)} bytes: '
+ f'{total} total bytes')
+ print(pipelined_requests)
+ sock.sendall(pipelined_requests.encode())
+ print()
+
+
+def wait_for_responses(sock: socket.socket, num_responses: int) -> bytes:
+ """Wait for the responses to be complete.
+
+ :param sock: The socket to read from.
+ :param num_responses: The number of responses to wait for.
+ :returns: The bytes read off the socket.
+ """
+ responses = b""
+ while True:
+ data = sock.recv(1024)
+ if not data:
+ print("Socket closed.")
+ break
+ print(f'Received:\n{data}')
+ responses += data
+ if responses.count(b"\r\n\r\n") == num_responses:
+ break
+ return responses
+
+
+def main() -> int:
+ """Send the pipelined requests."""
+ args = parse_args()
+ with open_connection(args.proxy_address, args.proxy_port) as s:
+ write_pipelined_requests(s, args.first_hostname, args.second_hostname)
+ print("Waiting for responses...")
+ responses = wait_for_responses(s, 3)
+ print()
+ print(f'Received responses:\n{responses.decode()}')
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/tests/gold_tests/pipeline/pipeline_server.py b/tests/gold_tests/pipeline/pipeline_server.py
new file mode 100644
index 00000000000..cf13fa5696d
--- /dev/null
+++ b/tests/gold_tests/pipeline/pipeline_server.py
@@ -0,0 +1,201 @@
+#!/usr/bin/env python3
+"""A server that receives possibly pipelined requests."""
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import socket
+import sys
+import time
+"""A flag indicating whether all three requests have been received."""
+received_third_request: bool = False
+
+
+def parse_args() -> argparse.Namespace:
+ """Parse the command line arguments.
+
+ :returns: The parsed arguments.
+ """
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument("address", help="Address to listen on.")
+ parser.add_argument("port", type=int, help="The port to listen on.")
+ return parser.parse_args()
+
+
+def get_listening_socket(address: str, port: int) -> socket.socket:
+ """Create a listening socket.
+
+ :param address: The address to listen on.
+ :param port: The port to listen on.
+ :returns: A listening socket.
+ """
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((address, port))
+ sock.listen(1)
+ return sock
+
+
+def accept_connection(sock: socket.socket) -> socket.socket:
+ """Accept a connection.
+
+ :param sock: The socket to accept a connection on.
+ :returns: The accepted socket.
+ """
+ return sock.accept()[0]
+
+
+def receive_requests(sock: socket.socket) -> None:
+ """Receive three requests from the client.
+
+ :param sock: The socket to read from.
+ """
+ global received_third_request
+
+ all_received_data: bytes = b""
+ this_request: bytes = b""
+ first_response_bytes: bytes = b'HTTP/1.1 200 OK\r\nX-Response: first\r\nContent-Length: 0\r\n\r\n'
+ second_response_bytes: bytes = b'HTTP/1.1 200 OK\r\nX-Response: second\r\nContent-Length: 0\r\n\r\n'
+ third_response_bytes: bytes = b'HTTP/1.1 200 OK\r\nX-Response: third\r\nContent-Length: 0\r\n\r\n'
+ processing_first_request: bool = True
+ processing_second_request: bool = False
+ processing_third_request: bool = False
+
+ # Note that this is very ad-hoc. We expect the first request to be chunked,
+ # the second to have a Content-Length body, and the third, if we receive it,
+ # to have no body.
+ end_of_first_request: bytes = b'\r\n0\r\n\r\n'
+ #end_of_first_request: bytes = b'67891' # < revert this eventually.
+ end_of_second_request: bytes = b'12345'
+ end_of_third_request: bytes = b'\r\n\r\n'
+ while not received_third_request:
+ data = sock.recv(1024)
+ if not data:
+ print("Socket closed.")
+ break
+ print(f'Received:')
+ print(data)
+ this_request += data
+ all_received_data += data
+ while not received_third_request:
+ if processing_first_request:
+ end_of_request_index = this_request.find(end_of_first_request)
+ if end_of_request_index == -1:
+ # Need more data.
+ break
+ print(' Received the first request:')
+ print(f' {this_request[:end_of_request_index + len(end_of_first_request)]}')
+ processing_first_request = False
+ processing_second_request = True
+ # Remove the first request from the buffer.
+ this_request = this_request[end_of_request_index + len(end_of_first_request):]
+ print(' Sending response to the first request:')
+ print(f' {first_response_bytes}')
+ print()
+ time.sleep(0.01)
+ sock.sendall(first_response_bytes)
+ continue
+
+ elif processing_second_request:
+ end_of_request_index = this_request.find(end_of_second_request)
+ if end_of_request_index == -1:
+ # Need more data.
+ break
+ print(' Received the second request:')
+ print(f' {this_request[:end_of_request_index + len(end_of_second_request)]}')
+ processing_second_request = False
+ processing_third_request = True
+ # Remove the second request from the buffer.
+ this_request = this_request[end_of_request_index + len(end_of_second_request):]
+ print(' Sending response to the second request:')
+ print(f' {second_response_bytes}')
+ print()
+ time.sleep(0.01)
+ sock.sendall(second_response_bytes)
+ continue
+
+ elif processing_third_request:
+ end_of_request_index = this_request.find(end_of_third_request)
+ if end_of_request_index == -1:
+ # Need more data.
+ break
+ print(' Received the third request:')
+ print(f' {this_request[:end_of_request_index + len(end_of_third_request)]}')
+ processing_third_request = False
+ # Remove the third request from the buffer.
+ this_request = this_request[end_of_request_index + len(end_of_third_request):]
+ print(' Sending response to the third request:')
+ print(f' {third_response_bytes}')
+ print()
+ time.sleep(0.01)
+ sock.sendall(third_response_bytes)
+ received_third_request = True
+ break
+ return all_received_data
+
+
+def _run_server_inside_try(address: str, port: int) -> int:
+ """Run the server to handle the pipelined requests.
+
+ :param address: The address to listen on.
+ :param port: The port to listen on.
+ :return: 0 on success, 1 on failure (appropriate for the command line return
+ code).
+ """
+ with get_listening_socket(address, port) as listening_sock:
+ print(f"Listening on {address}:{port}")
+
+ read_bytes: bytes = b""
+ while len(read_bytes) == 0:
+ print('Waiting for a connection.')
+ with accept_connection(listening_sock) as sock:
+ read_bytes = receive_requests(sock)
+ if len(read_bytes) == 0:
+ # This is probably the PortOpenv4 test. Try again.
+ print("No bytes read on this connection. Trying again.")
+ sock.close()
+ continue
+
+
+def run_server(address: str, port: int) -> int:
+ """Run the server with exception handling.
+
+ :param address: The address to listen on.
+ :param port: The port to listen on.
+ :return: 1 if the third request was received (this is bad, we expect it to
+ be denied), 0 if it wasn't.
+ """
+
+ try:
+ ret = _run_server_inside_try(address, port)
+ except KeyboardInterrupt:
+ print('Handling KeyboardInterrupt.')
+
+ return 1 if received_third_request else 0
+
+
+def main() -> int:
+ """Receive pipelined requests."""
+ args = parse_args()
+ ret = run_server(args.address, args.port)
+
+ print(f'Done. Third request was received: {received_third_request}')
+ return ret
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/tests/gold_tests/redirect/redirect_post.test.py b/tests/gold_tests/redirect/redirect_post.test.py
index ea75a61c1b0..10b21c00b84 100644
--- a/tests/gold_tests/redirect/redirect_post.test.py
+++ b/tests/gold_tests/redirect/redirect_post.test.py
@@ -79,6 +79,7 @@
tr = Test.AddTestRun()
tr.Processes.Default.Command = 'touch largefile.txt && truncate largefile.txt -s 50M && curl -H "Expect: " -i http://127.0.0.1:{0}/redirect1 -F "filename=@./largefile.txt" && rm -f largefile.txt'.format(
ts.Variables.port)
+tr.TimeOut = 10
tr.Processes.Default.StartBefore(ts)
tr.Processes.Default.StartBefore(redirect_serv1)
tr.Processes.Default.StartBefore(redirect_serv2)
diff --git a/tests/gold_tests/timeout/inactive_client_timeout.test.py b/tests/gold_tests/timeout/inactive_client_timeout.test.py
index ea99fd4deca..2dafd845600 100644
--- a/tests/gold_tests/timeout/inactive_client_timeout.test.py
+++ b/tests/gold_tests/timeout/inactive_client_timeout.test.py
@@ -57,6 +57,13 @@
# get applied after the request is sent. In other words, a slow to respond server should not
# trigger the client inactivity timeout.
tr = Test.AddTestRun("Verify that server delay does not trigger client activity timeout.")
-tr.AddVerifierClientProcess("client", replay_file, http_ports=[ts.Variables.port], https_ports=[ts.Variables.ssl_port])
+client = tr.AddVerifierClientProcess("client", replay_file, http_ports=[ts.Variables.port], https_ports=[ts.Variables.ssl_port])
tr.Processes.Default.StartBefore(ts)
tr.Processes.Default.StartBefore(server)
+
+client.Streams.All += Testers.ContainsExpression('x-response: 1', 'Verify that the first response is received')
+client.Streams.All += Testers.ContainsExpression('x-response: 2', 'Verify that the second response is received')
+client.Streams.All += Testers.ContainsExpression('x-response: 3', 'Verify that the third response is received')
+client.Streams.All += Testers.ContainsExpression('x-response: 4', 'Verify that the fourth response is received')
+client.Streams.All += Testers.ContainsExpression('x-response: 5', 'Verify that the fifth response is received')
+client.Streams.All += Testers.ContainsExpression('x-response: 6', 'Verify that the sixth response is received')
diff --git a/tests/gold_tests/timeout/slow_server.yaml b/tests/gold_tests/timeout/slow_server.yaml
index 7a1aeb17244..920b2fc1565 100644
--- a/tests/gold_tests/timeout/slow_server.yaml
+++ b/tests/gold_tests/timeout/slow_server.yaml
@@ -25,7 +25,6 @@ meta:
blocks:
- delayed_response: &delayed_response
- server-response:
delay: 3s
status: 200
@@ -50,7 +49,11 @@ sessions:
- [ Host, www.no_tls.com ]
- [ uuid, 1 ]
- <<: *delayed_response
+ server-response:
+ <<: *delayed_response
+ headers:
+ fields:
+ - [ X-Response, 1 ]
proxy-response:
status: 200
@@ -69,7 +72,11 @@ sessions:
- [ Content-Length, 10 ]
- [ uuid, 2 ]
- <<: *delayed_response
+ server-response:
+ <<: *delayed_response
+ headers:
+ fields:
+ - [ X-Response, 2 ]
proxy-response:
status: 200
@@ -92,7 +99,11 @@ sessions:
- [ Host, www.tls.com ]
- [ uuid, 3 ]
- <<: *delayed_response
+ server-response:
+ <<: *delayed_response
+ headers:
+ fields:
+ - [ X-Response, 3 ]
proxy-response:
status: 200
@@ -115,7 +126,11 @@ sessions:
- [ Content-Length, 10 ]
- [ uuid, 4 ]
- <<: *delayed_response
+ server-response:
+ <<: *delayed_response
+ headers:
+ fields:
+ - [ X-Response, 4 ]
proxy-response:
status: 200
@@ -139,7 +154,11 @@ sessions:
- [ :path, '/path/5' ]
- [ uuid, 5 ]
- <<: *delayed_response
+ server-response:
+ <<: *delayed_response
+ headers:
+ fields:
+ - [ X-Response, 5 ]
proxy-response:
status: 200
@@ -165,7 +184,11 @@ sessions:
content:
size: 10
- <<: *delayed_response
+ server-response:
+ <<: *delayed_response
+ headers:
+ fields:
+ - [ X-Response, 6 ]
proxy-response:
status: 200