blob: b1f8da44678386d19570c60246168fb5cc7fc4cb [file] [log] [blame]
/*
* Copyright 2019-2024 Open Networking Foundation (ONF) and the ONF Contributors

* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at

* http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */
khenaidoob6238b32020-04-07 12:07:36 -040016package kafka
khenaidoo59ce9dd2019-11-11 13:05:32 -050017
18import (
Neha Sharma94f16a92020-06-26 04:17:55 +000019 "context"
khenaidoo59ce9dd2019-11-11 13:05:32 -050020 "fmt"
serkant.uluderyab38671c2019-11-01 09:35:38 -070021 "sync"
Scott Baker84a55ce2020-04-17 10:11:30 -070022 "time"
serkant.uluderyab38671c2019-11-01 09:35:38 -070023
Himani Chawla6c38e572021-03-23 19:44:25 +053024 "github.com/golang/protobuf/proto"
khenaidoo26721882021-08-11 17:42:52 -040025 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
26 "github.com/opencord/voltha-lib-go/v7/pkg/log"
khenaidoo59ce9dd2019-11-11 13:05:32 -050027 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
khenaidoo59ce9dd2019-11-11 13:05:32 -050029)
30
// maxConcurrentMessage is the buffer capacity of each channel handed out by
// Subscribe.
const maxConcurrentMessage = 100
34
Kent Hagermanccfa2132019-12-17 13:29:34 -050035// static check to ensure KafkaClient implements kafka.Client
36var _ kafka.Client = &KafkaClient{}
37
khenaidoo59ce9dd2019-11-11 13:05:32 -050038type KafkaClient struct {
khenaidoo26721882021-08-11 17:42:52 -040039 topicsChannelMap map[string][]chan proto.Message
khenaidoo59ce9dd2019-11-11 13:05:32 -050040 lock sync.RWMutex
khenaidoo26721882021-08-11 17:42:52 -040041 alive bool
42 livenessMutex sync.Mutex
43 liveness chan bool
khenaidoo59ce9dd2019-11-11 13:05:32 -050044}
45
46func NewKafkaClient() *KafkaClient {
47 return &KafkaClient{
khenaidoo26721882021-08-11 17:42:52 -040048 topicsChannelMap: make(map[string][]chan proto.Message),
khenaidoo59ce9dd2019-11-11 13:05:32 -050049 lock: sync.RWMutex{},
50 }
51}
52
Neha Sharma94f16a92020-06-26 04:17:55 +000053func (kc *KafkaClient) Start(ctx context.Context) error {
54 logger.Debug(ctx, "kafka-client-started")
khenaidoo59ce9dd2019-11-11 13:05:32 -050055 return nil
56}
57
Neha Sharma94f16a92020-06-26 04:17:55 +000058func (kc *KafkaClient) Stop(ctx context.Context) {
khenaidoo59ce9dd2019-11-11 13:05:32 -050059 kc.lock.Lock()
60 defer kc.lock.Unlock()
61 for topic, chnls := range kc.topicsChannelMap {
62 for _, c := range chnls {
63 close(c)
64 }
65 delete(kc.topicsChannelMap, topic)
66 }
Neha Sharma94f16a92020-06-26 04:17:55 +000067 logger.Debug(ctx, "kafka-client-stopped")
khenaidoo59ce9dd2019-11-11 13:05:32 -050068}
69
Neha Sharma94f16a92020-06-26 04:17:55 +000070func (kc *KafkaClient) CreateTopic(ctx context.Context, topic *kafka.Topic, numPartition int, repFactor int) error {
71 logger.Debugw(ctx, "CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
khenaidoo59ce9dd2019-11-11 13:05:32 -050072 kc.lock.Lock()
73 defer kc.lock.Unlock()
74 if _, ok := kc.topicsChannelMap[topic.Name]; ok {
75 return fmt.Errorf("Topic %s already exist", topic.Name)
76 }
khenaidoo26721882021-08-11 17:42:52 -040077 ch := make(chan proto.Message)
khenaidoo59ce9dd2019-11-11 13:05:32 -050078 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
79 return nil
80}
81
Neha Sharma94f16a92020-06-26 04:17:55 +000082func (kc *KafkaClient) DeleteTopic(ctx context.Context, topic *kafka.Topic) error {
83 logger.Debugw(ctx, "DeleteTopic", log.Fields{"topic": topic.Name})
khenaidoo59ce9dd2019-11-11 13:05:32 -050084 kc.lock.Lock()
85 defer kc.lock.Unlock()
86 delete(kc.topicsChannelMap, topic.Name)
87 return nil
88}
89
khenaidoo26721882021-08-11 17:42:52 -040090func (kc *KafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan proto.Message, error) {
Neha Sharma94f16a92020-06-26 04:17:55 +000091 logger.Debugw(ctx, "Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
khenaidoo59ce9dd2019-11-11 13:05:32 -050092 kc.lock.Lock()
93 defer kc.lock.Unlock()
khenaidoo26721882021-08-11 17:42:52 -040094 ch := make(chan proto.Message, maxConcurrentMessage)
khenaidoo59ce9dd2019-11-11 13:05:32 -050095 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
96 return ch, nil
97}
98
khenaidoo26721882021-08-11 17:42:52 -040099func removeChannel(s []chan proto.Message, i int) []chan proto.Message {
khenaidoo59ce9dd2019-11-11 13:05:32 -0500100 s[i] = s[len(s)-1]
101 return s[:len(s)-1]
102}
103
khenaidoo26721882021-08-11 17:42:52 -0400104func (kc *KafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan proto.Message) error {
Neha Sharma94f16a92020-06-26 04:17:55 +0000105 logger.Debugw(ctx, "UnSubscribe", log.Fields{"topic": topic.Name})
khenaidoo59ce9dd2019-11-11 13:05:32 -0500106 kc.lock.Lock()
107 defer kc.lock.Unlock()
108 if chnls, ok := kc.topicsChannelMap[topic.Name]; ok {
109 idx := -1
110 for i, c := range chnls {
111 if c == ch {
112 close(c)
113 idx = i
114 }
115 }
116 if idx >= 0 {
117 kc.topicsChannelMap[topic.Name] = removeChannel(kc.topicsChannelMap[topic.Name], idx)
118 }
119 }
120 return nil
121}
122
Neha Sharma94f16a92020-06-26 04:17:55 +0000123func (kc *KafkaClient) SubscribeForMetadata(ctx context.Context, _ func(fromTopic string, timestamp time.Time)) {
124 logger.Debug(ctx, "SubscribeForMetadata - unimplemented")
Kent Hagermanccfa2132019-12-17 13:29:34 -0500125}
126
Neha Sharma94f16a92020-06-26 04:17:55 +0000127func (kc *KafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
Himani Chawla6c38e572021-03-23 19:44:25 +0530128 // Assert message is a proto message
khenaidoo26721882021-08-11 17:42:52 -0400129 protoMsg, ok := msg.(proto.Message)
130 if !ok {
Himani Chawla6c38e572021-03-23 19:44:25 +0530131 logger.Warnw(ctx, "message-not-a-proto-message", log.Fields{"msg": msg})
132 return status.Error(codes.InvalidArgument, "msg-not-a-proto-msg")
133 }
khenaidoo59ce9dd2019-11-11 13:05:32 -0500134 kc.lock.RLock()
135 defer kc.lock.RUnlock()
136 for _, ch := range kc.topicsChannelMap[topic.Name] {
khenaidoo26721882021-08-11 17:42:52 -0400137 select {
138 case ch <- protoMsg:
139 logger.Debugw(ctx, "publishing", log.Fields{"toTopic": topic.Name, "msg": protoMsg})
140 default:
141 logger.Debugw(ctx, "ignoring-event-channel-busy", log.Fields{"toTopic": topic.Name, "msg": protoMsg})
142 }
khenaidoo59ce9dd2019-11-11 13:05:32 -0500143 }
144 return nil
145}
146
Neha Sharma94f16a92020-06-26 04:17:55 +0000147func (kc *KafkaClient) SendLiveness(ctx context.Context) error {
khenaidoo26721882021-08-11 17:42:52 -0400148 kc.livenessMutex.Lock()
149 defer kc.livenessMutex.Unlock()
150 if kc.liveness != nil {
151 kc.liveness <- true // I am a mock
152 }
153 return nil
khenaidoo59ce9dd2019-11-11 13:05:32 -0500154}
155
Neha Sharma94f16a92020-06-26 04:17:55 +0000156func (kc *KafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
khenaidoo26721882021-08-11 17:42:52 -0400157 logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
158 if enable {
159 kc.livenessMutex.Lock()
160 defer kc.livenessMutex.Unlock()
161 if kc.liveness == nil {
162 logger.Info(ctx, "kafka-create-liveness-channel")
163 kc.liveness = make(chan bool, 10)
164 // post intial state to the channel
165 kc.liveness <- kc.alive
166 }
167 } else {
168 panic("Turning off liveness reporting is not supported")
169 }
170 return kc.liveness
khenaidoo59ce9dd2019-11-11 13:05:32 -0500171}
Scott Baker0fef6982019-12-12 09:49:42 -0800172
Neha Sharma94f16a92020-06-26 04:17:55 +0000173func (kc *KafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
174 logger.Debug(ctx, "EnableHealthinessChannel - unimplemented")
Scott Baker0fef6982019-12-12 09:49:42 -0800175 return nil
176}
kesavandd85e52b2022-03-15 16:38:08 +0530177
178func (kc *KafkaClient) ListTopics(ctx context.Context) ([]string, error) {
179 topics := []string{"voltha.events", "myTopic"}
180 return topics, nil
181}