From 3942fb5667077200017a1a7c72672e482e798df6 Mon Sep 17 00:00:00 2001
From: zhangxiaoyu
Date: Tue, 1 Nov 2022 14:36:02 +0800
Subject: [PATCH 40/43] bugfix for websocket receive data too long

Signed-off-by: zhangxiaoyu
---
 .../entry/cri/websocket/service/ws_server.cc | 65 ++++++++++++++++---
 .../entry/cri/websocket/service/ws_server.h | 5 +-
 2 files changed, 59 insertions(+), 11 deletions(-)

diff --git a/src/daemon/entry/cri/websocket/service/ws_server.cc b/src/daemon/entry/cri/websocket/service/ws_server.cc
index 41bb3fe8..ea320ff4 100644
--- a/src/daemon/entry/cri/websocket/service/ws_server.cc
+++ b/src/daemon/entry/cri/websocket/service/ws_server.cc
@@ -126,6 +126,32 @@ void SessionData::CloseSession()
     sessionMutex->unlock();
 }
 
+bool SessionData::IsStdinComplete()
+{
+    bool c = true;
+
+    if (sessionMutex == nullptr) {
+        return true;
+    }
+
+    sessionMutex->lock();
+    c = completeStdin;
+    sessionMutex->unlock();
+
+    return c;
+}
+
+void SessionData::SetStdinComplete(bool complete)
+{
+    if (sessionMutex == nullptr) {
+        return;
+    }
+
+    sessionMutex->lock();
+    completeStdin = complete;
+    sessionMutex->unlock();
+}
+
 void SessionData::EraseAllMessage()
 {
     if (sessionMutex == nullptr) {
@@ -330,6 +356,7 @@ int WebsocketServer::GenerateSessionData(SessionData *session, const std::string
     session->sessionMutex = bufMutex;
     session->syncCloseSem = syncCloseSem;
     session->close = false;
+    session->completeStdin = true;
     session->containerID = containerID;
     session->suffix = std::string(suffix);
 
@@ -524,28 +551,44 @@ int WebsocketServer::ResizeTerminal(int socketID, const char *jsonData, size_t l
     return ret;
 }
 
-void WebsocketServer::Receive(int socketID, void *in, size_t len)
+void WebsocketServer::Receive(int socketID, void *in, size_t len, bool complete)
 {
     auto it = m_wsis.find(socketID);
     if (it == m_wsis.end()) {
-        ERROR("invailed websocket session!");
+        ERROR("Invailed websocket session!");
         return;
     }
 
+    if (!it->second->IsStdinComplete()) {
+        DEBUG("Receive remaning stdin data with length %zu", len);
+        // Too much data may cause error 'resource temporarily unavaliable' by using 'write'
+        if (util_write_nointr_in_total(m_wsis[socketID]->pipes.at(1), (char *)in, len) < 0) {
+            ERROR("Sub write over! err msg: %s", strerror(errno));
+        }
+        goto out;
+    }
+
     if (*static_cast(in) == WebsocketChannel::RESIZECHANNEL) {
         if (ResizeTerminal(socketID, (char *)in + 1, len, it->second->containerID, it->second->suffix) != 0) {
             ERROR("Failed to resize terminal tty");
-            return;
         }
-    } else if (*static_cast(in) == WebsocketChannel::STDINCHANNEL) {
-        if (write(m_wsis[socketID]->pipes.at(1), (void *)((char *)in + 1), len - 1) < 0) {
-            ERROR("sub write over!");
-            return;
+        if (!complete) {
+            ERROR("Resize data too long");
         }
-    } else {
-        ERROR("invalid data: %s", (char *)in);
         return;
     }
+
+    if (*static_cast(in) == WebsocketChannel::STDINCHANNEL) {
+        if (util_write_nointr_in_total(m_wsis[socketID]->pipes.at(1), (char *)in + 1, len - 1) < 0) {
+            ERROR("Sub write over! err msg: %s", strerror(errno));
+        }
+        goto out;
+    }
+
+    ERROR("Invalid data: %s", (char *)in);
+
+out:
+    it->second->SetStdinComplete(complete);
 }
 
 int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
@@ -605,7 +648,9 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason,
             break;
         case LWS_CALLBACK_RECEIVE: {
                 ReadGuard lock(m_mutex);
-                WebsocketServer::GetInstance()->Receive(lws_get_socket_fd(wsi), static_cast(in), len);
+                size_t bytesLen = lws_remaining_packet_payload(wsi);
+                WebsocketServer::GetInstance()->Receive(lws_get_socket_fd(wsi), static_cast(in),
+                                                        len, bytesLen == 0);
             }
             break;
         case LWS_CALLBACK_CLOSED: {
diff --git a/src/daemon/entry/cri/websocket/service/ws_server.h b/src/daemon/entry/cri/websocket/service/ws_server.h
index a2a180ec..7da56818 100644
--- a/src/daemon/entry/cri/websocket/service/ws_server.h
+++ b/src/daemon/entry/cri/websocket/service/ws_server.h
@@ -44,6 +44,7 @@ struct SessionData {
     std::list buffer;
     std::string containerID;
     std::string suffix;
+    volatile bool completeStdin;
 
     unsigned char *FrontMessage();
     void PopMessage();
@@ -51,6 +52,8 @@ struct SessionData {
     bool IsClosed();
     void CloseSession();
     void EraseAllMessage();
+    bool IsStdinComplete();
+    void SetStdinComplete(bool complete);
 };
 
 class WebsocketServer {
@@ -72,7 +75,7 @@ private:
     std::vector split(std::string str, char r);
 
     int CreateContext();
-    inline void Receive(int socketID, void *in, size_t len);
+    inline void Receive(int socketID, void *in, size_t len, bool complete);
     int Wswrite(struct lws *wsi, const unsigned char *message);
     inline void DumpHandshakeInfo(struct lws *wsi) noexcept;
     int RegisterStreamTask(struct lws *wsi) noexcept;
-- 
2.25.1