2005-04-28 Paul Jakma <paul.jakma@sun.com>

	* rib.h: (struct rib) Add lock field for refcounting.
	* zserv.h: (struct zebra_t) Add a ribq workqueue to the zebra
	  'master' struct.
	* zserv.c: (zread_ipv4_add) XMALLOC then memset should be XCALLOC.
	* zebra_rib.c: Clean up refcounting of route_node, make struct rib
	  refcounted and convert rib_process to work-queue. In general,
	  rib's should be rib_addnode'd and delnode'd to route_nodes, and
	  these symmetrical functions will manage the locking of referenced
	  route_node and freeing of struct rib - rather than having users
	  manage each seperately - with much scope for bugs..
	  (newrib_free) removed and replaced with rib_lock
	  (rib_lock) new function, check state of lock and increment.
	  (rib_unlock) new function, check lock state and decrement. Free
	  struct rib if refcount hits 0, freeing struct nexthop's, as
	  newrib_free did.
	  (rib_addnode) Add RIB to route_node, locking both.
	  (rib_delnode) Delete RIB from route_node, unlocking each.
	  (rib_process) Converted to a work-queue work function.
	  Functional changes are minimal, just arguments, comments and
	  whitespace.
	  (rib_queue_add_qnode) Helper function to setup a ribq item.
	  (rib_queue_add) Helper function, same arguments as old
	  rib_process, to replace in callers of rib_process.
	  (rib_queue_qnode_del) ribq deconstructor.
	  (rib_queue_init) Create the ribq.
	  (rib_init) call rib_queue_init.
	  (remainder) Sanitise refcounting of route_node's. Convert to
	  rib_queue_add, rib_addnode and rib_delnode. Change XMALLOC/memset
	  to XCALLOC. Remove calls to nexthop_delete and nexthop_free.
diff --git a/zebra/zebra_rib.c b/zebra/zebra_rib.c
index 8eb8061..a8aaef3 100644
--- a/zebra/zebra_rib.c
+++ b/zebra/zebra_rib.c
@@ -29,6 +29,9 @@
 #include "if.h"
 #include "log.h"
 #include "sockunion.h"
+#include "linklist.h"
+#include "thread.h"
+#include "workqueue.h"
 
 #include "zebra/rib.h"
 #include "zebra/rt.h"
@@ -57,6 +60,12 @@
   {ZEBRA_ROUTE_ISIS,    115},
   {ZEBRA_ROUTE_BGP,      20  /* IBGP is 200. */}
 };
+
+struct zebra_queue_node_t
+{
+  struct route_node *node;
+  struct rib *del;
+};
 
 /* Vector for routing table.  */
 vector vrf_vector;
@@ -776,18 +785,35 @@
 #define RIB_SYSTEM_ROUTE(R) \
         ((R)->type == ZEBRA_ROUTE_KERNEL || (R)->type == ZEBRA_ROUTE_CONNECT)
 
-void
-newrib_free (struct rib *rib)
+static struct rib *
+rib_lock (struct rib *rib)
+{
+  assert (rib->lock >= 0);
+  
+  rib->lock++;
+  return rib;
+}
+
+static struct rib *
+rib_unlock (struct rib *rib)
 {
   struct nexthop *nexthop;
   struct nexthop *next;
+  
+  assert (rib->lock > 0);
+  rib->lock--;
 
-  for (nexthop = rib->nexthop; nexthop; nexthop = next)
+  if (rib->lock == 0)
     {
-      next = nexthop->next;
-      nexthop_free (nexthop);
+      for (nexthop = rib->nexthop; nexthop; nexthop = next)
+        {
+          next = nexthop->next;
+          nexthop_free (nexthop);
+        }
+      XFREE (MTYPE_RIB, rib);
+      return NULL;
     }
-  XFREE (MTYPE_RIB, rib);
+  return rib;
 }
 
 void
@@ -854,16 +880,28 @@
 }
 
 /* Core function for processing routing information base. */
-void
-rib_process (struct route_node *rn, struct rib *del)
+wq_item_status
+rib_process (struct zebra_queue_node_t *qnode)
 {
   struct rib *rib;
   struct rib *next;
   struct rib *fib = NULL;
   struct rib *select = NULL;
+  struct rib *del = qnode->del;
+  struct route_node *rn = qnode->node;
   int installed = 0;
   struct nexthop *nexthop = NULL;
-
+  
+  assert (rn);
+  
+  /* possibly should lock and unlock rib on each iteration. however, for
+   * now, we assume called functions are synchronous and dont delete RIBs
+   * (as the work-queue deconstructor for this function is supposed to be
+   * the canonical 'delete' path for RIBs). Further if called functions
+   * below were to made asynchronous they should themselves acquire any
+   * locks/refcounts as needed and not depend on this caller to do it for
+   * them
+   */
   for (rib = rn->info; rib; rib = next)
     {
       next = rib->next;
@@ -871,7 +909,7 @@
       /* Currently installed rib. */
       if (CHECK_FLAG (rib->flags, ZEBRA_FLAG_SELECTED))
         fib = rib;
-
+      
       /* Skip unreachable nexthop. */
       if (! nexthop_active_update (rn, rib, 0))
         continue;
@@ -885,44 +923,51 @@
           || rib->type == ZEBRA_ROUTE_CONNECT)
         select = rib;
     }
-
+  
   /* Deleted route check. */
   if (del && CHECK_FLAG (del->flags, ZEBRA_FLAG_SELECTED))
     fib = del;
-
+  
+  /* We possibly should lock fib and select here However, all functions
+   * below are 'inline' and not asynchronous And if any were to be
+   * converted, they should manage references themselves really..  See
+   * previous comment above.
+   */
+  
   /* Same route is selected. */
   if (select && select == fib)
     {
       if (CHECK_FLAG (select->flags, ZEBRA_FLAG_CHANGED))
-	{
-	  redistribute_delete (&rn->p, select);
-	  if (! RIB_SYSTEM_ROUTE (select))
-	    rib_uninstall_kernel (rn, select);
+        {
+          redistribute_delete (&rn->p, select);
+          if (! RIB_SYSTEM_ROUTE (select))
+            rib_uninstall_kernel (rn, select);
 
-	  /* Set real nexthop. */
-	  nexthop_active_update (rn, select, 1);
+          /* Set real nexthop. */
+          nexthop_active_update (rn, select, 1);
   
-	  if (! RIB_SYSTEM_ROUTE (select))
-	    rib_install_kernel (rn, select);
-	  redistribute_add (&rn->p, select);
-	}
+          if (! RIB_SYSTEM_ROUTE (select))
+            rib_install_kernel (rn, select);
+          redistribute_add (&rn->p, select);
+        }
       else if (! RIB_SYSTEM_ROUTE (select))
-	{
-	  /* Housekeeping code to deal with 
-	     race conditions in kernel with linux
-	     netlink reporting interface up before IPv4 or IPv6 protocol
-	     is ready to add routes.
-	     This makes sure the routes are IN the kernel.
-	  */
+        {
+          /* Housekeeping code to deal with 
+             race conditions in kernel with linux
+             netlink reporting interface up before IPv4 or IPv6 protocol
+             is ready to add routes.
+             This makes sure the routes are IN the kernel.
+           */
 
-	  for (nexthop = select->nexthop; nexthop; nexthop = nexthop->next)
-	    {
-	      if (CHECK_FLAG (nexthop->flags, NEXTHOP_FLAG_FIB))
-		installed = 1;
-	    }
-	    if (! installed) rib_install_kernel (rn, select);
-	}
-      return;
+          for (nexthop = select->nexthop; nexthop; nexthop = nexthop->next)
+            {
+              if (CHECK_FLAG (nexthop->flags, NEXTHOP_FLAG_FIB))
+                installed = 1;
+            }
+          if (! installed) 
+            rib_install_kernel (rn, select);
+        }
+      return WQ_SUCCESS;
     }
 
   /* Uninstall old rib from forwarding table. */
@@ -944,10 +989,102 @@
       nexthop_active_update (rn, select, 1);
 
       if (! RIB_SYSTEM_ROUTE (select))
-	rib_install_kernel (rn, select);
+        rib_install_kernel (rn, select);
       SET_FLAG (select->flags, ZEBRA_FLAG_SELECTED);
       redistribute_add (&rn->p, select);
     }
+
+  return WQ_SUCCESS;
+
+}
+
+/* Add work queue item to work queue and schedule processing */
+void
+rib_queue_add_qnode (struct zebra_t *zebra, struct zebra_queue_node_t *qnode)
+{
+  route_lock_node (qnode->node);
+  
+  if (IS_ZEBRA_DEBUG_EVENT)
+    zlog_info ("rib_queue_add_qnode: work queue added");
+
+  assert (zebra && qnode && qnode->node);
+
+  if (qnode->del)
+    rib_lock (qnode->del);
+  
+  if (zebra->ribq == NULL)
+    {
+      zlog_err ("rib_queue_add_qnode: ribq work_queue does not exist!");
+      route_unlock_node (qnode->node);
+      return;
+    }
+  
+  work_queue_add (zebra->ribq, qnode);
+
+  return;
+}
+
+/* Add route node and rib to work queue and schedule processing */
+void
+rib_queue_add (struct zebra_t *zebra, struct route_node *rn, struct rib *del)
+{
+ struct zebra_queue_node_t *qnode;
+
+ assert (zebra && rn);
+ 
+ qnode = (struct zebra_queue_node_t *) 
+          XCALLOC (MTYPE_RIB_QUEUE, sizeof (struct zebra_queue_node_t));
+ 
+ if (qnode == NULL)
+   {
+     zlog_err ("rib_queue_add: failed to allocate queue node memory, %s",
+               strerror (errno));
+     return;
+   }
+
+ qnode->node = rn;
+ qnode->del = del;
+ 
+ rib_queue_add_qnode (zebra, qnode);
+
+ return;
+}
+
+/* free zebra_queue_node_t */
+void
+rib_queue_qnode_del (struct zebra_queue_node_t *qnode)
+{
+  route_unlock_node (qnode->node);
+  
+  if (qnode->del)
+    rib_unlock (qnode->del);
+  
+  XFREE (MTYPE_RIB_QUEUE, qnode);
+}
+
+/* initialise zebra rib work queue */
+void
+rib_queue_init (struct zebra_t *zebra)
+{
+  assert (zebra);
+  
+  if (! (zebra->ribq = work_queue_new (zebra->master, 
+                                       "zebra_rib_work_queue")))
+    {
+      zlog_err ("rib_queue_init: could not initialise work queue!");
+      return;
+    }
+
+  /* fill in the work queue spec */
+  zebra->ribq->spec.workfunc = (wq_item_status (*) (void *))&rib_process;
+  zebra->ribq->spec.errorfunc = NULL;
+  zebra->ribq->spec.del_item_data = (void (*) (void *)) &rib_queue_qnode_del;
+  /* XXX: TODO: These should be runtime configurable via vty */
+  zebra->ribq->spec.max_retries = 3;
+  zebra->ribq->spec.hold = 500;
+  zebra->ribq->spec.delay = 10;
+  
+  return;
 }
 
 /* Add RIB to head of the route node. */
@@ -955,7 +1092,12 @@
 rib_addnode (struct route_node *rn, struct rib *rib)
 {
   struct rib *head;
-
+  
+  assert (rib && rn);
+  
+  rib_lock (rib);
+  route_lock_node (rn);
+  
   head = rn->info;
   if (head)
     head->prev = rib;
@@ -966,12 +1108,17 @@
 void
 rib_delnode (struct route_node *rn, struct rib *rib)
 {
+  assert (rn && rib);
+  
   if (rib->next)
     rib->next->prev = rib->prev;
   if (rib->prev)
     rib->prev->next = rib->next;
   else
     rn->info = rib->next;
+  
+  rib_unlock (rib);
+  route_unlock_node (rn);
 }
 
 int
@@ -1011,30 +1158,27 @@
   for (rib = rn->info; rib; rib = rib->next)
     {
       if (rib->type == ZEBRA_ROUTE_CONNECT)
-	{
-	  nexthop = rib->nexthop;
+        {
+          nexthop = rib->nexthop;
 
-	  /* Duplicate connected route comes in. */
-	  if (rib->type == type
-	      && nexthop && nexthop->type == NEXTHOP_TYPE_IFINDEX
-	      && nexthop->ifindex == ifindex)
-	    {
-	      rib->refcnt++;
-	      return 0 ;
-	    }
-	}
+          /* Duplicate connected route comes in. */
+          if (rib->type == type
+              && nexthop && nexthop->type == NEXTHOP_TYPE_IFINDEX
+              && nexthop->ifindex == ifindex)
+            {
+              rib->refcnt++;
+              return 0 ;
+            }
+        }
       else if (rib->type == type)
-	{
-	  same = rib;
-	  rib_delnode (rn, same);
-	  route_unlock_node (rn);
-	  break;
-	}
+        {
+          same = rib;
+          break;
+        }
     }
 
   /* Allocate new rib structure. */
-  rib = XMALLOC (MTYPE_RIB, sizeof (struct rib));
-  memset (rib, 0, sizeof (struct rib));
+  rib = XCALLOC (MTYPE_RIB, sizeof (struct rib));
   rib->type = type;
   rib->distance = distance;
   rib->flags = flags;
@@ -1061,14 +1205,15 @@
 
   /* Link new rib to node.*/
   rib_addnode (rn, rib);
-
+  
   /* Process this route node. */
-  rib_process (rn, same);
-
+  rib_queue_add (&zebrad, rn, same);
+  
   /* Free implicit route.*/
   if (same)
-    newrib_free (same);
-
+    rib_delnode (rn, same);
+  
+  route_unlock_node (rn);
   return 0;
 }
 
@@ -1079,12 +1224,11 @@
   struct route_node *rn;
   struct rib *same;
   struct nexthop *nexthop;
-
+  
   /* Lookup table.  */
   table = vrf_table (AFI_IP, SAFI_UNICAST, 0);
   if (! table)
     return 0;
-
   /* Make it sure prefixlen is applied to the prefix. */
   apply_mask_ipv4 (p);
 
@@ -1108,13 +1252,9 @@
     {
       if (same->type == rib->type && same->table == rib->table
 	  && same->type != ZEBRA_ROUTE_CONNECT)
-	{
-	  rib_delnode (rn, same);
-	  route_unlock_node (rn);
-	  break;
-	}
+        break;
     }
-
+  
   /* If this route is kernel route, set FIB flag to the route. */
   if (rib->type == ZEBRA_ROUTE_KERNEL || rib->type == ZEBRA_ROUTE_CONNECT)
     for (nexthop = rib->nexthop; nexthop; nexthop = nexthop->next)
@@ -1124,12 +1264,13 @@
   rib_addnode (rn, rib);
 
   /* Process this route node. */
-  rib_process (rn, same);
+  rib_queue_add (&zebrad, rn, same);
 
   /* Free implicit route.*/
   if (same)
-    newrib_free (same);
-
+    rib_delnode (rn, same);
+  
+  route_unlock_node (rn);
   return 0;
 }
 
@@ -1265,21 +1406,14 @@
 	  return ZEBRA_ERR_RTNOEXIST;
 	}
     }
+  
+  /* Process changes. */
+  rib_queue_add (&zebrad, rn, same);
 
   if (same)
     rib_delnode (rn, same);
-
-  /* Process changes. */
-  rib_process (rn, same);
-
-  if (same)
-    {
-      newrib_free (same);
-      route_unlock_node (rn);
-    }
-
+  
   route_unlock_node (rn);
-
   return 0;
 }
 
@@ -1318,15 +1452,14 @@
           case STATIC_IPV4_BLACKHOLE:
             nexthop_blackhole_add (rib);
             break;
-         }
-      rib_process (rn, NULL);
+        }
+      rib_queue_add (&zebrad, rn, NULL);
     }
   else
     {
       /* This is new static route. */
-      rib = XMALLOC (MTYPE_RIB, sizeof (struct rib));
-      memset (rib, 0, sizeof (struct rib));
-
+      rib = XCALLOC (MTYPE_RIB, sizeof (struct rib));
+      
       rib->type = ZEBRA_ROUTE_STATIC;
       rib->distance = si->distance;
       rib->metric = 0;
@@ -1352,7 +1485,7 @@
       rib_addnode (rn, rib);
 
       /* Process this prefix. */
-      rib_process (rn, NULL);
+      rib_queue_add (&zebrad, rn, NULL);
     }
 }
 
@@ -1386,7 +1519,7 @@
   table = vrf_table (AFI_IP, SAFI_UNICAST, 0);
   if (! table)
     return;
-
+  
   /* Lookup existing route with type and distance. */
   rn = route_node_lookup (table, p);
   if (! rn)
@@ -1417,20 +1550,15 @@
   /* Check nexthop. */
   if (rib->nexthop_num == 1)
     {
+      rib_queue_add (&zebrad, rn, rib);
       rib_delnode (rn, rib);
-      rib_process (rn, rib);
-      newrib_free (rib);
-      route_unlock_node (rn);
     }
   else
     {
       if (CHECK_FLAG (nexthop->flags, NEXTHOP_FLAG_FIB))
         rib_uninstall (rn, rib);
-      nexthop_delete (rib, nexthop);
-      nexthop_free (nexthop);
-      rib_process (rn, rib);
+      rib_queue_add (&zebrad, rn, rib);
     }
-
   /* Unlock node. */
   route_unlock_node (rn);
 }
@@ -1671,15 +1799,13 @@
       else if (rib->type == type)
 	{
 	  same = rib;
-	  rib_delnode (rn, same);
-	  route_unlock_node (rn);
 	  break;
 	}
     }
 
   /* Allocate new rib structure. */
-  rib = XMALLOC (MTYPE_RIB, sizeof (struct rib));
-  memset (rib, 0, sizeof (struct rib));
+  rib = XCALLOC (MTYPE_RIB, sizeof (struct rib));
+  
   rib->type = type;
   rib->distance = distance;
   rib->flags = flags;
@@ -1708,12 +1834,13 @@
   rib_addnode (rn, rib);
 
   /* Process this route node. */
-  rib_process (rn, same);
-
+  rib_queue_add (&zebrad, rn, same);
+  
   /* Free implicit route.*/
   if (same)
-    newrib_free (same);
-
+    rib_delnode (rn, same);
+  
+  route_unlock_node (rn);
   return 0;
 }
 
@@ -1737,7 +1864,7 @@
   table = vrf_table (AFI_IP6, SAFI_UNICAST, 0);
   if (! table)
     return 0;
-
+  
   /* Lookup route node. */
   rn = route_node_lookup (table, (struct prefix *) p);
   if (! rn)
@@ -1829,20 +1956,13 @@
 	}
     }
 
+  /* Process changes. */
+  rib_queue_add (&zebrad, rn, same);
+
   if (same)
     rib_delnode (rn, same);
-
-  /* Process changes. */
-  rib_process (rn, same);
-
-  if (same)
-    {
-      newrib_free (same);
-      route_unlock_node (rn);
-    }
-
+  
   route_unlock_node (rn);
-
   return 0;
 }
 
@@ -1883,14 +2003,13 @@
 	  nexthop_ipv6_ifname_add (rib, &si->ipv6, si->ifname);
 	  break;
 	}
-      rib_process (rn, NULL);
+      rib_queue_add (&zebrad, rn, NULL);
     }
   else
     {
       /* This is new static route. */
-      rib = XMALLOC (MTYPE_RIB, sizeof (struct rib));
-      memset (rib, 0, sizeof (struct rib));
-
+      rib = XCALLOC (MTYPE_RIB, sizeof (struct rib));
+      
       rib->type = ZEBRA_ROUTE_STATIC;
       rib->distance = si->distance;
       rib->metric = 0;
@@ -1916,7 +2035,7 @@
       rib_addnode (rn, rib);
 
       /* Process this prefix. */
-      rib_process (rn, NULL);
+      rib_queue_add (&zebrad, rn, NULL);
     }
 }
 
@@ -1982,19 +2101,14 @@
   if (rib->nexthop_num == 1)
     {
       rib_delnode (rn, rib);
-      rib_process (rn, rib);
-      newrib_free (rib);
-      route_unlock_node (rn);
+      rib_queue_add (&zebrad, rn, rib);
     }
   else
     {
       if (CHECK_FLAG (nexthop->flags, NEXTHOP_FLAG_FIB))
         rib_uninstall (rn, rib);
-      nexthop_delete (rib, nexthop);
-      nexthop_free (nexthop);
-      rib_process (rn, rib);
+      rib_queue_add (&zebrad, rn, rib);
     }
-
   /* Unlock node. */
   route_unlock_node (rn);
 }
@@ -2140,16 +2254,18 @@
 {
   struct route_node *rn;
   struct route_table *table;
-
+  
   table = vrf_table (AFI_IP, SAFI_UNICAST, 0);
   if (table)
     for (rn = route_top (table); rn; rn = route_next (rn))
-      rib_process (rn, NULL);
+      if (rn->info)
+        rib_queue_add (&zebrad, rn, NULL);
 
   table = vrf_table (AFI_IP6, SAFI_UNICAST, 0);
   if (table)
     for (rn = route_top (table); rn; rn = route_next (rn))
-      rib_process (rn, NULL);
+      if (rn->info)
+        rib_queue_add (&zebrad, rn, NULL);
 }
 
 /* Interface goes up. */
@@ -2182,11 +2298,7 @@
 
 	  if (rib->table != zebrad.rtm_table_default &&
 	      rib->table != RT_TABLE_MAIN)
-	    {
-	      rib_delnode (rn, rib);
-	      newrib_free (rib);
-	      route_unlock_node (rn);
-	    }
+            rib_delnode (rn, rib);
 	}
 }
 
@@ -2218,11 +2330,7 @@
 	    {
 	      ret = rib_uninstall_kernel (rn, rib);
 	      if (! ret)
-		{
-		  rib_delnode (rn, rib);
-		  newrib_free (rib);
-		  route_unlock_node (rn);
-		}
+                rib_delnode (rn, rib);
 	    }
 	}
 }
@@ -2262,6 +2370,7 @@
 void
 rib_init ()
 {
+  rib_queue_init (&zebrad);
   /* VRF initialization.  */
   vrf_init ();
 }