2005-04-25 Paul Jakma <paul.jakma@sun.com>

	* workqueue.{c,h}: Helper API for setting up and running queues via
	  background threads.
	* command.c: install the 'show workqueues' command
	* memtypes.c: Add work queue mtypes, and a rib-queue type for
	  a zebra rib work queue.
	* memtypes.h: Updated to match memtypes.c
	* Makefile.am: Add new workqueue files to build.
diff --git a/lib/Makefile.am b/lib/Makefile.am
index 03ca5e1..3763c8b 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -12,7 +12,7 @@
 	sockunion.c prefix.c thread.c if.c memory.c buffer.c table.c hash.c \
 	filter.c routemap.c distribute.c stream.c str.c log.c plist.c \
 	zclient.c sockopt.c smux.c md5.c if_rmap.c keychain.c privs.c \
-	sigevent.c pqueue.c jhash.c memtypes.c
+	sigevent.c pqueue.c jhash.c memtypes.c workqueue.c
 
 BUILT_SOURCES = memtypes.h
 
@@ -25,7 +25,8 @@
 	memory.h network.h prefix.h routemap.h distribute.h sockunion.h \
 	str.h stream.h table.h thread.h vector.h version.h vty.h zebra.h \
 	plist.h zclient.h sockopt.h smux.h md5-gnu.h if_rmap.h keychain.h \
-	privs.h sigevent.h pqueue.h jhash.h zassert.h memtypes.h
+	privs.h sigevent.h pqueue.h jhash.h zassert.h memtypes.h \
+	privs.h sigevent.h pqueue.h jhash.h zassert.h memtypes.h workqueue.h
 
 EXTRA_DIST = regex.c regex-gnu.h memtypes.awk
 
diff --git a/lib/command.c b/lib/command.c
index 6411810..9b5f75f 100644
--- a/lib/command.c
+++ b/lib/command.c
@@ -1,5 +1,5 @@
 /*
-   $Id: command.c,v 1.46 2005/03/14 20:19:01 paul Exp $
+   $Id: command.c,v 1.47 2005/04/25 16:26:42 paul Exp $
  
    Command interpreter routine for virtual terminal [aka TeletYpe]
    Copyright (C) 1997, 98, 99 Kunihiro Ishiguro
@@ -31,6 +31,7 @@
 #include "vector.h"
 #include "vty.h"
 #include "command.h"
+#include "workqueue.h"
 
 /* Command vector which includes some level of command lists. Normally
    each daemon maintains each own cmdvec. */
@@ -3578,8 +3579,10 @@
       install_element (CONFIG_NODE, &service_terminal_length_cmd);
       install_element (CONFIG_NODE, &no_service_terminal_length_cmd);
 
-      install_element(VIEW_NODE, &show_thread_cpu_cmd);
-      install_element(ENABLE_NODE, &show_thread_cpu_cmd);
+      install_element (VIEW_NODE, &show_thread_cpu_cmd);
+      install_element (ENABLE_NODE, &show_thread_cpu_cmd);
+      install_element (VIEW_NODE, &show_work_queues_cmd);
+      install_element (ENABLE_NODE, &show_work_queues_cmd);
     }
   srand(time(NULL));
 }
diff --git a/lib/memtypes.c b/lib/memtypes.c
index 7caa42a..7865f54 100644
--- a/lib/memtypes.c
+++ b/lib/memtypes.c
@@ -6,7 +6,7 @@
  * The script is sensitive to the format (though not whitespace), see
  * the top of memtypes.awk for more details.
  *
- * $Id: memtypes.c,v 1.3 2005/04/25 14:02:44 paul Exp $
+ * $Id: memtypes.c,v 1.4 2005/04/25 16:26:43 paul Exp $
  */
 
 #include "zebra.h"
@@ -64,6 +64,9 @@
   { MTYPE_PRIVS,		"Privilege information"		},
   { MTYPE_ZLOG,			"Logging"			},
   { MTYPE_ZCLIENT,		"Zclient"			},
+  { MTYPE_WORK_QUEUE,		"Work queue"			},
+  { MTYPE_WORK_QUEUE_ITEM,	"Work queue item"		},
+  { MTYPE_WORK_QUEUE_NAME,	"Work queue name string"	},
   { -1, NULL },
 };
 
@@ -74,6 +77,7 @@
   { MTYPE_VRF_NAME,		"VRF name"			},
   { MTYPE_NEXTHOP,		"Nexthop"			},
   { MTYPE_RIB,			"RIB"				},
+  { MTYPE_RIB_QUEUE,		"RIB process work queue"	},
   { MTYPE_STATIC_IPV4,		"Static IPv4 route"		},
   { MTYPE_STATIC_IPV6,		"Static IPv6 route"		},
   { -1, NULL },
diff --git a/lib/memtypes.h b/lib/memtypes.h
index 2d843c5..b1ca6f6 100644
--- a/lib/memtypes.h
+++ b/lib/memtypes.h
@@ -56,11 +56,16 @@
   MTYPE_PRIVS,
   MTYPE_ZLOG,
   MTYPE_ZCLIENT,
+  MTYPE_WORK_QUEUE,
+  MTYPE_WORK_QUEUE_ITEM,
+  MTYPE_WORK_QUEUE_NAME,
+  MTYPE_WORK_QUEUE_SPEC,
   MTYPE_RTADV_PREFIX,
   MTYPE_VRF,
   MTYPE_VRF_NAME,
   MTYPE_NEXTHOP,
   MTYPE_RIB,
+  MTYPE_RIB_QUEUE,
   MTYPE_STATIC_IPV4,
   MTYPE_STATIC_IPV6,
   MTYPE_BGP,
diff --git a/lib/workqueue.c b/lib/workqueue.c
new file mode 100644
index 0000000..0c9592d
--- /dev/null
+++ b/lib/workqueue.c
@@ -0,0 +1,329 @@
+/* 
+ * Quagga Work Queue Support.
+ *
+ * Copyright (C) 2005 Sun Microsystems, Inc.
+ *
+ * This file is part of GNU Zebra.
+ *
+ * Quagga is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2, or (at your option) any
+ * later version.
+ *
+ * Quagga is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Quagga; see the file COPYING.  If not, write to the Free
+ * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
+ * 02111-1307, USA.  
+ */
+
+#include <lib/zebra.h>
+#include "thread.h"
+#include "memory.h"
+#include "workqueue.h"
+#include "linklist.h"
+#include "command.h"
+#include "log.h"
+
+/* master list of work_queues */
+static struct list work_queues;
+
+#define WORK_QUEUE_MIN_GRANULARITY 1
+
+static struct work_queue_item *
+work_queue_item_new (struct work_queue *wq)
+{
+  struct work_queue_item *item;
+  assert (wq);
+
+  item = XCALLOC (MTYPE_WORK_QUEUE_ITEM, 
+                  sizeof (struct work_queue_item));
+  
+  return item;
+}
+
+static void
+work_queue_item_free (struct work_queue_item *item)
+{
+  XFREE (MTYPE_WORK_QUEUE_ITEM, item);
+  return;
+}
+
+/* create new work queue */
+struct work_queue *
+work_queue_new (struct thread_master *m, const char *queue_name)
+{
+  struct work_queue *new;
+  
+  new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));
+
+  if (new == NULL)
+    return new;
+  
+  new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
+  new->master = m;
+  
+  if ( (new->items = list_new ()) == NULL)
+    {
+      if (new->items)
+        list_free (new->items);
+      
+      XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
+      XFREE (MTYPE_WORK_QUEUE, new);
+      
+      return NULL;
+    }
+  
+  new->items->del = (void (*)(void *)) work_queue_item_free;  
+  
+  listnode_add (&work_queues, new);
+  
+  new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
+  
+  return new;
+}
+
+void
+work_queue_free (struct work_queue *wq)
+{
+  /* list_delete frees items via callback */
+  list_delete (wq->items);
+  listnode_delete (&work_queues, wq);
+  
+  XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
+  XFREE (MTYPE_WORK_QUEUE, wq);
+  return;
+}
+
+void
+work_queue_add (struct work_queue *wq, void *data)
+{
+  struct work_queue_item *item;
+  
+  assert (wq);
+
+  if (!(item = work_queue_item_new (wq)))
+    {
+      zlog_err ("%s: unable to get new queue item", __func__);
+      return;
+    }
+  
+  item->data = data;
+  listnode_add (wq->items, item);
+  
+  /* if thread isnt already waiting, add one */
+  if (wq->thread == NULL)
+    wq->thread = thread_add_background (wq->master, work_queue_run, 
+                                        wq, wq->spec.hold);
+
+  /* XXX: what if we didnt get a thread? try again? */
+  
+  return;
+}
+
+static void
+work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
+{
+  struct work_queue_item *item = listgetdata (ln);
+
+  assert (item && item->data);
+
+  /* call private data deletion callback if needed */  
+  if (wq->spec.del_item_data)
+    wq->spec.del_item_data (item->data);
+
+  list_delete_node (wq->items, ln);
+  work_queue_item_free (item);
+  
+  return;
+}
+
+static void
+work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
+{
+  LISTNODE_DETACH (wq->items, ln);
+  LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
+}
+
+DEFUN(show_work_queues,
+      show_work_queues_cmd,
+      "show work-queues",
+      SHOW_STR
+      "Work Queue information\n")
+{
+  struct listnode *node;
+  struct work_queue *wq;
+  struct timeval tvnow;
+  
+  gettimeofday (&tvnow, NULL);
+  
+  vty_out (vty, 
+           "%8s  %11s  %8s %21s%s",
+           "List","(ms)    ","Q. Runs","Cycle Counts   ",
+           VTY_NEWLINE);
+  vty_out (vty,
+           "%8s  %5s %5s  %8s  %7s %6s %6s %s%s",
+           "Items",
+           "Delay","Hold",
+           "Total",
+           "Best","Gran.","Avg.", 
+           "Name", 
+           VTY_NEWLINE);
+ 
+  for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
+    {
+      vty_out (vty,"%8d  %5d %5d  %8ld  %7d %6d %6u  %s%s",
+               listcount (wq->items),
+               wq->spec.delay, wq->spec.hold,
+               wq->runs,
+               wq->cycles.best, wq->cycles.granularity, 
+                 (unsigned int)(wq->cycles.total / wq->runs),
+               wq->name,
+               VTY_NEWLINE);
+    }
+    
+  return CMD_SUCCESS;
+}
+
+/* timer thread to process a work queue
+ * will reschedule itself if required,
+ * otherwise work_queue_item_add 
+ */
+int
+work_queue_run (struct thread *thread)
+{
+  struct work_queue *wq;
+  struct work_queue_item *item;
+  wq_item_status ret;
+  unsigned int cycles = 0;
+  struct listnode *node, *nnode;
+  char yielded = 0;
+
+  wq = THREAD_ARG (thread);
+  wq->thread = NULL;
+
+  assert (wq && wq->items);
+
+  /* calculate cycle granularity:
+   * list iteration == 1 cycle
+   * granularity == # cycles between checks whether we should yield.
+   *
+   * granularity should be > 0, and can increase slowly after each run to
+   * provide some hysteris, but not past cycles.best or 2*cycles.
+   *
+   * Best: starts low, can only increase
+   *
+   * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased if we run to end of time
+   *              slot, can increase otherwise by a small factor.
+   *
+   * We could use just the average and save some work, however we want to be
+   * able to adjust quickly to CPU pressure. Average wont shift much if
+   * daemon has been running a long time.
+   */
+   if (wq->cycles.granularity == 0)
+     wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
+
+  for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
+  {
+    assert (item && item->data);
+    
+    /* dont run items which are past their allowed retries */
+    if (item->retry_count >= wq->spec.max_retries)
+      {
+        /* run error handler, if any */
+	if (wq->spec.errorfunc)
+	  wq->spec.errorfunc (wq, item->data);
+	work_queue_item_remove (wq, node);
+	continue;
+      }
+
+    /* run and take care of items that want to be retried immediately */
+    do
+      {
+        ret = wq->spec.workfunc (item->data);
+        item->retry_count++;
+      }
+    while ((ret == WQ_RETRY_NOW) 
+           && (item->retry_count < wq->spec.max_retries));
+
+    switch (ret)
+      {
+      case WQ_RETRY_LATER:
+	{
+	  item->retry_count++;
+	  goto stats;
+	}
+      case WQ_REQUEUE:
+	{
+	  item->retry_count++;
+	  work_queue_item_requeue (wq, node);
+	  break;
+	}
+      case WQ_RETRY_NOW:
+      case WQ_ERROR:
+	{
+	  if (wq->spec.errorfunc)
+	    wq->spec.errorfunc (wq, item);
+	}
+	/* fall through here is deliberate */
+      case WQ_SUCCESS:
+      default:
+	{
+	  work_queue_item_remove (wq, node);
+	  break;
+	}
+      }
+
+    /* completed cycle */
+    cycles++;
+
+    /* test if we should yield */
+    if ( !(cycles % wq->cycles.granularity) 
+        && thread_should_yield (thread))
+      {
+        yielded = 1;
+        goto stats;
+      }
+  }
+
+stats:
+
+#define WQ_HYSTERIS_FACTOR 2
+
+  /* we yielded, check whether granularity should be reduced */
+  if (yielded && (cycles < wq->cycles.granularity))
+    {
+      wq->cycles.granularity = ((cycles > 0) ? cycles 
+                                             : WORK_QUEUE_MIN_GRANULARITY);
+    }
+  
+  if (cycles > (wq->cycles.granularity))
+    {
+      if (cycles > wq->cycles.best)
+        wq->cycles.best = cycles;
+      
+      /* along with yielded check, provides hysteris for granularity */
+      if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR))
+        wq->cycles.granularity += WQ_HYSTERIS_FACTOR;
+    }
+#undef WQ_HYSTERIS_FACTOR
+  
+  wq->runs++;
+  wq->cycles.total += cycles;
+
+#if 0
+  printf ("%s: cycles %d, new: best %d, worst %d\n",
+            __func__, cycles, wq->cycles.best, wq->cycles.granularity);
+#endif
+  
+  /* Is the queue done yet? */
+  if (listcount (wq->items) > 0)
+    wq->thread = thread_add_background (wq->master, work_queue_run, wq,
+                                        wq->spec.delay);
+
+  return 0;
+}
diff --git a/lib/workqueue.h b/lib/workqueue.h
new file mode 100644
index 0000000..45fffe7
--- /dev/null
+++ b/lib/workqueue.h
@@ -0,0 +1,91 @@
+/* 
+ * Quagga Work Queues.
+ *
+ * Copyright (C) 2005 Sun Microsystems, Inc.
+ *
+ * This file is part of Quagga.
+ *
+ * Quagga is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2, or (at your option) any
+ * later version.
+ *
+ * Quagga is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Quagga; see the file COPYING.  If not, write to the Free
+ * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
+ * 02111-1307, USA.  
+ */
+
+#ifndef _QUAGGA_WORK_QUEUE_H
+#define _QUAGGA_WORK_QUEUE_H
+
+/* Work queue default hold and cycle times - millisec */
+#define WORK_QUEUE_DEFAULT_HOLD  50  /* hold time for initial run of a queue */
+#define WORK_QUEUE_DEFAULT_DELAY 10  /* minimum delay between queue runs */
+
+/* action value, for use by item processor and item error handlers */
+typedef enum
+{
+  WQ_SUCCESS = 0,
+  WQ_ERROR,             /* Error, run error handler if provided */
+  WQ_RETRY_NOW,         /* retry immediately */
+  WQ_RETRY_LATER,       /* retry later, cease processing work queue */
+  WQ_REQUEUE            /* requeue item, continue processing work queue */
+} wq_item_status;
+
+/* A single work queue item, unsurprisingly */
+struct work_queue_item
+{
+  void *data;                           /* opaque data */
+  unsigned short retry_count;           /* number of times retried */            
+};
+
+struct work_queue
+{
+  struct thread_master *master;       /* thread master */
+  struct thread *thread;              /* thread, if one is active */
+  char *name;                         /* work queue name */
+  
+  /* specification for this work queue */
+  struct {
+    /* work function to process items with */
+    wq_item_status (*workfunc) (void *);
+
+    /* error handling function, optional */
+    void (*errorfunc) (struct work_queue *, struct work_queue_item *);
+    
+    /* callback to delete user specific item data */
+    void (*del_item_data) (void *);
+    
+    /* max number of retries to make for item that errors */
+    unsigned int max_retries;	
+
+    unsigned int hold;	/* hold time for first run, in ms */
+    unsigned int delay; /* min delay between queue runs, in ms */
+  } spec;
+  
+  /* remaining fields should be opaque to users */
+  struct list *items;                 /* queue item list */
+  unsigned long runs;                  /* runs count */
+  
+  struct {
+    unsigned int best;
+    unsigned int granularity;
+    unsigned long total;
+  } cycles;	/* cycle counts */
+};
+
+/* User API */
+struct work_queue *work_queue_new (struct thread_master *, const char *);
+void work_queue_free (struct work_queue *);
+void work_queue_add (struct work_queue *, void *);
+
+/* Helpers, exported for thread.c and command.c */
+int work_queue_run (struct thread *);
+extern struct cmd_element show_work_queues_cmd;
+#endif /* _QUAGGA_WORK_QUEUE_H */