+ initial edition of meta-queue for RIB updates processing (bug #431)
diff --git a/zebra/zebra_rib.c b/zebra/zebra_rib.c
index c6af329..4cb72ba 100644
--- a/zebra/zebra_rib.c
+++ b/zebra/zebra_rib.c
@@ -981,15 +981,14 @@
 static void rib_unlink (struct route_node *, struct rib *);
 
 /* Core function for processing routing information base. */
-static wq_item_status
-rib_process (struct work_queue *wq, void *data)
+static void
+rib_process (struct route_node *rn)
 {
   struct rib *rib;
   struct rib *next;
   struct rib *fib = NULL;
   struct rib *select = NULL;
   struct rib *del = NULL;
-  struct route_node *rn = data;
   int installed = 0;
   struct nexthop *nexthop = NULL;
   char buf[INET6_ADDRSTRLEN];
@@ -1177,10 +1176,95 @@
 end:
   if (IS_ZEBRA_DEBUG_RIB_Q)
     zlog_debug ("%s: %s/%d: rn %p dequeued", __func__, buf, rn->p.prefixlen, rn);
-  if (rn->info)
-    UNSET_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED);  
-  route_unlock_node (rn); /* rib queue lock */
-  return WQ_SUCCESS;
+}
+
+/* Process at most one route node from the head of the given sub-queue: run rib_process()
+ * on it, clear RIB_ROUTE_QUEUED(qindex) if the node still has a RIB, unlock the node and
+ * remove its list entry. Returns 1 if a node was processed, 0 if the list had no head.
+ */
+unsigned int
+process_subq (struct list * subq, u_char qindex)
+{
+  struct listnode *lnode;
+  struct route_node *rnode;
+  if (!(lnode = listhead (subq)))
+    return 0; /* no head node, nothing to process in this sub-queue */
+  rnode = listgetdata (lnode);
+  rib_process (rnode);
+  if (rnode->info) /* The first RIB record is holding the flags bitmask. */
+    UNSET_FLAG (((struct rib *)rnode->info)->rn_status, RIB_ROUTE_QUEUED(qindex));
+  route_unlock_node (rnode); /* presumably pairs with the lock taken in rib_meta_queue_add() */
+  list_delete_node (subq, lnode);
+  return 1;
+}
+
+/* Work-queue callback: process one route node from the first non-empty sub-queue,
+ * scanning from index 0 upwards, and decrement mq->size for it. The work_queue
+ * argument is unused; data points to the meta queue structure. Returns WQ_REQUEUE
+ * while mq->size is non-zero, otherwise WQ_SUCCESS. */
+static wq_item_status
+meta_queue_process (struct work_queue *dummy, void *data)
+{
+  struct meta_queue * mq = data;
+  u_char i;
+  for (i = 0; i < MQ_SIZE; i++)
+    if (process_subq (mq->subq[i], i))
+    {
+      mq->size--;
+      break; /* only one node is handled per invocation */
+    }
+  return mq->size ? WQ_REQUEUE : WQ_SUCCESS;
+}
+
+/* Walk every RIB entry of rn, map its route type to a sub-queue index and append rn to that
+ * sub-queue unless it is already flagged as queued there; each append locks rn and bumps mq->size.
+ */
+void rib_meta_queue_add (struct meta_queue *mq, struct route_node *rn)
+{
+  u_char qindex;
+  struct rib *rib;
+  char buf[INET6_ADDRSTRLEN]; /* only filled and read when RIB queue debugging is enabled */
+  if (IS_ZEBRA_DEBUG_RIB_Q)
+    inet_ntop (rn->p.family, &rn->p.u.prefix, buf, INET6_ADDRSTRLEN);
+  for (rib = rn->info; rib; rib = rib->next)
+  {
+    switch (rib->type) /* choose the sub-queue index from the route type */
+    {
+      case ZEBRA_ROUTE_KERNEL:
+      case ZEBRA_ROUTE_CONNECT:
+        qindex = 0;
+        break;
+      case ZEBRA_ROUTE_STATIC:
+        qindex = 1;
+        break;
+      case ZEBRA_ROUTE_RIP:
+      case ZEBRA_ROUTE_RIPNG:
+      case ZEBRA_ROUTE_OSPF:
+      case ZEBRA_ROUTE_OSPF6:
+      case ZEBRA_ROUTE_ISIS:
+        qindex = 2;
+        break;
+      case ZEBRA_ROUTE_BGP:
+        qindex = 3;
+        break;
+      default:
+        qindex = 4;
+        break;
+    }
+    /* rn->info is non-NULL here (the loop started from it); the queued flags live in that first RIB. */
+    if (CHECK_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED(qindex)))
+    {
+      if (IS_ZEBRA_DEBUG_RIB_Q)
+        zlog_debug ("%s: %s/%d: rn %p is already queued in sub-queue %u", __func__, buf, rn->p.prefixlen, rn, qindex);
+      continue; /* do not add rn to the same sub-queue twice */
+    }
+    SET_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED(qindex));
+    listnode_add (mq->subq[qindex], rn);
+    route_lock_node (rn); /* one lock per sub-queue entry; process_subq() unlocks it */
+    mq->size++;
+    if (IS_ZEBRA_DEBUG_RIB_Q)
+      zlog_debug ("%s: %s/%d: queued rn %p into sub-queue %u", __func__, buf, rn->p.prefixlen, rn, qindex);
+  }
 }
 
 /* Add route_node to work queue and schedule processing */
@@ -1202,17 +1286,6 @@
       return;
     }
 
-  /* Route-table node already queued, so nothing to do */
-  if (CHECK_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED))
-    {
-      if (IS_ZEBRA_DEBUG_RIB_Q)
-        zlog_debug ("%s: %s/%d: rn %p already queued", __func__, buf,
-          rn->p.prefixlen, rn);
-      return;
-    }
-
-  route_lock_node (rn); /* rib queue lock */
-
   if (IS_ZEBRA_DEBUG_RIB_Q)
     zlog_info ("%s: %s/%d: work queue added", __func__, buf, rn->p.prefixlen);
 
@@ -1221,13 +1294,21 @@
   if (zebra->ribq == NULL)
     {
       zlog_err ("%s: work_queue does not exist!", __func__);
-      route_unlock_node (rn);
       return;
     }
-  
-  work_queue_add (zebra->ribq, rn);
 
-  SET_FLAG (((struct rib *)rn->info)->rn_status, RIB_ROUTE_QUEUED);
+  /* The RIB queue should normally be either empty or holding the only work_queue_item
+   * element. In the latter case this element would hold a pointer to the meta queue
+   * structure, which must be used to actually queue the route nodes to process. So
+   * create the MQ holder, if necessary, then push the work into it in any case.
+   * These semantics were introduced after the 0.99.9 release.
+   */
+
+  /* Should I invent work_queue_empty() and use it, or is it OK to do as follows? */
+  if (!zebra->ribq->items->count)
+    work_queue_add (zebra->ribq, zebra->mq);
+
+  rib_meta_queue_add (zebra->mq, rn);
 
   if (IS_ZEBRA_DEBUG_RIB_Q)
     zlog_debug ("%s: %s/%d: rn %p queued", __func__, buf, rn->p.prefixlen, rn);
@@ -1235,6 +1316,30 @@
   return;
 }
 
+/* Create a new meta queue with MQ_SIZE empty sub-queues; returns NULL if any allocation fails. A destructor function doesn't seem to be necessary here. */
+struct meta_queue *
+meta_queue_new (void)
+{
+  struct meta_queue *new;
+  unsigned i, failed = 0;
+
+  if ((new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct meta_queue))) == NULL)
+    return NULL;
+  for (i = 0; i < MQ_SIZE; i++)
+    if ((new->subq[i] = list_new ()) == NULL)
+      failed = 1; /* keep filling the remaining slots; cleanup below frees the non-NULL ones */
+  if (failed)
+  {
+    for (i = 0; i < MQ_SIZE; i++)
+      if (new->subq[i])
+        list_delete (new->subq[i]);
+    XFREE (MTYPE_WORK_QUEUE, new);
+    return NULL;
+  }
+  new->size = 0; /* no route nodes queued yet */
+  return new;
+}
+
 /* initialise zebra rib work queue */
 static void
 rib_queue_init (struct zebra_t *zebra)
@@ -1249,12 +1354,17 @@
     }
 
   /* fill in the work queue spec */
-  zebra->ribq->spec.workfunc = &rib_process;
+  zebra->ribq->spec.workfunc = &meta_queue_process;
   zebra->ribq->spec.errorfunc = NULL;
   /* XXX: TODO: These should be runtime configurable via vty */
   zebra->ribq->spec.max_retries = 3;
   zebra->ribq->spec.hold = rib_process_hold_time;
   
+  if (!(zebra->mq = meta_queue_new ()))
+  {
+    zlog_err ("%s: could not initialise meta queue!", __func__);
+    return;
+  }
   return;
 }
 
@@ -1663,11 +1773,7 @@
     }
   }
   if (changed)
-  {
-    work_queue_aim_head (zebrad.ribq, 1);
     rib_queue_add (&zebrad, rn);
-    work_queue_aim_head (zebrad.ribq, 0);
-  }
 }
 
 int