Removal of exports that aren't needed (constructors such as NewServer,
NewRouter, and NewBackendCluster are now package-private), general
cleanup of commented-out code in arouterd, and other minor changes.
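
As background (an illustrative sketch, not part of this change): in Go an
identifier is exported only when its name begins with an upper-case letter,
so renaming a constructor such as NewServer to newServer is all that is
needed to make it private to the afrouter package. The package, type, and
function names below are hypothetical.

    package widget

    import "errors"

    type widget struct {
        name string
    }

    // newWidget begins with a lower-case letter, so it is visible only
    // within the widget package; other packages cannot call it.
    func newWidget(name string) (*widget, error) {
        if name == "" {
            return nil, errors.New("name must not be empty")
        }
        return &widget{name: name}, nil
    }
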
Change-Id: Icb29cdc527d4c01e3a5d4d3d6de2e074745d0f33
diff --git a/afrouter/afrouter/affinity-router.go b/afrouter/afrouter/affinity-router.go
index 2b5a640..443a55e 100644
--- a/afrouter/afrouter/affinity-router.go
+++ b/afrouter/afrouter/affinity-router.go
@@ -42,7 +42,7 @@
curBknd **backend
}
-func NewAffinityRouter(rconf *RouterConfig, config *RouteConfig) (Router,error) {
+func newAffinityRouter(rconf *RouterConfig, config *RouteConfig) (Router,error) {
var err error = nil
var rtrn_err bool = false
// Validate the configuration
@@ -181,7 +181,7 @@
// Create the backend cluster or link to an existing one
ok := true
if dr.bkndClstr, ok = bClusters[config.backendCluster.Name]; ok == false {
- if dr.bkndClstr, err = NewBackendCluster(config.backendCluster); err != nil {
+ if dr.bkndClstr, err = newBackendCluster(config.backendCluster); err != nil {
log.Errorf("Could not create a backend for router %s", config.Name)
rtrn_err = true
}
diff --git a/afrouter/afrouter/api.go b/afrouter/afrouter/api.go
index dd1506f..8235525 100644
--- a/afrouter/afrouter/api.go
+++ b/afrouter/afrouter/api.go
@@ -38,7 +38,7 @@
ar *ArouterProxy
}
-func NewApi(config *ApiConfig, ar *ArouterProxy) (*ArouterApi, error) {
+func newApi(config *ApiConfig, ar *ArouterProxy) (*ArouterApi, error) {
var rtrn_err bool
// Create a separate server and listener for the API
// Validate the ip address if one is provided
diff --git a/afrouter/afrouter/arproxy.go b/afrouter/afrouter/arproxy.go
index b2fc190..d809fbb 100644
--- a/afrouter/afrouter/arproxy.go
+++ b/afrouter/afrouter/arproxy.go
@@ -49,7 +49,7 @@
arProxy = &ArouterProxy{servers:make(map[string]*server)}
// Create all the servers listed in the configuration
for _,s := range conf.Servers {
- if ns, err := NewServer(&s); err != nil {
+ if ns, err := newServer(&s); err != nil {
log.Error("Configuration failed")
return nil, err
} else {
@@ -59,7 +59,7 @@
// TODO: The API is not mandatory, check if it's even in the config before
// trying to create it. If it isn't then don't bother but log a warning.
- if api,err := NewApi(&conf.Api, arProxy); err != nil {
+ if api,err := newApi(&conf.Api, arProxy); err != nil {
return nil, err
} else {
arProxy.api = api
diff --git a/afrouter/afrouter/backend.go b/afrouter/afrouter/backend.go
index 6c97bb6..9262d43 100644
--- a/afrouter/afrouter/backend.go
+++ b/afrouter/afrouter/backend.go
@@ -121,7 +121,7 @@
//TODO: Move the backend type (active/active etc) to the cluster
// level. All backends should really be of the same type.
// Create a new backend cluster
-func NewBackendCluster(conf *BackendClusterConfig) (*backendCluster, error) {
+func newBackendCluster(conf *BackendClusterConfig) (*backendCluster, error) {
var err error = nil
var rtrn_err bool = false
var be *backend
diff --git a/afrouter/afrouter/binding-router.go b/afrouter/afrouter/binding-router.go
index 45e9a27..a87481b 100644
--- a/afrouter/afrouter/binding-router.go
+++ b/afrouter/afrouter/binding-router.go
@@ -121,7 +121,7 @@
return nil
}
-func NewBindingRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
+func newBindingRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
var rtrn_err bool = false
var err error = nil
log.Debugf("Creating binding router %s",config.Name)
@@ -213,7 +213,7 @@
// Create the backend cluster or link to an existing one
ok := true
if br.bkndClstr, ok = bClusters[config.backendCluster.Name]; ok == false {
- if br.bkndClstr, err = NewBackendCluster(config.backendCluster); err != nil {
+ if br.bkndClstr, err = newBackendCluster(config.backendCluster); err != nil {
log.Errorf("Could not create a backend for router %s", config.Name)
rtrn_err = true
}
diff --git a/afrouter/afrouter/codec.go b/afrouter/afrouter/codec.go
index 20bf354..6090bdc 100644
--- a/afrouter/afrouter/codec.go
+++ b/afrouter/afrouter/codec.go
@@ -29,11 +29,11 @@
return CodecWithParent(&protoCodec{})
}
-func CodecWithParent(fallback grpc.Codec) grpc.Codec {
- return &rawCodec{fallback}
+func CodecWithParent(parent grpc.Codec) grpc.Codec {
+ return &transparentRoutingCodec{parent}
}
-type rawCodec struct {
+type transparentRoutingCodec struct {
parentCodec grpc.Codec
}
@@ -58,19 +58,19 @@
metaVal string
}
-func (c *rawCodec) Marshal(v interface{}) ([]byte, error) {
+func (cdc *transparentRoutingCodec) Marshal(v interface{}) ([]byte, error) {
switch t := v.(type) {
case *sbFrame:
return t.payload, nil
case *nbFrame:
return t.payload, nil
default:
- return c.parentCodec.Marshal(v)
+ return cdc.parentCodec.Marshal(v)
}
}
-func (c *rawCodec) Unmarshal(data []byte, v interface{}) error {
+func (cdc *transparentRoutingCodec) Unmarshal(data []byte, v interface{}) error {
switch t := v.(type) {
case *sbFrame:
t.payload = data
@@ -85,15 +85,15 @@
log.Debugf("Routing returned %v for method %s", t.be, t.mthdSlice[REQ_METHOD])
return nil
default:
- return c.parentCodec.Unmarshal(data,v)
+ return cdc.parentCodec.Unmarshal(data,v)
}
}
-func (c *rawCodec) String() string {
- return fmt.Sprintf("proxy>%s", c.parentCodec.String())
+func (cdc *transparentRoutingCodec) String() string {
+	return cdc.parentCodec.String()
}
-// protoCodec is a Codec implementation with protobuf. It is the default rawCodec for gRPC.
+// protoCodec is a Codec implementation with protobuf. It is the default Codec for gRPC.
type protoCodec struct{}
func (protoCodec) Marshal(v interface{}) ([]byte, error) {
@@ -105,5 +105,5 @@
}
func (protoCodec) String() string {
- return "proto"
+ return "protoCodec"
}
diff --git a/afrouter/afrouter/method-router.go b/afrouter/afrouter/method-router.go
index 8394c60..e4824da 100644
--- a/afrouter/afrouter/method-router.go
+++ b/afrouter/afrouter/method-router.go
@@ -41,7 +41,7 @@
}
for _,rtv := range config.Routes {
var idx1 string
- r,err := newRouter(config, &rtv)
+ r,err := newSubRouter(config, &rtv)
if err != nil {
return nil, err
}
diff --git a/afrouter/afrouter/round-robin-router.go b/afrouter/afrouter/round-robin-router.go
index 02df6ac..4e81985 100644
--- a/afrouter/afrouter/round-robin-router.go
+++ b/afrouter/afrouter/round-robin-router.go
@@ -32,7 +32,7 @@
curBknd **backend
}
-func NewRoundRobinRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
+func newRoundRobinRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
var err error = nil
var rtrn_err bool = false
// Validate the configuration
@@ -74,7 +74,7 @@
// Create the backend cluster or link to an existing one
ok := true
if rr.bkndClstr, ok = bClusters[config.backendCluster.Name]; ok == false {
- if rr.bkndClstr, err = NewBackendCluster(config.backendCluster); err != nil {
+ if rr.bkndClstr, err = newBackendCluster(config.backendCluster); err != nil {
log.Errorf("Could not create a backend for router %s", config.Name)
rtrn_err = true
}
diff --git a/afrouter/afrouter/router.go b/afrouter/afrouter/router.go
index 7b0576e..f1e729f 100644
--- a/afrouter/afrouter/router.go
+++ b/afrouter/afrouter/router.go
@@ -47,7 +47,7 @@
GetMetaKeyVal(serverStream grpc.ServerStream) (string,string,error)
}
-func NewRouter(config *RouterConfig) (Router, error) {
+func newRouter(config *RouterConfig) (Router, error) {
r,err := newMethodRouter(config)
if err == nil {
allRouters[r.Name()] = r
@@ -55,23 +55,23 @@
return r, err
}
-func newRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
+func newSubRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
idx := strIndex(rTypeNames, config.Type)
switch idx {
case RT_RPC_AFFINITY_MESSAGE:
- r,err := NewAffinityRouter(rconf, config)
+ r,err := newAffinityRouter(rconf, config)
if err == nil {
allRouters[rconf.Name+config.Name] = r
}
return r, err
case RT_BINDING:
- r,err := NewBindingRouter(rconf, config)
+ r,err := newBindingRouter(rconf, config)
if err == nil {
allRouters[rconf.Name+config.Name] = r
}
return r, err
case RT_ROUND_ROBIN:
- r,err := NewRoundRobinRouter(rconf, config)
+ r,err := newRoundRobinRouter(rconf, config)
if err == nil {
allRouters[rconf.Name+config.Name] = r
}
diff --git a/afrouter/afrouter/server.go b/afrouter/afrouter/server.go
index e2146ef..17c3b4f 100644
--- a/afrouter/afrouter/server.go
+++ b/afrouter/afrouter/server.go
@@ -63,7 +63,7 @@
var mthdSlicer *regexp.Regexp // The compiled regex to extract the package/service/method
-func NewServer(config *ServerConfig) (*server,error) {
+func newServer(config *ServerConfig) (*server,error) {
var err error = nil
var rtrn_err bool = false
var srvr *server
@@ -114,7 +114,7 @@
log.Debugf("Configuring the routers for server %s", srvr.name)
for p,r := range config.routers {
log.Debugf("Processing router %s for package %s", r.Name,p)
- if dr,err := NewRouter(r); err != nil {
+ if dr,err := newRouter(r); err != nil {
log.Error(err)
return nil, err
} else {
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index 63c29f6..bd2787a 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -17,21 +17,16 @@
package main
import (
- //"os"
- "fmt"
"time"
"regexp"
"errors"
"strconv"
- //"io/ioutil"
- //"encoding/json"
"k8s.io/client-go/rest"
"google.golang.org/grpc"
"golang.org/x/net/context"
"k8s.io/client-go/kubernetes"
"github.com/golang/protobuf/ptypes"
- //"k8s.io/apimachinery/pkg/api/errors"
"github.com/opencord/voltha-go/common/log"
kafka "github.com/opencord/voltha-go/kafka"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -215,19 +210,6 @@
return true
}
-//func groupEmptyCores(pods []*rwPod) [][]*rwPod {
-// return [][]*rwPod{}
-//}
-
-//func groupPods(pods []*rwPod) [][]*rwPod {
-
-// if allEmpty(pods) == true {
-// return groupEmptyCores(pods)
-// } else {
-// return groupPopulatedCores(pods)
-// }
-//}
-
func rmPod(pods []*rwPod, idx int) []*rwPod {
return append(pods[:idx],pods[idx+1:]...)
}
@@ -284,57 +266,6 @@
return rtrn,out
}
-func groupIntersectingPods(pd []*podTrack) ([][]*rwPod,[]*podTrack) {
- var rtrn [][]*rwPod
-
- for k,_ := range pd {
- if pd[k].dn == true { // Already processed?
- //log.Debugf("%s already processed", pd[k].pod.name)
- continue
- }
- if len(pd[k].pod.devIds) == 0 { // Ignore pods with no devices
- ////log.Debugf("%s empty pod", pd[k].pod.name)
- continue
- }
- // Start a pod group with this pod
- var grp []*rwPod
- grp = append(grp, pd[k].pod)
- pd[k].dn = true
- //log.Debugf("Creating new group %s", pd[k].pod.name)
- // Find the peer pod based on device overlap
- // It's ok if one isn't found, an empty one will be used instead
- for k1,_ := range pd {
- if pd[k1].dn == true { // Skip over eliminated pods
- //log.Debugf("%s eliminated pod", pd[k1].pod.name)
- continue
- }
- if len(pd[k1].pod.devIds) == 0 { // Skip pods with no devices
- //log.Debugf("%s empty pod", pd[k1].pod.name)
- continue
- }
- if intersect(pd[k].pod.devIds, pd[k1].pod.devIds) == true {
- //log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
- if pd[k].pod.node == pd[k1].pod.node {
- // This should never happen
- log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
- pd[k].pod.name, pd[k1].pod.name)
- continue
- }
- pd[k1].dn = true
- grp = append(grp, pd[k1].pod)
- break
- }
- }
- rtrn = append(rtrn, grp)
- //log.Debugf("Added group %s", grp[0].name)
- // Check if the number of groups = half the pods, if so all groups are started.
- if len(rtrn) == len(pd) >> 1 {
- break
- }
- }
- return rtrn,pd
-}
-
func unallocPodCount(pd []*podTrack) int {
var rtrn int = 0
for _,v := range pd {
@@ -376,23 +307,6 @@
return grps, pods
}
-func startRemainingGroups(grps [][]*rwPod, pd []*podTrack) ([][]*rwPod, []*podTrack) {
- var grp []*rwPod
-
- for k,_ := range pd {
- if sameNode(pd[k].pod, grps) == true {
- continue
- }
- grp = append(grp, pd[k].pod)
- grps = append(grps, grp)
- pd[k].dn = true
- if len(grps) == len(pd) >> 1 {
- break
- }
- }
- return grps, pd
-}
-
func hasSingleSecondNode(grp []*rwPod) bool {
var srvrs map[string]struct{} = make(map[string]struct{})
for k,_ := range grp {
@@ -488,67 +402,6 @@
return grps
}
-func groupRemainingPods(grps [][]*rwPod, pd []*podTrack) [][]*rwPod{
- var lgrps [][]*rwPod
- // All groups must be started when this function is called.
- // Copy incomplete groups
- for k,_ := range grps {
- if len(grps[k]) != 2 {
- lgrps = append(lgrps, grps[k])
- }
- }
-
- // Add all pairing candidates to each started group.
- for k,_ := range pd {
- if pd[k].dn == true {
- continue
- }
- for k2,_ := range lgrps {
- if lgrps[k2][0].node != pd[k].pod.node {
- lgrps[k2] = append(lgrps[k2], pd[k].pod)
- }
- }
- }
-
- //TODO: If any member of lgrps doesn't have at least 2
- // nodes something is wrong. Check for that here
-
- for {
- for { // Address groups with only a single server choice
- var ssn bool = false
-
- for k,_ := range lgrps {
- // Now if any of the groups only have a single
- // node as the choice for the second member
- // address that one first.
- if hasSingleSecondNode(lgrps[k]) == true {
- ssn = true
- // Add this pairing to the groups
- grps = addNode(grps, lgrps[k][0], lgrps[k][1])
- // Since this node is now used, remove it from all
- // remaining tenative groups
- lgrps = removeNode(lgrps, lgrps[k][1])
- // Now remove this group completely since
- // it's been addressed
- lgrps = append(lgrps[:k],lgrps[k+1:]...)
- break
- }
- }
- if ssn == false {
- break
- }
- }
- // Now adress one of the remaining groups
- if len(lgrps) == 0 {
- break // Nothing left to do, exit the loop
- }
- grps = addNode(grps, lgrps[0][0], lgrps[0][1])
- lgrps = removeNode(lgrps, lgrps[0][1])
- lgrps = append(lgrps[:0],lgrps[1:]...)
- }
- return grps
-}
-
func groupPods1(pods []*rwPod) [][]*rwPod {
var rtrn [][]*rwPod
var podCt int = len(pods)
@@ -573,103 +426,6 @@
}
}
-func groupPods(pods []*rwPod) [][]*rwPod {
- var rtrn [][]*rwPod
- var pd []*podTrack
-
- // Tracking of the grouping process
- for k,_ := range pods {
- pd = append(pd, &podTrack{pods[k],false})
- }
-
-
- rtrn,pd = groupIntersectingPods(pd)
- // There are several outcomes here
- // 1) All pods have been paired and we're done
- // 2) Some un-allocated pods remain
- // 2.a) All groups have been started
- // 2.b) Not all groups have been started
- if unallocPodCount(pd) == 0 {
- return rtrn
- } else if len(rtrn) == len(pd) >> 1 { // All groupings started
- // Allocate the remaining (presumably empty) pods to the started groups
- return groupRemainingPods(rtrn, pd)
- } else { // Some groupings started
- // Start empty groups with remaining pods
- // each grouping is on a different server then
- // allocate remaining pods.
- rtrn, pd = startRemainingGroups(rtrn, pd)
- return groupRemainingPods(rtrn, pd)
- }
-
-
- // Establish groupings of non-empty pods that have overlapping devices.
- for k,_ := range pd {
- if pd[k].dn == true { // Already processed?
- //log.Debugf("%s already processed", pd[k].pod.name)
- continue
- }
- if len(pd[k].pod.devIds) == 0 { // Ignore pods with no devices
- ////log.Debugf("%s empty pod", pd[k].pod.name)
- continue
- }
- // Start a pod group with this pod
- var grp []*rwPod
- grp = append(grp, pd[k].pod)
- pd[k].dn = true
- //log.Debugf("Creating new group %s", pd[k].pod.name)
- // Find the peer pod based on device overlap
- // It's ok if one isn't found, an empty one will be used instead
- for k1,_ := range pd {
- if pd[k1].dn == true { // Skip over eliminated pods
- //log.Debugf("%s eliminated pod", pd[k1].pod.name)
- continue
- }
- if len(pd[k1].pod.devIds) == 0 { // Skip pods with no devices
- //log.Debugf("%s empty pod", pd[k1].pod.name)
- continue
- }
- if intersect(pd[k].pod.devIds, pd[k1].pod.devIds) == true {
- //log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
- pd[k1].dn = true
- grp = append(grp, pd[k1].pod)
- break
- }
- }
- rtrn = append(rtrn, grp)
- //log.Debugf("Added group %s", grp[0].name)
- }
- // Now find any grouping without 2 members and assign one of the
- // pods with no devices and on a different server to it.
- // If there are no pods with no devices left before all
- // groups are filled report an exception but leave one of the
- // groups with only one pod.
- for k,_ := range rtrn {
- if len(rtrn[k]) < 2 {
- for k2,_ := range pd {
- if pd[k2].dn == true {
- continue
- }
- // There should be only empty pods here
- if len(pd[k2].pod.devIds) != 0 {
- log.Error("Non empty pod found where empty pod was expected")
- continue
- }
- if pd[k2].pod.node == rtrn[k][0].node {
- //log.Error("Pods aren't on different servers, continuing")
- continue
- }
- // Add this empty and unused pod to the group
- //log.Debugf("Adding empty pod %s", pd[k2].pod.name)
- rtrn[k] = append(rtrn[k], pd[k2].pod)
- pd[k2].dn = true
- break
- }
- }
- }
- return rtrn
-}
-
func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
for k,_ := range d1 {
if _,ok := d2[k]; ok == true {
@@ -864,6 +620,21 @@
}
}
+func updateDeviceIds(coreGroups [][]*rwPod, rwPods []*rwPod) {
+ var byName map[string]*rwPod = make(map[string]*rwPod)
+
+	// Convenience map of pods indexed by name
+ for _,v := range rwPods {
+ byName[v.name] = v
+ }
+
+ for k1,v1 := range coreGroups {
+ for k2,_ := range v1 {
+ coreGroups[k1][k2].devIds = byName[v1[k2].name].devIds
+ }
+ }
+}
+
func startCoreMonitor(client pb.ConfigurationClient,
clientset *kubernetes.Clientset,
coreFltr *regexp.Regexp,
@@ -888,6 +659,7 @@
// Get the rw core list from k8s
rwPods := getRwPods(clientset, coreFltr)
queryDeviceIds(rwPods)
+ updateDeviceIds(coreGroups, rwPods)
// If we didn't get 2n+1 pods then wait since
// something is down and will hopefully come
// back up at some point.
@@ -923,8 +695,6 @@
// This is currently hard coded to a cluster with 3 servers
//var connections map[string]configConn = make(map[string]configConn)
//var rwCorePodsPrev map[string]rwPod = make(map[string]rwPod)
- var rwCoreNodesPrev map[string][]rwPod = make(map[string][]rwPod)
- var firstTime bool = true
var err error
var conn *grpc.ClientConn
@@ -942,6 +712,7 @@
// Connect to the affinity router and set up the client
conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
if err != nil {
panic(err.Error())
}
+	defer conn.Close() // defer only after the error check so a nil conn is never closed
@@ -960,7 +731,6 @@
coreGroups := groupPods1(rwPods)
-
// Assign the groupings to the backends and connections
for k,_ := range coreGroups {
for k1,_ := range coreGroups[k] {
@@ -968,7 +738,7 @@
coreGroups[k][k1].connection = "vcore"+strconv.Itoa(k+1)+strconv.Itoa(k1+1)
}
}
- log.Debug("Core gouping completed")
+ log.Info("Core gouping completed")
// TODO: Debugging code, comment out for production
for k,v := range coreGroups {
@@ -976,146 +746,24 @@
log.Debugf("Core group %d,%d: %v", k, k2, v2)
}
}
- log.Debug("Setting affinities")
+ log.Info("Setting affinities")
// Now set the affinities for existing devices in the cores
for _,v := range coreGroups {
setAffinity(client, v[0].devIds, v[0].backend)
setAffinity(client, v[1].devIds, v[1].backend)
}
- log.Debug("Setting connections")
+ log.Info("Setting connections")
// Configure the backends based on the calculated core groups
for _,v := range coreGroups {
setConnection(client, v[0].backend, v[0].connection, v[0].ipAddr, 50057)
setConnection(client, v[1].backend, v[1].connection, v[1].ipAddr, 50057)
}
- log.Debug("Starting discovery monitoring")
+ log.Info("Starting discovery monitoring")
startDiscoveryMonitor(client, coreGroups)
- log.Debugf("Starting core monitoring")
+ log.Info("Starting core monitoring")
startCoreMonitor(client, clientset, coreFltr, coreGroups) // Never returns
return
-
-
- // The main loop needs to do the following:
- // 1) Periodically query the pods and filter out
- // the vcore ones
- // 2) Validate that the pods running are the same
- // as the previous check
- // 3) Validate that the IP addresses are the same
- // as the last check.
- // If the pod name(s) ha(s/ve) changed then remove
- // the unused pod names and add in the new pod names
- // maintaining the cluster/backend information.
- // If an IP address has changed (which shouldn't
- // happen unless a pod is re-started) it should get
- // caught by the pod name change.
- for {
- var rwCorePods map[string]rwPod = make(map[string]rwPod)
- var rwCoreNodes map[string][]rwPod = make(map[string][]rwPod)
- pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
- if err != nil {
- panic(err.Error())
- }
- log.Debugf("There are %d pods in the cluster\n", len(pods.Items))
-
- /*
- for k,v := range pods.Items {
- if v.Namespace == "voltha" && coreFltr.MatchString(v.Name) {
- fmt.Printf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
- v.Status.PodIP, v.Spec.NodeName)
- //fmt.Printf("Pod %v,%v\n\n\n",k,v)
- _ = k
- // Add this pod to the core structure.
- if firstTime == true {
- rwCorePodsPrev[v.Name] = rwPod{name:v.Name,node:v.Spec.NodeName}
- rwCoreNodesPrev[v.Spec.NodeName] =
- append(rwCoreNodesPrev[v.Spec.NodeName], rwPod{name:v.Name,node:v.Spec.NodeName})
- }
- rwCorePods[v.Name] = rwPod{v.Name,v.Status.PodIP,v.Spec.NodeName, "", ""}
- rwCoreNodes[v.Spec.NodeName] =
- append(rwCoreNodes[v.Spec.NodeName], rwPod{v.Name,v.Status.PodIP,v.Spec.NodeName,"",""})
- }
- }
- */
-
- if len(rwCorePods) != 6 {
- continue
- }
-
- //fmt.Printf("Pod map: %v\n", rwCorePods)
- //fmt.Printf("Pod map2: %v\n", rwCoreNodes)
-
- // Examples for error handling:
- // - Use helper functions like e.g. errors.IsNotFound()
- // - And/or cast to StatusError and use its properties like e.g. ErrStatus.Message
- /*
- _, err = clientset.CoreV1().Pods("default").Get("example-xxxxx", metav1.GetOptions{})
- if errors.IsNotFound(err) {
- fmt.Printf("Pod not found\n")
- } else if statusError, isStatus := err.(*errors.StatusError); isStatus {
- fmt.Printf("Error getting pod %v\n", statusError.ErrStatus.Message)
- } else if err != nil {
- panic(err.Error())
- } else {
- fmt.Printf("Found pod\n")
- }
- */
- // Set the association to backends and connections only once.
- // TODO: This needs to be reworked for when a pod crashes
- // and it's name changes.
- if firstTime == true {
- be := 1
- for k,_ := range rwCoreNodesPrev { // Each node has 2 cores running on it
- // Use a pretty dumb distribution algorithm.
- log.Debugf("Processing core node %s:%d\n", k,be)
- rwCoreNodesPrev[k][0].backend = "vcore"+strconv.Itoa(be)
- rwCoreNodesPrev[k][0].connection = "vcore"+strconv.Itoa(be)+strconv.Itoa(1)
- rwCoreNodesPrev[k][1].backend = "vcore"+strconv.Itoa(be%3+1)
- rwCoreNodesPrev[k][1].connection = "vcore"+strconv.Itoa(be%3+1)+strconv.Itoa(2)
- be++
- }
- }
-
- log.Debugf("Backend Allocation: %v",rwCoreNodesPrev)
- // Compare the current node IPs with the previous node IPs and if they differ
- // then set the new one and send the command to configure the router with the
- // new backend connection.
- for k,v := range rwCoreNodesPrev {
- if rwCoreNodes[k][0].ipAddr != rwCoreNodesPrev[k][0].ipAddr {
- log.Debugf("Configuring backend %s : connection %s\n\n", v[0].backend, v[0].connection)
- cnf := &pb.Conn{Server:"grpc_command",Cluster:"vcore",Backend:rwCoreNodesPrev[k][0].backend,
- Connection:rwCoreNodesPrev[k][0].connection,Addr:rwCoreNodes[k][0].ipAddr,
- Port:50057}
- if res, err := client.SetConnection(context.Background(), cnf); err != nil {
- log.Debugf("failed SetConnection RPC call: %s", err)
- } else {
- log.Debugf("Result: %v", res)
- rwCoreNodesPrev[k][0].ipAddr = rwCoreNodes[k][0].ipAddr
- }
- }
- if rwCoreNodes[k][1].ipAddr != rwCoreNodesPrev[k][1].ipAddr {
- log.Debugf("Configuring backend %s : connection %s\n\n", v[1].backend, v[1].connection)
- cnf := &pb.Conn{Server:"grpc_command",Cluster:"vcore",Backend:rwCoreNodesPrev[k][1].backend,
- Connection:rwCoreNodesPrev[k][1].connection,Addr:rwCoreNodes[k][1].ipAddr,
- Port:50057}
- if res, err := client.SetConnection(context.Background(), cnf); err != nil {
- log.Debugf("failed SetConnection RPC call: %s", err)
- } else {
- log.Debugf("Result: %v", res)
- rwCoreNodesPrev[k][1].ipAddr = rwCoreNodes[k][1].ipAddr
- }
- }
- }
-
-
- fmt.Printf("The structure for setting the connections is: %v\n", rwCoreNodesPrev)
- firstTime = false
-
- // Now make the API calls
- time.Sleep(10 * time.Second)
- }
- conn.Close()
-
}