Changed algorithm to wait for all containers to be running.

- Extra wait loop on startup, to wait for rw/ro pods to come online.
- Removed crashed pods from consideration (more stringent checks so this actually works).
- Now passing context to southbound requests where it was missed before.

Change-Id: I667e17c324282e0af87e8282195eb6632c3424ab
diff --git a/Gopkg.lock b/Gopkg.lock
index 407375c..3105662 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -814,6 +814,7 @@
     "google.golang.org/grpc/metadata",
     "google.golang.org/grpc/status",
     "gopkg.in/Shopify/sarama.v1",
+    "k8s.io/api/core/v1",
     "k8s.io/apimachinery/pkg/apis/meta/v1",
     "k8s.io/client-go/kubernetes",
     "k8s.io/client-go/rest",
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index bfd7ba9..3516261 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -22,6 +22,7 @@
 	"fmt"
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/keepalive"
+	"k8s.io/api/core/v1"
 	"math"
 	"os"
 	"path"
@@ -169,32 +170,42 @@
 	return conn, err
 }
 
-func getVolthaPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) ([]*volthaPod, error) {
+func getVolthaPods(cs *kubernetes.Clientset) ([]*volthaPod, []*volthaPod, error) {
 	pods, err := cs.CoreV1().Pods(podNamespace).List(metav1.ListOptions{})
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
-	//log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))
 
-	var rtrn []*volthaPod
+	// Set up the regular expression to identify the voltha cores
+	rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
+	roCoreFltr := regexp.MustCompile(`ro-core-`)
+
+	var rwPods, roPods []*volthaPod
+items:
 	for _, v := range pods.Items {
-		if coreFilter.MatchString(v.Name) {
-			log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
-				v.Status.PodIP, v.Spec.NodeName)
-			// Only add the pod if it has an IP address. If it doesn't then it likely crashed and
-			// and is still in the process of getting re-started.
-			if v.Status.PodIP != "" {
-				rtrn = append(rtrn, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName,
-					devIds: make(map[string]struct{}), backend: "", connection: ""})
+		// only pods that are actually running should be considered
+		if v.Status.Phase == v1.PodRunning {
+			for _, condition := range v.Status.Conditions {
+				if condition.Status != v1.ConditionTrue {
+					continue items
+				}
+			}
+
+			if rwCoreFltr.MatchString(v.Name) {
+				log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name, v.Status.PodIP, v.Spec.NodeName)
+				rwPods = append(rwPods, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName, devIds: make(map[string]struct{}), backend: "", connection: ""})
+			} else if roCoreFltr.MatchString(v.Name) {
+				log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name, v.Status.PodIP, v.Spec.NodeName)
+				roPods = append(roPods, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName, devIds: make(map[string]struct{}), backend: "", connection: ""})
 			}
 		}
 	}
-	return rtrn, nil
+	return rwPods, roPods, nil
 }
 
-func reconcilePodDeviceIds(pod *volthaPod, ids map[string]struct{}) bool {
-	ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
-	conn, err := connect(ctx, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
+func reconcilePodDeviceIds(ctx context.Context, pod *volthaPod, ids map[string]struct{}) bool {
+	ctxTimeout, _ := context.WithTimeout(ctx, time.Second*5)
+	conn, err := connect(ctxTimeout, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
 	if err != nil {
 		log.Debugf("Could not query devices from %s, could not connect", pod.name)
 		return false
@@ -207,7 +218,7 @@
 	}
 
 	client := vpb.NewVolthaServiceClient(conn)
-	_, err = client.ReconcileDevices(context.Background(), &idList)
+	_, err = client.ReconcileDevices(ctx, &idList)
 	if err != nil {
 		log.Error(err)
 		return false
@@ -216,18 +227,18 @@
 	return true
 }
 
-func queryPodDeviceIds(pod *volthaPod) map[string]struct{} {
+func queryPodDeviceIds(ctx context.Context, pod *volthaPod) map[string]struct{} {
 	var rtrn = make(map[string]struct{})
 	// Open a connection to the pod
-	ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
-	conn, err := connect(ctx, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
+	ctxTimeout, _ := context.WithTimeout(ctx, time.Second*5)
+	conn, err := connect(ctxTimeout, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
 	if err != nil {
 		log.Debugf("Could not query devices from %s, could not connect", pod.name)
 		return rtrn
 	}
 	defer conn.Close()
 	client := vpb.NewVolthaServiceClient(conn)
-	devs, err := client.ListDeviceIds(context.Background(), &empty.Empty{})
+	devs, err := client.ListDeviceIds(ctx, &empty.Empty{})
 	if err != nil {
 		log.Error(err)
 		return rtrn
@@ -239,10 +250,10 @@
 	return rtrn
 }
 
-func queryDeviceIds(pods []*volthaPod) {
+func queryDeviceIds(ctx context.Context, pods []*volthaPod) {
 	for pk := range pods {
 		// Keep the old Id list if a new list is not returned
-		if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
+		if idList := queryPodDeviceIds(ctx, pods[pk]); len(idList) != 0 {
 			pods[pk].devIds = idList
 		}
 	}
@@ -645,12 +656,10 @@
 }
 
 func applyAddrDiffs(ctx context.Context, client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
-	var newEntries [][]*volthaPod
-
 	log.Debug("Applying diffs")
 	switch cores := coreList.(type) {
 	case [][]*volthaPod:
-		newEntries = reconcileAddrDiffs(getAddrDiffs(cores, nPods))
+		newEntries := reconcileAddrDiffs(getAddrDiffs(cores, nPods))
 
 		// Now replace the information in coreGropus with the new
 		// entries and then reconcile the device ids on the core
@@ -662,9 +671,9 @@
 					// TODO: Missing is the case where bothe the primary
 					// and the secondary core crash and come back.
 					// Pull the device ids from the active-active peer
-					ids := queryPodDeviceIds(cores[k1][k2^1])
+					ids := queryPodDeviceIds(ctx, cores[k1][k2^1])
 					if len(ids) != 0 {
-						if !reconcilePodDeviceIds(newEntries[k1][k2], ids) {
+						if !reconcilePodDeviceIds(ctx, newEntries[k1][k2], ids) {
 							log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
 						}
 					}
@@ -719,16 +728,17 @@
 }
 
 func updateDeviceIds(coreGroups [][]*volthaPod, rwPods []*volthaPod) {
-	var byName = make(map[string]*volthaPod)
-
 	// Convenience
+	var byName = make(map[string]*volthaPod)
 	for _, v := range rwPods {
 		byName[v.name] = v
 	}
 
 	for k1, v1 := range coreGroups {
-		for k2 := range v1 {
-			coreGroups[k1][k2].devIds = byName[v1[k2].name].devIds
+		for k2, v2 := range v1 {
+			if pod, have := byName[v2.name]; have {
+				coreGroups[k1][k2].devIds = pod.devIds
+			}
 		}
 	}
 }
@@ -736,8 +746,6 @@
 func startCoreMonitor(ctx context.Context,
 	client pb.ConfigurationClient,
 	clientset *kubernetes.Clientset,
-	rwCoreFltr *regexp.Regexp,
-	roCoreFltr *regexp.Regexp,
 	coreGroups [][]*volthaPod,
 	oRoPods []*volthaPod) {
 	// Now that initial allocation has been completed, monitor the pods
@@ -765,20 +773,23 @@
 		}
 
 		// Get the rw core list from k8s
-		rwPods, err := getVolthaPods(clientset, rwCoreFltr)
+		rwPods, roPods, err := getVolthaPods(clientset)
 		if err != nil {
 			log.Error(err)
 			continue
 		}
 
-		queryDeviceIds(rwPods)
-		updateDeviceIds(coreGroups, rwPods)
 		// If we didn't get 2n+1 pods then wait since
 		// something is down and will hopefully come
 		// back up at some point.
 		if len(rwPods) != numRWPods {
+			log.Debug("One or more RW pod(s) are offline, will wait and retry")
 			continue
 		}
+
+		queryDeviceIds(ctx, rwPods)
+		updateDeviceIds(coreGroups, rwPods)
+
 		// We have all pods, check if any IP addresses
 		// have changed.
 		for _, v := range rwPods {
@@ -789,13 +800,8 @@
 			}
 		}
 
-		roPods, err := getVolthaPods(clientset, roCoreFltr)
-		if err != nil {
-			log.Error(err)
-			continue
-		}
-
 		if len(roPods) != numROPods {
+			log.Debug("One or more RO pod(s) are offline, will wait and retry")
 			continue
 		}
 		for _, v := range roPods {
@@ -891,18 +897,30 @@
 // generateAndMaintainConfiguration does the pod-reconciliation work,
 // it only returns once all sub-processes have completed
 func generateAndMaintainConfiguration(ctx context.Context, client pb.ConfigurationClient, clientset *kubernetes.Clientset) {
-	// Set up the regular expression to identify the voltha cores
-	rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
-	roCoreFltr := regexp.MustCompile(`ro-core-`)
+	// Get the voltha rw-/ro-core pods
+	var rwPods, roPods []*volthaPod
+	for {
+		var err error
+		if rwPods, roPods, err = getVolthaPods(clientset); err != nil {
+			log.Error(err)
+			return
+		}
 
-	// Get the voltha rw-core podes
-	rwPods, err := getVolthaPods(clientset, rwCoreFltr)
-	if err != nil {
-		panic(err)
+		if len(rwPods) == numRWPods && len(roPods) == numROPods {
+			break
+		}
+
+		log.Debug("One or more RW/RO pod(s) are offline, will wait and retry")
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(time.Second * 5):
+			// retry
+		}
 	}
 
 	// Fetch the devices held by each running core
-	queryDeviceIds(rwPods)
+	queryDeviceIds(ctx, rwPods)
 
 	// For debugging... comment out l8r
 	for _, v := range rwPods {
@@ -912,8 +930,8 @@
 	coreGroups := groupPods1(rwPods)
 
 	// Assign the groupings to the the backends and connections
-	for k := range coreGroups {
-		for k1 := range coreGroups[k] {
+	for k, coresInGroup := range coreGroups {
+		for k1 := range coresInGroup {
 			coreGroups[k][k1].cluster = afrouterRWClusterName
 			coreGroups[k][k1].backend = afrouterRWClusterName + strconv.Itoa(k+1)
 			coreGroups[k][k1].connection = afrouterRWClusterName + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
@@ -941,10 +959,6 @@
 	}
 
 	// Process the read only pods
-	roPods, err := getVolthaPods(clientset, roCoreFltr)
-	if err != nil {
-		panic(err)
-	}
 	for k, v := range roPods {
 		log.Debugf("Processing ro_pod %v", v)
 		vN := afrouterROClusterName + strconv.Itoa(k+1)
@@ -959,7 +973,7 @@
 	doneCh, _ := startDiscoveryMonitor(ctx, client, coreGroups)
 
 	log.Info("Starting core monitoring")
-	startCoreMonitor(ctx, client, clientset, rwCoreFltr, roCoreFltr, coreGroups, roPods)
+	startCoreMonitor(ctx, client, clientset, coreGroups, roPods)
 
 	//ensure the discovery monitor to quit
 	<-doneCh