From 7e4d7c638681df4f32c0d719c62b4e38ef69c1eb Mon Sep 17 00:00:00 2001 From: Lemmy Huang Date: Sat, 31 Aug 2024 14:51:41 +0800 Subject: [PATCH] cleancode: move some API from stack to rpc and rtw Signed-off-by: Lemmy Huang --- src/lstack/api/lstack_epoll.c | 17 +- src/lstack/api/lstack_rtc_api.c | 7 +- src/lstack/api/lstack_rtw_api.c | 250 ++++++- src/lstack/api/lstack_wrap.c | 1 - src/lstack/core/lstack_dpdk.c | 5 +- src/lstack/core/lstack_lwip.c | 47 +- src/lstack/core/lstack_protocol_stack.c | 757 ++------------------- src/lstack/core/lstack_thread_rpc.c | 610 +++++++++++++---- src/lstack/include/lstack_dpdk.h | 33 +- src/lstack/include/lstack_epoll.h | 12 +- src/lstack/include/lstack_lwip.h | 9 +- src/lstack/include/lstack_protocol_stack.h | 51 +- src/lstack/include/lstack_rpc_proc.h | 47 -- src/lstack/include/lstack_thread_rpc.h | 81 +-- src/lstack/netif/lstack_ethdev.c | 56 ++ 15 files changed, 927 insertions(+), 1056 deletions(-) delete mode 100644 src/lstack/include/lstack_rpc_proc.h diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c index 1c13076..ce3d267 100644 --- a/src/lstack/api/lstack_epoll.c +++ b/src/lstack/api/lstack_epoll.c @@ -19,15 +19,9 @@ #include #include -#include #include -#include -#include -#include -#include #include -#include "lstack_ethdev.h" #include "lstack_stack_stat.h" #include "lstack_cfg.h" #include "lstack_log.h" @@ -306,6 +300,17 @@ int32_t lstack_epoll_create(int32_t flags) return lstack_do_epoll_create(fd); } +static void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup) +{ + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + struct protocol_stack *stack = NULL; + + for (int32_t i = 0; i < stack_group->stack_num; i++) { + stack = stack_group->stacks[i]; + rpc_call_clean_epoll(&stack->rpc_queue, wakeup); + } +} + int32_t lstack_epoll_close(int32_t fd) { struct lwip_sock *sock = lwip_get_socket(fd); diff --git a/src/lstack/api/lstack_rtc_api.c b/src/lstack/api/lstack_rtc_api.c 
index 7689c83..60d3b23 100644 --- a/src/lstack/api/lstack_rtc_api.c +++ b/src/lstack/api/lstack_rtc_api.c @@ -42,11 +42,6 @@ static int rtc_close(int s) return lwip_close(s); } -static int rtc_shutdown(int fd, int how) -{ - return lwip_shutdown(fd, how); -} - static int rtc_epoll_create(int flags) { if (stack_setup_app_thread() < 0) { @@ -90,7 +85,7 @@ static int rtc_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *excep void rtc_api_init(posix_api_t *api) { api->close_fn = rtc_close; - api->shutdown_fn = rtc_shutdown; + api->shutdown_fn = lwip_shutdown; api->socket_fn = rtc_socket; api->accept_fn = lwip_accept; api->accept4_fn = lwip_accept4; diff --git a/src/lstack/api/lstack_rtw_api.c b/src/lstack/api/lstack_rtw_api.c index d8634cc..7ceff20 100644 --- a/src/lstack/api/lstack_rtw_api.c +++ b/src/lstack/api/lstack_rtw_api.c @@ -13,14 +13,258 @@ #include #include +#include "common/gazelle_base_func.h" +#include "lstack_log.h" +#include "lstack_cfg.h" #include "lstack_thread_rpc.h" -#include "lstack_epoll.h" #include "lstack_protocol_stack.h" -#include "lstack_cfg.h" #include "lstack_lwip.h" -#include "common/gazelle_base_func.h" +#include "lstack_epoll.h" #include "lstack_rtw_api.h" +/* when fd is listenfd, listenfd of all protocol stack thread will be closed */ +static int stack_broadcast_close(int fd) +{ + int ret = 0; + struct lwip_sock *sock = lwip_get_socket(fd); + struct protocol_stack *stack = get_protocol_stack_by_fd(fd); + if (sock == NULL) { + GAZELLE_RETURN(EBADF); + } + + do { + sock = sock->listen_next; + if (stack == NULL || rpc_call_close(&stack->rpc_queue, fd)) { + ret = -1; + } + + if (POSIX_IS_CLOSED(sock)) { + break; + } + fd = sock->conn->callback_arg.socket; + stack = get_protocol_stack_by_fd(fd); + } while (1); + + return ret; +} + +static int stack_broadcast_shutdown(int fd, int how) +{ + int32_t ret = 0; + struct lwip_sock *sock = lwip_get_socket(fd); + struct protocol_stack *stack = get_protocol_stack_by_fd(fd); + if (sock == 
NULL) { + GAZELLE_RETURN(EBADF); + } + + do { + sock = sock->listen_next; + if (stack == NULL || rpc_call_shutdown(&stack->rpc_queue, fd, how)) { + ret = -1; + } + + if (POSIX_IS_CLOSED(sock)) { + break; + } + fd = sock->conn->callback_arg.socket; + stack = get_protocol_stack_by_fd(fd); + } while (1); + + return ret; +} + +/* choice one stack bind */ +static int stack_single_bind(int fd, const struct sockaddr *name, socklen_t namelen) +{ + struct protocol_stack *stack = get_protocol_stack_by_fd(fd); + if (stack == NULL) { + GAZELLE_RETURN(EBADF); + } + return rpc_call_bind(&stack->rpc_queue, fd, name, namelen); +} + +/* bind sync to all protocol stack thread, so that any protocol stack thread can build connect */ +static int stack_broadcast_bind(int fd, const struct sockaddr *name, socklen_t namelen) +{ + struct protocol_stack *cur_stack = get_protocol_stack_by_fd(fd); + struct protocol_stack *stack = NULL; + int ret, clone_fd; + + struct lwip_sock *sock = lwip_get_socket(fd); + if (sock == NULL || cur_stack == NULL) { + LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd); + GAZELLE_RETURN(EBADF); + } + + ret = rpc_call_bind(&cur_stack->rpc_queue, fd, name, namelen); + if (ret < 0) { + close(fd); + return ret; + } + + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + for (int i = 0; i < stack_group->stack_num; ++i) { + stack = stack_group->stacks[i]; + if (stack != cur_stack) { + clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, name, namelen); + if (clone_fd < 0) { + stack_broadcast_close(fd); + return clone_fd; + } + } + } + return 0; +} + +static void inline del_accept_in_event(struct lwip_sock *sock) +{ + pthread_spin_lock(&sock->wakeup->event_list_lock); + + if (!NETCONN_IS_ACCEPTIN(sock)) { + sock->events &= ~EPOLLIN; + if (sock->events == 0) { + list_del_node(&sock->event_list); + } + } + + pthread_spin_unlock(&sock->wakeup->event_list_lock); +} + +static struct lwip_sock 
*get_min_accept_sock(int fd) +{ + struct lwip_sock *sock = lwip_get_socket(fd); + struct lwip_sock *min_sock = NULL; + + while (sock) { + if (!NETCONN_IS_ACCEPTIN(sock)) { + sock = sock->listen_next; + continue; + } + + if (min_sock == NULL || min_sock->stack->conn_num > sock->stack->conn_num) { + min_sock = sock; + } + + sock = sock->listen_next; + } + + return min_sock; +} + +/* ergodic the protocol stack thread to find the connection, because all threads are listening */ +static int stack_broadcast_accept4(int fd, struct sockaddr *addr, socklen_t *addrlen, int flags) +{ + int ret = -1; + struct lwip_sock *min_sock = NULL; + struct lwip_sock *sock = lwip_get_socket(fd); + struct protocol_stack *stack = NULL; + if (sock == NULL) { + GAZELLE_RETURN(EBADF); + } + + if (netconn_is_nonblocking(sock->conn)) { + min_sock = get_min_accept_sock(fd); + } else { + while ((min_sock = get_min_accept_sock(fd)) == NULL) { + lstack_block_wait(sock->wakeup, 0); + } + } + + if (min_sock && min_sock->conn) { + stack = get_protocol_stack_by_fd(min_sock->conn->callback_arg.socket); + if (stack == NULL) { + GAZELLE_RETURN(EBADF); + } + ret = rpc_call_accept(&stack->rpc_queue, min_sock->conn->callback_arg.socket, addr, addrlen, flags); + } + + if (min_sock && min_sock->wakeup && min_sock->wakeup->type == WAKEUP_EPOLL) { + del_accept_in_event(min_sock); + } + + if (ret < 0) { + errno = EAGAIN; + } + return ret; +} + +static int stack_broadcast_accept(int fd, struct sockaddr *addr, socklen_t *addrlen) +{ + if (get_global_cfg_params()->nonblock_mode) + return stack_broadcast_accept4(fd, addr, addrlen, O_NONBLOCK); + else + return stack_broadcast_accept4(fd, addr, addrlen, 0); +} + +/* choice one stack listen */ +static int stack_single_listen(int fd, int backlog) +{ + struct protocol_stack *stack = get_protocol_stack_by_fd(fd); + if (stack == NULL) { + GAZELLE_RETURN(EBADF); + } + return rpc_call_listen(&stack->rpc_queue, fd, backlog); +} + +/* listen sync to all protocol stack thread, so 
that any protocol stack thread can build connect */ +static int stack_broadcast_listen(int fd, int backlog) +{ + typedef union sockaddr_union { + struct sockaddr sa; + struct sockaddr_in in; + struct sockaddr_in6 in6; + } sockaddr_t; + + struct protocol_stack *cur_stack = get_protocol_stack_by_fd(fd); + struct protocol_stack *stack = NULL; + sockaddr_t addr; + socklen_t addr_len = sizeof(addr); + int ret, clone_fd; + + struct lwip_sock *sock = lwip_get_socket(fd); + if (sock == NULL || cur_stack == NULL) { + LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd); + GAZELLE_RETURN(EBADF); + } + + ret = rpc_call_getsockname(&cur_stack->rpc_queue, fd, (struct sockaddr *)&addr, &addr_len); + if (ret != 0) { + return ret; + } + + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + int min_conn_stk_idx = get_min_conn_stack(stack_group); + + for (int32_t i = 0; i < stack_group->stack_num; ++i) { + stack = stack_group->stacks[i]; + if (get_global_cfg_params()->seperate_send_recv && stack->is_send_thread) { + continue; + } + if (stack != cur_stack) { + clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, (struct sockaddr *)&addr, addr_len); + if (clone_fd < 0) { + stack_broadcast_close(fd); + return clone_fd; + } + } else { + clone_fd = fd; + } + + if (min_conn_stk_idx == i) { + lwip_get_socket(clone_fd)->conn->is_master_fd = 1; + } else { + lwip_get_socket(clone_fd)->conn->is_master_fd = 0; + } + + ret = rpc_call_listen(&stack->rpc_queue, clone_fd, backlog); + if (ret < 0) { + stack_broadcast_close(fd); + return ret; + } + } + return 0; +} + static int rtw_socket(int domain, int type, int protocol) { struct protocol_stack *stack = get_bind_protocol_stack(); diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c index 1d5529d..8f80f98 100644 --- a/src/lstack/api/lstack_wrap.c +++ b/src/lstack/api/lstack_wrap.c @@ -14,7 +14,6 @@ #include #include #include -#include #include #include diff --git 
a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c index 50fbdf6..f87e362 100644 --- a/src/lstack/core/lstack_dpdk.c +++ b/src/lstack/core/lstack_dpdk.c @@ -43,7 +43,6 @@ #include "lstack_log.h" #include "common/dpdk_common.h" #include "lstack_protocol_stack.h" -#include "lstack_thread_rpc.h" #include "lstack_lwip.h" #include "lstack_cfg.h" #include "lstack_virtio.h" @@ -765,9 +764,9 @@ static int dpdk_bond_create(uint8_t mode, int *slave_port_id, int count) return 0; } -int32_t init_dpdk_ethdev(void) +int init_dpdk_ethdev(void) { - int32_t ret; + int ret; int slave_port_id[GAZELLE_MAX_BOND_NUM]; int port_id = 0; struct cfg_params *cfg = get_global_cfg_params(); diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c index 89142a4..3454961 100644 --- a/src/lstack/core/lstack_lwip.c +++ b/src/lstack/core/lstack_lwip.c @@ -29,15 +29,12 @@ #include #include "common/gazelle_base_func.h" -#include "lstack_ethdev.h" -#include "lstack_protocol_stack.h" #include "lstack_log.h" -#include "lstack_dpdk.h" +#include "lstack_cfg.h" +#include "lstack_protocol_stack.h" #include "lstack_stack_stat.h" #include "lstack_epoll.h" -#include "lstack_thread_rpc.h" -#include "common/dpdk_common.h" -#include "lstack_cfg.h" +#include "lstack_dpdk.h" #include "lstack_lwip.h" static const uint8_t fin_packet = 0; @@ -841,43 +838,24 @@ ssize_t gazelle_same_node_ring_send(struct lwip_sock *sock, const void *buf, siz return act_len; } -PER_THREAD uint16_t stack_sock_num[GAZELLE_MAX_STACK_NUM] = {0}; -PER_THREAD uint16_t max_sock_stack = 0; - -static inline void thread_bind_stack(struct lwip_sock *sock) -{ - if (likely(sock->already_bind_numa || !sock->stack)) { - return; - } - sock->already_bind_numa = 1; - - if (get_global_cfg_params()->app_bind_numa == 0) { - return; - } - - stack_sock_num[sock->stack->stack_idx]++; - if (stack_sock_num[sock->stack->stack_idx] > max_sock_stack) { - max_sock_stack = stack_sock_num[sock->stack->stack_idx]; - 
bind_to_stack_numa(sock->stack); - } -} - ssize_t do_lwip_send_to_stack(int32_t fd, const void *buf, size_t len, int32_t flags, const struct sockaddr *addr, socklen_t addrlen) { + struct lwip_sock *sock; ssize_t send = 0; - + if (buf == NULL) { GAZELLE_RETURN(EINVAL); } - if (addr && addr->sa_family != AF_INET && addr->sa_family != AF_INET6) { GAZELLE_RETURN(EINVAL); } - - struct lwip_sock *sock = lwip_get_socket(fd); - thread_bind_stack(sock); + sock = lwip_get_socket(fd); + if (unlikely(sock->already_bind_numa == 0 && sock->stack)) { + thread_bind_stack(sock->stack); + sock->already_bind_numa = 1; + } if (sock->same_node_tx_ring != NULL) { return gazelle_same_node_ring_send(sock, buf, len, flags); @@ -1131,7 +1109,10 @@ ssize_t do_lwip_read_from_stack(int32_t fd, void *buf, size_t len, int32_t flags return -1; } - thread_bind_stack(sock); + if (unlikely(sock->already_bind_numa == 0 && sock->stack)) { + thread_bind_stack(sock->stack); + sock->already_bind_numa = 1; + } if (sock->same_node_rx_ring != NULL) { return gazelle_same_node_ring_recv(sock, buf, len, flags); diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c index f56e911..bcca1a7 100644 --- a/src/lstack/core/lstack_protocol_stack.c +++ b/src/lstack/core/lstack_protocol_stack.c @@ -22,14 +22,12 @@ #include #include "common/gazelle_base_func.h" -#include "lstack_thread_rpc.h" #include "common/dpdk_common.h" #include "lstack_log.h" +#include "lstack_cfg.h" #include "lstack_dpdk.h" #include "lstack_ethdev.h" -#include "lstack_vdev.h" #include "lstack_lwip.h" -#include "lstack_cfg.h" #include "lstack_control_plane.h" #include "lstack_epoll.h" #include "lstack_stack_stat.h" @@ -64,18 +62,6 @@ static void stack_wait_quit(struct protocol_stack *stack) } } -void bind_to_stack_numa(struct protocol_stack *stack) -{ - int32_t ret; - pthread_t tid = pthread_self(); - - ret = pthread_setaffinity_np(tid, sizeof(stack->idle_cpuset), &stack->idle_cpuset); - if (ret != 0) { - 
LSTACK_LOG(ERR, LSTACK, "thread %d setaffinity to stack %hu failed\n", rte_gettid(), stack->queue_id); - return; - } -} - static inline void set_stack_idx(uint16_t idx) { g_stack_p = g_stack_group.stacks[idx]; @@ -97,27 +83,6 @@ struct protocol_stack_group *get_protocol_stack_group(void) return &g_stack_group; } -int get_min_conn_stack(struct protocol_stack_group *stack_group) -{ - int min_conn_stk_idx = 0; - int min_conn_num = GAZELLE_MAX_CLIENTS; - for (int i = 0; i < stack_group->stack_num; i++) { - struct protocol_stack* stack = stack_group->stacks[i]; - if (get_global_cfg_params()->seperate_send_recv) { - if (!stack->is_send_thread && stack->conn_num < min_conn_num) { - min_conn_stk_idx = i; - min_conn_num = stack->conn_num; - } - } else { - if (stack->conn_num < min_conn_num) { - min_conn_stk_idx = i; - min_conn_num = stack->conn_num; - } - } - } - return min_conn_stk_idx; -} - struct protocol_stack *get_protocol_stack(void) { return g_stack_p; @@ -179,6 +144,57 @@ struct protocol_stack *get_bind_protocol_stack(void) return stack_group->stacks[index]; } +int get_min_conn_stack(struct protocol_stack_group *stack_group) +{ + struct protocol_stack* stack; + int min_conn_stk_idx = 0; + int min_conn_num = GAZELLE_MAX_CLIENTS; + + for (int i = 0; i < stack_group->stack_num; i++) { + stack = stack_group->stacks[i]; + if (get_global_cfg_params()->seperate_send_recv) { + if (!stack->is_send_thread && stack->conn_num < min_conn_num) { + min_conn_stk_idx = i; + min_conn_num = stack->conn_num; + } + } else { + if (stack->conn_num < min_conn_num) { + min_conn_stk_idx = i; + min_conn_num = stack->conn_num; + } + } + } + return min_conn_stk_idx; +} + +void bind_to_stack_numa(struct protocol_stack *stack) +{ + int32_t ret; + pthread_t tid = pthread_self(); + + ret = pthread_setaffinity_np(tid, sizeof(stack->idle_cpuset), &stack->idle_cpuset); + if (ret != 0) { + LSTACK_LOG(ERR, LSTACK, "thread %d setaffinity to stack %hu failed\n", rte_gettid(), stack->queue_id); + return; + 
} +} + +void thread_bind_stack(struct protocol_stack *stack) +{ + static PER_THREAD uint16_t stack_sock_num[GAZELLE_MAX_STACK_NUM] = {0}; + static PER_THREAD uint16_t max_sock_stack = 0; + + if (get_global_cfg_params()->app_bind_numa == 0) { + return; + } + + stack_sock_num[stack->stack_idx]++; + if (stack_sock_num[stack->stack_idx] > max_sock_stack) { + max_sock_stack = stack_sock_num[stack->stack_idx]; + bind_to_stack_numa(stack); + } +} + static uint32_t get_protocol_traffic(struct protocol_stack *stack) { if (use_ltran()) { @@ -232,6 +248,11 @@ void low_power_idling(struct protocol_stack *stack) } } +struct thread_params { + uint16_t queue_id; + uint16_t idx; +}; + static int32_t create_thread(void *arg, char *thread_name, stack_thread_func func) { /* thread may run slow, if arg is temp var maybe have relese */ @@ -373,7 +394,7 @@ static int32_t init_stack_value(struct protocol_stack *stack, void *arg) return 0; } -void wait_sem_value(sem_t *sem, int32_t wait_value) +static void wait_sem_value(sem_t *sem, int32_t wait_value) { int32_t sem_val; do { @@ -546,7 +567,7 @@ static void* gazelle_stack_thread(void *arg) return NULL; } -int32_t stack_group_init_mempool(void) +static int stack_group_init_mempool(void) { struct cfg_params *cfg_params = get_global_cfg_params(); uint32_t total_mbufs = 0; @@ -702,655 +723,9 @@ OUT2: return -1; } -void stack_arp(struct rpc_msg *msg) -{ - struct rte_mbuf *mbuf = (struct rte_mbuf *)msg->args[MSG_ARG_0].p; - struct protocol_stack *stack = get_protocol_stack(); - - eth_dev_recv(mbuf, stack); -} - -void stack_socket(struct rpc_msg *msg) -{ - msg->result = lwip_socket(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i); - if (msg->result < 0) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, %ld socket failed\n", get_stack_tid(), msg->result); - } -} - -void stack_close(struct rpc_msg *msg) -{ - int32_t fd = msg->args[MSG_ARG_0].i; - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct lwip_sock *sock = 
lwip_get_socket(fd); - - if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) { - msg->recall_flag = 1; - rpc_call(&stack->rpc_queue, msg); /* until stack_send recall finish */ - return; - } - - msg->result = lwip_close(fd); - if (msg->result != 0) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result); - } -} - -void stack_shutdown(struct rpc_msg *msg) -{ - int fd = msg->args[MSG_ARG_0].i; - int how = msg->args[MSG_ARG_1].i; - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - struct lwip_sock *sock = lwip_get_socket(fd); - - if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) { - msg->recall_flag = 1; - rpc_call(&stack->rpc_queue, msg); - return; - } - - msg->result = lwip_shutdown(fd, how); - if (msg->result != 0 && errno != ENOTCONN) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), fd, msg->result); - } - - posix_api->shutdown_fn(fd, how); -} - -void stack_bind(struct rpc_msg *msg) -{ - msg->result = lwip_bind(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].socklen); - if (msg->result != 0) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result); - } -} - -void stack_listen(struct rpc_msg *msg) -{ - int32_t fd = msg->args[MSG_ARG_0].i; - int32_t backlog = msg->args[MSG_ARG_1].i; - - struct lwip_sock *sock = lwip_get_socket(fd); - if (sock == NULL) { - msg->result = -1; - return; - } - - /* new listen add to stack listen list */ - msg->result = lwip_listen(fd, backlog); - if (msg->result != 0) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result); - } -} - -void stack_accept(struct rpc_msg *msg) -{ - int32_t fd = msg->args[MSG_ARG_0].i; - msg->result = -1; - struct protocol_stack *stack = get_protocol_stack(); - - int32_t accept_fd = lwip_accept4(fd, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p, 
msg->args[MSG_ARG_3].i); - if (accept_fd < 0) { - stack->stats.accept_fail++; - LSTACK_LOG(ERR, LSTACK, "fd %d ret %d\n", fd, accept_fd); - return; - } - - struct lwip_sock *sock = lwip_get_socket(accept_fd); - if (sock == NULL || sock->stack == NULL) { - lwip_close(accept_fd); - LSTACK_LOG(ERR, LSTACK, "fd %d ret %d\n", fd, accept_fd); - return; - } - - msg->result = accept_fd; - sock->stack->conn_num++; - if (rte_ring_count(sock->conn->recvmbox->ring)) { - do_lwip_add_recvlist(accept_fd); - } -} - -void stack_connect(struct rpc_msg *msg) -{ - msg->result = lwip_connect(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].socklen); - if (msg->result < 0) { - msg->result = -errno; - } -} - -void stack_getpeername(struct rpc_msg *msg) -{ - msg->result = lwip_getpeername(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p); - if (msg->result != 0) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result); - } -} - -void stack_getsockname(struct rpc_msg *msg) -{ - msg->result = lwip_getsockname(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p); - if (msg->result != 0) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result); - } -} - -void stack_getsockopt(struct rpc_msg *msg) -{ - msg->result = lwip_getsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, - msg->args[MSG_ARG_3].p, msg->args[MSG_ARG_4].p); - if (msg->result != 0) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(), - msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result); - } -} - -void stack_setsockopt(struct rpc_msg *msg) -{ - msg->result = lwip_setsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, - msg->args[MSG_ARG_3].cp, msg->args[MSG_ARG_4].socklen); - if (msg->result != 0) { - LSTACK_LOG(ERR, LSTACK, "tid 
%ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(), - msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result); - } -} - -void stack_fcntl(struct rpc_msg *msg) -{ - msg->result = lwip_fcntl(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].l); - if (msg->result != 0) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result); - } -} - -void stack_ioctl(struct rpc_msg *msg) -{ - msg->result = lwip_ioctl(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].l, msg->args[MSG_ARG_2].p); - if (msg->result != 0) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result); - } -} - -void stack_recv(struct rpc_msg *msg) -{ - msg->result = lwip_recv(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].size, - msg->args[MSG_ARG_3].i); -} - -void stack_tcp_send(struct rpc_msg *msg) -{ - int32_t fd = msg->args[MSG_ARG_0].i; - size_t len = msg->args[MSG_ARG_1].size; - struct protocol_stack *stack = get_protocol_stack(); - int replenish_again; - - struct lwip_sock *sock = lwip_get_socket(fd); - if (POSIX_IS_CLOSED(sock)) { - msg->result = -1; - return; - } - - if (get_protocol_stack_group()->latency_start) { - calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG); - } - - replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0); - if (replenish_again < 0) { - __sync_fetch_and_sub(&sock->call_num, 1); - return; - } - - if (NETCONN_IS_DATAOUT(sock) || replenish_again > 0) { - if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1) { - msg->recall_flag = 1; - rpc_call(&stack->rpc_queue, msg); - return; - } - } - - __sync_fetch_and_sub(&sock->call_num, 1); - return; -} - -void stack_udp_send(struct rpc_msg *msg) -{ - int32_t fd = msg->args[MSG_ARG_0].i; - size_t len = msg->args[MSG_ARG_1].size; - struct protocol_stack *stack = get_protocol_stack(); - int 
replenish_again; - - struct lwip_sock *sock = lwip_get_socket(fd); - if (POSIX_IS_CLOSED(sock)) { - msg->result = -1; - return; - } - - if (get_protocol_stack_group()->latency_start) { - calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG); - } - - replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0); - if ((replenish_again > 0) && (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1)) { - rpc_call_replenish(&stack->rpc_queue, sock); - return; - } - - __sync_fetch_and_sub(&sock->call_num, 1); - return; -} - -/* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */ -void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack) -{ - struct protocol_stack_group *stack_group = get_protocol_stack_group(); - struct rte_mbuf *mbuf_copy = NULL; - struct protocol_stack *stack = NULL; - int32_t ret; - - for (int32_t i = 0; i < stack_group->stack_num; i++) { - stack = stack_group->stacks[i]; - if (cur_stack == stack) { - continue; - } - - /* stack maybe not init in app thread yet */ - if (stack == NULL || !(netif_is_up(&stack->netif))) { - continue; - } - - ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true); - if (ret != 0) { - stack->stats.rx_allocmbuf_fail++; - return; - } - copy_mbuf(mbuf_copy, mbuf); - - ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy); - if (ret != 0) { - rte_pktmbuf_free(mbuf_copy); - return; - } - } -#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0) - if (get_global_cfg_params()->kni_switch) { - ret = dpdk_alloc_pktmbuf(cur_stack->rxtx_mbuf_pool, &mbuf_copy, 1, true); - if (ret != 0) { - cur_stack->stats.rx_allocmbuf_fail++; - return; - } - copy_mbuf(mbuf_copy, mbuf); - kni_handle_tx(mbuf_copy); - } -#endif - if (get_global_cfg_params()->flow_bifurcation) { - ret = dpdk_alloc_pktmbuf(cur_stack->rxtx_mbuf_pool, &mbuf_copy, 1, true); - if (ret != 0) { - cur_stack->stats.rx_allocmbuf_fail++; - return; - 
} - copy_mbuf(mbuf_copy, mbuf); - virtio_tap_process_tx(cur_stack->queue_id, mbuf_copy); - } - return; -} - -void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup) -{ - struct protocol_stack_group *stack_group = get_protocol_stack_group(); - struct protocol_stack *stack = NULL; - - for (int32_t i = 0; i < stack_group->stack_num; i++) { - stack = stack_group->stacks[i]; - rpc_call_clean_epoll(&stack->rpc_queue, wakeup); - } -} - -void stack_clean_epoll(struct rpc_msg *msg) -{ - struct protocol_stack *stack = get_protocol_stack(); - struct wakeup_poll *wakeup = (struct wakeup_poll *)msg->args[MSG_ARG_0].p; - - list_del_node(&wakeup->wakeup_list[stack->stack_idx]); -} - -void stack_mempool_size(struct rpc_msg *msg) -{ - struct protocol_stack *stack = get_protocol_stack(); - - msg->result = rte_mempool_avail_count(stack->rxtx_mbuf_pool); -} - -void stack_create_shadow_fd(struct rpc_msg *msg) -{ - int32_t fd = msg->args[MSG_ARG_0].i; - struct sockaddr *addr = msg->args[MSG_ARG_1].p; - socklen_t addr_len = msg->args[MSG_ARG_2].socklen; - - int32_t clone_fd = 0; - struct lwip_sock *sock = lwip_get_socket(fd); - if (sock == NULL) { - LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d\n", fd); - msg->result = -1; - return; - } - - int domain = addr->sa_family; - int type = NETCONN_IS_UDP(sock) ? 
SOCK_DGRAM : SOCK_STREAM; - clone_fd = lwip_socket(domain, type, 0); - if (clone_fd < 0) { - LSTACK_LOG(ERR, LSTACK, "clone socket failed clone_fd=%d errno=%d\n", clone_fd, errno); - msg->result = clone_fd; - return; - } - - struct lwip_sock *clone_sock = lwip_get_socket(clone_fd); - if (clone_sock == NULL) { - LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d clone_fd=%d\n", fd, clone_fd); - msg->result = -1; - return; - } - - do_lwip_clone_sockopt(clone_sock, sock); - - while (sock->listen_next) { - sock = sock->listen_next; - } - sock->listen_next = clone_sock; - - int32_t ret = lwip_bind(clone_fd, addr, addr_len); - if (ret < 0) { - LSTACK_LOG(ERR, LSTACK, "clone bind failed clone_fd=%d errno=%d\n", clone_fd, errno); - msg->result = ret; - return; - } - - msg->result = clone_fd; -} - -void stack_replenish_sendring(struct rpc_msg *msg) -{ - struct protocol_stack *stack = get_protocol_stack(); - struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p; - - msg->result = do_lwip_replenish_sendring(stack, sock); - if (msg->result == true) { - msg->recall_flag = 1; - rpc_call(&stack->rpc_queue, msg); - } -} - -void stack_get_conntable(struct rpc_msg *msg) -{ - struct gazelle_stat_lstack_conn_info *conn = (struct gazelle_stat_lstack_conn_info *)msg->args[MSG_ARG_0].p; - uint32_t max_num = msg->args[MSG_ARG_1].u; - - msg->result = do_lwip_get_conntable(conn, max_num); -} - -void stack_get_connnum(struct rpc_msg *msg) -{ - msg->result = do_lwip_get_connnum(); -} - -void stack_recvlist_count(struct rpc_msg *msg) -{ - struct protocol_stack *stack = get_protocol_stack(); - struct list_node *list = &stack->recv_list; - uint32_t count = 0; - struct list_node *node; - struct list_node *temp; - - list_for_each_node(node, temp, list) { - count++; - } - - msg->result = count; -} - -/* when fd is listenfd, listenfd of all protocol stack thread will be closed */ -int32_t stack_broadcast_close(int32_t fd) -{ - int32_t ret = 0; - struct lwip_sock *sock = lwip_get_socket(fd); 
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - if (sock == NULL) { - GAZELLE_RETURN(EBADF); - } - - do { - sock = sock->listen_next; - if (stack == NULL || rpc_call_close(&stack->rpc_queue, fd)) { - ret = -1; - } - - if (POSIX_IS_CLOSED(sock)) { - break; - } - fd = sock->conn->callback_arg.socket; - stack = get_protocol_stack_by_fd(fd); - } while (1); - - return ret; -} - -int stack_broadcast_shutdown(int fd, int how) -{ - int32_t ret = 0; - struct lwip_sock *sock = lwip_get_socket(fd); - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - if (sock == NULL) { - GAZELLE_RETURN(EBADF); - } - - do { - sock = sock->listen_next; - if (stack == NULL || rpc_call_shutdown(&stack->rpc_queue, fd, how)) { - ret = -1; - } - - if (POSIX_IS_CLOSED(sock)) { - break; - } - fd = sock->conn->callback_arg.socket; - stack = get_protocol_stack_by_fd(fd); - } while (1); - - return ret; -} - -/* choice one stack listen */ -int32_t stack_single_listen(int32_t fd, int32_t backlog) -{ - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - if (stack == NULL) { - GAZELLE_RETURN(EBADF); - } - return rpc_call_listen(&stack->rpc_queue, fd, backlog); -} - -/* listen sync to all protocol stack thread, so that any protocol stack thread can build connect */ -int32_t stack_broadcast_listen(int32_t fd, int32_t backlog) -{ - typedef union sockaddr_union { - struct sockaddr sa; - struct sockaddr_in in; - struct sockaddr_in6 in6; - } sockaddr_t; - - struct protocol_stack *cur_stack = get_protocol_stack_by_fd(fd); - struct protocol_stack *stack = NULL; - sockaddr_t addr; - socklen_t addr_len = sizeof(addr); - int32_t ret, clone_fd; - - struct lwip_sock *sock = lwip_get_socket(fd); - if (sock == NULL || cur_stack == NULL) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd); - GAZELLE_RETURN(EBADF); - } - - ret = rpc_call_getsockname(&cur_stack->rpc_queue, fd, (struct sockaddr *)&addr, &addr_len); - if (ret != 0) { - return 
ret; - } - - struct protocol_stack_group *stack_group = get_protocol_stack_group(); - int min_conn_stk_idx = get_min_conn_stack(stack_group); - - for (int32_t i = 0; i < stack_group->stack_num; ++i) { - stack = stack_group->stacks[i]; - if (get_global_cfg_params()->seperate_send_recv && stack->is_send_thread) { - continue; - } - if (stack != cur_stack) { - clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, (struct sockaddr *)&addr, addr_len); - if (clone_fd < 0) { - stack_broadcast_close(fd); - return clone_fd; - } - } else { - clone_fd = fd; - } - - if (min_conn_stk_idx == i) { - lwip_get_socket(clone_fd)->conn->is_master_fd = 1; - } else { - lwip_get_socket(clone_fd)->conn->is_master_fd = 0; - } - - ret = rpc_call_listen(&stack->rpc_queue, clone_fd, backlog); - if (ret < 0) { - stack_broadcast_close(fd); - return ret; - } - } - return 0; -} - -static struct lwip_sock *get_min_accept_sock(int32_t fd) -{ - struct lwip_sock *sock = lwip_get_socket(fd); - struct lwip_sock *min_sock = NULL; - - while (sock) { - if (!NETCONN_IS_ACCEPTIN(sock)) { - sock = sock->listen_next; - continue; - } - - if (min_sock == NULL || min_sock->stack->conn_num > sock->stack->conn_num) { - min_sock = sock; - } - - sock = sock->listen_next; - } - - return min_sock; -} - -static void inline del_accept_in_event(struct lwip_sock *sock) -{ - pthread_spin_lock(&sock->wakeup->event_list_lock); - - if (!NETCONN_IS_ACCEPTIN(sock)) { - sock->events &= ~EPOLLIN; - if (sock->events == 0) { - list_del_node(&sock->event_list); - } - } - - pthread_spin_unlock(&sock->wakeup->event_list_lock); -} - -/* choice one stack bind */ -int32_t stack_single_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen) -{ - struct protocol_stack *stack = get_protocol_stack_by_fd(fd); - if (stack == NULL) { - GAZELLE_RETURN(EBADF); - } - return rpc_call_bind(&stack->rpc_queue, fd, name, namelen); -} - -/* bind sync to all protocol stack thread, so that any protocol stack thread can build connect */ -int32_t 
stack_broadcast_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen) -{ - struct protocol_stack *cur_stack = get_protocol_stack_by_fd(fd); - struct protocol_stack *stack = NULL; - int32_t ret, clone_fd; - - struct lwip_sock *sock = lwip_get_socket(fd); - if (sock == NULL || cur_stack == NULL) { - LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd); - GAZELLE_RETURN(EBADF); - } - - ret = rpc_call_bind(&cur_stack->rpc_queue, fd, name, namelen); - if (ret < 0) { - close(fd); - return ret; - } - - struct protocol_stack_group *stack_group = get_protocol_stack_group(); - for (int32_t i = 0; i < stack_group->stack_num; ++i) { - stack = stack_group->stacks[i]; - if (stack != cur_stack) { - clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, name, namelen); - if (clone_fd < 0) { - stack_broadcast_close(fd); - return clone_fd; - } - } - } - return 0; -} - -/* ergodic the protocol stack thread to find the connection, because all threads are listening */ -int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *addrlen, int flags) -{ - int32_t ret = -1; - struct lwip_sock *min_sock = NULL; - struct lwip_sock *sock = lwip_get_socket(fd); - struct protocol_stack *stack = NULL; - if (sock == NULL) { - GAZELLE_RETURN(EBADF); - } - - if (netconn_is_nonblocking(sock->conn)) { - min_sock = get_min_accept_sock(fd); - } else { - while ((min_sock = get_min_accept_sock(fd)) == NULL) { - lstack_block_wait(sock->wakeup, 0); - } - } - - if (min_sock && min_sock->conn) { - stack = get_protocol_stack_by_fd(min_sock->conn->callback_arg.socket); - if (stack == NULL) { - GAZELLE_RETURN(EBADF); - } - ret = rpc_call_accept(&stack->rpc_queue, min_sock->conn->callback_arg.socket, addr, addrlen, flags); - } - - if (min_sock && min_sock->wakeup && min_sock->wakeup->type == WAKEUP_EPOLL) { - del_accept_in_event(min_sock); - } - - if (ret < 0) { - errno = EAGAIN; - } - return ret; -} - -int32_t stack_broadcast_accept(int32_t fd, 
struct sockaddr *addr, socklen_t *addrlen) -{ - if (get_global_cfg_params()->nonblock_mode) - return stack_broadcast_accept4(fd, addr, addrlen, O_NONBLOCK); - else - return stack_broadcast_accept4(fd, addr, addrlen, 0); -} - -static void stack_all_fds_close(void) +void stack_exit(void) { + /* close all fd */ for (int i = 3; i < GAZELLE_MAX_CLIENTS + GAZELLE_RESERVED_CLIENTS; i++) { struct lwip_sock *sock = lwip_get_socket(i); if (!POSIX_IS_CLOSED(sock) && sock->stack == get_protocol_stack()) { @@ -1359,16 +734,6 @@ static void stack_all_fds_close(void) } } -static void stack_exit(void) -{ - stack_all_fds_close(); -} - -void stack_exit_by_rpc(struct rpc_msg *msg) -{ - stack_exit(); -} - void stack_group_exit(void) { int i; diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c index 8ac06cb..3e9889a 100644 --- a/src/lstack/core/lstack_thread_rpc.c +++ b/src/lstack/core/lstack_thread_rpc.c @@ -13,22 +13,27 @@ #include #include +#include "lwip/lwipgz_posix_api.h" + #include "lstack_log.h" #include "lstack_cfg.h" #include "lstack_dpdk.h" -#include "lstack_rpc_proc.h" #include "lstack_stack_stat.h" #include "lstack_protocol_stack.h" #include "lstack_thread_rpc.h" +#include "lstack_epoll.h" +#include "lstack_lwip.h" static PER_THREAD struct rpc_msg_pool *g_rpc_pool = NULL; static struct rpc_stats g_rpc_stats; + struct rpc_stats *rpc_stats_get(void) { return &g_rpc_stats; } -static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool) +__rte_always_inline +static struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool) { int ret; struct rpc_msg *msg = NULL; @@ -42,23 +47,11 @@ static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct static void rpc_msg_init(struct rpc_msg *msg, rpc_msg_func func, struct rpc_msg_pool *pool) { - msg->rpcpool = pool; - pthread_spin_init(&msg->lock, PTHREAD_PROCESS_PRIVATE); - msg->func = func; - msg->sync_flag = 1; + msg->func = func; + 
msg->rpcpool = pool; + msg->sync_flag = 1; msg->recall_flag = 0; -} - -static struct rpc_msg *rpc_msg_alloc_except(rpc_msg_func func) -{ - struct rpc_msg *msg = calloc(1, sizeof(struct rpc_msg)); - if (msg == NULL) { - return NULL; - } - - rpc_msg_init(msg, func, NULL); - - return msg; + pthread_spin_init(&msg->lock, PTHREAD_PROCESS_PRIVATE); } static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func) @@ -92,9 +85,27 @@ static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func) return msg; } -static inline __attribute__((always_inline)) int32_t rpc_sync_call(rpc_queue *queue, struct rpc_msg *msg) +__rte_always_inline +static void rpc_msg_free(struct rpc_msg *msg) +{ + pthread_spin_destroy(&msg->lock); + if (msg->rpcpool != NULL && msg->rpcpool->mempool != NULL) { + rte_mempool_put(msg->rpcpool->mempool, (void *)msg); + } else { + free(msg); + } +} + +__rte_always_inline +static void rpc_call(rpc_queue *queue, struct rpc_msg *msg) { - int32_t ret; + lockless_queue_mpsc_push(queue, &msg->queue_node); +} + +__rte_always_inline +static int rpc_sync_call(rpc_queue *queue, struct rpc_msg *msg) +{ + int ret; pthread_spin_trylock(&msg->lock); rpc_call(queue, msg); @@ -107,12 +118,39 @@ static inline __attribute__((always_inline)) int32_t rpc_sync_call(rpc_queue *qu return ret; } -int32_t rpc_msgcnt(rpc_queue *queue) +int rpc_msgcnt(rpc_queue *queue) { return lockless_queue_count(queue); } -int rpc_poll_msg(rpc_queue *queue, uint32_t max_num) +static struct rpc_msg *rpc_msg_alloc_except(rpc_msg_func func) +{ + struct rpc_msg *msg = calloc(1, sizeof(struct rpc_msg)); + if (msg == NULL) { + return NULL; + } + + rpc_msg_init(msg, func, NULL); + return msg; +} + +static void stack_exit_by_rpc(struct rpc_msg *msg) +{ + stack_exit(); +} + +int rpc_call_stack_exit(rpc_queue *queue) +{ + struct rpc_msg *msg = rpc_msg_alloc_except(stack_exit_by_rpc); + if (msg == NULL) { + return -1; + } + + rpc_call(queue, msg); + return 0; +} + +int rpc_poll_msg(rpc_queue *queue, int max_num) { int 
force_quit = 0; struct rpc_msg *msg = NULL; @@ -149,101 +187,165 @@ int rpc_poll_msg(rpc_queue *queue, uint32_t max_num) return force_quit; } -int32_t rpc_call_conntable(rpc_queue *queue, void *conn_table, uint32_t max_conn) + +static void callback_socket(struct rpc_msg *msg) { - struct rpc_msg *msg = rpc_msg_alloc(stack_get_conntable); - if (msg == NULL) { - return -1; + msg->result = lwip_socket(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i); + if (msg->result < 0) { + LSTACK_LOG(ERR, LSTACK, "tid %ld, %ld socket failed\n", get_stack_tid(), msg->result); } - - msg->args[MSG_ARG_0].p = conn_table; - msg->args[MSG_ARG_1].u = max_conn; - - return rpc_sync_call(queue, msg); } -int32_t rpc_call_connnum(rpc_queue *queue) +static void callback_close(struct rpc_msg *msg) { - struct rpc_msg *msg = rpc_msg_alloc(stack_get_connnum); - if (msg == NULL) { - return -1; + int fd = msg->args[MSG_ARG_0].i; + struct protocol_stack *stack = get_protocol_stack_by_fd(fd); + struct lwip_sock *sock = lwip_get_socket(fd); + + if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) { + msg->recall_flag = 1; + rpc_call(&stack->rpc_queue, msg); /* until stack_send recall finish */ + return; } - return rpc_sync_call(queue, msg); + msg->result = lwip_close(fd); + if (msg->result != 0) { + LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result); + } } -int32_t rpc_call_shadow_fd(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen) +static void callback_shutdown(struct rpc_msg *msg) { - struct rpc_msg *msg = rpc_msg_alloc(stack_create_shadow_fd); - if (msg == NULL) { - return -1; + int fd = msg->args[MSG_ARG_0].i; + int how = msg->args[MSG_ARG_1].i; + struct protocol_stack *stack = get_protocol_stack_by_fd(fd); + struct lwip_sock *sock = lwip_get_socket(fd); + + if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) { + msg->recall_flag = 1; + 
rpc_call(&stack->rpc_queue, msg); + return; } - msg->args[MSG_ARG_0].i = fd; - msg->args[MSG_ARG_1].cp = addr; - msg->args[MSG_ARG_2].socklen = addrlen; + msg->result = lwip_shutdown(fd, how); + if (msg->result != 0 && errno != ENOTCONN) { + LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), fd, msg->result); + } - return rpc_sync_call(queue, msg); + posix_api->shutdown_fn(fd, how); } -int32_t rpc_call_thread_regphase1(rpc_queue *queue, void *conn) +static void callback_bind(struct rpc_msg *msg) { - struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase1); - if (msg == NULL) { - return -1; + msg->result = lwip_bind(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].socklen); + if (msg->result != 0) { + LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result); } - msg->args[MSG_ARG_0].p = conn; - return rpc_sync_call(queue, msg); } -int32_t rpc_call_thread_regphase2(rpc_queue *queue, void *conn) +static void callback_listen(struct rpc_msg *msg) { - struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase2); - if (msg == NULL) { - return -1; + int fd = msg->args[MSG_ARG_0].i; + int backlog = msg->args[MSG_ARG_1].i; + + struct lwip_sock *sock = lwip_get_socket(fd); + if (sock == NULL) { + msg->result = -1; + return; + } + + /* new listen add to stack listen list */ + msg->result = lwip_listen(fd, backlog); + if (msg->result != 0) { + LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result); } - msg->args[MSG_ARG_0].p = conn; - return rpc_sync_call(queue, msg); } -int32_t rpc_call_mbufpoolsize(rpc_queue *queue) +static void callback_create_shadow_fd(struct rpc_msg *msg) { - struct rpc_msg *msg = rpc_msg_alloc(stack_mempool_size); - if (msg == NULL) { - return -1; + int fd = msg->args[MSG_ARG_0].i; + struct sockaddr *addr = msg->args[MSG_ARG_1].p; + socklen_t addr_len = msg->args[MSG_ARG_2].socklen; + + int clone_fd = 0; + 
struct lwip_sock *sock = lwip_get_socket(fd); + if (sock == NULL) { + LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d\n", fd); + msg->result = -1; + return; } - return rpc_sync_call(queue, msg); -} + int domain = addr->sa_family; + int type = NETCONN_IS_UDP(sock) ? SOCK_DGRAM : SOCK_STREAM; + clone_fd = lwip_socket(domain, type, 0); + if (clone_fd < 0) { + LSTACK_LOG(ERR, LSTACK, "clone socket failed clone_fd=%d errno=%d\n", clone_fd, errno); + msg->result = clone_fd; + return; + } -int32_t rpc_call_recvlistcnt(rpc_queue *queue) -{ - struct rpc_msg *msg = rpc_msg_alloc(stack_recvlist_count); - if (msg == NULL) { - return -1; + struct lwip_sock *clone_sock = lwip_get_socket(clone_fd); + if (clone_sock == NULL) { + LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d clone_fd=%d\n", fd, clone_fd); + msg->result = -1; + return; } - return rpc_sync_call(queue, msg); + do_lwip_clone_sockopt(clone_sock, sock); + + while (sock->listen_next) { + sock = sock->listen_next; + } + sock->listen_next = clone_sock; + + int ret = lwip_bind(clone_fd, addr, addr_len); + if (ret < 0) { + LSTACK_LOG(ERR, LSTACK, "clone bind failed clone_fd=%d errno=%d\n", clone_fd, errno); + msg->result = ret; + return; + } + + msg->result = clone_fd; } -int32_t rpc_call_arp(rpc_queue *queue, void *mbuf) +static void callback_accept(struct rpc_msg *msg) { - struct rpc_msg *msg = rpc_msg_alloc(stack_arp); - if (msg == NULL) { - return -1; + int fd = msg->args[MSG_ARG_0].i; + msg->result = -1; + struct protocol_stack *stack = get_protocol_stack(); + + int accept_fd = lwip_accept4(fd, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p, msg->args[MSG_ARG_3].i); + if (accept_fd < 0) { + stack->stats.accept_fail++; + LSTACK_LOG(ERR, LSTACK, "fd %d ret %d\n", fd, accept_fd); + return; } - msg->sync_flag = 0; - msg->args[MSG_ARG_0].p = mbuf; + struct lwip_sock *sock = lwip_get_socket(accept_fd); + if (sock == NULL || sock->stack == NULL) { + lwip_close(accept_fd); + LSTACK_LOG(ERR, LSTACK, "fd %d ret %d\n", fd, accept_fd); 
+ return; + } - rpc_call(queue, msg); + msg->result = accept_fd; + sock->stack->conn_num++; + if (rte_ring_count(sock->conn->recvmbox->ring)) { + do_lwip_add_recvlist(accept_fd); + } +} - return 0; +static void callback_connect(struct rpc_msg *msg) +{ + msg->result = lwip_connect(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].socklen); + if (msg->result < 0) { + msg->result = -errno; + } } -int32_t rpc_call_socket(rpc_queue *queue, int32_t domain, int32_t type, int32_t protocol) +int rpc_call_socket(rpc_queue *queue, int domain, int type, int protocol) { - struct rpc_msg *msg = rpc_msg_alloc(stack_socket); + struct rpc_msg *msg = rpc_msg_alloc(callback_socket); if (msg == NULL) { return -1; } @@ -255,9 +357,9 @@ int32_t rpc_call_socket(rpc_queue *queue, int32_t domain, int32_t type, int32_t return rpc_sync_call(queue, msg); } -int32_t rpc_call_close(rpc_queue *queue, int fd) +int rpc_call_close(rpc_queue *queue, int fd) { - struct rpc_msg *msg = rpc_msg_alloc(stack_close); + struct rpc_msg *msg = rpc_msg_alloc(callback_close); if (msg == NULL) { return -1; } @@ -267,20 +369,9 @@ int32_t rpc_call_close(rpc_queue *queue, int fd) return rpc_sync_call(queue, msg); } -int32_t rpc_call_stack_exit(rpc_queue *queue) +int rpc_call_shutdown(rpc_queue *queue, int fd, int how) { - struct rpc_msg *msg = rpc_msg_alloc_except(stack_exit_by_rpc); - if (msg == NULL) { - return -1; - } - - rpc_call(queue, msg); - return 0; -} - -int32_t rpc_call_shutdown(rpc_queue *queue, int fd, int how) -{ - struct rpc_msg *msg = rpc_msg_alloc(stack_shutdown); + struct rpc_msg *msg = rpc_msg_alloc(callback_shutdown); if (msg == NULL) { return -1; } @@ -291,48 +382,50 @@ int32_t rpc_call_shutdown(rpc_queue *queue, int fd, int how) return rpc_sync_call(queue, msg); } -void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup) +int rpc_call_bind(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen) { - struct rpc_msg *msg = rpc_msg_alloc(stack_clean_epoll); + 
struct rpc_msg *msg = rpc_msg_alloc(callback_bind); if (msg == NULL) { - return; + return -1; } - msg->args[MSG_ARG_0].p = wakeup; + msg->args[MSG_ARG_0].i = fd; + msg->args[MSG_ARG_1].cp = addr; + msg->args[MSG_ARG_2].socklen = addrlen; - rpc_sync_call(queue, msg); + return rpc_sync_call(queue, msg); } -int32_t rpc_call_bind(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen) +int rpc_call_listen(rpc_queue *queue, int s, int backlog) { - struct rpc_msg *msg = rpc_msg_alloc(stack_bind); + struct rpc_msg *msg = rpc_msg_alloc(callback_listen); if (msg == NULL) { return -1; } - msg->args[MSG_ARG_0].i = fd; - msg->args[MSG_ARG_1].cp = addr; - msg->args[MSG_ARG_2].socklen = addrlen; + msg->args[MSG_ARG_0].i = s; + msg->args[MSG_ARG_1].i = backlog; return rpc_sync_call(queue, msg); } -int32_t rpc_call_listen(rpc_queue *queue, int s, int backlog) +int rpc_call_shadow_fd(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen) { - struct rpc_msg *msg = rpc_msg_alloc(stack_listen); + struct rpc_msg *msg = rpc_msg_alloc(callback_create_shadow_fd); if (msg == NULL) { return -1; } - msg->args[MSG_ARG_0].i = s; - msg->args[MSG_ARG_1].i = backlog; + msg->args[MSG_ARG_0].i = fd; + msg->args[MSG_ARG_1].cp = addr; + msg->args[MSG_ARG_2].socklen = addrlen; return rpc_sync_call(queue, msg); } -int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags) +int rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags) { - struct rpc_msg *msg = rpc_msg_alloc(stack_accept); + struct rpc_msg *msg = rpc_msg_alloc(callback_accept); if (msg == NULL) { return -1; } @@ -345,9 +438,9 @@ int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen return rpc_sync_call(queue, msg); } -int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen) +int rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr 
*addr, socklen_t addrlen) { - struct rpc_msg *msg = rpc_msg_alloc(stack_connect); + struct rpc_msg *msg = rpc_msg_alloc(callback_connect); if (msg == NULL) { return -1; } @@ -356,7 +449,7 @@ int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, msg->args[MSG_ARG_1].cp = addr; msg->args[MSG_ARG_2].socklen = addrlen; - int32_t ret = rpc_sync_call(queue, msg); + int ret = rpc_sync_call(queue, msg); if (ret < 0) { errno = -ret; return -1; @@ -364,9 +457,45 @@ int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, return ret; } -int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen) +static void callback_getpeername(struct rpc_msg *msg) +{ + msg->result = lwip_getpeername(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p); + if (msg->result != 0) { + LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result); + } +} + +static void callback_getsockname(struct rpc_msg *msg) { - struct rpc_msg *msg = rpc_msg_alloc(stack_getpeername); + msg->result = lwip_getsockname(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p); + if (msg->result != 0) { + LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result); + } +} + +static void callback_getsockopt(struct rpc_msg *msg) +{ + msg->result = lwip_getsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, + msg->args[MSG_ARG_3].p, msg->args[MSG_ARG_4].p); + if (msg->result != 0) { + LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(), + msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result); + } +} + +static void callback_setsockopt(struct rpc_msg *msg) +{ + msg->result = lwip_setsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, + msg->args[MSG_ARG_3].cp, 
msg->args[MSG_ARG_4].socklen); + if (msg->result != 0) { + LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(), + msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result); + } +} + +int rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen) +{ + struct rpc_msg *msg = rpc_msg_alloc(callback_getpeername); if (msg == NULL) { return -1; } @@ -378,9 +507,9 @@ int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, so return rpc_sync_call(queue, msg); } -int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen) +int rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen) { - struct rpc_msg *msg = rpc_msg_alloc(stack_getsockname); + struct rpc_msg *msg = rpc_msg_alloc(callback_getsockname); if (msg == NULL) { return -1; } @@ -392,9 +521,9 @@ int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, so return rpc_sync_call(queue, msg); } -int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen) +int rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen) { - struct rpc_msg *msg = rpc_msg_alloc(stack_getsockopt); + struct rpc_msg *msg = rpc_msg_alloc(callback_getsockopt); if (msg == NULL) { return -1; } @@ -408,9 +537,9 @@ int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, vo return rpc_sync_call(queue, msg); } -int32_t rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen) +int rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen) { - struct rpc_msg *msg = rpc_msg_alloc(stack_setsockopt); + struct rpc_msg *msg = rpc_msg_alloc(callback_setsockopt); if (msg == NULL) { return -1; } @@ -424,51 +553,91 @@ int32_t 
rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, co return rpc_sync_call(queue, msg); } -int32_t rpc_call_fcntl(rpc_queue *queue, int fd, int cmd, long val) +static void callback_tcp_send(struct rpc_msg *msg) { - struct rpc_msg *msg = rpc_msg_alloc(stack_fcntl); - if (msg == NULL) { - return -1; + int fd = msg->args[MSG_ARG_0].i; + size_t len = msg->args[MSG_ARG_1].size; + struct protocol_stack *stack = get_protocol_stack(); + int replenish_again; + + struct lwip_sock *sock = lwip_get_socket(fd); + if (POSIX_IS_CLOSED(sock)) { + msg->result = -1; + return; } - msg->args[MSG_ARG_0].i = fd; - msg->args[MSG_ARG_1].i = cmd; - msg->args[MSG_ARG_2].l = val; + if (get_protocol_stack_group()->latency_start) { + calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG); + } - return rpc_sync_call(queue, msg); + replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0); + if (replenish_again < 0) { + __sync_fetch_and_sub(&sock->call_num, 1); + return; + } + + if (NETCONN_IS_DATAOUT(sock) || replenish_again > 0) { + if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1) { + msg->recall_flag = 1; + rpc_call(&stack->rpc_queue, msg); + return; + } + } + + __sync_fetch_and_sub(&sock->call_num, 1); + return; } -int32_t rpc_call_ioctl(rpc_queue *queue, int fd, long cmd, void *argp) +static void callback_udp_send(struct rpc_msg *msg) { - struct rpc_msg *msg = rpc_msg_alloc(stack_ioctl); - if (msg == NULL) { - return -1; + int fd = msg->args[MSG_ARG_0].i; + size_t len = msg->args[MSG_ARG_1].size; + struct protocol_stack *stack = get_protocol_stack(); + int replenish_again; + + struct lwip_sock *sock = lwip_get_socket(fd); + if (POSIX_IS_CLOSED(sock)) { + msg->result = -1; + return; } - msg->args[MSG_ARG_0].i = fd; - msg->args[MSG_ARG_1].l = cmd; - msg->args[MSG_ARG_2].p = argp; + if (get_protocol_stack_group()->latency_start) { + calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG); + } - 
return rpc_sync_call(queue, msg); + replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0); + if ((replenish_again > 0) && (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1)) { + rpc_call_replenish(&stack->rpc_queue, sock); + return; + } + + __sync_fetch_and_sub(&sock->call_num, 1); + return; } -int32_t rpc_call_replenish(rpc_queue *queue, void *sock) +int rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags) { - struct rpc_msg *msg = rpc_msg_alloc(stack_replenish_sendring); + struct rpc_msg *msg = rpc_msg_alloc(callback_udp_send); if (msg == NULL) { return -1; } - msg->args[MSG_ARG_0].p = sock; + if (get_protocol_stack_group()->latency_start) { + time_stamp_into_rpcmsg(lwip_get_socket(fd)); + } + + msg->args[MSG_ARG_0].i = fd; + msg->args[MSG_ARG_1].size = len; + msg->args[MSG_ARG_2].i = flags; msg->sync_flag = 0; rpc_call(queue, msg); return 0; } -int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags) +int rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags) { - struct rpc_msg *msg = rpc_msg_alloc(stack_tcp_send); + struct rpc_msg *msg = rpc_msg_alloc(callback_tcp_send); if (msg == NULL) { return -1; } @@ -483,28 +652,173 @@ int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags) msg->sync_flag = 0; rpc_call(queue, msg); + return 0; +} + +static void callback_replenish_sendring(struct rpc_msg *msg) +{ + struct protocol_stack *stack = get_protocol_stack(); + struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p; + + msg->result = do_lwip_replenish_sendring(stack, sock); + if (msg->result == true) { + msg->recall_flag = 1; + rpc_call(&stack->rpc_queue, msg); + } +} + +int rpc_call_replenish(rpc_queue *queue, void *sock) +{ + struct rpc_msg *msg = rpc_msg_alloc(callback_replenish_sendring); + if (msg == NULL) { + return -1; + } + msg->args[MSG_ARG_0].p = sock; + msg->sync_flag = 0; + + rpc_call(queue, msg); return 0; } -int32_t 
rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags) +static void callback_recvlist_count(struct rpc_msg *msg) +{ + struct protocol_stack *stack = get_protocol_stack(); + struct list_node *list = &stack->recv_list; + int count = 0; + struct list_node *node; + struct list_node *temp; + + list_for_each_node(node, temp, list) { + count++; + } + msg->result = count; +} + +int rpc_call_recvlistcnt(rpc_queue *queue) { - struct rpc_msg *msg = rpc_msg_alloc(stack_udp_send); + struct rpc_msg *msg = rpc_msg_alloc(callback_recvlist_count); if (msg == NULL) { return -1; } - if (get_protocol_stack_group()->latency_start) { - time_stamp_into_rpcmsg(lwip_get_socket(fd)); + return rpc_sync_call(queue, msg); +} + +static void callback_clean_epoll(struct rpc_msg *msg) +{ + struct protocol_stack *stack = get_protocol_stack(); + struct wakeup_poll *wakeup = (struct wakeup_poll *)msg->args[MSG_ARG_0].p; + + list_del_node(&wakeup->wakeup_list[stack->stack_idx]); +} + +void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup) +{ + struct rpc_msg *msg = rpc_msg_alloc(callback_clean_epoll); + if (msg == NULL) { + return; + } + + msg->args[MSG_ARG_0].p = wakeup; + + rpc_sync_call(queue, msg); +} + +static void callback_arp(struct rpc_msg *msg) +{ + struct rte_mbuf *mbuf = (struct rte_mbuf *)msg->args[MSG_ARG_0].p; + struct protocol_stack *stack = get_protocol_stack(); + + eth_dev_recv(mbuf, stack); +} + +int rpc_call_arp(rpc_queue *queue, void *mbuf) +{ + struct rpc_msg *msg = rpc_msg_alloc(callback_arp); + if (msg == NULL) { + return -1; } - msg->args[MSG_ARG_0].i = fd; - msg->args[MSG_ARG_1].size = len; - msg->args[MSG_ARG_2].i = flags; msg->sync_flag = 0; + msg->args[MSG_ARG_0].p = mbuf; rpc_call(queue, msg); return 0; } +static void callback_mempool_size(struct rpc_msg *msg) +{ + struct protocol_stack *stack = get_protocol_stack(); + + msg->result = rte_mempool_avail_count(stack->rxtx_mbuf_pool); +} + +static void callback_get_conntable(struct rpc_msg *msg) +{ + struct 
gazelle_stat_lstack_conn_info *conn = (struct gazelle_stat_lstack_conn_info *)msg->args[MSG_ARG_0].p; + unsigned max_num = msg->args[MSG_ARG_1].u; + + msg->result = do_lwip_get_conntable(conn, max_num); +} + +static void callback_get_connnum(struct rpc_msg *msg) +{ + msg->result = do_lwip_get_connnum(); +} + +int rpc_call_conntable(rpc_queue *queue, void *conn_table, unsigned max_conn) +{ + struct rpc_msg *msg = rpc_msg_alloc(callback_get_conntable); + if (msg == NULL) { + return -1; + } + + msg->args[MSG_ARG_0].p = conn_table; + msg->args[MSG_ARG_1].u = max_conn; + + return rpc_sync_call(queue, msg); +} + +int rpc_call_connnum(rpc_queue *queue) +{ + struct rpc_msg *msg = rpc_msg_alloc(callback_get_connnum); + if (msg == NULL) { + return -1; + } + + return rpc_sync_call(queue, msg); +} + +int rpc_call_mbufpoolsize(rpc_queue *queue) +{ + struct rpc_msg *msg = rpc_msg_alloc(callback_mempool_size); + if (msg == NULL) { + return -1; + } + + return rpc_sync_call(queue, msg); +} + +extern void thread_register_phase1(struct rpc_msg *msg); +int rpc_call_thread_regphase1(rpc_queue *queue, void *conn) +{ + struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase1); + if (msg == NULL) { + return -1; + } + msg->args[MSG_ARG_0].p = conn; + return rpc_sync_call(queue, msg); +} + +extern void thread_register_phase2(struct rpc_msg *msg); +int rpc_call_thread_regphase2(rpc_queue *queue, void *conn) +{ + struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase2); + if (msg == NULL) { + return -1; + } + msg->args[MSG_ARG_0].p = conn; + return rpc_sync_call(queue, msg); +} diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h index d058409..965a0cb 100644 --- a/src/lstack/include/lstack_dpdk.h +++ b/src/lstack/include/lstack_dpdk.h @@ -13,7 +13,10 @@ #ifndef _GAZELLE_DPDK_H_ #define _GAZELLE_DPDK_H_ -#include +#include +#include +#include + #include "common/gazelle_opt.h" #include "common/gazelle_dfx_msg.h" @@ -32,32 +35,34 @@ */ #define MBUF_MAX_NUM 
0xfffffff +struct protocol_stack; + +int32_t dpdk_eal_init(void); +void lstack_log_level_init(void); + +int dpdk_ethdev_init(int port_id); +int dpdk_ethdev_start(void); +int init_dpdk_ethdev(void); + int thread_affinity_default(void); int thread_affinity_init(int cpu_id); -struct protocol_stack; -struct rte_mempool; -struct rte_ring; -struct rte_mbuf; +int32_t create_shared_ring(struct protocol_stack *stack); int32_t fill_mbuf_to_ring(struct rte_mempool *mempool, struct rte_ring *ring, uint32_t mbuf_num); -int32_t dpdk_eal_init(void); int32_t pktmbuf_pool_init(struct protocol_stack *stack); struct rte_mempool *create_mempool(const char *name, uint32_t count, uint32_t size, uint32_t flags, int32_t idx); -int32_t create_shared_ring(struct protocol_stack *stack); -void lstack_log_level_init(void); -int dpdk_ethdev_init(int port_id); -int dpdk_ethdev_start(void); +struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf, + uint32_t mbuf_cache_size, uint16_t queue_id, unsigned numa_id); +int32_t dpdk_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num, bool reserve); + #if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0) void dpdk_skip_nic_init(void); void dpdk_restore_pci(void); #endif int32_t dpdk_init_lstack_kni(void); -bool port_in_stack_queue(gz_addr_t *src_ip, gz_addr_t *dst_ip, uint16_t src_port, uint16_t dst_port); -struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf, - uint32_t mbuf_cache_size, uint16_t queue_id, unsigned numa_id); void dpdk_nic_xstats_get(struct gazelle_stack_dfx_data *dfx, uint16_t port_id); -int32_t dpdk_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num, bool reserve); void dpdk_nic_features_get(struct gazelle_stack_dfx_data *dfx, uint16_t port_id); + #endif /* GAZELLE_DPDK_H */ diff --git a/src/lstack/include/lstack_epoll.h b/src/lstack/include/lstack_epoll.h index 6e02615..e7ae26b 100644 --- a/src/lstack/include/lstack_epoll.h +++ 
b/src/lstack/include/lstack_epoll.h @@ -19,14 +19,11 @@ #include #include +#include #include "common/gazelle_dfx_msg.h" #include "common/gazelle_opt.h" -#ifdef __cplusplus -extern "C" { -#endif - enum wakeup_type { WAKEUP_EPOLL = 0, WAKEUP_POLL, @@ -61,9 +58,6 @@ struct wakeup_poll { pthread_spinlock_t event_list_lock; }; -struct netconn; -struct lwip_sock; - void add_sock_event(struct lwip_sock *sock, uint32_t event); void add_sock_event_nolock(struct lwip_sock *sock, uint32_t event); void del_sock_event(struct lwip_sock *sock, uint32_t event); @@ -91,8 +85,4 @@ static inline void lstack_block_wakeup(struct wakeup_poll *wakeup) } } -#ifdef __cplusplus -} -#endif - #endif /* _GAZELLE_EPOLL_H_ */ diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h index b972f11..0c7bb62 100644 --- a/src/lstack/include/lstack_lwip.h +++ b/src/lstack/include/lstack_lwip.h @@ -15,8 +15,12 @@ #include #include "common/gazelle_dfx_msg.h" +#include "common/dpdk_common.h" struct lwip_sock; +struct rpc_msg; +struct protocol_stack; + unsigned same_node_ring_count(struct lwip_sock *sock); #define NETCONN_IS_ACCEPTIN(sock) (((sock)->conn->acceptmbox != NULL) && !sys_mbox_empty((sock)->conn->acceptmbox)) @@ -25,11 +29,6 @@ unsigned same_node_ring_count(struct lwip_sock *sock); #define NETCONN_IS_OUTIDLE(sock) gazelle_ring_readable_count((sock)->send_ring) #define NETCONN_IS_UDP(sock) (NETCONNTYPE_GROUP(netconn_type((sock)->conn)) == NETCONN_UDP) -struct rte_mempool; -struct rpc_msg; -struct rte_mbuf; -struct protocol_stack; - void do_lwip_clone_sockopt(struct lwip_sock *dst_sock, struct lwip_sock *src_sock); struct pbuf *do_lwip_tcp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size); diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h index 7dce757..fdd5388 100644 --- a/src/lstack/include/lstack_protocol_stack.h +++ b/src/lstack/include/lstack_protocol_stack.h @@ -17,6 +17,10 @@ #include #include 
+#include +#include +#include + #include #include @@ -35,10 +39,6 @@ #define MBUFPOOL_RESERVE_NUM (get_global_cfg_params()->nic.rxqueue_size + 1024) -struct rte_mempool; -struct rte_ring; -struct rte_mbuf; - struct protocol_stack { uint32_t tid; uint16_t queue_id; @@ -111,50 +111,23 @@ struct protocol_stack_group { }; long get_stack_tid(void); + struct protocol_stack *get_protocol_stack(void); struct protocol_stack *get_protocol_stack_by_fd(int32_t fd); struct protocol_stack *get_bind_protocol_stack(void); struct protocol_stack_group *get_protocol_stack_group(void); +int get_min_conn_stack(struct protocol_stack_group *stack_group); +void bind_to_stack_numa(struct protocol_stack *stack); +void thread_bind_stack(struct protocol_stack *stack); + int32_t stack_group_init(void); void stack_group_exit(void); +void stack_exit(void); + int32_t stack_setup_thread(void); int32_t stack_setup_app_thread(void); -void bind_to_stack_numa(struct protocol_stack *stack); -int32_t init_dpdk_ethdev(void); - -void wait_sem_value(sem_t *sem, int32_t wait_value); - -/* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */ -void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack); - -/* when fd is listenfd, listenfd of all protocol stack thread will be closed */ -int32_t stack_broadcast_close(int32_t fd); - -int stack_broadcast_shutdown(int fd, int how); - -/* listen sync to all protocol stack thread, so that any protocol stack thread can build connect */ -int32_t stack_broadcast_listen(int32_t fd, int backlog); -int32_t stack_single_listen(int32_t fd, int32_t backlog); - -/* bind sync to all protocol stack thread, only for udp protocol */ -int32_t stack_broadcast_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen); -int32_t stack_single_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen); - -/* ergodic the protocol stack thread to find the connection, because all threads are 
listening */ -int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen); -int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *addrlen, int32_t flags); - -struct wakeup_poll; -void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup); - -void stack_send_pkts(struct protocol_stack *stack); - -struct thread_params { - uint16_t queue_id; - uint16_t idx; -}; - int stack_polling(uint32_t wakeup_tick); + #endif diff --git a/src/lstack/include/lstack_rpc_proc.h b/src/lstack/include/lstack_rpc_proc.h deleted file mode 100644 index 77b18bd..0000000 --- a/src/lstack/include/lstack_rpc_proc.h +++ /dev/null @@ -1,47 +0,0 @@ -/* -* Copyright (c) Huawei Technologies Co., Ltd. 2020-2021. All rights reserved. -* gazelle is licensed under the Mulan PSL v2. -* You can use this software according to the terms and conditions of the Mulan PSL v2. -* You may obtain a copy of Mulan PSL v2 at: -* http://license.coscl.org.cn/MulanPSL2 -* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR -* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR -* PURPOSE. -* See the Mulan PSL v2 for more details. 
-*/ - -#ifndef __GAZELLE_RPC_PROC_H__ -#define __GAZELLE_RPC_PROC_H__ -#include "lstack_thread_rpc.h" - -void stack_clean_epoll(struct rpc_msg *msg); -void stack_arp(struct rpc_msg *msg); -void stack_socket(struct rpc_msg *msg); -void stack_close(struct rpc_msg *msg); -void stack_shutdown(struct rpc_msg *msg); -void stack_bind(struct rpc_msg *msg); -void stack_listen(struct rpc_msg *msg); -void stack_accept(struct rpc_msg *msg); -void stack_connect(struct rpc_msg *msg); -void stack_recv(struct rpc_msg *msg); -void stack_getpeername(struct rpc_msg *msg); -void stack_getsockname(struct rpc_msg *msg); -void stack_getsockopt(struct rpc_msg *msg); -void stack_setsockopt(struct rpc_msg *msg); -void stack_fcntl(struct rpc_msg *msg); -void stack_ioctl(struct rpc_msg *msg); -void stack_tcp_send(struct rpc_msg *msg); -void stack_udp_send(struct rpc_msg *msg); -void stack_mempool_size(struct rpc_msg *msg); -void stack_rpcpool_size(struct rpc_msg *msg); -void stack_create_shadow_fd(struct rpc_msg *msg); -void stack_replenish_sendring(struct rpc_msg *msg); -void stack_get_conntable(struct rpc_msg *msg); -void stack_get_connnum(struct rpc_msg *msg); -void stack_recvlist_count(struct rpc_msg *msg); -void stack_exit_by_rpc(struct rpc_msg *msg); - -void thread_register_phase1(struct rpc_msg *msg); -void thread_register_phase2(struct rpc_msg *msg); - -#endif diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h index d268366..c2654bb 100644 --- a/src/lstack/include/lstack_thread_rpc.h +++ b/src/lstack/include/lstack_thread_rpc.h @@ -35,8 +35,8 @@ struct rpc_stats { struct rpc_msg; typedef void (*rpc_msg_func)(struct rpc_msg *msg); union rpc_msg_arg { - int32_t i; - uint32_t u; + int i; + unsigned int u; long l; unsigned long ul; void *p; @@ -63,50 +63,43 @@ static inline void rpc_queue_init(rpc_queue *queue) { lockless_queue_init(queue); } - struct rpc_stats *rpc_stats_get(void); -int32_t rpc_msgcnt(rpc_queue *queue); -int 
rpc_poll_msg(rpc_queue *queue, uint32_t max_num); +int rpc_msgcnt(rpc_queue *queue); +int rpc_poll_msg(rpc_queue *queue, int max_num); + +int rpc_call_stack_exit(rpc_queue *queue); + +/* #include <sys/socket.h> will conflict with lwip/sockets.h */ +struct sockaddr; + +int rpc_call_close(rpc_queue *queue, int fd); +int rpc_call_shutdown(rpc_queue *queue, int fd, int how); +int rpc_call_socket(rpc_queue *queue, int domain, int type, int protocol); +int rpc_call_bind(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen); +int rpc_call_listen(rpc_queue *queue, int s, int backlog); +int rpc_call_shadow_fd(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen); +int rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags); +int rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen); + +int rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen); +int rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen); +int rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen); +int rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen); + +int rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags); +int rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags); + +int rpc_call_replenish(rpc_queue *queue, void *sock); +int rpc_call_recvlistcnt(rpc_queue *queue); + void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup); -int32_t rpc_call_shadow_fd(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen); -int32_t rpc_call_recvlistcnt(rpc_queue *queue); -int32_t rpc_call_thread_regphase1(rpc_queue *queue, void *conn); -int32_t rpc_call_thread_regphase2(rpc_queue *queue, void *conn); -int32_t rpc_call_conntable(rpc_queue *queue, void *conn_table, uint32_t max_conn); -int32_t 
rpc_call_connnum(rpc_queue *queue); -int32_t rpc_call_arp(rpc_queue *queue, void *mbuf); -int32_t rpc_call_socket(rpc_queue *queue, int32_t domain, int32_t type, int32_t protocol); -int32_t rpc_call_close(rpc_queue *queue, int32_t fd); -int32_t rpc_call_shutdown(rpc_queue *queue, int fd, int how); -int32_t rpc_call_bind(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen); -int32_t rpc_call_listen(rpc_queue *queue, int s, int backlog); -int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags); -int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen); -int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags); -int32_t rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags); -int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen); -int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen); -int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen); -int32_t rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen); -int32_t rpc_call_fcntl(rpc_queue *queue, int fd, int cmd, long val); -int32_t rpc_call_ioctl(rpc_queue *queue, int fd, long cmd, void *argp); -int32_t rpc_call_replenish(rpc_queue *queue, void *sock); -int32_t rpc_call_mbufpoolsize(rpc_queue *queue); -int32_t rpc_call_stack_exit(rpc_queue *queue); - -static inline __attribute__((always_inline)) void rpc_call(rpc_queue *queue, struct rpc_msg *msg) -{ - lockless_queue_mpsc_push(queue, &msg->queue_node); -} +int rpc_call_arp(rpc_queue *queue, void *mbuf); -static inline __attribute__((always_inline)) void rpc_msg_free(struct rpc_msg *msg) -{ - pthread_spin_destroy(&msg->lock); - if (msg->rpcpool != NULL && msg->rpcpool->mempool != NULL) { - rte_mempool_put(msg->rpcpool->mempool, (void 
*)msg); - } else { - free(msg); - } -} +int rpc_call_conntable(rpc_queue *queue, void *conn_table, unsigned max_conn); +int rpc_call_connnum(rpc_queue *queue); +int rpc_call_mbufpoolsize(rpc_queue *queue); + +int rpc_call_thread_regphase1(rpc_queue *queue, void *conn); +int rpc_call_thread_regphase2(rpc_queue *queue, void *conn); #endif diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c index cf66e15..4f3cbc1 100644 --- a/src/lstack/netif/lstack_ethdev.c +++ b/src/lstack/netif/lstack_ethdev.c @@ -43,6 +43,62 @@ #define MBUF_MAX_LEN 1514 #define PACKET_READ_SIZE 32 +/* Hand a copy of a received ARP packet to every other protocol stack thread whose netif is up (via rpc_call_arp), so that each thread can learn the ARP entry. + * A copy also goes to kni_handle_tx() (DPDK < 23.11 with kni_switch) and to virtio_tap_process_tx() (flow_bifurcation); any mbuf allocation or rpc_call_arp() failure returns at once, skipping the remaining targets. */ +static void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack) +{ + struct protocol_stack_group *stack_group = get_protocol_stack_group(); + struct rte_mbuf *mbuf_copy = NULL; + struct protocol_stack *stack = NULL; + int32_t ret; + + for (int32_t i = 0; i < stack_group->stack_num; i++) { + stack = stack_group->stacks[i]; + if (cur_stack == stack) { + continue; + } + + /* skip stacks that may not be initialized yet in app thread, or whose netif is not up */ + if (stack == NULL || !(netif_is_up(&stack->netif))) { + continue; + } + + ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true); + if (ret != 0) { + stack->stats.rx_allocmbuf_fail++; + return; + } + copy_mbuf(mbuf_copy, mbuf); + + ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy); + if (ret != 0) { + rte_pktmbuf_free(mbuf_copy); + return; + } + } +#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0) + if (get_global_cfg_params()->kni_switch) { + ret = dpdk_alloc_pktmbuf(cur_stack->rxtx_mbuf_pool, &mbuf_copy, 1, true); + if (ret != 0) { + cur_stack->stats.rx_allocmbuf_fail++; + return; + } + copy_mbuf(mbuf_copy, mbuf); + kni_handle_tx(mbuf_copy); + } +#endif + if (get_global_cfg_params()->flow_bifurcation) { + ret = dpdk_alloc_pktmbuf(cur_stack->rxtx_mbuf_pool, &mbuf_copy, 1, true); + if (ret != 0) { + 
cur_stack->stats.rx_allocmbuf_fail++; + return; + } + copy_mbuf(mbuf_copy, mbuf); + virtio_tap_process_tx(cur_stack->queue_id, mbuf_copy); + } + return; +} + void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack) { int32_t ret; -- 2.33.0