Afrouterd contextualization.

- Removed the connect loop in favour of gRPC's built-in backoff.
- Added a context that is passed to all subprocesses and is canceled when the connection to the afrouter is lost (see the sketch below).
- On afrouter re-connect, everything is stopped and the system starts over from a clean slate.
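
A minimal sketch of the resulting pattern, condensed from the diff below (the
watchConnection helper and the bare main loop are illustrative only, not the
exact code in arouterd.go):

    package main

    import (
        "context"
        "log"
        "time"

        "google.golang.org/grpc"
        "google.golang.org/grpc/connectivity"
        "google.golang.org/grpc/keepalive"
    )

    // connect dials once and lets gRPC's built-in backoff and keepalive
    // handle retries, replacing the old hand-rolled retry loop.
    func connect(ctx context.Context, addr string) (*grpc.ClientConn, error) {
        return grpc.DialContext(ctx, addr,
            grpc.WithInsecure(),
            grpc.WithBlock(),
            grpc.WithBackoffMaxDelay(5*time.Second),
            grpc.WithKeepaliveParams(keepalive.ClientParameters{
                Time:    10 * time.Second,
                Timeout: 5 * time.Second,
            }))
    }

    // watchConnection returns a context that is canceled as soon as the
    // connection leaves a healthy state, so everything holding that context
    // stops and the caller can start over from a clean slate.
    func watchConnection(conn *grpc.ClientConn) context.Context {
        ctx, cancel := context.WithCancel(context.Background())
        go func() {
            for state := conn.GetState(); state != connectivity.TransientFailure && state != connectivity.Shutdown; state = conn.GetState() {
                if !conn.WaitForStateChange(context.Background(), state) {
                    break // the connection was closed locally
                }
            }
            cancel()
        }()
        return ctx
    }

    func main() {
        for {
            conn, err := connect(context.Background(), "localhost:55554")
            if err != nil {
                log.Fatal(err)
            }
            ctx := watchConnection(conn)
            <-ctx.Done() // the real daemon runs its configuration work until the context ends
            conn.Close()
        }
    }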

Resolves VOL-1681.
Possibly resolves VOL-1655 and VOL-1661; please retest.

Change-Id: I92e16ac02b2ba209570d25ac407515d2df1c7b22
diff --git a/Gopkg.lock b/Gopkg.lock
index eaa2bb4..0749a6c 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -810,6 +810,7 @@
     "google.golang.org/grpc/connectivity",
     "google.golang.org/grpc/credentials",
     "google.golang.org/grpc/grpclog",
+    "google.golang.org/grpc/keepalive",
     "google.golang.org/grpc/metadata",
     "google.golang.org/grpc/status",
     "gopkg.in/Shopify/sarama.v1",
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index 9f1215c..bfd7ba9 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -20,6 +20,8 @@
 	"errors"
 	"flag"
 	"fmt"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/keepalive"
 	"math"
 	"os"
 	"path"
@@ -74,6 +76,8 @@
 	numRWPods = getIntEnv("NUM_RW_PODS", 1, math.MaxInt32, 6)
 	numROPods = getIntEnv("NUM_RO_PODS", 1, math.MaxInt32, 3)
 
+	afrouterApiAddress = getStrEnv("AFROUTER_API_ADDRESS", "localhost:55554")
+
 	afrouterRouterName    = getStrEnv("AFROUTER_ROUTER_NAME", "vcore")
 	afrouterRWClusterName = getStrEnv("AFROUTER_RW_CLUSTER_NAME", "vcore")
 	afrouterROClusterName = getStrEnv("AFROUTER_RO_CLUSTER_NAME", "ro_vcore")
@@ -152,34 +156,27 @@
 	return clientset
 }
 
-func connect(addr string) (*grpc.ClientConn, error) {
-	for ctr := 0; ctr < 100; ctr++ {
-		startTime := time.Now()
-		log.Debugf("Trying to connect to %s", addr)
-		ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
-		conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithBlock())
-		if err != nil {
-			log.Debugf("Attempt to connect failed, retrying. (%v)", err)
-		} else {
-			log.Debugf("Connection succeeded")
-			return conn, err
-		}
-		// 5s between attempts, whether or not if the connection fails immediately
-		time.Sleep(startTime.Add(time.Second * 5).Sub(time.Now()))
+func connect(ctx context.Context, addr string) (*grpc.ClientConn, error) {
+	log.Debugf("Trying to connect to %s", addr)
+	conn, err := grpc.DialContext(ctx, addr,
+		grpc.WithInsecure(),
+		grpc.WithBlock(),
+		grpc.WithBackoffMaxDelay(time.Second*5),
+		grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: time.Second * 10, Timeout: time.Second * 5}))
+	if err == nil {
+		log.Debugf("Connection succeeded")
 	}
-	log.Debugf("Too many connection attempts, giving up!")
-	return nil, errors.New("Timeout attempting to conect")
+	return conn, err
 }
 
-func getVolthaPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) []*volthaPod {
-	var rtrn []*volthaPod
-
+func getVolthaPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) ([]*volthaPod, error) {
 	pods, err := cs.CoreV1().Pods(podNamespace).List(metav1.ListOptions{})
 	if err != nil {
-		panic(err)
+		return nil, err
 	}
 	//log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))
 
+	var rtrn []*volthaPod
 	for _, v := range pods.Items {
 		if coreFilter.MatchString(v.Name) {
 			log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
@@ -192,16 +189,17 @@
 			}
 		}
 	}
-	return rtrn
+	return rtrn, nil
 }
 
 func reconcilePodDeviceIds(pod *volthaPod, ids map[string]struct{}) bool {
-	conn, err := connect(fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
-	defer conn.Close()
+	ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
+	conn, err := connect(ctx, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
 	if err != nil {
 		log.Debugf("Could not query devices from %s, could not connect", pod.name)
 		return false
 	}
+	defer conn.Close()
 
 	var idList cmn.IDs
 	for k := range ids {
@@ -221,7 +219,8 @@
 func queryPodDeviceIds(pod *volthaPod) map[string]struct{} {
 	var rtrn = make(map[string]struct{})
 	// Open a connection to the pod
-	conn, err := connect(fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
+	ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
+	conn, err := connect(ctx, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
 	if err != nil {
 		log.Debugf("Could not query devices from %s, could not connect", pod.name)
 		return rtrn
@@ -482,26 +481,26 @@
 	return false
 }
 
-func setConnection(client pb.ConfigurationClient, cluster string, backend string, connection string, addr string, port uint64) {
+func setConnection(ctx context.Context, client pb.ConfigurationClient, cluster string, backend string, connection string, addr string, port uint64) {
 	log.Debugf("Configuring backend %s : connection %s in cluster %s\n\n",
 		backend, connection, cluster)
 	cnf := &pb.Conn{Server: "grpc_command", Cluster: cluster, Backend: backend,
 		Connection: connection, Addr: addr,
 		Port: port}
-	if res, err := client.SetConnection(context.Background(), cnf); err != nil {
+	if res, err := client.SetConnection(ctx, cnf); err != nil {
 		log.Debugf("failed SetConnection RPC call: %s", err)
 	} else {
 		log.Debugf("Result: %v", res)
 	}
 }
 
-func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
+func setAffinity(ctx context.Context, client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
 	log.Debugf("Configuring backend %s : affinities \n", backend)
 	aff := &pb.Affinity{Router: afrouterRouterName, Route: "dev_manager", Cluster: afrouterRWClusterName, Backend: backend}
 	for k := range ids {
 		log.Debugf("Setting affinity for id %s", k)
 		aff.Id = k
-		if res, err := client.SetAffinity(context.Background(), aff); err != nil {
+		if res, err := client.SetAffinity(ctx, aff); err != nil {
 			log.Debugf("failed affinity RPC call: %s", err)
 		} else {
 			log.Debugf("Result: %v", res)
@@ -521,12 +520,17 @@
 	return ""
 }
 
-func monitorDiscovery(client pb.ConfigurationClient,
+func monitorDiscovery(ctx context.Context,
+	client pb.ConfigurationClient,
 	ch <-chan *ic.InterContainerMessage,
-	coreGroups [][]*volthaPod) {
+	coreGroups [][]*volthaPod,
+	doneCh chan<- struct{}) {
+	defer close(doneCh)
+
 	var id = make(map[string]struct{})
 
 	select {
+	case <-ctx.Done():
 	case msg := <-ch:
 		log.Debugf("Received a device discovery notification")
 		device := &ic.DeviceDiscovered{}
@@ -536,7 +540,7 @@
 			// Set the affinity of the discovered device.
 			if be := getBackendForCore(device.Id, coreGroups); be != "" {
 				id[device.Id] = struct{}{}
-				setAffinity(client, id, be)
+				setAffinity(ctx, client, id, be)
 			} else {
 				log.Error("Cant use an empty string as a backend name")
 			}
@@ -545,20 +549,25 @@
 	}
 }
 
-func startDiscoveryMonitor(client pb.ConfigurationClient,
-	coreGroups [][]*volthaPod) error {
+func startDiscoveryMonitor(ctx context.Context,
+	client pb.ConfigurationClient,
+	coreGroups [][]*volthaPod) (<-chan struct{}, error) {
+	doneCh := make(chan struct{})
 	var ch <-chan *ic.InterContainerMessage
 	// Connect to kafka for discovery events
 	topic := &kafka.Topic{Name: kafkaTopic}
 	kc, err := newKafkaClient(kafkaClientType, kafkaHost, kafkaPort, kafkaInstanceID)
 	kc.Start()
+	defer kc.Stop()
 
 	if ch, err = kc.Subscribe(topic); err != nil {
 		log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
-		return err
+		close(doneCh)
+		return doneCh, err
 	}
-	go monitorDiscovery(client, ch, coreGroups)
-	return nil
+
+	go monitorDiscovery(ctx, client, ch, coreGroups, doneCh)
+	return doneCh, nil
 }
 
 // Determines which items in core groups
@@ -635,7 +644,7 @@
 	return coreGroupDiffs
 }
 
-func applyAddrDiffs(client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
+func applyAddrDiffs(ctx context.Context, client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
 	var newEntries [][]*volthaPod
 
 	log.Debug("Applying diffs")
@@ -660,7 +669,7 @@
 						}
 					}
 					// Send the affininty router new connection information
-					setConnection(client, afrouterRWClusterName, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, podGrpcPort)
+					setConnection(ctx, client, afrouterRWClusterName, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, podGrpcPort)
 					// Copy the new entry information over
 					cores[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
 					cores[k1][k2].name = newEntries[k1][k2].name
@@ -700,7 +709,7 @@
 			}
 			mia[0].ipAddr = v1.ipAddr
 			mia[0].name = v1.name
-			setConnection(client, afrouterROClusterName, mia[0].backend, mia[0].connection, v1.ipAddr, podGrpcPort)
+			setConnection(ctx, client, afrouterROClusterName, mia[0].backend, mia[0].connection, v1.ipAddr, podGrpcPort)
 			// Now get rid of the mia entry just processed
 			mia = append(mia[:0], mia[1:]...)
 		}
@@ -724,12 +733,13 @@
 	}
 }
 
-func startCoreMonitor(client pb.ConfigurationClient,
+func startCoreMonitor(ctx context.Context,
+	client pb.ConfigurationClient,
 	clientset *kubernetes.Clientset,
 	rwCoreFltr *regexp.Regexp,
 	roCoreFltr *regexp.Regexp,
 	coreGroups [][]*volthaPod,
-	oRoPods []*volthaPod) error {
+	oRoPods []*volthaPod) {
 	// Now that initial allocation has been completed, monitor the pods
 	// for IP changes
 	// The main loop needs to do the following:
@@ -745,10 +755,22 @@
 	// If an IP address has changed (which shouldn't
 	// happen unless a pod is re-started) it should get
 	// caught by the pod name change.
+loop:
 	for {
-		time.Sleep(10 * time.Second) // Wait a while
+		select {
+		case <-ctx.Done():
+			// if we're done, exit
+			break loop
+		case <-time.After(10 * time.Second): // wait a while
+		}
+
 		// Get the rw core list from k8s
-		rwPods := getVolthaPods(clientset, rwCoreFltr)
+		rwPods, err := getVolthaPods(clientset, rwCoreFltr)
+		if err != nil {
+			log.Error(err)
+			continue
+		}
+
 		queryDeviceIds(rwPods)
 		updateDeviceIds(coreGroups, rwPods)
 		// If we didn't get 2n+1 pods then wait since
@@ -762,23 +784,26 @@
 		for _, v := range rwPods {
 			if !hasIpAddr(coreGroups, v.ipAddr) {
 				log.Debug("Address has changed...")
-				applyAddrDiffs(client, coreGroups, rwPods)
+				applyAddrDiffs(ctx, client, coreGroups, rwPods)
 				break
 			}
 		}
 
-		roPods := getVolthaPods(clientset, roCoreFltr)
+		roPods, err := getVolthaPods(clientset, roCoreFltr)
+		if err != nil {
+			log.Error(err)
+			continue
+		}
 
 		if len(roPods) != numROPods {
 			continue
 		}
 		for _, v := range roPods {
 			if !hasIpAddr(oRoPods, v.ipAddr) {
-				applyAddrDiffs(client, oRoPods, roPods)
+				applyAddrDiffs(ctx, client, oRoPods, roPods)
 				break
 			}
 		}
-
 	}
 }
 
@@ -804,19 +829,27 @@
 	return false
 }
 
-func main() {
-	// This is currently hard coded to a cluster with 3 servers
-	//var connections map[string]configConn = make(map[string]configConn)
-	//var rwCorePodsPrev map[string]rwPod = make(map[string]rwPod)
-	var err error
-	var conn *grpc.ClientConn
+// connectionActiveContext returns a context that is canceled when the connection to the afrouter is lost
+func connectionActiveContext(conn *grpc.ClientConn) context.Context {
+	ctx, disconnected := context.WithCancel(context.Background())
+	go func() {
+		for state := conn.GetState(); state != connectivity.TransientFailure && state != connectivity.Shutdown; state = conn.GetState() {
+			if !conn.WaitForStateChange(context.Background(), state) {
+				break
+			}
+		}
+		log.Info("Connection to afrouter lost")
+		disconnected()
+	}()
+	return ctx
+}
 
+func main() {
 	config := &Configuration{}
 	cmdParse := flag.NewFlagSet(path.Base(os.Args[0]), flag.ContinueOnError)
 	config.DisplayVersionOnly = cmdParse.Bool("version", false, "Print version information and exit")
 
-	err = cmdParse.Parse(os.Args[1:])
-	if err != nil {
+	if err := cmdParse.Parse(os.Args[1:]); err != nil {
 		fmt.Printf("Error: %v\n", err)
 		os.Exit(1)
 	}
@@ -827,10 +860,6 @@
 		return
 	}
 
-	// Set up the regular expression to identify the voltha cores
-	rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
-	roCoreFltr := regexp.MustCompile(`ro-core-`)
-
 	// Set up logging
 	if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
 		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
@@ -839,16 +868,38 @@
 	// Set up kubernetes api
 	clientset := k8sClientSet()
 
-	// Connect to the affinity router and set up the client
-	conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
+	for {
+		// Connect to the affinity router
+		conn, err := connect(context.Background(), afrouterApiAddress) // defaults to localhost, since this runs as an afrouter sidecar container
+		if err != nil {
+			panic(err)
+		}
+
+		// monitor the connection status; cancel the context if the connection is lost
+		ctx := connectionActiveContext(conn)
+
+		// set up the client
+		client := pb.NewConfigurationClient(conn)
+
+		// determine config & repopulate the afrouter
+		generateAndMaintainConfiguration(ctx, client, clientset)
+
+		conn.Close()
+	}
+}
+
+// generateAndMaintainConfiguration does the pod-reconciliation work;
+// it only returns once all sub-processes have completed
+func generateAndMaintainConfiguration(ctx context.Context, client pb.ConfigurationClient, clientset *kubernetes.Clientset) {
+	// Set up the regular expression to identify the voltha cores
+	rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
+	roCoreFltr := regexp.MustCompile(`ro-core-`)
+
+	// Get the voltha rw-core pods
+	rwPods, err := getVolthaPods(clientset, rwCoreFltr)
 	if err != nil {
 		panic(err)
 	}
-	defer conn.Close()
-	client := pb.NewConfigurationClient(conn)
-
-	// Get the voltha rw-core podes
-	rwPods := getVolthaPods(clientset, rwCoreFltr)
 
 	// Fetch the devices held by each running core
 	queryDeviceIds(rwPods)
@@ -879,18 +930,21 @@
 	log.Info("Setting affinities")
 	// Now set the affinities for exising devices in the cores
 	for _, v := range coreGroups {
-		setAffinity(client, v[0].devIds, v[0].backend)
-		setAffinity(client, v[1].devIds, v[1].backend)
+		setAffinity(ctx, client, v[0].devIds, v[0].backend)
+		setAffinity(ctx, client, v[1].devIds, v[1].backend)
 	}
 	log.Info("Setting connections")
 	// Configure the backeds based on the calculated core groups
 	for _, v := range coreGroups {
-		setConnection(client, afrouterRWClusterName, v[0].backend, v[0].connection, v[0].ipAddr, podGrpcPort)
-		setConnection(client, afrouterRWClusterName, v[1].backend, v[1].connection, v[1].ipAddr, podGrpcPort)
+		setConnection(ctx, client, afrouterRWClusterName, v[0].backend, v[0].connection, v[0].ipAddr, podGrpcPort)
+		setConnection(ctx, client, afrouterRWClusterName, v[1].backend, v[1].connection, v[1].ipAddr, podGrpcPort)
 	}
 
 	// Process the read only pods
-	roPods := getVolthaPods(clientset, roCoreFltr)
+	roPods, err := getVolthaPods(clientset, roCoreFltr)
+	if err != nil {
+		panic(err)
+	}
 	for k, v := range roPods {
 		log.Debugf("Processing ro_pod %v", v)
 		vN := afrouterROClusterName + strconv.Itoa(k+1)
@@ -898,14 +952,15 @@
 		roPods[k].cluster = afrouterROClusterName
 		roPods[k].backend = vN
 		roPods[k].connection = vN + "1"
-		setConnection(client, afrouterROClusterName, v.backend, v.connection, v.ipAddr, podGrpcPort)
+		setConnection(ctx, client, afrouterROClusterName, v.backend, v.connection, v.ipAddr, podGrpcPort)
 	}
 
 	log.Info("Starting discovery monitoring")
-	startDiscoveryMonitor(client, coreGroups)
+	doneCh, _ := startDiscoveryMonitor(ctx, client, coreGroups)
 
 	log.Info("Starting core monitoring")
-	startCoreMonitor(client, clientset, rwCoreFltr,
-		roCoreFltr, coreGroups, roPods) // Never returns
-	return
+	startCoreMonitor(ctx, client, clientset, rwCoreFltr, roCoreFltr, coreGroups, roPods)
+
+	// ensure the discovery monitor has quit
+	<-doneCh
 }