sync cleancode: declare different cfg_params types

(cherry picked from commit a90b70e9699c3082533da0812da33a4fdf485e32)
This commit is contained in:
jiangheng 2024-09-27 15:54:05 +08:00 committed by openeuler-sync-bot
parent ebff16560b
commit 41d8d0f252
5 changed files with 4690 additions and 1 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,693 @@
From 710bb34f1b46e0df4d82fe46fc36e729160ea1d7 Mon Sep 17 00:00:00 2001
From: Lemmy Huang <hlm3280@163.com>
Date: Sun, 1 Sep 2024 00:21:52 +0800
Subject: [PATCH] cleancode: add rpc_async_call, remove rpc_msg_arg.socklen,
fix some format
Signed-off-by: Lemmy Huang <hlm3280@163.com>
---
src/lstack/core/lstack_lwip.c | 27 +----
src/lstack/core/lstack_protocol_stack.c | 25 ++---
src/lstack/core/lstack_thread_rpc.c | 124 +++++++++++----------
src/lstack/include/lstack_lwip.h | 27 +++--
src/lstack/include/lstack_protocol_stack.h | 13 +--
src/lstack/include/lstack_thread_rpc.h | 25 +++--
6 files changed, 117 insertions(+), 124 deletions(-)
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 3454961..91f4838 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -588,23 +588,6 @@ bool do_lwip_replenish_sendring(struct protocol_stack *stack, struct lwip_sock *
return replenish_again;
}
-int do_lwip_send(struct protocol_stack *stack, int32_t fd, struct lwip_sock *sock,
- size_t len, int32_t flags)
-{
- ssize_t ret;
- /* send all send_ring, so len set lwip send max. */
- if (NETCONN_IS_UDP(sock)) {
- ret = lwip_send(fd, sock, len, flags);
- } else {
- ret = lwip_send(fd, sock, UINT16_MAX, flags);
- }
- if (ret < 0 && (errno == ENOTCONN || errno == ECONNRESET || errno == ECONNABORTED)) {
- return -1;
- }
-
- return do_lwip_replenish_sendring(stack, sock);
-}
-
static inline void free_recv_ring_readover(struct rte_ring *ring)
{
void *pbufs[SOCK_RECV_RING_SIZE];
@@ -753,10 +736,10 @@ static inline void notice_stack_tcp_send(struct lwip_sock *sock, int32_t fd, int
static inline void notice_stack_udp_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
{
- __sync_fetch_and_add(&sock->call_num, 1);
- while (rpc_call_udp_send(&sock->stack->rpc_queue, fd, len, flags) < 0) {
- usleep(1000); // 1000: wait 1ms to exec again
- }
+ __sync_fetch_and_add(&sock->call_num, 1);
+ while (rpc_call_udp_send(&sock->stack->rpc_queue, fd, len, flags) < 0) {
+ usleep(1000); // 1000: wait 1ms to exec again
+ }
}
static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
@@ -875,7 +858,7 @@ ssize_t do_lwip_send_to_stack(int32_t fd, const void *buf, size_t len, int32_t f
// send = 0 : tcp peer close connection ?
if (send <= 0) {
return send;
- }
+ }
}
notice_stack_send(sock, fd, send, flags);
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index bcca1a7..f1eeba1 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -88,7 +88,7 @@ struct protocol_stack *get_protocol_stack(void)
return g_stack_p;
}
-struct protocol_stack *get_protocol_stack_by_fd(int32_t fd)
+struct protocol_stack *get_protocol_stack_by_fd(int fd)
{
struct lwip_sock *sock = lwip_get_socket(fd);
if (POSIX_IS_CLOSED(sock)) {
@@ -468,7 +468,7 @@ END:
return NULL;
}
-int stack_polling(uint32_t wakeup_tick)
+int stack_polling(unsigned wakeup_tick)
{
int force_quit;
struct cfg_params *cfg = get_global_cfg_params();
@@ -536,12 +536,11 @@ int stack_polling(uint32_t wakeup_tick)
static void* gazelle_stack_thread(void *arg)
{
struct thread_params *t_params = (struct thread_params*) arg;
-
uint16_t queue_id = t_params->queue_id;
- uint32_t wakeup_tick = 0;
-
- struct protocol_stack *stack = stack_thread_init(arg);
+ struct protocol_stack *stack;
+ unsigned wakeup_tick = 0;
+ stack = stack_thread_init(arg);
free(arg);
if (stack == NULL) {
LSTACK_LOG(ERR, LSTACK, "stack_thread_init failed queue_id=%hu\n", queue_id);
@@ -607,7 +606,7 @@ static int stack_group_init_mempool(void)
return 0;
}
-int32_t stack_group_init(void)
+int stack_group_init(void)
{
struct protocol_stack_group *stack_group = get_protocol_stack_group();
stack_group->stack_num = 0;
@@ -632,7 +631,7 @@ int32_t stack_group_init(void)
return 0;
}
-int32_t stack_setup_app_thread(void)
+int stack_setup_app_thread(void)
{
static PER_THREAD int first_flags = 1;
static _Atomic uint32_t queue_id = 0;
@@ -660,21 +659,21 @@ int32_t stack_setup_app_thread(void)
return 0;
}
-int32_t stack_setup_thread(void)
+int stack_setup_thread(void)
{
- int32_t ret;
+ int ret, i;
char name[PATH_MAX];
int queue_num = get_global_cfg_params()->num_queue;
struct thread_params *t_params[PROTOCOL_STACK_MAX] = {NULL};
int process_index = get_global_cfg_params()->process_idx;
- for (uint32_t i = 0; i < queue_num; ++i) {
+ for (i = 0; i < queue_num; ++i) {
t_params[i] = malloc(sizeof(struct thread_params));
if (t_params[i] == NULL) {
goto OUT1;
}
}
- for (uint32_t i = 0; i < queue_num; i++) {
+ for (i = 0; i < queue_num; i++) {
if (get_global_cfg_params()->seperate_send_recv) {
if (i % 2 == 0) {
ret = sprintf_s(name, sizeof(name), "%s_%d_%d", LSTACK_RECV_THREAD_NAME, process_index, i / 2);
@@ -714,7 +713,7 @@ int32_t stack_setup_thread(void)
return 0;
OUT1:
- for (int32_t i = 0; i < queue_num; ++i) {
+ for (i = 0; i < queue_num; ++i) {
if (t_params[i] != NULL) {
free(t_params[i]);
}
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index 3e9889a..b4a5953 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -45,20 +45,20 @@ static struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool)
return msg;
}
-static void rpc_msg_init(struct rpc_msg *msg, rpc_msg_func func, struct rpc_msg_pool *pool)
+__rte_always_inline
+static void rpc_msg_init(struct rpc_msg *msg, rpc_func_t func, struct rpc_msg_pool *pool)
{
msg->func = func;
msg->rpcpool = pool;
- msg->sync_flag = 1;
msg->recall_flag = 0;
pthread_spin_init(&msg->lock, PTHREAD_PROCESS_PRIVATE);
}
-static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func)
+static struct rpc_msg *rpc_msg_alloc(rpc_func_t func)
{
- struct rpc_msg *msg = NULL;
+ struct rpc_msg *msg;
- if (g_rpc_pool == NULL) {
+ if (unlikely(g_rpc_pool == NULL)) {
g_rpc_pool = calloc(1, sizeof(struct rpc_msg_pool));
if (g_rpc_pool == NULL) {
LSTACK_LOG(INFO, LSTACK, "g_rpc_pool calloc failed\n");
@@ -66,8 +66,8 @@ static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func)
exit(-1);
}
- g_rpc_pool->mempool = create_mempool("rpc_pool", get_global_cfg_params()->rpc_msg_max, sizeof(struct rpc_msg),
- 0, rte_gettid());
+ g_rpc_pool->mempool =
+ create_mempool("rpc_pool", get_global_cfg_params()->rpc_msg_max, sizeof(struct rpc_msg), 0, rte_gettid());
if (g_rpc_pool->mempool == NULL) {
LSTACK_LOG(INFO, LSTACK, "rpc_pool create failed, errno is %d\n", errno);
g_rpc_stats.call_alloc_fail++;
@@ -76,12 +76,12 @@ static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func)
}
msg = get_rpc_msg(g_rpc_pool);
- if (msg == NULL) {
+ if (unlikely(msg == NULL)) {
g_rpc_stats.call_alloc_fail++;
return NULL;
}
- rpc_msg_init(msg, func, g_rpc_pool);
+ rpc_msg_init(msg, func, g_rpc_pool);
return msg;
}
@@ -97,8 +97,9 @@ static void rpc_msg_free(struct rpc_msg *msg)
}
__rte_always_inline
-static void rpc_call(rpc_queue *queue, struct rpc_msg *msg)
+static void rpc_async_call(rpc_queue *queue, struct rpc_msg *msg)
{
+ msg->sync_flag = 0;
lockless_queue_mpsc_push(queue, &msg->queue_node);
}
@@ -108,7 +109,9 @@ static int rpc_sync_call(rpc_queue *queue, struct rpc_msg *msg)
int ret;
pthread_spin_trylock(&msg->lock);
- rpc_call(queue, msg);
+
+ msg->sync_flag = 1;
+ lockless_queue_mpsc_push(queue, &msg->queue_node);
// waiting stack unlock
pthread_spin_lock(&msg->lock);
@@ -123,7 +126,7 @@ int rpc_msgcnt(rpc_queue *queue)
return lockless_queue_count(queue);
}
-static struct rpc_msg *rpc_msg_alloc_except(rpc_msg_func func)
+static struct rpc_msg *rpc_msg_alloc_except(rpc_func_t func)
{
struct rpc_msg *msg = calloc(1, sizeof(struct rpc_msg));
if (msg == NULL) {
@@ -146,14 +149,14 @@ int rpc_call_stack_exit(rpc_queue *queue)
return -1;
}
- rpc_call(queue, msg);
+ rpc_async_call(queue, msg);
return 0;
}
int rpc_poll_msg(rpc_queue *queue, int max_num)
{
int force_quit = 0;
- struct rpc_msg *msg = NULL;
+ struct rpc_msg *msg;
while (max_num--) {
lockless_queue_node *node = lockless_queue_mpsc_pop(queue);
@@ -163,24 +166,24 @@ int rpc_poll_msg(rpc_queue *queue, int max_num)
msg = container_of(node, struct rpc_msg, queue_node);
- if (msg->func) {
+ if (likely(msg->func)) {
msg->func(msg);
} else {
g_rpc_stats.call_null++;
}
- if (msg->func == stack_exit_by_rpc) {
+ if (unlikely(msg->func == stack_exit_by_rpc)) {
force_quit = 1;
}
+ if (msg->recall_flag) {
+ msg->recall_flag = 0;
+ continue;
+ }
- if (!msg->recall_flag) {
- if (msg->sync_flag) {
- pthread_spin_unlock(&msg->lock);
- } else {
- rpc_msg_free(msg);
- }
+ if (msg->sync_flag) {
+ pthread_spin_unlock(&msg->lock);
} else {
- msg->recall_flag = 0;
+ rpc_msg_free(msg);
}
}
@@ -204,7 +207,7 @@ static void callback_close(struct rpc_msg *msg)
if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) {
msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg); /* until stack_send recall finish */
+ rpc_async_call(&stack->rpc_queue, msg); /* until stack_send recall finish */
return;
}
@@ -223,7 +226,7 @@ static void callback_shutdown(struct rpc_msg *msg)
if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) {
msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg);
+ rpc_async_call(&stack->rpc_queue, msg);
return;
}
@@ -237,7 +240,7 @@ static void callback_shutdown(struct rpc_msg *msg)
static void callback_bind(struct rpc_msg *msg)
{
- msg->result = lwip_bind(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].socklen);
+ msg->result = lwip_bind(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].u);
if (msg->result != 0) {
LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
}
@@ -265,7 +268,7 @@ static void callback_create_shadow_fd(struct rpc_msg *msg)
{
int fd = msg->args[MSG_ARG_0].i;
struct sockaddr *addr = msg->args[MSG_ARG_1].p;
- socklen_t addr_len = msg->args[MSG_ARG_2].socklen;
+ socklen_t addr_len = msg->args[MSG_ARG_2].u;
int clone_fd = 0;
struct lwip_sock *sock = lwip_get_socket(fd);
@@ -337,7 +340,7 @@ static void callback_accept(struct rpc_msg *msg)
static void callback_connect(struct rpc_msg *msg)
{
- msg->result = lwip_connect(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].socklen);
+ msg->result = lwip_connect(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].u);
if (msg->result < 0) {
msg->result = -errno;
}
@@ -391,7 +394,7 @@ int rpc_call_bind(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen
msg->args[MSG_ARG_0].i = fd;
msg->args[MSG_ARG_1].cp = addr;
- msg->args[MSG_ARG_2].socklen = addrlen;
+ msg->args[MSG_ARG_2].u = addrlen;
return rpc_sync_call(queue, msg);
}
@@ -418,7 +421,7 @@ int rpc_call_shadow_fd(rpc_queue *queue, int fd, const struct sockaddr *addr, so
msg->args[MSG_ARG_0].i = fd;
msg->args[MSG_ARG_1].cp = addr;
- msg->args[MSG_ARG_2].socklen = addrlen;
+ msg->args[MSG_ARG_2].u = addrlen;
return rpc_sync_call(queue, msg);
}
@@ -447,7 +450,7 @@ int rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, sock
msg->args[MSG_ARG_0].i = fd;
msg->args[MSG_ARG_1].cp = addr;
- msg->args[MSG_ARG_2].socklen = addrlen;
+ msg->args[MSG_ARG_2].u = addrlen;
int ret = rpc_sync_call(queue, msg);
if (ret < 0) {
@@ -486,7 +489,7 @@ static void callback_getsockopt(struct rpc_msg *msg)
static void callback_setsockopt(struct rpc_msg *msg)
{
msg->result = lwip_setsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i,
- msg->args[MSG_ARG_3].cp, msg->args[MSG_ARG_4].socklen);
+ msg->args[MSG_ARG_3].cp, msg->args[MSG_ARG_4].u);
if (msg->result != 0) {
LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(),
msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result);
@@ -548,7 +551,7 @@ int rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const
msg->args[MSG_ARG_1].i = level;
msg->args[MSG_ARG_2].i = optname;
msg->args[MSG_ARG_3].cp = optval;
- msg->args[MSG_ARG_4].socklen = optlen;
+ msg->args[MSG_ARG_4].u = optlen;
return rpc_sync_call(queue, msg);
}
@@ -556,13 +559,13 @@ int rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const
static void callback_tcp_send(struct rpc_msg *msg)
{
int fd = msg->args[MSG_ARG_0].i;
- size_t len = msg->args[MSG_ARG_1].size;
+ size_t len = UINT16_MAX; /* ignore msg->args[MSG_ARG_1].size; */
struct protocol_stack *stack = get_protocol_stack();
- int replenish_again;
+ int ret;
+ msg->result = -1;
struct lwip_sock *sock = lwip_get_socket(fd);
- if (POSIX_IS_CLOSED(sock)) {
- msg->result = -1;
+ if (unlikely(POSIX_IS_CLOSED(sock))) {
return;
}
@@ -570,16 +573,18 @@ static void callback_tcp_send(struct rpc_msg *msg)
calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG);
}
- replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
- if (replenish_again < 0) {
+ ret = lwip_send(fd, sock, len, 0);
+ if (unlikely(ret < 0) && (errno == ENOTCONN || errno == ECONNRESET || errno == ECONNABORTED)) {
__sync_fetch_and_sub(&sock->call_num, 1);
return;
}
+ msg->result = 0;
- if (NETCONN_IS_DATAOUT(sock) || replenish_again > 0) {
+ ret = do_lwip_replenish_sendring(stack, sock);
+ if (ret > 0 || NETCONN_IS_DATAOUT(sock)) {
if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1) {
msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg);
+ rpc_async_call(&stack->rpc_queue, msg);
return;
}
}
@@ -593,11 +598,11 @@ static void callback_udp_send(struct rpc_msg *msg)
int fd = msg->args[MSG_ARG_0].i;
size_t len = msg->args[MSG_ARG_1].size;
struct protocol_stack *stack = get_protocol_stack();
- int replenish_again;
+ int ret;
+ msg->result = -1;
struct lwip_sock *sock = lwip_get_socket(fd);
- if (POSIX_IS_CLOSED(sock)) {
- msg->result = -1;
+ if (unlikely(POSIX_IS_CLOSED(sock))) {
return;
}
@@ -605,8 +610,15 @@ static void callback_udp_send(struct rpc_msg *msg)
calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG);
}
- replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
- if ((replenish_again > 0) && (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1)) {
+ ret = lwip_send(fd, sock, len, 0);
+ if (unlikely(ret < 0) && (errno == ENOTCONN || errno == ECONNRESET || errno == ECONNABORTED)) {
+ __sync_fetch_and_sub(&sock->call_num, 1);
+ return;
+ }
+ msg->result = 0;
+
+ ret = do_lwip_replenish_sendring(stack, sock);
+ if (ret > 0 && (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1)) {
rpc_call_replenish(&stack->rpc_queue, sock);
return;
}
@@ -629,9 +641,8 @@ int rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags)
msg->args[MSG_ARG_0].i = fd;
msg->args[MSG_ARG_1].size = len;
msg->args[MSG_ARG_2].i = flags;
- msg->sync_flag = 0;
- rpc_call(queue, msg);
+ rpc_async_call(queue, msg);
return 0;
}
@@ -649,9 +660,8 @@ int rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags)
msg->args[MSG_ARG_0].i = fd;
msg->args[MSG_ARG_1].size = len;
msg->args[MSG_ARG_2].i = flags;
- msg->sync_flag = 0;
- rpc_call(queue, msg);
+ rpc_async_call(queue, msg);
return 0;
}
@@ -663,7 +673,7 @@ static void callback_replenish_sendring(struct rpc_msg *msg)
msg->result = do_lwip_replenish_sendring(stack, sock);
if (msg->result == true) {
msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg);
+ rpc_async_call(&stack->rpc_queue, msg);
}
}
@@ -675,9 +685,8 @@ int rpc_call_replenish(rpc_queue *queue, void *sock)
}
msg->args[MSG_ARG_0].p = sock;
- msg->sync_flag = 0;
- rpc_call(queue, msg);
+ rpc_async_call(queue, msg);
return 0;
}
@@ -713,16 +722,17 @@ static void callback_clean_epoll(struct rpc_msg *msg)
list_del_node(&wakeup->wakeup_list[stack->stack_idx]);
}
-void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup)
+int rpc_call_clean_epoll(rpc_queue *queue, void *wakeup)
{
struct rpc_msg *msg = rpc_msg_alloc(callback_clean_epoll);
if (msg == NULL) {
- return;
+ return -1;
}
msg->args[MSG_ARG_0].p = wakeup;
rpc_sync_call(queue, msg);
+ return 0;
}
static void callback_arp(struct rpc_msg *msg)
@@ -740,11 +750,9 @@ int rpc_call_arp(rpc_queue *queue, void *mbuf)
return -1;
}
- msg->sync_flag = 0;
msg->args[MSG_ARG_0].p = mbuf;
- rpc_call(queue, msg);
-
+ rpc_async_call(queue, msg);
return 0;
}
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index 0c7bb62..dcb7dac 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -29,15 +29,23 @@ unsigned same_node_ring_count(struct lwip_sock *sock);
#define NETCONN_IS_OUTIDLE(sock) gazelle_ring_readable_count((sock)->send_ring)
#define NETCONN_IS_UDP(sock) (NETCONNTYPE_GROUP(netconn_type((sock)->conn)) == NETCONN_UDP)
-void do_lwip_clone_sockopt(struct lwip_sock *dst_sock, struct lwip_sock *src_sock);
-
+/* lwip api */
struct pbuf *do_lwip_tcp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size);
struct pbuf *do_lwip_udp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size);
void do_lwip_get_from_sendring_over(struct lwip_sock *sock);
-bool do_lwip_replenish_sendring(struct protocol_stack *stack, struct lwip_sock *sock);
ssize_t do_lwip_read_from_lwip(struct lwip_sock *sock, int32_t flags, uint8_t apiflags);
-/* app write/read ring */
+/* lwip api */
+void do_lwip_free_pbuf(struct pbuf *pbuf);
+struct pbuf *do_lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type type);
+
+/* lwip api */
+void do_lwip_add_recvlist(int32_t fd);
+/* stack api */
+void do_lwip_read_recvlist(struct protocol_stack *stack, uint32_t max_num);
+
+
+/* app api */
ssize_t do_lwip_sendmsg_to_stack(struct lwip_sock *sock, int32_t s,
const struct msghdr *message, int32_t flags);
ssize_t do_lwip_recvmsg_from_stack(int32_t s, const struct msghdr *message, int32_t flags);
@@ -47,17 +55,14 @@ ssize_t do_lwip_send_to_stack(int32_t fd, const void *buf, size_t len, int32_t f
ssize_t do_lwip_read_from_stack(int32_t fd, void *buf, size_t len, int32_t flags,
struct sockaddr *addr, socklen_t *addrlen);
-void do_lwip_read_recvlist(struct protocol_stack *stack, uint32_t max_num);
-void do_lwip_add_recvlist(int32_t fd);
-int do_lwip_send(struct protocol_stack *stack, int32_t fd, struct lwip_sock *sock,
- size_t len, int32_t flags);
+/* stack api */
+bool do_lwip_replenish_sendring(struct protocol_stack *stack, struct lwip_sock *sock);
+
+void do_lwip_clone_sockopt(struct lwip_sock *dst_sock, struct lwip_sock *src_sock);
uint32_t do_lwip_get_conntable(struct gazelle_stat_lstack_conn_info *conn, uint32_t max_num);
uint32_t do_lwip_get_connnum(void);
-void do_lwip_free_pbuf(struct pbuf *pbuf);
-struct pbuf *do_lwip_alloc_pbuf(pbuf_layer layer, uint16_t length, pbuf_type type);
-
void read_same_node_recv_list(struct protocol_stack *stack);
#endif
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index fdd5388..08a3901 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -31,11 +31,8 @@
#include "lstack_tx_cache.h"
#define SOCK_RECV_RING_SIZE (get_global_cfg_params()->recv_ring_size)
-#define SOCK_RECV_FREE_THRES (32)
#define SOCK_RECV_RING_SIZE_MAX (2048)
#define SOCK_SEND_RING_SIZE_MAX (2048)
-#define SOCK_SEND_REPLENISH_THRES (16)
-#define WAKEUP_MAX_NUM (32)
#define MBUFPOOL_RESERVE_NUM (get_global_cfg_params()->nic.rxqueue_size + 1024)
@@ -113,7 +110,7 @@ struct protocol_stack_group {
long get_stack_tid(void);
struct protocol_stack *get_protocol_stack(void);
-struct protocol_stack *get_protocol_stack_by_fd(int32_t fd);
+struct protocol_stack *get_protocol_stack_by_fd(int fd);
struct protocol_stack *get_bind_protocol_stack(void);
struct protocol_stack_group *get_protocol_stack_group(void);
@@ -121,13 +118,13 @@ int get_min_conn_stack(struct protocol_stack_group *stack_group);
void bind_to_stack_numa(struct protocol_stack *stack);
void thread_bind_stack(struct protocol_stack *stack);
-int32_t stack_group_init(void);
+int stack_group_init(void);
void stack_group_exit(void);
void stack_exit(void);
-int32_t stack_setup_thread(void);
-int32_t stack_setup_app_thread(void);
+int stack_setup_thread(void);
+int stack_setup_app_thread(void);
-int stack_polling(uint32_t wakeup_tick);
+int stack_polling(unsigned wakeup_tick);
#endif
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index c2654bb..6f8e03e 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -32,8 +32,6 @@ struct rpc_stats {
uint64_t call_alloc_fail;
};
-struct rpc_msg;
-typedef void (*rpc_msg_func)(struct rpc_msg *msg);
union rpc_msg_arg {
int i;
unsigned int u;
@@ -41,22 +39,25 @@ union rpc_msg_arg {
unsigned long ul;
void *p;
const void *cp;
- socklen_t socklen;
size_t size;
};
-struct rpc_msg_pool {
- struct rte_mempool *mempool;
-};
+
+struct rpc_msg;
+typedef void (*rpc_func_t)(struct rpc_msg *msg);
struct rpc_msg {
- pthread_spinlock_t lock; /* msg handler unlock notice sender msg process done */
int8_t sync_flag : 1;
int8_t recall_flag : 1;
- int64_t result; /* func return val */
- lockless_queue_node queue_node;
- struct rpc_msg_pool *rpcpool;
- rpc_msg_func func; /* msg handle func hook */
+ long result; /* func return val */
+ rpc_func_t func; /* msg handle func hook */
union rpc_msg_arg args[RPM_MSG_ARG_SIZE]; /* resolve by type */
+
+ struct rpc_msg_pool {
+ struct rte_mempool *mempool;
+ } *rpcpool;
+
+ pthread_spinlock_t lock; /* msg handler unlock notice sender msg process done */
+ lockless_queue_node queue_node;
};
static inline void rpc_queue_init(rpc_queue *queue)
@@ -92,7 +93,7 @@ int rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags);
int rpc_call_replenish(rpc_queue *queue, void *sock);
int rpc_call_recvlistcnt(rpc_queue *queue);
-void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup);
+int rpc_call_clean_epoll(rpc_queue *queue, void *wakeup);
int rpc_call_arp(rpc_queue *queue, void *mbuf);
int rpc_call_conntable(rpc_queue *queue, void *conn_table, unsigned max_conn);
--
2.33.0

View File

@ -0,0 +1,347 @@
From 3313619bd53f1bafc5e48eb4642213fd5208c0e2 Mon Sep 17 00:00:00 2001
From: Lemmy Huang <hlm3280@163.com>
Date: Sun, 1 Sep 2024 11:33:12 +0800
Subject: [PATCH] cleancode: declare different cfg_params types
Signed-off-by: Lemmy Huang <hlm3280@163.com>
---
src/lstack/core/lstack_cfg.c | 10 +-
src/lstack/core/lstack_dpdk.c | 8 +-
src/lstack/core/lstack_protocol_stack.c | 2 +-
src/lstack/core/lstack_virtio.c | 4 +-
src/lstack/include/lstack_cfg.h | 161 +++++++++++----------
src/lstack/include/lstack_protocol_stack.h | 2 +-
src/lstack/netif/lstack_ethdev.c | 4 +-
src/lstack/netif/lstack_vdev.c | 2 +-
8 files changed, 104 insertions(+), 89 deletions(-)
diff --git a/src/lstack/core/lstack_cfg.c b/src/lstack/core/lstack_cfg.c
index 882e60a..659a2a7 100644
--- a/src/lstack/core/lstack_cfg.c
+++ b/src/lstack/core/lstack_cfg.c
@@ -1300,9 +1300,9 @@ static int32_t parse_use_sockmap(void)
static int32_t parse_nic_rxqueue_size(void)
{
int32_t ret;
- PARSE_ARG(g_config_params.nic.rxqueue_size, "nic_rxqueue_size", 4096,
+ PARSE_ARG(g_config_params.rxqueue_size, "nic_rxqueue_size", 4096,
NIC_QUEUE_SIZE_MIN, NIC_QUEUE_SIZE_MAX, ret);
- if (!rte_is_power_of_2(g_config_params.nic.rxqueue_size)) {
+ if (!rte_is_power_of_2(g_config_params.rxqueue_size)) {
LSTACK_LOG(ERR, LSTACK, "nic queue size only support power of two\n");
return -1;
}
@@ -1312,9 +1312,9 @@ static int32_t parse_nic_rxqueue_size(void)
static int32_t parse_nic_txqueue_size(void)
{
int32_t ret;
- PARSE_ARG(g_config_params.nic.txqueue_size, "nic_txqueue_size", 2048,
+ PARSE_ARG(g_config_params.txqueue_size, "nic_txqueue_size", 2048,
NIC_QUEUE_SIZE_MIN, NIC_QUEUE_SIZE_MAX, ret);
- if (!rte_is_power_of_2(g_config_params.nic.txqueue_size)) {
+ if (!rte_is_power_of_2(g_config_params.txqueue_size)) {
LSTACK_LOG(ERR, LSTACK, "nic queue size only support power of two\n");
return -1;
}
@@ -1352,7 +1352,7 @@ static int32_t parse_stack_thread_mode(void)
static int32_t parse_nic_vlan_mode(void)
{
int32_t ret;
- PARSE_ARG(g_config_params.nic.vlan_mode, "nic_vlan_mode", -1, -1, 4094, ret);
+ PARSE_ARG(g_config_params.vlan_mode, "nic_vlan_mode", -1, -1, 4094, ret);
return ret;
}
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index f87e362..1fe0f0a 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -432,8 +432,8 @@ static int eth_params_init(struct eth_params *eth_params, uint16_t port_id, uint
eth_params->port_id = port_id;
eth_params->nb_queues = nb_queues;
- eth_params->nb_rx_desc = get_global_cfg_params()->nic.rxqueue_size;
- eth_params->nb_tx_desc = get_global_cfg_params()->nic.txqueue_size;
+ eth_params->nb_rx_desc = get_global_cfg_params()->rxqueue_size;
+ eth_params->nb_tx_desc = get_global_cfg_params()->txqueue_size;
eth_params->conf.link_speeds = RTE_ETH_LINK_SPEED_AUTONEG;
eth_params->conf.txmode.mq_mode = RTE_ETH_MQ_TX_NONE;
eth_params->conf.rxmode.mq_mode = RTE_ETH_MQ_RX_NONE;
@@ -571,7 +571,7 @@ int32_t dpdk_ethdev_init(int port_id)
}
/* after rte_eth_dev_configure */
- if ((get_global_cfg_params()->nic.vlan_mode != -1) &&
+ if ((get_global_cfg_params()->vlan_mode != -1) &&
((stack_group->rx_offload & RTE_ETH_RX_OFFLOAD_VLAN_FILTER) == RTE_ETH_RX_OFFLOAD_VLAN_FILTER)) {
/*
* vlan filter can be configured for switch,nic and software.
@@ -583,7 +583,7 @@ int32_t dpdk_ethdev_init(int port_id)
*/
if ((get_global_cfg_params()->bond_mode != BONDING_MODE_8023AD) &&
(get_global_cfg_params()->bond_mode != BONDING_MODE_ALB)) {
- ret = rte_eth_dev_vlan_filter(port_id, get_global_cfg_params()->nic.vlan_mode, 1);
+ ret = rte_eth_dev_vlan_filter(port_id, get_global_cfg_params()->vlan_mode, 1);
if (ret != 0) {
LSTACK_LOG(ERR, LSTACK, "dpdk add vlan filter failed ret = %d\n", ret);
return -1;
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index f1eeba1..d03b744 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -571,7 +571,7 @@ static int stack_group_init_mempool(void)
struct cfg_params *cfg_params = get_global_cfg_params();
uint32_t total_mbufs = 0;
uint32_t total_conn_mbufs = cfg_params->mbuf_count_per_conn * cfg_params->tcp_conn_count;
- uint32_t total_nic_mbufs = cfg_params->nic.rxqueue_size + cfg_params->nic.txqueue_size;
+ uint32_t total_nic_mbufs = cfg_params->rxqueue_size + cfg_params->txqueue_size;
struct rte_mempool *rxtx_mbuf = NULL;
uint32_t cpu_id = 0;
unsigned numa_id = 0;
diff --git a/src/lstack/core/lstack_virtio.c b/src/lstack/core/lstack_virtio.c
index 7a8d947..fefb06d 100644
--- a/src/lstack/core/lstack_virtio.c
+++ b/src/lstack/core/lstack_virtio.c
@@ -226,10 +226,10 @@ void virtio_tap_process_rx(uint16_t port, uint32_t queue_id)
* so no action will be taken.
* For TSO, tap devices do not support it, so no action will be taken.
*/
- if (get_global_cfg_params()->nic.vlan_mode != -1) {
+ if (get_global_cfg_params()->vlan_mode != -1) {
for (int i = 0; i< pkg_num; i++) {
pkts_burst[i]->ol_flags |= RTE_MBUF_F_TX_VLAN;
- pkts_burst[i]->vlan_tci = (u16_t)get_global_cfg_params()->nic.vlan_mode;
+ pkts_burst[i]->vlan_tci = (u16_t)get_global_cfg_params()->vlan_mode;
}
}
diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h
index 4fc99f3..5e2d6fc 100644
--- a/src/lstack/include/lstack_cfg.h
+++ b/src/lstack/include/lstack_cfg.h
@@ -50,93 +50,108 @@
#define LSTACK_LPM_PKTS_IN_DETECT 1000
#define LSTACK_LPM_RX_PKTS 20
-
#define LSTACK_LPM_PKTS_IN_DETECT_MIN 5
#define LSTACK_LPM_PKTS_IN_DETECT_MAX 65535
+struct dev_addr {
#define DEV_ADDR_TYPE_EMPTY 0
#define DEV_ADDR_TYPE_MAC 1
#define DEV_ADDR_TYPE_PCI 2
-
-struct dev_addr {
- uint8_t addr_type; // 0:empty, 1:mac, 2:pci
+ uint8_t addr_type;
union addr_union {
struct rte_ether_addr mac_addr;
struct rte_pci_addr pci_addr;
} addr;
};
-struct secondary_attach_arg {
- uint8_t socket_num;
- uint64_t socket_size;
- uint32_t socket_per_size[GAZELLE_MAX_NUMA_NODES];
- uintptr_t base_virtaddr;
- char file_prefix[PATH_MAX];
-};
-
-struct cfg_nic_params {
- uint32_t rxqueue_size;
- uint32_t txqueue_size;
- int32_t vlan_mode;
-};
-
struct cfg_params {
- ip4_addr_t host_addr;
- ip6_addr_t host_addr6;
- ip4_addr_t netmask;
- ip4_addr_t gateway_addr;
- uint8_t mac_addr[ETHER_ADDR_LEN];
- uint16_t num_cpu;
- uint32_t cpus[CFG_MAX_CPUS];
- uint32_t send_cpus[CFG_MAX_CPUS];
- uint32_t recv_cpus[CFG_MAX_CPUS];
- uint16_t app_exclude_num_cpu;
- uint32_t app_exclude_cpus[CFG_MAX_CPUS];
- uint8_t num_ports;
- uint16_t ports[CFG_MAX_PORTS];
char log_file[PATH_MAX];
- uint16_t low_power_mod;
- uint16_t lpm_rx_pkts;
- uint32_t lpm_detect_ms;
- uint32_t lpm_pkts_in_detect;
- uint32_t tcp_conn_count;
- uint32_t mbuf_count_per_conn;
- uint32_t read_connect_number;
- uint32_t rpc_number;
- uint32_t nic_read_number;
- uint8_t use_ltran; // false:lstack read from nic. true:lstack read form ltran process.
-
- uint16_t num_process;
- uint16_t num_listen_port;
- uint16_t is_primary;
- uint16_t num_queue;
- uint16_t tot_queue_num;
- uint8_t process_idx;
- uint32_t process_numa[PROTOCOL_STACK_MAX];
-
- bool kni_switch;
- bool listen_shadow; // true:listen in all stack thread. false:listen in one stack thread.
- bool app_bind_numa;
- bool main_thread_affinity;
- bool seperate_send_recv;
- int dpdk_argc;
- char **dpdk_argv;
- struct secondary_attach_arg sec_attach_arg;
- char unix_socket_filename[NAME_MAX];
- uint16_t send_ring_size;
- uint16_t recv_ring_size;
- bool tuple_filter;
- int8_t bond_mode;
- int32_t bond_miimon;
- struct dev_addr bond_slave_addr[GAZELLE_MAX_BOND_NUM];
- bool use_sockmap;
- bool udp_enable;
- struct cfg_nic_params nic;
- bool stack_mode_rtc;
- bool nonblock_mode;
- uint32_t rpc_msg_max;
- bool send_cache_mode;
- bool flow_bifurcation;
+
+ struct { // dpdk
+ char **dpdk_argv;
+ uint8_t dpdk_argc;
+ struct secondary_attach_arg {
+ uint8_t socket_num;
+ uint64_t socket_size;
+ uint32_t socket_per_size[GAZELLE_MAX_NUMA_NODES];
+ uintptr_t base_virtaddr;
+ char file_prefix[PATH_MAX];
+ } sec_attach_arg;
+ };
+
+ struct { // eth
+ ip4_addr_t host_addr;
+ ip6_addr_t host_addr6;
+ ip4_addr_t netmask;
+ ip4_addr_t gateway_addr;
+ uint8_t mac_addr[ETHER_ADDR_LEN];
+ int8_t bond_mode;
+ int32_t bond_miimon;
+ struct dev_addr bond_slave_addr[GAZELLE_MAX_BOND_NUM];
+ };
+
+ struct { // low_power
+ uint16_t low_power_mod;
+ uint16_t lpm_rx_pkts;
+ uint32_t lpm_detect_ms;
+ uint32_t lpm_pkts_in_detect;
+ };
+
+ struct { // eth_rxtx
+ uint32_t rxqueue_size;
+ uint32_t txqueue_size;
+ uint16_t num_queue;
+ uint16_t tot_queue_num;
+ bool send_cache_mode;
+ bool flow_bifurcation;
+ int32_t vlan_mode;
+ };
+
+ struct { // stack
+ uint16_t num_cpu;
+ uint32_t cpus[CFG_MAX_CPUS];
+
+ bool main_thread_affinity;
+ bool app_bind_numa;
+ uint16_t app_exclude_num_cpu;
+ uint32_t app_exclude_cpus[CFG_MAX_CPUS];
+
+ bool stack_mode_rtc;
+ bool listen_shadow; // true:listen in all stack thread. false:listen in one stack thread.
+
+ uint32_t read_connect_number;
+ uint32_t nic_read_number;
+ uint32_t rpc_number;
+ uint32_t rpc_msg_max;
+ };
+
+ struct { // socket
+ uint16_t send_ring_size;
+ uint16_t recv_ring_size;
+ uint32_t tcp_conn_count;
+ uint32_t mbuf_count_per_conn;
+ };
+
+ struct { // deprecated
+ char unix_socket_filename[NAME_MAX];
+ bool use_ltran; // false:lstack read from nic. true:lstack read form ltran process.
+ bool nonblock_mode;
+ bool udp_enable;
+ bool kni_switch;
+ };
+
+ struct { // experiment
+ uint16_t num_process;
+ uint16_t is_primary;
+ uint8_t process_idx;
+ uint32_t process_numa[PROTOCOL_STACK_MAX];
+ bool tuple_filter;
+ bool use_sockmap;
+
+ bool seperate_send_recv;
+ uint32_t send_cpus[CFG_MAX_CPUS];
+ uint32_t recv_cpus[CFG_MAX_CPUS];
+ };
};
struct cfg_params *get_global_cfg_params(void);
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index 08a3901..8cb0020 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -34,7 +34,7 @@
#define SOCK_RECV_RING_SIZE_MAX (2048)
#define SOCK_SEND_RING_SIZE_MAX (2048)
-#define MBUFPOOL_RESERVE_NUM (get_global_cfg_params()->nic.rxqueue_size + 1024)
+#define MBUFPOOL_RESERVE_NUM (get_global_cfg_params()->rxqueue_size + 1024)
struct protocol_stack {
uint32_t tid;
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 4f3cbc1..1a721f6 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -418,8 +418,8 @@ int32_t ethdev_init(struct protocol_stack *stack)
}
/* 0-4094: The vlaue range for VLAN IDs is 0 to 4094. */
- if (get_global_cfg_params()->nic.vlan_mode >= 0 && get_global_cfg_params()->nic.vlan_mode <= 4094) {
- netif_set_vlan_tci(&stack->netif, (u16_t)get_global_cfg_params()->nic.vlan_mode);
+ if (get_global_cfg_params()->vlan_mode >= 0 && get_global_cfg_params()->vlan_mode <= 4094) {
+ netif_set_vlan_tci(&stack->netif, (u16_t)get_global_cfg_params()->vlan_mode);
}
netif_set_link_up(&stack->netif);
diff --git a/src/lstack/netif/lstack_vdev.c b/src/lstack/netif/lstack_vdev.c
index 9ca77ba..e1a63a7 100644
--- a/src/lstack/netif/lstack_vdev.c
+++ b/src/lstack/netif/lstack_vdev.c
@@ -145,7 +145,7 @@ static uint32_t vdev_rx_poll(struct protocol_stack *stack, struct rte_mbuf **pkt
}
/* skip gro when tcp/ip cksum offloads disable */
- if (get_protocol_stack_group()->rx_offload == 0 || (get_global_cfg_params()->nic.vlan_mode >= 0
+ if (get_protocol_stack_group()->rx_offload == 0 || (get_global_cfg_params()->vlan_mode >= 0
&& !(get_protocol_stack_group()->rx_offload & RTE_ETH_RX_OFFLOAD_VLAN_STRIP))) {
return pkt_num;
}
--
2.33.0

View File

@ -2,7 +2,7 @@
Name: gazelle
Version: 1.0.2
Release: 63
Release: 64
Summary: gazelle is a high performance user-mode stack
License: MulanPSL-2.0
URL: https://gitee.com/openeuler/gazelle
@ -273,6 +273,10 @@ Patch9253: 0253-example-solve-double-free.patch
Patch9254: 0254-WRAP-support-setsockopt-SO_SNDTIMEO-SO_SNBUF.patch
Patch9255: 0255-DFX-adapt-log-optimization.patch
Patch9256: 0256-LOG-add-log-when-udp-send_ring-is-exhausted.patch
Patch9257: 0257-cleancode-refactor-rtc_api-rtw_api-and-dummy_api.patch
Patch9258: 0258-cleancode-move-some-API-from-stack-to-rpc-and-rtw.patch
Patch9259: 0259-cleancode-add-rpc_async_call-remove-rpc_msg_arg.sock.patch
Patch9260: 0260-cleancode-declare-different-cfg_params-types.patch
%description
%{name} is a high performance user-mode stack.
@ -314,6 +318,12 @@ install -Dpm 0640 %{_builddir}/%{name}-%{version}/src/ltran/ltran.conf %{b
%config(noreplace) %{conf_path}/ltran.conf
%changelog
* Fri Sep 27 2024 yinbin6 <jiangheng14@huawei.com> - 1.0.2-64
- cleancode: declare different cfg_params types
- cleancode: add rpc_async_call, remove rpc_msg_arg.socklen, fix some format
- cleancode: move some API from stack to rpc and rtw
- cleancode: refactor rtc_api rtw_api and dummy_api
* Fri Sep 27 2024 yinbin6 <jiangheng14@huawei.com> - 1.0.2-63
- LOG:add log when udp send_ring is exhausted
- DFX: adapt log optimization