From 3d5ad2160b9fe779433cce497bfa0cb0146bdcb3 Mon Sep 17 00:00:00 2001 From: wujing Date: Sat, 4 Dec 2021 21:20:25 +0800 Subject: [PATCH] Optimize websocket streaming service code Signed-off-by: wujing --- .../cri/cri_container_manager_service_impl.cc | 28 +- src/daemon/entry/cri/request_cache.cc | 123 ++----- src/daemon/entry/cri/request_cache.h | 32 +- .../cri/websocket/service/attach_serve.cc | 102 +++--- .../cri/websocket/service/attach_serve.h | 28 +- .../entry/cri/websocket/service/exec_serve.cc | 158 +++++---- .../entry/cri/websocket/service/exec_serve.h | 27 +- .../service/route_callback_register.cc | 80 +++++ .../service/route_callback_register.h | 61 ++-- .../cri/websocket/service/stream_server.cc | 6 +- .../cri/websocket/service/stream_server.h | 4 +- .../entry/cri/websocket/service/ws_server.cc | 300 +++++++++++------- .../entry/cri/websocket/service/ws_server.h | 174 +++------- 13 files changed, 542 insertions(+), 581 deletions(-) create mode 100644 src/daemon/entry/cri/websocket/service/route_callback_register.cc diff --git a/src/daemon/entry/cri/cri_container_manager_service_impl.cc b/src/daemon/entry/cri/cri_container_manager_service_impl.cc index 2e65ab51..b160ce31 100644 --- a/src/daemon/entry/cri/cri_container_manager_service_impl.cc +++ b/src/daemon/entry/cri/cri_container_manager_service_impl.cc @@ -376,7 +376,8 @@ ContainerManagerServiceImpl::GenerateCreateContainerRequest(const std::string &r hostconfig->cgroup_parent = util_strdup_s(podSandboxConfig.linux().cgroup_parent().c_str()); } - custom_config = GenerateCreateContainerCustomConfig(cname, realPodSandboxID, containerConfig, podSandboxConfig, error); + custom_config = + GenerateCreateContainerCustomConfig(cname, realPodSandboxID, containerConfig, podSandboxConfig, error); if (error.NotEmpty()) { goto cleanup; } @@ -409,11 +410,10 @@ cleanup: return request; } -std::string ContainerManagerServiceImpl::CreateContainer( - const std::string &podSandboxID, - const runtime::v1alpha2::ContainerConfig &containerConfig, - const runtime::v1alpha2::PodSandboxConfig &podSandboxConfig, - Errors &error) +std::string ContainerManagerServiceImpl::CreateContainer(const std::string &podSandboxID, + const runtime::v1alpha2::ContainerConfig &containerConfig, + const runtime::v1alpha2::PodSandboxConfig &podSandboxConfig, + Errors &error) { std::string response_id; std::string podSandboxRuntime; @@ -1305,10 +1305,16 @@ void ContainerManagerServiceImpl::Exec(const runtime::v1alpha2::ExecRequest &req if (ValidateExecRequest(req, error) != 0) { return; } + auto execReq = new (std::nothrow) runtime::v1alpha2::ExecRequest(req); + if (execReq == nullptr) { + error.SetError("out of memory"); + return; + } RequestCache *cache = RequestCache::GetInstance(); - std::string token = cache->InsertExecRequest(req); + std::string token = cache->InsertRequest(req.container_id(), execReq); if (token.empty()) { error.SetError("failed to get a unique token!"); + delete execReq; return; } std::string url = BuildURL("exec", token); @@ -1350,10 +1356,16 @@ void ContainerManagerServiceImpl::Attach(const runtime::v1alpha2::AttachRequest error.SetError("Empty attach response arguments"); return; } + auto attachReq = new (std::nothrow) runtime::v1alpha2::AttachRequest(req); + if (attachReq == nullptr) { + error.SetError("out of memory"); + return; + } RequestCache *cache = RequestCache::GetInstance(); - std::string token = cache->InsertAttachRequest(req); + std::string token = cache->InsertRequest(req.container_id(), attachReq); if (token.empty()) { error.SetError("failed to get a unique token!"); + delete attachReq; return; } std::string url = BuildURL("attach", token); diff --git a/src/daemon/entry/cri/request_cache.cc b/src/daemon/entry/cri/request_cache.cc index 4ff284ab..312a8071 100644 --- a/src/daemon/entry/cri/request_cache.cc +++ b/src/daemon/entry/cri/request_cache.cc @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright (c) Huawei Technologies Co., Ltd. 2017-2019. All rights reserved. + * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved. * iSulad licensed under the Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. * You may obtain a copy of Mulan PSL v2 at: @@ -8,8 +8,8 @@ * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR * PURPOSE. * See the Mulan PSL v2 for more details. - * Author: tanyifeng - * Create: 2017-11-22 + * Author: wujing + * Create: 2019-01-02 * Description: provide request cache function definition *********************************************************************************/ #include "request_cache.h" @@ -19,15 +19,25 @@ #include #include #include -#include "isula_libutils/log.h" +#include #include "utils.h" #include "utils_base64.h" std::atomic RequestCache::m_instance; std::mutex RequestCache::m_mutex; + +void CacheEntry::SetValue(const std::string &t, const std::string &id, ::google::protobuf::Message *request, + std::chrono::system_clock::time_point et) +{ + token = t; + containerID = id; + req = request; + expireTime = et; +} + RequestCache *RequestCache::GetInstance() noexcept { - RequestCache *cache = m_instance.load(std::memory_order_relaxed); + auto *cache = m_instance.load(std::memory_order_relaxed); std::atomic_thread_fence(std::memory_order_acquire); if (cache == nullptr) { std::lock_guard lock(m_mutex); @@ -41,25 +51,7 @@ RequestCache *RequestCache::GetInstance() noexcept return cache; } -std::string RequestCache::InsertExecRequest(const runtime::v1alpha2::ExecRequest &req) -{ - std::lock_guard lock(m_mutex); - // Remove expired entries. - GarbageCollection(); - // If the cache is full, reject the request. - if (m_ll.size() == MaxInFlight) { - ERROR("too many cache in flight!"); - return ""; - } - auto token = UniqueToken(); - CacheEntry tmp; - tmp.SetValue(token, &req, nullptr, std::chrono::system_clock::now() + std::chrono::minutes(1)); - m_ll.push_front(tmp); - m_tokens.insert(std::make_pair(token, tmp)); - return token; -} - -std::string RequestCache::InsertAttachRequest(const runtime::v1alpha2::AttachRequest &req) +std::string RequestCache::InsertRequest(const std::string &containerID, ::google::protobuf::Message *req) { std::lock_guard lock(m_mutex); // Remove expired entries. @@ -71,7 +63,7 @@ std::string RequestCache::InsertAttachRequest(const runtime::v1alpha2::AttachReq } auto token = UniqueToken(); CacheEntry tmp; - tmp.SetValue(token, nullptr, &req, std::chrono::system_clock::now() + std::chrono::minutes(1)); + tmp.SetValue(token, containerID, req, std::chrono::system_clock::now() + std::chrono::minutes(1)); m_ll.push_front(tmp); m_tokens.insert(std::make_pair(token, tmp)); return token; @@ -81,10 +73,14 @@ void RequestCache::GarbageCollection() { auto now = std::chrono::system_clock::now(); while (!m_ll.empty()) { - CacheEntry oldest = m_ll.back(); + auto oldest = m_ll.back(); if (now < oldest.expireTime) { return; } + if (oldest.req != nullptr) { + delete oldest.req; + oldest.req = nullptr; + } m_ll.pop_back(); m_tokens.erase(oldest.token); } @@ -103,15 +99,15 @@ std::string RequestCache::UniqueToken() continue; } - char *b64_encode_buf = nullptr; - if (util_base64_encode((unsigned char *)rawToken, strlen(rawToken), &b64_encode_buf) < 0) { + char *b64EncodeBuf = nullptr; + if (util_base64_encode((unsigned char *)rawToken, strlen(rawToken), &b64EncodeBuf) < 0) { ERROR("Encode raw token to base64 failed"); continue; } - std::string token(b64_encode_buf); - free(b64_encode_buf); - b64_encode_buf = nullptr; + std::string token(b64EncodeBuf); + free(b64EncodeBuf); + b64EncodeBuf = nullptr; if (token.length() != TokenLen) { continue; } @@ -133,37 +129,13 @@ bool RequestCache::IsValidToken(const std::string &token) } // Consume the token (remove it from the cache) and return the cached request, if found. -runtime::v1alpha2::ExecRequest RequestCache::ConsumeExecRequest(const std::string &token) -{ - std::lock_guard lock(m_mutex); - - if (m_tokens.count(token) == 0 || m_tokens[token].execRequest.size() == 0) { - ERROR("Invalid token"); - return runtime::v1alpha2::ExecRequest(); - } - - CacheEntry ele = m_tokens[token]; - for (auto it = m_ll.begin(); it != m_ll.end(); it++) { - if (it->token == token) { - m_ll.erase(it); - break; - } - } - m_tokens.erase(token); - if (std::chrono::system_clock::now() > ele.expireTime) { - return runtime::v1alpha2::ExecRequest(); - } - - return ele.execRequest.at(0); -} - -runtime::v1alpha2::AttachRequest RequestCache::ConsumeAttachRequest(const std::string &token) +::google::protobuf::Message *RequestCache::ConsumeRequest(const std::string &token) { std::lock_guard lock(m_mutex); - if (m_tokens.count(token) == 0 || m_tokens[token].attachRequest.size() == 0) { + if (m_tokens.count(token) == 0) { ERROR("Invalid token"); - return runtime::v1alpha2::AttachRequest(); + return nullptr; } CacheEntry ele = m_tokens[token]; @@ -175,45 +147,20 @@ runtime::v1alpha2::AttachRequest RequestCache::ConsumeAttachRequest(const std::s } m_tokens.erase(token); if (std::chrono::system_clock::now() > ele.expireTime) { - return runtime::v1alpha2::AttachRequest(); - } - - return ele.attachRequest.at(0); -} - -std::string RequestCache::GetExecContainerIDByToken(const std::string &token) -{ - std::lock_guard lock(m_mutex); - - if (m_tokens.count(token) == 0 || m_tokens[token].execRequest.size() == 0) { - ERROR("Invalid token"); - return ""; + return nullptr; } - return m_tokens[token].execRequest.at(0).container_id(); + return ele.req; } -std::string RequestCache::GetAttachContainerIDByToken(const std::string &token) +std::string RequestCache::GetContainerIDByToken(const std::string &token) { std::lock_guard lock(m_mutex); - if (m_tokens.count(token) == 0 || m_tokens[token].attachRequest.size() == 0) { + if (m_tokens.count(token) == 0) { ERROR("Invalid token"); return ""; } - return m_tokens[token].attachRequest.at(0).container_id(); -} - -std::string RequestCache::GetContainerIDByToken(const std::string &method, const std::string &token) -{ - if (method == "exec") { - return GetExecContainerIDByToken(token); - } else if (method == "attach") { - return GetAttachContainerIDByToken(token); - } - - ERROR("Invalid method: %s", method.c_str()); - - return ""; + return m_tokens[token].containerID; } diff --git a/src/daemon/entry/cri/request_cache.h b/src/daemon/entry/cri/request_cache.h index d44b4d78..90ae20e8 100644 --- a/src/daemon/entry/cri/request_cache.h +++ b/src/daemon/entry/cri/request_cache.h @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved. + * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved. * iSulad licensed under the Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. * You may obtain a copy of Mulan PSL v2 at: @@ -23,44 +23,28 @@ #include #include #include -#include "api.pb.h" struct CacheEntry { std::string token; - std::vector execRequest; - std::vector attachRequest; + std::string containerID; + ::google::protobuf::Message *req; std::chrono::system_clock::time_point expireTime; - void SetValue(const std::string &t, - const runtime::v1alpha2::ExecRequest *execReq, - const runtime::v1alpha2::AttachRequest *attachReq, - std::chrono::system_clock::time_point et) - { - token = t; - if (execReq != nullptr) { - execRequest.push_back(*execReq); - } else if (attachReq != nullptr) { - attachRequest.push_back(*attachReq); - } - expireTime = et; - } + void SetValue(const std::string &t, const std::string &id, ::google::protobuf::Message *request, + std::chrono::system_clock::time_point et); }; class RequestCache { public: static RequestCache *GetInstance() noexcept; - std::string InsertExecRequest(const runtime::v1alpha2::ExecRequest &req); - std::string InsertAttachRequest(const runtime::v1alpha2::AttachRequest &req); - runtime::v1alpha2::ExecRequest ConsumeExecRequest(const std::string &token); - runtime::v1alpha2::AttachRequest ConsumeAttachRequest(const std::string &token); - std::string GetContainerIDByToken(const std::string &method, const std::string &token); + std::string InsertRequest(const std::string &containerID, ::google::protobuf::Message *req); + ::google::protobuf::Message *ConsumeRequest(const std::string &token); + std::string GetContainerIDByToken(const std::string &token); bool IsValidToken(const std::string &token); private: void GarbageCollection(); std::string UniqueToken(); - std::string GetExecContainerIDByToken(const std::string &token); - std::string GetAttachContainerIDByToken(const std::string &token); private: RequestCache() = default; diff --git a/src/daemon/entry/cri/websocket/service/attach_serve.cc b/src/daemon/entry/cri/websocket/service/attach_serve.cc index cda63c45..abe23f51 100644 --- a/src/daemon/entry/cri/websocket/service/attach_serve.cc +++ b/src/daemon/entry/cri/websocket/service/attach_serve.cc @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright (c) Huawei Technologies Co., Ltd. 2018-2019. All rights reserved. + * Copyright (c) Huawei Technologies Co., Ltd. 2018-2021. All rights reserved. * iSulad licensed under the Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. * You may obtain a copy of Mulan PSL v2 at: @@ -8,86 +8,78 @@ * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR * PURPOSE. * See the Mulan PSL v2 for more details. - * Author: lifeng + * Author: wujing * Create: 2018-11-08 * Description: provide container attach functions ******************************************************************************/ #include "attach_serve.h" +#include "api.pb.h" +#include "ws_server.h" +#include "isula_libutils/log.h" +#include "callback.h" #include "utils.h" -int AttachServe::Execute(session_data *lws_ctx, const std::string &token) +AttachServe::~AttachServe() { - if (lws_ctx == nullptr) { - return -1; - } + free_container_attach_request(m_request); + free_container_attach_response(m_response); +} +void AttachServe::SetServeThreadName() +{ prctl(PR_SET_NAME, "AttachServe"); +} - service_executor_t *cb = get_service_executor(); - if (cb == nullptr || cb->container.attach == nullptr) { - sem_post(lws_ctx->sync_close_sem); +int AttachServe::SetContainerStreamRequest(::google::protobuf::Message *request, const std::string &suffix) +{ + auto *grequest = dynamic_cast(request); + + m_request = static_cast(util_common_calloc_s(sizeof(container_attach_request))); + if (m_request == nullptr) { + ERROR("Out of memory"); return -1; } - container_attach_request *container_req = nullptr; - if (GetContainerRequest(token, &container_req) != 0) { - ERROR("Failed to get contaner request"); - sem_post(lws_ctx->sync_close_sem); + if (!grequest->container_id().empty()) { + m_request->container_id = util_strdup_s(grequest->container_id().c_str()); + } + m_request->attach_stdin = grequest->stdin(); + m_request->attach_stdout = grequest->stdout(); + m_request->attach_stderr = grequest->stderr(); + + return 0; +} + +int AttachServe::ExecuteStreamCommand(SessionData *lwsCtx) +{ + auto *cb = get_service_executor(); + if (cb == nullptr || cb->container.attach == nullptr) { + ERROR("Failed to get attach service executor"); + sem_post(lwsCtx->syncCloseSem); return -1; } struct io_write_wrapper stringWriter = { 0 }; - stringWriter.context = (void *)(lws_ctx); + stringWriter.context = (void *)(lwsCtx); stringWriter.write_func = WsWriteStdoutToClient; stringWriter.close_func = closeWsConnect; - container_req->attach_stderr = false; - - container_attach_response *container_res = nullptr; - int ret = cb->container.attach(container_req, &container_res, container_req->attach_stdin ? lws_ctx->pipes.at(0) : -1, - container_req->attach_stdout ? &stringWriter : nullptr, nullptr); - if (ret != 0) { - ERROR("Failed to attach container: %s", container_req->container_id); - sem_post(lws_ctx->sync_close_sem); - } - - free_container_attach_request(container_req); - free_container_attach_response(container_res); + m_request->attach_stderr = false; - return ret; + return cb->container.attach(m_request, &m_response, m_request->attach_stdin ? lwsCtx->pipes.at(0) : -1, + m_request->attach_stdout ? &stringWriter : nullptr, nullptr); } -int AttachServe::GetContainerRequest(const std::string &token, container_attach_request **container_req) +void AttachServe::ErrorHandler(int ret, SessionData *lwsCtx) { - RequestCache *cache = RequestCache::GetInstance(); - auto request = cache->ConsumeAttachRequest(token); - - int ret = RequestFromCri(request, container_req); - if (ret != 0) { - ERROR("Failed to transform grpc request!"); + if (ret == 0) { + return; } - - return ret; + ERROR("Failed to attach container: %s", m_request->container_id); + sem_post(lwsCtx->syncCloseSem); } -int AttachServe::RequestFromCri(const runtime::v1alpha2::AttachRequest &grequest, container_attach_request **request) +void AttachServe::CloseConnect(SessionData *lwsCtx) { - container_attach_request *tmpreq = nullptr; - - tmpreq = (container_attach_request *)util_common_calloc_s(sizeof(container_attach_request)); - if (tmpreq == nullptr) { - ERROR("Out of memory"); - return -1; - } - - if (!grequest.container_id().empty()) { - tmpreq->container_id = util_strdup_s(grequest.container_id().c_str()); - } - tmpreq->attach_stdin = grequest.stdin(); - tmpreq->attach_stdout = grequest.stdout(); - tmpreq->attach_stderr = grequest.stderr(); - - *request = tmpreq; - - return 0; + (void)lwsCtx; } diff --git a/src/daemon/entry/cri/websocket/service/attach_serve.h b/src/daemon/entry/cri/websocket/service/attach_serve.h index f7b8a017..38e75e29 100644 --- a/src/daemon/entry/cri/websocket/service/attach_serve.h +++ b/src/daemon/entry/cri/websocket/service/attach_serve.h @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved. + * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved. * iSulad licensed under the Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. * You may obtain a copy of Mulan PSL v2 at: @@ -17,27 +17,27 @@ #define DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_ATTACH_SERVE_H #include "route_callback_register.h" -#include #include -#include -#include "ws_server.h" - -#include "api.pb.h" -#include "isula_libutils/log.h" -#include "callback.h" -#include "request_cache.h" +#include "isula_libutils/container_attach_request.h" +#include "isula_libutils/container_attach_response.h" class AttachServe : public StreamingServeInterface { public: AttachServe() = default; AttachServe(const AttachServe &) = delete; AttachServe &operator=(const AttachServe &) = delete; - virtual ~AttachServe() = default; - int Execute(session_data *lws_ctx, const std::string &token) override; + virtual ~AttachServe(); + +private: + virtual void SetServeThreadName() override; + virtual int SetContainerStreamRequest(::google::protobuf::Message *grequest, const std::string &suffix) override; + virtual int ExecuteStreamCommand(SessionData *lwsCtx) override; + virtual void ErrorHandler(int ret, SessionData *lwsCtx) override; + virtual void CloseConnect(SessionData *lwsCtx) override; + private: - int RequestFromCri(const runtime::v1alpha2::AttachRequest &grequest, - container_attach_request **request); - int GetContainerRequest(const std::string &token, container_attach_request **container_req); + container_attach_request *m_request { nullptr }; + container_attach_response *m_response { nullptr }; }; #endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_ATTACH_SERVE_H diff --git a/src/daemon/entry/cri/websocket/service/exec_serve.cc b/src/daemon/entry/cri/websocket/service/exec_serve.cc index 26b552de..b7709c48 100644 --- a/src/daemon/entry/cri/websocket/service/exec_serve.cc +++ b/src/daemon/entry/cri/websocket/service/exec_serve.cc @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright (c) Huawei Technologies Co., Ltd. 2018-2019. All rights reserved. + * Copyright (c) Huawei Technologies Co., Ltd. 2018-2021. All rights reserved. * iSulad licensed under the Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. * You may obtain a copy of Mulan PSL v2 at: @@ -8,127 +8,111 @@ * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR * PURPOSE. * See the Mulan PSL v2 for more details. - * Author: lifeng + * Author: wujing * Create: 2018-11-08 * Description: provide ExecServe functions ******************************************************************************/ #include "exec_serve.h" +#include #include "io_wrapper.h" +#include "ws_server.h" #include "utils.h" #include "cri_helpers.h" -int ExecServe::Execute(session_data *lws_ctx, const std::string &token) +ExecServe::~ExecServe() { - if (lws_ctx == nullptr) { - return -1; - } + free_container_exec_request(m_request); + free_container_exec_response(m_response); +} +void ExecServe::SetServeThreadName() +{ prctl(PR_SET_NAME, "ExecServe"); +} - service_executor_t *cb = get_service_executor(); - if (cb == nullptr || cb->container.exec == nullptr) { - sem_post(lws_ctx->sync_close_sem); +int ExecServe::SetContainerStreamRequest(::google::protobuf::Message *request, const std::string &suffix) +{ + auto *grequest = dynamic_cast(request); + + m_request = static_cast(util_common_calloc_s(sizeof(container_exec_request))); + if (m_request == nullptr) { + ERROR("Out of memory"); return -1; } - container_exec_request *container_req = nullptr; - if (GetContainerRequest(token, lws_ctx->suffix, &container_req) != 0) { - ERROR("Failed to get contaner request"); - sem_post(lws_ctx->sync_close_sem); + m_request->tty = grequest->tty(); + m_request->attach_stdin = grequest->stdin(); + m_request->attach_stdout = grequest->stdout(); + m_request->attach_stderr = grequest->stderr(); + + if (!grequest->container_id().empty()) { + m_request->container_id = util_strdup_s(grequest->container_id().c_str()); + } + + if (grequest->cmd_size() > 0) { + if (static_cast(grequest->cmd_size()) > SIZE_MAX / sizeof(char *)) { + ERROR("Too many arguments!"); + return -1; + } + m_request->argv = (char **)util_common_calloc_s(sizeof(char *) * grequest->cmd_size()); + if (m_request->argv == nullptr) { + ERROR("Out of memory!"); + return -1; + } + for (int i = 0; i < grequest->cmd_size(); i++) { + m_request->argv[i] = util_strdup_s(grequest->cmd(i).c_str()); + } + m_request->argv_len = static_cast(grequest->cmd_size()); + } + + m_request->suffix = util_strdup_s(suffix.c_str()); + + return 0; +} + +int ExecServe::ExecuteStreamCommand(SessionData *lwsCtx) +{ + auto *cb = get_service_executor(); + if (cb == nullptr || cb->container.exec == nullptr) { + ERROR("Failed to get exec service executor"); + sem_post(lwsCtx->syncCloseSem); return -1; } struct io_write_wrapper StdoutstringWriter = { 0 }; - StdoutstringWriter.context = (void *)lws_ctx; + StdoutstringWriter.context = (void *)lwsCtx; StdoutstringWriter.write_func = WsWriteStdoutToClient; // the close function of StderrstringWriter is preferred unless StderrstringWriter is nullptr StdoutstringWriter.close_func = nullptr; struct io_write_wrapper StderrstringWriter = { 0 }; - StderrstringWriter.context = (void *)lws_ctx; + StderrstringWriter.context = (void *)lwsCtx; StderrstringWriter.write_func = WsWriteStderrToClient; StderrstringWriter.close_func = nullptr; - container_exec_response *container_res = nullptr; - int ret = cb->container.exec(container_req, &container_res, container_req->attach_stdin ? lws_ctx->pipes.at(0) : -1, - container_req->attach_stdout ? &StdoutstringWriter : nullptr, - container_req->attach_stderr ? &StderrstringWriter : nullptr); + return cb->container.exec(m_request, &m_response, m_request->attach_stdin ? lwsCtx->pipes.at(0) : -1, + m_request->attach_stdout ? &StdoutstringWriter : nullptr, + m_request->attach_stderr ? &StderrstringWriter : nullptr); +} + +void ExecServe::ErrorHandler(int ret, SessionData *lwsCtx) +{ if (ret != 0) { std::string message; - if (container_res != nullptr && container_res->errmsg != nullptr) { - message = container_res->errmsg; + if (m_response != nullptr && m_response->errmsg != nullptr) { + message = m_response->errmsg; } else { message = "Failed to call exec container callback. "; } - WsWriteStdoutToClient(lws_ctx, message.c_str(), message.length()); + WsWriteStdoutToClient(lwsCtx, message.c_str(), message.length()); } - if (container_res != nullptr && container_res->exit_code != 0) { - std::string exit_info = "Exit code :" + std::to_string((int)container_res->exit_code) + "\n"; - WsWriteStdoutToClient(lws_ctx, exit_info.c_str(), exit_info.length()); + if (m_response != nullptr && m_response->exit_code != 0) { + std::string exit_info = "Exit code :" + std::to_string((int)m_response->exit_code) + "\n"; + WsWriteStdoutToClient(lwsCtx, exit_info.c_str(), exit_info.length()); } - - free_container_exec_request(container_req); - free_container_exec_response(container_res); - - closeWsConnect((void*)lws_ctx, nullptr); - - return ret; } -int ExecServe::GetContainerRequest(const std::string &token, const std::string &suffix, - container_exec_request **container_req) +void ExecServe::CloseConnect(SessionData *lwsCtx) { - RequestCache *cache = RequestCache::GetInstance(); - auto request = cache->ConsumeExecRequest(token); - - int ret = RequestFromCri(request, suffix, container_req); - if (ret != 0) { - ERROR("Failed to transform grpc request!"); - } - - return ret; -} - -int ExecServe::RequestFromCri(const runtime::v1alpha2::ExecRequest &grequest, const std::string &suffix, - container_exec_request **request) -{ - container_exec_request *tmpreq = nullptr; - - tmpreq = (container_exec_request *)util_common_calloc_s(sizeof(container_exec_request)); - if (tmpreq == nullptr) { - ERROR("Out of memory"); - return -1; - } - - tmpreq->tty = grequest.tty(); - tmpreq->attach_stdin = grequest.stdin(); - tmpreq->attach_stdout = grequest.stdout(); - tmpreq->attach_stderr = grequest.stderr(); - - if (!grequest.container_id().empty()) { - tmpreq->container_id = util_strdup_s(grequest.container_id().c_str()); - } - - if (grequest.cmd_size() > 0) { - if ((size_t)grequest.cmd_size() > SIZE_MAX / sizeof(char *)) { - ERROR("Too many arguments!"); - free_container_exec_request(tmpreq); - return -1; - } - tmpreq->argv = (char **)util_common_calloc_s(sizeof(char *) * grequest.cmd_size()); - if (tmpreq->argv == nullptr) { - ERROR("Out of memory!"); - free_container_exec_request(tmpreq); - return -1; - } - for (int i = 0; i < grequest.cmd_size(); i++) { - tmpreq->argv[i] = util_strdup_s(grequest.cmd(i).c_str()); - } - tmpreq->argv_len = (size_t)grequest.cmd_size(); - } - - tmpreq->suffix = util_strdup_s(suffix.c_str()); - - *request = tmpreq; - return 0; + closeWsConnect((void*)lwsCtx, nullptr); } diff --git a/src/daemon/entry/cri/websocket/service/exec_serve.h b/src/daemon/entry/cri/websocket/service/exec_serve.h index 5cccdee8..3afb2abb 100644 --- a/src/daemon/entry/cri/websocket/service/exec_serve.h +++ b/src/daemon/entry/cri/websocket/service/exec_serve.h @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved. + * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved. * iSulad licensed under the Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. * You may obtain a copy of Mulan PSL v2 at: @@ -20,28 +20,27 @@ #include #include #include -#include -#include "api.grpc.pb.h" -#include "container.grpc.pb.h" #include "route_callback_register.h" -#include "isula_libutils/log.h" -#include "callback.h" -#include "ws_server.h" -#include "request_cache.h" -#include "api.pb.h" +#include "isula_libutils/container_exec_request.h" +#include "isula_libutils/container_exec_response.h" class ExecServe : public StreamingServeInterface { public: ExecServe() = default; ExecServe(const ExecServe &) = delete; ExecServe &operator=(const ExecServe &) = delete; - virtual ~ExecServe() = default; - int Execute(session_data *lws_ctx, const std::string &token) override; + virtual ~ExecServe(); private: - int RequestFromCri(const runtime::v1alpha2::ExecRequest &grequest, const std::string &suffix, - container_exec_request **request); - int GetContainerRequest(const std::string &token, const std::string &suffix, container_exec_request **request); + virtual void SetServeThreadName() override; + virtual int SetContainerStreamRequest(::google::protobuf::Message *grequest, const std::string &suffix) override; + virtual int ExecuteStreamCommand(SessionData *lwsCtx) override; + virtual void ErrorHandler(int ret, SessionData *lwsCtx) override; + virtual void CloseConnect(SessionData *lwsCtx) override; + +private: + container_exec_request *m_request { nullptr }; + container_exec_response *m_response { nullptr }; }; #endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_EXEC_SERVE_H diff --git a/src/daemon/entry/cri/websocket/service/route_callback_register.cc b/src/daemon/entry/cri/websocket/service/route_callback_register.cc new file mode 100644 index 00000000..fb14381f --- /dev/null +++ b/src/daemon/entry/cri/websocket/service/route_callback_register.cc @@ -0,0 +1,80 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved. + * iSulad licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Description: Streaming service function registration. + * Author: wujing + * Create: 2021-11-04 + ******************************************************************************/ +#include "route_callback_register.h" +#include +#include "ws_server.h" + +int StreamingServeInterface::Execute(SessionData *lwsCtx, const std::string &token) +{ + if (lwsCtx == nullptr) { + return -1; + } + + SetServeThreadName(); + + auto *cache = RequestCache::GetInstance(); + auto request = cache->ConsumeRequest(token); + if (request == nullptr) { + ERROR("Failed to get cache request!"); + sem_post(lwsCtx->syncCloseSem); + return -1; + } + + if (SetContainerStreamRequest(request, lwsCtx->suffix) != 0) { + ERROR("Failed to set container request"); + sem_post(lwsCtx->syncCloseSem); + return -1; + } + + // request is stored on the heap in the cache and needs to be released after use + delete request; + request = nullptr; + + int ret = ExecuteStreamCommand(lwsCtx); + + ErrorHandler(ret, lwsCtx); + + CloseConnect(lwsCtx); + + return ret; +} + +bool RouteCallbackRegister::IsValidMethod(const std::string &method) +{ + return static_cast(m_registeredcallbacks.count(method)); +} + +int RouteCallbackRegister::HandleCallback(SessionData *lwsCtx, const std::string &method, const std::string &token) +{ + auto it = m_registeredcallbacks.find(method); + if (it != m_registeredcallbacks.end()) { + std::shared_ptr callback = it->second; + if (callback) { + return callback->Execute(lwsCtx, token); + } + } + ERROR("invalid method!"); + return -1; +} + +void RouteCallbackRegister::RegisterCallback(const std::string &path, std::shared_ptr callback) +{ + m_registeredcallbacks.insert(std::pair>(path, callback)); +} + +int StreamTask::Run() +{ + return m_invoker->HandleCallback(m_lwsCtx, m_method, m_token); +} \ No newline at end of file diff --git a/src/daemon/entry/cri/websocket/service/route_callback_register.h b/src/daemon/entry/cri/websocket/service/route_callback_register.h index 909c552b..da75fc5b 100644 --- a/src/daemon/entry/cri/websocket/service/route_callback_register.h +++ b/src/daemon/entry/cri/websocket/service/route_callback_register.h @@ -22,9 +22,9 @@ #include #include #include -#include "isula_libutils/log.h" +#include "request_cache.h" -struct session_data; +struct SessionData; class StreamingServeInterface { public: @@ -32,7 +32,14 @@ public: StreamingServeInterface(const StreamingServeInterface &) = delete; StreamingServeInterface &operator=(const StreamingServeInterface &) = delete; virtual ~StreamingServeInterface() = default; - virtual int Execute(session_data *lws_ctx, const std::string &token) = 0; + int Execute(SessionData *lwsCtx, const std::string &token); + +protected: + virtual void SetServeThreadName() = 0; + virtual int SetContainerStreamRequest(::google::protobuf::Message *grequest, const std::string &suffix) = 0; + virtual int ExecuteStreamCommand(SessionData *lwsCtx) = 0; + virtual void ErrorHandler(int ret, SessionData *lwsCtx) = 0; + virtual void CloseConnect(SessionData *lwsCtx) = 0; }; class RouteCallbackRegister { @@ -41,30 +48,10 @@ public: RouteCallbackRegister(const RouteCallbackRegister &) = delete; RouteCallbackRegister &operator=(const RouteCallbackRegister &) = delete; virtual ~RouteCallbackRegister() = default; - bool IsValidMethod(const std::string &method) - { - return static_cast(m_registeredcallbacks.count(method)); - } - int HandleCallback(session_data *lws_ctx, const std::string &method, - const std::string &token) - { - auto it = m_registeredcallbacks.find(method); - if (it != m_registeredcallbacks.end()) { - std::shared_ptr callback = it->second; - if (callback) { - return callback->Execute(lws_ctx, token); - } - } - ERROR("invalid method!"); - return -1; - } - void RegisterCallback(const std::string &path, - std::shared_ptr callback) - { - m_registeredcallbacks.insert(std::pair>(path, callback)); - } + bool IsValidMethod(const std::string &method); + int HandleCallback(SessionData *lwsCtx, const std::string &method, const std::string &token); + void RegisterCallback(const std::string &path, std::shared_ptr callback); private: std::map> m_registeredcallbacks; @@ -72,24 +59,24 @@ private: class StreamTask { public: - StreamTask(RouteCallbackRegister *invoker, session_data *lws_ctx, - const std::string &method, + StreamTask(RouteCallbackRegister *invoker, SessionData *lwsCtx, const std::string &method, const std::string &token) - : m_invoker(invoker), m_lws_ctx(lws_ctx), m_method(method), m_token(token) {} + : m_invoker(invoker) + , m_lwsCtx(lwsCtx) + , m_method(method) + , m_token(token) + { + } StreamTask(const StreamTask &) = delete; StreamTask &operator=(const StreamTask &) = delete; virtual ~StreamTask() = default; - int Run() - { - return m_invoker->HandleCallback(m_lws_ctx, m_method, m_token); - } + int Run(); + private: - RouteCallbackRegister *m_invoker{ nullptr }; - session_data *m_lws_ctx; + RouteCallbackRegister *m_invoker { nullptr }; + SessionData *m_lwsCtx; std::string m_method; std::string m_token; }; #endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_ROUTE_CALLBACK_REGISTER_H - - diff --git a/src/daemon/entry/cri/websocket/service/stream_server.cc b/src/daemon/entry/cri/websocket/service/stream_server.cc index b4df642f..4342e006 100644 --- a/src/daemon/entry/cri/websocket/service/stream_server.cc +++ b/src/daemon/entry/cri/websocket/service/stream_server.cc @@ -22,7 +22,7 @@ void websocket_server_init(Errors &err) { - WebsocketServer *server = WebsocketServer::GetInstance(); + auto *server = WebsocketServer::GetInstance(); server->RegisterCallback(std::string("exec"), std::make_shared()); server->RegisterCallback(std::string("attach"), std::make_shared()); server->Start(err); @@ -30,13 +30,13 @@ void websocket_server_init(Errors &err) void websocket_server_wait(void) { - WebsocketServer *server = WebsocketServer::GetInstance(); + auto *server = WebsocketServer::GetInstance(); server->Wait(); } void websocket_server_shutdown(void) { - WebsocketServer *server = WebsocketServer::GetInstance(); + auto *server = WebsocketServer::GetInstance(); server->Shutdown(); } diff --git a/src/daemon/entry/cri/websocket/service/stream_server.h b/src/daemon/entry/cri/websocket/service/stream_server.h index 43e42b83..ba6b3672 100644 --- a/src/daemon/entry/cri/websocket/service/stream_server.h +++ b/src/daemon/entry/cri/websocket/service/stream_server.h @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright (c) Huawei Technologies Co., Ltd. 2018-2019. All rights reserved. + * Copyright (c) Huawei Technologies Co., Ltd. 2018-2021. All rights reserved. * iSulad licensed under the Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. * You may obtain a copy of Mulan PSL v2 at: @@ -8,7 +8,7 @@ * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR * PURPOSE. * See the Mulan PSL v2 for more details. - * Author: lifeng + * Author: wujing * Create: 2018-11-08 * Description: provide websocket stream service definition ******************************************************************************/ diff --git a/src/daemon/entry/cri/websocket/service/ws_server.cc b/src/daemon/entry/cri/websocket/service/ws_server.cc index e4b3a1b4..0e462737 100644 --- a/src/daemon/entry/cri/websocket/service/ws_server.cc +++ b/src/daemon/entry/cri/websocket/service/ws_server.cc @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright (c) Huawei Technologies Co., Ltd. 2018-2019. All rights reserved. + * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved. * iSulad licensed under the Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. * You may obtain a copy of Mulan PSL v2 at: @@ -8,8 +8,8 @@ * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR * PURPOSE. * See the Mulan PSL v2 for more details. - * Author: lifeng - * Create: 2018-11-08 + * Author: wujing + * Create: 2019-01-02 * Description: provide websocket server functions ******************************************************************************/ @@ -19,34 +19,132 @@ #include #include #include +#include #include "cxxutils.h" -#include "isula_libutils/log.h" #include "utils.h" #include "request_cache.h" #include "constants.h" #include "isulad_config.h" #include "callback.h" #include "cri_helpers.h" +#include "isula_libutils/cri_terminal_size.h" struct lws_context *WebsocketServer::m_context = nullptr; std::atomic WebsocketServer::m_instance; RWMutex WebsocketServer::m_mutex; -std::unordered_map WebsocketServer::m_wsis; +std::unordered_map WebsocketServer::m_wsis; + +namespace { +const int MAX_BUF_LEN = 256; +const int MAX_HTTP_HEADER_POOL = 8; +// io copy maximum single transfer 4K, let max total buffer size: 1GB +const int FIFO_LIST_BUFFER_MAX_LEN = 262144; +const int SESSION_CAPABILITY = 300; +const int MAX_SESSION_NUM = 128; +}; // namespace + +enum WebsocketChannel { STDINCHANNEL = 0, STDOUTCHANNEL, STDERRCHANNEL, ERRORCHANNEL, RESIZECHANNEL }; + +unsigned char *SessionData::FrontMessage() +{ + unsigned char *message = nullptr; + + if (sessionMutex == nullptr) { + return nullptr; + } + + sessionMutex->lock(); + message = buffer.front(); + sessionMutex->unlock(); + + return message; +} + +void SessionData::PopMessage() +{ + if (sessionMutex == nullptr) { + return; + } + + sessionMutex->lock(); + buffer.pop_front(); + sessionMutex->unlock(); +} + +int SessionData::PushMessage(unsigned char *message) +{ + if (sessionMutex == nullptr) { + return -1; + } + + sessionMutex->lock(); + + // In extreme scenarios, websocket data cannot be processed, + // ignore the data coming in later to prevent iSulad from getting stuck + if (close || buffer.size() >= FIFO_LIST_BUFFER_MAX_LEN) { + free(message); + sessionMutex->unlock(); + return -1; + } + + buffer.push_back(message); + sessionMutex->unlock(); + return 0; +} + +bool SessionData::IsClosed() +{ + bool c = false; + + if (sessionMutex == nullptr) { + return true; + } + + sessionMutex->lock(); + c = close; + sessionMutex->unlock(); + + return c; +} + +void SessionData::CloseSession() +{ + if (sessionMutex == nullptr) { + return; + } + + sessionMutex->lock(); + close = true; + sessionMutex->unlock(); +} + +void SessionData::EraseAllMessage() +{ + if (sessionMutex == nullptr) { + return; + } + + sessionMutex->lock(); + for (auto iter = buffer.begin(); iter != buffer.end();) { + free(*iter); + *iter = NULL; + iter = buffer.erase(iter); + } + sessionMutex->unlock(); +} WebsocketServer *WebsocketServer::GetInstance() noexcept { static std::once_flag flag; - std::call_once(flag, [] { - m_instance = new WebsocketServer; - }); + std::call_once(flag, [] { m_instance = new WebsocketServer; }); return m_instance; } WebsocketServer::WebsocketServer() { - m_force_exit = 0; + m_forceExit = 0; m_wsis.reserve(SESSION_CAPABILITY); } @@ -60,19 +158,9 @@ url::URLDatum WebsocketServer::GetWebsocketUrl() return m_url; } -void WebsocketServer::ReadLockAllWsSession() -{ - m_mutex.rdlock(); -} - -void WebsocketServer::UnlockAllWsSession() -{ - m_mutex.unlock(); -} - void WebsocketServer::Shutdown() { - m_force_exit = 1; + m_forceExit = 1; lws_cancel_service(m_context); } @@ -99,15 +187,12 @@ void WebsocketServer::EmitLog(int level, const char *line) int WebsocketServer::CreateContext() { - int limited; - struct lws_context_creation_info info; - struct rlimit oldLimit, newLimit; - const size_t WS_ULIMIT_FDS = 1024; + const size_t WS_ULIMIT_FDS { 1024 }; m_url.SetScheme("ws"); m_url.SetHost("localhost:" + std::to_string(m_listenPort)); - (void)memset(&info, 0, sizeof(info)); + lws_context_creation_info info { 0x00 }; lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_DEBUG, WebsocketServer::EmitLog); info.port = m_listenPort; @@ -125,9 +210,10 @@ int WebsocketServer::CreateContext() * belowing lws_create_context limit the fds of websocket to RLIMIT_NOFILE, * and malloced memory according to it. To reduce memory, we recover it to 1024 before create m_context. */ + rlimit oldLimit, newLimit; newLimit.rlim_cur = WS_ULIMIT_FDS; newLimit.rlim_max = WS_ULIMIT_FDS; - limited = prlimit(0, RLIMIT_NOFILE, &newLimit, &oldLimit); + int limited = prlimit(0, RLIMIT_NOFILE, &newLimit, &oldLimit); if (limited != 0) { WARN("Can not set ulimit of RLIMIT_NOFILE: %s", strerror(errno)); } @@ -145,8 +231,7 @@ int WebsocketServer::CreateContext() return 0; } -void WebsocketServer::RegisterCallback(const std::string &path, - std::shared_ptr callback) +void WebsocketServer::RegisterCallback(const std::string &path, std::shared_ptr callback) { m_handler.RegisterCallback(path, callback); } @@ -158,8 +243,8 @@ void WebsocketServer::CloseAllWsSession() it->second->EraseAllMessage(); close(it->second->pipes.at(0)); close(it->second->pipes.at(1)); - (void)sem_destroy(it->second->sync_close_sem); - delete it->second->session_mutex; + (void)sem_destroy(it->second->syncCloseSem); + delete it->second->sessionMutex; delete it->second; } m_wsis.clear(); @@ -189,23 +274,23 @@ void WebsocketServer::CloseWsSession(int socketID) close(session->pipes.at(1)); session->pipes.at(1) = -1; } - (void)sem_wait(session->sync_close_sem); - (void)sem_destroy(session->sync_close_sem); - delete session->sync_close_sem; - session->sync_close_sem = nullptr; + (void)sem_wait(session->syncCloseSem); + (void)sem_destroy(session->syncCloseSem); + delete session->syncCloseSem; + session->syncCloseSem = nullptr; close(session->pipes.at(0)); - delete session->session_mutex; - session->session_mutex = nullptr; + delete session->sessionMutex; + session->sessionMutex = nullptr; delete session; }).detach(); } -int WebsocketServer::GenerateSessionData(session_data *session, const std::string containerID) noexcept +int WebsocketServer::GenerateSessionData(SessionData *session, const std::string containerID) noexcept { char *suffix = nullptr; - int read_pipe_fd[PIPE_FD_NUM] = {-1, -1}; - std::mutex *buf_mutex = nullptr; - sem_t *sync_close_sem = nullptr; + int readPipeFd[2] = { -1, -1 }; + std::mutex *bufMutex = nullptr; + sem_t *syncCloseSem = nullptr; suffix = CRIHelpers::GenerateExecSuffix(); if (suffix == nullptr) { @@ -213,24 +298,24 @@ int WebsocketServer::GenerateSessionData(session_data *session, const std::strin return -1; } - if (InitRWPipe(read_pipe_fd) < 0) { + if (InitRWPipe(readPipeFd) < 0) { ERROR("failed to init read/write pipe!"); goto out; } - buf_mutex = new std::mutex; - sync_close_sem = new sem_t; + bufMutex = new std::mutex; + syncCloseSem = new sem_t; - if (sem_init(sync_close_sem, 0, 0) != 0) { + if (sem_init(syncCloseSem, 0, 0) != 0) { ERROR("Semaphore initialization failed"); goto out; } - session->pipes = std::array { read_pipe_fd[0], read_pipe_fd[1] }; - session->session_mutex = buf_mutex; - session->sync_close_sem = sync_close_sem; + session->pipes = std::array { readPipeFd[0], readPipeFd[1] }; + session->sessionMutex = bufMutex; + session->syncCloseSem = syncCloseSem; session->close = false; - session->container_id = containerID; + session->containerID = containerID; session->suffix = std::string(suffix); free(suffix); @@ -241,17 +326,17 @@ out: if (suffix != nullptr) { free(suffix); } - if (read_pipe_fd[1] >= 0) { - close(read_pipe_fd[1]); + if (readPipeFd[1] >= 0) { + close(readPipeFd[1]); } - if (read_pipe_fd[0] >= 0) { - close(read_pipe_fd[0]); + if (readPipeFd[0] >= 0) { + close(readPipeFd[0]); } - if (buf_mutex != nullptr) { - delete buf_mutex; + if (bufMutex != nullptr) { + delete bufMutex; } - if (sync_close_sem) { - delete sync_close_sem; + if (syncCloseSem) { + delete syncCloseSem; } return -1; @@ -269,10 +354,9 @@ int WebsocketServer::RegisterStreamTask(struct lws *wsi) noexcept buf[sizeof(buf) - 1] = '\0'; // format: "/cri/" + method + "/" + token + "/" + arg(container=cmd) auto vec = CXXUtils::Split(buf + 1, '/'); - RequestCache *cache = RequestCache::GetInstance(); - if (vec.size() < MIN_VEC_SIZE || - !m_handler.IsValidMethod(vec.at(1)) || - !cache->IsValidToken(vec.at(2))) { + auto *cache = RequestCache::GetInstance(); + // buffer contains at least 3 parts: cri, method, token + if (vec.size() < 3 || !m_handler.IsValidMethod(vec.at(1)) || !cache->IsValidToken(vec.at(2))) { ERROR("invalid url(%s): incorrect format!", buf); return -1; } @@ -288,13 +372,13 @@ int WebsocketServer::RegisterStreamTask(struct lws *wsi) noexcept return -1; } - std::string containerID = cache->GetContainerIDByToken(vec.at(1), vec.at(2)); + auto containerID = cache->GetContainerIDByToken(vec.at(2)); if (containerID.empty()) { ERROR("Failed to get container id from %s request", vec.at(1).c_str()); return -1; } - session_data *session = new (std::nothrow) session_data; + auto *session = new (std::nothrow) SessionData; if (session == nullptr) { ERROR("Out of memory"); return -1; @@ -304,7 +388,7 @@ int WebsocketServer::RegisterStreamTask(struct lws *wsi) noexcept return -1; } - std::string suffixID = session->suffix; + auto suffixID = session->suffix; auto insertRet = m_wsis.insert(std::make_pair(socketID, session)); if (!insertRet.second) { ERROR("failed to insert session data to map"); @@ -351,8 +435,8 @@ int WebsocketServer::Wswrite(struct lws *wsi, const unsigned char *message) if (strlen((const char *)(&message[LWS_PRE + 1])) == 0) { return 0; } - int n = lws_write(wsi, (unsigned char *)(&message[LWS_PRE]), - strlen((const char *)(&message[LWS_PRE + 1])) + 1, LWS_WRITE_TEXT); + auto n = lws_write(wsi, (unsigned char *)(&message[LWS_PRE]), strlen((const char *)(&message[LWS_PRE + 1])) + 1, + LWS_WRITE_TEXT); if (n < 0) { ERROR("ERROR %d writing to socket, hanging up", n); return -1; @@ -362,21 +446,18 @@ int WebsocketServer::Wswrite(struct lws *wsi, const unsigned char *message) return 0; } -int WebsocketServer::parseTerminalSize(const char *jsonData, size_t len, uint16_t &width, uint16_t &height) +int WebsocketServer::ParseTerminalSize(const char *jsonData, size_t len, uint16_t &width, uint16_t &height) { - int ret = 0; - parser_error err = nullptr; - cri_terminal_size *terminalSize = nullptr; - if (jsonData == nullptr || len == 0) { return -1; } // No terminator is included in json data, and len contains a character occupied by channal std::string jsonDataStr { jsonData, len - 1 }; - + parser_error err = nullptr; + int ret = 0; // parse json data. eg: {"Width":xx,"Height":xx} - terminalSize = cri_terminal_size_parse_data(jsonDataStr.c_str(), nullptr, &err); + cri_terminal_size *terminalSize = cri_terminal_size_parse_data(jsonDataStr.c_str(), nullptr, &err); if (terminalSize == nullptr) { ERROR("Failed to parse json: %s", err); ret = -1; @@ -391,29 +472,22 @@ int WebsocketServer::parseTerminalSize(const char *jsonData, size_t len, uint16_ return ret; } -int WebsocketServer::ResizeTerminal( - int socketID, const char *jsonData, size_t len, - const std::string &containerID, - const std::string &suffix) +int WebsocketServer::ResizeTerminal(int socketID, const char *jsonData, size_t len, const std::string &containerID, + const std::string &suffix) { - int ret; - service_executor_t *cb = nullptr; - struct isulad_container_resize_request *req = nullptr; - struct isulad_container_resize_response *res = nullptr; - uint16_t width = 0; - uint16_t height = 0; - - cb = get_service_executor(); + auto *cb = get_service_executor(); if (cb == nullptr || cb->container.resize == nullptr) { return -1; } - if (parseTerminalSize(jsonData, len, width, height) != 0) { + uint16_t width = 0; + uint16_t height = 0; + if (ParseTerminalSize(jsonData, len, width, height) != 0) { return -1; } - req = (struct isulad_container_resize_request *)util_common_calloc_s( - sizeof(struct isulad_container_resize_request)); + auto *req = static_cast( + util_common_calloc_s(sizeof(struct isulad_container_resize_request))); if (req == nullptr) { ERROR("Out of memory"); return -1; @@ -424,12 +498,12 @@ int WebsocketServer::ResizeTerminal( req->height = height; req->width = width; - ret = cb->container.resize(req, &res); + struct isulad_container_resize_response *res = nullptr; + int ret = cb->container.resize(req, &res); isulad_container_resize_request_free(req); isulad_container_resize_response_free(res); - return ret; } @@ -442,9 +516,7 @@ void WebsocketServer::Receive(int socketID, void *in, size_t len) } if (*static_cast(in) == WebsocketChannel::RESIZECHANNEL) { - std::string containerID = it->second->container_id; - std::string suffix = it->second->suffix; - if (ResizeTerminal(socketID, (char *)in + 1, len, containerID, suffix) != 0) { + if (ResizeTerminal(socketID, (char *)in + 1, len, it->second->containerID, it->second->suffix) != 0) { ERROR("Failed to resize terminal tty"); return; } @@ -459,8 +531,7 @@ void WebsocketServer::Receive(int socketID, void *in, size_t len) } } -int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, - void *user, void *in, size_t len) +int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { switch (reason) { case LWS_CALLBACK_HTTP: @@ -493,11 +564,11 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, return -1; } - auto isSessionClosed = it->second->IsClosed(); + auto sessionClosed = it->second->IsClosed(); while (!it->second->buffer.empty()) { - unsigned char *message = it->second->FrontMessage(); + auto *message = it->second->FrontMessage(); // send success! free it and erase for list - if (WebsocketServer::GetInstance()->Wswrite(wsi, (const unsigned char *)message) == 0) { + if (WebsocketServer::GetInstance()->Wswrite(wsi, const_cast(message)) == 0) { free(message); it->second->PopMessage(); } else { @@ -508,7 +579,7 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, } // avoid: push message to buffer and set closed true - if (isSessionClosed) { + if (sessionClosed) { DEBUG("websocket session disconnected"); return -1; } @@ -517,7 +588,7 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, break; case LWS_CALLBACK_RECEIVE: { ReadGuard lock(m_mutex); - WebsocketServer::GetInstance()->Receive(lws_get_socket_fd(wsi), (char *)in, len); + WebsocketServer::GetInstance()->Receive(lws_get_socket_fd(wsi), static_cast(in), len); } break; case LWS_CALLBACK_CLOSED: { @@ -538,7 +609,7 @@ void WebsocketServer::ServiceWorkThread(int threadid) prctl(PR_SET_NAME, "WebsocketServer"); - while (n >= 0 && !m_force_exit) { + while (n >= 0 && !m_forceExit) { n = lws_service(m_context, 0); } } @@ -553,16 +624,17 @@ void WebsocketServer::Start(Errors &err) if (CreateContext() < 0) { err.SetError("Websocket server start failed! please check your network status" - "(eg: port " + std::to_string(m_listenPort) + " is occupied)"); + "(eg: port " + + std::to_string(m_listenPort) + " is occupied)"); return; } - m_pthread_service = std::thread(&WebsocketServer::ServiceWorkThread, this, 0); + m_pthreadService = std::thread(&WebsocketServer::ServiceWorkThread, this, 0); } void WebsocketServer::Wait() { - if (m_pthread_service.joinable()) { - m_pthread_service.join(); + if (m_pthreadService.joinable()) { + m_pthreadService.join(); } CloseAllWsSession(); @@ -571,19 +643,17 @@ void WebsocketServer::Wait() } namespace { - -void DoWriteToClient(session_data *session, - const void *data, size_t len, WebsocketChannel channel) +void DoWriteToClient(SessionData *session, const void *data, size_t len, WebsocketChannel channel) { - unsigned char *buf = (unsigned char *)util_common_calloc_s(LWS_PRE + MAX_BUFFER_SIZE + 1); + auto *buf = static_cast(util_common_calloc_s(LWS_PRE + MAX_BUFFER_SIZE + 1)); if (buf == nullptr) { ERROR("Out of memory"); return; } // Determine if it is standard output channel or error channel - buf[LWS_PRE] = channel; + buf[LWS_PRE] = static_cast(channel); - (void)memcpy(&buf[LWS_PRE + 1], (void *)data, len); + (void)memcpy(&buf[LWS_PRE + 1], const_cast(data), len); // push back to message list if (session->PushMessage(buf) != 0) { @@ -594,7 +664,7 @@ void DoWriteToClient(session_data *session, ssize_t WsWriteToClient(void *context, const void *data, size_t len, WebsocketChannel channel) { - auto *lwsCtx = static_cast(context); + auto *lwsCtx = static_cast(context); // CloseWsSession wait IOCopy finished, and then delete session in m_wsis // So don't need rdlock m_wsis here @@ -605,7 +675,7 @@ ssize_t WsWriteToClient(void *context, const void *data, size_t len, WebsocketCh DoWriteToClient(lwsCtx, data, len, channel); return static_cast(len); } -}; +}; // namespace ssize_t WsWriteStdoutToClient(void *context, const void *data, size_t len) { @@ -636,12 +706,12 @@ int closeWsConnect(void *context, char **err) return -1; } - auto *lwsCtx = static_cast(context); + auto *lwsCtx = static_cast(context); lwsCtx->CloseSession(); - if (lwsCtx->sync_close_sem != nullptr) { - (void)sem_post(lwsCtx->sync_close_sem); + if (lwsCtx->syncCloseSem != nullptr) { + (void)sem_post(lwsCtx->syncCloseSem); } return 0; diff --git a/src/daemon/entry/cri/websocket/service/ws_server.h b/src/daemon/entry/cri/websocket/service/ws_server.h index 3ab8e22f..2d3bb4a7 100644 --- a/src/daemon/entry/cri/websocket/service/ws_server.h +++ b/src/daemon/entry/cri/websocket/service/ws_server.h @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved. + * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved. * iSulad licensed under the Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. * You may obtain a copy of Mulan PSL v2 at: @@ -17,7 +17,6 @@ #define DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_WS_SERVER_H #include #include -#include #include #include #include @@ -29,140 +28,40 @@ #include "url.h" #include "errors.h" #include "read_write_lock.h" -#include "isula_libutils/cri_terminal_size.h" -#define MAX_ECHO_PAYLOAD 4096 -#define MAX_ARRAY_LEN 2 -#define MAX_BUF_LEN 256 -#define MAX_PROTOCOL_NUM 2 -#define MAX_HTTP_HEADER_POOL 8 -#define MIN_VEC_SIZE 3 -#define PIPE_FD_NUM 2 -#define BUF_BASE_SIZE 1024 -#define LWS_TIMEOUT 50 -// io copy maximum single transfer 4K, let max total buffer size: 1GB -#define FIFO_LIST_BUFFER_MAX_LEN 262144 -#define SESSION_CAPABILITY 300 -#define MAX_SESSION_NUM 128 +namespace +{ +const int MAX_ECHO_PAYLOAD = 4096; +const int MAX_ARRAY_LEN = 2; +const int MAX_PROTOCOL_NUM = 2; +}; // namespace -enum WebsocketChannel { - STDINCHANNEL = 0, - STDOUTCHANNEL, - STDERRCHANNEL, - ERRORCHANNEL, - RESIZECHANNEL -}; - -struct session_data { +struct SessionData { std::array pipes; volatile bool close; - std::mutex *session_mutex; - sem_t *sync_close_sem; + std::mutex *sessionMutex; + sem_t *syncCloseSem; std::list buffer; - std::string container_id; + std::string containerID; std::string suffix; - unsigned char *FrontMessage() - { - unsigned char *message = nullptr; - - if (session_mutex == nullptr) { - return nullptr; - } - - session_mutex->lock(); - message = buffer.front(); - session_mutex->unlock(); - - return message; - } - - void PopMessage() - { - if (session_mutex == nullptr) { - return; - } - - session_mutex->lock(); - buffer.pop_front(); - session_mutex->unlock(); - } - - int PushMessage(unsigned char *message) - { - if (session_mutex == nullptr) { - return -1; - } - - session_mutex->lock(); - - // In extreme scenarios, websocket data cannot be processed, - // ignore the data coming in later to prevent iSulad from getting stuck - if (close || buffer.size() >= FIFO_LIST_BUFFER_MAX_LEN) { - free(message); - session_mutex->unlock(); - return -1; - } - - buffer.push_back(message); - session_mutex->unlock(); - return 0; - } - - bool IsClosed() - { - bool c = false; - - if (session_mutex == nullptr) { - return true; - } - - session_mutex->lock(); - c = close; - session_mutex->unlock(); - - return c; - } - - void CloseSession() - { - if (session_mutex == nullptr) { - return; - } - - session_mutex->lock(); - close = true; - session_mutex->unlock(); - } - - void EraseAllMessage() - { - if (session_mutex == nullptr) { - return; - } - - session_mutex->lock(); - for (auto iter = buffer.begin(); iter != buffer.end();) { - free(*iter); - *iter = NULL; - iter = buffer.erase(iter); - } - session_mutex->unlock(); - } + unsigned char *FrontMessage(); + void PopMessage(); + int PushMessage(unsigned char *message); + bool IsClosed(); + void CloseSession(); + void EraseAllMessage(); }; class WebsocketServer { public: static WebsocketServer *GetInstance() noexcept; - static std::atomic m_instance; void Start(Errors &err); void Wait(); void Shutdown(); void RegisterCallback(const std::string &path, std::shared_ptr callback); url::URLDatum GetWebsocketUrl(); void SetLwsSendedFlag(int socketID, bool sended); - void ReadLockAllWsSession(); - void UnlockAllWsSession(); private: WebsocketServer(); @@ -171,33 +70,41 @@ private: virtual ~WebsocketServer(); int InitRWPipe(int read_fifo[]); std::vector split(std::string str, char r); - static void EmitLog(int level, const char *line); + int CreateContext(); inline void Receive(int socketID, void *in, size_t len); - int Wswrite(struct lws *wsi, const unsigned char *message); + int Wswrite(struct lws *wsi, const unsigned char *message); inline void DumpHandshakeInfo(struct lws *wsi) noexcept; int RegisterStreamTask(struct lws *wsi) noexcept; - int GenerateSessionData(session_data *session, const std::string containerID) noexcept; - static int Callback(struct lws *wsi, enum lws_callback_reasons reason, - void *user, void *in, size_t len); + int GenerateSessionData(SessionData *session, const std::string containerID) noexcept; void ServiceWorkThread(int threadid); void CloseWsSession(int socketID); void CloseAllWsSession(); - int ResizeTerminal(int socketID, const char *jsonData, size_t len, - const std::string &containerID, const std::string &suffix); - int parseTerminalSize(const char *jsonData, size_t len, uint16_t &width, uint16_t &height); + int ResizeTerminal(int socketID, const char *jsonData, size_t len, const std::string &containerID, + const std::string &suffix); + int ParseTerminalSize(const char *jsonData, size_t len, uint16_t &width, uint16_t &height); + +private: + // redirect libwebsockets logs to iSulad + static void EmitLog(int level, const char *line); + // libwebsockets Callback function + static int Callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); private: + static std::atomic m_instance; static RWMutex m_mutex; static struct lws_context *m_context; - volatile int m_force_exit = 0; - std::thread m_pthread_service; - const struct lws_protocols m_protocols[MAX_PROTOCOL_NUM] = { - { "channel.k8s.io", Callback, 0, MAX_ECHO_PAYLOAD, }, - { nullptr, nullptr, 0, 0 } - }; + volatile int m_forceExit = 0; + std::thread m_pthreadService; + const struct lws_protocols m_protocols[MAX_PROTOCOL_NUM] = { { + "channel.k8s.io", + Callback, + 0, + MAX_ECHO_PAYLOAD, + }, + { nullptr, nullptr, 0, 0 } }; RouteCallbackRegister m_handler; - static std::unordered_map m_wsis; + static std::unordered_map m_wsis; url::URLDatum m_url; int m_listenPort; }; @@ -207,4 +114,3 @@ ssize_t WsWriteStderrToClient(void *context, const void *data, size_t len); int closeWsConnect(void *context, char **err); #endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_WS_SERVER_H - -- 2.25.1