From a3b74902cadf6dd671a093c6747a5bd624d09cc4 Mon Sep 17 00:00:00 2001 From: Wenkai Lin Date: Tue, 4 Jan 2022 12:20:29 +0800 Subject: [PATCH 32/53] uadk: env: fix wd_add_task_to_async_queue clear global information if an add task error occurred. Signed-off-by: Wenkai Lin --- wd_util.c | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/wd_util.c b/wd_util.c index fd132aa..676f475 100644 --- a/wd_util.c +++ b/wd_util.c @@ -45,7 +45,9 @@ struct async_task { struct async_task_queue { struct async_task *head; int depth; + /* the producer offset of task queue */ int prod; + /* the consumer offset of task queue */ int cons; int cur_task; int left_task; @@ -1113,8 +1115,8 @@ static struct async_task_queue *find_async_queue(struct wd_env_config *config, int wd_add_task_to_async_queue(struct wd_env_config *config, __u32 idx) { struct async_task_queue *task_queue; - struct async_task *head, *task; - int prod, ret; + struct async_task *task; + int curr_prod, ret; if (!config->enable_internal_poll) return 0; @@ -1131,14 +1133,13 @@ int wd_add_task_to_async_queue(struct wd_env_config *config, __u32 idx) pthread_mutex_lock(&task_queue->lock); - prod = task_queue->prod; - head = task_queue->head; - task = head + prod; - /* fix me */ + /* get an available async task and fill ctx idx */ + curr_prod = task_queue->prod; + task = task_queue->head + curr_prod; task->idx = idx; - prod = (prod + 1) % task_queue->depth; - task_queue->prod = prod; + /* update global information of task queue */ + task_queue->prod = (curr_prod + 1) % task_queue->depth; task_queue->cur_task++; task_queue->left_task--; @@ -1147,10 +1148,19 @@ int wd_add_task_to_async_queue(struct wd_env_config *config, __u32 idx) ret = sem_post(&task_queue->full_sem); if (ret) { WD_ERR("failed to post full_sem!\n"); - return ret; + goto err_out; } return 0; + +err_out: + pthread_mutex_lock(&task_queue->lock); + task_queue->left_task++; + task_queue->cur_task--; + task_queue->prod = curr_prod; + pthread_mutex_unlock(&task_queue->lock); + + return ret; } static void *async_poll_process_func(void *args) @@ -1216,18 +1226,18 @@ static int wd_init_one_task_queue(struct async_task_queue *task_queue, void *alg_poll_ctx) { - struct async_task *task_head; + struct async_task *head; pthread_t thread_id; pthread_attr_t attr; int depth, ret; task_queue->depth = depth = WD_ASYNC_DEF_QUEUE_DEPTH; - task_head = calloc(task_queue->depth, sizeof(struct async_task)); - if (!task_head) + head = calloc(task_queue->depth, sizeof(*head)); + if (!head) return -WD_ENOMEM; - task_queue->head = task_head; + task_queue->head = head; task_queue->left_task = depth; task_queue->alg_poll_ctx = alg_poll_ctx; @@ -1268,7 +1278,7 @@ err_uninit_full_sem: err_uninit_empty_sem: sem_destroy(&task_queue->empty_sem); err_free_head: - free(task_head); + free(head); ret = -errno; return ret; } -- 2.25.1