Added environment variables to make hardcoded values configurable.
Fixes: VOL-1652
Change-Id: I0d02bd9db5c06de98e154bda3e3eb0d85ac2ac16
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index 44b8f53..f5f2453 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -18,14 +18,17 @@
import (
"errors"
+ "fmt"
+ "math"
+ "os"
"regexp"
"strconv"
"time"
"github.com/golang/protobuf/ptypes"
- empty "github.com/golang/protobuf/ptypes/empty"
+ "github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/common/log"
- kafka "github.com/opencord/voltha-go/kafka"
+ "github.com/opencord/voltha-go/kafka"
pb "github.com/opencord/voltha-protos/go/afrouter"
cmn "github.com/opencord/voltha-protos/go/common"
ic "github.com/opencord/voltha-protos/go/inter_container"
@@ -37,19 +40,6 @@
"k8s.io/client-go/rest"
)
-type configConn struct {
- Server string `json:"Server"`
- Cluster string `json:"Cluster"`
- Backend string `json:"Backend"`
- connections map[string]connection
-}
-
-type connection struct {
- Name string `json:"Connection"`
- Addr string `json:"Addr"`
- Port uint64 `json:"Port"`
-}
-
type volthaPod struct {
name string
ipAddr string
@@ -65,10 +55,41 @@
dn bool
}
-var nPods int = 6
+var ( // Settings read from the environment at package initialization; each falls back to its default when the variable is unset.
+ podNamespace = getStrEnv("POD_NAMESPACE", "voltha") // namespace whose pods getVolthaPods lists
+ podGrpcPort = uint64(getIntEnv("POD_GRPC_PORT", 0, math.MaxUint16, 50057)) // port dialed on each pod and handed to setConnection
-// Topic is affinityRouter
-// port: 9092
+ numRWPods = getIntEnv("NUM_RW_PODS", 1, math.MaxInt32, 6) // required length of the rwPods list; getAddrDiffs allocates numRWPods>>1 groups
+ numROPods = getIntEnv("NUM_RO_PODS", 1, math.MaxInt32, 3) // required length of the roPods list
+
+ afrouterRouterName = getStrEnv("AFROUTER_ROUTER_NAME", "vcore") // Router field of the pb.Affinity built in setAffinity
+ afrouterRWClusterName = getStrEnv("AFROUTER_RW_CLUSTER_NAME", "vcore") // cluster name for the core groups; also the prefix of their backend and connection names
+ afrouterROClusterName = getStrEnv("AFROUTER_RO_CLUSTER_NAME", "ro_vcore") // cluster name for the roPods entries; also the prefix of their backend and connection names
+
+ kafkaTopic = getStrEnv("KAFKA_TOPIC", "AffinityRouter") // topic subscribed to for discovery events
+ kafkaClientType = getStrEnv("KAFKA_CLIENT_TYPE", "sarama") // clientType argument of newKafkaClient
+ kafkaHost = getStrEnv("KAFKA_HOST", "kafka") // host argument of newKafkaClient
+ kafkaPort = getIntEnv("KAFKA_PORT", 0, math.MaxUint16, 9092) // port argument of newKafkaClient
+ kafkaInstanceID = getStrEnv("KAFKA_INSTANCE_ID", "arouterd") // instanceID argument of newKafkaClient
+)
+
+// getIntEnv returns the integer value of the environment variable key.
+//
+// When the variable is not set, defaultValue is returned as-is; it is not
+// checked against the range. When the variable is set, its value must parse
+// with strconv.Atoi and lie in the inclusive range [min, max]; otherwise the
+// function panics. Panicking is deliberate: the function is used to
+// initialize package-level settings, where a bad value must stop start-up.
+func getIntEnv(key string, min, max, defaultValue int) int {
+	val, have := os.LookupEnv(key)
+	if !have {
+		return defaultValue
+	}
+
+	num, err := strconv.Atoi(val)
+	if err != nil || num < min || num > max {
+		// Include the rejected value: without it the message gives no hint
+		// about what the deployment actually supplied.
+		panic(fmt.Errorf("%s must be a number in the range [%d, %d]; default: %d; got: %q", key, min, max, defaultValue, val))
+	}
+	return num
+}
+
+// getStrEnv looks up the environment variable key and returns its value.
+// A variable that is set to the empty string yields ""; defaultValue is
+// returned only when the variable is not present in the environment at all.
+func getStrEnv(key, defaultValue string) string {
+	value, ok := os.LookupEnv(key)
+	if !ok {
+		return defaultValue
+	}
+	return value
+}
func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
@@ -126,14 +147,14 @@
func getVolthaPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) []*volthaPod {
var rtrn []*volthaPod
- pods, err := cs.CoreV1().Pods("").List(metav1.ListOptions{})
+ pods, err := cs.CoreV1().Pods(podNamespace).List(metav1.ListOptions{})
if err != nil {
panic(err.Error())
}
//log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))
for _, v := range pods.Items {
- if v.Namespace == "voltha" && coreFilter.MatchString(v.Name) {
+ if coreFilter.MatchString(v.Name) {
log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
v.Status.PodIP, v.Spec.NodeName)
// Only add the pod if it has an IP address. If it doesn't then it likely crashed and
@@ -148,16 +169,18 @@
}
func reconcilePodDeviceIds(pod *volthaPod, ids map[string]struct{}) bool {
- var idList cmn.IDs
- for k, _ := range ids {
- idList.Items = append(idList.Items, &cmn.ID{Id: k})
- }
- conn, err := connect(pod.ipAddr + ":50057")
+ conn, err := connect(fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
defer conn.Close()
if err != nil {
log.Debugf("Could not query devices from %s, could not connect", pod.name)
return false
}
+
+ var idList cmn.IDs
+ for k := range ids {
+ idList.Items = append(idList.Items, &cmn.ID{Id: k})
+ }
+
client := vpb.NewVolthaServiceClient(conn)
_, err = client.ReconcileDevices(context.Background(), &idList)
if err != nil {
@@ -169,10 +192,9 @@
}
func queryPodDeviceIds(pod *volthaPod) map[string]struct{} {
- var rtrn map[string]struct{} = make(map[string]struct{})
+ var rtrn = make(map[string]struct{})
// Open a connection to the pod
- // port 50057
- conn, err := connect(pod.ipAddr + ":50057")
+ conn, err := connect(fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
if err != nil {
log.Debugf("Could not query devices from %s, could not connect", pod.name)
return rtrn
@@ -192,7 +214,7 @@
}
func queryDeviceIds(pods []*volthaPod) {
- for pk, _ := range pods {
+ for pk := range pods {
// Keep the old Id list if a new list is not returned
if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
pods[pk].devIds = idList
@@ -201,7 +223,7 @@
}
func allEmpty(pods []*volthaPod) bool {
- for k, _ := range pods {
+ for k := range pods {
if len(pods[k].devIds) != 0 {
return false
}
@@ -234,12 +256,12 @@
//log.Debugf("Creating new group %s", pd[k].pod.name)
// Find the peer pod based on device overlap
// It's ok if one isn't found, an empty one will be used instead
- for k, _ := range pods {
+ for k := range pods {
if len(pods[k].devIds) == 0 { // Skip pods with no devices
//log.Debugf("%s empty pod", pd[k1].pod.name)
continue
}
- if intersect(grp[0].devIds, pods[k].devIds) == true {
+ if intersect(grp[0].devIds, pods[k].devIds) {
//log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
if grp[0].node == pods[k].node {
// This should never happen
@@ -268,7 +290,7 @@
func unallocPodCount(pd []*podTrack) int {
var rtrn int = 0
for _, v := range pd {
- if v.dn == false {
+ if !v.dn {
rtrn++
}
}
@@ -290,7 +312,7 @@
func startRemainingGroups1(grps [][]*volthaPod, pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
var grp []*volthaPod
- for k, _ := range pods {
+ for k := range pods {
if sameNode(pods[k], grps) {
continue
}
@@ -306,21 +328,21 @@
}
func hasSingleSecondNode(grp []*volthaPod) bool { // reports whether the entries after grp[0] all share exactly one node
- var srvrs map[string]struct{} = make(map[string]struct{})
- for k, _ := range grp {
+ var servers = make(map[string]struct{}) // set of distinct node values found in grp[1:]
+ for k := range grp {
if k == 0 {
continue // Ignore the first item
}
- srvrs[grp[k].node] = struct{}{}
+ servers[grp[k].node] = struct{}{}
}
- if len(srvrs) == 1 {
+ if len(servers) == 1 { // exactly one distinct node among the remaining entries
return true
}
return false // no remaining entries, or they span more than one node
}
func addNode(grps [][]*volthaPod, idx *volthaPod, item *volthaPod) [][]*volthaPod {
- for k, _ := range grps {
+ for k := range grps {
if grps[k][0].name == idx.name {
grps[k] = append(grps[k], item)
return grps
@@ -331,8 +353,8 @@
}
func removeNode(grps [][]*volthaPod, item *volthaPod) [][]*volthaPod {
- for k, _ := range grps {
- for k1, _ := range grps[k] {
+ for k := range grps {
+ for k1 := range grps[k] {
if grps[k][k1].name == item.name {
grps[k] = append(grps[k][:k1], grps[k][k1+1:]...)
break
@@ -346,15 +368,15 @@
var lgrps [][]*volthaPod
// All groups must be started when this function is called.
// Copy incomplete groups
- for k, _ := range grps {
+ for k := range grps {
if len(grps[k]) != 2 {
lgrps = append(lgrps, grps[k])
}
}
// Add all pairing candidates to each started group.
- for k, _ := range pods {
- for k2, _ := range lgrps {
+ for k := range pods {
+ for k2 := range lgrps {
if lgrps[k2][0].node != pods[k].node {
lgrps[k2] = append(lgrps[k2], pods[k])
}
@@ -368,11 +390,11 @@
for { // Address groups with only a single server choice
var ssn bool = false
- for k, _ := range lgrps {
+ for k := range lgrps {
// Now if any of the groups only have a single
// node as the choice for the second member
// address that one first.
- if hasSingleSecondNode(lgrps[k]) == true {
+ if hasSingleSecondNode(lgrps[k]) {
ssn = true
// Add this pairing to the groups
grps = addNode(grps, lgrps[k][0], lgrps[k][1])
@@ -385,11 +407,11 @@
break
}
}
- if ssn == false {
+ if !ssn {
break
}
}
- // Now adress one of the remaining groups
+ // Now address one of the remaining groups
if len(lgrps) == 0 {
break // Nothing left to do, exit the loop
}
@@ -425,8 +447,8 @@
}
func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
- for k, _ := range d1 {
- if _, ok := d2[k]; ok == true {
+ for k := range d1 {
+ if _, ok := d2[k]; ok {
return true
}
}
@@ -448,8 +470,8 @@
func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
log.Debugf("Configuring backend %s : affinities \n", backend)
- aff := &pb.Affinity{Router: "vcore", Route: "dev_manager", Cluster: "vcore", Backend: backend}
- for k, _ := range ids {
+ aff := &pb.Affinity{Router: afrouterRouterName, Route: "dev_manager", Cluster: afrouterRWClusterName, Backend: backend}
+ for k := range ids {
log.Debugf("Setting affinity for id %s", k)
aff.Id = k
if res, err := client.SetAffinity(context.Background(), aff); err != nil {
@@ -475,7 +497,7 @@
func monitorDiscovery(client pb.ConfigurationClient,
ch <-chan *ic.InterContainerMessage,
coreGroups [][]*volthaPod) {
- var id map[string]struct{} = make(map[string]struct{})
+ var id = make(map[string]struct{})
select {
case msg := <-ch:
@@ -500,12 +522,12 @@
coreGroups [][]*volthaPod) error {
var ch <-chan *ic.InterContainerMessage
// Connect to kafka for discovery events
- topic := &kafka.Topic{Name: "AffinityRouter"}
- kc, err := newKafkaClient("sarama", "kafka", 9092, "arouterd")
+ topic := &kafka.Topic{Name: kafkaTopic}
+ kc, err := newKafkaClient(kafkaClientType, kafkaHost, kafkaPort, kafkaInstanceID)
kc.Start()
if ch, err = kc.Subscribe(topic); err != nil {
- log.Error("Could not subscribe to the 'AffinityRouter' channel, discovery disabled")
+ log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
return err
}
go monitorDiscovery(client, ch, coreGroups)
@@ -518,19 +540,19 @@
// items and a pod list with the new items
func getAddrDiffs(coreGroups [][]*volthaPod, rwPods []*volthaPod) ([][]*volthaPod, []*volthaPod) {
var nList []*volthaPod
- var rtrn [][]*volthaPod = make([][]*volthaPod, nPods>>1)
- var ipAddrs map[string]struct{} = make(map[string]struct{})
+ var rtrn = make([][]*volthaPod, numRWPods>>1)
+ var ipAddrs = make(map[string]struct{})
log.Debug("Get addr diffs")
// Start with an empty array
- for k, _ := range rtrn {
+ for k := range rtrn {
rtrn[k] = make([]*volthaPod, 2)
}
// Build a list with only the new items
for _, v := range rwPods {
- if hasIpAddr(coreGroups, v.ipAddr) == false {
+ if !hasIpAddr(coreGroups, v.ipAddr) {
nList = append(nList, v)
}
ipAddrs[v.ipAddr] = struct{}{} // for the search below
@@ -539,7 +561,7 @@
// Now build the coreGroups with only the changed items
for k1, v1 := range coreGroups {
for k2, v2 := range v1 {
- if _, ok := ipAddrs[v2.ipAddr]; ok == false {
+ if _, ok := ipAddrs[v2.ipAddr]; !ok {
rtrn[k1][k2] = v2
}
}
@@ -569,7 +591,7 @@
if v2 == nil { // Nothing to do here
continue
}
- if _, ok := srvrs[v2.node]; ok == true {
+ if _, ok := srvrs[v2.node]; ok {
coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
if len(srvrs[v2.node]) > 1 { // remove one entry from the list
srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
@@ -606,12 +628,12 @@
// Pull the device ids from the active-active peer
ids := queryPodDeviceIds(cores[k1][k2^1])
if len(ids) != 0 {
- if reconcilePodDeviceIds(newEntries[k1][k2], ids) == false {
+ if !reconcilePodDeviceIds(newEntries[k1][k2], ids) {
log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
}
}
// Send the affinity router new connection information
- setConnection(client, "vcore", v2.backend, v2.connection, newEntries[k1][k2].ipAddr, 50057)
+ setConnection(client, afrouterRWClusterName, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, podGrpcPort)
// Copy the new entry information over
cores[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
cores[k1][k2].name = newEntries[k1][k2].name
@@ -633,7 +655,7 @@
break
}
}
- if found == false {
+ if !found {
mia = append(mia, cores[k1])
}
}
@@ -646,12 +668,12 @@
break
}
}
- if found == true {
+ if found {
continue
}
mia[0].ipAddr = v1.ipAddr
mia[0].name = v1.name
- setConnection(client, "ro_vcore", mia[0].backend, mia[0].connection, v1.ipAddr, 50057)
+ setConnection(client, afrouterROClusterName, mia[0].backend, mia[0].connection, v1.ipAddr, podGrpcPort)
// Now get rid of the mia entry just processed
mia = append(mia[:0], mia[1:]...)
}
@@ -661,15 +683,15 @@
}
func updateDeviceIds(coreGroups [][]*volthaPod, rwPods []*volthaPod) {
- var byName map[string]*volthaPod = make(map[string]*volthaPod)
+ var byName = make(map[string]*volthaPod)
- // Convinience
+ // Convenience
for _, v := range rwPods {
byName[v.name] = v
}
for k1, v1 := range coreGroups {
- for k2, _ := range v1 {
+ for k2 := range v1 {
coreGroups[k1][k2].devIds = byName[v1[k2].name].devIds
}
}
@@ -705,14 +727,13 @@
// If we didn't get 2n+1 pods then wait since
// something is down and will hopefully come
// back up at some point.
- // TODO: remove the 6 pod hardcoding
- if len(rwPods) != 6 {
+ if len(rwPods) != numRWPods {
continue
}
// We have all pods, check if any IP addresses
// have changed.
for _, v := range rwPods {
- if hasIpAddr(coreGroups, v.ipAddr) == false {
+ if !hasIpAddr(coreGroups, v.ipAddr) {
log.Debug("Address has changed...")
applyAddrDiffs(client, coreGroups, rwPods)
break
@@ -721,11 +742,11 @@
roPods := getVolthaPods(clientset, roCoreFltr)
- if len(roPods) != 3 {
+ if len(roPods) != numROPods {
continue
}
for _, v := range roPods {
- if hasIpAddr(oRoPods, v.ipAddr) == false {
+ if !hasIpAddr(oRoPods, v.ipAddr) {
applyAddrDiffs(client, oRoPods, roPods)
break
}
@@ -797,14 +818,14 @@
coreGroups := groupPods1(rwPods)
// Assign the groupings to the backends and connections
- for k, _ := range coreGroups {
- for k1, _ := range coreGroups[k] {
- coreGroups[k][k1].cluster = "vcore"
- coreGroups[k][k1].backend = "vcore" + strconv.Itoa(k+1)
- coreGroups[k][k1].connection = "vcore" + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
+ for k := range coreGroups {
+ for k1 := range coreGroups[k] {
+ coreGroups[k][k1].cluster = afrouterRWClusterName
+ coreGroups[k][k1].backend = afrouterRWClusterName + strconv.Itoa(k+1)
+ coreGroups[k][k1].connection = afrouterRWClusterName + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
}
}
- log.Info("Core gouping completed")
+ log.Info("Core grouping completed")
// TODO: Debugging code, comment out for production
for k, v := range coreGroups {
@@ -821,20 +842,20 @@
log.Info("Setting connections")
// Configure the backends based on the calculated core groups
for _, v := range coreGroups {
- setConnection(client, "vcore", v[0].backend, v[0].connection, v[0].ipAddr, 50057)
- setConnection(client, "vcore", v[1].backend, v[1].connection, v[1].ipAddr, 50057)
+ setConnection(client, afrouterRWClusterName, v[0].backend, v[0].connection, v[0].ipAddr, podGrpcPort)
+ setConnection(client, afrouterRWClusterName, v[1].backend, v[1].connection, v[1].ipAddr, podGrpcPort)
}
// Process the read only pods
roPods := getVolthaPods(clientset, roCoreFltr)
for k, v := range roPods {
log.Debugf("Processing ro_pod %v", v)
- vN := "ro_vcore" + strconv.Itoa(k+1)
+ vN := afrouterROClusterName + strconv.Itoa(k+1)
log.Debugf("Setting connection %s, %s, %s", vN, vN+"1", v.ipAddr)
- roPods[k].cluster = "ro_core"
+ roPods[k].cluster = afrouterROClusterName
roPods[k].backend = vN
roPods[k].connection = vN + "1"
- setConnection(client, "ro_vcore", v.backend, v.connection, v.ipAddr, 50057)
+ setConnection(client, afrouterROClusterName, v.backend, v.connection, v.ipAddr, podGrpcPort)
}
log.Info("Starting discovery monitoring")