blob: d6a8e8bd97136e763639e4008ca815fef1e02c88 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
khenaidood4d922e2018-08-03 22:35:16 -040021 "github.com/google/uuid"
khenaidooabad44c2018-08-03 16:58:35 -040022 "github.com/opencord/voltha-go/common/log"
23 kk "github.com/opencord/voltha-go/kafka"
William Kurkiandaa6bb22019-03-07 12:26:28 -050024 ic "github.com/opencord/voltha-protos/go/inter_container"
25 "github.com/opencord/voltha-protos/go/voltha"
khenaidooabad44c2018-08-03 16:58:35 -040026 rhp "github.com/opencord/voltha-go/rw_core/core"
27 "github.com/stretchr/testify/assert"
khenaidoo79232702018-12-04 11:00:41 -050028 "os"
khenaidooabad44c2018-08-03 16:58:35 -040029 "testing"
khenaidooabad44c2018-08-03 16:58:35 -040030 "time"
31)
32
khenaidoo2c6f1672018-09-20 23:14:41 -040033/*
34Prerequisite: Start the kafka/zookeeper containers.
35*/
36
khenaidoo43c82122018-11-22 18:38:28 -050037var coreKafkaProxy *kk.InterContainerProxy
38var adapterKafkaProxy *kk.InterContainerProxy
khenaidoo79232702018-12-04 11:00:41 -050039var kafkaPartitionClient kk.Client
40var affinityRouterTopic string
41var hostIP string
42var kafkaClient kk.Client
khenaidooabad44c2018-08-03 16:58:35 -040043
44func init() {
khenaidoo2c6f1672018-09-20 23:14:41 -040045 log.AddPackage(log.JSON, log.ErrorLevel, nil)
46 log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
47 log.SetAllLogLevel(log.ErrorLevel)
khenaidoo79232702018-12-04 11:00:41 -050048 affinityRouterTopic = "AffinityRouter"
49 hostIP = os.Getenv("DOCKER_HOST_IP")
50 kafkaClient = kk.NewSaramaClient(
51 kk.Host(hostIP),
khenaidoo43c82122018-11-22 18:38:28 -050052 kk.Port(9092))
khenaidoo2c6f1672018-09-20 23:14:41 -040053
khenaidoo43c82122018-11-22 18:38:28 -050054 coreKafkaProxy, _ = kk.NewInterContainerProxy(
khenaidoo79232702018-12-04 11:00:41 -050055 kk.InterContainerHost(hostIP),
khenaidoo43c82122018-11-22 18:38:28 -050056 kk.InterContainerPort(9092),
57 kk.DefaultTopic(&kk.Topic{Name: "Core"}),
khenaidoo79232702018-12-04 11:00:41 -050058 kk.MsgClient(kafkaClient),
59 kk.DeviceDiscoveryTopic(&kk.Topic{Name: affinityRouterTopic}))
khenaidooabad44c2018-08-03 16:58:35 -040060
khenaidoo43c82122018-11-22 18:38:28 -050061 adapterKafkaProxy, _ = kk.NewInterContainerProxy(
khenaidoo79232702018-12-04 11:00:41 -050062 kk.InterContainerHost(hostIP),
khenaidoo43c82122018-11-22 18:38:28 -050063 kk.InterContainerPort(9092),
64 kk.DefaultTopic(&kk.Topic{Name: "Adapter"}),
65 kk.MsgClient(kafkaClient))
khenaidooabad44c2018-08-03 16:58:35 -040066
khenaidoo79232702018-12-04 11:00:41 -050067 kafkaPartitionClient = kk.NewSaramaClient(
68 kk.ConsumerType(kk.PartitionConsumer),
69 kk.Host(hostIP),
70 kk.Port(9092),
71 kk.AutoCreateTopic(true),
72 kk.ProducerFlushFrequency(5))
73 kafkaPartitionClient.Start()
74
khenaidooabad44c2018-08-03 16:58:35 -040075 coreKafkaProxy.Start()
76 adapterKafkaProxy.Start()
77 subscribeTarget(coreKafkaProxy)
78}
79
khenaidoo43c82122018-11-22 18:38:28 -050080func subscribeTarget(kmp *kk.InterContainerProxy) {
khenaidooabad44c2018-08-03 16:58:35 -040081 topic := kk.Topic{Name: "Core"}
khenaidoo2c6f1672018-09-20 23:14:41 -040082 requestProxy := &rhp.AdapterRequestHandlerProxy{TestMode: true}
khenaidoo43c82122018-11-22 18:38:28 -050083 kmp.SubscribeWithRequestHandlerInterface(topic, requestProxy)
khenaidooabad44c2018-08-03 16:58:35 -040084}
85
khenaidoo79232702018-12-04 11:00:41 -050086func waitForRPCMessage(topic kk.Topic, ch <-chan *ic.InterContainerMessage, doneCh chan string) {
khenaidooabad44c2018-08-03 16:58:35 -040087 for msg := range ch {
88 log.Debugw("Got-RPC-message", log.Fields{"msg": msg})
89 // Unpack message
khenaidoo79232702018-12-04 11:00:41 -050090 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -040091 if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
92 doneCh <- "Error"
93 } else {
94 doneCh <- requestBody.Rpc
95 }
96 break
97 }
98}
99
khenaidoo43c82122018-11-22 18:38:28 -0500100//func TestSubscribeUnsubscribe(t *testing.T) {
101// // First subscribe to the specific topic
102// topic := kk.Topic{Name: "Core"}
103// ch, err := coreKafkaProxy.Subs(topic)
104// assert.NotNil(t, ch)
105// assert.Nil(t, err)
106// // Create a channel to receive a response
107// waitCh := make(chan string)
108// // Wait for a message
109// go waitForRPCMessage(topic, ch, waitCh)
110// // Send the message - don't care of the response
111// rpc := "AnyRPCRequestForTest"
112// adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
113// // Wait for the result on our own channel
114// result := <-waitCh
115// assert.Equal(t, result, rpc)
116// close(waitCh)
117// err = coreKafkaProxy.UnSubscribe(topic, ch)
118// assert.Nil(t, err)
119//}
120//
121//func TestMultipleSubscribeUnsubscribe(t *testing.T) {
122// // First subscribe to the specific topic
123// //log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
124// var err error
khenaidoo79232702018-12-04 11:00:41 -0500125// var ch1 <-chan *ic.InterContainerMessage
126// var ch2 <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500127// topic := kk.Topic{Name: "Core"}
128// ch1, err = coreKafkaProxy.Subscribe(topic)
129// assert.NotNil(t, ch1)
130// assert.Nil(t, err)
131// // Create a channel to receive responses
132// waitCh := make(chan string)
133// ch2, err = coreKafkaProxy.Subscribe(topic)
134// assert.NotNil(t, ch2)
135// assert.Nil(t, err)
136// // Wait for a message
137// go waitForRPCMessage(topic, ch2, waitCh)
138// go waitForRPCMessage(topic, ch1, waitCh)
139//
140// // Send the message - don't care of the response
141// rpc := "AnyRPCRequestForTest"
142// adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
143// // Wait for the result on our own channel
144//
145// responses := 0
146// for msg := range waitCh {
147// assert.Equal(t, msg, rpc)
148// responses = responses + 1
149// if responses > 1 {
150// break
151// }
152// }
153// assert.Equal(t, responses, 2)
154// close(waitCh)
155// err = coreKafkaProxy.UnSubscribe(topic, ch1)
156// assert.Nil(t, err)
157// err = coreKafkaProxy.UnSubscribe(topic, ch2)
158// assert.Nil(t, err)
159//}
khenaidooabad44c2018-08-03 16:58:35 -0400160
161func TestIncorrectAPI(t *testing.T) {
khenaidoo2c6f1672018-09-20 23:14:41 -0400162 log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.ErrorLevel)
khenaidooabad44c2018-08-03 16:58:35 -0400163 trnsId := uuid.New().String()
164 protoMsg := &voltha.Device{Id: trnsId}
165 args := make([]*kk.KVArg, 1)
166 args[0] = &kk.KVArg{
167 Key: "device",
168 Value: protoMsg,
169 }
170 rpc := "IncorrectAPI"
171 topic := kk.Topic{Name: "Core"}
172 start := time.Now()
khenaidoo43c82122018-11-22 18:38:28 -0500173 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400174 elapsed := time.Since(start)
175 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
176 assert.Equal(t, status, false)
177 //Unpack the result into the actual proto object
khenaidoo79232702018-12-04 11:00:41 -0500178 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400179 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
180 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
181 }
182 assert.NotNil(t, unpackResult)
183}
184
185func TestIncorrectAPIParams(t *testing.T) {
186 trnsId := uuid.New().String()
187 protoMsg := &voltha.Device{Id: trnsId}
188 args := make([]*kk.KVArg, 1)
189 args[0] = &kk.KVArg{
190 Key: "device",
191 Value: protoMsg,
192 }
193 rpc := "GetDevice"
194 topic := kk.Topic{Name: "Core"}
195 start := time.Now()
khenaidoo79232702018-12-04 11:00:41 -0500196 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400197 elapsed := time.Since(start)
198 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
199 assert.Equal(t, status, false)
200 //Unpack the result into the actual proto object
khenaidoo79232702018-12-04 11:00:41 -0500201 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400202 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
203 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
204 }
205 assert.NotNil(t, unpackResult)
206}
207
208func TestGetDevice(t *testing.T) {
209 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400210 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400211 args := make([]*kk.KVArg, 1)
212 args[0] = &kk.KVArg{
213 Key: "deviceID",
214 Value: protoMsg,
215 }
216 rpc := "GetDevice"
217 topic := kk.Topic{Name: "Core"}
218 expectedResponse := &voltha.Device{Id: trnsId}
219 timeout := time.Duration(50) * time.Millisecond
220 ctx, cancel := context.WithTimeout(context.Background(), timeout)
221 defer cancel()
222 start := time.Now()
khenaidoo79232702018-12-04 11:00:41 -0500223 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400224 elapsed := time.Since(start)
225 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
226 assert.Equal(t, status, true)
227 unpackResult := &voltha.Device{}
228 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
229 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
230 }
231 assert.Equal(t, unpackResult, expectedResponse)
232}
233
234func TestGetDeviceTimeout(t *testing.T) {
235 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400236 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400237 args := make([]*kk.KVArg, 1)
238 args[0] = &kk.KVArg{
239 Key: "deviceID",
240 Value: protoMsg,
241 }
242 rpc := "GetDevice"
243 topic := kk.Topic{Name: "Core"}
244 timeout := time.Duration(2) * time.Millisecond
245 ctx, cancel := context.WithTimeout(context.Background(), timeout)
246 defer cancel()
247 start := time.Now()
khenaidoo79232702018-12-04 11:00:41 -0500248 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400249 elapsed := time.Since(start)
250 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
251 assert.Equal(t, status, false)
khenaidoo79232702018-12-04 11:00:41 -0500252 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400253 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
254 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
255 }
256 assert.NotNil(t, unpackResult)
257}
258
259func TestGetChildDevice(t *testing.T) {
260 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400261 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400262 args := make([]*kk.KVArg, 1)
263 args[0] = &kk.KVArg{
264 Key: "deviceID",
265 Value: protoMsg,
266 }
267 rpc := "GetChildDevice"
268 topic := kk.Topic{Name: "Core"}
269 expectedResponse := &voltha.Device{Id: trnsId}
270 start := time.Now()
khenaidoo79232702018-12-04 11:00:41 -0500271 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400272 elapsed := time.Since(start)
273 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
274 assert.Equal(t, status, true)
275 unpackResult := &voltha.Device{}
276 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
277 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
278 }
279 assert.Equal(t, unpackResult, expectedResponse)
280}
281
282func TestGetChildDevices(t *testing.T) {
283 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400284 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400285 args := make([]*kk.KVArg, 1)
286 args[0] = &kk.KVArg{
287 Key: "deviceID",
288 Value: protoMsg,
289 }
290 rpc := "GetChildDevices"
291 topic := kk.Topic{Name: "Core"}
292 expectedResponse := &voltha.Device{Id: trnsId}
293 start := time.Now()
khenaidoo79232702018-12-04 11:00:41 -0500294 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400295 elapsed := time.Since(start)
296 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
297 assert.Equal(t, status, true)
298 unpackResult := &voltha.Device{}
299 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
300 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
301 }
302 assert.Equal(t, unpackResult, expectedResponse)
303}
304
305func TestGetPorts(t *testing.T) {
306 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400307 protoArg1 := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400308 args := make([]*kk.KVArg, 2)
309 args[0] = &kk.KVArg{
310 Key: "deviceID",
311 Value: protoArg1,
312 }
khenaidoo79232702018-12-04 11:00:41 -0500313 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400314 args[1] = &kk.KVArg{
315 Key: "portType",
316 Value: protoArg2,
317 }
318 rpc := "GetPorts"
319 topic := kk.Topic{Name: "Core"}
320 start := time.Now()
khenaidoo79232702018-12-04 11:00:41 -0500321 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400322 elapsed := time.Since(start)
323 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
324 assert.Equal(t, status, true)
325 unpackResult := &voltha.Ports{}
326 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
327 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
328 }
329 expectedLen := len(unpackResult.Items) >= 1
330 assert.Equal(t, true, expectedLen)
331}
332
333func TestGetPortsMissingArgs(t *testing.T) {
334 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400335 protoArg1 := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400336 args := make([]*kk.KVArg, 1)
337 args[0] = &kk.KVArg{
338 Key: "deviceID",
339 Value: protoArg1,
340 }
341 rpc := "GetPorts"
342 topic := kk.Topic{Name: "Core"}
343 start := time.Now()
khenaidoo79232702018-12-04 11:00:41 -0500344 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400345 elapsed := time.Since(start)
346 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
347 assert.Equal(t, status, false)
348 //Unpack the result into the actual proto object
khenaidoo79232702018-12-04 11:00:41 -0500349 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400350 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
351 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
352 }
353 assert.NotNil(t, unpackResult)
354}
355
356func TestChildDeviceDetected(t *testing.T) {
357 trnsId := uuid.New().String()
khenaidoo79232702018-12-04 11:00:41 -0500358 protoArg1 := &ic.StrType{Val: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400359 args := make([]*kk.KVArg, 5)
360 args[0] = &kk.KVArg{
361 Key: "deviceID",
362 Value: protoArg1,
363 }
khenaidoo79232702018-12-04 11:00:41 -0500364 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400365 args[1] = &kk.KVArg{
366 Key: "parentPortNo",
367 Value: protoArg2,
368 }
khenaidoo79232702018-12-04 11:00:41 -0500369 protoArg3 := &ic.StrType{Val: "great_onu"}
khenaidooabad44c2018-08-03 16:58:35 -0400370 args[2] = &kk.KVArg{
371 Key: "childDeviceType",
372 Value: protoArg3,
373 }
374 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
375 args[3] = &kk.KVArg{
376 Key: "proxyAddress",
377 Value: protoArg4,
378 }
khenaidoo79232702018-12-04 11:00:41 -0500379 protoArg5 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400380 args[4] = &kk.KVArg{
381 Key: "portType",
382 Value: protoArg5,
383 }
384
385 rpc := "ChildDeviceDetected"
386 topic := kk.Topic{Name: "Core"}
387 start := time.Now()
khenaidoo79232702018-12-04 11:00:41 -0500388 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400389 elapsed := time.Since(start)
390 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
391 assert.Equal(t, status, true)
392 assert.Nil(t, result)
393}
394
395func TestChildDeviceDetectedNoWait(t *testing.T) {
396 trnsId := uuid.New().String()
khenaidoo79232702018-12-04 11:00:41 -0500397 protoArg1 := &ic.StrType{Val: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400398 args := make([]*kk.KVArg, 5)
399 args[0] = &kk.KVArg{
400 Key: "deviceID",
401 Value: protoArg1,
402 }
khenaidoo79232702018-12-04 11:00:41 -0500403 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400404 args[1] = &kk.KVArg{
405 Key: "parentPortNo",
406 Value: protoArg2,
407 }
khenaidoo79232702018-12-04 11:00:41 -0500408 protoArg3 := &ic.StrType{Val: "great_onu"}
khenaidooabad44c2018-08-03 16:58:35 -0400409 args[2] = &kk.KVArg{
410 Key: "childDeviceType",
411 Value: protoArg3,
412 }
413 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
414 args[3] = &kk.KVArg{
415 Key: "proxyAddress",
416 Value: protoArg4,
417 }
khenaidoo79232702018-12-04 11:00:41 -0500418 protoArg5 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400419 args[4] = &kk.KVArg{
420 Key: "portType",
421 Value: protoArg5,
422 }
423
424 rpc := "ChildDeviceDetected"
425 topic := kk.Topic{Name: "Core"}
426 start := time.Now()
khenaidoo79232702018-12-04 11:00:41 -0500427 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, false, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400428 elapsed := time.Since(start)
429 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
430 assert.Equal(t, status, true)
431 assert.Nil(t, result)
432}
433
434func TestChildDeviceDetectedMissingArgs(t *testing.T) {
435 trnsId := uuid.New().String()
khenaidoo79232702018-12-04 11:00:41 -0500436 protoArg1 := &ic.StrType{Val: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400437 args := make([]*kk.KVArg, 4)
438 args[0] = &kk.KVArg{
439 Key: "deviceID",
440 Value: protoArg1,
441 }
khenaidoo79232702018-12-04 11:00:41 -0500442 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400443 args[1] = &kk.KVArg{
444 Key: "parentPortNo",
445 Value: protoArg2,
446 }
khenaidoo79232702018-12-04 11:00:41 -0500447 protoArg3 := &ic.StrType{Val: "great_onu"}
khenaidooabad44c2018-08-03 16:58:35 -0400448 args[2] = &kk.KVArg{
449 Key: "childDeviceType",
450 Value: protoArg3,
451 }
khenaidooabad44c2018-08-03 16:58:35 -0400452
453 rpc := "ChildDeviceDetected"
454 topic := kk.Topic{Name: "Core"}
455 start := time.Now()
khenaidoo79232702018-12-04 11:00:41 -0500456 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400457 elapsed := time.Since(start)
458 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
459 assert.Equal(t, status, false)
khenaidoo79232702018-12-04 11:00:41 -0500460 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400461 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
462 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
463 }
464 assert.NotNil(t, unpackResult)
465}
466
khenaidoo92e62c52018-10-03 14:02:54 -0400467func TestDeviceStateChange(t *testing.T) {
468 log.SetAllLogLevel(log.DebugLevel)
469 trnsId := uuid.New().String()
470 protoArg1 := &voltha.ID{Id: trnsId}
471 args := make([]*kk.KVArg, 4)
472 args[0] = &kk.KVArg{
473 Key: "device_id",
474 Value: protoArg1,
475 }
khenaidoo79232702018-12-04 11:00:41 -0500476 protoArg2 := &ic.IntType{Val: 1}
khenaidoo92e62c52018-10-03 14:02:54 -0400477 args[1] = &kk.KVArg{
478 Key: "oper_status",
479 Value: protoArg2,
480 }
khenaidoo79232702018-12-04 11:00:41 -0500481 protoArg3 := &ic.IntType{Val: 1}
khenaidoo92e62c52018-10-03 14:02:54 -0400482 args[2] = &kk.KVArg{
483 Key: "connect_status",
484 Value: protoArg3,
485 }
486
487 rpc := "DeviceStateUpdate"
488 topic := kk.Topic{Name: "Core"}
489 start := time.Now()
khenaidoo79232702018-12-04 11:00:41 -0500490 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
khenaidoo92e62c52018-10-03 14:02:54 -0400491 elapsed := time.Since(start)
492 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
493 assert.Equal(t, status, true)
494 assert.Nil(t, result)
495}
496
khenaidoo79232702018-12-04 11:00:41 -0500497func subscribeToTopic(topic *kk.Topic, waitingChannel chan *ic.InterContainerMessage) error {
498 var ch <-chan *ic.InterContainerMessage
499 var err error
500 if ch, err = kafkaPartitionClient.Subscribe(topic); err != nil {
501 return nil
502 }
503 msg := <-ch
504
505 log.Debugw("msg-received", log.Fields{"msg": msg})
506 waitingChannel <- msg
507 return nil
508}
509
510func TestDeviceDiscovery(t *testing.T) {
511 // Create an intercontainer proxy - similar to the Core
512 testProxy, _ := kk.NewInterContainerProxy(
513 kk.InterContainerHost(hostIP),
514 kk.InterContainerPort(9092),
515 kk.DefaultTopic(&kk.Topic{Name: "Test"}),
516 kk.MsgClient(kafkaClient),
517 kk.DeviceDiscoveryTopic(&kk.Topic{Name: affinityRouterTopic}))
518
519 // First start to wait for the message
520 waitingChannel := make(chan *ic.InterContainerMessage)
521 go subscribeToTopic(&kk.Topic{Name: affinityRouterTopic}, waitingChannel)
522
523 // Sleep to make sure the consumer is ready
524 time.Sleep(time.Millisecond * 100)
525
526 // Send the message
khenaidoo19374072018-12-11 11:05:15 -0500527 go testProxy.DeviceDiscovered("TestDeviceId", "TestDevicetype", "TestParentId", "myPODName")
khenaidoo79232702018-12-04 11:00:41 -0500528
529 msg := <-waitingChannel
530 totalTime := (time.Now().UnixNano() - msg.Header.Timestamp) / int64(time.Millisecond)
531 assert.Equal(t, msg.Header.Type, ic.MessageType_DEVICE_DISCOVERED)
532 // Unpack message
533 dd := &ic.DeviceDiscovered{}
534 err := ptypes.UnmarshalAny(msg.Body, dd)
535 assert.Nil(t, err)
536 assert.Equal(t, dd.Id, "TestDeviceId")
537 assert.Equal(t, dd.DeviceType, "TestDevicetype")
538 assert.Equal(t, dd.ParentId, "TestParentId")
khenaidoo19374072018-12-11 11:05:15 -0500539 assert.Equal(t, dd.Publisher, "myPODName")
khenaidoo79232702018-12-04 11:00:41 -0500540 log.Debugw("TotalTime", log.Fields{"time": totalTime})
541}
542
khenaidooabad44c2018-08-03 16:58:35 -0400543func TestStopKafkaProxy(t *testing.T) {
544 adapterKafkaProxy.Stop()
545 coreKafkaProxy.Stop()
546}
547
548//func TestMain(m *testing.T) {
549// log.Info("Main")
550//}