lib: Replace lists with arrays to store read and write threads
With arrays, a thread corresponding to given fd is looked up in constant time
versus the linear time taken for list traversals.
Signed-off-by: Denil Vira <denil@cumulusnetworks.com>
Signed-off-by: Donald Sharp <sharpd@cumulusnetworks.com>
diff --git a/lib/thread.c b/lib/thread.c
index 5e40261..6c2b0b0 100644
--- a/lib/thread.c
+++ b/lib/thread.c
@@ -22,6 +22,7 @@
/* #define DEBUG */
#include <zebra.h>
+#include <sys/resource.h>
#include "thread.h"
#include "memory.h"
@@ -525,6 +526,9 @@
thread_master_create ()
{
struct thread_master *rv;
+ struct rlimit limit;
+
+ getrlimit(RLIMIT_NOFILE, &limit);
if (cpu_record == NULL)
cpu_record
@@ -532,6 +536,26 @@
(int (*) (const void *, const void *))cpu_record_hash_cmp);
rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
+ if (rv == NULL)
+ {
+ return NULL;
+ }
+
+ rv->fd_limit = (int)limit.rlim_cur;
+ rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
+ if (rv->read == NULL)
+ {
+ XFREE (MTYPE_THREAD_MASTER, rv);
+ return NULL;
+ }
+
+ rv->write = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
+ if (rv->write == NULL)
+ {
+ XFREE (MTYPE_THREAD, rv->read);
+ XFREE (MTYPE_THREAD_MASTER, rv);
+ return NULL;
+ }
/* Initialize the timer queues */
rv->timer = pqueue_create();
@@ -573,6 +597,18 @@
return thread;
}
+static void
+thread_delete_fd (struct thread **thread_array, struct thread *thread)
+{
+ thread_array[thread->u.fd] = NULL;
+}
+
+static void
+thread_add_fd (struct thread **thread_array, struct thread *thread)
+{
+ thread_array[thread->u.fd] = thread;
+}
+
/* Move thread to unuse list. */
static void
thread_add_unuse (struct thread_master *m, struct thread *thread)
@@ -601,6 +637,25 @@
}
static void
+thread_array_free (struct thread_master *m, struct thread **thread_array)
+{
+ struct thread *t;
+ int index;
+
+ for (index = 0; index < m->fd_limit; ++index)
+ {
+ t = thread_array[index];
+ if (t)
+ {
+ thread_array[index] = NULL;
+ XFREE (MTYPE_THREAD, t);
+ m->alloc--;
+ }
+ }
+ XFREE (MTYPE_THREAD, thread_array);
+}
+
+static void
thread_queue_free (struct thread_master *m, struct pqueue *queue)
{
int i;
@@ -616,8 +671,8 @@
void
thread_master_free (struct thread_master *m)
{
- thread_list_free (m, &m->read);
- thread_list_free (m, &m->write);
+ thread_array_free (m, m->read);
+ thread_array_free (m, m->write);
thread_queue_free (m, m->timer);
thread_list_free (m, &m->event);
thread_list_free (m, &m->ready);
@@ -718,7 +773,7 @@
thread = thread_get (m, THREAD_READ, func, arg, debugargpass);
FD_SET (fd, &m->readfd);
thread->u.fd = fd;
- thread_list_add (&m->read, thread);
+ thread_add_fd (m->read, thread);
return thread;
}
@@ -742,7 +797,7 @@
thread = thread_get (m, THREAD_WRITE, func, arg, debugargpass);
FD_SET (fd, &m->writefd);
thread->u.fd = fd;
- thread_list_add (&m->write, thread);
+ thread_add_fd (m->write, thread);
return thread;
}
@@ -863,18 +918,19 @@
{
struct thread_list *list = NULL;
struct pqueue *queue = NULL;
+ struct thread **thread_array = NULL;
switch (thread->type)
{
case THREAD_READ:
assert (FD_ISSET (thread->u.fd, &thread->master->readfd));
FD_CLR (thread->u.fd, &thread->master->readfd);
- list = &thread->master->read;
+ thread_array = thread->master->read;
break;
case THREAD_WRITE:
assert (FD_ISSET (thread->u.fd, &thread->master->writefd));
FD_CLR (thread->u.fd, &thread->master->writefd);
- list = &thread->master->write;
+ thread_array = thread->master->write;
break;
case THREAD_TIMER:
queue = thread->master->timer;
@@ -903,9 +959,13 @@
{
thread_list_delete (list, thread);
}
+ else if (thread_array)
+ {
+ thread_delete_fd (thread_array, thread);
+ }
else
{
- assert(!"Thread should be either in queue or list!");
+ assert(!"Thread should be either in queue or list or array!");
}
thread->type = THREAD_UNUSED;
@@ -979,29 +1039,27 @@
}
static int
-thread_process_fd (struct thread_list *list, fd_set *fdset, fd_set *mfdset)
+thread_process_fd (struct thread **thread_array, fd_set *fdset, fd_set *mfdset, int num, int fd_limit)
{
struct thread *thread;
- struct thread *next;
- int ready = 0;
-
- assert (list);
-
- for (thread = list->head; thread; thread = next)
- {
- next = thread->next;
+ int ready = 0, index;
- if (FD_ISSET (THREAD_FD (thread), fdset))
+ assert (thread_array);
+
+ for (index = 0; index < fd_limit && ready < num; ++index)
+ {
+ thread = thread_array[index];
+ if (thread && FD_ISSET (THREAD_FD (thread), fdset))
{
assert (FD_ISSET (THREAD_FD (thread), mfdset));
FD_CLR(THREAD_FD (thread), mfdset);
- thread_list_delete (list, thread);
+ thread_delete_fd (thread_array, thread);
thread_list_add (&thread->master->ready, thread);
thread->type = THREAD_READY;
ready++;
}
}
- return ready;
+ return num - ready;
}
/* Add all timers that have popped to the ready list. */
@@ -1154,10 +1212,10 @@
/* Got IO, process it */
if (num > 0)
{
- /* Normal priority read thead. */
- thread_process_fd (&m->read, &readfd, &m->readfd);
- /* Write thead. */
- thread_process_fd (&m->write, &writefd, &m->writefd);
+ /* Normal priority read thread. */
+ num = thread_process_fd (m->read, &readfd, &m->readfd, num, m->fd_limit);
+ /* Write thread. */
+ num = thread_process_fd (m->write, &writefd, &m->writefd, num, m->fd_limit);
}
#if 0