diff --git a/0016-fix-mem-leak.patch b/0016-fix-mem-leak.patch new file mode 100644 index 0000000..4394b83 --- /dev/null +++ b/0016-fix-mem-leak.patch @@ -0,0 +1,42 @@ +From b97bdc9e63872bef2164a3b97ab837ac607ccf16 Mon Sep 17 00:00:00 2001 +From: gaohuatao +Date: Fri, 3 Dec 2021 16:36:18 +0800 +Subject: [PATCH] fix mem leak + +Signed-off-by: gaohuatao +--- + src/daemon/modules/image/oci/storage/image_store/image_store.c | 3 ++- + .../modules/image/oci/storage/rootfs_store/rootfs_store.c | 3 ++- + 2 files changed, 4 insertions(+), 2 deletions(-) + +diff --git a/src/daemon/modules/image/oci/storage/image_store/image_store.c b/src/daemon/modules/image/oci/storage/image_store/image_store.c +index d2956114..288d7bd7 100644 +--- a/src/daemon/modules/image/oci/storage/image_store/image_store.c ++++ b/src/daemon/modules/image/oci/storage/image_store/image_store.c +@@ -2980,7 +2980,8 @@ static int append_image_by_directory(const char *image_dir) + im = storage_image_parse_file(image_path, NULL, &err); + if (im == NULL) { + ERROR("Failed to parse images path: %s", err); +- return -1; ++ ret = -1; ++ goto out; + } + + ret = strip_default_hostname(im); +diff --git a/src/daemon/modules/image/oci/storage/rootfs_store/rootfs_store.c b/src/daemon/modules/image/oci/storage/rootfs_store/rootfs_store.c +index 8e1d5a11..378d1a96 100644 +--- a/src/daemon/modules/image/oci/storage/rootfs_store/rootfs_store.c ++++ b/src/daemon/modules/image/oci/storage/rootfs_store/rootfs_store.c +@@ -177,7 +177,8 @@ static int append_container_by_directory(const char *container_dir) + c = storage_rootfs_parse_file(container_path, NULL, &err); + if (c == NULL) { + ERROR("Failed to parse container path: %s", err); +- return -1; ++ ret = -1; ++ goto out; + } + + if (do_append_container(c) != 0) { +-- +2.25.1 + diff --git a/0017-isula-pull-does-not-support-format-name-digest.patch b/0017-isula-pull-does-not-support-format-name-digest.patch new file mode 100644 index 0000000..92fdfb1 --- /dev/null +++ b/0017-isula-pull-does-not-support-format-name-digest.patch @@ -0,0 +1,26 @@ +From de8ef6a226fdbee53975d6d746a065a24a98ea05 Mon Sep 17 00:00:00 2001 +From: WangFengTu +Date: Mon, 6 Dec 2021 11:07:36 +0800 +Subject: [PATCH] isula pull does not support format name@digest + +Signed-off-by: WangFengTu +--- + src/cmd/isula/images/pull.c | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/src/cmd/isula/images/pull.c b/src/cmd/isula/images/pull.c +index 3ba7a715..da9cae52 100644 +--- a/src/cmd/isula/images/pull.c ++++ b/src/cmd/isula/images/pull.c +@@ -25,7 +25,7 @@ + #include "connect.h" + + const char g_cmd_pull_desc[] = "Pull an image or a repository from a registry"; +-const char g_cmd_pull_usage[] = "pull [OPTIONS] NAME[:TAG|@DIGEST]"; ++const char g_cmd_pull_usage[] = "pull [OPTIONS] NAME[:TAG]"; + + struct client_arguments g_cmd_pull_args = {}; + +-- +2.25.1 + diff --git a/0018-Fixed-dangerous-memory-operations.patch b/0018-Fixed-dangerous-memory-operations.patch new file mode 100644 index 0000000..e89d58b --- /dev/null +++ b/0018-Fixed-dangerous-memory-operations.patch @@ -0,0 +1,144 @@ +From 6f337131977c21966cf7a6898cfc81414c07cf05 Mon Sep 17 00:00:00 2001 +From: chengzrz +Date: Mon, 6 Dec 2021 15:34:31 +0800 +Subject: [PATCH] Fixed dangerous memory operations + +Signed-off-by: chengzrz +--- + .../cri_pod_sandbox_manager_service_impl.cc | 3 +- + .../executor/container_cb/execution_create.c | 5 +++ + src/utils/cutils/utils_network.c | 43 +++++++++++++------ + test/mocks/namespace_mock.h | 3 +- + 4 files changed, 40 insertions(+), 14 deletions(-) + +diff --git a/src/daemon/entry/cri/cri_pod_sandbox_manager_service_impl.cc b/src/daemon/entry/cri/cri_pod_sandbox_manager_service_impl.cc +index 0a577849..57297287 100644 +--- a/src/daemon/entry/cri/cri_pod_sandbox_manager_service_impl.cc ++++ b/src/daemon/entry/cri/cri_pod_sandbox_manager_service_impl.cc +@@ -477,7 +477,7 @@ void PodSandboxManagerServiceImpl::SetupSandboxNetwork(const runtime::v1alpha2:: + { + std::map stdAnnos; + std::map networkOptions; +- const char* sandbox_key = get_sandbox_key(inspect_data); ++ char* sandbox_key = get_sandbox_key(inspect_data); + + // Setup sandbox files + if (config.has_dns_config() && inspect_data->resolv_conf_path != nullptr) { +@@ -510,6 +510,7 @@ void PodSandboxManagerServiceImpl::SetupSandboxNetwork(const runtime::v1alpha2:: + } + + cleanup: ++ free(sandbox_key); + return; + } + +diff --git a/src/daemon/executor/container_cb/execution_create.c b/src/daemon/executor/container_cb/execution_create.c +index 95a7d9ab..e647ca06 100644 +--- a/src/daemon/executor/container_cb/execution_create.c ++++ b/src/daemon/executor/container_cb/execution_create.c +@@ -1421,6 +1421,11 @@ static char *new_pod_sandbox_key(void) + + static int generate_network_settings(const host_config *host_config, container_config_v2_common_config *v2_spec) + { ++ if (host_config == NULL || v2_spec == NULL) { ++ ERROR("Invalid input"); ++ return -1; ++ } ++ + container_config_v2_common_config_network_settings *settings = NULL; + + if (!namespace_is_file(host_config->network_mode)) { +diff --git a/src/utils/cutils/utils_network.c b/src/utils/cutils/utils_network.c +index a5d77c93..1ca901ea 100644 +--- a/src/utils/cutils/utils_network.c ++++ b/src/utils/cutils/utils_network.c +@@ -65,26 +65,34 @@ out: + return ret; + } + +-static void mount_netns(void *netns_path) ++static void* mount_netns(void *netns_path) + { +- int failure = EXIT_FAILURE; +- int success = EXIT_SUCCESS; ++ int *ecode = (int *)malloc(sizeof(int)); + char fullpath[PATH_MAX] = { 0x00 }; + int ret = 0; + + if (unshare(CLONE_NEWNET) != 0) { +- pthread_exit((void *)&failure); ++ ERROR("Failed to unshare"); ++ goto err_out; + } + + ret = snprintf(fullpath, sizeof(fullpath), "/proc/%d/task/%ld/ns/net", getpid(), (long int)syscall(__NR_gettid)); + if (ret < 0 || (size_t)ret >= sizeof(fullpath)) { +- pthread_exit((void *)&failure); ++ ERROR("Failed to get full path"); ++ goto err_out; + } + + if (util_mount(fullpath, (char *)netns_path, "none", "bind") != 0) { +- pthread_exit((void *)&failure); ++ ERROR("Failed to mount %s", fullpath); ++ goto err_out; + } +- pthread_exit((void *)&success); ++ ++ *ecode = EXIT_SUCCESS; ++ pthread_exit((void *)ecode); ++ ++err_out: ++ *ecode = EXIT_FAILURE; ++ pthread_exit((void *)ecode); + } + + // this function mounts netns path to /proc/%d/task/%d/ns/net +@@ -103,14 +111,25 @@ int util_mount_namespace(const char *netns_path) + ret = pthread_join(newns_thread, &status); + if (ret != 0) { + ERROR("Failed to join thread"); ++ ret = -1; ++ goto out; ++ } ++ ++ if (status == NULL) { ++ ERROR("Failed set exit status"); + return -1; ++ } ++ ++ if (*(int *)status != 0) { ++ ERROR("Failed to initialize network namespace, status code is %d", *(int *)status); ++ ret = -1; + } else { +- if (*(int *)status != 0) { +- ERROR("Failed to initialize network namespace"); +- return -1; +- } ++ ret = 0; + } +- return 0; ++ ++out: ++ free(status); ++ return ret; + } + + int util_umount_namespace(const char *netns_path) +diff --git a/test/mocks/namespace_mock.h b/test/mocks/namespace_mock.h +index 80e75b0b..5bfc2c70 100644 +--- a/test/mocks/namespace_mock.h ++++ b/test/mocks/namespace_mock.h +@@ -26,7 +26,8 @@ public: + MOCK_METHOD1(ConnectedContainer, char *(const char *mode)); + MOCK_METHOD3(GetShareNamespacePath, int(const char *type, const char *src_path, char **dest_path)); + MOCK_METHOD1(GetContainerProcessLabel, char *(const char *path)); +- MOCK_METHOD4(GetNetworkNamespacePath, int(const host_config *, const container_config_v2_common_config_network_settings *, const char *, char **)); ++ MOCK_METHOD4(GetNetworkNamespacePath, int(const host_config *, ++ const container_config_v2_common_config_network_settings *, const char *, char **)); + }; + + void MockNamespace_SetMock(MockNamespace *mock); +-- +2.25.1 + diff --git a/0019-add-pull-request-gateway-checker-for-build-and-ut.patch b/0019-add-pull-request-gateway-checker-for-build-and-ut.patch new file mode 100644 index 0000000..6dbee9c --- /dev/null +++ b/0019-add-pull-request-gateway-checker-for-build-and-ut.patch @@ -0,0 +1,97 @@ +From 5a9ab3c983158c8848868e92d5a06fbd7bfc9141 Mon Sep 17 00:00:00 2001 +From: haozi007 +Date: Mon, 6 Dec 2021 09:26:40 +0000 +Subject: [PATCH] add pull request gateway checker for build and ut + +Signed-off-by: haozi007 +--- + CI/pr-gateway.sh | 77 ++++++++++++++++++++++++++++++++++++++++++++++++ + 1 file changed, 77 insertions(+) + create mode 100755 CI/pr-gateway.sh + +diff --git a/CI/pr-gateway.sh b/CI/pr-gateway.sh +new file mode 100755 +index 00000000..c38059b9 +--- /dev/null ++++ b/CI/pr-gateway.sh +@@ -0,0 +1,77 @@ ++#!/bin/bash ++####################################################################### ++##- @Copyright (C) Huawei Technologies., Ltd. 2021. All rights reserved. ++# - iSulad licensed under the Mulan PSL v2. ++# - You can use this software according to the terms and conditions of the Mulan PSL v2. ++# - You may obtain a copy of Mulan PSL v2 at: ++# - http://license.coscl.org.cn/MulanPSL2 ++# - THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# - IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# - PURPOSE. ++# - See the Mulan PSL v2 for more details. ++##- @Description:provide gateway checker for pull request of iSulad ++##- @Author: haozi007 ++##- @Create: 2021-12-06 ++####################################################################### ++tbranch="master" ++if [ $# -eq 1 ]; then ++ tbranch=$1 ++fi ++ ++dnf install -y gtest-devel gmock-devel diffutils cmake gcc-c++ yajl-devel patch make libtool libevent-devel libevhtp-devel grpc grpc-plugins grpc-devel protobuf-devel libcurl libcurl-devel sqlite-devel libarchive-devel device-mapper-devel http-parser-devel libseccomp-devel libcap-devel libselinux-devel libwebsockets libwebsockets-devel systemd-devel git chrpath ++ ++# dnf install -y cargo rust rust-packaging ++ ++cd ~ ++ ++rm -rf lxc ++git clone https://gitee.com/src-openeuler/lxc.git ++pushd lxc ++rm -rf lxc-4.0.3 ++./apply-patches || exit 1 ++pushd lxc-4.0.3 ++./autogen.sh && ./configure || exit 1 ++make -j $(nproc) || exit 1 ++make install ++popd ++popd ++ ++ldconfig ++rm -rf lcr ++git clone https://gitee.com/openeuler/lcr.git ++pushd lcr ++git checkout ${tbranch} ++rm -rf build ++mkdir build ++pushd build ++cmake -DDEBUG=ON -DCMAKE_SKIP_RPATH=TRUE ../ || exit 1 ++make -j $(nproc) || exit 1 ++make install ++popd ++popd ++ ++ldconfig ++rm -rf clibcni ++git clone https://gitee.com/openeuler/clibcni.git ++pushd clibcni ++git checkout ${tbranch} ++rm -rf build ++mkdir build ++pushd build ++cmake -DDEBUG=ON ../ || exit 1 ++make -j $(nproc) || exit 1 ++make install ++popd ++popd ++ ++ldconfig ++pushd iSulad ++rm -rf build ++mkdir build ++pushd build ++cmake -DDEBUG=ON -DCMAKE_INSTALL_PREFIX=/usr -DENABLE_UT=ON -DENABLE_SHIM_V2=OFF ../ || exit 1 ++#cmake -DDEBUG=ON -DCMAKE_INSTALL_PREFIX=/usr -DENABLE_UT=ON -DENABLE_SHIM_V2=ON ../ || exit 1 ++make -j $(nproc) || exit 1 ++ctest -V ++popd ++popd +-- +2.25.1 + diff --git a/0020-Optimize-websocket-streaming-service-code.patch b/0020-Optimize-websocket-streaming-service-code.patch new file mode 100644 index 0000000..bd9d764 --- /dev/null +++ b/0020-Optimize-websocket-streaming-service-code.patch @@ -0,0 +1,1901 @@ +From 3d5ad2160b9fe779433cce497bfa0cb0146bdcb3 Mon Sep 17 00:00:00 2001 +From: wujing +Date: Sat, 4 Dec 2021 21:20:25 +0800 +Subject: [PATCH] Optimize websocket streaming service code + +Signed-off-by: wujing +--- + .../cri/cri_container_manager_service_impl.cc | 28 +- + src/daemon/entry/cri/request_cache.cc | 123 ++----- + src/daemon/entry/cri/request_cache.h | 32 +- + .../cri/websocket/service/attach_serve.cc | 102 +++--- + .../cri/websocket/service/attach_serve.h | 28 +- + .../entry/cri/websocket/service/exec_serve.cc | 158 +++++---- + .../entry/cri/websocket/service/exec_serve.h | 27 +- + .../service/route_callback_register.cc | 80 +++++ + .../service/route_callback_register.h | 61 ++-- + .../cri/websocket/service/stream_server.cc | 6 +- + .../cri/websocket/service/stream_server.h | 4 +- + .../entry/cri/websocket/service/ws_server.cc | 300 +++++++++++------- + .../entry/cri/websocket/service/ws_server.h | 174 +++------- + 13 files changed, 542 insertions(+), 581 deletions(-) + create mode 100644 src/daemon/entry/cri/websocket/service/route_callback_register.cc + +diff --git a/src/daemon/entry/cri/cri_container_manager_service_impl.cc b/src/daemon/entry/cri/cri_container_manager_service_impl.cc +index 2e65ab51..b160ce31 100644 +--- a/src/daemon/entry/cri/cri_container_manager_service_impl.cc ++++ b/src/daemon/entry/cri/cri_container_manager_service_impl.cc +@@ -376,7 +376,8 @@ ContainerManagerServiceImpl::GenerateCreateContainerRequest(const std::string &r + hostconfig->cgroup_parent = util_strdup_s(podSandboxConfig.linux().cgroup_parent().c_str()); + } + +- custom_config = GenerateCreateContainerCustomConfig(cname, realPodSandboxID, containerConfig, podSandboxConfig, error); ++ custom_config = ++ GenerateCreateContainerCustomConfig(cname, realPodSandboxID, containerConfig, podSandboxConfig, error); + if (error.NotEmpty()) { + goto cleanup; + } +@@ -409,11 +410,10 @@ cleanup: + return request; + } + +-std::string ContainerManagerServiceImpl::CreateContainer( +- const std::string &podSandboxID, +- const runtime::v1alpha2::ContainerConfig &containerConfig, +- const runtime::v1alpha2::PodSandboxConfig &podSandboxConfig, +- Errors &error) ++std::string ContainerManagerServiceImpl::CreateContainer(const std::string &podSandboxID, ++ const runtime::v1alpha2::ContainerConfig &containerConfig, ++ const runtime::v1alpha2::PodSandboxConfig &podSandboxConfig, ++ Errors &error) + { + std::string response_id; + std::string podSandboxRuntime; +@@ -1305,10 +1305,16 @@ void ContainerManagerServiceImpl::Exec(const runtime::v1alpha2::ExecRequest &req + if (ValidateExecRequest(req, error) != 0) { + return; + } ++ auto execReq = new (std::nothrow) runtime::v1alpha2::ExecRequest(req); ++ if (execReq == nullptr) { ++ error.SetError("out of memory"); ++ return; ++ } + RequestCache *cache = RequestCache::GetInstance(); +- std::string token = cache->InsertExecRequest(req); ++ std::string token = cache->InsertRequest(req.container_id(), execReq); + if (token.empty()) { + error.SetError("failed to get a unique token!"); ++ delete execReq; + return; + } + std::string url = BuildURL("exec", token); +@@ -1350,10 +1356,16 @@ void ContainerManagerServiceImpl::Attach(const runtime::v1alpha2::AttachRequest + error.SetError("Empty attach response arguments"); + return; + } ++ auto attachReq = new (std::nothrow) runtime::v1alpha2::AttachRequest(req); ++ if (attachReq == nullptr) { ++ error.SetError("out of memory"); ++ return; ++ } + RequestCache *cache = RequestCache::GetInstance(); +- std::string token = cache->InsertAttachRequest(req); ++ std::string token = cache->InsertRequest(req.container_id(), attachReq); + if (token.empty()) { + error.SetError("failed to get a unique token!"); ++ delete attachReq; + return; + } + std::string url = BuildURL("attach", token); +diff --git a/src/daemon/entry/cri/request_cache.cc b/src/daemon/entry/cri/request_cache.cc +index 4ff284ab..312a8071 100644 +--- a/src/daemon/entry/cri/request_cache.cc ++++ b/src/daemon/entry/cri/request_cache.cc +@@ -1,5 +1,5 @@ + /****************************************************************************** +- * Copyright (c) Huawei Technologies Co., Ltd. 2017-2019. All rights reserved. ++ * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved. + * iSulad licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: +@@ -8,8 +8,8 @@ + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. +- * Author: tanyifeng +- * Create: 2017-11-22 ++ * Author: wujing ++ * Create: 2019-01-02 + * Description: provide request cache function definition + *********************************************************************************/ + #include "request_cache.h" +@@ -19,15 +19,25 @@ + #include + #include + #include +-#include "isula_libutils/log.h" ++#include + #include "utils.h" + #include "utils_base64.h" + + std::atomic RequestCache::m_instance; + std::mutex RequestCache::m_mutex; ++ ++void CacheEntry::SetValue(const std::string &t, const std::string &id, ::google::protobuf::Message *request, ++ std::chrono::system_clock::time_point et) ++{ ++ token = t; ++ containerID = id; ++ req = request; ++ expireTime = et; ++} ++ + RequestCache *RequestCache::GetInstance() noexcept + { +- RequestCache *cache = m_instance.load(std::memory_order_relaxed); ++ auto *cache = m_instance.load(std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_acquire); + if (cache == nullptr) { + std::lock_guard lock(m_mutex); +@@ -41,25 +51,7 @@ RequestCache *RequestCache::GetInstance() noexcept + return cache; + } + +-std::string RequestCache::InsertExecRequest(const runtime::v1alpha2::ExecRequest &req) +-{ +- std::lock_guard lock(m_mutex); +- // Remove expired entries. +- GarbageCollection(); +- // If the cache is full, reject the request. +- if (m_ll.size() == MaxInFlight) { +- ERROR("too many cache in flight!"); +- return ""; +- } +- auto token = UniqueToken(); +- CacheEntry tmp; +- tmp.SetValue(token, &req, nullptr, std::chrono::system_clock::now() + std::chrono::minutes(1)); +- m_ll.push_front(tmp); +- m_tokens.insert(std::make_pair(token, tmp)); +- return token; +-} +- +-std::string RequestCache::InsertAttachRequest(const runtime::v1alpha2::AttachRequest &req) ++std::string RequestCache::InsertRequest(const std::string &containerID, ::google::protobuf::Message *req) + { + std::lock_guard lock(m_mutex); + // Remove expired entries. +@@ -71,7 +63,7 @@ std::string RequestCache::InsertAttachRequest(const runtime::v1alpha2::AttachReq + } + auto token = UniqueToken(); + CacheEntry tmp; +- tmp.SetValue(token, nullptr, &req, std::chrono::system_clock::now() + std::chrono::minutes(1)); ++ tmp.SetValue(token, containerID, req, std::chrono::system_clock::now() + std::chrono::minutes(1)); + m_ll.push_front(tmp); + m_tokens.insert(std::make_pair(token, tmp)); + return token; +@@ -81,10 +73,14 @@ void RequestCache::GarbageCollection() + { + auto now = std::chrono::system_clock::now(); + while (!m_ll.empty()) { +- CacheEntry oldest = m_ll.back(); ++ auto oldest = m_ll.back(); + if (now < oldest.expireTime) { + return; + } ++ if (oldest.req != nullptr) { ++ delete oldest.req; ++ oldest.req = nullptr; ++ } + m_ll.pop_back(); + m_tokens.erase(oldest.token); + } +@@ -103,15 +99,15 @@ std::string RequestCache::UniqueToken() + continue; + } + +- char *b64_encode_buf = nullptr; +- if (util_base64_encode((unsigned char *)rawToken, strlen(rawToken), &b64_encode_buf) < 0) { ++ char *b64EncodeBuf = nullptr; ++ if (util_base64_encode((unsigned char *)rawToken, strlen(rawToken), &b64EncodeBuf) < 0) { + ERROR("Encode raw token to base64 failed"); + continue; + } + +- std::string token(b64_encode_buf); +- free(b64_encode_buf); +- b64_encode_buf = nullptr; ++ std::string token(b64EncodeBuf); ++ free(b64EncodeBuf); ++ b64EncodeBuf = nullptr; + if (token.length() != TokenLen) { + continue; + } +@@ -133,37 +129,13 @@ bool RequestCache::IsValidToken(const std::string &token) + } + + // Consume the token (remove it from the cache) and return the cached request, if found. +-runtime::v1alpha2::ExecRequest RequestCache::ConsumeExecRequest(const std::string &token) +-{ +- std::lock_guard lock(m_mutex); +- +- if (m_tokens.count(token) == 0 || m_tokens[token].execRequest.size() == 0) { +- ERROR("Invalid token"); +- return runtime::v1alpha2::ExecRequest(); +- } +- +- CacheEntry ele = m_tokens[token]; +- for (auto it = m_ll.begin(); it != m_ll.end(); it++) { +- if (it->token == token) { +- m_ll.erase(it); +- break; +- } +- } +- m_tokens.erase(token); +- if (std::chrono::system_clock::now() > ele.expireTime) { +- return runtime::v1alpha2::ExecRequest(); +- } +- +- return ele.execRequest.at(0); +-} +- +-runtime::v1alpha2::AttachRequest RequestCache::ConsumeAttachRequest(const std::string &token) ++::google::protobuf::Message *RequestCache::ConsumeRequest(const std::string &token) + { + std::lock_guard lock(m_mutex); + +- if (m_tokens.count(token) == 0 || m_tokens[token].attachRequest.size() == 0) { ++ if (m_tokens.count(token) == 0) { + ERROR("Invalid token"); +- return runtime::v1alpha2::AttachRequest(); ++ return nullptr; + } + + CacheEntry ele = m_tokens[token]; +@@ -175,45 +147,20 @@ runtime::v1alpha2::AttachRequest RequestCache::ConsumeAttachRequest(const std::s + } + m_tokens.erase(token); + if (std::chrono::system_clock::now() > ele.expireTime) { +- return runtime::v1alpha2::AttachRequest(); +- } +- +- return ele.attachRequest.at(0); +-} +- +-std::string RequestCache::GetExecContainerIDByToken(const std::string &token) +-{ +- std::lock_guard lock(m_mutex); +- +- if (m_tokens.count(token) == 0 || m_tokens[token].execRequest.size() == 0) { +- ERROR("Invalid token"); +- return ""; ++ return nullptr; + } + +- return m_tokens[token].execRequest.at(0).container_id(); ++ return ele.req; + } + +-std::string RequestCache::GetAttachContainerIDByToken(const std::string &token) ++std::string RequestCache::GetContainerIDByToken(const std::string &token) + { + std::lock_guard lock(m_mutex); + +- if (m_tokens.count(token) == 0 || m_tokens[token].attachRequest.size() == 0) { ++ if (m_tokens.count(token) == 0) { + ERROR("Invalid token"); + return ""; + } + +- return m_tokens[token].attachRequest.at(0).container_id(); +-} +- +-std::string RequestCache::GetContainerIDByToken(const std::string &method, const std::string &token) +-{ +- if (method == "exec") { +- return GetExecContainerIDByToken(token); +- } else if (method == "attach") { +- return GetAttachContainerIDByToken(token); +- } +- +- ERROR("Invalid method: %s", method.c_str()); +- +- return ""; ++ return m_tokens[token].containerID; + } +diff --git a/src/daemon/entry/cri/request_cache.h b/src/daemon/entry/cri/request_cache.h +index d44b4d78..90ae20e8 100644 +--- a/src/daemon/entry/cri/request_cache.h ++++ b/src/daemon/entry/cri/request_cache.h +@@ -1,5 +1,5 @@ + /****************************************************************************** +- * Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved. ++ * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved. + * iSulad licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: +@@ -23,44 +23,28 @@ + #include + #include + #include +-#include "api.pb.h" + + struct CacheEntry { + std::string token; +- std::vector execRequest; +- std::vector attachRequest; ++ std::string containerID; ++ ::google::protobuf::Message *req; + std::chrono::system_clock::time_point expireTime; + +- void SetValue(const std::string &t, +- const runtime::v1alpha2::ExecRequest *execReq, +- const runtime::v1alpha2::AttachRequest *attachReq, +- std::chrono::system_clock::time_point et) +- { +- token = t; +- if (execReq != nullptr) { +- execRequest.push_back(*execReq); +- } else if (attachReq != nullptr) { +- attachRequest.push_back(*attachReq); +- } +- expireTime = et; +- } ++ void SetValue(const std::string &t, const std::string &id, ::google::protobuf::Message *request, ++ std::chrono::system_clock::time_point et); + }; + + class RequestCache { + public: + static RequestCache *GetInstance() noexcept; +- std::string InsertExecRequest(const runtime::v1alpha2::ExecRequest &req); +- std::string InsertAttachRequest(const runtime::v1alpha2::AttachRequest &req); +- runtime::v1alpha2::ExecRequest ConsumeExecRequest(const std::string &token); +- runtime::v1alpha2::AttachRequest ConsumeAttachRequest(const std::string &token); +- std::string GetContainerIDByToken(const std::string &method, const std::string &token); ++ std::string InsertRequest(const std::string &containerID, ::google::protobuf::Message *req); ++ ::google::protobuf::Message *ConsumeRequest(const std::string &token); ++ std::string GetContainerIDByToken(const std::string &token); + bool IsValidToken(const std::string &token); + + private: + void GarbageCollection(); + std::string UniqueToken(); +- std::string GetExecContainerIDByToken(const std::string &token); +- std::string GetAttachContainerIDByToken(const std::string &token); + + private: + RequestCache() = default; +diff --git a/src/daemon/entry/cri/websocket/service/attach_serve.cc b/src/daemon/entry/cri/websocket/service/attach_serve.cc +index cda63c45..abe23f51 100644 +--- a/src/daemon/entry/cri/websocket/service/attach_serve.cc ++++ b/src/daemon/entry/cri/websocket/service/attach_serve.cc +@@ -1,5 +1,5 @@ + /****************************************************************************** +- * Copyright (c) Huawei Technologies Co., Ltd. 2018-2019. All rights reserved. ++ * Copyright (c) Huawei Technologies Co., Ltd. 2018-2021. All rights reserved. + * iSulad licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: +@@ -8,86 +8,78 @@ + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. +- * Author: lifeng ++ * Author: wujing + * Create: 2018-11-08 + * Description: provide container attach functions + ******************************************************************************/ + + #include "attach_serve.h" ++#include "api.pb.h" ++#include "ws_server.h" ++#include "isula_libutils/log.h" ++#include "callback.h" + #include "utils.h" + +-int AttachServe::Execute(session_data *lws_ctx, const std::string &token) ++AttachServe::~AttachServe() + { +- if (lws_ctx == nullptr) { +- return -1; +- } ++ free_container_attach_request(m_request); ++ free_container_attach_response(m_response); ++} + ++void AttachServe::SetServeThreadName() ++{ + prctl(PR_SET_NAME, "AttachServe"); ++} + +- service_executor_t *cb = get_service_executor(); +- if (cb == nullptr || cb->container.attach == nullptr) { +- sem_post(lws_ctx->sync_close_sem); ++int AttachServe::SetContainerStreamRequest(::google::protobuf::Message *request, const std::string &suffix) ++{ ++ auto *grequest = dynamic_cast(request); ++ ++ m_request = static_cast(util_common_calloc_s(sizeof(container_attach_request))); ++ if (m_request == nullptr) { ++ ERROR("Out of memory"); + return -1; + } + +- container_attach_request *container_req = nullptr; +- if (GetContainerRequest(token, &container_req) != 0) { +- ERROR("Failed to get contaner request"); +- sem_post(lws_ctx->sync_close_sem); ++ if (!grequest->container_id().empty()) { ++ m_request->container_id = util_strdup_s(grequest->container_id().c_str()); ++ } ++ m_request->attach_stdin = grequest->stdin(); ++ m_request->attach_stdout = grequest->stdout(); ++ m_request->attach_stderr = grequest->stderr(); ++ ++ return 0; ++} ++ ++int AttachServe::ExecuteStreamCommand(SessionData *lwsCtx) ++{ ++ auto *cb = get_service_executor(); ++ if (cb == nullptr || cb->container.attach == nullptr) { ++ ERROR("Failed to get attach service executor"); ++ sem_post(lwsCtx->syncCloseSem); + return -1; + } + + struct io_write_wrapper stringWriter = { 0 }; +- stringWriter.context = (void *)(lws_ctx); ++ stringWriter.context = (void *)(lwsCtx); + stringWriter.write_func = WsWriteStdoutToClient; + stringWriter.close_func = closeWsConnect; +- container_req->attach_stderr = false; +- +- container_attach_response *container_res = nullptr; +- int ret = cb->container.attach(container_req, &container_res, container_req->attach_stdin ? lws_ctx->pipes.at(0) : -1, +- container_req->attach_stdout ? &stringWriter : nullptr, nullptr); +- if (ret != 0) { +- ERROR("Failed to attach container: %s", container_req->container_id); +- sem_post(lws_ctx->sync_close_sem); +- } +- +- free_container_attach_request(container_req); +- free_container_attach_response(container_res); ++ m_request->attach_stderr = false; + +- return ret; ++ return cb->container.attach(m_request, &m_response, m_request->attach_stdin ? lwsCtx->pipes.at(0) : -1, ++ m_request->attach_stdout ? &stringWriter : nullptr, nullptr); + } + +-int AttachServe::GetContainerRequest(const std::string &token, container_attach_request **container_req) ++void AttachServe::ErrorHandler(int ret, SessionData *lwsCtx) + { +- RequestCache *cache = RequestCache::GetInstance(); +- auto request = cache->ConsumeAttachRequest(token); +- +- int ret = RequestFromCri(request, container_req); +- if (ret != 0) { +- ERROR("Failed to transform grpc request!"); ++ if (ret == 0) { ++ return; + } +- +- return ret; ++ ERROR("Failed to attach container: %s", m_request->container_id); ++ sem_post(lwsCtx->syncCloseSem); + } + +-int AttachServe::RequestFromCri(const runtime::v1alpha2::AttachRequest &grequest, container_attach_request **request) ++void AttachServe::CloseConnect(SessionData *lwsCtx) + { +- container_attach_request *tmpreq = nullptr; +- +- tmpreq = (container_attach_request *)util_common_calloc_s(sizeof(container_attach_request)); +- if (tmpreq == nullptr) { +- ERROR("Out of memory"); +- return -1; +- } +- +- if (!grequest.container_id().empty()) { +- tmpreq->container_id = util_strdup_s(grequest.container_id().c_str()); +- } +- tmpreq->attach_stdin = grequest.stdin(); +- tmpreq->attach_stdout = grequest.stdout(); +- tmpreq->attach_stderr = grequest.stderr(); +- +- *request = tmpreq; +- +- return 0; ++ (void)lwsCtx; + } +diff --git a/src/daemon/entry/cri/websocket/service/attach_serve.h b/src/daemon/entry/cri/websocket/service/attach_serve.h +index f7b8a017..38e75e29 100644 +--- a/src/daemon/entry/cri/websocket/service/attach_serve.h ++++ b/src/daemon/entry/cri/websocket/service/attach_serve.h +@@ -1,5 +1,5 @@ + /****************************************************************************** +- * Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved. ++ * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved. + * iSulad licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: +@@ -17,27 +17,27 @@ + #define DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_ATTACH_SERVE_H + + #include "route_callback_register.h" +-#include + #include +-#include +-#include "ws_server.h" +- +-#include "api.pb.h" +-#include "isula_libutils/log.h" +-#include "callback.h" +-#include "request_cache.h" ++#include "isula_libutils/container_attach_request.h" ++#include "isula_libutils/container_attach_response.h" + + class AttachServe : public StreamingServeInterface { + public: + AttachServe() = default; + AttachServe(const AttachServe &) = delete; + AttachServe &operator=(const AttachServe &) = delete; +- virtual ~AttachServe() = default; +- int Execute(session_data *lws_ctx, const std::string &token) override; ++ virtual ~AttachServe(); ++ ++private: ++ virtual void SetServeThreadName() override; ++ virtual int SetContainerStreamRequest(::google::protobuf::Message *grequest, const std::string &suffix) override; ++ virtual int ExecuteStreamCommand(SessionData *lwsCtx) override; ++ virtual void ErrorHandler(int ret, SessionData *lwsCtx) override; ++ virtual void CloseConnect(SessionData *lwsCtx) override; ++ + private: +- int RequestFromCri(const runtime::v1alpha2::AttachRequest &grequest, +- container_attach_request **request); +- int GetContainerRequest(const std::string &token, container_attach_request **container_req); ++ container_attach_request *m_request { nullptr }; ++ container_attach_response *m_response { nullptr }; + }; + #endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_ATTACH_SERVE_H + +diff --git a/src/daemon/entry/cri/websocket/service/exec_serve.cc b/src/daemon/entry/cri/websocket/service/exec_serve.cc +index 26b552de..b7709c48 100644 +--- a/src/daemon/entry/cri/websocket/service/exec_serve.cc ++++ b/src/daemon/entry/cri/websocket/service/exec_serve.cc +@@ -1,5 +1,5 @@ + /****************************************************************************** +- * Copyright (c) Huawei Technologies Co., Ltd. 2018-2019. All rights reserved. ++ * Copyright (c) Huawei Technologies Co., Ltd. 2018-2021. All rights reserved. + * iSulad licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: +@@ -8,127 +8,111 @@ + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. +- * Author: lifeng ++ * Author: wujing + * Create: 2018-11-08 + * Description: provide ExecServe functions + ******************************************************************************/ + + #include "exec_serve.h" ++#include + #include "io_wrapper.h" ++#include "ws_server.h" + #include "utils.h" + #include "cri_helpers.h" + +-int ExecServe::Execute(session_data *lws_ctx, const std::string &token) ++ExecServe::~ExecServe() + { +- if (lws_ctx == nullptr) { +- return -1; +- } ++ free_container_exec_request(m_request); ++ free_container_exec_response(m_response); ++} + ++void ExecServe::SetServeThreadName() ++{ + prctl(PR_SET_NAME, "ExecServe"); ++} + +- service_executor_t *cb = get_service_executor(); +- if (cb == nullptr || cb->container.exec == nullptr) { +- sem_post(lws_ctx->sync_close_sem); ++int ExecServe::SetContainerStreamRequest(::google::protobuf::Message *request, const std::string &suffix) ++{ ++ auto *grequest = dynamic_cast(request); ++ ++ m_request = static_cast(util_common_calloc_s(sizeof(container_exec_request))); ++ if (m_request == nullptr) { ++ ERROR("Out of memory"); + return -1; + } + +- container_exec_request *container_req = nullptr; +- if (GetContainerRequest(token, lws_ctx->suffix, &container_req) != 0) { +- ERROR("Failed to get contaner request"); +- sem_post(lws_ctx->sync_close_sem); ++ m_request->tty = grequest->tty(); ++ m_request->attach_stdin = grequest->stdin(); ++ m_request->attach_stdout = grequest->stdout(); ++ m_request->attach_stderr = grequest->stderr(); ++ ++ if (!grequest->container_id().empty()) { ++ m_request->container_id = util_strdup_s(grequest->container_id().c_str()); ++ } ++ ++ if (grequest->cmd_size() > 0) { ++ if (static_cast(grequest->cmd_size()) > SIZE_MAX / sizeof(char *)) { ++ ERROR("Too many arguments!"); ++ return -1; ++ } ++ m_request->argv = (char **)util_common_calloc_s(sizeof(char *) * grequest->cmd_size()); ++ if (m_request->argv == nullptr) { ++ ERROR("Out of memory!"); ++ return -1; ++ } ++ for (int i = 0; i < grequest->cmd_size(); i++) { ++ m_request->argv[i] = util_strdup_s(grequest->cmd(i).c_str()); ++ } ++ m_request->argv_len = static_cast(grequest->cmd_size()); ++ } ++ ++ m_request->suffix = util_strdup_s(suffix.c_str()); ++ ++ return 0; ++} ++ ++int ExecServe::ExecuteStreamCommand(SessionData *lwsCtx) ++{ ++ auto *cb = get_service_executor(); ++ if (cb == nullptr || cb->container.exec == nullptr) { ++ ERROR("Failed to get exec service executor"); ++ sem_post(lwsCtx->syncCloseSem); + return -1; + } + + struct io_write_wrapper StdoutstringWriter = { 0 }; +- StdoutstringWriter.context = (void *)lws_ctx; ++ StdoutstringWriter.context = (void *)lwsCtx; + StdoutstringWriter.write_func = WsWriteStdoutToClient; + // the close function of StderrstringWriter is preferred unless StderrstringWriter is nullptr + StdoutstringWriter.close_func = nullptr; + struct io_write_wrapper StderrstringWriter = { 0 }; +- StderrstringWriter.context = (void *)lws_ctx; ++ StderrstringWriter.context = (void *)lwsCtx; + StderrstringWriter.write_func = WsWriteStderrToClient; + StderrstringWriter.close_func = nullptr; + +- container_exec_response *container_res = nullptr; +- int ret = cb->container.exec(container_req, &container_res, container_req->attach_stdin ? lws_ctx->pipes.at(0) : -1, +- container_req->attach_stdout ? &StdoutstringWriter : nullptr, +- container_req->attach_stderr ? &StderrstringWriter : nullptr); ++ return cb->container.exec(m_request, &m_response, m_request->attach_stdin ? lwsCtx->pipes.at(0) : -1, ++ m_request->attach_stdout ? &StdoutstringWriter : nullptr, ++ m_request->attach_stderr ? &StderrstringWriter : nullptr); ++} ++ ++void ExecServe::ErrorHandler(int ret, SessionData *lwsCtx) ++{ + if (ret != 0) { + std::string message; +- if (container_res != nullptr && container_res->errmsg != nullptr) { +- message = container_res->errmsg; ++ if (m_response != nullptr && m_response->errmsg != nullptr) { ++ message = m_response->errmsg; + } else { + message = "Failed to call exec container callback. "; + } +- WsWriteStdoutToClient(lws_ctx, message.c_str(), message.length()); ++ WsWriteStdoutToClient(lwsCtx, message.c_str(), message.length()); + } +- if (container_res != nullptr && container_res->exit_code != 0) { +- std::string exit_info = "Exit code :" + std::to_string((int)container_res->exit_code) + "\n"; +- WsWriteStdoutToClient(lws_ctx, exit_info.c_str(), exit_info.length()); ++ if (m_response != nullptr && m_response->exit_code != 0) { ++ std::string exit_info = "Exit code :" + std::to_string((int)m_response->exit_code) + "\n"; ++ WsWriteStdoutToClient(lwsCtx, exit_info.c_str(), exit_info.length()); + } +- +- free_container_exec_request(container_req); +- free_container_exec_response(container_res); +- +- closeWsConnect((void*)lws_ctx, nullptr); +- +- return ret; + } + +-int ExecServe::GetContainerRequest(const std::string &token, const std::string &suffix, +- container_exec_request **container_req) ++void ExecServe::CloseConnect(SessionData *lwsCtx) + { +- RequestCache *cache = RequestCache::GetInstance(); +- auto request = cache->ConsumeExecRequest(token); +- +- int ret = RequestFromCri(request, suffix, container_req); +- if (ret != 0) { +- ERROR("Failed to transform grpc request!"); +- } +- +- return ret; +-} +- +-int ExecServe::RequestFromCri(const runtime::v1alpha2::ExecRequest &grequest, const std::string &suffix, +- container_exec_request **request) +-{ +- container_exec_request *tmpreq = nullptr; +- +- tmpreq = (container_exec_request *)util_common_calloc_s(sizeof(container_exec_request)); +- if (tmpreq == nullptr) { +- ERROR("Out of memory"); +- return -1; +- } +- +- tmpreq->tty = grequest.tty(); +- tmpreq->attach_stdin = grequest.stdin(); +- tmpreq->attach_stdout = grequest.stdout(); +- tmpreq->attach_stderr = grequest.stderr(); +- +- if (!grequest.container_id().empty()) { +- tmpreq->container_id = util_strdup_s(grequest.container_id().c_str()); +- } +- +- if (grequest.cmd_size() > 0) { +- if ((size_t)grequest.cmd_size() > SIZE_MAX / sizeof(char *)) { +- ERROR("Too many arguments!"); +- free_container_exec_request(tmpreq); +- return -1; +- } +- tmpreq->argv = (char **)util_common_calloc_s(sizeof(char *) * grequest.cmd_size()); +- if (tmpreq->argv == nullptr) { +- ERROR("Out of memory!"); +- free_container_exec_request(tmpreq); +- return -1; +- } +- for (int i = 0; i < grequest.cmd_size(); i++) { +- tmpreq->argv[i] = util_strdup_s(grequest.cmd(i).c_str()); +- } +- tmpreq->argv_len = (size_t)grequest.cmd_size(); +- } +- +- tmpreq->suffix = util_strdup_s(suffix.c_str()); +- +- *request = tmpreq; +- return 0; ++ closeWsConnect((void*)lwsCtx, nullptr); + } +diff --git a/src/daemon/entry/cri/websocket/service/exec_serve.h b/src/daemon/entry/cri/websocket/service/exec_serve.h +index 5cccdee8..3afb2abb 100644 +--- a/src/daemon/entry/cri/websocket/service/exec_serve.h ++++ b/src/daemon/entry/cri/websocket/service/exec_serve.h +@@ -1,5 +1,5 @@ + /****************************************************************************** +- * Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved. ++ * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved. + * iSulad licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: +@@ -20,28 +20,27 @@ + #include + #include + #include +-#include +-#include "api.grpc.pb.h" +-#include "container.grpc.pb.h" + + #include "route_callback_register.h" +-#include "isula_libutils/log.h" +-#include "callback.h" +-#include "ws_server.h" +-#include "request_cache.h" +-#include "api.pb.h" ++#include "isula_libutils/container_exec_request.h" ++#include "isula_libutils/container_exec_response.h" + + class ExecServe : public StreamingServeInterface { + public: + ExecServe() = default; + ExecServe(const ExecServe &) = delete; + ExecServe &operator=(const ExecServe &) = delete; +- virtual ~ExecServe() = default; +- int Execute(session_data *lws_ctx, const std::string &token) override; ++ virtual ~ExecServe(); + + private: +- int RequestFromCri(const runtime::v1alpha2::ExecRequest &grequest, const std::string &suffix, +- container_exec_request **request); +- int GetContainerRequest(const std::string &token, const std::string &suffix, container_exec_request **request); ++ virtual void SetServeThreadName() override; ++ virtual int SetContainerStreamRequest(::google::protobuf::Message *grequest, const std::string &suffix) override; ++ virtual int ExecuteStreamCommand(SessionData *lwsCtx) override; ++ virtual void ErrorHandler(int ret, SessionData *lwsCtx) override; ++ virtual void CloseConnect(SessionData *lwsCtx) override; ++ ++private: ++ container_exec_request *m_request { nullptr }; ++ container_exec_response *m_response { nullptr }; + }; + #endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_EXEC_SERVE_H +diff --git a/src/daemon/entry/cri/websocket/service/route_callback_register.cc b/src/daemon/entry/cri/websocket/service/route_callback_register.cc +new file mode 100644 +index 00000000..fb14381f +--- /dev/null ++++ b/src/daemon/entry/cri/websocket/service/route_callback_register.cc +@@ -0,0 +1,80 @@ ++/****************************************************************************** ++ * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved. ++ * iSulad licensed under the Mulan PSL v2. ++ * You can use this software according to the terms and conditions of the Mulan PSL v2. ++ * You may obtain a copy of Mulan PSL v2 at: ++ * http://license.coscl.org.cn/MulanPSL2 ++ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++ * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++ * PURPOSE. ++ * See the Mulan PSL v2 for more details. ++ * Description: Streaming service function registration. ++ * Author: wujing ++ * Create: 2021-11-04 ++ ******************************************************************************/ ++#include "route_callback_register.h" ++#include ++#include "ws_server.h" ++ ++int StreamingServeInterface::Execute(SessionData *lwsCtx, const std::string &token) ++{ ++ if (lwsCtx == nullptr) { ++ return -1; ++ } ++ ++ SetServeThreadName(); ++ ++ auto *cache = RequestCache::GetInstance(); ++ auto request = cache->ConsumeRequest(token); ++ if (request == nullptr) { ++ ERROR("Failed to get cache request!"); ++ sem_post(lwsCtx->syncCloseSem); ++ return -1; ++ } ++ ++ if (SetContainerStreamRequest(request, lwsCtx->suffix) != 0) { ++ ERROR("Failed to set container request"); ++ sem_post(lwsCtx->syncCloseSem); ++ return -1; ++ } ++ ++ // request is stored on the heap in the cache and needs to be released after use ++ delete request; ++ request = nullptr; ++ ++ int ret = ExecuteStreamCommand(lwsCtx); ++ ++ ErrorHandler(ret, lwsCtx); ++ ++ CloseConnect(lwsCtx); ++ ++ return ret; ++} ++ ++bool RouteCallbackRegister::IsValidMethod(const std::string &method) ++{ ++ return static_cast(m_registeredcallbacks.count(method)); ++} ++ ++int RouteCallbackRegister::HandleCallback(SessionData *lwsCtx, const std::string &method, const std::string &token) ++{ ++ auto it = m_registeredcallbacks.find(method); ++ if (it != m_registeredcallbacks.end()) { ++ std::shared_ptr callback = it->second; ++ if (callback) { ++ return callback->Execute(lwsCtx, token); ++ } ++ } ++ ERROR("invalid method!"); ++ return -1; ++} ++ ++void RouteCallbackRegister::RegisterCallback(const std::string &path, std::shared_ptr callback) ++{ ++ m_registeredcallbacks.insert(std::pair>(path, callback)); ++} ++ ++int StreamTask::Run() ++{ ++ return m_invoker->HandleCallback(m_lwsCtx, m_method, m_token); ++} +\ No newline at end of file +diff --git a/src/daemon/entry/cri/websocket/service/route_callback_register.h b/src/daemon/entry/cri/websocket/service/route_callback_register.h +index 909c552b..da75fc5b 100644 +--- a/src/daemon/entry/cri/websocket/service/route_callback_register.h ++++ b/src/daemon/entry/cri/websocket/service/route_callback_register.h +@@ -22,9 +22,9 @@ + #include + #include + #include +-#include "isula_libutils/log.h" ++#include "request_cache.h" + +-struct session_data; ++struct SessionData; + + class StreamingServeInterface { + public: +@@ -32,7 +32,14 @@ public: + StreamingServeInterface(const StreamingServeInterface &) = delete; + StreamingServeInterface &operator=(const StreamingServeInterface &) = delete; + virtual ~StreamingServeInterface() = default; +- virtual int Execute(session_data *lws_ctx, const std::string &token) = 0; ++ int Execute(SessionData *lwsCtx, const std::string &token); ++ ++protected: ++ virtual void SetServeThreadName() = 0; ++ virtual int SetContainerStreamRequest(::google::protobuf::Message *grequest, const std::string &suffix) = 0; ++ virtual int ExecuteStreamCommand(SessionData *lwsCtx) = 0; ++ virtual void ErrorHandler(int ret, SessionData *lwsCtx) = 0; ++ virtual void CloseConnect(SessionData *lwsCtx) = 0; + }; + + class RouteCallbackRegister { +@@ -41,30 +48,10 @@ public: + RouteCallbackRegister(const RouteCallbackRegister &) = delete; + RouteCallbackRegister &operator=(const RouteCallbackRegister &) = delete; + virtual ~RouteCallbackRegister() = default; +- bool IsValidMethod(const std::string &method) +- { +- return static_cast(m_registeredcallbacks.count(method)); +- } + +- int HandleCallback(session_data *lws_ctx, const std::string &method, +- const std::string &token) +- { +- auto it = m_registeredcallbacks.find(method); +- if (it != m_registeredcallbacks.end()) { +- std::shared_ptr callback = it->second; +- if (callback) { +- return callback->Execute(lws_ctx, token); +- } +- } +- ERROR("invalid method!"); +- return -1; +- } +- void RegisterCallback(const std::string &path, +- std::shared_ptr callback) +- { +- m_registeredcallbacks.insert(std::pair>(path, callback)); +- } ++ bool IsValidMethod(const std::string &method); ++ int HandleCallback(SessionData *lwsCtx, const std::string &method, const std::string &token); ++ void RegisterCallback(const std::string &path, std::shared_ptr callback); + + private: + std::map> m_registeredcallbacks; +@@ -72,24 +59,24 @@ private: + + class StreamTask { + public: +- StreamTask(RouteCallbackRegister *invoker, session_data *lws_ctx, +- const std::string &method, ++ StreamTask(RouteCallbackRegister *invoker, SessionData *lwsCtx, const std::string &method, + const std::string &token) +- : m_invoker(invoker), m_lws_ctx(lws_ctx), m_method(method), m_token(token) {} ++ : m_invoker(invoker) ++ , m_lwsCtx(lwsCtx) ++ , m_method(method) ++ , m_token(token) ++ { ++ } + StreamTask(const StreamTask &) = delete; + StreamTask &operator=(const StreamTask &) = delete; + virtual ~StreamTask() = default; +- int Run() +- { +- return m_invoker->HandleCallback(m_lws_ctx, m_method, m_token); +- } ++ int Run(); ++ + private: +- RouteCallbackRegister *m_invoker{ nullptr }; +- session_data *m_lws_ctx; ++ RouteCallbackRegister *m_invoker { nullptr }; ++ SessionData *m_lwsCtx; + std::string m_method; + std::string m_token; + }; + + #endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_ROUTE_CALLBACK_REGISTER_H +- +- +diff --git a/src/daemon/entry/cri/websocket/service/stream_server.cc b/src/daemon/entry/cri/websocket/service/stream_server.cc +index b4df642f..4342e006 100644 +--- a/src/daemon/entry/cri/websocket/service/stream_server.cc ++++ b/src/daemon/entry/cri/websocket/service/stream_server.cc +@@ -22,7 +22,7 @@ + + void websocket_server_init(Errors &err) + { +- WebsocketServer *server = WebsocketServer::GetInstance(); ++ auto *server = WebsocketServer::GetInstance(); + server->RegisterCallback(std::string("exec"), std::make_shared()); + server->RegisterCallback(std::string("attach"), std::make_shared()); + server->Start(err); +@@ -30,13 +30,13 @@ void websocket_server_init(Errors &err) + + void websocket_server_wait(void) + { +- WebsocketServer *server = WebsocketServer::GetInstance(); ++ auto *server = WebsocketServer::GetInstance(); + server->Wait(); + } + + void websocket_server_shutdown(void) + { +- WebsocketServer *server = WebsocketServer::GetInstance(); ++ auto *server = WebsocketServer::GetInstance(); + server->Shutdown(); + } + +diff --git a/src/daemon/entry/cri/websocket/service/stream_server.h b/src/daemon/entry/cri/websocket/service/stream_server.h +index 43e42b83..ba6b3672 100644 +--- a/src/daemon/entry/cri/websocket/service/stream_server.h ++++ b/src/daemon/entry/cri/websocket/service/stream_server.h +@@ -1,5 +1,5 @@ + /****************************************************************************** +- * Copyright (c) Huawei Technologies Co., Ltd. 2018-2019. All rights reserved. ++ * Copyright (c) Huawei Technologies Co., Ltd. 2018-2021. All rights reserved. + * iSulad licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: +@@ -8,7 +8,7 @@ + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. +- * Author: lifeng ++ * Author: wujing + * Create: 2018-11-08 + * Description: provide websocket stream service definition + ******************************************************************************/ +diff --git a/src/daemon/entry/cri/websocket/service/ws_server.cc b/src/daemon/entry/cri/websocket/service/ws_server.cc +index e4b3a1b4..0e462737 100644 +--- a/src/daemon/entry/cri/websocket/service/ws_server.cc ++++ b/src/daemon/entry/cri/websocket/service/ws_server.cc +@@ -1,5 +1,5 @@ + /****************************************************************************** +- * Copyright (c) Huawei Technologies Co., Ltd. 2018-2019. All rights reserved. ++ * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved. + * iSulad licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: +@@ -8,8 +8,8 @@ + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. +- * Author: lifeng +- * Create: 2018-11-08 ++ * Author: wujing ++ * Create: 2019-01-02 + * Description: provide websocket server functions + ******************************************************************************/ + +@@ -19,34 +19,132 @@ + #include + #include + #include ++#include + #include "cxxutils.h" +-#include "isula_libutils/log.h" + #include "utils.h" + #include "request_cache.h" + #include "constants.h" + #include "isulad_config.h" + #include "callback.h" + #include "cri_helpers.h" ++#include "isula_libutils/cri_terminal_size.h" + + struct lws_context *WebsocketServer::m_context = nullptr; + std::atomic WebsocketServer::m_instance; + RWMutex WebsocketServer::m_mutex; +-std::unordered_map WebsocketServer::m_wsis; ++std::unordered_map WebsocketServer::m_wsis; ++ ++namespace { ++const int MAX_BUF_LEN = 256; ++const int MAX_HTTP_HEADER_POOL = 8; ++// io copy maximum single transfer 4K, let max total buffer size: 1GB ++const int FIFO_LIST_BUFFER_MAX_LEN = 262144; ++const int SESSION_CAPABILITY = 300; ++const int MAX_SESSION_NUM = 128; ++}; // namespace ++ ++enum WebsocketChannel { STDINCHANNEL = 0, STDOUTCHANNEL, STDERRCHANNEL, ERRORCHANNEL, RESIZECHANNEL }; ++ ++unsigned char *SessionData::FrontMessage() ++{ ++ unsigned char *message = nullptr; ++ ++ if (sessionMutex == nullptr) { ++ return nullptr; ++ } ++ ++ sessionMutex->lock(); ++ message = buffer.front(); ++ sessionMutex->unlock(); ++ ++ return message; ++} ++ ++void SessionData::PopMessage() ++{ ++ if (sessionMutex == nullptr) { ++ return; ++ } ++ ++ sessionMutex->lock(); ++ buffer.pop_front(); ++ sessionMutex->unlock(); ++} ++ ++int SessionData::PushMessage(unsigned char *message) ++{ ++ if (sessionMutex == nullptr) { ++ return -1; ++ } ++ ++ sessionMutex->lock(); ++ ++ // In extreme scenarios, websocket data cannot be processed, ++ // ignore the data coming in later to prevent iSulad from getting stuck ++ if (close || buffer.size() >= FIFO_LIST_BUFFER_MAX_LEN) { ++ free(message); ++ sessionMutex->unlock(); ++ return -1; ++ } ++ ++ buffer.push_back(message); ++ sessionMutex->unlock(); ++ return 0; ++} ++ ++bool SessionData::IsClosed() ++{ ++ bool c = false; ++ ++ if (sessionMutex == nullptr) { ++ return true; ++ } ++ ++ sessionMutex->lock(); ++ c = close; ++ sessionMutex->unlock(); ++ ++ return c; ++} ++ ++void SessionData::CloseSession() ++{ ++ if (sessionMutex == nullptr) { ++ return; ++ } ++ ++ sessionMutex->lock(); ++ close = true; ++ sessionMutex->unlock(); ++} ++ ++void SessionData::EraseAllMessage() ++{ ++ if (sessionMutex == nullptr) { ++ return; ++ } ++ ++ sessionMutex->lock(); ++ for (auto iter = buffer.begin(); iter != buffer.end();) { ++ free(*iter); ++ *iter = NULL; ++ iter = buffer.erase(iter); ++ } ++ sessionMutex->unlock(); ++} + + WebsocketServer *WebsocketServer::GetInstance() noexcept + { + static std::once_flag flag; + +- std::call_once(flag, [] { +- m_instance = new WebsocketServer; +- }); ++ std::call_once(flag, [] { m_instance = new WebsocketServer; }); + + return m_instance; + } + + WebsocketServer::WebsocketServer() + { +- m_force_exit = 0; ++ m_forceExit = 0; + m_wsis.reserve(SESSION_CAPABILITY); + } + +@@ -60,19 +158,9 @@ url::URLDatum WebsocketServer::GetWebsocketUrl() + return m_url; + } + +-void WebsocketServer::ReadLockAllWsSession() +-{ +- m_mutex.rdlock(); +-} +- +-void WebsocketServer::UnlockAllWsSession() +-{ +- m_mutex.unlock(); +-} +- + void WebsocketServer::Shutdown() + { +- m_force_exit = 1; ++ m_forceExit = 1; + lws_cancel_service(m_context); + } + +@@ -99,15 +187,12 @@ void WebsocketServer::EmitLog(int level, const char *line) + + int WebsocketServer::CreateContext() + { +- int limited; +- struct lws_context_creation_info info; +- struct rlimit oldLimit, newLimit; +- const size_t WS_ULIMIT_FDS = 1024; ++ const size_t WS_ULIMIT_FDS { 1024 }; + + m_url.SetScheme("ws"); + m_url.SetHost("localhost:" + std::to_string(m_listenPort)); + +- (void)memset(&info, 0, sizeof(info)); ++ lws_context_creation_info info { 0x00 }; + lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_DEBUG, WebsocketServer::EmitLog); + + info.port = m_listenPort; +@@ -125,9 +210,10 @@ int WebsocketServer::CreateContext() + * belowing lws_create_context limit the fds of websocket to RLIMIT_NOFILE, + * and malloced memory according to it. To reduce memory, we recover it to 1024 before create m_context. + */ ++ rlimit oldLimit, newLimit; + newLimit.rlim_cur = WS_ULIMIT_FDS; + newLimit.rlim_max = WS_ULIMIT_FDS; +- limited = prlimit(0, RLIMIT_NOFILE, &newLimit, &oldLimit); ++ int limited = prlimit(0, RLIMIT_NOFILE, &newLimit, &oldLimit); + if (limited != 0) { + WARN("Can not set ulimit of RLIMIT_NOFILE: %s", strerror(errno)); + } +@@ -145,8 +231,7 @@ int WebsocketServer::CreateContext() + return 0; + } + +-void WebsocketServer::RegisterCallback(const std::string &path, +- std::shared_ptr callback) ++void WebsocketServer::RegisterCallback(const std::string &path, std::shared_ptr callback) + { + m_handler.RegisterCallback(path, callback); + } +@@ -158,8 +243,8 @@ void WebsocketServer::CloseAllWsSession() + it->second->EraseAllMessage(); + close(it->second->pipes.at(0)); + close(it->second->pipes.at(1)); +- (void)sem_destroy(it->second->sync_close_sem); +- delete it->second->session_mutex; ++ (void)sem_destroy(it->second->syncCloseSem); ++ delete it->second->sessionMutex; + delete it->second; + } + m_wsis.clear(); +@@ -189,23 +274,23 @@ void WebsocketServer::CloseWsSession(int socketID) + close(session->pipes.at(1)); + session->pipes.at(1) = -1; + } +- (void)sem_wait(session->sync_close_sem); +- (void)sem_destroy(session->sync_close_sem); +- delete session->sync_close_sem; +- session->sync_close_sem = nullptr; ++ (void)sem_wait(session->syncCloseSem); ++ (void)sem_destroy(session->syncCloseSem); ++ delete session->syncCloseSem; ++ session->syncCloseSem = nullptr; + close(session->pipes.at(0)); +- delete session->session_mutex; +- session->session_mutex = nullptr; ++ delete session->sessionMutex; ++ session->sessionMutex = nullptr; + delete session; + }).detach(); + } + +-int WebsocketServer::GenerateSessionData(session_data *session, const std::string containerID) noexcept ++int WebsocketServer::GenerateSessionData(SessionData *session, const std::string containerID) noexcept + { + char *suffix = nullptr; +- int read_pipe_fd[PIPE_FD_NUM] = {-1, -1}; +- std::mutex *buf_mutex = nullptr; +- sem_t *sync_close_sem = nullptr; ++ int readPipeFd[2] = { -1, -1 }; ++ std::mutex *bufMutex = nullptr; ++ sem_t *syncCloseSem = nullptr; + + suffix = CRIHelpers::GenerateExecSuffix(); + if (suffix == nullptr) { +@@ -213,24 +298,24 @@ int WebsocketServer::GenerateSessionData(session_data *session, const std::strin + return -1; + } + +- if (InitRWPipe(read_pipe_fd) < 0) { ++ if (InitRWPipe(readPipeFd) < 0) { + ERROR("failed to init read/write pipe!"); + goto out; + } + +- buf_mutex = new std::mutex; +- sync_close_sem = new sem_t; ++ bufMutex = new std::mutex; ++ syncCloseSem = new sem_t; + +- if (sem_init(sync_close_sem, 0, 0) != 0) { ++ if (sem_init(syncCloseSem, 0, 0) != 0) { + ERROR("Semaphore initialization failed"); + goto out; + } + +- session->pipes = std::array { read_pipe_fd[0], read_pipe_fd[1] }; +- session->session_mutex = buf_mutex; +- session->sync_close_sem = sync_close_sem; ++ session->pipes = std::array { readPipeFd[0], readPipeFd[1] }; ++ session->sessionMutex = bufMutex; ++ session->syncCloseSem = syncCloseSem; + session->close = false; +- session->container_id = containerID; ++ session->containerID = containerID; + session->suffix = std::string(suffix); + + free(suffix); +@@ -241,17 +326,17 @@ out: + if (suffix != nullptr) { + free(suffix); + } +- if (read_pipe_fd[1] >= 0) { +- close(read_pipe_fd[1]); ++ if (readPipeFd[1] >= 0) { ++ close(readPipeFd[1]); + } +- if (read_pipe_fd[0] >= 0) { +- close(read_pipe_fd[0]); ++ if (readPipeFd[0] >= 0) { ++ close(readPipeFd[0]); + } +- if (buf_mutex != nullptr) { +- delete buf_mutex; ++ if (bufMutex != nullptr) { ++ delete bufMutex; + } +- if (sync_close_sem) { +- delete sync_close_sem; ++ if (syncCloseSem) { ++ delete syncCloseSem; + } + + return -1; +@@ -269,10 +354,9 @@ int WebsocketServer::RegisterStreamTask(struct lws *wsi) noexcept + buf[sizeof(buf) - 1] = '\0'; + // format: "/cri/" + method + "/" + token + "/" + arg(container=cmd) + auto vec = CXXUtils::Split(buf + 1, '/'); +- RequestCache *cache = RequestCache::GetInstance(); +- if (vec.size() < MIN_VEC_SIZE || +- !m_handler.IsValidMethod(vec.at(1)) || +- !cache->IsValidToken(vec.at(2))) { ++ auto *cache = RequestCache::GetInstance(); ++ // buffer contains at least 3 parts: cri, method, token ++ if (vec.size() < 3 || !m_handler.IsValidMethod(vec.at(1)) || !cache->IsValidToken(vec.at(2))) { + ERROR("invalid url(%s): incorrect format!", buf); + return -1; + } +@@ -288,13 +372,13 @@ int WebsocketServer::RegisterStreamTask(struct lws *wsi) noexcept + return -1; + } + +- std::string containerID = cache->GetContainerIDByToken(vec.at(1), vec.at(2)); ++ auto containerID = cache->GetContainerIDByToken(vec.at(2)); + if (containerID.empty()) { + ERROR("Failed to get container id from %s request", vec.at(1).c_str()); + return -1; + } + +- session_data *session = new (std::nothrow) session_data; ++ auto *session = new (std::nothrow) SessionData; + if (session == nullptr) { + ERROR("Out of memory"); + return -1; +@@ -304,7 +388,7 @@ int WebsocketServer::RegisterStreamTask(struct lws *wsi) noexcept + return -1; + } + +- std::string suffixID = session->suffix; ++ auto suffixID = session->suffix; + auto insertRet = m_wsis.insert(std::make_pair(socketID, session)); + if (!insertRet.second) { + ERROR("failed to insert session data to map"); +@@ -351,8 +435,8 @@ int WebsocketServer::Wswrite(struct lws *wsi, const unsigned char *message) + if (strlen((const char *)(&message[LWS_PRE + 1])) == 0) { + return 0; + } +- int n = lws_write(wsi, (unsigned char *)(&message[LWS_PRE]), +- strlen((const char *)(&message[LWS_PRE + 1])) + 1, LWS_WRITE_TEXT); ++ auto n = lws_write(wsi, (unsigned char *)(&message[LWS_PRE]), strlen((const char *)(&message[LWS_PRE + 1])) + 1, ++ LWS_WRITE_TEXT); + if (n < 0) { + ERROR("ERROR %d writing to socket, hanging up", n); + return -1; +@@ -362,21 +446,18 @@ int WebsocketServer::Wswrite(struct lws *wsi, const unsigned char *message) + return 0; + } + +-int WebsocketServer::parseTerminalSize(const char *jsonData, size_t len, uint16_t &width, uint16_t &height) ++int WebsocketServer::ParseTerminalSize(const char *jsonData, size_t len, uint16_t &width, uint16_t &height) + { +- int ret = 0; +- parser_error err = nullptr; +- cri_terminal_size *terminalSize = nullptr; +- + if (jsonData == nullptr || len == 0) { + return -1; + } + + // No terminator is included in json data, and len contains a character occupied by channal + std::string jsonDataStr { jsonData, len - 1 }; +- ++ parser_error err = nullptr; ++ int ret = 0; + // parse json data. eg: {"Width":xx,"Height":xx} +- terminalSize = cri_terminal_size_parse_data(jsonDataStr.c_str(), nullptr, &err); ++ cri_terminal_size *terminalSize = cri_terminal_size_parse_data(jsonDataStr.c_str(), nullptr, &err); + if (terminalSize == nullptr) { + ERROR("Failed to parse json: %s", err); + ret = -1; +@@ -391,29 +472,22 @@ int WebsocketServer::parseTerminalSize(const char *jsonData, size_t len, uint16_ + return ret; + } + +-int WebsocketServer::ResizeTerminal( +- int socketID, const char *jsonData, size_t len, +- const std::string &containerID, +- const std::string &suffix) ++int WebsocketServer::ResizeTerminal(int socketID, const char *jsonData, size_t len, const std::string &containerID, ++ const std::string &suffix) + { +- int ret; +- service_executor_t *cb = nullptr; +- struct isulad_container_resize_request *req = nullptr; +- struct isulad_container_resize_response *res = nullptr; +- uint16_t width = 0; +- uint16_t height = 0; +- +- cb = get_service_executor(); ++ auto *cb = get_service_executor(); + if (cb == nullptr || cb->container.resize == nullptr) { + return -1; + } + +- if (parseTerminalSize(jsonData, len, width, height) != 0) { ++ uint16_t width = 0; ++ uint16_t height = 0; ++ if (ParseTerminalSize(jsonData, len, width, height) != 0) { + return -1; + } + +- req = (struct isulad_container_resize_request *)util_common_calloc_s( +- sizeof(struct isulad_container_resize_request)); ++ auto *req = static_cast( ++ util_common_calloc_s(sizeof(struct isulad_container_resize_request))); + if (req == nullptr) { + ERROR("Out of memory"); + return -1; +@@ -424,12 +498,12 @@ int WebsocketServer::ResizeTerminal( + req->height = height; + req->width = width; + +- ret = cb->container.resize(req, &res); ++ struct isulad_container_resize_response *res = nullptr; ++ int ret = cb->container.resize(req, &res); + + isulad_container_resize_request_free(req); + isulad_container_resize_response_free(res); + +- + return ret; + } + +@@ -442,9 +516,7 @@ void WebsocketServer::Receive(int socketID, void *in, size_t len) + } + + if (*static_cast(in) == WebsocketChannel::RESIZECHANNEL) { +- std::string containerID = it->second->container_id; +- std::string suffix = it->second->suffix; +- if (ResizeTerminal(socketID, (char *)in + 1, len, containerID, suffix) != 0) { ++ if (ResizeTerminal(socketID, (char *)in + 1, len, it->second->containerID, it->second->suffix) != 0) { + ERROR("Failed to resize terminal tty"); + return; + } +@@ -459,8 +531,7 @@ void WebsocketServer::Receive(int socketID, void *in, size_t len) + } + } + +-int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, +- void *user, void *in, size_t len) ++int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) + { + switch (reason) { + case LWS_CALLBACK_HTTP: +@@ -493,11 +564,11 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, + return -1; + } + +- auto isSessionClosed = it->second->IsClosed(); ++ auto sessionClosed = it->second->IsClosed(); + while (!it->second->buffer.empty()) { +- unsigned char *message = it->second->FrontMessage(); ++ auto *message = it->second->FrontMessage(); + // send success! free it and erase for list +- if (WebsocketServer::GetInstance()->Wswrite(wsi, (const unsigned char *)message) == 0) { ++ if (WebsocketServer::GetInstance()->Wswrite(wsi, const_cast(message)) == 0) { + free(message); + it->second->PopMessage(); + } else { +@@ -508,7 +579,7 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, + } + + // avoid: push message to buffer and set closed true +- if (isSessionClosed) { ++ if (sessionClosed) { + DEBUG("websocket session disconnected"); + return -1; + } +@@ -517,7 +588,7 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, + break; + case LWS_CALLBACK_RECEIVE: { + ReadGuard lock(m_mutex); +- WebsocketServer::GetInstance()->Receive(lws_get_socket_fd(wsi), (char *)in, len); ++ WebsocketServer::GetInstance()->Receive(lws_get_socket_fd(wsi), static_cast(in), len); + } + break; + case LWS_CALLBACK_CLOSED: { +@@ -538,7 +609,7 @@ void WebsocketServer::ServiceWorkThread(int threadid) + + prctl(PR_SET_NAME, "WebsocketServer"); + +- while (n >= 0 && !m_force_exit) { ++ while (n >= 0 && !m_forceExit) { + n = lws_service(m_context, 0); + } + } +@@ -553,16 +624,17 @@ void WebsocketServer::Start(Errors &err) + + if (CreateContext() < 0) { + err.SetError("Websocket server start failed! please check your network status" +- "(eg: port " + std::to_string(m_listenPort) + " is occupied)"); ++ "(eg: port " + ++ std::to_string(m_listenPort) + " is occupied)"); + return; + } +- m_pthread_service = std::thread(&WebsocketServer::ServiceWorkThread, this, 0); ++ m_pthreadService = std::thread(&WebsocketServer::ServiceWorkThread, this, 0); + } + + void WebsocketServer::Wait() + { +- if (m_pthread_service.joinable()) { +- m_pthread_service.join(); ++ if (m_pthreadService.joinable()) { ++ m_pthreadService.join(); + } + + CloseAllWsSession(); +@@ -571,19 +643,17 @@ void WebsocketServer::Wait() + } + + namespace { +- +-void DoWriteToClient(session_data *session, +- const void *data, size_t len, WebsocketChannel channel) ++void DoWriteToClient(SessionData *session, const void *data, size_t len, WebsocketChannel channel) + { +- unsigned char *buf = (unsigned char *)util_common_calloc_s(LWS_PRE + MAX_BUFFER_SIZE + 1); ++ auto *buf = static_cast(util_common_calloc_s(LWS_PRE + MAX_BUFFER_SIZE + 1)); + if (buf == nullptr) { + ERROR("Out of memory"); + return; + } + // Determine if it is standard output channel or error channel +- buf[LWS_PRE] = channel; ++ buf[LWS_PRE] = static_cast(channel); + +- (void)memcpy(&buf[LWS_PRE + 1], (void *)data, len); ++ (void)memcpy(&buf[LWS_PRE + 1], const_cast(data), len); + + // push back to message list + if (session->PushMessage(buf) != 0) { +@@ -594,7 +664,7 @@ void DoWriteToClient(session_data *session, + + ssize_t WsWriteToClient(void *context, const void *data, size_t len, WebsocketChannel channel) + { +- auto *lwsCtx = static_cast(context); ++ auto *lwsCtx = static_cast(context); + + // CloseWsSession wait IOCopy finished, and then delete session in m_wsis + // So don't need rdlock m_wsis here +@@ -605,7 +675,7 @@ ssize_t WsWriteToClient(void *context, const void *data, size_t len, WebsocketCh + DoWriteToClient(lwsCtx, data, len, channel); + return static_cast(len); + } +-}; ++}; // namespace + + ssize_t WsWriteStdoutToClient(void *context, const void *data, size_t len) + { +@@ -636,12 +706,12 @@ int closeWsConnect(void *context, char **err) + return -1; + } + +- auto *lwsCtx = static_cast(context); ++ auto *lwsCtx = static_cast(context); + + lwsCtx->CloseSession(); + +- if (lwsCtx->sync_close_sem != nullptr) { +- (void)sem_post(lwsCtx->sync_close_sem); ++ if (lwsCtx->syncCloseSem != nullptr) { ++ (void)sem_post(lwsCtx->syncCloseSem); + } + + return 0; +diff --git a/src/daemon/entry/cri/websocket/service/ws_server.h b/src/daemon/entry/cri/websocket/service/ws_server.h +index 3ab8e22f..2d3bb4a7 100644 +--- a/src/daemon/entry/cri/websocket/service/ws_server.h ++++ b/src/daemon/entry/cri/websocket/service/ws_server.h +@@ -1,5 +1,5 @@ + /****************************************************************************** +- * Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved. ++ * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved. + * iSulad licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: +@@ -17,7 +17,6 @@ + #define DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_WS_SERVER_H + #include + #include +-#include + #include + #include + #include +@@ -29,140 +28,40 @@ + #include "url.h" + #include "errors.h" + #include "read_write_lock.h" +-#include "isula_libutils/cri_terminal_size.h" + +-#define MAX_ECHO_PAYLOAD 4096 +-#define MAX_ARRAY_LEN 2 +-#define MAX_BUF_LEN 256 +-#define MAX_PROTOCOL_NUM 2 +-#define MAX_HTTP_HEADER_POOL 8 +-#define MIN_VEC_SIZE 3 +-#define PIPE_FD_NUM 2 +-#define BUF_BASE_SIZE 1024 +-#define LWS_TIMEOUT 50 +-// io copy maximum single transfer 4K, let max total buffer size: 1GB +-#define FIFO_LIST_BUFFER_MAX_LEN 262144 +-#define SESSION_CAPABILITY 300 +-#define MAX_SESSION_NUM 128 ++namespace ++{ ++const int MAX_ECHO_PAYLOAD = 4096; ++const int MAX_ARRAY_LEN = 2; ++const int MAX_PROTOCOL_NUM = 2; ++}; // namespace + +-enum WebsocketChannel { +- STDINCHANNEL = 0, +- STDOUTCHANNEL, +- STDERRCHANNEL, +- ERRORCHANNEL, +- RESIZECHANNEL +-}; +- +-struct session_data { ++struct SessionData { + std::array pipes; + volatile bool close; +- std::mutex *session_mutex; +- sem_t *sync_close_sem; ++ std::mutex *sessionMutex; ++ sem_t *syncCloseSem; + std::list buffer; +- std::string container_id; ++ std::string containerID; + std::string suffix; + +- unsigned char *FrontMessage() +- { +- unsigned char *message = nullptr; +- +- if (session_mutex == nullptr) { +- return nullptr; +- } +- +- session_mutex->lock(); +- message = buffer.front(); +- session_mutex->unlock(); +- +- return message; +- } +- +- void PopMessage() +- { +- if (session_mutex == nullptr) { +- return; +- } +- +- session_mutex->lock(); +- buffer.pop_front(); +- session_mutex->unlock(); +- } +- +- int PushMessage(unsigned char *message) +- { +- if (session_mutex == nullptr) { +- return -1; +- } +- +- session_mutex->lock(); +- +- // In extreme scenarios, websocket data cannot be processed, +- // ignore the data coming in later to prevent iSulad from getting stuck +- if (close || buffer.size() >= FIFO_LIST_BUFFER_MAX_LEN) { +- free(message); +- session_mutex->unlock(); +- return -1; +- } +- +- buffer.push_back(message); +- session_mutex->unlock(); +- return 0; +- } +- +- bool IsClosed() +- { +- bool c = false; +- +- if (session_mutex == nullptr) { +- return true; +- } +- +- session_mutex->lock(); +- c = close; +- session_mutex->unlock(); +- +- return c; +- } +- +- void CloseSession() +- { +- if (session_mutex == nullptr) { +- return; +- } +- +- session_mutex->lock(); +- close = true; +- session_mutex->unlock(); +- } +- +- void EraseAllMessage() +- { +- if (session_mutex == nullptr) { +- return; +- } +- +- session_mutex->lock(); +- for (auto iter = buffer.begin(); iter != buffer.end();) { +- free(*iter); +- *iter = NULL; +- iter = buffer.erase(iter); +- } +- session_mutex->unlock(); +- } ++ unsigned char *FrontMessage(); ++ void PopMessage(); ++ int PushMessage(unsigned char *message); ++ bool IsClosed(); ++ void CloseSession(); ++ void EraseAllMessage(); + }; + + class WebsocketServer { + public: + static WebsocketServer *GetInstance() noexcept; +- static std::atomic m_instance; + void Start(Errors &err); + void Wait(); + void Shutdown(); + void RegisterCallback(const std::string &path, std::shared_ptr callback); + url::URLDatum GetWebsocketUrl(); + void SetLwsSendedFlag(int socketID, bool sended); +- void ReadLockAllWsSession(); +- void UnlockAllWsSession(); + + private: + WebsocketServer(); +@@ -171,33 +70,41 @@ private: + virtual ~WebsocketServer(); + int InitRWPipe(int read_fifo[]); + std::vector split(std::string str, char r); +- static void EmitLog(int level, const char *line); ++ + int CreateContext(); + inline void Receive(int socketID, void *in, size_t len); +- int Wswrite(struct lws *wsi, const unsigned char *message); ++ int Wswrite(struct lws *wsi, const unsigned char *message); + inline void DumpHandshakeInfo(struct lws *wsi) noexcept; + int RegisterStreamTask(struct lws *wsi) noexcept; +- int GenerateSessionData(session_data *session, const std::string containerID) noexcept; +- static int Callback(struct lws *wsi, enum lws_callback_reasons reason, +- void *user, void *in, size_t len); ++ int GenerateSessionData(SessionData *session, const std::string containerID) noexcept; + void ServiceWorkThread(int threadid); + void CloseWsSession(int socketID); + void CloseAllWsSession(); +- int ResizeTerminal(int socketID, const char *jsonData, size_t len, +- const std::string &containerID, const std::string &suffix); +- int parseTerminalSize(const char *jsonData, size_t len, uint16_t &width, uint16_t &height); ++ int ResizeTerminal(int socketID, const char *jsonData, size_t len, const std::string &containerID, ++ const std::string &suffix); ++ int ParseTerminalSize(const char *jsonData, size_t len, uint16_t &width, uint16_t &height); ++ ++private: ++ // redirect libwebsockets logs to iSulad ++ static void EmitLog(int level, const char *line); ++ // libwebsockets Callback function ++ static int Callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); + + private: ++ static std::atomic m_instance; + static RWMutex m_mutex; + static struct lws_context *m_context; +- volatile int m_force_exit = 0; +- std::thread m_pthread_service; +- const struct lws_protocols m_protocols[MAX_PROTOCOL_NUM] = { +- { "channel.k8s.io", Callback, 0, MAX_ECHO_PAYLOAD, }, +- { nullptr, nullptr, 0, 0 } +- }; ++ volatile int m_forceExit = 0; ++ std::thread m_pthreadService; ++ const struct lws_protocols m_protocols[MAX_PROTOCOL_NUM] = { { ++ "channel.k8s.io", ++ Callback, ++ 0, ++ MAX_ECHO_PAYLOAD, ++ }, ++ { nullptr, nullptr, 0, 0 } }; + RouteCallbackRegister m_handler; +- static std::unordered_map m_wsis; ++ static std::unordered_map m_wsis; + url::URLDatum m_url; + int m_listenPort; + }; +@@ -207,4 +114,3 @@ ssize_t WsWriteStderrToClient(void *context, const void *data, size_t len); + int closeWsConnect(void *context, char **err); + + #endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_WS_SERVER_H +- +-- +2.25.1 + diff --git a/0021-Fixed-a-bug-that-occurs-when-starting-container-in-h.patch b/0021-Fixed-a-bug-that-occurs-when-starting-container-in-h.patch new file mode 100644 index 0000000..66a2f6e --- /dev/null +++ b/0021-Fixed-a-bug-that-occurs-when-starting-container-in-h.patch @@ -0,0 +1,36 @@ +From fcc132e592ba1f9c427e02ef6f930eb208a6ebca Mon Sep 17 00:00:00 2001 +From: chengzrz +Date: Thu, 9 Dec 2021 14:56:39 +0800 +Subject: [PATCH] Fixed a bug that occurs when starting container in host mode + +Signed-off-by: chengzrz +--- + src/daemon/modules/spec/specs_namespace.c | 5 ++++- + 1 file changed, 4 insertions(+), 1 deletion(-) + +diff --git a/src/daemon/modules/spec/specs_namespace.c b/src/daemon/modules/spec/specs_namespace.c +index eea0b3ff..e9f98d00 100644 +--- a/src/daemon/modules/spec/specs_namespace.c ++++ b/src/daemon/modules/spec/specs_namespace.c +@@ -156,7 +156,7 @@ static int handle_get_path_from_host(const host_config *host_spec, + const container_config_v2_common_config_network_settings *network_settings, + const char *type, char **dest_path) + { +- *dest_path = namespace_get_host_namespace_path(host_spec->network_mode); ++ *dest_path = namespace_get_host_namespace_path(type); + if (*dest_path == NULL) { + return -1; + } +@@ -209,6 +209,9 @@ int get_network_namespace_path(const host_config *host_spec, + for (index = 0; index < jump_table_size; ++index) { + if (strncmp(network_mode, handler_jump_table[index].mode, strlen(handler_jump_table[index].mode)) == 0) { + ret = handler_jump_table[index].handle(host_spec, network_settings, type, dest_path); ++ if (ret != 0) { ++ ERROR("Failed to get ns path, network mode is %s, type is %s", network_mode, type); ++ } + return ret; + } + } +-- +2.25.1 + diff --git a/iSulad.spec b/iSulad.spec index 91b95fa..a6cfee5 100644 --- a/iSulad.spec +++ b/iSulad.spec @@ -1,5 +1,5 @@ %global _version 2.0.10 -%global _release 10 +%global _release 11 %global is_systemd 1 %global enable_shimv2 1 %global is_embedded 1 @@ -28,6 +28,12 @@ Patch0012: 0012-print-valgrind-log.patch Patch0013: 0013-fix-cri-version-memory-leak.patch Patch0014: 0014-fix-undefined-reference-in-libisulad_img.so.patch Patch0015: 0015-fix-undefined-reference-to-service_arguments_free-in.patch +Patch0016: 0016-fix-mem-leak.patch +Patch0017: 0017-isula-pull-does-not-support-format-name-digest.patch +Patch0018: 0018-Fixed-dangerous-memory-operations.patch +Patch0019: 0019-add-pull-request-gateway-checker-for-build-and-ut.patch +Patch0020: 0020-Optimize-websocket-streaming-service-code.patch +Patch0021: 0021-Fixed-a-bug-that-occurs-when-starting-container-in-h.patch %ifarch x86_64 aarch64 Provides: libhttpclient.so()(64bit) @@ -252,6 +258,12 @@ fi %endif %changelog +* Thu Dec 09 2021 chengzeruizhi - 2.0.10-11 +- Type: bugfix +- ID: NA +- SUG: NA +- DESC: fixed a bug that occurs when starting a container in host mode + * Thu Dec 09 2021 wangfengtu - 2.0.10-10 - Type: bugfix - ID: NA