blob: 2fab6ed40d89a8dba669bbcfa76711f10c841a4c [file] [log] [blame]
Scott Bakere702d122019-10-22 11:54:12 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package afrouterd
18
19import (
20 "errors"
21 "github.com/golang/protobuf/ptypes"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
24 pb "github.com/opencord/voltha-protos/go/afrouter"
25 ic "github.com/opencord/voltha-protos/go/inter_container"
26 "golang.org/x/net/context"
27 "regexp"
28 "time"
29)
30
31func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
32 log.Infow("kafka-client-type", log.Fields{"client": clientType})
33 switch clientType {
34 case "sarama":
35 return kafka.NewSaramaClient(
36 kafka.Host(host),
37 kafka.Port(port),
38 kafka.ConsumerType(kafka.GroupCustomer),
39 kafka.ProducerReturnOnErrors(true),
40 kafka.ProducerReturnOnSuccess(true),
41 kafka.ProducerMaxRetries(6),
42 kafka.NumPartitions(3),
43 kafka.ConsumerGroupName(instanceID),
44 kafka.ConsumerGroupPrefix(instanceID),
45 kafka.AutoCreateTopic(false),
46 kafka.ProducerFlushFrequency(5),
47 kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
48 }
49 return nil, errors.New("unsupported-client-type")
50}
51
52func monitorDiscovery(kc kafka.Client, ctx context.Context, client pb.ConfigurationClient, ch <-chan *ic.InterContainerMessage, doneCh chan<- struct{}) {
53 defer close(doneCh)
54 defer kc.Stop()
55
56monitorLoop:
57 for {
58 select {
59 case <-ctx.Done():
60 break monitorLoop
61 case msg := <-ch:
62 log.Debug("Received a device discovery notification")
63 device := &ic.DeviceDiscovered{}
64 if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
65 log.Errorf("Could not unmarshal received notification %v", msg)
66 } else {
67 // somewhat hackish solution, backend is known from the first digit found in the publisher name
68 group := regexp.MustCompile(`\d`).FindString(device.Publisher)
69 if group != "" {
70 // set the affinity of the discovered device
71 setAffinity(ctx, client, device.Id, afrouterRWClusterName+group)
72 } else {
73 log.Error("backend is unknown")
74 }
75 }
76 }
77 }
78}
79
80func StartDiscoveryMonitor(ctx context.Context, client pb.ConfigurationClient) (<-chan struct{}, error) {
81 doneCh := make(chan struct{})
82 // Connect to kafka for discovery events
83 kc, err := newKafkaClient(kafkaClientType, kafkaHost, kafkaPort, kafkaInstanceID)
84 if err != nil {
85 panic(err)
86 }
87
88 for {
89 if err := kc.Start(); err != nil {
90 log.Error("Could not connect to kafka")
91 } else {
92 break
93 }
94 select {
95 case <-ctx.Done():
96 close(doneCh)
97 return doneCh, errors.New("GRPC context done")
98
99 case <-time.After(5 * time.Second):
100 }
101 }
102 ch, err := kc.Subscribe(&kafka.Topic{Name: kafkaTopic})
103 if err != nil {
104 log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
105 close(doneCh)
106 kc.Stop()
107 return doneCh, err
108 }
109
110 go monitorDiscovery(kc, ctx, client, ch, doneCh)
111 return doneCh, nil
112}