blob: f4707f65521be5d90d45939bb5cd600cb64da9c8 [file] [log] [blame]
khenaidoo59ce9dd2019-11-11 13:05:32 -05001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package mocks
18
19import (
20 "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
21 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
22 "github.com/stretchr/testify/assert"
23 "testing"
24 "time"
25)
26
27func TestKafkaClientImplementsKafkaClientIf(t *testing.T) {
28 client := NewKafkaClient()
29
30 if _, ok := interface{}(client).(kafka.Client); !ok {
31 t.Error("mock kafka client does not implement voltha-lib-go/v2/pkg/kafka/Client interface")
32 }
33}
34
35func TestKafkaClientCreateTopic(t *testing.T) {
36 cTkc := NewKafkaClient()
37 topic := kafka.Topic{Name: "myTopic"}
38 err := cTkc.CreateTopic(&topic, 1, 1)
39 assert.Nil(t, err)
40 err = cTkc.CreateTopic(&topic, 1, 1)
41 assert.NotNil(t, err)
42}
43
44func TestKafkaClientDeleteTopic(t *testing.T) {
45 cTkc := NewKafkaClient()
46 topic := kafka.Topic{Name: "myTopic"}
47 err := cTkc.DeleteTopic(&topic)
48 assert.Nil(t, err)
49}
50
51func TestKafkaClientSubscribeSend(t *testing.T) {
52 cTkc := NewKafkaClient()
53 topic := kafka.Topic{Name: "myTopic"}
54 ch, err := cTkc.Subscribe(&topic)
55 assert.Nil(t, err)
56 assert.NotNil(t, ch)
57 testCh := make(chan bool)
58 maxWait := 5 * time.Millisecond
59 msg := &ic.InterContainerMessage{
60 Header: &ic.Header{Id: "1234", ToTopic: topic.Name},
61 Body: nil,
62 }
63 timer := time.NewTimer(maxWait)
64 defer timer.Stop()
65 go func() {
66 select {
67 case val, ok := <-ch:
68 assert.True(t, ok)
69 assert.Equal(t, val, msg)
70 testCh <- true
71 case <-timer.C:
72 testCh <- false
73 }
74 }()
75 err = cTkc.Send(msg, &topic)
76 assert.Nil(t, err)
77 res := <-testCh
78 assert.True(t, res)
79}
80
81func TestKafkaClientUnSubscribe(t *testing.T) {
82 cTkc := NewKafkaClient()
83 topic := kafka.Topic{Name: "myTopic"}
84 ch, err := cTkc.Subscribe(&topic)
85 assert.Nil(t, err)
86 assert.NotNil(t, ch)
87 err = cTkc.UnSubscribe(&topic, ch)
88 assert.Nil(t, err)
89}
90
91func TestKafkaClientStop(t *testing.T) {
92 cTkc := NewKafkaClient()
93 topic := kafka.Topic{Name: "myTopic"}
94 ch, err := cTkc.Subscribe(&topic)
95 assert.Nil(t, err)
96 assert.NotNil(t, ch)
97 err = cTkc.UnSubscribe(&topic, ch)
98 assert.Nil(t, err)
99 cTkc.Stop()
100}