blob: 935089556e80aa26fcb08f8fe0cb13e40400e6c4 [file] [log] [blame]
Elia Battiston4750d3c2022-07-14 13:24:56 +00001/*
2* Copyright 2022-present Open Networking Foundation
3
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7
8* http://www.apache.org/licenses/LICENSE-2.0
9
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15 */
16
17package clients
18
19import (
20 "context"
21 "fmt"
22 "time"
23
24 "github.com/Shopify/sarama"
25 "github.com/golang/protobuf/proto"
26 "github.com/opencord/voltha-lib-go/v7/pkg/log"
27 "github.com/opencord/voltha-protos/v5/go/voltha"
28)
29
30const (
31 volthaEventsTopic = "voltha.events"
32 kafkaBackoffInterval = time.Second * 10
33)
34
35//Used to listen for events coming from VOLTHA
36type KafkaConsumer struct {
37 address string
38 config *sarama.Config
39 client sarama.Client
40 consumer sarama.Consumer
41 partitionConsumer sarama.PartitionConsumer
42 highwater int64
43}
44
45//Creates a sarama client with the specified address
46func NewKafkaConsumer(clusterAddress string) *KafkaConsumer {
47 c := KafkaConsumer{address: clusterAddress}
48 c.config = sarama.NewConfig()
49 c.config.ClientID = "bbf-adapter-consumer"
50 c.config.Consumer.Return.Errors = true
51 c.config.Consumer.Offsets.Initial = sarama.OffsetNewest
52 c.config.Version = sarama.V1_0_0_0
53
54 return &c
55}
56
57//Starts consuming new messages on the voltha events topic, executing the provided callback on each event
58func (c *KafkaConsumer) Start(ctx context.Context, eventCallback func(context.Context, *voltha.Event)) error {
59 var err error
60
61 for {
62 if c.client, err = sarama.NewClient([]string{c.address}, c.config); err == nil {
63 logger.Debug(ctx, "kafka-client-created")
64 break
65 } else {
66 logger.Warnw(ctx, "kafka-not-reachable", log.Fields{
67 "err": err,
68 })
69 }
70
71 //Wait a bit before trying again
72 select {
73 case <-ctx.Done():
74 return fmt.Errorf("kafka-client-creation-stopped-due-to-context-done")
75 case <-time.After(kafkaBackoffInterval):
76 continue
77 }
78 }
79
80 c.consumer, err = sarama.NewConsumerFromClient(c.client)
81 if err != nil {
82 return err
83 }
84
85 partitions, _ := c.consumer.Partitions(volthaEventsTopic)
86
87 // TODO: Add support for multiple partitions
88 if len(partitions) > 1 {
89 logger.Warnw(ctx, "only-listening-one-partition", log.Fields{
90 "topic": volthaEventsTopic,
91 "partitionsNum": len(partitions),
92 })
93 }
94
95 hw, err := c.client.GetOffset(volthaEventsTopic, partitions[0], sarama.OffsetNewest)
96 if err != nil {
97 return fmt.Errorf("cannot-get-highwater: %v", err)
98 }
99 c.highwater = hw
100
101 c.partitionConsumer, err = c.consumer.ConsumePartition(volthaEventsTopic, partitions[0], sarama.OffsetOldest)
102 if nil != err {
103 return fmt.Errorf("Error in consume(): Topic %v Partitions: %v", volthaEventsTopic, partitions)
104 }
105
106 //Start consuming the event topic in a goroutine
107 logger.Debugw(ctx, "start-consuming-kafka-topic", log.Fields{"topic": volthaEventsTopic})
108 go func(topic string, pConsumer sarama.PartitionConsumer) {
109 for {
110 select {
111 case <-ctx.Done():
112 logger.Info(ctx, "stopped-listening-for-events-due-to-context-done")
113 return
114 case err := <-pConsumer.Errors():
115 logger.Errorw(ctx, "kafka-consumer-error", log.Fields{
116 "err": err.Error(),
117 "topic": err.Topic,
118 "partition": err.Partition,
119 })
120 case msg := <-pConsumer.Messages():
121 if msg.Offset <= c.highwater {
122 continue
123 }
124
125 //Unmarshal the content of the message to a voltha Event protobuf message
126 event := &voltha.Event{}
127 if err := proto.Unmarshal(msg.Value, event); err != nil {
128 logger.Errorw(ctx, "error-unmarshalling-kafka-event", log.Fields{"err": err})
129 continue
130 }
131
132 eventCallback(ctx, event)
133 }
134 }
135 }(volthaEventsTopic, c.partitionConsumer)
136
137 return nil
138}
139
140//Closes the sarama client and all consumers
141func (c *KafkaConsumer) Stop() error {
142 if err := c.partitionConsumer.Close(); err != nil {
143 return err
144 }
145
146 if err := c.consumer.Close(); err != nil {
147 return err
148 }
149
150 if err := c.client.Close(); err != nil {
151 return err
152 }
153
154 return nil
155}