thread: add a dynamic thread pool (evhtp_use_dynamic_threads)

Adds evthr_pool_dynamic_wexit_new() / evthr_pool_dynamic_add() and the evhtp
wrapper evhtp_use_dynamic_threads(). Workers are created on demand, up to
nthreads_limit; at most nthreads_keep idle workers stay parked on wait_cv,
and exited workers are freed from the pool's dead_threads list.

Notes on this copy of the patch:
 * The <...> header names on the unchanged "#include" context lines of
   thread.c could not be recovered and are left blank.
 * The "index" lines still carry the blob ids of the original patch.

diff --git a/evhtp.c b/evhtp.c
index 6eaf319..220c044 100644
--- a/evhtp.c
+++ b/evhtp.c
@@ -4595,6 +4595,42 @@ htp__use_threads_(evhtp_t * htp,
     return 0;
 }
 
+/**
+ * @brief store the thread callbacks on htp and create its dynamic thread pool
+ *
+ * @return 0 on success, -1 if htp is NULL or the pool could not be created
+ */
+static int
+htp__use_dynamic_threads_(evhtp_t * htp,
+                          evhtp_thread_init_cb init_cb,
+                          evhtp_thread_exit_cb exit_cb,
+                          int nthreads_keep, int nthreads_limit,
+                          int nrequest_limit, void * arg)
+{
+    if (htp == NULL)
+    {
+        return -1;
+    }
+
+    htp->thread_cbarg   = arg;
+    htp->thread_init_cb = init_cb;
+    htp->thread_exit_cb = exit_cb;
+
+#ifndef EVHTP_DISABLE_SSL
+    evhtp_ssl_use_threads();
+#endif
+
+    if (!(htp->thr_pool = evthr_pool_dynamic_wexit_new(nthreads_keep,
+                                                       nthreads_limit, nrequest_limit,
+                                                       htp__thread_init_,
+                                                       htp__thread_exit_, htp)))
+    {
+        return -1;
+    }
+
+    return 0;
+}
+
 int
 evhtp_use_threads(evhtp_t * htp, evhtp_thread_init_cb init_cb,
                   int nthreads, void * arg)
@@ -4611,6 +4647,17 @@ evhtp_use_threads_wexit(evhtp_t * htp,
     return htp__use_threads_(htp, init_cb, exit_cb, nthreads, arg);
 }
 
+int
+evhtp_use_dynamic_threads(evhtp_t * htp,
+                          evhtp_thread_init_cb init_cb,
+                          evhtp_thread_exit_cb exit_cb,
+                          int nthreads_keep, int nthreads_limit,
+                          int nrequest_limit, void * arg)
+{
+    return htp__use_dynamic_threads_(htp, init_cb, exit_cb,
+                                     nthreads_keep, nthreads_limit, nrequest_limit, arg);
+}
+
 #endif
 
 #ifndef EVHTP_DISABLE_EVTHR
diff --git a/include/evhtp/evhtp.h b/include/evhtp/evhtp.h
index 344f58c..8061bba 100644
--- a/include/evhtp/evhtp.h
+++ b/include/evhtp/evhtp.h
@@ -880,6 +880,28 @@ EVHTP_EXPORT int evhtp_use_threads_wexit(evhtp_t *,
     evhtp_thread_exit_cb,
     int nthreads, void * arg);
 
+/**
+ * @brief Enable dynamic thread-pool support for an evhtp_t context. Every connection is
+ *       distributed to a thread. An optional "on-start" callback can
+ *       be set which allows you to manipulate the thread-specific information
+ *       (such as the thread-specific event_base).
+ *
+ * @param htp
+ * @param init_cb
+ * @param exit_cb
+ * @param nthreads_keep - maximum number of idle threads kept alive waiting for work
+ * @param nthreads_limit - maximum number of threads to create (<= 0 means no limit)
+ * @param nrequest_limit - maximum number of queued requests (<= 0 means no limit)
+ * @param arg
+ *
+ * @return 0 on success, -1 on error
+ */
+EVHTP_EXPORT int evhtp_use_dynamic_threads(evhtp_t * htp,
+    evhtp_thread_init_cb init_cb,
+    evhtp_thread_exit_cb exit_cb,
+    int nthreads_keep, int nthreads_limit,
+    int nrequest_limit, void * arg);
+
 /**
  * @brief generates all the right information for a reply to be sent to the client
  *
diff --git a/include/evhtp/thread.h b/include/evhtp/thread.h
index 7479aa8..61058a1 100644
--- a/include/evhtp/thread.h
+++ b/include/evhtp/thread.h
@@ -21,12 +21,14 @@ enum evthr_res {
     EVTHR_RES_FATAL
 };
 
+struct evthr_pool_cb;
 struct evthr_pool;
 struct evthr;
 
 typedef struct event_base evbase_t;
 typedef struct event      ev_t;
 
+typedef struct evthr_pool_cb evthr_pool_cb_t;
 typedef struct evthr_pool evthr_pool_t;
 typedef struct evthr      evthr_t;
 typedef enum evthr_res    evthr_res;
@@ -49,13 +51,17 @@ EVHTP_EXPORT void evthr_free(evthr_t * evthr);
 EVHTP_EXPORT evthr_pool_t * evthr_pool_new(int nthreads, evthr_init_cb, void *)
     DEPRECATED("will take on the syntax of evthr_pool_wexit_new");
 
+EVHTP_EXPORT evthr_res      evthr_pool_dynamic_add(evthr_pool_t * pool, evthr_cb cb, void * arg);
 EVHTP_EXPORT int            evthr_pool_start(evthr_pool_t * pool);
 EVHTP_EXPORT evthr_res      evthr_pool_stop(evthr_pool_t * pool);
 EVHTP_EXPORT evthr_res      evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg);
+EVHTP_EXPORT evthr_res      evthr_pool_callback_defer(evthr_t * thr, evthr_cb cb, void * arg);
 EVHTP_EXPORT void           evthr_pool_free(evthr_pool_t * pool);
 
 EVHTP_EXPORT evthr_t      * evthr_wexit_new(evthr_init_cb, evthr_exit_cb, void * shared);
 EVHTP_EXPORT evthr_pool_t * evthr_pool_wexit_new(int nthreads, evthr_init_cb, evthr_exit_cb, void *);
+EVHTP_EXPORT evthr_pool_t * evthr_pool_dynamic_wexit_new(int nthreads_keep, int nthreads_limit, int nrequest_limit,
+                                                         evthr_init_cb, evthr_exit_cb, void *);
 
 #ifdef __cplusplus
 }
diff --git a/thread.c b/thread.c
index 7659cee..37bdeaa 100644
--- a/thread.c
+++ b/thread.c
@@ -2,6 +2,9 @@
 #include
 #include
 #include
+#include <limits.h>
+#include <stdbool.h>
+#include <time.h>
 #include
 #ifndef WIN32
 #include
@@ -15,9 +18,11 @@
 
 #include "internal.h"
 #include "evhtp/thread.h"
+#include "log.h"
 
 typedef struct evthr_cmd        evthr_cmd_t;
 typedef struct evthr_pool_slist evthr_pool_slist_t;
+typedef struct evthr_pool_cb_slist evthr_pool_cb_slist_t;
 
 struct evthr_cmd {
     uint8_t  stop;
@@ -26,6 +31,14 @@ struct evthr_cmd {
 } __attribute__((packed));
 
 TAILQ_HEAD(evthr_pool_slist, evthr);
+TAILQ_HEAD(evthr_pool_cb_slist, evthr_pool_cb);
+
+/* A callback queued on a dynamic pool until a worker thread takes it */
+struct evthr_pool_cb {
+    evthr_cb func;
+    void   * arg;
+    TAILQ_ENTRY(evthr_pool_cb) next;
+};
 
 struct evthr_pool {
 #ifdef EVTHR_SHARED_PIPE
@@ -34,6 +47,28 @@ struct evthr_pool {
 #endif
     int                nthreads;
     evthr_pool_slist_t threads;
+
+    /* state for dynamic thread pools (dynamic == true) */
+    bool                  dynamic;
+    evthr_init_cb         init_cb;
+    evthr_exit_cb         exit_cb;
+    void                * shared;
+    int                   nthreads_keep;  /* Maximum number of idle threads kept waiting for work */
+    int                   nthreads_limit; /* Maximum number of threads in this pool */
+    int                   nthreads_wait;  /* Number of waiting threads */
+    int                   nthreads_now;   /* Number of threads created */
+    int                   nrequest_limit; /* Maximum number of queued requests in this pool */
+    int                   nrequest;       /* Number of requests currently queued */
+    int                   shutdown;       /* Flag to mark shutting down */
+    pthread_t           * clear_thr;      /* Thread that periodically frees dead_threads */
+    pthread_mutex_t       lock;
+    pthread_cond_t        wait_cv;
+    /* Condition variable idle threads wait on for a new callback */
+    pthread_cond_t        shutdown_cv;
+    /* Condition variable the cleanup thread waits on for shutdown */
+    evthr_pool_slist_t    wait_threads;
+    evthr_pool_slist_t    dead_threads;
+    evthr_pool_cb_slist_t callbacks;
 };
 
 struct evthr {
@@ -53,6 +88,7 @@ struct evthr {
     int            pool_rdr;
     struct event * shared_pool_ev;
 #endif
+    evthr_pool_t * pool;
     TAILQ_ENTRY(evthr) next;
 };
 
@@ -91,7 +127,7 @@ _evthr_loop(void * args) {
     evthr_t * thread;
 
     if (!(thread = (evthr_t *)args)) {
-        return NULL;
+        pthread_exit(NULL);
     }
 
     if (thread == NULL || thread->thr == NULL) {
@@ -133,6 +169,103 @@ _evthr_loop(void * args) {
     pthread_exit(NULL);
 } /* _evthr_loop */
 
+/**
+ * @brief thread entry point for a worker of a dynamic pool
+ *
+ * Creates the thread's event base, runs init_cb, then serves queued pool
+ * callbacks one at a time. With an empty queue the worker waits on wait_cv,
+ * or exits if nthreads_keep workers are already waiting. Before exiting it
+ * runs exit_cb and moves itself to dead_threads, where it is freed later by
+ * clear_dead_threads().
+ */
+static void *
+_evthr_dynamic_loop(void * args) {
+    evthr_t      * thread = NULL;
+    evthr_pool_t * pool   = NULL;
+
+    if (!(thread = (evthr_t *)args)) {
+        pthread_exit(NULL);
+    }
+
+    if (thread == NULL || thread->thr == NULL) {
+        pthread_exit(NULL);
+    }
+
+    pool = thread->pool;
+
+    thread->evbase = event_base_new();
+    thread->event  = event_new(thread->evbase, thread->rdr,
+                               EV_READ | EV_PERSIST, _evthr_read_cmd, args);
+
+    event_add(thread->event, NULL);
+
+    pthread_mutex_lock(&thread->lock);
+    if (thread->init_cb != NULL) {
+        (thread->init_cb)(thread, thread->arg);
+    }
+    pthread_mutex_unlock(&thread->lock);
+
+    pthread_mutex_lock(&pool->lock);
+
+    for (;;) {
+        int retval = 0;
+        int cnt    = 0;
+
+        if (pool->shutdown == 0 && pool->callbacks.tqh_first == NULL) {
+            if (pool->nthreads_wait >= pool->nthreads_keep) {
+                break;
+            }
+            pool->nthreads_wait++;
+            TAILQ_INSERT_TAIL(&pool->wait_threads, thread, next);
+            pthread_cond_wait(&pool->wait_cv, &pool->lock);
+            /* unlink again: 'next' is reused for dead_threads and for the next wait */
+            TAILQ_REMOVE(&pool->wait_threads, thread, next);
+            pool->nthreads_wait--;
+        }
+
+        if (pool->shutdown == 0 && pool->callbacks.tqh_first != NULL) {
+            evthr_pool_cb_t * cb = pool->callbacks.tqh_first;
+
+            TAILQ_REMOVE(&pool->callbacks, pool->callbacks.tqh_first, next);
+            pool->nrequest--;
+            pthread_mutex_unlock(&pool->lock);
+
+            evthr_pool_callback_defer(thread, cb->func, cb->arg);
+            free(cb);
+
+            while (!retval && cnt != 1) {
+                retval = event_base_loop(thread->evbase, EVLOOP_ONCE);
+                cnt    = event_base_get_num_events(thread->evbase, EVENT_BASE_COUNT_ADDED);
+            }
+
+            pthread_mutex_lock(&pool->lock);
+        }
+
+        if (pool->shutdown) {
+            break;
+        }
+    }
+
+    pool->nthreads_now--;
+
+    pthread_mutex_lock(&thread->lock);
+    if (thread->exit_cb != NULL) {
+        (thread->exit_cb)(thread, thread->arg);
+    }
+    pthread_mutex_unlock(&thread->lock);
+
+    if (thread->err == 1) {
+        fprintf(stderr, "FATAL ERROR!\n");
+    }
+
+    /* Add to list of dead threads for memory free */
+    TAILQ_INSERT_TAIL(&pool->dead_threads, thread, next);
+
+    pthread_mutex_unlock(&pool->lock);
+
+    pthread_exit(NULL);
+} /* _evthr_dynamic_loop */
+
 evthr_res
 evthr_defer(evthr_t * thread, evthr_cb cb, void * arg) {
     evthr_cmd_t cmd = {
@@ -258,6 +391,35 @@ evthr_start(evthr_t * thread) {
     return 0;
 }
 
+/**
+ * @brief start a dynamic pool worker as a detached thread
+ *
+ * @return 0 on success, -1 on error
+ */
+int
+evthr_dynamic_start(evthr_t * thread) {
+    pthread_attr_t attr;
+    int            res = -1;
+
+    if (thread == NULL || thread->thr == NULL) {
+        return -1;
+    }
+
+    if (pthread_attr_init(&attr)) {
+        return -1;
+    }
+
+    if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == 0 &&
+        pthread_create(thread->thr, &attr, _evthr_dynamic_loop, (void *)thread) == 0) {
+        res = 0;
+    }
+
+    /* the attribute object is released on every path, success or failure */
+    pthread_attr_destroy(&attr);
+
+    return res;
+}
+
 void
 evthr_free(evthr_t * thread) {
     if (thread == NULL) {
@@ -280,35 +442,74 @@ evthr_free(evthr_t * thread) {
         event_free(thread->event);
     }
 
+#ifdef EVTHR_SHARED_PIPE
+    if (thread->shared_pool_ev) {
+        event_free(thread->shared_pool_ev);
+    }
+#endif
+
     if (thread->evbase) {
         event_base_free(thread->evbase);
     }
 
+    pthread_mutex_destroy(&thread->lock);
+
     free(thread);
 } /* evthr_free */
 
 void
 evthr_pool_free(evthr_pool_t * pool) {
-    evthr_t * thread;
-    evthr_t * save;
+    evthr_t         * thread        = NULL;
+    evthr_t         * thread_save   = NULL;
+    evthr_pool_cb_t * callback      = NULL;
+    evthr_pool_cb_t * callback_save = NULL;
 
     if (pool == NULL) {
         return;
     }
 
-    TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) {
+    TAILQ_FOREACH_SAFE(thread, &pool->threads, next, thread_save) {
         TAILQ_REMOVE(&pool->threads, thread, next);
 
         evthr_free(thread);
     }
 
+    if (pool->dynamic) {
+        free(pool->clear_thr);
+
+        TAILQ_FOREACH_SAFE(thread, &pool->dead_threads, next, thread_save) {
+            TAILQ_REMOVE(&pool->dead_threads, thread, next);
+            evthr_free(thread);
+        }
+
+        TAILQ_FOREACH_SAFE(callback, &pool->callbacks, next, callback_save) {
+            TAILQ_REMOVE(&pool->callbacks, callback, next);
+            free(callback);
+        }
+
+        pthread_cond_destroy(&pool->shutdown_cv);
+        pthread_cond_destroy(&pool->wait_cv);
+        pthread_mutex_destroy(&pool->lock);
+    }
+
+#ifdef EVTHR_SHARED_PIPE
+    if (pool->rdr > 0) {
+        close(pool->rdr);
+    }
+
+    if (pool->wdr > 0) {
+        close(pool->wdr);
+    }
+#endif
+
     free(pool);
 }
 
 evthr_res
 evthr_pool_stop(evthr_pool_t * pool) {
-    evthr_t * thr;
-    evthr_t * save;
+    evthr_t       * thr  = NULL;
+    evthr_t       * save = NULL;
+    struct timespec sleeptime;
 
     if (pool == NULL) {
         return EVTHR_RES_FATAL;
@@ -318,6 +519,31 @@ evthr_pool_stop(evthr_pool_t * pool) {
         evthr_stop(thr);
     }
 
+    if (pool->dynamic) {
+        int remaining;
+
+        pthread_mutex_lock(&pool->lock);
+        pool->shutdown = 1;
+        pthread_cond_broadcast(&pool->wait_cv);
+        pthread_cond_broadcast(&pool->shutdown_cv);
+        pthread_mutex_unlock(&pool->lock);
+
+        /* workers are detached, so poll nthreads_now every 100ms until all have exited */
+        do {
+            sleeptime.tv_sec  = 0;
+            sleeptime.tv_nsec = 100 * 1000 * 1000;
+            (void)nanosleep(&sleeptime, NULL);
+
+            pthread_mutex_lock(&pool->lock);
+            remaining = pool->nthreads_now;
+            pthread_mutex_unlock(&pool->lock);
+        } while (remaining != 0);
+
+        if (pool->clear_thr) {
+            pthread_join(*pool->clear_thr, NULL);
+        }
+    }
+
     return EVTHR_RES_OK;
 }
 
@@ -330,12 +556,17 @@ evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg) {
         .stop = 0
     };
 
+    if (pool->dynamic) {
+        return evthr_pool_dynamic_add(pool, cb, arg);
+    }
+
     if (evhtp_unlikely(send(pool->wdr, &cmd, sizeof(cmd), 0) == -1)) {
         return EVTHR_RES_RETRY;
     }
 
     return EVTHR_RES_OK;
 #else
+
     evthr_t * thr = NULL;
 
     if (pool == NULL) {
@@ -346,16 +577,88 @@ evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg) {
         return EVTHR_RES_NOCB;
     }
 
+    if (pool->dynamic) {
+        return evthr_pool_dynamic_add(pool, cb, arg);
+    }
+
     thr = TAILQ_FIRST(&pool->threads);
 
     TAILQ_REMOVE(&pool->threads, thr, next);
     TAILQ_INSERT_TAIL(&pool->threads, thr, next);
 
-
     return evthr_defer(thr, cb, arg);
 #endif
 } /* evthr_pool_defer */
 
+/* Checked wrapper around evthr_defer(): rejects a NULL thread or callback */
+evthr_res
+evthr_pool_callback_defer(evthr_t * thr, evthr_cb cb, void * arg) {
+    if (thr == NULL) {
+        return EVTHR_RES_FATAL;
+    }
+
+    if (cb == NULL) {
+        return EVTHR_RES_NOCB;
+    }
+
+    return evthr_defer(thr, cb, arg);
+} /* evthr_pool_callback_defer */
+
+/* Free every evthr on the pool's dead list; the caller must hold pool->lock */
+static void
+clear_dead_threads(evthr_pool_t * pool) {
+    evthr_t * thread = NULL;
+    evthr_t * save   = NULL;
+
+    TAILQ_FOREACH_SAFE(thread, &pool->dead_threads, next, save) {
+        TAILQ_REMOVE(&pool->dead_threads, thread, next);
+        evthr_free(thread);
+    }
+}
+
+/* seconds between two sweeps of the dead-thread list */
+#define EVTHR_CLEAR_CYCLE_SECS 60
+
+/**
+ * @brief housekeeping thread: frees exited workers until the pool shuts down
+ *
+ * Sleeps on shutdown_cv for up to EVTHR_CLEAR_CYCLE_SECS between sweeps.
+ */
+static void *
+_evthr_pool_time_clear_dead_threads(void * args) {
+    struct timespec ts;
+    evthr_pool_t  * pool = (evthr_pool_t *)args;
+    int             done = 0;
+
+    if (pool == NULL) {
+        pthread_exit(NULL);
+    }
+
+    while (!done) {
+        pthread_mutex_lock(&pool->lock);
+
+        /* skip the wait if shutdown was flagged while the lock was released,
+         * otherwise the broadcast is missed and stop() blocks for a full cycle */
+        if (!pool->shutdown) {
+            if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
+                /* never leave with the pool lock held */
+                pthread_mutex_unlock(&pool->lock);
+                break;
+            }
+
+            ts.tv_sec += EVTHR_CLEAR_CYCLE_SECS;
+            pthread_cond_timedwait(&pool->shutdown_cv, &pool->lock, &ts);
+        }
+
+        clear_dead_threads(pool);
+        done = pool->shutdown;
+
+        pthread_mutex_unlock(&pool->lock);
+    }
+
+    pthread_exit(NULL);
+}
+
 static evthr_pool_t *
 _evthr_pool_new(int           nthreads,
                 evthr_init_cb init_cb,
@@ -392,7 +695,7 @@ _evthr_pool_new(int nthreads,
 #endif
 
     for (i = 0; i < nthreads; i++) {
-        evthr_t * thread;
+        evthr_t * thread = NULL;
 
         if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) {
             evthr_pool_free(pool);
@@ -409,6 +712,79 @@ _evthr_pool_new(int nthreads,
     return pool;
 } /* _evthr_pool_new */
 
+/**
+ * @brief allocate a dynamic pool and start its dead-thread cleanup thread
+ *
+ * Worker threads are not created here; evthr_pool_dynamic_add() spawns them
+ * on demand. A limit <= 0 means "no limit" (stored as INT_MAX).
+ *
+ * @return the new pool, or NULL on allocation/initialisation failure
+ */
+evthr_pool_t *
+_evthr_pool_dynamic_new(int nthreads_keep,
+                        int nthreads_limit, int nrequest_limit,
+                        evthr_init_cb init_cb, evthr_exit_cb exit_cb,
+                        void * shared)
+{
+    evthr_pool_t * pool = NULL;
+
+    if (!(pool = calloc(1, sizeof(*pool)))) {
+        return NULL;
+    }
+
+    pool->init_cb        = init_cb;
+    pool->exit_cb        = exit_cb;
+    pool->shared         = shared;
+    pool->dynamic        = true;
+    pool->nthreads_keep  = nthreads_keep;
+    pool->nthreads_limit = (nthreads_limit <= 0) ? INT_MAX : nthreads_limit;
+    pool->nrequest_limit = (nrequest_limit <= 0) ? INT_MAX : nrequest_limit;
+
+    TAILQ_INIT(&pool->threads);
+    TAILQ_INIT(&pool->wait_threads);
+    TAILQ_INIT(&pool->dead_threads);
+    TAILQ_INIT(&pool->callbacks);
+
+    /* unwind by hand: evthr_pool_free() destroys the mutex and both condition
+     * variables, which is only valid once all three have been initialised */
+    if (pthread_mutex_init(&pool->lock, NULL)) {
+        goto err_free;
+    }
+
+    if (pthread_cond_init(&pool->wait_cv, NULL)) {
+        goto err_lock;
+    }
+
+    if (pthread_cond_init(&pool->shutdown_cv, NULL)) {
+        goto err_wait_cv;
+    }
+
+    /* sizeof the pthread_t itself, not of the pointer that refers to it */
+    if (!(pool->clear_thr = calloc(1, sizeof(*pool->clear_thr)))) {
+        goto err_shutdown_cv;
+    }
+
+    if (pthread_create(pool->clear_thr, NULL,
+                       _evthr_pool_time_clear_dead_threads, (void *)pool)) {
+        goto err_clear_thr;
+    }
+
+    return pool;
+
+err_clear_thr:
+    free(pool->clear_thr);
+err_shutdown_cv:
+    pthread_cond_destroy(&pool->shutdown_cv);
+err_wait_cv:
+    pthread_cond_destroy(&pool->wait_cv);
+err_lock:
+    pthread_mutex_destroy(&pool->lock);
+err_free:
+    free(pool);
+
+    return NULL;
+}
+
 evthr_pool_t *
 evthr_pool_new(int nthreads, evthr_init_cb init_cb, void * shared) {
     return _evthr_pool_new(nthreads, init_cb, NULL, shared);
@@ -421,6 +797,15 @@ evthr_pool_wexit_new(int nthreads,
     return _evthr_pool_new(nthreads, init_cb, exit_cb, shared);
 }
 
+evthr_pool_t *
+evthr_pool_dynamic_wexit_new(int nthreads_keep,
+                             int nthreads_limit, int nrequest_limit,
+                             evthr_init_cb init_cb, evthr_exit_cb exit_cb,
+                             void * shared) {
+    return _evthr_pool_dynamic_new(nthreads_keep, nthreads_limit, nrequest_limit,
+                                   init_cb, exit_cb, shared);
+}
+
 int
 evthr_pool_start(evthr_pool_t * pool) {
     evthr_t * evthr = NULL;
@@ -439,3 +824,72 @@ evthr_pool_start(evthr_pool_t * pool) {
 
     return 0;
 }
+
+/**
+ * @brief queue cb on a dynamic pool, spawning a worker thread if none is idle
+ *
+ * @return EVTHR_RES_OK if the callback was queued, EVTHR_RES_NOCB if cb is
+ *         NULL, EVTHR_RES_FATAL if pool is NULL, the request limit is
+ *         reached, or allocating/starting a worker failed. On any failure the
+ *         callback is not left in the queue.
+ */
+evthr_res
+evthr_pool_dynamic_add(evthr_pool_t * pool, evthr_cb cb, void * arg) {
+    evthr_pool_cb_t * callback = NULL;
+    evthr_t         * thread   = NULL;
+
+    if (pool == NULL) {
+        return EVTHR_RES_FATAL;
+    }
+
+    if (cb == NULL) {
+        return EVTHR_RES_NOCB;
+    }
+
+    pthread_mutex_lock(&pool->lock);
+
+    if (pool->nrequest >= pool->nrequest_limit) {
+        pthread_mutex_unlock(&pool->lock);
+        log_debug("Too many request, limit is %d, refuse to handle.", pool->nrequest_limit);
+        return EVTHR_RES_FATAL;
+    }
+
+    if (!(callback = calloc(1, sizeof(*callback)))) {
+        pthread_mutex_unlock(&pool->lock);
+        return EVTHR_RES_FATAL;
+    }
+
+    callback->func = cb;
+    callback->arg  = arg;
+
+    TAILQ_INSERT_TAIL(&pool->callbacks, callback, next);
+    pool->nrequest++;
+
+    if (pool->nthreads_wait == 0 && pool->nthreads_now < pool->nthreads_limit) {
+        if ((thread = evthr_wexit_new(pool->init_cb, pool->exit_cb, pool->shared))) {
+            thread->pool = pool;
+        }
+
+        if (thread == NULL || evthr_dynamic_start(thread) < 0) {
+            /* the caller is told the request failed, so it must not stay
+             * queued where another worker could still run it */
+            TAILQ_REMOVE(&pool->callbacks, callback, next);
+            pool->nrequest--;
+            pthread_mutex_unlock(&pool->lock);
+
+            free(callback);
+            evthr_free(thread);
+            return EVTHR_RES_FATAL;
+        }
+
+        pool->nthreads_now++;
+    } else {
+        pthread_cond_signal(&pool->wait_cv);
+    }
+
+    clear_dead_threads(pool);
+
+    pthread_mutex_unlock(&pool->lock);
+    return EVTHR_RES_OK;
+}
+