2005-04-25 Paul Jakma <paul.jakma@sun.com>
* workqueue.{c,h}: Helper API for setting up and running queues via
background threads.
* command.c: install the 'show workqueues' command
* memtypes.c: Add work queue mtypes, and a rib-queue type for
a zebra rib work queue.
* memtypes.h: Updated to match memtypes.c
* Makefile.am: Add new workqueue files to build.
diff --git a/lib/Makefile.am b/lib/Makefile.am
index 03ca5e1..3763c8b 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -12,7 +12,7 @@
sockunion.c prefix.c thread.c if.c memory.c buffer.c table.c hash.c \
filter.c routemap.c distribute.c stream.c str.c log.c plist.c \
zclient.c sockopt.c smux.c md5.c if_rmap.c keychain.c privs.c \
- sigevent.c pqueue.c jhash.c memtypes.c
+ sigevent.c pqueue.c jhash.c memtypes.c workqueue.c
BUILT_SOURCES = memtypes.h
@@ -25,7 +25,8 @@
memory.h network.h prefix.h routemap.h distribute.h sockunion.h \
str.h stream.h table.h thread.h vector.h version.h vty.h zebra.h \
plist.h zclient.h sockopt.h smux.h md5-gnu.h if_rmap.h keychain.h \
- privs.h sigevent.h pqueue.h jhash.h zassert.h memtypes.h
+ privs.h sigevent.h pqueue.h jhash.h zassert.h memtypes.h \
+ privs.h sigevent.h pqueue.h jhash.h zassert.h memtypes.h workqueue.h
EXTRA_DIST = regex.c regex-gnu.h memtypes.awk
diff --git a/lib/command.c b/lib/command.c
index 6411810..9b5f75f 100644
--- a/lib/command.c
+++ b/lib/command.c
@@ -1,5 +1,5 @@
/*
- $Id: command.c,v 1.46 2005/03/14 20:19:01 paul Exp $
+ $Id: command.c,v 1.47 2005/04/25 16:26:42 paul Exp $
Command interpreter routine for virtual terminal [aka TeletYpe]
Copyright (C) 1997, 98, 99 Kunihiro Ishiguro
@@ -31,6 +31,7 @@
#include "vector.h"
#include "vty.h"
#include "command.h"
+#include "workqueue.h"
/* Command vector which includes some level of command lists. Normally
each daemon maintains each own cmdvec. */
@@ -3578,8 +3579,10 @@
install_element (CONFIG_NODE, &service_terminal_length_cmd);
install_element (CONFIG_NODE, &no_service_terminal_length_cmd);
- install_element(VIEW_NODE, &show_thread_cpu_cmd);
- install_element(ENABLE_NODE, &show_thread_cpu_cmd);
+ install_element (VIEW_NODE, &show_thread_cpu_cmd);
+ install_element (ENABLE_NODE, &show_thread_cpu_cmd);
+ install_element (VIEW_NODE, &show_work_queues_cmd);
+ install_element (ENABLE_NODE, &show_work_queues_cmd);
}
srand(time(NULL));
}
diff --git a/lib/memtypes.c b/lib/memtypes.c
index 7caa42a..7865f54 100644
--- a/lib/memtypes.c
+++ b/lib/memtypes.c
@@ -6,7 +6,7 @@
* The script is sensitive to the format (though not whitespace), see
* the top of memtypes.awk for more details.
*
- * $Id: memtypes.c,v 1.3 2005/04/25 14:02:44 paul Exp $
+ * $Id: memtypes.c,v 1.4 2005/04/25 16:26:43 paul Exp $
*/
#include "zebra.h"
@@ -64,6 +64,9 @@
{ MTYPE_PRIVS, "Privilege information" },
{ MTYPE_ZLOG, "Logging" },
{ MTYPE_ZCLIENT, "Zclient" },
+ { MTYPE_WORK_QUEUE, "Work queue" },
+ { MTYPE_WORK_QUEUE_ITEM, "Work queue item" },
+ { MTYPE_WORK_QUEUE_NAME, "Work queue name string" },
{ -1, NULL },
};
@@ -74,6 +77,7 @@
{ MTYPE_VRF_NAME, "VRF name" },
{ MTYPE_NEXTHOP, "Nexthop" },
{ MTYPE_RIB, "RIB" },
+ { MTYPE_RIB_QUEUE, "RIB process work queue" },
{ MTYPE_STATIC_IPV4, "Static IPv4 route" },
{ MTYPE_STATIC_IPV6, "Static IPv6 route" },
{ -1, NULL },
diff --git a/lib/memtypes.h b/lib/memtypes.h
index 2d843c5..b1ca6f6 100644
--- a/lib/memtypes.h
+++ b/lib/memtypes.h
@@ -56,11 +56,16 @@
MTYPE_PRIVS,
MTYPE_ZLOG,
MTYPE_ZCLIENT,
+ MTYPE_WORK_QUEUE,
+ MTYPE_WORK_QUEUE_ITEM,
+ MTYPE_WORK_QUEUE_NAME,
+ MTYPE_WORK_QUEUE_SPEC,
MTYPE_RTADV_PREFIX,
MTYPE_VRF,
MTYPE_VRF_NAME,
MTYPE_NEXTHOP,
MTYPE_RIB,
+ MTYPE_RIB_QUEUE,
MTYPE_STATIC_IPV4,
MTYPE_STATIC_IPV6,
MTYPE_BGP,
diff --git a/lib/workqueue.c b/lib/workqueue.c
new file mode 100644
index 0000000..0c9592d
--- /dev/null
+++ b/lib/workqueue.c
@@ -0,0 +1,329 @@
+/*
+ * Quagga Work Queue Support.
+ *
+ * Copyright (C) 2005 Sun Microsystems, Inc.
+ *
+ * This file is part of GNU Zebra.
+ *
+ * Quagga is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2, or (at your option) any
+ * later version.
+ *
+ * Quagga is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Quagga; see the file COPYING. If not, write to the Free
+ * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
+ * 02111-1307, USA.
+ */
+
+#include <lib/zebra.h>
+#include "thread.h"
+#include "memory.h"
+#include "workqueue.h"
+#include "linklist.h"
+#include "command.h"
+#include "log.h"
+
+/* master list of work_queues */
+static struct list work_queues;
+
+#define WORK_QUEUE_MIN_GRANULARITY 1
+
+static struct work_queue_item *
+work_queue_item_new (struct work_queue *wq)
+{
+ struct work_queue_item *item;
+ assert (wq);
+
+ item = XCALLOC (MTYPE_WORK_QUEUE_ITEM,
+ sizeof (struct work_queue_item));
+
+ return item;
+}
+
+static void
+work_queue_item_free (struct work_queue_item *item)
+{
+ XFREE (MTYPE_WORK_QUEUE_ITEM, item);
+ return;
+}
+
+/* create new work queue */
+struct work_queue *
+work_queue_new (struct thread_master *m, const char *queue_name)
+{
+ struct work_queue *new;
+
+ new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));
+
+ if (new == NULL)
+ return new;
+
+ new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
+ new->master = m;
+
+ if ( (new->items = list_new ()) == NULL)
+ {
+ if (new->items)
+ list_free (new->items);
+
+ XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
+ XFREE (MTYPE_WORK_QUEUE, new);
+
+ return NULL;
+ }
+
+ new->items->del = (void (*)(void *)) work_queue_item_free;
+
+ listnode_add (&work_queues, new);
+
+ new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
+
+ return new;
+}
+
+void
+work_queue_free (struct work_queue *wq)
+{
+ /* list_delete frees items via callback */
+ list_delete (wq->items);
+ listnode_delete (&work_queues, wq);
+
+ XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
+ XFREE (MTYPE_WORK_QUEUE, wq);
+ return;
+}
+
+void
+work_queue_add (struct work_queue *wq, void *data)
+{
+ struct work_queue_item *item;
+
+ assert (wq);
+
+ if (!(item = work_queue_item_new (wq)))
+ {
+ zlog_err ("%s: unable to get new queue item", __func__);
+ return;
+ }
+
+ item->data = data;
+ listnode_add (wq->items, item);
+
+ /* if thread isnt already waiting, add one */
+ if (wq->thread == NULL)
+ wq->thread = thread_add_background (wq->master, work_queue_run,
+ wq, wq->spec.hold);
+
+ /* XXX: what if we didnt get a thread? try again? */
+
+ return;
+}
+
+static void
+work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
+{
+ struct work_queue_item *item = listgetdata (ln);
+
+ assert (item && item->data);
+
+ /* call private data deletion callback if needed */
+ if (wq->spec.del_item_data)
+ wq->spec.del_item_data (item->data);
+
+ list_delete_node (wq->items, ln);
+ work_queue_item_free (item);
+
+ return;
+}
+
+static void
+work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
+{
+ LISTNODE_DETACH (wq->items, ln);
+ LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
+}
+
+DEFUN(show_work_queues,
+ show_work_queues_cmd,
+ "show work-queues",
+ SHOW_STR
+ "Work Queue information\n")
+{
+ struct listnode *node;
+ struct work_queue *wq;
+ struct timeval tvnow;
+
+ gettimeofday (&tvnow, NULL);
+
+ vty_out (vty,
+ "%8s %11s %8s %21s%s",
+ "List","(ms) ","Q. Runs","Cycle Counts ",
+ VTY_NEWLINE);
+ vty_out (vty,
+ "%8s %5s %5s %8s %7s %6s %6s %s%s",
+ "Items",
+ "Delay","Hold",
+ "Total",
+ "Best","Gran.","Avg.",
+ "Name",
+ VTY_NEWLINE);
+
+ for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
+ {
+ vty_out (vty,"%8d %5d %5d %8ld %7d %6d %6u %s%s",
+ listcount (wq->items),
+ wq->spec.delay, wq->spec.hold,
+ wq->runs,
+ wq->cycles.best, wq->cycles.granularity,
+ (unsigned int)(wq->cycles.total / wq->runs),
+ wq->name,
+ VTY_NEWLINE);
+ }
+
+ return CMD_SUCCESS;
+}
+
+/* timer thread to process a work queue
+ * will reschedule itself if required,
+ * otherwise work_queue_item_add
+ */
+int
+work_queue_run (struct thread *thread)
+{
+ struct work_queue *wq;
+ struct work_queue_item *item;
+ wq_item_status ret;
+ unsigned int cycles = 0;
+ struct listnode *node, *nnode;
+ char yielded = 0;
+
+ wq = THREAD_ARG (thread);
+ wq->thread = NULL;
+
+ assert (wq && wq->items);
+
+ /* calculate cycle granularity:
+ * list iteration == 1 cycle
+ * granularity == # cycles between checks whether we should yield.
+ *
+ * granularity should be > 0, and can increase slowly after each run to
+ * provide some hysteris, but not past cycles.best or 2*cycles.
+ *
+ * Best: starts low, can only increase
+ *
+ * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased if we run to end of time
+ * slot, can increase otherwise by a small factor.
+ *
+ * We could use just the average and save some work, however we want to be
+ * able to adjust quickly to CPU pressure. Average wont shift much if
+ * daemon has been running a long time.
+ */
+ if (wq->cycles.granularity == 0)
+ wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
+
+ for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
+ {
+ assert (item && item->data);
+
+ /* dont run items which are past their allowed retries */
+ if (item->retry_count >= wq->spec.max_retries)
+ {
+ /* run error handler, if any */
+ if (wq->spec.errorfunc)
+ wq->spec.errorfunc (wq, item->data);
+ work_queue_item_remove (wq, node);
+ continue;
+ }
+
+ /* run and take care of items that want to be retried immediately */
+ do
+ {
+ ret = wq->spec.workfunc (item->data);
+ item->retry_count++;
+ }
+ while ((ret == WQ_RETRY_NOW)
+ && (item->retry_count < wq->spec.max_retries));
+
+ switch (ret)
+ {
+ case WQ_RETRY_LATER:
+ {
+ item->retry_count++;
+ goto stats;
+ }
+ case WQ_REQUEUE:
+ {
+ item->retry_count++;
+ work_queue_item_requeue (wq, node);
+ break;
+ }
+ case WQ_RETRY_NOW:
+ case WQ_ERROR:
+ {
+ if (wq->spec.errorfunc)
+ wq->spec.errorfunc (wq, item);
+ }
+ /* fall through here is deliberate */
+ case WQ_SUCCESS:
+ default:
+ {
+ work_queue_item_remove (wq, node);
+ break;
+ }
+ }
+
+ /* completed cycle */
+ cycles++;
+
+ /* test if we should yield */
+ if ( !(cycles % wq->cycles.granularity)
+ && thread_should_yield (thread))
+ {
+ yielded = 1;
+ goto stats;
+ }
+ }
+
+stats:
+
+#define WQ_HYSTERIS_FACTOR 2
+
+ /* we yielded, check whether granularity should be reduced */
+ if (yielded && (cycles < wq->cycles.granularity))
+ {
+ wq->cycles.granularity = ((cycles > 0) ? cycles
+ : WORK_QUEUE_MIN_GRANULARITY);
+ }
+
+ if (cycles > (wq->cycles.granularity))
+ {
+ if (cycles > wq->cycles.best)
+ wq->cycles.best = cycles;
+
+ /* along with yielded check, provides hysteris for granularity */
+ if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR))
+ wq->cycles.granularity += WQ_HYSTERIS_FACTOR;
+ }
+#undef WQ_HYSTERIS_FACTOR
+
+ wq->runs++;
+ wq->cycles.total += cycles;
+
+#if 0
+ printf ("%s: cycles %d, new: best %d, worst %d\n",
+ __func__, cycles, wq->cycles.best, wq->cycles.granularity);
+#endif
+
+ /* Is the queue done yet? */
+ if (listcount (wq->items) > 0)
+ wq->thread = thread_add_background (wq->master, work_queue_run, wq,
+ wq->spec.delay);
+
+ return 0;
+}
diff --git a/lib/workqueue.h b/lib/workqueue.h
new file mode 100644
index 0000000..45fffe7
--- /dev/null
+++ b/lib/workqueue.h
@@ -0,0 +1,91 @@
+/*
+ * Quagga Work Queues.
+ *
+ * Copyright (C) 2005 Sun Microsystems, Inc.
+ *
+ * This file is part of Quagga.
+ *
+ * Quagga is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2, or (at your option) any
+ * later version.
+ *
+ * Quagga is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Quagga; see the file COPYING. If not, write to the Free
+ * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
+ * 02111-1307, USA.
+ */
+
+#ifndef _QUAGGA_WORK_QUEUE_H
+#define _QUAGGA_WORK_QUEUE_H
+
+/* Work queue default hold and cycle times - millisec */
+#define WORK_QUEUE_DEFAULT_HOLD 50 /* hold time for initial run of a queue */
+#define WORK_QUEUE_DEFAULT_DELAY 10 /* minimum delay between queue runs */
+
+/* action value, for use by item processor and item error handlers */
+typedef enum
+{
+ WQ_SUCCESS = 0,
+ WQ_ERROR, /* Error, run error handler if provided */
+ WQ_RETRY_NOW, /* retry immediately */
+ WQ_RETRY_LATER, /* retry later, cease processing work queue */
+ WQ_REQUEUE /* requeue item, continue processing work queue */
+} wq_item_status;
+
+/* A single work queue item, unsurprisingly */
+struct work_queue_item
+{
+ void *data; /* opaque data */
+ unsigned short retry_count; /* number of times retried */
+};
+
+struct work_queue
+{
+ struct thread_master *master; /* thread master */
+ struct thread *thread; /* thread, if one is active */
+ char *name; /* work queue name */
+
+ /* specification for this work queue */
+ struct {
+ /* work function to process items with */
+ wq_item_status (*workfunc) (void *);
+
+ /* error handling function, optional */
+ void (*errorfunc) (struct work_queue *, struct work_queue_item *);
+
+ /* callback to delete user specific item data */
+ void (*del_item_data) (void *);
+
+ /* max number of retries to make for item that errors */
+ unsigned int max_retries;
+
+ unsigned int hold; /* hold time for first run, in ms */
+ unsigned int delay; /* min delay between queue runs, in ms */
+ } spec;
+
+ /* remaining fields should be opaque to users */
+ struct list *items; /* queue item list */
+ unsigned long runs; /* runs count */
+
+ struct {
+ unsigned int best;
+ unsigned int granularity;
+ unsigned long total;
+ } cycles; /* cycle counts */
+};
+
+/* User API */
+struct work_queue *work_queue_new (struct thread_master *, const char *);
+void work_queue_free (struct work_queue *);
+void work_queue_add (struct work_queue *, void *);
+
+/* Helpers, exported for thread.c and command.c */
+int work_queue_run (struct thread *);
+extern struct cmd_element show_work_queues_cmd;
+#endif /* _QUAGGA_WORK_QUEUE_H */