blob: 6a86c64dafbe0b011e2104459e2b88dd922c318b [file] [log] [blame]
Scott Bakere702d122019-10-22 11:54:12 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package afrouterd
18
19import (
20 "fmt"
21 "github.com/golang/protobuf/ptypes/empty"
22 "github.com/opencord/voltha-go/common/log"
23 pb "github.com/opencord/voltha-protos/go/afrouter"
24 cmn "github.com/opencord/voltha-protos/go/common"
25 vpb "github.com/opencord/voltha-protos/go/voltha"
26 "golang.org/x/net/context"
27 "k8s.io/api/core/v1"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/client-go/kubernetes"
30 "time"
31)
32
33func getVolthaPods(cs *kubernetes.Clientset) ([]*volthaPod, error) {
34 pods, err := cs.CoreV1().Pods(podNamespace).List(metav1.ListOptions{LabelSelector: podLabelSelector})
35 if err != nil {
36 return nil, err
37 }
38
39 var rwPods []*volthaPod
40items:
41 for _, v := range pods.Items {
42 // only pods that are actually running should be considered
43 if v.Status.Phase == v1.PodRunning {
44 for _, condition := range v.Status.Conditions {
45 if condition.Status != v1.ConditionTrue {
46 continue items
47 }
48 }
49
50 if group, have := v.Labels[podAffinityGroupLabel]; have {
51 log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name, v.Status.PodIP, v.Spec.NodeName)
52 rwPods = append(rwPods, &volthaPod{
53 name: v.Name,
54 ipAddr: v.Status.PodIP,
55 node: v.Spec.NodeName,
56 devIds: make(map[string]struct{}),
57 backend: afrouterRWClusterName + group,
58 })
59 } else {
60 log.Warnf("Pod %s found matching % without label %", v.Name, podLabelSelector, podAffinityGroupLabel)
61 }
62 }
63 }
64 return rwPods, nil
65}
66
67func reconcilePodDeviceIds(ctx context.Context, pod *volthaPod, ids map[string]struct{}) {
68 ctxTimeout, _ := context.WithTimeout(ctx, time.Second*5)
69 conn, err := Connect(ctxTimeout, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
70 if err != nil {
71 log.Debugf("Could not reconcile devices from %s, could not connect: %s", pod.name, err)
72 return
73 }
74 defer conn.Close()
75
76 var idList cmn.IDs
77 for k := range ids {
78 idList.Items = append(idList.Items, &cmn.ID{Id: k})
79 }
80
81 client := vpb.NewVolthaServiceClient(conn)
82 _, err = client.ReconcileDevices(ctx, &idList)
83 if err != nil {
84 log.Errorf("Attempt to reconcile ids on pod %s failed: %s", pod.name, err)
85 return
86 }
87}
88
89func queryPodDeviceIds(ctx context.Context, pod *volthaPod) map[string]struct{} {
90 ctxTimeout, _ := context.WithTimeout(ctx, time.Second*5)
91 conn, err := Connect(ctxTimeout, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
92 if err != nil {
93 log.Debugf("Could not query devices from %s, could not connect: %s", pod.name, err)
94 return nil
95 }
96 defer conn.Close()
97
98 client := vpb.NewVolthaServiceClient(conn)
99 devs, err := client.ListDeviceIds(ctx, &empty.Empty{})
100 if err != nil {
101 log.Error(err)
102 return nil
103 }
104
105 var ret = make(map[string]struct{})
106 for _, dv := range devs.Items {
107 ret[dv.Id] = struct{}{}
108 }
109 return ret
110}
111
112// coreMonitor polls the list of devices from all RW cores, pushes these devices
113// into the affinity router, and ensures that all cores in a backend have their devices synced
114func CoreMonitor(ctx context.Context, client pb.ConfigurationClient, clientset *kubernetes.Clientset) {
115 // map[backend]map[deviceId]struct{}
116 deviceOwnership := make(map[string]map[string]struct{})
117loop:
118 for {
119 // get the rw core list from k8s
120 rwPods, err := getVolthaPods(clientset)
121 if err != nil {
122 log.Error(err)
123 continue
124 }
125
126 // for every pod
127 for _, pod := range rwPods {
128 // get the devices for this pod's backend
129 devices, have := deviceOwnership[pod.backend]
130 if !have {
131 devices = make(map[string]struct{})
132 deviceOwnership[pod.backend] = devices
133 }
134
135 coreDevices := queryPodDeviceIds(ctx, pod)
136
137 // handle devices that exist in the core, but we have just learned about
138 for deviceId := range coreDevices {
139 // if there's a new device
140 if _, have := devices[deviceId]; !have {
141 // add the device to our local list
142 devices[deviceId] = struct{}{}
143 // push the device into the affinity router
144 setAffinity(ctx, client, deviceId, pod.backend)
145 }
146 }
147
148 // ensure that the core knows about all devices in its backend
149 toSync := make(map[string]struct{})
150 for deviceId := range devices {
151 // if the pod is missing any devices
152 if _, have := coreDevices[deviceId]; !have {
153 // we will reconcile them
154 toSync[deviceId] = struct{}{}
155 }
156 }
157
158 if len(toSync) != 0 {
159 reconcilePodDeviceIds(ctx, pod, toSync)
160 }
161 }
162
163 select {
164 case <-ctx.Done():
165 // if we're done, exit
166 break loop
167 case <-time.After(10 * time.Second): // wait a while
168 }
169 }
170}