iSulad/0069-cri-fix-residual-IO-copy-thread-in-CRI-exec-operatio.patch
WangFengTu b1ffa045c4 iSulad: sync with upstream iSulad
Signed-off-by: WangFengTu <wangfengtu@huawei.com>
2021-05-18 14:48:15 +08:00

1062 lines
41 KiB
Diff

From e10bcde6bc507767fc0770c0a1606b6f96494d6b Mon Sep 17 00:00:00 2001
From: wujing <wujing50@huawei.com>
Date: Sat, 27 Mar 2021 18:38:08 +0800
Subject: [PATCH 069/104] cri: fix residual IO copy thread in CRI exec
operation
Signed-off-by: wujing <wujing50@huawei.com>
---
.../cri/websocket/service/attach_serve.cc | 20 +-
.../cri/websocket/service/attach_serve.h | 2 +-
.../entry/cri/websocket/service/exec_serve.cc | 24 +-
.../entry/cri/websocket/service/exec_serve.h | 2 +-
.../service/route_callback_register.h | 24 +-
.../entry/cri/websocket/service/ws_server.cc | 259 +++++++++---------
.../entry/cri/websocket/service/ws_server.h | 53 +++-
src/daemon/modules/service/io_handler.c | 53 ++--
src/utils/console/console.c | 38 ++-
src/utils/console/console.h | 5 +-
10 files changed, 284 insertions(+), 196 deletions(-)
diff --git a/src/daemon/entry/cri/websocket/service/attach_serve.cc b/src/daemon/entry/cri/websocket/service/attach_serve.cc
index 01c6b9cf..ec2edc8b 100644
--- a/src/daemon/entry/cri/websocket/service/attach_serve.cc
+++ b/src/daemon/entry/cri/websocket/service/attach_serve.cc
@@ -16,30 +16,44 @@
#include "attach_serve.h"
#include "utils.h"
-int AttachServe::Execute(struct lws *wsi, const std::string &token, int read_pipe_fd)
+int AttachServe::Execute(lwsContext lws_ctx, const std::string &token, int read_pipe_fd)
{
+ prctl(PR_SET_NAME, "AttachServe");
+
service_executor_t *cb = get_service_executor();
if (cb == nullptr || cb->container.attach == nullptr) {
+ sem_post(lws_ctx.sync_close_sem);
return -1;
}
container_attach_request *container_req = nullptr;
if (GetContainerRequest(token, &container_req) != 0) {
ERROR("Failed to get contaner request");
+ sem_post(lws_ctx.sync_close_sem);
+ return -1;
+ }
+
+ // attach is non-blocking, so the context cannot be a local variable; this
+ // heap copy is freed in closeWsConnect when the websocket session is closed
+ lwsContext *lws_context = new (std::nothrow)lwsContext(lws_ctx);
+ if (lws_context == nullptr) {
+ ERROR("Out of memory");
+ sem_post(lws_ctx.sync_close_sem);
return -1;
}
struct io_write_wrapper stringWriter = { 0 };
- stringWriter.context = (void *)wsi;
+ stringWriter.context = (void *)(lws_context);
stringWriter.write_func = WsWriteStdoutToClient;
stringWriter.close_func = closeWsConnect;
container_req->attach_stderr = false;
container_attach_response *container_res = nullptr;
int ret = cb->container.attach(container_req, &container_res, container_req->attach_stdin ? read_pipe_fd : -1,
- &stringWriter, nullptr);
+ container_req->attach_stdout ? &stringWriter : nullptr, nullptr);
if (ret != 0) {
ERROR("Failed to attach container: %s", container_req->container_id);
+ sem_post(lws_ctx.sync_close_sem);
}
free_container_attach_request(container_req);
diff --git a/src/daemon/entry/cri/websocket/service/attach_serve.h b/src/daemon/entry/cri/websocket/service/attach_serve.h
index 00e2b34e..0d29f35b 100644
--- a/src/daemon/entry/cri/websocket/service/attach_serve.h
+++ b/src/daemon/entry/cri/websocket/service/attach_serve.h
@@ -33,7 +33,7 @@ public:
AttachServe(const AttachServe &) = delete;
AttachServe &operator=(const AttachServe &) = delete;
virtual ~AttachServe() = default;
- int Execute(struct lws *wsi, const std::string &token, int read_pipe_fd) override;
+ int Execute(lwsContext lws_ctx, const std::string &token, int read_pipe_fd) override;
private:
int RequestFromCri(const runtime::v1alpha2::AttachRequest &grequest,
container_attach_request **request);
diff --git a/src/daemon/entry/cri/websocket/service/exec_serve.cc b/src/daemon/entry/cri/websocket/service/exec_serve.cc
index 855d28b8..96675734 100644
--- a/src/daemon/entry/cri/websocket/service/exec_serve.cc
+++ b/src/daemon/entry/cri/websocket/service/exec_serve.cc
@@ -17,25 +17,37 @@
#include "io_wrapper.h"
#include "utils.h"
-int ExecServe::Execute(struct lws *wsi, const std::string &token, int read_pipe_fd)
+int ExecServe::Execute(lwsContext lws_ctx, const std::string &token, int read_pipe_fd)
{
service_executor_t *cb = get_service_executor();
if (cb == nullptr || cb->container.exec == nullptr) {
+ sem_post(lws_ctx.sync_close_sem);
return -1;
}
container_exec_request *container_req = nullptr;
if (GetContainerRequest(token, &container_req) != 0) {
ERROR("Failed to get contaner request");
+ sem_post(lws_ctx.sync_close_sem);
+ return -1;
+ }
+
+ lwsContext *lws_context = new (std::nothrow)lwsContext(lws_ctx);
+ if (lws_context == nullptr) {
+ ERROR("Out of memory");
+ sem_post(lws_ctx.sync_close_sem);
return -1;
}
struct io_write_wrapper StdoutstringWriter = { 0 };
- StdoutstringWriter.context = (void *)wsi;
+ StdoutstringWriter.context = (void *)lws_context;
StdoutstringWriter.write_func = WsWriteStdoutToClient;
+ // only one writer carries closeWsStream: stderr when attach_stderr is set, otherwise stdout
+ StdoutstringWriter.close_func = container_req->attach_stderr ? nullptr : closeWsStream;
struct io_write_wrapper StderrstringWriter = { 0 };
- StderrstringWriter.context = (void *)wsi;
+ StderrstringWriter.context = (void *)lws_context;
StderrstringWriter.write_func = WsWriteStderrToClient;
+ StderrstringWriter.close_func = container_req->attach_stderr ? closeWsStream : nullptr;
container_exec_response *container_res = nullptr;
int ret = cb->container.exec(container_req, &container_res, container_req->attach_stdin ? read_pipe_fd : -1,
@@ -48,17 +60,17 @@ int ExecServe::Execute(struct lws *wsi, const std::string &token, int read_pipe_
} else {
message = "Failed to call exec container callback. ";
}
- WsWriteStdoutToClient(wsi, message.c_str(), message.length());
+ WsWriteStdoutToClient(lws_context, message.c_str(), message.length());
}
if (container_res != nullptr && container_res->exit_code != 0) {
std::string exit_info = "Exit code :" + std::to_string((int)container_res->exit_code) + "\n";
- WsWriteStdoutToClient(wsi, exit_info.c_str(), exit_info.length());
+ WsWriteStdoutToClient(lws_context, exit_info.c_str(), exit_info.length());
}
free_container_exec_request(container_req);
free_container_exec_response(container_res);
- (void)closeWsConnect((void*)wsi, nullptr);
+ closeWsConnect((void*)lws_context, nullptr);
return ret;
}
diff --git a/src/daemon/entry/cri/websocket/service/exec_serve.h b/src/daemon/entry/cri/websocket/service/exec_serve.h
index b29c3e1e..093f076a 100644
--- a/src/daemon/entry/cri/websocket/service/exec_serve.h
+++ b/src/daemon/entry/cri/websocket/service/exec_serve.h
@@ -37,7 +37,7 @@ public:
ExecServe(const ExecServe &) = delete;
ExecServe &operator=(const ExecServe &) = delete;
virtual ~ExecServe() = default;
- int Execute(struct lws *wsi, const std::string &token, int read_pipe_fd) override;
+ int Execute(lwsContext lws_ctx, const std::string &token, int read_pipe_fd) override;
private:
int RequestFromCri(const runtime::v1alpha2::ExecRequest &grequest, container_exec_request **request);
diff --git a/src/daemon/entry/cri/websocket/service/route_callback_register.h b/src/daemon/entry/cri/websocket/service/route_callback_register.h
index 5d021d17..9c6bdd64 100644
--- a/src/daemon/entry/cri/websocket/service/route_callback_register.h
+++ b/src/daemon/entry/cri/websocket/service/route_callback_register.h
@@ -21,14 +21,21 @@
#include <utility>
#include <map>
#include <unistd.h>
+#include <semaphore.h>
#include "isula_libutils/log.h"
+
+struct lwsContext {
+ int fd;
+ sem_t *sync_close_sem;
+};
+
class StreamingServeInterface {
public:
StreamingServeInterface() = default;
StreamingServeInterface(const StreamingServeInterface &) = delete;
StreamingServeInterface &operator=(const StreamingServeInterface &) = delete;
virtual ~StreamingServeInterface() = default;
- virtual int Execute(struct lws *wsi, const std::string &token, int read_pipe_fd) = 0;
+ virtual int Execute(lwsContext lws_ctx, const std::string &token, int read_pipe_fd) = 0;
};
class RouteCallbackRegister {
@@ -42,15 +49,14 @@ public:
return static_cast<bool>(m_registeredcallbacks.count(method));
}
- int HandleCallback(struct lws *wsi, const std::string &method,
- const std::string &token,
- int read_pipe_fd)
+ int HandleCallback(lwsContext lws_ctx, const std::string &method,
+ const std::string &token, int read_pipe_fd)
{
auto it = m_registeredcallbacks.find(method);
if (it != m_registeredcallbacks.end()) {
std::shared_ptr<StreamingServeInterface> callback = it->second;
if (callback) {
- return callback->Execute(wsi, token, read_pipe_fd);
+ return callback->Execute(lws_ctx, token, read_pipe_fd);
}
}
ERROR("invalid method!");
@@ -69,21 +75,21 @@ private:
class StreamTask {
public:
- StreamTask(RouteCallbackRegister *invoker, struct lws *wsi,
+ StreamTask(RouteCallbackRegister *invoker, lwsContext lws_ctx,
const std::string &method,
const std::string &token, int read_pipe_fd)
- : m_invoker(invoker), m_wsi(wsi), m_method(method), m_token(token),
+ : m_invoker(invoker), m_lws_ctx(lws_ctx), m_method(method), m_token(token),
m_read_pipe_fd(read_pipe_fd) {}
StreamTask(const StreamTask &) = delete;
StreamTask &operator=(const StreamTask &) = delete;
virtual ~StreamTask() = default;
int Run()
{
- return m_invoker->HandleCallback(m_wsi, m_method, m_token, m_read_pipe_fd);
+ return m_invoker->HandleCallback(m_lws_ctx, m_method, m_token, m_read_pipe_fd);
}
private:
RouteCallbackRegister *m_invoker{ nullptr };
- struct lws *m_wsi;
+ lwsContext m_lws_ctx;
std::string m_method;
std::string m_token;
int m_read_pipe_fd;
diff --git a/src/daemon/entry/cri/websocket/service/ws_server.cc b/src/daemon/entry/cri/websocket/service/ws_server.cc
index 795d2c1e..4993e1e8 100644
--- a/src/daemon/entry/cri/websocket/service/ws_server.cc
+++ b/src/daemon/entry/cri/websocket/service/ws_server.cc
@@ -30,7 +30,6 @@ struct lws_context *WebsocketServer::m_context = nullptr;
std::atomic<WebsocketServer *> WebsocketServer::m_instance;
RWMutex WebsocketServer::m_mutex;
std::unordered_map<int, session_data> WebsocketServer::m_wsis;
-std::unordered_set<struct lws *> WebsocketServer::m_activeSession;
WebsocketServer *WebsocketServer::GetInstance() noexcept
{
@@ -159,12 +158,10 @@ void WebsocketServer::CloseAllWsSession()
{
WriteGuard<RWMutex> lock(m_mutex);
for (auto it = m_wsis.begin(); it != m_wsis.end(); ++it) {
- free(it->second.buf);
+ it->second.EraseAllMessage();
close(it->second.pipes.at(0));
close(it->second.pipes.at(1));
- it->second.sended = true;
delete it->second.buf_mutex;
- delete it->second.sended_mutex;
}
m_wsis.clear();
}
@@ -173,59 +170,54 @@ void WebsocketServer::CloseWsSession(int socketID)
{
auto it = m_wsis.find(socketID);
if (it != m_wsis.end()) {
- free(it->second.buf);
+ it->second.EraseAllMessage();
+ // close the pipe write endpoint first to make sure the io copy thread exits,
+ // otherwise epoll will trigger EOF
+ if (it->second.pipes.at(1) >= 0) {
+ close(it->second.pipes.at(1));
+ it->second.pipes.at(1) = -1;
+ }
+ (void)sem_wait(it->second.sync_close_sem);
+ (void)sem_destroy(it->second.sync_close_sem);
close(it->second.pipes.at(0));
- close(it->second.pipes.at(1));
- it->second.sended = true;
delete it->second.buf_mutex;
- delete it->second.sended_mutex;
m_wsis.erase(it);
}
}
-void WebsocketServer::RecordSession(struct lws *wsi)
-{
- m_activeSession.insert(wsi);
-}
-
-void WebsocketServer::RemoveSession(struct lws *wsi)
-{
- m_activeSession.erase(wsi);
-}
-
-bool WebsocketServer::IsValidSession(struct lws *wsi)
-{
- return m_activeSession.count(wsi) != 0;
-}
-
-int WebsocketServer::DumpHandshakeInfo(struct lws *wsi) noexcept
+int WebsocketServer::GenerateSessionData(session_data &session) noexcept
{
int read_pipe_fd[PIPE_FD_NUM];
if (InitRWPipe(read_pipe_fd) < 0) {
ERROR("failed to init read/write pipe!");
+ return -1;
}
- session_data session;
- session.pipes = std::array<int, MAX_ARRAY_LEN> { read_pipe_fd[0], read_pipe_fd[1] };
+ std::mutex *buf_mutex = new std::mutex;
+ sem_t *sync_close_sem = new sem_t;
- int socketID = lws_get_socket_fd(wsi);
- m_wsis.insert(std::make_pair(socketID, std::move(session)));
- m_wsis[socketID].buf = (unsigned char *)util_common_calloc_s(LWS_PRE + MAX_MSG_BUFFER_SIZE + 1);
- if (m_wsis[socketID].buf == nullptr) {
- ERROR("Out of memory");
+ if (sem_init(sync_close_sem, 0, 0) != 0) {
+ ERROR("Semaphore initialization failed");
+ close(read_pipe_fd[1]);
+ close(read_pipe_fd[0]);
+ delete buf_mutex;
+ delete sync_close_sem;
return -1;
}
- m_wsis[socketID].buf_mutex = new std::mutex;
- m_wsis[socketID].sended_mutex = new std::mutex;
- m_wsis[socketID].SetProcessingStatus(false);
- int len;
- char buf[MAX_BUF_LEN] { 0 };
+ session.pipes = std::array<int, MAX_ARRAY_LEN> { read_pipe_fd[0], read_pipe_fd[1] };
+ session.buf_mutex = buf_mutex;
+ session.sync_close_sem = sync_close_sem;
+ return 0;
+}
+
+int WebsocketServer::RegisterStreamTask(struct lws *wsi) noexcept
+{
+ char buf[MAX_BUF_LEN] { 0 };
lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_GET_URI);
if (strlen(buf) == 0) {
ERROR("invalid url");
- CloseWsSession(socketID);
return -1;
}
@@ -237,60 +229,68 @@ int WebsocketServer::DumpHandshakeInfo(struct lws *wsi) noexcept
!m_handler.IsValidMethod(vec.at(1)) ||
!cache->IsValidToken(vec.at(2))) {
ERROR("invalid url(%s): incorrect format!", buf);
- CloseWsSession(socketID);
return -1;
}
+ session_data session;
+ if (GenerateSessionData(session) != 0) {
+ ERROR("failed to fill generate session data");
+ return -1;
+ }
+
+ int socketID = lws_get_socket_fd(wsi);
+ m_wsis.insert(std::make_pair(socketID, std::move(session)));
+
+ lwsContext lwsCtx = {
+ .fd = socketID,
+ .sync_close_sem = m_wsis[socketID].sync_close_sem,
+ };
std::thread streamTh([ = ]() {
- StreamTask(&m_handler, wsi, vec.at(1), vec.at(2), m_wsis[socketID].pipes.at(0)).Run();
+ StreamTask(&m_handler, lwsCtx, vec.at(1), vec.at(2), m_wsis[socketID].pipes.at(0)).Run();
});
streamTh.detach();
- RecordSession(wsi);
+
+ return 0;
+}
+
+void WebsocketServer::DumpHandshakeInfo(struct lws *wsi) noexcept
+{
int n = 0;
const unsigned char *c = nullptr;
+ char buf[MAX_BUF_LEN] { 0 };
+
do {
c = lws_token_to_string((lws_token_indexes)n);
if (c == nullptr) {
n++;
continue;
}
- len = lws_hdr_total_length(wsi, (lws_token_indexes)n);
+ int len = lws_hdr_total_length(wsi, (lws_token_indexes)n);
if (len == 0 || (static_cast<size_t>(len) > sizeof(buf) - 1)) {
n++;
continue;
}
+
lws_hdr_copy(wsi, buf, sizeof(buf), (lws_token_indexes)n);
buf[sizeof(buf) - 1] = '\0';
DEBUG(" %s = %s", (char *)c, buf);
n++;
} while (c != nullptr);
-
- return 0;
}
-int WebsocketServer::Wswrite(struct lws *wsi, void *in, size_t len)
+int WebsocketServer::Wswrite(struct lws *wsi, const unsigned char *message)
{
auto it = m_wsis.find(lws_get_socket_fd(wsi));
if (it != m_wsis.end()) {
- if (it->second.close) {
- DEBUG("websocket session disconnected");
- return -1;
- }
- it->second.buf_mutex->lock();
- auto &buf = it->second.buf;
- if (strlen((const char *)(&buf[LWS_PRE + 1])) == 0) {
- it->second.buf_mutex->unlock();
+ if (strlen((const char *)(&message[LWS_PRE + 1])) == 0) {
return 0;
}
- int n = lws_write(wsi, (unsigned char *)(&buf[LWS_PRE]),
- strlen((const char *)(&buf[LWS_PRE + 1])) + 1, LWS_WRITE_TEXT);
+ int n = lws_write(wsi, (unsigned char *)(&message[LWS_PRE]),
+ strlen((const char *)(&message[LWS_PRE + 1])) + 1, LWS_WRITE_TEXT);
if (n < 0) {
- it->second.buf_mutex->unlock();
ERROR("ERROR %d writing to socket, hanging up", n);
return -1;
}
- (void)memset(buf, 0, LWS_PRE + MAX_MSG_BUFFER_SIZE + 1);
- it->second.buf_mutex->unlock();
}
return 0;
@@ -314,16 +314,6 @@ void WebsocketServer::Receive(int socketID, void *in, size_t len)
}
}
-void WebsocketServer::SetLwsSendedFlag(int socketID, bool sended)
-{
- if (m_wsis.count(socketID) == 0) {
- return;
- }
- m_wsis[socketID].sended_mutex->lock();
- m_wsis[socketID].sended = sended;
- m_wsis[socketID].sended_mutex->unlock();
-}
-
int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
@@ -334,10 +324,13 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason,
return -1;
case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: {
WriteGuard<RWMutex> lock(m_mutex);
- if (WebsocketServer::GetInstance()->DumpHandshakeInfo(wsi)) {
+ WebsocketServer::GetInstance()->DumpHandshakeInfo(wsi);
+ if (WebsocketServer::GetInstance()->RegisterStreamTask(wsi) != 0) {
// return non-zero here and kill the connection
return -1;
}
+ // Trigger polling in LWS_CALLBACK_SERVER_WRITEABLE
+ lws_callback_on_writable(wsi);
}
break;
case LWS_CALLBACK_ESTABLISHED: {
@@ -347,13 +340,32 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason,
case LWS_CALLBACK_SERVER_WRITEABLE: {
ReadGuard<RWMutex> lock(m_mutex);
int socketID = lws_get_socket_fd(wsi);
- if (WebsocketServer::GetInstance()->Wswrite(wsi, in, len)) {
- WebsocketServer::GetInstance()->SetLwsSendedFlag(socketID, true);
+ auto it = m_wsis.find(socketID);
+ if (it == m_wsis.end()) {
+ DEBUG("invalid session!");
// return nonzero from the user callback to close the connection
// and callback with the reason of LWS_CALLBACK_CLOSED
return -1;
}
- WebsocketServer::GetInstance()->SetLwsSendedFlag(socketID, true);
+
+ while (!it->second.buffer.empty()) {
+ unsigned char *message = it->second.FrontMessage();
+ // send succeeded: free the message and erase it from the list
+ if (WebsocketServer::GetInstance()->Wswrite(wsi, (const unsigned char *)message) == 0) {
+ free(message);
+ it->second.PopMessage();
+ } else {
+ // Wswrite returned non-zero: send failed, keep the message and send it again later,
+ // or maybe the client was shut down abnormally
+ break;
+ }
+ }
+
+ if (it->second.close) {
+ DEBUG("websocket session disconnected");
+ return -1;
+ }
+ lws_callback_on_writable(wsi);
}
break;
case LWS_CALLBACK_RECEIVE: {
@@ -364,8 +376,8 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason,
case LWS_CALLBACK_CLOSED: {
WriteGuard<RWMutex> lock(m_mutex);
DEBUG("connection has been closed");
- WebsocketServer::GetInstance()->RemoveSession(wsi);
- WebsocketServer::GetInstance()->CloseWsSession(lws_get_socket_fd(wsi));
+ int socketID = lws_get_socket_fd(wsi);
+ WebsocketServer::GetInstance()->CloseWsSession(socketID);
}
break;
default:
@@ -377,6 +389,9 @@ int WebsocketServer::Callback(struct lws *wsi, enum lws_callback_reasons reason,
void WebsocketServer::ServiceWorkThread(int threadid)
{
int n = 0;
+
+ prctl(PR_SET_NAME, "WebsocketServer");
+
while (n >= 0 && !m_force_exit) {
n = lws_service(m_context, 0);
}
@@ -410,73 +425,37 @@ void WebsocketServer::Wait()
}
namespace {
-auto PrepareWsiSession(int socketID) -> session_data *
-{
- WebsocketServer *server = WebsocketServer::GetInstance();
- server->ReadLockAllWsSession();
- auto itor = server->GetWsisData().find(socketID);
- if (itor == server->GetWsisData().end()) {
- ERROR("invalid session!");
- server->UnlockAllWsSession();
- return nullptr;
- }
- server->SetLwsSendedFlag(socketID, false);
- server->UnlockAllWsSession();
-
- return &itor->second;
-}
-
-void DoWriteToClient(struct lws *wsi, session_data *session,
+void DoWriteToClient(int fd, session_data *session,
const void *data, size_t len, WebsocketChannel channel)
{
- session->buf_mutex->lock();
+ unsigned char *buf = (unsigned char *)util_common_calloc_s(LWS_PRE + MAX_MSG_BUFFER_SIZE + 1);
+ if (buf == nullptr) {
+ ERROR("Out of memory");
+ return;
+ }
// Determine if it is standard output channel or error channel
- (void)memset(session->buf, 0, LWS_PRE + MAX_MSG_BUFFER_SIZE + 1);
- session->buf[LWS_PRE] = channel;
-
- (void)memcpy(&session->buf[LWS_PRE + 1], (void *)data, len);
+ buf[LWS_PRE] = channel;
- lws_callback_on_writable(wsi);
+ (void)memcpy(&buf[LWS_PRE + 1], (void *)data, len);
- session->buf_mutex->unlock();
-}
-
-void EnsureWrited(struct lws *wsi, session_data *session)
-{
- const int RETRIES = 10;
- const int CHECK_PERIOD_SECOND = 1;
- const int TRIGGER_PERIOD_MS = 1;
- auto start = std::chrono::system_clock::now();
- int count = 0;
-
- while (!session->sended && count < RETRIES) {
- auto end = std::chrono::system_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
- double spend_time = static_cast<double>(duration.count()) * std::chrono::microseconds::period::num /
- std::chrono::microseconds::period::den;
- if (spend_time > CHECK_PERIOD_SECOND) {
- lws_callback_on_writable(wsi);
- std::this_thread::sleep_for(std::chrono::milliseconds(TRIGGER_PERIOD_MS));
- start = std::chrono::system_clock::now();
- count++;
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(TRIGGER_PERIOD_MS));
- }
+ // push back to message list
+ session->PushMessage(buf);
}
ssize_t WsWriteToClient(void *context, const void *data, size_t len, WebsocketChannel channel)
{
- struct lws *wsi = static_cast<struct lws *>(context);
+ auto *lwsCtx = static_cast<lwsContext *>(context);
+ int fd = lwsCtx->fd;
- session_data *session = PrepareWsiSession(lws_get_socket_fd(wsi));
- if (session == nullptr) {
+ WebsocketServer *server = WebsocketServer::GetInstance();
+ auto itor = server->GetWsisData().find(fd);
+ if (itor == server->GetWsisData().end()) {
+ ERROR("invalid session!");
return 0;
}
- DoWriteToClient(wsi, session, data, len, channel);
-
- EnsureWrited(wsi, session);
+ DoWriteToClient(fd, &itor->second, data, len, channel);
return static_cast<ssize_t>(len);
}
@@ -495,23 +474,37 @@ ssize_t WsWriteStderrToClient(void *context, const void *data, size_t len)
int closeWsConnect(void *context, char **err)
{
(void)err;
- struct lws *wsi = static_cast<struct lws *>(context);
+ auto *lwsCtx = static_cast<lwsContext *>(context);
+
+ if (lwsCtx->sync_close_sem != nullptr) {
+ (void)sem_post(lwsCtx->sync_close_sem);
+ }
WebsocketServer *server = WebsocketServer::GetInstance();
server->ReadLockAllWsSession();
- auto it = server->GetWsisData().find(lws_get_socket_fd(wsi));
+ auto it = server->GetWsisData().find(lwsCtx->fd);
if (it == server->GetWsisData().end()) {
server->UnlockAllWsSession();
ERROR("websocket session not exist");
+ delete lwsCtx;
return -1;
}
-
+ // will close websocket session on LWS_CALLBACK_SERVER_WRITEABLE polling
it->second.close = true;
- // close websocket session
- if (server->IsValidSession(wsi)) {
- lws_callback_on_writable(wsi);
- }
server->UnlockAllWsSession();
+ delete lwsCtx;
return 0;
}
+
+int closeWsStream(void *context, char **err)
+{
+ (void)err;
+ auto *lwsCtx = static_cast<lwsContext *>(context);
+
+ if (lwsCtx->sync_close_sem != nullptr) {
+ (void)sem_post(lwsCtx->sync_close_sem);
+ }
+
+ return 0;
+}
\ No newline at end of file
diff --git a/src/daemon/entry/cri/websocket/service/ws_server.h b/src/daemon/entry/cri/websocket/service/ws_server.h
index cb431f7f..ebba3305 100644
--- a/src/daemon/entry/cri/websocket/service/ws_server.h
+++ b/src/daemon/entry/cri/websocket/service/ws_server.h
@@ -22,6 +22,7 @@
#include <mutex>
#include <atomic>
#include <memory>
+#include <list>
#include <thread>
#include <libwebsockets.h>
#include "route_callback_register.h"
@@ -47,20 +48,45 @@ enum WebsocketChannel {
struct session_data {
std::array<int, MAX_ARRAY_LEN> pipes;
- unsigned char *buf;
- volatile bool sended { false };
volatile bool close { false };
- volatile bool in_processing { false };
std::mutex *buf_mutex;
- std::mutex *sended_mutex;
+ sem_t *sync_close_sem;
+ std::list<unsigned char *> buffer;
- void SetProcessingStatus(bool status)
+ unsigned char *FrontMessage()
{
- in_processing = status;
+ unsigned char *message = nullptr;
+
+ buf_mutex->lock();
+ message = buffer.front();
+ buf_mutex->unlock();
+
+ return message;
+ }
+
+ void PopMessage()
+ {
+ buf_mutex->lock();
+ buffer.pop_front();
+ buf_mutex->unlock();
}
- bool GetProcessingStatus() const
+
+ void PushMessage(unsigned char *message)
+ {
+ buf_mutex->lock();
+ buffer.push_back(message);
+ buf_mutex->unlock();
+ }
+
+ void EraseAllMessage()
{
- return in_processing;
+ buf_mutex->lock();
+ for (auto iter = buffer.begin(); iter != buffer.end();) {
+ free(*iter);
+ *iter = NULL;
+ iter = buffer.erase(iter);
+ }
+ buf_mutex->unlock();
}
};
@@ -77,7 +103,6 @@ public:
void SetLwsSendedFlag(int socketID, bool sended);
void ReadLockAllWsSession();
void UnlockAllWsSession();
- bool IsValidSession(struct lws *wsi);
private:
WebsocketServer();
@@ -89,15 +114,15 @@ private:
static void EmitLog(int level, const char *line);
int CreateContext();
inline void Receive(int socketID, void *in, size_t len);
- int Wswrite(struct lws *wsi, void *in, size_t len);
- inline int DumpHandshakeInfo(struct lws *wsi) noexcept;
+ int Wswrite(struct lws *wsi, const unsigned char *message);
+ inline void DumpHandshakeInfo(struct lws *wsi) noexcept;
+ int RegisterStreamTask(struct lws *wsi) noexcept;
+ int GenerateSessionData(session_data &session) noexcept;
static int Callback(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len);
void ServiceWorkThread(int threadid);
void CloseWsSession(int socketID);
void CloseAllWsSession();
- void RecordSession(struct lws *wsi);
- void RemoveSession(struct lws *wsi);
private:
static RWMutex m_mutex;
@@ -110,7 +135,6 @@ private:
};
RouteCallbackRegister m_handler;
static std::unordered_map<int, session_data> m_wsis;
- static std::unordered_set<struct lws *> m_activeSession;
url::URLDatum m_url;
int m_listenPort;
};
@@ -118,6 +142,7 @@ private:
ssize_t WsWriteStdoutToClient(void *context, const void *data, size_t len);
ssize_t WsWriteStderrToClient(void *context, const void *data, size_t len);
int closeWsConnect(void *context, char **err);
+int closeWsStream(void *context, char **err);
#endif // DAEMON_ENTRY_CRI_WEBSOCKET_SERVICE_WS_SERVER_H
diff --git a/src/daemon/modules/service/io_handler.c b/src/daemon/modules/service/io_handler.c
index 75a36483..d57894f2 100644
--- a/src/daemon/modules/service/io_handler.c
+++ b/src/daemon/modules/service/io_handler.c
@@ -183,6 +183,7 @@ struct io_copy_arg {
io_type dsttype;
void *dst;
int dstfifoflag;
+ transfer_channel_type channel;
};
struct io_copy_thread_arg {
@@ -194,7 +195,7 @@ struct io_copy_thread_arg {
};
static void io_copy_thread_cleanup(struct io_write_wrapper *writers, struct io_copy_thread_arg *thread_arg, int *infds,
- int *outfds, int *srcfds, size_t len)
+ int *outfds, int *srcfds, transfer_channel_type *channels, size_t len)
{
size_t i = 0;
for (i = 0; i < len; i++) {
@@ -214,9 +215,11 @@ static void io_copy_thread_cleanup(struct io_write_wrapper *writers, struct io_c
free(infds);
free(outfds);
free(writers);
+ free(channels);
}
-static int io_copy_init_fds(size_t len, int **infds, int **outfds, int **srcfds, struct io_write_wrapper **writers)
+static int io_copy_init_fds(size_t len, int **infds, int **outfds, int **srcfds,
+ struct io_write_wrapper **writers, transfer_channel_type **channels)
{
size_t i;
@@ -252,9 +255,19 @@ static int io_copy_init_fds(size_t len, int **infds, int **outfds, int **srcfds,
ERROR("Out of memory");
return -1;
}
+
+ *channels = util_common_calloc_s(sizeof(transfer_channel_type) * len);
+ if (*channels == NULL) {
+ ERROR("Out of memory");
+ return -1;
+ }
+
+ for (i = 0; i < len; i++) {
+ (*channels)[i] = MAX_CHANNEL;
+ }
return 0;
-}
+}
typedef int (*src_io_type_handle)(int index, struct io_copy_arg *copy_arg, int *infds, int *srcfds);
struct src_io_copy_handler {
@@ -292,7 +305,8 @@ static int handle_src_io_max(int index, struct io_copy_arg *copy_arg, int *infds
return -1;
}
-static int io_copy_make_srcfds(size_t len, struct io_copy_arg *copy_arg, int *infds, int *srcfds)
+static int io_copy_make_srcfds(size_t len, struct io_copy_arg *copy_arg, int *infds,
+ int *srcfds, transfer_channel_type *channels)
{
size_t i;
@@ -307,6 +321,7 @@ static int io_copy_make_srcfds(size_t len, struct io_copy_arg *copy_arg, int *in
if (src_handler_jump_table[(int)(copy_arg[i].srctype)].handle(i, copy_arg, infds, srcfds) != 0) {
return -1;
}
+ channels[i] = copy_arg[i].channel;
}
return 0;
@@ -412,6 +427,7 @@ static void *io_copy_thread_main(void *arg)
int *outfds = NULL; // recored fds to close
int *srcfds = NULL;
struct io_write_wrapper *writers = NULL;
+ transfer_channel_type *channels = NULL;
int sync_fd = thread_arg->sync_fd;
bool posted = false;
@@ -426,11 +442,11 @@ static void *io_copy_thread_main(void *arg)
(void)prctl(PR_SET_NAME, "IoCopy");
len = thread_arg->len;
- if (io_copy_init_fds(len, &infds, &outfds, &srcfds, &writers) != 0) {
+ if (io_copy_init_fds(len, &infds, &outfds, &srcfds, &writers, &channels) != 0) {
goto err;
}
- if (io_copy_make_srcfds(len, copy_arg, infds, srcfds) != 0) {
+ if (io_copy_make_srcfds(len, copy_arg, infds, srcfds, channels) != 0) {
goto err;
}
@@ -440,12 +456,12 @@ static void *io_copy_thread_main(void *arg)
sem_post(&thread_arg->wait_sem);
posted = true;
- (void)console_loop_io_copy(sync_fd, srcfds, writers, len);
+ (void)console_loop_io_copy(sync_fd, srcfds, writers, channels, len);
err:
if (!posted) {
sem_post(&thread_arg->wait_sem);
}
- io_copy_thread_cleanup(writers, thread_arg, infds, outfds, srcfds, len);
+ io_copy_thread_cleanup(writers, thread_arg, infds, outfds, srcfds, channels, len);
DAEMON_CLEAR_ERRMSG();
return NULL;
}
@@ -480,26 +496,27 @@ static int start_io_copy_thread(int sync_fd, bool detach, struct io_copy_arg *co
}
static void add_io_copy_element(struct io_copy_arg *element, io_type srctype, void *src, io_type dsttype, void *dst,
- int dstfifoflag)
+ int dstfifoflag, transfer_channel_type channel)
{
element->srctype = srctype;
element->src = src;
element->dsttype = dsttype;
element->dst = dst;
element->dstfifoflag = dstfifoflag;
+ element->channel = channel;
}
/*
-----------------------------------------------------------------------------------
| CHANNEL | iSula iSulad lxc |
-----------------------------------------------------------------------------------
- | | fifoin fifos[0] |
+ | | fifoin | stdin_fd fifos[0] |
| IN | RDWR --------> RD RDWR --------> RD |
-----------------------------------------------------------------------------------
- | | fifoout fifos[1] |
+ | | fifoout | stdout_handler fifos[1] |
| OUT | RD <-------- WR RD <-------- WR |
-----------------------------------------------------------------------------------
- | | fifoerr fifos[2] |
+ | | fifoerr stderr_handler fifos[2] |
| ERR | RD <-------- WR RD <-------- WR |
-----------------------------------------------------------------------------------
*/
@@ -513,29 +530,29 @@ int ready_copy_io_data(int sync_fd, bool detach, const char *fifoin, const char
if (fifoin != NULL) {
// fifoin : iSula -> iSulad read
// fifos[0] : iSulad -> lxc write
- add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifoin, IO_FIFO, (void *)fifos[0], O_RDWR);
+ add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifoin, IO_FIFO, (void *)fifos[0], O_RDWR, STDIN_CHANNEL);
}
if (fifoout != NULL) {
// fifos[1] : lxc -> iSulad read
// fifoout : iSulad -> iSula write
- add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[1], IO_FIFO, (void *)fifoout, O_WRONLY);
+ add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[1], IO_FIFO, (void *)fifoout, O_WRONLY, STDOUT_CHANNEL);
}
if (fifoerr != NULL) {
- add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[2], IO_FIFO, (void *)fifoerr, O_WRONLY);
+ add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[2], IO_FIFO, (void *)fifoerr, O_WRONLY, STDERR_CHANNEL);
}
if (stdin_fd > 0) {
- add_io_copy_element(&io_copy[len++], IO_FD, &stdin_fd, IO_FIFO, (void *)fifos[0], O_RDWR);
+ add_io_copy_element(&io_copy[len++], IO_FD, &stdin_fd, IO_FIFO, (void *)fifos[0], O_RDWR, STDIN_CHANNEL);
}
if (stdout_handler != NULL) {
- add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[1], IO_FUNC, stdout_handler, O_WRONLY);
+ add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[1], IO_FUNC, stdout_handler, O_WRONLY, STDOUT_CHANNEL);
}
if (stderr_handler != NULL) {
- add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[2], IO_FUNC, stderr_handler, O_WRONLY);
+ add_io_copy_element(&io_copy[len++], IO_FIFO, (void *)fifos[2], IO_FUNC, stderr_handler, O_WRONLY, STDERR_CHANNEL);
}
if (start_io_copy_thread(sync_fd, detach, io_copy, len, tid) != 0) {
diff --git a/src/utils/console/console.c b/src/utils/console/console.c
index 1aedd0c9..b1d8b6dc 100644
--- a/src/utils/console/console.c
+++ b/src/utils/console/console.c
@@ -328,7 +328,7 @@ out:
return ret;
}
-static void client_console_tty_state_close(struct epoll_descr *descr, const struct tty_state *ts)
+static void console_tty_state_close(struct epoll_descr *descr, const struct tty_state *ts)
{
if (ts->stdin_reader >= 0) {
epoll_loop_del_handler(descr, ts->stdin_reader);
@@ -430,13 +430,14 @@ int console_loop_with_std_fd(int stdinfd, int stdoutfd, int stderrfd, int fifoin
ret = safe_epoll_loop(&descr);
err_out:
- client_console_tty_state_close(&descr, &ts);
+ console_tty_state_close(&descr, &ts);
epoll_loop_close(&descr);
return ret;
}
/* console loop copy */
-int console_loop_io_copy(int sync_fd, const int *srcfds, struct io_write_wrapper *writers, size_t len)
+int console_loop_io_copy(int sync_fd, const int *srcfds, struct io_write_wrapper *writers,
+ transfer_channel_type *channels, size_t len)
{
int ret = 0;
size_t i = 0;
@@ -460,17 +461,35 @@ int console_loop_io_copy(int sync_fd, const int *srcfds, struct io_write_wrapper
}
for (i = 0; i < len; i++) {
- // Reusing ts.stdout_reader and ts.stdout_writer for coping io
- ts[i].stdout_reader = srcfds[i];
- ts[i].stdout_writer.context = writers[i].context;
- ts[i].stdout_writer.write_func = writers[i].write_func;
+ // initialize tty_state
+ ts[i].stdin_reader = -1;
+ ts[i].stdout_reader = -1;
+ ts[i].stderr_reader = -1;
ts[i].sync_fd = -1;
- ret = epoll_loop_add_handler(&descr, ts[i].stdout_reader, console_cb_stdio_copy, &ts[i]);
+ if (channels[i] == STDIN_CHANNEL) {
+ ts[i].stdin_reader = srcfds[i];
+ ts[i].stdin_writer.context = writers[i].context;
+ ts[i].stdin_writer.write_func = writers[i].write_func;
+ ret = epoll_loop_add_handler(&descr, ts[i].stdin_reader, console_cb_stdio_copy, &ts[i]);
+ } else if (channels[i] == STDOUT_CHANNEL) {
+ // Reusing ts.stdout_reader and ts.stdout_writer for copying io
+ ts[i].stdout_reader = srcfds[i];
+ ts[i].stdout_writer.context = writers[i].context;
+ ts[i].stdout_writer.write_func = writers[i].write_func;
+ ret = epoll_loop_add_handler(&descr, ts[i].stdout_reader, console_cb_stdio_copy, &ts[i]);
+ } else {
+ // Reusing ts.stderr_reader and ts.stderr_writer for copying io
+ ts[i].stderr_reader = srcfds[i];
+ ts[i].stderr_writer.context = writers[i].context;
+ ts[i].stderr_writer.write_func = writers[i].write_func;
+ ret = epoll_loop_add_handler(&descr, ts[i].stderr_reader, console_cb_stdio_copy, &ts[i]);
+ }
if (ret != 0) {
ERROR("Add handler for masterfd failed");
goto err_out;
}
}
+
if (sync_fd >= 0) {
ts[i].sync_fd = sync_fd;
epoll_loop_add_handler(&descr, ts[i].sync_fd, console_cb_stdio_copy, &ts[i]);
@@ -483,9 +502,8 @@ int console_loop_io_copy(int sync_fd, const int *srcfds, struct io_write_wrapper
ret = safe_epoll_loop(&descr);
err_out:
-
for (i = 0; i < (len + 1); i++) {
- epoll_loop_del_handler(&descr, ts[i].stdout_reader);
+ console_tty_state_close(&descr, &ts[i]);
}
epoll_loop_close(&descr);
free(ts);
diff --git a/src/utils/console/console.h b/src/utils/console/console.h
index 0dfe19d3..63103d2b 100644
--- a/src/utils/console/console.h
+++ b/src/utils/console/console.h
@@ -43,6 +43,8 @@ struct tty_state {
bool ignore_stdin_close;
};
+typedef enum { STDIN_CHANNEL, STDOUT_CHANNEL, STDERR_CHANNEL, MAX_CHANNEL} transfer_channel_type;
+
int console_fifo_name(const char *rundir, const char *subpath, const char *stdflag, char *fifo_name,
size_t fifo_name_sz, char *fifo_path, size_t fifo_path_sz, bool do_mkdirp);
@@ -59,7 +61,8 @@ void console_fifo_close(int fd);
int console_loop_with_std_fd(int stdinfd, int stdoutfd, int stderrfd, int fifoinfd, int fifooutfd, int fifoerrfd,
int tty_exit, bool tty);
-int console_loop_io_copy(int sync_fd, const int *srcfds, struct io_write_wrapper *writers, size_t len);
+int console_loop_io_copy(int sync_fd, const int *srcfds, struct io_write_wrapper *writers,
+ transfer_channel_type *channels, size_t len);
int setup_tios(int fd, struct termios *curr_tios);
--
2.25.1