blob: aa44e315bb76786abfe6425997fcb425ccc16e55 [file] [log] [blame]
/*
* Copyright (c) 2003-2018, Great Software Laboratory Pvt. Ltd.
* Copyright (c) 2017 Intel Corporation
* Copyright (c) 2019, Infosys Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>
#include <unistd.h>
#include "thread_pool.h"
#include "tpool_queue.h"
struct Job *create_job(JobFunction function, void *arg)
{
struct Job *job;
job = (struct Job *)malloc(sizeof(struct Job));
if(job == NULL) {
#ifdef DEBUG
log_msg(LOG_ERROR, "failed to allocate memory\n");
#endif
return NULL;
}
job->function = function;
job->arg = arg;
return job;
}
static void *worker_thread(void *userdata)
{
void *arg;
struct Job *job;
JobFunction function;
struct thread_pool *pool;
pool = (struct thread_pool *)userdata;
while(1) {
pthread_mutex_lock(&pool->queue_mutex);
/* waiting until dispatch thread signals for new job */
pthread_cond_wait(&pool->job_received, &pool->queue_mutex);
job = queue_pop_head(pool->job_queue);
pthread_mutex_unlock(&pool->queue_mutex);
if(job != NULL) {
function = job->function;
arg = job->arg;
free(job);
/* atomically updating idle_threads */
__sync_fetch_and_sub(&pool->idle_threads, 1);
function(arg);
__sync_fetch_and_add(&pool->idle_threads, 1);
}
}
return NULL;
}
/* If queue has pending jobs and
* thread is idle then signal the thread
* to process the job
*/
static void *dispatch_if_idle(void *userdata)
{
struct thread_pool *pool;
pool = (struct thread_pool *)userdata;
while(1) {
if((pool->idle_threads > 0) &&
pool->job_queue->length > 0 ) {
pthread_cond_signal(&pool->job_received);
} else usleep(10);
}
return NULL;
}
/* creates a thread and pushes into queue */
static int spawn_thread(struct thread_pool *pool)
{
int status;
pthread_t thread;
status = pthread_create(&thread, NULL, worker_thread, pool);
if(status < 0)
return status;
queue_push_tail(pool->thread_queue, &thread);
return 0;
}
/* pushes job into queue */
int insert_job(struct thread_pool *pool, JobFunction function, void *userdata)
{
struct Job *job;
if(pool == NULL)
return -1;
job = create_job(function, userdata);
if(job == NULL)
return -ENOMEM;
pthread_mutex_lock(&pool->queue_mutex);
queue_push_tail(pool->job_queue, job);
pthread_mutex_unlock(&pool->queue_mutex);
return 0;
}
struct thread_pool *thread_pool_new(int count)
{
int i, status;
pthread_t thread;
struct thread_pool *pool;
pool = (struct thread_pool *)malloc(sizeof(struct thread_pool));
if(pool == NULL) {
#ifdef DEBUG
log_msg(LOG_ERROR, "failed to allocate memory\n");
#endif
errno = ENOMEM;
return NULL;
}
pool->idle_threads = count;
pool->job_queue = queue_new();
pool->thread_queue = queue_new();
pthread_mutex_init(&pool->queue_mutex, NULL);
pthread_cond_init(&pool->job_received, NULL);
status = pthread_create(&thread, NULL, dispatch_if_idle, pool);
if(status < 0) {
#ifdef DEBUG
log_msg(LOG_ERROR, "failed to spawn dispatch thread, stopping\n");
#endif
return NULL;
}
pool->dispatch_thread = thread;
i = 0;
while(i < count)
if (spawn_thread(pool) == 0)
i++;
return pool;
}
int thread_pool_destroy(struct thread_pool *pool)
{
pthread_t *thread;
if(pool == NULL)
return -1;
pthread_cancel(pool->dispatch_thread);
while((thread = queue_pop_head(pool->job_queue)) != NULL) {
pthread_cancel(*thread);
}
queue_destroy(pool->job_queue, free);
queue_destroy(pool->thread_queue, NULL);
pthread_mutex_destroy(&pool->queue_mutex);
pthread_cond_destroy(&pool->job_received);
free(pool);
return 0;
}