blob: 7a7ce27f4205f5caf0e84ad13791c2662a621781 [file] [log] [blame]
Kent Hagerman0ab4cb22019-04-24 13:13:35 -04001// +build integration
2
khenaidoobf6e7bb2018-08-14 22:27:29 -04003/*
4 * Copyright 2018-present Open Networking Foundation
5
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9
10 * http://www.apache.org/licenses/LICENSE-2.0
11
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
khenaidooabad44c2018-08-03 16:58:35 -040018package kafka
19
20import (
21 "context"
22 "github.com/golang/protobuf/ptypes"
khenaidood4d922e2018-08-03 22:35:16 -040023 "github.com/google/uuid"
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040024 rhp "github.com/opencord/voltha-go/rw_core/core"
Scott Baker807addd2019-10-24 15:16:21 -070025 kk "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
26 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Scott Baker555307d2019-11-04 08:58:01 -080027 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
28 "github.com/opencord/voltha-protos/v2/go/voltha"
khenaidooabad44c2018-08-03 16:58:35 -040029 "github.com/stretchr/testify/assert"
khenaidoo79232702018-12-04 11:00:41 -050030 "os"
khenaidooabad44c2018-08-03 16:58:35 -040031 "testing"
khenaidooabad44c2018-08-03 16:58:35 -040032 "time"
33)
34
khenaidoo2c6f1672018-09-20 23:14:41 -040035/*
36Prerequite: Start the kafka/zookeeper containers.
37*/
38
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040039const (
40 TEST_RPC_KEY = ""
41)
42
khenaidoo43c82122018-11-22 18:38:28 -050043var coreKafkaProxy *kk.InterContainerProxy
44var adapterKafkaProxy *kk.InterContainerProxy
khenaidoo79232702018-12-04 11:00:41 -050045var kafkaPartitionClient kk.Client
46var affinityRouterTopic string
47var hostIP string
48var kafkaClient kk.Client
khenaidooabad44c2018-08-03 16:58:35 -040049
50func init() {
khenaidoo2c6f1672018-09-20 23:14:41 -040051 log.AddPackage(log.JSON, log.ErrorLevel, nil)
52 log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
53 log.SetAllLogLevel(log.ErrorLevel)
khenaidoo79232702018-12-04 11:00:41 -050054 affinityRouterTopic = "AffinityRouter"
55 hostIP = os.Getenv("DOCKER_HOST_IP")
56 kafkaClient = kk.NewSaramaClient(
57 kk.Host(hostIP),
khenaidoo43c82122018-11-22 18:38:28 -050058 kk.Port(9092))
khenaidoo2c6f1672018-09-20 23:14:41 -040059
khenaidoo43c82122018-11-22 18:38:28 -050060 coreKafkaProxy, _ = kk.NewInterContainerProxy(
khenaidoo79232702018-12-04 11:00:41 -050061 kk.InterContainerHost(hostIP),
khenaidoo43c82122018-11-22 18:38:28 -050062 kk.InterContainerPort(9092),
63 kk.DefaultTopic(&kk.Topic{Name: "Core"}),
khenaidoo79232702018-12-04 11:00:41 -050064 kk.MsgClient(kafkaClient),
65 kk.DeviceDiscoveryTopic(&kk.Topic{Name: affinityRouterTopic}))
khenaidooabad44c2018-08-03 16:58:35 -040066
khenaidoo43c82122018-11-22 18:38:28 -050067 adapterKafkaProxy, _ = kk.NewInterContainerProxy(
khenaidoo79232702018-12-04 11:00:41 -050068 kk.InterContainerHost(hostIP),
khenaidoo43c82122018-11-22 18:38:28 -050069 kk.InterContainerPort(9092),
70 kk.DefaultTopic(&kk.Topic{Name: "Adapter"}),
71 kk.MsgClient(kafkaClient))
khenaidooabad44c2018-08-03 16:58:35 -040072
khenaidoo79232702018-12-04 11:00:41 -050073 kafkaPartitionClient = kk.NewSaramaClient(
74 kk.ConsumerType(kk.PartitionConsumer),
75 kk.Host(hostIP),
76 kk.Port(9092),
77 kk.AutoCreateTopic(true),
78 kk.ProducerFlushFrequency(5))
79 kafkaPartitionClient.Start()
80
khenaidooabad44c2018-08-03 16:58:35 -040081 coreKafkaProxy.Start()
82 adapterKafkaProxy.Start()
83 subscribeTarget(coreKafkaProxy)
84}
85
khenaidoo43c82122018-11-22 18:38:28 -050086func subscribeTarget(kmp *kk.InterContainerProxy) {
khenaidooabad44c2018-08-03 16:58:35 -040087 topic := kk.Topic{Name: "Core"}
khenaidoo2c6f1672018-09-20 23:14:41 -040088 requestProxy := &rhp.AdapterRequestHandlerProxy{TestMode: true}
khenaidoo43c82122018-11-22 18:38:28 -050089 kmp.SubscribeWithRequestHandlerInterface(topic, requestProxy)
khenaidooabad44c2018-08-03 16:58:35 -040090}
91
khenaidoo79232702018-12-04 11:00:41 -050092func waitForRPCMessage(topic kk.Topic, ch <-chan *ic.InterContainerMessage, doneCh chan string) {
khenaidooabad44c2018-08-03 16:58:35 -040093 for msg := range ch {
94 log.Debugw("Got-RPC-message", log.Fields{"msg": msg})
95 // Unpack message
khenaidoo79232702018-12-04 11:00:41 -050096 requestBody := &ic.InterContainerRequestBody{}
khenaidooabad44c2018-08-03 16:58:35 -040097 if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
98 doneCh <- "Error"
99 } else {
100 doneCh <- requestBody.Rpc
101 }
102 break
103 }
104}
105
khenaidoo43c82122018-11-22 18:38:28 -0500106//func TestSubscribeUnsubscribe(t *testing.T) {
107// // First subscribe to the specific topic
108// topic := kk.Topic{Name: "Core"}
109// ch, err := coreKafkaProxy.Subs(topic)
110// assert.NotNil(t, ch)
111// assert.Nil(t, err)
112// // Create a channel to receive a response
113// waitCh := make(chan string)
114// // Wait for a message
115// go waitForRPCMessage(topic, ch, waitCh)
116// // Send the message - don't care of the response
117// rpc := "AnyRPCRequestForTest"
118// adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
119// // Wait for the result on ouw own channel
120// result := <-waitCh
121// assert.Equal(t, result, rpc)
122// close(waitCh)
123// err = coreKafkaProxy.UnSubscribe(topic, ch)
124// assert.Nil(t, err)
125//}
126//
127//func TestMultipleSubscribeUnsubscribe(t *testing.T) {
128// // First subscribe to the specific topic
Scott Baker807addd2019-10-24 15:16:21 -0700129// //log.SetPackageLogLevel("github.com/opencord/voltha-lib-go/v2/pkg/kafka", log.DebugLevel)
khenaidoo43c82122018-11-22 18:38:28 -0500130// var err error
khenaidoo79232702018-12-04 11:00:41 -0500131// var ch1 <-chan *ic.InterContainerMessage
132// var ch2 <-chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500133// topic := kk.Topic{Name: "Core"}
134// ch1, err = coreKafkaProxy.Subscribe(topic)
135// assert.NotNil(t, ch1)
136// assert.Nil(t, err)
137// // Create a channel to receive responses
138// waitCh := make(chan string)
139// ch2, err = coreKafkaProxy.Subscribe(topic)
140// assert.NotNil(t, ch2)
141// assert.Nil(t, err)
142// // Wait for a message
143// go waitForRPCMessage(topic, ch2, waitCh)
144// go waitForRPCMessage(topic, ch1, waitCh)
145//
146// // Send the message - don't care of the response
147// rpc := "AnyRPCRequestForTest"
148// adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
149// // Wait for the result on ouw own channel
150//
151// responses := 0
152// for msg := range waitCh {
153// assert.Equal(t, msg, rpc)
154// responses = responses + 1
155// if responses > 1 {
156// break
157// }
158// }
159// assert.Equal(t, responses, 2)
160// close(waitCh)
161// err = coreKafkaProxy.UnSubscribe(topic, ch1)
162// assert.Nil(t, err)
163// err = coreKafkaProxy.UnSubscribe(topic, ch2)
164// assert.Nil(t, err)
165//}
khenaidooabad44c2018-08-03 16:58:35 -0400166
167func TestIncorrectAPI(t *testing.T) {
Scott Baker807addd2019-10-24 15:16:21 -0700168 log.SetPackageLogLevel("github.com/opencord/voltha-lib-go/v2/pkg/kafka", log.ErrorLevel)
khenaidooabad44c2018-08-03 16:58:35 -0400169 trnsId := uuid.New().String()
170 protoMsg := &voltha.Device{Id: trnsId}
171 args := make([]*kk.KVArg, 1)
172 args[0] = &kk.KVArg{
173 Key: "device",
174 Value: protoMsg,
175 }
176 rpc := "IncorrectAPI"
177 topic := kk.Topic{Name: "Core"}
178 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400179 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400180 elapsed := time.Since(start)
181 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
182 assert.Equal(t, status, false)
183 //Unpack the result into the actual proto object
khenaidoo79232702018-12-04 11:00:41 -0500184 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400185 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
186 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
187 }
188 assert.NotNil(t, unpackResult)
189}
190
191func TestIncorrectAPIParams(t *testing.T) {
192 trnsId := uuid.New().String()
193 protoMsg := &voltha.Device{Id: trnsId}
194 args := make([]*kk.KVArg, 1)
195 args[0] = &kk.KVArg{
196 Key: "device",
197 Value: protoMsg,
198 }
199 rpc := "GetDevice"
200 topic := kk.Topic{Name: "Core"}
201 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400202 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400203 elapsed := time.Since(start)
204 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
205 assert.Equal(t, status, false)
206 //Unpack the result into the actual proto object
khenaidoo79232702018-12-04 11:00:41 -0500207 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400208 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
209 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
210 }
211 assert.NotNil(t, unpackResult)
212}
213
214func TestGetDevice(t *testing.T) {
215 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400216 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400217 args := make([]*kk.KVArg, 1)
218 args[0] = &kk.KVArg{
219 Key: "deviceID",
220 Value: protoMsg,
221 }
222 rpc := "GetDevice"
223 topic := kk.Topic{Name: "Core"}
224 expectedResponse := &voltha.Device{Id: trnsId}
225 timeout := time.Duration(50) * time.Millisecond
226 ctx, cancel := context.WithTimeout(context.Background(), timeout)
227 defer cancel()
228 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400229 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400230 elapsed := time.Since(start)
231 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
232 assert.Equal(t, status, true)
233 unpackResult := &voltha.Device{}
234 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
235 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
236 }
237 assert.Equal(t, unpackResult, expectedResponse)
238}
239
240func TestGetDeviceTimeout(t *testing.T) {
241 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400242 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400243 args := make([]*kk.KVArg, 1)
244 args[0] = &kk.KVArg{
245 Key: "deviceID",
246 Value: protoMsg,
247 }
248 rpc := "GetDevice"
249 topic := kk.Topic{Name: "Core"}
250 timeout := time.Duration(2) * time.Millisecond
251 ctx, cancel := context.WithTimeout(context.Background(), timeout)
252 defer cancel()
253 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400254 status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400255 elapsed := time.Since(start)
256 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
257 assert.Equal(t, status, false)
khenaidoo79232702018-12-04 11:00:41 -0500258 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400259 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
260 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
261 }
262 assert.NotNil(t, unpackResult)
263}
264
265func TestGetChildDevice(t *testing.T) {
266 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400267 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400268 args := make([]*kk.KVArg, 1)
269 args[0] = &kk.KVArg{
270 Key: "deviceID",
271 Value: protoMsg,
272 }
273 rpc := "GetChildDevice"
274 topic := kk.Topic{Name: "Core"}
275 expectedResponse := &voltha.Device{Id: trnsId}
276 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400277 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400278 elapsed := time.Since(start)
279 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
280 assert.Equal(t, status, true)
281 unpackResult := &voltha.Device{}
282 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
283 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
284 }
285 assert.Equal(t, unpackResult, expectedResponse)
286}
287
288func TestGetChildDevices(t *testing.T) {
289 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400290 protoMsg := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400291 args := make([]*kk.KVArg, 1)
292 args[0] = &kk.KVArg{
293 Key: "deviceID",
294 Value: protoMsg,
295 }
296 rpc := "GetChildDevices"
297 topic := kk.Topic{Name: "Core"}
298 expectedResponse := &voltha.Device{Id: trnsId}
299 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400300 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400301 elapsed := time.Since(start)
302 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
303 assert.Equal(t, status, true)
304 unpackResult := &voltha.Device{}
305 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
306 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
307 }
308 assert.Equal(t, unpackResult, expectedResponse)
309}
310
311func TestGetPorts(t *testing.T) {
312 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400313 protoArg1 := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400314 args := make([]*kk.KVArg, 2)
315 args[0] = &kk.KVArg{
316 Key: "deviceID",
317 Value: protoArg1,
318 }
khenaidoo79232702018-12-04 11:00:41 -0500319 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400320 args[1] = &kk.KVArg{
321 Key: "portType",
322 Value: protoArg2,
323 }
324 rpc := "GetPorts"
325 topic := kk.Topic{Name: "Core"}
326 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400327 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400328 elapsed := time.Since(start)
329 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
330 assert.Equal(t, status, true)
331 unpackResult := &voltha.Ports{}
332 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
333 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
334 }
335 expectedLen := len(unpackResult.Items) >= 1
336 assert.Equal(t, true, expectedLen)
337}
338
339func TestGetPortsMissingArgs(t *testing.T) {
340 trnsId := uuid.New().String()
khenaidoo92e62c52018-10-03 14:02:54 -0400341 protoArg1 := &voltha.ID{Id: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400342 args := make([]*kk.KVArg, 1)
343 args[0] = &kk.KVArg{
344 Key: "deviceID",
345 Value: protoArg1,
346 }
347 rpc := "GetPorts"
348 topic := kk.Topic{Name: "Core"}
349 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400350 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400351 elapsed := time.Since(start)
352 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
353 assert.Equal(t, status, false)
354 //Unpack the result into the actual proto object
khenaidoo79232702018-12-04 11:00:41 -0500355 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400356 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
357 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
358 }
359 assert.NotNil(t, unpackResult)
360}
361
362func TestChildDeviceDetected(t *testing.T) {
363 trnsId := uuid.New().String()
khenaidoo79232702018-12-04 11:00:41 -0500364 protoArg1 := &ic.StrType{Val: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400365 args := make([]*kk.KVArg, 5)
366 args[0] = &kk.KVArg{
367 Key: "deviceID",
368 Value: protoArg1,
369 }
khenaidoo79232702018-12-04 11:00:41 -0500370 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400371 args[1] = &kk.KVArg{
372 Key: "parentPortNo",
373 Value: protoArg2,
374 }
khenaidoo79232702018-12-04 11:00:41 -0500375 protoArg3 := &ic.StrType{Val: "great_onu"}
khenaidooabad44c2018-08-03 16:58:35 -0400376 args[2] = &kk.KVArg{
377 Key: "childDeviceType",
378 Value: protoArg3,
379 }
380 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
381 args[3] = &kk.KVArg{
382 Key: "proxyAddress",
383 Value: protoArg4,
384 }
khenaidoo79232702018-12-04 11:00:41 -0500385 protoArg5 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400386 args[4] = &kk.KVArg{
387 Key: "portType",
388 Value: protoArg5,
389 }
390
391 rpc := "ChildDeviceDetected"
392 topic := kk.Topic{Name: "Core"}
393 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400394 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400395 elapsed := time.Since(start)
396 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
397 assert.Equal(t, status, true)
398 assert.Nil(t, result)
399}
400
401func TestChildDeviceDetectedNoWait(t *testing.T) {
402 trnsId := uuid.New().String()
khenaidoo79232702018-12-04 11:00:41 -0500403 protoArg1 := &ic.StrType{Val: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400404 args := make([]*kk.KVArg, 5)
405 args[0] = &kk.KVArg{
406 Key: "deviceID",
407 Value: protoArg1,
408 }
khenaidoo79232702018-12-04 11:00:41 -0500409 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400410 args[1] = &kk.KVArg{
411 Key: "parentPortNo",
412 Value: protoArg2,
413 }
khenaidoo79232702018-12-04 11:00:41 -0500414 protoArg3 := &ic.StrType{Val: "great_onu"}
khenaidooabad44c2018-08-03 16:58:35 -0400415 args[2] = &kk.KVArg{
416 Key: "childDeviceType",
417 Value: protoArg3,
418 }
419 protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
420 args[3] = &kk.KVArg{
421 Key: "proxyAddress",
422 Value: protoArg4,
423 }
khenaidoo79232702018-12-04 11:00:41 -0500424 protoArg5 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400425 args[4] = &kk.KVArg{
426 Key: "portType",
427 Value: protoArg5,
428 }
429
430 rpc := "ChildDeviceDetected"
431 topic := kk.Topic{Name: "Core"}
432 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400433 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, false, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400434 elapsed := time.Since(start)
435 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
436 assert.Equal(t, status, true)
437 assert.Nil(t, result)
438}
439
440func TestChildDeviceDetectedMissingArgs(t *testing.T) {
441 trnsId := uuid.New().String()
khenaidoo79232702018-12-04 11:00:41 -0500442 protoArg1 := &ic.StrType{Val: trnsId}
khenaidooabad44c2018-08-03 16:58:35 -0400443 args := make([]*kk.KVArg, 4)
444 args[0] = &kk.KVArg{
445 Key: "deviceID",
446 Value: protoArg1,
447 }
khenaidoo79232702018-12-04 11:00:41 -0500448 protoArg2 := &ic.IntType{Val: 1}
khenaidooabad44c2018-08-03 16:58:35 -0400449 args[1] = &kk.KVArg{
450 Key: "parentPortNo",
451 Value: protoArg2,
452 }
khenaidoo79232702018-12-04 11:00:41 -0500453 protoArg3 := &ic.StrType{Val: "great_onu"}
khenaidooabad44c2018-08-03 16:58:35 -0400454 args[2] = &kk.KVArg{
455 Key: "childDeviceType",
456 Value: protoArg3,
457 }
khenaidooabad44c2018-08-03 16:58:35 -0400458
459 rpc := "ChildDeviceDetected"
460 topic := kk.Topic{Name: "Core"}
461 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400462 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidooabad44c2018-08-03 16:58:35 -0400463 elapsed := time.Since(start)
464 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
465 assert.Equal(t, status, false)
khenaidoo79232702018-12-04 11:00:41 -0500466 unpackResult := &ic.Error{}
khenaidooabad44c2018-08-03 16:58:35 -0400467 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
468 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
469 }
470 assert.NotNil(t, unpackResult)
471}
472
khenaidoo92e62c52018-10-03 14:02:54 -0400473func TestDeviceStateChange(t *testing.T) {
474 log.SetAllLogLevel(log.DebugLevel)
475 trnsId := uuid.New().String()
476 protoArg1 := &voltha.ID{Id: trnsId}
477 args := make([]*kk.KVArg, 4)
478 args[0] = &kk.KVArg{
479 Key: "device_id",
480 Value: protoArg1,
481 }
khenaidoo79232702018-12-04 11:00:41 -0500482 protoArg2 := &ic.IntType{Val: 1}
khenaidoo92e62c52018-10-03 14:02:54 -0400483 args[1] = &kk.KVArg{
484 Key: "oper_status",
485 Value: protoArg2,
486 }
khenaidoo79232702018-12-04 11:00:41 -0500487 protoArg3 := &ic.IntType{Val: 1}
khenaidoo92e62c52018-10-03 14:02:54 -0400488 args[2] = &kk.KVArg{
489 Key: "connect_status",
490 Value: protoArg3,
491 }
492
493 rpc := "DeviceStateUpdate"
494 topic := kk.Topic{Name: "Core"}
495 start := time.Now()
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400496 status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
khenaidoo92e62c52018-10-03 14:02:54 -0400497 elapsed := time.Since(start)
498 log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
499 assert.Equal(t, status, true)
500 assert.Nil(t, result)
501}
502
khenaidoo79232702018-12-04 11:00:41 -0500503func subscribeToTopic(topic *kk.Topic, waitingChannel chan *ic.InterContainerMessage) error {
504 var ch <-chan *ic.InterContainerMessage
505 var err error
506 if ch, err = kafkaPartitionClient.Subscribe(topic); err != nil {
507 return nil
508 }
509 msg := <-ch
510
511 log.Debugw("msg-received", log.Fields{"msg": msg})
512 waitingChannel <- msg
513 return nil
514}
515
516func TestDeviceDiscovery(t *testing.T) {
517 // Create an intercontainer proxy - similar to the Core
518 testProxy, _ := kk.NewInterContainerProxy(
519 kk.InterContainerHost(hostIP),
520 kk.InterContainerPort(9092),
521 kk.DefaultTopic(&kk.Topic{Name: "Test"}),
522 kk.MsgClient(kafkaClient),
523 kk.DeviceDiscoveryTopic(&kk.Topic{Name: affinityRouterTopic}))
524
525 // First start to wait for the message
526 waitingChannel := make(chan *ic.InterContainerMessage)
527 go subscribeToTopic(&kk.Topic{Name: affinityRouterTopic}, waitingChannel)
528
529 // Sleep to make sure the consumer is ready
530 time.Sleep(time.Millisecond * 100)
531
532 // Send the message
khenaidoo19374072018-12-11 11:05:15 -0500533 go testProxy.DeviceDiscovered("TestDeviceId", "TestDevicetype", "TestParentId", "myPODName")
khenaidoo79232702018-12-04 11:00:41 -0500534
535 msg := <-waitingChannel
536 totalTime := (time.Now().UnixNano() - msg.Header.Timestamp) / int64(time.Millisecond)
537 assert.Equal(t, msg.Header.Type, ic.MessageType_DEVICE_DISCOVERED)
538 // Unpack message
539 dd := &ic.DeviceDiscovered{}
540 err := ptypes.UnmarshalAny(msg.Body, dd)
541 assert.Nil(t, err)
542 assert.Equal(t, dd.Id, "TestDeviceId")
543 assert.Equal(t, dd.DeviceType, "TestDevicetype")
544 assert.Equal(t, dd.ParentId, "TestParentId")
khenaidoo19374072018-12-11 11:05:15 -0500545 assert.Equal(t, dd.Publisher, "myPODName")
khenaidoo79232702018-12-04 11:00:41 -0500546 log.Debugw("TotalTime", log.Fields{"time": totalTime})
547}
548
khenaidooabad44c2018-08-03 16:58:35 -0400549func TestStopKafkaProxy(t *testing.T) {
550 adapterKafkaProxy.Stop()
551 coreKafkaProxy.Stop()
552}
553
554//func TestMain(m *testing.T) {
555// log.Info("Main")
556//}