blob: 268c57166291ee349f70c6704a3454a76290f2d5 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Matteo Scandolod525ae32020-04-02 17:27:29 -070016package kafka
khenaidooab1f7bd2019-11-14 14:00:27 -050017
18import (
Rohan Agrawal31f21802020-06-12 05:38:46 +000019 "context"
khenaidooab1f7bd2019-11-14 14:00:27 -050020 "fmt"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080021 "sync"
Scott Baker504b4802020-04-17 10:12:20 -070022 "time"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080023
Maninderdfadc982020-10-28 14:04:33 +053024 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
25 "github.com/opencord/voltha-lib-go/v4/pkg/log"
26 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
khenaidooab1f7bd2019-11-14 14:00:27 -050027 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
khenaidooab1f7bd2019-11-14 14:00:27 -050029)
30
npujar467fe752020-01-16 20:17:45 +053031// static check to ensure KafkaClient implements kafka.Client
32var _ kafka.Client = &KafkaClient{}
33
khenaidooab1f7bd2019-11-14 14:00:27 -050034type KafkaClient struct {
35 topicsChannelMap map[string][]chan *ic.InterContainerMessage
36 lock sync.RWMutex
37}
38
39func NewKafkaClient() *KafkaClient {
40 return &KafkaClient{
41 topicsChannelMap: make(map[string][]chan *ic.InterContainerMessage),
42 lock: sync.RWMutex{},
43 }
44}
45
Rohan Agrawal31f21802020-06-12 05:38:46 +000046func (kc *KafkaClient) Start(ctx context.Context) error {
47 logger.Debug(ctx, "kafka-client-started")
khenaidooab1f7bd2019-11-14 14:00:27 -050048 return nil
49}
50
Rohan Agrawal31f21802020-06-12 05:38:46 +000051func (kc *KafkaClient) Stop(ctx context.Context) {
khenaidooab1f7bd2019-11-14 14:00:27 -050052 kc.lock.Lock()
53 defer kc.lock.Unlock()
54 for topic, chnls := range kc.topicsChannelMap {
55 for _, c := range chnls {
56 close(c)
57 }
58 delete(kc.topicsChannelMap, topic)
59 }
Rohan Agrawal31f21802020-06-12 05:38:46 +000060 logger.Debug(ctx, "kafka-client-stopped")
khenaidooab1f7bd2019-11-14 14:00:27 -050061}
62
Rohan Agrawal31f21802020-06-12 05:38:46 +000063func (kc *KafkaClient) CreateTopic(ctx context.Context, topic *kafka.Topic, numPartition int, repFactor int) error {
64 logger.Debugw(ctx, "CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
khenaidooab1f7bd2019-11-14 14:00:27 -050065 kc.lock.Lock()
66 defer kc.lock.Unlock()
67 if _, ok := kc.topicsChannelMap[topic.Name]; ok {
68 return fmt.Errorf("Topic %s already exist", topic.Name)
69 }
70 ch := make(chan *ic.InterContainerMessage)
71 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
72 return nil
73}
74
Rohan Agrawal31f21802020-06-12 05:38:46 +000075func (kc *KafkaClient) DeleteTopic(ctx context.Context, topic *kafka.Topic) error {
76 logger.Debugw(ctx, "DeleteTopic", log.Fields{"topic": topic.Name})
khenaidooab1f7bd2019-11-14 14:00:27 -050077 kc.lock.Lock()
78 defer kc.lock.Unlock()
79 delete(kc.topicsChannelMap, topic.Name)
80 return nil
81}
82
Rohan Agrawal31f21802020-06-12 05:38:46 +000083func (kc *KafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
84 logger.Debugw(ctx, "Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
khenaidooab1f7bd2019-11-14 14:00:27 -050085 kc.lock.Lock()
86 defer kc.lock.Unlock()
87 ch := make(chan *ic.InterContainerMessage)
88 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
89 return ch, nil
90}
91
92func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
93 s[i] = s[len(s)-1]
94 return s[:len(s)-1]
95}
96
Rohan Agrawal31f21802020-06-12 05:38:46 +000097func (kc *KafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
98 logger.Debugw(ctx, "UnSubscribe", log.Fields{"topic": topic.Name})
khenaidooab1f7bd2019-11-14 14:00:27 -050099 kc.lock.Lock()
100 defer kc.lock.Unlock()
101 if chnls, ok := kc.topicsChannelMap[topic.Name]; ok {
102 idx := -1
103 for i, c := range chnls {
104 if c == ch {
105 close(c)
106 idx = i
107 }
108 }
109 if idx >= 0 {
110 kc.topicsChannelMap[topic.Name] = removeChannel(kc.topicsChannelMap[topic.Name], idx)
111 }
112 }
113 return nil
114}
115
Rohan Agrawal31f21802020-06-12 05:38:46 +0000116func (kc *KafkaClient) SubscribeForMetadata(ctx context.Context, _ func(fromTopic string, timestamp time.Time)) {
117 logger.Debug(ctx, "SubscribeForMetadata - unimplemented")
npujar467fe752020-01-16 20:17:45 +0530118}
119
Rohan Agrawal31f21802020-06-12 05:38:46 +0000120func (kc *KafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
khenaidooab1f7bd2019-11-14 14:00:27 -0500121 req, ok := msg.(*ic.InterContainerMessage)
122 if !ok {
123 return status.Error(codes.InvalidArgument, "msg-not-InterContainerMessage-type")
124 }
125 if req == nil {
126 return status.Error(codes.InvalidArgument, "msg-nil")
127 }
128 kc.lock.RLock()
129 defer kc.lock.RUnlock()
130 for _, ch := range kc.topicsChannelMap[topic.Name] {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000131 logger.Debugw(ctx, "Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
khenaidooab1f7bd2019-11-14 14:00:27 -0500132 ch <- req
133 }
134 return nil
135}
136
Rohan Agrawal31f21802020-06-12 05:38:46 +0000137func (kc *KafkaClient) SendLiveness(ctx context.Context) error {
khenaidooab1f7bd2019-11-14 14:00:27 -0500138 return status.Error(codes.Unimplemented, "SendLiveness")
139}
140
Rohan Agrawal31f21802020-06-12 05:38:46 +0000141func (kc *KafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
142 logger.Debug(ctx, "EnableLivenessChannel - unimplemented")
khenaidooab1f7bd2019-11-14 14:00:27 -0500143 return nil
144}
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800145
Rohan Agrawal31f21802020-06-12 05:38:46 +0000146func (kc *KafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
147 logger.Debug(ctx, "EnableHealthinessChannel - unimplemented")
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800148 return nil
149}