From e10bcde6bc507767fc0770c0a1606b6f96494d6b Mon Sep 17 00:00:00 2001 From: wujing Date: Sat, 27 Mar 2021 18:38:08 +0800 Subject: [PATCH 069/104] cri: fix residual IO copy thread in CRI exec operation Signed-off-by: wujing --- .../cri/websocket/service/attach_serve.cc | 20 +- .../cri/websocket/service/attach_serve.h | 2 +- .../entry/cri/websocket/service/exec_serve.cc | 24 +- .../entry/cri/websocket/service/exec_serve.h | 2 +- .../service/route_callback_register.h | 24 +- .../entry/cri/websocket/service/ws_server.cc | 259 +++++++++--------- .../entry/cri/websocket/service/ws_server.h | 53 +++- src/daemon/modules/service/io_handler.c | 53 ++-- src/utils/console/console.c | 38 ++- src/utils/console/console.h | 5 +- 10 files changed, 284 insertions(+), 196 deletions(-) diff --git a/src/daemon/entry/cri/websocket/service/attach_serve.cc b/src/daemon/entry/cri/websocket/service/attach_serve.cc index 01c6b9cf..ec2edc8b 100644 --- a/src/daemon/entry/cri/websocket/service/attach_serve.cc +++ b/src/daemon/entry/cri/websocket/service/attach_serve.cc @@ -16,30 +16,44 @@ #include "attach_serve.h" #include "utils.h" -int AttachServe::Execute(struct lws *wsi, const std::string &token, int read_pipe_fd) +int AttachServe::Execute(lwsContext lws_ctx, const std::string &token, int read_pipe_fd) { + prctl(PR_SET_NAME, "AttachServe"); + service_executor_t *cb = get_service_executor(); if (cb == nullptr || cb->container.attach == nullptr) { + sem_post(lws_ctx.sync_close_sem); return -1; } container_attach_request *container_req = nullptr; if (GetContainerRequest(token, &container_req) != 0) { ERROR("Failed to get contaner request"); + sem_post(lws_ctx.sync_close_sem); + return -1; + } + + // attach operation is non-blocking and cannot pass a local variable in + // free memory when close websocket session in closeWsConnect + lwsContext *lws_context = new (std::nothrow)lwsContext(lws_ctx); + if (lws_context == nullptr) { + ERROR("Out of memory"); + sem_post(lws_ctx.sync_close_sem); return -1; } struct io_write_wrapper stringWriter = { 0 }; - stringWriter.context = (void *)wsi; + stringWriter.context = (void *)(lws_context); stringWriter.write_func = WsWriteStdoutToClient; stringWriter.close_func = closeWsConnect; container_req->attach_stderr = false; container_attach_response *container_res = nullptr; int ret = cb->container.attach(container_req, &container_res, container_req->attach_stdin ? read_pipe_fd : -1, - &stringWriter, nullptr); + container_req->attach_stdout ? &stringWriter : nullptr, nullptr); if (ret != 0) { ERROR("Failed to attach container: %s", container_req->container_id); + sem_post(lws_ctx.sync_close_sem); } free_container_attach_request(container_req); diff --git a/src/daemon/entry/cri/websocket/service/attach_serve.h b/src/daemon/entry/cri/websocket/service/attach_serve.h index 00e2b34e..0d29f35b 100644 --- a/src/daemon/entry/cri/websocket/service/attach_serve.h +++ b/src/daemon/entry/cri/websocket/service/attach_serve.h @@ -33,7 +33,7 @@ public: AttachServe(const AttachServe &) = delete; AttachServe &operator=(const AttachServe &) = delete; virtual ~AttachServe() = default; - int Execute(struct lws *wsi, const std::string &token, int read_pipe_fd) override; + int Execute(lwsContext lws_ctx, const std::string &token, int read_pipe_fd) override; private: int RequestFromCri(const runtime::v1alpha2::AttachRequest &grequest, container_attach_request **request); diff --git a/src/daemon/entry/cri/websocket/service/exec_serve.cc b/src/daemon/entry/cri/websocket/service/exec_serve.cc index 855d28b8..96675734 100644 --- a/src/daemon/entry/cri/websocket/service/exec_serve.cc +++ b/src/daemon/entry/cri/websocket/service/exec_serve.cc @@ -17,25 +17,37 @@ #include "io_wrapper.h" #include "utils.h" -int ExecServe::Execute(struct lws *wsi, const std::string &token, int read_pipe_fd) +int ExecServe::Execute(lwsContext lws_ctx, const std::string &token, int read_pipe_fd) { service_executor_t *cb = get_service_executor(); if (cb == nullptr || cb->container.exec == nullptr) { + sem_post(lws_ctx.sync_close_sem); return -1; } container_exec_request *container_req = nullptr; if (GetContainerRequest(token, &container_req) != 0) { ERROR("Failed to get contaner request"); + sem_post(lws_ctx.sync_close_sem); + return -1; + } + + lwsContext *lws_context = new (std::nothrow)lwsContext(lws_ctx); + if (lws_context == nullptr) { + ERROR("Out of memory"); + sem_post(lws_ctx.sync_close_sem); return -1; } struct io_write_wrapper StdoutstringWriter = { 0 }; - StdoutstringWriter.context = (void *)wsi; + StdoutstringWriter.context = (void *)lws_context; StdoutstringWriter.write_func = WsWriteStdoutToClient; + // the close function of StderrstringWriter is preferred unless StderrstringWriter is nullptr + StdoutstringWriter.close_func = container_req->attach_stderr ? nullptr : closeWsStream; struct io_write_wrapper StderrstringWriter = { 0 }; - StderrstringWriter.context = (void *)wsi; + StderrstringWriter.context = (void *)lws_context; StderrstringWriter.write_func = WsWriteStderrToClient; + StderrstringWriter.close_func = container_req->attach_stderr ? closeWsStream : nullptr; container_exec_response *container_res = nullptr; int ret = cb->container.exec(container_req, &container_res, container_req->attach_stdin ? read_pipe_fd : -1, @@ -48,17 +60,17 @@ int ExecServe::Execute(struct lws *wsi, const std::string &token, int read_pipe_ } else { message = "Failed to call exec container callback. "; } - WsWriteStdoutToClient(wsi, message.c_str(), message.length()); + WsWriteStdoutToClient(lws_context, message.c_str(), message.length()); } if (container_res != nullptr && container_res->exit_code != 0) { std::string exit_info = "Exit code :" + std::to_string((int)container_res->exit_code) + "\n"; - WsWriteStdoutToClient(wsi, exit_info.c_str(), exit_info.length()); + WsWriteStdoutToClient(lws_context, exit_info.c_str(), exit_info.length()); } free_container_exec_request(container_req); free_container_exec_response(container_res); - (void)closeWsConnect((void*)wsi, nullptr); + closeWsConnect((void*)lws_context, nullptr); return ret; } diff --git a/src/daemon/entry/cri/websocket/service/exec_serve.h b/src/daemon/entry/cri/websocket/service/exec_serve.h index b29c3e1e..093f076a 100644 --- a/src/daemon/entry/cri/websocket/service/exec_serve.h +++ b/src/daemon/entry/cri/websocket/service/exec_serve.h @@ -37,7 +37,7 @@ public: ExecServe(const ExecServe &) = delete; ExecServe &operator=(const ExecServe &) = delete; virtual ~ExecServe() = default; - int Execute(struct lws *wsi, const std::string &token, int read_pipe_fd) override; + int Execute(lwsContext lws_ctx, const std::string &token, int read_pipe_fd) override; private: int RequestFromCri(const runtime::v1alpha2::ExecRequest &grequest, container_exec_request **request); diff --git a/src/daemon/entry/cri/websocket/service/route_callback_register.h b/src/daemon/entry/cri/websocket/service/route_callback_register.h index 5d021d17..9c6bdd64 100644 --- a/src/daemon/entry/cri/websocket/service/route_callback_register.h +++ b/src/daemon/entry/cri/websocket/service/route_callback_register.h @@ -21,14 +21,21 @@ #include #include #include +#include #include "isula_libutils/log.h" + +struct lwsContext { + int fd; + sem_t *sync_close_sem; +}; + class StreamingServeInterface { public: StreamingServeInterface() = default; StreamingServeInterface(const StreamingServeInterface &) = delete; StreamingServeInterface &operator=(const StreamingServeInterface &) = delete; virtual ~StreamingServeInterface() = default; - virtual int Execute(struct lws *wsi, const std::string &token, int read_pipe_fd) = 0; + virtual int Execute(lwsContext lws_ctx, const std::string &token, int read_pipe_fd) = 0; }; class RouteCallbackRegister { @@ -42,15 +49,14 @@ public: return static_cast(m_registeredcallbacks.count(method)); } - int HandleCallback(struct lws *wsi, const std::string &method, - const std::string &token, - int read_pipe_fd) + int HandleCallback(lwsContext lws_ctx, const std::string &method, + const std::string &token, int read_pipe_fd) { auto it = m_registeredcallbacks.find(method); if (it != m_registeredcallbacks.end()) { std::shared_ptr callback = it->second; if (callback) { - return callback->Execute(wsi, token, read_pipe_fd); + return callback->Execute(lws_ctx, token, read_pipe_fd); } } ERROR("invalid method!"); @@ -69,21 +75,21 @@ private: class StreamTask { public: - StreamTask(RouteCallbackRegister *invoker, struct lws *wsi, + StreamTask(RouteCallbackRegister *invoker, lwsContext lws_ctx, const std::string &method, const std::string &token, int read_pipe_fd) - : m_invoker(invoker), m_wsi(wsi), m_method(method), m_token(token), + : m_invoker(invoker), m_lws_ctx(lws_ctx), m_method(method), m_token(token), m_read_pipe_fd(read_pipe_fd) {} StreamTask(const StreamTask &) = delete; StreamTask &operator=(const StreamTask &) = delete; virtual ~StreamTask() = default; int Run() { - return m_invoker->HandleCallback(m_wsi, m_method, m_token, m_read_pipe_fd); + return m_invoker->HandleCallback(m_lws_ctx, m_method, m_token, m_read_pipe_fd); } private: RouteCallbackRegister *m_invoker{ nullptr }; - struct lws *m_wsi; + lwsContext m_lws_ctx; std::string m_method; std::string m_token; int m_read_pipe_fd; diff --git a/src/daemon/entry/cri/websocket/service/ws_server.cc b/src/daemon/entry/cri/websocket/service/ws_server.cc index 795d2c1e..4993e1e8 100644 --- a/src/daemon/entry/cri/websocket/service/ws_server.cc +++ b/src/daemon/entry/cri/websocket/service/ws_server.cc @@ -30,7 +30,6 @@ struct lws_context *WebsocketServer::m_context = nullptr; std::atomic WebsocketServer::m_instance; RWMutex WebsocketServer::m_mutex; std::unordered_map WebsocketServer::m_wsis; -std::unordered_set WebsocketServer::m_activeSession; WebsocketServer *WebsocketServer::GetInstance() noexcept { @@ -159,12 +158,10 @@ void WebsocketServer::CloseAllWsSession() { WriteGuard lock(m_mutex); for (auto it = m_wsis.begin(); it != m_wsis.end(); ++it) { - free(it->second.buf); + it->second.EraseAllMessage(); close(it->second.pipes.at(0)); close(it->second.pipes.at(1)); - it->second.sended = true; delete it->second.buf_mutex; - delete it->second.sended_mutex; } m_wsis.clear(); } @@ -173,59 +170,54 @@ void WebsocketServer::CloseWsSession(int socketID) { auto it = m_wsis.find(socketID); if (it != m_wsis.end()) { - free(it->second.buf); + it->second.EraseAllMessage(); + // close the pipe write endpoint first, make sure io copy thread exit, + // otherwise epoll will trigger EOF + if (it->second.pipes.at(1) >= 0) { + close(it->second.pipes.at(1)); + it->second.pipes.at(1) = -1; + } + (void)sem_wait(it->second.sync_close_sem); + (void)sem_destroy(it->second.sync_close_sem); close(it->second.pipes.at(0)); - close(it->second.pipes.at(1)); - it->second.sended = true; delete it->second.buf_mutex; - delete it->second.sended_mutex; m_wsis.erase(it); } } -void WebsocketServer::RecordSession(struct lws *wsi) -{ - m_activeSession.insert(wsi); -} - -void WebsocketServer::RemoveSession(struct lws *wsi) -{ - m_activeSession.erase(wsi); -} - -bool WebsocketServer::IsValidSession(struct lws *wsi) -{ - return m_activeSession.count(wsi) != 0; -} - -int WebsocketServer::DumpHandshakeInfo(struct lws *wsi) noexcept +int WebsocketServer::GenerateSessionData(session_data &session) noexcept { int read_pipe_fd[PIPE_FD_NUM]; if (InitRWPipe(read_pipe_fd) < 0) { ERROR("failed to init read/write pipe!"); + return -1; } - session_data session; - session.pipes = std::array { read_pipe_fd[0], read_pipe_fd[1] }; + std::mutex *buf_mutex = new std::mutex; + sem_t *sync_close_sem = new sem_t; - int socketID = lws_get_socket_fd(wsi); - m_wsis.insert(std::make_pair(socketID, std::move(session))); - m_wsis[socketID].buf = (unsigned char *)util_common_calloc_s(LWS_PRE + MAX_MSG_BUFFER_SIZE + 1); - if (m_wsis[socketID].buf == nullptr) { - ERROR("Out of memory"); + if (sem_init(sync_close_sem, 0, 0) != 0) { + ERROR("Semaphore initialization failed"); + close(read_pipe_fd[1]); + close(read_pipe_fd[0]); + delete buf_mutex; + delete sync_close_sem; return -1; } - m_wsis[socketID].buf_mutex = new std::mutex; - m_wsis[socketID].sended_mutex = new std::mutex; - m_wsis[socketID].SetProcessingStatus(false); - int len; - char buf[MAX_BUF_LEN] { 0 }; + session.pipes = std::array { read_pipe_fd[0], read_pipe_fd[1] }; + session.buf_mutex = buf_mutex; + session.sync_close_sem = sync_close_sem; + return 0; +} + +int WebsocketServer::RegisterStreamTask(struct lws *wsi) noexcept +{ + char buf[MAX_BUF_LEN] { 0 }; lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_GET_URI); if (strlen(buf) == 0) { ERROR("invalid url"); - CloseWsSession(socketID); return -1; } @@ -237,60 +229,68 @@ int WebsocketServer::DumpHandshakeInfo(struct lws *wsi) noexcept !m_handler.IsValidMethod(vec.at(1)) || !cache->IsValidToken(vec.at(2))) { ERROR("invalid url(%s): incorrect format!", buf); - CloseWsSession(socketID); return -1; } + session_data session; + if (GenerateSessionData(session) != 0) { + ERROR("failed to fill generate session data"); + return -1; + } + + int socketID = lws_get_socket_fd(wsi); + m_wsis.insert(std::make_pair(socketID, std::move(session))); + + lwsContext lwsCtx = { + .fd = socketID, + .sync_close_sem = m_wsis[socketID].sync_close_sem, + }; std::thread streamTh([ = ]() { - StreamTask(&m_handler, wsi, vec.at(1), vec.at(2), m_wsis[socketID].pipes.at(0)).Run(); + StreamTask(&m_handler, lwsCtx, vec.at(1), vec.at(2), m_wsis[socketID].pipes.at(0)).Run(); }); streamTh.detach(); - RecordSession(wsi); + + return 0; +} + +void WebsocketServer::DumpHandshakeInfo(struct lws *wsi) noexcept +{ int n = 0; const unsigned char *c = nullptr; + char buf[MAX_BUF_LEN] { 0 }; + do { c = lws_token_to_string((lws_token_indexes)n); if (c == nullptr) { n++; continue; } - len = lws_hdr_total_length(wsi, (lws_token_indexes)n); + int len = lws_hdr_total_length(wsi, (lws_token_indexes)n); if (len == 0 || (static_cast(len) > sizeof(buf) - 1)) { n++; continue; } + lws_hdr_copy(wsi, buf, sizeof(buf), (lws_token_indexes)n); buf[sizeof(buf) - 1] = '\0'; DEBUG(" %s = %s", (char *)c, buf); n++; } while (c != nullptr); - - return 0; } -int WebsocketServer::Wswrite(struct lws *wsi, void *in, size_t len) +int WebsocketServer::Wswrite(struct lws *wsi, const unsigned char *message) { auto it = m_wsis.find(lws_get_socket_fd(wsi)); if (it != m_wsis.end()) { - if (it->second.close) { - DEBUG("websocket session disconnected"); - return -1; - } - it->second.buf_mutex->lock(); - auto &buf = it->second.buf; - if (strlen((const char *)(&buf[LWS_PRE + 1])) == 0) { - it->second.buf_mutex->unlock(); + if (strlen((const char *)(&message[LWS_PRE + 1])) == 0) { return 0; } - int n = lws_write(wsi, (unsigned char *)(&buf[LWS_PRE]), - strlen((const char *)(&buf[LWS_PRE + 1])) + 1, LWS_WRITE_TEXT); + int n = lws_write(wsi, (unsigned char *)(&message[LWS_PRE]), + strlen((const char *)(&message[LWS_PRE + 1])) + 1, LWS_WRITE_TEXT); if (n < 0) { - it->second.buf_mutex->unlock(); ERROR("ERROR %d writing to socket, hanging up", n); return -1; } - (void)memset(buf, 0, LWS_PRE + MAX_MSG_BUFFER_SIZE + 1); - it->second.buf_mutex->unlock(); } return 0; @@ -314,16 +314,6 @@ void WebsocketServer::Receive(int socketID, void *in, size_t len) } } -void WebsocketServer::SetLwsSendedFlag(int socketID, bool sended) -{ - if (m_wsis.count(socketID) == 0) { - return; - } - m_wsis[socketID].sended_mutex->lock(); - m_wsis[socketID].sended = sended; - m_wsis[socketID].sended_mutex->unlock(); -} - int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { @@ -334,10 +324,13 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, return -1; case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: { WriteGuard lock(m_mutex); - if (WebsocketServer::GetInstance()->DumpHandshakeInfo(wsi)) { + WebsocketServer::GetInstance()->DumpHandshakeInfo(wsi); + if (WebsocketServer::GetInstance()->RegisterStreamTask(wsi) != 0) { // return non-zero here and kill the connection return -1; } + // Trigger polling in LWS_CALLBACK_SERVER_WRITEABLE + lws_callback_on_writable(wsi); } break; case LWS_CALLBACK_ESTABLISHED: { @@ -347,13 +340,32 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, case LWS_CALLBACK_SERVER_WRITEABLE: { ReadGuard lock(m_mutex); int socketID = lws_get_socket_fd(wsi); - if (WebsocketServer::GetInstance()->Wswrite(wsi, in, len)) { - WebsocketServer::GetInstance()->SetLwsSendedFlag(socketID, true); + auto it = m_wsis.find(socketID); + if (it == m_wsis.end()) { + DEBUG("invalid session!"); // return nonzero from the user callback to close the connection // and callback with the reason of LWS_CALLBACK_CLOSED return -1; } - WebsocketServer::GetInstance()->SetLwsSendedFlag(socketID, true); + + while (!it->second.buffer.empty()) { + unsigned char *message = it->second.FrontMessage(); + // send success! free it and erase for list + if (WebsocketServer::GetInstance()->Wswrite(wsi, (const unsigned char *)message) == 0) { + free(message); + it->second.PopMessage(); + } else { + // Another case ret > 0, send fail! keep message and send it again! + // Or maybe the client was shut down abnormally + break; + } + } + + if (it->second.close) { + DEBUG("websocket session disconnected"); + return -1; + } + lws_callback_on_writable(wsi); } break; case LWS_CALLBACK_RECEIVE: { @@ -364,8 +376,8 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, case LWS_CALLBACK_CLOSED: { WriteGuard lock(m_mutex); DEBUG("connection has been closed"); - WebsocketServer::GetInstance()->RemoveSession(wsi); - WebsocketServer::GetInstance()->CloseWsSession(lws_get_socket_fd(wsi)); + int socketID = lws_get_socket_fd(wsi); + WebsocketServer::GetInstance()->CloseWsSession(socketID); } break; default: @@ -377,6 +389,9 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, void WebsocketServer::ServiceWorkThread(int threadid) { int n = 0; + + prctl(PR_SET_NAME, "WebsocketServer"); + while (n >= 0 && !m_force_exit) { n = lws_service(m_context, 0); } @@ -410,73 +425,37 @@ void WebsocketServer::Wait() } namespace { -auto PrepareWsiSession(int socketID) -> session_data * -{ - WebsocketServer *server = WebsocketServer::GetInstance(); - server->ReadLockAllWsSession(); - auto itor = server->GetWsisData().find(socketID); - if (itor == server->GetWsisData().end()) { - ERROR("invalid session!"); - server->UnlockAllWsSession(); - return nullptr; - } - server->SetLwsSendedFlag(socketID, false); - server->UnlockAllWsSession(); - - return &itor->second; -} - -void DoWriteToClient(struct lws *wsi, session_data *session, +void DoWriteToClient(int fd, session_data *session, const void *data, size_t len, WebsocketChannel channel) { - session->buf_mutex->lock(); + unsigned char *buf = (unsigned char *)util_common_calloc_s(LWS_PRE + MAX_MSG_BUFFER_SIZE + 1); + if (buf == nullptr) { + ERROR("Out of memory"); + return; + } // Determine if it is standard output channel or error channel - (void)memset(session->buf, 0, LWS_PRE + MAX_MSG_BUFFER_SIZE + 1); - session->buf[LWS_PRE] = channel; - - (void)memcpy(&session->buf[LWS_PRE + 1], (void *)data, len); + buf[LWS_PRE] = channel; - lws_callback_on_writable(wsi); + (void)memcpy(&buf[LWS_PRE + 1], (void *)data, len); - session->buf_mutex->unlock(); -} - -void EnsureWrited(struct lws *wsi, session_data *session) -{ - const int RETRIES = 10; - const int CHECK_PERIOD_SECOND = 1; - const int TRIGGER_PERIOD_MS = 1; - auto start = std::chrono::system_clock::now(); - int count = 0; - - while (!session->sended && count < RETRIES) { - auto end = std::chrono::system_clock::now(); - auto duration = std::chrono::duration_cast(end - start); - double spend_time = static_cast(duration.count()) * std::chrono::microseconds::period::num / - std::chrono::microseconds::period::den; - if (spend_time > CHECK_PERIOD_SECOND) { - lws_callback_on_writable(wsi); - std::this_thread::sleep_for(std::chrono::milliseconds(TRIGGER_PERIOD_MS)); - start = std::chrono::system_clock::now(); - count++; - } - std::this_thread::sleep_for(std::chrono::milliseconds(TRIGGER_PERIOD_MS)); - } + // push back to message list + session->PushMessage(buf); } ssize_t WsWriteToClient(void *context, const void *data, size_t len, WebsocketChannel channel) { - struct lws *wsi = static_cast(context); + auto *lwsCtx = static_cast(context); + int fd = lwsCtx->fd; - session_data *session = PrepareWsiSession(lws_get_socket_fd(wsi)); - if (session == nullptr) { + WebsocketServer *server = WebsocketServer::GetInstance(); + auto itor = server->GetWsisData().find(fd); + if (itor == server->GetWsisData().end()) { + ERROR("invalid session!"); return 0; } - DoWriteToClient(wsi, session, data, len, channel); - - EnsureWrited(wsi, session); + DoWriteToClient(fd, &itor->second, data, len, channel); return static_cast(len); } @@ -495,23 +474,37 @@ ssize_t WsWriteStderrToClient(void *context, const void *data, size_t len) int closeWsConnect(void *context, char **err) { (void)err; - struct lws *wsi = static_cast(context); + auto *lwsCtx = static_cast(context); + + if (lwsCtx->sync_close_sem != nullptr) { + (void)sem_post(lwsCtx->sync_close_sem); + } WebsocketServer *server = WebsocketServer::GetInstance(); server->ReadLockAllWsSession(); - auto it = server->GetWsisData().find(lws_get_socket_fd(wsi)); + auto it = server->GetWsisData().find(lwsCtx->fd); if (it == server->GetWsisData().end()) { server->UnlockAllWsSession(); ERROR("websocket session not exist"); + delete lwsCtx; return -1; } - + // will close websocket session on LWS_CALLBACK_SERVER_WRITEABLE polling it->second.close = true; - // close websocket session - if (server->IsValidSession(wsi)) { - lws_callback_on_writable(wsi); - } server->UnlockAllWsSession(); + delete lwsCtx; return 0; } + +int closeWsStream(void *context, char **err) +{ + (void)err; + auto *lwsCtx = static_cast(context); + + if (lwsCtx->sync_close_sem != nullptr) { + (void)sem_post(lwsCtx->sync_close_sem); + } + + return 0; +} \ No newline at end of file diff --git a/src/daemon/entry/cri/websocket/service/ws_server.h b/src/daemon/entry/cri/websocket/service/ws_server.h index cb431f7f..ebba3305 100644 --- a/src/daemon/entry/cri/websocket/service/ws_server.h +++ b/src/daemon/entry/cri/websocket/service/ws_server.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include "route_callback_register.h" @@ -47,20 +48,45 @@ enum WebsocketChannel { struct session_data { std::array pipes; - unsigned char *buf; - volatile bool sended { false }; volatile bool close { false }; - volatile bool in_processing { false }; std::mutex *buf_mutex; - std::mutex *sended_mutex; + sem_t *sync_close_sem; + std::list buffer; - void SetProcessingStatus(bool status) + unsigned char *FrontMessage() { - in_processing = status; + unsigned char *message = nullptr; + + buf_mutex->lock(); + message = buffer.front(); + buf_mutex->unlock(); + + return message; + } + + void PopMessage() + { + buf_mutex->lock(); + buffer.pop_front(); + buf_mutex->unlock(); } - bool GetProcessingStatus() const + + void PushMessage(unsigned char *message) + { + buf_mutex->lock(); + buffer.push_back(message); + buf_mutex->unlock(); + } + + void EraseAllMessage() { - return in_processing; + buf_mutex->lock(); + for (auto iter = buffer.begin(); iter != buffer.end();) { + free(*iter); + *iter = NULL; + iter = buffer.erase(iter); + } + buf_mutex->unlock(); } }; @@ -77,7 +103,6 @@ public: void SetLwsSendedFlag(int socketID, bool sended); void ReadLockAllWsSession(); void UnlockAllWsSession(); - bool IsValidSession(struct lws *wsi); private: WebsocketServer(); @@ -89,15 +114,15 @@ private: static void EmitLog(int level, const char *line); int CreateContext(); inline void Receive(int socketID, void *in, size_t len); - int Wswrite(struct lws *wsi, void *in, size_t len); - inline int DumpHandshakeInfo(struct lws *wsi) noexcept; + int Wswrite(struct lws *wsi, const unsigned char *message); + inline void DumpHandshakeInfo(struct lws *wsi) noexcept; + int RegisterStreamTask(struct lws *wsi) noexcept; + int GenerateSessionData(session_data &session) noexcept; static int Callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); void ServiceWorkThread(int threadid); void CloseWsSession(int socketID); void CloseAllWsSession(); - void RecordSession(struct lws *wsi); - void RemoveSession(struct lws *wsi); private: static RWMutex m_mutex; @@ -110,7 +135,6 @@ private: }; RouteCallbackRegister m_handler; static std::unordered_map m_wsis; - static std::unordered_set m_activeSession; url::URLDatum m_url; int m_listenPort; }; @@ -118,6 +142,7 @@ private: ssize_t WsWriteStdoutToClient(void *context, const void *data, size_t len); ssize_t WsWriteStderrToClient(void *context, const void *data, size_t len); int closeWsConnect(void *context, char **err); +int closeWsStream(void *context, char **err); #endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_WS_SERVER_H diff --git a/src/daemon/modules/service/io_handler.c b/src/daemon/modules/service/io_handler.c index 75a36483..d57894f2 100644 --- a/src/daemon/modules/service/io_handler.c +++ b/src/daemon/modules/service/io_handler.c @@ -183,6 +183,7 @@ struct io_copy_arg { io_type dsttype; void *dst; int dstfifoflag; + transfer_channel_type channel; }; struct io_copy_thread_arg { @@ -194,7 +195,7 @@ struct io_copy_thread_arg { }; static void io_copy_thread_cleanup(struct io_write_wrapper *writers, struct io_copy_thread_arg *thread_arg, int *infds, - int *outfds, int *srcfds, size_t len) + int *outfds, int *srcfds, transfer_channel_type *channels, size_t len) { size_t i = 0; for (i = 0; i < len; i++) { @@ -214,9 +215,11 @@ static void io_copy_thread_cleanup(struct io_write_wrapper *writers, struct io_c free(infds); free(outfds); free(writers); + free(channels); } -static int io_copy_init_fds(size_t len, int **infds, int **outfds, int **srcfds, struct io_write_wrapper **writers) +static int io_copy_init_fds(size_t len, int **infds, int **outfds, int **srcfds, + struct io_write_wrapper **writers, transfer_channel_type **channels) { size_t i; @@ -252,9 +255,19 @@ static int io_copy_init_fds(size_t len, int **infds, int **outfds, int **srcfds, ERROR("Out of memory"); return -1; } + + *channels = util_common_calloc_s(sizeof(transfer_channel_type) * len); + if (*channels == NULL) { + ERROR("Out of memory"); + return -1; + } + + for (i = 0; i < len; i++) { + (*channels)[i] = MAX_CHANNEL; + } return 0; -} +} typedef int (*src_io_type_handle)(int index, struct io_copy_arg *copy_arg, int *infds, int *srcfds); struct src_io_copy_handler { @@ -292,7 +305,8 @@ static int handle_src_io_max(int index, struct io_copy_arg *copy_arg, int *infds return -1; } -static int io_copy_make_srcfds(size_t len, struct io_copy_arg *copy_arg, int *infds, int *srcfds) +static int io_copy_make_srcfds(size_t len, struct io_copy_arg *copy_arg, int *infds, + int *srcfds, transfer_channel_type *channels) { size_t i; @@ -307,6 +321,7 @@ static int io_copy_make_srcfds(size_t len, struct io_copy_arg *copy_arg, int *in if (src_handler_jump_table[(int)(copy_arg[i].srctype)].handle(i, copy_arg, infds, srcfds) != 0) { return -1; } + channels[i] = copy_arg[i].channel; } return 0; @@ -412,6 +427,7 @@ static void *io_copy_thread_main(void *arg) int *outfds = NULL; // recored fds to close int *srcfds = NULL; struct io_write_wrapper *writers = NULL; + transfer_channel_type *channels = NULL; int sync_fd = thread_arg->sync_fd; bool posted = false; @@ -426,11 +442,11 @@ static void *io_copy_thread_main(void *arg) (void)prctl(PR_SET_NAME, "IoCopy"); len = thread_arg->len; - if (io_copy_init_fds(len, &infds, &outfds, &srcfds, &writers) != 0) { + if (io_copy_init_fds(len, &infds, &outfds, &srcfds, &writers, &channels) != 0) { goto err; } - if (io_copy_make_srcfds(len, copy_arg, infds, srcfds) != 0) { + if (io_copy_make_srcfds(len, copy_arg, infds, srcfds, channels) != 0) { goto err; } @@ -440,12 +456,12 @@ static void *io_copy_thread_main(void *arg) sem_post(&thread_arg->wait_sem); posted = true; - (void)console_loop_io_copy(sync_fd, srcfds, writers, len); + (void)console_loop_io_copy(sync_fd, srcfds, writers, channels, len); err: if (!posted) { sem_post(&thread_arg->wait_sem); } - io_copy_thread_cleanup(writers, thread_arg, infds, outfds, srcfds, len); + io_copy_thread_cleanup(writers, thread_arg, infds, outfds, srcfds, channels, len); DAEMON_CLEAR_ERRMSG(); return NULL; } @@ -480,26 +496,27 @@ static int start_io_copy_thread(int sync_fd, bool detach, struct io_copy_arg *co } static void add_io_copy_element(struct io_copy_arg *element, io_type srctype, void *src, io_type dsttype, void *dst, - int dstfifoflag) + int dstfifoflag, transfer_channel_type channel) { element->srctype = srctype; element->src = src; element->dsttype = dsttype; element->dst = dst; element->dstfifoflag = dstfifoflag; + element->channel = channel; } /* ----------------------------------------------------------------------------------- | CHANNEL | iSula iSulad lxc | ----------------------------------------------------------------------------------- - | | fifoin fifos[0] | + | | fifoin | stdin_fd fifos[0] | | IN | RDWR --------> RD RDWR --------> RD | ----------------------------------------------------------------------------------- - | | fifoout fifos[1] | + | | fifoout | stdout_handler fifos[1] | | OUT | RD <-------- WR RD <-------- WR | ----------------------------------------------------------------------------------- - | | fifoerr fifos[2] | + | | fifoerr stderr_handler fifos[2] | | ERR | RD <-------- WR RD <-------- WR | ----------------------------------------------------------------------------------- */ @@ -513,29 +530,29 @@ int ready_copy_io_data(int sync_fd, bool detach, const char *fifoin, const char if (fifoin != NULL) { // fifoin : iSula -> iSulad read // fifos[0] : iSulad -> lxc write - add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifoin, IO_FIFO, (void *)fifos[0], O_RDWR); + add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifoin, IO_FIFO, (void *)fifos[0], O_RDWR, STDIN_CHANNEL); } if (fifoout != NULL) { // fifos[1] : lxc -> iSulad read // fifoout : iSulad -> iSula write - add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[1], IO_FIFO, (void *)fifoout, O_WRONLY); + add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[1], IO_FIFO, (void *)fifoout, O_WRONLY, STDOUT_CHANNEL); } if (fifoerr != NULL) { - add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[2], IO_FIFO, (void *)fifoerr, O_WRONLY); + add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[2], IO_FIFO, (void *)fifoerr, O_WRONLY, STDERR_CHANNEL); } if (stdin_fd > 0) { - add_io_copy_element(&io_copy[len++], IO_FD, &stdin_fd, IO_FIFO, (void *)fifos[0], O_RDWR); + add_io_copy_element(&io_copy[len++], IO_FD, &stdin_fd, IO_FIFO, (void *)fifos[0], O_RDWR, STDIN_CHANNEL); } if (stdout_handler != NULL) { - add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[1], IO_FUNC, stdout_handler, O_WRONLY); + add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[1], IO_FUNC, stdout_handler, O_WRONLY, STDOUT_CHANNEL); } if (stderr_handler != NULL) { - add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[2], IO_FUNC, stderr_handler, O_WRONLY); + add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[2], IO_FUNC, stderr_handler, O_WRONLY, STDERR_CHANNEL); } if (start_io_copy_thread(sync_fd, detach, io_copy, len, tid) != 0) { diff --git a/src/utils/console/console.c b/src/utils/console/console.c index 1aedd0c9..b1d8b6dc 100644 --- a/src/utils/console/console.c +++ b/src/utils/console/console.c @@ -328,7 +328,7 @@ out: return ret; } -static void client_console_tty_state_close(struct epoll_descr *descr, const struct tty_state *ts) +static void console_tty_state_close(struct epoll_descr *descr, const struct tty_state *ts) { if (ts->stdin_reader >= 0) { epoll_loop_del_handler(descr, ts->stdin_reader); @@ -430,13 +430,14 @@ int console_loop_with_std_fd(int stdinfd, int stdoutfd, int stderrfd, int fifoin ret = safe_epoll_loop(&descr); err_out: - client_console_tty_state_close(&descr, &ts); + console_tty_state_close(&descr, &ts); epoll_loop_close(&descr); return ret; } /* console loop copy */ -int console_loop_io_copy(int sync_fd, const int *srcfds, struct io_write_wrapper *writers, size_t len) +int console_loop_io_copy(int sync_fd, const int *srcfds, struct io_write_wrapper *writers, + transfer_channel_type *channels, size_t len) { int ret = 0; size_t i = 0; @@ -460,17 +461,35 @@ int console_loop_io_copy(int sync_fd, const int *srcfds, struct io_write_wrapper } for (i = 0; i < len; i++) { - // Reusing ts.stdout_reader and ts.stdout_writer for coping io - ts[i].stdout_reader = srcfds[i]; - ts[i].stdout_writer.context = writers[i].context; - ts[i].stdout_writer.write_func = writers[i].write_func; + // initial tty_state + ts[i].stdin_reader = -1; + ts[i].stdout_reader = -1; + ts[i].stderr_reader = -1; ts[i].sync_fd = -1; - ret = epoll_loop_add_handler(&descr, ts[i].stdout_reader, console_cb_stdio_copy, &ts[i]); + if (channels[i] == STDIN_CHANNEL) { + ts[i].stdin_reader = srcfds[i]; + ts[i].stdin_writer.context = writers[i].context; + ts[i].stdin_writer.write_func = writers[i].write_func; + ret = epoll_loop_add_handler(&descr, ts[i].stdin_reader, console_cb_stdio_copy, &ts[i]); + } else if (channels[i] == STDOUT_CHANNEL) { + // Reusing ts.stdout_reader and ts.stdout_writer for coping io + ts[i].stdout_reader = srcfds[i]; + ts[i].stdout_writer.context = writers[i].context; + ts[i].stdout_writer.write_func = writers[i].write_func; + ret = epoll_loop_add_handler(&descr, ts[i].stdout_reader, console_cb_stdio_copy, &ts[i]); + } else { + // Reusing ts.stderr_reader and ts.stderr_writer for coping io + ts[i].stderr_reader = srcfds[i]; + ts[i].stderr_writer.context = writers[i].context; + ts[i].stderr_writer.write_func = writers[i].write_func; + ret = epoll_loop_add_handler(&descr, ts[i].stderr_reader, console_cb_stdio_copy, &ts[i]); + } if (ret != 0) { ERROR("Add handler for masterfd failed"); goto err_out; } } + if (sync_fd >= 0) { ts[i].sync_fd = sync_fd; epoll_loop_add_handler(&descr, ts[i].sync_fd, console_cb_stdio_copy, &ts[i]); @@ -483,9 +502,8 @@ int console_loop_io_copy(int sync_fd, const int *srcfds, struct io_write_wrapper ret = safe_epoll_loop(&descr); err_out: - for (i = 0; i < (len + 1); i++) { - epoll_loop_del_handler(&descr, ts[i].stdout_reader); + console_tty_state_close(&descr, &ts[i]); } epoll_loop_close(&descr); free(ts); diff --git a/src/utils/console/console.h b/src/utils/console/console.h index 0dfe19d3..63103d2b 100644 --- a/src/utils/console/console.h +++ b/src/utils/console/console.h @@ -43,6 +43,8 @@ struct tty_state { bool ignore_stdin_close; }; +typedef enum { STDIN_CHANNEL, STDOUT_CHANNEL, STDERR_CHANNEL, MAX_CHANNEL} transfer_channel_type; + int console_fifo_name(const char *rundir, const char *subpath, const char *stdflag, char *fifo_name, size_t fifo_name_sz, char *fifo_path, size_t fifo_path_sz, bool do_mkdirp); @@ -59,7 +61,8 @@ void console_fifo_close(int fd); int console_loop_with_std_fd(int stdinfd, int stdoutfd, int stderrfd, int fifoinfd, int fifooutfd, int fifoerrfd, int tty_exit, bool tty); -int console_loop_io_copy(int sync_fd, const int *srcfds, struct io_write_wrapper *writers, size_t len); +int console_loop_io_copy(int sync_fd, const int *srcfds, struct io_write_wrapper *writers, + transfer_channel_type *channels, size_t len); int setup_tios(int fd, struct termios *curr_tios); -- 2.25.1