package kafka
2
3import (
4 "context"
5 "github.com/golang/protobuf/ptypes"
6 "github.com/opencord/voltha-go/common/log"
7 kk "github.com/opencord/voltha-go/kafka"
8 ca "github.com/opencord/voltha-go/protos/core_adapter"
9 "github.com/opencord/voltha-go/protos/voltha"
10 rhp "github.com/opencord/voltha-go/rw_core/core"
11 "github.com/stretchr/testify/assert"
12 "testing"
13 //"time"
14 "github.com/google/uuid"
15 "time"
16)
17
18var coreKafkaProxy *kk.KafkaMessagingProxy
19var adapterKafkaProxy *kk.KafkaMessagingProxy
20
21func init() {
22 if _, err := log.SetLogger(log.JSON, 1, log.Fields{"instanceId": "testing"}); err != nil {
23 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
24 }
25 coreKafkaProxy, _ = kk.NewKafkaMessagingProxy(
26 kk.KafkaHost("10.100.198.220"),
27 kk.KafkaPort(9092),
28 kk.DefaultTopic(&kk.Topic{Name: "Core"}))
29
30 adapterKafkaProxy, _ = kk.NewKafkaMessagingProxy(
31 kk.KafkaHost("10.100.198.220"),
32 kk.KafkaPort(9092),
33 kk.DefaultTopic(&kk.Topic{Name: "Adapter"}))
34
35 coreKafkaProxy.Start()
36 adapterKafkaProxy.Start()
37 subscribeTarget(coreKafkaProxy)
38}
39
40func subscribeTarget(kmp *kk.KafkaMessagingProxy) {
41 topic := kk.Topic{Name: "Core"}
42 requestProxy := &rhp.RequestHandlerProxy{TestMode: true}
43 kmp.SubscribeWithTarget(topic, requestProxy)
44}
45
46func waitForRPCMessage(topic kk.Topic, ch <-chan *ca.InterContainerMessage, doneCh chan string) {
47 for msg := range ch {
48 log.Debugw("Got-RPC-message", log.Fields{"msg": msg})
49 // Unpack message
50 requestBody := &ca.InterContainerRequestBody{}
51 if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
52 doneCh <- "Error"
53 } else {
54 doneCh <- requestBody.Rpc
55 }
56 break
57 }
58}
59
60func TestSubscribeUnsubscribe(t *testing.T) {
61 // First subscribe to the specific topic
62 topic := kk.Topic{Name: "Core"}
63 ch, err := coreKafkaProxy.Subscribe(topic)
64 assert.NotNil(t, ch)
65 assert.Nil(t, err)
66 // Create a channel to receive a response
67 waitCh := make(chan string)
68 // Wait for a message
69 go waitForRPCMessage(topic, ch, waitCh)
70 // Send the message - don't care of the response
71 rpc := "AnyRPCRequestForTest"
72 adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
73 // Wait for the result on ouw own channel
74 result := <-waitCh
75 assert.Equal(t, result, rpc)
76 close(waitCh)
77 err = coreKafkaProxy.UnSubscribe(topic, ch)
78 assert.Nil(t, err)
79}
80
81func TestMultipleSubscribeUnsubscribe(t *testing.T) {
82 // First subscribe to the specific topic
83 var err error
84 var ch1 <-chan *ca.InterContainerMessage
85 var ch2 <-chan *ca.InterContainerMessage
86 topic := kk.Topic{Name: "Core"}
87 ch1, err = coreKafkaProxy.Subscribe(topic)
88 assert.NotNil(t, ch1)
89 assert.Nil(t, err)
90 // Create a channel to receive responses
91 waitCh := make(chan string)
92 ch2, err = coreKafkaProxy.Subscribe(topic)
93 assert.NotNil(t, ch2)
94 assert.Nil(t, err)
95 // Wait for a message
96 go waitForRPCMessage(topic, ch2, waitCh)
97 go waitForRPCMessage(topic, ch1, waitCh)
98
99 // Send the message - don't care of the response
100 rpc := "AnyRPCRequestForTest"
101 adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
102 // Wait for the result on ouw own channel
103
104 responses := 0
105 for msg := range waitCh {
106 assert.Equal(t, msg, rpc)
107 responses = responses + 1
108 if responses > 1 {
109 break
110 }
111 }
112 assert.Equal(t, responses, 2)
113 close(waitCh)
114 err = coreKafkaProxy.UnSubscribe(topic, ch1)
115 assert.Nil(t, err)
116 err = coreKafkaProxy.UnSubscribe(topic, ch2)
117 assert.Nil(t, err)
118}
119
120func TestIncorrectAPI(t *testing.T) {
121 trnsId := uuid.New().String()
122 protoMsg := &voltha.Device{Id: trnsId}
123 args := make([]*kk.KVArg, 1)
124 args[0] = &kk.KVArg{
125 Key: "device",
126 Value: protoMsg,
127 }
128 rpc := "IncorrectAPI"
129 topic := kk.Topic{Name: "Core"}
130 start := time.Now()
131 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
132 elapsed := time.Since(start)
133 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
134 assert.Equal(t, status, false)
135 //Unpack the result into the actual proto object
136 unpackResult := &ca.Error{}
137 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
138 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
139 }
140 assert.NotNil(t, unpackResult)
141}
142
143func TestIncorrectAPIParams(t *testing.T) {
144 trnsId := uuid.New().String()
145 protoMsg := &voltha.Device{Id: trnsId}
146 args := make([]*kk.KVArg, 1)
147 args[0] = &kk.KVArg{
148 Key: "device",
149 Value: protoMsg,
150 }
151 rpc := "GetDevice"
152 topic := kk.Topic{Name: "Core"}
153 start := time.Now()
154 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
155 elapsed := time.Since(start)
156 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
157 assert.Equal(t, status, false)
158 //Unpack the result into the actual proto object
159 unpackResult := &ca.Error{}
160 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
161 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
162 }
163 assert.NotNil(t, unpackResult)
164}
165
166func TestGetDevice(t *testing.T) {
167 trnsId := uuid.New().String()
168 protoMsg := &ca.StrType{Val: trnsId}
169 args := make([]*kk.KVArg, 1)
170 args[0] = &kk.KVArg{
171 Key: "deviceID",
172 Value: protoMsg,
173 }
174 rpc := "GetDevice"
175 topic := kk.Topic{Name: "Core"}
176 expectedResponse := &voltha.Device{Id: trnsId}
177 timeout := time.Duration(50) * time.Millisecond
178 ctx, cancel := context.WithTimeout(context.Background(), timeout)
179 defer cancel()
180 start := time.Now()
181 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
182 elapsed := time.Since(start)
183 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
184 assert.Equal(t, status, true)
185 unpackResult := &voltha.Device{}
186 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
187 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
188 }
189 assert.Equal(t, unpackResult, expectedResponse)
190}
191
192func TestGetDeviceTimeout(t *testing.T) {
193 trnsId := uuid.New().String()
194 protoMsg := &ca.StrType{Val: trnsId}
195 args := make([]*kk.KVArg, 1)
196 args[0] = &kk.KVArg{
197 Key: "deviceID",
198 Value: protoMsg,
199 }
200 rpc := "GetDevice"
201 topic := kk.Topic{Name: "Core"}
202 timeout := time.Duration(2) * time.Millisecond
203 ctx, cancel := context.WithTimeout(context.Background(), timeout)
204 defer cancel()
205 start := time.Now()
206 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
207 elapsed := time.Since(start)
208 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
209 assert.Equal(t, status, false)
210 unpackResult := &ca.Error{}
211 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
212 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
213 }
214 assert.NotNil(t, unpackResult)
215}
216
217func TestGetChildDevice(t *testing.T) {
218 trnsId := uuid.New().String()
219 protoMsg := &ca.StrType{Val: trnsId}
220 args := make([]*kk.KVArg, 1)
221 args[0] = &kk.KVArg{
222 Key: "deviceID",
223 Value: protoMsg,
224 }
225 rpc := "GetChildDevice"
226 topic := kk.Topic{Name: "Core"}
227 expectedResponse := &voltha.Device{Id: trnsId}
228 start := time.Now()
229 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
230 elapsed := time.Since(start)
231 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
232 assert.Equal(t, status, true)
233 unpackResult := &voltha.Device{}
234 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
235 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
236 }
237 assert.Equal(t, unpackResult, expectedResponse)
238}
239
240func TestGetChildDevices(t *testing.T) {
241 trnsId := uuid.New().String()
242 protoMsg := &ca.StrType{Val: trnsId}
243 args := make([]*kk.KVArg, 1)
244 args[0] = &kk.KVArg{
245 Key: "deviceID",
246 Value: protoMsg,
247 }
248 rpc := "GetChildDevices"
249 topic := kk.Topic{Name: "Core"}
250 expectedResponse := &voltha.Device{Id: trnsId}
251 start := time.Now()
252 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
253 elapsed := time.Since(start)
254 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
255 assert.Equal(t, status, true)
256 unpackResult := &voltha.Device{}
257 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
258 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
259 }
260 assert.Equal(t, unpackResult, expectedResponse)
261}
262
263func TestGetPorts(t *testing.T) {
264 trnsId := uuid.New().String()
265 protoArg1 := &ca.StrType{Val: trnsId}
266 args := make([]*kk.KVArg, 2)
267 args[0] = &kk.KVArg{
268 Key: "deviceID",
269 Value: protoArg1,
270 }
271 protoArg2 := &ca.IntType{Val: 1}
272 args[1] = &kk.KVArg{
273 Key: "portType",
274 Value: protoArg2,
275 }
276 rpc := "GetPorts"
277 topic := kk.Topic{Name: "Core"}
278 start := time.Now()
279 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
280 elapsed := time.Since(start)
281 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
282 assert.Equal(t, status, true)
283 unpackResult := &voltha.Ports{}
284 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
285 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
286 }
287 expectedLen := len(unpackResult.Items) >= 1
288 assert.Equal(t, true, expectedLen)
289}
290
291func TestGetPortsMissingArgs(t *testing.T) {
292 trnsId := uuid.New().String()
293 protoArg1 := &ca.StrType{Val: trnsId}
294 args := make([]*kk.KVArg, 1)
295 args[0] = &kk.KVArg{
296 Key: "deviceID",
297 Value: protoArg1,
298 }
299 rpc := "GetPorts"
300 topic := kk.Topic{Name: "Core"}
301 start := time.Now()
302 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
303 elapsed := time.Since(start)
304 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
305 assert.Equal(t, status, false)
306 //Unpack the result into the actual proto object
307 unpackResult := &ca.Error{}
308 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
309 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
310 }
311 assert.NotNil(t, unpackResult)
312}
313
314func TestChildDeviceDetected(t *testing.T) {
315 trnsId := uuid.New().String()
316 protoArg1 := &ca.StrType{Val: trnsId}
317 args := make([]*kk.KVArg, 5)
318 args[0] = &kk.KVArg{
319 Key: "deviceID",
320 Value: protoArg1,
321 }
322 protoArg2 := &ca.IntType{Val: 1}
323 args[1] = &kk.KVArg{
324 Key: "parentPortNo",
325 Value: protoArg2,
326 }
327 protoArg3 := &ca.StrType{Val: "great_onu"}
328 args[2] = &kk.KVArg{
329 Key: "childDeviceType",
330 Value: protoArg3,
331 }
332 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
333 args[3] = &kk.KVArg{
334 Key: "proxyAddress",
335 Value: protoArg4,
336 }
337 protoArg5 := &ca.IntType{Val: 1}
338 args[4] = &kk.KVArg{
339 Key: "portType",
340 Value: protoArg5,
341 }
342
343 rpc := "ChildDeviceDetected"
344 topic := kk.Topic{Name: "Core"}
345 start := time.Now()
346 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
347 elapsed := time.Since(start)
348 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
349 assert.Equal(t, status, true)
350 assert.Nil(t, result)
351}
352
353func TestChildDeviceDetectedNoWait(t *testing.T) {
354 trnsId := uuid.New().String()
355 protoArg1 := &ca.StrType{Val: trnsId}
356 args := make([]*kk.KVArg, 5)
357 args[0] = &kk.KVArg{
358 Key: "deviceID",
359 Value: protoArg1,
360 }
361 protoArg2 := &ca.IntType{Val: 1}
362 args[1] = &kk.KVArg{
363 Key: "parentPortNo",
364 Value: protoArg2,
365 }
366 protoArg3 := &ca.StrType{Val: "great_onu"}
367 args[2] = &kk.KVArg{
368 Key: "childDeviceType",
369 Value: protoArg3,
370 }
371 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
372 args[3] = &kk.KVArg{
373 Key: "proxyAddress",
374 Value: protoArg4,
375 }
376 protoArg5 := &ca.IntType{Val: 1}
377 args[4] = &kk.KVArg{
378 Key: "portType",
379 Value: protoArg5,
380 }
381
382 rpc := "ChildDeviceDetected"
383 topic := kk.Topic{Name: "Core"}
384 start := time.Now()
385 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, false, args...)
386 elapsed := time.Since(start)
387 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
388 assert.Equal(t, status, true)
389 assert.Nil(t, result)
390}
391
392func TestChildDeviceDetectedMissingArgs(t *testing.T) {
393 trnsId := uuid.New().String()
394 protoArg1 := &ca.StrType{Val: trnsId}
395 args := make([]*kk.KVArg, 4)
396 args[0] = &kk.KVArg{
397 Key: "deviceID",
398 Value: protoArg1,
399 }
400 protoArg2 := &ca.IntType{Val: 1}
401 args[1] = &kk.KVArg{
402 Key: "parentPortNo",
403 Value: protoArg2,
404 }
405 protoArg3 := &ca.StrType{Val: "great_onu"}
406 args[2] = &kk.KVArg{
407 Key: "childDeviceType",
408 Value: protoArg3,
409 }
410 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
411 args[3] = &kk.KVArg{
412 Key: "proxyAddress",
413 Value: protoArg4,
414 }
415
416 rpc := "ChildDeviceDetected"
417 topic := kk.Topic{Name: "Core"}
418 start := time.Now()
419 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
420 elapsed := time.Since(start)
421 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
422 assert.Equal(t, status, false)
423 unpackResult := &ca.Error{}
424 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
425 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
426 }
427 assert.NotNil(t, unpackResult)
428}
429
430func TestStopKafkaProxy(t *testing.T) {
431 adapterKafkaProxy.Stop()
432 coreKafkaProxy.Stop()
433}
434
435//func TestMain(m *testing.T) {
436// log.Info("Main")
437//}