blob: f6eadfff0b2899050442423d30dbb86994b9bacf [file] [log] [blame]
Kent Hagerman0ab4cb22019-04-24 13:13:35 -04001// +build integration
2
khenaidoo4c1a5bf2018-11-29 15:53:42 -05003/*
4 * Copyright 2018-present Open Networking Foundation
5
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9
10 * http://www.apache.org/licenses/LICENSE-2.0
11
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package kafka
19
20import (
21 "fmt"
22 "github.com/golang/protobuf/ptypes"
23 "github.com/golang/protobuf/ptypes/any"
24 "github.com/google/uuid"
25 "github.com/opencord/voltha-go/common/log"
26 kk "github.com/opencord/voltha-go/kafka"
William Kurkiandaa6bb22019-03-07 12:26:28 -050027 ic "github.com/opencord/voltha-protos/go/inter_container"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050028 "github.com/stretchr/testify/assert"
29 "os"
30 "testing"
31 "time"
32)
33
/*
Prerequisite: start the kafka/zookeeper containers before running these tests.
*/
37
38var partionClient kk.Client
39var groupClient kk.Client
40var totalTime int64
41var numMessageToSend int
42var totalMessageReceived int
43
44type sendToKafka func(interface{}, *kk.Topic, ...string) error
45
46func init() {
47 log.AddPackage(log.JSON, log.ErrorLevel, nil)
48 hostIP := os.Getenv("DOCKER_HOST_IP")
49 log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
50 log.SetAllLogLevel(log.ErrorLevel)
51 partionClient = kk.NewSaramaClient(
52 kk.ConsumerType(kk.PartitionConsumer),
53 kk.Host(hostIP),
54 kk.Port(9092),
55 kk.AutoCreateTopic(true),
56 kk.ProducerFlushFrequency(5))
57 partionClient.Start()
58 groupClient = kk.NewSaramaClient(
59 kk.ConsumerType(kk.GroupCustomer),
60 kk.Host(hostIP),
61 kk.Port(9092),
62 kk.AutoCreateTopic(false),
63 kk.ProducerFlushFrequency(5))
64 groupClient.Start()
65 numMessageToSend = 1
66}
67
khenaidoo79232702018-12-04 11:00:41 -050068func waitForMessage(ch <-chan *ic.InterContainerMessage, doneCh chan string, maxMessages int) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050069 totalTime = 0
70 totalMessageReceived = 0
71 mytime := time.Now()
72startloop:
73 for {
74 select {
75 case msg := <-ch:
76 if totalMessageReceived == 0 {
77 mytime = time.Now()
78 }
79 totalTime = totalTime + (time.Now().UnixNano()-msg.Header.Timestamp)/int64(time.Millisecond)
80 //log.Debugw("msg-received", log.Fields{"msg":msg})
81 totalMessageReceived = totalMessageReceived + 1
82 if totalMessageReceived == maxMessages {
83 doneCh <- "All received"
84 break startloop
85 }
86 if totalMessageReceived%10000 == 0 {
87 fmt.Println("received-so-far", totalMessageReceived, totalTime, totalTime/int64(totalMessageReceived))
88 }
89 }
90 }
91 log.Infow("Received all messages", log.Fields{"total": time.Since(mytime)})
92}
93
94func sendMessages(topic *kk.Topic, numMessages int, fn sendToKafka) error {
95 // Loop for numMessages
96 for i := 0; i < numMessages; i++ {
khenaidoo79232702018-12-04 11:00:41 -050097 msg := &ic.InterContainerMessage{}
98 msg.Header = &ic.Header{
khenaidoo4c1a5bf2018-11-29 15:53:42 -050099 Id: uuid.New().String(),
khenaidoo79232702018-12-04 11:00:41 -0500100 Type: ic.MessageType_REQUEST,
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500101 FromTopic: topic.Name,
102 ToTopic: topic.Name,
103 Timestamp: time.Now().UnixNano(),
104 }
105 var marshalledArg *any.Any
106 var err error
khenaidoo79232702018-12-04 11:00:41 -0500107 body := &ic.InterContainerRequestBody{Rpc: "testRPC", Args: []*ic.Argument{}}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500108 if marshalledArg, err = ptypes.MarshalAny(body); err != nil {
109 log.Warnw("cannot-marshal-request", log.Fields{"error": err})
110 return err
111 }
112 msg.Body = marshalledArg
113 msg.Header.Timestamp = time.Now().UnixNano()
114 go fn(msg, topic)
115 //go partionClient.Send(msg, topic)
116 }
117 return nil
118}
119
120func runWithPartionConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
khenaidoo79232702018-12-04 11:00:41 -0500121 var ch <-chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500122 var err error
123 if ch, err = partionClient.Subscribe(topic); err != nil {
124 return nil
125 }
126 go waitForMessage(ch, doneCh, numMessages)
127
128 //Now create a routine to send messages
129 go sendMessages(topic, numMessages, partionClient.Send)
130
131 return nil
132}
133
134func runWithGroupConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
khenaidoo79232702018-12-04 11:00:41 -0500135 var ch <-chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500136 var err error
137 if ch, err = groupClient.Subscribe(topic); err != nil {
138 return nil
139 }
140 go waitForMessage(ch, doneCh, numMessages)
141
142 //Now create a routine to send messages
143 go sendMessages(topic, numMessages, groupClient.Send)
144
145 return nil
146}
147
148func TestPartitionConsumer(t *testing.T) {
149 done := make(chan string)
150 topic := &kk.Topic{Name: "CoreTest1"}
151 runWithPartionConsumer(topic, numMessageToSend, done)
152 start := time.Now()
153 // Wait for done
154 val := <-done
155 err := partionClient.DeleteTopic(topic)
156 assert.Nil(t, err)
157 partionClient.Stop()
158 assert.Equal(t, numMessageToSend, totalMessageReceived)
159 log.Infow("Partition consumer completed", log.Fields{"TotalMesages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
160}
161
162func TestGroupConsumer(t *testing.T) {
163 done := make(chan string)
164 topic := &kk.Topic{Name: "CoreTest2"}
165 runWithGroupConsumer(topic, numMessageToSend, done)
166 start := time.Now()
167 // Wait for done
168 val := <-done
169 err := groupClient.DeleteTopic(topic)
170 assert.Nil(t, err)
171 groupClient.Stop()
172 assert.Equal(t, numMessageToSend, totalMessageReceived)
173 log.Infow("Group consumer completed", log.Fields{"TotalMesages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
174
175}
176
177func TestCreateDeleteTopic(t *testing.T) {
178 hostIP := os.Getenv("DOCKER_HOST_IP")
179 client := kk.NewSaramaClient(
180 kk.ConsumerType(kk.PartitionConsumer),
181 kk.Host(hostIP),
182 kk.Port(9092),
183 kk.AutoCreateTopic(true),
184 kk.ProducerFlushFrequency(5))
185 client.Start()
186 topic := &kk.Topic{Name: "CoreTest20"}
187 err := client.CreateTopic(topic, 3, 1)
188 assert.Nil(t, err)
189 err = client.DeleteTopic(topic)
190 assert.Nil(t, err)
191}