blob: 73098406e400160a10b9d56aa287be18e0b31919 [file] [log] [blame]
Kent Hagerman0ab4cb22019-04-24 13:13:35 -04001// +build integration
2
khenaidoo4c1a5bf2018-11-29 15:53:42 -05003/*
4 * Copyright 2018-present Open Networking Foundation
5
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9
10 * http://www.apache.org/licenses/LICENSE-2.0
11
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package kafka
19
20import (
21 "fmt"
npujar12732342019-11-14 17:28:40 +053022 "os"
23 "testing"
24 "time"
25
khenaidoo4c1a5bf2018-11-29 15:53:42 -050026 "github.com/golang/protobuf/ptypes"
27 "github.com/golang/protobuf/ptypes/any"
28 "github.com/google/uuid"
Scott Baker807addd2019-10-24 15:16:21 -070029 kk "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
30 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Scott Baker555307d2019-11-04 08:58:01 -080031 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050032 "github.com/stretchr/testify/assert"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050033)
34
/*
Prerequisite: start the Kafka and ZooKeeper containers before running these tests.
*/
38
39var partionClient kk.Client
40var groupClient kk.Client
41var totalTime int64
42var numMessageToSend int
43var totalMessageReceived int
44
45type sendToKafka func(interface{}, *kk.Topic, ...string) error
46
47func init() {
48 log.AddPackage(log.JSON, log.ErrorLevel, nil)
49 hostIP := os.Getenv("DOCKER_HOST_IP")
50 log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
51 log.SetAllLogLevel(log.ErrorLevel)
52 partionClient = kk.NewSaramaClient(
53 kk.ConsumerType(kk.PartitionConsumer),
54 kk.Host(hostIP),
55 kk.Port(9092),
56 kk.AutoCreateTopic(true),
57 kk.ProducerFlushFrequency(5))
58 partionClient.Start()
59 groupClient = kk.NewSaramaClient(
60 kk.ConsumerType(kk.GroupCustomer),
61 kk.Host(hostIP),
62 kk.Port(9092),
63 kk.AutoCreateTopic(false),
64 kk.ProducerFlushFrequency(5))
65 groupClient.Start()
66 numMessageToSend = 1
67}
68
khenaidoo79232702018-12-04 11:00:41 -050069func waitForMessage(ch <-chan *ic.InterContainerMessage, doneCh chan string, maxMessages int) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050070 totalTime = 0
71 totalMessageReceived = 0
72 mytime := time.Now()
73startloop:
74 for {
75 select {
76 case msg := <-ch:
77 if totalMessageReceived == 0 {
78 mytime = time.Now()
79 }
80 totalTime = totalTime + (time.Now().UnixNano()-msg.Header.Timestamp)/int64(time.Millisecond)
81 //log.Debugw("msg-received", log.Fields{"msg":msg})
82 totalMessageReceived = totalMessageReceived + 1
83 if totalMessageReceived == maxMessages {
84 doneCh <- "All received"
85 break startloop
86 }
87 if totalMessageReceived%10000 == 0 {
88 fmt.Println("received-so-far", totalMessageReceived, totalTime, totalTime/int64(totalMessageReceived))
89 }
90 }
91 }
92 log.Infow("Received all messages", log.Fields{"total": time.Since(mytime)})
93}
94
95func sendMessages(topic *kk.Topic, numMessages int, fn sendToKafka) error {
96 // Loop for numMessages
97 for i := 0; i < numMessages; i++ {
khenaidoo79232702018-12-04 11:00:41 -050098 msg := &ic.InterContainerMessage{}
99 msg.Header = &ic.Header{
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500100 Id: uuid.New().String(),
khenaidoo79232702018-12-04 11:00:41 -0500101 Type: ic.MessageType_REQUEST,
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500102 FromTopic: topic.Name,
103 ToTopic: topic.Name,
104 Timestamp: time.Now().UnixNano(),
105 }
106 var marshalledArg *any.Any
107 var err error
khenaidoo79232702018-12-04 11:00:41 -0500108 body := &ic.InterContainerRequestBody{Rpc: "testRPC", Args: []*ic.Argument{}}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500109 if marshalledArg, err = ptypes.MarshalAny(body); err != nil {
110 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
111 return err
112 }
113 msg.Body = marshalledArg
114 msg.Header.Timestamp = time.Now().UnixNano()
115 go fn(msg, topic)
116 //go partionClient.Send(msg, topic)
117 }
118 return nil
119}
120
121func runWithPartionConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
khenaidoo79232702018-12-04 11:00:41 -0500122 var ch <-chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500123 var err error
124 if ch, err = partionClient.Subscribe(topic); err != nil {
125 return nil
126 }
127 go waitForMessage(ch, doneCh, numMessages)
128
129 //Now create a routine to send messages
130 go sendMessages(topic, numMessages, partionClient.Send)
131
132 return nil
133}
134
135func runWithGroupConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
khenaidoo79232702018-12-04 11:00:41 -0500136 var ch <-chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500137 var err error
138 if ch, err = groupClient.Subscribe(topic); err != nil {
139 return nil
140 }
141 go waitForMessage(ch, doneCh, numMessages)
142
143 //Now create a routine to send messages
144 go sendMessages(topic, numMessages, groupClient.Send)
145
146 return nil
147}
148
149func TestPartitionConsumer(t *testing.T) {
150 done := make(chan string)
151 topic := &kk.Topic{Name: "CoreTest1"}
152 runWithPartionConsumer(topic, numMessageToSend, done)
153 start := time.Now()
154 // Wait for done
155 val := <-done
156 err := partionClient.DeleteTopic(topic)
157 assert.Nil(t, err)
158 partionClient.Stop()
159 assert.Equal(t, numMessageToSend, totalMessageReceived)
160 log.Infow("Partition consumer completed", log.Fields{"TotalMesages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
161}
162
163func TestGroupConsumer(t *testing.T) {
164 done := make(chan string)
165 topic := &kk.Topic{Name: "CoreTest2"}
166 runWithGroupConsumer(topic, numMessageToSend, done)
167 start := time.Now()
168 // Wait for done
169 val := <-done
170 err := groupClient.DeleteTopic(topic)
171 assert.Nil(t, err)
172 groupClient.Stop()
173 assert.Equal(t, numMessageToSend, totalMessageReceived)
174 log.Infow("Group consumer completed", log.Fields{"TotalMesages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
175
176}
177
178func TestCreateDeleteTopic(t *testing.T) {
179 hostIP := os.Getenv("DOCKER_HOST_IP")
180 client := kk.NewSaramaClient(
181 kk.ConsumerType(kk.PartitionConsumer),
182 kk.Host(hostIP),
183 kk.Port(9092),
184 kk.AutoCreateTopic(true),
185 kk.ProducerFlushFrequency(5))
186 client.Start()
187 topic := &kk.Topic{Name: "CoreTest20"}
188 err := client.CreateTopic(topic, 3, 1)
189 assert.Nil(t, err)
190 err = client.DeleteTopic(topic)
191 assert.Nil(t, err)
192}