blob: 5553d14f447337713b388bccacfd4d519eba5381 [file] [log] [blame]
khenaidooabad44c2018-08-03 16:58:35 -04001package kafka
2
3import (
4 "context"
5 "github.com/golang/protobuf/ptypes"
khenaidood4d922e2018-08-03 22:35:16 -04006 "github.com/google/uuid"
khenaidooabad44c2018-08-03 16:58:35 -04007 "github.com/opencord/voltha-go/common/log"
8 kk "github.com/opencord/voltha-go/kafka"
9 ca "github.com/opencord/voltha-go/protos/core_adapter"
10 "github.com/opencord/voltha-go/protos/voltha"
11 rhp "github.com/opencord/voltha-go/rw_core/core"
12 "github.com/stretchr/testify/assert"
13 "testing"
khenaidooabad44c2018-08-03 16:58:35 -040014 "time"
15)
16
17var coreKafkaProxy *kk.KafkaMessagingProxy
18var adapterKafkaProxy *kk.KafkaMessagingProxy
19
20func init() {
khenaidood4d922e2018-08-03 22:35:16 -040021 if _, err := log.SetLogger(log.JSON, 3, log.Fields{"instanceId": "testing"}); err != nil {
khenaidooabad44c2018-08-03 16:58:35 -040022 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
23 }
24 coreKafkaProxy, _ = kk.NewKafkaMessagingProxy(
25 kk.KafkaHost("10.100.198.220"),
26 kk.KafkaPort(9092),
27 kk.DefaultTopic(&kk.Topic{Name: "Core"}))
28
29 adapterKafkaProxy, _ = kk.NewKafkaMessagingProxy(
30 kk.KafkaHost("10.100.198.220"),
31 kk.KafkaPort(9092),
32 kk.DefaultTopic(&kk.Topic{Name: "Adapter"}))
33
34 coreKafkaProxy.Start()
35 adapterKafkaProxy.Start()
36 subscribeTarget(coreKafkaProxy)
37}
38
39func subscribeTarget(kmp *kk.KafkaMessagingProxy) {
40 topic := kk.Topic{Name: "Core"}
41 requestProxy := &rhp.RequestHandlerProxy{TestMode: true}
42 kmp.SubscribeWithTarget(topic, requestProxy)
43}
44
45func waitForRPCMessage(topic kk.Topic, ch <-chan *ca.InterContainerMessage, doneCh chan string) {
46 for msg := range ch {
47 log.Debugw("Got-RPC-message", log.Fields{"msg": msg})
48 // Unpack message
49 requestBody := &ca.InterContainerRequestBody{}
50 if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
51 doneCh <- "Error"
52 } else {
53 doneCh <- requestBody.Rpc
54 }
55 break
56 }
57}
58
59func TestSubscribeUnsubscribe(t *testing.T) {
60 // First subscribe to the specific topic
61 topic := kk.Topic{Name: "Core"}
62 ch, err := coreKafkaProxy.Subscribe(topic)
63 assert.NotNil(t, ch)
64 assert.Nil(t, err)
65 // Create a channel to receive a response
66 waitCh := make(chan string)
67 // Wait for a message
68 go waitForRPCMessage(topic, ch, waitCh)
69 // Send the message - don't care of the response
70 rpc := "AnyRPCRequestForTest"
71 adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
72 // Wait for the result on ouw own channel
73 result := <-waitCh
74 assert.Equal(t, result, rpc)
75 close(waitCh)
76 err = coreKafkaProxy.UnSubscribe(topic, ch)
77 assert.Nil(t, err)
78}
79
80func TestMultipleSubscribeUnsubscribe(t *testing.T) {
81 // First subscribe to the specific topic
khenaidood4d922e2018-08-03 22:35:16 -040082 log.SetLoglevel(0)
khenaidooabad44c2018-08-03 16:58:35 -040083 var err error
84 var ch1 <-chan *ca.InterContainerMessage
85 var ch2 <-chan *ca.InterContainerMessage
86 topic := kk.Topic{Name: "Core"}
87 ch1, err = coreKafkaProxy.Subscribe(topic)
88 assert.NotNil(t, ch1)
89 assert.Nil(t, err)
90 // Create a channel to receive responses
91 waitCh := make(chan string)
92 ch2, err = coreKafkaProxy.Subscribe(topic)
93 assert.NotNil(t, ch2)
94 assert.Nil(t, err)
95 // Wait for a message
96 go waitForRPCMessage(topic, ch2, waitCh)
97 go waitForRPCMessage(topic, ch1, waitCh)
98
99 // Send the message - don't care of the response
100 rpc := "AnyRPCRequestForTest"
101 adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
102 // Wait for the result on ouw own channel
103
104 responses := 0
105 for msg := range waitCh {
106 assert.Equal(t, msg, rpc)
107 responses = responses + 1
108 if responses > 1 {
109 break
110 }
111 }
112 assert.Equal(t, responses, 2)
113 close(waitCh)
114 err = coreKafkaProxy.UnSubscribe(topic, ch1)
115 assert.Nil(t, err)
116 err = coreKafkaProxy.UnSubscribe(topic, ch2)
117 assert.Nil(t, err)
118}
119
120func TestIncorrectAPI(t *testing.T) {
khenaidood4d922e2018-08-03 22:35:16 -0400121 log.SetLoglevel(3)
khenaidooabad44c2018-08-03 16:58:35 -0400122 trnsId := uuid.New().String()
123 protoMsg := &voltha.Device{Id: trnsId}
124 args := make([]*kk.KVArg, 1)
125 args[0] = &kk.KVArg{
126 Key: "device",
127 Value: protoMsg,
128 }
129 rpc := "IncorrectAPI"
130 topic := kk.Topic{Name: "Core"}
131 start := time.Now()
132 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
133 elapsed := time.Since(start)
134 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
135 assert.Equal(t, status, false)
136 //Unpack the result into the actual proto object
137 unpackResult := &ca.Error{}
138 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
139 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
140 }
141 assert.NotNil(t, unpackResult)
142}
143
144func TestIncorrectAPIParams(t *testing.T) {
145 trnsId := uuid.New().String()
146 protoMsg := &voltha.Device{Id: trnsId}
147 args := make([]*kk.KVArg, 1)
148 args[0] = &kk.KVArg{
149 Key: "device",
150 Value: protoMsg,
151 }
152 rpc := "GetDevice"
153 topic := kk.Topic{Name: "Core"}
154 start := time.Now()
155 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
156 elapsed := time.Since(start)
157 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
158 assert.Equal(t, status, false)
159 //Unpack the result into the actual proto object
160 unpackResult := &ca.Error{}
161 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
162 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
163 }
164 assert.NotNil(t, unpackResult)
165}
166
167func TestGetDevice(t *testing.T) {
168 trnsId := uuid.New().String()
169 protoMsg := &ca.StrType{Val: trnsId}
170 args := make([]*kk.KVArg, 1)
171 args[0] = &kk.KVArg{
172 Key: "deviceID",
173 Value: protoMsg,
174 }
175 rpc := "GetDevice"
176 topic := kk.Topic{Name: "Core"}
177 expectedResponse := &voltha.Device{Id: trnsId}
178 timeout := time.Duration(50) * time.Millisecond
179 ctx, cancel := context.WithTimeout(context.Background(), timeout)
180 defer cancel()
181 start := time.Now()
182 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
183 elapsed := time.Since(start)
184 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
185 assert.Equal(t, status, true)
186 unpackResult := &voltha.Device{}
187 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
188 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
189 }
190 assert.Equal(t, unpackResult, expectedResponse)
191}
192
193func TestGetDeviceTimeout(t *testing.T) {
194 trnsId := uuid.New().String()
195 protoMsg := &ca.StrType{Val: trnsId}
196 args := make([]*kk.KVArg, 1)
197 args[0] = &kk.KVArg{
198 Key: "deviceID",
199 Value: protoMsg,
200 }
201 rpc := "GetDevice"
202 topic := kk.Topic{Name: "Core"}
203 timeout := time.Duration(2) * time.Millisecond
204 ctx, cancel := context.WithTimeout(context.Background(), timeout)
205 defer cancel()
206 start := time.Now()
207 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
208 elapsed := time.Since(start)
209 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
210 assert.Equal(t, status, false)
211 unpackResult := &ca.Error{}
212 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
213 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
214 }
215 assert.NotNil(t, unpackResult)
216}
217
218func TestGetChildDevice(t *testing.T) {
219 trnsId := uuid.New().String()
220 protoMsg := &ca.StrType{Val: trnsId}
221 args := make([]*kk.KVArg, 1)
222 args[0] = &kk.KVArg{
223 Key: "deviceID",
224 Value: protoMsg,
225 }
226 rpc := "GetChildDevice"
227 topic := kk.Topic{Name: "Core"}
228 expectedResponse := &voltha.Device{Id: trnsId}
229 start := time.Now()
230 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
231 elapsed := time.Since(start)
232 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
233 assert.Equal(t, status, true)
234 unpackResult := &voltha.Device{}
235 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
236 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
237 }
238 assert.Equal(t, unpackResult, expectedResponse)
239}
240
241func TestGetChildDevices(t *testing.T) {
242 trnsId := uuid.New().String()
243 protoMsg := &ca.StrType{Val: trnsId}
244 args := make([]*kk.KVArg, 1)
245 args[0] = &kk.KVArg{
246 Key: "deviceID",
247 Value: protoMsg,
248 }
249 rpc := "GetChildDevices"
250 topic := kk.Topic{Name: "Core"}
251 expectedResponse := &voltha.Device{Id: trnsId}
252 start := time.Now()
253 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
254 elapsed := time.Since(start)
255 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
256 assert.Equal(t, status, true)
257 unpackResult := &voltha.Device{}
258 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
259 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
260 }
261 assert.Equal(t, unpackResult, expectedResponse)
262}
263
264func TestGetPorts(t *testing.T) {
265 trnsId := uuid.New().String()
266 protoArg1 := &ca.StrType{Val: trnsId}
267 args := make([]*kk.KVArg, 2)
268 args[0] = &kk.KVArg{
269 Key: "deviceID",
270 Value: protoArg1,
271 }
272 protoArg2 := &ca.IntType{Val: 1}
273 args[1] = &kk.KVArg{
274 Key: "portType",
275 Value: protoArg2,
276 }
277 rpc := "GetPorts"
278 topic := kk.Topic{Name: "Core"}
279 start := time.Now()
280 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
281 elapsed := time.Since(start)
282 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
283 assert.Equal(t, status, true)
284 unpackResult := &voltha.Ports{}
285 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
286 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
287 }
288 expectedLen := len(unpackResult.Items) >= 1
289 assert.Equal(t, true, expectedLen)
290}
291
292func TestGetPortsMissingArgs(t *testing.T) {
293 trnsId := uuid.New().String()
294 protoArg1 := &ca.StrType{Val: trnsId}
295 args := make([]*kk.KVArg, 1)
296 args[0] = &kk.KVArg{
297 Key: "deviceID",
298 Value: protoArg1,
299 }
300 rpc := "GetPorts"
301 topic := kk.Topic{Name: "Core"}
302 start := time.Now()
303 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
304 elapsed := time.Since(start)
305 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
306 assert.Equal(t, status, false)
307 //Unpack the result into the actual proto object
308 unpackResult := &ca.Error{}
309 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
310 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
311 }
312 assert.NotNil(t, unpackResult)
313}
314
315func TestChildDeviceDetected(t *testing.T) {
316 trnsId := uuid.New().String()
317 protoArg1 := &ca.StrType{Val: trnsId}
318 args := make([]*kk.KVArg, 5)
319 args[0] = &kk.KVArg{
320 Key: "deviceID",
321 Value: protoArg1,
322 }
323 protoArg2 := &ca.IntType{Val: 1}
324 args[1] = &kk.KVArg{
325 Key: "parentPortNo",
326 Value: protoArg2,
327 }
328 protoArg3 := &ca.StrType{Val: "great_onu"}
329 args[2] = &kk.KVArg{
330 Key: "childDeviceType",
331 Value: protoArg3,
332 }
333 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
334 args[3] = &kk.KVArg{
335 Key: "proxyAddress",
336 Value: protoArg4,
337 }
338 protoArg5 := &ca.IntType{Val: 1}
339 args[4] = &kk.KVArg{
340 Key: "portType",
341 Value: protoArg5,
342 }
343
344 rpc := "ChildDeviceDetected"
345 topic := kk.Topic{Name: "Core"}
346 start := time.Now()
347 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
348 elapsed := time.Since(start)
349 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
350 assert.Equal(t, status, true)
351 assert.Nil(t, result)
352}
353
354func TestChildDeviceDetectedNoWait(t *testing.T) {
355 trnsId := uuid.New().String()
356 protoArg1 := &ca.StrType{Val: trnsId}
357 args := make([]*kk.KVArg, 5)
358 args[0] = &kk.KVArg{
359 Key: "deviceID",
360 Value: protoArg1,
361 }
362 protoArg2 := &ca.IntType{Val: 1}
363 args[1] = &kk.KVArg{
364 Key: "parentPortNo",
365 Value: protoArg2,
366 }
367 protoArg3 := &ca.StrType{Val: "great_onu"}
368 args[2] = &kk.KVArg{
369 Key: "childDeviceType",
370 Value: protoArg3,
371 }
372 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
373 args[3] = &kk.KVArg{
374 Key: "proxyAddress",
375 Value: protoArg4,
376 }
377 protoArg5 := &ca.IntType{Val: 1}
378 args[4] = &kk.KVArg{
379 Key: "portType",
380 Value: protoArg5,
381 }
382
383 rpc := "ChildDeviceDetected"
384 topic := kk.Topic{Name: "Core"}
385 start := time.Now()
386 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, false, args...)
387 elapsed := time.Since(start)
388 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
389 assert.Equal(t, status, true)
390 assert.Nil(t, result)
391}
392
393func TestChildDeviceDetectedMissingArgs(t *testing.T) {
394 trnsId := uuid.New().String()
395 protoArg1 := &ca.StrType{Val: trnsId}
396 args := make([]*kk.KVArg, 4)
397 args[0] = &kk.KVArg{
398 Key: "deviceID",
399 Value: protoArg1,
400 }
401 protoArg2 := &ca.IntType{Val: 1}
402 args[1] = &kk.KVArg{
403 Key: "parentPortNo",
404 Value: protoArg2,
405 }
406 protoArg3 := &ca.StrType{Val: "great_onu"}
407 args[2] = &kk.KVArg{
408 Key: "childDeviceType",
409 Value: protoArg3,
410 }
411 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
412 args[3] = &kk.KVArg{
413 Key: "proxyAddress",
414 Value: protoArg4,
415 }
416
417 rpc := "ChildDeviceDetected"
418 topic := kk.Topic{Name: "Core"}
419 start := time.Now()
420 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
421 elapsed := time.Since(start)
422 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
423 assert.Equal(t, status, false)
424 unpackResult := &ca.Error{}
425 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
426 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
427 }
428 assert.NotNil(t, unpackResult)
429}
430
431func TestStopKafkaProxy(t *testing.T) {
432 adapterKafkaProxy.Stop()
433 coreKafkaProxy.Stop()
434}
435
// Disabled placeholder. Note: a real TestMain must take *testing.M (not
// *testing.T) and call m.Run().
//func TestMain(m *testing.M) {
//	log.Info("Main")
//}