blob: cce5d7e1b4d2d3628cc13011af55c9ec87fcff06 [file] [log] [blame]
Kent Hagerman0ab4cb22019-04-24 13:13:35 -04001// +build integration
2
khenaidoo4c1a5bf2018-11-29 15:53:42 -05003/*
4 * Copyright 2018-present Open Networking Foundation
5
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9
10 * http://www.apache.org/licenses/LICENSE-2.0
11
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package kafka
19
20import (
21 "fmt"
npujar12732342019-11-14 17:28:40 +053022 "os"
23 "testing"
24 "time"
25
khenaidoo4c1a5bf2018-11-29 15:53:42 -050026 "github.com/golang/protobuf/ptypes"
27 "github.com/golang/protobuf/ptypes/any"
28 "github.com/google/uuid"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080029 kk "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
30 "github.com/opencord/voltha-lib-go/v3/pkg/log"
31 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050032 "github.com/stretchr/testify/assert"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050033)
34
35/*
36Prerequite: Start the kafka/zookeeper containers.
37*/
38
39var partionClient kk.Client
40var groupClient kk.Client
41var totalTime int64
42var numMessageToSend int
43var totalMessageReceived int
44
45type sendToKafka func(interface{}, *kk.Topic, ...string) error
46
47func init() {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050048 hostIP := os.Getenv("DOCKER_HOST_IP")
khenaidoo4c1a5bf2018-11-29 15:53:42 -050049 partionClient = kk.NewSaramaClient(
50 kk.ConsumerType(kk.PartitionConsumer),
51 kk.Host(hostIP),
52 kk.Port(9092),
53 kk.AutoCreateTopic(true),
54 kk.ProducerFlushFrequency(5))
55 partionClient.Start()
56 groupClient = kk.NewSaramaClient(
57 kk.ConsumerType(kk.GroupCustomer),
58 kk.Host(hostIP),
59 kk.Port(9092),
60 kk.AutoCreateTopic(false),
61 kk.ProducerFlushFrequency(5))
62 groupClient.Start()
63 numMessageToSend = 1
64}
65
khenaidoo79232702018-12-04 11:00:41 -050066func waitForMessage(ch <-chan *ic.InterContainerMessage, doneCh chan string, maxMessages int) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050067 totalTime = 0
68 totalMessageReceived = 0
69 mytime := time.Now()
70startloop:
71 for {
72 select {
73 case msg := <-ch:
74 if totalMessageReceived == 0 {
75 mytime = time.Now()
76 }
77 totalTime = totalTime + (time.Now().UnixNano()-msg.Header.Timestamp)/int64(time.Millisecond)
Rohan Agrawal31f21802020-06-12 05:38:46 +000078 //logger.Debugw(ctx, "msg-received", log.Fields{"msg":msg})
khenaidoo4c1a5bf2018-11-29 15:53:42 -050079 totalMessageReceived = totalMessageReceived + 1
80 if totalMessageReceived == maxMessages {
81 doneCh <- "All received"
82 break startloop
83 }
84 if totalMessageReceived%10000 == 0 {
85 fmt.Println("received-so-far", totalMessageReceived, totalTime, totalTime/int64(totalMessageReceived))
86 }
87 }
88 }
Rohan Agrawal31f21802020-06-12 05:38:46 +000089 logger.Infow(ctx, "Received all messages", log.Fields{"total": time.Since(mytime)})
khenaidoo4c1a5bf2018-11-29 15:53:42 -050090}
91
92func sendMessages(topic *kk.Topic, numMessages int, fn sendToKafka) error {
93 // Loop for numMessages
94 for i := 0; i < numMessages; i++ {
khenaidoo79232702018-12-04 11:00:41 -050095 msg := &ic.InterContainerMessage{}
96 msg.Header = &ic.Header{
khenaidoo4c1a5bf2018-11-29 15:53:42 -050097 Id: uuid.New().String(),
khenaidoo79232702018-12-04 11:00:41 -050098 Type: ic.MessageType_REQUEST,
khenaidoo4c1a5bf2018-11-29 15:53:42 -050099 FromTopic: topic.Name,
100 ToTopic: topic.Name,
101 Timestamp: time.Now().UnixNano(),
102 }
103 var marshalledArg *any.Any
104 var err error
khenaidoo79232702018-12-04 11:00:41 -0500105 body := &ic.InterContainerRequestBody{Rpc: "testRPC", Args: []*ic.Argument{}}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500106 if marshalledArg, err = ptypes.MarshalAny(body); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000107 logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500108 return err
109 }
110 msg.Body = marshalledArg
111 msg.Header.Timestamp = time.Now().UnixNano()
112 go fn(msg, topic)
113 //go partionClient.Send(msg, topic)
114 }
115 return nil
116}
117
118func runWithPartionConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
khenaidoo79232702018-12-04 11:00:41 -0500119 var ch <-chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500120 var err error
121 if ch, err = partionClient.Subscribe(topic); err != nil {
122 return nil
123 }
124 go waitForMessage(ch, doneCh, numMessages)
125
126 //Now create a routine to send messages
127 go sendMessages(topic, numMessages, partionClient.Send)
128
129 return nil
130}
131
132func runWithGroupConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
khenaidoo79232702018-12-04 11:00:41 -0500133 var ch <-chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500134 var err error
135 if ch, err = groupClient.Subscribe(topic); err != nil {
136 return nil
137 }
138 go waitForMessage(ch, doneCh, numMessages)
139
140 //Now create a routine to send messages
141 go sendMessages(topic, numMessages, groupClient.Send)
142
143 return nil
144}
145
146func TestPartitionConsumer(t *testing.T) {
147 done := make(chan string)
148 topic := &kk.Topic{Name: "CoreTest1"}
149 runWithPartionConsumer(topic, numMessageToSend, done)
150 start := time.Now()
151 // Wait for done
152 val := <-done
153 err := partionClient.DeleteTopic(topic)
154 assert.Nil(t, err)
155 partionClient.Stop()
156 assert.Equal(t, numMessageToSend, totalMessageReceived)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000157 logger.Infow(ctx, "Partition consumer completed", log.Fields{"TotalMesages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500158}
159
160func TestGroupConsumer(t *testing.T) {
161 done := make(chan string)
162 topic := &kk.Topic{Name: "CoreTest2"}
163 runWithGroupConsumer(topic, numMessageToSend, done)
164 start := time.Now()
165 // Wait for done
166 val := <-done
167 err := groupClient.DeleteTopic(topic)
168 assert.Nil(t, err)
169 groupClient.Stop()
170 assert.Equal(t, numMessageToSend, totalMessageReceived)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000171 logger.Infow(ctx, "Group consumer completed", log.Fields{"TotalMesages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500172
173}
174
175func TestCreateDeleteTopic(t *testing.T) {
176 hostIP := os.Getenv("DOCKER_HOST_IP")
177 client := kk.NewSaramaClient(
178 kk.ConsumerType(kk.PartitionConsumer),
179 kk.Host(hostIP),
180 kk.Port(9092),
181 kk.AutoCreateTopic(true),
182 kk.ProducerFlushFrequency(5))
183 client.Start()
184 topic := &kk.Topic{Name: "CoreTest20"}
185 err := client.CreateTopic(topic, 3, 1)
186 assert.Nil(t, err)
187 err = client.DeleteTopic(topic)
188 assert.Nil(t, err)
189}