Add environment variables to make hardcoded values configurable.

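arouterd now reads these settings from the environment, falling back to
the previous hardcoded values when a variable is unset. As a rough
illustration, the values below are just the in-code defaults; a real
deployment would override whichever of them need to differ:

    POD_NAMESPACE=voltha
    POD_GRPC_PORT=50057
    NUM_RW_PODS=6
    NUM_RO_PODS=3
    AFROUTER_ROUTER_NAME=vcore
    AFROUTER_RW_CLUSTER_NAME=vcore
    AFROUTER_RO_CLUSTER_NAME=ro_vcore
    KAFKA_TOPIC=AffinityRouter
    KAFKA_CLIENT_TYPE=sarama
    KAFKA_HOST=kafka
    KAFKA_PORT=9092
    KAFKA_INSTANCE_ID=arouterd

Integer variables are range-checked at startup; an out-of-range or
non-numeric value makes arouterd panic with a descriptive error.
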
Fixes: VOL-1652

Change-Id: I0d02bd9db5c06de98e154bda3e3eb0d85ac2ac16
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index 44b8f53..f5f2453 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -18,14 +18,17 @@
 
 import (
 	"errors"
+	"fmt"
+	"math"
+	"os"
 	"regexp"
 	"strconv"
 	"time"
 
 	"github.com/golang/protobuf/ptypes"
-	empty "github.com/golang/protobuf/ptypes/empty"
+	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/common/log"
-	kafka "github.com/opencord/voltha-go/kafka"
+	"github.com/opencord/voltha-go/kafka"
 	pb "github.com/opencord/voltha-protos/go/afrouter"
 	cmn "github.com/opencord/voltha-protos/go/common"
 	ic "github.com/opencord/voltha-protos/go/inter_container"
@@ -37,19 +40,6 @@
 	"k8s.io/client-go/rest"
 )
 
-type configConn struct {
-	Server      string `json:"Server"`
-	Cluster     string `json:"Cluster"`
-	Backend     string `json:"Backend"`
-	connections map[string]connection
-}
-
-type connection struct {
-	Name string `json:"Connection"`
-	Addr string `json:"Addr"`
-	Port uint64 `json:"Port"`
-}
-
 type volthaPod struct {
 	name       string
 	ipAddr     string
@@ -65,10 +55,41 @@
 	dn  bool
 }
 
-var nPods int = 6
+var (
+	podNamespace = getStrEnv("POD_NAMESPACE", "voltha")
+	podGrpcPort  = uint64(getIntEnv("POD_GRPC_PORT", 0, math.MaxUint16, 50057))
 
-// Topic is affinityRouter
-// port: 9092
+	numRWPods = getIntEnv("NUM_RW_PODS", 1, math.MaxInt32, 6)
+	numROPods = getIntEnv("NUM_RO_PODS", 1, math.MaxInt32, 3)
+
+	afrouterRouterName    = getStrEnv("AFROUTER_ROUTER_NAME", "vcore")
+	afrouterRWClusterName = getStrEnv("AFROUTER_RW_CLUSTER_NAME", "vcore")
+	afrouterROClusterName = getStrEnv("AFROUTER_RO_CLUSTER_NAME", "ro_vcore")
+
+	kafkaTopic      = getStrEnv("KAFKA_TOPIC", "AffinityRouter")
+	kafkaClientType = getStrEnv("KAFKA_CLIENT_TYPE", "sarama")
+	kafkaHost       = getStrEnv("KAFKA_HOST", "kafka")
+	kafkaPort       = getIntEnv("KAFKA_PORT", 0, math.MaxUint16, 9092)
+	kafkaInstanceID = getStrEnv("KAFKA_INSTANCE_ID", "arouterd")
+)
+
+func getIntEnv(key string, min, max, defaultValue int) int {
+	if val, have := os.LookupEnv(key); have {
+		num, err := strconv.Atoi(val)
+		if err != nil || !(min <= num && num <= max) {
+			panic(fmt.Errorf("%s must be a number in the range [%d, %d]; default: %d", key, min, max, defaultValue))
+		}
+		return num
+	}
+	return defaultValue
+}
+
+func getStrEnv(key, defaultValue string) string {
+	if val, have := os.LookupEnv(key); have {
+		return val
+	}
+	return defaultValue
+}
 
 func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
 
@@ -126,14 +147,14 @@
 func getVolthaPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) []*volthaPod {
 	var rtrn []*volthaPod
 
-	pods, err := cs.CoreV1().Pods("").List(metav1.ListOptions{})
+	pods, err := cs.CoreV1().Pods(podNamespace).List(metav1.ListOptions{})
 	if err != nil {
 		panic(err.Error())
 	}
 	//log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))
 
 	for _, v := range pods.Items {
-		if v.Namespace == "voltha" && coreFilter.MatchString(v.Name) {
+		if coreFilter.MatchString(v.Name) {
 			log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
 				v.Status.PodIP, v.Spec.NodeName)
 			// Only add the pod if it has an IP address. If it doesn't then it likely crashed and
@@ -148,16 +169,18 @@
 }
 
 func reconcilePodDeviceIds(pod *volthaPod, ids map[string]struct{}) bool {
-	var idList cmn.IDs
-	for k, _ := range ids {
-		idList.Items = append(idList.Items, &cmn.ID{Id: k})
-	}
-	conn, err := connect(pod.ipAddr + ":50057")
-	defer conn.Close()
+	conn, err := connect(fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
 	if err != nil {
 		log.Debugf("Could not query devices from %s, could not connect", pod.name)
 		return false
 	}
+	defer conn.Close()
+
+	var idList cmn.IDs
+	for k := range ids {
+		idList.Items = append(idList.Items, &cmn.ID{Id: k})
+	}
+
 	client := vpb.NewVolthaServiceClient(conn)
 	_, err = client.ReconcileDevices(context.Background(), &idList)
 	if err != nil {
@@ -169,10 +192,9 @@
 }
 
 func queryPodDeviceIds(pod *volthaPod) map[string]struct{} {
-	var rtrn map[string]struct{} = make(map[string]struct{})
+	var rtrn = make(map[string]struct{})
 	// Open a connection to the pod
-	// port 50057
-	conn, err := connect(pod.ipAddr + ":50057")
+	conn, err := connect(fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
 	if err != nil {
 		log.Debugf("Could not query devices from %s, could not connect", pod.name)
 		return rtrn
@@ -192,7 +214,7 @@
 }
 
 func queryDeviceIds(pods []*volthaPod) {
-	for pk, _ := range pods {
+	for pk := range pods {
 		// Keep the old Id list if a new list is not returned
 		if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
 			pods[pk].devIds = idList
@@ -201,7 +223,7 @@
 }
 
 func allEmpty(pods []*volthaPod) bool {
-	for k, _ := range pods {
+	for k := range pods {
 		if len(pods[k].devIds) != 0 {
 			return false
 		}
@@ -234,12 +256,12 @@
 		//log.Debugf("Creating new group %s", pd[k].pod.name)
 		// Find the peer pod based on device overlap
 		// It's ok if one isn't found, an empty one will be used instead
-		for k, _ := range pods {
+		for k := range pods {
 			if len(pods[k].devIds) == 0 { // Skip pods with no devices
 				//log.Debugf("%s empty pod", pd[k1].pod.name)
 				continue
 			}
-			if intersect(grp[0].devIds, pods[k].devIds) == true {
+			if intersect(grp[0].devIds, pods[k].devIds) {
 				//log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
 				if grp[0].node == pods[k].node {
 					// This should never happen
@@ -268,7 +290,7 @@
 func unallocPodCount(pd []*podTrack) int {
 	var rtrn int = 0
 	for _, v := range pd {
-		if v.dn == false {
+		if !v.dn {
 			rtrn++
 		}
 	}
@@ -290,7 +312,7 @@
 func startRemainingGroups1(grps [][]*volthaPod, pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
 	var grp []*volthaPod
 
-	for k, _ := range pods {
+	for k := range pods {
 		if sameNode(pods[k], grps) {
 			continue
 		}
@@ -306,21 +328,21 @@
 }
 
 func hasSingleSecondNode(grp []*volthaPod) bool {
-	var srvrs map[string]struct{} = make(map[string]struct{})
-	for k, _ := range grp {
+	var servers = make(map[string]struct{})
+	for k := range grp {
 		if k == 0 {
 			continue // Ignore the first item
 		}
-		srvrs[grp[k].node] = struct{}{}
+		servers[grp[k].node] = struct{}{}
 	}
-	if len(srvrs) == 1 {
+	if len(servers) == 1 {
 		return true
 	}
 	return false
 }
 
 func addNode(grps [][]*volthaPod, idx *volthaPod, item *volthaPod) [][]*volthaPod {
-	for k, _ := range grps {
+	for k := range grps {
 		if grps[k][0].name == idx.name {
 			grps[k] = append(grps[k], item)
 			return grps
@@ -331,8 +353,8 @@
 }
 
 func removeNode(grps [][]*volthaPod, item *volthaPod) [][]*volthaPod {
-	for k, _ := range grps {
-		for k1, _ := range grps[k] {
+	for k := range grps {
+		for k1 := range grps[k] {
 			if grps[k][k1].name == item.name {
 				grps[k] = append(grps[k][:k1], grps[k][k1+1:]...)
 				break
@@ -346,15 +368,15 @@
 	var lgrps [][]*volthaPod
 	// All groups must be started when this function is called.
 	// Copy incomplete groups
-	for k, _ := range grps {
+	for k := range grps {
 		if len(grps[k]) != 2 {
 			lgrps = append(lgrps, grps[k])
 		}
 	}
 
 	// Add all pairing candidates to each started group.
-	for k, _ := range pods {
-		for k2, _ := range lgrps {
+	for k := range pods {
+		for k2 := range lgrps {
 			if lgrps[k2][0].node != pods[k].node {
 				lgrps[k2] = append(lgrps[k2], pods[k])
 			}
@@ -368,11 +390,11 @@
 		for { // Address groups with only a single server choice
 			var ssn bool = false
 
-			for k, _ := range lgrps {
+			for k := range lgrps {
 				// Now if any of the groups only have a single
-			// node as the choice for the second member
+			// node as the choice for the second member,
 				// address that one first.
-				if hasSingleSecondNode(lgrps[k]) == true {
+				if hasSingleSecondNode(lgrps[k]) {
 					ssn = true
 					// Add this pairing to the groups
 					grps = addNode(grps, lgrps[k][0], lgrps[k][1])
@@ -385,11 +407,11 @@
 					break
 				}
 			}
-			if ssn == false {
+			if !ssn {
 				break
 			}
 		}
-		// Now adress one of the remaining groups
+		// Now address one of the remaining groups
 		if len(lgrps) == 0 {
 			break // Nothing left to do, exit the loop
 		}
@@ -425,8 +447,8 @@
 }
 
 func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
-	for k, _ := range d1 {
-		if _, ok := d2[k]; ok == true {
+	for k := range d1 {
+		if _, ok := d2[k]; ok {
 			return true
 		}
 	}
@@ -448,8 +470,8 @@
 
 func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
 	log.Debugf("Configuring backend %s : affinities \n", backend)
-	aff := &pb.Affinity{Router: "vcore", Route: "dev_manager", Cluster: "vcore", Backend: backend}
-	for k, _ := range ids {
+	aff := &pb.Affinity{Router: afrouterRouterName, Route: "dev_manager", Cluster: afrouterRWClusterName, Backend: backend}
+	for k := range ids {
 		log.Debugf("Setting affinity for id %s", k)
 		aff.Id = k
 		if res, err := client.SetAffinity(context.Background(), aff); err != nil {
@@ -475,7 +497,7 @@
 func monitorDiscovery(client pb.ConfigurationClient,
 	ch <-chan *ic.InterContainerMessage,
 	coreGroups [][]*volthaPod) {
-	var id map[string]struct{} = make(map[string]struct{})
+	var id = make(map[string]struct{})
 
 	select {
 	case msg := <-ch:
@@ -500,12 +522,12 @@
 	coreGroups [][]*volthaPod) error {
 	var ch <-chan *ic.InterContainerMessage
 	// Connect to kafka for discovery events
-	topic := &kafka.Topic{Name: "AffinityRouter"}
-	kc, err := newKafkaClient("sarama", "kafka", 9092, "arouterd")
+	topic := &kafka.Topic{Name: kafkaTopic}
+	kc, err := newKafkaClient(kafkaClientType, kafkaHost, kafkaPort, kafkaInstanceID)
 	kc.Start()
 
 	if ch, err = kc.Subscribe(topic); err != nil {
-		log.Error("Could not subscribe to the 'AffinityRouter' channel, discovery disabled")
+		log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
 		return err
 	}
 	go monitorDiscovery(client, ch, coreGroups)
@@ -518,19 +540,19 @@
 // items and a pod list with the new items
 func getAddrDiffs(coreGroups [][]*volthaPod, rwPods []*volthaPod) ([][]*volthaPod, []*volthaPod) {
 	var nList []*volthaPod
-	var rtrn [][]*volthaPod = make([][]*volthaPod, nPods>>1)
-	var ipAddrs map[string]struct{} = make(map[string]struct{})
+	var rtrn = make([][]*volthaPod, numRWPods>>1)
+	var ipAddrs = make(map[string]struct{})
 
 	log.Debug("Get addr diffs")
 
 	// Start with an empty array
-	for k, _ := range rtrn {
+	for k := range rtrn {
 		rtrn[k] = make([]*volthaPod, 2)
 	}
 
 	// Build a list with only the new items
 	for _, v := range rwPods {
-		if hasIpAddr(coreGroups, v.ipAddr) == false {
+		if !hasIpAddr(coreGroups, v.ipAddr) {
 			nList = append(nList, v)
 		}
 		ipAddrs[v.ipAddr] = struct{}{} // for the search below
@@ -539,7 +561,7 @@
 	// Now build the coreGroups with only the changed items
 	for k1, v1 := range coreGroups {
 		for k2, v2 := range v1 {
-			if _, ok := ipAddrs[v2.ipAddr]; ok == false {
+			if _, ok := ipAddrs[v2.ipAddr]; !ok {
 				rtrn[k1][k2] = v2
 			}
 		}
@@ -569,7 +591,7 @@
 			if v2 == nil { // Nothing to do here
 				continue
 			}
-			if _, ok := srvrs[v2.node]; ok == true {
+			if _, ok := srvrs[v2.node]; ok {
 				coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
 				if len(srvrs[v2.node]) > 1 { // remove one entry from the list
 					srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
@@ -606,12 +628,12 @@
 					// Pull the device ids from the active-active peer
 					ids := queryPodDeviceIds(cores[k1][k2^1])
 					if len(ids) != 0 {
-						if reconcilePodDeviceIds(newEntries[k1][k2], ids) == false {
+						if !reconcilePodDeviceIds(newEntries[k1][k2], ids) {
 							log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
 						}
 					}
-					// Send the affininty router new connection information
+					// Send the affinity router new connection information
-					setConnection(client, "vcore", v2.backend, v2.connection, newEntries[k1][k2].ipAddr, 50057)
+					setConnection(client, afrouterRWClusterName, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, podGrpcPort)
 					// Copy the new entry information over
 					cores[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
 					cores[k1][k2].name = newEntries[k1][k2].name
@@ -633,7 +655,7 @@
 					break
 				}
 			}
-			if found == false {
+			if !found {
 				mia = append(mia, cores[k1])
 			}
 		}
@@ -646,12 +668,12 @@
 					break
 				}
 			}
-			if found == true {
+			if found {
 				continue
 			}
 			mia[0].ipAddr = v1.ipAddr
 			mia[0].name = v1.name
-			setConnection(client, "ro_vcore", mia[0].backend, mia[0].connection, v1.ipAddr, 50057)
+			setConnection(client, afrouterROClusterName, mia[0].backend, mia[0].connection, v1.ipAddr, podGrpcPort)
 			// Now get rid of the mia entry just processed
 			mia = append(mia[:0], mia[1:]...)
 		}
@@ -661,15 +683,15 @@
 }
 
 func updateDeviceIds(coreGroups [][]*volthaPod, rwPods []*volthaPod) {
-	var byName map[string]*volthaPod = make(map[string]*volthaPod)
+	var byName = make(map[string]*volthaPod)
 
-	// Convinience
+	// Convenience
 	for _, v := range rwPods {
 		byName[v.name] = v
 	}
 
 	for k1, v1 := range coreGroups {
-		for k2, _ := range v1 {
+		for k2 := range v1 {
 			coreGroups[k1][k2].devIds = byName[v1[k2].name].devIds
 		}
 	}
@@ -705,14 +727,13 @@
 		// If we didn't get 2n+1 pods then wait since
 		// something is down and will hopefully come
 		// back up at some point.
-		// TODO: remove the 6 pod hardcoding
-		if len(rwPods) != 6 {
+		if len(rwPods) != numRWPods {
 			continue
 		}
 		// We have all pods, check if any IP addresses
 		// have changed.
 		for _, v := range rwPods {
-			if hasIpAddr(coreGroups, v.ipAddr) == false {
+			if !hasIpAddr(coreGroups, v.ipAddr) {
 				log.Debug("Address has changed...")
 				applyAddrDiffs(client, coreGroups, rwPods)
 				break
@@ -721,11 +742,11 @@
 
 		roPods := getVolthaPods(clientset, roCoreFltr)
 
-		if len(roPods) != 3 {
+		if len(roPods) != numROPods {
 			continue
 		}
 		for _, v := range roPods {
-			if hasIpAddr(oRoPods, v.ipAddr) == false {
+			if !hasIpAddr(oRoPods, v.ipAddr) {
 				applyAddrDiffs(client, oRoPods, roPods)
 				break
 			}
@@ -797,14 +818,14 @@
 	coreGroups := groupPods1(rwPods)
 
-	// Assign the groupings to the the backends and connections
+	// Assign the groupings to the backends and connections
-	for k, _ := range coreGroups {
-		for k1, _ := range coreGroups[k] {
-			coreGroups[k][k1].cluster = "vcore"
-			coreGroups[k][k1].backend = "vcore" + strconv.Itoa(k+1)
-			coreGroups[k][k1].connection = "vcore" + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
+	for k := range coreGroups {
+		for k1 := range coreGroups[k] {
+			coreGroups[k][k1].cluster = afrouterRWClusterName
+			coreGroups[k][k1].backend = afrouterRWClusterName + strconv.Itoa(k+1)
+			coreGroups[k][k1].connection = afrouterRWClusterName + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
 		}
 	}
-	log.Info("Core gouping completed")
+	log.Info("Core grouping completed")
 
 	// TODO: Debugging code, comment out for production
 	for k, v := range coreGroups {
@@ -821,20 +842,20 @@
 	log.Info("Setting connections")
-	// Configure the backeds based on the calculated core groups
+	// Configure the backends based on the calculated core groups
 	for _, v := range coreGroups {
-		setConnection(client, "vcore", v[0].backend, v[0].connection, v[0].ipAddr, 50057)
-		setConnection(client, "vcore", v[1].backend, v[1].connection, v[1].ipAddr, 50057)
+		setConnection(client, afrouterRWClusterName, v[0].backend, v[0].connection, v[0].ipAddr, podGrpcPort)
+		setConnection(client, afrouterRWClusterName, v[1].backend, v[1].connection, v[1].ipAddr, podGrpcPort)
 	}
 
 	// Process the read only pods
 	roPods := getVolthaPods(clientset, roCoreFltr)
 	for k, v := range roPods {
 		log.Debugf("Processing ro_pod %v", v)
-		vN := "ro_vcore" + strconv.Itoa(k+1)
+		vN := afrouterROClusterName + strconv.Itoa(k+1)
 		log.Debugf("Setting connection %s, %s, %s", vN, vN+"1", v.ipAddr)
-		roPods[k].cluster = "ro_core"
+		roPods[k].cluster = afrouterROClusterName
 		roPods[k].backend = vN
 		roPods[k].connection = vN + "1"
-		setConnection(client, "ro_vcore", v.backend, v.connection, v.ipAddr, 50057)
+		setConnection(client, afrouterROClusterName, v.backend, v.connection, v.ipAddr, podGrpcPort)
 	}
 
 	log.Info("Starting discovery monitoring")