From f599cc99be29c47a7765e3fae602f3474b30926a Mon Sep 17 00:00:00 2001 From: yinbin6 Date: Thu, 11 Jul 2024 10:30:31 +0800 Subject: [PATCH] example: sync example update --- build/build.sh | 8 + examples/FAULT_INJECT.md | 33 ++ examples/README.md | 79 +++- examples/inc/bussiness.h | 16 +- examples/inc/client.h | 44 +- examples/inc/parameter.h | 80 +++- examples/inc/server.h | 25 +- examples/inc/utilities.h | 88 +++- examples/main.c | 7 +- examples/src/bussiness.c | 212 ++++++--- examples/src/client.c | 562 ++++++++++++++++++------ examples/src/parameter.c | 484 +++++++++++++++++--- examples/src/server.c | 512 ++++++++++++++++----- examples/src/utilities.c | 386 +++++++++++----- src/lstack/core/lstack_protocol_stack.c | 2 +- 15 files changed, 2006 insertions(+), 532 deletions(-) create mode 100644 examples/FAULT_INJECT.md diff --git a/build/build.sh b/build/build.sh index 4464f8c..622e1cc 100755 --- a/build/build.sh +++ b/build/build.sh @@ -31,3 +31,11 @@ if [ $? -ne 0 ]; then fi cd - +cd ../examples +cmake . +make +if [ $? -ne 0 ]; then + echo "build examples failed" + exit 1 +fi +cd - diff --git a/examples/FAULT_INJECT.md b/examples/FAULT_INJECT.md new file mode 100644 index 0000000..ff551a9 --- /dev/null +++ b/examples/FAULT_INJECT.md @@ -0,0 +1,33 @@ +# Gazelle 故障注入 说明 + +## 需求 +1. example:构造黑盒故障 + * 延迟类:accept|read: + * accept: 构造tcp_acceptmbox_full的情景. + * read: 构造tcp_refuse_count、recvmbox满 + * 跳过类:跳过 read/write并close: + * read: 构造链接关闭时时4次挥手的情景,验证TCP状态机。 +2. gazelle/lwip: 构造白盒故障,支持注入故障报文、协议栈状态、事件设置、资源异常等 + * 编译宏支持 + * 提供接口:配置文件、env + * 故障报文注入: + * 类似内核tc工具: + * 内核TC工具qdisc指令原理:报文分组被添加到网卡队列(qdisc),该队列决定发包顺序。
+ qdisc指令可以在队列层面实现延时、丢包、重复等故障。 + * dpdk性能检测工具testpmd可以模拟实现类似的故障模拟,testpmd与gazelle不兼容,需要参考其中调用的dpdk接口来改gazelle代码。
+ * 延时故障 + * 丢包故障 + - 思路:调整网卡队列,随机丢弃百分比的包,然后发送。 + - 函数调用:rte_rand(),rte_eth_tx_burst()。 + * 包重复故障 + * 随机故障 + * 乱序故障 + * 协议栈状态故障 + * ... + * 事件设置 + * ... + * 资源异常 + * 资源耗尽,无法申请。 + * ... + + diff --git a/examples/README.md b/examples/README.md index 5a73ce0..77a0f85 100644 --- a/examples/README.md +++ b/examples/README.md @@ -7,6 +7,7 @@ * 支持多线程网络非对称模型,一个 listen 线程,若干个读写线程。listen 线程和读写线程使用 `poll` / `epoll` 监听事件。 * 支持 `recvmsg` 、`sendmsg` 、`recv` 、`send` 、`recvfrom`、`sendto`、`getpeername` 、`getsockopt` 、`epoll_ctl` 等 posix 接口。 * 网络通讯报文采用问答方式,丢包或者内容错误则报错并停止通讯。报文内容有变化,长度可配。 +* 支持网络故障注入,延迟进行(delay)、跳过(skip)read、write、accept等逻辑。 ## 网络模型 @@ -103,15 +104,15 @@ * `-a, --as [server | client]`:作为服务端还是客户端。 * `server`:作为服务端。 * `client`:作为客户端。 -* `-i, --ip [xxx.xxx.xxx.xxx]`:IP地址。 -* `-g, --groupip [xxx.xxx.xxx.xxx]`:UDP组播地址。 +* `-i, --ip [xxx.xxx.xxx.xxx]`:server端IP地址。当v4与v6地址同时存在时,以","分隔。例如:`-i 192.168.1.88,aa22:bb11:1122:cdef:1234:aa99:7654:7410` +* `-g, --groupip [xxx.xxx.xxx.xxx,xxx.xxx.xxx.xxx]`:配置UDP组播地址与interface地址,以','分隔,其中interface地址为可选项。例如:`-g 224.0.0.24,192.168.1.202`或`-g 224.0.0.24` * `-p, --port [xxxx]`:端口。 * `-m, --model [mum | mud]`:采用的网络模型类型。 * `mum (multi thread, unblock, multiplexing IO)`:多线程非阻塞IO复用。 * `mud (multi thread, unblock, dissymmetric)`:多线程非阻塞非对称。 * `-t, --threadnum`:线程数设置。 * `-c, --connectnum`:连接数设置。当 `domain` 设置为 `udp` 时,`connectnum` 会被设置为1。 -* `-D, --domain [unix | tcp | udp]`:通信协议。 +* `-D, --domain [unix | tcp | udp]`:通信协议。当支持多个通信协议时以","分隔。例如:`-D tcp,udp` * `unix`:基于 unix 协议实现。 * `tcp`:基于 tcp 协议实现。 * `udp`:基于 udp 协议实现。 @@ -132,6 +133,19 @@ * `-C, --accept`:accept的方式。 * `ac`:使用accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)通过套接口接受连接。 * `ac4`:使用accept4(int sockfd, struct sockaddr *addr,socklen_t *addrlen, int flags)通过套接口接受连接,flags=SOCK_CLOEXEC。 +* `-k, --keep_alive`:配置TCP keep_alive idle , keep_alive interval时间(second)。 +* `-I, --inject`: 配置故障注入类型。 + * `delay`: + * `"delay 20 before_accept"`: 延迟20秒进行accept,时间可自定义,需大于0。可用于构造tcp_acceptmbox_full的情景 + * `"delay 20 before_read"`: 延迟20秒进行read,时间可自定义,需大于0。 + * `"delay 20 before_write"`: 延迟20秒进行write,时间可自定义,需大于0。 + * `"delay 20 before_read_and_write"`: 延迟20秒进行read和write,时间可自定义,需大于0。 + * `skip`: + * `"skip write"`: 跳过写过程,并关闭链接。 + * `"skip read"`: 跳过读过程,并关闭链接。 + * `"skip read_and_write"`: 跳过读写写过程,并关闭链接。 + + ## 使用 * **环境配置** @@ -235,12 +249,12 @@ make * 创建udp组播服务端 ``` -./example -A server -D udp -i 192.168.0.1 -g 225.0.0.1 -A recvfromsendto +./example -A server -D udp -g 225.0.0.1,192.168.0.1 -A recvfromsendto [program parameters]: --> [as]: server ---> [server ip]: 192.168.0.1 --> [server group ip]: 225.0.0.1 +--> [server groupip_interface]: 192.168.0.1 --> [server port]: 5050 --> [model]: mum --> [thread number]: 1 @@ -260,12 +274,12 @@ make * 创建udp组播客户端 ``` -./example -A client -D udp -i 192.168.0.1 -g 225.0.0.1 -A recvfromsendto +./example -A client -D udp -g 225.0.0.1,192.168.0.1 -A recvfromsendto [program parameters]: ---> [as]: server ---> [server ip]: 225.0.0.1 ---> [client send ip]: 192.168.0.1 +--> [as]: client +--> [client group ip]: 225.0.0.1 +--> [client groupip_interface]: 192.168.0.1 --> [server port]: 5050 --> [thread number]: 1 --> [connection number]: 1 @@ -280,3 +294,50 @@ make [program informations]: --> : [connect num]: 0, [send]: 0.000 B/s ``` + +* 混杂模式下server 与 client 配置 +``` +./example -a server -D tcp,udp -i 192.168.1.88 -p 33333 -g 224.0.0.24,192.168.1.188 +[program parameters]: +--> [as]: server +--> [server group ip]: 224.0.0.24 +--> [server groupip_interface]: 192.168.1.188 +--> [server ip]: 192.168.1.888 +--> [server port]: 33333 +--> [model]: mum +--> [thread number]: 1 +--> [domain]: tcp,udp +--> [api]: read & write +--> [packet length]: 1024 +--> [verify]: off +--> [ringpmd]: off +--> [debug]: off +--> [epoll create]: ec +--> [accept]: ac +--> [inject]: none + +[program informations]: +``` +``` +./example -a client -D tcp,udp -i 192.168.1.188 -p 33333 -g 192.168.1.202,224.0.0.24 +[program parameters]: +--> [as]: client +--> [client group ip]: 224.0.0.24 +--> [client groupip_interface]: 192.168.1.202 +--> [server ip]: 192.168.1.188 +--> [server port]: 33333 +--> [thread number]: 1 +--> [connection number]: 1 +--> [domain]: tcp,udp +--> [api]: read & write +--> [packet length]: 1024 +--> [verify]: off +--> [ringpmd]: off +--> [debug]: off +--> [epoll create]: ec +--> [accept]: ac +--> [inject]: none + +[program informations]: + +``` \ No newline at end of file diff --git a/examples/inc/bussiness.h b/examples/inc/bussiness.h index 83645ef..3a78b1f 100644 --- a/examples/inc/bussiness.h +++ b/examples/inc/bussiness.h @@ -28,7 +28,9 @@ */ struct ServerHandler { + int32_t listen_fd_array[PROTOCOL_MODE_MAX]; int32_t fd; ///< socket file descriptor + int32_t is_v6; }; /** @@ -39,6 +41,7 @@ struct ClientHandler { int32_t fd; ///< socket file descriptor uint32_t msg_idx; ///< the start charactors index of message + int32_t sendtime_interverl; ///< udp send packet interverl }; @@ -90,24 +93,21 @@ int32_t client_bussiness(char *out, const char *in, uint32_t size, bool verify, /** * @brief server checks the information and answers * This function checks the information and answers. - * @param server_handler server handler + * @param fd socket_fd * @param pktlen the length of package * @param api the api * @return the result */ -int32_t server_ans(struct ServerHandler *server_handler, uint32_t pktlen, const char* api, const char* domain); +int32_t server_ans(int32_t fd, uint32_t pktlen, const char* api, const char* domain); /** * @brief client asks server * This function asks server. * @param client_handler client handler - * @param pktlen the length of package - * @param api the api - * @param domain the domain + * @param client_unit ClientUnit * @return the result */ -int32_t client_ask(struct ClientHandler *client_handler, uint32_t pktlen, const char* api, const char* domain, in_addr_t ip, uint16_t port); - +int32_t client_ask(struct ClientHandler *client_handler, struct ClientUnit *client_unit); /** * @brief client checks the information and answers * This function checks the information and answers. @@ -119,7 +119,7 @@ int32_t client_ask(struct ClientHandler *client_handler, uint32_t pktlen, const * @param ip the ip address of peer, maybe group ip * @return the result */ -int32_t client_chkans(struct ClientHandler *client_handler, uint32_t pktlen, bool verify, const char* api, const char* domain, in_addr_t ip); +int32_t client_chkans(struct ClientHandler *client_handler, uint32_t pktlen, bool verify, const char* api, const char* domain, ip_addr_t* ip); #endif // __EXAMPLES_BUSSINESS_H__ diff --git a/examples/inc/client.h b/examples/inc/client.h index 97af33f..0fe07aa 100644 --- a/examples/inc/client.h +++ b/examples/inc/client.h @@ -19,31 +19,8 @@ #include "parameter.h" #include "bussiness.h" - -/** - * @brief client unit - * The information of each thread of client. - */ -struct ClientUnit -{ - struct ClientHandler *handlers; ///< the handlers - int32_t epfd; ///< the connect epoll file descriptor - struct epoll_event *epevs; ///< the epoll events - uint32_t curr_connect; ///< current connection number - uint64_t send_bytes; ///< total send bytes - in_addr_t ip; ///< server ip - in_addr_t groupip; ///< server groupip - uint16_t port; ///< server port - uint16_t sport; ///< client sport - uint32_t connect_num; ///< total connection number - uint32_t pktlen; ///< the length of peckage - bool verify; ///< if we verify or not - char* domain; ///< the communication domain - char* api; ///< the type of api - bool debug; ///< if we print the debug information - char* epollcreate; ///< epoll_create method - struct ClientUnit *next; ///< next pointer -}; +#define TIME_SCAN_INTERVAL 1 +#define TIME_SEND_INTERVAL 1 /** * @brief client @@ -53,8 +30,14 @@ struct Client { struct ClientUnit *uints; ///< the server mum unit bool debug; ///< if we print the debug information + uint32_t threadNum; + bool loop; ///< judge client info print while loop is open }; +struct Client_domain_ip { + char *domain; + uint8_t ip_family; +}; /** * @brief the single thread, client prints informations @@ -66,7 +49,7 @@ struct Client * @param debug if debug or not * @return the result pointer */ -void client_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug); +void client_debug_print(const char *ch_str, const char *act_str, ip_addr_t *ip, uint16_t port, bool debug); /** * @brief the client prints informations @@ -86,7 +69,7 @@ void client_info_print(struct Client *client); * @param domain domain * @return the result pointer */ -int32_t client_thread_try_connect(struct ClientHandler *client_handler, int32_t epoll_fd, in_addr_t ip, in_addr_t groupip, uint16_t port, uint16_t sport, const char *domain, const char *api); +int32_t client_thread_try_connect(struct ClientHandler *client_handler, struct ClientUnit *client_unit); /** * @brief the single thread, client retry to connect to server, register to epoll @@ -122,4 +105,11 @@ void *client_s_create_and_run(void *arg); int32_t client_create_and_run(struct ProgramParams *params); +/** + * @brief loop server info + * This function print loop mode server info. + */ +void loop_info_print(); + + #endif // __EXAMPLES_CLIENT_H__ diff --git a/examples/inc/parameter.h b/examples/inc/parameter.h index 93e3672..ff2f114 100644 --- a/examples/inc/parameter.h +++ b/examples/inc/parameter.h @@ -20,6 +20,8 @@ #define PARAM_DEFAULT_AS ("server") ///< default type #define PARAM_DEFAULT_IP ("127.0.0.1") ///< default IP +#define PARAM_DEFAULT_IP_V6 ("0.0.0.0.0.0.0.0") ///< default IP +#define PARAM_DEFAULT_ADDR_FAMILY (AF_INET) ///< default address family #define PARAM_DEFAULT_PORT (5050) ///< default port #define PARAM_DEFAULT_SPORT (0) ///< default sport #define PARAM_DEFAULT_MODEL ("mum") ///< default model type @@ -34,6 +36,9 @@ #define PARAM_DEFAULT_EPOLLCREATE ("ec") ///< default method of epoll_create #define PARAM_DEFAULT_ACCEPT ("ac") ///< default method of accept method #define PARAM_DEFAULT_GROUPIP ("0.0.0.0") ///< default group IP> +#define PARAM_DEFAULT_KEEPALIVEIDLE (0) ///< default TCP_KEEPALIVE_IDLE_TIME> + +#define TCP_KEEPALIVE_IDLE_MAX (3600) // time: second enum { @@ -43,7 +48,7 @@ enum { PARAM_NUM_IP = 'i', #define PARAM_NAME_PORT ("port") ///< name of parameter port PARAM_NUM_PORT = 'p', -#define PARAM_NAME_SPORT ("sport") ///< name of parameter sport +#define PARAM_NAME_SPORT ("sport") ///< name of parameter sport PARAM_NUM_SPORT = 's', #define PARAM_NAME_MODEL ("model") ///< name of parameter model type PARAM_NUM_MODEL = 'm', @@ -71,12 +76,27 @@ enum { PARAM_NUM_ACCEPT = 'C', #define PARAM_NAME_GROUPIP ("groupip") ///< name of parameter group ip PARAM_NUM_GROUPIP = 'g', +#define PARAM_NAME_KEEPALIVE ("keep_alive") ///< name of parameter keep_alive + PARAM_NUM_KEEPALIVE = 'k', +#define PARAM_NAME_INJECT ("inject") ///< name of parameter fault inject + PARAM_NUM_INJECT = 'I', }; #define NO_ARGUMENT 0 ///< options takes no arguments #define REQUIRED_ARGUMETN 1 ///< options requires arguments #define OPTIONAL_ARGUMETN 2 ///< options arguments are optional +uint8_t getbit_num(uint8_t mode, uint8_t index); +uint8_t setbitnum_on(uint8_t mode, uint8_t index); +uint8_t setbitnum_off(uint8_t mode, uint8_t index); + +uint8_t program_get_protocol_mode_by_domain_ip(char* domain, char* ipv4, char* ipv6, char* group_ip); + +struct ServerBaseCfgInfo { + const char *domain; + const char *api; + uint32_t pktlen; +}; /** * @brief program option description @@ -96,12 +116,13 @@ struct ProgramOption { struct ProgramParams { char* as; ///< as server or client char* ip; ///< IP address - uint32_t port; ///< port - uint32_t sport; ///< sport + char* ipv6; + bool port[UNIX_TCP_PORT_MAX]; ///< index:port list; value:port is set or not + bool sport[UNIX_TCP_PORT_MAX]; ///< index:sport list; value:sport is set or not char* model; ///< model type uint32_t thread_num; ///< the number of threads uint32_t connect_num; ///< the connection number - char* domain; ///< the communication dimain + char* domain; ///< the communication domain char* api; ///< the type of api uint32_t pktlen; ///< the packet length bool verify; ///< if we verify the message or not @@ -110,8 +131,58 @@ struct ProgramParams { char* accept; ///< accept connections method bool ringpmd; ///< if we use ring PMD or not char* groupip; ///< group IP address> + char* groupip_interface; ///< udp multicast interface address> + uint32_t addr_family; ///< IP address family + int32_t tcp_keepalive_idle; ///< tcp keepalive idle time + int32_t tcp_keepalive_interval; ///< tcp keepalive interval time +#define INJECT_TYPE_IDX (0) ///< the index of inject type +#define INJECT_TIME_IDX (1) ///< the index of delay time +#define INJECT_SKIP_IDX (1) ///< the index of skip location +#define INJECT_LOCATION_IDX (2) ///< the index of delay location +#define FAULT_INJECT_PARA_COUNT (3) ///< the count of fault injection parameters + char* inject[FAULT_INJECT_PARA_COUNT]; /// < fault inject }; +typedef enum { + INJECT_DELAY_ACCEPT = 0, + INJECT_DELAY_READ, + INJECT_DELAY_WRITE, + INJECT_DELAY_MAX, +}delay_type; + +typedef enum { + INJECT_SKIP_READ = 0, + INJECT_SKIP_WRITE, + INJECT_SKIP_MAX, +} skip_type; + +typedef enum { + V4_TCP, + V6_TCP, + V4_UDP, + V6_UDP, + UDP_MULTICAST, + UNIX, + PROTOCOL_MODE_MAX +} PROTOCOL_MODE_ENUM_TYPE; + +#define FAULT_INJECT_SKIP_BEGIN(skip_type) \ + if (get_g_inject_skip((skip_type))) {} \ + else { +#define FAULT_INJECT_SKIP_END } + +/** + * @brief return g_inject_skip value + * This function return g_inject_skip value to deside if excute skip + */ +int32_t get_g_inject_skip(skip_type type); + +/** + * @brief function execute delay inject + * This function delay execute following program. + */ +void fault_inject_delay(delay_type type); + /** * @brief initialize the parameters * This function initializes the parameters of main function. @@ -142,5 +213,6 @@ int32_t program_params_parse(struct ProgramParams *params, uint32_t argc, char * */ void program_params_print(struct ProgramParams *params); +bool ip_is_v6(const char *ip); #endif // __EXAMPLES_PARAMETER_H__ diff --git a/examples/inc/server.h b/examples/inc/server.h index a3affef..4631a28 100644 --- a/examples/inc/server.h +++ b/examples/inc/server.h @@ -31,8 +31,7 @@ struct ServerMumUnit struct epoll_event *epevs; ///< the epoll events uint32_t curr_connect; ///< current connection number uint64_t recv_bytes; ///< total receive bytes - in_addr_t ip; ///< server ip - in_addr_t groupip; ///< server group ip + struct ServerIpInfo server_ip_info; uint16_t port; ///< server port uint32_t pktlen; ///< the length of peckage char* domain; ///< communication domain @@ -40,6 +39,9 @@ struct ServerMumUnit bool debug; ///< if we print the debug information char* epollcreate; ///< epoll_create method char* accept; ///< accept connections method + int32_t tcp_keepalive_idle; ///< tcp keepalive idle time + int32_t tcp_keepalive_interval; ///< tcp keepalive interval time + uint8_t protocol_type_mode; ///< tcp/udp ipv4/ipv6 protocol mode struct ServerMumUnit *next; ///< next pointer }; @@ -64,11 +66,13 @@ struct ServerMudWorker struct epoll_event *epevs; ///< the epoll events uint64_t recv_bytes; ///< total receive bytes uint32_t pktlen; ///< the length of peckage - in_addr_t ip; ///< client ip + ip_addr_t ip; ///< client ip uint16_t port; ///< client port char* api; ///< the type of api bool debug; ///< if we print the debug information char* epollcreate; ///< epoll_create method + char* domain; + uint32_t curr_connect; struct ServerMudWorker *next; ///< next pointer }; @@ -82,16 +86,17 @@ struct ServerMud struct ServerMudWorker *workers; ///< the workers int32_t epfd; ///< the listen epoll file descriptor struct epoll_event *epevs; ///< the epoll events - uint32_t curr_connect; ///< current connection number - in_addr_t ip; ///< server ip - in_addr_t groupip; ///< server group ip - uint16_t port; ///< server port + struct ServerIpInfo server_ip_info; + bool* port; ///< server port point to parameter's port uint32_t pktlen; ///< the length of peckage char* domain; ///< communication domain char* api; ///< the type of api bool debug; ///< if we print the debug information char* accept; ///< accept connections method char* epollcreate; ///< epoll_create method + int32_t tcp_keepalive_idle; ///< tcp keepalive idle time + int32_t tcp_keepalive_interval; ///< tcp keepalive interval time + uint8_t protocol_type_mode; ///< tcp/udp ipv4/ipv6 protocol mode }; @@ -105,7 +110,7 @@ struct ServerMud * @param debug if debug or not * @return the result pointer */ -void server_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug); +void server_debug_print(const char *ch_str, const char *act_str, ip_addr_t *ip, uint16_t port, bool debug); /** * @brief the multi thread, unblock, dissymmetric server prints informations @@ -136,7 +141,7 @@ int32_t sermud_listener_create_epfd_and_reg(struct ServerMud *server_mud); * @param server_mud the server unit * @return the result pointer */ -int32_t sermud_listener_accept_connects(struct ServerMud *server_mud); +int32_t sermud_listener_accept_connects(struct epoll_event *curr_epev, struct ServerMud *server_mud); /** * @brief the worker thread, unblock, dissymmetric server processes the events @@ -200,7 +205,7 @@ int32_t sersum_create_epfd_and_reg(struct ServerMumUnit *server_unit); * @param server_handler the server handler * @return the result pointer */ -int32_t sersum_accept_connects(struct ServerMumUnit *server_unit, struct ServerHandler *server_handler); +int32_t sersum_accept_connects(struct epoll_event *cur_epev, struct ServerMumUnit *server_unit); /** * @brief the single thread, unblock, mutliplexing IO server processes the events diff --git a/examples/inc/utilities.h b/examples/inc/utilities.h index 0f9db4e..262481a 100644 --- a/examples/inc/utilities.h +++ b/examples/inc/utilities.h @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -36,6 +37,7 @@ #include #include +#include #include #include "securec.h" @@ -47,7 +49,7 @@ { \ printf("\n[error]: "); \ printf(format, ##__VA_ARGS__); \ - printf("\n"); \ + printf("\n\n"); \ } while (0) #define PRINT_WARNNING(format, ...) do \ { \ @@ -76,7 +78,7 @@ } while(0) #define PRINT_CLIENT_DATAFLOW(format, ...) do \ { \ - printf("\033[?25l\033[A\033[K"); \ + printf(" "); \ printf("--> : "); \ printf(format, ##__VA_ARGS__); \ printf("\033[?25h\n"); \ @@ -90,24 +92,94 @@ #define PROGRAM_INPROGRESS (-2) ///< program in progress flag #define UNIX_TCP_PORT_MIN (1024) ///< TCP minimum port number in unix -#define UNIX_TCP_PORT_MAX (65535) ///< TCP minimum port number in unix +#define UNIX_TCP_PORT_MAX (65535) ///< TCP maximum port number in unix #define THREAD_NUM_MIN (1) ///< minimum number of thead #define THREAD_NUM_MAX (1000) ///< maximum number of thead #define MESSAGE_PKTLEN_MIN (2) ///< minimum length of message (1 byte) #define MESSAGE_PKTLEN_MAX (1024 * 1024 * 10) ///< maximum length of message (10 Mb) +#define UDP_PKTLEN_MAX (65507) ///< maximum length of udp message -#define SERVER_SOCKET_LISTEN_BACKLOG (128) ///< the queue of socket +#define SERVER_SOCKET_LISTEN_BACKLOG (4096) ///< the queue of socket #define SERVER_EPOLL_SIZE_MAX (10000) ///< the max wait event of epoll #define SERVER_EPOLL_WAIT_TIMEOUT (-1) ///< the timeout value of epoll #define CLIENT_EPOLL_SIZE_MAX (10000) ///< the max wait event of epoll #define CLIENT_EPOLL_WAIT_TIMEOUT (-1) ///< the timeout value of epoll -#define TERMINAL_REFRESH_MS (100) ///< the time cut off between of terminal refresh +#define TERMINAL_REFRESH_MS (500) ///< the time cut off between of terminal refresh #define SOCKET_UNIX_DOMAIN_FILE "unix_domain_file" ///< socket unix domain file +#define IPV4_STR "V4" +#define IPV6_STR "V6" +#define IPV4_MULTICAST "Multicast" +#define INVAILD_STR "STR_NULL" + +#define TIMES_CONVERSION_RATE (1000) +#define KB (1024) +#define MB (KB * KB) +#define GB (MB * MB) + +struct ThreadUintInfo { + uint64_t send_bytes; ///< total send bytes + uint32_t cur_connect_num; ///< total connection number + char* domain; + char* ip_type_info; + pthread_t thread_id; +}; + +typedef struct ip_addr { + struct { + struct in_addr ip4; + struct in6_addr ip6; + } u_addr; + uint32_t addr_family; +} ip_addr_t; + +typedef union sockaddr_union { + struct sockaddr sa; + struct sockaddr_in in; + struct sockaddr_in6 in6; +} sockaddr_t; +/** + * @brief client unit + * The information of each thread of client. + */ +struct ClientUnit { + struct ClientHandler *handlers; ///< the handlers + int32_t epfd; ///< the connect epoll file descriptor + struct epoll_event *epevs; ///< the epoll events + uint32_t curr_connect; ///< current connection number + ip_addr_t ip; ///< server ip + ip_addr_t groupip; ///< server groupip + uint32_t port; ///< server port + ip_addr_t groupip_interface; ///< udp multicast interface address> + uint32_t sport; ///< client sport + uint32_t connect_num; ///< total connection number + uint32_t pktlen; ///< the length of peckage + uint32_t loop; ///< the packet send to loop + bool verify; ///< if we verify or not + char* domain; ///< the communication domain + char* api; ///< the type of api + bool debug; ///< if we print the debug information + char* epollcreate; ///< epoll_create method + uint8_t protocol_type_mode; ///< tcp/udp ipv4/ipv6 protocol mode + struct ThreadUintInfo threadVolume; + struct ClientUnit *next; ///< next pointer +}; +struct ServerIpInfo { + ip_addr_t ip; ///< server ip + ip_addr_t groupip; ///< server group ip + ip_addr_t groupip_interface; ///< server group interface ip +}; + +struct LoopInfo { + char* model; + struct ServerMud *server_mud_info; + struct ServerMum *server_mum_info; +}; +extern struct LoopInfo loopmod; /** * @brief create the socket and listen * Thi function creates the socket and listen. @@ -118,7 +190,8 @@ * @param domain domain * @return the result */ -int32_t create_socket_and_listen(int32_t *socket_fd, in_addr_t ip, in_addr_t groupip, uint16_t port, const char *domain); +int32_t create_socket_and_listen(int32_t *listen_fd_array, struct ServerIpInfo *server_ip_info, uint16_t port, + uint8_t protocol_mode); /** * @brief create the socket and connect @@ -131,7 +204,7 @@ int32_t create_socket_and_listen(int32_t *socket_fd, in_addr_t ip, in_addr_t gro * @param api api * @return the result */ -int32_t create_socket_and_connect(int32_t *socket_fd, in_addr_t ip, in_addr_t groupip, uint16_t port, uint16_t sport, const char *domain, const char *api); +int32_t create_socket_and_connect(int32_t *socket_fd, struct ClientUnit *client_unit); /** * @brief set the socket to unblock @@ -140,6 +213,7 @@ int32_t create_socket_and_connect(int32_t *socket_fd, in_addr_t ip, in_addr_t gr * @return the result */ int32_t set_socket_unblock(int32_t socket_fd); +int32_t set_tcp_keep_alive_info(int32_t sockfd, int32_t tcp_keepalive_idle, int32_t tcp_keepalive_interval); #endif // __EXAMPLES_UTILITIES_H__ diff --git a/examples/main.c b/examples/main.c index 5338572..dfee2db 100644 --- a/examples/main.c +++ b/examples/main.c @@ -31,7 +31,12 @@ int32_t main(int argc, char *argv[]) if (strcmp(prog_params.as, "server") == 0) { server_create_and_run(&prog_params); - } else { + } else if (strcmp(prog_params.as, "client") == 0) { + client_create_and_run(&prog_params); + } else if (strcmp(prog_params.as, "loop") == 0) { + server_create_and_run(&prog_params); + /* sleep to wait server creating */ + sleep(1); client_create_and_run(&prog_params); } diff --git a/examples/src/bussiness.c b/examples/src/bussiness.c index 7263371..46c99fe 100644 --- a/examples/src/bussiness.c +++ b/examples/src/bussiness.c @@ -11,8 +11,9 @@ */ -#include "bussiness.h" +#include "parameter.h" #include "client.h" +#include "bussiness.h" static const char bussiness_messages_low[] = "abcdefghijklmnopqrstuvwxyz"; // the lower charactors of business message @@ -135,41 +136,41 @@ int32_t client_bussiness(char *out, const char *in, uint32_t size, bool verify, return PROGRAM_OK; } -// server answers -int32_t server_ans(struct ServerHandler *server_handler, uint32_t pktlen, const char* api, const char* domain) +static void server_ans_free_buff(char *buff_in, char *buff_out) { - const uint32_t length = pktlen; - char *buffer_in = (char *)malloc(length * sizeof(char)); - char *buffer_out = (char *)malloc(length * sizeof(char)); + if (buff_in) { + free(buff_in); + } + if (buff_out) { + free(buff_out); + } +} + +// server_ans_read +static int32_t server_ans_read(int32_t socket_fd, struct ServerBaseCfgInfo *server_base_info, char *buffer_in, + struct sockaddr *client_addr) +{ + const uint32_t length = server_base_info->pktlen; + const char *api = server_base_info->api; + const char *domain = server_base_info->domain; int32_t cread = 0; int32_t sread = length; int32_t nread = 0; - struct sockaddr_in client_addr; - socklen_t len = sizeof(client_addr); - if (strcmp(domain, "udp") == 0 && strncmp(api, "recvfrom", strlen("recvfrom")) != 0) { - if (getpeername(server_handler->fd, (struct sockaddr *)&client_addr, &len) < 0) { - if (recvfrom(server_handler->fd, buffer_in, length, MSG_PEEK, (struct sockaddr *)&client_addr, &len) < 0) { - return PROGRAM_FAULT; - } - if (connect(server_handler->fd, (struct sockaddr *)&client_addr, sizeof(struct sockaddr_in)) < 0) { - return PROGRAM_FAULT; - } - } - } + socklen_t len = sizeof(sockaddr_t); while (cread < sread) { if (strcmp(domain, "udp") == 0 && strcmp(api, "recvfromsendto") == 0) { - nread = recvfrom(server_handler->fd, buffer_in, length, 0, (struct sockaddr *)&client_addr, &len); + nread = recvfrom(socket_fd, buffer_in, length, 0, client_addr, &len); } else { - nread = read_api(server_handler->fd, buffer_in, length, api); + nread = read_api(socket_fd, buffer_in, length, api); } - if (nread == 0) { return PROGRAM_ABORT; } else if (nread < 0) { if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) { + PRINT_ERROR("nread =%d, errno=%d", nread, errno); return PROGRAM_FAULT; } } else { @@ -177,66 +178,152 @@ int32_t server_ans(struct ServerHandler *server_handler, uint32_t pktlen, const continue; } } + return PROGRAM_OK; +} - if (strcmp(api, "recvfrom") == 0) { - free(buffer_in); - free(buffer_out); - return PROGRAM_OK; - } - - server_bussiness(buffer_out, buffer_in, length); +static int32_t server_ans_write(int32_t socket_fd, struct ServerBaseCfgInfo *server_base_info, char *buffer_out, + struct sockaddr *client_addr) +{ + const uint32_t length = server_base_info->pktlen; + const char *api = server_base_info->api; + const char *domain = server_base_info->domain; int32_t cwrite = 0; int32_t swrite = length; int32_t nwrite = 0; + socklen_t len = sizeof(sockaddr_t); + while (cwrite < swrite) { if (strcmp(domain, "udp") == 0 && strcmp(api, "recvfromsendto") == 0) { - nwrite = sendto(server_handler->fd, buffer_out, length, 0, (struct sockaddr *)&client_addr, len); + nwrite = sendto(socket_fd, buffer_out, swrite - cwrite, 0, client_addr, len); } else { - nwrite = write_api(server_handler->fd, buffer_out, length, api); + nwrite = write_api(socket_fd, buffer_out, swrite - cwrite, api); } if (nwrite == 0) { return PROGRAM_ABORT; } else if (nwrite < 0) { - if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) { + if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) { + PRINT_ERROR("nwrite =%d, errno=%d", nwrite, errno); return PROGRAM_FAULT; - } + } } else { cwrite += nwrite; continue; } } + return PROGRAM_OK; +} - free(buffer_in); - free(buffer_out); +// server answers +int32_t server_ans(int32_t fd, uint32_t pktlen, const char* api, const char* domain) +{ + const uint32_t length = pktlen; + char *buffer_in = (char *)calloc(length, sizeof(char)); + char *buffer_out = (char *)calloc(length, sizeof(char)); + if (buffer_in == NULL || buffer_out == NULL) { + return PROGRAM_FAULT; + } + + struct ServerBaseCfgInfo server_base_info; + server_base_info.domain = domain; + server_base_info.api = api; + server_base_info.pktlen = pktlen; + + sockaddr_t client_addr; + socklen_t len = sizeof(sockaddr_t); + + if (strcmp(domain, "udp") == 0 && strncmp(api, "recvfrom", strlen("recvfrom")) != 0) { + if (getpeername(fd, (struct sockaddr *)&client_addr, &len) < 0) { + if (recvfrom(fd, buffer_in, length, MSG_PEEK, (struct sockaddr *)&client_addr, &len) < 0) { + server_ans_free_buff(buffer_in, buffer_out); + return PROGRAM_FAULT; + } + if (connect(fd, (struct sockaddr *)&client_addr, len) < 0) { + server_ans_free_buff(buffer_in, buffer_out); + return PROGRAM_FAULT; + } + } + } + + fault_inject_delay(INJECT_DELAY_READ); + FAULT_INJECT_SKIP_BEGIN(INJECT_SKIP_READ) + + if (server_ans_read(fd, &server_base_info, buffer_in, (struct sockaddr *)&client_addr) != PROGRAM_OK) { + server_ans_free_buff(buffer_in, buffer_out); + return PROGRAM_FAULT; + } + + FAULT_INJECT_SKIP_END + + if (strcmp(api, "recvfrom") == 0) { + server_ans_free_buff(buffer_in, buffer_out); + return PROGRAM_OK; + } + + server_bussiness(buffer_out, buffer_in, length); + + fault_inject_delay(INJECT_DELAY_WRITE); + FAULT_INJECT_SKIP_BEGIN(INJECT_SKIP_WRITE) + + if (server_ans_write(fd, &server_base_info, buffer_out, (struct sockaddr *)&client_addr) != PROGRAM_OK) { + server_ans_free_buff(buffer_in, buffer_out); + return PROGRAM_FAULT; + } + + FAULT_INJECT_SKIP_END + + server_ans_free_buff(buffer_in, buffer_out); return PROGRAM_OK; } // client asks -int32_t client_ask(struct ClientHandler *client_handler, uint32_t pktlen, const char* api, const char* domain, in_addr_t ip, uint16_t port) +int32_t client_ask(struct ClientHandler *client_handler, struct ClientUnit *client_unit) { - const uint32_t length = pktlen; - char *buffer_in = (char *)malloc(length * sizeof(char)); - char *buffer_out = (char *)malloc(length * sizeof(char)); - struct sockaddr_in server_addr; - socklen_t len = sizeof(server_addr); - memset_s(&server_addr, sizeof(server_addr), 0, sizeof(server_addr)); - server_addr.sin_family = AF_INET; - server_addr.sin_addr.s_addr = ip; - server_addr.sin_port = port; + const char *api = client_unit->api; + const char *domain = client_unit->domain; + + ip_addr_t *ip = client_unit->protocol_type_mode == UDP_MULTICAST ? &client_unit->groupip : &client_unit->ip; + uint16_t port = client_unit->port; + + const uint32_t length = client_unit->pktlen; + char *buffer_in = (char *)calloc(length, sizeof(char)); + char *buffer_out = (char *)calloc(length, sizeof(char)); + if (buffer_in == NULL || buffer_out == NULL) { + return PROGRAM_FAULT; + } + sockaddr_t server_addr; + socklen_t len = 0; + + if (ip->addr_family == AF_INET6) { + memset_s(&server_addr, sizeof(struct sockaddr_in6), 0, sizeof(struct sockaddr_in6)); + ((struct sockaddr_in6 *)&server_addr)->sin6_family = AF_INET6; + ((struct sockaddr_in6 *)&server_addr)->sin6_addr = ip->u_addr.ip6; + ((struct sockaddr_in6 *)&server_addr)->sin6_port = port; + len = sizeof(struct sockaddr_in6); + } else if (ip->addr_family == AF_INET) { + memset_s(&server_addr, sizeof(struct sockaddr_in), 0, sizeof(struct sockaddr_in)); + ((struct sockaddr_in *)&server_addr)->sin_family = AF_INET; + ((struct sockaddr_in *)&server_addr)->sin_addr = ip->u_addr.ip4; + ((struct sockaddr_in *)&server_addr)->sin_port = port; + len = sizeof(struct sockaddr_in); + } client_bussiness(buffer_out, buffer_in, length, false, &(client_handler->msg_idx)); int32_t cwrite = 0; int32_t swrite = length; int32_t nwrite = 0; + + fault_inject_delay(INJECT_DELAY_WRITE); + FAULT_INJECT_SKIP_BEGIN(INJECT_SKIP_WRITE) + while (cwrite < swrite) { if (strcmp(domain, "udp") == 0 && strcmp(api, "recvfromsendto") == 0) { - nwrite = sendto(client_handler->fd, buffer_out, length, 0, (struct sockaddr *)&server_addr, len); + nwrite = sendto(client_handler->fd, buffer_out, swrite - cwrite, 0, (struct sockaddr *)&server_addr, len); } else { - nwrite = write_api(client_handler->fd, buffer_out, length, api); + nwrite = write_api(client_handler->fd, buffer_out, swrite - cwrite, api); } if (nwrite == 0) { return PROGRAM_ABORT; @@ -250,6 +337,8 @@ int32_t client_ask(struct ClientHandler *client_handler, uint32_t pktlen, const } } + FAULT_INJECT_SKIP_END + free(buffer_in); free(buffer_out); @@ -257,18 +346,24 @@ int32_t client_ask(struct ClientHandler *client_handler, uint32_t pktlen, const } // client checks -int32_t client_chkans(struct ClientHandler *client_handler, uint32_t pktlen, bool verify, const char* api, const char* domain, in_addr_t ip) +int32_t client_chkans(struct ClientHandler *client_handler, uint32_t pktlen, bool verify, const char* api, const char* domain, ip_addr_t* ip) { const uint32_t length = pktlen; - char *buffer_in = (char *)malloc(length * sizeof(char)); - char *buffer_out = (char *)malloc(length * sizeof(char)); + char *buffer_in = (char *)calloc(length, sizeof(char)); + char *buffer_out = (char *)calloc(length, sizeof(char)); + if (buffer_in == NULL || buffer_out == NULL) { + return PROGRAM_FAULT; + } int32_t cread = 0; int32_t sread = length; int32_t nread = 0; - struct sockaddr_in server_addr; - socklen_t len = sizeof(server_addr); + sockaddr_t server_addr; + socklen_t len = ip->addr_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); + fault_inject_delay(INJECT_DELAY_READ); + FAULT_INJECT_SKIP_BEGIN(INJECT_SKIP_READ) + while (cread < sread) { if (strcmp(domain, "udp") == 0 && strcmp(api, "recvfromsendto") == 0) { nread = recvfrom(client_handler->fd, buffer_in, length, 0, (struct sockaddr *)&server_addr, &len); @@ -287,6 +382,8 @@ int32_t client_chkans(struct ClientHandler *client_handler, uint32_t pktlen, boo } } + FAULT_INJECT_SKIP_END + if (client_bussiness(buffer_out, buffer_in, length, verify, &(client_handler->msg_idx)) < 0) { PRINT_ERROR("message verify fault! "); getchar(); @@ -295,15 +392,18 @@ int32_t client_chkans(struct ClientHandler *client_handler, uint32_t pktlen, boo int32_t cwrite = 0; int32_t swrite = length; int32_t nwrite = 0; - if (ip >= inet_addr("224.0.0.0") && ip <= inet_addr("239.255.255.255")) { - server_addr.sin_addr.s_addr = ip; + if (ip->addr_family == AF_INET && ip->u_addr.ip4.s_addr >= inet_addr("224.0.0.0") && ip->u_addr.ip4.s_addr <= inet_addr("239.255.255.255")) { + ((struct sockaddr_in*)&server_addr)->sin_addr = ip->u_addr.ip4; } + fault_inject_delay(INJECT_DELAY_WRITE); + FAULT_INJECT_SKIP_BEGIN(INJECT_SKIP_WRITE) + while (cwrite < swrite) { if (strcmp(domain, "udp") == 0 && strcmp(api, "recvfromsendto") == 0) { - nwrite = sendto(client_handler->fd, buffer_out, length, 0, (struct sockaddr *)&server_addr, len); + nwrite = sendto(client_handler->fd, buffer_out, swrite - cwrite, 0, (struct sockaddr *)&server_addr, len); } else { - nwrite = write_api(client_handler->fd, buffer_out, length, api); + nwrite = write_api(client_handler->fd, buffer_out, swrite - cwrite, api); } if (nwrite == 0) { return PROGRAM_ABORT; @@ -317,6 +417,8 @@ int32_t client_chkans(struct ClientHandler *client_handler, uint32_t pktlen, boo } } + FAULT_INJECT_SKIP_END + free(buffer_in); free(buffer_out); diff --git a/examples/src/client.c b/examples/src/client.c index 1366924..43fbd0e 100644 --- a/examples/src/client.c +++ b/examples/src/client.c @@ -12,24 +12,61 @@ #include "client.h" - +#include "server.h" static pthread_mutex_t client_debug_mutex; // the client mutex for printf +struct Client *g_client_begin = NULL; + +static int32_t client_process_ask(struct ClientHandler *client_handler, struct ClientUnit *client_unit); +static void client_get_domain_ipversion(uint8_t protocol_type, struct ClientUnit *client_unit); +static void timer_handle(int signum) +{ + if (g_client_begin == NULL) { + return; + } + + struct ClientUnit *begin_client_unit = g_client_begin->uints; + while (begin_client_unit != NULL) { + if (begin_client_unit->domain != NULL && strcmp(begin_client_unit->domain, "udp") != 0) { + begin_client_unit = begin_client_unit->next; + continue; + } + for (int32_t i = 0; i < begin_client_unit->connect_num; i++) { + struct ClientHandler *handle = begin_client_unit->handlers + i; + if (handle->sendtime_interverl == TIME_SEND_INTERVAL) { + client_process_ask(handle, begin_client_unit); + } else { + handle->sendtime_interverl++; + } + } + + begin_client_unit = begin_client_unit->next; + } + alarm(TIME_SCAN_INTERVAL); +} + +static struct Client_domain_ip g_cfgmode_map[PROTOCOL_MODE_MAX] = { + [V4_TCP] = {"tcp", AF_INET}, + [V6_TCP] = {"tcp", AF_INET6}, + [V4_UDP] = {"udp", AF_INET}, + [V6_UDP] = {"udp", AF_INET6}, + [UDP_MULTICAST] = {"udp", AF_INET}}; // the single thread, client prints informations -void client_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug) +void client_debug_print(const char *ch_str, const char *act_str, ip_addr_t *ip, uint16_t port, bool debug) { if (debug == true) { pthread_mutex_lock(&client_debug_mutex); - struct in_addr sin_addr; - sin_addr.s_addr = ip; + uint8_t str_len = ip->addr_family == AF_INET ? INET_ADDRSTRLEN : INET6_ADDRSTRLEN; + char str_ip[str_len]; + inet_ntop(ip->addr_family, &ip->u_addr, str_ip, str_len); PRINT_CLIENT("[%s] [pid: %d] [tid: %ld] [%s <- %s:%d]. ", \ ch_str, \ getpid(), \ pthread_self(), \ act_str, \ - inet_ntoa(sin_addr), \ + str_ip, \ ntohs(port)); pthread_mutex_unlock(&client_debug_mutex); } @@ -41,7 +78,8 @@ void client_info_print(struct Client *client) if (client->debug == false) { struct timeval begin; gettimeofday(&begin, NULL); - uint64_t begin_time = (uint64_t)begin.tv_sec * 1000 + (uint64_t)begin.tv_usec / 1000; + uint64_t begin_time = (uint64_t)begin.tv_sec * TIMES_CONVERSION_RATE + + (uint64_t)begin.tv_usec / TIMES_CONVERSION_RATE; uint32_t curr_connect = 0; double bytes_ps = 0; @@ -49,45 +87,164 @@ void client_info_print(struct Client *client) struct ClientUnit *begin_uint = client->uints; while (begin_uint != NULL) { curr_connect += begin_uint->curr_connect; - begin_send_bytes += begin_uint->send_bytes; + begin_send_bytes += begin_uint->threadVolume.send_bytes; begin_uint = begin_uint->next; } struct timeval delay; delay.tv_sec = 0; - delay.tv_usec = TERMINAL_REFRESH_MS * 1000; + delay.tv_usec = TERMINAL_REFRESH_MS * TIMES_CONVERSION_RATE; select(0, NULL, NULL, NULL, &delay); uint64_t end_send_bytes = 0; struct ClientUnit *end_uint = client->uints; while (end_uint != NULL) { - end_send_bytes += end_uint->send_bytes; + end_send_bytes += end_uint->threadVolume.send_bytes; end_uint = end_uint->next; } struct timeval end; gettimeofday(&end, NULL); - uint64_t end_time = (uint64_t)end.tv_sec * 1000 + (uint64_t)end.tv_usec / 1000; - + uint64_t end_time = (uint64_t)end.tv_sec * TIMES_CONVERSION_RATE + + (uint64_t)end.tv_usec / TIMES_CONVERSION_RATE; + double bytes_sub = end_send_bytes > begin_send_bytes ? (double)(end_send_bytes - begin_send_bytes) : 0; - double time_sub = end_time > begin_time ? (double)(end_time - begin_time) / 1000 : 0; + double time_sub = end_time > begin_time ? (double)(end_time - begin_time) / TIMES_CONVERSION_RATE : 0; bytes_ps = bytes_sub / time_sub; - if (bytes_ps < 1024) { + if (bytes_ps < KB) { PRINT_CLIENT_DATAFLOW("[connect num]: %d, [send]: %.3f B/s", curr_connect, bytes_ps); - } else if (bytes_ps < (1024 * 1024)) { - PRINT_CLIENT_DATAFLOW("[connect num]: %d, [send]: %.3f KB/s", curr_connect, bytes_ps / 1024); + } else if (bytes_ps < MB) { + PRINT_CLIENT_DATAFLOW("[connect num]: %d, [send]: %.3f KB/s", curr_connect, bytes_ps / KB); } else { - PRINT_CLIENT_DATAFLOW("[connect num]: %d, [send]: %.3f MB/s", curr_connect, bytes_ps / (1024 * 1024)); + PRINT_CLIENT_DATAFLOW("[connect num]: %d, [send]: %.3f MB/s", curr_connect, bytes_ps / MB); + } + + if (client->loop) { + printf("\033[2A\033[120C\033[K\n"); + return; + } + printf("\033[A\033[K"); + } +} + +static int32_t client_process_ask(struct ClientHandler *client_handler, struct ClientUnit *client_unit) +{ + // not support udp+v6 currently + if (strcmp(client_unit->domain, "udp") == 0 && client_unit->ip.addr_family == AF_INET6) { + return PROGRAM_OK; + } + + int32_t client_ask_ret = client_ask(client_handler, client_unit); + if (client_ask_ret == PROGRAM_FAULT) { + --client_unit->curr_connect; + struct epoll_event ep_ev; + if (client_handler->fd > 0 && epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, (client_handler)->fd, &ep_ev) < 0) { + PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", (client_handler)->fd, errno); + return PROGRAM_FAULT; } + } else if (client_ask_ret == PROGRAM_ABORT) { + --client_unit->curr_connect; + if (close((client_handler)->fd) < 0) { + PRINT_ERROR("client can't close the socket! "); + return PROGRAM_FAULT; + } + client_debug_print("client unit", "close", &client_unit->ip, client_unit->port, client_unit->debug); + } else { + client_unit->threadVolume.send_bytes += client_unit->pktlen; + client_handler->sendtime_interverl = 0; + client_debug_print("client unit", "send", &client_unit->ip, client_unit->port, client_unit->debug); + } + return PROGRAM_OK; +} + +static void client_get_thread_volume(struct Client *client, struct ThreadUintInfo *threadVolume) +{ + int index = 0; + struct ClientUnit *curUint = client->uints; + while (curUint != NULL && index < client->threadNum) { + threadVolume[index].send_bytes = curUint->threadVolume.send_bytes; + + threadVolume[index].cur_connect_num = curUint->curr_connect; + threadVolume[index].thread_id = curUint->threadVolume.thread_id; + threadVolume[index].domain = curUint->threadVolume.domain; + threadVolume[index].ip_type_info = curUint->threadVolume.ip_type_info; + curUint = curUint->next; + index++; + } +} + +void client_info_print_mixed(struct Client *client, struct ThreadUintInfo *threadVolume, + struct ThreadUintInfo *endThreadVolume) +{ + if (client->debug == true) { + return; + } + int32_t pthread_num = client->threadNum; + int32_t not_support_thread = 0; + struct timeval cur = {0}; + + gettimeofday(&cur, NULL); + uint64_t begin_time = (uint64_t)cur.tv_sec * TIMES_CONVERSION_RATE + (uint64_t)cur.tv_usec / TIMES_CONVERSION_RATE; + + client_get_thread_volume(client, threadVolume); + + struct timeval delay; + delay.tv_sec = 0; + delay.tv_usec = TERMINAL_REFRESH_MS * TIMES_CONVERSION_RATE; + select(0, NULL, NULL, NULL, &delay); + + client_get_thread_volume(client, endThreadVolume); + + gettimeofday(&cur, NULL); + uint64_t end_time = (uint64_t)cur.tv_sec * TIMES_CONVERSION_RATE + (uint64_t)cur.tv_usec / TIMES_CONVERSION_RATE; + + for (int i = 0; i < pthread_num; i++) { + uint64_t begin_send_bytes = threadVolume[i].send_bytes; + uint64_t end_send_bytes = endThreadVolume[i].send_bytes; + pthread_t thread_id = endThreadVolume[i].thread_id; + uint32_t connect_num = endThreadVolume[i].cur_connect_num; + char *domain = endThreadVolume[i].domain; + char *ip_ver = endThreadVolume[i].ip_type_info; + + if (thread_id == 0) { + not_support_thread++; + continue; + } + + double bytes_sub = end_send_bytes > begin_send_bytes ? (double)(end_send_bytes - begin_send_bytes) : 0; + double time_sub = end_time > begin_time ? (double)(end_time - begin_time) / TIMES_CONVERSION_RATE : 0; + double bytes_ps = bytes_sub / time_sub; + + if (bytes_ps < KB) { + PRINT_CLIENT_DATAFLOW("threadID=%-15lu, %s_%-9s [connect num]: %u, [send]: %.3f B/s", + thread_id, domain, ip_ver, connect_num, bytes_ps); + } else if (bytes_ps < MB) { + PRINT_CLIENT_DATAFLOW("threadID=%-15lu, %s_%-9s [connect num]: %u, [send]: %.3f kB/s", + thread_id, domain, ip_ver, connect_num, bytes_ps / KB); + } else { + PRINT_CLIENT_DATAFLOW("threadID=%-15lu, %s_%-9s [connect num]: %u, [send]: %.3f MB/s", + thread_id, domain, ip_ver, connect_num, bytes_ps / MB); + } + } + printf("\033[%dA\033[K", pthread_num - not_support_thread); +} + +void loop_info_print() +{ + printf(" "); + if (strcmp(loopmod.model, "mum") == 0) { + sermum_info_print(loopmod.server_mum_info); + } else { + sermud_info_print(loopmod.server_mud_info); } } // the single thread, client try to connect to server, register to epoll -int32_t client_thread_try_connect(struct ClientHandler *client_handler, int32_t epoll_fd, in_addr_t ip, in_addr_t groupip, uint16_t port, uint16_t sport, const char *domain, const char *api) +int32_t client_thread_try_connect(struct ClientHandler *client_handler, struct ClientUnit *client_unit) { - int32_t create_socket_and_connect_ret = create_socket_and_connect(&(client_handler->fd), ip, groupip, port, sport, domain, api); + int32_t create_socket_and_connect_ret = create_socket_and_connect(&(client_handler->fd), client_unit); if (create_socket_and_connect_ret == PROGRAM_INPROGRESS) { return PROGRAM_OK; } @@ -97,7 +254,7 @@ int32_t client_thread_try_connect(struct ClientHandler *client_handler, int32_t // the single thread, client retry to connect to server, register to epoll int32_t client_thread_retry_connect(struct ClientUnit *client_unit, struct ClientHandler *client_handler) { - int32_t clithd_try_cnntask_ret = client_thread_try_connect(client_handler, client_unit->epfd, client_unit->ip, client_unit->groupip, client_unit->port, client_unit->sport, client_unit->domain, client_unit->api); + int32_t clithd_try_cnntask_ret = client_thread_try_connect(client_handler, client_unit); if (clithd_try_cnntask_ret < 0) { if (clithd_try_cnntask_ret == PROGRAM_INPROGRESS) { return PROGRAM_OK; @@ -114,35 +271,27 @@ int32_t client_thread_retry_connect(struct ClientUnit *client_unit, struct Clien ++(client_unit->curr_connect); - struct sockaddr_in server_addr; - socklen_t server_addr_len = sizeof(server_addr); + sockaddr_t server_addr; + socklen_t server_addr_len = client_unit->ip.addr_family ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); if (getpeername(client_handler->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) { PRINT_ERROR("client can't socket peername %d! ", errno); return PROGRAM_FAULT; } - client_debug_print("client unit", "connect", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); - int32_t client_ask_ret = client_ask(client_handler, client_unit->pktlen, client_unit->api, client_unit->domain, client_unit->groupip ? client_unit->groupip:client_unit->ip, client_unit->port); - if (client_ask_ret == PROGRAM_FAULT) { - --client_unit->curr_connect; - struct epoll_event ep_ev; - if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, client_handler->fd, &ep_ev) < 0) { - PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", client_handler->fd, errno); - return PROGRAM_FAULT; - } - } else if (client_ask_ret == PROGRAM_ABORT) { - --client_unit->curr_connect; - if (close(client_handler->fd) < 0) { - PRINT_ERROR("client can't close the socket %d! ", errno); - return PROGRAM_FAULT; - } - client_debug_print("client unit", "close", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); - } else { - client_unit->send_bytes += client_unit->pktlen; - client_debug_print("client unit", "send", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); + // sockaddr to ip, port + ip_addr_t remote_ip; + uint16_t remote_port = ((struct sockaddr_in*)&server_addr)->sin_port; + if (((struct sockaddr *)&server_addr)->sa_family == AF_INET) { + remote_ip.addr_family = AF_INET; + remote_ip.u_addr.ip4 = ((struct sockaddr_in *)&server_addr)->sin_addr; + } else if (((struct sockaddr *)&server_addr)->sa_family == AF_INET6) { + remote_ip.addr_family = AF_INET6; + remote_ip.u_addr.ip6 = ((struct sockaddr_in6 *)&server_addr)->sin6_addr; } - return PROGRAM_OK; + client_debug_print("client unit", "connect", &remote_ip, remote_port, client_unit->debug); + + return client_process_ask(client_handler, client_unit); } // the single thread, client connects and gets epoll feature descriptors @@ -162,7 +311,7 @@ int32_t client_thread_create_epfd_and_reg(struct ClientUnit *client_unit) } for (uint32_t i = 0; i < connect_num; ++i) { - int32_t clithd_try_cnntask_ret = client_thread_try_connect(client_unit->handlers + i, client_unit->epfd, client_unit->ip, client_unit->groupip, client_unit->port, client_unit->sport, client_unit->domain, client_unit->api); + int32_t clithd_try_cnntask_ret = client_thread_try_connect(client_unit->handlers + i, client_unit); if (clithd_try_cnntask_ret < 0) { if (clithd_try_cnntask_ret == PROGRAM_INPROGRESS) { continue; @@ -179,26 +328,11 @@ int32_t client_thread_create_epfd_and_reg(struct ClientUnit *client_unit) ++(client_unit->curr_connect); - client_debug_print("client unit", "connect", client_unit->ip, client_unit->port, client_unit->debug); - - int32_t client_ask_ret = client_ask(client_unit->handlers + i, client_unit->pktlen, client_unit->api, client_unit->domain, client_unit->groupip ? client_unit->groupip:client_unit->ip, client_unit->port); - if (client_ask_ret == PROGRAM_FAULT) { - --client_unit->curr_connect; - struct epoll_event ep_ev; - if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, (client_unit->handlers + i)->fd, &ep_ev) < 0) { - PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", client_unit->epevs[i].data.fd, errno); - return PROGRAM_FAULT; - } - } else if (client_ask_ret == PROGRAM_ABORT) { - --client_unit->curr_connect; - if (close((client_unit->handlers + i)->fd) < 0) { - PRINT_ERROR("client can't close the socket! "); - return PROGRAM_FAULT; - } - client_debug_print("client unit", "close", client_unit->ip, client_unit->port, client_unit->debug); - } else { - client_unit->send_bytes += client_unit->pktlen; - client_debug_print("client unit", "send", client_unit->ip, client_unit->port, client_unit->debug); + client_debug_print("client unit", "connect", &client_unit->ip, client_unit->port, client_unit->debug); + + int32_t client_ask_ret = client_process_ask(client_unit->handlers + i, client_unit); + if (client_ask_ret != PROGRAM_OK) { + return client_ask_ret; } } } @@ -206,15 +340,97 @@ int32_t client_thread_create_epfd_and_reg(struct ClientUnit *client_unit) return PROGRAM_OK; } + +static int32_t clithd_proc_epevs_epollout(struct epoll_event *curr_epev, struct ClientUnit *client_unit) +{ + int32_t connect_error = 0; + socklen_t connect_error_len = sizeof(connect_error); + struct ClientHandler *client_handler = (struct ClientHandler *)curr_epev->data.ptr; + if (getsockopt(client_handler->fd, SOL_SOCKET, SO_ERROR, (void *)(&connect_error), &connect_error_len) < 0) { + PRINT_ERROR("client can't get socket option %d! ", errno); + return PROGRAM_FAULT; + } + if (connect_error < 0) { + if (connect_error == ETIMEDOUT) { + if (client_thread_retry_connect(client_unit, client_handler) < 0) { + return PROGRAM_FAULT; + } + return PROGRAM_OK; + } + PRINT_ERROR("client connect error %d! ", connect_error); + return PROGRAM_FAULT; + } else { + ++(client_unit->curr_connect); + + sockaddr_t server_addr; + socklen_t server_addr_len = + client_unit->ip.addr_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); + if (getpeername(client_handler->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) { + PRINT_ERROR("client can't socket peername %d! ", errno); + return PROGRAM_FAULT; + } + + // sockaddr to ip, port + ip_addr_t remote_ip; + uint16_t remote_port = ((struct sockaddr_in *)&server_addr)->sin_port; + if (((struct sockaddr *)&server_addr)->sa_family == AF_INET) { + remote_ip.addr_family = AF_INET; + remote_ip.u_addr.ip4 = ((struct sockaddr_in *)&server_addr)->sin_addr; + } else if (((struct sockaddr *)&server_addr)->sa_family == AF_INET6) { + remote_ip.addr_family = AF_INET6; + remote_ip.u_addr.ip6 = ((struct sockaddr_in6 *)&server_addr)->sin6_addr; + } + + client_debug_print("client unit", "connect", &remote_ip, remote_port, client_unit->debug); + + int32_t client_ask_ret = client_process_ask(client_handler, client_unit); + if (client_ask_ret != PROGRAM_OK) { + return client_ask_ret; + } + } + return PROGRAM_OK; +} + +static int32_t clithd_proc_epevs_epollin(struct epoll_event *curr_epev, struct ClientUnit *client_unit) +{ + ip_addr_t *chkans_ip = client_unit->protocol_type_mode == UDP_MULTICAST ? &client_unit->groupip : &client_unit->ip; + int32_t client_chkans_ret = client_chkans((struct ClientHandler *)curr_epev->data.ptr, client_unit->pktlen, + client_unit->verify, client_unit->api, client_unit->domain, chkans_ip); + struct ClientHandler *client_handler = (struct ClientHandler *)curr_epev->data.ptr; + int32_t fd = client_handler->fd; + if (client_chkans_ret == PROGRAM_FAULT) { + --client_unit->curr_connect; + struct epoll_event ep_ev; + if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, fd, &ep_ev) < 0) { + PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", fd, errno); + return PROGRAM_FAULT; + } + } else if (client_chkans_ret == PROGRAM_ABORT) { + --client_unit->curr_connect; + if (close(fd) < 0) { + PRINT_ERROR("client can't close the socket %d! ", errno); + return PROGRAM_FAULT; + } + client_debug_print("client unit", "close", &client_unit->ip, client_unit->port, client_unit->debug); + } else { + client_unit->threadVolume.send_bytes += client_unit->pktlen; + client_handler->sendtime_interverl = 0; + client_debug_print("client unit", "receive", &client_unit->ip, client_unit->port, client_unit->debug); + } + return PROGRAM_OK; +} + // the single thread, client processes epoll events int32_t clithd_proc_epevs(struct ClientUnit *client_unit) { int32_t epoll_nfds = epoll_wait(client_unit->epfd, client_unit->epevs, CLIENT_EPOLL_SIZE_MAX, CLIENT_EPOLL_WAIT_TIMEOUT); + int ret = 0; if (epoll_nfds < 0) { PRINT_ERROR("client epoll wait error %d! ", errno); return PROGRAM_FAULT; } + for (int32_t i = 0; i < epoll_nfds; ++i) { struct epoll_event *curr_epev = client_unit->epevs + i; @@ -222,76 +438,17 @@ int32_t clithd_proc_epevs(struct ClientUnit *client_unit) PRINT_ERROR("client epoll wait error! %d", curr_epev->events); return PROGRAM_FAULT; } else if (curr_epev->events == EPOLLOUT) { - int32_t connect_error = 0; - socklen_t connect_error_len = sizeof(connect_error); - struct ClientHandler *client_handler = (struct ClientHandler *)curr_epev->data.ptr; - if (getsockopt(client_handler->fd, SOL_SOCKET, SO_ERROR, (void *)(&connect_error), &connect_error_len) < 0) { - PRINT_ERROR("client can't get socket option %d! ", errno); - return PROGRAM_FAULT; - } - if (connect_error < 0) { - if (connect_error == ETIMEDOUT) { - if (client_thread_retry_connect(client_unit, client_handler) < 0) { - return PROGRAM_FAULT; - } - continue; - } - PRINT_ERROR("client connect error %d! ", connect_error); - return PROGRAM_FAULT; - } else { - ++(client_unit->curr_connect); - - struct sockaddr_in server_addr; - socklen_t server_addr_len = sizeof(server_addr); - if (getpeername(client_handler->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) { - PRINT_ERROR("client can't socket peername %d! ", errno); - return PROGRAM_FAULT; - } - client_debug_print("client unit", "connect", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); - - int32_t client_ask_ret = client_ask(client_handler, client_unit->pktlen, client_unit->api, client_unit->domain, client_unit->groupip ? client_unit->groupip:client_unit->ip, client_unit->port); - if (client_ask_ret == PROGRAM_FAULT) { - --client_unit->curr_connect; - struct epoll_event ep_ev; - if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, curr_epev->data.fd, &ep_ev) < 0) { - PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", curr_epev->data.fd, errno); - return PROGRAM_FAULT; - } - } else if (client_ask_ret == PROGRAM_ABORT) { - --client_unit->curr_connect; - if (close(curr_epev->data.fd) < 0) { - PRINT_ERROR("client can't close the socket! "); - return PROGRAM_FAULT; - } - client_debug_print("client unit", "close", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); - } else { - client_unit->send_bytes += client_unit->pktlen; - client_debug_print("client unit", "send", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug); - } + ret = clithd_proc_epevs_epollout(curr_epev, client_unit); + if (ret != PROGRAM_OK) { + return ret; } } else if (curr_epev->events == EPOLLIN) { - int32_t client_chkans_ret = client_chkans((struct ClientHandler *)curr_epev->data.ptr, client_unit->pktlen, client_unit->verify, client_unit->api, client_unit->domain, client_unit->groupip ? client_unit->groupip:client_unit->ip); - if (client_chkans_ret == PROGRAM_FAULT) { - --client_unit->curr_connect; - struct epoll_event ep_ev; - if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, curr_epev->data.fd, &ep_ev) < 0) { - PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", curr_epev->data.fd, errno); - return PROGRAM_FAULT; - } - } else if (client_chkans_ret == PROGRAM_ABORT) { - --client_unit->curr_connect; - if (close(curr_epev->data.fd) < 0) { - PRINT_ERROR("client can't close the socket %d! ", errno); - return PROGRAM_FAULT; - } - client_debug_print("client unit", "close", client_unit->ip, client_unit->port, client_unit->debug); - } else { - client_unit->send_bytes += client_unit->pktlen; - client_debug_print("client unit", "receive", client_unit->ip, client_unit->port, client_unit->debug); + ret = clithd_proc_epevs_epollin(curr_epev, client_unit); + if (ret != PROGRAM_OK) { + return ret; } } } - return PROGRAM_OK; } @@ -299,6 +456,17 @@ int32_t clithd_proc_epevs(struct ClientUnit *client_unit) void *client_s_create_and_run(void *arg) { struct ClientUnit *client_unit = (struct ClientUnit *)arg; + // update domain ip info. + client_get_domain_ipversion(client_unit->protocol_type_mode, client_unit); + + if (client_unit->protocol_type_mode == UDP_MULTICAST) { + client_unit->threadVolume.ip_type_info = IPV4_MULTICAST; + } else { + client_unit->threadVolume.ip_type_info = (client_unit->ip.addr_family == AF_INET ? IPV4_STR : IPV6_STR); + } + client_unit->threadVolume.thread_id = pthread_self(); + + client_unit->threadVolume.domain = client_unit->domain; if (client_thread_create_epfd_and_reg(client_unit) < 0) { exit(PROGRAM_FAULT); @@ -316,6 +484,42 @@ void *client_s_create_and_run(void *arg) return (void *)PROGRAM_OK; } +// prase the specific supported TCP IP types by cfg_mode. +static void client_get_protocol_type_by_cfgmode(uint8_t mode, int32_t *support_type_array, int32_t buff_len, + int32_t *actual_len) +{ + int32_t index = 0; + for (uint8_t i = V4_TCP; i < PROTOCOL_MODE_MAX; i++) { + if (i == V6_UDP) { + continue; + } + if (getbit_num(mode, i) == 1) { + if (index >= buff_len) { + PRINT_ERROR("index is over, index =%d", index); + return; + } + support_type_array[index] = i; + index++; + } + } + *actual_len = index; +} + +static void client_get_domain_ipversion(uint8_t protocol_type, struct ClientUnit *client_unit) +{ + client_unit->domain = g_cfgmode_map[protocol_type].domain; + client_unit->ip.addr_family = g_cfgmode_map[protocol_type].ip_family; +} + +static void alarm_init() +{ + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sa.sa_handler = &timer_handle; + sigaction(SIGALRM, &sa, NULL); + alarm(TIME_SCAN_INTERVAL); +} + // create client and run int32_t client_create_and_run(struct ProgramParams *params) { @@ -323,16 +527,44 @@ int32_t client_create_and_run(struct ProgramParams *params) const uint32_t thread_num = params->thread_num; pthread_t *tids = (pthread_t *)malloc(thread_num * sizeof(pthread_t)); struct Client *client = (struct Client *)malloc(sizeof(struct Client)); + g_client_begin = client; + client->threadNum = thread_num; + struct ClientUnit *client_unit = (struct ClientUnit *)malloc(sizeof(struct ClientUnit)); + memset_s(client_unit, sizeof(struct ClientUnit), 0, sizeof(struct ClientUnit)); + int32_t protocol_support_array[PROTOCOL_MODE_MAX] = {0}; + int32_t number_of_support_type = 1; if (pthread_mutex_init(&client_debug_mutex, NULL) < 0) { PRINT_ERROR("client can't init posix mutex %d! ", errno); return PROGRAM_FAULT; } + bool v4_cfg_flag = (strcmp(params->ip, PARAM_DEFAULT_IP) != 0); + bool v6_cfg_flag = (strcmp(params->ipv6, PARAM_DEFAULT_IP_V6) != 0); + bool multcact_cfg_flag = (strcmp(params->groupip, PARAM_DEFAULT_GROUPIP) != 0); + + bool mixed_mode_flag = false; + if ((strchr(params->domain, ',') != NULL) || (v4_cfg_flag && v6_cfg_flag) || + (multcact_cfg_flag && (v4_cfg_flag || v6_cfg_flag))) { + mixed_mode_flag = true; + } + client->uints = client_unit; client->debug = params->debug; + uint8_t protocol_type_mode = program_get_protocol_mode_by_domain_ip(params->domain, params->ip, params->ipv6, + params->groupip); + + client_get_protocol_type_by_cfgmode(protocol_type_mode, protocol_support_array, PROTOCOL_MODE_MAX, + &number_of_support_type); + + uint32_t port = UNIX_TCP_PORT_MIN; + uint32_t sport = 0; + uint32_t sp = 0; + + alarm_init(); + for (uint32_t i = 0; i < thread_num; ++i) { client_unit->handlers = (struct ClientHandler *)malloc(connect_num * sizeof(struct ClientHandler)); for (uint32_t j = 0; j < connect_num; ++j) { @@ -342,13 +574,42 @@ int32_t client_create_and_run(struct ProgramParams *params) client_unit->epfd = -1; client_unit->epevs = (struct epoll_event *)malloc(CLIENT_EPOLL_SIZE_MAX * sizeof(struct epoll_event)); client_unit->curr_connect = 0; - client_unit->send_bytes = 0; - client_unit->ip = inet_addr(params->ip); - client_unit->groupip = inet_addr(params->groupip); - client_unit->port = htons(params->port); - client_unit->sport = htons(params->sport); + + client_unit->threadVolume.cur_connect_num = 0; + client_unit->threadVolume.thread_id = 0; + client_unit->threadVolume.send_bytes = 0; + client_unit->threadVolume.ip_type_info = INVAILD_STR; + client_unit->threadVolume.domain = INVAILD_STR; + + client_unit->ip.addr_family = params->addr_family; + inet_pton(AF_INET, params->ip, &client_unit->ip.u_addr.ip4); + inet_pton(AF_INET6, params->ipv6, &client_unit->ip.u_addr.ip6); + client_unit->groupip.addr_family = AF_INET; + inet_pton(AF_INET, params->groupip, &client_unit->groupip.u_addr); + client_unit->groupip_interface.addr_family = params->addr_family; + inet_pton(AF_INET, params->groupip_interface, &client_unit->groupip_interface.u_addr); + + /* loop to set ports to each client_units */ + while (!((params->port)[port])) { + port = (port + 1) % UNIX_TCP_PORT_MAX; + } + client_unit->port = htons(port++); + + sp = sport; + sport++; + while (!((params->sport)[sport]) && (sport != sp)) { + sport = (sport + 1) % UNIX_TCP_PORT_MAX; + } + + client_unit->sport = htons(sport); client_unit->connect_num = params->connect_num; client_unit->pktlen = params->pktlen; + if (strcmp(params->as, "loop") == 0) { + client_unit->loop = 1; + } else { + client_unit->loop = 0; + } + client_unit->verify = params->verify; client_unit->domain = params->domain; client_unit->api = params->api; @@ -357,6 +618,16 @@ int32_t client_create_and_run(struct ProgramParams *params) client_unit->next = (struct ClientUnit *)malloc(sizeof(struct ClientUnit)); memset_s(client_unit->next, sizeof(struct ClientUnit), 0, sizeof(struct ClientUnit)); + if (number_of_support_type > 0) { + int32_t index = i % number_of_support_type; + client_unit->protocol_type_mode = protocol_support_array[index]; + } + if (client_unit->protocol_type_mode == V4_UDP || client_unit->protocol_type_mode == V6_UDP || + client_unit->protocol_type_mode == UDP_MULTICAST) { + client_unit->pktlen = params->pktlen > UDP_PKTLEN_MAX ? UDP_PKTLEN_MAX : params->pktlen; + } else { + client_unit->pktlen = params->pktlen; + } if (pthread_create((tids + i), NULL, client_s_create_and_run, client_unit) < 0) { PRINT_ERROR("client can't create thread of poisx %d! ", errno); return PROGRAM_FAULT; @@ -367,9 +638,34 @@ int32_t client_create_and_run(struct ProgramParams *params) if (client->debug == false) { printf("[program informations]: \n\n"); } + + struct ThreadUintInfo *beginVolume = (struct ThreadUintInfo *)malloc(thread_num * sizeof(struct ThreadUintInfo)); + if (beginVolume == NULL) { + return PROGRAM_FAULT; + } + memset_s(beginVolume, thread_num * sizeof(struct ThreadUintInfo), 0, thread_num * sizeof(struct ThreadUintInfo)); + struct ThreadUintInfo *endVolume = (struct ThreadUintInfo *)malloc(thread_num * sizeof(struct ThreadUintInfo)); + if (endVolume == NULL) { + return PROGRAM_FAULT; + } + memset_s(endVolume, thread_num * sizeof(struct ThreadUintInfo), 0, thread_num * sizeof(struct ThreadUintInfo)); + + if (strcmp(params->as, "loop") == 0) { + client->loop = true; + } + while (true) { - client_info_print(client); + if (strcmp(params->as, "loop") == 0) { + loop_info_print(); + } + if (mixed_mode_flag == true) { + client_info_print_mixed(client, beginVolume, endVolume); + } else { + client_info_print(client); + } } + free(beginVolume); + free(endVolume); pthread_mutex_destroy(&client_debug_mutex); diff --git a/examples/src/parameter.c b/examples/src/parameter.c index 1bb6858..7f519e7 100644 --- a/examples/src/parameter.c +++ b/examples/src/parameter.c @@ -13,6 +13,8 @@ #include "parameter.h" +static int32_t g_inject_delay[INJECT_DELAY_MAX] = {0}; +static int32_t g_inject_skip[INJECT_SKIP_MAX]; // program short options const char prog_short_opts[] = \ @@ -30,9 +32,11 @@ const char prog_short_opts[] = \ "r" // ringpmd "d" // debug "h" // help - "E" // epollcreate - "C" // accept + "E:" // epollcreate + "C:" // accept "g:" // group address + "k:" // tcp keep_alive + "I:" // fault inject ; // program long options @@ -55,17 +59,72 @@ const struct ProgramOption prog_long_opts[] = \ {PARAM_NAME_EPOLLCREATE, REQUIRED_ARGUMETN, NULL, PARAM_NUM_EPOLLCREATE}, {PARAM_NAME_ACCEPT, REQUIRED_ARGUMETN, NULL, PARAM_NUM_ACCEPT}, {PARAM_NAME_GROUPIP, REQUIRED_ARGUMETN, NULL, PARAM_NUM_GROUPIP}, + {PARAM_NAME_KEEPALIVE, REQUIRED_ARGUMETN, NULL, PARAM_NUM_KEEPALIVE}, + {PARAM_NAME_INJECT, REQUIRED_ARGUMETN, NULL, PARAM_NUM_INJECT}, }; // get long options int getopt_long(int argc, char * const argv[], const char *optstring, const struct ProgramOption *long_opts, int *long_idx); +// index [0,7) +uint8_t getbit_num(uint8_t mode, uint8_t index) +{ + return (mode & ((uint8_t)1 << index)) != 0; +} + +uint8_t setbitnum_on(uint8_t mode, uint8_t index) +{ + mode |= ((uint8_t)1 << index); + return mode; +} + +uint8_t setbitnum_off(uint8_t mode, uint8_t index) +{ + mode &= ~((uint8_t)1 << index); + return mode; +} + +static uint8_t program_set_protocol_mode(uint8_t protocol_mode, char *ipv4, char *ipv6, uint8_t index_v4, + uint8_t index_v6) +{ + uint8_t protocol_mode_temp = protocol_mode; + if (strcmp(ipv4, PARAM_DEFAULT_IP) != 0) { + protocol_mode_temp = setbitnum_on(protocol_mode_temp, index_v4); + } + if (strcmp(ipv6, PARAM_DEFAULT_IP_V6) != 0) { + protocol_mode_temp = setbitnum_on(protocol_mode_temp, index_v6); + } + return protocol_mode_temp; +} + +uint8_t program_get_protocol_mode_by_domain_ip(char* domain, char* ipv4, char* ipv6, char* groupip) +{ + uint8_t protocol_mode = 0; + char *cur_ptr = NULL; + char *next_Ptr = NULL; + cur_ptr = strtok_s(domain, ",", &next_Ptr); + while (cur_ptr) { + if (strcmp(cur_ptr, "tcp") == 0) { + protocol_mode = program_set_protocol_mode(protocol_mode, ipv4, ipv6, V4_TCP, V6_TCP); + } else if (strcmp(cur_ptr, "udp") == 0) { + protocol_mode = program_set_protocol_mode(protocol_mode, ipv4, ipv6, V4_UDP, V6_UDP); + } else if (strcmp(cur_ptr, "unix") == 0) { + protocol_mode = setbitnum_on(protocol_mode, UNIX); + } + cur_ptr = strtok_s(NULL, ",", &next_Ptr); + } + + if (strcmp(groupip, PARAM_DEFAULT_GROUPIP) != 0) { + protocol_mode = setbitnum_on(protocol_mode, UDP_MULTICAST); + } + return protocol_mode; +} // set `as` parameter void program_param_parse_as(struct ProgramParams *params) { - if (strcmp(optarg, "server") == 0 || strcmp(optarg, "client") == 0) { + if (strcmp(optarg, "server") == 0 || strcmp(optarg, "client") == 0 || strcmp(optarg, "loop") == 0) { params->as = optarg; } else { PRINT_ERROR("illigal argument -- %s \n", optarg); @@ -73,40 +132,113 @@ void program_param_parse_as(struct ProgramParams *params) } } -// set `ip` parameter -void program_param_parse_ip(struct ProgramParams *params) +bool ip_is_v6(const char *cp) +{ + if (cp != NULL) { + const char *c; + for (c = cp; *c != 0; c++) { + if (*c == ':') { + return 1; + } + } + } + return 0; +} + + +static bool program_ipv4_check(char *ipv4) { - if (inet_addr(optarg) != INADDR_NONE) { - params->ip = optarg; + in_addr_t ip = ntohl(inet_addr(ipv4)); + if (ip == INADDR_NONE) { + PRINT_ERROR("illigal argument -- %s \n", ipv4); + return false; + } + if ((ip >= ntohl(inet_addr("1.0.0.1")) && ip <= ntohl(inet_addr("126.255.255.254"))) || + (ip >= ntohl(inet_addr("127.0.0.1")) && ip <= ntohl(inet_addr("127.255.255.254"))) || + (ip >= ntohl(inet_addr("128.0.0.1")) && ip <= ntohl(inet_addr("191.255.255.254"))) || + (ip >= ntohl(inet_addr("192.0.0.1")) && ip <= ntohl(inet_addr("223.255.255.254"))) || + (ip >= ntohl(inet_addr("224.0.0.1")) && ip <= ntohl(inet_addr("224.255.255.255"))) ) { // Broadcast IP + return true; + } + + PRINT_ERROR("illigal argument -- %s \n", ipv4); + return false; +} + +static void program_param_parse_ipv4_addr(char* v4ip_addr, struct ProgramParams *params) +{ + struct in6_addr ip_tmp; + params->addr_family = AF_INET; + if (inet_pton(params->addr_family, v4ip_addr, &ip_tmp) > 0 && program_ipv4_check(v4ip_addr) == true) { + params->ip = v4ip_addr; } else { - PRINT_ERROR("illigal argument -- %s \n", optarg); + PRINT_ERROR("illegal ipv4 addr -- %s \n", v4ip_addr); exit(PROGRAM_ABORT); } } -// set `port` parameter -void program_param_parse_port(struct ProgramParams *params) +static void program_param_parse_ipv6_addr(char* v6ip_add, struct ProgramParams *params) { - int32_t port_arg = strtol(optarg, NULL, 0); - printf("%d\n", port_arg); - if (CHECK_VAL_RANGE(port_arg, UNIX_TCP_PORT_MIN, UNIX_TCP_PORT_MAX) == true) { - params->port = (uint32_t)port_arg; + struct in6_addr ip_tmp; + params->addr_family = AF_INET6; + if (inet_pton(AF_INET6, v6ip_add, &ip_tmp) > 0) { + params->ipv6 = v6ip_add; } else { - PRINT_ERROR("illigal argument -- %s \n", optarg); + PRINT_ERROR("illegal ipv6 addr -- %s \n", v6ip_add); exit(PROGRAM_ABORT); } } +// set `ip` parameter,支持同时配置 ipv4 和 ipv6 地址,格式为 ipv4,ipv6 +void program_param_parse_ip(struct ProgramParams *params) +{ + char *cur_ptr = NULL; + char *next_ptr = NULL; + + cur_ptr = strtok_s(optarg, ",", &next_ptr); + while (cur_ptr) { + if (ip_is_v6(cur_ptr)) { + program_param_parse_ipv6_addr(cur_ptr, params); + } else { + program_param_parse_ipv4_addr(cur_ptr, params); + } + cur_ptr = strtok_s(NULL, ",", &next_ptr); + } +} + +// set `port` parameter +void program_param_parse_port(struct ProgramParams *params) +{ + char* port_list = optarg; + char* token = NULL; + int32_t port_arg = 0; + params->port[PARAM_DEFAULT_PORT] = 0; + + while ((token = strtok_r(port_list, ",", &port_list))) { + port_arg = strtol(token, NULL, 0); + if (CHECK_VAL_RANGE(port_arg, UNIX_TCP_PORT_MIN, UNIX_TCP_PORT_MAX) == true) { + params->port[port_arg] = 1; + } else { + PRINT_ERROR("illigal argument -- %s \n", optarg); + exit(PROGRAM_ABORT); + } + } +} // set `sport` parameter void program_param_parse_sport(struct ProgramParams *params) { - int32_t sport_arg = strtol(optarg, NULL, 0); - printf("%d\n", sport_arg); - if (CHECK_VAL_RANGE(sport_arg, UNIX_TCP_PORT_MIN, UNIX_TCP_PORT_MAX) == true) { - params->sport = (uint32_t)sport_arg; - } else { - PRINT_ERROR("illigal argument -- %s \n", optarg); - exit(PROGRAM_ABORT); + char* port_list = optarg; + char* token = NULL; + int32_t port_arg = 0; + + while ((token = strtok_r(port_list, ",", &port_list))) { + port_arg = strtol(token, NULL, 0); + if (CHECK_VAL_RANGE(port_arg, UNIX_TCP_PORT_MIN, UNIX_TCP_PORT_MAX) == true) { + params->sport[port_arg] = 1; + } else { + PRINT_ERROR("illigal argument -- %s \n", optarg); + exit(PROGRAM_ABORT); + } } } @@ -148,12 +280,23 @@ void program_param_parse_threadnum(struct ProgramParams *params) // set `domain` parameter void program_param_parse_domain(struct ProgramParams *params) { - if (strcmp(optarg, "unix") == 0 || strcmp(optarg, "tcp") == 0 || strcmp(optarg, "udp") == 0) { - params->domain = optarg; - } else { - PRINT_ERROR("illigal argument -- %s \n", optarg); + char temp[100] = {0}; + int32_t ret = strcpy_s(temp, sizeof(temp) / sizeof(char), optarg); + if (ret != 0) { + PRINT_ERROR("strcpy_s fail ret=%d \n", ret); exit(PROGRAM_ABORT); } + char *cur_ptr = temp; + char *next_ptr = NULL; + cur_ptr = strtok_s(cur_ptr, ",", &next_ptr); + while (cur_ptr) { + if (strcmp(cur_ptr, "tcp") != 0 && strcmp(cur_ptr, "udp") != 0 && strcmp(cur_ptr, "unix") != 0) { + PRINT_ERROR("illigal argument -- %s \n", cur_ptr); + exit(PROGRAM_ABORT); + } + cur_ptr = strtok_s(NULL, ",", &next_ptr); + } + params->domain = optarg; } // set `api` parameter @@ -174,6 +317,9 @@ void program_param_parse_pktlen(struct ProgramParams *params) int32_t pktlen_arg = strtol(optarg, NULL, 0); if (CHECK_VAL_RANGE(pktlen_arg, MESSAGE_PKTLEN_MIN, MESSAGE_PKTLEN_MAX) == true) { params->pktlen = (uint32_t)pktlen_arg; + if (strstr(params->domain, "udp") && params->pktlen > UDP_PKTLEN_MAX) { + PRINT_WARNNING("udp message too long, change it to %d \n", UDP_PKTLEN_MAX); + } } else { PRINT_ERROR("illigal argument -- %s \n", optarg); exit(PROGRAM_ABORT); @@ -202,16 +348,196 @@ void program_param_parse_accept(struct ProgramParams *params) } } +// set `tcp_keepalive_idle` parameter +void program_param_parse_keepalive(struct ProgramParams *params) +{ + char *token = NULL; + char *next_token = NULL; + token = strtok_s(optarg, ",", &next_token); + if (token == NULL) { + PRINT_ERROR("parse keep_alive idle null, illigal argument(%s) \n", optarg); + exit(PROGRAM_ABORT); + } + + int32_t keep_alive_idle = strtol(optarg, NULL, 0); + if (keep_alive_idle > 0 && keep_alive_idle <= TCP_KEEPALIVE_IDLE_MAX) { + params->tcp_keepalive_idle = keep_alive_idle; + } else { + PRINT_ERROR("keep_alive_idle=%d,illigal argument -- %s \n", keep_alive_idle, optarg); + exit(PROGRAM_ABORT); + } + + token = strtok_s(NULL, ",", &next_token); + if (token == NULL) { + PRINT_ERROR("parse keep_alive interval null, illigal argument(%s) \n", optarg); + exit(PROGRAM_ABORT); + } + int32_t keep_alive_interval = strtol(token, NULL, 0); + if (keep_alive_interval > 0 && keep_alive_interval <= TCP_KEEPALIVE_IDLE_MAX) { + params->tcp_keepalive_interval = keep_alive_interval; + } else { + PRINT_ERROR("keep_alive_interval=%d,illigal argument -- %s \n", keep_alive_interval, optarg); + exit(PROGRAM_ABORT); + } +} + // set `group ip` parameter void program_param_parse_groupip(struct ProgramParams *params) { - in_addr_t ip = inet_addr(optarg); - if (ip != INADDR_NONE && ip >= inet_addr("224.0.0.0") && ip <= inet_addr("239.255.255.255")) { - params->groupip = optarg; + char *cur_ptr = NULL; + char *next_ptr = NULL; + + cur_ptr = strtok_s(optarg, ",", &next_ptr); + if (program_ipv4_check(cur_ptr) == false) { + PRINT_ERROR("illigal argument -- %s \n", cur_ptr); + exit(PROGRAM_ABORT); + } + + in_addr_t ip = ntohl(inet_addr(cur_ptr)); + if (ip != INADDR_NONE && ip >= ntohl(inet_addr("224.0.0.0")) && ip <= ntohl(inet_addr("239.255.255.255"))) { + params->groupip = cur_ptr; } else { - PRINT_ERROR("illigal argument -- %s \n", optarg); + PRINT_ERROR("illigal argument -- %s \n", cur_ptr); + exit(PROGRAM_ABORT); + } + + if (*next_ptr) { + if (program_ipv4_check(next_ptr)) { + params->groupip_interface = next_ptr; + } else { + PRINT_ERROR("illigal argument -- %s \n", next_ptr); + exit(PROGRAM_ABORT); + } + } +} + +void fault_inject_delay(delay_type type) +{ + if (g_inject_delay[type]) { + printf("FAULT INJECT: Delay begin, sleep %d seconds.\n", g_inject_delay[type]); + sleep(g_inject_delay[type]); + g_inject_delay[type] = 0; + printf("FAULT INJECT: Delay finished.\n"); + } +} + + +// apply fault inject type of delay +static void delay_param_parse(struct ProgramParams *params) +{ + int32_t time = 0; + if (params->inject[INJECT_TIME_IDX] != NULL) { + time = atoi(params->inject[INJECT_TIME_IDX]); + } + if (time <= 0) { + PRINT_ERROR("FAULT INJECT: delay time input error! receive: \"%s\"\n", params->inject[INJECT_TIME_IDX]); + exit(PROGRAM_ABORT); + } + + char *location = params->inject[INJECT_LOCATION_IDX]; + if (location == NULL) { + PRINT_ERROR("FAULT INJECT: Lack param for delay fault inject, The location is not appointed.\n"); exit(PROGRAM_ABORT); } + + if (strcmp("before_accept", location) == 0) { + g_inject_delay[INJECT_DELAY_ACCEPT] = time; + return; + } + if (strcmp("before_read", location) == 0) { + g_inject_delay[INJECT_DELAY_READ] = time; + return; + } + if (strcmp("before_write", location) == 0) { + g_inject_delay[INJECT_DELAY_WRITE] = time; + return; + } + if (strcmp("before_read_and_write", location) == 0) { + g_inject_delay[INJECT_DELAY_READ] = time; + g_inject_delay[INJECT_DELAY_WRITE] = time; + return; + } + + PRINT_ERROR("FAULT INJECT: Unidentified fault inject location -- %s \n", location); + exit(PROGRAM_ABORT); +} + +// apply fault inject type of skip +static void skip_param_parse(struct ProgramParams *params) +{ + char* location = params->inject[INJECT_SKIP_IDX]; + if (location == NULL) { + PRINT_ERROR("FAULT INJECT: Lack param for skip fault inject, location is not appointed.\n"); + exit(PROGRAM_ABORT); + } + + if (strcmp("read", location) == 0) { + g_inject_skip[INJECT_SKIP_READ] = 1; + return; + } + if (strcmp("write", location) == 0) { + g_inject_skip[INJECT_SKIP_WRITE] = 1; + return; + } + if (strcmp("read_and_write", location) == 0) { + g_inject_skip[INJECT_SKIP_READ] = 1; + g_inject_skip[INJECT_SKIP_WRITE] = 1; + return; + } + + PRINT_ERROR("FAULT INJECT: Unidentified fault inject location -- %s \n", location); + exit(PROGRAM_ABORT); +} + +// judge if need skip fault inject +int32_t get_g_inject_skip(skip_type type) +{ + return g_inject_skip[type]; +} + +// check legitimacy of fault injection and apply it. +static void apply_fault_inject(struct ProgramParams *params) +{ + char *inject_type = params->inject[INJECT_TYPE_IDX]; + if (strcmp("delay", inject_type) == 0) { + delay_param_parse(params); + return; + } + if (strcmp("skip", inject_type) == 0) { + skip_param_parse(params); + return; + } + + PRINT_ERROR("FAULT INJCET: Unidentified fault inject -- %s \n", inject_type); + exit(PROGRAM_ABORT); +} + +// set `fault injection` parameter +static void program_param_parse_inject(struct ProgramParams *params) +{ + int32_t inject_idx = 0; + char *inject_input = strdup(optarg); + if (inject_input == NULL) { + PRINT_ERROR("FAULT INJCET: Insufficient memory, strdup failed.\n"); + exit(PROGRAM_ABORT); + } + + char *context = NULL; + char *elem = strtok_s(inject_input, " ", &context); + if (elem == NULL) { + PRINT_ERROR("FAULT INJECT: Input error. -- %s \n", inject_input); + exit(PROGRAM_ABORT); + } + while (elem != NULL) { + if (inject_idx == FAULT_INJECT_PARA_COUNT) { + PRINT_ERROR("FAULT INJECT: Exceed the max count (3) of fault inject params. -- %s\n", optarg); + exit(PROGRAM_ABORT); + } + params->inject[inject_idx++] = elem; + elem = strtok_s(NULL, " ", &context); + } + + apply_fault_inject(params); } // initialize the parameters @@ -219,8 +545,11 @@ void program_params_init(struct ProgramParams *params) { params->as = PARAM_DEFAULT_AS; params->ip = PARAM_DEFAULT_IP; - params->port = PARAM_DEFAULT_PORT; - params->sport = PARAM_DEFAULT_SPORT; + params->ipv6 = PARAM_DEFAULT_IP_V6; + params->addr_family = PARAM_DEFAULT_ADDR_FAMILY; + memset_s(params->port, sizeof(bool)*UNIX_TCP_PORT_MAX, 0, sizeof(bool)*UNIX_TCP_PORT_MAX); + memset_s(params->sport, sizeof(bool)*UNIX_TCP_PORT_MAX, 0, sizeof(bool)*UNIX_TCP_PORT_MAX); + (params->port)[PARAM_DEFAULT_PORT] = 1; params->model = PARAM_DEFAULT_MODEL; params->thread_num = PARAM_DEFAULT_THREAD_NUM; params->connect_num = PARAM_DEFAULT_CONNECT_NUM; @@ -233,15 +562,19 @@ void program_params_init(struct ProgramParams *params) params->epollcreate = PARAM_DEFAULT_EPOLLCREATE; params->accept = PARAM_DEFAULT_ACCEPT; params->groupip = PARAM_DEFAULT_GROUPIP; + params->groupip_interface = PARAM_DEFAULT_GROUPIP; + params->tcp_keepalive_idle = PARAM_DEFAULT_KEEPALIVEIDLE; + params->tcp_keepalive_interval = PARAM_DEFAULT_KEEPALIVEIDLE; } // print program helps void program_params_help(void) { printf("\n"); - printf("-a, --as [server | client]: set programas server or client. \n"); + printf("-a, --as [server | client | loop]: set programas server, client or loop. \n"); printf(" server: as server. \n"); printf(" client: as client. \n"); + printf(" loop: as server and client. \n"); printf("-i, --ip [???.???.???.???]: set ip address. \n"); printf("-g, --groupip [???.???.???.???]: set group ip address. \n"); printf("-p, --port [????]: set port number in range of %d - %d. \n", UNIX_TCP_PORT_MIN, UNIX_TCP_PORT_MAX); @@ -268,6 +601,16 @@ void program_params_help(void) printf("-h, --help: see helps. \n"); printf("-E, --epollcreate [ec | ec1]: epoll_create method. \n"); printf("-C, --accept [ac | ac4]: accept method. \n"); + printf("-k, --keep_alive [keep_alive_idle:keep_alive_interval]: set tcp-alive info in range of %d-%d. \n", + PARAM_DEFAULT_KEEPALIVEIDLE, TCP_KEEPALIVE_IDLE_MAX); + printf("-I, --inject [\"fault_inject_param0 fault_inject_param1 fault_inject_param2\"]: fault inject\n"); + printf(" for example: \"delay 20 before_accept\"\n"); + printf(" \"delay 20 before_read\"\n"); + printf(" \"delay 20 before_write\"\n"); + printf(" \"delay 20 before_read_and_write\"\n"); + printf(" \"skip read\"\n"); + printf(" \"skip write\"\n"); + printf(" \"skip read_and_write\"\n"); printf("\n"); } @@ -295,7 +638,7 @@ int32_t program_params_parse(struct ProgramParams *params, uint32_t argc, char * case (PARAM_NUM_PORT): program_param_parse_port(params); break; - case (PARAM_NUM_SPORT): + case (PARAM_NUM_SPORT): program_param_parse_sport(params); break; case (PARAM_NUM_MODEL): @@ -331,9 +674,15 @@ int32_t program_params_parse(struct ProgramParams *params, uint32_t argc, char * case (PARAM_NUM_ACCEPT): program_param_parse_accept(params); break; - case (PARAM_NUM_GROUPIP): - program_param_parse_groupip(params); - break; + case (PARAM_NUM_GROUPIP): + program_param_parse_groupip(params); + break; + case (PARAM_NUM_KEEPALIVE): + program_param_parse_keepalive(params); + break; + case (PARAM_NUM_INJECT): + program_param_parse_inject(params); + break; case (PARAM_NUM_HELP): program_params_help(); return PROGRAM_ABORT; @@ -345,11 +694,6 @@ int32_t program_params_parse(struct ProgramParams *params, uint32_t argc, char * } } - if (strcmp(params->domain, "tcp") != 0) { - params->thread_num = 1; - params->connect_num = 1; - } - return PROGRAM_OK; } @@ -361,22 +705,47 @@ void program_params_print(struct ProgramParams *params) printf("--> [as]: %s \n", params->as); if (strcmp(params->groupip, PARAM_DEFAULT_GROUPIP) != 0) { if (strcmp(params->as, "server") == 0) { - printf("--> [server ip]: %s \n", params->ip); printf("--> [server group ip]: %s \n", params->groupip); + printf("--> [server groupip_interface]: %s \n", params->groupip_interface); } else { - printf("--> [server ip]: %s \n", params->groupip); - printf("--> [client send ip]: %s \n", params->ip); + printf("--> [client group ip]: %s \n", params->groupip); + printf("--> [client groupip_interface]: %s \n", params->groupip_interface); } - } else { - printf("--> [server ip]: %s \n", params->ip); } - if ((strcmp(params->as, "server") == 0 && strcmp(params->groupip, PARAM_DEFAULT_GROUPIP)) != 0) { - printf("--> [server group ip]: %s \n", params->groupip); + printf("--> [server ip]: %s \n", params->ip); + if (strcmp(params->ipv6, PARAM_DEFAULT_IP_V6) != 0) { + printf("--> [server ipv6]: %s \n", params->ipv6); + } + + printf("--> [server port]: "); + uint32_t comma = 0; + uint32_t sport = 0; + + /* use comma to print port list */ + for (uint32_t i = UNIX_TCP_PORT_MIN; i < UNIX_TCP_PORT_MAX; i++) { + if ((params->port)[i]) { + printf("%s%u", comma?",":"", i); + comma = 1; + } + if ((params->sport)[i]) { + sport = i; + } } - printf("--> [server port]: %u \n", params->port); - if (params->sport && strcmp(params->as, "client") == 0) { - printf("--> [client sport]: %u \n", params->sport); + printf(" \n"); + + /* use comma to print sport list */ + if (sport && strcmp(params->as, "client") == 0) { + printf("--> [client sport]: "); + comma = 0; + for (uint32_t i = UNIX_TCP_PORT_MIN; i < sport + 1; i++) { + if ((params->sport)[i]) { + printf("%s%u", comma?",":"", i); + comma = 1; + } + } + printf(" \n"); } + if (strcmp(params->as, "server") == 0) { printf("--> [model]: %s \n", params->model); } @@ -404,5 +773,16 @@ void program_params_print(struct ProgramParams *params) printf("--> [debug]: %s \n", (params->debug == true) ? "on" : "off"); printf("--> [epoll create]: %s \n", params->epollcreate); printf("--> [accept]: %s \n", params->accept); + printf("--> [inject]: "); + if (params->inject[INJECT_TYPE_IDX] == NULL) { + printf("none \n"); + } else { + for (int32_t i = 0; i < FAULT_INJECT_PARA_COUNT; ++i) { + if (params->inject[i] != NULL) { + printf("%s ", params->inject[i]); + } + } + printf("\n"); + } printf("\n"); } diff --git a/examples/src/server.c b/examples/src/server.c index 8634dde..7bc7d9e 100644 --- a/examples/src/server.c +++ b/examples/src/server.c @@ -14,20 +14,22 @@ #include "server.h" static pthread_mutex_t server_debug_mutex; // the server mutex for debug +struct LoopInfo loopmod; // server debug information print -void server_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug) +void server_debug_print(const char *ch_str, const char *act_str, ip_addr_t *ip, uint16_t port, bool debug) { if (debug == true) { pthread_mutex_lock(&server_debug_mutex); - struct in_addr sin_addr; - sin_addr.s_addr = ip; + uint8_t str_len = ip->addr_family == AF_INET ? INET_ADDRSTRLEN : INET6_ADDRSTRLEN; + char str_ip[str_len]; + inet_ntop(ip->addr_family, &ip->u_addr, str_ip, str_len); PRINT_SERVER("[%s] [pid: %d] [tid: %ld] [%s <- %s:%d]. ", \ ch_str, \ getpid(), \ pthread_self(), \ act_str, \ - inet_ntoa(sin_addr), \ + str_ip, \ ntohs(port)); pthread_mutex_unlock(&server_debug_mutex); } @@ -37,7 +39,7 @@ void server_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, u void sermud_info_print(struct ServerMud *server_mud) { if (server_mud->debug == false) { - uint32_t curr_connect = server_mud->curr_connect; + uint32_t curr_connect = 0; struct timeval begin; gettimeofday(&begin, NULL); @@ -48,6 +50,7 @@ void sermud_info_print(struct ServerMud *server_mud) struct ServerMudWorker *begin_uint = server_mud->workers; while (begin_uint != NULL) { begin_recv_bytes += begin_uint->recv_bytes; + curr_connect += begin_uint->curr_connect; begin_uint = begin_uint->next; } @@ -122,45 +125,82 @@ int32_t sermud_listener_create_epfd_and_reg(struct ServerMud *server_mud) } struct epoll_event ep_ev; - ep_ev.data.ptr = (void *)&(server_mud->listener); ep_ev.events = EPOLLIN | EPOLLET; - if (epoll_ctl(server_mud->epfd, EPOLL_CTL_ADD, server_mud->listener.fd, &ep_ev) < 0) { - PRINT_ERROR("server can't control epoll %d! ", errno); - return PROGRAM_FAULT; + for (int i = 0; i < PROTOCOL_MODE_MAX; i++) { + if (server_mud->listener.listen_fd_array[i] != -1) { + struct ServerHandler *server_handler = (struct ServerHandler *)malloc(sizeof(struct ServerHandler)); + memset_s(server_handler, sizeof(struct ServerHandler), 0, sizeof(struct ServerHandler)); + server_handler->fd = server_mud->listener.listen_fd_array[i]; + ep_ev.data.ptr = (void *)server_handler; + if (epoll_ctl(server_mud->epfd, EPOLL_CTL_ADD, server_mud->listener.listen_fd_array[i], &ep_ev) < 0) { + PRINT_ERROR("epoll_ctl failed %d! listen_fd=%d ", errno, server_mud->listener.listen_fd_array[i]); + return PROGRAM_FAULT; + } + } + } + + return PROGRAM_OK; +} + +static void sermud_accept_get_remote_ip(sockaddr_t *accept_addr, ip_addr_t *remote_ip, bool is_tcp_v6_flag) +{ + remote_ip->addr_family = is_tcp_v6_flag ? AF_INET6 : AF_INET; + if (is_tcp_v6_flag == false) { + remote_ip->u_addr.ip4 = ((struct sockaddr_in *)accept_addr)->sin_addr; + } else { + remote_ip->u_addr.ip6 = ((struct sockaddr_in6 *)accept_addr)->sin6_addr; } +} - server_debug_print("server mud listener", "waiting", server_mud->ip, server_mud->port, server_mud->debug); +int32_t sermud_set_socket_opt(int32_t accept_fd, struct ServerMud *server_mud) +{ + if (set_tcp_keep_alive_info(accept_fd, server_mud->tcp_keepalive_idle, server_mud->tcp_keepalive_interval) < 0) { + PRINT_ERROR("cant't set_tcp_keep_alive_info! "); + return PROGRAM_FAULT; + } + if (set_socket_unblock(accept_fd) < 0) { + PRINT_ERROR("server can't set the connect socket to unblock! "); + return PROGRAM_FAULT; + } return PROGRAM_OK; } // the listener thread, unblock, dissymmetric server accepts the connections -int32_t sermud_listener_accept_connects(struct ServerMud *server_mud) +int32_t sermud_listener_accept_connects(struct epoll_event *curr_epev, struct ServerMud *server_mud) { + int32_t fd = ((struct ServerHandler*)(curr_epev->data.ptr))->fd; + fault_inject_delay(INJECT_DELAY_ACCEPT); + while (true) { - struct sockaddr_in accept_addr; - uint32_t sockaddr_in_len = sizeof(struct sockaddr_in); + sockaddr_t accept_addr; + bool is_tcp_v6_flag = (fd == server_mud->listener.listen_fd_array[V6_TCP]) ? true : false; + + uint32_t sockaddr_in_len = is_tcp_v6_flag ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in); + int32_t accept_fd; - if (strcmp(server_mud->domain, "udp") == 0) { - break; - } + + int32_t listen_fd_index = is_tcp_v6_flag ? V6_TCP : V4_TCP; + int32_t listen_fd = server_mud->listener.listen_fd_array[listen_fd_index]; if (strcmp(server_mud->accept, "ac4") == 0) { - accept_fd = accept4(server_mud->listener.fd, (struct sockaddr *)&accept_addr, &sockaddr_in_len, SOCK_CLOEXEC); + accept_fd = accept4(listen_fd, (struct sockaddr *)&accept_addr, &sockaddr_in_len, SOCK_CLOEXEC); } else { - accept_fd = accept(server_mud->listener.fd, (struct sockaddr *)&accept_addr, &sockaddr_in_len); + accept_fd = accept(listen_fd, (struct sockaddr *)&accept_addr, &sockaddr_in_len); } - + if (accept_fd < 0) { break; } - if (set_socket_unblock(accept_fd) < 0) { - PRINT_ERROR("server can't set the connect socket to unblock! "); + if (sermud_set_socket_opt(accept_fd, server_mud) < 0) { return PROGRAM_FAULT; } - ++(server_mud->curr_connect); + // sockaddr to ip, port + ip_addr_t remote_ip; + uint16_t remote_port = ((struct sockaddr_in *)&accept_addr)->sin_port; + sermud_accept_get_remote_ip(&accept_addr, &remote_ip, is_tcp_v6_flag); pthread_t *tid = (pthread_t *)malloc(sizeof(pthread_t)); struct ServerMudWorker *worker = (struct ServerMudWorker *)malloc(sizeof(struct ServerMudWorker)); @@ -169,26 +209,50 @@ int32_t sermud_listener_accept_connects(struct ServerMud *server_mud) worker->epevs = (struct epoll_event *)malloc(sizeof(struct epoll_event)); worker->recv_bytes = 0; worker->pktlen = server_mud->pktlen; - worker->ip = accept_addr.sin_addr.s_addr; - worker->port = accept_addr.sin_port; + worker->ip = remote_ip; + worker->port = remote_port; worker->api = server_mud->api; worker->debug = server_mud->debug; worker->next = server_mud->workers; worker->epollcreate = server_mud->epollcreate; + worker->worker.is_v6 = is_tcp_v6_flag ? 1 : 0; + worker->domain = server_mud->domain; + worker->curr_connect = 1; server_mud->workers = worker; - if (pthread_create(tid, NULL, sermud_worker_create_and_run, server_mud) < 0) { + if (pthread_create(tid, NULL, sermud_worker_create_and_run, worker) < 0) { PRINT_ERROR("server can't create poisx thread %d! ", errno); return PROGRAM_FAULT; } - server_debug_print("server mud listener", "accept", accept_addr.sin_addr.s_addr, accept_addr.sin_port, server_mud->debug); + server_debug_print("server mud listener", "accept", &remote_ip, remote_port, server_mud->debug); } return PROGRAM_OK; } +static int32_t server_handler_close(int32_t epfd, struct ServerHandler *server_handler) +{ + int32_t fd = server_handler->fd; + struct epoll_event ep_ev; + if (server_handler) { + free(server_handler); + } + + if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ep_ev) < 0) { + PRINT_ERROR("server can't delete socket '%d' to control epoll %d! ", fd, errno); + return PROGRAM_FAULT; + } + + if (close(fd) < 0) { + PRINT_ERROR("server can't close the socket %d! ", errno); + return PROGRAM_FAULT; + } + + return 0; +} + // the worker thread, unblock, dissymmetric server processes the events int32_t sermud_worker_proc_epevs(struct ServerMudWorker *worker_unit, const char* domain) { @@ -201,32 +265,60 @@ int32_t sermud_worker_proc_epevs(struct ServerMudWorker *worker_unit, const char for (int32_t i = 0; i < epoll_nfds; ++i) { struct epoll_event *curr_epev = worker_unit->epevs + i; - if (curr_epev->events == EPOLLERR || curr_epev->events == EPOLLHUP || curr_epev->events == EPOLLRDHUP) { + if (curr_epev->events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { + worker_unit->curr_connect--; PRINT_ERROR("server epoll wait error %d! ", curr_epev->events); - return PROGRAM_FAULT; + if (server_handler_close(worker_unit->epfd, (struct ServerHandler *)curr_epev->data.ptr) != 0) { + return PROGRAM_FAULT; + } } if (curr_epev->events == EPOLLIN) { struct ServerHandler *server_handler = (struct ServerHandler *)curr_epev->data.ptr; - int32_t server_ans_ret = server_ans(server_handler, worker_unit->pktlen, worker_unit->api, domain); + int32_t server_ans_ret = server_ans(server_handler->fd, worker_unit->pktlen, worker_unit->api, "tcp"); if (server_ans_ret == PROGRAM_FAULT) { - struct epoll_event ep_ev; - if (epoll_ctl(worker_unit->epfd, EPOLL_CTL_DEL, server_handler->fd, &ep_ev) < 0) { - PRINT_ERROR("server can't delete socket '%d' to control epoll %d! ", server_handler->fd, errno); + worker_unit->curr_connect--; + if (server_handler_close(worker_unit->epfd, server_handler) != 0) { return PROGRAM_FAULT; } } else if (server_ans_ret == PROGRAM_ABORT) { - if (close(server_handler->fd) < 0) { - PRINT_ERROR("server can't close the socket %d! ", errno); + worker_unit->curr_connect--; + server_debug_print("server mud worker", "close", &worker_unit->ip, worker_unit->port, worker_unit->debug); + if (server_handler_close(worker_unit->epfd, server_handler) != 0) { return PROGRAM_FAULT; } - server_debug_print("server mud worker", "close", worker_unit->ip, worker_unit->port, worker_unit->debug); } else { worker_unit->recv_bytes += worker_unit->pktlen; - server_debug_print("server mud worker", "receive", worker_unit->ip, worker_unit->port, worker_unit->debug); + server_debug_print("server mud worker", "receive", &worker_unit->ip, worker_unit->port, worker_unit->debug); + } + } + } + + return PROGRAM_OK; +} + +static int32_t sermud_process_epollin_event(struct epoll_event *curr_epev, struct ServerMud *server_mud) +{ + struct ServerHandler *server_handler = (struct ServerHandler *)curr_epev->data.ptr; + + if (server_handler->fd == server_mud->listener.listen_fd_array[V4_UDP] || + server_handler->fd == server_mud->listener.listen_fd_array[UDP_MULTICAST]) { + uint32_t pktlen = server_mud->pktlen > UDP_PKTLEN_MAX ? UDP_PKTLEN_MAX : server_mud->pktlen; + int32_t server_ans_ret = server_ans(server_handler->fd, pktlen, server_mud->api, "udp"); + if (server_ans_ret != PROGRAM_OK) { + if (server_handler_close(server_mud->epfd, server_handler) != 0) { + PRINT_ERROR("server_handler_close server_ans_ret %d! \n", server_ans_ret); + return PROGRAM_FAULT; } } + server_mud->workers->recv_bytes += pktlen; + } else { + int32_t sermud_listener_accept_connects_ret = sermud_listener_accept_connects(curr_epev, server_mud); + if (sermud_listener_accept_connects_ret < 0) { + PRINT_ERROR("server try accept error %d! ", sermud_listener_accept_connects_ret); + return PROGRAM_FAULT; + } } return PROGRAM_OK; @@ -244,15 +336,14 @@ int32_t sermud_listener_proc_epevs(struct ServerMud *server_mud) for (int32_t i = 0; i < epoll_nfds; ++i) { struct epoll_event *curr_epev = server_mud->epevs + i; - if (curr_epev->events == EPOLLERR || curr_epev->events == EPOLLHUP || curr_epev->events == EPOLLRDHUP) { + if (curr_epev->events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { PRINT_ERROR("server epoll wait error %d! ", curr_epev->events); - return PROGRAM_FAULT; + server_handler_close(server_mud->epfd, (struct ServerHandler *)curr_epev->data.ptr); + return PROGRAM_OK; } if (curr_epev->events == EPOLLIN) { - int32_t sermud_listener_accept_connects_ret = sermud_listener_accept_connects(server_mud); - if (sermud_listener_accept_connects_ret < 0) { - PRINT_ERROR("server try accept error %d! ", sermud_listener_accept_connects_ret); + if (sermud_process_epollin_event(curr_epev, server_mud) < 0) { return PROGRAM_FAULT; } } @@ -266,15 +357,15 @@ void *sermud_worker_create_and_run(void *arg) { pthread_detach(pthread_self()); - struct ServerMudWorker *worker_unit = ((struct ServerMud *)arg)->workers; - char* domain = ((struct ServerMud *)arg)->domain; + struct ServerMudWorker *worker_unit = (struct ServerMudWorker *)arg; + char *domain = worker_unit->domain; if (sermud_worker_create_epfd_and_reg(worker_unit) < 0) { - exit(PROGRAM_FAULT); + return (void *)PROGRAM_OK; } while (true) { if (sermud_worker_proc_epevs(worker_unit, domain) < 0) { - exit(PROGRAM_FAULT); + return (void *)PROGRAM_OK; } } @@ -284,26 +375,60 @@ void *sermud_worker_create_and_run(void *arg) return (void *)PROGRAM_OK; } +void sermud_memory_recycle(struct ServerMud *server_mud) +{ + // recycle mem of epevs + if (server_mud->epevs) { + free(server_mud->epevs); + } + struct ServerMudWorker *head = server_mud->workers; + while (head) { + if (head->epevs) { + free(head->epevs); + } + struct ServerMudWorker *next = head->next; + free(head); + head = next; + } +} + // create the listener thread, unblock, dissymmetric server and run void *sermud_listener_create_and_run(void *arg) { struct ServerMud *server_mud = (struct ServerMud *)arg; - if (create_socket_and_listen(&(server_mud->listener.fd), server_mud->ip, server_mud->groupip, server_mud->port, server_mud->domain) < 0) { - exit(PROGRAM_FAULT); + uint32_t port = 0; + for (; port < UNIX_TCP_PORT_MAX; port++) { + if ((server_mud->port)[port]) { + if (create_socket_and_listen(server_mud->listener.listen_fd_array, &(server_mud->server_ip_info), + htons(port), server_mud->protocol_type_mode) < 0) { + PRINT_ERROR("create_socket_and_listen err"); + sermud_memory_recycle(server_mud); + exit(PROGRAM_FAULT); + } + } } + if (sermud_listener_create_epfd_and_reg(server_mud) < 0) { - exit(PROGRAM_FAULT); + sermud_memory_recycle(server_mud); + exit(PROGRAM_FAULT); } while (true) { if (sermud_listener_proc_epevs(server_mud) < 0) { + sermud_memory_recycle(server_mud); exit(PROGRAM_FAULT); } } - if (close(server_mud->listener.fd) < 0 || close(server_mud->epfd) < 0) { - exit(PROGRAM_FAULT); - } + for (int i = 0; i < PROTOCOL_MODE_MAX; i++) { + if (server_mud->listener.listen_fd_array[i] == -1) + continue; + if (close(server_mud->listener.listen_fd_array[i]) < 0) { + sermud_memory_recycle(server_mud); + exit(PROGRAM_FAULT); + } + } + sermud_memory_recycle(server_mud); return (void *)PROGRAM_OK; } @@ -319,19 +444,44 @@ int32_t sermud_create_and_run(struct ProgramParams *params) } server_mud->listener.fd = -1; - server_mud->workers = NULL; + for (int32_t i = 0; i < PROTOCOL_MODE_MAX; i++) { + server_mud->listener.listen_fd_array[i] = -1; + } + + struct ServerMudWorker *workers = (struct ServerMudWorker *)malloc(sizeof(struct ServerMudWorker)); + if (workers == NULL) { + PRINT_ERROR("malloc truct ServerMudWorker failed "); + return PROGRAM_FAULT; + } + memset_s(workers, sizeof(struct ServerMudWorker), 0, sizeof(struct ServerMudWorker)); + workers->next = NULL; + server_mud->workers = workers; + server_mud->epfd = -1; server_mud->epevs = (struct epoll_event *)malloc(SERVER_EPOLL_SIZE_MAX * sizeof(struct epoll_event)); - server_mud->curr_connect = 0; - server_mud->ip = inet_addr(params->ip); - server_mud->groupip = inet_addr(params->groupip); - server_mud->port = htons(params->port); + server_mud->server_ip_info.ip.addr_family = params->addr_family; + + inet_pton(AF_INET, params->ip, &server_mud->server_ip_info.ip.u_addr.ip4); + inet_pton(AF_INET6, params->ipv6, &server_mud->server_ip_info.ip.u_addr.ip6); + + server_mud->server_ip_info.groupip.addr_family = params->addr_family; + inet_pton(AF_INET, params->groupip, &server_mud->server_ip_info.groupip.u_addr); + + server_mud->server_ip_info.groupip_interface.addr_family = params->addr_family; + inet_pton(AF_INET, params->groupip_interface, &server_mud->server_ip_info.groupip_interface.u_addr); + + server_mud->port = params->port; server_mud->pktlen = params->pktlen; - server_mud->domain = params->domain; + + server_mud->protocol_type_mode = program_get_protocol_mode_by_domain_ip(params->domain, params->ip, params->ipv6, + params->groupip); + server_mud->api = params->api; server_mud->debug = params->debug; server_mud->epollcreate = params->epollcreate; server_mud->accept = params->accept; + server_mud->tcp_keepalive_idle = params->tcp_keepalive_idle; + server_mud->tcp_keepalive_interval = params->tcp_keepalive_interval; if (pthread_create(tid, NULL, sermud_listener_create_and_run, server_mud) < 0) { PRINT_ERROR("server can't create poisx thread %d! ", errno); @@ -341,10 +491,17 @@ int32_t sermud_create_and_run(struct ProgramParams *params) if (server_mud->debug == false) { printf("[program informations]: \n\n"); } - while (true) { - sermud_info_print(server_mud); + + if (strcmp(params->as, "server") == 0) { + while (true) { + sermud_info_print(server_mud); + } + } else if (strcmp(params->as, "loop") == 0) { + loopmod.model = params->model; + loopmod.server_mud_info = server_mud; } + pthread_mutex_destroy(&server_debug_mutex); return PROGRAM_OK; @@ -413,39 +570,62 @@ int32_t sersum_create_epfd_and_reg(struct ServerMumUnit *server_unit) return PROGRAM_FAULT; } - struct epoll_event ep_ev; - ep_ev.data.ptr = (void *)&(server_unit->listener); + struct epoll_event ep_ev = {0}; ep_ev.events = EPOLLIN | EPOLLET; - if (epoll_ctl(server_unit->epfd, EPOLL_CTL_ADD, server_unit->listener.fd, &ep_ev) < 0) { - PRINT_ERROR("server can't control epoll %d! ", errno); - return PROGRAM_FAULT; + + for (int32_t i = 0; i < PROTOCOL_MODE_MAX; i++) { + if (server_unit->listener.listen_fd_array[i] != -1) { + struct ServerHandler *server_handler = (struct ServerHandler *)malloc(sizeof(struct ServerHandler)); + memset_s(server_handler, sizeof(struct ServerHandler), 0, sizeof(struct ServerHandler)); + server_handler->fd = server_unit->listener.listen_fd_array[i]; + + ep_ev.data.ptr = (void *)server_handler; + if (epoll_ctl(server_unit->epfd, EPOLL_CTL_ADD, server_unit->listener.listen_fd_array[i], &ep_ev) < 0) { + PRINT_ERROR("epoll_ctl failed %d! listen_fd=%d ", errno, server_unit->listener.listen_fd_array[i]); + return PROGRAM_FAULT; + } + } } - server_debug_print("server mum unit", "waiting", server_unit->ip, server_unit->port, server_unit->debug); + server_debug_print("server mum unit", "waiting", &server_unit->server_ip_info.ip, server_unit->port, + server_unit->debug); return PROGRAM_OK; } // the single thread, unblock, mutliplexing IO server accepts the connections -int32_t sersum_accept_connects(struct ServerMumUnit *server_unit, struct ServerHandler *server_handler) +int32_t sersum_accept_connects(struct epoll_event *cur_epev, struct ServerMumUnit *server_unit) { + fault_inject_delay(INJECT_DELAY_ACCEPT); + int32_t fd = ((struct ServerHandler*)(cur_epev->data.ptr))->fd; while (true) { - struct sockaddr_in accept_addr; - uint32_t sockaddr_in_len = sizeof(struct sockaddr_in); + sockaddr_t accept_addr; + bool is_tcp_v6 = (fd == (server_unit->listener.listen_fd_array[V6_TCP])) ? true : false; + + socklen_t sockaddr_in_len = is_tcp_v6 ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in); int32_t accept_fd; - if (strcmp(server_unit->domain, "udp") == 0) { - break; - } + int32_t ret = 0; + + int32_t listen_index = (is_tcp_v6) ? V6_TCP : V4_TCP; + int32_t listen_fd = server_unit->listener.listen_fd_array[listen_index]; if (strcmp(server_unit->accept, "ac4") == 0) { - accept_fd = accept4(server_unit->listener.fd, (struct sockaddr *)&accept_addr, &sockaddr_in_len, SOCK_CLOEXEC); + accept_fd = accept4(listen_fd, (struct sockaddr *)&accept_addr, &sockaddr_in_len, SOCK_CLOEXEC); } else { - accept_fd = accept(server_unit->listener.fd, (struct sockaddr *)&accept_addr, &sockaddr_in_len); + accept_fd = accept(listen_fd, (struct sockaddr *)&accept_addr, &sockaddr_in_len); } - + if (accept_fd < 0) { + if (errno != EWOULDBLOCK && errno != EAGAIN){ + PRINT_ERROR("accept_fd=%d , errno=%d ", accept_fd, errno); + } break; } + ret = set_tcp_keep_alive_info(accept_fd, server_unit->tcp_keepalive_idle, server_unit->tcp_keepalive_interval); + if (ret < 0) { + PRINT_ERROR("set_tcp_keep_alive_info ret=%d \n", ret); + return PROGRAM_FAULT; + } if (set_socket_unblock(accept_fd) < 0) { PRINT_ERROR("server can't set the connect socket to unblock! "); @@ -454,6 +634,8 @@ int32_t sersum_accept_connects(struct ServerMumUnit *server_unit, struct ServerH struct ServerHandler *server_handler = (struct ServerHandler *)malloc(sizeof(struct ServerHandler)); server_handler->fd = accept_fd; + server_handler->is_v6 = (is_tcp_v6) ? 1 : 0; + struct epoll_event ep_ev; ep_ev.data.ptr = (void *)server_handler; ep_ev.events = EPOLLIN | EPOLLET; @@ -463,13 +645,98 @@ int32_t sersum_accept_connects(struct ServerMumUnit *server_unit, struct ServerH } ++server_unit->curr_connect; - - server_debug_print("server mum unit", "accept", accept_addr.sin_addr.s_addr, accept_addr.sin_port, server_unit->debug); + + // sockaddr tp ip, port + ip_addr_t remote_ip; + uint16_t remote_port = ((struct sockaddr_in*)&accept_addr)->sin_port; + remote_ip.addr_family = (is_tcp_v6) ? AF_INET6 : AF_INET; + if (is_tcp_v6 == false) { + remote_ip.u_addr.ip4 = ((struct sockaddr_in *)&accept_addr)->sin_addr; + } else { + remote_ip.u_addr.ip6 = ((struct sockaddr_in6 *)&accept_addr)->sin6_addr; + } + + server_debug_print("server mum unit", "accept", &remote_ip, remote_port, server_unit->debug); } return PROGRAM_OK; } +static int sersum_get_remote_ip(struct ServerHandler *server_handler, ip_addr_t *remote_ip, uint16_t *remote_port) +{ + sockaddr_t connect_addr; + socklen_t connect_addr_len = server_handler->is_v6 == 0 ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); + if (getpeername(server_handler->fd, (struct sockaddr *)&connect_addr, &connect_addr_len) < 0) { + PRINT_ERROR("server can't socket peername %d! ", errno); + return PROGRAM_ABORT; + } + + *remote_port = ((struct sockaddr_in *)&connect_addr)->sin_port; + if (((struct sockaddr *)&connect_addr)->sa_family == AF_INET) { + remote_ip->addr_family = AF_INET; + remote_ip->u_addr.ip4 = ((struct sockaddr_in *)&connect_addr)->sin_addr; + } else if (((struct sockaddr *)&connect_addr)->sa_family == AF_INET6) { + remote_ip->addr_family = AF_INET6; + remote_ip->u_addr.ip6 = ((struct sockaddr_in6 *)&connect_addr)->sin6_addr; + } + return PROGRAM_OK; +} + +static int sersum_process_tcp_accept_event(struct ServerMumUnit *server_unit, struct epoll_event *curr_epev) +{ + struct ServerHandler *server_handler = (struct ServerHandler *)curr_epev->data.ptr; + ip_addr_t remote_ip; + uint16_t remote_port; + + if (sersum_get_remote_ip(server_handler, &remote_ip, &remote_port) != PROGRAM_OK) { + return PROGRAM_ABORT; + } + + int32_t server_ans_ret = server_ans(server_handler->fd, server_unit->pktlen, server_unit->api, "tcp"); + if (server_ans_ret == PROGRAM_FAULT) { + --server_unit->curr_connect; + server_handler_close(server_unit->epfd, server_handler); + } else if (server_ans_ret == PROGRAM_ABORT) { + --server_unit->curr_connect; + server_debug_print("server mum unit", "close", &remote_ip, remote_port, server_unit->debug); + server_handler_close(server_unit->epfd, server_handler); + } else { + server_unit->recv_bytes += server_unit->pktlen; + server_debug_print("server mum unit", "receive", &remote_ip, remote_port, server_unit->debug); + } + return PROGRAM_OK; +} + +static int sersum_process_epollin_event(struct ServerMumUnit *server_unit, struct epoll_event *curr_epev) +{ + struct ServerHandler *server_handler = (struct ServerHandler *)curr_epev->data.ptr; + int32_t fd = server_handler->fd; + if (fd == (server_unit->listener.listen_fd_array[V4_TCP]) || + fd == (server_unit->listener.listen_fd_array[V6_TCP])) { + int32_t sersum_accept_connects_ret = sersum_accept_connects(curr_epev, server_unit); + if (sersum_accept_connects_ret < 0) { + PRINT_ERROR("server try accept error %d! ", sersum_accept_connects_ret); + return PROGRAM_ABORT; + } + } else if (fd == (server_unit->listener.listen_fd_array[V4_UDP]) || + fd == (server_unit->listener.listen_fd_array[UDP_MULTICAST])) { + uint32_t pktlen = server_unit->pktlen > UDP_PKTLEN_MAX ? UDP_PKTLEN_MAX : server_unit->pktlen; + int32_t server_ans_ret = server_ans(fd, pktlen, server_unit->api, "udp"); + if (server_ans_ret != PROGRAM_OK) { + if (server_handler_close(server_unit->epfd, server_handler) != 0) { + PRINT_ERROR("server_handler_close ret %d! \n", server_ans_ret); + return PROGRAM_ABORT; + } + } + server_unit->recv_bytes += pktlen; + } else { + if (sersum_process_tcp_accept_event(server_unit, curr_epev) != PROGRAM_OK) { + return PROGRAM_ABORT; + } + } + return PROGRAM_OK; +} + // the single thread, unblock, mutliplexing IO server processes the events int32_t sersum_proc_epevs(struct ServerMumUnit *server_unit) { @@ -482,47 +749,16 @@ int32_t sersum_proc_epevs(struct ServerMumUnit *server_unit) for (int32_t i = 0; i < epoll_nfds; ++i) { struct epoll_event *curr_epev = server_unit->epevs + i; - if (curr_epev->events == EPOLLERR || curr_epev->events == EPOLLHUP || curr_epev->events == EPOLLRDHUP) { - PRINT_ERROR("server epoll wait error %d! ", curr_epev->events); - return PROGRAM_FAULT; + if (curr_epev->events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { + server_unit->curr_connect--; + if (server_handler_close(server_unit->epfd, (struct ServerHandler *)curr_epev->data.ptr) != 0) { + return PROGRAM_OK; + } } if (curr_epev->events == EPOLLIN) { - if (curr_epev->data.ptr == (void *)&(server_unit->listener) && strcmp(server_unit->domain, "udp") != 0) { - int32_t sersum_accept_connects_ret = sersum_accept_connects(server_unit, &(server_unit->listener)); - if (sersum_accept_connects_ret < 0) { - PRINT_ERROR("server try accept error %d! ", sersum_accept_connects_ret); - return PROGRAM_FAULT; - } - continue; - } else { - struct ServerHandler *server_handler = (struct ServerHandler *)curr_epev->data.ptr; - struct sockaddr_in connect_addr; - socklen_t connect_addr_len = sizeof(connect_addr); - if (strcmp(server_unit->domain, "udp") != 0 && getpeername(server_handler->fd, (struct sockaddr *)&connect_addr, &connect_addr_len) < 0) { - PRINT_ERROR("server can't socket peername %d! ", errno); - return PROGRAM_FAULT; - } - - int32_t server_ans_ret = server_ans(server_handler, server_unit->pktlen, server_unit->api, server_unit->domain); - if (server_ans_ret == PROGRAM_FAULT) { - --server_unit->curr_connect; - struct epoll_event ep_ev; - if (epoll_ctl(server_unit->epfd, EPOLL_CTL_DEL, server_handler->fd, &ep_ev) < 0) { - PRINT_ERROR("server can't delete socket '%d' to control epoll %d! ", server_handler->fd, errno); - return PROGRAM_FAULT; - } - } else if (server_ans_ret == PROGRAM_ABORT) { - --server_unit->curr_connect; - if (close(server_handler->fd) < 0) { - PRINT_ERROR("server can't close the socket %d! ", errno); - return PROGRAM_FAULT; - } - server_debug_print("server mum unit", "close", connect_addr.sin_addr.s_addr, connect_addr.sin_port, server_unit->debug); - } else { - server_unit->recv_bytes += server_unit->pktlen; - server_debug_print("server mum unit", "receive", connect_addr.sin_addr.s_addr, connect_addr.sin_port, server_unit->debug); - } + if (sersum_process_epollin_event(server_unit, curr_epev) != PROGRAM_OK) { + return PROGRAM_ABORT; } } } @@ -535,7 +771,9 @@ void *sersum_create_and_run(void *arg) { struct ServerMumUnit *server_unit = (struct ServerMumUnit *)arg; - if (create_socket_and_listen(&(server_unit->listener.fd), server_unit->ip, server_unit->groupip, server_unit->port, server_unit->domain) < 0) { + if (create_socket_and_listen(server_unit->listener.listen_fd_array, &(server_unit->server_ip_info), + server_unit->port, server_unit->protocol_type_mode) < 0) { + PRINT_ERROR("create_socket_and_listen err! \n"); exit(PROGRAM_FAULT); } if (sersum_create_epfd_and_reg(server_unit) < 0) { @@ -560,6 +798,7 @@ int32_t sermum_create_and_run(struct ProgramParams *params) pthread_t *tids = (pthread_t *)malloc(thread_num * sizeof(pthread_t)); struct ServerMum *server_mum = (struct ServerMum *)malloc(sizeof(struct ServerMum)); struct ServerMumUnit *server_unit = (struct ServerMumUnit *)malloc(sizeof(struct ServerMumUnit)); + memset_s(server_unit, sizeof(struct ServerMumUnit), 0, sizeof(struct ServerMumUnit)); if (pthread_mutex_init(&server_debug_mutex, NULL) < 0) { PRINT_ERROR("server can't init posix mutex %d! ", errno); @@ -568,22 +807,47 @@ int32_t sermum_create_and_run(struct ProgramParams *params) server_mum->uints = server_unit; server_mum->debug = params->debug; + uint32_t port = UNIX_TCP_PORT_MIN; for (uint32_t i = 0; i < thread_num; ++i) { server_unit->listener.fd = -1; + for (int32_t i = 0; i < PROTOCOL_MODE_MAX; i++) { + server_unit->listener.listen_fd_array[i] = -1; + } server_unit->epfd = -1; server_unit->epevs = (struct epoll_event *)malloc(SERVER_EPOLL_SIZE_MAX * sizeof(struct epoll_event)); server_unit->curr_connect = 0; server_unit->recv_bytes = 0; - server_unit->ip = inet_addr(params->ip); - server_unit->groupip = inet_addr(params->groupip); - server_unit->port = htons(params->port); + server_unit->server_ip_info.ip.addr_family = params->addr_family; + inet_pton(AF_INET, params->ip, &server_unit->server_ip_info.ip.u_addr.ip4); + inet_pton(AF_INET6, params->ipv6, &server_unit->server_ip_info.ip.u_addr.ip6); + + server_unit->server_ip_info.groupip.addr_family = AF_INET; + inet_pton(AF_INET, params->groupip, &server_unit->server_ip_info.groupip.u_addr); + + server_unit->server_ip_info.groupip_interface.addr_family = AF_INET; + inet_pton(AF_INET, params->groupip_interface, &server_unit->server_ip_info.groupip_interface.u_addr); + + /* loop to set ports to each server_mums */ + while (!((params->port)[port])) { + port = (port + 1) % UNIX_TCP_PORT_MAX; + } + server_unit->port = htons(port++); server_unit->pktlen = params->pktlen; - server_unit->domain = params->domain; + + server_unit->protocol_type_mode = program_get_protocol_mode_by_domain_ip(params->domain, params->ip, + params->ipv6, params->groupip); + + // Create multicast sockets only on the first thread + if (i != 0) { + server_unit->protocol_type_mode = setbitnum_off(server_unit->protocol_type_mode, UDP_MULTICAST); + } server_unit->api = params->api; server_unit->debug = params->debug; server_unit->epollcreate = params->epollcreate; server_unit->accept = params->accept; + server_unit->tcp_keepalive_idle = params->tcp_keepalive_idle; + server_unit->tcp_keepalive_interval = params->tcp_keepalive_interval; server_unit->next = (struct ServerMumUnit *)malloc(sizeof(struct ServerMumUnit)); if (server_unit->next) { memset_s(server_unit->next, sizeof(struct ServerMumUnit), 0, sizeof(struct ServerMumUnit)); @@ -599,8 +863,14 @@ int32_t sermum_create_and_run(struct ProgramParams *params) if (server_mum->debug == false) { printf("[program informations]: \n\n"); } - while (true) { - sermum_info_print(server_mum); + + if (strcmp(params->as, "server") == 0) { + while (true) { + sermum_info_print(server_mum); + } + } else if (strcmp(params->as, "loop") == 0) { + loopmod.model = params->model; + loopmod.server_mum_info = server_mum; } pthread_mutex_destroy(&server_debug_mutex); diff --git a/examples/src/utilities.c b/examples/src/utilities.c index 7247b44..59d8bea 100644 --- a/examples/src/utilities.c +++ b/examples/src/utilities.c @@ -11,35 +11,215 @@ */ -#include "utilities.h" +#include "parameter.h" +int32_t set_tcp_keep_alive_info(int32_t sockfd, int32_t tcp_keepalive_idle, int32_t tcp_keepalive_interval) +{ + int32_t ret = 0; + int32_t keep_alive = 1; + int32_t keep_idle = 1; + int32_t keep_interval = 1; + + if ((tcp_keepalive_idle == PARAM_DEFAULT_KEEPALIVEIDLE) || + (tcp_keepalive_interval == PARAM_DEFAULT_KEEPALIVEIDLE)) { + return 0; + } + + keep_idle = tcp_keepalive_idle; + ret = setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&keep_alive, sizeof(keep_alive)); + if (ret != 0) { + PRINT_ERROR("setsockopt keep_alive err ret=%d \n", ret); + return ret; + } + + ret = setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (void *)&keep_idle, sizeof(keep_idle)); + if (ret != 0) { + PRINT_ERROR("setsockopt keep_idle err ret=%d \n", ret); + return ret; + } + + keep_interval = tcp_keepalive_interval; + ret = setsockopt(sockfd, SOL_TCP, TCP_KEEPINTVL, (void *)&keep_interval, sizeof(keep_interval)); + if (ret != 0) { + PRINT_ERROR("setsockopt keep_interval err ret=%d \n", ret); + return ret; + } + return ret; +} + +static int32_t process_unix_fd(int32_t *socket_fd, int32_t *listen_fd_array) +{ + struct sockaddr_un socket_addr; + int32_t fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd < 0) { + PRINT_ERROR("can't create socket %d! ", errno); + return PROGRAM_FAULT; + } + *socket_fd = fd; + + unlink(SOCKET_UNIX_DOMAIN_FILE); + socket_addr.sun_family = AF_UNIX; + strcpy_s(socket_addr.sun_path, sizeof(socket_addr.sun_path), SOCKET_UNIX_DOMAIN_FILE); + if (bind(*socket_fd, (struct sockaddr *)&socket_addr, sizeof(struct sockaddr_un)) < 0) { + PRINT_ERROR("can't bind the address to socket %d! ", errno); + return PROGRAM_FAULT; + } + + if (listen(*socket_fd, SERVER_SOCKET_LISTEN_BACKLOG) < 0) { + PRINT_ERROR("server socket can't lisiten %d! ", errno); + return PROGRAM_FAULT; + } + return PROGRAM_OK; +} + +static int32_t process_udp_groupip(int32_t fd, ip_addr_t *ip, ip_addr_t *groupip, sockaddr_t *socker_add_info, + ip_addr_t *groupip_interface) +{ + struct ip_mreq mreq; + if (groupip->u_addr.ip4.s_addr) { + mreq.imr_multiaddr = groupip->u_addr.ip4; + if (groupip_interface->u_addr.ip4.s_addr) { + mreq.imr_interface = groupip_interface->u_addr.ip4; + } else { + mreq.imr_interface = ip->u_addr.ip4; + } + + if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(struct ip_mreq)) == -1) { + PRINT_ERROR("can't set the address to group %d! ", errno); + return PROGRAM_FAULT; + } + ((struct sockaddr_in *)socker_add_info)->sin_addr = groupip->u_addr.ip4; + return PROGRAM_OK; + } + return PROGRAM_OK; +} + +static int32_t server_create_sock(uint8_t protocol_mode, int32_t* fd_arry) +{ + bool ret = true; + for (int32_t i = 0; i < PROTOCOL_MODE_MAX; i++) { + if (getbit_num(protocol_mode, i) == 0) + continue; + if (i == V4_TCP) { + fd_arry[i] = socket(AF_INET, SOCK_STREAM, 0); + } else if (i == V6_TCP) { + fd_arry[i] = socket(AF_INET6, SOCK_STREAM, 0); + } else if (i == V4_UDP) { + fd_arry[i] = socket(AF_INET, SOCK_DGRAM, 0); + } else if (i == UDP_MULTICAST) { + fd_arry[i] = socket(AF_INET, SOCK_DGRAM, 0); + } else { + continue; + } + if (fd_arry[i] < 0) { + PRINT_ERROR("can't create socket type=%d errno=%d! ", i, errno); + ret = false; + break; + } + } + + if (ret == false) { + for (int32_t i = 0; i< PROTOCOL_MODE_MAX; i++) { + if (fd_arry[i] > 0) { + close(fd_arry[i]); + } + } + return PROGRAM_FAULT; + } + return PROGRAM_OK; +} + +static int32_t socket_add_info_init(int32_t idx, uint16_t port, struct ServerIpInfo *server_ip_info, + sockaddr_t *socker_add_info, int32_t *listen_fd_array) +{ + ip_addr_t *ip = &(server_ip_info->ip); + ip_addr_t *groupip = &(server_ip_info->groupip); + ip_addr_t *groupip_interface = &(server_ip_info->groupip_interface); + + uint32_t len = ((idx == V4_TCP || idx == V4_UDP || idx == UDP_MULTICAST) ? + sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)); + memset_s(socker_add_info, len, 0, len); + + if (idx == V4_TCP || idx == V4_UDP) { + ((struct sockaddr_in *)socker_add_info)->sin_addr = ip->u_addr.ip4; + } else if (idx == V6_TCP) { + ((struct sockaddr_in6 *)socker_add_info)->sin6_addr = ip->u_addr.ip6; + } else if (idx == UDP_MULTICAST) { + if (process_udp_groupip(listen_fd_array[idx], ip, groupip, socker_add_info, groupip_interface) != PROGRAM_OK) { + return PROGRAM_FAULT; + } + } + + ((struct sockaddr *)socker_add_info)->sa_family = ((idx == V4_TCP || idx == V4_UDP || idx == UDP_MULTICAST) ? + AF_INET : AF_INET6); + ((struct sockaddr_in *)socker_add_info)->sin_port = port; + return PROGRAM_OK; +} // create the socket and listen -int32_t create_socket_and_listen(int32_t *socket_fd, in_addr_t ip, in_addr_t groupip, uint16_t port, const char *domain) +int32_t create_socket_and_listen(int32_t *listen_fd_array, struct ServerIpInfo *server_ip_info, + uint16_t port, uint8_t protocol_mode) { - if (strcmp(domain, "tcp") == 0) { - *socket_fd = socket(AF_INET, SOCK_STREAM, 0); - if (*socket_fd < 0) { - PRINT_ERROR("can't create socket %d! ", errno); + int32_t port_multi = 1; + uint32_t len = 0; + sockaddr_t socker_add_info; + + if (getbit_num(protocol_mode, UNIX) == 1) { + if (process_unix_fd(&listen_fd_array[UNIX], listen_fd_array) != PROGRAM_OK) { return PROGRAM_FAULT; } - } else if (strcmp(domain, "unix") == 0) { - *socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); - if (*socket_fd < 0) { - PRINT_ERROR("can't create socket %d! ", errno); + return PROGRAM_OK; + } + + if (server_create_sock(protocol_mode, listen_fd_array) != PROGRAM_OK) { + return PROGRAM_FAULT; + } + + for (int32_t i = 0;i< PROTOCOL_MODE_MAX; i++) { + if (listen_fd_array[i] <= 0) + continue; + if (setsockopt(listen_fd_array[i], SOL_SOCKET, SO_REUSEPORT, (void *)&port_multi, sizeof(int32_t)) < 0) { + PRINT_ERROR("can't set the option of socket %d! ", errno); return PROGRAM_FAULT; } - } else if (strcmp(domain, "udp") == 0) { - *socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - if (*socket_fd < 0) { - PRINT_ERROR("can't create socket %d! ", errno); + if (set_socket_unblock(listen_fd_array[i]) < 0) { + PRINT_ERROR("can't set the socket to unblock! "); + return PROGRAM_FAULT; + } + if (socket_add_info_init(i, port, server_ip_info, &socker_add_info, listen_fd_array) != PROGRAM_OK) { + return PROGRAM_FAULT; + } + + len = ((i == V4_TCP || i == V4_UDP || i == UDP_MULTICAST) ? + sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)); + + if (bind(listen_fd_array[i], (struct sockaddr *)&socker_add_info, len) < 0) { + PRINT_ERROR("can't bind the address %d!, i=%d, listen_fd_array[i]=%d ", errno, i, listen_fd_array[i]); return PROGRAM_FAULT; } + + if (i == V4_TCP || i == V6_TCP) { + if (listen(listen_fd_array[i], SERVER_SOCKET_LISTEN_BACKLOG) < 0) { + PRINT_ERROR("server socket can't lisiten %d! ", errno); + return PROGRAM_FAULT; + } + } } + return PROGRAM_OK; +} - int32_t port_multi = 1; - if (setsockopt(*socket_fd, SOL_SOCKET, SO_REUSEPORT, (void *)&port_multi, sizeof(int32_t)) < 0) { - PRINT_ERROR("can't set the option of socket %d! ", errno); +static int32_t creat_socket_init(int32_t *socket_fd, struct ClientUnit *client_unit, sockaddr_t *server_addr) +{ + ip_addr_t *ip = &client_unit->ip; + const char *domain = client_unit->domain; + + if (strcmp(domain, "tcp") == 0) { + *socket_fd = socket(ip->addr_family, SOCK_STREAM, 0); + } else { + *socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + } + if (*socket_fd < 0) { + PRINT_ERROR("client can't create socket %d! ", errno); return PROGRAM_FAULT; } @@ -48,106 +228,118 @@ int32_t create_socket_and_listen(int32_t *socket_fd, in_addr_t ip, in_addr_t gro return PROGRAM_FAULT; } - if (strcmp(domain, "tcp") == 0) { - struct sockaddr_in socket_addr; - memset_s(&socket_addr, sizeof(socket_addr), 0, sizeof(socket_addr)); - socket_addr.sin_family = AF_INET; - socket_addr.sin_addr.s_addr = ip; - socket_addr.sin_port = port; - if (bind(*socket_fd, (struct sockaddr *)&socket_addr, sizeof(struct sockaddr_in)) < 0) { - PRINT_ERROR("can't bind the address to socket %d! ", errno); - return PROGRAM_FAULT; - } + ((struct sockaddr *)server_addr)->sa_family = ip->addr_family; - if (listen(*socket_fd, SERVER_SOCKET_LISTEN_BACKLOG) < 0) { - PRINT_ERROR("server socket can't lisiten %d! ", errno); - return PROGRAM_FAULT; + return PROGRAM_OK; +} + +static int32_t pocess_connect_sport(int32_t *socket_fd, struct ClientUnit *client_unit, sockaddr_t *server_addr) +{ + uint16_t sport = client_unit->sport; + ip_addr_t *ip = &client_unit->ip; + uint32_t addr_len = ip->addr_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); + + if (sport) { + if (ip->addr_family == AF_INET) { + ((struct sockaddr_in *)server_addr)->sin_addr.s_addr = htonl(INADDR_ANY); + } else if (ip->addr_family == AF_INET6) { + ((struct sockaddr_in6 *)server_addr)->sin6_addr = in6addr_any; } - } else if (strcmp(domain, "unix") == 0) { - struct sockaddr_un socket_addr; - unlink(SOCKET_UNIX_DOMAIN_FILE); - socket_addr.sun_family = AF_UNIX; - strcpy_s(socket_addr.sun_path, sizeof(socket_addr.sun_path), SOCKET_UNIX_DOMAIN_FILE); - if (bind(*socket_fd, (struct sockaddr *)&socket_addr, sizeof(struct sockaddr_un)) < 0) { + ((struct sockaddr_in *)server_addr)->sin_port = sport; + if (bind(*socket_fd, (struct sockaddr *)server_addr, addr_len) < 0) { PRINT_ERROR("can't bind the address to socket %d! ", errno); return PROGRAM_FAULT; } + } + return PROGRAM_OK; +} - if (listen(*socket_fd, SERVER_SOCKET_LISTEN_BACKLOG) < 0) { - PRINT_ERROR("server socket can't lisiten %d! ", errno); +static int32_t pocess_unix_create_connect(int32_t *socket_fd) +{ + *socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (*socket_fd < 0) { + PRINT_ERROR("client can't create socket %d! ", errno); + return PROGRAM_FAULT; + } + + struct sockaddr_un server_addr; + server_addr.sun_family = AF_UNIX; + strcpy_s(server_addr.sun_path, sizeof(server_addr.sun_path), SOCKET_UNIX_DOMAIN_FILE); + if (connect(*socket_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_un)) < 0) { + if (errno == EINPROGRESS) { + return PROGRAM_INPROGRESS; + } else { + PRINT_ERROR("client can't connect to the server %d! ", errno); return PROGRAM_FAULT; } - } else if (strcmp(domain, "udp") == 0) { - struct sockaddr_in socket_addr; - memset_s(&socket_addr, sizeof(socket_addr), 0, sizeof(socket_addr)); - socket_addr.sin_family = AF_INET; - socket_addr.sin_port = port; - - if (groupip) { - struct ip_mreq mreq; - mreq.imr_multiaddr.s_addr = groupip; - mreq.imr_interface.s_addr = ip; - if (setsockopt(*socket_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(struct ip_mreq)) == -1) { - PRINT_ERROR("can't set the address to group %d! ", errno); - return PROGRAM_FAULT;; + } + return PROGRAM_OK; +} + +static int32_t pocess_udp_multicast(int32_t *socket_fd, struct ClientUnit *client_unit, sockaddr_t *server_addr) +{ + const uint32_t loop = client_unit->loop; + ip_addr_t *groupip = &client_unit->groupip; + if (client_unit->protocol_type_mode == UDP_MULTICAST) { + /* set the local device for a multicast socket */ + ((struct sockaddr_in *)server_addr)->sin_addr = groupip->u_addr.ip4; + + struct in_addr localInterface; + localInterface.s_addr = client_unit->groupip_interface.u_addr.ip4.s_addr; + if (localInterface.s_addr) { + if (setsockopt(*socket_fd, IPPROTO_IP, IP_MULTICAST_IF, (char *)&localInterface, + sizeof(localInterface)) < 0) { + PRINT_ERROR("can't set the multicast interface %d! ", errno); + return PROGRAM_FAULT; } - socket_addr.sin_addr.s_addr = groupip; - } else { - socket_addr.sin_addr.s_addr = ip; } - if (bind(*socket_fd, (struct sockaddr *)&socket_addr, sizeof(struct sockaddr_in)) < 0) { - PRINT_ERROR("can't bind the address to socket %d! ", errno); + /* sent multicast packets should be looped back to the local socket */ + if (setsockopt(*socket_fd, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) == -1) { + PRINT_ERROR("can't set the multicast loop %d! ", errno); return PROGRAM_FAULT; - } + } } - return PROGRAM_OK; } // create the socket and connect -int32_t create_socket_and_connect(int32_t *socket_fd, in_addr_t ip, in_addr_t groupip, uint16_t port, uint16_t sport, const char *domain, const char *api) +int32_t create_socket_and_connect(int32_t *socket_fd, struct ClientUnit *client_unit) { + ip_addr_t *ip = &client_unit->ip; + const char *domain = client_unit->domain; + const char *api = client_unit->api; + + sockaddr_t server_addr; + if (strcmp(domain, "tcp") == 0 || strcmp(domain, "udp") == 0) { - if (strcmp(domain, "tcp") == 0) { - *socket_fd = socket(AF_INET, SOCK_STREAM, 0); - } else { - *socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - } - if (*socket_fd < 0) { - PRINT_ERROR("client can't create socket %d! ", errno); + uint32_t addr_len = ip->addr_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); + memset_s(&server_addr, addr_len, 0, addr_len); + + if (creat_socket_init(socket_fd, client_unit, &server_addr) != PROGRAM_OK) { return PROGRAM_FAULT; } - if (set_socket_unblock(*socket_fd) < 0) { - PRINT_ERROR("can't set the socket to unblock! "); + if (pocess_connect_sport(socket_fd, client_unit, &server_addr) < 0) { return PROGRAM_FAULT; } - struct sockaddr_in server_addr; - memset_s(&server_addr, sizeof(server_addr), 0, sizeof(server_addr)); - server_addr.sin_family = AF_INET; - if (sport) { - server_addr.sin_addr.s_addr = htonl(INADDR_ANY); - server_addr.sin_port = sport; - if (bind(*socket_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in)) < 0) { - PRINT_ERROR("can't bind the address to socket %d! ", errno); - return PROGRAM_FAULT; - } + if (ip->addr_family == AF_INET) { + ((struct sockaddr_in *)&server_addr)->sin_addr = ip->u_addr.ip4; + } else if (ip->addr_family == AF_INET6) { + ((struct sockaddr_in6 *)&server_addr)->sin6_addr = ip->u_addr.ip6; } - server_addr.sin_addr.s_addr = ip; - server_addr.sin_port = port; + ((struct sockaddr_in *)&server_addr)->sin_port = client_unit->port; + if (strcmp(domain, "udp") == 0) { - if (groupip) { - server_addr.sin_addr.s_addr = groupip; - if (setsockopt(*socket_fd, IPPROTO_IP, IP_MULTICAST_IF, &ip, sizeof(ip)) != 0) { - PRINT_ERROR("can't set the multicast interface %d! ", errno); - return PROGRAM_FAULT; - } + int32_t ret = pocess_udp_multicast(socket_fd, client_unit, &server_addr); + if (ret != PROGRAM_OK) { + return ret; } } + if (strcmp(domain, "udp") != 0 || strcmp(api, "recvfromsendto") != 0) { - if (connect(*socket_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in)) < 0) { + if (connect(*socket_fd, (struct sockaddr *)&server_addr, addr_len) < 0) { if (errno == EINPROGRESS) { return PROGRAM_INPROGRESS; } else { @@ -157,25 +349,11 @@ int32_t create_socket_and_connect(int32_t *socket_fd, in_addr_t ip, in_addr_t gr } } } else if (strcmp(domain, "unix") == 0) { - *socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); - if (*socket_fd < 0) { - PRINT_ERROR("client can't create socket %d! ", errno); - return PROGRAM_FAULT; - } - - struct sockaddr_un server_addr; - server_addr.sun_family = AF_UNIX; - strcpy_s(server_addr.sun_path, sizeof(server_addr.sun_path), SOCKET_UNIX_DOMAIN_FILE); - if (connect(*socket_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_un)) < 0) { - if (errno == EINPROGRESS) { - return PROGRAM_INPROGRESS; - } else { - PRINT_ERROR("client can't connect to the server %d! ", errno); - return PROGRAM_FAULT; - } + int32_t ret = pocess_unix_create_connect(socket_fd); + if (ret != PROGRAM_OK) { + return ret; } } - return PROGRAM_OK; } diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c index 3e6eeef..e272a04 100644 --- a/src/lstack/core/lstack_protocol_stack.c +++ b/src/lstack/core/lstack_protocol_stack.c @@ -655,7 +655,6 @@ int32_t stack_setup_thread(void) goto OUT1; } } - for (uint32_t i = 0; i < queue_num; i++) { if (get_global_cfg_params()->seperate_send_recv) { if (i % 2 == 0) { @@ -694,6 +693,7 @@ int32_t stack_setup_thread(void) g_stack_group.stack_num = queue_num; return 0; + OUT1: for (int32_t i = 0; i < queue_num; ++i) { if (t_params[i] != NULL) { -- 2.33.0