blob: 7c5508bc0cf4d30a08cbb5dd829d8e863172a7ab [file] [log] [blame]
khenaidoo59ce9dd2019-11-11 13:05:32 -05001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidoob6238b32020-04-07 12:07:36 -040016package kafka
khenaidoo59ce9dd2019-11-11 13:05:32 -050017
18import (
19 "fmt"
serkant.uluderyab38671c2019-11-01 09:35:38 -070020 "sync"
Scott Baker84a55ce2020-04-17 10:11:30 -070021 "time"
serkant.uluderyab38671c2019-11-01 09:35:38 -070022
23 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
24 "github.com/opencord/voltha-lib-go/v3/pkg/log"
25 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
khenaidoo59ce9dd2019-11-11 13:05:32 -050026 "google.golang.org/grpc/codes"
27 "google.golang.org/grpc/status"
khenaidoo59ce9dd2019-11-11 13:05:32 -050028)
29
Kent Hagermanccfa2132019-12-17 13:29:34 -050030// static check to ensure KafkaClient implements kafka.Client
31var _ kafka.Client = &KafkaClient{}
32
khenaidoo59ce9dd2019-11-11 13:05:32 -050033type KafkaClient struct {
34 topicsChannelMap map[string][]chan *ic.InterContainerMessage
35 lock sync.RWMutex
36}
37
38func NewKafkaClient() *KafkaClient {
39 return &KafkaClient{
40 topicsChannelMap: make(map[string][]chan *ic.InterContainerMessage),
41 lock: sync.RWMutex{},
42 }
43}
44
Scott Bakere6685952020-06-23 04:05:39 +000045func (kc *KafkaClient) Start() error {
46 logger.Debug("kafka-client-started")
khenaidoo59ce9dd2019-11-11 13:05:32 -050047 return nil
48}
49
Scott Bakere6685952020-06-23 04:05:39 +000050func (kc *KafkaClient) Stop() {
khenaidoo59ce9dd2019-11-11 13:05:32 -050051 kc.lock.Lock()
52 defer kc.lock.Unlock()
53 for topic, chnls := range kc.topicsChannelMap {
54 for _, c := range chnls {
55 close(c)
56 }
57 delete(kc.topicsChannelMap, topic)
58 }
Scott Bakere6685952020-06-23 04:05:39 +000059 logger.Debug("kafka-client-stopped")
khenaidoo59ce9dd2019-11-11 13:05:32 -050060}
61
Scott Bakere6685952020-06-23 04:05:39 +000062func (kc *KafkaClient) CreateTopic(topic *kafka.Topic, numPartition int, repFactor int) error {
63 logger.Debugw("CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
khenaidoo59ce9dd2019-11-11 13:05:32 -050064 kc.lock.Lock()
65 defer kc.lock.Unlock()
66 if _, ok := kc.topicsChannelMap[topic.Name]; ok {
67 return fmt.Errorf("Topic %s already exist", topic.Name)
68 }
69 ch := make(chan *ic.InterContainerMessage)
70 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
71 return nil
72}
73
Scott Bakere6685952020-06-23 04:05:39 +000074func (kc *KafkaClient) DeleteTopic(topic *kafka.Topic) error {
75 logger.Debugw("DeleteTopic", log.Fields{"topic": topic.Name})
khenaidoo59ce9dd2019-11-11 13:05:32 -050076 kc.lock.Lock()
77 defer kc.lock.Unlock()
78 delete(kc.topicsChannelMap, topic.Name)
79 return nil
80}
81
Scott Bakere6685952020-06-23 04:05:39 +000082func (kc *KafkaClient) Subscribe(topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
83 logger.Debugw("Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
khenaidoo59ce9dd2019-11-11 13:05:32 -050084 kc.lock.Lock()
85 defer kc.lock.Unlock()
86 ch := make(chan *ic.InterContainerMessage)
87 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
88 return ch, nil
89}
90
91func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
92 s[i] = s[len(s)-1]
93 return s[:len(s)-1]
94}
95
Scott Bakere6685952020-06-23 04:05:39 +000096func (kc *KafkaClient) UnSubscribe(topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
97 logger.Debugw("UnSubscribe", log.Fields{"topic": topic.Name})
khenaidoo59ce9dd2019-11-11 13:05:32 -050098 kc.lock.Lock()
99 defer kc.lock.Unlock()
100 if chnls, ok := kc.topicsChannelMap[topic.Name]; ok {
101 idx := -1
102 for i, c := range chnls {
103 if c == ch {
104 close(c)
105 idx = i
106 }
107 }
108 if idx >= 0 {
109 kc.topicsChannelMap[topic.Name] = removeChannel(kc.topicsChannelMap[topic.Name], idx)
110 }
111 }
112 return nil
113}
114
Scott Bakere6685952020-06-23 04:05:39 +0000115func (kc *KafkaClient) SubscribeForMetadata(_ func(fromTopic string, timestamp time.Time)) {
116 logger.Debug("SubscribeForMetadata - unimplemented")
Kent Hagermanccfa2132019-12-17 13:29:34 -0500117}
118
Scott Bakere6685952020-06-23 04:05:39 +0000119func (kc *KafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error {
khenaidoo59ce9dd2019-11-11 13:05:32 -0500120 req, ok := msg.(*ic.InterContainerMessage)
121 if !ok {
122 return status.Error(codes.InvalidArgument, "msg-not-InterContainerMessage-type")
123 }
124 if req == nil {
125 return status.Error(codes.InvalidArgument, "msg-nil")
126 }
127 kc.lock.RLock()
128 defer kc.lock.RUnlock()
129 for _, ch := range kc.topicsChannelMap[topic.Name] {
Scott Bakere6685952020-06-23 04:05:39 +0000130 logger.Debugw("Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
khenaidoo59ce9dd2019-11-11 13:05:32 -0500131 ch <- req
132 }
133 return nil
134}
135
Scott Bakere6685952020-06-23 04:05:39 +0000136func (kc *KafkaClient) SendLiveness() error {
khenaidoo59ce9dd2019-11-11 13:05:32 -0500137 return status.Error(codes.Unimplemented, "SendLiveness")
138}
139
Scott Bakere6685952020-06-23 04:05:39 +0000140func (kc *KafkaClient) EnableLivenessChannel(enable bool) chan bool {
141 logger.Debug("EnableLivenessChannel - unimplemented")
khenaidoo59ce9dd2019-11-11 13:05:32 -0500142 return nil
143}
Scott Baker0fef6982019-12-12 09:49:42 -0800144
Scott Bakere6685952020-06-23 04:05:39 +0000145func (kc *KafkaClient) EnableHealthinessChannel(enable bool) chan bool {
146 logger.Debug("EnableHealthinessChannel - unimplemented")
Scott Baker0fef6982019-12-12 09:49:42 -0800147 return nil
148}