blob: e4517cc6f3828f06a42d0aa89f92346d5f29b112 [file] [log] [blame]
khenaidoo59ce9dd2019-11-11 13:05:32 -05001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package mocks
18
19import (
khenaidoo59ce9dd2019-11-11 13:05:32 -050020 "testing"
21 "time"
serkant.uluderyab38671c2019-11-01 09:35:38 -070022
23 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
24 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
25 "github.com/stretchr/testify/assert"
khenaidoo59ce9dd2019-11-11 13:05:32 -050026)
27
28func TestKafkaClientImplementsKafkaClientIf(t *testing.T) {
29 client := NewKafkaClient()
30
31 if _, ok := interface{}(client).(kafka.Client); !ok {
serkant.uluderyab38671c2019-11-01 09:35:38 -070032 t.Error("mock kafka client does not implement voltha-lib-go/v3/pkg/kafka/Client interface")
khenaidoo59ce9dd2019-11-11 13:05:32 -050033 }
34}
35
36func TestKafkaClientCreateTopic(t *testing.T) {
37 cTkc := NewKafkaClient()
38 topic := kafka.Topic{Name: "myTopic"}
39 err := cTkc.CreateTopic(&topic, 1, 1)
40 assert.Nil(t, err)
41 err = cTkc.CreateTopic(&topic, 1, 1)
42 assert.NotNil(t, err)
43}
44
45func TestKafkaClientDeleteTopic(t *testing.T) {
46 cTkc := NewKafkaClient()
47 topic := kafka.Topic{Name: "myTopic"}
48 err := cTkc.DeleteTopic(&topic)
49 assert.Nil(t, err)
50}
51
52func TestKafkaClientSubscribeSend(t *testing.T) {
53 cTkc := NewKafkaClient()
54 topic := kafka.Topic{Name: "myTopic"}
55 ch, err := cTkc.Subscribe(&topic)
56 assert.Nil(t, err)
57 assert.NotNil(t, ch)
58 testCh := make(chan bool)
59 maxWait := 5 * time.Millisecond
60 msg := &ic.InterContainerMessage{
61 Header: &ic.Header{Id: "1234", ToTopic: topic.Name},
62 Body: nil,
63 }
64 timer := time.NewTimer(maxWait)
65 defer timer.Stop()
66 go func() {
67 select {
68 case val, ok := <-ch:
69 assert.True(t, ok)
70 assert.Equal(t, val, msg)
71 testCh <- true
72 case <-timer.C:
73 testCh <- false
74 }
75 }()
76 err = cTkc.Send(msg, &topic)
77 assert.Nil(t, err)
78 res := <-testCh
79 assert.True(t, res)
80}
81
82func TestKafkaClientUnSubscribe(t *testing.T) {
83 cTkc := NewKafkaClient()
84 topic := kafka.Topic{Name: "myTopic"}
85 ch, err := cTkc.Subscribe(&topic)
86 assert.Nil(t, err)
87 assert.NotNil(t, ch)
88 err = cTkc.UnSubscribe(&topic, ch)
89 assert.Nil(t, err)
90}
91
92func TestKafkaClientStop(t *testing.T) {
93 cTkc := NewKafkaClient()
94 topic := kafka.Topic{Name: "myTopic"}
95 ch, err := cTkc.Subscribe(&topic)
96 assert.Nil(t, err)
97 assert.NotNil(t, ch)
98 err = cTkc.UnSubscribe(&topic, ch)
99 assert.Nil(t, err)
100 cTkc.Stop()
101}