blob: 79cdfbc5b14c1092549b41289400ad29722009f5 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package mocks
17
18import (
19 "fmt"
20 "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
21 "github.com/opencord/voltha-lib-go/v2/pkg/log"
22 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
23 "google.golang.org/grpc/codes"
24 "google.golang.org/grpc/status"
25 "sync"
26)
27
28type KafkaClient struct {
29 topicsChannelMap map[string][]chan *ic.InterContainerMessage
30 lock sync.RWMutex
31}
32
33func NewKafkaClient() *KafkaClient {
34 return &KafkaClient{
35 topicsChannelMap: make(map[string][]chan *ic.InterContainerMessage),
36 lock: sync.RWMutex{},
37 }
38}
39
40func (kc *KafkaClient) Start() error {
41 log.Debug("kafka-client-started")
42 return nil
43}
44
45func (kc *KafkaClient) Stop() {
46 kc.lock.Lock()
47 defer kc.lock.Unlock()
48 for topic, chnls := range kc.topicsChannelMap {
49 for _, c := range chnls {
50 close(c)
51 }
52 delete(kc.topicsChannelMap, topic)
53 }
54 log.Debug("kafka-client-stopped")
55}
56
57func (kc *KafkaClient) CreateTopic(topic *kafka.Topic, numPartition int, repFactor int) error {
58 log.Debugw("CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
59 kc.lock.Lock()
60 defer kc.lock.Unlock()
61 if _, ok := kc.topicsChannelMap[topic.Name]; ok {
62 return fmt.Errorf("Topic %s already exist", topic.Name)
63 }
64 ch := make(chan *ic.InterContainerMessage)
65 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
66 return nil
67}
68
69func (kc *KafkaClient) DeleteTopic(topic *kafka.Topic) error {
70 log.Debugw("DeleteTopic", log.Fields{"topic": topic.Name})
71 kc.lock.Lock()
72 defer kc.lock.Unlock()
73 delete(kc.topicsChannelMap, topic.Name)
74 return nil
75}
76
77func (kc *KafkaClient) Subscribe(topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
78 log.Debugw("Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
79 kc.lock.Lock()
80 defer kc.lock.Unlock()
81 ch := make(chan *ic.InterContainerMessage)
82 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
83 return ch, nil
84}
85
86func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
87 s[i] = s[len(s)-1]
88 return s[:len(s)-1]
89}
90
91func (kc *KafkaClient) UnSubscribe(topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
92 log.Debugw("UnSubscribe", log.Fields{"topic": topic.Name})
93 kc.lock.Lock()
94 defer kc.lock.Unlock()
95 if chnls, ok := kc.topicsChannelMap[topic.Name]; ok {
96 idx := -1
97 for i, c := range chnls {
98 if c == ch {
99 close(c)
100 idx = i
101 }
102 }
103 if idx >= 0 {
104 kc.topicsChannelMap[topic.Name] = removeChannel(kc.topicsChannelMap[topic.Name], idx)
105 }
106 }
107 return nil
108}
109
110func (kc *KafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error {
111 req, ok := msg.(*ic.InterContainerMessage)
112 if !ok {
113 return status.Error(codes.InvalidArgument, "msg-not-InterContainerMessage-type")
114 }
115 if req == nil {
116 return status.Error(codes.InvalidArgument, "msg-nil")
117 }
118 kc.lock.RLock()
119 defer kc.lock.RUnlock()
120 for _, ch := range kc.topicsChannelMap[topic.Name] {
121 log.Debugw("Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
122 ch <- req
123 }
124 return nil
125}
126
127func (kc *KafkaClient) SendLiveness() error {
128 return status.Error(codes.Unimplemented, "SendLiveness")
129}
130
131func (kc *KafkaClient) EnableLivenessChannel(enable bool) chan bool {
132 return nil
133}