+ initial edition of meta-queue for RIB updates processing (bug #431)
diff --git a/lib/ChangeLog b/lib/ChangeLog
index da0fa8c..d8eb06e 100644
--- a/lib/ChangeLog
+++ b/lib/ChangeLog
@@ -1,3 +1,10 @@
+2008-06-02 Denis Ovsienko
+
+	* workqueue.[ch]: completely drop WQ_AIM_HEAD flag and
+	  work_queue_aim_head() function, they aren't needed any more
+	  with the new meta queue structure; (work_queue_run) don't
+	  increment the counter on work item requeueing
+
 2008-02-28 Paul Jakma <paul.jakma@sun.com>
 
 	* log.c: (mes_lookup) Sowmini Varadhan diagnosed a problem where
diff --git a/lib/workqueue.c b/lib/workqueue.c
index 8880b9e..1d32d24 100644
--- a/lib/workqueue.c
+++ b/lib/workqueue.c
@@ -67,7 +67,6 @@
   new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
   new->master = m;
   SET_FLAG (new->flags, WQ_UNPLUGGED);
-  UNSET_FLAG (new->flags, WQ_AIM_HEAD);
   
   if ( (new->items = list_new ()) == NULL)
     {
@@ -131,10 +130,7 @@
     }
   
   item->data = data;
-  if (CHECK_FLAG (wq->flags, WQ_AIM_HEAD))
-    listnode_add_after (wq->items, NULL, item);
-  else
-    listnode_add (wq->items, item);
+  listnode_add (wq->items, item);
   
   work_queue_schedule (wq, wq->spec.hold);
   
@@ -231,15 +227,6 @@
   work_queue_schedule (wq, wq->spec.hold);
 }
 
-void
-work_queue_aim_head (struct work_queue *wq, const unsigned aim_head)
-{
-  if (aim_head)
-    SET_FLAG (wq->flags, WQ_AIM_HEAD);
-  else
-    UNSET_FLAG (wq->flags, WQ_AIM_HEAD);
-}
-
 /* timer thread to process a work queue
  * will reschedule itself if required,
  * otherwise work_queue_item_add 
@@ -317,6 +304,7 @@
 	}
       case WQ_REQUEUE:
 	{
+	  item->ran--;
 	  work_queue_item_requeue (wq, node);
 	  break;
 	}
diff --git a/lib/workqueue.h b/lib/workqueue.h
index 3150c32..f59499a 100644
--- a/lib/workqueue.h
+++ b/lib/workqueue.h
@@ -48,7 +48,6 @@
 };
 
 #define WQ_UNPLUGGED	(1 << 0) /* available for draining */
-#define WQ_AIM_HEAD	(1 << 1) /* add new items before list head, not after tail */
 
 struct work_queue
 {
@@ -119,8 +118,6 @@
 extern void work_queue_plug (struct work_queue *wq);
 /* unplug the queue, allow it to be drained again */
 extern void work_queue_unplug (struct work_queue *wq);
-/* control the value for WQ_AIM_HEAD flag */
-extern void work_queue_aim_head (struct work_queue *wq, const unsigned);
 
 /* Helpers, exported for thread.c and command.c */
 extern int work_queue_run (struct thread *);
diff --git a/zebra/ChangeLog b/zebra/ChangeLog
index 6f6dfa2..6483f2c 100644
--- a/zebra/ChangeLog
+++ b/zebra/ChangeLog
@@ -1,3 +1,18 @@
+2008-06-02 Denis Ovsienko
+
+	* connected.c: (connected_up_ipv4, connected_down_ipv4,
+	  connected_up_ipv6, connected_down_ipv6): don't call
+	  work_queue_aim_head()
+	* rib.h: adjust RIB_ROUTE_QUEUED macro for meta_queue,
+	  declare meta_queue structure
+	* zebra_rib.c: (process_subq, meta_queue_process, rib_meta_queue_add,
+	  meta_queue_new) new functions; (rib_queue_add) don't try checking
+	  RIB_QUEUE_ADDED flag, rib_meta_queue_add() does it better, take care
+	  of the meta queue instead; (rib_queue_init) initialize the meta queue
+	  as well; (rib_lookup_and_pushup) don't call work_queue_aim_head();
+	  (rib_process) only do actual processing, don't do deallocation;
+	* zserv.h: include meta_queue field into zebra_t structure
+
 2008-05-29 Stephen Hemminger <stephen.hemminger@vyatta.com>
 
 	* rt_netlink.c: (netlink_install_filter) BPF filter to catch and
diff --git a/zebra/connected.c b/zebra/connected.c
index 8bf1d33..ad3e960 100644
--- a/zebra/connected.c
+++ b/zebra/connected.c
@@ -188,15 +188,8 @@
   if (prefix_ipv4_any (&p))
     return;
 
-  /* Always push arriving/departing connected routes into the head of
-   * the working queue to make possible proper validation of the rest
-   * of the RIB queue (which will contain the whole RIB after the first
-   * call to rib_update()).
-   */
-  work_queue_aim_head (zebrad.ribq, 1);
   rib_add_ipv4 (ZEBRA_ROUTE_CONNECT, 0, &p, NULL, NULL, ifp->ifindex,
 	RT_TABLE_MAIN, ifp->metric, 0);
-  work_queue_aim_head (zebrad.ribq, 0);
 
   rib_update ();
 }
@@ -302,9 +295,7 @@
     return;
 
   /* Same logic as for connected_up_ipv4(): push the changes into the head. */
-  work_queue_aim_head (zebrad.ribq, 1);
   rib_delete_ipv4 (ZEBRA_ROUTE_CONNECT, 0, &p, NULL, ifp->ifindex, 0);
-  work_queue_aim_head (zebrad.ribq, 0);
 
   rib_update ();
 }
@@ -349,10 +340,8 @@
     return;
 #endif
 
-  work_queue_aim_head (zebrad.ribq, 1);
   rib_add_ipv6 (ZEBRA_ROUTE_CONNECT, 0, &p, NULL, ifp->ifindex, 0,
                 ifp->metric, 0);
-  work_queue_aim_head (zebrad.ribq, 0);
 
   rib_update ();
 }
@@ -426,9 +415,7 @@
   if (IN6_IS_ADDR_UNSPECIFIED (&p.prefix))
     return;
 
-  work_queue_aim_head (zebrad.ribq, 1);
   rib_delete_ipv6 (ZEBRA_ROUTE_CONNECT, 0, &p, NULL, ifp->ifindex, 0);
-  work_queue_aim_head (zebrad.ribq, 0);
 
   rib_update ();
 }
diff --git a/zebra/rib.h b/zebra/rib.h
index 9621f2c..887ed3c 100644
--- a/zebra/rib.h
+++ b/zebra/rib.h
@@ -40,7 +40,7 @@
 {
   /* Status Flags for the *route_node*, but kept in the head RIB.. */
   u_char rn_status;
-#define RIB_ROUTE_QUEUED	(1 << 0)
+#define RIB_ROUTE_QUEUED(x)	(1 << (x))
 
   /* Link list. */
   struct rib *next;
@@ -83,6 +83,20 @@
   u_char nexthop_fib_num;
 };
 
+/* meta-queue structure:
+ * sub-queue 0: connected, kernel
+ * sub-queue 1: static
+ * sub-queue 2: RIP, RIPng, OSPF, OSPF6, IS-IS
+ * sub-queue 3: iBGP, eBGP
+ * sub-queue 4: any other origin (if any)
+ */
+#define MQ_SIZE 5
+struct meta_queue
+{
+  struct list *subq[MQ_SIZE];
+  u_int32_t size; /* sum of lengths of all subqueues */
+};
+
 /* Static route information. */
 struct static_ipv4
 {
diff --git a/zebra/zebra_rib.c b/zebra/zebra_rib.c
index c6af329..4cb72ba 100644
--- a/zebra/zebra_rib.c
+++ b/zebra/zebra_rib.c
@@ -981,15 +981,14 @@
 static void rib_unlink (struct route_node *, struct rib *);
 
 /* Core function for processing routing information base. */
-static wq_item_status
-rib_process (struct work_queue *wq, void *data)
+static void
+rib_process (struct route_node *rn)
 {
   struct rib *rib;
   struct rib *next;
   struct rib *fib = NULL;
   struct rib *select = NULL;
   struct rib *del = NULL;
-  struct route_node *rn = data;
   int installed = 0;
   struct nexthop *nexthop = NULL;
   char buf[INET6_ADDRSTRLEN];
@@ -1177,10 +1176,95 @@
 end:
   if (IS_ZEBRA_DEBUG_RIB_Q)
     zlog_debug ("%s: %s/%d: rn %p dequeued", __func__, buf, rn->p.prefixlen, rn);
-  if (rn->info)
-    UNSET_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED);  
-  route_unlock_node (rn); /* rib queue lock */
-  return WQ_SUCCESS;
+}
+
+/* Take a list of route_node structs and return 1, if there was a record picked from
+ * it and processed by rib_process(). Don't process more, than one RN record; operate
+ * only in the specified sub-queue.
+ */
+unsigned int
+process_subq (struct list * subq, u_char qindex)
+{
+  struct listnode *lnode;
+  struct route_node *rnode;
+  if (!(lnode = listhead (subq)))
+    return 0;
+  rnode = listgetdata (lnode);
+  rib_process (rnode);
+  if (rnode->info) /* The first RIB record is holding the flags bitmask. */
+    UNSET_FLAG (((struct rib *)rnode->info)->rn_status, RIB_ROUTE_QUEUED(qindex));
+  route_unlock_node (rnode);
+  list_delete_node (subq, lnode);
+  return 1;
+}
+
+/* Dispatch the meta queue by picking, processing and unlocking the next RN from
+ * a non-empty sub-queue with lowest priority. wq is equal to zebra->ribq and data
+ * is pointed to the meta queue structure.
+ */
+static wq_item_status
+meta_queue_process (struct work_queue *dummy, void *data)
+{
+  struct meta_queue * mq = data;
+  u_char i;
+  for (i = 0; i < MQ_SIZE; i++)
+    if (process_subq (mq->subq[i], i))
+    {
+      mq->size--;
+      break;
+    }
+  return mq->size ? WQ_REQUEUE : WQ_SUCCESS;
+}
+
+/* Look into the RN and queue it into one or more priority queues, increasing the size
+ * for each data push done.
+ */
+void rib_meta_queue_add (struct meta_queue *mq, struct route_node *rn)
+{
+  u_char qindex;
+  struct rib *rib;
+  char buf[INET6_ADDRSTRLEN];
+  if (IS_ZEBRA_DEBUG_RIB_Q)
+    inet_ntop (rn->p.family, &rn->p.u.prefix, buf, INET6_ADDRSTRLEN);
+  for (rib = rn->info; rib; rib = rib->next)
+  {
+    switch (rib->type)
+    {
+      case ZEBRA_ROUTE_KERNEL:
+      case ZEBRA_ROUTE_CONNECT:
+        qindex = 0;
+        break;
+      case ZEBRA_ROUTE_STATIC:
+        qindex = 1;
+        break;
+      case ZEBRA_ROUTE_RIP:
+      case ZEBRA_ROUTE_RIPNG:
+      case ZEBRA_ROUTE_OSPF:
+      case ZEBRA_ROUTE_OSPF6:
+      case ZEBRA_ROUTE_ISIS:
+        qindex = 2;
+        break;
+      case ZEBRA_ROUTE_BGP:
+        qindex = 3;
+        break;
+      default:
+        qindex = 4;
+        break;
+    }
+    /* Invariant: at this point we always have rn->info set. */
+    if (CHECK_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED(qindex)))
+    {
+      if (IS_ZEBRA_DEBUG_RIB_Q)
+        zlog_debug ("%s: %s/%d: rn %p is already queued in sub-queue %u", __func__, buf, rn->p.prefixlen, rn, qindex);
+      continue;
+    }
+    SET_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED(qindex));
+    listnode_add (mq->subq[qindex], rn);
+    route_lock_node (rn);
+    mq->size++;
+    if (IS_ZEBRA_DEBUG_RIB_Q)
+      zlog_debug ("%s: %s/%d: queued rn %p into sub-queue %u", __func__, buf, rn->p.prefixlen, rn, qindex);
+  }
 }
 
 /* Add route_node to work queue and schedule processing */
@@ -1202,17 +1286,6 @@
       return;
     }
 
-  /* Route-table node already queued, so nothing to do */
-  if (CHECK_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED))
-    {
-      if (IS_ZEBRA_DEBUG_RIB_Q)
-        zlog_debug ("%s: %s/%d: rn %p already queued", __func__, buf,
-          rn->p.prefixlen, rn);
-      return;
-    }
-
-  route_lock_node (rn); /* rib queue lock */
-
   if (IS_ZEBRA_DEBUG_RIB_Q)
     zlog_info ("%s: %s/%d: work queue added", __func__, buf, rn->p.prefixlen);
 
@@ -1221,13 +1294,21 @@
   if (zebra->ribq == NULL)
     {
       zlog_err ("%s: work_queue does not exist!", __func__);
-      route_unlock_node (rn);
       return;
     }
-  
-  work_queue_add (zebra->ribq, rn);
 
-  SET_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED);
+  /* The RIB queue should normally be either empty or holding the only work_queue_item
+   * element. In the latter case this element would hold a pointer to the meta queue
+   * structure, which must be used to actually queue the route nodes to process. So
+   * create the MQ holder, if necessary, then push the work into it in any case.
+   * This semantics was introduced after 0.99.9 release.
+   */
+
+  /* Should I invent work_queue_empty() and use it, or it's Ok to do as follows? */
+  if (!zebra->ribq->items->count)
+    work_queue_add (zebra->ribq, zebra->mq);
+
+  rib_meta_queue_add (zebra->mq, rn);
 
   if (IS_ZEBRA_DEBUG_RIB_Q)
     zlog_debug ("%s: %s/%d: rn %p queued", __func__, buf, rn->p.prefixlen, rn);
@@ -1235,6 +1316,30 @@
   return;
 }
 
+/* Create new meta queue. A destructor function doesn't seem to be necessary here. */
+struct meta_queue *
+meta_queue_new ()
+{
+  struct meta_queue *new;
+  unsigned i, failed = 0;
+
+  if ((new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct meta_queue))) == NULL)
+    return NULL;
+  for (i = 0; i < MQ_SIZE; i++)
+    if ((new->subq[i] = list_new ()) == NULL)
+      failed = 1;
+  if (failed)
+  {
+    for (i = 0; i < MQ_SIZE; i++)
+      if (new->subq[i])
+        list_delete (new->subq[i]);
+    XFREE (MTYPE_WORK_QUEUE, new);
+    return NULL;
+  }
+  new->size = 0;
+  return new;
+}
+
 /* initialise zebra rib work queue */
 static void
 rib_queue_init (struct zebra_t *zebra)
@@ -1249,12 +1354,17 @@
     }
 
   /* fill in the work queue spec */
-  zebra->ribq->spec.workfunc = &rib_process;
+  zebra->ribq->spec.workfunc = &meta_queue_process;
   zebra->ribq->spec.errorfunc = NULL;
   /* XXX: TODO: These should be runtime configurable via vty */
   zebra->ribq->spec.max_retries = 3;
   zebra->ribq->spec.hold = rib_process_hold_time;
   
+  if (!(zebra->mq = meta_queue_new ()))
+  {
+    zlog_err ("%s: could not initialise meta queue!", __func__);
+    return;
+  }
   return;
 }
 
@@ -1663,11 +1773,7 @@
     }
   }
   if (changed)
-  {
-    work_queue_aim_head (zebrad.ribq, 1);
     rib_queue_add (&zebrad, rn);
-    work_queue_aim_head (zebrad.ribq, 0);
-  }
 }
 
 int
diff --git a/zebra/zserv.h b/zebra/zserv.h
index 5e22377..87a33a4 100644
--- a/zebra/zserv.h
+++ b/zebra/zserv.h
@@ -80,6 +80,7 @@
 
   /* rib work queue */
   struct work_queue *ribq;
+  struct meta_queue *mq;
 };
 
 /* Count prefix size from mask length */