blob: d0bd26e87b1bd7a345dc8a19006e48bb855fc55e [file] [log] [blame]
Kent Hagerman0ab4cb22019-04-24 13:13:35 -04001// +build integration
2
khenaidoobf6e7bb2018-08-14 22:27:29 -04003/*
4 * Copyright 2018-present Open Networking Foundation
5
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9
10 * http://www.apache.org/licenses/LICENSE-2.0
11
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
khenaidooabad44c2018-08-03 16:58:35 -040018package kafka
19
20import (
21 "context"
David Bainbridge9ae13132020-06-22 17:28:01 -070022 "os"
23 "testing"
24 "time"
25
khenaidooabad44c2018-08-03 16:58:35 -040026 "github.com/golang/protobuf/ptypes"
khenaidood4d922e2018-08-03 22:35:16 -040027 "github.com/google/uuid"
Kent Hagerman2b216042020-04-03 18:28:56 -040028 "github.com/opencord/voltha-go/rw_core/core/api"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080029 kk "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
30 "github.com/opencord/voltha-lib-go/v3/pkg/log"
31 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
32 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidooabad44c2018-08-03 16:58:35 -040033 "github.com/stretchr/testify/assert"
khenaidooabad44c2018-08-03 16:58:35 -040034)
35
/*
Prerequisite: Start the kafka/zookeeper containers.
*/
39
// TEST_RPC_KEY is the key argument handed to every InvokeRPC call made by the
// tests in this file. Its value is the empty string.
const TEST_RPC_KEY = ""
43
khenaidoo43c82122018-11-22 18:38:28 -050044var coreKafkaProxy *kk.InterContainerProxy
45var adapterKafkaProxy *kk.InterContainerProxy
khenaidoo79232702018-12-04 11:00:41 -050046var kafkaPartitionClient kk.Client
David Bainbridge9ae13132020-06-22 17:28:01 -070047var deviceDiscoveryTopic string
khenaidoo79232702018-12-04 11:00:41 -050048var hostIP string
49var kafkaClient kk.Client
khenaidooabad44c2018-08-03 16:58:35 -040050
51func init() {
David Bainbridge9ae13132020-06-22 17:28:01 -070052 deviceDiscoveryTopic = "deviceDiscovery"
khenaidoo79232702018-12-04 11:00:41 -050053 hostIP = os.Getenv("DOCKER_HOST_IP")
54 kafkaClient = kk.NewSaramaClient(
55 kk.Host(hostIP),
khenaidoo43c82122018-11-22 18:38:28 -050056 kk.Port(9092))
khenaidoo2c6f1672018-09-20 23:14:41 -040057
npujar467fe752020-01-16 20:17:45 +053058 coreKafkaProxy = kk.NewInterContainerProxy(
khenaidoo79232702018-12-04 11:00:41 -050059 kk.InterContainerHost(hostIP),
khenaidoo43c82122018-11-22 18:38:28 -050060 kk.InterContainerPort(9092),
61 kk.DefaultTopic(&kk.Topic{Name: "Core"}),
khenaidoo79232702018-12-04 11:00:41 -050062 kk.MsgClient(kafkaClient),
David Bainbridge9ae13132020-06-22 17:28:01 -070063 kk.DeviceDiscoveryTopic(&kk.Topic{Name: deviceDiscoveryTopic}))
khenaidooabad44c2018-08-03 16:58:35 -040064
npujar467fe752020-01-16 20:17:45 +053065 adapterKafkaProxy = kk.NewInterContainerProxy(
khenaidoo79232702018-12-04 11:00:41 -050066 kk.InterContainerHost(hostIP),
khenaidoo43c82122018-11-22 18:38:28 -050067 kk.InterContainerPort(9092),
68 kk.DefaultTopic(&kk.Topic{Name: "Adapter"}),
69 kk.MsgClient(kafkaClient))
khenaidooabad44c2018-08-03 16:58:35 -040070
khenaidoo79232702018-12-04 11:00:41 -050071 kafkaPartitionClient = kk.NewSaramaClient(
72 kk.ConsumerType(kk.PartitionConsumer),
73 kk.Host(hostIP),
74 kk.Port(9092),
75 kk.AutoCreateTopic(true),
76 kk.ProducerFlushFrequency(5))
77 kafkaPartitionClient.Start()
78
khenaidooabad44c2018-08-03 16:58:35 -040079 coreKafkaProxy.Start()
80 adapterKafkaProxy.Start()
Rohan Agrawal31f21802020-06-12 05:38:46 +000081 subscribeTarget(context.Background(), coreKafkaProxy)
khenaidooabad44c2018-08-03 16:58:35 -040082}
83
Rohan Agrawal31f21802020-06-12 05:38:46 +000084func subscribeTarget(ctx context.Context, kmp *kk.InterContainerProxy) {
khenaidooabad44c2018-08-03 16:58:35 -040085 topic := kk.Topic{Name: "Core"}
Kent Hagerman2b216042020-04-03 18:28:56 -040086 requestProxy := &api.AdapterRequestHandlerProxy{TestMode: true}
Rohan Agrawal31f21802020-06-12 05:38:46 +000087 kmp.SubscribeWithRequestHandlerInterface(ctx, topic, requestProxy)
khenaidooabad44c2018-08-03 16:58:35 -040088}
89
Rohan Agrawal31f21802020-06-12 05:38:46 +000090func waitForRPCMessage(ctx context.Context, topic kk.Topic, ch <-chan *ic.InterContainerMessage, doneCh chan string) {
khenaidooabad44c2018-08-03 16:58:35 -040091 for msg := range ch {
Rohan Agrawal31f21802020-06-12 05:38:46 +000092 logger.Debugw(ctx, "Got-RPC-message", log.Fields{"msg": msg})
khenaidooabad44c2018-08-03 16:58:35 -040093 // Unpack message
khenaidoo79232702018-12-04 11:00:41 -050094 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -040095 if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
96 doneCh <- "Error"
97 } else {
98 doneCh <- requestBody.Rpc
99 }
100 break
101 }
102}
103
khenaidoo43c82122018-11-22 18:38:28 -0500104//func TestSubscribeUnsubscribe(t *testing.T) {
105// // First subscribe to the specific topic
106// topic := kk.Topic{Name: "Core"}
107// ch, err := coreKafkaProxy.Subs(topic)
108// assert.NotNil(t, ch)
109// assert.Nil(t, err)
110// // Create a channel to receive a response
111// waitCh := make(chan string)
112// // Wait for a message
113// go waitForRPCMessage(topic, ch, waitCh)
// // Send the message - don't care about the response
115// rpc := "AnyRPCRequestForTest"
116// adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
// // Wait for the result on our own channel
118// result := <-waitCh
119// assert.Equal(t, result, rpc)
120// close(waitCh)
121// err = coreKafkaProxy.UnSubscribe(topic, ch)
122// assert.Nil(t, err)
123//}
124//
125//func TestMultipleSubscribeUnsubscribe(t *testing.T) {
126// // First subscribe to the specific topic
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800127// //log.SetPackageLogLevel("github.com/opencord/voltha-lib-go/v3/pkg/kafka", log.DebugLevel)
khenaidoo43c82122018-11-22 18:38:28 -0500128// var err error
khenaidoo79232702018-12-04 11:00:41 -0500129// var ch1 <-chan *ic.InterContainerMessage
130// var ch2 <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500131// topic := kk.Topic{Name: "Core"}
132// ch1, err = coreKafkaProxy.Subscribe(topic)
133// assert.NotNil(t, ch1)
134// assert.Nil(t, err)
135// // Create a channel to receive responses
136// waitCh := make(chan string)
137// ch2, err = coreKafkaProxy.Subscribe(topic)
138// assert.NotNil(t, ch2)
139// assert.Nil(t, err)
140// // Wait for a message
141// go waitForRPCMessage(topic, ch2, waitCh)
142// go waitForRPCMessage(topic, ch1, waitCh)
143//
// // Send the message - don't care about the response
145// rpc := "AnyRPCRequestForTest"
146// adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
// // Wait for the result on our own channel
148//
149// responses := 0
150// for msg := range waitCh {
151// assert.Equal(t, msg, rpc)
152// responses = responses + 1
153// if responses > 1 {
154// break
155// }
156// }
157// assert.Equal(t, responses, 2)
158// close(waitCh)
159// err = coreKafkaProxy.UnSubscribe(topic, ch1)
160// assert.Nil(t, err)
161// err = coreKafkaProxy.UnSubscribe(topic, ch2)
162// assert.Nil(t, err)
163//}
khenaidooabad44c2018-08-03 16:58:35 -0400164
165func TestIncorrectAPI(t *testing.T) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800166 log.SetPackageLogLevel("github.com/opencord/voltha-lib-go/v3/pkg/kafka", log.ErrorLevel)
khenaidooabad44c2018-08-03 16:58:35 -0400167 trnsId := uuid.New().String()
168 protoMsg := &voltha.Device{Id: trnsId}
169 args := make([]*kk.KVArg, 1)
170 args[0] = &kk.KVArg{
171 Key: "device",
172 Value: protoMsg,
173 }
174 rpc := "IncorrectAPI"
175 topic := kk.Topic{Name: "Core"}
176 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400177 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400178 elapsed := time.Since(start)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000179 logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400180 assert.Equal(t, status, false)
181 //Unpack the result into the actual proto object
khenaidoo79232702018-12-04 11:00:41 -0500182 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400183 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000184 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400185 }
186 assert.NotNil(t, unpackResult)
187}
188
189func TestIncorrectAPIParams(t *testing.T) {
190 trnsId := uuid.New().String()
191 protoMsg := &voltha.Device{Id: trnsId}
192 args := make([]*kk.KVArg, 1)
193 args[0] = &kk.KVArg{
194 Key: "device",
195 Value: protoMsg,
196 }
197 rpc := "GetDevice"
198 topic := kk.Topic{Name: "Core"}
199 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400200 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400201 elapsed := time.Since(start)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000202 logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400203 assert.Equal(t, status, false)
204 //Unpack the result into the actual proto object
khenaidoo79232702018-12-04 11:00:41 -0500205 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400206 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000207 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400208 }
209 assert.NotNil(t, unpackResult)
210}
211
212func TestGetDevice(t *testing.T) {
213 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400214 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400215 args := make([]*kk.KVArg, 1)
216 args[0] = &kk.KVArg{
217 Key: "deviceID",
218 Value: protoMsg,
219 }
220 rpc := "GetDevice"
221 topic := kk.Topic{Name: "Core"}
222 expectedResponse := &voltha.Device{Id: trnsId}
223 timeout := time.Duration(50) * time.Millisecond
224 ctx, cancel := context.WithTimeout(context.Background(), timeout)
225 defer cancel()
226 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400227 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400228 elapsed := time.Since(start)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000229 logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400230 assert.Equal(t, status, true)
231 unpackResult := &voltha.Device{}
232 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000233 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400234 }
235 assert.Equal(t, unpackResult, expectedResponse)
236}
237
238func TestGetDeviceTimeout(t *testing.T) {
239 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400240 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400241 args := make([]*kk.KVArg, 1)
242 args[0] = &kk.KVArg{
243 Key: "deviceID",
244 Value: protoMsg,
245 }
246 rpc := "GetDevice"
247 topic := kk.Topic{Name: "Core"}
248 timeout := time.Duration(2) * time.Millisecond
249 ctx, cancel := context.WithTimeout(context.Background(), timeout)
250 defer cancel()
251 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400252 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400253 elapsed := time.Since(start)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000254 logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400255 assert.Equal(t, status, false)
khenaidoo79232702018-12-04 11:00:41 -0500256 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400257 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000258 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400259 }
260 assert.NotNil(t, unpackResult)
261}
262
263func TestGetChildDevice(t *testing.T) {
264 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400265 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400266 args := make([]*kk.KVArg, 1)
267 args[0] = &kk.KVArg{
268 Key: "deviceID",
269 Value: protoMsg,
270 }
271 rpc := "GetChildDevice"
272 topic := kk.Topic{Name: "Core"}
273 expectedResponse := &voltha.Device{Id: trnsId}
274 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400275 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400276 elapsed := time.Since(start)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000277 logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400278 assert.Equal(t, status, true)
279 unpackResult := &voltha.Device{}
280 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000281 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400282 }
283 assert.Equal(t, unpackResult, expectedResponse)
284}
285
286func TestGetChildDevices(t *testing.T) {
287 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400288 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400289 args := make([]*kk.KVArg, 1)
290 args[0] = &kk.KVArg{
291 Key: "deviceID",
292 Value: protoMsg,
293 }
294 rpc := "GetChildDevices"
295 topic := kk.Topic{Name: "Core"}
296 expectedResponse := &voltha.Device{Id: trnsId}
297 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400298 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400299 elapsed := time.Since(start)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000300 logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400301 assert.Equal(t, status, true)
302 unpackResult := &voltha.Device{}
303 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000304 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400305 }
306 assert.Equal(t, unpackResult, expectedResponse)
307}
308
309func TestGetPorts(t *testing.T) {
310 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400311 protoArg1 := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400312 args := make([]*kk.KVArg, 2)
313 args[0] = &kk.KVArg{
314 Key: "deviceID",
315 Value: protoArg1,
316 }
khenaidoo79232702018-12-04 11:00:41 -0500317 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400318 args[1] = &kk.KVArg{
319 Key: "portType",
320 Value: protoArg2,
321 }
322 rpc := "GetPorts"
323 topic := kk.Topic{Name: "Core"}
324 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400325 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400326 elapsed := time.Since(start)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000327 logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400328 assert.Equal(t, status, true)
329 unpackResult := &voltha.Ports{}
330 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000331 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400332 }
333 expectedLen := len(unpackResult.Items) >= 1
334 assert.Equal(t, true, expectedLen)
335}
336
337func TestGetPortsMissingArgs(t *testing.T) {
338 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400339 protoArg1 := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400340 args := make([]*kk.KVArg, 1)
341 args[0] = &kk.KVArg{
342 Key: "deviceID",
343 Value: protoArg1,
344 }
345 rpc := "GetPorts"
346 topic := kk.Topic{Name: "Core"}
347 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400348 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400349 elapsed := time.Since(start)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000350 logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400351 assert.Equal(t, status, false)
352 //Unpack the result into the actual proto object
khenaidoo79232702018-12-04 11:00:41 -0500353 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400354 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000355 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400356 }
357 assert.NotNil(t, unpackResult)
358}
359
360func TestChildDeviceDetected(t *testing.T) {
361 trnsId := uuid.New().String()
khenaidoo79232702018-12-04 11:00:41 -0500362 protoArg1 := &ic.StrType{Val: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400363 args := make([]*kk.KVArg, 5)
364 args[0] = &kk.KVArg{
365 Key: "deviceID",
366 Value: protoArg1,
367 }
khenaidoo79232702018-12-04 11:00:41 -0500368 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400369 args[1] = &kk.KVArg{
370 Key: "parentPortNo",
371 Value: protoArg2,
372 }
khenaidoo79232702018-12-04 11:00:41 -0500373 protoArg3 := &ic.StrType{Val: "great_onu"}
khenaidooabad44c2018-08-03 16:58:35 -0400374 args[2] = &kk.KVArg{
375 Key: "childDeviceType",
376 Value: protoArg3,
377 }
378 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
379 args[3] = &kk.KVArg{
380 Key: "proxyAddress",
381 Value: protoArg4,
382 }
khenaidoo79232702018-12-04 11:00:41 -0500383 protoArg5 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400384 args[4] = &kk.KVArg{
385 Key: "portType",
386 Value: protoArg5,
387 }
388
389 rpc := "ChildDeviceDetected"
390 topic := kk.Topic{Name: "Core"}
391 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400392 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400393 elapsed := time.Since(start)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000394 logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400395 assert.Equal(t, status, true)
396 assert.Nil(t, result)
397}
398
399func TestChildDeviceDetectedNoWait(t *testing.T) {
400 trnsId := uuid.New().String()
khenaidoo79232702018-12-04 11:00:41 -0500401 protoArg1 := &ic.StrType{Val: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400402 args := make([]*kk.KVArg, 5)
403 args[0] = &kk.KVArg{
404 Key: "deviceID",
405 Value: protoArg1,
406 }
khenaidoo79232702018-12-04 11:00:41 -0500407 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400408 args[1] = &kk.KVArg{
409 Key: "parentPortNo",
410 Value: protoArg2,
411 }
khenaidoo79232702018-12-04 11:00:41 -0500412 protoArg3 := &ic.StrType{Val: "great_onu"}
khenaidooabad44c2018-08-03 16:58:35 -0400413 args[2] = &kk.KVArg{
414 Key: "childDeviceType",
415 Value: protoArg3,
416 }
417 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
418 args[3] = &kk.KVArg{
419 Key: "proxyAddress",
420 Value: protoArg4,
421 }
khenaidoo79232702018-12-04 11:00:41 -0500422 protoArg5 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400423 args[4] = &kk.KVArg{
424 Key: "portType",
425 Value: protoArg5,
426 }
427
428 rpc := "ChildDeviceDetected"
429 topic := kk.Topic{Name: "Core"}
430 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400431 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, false, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400432 elapsed := time.Since(start)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000433 logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400434 assert.Equal(t, status, true)
435 assert.Nil(t, result)
436}
437
438func TestChildDeviceDetectedMissingArgs(t *testing.T) {
439 trnsId := uuid.New().String()
khenaidoo79232702018-12-04 11:00:41 -0500440 protoArg1 := &ic.StrType{Val: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400441 args := make([]*kk.KVArg, 4)
442 args[0] = &kk.KVArg{
443 Key: "deviceID",
444 Value: protoArg1,
445 }
khenaidoo79232702018-12-04 11:00:41 -0500446 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400447 args[1] = &kk.KVArg{
448 Key: "parentPortNo",
449 Value: protoArg2,
450 }
khenaidoo79232702018-12-04 11:00:41 -0500451 protoArg3 := &ic.StrType{Val: "great_onu"}
khenaidooabad44c2018-08-03 16:58:35 -0400452 args[2] = &kk.KVArg{
453 Key: "childDeviceType",
454 Value: protoArg3,
455 }
khenaidooabad44c2018-08-03 16:58:35 -0400456
457 rpc := "ChildDeviceDetected"
458 topic := kk.Topic{Name: "Core"}
459 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400460 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400461 elapsed := time.Since(start)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000462 logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidooabad44c2018-08-03 16:58:35 -0400463 assert.Equal(t, status, false)
khenaidoo79232702018-12-04 11:00:41 -0500464 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400465 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000466 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
khenaidooabad44c2018-08-03 16:58:35 -0400467 }
468 assert.NotNil(t, unpackResult)
469}
470
khenaidoo92e62c52018-10-03 14:02:54 -0400471func TestDeviceStateChange(t *testing.T) {
472 log.SetAllLogLevel(log.DebugLevel)
473 trnsId := uuid.New().String()
474 protoArg1 := &voltha.ID{Id: trnsId}
475 args := make([]*kk.KVArg, 4)
476 args[0] = &kk.KVArg{
477 Key: "device_id",
478 Value: protoArg1,
479 }
khenaidoo79232702018-12-04 11:00:41 -0500480 protoArg2 := &ic.IntType{Val: 1}
khenaidoo92e62c52018-10-03 14:02:54 -0400481 args[1] = &kk.KVArg{
482 Key: "oper_status",
483 Value: protoArg2,
484 }
khenaidoo79232702018-12-04 11:00:41 -0500485 protoArg3 := &ic.IntType{Val: 1}
khenaidoo92e62c52018-10-03 14:02:54 -0400486 args[2] = &kk.KVArg{
487 Key: "connect_status",
488 Value: protoArg3,
489 }
490
491 rpc := "DeviceStateUpdate"
492 topic := kk.Topic{Name: "Core"}
493 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400494 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidoo92e62c52018-10-03 14:02:54 -0400495 elapsed := time.Since(start)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000496 logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
khenaidoo92e62c52018-10-03 14:02:54 -0400497 assert.Equal(t, status, true)
498 assert.Nil(t, result)
499}
500
khenaidoo79232702018-12-04 11:00:41 -0500501func subscribeToTopic(topic *kk.Topic, waitingChannel chan *ic.InterContainerMessage) error {
502 var ch <-chan *ic.InterContainerMessage
503 var err error
504 if ch, err = kafkaPartitionClient.Subscribe(topic); err != nil {
505 return nil
506 }
507 msg := <-ch
508
Rohan Agrawal31f21802020-06-12 05:38:46 +0000509 logger.Debugw(ctx, "msg-received", log.Fields{"msg": msg})
khenaidoo79232702018-12-04 11:00:41 -0500510 waitingChannel <- msg
511 return nil
512}
513
514func TestDeviceDiscovery(t *testing.T) {
515 // Create an intercontainer proxy - similar to the Core
npujar467fe752020-01-16 20:17:45 +0530516 testProxy := kk.NewInterContainerProxy(
khenaidoo79232702018-12-04 11:00:41 -0500517 kk.InterContainerHost(hostIP),
518 kk.InterContainerPort(9092),
519 kk.DefaultTopic(&kk.Topic{Name: "Test"}),
520 kk.MsgClient(kafkaClient),
David Bainbridge9ae13132020-06-22 17:28:01 -0700521 kk.DeviceDiscoveryTopic(&kk.Topic{Name: deviceDiscoveryTopic}))
khenaidoo79232702018-12-04 11:00:41 -0500522
523 // First start to wait for the message
524 waitingChannel := make(chan *ic.InterContainerMessage)
David Bainbridge9ae13132020-06-22 17:28:01 -0700525 go subscribeToTopic(&kk.Topic{Name: deviceDiscoveryTopic}, waitingChannel)
khenaidoo79232702018-12-04 11:00:41 -0500526
527 // Sleep to make sure the consumer is ready
528 time.Sleep(time.Millisecond * 100)
529
530 // Send the message
khenaidoo19374072018-12-11 11:05:15 -0500531 go testProxy.DeviceDiscovered("TestDeviceId", "TestDevicetype", "TestParentId", "myPODName")
khenaidoo79232702018-12-04 11:00:41 -0500532
533 msg := <-waitingChannel
534 totalTime := (time.Now().UnixNano() - msg.Header.Timestamp) / int64(time.Millisecond)
535 assert.Equal(t, msg.Header.Type, ic.MessageType_DEVICE_DISCOVERED)
536 // Unpack message
537 dd := &ic.DeviceDiscovered{}
538 err := ptypes.UnmarshalAny(msg.Body, dd)
539 assert.Nil(t, err)
540 assert.Equal(t, dd.Id, "TestDeviceId")
541 assert.Equal(t, dd.DeviceType, "TestDevicetype")
542 assert.Equal(t, dd.ParentId, "TestParentId")
khenaidoo19374072018-12-11 11:05:15 -0500543 assert.Equal(t, dd.Publisher, "myPODName")
Rohan Agrawal31f21802020-06-12 05:38:46 +0000544 logger.Debugw(ctx, "TotalTime", log.Fields{"time": totalTime})
khenaidoo79232702018-12-04 11:00:41 -0500545}
546
khenaidooabad44c2018-08-03 16:58:35 -0400547func TestStopKafkaProxy(t *testing.T) {
548 adapterKafkaProxy.Stop()
549 coreKafkaProxy.Stop()
550}
551
552//func TestMain(m *testing.T) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000553// logger.Info(ctx, "Main")
khenaidooabad44c2018-08-03 16:58:35 -0400554//}