[VOL-782,VOL-783,VOL-787]
Initial commit of the affinity router control plane
for voltha.

Change-Id: Ic2b5b52693d337e8107cfebfe6b92317d3c6d4f5
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
new file mode 100644
index 0000000..0ea0b49
--- /dev/null
+++ b/arouterd/arouterd.go
@@ -0,0 +1,947 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	//"os"
+	"fmt"
+	"time"
+	"regexp"
+	"errors"
+	"strconv"
+	//"io/ioutil"
+	//"encoding/json"
+
+	"k8s.io/client-go/rest"
+	"google.golang.org/grpc"
+	"golang.org/x/net/context"
+	"k8s.io/client-go/kubernetes"
+	"github.com/golang/protobuf/ptypes"
+	//"k8s.io/apimachinery/pkg/api/errors"
+	"github.com/opencord/voltha-go/common/log"
+	kafka "github.com/opencord/voltha-go/kafka"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	empty "github.com/golang/protobuf/ptypes/empty"
+	vpb "github.com/opencord/voltha-go/protos/voltha"
+	pb "github.com/opencord/voltha-go/protos/afrouter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
+)
+
+type configConn struct {
+	Server string `json:"Server"`
+	Cluster string `json:"Cluster"`
+	Backend string `json:"Backend"`
+	connections map[string]connection
+}
+
+type connection struct {
+	Name string `json:"Connection"`
+	Addr string `json:"Addr"`
+	Port uint64 `json:"Port"`
+}
+
+type rwPod struct {
+	name string
+	ipAddr string
+	node string
+	devIds map[string]struct{}
+	backend string
+	connection string
+}
+
+type podTrack struct {
+		pod *rwPod
+		dn bool
+}
+
+// Topic is affinityRouter
+// port: 9092
+
+func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
+
+	log.Infow("kafka-client-type", log.Fields{"client": clientType})
+	switch clientType {
+	case "sarama":
+		return kafka.NewSaramaClient(
+			kafka.Host(host),
+			kafka.Port(port),
+			kafka.ConsumerType(kafka.GroupCustomer),
+			kafka.ProducerReturnOnErrors(true),
+			kafka.ProducerReturnOnSuccess(true),
+			kafka.ProducerMaxRetries(6),
+			kafka.NumPartitions(3),
+			kafka.ConsumerGroupName(instanceID),
+			kafka.ConsumerGroupPrefix(instanceID),
+			kafka.AutoCreateTopic(false),
+			kafka.ProducerFlushFrequency(5),
+			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
+	}
+	return nil, errors.New("unsupported-client-type")
+}
+
+
+func k8sClientSet() *kubernetes.Clientset {
+	// creates the in-cluster config
+	config, err := rest.InClusterConfig()
+	if err != nil {
+		panic(err.Error())
+	}
+	// creates the clientset
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		panic(err.Error())
+	}
+
+	return clientset
+}
+
+
+func connect(addr string) (*grpc.ClientConn, error) {
+	for ctr :=0 ; ctr < 100; ctr++ {
+		log.Debug("Trying to connect to %s", addr)
+		conn, err := grpc.Dial(addr, grpc.WithInsecure())
+		if err != nil {
+			log.Debugf("Attempt to connect failed, retrying %v:", err)
+		} else {
+			log.Debugf("Connection succeeded")
+			return conn,err
+		}
+		time.Sleep(10 * time.Second)
+	}
+	log.Debugf("Too many connection attempts, giving up!")
+	return nil,errors.New("Timeout attempting to conect")
+}
+
+func getRwPods(cs *kubernetes.Clientset, coreFilter * regexp.Regexp) []*rwPod {
+	var rtrn []*rwPod
+
+	pods, err := cs.CoreV1().Pods("").List(metav1.ListOptions{})
+	if err != nil {
+		panic(err.Error())
+	}
+	log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))
+
+	for k,v := range pods.Items {
+		if v.Namespace == "voltha" && coreFilter.MatchString(v.Name) {
+			fmt.Printf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
+																	v.Status.PodIP, v.Spec.NodeName)
+			//fmt.Printf("Pod %v,%v\n\n\n",k,v)
+			_ = k
+			// Add this pod to the core structure.
+			rtrn = append(rtrn, &rwPod{name:v.Name,ipAddr:v.Status.PodIP,node:v.Spec.NodeName,
+						  devIds:make(map[string]struct{}), backend:"", connection:""})
+		}
+	}
+	return rtrn
+}
+
+func queryDevices(pods []*rwPod) {
+	for pk,pv := range pods {
+		// Open a connection to the pod
+		// port 50057
+		conn, err := connect(pv.ipAddr+":50057")
+		if (err != nil) {
+			log.Debugf("Could not query devices from %s, could not connect", pv.name)
+			continue
+		}
+		client := vpb.NewVolthaServiceClient(conn)
+		devs,err := client.ListDevices(context.Background(), &empty.Empty{})
+		if err != nil {
+			log.Error(err)
+			conn.Close()
+			continue
+		}
+		for _,dv := range devs.Items {
+			pods[pk].devIds[dv.Id]=struct{}{}
+		}
+		conn.Close()
+	}
+}
+
+func allEmpty(pods []*rwPod) bool {
+	for k,_ := range pods {
+		if len(pods[k].devIds) != 0 {
+			return false
+		}
+	}
+	return true
+}
+
+//func groupEmptyCores(pods []*rwPod) [][]*rwPod {
+//	return [][]*rwPod{}
+//}
+
+//func groupPods(pods []*rwPod) [][]*rwPod {
+
+//	if allEmpty(pods) == true {
+//		return groupEmptyCores(pods)
+//	} else {
+//		return groupPopulatedCores(pods)
+//	}
+//}
+
+func rmPod(pods []*rwPod, idx int) []*rwPod {
+	return append(pods[:idx],pods[idx+1:]...)
+}
+
+func groupIntersectingPods1(pods []*rwPod, podCt int) ([][]*rwPod,[]*rwPod) {
+	var rtrn [][]*rwPod
+	var out []*rwPod
+
+	for {
+		if len(pods) == 0 {
+			break
+		}
+		if len(pods[0].devIds) == 0 { // Ignore pods with no devices
+			////log.Debugf("%s empty pod", pd[k].pod.name)
+			out = append(out, pods[0])
+			pods = rmPod(pods, 0)
+			continue
+		}
+		// Start a pod group with this pod
+		var grp []*rwPod
+		grp = append(grp, pods[0])
+		pods = rmPod(pods,0)
+		//log.Debugf("Creating new group %s", pd[k].pod.name)
+		// Find the peer pod based on device overlap
+		// It's ok if one isn't found, an empty one will be used instead
+		for k,_ := range pods {
+			if len(pods[k].devIds) == 0 { // Skip pods with no devices
+				//log.Debugf("%s empty pod", pd[k1].pod.name)
+				continue
+			}
+			if intersect(grp[0].devIds, pods[k].devIds) == true {
+				//log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
+				if grp[0].node == pods[k].node {
+					// This should never happen
+					log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
+								grp[0].name, pods[k].name)
+					continue
+				}
+				grp = append(grp, pods[k])
+				pods = rmPod(pods, k)
+				break
+
+			}
+		}
+		rtrn = append(rtrn, grp)
+		//log.Debugf("Added group %s", grp[0].name)
+		// Check if the number of groups = half the pods, if so all groups are started.
+		if len(rtrn) == podCt >> 1 {
+			// Append any remaining pods to out
+			out = append(out,pods[0:]...)
+			break
+		}
+	}
+	return rtrn,out
+}
+
+func groupIntersectingPods(pd []*podTrack) ([][]*rwPod,[]*podTrack) {
+	var rtrn [][]*rwPod
+
+	for k,_ := range pd {
+		if pd[k].dn == true { // Already processed?
+			//log.Debugf("%s already processed", pd[k].pod.name)
+			continue
+		}
+		if len(pd[k].pod.devIds) == 0 { // Ignore pods with no devices
+			////log.Debugf("%s empty pod", pd[k].pod.name)
+			continue
+		}
+		// Start a pod group with this pod
+		var grp []*rwPod
+		grp = append(grp, pd[k].pod)
+		pd[k].dn = true
+		//log.Debugf("Creating new group %s", pd[k].pod.name)
+		// Find the peer pod based on device overlap
+		// It's ok if one isn't found, an empty one will be used instead
+		for k1,_ := range pd {
+			if pd[k1].dn == true { // Skip over eliminated pods
+				//log.Debugf("%s eliminated pod", pd[k1].pod.name)
+				continue
+			}
+			if len(pd[k1].pod.devIds) == 0 { // Skip pods with no devices
+				//log.Debugf("%s empty pod", pd[k1].pod.name)
+				continue
+			}
+			if intersect(pd[k].pod.devIds, pd[k1].pod.devIds) == true {
+				//log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
+				if pd[k].pod.node == pd[k1].pod.node {
+					// This should never happen
+					log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
+								pd[k].pod.name, pd[k1].pod.name)
+					continue
+				}
+				pd[k1].dn = true
+				grp = append(grp, pd[k1].pod)
+				break
+			}
+		}
+		rtrn = append(rtrn, grp)
+		//log.Debugf("Added group %s", grp[0].name)
+		// Check if the number of groups = half the pods, if so all groups are started.
+		if len(rtrn) == len(pd) >> 1 {
+			break
+		}
+	}
+	return rtrn,pd
+}
+
+func unallocPodCount(pd []*podTrack) int {
+	var rtrn int = 0
+	for _,v := range pd {
+		if v.dn == false {
+			rtrn++
+		}
+	}
+	return rtrn
+}
+
+
+func sameNode(pod *rwPod, grps [][]*rwPod) bool {
+	for _,v := range grps {
+		if v[0].node == pod.node {
+			return true
+		}
+		if len(v) == 2 && v[1].node == pod.node {
+			return true
+		}
+	}
+	return false
+}
+
+func startRemainingGroups1(grps [][]*rwPod, pods []*rwPod, podCt int) ([][]*rwPod, []*rwPod) {
+	var grp []*rwPod
+
+	for k,_ := range pods {
+		if sameNode(pods[k], grps) {
+			continue
+		}
+		grp = []*rwPod{}
+		grp = append(grp, pods[k])
+		pods = rmPod(pods, k)
+		grps = append(grps, grp)
+		if len(grps) == podCt >> 1 {
+			break
+		}
+	}
+	return grps, pods
+}
+
+func startRemainingGroups(grps [][]*rwPod, pd []*podTrack) ([][]*rwPod, []*podTrack) {
+	var grp []*rwPod
+
+	for k,_ := range pd {
+		if sameNode(pd[k].pod, grps) == true {
+			continue
+		}
+		grp = append(grp, pd[k].pod)
+		grps = append(grps, grp)
+		pd[k].dn = true
+		if len(grps) == len(pd) >> 1 {
+			break
+		}
+	}
+	return grps, pd
+}
+
+func hasSingleSecondNode(grp []*rwPod) bool {
+	var srvrs map[string]struct{} = make(map[string]struct{})
+	for k,_ := range grp {
+		if k == 0 {
+			continue // Ignore the first item
+		}
+		srvrs[grp[k].node] = struct{}{}
+	}
+	if len(srvrs) == 1 {
+		return true
+	}
+	return false
+}
+
+func addNode(grps [][]*rwPod, idx *rwPod, item *rwPod) [][]*rwPod {
+	for k,_ := range grps {
+		if grps[k][0].name == idx.name {
+			grps[k] = append(grps[k], item)
+			return grps
+		}
+	}
+	// TODO: Error checking required here.
+	return grps
+}
+
+func removeNode(grps [][]*rwPod, item *rwPod) [][]*rwPod {
+	for k,_ := range grps {
+		for k1,_ := range grps[k] {
+			if grps[k][k1].name == item.name {
+				grps[k] = append(grps[k][:k1],grps[k][k1+1:]...)
+				break
+			}
+		}
+	}
+	return grps
+}
+
+func groupRemainingPods1(grps [][]*rwPod, pods []*rwPod) [][]*rwPod {
+	var lgrps [][]*rwPod
+	// All groups must be started when this function is called.
+	// Copy incomplete groups
+	for k,_ := range grps {
+		if len(grps[k]) != 2 {
+			lgrps = append(lgrps, grps[k])
+		}
+	}
+
+	// Add all pairing candidates to each started group.
+	for k,_ := range pods {
+		for k2,_ := range lgrps {
+			if lgrps[k2][0].node != pods[k].node {
+				lgrps[k2] = append(lgrps[k2], pods[k])
+			}
+		}
+	}
+
+	//TODO: If any member of lgrps doesn't have at least 2
+	// nodes something is wrong. Check for that here
+
+	for {
+		for { // Address groups with only a single server choice
+			var ssn bool = false
+
+			for k,_ := range lgrps {
+				// Now if any of the groups only have a single
+				// node as the choice for the second member
+				// address that one first.
+				if hasSingleSecondNode(lgrps[k]) == true {
+					ssn =  true
+					// Add this pairing to the groups
+					grps = addNode(grps, lgrps[k][0], lgrps[k][1])
+					// Since this node is now used, remove it from all
+					// remaining tenative groups
+					lgrps = removeNode(lgrps, lgrps[k][1])
+					// Now remove this group completely since
+					// it's been addressed
+					lgrps  = append(lgrps[:k],lgrps[k+1:]...)
+					break
+				}
+			}
+			if ssn == false {
+				break
+			}
+		}
+		// Now adress one of the remaining groups
+		if len(lgrps) == 0 {
+			break // Nothing left to do, exit the loop
+		}
+		grps = addNode(grps, lgrps[0][0], lgrps[0][1])
+		lgrps = removeNode(lgrps, lgrps[0][1])
+		lgrps  = append(lgrps[:0],lgrps[1:]...)
+	}
+	return grps
+}
+
+func groupRemainingPods(grps [][]*rwPod, pd []*podTrack) [][]*rwPod{
+	var lgrps [][]*rwPod
+	// All groups must be started when this function is called.
+	// Copy incomplete groups
+	for k,_ := range grps {
+		if len(grps[k]) != 2 {
+			lgrps = append(lgrps, grps[k])
+		}
+	}
+
+	// Add all pairing candidates to each started group.
+	for k,_ := range pd {
+		if pd[k].dn == true {
+			continue
+		}
+		for k2,_ := range lgrps {
+			if lgrps[k2][0].node != pd[k].pod.node {
+				lgrps[k2] = append(lgrps[k2], pd[k].pod)
+			}
+		}
+	}
+
+	//TODO: If any member of lgrps doesn't have at least 2
+	// nodes something is wrong. Check for that here
+
+	for {
+		for { // Address groups with only a single server choice
+			var ssn bool = false
+
+			for k,_ := range lgrps {
+				// Now if any of the groups only have a single
+				// node as the choice for the second member
+				// address that one first.
+				if hasSingleSecondNode(lgrps[k]) == true {
+					ssn =  true
+					// Add this pairing to the groups
+					grps = addNode(grps, lgrps[k][0], lgrps[k][1])
+					// Since this node is now used, remove it from all
+					// remaining tenative groups
+					lgrps = removeNode(lgrps, lgrps[k][1])
+					// Now remove this group completely since
+					// it's been addressed
+					lgrps  = append(lgrps[:k],lgrps[k+1:]...)
+					break
+				}
+			}
+			if ssn == false {
+				break
+			}
+		}
+		// Now adress one of the remaining groups
+		if len(lgrps) == 0 {
+			break // Nothing left to do, exit the loop
+		}
+		grps = addNode(grps, lgrps[0][0], lgrps[0][1])
+		lgrps = removeNode(lgrps, lgrps[0][1])
+		lgrps  = append(lgrps[:0],lgrps[1:]...)
+	}
+	return grps
+}
+
+func groupPods1(pods []*rwPod) [][]*rwPod {
+	var rtrn [][]*rwPod
+	var podCt int = len(pods)
+
+	rtrn,pods = groupIntersectingPods1(pods, podCt)
+	// There are several outcomes here 
+	// 1) All pods have been paired and we're done
+	// 2) Some un-allocated pods remain
+	// 2.a) All groups have been started
+	// 2.b) Not all groups have been started
+	if len(pods) == 0 {
+		return rtrn
+	} else if len(rtrn) == podCt >> 1 { // All groupings started
+		// Allocate the remaining (presumably empty) pods to the started groups
+		return groupRemainingPods1(rtrn, pods)
+	} else { // Some groupings started
+		// Start empty groups with remaining pods
+		// each grouping is on a different server then
+		// allocate remaining pods.
+		rtrn, pods = startRemainingGroups1(rtrn, pods, podCt)
+		return groupRemainingPods1(rtrn, pods)
+	}
+}
+
+func groupPods(pods []*rwPod) [][]*rwPod {
+	var rtrn [][]*rwPod
+	var pd []*podTrack
+
+	// Tracking of the grouping process
+	for k,_ := range pods {
+		pd = append(pd, &podTrack{pods[k],false})
+	}
+
+
+	rtrn,pd = groupIntersectingPods(pd)
+	// There are several outcomes here 
+	// 1) All pods have been paired and we're done
+	// 2) Some un-allocated pods remain
+	// 2.a) All groups have been started
+	// 2.b) Not all groups have been started
+	if unallocPodCount(pd) == 0 {
+		return rtrn
+	} else if len(rtrn) == len(pd) >> 1 { // All groupings started
+		// Allocate the remaining (presumably empty) pods to the started groups
+		return groupRemainingPods(rtrn, pd)
+	} else { // Some groupings started
+		// Start empty groups with remaining pods
+		// each grouping is on a different server then
+		// allocate remaining pods.
+		rtrn, pd = startRemainingGroups(rtrn, pd)
+		return groupRemainingPods(rtrn, pd)
+	}
+
+
+	// Establish groupings of non-empty pods that have overlapping devices.
+	for k,_ := range pd {
+		if pd[k].dn == true { // Already processed?
+			//log.Debugf("%s already processed", pd[k].pod.name)
+			continue
+		}
+		if len(pd[k].pod.devIds) == 0 { // Ignore pods with no devices
+			////log.Debugf("%s empty pod", pd[k].pod.name)
+			continue
+		}
+		// Start a pod group with this pod
+		var grp []*rwPod
+		grp = append(grp, pd[k].pod)
+		pd[k].dn = true
+		//log.Debugf("Creating new group %s", pd[k].pod.name)
+		// Find the peer pod based on device overlap
+		// It's ok if one isn't found, an empty one will be used instead
+		for k1,_ := range pd {
+			if pd[k1].dn == true { // Skip over eliminated pods
+				//log.Debugf("%s eliminated pod", pd[k1].pod.name)
+				continue
+			}
+			if len(pd[k1].pod.devIds) == 0 { // Skip pods with no devices
+				//log.Debugf("%s empty pod", pd[k1].pod.name)
+				continue
+			}
+			if intersect(pd[k].pod.devIds, pd[k1].pod.devIds) == true {
+				//log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
+				pd[k1].dn = true
+				grp = append(grp, pd[k1].pod)
+				break
+			}
+		}
+		rtrn = append(rtrn, grp)
+		//log.Debugf("Added group %s", grp[0].name)
+	}
+	// Now find any grouping without 2 members and assign one of the 
+	// pods with no devices and on a different server to it.
+	// If there are no pods with no devices left before all
+	// groups are filled report an exception but leave one of the
+	// groups with only one pod.
+	for k,_ := range rtrn {
+		if len(rtrn[k]) < 2 {
+			for k2,_ := range pd {
+				if pd[k2].dn == true {
+					continue
+				}
+				// There should be only empty pods here
+				if len(pd[k2].pod.devIds) != 0 {
+					log.Error("Non empty pod found where empty pod was expected")
+					continue
+				}
+				if pd[k2].pod.node == rtrn[k][0].node {
+					//log.Error("Pods aren't on different servers, continuing")
+					continue
+				}
+				// Add this empty and unused pod to the group
+				//log.Debugf("Adding empty pod %s", pd[k2].pod.name)
+				rtrn[k] = append(rtrn[k], pd[k2].pod)
+				pd[k2].dn = true
+				break
+			}
+		}
+	}
+	return rtrn
+}
+
+func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
+	for k,_ := range d1 {
+		if _,ok := d2[k]; ok == true {
+			return true
+		}
+	}
+	return false
+}
+
+func setConnection(client pb.ConfigurationClient, backend string, connection string, addr string, port uint64) {
+	log.Debugf("Configuring backend %s : connection %s\n\n", backend, connection)
+	cnf := &pb.Conn{Server:"grpc_command",Cluster:"vcore",Backend:backend,
+					Connection:connection,Addr:addr,
+					Port:port}
+	if res, err := client.SetConnection(context.Background(), cnf); err != nil {
+		log.Debugf("failed SetConnection RPC call: %s", err)
+	} else {
+		log.Debugf("Result: %v", res)
+	}
+}
+
+func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
+	log.Debugf("Configuring backend %s : affinities \n", backend)
+	aff := &pb.Affinity{Router:"vcore",Route:"dev_manager",Cluster:"vcore",Backend:backend}
+	for k,_ := range ids {
+		log.Debugf("Setting affinity for id %s", k)
+		aff.Id = k
+		if res, err := client.SetAffinity(context.Background(), aff); err != nil {
+			log.Debugf("failed affinity RPC call: %s", err)
+		} else {
+			log.Debugf("Result: %v", res)
+		}
+	}
+}
+
+func monitorDiscovery(client pb.ConfigurationClient,
+						ch <-chan *ic.InterContainerMessage) {
+	select {
+	case msg := <-ch:
+		log.Debugf("Received a device discovery notification")
+		_ = msg
+		requestBody := &ic.InterContainerRequestBody{}
+		if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
+			log.Errorf("Could not unmarshal received notification %v", msg)
+		} else {
+			// Do something with the message here
+		}
+		break
+	}
+}
+
+func startDiscoveryMonitor(client pb.ConfigurationClient) error {
+	var ch <-chan *ic.InterContainerMessage
+	// Connect to kafka for discovery events
+	topic := &kafka.Topic{Name: "AffinityRouter"}
+	kc,err := newKafkaClient("sarama", "kafka", 9092, "arouterd")
+	kc.Start()
+
+	if ch, err = kc.Subscribe(topic); err != nil {
+		log.Error("Could not subscribe to the 'AffinityRouter' channel, discovery disabled")
+		return err
+	}
+	go monitorDiscovery(client, ch)
+	return nil
+}
+
+func startCoreMonitor(client pb.ConfigurationClient,
+					clientset *kubernetes.Clientset,
+					coreFltr *regexp.Regexp,
+					coreGroups [][]*rwPod) error {
+	// Now that initial allocation has been completed, monitor the pods
+	// for IP changes
+	// The main loop needs to do the following:
+	// 1) Periodically query the pods and filter out
+	//    the vcore ones
+	// 2) Validate that the pods running are the same
+	//    as the previous check
+	// 3) Validate that the IP addresses are the same
+	//    as the last check.
+	// If the pod name(s) ha(s/ve) changed then remove
+	// the unused pod names and add in the new pod names
+	// maintaining the cluster/backend information.
+	// If an IP address has changed (which shouldn't
+	// happen unless a pod is re-started) it should get
+	// caught by the pod name change.
+	for {
+		time.Sleep(10 * time.Second) // Wait a while
+		// Get the rw core list from k8s
+		rwPods := getRwPods(clientset, coreFltr)
+		// If we didn't get 2n+1 pods then wait since
+		// something is down and will hopefully come
+		// back up at some point.
+		// TODO: remove the 6 pod hardcoding
+		if len(rwPods) != 6 {
+			continue
+		}
+		// We have all pods, check if any IP addresses
+		// have changed.
+		for _,v := range rwPods {
+			//if hasIpAddr(coreGroups, v.ipAddr) == false {
+				//log.Debug("Address has changed...")
+				//applyAddrDiffs(coreGroups, rwPods)
+			//}
+			_ = v
+		}
+	}
+
+}
+
+func main() {
+	// This is currently hard coded to a cluster with 3 servers
+	//var connections map[string]configConn = make(map[string]configConn)
+	//var rwCorePodsPrev map[string]rwPod = make(map[string]rwPod)
+	var rwCoreNodesPrev map[string][]rwPod = make(map[string][]rwPod)
+	var firstTime bool = true
+	var err error
+	var conn *grpc.ClientConn
+
+
+	// Set up the regular expression to identify the voltha cores
+	coreFltr := regexp.MustCompile(`rw-core[0-9]-`)
+
+	// Set up logging
+	if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+
+	// Set up kubernetes api
+	clientset := k8sClientSet()
+
+	// Connect to the affinity router and set up the client
+	conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
+	if err != nil {
+		panic(err.Error())
+	}
+	client := pb.NewConfigurationClient(conn)
+
+	// Get the voltha rw-core podes
+	rwPods := getRwPods(clientset, coreFltr)
+
+	// Fetch the devices held by each running core
+	queryDevices(rwPods)
+
+	// For debugging... comment out l8r
+	for _,v := range rwPods {
+		log.Debugf("Pod list %v", *v)
+	}
+
+	coreGroups := groupPods1(rwPods)
+
+
+	// Assign the groupings to the the backends and connections
+	for k,_ := range coreGroups {
+		for k1,_ := range coreGroups[k] {
+			coreGroups[k][k1].backend = "vcore"+strconv.Itoa(k+1)
+			coreGroups[k][k1].connection = "vcore"+strconv.Itoa(k+1)+strconv.Itoa(k1+1)
+		}
+	}
+	log.Debug("Core gouping completed")
+
+	// TODO: Debugging code, comment out for production
+	for k,v := range coreGroups {
+		for k2,v2 := range v {
+			log.Debugf("Core group %d,%d: %v", k, k2, v2)
+		}
+	}
+	log.Debug("Setting affinities")
+	// Now set the affinities for exising devices in the cores
+	for _,v := range coreGroups {
+		setAffinity(client, v[0].devIds, v[0].backend)
+		setAffinity(client, v[1].devIds, v[1].backend)
+	}
+	log.Debug("Setting connections")
+	// Configure the backeds based on the calculated core groups
+	for _,v := range coreGroups {
+		setConnection(client, v[0].backend, v[0].connection, v[0].ipAddr, 50057)
+		setConnection(client, v[1].backend, v[1].connection, v[1].ipAddr, 50057)
+	}
+
+	log.Debug("Starting discovery monitoring")
+	startDiscoveryMonitor(client)
+
+	log.Debugf("Starting core monitoring")
+	startCoreMonitor(client, clientset, coreFltr, coreGroups) // Never returns
+	return
+
+
+	// The main loop needs to do the following:
+	// 1) Periodically query the pods and filter out
+	//    the vcore ones
+	// 2) Validate that the pods running are the same
+	//    as the previous check
+	// 3) Validate that the IP addresses are the same
+	//    as the last check.
+	// If the pod name(s) ha(s/ve) changed then remove
+	// the unused pod names and add in the new pod names
+	// maintaining the cluster/backend information.
+	// If an IP address has changed (which shouldn't
+	// happen unless a pod is re-started) it should get
+	// caught by the pod name change.
+	for {
+		var rwCorePods map[string]rwPod = make(map[string]rwPod)
+		var rwCoreNodes map[string][]rwPod = make(map[string][]rwPod)
+		pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
+		if err != nil {
+			panic(err.Error())
+		}
+		log.Debugf("There are %d pods in the cluster\n", len(pods.Items))
+
+		/*
+		for k,v := range pods.Items {
+			if v.Namespace == "voltha" && coreFltr.MatchString(v.Name) {
+				fmt.Printf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
+																		v.Status.PodIP, v.Spec.NodeName)
+				//fmt.Printf("Pod %v,%v\n\n\n",k,v)
+				_ = k
+				// Add this pod to the core structure.
+				if firstTime == true {
+					rwCorePodsPrev[v.Name] = rwPod{name:v.Name,node:v.Spec.NodeName}
+					rwCoreNodesPrev[v.Spec.NodeName] =
+							append(rwCoreNodesPrev[v.Spec.NodeName], rwPod{name:v.Name,node:v.Spec.NodeName})
+				}
+				rwCorePods[v.Name] = rwPod{v.Name,v.Status.PodIP,v.Spec.NodeName, "", ""}
+				rwCoreNodes[v.Spec.NodeName] =
+							append(rwCoreNodes[v.Spec.NodeName], rwPod{v.Name,v.Status.PodIP,v.Spec.NodeName,"",""})
+			}
+		}
+		*/
+
+		if len(rwCorePods) != 6 {
+			continue
+		}
+
+		//fmt.Printf("Pod map: %v\n", rwCorePods)
+		//fmt.Printf("Pod map2: %v\n", rwCoreNodes)
+
+		// Examples for error handling:
+		// - Use helper functions like e.g. errors.IsNotFound()
+		// - And/or cast to StatusError and use its properties like e.g. ErrStatus.Message
+		/*
+		_, err = clientset.CoreV1().Pods("default").Get("example-xxxxx", metav1.GetOptions{})
+		if errors.IsNotFound(err) {
+			fmt.Printf("Pod not found\n")
+		} else if statusError, isStatus := err.(*errors.StatusError); isStatus {
+			fmt.Printf("Error getting pod %v\n", statusError.ErrStatus.Message)
+		} else if err != nil {
+			panic(err.Error())
+		} else {
+			fmt.Printf("Found pod\n")
+		}
+		*/
+		// Set the association to backends and connections only once.
+		// TODO: This needs to be reworked for when a pod crashes
+		// and it's name changes.
+		if firstTime == true {
+			be := 1
+			for k,_ := range rwCoreNodesPrev { // Each node has 2 cores running on it
+				// Use a pretty dumb distribution algorithm.
+				log.Debugf("Processing core node %s:%d\n", k,be)
+				rwCoreNodesPrev[k][0].backend = "vcore"+strconv.Itoa(be)
+				rwCoreNodesPrev[k][0].connection = "vcore"+strconv.Itoa(be)+strconv.Itoa(1)
+				rwCoreNodesPrev[k][1].backend = "vcore"+strconv.Itoa(be%3+1)
+				rwCoreNodesPrev[k][1].connection = "vcore"+strconv.Itoa(be%3+1)+strconv.Itoa(2)
+				be++
+			}
+		}
+
+		log.Debugf("Backend Allocation: %v",rwCoreNodesPrev)
+		// Compare the current node IPs with the previous node IPs and if they differ
+		// then set the new one and send the command to configure the router with the
+		// new backend connection.
+		for k,v := range rwCoreNodesPrev {
+			if rwCoreNodes[k][0].ipAddr != rwCoreNodesPrev[k][0].ipAddr {
+				log.Debugf("Configuring backend %s : connection %s\n\n", v[0].backend, v[0].connection)
+				cnf := &pb.Conn{Server:"grpc_command",Cluster:"vcore",Backend:rwCoreNodesPrev[k][0].backend,
+								Connection:rwCoreNodesPrev[k][0].connection,Addr:rwCoreNodes[k][0].ipAddr,
+								Port:50057}
+				if res, err := client.SetConnection(context.Background(), cnf); err != nil {
+					log.Debugf("failed SetConnection RPC call: %s", err)
+				} else {
+					log.Debugf("Result: %v", res)
+					rwCoreNodesPrev[k][0].ipAddr = rwCoreNodes[k][0].ipAddr
+				}
+			}
+			if rwCoreNodes[k][1].ipAddr != rwCoreNodesPrev[k][1].ipAddr {
+				log.Debugf("Configuring backend %s : connection %s\n\n", v[1].backend, v[1].connection)
+				cnf := &pb.Conn{Server:"grpc_command",Cluster:"vcore",Backend:rwCoreNodesPrev[k][1].backend,
+								Connection:rwCoreNodesPrev[k][1].connection,Addr:rwCoreNodes[k][1].ipAddr,
+								Port:50057}
+				if res, err := client.SetConnection(context.Background(), cnf); err != nil {
+					log.Debugf("failed SetConnection RPC call: %s", err)
+				} else {
+					log.Debugf("Result: %v", res)
+					rwCoreNodesPrev[k][1].ipAddr = rwCoreNodes[k][1].ipAddr
+				}
+			}
+		}
+
+
+		fmt.Printf("The structure for setting the connections is: %v\n", rwCoreNodesPrev)
+		firstTime = false
+
+		// Now make the API calls 
+		time.Sleep(10 * time.Second)
+	}
+	conn.Close()
+
+}
+
diff --git a/k8s/affinity-router.yml b/k8s/affinity-router.yml
index 28c2623..a92e442 100644
--- a/k8s/affinity-router.yml
+++ b/k8s/affinity-router.yml
@@ -12,41 +12,45 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-apiVersion: v1
-kind: Pod
+apiVersion: apps/v1beta1
+kind: Deployment
 metadata:
   name: afrouter
   namespace: voltha
-  labels:
-    app: afrouter
-  annotations:
-    cni: "calico"
 spec:
-  containers:
-  - name: arouter
-    image: volthacore/afrouter:testing
-    imagePullPolicy: Always
-    volumeMounts:
-    - name: config-volume
-      mountPath: /app/config
-    ports:
-    - containerPort: 55555
-    command: ["/app/afrouter"]
-    args: ["-config", "/app/config/arouter.voltha3.json"]
-  - name: envoy
-    image: volthacore/envoy
-    volumeMounts:
-    - name: config-volume
-      mountPath: /envoy/config
-    ports:
-    - containerPort: 8192
-    - containerPort: 50555
-  - name: arouterd
-    image: volthacore/afrouterd:testing
-    command: ["/app/arouterd"]
-    imagePullPolicy: Always
-  restartPolicy: Never
-  volumes:
-    - name: config-volume
-      configMap:
-        name: afrouter-config
+  replicas: 1
+  template:
+    metadata:
+      labels:
+        app: afrouter
+      annotations:
+        cni: "calico"
+    spec:
+      containers:
+      - name: arouter
+        image: volthacore/afrouter:testing
+        imagePullPolicy: Always
+        volumeMounts:
+        - name: config-volume
+          mountPath: /app/config
+        ports:
+        - containerPort: 55555
+        command: ["/app/afrouter"]
+        args: ["-config", "/app/config/arouter.voltha3.json"]
+      - name: envoy
+        image: volthacore/envoy
+        volumeMounts:
+        - name: config-volume
+          mountPath: /envoy/config
+        ports:
+        - containerPort: 8192
+        - containerPort: 50555
+      - name: arouterd
+        image: volthacore/afrouterd:testing
+        command: ["/app/arouterd"]
+        imagePullPolicy: Always
+      restartPolicy: Always
+      volumes:
+        - name: config-volume
+          configMap:
+            name: afrouter-config