blob: 5922ce22f4d9bf2838ccda627300ecfb4dd4467a [file] [log] [blame]
khenaidoo59ce9dd2019-11-11 13:05:32 -05001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidoob6238b32020-04-07 12:07:36 -040016package kafka
khenaidoo59ce9dd2019-11-11 13:05:32 -050017
18import (
19 "fmt"
serkant.uluderyab38671c2019-11-01 09:35:38 -070020 "sync"
21
22 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
23 "github.com/opencord/voltha-lib-go/v3/pkg/log"
24 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
khenaidoo59ce9dd2019-11-11 13:05:32 -050025 "google.golang.org/grpc/codes"
26 "google.golang.org/grpc/status"
khenaidoo59ce9dd2019-11-11 13:05:32 -050027)
28
Kent Hagermanccfa2132019-12-17 13:29:34 -050029// static check to ensure KafkaClient implements kafka.Client
30var _ kafka.Client = &KafkaClient{}
31
khenaidoo59ce9dd2019-11-11 13:05:32 -050032type KafkaClient struct {
33 topicsChannelMap map[string][]chan *ic.InterContainerMessage
34 lock sync.RWMutex
35}
36
37func NewKafkaClient() *KafkaClient {
38 return &KafkaClient{
39 topicsChannelMap: make(map[string][]chan *ic.InterContainerMessage),
40 lock: sync.RWMutex{},
41 }
42}
43
44func (kc *KafkaClient) Start() error {
khenaidoob332f9b2020-01-16 16:25:26 -050045 logger.Debug("kafka-client-started")
khenaidoo59ce9dd2019-11-11 13:05:32 -050046 return nil
47}
48
49func (kc *KafkaClient) Stop() {
50 kc.lock.Lock()
51 defer kc.lock.Unlock()
52 for topic, chnls := range kc.topicsChannelMap {
53 for _, c := range chnls {
54 close(c)
55 }
56 delete(kc.topicsChannelMap, topic)
57 }
khenaidoob332f9b2020-01-16 16:25:26 -050058 logger.Debug("kafka-client-stopped")
khenaidoo59ce9dd2019-11-11 13:05:32 -050059}
60
61func (kc *KafkaClient) CreateTopic(topic *kafka.Topic, numPartition int, repFactor int) error {
khenaidoob332f9b2020-01-16 16:25:26 -050062 logger.Debugw("CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
khenaidoo59ce9dd2019-11-11 13:05:32 -050063 kc.lock.Lock()
64 defer kc.lock.Unlock()
65 if _, ok := kc.topicsChannelMap[topic.Name]; ok {
66 return fmt.Errorf("Topic %s already exist", topic.Name)
67 }
68 ch := make(chan *ic.InterContainerMessage)
69 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
70 return nil
71}
72
73func (kc *KafkaClient) DeleteTopic(topic *kafka.Topic) error {
khenaidoob332f9b2020-01-16 16:25:26 -050074 logger.Debugw("DeleteTopic", log.Fields{"topic": topic.Name})
khenaidoo59ce9dd2019-11-11 13:05:32 -050075 kc.lock.Lock()
76 defer kc.lock.Unlock()
77 delete(kc.topicsChannelMap, topic.Name)
78 return nil
79}
80
81func (kc *KafkaClient) Subscribe(topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidoob332f9b2020-01-16 16:25:26 -050082 logger.Debugw("Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
khenaidoo59ce9dd2019-11-11 13:05:32 -050083 kc.lock.Lock()
84 defer kc.lock.Unlock()
85 ch := make(chan *ic.InterContainerMessage)
86 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
87 return ch, nil
88}
89
90func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
91 s[i] = s[len(s)-1]
92 return s[:len(s)-1]
93}
94
95func (kc *KafkaClient) UnSubscribe(topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoob332f9b2020-01-16 16:25:26 -050096 logger.Debugw("UnSubscribe", log.Fields{"topic": topic.Name})
khenaidoo59ce9dd2019-11-11 13:05:32 -050097 kc.lock.Lock()
98 defer kc.lock.Unlock()
99 if chnls, ok := kc.topicsChannelMap[topic.Name]; ok {
100 idx := -1
101 for i, c := range chnls {
102 if c == ch {
103 close(c)
104 idx = i
105 }
106 }
107 if idx >= 0 {
108 kc.topicsChannelMap[topic.Name] = removeChannel(kc.topicsChannelMap[topic.Name], idx)
109 }
110 }
111 return nil
112}
113
Kent Hagermanccfa2132019-12-17 13:29:34 -0500114func (kc *KafkaClient) SubscribeForMetadata(_ func(fromTopic string, timestamp int64)) {
Kent Hagerman359d93b2020-02-04 17:27:30 -0500115 logger.Debug("SubscribeForMetadata - unimplemented")
Kent Hagermanccfa2132019-12-17 13:29:34 -0500116}
117
khenaidoo59ce9dd2019-11-11 13:05:32 -0500118func (kc *KafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error {
119 req, ok := msg.(*ic.InterContainerMessage)
120 if !ok {
121 return status.Error(codes.InvalidArgument, "msg-not-InterContainerMessage-type")
122 }
123 if req == nil {
124 return status.Error(codes.InvalidArgument, "msg-nil")
125 }
126 kc.lock.RLock()
127 defer kc.lock.RUnlock()
128 for _, ch := range kc.topicsChannelMap[topic.Name] {
khenaidoob332f9b2020-01-16 16:25:26 -0500129 logger.Debugw("Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
khenaidoo59ce9dd2019-11-11 13:05:32 -0500130 ch <- req
131 }
132 return nil
133}
134
135func (kc *KafkaClient) SendLiveness() error {
136 return status.Error(codes.Unimplemented, "SendLiveness")
137}
138
139func (kc *KafkaClient) EnableLivenessChannel(enable bool) chan bool {
Kent Hagerman359d93b2020-02-04 17:27:30 -0500140 logger.Debug("EnableLivenessChannel - unimplemented")
khenaidoo59ce9dd2019-11-11 13:05:32 -0500141 return nil
142}
Scott Baker0fef6982019-12-12 09:49:42 -0800143
144func (kc *KafkaClient) EnableHealthinessChannel(enable bool) chan bool {
Kent Hagerman359d93b2020-02-04 17:27:30 -0500145 logger.Debug("EnableHealthinessChannel - unimplemented")
Scott Baker0fef6982019-12-12 09:49:42 -0800146 return nil
147}