blob: 0293d6d20a0ad6d9b82cc7bba133bd00e90e205d [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
khenaidood4d922e2018-08-03 22:35:16 -040021 "github.com/google/uuid"
khenaidooabad44c2018-08-03 16:58:35 -040022 "github.com/opencord/voltha-go/common/log"
23 kk "github.com/opencord/voltha-go/kafka"
24 ca "github.com/opencord/voltha-go/protos/core_adapter"
25 "github.com/opencord/voltha-go/protos/voltha"
26 rhp "github.com/opencord/voltha-go/rw_core/core"
27 "github.com/stretchr/testify/assert"
28 "testing"
khenaidooabad44c2018-08-03 16:58:35 -040029 "time"
30)
31
/*
Prerequisite: Start the kafka/zookeeper containers.
*/
35
khenaidoo43c82122018-11-22 18:38:28 -050036var coreKafkaProxy *kk.InterContainerProxy
37var adapterKafkaProxy *kk.InterContainerProxy
khenaidooabad44c2018-08-03 16:58:35 -040038
39func init() {
khenaidoo2c6f1672018-09-20 23:14:41 -040040 log.AddPackage(log.JSON, log.ErrorLevel, nil)
41 log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
42 log.SetAllLogLevel(log.ErrorLevel)
khenaidoo43c82122018-11-22 18:38:28 -050043 kafkaClient := kk.NewSaramaClient(
44 kk.Host("10.176.212.108"),
45 kk.Port(9092))
khenaidoo2c6f1672018-09-20 23:14:41 -040046
khenaidoo43c82122018-11-22 18:38:28 -050047 coreKafkaProxy, _ = kk.NewInterContainerProxy(
48 kk.InterContainerHost("10.176.212.108"),
49 kk.InterContainerPort(9092),
50 kk.DefaultTopic(&kk.Topic{Name: "Core"}),
51 kk.MsgClient(kafkaClient))
khenaidooabad44c2018-08-03 16:58:35 -040052
khenaidoo43c82122018-11-22 18:38:28 -050053 adapterKafkaProxy, _ = kk.NewInterContainerProxy(
54 kk.InterContainerHost("10.176.212.108"),
55 kk.InterContainerPort(9092),
56 kk.DefaultTopic(&kk.Topic{Name: "Adapter"}),
57 kk.MsgClient(kafkaClient))
khenaidooabad44c2018-08-03 16:58:35 -040058
59 coreKafkaProxy.Start()
60 adapterKafkaProxy.Start()
61 subscribeTarget(coreKafkaProxy)
62}
63
khenaidoo43c82122018-11-22 18:38:28 -050064func subscribeTarget(kmp *kk.InterContainerProxy) {
khenaidooabad44c2018-08-03 16:58:35 -040065 topic := kk.Topic{Name: "Core"}
khenaidoo2c6f1672018-09-20 23:14:41 -040066 requestProxy := &rhp.AdapterRequestHandlerProxy{TestMode: true}
khenaidoo43c82122018-11-22 18:38:28 -050067 kmp.SubscribeWithRequestHandlerInterface(topic, requestProxy)
khenaidooabad44c2018-08-03 16:58:35 -040068}
69
70func waitForRPCMessage(topic kk.Topic, ch <-chan *ca.InterContainerMessage, doneCh chan string) {
71 for msg := range ch {
72 log.Debugw("Got-RPC-message", log.Fields{"msg": msg})
73 // Unpack message
74 requestBody := &ca.InterContainerRequestBody{}
75 if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
76 doneCh <- "Error"
77 } else {
78 doneCh <- requestBody.Rpc
79 }
80 break
81 }
82}
83
khenaidoo43c82122018-11-22 18:38:28 -050084//func TestSubscribeUnsubscribe(t *testing.T) {
85// // First subscribe to the specific topic
86// topic := kk.Topic{Name: "Core"}
87// ch, err := coreKafkaProxy.Subs(topic)
88// assert.NotNil(t, ch)
89// assert.Nil(t, err)
90// // Create a channel to receive a response
91// waitCh := make(chan string)
92// // Wait for a message
93// go waitForRPCMessage(topic, ch, waitCh)
94// // Send the message - don't care of the response
95// rpc := "AnyRPCRequestForTest"
96// adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
// // Wait for the result on our own channel
98// result := <-waitCh
99// assert.Equal(t, result, rpc)
100// close(waitCh)
101// err = coreKafkaProxy.UnSubscribe(topic, ch)
102// assert.Nil(t, err)
103//}
104//
105//func TestMultipleSubscribeUnsubscribe(t *testing.T) {
106// // First subscribe to the specific topic
107// //log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
108// var err error
109// var ch1 <-chan *ca.InterContainerMessage
110// var ch2 <-chan *ca.InterContainerMessage
111// topic := kk.Topic{Name: "Core"}
112// ch1, err = coreKafkaProxy.Subscribe(topic)
113// assert.NotNil(t, ch1)
114// assert.Nil(t, err)
115// // Create a channel to receive responses
116// waitCh := make(chan string)
117// ch2, err = coreKafkaProxy.Subscribe(topic)
118// assert.NotNil(t, ch2)
119// assert.Nil(t, err)
120// // Wait for a message
121// go waitForRPCMessage(topic, ch2, waitCh)
122// go waitForRPCMessage(topic, ch1, waitCh)
123//
124// // Send the message - don't care of the response
125// rpc := "AnyRPCRequestForTest"
126// adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
// // Wait for the result on our own channel
128//
129// responses := 0
130// for msg := range waitCh {
131// assert.Equal(t, msg, rpc)
132// responses = responses + 1
133// if responses > 1 {
134// break
135// }
136// }
137// assert.Equal(t, responses, 2)
138// close(waitCh)
139// err = coreKafkaProxy.UnSubscribe(topic, ch1)
140// assert.Nil(t, err)
141// err = coreKafkaProxy.UnSubscribe(topic, ch2)
142// assert.Nil(t, err)
143//}
khenaidooabad44c2018-08-03 16:58:35 -0400144
145func TestIncorrectAPI(t *testing.T) {
khenaidoo2c6f1672018-09-20 23:14:41 -0400146 log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.ErrorLevel)
khenaidooabad44c2018-08-03 16:58:35 -0400147 trnsId := uuid.New().String()
148 protoMsg := &voltha.Device{Id: trnsId}
149 args := make([]*kk.KVArg, 1)
150 args[0] = &kk.KVArg{
151 Key: "device",
152 Value: protoMsg,
153 }
154 rpc := "IncorrectAPI"
155 topic := kk.Topic{Name: "Core"}
156 start := time.Now()
khenaidoo43c82122018-11-22 18:38:28 -0500157 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400158 elapsed := time.Since(start)
159 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
160 assert.Equal(t, status, false)
161 //Unpack the result into the actual proto object
162 unpackResult := &ca.Error{}
163 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
164 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
165 }
166 assert.NotNil(t, unpackResult)
167}
168
169func TestIncorrectAPIParams(t *testing.T) {
170 trnsId := uuid.New().String()
171 protoMsg := &voltha.Device{Id: trnsId}
172 args := make([]*kk.KVArg, 1)
173 args[0] = &kk.KVArg{
174 Key: "device",
175 Value: protoMsg,
176 }
177 rpc := "GetDevice"
178 topic := kk.Topic{Name: "Core"}
179 start := time.Now()
khenaidoo43c82122018-11-22 18:38:28 -0500180 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400181 elapsed := time.Since(start)
182 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
183 assert.Equal(t, status, false)
184 //Unpack the result into the actual proto object
185 unpackResult := &ca.Error{}
186 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
187 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
188 }
189 assert.NotNil(t, unpackResult)
190}
191
192func TestGetDevice(t *testing.T) {
193 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400194 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400195 args := make([]*kk.KVArg, 1)
196 args[0] = &kk.KVArg{
197 Key: "deviceID",
198 Value: protoMsg,
199 }
200 rpc := "GetDevice"
201 topic := kk.Topic{Name: "Core"}
202 expectedResponse := &voltha.Device{Id: trnsId}
203 timeout := time.Duration(50) * time.Millisecond
204 ctx, cancel := context.WithTimeout(context.Background(), timeout)
205 defer cancel()
206 start := time.Now()
khenaidoo43c82122018-11-22 18:38:28 -0500207 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic,true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400208 elapsed := time.Since(start)
209 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
210 assert.Equal(t, status, true)
211 unpackResult := &voltha.Device{}
212 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
213 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
214 }
215 assert.Equal(t, unpackResult, expectedResponse)
216}
217
218func TestGetDeviceTimeout(t *testing.T) {
219 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400220 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400221 args := make([]*kk.KVArg, 1)
222 args[0] = &kk.KVArg{
223 Key: "deviceID",
224 Value: protoMsg,
225 }
226 rpc := "GetDevice"
227 topic := kk.Topic{Name: "Core"}
228 timeout := time.Duration(2) * time.Millisecond
229 ctx, cancel := context.WithTimeout(context.Background(), timeout)
230 defer cancel()
231 start := time.Now()
khenaidoo43c82122018-11-22 18:38:28 -0500232 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic,true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400233 elapsed := time.Since(start)
234 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
235 assert.Equal(t, status, false)
236 unpackResult := &ca.Error{}
237 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
238 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
239 }
240 assert.NotNil(t, unpackResult)
241}
242
243func TestGetChildDevice(t *testing.T) {
244 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400245 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400246 args := make([]*kk.KVArg, 1)
247 args[0] = &kk.KVArg{
248 Key: "deviceID",
249 Value: protoMsg,
250 }
251 rpc := "GetChildDevice"
252 topic := kk.Topic{Name: "Core"}
253 expectedResponse := &voltha.Device{Id: trnsId}
254 start := time.Now()
khenaidoo43c82122018-11-22 18:38:28 -0500255 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400256 elapsed := time.Since(start)
257 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
258 assert.Equal(t, status, true)
259 unpackResult := &voltha.Device{}
260 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
261 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
262 }
263 assert.Equal(t, unpackResult, expectedResponse)
264}
265
266func TestGetChildDevices(t *testing.T) {
267 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400268 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400269 args := make([]*kk.KVArg, 1)
270 args[0] = &kk.KVArg{
271 Key: "deviceID",
272 Value: protoMsg,
273 }
274 rpc := "GetChildDevices"
275 topic := kk.Topic{Name: "Core"}
276 expectedResponse := &voltha.Device{Id: trnsId}
277 start := time.Now()
khenaidoo43c82122018-11-22 18:38:28 -0500278 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400279 elapsed := time.Since(start)
280 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
281 assert.Equal(t, status, true)
282 unpackResult := &voltha.Device{}
283 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
284 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
285 }
286 assert.Equal(t, unpackResult, expectedResponse)
287}
288
289func TestGetPorts(t *testing.T) {
290 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400291 protoArg1 := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400292 args := make([]*kk.KVArg, 2)
293 args[0] = &kk.KVArg{
294 Key: "deviceID",
295 Value: protoArg1,
296 }
297 protoArg2 := &ca.IntType{Val: 1}
298 args[1] = &kk.KVArg{
299 Key: "portType",
300 Value: protoArg2,
301 }
302 rpc := "GetPorts"
303 topic := kk.Topic{Name: "Core"}
304 start := time.Now()
khenaidoo43c82122018-11-22 18:38:28 -0500305 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400306 elapsed := time.Since(start)
307 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
308 assert.Equal(t, status, true)
309 unpackResult := &voltha.Ports{}
310 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
311 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
312 }
313 expectedLen := len(unpackResult.Items) >= 1
314 assert.Equal(t, true, expectedLen)
315}
316
317func TestGetPortsMissingArgs(t *testing.T) {
318 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400319 protoArg1 := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400320 args := make([]*kk.KVArg, 1)
321 args[0] = &kk.KVArg{
322 Key: "deviceID",
323 Value: protoArg1,
324 }
325 rpc := "GetPorts"
326 topic := kk.Topic{Name: "Core"}
327 start := time.Now()
khenaidoo43c82122018-11-22 18:38:28 -0500328 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400329 elapsed := time.Since(start)
330 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
331 assert.Equal(t, status, false)
332 //Unpack the result into the actual proto object
333 unpackResult := &ca.Error{}
334 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
335 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
336 }
337 assert.NotNil(t, unpackResult)
338}
339
340func TestChildDeviceDetected(t *testing.T) {
341 trnsId := uuid.New().String()
342 protoArg1 := &ca.StrType{Val: trnsId}
343 args := make([]*kk.KVArg, 5)
344 args[0] = &kk.KVArg{
345 Key: "deviceID",
346 Value: protoArg1,
347 }
348 protoArg2 := &ca.IntType{Val: 1}
349 args[1] = &kk.KVArg{
350 Key: "parentPortNo",
351 Value: protoArg2,
352 }
353 protoArg3 := &ca.StrType{Val: "great_onu"}
354 args[2] = &kk.KVArg{
355 Key: "childDeviceType",
356 Value: protoArg3,
357 }
358 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
359 args[3] = &kk.KVArg{
360 Key: "proxyAddress",
361 Value: protoArg4,
362 }
363 protoArg5 := &ca.IntType{Val: 1}
364 args[4] = &kk.KVArg{
365 Key: "portType",
366 Value: protoArg5,
367 }
368
369 rpc := "ChildDeviceDetected"
370 topic := kk.Topic{Name: "Core"}
371 start := time.Now()
khenaidoo43c82122018-11-22 18:38:28 -0500372 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400373 elapsed := time.Since(start)
374 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
375 assert.Equal(t, status, true)
376 assert.Nil(t, result)
377}
378
379func TestChildDeviceDetectedNoWait(t *testing.T) {
380 trnsId := uuid.New().String()
381 protoArg1 := &ca.StrType{Val: trnsId}
382 args := make([]*kk.KVArg, 5)
383 args[0] = &kk.KVArg{
384 Key: "deviceID",
385 Value: protoArg1,
386 }
387 protoArg2 := &ca.IntType{Val: 1}
388 args[1] = &kk.KVArg{
389 Key: "parentPortNo",
390 Value: protoArg2,
391 }
392 protoArg3 := &ca.StrType{Val: "great_onu"}
393 args[2] = &kk.KVArg{
394 Key: "childDeviceType",
395 Value: protoArg3,
396 }
397 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
398 args[3] = &kk.KVArg{
399 Key: "proxyAddress",
400 Value: protoArg4,
401 }
402 protoArg5 := &ca.IntType{Val: 1}
403 args[4] = &kk.KVArg{
404 Key: "portType",
405 Value: protoArg5,
406 }
407
408 rpc := "ChildDeviceDetected"
409 topic := kk.Topic{Name: "Core"}
410 start := time.Now()
khenaidoo43c82122018-11-22 18:38:28 -0500411 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,false, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400412 elapsed := time.Since(start)
413 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
414 assert.Equal(t, status, true)
415 assert.Nil(t, result)
416}
417
418func TestChildDeviceDetectedMissingArgs(t *testing.T) {
419 trnsId := uuid.New().String()
420 protoArg1 := &ca.StrType{Val: trnsId}
421 args := make([]*kk.KVArg, 4)
422 args[0] = &kk.KVArg{
423 Key: "deviceID",
424 Value: protoArg1,
425 }
426 protoArg2 := &ca.IntType{Val: 1}
427 args[1] = &kk.KVArg{
428 Key: "parentPortNo",
429 Value: protoArg2,
430 }
431 protoArg3 := &ca.StrType{Val: "great_onu"}
432 args[2] = &kk.KVArg{
433 Key: "childDeviceType",
434 Value: protoArg3,
435 }
khenaidooabad44c2018-08-03 16:58:35 -0400436
437 rpc := "ChildDeviceDetected"
438 topic := kk.Topic{Name: "Core"}
439 start := time.Now()
khenaidoo43c82122018-11-22 18:38:28 -0500440 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400441 elapsed := time.Since(start)
442 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
443 assert.Equal(t, status, false)
444 unpackResult := &ca.Error{}
445 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
446 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
447 }
448 assert.NotNil(t, unpackResult)
449}
450
khenaidoo92e62c52018-10-03 14:02:54 -0400451func TestDeviceStateChange(t *testing.T) {
452 log.SetAllLogLevel(log.DebugLevel)
453 trnsId := uuid.New().String()
454 protoArg1 := &voltha.ID{Id: trnsId}
455 args := make([]*kk.KVArg, 4)
456 args[0] = &kk.KVArg{
457 Key: "device_id",
458 Value: protoArg1,
459 }
460 protoArg2 := &ca.IntType{Val: 1}
461 args[1] = &kk.KVArg{
462 Key: "oper_status",
463 Value: protoArg2,
464 }
465 protoArg3 := &ca.IntType{Val: 1}
466 args[2] = &kk.KVArg{
467 Key: "connect_status",
468 Value: protoArg3,
469 }
470
471 rpc := "DeviceStateUpdate"
472 topic := kk.Topic{Name: "Core"}
473 start := time.Now()
khenaidoo43c82122018-11-22 18:38:28 -0500474 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
khenaidoo92e62c52018-10-03 14:02:54 -0400475 elapsed := time.Since(start)
476 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
477 assert.Equal(t, status, true)
478 assert.Nil(t, result)
479}
480
khenaidooabad44c2018-08-03 16:58:35 -0400481func TestStopKafkaProxy(t *testing.T) {
482 adapterKafkaProxy.Stop()
483 coreKafkaProxy.Stop()
484}
485
486//func TestMain(m *testing.T) {
487// log.Info("Main")
488//}