/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
	"errors"
	"flag"
	"fmt"
	"math"
	"os"
	"path"
	"regexp"
	"strconv"
	"time"

	"github.com/golang/protobuf/ptypes"
	"github.com/golang/protobuf/ptypes/empty"
	"github.com/opencord/voltha-go/common/log"
	"github.com/opencord/voltha-go/common/version"
	"github.com/opencord/voltha-go/kafka"
	pb "github.com/opencord/voltha-protos/go/afrouter"
	cmn "github.com/opencord/voltha-protos/go/common"
	ic "github.com/opencord/voltha-protos/go/inter_container"
	vpb "github.com/opencord/voltha-protos/go/voltha"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

type volthaPod struct {
	name       string
	ipAddr     string
	node       string
	devIds     map[string]struct{}
	cluster    string
	backend    string
	connection string
}

type podTrack struct {
	pod *volthaPod
	dn  bool
}

type Configuration struct {
	DisplayVersionOnly *bool
}

var (
	// if k8s variables are undefined, will attempt to use in-cluster config
	k8sApiServer      = getStrEnv("K8S_API_SERVER", "")
	k8sKubeConfigPath = getStrEnv("K8S_KUBE_CONFIG_PATH", "")

	podNamespace = getStrEnv("POD_NAMESPACE", "voltha")
	podGrpcPort  = uint64(getIntEnv("POD_GRPC_PORT", 0, math.MaxUint16, 50057))

	numRWPods = getIntEnv("NUM_RW_PODS", 1, math.MaxInt32, 6)
	numROPods = getIntEnv("NUM_RO_PODS", 1, math.MaxInt32, 3)

	afrouterRouterName    = getStrEnv("AFROUTER_ROUTER_NAME", "vcore")
	afrouterRWClusterName = getStrEnv("AFROUTER_RW_CLUSTER_NAME", "vcore")
	afrouterROClusterName = getStrEnv("AFROUTER_RO_CLUSTER_NAME", "ro_vcore")

	kafkaTopic      = getStrEnv("KAFKA_TOPIC", "AffinityRouter")
	kafkaClientType = getStrEnv("KAFKA_CLIENT_TYPE", "sarama")
	kafkaHost       = getStrEnv("KAFKA_HOST", "kafka")
	kafkaPort       = getIntEnv("KAFKA_PORT", 0, math.MaxUint16, 9092)
	kafkaInstanceID = getStrEnv("KAFKA_INSTANCE_ID", "arouterd")
)

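// getIntEnv reads an integer from the named environment variable and enforces
// the given [min, max] range; it panics on a malformed or out-of-range value
// (e.g. KAFKA_PORT=70000 exceeds math.MaxUint16) and returns defaultValue when
// the variable is unset.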
func getIntEnv(key string, min, max, defaultValue int) int {
	if val, have := os.LookupEnv(key); have {
		num, err := strconv.Atoi(val)
		if err != nil || !(min <= num && num <= max) {
			panic(fmt.Errorf("%s must be a number in the range [%d, %d]; default: %d", key, min, max, defaultValue))
		}
		return num
	}
	return defaultValue
}

func getStrEnv(key, defaultValue string) string {
	if val, have := os.LookupEnv(key); have {
		return val
	}
	return defaultValue
}

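// newKafkaClient builds a kafka.Client of the requested type; only the
// "sarama" client type is supported, anything else yields an error.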
func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
	log.Infow("kafka-client-type", log.Fields{"client": clientType})
	switch clientType {
	case "sarama":
		return kafka.NewSaramaClient(
			kafka.Host(host),
			kafka.Port(port),
			kafka.ConsumerType(kafka.GroupCustomer),
			kafka.ProducerReturnOnErrors(true),
			kafka.ProducerReturnOnSuccess(true),
			kafka.ProducerMaxRetries(6),
			kafka.NumPartitions(3),
			kafka.ConsumerGroupName(instanceID),
			kafka.ConsumerGroupPrefix(instanceID),
			kafka.AutoCreateTopic(false),
			kafka.ProducerFlushFrequency(5),
			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
	}
	return nil, errors.New("unsupported-client-type")
}

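// k8sClientSet creates a Kubernetes clientset, either from the K8S_API_SERVER /
// K8S_KUBE_CONFIG_PATH settings or, when both are unset, from the in-cluster
// configuration. It panics if no configuration can be loaded.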
func k8sClientSet() *kubernetes.Clientset {
	var config *rest.Config
	if k8sApiServer != "" || k8sKubeConfigPath != "" {
		// use combination of URL & local kube-config file
		c, err := clientcmd.BuildConfigFromFlags(k8sApiServer, k8sKubeConfigPath)
		if err != nil {
			panic(err)
		}
		config = c
	} else {
		// use in-cluster config
		c, err := rest.InClusterConfig()
		if err != nil {
			log.Errorf("Unable to load in-cluster config. Try setting K8S_API_SERVER and K8S_KUBE_CONFIG_PATH?")
			panic(err)
		}
		config = c
	}
	// creates the clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	return clientset
}

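// connect dials the given gRPC address, retrying up to 100 times with roughly
// five seconds between attempts before giving up.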
func connect(addr string) (*grpc.ClientConn, error) {
	for ctr := 0; ctr < 100; ctr++ {
		startTime := time.Now()
		log.Debugf("Trying to connect to %s", addr)
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithBlock())
		cancel()
		if err != nil {
			log.Debugf("Attempt to connect failed, retrying. (%v)", err)
		} else {
			log.Debugf("Connection succeeded")
			return conn, err
		}
		// 5s between attempts, even if the connection attempt failed immediately
		time.Sleep(startTime.Add(time.Second * 5).Sub(time.Now()))
	}
	log.Debugf("Too many connection attempts, giving up!")
	return nil, errors.New("timeout attempting to connect")
}

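// getVolthaPods lists the pods in the configured namespace and returns those
// whose names match the supplied core filter and that have been assigned an
// IP address.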
func getVolthaPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) []*volthaPod {
	var rtrn []*volthaPod

	pods, err := cs.CoreV1().Pods(podNamespace).List(metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	//log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))

	for _, v := range pods.Items {
		if coreFilter.MatchString(v.Name) {
			log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
				v.Status.PodIP, v.Spec.NodeName)
			// Only add the pod if it has an IP address. If it doesn't then it likely crashed
			// and is still in the process of getting re-started.
			if v.Status.PodIP != "" {
				rtrn = append(rtrn, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName,
					devIds: make(map[string]struct{}), backend: "", connection: ""})
			}
		}
	}
	return rtrn
}

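// reconcilePodDeviceIds asks the given core pod to reconcile the supplied set
// of device ids, returning true on success.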
func reconcilePodDeviceIds(pod *volthaPod, ids map[string]struct{}) bool {
	conn, err := connect(fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
	if err != nil {
		log.Debugf("Could not query devices from %s, could not connect", pod.name)
		return false
	}
	defer conn.Close()

	var idList cmn.IDs
	for k := range ids {
		idList.Items = append(idList.Items, &cmn.ID{Id: k})
	}

	client := vpb.NewVolthaServiceClient(conn)
	_, err = client.ReconcileDevices(context.Background(), &idList)
	if err != nil {
		log.Error(err)
		return false
	}

	return true
}

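// queryPodDeviceIds returns the set of device ids currently held by the given
// core pod; the set is empty if the pod can't be reached or the query fails.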
func queryPodDeviceIds(pod *volthaPod) map[string]struct{} {
	var rtrn = make(map[string]struct{})
	// Open a connection to the pod
	conn, err := connect(fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
	if err != nil {
		log.Debugf("Could not query devices from %s, could not connect", pod.name)
		return rtrn
	}
	defer conn.Close()
	client := vpb.NewVolthaServiceClient(conn)
	devs, err := client.ListDeviceIds(context.Background(), &empty.Empty{})
	if err != nil {
		log.Error(err)
		return rtrn
	}
	for _, dv := range devs.Items {
		rtrn[dv.Id] = struct{}{}
	}

	return rtrn
}

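// queryDeviceIds refreshes the device-id set of each pod in the list, keeping
// a pod's previous set when the query returns nothing.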
func queryDeviceIds(pods []*volthaPod) {
	for pk := range pods {
		// Keep the old Id list if a new list is not returned
		if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
			pods[pk].devIds = idList
		}
	}
}

func allEmpty(pods []*volthaPod) bool {
	for k := range pods {
		if len(pods[k].devIds) != 0 {
			return false
		}
	}
	return true
}

func rmPod(pods []*volthaPod, idx int) []*volthaPod {
	return append(pods[:idx], pods[idx+1:]...)
}

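// groupIntersectingPods1 pairs pods whose device-id sets overlap, one pair per
// group, and returns the groups along with the pods left unassigned (those
// with no devices or no intersecting peer).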
func groupIntersectingPods1(pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
	var rtrn [][]*volthaPod
	var out []*volthaPod

	for {
		if len(pods) == 0 {
			break
		}
		if len(pods[0].devIds) == 0 { // Ignore pods with no devices
			////log.Debugf("%s empty pod", pd[k].pod.name)
			out = append(out, pods[0])
			pods = rmPod(pods, 0)
			continue
		}
		// Start a pod group with this pod
		var grp []*volthaPod
		grp = append(grp, pods[0])
		pods = rmPod(pods, 0)
		//log.Debugf("Creating new group %s", pd[k].pod.name)
		// Find the peer pod based on device overlap
		// It's ok if one isn't found, an empty one will be used instead
		for k := range pods {
			if len(pods[k].devIds) == 0 { // Skip pods with no devices
				//log.Debugf("%s empty pod", pd[k1].pod.name)
				continue
			}
			if intersect(grp[0].devIds, pods[k].devIds) {
				//log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
				if grp[0].node == pods[k].node {
					// This should never happen
					log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
						grp[0].name, pods[k].name)
					continue
				}
				grp = append(grp, pods[k])
				pods = rmPod(pods, k)
				break
			}
		}
		rtrn = append(rtrn, grp)
		//log.Debugf("Added group %s", grp[0].name)
		// Check if the number of groups = half the pods, if so all groups are started.
		if len(rtrn) == podCt>>1 {
			// Append any remaining pods to out
			out = append(out, pods[0:]...)
			break
		}
	}
	return rtrn, out
}

func unallocPodCount(pd []*podTrack) int {
	var rtrn int = 0
	for _, v := range pd {
		if !v.dn {
			rtrn++
		}
	}
	return rtrn
}

func sameNode(pod *volthaPod, grps [][]*volthaPod) bool {
	for _, v := range grps {
		if v[0].node == pod.node {
			return true
		}
		if len(v) == 2 && v[1].node == pod.node {
			return true
		}
	}
	return false
}

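// startRemainingGroups1 starts new single-member groups from the remaining
// pods, choosing pods on nodes not already used by an existing group, until
// half the pod count has been reached.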
func startRemainingGroups1(grps [][]*volthaPod, pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
	var grp []*volthaPod

	for k := range pods {
		if sameNode(pods[k], grps) {
			continue
		}
		grp = []*volthaPod{}
		grp = append(grp, pods[k])
		pods = rmPod(pods, k)
		grps = append(grps, grp)
		if len(grps) == podCt>>1 {
			break
		}
	}
	return grps, pods
}

func hasSingleSecondNode(grp []*volthaPod) bool {
	var servers = make(map[string]struct{})
	for k := range grp {
		if k == 0 {
			continue // Ignore the first item
		}
		servers[grp[k].node] = struct{}{}
	}
	if len(servers) == 1 {
		return true
	}
	return false
}

func addNode(grps [][]*volthaPod, idx *volthaPod, item *volthaPod) [][]*volthaPod {
	for k := range grps {
		if grps[k][0].name == idx.name {
			grps[k] = append(grps[k], item)
			return grps
		}
	}
	// TODO: Error checking required here.
	return grps
}

func removeNode(grps [][]*volthaPod, item *volthaPod) [][]*volthaPod {
	for k := range grps {
		for k1 := range grps[k] {
			if grps[k][k1].name == item.name {
				grps[k] = append(grps[k][:k1], grps[k][k1+1:]...)
				break
			}
		}
	}
	return grps
}

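// groupRemainingPods1 completes the started groups by assigning each remaining
// (typically empty) pod as the second member of a group whose first member is
// on a different node, handling groups with only a single node choice first.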
func groupRemainingPods1(grps [][]*volthaPod, pods []*volthaPod) [][]*volthaPod {
	var lgrps [][]*volthaPod
	// All groups must be started when this function is called.
	// Copy incomplete groups
	for k := range grps {
		if len(grps[k]) != 2 {
			lgrps = append(lgrps, grps[k])
		}
	}

	// Add all pairing candidates to each started group.
	for k := range pods {
		for k2 := range lgrps {
			if lgrps[k2][0].node != pods[k].node {
				lgrps[k2] = append(lgrps[k2], pods[k])
			}
		}
	}

	// TODO: If any member of lgrps doesn't have at least 2
	// nodes something is wrong. Check for that here

	for {
		for { // Address groups with only a single server choice
			var ssn bool = false

			for k := range lgrps {
				// Now if any of the groups only have a single
				// node as the choice for the second member
				// address that one first.
				if hasSingleSecondNode(lgrps[k]) {
					ssn = true
					// Add this pairing to the groups
					grps = addNode(grps, lgrps[k][0], lgrps[k][1])
					// Since this node is now used, remove it from all
					// remaining tentative groups
					lgrps = removeNode(lgrps, lgrps[k][1])
					// Now remove this group completely since
					// it's been addressed
					lgrps = append(lgrps[:k], lgrps[k+1:]...)
					break
				}
			}
			if !ssn {
				break
			}
		}
		// Now address one of the remaining groups
		if len(lgrps) == 0 {
			break // Nothing left to do, exit the loop
		}
		grps = addNode(grps, lgrps[0][0], lgrps[0][1])
		lgrps = removeNode(lgrps, lgrps[0][1])
		lgrps = append(lgrps[:0], lgrps[1:]...)
	}
	return grps
}

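// groupPods1 organizes the rw-core pods into active-active pairs: pods with
// intersecting device ids are paired first, then remaining pods are used to
// start and complete groups on distinct nodes. For example, with the default
// of six rw-core pods this yields three two-pod groups.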
func groupPods1(pods []*volthaPod) [][]*volthaPod {
	var rtrn [][]*volthaPod
	var podCt int = len(pods)

	rtrn, pods = groupIntersectingPods1(pods, podCt)
	// There are several outcomes here
	// 1) All pods have been paired and we're done
	// 2) Some un-allocated pods remain
	// 2.a) All groups have been started
	// 2.b) Not all groups have been started
	if len(pods) == 0 {
		return rtrn
	} else if len(rtrn) == podCt>>1 { // All groupings started
		// Allocate the remaining (presumably empty) pods to the started groups
		return groupRemainingPods1(rtrn, pods)
	} else { // Some groupings started
		// Start empty groups with the remaining pods, with each
		// grouping on a different server, then allocate the
		// remaining pods.
		rtrn, pods = startRemainingGroups1(rtrn, pods, podCt)
		return groupRemainingPods1(rtrn, pods)
	}
}

func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
	for k := range d1 {
		if _, ok := d2[k]; ok {
			return true
		}
	}
	return false
}

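// setConnection tells the affinity router which address and port to use for
// the given cluster/backend/connection triple.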
func setConnection(client pb.ConfigurationClient, cluster string, backend string, connection string, addr string, port uint64) {
	log.Debugf("Configuring backend %s : connection %s in cluster %s\n\n",
		backend, connection, cluster)
	cnf := &pb.Conn{Server: "grpc_command", Cluster: cluster, Backend: backend,
		Connection: connection, Addr: addr,
		Port: port}
	if res, err := client.SetConnection(context.Background(), cnf); err != nil {
		log.Debugf("failed SetConnection RPC call: %s", err)
	} else {
		log.Debugf("Result: %v", res)
	}
}

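// setAffinity binds each of the given device ids to the named backend via the
// affinity router's SetAffinity RPC.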
func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
	log.Debugf("Configuring backend %s : affinities \n", backend)
	aff := &pb.Affinity{Router: afrouterRouterName, Route: "dev_manager", Cluster: afrouterRWClusterName, Backend: backend}
	for k := range ids {
		log.Debugf("Setting affinity for id %s", k)
		aff.Id = k
		if res, err := client.SetAffinity(context.Background(), aff); err != nil {
			log.Debugf("failed affinity RPC call: %s", err)
		} else {
			log.Debugf("Result: %v", res)
		}
	}
}

func getBackendForCore(coreId string, coreGroups [][]*volthaPod) string {
	for _, v := range coreGroups {
		for _, v2 := range v {
			if v2.name == coreId {
				return v2.backend
			}
		}
	}
	log.Errorf("No backend found for core %s\n", coreId)
	return ""
}

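// monitorDiscovery consumes device-discovery notifications from kafka and sets
// the affinity of each newly discovered device to the backend of the core that
// reported it.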
func monitorDiscovery(client pb.ConfigurationClient,
	ch <-chan *ic.InterContainerMessage,
	coreGroups [][]*volthaPod) {
	var id = make(map[string]struct{})

	for {
		select {
		case msg := <-ch:
			log.Debugf("Received a device discovery notification")
			device := &ic.DeviceDiscovered{}
			if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
				log.Errorf("Could not unmarshal received notification %v", msg)
			} else {
				// Set the affinity of the discovered device.
				if be := getBackendForCore(device.Id, coreGroups); be != "" {
					id[device.Id] = struct{}{}
					setAffinity(client, id, be)
				} else {
					log.Error("Can't use an empty string as a backend name")
				}
			}
		}
	}
}

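// startDiscoveryMonitor subscribes to the device-discovery kafka topic and
// launches monitorDiscovery as a goroutine to process the notifications.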
func startDiscoveryMonitor(client pb.ConfigurationClient,
	coreGroups [][]*volthaPod) error {
	var ch <-chan *ic.InterContainerMessage
	// Connect to kafka for discovery events
	topic := &kafka.Topic{Name: kafkaTopic}
	kc, err := newKafkaClient(kafkaClientType, kafkaHost, kafkaPort, kafkaInstanceID)
	if err != nil {
		return err
	}
	kc.Start()

	if ch, err = kc.Subscribe(topic); err != nil {
		log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
		return err
	}
	go monitorDiscovery(client, ch, coreGroups)
	return nil
}

// getAddrDiffs determines which items in coreGroups have changed based on the
// provided pod list, and returns a coreGroup array containing only the changed
// items along with a list of the new pods.
func getAddrDiffs(coreGroups [][]*volthaPod, rwPods []*volthaPod) ([][]*volthaPod, []*volthaPod) {
	var nList []*volthaPod
	var rtrn = make([][]*volthaPod, numRWPods>>1)
	var ipAddrs = make(map[string]struct{})

	log.Debug("Get addr diffs")

	// Start with an empty array
	for k := range rtrn {
		rtrn[k] = make([]*volthaPod, 2)
	}

	// Build a list with only the new items
	for _, v := range rwPods {
		if !hasIpAddr(coreGroups, v.ipAddr) {
			nList = append(nList, v)
		}
		ipAddrs[v.ipAddr] = struct{}{} // for the search below
	}

	// Now build the coreGroups with only the changed items
	for k1, v1 := range coreGroups {
		for k2, v2 := range v1 {
			if _, ok := ipAddrs[v2.ipAddr]; !ok {
				rtrn[k1][k2] = v2
			}
		}
	}
	return rtrn, nList
}

// reconcileAddrDiffs figures out where best to put the new pods in the
// coreGroup array based on the old pods being replaced. The criterion is that
// the new pod be on the same server as the old pod was.
func reconcileAddrDiffs(coreGroupDiffs [][]*volthaPod, rwPodDiffs []*volthaPod) [][]*volthaPod {
	var srvrs map[string][]*volthaPod = make(map[string][]*volthaPod)

	log.Debug("Reconciling diffs")
	log.Debug("Building server list")
	for _, v := range rwPodDiffs {
		log.Debugf("Adding %v to the server list", *v)
		srvrs[v.node] = append(srvrs[v.node], v)
	}

	for k1, v1 := range coreGroupDiffs {
		log.Debugf("k1:%v, v1:%v", k1, v1)
		for k2, v2 := range v1 {
			log.Debugf("k2:%v, v2:%v", k2, v2)
			if v2 == nil { // Nothing to do here
				continue
			}
			if _, ok := srvrs[v2.node]; ok {
				coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
				if len(srvrs[v2.node]) > 1 { // remove one entry from the list
					srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
				} else { // Delete the entry from the map
					delete(srvrs, v2.node)
				}
			} else {
				log.Error("This should never happen, node appears to have changed names")
				// attempt to limp along by keeping this old entry
			}
		}
	}

	return coreGroupDiffs
}

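// applyAddrDiffs pushes updated pod addresses to the affinity router. For the
// rw-core groups it also reconciles device ids from the active-active peer;
// for the ro-core list it simply points the connections of the missing cores
// at the new pod addresses.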
func applyAddrDiffs(client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
	var newEntries [][]*volthaPod

	log.Debug("Applying diffs")
	switch cores := coreList.(type) {
	case [][]*volthaPod:
		newEntries = reconcileAddrDiffs(getAddrDiffs(cores, nPods))

		// Now replace the information in coreGroups with the new
		// entries and then reconcile the device ids on the core
		// that's in the new entry with the device ids of its
		// active-active peer.
		for k1, v1 := range cores {
			for k2, v2 := range v1 {
				if newEntries[k1][k2] != nil {
					// TODO: Missing is the case where both the primary
					// and the secondary core crash and come back.
					// Pull the device ids from the active-active peer
					ids := queryPodDeviceIds(cores[k1][k2^1])
					if len(ids) != 0 {
						if !reconcilePodDeviceIds(newEntries[k1][k2], ids) {
							log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
						}
					}
					// Send the affinity router the new connection information
					setConnection(client, afrouterRWClusterName, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, podGrpcPort)
					// Copy the new entry information over
					cores[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
					cores[k1][k2].name = newEntries[k1][k2].name
					cores[k1][k2].devIds = ids
				}
			}
		}
	case []*volthaPod:
		var mia []*volthaPod
		var found bool
		// TODO: Break this up using functions to simplify
		// reading of the code.
		// Find the core(s) that have changed addresses
		for k1, v1 := range cores {
			found = false
			for _, v2 := range nPods {
				if v1.ipAddr == v2.ipAddr {
					found = true
					break
				}
			}
			if !found {
				mia = append(mia, cores[k1])
			}
		}
		// Now plug in the new addresses and set the connection
		for _, v1 := range nPods {
			found = false
			for _, v2 := range cores {
				if v1.ipAddr == v2.ipAddr {
					found = true
					break
				}
			}
			if found {
				continue
			}
			mia[0].ipAddr = v1.ipAddr
			mia[0].name = v1.name
			setConnection(client, afrouterROClusterName, mia[0].backend, mia[0].connection, v1.ipAddr, podGrpcPort)
			// Now get rid of the mia entry just processed
			mia = append(mia[:0], mia[1:]...)
		}
	default:
		log.Error("Internal: Unexpected type in call to applyAddrDiffs")
	}
}

func updateDeviceIds(coreGroups [][]*volthaPod, rwPods []*volthaPod) {
	var byName = make(map[string]*volthaPod)

	// Convenience
	for _, v := range rwPods {
		byName[v.name] = v
	}

	for k1, v1 := range coreGroups {
		for k2 := range v1 {
			coreGroups[k1][k2].devIds = byName[v1[k2].name].devIds
		}
	}
}

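// startCoreMonitor is the long-running monitoring loop; it keeps the affinity
// router in sync with the rw-core and ro-core pods and never returns in normal
// operation.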
func startCoreMonitor(client pb.ConfigurationClient,
	clientset *kubernetes.Clientset,
	rwCoreFltr *regexp.Regexp,
	roCoreFltr *regexp.Regexp,
	coreGroups [][]*volthaPod,
	oRoPods []*volthaPod) error {
	// Now that initial allocation has been completed, monitor the pods
	// for IP changes
	// The main loop needs to do the following:
	// 1) Periodically query the pods and filter out
	//    the vcore ones
	// 2) Validate that the pods running are the same
	//    as the previous check
	// 3) Validate that the IP addresses are the same
	//    as the last check.
	// If any pod names have changed then remove
	// the unused pod names and add in the new pod names,
	// maintaining the cluster/backend information.
	// If an IP address has changed (which shouldn't
	// happen unless a pod is re-started) it should get
	// caught by the pod name change.
	for {
		time.Sleep(10 * time.Second) // Wait a while
		// Get the rw core list from k8s
		rwPods := getVolthaPods(clientset, rwCoreFltr)
		queryDeviceIds(rwPods)
		updateDeviceIds(coreGroups, rwPods)
		// If we didn't get 2n+1 pods then wait since
		// something is down and will hopefully come
		// back up at some point.
		if len(rwPods) != numRWPods {
			continue
		}
		// We have all pods, check if any IP addresses
		// have changed.
		for _, v := range rwPods {
			if !hasIpAddr(coreGroups, v.ipAddr) {
				log.Debug("Address has changed...")
				applyAddrDiffs(client, coreGroups, rwPods)
				break
			}
		}

		roPods := getVolthaPods(clientset, roCoreFltr)

		if len(roPods) != numROPods {
			continue
		}
		for _, v := range roPods {
			if !hasIpAddr(oRoPods, v.ipAddr) {
				applyAddrDiffs(client, oRoPods, roPods)
				break
			}
		}
	}
}

func hasIpAddr(coreList interface{}, ipAddr string) bool {
	switch cores := coreList.(type) {
	case []*volthaPod:
		for _, v := range cores {
			if v.ipAddr == ipAddr {
				return true
			}
		}
	case [][]*volthaPod:
		for _, v1 := range cores {
			for _, v2 := range v1 {
				if v2.ipAddr == ipAddr {
					return true
				}
			}
		}
	default:
		log.Error("Internal: Unexpected type in call to hasIpAddr")
	}
	return false
}

func main() {
	// This is currently hard coded to a cluster with 3 servers
	//var connections map[string]configConn = make(map[string]configConn)
	//var rwCorePodsPrev map[string]rwPod = make(map[string]rwPod)
	var err error
	var conn *grpc.ClientConn

	config := &Configuration{}
	cmdParse := flag.NewFlagSet(path.Base(os.Args[0]), flag.ContinueOnError)
	config.DisplayVersionOnly = cmdParse.Bool("version", false, "Print version information and exit")

	err = cmdParse.Parse(os.Args[1:])
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		os.Exit(1)
	}

	if *config.DisplayVersionOnly {
		fmt.Println("VOLTHA API Server (afrouterd)")
		fmt.Println(version.VersionInfo.String(" "))
		return
	}

	// Set up the regular expressions to identify the voltha cores
	rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
	roCoreFltr := regexp.MustCompile(`ro-core-`)

	// Set up logging
	if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
	}

	// Set up kubernetes api
	clientset := k8sClientSet()

	// Connect to the affinity router and set up the client
	conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	client := pb.NewConfigurationClient(conn)

	// Get the voltha rw-core pods
	rwPods := getVolthaPods(clientset, rwCoreFltr)

	// Fetch the devices held by each running core
	queryDeviceIds(rwPods)

	// For debugging... comment out l8r
	for _, v := range rwPods {
		log.Debugf("Pod list %v", *v)
	}

	coreGroups := groupPods1(rwPods)

	// Assign the groupings to the backends and connections
	for k := range coreGroups {
		for k1 := range coreGroups[k] {
			coreGroups[k][k1].cluster = afrouterRWClusterName
			coreGroups[k][k1].backend = afrouterRWClusterName + strconv.Itoa(k+1)
			coreGroups[k][k1].connection = afrouterRWClusterName + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
		}
	}
	log.Info("Core grouping completed")

	// TODO: Debugging code, comment out for production
	for k, v := range coreGroups {
		for k2, v2 := range v {
			log.Debugf("Core group %d,%d: %v", k, k2, v2)
		}
	}
	log.Info("Setting affinities")
	// Now set the affinities for existing devices in the cores
	for _, v := range coreGroups {
		setAffinity(client, v[0].devIds, v[0].backend)
		setAffinity(client, v[1].devIds, v[1].backend)
	}
	log.Info("Setting connections")
	// Configure the backends based on the calculated core groups
	for _, v := range coreGroups {
		setConnection(client, afrouterRWClusterName, v[0].backend, v[0].connection, v[0].ipAddr, podGrpcPort)
		setConnection(client, afrouterRWClusterName, v[1].backend, v[1].connection, v[1].ipAddr, podGrpcPort)
	}

	// Process the read only pods
	roPods := getVolthaPods(clientset, roCoreFltr)
	for k, v := range roPods {
		log.Debugf("Processing ro_pod %v", v)
		vN := afrouterROClusterName + strconv.Itoa(k+1)
		log.Debugf("Setting connection %s, %s, %s", vN, vN+"1", v.ipAddr)
		roPods[k].cluster = afrouterROClusterName
		roPods[k].backend = vN
		roPods[k].connection = vN + "1"
		setConnection(client, afrouterROClusterName, v.backend, v.connection, v.ipAddr, podGrpcPort)
	}

	log.Info("Starting discovery monitoring")
	startDiscoveryMonitor(client, coreGroups)

	log.Info("Starting core monitoring")
	startCoreMonitor(client, clientset, rwCoreFltr,
		roCoreFltr, coreGroups, roPods) // Never returns
}