dpu-utilities/0001-rewrite-client-rexec_run.patch

548 lines
19 KiB
Diff
Raw Normal View History

From 47fdab7bf180b058f6bbed10dd17e9a4c784eecc Mon Sep 17 00:00:00 2001
From: liqiang <liqiang64@huawei.com>
Date: Thu, 1 Jun 2023 15:46:05 +0800
Subject: rewrite client rexec_run
Signed-off-by: liqiang <liqiang64@huawei.com>
---
qtfs/rexec/rexec.c | 256 +++++++++++++++++++++++++++++---------
qtfs/rexec/rexec.h | 8 ++
qtfs/rexec/rexec_server.c | 51 +++++---
3 files changed, 236 insertions(+), 79 deletions(-)
diff --git a/qtfs/rexec/rexec.c b/qtfs/rexec/rexec.c
index 4dd206d..489ebec 100644
--- a/qtfs/rexec/rexec.c
+++ b/qtfs/rexec/rexec.c
@@ -42,6 +42,21 @@
#define REXEC_MSG_LEN 1024
FILE *rexec_logfile = NULL;
+struct rexec_global_var { // file-scope state of the rexec client
+ int rexec_hs_fd[2]; // handshake pipe fds, indexed by PIPE_READ / PIPE_WRITE; -1 while no handshake pipe is configured
+};
+
+struct rexec_global_var g_rexec; // the single instance; reset by rexec_global_var_init()
+
+
+struct rexec_client_event { // per-fd context stored in epoll_event.data.ptr
+ int fd; // fd registered for EPOLLIN; closed by rexec_del_event
+ int outfd; // fd the handler writes to (std stream forwarding, handshake relay); -1 for the connection event
+ int (*handler)(struct rexec_client_event *); // called by the event loop with this struct; returns a REXEC_EVENT_* code
+ int *exit_status; // where rexec_conn_msg stores the exit status of a REXEC_KILL message; assigned by rexec_run for the connection event
+ int *pidfd; // where rexec_conn_msg stores the fd of the pid-map file; assigned by rexec_run for the connection event
+};
+
#define REXEC_PIDMAP_PATH "/var/run/rexec/pids"
#define REXEC_PIDMAP_PATH_LEN 64
#define REXEC_PID_LEN 16
@@ -84,37 +99,39 @@ static int rexec_msg_fill_argv(int argc, char *argv[], char *msg)
return offset;
}
-static int rexec_io(int infd, int outfd, char *buf, int buflen)
+static int rexec_io(struct rexec_client_event *evt) // epoll handler: copy data from evt->fd to evt->outfd until read() returns <= 0
{
+#define MAX_MSG_LEN 256
+ char buf[MAX_MSG_LEN]; // per-call stack buffer; replaces the buffer the caller used to pass in
int len;
int ret;
- while ((len = read(infd, buf, buflen)) > 0) {
- ret = write(outfd, buf, len);
+ while ((len = read(evt->fd, buf, MAX_MSG_LEN)) > 0) {
+ ret = write(evt->outfd, buf, len);
if (ret <= 0) {
- rexec_err("Read from fd:%d len:%d write to fd:%d failed ret:%d", infd, len, outfd, ret);
- return -1;
+ rexec_err("Read from fd:%d len:%d write to fd:%d failed ret:%d", evt->fd, len, evt->outfd, ret);
+ return REXEC_EVENT_EXIT; // makes the event loop exit the process; NOTE(review): the pre-patch caller only closed infd on this error - confirm the stricter behavior is intended
}
if (ret != len) {
- rexec_err("Read from fd:%d len:%d but write to fd:%d ret:%d", infd, len, outfd, ret);
+ rexec_err("Read from fd:%d len:%d but write to fd:%d ret:%d", evt->fd, len, evt->outfd, ret); // short write is only logged; the unwritten tail is not retried
}
}
- return 0;
+ return REXEC_EVENT_OK; // read() returned 0 or -1: nothing more to forward for now
}
// return -1 means process exit.
-static int rexec_conn_msg(int connfd, int *exit_status, int *pidfd)
+static int rexec_conn_msg(struct rexec_client_event *evt)
{
struct rexec_msg head;
- int ret = recv(connfd, &head, sizeof(struct rexec_msg), MSG_WAITALL);
+ int ret = recv(evt->fd, &head, sizeof(struct rexec_msg), MSG_WAITALL);
if (ret <= 0) {
rexec_err("Rexec conn recv err:%d errno:%d", ret, errno);
- return -1;
+ return REXEC_EVENT_EXIT;
}
switch (head.msgtype) {
case REXEC_KILL:
- *exit_status = head.exit_status;
+ *evt->exit_status = head.exit_status;
rexec_err("Rexec conn recv kill msg, exit:%d now.", head.exit_status);
- return -1;
+ return REXEC_EVENT_EXIT;
case REXEC_PIDMAP: {
int mypid = getpid();
int peerpid = head.pid;
@@ -122,9 +139,9 @@ static int rexec_conn_msg(int connfd, int *exit_status, int *pidfd)
char buf[REXEC_PID_LEN] = {0};
int fd;
int err;
- if (*pidfd > 0) {
+ if (*evt->pidfd > 0) {
rexec_err("Rexec pidmap msg > 1 error.");
- return 0;
+ return REXEC_EVENT_OK;
}
sprintf(path, "%s/%d", REXEC_PIDMAP_PATH, mypid);
fd = open(path, O_CREAT|O_WRONLY, 0600);
@@ -133,23 +150,41 @@ static int rexec_conn_msg(int connfd, int *exit_status, int *pidfd)
mypid, peerpid, path, fd);
break;
}
- *pidfd = fd;
+ *evt->pidfd = fd;
if ((err = flock(fd, LOCK_EX)) != 0) {
rexec_err("Rexec flock file:%s failed, errno:%d rexec exit.", path, err);
- return -1;
+ return REXEC_EVENT_EXIT;
}
if ((err = ftruncate(fd, 0)) != 0) {
rexec_err("Rexec pidmap file:%s clear failed errno:%d rexec exit.", path, err);
- return -1;
+ return REXEC_EVENT_EXIT;
}
if ((err = lseek(fd, 0, SEEK_SET)) < 0) {
rexec_err("Rexec pidmap file:%s lseek 0 failed errno:%d rexec exit", path, err);
- return -1;
+ return REXEC_EVENT_EXIT;
}
sprintf(buf, "%d", peerpid);
if ((err = write(fd, buf, strlen(buf))) <= 0) {
rexec_err("Rexec pidmap file:%s write pid:%d failed errno:%d rexec exit.", path, peerpid, err);
- return -1;
+ return REXEC_EVENT_EXIT;
+ }
+ if (g_rexec.rexec_hs_fd[PIPE_WRITE] != -1 && g_rexec.rexec_hs_fd[PIPE_READ] != -1) {
+ err = write(g_rexec.rexec_hs_fd[PIPE_WRITE], "1", 1);
+ if (err <= 0) {
+ rexec_err("rexec handshake write 1 failed, hs write:%d.", g_rexec.rexec_hs_fd[PIPE_WRITE]);
+ return REXEC_EVENT_ERR;
+ }
+ } else {
+ char msg[sizeof(struct rexec_msg) + 1];
+ struct rexec_msg *hs = msg;
+ char *ok = hs->msg;
+ hs->msgtype = REXEC_HANDSHAKE;
+ hs->msglen = 1;
+ *ok = '1';
+ if (write(evt->fd, hs, sizeof(struct rexec_msg) + 1) <= 0) {
+ rexec_err("send handshake failed, remote process will die");
+ return REXEC_EVENT_EXIT;
+ }
}
break;
}
@@ -159,6 +194,35 @@ static int rexec_conn_msg(int connfd, int *exit_status, int *pidfd)
rexec_log("Rexec conn recv msgtype:%d argc:%d stdno:%d msglen:%d",
head.msgtype, head.argc, head.stdno, head.msglen);
+ return REXEC_EVENT_OK;
+}
+
+static struct rexec_client_event *rexec_add_event(int efd, int fd, int outfd, int (*handler)(struct rexec_client_event *)) // watch fd for EPOLLIN on efd; returns the heap-allocated event (released by rexec_del_event) or NULL on failure
+{
+ struct rexec_client_event *event = calloc(1, sizeof(*event)); // zeroed so exit_status/pidfd start as NULL instead of indeterminate values
+ if (event == NULL) {
+ rexec_err("malloc failed.");
+ return NULL;
+ }
+ event->fd = fd;
+ event->outfd = outfd;
+ event->handler = handler;
+ struct epoll_event evt;
+ evt.data.ptr = (void *)event; // the event loop recovers the context from data.ptr
+ evt.events = EPOLLIN;
+ if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, event->fd, &evt)) {
+ rexec_err("epoll ctl add fd:%d event failed.", event->fd);
+ free(event); // registration failed, so nothing else references the context
+ return NULL;
+ }
+ return event;
+}
+
+static int rexec_del_event(struct rexec_client_event *event) // release an event created by rexec_add_event; always returns 0
+{
+ // no EPOLL_CTL_DEL is issued: closing the fd removes it from the epoll set (presumably no duplicate of the fd stays open - verify); event->outfd is left open
+ close(event->fd);
+ free(event);
return 0;
}
@@ -166,48 +230,48 @@ enum {
REPOL_IN_INDEX = 0,
REPOL_OUT_INDEX,
REPOL_ERR_INDEX,
- REPOL_CONN_INDEX,
REPOL_INV_INDEX,
};
-static int rexec_run(int rstdin, int rstdout, int rstderr, int connfd, char *argv[])
+static int rexec_std_event(int efd, int rstdin, int rstdout, int rstderr) // register the three std-stream forwarding pairs on efd with rexec_io; 0 on success, -1 if a registered fd cannot be made non-blocking
{
- int exit_status = EXIT_FAILURE;
-#define REXEC_MAX_EVENTS 4
- int infds[4] = {STDIN_FILENO, rstdout, rstderr, connfd};
- int outfds[4] = {rstdin, STDOUT_FILENO, STDERR_FILENO, connfd};
+ #define REXEC_MAX_EVENTS 4 // still used by rexec_run below to size its epoll_wait batch
+ int infds[REPOL_INV_INDEX] = {STDIN_FILENO, rstdout, rstderr}; // fds this process reads from
+ int outfds[REPOL_INV_INDEX] = {rstdin, STDOUT_FILENO, STDERR_FILENO}; // destination for each infds entry at the same index
- int efd = epoll_create1(0);
- if (efd == -1) {
- rexec_err("epoll create1 failed, errno:%d.", errno);
- return exit_status;
- }
- struct epoll_event evt;
for (int i = 0; i < REPOL_INV_INDEX; i++) {
- evt.data.u32 = i;
- evt.events = EPOLLIN;
- if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, infds[i], &evt)) {
+ if (NULL == rexec_add_event(efd, infds[i], outfds[i], rexec_io)) { // a failed registration is logged and skipped, not treated as fatal
rexec_err("epoll ctl add fd:%d event failed and ignore this mistake.", infds[i]);
continue;
} else {
if (rexec_set_nonblock(infds[i], 1) != 0) {
rexec_err("rexec set fd:%d i:%d non block failed.", infds[i], i);
- return exit_status;
+ return -1; // events registered so far stay in efd; the caller in main treats this as an error
}
}
}
+ return 0;
+}
+
+static int rexec_run(int efd, int connfd, char *argv[])
+{
+ int pidfd = -1;
+ int exit_status = EXIT_FAILURE;
+
+ struct rexec_client_event *connevt = rexec_add_event(efd, connfd, -1, rexec_conn_msg);
+ if (NULL == connevt || rexec_set_nonblock(connfd, 1) != 0) {
+ // process will exit, fd or mem resource will free by kernel soon
+ rexec_err("rexec add connfd event failed");
+ return exit_status;
+ }
+ // These two pointers are only valid within this function's context: they point to this function's stack variables
+ connevt->exit_status = &exit_status;
+ connevt->pidfd = &pidfd;
struct epoll_event *evts = calloc(REXEC_MAX_EVENTS, sizeof(struct epoll_event));
if (evts == NULL) {
rexec_err("init calloc evts failed.");
goto end;
}
- int buflen = REXEC_MSG_LEN;
- char *buf = (char *)malloc(buflen);
- int pidfd = -1;
- if (buf == NULL) {
- rexec_err("Rexec malloc failed.");
- goto free_end;
- }
rexec_log("Rexec process start run, as proxy of remote %s", argv[1]);
while (1) {
int n = epoll_wait(efd, evts, REXEC_MAX_EVENTS, 1000);
@@ -219,23 +283,16 @@ static int rexec_run(int rstdin, int rstdout, int rstderr, int connfd, char *arg
continue;
}
for (int i = 0; i < n; i++) {
- int infd = -1;
- int outfd = -1;
- if (evts[i].data.u32 >= REPOL_INV_INDEX) {
- rexec_err("invalid epoll events index data:%d", evts[i].data.u32);
- continue;
+ struct rexec_client_event *evt = (struct rexec_client_event *)evts[i].data.ptr;
+ int ret = evt->handler(evt);
+ if (evts[i].events & EPOLLHUP || ret == REXEC_EVENT_EXIT) {
+ process_exit = 1;
}
- infd = infds[evts[i].data.u32];
- outfd = outfds[evts[i].data.u32];
- if (infd == connfd) {
- if (evts[i].events & EPOLLHUP || rexec_conn_msg(connfd, &exit_status, &pidfd) == -1)
- process_exit = 1;
- } else {
- if (rexec_io(infd, outfd, buf, buflen) == -1) {
- close(infd);
- }
+ if (ret == REXEC_EVENT_DEL) {
+ rexec_del_event(evt);
}
}
+ // process will exit, and free all resource and exit
if (process_exit) {
rexec_log("Rexec process %s exit.", argv[1]);
break;
@@ -250,8 +307,6 @@ static int rexec_run(int rstdin, int rstdout, int rstderr, int connfd, char *arg
remove(path);
}
- free(buf);
-
free_end:
free(evts);
@@ -319,7 +374,7 @@ struct rexec_fdinfo {
int offset;
};
-static inline int rexec_is_reg_file(int fd)
+static inline unsigned int rexec_fd_mode(int fd)
{
struct stat st;
char path[32] = {0};
@@ -327,9 +382,13 @@ static inline int rexec_is_reg_file(int fd)
rexec_err("get fd:%d fstat failed, errno:%d", fd, errno);
return 0;
}
- if (S_ISREG(st.st_mode)) {
+ return st.st_mode;
+}
+
+static inline int rexec_is_reg_file(int fd) // 1 if fd refers to a regular file, otherwise 0
+{
+ if (S_ISREG(rexec_fd_mode(fd))) // rexec_fd_mode returns 0 on its error path, which is not S_ISREG
return 1;
- }
return 0;
}
@@ -429,16 +488,85 @@ err_end:
return NULL;
}
+static int rexec_handshake_proc(struct rexec_client_event *evt) // epoll handler: relay one handshake byte from the pipe (evt->fd) to the server connection (evt->outfd) as a REXEC_HANDSHAKE message
+{
+ char msg[sizeof(struct rexec_msg) + 1] = {0}; // zeroed: the whole header is sent below, so no uninitialised stack bytes reach the peer
+ struct rexec_msg *hs = (struct rexec_msg *)msg; // explicit cast: char * does not convert implicitly to struct rexec_msg *
+ int ret = read(evt->fd, hs->msg, 1); // the single payload byte lands directly behind the header
+ if (ret <= 0) {
+ rexec_err("read from handshake pipe failed, ret:%d err:%d", ret, errno);
+ return REXEC_EVENT_DEL; // have the event loop close the pipe fd and free this event
+ }
+ hs->msgtype = REXEC_HANDSHAKE;
+ hs->msglen = 1;
+ ret = write(evt->outfd, hs, sizeof(struct rexec_msg) + 1);
+ if (ret < 0) {
+ rexec_err("send handshake failed, connfd:%d.", evt->outfd); // only logged; the event stays registered
+ }
+ return REXEC_EVENT_OK;
+}
+
+static int rexec_handshake_init(int efd, int connfd) // optional setup: when REXEC_HANDSHAKE_RD/WR name a pipe pair, watch its read end on efd; returns 0 on success or when unset, -1 on unusable fds
+{
+ char *hs_read = getenv("REXEC_HANDSHAKE_RD");
+ char *hs_write = getenv("REXEC_HANDSHAKE_WR");
+
+ if (hs_read == NULL || hs_write == NULL) {
+ rexec_log("handshake not in effect, read:%p write:%p", (void *)hs_read, (void *)hs_write); // %p with void *: the arguments are pointers, not unsigned long
+ return 0;
+ }
+ g_rexec.rexec_hs_fd[PIPE_READ] = atoi(hs_read); // non-numeric text yields 0, which the range check below rejects
+ g_rexec.rexec_hs_fd[PIPE_WRITE] = atoi(hs_write);
+ if (g_rexec.rexec_hs_fd[PIPE_READ] <= STDERR_FILENO || g_rexec.rexec_hs_fd[PIPE_WRITE] <= STDERR_FILENO) { // handshake fds must lie above the three std streams
+ rexec_log("handshake invalid fd read:%d write:%d", g_rexec.rexec_hs_fd[PIPE_READ], g_rexec.rexec_hs_fd[PIPE_WRITE]);
+ goto err_end;
+ }
+ if (!S_ISFIFO(rexec_fd_mode(g_rexec.rexec_hs_fd[PIPE_READ])) || !S_ISFIFO(rexec_fd_mode(g_rexec.rexec_hs_fd[PIPE_WRITE]))) { // both ends must really be pipes
+ rexec_err("handshake fd mode not fifo:%d %d", g_rexec.rexec_hs_fd[PIPE_READ], g_rexec.rexec_hs_fd[PIPE_WRITE]);
+ goto err_end;
+ }
+ if (rexec_add_event(efd, g_rexec.rexec_hs_fd[PIPE_READ], connfd, rexec_handshake_proc) == NULL) { // bytes read from the pipe are relayed to connfd
+ rexec_err("add handshake pipe read fd:%d to epoll failed", g_rexec.rexec_hs_fd[PIPE_READ]);
+ goto err_end;
+ }
+ rexec_log("handshake effect read:%d write:%d", g_rexec.rexec_hs_fd[PIPE_READ], g_rexec.rexec_hs_fd[PIPE_WRITE]);
+ return 0;
+err_end:
+ g_rexec.rexec_hs_fd[PIPE_READ] = -1; // back to "not configured" so later code does not use half-validated fds
+ g_rexec.rexec_hs_fd[PIPE_WRITE] = -1;
+ return -1;
+}
+
+static void rexec_global_var_init(void) // reset g_rexec; (void) makes this a real prototype instead of an unspecified-argument declaration
+{
+ memset(&g_rexec, 0, sizeof(g_rexec));
+ g_rexec.rexec_hs_fd[PIPE_READ] = -1; // -1 marks "no handshake pipe"; 0 would be a valid fd
+ g_rexec.rexec_hs_fd[PIPE_WRITE] = -1;
+ return;
+}
+
int main(int argc, char *argv[])
{
rexec_log_init();
rexec_clear_pids();
+ int efd = epoll_create1(0);
+ if (efd == -1) {
+ rexec_err("epoll create1 failed, errno:%d.", errno);
+ return -1;
+ }
+ rexec_global_var_init();
+
int connfd = rexec_conn_to_server();
if (connfd < 0) {
rexec_err("Rexec connect to server failed, errno:%d", errno);
return -1;
}
+
+ if (rexec_handshake_init(efd, connfd) != 0) {
+ rexec_err("Rexec handshake environment set but get error.");
+ return -1;
+ }
rexec_log("Remote exec binary:%s", argv[1]);
int arglen = rexec_calc_argv_len(argc - 1, &argv[1]);
@@ -513,7 +641,11 @@ int main(int argc, char *argv[])
close(rstdin[0]);
close(rstdout[1]);
close(rstderr[1]);
- exit_status = rexec_run(rstdin[1], rstdout[0], rstderr[0], connfd, argv);
+ if (rexec_std_event(efd, rstdin[1], rstdout[0], rstderr[0]) != 0) {
+ rexec_err("add std event failed");
+ goto err_end;
+ }
+ exit_status = rexec_run(efd, connfd, argv);
close(rstdin[1]);
close(rstdout[0]);
close(rstderr[0]);
diff --git a/qtfs/rexec/rexec.h b/qtfs/rexec/rexec.h
index ba7c2be..ce1280a 100644
--- a/qtfs/rexec/rexec.h
+++ b/qtfs/rexec/rexec.h
@@ -24,6 +24,13 @@ enum {
PIPE_WRITE,
};
+enum { // status codes returned by epoll event handlers
+ REXEC_EVENT_OK, // handled; keep the event registered
+ REXEC_EVENT_DEL, // del this event
+ REXEC_EVENT_EXIT, // exit process
+ REXEC_EVENT_ERR, // handler failed; NOTE(review): the client loop in rexec.c only acts on DEL and EXIT - confirm how ERR should be handled
+};
+
enum {
REXEC_STDIN = 0x5a,
REXEC_STDOUT,
@@ -45,6 +52,7 @@ enum rexec_msgtype {
REXEC_KILL, // kill process
REXEC_PIPE, // client send a pipefd as stdin/out/err to server
REXEC_PIDMAP, // server send remote process's pid to client
+ REXEC_HANDSHAKE,
};
struct rexec_msg {
diff --git a/qtfs/rexec/rexec_server.c b/qtfs/rexec/rexec_server.c
index 686c051..2aa3275 100644
--- a/qtfs/rexec/rexec_server.c
+++ b/qtfs/rexec/rexec_server.c
@@ -65,12 +65,6 @@ struct rexec_event {
int (*handler)(struct rexec_event *);
};
-enum {
- REXEC_EVENT_OK,
- REXEC_EVENT_ERR,
- REXEC_EVENT_DEL,
-};
-
static int rexec_add_event(int efd, int fd, int pid, int (*handler)(struct rexec_event *))
{
struct rexec_event *event = (struct rexec_event *)malloc(sizeof(struct rexec_event));
@@ -86,6 +80,7 @@ static int rexec_add_event(int efd, int fd, int pid, int (*handler)(struct rexec
evt.events = EPOLLIN;
if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, event->fd, &evt)) {
rexec_err("epoll ctl add fd:%d event failed.", event->fd);
+ free(event);
return -1;
}
return 0;
@@ -136,15 +131,6 @@ static int rexec_event_handshake(struct rexec_event *event)
rexec_log("Rexec recv son pid:%d, connfd:%d", sonpid, connfd);
rexec_hash_insert_direct(child_hash, sonpid, connfd);
-
- struct rexec_msg head;
- head.msgtype = REXEC_PIDMAP;
- head.msglen = 0;
- head.pid = sonpid;
- ret = write(connfd, &head, sizeof(struct rexec_msg));
- if (ret <= 0) {
- rexec_err("Rexec send son pid:%d to client failed, ret:%d errno:%d", sonpid, ret, errno);
- }
rexec_add_event(main_epoll_fd, connfd, sonpid, rexec_event_process_manage);
// 成功后同样要删除这个pipe监听事件删除时会close掉fd
@@ -326,7 +312,7 @@ static int rexec_start_new_process(int newconnfd)
int scmfd = -1;
int len = sizeof(struct rexec_msg);
memset(&head, 0, sizeof(struct rexec_msg));
- int ret = rexec_recvmsg(newconnfd, (char *)&head, len, &scmfd, MSG_WAITALL);
+ ret = rexec_recvmsg(newconnfd, (char *)&head, len, &scmfd, MSG_WAITALL);
if (ret <= 0) {
rexec_log("recvmsg ret:%d, errno:%d", ret, errno);
goto err_to_parent;
@@ -375,14 +361,45 @@ static int rexec_start_new_process(int newconnfd)
goto err_free;
}
+ char *ack;
int mypid = getpid();
+ char msg[sizeof(struct rexec_msg) + 1];
+ struct rexec_msg *pm = msg;
+ pm->msgtype = REXEC_PIDMAP;
+ pm->msglen = 0;
+ pm->pid = mypid;
+ ret = write(newconnfd, pm, sizeof(struct rexec_msg));
+ if (ret <= 0) {
+ rexec_err("Rexec send son pid:%d to client failed, ret:%d errno:%d", mypid, ret, errno);
+ } else {
+retry:
+ rexec_log("Waiting for rexec client handshake...");
+ ret = read(newconnfd, pm, sizeof(struct rexec_msg) + 1);
+ if (ret <= 0) {
+ rexec_err("Recv handshake failed, ret:%d err:%d", ret, errno);
+ goto err_to_parent;
+ }
+ if (pm->msgtype != REXEC_HANDSHAKE) {
+ rexec_err("Recv unexpected msg:%d", pm->msgtype);
+ goto retry;
+ }
+ ack = pm->msg;
+ if (*ack != '1') {
+ rexec_err("recv error handshake ack from client:%c, exit now", *ack);
+ goto err_to_parent;
+ }
+ }
// 写会PID必须放在基于newconnfd接收完所有消息之后
// 后面newconnfd的控制权交回父进程rexec server服务进程
- write(pipefd[PIPE_WRITE], &mypid, sizeof(int));
+ if (write(pipefd[PIPE_WRITE], &mypid, sizeof(int)) <= 0) {
+ rexec_err("write pid to parent failed, pipefd:%d.", pipefd[PIPE_WRITE]);
+ }
// 子进程不再使用pipe write和connfd
close(pipefd[PIPE_WRITE]);
close(newconnfd);
+ rexec_log("handshake over normaly, continue to exec new process:%s.", binary);
+
// rexec_shim_entry argv like:
// argv[0]: binary
// argv[1]: -f
--
2.33.0