blob: 0e35ec13f720f796c9017d0eaeb033efa9bb0335 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
khenaidoob6238b32020-04-07 12:07:36 -040017package kafka
khenaidoo59ce9dd2019-11-11 13:05:32 -050018
19import (
khenaidoo59ce9dd2019-11-11 13:05:32 -050020 "testing"
21 "time"
serkant.uluderyab38671c2019-11-01 09:35:38 -070022
23 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
24 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
25 "github.com/stretchr/testify/assert"
khenaidoo59ce9dd2019-11-11 13:05:32 -050026)
27
khenaidoo59ce9dd2019-11-11 13:05:32 -050028func TestKafkaClientCreateTopic(t *testing.T) {
29 cTkc := NewKafkaClient()
30 topic := kafka.Topic{Name: "myTopic"}
31 err := cTkc.CreateTopic(&topic, 1, 1)
32 assert.Nil(t, err)
33 err = cTkc.CreateTopic(&topic, 1, 1)
34 assert.NotNil(t, err)
35}
36
37func TestKafkaClientDeleteTopic(t *testing.T) {
38 cTkc := NewKafkaClient()
39 topic := kafka.Topic{Name: "myTopic"}
40 err := cTkc.DeleteTopic(&topic)
41 assert.Nil(t, err)
42}
43
44func TestKafkaClientSubscribeSend(t *testing.T) {
45 cTkc := NewKafkaClient()
46 topic := kafka.Topic{Name: "myTopic"}
47 ch, err := cTkc.Subscribe(&topic)
48 assert.Nil(t, err)
49 assert.NotNil(t, ch)
50 testCh := make(chan bool)
51 maxWait := 5 * time.Millisecond
52 msg := &ic.InterContainerMessage{
53 Header: &ic.Header{Id: "1234", ToTopic: topic.Name},
54 Body: nil,
55 }
56 timer := time.NewTimer(maxWait)
57 defer timer.Stop()
58 go func() {
59 select {
60 case val, ok := <-ch:
61 assert.True(t, ok)
62 assert.Equal(t, val, msg)
63 testCh <- true
64 case <-timer.C:
65 testCh <- false
66 }
67 }()
68 err = cTkc.Send(msg, &topic)
69 assert.Nil(t, err)
70 res := <-testCh
71 assert.True(t, res)
72}
73
74func TestKafkaClientUnSubscribe(t *testing.T) {
75 cTkc := NewKafkaClient()
76 topic := kafka.Topic{Name: "myTopic"}
77 ch, err := cTkc.Subscribe(&topic)
78 assert.Nil(t, err)
79 assert.NotNil(t, ch)
80 err = cTkc.UnSubscribe(&topic, ch)
81 assert.Nil(t, err)
82}
83
84func TestKafkaClientStop(t *testing.T) {
85 cTkc := NewKafkaClient()
86 topic := kafka.Topic{Name: "myTopic"}
87 ch, err := cTkc.Subscribe(&topic)
88 assert.Nil(t, err)
89 assert.NotNil(t, ch)
90 err = cTkc.UnSubscribe(&topic, ch)
91 assert.Nil(t, err)
92 cTkc.Stop()
93}