iSulad/0020-Optimize-websocket-streaming-service-code.patch
chengzrz 8e72e1b4c4 Fixed a bug that occurs when running a container in host mode
Signed-off-by: chengzrz <czrzrichard@gmail.com>
2021-12-09 16:10:35 +08:00

1902 lines
69 KiB
Diff

From 3d5ad2160b9fe779433cce497bfa0cb0146bdcb3 Mon Sep 17 00:00:00 2001
From: wujing <wujing50@huawei.com>
Date: Sat, 4 Dec 2021 21:20:25 +0800
Subject: [PATCH] Optimize websocket streaming service code
Signed-off-by: wujing <wujing50@huawei.com>
---
.../cri/cri_container_manager_service_impl.cc | 28 +-
src/daemon/entry/cri/request_cache.cc | 123 ++-----
src/daemon/entry/cri/request_cache.h | 32 +-
.../cri/websocket/service/attach_serve.cc | 102 +++---
.../cri/websocket/service/attach_serve.h | 28 +-
.../entry/cri/websocket/service/exec_serve.cc | 158 +++++----
.../entry/cri/websocket/service/exec_serve.h | 27 +-
.../service/route_callback_register.cc | 80 +++++
.../service/route_callback_register.h | 61 ++--
.../cri/websocket/service/stream_server.cc | 6 +-
.../cri/websocket/service/stream_server.h | 4 +-
.../entry/cri/websocket/service/ws_server.cc | 300 +++++++++++-------
.../entry/cri/websocket/service/ws_server.h | 174 +++-------
13 files changed, 542 insertions(+), 581 deletions(-)
create mode 100644 src/daemon/entry/cri/websocket/service/route_callback_register.cc
diff --git a/src/daemon/entry/cri/cri_container_manager_service_impl.cc b/src/daemon/entry/cri/cri_container_manager_service_impl.cc
index 2e65ab51..b160ce31 100644
--- a/src/daemon/entry/cri/cri_container_manager_service_impl.cc
+++ b/src/daemon/entry/cri/cri_container_manager_service_impl.cc
@@ -376,7 +376,8 @@ ContainerManagerServiceImpl::GenerateCreateContainerRequest(const std::string &r
hostconfig->cgroup_parent = util_strdup_s(podSandboxConfig.linux().cgroup_parent().c_str());
}
- custom_config = GenerateCreateContainerCustomConfig(cname, realPodSandboxID, containerConfig, podSandboxConfig, error);
+ custom_config =
+ GenerateCreateContainerCustomConfig(cname, realPodSandboxID, containerConfig, podSandboxConfig, error);
if (error.NotEmpty()) {
goto cleanup;
}
@@ -409,11 +410,10 @@ cleanup:
return request;
}
-std::string ContainerManagerServiceImpl::CreateContainer(
- const std::string &podSandboxID,
- const runtime::v1alpha2::ContainerConfig &containerConfig,
- const runtime::v1alpha2::PodSandboxConfig &podSandboxConfig,
- Errors &error)
+std::string ContainerManagerServiceImpl::CreateContainer(const std::string &podSandboxID,
+ const runtime::v1alpha2::ContainerConfig &containerConfig,
+ const runtime::v1alpha2::PodSandboxConfig &podSandboxConfig,
+ Errors &error)
{
std::string response_id;
std::string podSandboxRuntime;
@@ -1305,10 +1305,16 @@ void ContainerManagerServiceImpl::Exec(const runtime::v1alpha2::ExecRequest &req
if (ValidateExecRequest(req, error) != 0) {
return;
}
+ auto execReq = new (std::nothrow) runtime::v1alpha2::ExecRequest(req);
+ if (execReq == nullptr) {
+ error.SetError("out of memory");
+ return;
+ }
RequestCache *cache = RequestCache::GetInstance();
- std::string token = cache->InsertExecRequest(req);
+ std::string token = cache->InsertRequest(req.container_id(), execReq);
if (token.empty()) {
error.SetError("failed to get a unique token!");
+ delete execReq;
return;
}
std::string url = BuildURL("exec", token);
@@ -1350,10 +1356,16 @@ void ContainerManagerServiceImpl::Attach(const runtime::v1alpha2::AttachRequest
error.SetError("Empty attach response arguments");
return;
}
+ auto attachReq = new (std::nothrow) runtime::v1alpha2::AttachRequest(req);
+ if (attachReq == nullptr) {
+ error.SetError("out of memory");
+ return;
+ }
RequestCache *cache = RequestCache::GetInstance();
- std::string token = cache->InsertAttachRequest(req);
+ std::string token = cache->InsertRequest(req.container_id(), attachReq);
if (token.empty()) {
error.SetError("failed to get a unique token!");
+ delete attachReq;
return;
}
std::string url = BuildURL("attach", token);
diff --git a/src/daemon/entry/cri/request_cache.cc b/src/daemon/entry/cri/request_cache.cc
index 4ff284ab..312a8071 100644
--- a/src/daemon/entry/cri/request_cache.cc
+++ b/src/daemon/entry/cri/request_cache.cc
@@ -1,5 +1,5 @@
/******************************************************************************
- * Copyright (c) Huawei Technologies Co., Ltd. 2017-2019. All rights reserved.
+ * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved.
* iSulad licensed under the Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
@@ -8,8 +8,8 @@
* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
* PURPOSE.
* See the Mulan PSL v2 for more details.
- * Author: tanyifeng
- * Create: 2017-11-22
+ * Author: wujing
+ * Create: 2019-01-02
* Description: provide request cache function definition
*********************************************************************************/
#include "request_cache.h"
@@ -19,15 +19,25 @@
#include <thread>
#include <mutex>
#include <cmath>
-#include "isula_libutils/log.h"
+#include <isula_libutils/log.h>
#include "utils.h"
#include "utils_base64.h"
std::atomic<RequestCache *> RequestCache::m_instance;
std::mutex RequestCache::m_mutex;
+
+void CacheEntry::SetValue(const std::string &t, const std::string &id, ::google::protobuf::Message *request,
+ std::chrono::system_clock::time_point et)
+{ // populate all four fields of this entry from the arguments
+ token = t;
+ containerID = id;
+ req = request; // only the pointer is stored; the message itself is not copied here
+ expireTime = et;
+}
+
RequestCache *RequestCache::GetInstance() noexcept
{
- RequestCache *cache = m_instance.load(std::memory_order_relaxed);
+ auto *cache = m_instance.load(std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_acquire);
if (cache == nullptr) {
std::lock_guard<std::mutex> lock(m_mutex);
@@ -41,25 +51,7 @@ RequestCache *RequestCache::GetInstance() noexcept
return cache;
}
-std::string RequestCache::InsertExecRequest(const runtime::v1alpha2::ExecRequest &req)
-{
- std::lock_guard<std::mutex> lock(m_mutex);
- // Remove expired entries.
- GarbageCollection();
- // If the cache is full, reject the request.
- if (m_ll.size() == MaxInFlight) {
- ERROR("too many cache in flight!");
- return "";
- }
- auto token = UniqueToken();
- CacheEntry tmp;
- tmp.SetValue(token, &req, nullptr, std::chrono::system_clock::now() + std::chrono::minutes(1));
- m_ll.push_front(tmp);
- m_tokens.insert(std::make_pair(token, tmp));
- return token;
-}
-
-std::string RequestCache::InsertAttachRequest(const runtime::v1alpha2::AttachRequest &req)
+std::string RequestCache::InsertRequest(const std::string &containerID, ::google::protobuf::Message *req)
{
std::lock_guard<std::mutex> lock(m_mutex);
// Remove expired entries.
@@ -71,7 +63,7 @@ std::string RequestCache::InsertAttachRequest(const runtime::v1alpha2::AttachReq
}
auto token = UniqueToken();
CacheEntry tmp;
- tmp.SetValue(token, nullptr, &req, std::chrono::system_clock::now() + std::chrono::minutes(1));
+ tmp.SetValue(token, containerID, req, std::chrono::system_clock::now() + std::chrono::minutes(1));
m_ll.push_front(tmp);
m_tokens.insert(std::make_pair(token, tmp));
return token;
@@ -81,10 +73,14 @@ void RequestCache::GarbageCollection()
{
auto now = std::chrono::system_clock::now();
while (!m_ll.empty()) {
- CacheEntry oldest = m_ll.back();
+ auto oldest = m_ll.back();
if (now < oldest.expireTime) {
return;
}
+ if (oldest.req != nullptr) {
+ delete oldest.req;
+ oldest.req = nullptr;
+ }
m_ll.pop_back();
m_tokens.erase(oldest.token);
}
@@ -103,15 +99,15 @@ std::string RequestCache::UniqueToken()
continue;
}
- char *b64_encode_buf = nullptr;
- if (util_base64_encode((unsigned char *)rawToken, strlen(rawToken), &b64_encode_buf) < 0) {
+ char *b64EncodeBuf = nullptr;
+ if (util_base64_encode((unsigned char *)rawToken, strlen(rawToken), &b64EncodeBuf) < 0) {
ERROR("Encode raw token to base64 failed");
continue;
}
- std::string token(b64_encode_buf);
- free(b64_encode_buf);
- b64_encode_buf = nullptr;
+ std::string token(b64EncodeBuf);
+ free(b64EncodeBuf);
+ b64EncodeBuf = nullptr;
if (token.length() != TokenLen) {
continue;
}
@@ -133,37 +129,13 @@ bool RequestCache::IsValidToken(const std::string &token)
}
// Consume the token (remove it from the cache) and return the cached request, if found.
-runtime::v1alpha2::ExecRequest RequestCache::ConsumeExecRequest(const std::string &token)
-{
- std::lock_guard<std::mutex> lock(m_mutex);
-
- if (m_tokens.count(token) == 0 || m_tokens[token].execRequest.size() == 0) {
- ERROR("Invalid token");
- return runtime::v1alpha2::ExecRequest();
- }
-
- CacheEntry ele = m_tokens[token];
- for (auto it = m_ll.begin(); it != m_ll.end(); it++) {
- if (it->token == token) {
- m_ll.erase(it);
- break;
- }
- }
- m_tokens.erase(token);
- if (std::chrono::system_clock::now() > ele.expireTime) {
- return runtime::v1alpha2::ExecRequest();
- }
-
- return ele.execRequest.at(0);
-}
-
-runtime::v1alpha2::AttachRequest RequestCache::ConsumeAttachRequest(const std::string &token)
+::google::protobuf::Message *RequestCache::ConsumeRequest(const std::string &token)
{
std::lock_guard<std::mutex> lock(m_mutex);
- if (m_tokens.count(token) == 0 || m_tokens[token].attachRequest.size() == 0) {
+ if (m_tokens.count(token) == 0) {
ERROR("Invalid token");
- return runtime::v1alpha2::AttachRequest();
+ return nullptr;
}
CacheEntry ele = m_tokens[token];
@@ -175,45 +147,20 @@ runtime::v1alpha2::AttachRequest RequestCache::ConsumeAttachRequest(const std::s
}
m_tokens.erase(token);
if (std::chrono::system_clock::now() > ele.expireTime) {
- return runtime::v1alpha2::AttachRequest();
- }
-
- return ele.attachRequest.at(0);
-}
-
-std::string RequestCache::GetExecContainerIDByToken(const std::string &token)
-{
- std::lock_guard<std::mutex> lock(m_mutex);
-
- if (m_tokens.count(token) == 0 || m_tokens[token].execRequest.size() == 0) {
- ERROR("Invalid token");
- return "";
+ return nullptr;
}
- return m_tokens[token].execRequest.at(0).container_id();
+ return ele.req;
}
-std::string RequestCache::GetAttachContainerIDByToken(const std::string &token)
+std::string RequestCache::GetContainerIDByToken(const std::string &token)
{
std::lock_guard<std::mutex> lock(m_mutex);
- if (m_tokens.count(token) == 0 || m_tokens[token].attachRequest.size() == 0) {
+ if (m_tokens.count(token) == 0) {
ERROR("Invalid token");
return "";
}
- return m_tokens[token].attachRequest.at(0).container_id();
-}
-
-std::string RequestCache::GetContainerIDByToken(const std::string &method, const std::string &token)
-{
- if (method == "exec") {
- return GetExecContainerIDByToken(token);
- } else if (method == "attach") {
- return GetAttachContainerIDByToken(token);
- }
-
- ERROR("Invalid method: %s", method.c_str());
-
- return "";
+ return m_tokens[token].containerID;
}
diff --git a/src/daemon/entry/cri/request_cache.h b/src/daemon/entry/cri/request_cache.h
index d44b4d78..90ae20e8 100644
--- a/src/daemon/entry/cri/request_cache.h
+++ b/src/daemon/entry/cri/request_cache.h
@@ -1,5 +1,5 @@
/******************************************************************************
- * Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved.
+ * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved.
* iSulad licensed under the Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
@@ -23,44 +23,28 @@
#include <chrono>
#include <typeinfo>
#include <google/protobuf/message.h>
-#include "api.pb.h"
struct CacheEntry {
std::string token;
- std::vector<runtime::v1alpha2::ExecRequest> execRequest;
- std::vector<runtime::v1alpha2::AttachRequest> attachRequest;
+ std::string containerID;
+ ::google::protobuf::Message *req;
std::chrono::system_clock::time_point expireTime;
- void SetValue(const std::string &t,
- const runtime::v1alpha2::ExecRequest *execReq,
- const runtime::v1alpha2::AttachRequest *attachReq,
- std::chrono::system_clock::time_point et)
- {
- token = t;
- if (execReq != nullptr) {
- execRequest.push_back(*execReq);
- } else if (attachReq != nullptr) {
- attachRequest.push_back(*attachReq);
- }
- expireTime = et;
- }
+ void SetValue(const std::string &t, const std::string &id, ::google::protobuf::Message *request,
+ std::chrono::system_clock::time_point et);
};
class RequestCache {
public:
static RequestCache *GetInstance() noexcept;
- std::string InsertExecRequest(const runtime::v1alpha2::ExecRequest &req);
- std::string InsertAttachRequest(const runtime::v1alpha2::AttachRequest &req);
- runtime::v1alpha2::ExecRequest ConsumeExecRequest(const std::string &token);
- runtime::v1alpha2::AttachRequest ConsumeAttachRequest(const std::string &token);
- std::string GetContainerIDByToken(const std::string &method, const std::string &token);
+ std::string InsertRequest(const std::string &containerID, ::google::protobuf::Message *req);
+ ::google::protobuf::Message *ConsumeRequest(const std::string &token);
+ std::string GetContainerIDByToken(const std::string &token);
bool IsValidToken(const std::string &token);
private:
void GarbageCollection();
std::string UniqueToken();
- std::string GetExecContainerIDByToken(const std::string &token);
- std::string GetAttachContainerIDByToken(const std::string &token);
private:
RequestCache() = default;
diff --git a/src/daemon/entry/cri/websocket/service/attach_serve.cc b/src/daemon/entry/cri/websocket/service/attach_serve.cc
index cda63c45..abe23f51 100644
--- a/src/daemon/entry/cri/websocket/service/attach_serve.cc
+++ b/src/daemon/entry/cri/websocket/service/attach_serve.cc
@@ -1,5 +1,5 @@
/******************************************************************************
- * Copyright (c) Huawei Technologies Co., Ltd. 2018-2019. All rights reserved.
+ * Copyright (c) Huawei Technologies Co., Ltd. 2018-2021. All rights reserved.
* iSulad licensed under the Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
@@ -8,86 +8,78 @@
* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
* PURPOSE.
* See the Mulan PSL v2 for more details.
- * Author: lifeng
+ * Author: wujing
* Create: 2018-11-08
* Description: provide container attach functions
******************************************************************************/
#include "attach_serve.h"
+#include "api.pb.h"
+#include "ws_server.h"
+#include "isula_libutils/log.h"
+#include "callback.h"
#include "utils.h"
-int AttachServe::Execute(session_data *lws_ctx, const std::string &token)
+AttachServe::~AttachServe()
{
- if (lws_ctx == nullptr) {
- return -1;
- }
+ free_container_attach_request(m_request);
+ free_container_attach_response(m_response);
+}
+void AttachServe::SetServeThreadName()
+{
prctl(PR_SET_NAME, "AttachServe");
+}
- service_executor_t *cb = get_service_executor();
- if (cb == nullptr || cb->container.attach == nullptr) {
- sem_post(lws_ctx->sync_close_sem);
+int AttachServe::SetContainerStreamRequest(::google::protobuf::Message *request, const std::string &suffix)
+{
+ auto *grequest = dynamic_cast<runtime::v1alpha2::AttachRequest *>(request);
+
+ m_request = static_cast<container_attach_request *>(util_common_calloc_s(sizeof(container_attach_request)));
+ if (m_request == nullptr) {
+ ERROR("Out of memory");
return -1;
}
- container_attach_request *container_req = nullptr;
- if (GetContainerRequest(token, &container_req) != 0) {
- ERROR("Failed to get contaner request");
- sem_post(lws_ctx->sync_close_sem);
+ if (!grequest->container_id().empty()) {
+ m_request->container_id = util_strdup_s(grequest->container_id().c_str());
+ }
+ m_request->attach_stdin = grequest->stdin();
+ m_request->attach_stdout = grequest->stdout();
+ m_request->attach_stderr = grequest->stderr();
+
+ return 0;
+}
+
+int AttachServe::ExecuteStreamCommand(SessionData *lwsCtx)
+{
+ auto *cb = get_service_executor();
+ if (cb == nullptr || cb->container.attach == nullptr) {
+ ERROR("Failed to get attach service executor");
+ sem_post(lwsCtx->syncCloseSem);
return -1;
}
struct io_write_wrapper stringWriter = { 0 };
- stringWriter.context = (void *)(lws_ctx);
+ stringWriter.context = (void *)(lwsCtx);
stringWriter.write_func = WsWriteStdoutToClient;
stringWriter.close_func = closeWsConnect;
- container_req->attach_stderr = false;
-
- container_attach_response *container_res = nullptr;
- int ret = cb->container.attach(container_req, &container_res, container_req->attach_stdin ? lws_ctx->pipes.at(0) : -1,
- container_req->attach_stdout ? &stringWriter : nullptr, nullptr);
- if (ret != 0) {
- ERROR("Failed to attach container: %s", container_req->container_id);
- sem_post(lws_ctx->sync_close_sem);
- }
-
- free_container_attach_request(container_req);
- free_container_attach_response(container_res);
+ m_request->attach_stderr = false;
- return ret;
+ return cb->container.attach(m_request, &m_response, m_request->attach_stdin ? lwsCtx->pipes.at(0) : -1,
+ m_request->attach_stdout ? &stringWriter : nullptr, nullptr);
}
-int AttachServe::GetContainerRequest(const std::string &token, container_attach_request **container_req)
+void AttachServe::ErrorHandler(int ret, SessionData *lwsCtx)
{
- RequestCache *cache = RequestCache::GetInstance();
- auto request = cache->ConsumeAttachRequest(token);
-
- int ret = RequestFromCri(request, container_req);
- if (ret != 0) {
- ERROR("Failed to transform grpc request!");
+ if (ret == 0) {
+ return;
}
-
- return ret;
+ ERROR("Failed to attach container: %s", m_request->container_id);
+ sem_post(lwsCtx->syncCloseSem);
}
-int AttachServe::RequestFromCri(const runtime::v1alpha2::AttachRequest &grequest, container_attach_request **request)
+void AttachServe::CloseConnect(SessionData *lwsCtx)
{
- container_attach_request *tmpreq = nullptr;
-
- tmpreq = (container_attach_request *)util_common_calloc_s(sizeof(container_attach_request));
- if (tmpreq == nullptr) {
- ERROR("Out of memory");
- return -1;
- }
-
- if (!grequest.container_id().empty()) {
- tmpreq->container_id = util_strdup_s(grequest.container_id().c_str());
- }
- tmpreq->attach_stdin = grequest.stdin();
- tmpreq->attach_stdout = grequest.stdout();
- tmpreq->attach_stderr = grequest.stderr();
-
- *request = tmpreq;
-
- return 0;
+ (void)lwsCtx;
}
diff --git a/src/daemon/entry/cri/websocket/service/attach_serve.h b/src/daemon/entry/cri/websocket/service/attach_serve.h
index f7b8a017..38e75e29 100644
--- a/src/daemon/entry/cri/websocket/service/attach_serve.h
+++ b/src/daemon/entry/cri/websocket/service/attach_serve.h
@@ -1,5 +1,5 @@
/******************************************************************************
- * Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved.
+ * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved.
* iSulad licensed under the Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
@@ -17,27 +17,27 @@
#define DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_ATTACH_SERVE_H
#include "route_callback_register.h"
-#include <chrono>
#include <string>
-#include <thread>
-#include "ws_server.h"
-
-#include "api.pb.h"
-#include "isula_libutils/log.h"
-#include "callback.h"
-#include "request_cache.h"
+#include "isula_libutils/container_attach_request.h"
+#include "isula_libutils/container_attach_response.h"
class AttachServe : public StreamingServeInterface {
public:
AttachServe() = default;
AttachServe(const AttachServe &) = delete;
AttachServe &operator=(const AttachServe &) = delete;
- virtual ~AttachServe() = default;
- int Execute(session_data *lws_ctx, const std::string &token) override;
+ virtual ~AttachServe();
+
+private:
+ virtual void SetServeThreadName() override;
+ virtual int SetContainerStreamRequest(::google::protobuf::Message *grequest, const std::string &suffix) override;
+ virtual int ExecuteStreamCommand(SessionData *lwsCtx) override;
+ virtual void ErrorHandler(int ret, SessionData *lwsCtx) override;
+ virtual void CloseConnect(SessionData *lwsCtx) override;
+
private:
- int RequestFromCri(const runtime::v1alpha2::AttachRequest &grequest,
- container_attach_request **request);
- int GetContainerRequest(const std::string &token, container_attach_request **container_req);
+ container_attach_request *m_request { nullptr };
+ container_attach_response *m_response { nullptr };
};
#endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_ATTACH_SERVE_H
diff --git a/src/daemon/entry/cri/websocket/service/exec_serve.cc b/src/daemon/entry/cri/websocket/service/exec_serve.cc
index 26b552de..b7709c48 100644
--- a/src/daemon/entry/cri/websocket/service/exec_serve.cc
+++ b/src/daemon/entry/cri/websocket/service/exec_serve.cc
@@ -1,5 +1,5 @@
/******************************************************************************
- * Copyright (c) Huawei Technologies Co., Ltd. 2018-2019. All rights reserved.
+ * Copyright (c) Huawei Technologies Co., Ltd. 2018-2021. All rights reserved.
* iSulad licensed under the Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
@@ -8,127 +8,111 @@
* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
* PURPOSE.
* See the Mulan PSL v2 for more details.
- * Author: lifeng
+ * Author: wujing
* Create: 2018-11-08
* Description: provide ExecServe functions
******************************************************************************/
#include "exec_serve.h"
+#include <isula_libutils/log.h>
#include "io_wrapper.h"
+#include "ws_server.h"
#include "utils.h"
#include "cri_helpers.h"
-int ExecServe::Execute(session_data *lws_ctx, const std::string &token)
+ExecServe::~ExecServe()
{
- if (lws_ctx == nullptr) {
- return -1;
- }
+ free_container_exec_request(m_request);
+ free_container_exec_response(m_response);
+}
+void ExecServe::SetServeThreadName()
+{
prctl(PR_SET_NAME, "ExecServe");
+}
- service_executor_t *cb = get_service_executor();
- if (cb == nullptr || cb->container.exec == nullptr) {
- sem_post(lws_ctx->sync_close_sem);
+int ExecServe::SetContainerStreamRequest(::google::protobuf::Message *request, const std::string &suffix)
+{
+ auto *grequest = dynamic_cast<runtime::v1alpha2::ExecRequest *>(request);
+
+ m_request = static_cast<container_exec_request *>(util_common_calloc_s(sizeof(container_exec_request)));
+ if (m_request == nullptr) {
+ ERROR("Out of memory");
return -1;
}
- container_exec_request *container_req = nullptr;
- if (GetContainerRequest(token, lws_ctx->suffix, &container_req) != 0) {
- ERROR("Failed to get contaner request");
- sem_post(lws_ctx->sync_close_sem);
+ m_request->tty = grequest->tty();
+ m_request->attach_stdin = grequest->stdin();
+ m_request->attach_stdout = grequest->stdout();
+ m_request->attach_stderr = grequest->stderr();
+
+ if (!grequest->container_id().empty()) {
+ m_request->container_id = util_strdup_s(grequest->container_id().c_str());
+ }
+
+ if (grequest->cmd_size() > 0) {
+ if (static_cast<size_t>(grequest->cmd_size()) > SIZE_MAX / sizeof(char *)) {
+ ERROR("Too many arguments!");
+ return -1;
+ }
+ m_request->argv = (char **)util_common_calloc_s(sizeof(char *) * grequest->cmd_size());
+ if (m_request->argv == nullptr) {
+ ERROR("Out of memory!");
+ return -1;
+ }
+ for (int i = 0; i < grequest->cmd_size(); i++) {
+ m_request->argv[i] = util_strdup_s(grequest->cmd(i).c_str());
+ }
+ m_request->argv_len = static_cast<size_t>(grequest->cmd_size());
+ }
+
+ m_request->suffix = util_strdup_s(suffix.c_str());
+
+ return 0;
+}
+
+int ExecServe::ExecuteStreamCommand(SessionData *lwsCtx)
+{
+ auto *cb = get_service_executor();
+ if (cb == nullptr || cb->container.exec == nullptr) {
+ ERROR("Failed to get exec service executor");
+ sem_post(lwsCtx->syncCloseSem);
return -1;
}
struct io_write_wrapper StdoutstringWriter = { 0 };
- StdoutstringWriter.context = (void *)lws_ctx;
+ StdoutstringWriter.context = (void *)lwsCtx;
StdoutstringWriter.write_func = WsWriteStdoutToClient;
// the close function of StderrstringWriter is preferred unless StderrstringWriter is nullptr
StdoutstringWriter.close_func = nullptr;
struct io_write_wrapper StderrstringWriter = { 0 };
- StderrstringWriter.context = (void *)lws_ctx;
+ StderrstringWriter.context = (void *)lwsCtx;
StderrstringWriter.write_func = WsWriteStderrToClient;
StderrstringWriter.close_func = nullptr;
- container_exec_response *container_res = nullptr;
- int ret = cb->container.exec(container_req, &container_res, container_req->attach_stdin ? lws_ctx->pipes.at(0) : -1,
- container_req->attach_stdout ? &StdoutstringWriter : nullptr,
- container_req->attach_stderr ? &StderrstringWriter : nullptr);
+ return cb->container.exec(m_request, &m_response, m_request->attach_stdin ? lwsCtx->pipes.at(0) : -1,
+ m_request->attach_stdout ? &StdoutstringWriter : nullptr,
+ m_request->attach_stderr ? &StderrstringWriter : nullptr);
+}
+
+void ExecServe::ErrorHandler(int ret, SessionData *lwsCtx)
+{
if (ret != 0) {
std::string message;
- if (container_res != nullptr && container_res->errmsg != nullptr) {
- message = container_res->errmsg;
+ if (m_response != nullptr && m_response->errmsg != nullptr) {
+ message = m_response->errmsg;
} else {
message = "Failed to call exec container callback. ";
}
- WsWriteStdoutToClient(lws_ctx, message.c_str(), message.length());
+ WsWriteStdoutToClient(lwsCtx, message.c_str(), message.length());
}
- if (container_res != nullptr && container_res->exit_code != 0) {
- std::string exit_info = "Exit code :" + std::to_string((int)container_res->exit_code) + "\n";
- WsWriteStdoutToClient(lws_ctx, exit_info.c_str(), exit_info.length());
+ if (m_response != nullptr && m_response->exit_code != 0) {
+ std::string exit_info = "Exit code :" + std::to_string((int)m_response->exit_code) + "\n";
+ WsWriteStdoutToClient(lwsCtx, exit_info.c_str(), exit_info.length());
}
-
- free_container_exec_request(container_req);
- free_container_exec_response(container_res);
-
- closeWsConnect((void*)lws_ctx, nullptr);
-
- return ret;
}
-int ExecServe::GetContainerRequest(const std::string &token, const std::string &suffix,
- container_exec_request **container_req)
+void ExecServe::CloseConnect(SessionData *lwsCtx)
{
- RequestCache *cache = RequestCache::GetInstance();
- auto request = cache->ConsumeExecRequest(token);
-
- int ret = RequestFromCri(request, suffix, container_req);
- if (ret != 0) {
- ERROR("Failed to transform grpc request!");
- }
-
- return ret;
-}
-
-int ExecServe::RequestFromCri(const runtime::v1alpha2::ExecRequest &grequest, const std::string &suffix,
- container_exec_request **request)
-{
- container_exec_request *tmpreq = nullptr;
-
- tmpreq = (container_exec_request *)util_common_calloc_s(sizeof(container_exec_request));
- if (tmpreq == nullptr) {
- ERROR("Out of memory");
- return -1;
- }
-
- tmpreq->tty = grequest.tty();
- tmpreq->attach_stdin = grequest.stdin();
- tmpreq->attach_stdout = grequest.stdout();
- tmpreq->attach_stderr = grequest.stderr();
-
- if (!grequest.container_id().empty()) {
- tmpreq->container_id = util_strdup_s(grequest.container_id().c_str());
- }
-
- if (grequest.cmd_size() > 0) {
- if ((size_t)grequest.cmd_size() > SIZE_MAX / sizeof(char *)) {
- ERROR("Too many arguments!");
- free_container_exec_request(tmpreq);
- return -1;
- }
- tmpreq->argv = (char **)util_common_calloc_s(sizeof(char *) * grequest.cmd_size());
- if (tmpreq->argv == nullptr) {
- ERROR("Out of memory!");
- free_container_exec_request(tmpreq);
- return -1;
- }
- for (int i = 0; i < grequest.cmd_size(); i++) {
- tmpreq->argv[i] = util_strdup_s(grequest.cmd(i).c_str());
- }
- tmpreq->argv_len = (size_t)grequest.cmd_size();
- }
-
- tmpreq->suffix = util_strdup_s(suffix.c_str());
-
- *request = tmpreq;
- return 0;
+ closeWsConnect((void*)lwsCtx, nullptr);
}
diff --git a/src/daemon/entry/cri/websocket/service/exec_serve.h b/src/daemon/entry/cri/websocket/service/exec_serve.h
index 5cccdee8..3afb2abb 100644
--- a/src/daemon/entry/cri/websocket/service/exec_serve.h
+++ b/src/daemon/entry/cri/websocket/service/exec_serve.h
@@ -1,5 +1,5 @@
/******************************************************************************
- * Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved.
+ * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved.
* iSulad licensed under the Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
@@ -20,28 +20,27 @@
#include <string>
#include <chrono>
#include <thread>
-#include <grpc++/grpc++.h>
-#include "api.grpc.pb.h"
-#include "container.grpc.pb.h"
#include "route_callback_register.h"
-#include "isula_libutils/log.h"
-#include "callback.h"
-#include "ws_server.h"
-#include "request_cache.h"
-#include "api.pb.h"
+#include "isula_libutils/container_exec_request.h"
+#include "isula_libutils/container_exec_response.h"
class ExecServe : public StreamingServeInterface {
public:
ExecServe() = default;
ExecServe(const ExecServe &) = delete;
ExecServe &operator=(const ExecServe &) = delete;
- virtual ~ExecServe() = default;
- int Execute(session_data *lws_ctx, const std::string &token) override;
+ virtual ~ExecServe();
private:
- int RequestFromCri(const runtime::v1alpha2::ExecRequest &grequest, const std::string &suffix,
- container_exec_request **request);
- int GetContainerRequest(const std::string &token, const std::string &suffix, container_exec_request **request);
+ virtual void SetServeThreadName() override;
+ virtual int SetContainerStreamRequest(::google::protobuf::Message *grequest, const std::string &suffix) override;
+ virtual int ExecuteStreamCommand(SessionData *lwsCtx) override;
+ virtual void ErrorHandler(int ret, SessionData *lwsCtx) override;
+ virtual void CloseConnect(SessionData *lwsCtx) override;
+
+private:
+ container_exec_request *m_request { nullptr };
+ container_exec_response *m_response { nullptr };
};
#endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_EXEC_SERVE_H
diff --git a/src/daemon/entry/cri/websocket/service/route_callback_register.cc b/src/daemon/entry/cri/websocket/service/route_callback_register.cc
new file mode 100644
index 00000000..fb14381f
--- /dev/null
+++ b/src/daemon/entry/cri/websocket/service/route_callback_register.cc
@@ -0,0 +1,80 @@
+/******************************************************************************
+ * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
+ * iSulad licensed under the Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+ * PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * Description: Streaming service function registration.
+ * Author: wujing
+ * Create: 2021-11-04
+ ******************************************************************************/
+#include "route_callback_register.h"
+#include <isula_libutils/log.h>
+#include "ws_server.h"
+
+// Common serving flow for every streaming serve: consume the cached request for `token`, convert it,
+// run the stream command, then report errors and close. Returns -1 on setup failure, else the command result.
+int StreamingServeInterface::Execute(SessionData *lwsCtx, const std::string &token)
+{
+    if (lwsCtx == nullptr) {
+        return -1;
+    }
+
+    SetServeThreadName();
+
+    auto *cache = RequestCache::GetInstance();
+    auto request = cache->ConsumeRequest(token);
+    if (request == nullptr) {
+        ERROR("Failed to get cache request!");
+        sem_post(lwsCtx->syncCloseSem);
+        return -1;
+    }
+
+    // the consumed request lives on the heap and is ours now: free it whether the conversion succeeds or not
+    int ret = SetContainerStreamRequest(request, lwsCtx->suffix);
+    delete request;
+    request = nullptr;
+    if (ret != 0) {
+        ERROR("Failed to set container request");
+        sem_post(lwsCtx->syncCloseSem);
+        return -1;
+    }
+
+    ret = ExecuteStreamCommand(lwsCtx);
+    ErrorHandler(ret, lwsCtx);
+    CloseConnect(lwsCtx);
+
+    return ret;
+}
+
+bool RouteCallbackRegister::IsValidMethod(const std::string &method)
+{
+    return m_registeredcallbacks.count(method) != 0; // valid only if a callback is registered under this method
+}
+
+// Dispatches to the serve object registered for `method`; logs and returns -1 when none is usable.
+int RouteCallbackRegister::HandleCallback(SessionData *lwsCtx, const std::string &method, const std::string &token)
+{
+    const auto entry = m_registeredcallbacks.find(method);
+    const bool usable = (entry != m_registeredcallbacks.end()) && (entry->second != nullptr);
+    if (!usable) {
+        ERROR("invalid method!");
+        return -1;
+    }
+    std::shared_ptr<StreamingServeInterface> serve = entry->second;
+    return serve->Execute(lwsCtx, token);
+}
+
+void RouteCallbackRegister::RegisterCallback(const std::string &path, std::shared_ptr<StreamingServeInterface> callback)
+{
+    m_registeredcallbacks.insert(std::make_pair(path, callback)); // record the serve object under its route path
+}
+
+int StreamTask::Run()
+{
+ return m_invoker->HandleCallback(m_lwsCtx, m_method, m_token); // forward the stored session, method and token; pass the result through
+}
\ No newline at end of file
diff --git a/src/daemon/entry/cri/websocket/service/route_callback_register.h b/src/daemon/entry/cri/websocket/service/route_callback_register.h
index 909c552b..da75fc5b 100644
--- a/src/daemon/entry/cri/websocket/service/route_callback_register.h
+++ b/src/daemon/entry/cri/websocket/service/route_callback_register.h
@@ -22,9 +22,9 @@
#include <map>
#include <unistd.h>
#include <semaphore.h>
-#include "isula_libutils/log.h"
+#include "request_cache.h"
-struct session_data;
+struct SessionData;
class StreamingServeInterface {
public:
@@ -32,7 +32,14 @@ public:
StreamingServeInterface(const StreamingServeInterface &) = delete;
StreamingServeInterface &operator=(const StreamingServeInterface &) = delete;
virtual ~StreamingServeInterface() = default;
- virtual int Execute(session_data *lws_ctx, const std::string &token) = 0;
+ int Execute(SessionData *lwsCtx, const std::string &token);
+
+protected:
+ virtual void SetServeThreadName() = 0;
+ virtual int SetContainerStreamRequest(::google::protobuf::Message *grequest, const std::string &suffix) = 0;
+ virtual int ExecuteStreamCommand(SessionData *lwsCtx) = 0;
+ virtual void ErrorHandler(int ret, SessionData *lwsCtx) = 0;
+ virtual void CloseConnect(SessionData *lwsCtx) = 0;
};
class RouteCallbackRegister {
@@ -41,30 +48,10 @@ public:
RouteCallbackRegister(const RouteCallbackRegister &) = delete;
RouteCallbackRegister &operator=(const RouteCallbackRegister &) = delete;
virtual ~RouteCallbackRegister() = default;
- bool IsValidMethod(const std::string &method)
- {
- return static_cast<bool>(m_registeredcallbacks.count(method));
- }
- int HandleCallback(session_data *lws_ctx, const std::string &method,
- const std::string &token)
- {
- auto it = m_registeredcallbacks.find(method);
- if (it != m_registeredcallbacks.end()) {
- std::shared_ptr<StreamingServeInterface> callback = it->second;
- if (callback) {
- return callback->Execute(lws_ctx, token);
- }
- }
- ERROR("invalid method!");
- return -1;
- }
- void RegisterCallback(const std::string &path,
- std::shared_ptr<StreamingServeInterface> callback)
- {
- m_registeredcallbacks.insert(std::pair<std::string,
- std::shared_ptr<StreamingServeInterface>>(path, callback));
- }
+ bool IsValidMethod(const std::string &method);
+ int HandleCallback(SessionData *lwsCtx, const std::string &method, const std::string &token);
+ void RegisterCallback(const std::string &path, std::shared_ptr<StreamingServeInterface> callback);
private:
std::map<std::string, std::shared_ptr<StreamingServeInterface>> m_registeredcallbacks;
@@ -72,24 +59,24 @@ private:
class StreamTask {
public:
- StreamTask(RouteCallbackRegister *invoker, session_data *lws_ctx,
- const std::string &method,
+ StreamTask(RouteCallbackRegister *invoker, SessionData *lwsCtx, const std::string &method,
const std::string &token)
- : m_invoker(invoker), m_lws_ctx(lws_ctx), m_method(method), m_token(token) {}
+ : m_invoker(invoker)
+ , m_lwsCtx(lwsCtx)
+ , m_method(method)
+ , m_token(token)
+ {
+ }
StreamTask(const StreamTask &) = delete;
StreamTask &operator=(const StreamTask &) = delete;
virtual ~StreamTask() = default;
- int Run()
- {
- return m_invoker->HandleCallback(m_lws_ctx, m_method, m_token);
- }
+ int Run();
+
private:
- RouteCallbackRegister *m_invoker{ nullptr };
- session_data *m_lws_ctx;
+ RouteCallbackRegister *m_invoker { nullptr };
+ SessionData *m_lwsCtx;
std::string m_method;
std::string m_token;
};
#endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_ROUTE_CALLBACK_REGISTER_H
-
-
diff --git a/src/daemon/entry/cri/websocket/service/stream_server.cc b/src/daemon/entry/cri/websocket/service/stream_server.cc
index b4df642f..4342e006 100644
--- a/src/daemon/entry/cri/websocket/service/stream_server.cc
+++ b/src/daemon/entry/cri/websocket/service/stream_server.cc
@@ -22,7 +22,7 @@
void websocket_server_init(Errors &err)
{
- WebsocketServer *server = WebsocketServer::GetInstance();
+ auto *server = WebsocketServer::GetInstance();
server->RegisterCallback(std::string("exec"), std::make_shared<ExecServe>());
server->RegisterCallback(std::string("attach"), std::make_shared<AttachServe>());
server->Start(err);
@@ -30,13 +30,13 @@ void websocket_server_init(Errors &err)
void websocket_server_wait(void)
{
- WebsocketServer *server = WebsocketServer::GetInstance();
+ auto *server = WebsocketServer::GetInstance();
server->Wait();
}
void websocket_server_shutdown(void)
{
- WebsocketServer *server = WebsocketServer::GetInstance();
+ auto *server = WebsocketServer::GetInstance();
server->Shutdown();
}
diff --git a/src/daemon/entry/cri/websocket/service/stream_server.h b/src/daemon/entry/cri/websocket/service/stream_server.h
index 43e42b83..ba6b3672 100644
--- a/src/daemon/entry/cri/websocket/service/stream_server.h
+++ b/src/daemon/entry/cri/websocket/service/stream_server.h
@@ -1,5 +1,5 @@
/******************************************************************************
- * Copyright (c) Huawei Technologies Co., Ltd. 2018-2019. All rights reserved.
+ * Copyright (c) Huawei Technologies Co., Ltd. 2018-2021. All rights reserved.
* iSulad licensed under the Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
@@ -8,7 +8,7 @@
* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
* PURPOSE.
* See the Mulan PSL v2 for more details.
- * Author: lifeng
+ * Author: wujing
* Create: 2018-11-08
* Description: provide websocket stream service definition
******************************************************************************/
diff --git a/src/daemon/entry/cri/websocket/service/ws_server.cc b/src/daemon/entry/cri/websocket/service/ws_server.cc
index e4b3a1b4..0e462737 100644
--- a/src/daemon/entry/cri/websocket/service/ws_server.cc
+++ b/src/daemon/entry/cri/websocket/service/ws_server.cc
@@ -1,5 +1,5 @@
/******************************************************************************
- * Copyright (c) Huawei Technologies Co., Ltd. 2018-2019. All rights reserved.
+ * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved.
* iSulad licensed under the Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
@@ -8,8 +8,8 @@
* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
* PURPOSE.
* See the Mulan PSL v2 for more details.
- * Author: lifeng
- * Create: 2018-11-08
+ * Author: wujing
+ * Create: 2019-01-02
* Description: provide websocket server functions
******************************************************************************/
@@ -19,34 +19,132 @@
#include <future>
#include <utility>
#include <sys/resource.h>
+#include <isula_libutils/log.h>
#include "cxxutils.h"
-#include "isula_libutils/log.h"
#include "utils.h"
#include "request_cache.h"
#include "constants.h"
#include "isulad_config.h"
#include "callback.h"
#include "cri_helpers.h"
+#include "isula_libutils/cri_terminal_size.h"
struct lws_context *WebsocketServer::m_context = nullptr;
std::atomic<WebsocketServer *> WebsocketServer::m_instance;
RWMutex WebsocketServer::m_mutex;
-std::unordered_map<int, session_data *> WebsocketServer::m_wsis;
+std::unordered_map<int, SessionData *> WebsocketServer::m_wsis;
+
+namespace {
+const int MAX_BUF_LEN = 256;
+const int MAX_HTTP_HEADER_POOL = 8;
+// io copy transfers at most 4K per message, so capping the queue at 262144 messages bounds it at 1GB
+const int FIFO_LIST_BUFFER_MAX_LEN = 262144;
+const int SESSION_CAPABILITY = 300;
+const int MAX_SESSION_NUM = 128;
+}; // namespace
+
+enum WebsocketChannel { STDINCHANNEL = 0, STDOUTCHANNEL, STDERRCHANNEL, ERRORCHANNEL, RESIZECHANNEL }; // first byte of a frame
+
+unsigned char *SessionData::FrontMessage()
+{
+    // Peeks at the oldest queued message without removing it from the queue.
+    // Returns nullptr when the session has no mutex or when nothing is queued.
+    if (sessionMutex == nullptr) {
+        return nullptr;
+    }
+
+    std::lock_guard<std::mutex> guard(*sessionMutex);
+    // front() on an empty std::list is undefined behaviour; callers test buffer.empty()
+    // before taking the lock, so the check is repeated here where it cannot go stale
+    return buffer.empty() ? nullptr : buffer.front();
+}
+
+
+void SessionData::PopMessage() // removes the oldest queue entry; does not free the payload
+{
+    if (sessionMutex == nullptr) {
+        return;
+    }
+    std::lock_guard<std::mutex> guard(*sessionMutex);
+    if (!buffer.empty()) { // pop_front() on an empty std::list is undefined behaviour
+        buffer.pop_front();
+    }
+}
+
+int SessionData::PushMessage(unsigned char *message)
+{
+    // Appends message to the send queue. Returns 0 when queued and -1 when rejected.
+    // A rejected buffer is freed here on every failure path, so the caller must not
+    // use or free message after a -1 return.
+    if (sessionMutex == nullptr) {
+        free(message);
+        return -1;
+    }
+
+    std::lock_guard<std::mutex> guard(*sessionMutex);
+    // In extreme scenarios, websocket data cannot be processed,
+    // ignore the data coming in later to prevent iSulad from getting stuck
+    if (close || buffer.size() >= FIFO_LIST_BUFFER_MAX_LEN) {
+        free(message);
+        return -1;
+    }
+    buffer.push_back(message);
+    return 0;
+}
+
+bool SessionData::IsClosed()
+{
+    // Reports whether the session has been marked closed.
+    // A session without a mutex is treated as closed.
+    if (sessionMutex == nullptr) {
+        return true;
+    }
+
+    // read the flag under the same mutex that CloseSession and PushMessage take
+    std::lock_guard<std::mutex> guard(*sessionMutex);
+    const bool closed = close;
+
+    return closed;
+}
+
+void SessionData::CloseSession()
+{
+    // Marks the session closed; PushMessage rejects data once the flag is set.
+    if (sessionMutex == nullptr) {
+        return;
+    }
+
+    std::lock_guard<std::mutex> guard(*sessionMutex);
+    close = true;
+}
+
+void SessionData::EraseAllMessage()
+{
+    // Frees every queued message buffer and leaves the queue empty.
+    if (sessionMutex == nullptr) {
+        return;
+    }
+
+    std::lock_guard<std::mutex> guard(*sessionMutex);
+    for (unsigned char *pending : buffer) {
+        free(pending);
+    }
+    // all entries now hold freed pointers; drop them in one go
+    buffer.clear();
+}
WebsocketServer *WebsocketServer::GetInstance() noexcept
{
static std::once_flag flag;
- std::call_once(flag, [] {
- m_instance = new WebsocketServer;
- });
+ std::call_once(flag, [] { m_instance = new WebsocketServer; });
return m_instance;
}
WebsocketServer::WebsocketServer()
{
- m_force_exit = 0;
+ m_forceExit = 0;
m_wsis.reserve(SESSION_CAPABILITY);
}
@@ -60,19 +158,9 @@ url::URLDatum WebsocketServer::GetWebsocketUrl()
return m_url;
}
-void WebsocketServer::ReadLockAllWsSession()
-{
- m_mutex.rdlock();
-}
-
-void WebsocketServer::UnlockAllWsSession()
-{
- m_mutex.unlock();
-}
-
void WebsocketServer::Shutdown()
{
- m_force_exit = 1;
+ m_forceExit = 1;
lws_cancel_service(m_context);
}
@@ -99,15 +187,12 @@ void WebsocketServer::EmitLog(int level, const char *line)
int WebsocketServer::CreateContext()
{
- int limited;
- struct lws_context_creation_info info;
- struct rlimit oldLimit, newLimit;
- const size_t WS_ULIMIT_FDS = 1024;
+ const size_t WS_ULIMIT_FDS { 1024 };
m_url.SetScheme("ws");
m_url.SetHost("localhost:" + std::to_string(m_listenPort));
- (void)memset(&info, 0, sizeof(info));
+ lws_context_creation_info info { 0x00 };
lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_DEBUG, WebsocketServer::EmitLog);
info.port = m_listenPort;
@@ -125,9 +210,10 @@ int WebsocketServer::CreateContext()
* belowing lws_create_context limit the fds of websocket to RLIMIT_NOFILE,
* and malloced memory according to it. To reduce memory, we recover it to 1024 before create m_context.
*/
+ rlimit oldLimit, newLimit;
newLimit.rlim_cur = WS_ULIMIT_FDS;
newLimit.rlim_max = WS_ULIMIT_FDS;
- limited = prlimit(0, RLIMIT_NOFILE, &newLimit, &oldLimit);
+ int limited = prlimit(0, RLIMIT_NOFILE, &newLimit, &oldLimit);
if (limited != 0) {
WARN("Can not set ulimit of RLIMIT_NOFILE: %s", strerror(errno));
}
@@ -145,8 +231,7 @@ int WebsocketServer::CreateContext()
return 0;
}
-void WebsocketServer::RegisterCallback(const std::string &path,
- std::shared_ptr<StreamingServeInterface> callback)
+void WebsocketServer::RegisterCallback(const std::string &path, std::shared_ptr<StreamingServeInterface> callback)
{
m_handler.RegisterCallback(path, callback);
}
@@ -158,8 +243,8 @@ void WebsocketServer::CloseAllWsSession()
it->second->EraseAllMessage();
close(it->second->pipes.at(0));
close(it->second->pipes.at(1));
- (void)sem_destroy(it->second->sync_close_sem);
- delete it->second->session_mutex;
+ (void)sem_destroy(it->second->syncCloseSem);
+ delete it->second->sessionMutex;
delete it->second;
}
m_wsis.clear();
@@ -189,23 +274,23 @@ void WebsocketServer::CloseWsSession(int socketID)
close(session->pipes.at(1));
session->pipes.at(1) = -1;
}
- (void)sem_wait(session->sync_close_sem);
- (void)sem_destroy(session->sync_close_sem);
- delete session->sync_close_sem;
- session->sync_close_sem = nullptr;
+ (void)sem_wait(session->syncCloseSem);
+ (void)sem_destroy(session->syncCloseSem);
+ delete session->syncCloseSem;
+ session->syncCloseSem = nullptr;
close(session->pipes.at(0));
- delete session->session_mutex;
- session->session_mutex = nullptr;
+ delete session->sessionMutex;
+ session->sessionMutex = nullptr;
delete session;
}).detach();
}
-int WebsocketServer::GenerateSessionData(session_data *session, const std::string containerID) noexcept
+int WebsocketServer::GenerateSessionData(SessionData *session, const std::string containerID) noexcept
{
char *suffix = nullptr;
- int read_pipe_fd[PIPE_FD_NUM] = {-1, -1};
- std::mutex *buf_mutex = nullptr;
- sem_t *sync_close_sem = nullptr;
+ int readPipeFd[2] = { -1, -1 };
+ std::mutex *bufMutex = nullptr;
+ sem_t *syncCloseSem = nullptr;
suffix = CRIHelpers::GenerateExecSuffix();
if (suffix == nullptr) {
@@ -213,24 +298,24 @@ int WebsocketServer::GenerateSessionData(session_data *session, const std::strin
return -1;
}
- if (InitRWPipe(read_pipe_fd) < 0) {
+ if (InitRWPipe(readPipeFd) < 0) {
ERROR("failed to init read/write pipe!");
goto out;
}
- buf_mutex = new std::mutex;
- sync_close_sem = new sem_t;
+ bufMutex = new std::mutex;
+ syncCloseSem = new sem_t;
- if (sem_init(sync_close_sem, 0, 0) != 0) {
+ if (sem_init(syncCloseSem, 0, 0) != 0) {
ERROR("Semaphore initialization failed");
goto out;
}
- session->pipes = std::array<int, MAX_ARRAY_LEN> { read_pipe_fd[0], read_pipe_fd[1] };
- session->session_mutex = buf_mutex;
- session->sync_close_sem = sync_close_sem;
+ session->pipes = std::array<int, MAX_ARRAY_LEN> { readPipeFd[0], readPipeFd[1] };
+ session->sessionMutex = bufMutex;
+ session->syncCloseSem = syncCloseSem;
session->close = false;
- session->container_id = containerID;
+ session->containerID = containerID;
session->suffix = std::string(suffix);
free(suffix);
@@ -241,17 +326,17 @@ out:
if (suffix != nullptr) {
free(suffix);
}
- if (read_pipe_fd[1] >= 0) {
- close(read_pipe_fd[1]);
+ if (readPipeFd[1] >= 0) {
+ close(readPipeFd[1]);
}
- if (read_pipe_fd[0] >= 0) {
- close(read_pipe_fd[0]);
+ if (readPipeFd[0] >= 0) {
+ close(readPipeFd[0]);
}
- if (buf_mutex != nullptr) {
- delete buf_mutex;
+ if (bufMutex != nullptr) {
+ delete bufMutex;
}
- if (sync_close_sem) {
- delete sync_close_sem;
+ if (syncCloseSem) {
+ delete syncCloseSem;
}
return -1;
@@ -269,10 +354,9 @@ int WebsocketServer::RegisterStreamTask(struct lws *wsi) noexcept
buf[sizeof(buf) - 1] = '\0';
// format: "/cri/" + method + "/" + token + "/" + arg(container=cmd)
auto vec = CXXUtils::Split(buf + 1, '/');
- RequestCache *cache = RequestCache::GetInstance();
- if (vec.size() < MIN_VEC_SIZE ||
- !m_handler.IsValidMethod(vec.at(1)) ||
- !cache->IsValidToken(vec.at(2))) {
+ auto *cache = RequestCache::GetInstance();
+ // buffer contains at least 3 parts: cri, method, token
+ if (vec.size() < 3 || !m_handler.IsValidMethod(vec.at(1)) || !cache->IsValidToken(vec.at(2))) {
ERROR("invalid url(%s): incorrect format!", buf);
return -1;
}
@@ -288,13 +372,13 @@ int WebsocketServer::RegisterStreamTask(struct lws *wsi) noexcept
return -1;
}
- std::string containerID = cache->GetContainerIDByToken(vec.at(1), vec.at(2));
+ auto containerID = cache->GetContainerIDByToken(vec.at(2));
if (containerID.empty()) {
ERROR("Failed to get container id from %s request", vec.at(1).c_str());
return -1;
}
- session_data *session = new (std::nothrow) session_data;
+ auto *session = new (std::nothrow) SessionData;
if (session == nullptr) {
ERROR("Out of memory");
return -1;
@@ -304,7 +388,7 @@ int WebsocketServer::RegisterStreamTask(struct lws *wsi) noexcept
return -1;
}
- std::string suffixID = session->suffix;
+ auto suffixID = session->suffix;
auto insertRet = m_wsis.insert(std::make_pair(socketID, session));
if (!insertRet.second) {
ERROR("failed to insert session data to map");
@@ -351,8 +435,8 @@ int WebsocketServer::Wswrite(struct lws *wsi, const unsigned char *message)
if (strlen((const char *)(&message[LWS_PRE + 1])) == 0) {
return 0;
}
- int n = lws_write(wsi, (unsigned char *)(&message[LWS_PRE]),
- strlen((const char *)(&message[LWS_PRE + 1])) + 1, LWS_WRITE_TEXT);
+ auto n = lws_write(wsi, (unsigned char *)(&message[LWS_PRE]), strlen((const char *)(&message[LWS_PRE + 1])) + 1,
+ LWS_WRITE_TEXT);
if (n < 0) {
ERROR("ERROR %d writing to socket, hanging up", n);
return -1;
@@ -362,21 +446,18 @@ int WebsocketServer::Wswrite(struct lws *wsi, const unsigned char *message)
return 0;
}
-int WebsocketServer::parseTerminalSize(const char *jsonData, size_t len, uint16_t &width, uint16_t &height)
+int WebsocketServer::ParseTerminalSize(const char *jsonData, size_t len, uint16_t &width, uint16_t &height)
{
- int ret = 0;
- parser_error err = nullptr;
- cri_terminal_size *terminalSize = nullptr;
-
if (jsonData == nullptr || len == 0) {
return -1;
}
// No terminator is included in json data, and len contains a character occupied by channal
std::string jsonDataStr { jsonData, len - 1 };
-
+ parser_error err = nullptr;
+ int ret = 0;
// parse json data. eg: {"Width":xx,"Height":xx}
- terminalSize = cri_terminal_size_parse_data(jsonDataStr.c_str(), nullptr, &err);
+ cri_terminal_size *terminalSize = cri_terminal_size_parse_data(jsonDataStr.c_str(), nullptr, &err);
if (terminalSize == nullptr) {
ERROR("Failed to parse json: %s", err);
ret = -1;
@@ -391,29 +472,22 @@ int WebsocketServer::parseTerminalSize(const char *jsonData, size_t len, uint16_
return ret;
}
-int WebsocketServer::ResizeTerminal(
- int socketID, const char *jsonData, size_t len,
- const std::string &containerID,
- const std::string &suffix)
+int WebsocketServer::ResizeTerminal(int socketID, const char *jsonData, size_t len, const std::string &containerID,
+ const std::string &suffix)
{
- int ret;
- service_executor_t *cb = nullptr;
- struct isulad_container_resize_request *req = nullptr;
- struct isulad_container_resize_response *res = nullptr;
- uint16_t width = 0;
- uint16_t height = 0;
-
- cb = get_service_executor();
+ auto *cb = get_service_executor();
if (cb == nullptr || cb->container.resize == nullptr) {
return -1;
}
- if (parseTerminalSize(jsonData, len, width, height) != 0) {
+ uint16_t width = 0;
+ uint16_t height = 0;
+ if (ParseTerminalSize(jsonData, len, width, height) != 0) {
return -1;
}
- req = (struct isulad_container_resize_request *)util_common_calloc_s(
- sizeof(struct isulad_container_resize_request));
+ auto *req = static_cast<isulad_container_resize_request *>(
+ util_common_calloc_s(sizeof(struct isulad_container_resize_request)));
if (req == nullptr) {
ERROR("Out of memory");
return -1;
@@ -424,12 +498,12 @@ int WebsocketServer::ResizeTerminal(
req->height = height;
req->width = width;
- ret = cb->container.resize(req, &res);
+ struct isulad_container_resize_response *res = nullptr;
+ int ret = cb->container.resize(req, &res);
isulad_container_resize_request_free(req);
isulad_container_resize_response_free(res);
-
return ret;
}
@@ -442,9 +516,7 @@ void WebsocketServer::Receive(int socketID, void *in, size_t len)
}
if (*static_cast<char *>(in) == WebsocketChannel::RESIZECHANNEL) {
- std::string containerID = it->second->container_id;
- std::string suffix = it->second->suffix;
- if (ResizeTerminal(socketID, (char *)in + 1, len, containerID, suffix) != 0) {
+ if (ResizeTerminal(socketID, (char *)in + 1, len, it->second->containerID, it->second->suffix) != 0) {
ERROR("Failed to resize terminal tty");
return;
}
@@ -459,8 +531,7 @@ void WebsocketServer::Receive(int socketID, void *in, size_t len)
}
}
-int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason,
- void *user, void *in, size_t len)
+int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
switch (reason) {
case LWS_CALLBACK_HTTP:
@@ -493,11 +564,11 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason,
return -1;
}
- auto isSessionClosed = it->second->IsClosed();
+ auto sessionClosed = it->second->IsClosed();
while (!it->second->buffer.empty()) {
- unsigned char *message = it->second->FrontMessage();
+ auto *message = it->second->FrontMessage();
// send success! free it and erase for list
- if (WebsocketServer::GetInstance()->Wswrite(wsi, (const unsigned char *)message) == 0) {
+ if (WebsocketServer::GetInstance()->Wswrite(wsi, const_cast<const unsigned char *>(message)) == 0) {
free(message);
it->second->PopMessage();
} else {
@@ -508,7 +579,7 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason,
}
// avoid: push message to buffer and set closed true
- if (isSessionClosed) {
+ if (sessionClosed) {
DEBUG("websocket session disconnected");
return -1;
}
@@ -517,7 +588,7 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason,
break;
case LWS_CALLBACK_RECEIVE: {
ReadGuard<RWMutex> lock(m_mutex);
- WebsocketServer::GetInstance()->Receive(lws_get_socket_fd(wsi), (char *)in, len);
+ WebsocketServer::GetInstance()->Receive(lws_get_socket_fd(wsi), static_cast<char *>(in), len);
}
break;
case LWS_CALLBACK_CLOSED: {
@@ -538,7 +609,7 @@ void WebsocketServer::ServiceWorkThread(int threadid)
prctl(PR_SET_NAME, "WebsocketServer");
- while (n >= 0 && !m_force_exit) {
+ while (n >= 0 && !m_forceExit) {
n = lws_service(m_context, 0);
}
}
@@ -553,16 +624,17 @@ void WebsocketServer::Start(Errors &err)
if (CreateContext() < 0) {
err.SetError("Websocket server start failed! please check your network status"
- "(eg: port " + std::to_string(m_listenPort) + " is occupied)");
+ "(eg: port " +
+ std::to_string(m_listenPort) + " is occupied)");
return;
}
- m_pthread_service = std::thread(&WebsocketServer::ServiceWorkThread, this, 0);
+ m_pthreadService = std::thread(&WebsocketServer::ServiceWorkThread, this, 0);
}
void WebsocketServer::Wait()
{
- if (m_pthread_service.joinable()) {
- m_pthread_service.join();
+ if (m_pthreadService.joinable()) {
+ m_pthreadService.join();
}
CloseAllWsSession();
@@ -571,19 +643,17 @@ void WebsocketServer::Wait()
}
namespace {
-
-void DoWriteToClient(session_data *session,
- const void *data, size_t len, WebsocketChannel channel)
+void DoWriteToClient(SessionData *session, const void *data, size_t len, WebsocketChannel channel)
{
- unsigned char *buf = (unsigned char *)util_common_calloc_s(LWS_PRE + MAX_BUFFER_SIZE + 1);
+ auto *buf = static_cast<unsigned char *>(util_common_calloc_s(LWS_PRE + MAX_BUFFER_SIZE + 1));
if (buf == nullptr) {
ERROR("Out of memory");
return;
}
// Determine if it is standard output channel or error channel
- buf[LWS_PRE] = channel;
+ buf[LWS_PRE] = static_cast<int>(channel);
- (void)memcpy(&buf[LWS_PRE + 1], (void *)data, len);
+ (void)memcpy(&buf[LWS_PRE + 1], const_cast<void *>(data), len);
// push back to message list
if (session->PushMessage(buf) != 0) {
@@ -594,7 +664,7 @@ void DoWriteToClient(session_data *session,
ssize_t WsWriteToClient(void *context, const void *data, size_t len, WebsocketChannel channel)
{
- auto *lwsCtx = static_cast<session_data *>(context);
+ auto *lwsCtx = static_cast<SessionData *>(context);
// CloseWsSession wait IOCopy finished, and then delete session in m_wsis
// So don't need rdlock m_wsis here
@@ -605,7 +675,7 @@ ssize_t WsWriteToClient(void *context, const void *data, size_t len, WebsocketCh
DoWriteToClient(lwsCtx, data, len, channel);
return static_cast<ssize_t>(len);
}
-};
+}; // namespace
ssize_t WsWriteStdoutToClient(void *context, const void *data, size_t len)
{
@@ -636,12 +706,12 @@ int closeWsConnect(void *context, char **err)
return -1;
}
- auto *lwsCtx = static_cast<session_data *>(context);
+ auto *lwsCtx = static_cast<SessionData *>(context);
lwsCtx->CloseSession();
- if (lwsCtx->sync_close_sem != nullptr) {
- (void)sem_post(lwsCtx->sync_close_sem);
+ if (lwsCtx->syncCloseSem != nullptr) {
+ (void)sem_post(lwsCtx->syncCloseSem);
}
return 0;
diff --git a/src/daemon/entry/cri/websocket/service/ws_server.h b/src/daemon/entry/cri/websocket/service/ws_server.h
index 3ab8e22f..2d3bb4a7 100644
--- a/src/daemon/entry/cri/websocket/service/ws_server.h
+++ b/src/daemon/entry/cri/websocket/service/ws_server.h
@@ -1,5 +1,5 @@
/******************************************************************************
- * Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved.
+ * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved.
* iSulad licensed under the Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
@@ -17,7 +17,6 @@
#define DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_WS_SERVER_H
#include <vector>
#include <unordered_map>
-#include <unordered_set>
#include <string>
#include <mutex>
#include <atomic>
@@ -29,140 +28,40 @@
#include "url.h"
#include "errors.h"
#include "read_write_lock.h"
-#include "isula_libutils/cri_terminal_size.h"
-#define MAX_ECHO_PAYLOAD 4096
-#define MAX_ARRAY_LEN 2
-#define MAX_BUF_LEN 256
-#define MAX_PROTOCOL_NUM 2
-#define MAX_HTTP_HEADER_POOL 8
-#define MIN_VEC_SIZE 3
-#define PIPE_FD_NUM 2
-#define BUF_BASE_SIZE 1024
-#define LWS_TIMEOUT 50
-// io copy maximum single transfer 4K, let max total buffer size: 1GB
-#define FIFO_LIST_BUFFER_MAX_LEN 262144
-#define SESSION_CAPABILITY 300
-#define MAX_SESSION_NUM 128
+// Sizing constants for the declarations below. Const objects at namespace scope already
+// have internal linkage, so wrapping them in an unnamed namespace adds nothing, and
+// unnamed namespaces are best kept out of headers.
+constexpr int MAX_ECHO_PAYLOAD = 4096;
+constexpr int MAX_ARRAY_LEN = 2;
+constexpr int MAX_PROTOCOL_NUM = 2;
-enum WebsocketChannel {
- STDINCHANNEL = 0,
- STDOUTCHANNEL,
- STDERRCHANNEL,
- ERRORCHANNEL,
- RESIZECHANNEL
-};
-
-struct session_data {
+struct SessionData {
std::array<int, MAX_ARRAY_LEN> pipes;
volatile bool close;
- std::mutex *session_mutex;
- sem_t *sync_close_sem;
+ std::mutex *sessionMutex;
+ sem_t *syncCloseSem;
std::list<unsigned char *> buffer;
- std::string container_id;
+ std::string containerID;
std::string suffix;
- unsigned char *FrontMessage()
- {
- unsigned char *message = nullptr;
-
- if (session_mutex == nullptr) {
- return nullptr;
- }
-
- session_mutex->lock();
- message = buffer.front();
- session_mutex->unlock();
-
- return message;
- }
-
- void PopMessage()
- {
- if (session_mutex == nullptr) {
- return;
- }
-
- session_mutex->lock();
- buffer.pop_front();
- session_mutex->unlock();
- }
-
- int PushMessage(unsigned char *message)
- {
- if (session_mutex == nullptr) {
- return -1;
- }
-
- session_mutex->lock();
-
- // In extreme scenarios, websocket data cannot be processed,
- // ignore the data coming in later to prevent iSulad from getting stuck
- if (close || buffer.size() >= FIFO_LIST_BUFFER_MAX_LEN) {
- free(message);
- session_mutex->unlock();
- return -1;
- }
-
- buffer.push_back(message);
- session_mutex->unlock();
- return 0;
- }
-
- bool IsClosed()
- {
- bool c = false;
-
- if (session_mutex == nullptr) {
- return true;
- }
-
- session_mutex->lock();
- c = close;
- session_mutex->unlock();
-
- return c;
- }
-
- void CloseSession()
- {
- if (session_mutex == nullptr) {
- return;
- }
-
- session_mutex->lock();
- close = true;
- session_mutex->unlock();
- }
-
- void EraseAllMessage()
- {
- if (session_mutex == nullptr) {
- return;
- }
-
- session_mutex->lock();
- for (auto iter = buffer.begin(); iter != buffer.end();) {
- free(*iter);
- *iter = NULL;
- iter = buffer.erase(iter);
- }
- session_mutex->unlock();
- }
+ unsigned char *FrontMessage();
+ void PopMessage();
+ int PushMessage(unsigned char *message);
+ bool IsClosed();
+ void CloseSession();
+ void EraseAllMessage();
};
class WebsocketServer {
public:
static WebsocketServer *GetInstance() noexcept;
- static std::atomic<WebsocketServer *> m_instance;
void Start(Errors &err);
void Wait();
void Shutdown();
void RegisterCallback(const std::string &path, std::shared_ptr<StreamingServeInterface> callback);
url::URLDatum GetWebsocketUrl();
void SetLwsSendedFlag(int socketID, bool sended);
- void ReadLockAllWsSession();
- void UnlockAllWsSession();
private:
WebsocketServer();
@@ -171,33 +70,41 @@ private:
virtual ~WebsocketServer();
int InitRWPipe(int read_fifo[]);
std::vector<std::string> split(std::string str, char r);
- static void EmitLog(int level, const char *line);
+
int CreateContext();
inline void Receive(int socketID, void *in, size_t len);
- int Wswrite(struct lws *wsi, const unsigned char *message);
+ int Wswrite(struct lws *wsi, const unsigned char *message);
inline void DumpHandshakeInfo(struct lws *wsi) noexcept;
int RegisterStreamTask(struct lws *wsi) noexcept;
- int GenerateSessionData(session_data *session, const std::string containerID) noexcept;
- static int Callback(struct lws *wsi, enum lws_callback_reasons reason,
- void *user, void *in, size_t len);
+ int GenerateSessionData(SessionData *session, const std::string containerID) noexcept;
void ServiceWorkThread(int threadid);
void CloseWsSession(int socketID);
void CloseAllWsSession();
- int ResizeTerminal(int socketID, const char *jsonData, size_t len,
- const std::string &containerID, const std::string &suffix);
- int parseTerminalSize(const char *jsonData, size_t len, uint16_t &width, uint16_t &height);
+ int ResizeTerminal(int socketID, const char *jsonData, size_t len, const std::string &containerID,
+ const std::string &suffix);
+ int ParseTerminalSize(const char *jsonData, size_t len, uint16_t &width, uint16_t &height);
+
+private:
+ // redirect libwebsockets logs to iSulad
+ static void EmitLog(int level, const char *line);
+ // libwebsockets Callback function
+ static int Callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
private:
+ static std::atomic<WebsocketServer *> m_instance;
static RWMutex m_mutex;
static struct lws_context *m_context;
- volatile int m_force_exit = 0;
- std::thread m_pthread_service;
- const struct lws_protocols m_protocols[MAX_PROTOCOL_NUM] = {
- { "channel.k8s.io", Callback, 0, MAX_ECHO_PAYLOAD, },
- { nullptr, nullptr, 0, 0 }
- };
+ volatile int m_forceExit = 0;
+ std::thread m_pthreadService;
+ const struct lws_protocols m_protocols[MAX_PROTOCOL_NUM] = { {
+ "channel.k8s.io",
+ Callback,
+ 0,
+ MAX_ECHO_PAYLOAD,
+ },
+ { nullptr, nullptr, 0, 0 } };
RouteCallbackRegister m_handler;
- static std::unordered_map<int, session_data *> m_wsis;
+ static std::unordered_map<int, SessionData *> m_wsis;
url::URLDatum m_url;
int m_listenPort;
};
@@ -207,4 +114,3 @@ ssize_t WsWriteStderrToClient(void *context, const void *data, size_t len);
int closeWsConnect(void *context, char **err);
#endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_WS_SERVER_H
-
--
2.25.1