blob: 9d6f50c7990d1f9f716444e463e7093ae28c5b06 [file] [log] [blame]
khenaidoo59ce9dd2019-11-11 13:05:32 -05001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidoob6238b32020-04-07 12:07:36 -040016package kafka
khenaidoo59ce9dd2019-11-11 13:05:32 -050017
18import (
Neha Sharma94f16a92020-06-26 04:17:55 +000019 "context"
khenaidoo59ce9dd2019-11-11 13:05:32 -050020 "fmt"
serkant.uluderyab38671c2019-11-01 09:35:38 -070021 "sync"
Scott Baker84a55ce2020-04-17 10:11:30 -070022 "time"
serkant.uluderyab38671c2019-11-01 09:35:38 -070023
24 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
25 "github.com/opencord/voltha-lib-go/v3/pkg/log"
26 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
khenaidoo59ce9dd2019-11-11 13:05:32 -050027 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
khenaidoo59ce9dd2019-11-11 13:05:32 -050029)
30
Kent Hagermanccfa2132019-12-17 13:29:34 -050031// static check to ensure KafkaClient implements kafka.Client
32var _ kafka.Client = &KafkaClient{}
33
khenaidoo59ce9dd2019-11-11 13:05:32 -050034type KafkaClient struct {
35 topicsChannelMap map[string][]chan *ic.InterContainerMessage
36 lock sync.RWMutex
37}
38
39func NewKafkaClient() *KafkaClient {
40 return &KafkaClient{
41 topicsChannelMap: make(map[string][]chan *ic.InterContainerMessage),
42 lock: sync.RWMutex{},
43 }
44}
45
Neha Sharma94f16a92020-06-26 04:17:55 +000046func (kc *KafkaClient) Start(ctx context.Context) error {
47 logger.Debug(ctx, "kafka-client-started")
khenaidoo59ce9dd2019-11-11 13:05:32 -050048 return nil
49}
50
Neha Sharma94f16a92020-06-26 04:17:55 +000051func (kc *KafkaClient) Stop(ctx context.Context) {
khenaidoo59ce9dd2019-11-11 13:05:32 -050052 kc.lock.Lock()
53 defer kc.lock.Unlock()
54 for topic, chnls := range kc.topicsChannelMap {
55 for _, c := range chnls {
56 close(c)
57 }
58 delete(kc.topicsChannelMap, topic)
59 }
Neha Sharma94f16a92020-06-26 04:17:55 +000060 logger.Debug(ctx, "kafka-client-stopped")
khenaidoo59ce9dd2019-11-11 13:05:32 -050061}
62
Neha Sharma94f16a92020-06-26 04:17:55 +000063func (kc *KafkaClient) CreateTopic(ctx context.Context, topic *kafka.Topic, numPartition int, repFactor int) error {
64 logger.Debugw(ctx, "CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
khenaidoo59ce9dd2019-11-11 13:05:32 -050065 kc.lock.Lock()
66 defer kc.lock.Unlock()
67 if _, ok := kc.topicsChannelMap[topic.Name]; ok {
68 return fmt.Errorf("Topic %s already exist", topic.Name)
69 }
70 ch := make(chan *ic.InterContainerMessage)
71 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
72 return nil
73}
74
Neha Sharma94f16a92020-06-26 04:17:55 +000075func (kc *KafkaClient) DeleteTopic(ctx context.Context, topic *kafka.Topic) error {
76 logger.Debugw(ctx, "DeleteTopic", log.Fields{"topic": topic.Name})
khenaidoo59ce9dd2019-11-11 13:05:32 -050077 kc.lock.Lock()
78 defer kc.lock.Unlock()
79 delete(kc.topicsChannelMap, topic.Name)
80 return nil
81}
82
Neha Sharma94f16a92020-06-26 04:17:55 +000083func (kc *KafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
84 logger.Debugw(ctx, "Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
khenaidoo59ce9dd2019-11-11 13:05:32 -050085 kc.lock.Lock()
86 defer kc.lock.Unlock()
87 ch := make(chan *ic.InterContainerMessage)
88 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
89 return ch, nil
90}
91
92func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
93 s[i] = s[len(s)-1]
94 return s[:len(s)-1]
95}
96
Neha Sharma94f16a92020-06-26 04:17:55 +000097func (kc *KafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
98 logger.Debugw(ctx, "UnSubscribe", log.Fields{"topic": topic.Name})
khenaidoo59ce9dd2019-11-11 13:05:32 -050099 kc.lock.Lock()
100 defer kc.lock.Unlock()
101 if chnls, ok := kc.topicsChannelMap[topic.Name]; ok {
102 idx := -1
103 for i, c := range chnls {
104 if c == ch {
105 close(c)
106 idx = i
107 }
108 }
109 if idx >= 0 {
110 kc.topicsChannelMap[topic.Name] = removeChannel(kc.topicsChannelMap[topic.Name], idx)
111 }
112 }
113 return nil
114}
115
Neha Sharma94f16a92020-06-26 04:17:55 +0000116func (kc *KafkaClient) SubscribeForMetadata(ctx context.Context, _ func(fromTopic string, timestamp time.Time)) {
117 logger.Debug(ctx, "SubscribeForMetadata - unimplemented")
Kent Hagermanccfa2132019-12-17 13:29:34 -0500118}
119
Neha Sharma94f16a92020-06-26 04:17:55 +0000120func (kc *KafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
khenaidoo59ce9dd2019-11-11 13:05:32 -0500121 req, ok := msg.(*ic.InterContainerMessage)
122 if !ok {
123 return status.Error(codes.InvalidArgument, "msg-not-InterContainerMessage-type")
124 }
125 if req == nil {
126 return status.Error(codes.InvalidArgument, "msg-nil")
127 }
128 kc.lock.RLock()
129 defer kc.lock.RUnlock()
130 for _, ch := range kc.topicsChannelMap[topic.Name] {
Neha Sharma94f16a92020-06-26 04:17:55 +0000131 logger.Debugw(ctx, "Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
khenaidoo59ce9dd2019-11-11 13:05:32 -0500132 ch <- req
133 }
134 return nil
135}
136
Neha Sharma94f16a92020-06-26 04:17:55 +0000137func (kc *KafkaClient) SendLiveness(ctx context.Context) error {
khenaidoo59ce9dd2019-11-11 13:05:32 -0500138 return status.Error(codes.Unimplemented, "SendLiveness")
139}
140
Neha Sharma94f16a92020-06-26 04:17:55 +0000141func (kc *KafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
142 logger.Debug(ctx, "EnableLivenessChannel - unimplemented")
khenaidoo59ce9dd2019-11-11 13:05:32 -0500143 return nil
144}
Scott Baker0fef6982019-12-12 09:49:42 -0800145
Neha Sharma94f16a92020-06-26 04:17:55 +0000146func (kc *KafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
147 logger.Debug(ctx, "EnableHealthinessChannel - unimplemented")
Scott Baker0fef6982019-12-12 09:49:42 -0800148 return nil
149}