Removal of exports that aren't needed, general cleanup of
commented-out code, and other minor changes.
Change-Id: Icb29cdc527d4c01e3a5d4d3d6de2e074745d0f33
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index 63c29f6..bd2787a 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -17,21 +17,16 @@
package main
import (
- //"os"
- "fmt"
"time"
"regexp"
"errors"
"strconv"
- //"io/ioutil"
- //"encoding/json"
"k8s.io/client-go/rest"
"google.golang.org/grpc"
"golang.org/x/net/context"
"k8s.io/client-go/kubernetes"
"github.com/golang/protobuf/ptypes"
- //"k8s.io/apimachinery/pkg/api/errors"
"github.com/opencord/voltha-go/common/log"
kafka "github.com/opencord/voltha-go/kafka"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -215,19 +210,6 @@
return true
}
-//func groupEmptyCores(pods []*rwPod) [][]*rwPod {
-// return [][]*rwPod{}
-//}
-
-//func groupPods(pods []*rwPod) [][]*rwPod {
-
-// if allEmpty(pods) == true {
-// return groupEmptyCores(pods)
-// } else {
-// return groupPopulatedCores(pods)
-// }
-//}
-
func rmPod(pods []*rwPod, idx int) []*rwPod {
return append(pods[:idx],pods[idx+1:]...)
}
@@ -284,57 +266,6 @@
return rtrn,out
}
-func groupIntersectingPods(pd []*podTrack) ([][]*rwPod,[]*podTrack) {
- var rtrn [][]*rwPod
-
- for k,_ := range pd {
- if pd[k].dn == true { // Already processed?
- //log.Debugf("%s already processed", pd[k].pod.name)
- continue
- }
- if len(pd[k].pod.devIds) == 0 { // Ignore pods with no devices
- ////log.Debugf("%s empty pod", pd[k].pod.name)
- continue
- }
- // Start a pod group with this pod
- var grp []*rwPod
- grp = append(grp, pd[k].pod)
- pd[k].dn = true
- //log.Debugf("Creating new group %s", pd[k].pod.name)
- // Find the peer pod based on device overlap
- // It's ok if one isn't found, an empty one will be used instead
- for k1,_ := range pd {
- if pd[k1].dn == true { // Skip over eliminated pods
- //log.Debugf("%s eliminated pod", pd[k1].pod.name)
- continue
- }
- if len(pd[k1].pod.devIds) == 0 { // Skip pods with no devices
- //log.Debugf("%s empty pod", pd[k1].pod.name)
- continue
- }
- if intersect(pd[k].pod.devIds, pd[k1].pod.devIds) == true {
- //log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
- if pd[k].pod.node == pd[k1].pod.node {
- // This should never happen
- log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
- pd[k].pod.name, pd[k1].pod.name)
- continue
- }
- pd[k1].dn = true
- grp = append(grp, pd[k1].pod)
- break
- }
- }
- rtrn = append(rtrn, grp)
- //log.Debugf("Added group %s", grp[0].name)
- // Check if the number of groups = half the pods, if so all groups are started.
- if len(rtrn) == len(pd) >> 1 {
- break
- }
- }
- return rtrn,pd
-}
-
func unallocPodCount(pd []*podTrack) int {
var rtrn int = 0
for _,v := range pd {
@@ -376,23 +307,6 @@
return grps, pods
}
-func startRemainingGroups(grps [][]*rwPod, pd []*podTrack) ([][]*rwPod, []*podTrack) {
- var grp []*rwPod
-
- for k,_ := range pd {
- if sameNode(pd[k].pod, grps) == true {
- continue
- }
- grp = append(grp, pd[k].pod)
- grps = append(grps, grp)
- pd[k].dn = true
- if len(grps) == len(pd) >> 1 {
- break
- }
- }
- return grps, pd
-}
-
func hasSingleSecondNode(grp []*rwPod) bool {
var srvrs map[string]struct{} = make(map[string]struct{})
for k,_ := range grp {
@@ -488,67 +402,6 @@
return grps
}
-func groupRemainingPods(grps [][]*rwPod, pd []*podTrack) [][]*rwPod{
- var lgrps [][]*rwPod
- // All groups must be started when this function is called.
- // Copy incomplete groups
- for k,_ := range grps {
- if len(grps[k]) != 2 {
- lgrps = append(lgrps, grps[k])
- }
- }
-
- // Add all pairing candidates to each started group.
- for k,_ := range pd {
- if pd[k].dn == true {
- continue
- }
- for k2,_ := range lgrps {
- if lgrps[k2][0].node != pd[k].pod.node {
- lgrps[k2] = append(lgrps[k2], pd[k].pod)
- }
- }
- }
-
- //TODO: If any member of lgrps doesn't have at least 2
- // nodes something is wrong. Check for that here
-
- for {
- for { // Address groups with only a single server choice
- var ssn bool = false
-
- for k,_ := range lgrps {
- // Now if any of the groups only have a single
- // node as the choice for the second member
- // address that one first.
- if hasSingleSecondNode(lgrps[k]) == true {
- ssn = true
- // Add this pairing to the groups
- grps = addNode(grps, lgrps[k][0], lgrps[k][1])
- // Since this node is now used, remove it from all
- // remaining tenative groups
- lgrps = removeNode(lgrps, lgrps[k][1])
- // Now remove this group completely since
- // it's been addressed
- lgrps = append(lgrps[:k],lgrps[k+1:]...)
- break
- }
- }
- if ssn == false {
- break
- }
- }
- // Now adress one of the remaining groups
- if len(lgrps) == 0 {
- break // Nothing left to do, exit the loop
- }
- grps = addNode(grps, lgrps[0][0], lgrps[0][1])
- lgrps = removeNode(lgrps, lgrps[0][1])
- lgrps = append(lgrps[:0],lgrps[1:]...)
- }
- return grps
-}
-
func groupPods1(pods []*rwPod) [][]*rwPod {
var rtrn [][]*rwPod
var podCt int = len(pods)
@@ -573,103 +426,6 @@
}
}
-func groupPods(pods []*rwPod) [][]*rwPod {
- var rtrn [][]*rwPod
- var pd []*podTrack
-
- // Tracking of the grouping process
- for k,_ := range pods {
- pd = append(pd, &podTrack{pods[k],false})
- }
-
-
- rtrn,pd = groupIntersectingPods(pd)
- // There are several outcomes here
- // 1) All pods have been paired and we're done
- // 2) Some un-allocated pods remain
- // 2.a) All groups have been started
- // 2.b) Not all groups have been started
- if unallocPodCount(pd) == 0 {
- return rtrn
- } else if len(rtrn) == len(pd) >> 1 { // All groupings started
- // Allocate the remaining (presumably empty) pods to the started groups
- return groupRemainingPods(rtrn, pd)
- } else { // Some groupings started
- // Start empty groups with remaining pods
- // each grouping is on a different server then
- // allocate remaining pods.
- rtrn, pd = startRemainingGroups(rtrn, pd)
- return groupRemainingPods(rtrn, pd)
- }
-
-
- // Establish groupings of non-empty pods that have overlapping devices.
- for k,_ := range pd {
- if pd[k].dn == true { // Already processed?
- //log.Debugf("%s already processed", pd[k].pod.name)
- continue
- }
- if len(pd[k].pod.devIds) == 0 { // Ignore pods with no devices
- ////log.Debugf("%s empty pod", pd[k].pod.name)
- continue
- }
- // Start a pod group with this pod
- var grp []*rwPod
- grp = append(grp, pd[k].pod)
- pd[k].dn = true
- //log.Debugf("Creating new group %s", pd[k].pod.name)
- // Find the peer pod based on device overlap
- // It's ok if one isn't found, an empty one will be used instead
- for k1,_ := range pd {
- if pd[k1].dn == true { // Skip over eliminated pods
- //log.Debugf("%s eliminated pod", pd[k1].pod.name)
- continue
- }
- if len(pd[k1].pod.devIds) == 0 { // Skip pods with no devices
- //log.Debugf("%s empty pod", pd[k1].pod.name)
- continue
- }
- if intersect(pd[k].pod.devIds, pd[k1].pod.devIds) == true {
- //log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
- pd[k1].dn = true
- grp = append(grp, pd[k1].pod)
- break
- }
- }
- rtrn = append(rtrn, grp)
- //log.Debugf("Added group %s", grp[0].name)
- }
- // Now find any grouping without 2 members and assign one of the
- // pods with no devices and on a different server to it.
- // If there are no pods with no devices left before all
- // groups are filled report an exception but leave one of the
- // groups with only one pod.
- for k,_ := range rtrn {
- if len(rtrn[k]) < 2 {
- for k2,_ := range pd {
- if pd[k2].dn == true {
- continue
- }
- // There should be only empty pods here
- if len(pd[k2].pod.devIds) != 0 {
- log.Error("Non empty pod found where empty pod was expected")
- continue
- }
- if pd[k2].pod.node == rtrn[k][0].node {
- //log.Error("Pods aren't on different servers, continuing")
- continue
- }
- // Add this empty and unused pod to the group
- //log.Debugf("Adding empty pod %s", pd[k2].pod.name)
- rtrn[k] = append(rtrn[k], pd[k2].pod)
- pd[k2].dn = true
- break
- }
- }
- }
- return rtrn
-}
-
func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
for k,_ := range d1 {
if _,ok := d2[k]; ok == true {
@@ -864,6 +620,21 @@
}
}
+func updateDeviceIds(coreGroups [][]*rwPod, rwPods []*rwPod) {
+ var byName map[string]*rwPod = make(map[string]*rwPod)
+
+ // Convenience: index the freshly queried pods by name for direct lookup
+ for _,v := range rwPods {
+ byName[v.name] = v
+ }
+
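+ // Refresh each grouped pod's device ID set from the newly queried pods, matched by pod name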
+ for k1,v1 := range coreGroups {
+ for k2,_ := range v1 {
+ coreGroups[k1][k2].devIds = byName[v1[k2].name].devIds
+ }
+ }
+}
+
func startCoreMonitor(client pb.ConfigurationClient,
clientset *kubernetes.Clientset,
coreFltr *regexp.Regexp,
@@ -888,6 +659,7 @@
// Get the rw core list from k8s
rwPods := getRwPods(clientset, coreFltr)
queryDeviceIds(rwPods)
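+ // Keep the core groupings' device ID lists in sync with the latest pod query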
+ updateDeviceIds(coreGroups, rwPods)
// If we didn't get 2n+1 pods then wait since
// something is down and will hopefully come
// back up at some point.
@@ -923,8 +695,6 @@
// This is currently hard coded to a cluster with 3 servers
//var connections map[string]configConn = make(map[string]configConn)
//var rwCorePodsPrev map[string]rwPod = make(map[string]rwPod)
- var rwCoreNodesPrev map[string][]rwPod = make(map[string][]rwPod)
- var firstTime bool = true
var err error
var conn *grpc.ClientConn
@@ -942,6 +712,7 @@
// Connect to the affinity router and set up the client
conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
if err != nil {
panic(err.Error())
}
+ defer conn.Close()
@@ -960,7 +731,6 @@
coreGroups := groupPods1(rwPods)
-
// Assign the groupings to the the backends and connections
for k,_ := range coreGroups {
for k1,_ := range coreGroups[k] {
@@ -968,7 +738,7 @@
coreGroups[k][k1].connection = "vcore"+strconv.Itoa(k+1)+strconv.Itoa(k1+1)
}
}
- log.Debug("Core gouping completed")
+ log.Info("Core grouping completed")
// TODO: Debugging code, comment out for production
for k,v := range coreGroups {
@@ -976,146 +746,24 @@
log.Debugf("Core group %d,%d: %v", k, k2, v2)
}
}
- log.Debug("Setting affinities")
+ log.Info("Setting affinities")
// Now set the affinities for exising devices in the cores
for _,v := range coreGroups {
setAffinity(client, v[0].devIds, v[0].backend)
setAffinity(client, v[1].devIds, v[1].backend)
}
- log.Debug("Setting connections")
+ log.Info("Setting connections")
// Configure the backeds based on the calculated core groups
for _,v := range coreGroups {
setConnection(client, v[0].backend, v[0].connection, v[0].ipAddr, 50057)
setConnection(client, v[1].backend, v[1].connection, v[1].ipAddr, 50057)
}
- log.Debug("Starting discovery monitoring")
+ log.Info("Starting discovery monitoring")
startDiscoveryMonitor(client, coreGroups)
- log.Debugf("Starting core monitoring")
+ log.Info("Starting core monitoring")
startCoreMonitor(client, clientset, coreFltr, coreGroups) // Never returns
return
-
-
- // The main loop needs to do the following:
- // 1) Periodically query the pods and filter out
- // the vcore ones
- // 2) Validate that the pods running are the same
- // as the previous check
- // 3) Validate that the IP addresses are the same
- // as the last check.
- // If the pod name(s) ha(s/ve) changed then remove
- // the unused pod names and add in the new pod names
- // maintaining the cluster/backend information.
- // If an IP address has changed (which shouldn't
- // happen unless a pod is re-started) it should get
- // caught by the pod name change.
- for {
- var rwCorePods map[string]rwPod = make(map[string]rwPod)
- var rwCoreNodes map[string][]rwPod = make(map[string][]rwPod)
- pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
- if err != nil {
- panic(err.Error())
- }
- log.Debugf("There are %d pods in the cluster\n", len(pods.Items))
-
- /*
- for k,v := range pods.Items {
- if v.Namespace == "voltha" && coreFltr.MatchString(v.Name) {
- fmt.Printf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
- v.Status.PodIP, v.Spec.NodeName)
- //fmt.Printf("Pod %v,%v\n\n\n",k,v)
- _ = k
- // Add this pod to the core structure.
- if firstTime == true {
- rwCorePodsPrev[v.Name] = rwPod{name:v.Name,node:v.Spec.NodeName}
- rwCoreNodesPrev[v.Spec.NodeName] =
- append(rwCoreNodesPrev[v.Spec.NodeName], rwPod{name:v.Name,node:v.Spec.NodeName})
- }
- rwCorePods[v.Name] = rwPod{v.Name,v.Status.PodIP,v.Spec.NodeName, "", ""}
- rwCoreNodes[v.Spec.NodeName] =
- append(rwCoreNodes[v.Spec.NodeName], rwPod{v.Name,v.Status.PodIP,v.Spec.NodeName,"",""})
- }
- }
- */
-
- if len(rwCorePods) != 6 {
- continue
- }
-
- //fmt.Printf("Pod map: %v\n", rwCorePods)
- //fmt.Printf("Pod map2: %v\n", rwCoreNodes)
-
- // Examples for error handling:
- // - Use helper functions like e.g. errors.IsNotFound()
- // - And/or cast to StatusError and use its properties like e.g. ErrStatus.Message
- /*
- _, err = clientset.CoreV1().Pods("default").Get("example-xxxxx", metav1.GetOptions{})
- if errors.IsNotFound(err) {
- fmt.Printf("Pod not found\n")
- } else if statusError, isStatus := err.(*errors.StatusError); isStatus {
- fmt.Printf("Error getting pod %v\n", statusError.ErrStatus.Message)
- } else if err != nil {
- panic(err.Error())
- } else {
- fmt.Printf("Found pod\n")
- }
- */
- // Set the association to backends and connections only once.
- // TODO: This needs to be reworked for when a pod crashes
- // and it's name changes.
- if firstTime == true {
- be := 1
- for k,_ := range rwCoreNodesPrev { // Each node has 2 cores running on it
- // Use a pretty dumb distribution algorithm.
- log.Debugf("Processing core node %s:%d\n", k,be)
- rwCoreNodesPrev[k][0].backend = "vcore"+strconv.Itoa(be)
- rwCoreNodesPrev[k][0].connection = "vcore"+strconv.Itoa(be)+strconv.Itoa(1)
- rwCoreNodesPrev[k][1].backend = "vcore"+strconv.Itoa(be%3+1)
- rwCoreNodesPrev[k][1].connection = "vcore"+strconv.Itoa(be%3+1)+strconv.Itoa(2)
- be++
- }
- }
-
- log.Debugf("Backend Allocation: %v",rwCoreNodesPrev)
- // Compare the current node IPs with the previous node IPs and if they differ
- // then set the new one and send the command to configure the router with the
- // new backend connection.
- for k,v := range rwCoreNodesPrev {
- if rwCoreNodes[k][0].ipAddr != rwCoreNodesPrev[k][0].ipAddr {
- log.Debugf("Configuring backend %s : connection %s\n\n", v[0].backend, v[0].connection)
- cnf := &pb.Conn{Server:"grpc_command",Cluster:"vcore",Backend:rwCoreNodesPrev[k][0].backend,
- Connection:rwCoreNodesPrev[k][0].connection,Addr:rwCoreNodes[k][0].ipAddr,
- Port:50057}
- if res, err := client.SetConnection(context.Background(), cnf); err != nil {
- log.Debugf("failed SetConnection RPC call: %s", err)
- } else {
- log.Debugf("Result: %v", res)
- rwCoreNodesPrev[k][0].ipAddr = rwCoreNodes[k][0].ipAddr
- }
- }
- if rwCoreNodes[k][1].ipAddr != rwCoreNodesPrev[k][1].ipAddr {
- log.Debugf("Configuring backend %s : connection %s\n\n", v[1].backend, v[1].connection)
- cnf := &pb.Conn{Server:"grpc_command",Cluster:"vcore",Backend:rwCoreNodesPrev[k][1].backend,
- Connection:rwCoreNodesPrev[k][1].connection,Addr:rwCoreNodes[k][1].ipAddr,
- Port:50057}
- if res, err := client.SetConnection(context.Background(), cnf); err != nil {
- log.Debugf("failed SetConnection RPC call: %s", err)
- } else {
- log.Debugf("Result: %v", res)
- rwCoreNodesPrev[k][1].ipAddr = rwCoreNodes[k][1].ipAddr
- }
- }
- }
-
-
- fmt.Printf("The structure for setting the connections is: %v\n", rwCoreNodesPrev)
- firstTime = false
-
- // Now make the API calls
- time.Sleep(10 * time.Second)
- }
- conn.Close()
-
}