blob: f5f2453264b23289818ecaf304663a03bc7c799a [file] [log] [blame]
sslobodr16e41bc2019-01-18 16:22:21 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package main
18
19import (
sslobodr16e41bc2019-01-18 16:22:21 -050020 "errors"
Kent Hagerman334a8ce2019-05-16 16:50:33 -040021 "fmt"
22 "math"
23 "os"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040024 "regexp"
sslobodr16e41bc2019-01-18 16:22:21 -050025 "strconv"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040026 "time"
sslobodr16e41bc2019-01-18 16:22:21 -050027
sslobodr16e41bc2019-01-18 16:22:21 -050028 "github.com/golang/protobuf/ptypes"
Kent Hagerman334a8ce2019-05-16 16:50:33 -040029 "github.com/golang/protobuf/ptypes/empty"
sslobodr16e41bc2019-01-18 16:22:21 -050030 "github.com/opencord/voltha-go/common/log"
Kent Hagerman334a8ce2019-05-16 16:50:33 -040031 "github.com/opencord/voltha-go/kafka"
William Kurkiandaa6bb22019-03-07 12:26:28 -050032 pb "github.com/opencord/voltha-protos/go/afrouter"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040033 cmn "github.com/opencord/voltha-protos/go/common"
William Kurkiandaa6bb22019-03-07 12:26:28 -050034 ic "github.com/opencord/voltha-protos/go/inter_container"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040035 vpb "github.com/opencord/voltha-protos/go/voltha"
36 "golang.org/x/net/context"
37 "google.golang.org/grpc"
38 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39 "k8s.io/client-go/kubernetes"
40 "k8s.io/client-go/rest"
sslobodr16e41bc2019-01-18 16:22:21 -050041)
42
// volthaPod is arouterd's view of a single voltha core pod discovered
// via the Kubernetes API, plus the affinity-router bookkeeping attached
// to it as groups/backends are assigned.
type volthaPod struct {
	name   string // Kubernetes pod name
	ipAddr string // pod IP; empty while the pod is (re)starting
	node   string // Kubernetes node the pod is scheduled on
	// Set of device ids currently owned by this core, as reported by
	// its ListDeviceIds gRPC call (value-less map used as a set).
	devIds map[string]struct{}
	// cluster is carried but not referenced in the visible code —
	// presumably the afrouter cluster name; confirm against the rest
	// of the file before relying on it.
	cluster    string
	backend    string // afrouter backend this pod is bound to
	connection string // afrouter connection name within the backend
}
52
// podTrack pairs a pod with a processed/allocated flag.
// dn ("done") marks the pod as already handled; unallocPodCount
// counts the entries where dn is still false.
type podTrack struct {
	pod *volthaPod
	dn  bool
}
57
// Runtime configuration, each value overridable through an environment
// variable (see getStrEnv/getIntEnv for the lookup and range checking).
var (
	// Namespace queried for voltha pods and the gRPC port every core
	// pod is assumed to listen on.
	podNamespace = getStrEnv("POD_NAMESPACE", "voltha")
	podGrpcPort  = uint64(getIntEnv("POD_GRPC_PORT", 0, math.MaxUint16, 50057))

	// Expected pod counts for the read-write and read-only cores.
	numRWPods = getIntEnv("NUM_RW_PODS", 1, math.MaxInt32, 6)
	numROPods = getIntEnv("NUM_RO_PODS", 1, math.MaxInt32, 3)

	// Names used when pushing connection/affinity configuration to the
	// affinity router.
	afrouterRouterName    = getStrEnv("AFROUTER_ROUTER_NAME", "vcore")
	afrouterRWClusterName = getStrEnv("AFROUTER_RW_CLUSTER_NAME", "vcore")
	afrouterROClusterName = getStrEnv("AFROUTER_RO_CLUSTER_NAME", "ro_vcore")

	// Kafka connection details for the device-discovery subscription.
	kafkaTopic      = getStrEnv("KAFKA_TOPIC", "AffinityRouter")
	kafkaClientType = getStrEnv("KAFKA_CLIENT_TYPE", "sarama")
	kafkaHost       = getStrEnv("KAFKA_HOST", "kafka")
	kafkaPort       = getIntEnv("KAFKA_PORT", 0, math.MaxUint16, 9092)
	kafkaInstanceID = getStrEnv("KAFKA_INSTANCE_ID", "arouterd")
)
75
// getIntEnv reads the environment variable key as an integer and
// returns it when it lies in [min, max]. An unset variable yields
// defaultValue; a malformed or out-of-range value panics, aborting
// startup with an explanatory error.
func getIntEnv(key string, min, max, defaultValue int) int {
	val, ok := os.LookupEnv(key)
	if !ok {
		return defaultValue
	}
	num, err := strconv.Atoi(val)
	if err != nil || num < min || num > max {
		panic(fmt.Errorf("%s must be a number in the range [%d, %d]; default: %d", key, min, max, defaultValue))
	}
	return num
}
86
// getStrEnv returns the value of the environment variable key, or
// defaultValue when key is not set.
func getStrEnv(key, defaultValue string) string {
	val, ok := os.LookupEnv(key)
	if !ok {
		return defaultValue
	}
	return val
}
sslobodr16e41bc2019-01-18 16:22:21 -050093
94func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
95
96 log.Infow("kafka-client-type", log.Fields{"client": clientType})
97 switch clientType {
98 case "sarama":
99 return kafka.NewSaramaClient(
100 kafka.Host(host),
101 kafka.Port(port),
102 kafka.ConsumerType(kafka.GroupCustomer),
103 kafka.ProducerReturnOnErrors(true),
104 kafka.ProducerReturnOnSuccess(true),
105 kafka.ProducerMaxRetries(6),
106 kafka.NumPartitions(3),
107 kafka.ConsumerGroupName(instanceID),
108 kafka.ConsumerGroupPrefix(instanceID),
109 kafka.AutoCreateTopic(false),
110 kafka.ProducerFlushFrequency(5),
111 kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
112 }
113 return nil, errors.New("unsupported-client-type")
114}
115
sslobodr16e41bc2019-01-18 16:22:21 -0500116func k8sClientSet() *kubernetes.Clientset {
117 // creates the in-cluster config
118 config, err := rest.InClusterConfig()
119 if err != nil {
120 panic(err.Error())
121 }
122 // creates the clientset
123 clientset, err := kubernetes.NewForConfig(config)
124 if err != nil {
125 panic(err.Error())
126 }
127
128 return clientset
129}
130
sslobodr16e41bc2019-01-18 16:22:21 -0500131func connect(addr string) (*grpc.ClientConn, error) {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400132 for ctr := 0; ctr < 100; ctr++ {
sslobodre7ce71d2019-01-22 16:21:45 -0500133 log.Debugf("Trying to connect to %s", addr)
sslobodr16e41bc2019-01-18 16:22:21 -0500134 conn, err := grpc.Dial(addr, grpc.WithInsecure())
135 if err != nil {
136 log.Debugf("Attempt to connect failed, retrying %v:", err)
137 } else {
138 log.Debugf("Connection succeeded")
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400139 return conn, err
sslobodr16e41bc2019-01-18 16:22:21 -0500140 }
141 time.Sleep(10 * time.Second)
142 }
143 log.Debugf("Too many connection attempts, giving up!")
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400144 return nil, errors.New("Timeout attempting to conect")
sslobodr16e41bc2019-01-18 16:22:21 -0500145}
146
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400147func getVolthaPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) []*volthaPod {
sslobodr8e2ccb52019-02-05 09:21:47 -0500148 var rtrn []*volthaPod
sslobodr16e41bc2019-01-18 16:22:21 -0500149
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400150 pods, err := cs.CoreV1().Pods(podNamespace).List(metav1.ListOptions{})
sslobodr16e41bc2019-01-18 16:22:21 -0500151 if err != nil {
152 panic(err.Error())
153 }
sslobodre7ce71d2019-01-22 16:21:45 -0500154 //log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))
sslobodr16e41bc2019-01-18 16:22:21 -0500155
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400156 for _, v := range pods.Items {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400157 if coreFilter.MatchString(v.Name) {
sslobodre7ce71d2019-01-22 16:21:45 -0500158 log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400159 v.Status.PodIP, v.Spec.NodeName)
sslobodre7ce71d2019-01-22 16:21:45 -0500160 // Only add the pod if it has an IP address. If it doesn't then it likely crashed and
161 // and is still in the process of getting re-started.
162 if v.Status.PodIP != "" {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400163 rtrn = append(rtrn, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName,
164 devIds: make(map[string]struct{}), backend: "", connection: ""})
sslobodre7ce71d2019-01-22 16:21:45 -0500165 }
sslobodr16e41bc2019-01-18 16:22:21 -0500166 }
167 }
168 return rtrn
169}
170
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400171func reconcilePodDeviceIds(pod *volthaPod, ids map[string]struct{}) bool {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400172 conn, err := connect(fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
sslobodre7ce71d2019-01-22 16:21:45 -0500173 defer conn.Close()
sslobodr6c1689c2019-01-24 07:31:15 -0500174 if err != nil {
sslobodre7ce71d2019-01-22 16:21:45 -0500175 log.Debugf("Could not query devices from %s, could not connect", pod.name)
176 return false
177 }
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400178
179 var idList cmn.IDs
180 for k := range ids {
181 idList.Items = append(idList.Items, &cmn.ID{Id: k})
182 }
183
sslobodre7ce71d2019-01-22 16:21:45 -0500184 client := vpb.NewVolthaServiceClient(conn)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400185 _, err = client.ReconcileDevices(context.Background(), &idList)
sslobodre7ce71d2019-01-22 16:21:45 -0500186 if err != nil {
187 log.Error(err)
188 return false
189 }
190
191 return true
192}
193
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400194func queryPodDeviceIds(pod *volthaPod) map[string]struct{} {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400195 var rtrn = make(map[string]struct{})
sslobodre7ce71d2019-01-22 16:21:45 -0500196 // Open a connection to the pod
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400197 conn, err := connect(fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
sslobodr6c1689c2019-01-24 07:31:15 -0500198 if err != nil {
sslobodre7ce71d2019-01-22 16:21:45 -0500199 log.Debugf("Could not query devices from %s, could not connect", pod.name)
200 return rtrn
201 }
202 defer conn.Close()
203 client := vpb.NewVolthaServiceClient(conn)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400204 devs, err := client.ListDeviceIds(context.Background(), &empty.Empty{})
sslobodre7ce71d2019-01-22 16:21:45 -0500205 if err != nil {
206 log.Error(err)
207 return rtrn
208 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400209 for _, dv := range devs.Items {
210 rtrn[dv.Id] = struct{}{}
sslobodre7ce71d2019-01-22 16:21:45 -0500211 }
212
213 return rtrn
214}
215
sslobodr8e2ccb52019-02-05 09:21:47 -0500216func queryDeviceIds(pods []*volthaPod) {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400217 for pk := range pods {
sslobodre7ce71d2019-01-22 16:21:45 -0500218 // Keep the old Id list if a new list is not returned
219 if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
220 pods[pk].devIds = idList
sslobodr16e41bc2019-01-18 16:22:21 -0500221 }
sslobodr16e41bc2019-01-18 16:22:21 -0500222 }
223}
224
sslobodr8e2ccb52019-02-05 09:21:47 -0500225func allEmpty(pods []*volthaPod) bool {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400226 for k := range pods {
sslobodr16e41bc2019-01-18 16:22:21 -0500227 if len(pods[k].devIds) != 0 {
228 return false
229 }
230 }
231 return true
232}
233
sslobodr8e2ccb52019-02-05 09:21:47 -0500234func rmPod(pods []*volthaPod, idx int) []*volthaPod {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400235 return append(pods[:idx], pods[idx+1:]...)
sslobodr16e41bc2019-01-18 16:22:21 -0500236}
237
// groupIntersectingPods1 forms active-active pod pairs based on device
// ownership overlap. Pods with no devices are set aside in out; each
// pod that owns devices starts a group, and a peer sharing at least one
// device id (on a DIFFERENT node) is paired with it. Returns the groups
// plus all pods that were not placed in a group. podCt is the original
// total pod count; grouping stops once podCt/2 groups exist.
// NOTE: pods is consumed destructively via rmPod; callers must not
// reuse the slice they passed in.
func groupIntersectingPods1(pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
	var rtrn [][]*volthaPod
	var out []*volthaPod

	for {
		if len(pods) == 0 {
			break
		}
		if len(pods[0].devIds) == 0 { // Ignore pods with no devices
			////log.Debugf("%s empty pod", pd[k].pod.name)
			out = append(out, pods[0])
			pods = rmPod(pods, 0)
			continue
		}
		// Start a pod group with this pod
		var grp []*volthaPod
		grp = append(grp, pods[0])
		pods = rmPod(pods, 0)
		//log.Debugf("Creating new group %s", pd[k].pod.name)
		// Find the peer pod based on device overlap
		// It's ok if one isn't found, an empty one will be used instead
		for k := range pods {
			if len(pods[k].devIds) == 0 { // Skip pods with no devices
				//log.Debugf("%s empty pod", pd[k1].pod.name)
				continue
			}
			if intersect(grp[0].devIds, pods[k].devIds) {
				//log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
				if grp[0].node == pods[k].node {
					// This should never happen
					log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
						grp[0].name, pods[k].name)
					continue
				}
				grp = append(grp, pods[k])
				// Safe removal: the loop breaks immediately after
				// mutating pods, so the range never sees stale indices.
				pods = rmPod(pods, k)
				break

			}
		}
		rtrn = append(rtrn, grp)
		//log.Debugf("Added group %s", grp[0].name)
		// Check if the number of groups = half the pods, if so all groups are started.
		if len(rtrn) == podCt>>1 {
			// Append any remaining pods to out
			out = append(out, pods[0:]...)
			break
		}
	}
	return rtrn, out
}
289
sslobodr16e41bc2019-01-18 16:22:21 -0500290func unallocPodCount(pd []*podTrack) int {
291 var rtrn int = 0
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400292 for _, v := range pd {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400293 if !v.dn {
sslobodr16e41bc2019-01-18 16:22:21 -0500294 rtrn++
295 }
296 }
297 return rtrn
298}
299
sslobodr8e2ccb52019-02-05 09:21:47 -0500300func sameNode(pod *volthaPod, grps [][]*volthaPod) bool {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400301 for _, v := range grps {
sslobodr16e41bc2019-01-18 16:22:21 -0500302 if v[0].node == pod.node {
303 return true
304 }
305 if len(v) == 2 && v[1].node == pod.node {
306 return true
307 }
308 }
309 return false
310}
311
sslobodr8e2ccb52019-02-05 09:21:47 -0500312func startRemainingGroups1(grps [][]*volthaPod, pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
313 var grp []*volthaPod
sslobodr16e41bc2019-01-18 16:22:21 -0500314
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400315 for k := range pods {
sslobodr16e41bc2019-01-18 16:22:21 -0500316 if sameNode(pods[k], grps) {
317 continue
318 }
sslobodr8e2ccb52019-02-05 09:21:47 -0500319 grp = []*volthaPod{}
sslobodr16e41bc2019-01-18 16:22:21 -0500320 grp = append(grp, pods[k])
321 pods = rmPod(pods, k)
322 grps = append(grps, grp)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400323 if len(grps) == podCt>>1 {
sslobodr16e41bc2019-01-18 16:22:21 -0500324 break
325 }
326 }
327 return grps, pods
328}
329
sslobodr8e2ccb52019-02-05 09:21:47 -0500330func hasSingleSecondNode(grp []*volthaPod) bool {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400331 var servers = make(map[string]struct{})
332 for k := range grp {
sslobodr16e41bc2019-01-18 16:22:21 -0500333 if k == 0 {
334 continue // Ignore the first item
335 }
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400336 servers[grp[k].node] = struct{}{}
sslobodr16e41bc2019-01-18 16:22:21 -0500337 }
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400338 if len(servers) == 1 {
sslobodr16e41bc2019-01-18 16:22:21 -0500339 return true
340 }
341 return false
342}
343
sslobodr8e2ccb52019-02-05 09:21:47 -0500344func addNode(grps [][]*volthaPod, idx *volthaPod, item *volthaPod) [][]*volthaPod {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400345 for k := range grps {
sslobodr16e41bc2019-01-18 16:22:21 -0500346 if grps[k][0].name == idx.name {
347 grps[k] = append(grps[k], item)
348 return grps
349 }
350 }
351 // TODO: Error checking required here.
352 return grps
353}
354
sslobodr8e2ccb52019-02-05 09:21:47 -0500355func removeNode(grps [][]*volthaPod, item *volthaPod) [][]*volthaPod {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400356 for k := range grps {
357 for k1 := range grps[k] {
sslobodr16e41bc2019-01-18 16:22:21 -0500358 if grps[k][k1].name == item.name {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400359 grps[k] = append(grps[k][:k1], grps[k][k1+1:]...)
sslobodr16e41bc2019-01-18 16:22:21 -0500360 break
361 }
362 }
363 }
364 return grps
365}
366
// groupRemainingPods1 completes every group that still has only one
// member by pairing it with one of the remaining (presumably empty)
// pods on a different node. Groups whose second member has only a
// single node choice are resolved first so that scarce nodes are not
// consumed by groups that had alternatives. All groups must already be
// started when this function is called.
func groupRemainingPods1(grps [][]*volthaPod, pods []*volthaPod) [][]*volthaPod {
	var lgrps [][]*volthaPod
	// All groups must be started when this function is called.
	// Copy incomplete groups
	for k := range grps {
		if len(grps[k]) != 2 {
			lgrps = append(lgrps, grps[k])
		}
	}

	// Add all pairing candidates to each started group.
	// After this loop lgrps[i] is [leader, candidate, candidate, ...]
	// where every candidate is on a different node than the leader.
	for k := range pods {
		for k2 := range lgrps {
			if lgrps[k2][0].node != pods[k].node {
				lgrps[k2] = append(lgrps[k2], pods[k])
			}
		}
	}

	//TODO: If any member of lgrps doesn't have at least 2
	// nodes something is wrong. Check for that here

	for {
		for { // Address groups with only a single server choice
			var ssn bool = false

			for k := range lgrps {
				// Now if any of the groups only have a single
				// node as the choice for the second member
				// address that one first.
				if hasSingleSecondNode(lgrps[k]) {
					ssn = true
					// Add this pairing to the groups
					grps = addNode(grps, lgrps[k][0], lgrps[k][1])
					// Since this node is now used, remove it from all
					// remaining tentative groups
					lgrps = removeNode(lgrps, lgrps[k][1])
					// Now remove this group completely since
					// it's been addressed
					lgrps = append(lgrps[:k], lgrps[k+1:]...)
					// Restart the scan: lgrps was mutated mid-range.
					break
				}
			}
			if !ssn {
				break
			}
		}
		// Now address one of the remaining groups
		if len(lgrps) == 0 {
			break // Nothing left to do, exit the loop
		}
		grps = addNode(grps, lgrps[0][0], lgrps[0][1])
		lgrps = removeNode(lgrps, lgrps[0][1])
		lgrps = append(lgrps[:0], lgrps[1:]...)
	}
	return grps
}
424
sslobodr8e2ccb52019-02-05 09:21:47 -0500425func groupPods1(pods []*volthaPod) [][]*volthaPod {
426 var rtrn [][]*volthaPod
sslobodr16e41bc2019-01-18 16:22:21 -0500427 var podCt int = len(pods)
428
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400429 rtrn, pods = groupIntersectingPods1(pods, podCt)
430 // There are several outcomes here
sslobodr16e41bc2019-01-18 16:22:21 -0500431 // 1) All pods have been paired and we're done
432 // 2) Some un-allocated pods remain
433 // 2.a) All groups have been started
434 // 2.b) Not all groups have been started
435 if len(pods) == 0 {
436 return rtrn
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400437 } else if len(rtrn) == podCt>>1 { // All groupings started
sslobodr16e41bc2019-01-18 16:22:21 -0500438 // Allocate the remaining (presumably empty) pods to the started groups
439 return groupRemainingPods1(rtrn, pods)
440 } else { // Some groupings started
441 // Start empty groups with remaining pods
442 // each grouping is on a different server then
443 // allocate remaining pods.
444 rtrn, pods = startRemainingGroups1(rtrn, pods, podCt)
445 return groupRemainingPods1(rtrn, pods)
446 }
447}
448
// intersect reports whether the two device-id sets share at least one
// element.
func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
	for key := range d2 {
		if _, found := d1[key]; found {
			return true
		}
	}
	return false
}
457
sslobodr8e2ccb52019-02-05 09:21:47 -0500458func setConnection(client pb.ConfigurationClient, cluster string, backend string, connection string, addr string, port uint64) {
sslobodr360c8d72019-02-05 12:47:56 -0500459 log.Debugf("Configuring backend %s : connection %s in cluster %s\n\n",
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400460 backend, connection, cluster)
461 cnf := &pb.Conn{Server: "grpc_command", Cluster: cluster, Backend: backend,
462 Connection: connection, Addr: addr,
463 Port: port}
sslobodr16e41bc2019-01-18 16:22:21 -0500464 if res, err := client.SetConnection(context.Background(), cnf); err != nil {
465 log.Debugf("failed SetConnection RPC call: %s", err)
466 } else {
467 log.Debugf("Result: %v", res)
468 }
469}
470
471func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
472 log.Debugf("Configuring backend %s : affinities \n", backend)
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400473 aff := &pb.Affinity{Router: afrouterRouterName, Route: "dev_manager", Cluster: afrouterRWClusterName, Backend: backend}
474 for k := range ids {
sslobodr16e41bc2019-01-18 16:22:21 -0500475 log.Debugf("Setting affinity for id %s", k)
476 aff.Id = k
477 if res, err := client.SetAffinity(context.Background(), aff); err != nil {
478 log.Debugf("failed affinity RPC call: %s", err)
479 } else {
480 log.Debugf("Result: %v", res)
481 }
482 }
483}
484
sslobodr8e2ccb52019-02-05 09:21:47 -0500485func getBackendForCore(coreId string, coreGroups [][]*volthaPod) string {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400486 for _, v := range coreGroups {
487 for _, v2 := range v {
sslobodr38afd0d2019-01-21 12:31:46 -0500488 if v2.name == coreId {
489 return v2.backend
490 }
491 }
492 }
493 log.Errorf("No backend found for core %s\n", coreId)
494 return ""
495}
496
sslobodr16e41bc2019-01-18 16:22:21 -0500497func monitorDiscovery(client pb.ConfigurationClient,
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400498 ch <-chan *ic.InterContainerMessage,
499 coreGroups [][]*volthaPod) {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400500 var id = make(map[string]struct{})
sslobodr38afd0d2019-01-21 12:31:46 -0500501
sslobodr16e41bc2019-01-18 16:22:21 -0500502 select {
503 case msg := <-ch:
504 log.Debugf("Received a device discovery notification")
sslobodr38afd0d2019-01-21 12:31:46 -0500505 device := &ic.DeviceDiscovered{}
506 if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
sslobodr16e41bc2019-01-18 16:22:21 -0500507 log.Errorf("Could not unmarshal received notification %v", msg)
508 } else {
sslobodr38afd0d2019-01-21 12:31:46 -0500509 // Set the affinity of the discovered device.
510 if be := getBackendForCore(device.Id, coreGroups); be != "" {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400511 id[device.Id] = struct{}{}
sslobodr38afd0d2019-01-21 12:31:46 -0500512 setAffinity(client, id, be)
513 } else {
514 log.Error("Cant use an empty string as a backend name")
515 }
sslobodr16e41bc2019-01-18 16:22:21 -0500516 }
517 break
518 }
519}
520
sslobodr38afd0d2019-01-21 12:31:46 -0500521func startDiscoveryMonitor(client pb.ConfigurationClient,
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400522 coreGroups [][]*volthaPod) error {
sslobodr16e41bc2019-01-18 16:22:21 -0500523 var ch <-chan *ic.InterContainerMessage
524 // Connect to kafka for discovery events
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400525 topic := &kafka.Topic{Name: kafkaTopic}
526 kc, err := newKafkaClient(kafkaClientType, kafkaHost, kafkaPort, kafkaInstanceID)
sslobodr16e41bc2019-01-18 16:22:21 -0500527 kc.Start()
528
529 if ch, err = kc.Subscribe(topic); err != nil {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400530 log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
sslobodr16e41bc2019-01-18 16:22:21 -0500531 return err
532 }
sslobodr38afd0d2019-01-21 12:31:46 -0500533 go monitorDiscovery(client, ch, coreGroups)
sslobodr16e41bc2019-01-18 16:22:21 -0500534 return nil
535}
536
// getAddrDiffs determines which items in core groups have changed based
// on the list provided. It returns (a) a coreGroup-shaped array where
// each slot holds the OLD pod whose IP no longer appears in rwPods
// (nil where nothing changed), and (b) the list of NEW pods whose IPs
// are not yet in coreGroups.
func getAddrDiffs(coreGroups [][]*volthaPod, rwPods []*volthaPod) ([][]*volthaPod, []*volthaPod) {
	var nList []*volthaPod
	// One slot per group-of-two, mirroring the coreGroups layout.
	var rtrn = make([][]*volthaPod, numRWPods>>1)
	var ipAddrs = make(map[string]struct{})

	log.Debug("Get addr diffs")

	// Start with an empty array
	for k := range rtrn {
		rtrn[k] = make([]*volthaPod, 2)
	}

	// Build a list with only the new items
	for _, v := range rwPods {
		if !hasIpAddr(coreGroups, v.ipAddr) {
			nList = append(nList, v)
		}
		ipAddrs[v.ipAddr] = struct{}{} // for the search below
	}

	// Now build the coreGroups with only the changed items
	// (entries whose IP is no longer present among the live pods).
	for k1, v1 := range coreGroups {
		for k2, v2 := range v1 {
			if _, ok := ipAddrs[v2.ipAddr]; !ok {
				rtrn[k1][k2] = v2
			}
		}
	}
	return rtrn, nList
}
571
// reconcileAddrDiffs figures out where best to put the new pods in the
// coreGroup array based on the old pods being replaced. The criterion
// is that the new pod must be on the same server (node) as the old pod
// it replaces. The changed slots of coreGroupDiffs are overwritten in
// place with the chosen replacement pods and the array is returned.
func reconcileAddrDiffs(coreGroupDiffs [][]*volthaPod, rwPodDiffs []*volthaPod) [][]*volthaPod {
	// node name -> replacement candidates scheduled on that node
	var srvrs map[string][]*volthaPod = make(map[string][]*volthaPod)

	log.Debug("Reconciling diffs")
	log.Debug("Building server list")
	for _, v := range rwPodDiffs {
		log.Debugf("Adding %v to the server list", *v)
		srvrs[v.node] = append(srvrs[v.node], v)
	}

	for k1, v1 := range coreGroupDiffs {
		log.Debugf("k1:%v, v1:%v", k1, v1)
		for k2, v2 := range v1 {
			log.Debugf("k2:%v, v2:%v", k2, v2)
			if v2 == nil { // Nothing to do here
				continue
			}
			if _, ok := srvrs[v2.node]; ok {
				// Consume the first candidate on the same node.
				coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
				if len(srvrs[v2.node]) > 1 { // remove one entry from the list
					srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
				} else { // Delete the entry from the map
					delete(srvrs, v2.node)
				}
			} else {
				log.Error("This should never happen, node appears to have changed names")
				// attempt to limp along by keeping this old entry
			}
		}
	}

	return coreGroupDiffs
}
610
// applyAddrDiffs reconciles address changes between the known cores and
// the freshly-listed pods (nPods), then pushes updated connection
// information to the affinity router. coreList is either the RW core
// pairs ([][]*volthaPod) or the flat RO core list ([]*volthaPod); any
// other type is an internal error.
func applyAddrDiffs(client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
	var newEntries [][]*volthaPod

	log.Debug("Applying diffs")
	switch cores := coreList.(type) {
	case [][]*volthaPod:
		newEntries = reconcileAddrDiffs(getAddrDiffs(cores, nPods))

		// Now replace the information in coreGroups with the new
		// entries and then reconcile the device ids on the core
		// that's in the new entry with the device ids of its
		// active-active peer.
		for k1, v1 := range cores {
			for k2, v2 := range v1 {
				if newEntries[k1][k2] != nil {
					// TODO: Missing is the case where both the primary
					// and the secondary core crash and come back.
					// Pull the device ids from the active-active peer
					// (k2^1 flips 0<->1 to select the pair's other member).
					ids := queryPodDeviceIds(cores[k1][k2^1])
					if len(ids) != 0 {
						if !reconcilePodDeviceIds(newEntries[k1][k2], ids) {
							log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
						}
					}
					// Send the affinity router new connection information
					setConnection(client, afrouterRWClusterName, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, podGrpcPort)
					// Copy the new entry information over
					cores[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
					cores[k1][k2].name = newEntries[k1][k2].name
					cores[k1][k2].devIds = ids
				}
			}
		}
	case []*volthaPod:
		var mia []*volthaPod
		var found bool
		// TODO: Break this using functions to simplify
		// reading of the code.
		// Find the core(s) that have changed addresses
		for k1, v1 := range cores {
			found = false
			for _, v2 := range nPods {
				if v1.ipAddr == v2.ipAddr {
					found = true
					break
				}
			}
			if !found {
				mia = append(mia, cores[k1])
			}
		}
		// Now plug in the new addresses and set the connection
		// NOTE(review): mia[0] assumes at least one missing core per new
		// address — if nPods has more new addresses than cores that went
		// missing this would panic; confirm the caller guarantees the
		// counts match.
		for _, v1 := range nPods {
			found = false
			for _, v2 := range cores {
				if v1.ipAddr == v2.ipAddr {
					found = true
					break
				}
			}
			if found {
				continue
			}
			mia[0].ipAddr = v1.ipAddr
			mia[0].name = v1.name
			setConnection(client, afrouterROClusterName, mia[0].backend, mia[0].connection, v1.ipAddr, podGrpcPort)
			// Now get rid of the mia entry just processed
			mia = append(mia[:0], mia[1:]...)
		}
	default:
		log.Error("Internal: Unexpected type in call to applyAddrDiffs")
	}
}
684
sslobodr8e2ccb52019-02-05 09:21:47 -0500685func updateDeviceIds(coreGroups [][]*volthaPod, rwPods []*volthaPod) {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400686 var byName = make(map[string]*volthaPod)
sslobodrcd37bc52019-01-24 11:47:16 -0500687
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400688 // Convenience
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400689 for _, v := range rwPods {
sslobodrcd37bc52019-01-24 11:47:16 -0500690 byName[v.name] = v
691 }
692
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400693 for k1, v1 := range coreGroups {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400694 for k2 := range v1 {
sslobodrcd37bc52019-01-24 11:47:16 -0500695 coreGroups[k1][k2].devIds = byName[v1[k2].name].devIds
696 }
697 }
698}
699
sslobodr16e41bc2019-01-18 16:22:21 -0500700func startCoreMonitor(client pb.ConfigurationClient,
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400701 clientset *kubernetes.Clientset,
702 rwCoreFltr *regexp.Regexp,
703 roCoreFltr *regexp.Regexp,
704 coreGroups [][]*volthaPod,
705 oRoPods []*volthaPod) error {
sslobodr16e41bc2019-01-18 16:22:21 -0500706 // Now that initial allocation has been completed, monitor the pods
707 // for IP changes
708 // The main loop needs to do the following:
709 // 1) Periodically query the pods and filter out
710 // the vcore ones
711 // 2) Validate that the pods running are the same
712 // as the previous check
713 // 3) Validate that the IP addresses are the same
714 // as the last check.
715 // If the pod name(s) ha(s/ve) changed then remove
716 // the unused pod names and add in the new pod names
717 // maintaining the cluster/backend information.
718 // If an IP address has changed (which shouldn't
719 // happen unless a pod is re-started) it should get
720 // caught by the pod name change.
721 for {
722 time.Sleep(10 * time.Second) // Wait a while
723 // Get the rw core list from k8s
sslobodr8e2ccb52019-02-05 09:21:47 -0500724 rwPods := getVolthaPods(clientset, rwCoreFltr)
sslobodre7ce71d2019-01-22 16:21:45 -0500725 queryDeviceIds(rwPods)
sslobodrcd37bc52019-01-24 11:47:16 -0500726 updateDeviceIds(coreGroups, rwPods)
sslobodr16e41bc2019-01-18 16:22:21 -0500727 // If we didn't get 2n+1 pods then wait since
728 // something is down and will hopefully come
729 // back up at some point.
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400730 if len(rwPods) != numRWPods {
sslobodr16e41bc2019-01-18 16:22:21 -0500731 continue
732 }
733 // We have all pods, check if any IP addresses
734 // have changed.
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400735 for _, v := range rwPods {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400736 if !hasIpAddr(coreGroups, v.ipAddr) {
sslobodre7ce71d2019-01-22 16:21:45 -0500737 log.Debug("Address has changed...")
738 applyAddrDiffs(client, coreGroups, rwPods)
sslobodr8e2ccb52019-02-05 09:21:47 -0500739 break
sslobodre7ce71d2019-01-22 16:21:45 -0500740 }
sslobodr16e41bc2019-01-18 16:22:21 -0500741 }
sslobodr8e2ccb52019-02-05 09:21:47 -0500742
743 roPods := getVolthaPods(clientset, roCoreFltr)
744
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400745 if len(roPods) != numROPods {
sslobodr8e2ccb52019-02-05 09:21:47 -0500746 continue
747 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400748 for _, v := range roPods {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400749 if !hasIpAddr(oRoPods, v.ipAddr) {
sslobodr8e2ccb52019-02-05 09:21:47 -0500750 applyAddrDiffs(client, oRoPods, roPods)
751 break
752 }
753 }
754
sslobodr16e41bc2019-01-18 16:22:21 -0500755 }
sslobodr16e41bc2019-01-18 16:22:21 -0500756}
757
sslobodr8e2ccb52019-02-05 09:21:47 -0500758func hasIpAddr(coreList interface{}, ipAddr string) bool {
759 switch cores := coreList.(type) {
760 case []*volthaPod:
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400761 for _, v := range cores {
sslobodr8e2ccb52019-02-05 09:21:47 -0500762 if v.ipAddr == ipAddr {
sslobodre7ce71d2019-01-22 16:21:45 -0500763 return true
764 }
765 }
sslobodr8e2ccb52019-02-05 09:21:47 -0500766 case [][]*volthaPod:
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400767 for _, v1 := range cores {
768 for _, v2 := range v1 {
sslobodr8e2ccb52019-02-05 09:21:47 -0500769 if v2.ipAddr == ipAddr {
770 return true
771 }
772 }
773 }
774 default:
775 log.Error("Internal: Unexpected type in call to hasIpAddr")
sslobodre7ce71d2019-01-22 16:21:45 -0500776 }
777 return false
778}
779
sslobodr16e41bc2019-01-18 16:22:21 -0500780func main() {
781 // This is currently hard coded to a cluster with 3 servers
782 //var connections map[string]configConn = make(map[string]configConn)
783 //var rwCorePodsPrev map[string]rwPod = make(map[string]rwPod)
sslobodr16e41bc2019-01-18 16:22:21 -0500784 var err error
785 var conn *grpc.ClientConn
786
sslobodr16e41bc2019-01-18 16:22:21 -0500787 // Set up the regular expression to identify the voltha cores
sslobodr8e2ccb52019-02-05 09:21:47 -0500788 rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
789 roCoreFltr := regexp.MustCompile(`ro-core-`)
sslobodr16e41bc2019-01-18 16:22:21 -0500790
791 // Set up logging
792 if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
793 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
794 }
795
796 // Set up kubernetes api
797 clientset := k8sClientSet()
798
799 // Connect to the affinity router and set up the client
800 conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
sslobodrcd37bc52019-01-24 11:47:16 -0500801 defer conn.Close()
sslobodr16e41bc2019-01-18 16:22:21 -0500802 if err != nil {
803 panic(err.Error())
804 }
805 client := pb.NewConfigurationClient(conn)
806
807 // Get the voltha rw-core podes
sslobodr8e2ccb52019-02-05 09:21:47 -0500808 rwPods := getVolthaPods(clientset, rwCoreFltr)
sslobodr16e41bc2019-01-18 16:22:21 -0500809
810 // Fetch the devices held by each running core
sslobodre7ce71d2019-01-22 16:21:45 -0500811 queryDeviceIds(rwPods)
sslobodr16e41bc2019-01-18 16:22:21 -0500812
813 // For debugging... comment out l8r
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400814 for _, v := range rwPods {
sslobodr16e41bc2019-01-18 16:22:21 -0500815 log.Debugf("Pod list %v", *v)
816 }
817
818 coreGroups := groupPods1(rwPods)
819
sslobodr16e41bc2019-01-18 16:22:21 -0500820 // Assign the groupings to the the backends and connections
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400821 for k := range coreGroups {
822 for k1 := range coreGroups[k] {
823 coreGroups[k][k1].cluster = afrouterRWClusterName
824 coreGroups[k][k1].backend = afrouterRWClusterName + strconv.Itoa(k+1)
825 coreGroups[k][k1].connection = afrouterRWClusterName + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
sslobodr16e41bc2019-01-18 16:22:21 -0500826 }
827 }
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400828 log.Info("Core grouping completed")
sslobodr16e41bc2019-01-18 16:22:21 -0500829
830 // TODO: Debugging code, comment out for production
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400831 for k, v := range coreGroups {
832 for k2, v2 := range v {
sslobodr16e41bc2019-01-18 16:22:21 -0500833 log.Debugf("Core group %d,%d: %v", k, k2, v2)
834 }
835 }
sslobodrcd37bc52019-01-24 11:47:16 -0500836 log.Info("Setting affinities")
sslobodr16e41bc2019-01-18 16:22:21 -0500837 // Now set the affinities for exising devices in the cores
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400838 for _, v := range coreGroups {
sslobodr16e41bc2019-01-18 16:22:21 -0500839 setAffinity(client, v[0].devIds, v[0].backend)
840 setAffinity(client, v[1].devIds, v[1].backend)
841 }
sslobodrcd37bc52019-01-24 11:47:16 -0500842 log.Info("Setting connections")
sslobodr16e41bc2019-01-18 16:22:21 -0500843 // Configure the backeds based on the calculated core groups
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400844 for _, v := range coreGroups {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400845 setConnection(client, afrouterRWClusterName, v[0].backend, v[0].connection, v[0].ipAddr, podGrpcPort)
846 setConnection(client, afrouterRWClusterName, v[1].backend, v[1].connection, v[1].ipAddr, podGrpcPort)
sslobodr8e2ccb52019-02-05 09:21:47 -0500847 }
848
849 // Process the read only pods
850 roPods := getVolthaPods(clientset, roCoreFltr)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400851 for k, v := range roPods {
sslobodr8e2ccb52019-02-05 09:21:47 -0500852 log.Debugf("Processing ro_pod %v", v)
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400853 vN := afrouterROClusterName + strconv.Itoa(k+1)
sslobodr8e2ccb52019-02-05 09:21:47 -0500854 log.Debugf("Setting connection %s, %s, %s", vN, vN+"1", v.ipAddr)
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400855 roPods[k].cluster = afrouterROClusterName
sslobodr8e2ccb52019-02-05 09:21:47 -0500856 roPods[k].backend = vN
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400857 roPods[k].connection = vN + "1"
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400858 setConnection(client, afrouterROClusterName, v.backend, v.connection, v.ipAddr, podGrpcPort)
sslobodr16e41bc2019-01-18 16:22:21 -0500859 }
860
sslobodrcd37bc52019-01-24 11:47:16 -0500861 log.Info("Starting discovery monitoring")
sslobodr38afd0d2019-01-21 12:31:46 -0500862 startDiscoveryMonitor(client, coreGroups)
sslobodr16e41bc2019-01-18 16:22:21 -0500863
sslobodrcd37bc52019-01-24 11:47:16 -0500864 log.Info("Starting core monitoring")
sslobodr8e2ccb52019-02-05 09:21:47 -0500865 startCoreMonitor(client, clientset, rwCoreFltr,
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400866 roCoreFltr, coreGroups, roPods) // Never returns
sslobodr16e41bc2019-01-18 16:22:21 -0500867 return
sslobodr16e41bc2019-01-18 16:22:21 -0500868}