blob: 62af5db66927f6258c411184e080b80e45754673 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package mocks
17
18import (
19 "fmt"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080020 "sync"
21
22 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
23 "github.com/opencord/voltha-lib-go/v3/pkg/log"
24 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
khenaidooab1f7bd2019-11-14 14:00:27 -050025 "google.golang.org/grpc/codes"
26 "google.golang.org/grpc/status"
khenaidooab1f7bd2019-11-14 14:00:27 -050027)
28
npujar467fe752020-01-16 20:17:45 +053029// static check to ensure KafkaClient implements kafka.Client
30var _ kafka.Client = &KafkaClient{}
31
khenaidooab1f7bd2019-11-14 14:00:27 -050032type KafkaClient struct {
33 topicsChannelMap map[string][]chan *ic.InterContainerMessage
34 lock sync.RWMutex
35}
36
37func NewKafkaClient() *KafkaClient {
38 return &KafkaClient{
39 topicsChannelMap: make(map[string][]chan *ic.InterContainerMessage),
40 lock: sync.RWMutex{},
41 }
42}
43
44func (kc *KafkaClient) Start() error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080045 logger.Debug("kafka-client-started")
khenaidooab1f7bd2019-11-14 14:00:27 -050046 return nil
47}
48
49func (kc *KafkaClient) Stop() {
50 kc.lock.Lock()
51 defer kc.lock.Unlock()
52 for topic, chnls := range kc.topicsChannelMap {
53 for _, c := range chnls {
54 close(c)
55 }
56 delete(kc.topicsChannelMap, topic)
57 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -080058 logger.Debug("kafka-client-stopped")
khenaidooab1f7bd2019-11-14 14:00:27 -050059}
60
61func (kc *KafkaClient) CreateTopic(topic *kafka.Topic, numPartition int, repFactor int) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080062 logger.Debugw("CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
khenaidooab1f7bd2019-11-14 14:00:27 -050063 kc.lock.Lock()
64 defer kc.lock.Unlock()
65 if _, ok := kc.topicsChannelMap[topic.Name]; ok {
66 return fmt.Errorf("Topic %s already exist", topic.Name)
67 }
68 ch := make(chan *ic.InterContainerMessage)
69 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
70 return nil
71}
72
73func (kc *KafkaClient) DeleteTopic(topic *kafka.Topic) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080074 logger.Debugw("DeleteTopic", log.Fields{"topic": topic.Name})
khenaidooab1f7bd2019-11-14 14:00:27 -050075 kc.lock.Lock()
76 defer kc.lock.Unlock()
77 delete(kc.topicsChannelMap, topic.Name)
78 return nil
79}
80
81func (kc *KafkaClient) Subscribe(topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080082 logger.Debugw("Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
khenaidooab1f7bd2019-11-14 14:00:27 -050083 kc.lock.Lock()
84 defer kc.lock.Unlock()
85 ch := make(chan *ic.InterContainerMessage)
86 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
87 return ch, nil
88}
89
90func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
91 s[i] = s[len(s)-1]
92 return s[:len(s)-1]
93}
94
95func (kc *KafkaClient) UnSubscribe(topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080096 logger.Debugw("UnSubscribe", log.Fields{"topic": topic.Name})
khenaidooab1f7bd2019-11-14 14:00:27 -050097 kc.lock.Lock()
98 defer kc.lock.Unlock()
99 if chnls, ok := kc.topicsChannelMap[topic.Name]; ok {
100 idx := -1
101 for i, c := range chnls {
102 if c == ch {
103 close(c)
104 idx = i
105 }
106 }
107 if idx >= 0 {
108 kc.topicsChannelMap[topic.Name] = removeChannel(kc.topicsChannelMap[topic.Name], idx)
109 }
110 }
111 return nil
112}
113
npujar467fe752020-01-16 20:17:45 +0530114func (kc *KafkaClient) SubscribeForMetadata(_ func(fromTopic string, timestamp int64)) {
115 panic("unimplemented")
116}
117
khenaidooab1f7bd2019-11-14 14:00:27 -0500118func (kc *KafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error {
119 req, ok := msg.(*ic.InterContainerMessage)
120 if !ok {
121 return status.Error(codes.InvalidArgument, "msg-not-InterContainerMessage-type")
122 }
123 if req == nil {
124 return status.Error(codes.InvalidArgument, "msg-nil")
125 }
126 kc.lock.RLock()
127 defer kc.lock.RUnlock()
128 for _, ch := range kc.topicsChannelMap[topic.Name] {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800129 logger.Debugw("Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
khenaidooab1f7bd2019-11-14 14:00:27 -0500130 ch <- req
131 }
132 return nil
133}
134
135func (kc *KafkaClient) SendLiveness() error {
136 return status.Error(codes.Unimplemented, "SendLiveness")
137}
138
139func (kc *KafkaClient) EnableLivenessChannel(enable bool) chan bool {
140 return nil
141}
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800142
143func (kc *KafkaClient) EnableHealthinessChannel(enable bool) chan bool {
144 return nil
145}