[VOL-1416]
This update fixes the problem described in the Jira ticket above.
The affinity router's control plane for VOLTHA now correctly
detects dynamic state changes.

Change-Id: I302ea65eb4f3618ae3cbcca7cd813d0b6cf4de50
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index 06f4628..fe9a352 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -37,6 +37,7 @@
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	empty "github.com/golang/protobuf/ptypes/empty"
 	vpb "github.com/opencord/voltha-go/protos/voltha"
+	cmn "github.com/opencord/voltha-go/protos/common"
 	pb "github.com/opencord/voltha-go/protos/afrouter"
 	ic "github.com/opencord/voltha-go/protos/inter_container"
 )
@@ -68,6 +69,8 @@
 		dn bool
 }
 
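+// Number of rw core pods; currently hard coded. The cores are managed
+// as nPods>>1 active-active pairs.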
+var nPods int = 6
+
 // Topic is affinityRouter
 // port: 9092
 
@@ -112,7 +115,7 @@
 
 func connect(addr string) (*grpc.ClientConn, error) {
 	for ctr :=0 ; ctr < 100; ctr++ {
-		log.Debug("Trying to connect to %s", addr)
+		log.Debugf("Trying to connect to %s", addr)
 		conn, err := grpc.Dial(addr, grpc.WithInsecure())
 		if err != nil {
 			log.Debugf("Attempt to connect failed, retrying %v:", err)
@@ -133,42 +136,73 @@
 	if err != nil {
 		panic(err.Error())
 	}
-	log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))
+	//log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))
 
-	for k,v := range pods.Items {
+	for _,v := range pods.Items {
 		if v.Namespace == "voltha" && coreFilter.MatchString(v.Name) {
-			fmt.Printf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
+			log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
 																	v.Status.PodIP, v.Spec.NodeName)
-			//fmt.Printf("Pod %v,%v\n\n\n",k,v)
-			_ = k
-			// Add this pod to the core structure.
-			rtrn = append(rtrn, &rwPod{name:v.Name,ipAddr:v.Status.PodIP,node:v.Spec.NodeName,
-						  devIds:make(map[string]struct{}), backend:"", connection:""})
+			// Only add the pod if it has an IP address. If it doesn't then it likely crashed
+			// and is still in the process of being restarted.
+			if v.Status.PodIP != "" {
+				rtrn = append(rtrn, &rwPod{name:v.Name,ipAddr:v.Status.PodIP,node:v.Spec.NodeName,
+							devIds:make(map[string]struct{}), backend:"", connection:""})
+			}
 		}
 	}
 	return rtrn
 }
 
-func queryDevices(pods []*rwPod) {
-	for pk,pv := range pods {
-		// Open a connection to the pod
-		// port 50057
-		conn, err := connect(pv.ipAddr+":50057")
-		if (err != nil) {
-			log.Debugf("Could not query devices from %s, could not connect", pv.name)
-			continue
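+// reconcilePodDeviceIds asks the core in the given pod to reconcile the
+// provided set of device ids by calling its ReconcileDevices API. It
+// returns false if the pod can't be reached or the call fails.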
+func reconcilePodDeviceIds(pod *rwPod, ids map[string]struct{}) bool {
+	var idList cmn.IDs
+	for k,_ := range(ids) {
+		idList.Items = append(idList.Items, &cmn.ID{Id:k})
+	}
+	conn,err := connect(pod.ipAddr+":50057")
+	if (err != nil) {
+		log.Debugf("Could not reconcile devices on %s, could not connect", pod.name)
+		return false
+	}
+	defer conn.Close()
+	client := vpb.NewVolthaServiceClient(conn)
+	_,err = client.ReconcileDevices(context.Background(), &idList)
+	if err != nil {
+		log.Error(err)
+		return false
+	}
+
+	return true
+}
+
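+// queryPodDeviceIds returns the set of device ids currently held by the
+// core in the given pod, as reported by its ListDeviceIds API. An empty
+// set is returned if the pod can't be reached or the query fails.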
+func queryPodDeviceIds(pod *rwPod) map[string]struct{} {
+	var rtrn map[string]struct{} = make(map[string]struct{})
+	// Open a connection to the pod
+	// port 50057
+	conn, err := connect(pod.ipAddr+":50057")
+	if (err != nil) {
+		log.Debugf("Could not query devices from %s, could not connect", pod.name)
+		return rtrn
+	}
+	defer conn.Close()
+	client := vpb.NewVolthaServiceClient(conn)
+	devs,err := client.ListDeviceIds(context.Background(), &empty.Empty{})
+	if err != nil {
+		log.Error(err)
+		return rtrn
+	}
+	for _,dv := range devs.Items {
+		rtrn[dv.Id]=struct{}{}
+	}
+
+	return rtrn
+}
+
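+// queryDeviceIds refreshes the device id list cached for each rw core
+// pod, keeping a pod's previous list if the query returns nothing.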
+func queryDeviceIds(pods []*rwPod) {
+	for pk,_ := range pods {
+		// Keep the old Id list if a new list is not returned
+		if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
+			pods[pk].devIds = idList
 		}
-		client := vpb.NewVolthaServiceClient(conn)
-		devs,err := client.ListDevices(context.Background(), &empty.Empty{})
-		if err != nil {
-			log.Error(err)
-			conn.Close()
-			continue
-		}
-		for _,dv := range devs.Items {
-			pods[pk].devIds[dv.Id]=struct{}{}
-		}
-		conn.Close()
 	}
 }
 
@@ -723,6 +757,119 @@
 	return nil
 }
 
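+// deepCopyCoreGroups is a placeholder for taking an independent copy of
+// the coreGroups structure; the copy is not implemented yet and a nil
+// slice is returned.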
+func deepCopyCoreGroups(coreGroups [][]*rwPod) ([][]*rwPod) {
+	var rtrn [][]*rwPod
+	return rtrn
+}
+
+
+// Determines which items in core groups
+// have changed based on the list provided
+// and returns a coreGroup with only the changed
+// items and a pod list with the new items
+func getAddrDiffs(coreGroups [][]*rwPod, rwPods []*rwPod) ([][]*rwPod, []*rwPod) {
+	var nList []*rwPod
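+	// One group per active-active core pair (nPods>>1 pairs)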
+	var rtrn [][]*rwPod = make([][]*rwPod, nPods>>1)
+	var ipAddrs map[string]struct{} = make(map[string]struct{})
+
+	log.Debug("Get addr diffs")
+
+	// Start with an array of groups whose entries are all nil
+	for k,_ := range(rtrn) {
+		rtrn[k] = make([]*rwPod, 2)
+	}
+
+	// Build a list with only the new items
+	for _,v := range(rwPods) {
+		if hasIpAddr(coreGroups, v.ipAddr) == false {
+			nList = append(nList, v)
+		}
+		ipAddrs[v.ipAddr] = struct{}{} // for the search below
+	}
+
+	// Now build the coreGroups with only the changed items
+	for k1,v1 := range(coreGroups) {
+		for k2,v2 := range(v1) {
+			if _,ok := ipAddrs[v2.ipAddr]; ok == false {
+				rtrn[k1][k2] = v2
+			}
+		}
+	}
+	return rtrn, nList
+}
+
+// Figure out where best to put the new pods
+// in the coreGroup array based on the old
+// pods being replaced. The criterion is that
+// the new pod be on the same server as the
+// old pod was.
+func reconcileAddrDiffs(coreGroupDiffs [][]*rwPod, rwPodDiffs []*rwPod) ([][]*rwPod) {
+	var srvrs map[string][]*rwPod = make(map[string][]*rwPod)
+
+	log.Debug("Reconciling diffs")
+	log.Debug("Building server list")
+	for _,v := range(rwPodDiffs) {
+		log.Debugf("Adding %v to the server list", *v)
+		srvrs[v.node] = append(srvrs[v.node], v)
+	}
+
+	for k1,v1 := range(coreGroupDiffs) {
+		log.Debugf("k1:%v, v1:%v", k1,v1)
+		for k2,v2 :=  range(v1) {
+			log.Debugf("k2:%v, v2:%v", k2,v2)
+			if v2 == nil { // Nothing to do here
+				continue
+			}
+			if _,ok := srvrs[v2.node]; ok == true {
+				coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
+				if len(srvrs[v2.node]) > 1 { // remove one entry from the list
+					srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
+				} else { // Delete the entry from the map
+					delete(srvrs, v2.node)
+				}
+			} else {
+				log.Error("This should never happen, node appears to have changed names")
+				// attempt to limp along by keeping this old entry
+			}
+		}
+	}
+
+	return coreGroupDiffs
+}
+
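+// applyAddrDiffs updates the coreGroups entries whose pod IP addresses
+// have changed. Each replacement pod is handed the device ids of its
+// active-active peer and the affinity router is given the new
+// connection information.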
+func applyAddrDiffs(client pb.ConfigurationClient, coreGroups [][]*rwPod, rwPods []*rwPod) {
+	var newEntries [][]*rwPod
+
+	log.Debug("Applying diffs")
+	newEntries = reconcileAddrDiffs(getAddrDiffs(coreGroups, rwPods))
+
+	// Now replace the information in coreGroups with the new
+	// entries and then reconcile the device ids on the core
+	// that's in the new entry with the device ids of its
+	// active-active peer.
+	for k1,v1 := range(coreGroups) {
+		for k2,v2 := range(v1) {
+			if newEntries[k1][k2] != nil {
+				// TODO: Missing is the case where both the primary
+				// and the secondary core crash and come back.
+				// Pull the device ids from the active-active peer
+				ids := queryPodDeviceIds(coreGroups[k1][k2^1])
+				if len(ids) != 0 {
+					if reconcilePodDeviceIds(newEntries[k1][k2], ids) == false {
+						log.Errorf("Attempt to reconcile ids on pod %v failed",newEntries[k1][k2])
+					}
+				}
+				// Send the affinity router the new connection information
+				setConnection(client, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, 50057)
+				// Copy the new entry information over
+				coreGroups[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
+				coreGroups[k1][k2].name = newEntries[k1][k2].name
+				coreGroups[k1][k2].devIds = ids
+			}
+		}
+	}
+}
+
 func startCoreMonitor(client pb.ConfigurationClient,
 					clientset *kubernetes.Clientset,
 					coreFltr *regexp.Regexp,
@@ -746,6 +893,7 @@
 		time.Sleep(10 * time.Second) // Wait a while
 		// Get the rw core list from k8s
 		rwPods := getRwPods(clientset, coreFltr)
+		queryDeviceIds(rwPods)
 		// If we didn't get 2n+1 pods then wait since
 		// something is down and will hopefully come
 		// back up at some point.
@@ -756,16 +904,27 @@
 		// We have all pods, check if any IP addresses
 		// have changed.
 		for _,v := range rwPods {
-			//if hasIpAddr(coreGroups, v.ipAddr) == false {
-				//log.Debug("Address has changed...")
-				//applyAddrDiffs(coreGroups, rwPods)
-			//}
-			_ = v
+			if hasIpAddr(coreGroups, v.ipAddr) == false {
+				log.Debug("Address has changed...")
+				applyAddrDiffs(client, coreGroups, rwPods)
+
+			}
 		}
 	}
-
 }
 
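+// hasIpAddr returns true if any pod in coreGroups is using the given
+// IP address.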
+func hasIpAddr(coreGroups [][]*rwPod, ipAddr string) bool {
+	for _,v1 := range(coreGroups) {
+		for _,v2 := range(v1) {
+			if v2.ipAddr == ipAddr {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+
 func main() {
 	// This is currently hard coded to a cluster with 3 servers
 	//var connections map[string]configConn = make(map[string]configConn)
@@ -798,7 +957,7 @@
 	rwPods := getRwPods(clientset, coreFltr)
 
 	// Fetch the devices held by each running core
-	queryDevices(rwPods)
+	queryDeviceIds(rwPods)
 
 	// For debugging... comment out l8r
 	for _,v := range rwPods {