blob: 2e3e831f6eddc908797dae1f63a5ab0139654701 [file] [log] [blame]
Scott Bakere702d122019-10-22 11:54:12 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package afrouterd
18
19import (
20 "errors"
21 "github.com/golang/protobuf/ptypes"
Scott Bakerf579f132019-10-24 14:31:41 -070022 "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
23 "github.com/opencord/voltha-lib-go/v2/pkg/log"
divyadesaif117fc22019-11-04 06:32:01 +000024 "github.com/opencord/voltha-lib-go/v2/pkg/probe"
Scott Bakerb6de7a52019-11-04 09:13:37 -080025 pb "github.com/opencord/voltha-protos/v2/go/afrouter"
26 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
Scott Bakere702d122019-10-22 11:54:12 -070027 "golang.org/x/net/context"
28 "regexp"
29 "time"
30)
31
32func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
33 log.Infow("kafka-client-type", log.Fields{"client": clientType})
34 switch clientType {
35 case "sarama":
36 return kafka.NewSaramaClient(
37 kafka.Host(host),
38 kafka.Port(port),
39 kafka.ConsumerType(kafka.GroupCustomer),
40 kafka.ProducerReturnOnErrors(true),
41 kafka.ProducerReturnOnSuccess(true),
42 kafka.ProducerMaxRetries(6),
43 kafka.NumPartitions(3),
44 kafka.ConsumerGroupName(instanceID),
45 kafka.ConsumerGroupPrefix(instanceID),
46 kafka.AutoCreateTopic(false),
47 kafka.ProducerFlushFrequency(5),
48 kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
49 }
50 return nil, errors.New("unsupported-client-type")
51}
52
53func monitorDiscovery(kc kafka.Client, ctx context.Context, client pb.ConfigurationClient, ch <-chan *ic.InterContainerMessage, doneCh chan<- struct{}) {
54 defer close(doneCh)
55 defer kc.Stop()
divyadesaif117fc22019-11-04 06:32:01 +000056 defer probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusStopped)
Scott Bakere702d122019-10-22 11:54:12 -070057
58monitorLoop:
59 for {
60 select {
61 case <-ctx.Done():
62 break monitorLoop
63 case msg := <-ch:
64 log.Debug("Received a device discovery notification")
65 device := &ic.DeviceDiscovered{}
66 if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
67 log.Errorf("Could not unmarshal received notification %v", msg)
68 } else {
69 // somewhat hackish solution, backend is known from the first digit found in the publisher name
70 group := regexp.MustCompile(`\d`).FindString(device.Publisher)
71 if group != "" {
72 // set the affinity of the discovered device
73 setAffinity(ctx, client, device.Id, afrouterRWClusterName+group)
74 } else {
75 log.Error("backend is unknown")
76 }
77 }
78 }
79 }
80}
81
82func StartDiscoveryMonitor(ctx context.Context, client pb.ConfigurationClient) (<-chan struct{}, error) {
83 doneCh := make(chan struct{})
84 // Connect to kafka for discovery events
85 kc, err := newKafkaClient(kafkaClientType, kafkaHost, kafkaPort, kafkaInstanceID)
86 if err != nil {
87 panic(err)
88 }
89
divyadesaif117fc22019-11-04 06:32:01 +000090 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
Scott Bakere702d122019-10-22 11:54:12 -070091 for {
92 if err := kc.Start(); err != nil {
93 log.Error("Could not connect to kafka")
94 } else {
divyadesaif117fc22019-11-04 06:32:01 +000095 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
Scott Bakere702d122019-10-22 11:54:12 -070096 break
97 }
98 select {
99 case <-ctx.Done():
100 close(doneCh)
divyadesaif117fc22019-11-04 06:32:01 +0000101 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusStopped)
Scott Bakere702d122019-10-22 11:54:12 -0700102 return doneCh, errors.New("GRPC context done")
103
104 case <-time.After(5 * time.Second):
105 }
106 }
107 ch, err := kc.Subscribe(&kafka.Topic{Name: kafkaTopic})
108 if err != nil {
109 log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
110 close(doneCh)
111 kc.Stop()
divyadesaif117fc22019-11-04 06:32:01 +0000112 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusStopped)
Scott Bakere702d122019-10-22 11:54:12 -0700113 return doneCh, err
114 }
115
116 go monitorDiscovery(kc, ctx, client, ch, doneCh)
117 return doneCh, nil
118}