blob: 6d2bd1c7f1b4e7c4823e1b4c005d0adea193e13a [file] [log] [blame]
khenaidoo59ce9dd2019-11-11 13:05:32 -05001/*
Joey Armstrong9cdee9f2024-01-03 04:56:14 -05002* Copyright 2019-2024 Open Networking Foundation (ONF) and the ONF Contributors
khenaidoo59ce9dd2019-11-11 13:05:32 -05003
Joey Armstrong7f8436c2023-07-09 20:23:27 -04004* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
khenaidoo59ce9dd2019-11-11 13:05:32 -05007
Joey Armstrong7f8436c2023-07-09 20:23:27 -04008* http://www.apache.org/licenses/LICENSE-2.0
khenaidoo59ce9dd2019-11-11 13:05:32 -05009
Joey Armstrong7f8436c2023-07-09 20:23:27 -040010* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
khenaidoo59ce9dd2019-11-11 13:05:32 -050015 */
Akash Reddy Kankanala05aff182025-05-06 12:57:32 +053016//nolint:staticcheck
khenaidoob6238b32020-04-07 12:07:36 -040017package kafka
khenaidoo59ce9dd2019-11-11 13:05:32 -050018
19import (
Neha Sharma94f16a92020-06-26 04:17:55 +000020 "context"
khenaidoo59ce9dd2019-11-11 13:05:32 -050021 "fmt"
serkant.uluderyab38671c2019-11-01 09:35:38 -070022 "sync"
Scott Baker84a55ce2020-04-17 10:11:30 -070023 "time"
serkant.uluderyab38671c2019-11-01 09:35:38 -070024
Himani Chawla6c38e572021-03-23 19:44:25 +053025 "github.com/golang/protobuf/proto"
khenaidoo26721882021-08-11 17:42:52 -040026 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
27 "github.com/opencord/voltha-lib-go/v7/pkg/log"
khenaidoo59ce9dd2019-11-11 13:05:32 -050028 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
khenaidoo59ce9dd2019-11-11 13:05:32 -050030)
31
// maxConcurrentMessage is the buffer capacity given to each channel created
// by Subscribe.
const maxConcurrentMessage = 100
35
Kent Hagermanccfa2132019-12-17 13:29:34 -050036// static check to ensure KafkaClient implements kafka.Client
37var _ kafka.Client = &KafkaClient{}
38
khenaidoo59ce9dd2019-11-11 13:05:32 -050039type KafkaClient struct {
khenaidoo26721882021-08-11 17:42:52 -040040 topicsChannelMap map[string][]chan proto.Message
khenaidoo59ce9dd2019-11-11 13:05:32 -050041 lock sync.RWMutex
khenaidoo26721882021-08-11 17:42:52 -040042 alive bool
43 livenessMutex sync.Mutex
44 liveness chan bool
khenaidoo59ce9dd2019-11-11 13:05:32 -050045}
46
47func NewKafkaClient() *KafkaClient {
48 return &KafkaClient{
khenaidoo26721882021-08-11 17:42:52 -040049 topicsChannelMap: make(map[string][]chan proto.Message),
khenaidoo59ce9dd2019-11-11 13:05:32 -050050 lock: sync.RWMutex{},
51 }
52}
53
Neha Sharma94f16a92020-06-26 04:17:55 +000054func (kc *KafkaClient) Start(ctx context.Context) error {
55 logger.Debug(ctx, "kafka-client-started")
khenaidoo59ce9dd2019-11-11 13:05:32 -050056 return nil
57}
58
Neha Sharma94f16a92020-06-26 04:17:55 +000059func (kc *KafkaClient) Stop(ctx context.Context) {
khenaidoo59ce9dd2019-11-11 13:05:32 -050060 kc.lock.Lock()
61 defer kc.lock.Unlock()
62 for topic, chnls := range kc.topicsChannelMap {
63 for _, c := range chnls {
64 close(c)
65 }
66 delete(kc.topicsChannelMap, topic)
67 }
Neha Sharma94f16a92020-06-26 04:17:55 +000068 logger.Debug(ctx, "kafka-client-stopped")
khenaidoo59ce9dd2019-11-11 13:05:32 -050069}
70
Neha Sharma94f16a92020-06-26 04:17:55 +000071func (kc *KafkaClient) CreateTopic(ctx context.Context, topic *kafka.Topic, numPartition int, repFactor int) error {
72 logger.Debugw(ctx, "CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
khenaidoo59ce9dd2019-11-11 13:05:32 -050073 kc.lock.Lock()
74 defer kc.lock.Unlock()
75 if _, ok := kc.topicsChannelMap[topic.Name]; ok {
76 return fmt.Errorf("Topic %s already exist", topic.Name)
77 }
khenaidoo26721882021-08-11 17:42:52 -040078 ch := make(chan proto.Message)
khenaidoo59ce9dd2019-11-11 13:05:32 -050079 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
80 return nil
81}
82
Neha Sharma94f16a92020-06-26 04:17:55 +000083func (kc *KafkaClient) DeleteTopic(ctx context.Context, topic *kafka.Topic) error {
84 logger.Debugw(ctx, "DeleteTopic", log.Fields{"topic": topic.Name})
khenaidoo59ce9dd2019-11-11 13:05:32 -050085 kc.lock.Lock()
86 defer kc.lock.Unlock()
87 delete(kc.topicsChannelMap, topic.Name)
88 return nil
89}
90
khenaidoo26721882021-08-11 17:42:52 -040091func (kc *KafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan proto.Message, error) {
Neha Sharma94f16a92020-06-26 04:17:55 +000092 logger.Debugw(ctx, "Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
khenaidoo59ce9dd2019-11-11 13:05:32 -050093 kc.lock.Lock()
94 defer kc.lock.Unlock()
khenaidoo26721882021-08-11 17:42:52 -040095 ch := make(chan proto.Message, maxConcurrentMessage)
khenaidoo59ce9dd2019-11-11 13:05:32 -050096 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
97 return ch, nil
98}
99
khenaidoo26721882021-08-11 17:42:52 -0400100func removeChannel(s []chan proto.Message, i int) []chan proto.Message {
khenaidoo59ce9dd2019-11-11 13:05:32 -0500101 s[i] = s[len(s)-1]
102 return s[:len(s)-1]
103}
104
khenaidoo26721882021-08-11 17:42:52 -0400105func (kc *KafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan proto.Message) error {
Neha Sharma94f16a92020-06-26 04:17:55 +0000106 logger.Debugw(ctx, "UnSubscribe", log.Fields{"topic": topic.Name})
khenaidoo59ce9dd2019-11-11 13:05:32 -0500107 kc.lock.Lock()
108 defer kc.lock.Unlock()
109 if chnls, ok := kc.topicsChannelMap[topic.Name]; ok {
110 idx := -1
111 for i, c := range chnls {
112 if c == ch {
113 close(c)
114 idx = i
115 }
116 }
117 if idx >= 0 {
118 kc.topicsChannelMap[topic.Name] = removeChannel(kc.topicsChannelMap[topic.Name], idx)
119 }
120 }
121 return nil
122}
123
Neha Sharma94f16a92020-06-26 04:17:55 +0000124func (kc *KafkaClient) SubscribeForMetadata(ctx context.Context, _ func(fromTopic string, timestamp time.Time)) {
125 logger.Debug(ctx, "SubscribeForMetadata - unimplemented")
Kent Hagermanccfa2132019-12-17 13:29:34 -0500126}
127
Neha Sharma94f16a92020-06-26 04:17:55 +0000128func (kc *KafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
Himani Chawla6c38e572021-03-23 19:44:25 +0530129 // Assert message is a proto message
khenaidoo26721882021-08-11 17:42:52 -0400130 protoMsg, ok := msg.(proto.Message)
131 if !ok {
Himani Chawla6c38e572021-03-23 19:44:25 +0530132 logger.Warnw(ctx, "message-not-a-proto-message", log.Fields{"msg": msg})
133 return status.Error(codes.InvalidArgument, "msg-not-a-proto-msg")
134 }
khenaidoo59ce9dd2019-11-11 13:05:32 -0500135 kc.lock.RLock()
136 defer kc.lock.RUnlock()
137 for _, ch := range kc.topicsChannelMap[topic.Name] {
khenaidoo26721882021-08-11 17:42:52 -0400138 select {
139 case ch <- protoMsg:
140 logger.Debugw(ctx, "publishing", log.Fields{"toTopic": topic.Name, "msg": protoMsg})
141 default:
142 logger.Debugw(ctx, "ignoring-event-channel-busy", log.Fields{"toTopic": topic.Name, "msg": protoMsg})
143 }
khenaidoo59ce9dd2019-11-11 13:05:32 -0500144 }
145 return nil
146}
147
Neha Sharma94f16a92020-06-26 04:17:55 +0000148func (kc *KafkaClient) SendLiveness(ctx context.Context) error {
khenaidoo26721882021-08-11 17:42:52 -0400149 kc.livenessMutex.Lock()
150 defer kc.livenessMutex.Unlock()
151 if kc.liveness != nil {
152 kc.liveness <- true // I am a mock
153 }
154 return nil
khenaidoo59ce9dd2019-11-11 13:05:32 -0500155}
156
Neha Sharma94f16a92020-06-26 04:17:55 +0000157func (kc *KafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
khenaidoo26721882021-08-11 17:42:52 -0400158 logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
159 if enable {
160 kc.livenessMutex.Lock()
161 defer kc.livenessMutex.Unlock()
162 if kc.liveness == nil {
163 logger.Info(ctx, "kafka-create-liveness-channel")
164 kc.liveness = make(chan bool, 10)
165 // post intial state to the channel
166 kc.liveness <- kc.alive
167 }
168 } else {
169 panic("Turning off liveness reporting is not supported")
170 }
171 return kc.liveness
khenaidoo59ce9dd2019-11-11 13:05:32 -0500172}
Scott Baker0fef6982019-12-12 09:49:42 -0800173
Neha Sharma94f16a92020-06-26 04:17:55 +0000174func (kc *KafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
175 logger.Debug(ctx, "EnableHealthinessChannel - unimplemented")
Scott Baker0fef6982019-12-12 09:49:42 -0800176 return nil
177}
kesavandd85e52b2022-03-15 16:38:08 +0530178
179func (kc *KafkaClient) ListTopics(ctx context.Context) ([]string, error) {
180 topics := []string{"voltha.events", "myTopic"}
181 return topics, nil
182}