blob: 1f60cb198ba9d8b14c81cfc67f57274bd1dbf40e [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidooabad44c2018-08-03 16:58:35 -040016package kafka
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
khenaidood4d922e2018-08-03 22:35:16 -040021 "github.com/google/uuid"
khenaidooabad44c2018-08-03 16:58:35 -040022 "github.com/opencord/voltha-go/common/log"
23 kk "github.com/opencord/voltha-go/kafka"
24 ca "github.com/opencord/voltha-go/protos/core_adapter"
25 "github.com/opencord/voltha-go/protos/voltha"
26 rhp "github.com/opencord/voltha-go/rw_core/core"
27 "github.com/stretchr/testify/assert"
28 "testing"
khenaidooabad44c2018-08-03 16:58:35 -040029 "time"
30)
31
32var coreKafkaProxy *kk.KafkaMessagingProxy
33var adapterKafkaProxy *kk.KafkaMessagingProxy
34
35func init() {
khenaidood4d922e2018-08-03 22:35:16 -040036 if _, err := log.SetLogger(log.JSON, 3, log.Fields{"instanceId": "testing"}); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -040037 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
38 }
39 coreKafkaProxy, _ = kk.NewKafkaMessagingProxy(
khenaidoob9203542018-09-17 22:56:37 -040040 kk.KafkaHost("10.176.215.107"),
khenaidooabad44c2018-08-03 16:58:35 -040041 kk.KafkaPort(9092),
42 kk.DefaultTopic(&kk.Topic{Name: "Core"}))
43
44 adapterKafkaProxy, _ = kk.NewKafkaMessagingProxy(
khenaidoob9203542018-09-17 22:56:37 -040045 kk.KafkaHost("10.176.215.107"),
khenaidooabad44c2018-08-03 16:58:35 -040046 kk.KafkaPort(9092),
47 kk.DefaultTopic(&kk.Topic{Name: "Adapter"}))
48
49 coreKafkaProxy.Start()
50 adapterKafkaProxy.Start()
51 subscribeTarget(coreKafkaProxy)
52}
53
54func subscribeTarget(kmp *kk.KafkaMessagingProxy) {
55 topic := kk.Topic{Name: "Core"}
56 requestProxy := &rhp.RequestHandlerProxy{TestMode: true}
57 kmp.SubscribeWithTarget(topic, requestProxy)
58}
59
60func waitForRPCMessage(topic kk.Topic, ch <-chan *ca.InterContainerMessage, doneCh chan string) {
61 for msg := range ch {
62 log.Debugw("Got-RPC-message", log.Fields{"msg": msg})
63 // Unpack message
64 requestBody := &ca.InterContainerRequestBody{}
65 if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
66 doneCh <- "Error"
67 } else {
68 doneCh <- requestBody.Rpc
69 }
70 break
71 }
72}
73
74func TestSubscribeUnsubscribe(t *testing.T) {
75 // First subscribe to the specific topic
76 topic := kk.Topic{Name: "Core"}
77 ch, err := coreKafkaProxy.Subscribe(topic)
78 assert.NotNil(t, ch)
79 assert.Nil(t, err)
80 // Create a channel to receive a response
81 waitCh := make(chan string)
82 // Wait for a message
83 go waitForRPCMessage(topic, ch, waitCh)
84 // Send the message - don't care of the response
85 rpc := "AnyRPCRequestForTest"
86 adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
87 // Wait for the result on ouw own channel
88 result := <-waitCh
89 assert.Equal(t, result, rpc)
90 close(waitCh)
91 err = coreKafkaProxy.UnSubscribe(topic, ch)
92 assert.Nil(t, err)
93}
94
95func TestMultipleSubscribeUnsubscribe(t *testing.T) {
96 // First subscribe to the specific topic
khenaidood4d922e2018-08-03 22:35:16 -040097 log.SetLoglevel(0)
khenaidooabad44c2018-08-03 16:58:35 -040098 var err error
99 var ch1 <-chan *ca.InterContainerMessage
100 var ch2 <-chan *ca.InterContainerMessage
101 topic := kk.Topic{Name: "Core"}
102 ch1, err = coreKafkaProxy.Subscribe(topic)
103 assert.NotNil(t, ch1)
104 assert.Nil(t, err)
105 // Create a channel to receive responses
106 waitCh := make(chan string)
107 ch2, err = coreKafkaProxy.Subscribe(topic)
108 assert.NotNil(t, ch2)
109 assert.Nil(t, err)
110 // Wait for a message
111 go waitForRPCMessage(topic, ch2, waitCh)
112 go waitForRPCMessage(topic, ch1, waitCh)
113
114 // Send the message - don't care of the response
115 rpc := "AnyRPCRequestForTest"
116 adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
117 // Wait for the result on ouw own channel
118
119 responses := 0
120 for msg := range waitCh {
121 assert.Equal(t, msg, rpc)
122 responses = responses + 1
123 if responses > 1 {
124 break
125 }
126 }
127 assert.Equal(t, responses, 2)
128 close(waitCh)
129 err = coreKafkaProxy.UnSubscribe(topic, ch1)
130 assert.Nil(t, err)
131 err = coreKafkaProxy.UnSubscribe(topic, ch2)
132 assert.Nil(t, err)
133}
134
135func TestIncorrectAPI(t *testing.T) {
khenaidood4d922e2018-08-03 22:35:16 -0400136 log.SetLoglevel(3)
khenaidooabad44c2018-08-03 16:58:35 -0400137 trnsId := uuid.New().String()
138 protoMsg := &voltha.Device{Id: trnsId}
139 args := make([]*kk.KVArg, 1)
140 args[0] = &kk.KVArg{
141 Key: "device",
142 Value: protoMsg,
143 }
144 rpc := "IncorrectAPI"
145 topic := kk.Topic{Name: "Core"}
146 start := time.Now()
147 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
148 elapsed := time.Since(start)
149 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
150 assert.Equal(t, status, false)
151 //Unpack the result into the actual proto object
152 unpackResult := &ca.Error{}
153 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
154 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
155 }
156 assert.NotNil(t, unpackResult)
157}
158
159func TestIncorrectAPIParams(t *testing.T) {
160 trnsId := uuid.New().String()
161 protoMsg := &voltha.Device{Id: trnsId}
162 args := make([]*kk.KVArg, 1)
163 args[0] = &kk.KVArg{
164 Key: "device",
165 Value: protoMsg,
166 }
167 rpc := "GetDevice"
168 topic := kk.Topic{Name: "Core"}
169 start := time.Now()
170 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
171 elapsed := time.Since(start)
172 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
173 assert.Equal(t, status, false)
174 //Unpack the result into the actual proto object
175 unpackResult := &ca.Error{}
176 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
177 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
178 }
179 assert.NotNil(t, unpackResult)
180}
181
182func TestGetDevice(t *testing.T) {
183 trnsId := uuid.New().String()
184 protoMsg := &ca.StrType{Val: trnsId}
185 args := make([]*kk.KVArg, 1)
186 args[0] = &kk.KVArg{
187 Key: "deviceID",
188 Value: protoMsg,
189 }
190 rpc := "GetDevice"
191 topic := kk.Topic{Name: "Core"}
192 expectedResponse := &voltha.Device{Id: trnsId}
193 timeout := time.Duration(50) * time.Millisecond
194 ctx, cancel := context.WithTimeout(context.Background(), timeout)
195 defer cancel()
196 start := time.Now()
197 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
198 elapsed := time.Since(start)
199 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
200 assert.Equal(t, status, true)
201 unpackResult := &voltha.Device{}
202 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
203 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
204 }
205 assert.Equal(t, unpackResult, expectedResponse)
206}
207
208func TestGetDeviceTimeout(t *testing.T) {
209 trnsId := uuid.New().String()
210 protoMsg := &ca.StrType{Val: trnsId}
211 args := make([]*kk.KVArg, 1)
212 args[0] = &kk.KVArg{
213 Key: "deviceID",
214 Value: protoMsg,
215 }
216 rpc := "GetDevice"
217 topic := kk.Topic{Name: "Core"}
218 timeout := time.Duration(2) * time.Millisecond
219 ctx, cancel := context.WithTimeout(context.Background(), timeout)
220 defer cancel()
221 start := time.Now()
222 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
223 elapsed := time.Since(start)
224 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
225 assert.Equal(t, status, false)
226 unpackResult := &ca.Error{}
227 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
228 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
229 }
230 assert.NotNil(t, unpackResult)
231}
232
233func TestGetChildDevice(t *testing.T) {
234 trnsId := uuid.New().String()
235 protoMsg := &ca.StrType{Val: trnsId}
236 args := make([]*kk.KVArg, 1)
237 args[0] = &kk.KVArg{
238 Key: "deviceID",
239 Value: protoMsg,
240 }
241 rpc := "GetChildDevice"
242 topic := kk.Topic{Name: "Core"}
243 expectedResponse := &voltha.Device{Id: trnsId}
244 start := time.Now()
245 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
246 elapsed := time.Since(start)
247 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
248 assert.Equal(t, status, true)
249 unpackResult := &voltha.Device{}
250 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
251 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
252 }
253 assert.Equal(t, unpackResult, expectedResponse)
254}
255
256func TestGetChildDevices(t *testing.T) {
257 trnsId := uuid.New().String()
258 protoMsg := &ca.StrType{Val: trnsId}
259 args := make([]*kk.KVArg, 1)
260 args[0] = &kk.KVArg{
261 Key: "deviceID",
262 Value: protoMsg,
263 }
264 rpc := "GetChildDevices"
265 topic := kk.Topic{Name: "Core"}
266 expectedResponse := &voltha.Device{Id: trnsId}
267 start := time.Now()
268 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
269 elapsed := time.Since(start)
270 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
271 assert.Equal(t, status, true)
272 unpackResult := &voltha.Device{}
273 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
274 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
275 }
276 assert.Equal(t, unpackResult, expectedResponse)
277}
278
279func TestGetPorts(t *testing.T) {
280 trnsId := uuid.New().String()
281 protoArg1 := &ca.StrType{Val: trnsId}
282 args := make([]*kk.KVArg, 2)
283 args[0] = &kk.KVArg{
284 Key: "deviceID",
285 Value: protoArg1,
286 }
287 protoArg2 := &ca.IntType{Val: 1}
288 args[1] = &kk.KVArg{
289 Key: "portType",
290 Value: protoArg2,
291 }
292 rpc := "GetPorts"
293 topic := kk.Topic{Name: "Core"}
294 start := time.Now()
295 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
296 elapsed := time.Since(start)
297 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
298 assert.Equal(t, status, true)
299 unpackResult := &voltha.Ports{}
300 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
301 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
302 }
303 expectedLen := len(unpackResult.Items) >= 1
304 assert.Equal(t, true, expectedLen)
305}
306
307func TestGetPortsMissingArgs(t *testing.T) {
308 trnsId := uuid.New().String()
309 protoArg1 := &ca.StrType{Val: trnsId}
310 args := make([]*kk.KVArg, 1)
311 args[0] = &kk.KVArg{
312 Key: "deviceID",
313 Value: protoArg1,
314 }
315 rpc := "GetPorts"
316 topic := kk.Topic{Name: "Core"}
317 start := time.Now()
318 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
319 elapsed := time.Since(start)
320 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
321 assert.Equal(t, status, false)
322 //Unpack the result into the actual proto object
323 unpackResult := &ca.Error{}
324 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
325 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
326 }
327 assert.NotNil(t, unpackResult)
328}
329
330func TestChildDeviceDetected(t *testing.T) {
331 trnsId := uuid.New().String()
332 protoArg1 := &ca.StrType{Val: trnsId}
333 args := make([]*kk.KVArg, 5)
334 args[0] = &kk.KVArg{
335 Key: "deviceID",
336 Value: protoArg1,
337 }
338 protoArg2 := &ca.IntType{Val: 1}
339 args[1] = &kk.KVArg{
340 Key: "parentPortNo",
341 Value: protoArg2,
342 }
343 protoArg3 := &ca.StrType{Val: "great_onu"}
344 args[2] = &kk.KVArg{
345 Key: "childDeviceType",
346 Value: protoArg3,
347 }
348 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
349 args[3] = &kk.KVArg{
350 Key: "proxyAddress",
351 Value: protoArg4,
352 }
353 protoArg5 := &ca.IntType{Val: 1}
354 args[4] = &kk.KVArg{
355 Key: "portType",
356 Value: protoArg5,
357 }
358
359 rpc := "ChildDeviceDetected"
360 topic := kk.Topic{Name: "Core"}
361 start := time.Now()
362 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
363 elapsed := time.Since(start)
364 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
365 assert.Equal(t, status, true)
366 assert.Nil(t, result)
367}
368
369func TestChildDeviceDetectedNoWait(t *testing.T) {
370 trnsId := uuid.New().String()
371 protoArg1 := &ca.StrType{Val: trnsId}
372 args := make([]*kk.KVArg, 5)
373 args[0] = &kk.KVArg{
374 Key: "deviceID",
375 Value: protoArg1,
376 }
377 protoArg2 := &ca.IntType{Val: 1}
378 args[1] = &kk.KVArg{
379 Key: "parentPortNo",
380 Value: protoArg2,
381 }
382 protoArg3 := &ca.StrType{Val: "great_onu"}
383 args[2] = &kk.KVArg{
384 Key: "childDeviceType",
385 Value: protoArg3,
386 }
387 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
388 args[3] = &kk.KVArg{
389 Key: "proxyAddress",
390 Value: protoArg4,
391 }
392 protoArg5 := &ca.IntType{Val: 1}
393 args[4] = &kk.KVArg{
394 Key: "portType",
395 Value: protoArg5,
396 }
397
398 rpc := "ChildDeviceDetected"
399 topic := kk.Topic{Name: "Core"}
400 start := time.Now()
401 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, false, args...)
402 elapsed := time.Since(start)
403 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
404 assert.Equal(t, status, true)
405 assert.Nil(t, result)
406}
407
408func TestChildDeviceDetectedMissingArgs(t *testing.T) {
409 trnsId := uuid.New().String()
410 protoArg1 := &ca.StrType{Val: trnsId}
411 args := make([]*kk.KVArg, 4)
412 args[0] = &kk.KVArg{
413 Key: "deviceID",
414 Value: protoArg1,
415 }
416 protoArg2 := &ca.IntType{Val: 1}
417 args[1] = &kk.KVArg{
418 Key: "parentPortNo",
419 Value: protoArg2,
420 }
421 protoArg3 := &ca.StrType{Val: "great_onu"}
422 args[2] = &kk.KVArg{
423 Key: "childDeviceType",
424 Value: protoArg3,
425 }
426 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
427 args[3] = &kk.KVArg{
428 Key: "proxyAddress",
429 Value: protoArg4,
430 }
431
432 rpc := "ChildDeviceDetected"
433 topic := kk.Topic{Name: "Core"}
434 start := time.Now()
435 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
436 elapsed := time.Since(start)
437 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
438 assert.Equal(t, status, false)
439 unpackResult := &ca.Error{}
440 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
441 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
442 }
443 assert.NotNil(t, unpackResult)
444}
445
446func TestStopKafkaProxy(t *testing.T) {
447 adapterKafkaProxy.Stop()
448 coreKafkaProxy.Stop()
449}
450
451//func TestMain(m *testing.T) {
452// log.Info("Main")
453//}