/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
	"errors"
	"regexp"
	"strconv"
	"time"

	"github.com/golang/protobuf/ptypes"
	empty "github.com/golang/protobuf/ptypes/empty"
	"github.com/opencord/voltha-go/common/log"
	kafka "github.com/opencord/voltha-go/kafka"
	pb "github.com/opencord/voltha-protos/go/afrouter"
	cmn "github.com/opencord/voltha-protos/go/common"
	ic "github.com/opencord/voltha-protos/go/inter_container"
	vpb "github.com/opencord/voltha-protos/go/voltha"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)
39
40type configConn struct {
41 Server string `json:"Server"`
42 Cluster string `json:"Cluster"`
43 Backend string `json:"Backend"`
44 connections map[string]connection
45}
46
47type connection struct {
48 Name string `json:"Connection"`
49 Addr string `json:"Addr"`
50 Port uint64 `json:"Port"`
51}
52
type volthaPod struct {
	name       string
	ipAddr     string
	node       string
	devIds     map[string]struct{}
	cluster    string
	backend    string
	connection string
}

type podTrack struct {
	pod *volthaPod
	dn  bool
}

var nPods int = 6

// Topic is affinityRouter
// port: 9092

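// newKafkaClient returns a kafka client of the requested type. Only the
// "sarama" client type is currently supported; any other type yields an
// error.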
func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {

	log.Infow("kafka-client-type", log.Fields{"client": clientType})
	switch clientType {
	case "sarama":
		return kafka.NewSaramaClient(
			kafka.Host(host),
			kafka.Port(port),
			kafka.ConsumerType(kafka.GroupCustomer),
			kafka.ProducerReturnOnErrors(true),
			kafka.ProducerReturnOnSuccess(true),
			kafka.ProducerMaxRetries(6),
			kafka.NumPartitions(3),
			kafka.ConsumerGroupName(instanceID),
			kafka.ConsumerGroupPrefix(instanceID),
			kafka.AutoCreateTopic(false),
			kafka.ProducerFlushFrequency(5),
			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
	}
	return nil, errors.New("unsupported-client-type")
}

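// k8sClientSet creates a kubernetes API client set using the in-cluster
// configuration. Nothing can be monitored without API access, so any
// failure here is fatal and causes a panic.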
func k8sClientSet() *kubernetes.Clientset {
	// creates the in-cluster config
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err.Error())
	}
	// creates the clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	return clientset
}

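// connect dials the gRPC endpoint at addr, retrying every 10 seconds for
// up to 100 attempts before giving up with an error.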
func connect(addr string) (*grpc.ClientConn, error) {
	for ctr := 0; ctr < 100; ctr++ {
		log.Debugf("Trying to connect to %s", addr)
		conn, err := grpc.Dial(addr, grpc.WithInsecure())
		if err != nil {
			log.Debugf("Attempt to connect failed, retrying %v:", err)
		} else {
			log.Debugf("Connection succeeded")
			return conn, err
		}
		time.Sleep(10 * time.Second)
	}
	log.Debugf("Too many connection attempts, giving up!")
	return nil, errors.New("timeout attempting to connect")
}

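// getVolthaPods queries the k8s API for all pods in the "voltha" namespace
// whose names match the provided core filter and that have been assigned
// an IP address.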
func getVolthaPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) []*volthaPod {
	var rtrn []*volthaPod

	pods, err := cs.CoreV1().Pods("").List(metav1.ListOptions{})
	if err != nil {
		panic(err.Error())
	}
	//log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))

	for _, v := range pods.Items {
		if v.Namespace == "voltha" && coreFilter.MatchString(v.Name) {
			log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
				v.Status.PodIP, v.Spec.NodeName)
			// Only add the pod if it has an IP address. If it doesn't then it likely crashed
			// and is still in the process of being re-started.
			if v.Status.PodIP != "" {
				rtrn = append(rtrn, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName,
					devIds: make(map[string]struct{}), backend: "", connection: ""})
			}
		}
	}
	return rtrn
}

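// reconcilePodDeviceIds instructs the given core pod to reconcile the
// provided set of device ids, returning true on success.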
func reconcilePodDeviceIds(pod *volthaPod, ids map[string]struct{}) bool {
	var idList cmn.IDs
	for k := range ids {
		idList.Items = append(idList.Items, &cmn.ID{Id: k})
	}
	conn, err := connect(pod.ipAddr + ":50057")
	if err != nil {
		log.Debugf("Could not query devices from %s, could not connect", pod.name)
		return false
	}
	defer conn.Close()
	client := vpb.NewVolthaServiceClient(conn)
	_, err = client.ReconcileDevices(context.Background(), &idList)
	if err != nil {
		log.Error(err)
		return false
	}

	return true
}

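// queryPodDeviceIds asks a core pod for the set of device ids it currently
// manages. An empty set is returned if the pod can't be reached or the
// query fails.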
func queryPodDeviceIds(pod *volthaPod) map[string]struct{} {
	var rtrn map[string]struct{} = make(map[string]struct{})
	// Open a connection to the pod
	// port 50057
	conn, err := connect(pod.ipAddr + ":50057")
	if err != nil {
		log.Debugf("Could not query devices from %s, could not connect", pod.name)
		return rtrn
	}
	defer conn.Close()
	client := vpb.NewVolthaServiceClient(conn)
	devs, err := client.ListDeviceIds(context.Background(), &empty.Empty{})
	if err != nil {
		log.Error(err)
		return rtrn
	}
	for _, dv := range devs.Items {
		rtrn[dv.Id] = struct{}{}
	}

	return rtrn
}

func queryDeviceIds(pods []*volthaPod) {
	for pk := range pods {
		// Keep the old Id list if a new list is not returned
		if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
			pods[pk].devIds = idList
		}
	}
}

func allEmpty(pods []*volthaPod) bool {
	for k := range pods {
		if len(pods[k].devIds) != 0 {
			return false
		}
	}
	return true
}

func rmPod(pods []*volthaPod, idx int) []*volthaPod {
	return append(pods[:idx], pods[idx+1:]...)
}

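// groupIntersectingPods1 pairs pods whose device id sets overlap, skipping
// pods that hold no devices. It returns the groups that were started along
// with the pods that remain unallocated. Pods with intersecting device
// sets are never paired on the same node.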
func groupIntersectingPods1(pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
	var rtrn [][]*volthaPod
	var out []*volthaPod

	for {
		if len(pods) == 0 {
			break
		}
		if len(pods[0].devIds) == 0 { // Ignore pods with no devices
			////log.Debugf("%s empty pod", pd[k].pod.name)
			out = append(out, pods[0])
			pods = rmPod(pods, 0)
			continue
		}
		// Start a pod group with this pod
		var grp []*volthaPod
		grp = append(grp, pods[0])
		pods = rmPod(pods, 0)
		//log.Debugf("Creating new group %s", pd[k].pod.name)
		// Find the peer pod based on device overlap
		// It's ok if one isn't found, an empty one will be used instead
		for k := range pods {
			if len(pods[k].devIds) == 0 { // Skip pods with no devices
				//log.Debugf("%s empty pod", pd[k1].pod.name)
				continue
			}
			if intersect(grp[0].devIds, pods[k].devIds) {
				//log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
				if grp[0].node == pods[k].node {
					// This should never happen
					log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
						grp[0].name, pods[k].name)
					continue
				}
				grp = append(grp, pods[k])
				pods = rmPod(pods, k)
				break
			}
		}
		rtrn = append(rtrn, grp)
		//log.Debugf("Added group %s", grp[0].name)
		// Check if the number of groups = half the pods, if so all groups are started.
		if len(rtrn) == podCt>>1 {
			// Append any remaining pods to out
			out = append(out, pods[0:]...)
			break
		}
	}
	return rtrn, out
}

func unallocPodCount(pd []*podTrack) int {
	var rtrn int = 0
	for _, v := range pd {
		if !v.dn {
			rtrn++
		}
	}
	return rtrn
}

func sameNode(pod *volthaPod, grps [][]*volthaPod) bool {
	for _, v := range grps {
		if v[0].node == pod.node {
			return true
		}
		if len(v) == 2 && v[1].node == pod.node {
			return true
		}
	}
	return false
}

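// startRemainingGroups1 starts new groups with the unallocated pods until
// half the pod count of groups exist, ensuring each new group starts on a
// node not already used by an existing group.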
func startRemainingGroups1(grps [][]*volthaPod, pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
	var grp []*volthaPod

	for k := range pods {
		if sameNode(pods[k], grps) {
			continue
		}
		grp = []*volthaPod{}
		grp = append(grp, pods[k])
		pods = rmPod(pods, k)
		grps = append(grps, grp)
		if len(grps) == podCt>>1 {
			break
		}
	}
	return grps, pods
}

func hasSingleSecondNode(grp []*volthaPod) bool {
	var srvrs map[string]struct{} = make(map[string]struct{})
	for k := range grp {
		if k == 0 {
			continue // Ignore the first item
		}
		srvrs[grp[k].node] = struct{}{}
	}
	return len(srvrs) == 1
}

func addNode(grps [][]*volthaPod, idx *volthaPod, item *volthaPod) [][]*volthaPod {
	for k := range grps {
		if grps[k][0].name == idx.name {
			grps[k] = append(grps[k], item)
			return grps
		}
	}
	// TODO: Error checking required here.
	return grps
}

func removeNode(grps [][]*volthaPod, item *volthaPod) [][]*volthaPod {
	for k := range grps {
		for k1 := range grps[k] {
			if grps[k][k1].name == item.name {
				grps[k] = append(grps[k][:k1], grps[k][k1+1:]...)
				break
			}
		}
	}
	return grps
}

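// groupRemainingPods1 completes the started groups by pairing each with
// one of the remaining (presumably empty) pods, preferring pairings where
// a group has only a single candidate node left.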
func groupRemainingPods1(grps [][]*volthaPod, pods []*volthaPod) [][]*volthaPod {
	var lgrps [][]*volthaPod
	// All groups must be started when this function is called.
	// Copy incomplete groups
	for k := range grps {
		if len(grps[k]) != 2 {
			lgrps = append(lgrps, grps[k])
		}
	}

	// Add all pairing candidates to each started group.
	for k := range pods {
		for k2 := range lgrps {
			if lgrps[k2][0].node != pods[k].node {
				lgrps[k2] = append(lgrps[k2], pods[k])
			}
		}
	}

	//TODO: If any member of lgrps doesn't have at least 2
	// nodes something is wrong. Check for that here

	for {
		for { // Address groups with only a single server choice
			var ssn bool = false

			for k := range lgrps {
				// Now if any of the groups only have a single
				// node as the choice for the second member
				// address that one first.
				if hasSingleSecondNode(lgrps[k]) {
					ssn = true
					// Add this pairing to the groups
					grps = addNode(grps, lgrps[k][0], lgrps[k][1])
					// Since this node is now used, remove it from all
					// remaining tentative groups
					lgrps = removeNode(lgrps, lgrps[k][1])
					// Now remove this group completely since
					// it's been addressed
					lgrps = append(lgrps[:k], lgrps[k+1:]...)
					break
				}
			}
			if !ssn {
				break
			}
		}
		// Now address one of the remaining groups
		if len(lgrps) == 0 {
			break // Nothing left to do, exit the loop
		}
		grps = addNode(grps, lgrps[0][0], lgrps[0][1])
		lgrps = removeNode(lgrps, lgrps[0][1])
		lgrps = append(lgrps[:0], lgrps[1:]...)
	}
	return grps
}

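// groupPods1 organizes the rw-core pods into active-active pairs. Pods
// with intersecting device sets are paired first, then any remaining pods
// are used to start and complete the outstanding groups.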
func groupPods1(pods []*volthaPod) [][]*volthaPod {
	var rtrn [][]*volthaPod
	var podCt int = len(pods)

	rtrn, pods = groupIntersectingPods1(pods, podCt)
	// There are several outcomes here
	// 1) All pods have been paired and we're done
	// 2) Some un-allocated pods remain
	// 2.a) All groups have been started
	// 2.b) Not all groups have been started
	if len(pods) == 0 {
		return rtrn
	} else if len(rtrn) == podCt>>1 { // All groupings started
		// Allocate the remaining (presumably empty) pods to the started groups
		return groupRemainingPods1(rtrn, pods)
	} else { // Some groupings started
		// Start empty groups with the remaining pods, each on a
		// different server, then allocate the remaining pods.
		rtrn, pods = startRemainingGroups1(rtrn, pods, podCt)
		return groupRemainingPods1(rtrn, pods)
	}
}

func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
	for k := range d1 {
		if _, ok := d2[k]; ok {
			return true
		}
	}
	return false
}

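// setConnection pushes new connection information for a backend to the
// affinity router over its configuration interface.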
func setConnection(client pb.ConfigurationClient, cluster string, backend string, connection string, addr string, port uint64) {
	log.Debugf("Configuring backend %s : connection %s in cluster %s\n\n",
		backend, connection, cluster)
	cnf := &pb.Conn{Server: "grpc_command", Cluster: cluster, Backend: backend,
		Connection: connection, Addr: addr,
		Port: port}
	if res, err := client.SetConnection(context.Background(), cnf); err != nil {
		log.Debugf("failed SetConnection RPC call: %s", err)
	} else {
		log.Debugf("Result: %v", res)
	}
}

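// setAffinity tells the affinity router to route requests for the given
// device ids to the specified backend.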
func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
	log.Debugf("Configuring backend %s : affinities \n", backend)
	aff := &pb.Affinity{Router: "vcore", Route: "dev_manager", Cluster: "vcore", Backend: backend}
	for k := range ids {
		log.Debugf("Setting affinity for id %s", k)
		aff.Id = k
		if res, err := client.SetAffinity(context.Background(), aff); err != nil {
			log.Debugf("failed affinity RPC call: %s", err)
		} else {
			log.Debugf("Result: %v", res)
		}
	}
}

func getBackendForCore(coreId string, coreGroups [][]*volthaPod) string {
	for _, v := range coreGroups {
		for _, v2 := range v {
			if v2.name == coreId {
				return v2.backend
			}
		}
	}
	log.Errorf("No backend found for core %s\n", coreId)
	return ""
}

func monitorDiscovery(client pb.ConfigurationClient,
	ch <-chan *ic.InterContainerMessage,
	coreGroups [][]*volthaPod) {
	var id map[string]struct{} = make(map[string]struct{})

	select {
	case msg := <-ch:
		log.Debugf("Received a device discovery notification")
		device := &ic.DeviceDiscovered{}
		if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
			log.Errorf("Could not unmarshal received notification %v", msg)
		} else {
			// Set the affinity of the discovered device.
			if be := getBackendForCore(device.Id, coreGroups); be != "" {
				id[device.Id] = struct{}{}
				setAffinity(client, id, be)
			} else {
				log.Error("Can't use an empty string as a backend name")
			}
		}
		break
	}
}

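// startDiscoveryMonitor subscribes to the AffinityRouter kafka topic for
// device discovery events and spawns a goroutine to process them.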
func startDiscoveryMonitor(client pb.ConfigurationClient,
	coreGroups [][]*volthaPod) error {
	var ch <-chan *ic.InterContainerMessage
	// Connect to kafka for discovery events
	topic := &kafka.Topic{Name: "AffinityRouter"}
	kc, err := newKafkaClient("sarama", "kafka", 9092, "arouterd")
	if err != nil {
		log.Error("Could not create a kafka client, discovery disabled")
		return err
	}
	kc.Start()

	if ch, err = kc.Subscribe(topic); err != nil {
		log.Error("Could not subscribe to the 'AffinityRouter' channel, discovery disabled")
		return err
	}
	go monitorDiscovery(client, ch, coreGroups)
	return nil
}

// Determines which items in core groups
// have changed based on the list provided
// and returns a coreGroup with only the changed
// items and a pod list with the new items
func getAddrDiffs(coreGroups [][]*volthaPod, rwPods []*volthaPod) ([][]*volthaPod, []*volthaPod) {
	var nList []*volthaPod
	var rtrn [][]*volthaPod = make([][]*volthaPod, nPods>>1)
	var ipAddrs map[string]struct{} = make(map[string]struct{})

	log.Debug("Get addr diffs")

	// Start with an empty array
	for k := range rtrn {
		rtrn[k] = make([]*volthaPod, 2)
	}

	// Build a list with only the new items
	for _, v := range rwPods {
		if !hasIpAddr(coreGroups, v.ipAddr) {
			nList = append(nList, v)
		}
		ipAddrs[v.ipAddr] = struct{}{} // for the search below
	}

	// Now build the coreGroups with only the changed items
	for k1, v1 := range coreGroups {
		for k2, v2 := range v1 {
			if _, ok := ipAddrs[v2.ipAddr]; !ok {
				rtrn[k1][k2] = v2
			}
		}
	}
	return rtrn, nList
}

// Figure out where best to put the new pods
// in the coreGroup array based on the old
// pods being replaced. The criterion is that
// the new pod be on the same server as the
// old pod was.
func reconcileAddrDiffs(coreGroupDiffs [][]*volthaPod, rwPodDiffs []*volthaPod) [][]*volthaPod {
	var srvrs map[string][]*volthaPod = make(map[string][]*volthaPod)

	log.Debug("Reconciling diffs")
	log.Debug("Building server list")
	for _, v := range rwPodDiffs {
		log.Debugf("Adding %v to the server list", *v)
		srvrs[v.node] = append(srvrs[v.node], v)
	}

	for k1, v1 := range coreGroupDiffs {
		log.Debugf("k1:%v, v1:%v", k1, v1)
		for k2, v2 := range v1 {
			log.Debugf("k2:%v, v2:%v", k2, v2)
			if v2 == nil { // Nothing to do here
				continue
			}
			if _, ok := srvrs[v2.node]; ok {
				coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
				if len(srvrs[v2.node]) > 1 { // remove one entry from the list
					srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
				} else { // Delete the entry from the map
					delete(srvrs, v2.node)
				}
			} else {
				log.Error("This should never happen, node appears to have changed names")
				// attempt to limp along by keeping this old entry
			}
		}
	}

	return coreGroupDiffs
}

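// applyAddrDiffs updates the running configuration when pod addresses have
// changed. For rw cores (a [][]*volthaPod) the new pods replace their old
// group entries and device ids are reconciled from the active-active peer;
// for ro cores (a []*volthaPod) the new addresses are plugged into the
// entries that went missing.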
func applyAddrDiffs(client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
	var newEntries [][]*volthaPod

	log.Debug("Applying diffs")
	switch cores := coreList.(type) {
	case [][]*volthaPod:
		newEntries = reconcileAddrDiffs(getAddrDiffs(cores, nPods))

		// Now replace the information in coreGroups with the new
		// entries and then reconcile the device ids on the core
		// that's in the new entry with the device ids of its
		// active-active peer.
		for k1, v1 := range cores {
			for k2, v2 := range v1 {
				if newEntries[k1][k2] != nil {
					// TODO: Missing is the case where both the primary
					// and the secondary core crash and come back.
					// Pull the device ids from the active-active peer
					ids := queryPodDeviceIds(cores[k1][k2^1])
					if len(ids) != 0 {
						if !reconcilePodDeviceIds(newEntries[k1][k2], ids) {
							log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
						}
					}
					// Send the affinity router the new connection information
					setConnection(client, "vcore", v2.backend, v2.connection, newEntries[k1][k2].ipAddr, 50057)
					// Copy the new entry information over
					cores[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
					cores[k1][k2].name = newEntries[k1][k2].name
					cores[k1][k2].devIds = ids
				}
			}
		}
	case []*volthaPod:
		var mia []*volthaPod
		var found bool
		// TODO: Break this using functions to simplify
		// reading of the code.
		// Find the core(s) that have changed addresses
		for k1, v1 := range cores {
			found = false
			for _, v2 := range nPods {
				if v1.ipAddr == v2.ipAddr {
					found = true
					break
				}
			}
			if !found {
				mia = append(mia, cores[k1])
			}
		}
		// Now plug in the new addresses and set the connection
		for _, v1 := range nPods {
			found = false
			for _, v2 := range cores {
				if v1.ipAddr == v2.ipAddr {
					found = true
					break
				}
			}
			if found {
				continue
			}
			mia[0].ipAddr = v1.ipAddr
			mia[0].name = v1.name
			setConnection(client, "ro_vcore", mia[0].backend, mia[0].connection, v1.ipAddr, 50057)
			// Now get rid of the mia entry just processed
			mia = append(mia[:0], mia[1:]...)
		}
	default:
		log.Error("Internal: Unexpected type in call to applyAddrDiffs")
	}
}

func updateDeviceIds(coreGroups [][]*volthaPod, rwPods []*volthaPod) {
	var byName map[string]*volthaPod = make(map[string]*volthaPod)

	// Convenience lookup by pod name
	for _, v := range rwPods {
		byName[v.name] = v
	}

	for k1, v1 := range coreGroups {
		for k2 := range v1 {
			coreGroups[k1][k2].devIds = byName[v1[k2].name].devIds
		}
	}
}

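// startCoreMonitor polls the k8s API every 10 seconds for the rw and ro
// core pods and applies address diffs whenever a pod's IP address has
// changed. This function never returns.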
func startCoreMonitor(client pb.ConfigurationClient,
	clientset *kubernetes.Clientset,
	rwCoreFltr *regexp.Regexp,
	roCoreFltr *regexp.Regexp,
	coreGroups [][]*volthaPod,
	oRoPods []*volthaPod) error {
	// Now that initial allocation has been completed, monitor the pods
	// for IP changes
	// The main loop needs to do the following:
	// 1) Periodically query the pods and filter out
	//    the vcore ones
	// 2) Validate that the pods running are the same
	//    as the previous check
	// 3) Validate that the IP addresses are the same
	//    as the last check.
	// If the pod name(s) have changed then remove
	// the unused pod names and add in the new pod names
	// maintaining the cluster/backend information.
	// If an IP address has changed (which shouldn't
	// happen unless a pod is re-started) it should get
	// caught by the pod name change.
	for {
		time.Sleep(10 * time.Second) // Wait a while
		// Get the rw core list from k8s
		rwPods := getVolthaPods(clientset, rwCoreFltr)
		queryDeviceIds(rwPods)
		updateDeviceIds(coreGroups, rwPods)
		// If we didn't get the full complement of pods then wait since
		// something is down and will hopefully come
		// back up at some point.
		// TODO: remove the 6 pod hardcoding
		if len(rwPods) != 6 {
			continue
		}
		// We have all pods, check if any IP addresses
		// have changed.
		for _, v := range rwPods {
			if !hasIpAddr(coreGroups, v.ipAddr) {
				log.Debug("Address has changed...")
				applyAddrDiffs(client, coreGroups, rwPods)
				break
			}
		}

		roPods := getVolthaPods(clientset, roCoreFltr)

		if len(roPods) != 3 {
			continue
		}
		for _, v := range roPods {
			if !hasIpAddr(oRoPods, v.ipAddr) {
				applyAddrDiffs(client, oRoPods, roPods)
				break
			}
		}
	}
}

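// hasIpAddr reports whether the given IP address belongs to any pod in the
// core list, which may be either a []*volthaPod or a [][]*volthaPod.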
func hasIpAddr(coreList interface{}, ipAddr string) bool {
	switch cores := coreList.(type) {
	case []*volthaPod:
		for _, v := range cores {
			if v.ipAddr == ipAddr {
				return true
			}
		}
	case [][]*volthaPod:
		for _, v1 := range cores {
			for _, v2 := range v1 {
				if v2.ipAddr == ipAddr {
					return true
				}
			}
		}
	default:
		log.Error("Internal: Unexpected type in call to hasIpAddr")
	}
	return false
}

func main() {
	// This is currently hard coded to a cluster with 3 servers
	//var connections map[string]configConn = make(map[string]configConn)
	//var rwCorePodsPrev map[string]rwPod = make(map[string]rwPod)
	var err error
	var conn *grpc.ClientConn

	// Set up the regular expressions to identify the voltha cores
	rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
	roCoreFltr := regexp.MustCompile(`ro-core-`)

	// Set up logging
	if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
	}

	// Set up kubernetes api
	clientset := k8sClientSet()

	// Connect to the affinity router and set up the client
	conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()
	client := pb.NewConfigurationClient(conn)

	// Get the voltha rw-core pods
	rwPods := getVolthaPods(clientset, rwCoreFltr)

	// Fetch the devices held by each running core
	queryDeviceIds(rwPods)

	// For debugging... comment out l8r
	for _, v := range rwPods {
		log.Debugf("Pod list %v", *v)
	}

	coreGroups := groupPods1(rwPods)

	// Assign the groupings to the backends and connections
	for k := range coreGroups {
		for k1 := range coreGroups[k] {
			coreGroups[k][k1].cluster = "vcore"
			coreGroups[k][k1].backend = "vcore" + strconv.Itoa(k+1)
			coreGroups[k][k1].connection = "vcore" + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
		}
	}
	log.Info("Core grouping completed")

	// TODO: Debugging code, comment out for production
	for k, v := range coreGroups {
		for k2, v2 := range v {
			log.Debugf("Core group %d,%d: %v", k, k2, v2)
		}
	}
	log.Info("Setting affinities")
	// Now set the affinities for existing devices in the cores
	for _, v := range coreGroups {
		setAffinity(client, v[0].devIds, v[0].backend)
		setAffinity(client, v[1].devIds, v[1].backend)
	}
	log.Info("Setting connections")
	// Configure the backends based on the calculated core groups
	for _, v := range coreGroups {
		setConnection(client, "vcore", v[0].backend, v[0].connection, v[0].ipAddr, 50057)
		setConnection(client, "vcore", v[1].backend, v[1].connection, v[1].ipAddr, 50057)
	}

	// Process the read only pods
	roPods := getVolthaPods(clientset, roCoreFltr)
	for k, v := range roPods {
		log.Debugf("Processing ro_pod %v", v)
		vN := "ro_vcore" + strconv.Itoa(k+1)
		log.Debugf("Setting connection %s, %s, %s", vN, vN+"1", v.ipAddr)
		roPods[k].cluster = "ro_core"
		roPods[k].backend = vN
		roPods[k].connection = vN + "1"
		setConnection(client, "ro_vcore", v.backend, v.connection, v.ipAddr, 50057)
	}

	log.Info("Starting discovery monitoring")
	startDiscoveryMonitor(client, coreGroups)

	log.Info("Starting core monitoring")
	startCoreMonitor(client, clientset, rwCoreFltr,
		roCoreFltr, coreGroups, roPods) // Never returns
	return
}