2005-05-23 Paul Jakma <paul@dishone.st>

	* workqueue.h: Add a WQ_QUEUE_BLOCKED item_status return code,
	  to allow a queue function to indicate the queue is not
	  ready/blocked - rather than any problem with the item at hand.
	  Add a notion of being able to 'plug' and 'unplug' a queue.
	  Add helpers to plug/unplug a queue.
	  Add a completion callback, to be called when a queue is emptied.
	* workqueue.c: (work_queue_new) remove useless list_free.
	  (work_queue_schedule) new internal helper function to schedule
	  queue, if appropriate.
	  (work_queue_add) use work_queue_schedule
	  (show_work_queues) Print 'P' if queue is plugged.
	  (work_queue_plug) new API function, plug a queue - ie prevent it
	  from 'drained' / processed / scheduled.
	  (work_queue_unplug) unplug a queue, allowing it to be drained
	  / scheduled / processed again.
	  (work_queue_run) Add support for WQ_QUEUE_BLOCKED.
	  Add comment for RETRY_NOW case.
	  Make hysteris more aggresive in ramping up granularity, improves
	  performance significantly.
	  Add support for calling completion callback when queue is emptied,
	  possibly useful for knowing when to unplug a queue.
diff --git a/lib/ChangeLog b/lib/ChangeLog
index 3268ab9..8e5b7c3 100644
--- a/lib/ChangeLog
+++ b/lib/ChangeLog
@@ -9,6 +9,27 @@
 	  from VTY_GET_INTEGER_RANGE
 	* vty.h: fix the VTY_GET macros, do {..} while(0) so they have
 	  correct function like syntax in usage.
+	* workqueue.h: Add a WQ_QUEUE_BLOCKED item_status return code,
+	  to allow a queue function to indicate the queue is not
+	  ready/blocked - rather than any problem with the item at hand.
+	  Add a notion of being able to 'plug' and 'unplug' a queue.
+	  Add helpers to plug/unplug a queue.
+	  Add a completion callback, to be called when a queue is emptied.
+	* workqueue.c: (work_queue_new) remove useless list_free.
+	  (work_queue_schedule) new internal helper function to schedule
+	  queue, if appropriate.
+	  (work_queue_add) use work_queue_schedule
+	  (show_work_queues) Print 'P' if queue is plugged.
+	  (work_queue_plug) new API function, plug a queue - ie prevent it
+	  from 'drained' / processed / scheduled.
+	  (work_queue_unplug) unplug a queue, allowing it to be drained
+	  / scheduled / processed again.
+	  (work_queue_run) Add support for WQ_QUEUE_BLOCKED.
+	  Add comment for RETRY_NOW case.
+	  Make hysteris more aggresive in ramping up granularity, improves
+	  performance significantly.
+	  Add support for calling completion callback when queue is emptied,
+	  possibly useful for knowing when to unplug a queue.
 
 2005-05-19 Paul Jakma <paul@dishone.st>
 
diff --git a/lib/workqueue.c b/lib/workqueue.c
index fc61d68..bac4130 100644
--- a/lib/workqueue.c
+++ b/lib/workqueue.c
@@ -69,9 +69,6 @@
   
   if ( (new->items = list_new ()) == NULL)
     {
-      if (new->items)
-        list_free (new->items);
-      
       XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
       XFREE (MTYPE_WORK_QUEUE, new);
       
@@ -99,6 +96,22 @@
   return;
 }
 
+static inline int
+work_queue_schedule (struct work_queue *wq, unsigned int delay)
+{
+  /* if appropriate, schedule work queue thread */
+  if ( (wq->flags == WQ_UNPLUGGED) 
+       && (wq->thread == NULL)
+       && (listcount (wq->items) > 0) )
+    {
+      wq->thread = thread_add_background (wq->master, work_queue_run, 
+                                          wq, delay);
+      return 1;
+    }
+  else
+    return 0;
+}
+  
 void
 work_queue_add (struct work_queue *wq, void *data)
 {
@@ -115,12 +128,7 @@
   item->data = data;
   listnode_add (wq->items, item);
   
-  /* if thread isnt already waiting, add one */
-  if (wq->thread == NULL)
-    wq->thread = thread_add_background (wq->master, work_queue_run, 
-                                        wq, wq->spec.hold);
-
-  /* XXX: what if we didnt get a thread? try again? */
+  work_queue_schedule (wq, wq->spec.hold);
   
   return;
 }
@@ -159,11 +167,12 @@
   struct work_queue *wq;
   
   vty_out (vty, 
-           "%8s  %11s  %8s %21s%s",
-           "List","(ms)   ","Q. Runs","Cycle Counts   ",
+           "%c %8s  %11s  %8s %21s%s",
+           ' ', "List","(ms)   ","Q. Runs","Cycle Counts   ",
            VTY_NEWLINE);
   vty_out (vty,
-           "%8s  %5s %5s  %8s  %7s %6s %6s %s%s",
+           "%c %8s  %5s %5s  %8s  %7s %6s %6s %s%s",
+           ' ',
            "Items",
            "Delay","Hold",
            "Total",
@@ -173,7 +182,8 @@
  
   for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
     {
-      vty_out (vty,"%8d  %5d %5d  %8ld  %7d %6d %6u %s%s",
+      vty_out (vty,"%c %8d  %5d %5d  %8ld  %7d %6d %6u %s%s",
+               (wq->flags == WQ_PLUGGED ? 'P' : ' '),
                listcount (wq->items),
                wq->spec.delay, wq->spec.hold,
                wq->runs,
@@ -187,6 +197,32 @@
   return CMD_SUCCESS;
 }
 
+/* 'plug' a queue: Stop it from being scheduled,
+ * ie: prevent the queue from draining.
+ */
+void
+work_queue_plug (struct work_queue *wq)
+{
+  if (wq->thread)
+    thread_cancel (wq->thread);
+  
+  wq->thread = NULL;
+  
+  wq->flags = WQ_PLUGGED;
+}
+
+/* unplug queue, schedule it again, if appropriate
+ * Ie: Allow the queue to be drained again
+ */
+void
+work_queue_unplug (struct work_queue *wq)
+{
+  wq->flags = WQ_UNPLUGGED;
+
+  /* if thread isnt already waiting, add one */
+  work_queue_schedule (wq, wq->spec.hold);
+}
+
 /* timer thread to process a work queue
  * will reschedule itself if required,
  * otherwise work_queue_item_add 
@@ -250,6 +286,13 @@
 
     switch (ret)
       {
+      case WQ_QUEUE_BLOCKED:
+        {
+          /* decrement item->ran again, cause this isn't an item
+           * specific error, and fall through to WQ_RETRY_LATER
+           */
+          item->ran--;
+        }
       case WQ_RETRY_LATER:
 	{
 	  goto stats;
@@ -260,6 +303,7 @@
 	  break;
 	}
       case WQ_RETRY_NOW:
+        /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */
       case WQ_ERROR:
 	{
 	  if (wq->spec.errorfunc)
@@ -303,7 +347,9 @@
         wq->cycles.best = cycles;
       
       /* along with yielded check, provides hysteris for granularity */
-      if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR))
+      if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR * 2))
+        wq->cycles.granularity *= WQ_HYSTERIS_FACTOR; /* quick ramp-up */
+      else if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR))
         wq->cycles.granularity += WQ_HYSTERIS_FACTOR;
     }
 #undef WQ_HYSTERIS_FACTOR
@@ -316,10 +362,11 @@
             __func__, cycles, wq->cycles.best, wq->cycles.granularity);
 #endif
   
-  /* Is the queue done yet? */
+  /* Is the queue done yet? If it is, call the completion callback. */
   if (listcount (wq->items) > 0)
-    wq->thread = thread_add_background (wq->master, work_queue_run, wq,
-                                        wq->spec.delay);
-
+    work_queue_schedule (wq, wq->spec.delay);
+  else if (wq->spec.completion_func)
+    wq->spec.completion_func (wq);
+  
   return 0;
 }
diff --git a/lib/workqueue.h b/lib/workqueue.h
index 257667e..626d8e6 100644
--- a/lib/workqueue.h
+++ b/lib/workqueue.h
@@ -35,7 +35,10 @@
   WQ_ERROR,             /* Error, run error handler if provided */
   WQ_RETRY_NOW,         /* retry immediately */
   WQ_RETRY_LATER,       /* retry later, cease processing work queue */
-  WQ_REQUEUE            /* requeue item, continue processing work queue */
+  WQ_REQUEUE,		/* requeue item, continue processing work queue */
+  WQ_QUEUE_BLOCKED,	/* Queue cant be processed at this time.
+                         * Similar to WQ_RETRY_LATER, but doesn't penalise
+                         * the particular item.. */
 } wq_item_status;
 
 /* A single work queue item, unsurprisingly */
@@ -45,11 +48,18 @@
   unsigned short ran;			/* # of times item has been run */
 };
 
+enum work_queue_flags
+{
+  WQ_UNPLUGGED = 0,
+  WQ_PLUGGED = 1,
+};
+
 struct work_queue
 {
   struct thread_master *master;       /* thread master */
   struct thread *thread;              /* thread, if one is active */
   char *name;                         /* work queue name */
+  enum work_queue_flags flags;		/* flags */
   
   /* specification for this work queue */
   struct {
@@ -62,6 +72,9 @@
     /* callback to delete user specific item data */
     void (*del_item_data) (void *);
     
+    /* completion callback, called when queue is emptied, optional */
+    void (*completion_func) (struct work_queue *);
+    
     /* max number of retries to make for item that errors */
     unsigned int max_retries;	
 
@@ -71,7 +84,7 @@
   
   /* remaining fields should be opaque to users */
   struct list *items;                 /* queue item list */
-  unsigned long runs;                  /* runs count */
+  unsigned long runs;                 /* runs count */
   
   struct {
     unsigned int best;
@@ -81,11 +94,24 @@
 };
 
 /* User API */
+
+/* create a new work queue, of given name. 
+ * user must fill in the spec of the returned work queue before adding
+ * anything to it
+ */
 extern struct work_queue *work_queue_new (struct thread_master *,
                                           const char *);
+/* destroy work queue */
 extern void work_queue_free (struct work_queue *);
+
+/* Add the supplied data as an item onto the workqueue */
 extern void work_queue_add (struct work_queue *, void *);
 
+/* plug the queue, ie prevent it from being drained / processed */
+extern void work_queue_plug (struct work_queue *wq);
+/* unplug the queue, allow it to be drained again */
+extern void work_queue_unplug (struct work_queue *wq);
+
 /* Helpers, exported for thread.c and command.c */
 extern int work_queue_run (struct thread *);
 extern struct cmd_element show_work_queues_cmd;