304 lines
12 KiB
Diff
304 lines
12 KiB
Diff
From f44cddbebae95935fa640aa19ed5d5786de2aafa Mon Sep 17 00:00:00 2001
|
|
From: AJ Heller <hork@google.com>
|
|
Date: Wed, 12 Jul 2023 15:12:56 -0700
|
|
Subject: [PATCH] [backport][iomgr][EventEngine] Improve server handling of
|
|
file descriptor exhaustion (#33670)
|
|
|
|
Backport of #33656
|
|
---
|
|
src/core/BUILD | 1 +
|
|
.../event_engine/posix_engine/posix_engine.h | 1 +
|
|
.../posix_engine/posix_engine_listener.cc | 30 +++++++++++
|
|
.../posix_engine/posix_engine_listener.h | 3 ++
|
|
src/core/lib/iomgr/tcp_server_posix.cc | 53 ++++++++++++++-----
|
|
src/core/lib/iomgr/tcp_server_utils_posix.h | 12 +++++
|
|
.../iomgr/tcp_server_utils_posix_common.cc | 21 ++++++++
|
|
7 files changed, 107 insertions(+), 14 deletions(-)
|
|
|
|
diff --git a/src/core/BUILD b/src/core/BUILD
|
|
index 3f8ef0d054..d4ae087542 100644
|
|
--- a/src/core/BUILD
|
|
+++ b/src/core/BUILD
|
|
@@ -1908,6 +1908,7 @@ grpc_cc_library(
|
|
"posix_event_engine_tcp_socket_utils",
|
|
"socket_mutator",
|
|
"status_helper",
|
|
+ "time",
|
|
"//:event_engine_base_hdrs",
|
|
"//:gpr",
|
|
],
|
|
diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.h b/src/core/lib/event_engine/posix_engine/posix_engine.h
|
|
index afeda404a7..3a49d65699 100644
|
|
--- a/src/core/lib/event_engine/posix_engine/posix_engine.h
|
|
+++ b/src/core/lib/event_engine/posix_engine/posix_engine.h
|
|
@@ -196,6 +196,7 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport,
|
|
const DNSResolver::ResolverOptions& options) override;
|
|
void Run(Closure* closure) override;
|
|
void Run(absl::AnyInvocable<void()> closure) override;
|
|
+ // Caution!! The timer implementation cannot create any fds. See #20418.
|
|
TaskHandle RunAfter(Duration when, Closure* closure) override;
|
|
TaskHandle RunAfter(Duration when,
|
|
absl::AnyInvocable<void()> closure) override;
|
|
diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
|
|
index b395bff00d..39f3141afd 100644
|
|
--- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
|
|
+++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
|
|
@@ -23,7 +23,10 @@
|
|
#include <sys/socket.h> // IWYU pragma: keep
|
|
#include <unistd.h> // IWYU pragma: keep
|
|
|
|
+#include <atomic>
|
|
#include <string>
|
|
+#include <tuple>
|
|
+#include <type_traits>
|
|
#include <utility>
|
|
|
|
#include "absl/functional/any_invocable.h"
|
|
@@ -41,6 +44,7 @@
|
|
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
|
|
#include "src/core/lib/event_engine/tcp_socket_utils.h"
|
|
#include "src/core/lib/gprpp/status_helper.h"
|
|
+#include "src/core/lib/gprpp/time.h"
|
|
#include "src/core/lib/iomgr/socket_mutator.h"
|
|
|
|
namespace grpc_event_engine {
|
|
@@ -136,6 +140,32 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
|
|
switch (errno) {
|
|
case EINTR:
|
|
continue;
|
|
+ case EMFILE:
|
|
+ // When the process runs out of fds, accept4() returns EMFILE. When
|
|
+ // this happens, the connection is left in the accept queue until
|
|
+ // either a read event triggers the on_read callback, or time has
|
|
+ // passed and the accept should be re-tried regardless. This callback
|
|
+ // is not cancelled, so a spurious wakeup may occur even when there's
|
|
+ // nothing to accept. This is not a performant code path, but if an fd
|
|
+ // limit has been reached, the system is likely in an unhappy state
|
|
+ // regardless.
|
|
+ GRPC_LOG_EVERY_N_SEC(1, GPR_ERROR, "%s",
|
|
+ "File descriptor limit reached. Retrying.");
|
|
+ handle_->NotifyOnRead(notify_on_accept_);
|
|
+ // Do not schedule another timer if one is already armed.
|
|
+ if (retry_timer_armed_.exchange(true)) return;
|
|
+ // Hold a ref while the retry timer is waiting, to prevent listener
|
|
+ // destruction and the races that would ensue.
|
|
+ Ref();
|
|
+ std::ignore =
|
|
+ engine_->RunAfter(grpc_core::Duration::Seconds(1), [this]() {
|
|
+ retry_timer_armed_.store(false);
|
|
+ if (!handle_->IsHandleShutdown()) {
|
|
+ handle_->SetReadable();
|
|
+ }
|
|
+ Unref();
|
|
+ });
|
|
+ return;
|
|
case EAGAIN:
|
|
case ECONNABORTED:
|
|
handle_->NotifyOnRead(notify_on_accept_);
|
|
diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.h b/src/core/lib/event_engine/posix_engine/posix_engine_listener.h
|
|
index 4bf793b197..ababb97846 100644
|
|
--- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.h
|
|
+++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.h
|
|
@@ -121,6 +121,9 @@ class PosixEngineListenerImpl
|
|
ListenerSocketsContainer::ListenerSocket socket_;
|
|
EventHandle* handle_;
|
|
PosixEngineClosure* notify_on_accept_;
|
|
+ // Tracks the status of a backup timer to retry accept4 calls after file
|
|
+ // descriptor exhaustion.
|
|
+ std::atomic<bool> retry_timer_armed_{false};
|
|
};
|
|
class ListenerAsyncAcceptors : public ListenerSocketsContainer {
|
|
public:
|
|
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
|
|
index dbe7bec338..c0cea6769e 100644
|
|
--- a/src/core/lib/iomgr/tcp_server_posix.cc
|
|
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
|
|
@@ -16,13 +16,17 @@
|
|
//
|
|
//
|
|
|
|
+#include <grpc/support/port_platform.h>
|
|
+
|
|
+#include <utility>
|
|
+
|
|
+#include <grpc/support/atm.h>
|
|
+
|
|
// FIXME: "posix" files shouldn't be depending on _GNU_SOURCE
|
|
#ifndef _GNU_SOURCE
|
|
#define _GNU_SOURCE
|
|
#endif
|
|
|
|
-#include <grpc/support/port_platform.h>
|
|
-
|
|
#include "src/core/lib/iomgr/port.h"
|
|
|
|
#ifdef GRPC_POSIX_SOCKET_TCP_SERVER
|
|
@@ -45,6 +49,7 @@
|
|
|
|
#include <grpc/byte_buffer.h>
|
|
#include <grpc/event_engine/endpoint_config.h>
|
|
+#include <grpc/event_engine/event_engine.h>
|
|
#include <grpc/support/alloc.h>
|
|
#include <grpc/support/log.h>
|
|
#include <grpc/support/sync.h>
|
|
@@ -75,6 +80,8 @@
|
|
#include "src/core/lib/transport/error_utils.h"
|
|
|
|
static std::atomic<int64_t> num_dropped_connections{0};
|
|
+static constexpr grpc_core::Duration kRetryAcceptWaitTime{
|
|
+ grpc_core::Duration::Seconds(1)};
|
|
|
|
using ::grpc_event_engine::experimental::EndpointConfig;
|
|
using ::grpc_event_engine::experimental::EventEngine;
|
|
@@ -339,22 +346,38 @@ static void on_read(void* arg, grpc_error_handle err) {
|
|
if (fd < 0) {
|
|
if (errno == EINTR) {
|
|
continue;
|
|
- } else if (errno == EAGAIN || errno == ECONNABORTED ||
|
|
- errno == EWOULDBLOCK) {
|
|
+ }
|
|
+ // When the process runs out of fds, accept4() returns EMFILE. When this
|
|
+ // happens, the connection is left in the accept queue until either a
|
|
+ // read event triggers the on_read callback, or time has passed and the
|
|
+ // accept should be re-tried regardless. This callback is not cancelled,
|
|
+ // so a spurious wakeup may occur even when there's nothing to accept.
|
|
+ // This is not a performant code path, but if an fd limit has been
|
|
+ // reached, the system is likely in an unhappy state regardless.
|
|
+ if (errno == EMFILE) {
|
|
+ GRPC_LOG_EVERY_N_SEC(1, GPR_ERROR, "%s",
|
|
+ "File descriptor limit reached. Retrying.");
|
|
+ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
|
|
+ if (gpr_atm_full_xchg(&sp->retry_timer_armed, true)) return;
|
|
+ grpc_timer_init(&sp->retry_timer,
|
|
+ grpc_core::Timestamp::Now() + kRetryAcceptWaitTime,
|
|
+ &sp->retry_closure);
|
|
+ return;
|
|
+ }
|
|
+ if (errno == EAGAIN || errno == ECONNABORTED || errno == EWOULDBLOCK) {
|
|
grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
|
|
return;
|
|
+ }
|
|
+ gpr_mu_lock(&sp->server->mu);
|
|
+ if (!sp->server->shutdown_listeners) {
|
|
+ gpr_log(GPR_ERROR, "Failed accept4: %s",
|
|
+ grpc_core::StrError(errno).c_str());
|
|
} else {
|
|
- gpr_mu_lock(&sp->server->mu);
|
|
- if (!sp->server->shutdown_listeners) {
|
|
- gpr_log(GPR_ERROR, "Failed accept4: %s",
|
|
- grpc_core::StrError(errno).c_str());
|
|
- } else {
|
|
- // if we have shutdown listeners, accept4 could fail, and we
|
|
- // needn't notify users
|
|
- }
|
|
- gpr_mu_unlock(&sp->server->mu);
|
|
- goto error;
|
|
+ // if we have shutdown listeners, accept4 could fail, and we
|
|
+ // needn't notify users
|
|
}
|
|
+ gpr_mu_unlock(&sp->server->mu);
|
|
+ goto error;
|
|
}
|
|
|
|
if (sp->server->memory_quota->IsMemoryPressureHigh()) {
|
|
@@ -547,6 +570,7 @@ static grpc_error_handle clone_port(grpc_tcp_listener* listener,
|
|
sp->port_index = listener->port_index;
|
|
sp->fd_index = listener->fd_index + count - i;
|
|
GPR_ASSERT(sp->emfd);
|
|
+ grpc_tcp_server_listener_initialize_retry_timer(sp);
|
|
while (listener->server->tail->next != nullptr) {
|
|
listener->server->tail = listener->server->tail->next;
|
|
}
|
|
@@ -780,6 +804,7 @@ static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
|
|
if (s->active_ports) {
|
|
grpc_tcp_listener* sp;
|
|
for (sp = s->head; sp; sp = sp->next) {
|
|
+ grpc_timer_cancel(&sp->retry_timer);
|
|
grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE("Server shutdown"));
|
|
}
|
|
}
|
|
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h
|
|
index 26cef0209f..de5a888cff 100644
|
|
--- a/src/core/lib/iomgr/tcp_server_utils_posix.h
|
|
+++ b/src/core/lib/iomgr/tcp_server_utils_posix.h
|
|
@@ -30,6 +30,7 @@
|
|
#include "src/core/lib/iomgr/resolve_address.h"
|
|
#include "src/core/lib/iomgr/socket_utils_posix.h"
|
|
#include "src/core/lib/iomgr/tcp_server.h"
|
|
+#include "src/core/lib/iomgr/timer.h"
|
|
#include "src/core/lib/resource_quota/memory_quota.h"
|
|
|
|
// one listening port
|
|
@@ -52,6 +53,11 @@ typedef struct grpc_tcp_listener {
|
|
// identified while iterating through 'next'.
|
|
struct grpc_tcp_listener* sibling;
|
|
int is_sibling;
|
|
+ // On EMFILE from accept4(), a timer is started to drain the accept queue in
|
|
+ // case no further connection attempts reach the gRPC server.
|
|
+ grpc_closure retry_closure;
|
|
+ grpc_timer retry_timer;
|
|
+ gpr_atm retry_timer_armed;
|
|
} grpc_tcp_listener;
|
|
|
|
// the overall server
|
|
@@ -139,4 +145,10 @@ grpc_error_handle grpc_tcp_server_prepare_socket(
|
|
// Ruturn true if the platform supports ifaddrs
|
|
bool grpc_tcp_server_have_ifaddrs(void);
|
|
|
|
+// Initialize (but don't start) the timer and callback to retry accept4() on a
|
|
+// listening socket after file descriptors have been exhausted. This must be
|
|
+// called when creating a new listener.
|
|
+void grpc_tcp_server_listener_initialize_retry_timer(
|
|
+ grpc_tcp_listener* listener);
|
|
+
|
|
#endif // GRPC_SRC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H
|
|
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
|
|
index 574fd02d0d..a32f542c4a 100644
|
|
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
|
|
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
|
|
@@ -18,6 +18,8 @@
|
|
|
|
#include <grpc/support/port_platform.h>
|
|
|
|
+#include <grpc/support/atm.h>
|
|
+
|
|
#include "src/core/lib/iomgr/port.h"
|
|
|
|
#ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
|
|
@@ -81,6 +83,24 @@ static int get_max_accept_queue_size(void) {
|
|
return s_max_accept_queue_size;
|
|
}
|
|
|
|
+static void listener_retry_timer_cb(void* arg, grpc_error_handle err) {
|
|
+ // Do nothing if cancelled.
|
|
+ if (!err.ok()) return;
|
|
+ grpc_tcp_listener* listener = static_cast<grpc_tcp_listener*>(arg);
|
|
+ gpr_atm_no_barrier_store(&listener->retry_timer_armed, false);
|
|
+ if (!grpc_fd_is_shutdown(listener->emfd)) {
|
|
+ grpc_fd_set_readable(listener->emfd);
|
|
+ }
|
|
+}
|
|
+
|
|
+void grpc_tcp_server_listener_initialize_retry_timer(
|
|
+ grpc_tcp_listener* listener) {
|
|
+ gpr_atm_no_barrier_store(&listener->retry_timer_armed, false);
|
|
+ grpc_timer_init_unset(&listener->retry_timer);
|
|
+ GRPC_CLOSURE_INIT(&listener->retry_closure, listener_retry_timer_cb, listener,
|
|
+ grpc_schedule_on_exec_ctx);
|
|
+}
|
|
+
|
|
static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd,
|
|
const grpc_resolved_address* addr,
|
|
unsigned port_index,
|
|
@@ -112,6 +132,7 @@ static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd,
|
|
sp->server = s;
|
|
sp->fd = fd;
|
|
sp->emfd = grpc_fd_create(fd, name.c_str(), true);
|
|
+ grpc_tcp_server_listener_initialize_retry_timer(sp);
|
|
|
|
// Check and set fd as prellocated
|
|
if (grpc_tcp_server_pre_allocated_fd(s) == fd) {
|
|
--
|
|
2.33.0
|
|
|