/*
 * Write all `count` bytes of `buf` to `fd`.
 *
 * write() is re-issued after a short write and after failing with EINTR or
 * EAGAIN, so a successful return always means the whole buffer was written.
 *
 * fd:    descriptor to write to
 * buf:   data to write; must not be NULL
 * count: number of bytes to write; 0 returns 0 without calling write()
 *
 * Returns `count` on success. Returns -1 when `buf` is NULL, when write()
 * fails with any errno other than EINTR/EAGAIN (errno is left as write() set
 * it), or when write() reports zero bytes written (errno is set to ENOSPC).
 * On failure the number of bytes already written is not reported.
 */
ssize_t write_nointr_in_total(int fd, const char *buf, size_t count)
{
    /* Unsigned, like `count`, so the loop condition compares like with like. */
    size_t nwritten = 0;

    if (buf == NULL) {
        return -1;
    }

    while (nwritten < count) {
        ssize_t nret = write(fd, buf + nwritten, count - nwritten);

        if (nret > 0) {
            nwritten += (size_t)nret;
            continue;
        }

        if (nret < 0 && (errno == EINTR || errno == EAGAIN)) {
            /*
             * NOTE(review): retrying immediately on EAGAIN busy-waits if fd
             * is non-blocking and stays full; waiting for writability would
             * be kinder to the CPU.
             */
            continue;
        }

        if (nret == 0) {
            /* No progress and no error: bail out instead of looping forever. */
            errno = ENOSPC;
        }
        return -1;
    }

    return (ssize_t)nwritten;
}
uint32_t event, void *data); - -struct epoll_loop_handler { - epoll_loop_callback_t cb; - int epfd; - int cbfd; - void *cbdata; -}; - static shim_client_process_state *load_process() { parser_error err = NULL; @@ -243,7 +234,7 @@ static void remove_io_dispatch(io_thread_t *io_thd, int from, int to) pthread_mutex_unlock(&(ioc->mutex)); } -static void *task_io_copy(void *data) +static void *do_io_copy(void *data) { io_thread_t *io_thd = (io_thread_t *)data; if (io_thd == NULL || io_thd->ioc == NULL) { @@ -278,7 +269,7 @@ static void *task_io_copy(void *data) shim_write_container_log_file(io_thd->terminal, ioc->id == stdid_out ? "stdout" : "stderr", buf, r_count); } else { - int w_count = write_nointr(fn->fd, buf, r_count); + int w_count = write_nointr_in_total(fn->fd, buf, r_count); if (w_count < 0) { /* When any error occurs, remove the write fd */ remove_io_dispatch(io_thd, -1, fn->fd); @@ -287,7 +278,11 @@ static void *task_io_copy(void *data) } } - if (io_thd->shutdown) { + /* + For stdout and stderr, the bytes just read may not be the last message in the pipe. + So while r_count is greater than zero, keep reading so that no messages are lost. 
+ */ + if (io_thd->shutdown && r_count <= 0) { break; } } @@ -301,7 +296,7 @@ static void *task_io_copy(void *data) return NULL; } -static void do_io_copy(int fd, uint32_t event, void *data) +static void sem_post_inotify_io_copy(int fd, uint32_t event, void *data) { io_thread_t *thd = (io_thread_t *)data; if (thd->ioc == NULL || fd != thd->ioc->fd_from) { @@ -318,7 +313,7 @@ static void do_io_copy(int fd, uint32_t event, void *data) return; } -static int process_io_start(process_t *p, int std_id) +static int create_io_copy_thread(process_t *p, int std_id) { int ret = SHIM_ERR; io_thread_t *io_thd = NULL; @@ -351,7 +346,7 @@ static int process_io_start(process_t *p, int std_id) p->io_threads[std_id] = io_thd; - ret = pthread_create(&(io_thd->tid), NULL, task_io_copy, io_thd); + ret = pthread_create(&(io_thd->tid), NULL, do_io_copy, io_thd); if (ret != SHIM_OK) { write_message(g_log_fd, ERR_MSG, "thread io copy create failed:%d", SHIM_SYS_ERR(errno)); goto failure; @@ -380,7 +375,7 @@ static int start_io_copy_threads(process_t *p) /* 3 threads for stdin, stdout and stderr */ for (i = 0; i < 3; i++) { - ret = process_io_start(p, i); + ret = create_io_copy_thread(p, i); if (ret != SHIM_OK) { return SHIM_ERR; } @@ -405,6 +400,20 @@ static void destroy_io_thread(process_t *p, int std_id) p->io_threads[std_id] = NULL; } +/* + std_id: channel type + isulad_stdio: one side of the isulad fifo file + fd: one side of the shim io pipe + --------------------------------------------------------------- + | CHANNEL | iSulad Fifo Side | Flow Direction | fd | + --------------------------------------------------------------- + | STDIN | READ | --> | WRITE | + --------------------------------------------------------------- + | STDOUT | WRITE | <-- | READ | + --------------------------------------------------------------- + | STDERR | WRITE | <-- | READ | + --------------------------------------------------------------- +*/ static int connect_to_isulad(process_t *p, int std_id, const 
char *isulad_stdio, int fd) { mode_t mode; @@ -501,7 +510,7 @@ out: return NULL; } -static void *task_io_loop(void *data) +static void *io_epoll_loop(void *data) { process_t *p = (process_t *)data; int wait_fds = 0; @@ -526,7 +535,7 @@ static void *task_io_loop(void *data) for (i = 0; i < wait_fds; i++) { io_thread_t *thd_io = (io_thread_t *)evs[i].data.ptr; - do_io_copy(thd_io->ioc->fd_from, evs[i].events, thd_io); + sem_post_inotify_io_copy(thd_io->ioc->fd_from, evs[i].events, thd_io); } } } @@ -702,6 +711,7 @@ static int open_generic_io(process_t *p) { int ret = SHIM_ERR; + // io: in: w out/err: r stdio_t *io = initialize_io(p); if (io == NULL) { return SHIM_ERR; @@ -858,7 +868,7 @@ int process_io_init(process_t *p) int ret = SHIM_ERR; pthread_t tid_loop; - ret = pthread_create(&tid_loop, NULL, task_io_loop, p); + ret = pthread_create(&tid_loop, NULL, io_epoll_loop, p); if (ret != SHIM_OK) { return SHIM_SYS_ERR(errno); } diff --git a/src/cmd/isulad-shim/process.h b/src/cmd/isulad-shim/process.h index 17704320..c17a20b1 100644 --- a/src/cmd/isulad-shim/process.h +++ b/src/cmd/isulad-shim/process.h @@ -66,13 +66,13 @@ typedef struct process { char *id; char *bundle; char *runtime; - char *console_sock_path; + char *console_sock_path;// pty socket path int io_loop_fd; int exit_fd; int ctr_pid; log_terminal *terminal; - stdio_t *stdio; - stdio_t *shim_io; + stdio_t *stdio;// shim io on runtime side, in:r out/err: w + stdio_t *shim_io; // shim io on isulad side, in: w out/err: r io_thread_t *io_threads[3];// stdin,stdout,stderr shim_client_process_state *state; sem_t sem_mainloop; diff --git a/src/cmd/isulad-shim/terminal.c b/src/cmd/isulad-shim/terminal.c index 989f20d8..ac39539a 100644 --- a/src/cmd/isulad-shim/terminal.c +++ b/src/cmd/isulad-shim/terminal.c @@ -38,7 +38,7 @@ static ssize_t shim_write_nointr_lock(log_terminal *terminal, const void *buf, s ssize_t ret; (void)pthread_rwlock_wrlock(&terminal->log_terminal_rwlock); - ret = write_nointr(terminal->fd, 
buf, count); + ret = write_nointr_in_total(terminal->fd, buf, count); (void)pthread_rwlock_unlock(&terminal->log_terminal_rwlock); return ret; diff --git a/test/cmd/isulad-shim/isulad-shim_ut.cc b/test/cmd/isulad-shim/isulad-shim_ut.cc index d512f0bc..34ecd452 100644 --- a/test/cmd/isulad-shim/isulad-shim_ut.cc +++ b/test/cmd/isulad-shim/isulad-shim_ut.cc @@ -79,7 +79,7 @@ TEST_F(IsuladShimUnitTest, test_read_write_nointr) fd_wr = open_no_inherit(test_file.c_str(), O_CREAT | O_RDWR | O_APPEND | O_SYNC, 0640); EXPECT_GT(fd_wr, 0); - nwrite = write_nointr(fd_wr, test_string.c_str(), 5); + nwrite = write_nointr_in_total(fd_wr, test_string.c_str(), 5); EXPECT_EQ(nwrite, 5); fd_rd = open(test_file.c_str(), O_RDONLY); nread = read_nointr(fd_rd, buf, 32); -- 2.25.1