blob: 3516261c55062313d2816a40da91d306e10afcd4 [file] [log] [blame]
sslobodr16e41bc2019-01-18 16:22:21 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package main
18
19import (
sslobodr16e41bc2019-01-18 16:22:21 -050020 "errors"
David K. Bainbridgef430cd52019-05-28 15:00:35 -070021 "flag"
Kent Hagerman334a8ce2019-05-16 16:50:33 -040022 "fmt"
Kent Hagerman92661462019-06-04 18:22:05 -040023 "google.golang.org/grpc/connectivity"
24 "google.golang.org/grpc/keepalive"
Kent Hagerman737b9e52019-06-18 16:29:33 -040025 "k8s.io/api/core/v1"
Kent Hagerman334a8ce2019-05-16 16:50:33 -040026 "math"
27 "os"
David K. Bainbridgef430cd52019-05-28 15:00:35 -070028 "path"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040029 "regexp"
sslobodr16e41bc2019-01-18 16:22:21 -050030 "strconv"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040031 "time"
sslobodr16e41bc2019-01-18 16:22:21 -050032
sslobodr16e41bc2019-01-18 16:22:21 -050033 "github.com/golang/protobuf/ptypes"
Kent Hagerman334a8ce2019-05-16 16:50:33 -040034 "github.com/golang/protobuf/ptypes/empty"
sslobodr16e41bc2019-01-18 16:22:21 -050035 "github.com/opencord/voltha-go/common/log"
David K. Bainbridgef430cd52019-05-28 15:00:35 -070036 "github.com/opencord/voltha-go/common/version"
Kent Hagerman334a8ce2019-05-16 16:50:33 -040037 "github.com/opencord/voltha-go/kafka"
William Kurkiandaa6bb22019-03-07 12:26:28 -050038 pb "github.com/opencord/voltha-protos/go/afrouter"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040039 cmn "github.com/opencord/voltha-protos/go/common"
William Kurkiandaa6bb22019-03-07 12:26:28 -050040 ic "github.com/opencord/voltha-protos/go/inter_container"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040041 vpb "github.com/opencord/voltha-protos/go/voltha"
42 "golang.org/x/net/context"
43 "google.golang.org/grpc"
44 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
45 "k8s.io/client-go/kubernetes"
46 "k8s.io/client-go/rest"
Kent Hagermane566c2e2019-06-03 17:56:42 -040047 "k8s.io/client-go/tools/clientcmd"
sslobodr16e41bc2019-01-18 16:22:21 -050048)
49
// volthaPod captures the identity and routing state of a single voltha
// core pod as tracked by this daemon.
type volthaPod struct {
	name       string              // kubernetes pod name
	ipAddr     string              // pod IP, dialed as ipAddr:podGrpcPort
	node       string              // kubernetes node the pod is scheduled on
	devIds     map[string]struct{} // set of device ids owned by this core
	cluster    string              // NOTE(review): not written in the visible code — confirm intended use
	backend    string              // afrouter backend this pod is assigned to
	connection string              // afrouter connection name within that backend
}
59
// podTrack pairs a pod with a processed/"done" marker used while
// allocating pods into groups (see unallocPodCount).
type podTrack struct {
	pod *volthaPod // the tracked pod
	dn  bool       // true once the pod has been dealt with ("done")
}
64
// Configuration holds the daemon's command-line options.
type Configuration struct {
	// presumably set from a -version style flag so the process only
	// reports its version and exits — confirm against flag parsing
	DisplayVersionOnly *bool
}
68
// Runtime configuration, each item overridable via environment variable.
var (
	// if k8s variables are undefined, will attempt to use in-cluster config
	k8sApiServer      = getStrEnv("K8S_API_SERVER", "")
	k8sKubeConfigPath = getStrEnv("K8S_KUBE_CONFIG_PATH", "")

	// Namespace to search for voltha pods and the gRPC port they serve on.
	podNamespace = getStrEnv("POD_NAMESPACE", "voltha")
	podGrpcPort  = uint64(getIntEnv("POD_GRPC_PORT", 0, math.MaxUint16, 50057))

	// Expected counts of read-write and read-only core pods.
	numRWPods = getIntEnv("NUM_RW_PODS", 1, math.MaxInt32, 6)
	numROPods = getIntEnv("NUM_RO_PODS", 1, math.MaxInt32, 3)

	// Address of the affinity router's configuration API.
	afrouterApiAddress = getStrEnv("AFROUTER_API_ADDRESS", "localhost:55554")

	// Router/cluster names used when pushing affinity and connection updates.
	afrouterRouterName    = getStrEnv("AFROUTER_ROUTER_NAME", "vcore")
	afrouterRWClusterName = getStrEnv("AFROUTER_RW_CLUSTER_NAME", "vcore")
	afrouterROClusterName = getStrEnv("AFROUTER_RO_CLUSTER_NAME", "ro_vcore")

	// Kafka connection details for the device-discovery event stream.
	kafkaTopic      = getStrEnv("KAFKA_TOPIC", "AffinityRouter")
	kafkaClientType = getStrEnv("KAFKA_CLIENT_TYPE", "sarama")
	kafkaHost       = getStrEnv("KAFKA_HOST", "kafka")
	kafkaPort       = getIntEnv("KAFKA_PORT", 0, math.MaxUint16, 9092)
	kafkaInstanceID = getStrEnv("KAFKA_INSTANCE_ID", "arouterd")
)
92
// getIntEnv returns the integer value of the environment variable key,
// or defaultValue if the variable is unset.  A set-but-unparseable
// value, or one outside [min, max], is a fatal configuration error and
// panics.
func getIntEnv(key string, min, max, defaultValue int) int {
	val, found := os.LookupEnv(key)
	if !found {
		return defaultValue
	}
	num, err := strconv.Atoi(val)
	if err != nil || num < min || num > max {
		panic(fmt.Errorf("%s must be a number in the range [%d, %d]; default: %d", key, min, max, defaultValue))
	}
	return num
}
103
// getStrEnv returns the value of the environment variable key, or
// defaultValue if the variable is unset.
func getStrEnv(key, defaultValue string) string {
	val, found := os.LookupEnv(key)
	if !found {
		return defaultValue
	}
	return val
}
sslobodr16e41bc2019-01-18 16:22:21 -0500110
111func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
112
113 log.Infow("kafka-client-type", log.Fields{"client": clientType})
114 switch clientType {
115 case "sarama":
116 return kafka.NewSaramaClient(
117 kafka.Host(host),
118 kafka.Port(port),
119 kafka.ConsumerType(kafka.GroupCustomer),
120 kafka.ProducerReturnOnErrors(true),
121 kafka.ProducerReturnOnSuccess(true),
122 kafka.ProducerMaxRetries(6),
123 kafka.NumPartitions(3),
124 kafka.ConsumerGroupName(instanceID),
125 kafka.ConsumerGroupPrefix(instanceID),
126 kafka.AutoCreateTopic(false),
127 kafka.ProducerFlushFrequency(5),
128 kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
129 }
130 return nil, errors.New("unsupported-client-type")
131}
132
sslobodr16e41bc2019-01-18 16:22:21 -0500133func k8sClientSet() *kubernetes.Clientset {
Kent Hagermane566c2e2019-06-03 17:56:42 -0400134 var config *rest.Config
135 if k8sApiServer != "" || k8sKubeConfigPath != "" {
136 // use combination of URL & local kube-config file
137 c, err := clientcmd.BuildConfigFromFlags(k8sApiServer, k8sKubeConfigPath)
138 if err != nil {
139 panic(err)
140 }
141 config = c
142 } else {
143 // use in-cluster config
144 c, err := rest.InClusterConfig()
145 if err != nil {
146 log.Errorf("Unable to load in-cluster config. Try setting K8S_API_SERVER and K8S_KUBE_CONFIG_PATH?")
147 panic(err)
148 }
149 config = c
sslobodr16e41bc2019-01-18 16:22:21 -0500150 }
151 // creates the clientset
152 clientset, err := kubernetes.NewForConfig(config)
153 if err != nil {
Kent Hagermane566c2e2019-06-03 17:56:42 -0400154 panic(err)
sslobodr16e41bc2019-01-18 16:22:21 -0500155 }
156
157 return clientset
158}
159
Kent Hagerman92661462019-06-04 18:22:05 -0400160func connect(ctx context.Context, addr string) (*grpc.ClientConn, error) {
161 log.Debugf("Trying to connect to %s", addr)
162 conn, err := grpc.DialContext(ctx, addr,
163 grpc.WithInsecure(),
164 grpc.WithBlock(),
165 grpc.WithBackoffMaxDelay(time.Second*5),
166 grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: time.Second * 10, Timeout: time.Second * 5}))
167 if err == nil {
168 log.Debugf("Connection succeeded")
sslobodr16e41bc2019-01-18 16:22:21 -0500169 }
Kent Hagerman92661462019-06-04 18:22:05 -0400170 return conn, err
sslobodr16e41bc2019-01-18 16:22:21 -0500171}
172
Kent Hagerman737b9e52019-06-18 16:29:33 -0400173func getVolthaPods(cs *kubernetes.Clientset) ([]*volthaPod, []*volthaPod, error) {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400174 pods, err := cs.CoreV1().Pods(podNamespace).List(metav1.ListOptions{})
sslobodr16e41bc2019-01-18 16:22:21 -0500175 if err != nil {
Kent Hagerman737b9e52019-06-18 16:29:33 -0400176 return nil, nil, err
sslobodr16e41bc2019-01-18 16:22:21 -0500177 }
sslobodr16e41bc2019-01-18 16:22:21 -0500178
Kent Hagerman737b9e52019-06-18 16:29:33 -0400179 // Set up the regular expression to identify the voltha cores
180 rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
181 roCoreFltr := regexp.MustCompile(`ro-core-`)
182
183 var rwPods, roPods []*volthaPod
184items:
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400185 for _, v := range pods.Items {
Kent Hagerman737b9e52019-06-18 16:29:33 -0400186 // only pods that are actually running should be considered
187 if v.Status.Phase == v1.PodRunning {
188 for _, condition := range v.Status.Conditions {
189 if condition.Status != v1.ConditionTrue {
190 continue items
191 }
192 }
193
194 if rwCoreFltr.MatchString(v.Name) {
195 log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name, v.Status.PodIP, v.Spec.NodeName)
196 rwPods = append(rwPods, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName, devIds: make(map[string]struct{}), backend: "", connection: ""})
197 } else if roCoreFltr.MatchString(v.Name) {
198 log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name, v.Status.PodIP, v.Spec.NodeName)
199 roPods = append(roPods, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName, devIds: make(map[string]struct{}), backend: "", connection: ""})
sslobodre7ce71d2019-01-22 16:21:45 -0500200 }
sslobodr16e41bc2019-01-18 16:22:21 -0500201 }
202 }
Kent Hagerman737b9e52019-06-18 16:29:33 -0400203 return rwPods, roPods, nil
sslobodr16e41bc2019-01-18 16:22:21 -0500204}
205
Kent Hagerman737b9e52019-06-18 16:29:33 -0400206func reconcilePodDeviceIds(ctx context.Context, pod *volthaPod, ids map[string]struct{}) bool {
207 ctxTimeout, _ := context.WithTimeout(ctx, time.Second*5)
208 conn, err := connect(ctxTimeout, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
sslobodr6c1689c2019-01-24 07:31:15 -0500209 if err != nil {
sslobodre7ce71d2019-01-22 16:21:45 -0500210 log.Debugf("Could not query devices from %s, could not connect", pod.name)
211 return false
212 }
Kent Hagerman92661462019-06-04 18:22:05 -0400213 defer conn.Close()
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400214
215 var idList cmn.IDs
216 for k := range ids {
217 idList.Items = append(idList.Items, &cmn.ID{Id: k})
218 }
219
sslobodre7ce71d2019-01-22 16:21:45 -0500220 client := vpb.NewVolthaServiceClient(conn)
Kent Hagerman737b9e52019-06-18 16:29:33 -0400221 _, err = client.ReconcileDevices(ctx, &idList)
sslobodre7ce71d2019-01-22 16:21:45 -0500222 if err != nil {
223 log.Error(err)
224 return false
225 }
226
227 return true
228}
229
Kent Hagerman737b9e52019-06-18 16:29:33 -0400230func queryPodDeviceIds(ctx context.Context, pod *volthaPod) map[string]struct{} {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400231 var rtrn = make(map[string]struct{})
sslobodre7ce71d2019-01-22 16:21:45 -0500232 // Open a connection to the pod
Kent Hagerman737b9e52019-06-18 16:29:33 -0400233 ctxTimeout, _ := context.WithTimeout(ctx, time.Second*5)
234 conn, err := connect(ctxTimeout, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
sslobodr6c1689c2019-01-24 07:31:15 -0500235 if err != nil {
sslobodre7ce71d2019-01-22 16:21:45 -0500236 log.Debugf("Could not query devices from %s, could not connect", pod.name)
237 return rtrn
238 }
239 defer conn.Close()
240 client := vpb.NewVolthaServiceClient(conn)
Kent Hagerman737b9e52019-06-18 16:29:33 -0400241 devs, err := client.ListDeviceIds(ctx, &empty.Empty{})
sslobodre7ce71d2019-01-22 16:21:45 -0500242 if err != nil {
243 log.Error(err)
244 return rtrn
245 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400246 for _, dv := range devs.Items {
247 rtrn[dv.Id] = struct{}{}
sslobodre7ce71d2019-01-22 16:21:45 -0500248 }
249
250 return rtrn
251}
252
Kent Hagerman737b9e52019-06-18 16:29:33 -0400253func queryDeviceIds(ctx context.Context, pods []*volthaPod) {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400254 for pk := range pods {
sslobodre7ce71d2019-01-22 16:21:45 -0500255 // Keep the old Id list if a new list is not returned
Kent Hagerman737b9e52019-06-18 16:29:33 -0400256 if idList := queryPodDeviceIds(ctx, pods[pk]); len(idList) != 0 {
sslobodre7ce71d2019-01-22 16:21:45 -0500257 pods[pk].devIds = idList
sslobodr16e41bc2019-01-18 16:22:21 -0500258 }
sslobodr16e41bc2019-01-18 16:22:21 -0500259 }
260}
261
sslobodr8e2ccb52019-02-05 09:21:47 -0500262func allEmpty(pods []*volthaPod) bool {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400263 for k := range pods {
sslobodr16e41bc2019-01-18 16:22:21 -0500264 if len(pods[k].devIds) != 0 {
265 return false
266 }
267 }
268 return true
269}
270
// rmPod removes the element at index idx and returns the shortened
// slice.  NOTE: this shifts elements in place, mutating the argument's
// backing array — callers must use the returned slice and must not
// rely on any alias of the original retaining its ordering.
func rmPod(pods []*volthaPod, idx int) []*volthaPod {
	return append(pods[:idx], pods[idx+1:]...)
}
274
// groupIntersectingPods1 forms active-active pairs from pods whose
// device-id sets overlap.  Pods with no devices are set aside in `out`.
// Each iteration takes the first remaining pod as a group anchor and
// pairs it with the first other pod sharing at least one device id
// (provided they are on different nodes).  The loop stops early once
// podCt/2 groups exist, dumping any leftover pods into `out`.
// Returns (groups, unallocated pods).
func groupIntersectingPods1(pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
	var rtrn [][]*volthaPod
	var out []*volthaPod

	for {
		if len(pods) == 0 {
			break
		}
		if len(pods[0].devIds) == 0 { // Ignore pods with no devices
			////log.Debugf("%s empty pod", pd[k].pod.name)
			out = append(out, pods[0])
			pods = rmPod(pods, 0)
			continue
		}
		// Start a pod group with this pod
		var grp []*volthaPod
		grp = append(grp, pods[0])
		pods = rmPod(pods, 0)
		//log.Debugf("Creating new group %s", pd[k].pod.name)
		// Find the peer pod based on device overlap
		// It's ok if one isn't found, an empty one will be used instead
		for k := range pods {
			if len(pods[k].devIds) == 0 { // Skip pods with no devices
				//log.Debugf("%s empty pod", pd[k1].pod.name)
				continue
			}
			if intersect(grp[0].devIds, pods[k].devIds) {
				//log.Debugf("intersection found %s:%s", pd[k].pod.name, pd[k1].pod.name)
				if grp[0].node == pods[k].node {
					// This should never happen
					log.Errorf("pods %s and %s intersect and are on the same server!! Not pairing",
						grp[0].name, pods[k].name)
					continue
				}
				grp = append(grp, pods[k])
				pods = rmPod(pods, k)
				break

			}
		}
		rtrn = append(rtrn, grp)
		//log.Debugf("Added group %s", grp[0].name)
		// Check if the number of groups = half the pods, if so all groups are started.
		if len(rtrn) == podCt>>1 {
			// Append any remaining pods to out
			out = append(out, pods[0:]...)
			break
		}
	}
	return rtrn, out
}
326
sslobodr16e41bc2019-01-18 16:22:21 -0500327func unallocPodCount(pd []*podTrack) int {
328 var rtrn int = 0
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400329 for _, v := range pd {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400330 if !v.dn {
sslobodr16e41bc2019-01-18 16:22:21 -0500331 rtrn++
332 }
333 }
334 return rtrn
335}
336
sslobodr8e2ccb52019-02-05 09:21:47 -0500337func sameNode(pod *volthaPod, grps [][]*volthaPod) bool {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400338 for _, v := range grps {
sslobodr16e41bc2019-01-18 16:22:21 -0500339 if v[0].node == pod.node {
340 return true
341 }
342 if len(v) == 2 && v[1].node == pod.node {
343 return true
344 }
345 }
346 return false
347}
348
// startRemainingGroups1 starts new single-member groups from the
// remaining (unpaired) pods, choosing only pods whose node is not
// already used by an existing group, until podCt/2 groups exist.
// Returns the updated groups and the pods still unallocated.
//
// NOTE(review): rmPod mutates `pods` while this loop ranges over it by
// index; after a removal, later iterations of k index into the
// shortened slice, which can skip entries or index out of range if
// several removals occur before the break.  Presumably the podCt/2
// break fires first in practice — confirm the caller's invariants.
func startRemainingGroups1(grps [][]*volthaPod, pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
	var grp []*volthaPod

	for k := range pods {
		if sameNode(pods[k], grps) {
			continue
		}
		grp = []*volthaPod{}
		grp = append(grp, pods[k])
		pods = rmPod(pods, k)
		grps = append(grps, grp)
		if len(grps) == podCt>>1 {
			break
		}
	}
	return grps, pods
}
366
sslobodr8e2ccb52019-02-05 09:21:47 -0500367func hasSingleSecondNode(grp []*volthaPod) bool {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400368 var servers = make(map[string]struct{})
369 for k := range grp {
sslobodr16e41bc2019-01-18 16:22:21 -0500370 if k == 0 {
371 continue // Ignore the first item
372 }
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400373 servers[grp[k].node] = struct{}{}
sslobodr16e41bc2019-01-18 16:22:21 -0500374 }
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400375 if len(servers) == 1 {
sslobodr16e41bc2019-01-18 16:22:21 -0500376 return true
377 }
378 return false
379}
380
sslobodr8e2ccb52019-02-05 09:21:47 -0500381func addNode(grps [][]*volthaPod, idx *volthaPod, item *volthaPod) [][]*volthaPod {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400382 for k := range grps {
sslobodr16e41bc2019-01-18 16:22:21 -0500383 if grps[k][0].name == idx.name {
384 grps[k] = append(grps[k], item)
385 return grps
386 }
387 }
388 // TODO: Error checking required here.
389 return grps
390}
391
sslobodr8e2ccb52019-02-05 09:21:47 -0500392func removeNode(grps [][]*volthaPod, item *volthaPod) [][]*volthaPod {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400393 for k := range grps {
394 for k1 := range grps[k] {
sslobodr16e41bc2019-01-18 16:22:21 -0500395 if grps[k][k1].name == item.name {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400396 grps[k] = append(grps[k][:k1], grps[k][k1+1:]...)
sslobodr16e41bc2019-01-18 16:22:21 -0500397 break
398 }
399 }
400 }
401 return grps
402}
403
// groupRemainingPods1 completes every started-but-unpaired group using
// the remaining (device-less) pods.  Each incomplete group first
// collects all pods on a different node as pairing candidates; then
// groups whose candidates all sit on a single node are resolved first
// (they have no flexibility), and finally the rest are resolved in
// order.  All groups must be started when this function is called.
func groupRemainingPods1(grps [][]*volthaPod, pods []*volthaPod) [][]*volthaPod {
	var lgrps [][]*volthaPod
	// All groups must be started when this function is called.
	// Copy incomplete groups
	for k := range grps {
		if len(grps[k]) != 2 {
			lgrps = append(lgrps, grps[k])
		}
	}

	// Add all pairing candidates to each started group.
	for k := range pods {
		for k2 := range lgrps {
			if lgrps[k2][0].node != pods[k].node {
				lgrps[k2] = append(lgrps[k2], pods[k])
			}
		}
	}

	//TODO: If any member of lgrps doesn't have at least 2
	// nodes something is wrong. Check for that here

	for {
		for { // Address groups with only a single server choice
			var ssn bool = false

			for k := range lgrps {
				// Now if any of the groups only have a single
				// node as the choice for the second member
				// address that one first.
				if hasSingleSecondNode(lgrps[k]) {
					ssn = true
					// Add this pairing to the groups
					grps = addNode(grps, lgrps[k][0], lgrps[k][1])
					// Since this node is now used, remove it from all
					// remaining tenative groups
					lgrps = removeNode(lgrps, lgrps[k][1])
					// Now remove this group completely since
					// it's been addressed
					lgrps = append(lgrps[:k], lgrps[k+1:]...)
					break
				}
			}
			if !ssn {
				break
			}
		}
		// Now address one of the remaining groups
		if len(lgrps) == 0 {
			break // Nothing left to do, exit the loop
		}
		grps = addNode(grps, lgrps[0][0], lgrps[0][1])
		lgrps = removeNode(lgrps, lgrps[0][1])
		lgrps = append(lgrps[:0], lgrps[1:]...)
	}
	return grps
}
461
sslobodr8e2ccb52019-02-05 09:21:47 -0500462func groupPods1(pods []*volthaPod) [][]*volthaPod {
463 var rtrn [][]*volthaPod
sslobodr16e41bc2019-01-18 16:22:21 -0500464 var podCt int = len(pods)
465
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400466 rtrn, pods = groupIntersectingPods1(pods, podCt)
467 // There are several outcomes here
sslobodr16e41bc2019-01-18 16:22:21 -0500468 // 1) All pods have been paired and we're done
469 // 2) Some un-allocated pods remain
470 // 2.a) All groups have been started
471 // 2.b) Not all groups have been started
472 if len(pods) == 0 {
473 return rtrn
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400474 } else if len(rtrn) == podCt>>1 { // All groupings started
sslobodr16e41bc2019-01-18 16:22:21 -0500475 // Allocate the remaining (presumably empty) pods to the started groups
476 return groupRemainingPods1(rtrn, pods)
477 } else { // Some groupings started
478 // Start empty groups with remaining pods
479 // each grouping is on a different server then
480 // allocate remaining pods.
481 rtrn, pods = startRemainingGroups1(rtrn, pods, podCt)
482 return groupRemainingPods1(rtrn, pods)
483 }
484}
485
// intersect reports whether the two device-id sets share at least one
// element.
func intersect(d1 map[string]struct{}, d2 map[string]struct{}) bool {
	for id := range d1 {
		if _, found := d2[id]; found {
			return true
		}
	}
	return false
}
494
Kent Hagerman92661462019-06-04 18:22:05 -0400495func setConnection(ctx context.Context, client pb.ConfigurationClient, cluster string, backend string, connection string, addr string, port uint64) {
sslobodr360c8d72019-02-05 12:47:56 -0500496 log.Debugf("Configuring backend %s : connection %s in cluster %s\n\n",
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400497 backend, connection, cluster)
498 cnf := &pb.Conn{Server: "grpc_command", Cluster: cluster, Backend: backend,
499 Connection: connection, Addr: addr,
500 Port: port}
Kent Hagerman92661462019-06-04 18:22:05 -0400501 if res, err := client.SetConnection(ctx, cnf); err != nil {
sslobodr16e41bc2019-01-18 16:22:21 -0500502 log.Debugf("failed SetConnection RPC call: %s", err)
503 } else {
504 log.Debugf("Result: %v", res)
505 }
506}
507
Kent Hagerman92661462019-06-04 18:22:05 -0400508func setAffinity(ctx context.Context, client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
sslobodr16e41bc2019-01-18 16:22:21 -0500509 log.Debugf("Configuring backend %s : affinities \n", backend)
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400510 aff := &pb.Affinity{Router: afrouterRouterName, Route: "dev_manager", Cluster: afrouterRWClusterName, Backend: backend}
511 for k := range ids {
sslobodr16e41bc2019-01-18 16:22:21 -0500512 log.Debugf("Setting affinity for id %s", k)
513 aff.Id = k
Kent Hagerman92661462019-06-04 18:22:05 -0400514 if res, err := client.SetAffinity(ctx, aff); err != nil {
sslobodr16e41bc2019-01-18 16:22:21 -0500515 log.Debugf("failed affinity RPC call: %s", err)
516 } else {
517 log.Debugf("Result: %v", res)
518 }
519 }
520}
521
sslobodr8e2ccb52019-02-05 09:21:47 -0500522func getBackendForCore(coreId string, coreGroups [][]*volthaPod) string {
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400523 for _, v := range coreGroups {
524 for _, v2 := range v {
sslobodr38afd0d2019-01-21 12:31:46 -0500525 if v2.name == coreId {
526 return v2.backend
527 }
528 }
529 }
530 log.Errorf("No backend found for core %s\n", coreId)
531 return ""
532}
533
// monitorDiscovery waits for a single device-discovery notification on
// ch (or context cancellation), looks up the backend owning the
// discovered device, and pushes the corresponding affinity to the
// router.  doneCh is closed on exit.
//
// NOTE(review): the select handles exactly ONE message and then
// returns — presumably the caller restarts the monitor; confirm.
func monitorDiscovery(ctx context.Context,
	client pb.ConfigurationClient,
	ch <-chan *ic.InterContainerMessage,
	coreGroups [][]*volthaPod,
	doneCh chan<- struct{}) {
	defer close(doneCh)

	var id = make(map[string]struct{})

	select {
	case <-ctx.Done():
	case msg := <-ch:
		log.Debugf("Received a device discovery notification")
		device := &ic.DeviceDiscovered{}
		if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
			log.Errorf("Could not unmarshal received notification %v", msg)
		} else {
			// Set the affinity of the discovered device.
			if be := getBackendForCore(device.Id, coreGroups); be != "" {
				id[device.Id] = struct{}{}
				setAffinity(ctx, client, id, be)
			} else {
				log.Error("Cant use an empty string as a backend name")
			}
		}
		break
	}
}
562
Kent Hagerman92661462019-06-04 18:22:05 -0400563func startDiscoveryMonitor(ctx context.Context,
564 client pb.ConfigurationClient,
565 coreGroups [][]*volthaPod) (<-chan struct{}, error) {
566 doneCh := make(chan struct{})
sslobodr16e41bc2019-01-18 16:22:21 -0500567 var ch <-chan *ic.InterContainerMessage
568 // Connect to kafka for discovery events
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400569 topic := &kafka.Topic{Name: kafkaTopic}
570 kc, err := newKafkaClient(kafkaClientType, kafkaHost, kafkaPort, kafkaInstanceID)
sslobodr16e41bc2019-01-18 16:22:21 -0500571 kc.Start()
Kent Hagerman92661462019-06-04 18:22:05 -0400572 defer kc.Stop()
sslobodr16e41bc2019-01-18 16:22:21 -0500573
574 if ch, err = kc.Subscribe(topic); err != nil {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400575 log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
Kent Hagerman92661462019-06-04 18:22:05 -0400576 close(doneCh)
577 return doneCh, err
sslobodr16e41bc2019-01-18 16:22:21 -0500578 }
Kent Hagerman92661462019-06-04 18:22:05 -0400579
580 go monitorDiscovery(ctx, client, ch, coreGroups, doneCh)
581 return doneCh, nil
sslobodr16e41bc2019-01-18 16:22:21 -0500582}
583
sslobodre7ce71d2019-01-22 16:21:45 -0500584// Determines which items in core groups
585// have changed based on the list provided
586// and returns a coreGroup with only the changed
587// items and a pod list with the new items
sslobodr8e2ccb52019-02-05 09:21:47 -0500588func getAddrDiffs(coreGroups [][]*volthaPod, rwPods []*volthaPod) ([][]*volthaPod, []*volthaPod) {
589 var nList []*volthaPod
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400590 var rtrn = make([][]*volthaPod, numRWPods>>1)
591 var ipAddrs = make(map[string]struct{})
sslobodre7ce71d2019-01-22 16:21:45 -0500592
593 log.Debug("Get addr diffs")
594
595 // Start with an empty array
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400596 for k := range rtrn {
sslobodr8e2ccb52019-02-05 09:21:47 -0500597 rtrn[k] = make([]*volthaPod, 2)
sslobodre7ce71d2019-01-22 16:21:45 -0500598 }
599
600 // Build a list with only the new items
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400601 for _, v := range rwPods {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400602 if !hasIpAddr(coreGroups, v.ipAddr) {
sslobodre7ce71d2019-01-22 16:21:45 -0500603 nList = append(nList, v)
604 }
605 ipAddrs[v.ipAddr] = struct{}{} // for the search below
606 }
607
608 // Now build the coreGroups with only the changed items
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400609 for k1, v1 := range coreGroups {
610 for k2, v2 := range v1 {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400611 if _, ok := ipAddrs[v2.ipAddr]; !ok {
sslobodre7ce71d2019-01-22 16:21:45 -0500612 rtrn[k1][k2] = v2
613 }
614 }
615 }
616 return rtrn, nList
617}
618
// Figure out where best to put the new pods
// in the coreGroup array based on the old
// pods being replaced. The criteria is that
// the new pod be on the same server as the
// old pod was.
//
// Each stale entry in coreGroupDiffs is replaced in place by a new pod
// scheduled on the same node, consuming entries from rwPodDiffs.  If
// no replacement exists on that node an error is logged and the stale
// entry is kept ("limp along").
func reconcileAddrDiffs(coreGroupDiffs [][]*volthaPod, rwPodDiffs []*volthaPod) [][]*volthaPod {
	// Index the replacement pods by the node they run on.
	var srvrs map[string][]*volthaPod = make(map[string][]*volthaPod)

	log.Debug("Reconciling diffs")
	log.Debug("Building server list")
	for _, v := range rwPodDiffs {
		log.Debugf("Adding %v to the server list", *v)
		srvrs[v.node] = append(srvrs[v.node], v)
	}

	for k1, v1 := range coreGroupDiffs {
		log.Debugf("k1:%v, v1:%v", k1, v1)
		for k2, v2 := range v1 {
			log.Debugf("k2:%v, v2:%v", k2, v2)
			if v2 == nil { // Nothing to do here
				continue
			}
			if _, ok := srvrs[v2.node]; ok {
				// Consume the first replacement available on this node.
				coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
				if len(srvrs[v2.node]) > 1 { // remove one entry from the list
					srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
				} else { // Delete the entry from the map
					delete(srvrs, v2.node)
				}
			} else {
				log.Error("This should never happen, node appears to have changed names")
				// attempt to limp along by keeping this old entry
			}
		}
	}

	return coreGroupDiffs
}
657
// applyAddrDiffs updates the router and the in-memory core bookkeeping
// after pod addresses have changed.  coreList is either the RW core
// pairs ([][]*volthaPod) — replacements are matched to the same node,
// device ids are pulled from the active-active peer (index k2^1) and
// reconciled onto the new pod — or the flat RO core list
// ([]*volthaPod), where changed cores are simply given the new
// addresses.  Any other type logs an internal error.
func applyAddrDiffs(ctx context.Context, client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
	log.Debug("Applying diffs")
	switch cores := coreList.(type) {
	case [][]*volthaPod:
		// Stale entries, with same-node replacements plugged in.
		newEntries := reconcileAddrDiffs(getAddrDiffs(cores, nPods))

		// Now replace the information in coreGroups with the new
		// entries and then reconcile the device ids on the core
		// that's in the new entry with the device ids of its
		// active-active peer.
		for k1, v1 := range cores {
			for k2, v2 := range v1 {
				if newEntries[k1][k2] != nil {
					// TODO: Missing is the case where both the primary
					// and the secondary core crash and come back.
					// Pull the device ids from the active-active peer
					ids := queryPodDeviceIds(ctx, cores[k1][k2^1])
					if len(ids) != 0 {
						if !reconcilePodDeviceIds(ctx, newEntries[k1][k2], ids) {
							log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
						}
					}
					// Send the affinity router new connection information
					setConnection(ctx, client, afrouterRWClusterName, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, podGrpcPort)
					// Copy the new entry information over
					cores[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
					cores[k1][k2].name = newEntries[k1][k2].name
					cores[k1][k2].devIds = ids
				}
			}
		}
	case []*volthaPod:
		var mia []*volthaPod
		var found bool
		// TODO: Break this using functions to simplify
		// reading of the code.
		// Find the core(s) that have changed addresses
		for k1, v1 := range cores {
			found = false
			for _, v2 := range nPods {
				if v1.ipAddr == v2.ipAddr {
					found = true
					break
				}
			}
			if !found {
				mia = append(mia, cores[k1])
			}
		}
		// Now plug in the new addresses and set the connection
		// (each new pod replaces the next missing-in-action core).
		for _, v1 := range nPods {
			found = false
			for _, v2 := range cores {
				if v1.ipAddr == v2.ipAddr {
					found = true
					break
				}
			}
			if found {
				continue
			}
			mia[0].ipAddr = v1.ipAddr
			mia[0].name = v1.name
			setConnection(ctx, client, afrouterROClusterName, mia[0].backend, mia[0].connection, v1.ipAddr, podGrpcPort)
			// Now get rid of the mia entry just processed
			mia = append(mia[:0], mia[1:]...)
		}
	default:
		log.Error("Internal: Unexpected type in call to applyAddrDiffs")
	}
}
729
sslobodr8e2ccb52019-02-05 09:21:47 -0500730func updateDeviceIds(coreGroups [][]*volthaPod, rwPods []*volthaPod) {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400731 // Convenience
Kent Hagerman737b9e52019-06-18 16:29:33 -0400732 var byName = make(map[string]*volthaPod)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400733 for _, v := range rwPods {
sslobodrcd37bc52019-01-24 11:47:16 -0500734 byName[v.name] = v
735 }
736
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400737 for k1, v1 := range coreGroups {
Kent Hagerman737b9e52019-06-18 16:29:33 -0400738 for k2, v2 := range v1 {
739 if pod, have := byName[v2.name]; have {
740 coreGroups[k1][k2].devIds = pod.devIds
741 }
sslobodrcd37bc52019-01-24 11:47:16 -0500742 }
743 }
744}
745
Kent Hagerman92661462019-06-04 18:22:05 -0400746func startCoreMonitor(ctx context.Context,
747 client pb.ConfigurationClient,
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400748 clientset *kubernetes.Clientset,
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400749 coreGroups [][]*volthaPod,
Kent Hagerman92661462019-06-04 18:22:05 -0400750 oRoPods []*volthaPod) {
sslobodr16e41bc2019-01-18 16:22:21 -0500751 // Now that initial allocation has been completed, monitor the pods
752 // for IP changes
753 // The main loop needs to do the following:
754 // 1) Periodically query the pods and filter out
755 // the vcore ones
756 // 2) Validate that the pods running are the same
757 // as the previous check
758 // 3) Validate that the IP addresses are the same
759 // as the last check.
760 // If the pod name(s) ha(s/ve) changed then remove
761 // the unused pod names and add in the new pod names
762 // maintaining the cluster/backend information.
763 // If an IP address has changed (which shouldn't
764 // happen unless a pod is re-started) it should get
765 // caught by the pod name change.
Kent Hagerman92661462019-06-04 18:22:05 -0400766loop:
sslobodr16e41bc2019-01-18 16:22:21 -0500767 for {
Kent Hagerman92661462019-06-04 18:22:05 -0400768 select {
769 case <-ctx.Done():
770 // if we're done, exit
771 break loop
772 case <-time.After(10 * time.Second): //wait a while
773 }
774
sslobodr16e41bc2019-01-18 16:22:21 -0500775 // Get the rw core list from k8s
Kent Hagerman737b9e52019-06-18 16:29:33 -0400776 rwPods, roPods, err := getVolthaPods(clientset)
Kent Hagerman92661462019-06-04 18:22:05 -0400777 if err != nil {
778 log.Error(err)
779 continue
780 }
781
sslobodr16e41bc2019-01-18 16:22:21 -0500782 // If we didn't get 2n+1 pods then wait since
783 // something is down and will hopefully come
784 // back up at some point.
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400785 if len(rwPods) != numRWPods {
Kent Hagerman737b9e52019-06-18 16:29:33 -0400786 log.Debug("One or more RW pod(s) are offline, will wait and retry")
sslobodr16e41bc2019-01-18 16:22:21 -0500787 continue
788 }
Kent Hagerman737b9e52019-06-18 16:29:33 -0400789
790 queryDeviceIds(ctx, rwPods)
791 updateDeviceIds(coreGroups, rwPods)
792
sslobodr16e41bc2019-01-18 16:22:21 -0500793 // We have all pods, check if any IP addresses
794 // have changed.
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400795 for _, v := range rwPods {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400796 if !hasIpAddr(coreGroups, v.ipAddr) {
sslobodre7ce71d2019-01-22 16:21:45 -0500797 log.Debug("Address has changed...")
Kent Hagerman92661462019-06-04 18:22:05 -0400798 applyAddrDiffs(ctx, client, coreGroups, rwPods)
sslobodr8e2ccb52019-02-05 09:21:47 -0500799 break
sslobodre7ce71d2019-01-22 16:21:45 -0500800 }
sslobodr16e41bc2019-01-18 16:22:21 -0500801 }
sslobodr8e2ccb52019-02-05 09:21:47 -0500802
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400803 if len(roPods) != numROPods {
Kent Hagerman737b9e52019-06-18 16:29:33 -0400804 log.Debug("One or more RO pod(s) are offline, will wait and retry")
sslobodr8e2ccb52019-02-05 09:21:47 -0500805 continue
806 }
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400807 for _, v := range roPods {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400808 if !hasIpAddr(oRoPods, v.ipAddr) {
Kent Hagerman92661462019-06-04 18:22:05 -0400809 applyAddrDiffs(ctx, client, oRoPods, roPods)
sslobodr8e2ccb52019-02-05 09:21:47 -0500810 break
811 }
812 }
sslobodr16e41bc2019-01-18 16:22:21 -0500813 }
sslobodr16e41bc2019-01-18 16:22:21 -0500814}
815
sslobodr8e2ccb52019-02-05 09:21:47 -0500816func hasIpAddr(coreList interface{}, ipAddr string) bool {
817 switch cores := coreList.(type) {
818 case []*volthaPod:
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400819 for _, v := range cores {
sslobodr8e2ccb52019-02-05 09:21:47 -0500820 if v.ipAddr == ipAddr {
sslobodre7ce71d2019-01-22 16:21:45 -0500821 return true
822 }
823 }
sslobodr8e2ccb52019-02-05 09:21:47 -0500824 case [][]*volthaPod:
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400825 for _, v1 := range cores {
826 for _, v2 := range v1 {
sslobodr8e2ccb52019-02-05 09:21:47 -0500827 if v2.ipAddr == ipAddr {
828 return true
829 }
830 }
831 }
832 default:
833 log.Error("Internal: Unexpected type in call to hasIpAddr")
sslobodre7ce71d2019-01-22 16:21:45 -0500834 }
835 return false
836}
837
Kent Hagerman92661462019-06-04 18:22:05 -0400838// endOnClose cancels the context when the connection closes
839func connectionActiveContext(conn *grpc.ClientConn) context.Context {
840 ctx, disconnected := context.WithCancel(context.Background())
841 go func() {
842 for state := conn.GetState(); state != connectivity.TransientFailure && state != connectivity.Shutdown; state = conn.GetState() {
843 if !conn.WaitForStateChange(context.Background(), state) {
844 break
845 }
846 }
847 log.Infof("Connection to afrouter lost")
848 disconnected()
849 }()
850 return ctx
851}
sslobodr16e41bc2019-01-18 16:22:21 -0500852
Kent Hagerman92661462019-06-04 18:22:05 -0400853func main() {
David K. Bainbridgef430cd52019-05-28 15:00:35 -0700854 config := &Configuration{}
855 cmdParse := flag.NewFlagSet(path.Base(os.Args[0]), flag.ContinueOnError)
856 config.DisplayVersionOnly = cmdParse.Bool("version", false, "Print version information and exit")
857
Kent Hagerman92661462019-06-04 18:22:05 -0400858 if err := cmdParse.Parse(os.Args[1:]); err != nil {
David K. Bainbridgef430cd52019-05-28 15:00:35 -0700859 fmt.Printf("Error: %v\n", err)
860 os.Exit(1)
861 }
862
863 if *config.DisplayVersionOnly {
864 fmt.Println("VOLTHA API Server (afrouterd)")
865 fmt.Println(version.VersionInfo.String(" "))
866 return
867 }
868
sslobodr16e41bc2019-01-18 16:22:21 -0500869 // Set up logging
870 if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
871 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
872 }
873
874 // Set up kubernetes api
875 clientset := k8sClientSet()
876
Kent Hagerman92661462019-06-04 18:22:05 -0400877 for {
878 // Connect to the affinity router
879 conn, err := connect(context.Background(), afrouterApiAddress) // This is a sidecar container so communicating over localhost
880 if err != nil {
881 panic(err)
882 }
883
884 // monitor the connection status, end context if connection is lost
885 ctx := connectionActiveContext(conn)
886
887 // set up the client
888 client := pb.NewConfigurationClient(conn)
889
890 // determine config & repopulate the afrouter
891 generateAndMaintainConfiguration(ctx, client, clientset)
892
893 conn.Close()
894 }
895}
896
897// generateAndMaintainConfiguration does the pod-reconciliation work,
898// it only returns once all sub-processes have completed
899func generateAndMaintainConfiguration(ctx context.Context, client pb.ConfigurationClient, clientset *kubernetes.Clientset) {
Kent Hagerman737b9e52019-06-18 16:29:33 -0400900 // Get the voltha rw-/ro-core pods
901 var rwPods, roPods []*volthaPod
902 for {
903 var err error
904 if rwPods, roPods, err = getVolthaPods(clientset); err != nil {
905 log.Error(err)
906 return
907 }
Kent Hagerman92661462019-06-04 18:22:05 -0400908
Kent Hagerman737b9e52019-06-18 16:29:33 -0400909 if len(rwPods) == numRWPods && len(roPods) == numROPods {
910 break
911 }
912
913 log.Debug("One or more RW/RO pod(s) are offline, will wait and retry")
914 select {
915 case <-ctx.Done():
916 return
917 case <-time.After(time.Second * 5):
918 // retry
919 }
sslobodr16e41bc2019-01-18 16:22:21 -0500920 }
sslobodr16e41bc2019-01-18 16:22:21 -0500921
922 // Fetch the devices held by each running core
Kent Hagerman737b9e52019-06-18 16:29:33 -0400923 queryDeviceIds(ctx, rwPods)
sslobodr16e41bc2019-01-18 16:22:21 -0500924
925 // For debugging... comment out l8r
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400926 for _, v := range rwPods {
sslobodr16e41bc2019-01-18 16:22:21 -0500927 log.Debugf("Pod list %v", *v)
928 }
929
930 coreGroups := groupPods1(rwPods)
931
sslobodr16e41bc2019-01-18 16:22:21 -0500932 // Assign the groupings to the the backends and connections
Kent Hagerman737b9e52019-06-18 16:29:33 -0400933 for k, coresInGroup := range coreGroups {
934 for k1 := range coresInGroup {
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400935 coreGroups[k][k1].cluster = afrouterRWClusterName
936 coreGroups[k][k1].backend = afrouterRWClusterName + strconv.Itoa(k+1)
937 coreGroups[k][k1].connection = afrouterRWClusterName + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
sslobodr16e41bc2019-01-18 16:22:21 -0500938 }
939 }
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400940 log.Info("Core grouping completed")
sslobodr16e41bc2019-01-18 16:22:21 -0500941
942 // TODO: Debugging code, comment out for production
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400943 for k, v := range coreGroups {
944 for k2, v2 := range v {
sslobodr16e41bc2019-01-18 16:22:21 -0500945 log.Debugf("Core group %d,%d: %v", k, k2, v2)
946 }
947 }
sslobodrcd37bc52019-01-24 11:47:16 -0500948 log.Info("Setting affinities")
sslobodr16e41bc2019-01-18 16:22:21 -0500949 // Now set the affinities for exising devices in the cores
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400950 for _, v := range coreGroups {
Kent Hagerman92661462019-06-04 18:22:05 -0400951 setAffinity(ctx, client, v[0].devIds, v[0].backend)
952 setAffinity(ctx, client, v[1].devIds, v[1].backend)
sslobodr16e41bc2019-01-18 16:22:21 -0500953 }
sslobodrcd37bc52019-01-24 11:47:16 -0500954 log.Info("Setting connections")
sslobodr16e41bc2019-01-18 16:22:21 -0500955 // Configure the backeds based on the calculated core groups
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400956 for _, v := range coreGroups {
Kent Hagerman92661462019-06-04 18:22:05 -0400957 setConnection(ctx, client, afrouterRWClusterName, v[0].backend, v[0].connection, v[0].ipAddr, podGrpcPort)
958 setConnection(ctx, client, afrouterRWClusterName, v[1].backend, v[1].connection, v[1].ipAddr, podGrpcPort)
sslobodr8e2ccb52019-02-05 09:21:47 -0500959 }
960
961 // Process the read only pods
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400962 for k, v := range roPods {
sslobodr8e2ccb52019-02-05 09:21:47 -0500963 log.Debugf("Processing ro_pod %v", v)
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400964 vN := afrouterROClusterName + strconv.Itoa(k+1)
sslobodr8e2ccb52019-02-05 09:21:47 -0500965 log.Debugf("Setting connection %s, %s, %s", vN, vN+"1", v.ipAddr)
Kent Hagerman334a8ce2019-05-16 16:50:33 -0400966 roPods[k].cluster = afrouterROClusterName
sslobodr8e2ccb52019-02-05 09:21:47 -0500967 roPods[k].backend = vN
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400968 roPods[k].connection = vN + "1"
Kent Hagerman92661462019-06-04 18:22:05 -0400969 setConnection(ctx, client, afrouterROClusterName, v.backend, v.connection, v.ipAddr, podGrpcPort)
sslobodr16e41bc2019-01-18 16:22:21 -0500970 }
971
sslobodrcd37bc52019-01-24 11:47:16 -0500972 log.Info("Starting discovery monitoring")
Kent Hagerman92661462019-06-04 18:22:05 -0400973 doneCh, _ := startDiscoveryMonitor(ctx, client, coreGroups)
sslobodr16e41bc2019-01-18 16:22:21 -0500974
sslobodrcd37bc52019-01-24 11:47:16 -0500975 log.Info("Starting core monitoring")
Kent Hagerman737b9e52019-06-18 16:29:33 -0400976 startCoreMonitor(ctx, client, clientset, coreGroups, roPods)
Kent Hagerman92661462019-06-04 18:22:05 -0400977
978 //ensure the discovery monitor to quit
979 <-doneCh
sslobodr16e41bc2019-01-18 16:22:21 -0500980}