728 lines
20 KiB
Diff
728 lines
20 KiB
Diff
From b3733e9fe70d9ac101a2ac79ee4568942a3dd874 Mon Sep 17 00:00:00 2001
|
|
From: chenxin <kepler.chenxin@huawei.com>
|
|
Date: Wed, 4 Sep 2019 15:52:20 +0800
|
|
Subject: [PATCH 2/4] evhtp: enable dynamic thread pool
|
|
|
|
Signed-off-by: tanyifeng <tanyifeng1@huawei.com>
|
|
Signed-off-by: chenxin <kepler.chenxin@huawei.com>
|
|
|
|
Conflicts:
|
|
thread.c
|
|
|
|
Change-Id: If3f154de82448d002f0e3c90efaaf73394d6734b
|
|
---
|
|
evhtp.c | 47 ++++++
|
|
include/evhtp/evhtp.h | 22 +++
|
|
include/evhtp/thread.h | 6 +
|
|
thread.c | 415 ++++++++++++++++++++++++++++++++++++++++++++++++-
|
|
4 files changed, 487 insertions(+), 3 deletions(-)
|
|
|
|
diff --git a/evhtp.c b/evhtp.c
|
|
index 8d34676..2ecb619 100644
|
|
--- a/evhtp.c
|
|
+++ b/evhtp.c
|
|
@@ -2824,6 +2824,11 @@ htp__accept_cb_(struct evconnlistener * serv, int fd, struct sockaddr * s, int s
|
|
|
|
log_debug("fd = %d, conn = %p", fd, connection);
|
|
|
|
+ if (evhtp_unlikely(sl <= 0)) {
|
|
+ evhtp_safe_free(connection, evhtp_connection_free);
|
|
+ return;
|
|
+ }
|
|
+
|
|
connection->saddr = htp__malloc_(sl);
|
|
|
|
if (evhtp_unlikely(connection->saddr == NULL)) {
|
|
@@ -4590,6 +4595,37 @@ htp__use_threads_(evhtp_t * htp,
|
|
return 0;
|
|
}
|
|
|
|
+/**
+ * @brief Record the per-thread callbacks on an evhtp_t and create the
+ *        dynamic thread pool that will serve its connections.
+ *
+ * @param htp            context to configure; NULL is rejected
+ * @param init_cb        user callback stored in htp->thread_init_cb
+ * @param exit_cb        user callback stored in htp->thread_exit_cb
+ * @param nthreads_keep  forwarded to evthr_pool_dynamic_wexit_new()
+ * @param nthreads_limit forwarded to evthr_pool_dynamic_wexit_new()
+ * @param nrequest_limit forwarded to evthr_pool_dynamic_wexit_new()
+ * @param arg            user argument stored in htp->thread_cbarg
+ *
+ * @return 0 on success, -1 if htp is NULL or the pool could not be created
+ */
+static int
+htp__use_dynamic_threads_(evhtp_t * htp,
+                          evhtp_thread_init_cb init_cb,
+                          evhtp_thread_exit_cb exit_cb,
+                          int nthreads_keep, int nthreads_limit,
+                          int nrequest_limit, void * arg)
+{
+    if (!htp) {
+        return -1;
+    }
+
+    htp->thread_init_cb = init_cb;
+    htp->thread_exit_cb = exit_cb;
+    htp->thread_cbarg   = arg;
+
+#ifndef EVHTP_DISABLE_SSL
+    evhtp_ssl_use_threads();
+#endif
+
+    /* the pool gets the library's own thread hooks, with htp as shared data */
+    htp->thr_pool = evthr_pool_dynamic_wexit_new(nthreads_keep,
+                                                 nthreads_limit,
+                                                 nrequest_limit,
+                                                 htp__thread_init_,
+                                                 htp__thread_exit_,
+                                                 htp);
+
+    return (htp->thr_pool != NULL) ? 0 : -1;
+}
|
|
+
|
|
int
|
|
evhtp_use_threads(evhtp_t * htp, evhtp_thread_init_cb init_cb,
|
|
int nthreads, void * arg)
|
|
@@ -4606,6 +4642,17 @@ evhtp_use_threads_wexit(evhtp_t * htp,
|
|
return htp__use_threads_(htp, init_cb, exit_cb, nthreads, arg);
|
|
}
|
|
|
|
+/* Public entry point; forwards every argument unchanged to
+ * htp__use_dynamic_threads_() and returns its result. */
+int
+evhtp_use_dynamic_threads(evhtp_t * htp,
+                          evhtp_thread_init_cb init_cb,
+                          evhtp_thread_exit_cb exit_cb,
+                          int nthreads_keep, int nthreads_limit,
+                          int nrequest_limit, void * arg)
+{
+    int rc;
+
+    rc = htp__use_dynamic_threads_(htp, init_cb, exit_cb,
+                                   nthreads_keep, nthreads_limit,
+                                   nrequest_limit, arg);
+
+    return rc;
+}
|
|
+
|
|
#endif
|
|
|
|
#ifndef EVHTP_DISABLE_EVTHR
|
|
diff --git a/include/evhtp/evhtp.h b/include/evhtp/evhtp.h
|
|
index 9bcee44..fbe79ba 100644
|
|
--- a/include/evhtp/evhtp.h
|
|
+++ b/include/evhtp/evhtp.h
|
|
@@ -871,6 +871,28 @@ EVHTP_EXPORT int evhtp_use_threads_wexit(evhtp_t *,
|
|
int nthreads, void * arg);
|
|
|
|
/**
|
|
+ * @brief Enable dynamic thread-pool support for an evhtp_t context. Every connection is
|
|
+ * distributed to a thread. An optional "on-start" callback can
|
|
+ * be set which allows you to manipulate the thread-specific information
|
|
+ * (such as the thread-specific event_base).
|
|
+ *
|
|
+ * @param htp
|
|
+ * @param init_cb
|
|
+ * @param exit_cb
|
|
+ * @param nthreads_keep - number of threads to keep alive in background
|
|
+ * @param nthreads_limit - number of threads limit to create in background
|
|
+ * @param nrequest_limit - number of concurrent request limit to handle
|
|
+ * @param arg
|
|
+ *
|
|
+ * @return 0 on success, -1 if htp is NULL or the thread pool could not be created
|
|
+ */
|
|
+EVHTP_EXPORT int evhtp_use_dynamic_threads(evhtp_t * htp,
|
|
+ evhtp_thread_init_cb init_cb,
|
|
+ evhtp_thread_exit_cb exit_cb,
|
|
+ int nthreads_keep, int nthreads_limit,
|
|
+ int nrequest_limit, void * arg);
|
|
+
|
|
+/**
|
|
* @brief generates all the right information for a reply to be sent to the client
|
|
*
|
|
* @param request
|
|
diff --git a/include/evhtp/thread.h b/include/evhtp/thread.h
|
|
index 7479aa8..61058a1 100644
|
|
--- a/include/evhtp/thread.h
|
|
+++ b/include/evhtp/thread.h
|
|
@@ -21,12 +21,14 @@ enum evthr_res {
|
|
EVTHR_RES_FATAL
|
|
};
|
|
|
|
+struct evthr_pool_cb;
|
|
struct evthr_pool;
|
|
struct evthr;
|
|
|
|
typedef struct event_base evbase_t;
|
|
typedef struct event ev_t;
|
|
|
|
+typedef struct evthr_pool_cb evthr_pool_cb_t;
|
|
typedef struct evthr_pool evthr_pool_t;
|
|
typedef struct evthr evthr_t;
|
|
typedef enum evthr_res evthr_res;
|
|
@@ -49,13 +51,17 @@ EVHTP_EXPORT void evthr_free(evthr_t * evthr);
|
|
EVHTP_EXPORT evthr_pool_t * evthr_pool_new(int nthreads, evthr_init_cb, void *)
|
|
DEPRECATED("will take on the syntax of evthr_pool_wexit_new");
|
|
|
|
+EVHTP_EXPORT evthr_res evthr_pool_dynamic_add(evthr_pool_t * pool, evthr_cb cb, void * arg);
|
|
EVHTP_EXPORT int evthr_pool_start(evthr_pool_t * pool);
|
|
EVHTP_EXPORT evthr_res evthr_pool_stop(evthr_pool_t * pool);
|
|
EVHTP_EXPORT evthr_res evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg);
|
|
+EVHTP_EXPORT evthr_res evthr_pool_callback_defer(evthr_t * thr, evthr_cb cb, void * arg);
|
|
EVHTP_EXPORT void evthr_pool_free(evthr_pool_t * pool);
|
|
|
|
EVHTP_EXPORT evthr_t * evthr_wexit_new(evthr_init_cb, evthr_exit_cb, void * shared);
|
|
EVHTP_EXPORT evthr_pool_t * evthr_pool_wexit_new(int nthreads, evthr_init_cb, evthr_exit_cb, void *);
|
|
+EVHTP_EXPORT evthr_pool_t * evthr_pool_dynamic_wexit_new(int nthreads_keep, int nthreads_limit, int nrequest_limit,
|
|
+ evthr_init_cb, evthr_exit_cb, void *);
|
|
|
|
#ifdef __cplusplus
|
|
}
|
|
diff --git a/thread.c b/thread.c
|
|
index 96065df..1570b69 100644
|
|
--- a/thread.c
|
|
+++ b/thread.c
|
|
@@ -2,6 +2,7 @@
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <stdint.h>
|
|
+#include <stdbool.h>
|
|
#include <limits.h>
|
|
#ifndef WIN32
|
|
#include <sys/queue.h>
|
|
@@ -19,6 +20,7 @@
|
|
|
|
typedef struct evthr_cmd evthr_cmd_t;
|
|
typedef struct evthr_pool_slist evthr_pool_slist_t;
|
|
+typedef struct evthr_pool_cb_slist evthr_pool_cb_slist_t;
|
|
|
|
struct evthr_cmd {
|
|
uint8_t stop;
|
|
@@ -27,6 +29,13 @@ struct evthr_cmd {
|
|
} __attribute__((packed));
|
|
|
|
TAILQ_HEAD(evthr_pool_slist, evthr);
|
|
+TAILQ_HEAD(evthr_pool_cb_slist, evthr_pool_cb);
|
|
+
|
|
+/* A single queued callback awaiting a worker thread of a dynamic pool. */
+struct evthr_pool_cb {
|
|
+ evthr_cb func; /* function a worker hands to evthr_pool_callback_defer() */
|
|
+ void *arg; /* argument passed along with func */
|
|
+ TAILQ_ENTRY(evthr_pool_cb) next; /* linkage for the evthr_pool_cb_slist queue */
|
|
+};
|
|
|
|
struct evthr_pool {
|
|
#ifdef EVTHR_SHARED_PIPE
|
|
@@ -35,6 +44,28 @@ struct evthr_pool {
|
|
#endif
|
|
int nthreads;
|
|
evthr_pool_slist_t threads;
|
|
+
|
|
+ //definition for dynamic threads pool
|
|
+ bool dynamic;
|
|
+ evthr_init_cb init_cb;
|
|
+ evthr_init_cb exit_cb;
|
|
+ void * shared;
|
|
+ int nthreads_keep; /* Maximum number of keep alive threads in this pool */
|
|
+ int nthreads_limit; /* Maximum number of threads this pool may have running */
|
|
+ int nthreads_wait; /* Number of waiting threads */
|
|
+ int nthreads_now; /* Number of threads created */
|
|
+ int nrequest_limit; /* Maximum number of concurrent request in this pool */
|
|
+ int nrequest; /* Number of queued callbacks not yet taken by a thread */
|
|
+ int shutdown; /* Flag to mark shutting down */
|
|
+ pthread_t * clear_thr;
|
|
+ pthread_mutex_t lock;
|
|
+ pthread_cond_t wait_cv;
|
|
+ /* Condition variable for thread to wait new callback*/
|
|
+ pthread_cond_t shutdown_cv;
|
|
+ /* Condition variable for thread to wait shutdown*/
|
|
+ evthr_pool_slist_t wait_threads;
|
|
+ evthr_pool_slist_t dead_threads;
|
|
+ evthr_pool_cb_slist_t callbacks;
|
|
};
|
|
|
|
struct evthr {
|
|
@@ -54,6 +85,7 @@ struct evthr {
|
|
int pool_rdr;
|
|
struct event * shared_pool_ev;
|
|
#endif
|
|
+ evthr_pool_t * pool;
|
|
TAILQ_ENTRY(evthr) next;
|
|
};
|
|
|
|
@@ -94,7 +126,7 @@ _evthr_loop(void * args)
|
|
evthr_t * thread;
|
|
|
|
if (!(thread = (evthr_t *)args)) {
|
|
- return NULL;
|
|
+ pthread_exit(NULL);
|
|
}
|
|
|
|
if (thread == NULL || thread->thr == NULL) {
|
|
@@ -139,6 +171,112 @@ _evthr_loop(void * args)
|
|
pthread_exit(NULL);
|
|
} /* _evthr_loop */
|
|
|
|
+/**
+ * @brief Thread entry point for a worker that belongs to a dynamic pool.
+ *
+ * Creates the worker's event base and command-read event, runs the init
+ * callback, then repeatedly takes queued callbacks off pool->callbacks and
+ * runs them through the worker's own event loop. A worker that finds the
+ * queue empty parks on pool->wait_cv, unless nthreads_keep workers are
+ * already parked, in which case it retires. On retirement or pool shutdown
+ * it runs the exit callback and appends itself to pool->dead_threads so that
+ * another thread can free it.
+ *
+ * @param args the evthr_t of this worker; its pool member must already be set
+ *
+ * @return does not return; the thread always leaves through pthread_exit()
+ */
+static void *
+_evthr_dynamic_loop(void * args) {
+    evthr_t      * thread = (evthr_t *)args;
+    evthr_pool_t * pool;
+
+    if (thread == NULL || thread->thr == NULL) {
+        pthread_exit(NULL);
+    }
+
+    pool = thread->pool;
+
+    thread->evbase = event_base_new();
+    thread->event  = event_new(thread->evbase, thread->rdr,
+                               EV_READ | EV_PERSIST, _evthr_read_cmd, args);
+
+    event_add(thread->event, NULL);
+
+    /* pthread_mutex_lock() signals failure with a positive error number,
+     * never a negative value, so every check compares against zero. */
+    if (pthread_mutex_lock(&thread->lock) != 0) {
+        pthread_exit(NULL);
+    }
+    if (thread->init_cb != NULL) {
+        (thread->init_cb)(thread, thread->arg);
+    }
+    pthread_mutex_unlock(&thread->lock);
+
+    if (pthread_mutex_lock(&pool->lock) != 0) {
+        pthread_exit(NULL);
+    }
+
+    /* pool->lock is held at the top of every iteration */
+    for (;;) {
+        if (pool->shutdown == 0 && pool->callbacks.tqh_first == NULL) {
+            if (pool->nthreads_wait >= pool->nthreads_keep) {
+                /* enough idle workers are parked already: retire this one */
+                break;
+            }
+
+            pool->nthreads_wait++;
+            TAILQ_INSERT_TAIL(&pool->wait_threads, thread, next);
+            pthread_cond_wait(&pool->wait_cv, &pool->lock);
+            /* Unlink before going on: the same `next` entry is reused for
+             * the next park and for dead_threads, so leaving the worker
+             * linked here would corrupt both lists. */
+            TAILQ_REMOVE(&pool->wait_threads, thread, next);
+            pool->nthreads_wait--;
+        }
+
+        if (pool->shutdown == 0 && pool->callbacks.tqh_first != NULL) {
+            evthr_pool_cb_t * cb = pool->callbacks.tqh_first;
+            evthr_res         res;
+
+            TAILQ_REMOVE(&pool->callbacks, cb, next);
+            pool->nrequest--;
+            pthread_mutex_unlock(&pool->lock);
+
+            res = evthr_pool_callback_defer(thread, cb->func, cb->arg);
+            free(cb);
+
+            /* Only pump the event loop when the hand-off succeeded;
+             * otherwise nothing is pending and EVLOOP_ONCE would block
+             * this worker indefinitely. */
+            if (res == EVTHR_RES_OK) {
+                int retval = 0;
+                int cnt    = 0;
+
+                /* run until a single added event is left on the base
+                 * (presumably the command-read event registered above) */
+                while (!retval && cnt != 1) {
+                    retval = event_base_loop(thread->evbase, EVLOOP_ONCE);
+                    cnt    = event_base_get_num_events(thread->evbase,
+                                                       EVENT_BASE_COUNT_ADDED);
+                }
+            }
+
+            if (pthread_mutex_lock(&pool->lock) != 0) {
+                pthread_exit(NULL);
+            }
+        }
+
+        if (pool->shutdown) {
+            break;
+        }
+    }
+
+    pool->nthreads_now--;
+
+    /* the exit callback is skipped when the thread lock cannot be taken */
+    if (pthread_mutex_lock(&thread->lock) == 0) {
+        if (thread->exit_cb != NULL) {
+            (thread->exit_cb)(thread, thread->arg);
+        }
+        pthread_mutex_unlock(&thread->lock);
+    }
+
+    if (thread->err == 1) {
+        fprintf(stderr, "FATAL ERROR!\n");
+    }
+
+    /* Add to list of dead threads for memory free */
+    TAILQ_INSERT_TAIL(&pool->dead_threads, thread, next);
+
+    pthread_mutex_unlock(&pool->lock);
+
+    pthread_exit(NULL);
+} /* _evthr_dynamic_loop */
|
|
+
|
|
evthr_res
|
|
evthr_defer(evthr_t * thread, evthr_cb cb, void * arg)
|
|
{
|
|
@@ -275,6 +413,28 @@ evthr_start(evthr_t * thread)
|
|
return 0;
|
|
}
|
|
|
|
+/**
+ * @brief Start a dynamic-pool worker as a detached thread running
+ *        _evthr_dynamic_loop().
+ *
+ * @param thread worker to start; thread and thread->thr must be non-NULL
+ *
+ * @return 0 when the thread was created, -1 on any failure
+ */
+int
+evthr_dynamic_start(evthr_t * thread) {
+    pthread_attr_t attr;
+    int            res = -1;
+
+    if (thread == NULL || thread->thr == NULL) {
+        return -1;
+    }
+
+    if (pthread_attr_init(&attr)) {
+        return -1;
+    }
+
+    if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == 0 &&
+        pthread_create(thread->thr, &attr, _evthr_dynamic_loop, (void *)thread) == 0) {
+        res = 0;
+    }
+
+    /* the attribute object is released on every path once it was initialised */
+    pthread_attr_destroy(&attr);
+
+    return res;
+}
|
|
+
|
|
void
|
|
evthr_free(evthr_t * thread)
|
|
{
|
|
@@ -284,10 +444,12 @@ evthr_free(evthr_t * thread)
|
|
|
|
if (thread->rdr > 0) {
|
|
close(thread->rdr);
|
|
+ thread->rdr = -1;
|
|
}
|
|
|
|
if (thread->wdr > 0) {
|
|
close(thread->wdr);
|
|
+ thread->wdr = -1;
|
|
}
|
|
|
|
if (thread->thr) {
|
|
@@ -309,6 +471,8 @@ evthr_free(evthr_t * thread)
|
|
event_base_free(thread->evbase);
|
|
}
|
|
|
|
+ pthread_mutex_destroy(&thread->lock);
|
|
+
|
|
free(thread);
|
|
} /* evthr_free */
|
|
|
|
@@ -316,25 +480,46 @@ void
|
|
evthr_pool_free(evthr_pool_t * pool)
|
|
{
|
|
evthr_t * thread;
|
|
- evthr_t * save;
|
|
+ evthr_t * thread_save;
|
|
+ evthr_pool_cb_t * callback;
|
|
+ evthr_pool_cb_t * callback_save;
|
|
|
|
if (pool == NULL) {
|
|
return;
|
|
}
|
|
|
|
- TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) {
|
|
+ TAILQ_FOREACH_SAFE(thread, &pool->threads, next, thread_save) {
|
|
TAILQ_REMOVE(&pool->threads, thread, next);
|
|
|
|
evthr_free(thread);
|
|
}
|
|
|
|
+ if (pool->dynamic) {
|
|
+ if (pool->clear_thr)
|
|
+ free(pool->clear_thr);
|
|
+
|
|
+ TAILQ_FOREACH_SAFE(thread, &pool->dead_threads, next, thread_save) {
|
|
+ TAILQ_REMOVE(&pool->dead_threads, thread, next);
|
|
+ evthr_free(thread);
|
|
+ }
|
|
+
|
|
+ TAILQ_FOREACH_SAFE(callback, &pool->callbacks, next, callback_save) {
|
|
+ TAILQ_REMOVE(&pool->callbacks, callback, next);
|
|
+ free(callback);
|
|
+ }
|
|
+
|
|
+ pthread_mutex_destroy(&pool->lock);
|
|
+ }
|
|
+
|
|
#ifdef EVTHR_SHARED_PIPE
|
|
if (pool->rdr > 0) {
|
|
close(pool->rdr);
|
|
+ pool->rdr = -1;
|
|
}
|
|
|
|
if (pool->wdr > 0) {
|
|
close(pool->wdr);
|
|
+ pool->wdr = -1;
|
|
}
|
|
#endif
|
|
|
|
@@ -355,6 +540,31 @@ evthr_pool_stop(evthr_pool_t * pool)
|
|
evthr_stop(thr);
|
|
}
|
|
|
|
+ if (pool->dynamic) {
|
|
+ if (pthread_mutex_lock(&pool->lock) < 0) {
|
|
+ return EVTHR_RES_FATAL;
|
|
+ }
|
|
+ pool->shutdown = 1;
|
|
+ pthread_cond_broadcast(&pool->wait_cv);
|
|
+ pthread_cond_broadcast(&pool->shutdown_cv);
|
|
+ pthread_mutex_unlock(&pool->lock);
|
|
+
|
|
+wait_for_exit:
|
|
+ usleep(100 * 1000);
|
|
+ if (pthread_mutex_lock(&pool->lock) < 0) {
|
|
+ return EVTHR_RES_FATAL;
|
|
+ }
|
|
+ if (pool->nthreads_now != 0) {
|
|
+ pthread_mutex_unlock(&pool->lock);
|
|
+ goto wait_for_exit;
|
|
+ }
|
|
+ pthread_mutex_unlock(&pool->lock);
|
|
+
|
|
+ if (pool->clear_thr) {
|
|
+ pthread_join(*pool->clear_thr, NULL);
|
|
+ }
|
|
+ }
|
|
+
|
|
return EVTHR_RES_OK;
|
|
}
|
|
|
|
@@ -378,6 +588,10 @@ evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg)
|
|
.stop = 0
|
|
};
|
|
|
|
+ if (pool->dynamic) {
|
|
+ return evthr_pool_dynamic_add(pool, cb, arg);
|
|
+ }
|
|
+
|
|
if (evhtp_unlikely(send(pool->wdr, &cmd, sizeof(cmd), 0) == -1)) {
|
|
return EVTHR_RES_RETRY;
|
|
}
|
|
@@ -396,6 +610,9 @@ evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg)
|
|
return EVTHR_RES_NOCB;
|
|
}
|
|
|
|
+ if (pool->dynamic) {
|
|
+ return evthr_pool_dynamic_add(pool, cb, arg);
|
|
+ }
|
|
|
|
TAILQ_FOREACH(thread, &pool->threads, next) {
|
|
int backlog = get_backlog_(thread);
|
|
@@ -414,6 +631,66 @@ evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg)
|
|
return evthr_defer(min_thread, cb, arg);
|
|
} /* evthr_pool_defer */
|
|
|
|
+/**
+ * @brief Validate the arguments and queue a callback on one specific thread.
+ *
+ * @param thr thread that should run the callback
+ * @param cb  callback to run
+ * @param arg argument handed to cb
+ *
+ * @return EVTHR_RES_FATAL if thr is NULL, EVTHR_RES_NOCB if cb is NULL,
+ *         otherwise whatever evthr_defer() returns
+ */
+evthr_res
+evthr_pool_callback_defer(evthr_t * thr, evthr_cb cb, void * arg) {
+    evthr_res res;
+
+    if (thr == NULL) {
+        res = EVTHR_RES_FATAL;
+    } else if (cb == NULL) {
+        res = EVTHR_RES_NOCB;
+    } else {
+        res = evthr_defer(thr, cb, arg);
+    }
+
+    return res;
+} /* evthr_pool_callback_defer */
|
|
+
|
|
+/* Empty pool->dead_threads, releasing every entry with evthr_free(). */
+static void clear_dead_threads(evthr_pool_t *pool)
+{
+    evthr_t * dead;
+
+    /* pop from the head until the list is empty */
+    while ((dead = pool->dead_threads.tqh_first) != NULL) {
+        TAILQ_REMOVE(&pool->dead_threads, dead, next);
+        evthr_free(dead);
+    }
+}
|
|
+
|
|
+/**
+ * @brief Thread entry point of the pool's housekeeping thread.
+ *
+ * Wakes up every __CLEAR_CYCLE_TIME seconds, or earlier when
+ * pool->shutdown_cv is signalled, frees the workers queued on
+ * pool->dead_threads, and exits once pool->shutdown is set.
+ *
+ * @param args the evthr_pool_t to look after
+ *
+ * @return does not return; the thread always leaves through pthread_exit()
+ */
+static void *
+_evthr_pool_time_clear_dead_threads(void * args)
+{
+#define __CLEAR_CYCLE_TIME 60
+    evthr_pool_t * pool = (evthr_pool_t *)args;
+
+    if (pool == NULL) {
+        pthread_exit(NULL);
+    }
+
+    for (;;) {
+        struct timespec ts;
+        int             done;
+
+        /* pthread_mutex_lock() reports failure as a positive error number */
+        if (pthread_mutex_lock(&pool->lock) != 0) {
+            sleep(1);
+            continue;
+        }
+
+        /* Do not wait when shutdown was flagged before the lock was taken;
+         * the broadcast has already happened and would only be noticed
+         * after the full timeout. */
+        if (!pool->shutdown) {
+            if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
+                /* release the pool lock before giving up */
+                pthread_mutex_unlock(&pool->lock);
+                break;
+            }
+            ts.tv_sec += __CLEAR_CYCLE_TIME;
+            pthread_cond_timedwait(&pool->shutdown_cv, &pool->lock, &ts);
+        }
+
+        clear_dead_threads(pool);
+
+        done = pool->shutdown;
+        pthread_mutex_unlock(&pool->lock);
+
+        if (done) {
+            break;
+        }
+    }
+
+    pthread_exit(NULL);
+}
|
|
+
|
|
static evthr_pool_t *
|
|
_evthr_pool_new(int nthreads,
|
|
evthr_init_cb init_cb,
|
|
@@ -469,6 +746,67 @@ _evthr_pool_new(int nthreads,
|
|
} /* _evthr_pool_new */
|
|
|
|
+/**
+ * @brief Allocate a dynamic thread pool and start its housekeeping thread.
+ *
+ * No worker threads are created here.
+ *
+ * @param nthreads_keep  stored as the pool's nthreads_keep
+ * @param nthreads_limit stored as the pool's nthreads_limit; a value <= 0
+ *                       is replaced by INT_MAX
+ * @param nrequest_limit stored as the pool's nrequest_limit; a value <= 0
+ *                       is replaced by INT_MAX
+ * @param init_cb        callback stored for threads created later
+ * @param exit_cb        callback stored for threads created later
+ * @param shared         opaque pointer stored for threads created later
+ *
+ * @return the new pool, or NULL if any allocation or pthread call failed
+ */
evthr_pool_t *
+_evthr_pool_dynamic_new(int nthreads_keep,
+                        int nthreads_limit, int nrequest_limit,
+                        evthr_init_cb init_cb, evthr_exit_cb exit_cb,
+                        void * shared)
+{
+    evthr_pool_t * pool;
+
+    if (!(pool = calloc(1, sizeof(*pool)))) {
+        return NULL;
+    }
+
+    pool->init_cb = init_cb;
+    pool->exit_cb = exit_cb;
+    pool->shared  = shared;
+    pool->dynamic = true;
+
+    pool->nthreads_keep  = nthreads_keep;
+    pool->nthreads_limit = (nthreads_limit <= 0) ? INT_MAX : nthreads_limit;
+    pool->nrequest_limit = (nrequest_limit <= 0) ? INT_MAX : nrequest_limit;
+
+    TAILQ_INIT(&pool->threads);
+    TAILQ_INIT(&pool->wait_threads);
+    TAILQ_INIT(&pool->dead_threads);
+    TAILQ_INIT(&pool->callbacks);
+
+    /* Each failure unwinds exactly what was set up so far, so no mutex or
+     * condition variable is destroyed without having been initialised. */
+    if (pthread_mutex_init(&pool->lock, NULL)) {
+        goto err_pool;
+    }
+    if (pthread_cond_init(&pool->wait_cv, NULL)) {
+        goto err_lock;
+    }
+    if (pthread_cond_init(&pool->shutdown_cv, NULL)) {
+        goto err_wait_cv;
+    }
+
+    /* size the allocation by the pthread_t it holds, not by the pointer */
+    pool->clear_thr = calloc(1, sizeof(*pool->clear_thr));
+    if (!pool->clear_thr) {
+        goto err_shutdown_cv;
+    }
+    if (pthread_create(pool->clear_thr, NULL, _evthr_pool_time_clear_dead_threads, (void *)pool)) {
+        goto err_clear_thr;
+    }
+
+    return pool;
+
+err_clear_thr:
+    free(pool->clear_thr);
+err_shutdown_cv:
+    pthread_cond_destroy(&pool->shutdown_cv);
+err_wait_cv:
+    pthread_cond_destroy(&pool->wait_cv);
+err_lock:
+    pthread_mutex_destroy(&pool->lock);
+err_pool:
+    free(pool);
+    return NULL;
+}
|
|
+
|
|
+evthr_pool_t *
|
|
evthr_pool_new(int nthreads, evthr_init_cb init_cb, void * shared)
|
|
{
|
|
return _evthr_pool_new(nthreads, init_cb, NULL, shared);
|
|
@@ -482,6 +820,15 @@ evthr_pool_wexit_new(int nthreads,
|
|
return _evthr_pool_new(nthreads, init_cb, exit_cb, shared);
|
|
}
|
|
|
|
+/* Public constructor for a dynamic pool; passes every argument straight
+ * through to _evthr_pool_dynamic_new() and returns its result. */
+evthr_pool_t *
+evthr_pool_dynamic_wexit_new(int nthreads_keep,
+                             int nthreads_limit, int nrequest_limit,
+                             evthr_init_cb init_cb, evthr_exit_cb exit_cb,
+                             void * shared) {
+    evthr_pool_t * created;
+
+    created = _evthr_pool_dynamic_new(nthreads_keep, nthreads_limit,
+                                      nrequest_limit, init_cb, exit_cb,
+                                      shared);
+
+    return created;
+}
|
|
+
|
|
int
|
|
evthr_pool_start(evthr_pool_t * pool)
|
|
{
|
|
@@ -501,3 +848,65 @@ evthr_pool_start(evthr_pool_t * pool)
|
|
|
|
return 0;
|
|
}
|
|
+
|
|
+/**
+ * @brief Queue a callback on a dynamic pool, creating a worker if needed.
+ *
+ * The callback is appended to pool->callbacks. When no worker is parked and
+ * the pool is below nthreads_limit a new worker is created; otherwise
+ * pool->wait_cv is signalled. Dead workers are freed before returning.
+ *
+ * @param pool dynamic pool to queue on
+ * @param cb   callback to run on a worker
+ * @param arg  argument handed to cb
+ *
+ * @return EVTHR_RES_OK when the callback was queued, EVTHR_RES_NOCB when cb
+ *         is NULL, EVTHR_RES_FATAL when pool is NULL, the pool lock cannot
+ *         be taken, nrequest_limit is reached, memory is exhausted, or a
+ *         needed worker cannot be created (the callback is then not queued)
+ */
+evthr_res
+evthr_pool_dynamic_add(evthr_pool_t * pool, evthr_cb cb, void * arg) {
+    evthr_pool_cb_t * callback;
+    evthr_t         * thread = NULL;
+
+    if (pool == NULL) {
+        return EVTHR_RES_FATAL;
+    }
+
+    /* reject here what evthr_pool_callback_defer() would reject later,
+     * after the entry had already been queued and dequeued */
+    if (cb == NULL) {
+        return EVTHR_RES_NOCB;
+    }
+
+    /* pthread_mutex_lock() reports failure as a positive error number */
+    if (pthread_mutex_lock(&pool->lock) != 0) {
+        return EVTHR_RES_FATAL;
+    }
+
+    if (pool->nrequest >= pool->nrequest_limit) {
+        pthread_mutex_unlock(&pool->lock);
+        log_debug("Too many request, limit is %d, refuse to handle.", pool->nrequest_limit);
+        return EVTHR_RES_FATAL;
+    }
+
+    if (!(callback = calloc(1, sizeof(*callback)))) {
+        pthread_mutex_unlock(&pool->lock);
+        return EVTHR_RES_FATAL;
+    }
+
+    callback->func = cb;
+    callback->arg  = arg;
+
+    TAILQ_INSERT_TAIL(&pool->callbacks, callback, next);
+    pool->nrequest++;
+
+    if (pool->nthreads_wait == 0 && pool->nthreads_now < pool->nthreads_limit) {
+        if (!(thread = evthr_wexit_new(pool->init_cb, pool->exit_cb, pool->shared))) {
+            goto release_out;
+        }
+
+        thread->pool = pool;
+
+        if (evthr_dynamic_start(thread) < 0) {
+            goto release_out;
+        }
+        pool->nthreads_now++;
+    } else {
+        pthread_cond_signal(&pool->wait_cv);
+    }
+
+    clear_dead_threads(pool);
+
+    pthread_mutex_unlock(&pool->lock);
+    return EVTHR_RES_OK;
+
+release_out:
+    /* thread is still NULL when evthr_wexit_new() itself failed */
+    if (thread != NULL) {
+        evthr_free(thread);
+    }
+    TAILQ_REMOVE(&pool->callbacks, callback, next);
+    pool->nrequest--;
+    free(callback);
+    pthread_mutex_unlock(&pool->lock);
+    return EVTHR_RES_FATAL;
+}
|
|
+
|
|
--
|
|
1.8.3.1
|
|
|