/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
	"errors"
	"regexp"
	"strconv"
	"time"

	"github.com/golang/protobuf/ptypes"
	empty "github.com/golang/protobuf/ptypes/empty"
	"github.com/opencord/voltha-go/common/log"
	kafka "github.com/opencord/voltha-go/kafka"
	pb "github.com/opencord/voltha-go/protos/afrouter"
	cmn "github.com/opencord/voltha-go/protos/common"
	ic "github.com/opencord/voltha-go/protos/inter_container"
	vpb "github.com/opencord/voltha-go/protos/voltha"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

type configConn struct {
	Server      string `json:"Server"`
	Cluster     string `json:"Cluster"`
	Backend     string `json:"Backend"`
	connections map[string]connection
}

type connection struct {
	Name string `json:"Connection"`
	Addr string `json:"Addr"`
	Port uint64 `json:"Port"`
}

type rwPod struct {
	name       string
	ipAddr     string
	node       string
	devIds     map[string]struct{}
	backend    string
	connection string
}

type podTrack struct {
	pod *rwPod
	dn  bool
}

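// Expected number of voltha rw-core pods: three active-active pairs,
// matching the 3-server cluster this daemon is currently hardcoded for
// (see the TODO in startCoreMonitor about discovering this instead).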
var nPods = 6

// Topic is affinityRouter
// port: 9092

func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
	log.Infow("kafka-client-type", log.Fields{"client": clientType})
	switch clientType {
	case "sarama":
		return kafka.NewSaramaClient(
			kafka.Host(host),
			kafka.Port(port),
			kafka.ConsumerType(kafka.GroupCustomer),
			kafka.ProducerReturnOnErrors(true),
			kafka.ProducerReturnOnSuccess(true),
			kafka.ProducerMaxRetries(6),
			kafka.NumPartitions(3),
			kafka.ConsumerGroupName(instanceID),
			kafka.ConsumerGroupPrefix(instanceID),
			kafka.AutoCreateTopic(false),
			kafka.ProducerFlushFrequency(5),
			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
	}
	return nil, errors.New("unsupported-client-type")
}

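// A minimal usage sketch for the client above (it mirrors the calls made in
// startDiscoveryMonitor below; the error handling here is illustrative only):
//
//	kc, err := newKafkaClient("sarama", "kafka", 9092, "arouterd")
//	if err != nil {
//		panic(err)
//	}
//	kc.Start()
//	ch, err := kc.Subscribe(&kafka.Topic{Name: "AffinityRouter"})
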
func k8sClientSet() *kubernetes.Clientset {
	// Create the in-cluster config
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err.Error())
	}
	// Create the clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	return clientset
}

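// connect dials addr, retrying up to 100 times at 10 second intervals.
// Note that grpc.Dial without grpc.WithBlock() returns immediately and
// only errors on malformed targets, so in practice the retry loop is a
// safety net rather than the common path.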
111func connect(addr string) (*grpc.ClientConn, error) {
112 for ctr :=0 ; ctr < 100; ctr++ {
sslobodre7ce71d2019-01-22 16:21:45 -0500113 log.Debugf("Trying to connect to %s", addr)
sslobodr16e41bc2019-01-18 16:22:21 -0500114 conn, err := grpc.Dial(addr, grpc.WithInsecure())
115 if err != nil {
116 log.Debugf("Attempt to connect failed, retrying %v:", err)
117 } else {
118 log.Debugf("Connection succeeded")
119 return conn,err
120 }
121 time.Sleep(10 * time.Second)
122 }
123 log.Debugf("Too many connection attempts, giving up!")
124 return nil,errors.New("Timeout attempting to conect")
125}
126
func getRwPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) []*rwPod {
	var rtrn []*rwPod

	pods, err := cs.CoreV1().Pods("").List(metav1.ListOptions{})
	if err != nil {
		panic(err.Error())
	}
	//log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))

	for _, v := range pods.Items {
		if v.Namespace == "voltha" && coreFilter.MatchString(v.Name) {
			log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
				v.Status.PodIP, v.Spec.NodeName)
			// Only add the pod if it has an IP address. If it doesn't then
			// it likely crashed and is still in the process of being restarted.
			if v.Status.PodIP != "" {
				rtrn = append(rtrn, &rwPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName,
					devIds: make(map[string]struct{}), backend: "", connection: ""})
			}
		}
	}
	return rtrn
}

func reconcilePodDeviceIds(pod *rwPod, ids map[string]struct{}) bool {
	var idList cmn.IDs
	for k := range ids {
		idList.Items = append(idList.Items, &cmn.ID{Id: k})
	}
	conn, err := connect(pod.ipAddr + ":50057")
	if err != nil {
		log.Debugf("Could not query devices from %s, could not connect", pod.name)
		return false
	}
	// Only defer the close once the connection is known to be valid
	defer conn.Close()
	client := vpb.NewVolthaServiceClient(conn)
	_, err = client.ReconcileDevices(context.Background(), &idList)
	if err != nil {
		log.Error(err)
		return false
	}

	return true
}

func queryPodDeviceIds(pod *rwPod) map[string]struct{} {
	rtrn := make(map[string]struct{})
	// Open a connection to the pod's gRPC server on port 50057
	conn, err := connect(pod.ipAddr + ":50057")
	if err != nil {
		log.Debugf("Could not query devices from %s, could not connect", pod.name)
		return rtrn
	}
	defer conn.Close()
	client := vpb.NewVolthaServiceClient(conn)
	devs, err := client.ListDeviceIds(context.Background(), &empty.Empty{})
	if err != nil {
		log.Error(err)
		return rtrn
	}
	for _, dv := range devs.Items {
		rtrn[dv.Id] = struct{}{}
	}

	return rtrn
}

func queryDeviceIds(pods []*rwPod) {
	for pk := range pods {
		// Keep the old Id list if a new list is not returned
		if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
			pods[pk].devIds = idList
		}
	}
}

func allEmpty(pods []*rwPod) bool {
	for k := range pods {
		if len(pods[k].devIds) != 0 {
			return false
		}
	}
	return true
}

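// rmPod removes the pod at index idx by shifting the tail of the slice
// left one position. This mutates the backing array of the slice passed
// in, so callers must always use the returned slice, as the call sites
// here do.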
func rmPod(pods []*rwPod, idx int) []*rwPod {
	return append(pods[:idx], pods[idx+1:]...)
}

func groupIntersectingPods1(pods []*rwPod, podCt int) ([][]*rwPod, []*rwPod) {
	var rtrn [][]*rwPod
	var out []*rwPod

	for {
		if len(pods) == 0 {
			break
		}
		if len(pods[0].devIds) == 0 { // Ignore pods with no devices
			//log.Debugf("%s empty pod", pods[0].name)
			out = append(out, pods[0])
			pods = rmPod(pods, 0)
			continue
		}
		// Start a pod group with this pod
		var grp []*rwPod
		grp = append(grp, pods[0])
		pods = rmPod(pods, 0)
		//log.Debugf("Creating new group %s", grp[0].name)
		// Find the peer pod based on device overlap
		// It's ok if one isn't found, an empty one will be used instead
		for k := range pods {
			if len(pods[k].devIds) == 0 { // Skip pods with no devices
				//log.Debugf("%s empty pod", pods[k].name)
				continue
			}
			if intersect(grp[0].devIds, pods[k].devIds) {
				//log.Debugf("intersection found %s:%s", grp[0].name, pods[k].name)
				if grp[0].node == pods[k].node {
					// This should never happen
					log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
						grp[0].name, pods[k].name)
					continue
				}
				grp = append(grp, pods[k])
				pods = rmPod(pods, k)
				break
			}
		}
		rtrn = append(rtrn, grp)
		//log.Debugf("Added group %s", grp[0].name)
		// Check if the number of groups = half the pods, if so all groups are started.
		if len(rtrn) == podCt>>1 {
			// Append any remaining pods to out
			out = append(out, pods[0:]...)
			break
		}
	}
	return rtrn, out
}

func unallocPodCount(pd []*podTrack) int {
	rtrn := 0
	for _, v := range pd {
		if !v.dn {
			rtrn++
		}
	}
	return rtrn
}

func sameNode(pod *rwPod, grps [][]*rwPod) bool {
	for _, v := range grps {
		if v[0].node == pod.node {
			return true
		}
		if len(v) == 2 && v[1].node == pod.node {
			return true
		}
	}
	return false
}

func startRemainingGroups1(grps [][]*rwPod, pods []*rwPod, podCt int) ([][]*rwPod, []*rwPod) {
	// Use an explicit index rather than a range so that removing the
	// current pod from the slice doesn't skip the element that slides
	// into its place.
	for k := 0; k < len(pods); {
		if sameNode(pods[k], grps) {
			k++
			continue
		}
		grps = append(grps, []*rwPod{pods[k]})
		pods = rmPod(pods, k)
		if len(grps) == podCt>>1 {
			break
		}
	}
	return grps, pods
}

func hasSingleSecondNode(grp []*rwPod) bool {
	srvrs := make(map[string]struct{})
	for k := range grp {
		if k == 0 {
			continue // Ignore the first item
		}
		srvrs[grp[k].node] = struct{}{}
	}
	return len(srvrs) == 1
}

func addNode(grps [][]*rwPod, idx *rwPod, item *rwPod) [][]*rwPod {
	for k := range grps {
		if grps[k][0].name == idx.name {
			grps[k] = append(grps[k], item)
			return grps
		}
	}
	// TODO: Error checking required here.
	return grps
}

func removeNode(grps [][]*rwPod, item *rwPod) [][]*rwPod {
	for k := range grps {
		for k1 := range grps[k] {
			if grps[k][k1].name == item.name {
				grps[k] = append(grps[k][:k1], grps[k][k1+1:]...)
				break
			}
		}
	}
	return grps
}

func groupRemainingPods1(grps [][]*rwPod, pods []*rwPod) [][]*rwPod {
	var lgrps [][]*rwPod
	// All groups must be started when this function is called.
	// Copy incomplete groups
	for k := range grps {
		if len(grps[k]) != 2 {
			lgrps = append(lgrps, grps[k])
		}
	}

	// Add all pairing candidates to each started group.
	for k := range pods {
		for k2 := range lgrps {
			if lgrps[k2][0].node != pods[k].node {
				lgrps[k2] = append(lgrps[k2], pods[k])
			}
		}
	}

	//TODO: If any member of lgrps doesn't have at least 2
	// nodes something is wrong. Check for that here

	for {
		for { // Address groups with only a single server choice
			ssn := false

			for k := range lgrps {
				// If any of the groups only have a single node as
				// the choice for the second member, address that
				// one first.
				if hasSingleSecondNode(lgrps[k]) {
					ssn = true
					// Add this pairing to the groups
					grps = addNode(grps, lgrps[k][0], lgrps[k][1])
					// Since this node is now used, remove it from all
					// remaining tentative groups
					lgrps = removeNode(lgrps, lgrps[k][1])
					// Now remove this group completely since
					// it's been addressed
					lgrps = append(lgrps[:k], lgrps[k+1:]...)
					break
				}
			}
			if !ssn {
				break
			}
		}
		// Now address one of the remaining groups
		if len(lgrps) == 0 {
			break // Nothing left to do, exit the loop
		}
		grps = addNode(grps, lgrps[0][0], lgrps[0][1])
		lgrps = removeNode(lgrps, lgrps[0][1])
		lgrps = append(lgrps[:0], lgrps[1:]...)
	}
	return grps
}

func groupPods1(pods []*rwPod) [][]*rwPod {
	var rtrn [][]*rwPod
	podCt := len(pods)

	rtrn, pods = groupIntersectingPods1(pods, podCt)
	// There are several outcomes here
	// 1) All pods have been paired and we're done
	// 2) Some un-allocated pods remain
	// 2.a) All groups have been started
	// 2.b) Not all groups have been started
	if len(pods) == 0 {
		return rtrn
	} else if len(rtrn) == podCt>>1 { // All groupings started
		// Allocate the remaining (presumably empty) pods to the started groups
		return groupRemainingPods1(rtrn, pods)
	} else { // Some groupings started
		// Start empty groups with the remaining pods, with each new
		// grouping on a different server, then allocate the rest.
		rtrn, pods = startRemainingGroups1(rtrn, pods, podCt)
		return groupRemainingPods1(rtrn, pods)
	}
}

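// A worked example of the grouping above (hypothetical pod names): with six
// rw-cores where core1/core4 and core2/core5 share device ids and sit on
// different nodes, groupIntersectingPods1 pairs them into two groups and
// leaves the device-less core3 and core6 unallocated. Since only 2 of 3
// groups are started, startRemainingGroups1 opens a group with core3
// (assuming its node doesn't already lead a group) and groupRemainingPods1
// then pairs core6 with whichever incomplete group sits on a different
// node, yielding three two-pod groups, each spanning two servers.
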
func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
	for k := range d1 {
		if _, ok := d2[k]; ok {
			return true
		}
	}
	return false
}

func setConnection(client pb.ConfigurationClient, backend string, connection string, addr string, port uint64) {
	log.Debugf("Configuring backend %s : connection %s", backend, connection)
	cnf := &pb.Conn{Server: "grpc_command", Cluster: "vcore", Backend: backend,
		Connection: connection, Addr: addr,
		Port: port}
	if res, err := client.SetConnection(context.Background(), cnf); err != nil {
		log.Debugf("failed SetConnection RPC call: %s", err)
	} else {
		log.Debugf("Result: %v", res)
	}
}

func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
	log.Debugf("Configuring backend %s : affinities", backend)
	aff := &pb.Affinity{Router: "vcore", Route: "dev_manager", Cluster: "vcore", Backend: backend}
	for k := range ids {
		log.Debugf("Setting affinity for id %s", k)
		aff.Id = k
		if res, err := client.SetAffinity(context.Background(), aff); err != nil {
			log.Debugf("failed affinity RPC call: %s", err)
		} else {
			log.Debugf("Result: %v", res)
		}
	}
}

func getBackendForCore(coreId string, coreGroups [][]*rwPod) string {
	for _, v := range coreGroups {
		for _, v2 := range v {
			if v2.name == coreId {
				return v2.backend
			}
		}
	}
	log.Errorf("No backend found for core %s\n", coreId)
	return ""
}

func monitorDiscovery(client pb.ConfigurationClient,
	ch <-chan *ic.InterContainerMessage,
	coreGroups [][]*rwPod) {
	// Process discovery events for the life of the daemon rather than
	// returning after a single message.
	for msg := range ch {
		log.Debugf("Received a device discovery notification")
		device := &ic.DeviceDiscovered{}
		if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
			log.Errorf("Could not unmarshal received notification %v", msg)
		} else {
			// Set the affinity of the discovered device.
			if be := getBackendForCore(device.Id, coreGroups); be != "" {
				setAffinity(client, map[string]struct{}{device.Id: {}}, be)
			} else {
				log.Error("Can't use an empty string as a backend name")
			}
		}
	}
}

func startDiscoveryMonitor(client pb.ConfigurationClient,
	coreGroups [][]*rwPod) error {
	var ch <-chan *ic.InterContainerMessage
	// Connect to kafka for discovery events
	topic := &kafka.Topic{Name: "AffinityRouter"}
	kc, err := newKafkaClient("sarama", "kafka", 9092, "arouterd")
	if err != nil {
		log.Error("Could not create a kafka client, discovery disabled")
		return err
	}
	kc.Start()

	if ch, err = kc.Subscribe(topic); err != nil {
		log.Error("Could not subscribe to the 'AffinityRouter' channel, discovery disabled")
		return err
	}
	go monitorDiscovery(client, ch, coreGroups)
	return nil
}

// Determine which items in the core groups have changed based on the
// pod list provided. Returns a coreGroup array populated only in the
// changed positions, plus a list of the new pods.
func getAddrDiffs(coreGroups [][]*rwPod, rwPods []*rwPod) ([][]*rwPod, []*rwPod) {
	var nList []*rwPod
	rtrn := make([][]*rwPod, nPods>>1)
	ipAddrs := make(map[string]struct{})

	log.Debug("Get addr diffs")

	// Start with an empty array
	for k := range rtrn {
		rtrn[k] = make([]*rwPod, 2)
	}

	// Build a list with only the new items
	for _, v := range rwPods {
		if !hasIpAddr(coreGroups, v.ipAddr) {
			nList = append(nList, v)
		}
		ipAddrs[v.ipAddr] = struct{}{} // for the search below
	}

	// Now build the coreGroups with only the changed items
	for k1, v1 := range coreGroups {
		for k2, v2 := range v1 {
			if _, ok := ipAddrs[v2.ipAddr]; !ok {
				rtrn[k1][k2] = v2
			}
		}
	}
	return rtrn, nList
}

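// Example (hypothetical pods): if coreGroups is [[A B] [C D]] and the fresh
// pod list is [A B C' D], where C' is a restarted C with a new IP on the
// same node, getAddrDiffs returns the diff array [[nil nil] [C nil]] and
// the new-pod list [C']; reconcileAddrDiffs below then slots C' into C's
// old position because the two share a node.
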
// Figure out where best to put the new pods
// in the coreGroup array based on the old
// pods being replaced. The criterion is that
// the new pod be on the same server as the
// old pod was.
func reconcileAddrDiffs(coreGroupDiffs [][]*rwPod, rwPodDiffs []*rwPod) [][]*rwPod {
	srvrs := make(map[string][]*rwPod)

	log.Debug("Reconciling diffs")
	log.Debug("Building server list")
	for _, v := range rwPodDiffs {
		log.Debugf("Adding %v to the server list", *v)
		srvrs[v.node] = append(srvrs[v.node], v)
	}

	for k1, v1 := range coreGroupDiffs {
		log.Debugf("k1:%v, v1:%v", k1, v1)
		for k2, v2 := range v1 {
			log.Debugf("k2:%v, v2:%v", k2, v2)
			if v2 == nil { // Nothing to do here
				continue
			}
			if _, ok := srvrs[v2.node]; ok {
				coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
				if len(srvrs[v2.node]) > 1 { // remove one entry from the list
					srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
				} else { // Delete the entry from the map
					delete(srvrs, v2.node)
				}
			} else {
				log.Error("This should never happen, node appears to have changed names")
				// attempt to limp along by keeping this old entry
			}
		}
	}

	return coreGroupDiffs
}

func applyAddrDiffs(client pb.ConfigurationClient, coreGroups [][]*rwPod, rwPods []*rwPod) {
	var newEntries [][]*rwPod

	log.Debug("Applying diffs")
	newEntries = reconcileAddrDiffs(getAddrDiffs(coreGroups, rwPods))

	// Now replace the information in coreGroups with the new
	// entries and then reconcile the device ids on the core
	// that's in the new entry with the device ids of its
	// active-active peer.
	for k1, v1 := range coreGroups {
		for k2, v2 := range v1 {
			if newEntries[k1][k2] != nil {
				// TODO: Missing is the case where both the primary
				// and the secondary core crash and come back.
				// Pull the device ids from the active-active peer
				// (k2^1 flips between 0 and 1, selecting the other
				// member of the two-pod group).
				ids := queryPodDeviceIds(coreGroups[k1][k2^1])
				if len(ids) != 0 {
					if !reconcilePodDeviceIds(newEntries[k1][k2], ids) {
						log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
					}
				}
				// Send the affinity router the new connection information
				setConnection(client, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, 50057)
				// Copy the new entry information over
				coreGroups[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
				coreGroups[k1][k2].name = newEntries[k1][k2].name
				coreGroups[k1][k2].devIds = ids
			}
		}
	}
}

func updateDeviceIds(coreGroups [][]*rwPod, rwPods []*rwPod) {
	byName := make(map[string]*rwPod)

	// Convenience lookup by pod name
	for _, v := range rwPods {
		byName[v.name] = v
	}

	for k1, v1 := range coreGroups {
		for k2 := range v1 {
			// Guard against a pod that has disappeared from the list
			if p, ok := byName[v1[k2].name]; ok {
				coreGroups[k1][k2].devIds = p.devIds
			}
		}
	}
}

func startCoreMonitor(client pb.ConfigurationClient,
	clientset *kubernetes.Clientset,
	coreFltr *regexp.Regexp,
	coreGroups [][]*rwPod) error {
	// Now that initial allocation has been completed, monitor the pods
	// for IP changes
	// The main loop needs to do the following:
	// 1) Periodically query the pods and filter out
	//    the vcore ones
	// 2) Validate that the pods running are the same
	//    as the previous check
	// 3) Validate that the IP addresses are the same
	//    as the last check.
	// If any pod names have changed then remove
	// the unused pod names and add in the new pod names
	// maintaining the cluster/backend information.
	// If an IP address has changed (which shouldn't
	// happen unless a pod is re-started) it should get
	// caught by the pod name change.
	for {
		time.Sleep(10 * time.Second) // Wait a while
		// Get the rw core list from k8s
		rwPods := getRwPods(clientset, coreFltr)
		queryDeviceIds(rwPods)
		updateDeviceIds(coreGroups, rwPods)
		// If we didn't get all nPods pods then wait since
		// something is down and will hopefully come
		// back up at some point.
		// TODO: discover the pod count rather than relying
		// on the hardcoded nPods.
		if len(rwPods) != nPods {
			continue
		}
		// We have all pods, check if any IP addresses
		// have changed.
		for _, v := range rwPods {
			if !hasIpAddr(coreGroups, v.ipAddr) {
				log.Debug("Address has changed...")
				applyAddrDiffs(client, coreGroups, rwPods)
				break
			}
		}
	}
}

func hasIpAddr(coreGroups [][]*rwPod, ipAddr string) bool {
	for _, v1 := range coreGroups {
		for _, v2 := range v1 {
			if v2.ipAddr == ipAddr {
				return true
			}
		}
	}
	return false
}

func main() {
	// This is currently hard coded to a cluster with 3 servers
	//var connections map[string]configConn = make(map[string]configConn)
	//var rwCorePodsPrev map[string]rwPod = make(map[string]rwPod)
	var err error
	var conn *grpc.ClientConn

	// Set up the regular expression to identify the voltha cores
	coreFltr := regexp.MustCompile(`rw-core[0-9]-`)

	// Set up logging
	if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
	}

	// Set up kubernetes api
	clientset := k8sClientSet()

	// Connect to the affinity router and set up the client.
	// The router runs as a sidecar container, so communication is over localhost.
	conn, err = connect("localhost:55554")
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()
	client := pb.NewConfigurationClient(conn)

	// Get the voltha rw-core pods
	rwPods := getRwPods(clientset, coreFltr)

	// Fetch the devices held by each running core
	queryDeviceIds(rwPods)

	// For debugging... comment out later
	for _, v := range rwPods {
		log.Debugf("Pod list %v", *v)
	}

	coreGroups := groupPods1(rwPods)

	// Assign the groupings to the backends and connections
	for k := range coreGroups {
		for k1 := range coreGroups[k] {
			coreGroups[k][k1].backend = "vcore" + strconv.Itoa(k+1)
			coreGroups[k][k1].connection = "vcore" + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
		}
	}
	log.Info("Core grouping completed")

	// TODO: Debugging code, comment out for production
	for k, v := range coreGroups {
		for k2, v2 := range v {
			log.Debugf("Core group %d,%d: %v", k, k2, v2)
		}
	}
	log.Info("Setting affinities")
	// Now set the affinities for existing devices in the cores
	for _, v := range coreGroups {
		setAffinity(client, v[0].devIds, v[0].backend)
		setAffinity(client, v[1].devIds, v[1].backend)
	}
	log.Info("Setting connections")
	// Configure the backends based on the calculated core groups
	for _, v := range coreGroups {
		setConnection(client, v[0].backend, v[0].connection, v[0].ipAddr, 50057)
		setConnection(client, v[1].backend, v[1].connection, v[1].ipAddr, 50057)
	}

	log.Info("Starting discovery monitoring")
	if err := startDiscoveryMonitor(client, coreGroups); err != nil {
		log.Error("Discovery monitor failed to start, continuing without it")
	}

	log.Info("Starting core monitoring")
	startCoreMonitor(client, clientset, coreFltr, coreGroups) // Never returns
}