blob: 99ba2c48068d5f518204a3aa3f326ffb7e68a1c7 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
khenaidood4d922e2018-08-03 22:35:16 -040021 "github.com/google/uuid"
khenaidooabad44c2018-08-03 16:58:35 -040022 "github.com/opencord/voltha-go/common/log"
23 kk "github.com/opencord/voltha-go/kafka"
24 ca "github.com/opencord/voltha-go/protos/core_adapter"
25 "github.com/opencord/voltha-go/protos/voltha"
26 rhp "github.com/opencord/voltha-go/rw_core/core"
27 "github.com/stretchr/testify/assert"
28 "testing"
khenaidooabad44c2018-08-03 16:58:35 -040029 "time"
30)
31
/*
Prerequisite: Start the kafka/zookeeper containers.
*/
35
khenaidooabad44c2018-08-03 16:58:35 -040036var coreKafkaProxy *kk.KafkaMessagingProxy
37var adapterKafkaProxy *kk.KafkaMessagingProxy
38
39func init() {
khenaidoo2c6f1672018-09-20 23:14:41 -040040 log.AddPackage(log.JSON, log.ErrorLevel, nil)
41 log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
42 log.SetAllLogLevel(log.ErrorLevel)
43
khenaidooabad44c2018-08-03 16:58:35 -040044 coreKafkaProxy, _ = kk.NewKafkaMessagingProxy(
khenaidoo2c6f1672018-09-20 23:14:41 -040045 kk.KafkaHost("192.168.0.20"),
khenaidooabad44c2018-08-03 16:58:35 -040046 kk.KafkaPort(9092),
47 kk.DefaultTopic(&kk.Topic{Name: "Core"}))
48
49 adapterKafkaProxy, _ = kk.NewKafkaMessagingProxy(
khenaidoo2c6f1672018-09-20 23:14:41 -040050 kk.KafkaHost("192.168.0.20"),
khenaidooabad44c2018-08-03 16:58:35 -040051 kk.KafkaPort(9092),
52 kk.DefaultTopic(&kk.Topic{Name: "Adapter"}))
53
54 coreKafkaProxy.Start()
55 adapterKafkaProxy.Start()
56 subscribeTarget(coreKafkaProxy)
57}
58
59func subscribeTarget(kmp *kk.KafkaMessagingProxy) {
60 topic := kk.Topic{Name: "Core"}
khenaidoo2c6f1672018-09-20 23:14:41 -040061 requestProxy := &rhp.AdapterRequestHandlerProxy{TestMode: true}
khenaidooabad44c2018-08-03 16:58:35 -040062 kmp.SubscribeWithTarget(topic, requestProxy)
63}
64
65func waitForRPCMessage(topic kk.Topic, ch <-chan *ca.InterContainerMessage, doneCh chan string) {
66 for msg := range ch {
67 log.Debugw("Got-RPC-message", log.Fields{"msg": msg})
68 // Unpack message
69 requestBody := &ca.InterContainerRequestBody{}
70 if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
71 doneCh <- "Error"
72 } else {
73 doneCh <- requestBody.Rpc
74 }
75 break
76 }
77}
78
79func TestSubscribeUnsubscribe(t *testing.T) {
80 // First subscribe to the specific topic
81 topic := kk.Topic{Name: "Core"}
82 ch, err := coreKafkaProxy.Subscribe(topic)
83 assert.NotNil(t, ch)
84 assert.Nil(t, err)
85 // Create a channel to receive a response
86 waitCh := make(chan string)
87 // Wait for a message
88 go waitForRPCMessage(topic, ch, waitCh)
89 // Send the message - don't care of the response
90 rpc := "AnyRPCRequestForTest"
91 adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
92 // Wait for the result on ouw own channel
93 result := <-waitCh
94 assert.Equal(t, result, rpc)
95 close(waitCh)
96 err = coreKafkaProxy.UnSubscribe(topic, ch)
97 assert.Nil(t, err)
98}
99
100func TestMultipleSubscribeUnsubscribe(t *testing.T) {
101 // First subscribe to the specific topic
khenaidoo2c6f1672018-09-20 23:14:41 -0400102 //log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
khenaidooabad44c2018-08-03 16:58:35 -0400103 var err error
104 var ch1 <-chan *ca.InterContainerMessage
105 var ch2 <-chan *ca.InterContainerMessage
106 topic := kk.Topic{Name: "Core"}
107 ch1, err = coreKafkaProxy.Subscribe(topic)
108 assert.NotNil(t, ch1)
109 assert.Nil(t, err)
110 // Create a channel to receive responses
111 waitCh := make(chan string)
112 ch2, err = coreKafkaProxy.Subscribe(topic)
113 assert.NotNil(t, ch2)
114 assert.Nil(t, err)
115 // Wait for a message
116 go waitForRPCMessage(topic, ch2, waitCh)
117 go waitForRPCMessage(topic, ch1, waitCh)
118
119 // Send the message - don't care of the response
120 rpc := "AnyRPCRequestForTest"
121 adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
122 // Wait for the result on ouw own channel
123
124 responses := 0
125 for msg := range waitCh {
126 assert.Equal(t, msg, rpc)
127 responses = responses + 1
128 if responses > 1 {
129 break
130 }
131 }
132 assert.Equal(t, responses, 2)
133 close(waitCh)
134 err = coreKafkaProxy.UnSubscribe(topic, ch1)
135 assert.Nil(t, err)
136 err = coreKafkaProxy.UnSubscribe(topic, ch2)
137 assert.Nil(t, err)
138}
139
140func TestIncorrectAPI(t *testing.T) {
khenaidoo2c6f1672018-09-20 23:14:41 -0400141 log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.ErrorLevel)
khenaidooabad44c2018-08-03 16:58:35 -0400142 trnsId := uuid.New().String()
143 protoMsg := &voltha.Device{Id: trnsId}
144 args := make([]*kk.KVArg, 1)
145 args[0] = &kk.KVArg{
146 Key: "device",
147 Value: protoMsg,
148 }
149 rpc := "IncorrectAPI"
150 topic := kk.Topic{Name: "Core"}
151 start := time.Now()
152 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
153 elapsed := time.Since(start)
154 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
155 assert.Equal(t, status, false)
156 //Unpack the result into the actual proto object
157 unpackResult := &ca.Error{}
158 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
159 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
160 }
161 assert.NotNil(t, unpackResult)
162}
163
164func TestIncorrectAPIParams(t *testing.T) {
165 trnsId := uuid.New().String()
166 protoMsg := &voltha.Device{Id: trnsId}
167 args := make([]*kk.KVArg, 1)
168 args[0] = &kk.KVArg{
169 Key: "device",
170 Value: protoMsg,
171 }
172 rpc := "GetDevice"
173 topic := kk.Topic{Name: "Core"}
174 start := time.Now()
175 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
176 elapsed := time.Since(start)
177 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
178 assert.Equal(t, status, false)
179 //Unpack the result into the actual proto object
180 unpackResult := &ca.Error{}
181 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
182 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
183 }
184 assert.NotNil(t, unpackResult)
185}
186
187func TestGetDevice(t *testing.T) {
188 trnsId := uuid.New().String()
khenaidoo2c6f1672018-09-20 23:14:41 -0400189 protoMsg := &voltha.ID{Id:trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400190 args := make([]*kk.KVArg, 1)
191 args[0] = &kk.KVArg{
192 Key: "deviceID",
193 Value: protoMsg,
194 }
195 rpc := "GetDevice"
196 topic := kk.Topic{Name: "Core"}
197 expectedResponse := &voltha.Device{Id: trnsId}
198 timeout := time.Duration(50) * time.Millisecond
199 ctx, cancel := context.WithTimeout(context.Background(), timeout)
200 defer cancel()
201 start := time.Now()
202 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
203 elapsed := time.Since(start)
204 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
205 assert.Equal(t, status, true)
206 unpackResult := &voltha.Device{}
207 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
208 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
209 }
210 assert.Equal(t, unpackResult, expectedResponse)
211}
212
213func TestGetDeviceTimeout(t *testing.T) {
214 trnsId := uuid.New().String()
khenaidoo2c6f1672018-09-20 23:14:41 -0400215 protoMsg := &voltha.ID{Id:trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400216 args := make([]*kk.KVArg, 1)
217 args[0] = &kk.KVArg{
218 Key: "deviceID",
219 Value: protoMsg,
220 }
221 rpc := "GetDevice"
222 topic := kk.Topic{Name: "Core"}
223 timeout := time.Duration(2) * time.Millisecond
224 ctx, cancel := context.WithTimeout(context.Background(), timeout)
225 defer cancel()
226 start := time.Now()
227 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
228 elapsed := time.Since(start)
229 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
230 assert.Equal(t, status, false)
231 unpackResult := &ca.Error{}
232 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
233 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
234 }
235 assert.NotNil(t, unpackResult)
236}
237
238func TestGetChildDevice(t *testing.T) {
239 trnsId := uuid.New().String()
khenaidoo2c6f1672018-09-20 23:14:41 -0400240 protoMsg := &voltha.ID{Id:trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400241 args := make([]*kk.KVArg, 1)
242 args[0] = &kk.KVArg{
243 Key: "deviceID",
244 Value: protoMsg,
245 }
246 rpc := "GetChildDevice"
247 topic := kk.Topic{Name: "Core"}
248 expectedResponse := &voltha.Device{Id: trnsId}
249 start := time.Now()
250 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
251 elapsed := time.Since(start)
252 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
253 assert.Equal(t, status, true)
254 unpackResult := &voltha.Device{}
255 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
256 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
257 }
258 assert.Equal(t, unpackResult, expectedResponse)
259}
260
261func TestGetChildDevices(t *testing.T) {
262 trnsId := uuid.New().String()
khenaidoo2c6f1672018-09-20 23:14:41 -0400263 protoMsg := &voltha.ID{Id:trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400264 args := make([]*kk.KVArg, 1)
265 args[0] = &kk.KVArg{
266 Key: "deviceID",
267 Value: protoMsg,
268 }
269 rpc := "GetChildDevices"
270 topic := kk.Topic{Name: "Core"}
271 expectedResponse := &voltha.Device{Id: trnsId}
272 start := time.Now()
273 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
274 elapsed := time.Since(start)
275 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
276 assert.Equal(t, status, true)
277 unpackResult := &voltha.Device{}
278 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
279 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
280 }
281 assert.Equal(t, unpackResult, expectedResponse)
282}
283
284func TestGetPorts(t *testing.T) {
285 trnsId := uuid.New().String()
khenaidoo2c6f1672018-09-20 23:14:41 -0400286 protoArg1 := &voltha.ID{Id:trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400287 args := make([]*kk.KVArg, 2)
288 args[0] = &kk.KVArg{
289 Key: "deviceID",
290 Value: protoArg1,
291 }
292 protoArg2 := &ca.IntType{Val: 1}
293 args[1] = &kk.KVArg{
294 Key: "portType",
295 Value: protoArg2,
296 }
297 rpc := "GetPorts"
298 topic := kk.Topic{Name: "Core"}
299 start := time.Now()
300 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
301 elapsed := time.Since(start)
302 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
303 assert.Equal(t, status, true)
304 unpackResult := &voltha.Ports{}
305 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
306 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
307 }
308 expectedLen := len(unpackResult.Items) >= 1
309 assert.Equal(t, true, expectedLen)
310}
311
312func TestGetPortsMissingArgs(t *testing.T) {
313 trnsId := uuid.New().String()
khenaidoo2c6f1672018-09-20 23:14:41 -0400314 protoArg1 := &voltha.ID{Id:trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400315 args := make([]*kk.KVArg, 1)
316 args[0] = &kk.KVArg{
317 Key: "deviceID",
318 Value: protoArg1,
319 }
320 rpc := "GetPorts"
321 topic := kk.Topic{Name: "Core"}
322 start := time.Now()
323 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
324 elapsed := time.Since(start)
325 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
326 assert.Equal(t, status, false)
327 //Unpack the result into the actual proto object
328 unpackResult := &ca.Error{}
329 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
330 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
331 }
332 assert.NotNil(t, unpackResult)
333}
334
335func TestChildDeviceDetected(t *testing.T) {
336 trnsId := uuid.New().String()
337 protoArg1 := &ca.StrType{Val: trnsId}
338 args := make([]*kk.KVArg, 5)
339 args[0] = &kk.KVArg{
340 Key: "deviceID",
341 Value: protoArg1,
342 }
343 protoArg2 := &ca.IntType{Val: 1}
344 args[1] = &kk.KVArg{
345 Key: "parentPortNo",
346 Value: protoArg2,
347 }
348 protoArg3 := &ca.StrType{Val: "great_onu"}
349 args[2] = &kk.KVArg{
350 Key: "childDeviceType",
351 Value: protoArg3,
352 }
353 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
354 args[3] = &kk.KVArg{
355 Key: "proxyAddress",
356 Value: protoArg4,
357 }
358 protoArg5 := &ca.IntType{Val: 1}
359 args[4] = &kk.KVArg{
360 Key: "portType",
361 Value: protoArg5,
362 }
363
364 rpc := "ChildDeviceDetected"
365 topic := kk.Topic{Name: "Core"}
366 start := time.Now()
367 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
368 elapsed := time.Since(start)
369 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
370 assert.Equal(t, status, true)
371 assert.Nil(t, result)
372}
373
374func TestChildDeviceDetectedNoWait(t *testing.T) {
375 trnsId := uuid.New().String()
376 protoArg1 := &ca.StrType{Val: trnsId}
377 args := make([]*kk.KVArg, 5)
378 args[0] = &kk.KVArg{
379 Key: "deviceID",
380 Value: protoArg1,
381 }
382 protoArg2 := &ca.IntType{Val: 1}
383 args[1] = &kk.KVArg{
384 Key: "parentPortNo",
385 Value: protoArg2,
386 }
387 protoArg3 := &ca.StrType{Val: "great_onu"}
388 args[2] = &kk.KVArg{
389 Key: "childDeviceType",
390 Value: protoArg3,
391 }
392 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
393 args[3] = &kk.KVArg{
394 Key: "proxyAddress",
395 Value: protoArg4,
396 }
397 protoArg5 := &ca.IntType{Val: 1}
398 args[4] = &kk.KVArg{
399 Key: "portType",
400 Value: protoArg5,
401 }
402
403 rpc := "ChildDeviceDetected"
404 topic := kk.Topic{Name: "Core"}
405 start := time.Now()
406 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, false, args...)
407 elapsed := time.Since(start)
408 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
409 assert.Equal(t, status, true)
410 assert.Nil(t, result)
411}
412
413func TestChildDeviceDetectedMissingArgs(t *testing.T) {
414 trnsId := uuid.New().String()
415 protoArg1 := &ca.StrType{Val: trnsId}
416 args := make([]*kk.KVArg, 4)
417 args[0] = &kk.KVArg{
418 Key: "deviceID",
419 Value: protoArg1,
420 }
421 protoArg2 := &ca.IntType{Val: 1}
422 args[1] = &kk.KVArg{
423 Key: "parentPortNo",
424 Value: protoArg2,
425 }
426 protoArg3 := &ca.StrType{Val: "great_onu"}
427 args[2] = &kk.KVArg{
428 Key: "childDeviceType",
429 Value: protoArg3,
430 }
khenaidooabad44c2018-08-03 16:58:35 -0400431
432 rpc := "ChildDeviceDetected"
433 topic := kk.Topic{Name: "Core"}
434 start := time.Now()
435 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
436 elapsed := time.Since(start)
437 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
438 assert.Equal(t, status, false)
439 unpackResult := &ca.Error{}
440 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
441 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
442 }
443 assert.NotNil(t, unpackResult)
444}
445
446func TestStopKafkaProxy(t *testing.T) {
447 adapterKafkaProxy.Stop()
448 coreKafkaProxy.Stop()
449}
450
451//func TestMain(m *testing.T) {
452// log.Info("Main")
453//}