blob: 9836793a1eebbbc61ba4fd06553ba479b2f48576 [file] [log] [blame]
Kent Hagerman0ab4cb22019-04-24 13:13:35 -04001// +build integration
2
khenaidoobf6e7bb2018-08-14 22:27:29 -04003/*
4 * Copyright 2018-present Open Networking Foundation
5
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9
10 * http://www.apache.org/licenses/LICENSE-2.0
11
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
khenaidooabad44c2018-08-03 16:58:35 -040018package kafka
19
20import (
21 "context"
22 "github.com/golang/protobuf/ptypes"
khenaidood4d922e2018-08-03 22:35:16 -040023 "github.com/google/uuid"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040024 rhp "github.com/opencord/voltha-go/rw_core/core"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080025 kk "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
26 "github.com/opencord/voltha-lib-go/v3/pkg/log"
27 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
28 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidooabad44c2018-08-03 16:58:35 -040029 "github.com/stretchr/testify/assert"
khenaidoo79232702018-12-04 11:00:41 -050030 "os"
khenaidooabad44c2018-08-03 16:58:35 -040031 "testing"
khenaidooabad44c2018-08-03 16:58:35 -040032 "time"
33)
34
khenaidoo2c6f1672018-09-20 23:14:41 -040035/*
36Prerequite: Start the kafka/zookeeper containers.
37*/
38
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040039const (
40 TEST_RPC_KEY = ""
41)
42
khenaidoo43c82122018-11-22 18:38:28 -050043var coreKafkaProxy *kk.InterContainerProxy
44var adapterKafkaProxy *kk.InterContainerProxy
khenaidoo79232702018-12-04 11:00:41 -050045var kafkaPartitionClient kk.Client
46var affinityRouterTopic string
47var hostIP string
48var kafkaClient kk.Client
khenaidooabad44c2018-08-03 16:58:35 -040049
50func init() {
khenaidoo79232702018-12-04 11:00:41 -050051 affinityRouterTopic = "AffinityRouter"
52 hostIP = os.Getenv("DOCKER_HOST_IP")
53 kafkaClient = kk.NewSaramaClient(
54 kk.Host(hostIP),
khenaidoo43c82122018-11-22 18:38:28 -050055 kk.Port(9092))
khenaidoo2c6f1672018-09-20 23:14:41 -040056
npujar467fe752020-01-16 20:17:45 +053057 coreKafkaProxy = kk.NewInterContainerProxy(
khenaidoo79232702018-12-04 11:00:41 -050058 kk.InterContainerHost(hostIP),
khenaidoo43c82122018-11-22 18:38:28 -050059 kk.InterContainerPort(9092),
60 kk.DefaultTopic(&kk.Topic{Name: "Core"}),
khenaidoo79232702018-12-04 11:00:41 -050061 kk.MsgClient(kafkaClient),
62 kk.DeviceDiscoveryTopic(&kk.Topic{Name: affinityRouterTopic}))
khenaidooabad44c2018-08-03 16:58:35 -040063
npujar467fe752020-01-16 20:17:45 +053064 adapterKafkaProxy = kk.NewInterContainerProxy(
khenaidoo79232702018-12-04 11:00:41 -050065 kk.InterContainerHost(hostIP),
khenaidoo43c82122018-11-22 18:38:28 -050066 kk.InterContainerPort(9092),
67 kk.DefaultTopic(&kk.Topic{Name: "Adapter"}),
68 kk.MsgClient(kafkaClient))
khenaidooabad44c2018-08-03 16:58:35 -040069
khenaidoo79232702018-12-04 11:00:41 -050070 kafkaPartitionClient = kk.NewSaramaClient(
71 kk.ConsumerType(kk.PartitionConsumer),
72 kk.Host(hostIP),
73 kk.Port(9092),
74 kk.AutoCreateTopic(true),
75 kk.ProducerFlushFrequency(5))
76 kafkaPartitionClient.Start()
77
khenaidooabad44c2018-08-03 16:58:35 -040078 coreKafkaProxy.Start()
79 adapterKafkaProxy.Start()
80 subscribeTarget(coreKafkaProxy)
81}
82
khenaidoo43c82122018-11-22 18:38:28 -050083func subscribeTarget(kmp *kk.InterContainerProxy) {
khenaidooabad44c2018-08-03 16:58:35 -040084 topic := kk.Topic{Name: "Core"}
khenaidoo2c6f1672018-09-20 23:14:41 -040085 requestProxy := &rhp.AdapterRequestHandlerProxy{TestMode: true}
khenaidoo43c82122018-11-22 18:38:28 -050086 kmp.SubscribeWithRequestHandlerInterface(topic, requestProxy)
khenaidooabad44c2018-08-03 16:58:35 -040087}
88
khenaidoo79232702018-12-04 11:00:41 -050089func waitForRPCMessage(topic kk.Topic, ch <-chan *ic.InterContainerMessage, doneCh chan string) {
khenaidooabad44c2018-08-03 16:58:35 -040090 for msg := range ch {
Girish Kumarf56a4682020-03-20 20:07:46 +000091 logger.Debugw("Got-RPC-message", log.Fields{"msg": msg})
khenaidooabad44c2018-08-03 16:58:35 -040092 // Unpack message
khenaidoo79232702018-12-04 11:00:41 -050093 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -040094 if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
95 doneCh <- "Error"
96 } else {
97 doneCh <- requestBody.Rpc
98 }
99 break
100 }
101}
102
khenaidoo43c82122018-11-22 18:38:28 -0500103//func TestSubscribeUnsubscribe(t *testing.T) {
104// // First subscribe to the specific topic
105// topic := kk.Topic{Name: "Core"}
106// ch, err := coreKafkaProxy.Subs(topic)
107// assert.NotNil(t, ch)
108// assert.Nil(t, err)
109// // Create a channel to receive a response
110// waitCh := make(chan string)
111// // Wait for a message
112// go waitForRPCMessage(topic, ch, waitCh)
113// // Send the message - don't care of the response
114// rpc := "AnyRPCRequestForTest"
115// adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
116// // Wait for the result on ouw own channel
117// result := <-waitCh
118// assert.Equal(t, result, rpc)
119// close(waitCh)
120// err = coreKafkaProxy.UnSubscribe(topic, ch)
121// assert.Nil(t, err)
122//}
123//
124//func TestMultipleSubscribeUnsubscribe(t *testing.T) {
125// // First subscribe to the specific topic
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800126// //log.SetPackageLogLevel("github.com/opencord/voltha-lib-go/v3/pkg/kafka", log.DebugLevel)
khenaidoo43c82122018-11-22 18:38:28 -0500127// var err error
khenaidoo79232702018-12-04 11:00:41 -0500128// var ch1 <-chan *ic.InterContainerMessage
129// var ch2 <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500130// topic := kk.Topic{Name: "Core"}
131// ch1, err = coreKafkaProxy.Subscribe(topic)
132// assert.NotNil(t, ch1)
133// assert.Nil(t, err)
134// // Create a channel to receive responses
135// waitCh := make(chan string)
136// ch2, err = coreKafkaProxy.Subscribe(topic)
137// assert.NotNil(t, ch2)
138// assert.Nil(t, err)
139// // Wait for a message
140// go waitForRPCMessage(topic, ch2, waitCh)
141// go waitForRPCMessage(topic, ch1, waitCh)
142//
143// // Send the message - don't care of the response
144// rpc := "AnyRPCRequestForTest"
145// adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
146// // Wait for the result on ouw own channel
147//
148// responses := 0
149// for msg := range waitCh {
150// assert.Equal(t, msg, rpc)
151// responses = responses + 1
152// if responses > 1 {
153// break
154// }
155// }
156// assert.Equal(t, responses, 2)
157// close(waitCh)
158// err = coreKafkaProxy.UnSubscribe(topic, ch1)
159// assert.Nil(t, err)
160// err = coreKafkaProxy.UnSubscribe(topic, ch2)
161// assert.Nil(t, err)
162//}
khenaidooabad44c2018-08-03 16:58:35 -0400163
164func TestIncorrectAPI(t *testing.T) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800165 log.SetPackageLogLevel("github.com/opencord/voltha-lib-go/v3/pkg/kafka", log.ErrorLevel)
khenaidooabad44c2018-08-03 16:58:35 -0400166 trnsId := uuid.New().String()
167 protoMsg := &voltha.Device{Id: trnsId}
168 args := make([]*kk.KVArg, 1)
169 args[0] = &kk.KVArg{
170 Key: "device",
171 Value: protoMsg,
172 }
173 rpc := "IncorrectAPI"
174 topic := kk.Topic{Name: "Core"}
175 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400176 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400177 elapsed := time.Since(start)
Girish Kumarf56a4682020-03-20 20:07:46 +0000178 logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400179 assert.Equal(t, status, false)
180 //Unpack the result into the actual proto object
khenaidoo79232702018-12-04 11:00:41 -0500181 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400182 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000183 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400184 }
185 assert.NotNil(t, unpackResult)
186}
187
188func TestIncorrectAPIParams(t *testing.T) {
189 trnsId := uuid.New().String()
190 protoMsg := &voltha.Device{Id: trnsId}
191 args := make([]*kk.KVArg, 1)
192 args[0] = &kk.KVArg{
193 Key: "device",
194 Value: protoMsg,
195 }
196 rpc := "GetDevice"
197 topic := kk.Topic{Name: "Core"}
198 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400199 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400200 elapsed := time.Since(start)
Girish Kumarf56a4682020-03-20 20:07:46 +0000201 logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400202 assert.Equal(t, status, false)
203 //Unpack the result into the actual proto object
khenaidoo79232702018-12-04 11:00:41 -0500204 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400205 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000206 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400207 }
208 assert.NotNil(t, unpackResult)
209}
210
211func TestGetDevice(t *testing.T) {
212 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400213 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400214 args := make([]*kk.KVArg, 1)
215 args[0] = &kk.KVArg{
216 Key: "deviceID",
217 Value: protoMsg,
218 }
219 rpc := "GetDevice"
220 topic := kk.Topic{Name: "Core"}
221 expectedResponse := &voltha.Device{Id: trnsId}
222 timeout := time.Duration(50) * time.Millisecond
223 ctx, cancel := context.WithTimeout(context.Background(), timeout)
224 defer cancel()
225 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400226 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400227 elapsed := time.Since(start)
Girish Kumarf56a4682020-03-20 20:07:46 +0000228 logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400229 assert.Equal(t, status, true)
230 unpackResult := &voltha.Device{}
231 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000232 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400233 }
234 assert.Equal(t, unpackResult, expectedResponse)
235}
236
237func TestGetDeviceTimeout(t *testing.T) {
238 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400239 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400240 args := make([]*kk.KVArg, 1)
241 args[0] = &kk.KVArg{
242 Key: "deviceID",
243 Value: protoMsg,
244 }
245 rpc := "GetDevice"
246 topic := kk.Topic{Name: "Core"}
247 timeout := time.Duration(2) * time.Millisecond
248 ctx, cancel := context.WithTimeout(context.Background(), timeout)
249 defer cancel()
250 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400251 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400252 elapsed := time.Since(start)
Girish Kumarf56a4682020-03-20 20:07:46 +0000253 logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400254 assert.Equal(t, status, false)
khenaidoo79232702018-12-04 11:00:41 -0500255 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400256 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000257 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400258 }
259 assert.NotNil(t, unpackResult)
260}
261
262func TestGetChildDevice(t *testing.T) {
263 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400264 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400265 args := make([]*kk.KVArg, 1)
266 args[0] = &kk.KVArg{
267 Key: "deviceID",
268 Value: protoMsg,
269 }
270 rpc := "GetChildDevice"
271 topic := kk.Topic{Name: "Core"}
272 expectedResponse := &voltha.Device{Id: trnsId}
273 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400274 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400275 elapsed := time.Since(start)
Girish Kumarf56a4682020-03-20 20:07:46 +0000276 logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400277 assert.Equal(t, status, true)
278 unpackResult := &voltha.Device{}
279 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000280 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400281 }
282 assert.Equal(t, unpackResult, expectedResponse)
283}
284
285func TestGetChildDevices(t *testing.T) {
286 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400287 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400288 args := make([]*kk.KVArg, 1)
289 args[0] = &kk.KVArg{
290 Key: "deviceID",
291 Value: protoMsg,
292 }
293 rpc := "GetChildDevices"
294 topic := kk.Topic{Name: "Core"}
295 expectedResponse := &voltha.Device{Id: trnsId}
296 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400297 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400298 elapsed := time.Since(start)
Girish Kumarf56a4682020-03-20 20:07:46 +0000299 logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400300 assert.Equal(t, status, true)
301 unpackResult := &voltha.Device{}
302 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000303 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400304 }
305 assert.Equal(t, unpackResult, expectedResponse)
306}
307
308func TestGetPorts(t *testing.T) {
309 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400310 protoArg1 := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400311 args := make([]*kk.KVArg, 2)
312 args[0] = &kk.KVArg{
313 Key: "deviceID",
314 Value: protoArg1,
315 }
khenaidoo79232702018-12-04 11:00:41 -0500316 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400317 args[1] = &kk.KVArg{
318 Key: "portType",
319 Value: protoArg2,
320 }
321 rpc := "GetPorts"
322 topic := kk.Topic{Name: "Core"}
323 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400324 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400325 elapsed := time.Since(start)
Girish Kumarf56a4682020-03-20 20:07:46 +0000326 logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400327 assert.Equal(t, status, true)
328 unpackResult := &voltha.Ports{}
329 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000330 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400331 }
332 expectedLen := len(unpackResult.Items) >= 1
333 assert.Equal(t, true, expectedLen)
334}
335
336func TestGetPortsMissingArgs(t *testing.T) {
337 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400338 protoArg1 := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400339 args := make([]*kk.KVArg, 1)
340 args[0] = &kk.KVArg{
341 Key: "deviceID",
342 Value: protoArg1,
343 }
344 rpc := "GetPorts"
345 topic := kk.Topic{Name: "Core"}
346 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400347 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400348 elapsed := time.Since(start)
Girish Kumarf56a4682020-03-20 20:07:46 +0000349 logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400350 assert.Equal(t, status, false)
351 //Unpack the result into the actual proto object
khenaidoo79232702018-12-04 11:00:41 -0500352 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400353 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000354 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400355 }
356 assert.NotNil(t, unpackResult)
357}
358
359func TestChildDeviceDetected(t *testing.T) {
360 trnsId := uuid.New().String()
khenaidoo79232702018-12-04 11:00:41 -0500361 protoArg1 := &ic.StrType{Val: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400362 args := make([]*kk.KVArg, 5)
363 args[0] = &kk.KVArg{
364 Key: "deviceID",
365 Value: protoArg1,
366 }
khenaidoo79232702018-12-04 11:00:41 -0500367 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400368 args[1] = &kk.KVArg{
369 Key: "parentPortNo",
370 Value: protoArg2,
371 }
khenaidoo79232702018-12-04 11:00:41 -0500372 protoArg3 := &ic.StrType{Val: "great_onu"}
khenaidooabad44c2018-08-03 16:58:35 -0400373 args[2] = &kk.KVArg{
374 Key: "childDeviceType",
375 Value: protoArg3,
376 }
377 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
378 args[3] = &kk.KVArg{
379 Key: "proxyAddress",
380 Value: protoArg4,
381 }
khenaidoo79232702018-12-04 11:00:41 -0500382 protoArg5 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400383 args[4] = &kk.KVArg{
384 Key: "portType",
385 Value: protoArg5,
386 }
387
388 rpc := "ChildDeviceDetected"
389 topic := kk.Topic{Name: "Core"}
390 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400391 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400392 elapsed := time.Since(start)
Girish Kumarf56a4682020-03-20 20:07:46 +0000393 logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400394 assert.Equal(t, status, true)
395 assert.Nil(t, result)
396}
397
398func TestChildDeviceDetectedNoWait(t *testing.T) {
399 trnsId := uuid.New().String()
khenaidoo79232702018-12-04 11:00:41 -0500400 protoArg1 := &ic.StrType{Val: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400401 args := make([]*kk.KVArg, 5)
402 args[0] = &kk.KVArg{
403 Key: "deviceID",
404 Value: protoArg1,
405 }
khenaidoo79232702018-12-04 11:00:41 -0500406 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400407 args[1] = &kk.KVArg{
408 Key: "parentPortNo",
409 Value: protoArg2,
410 }
khenaidoo79232702018-12-04 11:00:41 -0500411 protoArg3 := &ic.StrType{Val: "great_onu"}
khenaidooabad44c2018-08-03 16:58:35 -0400412 args[2] = &kk.KVArg{
413 Key: "childDeviceType",
414 Value: protoArg3,
415 }
416 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
417 args[3] = &kk.KVArg{
418 Key: "proxyAddress",
419 Value: protoArg4,
420 }
khenaidoo79232702018-12-04 11:00:41 -0500421 protoArg5 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400422 args[4] = &kk.KVArg{
423 Key: "portType",
424 Value: protoArg5,
425 }
426
427 rpc := "ChildDeviceDetected"
428 topic := kk.Topic{Name: "Core"}
429 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400430 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, false, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400431 elapsed := time.Since(start)
Girish Kumarf56a4682020-03-20 20:07:46 +0000432 logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400433 assert.Equal(t, status, true)
434 assert.Nil(t, result)
435}
436
437func TestChildDeviceDetectedMissingArgs(t *testing.T) {
438 trnsId := uuid.New().String()
khenaidoo79232702018-12-04 11:00:41 -0500439 protoArg1 := &ic.StrType{Val: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400440 args := make([]*kk.KVArg, 4)
441 args[0] = &kk.KVArg{
442 Key: "deviceID",
443 Value: protoArg1,
444 }
khenaidoo79232702018-12-04 11:00:41 -0500445 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400446 args[1] = &kk.KVArg{
447 Key: "parentPortNo",
448 Value: protoArg2,
449 }
khenaidoo79232702018-12-04 11:00:41 -0500450 protoArg3 := &ic.StrType{Val: "great_onu"}
khenaidooabad44c2018-08-03 16:58:35 -0400451 args[2] = &kk.KVArg{
452 Key: "childDeviceType",
453 Value: protoArg3,
454 }
khenaidooabad44c2018-08-03 16:58:35 -0400455
456 rpc := "ChildDeviceDetected"
457 topic := kk.Topic{Name: "Core"}
458 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400459 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400460 elapsed := time.Since(start)
Girish Kumarf56a4682020-03-20 20:07:46 +0000461 logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400462 assert.Equal(t, status, false)
khenaidoo79232702018-12-04 11:00:41 -0500463 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400464 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000465 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400466 }
467 assert.NotNil(t, unpackResult)
468}
469
khenaidoo92e62c52018-10-03 14:02:54 -0400470func TestDeviceStateChange(t *testing.T) {
471 log.SetAllLogLevel(log.DebugLevel)
472 trnsId := uuid.New().String()
473 protoArg1 := &voltha.ID{Id: trnsId}
474 args := make([]*kk.KVArg, 4)
475 args[0] = &kk.KVArg{
476 Key: "device_id",
477 Value: protoArg1,
478 }
khenaidoo79232702018-12-04 11:00:41 -0500479 protoArg2 := &ic.IntType{Val: 1}
khenaidoo92e62c52018-10-03 14:02:54 -0400480 args[1] = &kk.KVArg{
481 Key: "oper_status",
482 Value: protoArg2,
483 }
khenaidoo79232702018-12-04 11:00:41 -0500484 protoArg3 := &ic.IntType{Val: 1}
khenaidoo92e62c52018-10-03 14:02:54 -0400485 args[2] = &kk.KVArg{
486 Key: "connect_status",
487 Value: protoArg3,
488 }
489
490 rpc := "DeviceStateUpdate"
491 topic := kk.Topic{Name: "Core"}
492 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400493 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidoo92e62c52018-10-03 14:02:54 -0400494 elapsed := time.Since(start)
Girish Kumarf56a4682020-03-20 20:07:46 +0000495 logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidoo92e62c52018-10-03 14:02:54 -0400496 assert.Equal(t, status, true)
497 assert.Nil(t, result)
498}
499
khenaidoo79232702018-12-04 11:00:41 -0500500func subscribeToTopic(topic *kk.Topic, waitingChannel chan *ic.InterContainerMessage) error {
501 var ch <-chan *ic.InterContainerMessage
502 var err error
503 if ch, err = kafkaPartitionClient.Subscribe(topic); err != nil {
504 return nil
505 }
506 msg := <-ch
507
Girish Kumarf56a4682020-03-20 20:07:46 +0000508 logger.Debugw("msg-received", log.Fields{"msg": msg})
khenaidoo79232702018-12-04 11:00:41 -0500509 waitingChannel <- msg
510 return nil
511}
512
513func TestDeviceDiscovery(t *testing.T) {
514 // Create an intercontainer proxy - similar to the Core
npujar467fe752020-01-16 20:17:45 +0530515 testProxy := kk.NewInterContainerProxy(
khenaidoo79232702018-12-04 11:00:41 -0500516 kk.InterContainerHost(hostIP),
517 kk.InterContainerPort(9092),
518 kk.DefaultTopic(&kk.Topic{Name: "Test"}),
519 kk.MsgClient(kafkaClient),
520 kk.DeviceDiscoveryTopic(&kk.Topic{Name: affinityRouterTopic}))
521
522 // First start to wait for the message
523 waitingChannel := make(chan *ic.InterContainerMessage)
524 go subscribeToTopic(&kk.Topic{Name: affinityRouterTopic}, waitingChannel)
525
526 // Sleep to make sure the consumer is ready
527 time.Sleep(time.Millisecond * 100)
528
529 // Send the message
khenaidoo19374072018-12-11 11:05:15 -0500530 go testProxy.DeviceDiscovered("TestDeviceId", "TestDevicetype", "TestParentId", "myPODName")
khenaidoo79232702018-12-04 11:00:41 -0500531
532 msg := <-waitingChannel
533 totalTime := (time.Now().UnixNano() - msg.Header.Timestamp) / int64(time.Millisecond)
534 assert.Equal(t, msg.Header.Type, ic.MessageType_DEVICE_DISCOVERED)
535 // Unpack message
536 dd := &ic.DeviceDiscovered{}
537 err := ptypes.UnmarshalAny(msg.Body, dd)
538 assert.Nil(t, err)
539 assert.Equal(t, dd.Id, "TestDeviceId")
540 assert.Equal(t, dd.DeviceType, "TestDevicetype")
541 assert.Equal(t, dd.ParentId, "TestParentId")
khenaidoo19374072018-12-11 11:05:15 -0500542 assert.Equal(t, dd.Publisher, "myPODName")
Girish Kumarf56a4682020-03-20 20:07:46 +0000543 logger.Debugw("TotalTime", log.Fields{"time": totalTime})
khenaidoo79232702018-12-04 11:00:41 -0500544}
545
khenaidooabad44c2018-08-03 16:58:35 -0400546func TestStopKafkaProxy(t *testing.T) {
547 adapterKafkaProxy.Stop()
548 coreKafkaProxy.Stop()
549}
550
551//func TestMain(m *testing.T) {
Girish Kumarf56a4682020-03-20 20:07:46 +0000552// logger.Info("Main")
khenaidooabad44c2018-08-03 16:58:35 -0400553//}