/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
    //"os"
    "fmt"
    "time"
    "regexp"
    "errors"
    "strconv"
    //"io/ioutil"
    //"encoding/json"

    "k8s.io/client-go/rest"
    "google.golang.org/grpc"
    "golang.org/x/net/context"
    "k8s.io/client-go/kubernetes"
    "github.com/golang/protobuf/ptypes"
    //"k8s.io/apimachinery/pkg/api/errors"
    "github.com/opencord/voltha-go/common/log"
    kafka "github.com/opencord/voltha-go/kafka"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    empty "github.com/golang/protobuf/ptypes/empty"
    vpb "github.com/opencord/voltha-go/protos/voltha"
    cmn "github.com/opencord/voltha-go/protos/common"
    pb "github.com/opencord/voltha-go/protos/afrouter"
    ic "github.com/opencord/voltha-go/protos/inter_container"
)

type configConn struct {
    Server      string `json:"Server"`
    Cluster     string `json:"Cluster"`
    Backend     string `json:"Backend"`
    connections map[string]connection
}

type connection struct {
    Name string `json:"Connection"`
    Addr string `json:"Addr"`
    Port uint64 `json:"Port"`
}

type rwPod struct {
    name       string
    ipAddr     string
    node       string
    devIds     map[string]struct{}
    backend    string
    connection string
}

type podTrack struct {
    pod *rwPod
    dn  bool
}

var nPods int = 6

// Topic is affinityRouter
// port: 9092

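// newKafkaClient creates a kafka client of the requested type. Only the
// "sarama" client type is currently supported; any other type returns an
// error.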
func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {

    log.Infow("kafka-client-type", log.Fields{"client": clientType})
    switch clientType {
    case "sarama":
        return kafka.NewSaramaClient(
            kafka.Host(host),
            kafka.Port(port),
            kafka.ConsumerType(kafka.GroupCustomer),
            kafka.ProducerReturnOnErrors(true),
            kafka.ProducerReturnOnSuccess(true),
            kafka.ProducerMaxRetries(6),
            kafka.NumPartitions(3),
            kafka.ConsumerGroupName(instanceID),
            kafka.ConsumerGroupPrefix(instanceID),
            kafka.AutoCreateTopic(false),
            kafka.ProducerFlushFrequency(5),
            kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
    }
    return nil, errors.New("unsupported-client-type")
}

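// k8sClientSet creates a kubernetes clientset from the in-cluster
// configuration, panicking if the configuration or the clientset cannot
// be created.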
func k8sClientSet() *kubernetes.Clientset {
    // creates the in-cluster config
    config, err := rest.InClusterConfig()
    if err != nil {
        panic(err.Error())
    }
    // creates the clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    return clientset
}

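// connect dials the gRPC endpoint at addr, retrying every 10 seconds for up
// to 100 attempts before giving up with an error.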
func connect(addr string) (*grpc.ClientConn, error) {
    for ctr := 0; ctr < 100; ctr++ {
        log.Debugf("Trying to connect to %s", addr)
        conn, err := grpc.Dial(addr, grpc.WithInsecure())
        if err != nil {
            log.Debugf("Attempt to connect failed, retrying: %v", err)
        } else {
            log.Debugf("Connection succeeded")
            return conn, err
        }
        time.Sleep(10 * time.Second)
    }
    log.Debugf("Too many connection attempts, giving up!")
    return nil, errors.New("timeout attempting to connect")
}

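// getRwPods returns the rw-core pods in the "voltha" namespace whose names
// match coreFilter and which have been assigned an IP address.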
func getRwPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) []*rwPod {
    var rtrn []*rwPod

    pods, err := cs.CoreV1().Pods("").List(metav1.ListOptions{})
    if err != nil {
        panic(err.Error())
    }
    //log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))

    for _, v := range pods.Items {
        if v.Namespace == "voltha" && coreFilter.MatchString(v.Name) {
            log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
                v.Status.PodIP, v.Spec.NodeName)
            // Only add the pod if it has an IP address. If it doesn't then it likely crashed
            // and is still in the process of getting re-started.
            if v.Status.PodIP != "" {
                rtrn = append(rtrn, &rwPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName,
                    devIds: make(map[string]struct{}), backend: "", connection: ""})
            }
        }
    }
    return rtrn
}

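// reconcilePodDeviceIds instructs the given rw-core pod to reconcile the
// provided set of device ids and reports whether the RPC succeeded.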
func reconcilePodDeviceIds(pod *rwPod, ids map[string]struct{}) bool {
    var idList cmn.IDs
    for k, _ := range ids {
        idList.Items = append(idList.Items, &cmn.ID{Id: k})
    }
    conn, err := connect(pod.ipAddr + ":50057")
    if err != nil {
        log.Debugf("Could not query devices from %s, could not connect", pod.name)
        return false
    }
    // Close only after a successful connect; deferring before the error check
    // would dereference a nil connection on failure.
    defer conn.Close()
    client := vpb.NewVolthaServiceClient(conn)
    _, err = client.ReconcileDevices(context.Background(), &idList)
    if err != nil {
        log.Error(err)
        return false
    }

    return true
}

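// queryPodDeviceIds returns the set of device ids currently managed by the
// given rw-core pod. An empty set is returned if the pod can't be reached or
// the query fails.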
func queryPodDeviceIds(pod *rwPod) map[string]struct{} {
    var rtrn map[string]struct{} = make(map[string]struct{})
    // Open a connection to the pod
    // port 50057
    conn, err := connect(pod.ipAddr + ":50057")
    if err != nil {
        log.Debugf("Could not query devices from %s, could not connect", pod.name)
        return rtrn
    }
    defer conn.Close()
    client := vpb.NewVolthaServiceClient(conn)
    devs, err := client.ListDeviceIds(context.Background(), &empty.Empty{})
    if err != nil {
        log.Error(err)
        return rtrn
    }
    for _, dv := range devs.Items {
        rtrn[dv.Id] = struct{}{}
    }

    return rtrn
}

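// queryDeviceIds refreshes the device id list of each pod in place. If a pod
// returns an empty list its previous list is kept.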
func queryDeviceIds(pods []*rwPod) {
    for pk, _ := range pods {
        // Keep the old Id list if a new list is not returned
        if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
            pods[pk].devIds = idList
        }
    }
}

func allEmpty(pods []*rwPod) bool {
    for k, _ := range pods {
        if len(pods[k].devIds) != 0 {
            return false
        }
    }
    return true
}

//func groupEmptyCores(pods []*rwPod) [][]*rwPod {
//	return [][]*rwPod{}
//}

//func groupPods(pods []*rwPod) [][]*rwPod {

//	if allEmpty(pods) == true {
//		return groupEmptyCores(pods)
//	} else {
//		return groupPopulatedCores(pods)
//	}
//}

func rmPod(pods []*rwPod, idx int) []*rwPod {
    return append(pods[:idx], pods[idx+1:]...)
}

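// groupIntersectingPods1 pairs pods that share device ids, skipping pods with
// no devices and refusing to pair two pods on the same node. It returns the
// started groups and the pods that were not placed in a group (typically the
// empty ones).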
func groupIntersectingPods1(pods []*rwPod, podCt int) ([][]*rwPod, []*rwPod) {
    var rtrn [][]*rwPod
    var out []*rwPod

    for {
        if len(pods) == 0 {
            break
        }
        if len(pods[0].devIds) == 0 { // Ignore pods with no devices
            ////log.Debugf("%s empty pod", pd[k].pod.name)
            out = append(out, pods[0])
            pods = rmPod(pods, 0)
            continue
        }
        // Start a pod group with this pod
        var grp []*rwPod
        grp = append(grp, pods[0])
        pods = rmPod(pods, 0)
        //log.Debugf("Creating new group %s", pd[k].pod.name)
        // Find the peer pod based on device overlap
        // It's ok if one isn't found, an empty one will be used instead
        for k, _ := range pods {
            if len(pods[k].devIds) == 0 { // Skip pods with no devices
                //log.Debugf("%s empty pod", pd[k1].pod.name)
                continue
            }
            if intersect(grp[0].devIds, pods[k].devIds) == true {
                //log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
                if grp[0].node == pods[k].node {
                    // This should never happen
                    log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
                        grp[0].name, pods[k].name)
                    continue
                }
                grp = append(grp, pods[k])
                pods = rmPod(pods, k)
                break
            }
        }
        rtrn = append(rtrn, grp)
        //log.Debugf("Added group %s", grp[0].name)
        // Check if the number of groups = half the pods, if so all groups are started.
        if len(rtrn) == podCt>>1 {
            // Append any remaining pods to out
            out = append(out, pods[0:]...)
            break
        }
    }
    return rtrn, out
}

func groupIntersectingPods(pd []*podTrack) ([][]*rwPod, []*podTrack) {
    var rtrn [][]*rwPod

    for k, _ := range pd {
        if pd[k].dn == true { // Already processed?
            //log.Debugf("%s already processed", pd[k].pod.name)
            continue
        }
        if len(pd[k].pod.devIds) == 0 { // Ignore pods with no devices
            ////log.Debugf("%s empty pod", pd[k].pod.name)
            continue
        }
        // Start a pod group with this pod
        var grp []*rwPod
        grp = append(grp, pd[k].pod)
        pd[k].dn = true
        //log.Debugf("Creating new group %s", pd[k].pod.name)
        // Find the peer pod based on device overlap
        // It's ok if one isn't found, an empty one will be used instead
        for k1, _ := range pd {
            if pd[k1].dn == true { // Skip over eliminated pods
                //log.Debugf("%s eliminated pod", pd[k1].pod.name)
                continue
            }
            if len(pd[k1].pod.devIds) == 0 { // Skip pods with no devices
                //log.Debugf("%s empty pod", pd[k1].pod.name)
                continue
            }
            if intersect(pd[k].pod.devIds, pd[k1].pod.devIds) == true {
                //log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
                if pd[k].pod.node == pd[k1].pod.node {
                    // This should never happen
                    log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
                        pd[k].pod.name, pd[k1].pod.name)
                    continue
                }
                pd[k1].dn = true
                grp = append(grp, pd[k1].pod)
                break
            }
        }
        rtrn = append(rtrn, grp)
        //log.Debugf("Added group %s", grp[0].name)
        // Check if the number of groups = half the pods, if so all groups are started.
        if len(rtrn) == len(pd)>>1 {
            break
        }
    }
    return rtrn, pd
}

func unallocPodCount(pd []*podTrack) int {
    var rtrn int = 0
    for _, v := range pd {
        if v.dn == false {
            rtrn++
        }
    }
    return rtrn
}

func sameNode(pod *rwPod, grps [][]*rwPod) bool {
    for _, v := range grps {
        if v[0].node == pod.node {
            return true
        }
        if len(v) == 2 && v[1].node == pod.node {
            return true
        }
    }
    return false
}

func startRemainingGroups1(grps [][]*rwPod, pods []*rwPod, podCt int) ([][]*rwPod, []*rwPod) {
    var grp []*rwPod

    for k, _ := range pods {
        if sameNode(pods[k], grps) {
            continue
        }
        grp = []*rwPod{}
        grp = append(grp, pods[k])
        pods = rmPod(pods, k)
        grps = append(grps, grp)
        if len(grps) == podCt>>1 {
            break
        }
    }
    return grps, pods
}

func startRemainingGroups(grps [][]*rwPod, pd []*podTrack) ([][]*rwPod, []*podTrack) {
    var grp []*rwPod

    for k, _ := range pd {
        if sameNode(pd[k].pod, grps) == true {
            continue
        }
        grp = append(grp, pd[k].pod)
        grps = append(grps, grp)
        pd[k].dn = true
        if len(grps) == len(pd)>>1 {
            break
        }
    }
    return grps, pd
}

func hasSingleSecondNode(grp []*rwPod) bool {
    var srvrs map[string]struct{} = make(map[string]struct{})
    for k, _ := range grp {
        if k == 0 {
            continue // Ignore the first item
        }
        srvrs[grp[k].node] = struct{}{}
    }
    if len(srvrs) == 1 {
        return true
    }
    return false
}

func addNode(grps [][]*rwPod, idx *rwPod, item *rwPod) [][]*rwPod {
    for k, _ := range grps {
        if grps[k][0].name == idx.name {
            grps[k] = append(grps[k], item)
            return grps
        }
    }
    // TODO: Error checking required here.
    return grps
}

func removeNode(grps [][]*rwPod, item *rwPod) [][]*rwPod {
    for k, _ := range grps {
        for k1, _ := range grps[k] {
            if grps[k][k1].name == item.name {
                grps[k] = append(grps[k][:k1], grps[k][k1+1:]...)
                break
            }
        }
    }
    return grps
}

func groupRemainingPods1(grps [][]*rwPod, pods []*rwPod) [][]*rwPod {
    var lgrps [][]*rwPod
    // All groups must be started when this function is called.
    // Copy incomplete groups
    for k, _ := range grps {
        if len(grps[k]) != 2 {
            lgrps = append(lgrps, grps[k])
        }
    }

    // Add all pairing candidates to each started group.
    for k, _ := range pods {
        for k2, _ := range lgrps {
            if lgrps[k2][0].node != pods[k].node {
                lgrps[k2] = append(lgrps[k2], pods[k])
            }
        }
    }

    // TODO: If any member of lgrps doesn't have at least 2
    // nodes something is wrong. Check for that here

    for {
        for { // Address groups with only a single server choice
            var ssn bool = false

            for k, _ := range lgrps {
                // Now if any of the groups only have a single
                // node as the choice for the second member
                // address that one first.
                if hasSingleSecondNode(lgrps[k]) == true {
                    ssn = true
                    // Add this pairing to the groups
                    grps = addNode(grps, lgrps[k][0], lgrps[k][1])
                    // Since this node is now used, remove it from all
                    // remaining tentative groups
                    lgrps = removeNode(lgrps, lgrps[k][1])
                    // Now remove this group completely since
                    // it's been addressed
                    lgrps = append(lgrps[:k], lgrps[k+1:]...)
                    break
                }
            }
            if ssn == false {
                break
            }
        }
        // Now address one of the remaining groups
        if len(lgrps) == 0 {
            break // Nothing left to do, exit the loop
        }
        grps = addNode(grps, lgrps[0][0], lgrps[0][1])
        lgrps = removeNode(lgrps, lgrps[0][1])
        lgrps = append(lgrps[:0], lgrps[1:]...)
    }
    return grps
}

func groupRemainingPods(grps [][]*rwPod, pd []*podTrack) [][]*rwPod {
    var lgrps [][]*rwPod
    // All groups must be started when this function is called.
    // Copy incomplete groups
    for k, _ := range grps {
        if len(grps[k]) != 2 {
            lgrps = append(lgrps, grps[k])
        }
    }

    // Add all pairing candidates to each started group.
    for k, _ := range pd {
        if pd[k].dn == true {
            continue
        }
        for k2, _ := range lgrps {
            if lgrps[k2][0].node != pd[k].pod.node {
                lgrps[k2] = append(lgrps[k2], pd[k].pod)
            }
        }
    }

    // TODO: If any member of lgrps doesn't have at least 2
    // nodes something is wrong. Check for that here

    for {
        for { // Address groups with only a single server choice
            var ssn bool = false

            for k, _ := range lgrps {
                // Now if any of the groups only have a single
                // node as the choice for the second member
                // address that one first.
                if hasSingleSecondNode(lgrps[k]) == true {
                    ssn = true
                    // Add this pairing to the groups
                    grps = addNode(grps, lgrps[k][0], lgrps[k][1])
                    // Since this node is now used, remove it from all
                    // remaining tentative groups
                    lgrps = removeNode(lgrps, lgrps[k][1])
                    // Now remove this group completely since
                    // it's been addressed
                    lgrps = append(lgrps[:k], lgrps[k+1:]...)
                    break
                }
            }
            if ssn == false {
                break
            }
        }
        // Now address one of the remaining groups
        if len(lgrps) == 0 {
            break // Nothing left to do, exit the loop
        }
        grps = addNode(grps, lgrps[0][0], lgrps[0][1])
        lgrps = removeNode(lgrps, lgrps[0][1])
        lgrps = append(lgrps[:0], lgrps[1:]...)
    }
    return grps
}

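// groupPods1 computes the active-active pairings for the rw-core pods: pods
// with overlapping device ids are paired first, any remaining groups are
// started on nodes not already used, and left-over (typically empty) pods are
// then distributed to complete the pairs.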
func groupPods1(pods []*rwPod) [][]*rwPod {
    var rtrn [][]*rwPod
    var podCt int = len(pods)

    rtrn, pods = groupIntersectingPods1(pods, podCt)
    // There are several outcomes here
    // 1) All pods have been paired and we're done
    // 2) Some un-allocated pods remain
    // 2.a) All groups have been started
    // 2.b) Not all groups have been started
    if len(pods) == 0 {
        return rtrn
    } else if len(rtrn) == podCt>>1 { // All groupings started
        // Allocate the remaining (presumably empty) pods to the started groups
        return groupRemainingPods1(rtrn, pods)
    } else { // Some groupings started
        // Start empty groups with remaining pods
        // each grouping is on a different server then
        // allocate remaining pods.
        rtrn, pods = startRemainingGroups1(rtrn, pods, podCt)
        return groupRemainingPods1(rtrn, pods)
    }
}

func groupPods(pods []*rwPod) [][]*rwPod {
    var rtrn [][]*rwPod
    var pd []*podTrack

    // Tracking of the grouping process
    for k, _ := range pods {
        pd = append(pd, &podTrack{pods[k], false})
    }

    rtrn, pd = groupIntersectingPods(pd)
    // There are several outcomes here
    // 1) All pods have been paired and we're done
    // 2) Some un-allocated pods remain
    // 2.a) All groups have been started
    // 2.b) Not all groups have been started
    if unallocPodCount(pd) == 0 {
        return rtrn
    } else if len(rtrn) == len(pd)>>1 { // All groupings started
        // Allocate the remaining (presumably empty) pods to the started groups
        return groupRemainingPods(rtrn, pd)
    } else { // Some groupings started
        // Start empty groups with remaining pods
        // each grouping is on a different server then
        // allocate remaining pods.
        rtrn, pd = startRemainingGroups(rtrn, pd)
        return groupRemainingPods(rtrn, pd)
    }

    // NOTE: Everything below is unreachable; all branches above return.
    // Establish groupings of non-empty pods that have overlapping devices.
    for k, _ := range pd {
        if pd[k].dn == true { // Already processed?
            //log.Debugf("%s already processed", pd[k].pod.name)
            continue
        }
        if len(pd[k].pod.devIds) == 0 { // Ignore pods with no devices
            ////log.Debugf("%s empty pod", pd[k].pod.name)
            continue
        }
        // Start a pod group with this pod
        var grp []*rwPod
        grp = append(grp, pd[k].pod)
        pd[k].dn = true
        //log.Debugf("Creating new group %s", pd[k].pod.name)
        // Find the peer pod based on device overlap
        // It's ok if one isn't found, an empty one will be used instead
        for k1, _ := range pd {
            if pd[k1].dn == true { // Skip over eliminated pods
                //log.Debugf("%s eliminated pod", pd[k1].pod.name)
                continue
            }
            if len(pd[k1].pod.devIds) == 0 { // Skip pods with no devices
                //log.Debugf("%s empty pod", pd[k1].pod.name)
                continue
            }
            if intersect(pd[k].pod.devIds, pd[k1].pod.devIds) == true {
                //log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
                pd[k1].dn = true
                grp = append(grp, pd[k1].pod)
                break
            }
        }
        rtrn = append(rtrn, grp)
        //log.Debugf("Added group %s", grp[0].name)
    }
    // Now find any grouping without 2 members and assign one of the
    // pods with no devices and on a different server to it.
    // If there are no pods with no devices left before all
    // groups are filled report an exception but leave one of the
    // groups with only one pod.
    for k, _ := range rtrn {
        if len(rtrn[k]) < 2 {
            for k2, _ := range pd {
                if pd[k2].dn == true {
                    continue
                }
                // There should be only empty pods here
                if len(pd[k2].pod.devIds) != 0 {
                    log.Error("Non empty pod found where empty pod was expected")
                    continue
                }
                if pd[k2].pod.node == rtrn[k][0].node {
                    //log.Error("Pods aren't on different servers, continuing")
                    continue
                }
                // Add this empty and unused pod to the group
                //log.Debugf("Adding empty pod %s", pd[k2].pod.name)
                rtrn[k] = append(rtrn[k], pd[k2].pod)
                pd[k2].dn = true
                break
            }
        }
    }
    return rtrn
}

func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
    for k, _ := range d1 {
        if _, ok := d2[k]; ok == true {
            return true
        }
    }
    return false
}

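// setConnection tells the affinity router which address and port to use for
// the given backend/connection pair.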
func setConnection(client pb.ConfigurationClient, backend string, connection string, addr string, port uint64) {
    log.Debugf("Configuring backend %s : connection %s\n\n", backend, connection)
    cnf := &pb.Conn{Server: "grpc_command", Cluster: "vcore", Backend: backend,
        Connection: connection, Addr: addr,
        Port: port}
    if res, err := client.SetConnection(context.Background(), cnf); err != nil {
        log.Debugf("failed SetConnection RPC call: %s", err)
    } else {
        log.Debugf("Result: %v", res)
    }
}

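// setAffinity binds each of the provided device ids to the given backend in
// the affinity router, one SetAffinity RPC per id.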
func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
    log.Debugf("Configuring backend %s : affinities \n", backend)
    aff := &pb.Affinity{Router: "vcore", Route: "dev_manager", Cluster: "vcore", Backend: backend}
    for k, _ := range ids {
        log.Debugf("Setting affinity for id %s", k)
        aff.Id = k
        if res, err := client.SetAffinity(context.Background(), aff); err != nil {
            log.Debugf("failed affinity RPC call: %s", err)
        } else {
            log.Debugf("Result: %v", res)
        }
    }
}

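// getBackendForCore returns the backend assigned to the named core, or an
// empty string if the core isn't found in any group.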
func getBackendForCore(coreId string, coreGroups [][]*rwPod) string {
    for _, v := range coreGroups {
        for _, v2 := range v {
            if v2.name == coreId {
                return v2.backend
            }
        }
    }
    log.Errorf("No backend found for core %s\n", coreId)
    return ""
}

// NOTE: this select handles a single discovery notification and then returns.
func monitorDiscovery(client pb.ConfigurationClient,
    ch <-chan *ic.InterContainerMessage,
    coreGroups [][]*rwPod) {
    var id map[string]struct{} = make(map[string]struct{})

    select {
    case msg := <-ch:
        log.Debugf("Received a device discovery notification")
        device := &ic.DeviceDiscovered{}
        if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
            log.Errorf("Could not unmarshal received notification %v", msg)
        } else {
            // Set the affinity of the discovered device.
            if be := getBackendForCore(device.Id, coreGroups); be != "" {
                id[device.Id] = struct{}{}
                setAffinity(client, id, be)
            } else {
                log.Error("Can't use an empty string as a backend name")
            }
        }
        break
    }
}

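// startDiscoveryMonitor subscribes to the 'AffinityRouter' kafka topic and
// starts a goroutine to process device discovery notifications.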
func startDiscoveryMonitor(client pb.ConfigurationClient,
    coreGroups [][]*rwPod) error {
    var ch <-chan *ic.InterContainerMessage
    // Connect to kafka for discovery events
    topic := &kafka.Topic{Name: "AffinityRouter"}
    kc, err := newKafkaClient("sarama", "kafka", 9092, "arouterd")
    if err != nil {
        // Previously this error was ignored, which risked a nil dereference below.
        log.Error("Could not create a kafka client, discovery disabled")
        return err
    }
    kc.Start()

    if ch, err = kc.Subscribe(topic); err != nil {
        log.Error("Could not subscribe to the 'AffinityRouter' channel, discovery disabled")
        return err
    }
    go monitorDiscovery(client, ch, coreGroups)
    return nil
}

// deepCopyCoreGroups is currently an unimplemented stub; it returns an empty
// slice and is not called anywhere in this file.
func deepCopyCoreGroups(coreGroups [][]*rwPod) [][]*rwPod {
    var rtrn [][]*rwPod
    return rtrn
}

// Determines which items in core groups
// have changed based on the list provided
// and returns a coreGroup with only the changed
// items and a pod list with the new items
func getAddrDiffs(coreGroups [][]*rwPod, rwPods []*rwPod) ([][]*rwPod, []*rwPod) {
    var nList []*rwPod
    var rtrn [][]*rwPod = make([][]*rwPod, nPods>>1)
    var ipAddrs map[string]struct{} = make(map[string]struct{})

    log.Debug("Get addr diffs")

    // Start with an empty array
    for k, _ := range rtrn {
        rtrn[k] = make([]*rwPod, 2)
    }

    // Build a list with only the new items
    for _, v := range rwPods {
        if hasIpAddr(coreGroups, v.ipAddr) == false {
            nList = append(nList, v)
        }
        ipAddrs[v.ipAddr] = struct{}{} // for the search below
    }

    // Now build the coreGroups with only the changed items
    for k1, v1 := range coreGroups {
        for k2, v2 := range v1 {
            if _, ok := ipAddrs[v2.ipAddr]; ok == false {
                rtrn[k1][k2] = v2
            }
        }
    }
    return rtrn, nList
}

// Figure out where best to put the new pods
// in the coreGroup array based on the old
// pods being replaced. The criterion is that
// the new pod be on the same server as the
// old pod was.
func reconcileAddrDiffs(coreGroupDiffs [][]*rwPod, rwPodDiffs []*rwPod) [][]*rwPod {
    var srvrs map[string][]*rwPod = make(map[string][]*rwPod)

    log.Debug("Reconciling diffs")
    log.Debug("Building server list")
    for _, v := range rwPodDiffs {
        log.Debugf("Adding %v to the server list", *v)
        srvrs[v.node] = append(srvrs[v.node], v)
    }

    for k1, v1 := range coreGroupDiffs {
        log.Debugf("k1:%v, v1:%v", k1, v1)
        for k2, v2 := range v1 {
            log.Debugf("k2:%v, v2:%v", k2, v2)
            if v2 == nil { // Nothing to do here
                continue
            }
            if _, ok := srvrs[v2.node]; ok == true {
                coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
                if len(srvrs[v2.node]) > 1 { // remove one entry from the list
                    srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
                } else { // Delete the entry from the map
                    delete(srvrs, v2.node)
                }
            } else {
                log.Error("This should never happen, node appears to have changed names")
                // attempt to limp along by keeping this old entry
            }
        }
    }

    return coreGroupDiffs
}

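// applyAddrDiffs replaces re-started cores in coreGroups with their
// replacement pods, reconciles device ids pulled from the surviving
// active-active peer, and pushes the new connection information to the
// affinity router.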
func applyAddrDiffs(client pb.ConfigurationClient, coreGroups [][]*rwPod, rwPods []*rwPod) {
    var newEntries [][]*rwPod

    log.Debug("Applying diffs")
    newEntries = reconcileAddrDiffs(getAddrDiffs(coreGroups, rwPods))

    // Now replace the information in coreGroups with the new
    // entries and then reconcile the device ids on the core
    // that's in the new entry with the device ids of its
    // active-active peer.
    for k1, v1 := range coreGroups {
        for k2, v2 := range v1 {
            if newEntries[k1][k2] != nil {
                // TODO: Missing is the case where both the primary
                // and the secondary core crash and come back.
                // Pull the device ids from the active-active peer
                ids := queryPodDeviceIds(coreGroups[k1][k2^1])
                if len(ids) != 0 {
                    if reconcilePodDeviceIds(newEntries[k1][k2], ids) == false {
                        log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
                    }
                }
                // Send the affinity router the new connection information
                setConnection(client, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, 50057)
                // Copy the new entry information over
                coreGroups[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
                coreGroups[k1][k2].name = newEntries[k1][k2].name
                coreGroups[k1][k2].devIds = ids
            }
        }
    }
}

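// startCoreMonitor polls kubernetes every 10 seconds and, whenever a core
// pod's IP address is no longer present in the current groupings, applies the
// address differences to the router configuration. It never returns under
// normal operation.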
func startCoreMonitor(client pb.ConfigurationClient,
    clientset *kubernetes.Clientset,
    coreFltr *regexp.Regexp,
    coreGroups [][]*rwPod) error {
    // Now that initial allocation has been completed, monitor the pods
    // for IP changes
    // The main loop needs to do the following:
    // 1) Periodically query the pods and filter out
    //    the vcore ones
    // 2) Validate that the pods running are the same
    //    as the previous check
    // 3) Validate that the IP addresses are the same
    //    as the last check.
    // If the pod name(s) have changed then remove
    // the unused pod names and add in the new pod names
    // maintaining the cluster/backend information.
    // If an IP address has changed (which shouldn't
    // happen unless a pod is re-started) it should get
    // caught by the pod name change.
    for {
        time.Sleep(10 * time.Second) // Wait a while
        // Get the rw core list from k8s
        rwPods := getRwPods(clientset, coreFltr)
        queryDeviceIds(rwPods)
        // If we didn't get 2n+1 pods then wait since
        // something is down and will hopefully come
        // back up at some point.
        // TODO: remove the 6 pod hardcoding
        if len(rwPods) != 6 {
            continue
        }
        // We have all the pods, check if any IP addresses
        // have changed.
        for _, v := range rwPods {
            if hasIpAddr(coreGroups, v.ipAddr) == false {
                log.Debug("Address has changed...")
                applyAddrDiffs(client, coreGroups, rwPods)
            }
        }
    }
}

func hasIpAddr(coreGroups [][]*rwPod, ipAddr string) bool {
    for _, v1 := range coreGroups {
        for _, v2 := range v1 {
            if v2.ipAddr == ipAddr {
                return true
            }
        }
    }
    return false
}

func main() {
    // This is currently hard coded to a cluster with 3 servers
    //var connections map[string]configConn = make(map[string]configConn)
    //var rwCorePodsPrev map[string]rwPod = make(map[string]rwPod)
    var rwCoreNodesPrev map[string][]rwPod = make(map[string][]rwPod)
    var firstTime bool = true
    var err error
    var conn *grpc.ClientConn

    // Set up the regular expression to identify the voltha cores
    coreFltr := regexp.MustCompile(`rw-core[0-9]-`)

    // Set up logging
    if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
        log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
    }

    // Set up kubernetes api
    clientset := k8sClientSet()

    // Connect to the affinity router and set up the client
    conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
    if err != nil {
        panic(err.Error())
    }
    client := pb.NewConfigurationClient(conn)

    // Get the voltha rw-core pods
    rwPods := getRwPods(clientset, coreFltr)

    // Fetch the devices held by each running core
    queryDeviceIds(rwPods)

    // For debugging... comment out later
    for _, v := range rwPods {
        log.Debugf("Pod list %v", *v)
    }

    coreGroups := groupPods1(rwPods)

    // Assign the groupings to the backends and connections
    for k, _ := range coreGroups {
        for k1, _ := range coreGroups[k] {
            coreGroups[k][k1].backend = "vcore" + strconv.Itoa(k+1)
            coreGroups[k][k1].connection = "vcore" + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
        }
    }
    log.Debug("Core grouping completed")

    // TODO: Debugging code, comment out for production
    for k, v := range coreGroups {
        for k2, v2 := range v {
            log.Debugf("Core group %d,%d: %v", k, k2, v2)
        }
    }
    log.Debug("Setting affinities")
    // Now set the affinities for existing devices in the cores
    for _, v := range coreGroups {
        setAffinity(client, v[0].devIds, v[0].backend)
        setAffinity(client, v[1].devIds, v[1].backend)
    }
    log.Debug("Setting connections")
    // Configure the backends based on the calculated core groups
    for _, v := range coreGroups {
        setConnection(client, v[0].backend, v[0].connection, v[0].ipAddr, 50057)
        setConnection(client, v[1].backend, v[1].connection, v[1].ipAddr, 50057)
    }

    log.Debug("Starting discovery monitoring")
    startDiscoveryMonitor(client, coreGroups)

    log.Debugf("Starting core monitoring")
    startCoreMonitor(client, clientset, coreFltr, coreGroups) // Never returns
    return

    // NOTE: Everything below the return above is unreachable legacy code,
    // retained for reference only.
    //
    // The main loop needs to do the following:
    // 1) Periodically query the pods and filter out
    //    the vcore ones
    // 2) Validate that the pods running are the same
    //    as the previous check
    // 3) Validate that the IP addresses are the same
    //    as the last check.
    // If the pod name(s) have changed then remove
    // the unused pod names and add in the new pod names
    // maintaining the cluster/backend information.
    // If an IP address has changed (which shouldn't
    // happen unless a pod is re-started) it should get
    // caught by the pod name change.
    for {
        var rwCorePods map[string]rwPod = make(map[string]rwPod)
        var rwCoreNodes map[string][]rwPod = make(map[string][]rwPod)
        pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
        if err != nil {
            panic(err.Error())
        }
        log.Debugf("There are %d pods in the cluster\n", len(pods.Items))

        /*
            for k, v := range pods.Items {
                if v.Namespace == "voltha" && coreFltr.MatchString(v.Name) {
                    fmt.Printf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
                        v.Status.PodIP, v.Spec.NodeName)
                    //fmt.Printf("Pod %v,%v\n\n\n",k,v)
                    _ = k
                    // Add this pod to the core structure.
                    if firstTime == true {
                        rwCorePodsPrev[v.Name] = rwPod{name: v.Name, node: v.Spec.NodeName}
                        rwCoreNodesPrev[v.Spec.NodeName] =
                            append(rwCoreNodesPrev[v.Spec.NodeName], rwPod{name: v.Name, node: v.Spec.NodeName})
                    }
                    rwCorePods[v.Name] = rwPod{v.Name, v.Status.PodIP, v.Spec.NodeName, "", ""}
                    rwCoreNodes[v.Spec.NodeName] =
                        append(rwCoreNodes[v.Spec.NodeName], rwPod{v.Name, v.Status.PodIP, v.Spec.NodeName, "", ""})
                }
            }
        */

        if len(rwCorePods) != 6 {
            continue
        }

        //fmt.Printf("Pod map: %v\n", rwCorePods)
        //fmt.Printf("Pod map2: %v\n", rwCoreNodes)

        // Examples for error handling:
        // - Use helper functions like e.g. errors.IsNotFound()
        // - And/or cast to StatusError and use its properties like e.g. ErrStatus.Message
        /*
            _, err = clientset.CoreV1().Pods("default").Get("example-xxxxx", metav1.GetOptions{})
            if errors.IsNotFound(err) {
                fmt.Printf("Pod not found\n")
            } else if statusError, isStatus := err.(*errors.StatusError); isStatus {
                fmt.Printf("Error getting pod %v\n", statusError.ErrStatus.Message)
            } else if err != nil {
                panic(err.Error())
            } else {
                fmt.Printf("Found pod\n")
            }
        */
        // Set the association to backends and connections only once.
        // TODO: This needs to be reworked for when a pod crashes
        // and its name changes.
        if firstTime == true {
            be := 1
            for k, _ := range rwCoreNodesPrev { // Each node has 2 cores running on it
                // Use a pretty dumb distribution algorithm.
                log.Debugf("Processing core node %s:%d\n", k, be)
                rwCoreNodesPrev[k][0].backend = "vcore" + strconv.Itoa(be)
                rwCoreNodesPrev[k][0].connection = "vcore" + strconv.Itoa(be) + strconv.Itoa(1)
                rwCoreNodesPrev[k][1].backend = "vcore" + strconv.Itoa(be%3+1)
                rwCoreNodesPrev[k][1].connection = "vcore" + strconv.Itoa(be%3+1) + strconv.Itoa(2)
                be++
            }
        }

        log.Debugf("Backend Allocation: %v", rwCoreNodesPrev)
        // Compare the current node IPs with the previous node IPs and if they differ
        // then set the new one and send the command to configure the router with the
        // new backend connection.
        for k, v := range rwCoreNodesPrev {
            if rwCoreNodes[k][0].ipAddr != rwCoreNodesPrev[k][0].ipAddr {
                log.Debugf("Configuring backend %s : connection %s\n\n", v[0].backend, v[0].connection)
                cnf := &pb.Conn{Server: "grpc_command", Cluster: "vcore", Backend: rwCoreNodesPrev[k][0].backend,
                    Connection: rwCoreNodesPrev[k][0].connection, Addr: rwCoreNodes[k][0].ipAddr,
                    Port: 50057}
                if res, err := client.SetConnection(context.Background(), cnf); err != nil {
                    log.Debugf("failed SetConnection RPC call: %s", err)
                } else {
                    log.Debugf("Result: %v", res)
                    rwCoreNodesPrev[k][0].ipAddr = rwCoreNodes[k][0].ipAddr
                }
            }
            if rwCoreNodes[k][1].ipAddr != rwCoreNodesPrev[k][1].ipAddr {
                log.Debugf("Configuring backend %s : connection %s\n\n", v[1].backend, v[1].connection)
                cnf := &pb.Conn{Server: "grpc_command", Cluster: "vcore", Backend: rwCoreNodesPrev[k][1].backend,
                    Connection: rwCoreNodesPrev[k][1].connection, Addr: rwCoreNodes[k][1].ipAddr,
                    Port: 50057}
                if res, err := client.SetConnection(context.Background(), cnf); err != nil {
                    log.Debugf("failed SetConnection RPC call: %s", err)
                } else {
                    log.Debugf("Result: %v", res)
                    rwCoreNodesPrev[k][1].ipAddr = rwCoreNodes[k][1].ipAddr
                }
            }
        }

        fmt.Printf("The structure for setting the connections is: %v\n", rwCoreNodesPrev)
        firstTime = false

        // Now make the API calls
        time.Sleep(10 * time.Second)
    }
    conn.Close()
}