[VOL-782,VOL-783,VOL-787]
Initial commit of the affinity router control plane
for voltha.
Change-Id: Ic2b5b52693d337e8107cfebfe6b92317d3c6d4f5
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
new file mode 100644
index 0000000..0ea0b49
--- /dev/null
+++ b/arouterd/arouterd.go
@@ -0,0 +1,947 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"errors"
+	"regexp"
+	"strconv"
+	"time"
+
+	"github.com/golang/protobuf/ptypes"
+	empty "github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/voltha-go/common/log"
+	kafka "github.com/opencord/voltha-go/kafka"
+	pb "github.com/opencord/voltha-go/protos/afrouter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
+	vpb "github.com/opencord/voltha-go/protos/voltha"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+)
+
+type configConn struct {
+ Server string `json:"Server"`
+ Cluster string `json:"Cluster"`
+ Backend string `json:"Backend"`
+ connections map[string]connection
+}
+
+type connection struct {
+ Name string `json:"Connection"`
+ Addr string `json:"Addr"`
+ Port uint64 `json:"Port"`
+}
+
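+// rwPod tracks a single voltha rw-core pod along with the devices it
+// currently owns and the router backend/connection it has been assigned.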
+type rwPod struct {
+ name string
+ ipAddr string
+ node string
+ devIds map[string]struct{}
+ backend string
+ connection string
+}
+
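+// podTrack wraps an rwPod with a "done" flag used while grouping pods.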
+type podTrack struct {
+ pod *rwPod
+ dn bool
+}
+
+// newKafkaClient returns a kafka client of the requested type; only the
+// "sarama" client is currently supported. Discovery events arrive on the
+// affinityRouter topic over the standard kafka port (9092).
+func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
+	log.Infow("kafka-client-type", log.Fields{"client": clientType})
+ switch clientType {
+ case "sarama":
+ return kafka.NewSaramaClient(
+ kafka.Host(host),
+ kafka.Port(port),
+ kafka.ConsumerType(kafka.GroupCustomer),
+ kafka.ProducerReturnOnErrors(true),
+ kafka.ProducerReturnOnSuccess(true),
+ kafka.ProducerMaxRetries(6),
+ kafka.NumPartitions(3),
+ kafka.ConsumerGroupName(instanceID),
+ kafka.ConsumerGroupPrefix(instanceID),
+ kafka.AutoCreateTopic(false),
+ kafka.ProducerFlushFrequency(5),
+ kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
+ }
+ return nil, errors.New("unsupported-client-type")
+}
+
+
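+// k8sClientSet creates an in-cluster kubernetes clientset. The daemon runs
+// as a pod inside the cluster, so the in-cluster config is always available.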
+func k8sClientSet() *kubernetes.Clientset {
+ // creates the in-cluster config
+ config, err := rest.InClusterConfig()
+ if err != nil {
+ panic(err.Error())
+ }
+ // creates the clientset
+ clientset, err := kubernetes.NewForConfig(config)
+ if err != nil {
+ panic(err.Error())
+ }
+
+ return clientset
+}
+
+
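+// connect dials the given gRPC endpoint, retrying up to 100 times at
+// 10 second intervals before giving up.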
+func connect(addr string) (*grpc.ClientConn, error) {
+	for ctr := 0; ctr < 100; ctr++ {
+		log.Debugf("Trying to connect to %s", addr)
+		conn, err := grpc.Dial(addr, grpc.WithInsecure())
+		if err != nil {
+			log.Debugf("Attempt to connect failed, retrying: %v", err)
+		} else {
+			log.Debugf("Connection succeeded")
+			return conn, nil
+		}
+		time.Sleep(10 * time.Second)
+	}
+	log.Debugf("Too many connection attempts, giving up!")
+	return nil, errors.New("timeout attempting to connect")
+}
+
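+// getRwPods returns the pods in the voltha namespace whose names match the
+// rw-core filter.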
+func getRwPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) []*rwPod {
+	var rtrn []*rwPod
+
+	pods, err := cs.CoreV1().Pods("").List(metav1.ListOptions{})
+	if err != nil {
+		panic(err.Error())
+	}
+	log.Debugf("There are a total of %d pods in the cluster", len(pods.Items))
+
+	for _, v := range pods.Items {
+		if v.Namespace == "voltha" && coreFilter.MatchString(v.Name) {
+			log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s", v.Namespace, v.Name,
+				v.Status.PodIP, v.Spec.NodeName)
+			// Add this pod to the core structure.
+			rtrn = append(rtrn, &rwPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName,
+				devIds: make(map[string]struct{}), backend: "", connection: ""})
+		}
+	}
+	return rtrn
+}
+
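+// queryDevices asks each running core for the devices it currently owns and
+// records the device ids in the pod's devIds set.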
+func queryDevices(pods []*rwPod) {
+	for _, pv := range pods {
+		// Open a connection to the pod's gRPC service (port 50057)
+		conn, err := connect(pv.ipAddr + ":50057")
+		if err != nil {
+			log.Debugf("Could not query devices from %s, could not connect", pv.name)
+			continue
+		}
+		client := vpb.NewVolthaServiceClient(conn)
+		devs, err := client.ListDevices(context.Background(), &empty.Empty{})
+		if err != nil {
+			log.Error(err)
+			conn.Close()
+			continue
+		}
+		for _, dv := range devs.Items {
+			pv.devIds[dv.Id] = struct{}{}
+		}
+		conn.Close()
+	}
+}
+
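+// allEmpty returns true if none of the pods owns any devices.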
+func allEmpty(pods []*rwPod) bool {
+	for _, pod := range pods {
+		if len(pod.devIds) != 0 {
+			return false
+		}
+	}
+	return true
+}
+
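+// rmPod removes the pod at the given index. Note that this reuses, and
+// therefore modifies, the backing array of the slice that is passed in.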
+func rmPod(pods []*rwPod, idx int) []*rwPod {
+ return append(pods[:idx],pods[idx+1:]...)
+}
+
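+// groupIntersectingPods1 pairs pods whose device sets overlap, never pairing
+// two pods that run on the same node. It returns the groups that were
+// started along with the pods that remain unpaired.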
+func groupIntersectingPods1(pods []*rwPod, podCt int) ([][]*rwPod, []*rwPod) {
+	var rtrn [][]*rwPod
+	var out []*rwPod
+
+	for {
+		if len(pods) == 0 {
+			break
+		}
+		if len(pods[0].devIds) == 0 { // Ignore pods with no devices
+			out = append(out, pods[0])
+			pods = rmPod(pods, 0)
+			continue
+		}
+		// Start a pod group with this pod
+		var grp []*rwPod
+		grp = append(grp, pods[0])
+		pods = rmPod(pods, 0)
+		// Find the peer pod based on device overlap.
+		// It's ok if one isn't found; an empty one will be used instead.
+		for k := range pods {
+			if len(pods[k].devIds) == 0 { // Skip pods with no devices
+				continue
+			}
+			if intersect(grp[0].devIds, pods[k].devIds) {
+				if grp[0].node == pods[k].node {
+					// This should never happen
+					log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
+						grp[0].name, pods[k].name)
+					continue
+				}
+				grp = append(grp, pods[k])
+				pods = rmPod(pods, k)
+				break
+			}
+		}
+		rtrn = append(rtrn, grp)
+		// Check if the number of groups equals half the pods; if so all
+		// groups have been started.
+		if len(rtrn) == podCt>>1 {
+			// Append any remaining pods to out
+			out = append(out, pods[0:]...)
+			break
+		}
+	}
+	return rtrn, out
+}
+
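+// groupIntersectingPods is the podTrack based variant of
+// groupIntersectingPods1; it pairs pods whose device sets overlap.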
+func groupIntersectingPods(pd []*podTrack) ([][]*rwPod, []*podTrack) {
+	var rtrn [][]*rwPod
+
+	for k := range pd {
+		if pd[k].dn { // Already processed?
+			continue
+		}
+		if len(pd[k].pod.devIds) == 0 { // Ignore pods with no devices
+			continue
+		}
+		// Start a pod group with this pod
+		var grp []*rwPod
+		grp = append(grp, pd[k].pod)
+		pd[k].dn = true
+		// Find the peer pod based on device overlap.
+		// It's ok if one isn't found; an empty one will be used instead.
+		for k1 := range pd {
+			if pd[k1].dn { // Skip over eliminated pods
+				continue
+			}
+			if len(pd[k1].pod.devIds) == 0 { // Skip pods with no devices
+				continue
+			}
+			if intersect(pd[k].pod.devIds, pd[k1].pod.devIds) {
+				if pd[k].pod.node == pd[k1].pod.node {
+					// This should never happen
+					log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
+						pd[k].pod.name, pd[k1].pod.name)
+					continue
+				}
+				pd[k1].dn = true
+				grp = append(grp, pd[k1].pod)
+				break
+			}
+		}
+		rtrn = append(rtrn, grp)
+		// Check if the number of groups equals half the pods; if so all
+		// groups have been started.
+		if len(rtrn) == len(pd)>>1 {
+			break
+		}
+	}
+	return rtrn, pd
+}
+
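+// unallocPodCount returns the number of pods that have not yet been
+// assigned to a group.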
+func unallocPodCount(pd []*podTrack) int {
+	rtrn := 0
+	for _, v := range pd {
+		if !v.dn {
+			rtrn++
+		}
+	}
+	return rtrn
+}
+
+
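+// sameNode returns true if the given pod runs on a node that already hosts
+// a member of one of the groups.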
+func sameNode(pod *rwPod, grps [][]*rwPod) bool {
+	for _, v := range grps {
+		if v[0].node == pod.node {
+			return true
+		}
+		if len(v) == 2 && v[1].node == pod.node {
+			return true
+		}
+	}
+	return false
+}
+
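+// startRemainingGroups1 starts a new group for each remaining pod whose
+// node does not already host a group, until half the pods lead a group.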
+func startRemainingGroups1(grps [][]*rwPod, pods []*rwPod, podCt int) ([][]*rwPod, []*rwPod) {
+	// Index manually rather than with range because pods shrinks as
+	// groups are started.
+	for k := 0; k < len(pods); {
+		if sameNode(pods[k], grps) {
+			k++
+			continue
+		}
+		grps = append(grps, []*rwPod{pods[k]})
+		pods = rmPod(pods, k)
+		if len(grps) == podCt>>1 {
+			break
+		}
+	}
+	return grps, pods
+}
+
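+// startRemainingGroups is the podTrack based variant of
+// startRemainingGroups1.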
+func startRemainingGroups(grps [][]*rwPod, pd []*podTrack) ([][]*rwPod, []*podTrack) {
+	for k := range pd {
+		if sameNode(pd[k].pod, grps) {
+			continue
+		}
+		grps = append(grps, []*rwPod{pd[k].pod})
+		pd[k].dn = true
+		if len(grps) == len(pd)>>1 {
+			break
+		}
+	}
+	return grps, pd
+}
+
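+// hasSingleSecondNode returns true if all of a group's candidate second
+// members reside on the same node, i.e. there is only one node to choose
+// the peer from.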
+func hasSingleSecondNode(grp []*rwPod) bool {
+	srvrs := make(map[string]struct{})
+	for k := range grp {
+		if k == 0 {
+			continue // Ignore the first item
+		}
+		srvrs[grp[k].node] = struct{}{}
+	}
+	return len(srvrs) == 1
+}
+
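+// addNode appends item to the group whose first member is idx.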
+func addNode(grps [][]*rwPod, idx *rwPod, item *rwPod) [][]*rwPod {
+	for k := range grps {
+		if grps[k][0].name == idx.name {
+			grps[k] = append(grps[k], item)
+			return grps
+		}
+	}
+	// TODO: Error checking required here.
+	return grps
+}
+
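+// removeNode removes item from every group that contains it.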
+func removeNode(grps [][]*rwPod, item *rwPod) [][]*rwPod {
+	for k := range grps {
+		for k1 := range grps[k] {
+			if grps[k][k1].name == item.name {
+				grps[k] = append(grps[k][:k1], grps[k][k1+1:]...)
+				break
+			}
+		}
+	}
+	return grps
+}
+
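+// groupRemainingPods1 assigns the remaining (empty) pods to the started
+// groups such that the two members of any group always run on different
+// nodes. Groups that are down to a single node choice are paired first.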
+func groupRemainingPods1(grps [][]*rwPod, pods []*rwPod) [][]*rwPod {
+	var lgrps [][]*rwPod
+	// All groups must be started when this function is called.
+	// Copy incomplete groups
+	for k := range grps {
+		if len(grps[k]) != 2 {
+			lgrps = append(lgrps, grps[k])
+		}
+	}
+
+	// Add all pairing candidates to each started group.
+	for k := range pods {
+		for k2 := range lgrps {
+			if lgrps[k2][0].node != pods[k].node {
+				lgrps[k2] = append(lgrps[k2], pods[k])
+			}
+		}
+	}
+
+	// TODO: If any member of lgrps doesn't have at least 2
+	// nodes something is wrong. Check for that here
+
+	for {
+		for { // Address groups with only a single node choice first
+			ssn := false
+			for k := range lgrps {
+				// If any of the groups has only a single node as the
+				// choice for its second member, address that one first.
+				if hasSingleSecondNode(lgrps[k]) {
+					ssn = true
+					// Add this pairing to the groups
+					grps = addNode(grps, lgrps[k][0], lgrps[k][1])
+					// Since this node is now used, remove it from all
+					// remaining tentative groups
+					lgrps = removeNode(lgrps, lgrps[k][1])
+					// Now remove this group completely since
+					// it's been addressed
+					lgrps = append(lgrps[:k], lgrps[k+1:]...)
+					break
+				}
+			}
+			if !ssn {
+				break
+			}
+		}
+		// Now address one of the remaining groups
+		if len(lgrps) == 0 {
+			break // Nothing left to do, exit the loop
+		}
+		grps = addNode(grps, lgrps[0][0], lgrps[0][1])
+		lgrps = removeNode(lgrps, lgrps[0][1])
+		lgrps = append(lgrps[:0], lgrps[1:]...)
+	}
+	return grps
+}
+
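+// groupRemainingPods is the podTrack based variant of groupRemainingPods1.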
+func groupRemainingPods(grps [][]*rwPod, pd []*podTrack) [][]*rwPod {
+	var lgrps [][]*rwPod
+	// All groups must be started when this function is called.
+	// Copy incomplete groups
+	for k := range grps {
+		if len(grps[k]) != 2 {
+			lgrps = append(lgrps, grps[k])
+		}
+	}
+
+	// Add all pairing candidates to each started group.
+	for k := range pd {
+		if pd[k].dn {
+			continue
+		}
+		for k2 := range lgrps {
+			if lgrps[k2][0].node != pd[k].pod.node {
+				lgrps[k2] = append(lgrps[k2], pd[k].pod)
+			}
+		}
+	}
+
+	// TODO: If any member of lgrps doesn't have at least 2
+	// nodes something is wrong. Check for that here
+
+	for {
+		for { // Address groups with only a single node choice first
+			ssn := false
+			for k := range lgrps {
+				// If any of the groups has only a single node as the
+				// choice for its second member, address that one first.
+				if hasSingleSecondNode(lgrps[k]) {
+					ssn = true
+					// Add this pairing to the groups
+					grps = addNode(grps, lgrps[k][0], lgrps[k][1])
+					// Since this node is now used, remove it from all
+					// remaining tentative groups
+					lgrps = removeNode(lgrps, lgrps[k][1])
+					// Now remove this group completely since
+					// it's been addressed
+					lgrps = append(lgrps[:k], lgrps[k+1:]...)
+					break
+				}
+			}
+			if !ssn {
+				break
+			}
+		}
+		// Now address one of the remaining groups
+		if len(lgrps) == 0 {
+			break // Nothing left to do, exit the loop
+		}
+		grps = addNode(grps, lgrps[0][0], lgrps[0][1])
+		lgrps = removeNode(lgrps, lgrps[0][1])
+		lgrps = append(lgrps[:0], lgrps[1:]...)
+	}
+	return grps
+}
+
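+// groupPods1 organizes the rw-core pods into pairs, one group per backend.
+// Pods that hold overlapping devices are paired first, then any remaining
+// pods are used to complete the groups.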
+func groupPods1(pods []*rwPod) [][]*rwPod {
+	var rtrn [][]*rwPod
+	podCt := len(pods)
+
+	rtrn, pods = groupIntersectingPods1(pods, podCt)
+	// There are several outcomes here
+	// 1) All pods have been paired and we're done
+	// 2) Some un-allocated pods remain
+	// 2.a) All groups have been started
+	// 2.b) Not all groups have been started
+	if len(pods) == 0 {
+		return rtrn
+	} else if len(rtrn) == podCt>>1 { // All groupings started
+		// Allocate the remaining (presumably empty) pods to the started groups
+		return groupRemainingPods1(rtrn, pods)
+	} else { // Some groupings started
+		// Start empty groups with remaining pods such that
+		// each grouping is on a different server, then
+		// allocate remaining pods.
+		rtrn, pods = startRemainingGroups1(rtrn, pods, podCt)
+		return groupRemainingPods1(rtrn, pods)
+	}
+}
+
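+// groupPods is the podTrack based variant of groupPods1.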
+func groupPods(pods []*rwPod) [][]*rwPod {
+	var rtrn [][]*rwPod
+	var pd []*podTrack
+
+	// Tracking of the grouping process
+	for k := range pods {
+		pd = append(pd, &podTrack{pods[k], false})
+	}
+
+	rtrn, pd = groupIntersectingPods(pd)
+	// There are several outcomes here
+	// 1) All pods have been paired and we're done
+	// 2) Some un-allocated pods remain
+	// 2.a) All groups have been started
+	// 2.b) Not all groups have been started
+	if unallocPodCount(pd) == 0 {
+		return rtrn
+	} else if len(rtrn) == len(pd)>>1 { // All groupings started
+		// Allocate the remaining (presumably empty) pods to the started groups
+		return groupRemainingPods(rtrn, pd)
+	} else { // Some groupings started
+		// Start empty groups with remaining pods such that
+		// each grouping is on a different server, then
+		// allocate remaining pods.
+		rtrn, pd = startRemainingGroups(rtrn, pd)
+		return groupRemainingPods(rtrn, pd)
+	}
+}
+
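+// intersect returns true if the two device id sets share at least one id.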
+func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
+	for k := range d1 {
+		if _, ok := d2[k]; ok {
+			return true
+		}
+	}
+	return false
+}
+
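+// setConnection tells the affinity router the address and port of a core so
+// the router can open a connection to it.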
+func setConnection(client pb.ConfigurationClient, backend string, connection string, addr string, port uint64) {
+	log.Debugf("Configuring backend %s : connection %s", backend, connection)
+	cnf := &pb.Conn{Server: "grpc_command", Cluster: "vcore", Backend: backend,
+		Connection: connection, Addr: addr,
+		Port: port}
+	if res, err := client.SetConnection(context.Background(), cnf); err != nil {
+		log.Debugf("failed SetConnection RPC call: %s", err)
+	} else {
+		log.Debugf("Result: %v", res)
+	}
+}
+
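+// setAffinity binds each of the given device ids to a backend in the
+// affinity router so requests for those devices are routed consistently.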
+func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
+	log.Debugf("Configuring backend %s : affinities", backend)
+	aff := &pb.Affinity{Router: "vcore", Route: "dev_manager", Cluster: "vcore", Backend: backend}
+	for k := range ids {
+		log.Debugf("Setting affinity for id %s", k)
+		aff.Id = k
+		if res, err := client.SetAffinity(context.Background(), aff); err != nil {
+			log.Debugf("failed affinity RPC call: %s", err)
+		} else {
+			log.Debugf("Result: %v", res)
+		}
+	}
+}
+
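+// monitorDiscovery processes device discovery notifications arriving on the
+// kafka channel for as long as the channel stays open.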
+func monitorDiscovery(client pb.ConfigurationClient,
+	ch <-chan *ic.InterContainerMessage) {
+	for msg := range ch {
+		log.Debugf("Received a device discovery notification")
+		requestBody := &ic.InterContainerRequestBody{}
+		if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
+			log.Errorf("Could not unmarshal received notification %v", msg)
+		} else {
+			// TODO: Do something with the message here
+		}
+	}
+}
+
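+// startDiscoveryMonitor subscribes to the "AffinityRouter" kafka topic and
+// launches a goroutine that reacts to device discovery events.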
+func startDiscoveryMonitor(client pb.ConfigurationClient) error {
+	var ch <-chan *ic.InterContainerMessage
+	// Connect to kafka for discovery events
+	topic := &kafka.Topic{Name: "AffinityRouter"}
+	kc, err := newKafkaClient("sarama", "kafka", 9092, "arouterd")
+	if err != nil {
+		log.Error("Could not create a kafka client, discovery disabled")
+		return err
+	}
+	kc.Start()
+
+	if ch, err = kc.Subscribe(topic); err != nil {
+		log.Error("Could not subscribe to the 'AffinityRouter' channel, discovery disabled")
+		return err
+	}
+	go monitorDiscovery(client, ch)
+	return nil
+}
+
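+// startCoreMonitor periodically polls kubernetes for the rw-core pods and
+// is intended to re-apply the router configuration when pod names or IP
+// addresses change; the reconfiguration step is still a TODO.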
+func startCoreMonitor(client pb.ConfigurationClient,
+ clientset *kubernetes.Clientset,
+ coreFltr *regexp.Regexp,
+ coreGroups [][]*rwPod) error {
+	// Now that the initial allocation has been completed, monitor the pods
+	// for IP changes.
+	// The main loop needs to do the following:
+	// 1) Periodically query the pods and filter out
+	//    the vcore ones
+	// 2) Validate that the pods running are the same
+	//    as the previous check
+	// 3) Validate that the IP addresses are the same
+	//    as the last check.
+	// If any pod names have changed then remove the
+	// unused pod names and add in the new pod names,
+	// maintaining the cluster/backend information.
+	// If an IP address has changed (which shouldn't
+	// happen unless a pod is re-started) it should get
+	// caught by the pod name change.
+	for {
+		time.Sleep(10 * time.Second) // Wait a while
+		// Get the rw core list from k8s
+		rwPods := getRwPods(clientset, coreFltr)
+		// If we didn't get the full set of pods then wait, since
+		// something is down and will hopefully come
+		// back up at some point.
+		// TODO: remove the 6 pod hardcoding
+		if len(rwPods) != 6 {
+			continue
+		}
+		// We have all the pods; check if any IP addresses
+		// have changed.
+		for _, v := range rwPods {
+			// TODO: Reconfigure the router when an address changes, e.g.:
+			//if hasIpAddr(coreGroups, v.ipAddr) == false {
+			//	log.Debug("Address has changed...")
+			//	applyAddrDiffs(coreGroups, rwPods)
+			//}
+			_ = v
+		}
+	}
+}
+
+func main() {
+	// This is currently hard coded to a cluster with 3 servers
+	var err error
+	var conn *grpc.ClientConn
+
+ // Set up the regular expression to identify the voltha cores
+ coreFltr := regexp.MustCompile(`rw-core[0-9]-`)
+
+ // Set up logging
+ if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+
+ // Set up kubernetes api
+ clientset := k8sClientSet()
+
+ // Connect to the affinity router and set up the client
+ conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
+ if err != nil {
+ panic(err.Error())
+ }
+ client := pb.NewConfigurationClient(conn)
+
+	// Get the voltha rw-core pods
+ rwPods := getRwPods(clientset, coreFltr)
+
+ // Fetch the devices held by each running core
+ queryDevices(rwPods)
+
+	// For debugging... comment out later
+	for _, v := range rwPods {
+		log.Debugf("Pod list %v", *v)
+	}
+
+ coreGroups := groupPods1(rwPods)
+
+
+	// Assign the groupings to the backends and connections
+	for k := range coreGroups {
+		for k1 := range coreGroups[k] {
+			coreGroups[k][k1].backend = "vcore" + strconv.Itoa(k+1)
+			coreGroups[k][k1].connection = "vcore" + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
+		}
+	}
+	log.Debug("Core grouping completed")
+
+	// TODO: Debugging code, comment out for production
+	for k, v := range coreGroups {
+		for k2, v2 := range v {
+			log.Debugf("Core group %d,%d: %v", k, k2, v2)
+		}
+	}
+	log.Debug("Setting affinities")
+	// Now set the affinities for existing devices in the cores
+	for _, v := range coreGroups {
+		setAffinity(client, v[0].devIds, v[0].backend)
+		setAffinity(client, v[1].devIds, v[1].backend)
+	}
+	log.Debug("Setting connections")
+	// Configure the backends based on the calculated core groups
+	for _, v := range coreGroups {
+		setConnection(client, v[0].backend, v[0].connection, v[0].ipAddr, 50057)
+		setConnection(client, v[1].backend, v[1].connection, v[1].ipAddr, 50057)
+	}
+
+ log.Debug("Starting discovery monitoring")
+ startDiscoveryMonitor(client)
+
+	log.Debug("Starting core monitoring")
+ startCoreMonitor(client, clientset, coreFltr, coreGroups) // Never returns
+}
+
diff --git a/k8s/affinity-router.yml b/k8s/affinity-router.yml
index 28c2623..a92e442 100644
--- a/k8s/affinity-router.yml
+++ b/k8s/affinity-router.yml
@@ -12,41 +12,45 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-apiVersion: v1
-kind: Pod
+apiVersion: apps/v1beta1
+kind: Deployment
metadata:
name: afrouter
namespace: voltha
- labels:
- app: afrouter
- annotations:
- cni: "calico"
spec:
- containers:
- - name: arouter
- image: volthacore/afrouter:testing
- imagePullPolicy: Always
- volumeMounts:
- - name: config-volume
- mountPath: /app/config
- ports:
- - containerPort: 55555
- command: ["/app/afrouter"]
- args: ["-config", "/app/config/arouter.voltha3.json"]
- - name: envoy
- image: volthacore/envoy
- volumeMounts:
- - name: config-volume
- mountPath: /envoy/config
- ports:
- - containerPort: 8192
- - containerPort: 50555
- - name: arouterd
- image: volthacore/afrouterd:testing
- command: ["/app/arouterd"]
- imagePullPolicy: Always
- restartPolicy: Never
- volumes:
- - name: config-volume
- configMap:
- name: afrouter-config
+ replicas: 1
+ template:
+ metadata:
+ labels:
+ app: afrouter
+ annotations:
+ cni: "calico"
+ spec:
+ containers:
+ - name: arouter
+ image: volthacore/afrouter:testing
+ imagePullPolicy: Always
+ volumeMounts:
+ - name: config-volume
+ mountPath: /app/config
+ ports:
+ - containerPort: 55555
+ command: ["/app/afrouter"]
+ args: ["-config", "/app/config/arouter.voltha3.json"]
+ - name: envoy
+ image: volthacore/envoy
+ volumeMounts:
+ - name: config-volume
+ mountPath: /envoy/config
+ ports:
+ - containerPort: 8192
+ - containerPort: 50555
+ - name: arouterd
+ image: volthacore/afrouterd:testing
+ command: ["/app/arouterd"]
+ imagePullPolicy: Always
+ restartPolicy: Always
+ volumes:
+ - name: config-volume
+ configMap:
+ name: afrouter-config