694 lines
24 KiB
Diff
694 lines
24 KiB
Diff
From 710bb34f1b46e0df4d82fe46fc36e729160ea1d7 Mon Sep 17 00:00:00 2001
|
|
From: Lemmy Huang <hlm3280@163.com>
|
|
Date: Sun, 1 Sep 2024 00:21:52 +0800
|
|
Subject: [PATCH] cleancode: add rpc_async_call, remove rpc_msg_arg.socklen,
|
|
fix some format
|
|
|
|
Signed-off-by: Lemmy Huang <hlm3280@163.com>
|
|
---
|
|
src/lstack/core/lstack_lwip.c | 27 +----
|
|
src/lstack/core/lstack_protocol_stack.c | 25 ++---
|
|
src/lstack/core/lstack_thread_rpc.c | 124 +++++++++++----------
|
|
src/lstack/include/lstack_lwip.h | 27 +++--
|
|
src/lstack/include/lstack_protocol_stack.h | 13 +--
|
|
src/lstack/include/lstack_thread_rpc.h | 25 +++--
|
|
6 files changed, 117 insertions(+), 124 deletions(-)
|
|
|
|
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
|
|
index 3454961..91f4838 100644
|
|
--- a/src/lstack/core/lstack_lwip.c
|
|
+++ b/src/lstack/core/lstack_lwip.c
|
|
@@ -588,23 +588,6 @@ bool do_lwip_replenish_sendring(struct protocol_stack *stack, struct lwip_sock *
|
|
return replenish_again;
|
|
}
|
|
|
|
-int do_lwip_send(struct protocol_stack *stack, int32_t fd, struct lwip_sock *sock,
|
|
- size_t len, int32_t flags)
|
|
-{
|
|
- ssize_t ret;
|
|
- /* send all send_ring, so len set lwip send max. */
|
|
- if (NETCONN_IS_UDP(sock)) {
|
|
- ret = lwip_send(fd, sock, len, flags);
|
|
- } else {
|
|
- ret = lwip_send(fd, sock, UINT16_MAX, flags);
|
|
- }
|
|
- if (ret < 0 && (errno == ENOTCONN || errno == ECONNRESET || errno == ECONNABORTED)) {
|
|
- return -1;
|
|
- }
|
|
-
|
|
- return do_lwip_replenish_sendring(stack, sock);
|
|
-}
|
|
-
|
|
static inline void free_recv_ring_readover(struct rte_ring *ring)
|
|
{
|
|
void *pbufs[SOCK_RECV_RING_SIZE];
|
|
@@ -753,10 +736,10 @@ static inline void notice_stack_tcp_send(struct lwip_sock *sock, int32_t fd, int
|
|
|
|
static inline void notice_stack_udp_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
|
|
{
|
|
- __sync_fetch_and_add(&sock->call_num, 1);
|
|
- while (rpc_call_udp_send(&sock->stack->rpc_queue, fd, len, flags) < 0) {
|
|
- usleep(1000); // 1000: wait 1ms to exec again
|
|
- }
|
|
+ __sync_fetch_and_add(&sock->call_num, 1);
|
|
+ while (rpc_call_udp_send(&sock->stack->rpc_queue, fd, len, flags) < 0) {
|
|
+ usleep(1000); // 1000: wait 1ms to exec again
|
|
+ }
|
|
}
|
|
|
|
static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
|
|
@@ -875,7 +858,7 @@ ssize_t do_lwip_send_to_stack(int32_t fd, const void *buf, size_t len, int32_t f
|
|
// send = 0 : tcp peer close connection ?
|
|
if (send <= 0) {
|
|
return send;
|
|
- }
|
|
+ }
|
|
}
|
|
|
|
notice_stack_send(sock, fd, send, flags);
|
|
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
|
|
index bcca1a7..f1eeba1 100644
|
|
--- a/src/lstack/core/lstack_protocol_stack.c
|
|
+++ b/src/lstack/core/lstack_protocol_stack.c
|
|
@@ -88,7 +88,7 @@ struct protocol_stack *get_protocol_stack(void)
|
|
return g_stack_p;
|
|
}
|
|
|
|
-struct protocol_stack *get_protocol_stack_by_fd(int32_t fd)
|
|
+struct protocol_stack *get_protocol_stack_by_fd(int fd)
|
|
{
|
|
struct lwip_sock *sock = lwip_get_socket(fd);
|
|
if (POSIX_IS_CLOSED(sock)) {
|
|
@@ -468,7 +468,7 @@ END:
|
|
return NULL;
|
|
}
|
|
|
|
-int stack_polling(uint32_t wakeup_tick)
|
|
+int stack_polling(unsigned wakeup_tick)
|
|
{
|
|
int force_quit;
|
|
struct cfg_params *cfg = get_global_cfg_params();
|
|
@@ -536,12 +536,11 @@ int stack_polling(uint32_t wakeup_tick)
|
|
static void* gazelle_stack_thread(void *arg)
|
|
{
|
|
struct thread_params *t_params = (struct thread_params*) arg;
|
|
-
|
|
uint16_t queue_id = t_params->queue_id;
|
|
- uint32_t wakeup_tick = 0;
|
|
-
|
|
- struct protocol_stack *stack = stack_thread_init(arg);
|
|
+ struct protocol_stack *stack;
|
|
+ unsigned wakeup_tick = 0;
|
|
|
|
+ stack = stack_thread_init(arg);
|
|
free(arg);
|
|
if (stack == NULL) {
|
|
LSTACK_LOG(ERR, LSTACK, "stack_thread_init failed queue_id=%hu\n", queue_id);
|
|
@@ -607,7 +606,7 @@ static int stack_group_init_mempool(void)
|
|
return 0;
|
|
}
|
|
|
|
-int32_t stack_group_init(void)
|
|
+int stack_group_init(void)
|
|
{
|
|
struct protocol_stack_group *stack_group = get_protocol_stack_group();
|
|
stack_group->stack_num = 0;
|
|
@@ -632,7 +631,7 @@ int32_t stack_group_init(void)
|
|
return 0;
|
|
}
|
|
|
|
-int32_t stack_setup_app_thread(void)
|
|
+int stack_setup_app_thread(void)
|
|
{
|
|
static PER_THREAD int first_flags = 1;
|
|
static _Atomic uint32_t queue_id = 0;
|
|
@@ -660,21 +659,21 @@ int32_t stack_setup_app_thread(void)
|
|
return 0;
|
|
}
|
|
|
|
-int32_t stack_setup_thread(void)
|
|
+int stack_setup_thread(void)
|
|
{
|
|
- int32_t ret;
|
|
+ int ret, i;
|
|
char name[PATH_MAX];
|
|
int queue_num = get_global_cfg_params()->num_queue;
|
|
struct thread_params *t_params[PROTOCOL_STACK_MAX] = {NULL};
|
|
int process_index = get_global_cfg_params()->process_idx;
|
|
|
|
- for (uint32_t i = 0; i < queue_num; ++i) {
|
|
+ for (i = 0; i < queue_num; ++i) {
|
|
t_params[i] = malloc(sizeof(struct thread_params));
|
|
if (t_params[i] == NULL) {
|
|
goto OUT1;
|
|
}
|
|
}
|
|
- for (uint32_t i = 0; i < queue_num; i++) {
|
|
+ for (i = 0; i < queue_num; i++) {
|
|
if (get_global_cfg_params()->seperate_send_recv) {
|
|
if (i % 2 == 0) {
|
|
ret = sprintf_s(name, sizeof(name), "%s_%d_%d", LSTACK_RECV_THREAD_NAME, process_index, i / 2);
|
|
@@ -714,7 +713,7 @@ int32_t stack_setup_thread(void)
|
|
return 0;
|
|
|
|
OUT1:
|
|
- for (int32_t i = 0; i < queue_num; ++i) {
|
|
+ for (i = 0; i < queue_num; ++i) {
|
|
if (t_params[i] != NULL) {
|
|
free(t_params[i]);
|
|
}
|
|
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
|
|
index 3e9889a..b4a5953 100644
|
|
--- a/src/lstack/core/lstack_thread_rpc.c
|
|
+++ b/src/lstack/core/lstack_thread_rpc.c
|
|
@@ -45,20 +45,20 @@ static struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool)
|
|
return msg;
|
|
}
|
|
|
|
-static void rpc_msg_init(struct rpc_msg *msg, rpc_msg_func func, struct rpc_msg_pool *pool)
|
|
+__rte_always_inline
|
|
+static void rpc_msg_init(struct rpc_msg *msg, rpc_func_t func, struct rpc_msg_pool *pool)
|
|
{
|
|
msg->func = func;
|
|
msg->rpcpool = pool;
|
|
- msg->sync_flag = 1;
|
|
msg->recall_flag = 0;
|
|
pthread_spin_init(&msg->lock, PTHREAD_PROCESS_PRIVATE);
|
|
}
|
|
|
|
-static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func)
|
|
+static struct rpc_msg *rpc_msg_alloc(rpc_func_t func)
|
|
{
|
|
- struct rpc_msg *msg = NULL;
|
|
+ struct rpc_msg *msg;
|
|
|
|
- if (g_rpc_pool == NULL) {
|
|
+ if (unlikely(g_rpc_pool == NULL)) {
|
|
g_rpc_pool = calloc(1, sizeof(struct rpc_msg_pool));
|
|
if (g_rpc_pool == NULL) {
|
|
LSTACK_LOG(INFO, LSTACK, "g_rpc_pool calloc failed\n");
|
|
@@ -66,8 +66,8 @@ static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func)
|
|
exit(-1);
|
|
}
|
|
|
|
- g_rpc_pool->mempool = create_mempool("rpc_pool", get_global_cfg_params()->rpc_msg_max, sizeof(struct rpc_msg),
|
|
- 0, rte_gettid());
|
|
+ g_rpc_pool->mempool =
|
|
+ create_mempool("rpc_pool", get_global_cfg_params()->rpc_msg_max, sizeof(struct rpc_msg), 0, rte_gettid());
|
|
if (g_rpc_pool->mempool == NULL) {
|
|
LSTACK_LOG(INFO, LSTACK, "rpc_pool create failed, errno is %d\n", errno);
|
|
g_rpc_stats.call_alloc_fail++;
|
|
@@ -76,12 +76,12 @@ static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func)
|
|
}
|
|
|
|
msg = get_rpc_msg(g_rpc_pool);
|
|
- if (msg == NULL) {
|
|
+ if (unlikely(msg == NULL)) {
|
|
g_rpc_stats.call_alloc_fail++;
|
|
return NULL;
|
|
}
|
|
- rpc_msg_init(msg, func, g_rpc_pool);
|
|
|
|
+ rpc_msg_init(msg, func, g_rpc_pool);
|
|
return msg;
|
|
}
|
|
|
|
@@ -97,8 +97,9 @@ static void rpc_msg_free(struct rpc_msg *msg)
|
|
}
|
|
|
|
__rte_always_inline
|
|
-static void rpc_call(rpc_queue *queue, struct rpc_msg *msg)
|
|
+static void rpc_async_call(rpc_queue *queue, struct rpc_msg *msg)
|
|
{
|
|
+ msg->sync_flag = 0;
|
|
lockless_queue_mpsc_push(queue, &msg->queue_node);
|
|
}
|
|
|
|
@@ -108,7 +109,9 @@ static int rpc_sync_call(rpc_queue *queue, struct rpc_msg *msg)
|
|
int ret;
|
|
|
|
pthread_spin_trylock(&msg->lock);
|
|
- rpc_call(queue, msg);
|
|
+
|
|
+ msg->sync_flag = 1;
|
|
+ lockless_queue_mpsc_push(queue, &msg->queue_node);
|
|
|
|
// waiting stack unlock
|
|
pthread_spin_lock(&msg->lock);
|
|
@@ -123,7 +126,7 @@ int rpc_msgcnt(rpc_queue *queue)
|
|
return lockless_queue_count(queue);
|
|
}
|
|
|
|
-static struct rpc_msg *rpc_msg_alloc_except(rpc_msg_func func)
|
|
+static struct rpc_msg *rpc_msg_alloc_except(rpc_func_t func)
|
|
{
|
|
struct rpc_msg *msg = calloc(1, sizeof(struct rpc_msg));
|
|
if (msg == NULL) {
|
|
@@ -146,14 +149,14 @@ int rpc_call_stack_exit(rpc_queue *queue)
|
|
return -1;
|
|
}
|
|
|
|
- rpc_call(queue, msg);
|
|
+ rpc_async_call(queue, msg);
|
|
return 0;
|
|
}
|
|
|
|
int rpc_poll_msg(rpc_queue *queue, int max_num)
|
|
{
|
|
int force_quit = 0;
|
|
- struct rpc_msg *msg = NULL;
|
|
+ struct rpc_msg *msg;
|
|
|
|
while (max_num--) {
|
|
lockless_queue_node *node = lockless_queue_mpsc_pop(queue);
|
|
@@ -163,24 +166,24 @@ int rpc_poll_msg(rpc_queue *queue, int max_num)
|
|
|
|
msg = container_of(node, struct rpc_msg, queue_node);
|
|
|
|
- if (msg->func) {
|
|
+ if (likely(msg->func)) {
|
|
msg->func(msg);
|
|
} else {
|
|
g_rpc_stats.call_null++;
|
|
}
|
|
|
|
- if (msg->func == stack_exit_by_rpc) {
|
|
+ if (unlikely(msg->func == stack_exit_by_rpc)) {
|
|
force_quit = 1;
|
|
}
|
|
+ if (msg->recall_flag) {
|
|
+ msg->recall_flag = 0;
|
|
+ continue;
|
|
+ }
|
|
|
|
- if (!msg->recall_flag) {
|
|
- if (msg->sync_flag) {
|
|
- pthread_spin_unlock(&msg->lock);
|
|
- } else {
|
|
- rpc_msg_free(msg);
|
|
- }
|
|
+ if (msg->sync_flag) {
|
|
+ pthread_spin_unlock(&msg->lock);
|
|
} else {
|
|
- msg->recall_flag = 0;
|
|
+ rpc_msg_free(msg);
|
|
}
|
|
}
|
|
|
|
@@ -204,7 +207,7 @@ static void callback_close(struct rpc_msg *msg)
|
|
|
|
if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) {
|
|
msg->recall_flag = 1;
|
|
- rpc_call(&stack->rpc_queue, msg); /* until stack_send recall finish */
|
|
+ rpc_async_call(&stack->rpc_queue, msg); /* until stack_send recall finish */
|
|
return;
|
|
}
|
|
|
|
@@ -223,7 +226,7 @@ static void callback_shutdown(struct rpc_msg *msg)
|
|
|
|
if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) {
|
|
msg->recall_flag = 1;
|
|
- rpc_call(&stack->rpc_queue, msg);
|
|
+ rpc_async_call(&stack->rpc_queue, msg);
|
|
return;
|
|
}
|
|
|
|
@@ -237,7 +240,7 @@ static void callback_shutdown(struct rpc_msg *msg)
|
|
|
|
static void callback_bind(struct rpc_msg *msg)
|
|
{
|
|
- msg->result = lwip_bind(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].socklen);
|
|
+ msg->result = lwip_bind(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].u);
|
|
if (msg->result != 0) {
|
|
LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
|
|
}
|
|
@@ -265,7 +268,7 @@ static void callback_create_shadow_fd(struct rpc_msg *msg)
|
|
{
|
|
int fd = msg->args[MSG_ARG_0].i;
|
|
struct sockaddr *addr = msg->args[MSG_ARG_1].p;
|
|
- socklen_t addr_len = msg->args[MSG_ARG_2].socklen;
|
|
+ socklen_t addr_len = msg->args[MSG_ARG_2].u;
|
|
|
|
int clone_fd = 0;
|
|
struct lwip_sock *sock = lwip_get_socket(fd);
|
|
@@ -337,7 +340,7 @@ static void callback_accept(struct rpc_msg *msg)
|
|
|
|
static void callback_connect(struct rpc_msg *msg)
|
|
{
|
|
- msg->result = lwip_connect(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].socklen);
|
|
+ msg->result = lwip_connect(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].u);
|
|
if (msg->result < 0) {
|
|
msg->result = -errno;
|
|
}
|
|
@@ -391,7 +394,7 @@ int rpc_call_bind(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen
|
|
|
|
msg->args[MSG_ARG_0].i = fd;
|
|
msg->args[MSG_ARG_1].cp = addr;
|
|
- msg->args[MSG_ARG_2].socklen = addrlen;
|
|
+ msg->args[MSG_ARG_2].u = addrlen;
|
|
|
|
return rpc_sync_call(queue, msg);
|
|
}
|
|
@@ -418,7 +421,7 @@ int rpc_call_shadow_fd(rpc_queue *queue, int fd, const struct sockaddr *addr, so
|
|
|
|
msg->args[MSG_ARG_0].i = fd;
|
|
msg->args[MSG_ARG_1].cp = addr;
|
|
- msg->args[MSG_ARG_2].socklen = addrlen;
|
|
+ msg->args[MSG_ARG_2].u = addrlen;
|
|
|
|
return rpc_sync_call(queue, msg);
|
|
}
|
|
@@ -447,7 +450,7 @@ int rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, sock
|
|
|
|
msg->args[MSG_ARG_0].i = fd;
|
|
msg->args[MSG_ARG_1].cp = addr;
|
|
- msg->args[MSG_ARG_2].socklen = addrlen;
|
|
+ msg->args[MSG_ARG_2].u = addrlen;
|
|
|
|
int ret = rpc_sync_call(queue, msg);
|
|
if (ret < 0) {
|
|
@@ -486,7 +489,7 @@ static void callback_getsockopt(struct rpc_msg *msg)
|
|
static void callback_setsockopt(struct rpc_msg *msg)
|
|
{
|
|
msg->result = lwip_setsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i,
|
|
- msg->args[MSG_ARG_3].cp, msg->args[MSG_ARG_4].socklen);
|
|
+ msg->args[MSG_ARG_3].cp, msg->args[MSG_ARG_4].u);
|
|
if (msg->result != 0) {
|
|
LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(),
|
|
msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result);
|
|
@@ -548,7 +551,7 @@ int rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const
|
|
msg->args[MSG_ARG_1].i = level;
|
|
msg->args[MSG_ARG_2].i = optname;
|
|
msg->args[MSG_ARG_3].cp = optval;
|
|
- msg->args[MSG_ARG_4].socklen = optlen;
|
|
+ msg->args[MSG_ARG_4].u = optlen;
|
|
|
|
return rpc_sync_call(queue, msg);
|
|
}
|
|
@@ -556,13 +559,13 @@ int rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const
|
|
static void callback_tcp_send(struct rpc_msg *msg)
|
|
{
|
|
int fd = msg->args[MSG_ARG_0].i;
|
|
- size_t len = msg->args[MSG_ARG_1].size;
|
|
+ size_t len = UINT16_MAX; /* ignore msg->args[MSG_ARG_1].size; */
|
|
struct protocol_stack *stack = get_protocol_stack();
|
|
- int replenish_again;
|
|
+ int ret;
|
|
+ msg->result = -1;
|
|
|
|
struct lwip_sock *sock = lwip_get_socket(fd);
|
|
- if (POSIX_IS_CLOSED(sock)) {
|
|
- msg->result = -1;
|
|
+ if (unlikely(POSIX_IS_CLOSED(sock))) {
|
|
return;
|
|
}
|
|
|
|
@@ -570,16 +573,18 @@ static void callback_tcp_send(struct rpc_msg *msg)
|
|
calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG);
|
|
}
|
|
|
|
- replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
|
|
- if (replenish_again < 0) {
|
|
+ ret = lwip_send(fd, sock, len, 0);
|
|
+ if (unlikely(ret < 0) && (errno == ENOTCONN || errno == ECONNRESET || errno == ECONNABORTED)) {
|
|
__sync_fetch_and_sub(&sock->call_num, 1);
|
|
return;
|
|
}
|
|
+ msg->result = 0;
|
|
|
|
- if (NETCONN_IS_DATAOUT(sock) || replenish_again > 0) {
|
|
+ ret = do_lwip_replenish_sendring(stack, sock);
|
|
+ if (ret > 0 || NETCONN_IS_DATAOUT(sock)) {
|
|
if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1) {
|
|
msg->recall_flag = 1;
|
|
- rpc_call(&stack->rpc_queue, msg);
|
|
+ rpc_async_call(&stack->rpc_queue, msg);
|
|
return;
|
|
}
|
|
}
|
|
@@ -593,11 +598,11 @@ static void callback_udp_send(struct rpc_msg *msg)
|
|
int fd = msg->args[MSG_ARG_0].i;
|
|
size_t len = msg->args[MSG_ARG_1].size;
|
|
struct protocol_stack *stack = get_protocol_stack();
|
|
- int replenish_again;
|
|
+ int ret;
|
|
+ msg->result = -1;
|
|
|
|
struct lwip_sock *sock = lwip_get_socket(fd);
|
|
- if (POSIX_IS_CLOSED(sock)) {
|
|
- msg->result = -1;
|
|
+ if (unlikely(POSIX_IS_CLOSED(sock))) {
|
|
return;
|
|
}
|
|
|
|
@@ -605,8 +610,15 @@ static void callback_udp_send(struct rpc_msg *msg)
|
|
calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG);
|
|
}
|
|
|
|
- replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
|
|
- if ((replenish_again > 0) && (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1)) {
|
|
+ ret = lwip_send(fd, sock, len, 0);
|
|
+ if (unlikely(ret < 0) && (errno == ENOTCONN || errno == ECONNRESET || errno == ECONNABORTED)) {
|
|
+ __sync_fetch_and_sub(&sock->call_num, 1);
|
|
+ return;
|
|
+ }
|
|
+ msg->result = 0;
|
|
+
|
|
+ ret = do_lwip_replenish_sendring(stack, sock);
|
|
+ if (ret > 0 && (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1)) {
|
|
rpc_call_replenish(&stack->rpc_queue, sock);
|
|
return;
|
|
}
|
|
@@ -629,9 +641,8 @@ int rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags)
|
|
msg->args[MSG_ARG_0].i = fd;
|
|
msg->args[MSG_ARG_1].size = len;
|
|
msg->args[MSG_ARG_2].i = flags;
|
|
- msg->sync_flag = 0;
|
|
|
|
- rpc_call(queue, msg);
|
|
+ rpc_async_call(queue, msg);
|
|
return 0;
|
|
}
|
|
|
|
@@ -649,9 +660,8 @@ int rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags)
|
|
msg->args[MSG_ARG_0].i = fd;
|
|
msg->args[MSG_ARG_1].size = len;
|
|
msg->args[MSG_ARG_2].i = flags;
|
|
- msg->sync_flag = 0;
|
|
|
|
- rpc_call(queue, msg);
|
|
+ rpc_async_call(queue, msg);
|
|
return 0;
|
|
}
|
|
|
|
@@ -663,7 +673,7 @@ static void callback_replenish_sendring(struct rpc_msg *msg)
|
|
msg->result = do_lwip_replenish_sendring(stack, sock);
|
|
if (msg->result == true) {
|
|
msg->recall_flag = 1;
|
|
- rpc_call(&stack->rpc_queue, msg);
|
|
+ rpc_async_call(&stack->rpc_queue, msg);
|
|
}
|
|
}
|
|
|
|
@@ -675,9 +685,8 @@ int rpc_call_replenish(rpc_queue *queue, void *sock)
|
|
}
|
|
|
|
msg->args[MSG_ARG_0].p = sock;
|
|
- msg->sync_flag = 0;
|
|
|
|
- rpc_call(queue, msg);
|
|
+ rpc_async_call(queue, msg);
|
|
return 0;
|
|
}
|
|
|
|
@@ -713,16 +722,17 @@ static void callback_clean_epoll(struct rpc_msg *msg)
|
|
list_del_node(&wakeup->wakeup_list[stack->stack_idx]);
|
|
}
|
|
|
|
-void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup)
|
|
+int rpc_call_clean_epoll(rpc_queue *queue, void *wakeup)
|
|
{
|
|
struct rpc_msg *msg = rpc_msg_alloc(callback_clean_epoll);
|
|
if (msg == NULL) {
|
|
- return;
|
|
+ return -1;
|
|
}
|
|
|
|
msg->args[MSG_ARG_0].p = wakeup;
|
|
|
|
rpc_sync_call(queue, msg);
|
|
+ return 0;
|
|
}
|
|
|
|
static void callback_arp(struct rpc_msg *msg)
|
|
@@ -740,11 +750,9 @@ int rpc_call_arp(rpc_queue *queue, void *mbuf)
|
|
return -1;
|
|
}
|
|
|
|
- msg->sync_flag = 0;
|
|
msg->args[MSG_ARG_0].p = mbuf;
|
|
|
|
- rpc_call(queue, msg);
|
|
-
|
|
+ rpc_async_call(queue, msg);
|
|
return 0;
|
|
}
|
|
|
|
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
|
|
index 0c7bb62..dcb7dac 100644
|
|
--- a/src/lstack/include/lstack_lwip.h
|
|
+++ b/src/lstack/include/lstack_lwip.h
|
|
@@ -29,15 +29,23 @@ unsigned same_node_ring_count(struct lwip_sock *sock);
|
|
#define NETCONN_IS_OUTIDLE(sock) gazelle_ring_readable_count((sock)->send_ring)
|
|
#define NETCONN_IS_UDP(sock) (NETCONNTYPE_GROUP(netconn_type((sock)->conn)) == NETCONN_UDP)
|
|
|
|
-void do_lwip_clone_sockopt(struct lwip_sock *dst_sock, struct lwip_sock *src_sock);
|
|
-
|
|
+/* lwip api */
|
|
struct pbuf *do_lwip_tcp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size);
|
|
struct pbuf *do_lwip_udp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size);
|
|
void do_lwip_get_from_sendring_over(struct lwip_sock *sock);
|
|
-bool do_lwip_replenish_sendring(struct protocol_stack *stack, struct lwip_sock *sock);
|
|
ssize_t do_lwip_read_from_lwip(struct lwip_sock *sock, int32_t flags, uint8_t apiflags);
|
|
|
|
-/* app write/read ring */
|
|
+/* lwip api */
|
|
+void do_lwip_free_pbuf(struct pbuf *pbuf);
|
|
+struct pbuf *do_lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type type);
|
|
+
|
|
+/* lwip api */
|
|
+void do_lwip_add_recvlist(int32_t fd);
|
|
+/* stack api */
|
|
+void do_lwip_read_recvlist(struct protocol_stack *stack, uint32_t max_num);
|
|
+
|
|
+
|
|
+/* app api */
|
|
ssize_t do_lwip_sendmsg_to_stack(struct lwip_sock *sock, int32_t s,
|
|
const struct msghdr *message, int32_t flags);
|
|
ssize_t do_lwip_recvmsg_from_stack(int32_t s, const struct msghdr *message, int32_t flags);
|
|
@@ -47,17 +55,14 @@ ssize_t do_lwip_send_to_stack(int32_t fd, const void *buf, size_t len, int32_t f
|
|
ssize_t do_lwip_read_from_stack(int32_t fd, void *buf, size_t len, int32_t flags,
|
|
struct sockaddr *addr, socklen_t *addrlen);
|
|
|
|
-void do_lwip_read_recvlist(struct protocol_stack *stack, uint32_t max_num);
|
|
-void do_lwip_add_recvlist(int32_t fd);
|
|
-int do_lwip_send(struct protocol_stack *stack, int32_t fd, struct lwip_sock *sock,
|
|
- size_t len, int32_t flags);
|
|
+/* stack api */
|
|
+bool do_lwip_replenish_sendring(struct protocol_stack *stack, struct lwip_sock *sock);
|
|
+
|
|
+void do_lwip_clone_sockopt(struct lwip_sock *dst_sock, struct lwip_sock *src_sock);
|
|
|
|
uint32_t do_lwip_get_conntable(struct gazelle_stat_lstack_conn_info *conn, uint32_t max_num);
|
|
uint32_t do_lwip_get_connnum(void);
|
|
|
|
-void do_lwip_free_pbuf(struct pbuf *pbuf);
|
|
-struct pbuf *do_lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type type);
|
|
-
|
|
void read_same_node_recv_list(struct protocol_stack *stack);
|
|
|
|
#endif
|
|
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
|
|
index fdd5388..08a3901 100644
|
|
--- a/src/lstack/include/lstack_protocol_stack.h
|
|
+++ b/src/lstack/include/lstack_protocol_stack.h
|
|
@@ -31,11 +31,8 @@
|
|
#include "lstack_tx_cache.h"
|
|
|
|
#define SOCK_RECV_RING_SIZE (get_global_cfg_params()->recv_ring_size)
|
|
-#define SOCK_RECV_FREE_THRES (32)
|
|
#define SOCK_RECV_RING_SIZE_MAX (2048)
|
|
#define SOCK_SEND_RING_SIZE_MAX (2048)
|
|
-#define SOCK_SEND_REPLENISH_THRES (16)
|
|
-#define WAKEUP_MAX_NUM (32)
|
|
|
|
#define MBUFPOOL_RESERVE_NUM (get_global_cfg_params()->nic.rxqueue_size + 1024)
|
|
|
|
@@ -113,7 +110,7 @@ struct protocol_stack_group {
|
|
long get_stack_tid(void);
|
|
|
|
struct protocol_stack *get_protocol_stack(void);
|
|
-struct protocol_stack *get_protocol_stack_by_fd(int32_t fd);
|
|
+struct protocol_stack *get_protocol_stack_by_fd(int fd);
|
|
struct protocol_stack *get_bind_protocol_stack(void);
|
|
struct protocol_stack_group *get_protocol_stack_group(void);
|
|
|
|
@@ -121,13 +118,13 @@ int get_min_conn_stack(struct protocol_stack_group *stack_group);
|
|
void bind_to_stack_numa(struct protocol_stack *stack);
|
|
void thread_bind_stack(struct protocol_stack *stack);
|
|
|
|
-int32_t stack_group_init(void);
|
|
+int stack_group_init(void);
|
|
void stack_group_exit(void);
|
|
void stack_exit(void);
|
|
|
|
-int32_t stack_setup_thread(void);
|
|
-int32_t stack_setup_app_thread(void);
|
|
+int stack_setup_thread(void);
|
|
+int stack_setup_app_thread(void);
|
|
|
|
-int stack_polling(uint32_t wakeup_tick);
|
|
+int stack_polling(unsigned wakeup_tick);
|
|
|
|
#endif
|
|
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
|
|
index c2654bb..6f8e03e 100644
|
|
--- a/src/lstack/include/lstack_thread_rpc.h
|
|
+++ b/src/lstack/include/lstack_thread_rpc.h
|
|
@@ -32,8 +32,6 @@ struct rpc_stats {
|
|
uint64_t call_alloc_fail;
|
|
};
|
|
|
|
-struct rpc_msg;
|
|
-typedef void (*rpc_msg_func)(struct rpc_msg *msg);
|
|
union rpc_msg_arg {
|
|
int i;
|
|
unsigned int u;
|
|
@@ -41,22 +39,25 @@ union rpc_msg_arg {
|
|
unsigned long ul;
|
|
void *p;
|
|
const void *cp;
|
|
- socklen_t socklen;
|
|
size_t size;
|
|
};
|
|
-struct rpc_msg_pool {
|
|
- struct rte_mempool *mempool;
|
|
-};
|
|
+
|
|
+struct rpc_msg;
|
|
+typedef void (*rpc_func_t)(struct rpc_msg *msg);
|
|
struct rpc_msg {
|
|
- pthread_spinlock_t lock; /* msg handler unlock notice sender msg process done */
|
|
int8_t sync_flag : 1;
|
|
int8_t recall_flag : 1;
|
|
- int64_t result; /* func return val */
|
|
- lockless_queue_node queue_node;
|
|
- struct rpc_msg_pool *rpcpool;
|
|
|
|
- rpc_msg_func func; /* msg handle func hook */
|
|
+ long result; /* func return val */
|
|
+ rpc_func_t func; /* msg handle func hook */
|
|
union rpc_msg_arg args[RPM_MSG_ARG_SIZE]; /* resolve by type */
|
|
+
|
|
+ struct rpc_msg_pool {
|
|
+ struct rte_mempool *mempool;
|
|
+ } *rpcpool;
|
|
+
|
|
+ pthread_spinlock_t lock; /* msg handler unlock notice sender msg process done */
|
|
+ lockless_queue_node queue_node;
|
|
};
|
|
|
|
static inline void rpc_queue_init(rpc_queue *queue)
|
|
@@ -92,7 +93,7 @@ int rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags);
|
|
int rpc_call_replenish(rpc_queue *queue, void *sock);
|
|
int rpc_call_recvlistcnt(rpc_queue *queue);
|
|
|
|
-void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup);
|
|
+int rpc_call_clean_epoll(rpc_queue *queue, void *wakeup);
|
|
int rpc_call_arp(rpc_queue *queue, void *mbuf);
|
|
|
|
int rpc_call_conntable(rpc_queue *queue, void *conn_table, unsigned max_conn);
|
|
--
|
|
2.33.0
|
|
|