blob: 0e35ec13f720f796c9017d0eaeb033efa9bb0335 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
khenaidoob6238b32020-04-07 12:07:36 -040017package kafka
khenaidoo59ce9dd2019-11-11 13:05:32 -050018
19import (
Scott Bakere6685952020-06-23 04:05:39 +000020 "testing"
21 "time"
22
serkant.uluderyab38671c2019-11-01 09:35:38 -070023 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
24 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
25 "github.com/stretchr/testify/assert"
khenaidoo59ce9dd2019-11-11 13:05:32 -050026)
27
khenaidoo59ce9dd2019-11-11 13:05:32 -050028func TestKafkaClientCreateTopic(t *testing.T) {
29 cTkc := NewKafkaClient()
30 topic := kafka.Topic{Name: "myTopic"}
Scott Bakere6685952020-06-23 04:05:39 +000031 err := cTkc.CreateTopic(&topic, 1, 1)
khenaidoo59ce9dd2019-11-11 13:05:32 -050032 assert.Nil(t, err)
Scott Bakere6685952020-06-23 04:05:39 +000033 err = cTkc.CreateTopic(&topic, 1, 1)
khenaidoo59ce9dd2019-11-11 13:05:32 -050034 assert.NotNil(t, err)
35}
36
37func TestKafkaClientDeleteTopic(t *testing.T) {
38 cTkc := NewKafkaClient()
39 topic := kafka.Topic{Name: "myTopic"}
Scott Bakere6685952020-06-23 04:05:39 +000040 err := cTkc.DeleteTopic(&topic)
khenaidoo59ce9dd2019-11-11 13:05:32 -050041 assert.Nil(t, err)
42}
43
44func TestKafkaClientSubscribeSend(t *testing.T) {
45 cTkc := NewKafkaClient()
46 topic := kafka.Topic{Name: "myTopic"}
Scott Bakere6685952020-06-23 04:05:39 +000047 ch, err := cTkc.Subscribe(&topic)
khenaidoo59ce9dd2019-11-11 13:05:32 -050048 assert.Nil(t, err)
49 assert.NotNil(t, ch)
50 testCh := make(chan bool)
51 maxWait := 5 * time.Millisecond
52 msg := &ic.InterContainerMessage{
53 Header: &ic.Header{Id: "1234", ToTopic: topic.Name},
54 Body: nil,
55 }
56 timer := time.NewTimer(maxWait)
57 defer timer.Stop()
58 go func() {
59 select {
60 case val, ok := <-ch:
61 assert.True(t, ok)
62 assert.Equal(t, val, msg)
63 testCh <- true
64 case <-timer.C:
65 testCh <- false
66 }
67 }()
Scott Bakere6685952020-06-23 04:05:39 +000068 err = cTkc.Send(msg, &topic)
khenaidoo59ce9dd2019-11-11 13:05:32 -050069 assert.Nil(t, err)
70 res := <-testCh
71 assert.True(t, res)
72}
73
74func TestKafkaClientUnSubscribe(t *testing.T) {
75 cTkc := NewKafkaClient()
76 topic := kafka.Topic{Name: "myTopic"}
Scott Bakere6685952020-06-23 04:05:39 +000077 ch, err := cTkc.Subscribe(&topic)
khenaidoo59ce9dd2019-11-11 13:05:32 -050078 assert.Nil(t, err)
79 assert.NotNil(t, ch)
Scott Bakere6685952020-06-23 04:05:39 +000080 err = cTkc.UnSubscribe(&topic, ch)
khenaidoo59ce9dd2019-11-11 13:05:32 -050081 assert.Nil(t, err)
82}
83
84func TestKafkaClientStop(t *testing.T) {
85 cTkc := NewKafkaClient()
86 topic := kafka.Topic{Name: "myTopic"}
Scott Bakere6685952020-06-23 04:05:39 +000087 ch, err := cTkc.Subscribe(&topic)
khenaidoo59ce9dd2019-11-11 13:05:32 -050088 assert.Nil(t, err)
89 assert.NotNil(t, ch)
Scott Bakere6685952020-06-23 04:05:39 +000090 err = cTkc.UnSubscribe(&topic, ch)
khenaidoo59ce9dd2019-11-11 13:05:32 -050091 assert.Nil(t, err)
Scott Bakere6685952020-06-23 04:05:39 +000092 cTkc.Stop()
khenaidoo59ce9dd2019-11-11 13:05:32 -050093}