lib: use heap to manage timers
Signed-off-by: Christian Franke <chris@opensourcerouting.org>
Signed-off-by: David Lamparter <equinox@opensourcerouting.org>
diff --git a/lib/pqueue.c b/lib/pqueue.c
index 12a779f..69ab8e6 100644
--- a/lib/pqueue.c
+++ b/lib/pqueue.c
@@ -168,3 +168,20 @@
trickle_down (0, queue);
return data;
}
+
+void
+pqueue_remove_at (int index, struct pqueue *queue)
+{
+ queue->array[index] = queue->array[--queue->size];
+
+ if (index > 0
+ && (*queue->cmp) (queue->array[index],
+ queue->array[PARENT_OF(index)]) < 0)
+ {
+ trickle_up (index, queue);
+ }
+ else
+ {
+ trickle_down (index, queue);
+ }
+}
diff --git a/lib/pqueue.h b/lib/pqueue.h
index be37f98..8bb6961 100644
--- a/lib/pqueue.h
+++ b/lib/pqueue.h
@@ -38,6 +38,7 @@
extern void pqueue_enqueue (void *data, struct pqueue *queue);
extern void *pqueue_dequeue (struct pqueue *queue);
+extern void pqueue_remove_at (int index, struct pqueue *queue);
extern void trickle_down (int index, struct pqueue *queue);
extern void trickle_up (int index, struct pqueue *queue);
diff --git a/lib/thread.c b/lib/thread.c
index ddb95c0..e2a37b1 100644
--- a/lib/thread.c
+++ b/lib/thread.c
@@ -27,6 +27,7 @@
#include "memory.h"
#include "log.h"
#include "hash.h"
+#include "pqueue.h"
#include "command.h"
#include "sigevent.h"
@@ -496,17 +497,49 @@
return CMD_SUCCESS;
}
+static int
+thread_timer_cmp(void *a, void *b)
+{
+ struct thread *thread_a = a;
+ struct thread *thread_b = b;
+
+ long cmp = timeval_cmp(thread_a->u.sands, thread_b->u.sands);
+
+ if (cmp < 0)
+ return -1;
+ if (cmp > 0)
+ return 1;
+ return 0;
+}
+
+static void
+thread_timer_update(void *node, int actual_position)
+{
+ struct thread *thread = node;
+
+ thread->index = actual_position;
+}
+
/* Allocate new thread master. */
struct thread_master *
thread_master_create ()
{
+ struct thread_master *rv;
+
if (cpu_record == NULL)
cpu_record
= hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
(int (*) (const void *, const void *))cpu_record_hash_cmp);
-
- return (struct thread_master *) XCALLOC (MTYPE_THREAD_MASTER,
- sizeof (struct thread_master));
+
+ rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
+
+ /* Initialize the timer queues */
+ rv->timer = pqueue_create();
+ rv->background = pqueue_create();
+ rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
+ rv->timer->update = rv->background->update = thread_timer_update;
+
+ return rv;
}
/* Add a new thread to the list. */
@@ -523,22 +556,6 @@
list->count++;
}
-/* Add a new thread just before the point. */
-static void
-thread_list_add_before (struct thread_list *list,
- struct thread *point,
- struct thread *thread)
-{
- thread->next = point;
- thread->prev = point->prev;
- if (point->prev)
- point->prev->next = thread;
- else
- list->head = thread;
- point->prev = thread;
- list->count++;
-}
-
/* Delete a thread from the list. */
static struct thread *
thread_list_delete (struct thread_list *list, struct thread *thread)
@@ -584,17 +601,29 @@
}
}
+static void
+thread_queue_free (struct thread_master *m, struct pqueue *queue)
+{
+ int i;
+
+ for (i = 0; i < queue->size; i++)
+ XFREE(MTYPE_THREAD, queue->array[i]);
+
+ m->alloc -= queue->size;
+ pqueue_delete(queue);
+}
+
/* Stop thread scheduler. */
void
thread_master_free (struct thread_master *m)
{
thread_list_free (m, &m->read);
thread_list_free (m, &m->write);
- thread_list_free (m, &m->timer);
+ thread_queue_free (m, m->timer);
thread_list_free (m, &m->event);
thread_list_free (m, &m->ready);
thread_list_free (m, &m->unuse);
- thread_list_free (m, &m->background);
+ thread_queue_free (m, m->background);
XFREE (MTYPE_THREAD_MASTER, m);
@@ -676,7 +705,8 @@
thread->master = m;
thread->func = func;
thread->arg = arg;
-
+ thread->index = -1;
+
strip_funcname (thread->funcname, funcname);
return thread;
@@ -737,16 +767,15 @@
const char* funcname)
{
struct thread *thread;
- struct thread_list *list;
+ struct pqueue *queue;
struct timeval alarm_time;
- struct thread *tt;
assert (m != NULL);
assert (type == THREAD_TIMER || type == THREAD_BACKGROUND);
assert (time_relative);
- list = ((type == THREAD_TIMER) ? &m->timer : &m->background);
+ queue = ((type == THREAD_TIMER) ? m->timer : m->background);
thread = thread_get (m, type, func, arg, funcname);
/* Do we need jitter here? */
@@ -755,16 +784,7 @@
alarm_time.tv_usec = relative_time.tv_usec + time_relative->tv_usec;
thread->u.sands = timeval_adjust(alarm_time);
- /* Sort by timeval. */
- for (tt = list->head; tt; tt = tt->next)
- if (timeval_cmp (thread->u.sands, tt->u.sands) <= 0)
- break;
-
- if (tt)
- thread_list_add_before (list, tt, thread);
- else
- thread_list_add (list, thread);
-
+ pqueue_enqueue(thread, queue);
return thread;
}
@@ -849,7 +869,8 @@
void
thread_cancel (struct thread *thread)
{
- struct thread_list *list;
+ struct thread_list *list = NULL;
+ struct pqueue *queue = NULL;
switch (thread->type)
{
@@ -864,7 +885,7 @@
list = &thread->master->write;
break;
case THREAD_TIMER:
- list = &thread->master->timer;
+ queue = thread->master->timer;
break;
case THREAD_EVENT:
list = &thread->master->event;
@@ -873,13 +894,28 @@
list = &thread->master->ready;
break;
case THREAD_BACKGROUND:
- list = &thread->master->background;
+ queue = thread->master->background;
break;
default:
return;
break;
}
- thread_list_delete (list, thread);
+
+ if (queue)
+ {
+ assert(thread->index >= 0);
+ assert(thread == queue->array[thread->index]);
+ pqueue_remove_at(thread->index, queue);
+ }
+ else if (list)
+ {
+ thread_list_delete (list, thread);
+ }
+ else
+ {
+ assert(!"Thread should be either in queue or list!");
+ }
+
thread->type = THREAD_UNUSED;
thread_add_unuse (thread->master, thread);
}
@@ -929,11 +965,12 @@
}
static struct timeval *
-thread_timer_wait (struct thread_list *tlist, struct timeval *timer_val)
+thread_timer_wait (struct pqueue *queue, struct timeval *timer_val)
{
- if (!thread_empty (tlist))
+ if (queue->size)
{
- *timer_val = timeval_subtract (tlist->head->u.sands, relative_time);
+ struct thread *next_timer = queue->array[0];
+ *timer_val = timeval_subtract (next_timer->u.sands, relative_time);
return timer_val;
}
return NULL;
@@ -977,18 +1014,17 @@
/* Add all timers that have popped to the ready list. */
static unsigned int
-thread_timer_process (struct thread_list *list, struct timeval *timenow)
+thread_timer_process (struct pqueue *queue, struct timeval *timenow)
{
struct thread *thread;
- struct thread *next;
unsigned int ready = 0;
- for (thread = list->head; thread; thread = next)
+ while (queue->size)
{
- next = thread->next;
+ thread = queue->array[0];
if (timeval_cmp (*timenow, thread->u.sands) < 0)
return ready;
- thread_list_delete (list, thread);
+ pqueue_dequeue(queue);
thread->type = THREAD_READY;
thread_list_add (&thread->master->ready, thread);
ready++;
@@ -1064,8 +1100,8 @@
if (m->ready.count == 0)
{
quagga_get_relative (NULL);
- timer_wait = thread_timer_wait (&m->timer, &timer_val);
- timer_wait_bg = thread_timer_wait (&m->background, &timer_val_bg);
+ timer_wait = thread_timer_wait (m->timer, &timer_val);
+ timer_wait_bg = thread_timer_wait (m->background, &timer_val_bg);
if (timer_wait_bg &&
(!timer_wait || (timeval_cmp (*timer_wait, *timer_wait_bg) > 0)))
@@ -1121,7 +1157,7 @@
priority than I/O threads, so let's push them onto the ready
list in front of the I/O threads. */
quagga_get_relative (NULL);
- thread_timer_process (&m->timer, &relative_time);
+ thread_timer_process (m->timer, &relative_time);
/* Got IO, process it */
if (num > 0)
@@ -1142,7 +1178,7 @@
#endif
/* Background timer/events, lowest priority */
- thread_timer_process (&m->background, &relative_time);
+ thread_timer_process (m->background, &relative_time);
if ((thread = thread_trim_head (&m->ready)) != NULL)
return thread_run (m, thread, fetch);
diff --git a/lib/thread.h b/lib/thread.h
index 67902cf..dbf5f25 100644
--- a/lib/thread.h
+++ b/lib/thread.h
@@ -44,16 +44,18 @@
int count;
};
+struct pqueue;
+
/* Master of the theads. */
struct thread_master
{
struct thread_list read;
struct thread_list write;
- struct thread_list timer;
+ struct pqueue *timer;
struct thread_list event;
struct thread_list ready;
struct thread_list unuse;
- struct thread_list background;
+ struct pqueue *background;
fd_set readfd;
fd_set writefd;
fd_set exceptfd;
@@ -80,6 +82,7 @@
int fd; /* file descriptor in case of read/write. */
struct timeval sands; /* rest of time sands value. */
} u;
+ int index; /* used for timers to store position in queue */
struct timeval real;
struct cpu_thread_history *hist; /* cache pointer to cpu_history */
char funcname[FUNCNAME_LEN];