/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
    //"os"
    "fmt"
    "time"
    "regexp"
    "errors"
    "strconv"
    //"io/ioutil"
    //"encoding/json"

    "k8s.io/client-go/rest"
    "google.golang.org/grpc"
    "golang.org/x/net/context"
    "k8s.io/client-go/kubernetes"
    "github.com/golang/protobuf/ptypes"
    //"k8s.io/apimachinery/pkg/api/errors"
    "github.com/opencord/voltha-go/common/log"
    kafka "github.com/opencord/voltha-go/kafka"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    empty "github.com/golang/protobuf/ptypes/empty"
    vpb "github.com/opencord/voltha-go/protos/voltha"
    cmn "github.com/opencord/voltha-go/protos/common"
    pb "github.com/opencord/voltha-go/protos/afrouter"
    ic "github.com/opencord/voltha-go/protos/inter_container"
)

type configConn struct {
    Server string `json:"Server"`
    Cluster string `json:"Cluster"`
    Backend string `json:"Backend"`
    connections map[string]connection
}

type connection struct {
    Name string `json:"Connection"`
    Addr string `json:"Addr"`
    Port uint64 `json:"Port"`
}

type rwPod struct {
    name string
    ipAddr string
    node string
    devIds map[string]struct{}
    backend string
    connection string
}

type podTrack struct {
    pod *rwPod
    dn bool
}

var nPods int = 6

// Topic is affinityRouter
// port: 9092

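// newKafkaClient returns a kafka client of the requested type connected to
// the given host and port. Only the "sarama" client type is currently
// supported; any other type yields an error.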
func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {

    log.Infow("kafka-client-type", log.Fields{"client": clientType})
    switch clientType {
    case "sarama":
        return kafka.NewSaramaClient(
            kafka.Host(host),
            kafka.Port(port),
            kafka.ConsumerType(kafka.GroupCustomer),
            kafka.ProducerReturnOnErrors(true),
            kafka.ProducerReturnOnSuccess(true),
            kafka.ProducerMaxRetries(6),
            kafka.NumPartitions(3),
            kafka.ConsumerGroupName(instanceID),
            kafka.ConsumerGroupPrefix(instanceID),
            kafka.AutoCreateTopic(false),
            kafka.ProducerFlushFrequency(5),
            kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
    }
    return nil, errors.New("unsupported-client-type")
}

func k8sClientSet() *kubernetes.Clientset {
    // Creates the in-cluster config
    config, err := rest.InClusterConfig()
    if err != nil {
        panic(err.Error())
    }
    // Creates the clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    return clientset
}

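// connect dials the gRPC endpoint at addr with insecure transport,
// retrying every 10 seconds for up to 100 attempts before giving up.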
func connect(addr string) (*grpc.ClientConn, error) {
    for ctr := 0; ctr < 100; ctr++ {
        log.Debugf("Trying to connect to %s", addr)
        conn, err := grpc.Dial(addr, grpc.WithInsecure())
        if err != nil {
            log.Debugf("Attempt to connect failed, retrying: %v", err)
        } else {
            log.Debugf("Connection succeeded")
            return conn, err
        }
        time.Sleep(10 * time.Second)
    }
    log.Debugf("Too many connection attempts, giving up!")
    return nil, errors.New("timeout attempting to connect")
}

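// getRwPods lists all pods in the cluster and returns those in the "voltha"
// namespace whose names match the rw-core filter and that have already been
// assigned an IP address.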
func getRwPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) []*rwPod {
    var rtrn []*rwPod

    pods, err := cs.CoreV1().Pods("").List(metav1.ListOptions{})
    if err != nil {
        panic(err.Error())
    }
    //log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))

    for _, v := range pods.Items {
        if v.Namespace == "voltha" && coreFilter.MatchString(v.Name) {
            log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
                v.Status.PodIP, v.Spec.NodeName)
            // Only add the pod if it has an IP address. If it doesn't, then it
            // likely crashed and is still in the process of being restarted.
            if v.Status.PodIP != "" {
                rtrn = append(rtrn, &rwPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName,
                    devIds: make(map[string]struct{}), backend: "", connection: ""})
            }
        }
    }
    return rtrn
}

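// reconcilePodDeviceIds sends the given set of device ids to the rw-core pod
// over its gRPC interface (port 50057), presumably prompting the core to
// reconcile and take ownership of those devices. It returns false if the pod
// can't be reached or the ReconcileDevices call fails.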
func reconcilePodDeviceIds(pod *rwPod, ids map[string]struct{}) bool {
    var idList cmn.IDs
    for k := range ids {
        idList.Items = append(idList.Items, &cmn.ID{Id: k})
    }
    conn, err := connect(pod.ipAddr + ":50057")
    if err != nil {
        log.Debugf("Could not reconcile devices on %s, could not connect", pod.name)
        return false
    }
    defer conn.Close()
    client := vpb.NewVolthaServiceClient(conn)
    _, err = client.ReconcileDevices(context.Background(), &idList)
    if err != nil {
        log.Error(err)
        return false
    }

    return true
}

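// queryPodDeviceIds asks an rw-core pod for the device ids it currently
// manages and returns them as a set. An empty set is returned if the pod
// can't be reached or the query fails.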
func queryPodDeviceIds(pod *rwPod) map[string]struct{} {
    var rtrn map[string]struct{} = make(map[string]struct{})
    // Open a connection to the pod
    // port 50057
    conn, err := connect(pod.ipAddr + ":50057")
    if err != nil {
        log.Debugf("Could not query devices from %s, could not connect", pod.name)
        return rtrn
    }
    defer conn.Close()
    client := vpb.NewVolthaServiceClient(conn)
    devs, err := client.ListDeviceIds(context.Background(), &empty.Empty{})
    if err != nil {
        log.Error(err)
        return rtrn
    }
    for _, dv := range devs.Items {
        rtrn[dv.Id] = struct{}{}
    }

    return rtrn
}

func queryDeviceIds(pods []*rwPod) {
    for pk := range pods {
        // Keep the old Id list if a new list is not returned
        if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
            pods[pk].devIds = idList
        }
    }
}

func allEmpty(pods []*rwPod) bool {
    for k := range pods {
        if len(pods[k].devIds) != 0 {
            return false
        }
    }
    return true
}

//func groupEmptyCores(pods []*rwPod) [][]*rwPod {
//    return [][]*rwPod{}
//}

//func groupPods(pods []*rwPod) [][]*rwPod {
//    if allEmpty(pods) == true {
//        return groupEmptyCores(pods)
//    } else {
//        return groupPopulatedCores(pods)
//    }
//}

func rmPod(pods []*rwPod, idx int) []*rwPod {
    return append(pods[:idx], pods[idx+1:]...)
}

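// groupIntersectingPods1 pairs pods whose device id sets overlap, removing
// matched pods from the input slice as it goes. It returns the groups started
// so far along with the pods that remain unpaired (including all device-less
// pods). Overlapping pods that share a node are logged as an error and left
// unpaired.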
func groupIntersectingPods1(pods []*rwPod, podCt int) ([][]*rwPod, []*rwPod) {
    var rtrn [][]*rwPod
    var out []*rwPod

    for {
        if len(pods) == 0 {
            break
        }
        if len(pods[0].devIds) == 0 { // Ignore pods with no devices
            //log.Debugf("%s empty pod", pods[0].name)
            out = append(out, pods[0])
            pods = rmPod(pods, 0)
            continue
        }
        // Start a pod group with this pod
        var grp []*rwPod
        grp = append(grp, pods[0])
        pods = rmPod(pods, 0)
        //log.Debugf("Creating new group %s", grp[0].name)
        // Find the peer pod based on device overlap
        // It's ok if one isn't found, an empty one will be used instead
        for k := range pods {
            if len(pods[k].devIds) == 0 { // Skip pods with no devices
                //log.Debugf("%s empty pod", pods[k].name)
                continue
            }
            if intersect(grp[0].devIds, pods[k].devIds) == true {
                //log.Debugf("intersection found %s:%s", grp[0].name, pods[k].name)
                if grp[0].node == pods[k].node {
                    // This should never happen
                    log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
                        grp[0].name, pods[k].name)
                    continue
                }
                grp = append(grp, pods[k])
                pods = rmPod(pods, k)
                break
            }
        }
        rtrn = append(rtrn, grp)
        //log.Debugf("Added group %s", grp[0].name)
        // Check if the number of groups = half the pods, if so all groups are started.
        if len(rtrn) == podCt>>1 {
            // Append any remaining pods to out
            out = append(out, pods[0:]...)
            break
        }
    }
    return rtrn, out
}

func groupIntersectingPods(pd []*podTrack) ([][]*rwPod, []*podTrack) {
    var rtrn [][]*rwPod

    for k := range pd {
        if pd[k].dn == true { // Already processed?
            //log.Debugf("%s already processed", pd[k].pod.name)
            continue
        }
        if len(pd[k].pod.devIds) == 0 { // Ignore pods with no devices
            //log.Debugf("%s empty pod", pd[k].pod.name)
            continue
        }
        // Start a pod group with this pod
        var grp []*rwPod
        grp = append(grp, pd[k].pod)
        pd[k].dn = true
        //log.Debugf("Creating new group %s", pd[k].pod.name)
        // Find the peer pod based on device overlap
        // It's ok if one isn't found, an empty one will be used instead
        for k1 := range pd {
            if pd[k1].dn == true { // Skip over eliminated pods
                //log.Debugf("%s eliminated pod", pd[k1].pod.name)
                continue
            }
            if len(pd[k1].pod.devIds) == 0 { // Skip pods with no devices
                //log.Debugf("%s empty pod", pd[k1].pod.name)
                continue
            }
            if intersect(pd[k].pod.devIds, pd[k1].pod.devIds) == true {
                //log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
                if pd[k].pod.node == pd[k1].pod.node {
                    // This should never happen
                    log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
                        pd[k].pod.name, pd[k1].pod.name)
                    continue
                }
                pd[k1].dn = true
                grp = append(grp, pd[k1].pod)
                break
            }
        }
        rtrn = append(rtrn, grp)
        //log.Debugf("Added group %s", grp[0].name)
        // Check if the number of groups = half the pods, if so all groups are started.
        if len(rtrn) == len(pd)>>1 {
            break
        }
    }
    return rtrn, pd
}

func unallocPodCount(pd []*podTrack) int {
    var rtrn int = 0
    for _, v := range pd {
        if v.dn == false {
            rtrn++
        }
    }
    return rtrn
}

func sameNode(pod *rwPod, grps [][]*rwPod) bool {
    for _, v := range grps {
        if v[0].node == pod.node {
            return true
        }
        if len(v) == 2 && v[1].node == pod.node {
            return true
        }
    }
    return false
}

func startRemainingGroups1(grps [][]*rwPod, pods []*rwPod, podCt int) ([][]*rwPod, []*rwPod) {
    var grp []*rwPod

    for k := range pods {
        if sameNode(pods[k], grps) {
            continue
        }
        grp = []*rwPod{}
        grp = append(grp, pods[k])
        pods = rmPod(pods, k)
        grps = append(grps, grp)
        if len(grps) == podCt>>1 {
            break
        }
    }
    return grps, pods
}

func startRemainingGroups(grps [][]*rwPod, pd []*podTrack) ([][]*rwPod, []*podTrack) {
    var grp []*rwPod

    for k := range pd {
        if sameNode(pd[k].pod, grps) == true {
            continue
        }
        grp = append(grp, pd[k].pod)
        grps = append(grps, grp)
        pd[k].dn = true
        if len(grps) == len(pd)>>1 {
            break
        }
    }
    return grps, pd
}

func hasSingleSecondNode(grp []*rwPod) bool {
    var srvrs map[string]struct{} = make(map[string]struct{})
    for k := range grp {
        if k == 0 {
            continue // Ignore the first item
        }
        srvrs[grp[k].node] = struct{}{}
    }
    if len(srvrs) == 1 {
        return true
    }
    return false
}

func addNode(grps [][]*rwPod, idx *rwPod, item *rwPod) [][]*rwPod {
    for k := range grps {
        if grps[k][0].name == idx.name {
            grps[k] = append(grps[k], item)
            return grps
        }
    }
    // TODO: Error checking required here.
    return grps
}

func removeNode(grps [][]*rwPod, item *rwPod) [][]*rwPod {
    for k := range grps {
        for k1 := range grps[k] {
            if grps[k][k1].name == item.name {
                grps[k] = append(grps[k][:k1], grps[k][k1+1:]...)
                break
            }
        }
    }
    return grps
}

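// groupRemainingPods1 completes the already-started groups by assigning the
// remaining (presumably device-less) pods as second members, always choosing
// a pod on a different node than the group's first member. Groups whose only
// second-member choice is a single node are filled first.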
func groupRemainingPods1(grps [][]*rwPod, pods []*rwPod) [][]*rwPod {
    var lgrps [][]*rwPod
    // All groups must be started when this function is called.
    // Copy incomplete groups
    for k := range grps {
        if len(grps[k]) != 2 {
            lgrps = append(lgrps, grps[k])
        }
    }

    // Add all pairing candidates to each started group.
    for k := range pods {
        for k2 := range lgrps {
            if lgrps[k2][0].node != pods[k].node {
                lgrps[k2] = append(lgrps[k2], pods[k])
            }
        }
    }

    // TODO: If any member of lgrps doesn't have at least 2
    // nodes something is wrong. Check for that here.

    for {
        for { // Address groups with only a single server choice
            var ssn bool = false

            for k := range lgrps {
                // If any of the groups has only a single node as the
                // choice for the second member, address that one first.
                if hasSingleSecondNode(lgrps[k]) == true {
                    ssn = true
                    // Add this pairing to the groups
                    grps = addNode(grps, lgrps[k][0], lgrps[k][1])
                    // Since this node is now used, remove it from all
                    // remaining tentative groups
                    lgrps = removeNode(lgrps, lgrps[k][1])
                    // Now remove this group completely since
                    // it's been addressed
                    lgrps = append(lgrps[:k], lgrps[k+1:]...)
                    break
                }
            }
            if ssn == false {
                break
            }
        }
        // Now address one of the remaining groups
        if len(lgrps) == 0 {
            break // Nothing left to do, exit the loop
        }
        grps = addNode(grps, lgrps[0][0], lgrps[0][1])
        lgrps = removeNode(lgrps, lgrps[0][1])
        lgrps = append(lgrps[:0], lgrps[1:]...)
    }
    return grps
}

func groupRemainingPods(grps [][]*rwPod, pd []*podTrack) [][]*rwPod {
    var lgrps [][]*rwPod
    // All groups must be started when this function is called.
    // Copy incomplete groups
    for k := range grps {
        if len(grps[k]) != 2 {
            lgrps = append(lgrps, grps[k])
        }
    }

    // Add all pairing candidates to each started group.
    for k := range pd {
        if pd[k].dn == true {
            continue
        }
        for k2 := range lgrps {
            if lgrps[k2][0].node != pd[k].pod.node {
                lgrps[k2] = append(lgrps[k2], pd[k].pod)
            }
        }
    }

    // TODO: If any member of lgrps doesn't have at least 2
    // nodes something is wrong. Check for that here.

    for {
        for { // Address groups with only a single server choice
            var ssn bool = false

            for k := range lgrps {
                // If any of the groups has only a single node as the
                // choice for the second member, address that one first.
                if hasSingleSecondNode(lgrps[k]) == true {
                    ssn = true
                    // Add this pairing to the groups
                    grps = addNode(grps, lgrps[k][0], lgrps[k][1])
                    // Since this node is now used, remove it from all
                    // remaining tentative groups
                    lgrps = removeNode(lgrps, lgrps[k][1])
                    // Now remove this group completely since
                    // it's been addressed
                    lgrps = append(lgrps[:k], lgrps[k+1:]...)
                    break
                }
            }
            if ssn == false {
                break
            }
        }
        // Now address one of the remaining groups
        if len(lgrps) == 0 {
            break // Nothing left to do, exit the loop
        }
        grps = addNode(grps, lgrps[0][0], lgrps[0][1])
        lgrps = removeNode(lgrps, lgrps[0][1])
        lgrps = append(lgrps[:0], lgrps[1:]...)
    }
    return grps
}

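// groupPods1 builds the active-active core pairings. Pods holding overlapping
// device ids are paired first, then any remaining pods are used to start and
// complete groups so that the two members of each group run on different
// nodes. Illustrative example: with six cores spread over three nodes the
// result is three groups of two pods, each group spanning two distinct nodes.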
func groupPods1(pods []*rwPod) [][]*rwPod {
    var rtrn [][]*rwPod
    var podCt int = len(pods)

    rtrn, pods = groupIntersectingPods1(pods, podCt)
    // There are several outcomes here
    // 1) All pods have been paired and we're done
    // 2) Some un-allocated pods remain
    // 2.a) All groups have been started
    // 2.b) Not all groups have been started
    if len(pods) == 0 {
        return rtrn
    } else if len(rtrn) == podCt>>1 { // All groupings started
        // Allocate the remaining (presumably empty) pods to the started groups
        return groupRemainingPods1(rtrn, pods)
    } else { // Some groupings started
        // Start empty groups with the remaining pods such that each
        // grouping is on a different server, then allocate the
        // remaining pods.
        rtrn, pods = startRemainingGroups1(rtrn, pods, podCt)
        return groupRemainingPods1(rtrn, pods)
    }
}

func groupPods(pods []*rwPod) [][]*rwPod {
    var rtrn [][]*rwPod
    var pd []*podTrack

    // Tracking of the grouping process
    for k := range pods {
        pd = append(pd, &podTrack{pods[k], false})
    }

    rtrn, pd = groupIntersectingPods(pd)
    // There are several outcomes here
    // 1) All pods have been paired and we're done
    // 2) Some un-allocated pods remain
    // 2.a) All groups have been started
    // 2.b) Not all groups have been started
    if unallocPodCount(pd) == 0 {
        return rtrn
    } else if len(rtrn) == len(pd)>>1 { // All groupings started
        // Allocate the remaining (presumably empty) pods to the started groups
        return groupRemainingPods(rtrn, pd)
    } else { // Some groupings started
        // Start empty groups with the remaining pods such that each
        // grouping is on a different server, then allocate the
        // remaining pods.
        rtrn, pd = startRemainingGroups(rtrn, pd)
        return groupRemainingPods(rtrn, pd)
    }

    // NOTE: The code below is unreachable; every branch above returns.
    // Establish groupings of non-empty pods that have overlapping devices.
    for k := range pd {
        if pd[k].dn == true { // Already processed?
            //log.Debugf("%s already processed", pd[k].pod.name)
            continue
        }
        if len(pd[k].pod.devIds) == 0 { // Ignore pods with no devices
            //log.Debugf("%s empty pod", pd[k].pod.name)
            continue
        }
        // Start a pod group with this pod
        var grp []*rwPod
        grp = append(grp, pd[k].pod)
        pd[k].dn = true
        //log.Debugf("Creating new group %s", pd[k].pod.name)
        // Find the peer pod based on device overlap
        // It's ok if one isn't found, an empty one will be used instead
        for k1 := range pd {
            if pd[k1].dn == true { // Skip over eliminated pods
                //log.Debugf("%s eliminated pod", pd[k1].pod.name)
                continue
            }
            if len(pd[k1].pod.devIds) == 0 { // Skip pods with no devices
                //log.Debugf("%s empty pod", pd[k1].pod.name)
                continue
            }
            if intersect(pd[k].pod.devIds, pd[k1].pod.devIds) == true {
                //log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
                pd[k1].dn = true
                grp = append(grp, pd[k1].pod)
                break
            }
        }
        rtrn = append(rtrn, grp)
        //log.Debugf("Added group %s", grp[0].name)
    }
    // Now find any grouping without 2 members and assign one of the
    // pods with no devices and on a different server to it.
    // If there are no device-less pods left before all groups are
    // filled, report an exception but leave one of the groups with
    // only one pod.
    for k := range rtrn {
        if len(rtrn[k]) < 2 {
            for k2 := range pd {
                if pd[k2].dn == true {
                    continue
                }
                // There should be only empty pods here
                if len(pd[k2].pod.devIds) != 0 {
                    log.Error("Non empty pod found where empty pod was expected")
                    continue
                }
                if pd[k2].pod.node == rtrn[k][0].node {
                    //log.Error("Pods aren't on different servers, continuing")
                    continue
                }
                // Add this empty and unused pod to the group
                //log.Debugf("Adding empty pod %s", pd[k2].pod.name)
                rtrn[k] = append(rtrn[k], pd[k2].pod)
                pd[k2].dn = true
                break
            }
        }
    }
    return rtrn
}

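// intersect reports whether the two device id sets share at least one id.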
func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
    for k := range d1 {
        if _, ok := d2[k]; ok == true {
            return true
        }
    }
    return false
}

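// setConnection asks the affinity router to bind the named backend and
// connection to the core instance reachable at addr:port. The hard-coded
// server ("grpc_command") and cluster ("vcore") names are assumed to match
// the router's configuration.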
func setConnection(client pb.ConfigurationClient, backend string, connection string, addr string, port uint64) {
    log.Debugf("Configuring backend %s : connection %s\n\n", backend, connection)
    cnf := &pb.Conn{Server: "grpc_command", Cluster: "vcore", Backend: backend,
        Connection: connection, Addr: addr,
        Port: port}
    if res, err := client.SetConnection(context.Background(), cnf); err != nil {
        log.Debugf("failed SetConnection RPC call: %s", err)
    } else {
        log.Debugf("Result: %v", res)
    }
}

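// setAffinity asks the affinity router to steer requests for each of the
// given device ids to the specified backend, using the "vcore" router's
// "dev_manager" route.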
func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
    log.Debugf("Configuring backend %s : affinities \n", backend)
    aff := &pb.Affinity{Router: "vcore", Route: "dev_manager", Cluster: "vcore", Backend: backend}
    for k := range ids {
        log.Debugf("Setting affinity for id %s", k)
        aff.Id = k
        if res, err := client.SetAffinity(context.Background(), aff); err != nil {
            log.Debugf("failed affinity RPC call: %s", err)
        } else {
            log.Debugf("Result: %v", res)
        }
    }
}

func getBackendForCore(coreId string, coreGroups [][]*rwPod) string {
    for _, v := range coreGroups {
        for _, v2 := range v {
            if v2.name == coreId {
                return v2.backend
            }
        }
    }
    log.Errorf("No backend found for core %s\n", coreId)
    return ""
}

func monitorDiscovery(client pb.ConfigurationClient,
    ch <-chan *ic.InterContainerMessage,
    coreGroups [][]*rwPod) {
    var id map[string]struct{} = make(map[string]struct{})

    select {
    case msg := <-ch:
        log.Debugf("Received a device discovery notification")
        device := &ic.DeviceDiscovered{}
        if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
            log.Errorf("Could not unmarshal received notification %v", msg)
        } else {
            // Set the affinity of the discovered device.
            if be := getBackendForCore(device.Id, coreGroups); be != "" {
                id[device.Id] = struct{}{}
                setAffinity(client, id, be)
            } else {
                log.Error("Can't use an empty string as a backend name")
            }
        }
        break
    }
}

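// startDiscoveryMonitor subscribes to the "AffinityRouter" kafka topic and
// starts a goroutine that sets the affinity for newly discovered devices.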
func startDiscoveryMonitor(client pb.ConfigurationClient,
    coreGroups [][]*rwPod) error {
    var ch <-chan *ic.InterContainerMessage
    // Connect to kafka for discovery events
    topic := &kafka.Topic{Name: "AffinityRouter"}
    kc, err := newKafkaClient("sarama", "kafka", 9092, "arouterd")
    if err != nil {
        log.Errorf("Could not create the kafka client: %v, discovery disabled", err)
        return err
    }
    kc.Start()

    if ch, err = kc.Subscribe(topic); err != nil {
        log.Error("Could not subscribe to the 'AffinityRouter' channel, discovery disabled")
        return err
    }
    go monitorDiscovery(client, ch, coreGroups)
    return nil
}

// Determines which items in the core groups
// have changed based on the list provided
// and returns a coreGroup with only the changed
// items and a pod list with the new items
func getAddrDiffs(coreGroups [][]*rwPod, rwPods []*rwPod) ([][]*rwPod, []*rwPod) {
    var nList []*rwPod
    var rtrn [][]*rwPod = make([][]*rwPod, nPods>>1)
    var ipAddrs map[string]struct{} = make(map[string]struct{})

    log.Debug("Get addr diffs")

    // Start with an empty array
    for k := range rtrn {
        rtrn[k] = make([]*rwPod, 2)
    }

    // Build a list with only the new items
    for _, v := range rwPods {
        if hasIpAddr(coreGroups, v.ipAddr) == false {
            nList = append(nList, v)
        }
        ipAddrs[v.ipAddr] = struct{}{} // for the search below
    }

    // Now build the coreGroups with only the changed items
    for k1, v1 := range coreGroups {
        for k2, v2 := range v1 {
            if _, ok := ipAddrs[v2.ipAddr]; ok == false {
                rtrn[k1][k2] = v2
            }
        }
    }
    return rtrn, nList
}

// Figure out where best to put the new pods
// in the coreGroup array based on the old
// pods being replaced. The criterion is that
// the new pod be on the same server as the
// old pod was.
func reconcileAddrDiffs(coreGroupDiffs [][]*rwPod, rwPodDiffs []*rwPod) [][]*rwPod {
    var srvrs map[string][]*rwPod = make(map[string][]*rwPod)

    log.Debug("Reconciling diffs")
    log.Debug("Building server list")
    for _, v := range rwPodDiffs {
        log.Debugf("Adding %v to the server list", *v)
        srvrs[v.node] = append(srvrs[v.node], v)
    }

    for k1, v1 := range coreGroupDiffs {
        log.Debugf("k1:%v, v1:%v", k1, v1)
        for k2, v2 := range v1 {
            log.Debugf("k2:%v, v2:%v", k2, v2)
            if v2 == nil { // Nothing to do here
                continue
            }
            if _, ok := srvrs[v2.node]; ok == true {
                coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
                if len(srvrs[v2.node]) > 1 { // remove one entry from the list
                    srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
                } else { // Delete the entry from the map
                    delete(srvrs, v2.node)
                }
            } else {
                log.Error("This should never happen, node appears to have changed names")
                // attempt to limp along by keeping this old entry
            }
        }
    }

    return coreGroupDiffs
}

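// applyAddrDiffs updates coreGroups with the pods whose IP addresses have
// changed: each replacement pod is seeded with the device ids of its
// active-active peer, the affinity router is given the new connection
// information, and the in-memory group entry is updated in place.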
func applyAddrDiffs(client pb.ConfigurationClient, coreGroups [][]*rwPod, rwPods []*rwPod) {
    var newEntries [][]*rwPod

    log.Debug("Applying diffs")
    newEntries = reconcileAddrDiffs(getAddrDiffs(coreGroups, rwPods))

    // Now replace the information in coreGroups with the new
    // entries and then reconcile the device ids on the core
    // that's in the new entry with the device ids of its
    // active-active peer.
    for k1, v1 := range coreGroups {
        for k2, v2 := range v1 {
            if newEntries[k1][k2] != nil {
                // TODO: Missing is the case where both the primary
                // and the secondary core crash and come back.
                // Pull the device ids from the active-active peer
                ids := queryPodDeviceIds(coreGroups[k1][k2^1])
                if len(ids) != 0 {
                    if reconcilePodDeviceIds(newEntries[k1][k2], ids) == false {
                        log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
                    }
                }
                // Send the affinity router the new connection information
                setConnection(client, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, 50057)
                // Copy the new entry information over
                coreGroups[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
                coreGroups[k1][k2].name = newEntries[k1][k2].name
                coreGroups[k1][k2].devIds = ids
            }
        }
    }
}

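// startCoreMonitor polls the rw-core pods every 10 seconds and, once all six
// are present, applies address updates for any pod whose IP address is not
// yet known to the current core groups.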
func startCoreMonitor(client pb.ConfigurationClient,
    clientset *kubernetes.Clientset,
    coreFltr *regexp.Regexp,
    coreGroups [][]*rwPod) error {
    // Now that initial allocation has been completed, monitor the pods
    // for IP changes
    // The main loop needs to do the following:
    // 1) Periodically query the pods and filter out
    //    the vcore ones
    // 2) Validate that the pods running are the same
    //    as the previous check
    // 3) Validate that the IP addresses are the same
    //    as the last check.
    // If the pod names have changed then remove
    // the unused pod names and add in the new pod names
    // maintaining the cluster/backend information.
    // If an IP address has changed (which shouldn't
    // happen unless a pod is re-started) it should get
    // caught by the pod name change.
    for {
        time.Sleep(10 * time.Second) // Wait a while
        // Get the rw core list from k8s
        rwPods := getRwPods(clientset, coreFltr)
        queryDeviceIds(rwPods)
        // If we didn't get 2n+1 pods then wait since
        // something is down and will hopefully come
        // back up at some point.
        // TODO: remove the 6 pod hardcoding
        if len(rwPods) != 6 {
            continue
        }
        // We have all the pods, check if any IP addresses
        // have changed.
        for _, v := range rwPods {
            if hasIpAddr(coreGroups, v.ipAddr) == false {
                log.Debug("Address has changed...")
                applyAddrDiffs(client, coreGroups, rwPods)
            }
        }
    }
}

func hasIpAddr(coreGroups [][]*rwPod, ipAddr string) bool {
    for _, v1 := range coreGroups {
        for _, v2 := range v1 {
            if v2.ipAddr == ipAddr {
                return true
            }
        }
    }
    return false
}

func main() {
    // This is currently hard coded to a cluster with 3 servers
    //var connections map[string]configConn = make(map[string]configConn)
    //var rwCorePodsPrev map[string]rwPod = make(map[string]rwPod)
    var rwCoreNodesPrev map[string][]rwPod = make(map[string][]rwPod)
    var firstTime bool = true
    var err error
    var conn *grpc.ClientConn

    // Set up the regular expression to identify the voltha cores
    coreFltr := regexp.MustCompile(`rw-core[0-9]-`)

    // Set up logging
    if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
        log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
    }

    // Set up the kubernetes api
    clientset := k8sClientSet()

    // Connect to the affinity router and set up the client
    conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
    if err != nil {
        panic(err.Error())
    }
    client := pb.NewConfigurationClient(conn)

    // Get the voltha rw-core pods
    rwPods := getRwPods(clientset, coreFltr)

    // Fetch the devices held by each running core
    queryDeviceIds(rwPods)

    // For debugging... comment out l8r
    for _, v := range rwPods {
        log.Debugf("Pod list %v", *v)
    }

    coreGroups := groupPods1(rwPods)

    // Assign the groupings to the backends and connections
    for k := range coreGroups {
        for k1 := range coreGroups[k] {
            coreGroups[k][k1].backend = "vcore" + strconv.Itoa(k+1)
            coreGroups[k][k1].connection = "vcore" + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
        }
    }
    log.Debug("Core grouping completed")

    // TODO: Debugging code, comment out for production
    for k, v := range coreGroups {
        for k2, v2 := range v {
            log.Debugf("Core group %d,%d: %v", k, k2, v2)
        }
    }
    log.Debug("Setting affinities")
    // Now set the affinities for existing devices in the cores
    for _, v := range coreGroups {
        setAffinity(client, v[0].devIds, v[0].backend)
        setAffinity(client, v[1].devIds, v[1].backend)
    }
    log.Debug("Setting connections")
    // Configure the backends based on the calculated core groups
    for _, v := range coreGroups {
        setConnection(client, v[0].backend, v[0].connection, v[0].ipAddr, 50057)
        setConnection(client, v[1].backend, v[1].connection, v[1].ipAddr, 50057)
    }

    log.Debug("Starting discovery monitoring")
    startDiscoveryMonitor(client, coreGroups)

    log.Debugf("Starting core monitoring")
    startCoreMonitor(client, clientset, coreFltr, coreGroups) // Never returns
    return

    // NOTE: The code below is unreachable (startCoreMonitor never returns);
    // it appears to be retained from an earlier implementation.
    // The main loop needs to do the following:
    // 1) Periodically query the pods and filter out
    //    the vcore ones
    // 2) Validate that the pods running are the same
    //    as the previous check
    // 3) Validate that the IP addresses are the same
    //    as the last check.
    // If the pod names have changed then remove
    // the unused pod names and add in the new pod names
    // maintaining the cluster/backend information.
    // If an IP address has changed (which shouldn't
    // happen unless a pod is re-started) it should get
    // caught by the pod name change.
    for {
        var rwCorePods map[string]rwPod = make(map[string]rwPod)
        var rwCoreNodes map[string][]rwPod = make(map[string][]rwPod)
        pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
        if err != nil {
            panic(err.Error())
        }
        log.Debugf("There are %d pods in the cluster\n", len(pods.Items))

        /*
            for k, v := range pods.Items {
                if v.Namespace == "voltha" && coreFltr.MatchString(v.Name) {
                    fmt.Printf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
                        v.Status.PodIP, v.Spec.NodeName)
                    //fmt.Printf("Pod %v,%v\n\n\n", k, v)
                    _ = k
                    // Add this pod to the core structure.
                    if firstTime == true {
                        rwCorePodsPrev[v.Name] = rwPod{name: v.Name, node: v.Spec.NodeName}
                        rwCoreNodesPrev[v.Spec.NodeName] =
                            append(rwCoreNodesPrev[v.Spec.NodeName], rwPod{name: v.Name, node: v.Spec.NodeName})
                    }
                    rwCorePods[v.Name] = rwPod{v.Name, v.Status.PodIP, v.Spec.NodeName, "", ""}
                    rwCoreNodes[v.Spec.NodeName] =
                        append(rwCoreNodes[v.Spec.NodeName], rwPod{v.Name, v.Status.PodIP, v.Spec.NodeName, "", ""})
                }
            }
        */

        if len(rwCorePods) != 6 {
            continue
        }

        //fmt.Printf("Pod map: %v\n", rwCorePods)
        //fmt.Printf("Pod map2: %v\n", rwCoreNodes)

        // Examples for error handling:
        // - Use helper functions like e.g. errors.IsNotFound()
        // - And/or cast to StatusError and use its properties like e.g. ErrStatus.Message
        /*
            _, err = clientset.CoreV1().Pods("default").Get("example-xxxxx", metav1.GetOptions{})
            if errors.IsNotFound(err) {
                fmt.Printf("Pod not found\n")
            } else if statusError, isStatus := err.(*errors.StatusError); isStatus {
                fmt.Printf("Error getting pod %v\n", statusError.ErrStatus.Message)
            } else if err != nil {
                panic(err.Error())
            } else {
                fmt.Printf("Found pod\n")
            }
        */
        // Set the association to backends and connections only once.
        // TODO: This needs to be reworked for when a pod crashes
        // and its name changes.
        if firstTime == true {
            be := 1
            for k := range rwCoreNodesPrev { // Each node has 2 cores running on it
                // Use a pretty dumb distribution algorithm.
                log.Debugf("Processing core node %s:%d\n", k, be)
                rwCoreNodesPrev[k][0].backend = "vcore" + strconv.Itoa(be)
                rwCoreNodesPrev[k][0].connection = "vcore" + strconv.Itoa(be) + strconv.Itoa(1)
                rwCoreNodesPrev[k][1].backend = "vcore" + strconv.Itoa(be%3+1)
                rwCoreNodesPrev[k][1].connection = "vcore" + strconv.Itoa(be%3+1) + strconv.Itoa(2)
                be++
            }
        }

        log.Debugf("Backend Allocation: %v", rwCoreNodesPrev)
        // Compare the current node IPs with the previous node IPs and if they differ
        // then set the new one and send the command to configure the router with the
        // new backend connection.
        for k, v := range rwCoreNodesPrev {
            if rwCoreNodes[k][0].ipAddr != rwCoreNodesPrev[k][0].ipAddr {
                log.Debugf("Configuring backend %s : connection %s\n\n", v[0].backend, v[0].connection)
                cnf := &pb.Conn{Server: "grpc_command", Cluster: "vcore", Backend: rwCoreNodesPrev[k][0].backend,
                    Connection: rwCoreNodesPrev[k][0].connection, Addr: rwCoreNodes[k][0].ipAddr,
                    Port: 50057}
                if res, err := client.SetConnection(context.Background(), cnf); err != nil {
                    log.Debugf("failed SetConnection RPC call: %s", err)
                } else {
                    log.Debugf("Result: %v", res)
                    rwCoreNodesPrev[k][0].ipAddr = rwCoreNodes[k][0].ipAddr
                }
            }
            if rwCoreNodes[k][1].ipAddr != rwCoreNodesPrev[k][1].ipAddr {
                log.Debugf("Configuring backend %s : connection %s\n\n", v[1].backend, v[1].connection)
                cnf := &pb.Conn{Server: "grpc_command", Cluster: "vcore", Backend: rwCoreNodesPrev[k][1].backend,
                    Connection: rwCoreNodesPrev[k][1].connection, Addr: rwCoreNodes[k][1].ipAddr,
                    Port: 50057}
                if res, err := client.SetConnection(context.Background(), cnf); err != nil {
                    log.Debugf("failed SetConnection RPC call: %s", err)
                } else {
                    log.Debugf("Result: %v", res)
                    rwCoreNodesPrev[k][1].ipAddr = rwCoreNodes[k][1].ipAddr
                }
            }
        }

        fmt.Printf("The structure for setting the connections is: %v\n", rwCoreNodesPrev)
        firstTime = false

        // Now make the API calls
        time.Sleep(10 * time.Second)
    }
    conn.Close()
}