Core pre-pairing.

This requires changes to voltha-helm-charts to work correctly, please consider/merge both patches together.

- Removed existing core pairing/re-pairing logic.
- Removed SetConnection calls to the affinity router, re-routing when cores move is now handled by headless k8s services.  (See voltha-helm-charts change.)
- Reworked deviceId polling, core syncing, and afrouter affinity configuration.  New algorithm has been drastically simplified.
- Removed wait for all RW/RO cores to be up.  Since pairing is no longer handled here, the location of every core doesn't need to be known.
- Removed all references to RO cores, as all configuration now handled by k8s headless services.  (See voltha-helm-charts change.)
- Fixed a bug where the kafka device monitor was incorrectly configuring the affinity router due to a deviceId being passed where a coreId was expected.  Rather hackish solution determines which backend to use from the kafka message's publisher.

Change-Id: I7b1c119b2dd772e2122767d16d1c1d03c387db90
diff --git a/afrouter/arouter.json b/afrouter/arouter.json
index c1530a7..7739d65 100644
--- a/afrouter/arouter.json
+++ b/afrouter/arouter.json
@@ -176,13 +176,13 @@
           "connections": [
             {
               "name": "vcore11",
-              "addr": "",
-              "port": ""
+              "addr": "voltha-rw-core-11.voltha.svc.cluster.local",
+              "port": "50057"
             },
             {
               "name": "vcore12",
-              "addr": "",
-              "port": ""
+              "addr": "voltha-rw-core-12.voltha.svc.cluster.local",
+              "port": "50057"
             }
           ]
         },
@@ -198,13 +198,13 @@
           "connections": [
             {
               "name": "vcore21",
-              "addr": "",
-              "port": ""
+              "addr": "voltha-rw-core-21.voltha.svc.cluster.local",
+              "port": "50057"
             },
             {
               "name": "vcore22",
-              "addr": "",
-              "port": ""
+              "addr": "voltha-rw-core-22.voltha.svc.cluster.local",
+              "port": "50057"
             }
           ]
         },
@@ -220,13 +220,13 @@
           "connections": [
             {
               "name": "vcore31",
-              "addr": "",
-              "port": ""
+              "addr": "voltha-rw-core-31.voltha.svc.cluster.local",
+              "port": "50057"
             },
             {
               "name": "vcore32",
-              "addr": "",
-              "port": ""
+              "addr": "voltha-rw-core-32.voltha.svc.cluster.local",
+              "port": "50057"
             }
           ]
         }
@@ -241,8 +241,8 @@
           "connections": [
             {
               "name": "ro_vcore11",
-              "addr": "",
-              "port": ""
+              "addr": "voltha-ro-core-0.voltha-ro-core.voltha.svc.cluster.local",
+              "port": "50057"
             }
           ]
         },
@@ -252,8 +252,8 @@
           "connections": [
             {
               "name": "ro_vcore21",
-              "addr": "",
-              "port": ""
+              "addr": "voltha-ro-core-1.voltha-ro-core.voltha.svc.cluster.local",
+              "port": "50057"
             }
           ]
         },
@@ -263,8 +263,8 @@
           "connections": [
             {
               "name": "ro_vcore31",
-              "addr": "",
-              "port": ""
+              "addr": "voltha-ro-core-2.voltha-ro-core.voltha.svc.cluster.local",
+              "port": "50057"
             }
           ]
         }
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index 3516261..538d4c8 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -52,16 +52,10 @@
 	ipAddr     string
 	node       string
 	devIds     map[string]struct{}
-	cluster    string
 	backend    string
 	connection string
 }
 
-type podTrack struct {
-	pod *volthaPod
-	dn  bool
-}
-
 type Configuration struct {
 	DisplayVersionOnly *bool
 }
@@ -71,17 +65,17 @@
 	k8sApiServer      = getStrEnv("K8S_API_SERVER", "")
 	k8sKubeConfigPath = getStrEnv("K8S_KUBE_CONFIG_PATH", "")
 
-	podNamespace = getStrEnv("POD_NAMESPACE", "voltha")
-	podGrpcPort  = uint64(getIntEnv("POD_GRPC_PORT", 0, math.MaxUint16, 50057))
+	podNamespace          = getStrEnv("POD_NAMESPACE", "voltha")
+	podLabelSelector      = getStrEnv("POD_LABEL_SELECTOR", "app=rw-core")
+	podAffinityGroupLabel = getStrEnv("POD_AFFINITY_GROUP_LABEL", "affinity-group")
 
-	numRWPods = getIntEnv("NUM_RW_PODS", 1, math.MaxInt32, 6)
-	numROPods = getIntEnv("NUM_RO_PODS", 1, math.MaxInt32, 3)
+	podGrpcPort = uint64(getIntEnv("POD_GRPC_PORT", 0, math.MaxUint16, 50057))
 
 	afrouterApiAddress = getStrEnv("AFROUTER_API_ADDRESS", "localhost:55554")
 
 	afrouterRouterName    = getStrEnv("AFROUTER_ROUTER_NAME", "vcore")
+	afrouterRouteName     = getStrEnv("AFROUTER_ROUTE_NAME", "dev_manager")
 	afrouterRWClusterName = getStrEnv("AFROUTER_RW_CLUSTER_NAME", "vcore")
-	afrouterROClusterName = getStrEnv("AFROUTER_RO_CLUSTER_NAME", "ro_vcore")
 
 	kafkaTopic      = getStrEnv("KAFKA_TOPIC", "AffinityRouter")
 	kafkaClientType = getStrEnv("KAFKA_CLIENT_TYPE", "sarama")
@@ -109,7 +103,6 @@
 }
 
 func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
-
 	log.Infow("kafka-client-type", log.Fields{"client": clientType})
 	switch clientType {
 	case "sarama":
@@ -170,17 +163,13 @@
 	return conn, err
 }
 
-func getVolthaPods(cs *kubernetes.Clientset) ([]*volthaPod, []*volthaPod, error) {
-	pods, err := cs.CoreV1().Pods(podNamespace).List(metav1.ListOptions{})
+func getVolthaPods(cs *kubernetes.Clientset) ([]*volthaPod, error) {
+	pods, err := cs.CoreV1().Pods(podNamespace).List(metav1.ListOptions{LabelSelector: podLabelSelector})
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
-	// Set up the regular expression to identify the voltha cores
-	rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
-	roCoreFltr := regexp.MustCompile(`ro-core-`)
-
-	var rwPods, roPods []*volthaPod
+	var rwPods []*volthaPod
 items:
 	for _, v := range pods.Items {
 		// only pods that are actually running should be considered
@@ -191,24 +180,29 @@
 				}
 			}
 
-			if rwCoreFltr.MatchString(v.Name) {
+			if group, have := v.Labels[podAffinityGroupLabel]; have {
 				log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name, v.Status.PodIP, v.Spec.NodeName)
-				rwPods = append(rwPods, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName, devIds: make(map[string]struct{}), backend: "", connection: ""})
-			} else if roCoreFltr.MatchString(v.Name) {
-				log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name, v.Status.PodIP, v.Spec.NodeName)
-				roPods = append(roPods, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName, devIds: make(map[string]struct{}), backend: "", connection: ""})
+				rwPods = append(rwPods, &volthaPod{
+					name:    v.Name,
+					ipAddr:  v.Status.PodIP,
+					node:    v.Spec.NodeName,
+					devIds:  make(map[string]struct{}),
+					backend: afrouterRWClusterName + group,
+				})
+			} else {
+				log.Warnf("Pod %s found matching % without label %", v.Name, podLabelSelector, podAffinityGroupLabel)
 			}
 		}
 	}
-	return rwPods, roPods, nil
+	return rwPods, nil
 }
 
-func reconcilePodDeviceIds(ctx context.Context, pod *volthaPod, ids map[string]struct{}) bool {
+func reconcilePodDeviceIds(ctx context.Context, pod *volthaPod, ids map[string]struct{}) {
 	ctxTimeout, _ := context.WithTimeout(ctx, time.Second*5)
 	conn, err := connect(ctxTimeout, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
 	if err != nil {
-		log.Debugf("Could not query devices from %s, could not connect", pod.name)
-		return false
+		log.Debugf("Could not reconcile devices from %s, could not connect: %s", pod.name, err)
+		return
 	}
 	defer conn.Close()
 
@@ -220,619 +214,155 @@
 	client := vpb.NewVolthaServiceClient(conn)
 	_, err = client.ReconcileDevices(ctx, &idList)
 	if err != nil {
-		log.Error(err)
-		return false
+		log.Errorf("Attempt to reconcile ids on pod %s failed: %s", pod.name, err)
+		return
 	}
-
-	return true
 }
 
 func queryPodDeviceIds(ctx context.Context, pod *volthaPod) map[string]struct{} {
-	var rtrn = make(map[string]struct{})
-	// Open a connection to the pod
 	ctxTimeout, _ := context.WithTimeout(ctx, time.Second*5)
 	conn, err := connect(ctxTimeout, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
 	if err != nil {
-		log.Debugf("Could not query devices from %s, could not connect", pod.name)
-		return rtrn
+		log.Debugf("Could not query devices from %s, could not connect: %s", pod.name, err)
+		return nil
 	}
 	defer conn.Close()
+
 	client := vpb.NewVolthaServiceClient(conn)
 	devs, err := client.ListDeviceIds(ctx, &empty.Empty{})
 	if err != nil {
 		log.Error(err)
-		return rtrn
+		return nil
 	}
+
+	var ret = make(map[string]struct{})
 	for _, dv := range devs.Items {
-		rtrn[dv.Id] = struct{}{}
+		ret[dv.Id] = struct{}{}
 	}
-
-	return rtrn
+	return ret
 }
 
-func queryDeviceIds(ctx context.Context, pods []*volthaPod) {
-	for pk := range pods {
-		// Keep the old Id list if a new list is not returned
-		if idList := queryPodDeviceIds(ctx, pods[pk]); len(idList) != 0 {
-			pods[pk].devIds = idList
-		}
-	}
-}
-
-func allEmpty(pods []*volthaPod) bool {
-	for k := range pods {
-		if len(pods[k].devIds) != 0 {
-			return false
-		}
-	}
-	return true
-}
-
-func rmPod(pods []*volthaPod, idx int) []*volthaPod {
-	return append(pods[:idx], pods[idx+1:]...)
-}
-
-func groupIntersectingPods1(pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
-	var rtrn [][]*volthaPod
-	var out []*volthaPod
-
-	for {
-		if len(pods) == 0 {
-			break
-		}
-		if len(pods[0].devIds) == 0 { // Ignore pods with no devices
-			////log.Debugf("%s empty pod", pd[k].pod.name)
-			out = append(out, pods[0])
-			pods = rmPod(pods, 0)
-			continue
-		}
-		// Start a pod group with this pod
-		var grp []*volthaPod
-		grp = append(grp, pods[0])
-		pods = rmPod(pods, 0)
-		//log.Debugf("Creating new group %s", pd[k].pod.name)
-		// Find the peer pod based on device overlap
-		// It's ok if one isn't found, an empty one will be used instead
-		for k := range pods {
-			if len(pods[k].devIds) == 0 { // Skip pods with no devices
-				//log.Debugf("%s empty pod", pd[k1].pod.name)
-				continue
-			}
-			if intersect(grp[0].devIds, pods[k].devIds) {
-				//log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
-				if grp[0].node == pods[k].node {
-					// This should never happen
-					log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
-						grp[0].name, pods[k].name)
-					continue
-				}
-				grp = append(grp, pods[k])
-				pods = rmPod(pods, k)
-				break
-
-			}
-		}
-		rtrn = append(rtrn, grp)
-		//log.Debugf("Added group %s", grp[0].name)
-		// Check if the number of groups = half the pods, if so all groups are started.
-		if len(rtrn) == podCt>>1 {
-			// Append any remaining pods to out
-			out = append(out, pods[0:]...)
-			break
-		}
-	}
-	return rtrn, out
-}
-
-func unallocPodCount(pd []*podTrack) int {
-	var rtrn int = 0
-	for _, v := range pd {
-		if !v.dn {
-			rtrn++
-		}
-	}
-	return rtrn
-}
-
-func sameNode(pod *volthaPod, grps [][]*volthaPod) bool {
-	for _, v := range grps {
-		if v[0].node == pod.node {
-			return true
-		}
-		if len(v) == 2 && v[1].node == pod.node {
-			return true
-		}
-	}
-	return false
-}
-
-func startRemainingGroups1(grps [][]*volthaPod, pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
-	var grp []*volthaPod
-
-	for k := range pods {
-		if sameNode(pods[k], grps) {
-			continue
-		}
-		grp = []*volthaPod{}
-		grp = append(grp, pods[k])
-		pods = rmPod(pods, k)
-		grps = append(grps, grp)
-		if len(grps) == podCt>>1 {
-			break
-		}
-	}
-	return grps, pods
-}
-
-func hasSingleSecondNode(grp []*volthaPod) bool {
-	var servers = make(map[string]struct{})
-	for k := range grp {
-		if k == 0 {
-			continue // Ignore the first item
-		}
-		servers[grp[k].node] = struct{}{}
-	}
-	if len(servers) == 1 {
-		return true
-	}
-	return false
-}
-
-func addNode(grps [][]*volthaPod, idx *volthaPod, item *volthaPod) [][]*volthaPod {
-	for k := range grps {
-		if grps[k][0].name == idx.name {
-			grps[k] = append(grps[k], item)
-			return grps
-		}
-	}
-	// TODO: Error checking required here.
-	return grps
-}
-
-func removeNode(grps [][]*volthaPod, item *volthaPod) [][]*volthaPod {
-	for k := range grps {
-		for k1 := range grps[k] {
-			if grps[k][k1].name == item.name {
-				grps[k] = append(grps[k][:k1], grps[k][k1+1:]...)
-				break
-			}
-		}
-	}
-	return grps
-}
-
-func groupRemainingPods1(grps [][]*volthaPod, pods []*volthaPod) [][]*volthaPod {
-	var lgrps [][]*volthaPod
-	// All groups must be started when this function is called.
-	// Copy incomplete groups
-	for k := range grps {
-		if len(grps[k]) != 2 {
-			lgrps = append(lgrps, grps[k])
-		}
-	}
-
-	// Add all pairing candidates to each started group.
-	for k := range pods {
-		for k2 := range lgrps {
-			if lgrps[k2][0].node != pods[k].node {
-				lgrps[k2] = append(lgrps[k2], pods[k])
-			}
-		}
-	}
-
-	//TODO: If any member of lgrps doesn't have at least 2
-	// nodes something is wrong. Check for that here
-
-	for {
-		for { // Address groups with only a single server choice
-			var ssn bool = false
-
-			for k := range lgrps {
-				// Now if any of the groups only have a single
-				// node as the choice for the second member
-				// address that one first.
-				if hasSingleSecondNode(lgrps[k]) {
-					ssn = true
-					// Add this pairing to the groups
-					grps = addNode(grps, lgrps[k][0], lgrps[k][1])
-					// Since this node is now used, remove it from all
-					// remaining tenative groups
-					lgrps = removeNode(lgrps, lgrps[k][1])
-					// Now remove this group completely since
-					// it's been addressed
-					lgrps = append(lgrps[:k], lgrps[k+1:]...)
-					break
-				}
-			}
-			if !ssn {
-				break
-			}
-		}
-		// Now address one of the remaining groups
-		if len(lgrps) == 0 {
-			break // Nothing left to do, exit the loop
-		}
-		grps = addNode(grps, lgrps[0][0], lgrps[0][1])
-		lgrps = removeNode(lgrps, lgrps[0][1])
-		lgrps = append(lgrps[:0], lgrps[1:]...)
-	}
-	return grps
-}
-
-func groupPods1(pods []*volthaPod) [][]*volthaPod {
-	var rtrn [][]*volthaPod
-	var podCt int = len(pods)
-
-	rtrn, pods = groupIntersectingPods1(pods, podCt)
-	// There are several outcomes here
-	// 1) All pods have been paired and we're done
-	// 2) Some un-allocated pods remain
-	// 2.a) All groups have been started
-	// 2.b) Not all groups have been started
-	if len(pods) == 0 {
-		return rtrn
-	} else if len(rtrn) == podCt>>1 { // All groupings started
-		// Allocate the remaining (presumably empty) pods to the started groups
-		return groupRemainingPods1(rtrn, pods)
-	} else { // Some groupings started
-		// Start empty groups with remaining pods
-		// each grouping is on a different server then
-		// allocate remaining pods.
-		rtrn, pods = startRemainingGroups1(rtrn, pods, podCt)
-		return groupRemainingPods1(rtrn, pods)
-	}
-}
-
-func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
-	for k := range d1 {
-		if _, ok := d2[k]; ok {
-			return true
-		}
-	}
-	return false
-}
-
-func setConnection(ctx context.Context, client pb.ConfigurationClient, cluster string, backend string, connection string, addr string, port uint64) {
-	log.Debugf("Configuring backend %s : connection %s in cluster %s\n\n",
-		backend, connection, cluster)
-	cnf := &pb.Conn{Server: "grpc_command", Cluster: cluster, Backend: backend,
-		Connection: connection, Addr: addr,
-		Port: port}
-	if res, err := client.SetConnection(ctx, cnf); err != nil {
-		log.Debugf("failed SetConnection RPC call: %s", err)
+func setAffinity(ctx context.Context, client pb.ConfigurationClient, deviceId string, backend string) {
+	log.Debugf("Configuring backend %s with device id %s \n", backend, deviceId)
+	if res, err := client.SetAffinity(ctx, &pb.Affinity{
+		Router:  afrouterRouterName,
+		Route:   afrouterRouteName,
+		Cluster: afrouterRWClusterName,
+		Backend: backend,
+		Id:      deviceId,
+	}); err != nil {
+		log.Debugf("failed affinity RPC call: %s\n", err)
 	} else {
-		log.Debugf("Result: %v", res)
+		log.Debugf("Result: %v\n", res)
 	}
 }
 
-func setAffinity(ctx context.Context, client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
-	log.Debugf("Configuring backend %s : affinities \n", backend)
-	aff := &pb.Affinity{Router: afrouterRouterName, Route: "dev_manager", Cluster: afrouterRWClusterName, Backend: backend}
-	for k := range ids {
-		log.Debugf("Setting affinity for id %s", k)
-		aff.Id = k
-		if res, err := client.SetAffinity(ctx, aff); err != nil {
-			log.Debugf("failed affinity RPC call: %s", err)
-		} else {
-			log.Debugf("Result: %v", res)
-		}
-	}
-}
-
-func getBackendForCore(coreId string, coreGroups [][]*volthaPod) string {
-	for _, v := range coreGroups {
-		for _, v2 := range v {
-			if v2.name == coreId {
-				return v2.backend
-			}
-		}
-	}
-	log.Errorf("No backend found for core %s\n", coreId)
-	return ""
-}
-
-func monitorDiscovery(ctx context.Context,
-	client pb.ConfigurationClient,
-	ch <-chan *ic.InterContainerMessage,
-	coreGroups [][]*volthaPod,
-	doneCh chan<- struct{}) {
+func monitorDiscovery(ctx context.Context, client pb.ConfigurationClient, ch <-chan *ic.InterContainerMessage, doneCh chan<- struct{}) {
 	defer close(doneCh)
 
-	var id = make(map[string]struct{})
-
-	select {
-	case <-ctx.Done():
-	case msg := <-ch:
-		log.Debugf("Received a device discovery notification")
-		device := &ic.DeviceDiscovered{}
-		if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
-			log.Errorf("Could not unmarshal received notification %v", msg)
-		} else {
-			// Set the affinity of the discovered device.
-			if be := getBackendForCore(device.Id, coreGroups); be != "" {
-				id[device.Id] = struct{}{}
-				setAffinity(ctx, client, id, be)
+monitorLoop:
+	for {
+		select {
+		case <-ctx.Done():
+		case msg := <-ch:
+			log.Debug("Received a device discovery notification")
+			device := &ic.DeviceDiscovered{}
+			if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
+				log.Errorf("Could not unmarshal received notification %v", msg)
 			} else {
-				log.Error("Cant use an empty string as a backend name")
+				// somewhat hackish solution, backend is known from the first digit found in the publisher name
+				group := regexp.MustCompile(`\d`).FindString(device.Publisher)
+				if group == "" {
+					// set the affinity of the discovered device
+					setAffinity(ctx, client, device.Id, afrouterRWClusterName+group)
+				} else {
+					log.Error("backend is unknown")
+				}
 			}
+			break monitorLoop
 		}
-		break
 	}
 }
 
-func startDiscoveryMonitor(ctx context.Context,
-	client pb.ConfigurationClient,
-	coreGroups [][]*volthaPod) (<-chan struct{}, error) {
+func startDiscoveryMonitor(ctx context.Context, client pb.ConfigurationClient) (<-chan struct{}, error) {
 	doneCh := make(chan struct{})
-	var ch <-chan *ic.InterContainerMessage
 	// Connect to kafka for discovery events
-	topic := &kafka.Topic{Name: kafkaTopic}
 	kc, err := newKafkaClient(kafkaClientType, kafkaHost, kafkaPort, kafkaInstanceID)
+	if err != nil {
+		panic(err)
+	}
 	kc.Start()
 	defer kc.Stop()
 
-	if ch, err = kc.Subscribe(topic); err != nil {
+	ch, err := kc.Subscribe(&kafka.Topic{Name: kafkaTopic})
+	if err != nil {
 		log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
 		close(doneCh)
 		return doneCh, err
 	}
 
-	go monitorDiscovery(ctx, client, ch, coreGroups, doneCh)
+	go monitorDiscovery(ctx, client, ch, doneCh)
 	return doneCh, nil
 }
 
-// Determines which items in core groups
-// have changed based on the list provided
-// and returns a coreGroup with only the changed
-// items and a pod list with the new items
-func getAddrDiffs(coreGroups [][]*volthaPod, rwPods []*volthaPod) ([][]*volthaPod, []*volthaPod) {
-	var nList []*volthaPod
-	var rtrn = make([][]*volthaPod, numRWPods>>1)
-	var ipAddrs = make(map[string]struct{})
-
-	log.Debug("Get addr diffs")
-
-	// Start with an empty array
-	for k := range rtrn {
-		rtrn[k] = make([]*volthaPod, 2)
-	}
-
-	// Build a list with only the new items
-	for _, v := range rwPods {
-		if !hasIpAddr(coreGroups, v.ipAddr) {
-			nList = append(nList, v)
-		}
-		ipAddrs[v.ipAddr] = struct{}{} // for the search below
-	}
-
-	// Now build the coreGroups with only the changed items
-	for k1, v1 := range coreGroups {
-		for k2, v2 := range v1 {
-			if _, ok := ipAddrs[v2.ipAddr]; !ok {
-				rtrn[k1][k2] = v2
-			}
-		}
-	}
-	return rtrn, nList
-}
-
-// Figure out where best to put the new pods
-// in the coreGroup array based on the old
-// pods being replaced. The criteria is that
-// the new pod be on the same server as the
-// old pod was.
-func reconcileAddrDiffs(coreGroupDiffs [][]*volthaPod, rwPodDiffs []*volthaPod) [][]*volthaPod {
-	var srvrs map[string][]*volthaPod = make(map[string][]*volthaPod)
-
-	log.Debug("Reconciling diffs")
-	log.Debug("Building server list")
-	for _, v := range rwPodDiffs {
-		log.Debugf("Adding %v to the server list", *v)
-		srvrs[v.node] = append(srvrs[v.node], v)
-	}
-
-	for k1, v1 := range coreGroupDiffs {
-		log.Debugf("k1:%v, v1:%v", k1, v1)
-		for k2, v2 := range v1 {
-			log.Debugf("k2:%v, v2:%v", k2, v2)
-			if v2 == nil { // Nothing to do here
-				continue
-			}
-			if _, ok := srvrs[v2.node]; ok {
-				coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
-				if len(srvrs[v2.node]) > 1 { // remove one entry from the list
-					srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
-				} else { // Delete the endtry from the map
-					delete(srvrs, v2.node)
-				}
-			} else {
-				log.Error("This should never happen, node appears to have changed names")
-				// attempt to limp along by keeping this old entry
-			}
-		}
-	}
-
-	return coreGroupDiffs
-}
-
-func applyAddrDiffs(ctx context.Context, client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
-	log.Debug("Applying diffs")
-	switch cores := coreList.(type) {
-	case [][]*volthaPod:
-		newEntries := reconcileAddrDiffs(getAddrDiffs(cores, nPods))
-
-		// Now replace the information in coreGropus with the new
-		// entries and then reconcile the device ids on the core
-		// that's in the new entry with the device ids of it's
-		// active-active peer.
-		for k1, v1 := range cores {
-			for k2, v2 := range v1 {
-				if newEntries[k1][k2] != nil {
-					// TODO: Missing is the case where bothe the primary
-					// and the secondary core crash and come back.
-					// Pull the device ids from the active-active peer
-					ids := queryPodDeviceIds(ctx, cores[k1][k2^1])
-					if len(ids) != 0 {
-						if !reconcilePodDeviceIds(ctx, newEntries[k1][k2], ids) {
-							log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
-						}
-					}
-					// Send the affininty router new connection information
-					setConnection(ctx, client, afrouterRWClusterName, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, podGrpcPort)
-					// Copy the new entry information over
-					cores[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
-					cores[k1][k2].name = newEntries[k1][k2].name
-					cores[k1][k2].devIds = ids
-				}
-			}
-		}
-	case []*volthaPod:
-		var mia []*volthaPod
-		var found bool
-		// TODO: Break this using functions to simplify
-		// reading of the code.
-		// Find the core(s) that have changed addresses
-		for k1, v1 := range cores {
-			found = false
-			for _, v2 := range nPods {
-				if v1.ipAddr == v2.ipAddr {
-					found = true
-					break
-				}
-			}
-			if !found {
-				mia = append(mia, cores[k1])
-			}
-		}
-		// Now plug in the new addresses and set the connection
-		for _, v1 := range nPods {
-			found = false
-			for _, v2 := range cores {
-				if v1.ipAddr == v2.ipAddr {
-					found = true
-					break
-				}
-			}
-			if found {
-				continue
-			}
-			mia[0].ipAddr = v1.ipAddr
-			mia[0].name = v1.name
-			setConnection(ctx, client, afrouterROClusterName, mia[0].backend, mia[0].connection, v1.ipAddr, podGrpcPort)
-			// Now get rid of the mia entry just processed
-			mia = append(mia[:0], mia[1:]...)
-		}
-	default:
-		log.Error("Internal: Unexpected type in call to applyAddrDiffs")
-	}
-}
-
-func updateDeviceIds(coreGroups [][]*volthaPod, rwPods []*volthaPod) {
-	// Convenience
-	var byName = make(map[string]*volthaPod)
-	for _, v := range rwPods {
-		byName[v.name] = v
-	}
-
-	for k1, v1 := range coreGroups {
-		for k2, v2 := range v1 {
-			if pod, have := byName[v2.name]; have {
-				coreGroups[k1][k2].devIds = pod.devIds
-			}
-		}
-	}
-}
-
-func startCoreMonitor(ctx context.Context,
-	client pb.ConfigurationClient,
-	clientset *kubernetes.Clientset,
-	coreGroups [][]*volthaPod,
-	oRoPods []*volthaPod) {
-	// Now that initial allocation has been completed, monitor the pods
-	// for IP changes
-	// The main loop needs to do the following:
-	// 1) Periodically query the pods and filter out
-	//    the vcore ones
-	// 2) Validate that the pods running are the same
-	//    as the previous check
-	// 3) Validate that the IP addresses are the same
-	//    as the last check.
-	// If the pod name(s) ha(s/ve) changed then remove
-	// the unused pod names and add in the new pod names
-	// maintaining the cluster/backend information.
-	// If an IP address has changed (which shouldn't
-	// happen unless a pod is re-started) it should get
-	// caught by the pod name change.
+// coreMonitor polls the list of devices from all RW cores, pushes these devices
+// into the affinity router, and ensures that all cores in a backend have their devices synced
+func coreMonitor(ctx context.Context, client pb.ConfigurationClient, clientset *kubernetes.Clientset) {
+	// map[backend]map[deviceId]struct{}
+	deviceOwnership := make(map[string]map[string]struct{})
 loop:
 	for {
-		select {
-		case <-ctx.Done():
-			// if we're done, exit
-			break loop
-		case <-time.After(10 * time.Second): //wait a while
-		}
-
-		// Get the rw core list from k8s
-		rwPods, roPods, err := getVolthaPods(clientset)
+		// get the rw core list from k8s
+		rwPods, err := getVolthaPods(clientset)
 		if err != nil {
 			log.Error(err)
 			continue
 		}
 
-		// If we didn't get 2n+1 pods then wait since
-		// something is down and will hopefully come
-		// back up at some point.
-		if len(rwPods) != numRWPods {
-			log.Debug("One or more RW pod(s) are offline, will wait and retry")
-			continue
-		}
-
-		queryDeviceIds(ctx, rwPods)
-		updateDeviceIds(coreGroups, rwPods)
-
-		// We have all pods, check if any IP addresses
-		// have changed.
-		for _, v := range rwPods {
-			if !hasIpAddr(coreGroups, v.ipAddr) {
-				log.Debug("Address has changed...")
-				applyAddrDiffs(ctx, client, coreGroups, rwPods)
-				break
+		// for every pod
+		for _, pod := range rwPods {
+			// get the devices for this pod's backend
+			devices, have := deviceOwnership[pod.backend]
+			if !have {
+				devices = make(map[string]struct{})
+				deviceOwnership[pod.backend] = devices
 			}
-		}
 
-		if len(roPods) != numROPods {
-			log.Debug("One or more RO pod(s) are offline, will wait and retry")
-			continue
-		}
-		for _, v := range roPods {
-			if !hasIpAddr(oRoPods, v.ipAddr) {
-				applyAddrDiffs(ctx, client, oRoPods, roPods)
-				break
-			}
-		}
-	}
-}
+			coreDevices := queryPodDeviceIds(ctx, pod)
 
-func hasIpAddr(coreList interface{}, ipAddr string) bool {
-	switch cores := coreList.(type) {
-	case []*volthaPod:
-		for _, v := range cores {
-			if v.ipAddr == ipAddr {
-				return true
-			}
-		}
-	case [][]*volthaPod:
-		for _, v1 := range cores {
-			for _, v2 := range v1 {
-				if v2.ipAddr == ipAddr {
-					return true
+			// handle devices that exist in the core, but we have just learned about
+			for deviceId := range coreDevices {
+				// if there's a new device
+				if _, have := devices[deviceId]; !have {
+					// add the device to our local list
+					devices[deviceId] = struct{}{}
+					// push the device into the affinity router
+					setAffinity(ctx, client, deviceId, pod.backend)
 				}
 			}
+
+			// ensure that the core knows about all devices in its backend
+			toSync := make(map[string]struct{})
+			for deviceId := range devices {
+				// if the pod is missing any devices
+				if _, have := coreDevices[deviceId]; !have {
+					// we will reconcile them
+					toSync[deviceId] = struct{}{}
+				}
+			}
+
+			if len(toSync) != 0 {
+				reconcilePodDeviceIds(ctx, pod, toSync)
+			}
 		}
-	default:
-		log.Error("Internal: Unexpected type in call to hasIpAddr")
+
+		select {
+		case <-ctx.Done():
+			// if we're done, exit
+			break loop
+		case <-time.After(10 * time.Second): // wait a while
+		}
 	}
-	return false
 }
 
 // endOnClose cancels the context when the connection closes
@@ -887,94 +417,18 @@
 		// set up the client
 		client := pb.NewConfigurationClient(conn)
 
-		// determine config & repopulate the afrouter
-		generateAndMaintainConfiguration(ctx, client, clientset)
+		// start the discovery monitor and core monitor
+		// these two processes do the majority of the work
+
+		log.Info("Starting discovery monitoring")
+		doneCh, _ := startDiscoveryMonitor(ctx, client)
+
+		log.Info("Starting core monitoring")
+		coreMonitor(ctx, client, clientset)
+
+		//ensure the discovery monitor to quit
+		<-doneCh
 
 		conn.Close()
 	}
 }
-
-// generateAndMaintainConfiguration does the pod-reconciliation work,
-// it only returns once all sub-processes have completed
-func generateAndMaintainConfiguration(ctx context.Context, client pb.ConfigurationClient, clientset *kubernetes.Clientset) {
-	// Get the voltha rw-/ro-core pods
-	var rwPods, roPods []*volthaPod
-	for {
-		var err error
-		if rwPods, roPods, err = getVolthaPods(clientset); err != nil {
-			log.Error(err)
-			return
-		}
-
-		if len(rwPods) == numRWPods && len(roPods) == numROPods {
-			break
-		}
-
-		log.Debug("One or more RW/RO pod(s) are offline, will wait and retry")
-		select {
-		case <-ctx.Done():
-			return
-		case <-time.After(time.Second * 5):
-			// retry
-		}
-	}
-
-	// Fetch the devices held by each running core
-	queryDeviceIds(ctx, rwPods)
-
-	// For debugging... comment out l8r
-	for _, v := range rwPods {
-		log.Debugf("Pod list %v", *v)
-	}
-
-	coreGroups := groupPods1(rwPods)
-
-	// Assign the groupings to the the backends and connections
-	for k, coresInGroup := range coreGroups {
-		for k1 := range coresInGroup {
-			coreGroups[k][k1].cluster = afrouterRWClusterName
-			coreGroups[k][k1].backend = afrouterRWClusterName + strconv.Itoa(k+1)
-			coreGroups[k][k1].connection = afrouterRWClusterName + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
-		}
-	}
-	log.Info("Core grouping completed")
-
-	// TODO: Debugging code, comment out for production
-	for k, v := range coreGroups {
-		for k2, v2 := range v {
-			log.Debugf("Core group %d,%d: %v", k, k2, v2)
-		}
-	}
-	log.Info("Setting affinities")
-	// Now set the affinities for exising devices in the cores
-	for _, v := range coreGroups {
-		setAffinity(ctx, client, v[0].devIds, v[0].backend)
-		setAffinity(ctx, client, v[1].devIds, v[1].backend)
-	}
-	log.Info("Setting connections")
-	// Configure the backeds based on the calculated core groups
-	for _, v := range coreGroups {
-		setConnection(ctx, client, afrouterRWClusterName, v[0].backend, v[0].connection, v[0].ipAddr, podGrpcPort)
-		setConnection(ctx, client, afrouterRWClusterName, v[1].backend, v[1].connection, v[1].ipAddr, podGrpcPort)
-	}
-
-	// Process the read only pods
-	for k, v := range roPods {
-		log.Debugf("Processing ro_pod %v", v)
-		vN := afrouterROClusterName + strconv.Itoa(k+1)
-		log.Debugf("Setting connection %s, %s, %s", vN, vN+"1", v.ipAddr)
-		roPods[k].cluster = afrouterROClusterName
-		roPods[k].backend = vN
-		roPods[k].connection = vN + "1"
-		setConnection(ctx, client, afrouterROClusterName, v.backend, v.connection, v.ipAddr, podGrpcPort)
-	}
-
-	log.Info("Starting discovery monitoring")
-	doneCh, _ := startDiscoveryMonitor(ctx, client, coreGroups)
-
-	log.Info("Starting core monitoring")
-	startCoreMonitor(ctx, client, clientset, coreGroups, roPods)
-
-	//ensure the discovery monitor to quit
-	<-doneCh
-}