From b3733e9fe70d9ac101a2ac79ee4568942a3dd874 Mon Sep 17 00:00:00 2001 From: chenxin Date: Wed, 4 Sep 2019 15:52:20 +0800 Subject: [PATCH 2/4] evhtp: enable dynamic thread pool Signed-off-by: tanyifeng Signed-off-by: chenxin Conflicts: thread.c Change-Id: If3f154de82448d002f0e3c90efaaf73394d6734b --- evhtp.c | 47 ++++++ include/evhtp/evhtp.h | 22 +++ include/evhtp/thread.h | 6 + thread.c | 415 ++++++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 487 insertions(+), 3 deletions(-) diff --git a/evhtp.c b/evhtp.c index 8d34676..2ecb619 100644 --- a/evhtp.c +++ b/evhtp.c @@ -2824,6 +2824,11 @@ htp__accept_cb_(struct evconnlistener * serv, int fd, struct sockaddr * s, int s log_debug("fd = %d, conn = %p", fd, connection); + if (evhtp_unlikely(sl <= 0)) { + evhtp_safe_free(connection, evhtp_connection_free); + return; + } + connection->saddr = htp__malloc_(sl); if (evhtp_unlikely(connection->saddr == NULL)) { @@ -4590,6 +4595,37 @@ htp__use_threads_(evhtp_t * htp, return 0; } +static int +htp__use_dynamic_threads_(evhtp_t * htp, + evhtp_thread_init_cb init_cb, + evhtp_thread_exit_cb exit_cb, + int nthreads_keep, int nthreads_limit, + int nrequest_limit, void * arg) +{ + if (htp == NULL) + { + return -1; + } + + htp->thread_cbarg = arg; + htp->thread_init_cb = init_cb; + htp->thread_exit_cb = exit_cb; + +#ifndef EVHTP_DISABLE_SSL + evhtp_ssl_use_threads(); +#endif + + if (!(htp->thr_pool = evthr_pool_dynamic_wexit_new(nthreads_keep, + nthreads_limit, nrequest_limit, + htp__thread_init_, + htp__thread_exit_, htp))) + { + return -1; + } + + return 0; +} + int evhtp_use_threads(evhtp_t * htp, evhtp_thread_init_cb init_cb, int nthreads, void * arg) @@ -4606,6 +4642,17 @@ evhtp_use_threads_wexit(evhtp_t * htp, return htp__use_threads_(htp, init_cb, exit_cb, nthreads, arg); } +int +evhtp_use_dynamic_threads(evhtp_t * htp, + evhtp_thread_init_cb init_cb, + evhtp_thread_exit_cb exit_cb, + int nthreads_keep, int nthreads_limit, + int nrequest_limit, void * arg) +{ + return htp__use_dynamic_threads_(htp, init_cb, exit_cb, + nthreads_keep, nthreads_limit, nrequest_limit, arg); +} + #endif #ifndef EVHTP_DISABLE_EVTHR diff --git a/include/evhtp/evhtp.h b/include/evhtp/evhtp.h index 9bcee44..fbe79ba 100644 --- a/include/evhtp/evhtp.h +++ b/include/evhtp/evhtp.h @@ -871,6 +871,28 @@ EVHTP_EXPORT int evhtp_use_threads_wexit(evhtp_t *, int nthreads, void * arg); /** + * @brief Enable dynamic thread-pool support for an evhtp_t context. Every connection is + * distributed to a thread. An optional "on-start" callback can + * be set which allows you to manipulate the thread-specific inforation + * (such as the thread-specific event_base). + * + * @param htp + * @param init_cb + * @param exit_cb + * @param nthreads_keep - number of threads to keep alive in background + * @param nthreads_limit - number of threads limit to create in background + * @param nrequest_limit - number of concurrent request limit to handle + * @param arg + * + * @return + */ +EVHTP_EXPORT int evhtp_use_dynamic_threads(evhtp_t * htp, + evhtp_thread_init_cb init_cb, + evhtp_thread_exit_cb exit_cb, + int nthreads_keep, int nthreads_limit, + int nrequest_limit, void * arg); + +/** * @brief generates all the right information for a reply to be sent to the client * * @param request diff --git a/include/evhtp/thread.h b/include/evhtp/thread.h index 7479aa8..61058a1 100644 --- a/include/evhtp/thread.h +++ b/include/evhtp/thread.h @@ -21,12 +21,14 @@ enum evthr_res { EVTHR_RES_FATAL }; +struct evthr_pool_cb; struct evthr_pool; struct evthr; typedef struct event_base evbase_t; typedef struct event ev_t; +typedef struct evthr_pool_cb evthr_pool_cb_t; typedef struct evthr_pool evthr_pool_t; typedef struct evthr evthr_t; typedef enum evthr_res evthr_res; @@ -49,13 +51,17 @@ EVHTP_EXPORT void evthr_free(evthr_t * evthr); EVHTP_EXPORT evthr_pool_t * evthr_pool_new(int nthreads, evthr_init_cb, void *) DEPRECATED("will take on the syntax of evthr_pool_wexit_new"); +EVHTP_EXPORT evthr_res evthr_pool_dynamic_add(evthr_pool_t * pool, evthr_cb cb, void * arg); EVHTP_EXPORT int evthr_pool_start(evthr_pool_t * pool); EVHTP_EXPORT evthr_res evthr_pool_stop(evthr_pool_t * pool); EVHTP_EXPORT evthr_res evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg); +EVHTP_EXPORT evthr_res evthr_pool_callback_defer(evthr_t * thr, evthr_cb cb, void * arg); EVHTP_EXPORT void evthr_pool_free(evthr_pool_t * pool); EVHTP_EXPORT evthr_t * evthr_wexit_new(evthr_init_cb, evthr_exit_cb, void * shared); EVHTP_EXPORT evthr_pool_t * evthr_pool_wexit_new(int nthreads, evthr_init_cb, evthr_exit_cb, void *); +EVHTP_EXPORT evthr_pool_t * evthr_pool_dynamic_wexit_new(int nthreads_keep, int nthreads_limit, int nrequest_limit, + evthr_init_cb, evthr_exit_cb, void *); #ifdef __cplusplus } diff --git a/thread.c b/thread.c index 96065df..1570b69 100644 --- a/thread.c +++ b/thread.c @@ -2,6 +2,7 @@ #include #include #include +#include #include #ifndef WIN32 #include @@ -19,6 +20,7 @@ typedef struct evthr_cmd evthr_cmd_t; typedef struct evthr_pool_slist evthr_pool_slist_t; +typedef struct evthr_pool_cb_slist evthr_pool_cb_slist_t; struct evthr_cmd { uint8_t stop; @@ -27,6 +29,13 @@ struct evthr_cmd { } __attribute__((packed)); TAILQ_HEAD(evthr_pool_slist, evthr); +TAILQ_HEAD(evthr_pool_cb_slist, evthr_pool_cb); + +struct evthr_pool_cb { + evthr_cb func; + void *arg; + TAILQ_ENTRY(evthr_pool_cb) next; +}; struct evthr_pool { #ifdef EVTHR_SHARED_PIPE @@ -35,6 +44,28 @@ struct evthr_pool { #endif int nthreads; evthr_pool_slist_t threads; + + //definition for dynamic threads pool + bool dynamic; + evthr_init_cb init_cb; + evthr_init_cb exit_cb; + void * shared; + int nthreads_keep; /* Maximum number of keep alive threads in this pool */ + int nthreads_limit; /* Maximum number of limit threads in this pool */ + int nthreads_wait; /* Number of waiting threads */ + int nthreads_now; /* Number of threads created */ + int nrequest_limit; /* Maximum number of concurrent request in this pool */ + int nrequest; /* Number of threads created */ + int shutdown; /* Flag to mark shutting down */ + pthread_t * clear_thr; + pthread_mutex_t lock; + pthread_cond_t wait_cv; + /* Condition variable for thread to wait new callback*/ + pthread_cond_t shutdown_cv; + /* Condition variable for thread to wait shutdown*/ + evthr_pool_slist_t wait_threads; + evthr_pool_slist_t dead_threads; + evthr_pool_cb_slist_t callbacks; }; struct evthr { @@ -54,6 +85,7 @@ struct evthr { int pool_rdr; struct event * shared_pool_ev; #endif + evthr_pool_t * pool; TAILQ_ENTRY(evthr) next; }; @@ -94,7 +126,7 @@ _evthr_loop(void * args) evthr_t * thread; if (!(thread = (evthr_t *)args)) { - return NULL; + pthread_exit(NULL); } if (thread == NULL || thread->thr == NULL) { @@ -139,6 +171,112 @@ _evthr_loop(void * args) pthread_exit(NULL); } /* _evthr_loop */ +static void * +_evthr_dynamic_loop(void * args) { + evthr_t * thread; + evthr_pool_t * pool; + int ret; + + if (!(thread = (evthr_t *)args)) { + pthread_exit(NULL); + } + + if (thread == NULL || thread->thr == NULL) { + pthread_exit(NULL); + } + + pool = thread->pool; + + thread->evbase = event_base_new(); + thread->event = event_new(thread->evbase, thread->rdr, + EV_READ | EV_PERSIST, _evthr_read_cmd, args); + + event_add(thread->event, NULL); + + ret = pthread_mutex_lock(&thread->lock); + if (ret < 0) { + pthread_exit(NULL); + } + if (thread->init_cb != NULL) { + (thread->init_cb)(thread, thread->arg); + } + pthread_mutex_unlock(&thread->lock); + + ret = pthread_mutex_lock(&pool->lock); + if (ret < 0) { + pthread_exit(NULL); + } + + for (;;) { + int retval = 0; + int cnt = 0; + + if (pool->shutdown == 0 && pool->callbacks.tqh_first == NULL) { + if (pool->nthreads_wait >= pool->nthreads_keep) { + break; + } + pool->nthreads_wait++; + TAILQ_INSERT_TAIL(&pool->wait_threads, thread, next); + //fprintf(stderr, "wait %p\n", thread); + pthread_cond_wait(&pool->wait_cv, &pool->lock); + pool->nthreads_wait--; + } + + if (pool->shutdown == 0 && pool->callbacks.tqh_first != NULL) { + evthr_pool_cb_t *cb = pool->callbacks.tqh_first; + + TAILQ_REMOVE(&pool->callbacks, pool->callbacks.tqh_first, next); + pool->nrequest--; + pthread_mutex_unlock(&pool->lock); + + //fprintf(stderr, "send cb %p\n", cb); + evthr_pool_callback_defer(thread, cb->func, cb->arg); + free(cb); + + //fprintf(stderr, "exec %p\n", thread); + while(!retval && cnt != 1) { + retval = event_base_loop(thread->evbase, EVLOOP_ONCE); + //cnt++; + cnt = event_base_get_num_events(thread->evbase, EVENT_BASE_COUNT_ADDED); + //fprintf(stderr, "thread:%p, retval:%d, cnt:%d\n", thread, retval, cnt); + } + + ret = pthread_mutex_lock(&pool->lock); + if (ret < 0) { + pthread_exit(NULL); + } + } + + if (pool->shutdown) { + break; + } + } + + pool->nthreads_now--; + + ret = pthread_mutex_lock(&thread->lock); + if (ret <0 ) { + goto skip_exit_cb; + } + if (thread->exit_cb != NULL) { + (thread->exit_cb)(thread, thread->arg); + } + pthread_mutex_unlock(&thread->lock); + +skip_exit_cb: + if (thread->err == 1) { + fprintf(stderr, "FATAL ERROR!\n"); + } + + /* Add to list of dead threads for memory free */ + TAILQ_INSERT_TAIL(&pool->dead_threads, thread, next); + + pthread_mutex_unlock(&pool->lock); + //fprintf(stderr, "thread exit:%p\n", thread); + + pthread_exit(NULL); +} /* _evthr_loop */ + evthr_res evthr_defer(evthr_t * thread, evthr_cb cb, void * arg) { @@ -275,6 +413,28 @@ evthr_start(evthr_t * thread) return 0; } +int +evthr_dynamic_start(evthr_t * thread) { + pthread_attr_t attr; + + if (thread == NULL || thread->thr == NULL) { + return -1; + } + + if (pthread_attr_init(&attr)) + return -1; + if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)) + return -1; + + if (pthread_create(thread->thr, &attr, _evthr_dynamic_loop, (void *)thread)) { + pthread_attr_destroy(&attr); + return -1; + } + pthread_attr_destroy(&attr); + + return 0; +} + void evthr_free(evthr_t * thread) { @@ -284,10 +444,12 @@ evthr_free(evthr_t * thread) if (thread->rdr > 0) { close(thread->rdr); + thread->rdr = -1; } if (thread->wdr > 0) { close(thread->wdr); + thread->wdr = -1; } if (thread->thr) { @@ -309,6 +471,8 @@ evthr_free(evthr_t * thread) event_base_free(thread->evbase); } + pthread_mutex_destroy(&thread->lock); + free(thread); } /* evthr_free */ @@ -316,25 +480,46 @@ void evthr_pool_free(evthr_pool_t * pool) { evthr_t * thread; - evthr_t * save; + evthr_t * thread_save; + evthr_pool_cb_t * callback; + evthr_pool_cb_t * callback_save; if (pool == NULL) { return; } - TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) { + TAILQ_FOREACH_SAFE(thread, &pool->threads, next, thread_save) { TAILQ_REMOVE(&pool->threads, thread, next); evthr_free(thread); } + if (pool->dynamic) { + if (pool->clear_thr) + free(pool->clear_thr); + + TAILQ_FOREACH_SAFE(thread, &pool->dead_threads, next, thread_save) { + TAILQ_REMOVE(&pool->dead_threads, thread, next); + evthr_free(thread); + } + + TAILQ_FOREACH_SAFE(callback, &pool->callbacks, next, callback_save) { + TAILQ_REMOVE(&pool->callbacks, callback, next); + free(callback); + } + + pthread_mutex_destroy(&pool->lock); + } + #ifdef EVTHR_SHARED_PIPE if (pool->rdr > 0) { close(pool->rdr); + pool->rdr = -1; } if (pool->wdr > 0) { close(pool->wdr); + pool->wdr = -1; } #endif @@ -355,6 +540,31 @@ evthr_pool_stop(evthr_pool_t * pool) evthr_stop(thr); } + if (pool->dynamic) { + if (pthread_mutex_lock(&pool->lock) < 0) { + return EVTHR_RES_FATAL; + } + pool->shutdown = 1; + pthread_cond_broadcast(&pool->wait_cv); + pthread_cond_broadcast(&pool->shutdown_cv); + pthread_mutex_unlock(&pool->lock); + +wait_for_exit: + usleep(100 * 1000); + if (pthread_mutex_lock(&pool->lock) < 0) { + return EVTHR_RES_FATAL; + } + if (pool->nthreads_now != 0) { + pthread_mutex_unlock(&pool->lock); + goto wait_for_exit; + } + pthread_mutex_unlock(&pool->lock); + + if (pool->clear_thr) { + pthread_join(*pool->clear_thr, NULL); + } + } + return EVTHR_RES_OK; } @@ -378,6 +588,10 @@ evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg) .stop = 0 }; + if (pool->dynamic) { + return evthr_pool_dynamic_add(pool, cb, arg); + } + if (evhtp_unlikely(send(pool->wdr, &cmd, sizeof(cmd), 0) == -1)) { return EVTHR_RES_RETRY; } @@ -396,6 +610,9 @@ evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg) return EVTHR_RES_NOCB; } + if (pool->dynamic) { + return evthr_pool_dynamic_add(pool, cb, arg); + } TAILQ_FOREACH(thread, &pool->threads, next) { int backlog = get_backlog_(thread); @@ -414,6 +631,66 @@ evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg) return evthr_defer(min_thread, cb, arg); } /* evthr_pool_defer */ +evthr_res +evthr_pool_callback_defer(evthr_t * thr, evthr_cb cb, void * arg) { + if (thr == NULL) { + return EVTHR_RES_FATAL; + } + + if (cb == NULL) { + return EVTHR_RES_NOCB; + } + + return evthr_defer(thr, cb, arg); +} /* evthr_pool_callback_defer */ + +static void clear_dead_threads(evthr_pool_t *pool) +{ + evthr_t * thread; + evthr_t * save; + + TAILQ_FOREACH_SAFE(thread, &pool->dead_threads, next, save) { + TAILQ_REMOVE(&pool->dead_threads, thread, next); + evthr_free(thread); + } +} + +static void * +_evthr_pool_time_clear_dead_threads(void * args) +{ +#define __CLEAR_CYCLE_TIME 60 + struct timespec ts; + evthr_pool_t * pool = (evthr_pool_t *)args; + + if (pool == NULL) { + pthread_exit(NULL); + } + + for(;;) { + if (pthread_mutex_lock(&pool->lock) < 0) { + sleep(1); + continue; + } + + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { + break; + } + ts.tv_sec += __CLEAR_CYCLE_TIME; + pthread_cond_timedwait(&pool->shutdown_cv, &pool->lock, &ts); + + clear_dead_threads(pool); + + if (pool->shutdown) { + pthread_mutex_unlock(&pool->lock); + break; + } + + pthread_mutex_unlock(&pool->lock); + } + //fprintf(stderr, "exit time clear thread\n"); + pthread_exit(NULL); +} + static evthr_pool_t * _evthr_pool_new(int nthreads, evthr_init_cb init_cb, @@ -469,6 +746,67 @@ _evthr_pool_new(int nthreads, } /* _evthr_pool_new */ evthr_pool_t * +_evthr_pool_dynamic_new(int nthreads_keep, + int nthreads_limit, int nrequest_limit, + evthr_init_cb init_cb, evthr_exit_cb exit_cb, + void * shared) +{ + evthr_pool_t * pool; + int i; + + if (!(pool = calloc(sizeof(evthr_pool_t), 1))) { + return NULL; + } + + pool->init_cb = init_cb; + pool->exit_cb = exit_cb; + pool->shared = shared; + pool->dynamic = true; + + pool->nthreads_keep = nthreads_keep; + if (nthreads_limit <= 0) { + pool->nthreads_limit = INT_MAX; + } else { + pool->nthreads_limit = nthreads_limit; + } + if (nrequest_limit <= 0) { + pool->nrequest_limit = INT_MAX; + } else { + pool->nrequest_limit = nrequest_limit; + } + + TAILQ_INIT(&pool->threads); + TAILQ_INIT(&pool->wait_threads); + TAILQ_INIT(&pool->dead_threads); + TAILQ_INIT(&pool->callbacks); + + if (pthread_mutex_init(&pool->lock, NULL)) { + evthr_pool_free(pool); + return NULL; + } + if (pthread_cond_init(&pool->wait_cv, NULL)) { + evthr_pool_free(pool); + return NULL; + } + if (pthread_cond_init(&pool->shutdown_cv, NULL)) { + evthr_pool_free(pool); + return NULL; + } + + pool->clear_thr = calloc(sizeof(pool->clear_thr), 1); + if (!pool->clear_thr) { + evthr_pool_free(pool); + return NULL; + } + if (pthread_create(pool->clear_thr, NULL, _evthr_pool_time_clear_dead_threads, (void *)pool)) { + evthr_pool_free(pool); + return NULL; + } + + return pool; +} + +evthr_pool_t * evthr_pool_new(int nthreads, evthr_init_cb init_cb, void * shared) { return _evthr_pool_new(nthreads, init_cb, NULL, shared); @@ -482,6 +820,15 @@ evthr_pool_wexit_new(int nthreads, return _evthr_pool_new(nthreads, init_cb, exit_cb, shared); } +evthr_pool_t * +evthr_pool_dynamic_wexit_new(int nthreads_keep, + int nthreads_limit, int nrequest_limit, + evthr_init_cb init_cb, evthr_exit_cb exit_cb, + void * shared) { + return _evthr_pool_dynamic_new(nthreads_keep, nthreads_limit, nrequest_limit, + init_cb, exit_cb, shared); +} + int evthr_pool_start(evthr_pool_t * pool) { @@ -501,3 +848,65 @@ evthr_pool_start(evthr_pool_t * pool) return 0; } + +evthr_res +evthr_pool_dynamic_add(evthr_pool_t * pool, evthr_cb cb, void * arg) { + evthr_pool_cb_t *callback; + evthr_t * thread; + + if (pool == NULL) { + return EVTHR_RES_FATAL; + } + + if (pthread_mutex_lock(&pool->lock) < 0) { + return EVTHR_RES_FATAL; + } + + if (pool->nrequest >= pool->nrequest_limit) { + pthread_mutex_unlock(&pool->lock); + log_debug("Too many request, limit is %d, refuse to handle.", pool->nrequest_limit); + return EVTHR_RES_FATAL; + } + + if (!(callback = calloc(sizeof(evthr_pool_cb_t), 1))) { + pthread_mutex_unlock(&pool->lock); + return EVTHR_RES_FATAL; + } + + callback->func = cb; + callback->arg = arg; + + TAILQ_INSERT_TAIL(&pool->callbacks, callback, next); + pool->nrequest++; + + if (pool->nthreads_wait == 0 && pool->nthreads_now < pool->nthreads_limit) { + if (!(thread = evthr_wexit_new(pool->init_cb, pool->exit_cb, pool->shared))) { + goto release_out; + } + + thread->pool = pool; + + if (evthr_dynamic_start(thread) < 0) { + goto release_out; + } + pool->nthreads_now++; + //fprintf(stderr, "create thread:%d,%p\n", pool->nthreads_now, thread); + } else { + pthread_cond_signal(&pool->wait_cv); + } + + clear_dead_threads(pool); + + pthread_mutex_unlock(&pool->lock); + return EVTHR_RES_OK; +release_out: + evthr_free(thread); + TAILQ_REMOVE(&pool->callbacks, callback, next); + pool->nrequest--; + callback->func = NULL; + callback->arg = NULL; + free(callback); + pthread_mutex_unlock(&pool->lock); + return EVTHR_RES_FATAL; +} + -- 1.8.3.1