blob: 51bd53216709f286fea12b2c7814664f140024c8 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package mocks
17
18import (
19 "fmt"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080020 "sync"
21
22 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
23 "github.com/opencord/voltha-lib-go/v3/pkg/log"
24 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
khenaidooab1f7bd2019-11-14 14:00:27 -050025 "google.golang.org/grpc/codes"
26 "google.golang.org/grpc/status"
khenaidooab1f7bd2019-11-14 14:00:27 -050027)
28
29type KafkaClient struct {
30 topicsChannelMap map[string][]chan *ic.InterContainerMessage
31 lock sync.RWMutex
32}
33
34func NewKafkaClient() *KafkaClient {
35 return &KafkaClient{
36 topicsChannelMap: make(map[string][]chan *ic.InterContainerMessage),
37 lock: sync.RWMutex{},
38 }
39}
40
41func (kc *KafkaClient) Start() error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080042 logger.Debug("kafka-client-started")
khenaidooab1f7bd2019-11-14 14:00:27 -050043 return nil
44}
45
46func (kc *KafkaClient) Stop() {
47 kc.lock.Lock()
48 defer kc.lock.Unlock()
49 for topic, chnls := range kc.topicsChannelMap {
50 for _, c := range chnls {
51 close(c)
52 }
53 delete(kc.topicsChannelMap, topic)
54 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -080055 logger.Debug("kafka-client-stopped")
khenaidooab1f7bd2019-11-14 14:00:27 -050056}
57
58func (kc *KafkaClient) CreateTopic(topic *kafka.Topic, numPartition int, repFactor int) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080059 logger.Debugw("CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
khenaidooab1f7bd2019-11-14 14:00:27 -050060 kc.lock.Lock()
61 defer kc.lock.Unlock()
62 if _, ok := kc.topicsChannelMap[topic.Name]; ok {
63 return fmt.Errorf("Topic %s already exist", topic.Name)
64 }
65 ch := make(chan *ic.InterContainerMessage)
66 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
67 return nil
68}
69
70func (kc *KafkaClient) DeleteTopic(topic *kafka.Topic) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080071 logger.Debugw("DeleteTopic", log.Fields{"topic": topic.Name})
khenaidooab1f7bd2019-11-14 14:00:27 -050072 kc.lock.Lock()
73 defer kc.lock.Unlock()
74 delete(kc.topicsChannelMap, topic.Name)
75 return nil
76}
77
78func (kc *KafkaClient) Subscribe(topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080079 logger.Debugw("Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
khenaidooab1f7bd2019-11-14 14:00:27 -050080 kc.lock.Lock()
81 defer kc.lock.Unlock()
82 ch := make(chan *ic.InterContainerMessage)
83 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
84 return ch, nil
85}
86
87func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
88 s[i] = s[len(s)-1]
89 return s[:len(s)-1]
90}
91
92func (kc *KafkaClient) UnSubscribe(topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080093 logger.Debugw("UnSubscribe", log.Fields{"topic": topic.Name})
khenaidooab1f7bd2019-11-14 14:00:27 -050094 kc.lock.Lock()
95 defer kc.lock.Unlock()
96 if chnls, ok := kc.topicsChannelMap[topic.Name]; ok {
97 idx := -1
98 for i, c := range chnls {
99 if c == ch {
100 close(c)
101 idx = i
102 }
103 }
104 if idx >= 0 {
105 kc.topicsChannelMap[topic.Name] = removeChannel(kc.topicsChannelMap[topic.Name], idx)
106 }
107 }
108 return nil
109}
110
111func (kc *KafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error {
112 req, ok := msg.(*ic.InterContainerMessage)
113 if !ok {
114 return status.Error(codes.InvalidArgument, "msg-not-InterContainerMessage-type")
115 }
116 if req == nil {
117 return status.Error(codes.InvalidArgument, "msg-nil")
118 }
119 kc.lock.RLock()
120 defer kc.lock.RUnlock()
121 for _, ch := range kc.topicsChannelMap[topic.Name] {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800122 logger.Debugw("Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
khenaidooab1f7bd2019-11-14 14:00:27 -0500123 ch <- req
124 }
125 return nil
126}
127
128func (kc *KafkaClient) SendLiveness() error {
129 return status.Error(codes.Unimplemented, "SendLiveness")
130}
131
132func (kc *KafkaClient) EnableLivenessChannel(enable bool) chan bool {
133 return nil
134}
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800135
136func (kc *KafkaClient) EnableHealthinessChannel(enable bool) chan bool {
137 return nil
138}