From 435916d197bfce059e3afbca0975315fc5b1662f Mon Sep 17 00:00:00 2001 From: yangxin <245051644@qq.com> Date: Fri, 10 Feb 2023 17:02:05 +0800 Subject: [PATCH 4/5] Add udsproxy. Signed-off-by: yangxin <245051644@qq.com> --- qtfs/README.md | 7 +- qtfs/comm.h | 8 +- qtfs/conn.c | 9 +- qtfs/conn.h | 1 + qtfs/ipc/Makefile | 22 + qtfs/ipc/uds_connector.c | 129 +++++ qtfs/ipc/uds_event.c | 999 +++++++++++++++++++++++++++++++++ qtfs/ipc/uds_event.h | 64 +++ qtfs/ipc/uds_main.c | 556 ++++++++++++++++++ qtfs/ipc/uds_main.h | 141 +++++ qtfs/ipc/uds_module.h | 19 + qtfs/misc.c | 7 + qtfs/qtfs/sb.c | 6 +- qtfs/qtfs_server/Makefile | 17 +- qtfs/qtfs_server/fsops.c | 5 +- qtfs/qtfs_server/qtfs-server.c | 11 +- qtfs/qtfs_server/user_engine.c | 26 +- qtfs/qtinfo/qtinfo.c | 77 ++- qtfs/qtsock.c | 332 +++++++++++ 19 files changed, 2406 insertions(+), 30 deletions(-) create mode 100644 qtfs/ipc/Makefile create mode 100644 qtfs/ipc/uds_connector.c create mode 100644 qtfs/ipc/uds_event.c create mode 100644 qtfs/ipc/uds_event.h create mode 100644 qtfs/ipc/uds_main.c create mode 100644 qtfs/ipc/uds_main.h create mode 100644 qtfs/ipc/uds_module.h create mode 100644 qtfs/qtsock.c diff --git a/qtfs/README.md b/qtfs/README.md index 0cbc2e1..19987e0 100644 --- a/qtfs/README.md +++ b/qtfs/README.md @@ -24,6 +24,7 @@ qtfs的特性: ## 安装教程 目录说明: ++ **ipc**: 跨主机unix domain socket协同组件,在该目录下编译udsproxyd二进制和libudsproxy.so库。 + **qtfs**: 客户端内核模块相关代码,直接在该目录下编译客户端ko。 + **qtfs_server**: 服务端内核模块相关代码,直接在该目录下编译服务端ko和相关程序。 + **qtinfo**: 诊断工具,支持查询文件系统的工作状态以及修改log级别等。 @@ -34,19 +35,23 @@ qtfs的特性: 1. 要求内核版本在5.10或更高版本。 2. 安装内核开发包:yum install kernel-devel。 + 3. 假设host服务器ip为192.168.10.10,dpu为192.168.10.11 服务端安装: 1. cd qtfs_server 2. make clean && make 3. insmod qtfs_server.ko qtfs_server_ip=x.x.x.x qtfs_server_port=12345 qtfs_log_level=WARN - 4. ./engine 4096 16 + 4. nohup ./engine 16 1 192.168.10.10 12121 192.168.10.11 12121 2>&1 & 客户端安装: 1. cd qtfs 2. make clean && make 3. insmod qtfs.ko qtfs_server_ip=x.x.x.x qtfs_server_port=12345 qtfs_log_level=WARN + 4. cd ../ipc/ + 5. make clean && make && make install + 6. nohup udsproxyd 1 192.168.10.11 12121 192.168.10.10 12121 2>&1 & ## 使用说明 diff --git a/qtfs/comm.h b/qtfs/comm.h index 901552c..2e562bb 100644 --- a/qtfs/comm.h +++ b/qtfs/comm.h @@ -3,6 +3,9 @@ extern struct qtinfo *qtfs_diag_info; +#define QTFS_CLIENT_DEV "/dev/qtfs_client" +#define QTFS_SERVER_DEV "/dev/qtfs_server" + #define QTFS_IOCTL_MAGIC 'Q' enum { _QTFS_IOCTL_EXEC, @@ -18,6 +21,7 @@ enum { _QTFS_IOCTL_LOG_LEVEL, _QTFS_IOCTL_EPOLL_SUPPORT, + _QTFS_IOCTL_UDS_PROXY_PID, }; #define QTFS_IOCTL_THREAD_INIT _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_EXEC) @@ -31,6 +35,7 @@ enum { #define QTFS_IOCTL_CLEARALL _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_CLEARALL) #define QTFS_IOCTL_LOGLEVEL _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_LOG_LEVEL) #define QTFS_IOCTL_EPOLL_SUPPORT _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_EPOLL_SUPPORT) +#define QTFS_IOCTL_UDS_PROXY_PID _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_UDS_PROXY_PID) #define QTINFO_MAX_EVENT_TYPE 36 // look qtreq_type at req.h #define QTFS_FUNCTION_LEN 64 @@ -119,6 +124,7 @@ enum qtinfo_cnts { }; #endif +#if (defined(QTFS_CLIENT) || defined(client) || defined(QTFS_SERVER) || defined(server)) // for connection state machine typedef enum { QTCONN_INIT, @@ -159,7 +165,7 @@ struct qtinfo { #define QTINFO_STATE(state) ((state == QTCONN_INIT) ? "INIT" : \ ((state == QTCONN_CONNECTING) ? "CONNECTING" : \ ((state == QTCONN_ACTIVE) ? "ACTIVE" : "UNKNOWN"))) - +#endif //ko compile #if (defined(QTFS_CLIENT) || defined(client)) static inline void qtinfo_clear(void) diff --git a/qtfs/conn.c b/qtfs/conn.c index 26930b1..c84c85c 100644 --- a/qtfs/conn.c +++ b/qtfs/conn.c @@ -32,6 +32,7 @@ struct qtfs_sock_var_s *qtfs_epoll_var = NULL; struct socket *qtfs_server_main_sock = NULL; struct qtfs_server_userp_s *qtfs_userps = NULL; #endif +int qtfs_uds_proxy_pid = -1; #define QTFS_EPOLL_THREADIDX (QTFS_MAX_THREADS + 4) @@ -76,6 +77,10 @@ static int qtfs_conn_sockserver_init(struct qtfs_sock_var_s *pvar) { struct socket *sock; int ret; + struct sockaddr_in saddr; + saddr.sin_family = AF_INET; + saddr.sin_port = htons(pvar->port); + saddr.sin_addr.s_addr = in_aton(pvar->addr); if (!QTCONN_IS_EPOLL_CONN(pvar) && qtfs_server_main_sock != NULL) { qtfs_info("qtfs server main sock is %lx, valid or out-of-date?", (unsigned long)qtfs_server_main_sock); @@ -147,10 +152,6 @@ static int qtfs_conn_sockclient_init(struct qtfs_sock_var_s *pvar) { struct socket *sock; int ret; - struct sockaddr_in saddr; - saddr.sin_family = AF_INET; - saddr.sin_port = htons(pvar->port); - saddr.sin_addr.s_addr = in_aton(pvar->addr); ret = sock_create_kern(&init_net, AF_INET, SOCK_STREAM, 0, &sock); if (ret) { diff --git a/qtfs/conn.h b/qtfs/conn.h index db590fc..742def4 100644 --- a/qtfs/conn.h +++ b/qtfs/conn.h @@ -26,6 +26,7 @@ extern char qtfs_log_level[QTFS_LOGLEVEL_STRLEN]; extern int log_level; extern struct qtinfo *qtfs_diag_info; extern bool qtfs_epoll_mode; +extern int qtfs_uds_proxy_pid; #define qtfs_conn_get_param(void) _qtfs_conn_get_param(__func__) diff --git a/qtfs/ipc/Makefile b/qtfs/ipc/Makefile new file mode 100644 index 0000000..47e74ad --- /dev/null +++ b/qtfs/ipc/Makefile @@ -0,0 +1,22 @@ +all: udsproxyd libudsproxy.so + +udsproxyd: uds_event.o uds_main.o + gcc -g -O2 -o udsproxyd $^ -I../ + +uds_event.o: + cc -g -c -o uds_event.o uds_event.c + +uds_main.o: + cc -g -c -o uds_main.o uds_main.c + +libudsproxy.so: + gcc -g -O2 -o libudsproxy.so uds_connector.c -fPIC --shared + +install: + yes | cp udsproxyd /usr/bin/ + yes | cp libudsproxy.so /usr/lib64/ + +clean: + @rm -rf *.o udsproxyd libudsproxy.so + +.PHONY: clean diff --git a/qtfs/ipc/uds_connector.c b/qtfs/ipc/uds_connector.c new file mode 100644 index 0000000..3c46ce5 --- /dev/null +++ b/qtfs/ipc/uds_connector.c @@ -0,0 +1,129 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "uds_module.h" + +#define uds_log(info, ...) \ + do { \ + time_t t; \ + struct tm *p; \ + time(&t); \ + p = localtime(&t); \ + printf("[%d/%02d/%02d %02d:%02d:%02d][LOG:%s:%3d]"info"\n", \ + p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \ + p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \ + } while (0); + +#define uds_err(info, ...) \ + do { \ + time_t t; \ + struct tm *p; \ + time(&t); \ + p = localtime(&t); \ + printf("[%d/%02d/%02d %02d:%02d:%02d][LOG:%s:%3d]"info"\n", \ + p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \ + p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \ + } while (0); + +static unsigned short uds_conn_get_sock_type(int sockfd) +{ + unsigned short type; + int len = 2; + int ret = getsockopt(sockfd, SOL_SOCKET, SO_TYPE, &type, &len); + if (ret < 0) { + uds_err("get sock type failed, fd:%d", sockfd); + return (unsigned short)-1; + } + uds_log("fd:%d type:%d", sockfd, type); + return type; +} + +static int uds_conn_whitelist_check(const char *path) +{ + return 1; +} + +int connect(int fd, const struct sockaddr *addrarg, socklen_t len) +{ + int sock_fd; + typeof(connect) *libcconnect = NULL; + int libcret; + const struct sockaddr_un *addr = (const struct sockaddr_un *)addrarg; + + if (libcconnect == NULL) { + libcconnect = dlsym(((void *) - 1l), "connect"); + if (libcconnect == NULL) { + uds_err("can't find connect by dlsym."); + return -1; + } + } + + libcret = (*libcconnect)(fd, addrarg, len); + if (libcret == 0 || addr->sun_family != AF_UNIX) { + // 如果本地connect成功,或者非UNIX DOMAIN SOCKET,都直接返回即可 + return libcret; + } + + uds_log("enter uds connect fd:%d sunpath:%s family:%d len:%d connect function:0x%lx", fd, addr->sun_path, + addr->sun_family, len, libcconnect); + // 本地未连接,且是uds链接 + if (!uds_conn_whitelist_check(addr->sun_path)) { + uds_err("path:%s not in white list", addr->sun_path); + return libcret; + } + + // 尝试远端链接 + do { + int ret; + struct uds_proxy_remote_conn_req remoteconn; + struct uds_proxy_remote_conn_rsp remotersp; + struct sockaddr_un proxy = {.sun_family = AF_UNIX}; + sock_fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (sock_fd < 0) { + uds_err("create socket failed"); + return libcret; + } + + strncpy(proxy.sun_path, UDS_BUILD_CONN_ADDR, sizeof(proxy.sun_path)); + if ((*libcconnect)(sock_fd, (struct sockaddr *)&proxy, sizeof(struct sockaddr_un)) < 0) { + uds_err("can't connect to uds proxy: %s", UDS_BUILD_CONN_ADDR); + goto err_end; + } + // 这里type需要是第一个入参fd的type + remoteconn.type = uds_conn_get_sock_type(fd); + if (remoteconn.type == (unsigned short)-1) { + remoteconn.type = SOCK_STREAM; + } + memset(remoteconn.sun_path, 0, sizeof(remoteconn.sun_path)); + strncpy(remoteconn.sun_path, addr->sun_path, sizeof(remoteconn.sun_path)); + ret = send(sock_fd, &remoteconn, sizeof(remoteconn), 0); + if (ret <= 0) { + uds_err("send remote connect request failed, ret:%d err:%s", ret, strerror(errno)); + goto err_end; + } + ret = recv(sock_fd, &remotersp, sizeof(remotersp), MSG_WAITALL); + if (ret <= 0) { + uds_err("recv remote connect replay failed, ret:%d err:%s", ret, strerror(errno)); + goto err_end; + } + if (remotersp.ret == 0) { + goto err_end; + } + } while(0); + + close(sock_fd); + return (*libcconnect)(fd, addrarg, len); + +err_end: + close(sock_fd); + return libcret; +} diff --git a/qtfs/ipc/uds_event.c b/qtfs/ipc/uds_event.c new file mode 100644 index 0000000..ff4d79b --- /dev/null +++ b/qtfs/ipc/uds_event.c @@ -0,0 +1,999 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "dirent.h" + +#include "uds_main.h" +#include "uds_event.h" + +int uds_event_build_step2(void *arg, int epfd, struct uds_event_global_var *p_event_var); +int uds_event_remote_build(void *arg, int epfd, struct uds_event_global_var *p_event_var); +int uds_event_build_step3(void *arg, int epfd, struct uds_event_global_var *p_event_var); +int uds_event_uds2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var); +int uds_event_tcp2uds(void *arg, int epfd, struct uds_event_global_var *p_event_var); +int uds_event_build_step4(void *arg, int epfd, struct uds_event_global_var *p_event_var); +int uds_event_tcp2pipe(void *arg, int epfd, struct uds_event_global_var *p_event_var); +int uds_event_pipe2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var); + +int uds_event_module_init(struct uds_event_global_var *p) +{ + p->msg_controllen = UDS_EVENT_BUFLEN; + p->iov_len = UDS_EVENT_BUFLEN; + p->buflen = UDS_EVENT_BUFLEN; + p->msg_controlsendlen = UDS_EVENT_BUFLEN; + p->iov_sendlen = UDS_EVENT_BUFLEN; + + p->msg_control = (char *)malloc(p->msg_controllen); + if (p->msg_control == NULL) { + uds_err("malloc msg control buf failed."); + p->msg_controllen = 0; + return EVENT_ERR; + } + p->msg_control_send = (char *)malloc(p->msg_controlsendlen); + if (p->msg_control_send == NULL) { + goto free1; + } + p->iov_base = (char *)malloc(p->iov_len); + if (p->iov_base == NULL) { + uds_err("malloc iov base failed."); + goto free2; + } + p->iov_base_send = (char *)malloc(p->iov_sendlen); + if (p->iov_base_send == NULL) { + goto free3; + } + p->buf = (char *)malloc(p->buflen); + if (p->buf == NULL) { + uds_err("malloc buf failed."); + goto free4; + } + return EVENT_OK; + +free4: + free(p->iov_base_send); + p->iov_base_send = NULL; + +free3: + free(p->iov_base); + p->iov_base = NULL; + +free2: + free(p->msg_control_send); + p->msg_control_send = NULL; + +free1: + free(p->msg_control); + p->msg_control = NULL; + return EVENT_ERR; +} + +void uds_event_module_fini(struct uds_event_global_var *p) +{ + if (p->msg_control != NULL) { + free(p->msg_control); + p->msg_control = NULL; + p->msg_controllen = 0; + } + if (p->msg_control_send != NULL) { + free(p->msg_control_send); + p->msg_control_send = NULL; + p->msg_controlsendlen = 0; + } + if (p->iov_base != NULL) { + free(p->iov_base); + p->iov_base = NULL; + p->iov_len = 0; + } + if (p->iov_base_send != NULL) { + free(p->iov_base_send); + p->iov_base_send = NULL; + p->iov_sendlen = 0; + } + if (p->buf != NULL) { + free(p->buf); + p->buf = NULL; + p->buflen = 0; + } + return; +} + +int uds_event_pre_hook(struct uds_event_global_var *p_event_var) +{ + p_event_var->cur = 0; + memset(p_event_var->tofree, 0, sizeof(struct uds_event *) * UDS_EPOLL_MAX_EVENTS); + return 0; +} + +int uds_event_post_hook(struct uds_event_global_var *p_event_var) +{ + for (int i = 0; i < p_event_var->cur; i++) { + uds_log("event:%lx fd:%d free by its peer", p_event_var->tofree[i], p_event_var->tofree[i]->fd); + uds_del_event(p_event_var->tofree[i]); + } + return 0; +} + +int uds_event_add_to_free(struct uds_event_global_var *p_event_var, struct uds_event *evt) +{ + if (evt->pipe == 1) { + uds_log("pipe event:%d no need to free peer", evt->fd); + return 0; + } + + struct uds_event *peerevt = evt->peer; + if (peerevt == NULL) { + uds_err("peer event add to free is NULL, my fd:%d", evt->fd); + return -1; + } + peerevt->tofree = 1; + uds_log("event fd:%d addr:%lx add to free", peerevt->fd, peerevt); + p_event_var->tofree[p_event_var->cur] = peerevt; + p_event_var->cur++; + return 0; +} + +int uds_event_pre_handler(struct uds_event *evt) +{ + if (evt->tofree == 1) { + uds_log("event fd:%d marked by peer as pending deletion", evt->fd); + return EVENT_ERR; + } + return EVENT_OK; +} + +/* + * 1. accept local uds connect request + * 2. set new connection's event to build link step2 + * 3. add new connection event to epoll list + */ +int uds_event_uds_listener(void *arg, int epfd, struct uds_event_global_var *p_event_var) +{ + int connfd; + struct uds_event *evt = (struct uds_event *)arg; + if (evt == NULL) { + uds_err("param is invalid."); + return EVENT_ERR; + } + connfd = uds_sock_step_accept(evt->fd, AF_UNIX); + if (connfd <= 0) { + uds_err("conn fd error:%d", connfd); + return EVENT_ERR; + } + + uds_log("accept an new connection, fd:%d", connfd); + + uds_add_event(connfd, NULL, uds_event_build_step2, NULL); + return EVENT_OK; +} + +int uds_event_build_step2(void *arg, int epfd, struct uds_event_global_var *p_event_var) +{ + struct uds_event *evt = (struct uds_event *)arg; + if (evt == NULL) { + uds_err("param is invalid."); + return EVENT_ERR; + } + char buf[sizeof(struct uds_tcp2tcp) + sizeof(struct uds_proxy_remote_conn_req)] = {0}; + struct uds_tcp2tcp *bdmsg = (struct uds_tcp2tcp *)buf; + struct uds_proxy_remote_conn_req *msg = (struct uds_proxy_remote_conn_req *)bdmsg->data; + int len; + memset(buf, 0, sizeof(buf)); + len = recv(evt->fd, msg, sizeof(struct uds_proxy_remote_conn_req), MSG_WAITALL); + if (len == 0) { + uds_err("recv err msg:%d errno:%s", len, strerror(errno)); + return EVENT_DEL; + } + if (len < 0) { + uds_err("read msg error:%d errno:%s", len, strerror(errno)); + goto end; + } + if (msg->type != SOCK_STREAM && msg->type != SOCK_DGRAM) { + uds_err("uds type:%d invalid", msg->type); + return EVENT_ERR; + } + + struct uds_conn_arg tcp = { + .cs = UDS_SOCKET_CLIENT, + }; + int ret; + if ((ret = uds_build_tcp_connection(&tcp)) < 0) { + uds_err("step2 build tcp connection failed, return:%d", ret); + goto end; + } + bdmsg->msgtype = MSGCNTL_UDS; + bdmsg->msglen = sizeof(struct uds_proxy_remote_conn_req); + if (write(tcp.connfd, bdmsg, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_proxy_remote_conn_req)) < 0) { + uds_err("send msg to tcp failed"); + goto end; + } + + struct uds_proxy_remote_conn_req *priv = (void *)malloc(sizeof(struct uds_proxy_remote_conn_req)); + if (priv == NULL) { + uds_err("malloc failed"); + goto end; + } + + uds_log("step2 recv sun path:%s, add step3 event fd:%d", msg->sun_path, tcp.connfd); + memcpy(priv, msg, sizeof(struct uds_proxy_remote_conn_req)); + uds_add_event(tcp.connfd, evt, uds_event_build_step3, priv); + +end: + return EVENT_OK; +} + + +int uds_event_build_step3(void *arg, int epfd, struct uds_event_global_var *p_event_var) +{ + struct uds_event *evt = (struct uds_event *)arg; + struct uds_proxy_remote_conn_rsp msg; + int len; + memset(&msg, 0, sizeof(struct uds_proxy_remote_conn_rsp)); + len = read(evt->fd, &msg, sizeof(struct uds_proxy_remote_conn_rsp)); + if (len <= 0) { + uds_err("read error len:%d", len); + if (len == 0) + goto event_del; + return EVENT_ERR; + } + if (msg.ret == EVENT_ERR) { + uds_log("get build ack:%d, failed", msg.ret); + goto event_del; + } + + struct uds_proxy_remote_conn_req *udsmsg = (struct uds_proxy_remote_conn_req *)evt->priv; + struct uds_conn_arg uds; + + memset(&uds, 0, sizeof(struct uds_conn_arg)); + uds.cs = UDS_SOCKET_SERVER; + uds.udstype = udsmsg->type; + strncpy(uds.sun_path, udsmsg->sun_path, sizeof(uds.sun_path)); + if (uds_build_unix_connection(&uds) < 0) { + uds_err("failed to build uds server sunpath:%s", uds.sun_path); + goto event_del; + } + uds_log("remote conn build success, build uds server type:%d sunpath:%s fd:%d OK this event suspend,", + udsmsg->type, udsmsg->sun_path, uds.sockfd); + uds_event_suspend(epfd, evt); + uds_add_event(uds.sockfd, evt, uds_event_build_step4, NULL); + + msg.ret = 1; + write(evt->peer->fd, &msg, sizeof(struct uds_proxy_remote_conn_rsp)); + return EVENT_OK; + +event_del: + msg.ret = 0; + write(evt->peer->fd, &msg, sizeof(struct uds_proxy_remote_conn_rsp)); + free(evt->priv); + return EVENT_DEL; +} + +int uds_event_build_step4(void *arg, int epfd, struct uds_event_global_var *p_event_var) +{ + struct uds_event *evt = (struct uds_event *)arg; + int connfd = uds_sock_step_accept(evt->fd, AF_UNIX); + if (connfd < 0) { + uds_err("accept connection failed fd:%d", connfd); + return EVENT_ERR; + } + struct uds_event *peerevt = (struct uds_event *)evt->peer; + peerevt->handler = uds_event_tcp2uds; + peerevt->peer = uds_add_event(connfd, peerevt, uds_event_uds2tcp, NULL); + + uds_log("accept new connection fd:%d, peerfd:%d frontfd:%d peerfd:%d, peerevt(fd:%d) active now", + connfd, evt->peer->fd, peerevt->fd, peerevt->peer->fd, peerevt->fd); + uds_event_insert(epfd, peerevt); + return EVENT_DEL; +} + +int uds_event_tcp_listener(void *arg, int epfd, struct uds_event_global_var *p_event_var) +{ + struct uds_event *evt = (struct uds_event *)arg; + int connfd = uds_sock_step_accept(evt->fd, AF_INET); + if (connfd <= 0) { + uds_err("tcp conn fd error:%d", connfd); + return EVENT_ERR; + } + uds_log("tcp listener event enter, new connection fd:%d.", connfd); + + uds_add_event(connfd, NULL, uds_event_remote_build, NULL); + return 0; +} + +int uds_build_connect2uds(struct uds_event *evt, struct uds_proxy_remote_conn_req *msg) +{ + struct uds_conn_arg targ; + int len = recv(evt->fd, msg, sizeof(struct uds_proxy_remote_conn_req), MSG_WAITALL); + if (len <= 0) { + uds_err("recv failed, len:%d str:%s", len, strerror(errno)); + return EVENT_ERR; + } + + targ.cs = UDS_SOCKET_CLIENT; + targ.udstype = msg->type; + memset(targ.sun_path, 0, sizeof(targ.sun_path)); + strncpy(targ.sun_path, msg->sun_path, sizeof(targ.sun_path)); + if (uds_build_unix_connection(&targ) < 0) { + struct uds_proxy_remote_conn_rsp ack; + uds_err("can't connect to sun_path:%s", targ.sun_path); + ack.ret = EVENT_ERR; + write(evt->fd, &ack, sizeof(struct uds_proxy_remote_conn_rsp)); + return EVENT_DEL; + } + + evt->peer = uds_add_event(targ.connfd, evt, uds_event_uds2tcp, NULL); + evt->handler = uds_event_tcp2uds; + + uds_log("build link req from tcp, sunpath:%s, type:%d, eventfd:%d peerfd:%d", + msg->sun_path, msg->type, targ.connfd, evt->fd); + + struct uds_proxy_remote_conn_rsp ack; + ack.ret = EVENT_OK; + + int ret = write(evt->fd, &ack, sizeof(struct uds_proxy_remote_conn_rsp)); + if (ret <= 0) { + uds_err("apply ack failed, ret:%d", ret); + return EVENT_DEL; + } + return EVENT_OK; +} + +int uds_build_pipe_proxy(struct uds_event *evt, struct uds_stru_scm_pipe *msg) +{ + int len = recv(evt->fd, msg, sizeof(struct uds_stru_scm_pipe), MSG_WAITALL); + if (len <= 0) { + uds_err("recv failed, len:%d str:%s", len, strerror(errno)); + return EVENT_ERR; + } + if (msg->dir != SCM_PIPE_READ && msg->dir != SCM_PIPE_WRITE) { + uds_err("invalid pipe dir:%d", msg->dir); + return EVENT_ERR; + } + uds_log("pipe proxy event fd:%d pipe fd:%d dir:%d", evt->fd, msg->srcfd, msg->dir); + + if (msg->dir == SCM_PIPE_READ) { + evt->pipe = 1; + evt->peerfd = evt->fd; + evt->fd = msg->srcfd; + evt->handler = uds_event_pipe2tcp; + } else { + evt->pipe = 1; + evt->peerfd = msg->srcfd; + evt->handler = uds_event_tcp2pipe; + } + return EVENT_OK; +} + +int uds_event_remote_build(void *arg, int epfd, struct uds_event_global_var *p_event_var) +{ + struct uds_event *evt = (struct uds_event *)arg; + struct uds_tcp2tcp *bdmsg = (struct uds_tcp2tcp *)p_event_var->iov_base; + struct uds_proxy_remote_conn_req *msg = (struct uds_proxy_remote_conn_req *)bdmsg->data; + int len; + int ret = EVENT_OK; + memset(p_event_var->iov_base, 0, p_event_var->iov_len); + len = recv(evt->fd, bdmsg, sizeof(struct uds_tcp2tcp), MSG_WAITALL); + if (len <= 0) { + uds_err("read no msg from sock:%d, len:%d", evt->fd, len); + return EVENT_ERR; + } + + switch (bdmsg->msgtype) { + case MSGCNTL_UDS: + ret = uds_build_connect2uds(evt, msg); + break; + case MSGCNTL_PIPE: + ret = uds_build_pipe_proxy(evt, (struct uds_stru_scm_pipe *)bdmsg->data); + break; + default: + uds_err("remote build not support msgtype %d now", bdmsg->msgtype); + break; + } + return ret; +} + +static inline mode_t uds_msg_file_mode(int fd) +{ + struct stat st; + char path[32] = {0}; + if (fstat(fd, &st) != 0) { + uds_err("get fd:%d fstat failed, errstr:%s", fd, strerror(errno)); + } + if (S_ISFIFO(st.st_mode)) { + uds_log("fd:%d is fifo", fd); + } + + return st.st_mode; +} + +static int uds_msg_scm_regular_file(int scmfd, int tcpfd, struct uds_event_global_var *p_event_var) +{ + int ret; + struct uds_tcp2tcp *p_msg = (struct uds_tcp2tcp *)p_event_var->buf; + struct uds_msg_scmrights *p_scmr = (struct uds_msg_scmrights *)&p_msg->data; + char *fdproc = calloc(1, UDS_PATH_MAX); + if (fdproc == NULL) { + uds_err("failed to calloc memory:%lx %lx", fdproc); + return EVENT_ERR; + } + sprintf(fdproc, "/proc/self/fd/%d", scmfd); + ret = readlink(fdproc, p_scmr->path, UDS_PATH_MAX); + if (ret < 0) { + uds_err("readlink:%s error, ret:%d, errstr:%s", fdproc, ret, strerror(errno)); + free(fdproc); + close(scmfd); + return EVENT_ERR; + } + free(fdproc); + p_scmr->flags = fcntl(scmfd, F_GETFL, 0); + if (p_scmr->flags < 0) { + uds_err("fcntl get flags failed:%d error:%s", p_scmr->flags, strerror(errno)); + close(scmfd); + return EVENT_ERR; + } + close(scmfd); + p_msg->msgtype = MSG_SCM_RIGHTS; + ret = write(tcpfd, p_msg, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_msg_scmrights)); + if (ret <= 0) { + uds_err("send scm rights msg to tcp failed, ret:%d", ret); + return EVENT_ERR; + } + uds_log("scm rights msg send to tcp, fd:%d path:%s flags:%d", scmfd, p_scmr->path, p_scmr->flags); + return EVENT_OK; +} + +static int uds_msg_scm_fifo_file(int scmfd, int tcpfd, struct uds_event_global_var *p_event_var) +{ +#define FDPATH_LEN 32 + int ret; + struct uds_tcp2tcp *p_get = (struct uds_tcp2tcp *)p_event_var->buf; + struct uds_stru_scm_pipe *p_pipe = (struct uds_stru_scm_pipe *)p_get->data; + char path[FDPATH_LEN] = {0}; + struct stat st; + p_get->msgtype = MSG_SCM_PIPE; + p_get->msglen = sizeof(struct uds_stru_scm_pipe); + + sprintf(path, "/proc/self/fd/%d", scmfd); + lstat(path, &st); + if (st.st_mode & S_IRUSR) { + p_pipe->dir = SCM_PIPE_READ; + uds_log("scm rights recv read pipe fd:%d, mode:%o", scmfd, st.st_mode); + } else if (st.st_mode & S_IWUSR) { + p_pipe->dir = SCM_PIPE_WRITE; + uds_log("scm rights recv write pipe fd:%d, mode:%o", scmfd, st.st_mode); + } else { + uds_err("scm rights recv invalid pipe, mode:%o fd:%d", st.st_mode, scmfd); + return EVENT_ERR; + } + p_pipe->srcfd = scmfd; + ret = send(tcpfd, p_get, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_stru_scm_pipe), 0); + if (ret <= 0) { + uds_err("send tar get msg failed, ret:%d errstr:%s", ret, strerror(errno)); + return EVENT_ERR; + } + return EVENT_OK; +} + +static int uds_msg_scmrights2tcp(struct cmsghdr *cmsg, int tcpfd, struct uds_event_global_var *p_event_var) +{ + int scmfd; + mode_t mode; + + memset(p_event_var->buf, 0, p_event_var->buflen); + memcpy(&scmfd, CMSG_DATA(cmsg), sizeof(scmfd)); + if (scmfd <= 0) { + uds_err("recv invalid scm fd:%d", scmfd); + return EVENT_ERR; + } + + mode = uds_msg_file_mode(scmfd); + + switch (mode & S_IFMT) { + case S_IFREG: + uds_log("recv scmfd:%d from uds, is regular file", scmfd); + uds_msg_scm_regular_file(scmfd, tcpfd, p_event_var); + break; + case S_IFIFO: + uds_log("recv scmfd:%d from uds, is fifo", scmfd); + uds_msg_scm_fifo_file(scmfd, tcpfd, p_event_var); + break; + default: + uds_err("scm rights not support file mode:%o", mode); + break; + } + + return EVENT_OK; +} + +static int uds_msg_cmsg2tcp(struct msghdr *msg, struct uds_event *evt, struct uds_event_global_var *p_event_var) +{ + int cnt = 0; + struct cmsghdr *cmsg = CMSG_FIRSTHDR(msg); + while (cmsg != NULL) { + cnt ++; + uds_log("cmsg type:%d len:%d level:%d, tcpfd:%d", cmsg->cmsg_type, + cmsg->cmsg_len, cmsg->cmsg_level, evt->peer->fd); + switch (cmsg->cmsg_type) { + case SCM_RIGHTS: + uds_msg_scmrights2tcp(cmsg, evt->peer->fd, p_event_var); + break; + default: + uds_err("cmsg type:%d not support now", cmsg->cmsg_type); + break; + } + cmsg = CMSG_NXTHDR(msg, cmsg); + } + return cnt; +} + +static int uds_msg_scmfd_combine_msg(struct msghdr *msg, struct cmsghdr **cmsg, int *controllen, int fd) +{ + struct cmsghdr *cnxt = NULL; + if (*cmsg == NULL) { + cnxt = CMSG_FIRSTHDR(msg); + } else { + cnxt = CMSG_NXTHDR(msg, *cmsg); + } + *cmsg = cnxt; + cnxt->cmsg_level = SOL_SOCKET; + cnxt->cmsg_type = SCM_RIGHTS; + cnxt->cmsg_len = CMSG_LEN(sizeof(fd)); + memcpy(CMSG_DATA(cnxt), &fd, sizeof(fd)); + *controllen = *controllen + cnxt->cmsg_len; + return EVENT_OK; +} + +static int uds_msg_scmright_send_fd(int sock, int fd) +{ + char byte = 0; + struct iovec iov; + struct msghdr msg; + struct cmsghdr *cmsg; + char buf[CMSG_SPACE(sizeof(fd))]; + + // send at least one char + memset(&msg, 0, sizeof(msg)); + iov.iov_base = &byte; + iov.iov_len = 1; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_name = NULL; + msg.msg_namelen = 0; + + + msg.msg_control = buf; + msg.msg_controllen = sizeof(buf); + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(fd)); + // Initialize the payload + memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd)); + msg.msg_controllen = cmsg->cmsg_len; + + if (sendmsg(sock, &msg, 0) != iov.iov_len) + return -1; + return 0; +} + +static int uds_msg_cmsg2uds(struct uds_tcp2tcp *msg, struct uds_event *evt) +{ + int scmfd = -1; + switch (msg->msgtype) { + case MSG_SCM_RIGHTS: { + struct uds_msg_scmrights *p_scmr = (struct uds_msg_scmrights *)&msg->data; + int ret; + int scmfd = open(p_scmr->path, p_scmr->flags); + if (scmfd < 0) { + uds_err("scm rights send fd failed, scmfd:%d path:%s flags:%d", scmfd, p_scmr->path, p_scmr->flags); + return -1; + } + uds_log("scm send fd:%d path:%s flags:%d", scmfd, p_scmr->path, p_scmr->flags); + break; + } + default: + uds_err("msg type:%d not support.", msg->msgtype); + return -1; + } + return scmfd; +} + +int uds_msg_tcp2uds_scm_pipe(struct uds_tcp2tcp *p_msg, struct uds_event *evt) +{ + int scmfd; + int fd[SCM_PIPE_NUM]; + struct uds_stru_scm_pipe *p_pipe = (struct uds_stru_scm_pipe *)p_msg->data; + int len = recv(evt->fd, p_pipe, p_msg->msglen, MSG_WAITALL); + if (len <= 0) { + uds_err("recv data failed, len:%d", len); + return EVENT_DEL; + } + if (p_pipe->dir != SCM_PIPE_READ && p_pipe->dir != SCM_PIPE_WRITE) { + uds_err("scm pipe recv invalid pipe dir:%d, srcfd:%d", p_pipe->dir, p_pipe->srcfd); + return EVENT_ERR; + } + struct uds_conn_arg tcp = { + .cs = UDS_SOCKET_CLIENT, + }; + int ret; + if ((ret = uds_build_tcp_connection(&tcp)) < 0) { + uds_err("build tcp connection failed, return:%d", ret); + return EVENT_ERR; + } + if (pipe(fd) == -1) { + uds_err("pipe syscall error, strerr:%s", strerror(errno)); + return EVENT_ERR; + } + if (p_pipe->dir == SCM_PIPE_READ) { + uds_log("send read pipe:%d to peer:%d", fd[SCM_PIPE_READ], evt->peer->fd); + scmfd = fd[SCM_PIPE_READ]; + // read方向,proxy读取消息并转发,此代码处是远端,所以监听tcp换发给pipe write + uds_add_pipe_event(tcp.connfd, fd[SCM_PIPE_WRITE], uds_event_tcp2pipe, NULL); + } else { + uds_log("send write pipe:%d to peer:%d", fd[SCM_PIPE_WRITE], evt->peer->fd); + scmfd = fd[SCM_PIPE_WRITE]; + // write方向,proxy读取远端代理pipe消息并转发,此处是远端,所以监听pipe read并转发给tcp + uds_add_pipe_event(fd[SCM_PIPE_READ], tcp.connfd, uds_event_pipe2tcp, NULL); + } + + p_msg->msgtype = MSGCNTL_PIPE; + p_msg->msglen = sizeof(struct uds_stru_scm_pipe); + len = write(tcp.connfd, p_msg, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_stru_scm_pipe)); + if (len <= 0) { + uds_err("send pipe msg failed, len:%d", len); + return EVENT_ERR; + } + uds_log("success to build pipe fd map, dir:%d srcfd:%d tcpfd:%d readfd:%d writefd:%d", + p_pipe->dir, p_pipe->srcfd, tcp.connfd, fd[SCM_PIPE_READ], fd[SCM_PIPE_WRITE]); + + return scmfd; +} + +int uds_event_tcp2pipe(void *arg, int epfd, struct uds_event_global_var *p_event_var) +{ + struct uds_event *evt = (struct uds_event *)arg; + memset(p_event_var->iov_base, 0, p_event_var->iov_len); + int len = read(evt->fd, p_event_var->iov_base, p_event_var->iov_len); + if (len <= 0) { + uds_err("read from tcp failed, len:%d str:%s", len, strerror(errno)); + return EVENT_DEL; + } + + uds_log("tcp:%d to pipe:%d len:%d, buf:\n>>>>>>>\n%.*s\n<<<<<<<\n", evt->fd, evt->peerfd, len, len, p_event_var->iov_base); + int ret = write(evt->peerfd, p_event_var->iov_base, len); + if (ret <= 0) { + uds_err("write to pipe failed, fd:%d str:%s", evt->peerfd, strerror(errno)); + return EVENT_DEL; + } + return EVENT_OK; +} + +int uds_event_pipe2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var) +{ + struct uds_event *evt = (struct uds_event *)arg; + memset(p_event_var->iov_base, 0, p_event_var->iov_len); + int len = read(evt->fd, p_event_var->iov_base, p_event_var->iov_len); + if (len <= 0) { + uds_err("read from pipe failed, len:%d str:%s", len, strerror(errno)); + return EVENT_DEL; + } + + uds_log("pipe:%d to tcp:%d len:%d, buf:\n>>>>>>>\n%.*s\n<<<<<<<\n", evt->fd, evt->peerfd, len, len, p_event_var->iov_base); + int ret = write(evt->peerfd, p_event_var->iov_base, len); + if (ret <= 0) { + uds_err("write to tcp failed, fd:%d str:%s", evt->peerfd, strerror(errno)); + return EVENT_DEL; + } + return EVENT_OK; + +} + +int uds_msg_tcp_end_msg(int sock) +{ + struct uds_tcp2tcp end = {.msgtype = MSG_END, .msglen = 0,}; + int ret = write(sock, &end, sizeof(struct uds_tcp2tcp)); + if (ret <= 0) { + uds_err("write end msg failed, ret:%d fd:%d", ret, sock); + return EVENT_DEL; + } + return EVENT_OK; +} + +void uds_msg_init_event_buf(struct uds_event_global_var *p) +{ + memset(p->iov_base, 0, p->iov_len); + memset(p->iov_base_send, 0, p->iov_sendlen); + memset(p->msg_control, 0, p->msg_controllen); + memset(p->msg_control_send, 0, p->msg_controlsendlen); + memset(p->buf, 0, p->buflen); + return; +} + +#define TEST_BUFLEN 256 +int uds_event_uds2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var) +{ + struct uds_event *evt = (struct uds_event *)arg; + struct iovec iov; + struct msghdr msg; + struct cmsghdr *cmsg; + int cmsgcnt = 0; + int len; + + memset(&msg, 0, sizeof(msg)); + iov.iov_base = p_event_var->iov_base + sizeof(struct uds_tcp2tcp); + iov.iov_len = p_event_var->iov_len - sizeof(struct uds_tcp2tcp); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_name = NULL; + msg.msg_namelen = 0; + + msg.msg_control = p_event_var->msg_control; + msg.msg_controllen = p_event_var->msg_controllen; + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_len = p_event_var->msg_controllen; + + len = recvmsg(evt->fd, &msg, 0); + if (len == 0) { + uds_err("recvmsg error, return:%d", len); + uds_event_add_to_free(p_event_var, evt); + return EVENT_DEL; + } + if (len < 0) { + uds_err("recvmsg error return val:%d", len); + return EVENT_ERR; + } + cmsg = CMSG_FIRSTHDR(&msg); + if (cmsg != NULL) { + uds_log("recvmsg cmsg len:%d cmsglen:%d iovlen:%d iov:%s cmsglevel:%d cmsgtype:%d", + len, cmsg->cmsg_len, iov.iov_len, iov.iov_base, cmsg->cmsg_level, cmsg->cmsg_type); + cmsgcnt = uds_msg_cmsg2tcp(&msg, evt, p_event_var); + if (len - cmsgcnt == 0) + goto endmsg; + } + + struct uds_tcp2tcp *p_msg = (struct uds_tcp2tcp *)p_event_var->iov_base; + p_msg->msgtype = MSG_NORMAL; + p_msg->msglen = len; + int ret = write(evt->peer->fd, (void *)p_msg, p_msg->msglen + sizeof(struct uds_tcp2tcp)); + if (ret <= 0) { + uds_err("write to peer:%d failed, retcode:%d len:%d", evt->peer->fd, ret, len); + return EVENT_ERR; + } + + uds_log("write iov msg to tcp success, msgtype:%d ret:%d iovlen:%d recvlen:%d udsheadlen:%d msglen:%d msg:\n>>>>>>>\n%.*s\n<<<<<<<\n", + p_msg->msgtype, ret, iov.iov_len, len, sizeof(struct uds_tcp2tcp), p_msg->msglen, p_msg->msglen, p_msg->data); +endmsg: + return uds_msg_tcp_end_msg(evt->peer->fd); +} + +int uds_event_tcp2uds(void *arg, int epfd, struct uds_event_global_var *p_event_var) +{ +#define MAX_FDS 64 + int fds[MAX_FDS] = {0}; + int fdnum = 0; + struct uds_event *evt = (struct uds_event *)arg; + struct uds_tcp2tcp *p_msg = (struct uds_tcp2tcp *)p_event_var->iov_base; + int ret; + int normal_msg_len = 0; + struct msghdr msg; + struct cmsghdr *cmsg = NULL; + struct iovec iov; + int msg_controllen = 0; + + memset(&msg, 0, sizeof(msg)); + iov.iov_base = p_event_var->iov_base_send; + iov.iov_len = 0; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_control = p_event_var->msg_control_send; + msg.msg_controllen = p_event_var->msg_controlsendlen; + + while (1) { + int len = recv(evt->fd, p_msg, sizeof(struct uds_tcp2tcp), MSG_WAITALL); + if (len <= 0) { + uds_err("recv no msg maybe sock is closed, delete this tcp2uds event, len:%d.", len); + goto close_event; + } + uds_log("pmsg:%lx type:%d len:%d iov_base:%lx len:%d", p_msg, p_msg->msgtype, p_msg->msglen, p_event_var->iov_base, len); + if (p_msg->msgtype == MSG_END) { + break; + } + if (p_msg->msglen > p_event_var->iov_len - sizeof(struct uds_tcp2tcp) || p_msg->msglen <= 0) { + uds_err("pmsg len:%d is invalid, fd:%d peerfd:%d", p_msg->msglen, evt->fd, evt->peer->fd); + continue; + } + switch(p_msg->msgtype) { + case MSG_NORMAL: + if (normal_msg_len != 0) { + uds_err("normal msg repeat recv fd:%d", evt->fd); + goto err; + } + normal_msg_len = recv(evt->fd, p_event_var->iov_base_send, p_msg->msglen, MSG_WAITALL); + if (normal_msg_len <= 0) { + uds_err("recv msg error:%d fd:%d", len, evt->fd); + goto close_event; + } + iov.iov_len = normal_msg_len; + uds_log("recv normal msg len:%d str: \n>>>>>>>\n%.*s\n<<<<<<<", iov.iov_len, iov.iov_len, iov.iov_base); + break; + case MSG_SCM_RIGHTS: { + int len; + int scmfd; + struct uds_msg_scmrights *p_scm = (struct uds_msg_scmrights *) p_msg->data; + memset(p_scm->path, 0, sizeof(p_scm->path)); + // SCM RIGHTS msg proc + len = recv(evt->fd, p_msg->data, p_msg->msglen, MSG_WAITALL); + if (len <= 0) { + uds_err("recv data failed len:%d", p_msg->msglen); + return EVENT_DEL; + } + scmfd = uds_msg_cmsg2uds(p_msg, evt); + if (scmfd == -1) { + goto err; + } + fds[fdnum++] = scmfd; + uds_msg_scmfd_combine_msg(&msg, &cmsg, &msg_controllen, scmfd); + break; + } + case MSG_SCM_PIPE: { + int scmfd; + scmfd = uds_msg_tcp2uds_scm_pipe(p_msg, evt); + if (scmfd == EVENT_DEL) + goto close_event; + if (scmfd < 0) + goto err; + fds[fdnum++] = scmfd; + uds_msg_scmfd_combine_msg(&msg, &cmsg, &msg_controllen, scmfd); + break; + } + default: + uds_err("recv unsupport msg type:%d event fd:%d", p_msg->msgtype, evt->fd); + break; + } + } + if (msg_controllen == 0 && iov.iov_len == 0) + goto err; + msg.msg_controllen = msg_controllen; + if (iov.iov_len == 0) iov.iov_len = 1; + ret = sendmsg(evt->peer->fd, &msg, 0); + uds_log("evt:%d sendmsg len:%d, controllen:%d errno:%s", evt->fd, ret, msg_controllen, strerror(errno)); + for (int i = 0; i < fdnum; i++) { + close(fds[i]); + } + return EVENT_OK; +err: + return EVENT_ERR; + +close_event: + uds_event_add_to_free(p_event_var, evt); + return EVENT_DEL; +} + +int uds_diag_is_epoll_fd(int fd) +{ + for (int i = 0; i < p_uds_var->work_thread_num; i++) { + if (fd == p_uds_var->efd[i]) + return 1; + } + return 0; +} + +void uds_diag_list_fd(char *buf, int len) +{ +#define FDPATH_LEN 32 + int pos = 0; + char path[32] = {0}; + DIR *dir = NULL; + struct dirent *entry; + dir = opendir("/proc/self/fd/"); + if (dir == NULL) { + uds_err("open path:/proc/self/fd/ failed"); + return; + } + while (entry = readdir(dir)) { + int fd = atoi(entry->d_name); + char fdpath[FDPATH_LEN]; + char link[FDPATH_LEN]; + int ret; + if (fd <= 2 || uds_diag_is_epoll_fd(fd)) + continue; + memset(fdpath, 0, FDPATH_LEN); + memset(link, 0, FDPATH_LEN); + sprintf(fdpath, "/proc/self/fd/%d", fd); + ret = readlink(fdpath, link, FDPATH_LEN); + pos += sprintf(&buf[pos], "+ fd:%s type:%u link:%s\n", entry->d_name, entry->d_type, link); + } + closedir(dir); + return; +} + +int uds_diag_string(char *buf, int len) +{ + int pos = 0; + memset(buf, 0, len); + pos = sprintf(buf, "+-----------------------------Unix Proxy Diagnostic information-------------------------+\n"); + pos += sprintf(&buf[pos], "+ Thread nums:%d\n", p_uds_var->work_thread_num); + for (int i = 0; i < p_uds_var->work_thread_num; i++) { + pos += sprintf(&buf[pos], "+ Thread %d events count:%d\n", i+1, p_uds_var->work_thread[i].info.events); + } + pos += sprintf(&buf[pos], "+ Log level:%s\n", p_uds_var->logstr[p_uds_var->loglevel]); + strcat(buf, "+---------------------------------------------------------------------------------------+\n"); + return strlen(buf); +} + +// DIAG INFO +int uds_event_diag_info(void *arg, int epfd, struct uds_event_global_var *p_event_var) +{ + int connfd; + int len; + int ret; + struct uds_event *evt = (struct uds_event *)arg; + if (evt == NULL) { + uds_err("param is invalid."); + return EVENT_ERR; + } + connfd = uds_sock_step_accept(evt->fd, AF_UNIX); + if (connfd <= 0) { + uds_err("conn fd error:%d", connfd); + return EVENT_ERR; + } + + uds_log("diag accept an new connection to send diag info, fd:%d", connfd); + len = uds_diag_string(p_event_var->iov_base, p_event_var->iov_len); + ret = send(connfd, p_event_var->iov_base, len, 0); + if (ret <= 0) { + uds_err("send diag info error, ret:%d len:%d", ret, len); + } + close(connfd); + return EVENT_OK; +} + +#define UDS_LOG_STR(level) (level < 0 || level >= UDS_LOG_MAX) ? p_uds_var->logstr[UDS_LOG_MAX] : p_uds_var->logstr[level] +int uds_event_debug_level(void *arg, int epfd, struct uds_event_global_var *p_event_var) +{ + int connfd; + int len; + int ret; + int cur; + struct uds_event *evt = (struct uds_event *)arg; + if (evt == NULL) { + uds_err("param is invalid."); + return EVENT_ERR; + } + connfd = uds_sock_step_accept(evt->fd, AF_UNIX); + if (connfd <= 0) { + uds_err("conn fd error:%d", connfd); + return EVENT_ERR; + } + + cur = p_uds_var->loglevel; + if (cur + 1 < UDS_LOG_MAX) { + p_uds_var->loglevel += 1; + } else { + p_uds_var->loglevel = UDS_LOG_NONE; + } + + uds_log("debug level accept a new connection, current level:%s change to:%s", UDS_LOG_STR(cur), UDS_LOG_STR(p_uds_var->loglevel)); + + len = sprintf(p_event_var->iov_base, "+---------------UDS LOG LEVEL UPDATE--------------+\n" + "+ Log level is:%s before, now change to :%s.\n" + "+-------------------------------------------------+\n", UDS_LOG_STR(cur), UDS_LOG_STR(p_uds_var->loglevel)); + + ret = send(connfd, p_event_var->iov_base, len, 0); + if (ret <= 0) { + uds_err("send debug level info error, ret:%d len:%d", ret, len); + } + close(connfd); + return EVENT_OK; +} diff --git a/qtfs/ipc/uds_event.h b/qtfs/ipc/uds_event.h new file mode 100644 index 0000000..a52bf67 --- /dev/null +++ b/qtfs/ipc/uds_event.h @@ -0,0 +1,64 @@ +#ifndef __QTFS_UDS_EVENT_H__ +#define __QTFS_UDS_EVENT_H__ + +#define UDS_EVENT_BUFLEN 4096 +#define UDS_PATH_MAX 1024 + +enum EVENT_RETCODE { + EVENT_OK = 0, + EVENT_ERR = -1, + EVENT_DEL = -2, // del this event after return +}; + +enum TCP2TCP_TYPE { + MSG_NORMAL = 0xa5a5, // 消息类型从特殊数字开始,防止误识别消息 + MSG_SCM_RIGHTS, + MSG_SCM_CREDENTIALS, // unix domain 扩展消息,预留 + MSG_SCM_SECURITY, // unix domain 扩展消息,预留 + MSG_GET_TARGET, // 控制消息,用于获取对端的target fd + MSG_SCM_PIPE, // 使用SCM传递了一个pipe + MSG_END, // tcp消息的结束体 +}; + +enum TCPCNTL_TYPE { + MSGCNTL_UDS = 1, // uds代理模式 + MSGCNTL_PIPE, // pipe匿名管道代理模式 +}; + +// 因为要区分SCM_RIGHTS和普通消息,TCP到TCP需要有一个协议头 +struct uds_tcp2tcp { + int msgtype; + int msglen; // len of data + char data[0]; +}; + +struct uds_msg_scmrights { + int flags; // open flags + char path[UDS_PATH_MAX]; +}; + +enum { + SCM_PIPE_READ = 0, + SCM_PIPE_WRITE, + SCM_PIPE_NUM, +}; + +struct uds_stru_scm_pipe { + int dir; // 0: send read filedes; 1: send write filedes + // proxy通过scm rights接收到员pipe fd,后面消息回来时事件 + // 会发生变化,所以需要回消息时带上,才能建立关联 + int srcfd; +}; + +int uds_event_uds_listener(void *arg, int epfd, struct uds_event_global_var *p_event_var); +int uds_event_tcp_listener(void *arg, int epfd, struct uds_event_global_var *p_event_var); +int uds_event_diag_info(void *arg, int epfd, struct uds_event_global_var *p_event_var); +int uds_event_debug_level(void *arg, int epfd, struct uds_event_global_var *p_event_var); +int uds_event_pre_handler(struct uds_event *evt); +int uds_event_pre_hook(struct uds_event_global_var *p_event_var); +int uds_event_post_hook(struct uds_event_global_var *p_event_var); +int uds_event_module_init(struct uds_event_global_var *p_event_var); +void uds_event_module_fini(struct uds_event_global_var *p); + +#endif + diff --git a/qtfs/ipc/uds_main.c b/qtfs/ipc/uds_main.c new file mode 100644 index 0000000..b479a60 --- /dev/null +++ b/qtfs/ipc/uds_main.c @@ -0,0 +1,556 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../comm.h" +#include "uds_main.h" +#include "uds_event.h" + +struct uds_global_var g_uds_var = {.logstr = {"NONE", "ERROR", "INFO", "UNKNOWN"}}; +struct uds_global_var *p_uds_var = &g_uds_var; +struct uds_event_global_var *g_event_var = NULL; + +struct uds_event *uds_alloc_event() +{ + struct uds_event *p = (struct uds_event *)malloc(sizeof(struct uds_event)); + if (p == NULL) { + uds_err("malloc failed."); + return NULL; + } + memset(p, 0, sizeof(struct uds_event)); + return p; +} + +int uds_event_insert(int efd, struct uds_event *event) +{ + struct epoll_event evt; + evt.data.ptr = (void *)event; + evt.events = EPOLLIN; + if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, event->fd, &evt)) { + uds_err("epoll ctl add fd:%d event failed.", event->fd); + return -1; + } + return 0; +} + +int uds_event_suspend(int efd, struct uds_event *event) +{ + int ret = epoll_ctl(efd, EPOLL_CTL_DEL, event->fd, NULL); + if (ret != 0) { + uds_err("failed to suspend fd:%d.", event->fd); + return -1; + } + return 0; +} + +int uds_event_delete(int efd, int fd) +{ + int ret = epoll_ctl(efd, EPOLL_CTL_DEL, fd, NULL); + if (ret != 0) { + uds_err("failed to delete event fd:%d.", fd); + } else { + uds_log("event fd:%d deleted.", fd); + } + close(fd); + return ret; +} + +void uds_main_loop(int efd, struct uds_thread_arg *arg) +{ + int n = 0; + int ret; + struct uds_event *udsevt; + struct epoll_event *evts = NULL; + struct uds_event_global_var *p_event_var = arg->p_event_var; + if (p_event_var == NULL) { + uds_err("event variable invalid."); + return; + } + + evts = calloc(UDS_EPOLL_MAX_EVENTS, sizeof(struct epoll_event)); + if (evts == NULL) { + uds_err("init calloc evts failed."); + return; + } + if (uds_event_module_init(p_event_var) == EVENT_ERR) { + uds_err("uds event module init failed, main loop not run."); + return; + } +#ifdef QTFS_SERVER + extern int engine_run; + while (engine_run) { +#else + while (1) { +#endif + n = epoll_wait(efd, evts, UDS_EPOLL_MAX_EVENTS, 1000); + if (n == 0) + continue; + if (n < 0) { + uds_err("epoll wait return errcode:%d", n); + continue; + } + arg->info.events += n; + uds_event_pre_hook(p_event_var); + for (int i = 0; i < n; i++) { + udsevt = (struct uds_event *)evts[i].data.ptr; + uds_log("event fd:%d events:%d tofree:%d", udsevt->fd, evts[i].events, udsevt->tofree); + if (udsevt->handler == NULL) { + uds_err("bad event, fd:%d handler is NULL.", udsevt->fd); + continue; + } + // 预检查失败择不执行handler + if (uds_event_pre_handler(udsevt) == EVENT_ERR) { + continue; + } + ret = udsevt->handler(udsevt, efd, p_event_var); + // 此处释放当前事件,peer事件需要handler里面释放 + if (ret == EVENT_DEL) { + uds_del_event(udsevt); + } + } + uds_event_post_hook(p_event_var); + } + uds_log("main loop exit."); + uds_event_module_fini(p_event_var); + return; +} + +int uds_build_tcp_connection(struct uds_conn_arg *arg) +{ + const int sock_max_conn_num = 1024; + + if (arg->cs > UDS_SOCKET_SERVER) { + uds_err("cs type %d is error.", arg->cs); + return -1; + } + struct sockaddr_in sock_addr = { + .sin_family = AF_INET, + }; + int sock_fd = socket(AF_INET, SOCK_STREAM, 0); + + if (sock_fd < 0) { + uds_err("As %s failed, socket fd: %d, err:%s.", + (arg->cs == UDS_SOCKET_CLIENT) ? "client" : "server", + sock_fd, strerror(errno)); + return -1; + } + arg->sockfd = sock_fd; + + if (arg->cs == UDS_SOCKET_SERVER) { + sock_addr.sin_port = htons(p_uds_var->tcp.port); + sock_addr.sin_addr.s_addr = inet_addr(p_uds_var->tcp.addr); + if (bind(sock_fd, (struct sockaddr *)&sock_addr, sizeof(sock_addr)) < 0) { + uds_err("As server failed, bind error, err:%s.", + strerror(errno)); + goto close_and_return; + } + if (listen(sock_fd, sock_max_conn_num) < 0) { + uds_err("As server listen failed, err:%s.", strerror(errno)); + goto close_and_return; + } + } else { + sock_addr.sin_port = htons(p_uds_var->tcp.peerport); + sock_addr.sin_addr.s_addr = inet_addr(p_uds_var->tcp.peeraddr); + if (connect(arg->sockfd, (struct sockaddr *)&sock_addr, sizeof(struct sockaddr_in)) < 0) { + goto close_and_return; + } + arg->connfd = sock_fd; + uds_log("Connect to server successed, ip:%s port:%u", p_uds_var->tcp.peeraddr, p_uds_var->tcp.peerport); + } + + return 0; +close_and_return: + close(sock_fd); + return -1; +} + +int uds_build_unix_connection(struct uds_conn_arg *arg) +{ + const int sock_max_conn_num = 5; + if (arg->cs > UDS_SOCKET_SERVER) { + uds_err("cs type %d is error.", arg->cs); + return -1; + } + struct sockaddr_un sock_addr = { + .sun_family = AF_UNIX, + }; + int sock_fd = socket(AF_UNIX, arg->udstype, 0); + + if (sock_fd < 0) { + uds_err("As %s failed, socket fd: %d, err:%s.", + (arg->cs == UDS_SOCKET_CLIENT) ? "client" : "server", + sock_fd, strerror(errno)); + return -1; + } + strncpy(sock_addr.sun_path, arg->sun_path, sizeof(sock_addr.sun_path)); + arg->sockfd = sock_fd; + + if (arg->cs == UDS_SOCKET_SERVER) { + unlink(sock_addr.sun_path); + if (bind(sock_fd, (struct sockaddr *)&sock_addr, sizeof(sock_addr)) < 0) { + uds_err("As server failed, bind error, err:%s.", + strerror(errno)); + goto close_and_return; + } + if (listen(sock_fd, sock_max_conn_num) < 0) { + uds_err("As server listen failed, err:%s.", strerror(errno)); + goto close_and_return; + } + } else { + if (connect(arg->sockfd, (struct sockaddr *)&sock_addr, sizeof(struct sockaddr_un)) < 0) { + goto close_and_return; + } + arg->connfd = sock_fd; + uds_log("Connect to server successed, sun path:%s", arg->sun_path); + } + + return 0; +close_and_return: + uds_log("close sockfd:%d and return", sock_fd); + close(sock_fd); + return -1; + +} + +int uds_sock_step_accept(int sock_fd, int family) +{ + struct sockaddr_in in_addr; + struct sockaddr_un un_addr; + socklen_t len = (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_un); + int connfd; + if (family == AF_INET) { + connfd = accept(sock_fd, (struct sockaddr *)&in_addr, &len); + } else { + connfd = accept(sock_fd, (struct sockaddr *)&un_addr, &len); + } + if (connfd < 0) { + uds_err("Accept error:%d, err:%s.", connfd, strerror(errno)); + return connfd; + } + if (family == AF_INET) { + uds_log("Accept success, ip:%s, port:%u", + inet_ntoa(in_addr.sin_addr), + ntohs(in_addr.sin_port)); + } else { + uds_log("Accept success, sun path:%s", un_addr.sun_path); + } + return connfd; +} + +struct uds_event *uds_add_event(int fd, struct uds_event *peer, int (*handler)(void *, int, struct uds_event_global_var *), void *priv) +{ + struct uds_event *newevt = uds_alloc_event(); + int hash = fd % p_uds_var->work_thread_num; + if (newevt == NULL || p_uds_var->efd[hash] <= 0) { + uds_err("alloc event failed, efd:%d hash:%d", p_uds_var->efd[hash], hash); + return NULL; + } + + newevt->fd = fd; + newevt->peer = peer; // 如果tcp回应,消息转回uds这个fd + newevt->handler = handler; + newevt->priv = priv; + newevt->tofree = 0; + uds_event_insert(p_uds_var->efd[hash], newevt); + return newevt; +} + +struct uds_event *uds_add_pipe_event(int fd, int peerfd, int (*handler)(void *, int, struct uds_event_global_var *), void *priv) +{ + int hash = fd % p_uds_var->work_thread_num; + struct uds_event *newevt = uds_alloc_event(); + if (newevt == NULL || p_uds_var->efd[hash] <= 0) { + uds_err("alloc event failed, efd:%d", p_uds_var->efd[hash]); + return NULL; + } + + newevt->fd = fd; + newevt->peerfd = peerfd; // 如果tcp回应,消息转回uds这个fd + newevt->handler = handler; + newevt->priv = priv; + newevt->tofree = 0; + newevt->pipe = 1; + uds_event_insert(p_uds_var->efd[hash], newevt); + return newevt; +} + +void uds_del_event(struct uds_event *evt) +{ + int hash = evt->fd % p_uds_var->work_thread_num; + if (evt->pipe == 1 &&evt->peerfd != -1) { + // pipe是单向,peerfd没有epoll事件,所以直接关闭 + close(evt->peerfd); + evt->peerfd = -1; + } + uds_event_delete(p_uds_var->efd[hash], evt->fd); + free(evt); + return; +} + +void uds_thread_diag_init(struct uds_thread_info *info) +{ + info->events = 0; + info->fdnum = 0; +} + +void *uds_proxy_thread(void *arg) +{ + struct uds_thread_arg *parg = (struct uds_thread_arg *)arg; + uds_thread_diag_init(&parg->info); + uds_main_loop(parg->efd, parg); + return NULL; +} + +struct uds_event *uds_init_unix_listener(const char *addr, int (*handler)(void *, int, struct uds_event_global_var *)) +{ + struct uds_event *udsevt; + struct uds_conn_arg arg; + struct uds_conn_arg *parg = &arg; + + parg->cs = UDS_SOCKET_SERVER; + strncpy(parg->sun_path, addr, sizeof(parg->sun_path)); + parg->udstype = SOCK_STREAM; + if (uds_build_unix_connection(parg) != 0) + return NULL; + udsevt = uds_add_event(parg->sockfd, NULL, handler, NULL); + if (udsevt == NULL) { + uds_err("add unix listener event failed."); + return NULL; + } + return udsevt; +} + +struct uds_event *uds_init_tcp_listener() +{ + struct uds_event *tcpevt; + struct uds_conn_arg arg; + struct uds_conn_arg *parg = &arg; + parg->cs = UDS_SOCKET_SERVER; + if (uds_build_tcp_connection(parg) != 0) + return NULL; + + tcpevt = uds_add_event(parg->sockfd, NULL, uds_event_tcp_listener, NULL); + if (tcpevt == NULL) + return NULL; + return tcpevt; +} + +void uds_thread_create() +{ + struct uds_conn_arg arg; + struct uds_conn_arg *parg = &arg; + struct uds_event *udsevt; + struct uds_event *tcpevt; + struct uds_event *diagevt; + struct uds_event *logevt; + int efd; + + for (int i = 0; i < p_uds_var->work_thread_num; i++) { + efd = epoll_create1(0); + if (efd == -1) { + uds_err("epoll create1 failed, i:%d.", i); + return; + } + p_uds_var->efd[i] = efd; + } + + if ((udsevt = uds_init_unix_listener(UDS_BUILD_CONN_ADDR, uds_event_uds_listener)) == NULL) + return; + + if ((tcpevt = uds_init_tcp_listener()) == NULL) + goto end; + + if ((diagevt = uds_init_unix_listener(UDS_DIAG_ADDR, uds_event_diag_info)) == NULL) + goto end1; + + if ((logevt = uds_init_unix_listener(UDS_LOGLEVEL_UPD, uds_event_debug_level)) == NULL) + goto end2; + + do { + pthread_t *thrd = (pthread_t *)malloc(sizeof(pthread_t) * p_uds_var->work_thread_num); + struct uds_thread_arg *work_thread; + if (thrd == NULL) { + uds_err("thread info malloc failed."); + break; + } + work_thread = (struct uds_thread_arg *)malloc(sizeof(struct uds_thread_arg *) * p_uds_var->work_thread_num); + if (work_thread == NULL) { + uds_err("thread arg malloc failed."); + free(thrd); + break; + } + + for (int i = 0; i < p_uds_var->work_thread_num; i++) { + p_uds_var->work_thread[i].p_event_var = &g_event_var[i]; + p_uds_var->work_thread[i].efd = p_uds_var->efd[i]; + (void)pthread_create(&thrd[i], NULL, uds_proxy_thread, &p_uds_var->work_thread[i]); + } + p_uds_var->loglevel = UDS_LOG_NONE; + for (int i = 0; i < p_uds_var->work_thread_num; i++) + pthread_join(thrd[i], NULL); + free(thrd); + free(work_thread); + } while(0); +end2: + uds_del_event(diagevt); +end1: + uds_del_event(tcpevt); +end: + uds_del_event(udsevt); + for (int i = 0; i < p_uds_var->work_thread_num; i++) + close(p_uds_var->efd[i]); + + return; +} + +int uds_set_pid() +{ + int fd = -1; + if (access(QTFS_CLIENT_DEV, 0) == 0) { + fd = open(QTFS_CLIENT_DEV, O_RDONLY | O_NONBLOCK); + if (fd < 0) + goto open_failed; + goto set; + } + if (access(QTFS_SERVER_DEV, 0) == 0) { + fd = open(QTFS_SERVER_DEV, O_RDONLY | O_NONBLOCK); + if (fd < 0) + goto open_failed; + goto set; + } + uds_err("qtfs dev(<%s> or <%s>) both not exist", QTFS_CLIENT_DEV, QTFS_SERVER_DEV); + return EVENT_ERR; + +open_failed: + uds_err("open %s failed, ret:%d", QTFS_CLIENT_DEV, fd); + return EVENT_ERR; + +set: + do { + int pid = getpid(); + int ret = ioctl(fd, QTFS_IOCTL_UDS_PROXY_PID, &pid); + if (ret < 0) { + uds_err("ioctl failed to set pid:%d ret:%d", pid, ret); + return EVENT_ERR; + } + uds_log("set proxy pid:%d to qtfs successed.", pid); + } while (0); + close(fd); + return EVENT_OK; +} + +int uds_env_prepare() +{ + DIR *dir; + if (access(UDS_BUILD_CONN_ADDR, 0) == 0) + return EVENT_OK; + + if ((dir = opendir(UDS_BUILD_CONN_DIR)) == NULL) { + if (mkdir(UDS_BUILD_CONN_DIR, 0755) < 0) { + uds_err("mkdir %s failed.", UDS_BUILD_CONN_DIR); + } + } else { + closedir(dir); + } + int fd = open(UDS_BUILD_CONN_ADDR, O_RDONLY|O_CREAT, 0700); + if (fd < 0) { + uds_err("create file:%s failed.", UDS_BUILD_CONN_ADDR); + return EVENT_ERR; + } + uds_log("success to create %s.", UDS_BUILD_CONN_ADDR); + close(fd); + return EVENT_OK; +} + +static void uds_sig_pipe(int signum) +{ + uds_log("uds proxy recv sigpipe and ignore"); +} + +void uds_helpinfo(char *argv[]) +{ + uds_err("Usage:"); + uds_err(" %s .", argv[0]); + uds_err("Param:"); + uds_err(" - server ip address"); + uds_err(" - port number"); + uds_err(" - peer address"); + uds_err(" - peer port"); + return; +} + +/* + * uds跨主机协同主程序,设计成镜像的,每一端2个线程:send thread、recv thread + * 在server侧线程由原engine拉起,在client侧新起一个engine进程 + */ +#ifdef QTFS_SERVER +int uds_proxy_main(int argc, char *argv[]) +#else +int main(int argc, char *argv[]) +#endif +{ + p_uds_var->loglevel = UDS_LOG_INFO; +#define ARG_NUM 6 + if (argc != ARG_NUM) { + uds_helpinfo(argv); + return -1; + } + if (uds_set_pid() != EVENT_OK) { + uds_err("proxy failed to set pid."); + return -1; + } + if (uds_env_prepare() != EVENT_OK) { + uds_err("proxy prepare environment failed."); + return -1; + } + signal(SIGPIPE, uds_sig_pipe); + p_uds_var->work_thread_num = atoi(argv[1]); + if (p_uds_var->work_thread_num <= 0 || p_uds_var->work_thread_num > UDS_WORK_THREAD_MAX) { + uds_err("work thread num:%d is too large.(must small or equal than %d)", p_uds_var->work_thread_num, UDS_WORK_THREAD_MAX); + return -1; + } + p_uds_var->efd = (int *)malloc(sizeof(int) * p_uds_var->work_thread_num); + if (p_uds_var->efd == NULL) { + uds_err("efd malloc failed, num:%d", p_uds_var->work_thread_num); + return -1; + } + + p_uds_var->work_thread = (struct uds_thread_arg *)malloc(sizeof(struct uds_thread_arg) * p_uds_var->work_thread_num); + if (p_uds_var->work_thread == NULL) { + uds_err("work thread var malloc failed."); + return -1; + } + p_uds_var->tcp.port = atoi(argv[3]); + strncpy(p_uds_var->tcp.addr, argv[2], 20); + p_uds_var->tcp.peerport = atoi(argv[5]); + strncpy(p_uds_var->tcp.peeraddr, argv[4], 20); + + uds_log("uds proxy param thread num:%d ip:%s port:%u peerip:%s port:%u", + p_uds_var->work_thread_num, p_uds_var->tcp.addr, p_uds_var->tcp.port, + p_uds_var->tcp.peeraddr, p_uds_var->tcp.peerport); + g_event_var = (struct uds_event_global_var *)malloc(sizeof(struct uds_event_global_var) * p_uds_var->work_thread_num); + if (g_event_var == NULL) { + uds_err("event variable malloc failed"); + return -1; + } + uds_thread_create(); + + return 0; +} diff --git a/qtfs/ipc/uds_main.h b/qtfs/ipc/uds_main.h new file mode 100644 index 0000000..793cd2f --- /dev/null +++ b/qtfs/ipc/uds_main.h @@ -0,0 +1,141 @@ +#ifndef __QTFS_UDS_MAIN_H__ +#define __QTFS_UDS_MAIN_H__ + +#include + +#include "uds_module.h" + +#define UDS_EPOLL_MAX_EVENTS 64 +#define UDS_WORK_THREAD_MAX 64 + +extern struct uds_global_var *p_uds_var; + +enum { + UDS_LOG_NONE, + UDS_LOG_ERROR, + UDS_LOG_INFO, + UDS_LOG_MAX, +}; + +#define uds_log(info, ...) \ + if (p_uds_var->loglevel >= UDS_LOG_INFO) {\ + time_t t; \ + struct tm *p; \ + time(&t); \ + p = localtime(&t); \ + printf("[%d/%02d/%02d %02d:%02d:%02d][LOG:%s:%3d]"info"\n", \ + p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \ + p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \ + } + +#define uds_log2(info, ...) \ + if (p_uds_var->loglevel >= UDS_LOG_INFO) {\ + time_t t; \ + struct tm *p; \ + time(&t); \ + p = localtime(&t); \ + printf("[%d/%02d/%02d %02d:%02d:%02d][LOG:%s:%3d]"info"\n", \ + p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \ + p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \ + } + +#define uds_err(info, ...) \ + if (p_uds_var->loglevel >= UDS_LOG_ERROR) {\ + time_t t; \ + struct tm *p; \ + time(&t); \ + p = localtime(&t); \ + printf("[%d/%02d/%02d %02d:%02d:%02d][ERROR:%s:%3d]"info"\n", \ + p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \ + p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \ + } + +enum { + UDS_THREAD_EPWAIT = 1, // epoll wait status +}; +struct uds_thread_info { + int fdnum; + + int events; + int status; +}; + +struct uds_event_global_var { + int cur; + struct uds_event *tofree[UDS_EPOLL_MAX_EVENTS]; + char *msg_control; + int msg_controllen; + char *msg_control_send; + int msg_controlsendlen; + char *iov_base; + int iov_len; + char *iov_base_send; + int iov_sendlen; + char *buf; + int buflen; +}; + +struct uds_event { + int fd; /* 本事件由这个fd触发 */ + unsigned int tofree : 1, /* 1--in to free list; 0--not */ + pipe : 1, // this is a pipe event + reserved : 30; + union { + struct uds_event *peer; /* peer event */ + int peerfd; // scm pipe 场景单向导通,只需要一个fd即可 + }; + int (*handler)(void *, int, struct uds_event_global_var *); /* event处理函数 */ + void *priv; // private data + char cpath[UDS_SUN_PATH_LEN]; + char spath[UDS_SUN_PATH_LEN]; +}; + + +struct uds_thread_arg { + int efd; + struct uds_event_global_var *p_event_var; + struct uds_thread_info info; +}; + +struct uds_global_var { + int work_thread_num; + int *efd; + struct uds_thread_arg *work_thread; + int loglevel; + char *logstr[UDS_LOG_MAX + 1]; + struct _tcp { + char addr[20]; + unsigned short port; + char peeraddr[20]; + unsigned short peerport; + } tcp; + struct _uds { + char sun_path[UDS_SUN_PATH_LEN]; + } uds; +}; +enum uds_cs { + UDS_SOCKET_CLIENT = 1, + UDS_SOCKET_SERVER, +}; + +struct uds_conn_arg { + int cs; // client(1) or server(2) + + int udstype; // DGRAM or STREAM + char sun_path[UDS_SUN_PATH_LEN]; + int sockfd; + int connfd; +}; + +struct uds_event *uds_add_event(int fd, struct uds_event *peer, int (*handler)(void *, int, struct uds_event_global_var *), void *priv); +struct uds_event *uds_add_pipe_event(int fd, int peerfd, int (*handler)(void *, int, struct uds_event_global_var *), void *priv); +int uds_sock_step_accept(int sockFd, int family); +int uds_build_tcp_connection(struct uds_conn_arg *arg); +int uds_build_unix_connection(struct uds_conn_arg *arg); +void uds_del_event(struct uds_event *evt); +int uds_event_suspend(int efd, struct uds_event *event); +int uds_event_insert(int efd, struct uds_event *event); +#ifdef QTFS_SERVER +int uds_proxy_main(int argc, char *argv[]); +#endif +#endif diff --git a/qtfs/ipc/uds_module.h b/qtfs/ipc/uds_module.h new file mode 100644 index 0000000..9ccbb9d --- /dev/null +++ b/qtfs/ipc/uds_module.h @@ -0,0 +1,19 @@ +#ifndef __QTFS_UDS_MODULE_H__ +#define __QTFS_UDS_MODULE_H__ + +#define UDS_BUILD_CONN_ADDR "/var/run/qtfs/remote_uds.sock" +#define UDS_DIAG_ADDR "/var/run/qtfs/uds_proxy_diag.sock" +#define UDS_LOGLEVEL_UPD "/var/run/qtfs/uds_loglevel.sock" +#define UDS_BUILD_CONN_DIR "/var/run/qtfs/" + +#define UDS_SUN_PATH_LEN 108 // from glibc +struct uds_proxy_remote_conn_req { + unsigned short type; + unsigned short resv; + char sun_path[UDS_SUN_PATH_LEN]; +}; +struct uds_proxy_remote_conn_rsp { + int ret; +}; + +#endif diff --git a/qtfs/misc.c b/qtfs/misc.c index 98222bd..44da4e1 100644 --- a/qtfs/misc.c +++ b/qtfs/misc.c @@ -156,6 +156,13 @@ long qtfs_misc_ioctl(struct file *file, unsigned int cmd, unsigned long arg) } break; } + case QTFS_IOCTL_UDS_PROXY_PID: + if (copy_from_user(&qtfs_uds_proxy_pid, (void *)arg, sizeof(int))) { + qtfs_err("ioctl get uds proxy pid failed."); + break; + } + qtfs_info("ioctl get uds proxy process pid is %d", qtfs_uds_proxy_pid); + break; } return ret; } diff --git a/qtfs/qtfs/sb.c b/qtfs/qtfs/sb.c index 7445fad..104d137 100644 --- a/qtfs/qtfs/sb.c +++ b/qtfs/qtfs/sb.c @@ -288,7 +288,7 @@ ssize_t qtfs_readiter(struct kiocb *kio, struct iov_iter *iov) req->fd = private->fd; if (req->fd <= 0) { - qtfs_err("qtfs_readiter: invalid file(0x%llx)", req->fd); + qtfs_err("qtfs_readiter: invalid file(%d)", req->fd); qtfs_conn_put_param(pvar); return -EINVAL; } @@ -360,7 +360,7 @@ ssize_t qtfs_writeiter(struct kiocb *kio, struct iov_iter *iov) req->d.fd = private->fd; if (req->d.fd < 0) { - qtfs_err("qtfs_write: invalid file(0x%llx)", req->d.fd); + qtfs_err("qtfs_write: invalid file(%d)", req->d.fd); qtfs_conn_put_param(pvar); return -EINVAL; } @@ -1172,7 +1172,7 @@ int qtfs_getattr(const struct path *path, struct kstat *stat, u32 req_mask, unsi *stat = rsp->stat; qtfs_debug("qtfs getattr success:<%s> blksiz:%u size:%lld mode:%o ino:%llu pathino:%lu. %s\n", req->path, rsp->stat.blksize, rsp->stat.size, rsp->stat.mode, rsp->stat.ino, inode->i_ino, rsp->stat.ino != inode->i_ino ? "delete current inode" : ""); - if (inode->i_ino != rsp->stat.ino || rsp->stat.mode != inode->i_mode) { + if (inode->i_ino != rsp->stat.ino || inode->i_mode != rsp->stat.mode) { if (inode->i_nlink > 0){ drop_nlink(inode); } diff --git a/qtfs/qtfs_server/Makefile b/qtfs/qtfs_server/Makefile index 9c6bcd5..2ff826f 100644 --- a/qtfs/qtfs_server/Makefile +++ b/qtfs/qtfs_server/Makefile @@ -4,15 +4,26 @@ KBUILD=/lib/modules/$(shell uname -r)/build/ obj-m:=qtfs_server.o qtfs_server-objs:=../conn.o fsops.o qtfs-server.o ../misc.o +DEPGLIB=-lglib-2.0 -I../ -I/usr/include/glib-2.0 -I/usr/lib64/glib-2.0/include + all: qtfs_server engine qtfs_server: make -C $(KBUILD) M=$(PWD) modules -engine: - gcc -O2 -o engine user_engine.c -lpthread -lglib-2.0 -I../ -I/usr/include/glib-2.0 -I/usr/lib64/glib-2.0/include -DQTFS_SERVER +engine: uds_event.o uds_main.o user_engine.o + gcc -O2 -o engine $^ -lpthread $(DEPGLIB) -I../ -I../ipc/ -DQTFS_SERVER + +user_engine.o: + cc -g -c -o user_engine.o user_engine.c $(DEPGLIB) -I../ -DQTFS_SERVER + +uds_event.o: + cc -g -c -o uds_event.o ../ipc/uds_event.c -DQTFS_SERVER + +uds_main.o: + cc -g -c -o uds_main.o ../ipc/uds_main.c -DQTFS_SERVER clean: make -C $(KBUILD) M=$(PWD) clean rm -rf engine - rm -rf ../*.o ../.*.o.cmd + rm -rf ../*.o diff --git a/qtfs/qtfs_server/fsops.c b/qtfs/qtfs_server/fsops.c index 61e8895..6c3e201 100644 --- a/qtfs/qtfs_server/fsops.c +++ b/qtfs/qtfs_server/fsops.c @@ -25,10 +25,11 @@ bool in_white_list(char *path, int type) { + int i, in_wl = -1; + if (!whitelist[type]) { return true; } - int i, in_wl = -1; for (i = 0; i < whitelist[type]->len; i++) { if (!strncmp(path, whitelist[type]->wl[i].path, whitelist[type]->wl[i].len)){ in_wl = i; @@ -202,7 +203,7 @@ static int handle_statfs(struct qtserver_arg *arg) static int handle_mount(struct qtserver_arg *arg) { struct path path; - int ret, i, in_wl = -1; + int ret; struct qtreq_mount *req = (struct qtreq_mount *)REQ(arg); struct qtrsp_mount *rsp = (struct qtrsp_mount *)RSP(arg); if (!in_white_list(req->path, QTFS_WHITELIST_MOUNT)) { diff --git a/qtfs/qtfs_server/qtfs-server.c b/qtfs/qtfs_server/qtfs-server.c index b0b8ab0..cbe07f0 100644 --- a/qtfs/qtfs_server/qtfs-server.c +++ b/qtfs/qtfs_server/qtfs-server.c @@ -214,11 +214,6 @@ long qtfs_server_misc_ioctl(struct file *file, unsigned int cmd, unsigned long a qtfs_server_thread_run = arg; break; - case QTFS_IOCTL_ALLINFO: - case QTFS_IOCTL_CLEARALL: - case QTFS_IOCTL_LOGLEVEL: - ret = qtfs_misc_ioctl(file, cmd, arg); - break; case QTFS_IOCTL_WHITELIST: if (copy_from_user(&len, (void __user *)arg, sizeof(int))) { qtfs_err("qtfs ioctl white init copy from user failed."); @@ -239,6 +234,12 @@ long qtfs_server_misc_ioctl(struct file *file, unsigned int cmd, unsigned long a qtfs_err("init %d list:%d %s", tmp->type, i, whitelist[tmp->type]->wl[i].path); } break; + case QTFS_IOCTL_ALLINFO: + case QTFS_IOCTL_CLEARALL: + case QTFS_IOCTL_LOGLEVEL: + case QTFS_IOCTL_UDS_PROXY_PID: + ret = qtfs_misc_ioctl(file, cmd, arg); + break; default: qtfs_err("qtfs misc ioctl unknown cmd:%u.", cmd); break; diff --git a/qtfs/qtfs_server/user_engine.c b/qtfs/qtfs_server/user_engine.c index 547935c..a3d627d 100644 --- a/qtfs/qtfs_server/user_engine.c +++ b/qtfs/qtfs_server/user_engine.c @@ -14,6 +14,7 @@ #include #include "comm.h" +#include "ipc/uds_main.h" char wl_type_str[QTFS_WHITELIST_MAX][10] = {"Open", "Write", "Read", "Readdir", "Mkdir", "Rmdir", "Create", "Unlink", "Rename", "Setattr", "Setxattr", "Mount"}; @@ -220,13 +221,12 @@ int qtfs_whitelist_init(int fd) int main(int argc, char *argv[]) { - if (argc != 3) { - engine_out("Usage: %s .", argv[0]); - engine_out(" Example: %s 4096 16.", argv[0]); + if (argc != 7) { + engine_out("Usage: %s .", argv[0]); + engine_out(" Example: %s 16 1 192.168.10.10 12121 192.168.10.11 12121.", argv[0]); return -1; } - int psize = atoi(argv[1]); - int thread_nums = atoi(argv[2]); + int thread_nums = atoi(argv[1]); int fd = open(QTFS_SERVER_FILE, O_RDONLY); if (fd < 0) { engine_err("qtfs server file:%s open failed, fd:%d.", QTFS_SERVER_FILE, fd); @@ -247,9 +247,9 @@ int main(int argc, char *argv[]) pthread_t texec[QTFS_MAX_THREADS]; pthread_t tepoll; - if (psize > QTFS_USERP_MAXSIZE || thread_nums > QTFS_MAX_THREADS) { - engine_err("qtfs engine param invalid, size:%d(must <= %d) thread_nums:%d(must <= %d).", - psize, QTFS_USERP_MAXSIZE, thread_nums, QTFS_MAX_THREADS); + if (thread_nums > QTFS_MAX_THREADS) { + engine_err("qtfs engine param invalid, thread_nums:%d(must <= %d).", + thread_nums, QTFS_MAX_THREADS); goto end; } (void)ioctl(fd, QTFS_IOCTL_EXIT, 1); @@ -257,24 +257,30 @@ int main(int argc, char *argv[]) signal(SIGKILL, qtfs_signal_int); signal(SIGTERM, qtfs_signal_int); - struct qtfs_server_userp_s *userp = qtfs_engine_thread_init(fd, thread_nums, psize); + struct qtfs_server_userp_s *userp = qtfs_engine_thread_init(fd, thread_nums, QTFS_USERP_SIZE); if (userp == NULL) { engine_out("qtfs engine userp init failed."); goto end; } struct engine_arg arg[QTFS_MAX_THREADS]; for (int i = 0; i < thread_nums; i++) { - arg[i].psize = psize; + arg[i].psize = QTFS_USERP_SIZE; arg[i].fd = fd; arg[i].thread_idx = i; (void)pthread_create(&texec[i], NULL, qtfs_engine_kthread, &arg[i]); } (void)pthread_create(&tepoll, NULL, qtfs_engine_epoll_thread, &arg[0]); + // 必须放在这个位置,uds main里面最终也有join + if (uds_proxy_main(6, &argv[1]) != 0) { + engine_out("uds proxy start failed."); + goto engine_free; + } for (int i = 0; i < thread_nums; i++) { pthread_join(texec[i], NULL); engine_out("qtfs engine join thread %d.", i); } pthread_join(tepoll, NULL); +engine_free: qtfs_engine_userp_free(userp, thread_nums); engine_out("qtfs engine join epoll thread."); end: diff --git a/qtfs/qtinfo/qtinfo.c b/qtfs/qtinfo/qtinfo.c index dc88da0..a8ba2e0 100644 --- a/qtfs/qtinfo/qtinfo.c +++ b/qtfs/qtinfo/qtinfo.c @@ -4,9 +4,13 @@ #include #include #include +#include +#include +#include #include "qtinfo.h" #include "comm.h" +#include "ipc/uds_main.h" #ifdef client #define QTFS_DEV_NAME "/dev/qtfs_client" @@ -312,6 +316,69 @@ void qtinfo_opt_p(int fd, char *support) return; } +#define PATH_MAX 4096 +void qtinfo_opt_u() +{ + int len; + struct sockaddr_un svr; + int sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + if (sockfd < 0) { + qtinfo_err("Create socket fd failed."); + return; + } + + memset(&svr, 0, sizeof(svr)); + svr.sun_family = AF_UNIX; + strcpy(svr.sun_path, UDS_DIAG_ADDR); + len = offsetof(struct sockaddr_un, sun_path) + strlen(svr.sun_path); + if (connect(sockfd, (struct sockaddr *)&svr, len) < 0) { + qtinfo_err("connect to %s failed.", UDS_DIAG_ADDR); + return; + } + while (1) { + char buf[256]; + int n; + memset(buf, 0, 256); + n = recv(sockfd, buf, 256, 0); + if (n <= 0) + break; + qtinfo_out2("%s", buf); + } + close(sockfd); + return; +} + +void qtinfo_opt_s() +{ + int len; + struct sockaddr_un svr; + int sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + if (sockfd < 0) { + qtinfo_err("Create socket fd failed."); + return; + } + + memset(&svr, 0, sizeof(svr)); + svr.sun_family = AF_UNIX; + strcpy(svr.sun_path, UDS_LOGLEVEL_UPD); + len = offsetof(struct sockaddr_un, sun_path) + strlen(svr.sun_path); + if (connect(sockfd, (struct sockaddr *)&svr, len) < 0) { + qtinfo_err("connect to %s failed.", UDS_LOGLEVEL_UPD); + return; + } + while (1) { + char buf[256]; + int n; + memset(buf, 0, 256); + n = recv(sockfd, buf, 256, 0); + if (n <= 0) + break; + qtinfo_out2("%s", buf); + } + close(sockfd); + return; + +} static void qtinfo_help(char *exec) { @@ -322,6 +389,8 @@ static void qtinfo_help(char *exec) qtinfo_out(" -l, Set log level(valid param: \"NONE\", \"ERROR\", \"WARN\", \"INFO\", \"DEBUG\")."); qtinfo_out(" -t, For test informations."); qtinfo_out(" -p, Epoll support file mode(1: any files; 0: only fifo)."); + qtinfo_out(" -u, Display unix socket proxy diagnostic info"); + qtinfo_out(" -s, Set unix socket proxy log level(Increase by 1 each time)"); } int main(int argc, char *argv[]) @@ -334,7 +403,7 @@ int main(int argc, char *argv[]) qtinfo_err("open file %s failed.", QTFS_DEV_NAME); return 0; } - while ((ch = getopt(argc, argv, "acl:tp:")) != -1) { + while ((ch = getopt(argc, argv, "acl:tp:us")) != -1) { switch (ch) { case 'a': qtinfo_opt_a(fd); @@ -351,6 +420,12 @@ int main(int argc, char *argv[]) case 'p': qtinfo_opt_p(fd, optarg); break; + case 'u': + qtinfo_opt_u(); + break; + case 's': + qtinfo_opt_s(); + break; default: qtinfo_help(argv[0]); break; diff --git a/qtfs/qtsock.c b/qtfs/qtsock.c new file mode 100644 index 0000000..58b2eab --- /dev/null +++ b/qtfs/qtsock.c @@ -0,0 +1,332 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "conn.h" +#include "log.h" +#include "comm.h" +#include "qtfs/syscall.h" + +#define MAX_SOCK_PATH_LEN 108 + +static struct socket *qtfs_sock = NULL; +static struct mutex qtfs_sock_mutex; +static char qtfs_sock_path[] = "/var/run/qtfs/remote_uds.sock"; + +struct qtsock_wl_stru qtsock_wl; + +static struct sock *(*origin_unix_find_other)(struct net *net, + struct sockaddr_un *sunname, int len, + int type, unsigned int hash, int *error); + +struct ftrace_hook { + const char *name; + void *func; + void *origin; + + unsigned long addr; + struct ftrace_ops ops; +}; + +struct ftrace_hook unix_find_other_hook; + +static int resolve_hook_address(struct ftrace_hook *hook) +{ + hook->addr = qtfs_kallsyms_lookup_name(hook->name); + if (!hook->addr) { + qtfs_warn("unresolved symbol during resolving hook address:%s\n", hook->name); + return -ENOENT; + } + *((unsigned long *)hook->origin) = hook->addr; + + return 0; +} + +static void notrace ftrace_thunk(unsigned long ip, unsigned long parent_ip, + struct ftrace_ops *ops, struct pt_regs *regs) +{ + struct ftrace_hook *hook = container_of(ops, struct ftrace_hook, ops); + + if (!within_module(parent_ip, THIS_MODULE)) + regs->ip = (unsigned long)hook->func; +} + +int install_hook(struct ftrace_hook *hook) +{ + int err; + + err = resolve_hook_address(hook); + if (err) + return err; + + hook->ops.func = ftrace_thunk; + hook->ops.flags = FTRACE_OPS_FL_SAVE_REGS | FTRACE_OPS_FL_IPMODIFY; + + err = ftrace_set_filter_ip(&hook->ops, hook->addr, 0, 0); + if (err) { + qtfs_err("ftrace_set_filter_ip failed:%d\n", err); + return err; + } + + err = register_ftrace_function(&hook->ops); + if (err) { + qtfs_err("register_ftrace_function failed with :%d\n", err); + ftrace_set_filter_ip(&hook->ops, hook->addr, 1, 0); + return err; + } + qtfs_info("install hook(%s) done\n", hook->name); + + return 0; +} + +void remove_hook(struct ftrace_hook *hook) +{ + int err; + + err = unregister_ftrace_function(&hook->ops); + if (err) + qtfs_err("unregister_ftrace_function failed:%d\n", err); + + err = ftrace_set_filter_ip(&hook->ops, hook->addr, 1, 0); + if (err) + qtfs_err("ftrace_set_filter_ip failed:%d\n", err); + qtfs_info("remove hook(%s) done", hook->name); +} + +struct qtfs_sock_req { + int magic; + int type; + char sunname[MAX_SOCK_PATH_LEN]; +}; + +struct qtfs_sock_rsp { + int found; +}; + +static int qtsock_conn(void) +{ + int ret; + struct sockaddr_un saddr; + + ret = mutex_lock_interruptible(&qtfs_sock_mutex); + if (ret <0) { + qtfs_err("Failed to get qtfs sock mutex lock:%d\n", ret); + return false; + } + // calling this function means qtfs_sock isn't working properly. + // so it's ok to release and clean old qtfs_sock + if (qtfs_sock) { + sock_release(qtfs_sock); + qtfs_sock = NULL; + } + // connect to userspace unix socket server + ret = __sock_create(&init_net, AF_UNIX, SOCK_STREAM, 0, &qtfs_sock, 1); + if (ret) { + qtfs_err("qtfs sock client init create sock failed:%d\n", ret); + mutex_unlock(&qtfs_sock_mutex); + return ret; + } + saddr.sun_family = PF_UNIX; + strcpy(saddr.sun_path, qtfs_sock_path); + ret = qtfs_sock->ops->connect(qtfs_sock, (struct sockaddr *)&saddr, + sizeof(struct sockaddr_un) - 1, 0); + if (ret) { + qtfs_err("qtfs sock client sock connect failed:%d\n", ret); + sock_release(qtfs_sock); + qtfs_sock = NULL; + mutex_unlock(&qtfs_sock_mutex); + return ret; + } + + mutex_unlock(&qtfs_sock_mutex); + return ret; +} + +bool qtfs_udsfind(char *sunname, int len, int type) +{ + struct qtfs_sock_req qs_req; + struct qtfs_sock_rsp qs_rsp; + struct kvec send_vec, recv_vec; + struct msghdr send_msg, recv_msg; + int ret; + int retry = 0, penalty = 100, i = 0; + + // qtfs_sock still not initialized, try to connect to server + if (!qtfs_sock && (qtsock_conn() < 0)) { + qtfs_err("failed to connect to qtfs socket\n"); + return false; + } + if (len > MAX_SOCK_PATH_LEN) { + qtfs_err("Invalid socket path name len(%d)\n", len); + return false; + } + memset(&qs_req, 0, sizeof(qs_req)); + memset(&qs_rsp, 0, sizeof(qs_rsp)); + strncpy(qs_req.sunname, sunname, len); + qs_req.type = type; + qs_req.magic = 0xDEADBEEF; + + memset(&send_msg, 0, sizeof(send_msg)); + memset(&send_vec, 0, sizeof(send_vec)); + memset(&recv_msg, 0, sizeof(recv_msg)); + memset(&recv_vec, 0, sizeof(recv_vec)); + + send_vec.iov_base = &qs_req; + send_vec.iov_len = sizeof(qs_req); + qtfs_info("qtfs uds find socket(%s), type(%d)\n", sunname, type); + +reconn: + if (retry) { + for (i = 0; i < retry; i++) { + if (qtsock_conn() == 0) + break; + qtfs_err("qtfs socket reconnect failed for %d trial", i+1); + penalty *= 2; + msleep(penalty); + } + } + ret = mutex_lock_interruptible(&qtfs_sock_mutex); + if (ret < 0) { + qtfs_err("Failed to get qtfs sock mutex lock:%d\n", ret); + return false; + } + if (!qtfs_sock) { + qtfs_err("qtfs_sock is NULL, please check\n"); + mutex_unlock(&qtfs_sock_mutex); + return false; + } + send_msg.msg_flags |= MSG_NOSIGNAL; + ret = kernel_sendmsg(qtfs_sock, &send_msg, &send_vec, 1, sizeof(qs_req)); + if (ret == -EPIPE && retry == 0) { + qtfs_err("uds find connection has broken, try to reconnect\n"); + retry = 3; + mutex_unlock(&qtfs_sock_mutex); + goto reconn; + } else if (ret < 0) { + qtfs_err("Failed to send uds find message:%d\n", ret); + mutex_unlock(&qtfs_sock_mutex); + return false; + } + + // waiting for response + recv_vec.iov_base = &qs_rsp; + recv_vec.iov_len = sizeof(qs_rsp); +retry: + recv_msg.msg_flags |= MSG_NOSIGNAL; + ret = kernel_recvmsg(qtfs_sock, &recv_msg, &recv_vec, 1, sizeof(qs_rsp), 0); + if (ret == -ERESTARTSYS || ret == -EINTR) { + qtfs_err("uds remote find get interrupted, just retry"); + msleep(1); + goto retry; + } + mutex_unlock(&qtfs_sock_mutex); + if (ret < 0) { + qtfs_err("Failed to receive uds find response:%d\n", ret); + return false; + } + qtfs_info("uds remote find socket(%s), type(%d), result:%s\n", sunname, type, qs_rsp.found ? "found" : "not found"); + return qs_rsp.found; +} + +static int uds_find_whitelist(const char *path) +{ + int i; + int ret = 1; + read_lock(&qtsock_wl.rwlock); + for (i = 0; i< qtsock_wl.nums; i++) { + if (strncmp(path, qtsock_wl.wl[i], strlen(qtsock_wl.wl[i])) == 0) { + ret = 0; + break; + } + } + read_unlock(&qtsock_wl.rwlock); + return ret; +} + +static inline bool uds_is_proxy(void) +{ + return (current->tgid == qtfs_uds_proxy_pid); +} + +static struct sock *qtfs_unix_find_other(struct net *net, + struct sockaddr_un *sunname, int len, + int type, unsigned int hash, int *error) +{ + struct sock *other = NULL; + bool found = false; + + qtfs_debug("in qtfs_unix_find_other (%s)\n", sunname->sun_path); + other = origin_unix_find_other(net, sunname, len, type, hash, error); + if (other) { + qtfs_debug("find unix other sock(%s) locally", sunname->sun_path); + return other; + } + + // do not call remote find if sunname is annomous or sunpath not in whitelist + if (!sunname->sun_path[0] || uds_find_whitelist(sunname->sun_path) || + uds_is_proxy() == true) { + *error = -ECONNREFUSED; + return NULL; + } + + qtfs_info("Failed to find unix other sock(%s) locally, try to find remotely\n", sunname->sun_path); + // refer userspace service to get remote socket status + // if found, which means userspace service has create this unix socket server, just go to origin_unix_find_other, it will be found + // if not found, return NULL + found = qtfs_udsfind(sunname->sun_path, len, type); + if (!found) { + qtfs_info("failed to find unix other sock(%s) remotely", sunname->sun_path); + *error = -ECONNREFUSED; + return NULL; + } + qtfs_info("find unix other sock(%s) remotely\n", sunname->sun_path); + + // found it remotely, so we will inform userspace engine to create specfic unix socket and connect to qtfs server + // and call unix_find_other locally + // xxx: will this be called recursively? Hope not + return origin_unix_find_other(net, sunname, len, type, hash, error); +} + +int qtfs_sock_init(void) +{ + qtfs_kallsyms_hack_init(); + + qtfs_info("in qtfs ftrace hook unix_find_other\n"); + unix_find_other_hook.name = "unix_find_other"; + unix_find_other_hook.func = qtfs_unix_find_other; + unix_find_other_hook.origin = &origin_unix_find_other; + + install_hook(&unix_find_other_hook); + mutex_init(&qtfs_sock_mutex); + rwlock_init(&qtsock_wl.rwlock); + qtsock_wl.nums = 0; + qtsock_wl.wl = (char **)kmalloc(sizeof(char *) * QTSOCK_WL_MAX_NUM, GFP_KERNEL); + if (qtsock_wl.wl == NULL) { + + qtfs_err("failed to kmalloc wl, max num:%d", QTSOCK_WL_MAX_NUM); + } + + return 0; +} + +void qtfs_sock_exit(void) +{ + int ret; + qtfs_info("exit qtfs ftrace, remove unix_find_other_hook\n"); + remove_hook(&unix_find_other_hook); + + ret = mutex_lock_interruptible(&qtfs_sock_mutex); + if (ret < 0) + qtfs_err("Failed to get qtfs sock mutex lock:%d\n", ret); + // close unix socket connected to userspace + if (qtfs_sock) { + sock_release(qtfs_sock); + qtfs_sock = NULL; + } + mutex_unlock(&qtfs_sock_mutex); +} -- 2.33.0