/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
	"errors"
	"regexp"
	"strconv"
	"time"

	"github.com/golang/protobuf/ptypes"
	empty "github.com/golang/protobuf/ptypes/empty"
	"github.com/opencord/voltha-go/common/log"
	kafka "github.com/opencord/voltha-go/kafka"
	pb "github.com/opencord/voltha-go/protos/afrouter"
	cmn "github.com/opencord/voltha-go/protos/common"
	ic "github.com/opencord/voltha-go/protos/inter_container"
	vpb "github.com/opencord/voltha-go/protos/voltha"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

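// configConn and connection model entries of the affinity router's
// JSON-tagged connection configuration: a backend within a cluster served
// by one or more named address/port connections.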
type configConn struct {
	Server      string `json:"Server"`
	Cluster     string `json:"Cluster"`
	Backend     string `json:"Backend"`
	connections map[string]connection
}

type connection struct {
	Name string `json:"Connection"`
	Addr string `json:"Addr"`
	Port uint64 `json:"Port"`
}

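// volthaPod tracks a single voltha core pod: where it runs, the device ids
// it manages, and the cluster/backend/connection it has been assigned.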
type volthaPod struct {
	name       string
	ipAddr     string
	node       string
	devIds     map[string]struct{}
	cluster    string
	backend    string
	connection string
}

type podTrack struct {
	pod *volthaPod
	dn  bool
}

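// nPods is the expected number of voltha rw-core pods (currently
// hard-coded; see the pod-count TODO in startCoreMonitor).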
var nPods int = 6

// Topic is affinityRouter
// port: 9092

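// newKafkaClient builds a kafka client of the requested type; only the
// "sarama" client is currently supported.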
func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {

	log.Infow("kafka-client-type", log.Fields{"client": clientType})
	switch clientType {
	case "sarama":
		return kafka.NewSaramaClient(
			kafka.Host(host),
			kafka.Port(port),
			kafka.ConsumerType(kafka.GroupCustomer),
			kafka.ProducerReturnOnErrors(true),
			kafka.ProducerReturnOnSuccess(true),
			kafka.ProducerMaxRetries(6),
			kafka.NumPartitions(3),
			kafka.ConsumerGroupName(instanceID),
			kafka.ConsumerGroupPrefix(instanceID),
			kafka.AutoCreateTopic(false),
			kafka.ProducerFlushFrequency(5),
			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
	}
	return nil, errors.New("unsupported-client-type")
}
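// k8sClientSet creates a Kubernetes clientset from the in-cluster
// configuration; it panics if either step fails.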
func k8sClientSet() *kubernetes.Clientset {
	// creates the in-cluster config
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err.Error())
	}
	// creates the clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	return clientset
}
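// connect dials the given gRPC address, retrying every 10 seconds and
// giving up after 100 attempts.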
func connect(addr string) (*grpc.ClientConn, error) {
	for ctr := 0; ctr < 100; ctr++ {
		log.Debugf("Trying to connect to %s", addr)
		conn, err := grpc.Dial(addr, grpc.WithInsecure())
		if err != nil {
			log.Debugf("Attempt to connect failed, retrying %v:", err)
		} else {
			log.Debugf("Connection succeeded")
			return conn, err
		}
		time.Sleep(10 * time.Second)
	}
	log.Debugf("Too many connection attempts, giving up!")
	return nil, errors.New("timeout attempting to connect")
}

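// getVolthaPods returns the pods in the "voltha" namespace whose names
// match coreFilter and that have already been assigned an IP address.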
func getVolthaPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) []*volthaPod {
	var rtrn []*volthaPod

	pods, err := cs.CoreV1().Pods("").List(metav1.ListOptions{})
	if err != nil {
		panic(err.Error())
	}
	//log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))

	for _, v := range pods.Items {
		if v.Namespace == "voltha" && coreFilter.MatchString(v.Name) {
			log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
				v.Status.PodIP, v.Spec.NodeName)
			// Only add the pod if it has an IP address. If it doesn't then it likely crashed
			// and is still in the process of getting restarted.
			if v.Status.PodIP != "" {
				rtrn = append(rtrn, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName,
					devIds: make(map[string]struct{}), backend: "", connection: ""})
			}
		}
	}
	return rtrn
}

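// reconcilePodDeviceIds asks the core running in pod to reconcile the given
// set of device ids; it returns false if the pod cannot be reached or the
// RPC fails.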
func reconcilePodDeviceIds(pod *volthaPod, ids map[string]struct{}) bool {
	var idList cmn.IDs
	for k := range ids {
		idList.Items = append(idList.Items, &cmn.ID{Id: k})
	}
	// Check the connection error before deferring the close; a failed dial
	// would otherwise leave conn nil and panic on Close.
	conn, err := connect(pod.ipAddr + ":50057")
	if err != nil {
		log.Debugf("Could not query devices from %s, could not connect", pod.name)
		return false
	}
	defer conn.Close()
	client := vpb.NewVolthaServiceClient(conn)
	_, err = client.ReconcileDevices(context.Background(), &idList)
	if err != nil {
		log.Error(err)
		return false
	}

	return true
}

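// queryPodDeviceIds fetches the set of device ids currently held by the
// core running in pod; an empty map is returned on any failure.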
func queryPodDeviceIds(pod *volthaPod) map[string]struct{} {
	var rtrn map[string]struct{} = make(map[string]struct{})
	// Open a connection to the pod
	// port 50057
	conn, err := connect(pod.ipAddr + ":50057")
	if err != nil {
		log.Debugf("Could not query devices from %s, could not connect", pod.name)
		return rtrn
	}
	defer conn.Close()
	client := vpb.NewVolthaServiceClient(conn)
	devs, err := client.ListDeviceIds(context.Background(), &empty.Empty{})
	if err != nil {
		log.Error(err)
		return rtrn
	}
	for _, dv := range devs.Items {
		rtrn[dv.Id] = struct{}{}
	}

	return rtrn
}

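// queryDeviceIds refreshes the device-id set of each pod, keeping a pod's
// old set whenever the query returns an empty list.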
func queryDeviceIds(pods []*volthaPod) {
	for pk := range pods {
		// Keep the old Id list if a new list is not returned
		if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
			pods[pk].devIds = idList
		}
	}
}

func allEmpty(pods []*volthaPod) bool {
	for k := range pods {
		if len(pods[k].devIds) != 0 {
			return false
		}
	}
	return true
}

func rmPod(pods []*volthaPod, idx int) []*volthaPod {
	return append(pods[:idx], pods[idx+1:]...)
}

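// groupIntersectingPods1 starts pod groups by pairing pods whose device-id
// sets intersect, skipping pods that hold no devices. It returns the
// started groups and the pods that remain unallocated.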
func groupIntersectingPods1(pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
	var rtrn [][]*volthaPod
	var out []*volthaPod

	for {
		if len(pods) == 0 {
			break
		}
		if len(pods[0].devIds) == 0 { // Ignore pods with no devices
			////log.Debugf("%s empty pod", pd[k].pod.name)
			out = append(out, pods[0])
			pods = rmPod(pods, 0)
			continue
		}
		// Start a pod group with this pod
		var grp []*volthaPod
		grp = append(grp, pods[0])
		pods = rmPod(pods, 0)
		//log.Debugf("Creating new group %s", pd[k].pod.name)
		// Find the peer pod based on device overlap
		// It's ok if one isn't found, an empty one will be used instead
		for k := range pods {
			if len(pods[k].devIds) == 0 { // Skip pods with no devices
				//log.Debugf("%s empty pod", pd[k1].pod.name)
				continue
			}
			if intersect(grp[0].devIds, pods[k].devIds) {
				//log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
				if grp[0].node == pods[k].node {
					// This should never happen
					log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
						grp[0].name, pods[k].name)
					continue
				}
				grp = append(grp, pods[k])
				pods = rmPod(pods, k)
				break
			}
		}
		rtrn = append(rtrn, grp)
		//log.Debugf("Added group %s", grp[0].name)
		// Check if the number of groups = half the pods, if so all groups are started.
		if len(rtrn) == podCt>>1 {
			// Append any remaining pods to out
			out = append(out, pods[0:]...)
			break
		}
	}
	return rtrn, out
}

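// unallocPodCount returns the number of tracked pods that have not yet been
// marked as allocated.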
func unallocPodCount(pd []*podTrack) int {
	var rtrn int = 0
	for _, v := range pd {
		if !v.dn {
			rtrn++
		}
	}
	return rtrn
}

func sameNode(pod *volthaPod, grps [][]*volthaPod) bool {
	for _, v := range grps {
		if v[0].node == pod.node {
			return true
		}
		if len(v) == 2 && v[1].node == pod.node {
			return true
		}
	}
	return false
}

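// startRemainingGroups1 starts new single-pod groups from pods that do not
// share a node with any already-started group, stopping once half the pods
// head a group.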
func startRemainingGroups1(grps [][]*volthaPod, pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
	var grp []*volthaPod

	for k := range pods {
		if sameNode(pods[k], grps) {
			continue
		}
		grp = []*volthaPod{}
		grp = append(grp, pods[k])
		pods = rmPod(pods, k)
		grps = append(grps, grp)
		if len(grps) == podCt>>1 {
			break
		}
	}
	return grps, pods
}

func hasSingleSecondNode(grp []*volthaPod) bool {
	var srvrs map[string]struct{} = make(map[string]struct{})
	for k := range grp {
		if k == 0 {
			continue // Ignore the first item
		}
		srvrs[grp[k].node] = struct{}{}
	}
	return len(srvrs) == 1
}

func addNode(grps [][]*volthaPod, idx *volthaPod, item *volthaPod) [][]*volthaPod {
	for k := range grps {
		if grps[k][0].name == idx.name {
			grps[k] = append(grps[k], item)
			return grps
		}
	}
	// TODO: Error checking required here.
	return grps
}

func removeNode(grps [][]*volthaPod, item *volthaPod) [][]*volthaPod {
	for k := range grps {
		for k1 := range grps[k] {
			if grps[k][k1].name == item.name {
				grps[k] = append(grps[k][:k1], grps[k][k1+1:]...)
				break
			}
		}
	}
	return grps
}

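// groupRemainingPods1 completes the started groups using the leftover
// (deviceless) pods, handling groups whose only possible second node is
// unique before the rest.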
func groupRemainingPods1(grps [][]*volthaPod, pods []*volthaPod) [][]*volthaPod {
	var lgrps [][]*volthaPod
	// All groups must be started when this function is called.
	// Copy incomplete groups
	for k := range grps {
		if len(grps[k]) != 2 {
			lgrps = append(lgrps, grps[k])
		}
	}

	// Add all pairing candidates to each started group.
	for k := range pods {
		for k2 := range lgrps {
			if lgrps[k2][0].node != pods[k].node {
				lgrps[k2] = append(lgrps[k2], pods[k])
			}
		}
	}

	//TODO: If any member of lgrps doesn't have at least 2
	// nodes something is wrong. Check for that here

	for {
		for { // Address groups with only a single server choice
			var ssn bool = false

			for k := range lgrps {
				// Now if any of the groups only have a single
				// node as the choice for the second member
				// address that one first.
				if hasSingleSecondNode(lgrps[k]) {
					ssn = true
					// Add this pairing to the groups
					grps = addNode(grps, lgrps[k][0], lgrps[k][1])
					// Since this node is now used, remove it from all
					// remaining tentative groups
					lgrps = removeNode(lgrps, lgrps[k][1])
					// Now remove this group completely since
					// it's been addressed
					lgrps = append(lgrps[:k], lgrps[k+1:]...)
					break
				}
			}
			if !ssn {
				break
			}
		}
		// Now address one of the remaining groups
		if len(lgrps) == 0 {
			break // Nothing left to do, exit the loop
		}
		grps = addNode(grps, lgrps[0][0], lgrps[0][1])
		lgrps = removeNode(lgrps, lgrps[0][1])
		lgrps = append(lgrps[:0], lgrps[1:]...)
	}
	return grps
}

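// groupPods1 groups the rw-core pods into active-active pairs: pods sharing
// device ids are paired first, then the remaining pods are spread so that no
// pair lands on a single node. For example, six cores c1..c6 across three
// nodes would typically come back as three two-pod groups, no group sharing
// a node (hypothetical pod names).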
func groupPods1(pods []*volthaPod) [][]*volthaPod {
	var rtrn [][]*volthaPod
	var podCt int = len(pods)

	rtrn, pods = groupIntersectingPods1(pods, podCt)
	// There are several outcomes here
	// 1) All pods have been paired and we're done
	// 2) Some un-allocated pods remain
	// 2.a) All groups have been started
	// 2.b) Not all groups have been started
	if len(pods) == 0 {
		return rtrn
	} else if len(rtrn) == podCt>>1 { // All groupings started
		// Allocate the remaining (presumably empty) pods to the started groups
		return groupRemainingPods1(rtrn, pods)
	} else { // Some groupings started
		// Start empty groups with the remaining pods, each grouping on a
		// different server, then allocate the remaining pods.
		rtrn, pods = startRemainingGroups1(rtrn, pods, podCt)
		return groupRemainingPods1(rtrn, pods)
	}
}

func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
	for k := range d1 {
		if _, ok := d2[k]; ok {
			return true
		}
	}
	return false
}

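// setConnection pushes one backend connection (cluster, backend, connection
// name, address, and port) to the affinity router over its configuration
// API.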
func setConnection(client pb.ConfigurationClient, cluster string, backend string, connection string, addr string, port uint64) {
	log.Debugf("Configuring backend %s : connection %s\n\n", backend, connection)
	cnf := &pb.Conn{Server: "grpc_command", Cluster: cluster, Backend: backend,
		Connection: connection, Addr: addr,
		Port: port}
	if res, err := client.SetConnection(context.Background(), cnf); err != nil {
		log.Debugf("failed SetConnection RPC call: %s", err)
	} else {
		log.Debugf("Result: %v", res)
	}
}

func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
	log.Debugf("Configuring backend %s : affinities \n", backend)
	aff := &pb.Affinity{Router: "vcore", Route: "dev_manager", Cluster: "vcore", Backend: backend}
	for k := range ids {
		log.Debugf("Setting affinity for id %s", k)
		aff.Id = k
		if res, err := client.SetAffinity(context.Background(), aff); err != nil {
			log.Debugf("failed affinity RPC call: %s", err)
		} else {
			log.Debugf("Result: %v", res)
		}
	}
}

func getBackendForCore(coreId string, coreGroups [][]*volthaPod) string {
	for _, v := range coreGroups {
		for _, v2 := range v {
			if v2.name == coreId {
				return v2.backend
			}
		}
	}
	log.Errorf("No backend found for core %s\n", coreId)
	return ""
}

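// monitorDiscovery consumes device-discovery notifications from kafka and
// sets the affinity of each newly discovered device to its core's backend.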
func monitorDiscovery(client pb.ConfigurationClient,
	ch <-chan *ic.InterContainerMessage,
	coreGroups [][]*volthaPod) {

	// Process discovery events as they arrive.
	for {
		select {
		case msg := <-ch:
			log.Debugf("Received a device discovery notification")
			device := &ic.DeviceDiscovered{}
			if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
				log.Errorf("Could not unmarshal received notification %v", msg)
			} else {
				// Set the affinity of the discovered device.
				if be := getBackendForCore(device.Id, coreGroups); be != "" {
					id := make(map[string]struct{})
					id[device.Id] = struct{}{}
					setAffinity(client, id, be)
				} else {
					log.Error("Can't use an empty string as a backend name")
				}
			}
		}
	}
}

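// startDiscoveryMonitor subscribes to the AffinityRouter kafka topic and
// launches monitorDiscovery to process the resulting event stream.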
func startDiscoveryMonitor(client pb.ConfigurationClient,
	coreGroups [][]*volthaPod) error {
	var ch <-chan *ic.InterContainerMessage
	// Connect to kafka for discovery events
	topic := &kafka.Topic{Name: "AffinityRouter"}
	kc, err := newKafkaClient("sarama", "kafka", 9092, "arouterd")
	if err != nil {
		log.Error("Could not create a kafka client, discovery disabled")
		return err
	}
	kc.Start()

	if ch, err = kc.Subscribe(topic); err != nil {
		log.Error("Could not subscribe to the 'AffinityRouter' channel, discovery disabled")
		return err
	}
	go monitorDiscovery(client, ch, coreGroups)
	return nil
}

// getAddrDiffs determines which items in the core groups have changed based
// on the pod list provided. It returns a coreGroup containing only the
// changed items and a pod list holding the new items.
func getAddrDiffs(coreGroups [][]*volthaPod, rwPods []*volthaPod) ([][]*volthaPod, []*volthaPod) {
	var nList []*volthaPod
	var rtrn [][]*volthaPod = make([][]*volthaPod, nPods>>1)
	var ipAddrs map[string]struct{} = make(map[string]struct{})

	log.Debug("Get addr diffs")

	// Start with an empty array
	for k := range rtrn {
		rtrn[k] = make([]*volthaPod, 2)
	}

	// Build a list with only the new items
	for _, v := range rwPods {
		if !hasIpAddr(coreGroups, v.ipAddr) {
			nList = append(nList, v)
		}
		ipAddrs[v.ipAddr] = struct{}{} // for the search below
	}

	// Now build the coreGroups with only the changed items
	for k1, v1 := range coreGroups {
		for k2, v2 := range v1 {
			if _, ok := ipAddrs[v2.ipAddr]; !ok {
				rtrn[k1][k2] = v2
			}
		}
	}
	return rtrn, nList
}

// reconcileAddrDiffs figures out where best to put the new pods in the
// coreGroup array based on the old pods being replaced. The criterion is
// that the new pod be on the same server as the old pod was.
func reconcileAddrDiffs(coreGroupDiffs [][]*volthaPod, rwPodDiffs []*volthaPod) [][]*volthaPod {
	var srvrs map[string][]*volthaPod = make(map[string][]*volthaPod)

	log.Debug("Reconciling diffs")
	log.Debug("Building server list")
	for _, v := range rwPodDiffs {
		log.Debugf("Adding %v to the server list", *v)
		srvrs[v.node] = append(srvrs[v.node], v)
	}

	for k1, v1 := range coreGroupDiffs {
		log.Debugf("k1:%v, v1:%v", k1, v1)
		for k2, v2 := range v1 {
			log.Debugf("k2:%v, v2:%v", k2, v2)
			if v2 == nil { // Nothing to do here
				continue
			}
			if _, ok := srvrs[v2.node]; ok {
				coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
				if len(srvrs[v2.node]) > 1 { // remove one entry from the list
					srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
				} else { // Delete the entry from the map
					delete(srvrs, v2.node)
				}
			} else {
				log.Error("This should never happen, node appears to have changed names")
				// attempt to limp along by keeping this old entry
			}
		}
	}

	return coreGroupDiffs
}

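// applyAddrDiffs updates the router configuration after pod IP addresses
// change. coreList is either the [][]*volthaPod core groups (rw cores) or a
// flat []*volthaPod list (ro cores).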
func applyAddrDiffs(client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
	var newEntries [][]*volthaPod

	log.Debug("Applying diffs")
	switch cores := coreList.(type) {
	case [][]*volthaPod:
		newEntries = reconcileAddrDiffs(getAddrDiffs(cores, nPods))

		// Now replace the information in coreGroups with the new
		// entries and then reconcile the device ids on the core
		// that's in the new entry with the device ids of its
		// active-active peer.
		for k1, v1 := range cores {
			for k2, v2 := range v1 {
				if newEntries[k1][k2] != nil {
					// TODO: Missing is the case where both the primary
					// and the secondary core crash and come back.
					// Pull the device ids from the active-active peer
					ids := queryPodDeviceIds(cores[k1][k2^1])
					if len(ids) != 0 {
						if !reconcilePodDeviceIds(newEntries[k1][k2], ids) {
							log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
						}
					}
					// Send the affinity router the new connection information
					setConnection(client, "vcore", v2.backend, v2.connection, newEntries[k1][k2].ipAddr, 50057)
					// Copy the new entry information over
					cores[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
					cores[k1][k2].name = newEntries[k1][k2].name
					cores[k1][k2].devIds = ids
				}
			}
		}
	case []*volthaPod:
		var mia []*volthaPod
		var found bool
		// TODO: Break this up into functions to simplify
		// reading of the code.
		// Find the core(s) that have changed addresses
		for k1, v1 := range cores {
			found = false
			for _, v2 := range nPods {
				if v1.ipAddr == v2.ipAddr {
					found = true
					break
				}
			}
			if !found {
				mia = append(mia, cores[k1])
			}
		}
		// Now plug in the new addresses and set the connection
		for _, v1 := range nPods {
			found = false
			for _, v2 := range cores {
				if v1.ipAddr == v2.ipAddr {
					found = true
					break
				}
			}
			if found {
				continue
			}
			mia[0].ipAddr = v1.ipAddr
			mia[0].name = v1.name
			setConnection(client, "ro_vcore", mia[0].backend, mia[0].connection, v1.ipAddr, 50057)
			// Now get rid of the mia entry just processed
			mia = append(mia[:0], mia[1:]...)
		}
	default:
		log.Error("Internal: Unexpected type in call to applyAddrDiffs")
	}
}

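// updateDeviceIds copies the freshly queried device-id sets from rwPods
// into the matching entries of coreGroups.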
func updateDeviceIds(coreGroups [][]*volthaPod, rwPods []*volthaPod) {
	var byName map[string]*volthaPod = make(map[string]*volthaPod)

	// Convenience
	for _, v := range rwPods {
		byName[v.name] = v
	}

	for k1, v1 := range coreGroups {
		for k2 := range v1 {
			coreGroups[k1][k2].devIds = byName[v1[k2].name].devIds
		}
	}
}

func startCoreMonitor(client pb.ConfigurationClient,
	clientset *kubernetes.Clientset,
	rwCoreFltr *regexp.Regexp,
	roCoreFltr *regexp.Regexp,
	coreGroups [][]*volthaPod,
	oRoPods []*volthaPod) error {
	// Now that initial allocation has been completed, monitor the pods
	// for IP changes
	// The main loop needs to do the following:
	// 1) Periodically query the pods and filter out
	//    the vcore ones
	// 2) Validate that the pods running are the same
	//    as the previous check
	// 3) Validate that the IP addresses are the same
	//    as the last check.
	// If any pod names have changed then remove
	// the unused pod names and add in the new pod names
	// maintaining the cluster/backend information.
	// If an IP address has changed (which shouldn't
	// happen unless a pod is re-started) it should get
	// caught by the pod name change.
	for {
		time.Sleep(10 * time.Second) // Wait a while
		// Get the rw core list from k8s
		rwPods := getVolthaPods(clientset, rwCoreFltr)
		queryDeviceIds(rwPods)
		updateDeviceIds(coreGroups, rwPods)
		// If we didn't get 2n+1 pods then wait since
		// something is down and will hopefully come
		// back up at some point.
		// TODO: remove the 6 pod hardcoding
		if len(rwPods) != 6 {
			continue
		}
		// We have all the pods, check if any IP addresses
		// have changed.
		for _, v := range rwPods {
			if !hasIpAddr(coreGroups, v.ipAddr) {
				log.Debug("Address has changed...")
				applyAddrDiffs(client, coreGroups, rwPods)
				break
			}
		}

		roPods := getVolthaPods(clientset, roCoreFltr)

		if len(roPods) != 3 {
			continue
		}
		for _, v := range roPods {
			if !hasIpAddr(oRoPods, v.ipAddr) {
				applyAddrDiffs(client, oRoPods, roPods)
				break
			}
		}
	}
}

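// hasIpAddr reports whether ipAddr appears anywhere in coreList, which may
// be either a []*volthaPod or a [][]*volthaPod.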
func hasIpAddr(coreList interface{}, ipAddr string) bool {
	switch cores := coreList.(type) {
	case []*volthaPod:
		for _, v := range cores {
			if v.ipAddr == ipAddr {
				return true
			}
		}
	case [][]*volthaPod:
		for _, v1 := range cores {
			for _, v2 := range v1 {
				if v2.ipAddr == ipAddr {
					return true
				}
			}
		}
	default:
		log.Error("Internal: Unexpected type in call to hasIpAddr")
	}
	return false
}

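// main wires everything together: it pairs the rw cores into groups,
// configures the affinity router's backends, connections, and affinities,
// and then starts the discovery and core monitors.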
func main() {
	// This is currently hard coded to a cluster with 3 servers
	//var connections map[string]configConn = make(map[string]configConn)
	//var rwCorePodsPrev map[string]rwPod = make(map[string]rwPod)
	var err error
	var conn *grpc.ClientConn

	// Set up the regular expressions to identify the voltha cores
	rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
	roCoreFltr := regexp.MustCompile(`ro-core-`)

	// Set up logging
	if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
	}

	// Set up kubernetes api
	clientset := k8sClientSet()

	// Connect to the affinity router and set up the client
	conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()
	client := pb.NewConfigurationClient(conn)

	// Get the voltha rw-core pods
	rwPods := getVolthaPods(clientset, rwCoreFltr)

	// Fetch the devices held by each running core
	queryDeviceIds(rwPods)

	// For debugging... comment out l8r
	for _, v := range rwPods {
		log.Debugf("Pod list %v", *v)
	}

	coreGroups := groupPods1(rwPods)

	// Assign the groupings to the backends and connections
	for k := range coreGroups {
		for k1 := range coreGroups[k] {
			coreGroups[k][k1].cluster = "vcore"
			coreGroups[k][k1].backend = "vcore" + strconv.Itoa(k+1)
			coreGroups[k][k1].connection = "vcore" + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
		}
	}
	log.Info("Core grouping completed")

	// TODO: Debugging code, comment out for production
	for k, v := range coreGroups {
		for k2, v2 := range v {
			log.Debugf("Core group %d,%d: %v", k, k2, v2)
		}
	}
	log.Info("Setting affinities")
	// Now set the affinities for existing devices in the cores
	for _, v := range coreGroups {
		setAffinity(client, v[0].devIds, v[0].backend)
		setAffinity(client, v[1].devIds, v[1].backend)
	}
	log.Info("Setting connections")
	// Configure the backends based on the calculated core groups
	for _, v := range coreGroups {
		setConnection(client, "vcore", v[0].backend, v[0].connection, v[0].ipAddr, 50057)
		setConnection(client, "vcore", v[1].backend, v[1].connection, v[1].ipAddr, 50057)
	}

	// Process the read only pods
	roPods := getVolthaPods(clientset, roCoreFltr)
	for k, v := range roPods {
		log.Debugf("Processing ro_pod %v", v)
		vN := "ro_vcore" + strconv.Itoa(k+1)
		log.Debugf("Setting connection %s, %s, %s", vN, vN+"1", v.ipAddr)
		roPods[k].cluster = "ro_core"
		roPods[k].backend = vN
		roPods[k].connection = vN + "1"
		setConnection(client, "ro_vcore", v.backend, v.connection, v.ipAddr, 50057)
	}

	log.Info("Starting discovery monitoring")
	startDiscoveryMonitor(client, coreGroups)

	log.Info("Starting core monitoring")
	startCoreMonitor(client, clientset, rwCoreFltr,
		roCoreFltr, coreGroups, roPods) // Never returns
	return
}