blob: 44b8f53df683aa40c1d0a375240a2555f9b3d298 [file] [log] [blame]
sslobodr16e41bc2019-01-18 16:22:21 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package main
18
19import (
sslobodr16e41bc2019-01-18 16:22:21 -050020 "errors"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040021 "regexp"
sslobodr16e41bc2019-01-18 16:22:21 -050022 "strconv"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040023 "time"
sslobodr16e41bc2019-01-18 16:22:21 -050024
sslobodr16e41bc2019-01-18 16:22:21 -050025 "github.com/golang/protobuf/ptypes"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040026 empty "github.com/golang/protobuf/ptypes/empty"
sslobodr16e41bc2019-01-18 16:22:21 -050027 "github.com/opencord/voltha-go/common/log"
28 kafka "github.com/opencord/voltha-go/kafka"
William Kurkiandaa6bb22019-03-07 12:26:28 -050029 pb "github.com/opencord/voltha-protos/go/afrouter"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040030 cmn "github.com/opencord/voltha-protos/go/common"
William Kurkiandaa6bb22019-03-07 12:26:28 -050031 ic "github.com/opencord/voltha-protos/go/inter_container"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040032 vpb "github.com/opencord/voltha-protos/go/voltha"
33 "golang.org/x/net/context"
34 "google.golang.org/grpc"
35 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36 "k8s.io/client-go/kubernetes"
37 "k8s.io/client-go/rest"
sslobodr16e41bc2019-01-18 16:22:21 -050038)
39
// configConn mirrors the JSON configuration for one server entry of the
// affinity router and tracks the connections established for it, keyed
// by connection name.
type configConn struct {
	Server  string `json:"Server"`
	Cluster string `json:"Cluster"`
	Backend string `json:"Backend"`
	// connections holds the per-name endpoint details for this backend.
	connections map[string]connection
}
46
// connection describes a single gRPC endpoint (name, address and port)
// as configured for the affinity router.
type connection struct {
	Name string `json:"Connection"`
	Addr string `json:"Addr"`
	Port uint64 `json:"Port"`
}
52
// volthaPod captures the state of one voltha core pod as discovered from
// kubernetes, plus the affinity-router bookkeeping (cluster, backend and
// connection names) assigned to it by this daemon.
type volthaPod struct {
	name       string              // kubernetes pod name
	ipAddr     string              // pod IP address ("" if not yet assigned)
	node       string              // kubernetes node the pod is scheduled on
	devIds     map[string]struct{} // set of device ids owned by this core
	cluster    string
	backend    string
	connection string
}
62
// podTrack pairs a pod with a "done" flag used while allocating pods to
// groups.
type podTrack struct {
	pod *volthaPod
	dn  bool // true once the pod has been processed/allocated
}
67
// nPods is the expected number of rw-core pods in the cluster.
// NOTE(review): hard coded for a 3-server deployment — presumably this
// should be derived from the deployment size; confirm.
var nPods int = 6
69
sslobodr16e41bc2019-01-18 16:22:21 -050070// Topic is affinityRouter
71// port: 9092
72
73func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
74
75 log.Infow("kafka-client-type", log.Fields{"client": clientType})
76 switch clientType {
77 case "sarama":
78 return kafka.NewSaramaClient(
79 kafka.Host(host),
80 kafka.Port(port),
81 kafka.ConsumerType(kafka.GroupCustomer),
82 kafka.ProducerReturnOnErrors(true),
83 kafka.ProducerReturnOnSuccess(true),
84 kafka.ProducerMaxRetries(6),
85 kafka.NumPartitions(3),
86 kafka.ConsumerGroupName(instanceID),
87 kafka.ConsumerGroupPrefix(instanceID),
88 kafka.AutoCreateTopic(false),
89 kafka.ProducerFlushFrequency(5),
90 kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
91 }
92 return nil, errors.New("unsupported-client-type")
93}
94
sslobodr16e41bc2019-01-18 16:22:21 -050095func k8sClientSet() *kubernetes.Clientset {
96 // creates the in-cluster config
97 config, err := rest.InClusterConfig()
98 if err != nil {
99 panic(err.Error())
100 }
101 // creates the clientset
102 clientset, err := kubernetes.NewForConfig(config)
103 if err != nil {
104 panic(err.Error())
105 }
106
107 return clientset
108}
109
sslobodr16e41bc2019-01-18 16:22:21 -0500110func connect(addr string) (*grpc.ClientConn, error) {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400111 for ctr := 0; ctr < 100; ctr++ {
sslobodre7ce71d2019-01-22 16:21:45 -0500112 log.Debugf("Trying to connect to %s", addr)
sslobodr16e41bc2019-01-18 16:22:21 -0500113 conn, err := grpc.Dial(addr, grpc.WithInsecure())
114 if err != nil {
115 log.Debugf("Attempt to connect failed, retrying %v:", err)
116 } else {
117 log.Debugf("Connection succeeded")
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400118 return conn, err
sslobodr16e41bc2019-01-18 16:22:21 -0500119 }
120 time.Sleep(10 * time.Second)
121 }
122 log.Debugf("Too many connection attempts, giving up!")
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400123 return nil, errors.New("Timeout attempting to conect")
sslobodr16e41bc2019-01-18 16:22:21 -0500124}
125
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400126func getVolthaPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) []*volthaPod {
sslobodr8e2ccb52019-02-05 09:21:47 -0500127 var rtrn []*volthaPod
sslobodr16e41bc2019-01-18 16:22:21 -0500128
129 pods, err := cs.CoreV1().Pods("").List(metav1.ListOptions{})
130 if err != nil {
131 panic(err.Error())
132 }
sslobodre7ce71d2019-01-22 16:21:45 -0500133 //log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))
sslobodr16e41bc2019-01-18 16:22:21 -0500134
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400135 for _, v := range pods.Items {
sslobodr16e41bc2019-01-18 16:22:21 -0500136 if v.Namespace == "voltha" && coreFilter.MatchString(v.Name) {
sslobodre7ce71d2019-01-22 16:21:45 -0500137 log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400138 v.Status.PodIP, v.Spec.NodeName)
sslobodre7ce71d2019-01-22 16:21:45 -0500139 // Only add the pod if it has an IP address. If it doesn't then it likely crashed and
140 // and is still in the process of getting re-started.
141 if v.Status.PodIP != "" {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400142 rtrn = append(rtrn, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName,
143 devIds: make(map[string]struct{}), backend: "", connection: ""})
sslobodre7ce71d2019-01-22 16:21:45 -0500144 }
sslobodr16e41bc2019-01-18 16:22:21 -0500145 }
146 }
147 return rtrn
148}
149
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400150func reconcilePodDeviceIds(pod *volthaPod, ids map[string]struct{}) bool {
sslobodre7ce71d2019-01-22 16:21:45 -0500151 var idList cmn.IDs
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400152 for k, _ := range ids {
153 idList.Items = append(idList.Items, &cmn.ID{Id: k})
sslobodre7ce71d2019-01-22 16:21:45 -0500154 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400155 conn, err := connect(pod.ipAddr + ":50057")
sslobodre7ce71d2019-01-22 16:21:45 -0500156 defer conn.Close()
sslobodr6c1689c2019-01-24 07:31:15 -0500157 if err != nil {
sslobodre7ce71d2019-01-22 16:21:45 -0500158 log.Debugf("Could not query devices from %s, could not connect", pod.name)
159 return false
160 }
161 client := vpb.NewVolthaServiceClient(conn)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400162 _, err = client.ReconcileDevices(context.Background(), &idList)
sslobodre7ce71d2019-01-22 16:21:45 -0500163 if err != nil {
164 log.Error(err)
165 return false
166 }
167
168 return true
169}
170
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400171func queryPodDeviceIds(pod *volthaPod) map[string]struct{} {
sslobodre7ce71d2019-01-22 16:21:45 -0500172 var rtrn map[string]struct{} = make(map[string]struct{})
173 // Open a connection to the pod
174 // port 50057
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400175 conn, err := connect(pod.ipAddr + ":50057")
sslobodr6c1689c2019-01-24 07:31:15 -0500176 if err != nil {
sslobodre7ce71d2019-01-22 16:21:45 -0500177 log.Debugf("Could not query devices from %s, could not connect", pod.name)
178 return rtrn
179 }
180 defer conn.Close()
181 client := vpb.NewVolthaServiceClient(conn)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400182 devs, err := client.ListDeviceIds(context.Background(), &empty.Empty{})
sslobodre7ce71d2019-01-22 16:21:45 -0500183 if err != nil {
184 log.Error(err)
185 return rtrn
186 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400187 for _, dv := range devs.Items {
188 rtrn[dv.Id] = struct{}{}
sslobodre7ce71d2019-01-22 16:21:45 -0500189 }
190
191 return rtrn
192}
193
sslobodr8e2ccb52019-02-05 09:21:47 -0500194func queryDeviceIds(pods []*volthaPod) {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400195 for pk, _ := range pods {
sslobodre7ce71d2019-01-22 16:21:45 -0500196 // Keep the old Id list if a new list is not returned
197 if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
198 pods[pk].devIds = idList
sslobodr16e41bc2019-01-18 16:22:21 -0500199 }
sslobodr16e41bc2019-01-18 16:22:21 -0500200 }
201}
202
sslobodr8e2ccb52019-02-05 09:21:47 -0500203func allEmpty(pods []*volthaPod) bool {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400204 for k, _ := range pods {
sslobodr16e41bc2019-01-18 16:22:21 -0500205 if len(pods[k].devIds) != 0 {
206 return false
207 }
208 }
209 return true
210}
211
// rmPod removes the pod at index idx and returns the shortened slice.
// Note: this mutates the backing array of the slice passed in (elements
// after idx are shifted left), so callers must use the returned slice.
func rmPod(pods []*volthaPod, idx int) []*volthaPod {
	return append(pods[:idx], pods[idx+1:]...)
}
215
// groupIntersectingPods1 forms active-active pod groups by pairing pods
// whose device-id sets intersect (and that run on different nodes).
// Pods with no devices are collected into `out` instead of being grouped.
// It returns the groups formed and the pods left unallocated. Grouping
// stops early once half of podCt groups exist; any remaining pods are
// appended to `out`.
func groupIntersectingPods1(pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
	var rtrn [][]*volthaPod
	var out []*volthaPod

	for {
		if len(pods) == 0 {
			break
		}
		if len(pods[0].devIds) == 0 { // Ignore pods with no devices
			////log.Debugf("%s empty pod", pd[k].pod.name)
			out = append(out, pods[0])
			pods = rmPod(pods, 0)
			continue
		}
		// Start a pod group with this pod
		var grp []*volthaPod
		grp = append(grp, pods[0])
		pods = rmPod(pods, 0)
		//log.Debugf("Creating new group %s", pd[k].pod.name)
		// Find the peer pod based on device overlap
		// It's ok if one isn't found, an empty one will be used instead
		for k, _ := range pods {
			if len(pods[k].devIds) == 0 { // Skip pods with no devices
				//log.Debugf("%s empty pod", pd[k1].pod.name)
				continue
			}
			if intersect(grp[0].devIds, pods[k].devIds) == true {
				//log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
				if grp[0].node == pods[k].node {
					// Two cores sharing devices must be on different servers;
					// this should never happen.
					log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
						grp[0].name, pods[k].name)
					continue
				}
				grp = append(grp, pods[k])
				// Safe despite iterating: the loop breaks immediately after
				// the removal below.
				pods = rmPod(pods, k)
				break

			}
		}
		rtrn = append(rtrn, grp)
		//log.Debugf("Added group %s", grp[0].name)
		// Check if the number of groups = half the pods, if so all groups are started.
		if len(rtrn) == podCt>>1 {
			// Append any remaining pods to out
			out = append(out, pods[0:]...)
			break
		}
	}
	return rtrn, out
}
267
sslobodr16e41bc2019-01-18 16:22:21 -0500268func unallocPodCount(pd []*podTrack) int {
269 var rtrn int = 0
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400270 for _, v := range pd {
sslobodr16e41bc2019-01-18 16:22:21 -0500271 if v.dn == false {
272 rtrn++
273 }
274 }
275 return rtrn
276}
277
sslobodr8e2ccb52019-02-05 09:21:47 -0500278func sameNode(pod *volthaPod, grps [][]*volthaPod) bool {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400279 for _, v := range grps {
sslobodr16e41bc2019-01-18 16:22:21 -0500280 if v[0].node == pod.node {
281 return true
282 }
283 if len(v) == 2 && v[1].node == pod.node {
284 return true
285 }
286 }
287 return false
288}
289
sslobodr8e2ccb52019-02-05 09:21:47 -0500290func startRemainingGroups1(grps [][]*volthaPod, pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
291 var grp []*volthaPod
sslobodr16e41bc2019-01-18 16:22:21 -0500292
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400293 for k, _ := range pods {
sslobodr16e41bc2019-01-18 16:22:21 -0500294 if sameNode(pods[k], grps) {
295 continue
296 }
sslobodr8e2ccb52019-02-05 09:21:47 -0500297 grp = []*volthaPod{}
sslobodr16e41bc2019-01-18 16:22:21 -0500298 grp = append(grp, pods[k])
299 pods = rmPod(pods, k)
300 grps = append(grps, grp)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400301 if len(grps) == podCt>>1 {
sslobodr16e41bc2019-01-18 16:22:21 -0500302 break
303 }
304 }
305 return grps, pods
306}
307
sslobodr8e2ccb52019-02-05 09:21:47 -0500308func hasSingleSecondNode(grp []*volthaPod) bool {
sslobodr16e41bc2019-01-18 16:22:21 -0500309 var srvrs map[string]struct{} = make(map[string]struct{})
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400310 for k, _ := range grp {
sslobodr16e41bc2019-01-18 16:22:21 -0500311 if k == 0 {
312 continue // Ignore the first item
313 }
314 srvrs[grp[k].node] = struct{}{}
315 }
316 if len(srvrs) == 1 {
317 return true
318 }
319 return false
320}
321
sslobodr8e2ccb52019-02-05 09:21:47 -0500322func addNode(grps [][]*volthaPod, idx *volthaPod, item *volthaPod) [][]*volthaPod {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400323 for k, _ := range grps {
sslobodr16e41bc2019-01-18 16:22:21 -0500324 if grps[k][0].name == idx.name {
325 grps[k] = append(grps[k], item)
326 return grps
327 }
328 }
329 // TODO: Error checking required here.
330 return grps
331}
332
sslobodr8e2ccb52019-02-05 09:21:47 -0500333func removeNode(grps [][]*volthaPod, item *volthaPod) [][]*volthaPod {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400334 for k, _ := range grps {
335 for k1, _ := range grps[k] {
sslobodr16e41bc2019-01-18 16:22:21 -0500336 if grps[k][k1].name == item.name {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400337 grps[k] = append(grps[k][:k1], grps[k][k1+1:]...)
sslobodr16e41bc2019-01-18 16:22:21 -0500338 break
339 }
340 }
341 }
342 return grps
343}
344
// groupRemainingPods1 allocates the remaining (unpaired) pods to the
// already-started groups. All groups must have been started before this
// is called. Each incomplete group is completed with a pod on a different
// node; groups whose only remaining choice is a single node are handled
// first so that scarce nodes are not taken by groups with more options.
func groupRemainingPods1(grps [][]*volthaPod, pods []*volthaPod) [][]*volthaPod {
	var lgrps [][]*volthaPod
	// All groups must be started when this function is called.
	// Copy incomplete groups
	for k, _ := range grps {
		if len(grps[k]) != 2 {
			lgrps = append(lgrps, grps[k])
		}
	}

	// Add all pairing candidates to each started group.
	// (Candidates are any pods not on the group leader's node.)
	for k, _ := range pods {
		for k2, _ := range lgrps {
			if lgrps[k2][0].node != pods[k].node {
				lgrps[k2] = append(lgrps[k2], pods[k])
			}
		}
	}

	//TODO: If any member of lgrps doesn't have at least 2
	// nodes something is wrong. Check for that here

	for {
		for { // Address groups with only a single server choice
			var ssn bool = false

			for k, _ := range lgrps {
				// Now if any of the groups only have a single
				// node as the choice for the second member
				// address that one first.
				if hasSingleSecondNode(lgrps[k]) == true {
					ssn = true
					// Add this pairing to the groups
					grps = addNode(grps, lgrps[k][0], lgrps[k][1])
					// Since this node is now used, remove it from all
					// remaining tentative groups
					lgrps = removeNode(lgrps, lgrps[k][1])
					// Now remove this group completely since
					// it's been addressed
					lgrps = append(lgrps[:k], lgrps[k+1:]...)
					// Break: lgrps was mutated, so the range indices are
					// stale; restart the scan.
					break
				}
			}
			if ssn == false {
				break
			}
		}
		// Now address one of the remaining groups
		if len(lgrps) == 0 {
			break // Nothing left to do, exit the loop
		}
		grps = addNode(grps, lgrps[0][0], lgrps[0][1])
		lgrps = removeNode(lgrps, lgrps[0][1])
		lgrps = append(lgrps[:0], lgrps[1:]...)
	}
	return grps
}
402
sslobodr8e2ccb52019-02-05 09:21:47 -0500403func groupPods1(pods []*volthaPod) [][]*volthaPod {
404 var rtrn [][]*volthaPod
sslobodr16e41bc2019-01-18 16:22:21 -0500405 var podCt int = len(pods)
406
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400407 rtrn, pods = groupIntersectingPods1(pods, podCt)
408 // There are several outcomes here
sslobodr16e41bc2019-01-18 16:22:21 -0500409 // 1) All pods have been paired and we're done
410 // 2) Some un-allocated pods remain
411 // 2.a) All groups have been started
412 // 2.b) Not all groups have been started
413 if len(pods) == 0 {
414 return rtrn
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400415 } else if len(rtrn) == podCt>>1 { // All groupings started
sslobodr16e41bc2019-01-18 16:22:21 -0500416 // Allocate the remaining (presumably empty) pods to the started groups
417 return groupRemainingPods1(rtrn, pods)
418 } else { // Some groupings started
419 // Start empty groups with remaining pods
420 // each grouping is on a different server then
421 // allocate remaining pods.
422 rtrn, pods = startRemainingGroups1(rtrn, pods, podCt)
423 return groupRemainingPods1(rtrn, pods)
424 }
425}
426
// intersect reports whether the two sets share at least one key.
func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
	for k := range d1 {
		if _, ok := d2[k]; ok {
			return true
		}
	}
	return false
}
435
sslobodr8e2ccb52019-02-05 09:21:47 -0500436func setConnection(client pb.ConfigurationClient, cluster string, backend string, connection string, addr string, port uint64) {
sslobodr360c8d72019-02-05 12:47:56 -0500437 log.Debugf("Configuring backend %s : connection %s in cluster %s\n\n",
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400438 backend, connection, cluster)
439 cnf := &pb.Conn{Server: "grpc_command", Cluster: cluster, Backend: backend,
440 Connection: connection, Addr: addr,
441 Port: port}
sslobodr16e41bc2019-01-18 16:22:21 -0500442 if res, err := client.SetConnection(context.Background(), cnf); err != nil {
443 log.Debugf("failed SetConnection RPC call: %s", err)
444 } else {
445 log.Debugf("Result: %v", res)
446 }
447}
448
449func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
450 log.Debugf("Configuring backend %s : affinities \n", backend)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400451 aff := &pb.Affinity{Router: "vcore", Route: "dev_manager", Cluster: "vcore", Backend: backend}
452 for k, _ := range ids {
sslobodr16e41bc2019-01-18 16:22:21 -0500453 log.Debugf("Setting affinity for id %s", k)
454 aff.Id = k
455 if res, err := client.SetAffinity(context.Background(), aff); err != nil {
456 log.Debugf("failed affinity RPC call: %s", err)
457 } else {
458 log.Debugf("Result: %v", res)
459 }
460 }
461}
462
sslobodr8e2ccb52019-02-05 09:21:47 -0500463func getBackendForCore(coreId string, coreGroups [][]*volthaPod) string {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400464 for _, v := range coreGroups {
465 for _, v2 := range v {
sslobodr38afd0d2019-01-21 12:31:46 -0500466 if v2.name == coreId {
467 return v2.backend
468 }
469 }
470 }
471 log.Errorf("No backend found for core %s\n", coreId)
472 return ""
473}
474
sslobodr16e41bc2019-01-18 16:22:21 -0500475func monitorDiscovery(client pb.ConfigurationClient,
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400476 ch <-chan *ic.InterContainerMessage,
477 coreGroups [][]*volthaPod) {
sslobodr38afd0d2019-01-21 12:31:46 -0500478 var id map[string]struct{} = make(map[string]struct{})
479
sslobodr16e41bc2019-01-18 16:22:21 -0500480 select {
481 case msg := <-ch:
482 log.Debugf("Received a device discovery notification")
sslobodr38afd0d2019-01-21 12:31:46 -0500483 device := &ic.DeviceDiscovered{}
484 if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
sslobodr16e41bc2019-01-18 16:22:21 -0500485 log.Errorf("Could not unmarshal received notification %v", msg)
486 } else {
sslobodr38afd0d2019-01-21 12:31:46 -0500487 // Set the affinity of the discovered device.
488 if be := getBackendForCore(device.Id, coreGroups); be != "" {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400489 id[device.Id] = struct{}{}
sslobodr38afd0d2019-01-21 12:31:46 -0500490 setAffinity(client, id, be)
491 } else {
492 log.Error("Cant use an empty string as a backend name")
493 }
sslobodr16e41bc2019-01-18 16:22:21 -0500494 }
495 break
496 }
497}
498
sslobodr38afd0d2019-01-21 12:31:46 -0500499func startDiscoveryMonitor(client pb.ConfigurationClient,
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400500 coreGroups [][]*volthaPod) error {
sslobodr16e41bc2019-01-18 16:22:21 -0500501 var ch <-chan *ic.InterContainerMessage
502 // Connect to kafka for discovery events
503 topic := &kafka.Topic{Name: "AffinityRouter"}
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400504 kc, err := newKafkaClient("sarama", "kafka", 9092, "arouterd")
sslobodr16e41bc2019-01-18 16:22:21 -0500505 kc.Start()
506
507 if ch, err = kc.Subscribe(topic); err != nil {
508 log.Error("Could not subscribe to the 'AffinityRouter' channel, discovery disabled")
509 return err
510 }
sslobodr38afd0d2019-01-21 12:31:46 -0500511 go monitorDiscovery(client, ch, coreGroups)
sslobodr16e41bc2019-01-18 16:22:21 -0500512 return nil
513}
514
// getAddrDiffs determines which entries in coreGroups have changed based
// on the freshly discovered rwPods. It returns a parallel group array
// containing only the entries whose IP address no longer appears in
// rwPods (nil elsewhere), plus the list of pods that are new (their IP
// is not present anywhere in coreGroups).
func getAddrDiffs(coreGroups [][]*volthaPod, rwPods []*volthaPod) ([][]*volthaPod, []*volthaPod) {
	var nList []*volthaPod
	var rtrn [][]*volthaPod = make([][]*volthaPod, nPods>>1)
	var ipAddrs map[string]struct{} = make(map[string]struct{})

	log.Debug("Get addr diffs")

	// Start with an empty array (nPods/2 groups of 2, all nil)
	for k, _ := range rtrn {
		rtrn[k] = make([]*volthaPod, 2)
	}

	// Build a list with only the new items
	for _, v := range rwPods {
		if hasIpAddr(coreGroups, v.ipAddr) == false {
			nList = append(nList, v)
		}
		ipAddrs[v.ipAddr] = struct{}{} // for the search below
	}

	// Now build the coreGroups with only the changed items
	// (entries whose IP is no longer present in the live pod list).
	for k1, v1 := range coreGroups {
		for k2, v2 := range v1 {
			if _, ok := ipAddrs[v2.ipAddr]; ok == false {
				rtrn[k1][k2] = v2
			}
		}
	}
	return rtrn, nList
}
549
// reconcileAddrDiffs figures out where best to put the new pods in the
// coreGroup array based on the old pods being replaced. The criterion is
// that the new pod must be on the same server (node) as the old pod was.
// It returns coreGroupDiffs with each non-nil (replaced) entry swapped
// for a replacement pod from rwPodDiffs on the same node.
func reconcileAddrDiffs(coreGroupDiffs [][]*volthaPod, rwPodDiffs []*volthaPod) [][]*volthaPod {
	var srvrs map[string][]*volthaPod = make(map[string][]*volthaPod)

	log.Debug("Reconciling diffs")
	log.Debug("Building server list")
	// Index the replacement pods by the node they run on.
	for _, v := range rwPodDiffs {
		log.Debugf("Adding %v to the server list", *v)
		srvrs[v.node] = append(srvrs[v.node], v)
	}

	for k1, v1 := range coreGroupDiffs {
		log.Debugf("k1:%v, v1:%v", k1, v1)
		for k2, v2 := range v1 {
			log.Debugf("k2:%v, v2:%v", k2, v2)
			if v2 == nil { // Nothing to do here
				continue
			}
			if _, ok := srvrs[v2.node]; ok == true {
				// Take the first replacement available on the same node.
				coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
				if len(srvrs[v2.node]) > 1 { // remove one entry from the list
					srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
				} else { // Delete the entry from the map
					delete(srvrs, v2.node)
				}
			} else {
				log.Error("This should never happen, node appears to have changed names")
				// attempt to limp along by keeping this old entry
			}
		}
	}

	return coreGroupDiffs
}
588
// applyAddrDiffs updates the router configuration after pod IP addresses
// have changed. coreList is either the rw-core groups ([][]*volthaPod)
// or the flat ro-core list ([]*volthaPod); nPods is the freshly
// discovered pod list. Replaced rw entries get their device ids
// reconciled from their active-active peer, and the affinity router is
// given the new connection information in both cases.
func applyAddrDiffs(client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
	var newEntries [][]*volthaPod

	log.Debug("Applying diffs")
	switch cores := coreList.(type) {
	case [][]*volthaPod:
		newEntries = reconcileAddrDiffs(getAddrDiffs(cores, nPods))

		// Now replace the information in coreGroups with the new
		// entries and then reconcile the device ids on the core
		// that's in the new entry with the device ids of its
		// active-active peer.
		for k1, v1 := range cores {
			for k2, v2 := range v1 {
				if newEntries[k1][k2] != nil {
					// TODO: Missing is the case where both the primary
					// and the secondary core crash and come back.
					// Pull the device ids from the active-active peer
					// (k2^1 toggles between the two members of the pair).
					ids := queryPodDeviceIds(cores[k1][k2^1])
					if len(ids) != 0 {
						if reconcilePodDeviceIds(newEntries[k1][k2], ids) == false {
							log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
						}
					}
					// Send the affinity router the new connection information
					setConnection(client, "vcore", v2.backend, v2.connection, newEntries[k1][k2].ipAddr, 50057)
					// Copy the new entry information over
					cores[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
					cores[k1][k2].name = newEntries[k1][k2].name
					cores[k1][k2].devIds = ids
				}
			}
		}
	case []*volthaPod:
		var mia []*volthaPod
		var found bool
		// TODO: Break this using functions to simplify
		// reading of the code.
		// Find the core(s) that have changed addresses
		// (present in cores but missing from the new pod list).
		for k1, v1 := range cores {
			found = false
			for _, v2 := range nPods {
				if v1.ipAddr == v2.ipAddr {
					found = true
					break
				}
			}
			if found == false {
				mia = append(mia, cores[k1])
			}
		}
		// Now plug in the new addresses and set the connection
		for _, v1 := range nPods {
			found = false
			for _, v2 := range cores {
				if v1.ipAddr == v2.ipAddr {
					found = true
					break
				}
			}
			if found == true {
				continue
			}
			// NOTE(review): assumes the number of new addresses never
			// exceeds the number of missing cores; if it did, mia[0]
			// would index an empty slice and panic — confirm.
			mia[0].ipAddr = v1.ipAddr
			mia[0].name = v1.name
			setConnection(client, "ro_vcore", mia[0].backend, mia[0].connection, v1.ipAddr, 50057)
			// Now get rid of the mia entry just processed
			mia = append(mia[:0], mia[1:]...)
		}
	default:
		log.Error("Internal: Unexpected type in call to applyAddrDiffs")
	}
}
662
sslobodr8e2ccb52019-02-05 09:21:47 -0500663func updateDeviceIds(coreGroups [][]*volthaPod, rwPods []*volthaPod) {
664 var byName map[string]*volthaPod = make(map[string]*volthaPod)
sslobodrcd37bc52019-01-24 11:47:16 -0500665
666 // Convinience
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400667 for _, v := range rwPods {
sslobodrcd37bc52019-01-24 11:47:16 -0500668 byName[v.name] = v
669 }
670
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400671 for k1, v1 := range coreGroups {
672 for k2, _ := range v1 {
sslobodrcd37bc52019-01-24 11:47:16 -0500673 coreGroups[k1][k2].devIds = byName[v1[k2].name].devIds
674 }
675 }
676}
677
// startCoreMonitor is the steady-state loop run after initial allocation:
// every 10 seconds it re-lists the rw- and ro-core pods, refreshes device
// ids, and re-applies router configuration whenever a pod IP address has
// changed. It loops forever and never actually returns; the error result
// presumably exists for call-site symmetry — confirm.
func startCoreMonitor(client pb.ConfigurationClient,
	clientset *kubernetes.Clientset,
	rwCoreFltr *regexp.Regexp,
	roCoreFltr *regexp.Regexp,
	coreGroups [][]*volthaPod,
	oRoPods []*volthaPod) error {
	// Now that initial allocation has been completed, monitor the pods
	// for IP changes
	// The main loop needs to do the following:
	// 1) Periodically query the pods and filter out
	//    the vcore ones
	// 2) Validate that the pods running are the same
	//    as the previous check
	// 3) Validate that the IP addresses are the same
	//    as the last check.
	// If the pod name(s) ha(s/ve) changed then remove
	// the unused pod names and add in the new pod names
	// maintaining the cluster/backend information.
	// If an IP address has changed (which shouldn't
	// happen unless a pod is re-started) it should get
	// caught by the pod name change.
	for {
		time.Sleep(10 * time.Second) // Wait a while
		// Get the rw core list from k8s
		rwPods := getVolthaPods(clientset, rwCoreFltr)
		queryDeviceIds(rwPods)
		updateDeviceIds(coreGroups, rwPods)
		// If we didn't get 2n+1 pods then wait since
		// something is down and will hopefully come
		// back up at some point.
		// TODO: remove the 6 pod hardcoding
		if len(rwPods) != 6 {
			continue
		}
		// We have all pods, check if any IP addresses
		// have changed.
		for _, v := range rwPods {
			if hasIpAddr(coreGroups, v.ipAddr) == false {
				log.Debug("Address has changed...")
				applyAddrDiffs(client, coreGroups, rwPods)
				break
			}
		}

		// Same check for the read-only cores (expected count: 3).
		roPods := getVolthaPods(clientset, roCoreFltr)

		if len(roPods) != 3 {
			continue
		}
		for _, v := range roPods {
			if hasIpAddr(oRoPods, v.ipAddr) == false {
				applyAddrDiffs(client, oRoPods, roPods)
				break
			}
		}

	}
}
736
sslobodr8e2ccb52019-02-05 09:21:47 -0500737func hasIpAddr(coreList interface{}, ipAddr string) bool {
738 switch cores := coreList.(type) {
739 case []*volthaPod:
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400740 for _, v := range cores {
sslobodr8e2ccb52019-02-05 09:21:47 -0500741 if v.ipAddr == ipAddr {
sslobodre7ce71d2019-01-22 16:21:45 -0500742 return true
743 }
744 }
sslobodr8e2ccb52019-02-05 09:21:47 -0500745 case [][]*volthaPod:
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400746 for _, v1 := range cores {
747 for _, v2 := range v1 {
sslobodr8e2ccb52019-02-05 09:21:47 -0500748 if v2.ipAddr == ipAddr {
749 return true
750 }
751 }
752 }
753 default:
754 log.Error("Internal: Unexpected type in call to hasIpAddr")
sslobodre7ce71d2019-01-22 16:21:45 -0500755 }
756 return false
757}
758
sslobodr16e41bc2019-01-18 16:22:21 -0500759func main() {
760 // This is currently hard coded to a cluster with 3 servers
761 //var connections map[string]configConn = make(map[string]configConn)
762 //var rwCorePodsPrev map[string]rwPod = make(map[string]rwPod)
sslobodr16e41bc2019-01-18 16:22:21 -0500763 var err error
764 var conn *grpc.ClientConn
765
sslobodr16e41bc2019-01-18 16:22:21 -0500766 // Set up the regular expression to identify the voltha cores
sslobodr8e2ccb52019-02-05 09:21:47 -0500767 rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
768 roCoreFltr := regexp.MustCompile(`ro-core-`)
sslobodr16e41bc2019-01-18 16:22:21 -0500769
770 // Set up logging
771 if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
772 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
773 }
774
775 // Set up kubernetes api
776 clientset := k8sClientSet()
777
778 // Connect to the affinity router and set up the client
779 conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
sslobodrcd37bc52019-01-24 11:47:16 -0500780 defer conn.Close()
sslobodr16e41bc2019-01-18 16:22:21 -0500781 if err != nil {
782 panic(err.Error())
783 }
784 client := pb.NewConfigurationClient(conn)
785
786 // Get the voltha rw-core podes
sslobodr8e2ccb52019-02-05 09:21:47 -0500787 rwPods := getVolthaPods(clientset, rwCoreFltr)
sslobodr16e41bc2019-01-18 16:22:21 -0500788
789 // Fetch the devices held by each running core
sslobodre7ce71d2019-01-22 16:21:45 -0500790 queryDeviceIds(rwPods)
sslobodr16e41bc2019-01-18 16:22:21 -0500791
792 // For debugging... comment out l8r
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400793 for _, v := range rwPods {
sslobodr16e41bc2019-01-18 16:22:21 -0500794 log.Debugf("Pod list %v", *v)
795 }
796
797 coreGroups := groupPods1(rwPods)
798
sslobodr16e41bc2019-01-18 16:22:21 -0500799 // Assign the groupings to the the backends and connections
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400800 for k, _ := range coreGroups {
801 for k1, _ := range coreGroups[k] {
sslobodr8e2ccb52019-02-05 09:21:47 -0500802 coreGroups[k][k1].cluster = "vcore"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400803 coreGroups[k][k1].backend = "vcore" + strconv.Itoa(k+1)
804 coreGroups[k][k1].connection = "vcore" + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
sslobodr16e41bc2019-01-18 16:22:21 -0500805 }
806 }
sslobodrcd37bc52019-01-24 11:47:16 -0500807 log.Info("Core gouping completed")
sslobodr16e41bc2019-01-18 16:22:21 -0500808
809 // TODO: Debugging code, comment out for production
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400810 for k, v := range coreGroups {
811 for k2, v2 := range v {
sslobodr16e41bc2019-01-18 16:22:21 -0500812 log.Debugf("Core group %d,%d: %v", k, k2, v2)
813 }
814 }
sslobodrcd37bc52019-01-24 11:47:16 -0500815 log.Info("Setting affinities")
sslobodr16e41bc2019-01-18 16:22:21 -0500816 // Now set the affinities for exising devices in the cores
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400817 for _, v := range coreGroups {
sslobodr16e41bc2019-01-18 16:22:21 -0500818 setAffinity(client, v[0].devIds, v[0].backend)
819 setAffinity(client, v[1].devIds, v[1].backend)
820 }
sslobodrcd37bc52019-01-24 11:47:16 -0500821 log.Info("Setting connections")
sslobodr16e41bc2019-01-18 16:22:21 -0500822 // Configure the backeds based on the calculated core groups
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400823 for _, v := range coreGroups {
sslobodr8e2ccb52019-02-05 09:21:47 -0500824 setConnection(client, "vcore", v[0].backend, v[0].connection, v[0].ipAddr, 50057)
825 setConnection(client, "vcore", v[1].backend, v[1].connection, v[1].ipAddr, 50057)
826 }
827
828 // Process the read only pods
829 roPods := getVolthaPods(clientset, roCoreFltr)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400830 for k, v := range roPods {
sslobodr8e2ccb52019-02-05 09:21:47 -0500831 log.Debugf("Processing ro_pod %v", v)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400832 vN := "ro_vcore" + strconv.Itoa(k+1)
sslobodr8e2ccb52019-02-05 09:21:47 -0500833 log.Debugf("Setting connection %s, %s, %s", vN, vN+"1", v.ipAddr)
834 roPods[k].cluster = "ro_core"
835 roPods[k].backend = vN
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400836 roPods[k].connection = vN + "1"
sslobodr8e2ccb52019-02-05 09:21:47 -0500837 setConnection(client, "ro_vcore", v.backend, v.connection, v.ipAddr, 50057)
sslobodr16e41bc2019-01-18 16:22:21 -0500838 }
839
sslobodrcd37bc52019-01-24 11:47:16 -0500840 log.Info("Starting discovery monitoring")
sslobodr38afd0d2019-01-21 12:31:46 -0500841 startDiscoveryMonitor(client, coreGroups)
sslobodr16e41bc2019-01-18 16:22:21 -0500842
sslobodrcd37bc52019-01-24 11:47:16 -0500843 log.Info("Starting core monitoring")
sslobodr8e2ccb52019-02-05 09:21:47 -0500844 startCoreMonitor(client, clientset, rwCoreFltr,
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400845 roCoreFltr, coreGroups, roPods) // Never returns
sslobodr16e41bc2019-01-18 16:22:21 -0500846 return
sslobodr16e41bc2019-01-18 16:22:21 -0500847}