/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
	"errors"
	"flag"
	"fmt"
	"math"
	"os"
	"path"
	"regexp"
	"strconv"
	"time"

	"github.com/golang/protobuf/ptypes"
	"github.com/golang/protobuf/ptypes/empty"
	"github.com/opencord/voltha-go/common/log"
	"github.com/opencord/voltha-go/common/version"
	"github.com/opencord/voltha-go/kafka"
	pb "github.com/opencord/voltha-protos/go/afrouter"
	cmn "github.com/opencord/voltha-protos/go/common"
	ic "github.com/opencord/voltha-protos/go/inter_container"
	vpb "github.com/opencord/voltha-protos/go/voltha"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

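// volthaPod captures the Kubernetes state that arouterd tracks for each
// voltha core pod (name, IP, node, hosted device ids) together with the
// affinity-router cluster, backend and connection names assigned to it.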
type volthaPod struct {
	name       string
	ipAddr     string
	node       string
	devIds     map[string]struct{}
	cluster    string
	backend    string
	connection string
}

type podTrack struct {
	pod *volthaPod
	dn  bool
}

type Configuration struct {
	DisplayVersionOnly *bool
}

var (
	podNamespace = getStrEnv("POD_NAMESPACE", "voltha")
	podGrpcPort  = uint64(getIntEnv("POD_GRPC_PORT", 0, math.MaxUint16, 50057))

	numRWPods = getIntEnv("NUM_RW_PODS", 1, math.MaxInt32, 6)
	numROPods = getIntEnv("NUM_RO_PODS", 1, math.MaxInt32, 3)

	afrouterRouterName    = getStrEnv("AFROUTER_ROUTER_NAME", "vcore")
	afrouterRWClusterName = getStrEnv("AFROUTER_RW_CLUSTER_NAME", "vcore")
	afrouterROClusterName = getStrEnv("AFROUTER_RO_CLUSTER_NAME", "ro_vcore")

	kafkaTopic      = getStrEnv("KAFKA_TOPIC", "AffinityRouter")
	kafkaClientType = getStrEnv("KAFKA_CLIENT_TYPE", "sarama")
	kafkaHost       = getStrEnv("KAFKA_HOST", "kafka")
	kafkaPort       = getIntEnv("KAFKA_PORT", 0, math.MaxUint16, 9092)
	kafkaInstanceID = getStrEnv("KAFKA_INSTANCE_ID", "arouterd")
)

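// getIntEnv reads an integer from the named environment variable, panicking
// if the value is not a number within [min, max]. getStrEnv is the string
// counterpart. Both return defaultValue when the variable is unset.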
func getIntEnv(key string, min, max, defaultValue int) int {
	if val, have := os.LookupEnv(key); have {
		num, err := strconv.Atoi(val)
		if err != nil || !(min <= num && num <= max) {
			panic(fmt.Errorf("%s must be a number in the range [%d, %d]; default: %d", key, min, max, defaultValue))
		}
		return num
	}
	return defaultValue
}

func getStrEnv(key, defaultValue string) string {
	if val, have := os.LookupEnv(key); have {
		return val
	}
	return defaultValue
}

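// newKafkaClient builds the kafka client used to receive device-discovery
// events. Only the "sarama" client type is supported; any other value
// results in an error.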
func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {

	log.Infow("kafka-client-type", log.Fields{"client": clientType})
	switch clientType {
	case "sarama":
		return kafka.NewSaramaClient(
			kafka.Host(host),
			kafka.Port(port),
			kafka.ConsumerType(kafka.GroupCustomer),
			kafka.ProducerReturnOnErrors(true),
			kafka.ProducerReturnOnSuccess(true),
			kafka.ProducerMaxRetries(6),
			kafka.NumPartitions(3),
			kafka.ConsumerGroupName(instanceID),
			kafka.ConsumerGroupPrefix(instanceID),
			kafka.AutoCreateTopic(false),
			kafka.ProducerFlushFrequency(5),
			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
	}
	return nil, errors.New("unsupported-client-type")
}

func k8sClientSet() *kubernetes.Clientset {
	// creates the in-cluster config
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err.Error())
	}
	// creates the clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	return clientset
}

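// connect dials the given gRPC address, retrying every 10 seconds for up to
// 100 attempts before giving up.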
func connect(addr string) (*grpc.ClientConn, error) {
	for ctr := 0; ctr < 100; ctr++ {
		log.Debugf("Trying to connect to %s", addr)
		conn, err := grpc.Dial(addr, grpc.WithInsecure())
		if err != nil {
			log.Debugf("Attempt to connect failed, retrying %v:", err)
		} else {
			log.Debugf("Connection succeeded")
			return conn, err
		}
		time.Sleep(10 * time.Second)
	}
	log.Debugf("Too many connection attempts, giving up!")
	return nil, errors.New("timeout attempting to connect")
}

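// getVolthaPods returns the pods in the configured namespace whose names
// match coreFilter and that already have an IP address assigned.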
func getVolthaPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) []*volthaPod {
	var rtrn []*volthaPod

	pods, err := cs.CoreV1().Pods(podNamespace).List(metav1.ListOptions{})
	if err != nil {
		panic(err.Error())
	}
	//log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))

	for _, v := range pods.Items {
		if coreFilter.MatchString(v.Name) {
			log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
				v.Status.PodIP, v.Spec.NodeName)
			// Only add the pod if it has an IP address. If it doesn't then it likely crashed
			// and is still in the process of getting re-started.
			if v.Status.PodIP != "" {
				rtrn = append(rtrn, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName,
					devIds: make(map[string]struct{}), backend: "", connection: ""})
			}
		}
	}
	return rtrn
}

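// reconcilePodDeviceIds pushes the given device ids to the core running in
// pod via the ReconcileDevices gRPC call, returning false if the pod cannot
// be reached or the call fails.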
func reconcilePodDeviceIds(pod *volthaPod, ids map[string]struct{}) bool {
	conn, err := connect(fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
	if err != nil {
		log.Debugf("Could not query devices from %s, could not connect", pod.name)
		return false
	}
	defer conn.Close()

	var idList cmn.IDs
	for k := range ids {
		idList.Items = append(idList.Items, &cmn.ID{Id: k})
	}

	client := vpb.NewVolthaServiceClient(conn)
	_, err = client.ReconcileDevices(context.Background(), &idList)
	if err != nil {
		log.Error(err)
		return false
	}

	return true
}

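// queryPodDeviceIds fetches the set of device ids currently held by the core
// running in pod, using the ListDeviceIds gRPC call. An empty map is
// returned if the pod cannot be reached or the call fails.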
func queryPodDeviceIds(pod *volthaPod) map[string]struct{} {
	var rtrn = make(map[string]struct{})
	// Open a connection to the pod
	conn, err := connect(fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
	if err != nil {
		log.Debugf("Could not query devices from %s, could not connect", pod.name)
		return rtrn
	}
	defer conn.Close()
	client := vpb.NewVolthaServiceClient(conn)
	devs, err := client.ListDeviceIds(context.Background(), &empty.Empty{})
	if err != nil {
		log.Error(err)
		return rtrn
	}
	for _, dv := range devs.Items {
		rtrn[dv.Id] = struct{}{}
	}

	return rtrn
}

func queryDeviceIds(pods []*volthaPod) {
	for pk := range pods {
		// Keep the old Id list if a new list is not returned
		if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
			pods[pk].devIds = idList
		}
	}
}

func allEmpty(pods []*volthaPod) bool {
	for k := range pods {
		if len(pods[k].devIds) != 0 {
			return false
		}
	}
	return true
}

func rmPod(pods []*volthaPod, idx int) []*volthaPod {
	return append(pods[:idx], pods[idx+1:]...)
}

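// groupIntersectingPods1 pairs pods whose device-id sets overlap, skipping
// pods with no devices and refusing to pair pods on the same node. It
// returns the groups started so far plus the pods left unallocated.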
func groupIntersectingPods1(pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
	var rtrn [][]*volthaPod
	var out []*volthaPod

	for {
		if len(pods) == 0 {
			break
		}
		if len(pods[0].devIds) == 0 { // Ignore pods with no devices
			////log.Debugf("%s empty pod", pd[k].pod.name)
			out = append(out, pods[0])
			pods = rmPod(pods, 0)
			continue
		}
		// Start a pod group with this pod
		var grp []*volthaPod
		grp = append(grp, pods[0])
		pods = rmPod(pods, 0)
		//log.Debugf("Creating new group %s", pd[k].pod.name)
		// Find the peer pod based on device overlap
		// It's ok if one isn't found, an empty one will be used instead
		for k := range pods {
			if len(pods[k].devIds) == 0 { // Skip pods with no devices
				//log.Debugf("%s empty pod", pd[k1].pod.name)
				continue
			}
			if intersect(grp[0].devIds, pods[k].devIds) {
				//log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
				if grp[0].node == pods[k].node {
					// This should never happen
					log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
						grp[0].name, pods[k].name)
					continue
				}
				grp = append(grp, pods[k])
				pods = rmPod(pods, k)
				break
			}
		}
		rtrn = append(rtrn, grp)
		//log.Debugf("Added group %s", grp[0].name)
		// Check if the number of groups = half the pods, if so all groups are started.
		if len(rtrn) == podCt>>1 {
			// Append any remaining pods to out
			out = append(out, pods[0:]...)
			break
		}
	}
	return rtrn, out
}

func unallocPodCount(pd []*podTrack) int {
	var rtrn int = 0
	for _, v := range pd {
		if !v.dn {
			rtrn++
		}
	}
	return rtrn
}

func sameNode(pod *volthaPod, grps [][]*volthaPod) bool {
	for _, v := range grps {
		if v[0].node == pod.node {
			return true
		}
		if len(v) == 2 && v[1].node == pod.node {
			return true
		}
	}
	return false
}

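// startRemainingGroups1 starts a new single-member group for each remaining
// pod that is not on a node already used by an existing group, until the
// number of groups reaches half the pod count.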
func startRemainingGroups1(grps [][]*volthaPod, pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
	var grp []*volthaPod

	for k := range pods {
		if sameNode(pods[k], grps) {
			continue
		}
		grp = []*volthaPod{}
		grp = append(grp, pods[k])
		pods = rmPod(pods, k)
		grps = append(grps, grp)
		if len(grps) == podCt>>1 {
			break
		}
	}
	return grps, pods
}

func hasSingleSecondNode(grp []*volthaPod) bool {
	var servers = make(map[string]struct{})
	for k := range grp {
		if k == 0 {
			continue // Ignore the first item
		}
		servers[grp[k].node] = struct{}{}
	}
	if len(servers) == 1 {
		return true
	}
	return false
}

func addNode(grps [][]*volthaPod, idx *volthaPod, item *volthaPod) [][]*volthaPod {
	for k := range grps {
		if grps[k][0].name == idx.name {
			grps[k] = append(grps[k], item)
			return grps
		}
	}
	// TODO: Error checking required here.
	return grps
}

func removeNode(grps [][]*volthaPod, item *volthaPod) [][]*volthaPod {
	for k := range grps {
		for k1 := range grps[k] {
			if grps[k][k1].name == item.name {
				grps[k] = append(grps[k][:k1], grps[k][k1+1:]...)
				break
			}
		}
	}
	return grps
}

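// groupRemainingPods1 completes the started groups by assigning each
// remaining (presumably empty) pod as the second member of a group whose
// first member runs on a different node.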
func groupRemainingPods1(grps [][]*volthaPod, pods []*volthaPod) [][]*volthaPod {
	var lgrps [][]*volthaPod
	// All groups must be started when this function is called.
	// Copy incomplete groups
	for k := range grps {
		if len(grps[k]) != 2 {
			lgrps = append(lgrps, grps[k])
		}
	}

	// Add all pairing candidates to each started group.
	for k := range pods {
		for k2 := range lgrps {
			if lgrps[k2][0].node != pods[k].node {
				lgrps[k2] = append(lgrps[k2], pods[k])
			}
		}
	}

	// TODO: If any member of lgrps doesn't have at least 2
	// nodes something is wrong. Check for that here

	for {
		for { // Address groups with only a single server choice
			var ssn bool = false

			for k := range lgrps {
				// Now if any of the groups only have a single
				// node as the choice for the second member
				// address that one first.
				if hasSingleSecondNode(lgrps[k]) {
					ssn = true
					// Add this pairing to the groups
					grps = addNode(grps, lgrps[k][0], lgrps[k][1])
					// Since this node is now used, remove it from all
					// remaining tentative groups
					lgrps = removeNode(lgrps, lgrps[k][1])
					// Now remove this group completely since
					// it's been addressed
					lgrps = append(lgrps[:k], lgrps[k+1:]...)
					break
				}
			}
			if !ssn {
				break
			}
		}
		// Now address one of the remaining groups
		if len(lgrps) == 0 {
			break // Nothing left to do, exit the loop
		}
		grps = addNode(grps, lgrps[0][0], lgrps[0][1])
		lgrps = removeNode(lgrps, lgrps[0][1])
		lgrps = append(lgrps[:0], lgrps[1:]...)
	}
	return grps
}

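// groupPods1 splits the rw-core pods into active-active pairs: pods that
// already share device ids are paired first, then any remaining pods are
// used to start and complete groups on distinct nodes.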
func groupPods1(pods []*volthaPod) [][]*volthaPod {
	var rtrn [][]*volthaPod
	var podCt int = len(pods)

	rtrn, pods = groupIntersectingPods1(pods, podCt)
	// There are several outcomes here
	// 1) All pods have been paired and we're done
	// 2) Some un-allocated pods remain
	// 2.a) All groups have been started
	// 2.b) Not all groups have been started
	if len(pods) == 0 {
		return rtrn
	} else if len(rtrn) == podCt>>1 { // All groupings started
		// Allocate the remaining (presumably empty) pods to the started groups
		return groupRemainingPods1(rtrn, pods)
	} else { // Some groupings started
		// Start empty groups with the remaining pods, ensuring
		// each grouping is on a different server, then
		// allocate the remaining pods.
		rtrn, pods = startRemainingGroups1(rtrn, pods, podCt)
		return groupRemainingPods1(rtrn, pods)
	}
}

func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
	for k := range d1 {
		if _, ok := d2[k]; ok {
			return true
		}
	}
	return false
}

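// setConnection tells the affinity router which address and port to use for
// the given backend connection within a cluster.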
func setConnection(client pb.ConfigurationClient, cluster string, backend string, connection string, addr string, port uint64) {
	log.Debugf("Configuring backend %s : connection %s in cluster %s\n\n",
		backend, connection, cluster)
	cnf := &pb.Conn{Server: "grpc_command", Cluster: cluster, Backend: backend,
		Connection: connection, Addr: addr,
		Port: port}
	if res, err := client.SetConnection(context.Background(), cnf); err != nil {
		log.Debugf("failed SetConnection RPC call: %s", err)
	} else {
		log.Debugf("Result: %v", res)
	}
}

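// setAffinity binds each of the given device ids to a backend so that the
// affinity router forwards requests for those devices to that backend.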
func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
	log.Debugf("Configuring backend %s : affinities \n", backend)
	aff := &pb.Affinity{Router: afrouterRouterName, Route: "dev_manager", Cluster: afrouterRWClusterName, Backend: backend}
	for k := range ids {
		log.Debugf("Setting affinity for id %s", k)
		aff.Id = k
		if res, err := client.SetAffinity(context.Background(), aff); err != nil {
			log.Debugf("failed affinity RPC call: %s", err)
		} else {
			log.Debugf("Result: %v", res)
		}
	}
}

func getBackendForCore(coreId string, coreGroups [][]*volthaPod) string {
	for _, v := range coreGroups {
		for _, v2 := range v {
			if v2.name == coreId {
				return v2.backend
			}
		}
	}
	log.Errorf("No backend found for core %s\n", coreId)
	return ""
}

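// monitorDiscovery consumes a device-discovery notification from ch and sets
// the affinity for the discovered device to the backend resolved by
// getBackendForCore.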
func monitorDiscovery(client pb.ConfigurationClient,
	ch <-chan *ic.InterContainerMessage,
	coreGroups [][]*volthaPod) {
	var id = make(map[string]struct{})

	select {
	case msg := <-ch:
		log.Debugf("Received a device discovery notification")
		device := &ic.DeviceDiscovered{}
		if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
			log.Errorf("Could not unmarshal received notification %v", msg)
		} else {
			// Set the affinity of the discovered device.
			if be := getBackendForCore(device.Id, coreGroups); be != "" {
				id[device.Id] = struct{}{}
				setAffinity(client, id, be)
			} else {
				log.Error("Can't use an empty string as a backend name")
			}
		}
		break
	}
}

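// startDiscoveryMonitor subscribes to the device-discovery kafka topic and
// launches a goroutine that sets the affinity for newly discovered devices.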
func startDiscoveryMonitor(client pb.ConfigurationClient,
	coreGroups [][]*volthaPod) error {
	var ch <-chan *ic.InterContainerMessage
	// Connect to kafka for discovery events
	topic := &kafka.Topic{Name: kafkaTopic}
	kc, err := newKafkaClient(kafkaClientType, kafkaHost, kafkaPort, kafkaInstanceID)
	if err != nil {
		log.Errorf("Could not create the kafka client, discovery disabled: %v", err)
		return err
	}
	kc.Start()

	if ch, err = kc.Subscribe(topic); err != nil {
		log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
		return err
	}
	go monitorDiscovery(client, ch, coreGroups)
	return nil
}

// getAddrDiffs determines which items in coreGroups have changed based on
// the list provided, and returns a coreGroup with only the changed items
// and a pod list with the new items.
func getAddrDiffs(coreGroups [][]*volthaPod, rwPods []*volthaPod) ([][]*volthaPod, []*volthaPod) {
	var nList []*volthaPod
	var rtrn = make([][]*volthaPod, numRWPods>>1)
	var ipAddrs = make(map[string]struct{})

	log.Debug("Get addr diffs")

	// Start with an empty array
	for k := range rtrn {
		rtrn[k] = make([]*volthaPod, 2)
	}

	// Build a list with only the new items
	for _, v := range rwPods {
		if !hasIpAddr(coreGroups, v.ipAddr) {
			nList = append(nList, v)
		}
		ipAddrs[v.ipAddr] = struct{}{} // for the search below
	}

	// Now build the coreGroups with only the changed items
	for k1, v1 := range coreGroups {
		for k2, v2 := range v1 {
			if _, ok := ipAddrs[v2.ipAddr]; !ok {
				rtrn[k1][k2] = v2
			}
		}
	}
	return rtrn, nList
}

// reconcileAddrDiffs figures out where best to put the new pods in the
// coreGroup array based on the old pods being replaced. The criterion is
// that the new pod be on the same server as the old pod was.
func reconcileAddrDiffs(coreGroupDiffs [][]*volthaPod, rwPodDiffs []*volthaPod) [][]*volthaPod {
	var srvrs map[string][]*volthaPod = make(map[string][]*volthaPod)

	log.Debug("Reconciling diffs")
	log.Debug("Building server list")
	for _, v := range rwPodDiffs {
		log.Debugf("Adding %v to the server list", *v)
		srvrs[v.node] = append(srvrs[v.node], v)
	}

	for k1, v1 := range coreGroupDiffs {
		log.Debugf("k1:%v, v1:%v", k1, v1)
		for k2, v2 := range v1 {
			log.Debugf("k2:%v, v2:%v", k2, v2)
			if v2 == nil { // Nothing to do here
				continue
			}
			if _, ok := srvrs[v2.node]; ok {
				coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
				if len(srvrs[v2.node]) > 1 { // remove one entry from the list
					srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
				} else { // Delete the entry from the map
					delete(srvrs, v2.node)
				}
			} else {
				log.Error("This should never happen, node appears to have changed names")
				// attempt to limp along by keeping this old entry
			}
		}
	}

	return coreGroupDiffs
}

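// applyAddrDiffs updates the affinity router when pod IP addresses change.
// It handles both the rw-core groups ([][]*volthaPod) and the ro-core list
// ([]*volthaPod), reconciling device ids where needed and re-pointing the
// affected connections at the new addresses.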
func applyAddrDiffs(client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
	var newEntries [][]*volthaPod

	log.Debug("Applying diffs")
	switch cores := coreList.(type) {
	case [][]*volthaPod:
		newEntries = reconcileAddrDiffs(getAddrDiffs(cores, nPods))

		// Now replace the information in coreGroups with the new
		// entries and then reconcile the device ids on the core
		// that's in the new entry with the device ids of its
		// active-active peer.
		for k1, v1 := range cores {
			for k2, v2 := range v1 {
				if newEntries[k1][k2] != nil {
					// TODO: Missing is the case where both the primary
					// and the secondary core crash and come back.
					// Pull the device ids from the active-active peer
					ids := queryPodDeviceIds(cores[k1][k2^1])
					if len(ids) != 0 {
						if !reconcilePodDeviceIds(newEntries[k1][k2], ids) {
							log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
						}
					}
					// Send the affinity router the new connection information
					setConnection(client, afrouterRWClusterName, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, podGrpcPort)
					// Copy the new entry information over
					cores[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
					cores[k1][k2].name = newEntries[k1][k2].name
					cores[k1][k2].devIds = ids
				}
			}
		}
	case []*volthaPod:
		var mia []*volthaPod
		var found bool
		// TODO: Break this up using functions to simplify
		// reading of the code.
		// Find the core(s) that have changed addresses
		for k1, v1 := range cores {
			found = false
			for _, v2 := range nPods {
				if v1.ipAddr == v2.ipAddr {
					found = true
					break
				}
			}
			if !found {
				mia = append(mia, cores[k1])
			}
		}
		// Now plug in the new addresses and set the connection
		for _, v1 := range nPods {
			found = false
			for _, v2 := range cores {
				if v1.ipAddr == v2.ipAddr {
					found = true
					break
				}
			}
			if found {
				continue
			}
			mia[0].ipAddr = v1.ipAddr
			mia[0].name = v1.name
			setConnection(client, afrouterROClusterName, mia[0].backend, mia[0].connection, v1.ipAddr, podGrpcPort)
			// Now get rid of the mia entry just processed
			mia = append(mia[:0], mia[1:]...)
		}
	default:
		log.Error("Internal: Unexpected type in call to applyAddrDiffs")
	}
}

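// updateDeviceIds copies the freshly queried device-id sets from rwPods into
// the corresponding entries of coreGroups, matching pods by name.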
func updateDeviceIds(coreGroups [][]*volthaPod, rwPods []*volthaPod) {
	var byName = make(map[string]*volthaPod)

	// Convenience
	for _, v := range rwPods {
		byName[v.name] = v
	}

	for k1, v1 := range coreGroups {
		for k2 := range v1 {
			coreGroups[k1][k2].devIds = byName[v1[k2].name].devIds
		}
	}
}

func startCoreMonitor(client pb.ConfigurationClient,
	clientset *kubernetes.Clientset,
	rwCoreFltr *regexp.Regexp,
	roCoreFltr *regexp.Regexp,
	coreGroups [][]*volthaPod,
	oRoPods []*volthaPod) error {
	// Now that initial allocation has been completed, monitor the pods
	// for IP changes. The main loop needs to do the following:
	// 1) Periodically query the pods and filter out
	//    the vcore ones
	// 2) Validate that the pods running are the same
	//    as the previous check
	// 3) Validate that the IP addresses are the same
	//    as the last check.
	// If any pod names have changed then remove the unused pod
	// names and add in the new pod names, maintaining the
	// cluster/backend information.
	// If an IP address has changed (which shouldn't
	// happen unless a pod is re-started) it should get
	// caught by the pod name change.
	for {
		time.Sleep(10 * time.Second) // Wait a while
		// Get the rw core list from k8s
		rwPods := getVolthaPods(clientset, rwCoreFltr)
		queryDeviceIds(rwPods)
		updateDeviceIds(coreGroups, rwPods)
		// If we didn't get the expected number of rw pods then wait,
		// since something is down and will hopefully come
		// back up at some point.
		if len(rwPods) != numRWPods {
			continue
		}
		// We have all pods, check if any IP addresses
		// have changed.
		for _, v := range rwPods {
			if !hasIpAddr(coreGroups, v.ipAddr) {
				log.Debug("Address has changed...")
				applyAddrDiffs(client, coreGroups, rwPods)
				break
			}
		}

		roPods := getVolthaPods(clientset, roCoreFltr)

		if len(roPods) != numROPods {
			continue
		}
		for _, v := range roPods {
			if !hasIpAddr(oRoPods, v.ipAddr) {
				applyAddrDiffs(client, oRoPods, roPods)
				break
			}
		}
	}
}

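// hasIpAddr reports whether ipAddr appears in the given core list, which may
// be either a flat []*volthaPod or a grouped [][]*volthaPod.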
func hasIpAddr(coreList interface{}, ipAddr string) bool {
	switch cores := coreList.(type) {
	case []*volthaPod:
		for _, v := range cores {
			if v.ipAddr == ipAddr {
				return true
			}
		}
	case [][]*volthaPod:
		for _, v1 := range cores {
			for _, v2 := range v1 {
				if v2.ipAddr == ipAddr {
					return true
				}
			}
		}
	default:
		log.Error("Internal: Unexpected type in call to hasIpAddr")
	}
	return false
}

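// main parses flags, sets up logging, the Kubernetes client and the
// connection to the affinity router sidecar, groups the rw-core pods into
// active-active pairs, programs backends, connections and device affinities,
// configures the ro-core pods, and then starts the discovery and core
// monitors.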
func main() {
	// This is currently hard coded to a cluster with 3 servers
	//var connections map[string]configConn = make(map[string]configConn)
	//var rwCorePodsPrev map[string]rwPod = make(map[string]rwPod)
	var err error
	var conn *grpc.ClientConn

	config := &Configuration{}
	cmdParse := flag.NewFlagSet(path.Base(os.Args[0]), flag.ContinueOnError)
	config.DisplayVersionOnly = cmdParse.Bool("version", false, "Print version information and exit")

	err = cmdParse.Parse(os.Args[1:])
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		os.Exit(1)
	}

	if *config.DisplayVersionOnly {
		fmt.Println("VOLTHA API Server (afrouterd)")
		fmt.Println(version.VersionInfo.String(" "))
		return
	}

	// Set up the regular expressions to identify the voltha cores
	rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
	roCoreFltr := regexp.MustCompile(`ro-core-`)

	// Set up logging
	if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
	}

	// Set up kubernetes api
	clientset := k8sClientSet()

	// Connect to the affinity router and set up the client
	conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()
	client := pb.NewConfigurationClient(conn)

	// Get the voltha rw-core pods
	rwPods := getVolthaPods(clientset, rwCoreFltr)

	// Fetch the devices held by each running core
	queryDeviceIds(rwPods)

	// For debugging... comment out later
	for _, v := range rwPods {
		log.Debugf("Pod list %v", *v)
	}

	coreGroups := groupPods1(rwPods)

	// Assign the groupings to the backends and connections
	for k := range coreGroups {
		for k1 := range coreGroups[k] {
			coreGroups[k][k1].cluster = afrouterRWClusterName
			coreGroups[k][k1].backend = afrouterRWClusterName + strconv.Itoa(k+1)
			coreGroups[k][k1].connection = afrouterRWClusterName + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
		}
	}
	log.Info("Core grouping completed")

	// TODO: Debugging code, comment out for production
	for k, v := range coreGroups {
		for k2, v2 := range v {
			log.Debugf("Core group %d,%d: %v", k, k2, v2)
		}
	}
	log.Info("Setting affinities")
	// Now set the affinities for existing devices in the cores
	for _, v := range coreGroups {
		setAffinity(client, v[0].devIds, v[0].backend)
		setAffinity(client, v[1].devIds, v[1].backend)
	}
	log.Info("Setting connections")
	// Configure the backends based on the calculated core groups
	for _, v := range coreGroups {
		setConnection(client, afrouterRWClusterName, v[0].backend, v[0].connection, v[0].ipAddr, podGrpcPort)
		setConnection(client, afrouterRWClusterName, v[1].backend, v[1].connection, v[1].ipAddr, podGrpcPort)
	}

	// Process the read only pods
	roPods := getVolthaPods(clientset, roCoreFltr)
	for k, v := range roPods {
		log.Debugf("Processing ro_pod %v", v)
		vN := afrouterROClusterName + strconv.Itoa(k+1)
		log.Debugf("Setting connection %s, %s, %s", vN, vN+"1", v.ipAddr)
		roPods[k].cluster = afrouterROClusterName
		roPods[k].backend = vN
		roPods[k].connection = vN + "1"
		setConnection(client, afrouterROClusterName, v.backend, v.connection, v.ipAddr, podGrpcPort)
	}

	log.Info("Starting discovery monitoring")
	startDiscoveryMonitor(client, coreGroups)

	log.Info("Starting core monitoring")
	startCoreMonitor(client, clientset, rwCoreFltr,
		roCoreFltr, coreGroups, roPods) // Never returns
	return
}