dpu-utilities/0008-Add-udsproxy.patch
yangxin 4141866c23 Add udsproxy, add whitelist to qtfs and rexec, fix errors.
Signed-off-by: yangxin <245051644@qq.com>
2023-02-10 14:16:08 +00:00

2813 lines
82 KiB
Diff
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

From 435916d197bfce059e3afbca0975315fc5b1662f Mon Sep 17 00:00:00 2001
From: yangxin <245051644@qq.com>
Date: Fri, 10 Feb 2023 17:02:05 +0800
Subject: [PATCH 4/5] Add udsproxy.
Signed-off-by: yangxin <245051644@qq.com>
---
qtfs/README.md | 7 +-
qtfs/comm.h | 8 +-
qtfs/conn.c | 9 +-
qtfs/conn.h | 1 +
qtfs/ipc/Makefile | 22 +
qtfs/ipc/uds_connector.c | 129 +++++
qtfs/ipc/uds_event.c | 999 +++++++++++++++++++++++++++++++++
qtfs/ipc/uds_event.h | 64 +++
qtfs/ipc/uds_main.c | 556 ++++++++++++++++++
qtfs/ipc/uds_main.h | 141 +++++
qtfs/ipc/uds_module.h | 19 +
qtfs/misc.c | 7 +
qtfs/qtfs/sb.c | 6 +-
qtfs/qtfs_server/Makefile | 17 +-
qtfs/qtfs_server/fsops.c | 5 +-
qtfs/qtfs_server/qtfs-server.c | 11 +-
qtfs/qtfs_server/user_engine.c | 26 +-
qtfs/qtinfo/qtinfo.c | 77 ++-
qtfs/qtsock.c | 332 +++++++++++
19 files changed, 2406 insertions(+), 30 deletions(-)
create mode 100644 qtfs/ipc/Makefile
create mode 100644 qtfs/ipc/uds_connector.c
create mode 100644 qtfs/ipc/uds_event.c
create mode 100644 qtfs/ipc/uds_event.h
create mode 100644 qtfs/ipc/uds_main.c
create mode 100644 qtfs/ipc/uds_main.h
create mode 100644 qtfs/ipc/uds_module.h
create mode 100644 qtfs/qtsock.c
diff --git a/qtfs/README.md b/qtfs/README.md
index 0cbc2e1..19987e0 100644
--- a/qtfs/README.md
+++ b/qtfs/README.md
@@ -24,6 +24,7 @@ qtfs的特性
## 安装教程
目录说明:
++ **ipc**: 跨主机unix domain socket协同组件,在该目录下编译udsproxyd二进制和libudsproxy.so库。
+ **qtfs**: 客户端内核模块相关代码直接在该目录下编译客户端ko。
+ **qtfs_server**: 服务端内核模块相关代码直接在该目录下编译服务端ko和相关程序。
+ **qtinfo**: 诊断工具支持查询文件系统的工作状态以及修改log级别等。
@@ -34,19 +35,23 @@ qtfs的特性
1. 要求内核版本在5.10或更高版本。
2. 安装内核开发包yum install kernel-devel。
+ 3. 假设host服务器ip为192.168.10.10,dpu为192.168.10.11
服务端安装:
1. cd qtfs_server
2. make clean && make
3. insmod qtfs_server.ko qtfs_server_ip=x.x.x.x qtfs_server_port=12345 qtfs_log_level=WARN
- 4. ./engine 4096 16
+ 4. nohup ./engine 16 1 192.168.10.10 12121 192.168.10.11 12121 2>&1 &
客户端安装:
1. cd qtfs
2. make clean && make
3. insmod qtfs.ko qtfs_server_ip=x.x.x.x qtfs_server_port=12345 qtfs_log_level=WARN
+ 4. cd ../ipc/
+ 5. make clean && make && make install
+ 6. nohup udsproxyd 1 192.168.10.11 12121 192.168.10.10 12121 2>&1 &
## 使用说明
diff --git a/qtfs/comm.h b/qtfs/comm.h
index 901552c..2e562bb 100644
--- a/qtfs/comm.h
+++ b/qtfs/comm.h
@@ -3,6 +3,9 @@
extern struct qtinfo *qtfs_diag_info;
+#define QTFS_CLIENT_DEV "/dev/qtfs_client"
+#define QTFS_SERVER_DEV "/dev/qtfs_server"
+
#define QTFS_IOCTL_MAGIC 'Q'
enum {
_QTFS_IOCTL_EXEC,
@@ -18,6 +21,7 @@ enum {
_QTFS_IOCTL_LOG_LEVEL,
_QTFS_IOCTL_EPOLL_SUPPORT,
+ _QTFS_IOCTL_UDS_PROXY_PID,
};
#define QTFS_IOCTL_THREAD_INIT _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_EXEC)
@@ -31,6 +35,7 @@ enum {
#define QTFS_IOCTL_CLEARALL _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_CLEARALL)
#define QTFS_IOCTL_LOGLEVEL _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_LOG_LEVEL)
#define QTFS_IOCTL_EPOLL_SUPPORT _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_EPOLL_SUPPORT)
+#define QTFS_IOCTL_UDS_PROXY_PID _IO(QTFS_IOCTL_MAGIC, _QTFS_IOCTL_UDS_PROXY_PID)
#define QTINFO_MAX_EVENT_TYPE 36 // look qtreq_type at req.h
#define QTFS_FUNCTION_LEN 64
@@ -119,6 +124,7 @@ enum qtinfo_cnts {
};
#endif
+#if (defined(QTFS_CLIENT) || defined(client) || defined(QTFS_SERVER) || defined(server))
// for connection state machine
typedef enum {
QTCONN_INIT,
@@ -159,7 +165,7 @@ struct qtinfo {
#define QTINFO_STATE(state) ((state == QTCONN_INIT) ? "INIT" : \
((state == QTCONN_CONNECTING) ? "CONNECTING" : \
((state == QTCONN_ACTIVE) ? "ACTIVE" : "UNKNOWN")))
-
+#endif
//ko compile
#if (defined(QTFS_CLIENT) || defined(client))
static inline void qtinfo_clear(void)
diff --git a/qtfs/conn.c b/qtfs/conn.c
index 26930b1..c84c85c 100644
--- a/qtfs/conn.c
+++ b/qtfs/conn.c
@@ -32,6 +32,7 @@ struct qtfs_sock_var_s *qtfs_epoll_var = NULL;
struct socket *qtfs_server_main_sock = NULL;
struct qtfs_server_userp_s *qtfs_userps = NULL;
#endif
+int qtfs_uds_proxy_pid = -1;
#define QTFS_EPOLL_THREADIDX (QTFS_MAX_THREADS + 4)
@@ -76,6 +77,10 @@ static int qtfs_conn_sockserver_init(struct qtfs_sock_var_s *pvar)
{
struct socket *sock;
int ret;
+ struct sockaddr_in saddr;
+ saddr.sin_family = AF_INET;
+ saddr.sin_port = htons(pvar->port);
+ saddr.sin_addr.s_addr = in_aton(pvar->addr);
if (!QTCONN_IS_EPOLL_CONN(pvar) && qtfs_server_main_sock != NULL) {
qtfs_info("qtfs server main sock is %lx, valid or out-of-date?", (unsigned long)qtfs_server_main_sock);
@@ -147,10 +152,6 @@ static int qtfs_conn_sockclient_init(struct qtfs_sock_var_s *pvar)
{
struct socket *sock;
int ret;
- struct sockaddr_in saddr;
- saddr.sin_family = AF_INET;
- saddr.sin_port = htons(pvar->port);
- saddr.sin_addr.s_addr = in_aton(pvar->addr);
ret = sock_create_kern(&init_net, AF_INET, SOCK_STREAM, 0, &sock);
if (ret) {
diff --git a/qtfs/conn.h b/qtfs/conn.h
index db590fc..742def4 100644
--- a/qtfs/conn.h
+++ b/qtfs/conn.h
@@ -26,6 +26,7 @@ extern char qtfs_log_level[QTFS_LOGLEVEL_STRLEN];
extern int log_level;
extern struct qtinfo *qtfs_diag_info;
extern bool qtfs_epoll_mode;
+extern int qtfs_uds_proxy_pid;
#define qtfs_conn_get_param(void) _qtfs_conn_get_param(__func__)
diff --git a/qtfs/ipc/Makefile b/qtfs/ipc/Makefile
new file mode 100644
index 0000000..47e74ad
--- /dev/null
+++ b/qtfs/ipc/Makefile
@@ -0,0 +1,22 @@
+# Build the udsproxyd daemon and the libudsproxy.so connect() interposer.
+all: udsproxyd libudsproxy.so
+
+udsproxyd: uds_event.o uds_main.o
+	gcc -g -O2 -o udsproxyd $^ -I../
+
+# Each object names its source so that editing the .c file triggers a rebuild.
+uds_event.o: uds_event.c uds_main.h uds_event.h
+	cc -g -c -o uds_event.o uds_event.c
+
+uds_main.o: uds_main.c uds_main.h
+	cc -g -c -o uds_main.o uds_main.c
+
+# uds_connector.c calls dlsym(), which lives in libdl on older glibc releases.
+libudsproxy.so: uds_connector.c uds_module.h
+	gcc -g -O2 -o libudsproxy.so uds_connector.c -fPIC --shared -ldl
+
+install:
+	yes | cp udsproxyd /usr/bin/
+	yes | cp libudsproxy.so /usr/lib64/
+
+clean:
+	@rm -rf *.o udsproxyd libudsproxy.so
+
+.PHONY: all install clean
diff --git a/qtfs/ipc/uds_connector.c b/qtfs/ipc/uds_connector.c
new file mode 100644
index 0000000..3c46ce5
--- /dev/null
+++ b/qtfs/ipc/uds_connector.c
@@ -0,0 +1,129 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <time.h>
+#include <dlfcn.h>
+#include <sys/types.h>
+
+#include "uds_module.h"
+
+/*
+ * Print one log line to stdout, prefixed with the local time, the calling
+ * function name and the source line.  `info` must be a string literal because
+ * it is pasted into the format string.
+ *
+ * The macro deliberately ends without a semicolon so that `uds_log(...);`
+ * expands to exactly one statement and stays usable inside if/else.
+ */
+#define uds_log(info, ...) \
+	do { \
+		time_t t; \
+		struct tm *p; \
+		time(&t); \
+		p = localtime(&t); \
+		printf("[%d/%02d/%02d %02d:%02d:%02d][LOG:%s:%3d]"info"\n", \
+			p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \
+			p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \
+	} while (0)
+
+/*
+ * Error-path logging.  The output format (including the "LOG" tag and the use
+ * of stdout) is the same as uds_log; `info` must be a string literal.
+ *
+ * The macro deliberately ends without a semicolon so that `uds_err(...);`
+ * expands to exactly one statement and stays usable inside if/else.
+ */
+#define uds_err(info, ...) \
+	do { \
+		time_t t; \
+		struct tm *p; \
+		time(&t); \
+		p = localtime(&t); \
+		printf("[%d/%02d/%02d %02d:%02d:%02d][LOG:%s:%3d]"info"\n", \
+			p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \
+			p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \
+	} while (0)
+
+/*
+ * Query the socket type of sockfd through the SO_TYPE socket option.
+ *
+ * Returns the type on success, or (unsigned short)-1 if getsockopt() fails.
+ */
+static unsigned short uds_conn_get_sock_type(int sockfd)
+{
+	/*
+	 * SO_TYPE is an int-sized option and the length argument is a
+	 * socklen_t; a 2-byte buffer only receives a truncated copy, which
+	 * reads as the wrong value on big-endian machines.
+	 */
+	int type = 0;
+	socklen_t len = sizeof(type);
+
+	if (getsockopt(sockfd, SOL_SOCKET, SO_TYPE, &type, &len) < 0) {
+		uds_err("get sock type failed, fd:%d", sockfd);
+		return (unsigned short)-1;
+	}
+	uds_log("fd:%d type:%d", sockfd, type);
+	return (unsigned short)type;
+}
+
+/*
+ * Decide whether `path` may be forwarded to the uds proxy.
+ * Returns non-zero when allowed.  The current body accepts every path and
+ * does not look at its argument.
+ */
+static int uds_conn_whitelist_check(const char *path)
+{
+ return 1;
+}
+
+/*
+ * connect(2) interposer (this file is built into libudsproxy.so).
+ *
+ * The call is first forwarded to the next "connect" symbol found by dlsym().
+ * If that succeeds, or the address is not AF_UNIX, the result is returned
+ * untouched.  Otherwise the uds proxy listening on UDS_BUILD_CONN_ADDR is
+ * asked to build the path for us, and when it answers with a non-zero `ret`
+ * the original connect is retried once.
+ *
+ * Returns 0 on success and -1 on failure.  When the proxy attempt fails, errno
+ * is restored to the value left by the first connect attempt, so callers that
+ * inspect errno (ECONNREFUSED, EINPROGRESS, ...) are not misled by the helper
+ * calls made in between.
+ */
+int connect(int fd, const struct sockaddr *addrarg, socklen_t len)
+{
+	/* Looked up on first use and then reused for every later call. */
+	static typeof(connect) *libcconnect = NULL;
+	const struct sockaddr_un *addr = (const struct sockaddr_un *)addrarg;
+	struct uds_proxy_remote_conn_req remoteconn;
+	struct uds_proxy_remote_conn_rsp remotersp;
+	struct sockaddr_un proxy = {.sun_family = AF_UNIX};
+	int sock_fd;
+	int libcret;
+	int libcerrno;
+	int ret;
+
+	if (libcconnect == NULL) {
+		/* (void *)-1l is the handle value glibc defines as RTLD_NEXT. */
+		libcconnect = dlsym(((void *) - 1l), "connect");
+		if (libcconnect == NULL) {
+			uds_err("can't find connect by dlsym.");
+			errno = ENOSYS;
+			return -1;
+		}
+	}
+
+	libcret = (*libcconnect)(fd, addrarg, len);
+	// Nothing more to do when the local connect succeeded, the address is
+	// unusable, or it is not a unix domain socket address.
+	if (libcret == 0 || addrarg == NULL ||
+		len < (socklen_t)sizeof(addr->sun_family) || addr->sun_family != AF_UNIX) {
+		return libcret;
+	}
+	libcerrno = errno;
+
+	// Zero the whole request so no uninitialized bytes are sent, and keep
+	// the last byte of the path as a terminator.
+	memset(&remoteconn, 0, sizeof(remoteconn));
+	memset(&remotersp, 0, sizeof(remotersp));
+	strncpy(remoteconn.sun_path, addr->sun_path, sizeof(remoteconn.sun_path) - 1);
+
+	uds_log("enter uds connect fd:%d sunpath:%s family:%d len:%d connect function:0x%lx", fd, remoteconn.sun_path,
+		addr->sun_family, (int)len, (unsigned long)libcconnect);
+	// Not connected locally and it is a uds address: consult the whitelist.
+	if (!uds_conn_whitelist_check(remoteconn.sun_path)) {
+		uds_err("path:%s not in white list", remoteconn.sun_path);
+		errno = libcerrno;
+		return libcret;
+	}
+
+	// Try to have the proxy set up the remote link.
+	sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (sock_fd < 0) {
+		uds_err("create socket failed");
+		errno = libcerrno;
+		return libcret;
+	}
+
+	strncpy(proxy.sun_path, UDS_BUILD_CONN_ADDR, sizeof(proxy.sun_path) - 1);
+	if ((*libcconnect)(sock_fd, (struct sockaddr *)&proxy, sizeof(struct sockaddr_un)) < 0) {
+		uds_err("can't connect to uds proxy: %s", UDS_BUILD_CONN_ADDR);
+		goto err_end;
+	}
+	// The type reported to the proxy must be that of the caller's fd.
+	remoteconn.type = uds_conn_get_sock_type(fd);
+	if (remoteconn.type == (unsigned short)-1) {
+		remoteconn.type = SOCK_STREAM;
+	}
+	ret = send(sock_fd, &remoteconn, sizeof(remoteconn), 0);
+	if (ret <= 0) {
+		uds_err("send remote connect request failed, ret:%d err:%s", ret, strerror(errno));
+		goto err_end;
+	}
+	ret = recv(sock_fd, &remotersp, sizeof(remotersp), MSG_WAITALL);
+	if (ret <= 0) {
+		uds_err("recv remote connect replay failed, ret:%d err:%s", ret, strerror(errno));
+		goto err_end;
+	}
+	if (remotersp.ret == 0) {
+		goto err_end;
+	}
+
+	close(sock_fd);
+	return (*libcconnect)(fd, addrarg, len);
+
+err_end:
+	close(sock_fd);
+	errno = libcerrno;
+	return libcret;
+}
diff --git a/qtfs/ipc/uds_event.c b/qtfs/ipc/uds_event.c
new file mode 100644
index 0000000..ff4d79b
--- /dev/null
+++ b/qtfs/ipc/uds_event.c
@@ -0,0 +1,999 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <signal.h>
+#include <errno.h>
+#include <time.h>
+#include <fcntl.h>
+#include <sys/epoll.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include "dirent.h"
+
+#include "uds_main.h"
+#include "uds_event.h"
+
+/*
+ * Forward declarations of the event handlers defined later in this file, so
+ * they can be passed to uds_add_event() before their definitions appear.
+ * All handlers share the same (arg, epfd, p_event_var) signature.
+ */
+int uds_event_build_step2(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_remote_build(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_build_step3(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_uds2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_tcp2uds(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_build_step4(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_tcp2pipe(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_pipe2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+
+/*
+ * Allocate the five scratch buffers held in `p` (receive/send control
+ * buffers, receive/send iov buffers and the general buffer), each of
+ * UDS_EVENT_BUFLEN bytes.
+ *
+ * Returns EVENT_OK on success.  On failure every buffer allocated so far is
+ * released, all buffer pointers are NULL, and EVENT_ERR is returned.
+ */
+int uds_event_module_init(struct uds_event_global_var *p)
+{
+	/*
+	 * Clear every pointer first: if an early allocation fails, the later
+	 * ones must not be left holding whatever the caller's memory
+	 * contained, or uds_event_module_fini() would free a wild pointer.
+	 */
+	p->msg_control = NULL;
+	p->msg_control_send = NULL;
+	p->iov_base = NULL;
+	p->iov_base_send = NULL;
+	p->buf = NULL;
+
+	p->msg_controllen = UDS_EVENT_BUFLEN;
+	p->iov_len = UDS_EVENT_BUFLEN;
+	p->buflen = UDS_EVENT_BUFLEN;
+	p->msg_controlsendlen = UDS_EVENT_BUFLEN;
+	p->iov_sendlen = UDS_EVENT_BUFLEN;
+
+	p->msg_control = (char *)malloc(p->msg_controllen);
+	if (p->msg_control == NULL) {
+		uds_err("malloc msg control buf failed.");
+		p->msg_controllen = 0;
+		return EVENT_ERR;
+	}
+	p->msg_control_send = (char *)malloc(p->msg_controlsendlen);
+	if (p->msg_control_send == NULL) {
+		uds_err("malloc msg control send buf failed.");
+		goto free1;
+	}
+	p->iov_base = (char *)malloc(p->iov_len);
+	if (p->iov_base == NULL) {
+		uds_err("malloc iov base failed.");
+		goto free2;
+	}
+	p->iov_base_send = (char *)malloc(p->iov_sendlen);
+	if (p->iov_base_send == NULL) {
+		uds_err("malloc iov base send failed.");
+		goto free3;
+	}
+	p->buf = (char *)malloc(p->buflen);
+	if (p->buf == NULL) {
+		uds_err("malloc buf failed.");
+		goto free4;
+	}
+	return EVENT_OK;
+
+	/* Unwind in reverse allocation order. */
+free4:
+	free(p->iov_base_send);
+	p->iov_base_send = NULL;
+
+free3:
+	free(p->iov_base);
+	p->iov_base = NULL;
+
+free2:
+	free(p->msg_control_send);
+	p->msg_control_send = NULL;
+
+free1:
+	free(p->msg_control);
+	p->msg_control = NULL;
+	return EVENT_ERR;
+}
+
+void uds_event_module_fini(struct uds_event_global_var *p)
+{
+ if (p->msg_control != NULL) {
+ free(p->msg_control);
+ p->msg_control = NULL;
+ p->msg_controllen = 0;
+ }
+ if (p->msg_control_send != NULL) {
+ free(p->msg_control_send);
+ p->msg_control_send = NULL;
+ p->msg_controlsendlen = 0;
+ }
+ if (p->iov_base != NULL) {
+ free(p->iov_base);
+ p->iov_base = NULL;
+ p->iov_len = 0;
+ }
+ if (p->iov_base_send != NULL) {
+ free(p->iov_base_send);
+ p->iov_base_send = NULL;
+ p->iov_sendlen = 0;
+ }
+ if (p->buf != NULL) {
+ free(p->buf);
+ p->buf = NULL;
+ p->buflen = 0;
+ }
+ return;
+}
+
+/*
+ * Reset the pending-free list: clear all UDS_EPOLL_MAX_EVENTS slots of
+ * `tofree` and rewind the `cur` index.  Always returns 0.
+ */
+int uds_event_pre_hook(struct uds_event_global_var *p_event_var)
+{
+	const size_t list_bytes = sizeof(struct uds_event *) * UDS_EPOLL_MAX_EVENTS;
+
+	memset(p_event_var->tofree, 0, list_bytes);
+	p_event_var->cur = 0;
+	return 0;
+}
+
+/*
+ * Delete every event queued in the pending-free list (entries 0 .. cur-1 of
+ * `tofree`) through uds_del_event().  Always returns 0.
+ */
+int uds_event_post_hook(struct uds_event_global_var *p_event_var)
+{
+	for (int i = 0; i < p_event_var->cur; i++) {
+		struct uds_event *victim = p_event_var->tofree[i];
+
+		/* %lx expects an unsigned long, so convert the pointer explicitly. */
+		uds_log("event:%lx fd:%d free by its peer", (unsigned long)victim, victim->fd);
+		uds_del_event(victim);
+	}
+	return 0;
+}
+
+/*
+ * Queue the peer of `evt` for deletion by uds_event_post_hook() and mark it
+ * with tofree = 1.
+ *
+ * Returns 0 when the peer was queued or when `evt` is a pipe event (which
+ * has no peer to free); returns -1 when `evt` has no peer or the
+ * pending-free list is already full.
+ */
+int uds_event_add_to_free(struct uds_event_global_var *p_event_var, struct uds_event *evt)
+{
+	if (evt->pipe == 1) {
+		uds_log("pipe event:%d no need to free peer", evt->fd);
+		return 0;
+	}
+
+	struct uds_event *peerevt = evt->peer;
+	if (peerevt == NULL) {
+		uds_err("peer event add to free is NULL, my fd:%d", evt->fd);
+		return -1;
+	}
+	/*
+	 * uds_event_pre_hook() sizes the list as UDS_EPOLL_MAX_EVENTS slots.
+	 * Check before marking the peer, so an event that cannot be queued is
+	 * not left flagged as pending deletion.
+	 */
+	if (p_event_var->cur >= UDS_EPOLL_MAX_EVENTS) {
+		uds_err("to free list is full, can't add peer fd:%d", peerevt->fd);
+		return -1;
+	}
+	peerevt->tofree = 1;
+	uds_log("event fd:%d addr:%lx add to free", peerevt->fd, (unsigned long)peerevt);
+	p_event_var->tofree[p_event_var->cur] = peerevt;
+	p_event_var->cur++;
+	return 0;
+}
+
+/*
+ * Gate checked on an event: returns EVENT_ERR when the event has been
+ * flagged (tofree == 1) for deletion by its peer, EVENT_OK otherwise.
+ */
+int uds_event_pre_handler(struct uds_event *evt)
+{
+	if (evt->tofree != 1) {
+		return EVENT_OK;
+	}
+
+	uds_log("event fd:%d marked by peer as pending deletion", evt->fd);
+	return EVENT_ERR;
+}
+
+/*
+ * Handler for the local listening unix socket:
+ *   1. accept the pending local uds connection;
+ *   2. register the new fd as an event handled by uds_event_build_step2.
+ *
+ * Returns EVENT_OK when a connection was accepted and registered, EVENT_ERR
+ * when `arg` is NULL or the accept step yields an fd <= 0.
+ */
+int uds_event_uds_listener(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+	struct uds_event *listener = (struct uds_event *)arg;
+	int connfd;
+
+	if (listener == NULL) {
+		uds_err("param is invalid.");
+		return EVENT_ERR;
+	}
+
+	connfd = uds_sock_step_accept(listener->fd, AF_UNIX);
+	if (connfd > 0) {
+		uds_log("accept an new connection, fd:%d", connfd);
+		uds_add_event(connfd, NULL, uds_event_build_step2, NULL);
+		return EVENT_OK;
+	}
+
+	uds_err("conn fd error:%d", connfd);
+	return EVENT_ERR;
+}
+
+/*
+ * Second step of link building, run on a freshly accepted local connection.
+ *
+ * Reads one uds_proxy_remote_conn_req from the local client, opens a TCP
+ * connection to the peer proxy, forwards the request there wrapped in a
+ * uds_tcp2tcp header (msgtype MSGCNTL_UDS), and registers the TCP fd with
+ * uds_event_build_step3, handing it a heap copy of the request as private
+ * data.
+ *
+ * Returns EVENT_DEL when the client closed the connection, EVENT_ERR for an
+ * invalid argument or socket type, and EVENT_OK otherwise, including when
+ * the TCP connection, allocation or forwarding fails (as in the original).
+ * NOTE(review): on those EVENT_OK failure paths nothing is sent back to the
+ * local client - confirm it cannot block forever waiting for a reply.
+ */
+int uds_event_build_step2(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+	struct uds_event *evt = (struct uds_event *)arg;
+	if (evt == NULL) {
+		uds_err("param is invalid.");
+		return EVENT_ERR;
+	}
+	char buf[sizeof(struct uds_tcp2tcp) + sizeof(struct uds_proxy_remote_conn_req)] = {0};
+	struct uds_tcp2tcp *bdmsg = (struct uds_tcp2tcp *)buf;
+	struct uds_proxy_remote_conn_req *msg = (struct uds_proxy_remote_conn_req *)bdmsg->data;
+	struct uds_proxy_remote_conn_req *priv;
+	struct uds_conn_arg tcp = {
+		.cs = UDS_SOCKET_CLIENT,
+	};
+	int len;
+	int ret;
+
+	len = recv(evt->fd, msg, sizeof(struct uds_proxy_remote_conn_req), MSG_WAITALL);
+	if (len == 0) {
+		uds_err("recv err msg:%d errno:%s", len, strerror(errno));
+		return EVENT_DEL;
+	}
+	if (len < 0) {
+		uds_err("read msg error:%d errno:%s", len, strerror(errno));
+		return EVENT_OK;
+	}
+	if (msg->type != SOCK_STREAM && msg->type != SOCK_DGRAM) {
+		uds_err("uds type:%d invalid", msg->type);
+		return EVENT_ERR;
+	}
+	/* The path arrives from another process: make sure it is terminated before it is logged as a string. */
+	msg->sun_path[sizeof(msg->sun_path) - 1] = '\0';
+
+	if ((ret = uds_build_tcp_connection(&tcp)) < 0) {
+		uds_err("step2 build tcp connection failed, return:%d", ret);
+		return EVENT_OK;
+	}
+
+	/* Allocate before talking to the peer so a failure needs no remote cleanup. */
+	priv = malloc(sizeof(struct uds_proxy_remote_conn_req));
+	if (priv == NULL) {
+		uds_err("malloc failed");
+		close(tcp.connfd);
+		return EVENT_OK;
+	}
+
+	bdmsg->msgtype = MSGCNTL_UDS;
+	bdmsg->msglen = sizeof(struct uds_proxy_remote_conn_req);
+	if (write(tcp.connfd, bdmsg, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_proxy_remote_conn_req)) < 0) {
+		uds_err("send msg to tcp failed");
+		/* The TCP fd was never registered as an event, so it has to be closed here. */
+		free(priv);
+		close(tcp.connfd);
+		return EVENT_OK;
+	}
+
+	uds_log("step2 recv sun path:%s, add step3 event fd:%d", msg->sun_path, tcp.connfd);
+	memcpy(priv, msg, sizeof(struct uds_proxy_remote_conn_req));
+	uds_add_event(tcp.connfd, evt, uds_event_build_step3, priv);
+
+	return EVENT_OK;
+}
+
+
+/*
+ * Third step of link building, run on the TCP connection opened by step2.
+ *
+ * Reads the peer proxy's uds_proxy_remote_conn_rsp.  On a successful ack it
+ * creates a local unix server socket on the requested path (taken from
+ * evt->priv), suspends this TCP event, registers the new listening fd with
+ * uds_event_build_step4, and reports ret = 1 to the local client (evt->peer).
+ * On EOF, a failure ack, or a failure to create the server socket it reports
+ * ret = 0 to the local client and frees evt->priv.
+ *
+ * Returns EVENT_OK on success, EVENT_DEL on the failure paths above, and
+ * EVENT_ERR when the read itself fails.
+ */
+int uds_event_build_step3(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+	struct uds_event *evt = (struct uds_event *)arg;
+	struct uds_proxy_remote_conn_req *udsmsg;
+	struct uds_proxy_remote_conn_rsp msg;
+	struct uds_conn_arg uds;
+	int len;
+
+	memset(&msg, 0, sizeof(struct uds_proxy_remote_conn_rsp));
+	len = read(evt->fd, &msg, sizeof(struct uds_proxy_remote_conn_rsp));
+	if (len <= 0) {
+		uds_err("read error len:%d", len);
+		if (len == 0)
+			goto event_del;
+		return EVENT_ERR;
+	}
+	if (msg.ret == EVENT_ERR) {
+		uds_log("get build ack:%d, failed", msg.ret);
+		goto event_del;
+	}
+
+	udsmsg = (struct uds_proxy_remote_conn_req *)evt->priv;
+
+	memset(&uds, 0, sizeof(struct uds_conn_arg));
+	uds.cs = UDS_SOCKET_SERVER;
+	uds.udstype = udsmsg->type;
+	/* Copy one byte less than the buffer so the zeroed last byte always terminates the path. */
+	strncpy(uds.sun_path, udsmsg->sun_path, sizeof(uds.sun_path) - 1);
+	if (uds_build_unix_connection(&uds) < 0) {
+		uds_err("failed to build uds server sunpath:%s", uds.sun_path);
+		goto event_del;
+	}
+	uds_log("remote conn build success, build uds server type:%d sunpath:%s fd:%d OK this event suspend,",
+		udsmsg->type, udsmsg->sun_path, uds.sockfd);
+	uds_event_suspend(epfd, evt);
+	uds_add_event(uds.sockfd, evt, uds_event_build_step4, NULL);
+
+	msg.ret = 1;
+	write(evt->peer->fd, &msg, sizeof(struct uds_proxy_remote_conn_rsp));
+	return EVENT_OK;
+
+event_del:
+	msg.ret = 0;
+	write(evt->peer->fd, &msg, sizeof(struct uds_proxy_remote_conn_rsp));
+	free(evt->priv);
+	/* Do not leave a dangling pointer behind for whoever tears this event down. */
+	evt->priv = NULL;
+	return EVENT_DEL;
+}
+
+/*
+ * Fourth step of link building, run on the temporary unix server socket
+ * created by step3.
+ *
+ * Accepts the connection, registers it with uds_event_uds2tcp, pairs it with
+ * the suspended TCP event (evt->peer), switches that TCP event to
+ * uds_event_tcp2uds and re-inserts it into the epoll set.
+ *
+ * Returns EVENT_DEL once the pair is wired up, EVENT_ERR when accepting or
+ * registering the connection fails.
+ */
+int uds_event_build_step4(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+	struct uds_event *evt = (struct uds_event *)arg;
+	int connfd = uds_sock_step_accept(evt->fd, AF_UNIX);
+	if (connfd < 0) {
+		uds_err("accept connection failed fd:%d", connfd);
+		return EVENT_ERR;
+	}
+	struct uds_event *peerevt = (struct uds_event *)evt->peer;
+	struct uds_event *udsevt = uds_add_event(connfd, peerevt, uds_event_uds2tcp, NULL);
+	if (udsevt == NULL) {
+		/* Without this check the log below would dereference a NULL peer. */
+		uds_err("add uds2tcp event failed, fd:%d", connfd);
+		/* NOTE(review): assumes uds_add_event() leaves the fd open when it fails - confirm. */
+		close(connfd);
+		return EVENT_ERR;
+	}
+	peerevt->handler = uds_event_tcp2uds;
+	peerevt->peer = udsevt;
+
+	uds_log("accept new connection fd:%d, peerfd:%d frontfd:%d peerfd:%d, peerevt(fd:%d) active now",
+		connfd, evt->peer->fd, peerevt->fd, peerevt->peer->fd, peerevt->fd);
+	uds_event_insert(epfd, peerevt);
+	return EVENT_DEL;
+}
+
+/*
+ * Handler for the listening TCP socket: accepts one connection and
+ * registers it with uds_event_remote_build.
+ *
+ * Returns 0 on success, EVENT_ERR when the accept step yields an fd <= 0.
+ */
+int uds_event_tcp_listener(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+	struct uds_event *listener = (struct uds_event *)arg;
+	int connfd = uds_sock_step_accept(listener->fd, AF_INET);
+
+	if (connfd > 0) {
+		uds_log("tcp listener event enter, new connection fd:%d.", connfd);
+		uds_add_event(connfd, NULL, uds_event_remote_build, NULL);
+		return 0;
+	}
+
+	uds_err("tcp conn fd error:%d", connfd);
+	return EVENT_ERR;
+}
+
+/*
+ * Serve a MSGCNTL_UDS request arriving over TCP: read the
+ * uds_proxy_remote_conn_req body into `msg`, connect to the named local unix
+ * socket, pair the new uds event (uds_event_uds2tcp) with `evt` (switched to
+ * uds_event_tcp2uds), and send an ack back over TCP.
+ *
+ * Returns EVENT_OK on success, EVENT_ERR when the request cannot be read,
+ * and EVENT_DEL when the local connect or the ack write fails.
+ */
+int uds_build_connect2uds(struct uds_event *evt, struct uds_proxy_remote_conn_req *msg)
+{
+	struct uds_conn_arg targ;
+	struct uds_proxy_remote_conn_rsp ack;
+	int len;
+	int ret;
+
+	/*
+	 * Zero both structs in full: only some fields are assigned below, and
+	 * `ack` is written to the socket byte for byte.
+	 */
+	memset(&targ, 0, sizeof(targ));
+	memset(&ack, 0, sizeof(ack));
+
+	len = recv(evt->fd, msg, sizeof(struct uds_proxy_remote_conn_req), MSG_WAITALL);
+	if (len <= 0) {
+		uds_err("recv failed, len:%d str:%s", len, strerror(errno));
+		return EVENT_ERR;
+	}
+
+	targ.cs = UDS_SOCKET_CLIENT;
+	targ.udstype = msg->type;
+	/* Leave the zeroed last byte in place so the path is always terminated. */
+	strncpy(targ.sun_path, msg->sun_path, sizeof(targ.sun_path) - 1);
+	if (uds_build_unix_connection(&targ) < 0) {
+		uds_err("can't connect to sun_path:%s", targ.sun_path);
+		ack.ret = EVENT_ERR;
+		write(evt->fd, &ack, sizeof(struct uds_proxy_remote_conn_rsp));
+		return EVENT_DEL;
+	}
+
+	evt->peer = uds_add_event(targ.connfd, evt, uds_event_uds2tcp, NULL);
+	evt->handler = uds_event_tcp2uds;
+
+	uds_log("build link req from tcp, sunpath:%s, type:%d, eventfd:%d peerfd:%d",
+		targ.sun_path, msg->type, targ.connfd, evt->fd);
+
+	ack.ret = EVENT_OK;
+	ret = write(evt->fd, &ack, sizeof(struct uds_proxy_remote_conn_rsp));
+	if (ret <= 0) {
+		uds_err("apply ack failed, ret:%d", ret);
+		return EVENT_DEL;
+	}
+	return EVENT_OK;
+}
+
+/*
+ * Serve a MSGCNTL_PIPE request arriving over TCP: read the
+ * uds_stru_scm_pipe body into `msg` and turn `evt` into a pipe event.
+ *
+ *   SCM_PIPE_READ : evt now watches msg->srcfd and forwards to the TCP fd
+ *                   (uds_event_pipe2tcp);
+ *   SCM_PIPE_WRITE: evt keeps watching the TCP fd and forwards to
+ *                   msg->srcfd (uds_event_tcp2pipe).
+ *
+ * Returns EVENT_OK on success, EVENT_ERR on a failed read or invalid dir.
+ * NOTE(review): srcfd is taken from the wire and used as a local fd number
+ * without validation - confirm the peer is trusted.
+ */
+int uds_build_pipe_proxy(struct uds_event *evt, struct uds_stru_scm_pipe *msg)
+{
+	int nrecv = recv(evt->fd, msg, sizeof(struct uds_stru_scm_pipe), MSG_WAITALL);
+
+	if (nrecv <= 0) {
+		uds_err("recv failed, len:%d str:%s", nrecv, strerror(errno));
+		return EVENT_ERR;
+	}
+	if (!(msg->dir == SCM_PIPE_READ || msg->dir == SCM_PIPE_WRITE)) {
+		uds_err("invalid pipe dir:%d", msg->dir);
+		return EVENT_ERR;
+	}
+	uds_log("pipe proxy event fd:%d pipe fd:%d dir:%d", evt->fd, msg->srcfd, msg->dir);
+
+	/* Both directions mark the event as a pipe event. */
+	evt->pipe = 1;
+	if (msg->dir != SCM_PIPE_READ) {
+		evt->peerfd = msg->srcfd;
+		evt->handler = uds_event_tcp2pipe;
+		return EVENT_OK;
+	}
+
+	/* Read direction: the TCP fd becomes the peer, the pipe fd is watched. */
+	evt->peerfd = evt->fd;
+	evt->fd = msg->srcfd;
+	evt->handler = uds_event_pipe2tcp;
+	return EVENT_OK;
+}
+
+/*
+ * First handler on a freshly accepted TCP connection: read one uds_tcp2tcp
+ * header into the shared iov buffer and dispatch on its msgtype.
+ *
+ *   MSGCNTL_UDS  -> uds_build_connect2uds()
+ *   MSGCNTL_PIPE -> uds_build_pipe_proxy()
+ *
+ * Returns the dispatched function's result, EVENT_ERR when the header
+ * cannot be read, and EVENT_OK (after logging) for any other msgtype.
+ */
+int uds_event_remote_build(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+	struct uds_event *evt = (struct uds_event *)arg;
+	struct uds_tcp2tcp *head = (struct uds_tcp2tcp *)p_event_var->iov_base;
+	int nrecv;
+
+	memset(p_event_var->iov_base, 0, p_event_var->iov_len);
+	nrecv = recv(evt->fd, head, sizeof(struct uds_tcp2tcp), MSG_WAITALL);
+	if (nrecv <= 0) {
+		uds_err("read no msg from sock:%d, len:%d", evt->fd, nrecv);
+		return EVENT_ERR;
+	}
+
+	if (head->msgtype == MSGCNTL_UDS) {
+		return uds_build_connect2uds(evt, (struct uds_proxy_remote_conn_req *)head->data);
+	}
+	if (head->msgtype == MSGCNTL_PIPE) {
+		return uds_build_pipe_proxy(evt, (struct uds_stru_scm_pipe *)head->data);
+	}
+
+	uds_err("remote build not support msgtype %d now", head->msgtype);
+	return EVENT_OK;
+}
+
+/*
+ * Return the st_mode of `fd` as reported by fstat(), or 0 when fstat()
+ * fails.  A mode of 0 matches no S_IFMT file type, so callers switching on
+ * the type treat it as unsupported instead of reading an uninitialized
+ * struct stat.
+ */
+static inline mode_t uds_msg_file_mode(int fd)
+{
+	struct stat st;
+
+	if (fstat(fd, &st) != 0) {
+		uds_err("get fd:%d fstat failed, errstr:%s", fd, strerror(errno));
+		return 0;
+	}
+	if (S_ISFIFO(st.st_mode)) {
+		uds_log("fd:%d is fifo", fd);
+	}
+
+	return st.st_mode;
+}
+
+/*
+ * Forward a regular-file descriptor received via SCM_RIGHTS to the TCP peer.
+ *
+ * The fd cannot cross the network, so it is described instead: its path
+ * (resolved through /proc/self/fd/<fd>) and its open flags are sent in a
+ * MSG_SCM_RIGHTS message built in p_event_var->buf.  `scmfd` is closed on
+ * every path.
+ *
+ * Returns EVENT_OK on success, EVENT_ERR on any failure.
+ * NOTE(review): unlike the fifo variant this message does not set msglen -
+ * confirm the receiver does not rely on it.
+ */
+static int uds_msg_scm_regular_file(int scmfd, int tcpfd, struct uds_event_global_var *p_event_var)
+{
+	int ret;
+	struct uds_tcp2tcp *p_msg = (struct uds_tcp2tcp *)p_event_var->buf;
+	struct uds_msg_scmrights *p_scmr = (struct uds_msg_scmrights *)&p_msg->data;
+	/* "/proc/self/fd/" plus any int fits comfortably in 32 bytes. */
+	char fdproc[32];
+
+	snprintf(fdproc, sizeof(fdproc), "/proc/self/fd/%d", scmfd);
+	/* readlink() does not terminate its result: reserve a byte and add the terminator ourselves. */
+	ret = readlink(fdproc, p_scmr->path, UDS_PATH_MAX - 1);
+	if (ret < 0) {
+		uds_err("readlink:%s error, ret:%d, errstr:%s", fdproc, ret, strerror(errno));
+		close(scmfd);
+		return EVENT_ERR;
+	}
+	p_scmr->path[ret] = '\0';
+	p_scmr->flags = fcntl(scmfd, F_GETFL, 0);
+	if (p_scmr->flags < 0) {
+		uds_err("fcntl get flags failed:%d error:%s", p_scmr->flags, strerror(errno));
+		close(scmfd);
+		return EVENT_ERR;
+	}
+	close(scmfd);
+	p_msg->msgtype = MSG_SCM_RIGHTS;
+	ret = write(tcpfd, p_msg, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_msg_scmrights));
+	if (ret <= 0) {
+		uds_err("send scm rights msg to tcp failed, ret:%d", ret);
+		return EVENT_ERR;
+	}
+	uds_log("scm rights msg send to tcp, fd:%d path:%s flags:%d", scmfd, p_scmr->path, p_scmr->flags);
+	return EVENT_OK;
+}
+
+/*
+ * Forward a pipe descriptor received via SCM_RIGHTS to the TCP peer.
+ *
+ * The pipe direction is taken from the owner permission bits that lstat()
+ * reports for /proc/self/fd/<fd> (read bit -> SCM_PIPE_READ, otherwise write
+ * bit -> SCM_PIPE_WRITE).  A MSG_SCM_PIPE message carrying the direction and
+ * the local fd number is then sent over `tcpfd`.
+ *
+ * On success `scmfd` stays open, since its number was just handed to the
+ * peer as srcfd.  On failure it is closed.
+ *
+ * Returns EVENT_OK on success, EVENT_ERR on failure.
+ */
+static int uds_msg_scm_fifo_file(int scmfd, int tcpfd, struct uds_event_global_var *p_event_var)
+{
+#define FDPATH_LEN 32
+	int ret;
+	struct uds_tcp2tcp *p_get = (struct uds_tcp2tcp *)p_event_var->buf;
+	struct uds_stru_scm_pipe *p_pipe = (struct uds_stru_scm_pipe *)p_get->data;
+	char path[FDPATH_LEN] = {0};
+	struct stat st;
+	p_get->msgtype = MSG_SCM_PIPE;
+	p_get->msglen = sizeof(struct uds_stru_scm_pipe);
+
+	snprintf(path, sizeof(path), "/proc/self/fd/%d", scmfd);
+	/* Without this check a failed lstat() would leave st uninitialized. */
+	if (lstat(path, &st) != 0) {
+		uds_err("lstat:%s failed, errstr:%s", path, strerror(errno));
+		close(scmfd);
+		return EVENT_ERR;
+	}
+	if (st.st_mode & S_IRUSR) {
+		p_pipe->dir = SCM_PIPE_READ;
+		uds_log("scm rights recv read pipe fd:%d, mode:%o", scmfd, st.st_mode);
+	} else if (st.st_mode & S_IWUSR) {
+		p_pipe->dir = SCM_PIPE_WRITE;
+		uds_log("scm rights recv write pipe fd:%d, mode:%o", scmfd, st.st_mode);
+	} else {
+		uds_err("scm rights recv invalid pipe, mode:%o fd:%d", st.st_mode, scmfd);
+		close(scmfd);
+		return EVENT_ERR;
+	}
+	p_pipe->srcfd = scmfd;
+	ret = send(tcpfd, p_get, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_stru_scm_pipe), 0);
+	if (ret <= 0) {
+		uds_err("send tar get msg failed, ret:%d errstr:%s", ret, strerror(errno));
+		/* The peer never learned about this fd, so nobody else will close it. */
+		close(scmfd);
+		return EVENT_ERR;
+	}
+	return EVENT_OK;
+}
+
+/*
+ * Handle one SCM_RIGHTS control message received from a unix socket.
+ *
+ * Only the first descriptor in the message is forwarded: regular files via
+ * uds_msg_scm_regular_file(), pipes via uds_msg_scm_fifo_file().  Any further
+ * descriptors in the same message, and descriptors of an unsupported file
+ * type, are closed so they do not leak into this process.
+ *
+ * Returns EVENT_ERR when the first descriptor is <= 0, EVENT_OK otherwise
+ * (the forwarding helpers log their own failures).
+ */
+static int uds_msg_scmrights2tcp(struct cmsghdr *cmsg, int tcpfd, struct uds_event_global_var *p_event_var)
+{
+	int scmfd;
+	mode_t mode;
+	size_t nfds = 0;
+
+	memset(p_event_var->buf, 0, p_event_var->buflen);
+	memcpy(&scmfd, CMSG_DATA(cmsg), sizeof(scmfd));
+
+	/* The kernel has already installed every fd carried by this message; close the ones that are not forwarded. */
+	if (cmsg->cmsg_len > CMSG_LEN(0)) {
+		nfds = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int);
+	}
+	for (size_t i = 1; i < nfds; i++) {
+		int extrafd;
+		memcpy(&extrafd, CMSG_DATA(cmsg) + i * sizeof(int), sizeof(extrafd));
+		uds_err("scm rights only support one fd per msg, close extra fd:%d", extrafd);
+		close(extrafd);
+	}
+
+	if (scmfd <= 0) {
+		uds_err("recv invalid scm fd:%d", scmfd);
+		return EVENT_ERR;
+	}
+
+	mode = uds_msg_file_mode(scmfd);
+
+	switch (mode & S_IFMT) {
+	case S_IFREG:
+		uds_log("recv scmfd:%d from uds, is regular file", scmfd);
+		uds_msg_scm_regular_file(scmfd, tcpfd, p_event_var);
+		break;
+	case S_IFIFO:
+		uds_log("recv scmfd:%d from uds, is fifo", scmfd);
+		uds_msg_scm_fifo_file(scmfd, tcpfd, p_event_var);
+		break;
+	default:
+		uds_err("scm rights not support file mode:%o", mode);
+		/* Nothing will ever use this descriptor; release it. */
+		close(scmfd);
+		break;
+	}
+
+	return EVENT_OK;
+}
+
+/*
+ * Walk every control message attached to `msg` and forward the supported
+ * ones (SOL_SOCKET / SCM_RIGHTS) to the TCP peer of `evt`.
+ *
+ * Returns the number of control messages visited, supported or not.
+ */
+static int uds_msg_cmsg2tcp(struct msghdr *msg, struct uds_event *evt, struct uds_event_global_var *p_event_var)
+{
+	int cnt = 0;
+	struct cmsghdr *cmsg = CMSG_FIRSTHDR(msg);
+	while (cmsg != NULL) {
+		cnt ++;
+		/* cmsg_len is not an int; convert it so it matches the %d specifier. */
+		uds_log("cmsg type:%d len:%d level:%d, tcpfd:%d", cmsg->cmsg_type,
+			(int)cmsg->cmsg_len, cmsg->cmsg_level, evt->peer->fd);
+		/* cmsg_type values are only meaningful within their level. */
+		if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
+			uds_msg_scmrights2tcp(cmsg, evt->peer->fd, p_event_var);
+		} else {
+			uds_err("cmsg type:%d not support now", cmsg->cmsg_type);
+		}
+		cmsg = CMSG_NXTHDR(msg, cmsg);
+	}
+	return cnt;
+}
+
+/*
+ * Append one SCM_RIGHTS control message carrying `fd` to `msg`.
+ *
+ * `*cmsg` is the previously written header, or NULL for the first one; it is
+ * updated to the header just written.  `*controllen` is increased by the
+ * length of the new control message.
+ *
+ * Returns EVENT_OK on success, EVENT_ERR when the control buffer of `msg`
+ * has no room for another header (nothing is modified in that case).
+ */
+static int uds_msg_scmfd_combine_msg(struct msghdr *msg, struct cmsghdr **cmsg, int *controllen, int fd)
+{
+	struct cmsghdr *cnxt = NULL;
+	if (*cmsg == NULL) {
+		cnxt = CMSG_FIRSTHDR(msg);
+	} else {
+		cnxt = CMSG_NXTHDR(msg, *cmsg);
+	}
+	/* Both macros return NULL when the control buffer is exhausted. */
+	if (cnxt == NULL) {
+		uds_err("no space left in control buf for fd:%d", fd);
+		return EVENT_ERR;
+	}
+	*cmsg = cnxt;
+	cnxt->cmsg_level = SOL_SOCKET;
+	cnxt->cmsg_type = SCM_RIGHTS;
+	cnxt->cmsg_len = CMSG_LEN(sizeof(fd));
+	memcpy(CMSG_DATA(cnxt), &fd, sizeof(fd));
+	*controllen = *controllen + cnxt->cmsg_len;
+	return EVENT_OK;
+}
+
+/*
+ * Pass descriptor `fd` over unix socket `sock` in an SCM_RIGHTS control
+ * message, accompanied by a single zero data byte.
+ *
+ * Returns 0 when the one data byte was sent, -1 otherwise.
+ */
+static int uds_msg_scmright_send_fd(int sock, int fd)
+{
+	char byte = 0;
+	struct iovec iov;
+	struct msghdr msg;
+	struct cmsghdr *cmsg;
+	/* The union gives the control buffer the alignment struct cmsghdr needs; a bare char array does not guarantee it. */
+	union {
+		char buf[CMSG_SPACE(sizeof(int))];
+		struct cmsghdr align;
+	} ctl;
+
+	// send at least one char
+	memset(&msg, 0, sizeof(msg));
+	memset(&ctl, 0, sizeof(ctl));
+	iov.iov_base = &byte;
+	iov.iov_len = 1;
+	msg.msg_iov = &iov;
+	msg.msg_iovlen = 1;
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+
+	msg.msg_control = ctl.buf;
+	msg.msg_controllen = sizeof(ctl.buf);
+	cmsg = CMSG_FIRSTHDR(&msg);
+	cmsg->cmsg_level = SOL_SOCKET;
+	cmsg->cmsg_type = SCM_RIGHTS;
+	cmsg->cmsg_len = CMSG_LEN(sizeof(fd));
+	// Initialize the payload
+	memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
+	msg.msg_controllen = cmsg->cmsg_len;
+
+	/* Compare as signed values so a -1 error return is handled explicitly. */
+	if (sendmsg(sock, &msg, 0) != (ssize_t)iov.iov_len)
+		return -1;
+	return 0;
+}
+
+/*
+ * Turn a control-type message received over TCP back into a local file
+ * descriptor.
+ *
+ * For MSG_SCM_RIGHTS the path and flags carried in the message are opened
+ * locally.  Returns the new descriptor, or -1 when the open fails or the
+ * message type is not supported.
+ *
+ * NOTE(review): path and flags come from the remote peer and are passed to
+ * open() as-is - confirm the peer is trusted or add validation.
+ */
+static int uds_msg_cmsg2uds(struct uds_tcp2tcp *msg, struct uds_event *evt)
+{
+	int scmfd = -1;
+	switch (msg->msgtype) {
+	case MSG_SCM_RIGHTS: {
+		struct uds_msg_scmrights *p_scmr = (struct uds_msg_scmrights *)&msg->data;
+		/*
+		 * Assign to the function-level scmfd.  Declaring a second
+		 * variable here would shadow it, so the function would return -1
+		 * even on success and the opened descriptor would leak.
+		 */
+		scmfd = open(p_scmr->path, p_scmr->flags);
+		if (scmfd < 0) {
+			uds_err("scm rights send fd failed, scmfd:%d path:%s flags:%d", scmfd, p_scmr->path, p_scmr->flags);
+			return -1;
+		}
+		uds_log("scm send fd:%d path:%s flags:%d", scmfd, p_scmr->path, p_scmr->flags);
+		break;
+	}
+	default:
+		uds_err("msg type:%d not support.", msg->msgtype);
+		return -1;
+	}
+	return scmfd;
+}
+
+/*
+ * Handle a MSG_SCM_PIPE message received over TCP.
+ *
+ * Reads the uds_stru_scm_pipe body, opens a second TCP connection to the
+ * peer proxy, creates a local pipe, and registers a pipe event that relays
+ * between the new TCP connection and one end of the pipe.  The other end is
+ * returned so the caller can hand it to the local process.  Finally a
+ * MSGCNTL_PIPE message is sent on the new TCP connection.
+ *
+ * Returns the pipe descriptor to pass on, EVENT_DEL when the body cannot be
+ * read, or EVENT_ERR on any other failure.
+ */
+int uds_msg_tcp2uds_scm_pipe(struct uds_tcp2tcp *p_msg, struct uds_event *evt)
+{
+	int scmfd;
+	int fd[SCM_PIPE_NUM];
+	struct uds_stru_scm_pipe *p_pipe = (struct uds_stru_scm_pipe *)p_msg->data;
+	int len;
+
+	/* msglen comes from the wire: never let it size the read into a fixed-size body. */
+	if ((size_t)p_msg->msglen != sizeof(struct uds_stru_scm_pipe)) {
+		uds_err("scm pipe recv invalid msglen:%d", (int)p_msg->msglen);
+		return EVENT_ERR;
+	}
+	len = recv(evt->fd, p_pipe, p_msg->msglen, MSG_WAITALL);
+	if (len <= 0) {
+		uds_err("recv data failed, len:%d", len);
+		return EVENT_DEL;
+	}
+	if (p_pipe->dir != SCM_PIPE_READ && p_pipe->dir != SCM_PIPE_WRITE) {
+		uds_err("scm pipe recv invalid pipe dir:%d, srcfd:%d", p_pipe->dir, p_pipe->srcfd);
+		return EVENT_ERR;
+	}
+	struct uds_conn_arg tcp = {
+		.cs = UDS_SOCKET_CLIENT,
+	};
+	int ret;
+	if ((ret = uds_build_tcp_connection(&tcp)) < 0) {
+		uds_err("build tcp connection failed, return:%d", ret);
+		return EVENT_ERR;
+	}
+	if (pipe(fd) == -1) {
+		uds_err("pipe syscall error, strerr:%s", strerror(errno));
+		/* The TCP connection is not registered anywhere yet; close it here. */
+		close(tcp.connfd);
+		return EVENT_ERR;
+	}
+	if (p_pipe->dir == SCM_PIPE_READ) {
+		uds_log("send read pipe:%d to peer:%d", fd[SCM_PIPE_READ], evt->peer->fd);
+		scmfd = fd[SCM_PIPE_READ];
+		// Read direction: this is the remote side, so watch the tcp fd and relay into the pipe's write end.
+		uds_add_pipe_event(tcp.connfd, fd[SCM_PIPE_WRITE], uds_event_tcp2pipe, NULL);
+	} else {
+		uds_log("send write pipe:%d to peer:%d", fd[SCM_PIPE_WRITE], evt->peer->fd);
+		scmfd = fd[SCM_PIPE_WRITE];
+		// Write direction: this is the remote side, so watch the pipe's read end and relay to the tcp fd.
+		uds_add_pipe_event(fd[SCM_PIPE_READ], tcp.connfd, uds_event_pipe2tcp, NULL);
+	}
+
+	p_msg->msgtype = MSGCNTL_PIPE;
+	p_msg->msglen = sizeof(struct uds_stru_scm_pipe);
+	len = write(tcp.connfd, p_msg, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_stru_scm_pipe));
+	if (len <= 0) {
+		uds_err("send pipe msg failed, len:%d", len);
+		return EVENT_ERR;
+	}
+	uds_log("success to build pipe fd map, dir:%d srcfd:%d tcpfd:%d readfd:%d writefd:%d",
+		p_pipe->dir, p_pipe->srcfd, tcp.connfd, fd[SCM_PIPE_READ], fd[SCM_PIPE_WRITE]);
+
+	return scmfd;
+}
+
+/*
+ * Relay handler, TCP -> pipe: read whatever is available on evt->fd into
+ * the shared iov buffer and write it to evt->peerfd.
+ *
+ * Returns EVENT_OK after a successful relay, EVENT_DEL when either the read
+ * or the write returns <= 0.
+ */
+int uds_event_tcp2pipe(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+	struct uds_event *evt = (struct uds_event *)arg;
+	int nread;
+	int nwritten;
+
+	memset(p_event_var->iov_base, 0, p_event_var->iov_len);
+	nread = read(evt->fd, p_event_var->iov_base, p_event_var->iov_len);
+	if (nread <= 0) {
+		uds_err("read from tcp failed, len:%d str:%s", nread, strerror(errno));
+		return EVENT_DEL;
+	}
+
+	uds_log("tcp:%d to pipe:%d len:%d, buf:\n>>>>>>>\n%.*s\n<<<<<<<\n", evt->fd, evt->peerfd, nread, nread, p_event_var->iov_base);
+	nwritten = write(evt->peerfd, p_event_var->iov_base, nread);
+	if (nwritten > 0) {
+		return EVENT_OK;
+	}
+
+	uds_err("write to pipe failed, fd:%d str:%s", evt->peerfd, strerror(errno));
+	return EVENT_DEL;
+}
+
+/*
+ * Relay handler, pipe -> TCP: read whatever is available on evt->fd into
+ * the shared iov buffer and write it to evt->peerfd.
+ *
+ * Returns EVENT_OK after a successful relay, EVENT_DEL when either the read
+ * or the write returns <= 0.
+ */
+int uds_event_pipe2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+	struct uds_event *evt = (struct uds_event *)arg;
+	int nread;
+	int nwritten;
+
+	memset(p_event_var->iov_base, 0, p_event_var->iov_len);
+	nread = read(evt->fd, p_event_var->iov_base, p_event_var->iov_len);
+	if (nread <= 0) {
+		uds_err("read from pipe failed, len:%d str:%s", nread, strerror(errno));
+		return EVENT_DEL;
+	}
+
+	uds_log("pipe:%d to tcp:%d len:%d, buf:\n>>>>>>>\n%.*s\n<<<<<<<\n", evt->fd, evt->peerfd, nread, nread, p_event_var->iov_base);
+	nwritten = write(evt->peerfd, p_event_var->iov_base, nread);
+	if (nwritten > 0) {
+		return EVENT_OK;
+	}
+
+	uds_err("write to tcp failed, fd:%d str:%s", evt->peerfd, strerror(errno));
+	return EVENT_DEL;
+}
+
+/*
+ * Send a bare uds_tcp2tcp header with msgtype MSG_END and msglen 0 on
+ * `sock`.
+ *
+ * Returns EVENT_OK when the write succeeds, EVENT_DEL when it returns <= 0.
+ */
+int uds_msg_tcp_end_msg(int sock)
+{
+	struct uds_tcp2tcp tail;
+	int nwritten;
+
+	memset(&tail, 0, sizeof(tail));
+	tail.msgtype = MSG_END;
+	tail.msglen = 0;
+
+	nwritten = write(sock, &tail, sizeof(struct uds_tcp2tcp));
+	if (nwritten > 0) {
+		return EVENT_OK;
+	}
+
+	uds_err("write end msg failed, ret:%d fd:%d", nwritten, sock);
+	return EVENT_DEL;
+}
+
+/*
+ * Zero all five scratch buffers held in `p`, each over its recorded length.
+ */
+void uds_msg_init_event_buf(struct uds_event_global_var *p)
+{
+	/* receive side */
+	memset(p->msg_control, 0, p->msg_controllen);
+	memset(p->iov_base, 0, p->iov_len);
+
+	/* send side */
+	memset(p->msg_control_send, 0, p->msg_controlsendlen);
+	memset(p->iov_base_send, 0, p->iov_sendlen);
+
+	/* general-purpose buffer */
+	memset(p->buf, 0, p->buflen);
+}
+
+#define TEST_BUFLEN 256
+/*
+ * Relay handler, unix socket -> TCP.
+ *
+ * Receives one message (data plus any control messages) from evt->fd.  The
+ * control messages are forwarded first through uds_msg_cmsg2tcp(); the data,
+ * if any remains once the control-message count is subtracted from the
+ * received length, is sent as a MSG_NORMAL frame.  A MSG_END frame always
+ * closes the exchange.
+ *
+ * The data is received directly behind the space reserved for the
+ * uds_tcp2tcp header in the shared iov buffer, so the frame goes out with a
+ * single write.
+ *
+ * Returns EVENT_DEL when the local side closed (the peer is queued for
+ * deletion), EVENT_ERR when recvmsg or the data write fails, otherwise the
+ * result of sending the MSG_END frame.
+ */
+int uds_event_uds2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+	struct uds_event *evt = (struct uds_event *)arg;
+	struct uds_tcp2tcp *p_msg = (struct uds_tcp2tcp *)p_event_var->iov_base;
+	struct iovec iov;
+	struct msghdr msg;
+	struct cmsghdr *cmsg;
+	int cmsgcnt = 0;
+	int len;
+	int ret;
+
+	memset(&msg, 0, sizeof(msg));
+	iov.iov_base = p_event_var->iov_base + sizeof(struct uds_tcp2tcp);
+	iov.iov_len = p_event_var->iov_len - sizeof(struct uds_tcp2tcp);
+	msg.msg_iov = &iov;
+	msg.msg_iovlen = 1;
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+
+	/* recvmsg() fills the control buffer itself; it needs no header prepared in advance. */
+	msg.msg_control = p_event_var->msg_control;
+	msg.msg_controllen = p_event_var->msg_controllen;
+
+	len = recvmsg(evt->fd, &msg, 0);
+	if (len == 0) {
+		uds_err("recvmsg error, return:%d", len);
+		uds_event_add_to_free(p_event_var, evt);
+		return EVENT_DEL;
+	}
+	if (len < 0) {
+		uds_err("recvmsg error return val:%d", len);
+		return EVENT_ERR;
+	}
+	cmsg = CMSG_FIRSTHDR(&msg);
+	if (cmsg != NULL) {
+		/* The payload is not a terminated string, so bound it by the received length; size_t values are converted for %d. */
+		uds_log("recvmsg cmsg len:%d cmsglen:%d iovlen:%d iov:%.*s cmsglevel:%d cmsgtype:%d",
+			len, (int)cmsg->cmsg_len, (int)iov.iov_len, len, (char *)iov.iov_base, cmsg->cmsg_level, cmsg->cmsg_type);
+		cmsgcnt = uds_msg_cmsg2tcp(&msg, evt, p_event_var);
+		if (len - cmsgcnt == 0)
+			goto endmsg;
+	}
+
+	p_msg->msgtype = MSG_NORMAL;
+	p_msg->msglen = len;
+	ret = write(evt->peer->fd, (void *)p_msg, p_msg->msglen + sizeof(struct uds_tcp2tcp));
+	if (ret <= 0) {
+		uds_err("write to peer:%d failed, retcode:%d len:%d", evt->peer->fd, ret, len);
+		return EVENT_ERR;
+	}
+
+	uds_log("write iov msg to tcp success, msgtype:%d ret:%d iovlen:%d recvlen:%d udsheadlen:%d msglen:%d msg:\n>>>>>>>\n%.*s\n<<<<<<<\n",
+		p_msg->msgtype, ret, (int)iov.iov_len, len, (int)sizeof(struct uds_tcp2tcp), p_msg->msglen, p_msg->msglen, p_msg->data);
+endmsg:
+	return uds_msg_tcp_end_msg(evt->peer->fd);
+}
+
+/*
+ * Event handler: read a sequence of uds_tcp2tcp frames from the TCP socket
+ * evt->fd until a MSG_END frame, rebuild one unix-socket message from them
+ * (at most one MSG_NORMAL payload plus any number of fds produced for
+ * MSG_SCM_RIGHTS / MSG_SCM_PIPE frames) and sendmsg() it to evt->peer->fd.
+ *
+ * Descriptors created while assembling the message are only needed until
+ * sendmsg() has run, so they are closed on every exit path.
+ *
+ * Returns EVENT_OK after sendmsg() was attempted, EVENT_ERR on a protocol or
+ * conversion error, EVENT_DEL when the TCP side failed or closed.
+ */
+int uds_event_tcp2uds(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+#define MAX_FDS 64
+    int fds[MAX_FDS] = {0};
+    int fdnum = 0;
+    struct uds_event *evt = (struct uds_event *)arg;
+    struct uds_tcp2tcp *p_msg = (struct uds_tcp2tcp *)p_event_var->iov_base;
+    int ret;
+    int retcode = EVENT_OK;
+    int normal_msg_len = 0;
+    struct msghdr msg;
+    struct cmsghdr *cmsg = NULL;
+    struct iovec iov;
+    int msg_controllen = 0;
+
+    (void)epfd;
+    memset(&msg, 0, sizeof(msg));
+    iov.iov_base = p_event_var->iov_base_send;
+    iov.iov_len = 0;
+    msg.msg_iov = &iov;
+    msg.msg_iovlen = 1;
+    msg.msg_name = NULL;
+    msg.msg_namelen = 0;
+    msg.msg_control = p_event_var->msg_control_send;
+    msg.msg_controllen = p_event_var->msg_controlsendlen;
+
+    while (1) {
+        int len = recv(evt->fd, p_msg, sizeof(struct uds_tcp2tcp), MSG_WAITALL);
+        if (len <= 0) {
+            uds_err("recv no msg maybe sock is closed, delete this tcp2uds event, len:%d.", len);
+            goto close_event;
+        }
+        uds_log("pmsg:%lx type:%d len:%d iov_base:%lx len:%d", (unsigned long)p_msg, p_msg->msgtype, p_msg->msglen,
+                (unsigned long)p_event_var->iov_base, len);
+        if (p_msg->msgtype == MSG_END) {
+            break;
+        }
+        if (p_msg->msglen > p_event_var->iov_len - sizeof(struct uds_tcp2tcp) || p_msg->msglen <= 0) {
+            uds_err("pmsg len:%d is invalid, fd:%d peerfd:%d", p_msg->msglen, evt->fd, evt->peer->fd);
+            continue;
+        }
+        switch(p_msg->msgtype) {
+            case MSG_NORMAL:
+                if (normal_msg_len != 0) {
+                    uds_err("normal msg repeat recv fd:%d", evt->fd);
+                    goto err;
+                }
+                // NOTE(review): msglen was validated against iov_len above, but
+                // the destination is iov_base_send; confirm iov_sendlen >= iov_len.
+                normal_msg_len = recv(evt->fd, p_event_var->iov_base_send, p_msg->msglen, MSG_WAITALL);
+                if (normal_msg_len <= 0) {
+                    uds_err("recv msg error:%d fd:%d", normal_msg_len, evt->fd);
+                    goto close_event;
+                }
+                iov.iov_len = normal_msg_len;
+                uds_log("recv normal msg len:%d str: \n>>>>>>>\n%.*s\n<<<<<<<", normal_msg_len, normal_msg_len, (char *)iov.iov_base);
+                break;
+            case MSG_SCM_RIGHTS: {
+                int len;
+                int scmfd;
+                struct uds_msg_scmrights *p_scm = (struct uds_msg_scmrights *) p_msg->data;
+                memset(p_scm->path, 0, sizeof(p_scm->path));
+                // SCM RIGHTS msg proc
+                len = recv(evt->fd, p_msg->data, p_msg->msglen, MSG_WAITALL);
+                if (len <= 0) {
+                    uds_err("recv data failed len:%d", p_msg->msglen);
+                    // Unlike close_event, this path does not call
+                    // uds_event_add_to_free(); kept as in the original.
+                    retcode = EVENT_DEL;
+                    goto out;
+                }
+                scmfd = uds_msg_cmsg2uds(p_msg, evt);
+                if (scmfd == -1) {
+                    goto err;
+                }
+                if (fdnum >= MAX_FDS) {
+                    // The peer decides how many fd frames arrive; never index
+                    // past the local array.
+                    uds_err("too many scm fds in one message, max:%d fd:%d", MAX_FDS, evt->fd);
+                    close(scmfd);
+                    goto err;
+                }
+                fds[fdnum++] = scmfd;
+                uds_msg_scmfd_combine_msg(&msg, &cmsg, &msg_controllen, scmfd);
+                break;
+            }
+            case MSG_SCM_PIPE: {
+                int scmfd;
+                scmfd = uds_msg_tcp2uds_scm_pipe(p_msg, evt);
+                if (scmfd == EVENT_DEL)
+                    goto close_event;
+                if (scmfd < 0)
+                    goto err;
+                if (fdnum >= MAX_FDS) {
+                    uds_err("too many scm fds in one message, max:%d fd:%d", MAX_FDS, evt->fd);
+                    close(scmfd);
+                    goto err;
+                }
+                fds[fdnum++] = scmfd;
+                uds_msg_scmfd_combine_msg(&msg, &cmsg, &msg_controllen, scmfd);
+                break;
+            }
+            default:
+                uds_err("recv unsupport msg type:%d event fd:%d", p_msg->msgtype, evt->fd);
+                break;
+        }
+    }
+    if (msg_controllen == 0 && iov.iov_len == 0)
+        goto err;
+    msg.msg_controllen = msg_controllen;
+    // Ancillary data alone still travels with a one-byte payload.
+    if (iov.iov_len == 0) iov.iov_len = 1;
+    ret = sendmsg(evt->peer->fd, &msg, 0);
+    uds_log("evt:%d sendmsg len:%d, controllen:%d errno:%s", evt->fd, ret, msg_controllen, strerror(errno));
+    retcode = EVENT_OK;
+    goto out;
+err:
+    retcode = EVENT_ERR;
+    goto out;
+
+close_event:
+    uds_event_add_to_free(p_event_var, evt);
+    retcode = EVENT_DEL;
+out:
+    // Local copies of the passed descriptors are closed on success and on
+    // failure alike.
+    for (int i = 0; i < fdnum; i++) {
+        close(fds[i]);
+    }
+    return retcode;
+}
+
+/*
+ * Return 1 when fd is one of the worker epoll descriptors recorded in
+ * p_uds_var->efd[0 .. work_thread_num-1], 0 otherwise.
+ */
+int uds_diag_is_epoll_fd(int fd)
+{
+    int idx = p_uds_var->work_thread_num;
+
+    while (idx-- > 0) {
+        if (p_uds_var->efd[idx] == fd) {
+            return 1;
+        }
+    }
+    return 0;
+}
+
+/*
+ * Append one text line per open descriptor of this process to buf, skipping
+ * fds 0-2 and the worker epoll fds. Descriptors are enumerated through
+ * /proc/self/fd and each link target is resolved with readlink().
+ *
+ * buf - output buffer
+ * len - capacity of buf in bytes; output is truncated at a line boundary
+ *       instead of overflowing
+ */
+void uds_diag_list_fd(char *buf, int len)
+{
+#define FDPATH_LEN 32
+    int pos = 0;
+    DIR *dir = NULL;
+    struct dirent *entry;
+
+    if (buf == NULL || len <= 0) {
+        return;
+    }
+    dir = opendir("/proc/self/fd/");
+    if (dir == NULL) {
+        uds_err("open path:/proc/self/fd/ failed");
+        return;
+    }
+    while ((entry = readdir(dir)) != NULL) {
+        // "." and ".." convert to 0 and are dropped by the fd <= 2 test.
+        int fd = atoi(entry->d_name);
+        char fdpath[FDPATH_LEN];
+        char link[FDPATH_LEN];
+        int ret;
+        if (fd <= 2 || uds_diag_is_epoll_fd(fd))
+            continue;
+        memset(fdpath, 0, FDPATH_LEN);
+        memset(link, 0, FDPATH_LEN);
+        snprintf(fdpath, sizeof(fdpath), "/proc/self/fd/%d", fd);
+        // readlink() does not terminate its output; keep the last byte as the
+        // NUL written by memset above.
+        ret = readlink(fdpath, link, FDPATH_LEN - 1);
+        if (ret < 0) {
+            link[0] = '\0';
+        }
+        ret = snprintf(&buf[pos], len - pos, "+ fd:%s type:%u link:%s\n", entry->d_name, (unsigned int)entry->d_type, link);
+        if (ret < 0 || ret >= len - pos) {
+            // No room for a complete line: drop the partial one and stop.
+            buf[pos] = '\0';
+            break;
+        }
+        pos += ret;
+    }
+    closedir(dir);
+    return;
+}
+
+/*
+ * Render the proxy diagnostic report (thread count, per-thread event
+ * counters, current log level) into buf.
+ *
+ * buf - output buffer, cleared first
+ * len - capacity of buf; the report is truncated rather than written past it
+ *
+ * Returns the length of the string left in buf.
+ */
+int uds_diag_string(char *buf, int len)
+{
+    int pos = 0;
+    int ret;
+    int level = p_uds_var->loglevel;
+
+    if (buf == NULL || len <= 0) {
+        return 0;
+    }
+    memset(buf, 0, len);
+    ret = snprintf(buf, len, "+-----------------------------Unix Proxy Diagnostic information-------------------------+\n");
+    if (ret < 0 || ret >= len)
+        goto out;
+    pos = ret;
+    ret = snprintf(&buf[pos], len - pos, "+ Thread nums:%d\n", p_uds_var->work_thread_num);
+    if (ret < 0 || ret >= len - pos)
+        goto out;
+    pos += ret;
+    for (int i = 0; i < p_uds_var->work_thread_num; i++) {
+        ret = snprintf(&buf[pos], len - pos, "+ Thread %d events count:%d\n", i+1, p_uds_var->work_thread[i].info.events);
+        if (ret < 0 || ret >= len - pos)
+            goto out;
+        pos += ret;
+    }
+    // logstr has UDS_LOG_MAX + 1 entries; an out-of-range level maps to the
+    // last one instead of indexing outside the table.
+    if (level < 0 || level >= UDS_LOG_MAX) {
+        level = UDS_LOG_MAX;
+    }
+    ret = snprintf(&buf[pos], len - pos, "+ Log level:%s\n", p_uds_var->logstr[level]);
+    if (ret < 0 || ret >= len - pos)
+        goto out;
+    pos += ret;
+    snprintf(&buf[pos], len - pos, "+---------------------------------------------------------------------------------------+\n");
+out:
+    return strlen(buf);
+}
+
+// DIAG INFO
+/*
+ * Event handler for the diagnostic listener: accept one unix connection,
+ * send it the text produced by uds_diag_string() and close it again.
+ *
+ * Returns EVENT_ERR when arg is NULL or the accept fails, EVENT_OK otherwise
+ * (a failed send is only logged).
+ */
+int uds_event_diag_info(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+    struct uds_event *listener = (struct uds_event *)arg;
+    int client;
+    int diag_len;
+    int sent;
+
+    if (listener == NULL) {
+        uds_err("param is invalid.");
+        return EVENT_ERR;
+    }
+
+    client = uds_sock_step_accept(listener->fd, AF_UNIX);
+    if (client <= 0) {
+        uds_err("conn fd error:%d", client);
+        return EVENT_ERR;
+    }
+    uds_log("diag accept an new connection to send diag info, fd:%d", client);
+
+    diag_len = uds_diag_string(p_event_var->iov_base, p_event_var->iov_len);
+    sent = send(client, p_event_var->iov_base, diag_len, 0);
+    if (sent <= 0) {
+        uds_err("send diag info error, ret:%d len:%d", sent, diag_len);
+    }
+
+    close(client);
+    return EVENT_OK;
+}
+
+// Name of a log level; any value outside [0, UDS_LOG_MAX) maps to the last
+// logstr entry. Fully parenthesised so it is safe inside larger expressions.
+#define UDS_LOG_STR(level) (((level) < 0 || (level) >= UDS_LOG_MAX) ? p_uds_var->logstr[UDS_LOG_MAX] : p_uds_var->logstr[(level)])
+/*
+ * Event handler for the log-level listener: every accepted connection moves
+ * p_uds_var->loglevel one step up, wrapping to UDS_LOG_NONE after the highest
+ * level, and the peer receives a short text report of the change.
+ *
+ * Returns EVENT_ERR when arg is NULL or the accept fails, EVENT_OK otherwise
+ * (a failed send is only logged).
+ */
+int uds_event_debug_level(void *arg, int epfd, struct uds_event_global_var *p_event_var)
+{
+    int connfd;
+    int len;
+    int ret;
+    int cur;
+    struct uds_event *evt = (struct uds_event *)arg;
+    if (evt == NULL) {
+        uds_err("param is invalid.");
+        return EVENT_ERR;
+    }
+    connfd = uds_sock_step_accept(evt->fd, AF_UNIX);
+    if (connfd <= 0) {
+        uds_err("conn fd error:%d", connfd);
+        return EVENT_ERR;
+    }
+
+    cur = p_uds_var->loglevel;
+    if (cur + 1 < UDS_LOG_MAX) {
+        p_uds_var->loglevel += 1;
+    } else {
+        p_uds_var->loglevel = UDS_LOG_NONE;
+    }
+
+    uds_log("debug level accept a new connection, current level:%s change to:%s", UDS_LOG_STR(cur), UDS_LOG_STR(p_uds_var->loglevel));
+
+    // Bounded formatting: the report can never run past the scratch buffer.
+    len = snprintf(p_event_var->iov_base, p_event_var->iov_len, "+---------------UDS LOG LEVEL UPDATE--------------+\n"
+                   "+ Log level is:%s before, now change to :%s.\n"
+                   "+-------------------------------------------------+\n", UDS_LOG_STR(cur), UDS_LOG_STR(p_uds_var->loglevel));
+    if (len < 0) {
+        len = 0;
+    } else if (len >= p_event_var->iov_len) {
+        // snprintf reports the untruncated length; send only what was stored.
+        len = (p_event_var->iov_len > 0) ? p_event_var->iov_len - 1 : 0;
+    }
+
+    ret = send(connfd, p_event_var->iov_base, len, 0);
+    if (ret <= 0) {
+        uds_err("send debug level info error, ret:%d len:%d", ret, len);
+    }
+    close(connfd);
+    return EVENT_OK;
+}
diff --git a/qtfs/ipc/uds_event.h b/qtfs/ipc/uds_event.h
new file mode 100644
index 0000000..a52bf67
--- /dev/null
+++ b/qtfs/ipc/uds_event.h
@@ -0,0 +1,64 @@
+#ifndef __QTFS_UDS_EVENT_H__
+#define __QTFS_UDS_EVENT_H__
+
+#define UDS_EVENT_BUFLEN 4096
+#define UDS_PATH_MAX 1024
+
+// Return codes of event handlers.
+enum EVENT_RETCODE {
+ EVENT_OK = 0,
+ EVENT_ERR = -1,
+ EVENT_DEL = -2, // del this event after return
+};
+
+// Values of uds_tcp2tcp.msgtype.
+enum TCP2TCP_TYPE {
+ MSG_NORMAL = 0xa5a5, // message types start at a distinctive number so that messages are not misidentified
+ MSG_SCM_RIGHTS,
+ MSG_SCM_CREDENTIALS, // unix domain ancillary message, reserved
+ MSG_SCM_SECURITY, // unix domain ancillary message, reserved
+ MSG_GET_TARGET, // control message used to obtain the peer's target fd
+ MSG_SCM_PIPE, // a pipe was passed via SCM
+ MSG_END, // terminator of a tcp message
+};
+
+enum TCPCNTL_TYPE {
+ MSGCNTL_UDS = 1, // uds proxy mode
+ MSGCNTL_PIPE, // anonymous pipe proxy mode
+};
+
+// A protocol header is needed between the two TCP ends in order to
+// distinguish SCM_RIGHTS from normal messages.
+struct uds_tcp2tcp {
+ int msgtype;
+ int msglen; // len of data
+ char data[0];
+};
+
+// Payload of a MSG_SCM_RIGHTS frame.
+struct uds_msg_scmrights {
+ int flags; // open flags
+ char path[UDS_PATH_MAX];
+};
+
+// Direction values for uds_stru_scm_pipe.dir.
+enum {
+ SCM_PIPE_READ = 0,
+ SCM_PIPE_WRITE,
+ SCM_PIPE_NUM,
+};
+
+struct uds_stru_scm_pipe {
+ int dir; // 0: send read filedes; 1: send write filedes
+ // After the proxy has received the original pipe fd through scm rights,
+ // the event will have changed by the time the reply arrives, so the reply
+ // has to carry this value in order to re-establish the association.
+ int srcfd;
+};
+
+// Event handlers and per-thread hooks implemented in uds_event.c.
+int uds_event_uds_listener(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_tcp_listener(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_diag_info(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_debug_level(void *arg, int epfd, struct uds_event_global_var *p_event_var);
+int uds_event_pre_handler(struct uds_event *evt);
+int uds_event_pre_hook(struct uds_event_global_var *p_event_var);
+int uds_event_post_hook(struct uds_event_global_var *p_event_var);
+int uds_event_module_init(struct uds_event_global_var *p_event_var);
+void uds_event_module_fini(struct uds_event_global_var *p);
+
+#endif
+
diff --git a/qtfs/ipc/uds_main.c b/qtfs/ipc/uds_main.c
new file mode 100644
index 0000000..b479a60
--- /dev/null
+++ b/qtfs/ipc/uds_main.c
@@ -0,0 +1,556 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <signal.h>
+#include <sys/epoll.h>
+#include <netinet/ip.h>
+#include <netinet/in.h>
+#include <sys/un.h>
+#include <netinet/udp.h>
+#include <netinet/tcp.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <sys/ioctl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dirent.h>
+
+#include "../comm.h"
+#include "uds_main.h"
+#include "uds_event.h"
+
+// Process-wide proxy state. logstr holds the printable names of the log
+// levels, with "UNKNOWN" in the extra slot at index UDS_LOG_MAX.
+struct uds_global_var g_uds_var = {.logstr = {"NONE", "ERROR", "INFO", "UNKNOWN"}};
+// Accessor pointer used throughout the proxy (and by the logging macros).
+struct uds_global_var *p_uds_var = &g_uds_var;
+// Array of per-worker scratch state, one element per work thread; allocated
+// by the proxy entry point.
+struct uds_event_global_var *g_event_var = NULL;
+
+/*
+ * Allocate a zero-filled struct uds_event.
+ * Returns NULL (after logging) when memory is exhausted.
+ */
+struct uds_event *uds_alloc_event()
+{
+    struct uds_event *evt = calloc(1, sizeof(*evt));
+
+    if (evt != NULL) {
+        return evt;
+    }
+    uds_err("malloc failed.");
+    return NULL;
+}
+
+/*
+ * Register event->fd for EPOLLIN on the epoll instance efd, with the event
+ * itself stored as the epoll user data.
+ * Returns 0 on success, -1 when epoll_ctl() fails.
+ */
+int uds_event_insert(int efd, struct uds_event *event)
+{
+    struct epoll_event ev;
+
+    ev.events = EPOLLIN;
+    ev.data.ptr = (void *)event;
+    if (epoll_ctl(efd, EPOLL_CTL_ADD, event->fd, &ev) != -1) {
+        return 0;
+    }
+    uds_err("epoll ctl add fd:%d event failed.", event->fd);
+    return -1;
+}
+
+/*
+ * Remove event->fd from the epoll instance efd without closing it.
+ * Returns 0 on success, -1 when epoll_ctl() fails.
+ */
+int uds_event_suspend(int efd, struct uds_event *event)
+{
+    if (epoll_ctl(efd, EPOLL_CTL_DEL, event->fd, NULL) == 0) {
+        return 0;
+    }
+    uds_err("failed to suspend fd:%d.", event->fd);
+    return -1;
+}
+
+/*
+ * Remove fd from the epoll instance efd and close it. The descriptor is
+ * closed even when the removal fails.
+ * Returns the result of epoll_ctl().
+ */
+int uds_event_delete(int efd, int fd)
+{
+    int rc = epoll_ctl(efd, EPOLL_CTL_DEL, fd, NULL);
+
+    if (rc == 0) {
+        uds_log("event fd:%d deleted.", fd);
+    } else {
+        uds_err("failed to delete event fd:%d.", fd);
+    }
+    close(fd);
+    return rc;
+}
+
+/*
+ * Worker loop: wait on the epoll instance efd (1 second timeout) and
+ * dispatch every ready event to its handler.
+ *
+ * efd - epoll descriptor served by this worker
+ * arg - worker context; arg->p_event_var supplies the per-thread scratch
+ *       state and arg->info.events counts the delivered events
+ *
+ * When built with QTFS_SERVER the loop runs while engine_run is non-zero,
+ * otherwise it runs forever. The epoll result array is released on every
+ * exit path.
+ */
+void uds_main_loop(int efd, struct uds_thread_arg *arg)
+{
+    int n = 0;
+    int ret;
+    struct uds_event *udsevt;
+    struct epoll_event *evts = NULL;
+    struct uds_event_global_var *p_event_var = arg->p_event_var;
+    if (p_event_var == NULL) {
+        uds_err("event variable invalid.");
+        return;
+    }
+
+    evts = calloc(UDS_EPOLL_MAX_EVENTS, sizeof(struct epoll_event));
+    if (evts == NULL) {
+        uds_err("init calloc evts failed.");
+        return;
+    }
+    if (uds_event_module_init(p_event_var) == EVENT_ERR) {
+        uds_err("uds event module init failed, main loop not run.");
+        free(evts);
+        return;
+    }
+#ifdef QTFS_SERVER
+    extern int engine_run;
+    while (engine_run) {
+#else
+    while (1) {
+#endif
+        n = epoll_wait(efd, evts, UDS_EPOLL_MAX_EVENTS, 1000);
+        if (n == 0)
+            continue;
+        if (n < 0) {
+            uds_err("epoll wait return errcode:%d", n);
+            continue;
+        }
+        arg->info.events += n;
+        uds_event_pre_hook(p_event_var);
+        for (int i = 0; i < n; i++) {
+            udsevt = (struct uds_event *)evts[i].data.ptr;
+            uds_log("event fd:%d events:%d tofree:%d", udsevt->fd, evts[i].events, udsevt->tofree);
+            if (udsevt->handler == NULL) {
+                uds_err("bad event, fd:%d handler is NULL.", udsevt->fd);
+                continue;
+            }
+            // the handler is not run when the pre-check fails
+            if (uds_event_pre_handler(udsevt) == EVENT_ERR) {
+                continue;
+            }
+            ret = udsevt->handler(udsevt, efd, p_event_var);
+            // only the current event is released here; a peer event has to be
+            // released inside the handler
+            if (ret == EVENT_DEL) {
+                uds_del_event(udsevt);
+            }
+        }
+        uds_event_post_hook(p_event_var);
+    }
+    uds_log("main loop exit.");
+    uds_event_module_fini(p_event_var);
+    free(evts);
+    return;
+}
+
+/*
+ * Create a TCP socket and either listen on it or connect it, depending on
+ * arg->cs.
+ *
+ * UDS_SOCKET_SERVER: bind to p_uds_var->tcp.addr:port and listen.
+ * otherwise:         connect to p_uds_var->tcp.peeraddr:peerport and store
+ *                    the socket in arg->connfd as well.
+ *
+ * On success returns 0 with the socket in arg->sockfd. On failure returns -1;
+ * the socket is closed and arg->sockfd is reset to -1 so that callers never
+ * see a stale descriptor number.
+ */
+int uds_build_tcp_connection(struct uds_conn_arg *arg)
+{
+    const int sock_max_conn_num = 1024;
+
+    if (arg->cs > UDS_SOCKET_SERVER) {
+        uds_err("cs type %d is error.", arg->cs);
+        return -1;
+    }
+    struct sockaddr_in sock_addr = {
+        .sin_family = AF_INET,
+    };
+    int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+    if (sock_fd < 0) {
+        uds_err("As %s failed, socket fd: %d, err:%s.",
+                (arg->cs == UDS_SOCKET_CLIENT) ? "client" : "server",
+                sock_fd, strerror(errno));
+        return -1;
+    }
+    arg->sockfd = sock_fd;
+
+    if (arg->cs == UDS_SOCKET_SERVER) {
+        sock_addr.sin_port = htons(p_uds_var->tcp.port);
+        sock_addr.sin_addr.s_addr = inet_addr(p_uds_var->tcp.addr);
+        if (bind(sock_fd, (struct sockaddr *)&sock_addr, sizeof(sock_addr)) < 0) {
+            uds_err("As server failed, bind error, err:%s.",
+                    strerror(errno));
+            goto close_and_return;
+        }
+        if (listen(sock_fd, sock_max_conn_num) < 0) {
+            uds_err("As server listen failed, err:%s.", strerror(errno));
+            goto close_and_return;
+        }
+    } else {
+        sock_addr.sin_port = htons(p_uds_var->tcp.peerport);
+        sock_addr.sin_addr.s_addr = inet_addr(p_uds_var->tcp.peeraddr);
+        if (connect(arg->sockfd, (struct sockaddr *)&sock_addr, sizeof(struct sockaddr_in)) < 0) {
+            // Report the reason; this failure used to be silent.
+            uds_err("As client connect to ip:%s port:%u failed, err:%s.",
+                    p_uds_var->tcp.peeraddr, p_uds_var->tcp.peerport, strerror(errno));
+            goto close_and_return;
+        }
+        arg->connfd = sock_fd;
+        uds_log("Connect to server successed, ip:%s port:%u", p_uds_var->tcp.peeraddr, p_uds_var->tcp.peerport);
+    }
+
+    return 0;
+close_and_return:
+    close(sock_fd);
+    arg->sockfd = -1;
+    return -1;
+}
+
+/*
+ * Create a unix domain socket of type arg->udstype for the path
+ * arg->sun_path and either listen on it or connect it, depending on arg->cs.
+ *
+ * UDS_SOCKET_SERVER: unlink any existing path, bind and listen.
+ * otherwise:         connect and store the socket in arg->connfd as well.
+ *
+ * On success returns 0 with the socket in arg->sockfd. On failure returns -1;
+ * the socket is closed and arg->sockfd is reset to -1.
+ */
+int uds_build_unix_connection(struct uds_conn_arg *arg)
+{
+    const int sock_max_conn_num = 5;
+    if (arg->cs > UDS_SOCKET_SERVER) {
+        uds_err("cs type %d is error.", arg->cs);
+        return -1;
+    }
+    struct sockaddr_un sock_addr = {
+        .sun_family = AF_UNIX,
+    };
+    int sock_fd = socket(AF_UNIX, arg->udstype, 0);
+
+    if (sock_fd < 0) {
+        uds_err("As %s failed, socket fd: %d, err:%s.",
+                (arg->cs == UDS_SOCKET_CLIENT) ? "client" : "server",
+                sock_fd, strerror(errno));
+        return -1;
+    }
+    // sock_addr starts zeroed; copying one byte less than the field keeps the
+    // path NUL terminated for unlink() and for logging.
+    strncpy(sock_addr.sun_path, arg->sun_path, sizeof(sock_addr.sun_path) - 1);
+    arg->sockfd = sock_fd;
+
+    if (arg->cs == UDS_SOCKET_SERVER) {
+        unlink(sock_addr.sun_path);
+        if (bind(sock_fd, (struct sockaddr *)&sock_addr, sizeof(sock_addr)) < 0) {
+            uds_err("As server failed, bind error, err:%s.",
+                    strerror(errno));
+            goto close_and_return;
+        }
+        if (listen(sock_fd, sock_max_conn_num) < 0) {
+            uds_err("As server listen failed, err:%s.", strerror(errno));
+            goto close_and_return;
+        }
+    } else {
+        if (connect(arg->sockfd, (struct sockaddr *)&sock_addr, sizeof(struct sockaddr_un)) < 0) {
+            goto close_and_return;
+        }
+        arg->connfd = sock_fd;
+        // Log the terminated copy; arg->sun_path may lack a terminator.
+        uds_log("Connect to server successed, sun path:%s", sock_addr.sun_path);
+    }
+
+    return 0;
+close_and_return:
+    uds_log("close sockfd:%d and return", sock_fd);
+    close(sock_fd);
+    arg->sockfd = -1;
+    return -1;
+
+}
+
+/*
+ * Accept one connection on the listening socket sock_fd.
+ *
+ * family - AF_INET selects an IPv4 peer address, anything else a unix one
+ *
+ * Returns the connected descriptor, or the negative accept() result.
+ */
+int uds_sock_step_accept(int sock_fd, int family)
+{
+    struct sockaddr_in in_addr;
+    struct sockaddr_un un_addr;
+    socklen_t len = (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_un);
+    int connfd;
+
+    // accept() may fill in less than the whole structure (an unnamed unix
+    // client has no path at all), so start from zeroed storage to keep the
+    // log below from reading indeterminate bytes.
+    memset(&in_addr, 0, sizeof(in_addr));
+    memset(&un_addr, 0, sizeof(un_addr));
+    if (family == AF_INET) {
+        connfd = accept(sock_fd, (struct sockaddr *)&in_addr, &len);
+    } else {
+        connfd = accept(sock_fd, (struct sockaddr *)&un_addr, &len);
+    }
+    if (connfd < 0) {
+        uds_err("Accept error:%d, err:%s.", connfd, strerror(errno));
+        return connfd;
+    }
+    if (family == AF_INET) {
+        uds_log("Accept success, ip:%s, port:%u",
+                inet_ntoa(in_addr.sin_addr),
+                ntohs(in_addr.sin_port));
+    } else {
+        // A path that fills sun_path completely carries no terminator.
+        uds_log("Accept success, sun path:%.*s", (int)sizeof(un_addr.sun_path), un_addr.sun_path);
+    }
+    return connfd;
+}
+
+/*
+ * Create an event for fd and register it with the worker epoll instance
+ * selected by fd % work_thread_num.
+ *
+ * fd      - descriptor that triggers the event
+ * peer    - peer event (may be NULL)
+ * handler - callback run when fd becomes readable
+ * priv    - opaque private data stored in the event
+ *
+ * Returns the new event, or NULL on failure. Nothing is left allocated on
+ * failure and fd itself is never closed here.
+ */
+struct uds_event *uds_add_event(int fd, struct uds_event *peer, int (*handler)(void *, int, struct uds_event_global_var *), void *priv)
+{
+    struct uds_event *newevt;
+    int hash;
+
+    // A negative fd would produce a negative index into efd[].
+    if (fd < 0 || p_uds_var->work_thread_num <= 0) {
+        uds_err("invalid fd:%d or work thread num:%d", fd, p_uds_var->work_thread_num);
+        return NULL;
+    }
+    hash = fd % p_uds_var->work_thread_num;
+    if (p_uds_var->efd[hash] <= 0) {
+        uds_err("alloc event failed, efd:%d hash:%d", p_uds_var->efd[hash], hash);
+        return NULL;
+    }
+    newevt = uds_alloc_event();
+    if (newevt == NULL) {
+        uds_err("alloc event failed, efd:%d hash:%d", p_uds_var->efd[hash], hash);
+        return NULL;
+    }
+
+    newevt->fd = fd;
+    newevt->peer = peer; // where a tcp response message is forwarded back to on the uds side
+    newevt->handler = handler;
+    newevt->priv = priv;
+    newevt->tofree = 0;
+    // An event that could not be registered would never fire; do not hand it
+    // out and do not leak it.
+    if (uds_event_insert(p_uds_var->efd[hash], newevt) != 0) {
+        free(newevt);
+        return NULL;
+    }
+    return newevt;
+}
+
+/*
+ * Create a pipe event for fd and register it with the worker epoll instance
+ * selected by fd % work_thread_num. Pipe events are marked with pipe = 1 and
+ * carry a plain descriptor (peerfd) instead of a peer event.
+ *
+ * fd      - descriptor that triggers the event
+ * peerfd  - descriptor the data is forwarded to
+ * handler - callback run when fd becomes readable
+ * priv    - opaque private data stored in the event
+ *
+ * Returns the new event, or NULL on failure. Nothing is left allocated on
+ * failure and neither descriptor is closed here.
+ */
+struct uds_event *uds_add_pipe_event(int fd, int peerfd, int (*handler)(void *, int, struct uds_event_global_var *), void *priv)
+{
+    struct uds_event *newevt;
+    int hash;
+
+    // A negative fd would produce a negative index into efd[].
+    if (fd < 0 || p_uds_var->work_thread_num <= 0) {
+        uds_err("invalid fd:%d or work thread num:%d", fd, p_uds_var->work_thread_num);
+        return NULL;
+    }
+    hash = fd % p_uds_var->work_thread_num;
+    if (p_uds_var->efd[hash] <= 0) {
+        uds_err("alloc event failed, efd:%d", p_uds_var->efd[hash]);
+        return NULL;
+    }
+    newevt = uds_alloc_event();
+    if (newevt == NULL) {
+        uds_err("alloc event failed, efd:%d", p_uds_var->efd[hash]);
+        return NULL;
+    }
+
+    newevt->fd = fd;
+    newevt->peerfd = peerfd; // descriptor the forwarded data is written to
+    newevt->handler = handler;
+    newevt->priv = priv;
+    newevt->tofree = 0;
+    newevt->pipe = 1;
+    // An event that could not be registered would never fire; do not hand it
+    // out and do not leak it.
+    if (uds_event_insert(p_uds_var->efd[hash], newevt) != 0) {
+        free(newevt);
+        return NULL;
+    }
+    return newevt;
+}
+
+/*
+ * Tear down an event: close the peer descriptor of a pipe event, remove
+ * evt->fd from its worker epoll instance (which also closes it) and free the
+ * event structure.
+ */
+void uds_del_event(struct uds_event *evt)
+{
+    int owner_efd = p_uds_var->efd[evt->fd % p_uds_var->work_thread_num];
+
+    // A pipe is one-way and its peerfd has no epoll event of its own, so it
+    // is simply closed here.
+    if (evt->pipe == 1 && evt->peerfd != -1) {
+        close(evt->peerfd);
+        evt->peerfd = -1;
+    }
+
+    uds_event_delete(owner_efd, evt->fd);
+    free(evt);
+}
+
+/*
+ * Reset the diagnostic counters of one worker thread. The structure lives in
+ * malloc'd storage, so every field is given a defined value.
+ */
+void uds_thread_diag_init(struct uds_thread_info *info)
+{
+    info->events = 0;
+    info->fdnum = 0;
+    info->status = 0;
+}
+
+/*
+ * pthread entry point of a worker: arg is the worker's struct
+ * uds_thread_arg. Resets its diagnostics and runs the event loop on its
+ * epoll descriptor. Always returns NULL.
+ */
+void *uds_proxy_thread(void *arg)
+{
+    struct uds_thread_arg *worker = arg;
+
+    uds_thread_diag_init(&worker->info);
+    uds_main_loop(worker->efd, worker);
+    return NULL;
+}
+
+/*
+ * Create a listening SOCK_STREAM unix socket at addr and register handler
+ * for it.
+ *
+ * Returns the listener event, or NULL on failure; the socket is closed again
+ * when the event cannot be registered.
+ */
+struct uds_event *uds_init_unix_listener(const char *addr, int (*handler)(void *, int, struct uds_event_global_var *))
+{
+    struct uds_event *udsevt;
+    struct uds_conn_arg arg;
+    struct uds_conn_arg *parg = &arg;
+
+    // Start from a zeroed request so no field is passed on uninitialised and
+    // the copied path is always NUL terminated.
+    memset(parg, 0, sizeof(*parg));
+    parg->cs = UDS_SOCKET_SERVER;
+    strncpy(parg->sun_path, addr, sizeof(parg->sun_path) - 1);
+    parg->udstype = SOCK_STREAM;
+    if (uds_build_unix_connection(parg) != 0)
+        return NULL;
+    udsevt = uds_add_event(parg->sockfd, NULL, handler, NULL);
+    if (udsevt == NULL) {
+        uds_err("add unix listener event failed.");
+        close(parg->sockfd);
+        return NULL;
+    }
+    return udsevt;
+}
+
+/*
+ * Create the listening TCP socket and register uds_event_tcp_listener for
+ * it.
+ *
+ * Returns the listener event, or NULL on failure; the socket is closed again
+ * when the event cannot be registered.
+ */
+struct uds_event *uds_init_tcp_listener()
+{
+    struct uds_event *tcpevt;
+    struct uds_conn_arg arg;
+    struct uds_conn_arg *parg = &arg;
+
+    // Start from a zeroed request so no field is passed on uninitialised.
+    memset(parg, 0, sizeof(*parg));
+    parg->cs = UDS_SOCKET_SERVER;
+    if (uds_build_tcp_connection(parg) != 0)
+        return NULL;
+
+    tcpevt = uds_add_event(parg->sockfd, NULL, uds_event_tcp_listener, NULL);
+    if (tcpevt == NULL) {
+        close(parg->sockfd);
+        return NULL;
+    }
+    return tcpevt;
+}
+
+/*
+ * Bring the proxy up and run it: create one epoll instance per worker,
+ * register the four listeners (remote-uds, tcp, diag, log level), start the
+ * worker threads and wait for them to finish.
+ *
+ * Everything created here is torn down again before returning, in reverse
+ * order of creation, whichever step failed.
+ */
+void uds_thread_create()
+{
+    struct uds_event *udsevt;
+    struct uds_event *tcpevt;
+    struct uds_event *diagevt;
+    struct uds_event *logevt;
+    pthread_t *thrd;
+    int efdnum = 0;
+    int started = 0;
+
+    for (int i = 0; i < p_uds_var->work_thread_num; i++) {
+        int efd = epoll_create1(0);
+        if (efd == -1) {
+            uds_err("epoll create1 failed, i:%d.", i);
+            goto close_efds;
+        }
+        p_uds_var->efd[i] = efd;
+        efdnum++;
+    }
+
+    if ((udsevt = uds_init_unix_listener(UDS_BUILD_CONN_ADDR, uds_event_uds_listener)) == NULL)
+        goto close_efds;
+
+    if ((tcpevt = uds_init_tcp_listener()) == NULL)
+        goto del_uds;
+
+    if ((diagevt = uds_init_unix_listener(UDS_DIAG_ADDR, uds_event_diag_info)) == NULL)
+        goto del_tcp;
+
+    if ((logevt = uds_init_unix_listener(UDS_LOGLEVEL_UPD, uds_event_debug_level)) == NULL)
+        goto del_diag;
+
+    thrd = (pthread_t *)malloc(sizeof(pthread_t) * p_uds_var->work_thread_num);
+    if (thrd == NULL) {
+        uds_err("thread info malloc failed.");
+        goto del_log;
+    }
+
+    for (int i = 0; i < p_uds_var->work_thread_num; i++) {
+        int err;
+        p_uds_var->work_thread[i].p_event_var = &g_event_var[i];
+        p_uds_var->work_thread[i].efd = p_uds_var->efd[i];
+        // Only handles that pthread_create() really filled in may be joined,
+        // so they are packed at the front of thrd[].
+        err = pthread_create(&thrd[started], NULL, uds_proxy_thread, &p_uds_var->work_thread[i]);
+        if (err != 0) {
+            uds_err("create work thread:%d failed, err:%s.", i, strerror(err));
+            continue;
+        }
+        started++;
+    }
+    p_uds_var->loglevel = UDS_LOG_NONE;
+    for (int i = 0; i < started; i++)
+        pthread_join(thrd[i], NULL);
+    free(thrd);
+
+del_log:
+    uds_del_event(logevt);
+del_diag:
+    uds_del_event(diagevt);
+del_tcp:
+    uds_del_event(tcpevt);
+del_uds:
+    uds_del_event(udsevt);
+close_efds:
+    // Close only the epoll descriptors that were actually created.
+    for (int i = 0; i < efdnum; i++)
+        close(p_uds_var->efd[i]);
+
+    return;
+}
+
+/*
+ * Tell the qtfs kernel module the pid of this proxy process through the
+ * QTFS_IOCTL_UDS_PROXY_PID ioctl. The client device is preferred; the server
+ * device is used only when the client device does not exist.
+ *
+ * Returns EVENT_OK on success, EVENT_ERR when no device exists, the chosen
+ * device cannot be opened, or the ioctl fails.
+ */
+int uds_set_pid()
+{
+    const char *dev;
+    int fd;
+    int pid;
+    int ret;
+
+    if (access(QTFS_CLIENT_DEV, 0) == 0) {
+        dev = QTFS_CLIENT_DEV;
+    } else if (access(QTFS_SERVER_DEV, 0) == 0) {
+        dev = QTFS_SERVER_DEV;
+    } else {
+        uds_err("qtfs dev(<%s> or <%s>) both not exist", QTFS_CLIENT_DEV, QTFS_SERVER_DEV);
+        return EVENT_ERR;
+    }
+
+    fd = open(dev, O_RDONLY | O_NONBLOCK);
+    if (fd < 0) {
+        // Name the device that was actually tried.
+        uds_err("open %s failed, ret:%d", dev, fd);
+        return EVENT_ERR;
+    }
+
+    pid = getpid();
+    ret = ioctl(fd, QTFS_IOCTL_UDS_PROXY_PID, &pid);
+    if (ret < 0) {
+        uds_err("ioctl failed to set pid:%d ret:%d", pid, ret);
+        close(fd);
+        return EVENT_ERR;
+    }
+    uds_log("set proxy pid:%d to qtfs successed.", pid);
+    close(fd);
+    return EVENT_OK;
+}
+
+/*
+ * Make sure UDS_BUILD_CONN_ADDR exists: create its directory when it cannot
+ * be opened, then create the path as an empty file.
+ *
+ * Returns EVENT_OK when the path already exists or was created, EVENT_ERR
+ * when it cannot be created. A failed mkdir is only logged.
+ */
+int uds_env_prepare()
+{
+    DIR *conn_dir;
+    int created;
+
+    if (access(UDS_BUILD_CONN_ADDR, 0) == 0) {
+        return EVENT_OK;
+    }
+
+    conn_dir = opendir(UDS_BUILD_CONN_DIR);
+    if (conn_dir != NULL) {
+        closedir(conn_dir);
+    } else if (mkdir(UDS_BUILD_CONN_DIR, 0755) < 0) {
+        uds_err("mkdir %s failed.", UDS_BUILD_CONN_DIR);
+    }
+
+    created = open(UDS_BUILD_CONN_ADDR, O_RDONLY|O_CREAT, 0700);
+    if (created < 0) {
+        uds_err("create file:%s failed.", UDS_BUILD_CONN_ADDR);
+        return EVENT_ERR;
+    }
+    uds_log("success to create %s.", UDS_BUILD_CONN_ADDR);
+    close(created);
+    return EVENT_OK;
+}
+
+/*
+ * SIGPIPE handler: the signal is only logged so that a write to a closed
+ * peer does not terminate the proxy.
+ * NOTE(review): uds_log() uses printf()/localtime(), which are not
+ * async-signal-safe; consider SIG_IGN instead.
+ */
+static void uds_sig_pipe(int signum)
+{
+    (void)signum;
+    uds_log("uds proxy recv sigpipe and ignore");
+}
+
+/*
+ * Print the command line usage. The program takes five arguments after its
+ * name; the first one is the number of work threads.
+ */
+void uds_helpinfo(char *argv[])
+{
+    uds_err("Usage:");
+    uds_err(" %s <threadnum> <addr> <port> <peeraddr> <peerport>.", argv[0]);
+    uds_err("Param:");
+    uds_err(" <threadnum> - work thread number");
+    uds_err(" <addr> - server ip address");
+    uds_err(" <port> - port number");
+    uds_err(" <peeraddr> - peer address");
+    uds_err(" <peerport> - peer port");
+    return;
+}
+
+/*
+ * Main program of the cross-host uds cooperation. It is designed to be
+ * mirrored, with 2 threads on each end: a send thread and a recv thread.
+ * On the server side the threads are started by the existing engine; on the
+ * client side a new engine process is started.
+ *
+ * Arguments: <threadnum> <addr> <port> <peeraddr> <peerport>.
+ * Returns 0 after the proxy has run, -1 on any setup failure. Memory
+ * allocated here is released again before returning.
+ */
+#ifdef QTFS_SERVER
+int uds_proxy_main(int argc, char *argv[])
+#else
+int main(int argc, char *argv[])
+#endif
+{
+    int port;
+    int peerport;
+    int ret = -1;
+
+    p_uds_var->loglevel = UDS_LOG_INFO;
+#define ARG_NUM 6
+    if (argc != ARG_NUM) {
+        uds_helpinfo(argv);
+        return -1;
+    }
+    if (uds_set_pid() != EVENT_OK) {
+        uds_err("proxy failed to set pid.");
+        return -1;
+    }
+    if (uds_env_prepare() != EVENT_OK) {
+        uds_err("proxy prepare environment failed.");
+        return -1;
+    }
+    signal(SIGPIPE, uds_sig_pipe);
+    p_uds_var->work_thread_num = atoi(argv[1]);
+    if (p_uds_var->work_thread_num <= 0 || p_uds_var->work_thread_num > UDS_WORK_THREAD_MAX) {
+        uds_err("work thread num:%d is too large.(must small or equal than %d)", p_uds_var->work_thread_num, UDS_WORK_THREAD_MAX);
+        return -1;
+    }
+    // Zeroed storage: uds_add_event() treats an efd entry <= 0 as invalid.
+    p_uds_var->efd = (int *)calloc(p_uds_var->work_thread_num, sizeof(int));
+    if (p_uds_var->efd == NULL) {
+        uds_err("efd malloc failed, num:%d", p_uds_var->work_thread_num);
+        goto out;
+    }
+
+    p_uds_var->work_thread = (struct uds_thread_arg *)calloc(p_uds_var->work_thread_num, sizeof(struct uds_thread_arg));
+    if (p_uds_var->work_thread == NULL) {
+        uds_err("work thread var malloc failed.");
+        goto out;
+    }
+    // The ports are stored as unsigned short; reject values that would be
+    // silently truncated (atoi() yields 0 for non-numeric input).
+    port = atoi(argv[3]);
+    peerport = atoi(argv[5]);
+    if (port <= 0 || port > 65535 || peerport <= 0 || peerport > 65535) {
+        uds_err("port:%s or peer port:%s is invalid.", argv[3], argv[5]);
+        goto out;
+    }
+    p_uds_var->tcp.port = port;
+    p_uds_var->tcp.peerport = peerport;
+    // snprintf always terminates, unlike strncpy with the full field size.
+    snprintf(p_uds_var->tcp.addr, sizeof(p_uds_var->tcp.addr), "%s", argv[2]);
+    snprintf(p_uds_var->tcp.peeraddr, sizeof(p_uds_var->tcp.peeraddr), "%s", argv[4]);
+
+    uds_log("uds proxy param thread num:%d ip:%s port:%u peerip:%s port:%u",
+            p_uds_var->work_thread_num, p_uds_var->tcp.addr, p_uds_var->tcp.port,
+            p_uds_var->tcp.peeraddr, p_uds_var->tcp.peerport);
+    g_event_var = (struct uds_event_global_var *)calloc(p_uds_var->work_thread_num, sizeof(struct uds_event_global_var));
+    if (g_event_var == NULL) {
+        uds_err("event variable malloc failed");
+        goto out;
+    }
+    // Returns only after every worker thread it started has been joined.
+    uds_thread_create();
+    ret = 0;
+
+out:
+    free(g_event_var);
+    g_event_var = NULL;
+    free(p_uds_var->work_thread);
+    p_uds_var->work_thread = NULL;
+    free(p_uds_var->efd);
+    p_uds_var->efd = NULL;
+    return ret;
+}
diff --git a/qtfs/ipc/uds_main.h b/qtfs/ipc/uds_main.h
new file mode 100644
index 0000000..793cd2f
--- /dev/null
+++ b/qtfs/ipc/uds_main.h
@@ -0,0 +1,141 @@
+#ifndef __QTFS_UDS_MAIN_H__
+#define __QTFS_UDS_MAIN_H__
+
+#include <time.h>
+
+#include "uds_module.h"
+
+#define UDS_EPOLL_MAX_EVENTS 64
+#define UDS_WORK_THREAD_MAX 64
+
+extern struct uds_global_var *p_uds_var;
+
+/* Log levels; UDS_LOG_MAX is the number of real levels. */
+enum {
+ UDS_LOG_NONE,
+ UDS_LOG_ERROR,
+ UDS_LOG_INFO,
+ UDS_LOG_MAX,
+};
+
+/*
+ * Logging macros: print a timestamped line with function name and line
+ * number to stdout when p_uds_var->loglevel is high enough.
+ * NOTE(review): each macro expands to a bare `if { }` block, not
+ * `do { } while (0)`, so it cannot be used as the body of an if/else without
+ * braces. The block also declares locals named `t` and `p`, which shadow
+ * caller variables of the same name used as arguments.
+ * NOTE(review): localtime() is presumably shared between worker threads
+ * here - confirm whether localtime_r() is needed.
+ */
+#define uds_log(info, ...) \
+ if (p_uds_var->loglevel >= UDS_LOG_INFO) {\
+ time_t t; \
+ struct tm *p; \
+ time(&t); \
+ p = localtime(&t); \
+ printf("[%d/%02d/%02d %02d:%02d:%02d][LOG:%s:%3d]"info"\n", \
+ p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \
+ p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \
+ }
+
+/* Same expansion as uds_log. */
+#define uds_log2(info, ...) \
+ if (p_uds_var->loglevel >= UDS_LOG_INFO) {\
+ time_t t; \
+ struct tm *p; \
+ time(&t); \
+ p = localtime(&t); \
+ printf("[%d/%02d/%02d %02d:%02d:%02d][LOG:%s:%3d]"info"\n", \
+ p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \
+ p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \
+ }
+
+/* Error variant: enabled from UDS_LOG_ERROR upwards, tagged ERROR. */
+#define uds_err(info, ...) \
+ if (p_uds_var->loglevel >= UDS_LOG_ERROR) {\
+ time_t t; \
+ struct tm *p; \
+ time(&t); \
+ p = localtime(&t); \
+ printf("[%d/%02d/%02d %02d:%02d:%02d][ERROR:%s:%3d]"info"\n", \
+ p->tm_year + 1900, p->tm_mon+1, p->tm_mday, \
+ p->tm_hour, p->tm_min, p->tm_sec, __func__, __LINE__, ##__VA_ARGS__); \
+ }
+
+enum {
+ UDS_THREAD_EPWAIT = 1, // epoll wait status
+};
+/* Per-worker diagnostic counters. */
+struct uds_thread_info {
+ int fdnum;
+
+ int events;
+ int status;
+};
+
+/* Per-worker scratch state handed to every event handler. */
+struct uds_event_global_var {
+ int cur;
+ struct uds_event *tofree[UDS_EPOLL_MAX_EVENTS];
+ char *msg_control;
+ int msg_controllen;
+ char *msg_control_send;
+ int msg_controlsendlen;
+ char *iov_base;
+ int iov_len;
+ char *iov_base_send;
+ int iov_sendlen;
+ char *buf;
+ int buflen;
+};
+
+/* One registered epoll event. */
+struct uds_event {
+ int fd; /* this event is triggered by this fd */
+ unsigned int tofree : 1, /* 1--in to free list; 0--not */
+ pipe : 1, // this is a pipe event
+ reserved : 30;
+ union {
+ struct uds_event *peer; /* peer event */
+ int peerfd; // in the scm pipe scenario the channel is one-way, so a single fd is enough
+ };
+ int (*handler)(void *, int, struct uds_event_global_var *); /* event handler function */
+ void *priv; // private data
+ char cpath[UDS_SUN_PATH_LEN];
+ char spath[UDS_SUN_PATH_LEN];
+};
+
+
+/* Argument block of one worker thread. */
+struct uds_thread_arg {
+ int efd;
+ struct uds_event_global_var *p_event_var;
+ struct uds_thread_info info;
+};
+
+/* Process-wide proxy configuration and state. */
+struct uds_global_var {
+ int work_thread_num;
+ int *efd;
+ struct uds_thread_arg *work_thread;
+ int loglevel;
+ char *logstr[UDS_LOG_MAX + 1];
+ struct _tcp {
+ char addr[20];
+ unsigned short port;
+ char peeraddr[20];
+ unsigned short peerport;
+ } tcp;
+ struct _uds {
+ char sun_path[UDS_SUN_PATH_LEN];
+ } uds;
+};
+enum uds_cs {
+ UDS_SOCKET_CLIENT = 1,
+ UDS_SOCKET_SERVER,
+};
+
+/* Request/result block of the uds_build_*_connection() helpers. */
+struct uds_conn_arg {
+ int cs; // client(1) or server(2)
+
+ int udstype; // DGRAM or STREAM
+ char sun_path[UDS_SUN_PATH_LEN];
+ int sockfd;
+ int connfd;
+};
+
+struct uds_event *uds_add_event(int fd, struct uds_event *peer, int (*handler)(void *, int, struct uds_event_global_var *), void *priv);
+struct uds_event *uds_add_pipe_event(int fd, int peerfd, int (*handler)(void *, int, struct uds_event_global_var *), void *priv);
+int uds_sock_step_accept(int sockFd, int family);
+int uds_build_tcp_connection(struct uds_conn_arg *arg);
+int uds_build_unix_connection(struct uds_conn_arg *arg);
+void uds_del_event(struct uds_event *evt);
+int uds_event_suspend(int efd, struct uds_event *event);
+int uds_event_insert(int efd, struct uds_event *event);
+#ifdef QTFS_SERVER
+int uds_proxy_main(int argc, char *argv[]);
+#endif
+#endif
diff --git a/qtfs/ipc/uds_module.h b/qtfs/ipc/uds_module.h
new file mode 100644
index 0000000..9ccbb9d
--- /dev/null
+++ b/qtfs/ipc/uds_module.h
@@ -0,0 +1,19 @@
+#ifndef __QTFS_UDS_MODULE_H__
+#define __QTFS_UDS_MODULE_H__
+
+/* Unix socket paths used by the proxy, all under UDS_BUILD_CONN_DIR. */
+#define UDS_BUILD_CONN_ADDR "/var/run/qtfs/remote_uds.sock"
+#define UDS_DIAG_ADDR "/var/run/qtfs/uds_proxy_diag.sock"
+#define UDS_LOGLEVEL_UPD "/var/run/qtfs/uds_loglevel.sock"
+#define UDS_BUILD_CONN_DIR "/var/run/qtfs/"
+
+#define UDS_SUN_PATH_LEN 108 // from glibc
+/* Remote connection request; NOTE(review): presumably sent over
+ * UDS_BUILD_CONN_ADDR - confirm against the sender. */
+struct uds_proxy_remote_conn_req {
+ unsigned short type;
+ unsigned short resv;
+ char sun_path[UDS_SUN_PATH_LEN];
+};
+/* Reply to a uds_proxy_remote_conn_req. */
+struct uds_proxy_remote_conn_rsp {
+ int ret;
+};
+
+#endif
diff --git a/qtfs/misc.c b/qtfs/misc.c
index 98222bd..44da4e1 100644
--- a/qtfs/misc.c
+++ b/qtfs/misc.c
@@ -156,6 +156,13 @@ long qtfs_misc_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
}
break;
}
+ case QTFS_IOCTL_UDS_PROXY_PID:
+ if (copy_from_user(&qtfs_uds_proxy_pid, (void *)arg, sizeof(int))) {
+ qtfs_err("ioctl get uds proxy pid failed.");
+ break;
+ }
+ qtfs_info("ioctl get uds proxy process pid is %d", qtfs_uds_proxy_pid);
+ break;
}
return ret;
}
diff --git a/qtfs/qtfs/sb.c b/qtfs/qtfs/sb.c
index 7445fad..104d137 100644
--- a/qtfs/qtfs/sb.c
+++ b/qtfs/qtfs/sb.c
@@ -288,7 +288,7 @@ ssize_t qtfs_readiter(struct kiocb *kio, struct iov_iter *iov)
req->fd = private->fd;
if (req->fd <= 0) {
- qtfs_err("qtfs_readiter: invalid file(0x%llx)", req->fd);
+ qtfs_err("qtfs_readiter: invalid file(%d)", req->fd);
qtfs_conn_put_param(pvar);
return -EINVAL;
}
@@ -360,7 +360,7 @@ ssize_t qtfs_writeiter(struct kiocb *kio, struct iov_iter *iov)
req->d.fd = private->fd;
if (req->d.fd < 0) {
- qtfs_err("qtfs_write: invalid file(0x%llx)", req->d.fd);
+ qtfs_err("qtfs_write: invalid file(%d)", req->d.fd);
qtfs_conn_put_param(pvar);
return -EINVAL;
}
@@ -1172,7 +1172,7 @@ int qtfs_getattr(const struct path *path, struct kstat *stat, u32 req_mask, unsi
*stat = rsp->stat;
qtfs_debug("qtfs getattr success:<%s> blksiz:%u size:%lld mode:%o ino:%llu pathino:%lu. %s\n", req->path, rsp->stat.blksize,
rsp->stat.size, rsp->stat.mode, rsp->stat.ino, inode->i_ino, rsp->stat.ino != inode->i_ino ? "delete current inode" : "");
- if (inode->i_ino != rsp->stat.ino || rsp->stat.mode != inode->i_mode) {
+ if (inode->i_ino != rsp->stat.ino || inode->i_mode != rsp->stat.mode) {
if (inode->i_nlink > 0){
drop_nlink(inode);
}
diff --git a/qtfs/qtfs_server/Makefile b/qtfs/qtfs_server/Makefile
index 9c6bcd5..2ff826f 100644
--- a/qtfs/qtfs_server/Makefile
+++ b/qtfs/qtfs_server/Makefile
@@ -4,15 +4,26 @@ KBUILD=/lib/modules/$(shell uname -r)/build/
obj-m:=qtfs_server.o
qtfs_server-objs:=../conn.o fsops.o qtfs-server.o ../misc.o
+DEPGLIB=-lglib-2.0 -I../ -I/usr/include/glib-2.0 -I/usr/lib64/glib-2.0/include
+
all: qtfs_server engine
qtfs_server:
make -C $(KBUILD) M=$(PWD) modules
-engine:
- gcc -O2 -o engine user_engine.c -lpthread -lglib-2.0 -I../ -I/usr/include/glib-2.0 -I/usr/lib64/glib-2.0/include -DQTFS_SERVER
+engine: uds_event.o uds_main.o user_engine.o
+ gcc -O2 -o engine $^ -lpthread $(DEPGLIB) -I../ -I../ipc/ -DQTFS_SERVER
+
+user_engine.o:
+ cc -g -c -o user_engine.o user_engine.c $(DEPGLIB) -I../ -DQTFS_SERVER
+
+uds_event.o:
+ cc -g -c -o uds_event.o ../ipc/uds_event.c -DQTFS_SERVER
+
+uds_main.o:
+ cc -g -c -o uds_main.o ../ipc/uds_main.c -DQTFS_SERVER
clean:
make -C $(KBUILD) M=$(PWD) clean
rm -rf engine
- rm -rf ../*.o ../.*.o.cmd
+ rm -rf ../*.o
diff --git a/qtfs/qtfs_server/fsops.c b/qtfs/qtfs_server/fsops.c
index 61e8895..6c3e201 100644
--- a/qtfs/qtfs_server/fsops.c
+++ b/qtfs/qtfs_server/fsops.c
@@ -25,10 +25,11 @@
bool in_white_list(char *path, int type)
{
+ int i, in_wl = -1;
+
if (!whitelist[type]) {
return true;
}
- int i, in_wl = -1;
for (i = 0; i < whitelist[type]->len; i++) {
if (!strncmp(path, whitelist[type]->wl[i].path, whitelist[type]->wl[i].len)){
in_wl = i;
@@ -202,7 +203,7 @@ static int handle_statfs(struct qtserver_arg *arg)
static int handle_mount(struct qtserver_arg *arg)
{
struct path path;
- int ret, i, in_wl = -1;
+ int ret;
struct qtreq_mount *req = (struct qtreq_mount *)REQ(arg);
struct qtrsp_mount *rsp = (struct qtrsp_mount *)RSP(arg);
if (!in_white_list(req->path, QTFS_WHITELIST_MOUNT)) {
diff --git a/qtfs/qtfs_server/qtfs-server.c b/qtfs/qtfs_server/qtfs-server.c
index b0b8ab0..cbe07f0 100644
--- a/qtfs/qtfs_server/qtfs-server.c
+++ b/qtfs/qtfs_server/qtfs-server.c
@@ -214,11 +214,6 @@ long qtfs_server_misc_ioctl(struct file *file, unsigned int cmd, unsigned long a
qtfs_server_thread_run = arg;
break;
- case QTFS_IOCTL_ALLINFO:
- case QTFS_IOCTL_CLEARALL:
- case QTFS_IOCTL_LOGLEVEL:
- ret = qtfs_misc_ioctl(file, cmd, arg);
- break;
case QTFS_IOCTL_WHITELIST:
if (copy_from_user(&len, (void __user *)arg, sizeof(int))) {
qtfs_err("qtfs ioctl white init copy from user failed.");
@@ -239,6 +234,12 @@ long qtfs_server_misc_ioctl(struct file *file, unsigned int cmd, unsigned long a
qtfs_err("init %d list:%d %s", tmp->type, i, whitelist[tmp->type]->wl[i].path);
}
break;
+ case QTFS_IOCTL_ALLINFO:
+ case QTFS_IOCTL_CLEARALL:
+ case QTFS_IOCTL_LOGLEVEL:
+ case QTFS_IOCTL_UDS_PROXY_PID:
+ ret = qtfs_misc_ioctl(file, cmd, arg);
+ break;
default:
qtfs_err("qtfs misc ioctl unknown cmd:%u.", cmd);
break;
diff --git a/qtfs/qtfs_server/user_engine.c b/qtfs/qtfs_server/user_engine.c
index 547935c..a3d627d 100644
--- a/qtfs/qtfs_server/user_engine.c
+++ b/qtfs/qtfs_server/user_engine.c
@@ -14,6 +14,7 @@
#include <sys/epoll.h>
#include "comm.h"
+#include "ipc/uds_main.h"
char wl_type_str[QTFS_WHITELIST_MAX][10] = {"Open", "Write", "Read", "Readdir", "Mkdir", "Rmdir", "Create", "Unlink", "Rename", "Setattr", "Setxattr", "Mount"};
@@ -220,13 +221,12 @@ int qtfs_whitelist_init(int fd)
int main(int argc, char *argv[])
{
- if (argc != 3) {
- engine_out("Usage: %s <buf size> <number of threads>.", argv[0]);
- engine_out(" Example: %s 4096 16.", argv[0]);
+ if (argc != 7) {
+ engine_out("Usage: %s <number of threads> <uds proxy thread num> <host ip> <uds proxy port> <dpu ip> <uds proxy port>.", argv[0]);
+ engine_out(" Example: %s 16 1 192.168.10.10 12121 192.168.10.11 12121.", argv[0]);
return -1;
}
- int psize = atoi(argv[1]);
- int thread_nums = atoi(argv[2]);
+ int thread_nums = atoi(argv[1]);
int fd = open(QTFS_SERVER_FILE, O_RDONLY);
if (fd < 0) {
engine_err("qtfs server file:%s open failed, fd:%d.", QTFS_SERVER_FILE, fd);
@@ -247,9 +247,9 @@ int main(int argc, char *argv[])
pthread_t texec[QTFS_MAX_THREADS];
pthread_t tepoll;
- if (psize > QTFS_USERP_MAXSIZE || thread_nums > QTFS_MAX_THREADS) {
- engine_err("qtfs engine param invalid, size:%d(must <= %d) thread_nums:%d(must <= %d).",
- psize, QTFS_USERP_MAXSIZE, thread_nums, QTFS_MAX_THREADS);
+ if (thread_nums > QTFS_MAX_THREADS) {
+ engine_err("qtfs engine param invalid, thread_nums:%d(must <= %d).",
+ thread_nums, QTFS_MAX_THREADS);
goto end;
}
(void)ioctl(fd, QTFS_IOCTL_EXIT, 1);
@@ -257,24 +257,30 @@ int main(int argc, char *argv[])
signal(SIGKILL, qtfs_signal_int);
signal(SIGTERM, qtfs_signal_int);
- struct qtfs_server_userp_s *userp = qtfs_engine_thread_init(fd, thread_nums, psize);
+ struct qtfs_server_userp_s *userp = qtfs_engine_thread_init(fd, thread_nums, QTFS_USERP_SIZE);
if (userp == NULL) {
engine_out("qtfs engine userp init failed.");
goto end;
}
struct engine_arg arg[QTFS_MAX_THREADS];
for (int i = 0; i < thread_nums; i++) {
- arg[i].psize = psize;
+ arg[i].psize = QTFS_USERP_SIZE;
arg[i].fd = fd;
arg[i].thread_idx = i;
(void)pthread_create(&texec[i], NULL, qtfs_engine_kthread, &arg[i]);
}
(void)pthread_create(&tepoll, NULL, qtfs_engine_epoll_thread, &arg[0]);
+ // Must be called at this position: uds_proxy_main() also ends in a join internally.
+ if (uds_proxy_main(6, &argv[1]) != 0) {
+ engine_out("uds proxy start failed.");
+ goto engine_free;
+ }
for (int i = 0; i < thread_nums; i++) {
pthread_join(texec[i], NULL);
engine_out("qtfs engine join thread %d.", i);
}
pthread_join(tepoll, NULL);
+engine_free:
qtfs_engine_userp_free(userp, thread_nums);
engine_out("qtfs engine join epoll thread.");
end:
diff --git a/qtfs/qtinfo/qtinfo.c b/qtfs/qtinfo/qtinfo.c
index dc88da0..a8ba2e0 100644
--- a/qtfs/qtinfo/qtinfo.c
+++ b/qtfs/qtinfo/qtinfo.c
@@ -4,9 +4,13 @@
#include <fcntl.h>
#include <sys/ioctl.h>
#include <string.h>
+#include <stddef.h>
+#include <sys/socket.h>
+#include <sys/un.h>
#include "qtinfo.h"
#include "comm.h"
+#include "ipc/uds_main.h"
#ifdef client
#define QTFS_DEV_NAME "/dev/qtfs_client"
@@ -312,6 +316,69 @@ void qtinfo_opt_p(int fd, char *support)
return;
}
+#define PATH_MAX 4096
+/*
+ * Connect to the unix stream socket at @addr and print everything the peer
+ * sends until recv() reports end-of-stream or an error.
+ *
+ * The socket is closed on every exit path, and one byte of the receive
+ * buffer is reserved so the data handed to "%s" is always NUL-terminated.
+ */
+static void qtinfo_uds_dump(const char *addr)
+{
+	char buf[256];
+	ssize_t n;
+	socklen_t len;
+	struct sockaddr_un svr;
+	int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
+
+	if (sockfd < 0) {
+		qtinfo_err("Create socket fd failed.");
+		return;
+	}
+
+	memset(&svr, 0, sizeof(svr));
+	svr.sun_family = AF_UNIX;
+	/* svr was zeroed above, so copying at most size-1 bytes keeps sun_path terminated */
+	strncpy(svr.sun_path, addr, sizeof(svr.sun_path) - 1);
+	len = offsetof(struct sockaddr_un, sun_path) + strlen(svr.sun_path);
+	if (connect(sockfd, (struct sockaddr *)&svr, len) < 0) {
+		qtinfo_err("connect to %s failed.", addr);
+		close(sockfd);
+		return;
+	}
+	for (;;) {
+		/* recv() does not terminate the data; keep room for the '\0' */
+		n = recv(sockfd, buf, sizeof(buf) - 1, 0);
+		if (n <= 0)
+			break;
+		buf[n] = '\0';
+		qtinfo_out2("%s", buf);
+	}
+	close(sockfd);
+}
+
+/* Handler for -u: print the unix socket proxy diagnostic info served at UDS_DIAG_ADDR. */
+void qtinfo_opt_u(void)
+{
+	qtinfo_uds_dump(UDS_DIAG_ADDR);
+}
+
+/*
+ * Handler for -s: connect to UDS_LOGLEVEL_UPD (per the help text this raises
+ * the unix socket proxy log level by one) and print whatever the peer replies.
+ */
+void qtinfo_opt_s(void)
+{
+	qtinfo_uds_dump(UDS_LOGLEVEL_UPD);
+}
static void qtinfo_help(char *exec)
{
@@ -322,6 +389,8 @@ static void qtinfo_help(char *exec)
qtinfo_out(" -l, Set log level(valid param: \"NONE\", \"ERROR\", \"WARN\", \"INFO\", \"DEBUG\").");
qtinfo_out(" -t, For test informations.");
qtinfo_out(" -p, Epoll support file mode(1: any files; 0: only fifo).");
+ qtinfo_out(" -u, Display unix socket proxy diagnostic info");
+ qtinfo_out(" -s, Set unix socket proxy log level(Increase by 1 each time)");
}
int main(int argc, char *argv[])
@@ -334,7 +403,7 @@ int main(int argc, char *argv[])
qtinfo_err("open file %s failed.", QTFS_DEV_NAME);
return 0;
}
- while ((ch = getopt(argc, argv, "acl:tp:")) != -1) {
+ while ((ch = getopt(argc, argv, "acl:tp:us")) != -1) {
switch (ch) {
case 'a':
qtinfo_opt_a(fd);
@@ -351,6 +420,12 @@ int main(int argc, char *argv[])
case 'p':
qtinfo_opt_p(fd, optarg);
break;
+ case 'u':
+ qtinfo_opt_u();
+ break;
+ case 's':
+ qtinfo_opt_s();
+ break;
default:
qtinfo_help(argv[0]);
break;
diff --git a/qtfs/qtsock.c b/qtfs/qtsock.c
new file mode 100644
index 0000000..58b2eab
--- /dev/null
+++ b/qtfs/qtsock.c
@@ -0,0 +1,332 @@
+#include <linux/fs.h>
+#include <linux/init.h>
+#include <linux/kernel.h>
+#include <linux/ftrace.h>
+#include <linux/kprobes.h>
+#include <net/sock.h>
+#include <linux/netlink.h>
+#include <linux/un.h>
+
+#include "conn.h"
+#include "log.h"
+#include "comm.h"
+#include "qtfs/syscall.h"
+
+#define MAX_SOCK_PATH_LEN 108
+
+/* Kernel-side socket connected to the userspace server at qtfs_sock_path; NULL while not connected. */
+static struct socket *qtfs_sock = NULL;
+/* Taken around (re)connecting qtfs_sock and around each request/response exchange on it. */
+static struct mutex qtfs_sock_mutex;
+/* Path of the userspace unix socket server that qtsock_conn() connects to. */
+static char qtfs_sock_path[] = "/var/run/qtfs/remote_uds.sock";
+
+/* Socket path prefixes for which a remote lookup is attempted (see uds_find_whitelist). */
+struct qtsock_wl_stru qtsock_wl;
+
+/*
+ * Pointer to the kernel's own unix_find_other; resolve_hook_address() stores
+ * the looked-up symbol address here through ftrace_hook.origin.
+ */
+static struct sock *(*origin_unix_find_other)(struct net *net,
+ struct sockaddr_un *sunname, int len,
+ int type, unsigned int hash, int *error);
+
+/* Describes one function hook installed through ftrace. */
+struct ftrace_hook {
+ /* symbol name handed to qtfs_kallsyms_lookup_name() */
+ const char *name;
+ /* replacement function that ftrace_thunk redirects execution to */
+ void *func;
+ /* address of an unsigned-long-sized variable that receives the resolved symbol address */
+ void *origin;
+
+ /* resolved address of the hooked symbol (filled by resolve_hook_address) */
+ unsigned long addr;
+ /* ftrace ops registered for this hook; ftrace_thunk recovers the hook from it via container_of */
+ struct ftrace_ops ops;
+};
+
+/* The single hook used by this file: unix_find_other -> qtfs_unix_find_other. */
+struct ftrace_hook unix_find_other_hook;
+
+/*
+ * Look up hook->name with qtfs_kallsyms_lookup_name() and record the result
+ * both in hook->addr and in the variable that hook->origin points to.
+ *
+ * Returns 0 on success, -ENOENT when the symbol cannot be resolved.
+ */
+static int resolve_hook_address(struct ftrace_hook *hook)
+{
+	unsigned long *origin_slot = (unsigned long *)hook->origin;
+
+	hook->addr = qtfs_kallsyms_lookup_name(hook->name);
+	if (hook->addr == 0) {
+		qtfs_warn("unresolved symbol during resolving hook address:%s\n", hook->name);
+		return -ENOENT;
+	}
+
+	*origin_slot = hook->addr;
+	return 0;
+}
+
+/*
+ * ftrace callback for a hooked function: rewrites the saved instruction
+ * pointer so that execution continues in hook->func instead.
+ *
+ * Calls whose parent_ip lies inside this module are left alone, so the
+ * module's own calls through the saved original pointer are not redirected
+ * back into the hook.
+ */
+static void notrace ftrace_thunk(unsigned long ip, unsigned long parent_ip,
+		struct ftrace_ops *ops, struct pt_regs *regs)
+{
+	struct ftrace_hook *hook;
+
+	if (within_module(parent_ip, THIS_MODULE))
+		return;
+
+	hook = container_of(ops, struct ftrace_hook, ops);
+	regs->ip = (unsigned long)hook->func;
+}
+
+/*
+ * Resolve the hooked symbol, restrict hook->ops to that address and register
+ * it with ftrace. If registration fails the address filter is removed again.
+ *
+ * Returns 0 on success or the error code of the step that failed.
+ */
+int install_hook(struct ftrace_hook *hook)
+{
+	int err = resolve_hook_address(hook);
+
+	if (err != 0)
+		return err;
+
+	hook->ops.func = ftrace_thunk;
+	hook->ops.flags = FTRACE_OPS_FL_SAVE_REGS | FTRACE_OPS_FL_IPMODIFY;
+
+	err = ftrace_set_filter_ip(&hook->ops, hook->addr, 0, 0);
+	if (err != 0) {
+		qtfs_err("ftrace_set_filter_ip failed:%d\n", err);
+		return err;
+	}
+
+	err = register_ftrace_function(&hook->ops);
+	if (err == 0) {
+		qtfs_info("install hook(%s) done\n", hook->name);
+		return 0;
+	}
+
+	/* registration failed: undo the filter set above */
+	qtfs_err("register_ftrace_function failed with :%d\n", err);
+	ftrace_set_filter_ip(&hook->ops, hook->addr, 1, 0);
+	return err;
+}
+
+/*
+ * Unregister hook->ops from ftrace and drop its address filter.
+ * Failures of either step are logged; the second step runs regardless.
+ */
+void remove_hook(struct ftrace_hook *hook)
+{
+	int unreg_err;
+	int filter_err;
+
+	unreg_err = unregister_ftrace_function(&hook->ops);
+	if (unreg_err) {
+		qtfs_err("unregister_ftrace_function failed:%d\n", unreg_err);
+	}
+
+	filter_err = ftrace_set_filter_ip(&hook->ops, hook->addr, 1, 0);
+	if (filter_err) {
+		qtfs_err("ftrace_set_filter_ip failed:%d\n", filter_err);
+	}
+
+	qtfs_info("remove hook(%s) done", hook->name);
+}
+
+/* Lookup request that qtfs_udsfind() sends to the userspace server over qtfs_sock. */
+struct qtfs_sock_req {
+ /* set to 0xDEADBEEF by qtfs_udsfind() */
+ int magic;
+ /* socket type passed through from the unix_find_other call */
+ int type;
+ /* socket path being looked up; buffer is zeroed before the path is copied in */
+ char sunname[MAX_SOCK_PATH_LEN];
+};
+
+/* Reply to a qtfs_sock_req. */
+struct qtfs_sock_rsp {
+ /* non-zero is reported by qtfs_udsfind() as "found" */
+ int found;
+};
+
+/*
+ * (Re)connect qtfs_sock to the userspace unix socket server at
+ * qtfs_sock_path. An already existing qtfs_sock is released first: callers
+ * only get here when the current socket is missing or not working.
+ *
+ * Returns 0 on success or a negative errno. On any failure qtfs_sock is
+ * left NULL. Callers test the result with "< 0" / "== 0", so every failure
+ * path must return a negative value.
+ */
+static int qtsock_conn(void)
+{
+	int ret;
+	struct sockaddr_un saddr;
+
+	ret = mutex_lock_interruptible(&qtfs_sock_mutex);
+	if (ret < 0) {
+		qtfs_err("Failed to get qtfs sock mutex lock:%d\n", ret);
+		/* return the errno: "false" is 0 and would be read as success */
+		return ret;
+	}
+	// calling this function means qtfs_sock isn't working properly.
+	// so it's ok to release and clean old qtfs_sock
+	if (qtfs_sock) {
+		sock_release(qtfs_sock);
+		qtfs_sock = NULL;
+	}
+	// connect to userspace unix socket server
+	ret = __sock_create(&init_net, AF_UNIX, SOCK_STREAM, 0, &qtfs_sock, 1);
+	if (ret) {
+		qtfs_err("qtfs sock client init create sock failed:%d\n", ret);
+		goto out_unlock;
+	}
+	/* do not hand uninitialized stack bytes to connect() */
+	memset(&saddr, 0, sizeof(saddr));
+	saddr.sun_family = PF_UNIX;
+	strcpy(saddr.sun_path, qtfs_sock_path);
+	ret = qtfs_sock->ops->connect(qtfs_sock, (struct sockaddr *)&saddr,
+			sizeof(struct sockaddr_un) - 1, 0);
+	if (ret) {
+		qtfs_err("qtfs sock client sock connect failed:%d\n", ret);
+		sock_release(qtfs_sock);
+		qtfs_sock = NULL;
+	}
+
+out_unlock:
+	mutex_unlock(&qtfs_sock_mutex);
+	return ret;
+}
+
+/*
+ * Ask the userspace server behind qtfs_sock whether a unix socket with path
+ * @sunname and socket type @type exists remotely.
+ *
+ * @sunname: socket path to look up
+ * @len:     length value passed through from the unix_find_other call; it is
+ *           used both as the upper bound check and as the strncpy() count
+ * @type:    socket type, forwarded unchanged in the request
+ *
+ * Returns true when a response was received and its "found" field is
+ * non-zero; returns false when the server says "not found" and on every
+ * failure (no connection, lock interrupted, send or receive error).
+ */
+bool qtfs_udsfind(char *sunname, int len, int type)
+{
+ struct qtfs_sock_req qs_req;
+ struct qtfs_sock_rsp qs_rsp;
+ struct kvec send_vec, recv_vec;
+ struct msghdr send_msg, recv_msg;
+ int ret;
+ int retry = 0, penalty = 100, i = 0;
+
+ // qtfs_sock still not initialized, try to connect to server
+ if (!qtfs_sock && (qtsock_conn() < 0)) {
+ qtfs_err("failed to connect to qtfs socket\n");
+ return false;
+ }
+ if (len > MAX_SOCK_PATH_LEN) {
+ qtfs_err("Invalid socket path name len(%d)\n", len);
+ return false;
+ }
+ // request and response start out zeroed, so a missing reply field reads as "not found"
+ memset(&qs_req, 0, sizeof(qs_req));
+ memset(&qs_rsp, 0, sizeof(qs_rsp));
+ // NOTE(review): with len == MAX_SOCK_PATH_LEN and no NUL inside the first len
+ // bytes, sunname in the request would be left unterminated -- confirm callers
+ strncpy(qs_req.sunname, sunname, len);
+ qs_req.type = type;
+ qs_req.magic = 0xDEADBEEF;
+
+ memset(&send_msg, 0, sizeof(send_msg));
+ memset(&send_vec, 0, sizeof(send_vec));
+ memset(&recv_msg, 0, sizeof(recv_msg));
+ memset(&recv_vec, 0, sizeof(recv_vec));
+
+ send_vec.iov_base = &qs_req;
+ send_vec.iov_len = sizeof(qs_req);
+ qtfs_info("qtfs uds find socket(%s), type(%d)\n", sunname, type);
+
+reconn:
+ // only entered with retry != 0, i.e. after a send failed with -EPIPE:
+ // try to reconnect up to `retry` times, doubling the sleep after each failure
+ // (200ms, 400ms, 800ms for retry == 3)
+ if (retry) {
+ for (i = 0; i < retry; i++) {
+ if (qtsock_conn() == 0)
+ break;
+ qtfs_err("qtfs socket reconnect failed for %d trial", i+1);
+ penalty *= 2;
+ msleep(penalty);
+ }
+ }
+ // the mutex is held from here until the response has been received
+ ret = mutex_lock_interruptible(&qtfs_sock_mutex);
+ if (ret < 0) {
+ qtfs_err("Failed to get qtfs sock mutex lock:%d\n", ret);
+ return false;
+ }
+ // still NULL if every reconnect attempt above failed
+ if (!qtfs_sock) {
+ qtfs_err("qtfs_sock is NULL, please check\n");
+ mutex_unlock(&qtfs_sock_mutex);
+ return false;
+ }
+ send_msg.msg_flags |= MSG_NOSIGNAL;
+ ret = kernel_sendmsg(qtfs_sock, &send_msg, &send_vec, 1, sizeof(qs_req));
+ // a broken connection triggers exactly one reconnect round (retry is 0 only the first time)
+ if (ret == -EPIPE && retry == 0) {
+ qtfs_err("uds find connection has broken, try to reconnect\n");
+ retry = 3;
+ mutex_unlock(&qtfs_sock_mutex);
+ goto reconn;
+ } else if (ret < 0) {
+ qtfs_err("Failed to send uds find message:%d\n", ret);
+ mutex_unlock(&qtfs_sock_mutex);
+ return false;
+ }
+
+ // waiting for response
+ recv_vec.iov_base = &qs_rsp;
+ recv_vec.iov_len = sizeof(qs_rsp);
+retry:
+ recv_msg.msg_flags |= MSG_NOSIGNAL;
+ ret = kernel_recvmsg(qtfs_sock, &recv_msg, &recv_vec, 1, sizeof(qs_rsp), 0);
+ // NOTE(review): this retry is unbounded and runs with qtfs_sock_mutex held;
+ // it looks like it keeps looping for as long as the interruption persists and
+ // no response arrives -- confirm this is intended
+ if (ret == -ERESTARTSYS || ret == -EINTR) {
+ qtfs_err("uds remote find get interrupted, just retry");
+ msleep(1);
+ goto retry;
+ }
+ mutex_unlock(&qtfs_sock_mutex);
+ if (ret < 0) {
+ qtfs_err("Failed to receive uds find response:%d\n", ret);
+ return false;
+ }
+ // NOTE(review): a return value shorter than sizeof(qs_rsp) is not checked here
+ qtfs_info("uds remote find socket(%s), type(%d), result:%s\n", sunname, type, qs_rsp.found ? "found" : "not found");
+ return qs_rsp.found;
+}
+
+/*
+ * Check @path against the socket whitelist under the whitelist read lock.
+ * An entry matches when it is a prefix of @path.
+ *
+ * Returns 0 when some entry matches, 1 when none does.
+ */
+static int uds_find_whitelist(const char *path)
+{
+	int idx = 0;
+	int miss = 1;
+
+	read_lock(&qtsock_wl.rwlock);
+	while (idx < qtsock_wl.nums) {
+		const char *prefix = qtsock_wl.wl[idx];
+
+		if (strncmp(path, prefix, strlen(prefix)) == 0) {
+			miss = 0;
+			break;
+		}
+		idx++;
+	}
+	read_unlock(&qtsock_wl.rwlock);
+
+	return miss;
+}
+
+/* True when the calling task's tgid equals qtfs_uds_proxy_pid. */
+static inline bool uds_is_proxy(void)
+{
+	const bool caller_is_proxy = (current->tgid == qtfs_uds_proxy_pid);
+
+	return caller_is_proxy;
+}
+
+/*
+ * Replacement for unix_find_other installed through ftrace.
+ *
+ * The local lookup always runs first. Only when it fails, and the target is
+ * a named socket whose path is whitelisted, and the caller is not the uds
+ * proxy itself, the userspace service is asked whether the socket exists
+ * remotely. If it does, the local lookup is repeated.
+ *
+ * Returns the peer sock, or NULL with *error set. When no remote lookup is
+ * attempted, *error is left exactly as the local lookup set it, so sockets
+ * that are not proxied keep their normal errno (for example -ENOENT for a
+ * missing path) instead of always reporting -ECONNREFUSED.
+ */
+static struct sock *qtfs_unix_find_other(struct net *net,
+		struct sockaddr_un *sunname, int len,
+		int type, unsigned int hash, int *error)
+{
+	struct sock *other;
+
+	qtfs_debug("in qtfs_unix_find_other (%s)\n", sunname->sun_path);
+	other = origin_unix_find_other(net, sunname, len, type, hash, error);
+	if (other) {
+		qtfs_debug("find unix other sock(%s) locally", sunname->sun_path);
+		return other;
+	}
+
+	// do not call remote find if sunname is anonymous, sunpath is not in the
+	// whitelist, or the caller is the proxy itself; keep the local error code
+	if (!sunname->sun_path[0] || uds_find_whitelist(sunname->sun_path) ||
+	    uds_is_proxy())
+		return NULL;
+
+	qtfs_info("Failed to find unix other sock(%s) locally, try to find remotely\n", sunname->sun_path);
+	// ask the userspace service for the remote socket status:
+	// if found, the service has created this unix socket server locally, so a
+	// second local lookup will find it; if not found, return NULL
+	if (!qtfs_udsfind(sunname->sun_path, len, type)) {
+		qtfs_info("failed to find unix other sock(%s) remotely", sunname->sun_path);
+		*error = -ECONNREFUSED;
+		return NULL;
+	}
+	qtfs_info("find unix other sock(%s) remotely\n", sunname->sun_path);
+
+	// calls made from inside this module are not redirected by ftrace_thunk,
+	// so this reaches the original function rather than this hook
+	return origin_unix_find_other(net, sunname, len, type, hash, error);
+}
+
+/*
+ * Set up the unix socket proxy support: initialise the socket mutex and the
+ * whitelist, then install the ftrace hook on unix_find_other.
+ *
+ * The hook is installed last because it can fire as soon as it is
+ * registered, and its handler uses the mutex and the whitelist lock.
+ *
+ * Returns 0 on success, -ENOMEM when the whitelist array cannot be
+ * allocated (the hook is then not installed), or the error returned by
+ * install_hook(). On install_hook() failure the whitelist array is left
+ * allocated; no teardown for it is visible in this file.
+ */
+int qtfs_sock_init(void)
+{
+	int ret;
+
+	qtfs_kallsyms_hack_init();
+
+	mutex_init(&qtfs_sock_mutex);
+	rwlock_init(&qtsock_wl.rwlock);
+	qtsock_wl.nums = 0;
+	qtsock_wl.wl = (char **)kmalloc(sizeof(char *) * QTSOCK_WL_MAX_NUM, GFP_KERNEL);
+	if (qtsock_wl.wl == NULL) {
+		qtfs_err("failed to kmalloc wl, max num:%d", QTSOCK_WL_MAX_NUM);
+		return -ENOMEM;
+	}
+
+	qtfs_info("in qtfs ftrace hook unix_find_other\n");
+	unix_find_other_hook.name = "unix_find_other";
+	unix_find_other_hook.func = qtfs_unix_find_other;
+	unix_find_other_hook.origin = &origin_unix_find_other;
+
+	ret = install_hook(&unix_find_other_hook);
+	if (ret) {
+		qtfs_err("failed to install unix_find_other hook:%d\n", ret);
+		return ret;
+	}
+
+	return 0;
+}
+
+/*
+ * Tear down the unix socket proxy support: remove the unix_find_other hook
+ * and close the kernel socket connected to the userspace server.
+ */
+void qtfs_sock_exit(void)
+{
+	qtfs_info("exit qtfs ftrace, remove unix_find_other_hook\n");
+	remove_hook(&unix_find_other_hook);
+
+	/*
+	 * Take the lock uninterruptibly. With the interruptible variant a
+	 * pending signal made this path release the socket without the lock
+	 * and then unlock a mutex it did not hold.
+	 */
+	mutex_lock(&qtfs_sock_mutex);
+	// close unix socket connected to userspace
+	if (qtfs_sock) {
+		sock_release(qtfs_sock);
+		qtfs_sock = NULL;
+	}
+	mutex_unlock(&qtfs_sock_mutex);
+}
--
2.33.0