/*
 * Copyright (c) 2003-2018, Great Software Laboratory Pvt. Ltd.
 * Copyright (c) 2017 Intel Corporation
 * Copyright (c) 2019, Infosys Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18
19#include <stdio.h>
20#include <string.h>
21#include <stdlib.h>
22#include <errno.h>
23#include <pthread.h>
24#include <unistd.h>
25#include "thread_pool.h"
26#include "tpool_queue.h"
27
28struct Job *create_job(JobFunction function, void *arg)
29{
30 struct Job *job;
31 job = (struct Job *)malloc(sizeof(struct Job));
32 if(job == NULL) {
33#ifdef DEBUG
34 log_msg(LOG_ERROR, "failed to allocate memory\n");
35#endif
36 return NULL;
37 }
38 job->function = function;
39 job->arg = arg;
40 return job;
41}
42
43static void *worker_thread(void *userdata)
44{
45 void *arg;
46 struct Job *job;
47 JobFunction function;
48 struct thread_pool *pool;
49
50 pool = (struct thread_pool *)userdata;
51
52 while(1) {
53 pthread_mutex_lock(&pool->queue_mutex);
54
55 /* waiting until dispatch thread signals for new job */
56 pthread_cond_wait(&pool->job_received, &pool->queue_mutex);
57 job = queue_pop_head(pool->job_queue);
58 pthread_mutex_unlock(&pool->queue_mutex);
59
60 if(job != NULL) {
61 function = job->function;
62 arg = job->arg;
63 free(job);
64
65 /* atomically updating idle_threads */
66 __sync_fetch_and_sub(&pool->idle_threads, 1);
67
68 function(arg);
69
70 __sync_fetch_and_add(&pool->idle_threads, 1);
71 }
72 }
73 return NULL;
74}
75
76/* If queue has pending jobs and
77 * thread is idle then signal the thread
78 * to process the job
79 */
80static void *dispatch_if_idle(void *userdata)
81{
82 struct thread_pool *pool;
83
84 pool = (struct thread_pool *)userdata;
85
86 while(1) {
87 if((pool->idle_threads > 0) &&
88 pool->job_queue->length > 0 ) {
89 pthread_cond_signal(&pool->job_received);
90 } else usleep(10);
91 }
92 return NULL;
93}
94
95/* creates a thread and pushes into queue */
96static int spawn_thread(struct thread_pool *pool)
97{
98 int status;
99 pthread_t thread;
100
101 status = pthread_create(&thread, NULL, worker_thread, pool);
102 if(status < 0)
103 return status;
104
105 queue_push_tail(pool->thread_queue, &thread);
106
107 return 0;
108}
109
110/* pushes job into queue */
111int insert_job(struct thread_pool *pool, JobFunction function, void *userdata)
112{
113 struct Job *job;
114
115 if(pool == NULL)
116 return -1;
117
118 job = create_job(function, userdata);
119 if(job == NULL)
120 return -ENOMEM;
121
122 pthread_mutex_lock(&pool->queue_mutex);
123 queue_push_tail(pool->job_queue, job);
124 pthread_mutex_unlock(&pool->queue_mutex);
125
126 return 0;
127}
128
129struct thread_pool *thread_pool_new(int count)
130{
131 int i, status;
132 pthread_t thread;
133 struct thread_pool *pool;
134
135 pool = (struct thread_pool *)malloc(sizeof(struct thread_pool));
136 if(pool == NULL) {
137#ifdef DEBUG
138 log_msg(LOG_ERROR, "failed to allocate memory\n");
139#endif
140 errno = ENOMEM;
141 return NULL;
142 }
143 pool->idle_threads = count;
144 pool->job_queue = queue_new();
145 pool->thread_queue = queue_new();
146
147 pthread_mutex_init(&pool->queue_mutex, NULL);
148 pthread_cond_init(&pool->job_received, NULL);
149
150 status = pthread_create(&thread, NULL, dispatch_if_idle, pool);
151 if(status < 0) {
152#ifdef DEBUG
153 log_msg(LOG_ERROR, "failed to spawn dispatch thread, stopping\n");
154#endif
155 return NULL;
156 }
157 pool->dispatch_thread = thread;
158
159 i = 0;
160 while(i < count)
161 if (spawn_thread(pool) == 0)
162 i++;
163
164 return pool;
165}
166
167int thread_pool_destroy(struct thread_pool *pool)
168{
169 pthread_t *thread;
170
171 if(pool == NULL)
172 return -1;
173
174 pthread_cancel(pool->dispatch_thread);
175 while((thread = queue_pop_head(pool->job_queue)) != NULL) {
176 pthread_cancel(*thread);
177 }
178
179 queue_destroy(pool->job_queue, free);
180 queue_destroy(pool->thread_queue, NULL);
181 pthread_mutex_destroy(&pool->queue_mutex);
182 pthread_cond_destroy(&pool->job_received);
183
184 free(pool);
185 return 0;
186}
187
188