blob: 23968b5e35ef1addd30d4b9e66fd1b91747c03ae [file] [log] [blame]
ssiddiquif076cb82021-04-23 10:47:04 +05301/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package device
18
19import (
20 "context"
21 "reflect"
22 "strconv"
23 "testing"
24
25 "github.com/golang/mock/gomock"
26 "github.com/golang/protobuf/ptypes"
27 "github.com/golang/protobuf/ptypes/any"
28 "github.com/opencord/voltha-go/db/model"
29 "github.com/opencord/voltha-go/rw_core/config"
30 "github.com/opencord/voltha-go/rw_core/core/adapter"
31 tst "github.com/opencord/voltha-go/rw_core/test"
yasin sapli5458a1c2021-06-14 22:24:38 +000032 "github.com/opencord/voltha-lib-go/v5/pkg/db"
33 "github.com/opencord/voltha-lib-go/v5/pkg/events"
34 "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
ssiddiquif076cb82021-04-23 10:47:04 +053035 "github.com/opencord/voltha-protos/v4/go/common"
36 "github.com/opencord/voltha-protos/v4/go/voltha"
37 "github.com/phayes/freeport"
38 "github.com/stretchr/testify/assert"
39)
40
41const (
42 version = "dummy-version"
43 url = "http://127.0.0.1:2222/dummy-image"
44 vendor = "dummy"
45
46 numberOfTestDevices = 10
47)
48
49func initialiseTest(ctx context.Context, t *testing.T) (*DATest, *MockInterContainerProxy, []*Agent) {
50 dat := newDATest(ctx)
51
52 controller := gomock.NewController(t)
53 mockICProxy := NewMockInterContainerProxy(controller)
54
55 // Set expectations for the mock
56 mockICProxy.EXPECT().Start(gomock.Any()).AnyTimes().Return(nil)
57 mockICProxy.EXPECT().SubscribeWithDefaultRequestHandler(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
58
59 dat.startCoreWithCustomICProxy(ctx, mockICProxy)
60
61 var agents []*Agent
62 for i := 1; i <= numberOfTestDevices; i++ {
63 if agent := dat.createDeviceAgent(t); agent != nil {
64 agents = append(agents, agent)
65 }
66 }
67
68 assert.Equal(t, len(agents), numberOfTestDevices)
69
70 dat.oltAdapter, dat.onuAdapter = tst.CreateAndregisterAdapters(ctx,
71 t,
72 dat.kClient,
73 dat.coreInstanceID,
74 dat.oltAdapterName,
75 dat.onuAdapterName,
76 dat.adapterMgr)
77
78 return dat, mockICProxy, agents
79}
80
81func (dat *DATest) startCoreWithCustomICProxy(ctx context.Context, kmp kafka.InterContainerProxy) {
82 cfg := config.NewRWCoreFlags()
83 cfg.CoreTopic = "rw_core"
84 cfg.EventTopic = "voltha.events"
85 cfg.DefaultRequestTimeout = dat.defaultTimeout
86 cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
87 grpcPort, err := freeport.GetFreePort()
88 if err != nil {
89 logger.Fatal(ctx, "Cannot get a freeport for grpc")
90 }
91 cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
92 client := tst.SetupKVClient(ctx, cfg, dat.coreInstanceID)
93 backend := &db.Backend{
94 Client: client,
95 StoreType: cfg.KVStoreType,
96 Address: cfg.KVStoreAddress,
97 Timeout: cfg.KVStoreTimeout,
98 LivenessChannelInterval: cfg.LiveProbeInterval / 2}
99
100 dat.kmp = kmp
101
102 endpointMgr := kafka.NewEndpointManager(backend)
103 proxy := model.NewDBPath(backend)
104 dat.adapterMgr = adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
105 eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
Maninder0aabf0c2021-03-17 14:55:14 +0530106 dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg, dat.coreInstanceID, eventProxy)
ssiddiquif076cb82021-04-23 10:47:04 +0530107 dat.adapterMgr.Start(context.Background())
108 if err = dat.kmp.Start(ctx); err != nil {
109 logger.Fatal(ctx, "Cannot start InterContainerProxy")
110 }
111
112 if err := dat.kmp.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: cfg.CoreTopic}, kafka.OffsetNewest); err != nil {
113 logger.Fatalf(ctx, "Cannot add default request handler: %s", err)
114 }
115
116}
117
118func TestManager_DownloadImageToDevice(t *testing.T) {
119 type args struct {
120 ctx context.Context
121 request *voltha.DeviceImageDownloadRequest
122 }
123
124 ctx := context.Background()
125 dat, mockICProxy, agents := initialiseTest(ctx, t)
126
127 tests := []struct {
128 name string
129 args args
130 want *voltha.DeviceImageResponse
131 wantErr bool
132 }{
133 {
134 name: "request-for-single-device",
135 args: args{
136 ctx: ctx,
137 request: newDeviceImageDownloadRequest(agents[:1]),
138 },
139 want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
140 wantErr: false,
141 },
142 {
143 name: "request-for-multiple-devices",
144 args: args{
145 ctx: ctx,
146 request: newDeviceImageDownloadRequest(agents),
147 },
148 want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
149 wantErr: false,
150 },
151 }
152
153 for _, tt := range tests {
154 t.Run(tt.name, func(t *testing.T) {
155 if tt.name == "request-for-single-device" {
156 chnl := make(chan *kafka.RpcResponse, 10)
157 // Set expectation for the API invocation
158 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
159 "Download_onu_image",
160 gomock.Any(),
161 gomock.Any(),
162 true,
163 gomock.Any(), gomock.Any()).Return(chnl)
164 // Send the expected response to channel from a goroutine
165 go func() {
166 reply := newImageDownloadAdapterResponse(t, agents[0].deviceID, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
167
168 chnl <- &kafka.RpcResponse{
169 MType: kafka.RpcSent,
170 Err: nil,
171 Reply: reply,
172 }
173
174 chnl <- &kafka.RpcResponse{
175 MType: kafka.RpcReply,
176 Err: nil,
177 Reply: reply,
178 }
179 }()
180 } else if tt.name == "request-for-multiple-devices" {
181 // Map to store per device kafka response channel
182 kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
183 for _, id := range tt.args.request.DeviceId {
184 // Create a kafka response channel per device
185 chnl := make(chan *kafka.RpcResponse)
186
187 // Set expectation for the API invocation
188 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
189 "Download_onu_image",
190 gomock.Any(),
191 gomock.Any(),
192 true,
193 id.Id, gomock.Any()).Return(chnl)
194
195 kafkaRespChans[id.Id] = chnl
196 }
197
198 // Send the expected response to channel from a goroutine
199 go func() {
200 for _, agent := range agents {
201 reply := newImageDownloadAdapterResponse(t, agent.deviceID, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
202
203 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
204 MType: kafka.RpcSent,
205 Err: nil,
206 Reply: reply,
207 }
208
209 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
210 MType: kafka.RpcReply,
211 Err: nil,
212 Reply: reply,
213 }
214 }
215 }()
216 }
217
218 got, err := dat.deviceMgr.DownloadImageToDevice(tt.args.ctx, tt.args.request)
219 if (err != nil) != tt.wantErr {
220 t.Errorf("DownloadImageToDevice() error = %v, wantErr %v", err, tt.wantErr)
221 return
222 }
223
224 if !gotAllSuccess(got, tt.want) {
225 t.Errorf("DownloadImageToDevice() got = %v, want = %v", got, tt.want)
226 }
227 })
228 }
229}
230
231func TestManager_GetImageStatus(t *testing.T) {
232 type args struct {
233 ctx context.Context
234 request *voltha.DeviceImageRequest
235 }
236
237 ctx := context.Background()
238 dat, mockICProxy, agents := initialiseTest(ctx, t)
239
240 tests := []struct {
241 name string
242 args args
243 want *voltha.DeviceImageResponse
244 wantErr bool
245 }{
246 {
247 name: "request-for-single-device",
248 args: args{
249 ctx: ctx,
250 request: newDeviceImagedRequest(agents[:1]),
251 },
252 want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
253 wantErr: false,
254 },
255 {
256 name: "request-for-multiple-devices",
257 args: args{
258 ctx: ctx,
259 request: newDeviceImagedRequest(agents),
260 },
261 want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
262 wantErr: false,
263 },
264 }
265
266 for _, tt := range tests {
267 t.Run(tt.name, func(t *testing.T) {
268 if tt.name == "request-for-single-device" {
269 chnl := make(chan *kafka.RpcResponse, 10)
270 // Set expectation for the API invocation
271 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
272 "Get_onu_image_status",
273 gomock.Any(),
274 gomock.Any(),
275 true,
276 gomock.Any(), gomock.Any()).Return(chnl)
277 // Send the expected response to channel from a goroutine
278 go func() {
279 reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
280
281 chnl <- &kafka.RpcResponse{
282 MType: kafka.RpcSent,
283 Err: nil,
284 Reply: reply,
285 }
286
287 chnl <- &kafka.RpcResponse{
288 MType: kafka.RpcReply,
289 Err: nil,
290 Reply: reply,
291 }
292 }()
293 } else if tt.name == "request-for-multiple-devices" {
294 // Map to store per device kafka response channel
295 kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
296 for _, id := range tt.args.request.DeviceId {
297 // Create a kafka response channel per device
298 chnl := make(chan *kafka.RpcResponse)
299
300 // Set expectation for the API invocation
301 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
302 "Get_onu_image_status",
303 gomock.Any(),
304 gomock.Any(),
305 true,
306 id.Id, gomock.Any()).Return(chnl)
307
308 kafkaRespChans[id.Id] = chnl
309 }
310
311 // Send the expected response to channel from a goroutine
312 go func() {
313 for _, agent := range agents {
314 reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
315
316 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
317 MType: kafka.RpcSent,
318 Err: nil,
319 Reply: reply,
320 }
321
322 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
323 MType: kafka.RpcReply,
324 Err: nil,
325 Reply: reply,
326 }
327 }
328 }()
329 }
330
331 got, err := dat.deviceMgr.GetImageStatus(tt.args.ctx, tt.args.request)
332 if (err != nil) != tt.wantErr {
333 t.Errorf("GetImageStatus() error = %v, wantErr %v", err, tt.wantErr)
334 return
335 }
336
337 if !gotAllSuccess(got, tt.want) {
338 t.Errorf("GetImageStatus() got = %v, want %v", got, tt.want)
339 }
340 })
341 }
342}
343
344func TestManager_AbortImageUpgradeToDevice(t *testing.T) {
345
346 type args struct {
347 ctx context.Context
348 request *voltha.DeviceImageRequest
349 }
350
351 ctx := context.Background()
352 dat, mockICProxy, agents := initialiseTest(ctx, t)
353
354 tests := []struct {
355 name string
356 args args
357 want *voltha.DeviceImageResponse
358 wantErr bool
359 }{
360 {
361 name: "request-for-single-device",
362 args: args{
363 ctx: ctx,
364 request: newDeviceImagedRequest(agents[:1]),
365 },
366 want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR),
367 wantErr: false,
368 },
369 {
370 name: "request-for-multiple-devices",
371 args: args{
372 ctx: ctx,
373 request: newDeviceImagedRequest(agents[:1]),
374 },
375 want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR),
376 wantErr: false,
377 },
378 }
379 for _, tt := range tests {
380 t.Run(tt.name, func(t *testing.T) {
381 if tt.name == "request-for-single-device" {
382 chnl := make(chan *kafka.RpcResponse, 10)
383 // Set expectation for the API invocation
384 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
385 "Abort_onu_image_upgrade",
386 gomock.Any(),
387 gomock.Any(),
388 true,
389 gomock.Any(), gomock.Any()).Return(chnl)
390 // Send the expected response to channel from a goroutine
391 go func() {
392 reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR)
393
394 chnl <- &kafka.RpcResponse{
395 MType: kafka.RpcSent,
396 Err: nil,
397 Reply: reply,
398 }
399
400 chnl <- &kafka.RpcResponse{
401 MType: kafka.RpcReply,
402 Err: nil,
403 Reply: reply,
404 }
405 }()
406 } else if tt.name == "request-for-multiple-devices" {
407 // Map to store per device kafka response channel
408 kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
409 for _, id := range tt.args.request.DeviceId {
410 // Create a kafka response channel per device
411 chnl := make(chan *kafka.RpcResponse)
412
413 // Set expectation for the API invocation
414 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
415 "Abort_onu_image_upgrade",
416 gomock.Any(),
417 gomock.Any(),
418 true,
419 id.Id, gomock.Any()).Return(chnl)
420
421 kafkaRespChans[id.Id] = chnl
422 }
423
424 // Send the expected response to channel from a goroutine
425 go func() {
426 for _, agent := range agents {
427 reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR)
428
429 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
430 MType: kafka.RpcSent,
431 Err: nil,
432 Reply: reply,
433 }
434
435 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
436 MType: kafka.RpcReply,
437 Err: nil,
438 Reply: reply,
439 }
440 }
441 }()
442 }
443 got, err := dat.deviceMgr.AbortImageUpgradeToDevice(tt.args.ctx, tt.args.request)
444 if (err != nil) != tt.wantErr {
445 t.Errorf("AbortImageUpgradeToDevice() error = %v, wantErr %v", err, tt.wantErr)
446 return
447 }
448
449 if !gotAllSuccess(got, tt.want) {
450 t.Errorf("AbortImageUpgradeToDevice() got = %v, want %v", got, tt.want)
451 }
452 })
453 }
454}
455
456func TestManager_ActivateImage(t *testing.T) {
457 type args struct {
458 ctx context.Context
459 request *voltha.DeviceImageRequest
460 }
461
462 ctx := context.Background()
463 dat, mockICProxy, agents := initialiseTest(ctx, t)
464
465 tests := []struct {
466 name string
467 args args
468 want *voltha.DeviceImageResponse
469 wantErr bool
470 }{
471 {
472 name: "request-for-single-device",
473 args: args{
474 ctx: ctx,
475 request: newDeviceImagedRequest(agents[:1]),
476 },
477 want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR),
478 wantErr: false,
479 },
480 {
481 name: "request-for-multiple-devices",
482 args: args{
483 ctx: ctx,
484 request: newDeviceImagedRequest(agents),
485 },
486 want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR),
487 wantErr: false,
488 },
489 }
490
491 for _, tt := range tests {
492 t.Run(tt.name, func(t *testing.T) {
493 if tt.name == "request-for-single-device" {
494 chnl := make(chan *kafka.RpcResponse, 10)
495 // Set expectation for the API invocation
496 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
497 "Activate_onu_image",
498 gomock.Any(),
499 gomock.Any(),
500 true,
501 gomock.Any(), gomock.Any()).Return(chnl)
502 // Send the expected response to channel from a goroutine
503 go func() {
504 reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR)
505
506 chnl <- &kafka.RpcResponse{
507 MType: kafka.RpcSent,
508 Err: nil,
509 Reply: reply,
510 }
511
512 chnl <- &kafka.RpcResponse{
513 MType: kafka.RpcReply,
514 Err: nil,
515 Reply: reply,
516 }
517 }()
518 } else if tt.name == "request-for-multiple-devices" {
519 // Map to store per device kafka response channel
520 kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
521 for _, id := range tt.args.request.DeviceId {
522 // Create a kafka response channel per device
523 chnl := make(chan *kafka.RpcResponse)
524
525 // Set expectation for the API invocation
526 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
527 "Activate_onu_image",
528 gomock.Any(),
529 gomock.Any(),
530 true,
531 id.Id, gomock.Any()).Return(chnl)
532
533 kafkaRespChans[id.Id] = chnl
534 }
535
536 // Send the expected response to channel from a goroutine
537 go func() {
538 for _, agent := range agents {
539 reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR)
540
541 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
542 MType: kafka.RpcSent,
543 Err: nil,
544 Reply: reply,
545 }
546
547 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
548 MType: kafka.RpcReply,
549 Err: nil,
550 Reply: reply,
551 }
552 }
553 }()
554 }
555 got, err := dat.deviceMgr.ActivateImage(tt.args.ctx, tt.args.request)
556 if (err != nil) != tt.wantErr {
557 t.Errorf("ActivateImage() error = %v, wantErr %v", err, tt.wantErr)
558 return
559 }
560 if !gotAllSuccess(got, tt.want) {
561 t.Errorf("ActivateImage() got = %v, want %v", got, tt.want)
562 }
563 })
564 }
565}
566
567func TestManager_CommitImage(t *testing.T) {
568 type args struct {
569 ctx context.Context
570 request *voltha.DeviceImageRequest
571 }
572
573 ctx := context.Background()
574 dat, mockICProxy, agents := initialiseTest(ctx, t)
575
576 tests := []struct {
577 name string
578 args args
579 want *voltha.DeviceImageResponse
580 wantErr bool
581 }{
582 {
583 name: "request-for-single-device",
584 args: args{
585 ctx: ctx,
586 request: newDeviceImagedRequest(agents[:1]),
587 },
588 want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR),
589 wantErr: false,
590 },
591 {
592 name: "request-for-multiple-devices",
593 args: args{
594 ctx: ctx,
595 request: newDeviceImagedRequest(agents),
596 },
597 want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR),
598 wantErr: false,
599 },
600 }
601 for _, tt := range tests {
602 t.Run(tt.name, func(t *testing.T) {
603 if tt.name == "request-for-single-device" {
604 chnl := make(chan *kafka.RpcResponse, 10)
605 // Set expectation for the API invocation
606 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
607 "Commit_onu_image",
608 gomock.Any(),
609 gomock.Any(),
610 true,
611 gomock.Any(), gomock.Any()).Return(chnl)
612 // Send the expected response to channel from a goroutine
613 go func() {
614 reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR)
615
616 chnl <- &kafka.RpcResponse{
617 MType: kafka.RpcSent,
618 Err: nil,
619 Reply: reply,
620 }
621
622 chnl <- &kafka.RpcResponse{
623 MType: kafka.RpcReply,
624 Err: nil,
625 Reply: reply,
626 }
627 }()
628 } else if tt.name == "request-for-multiple-devices" {
629 // Map to store per device kafka response channel
630 kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
631 for _, id := range tt.args.request.DeviceId {
632 // Create a kafka response channel per device
633 chnl := make(chan *kafka.RpcResponse)
634
635 // Set expectation for the API invocation
636 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
637 "Commit_onu_image",
638 gomock.Any(),
639 gomock.Any(),
640 true,
641 id.Id, gomock.Any()).Return(chnl)
642
643 kafkaRespChans[id.Id] = chnl
644 }
645
646 // Send the expected response to channel from a goroutine
647 go func() {
648 for _, agent := range agents {
649 reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR)
650
651 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
652 MType: kafka.RpcSent,
653 Err: nil,
654 Reply: reply,
655 }
656
657 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
658 MType: kafka.RpcReply,
659 Err: nil,
660 Reply: reply,
661 }
662 }
663 }()
664 }
665 got, err := dat.deviceMgr.CommitImage(tt.args.ctx, tt.args.request)
666 if (err != nil) != tt.wantErr {
667 t.Errorf("CommitImage() error = %v, wantErr %v", err, tt.wantErr)
668 return
669 }
670 if !gotAllSuccess(got, tt.want) {
671 t.Errorf("CommitImage() got = %v, want %v", got, tt.want)
672 }
673 })
674 }
675}
676
677func TestManager_GetOnuImages(t *testing.T) {
678 type args struct {
679 ctx context.Context
680 id *common.ID
681 }
682
683 ctx := context.Background()
684 dat, mockICProxy, agents := initialiseTest(ctx, t)
685
686 tests := []struct {
687 name string
688 args args
689 want *voltha.OnuImages
690 wantErr bool
691 }{
692 {
693 name: "request-for-single-device",
694 args: args{
695 ctx: ctx,
696 id: &common.ID{
697 Id: agents[0].deviceID,
698 },
699 },
700 want: &voltha.OnuImages{
701 Items: []*voltha.OnuImage{{
702 Version: version,
703 IsCommited: true,
704 IsActive: true,
705 IsValid: true,
706 }},
707 },
708 wantErr: false,
709 },
710 }
711
712 for _, tt := range tests {
713 t.Run(tt.name, func(t *testing.T) {
714 if tt.name == "request-for-single-device" {
715 chnl := make(chan *kafka.RpcResponse, 10)
716 // Set expectation for the API invocation
717 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
718 "Get_onu_images",
719 gomock.Any(),
720 gomock.Any(),
721 true,
722 gomock.Any(), gomock.Any()).Return(chnl)
723 // Send the expected response to channel from a goroutine
724 go func() {
725 reply := newOnuImagesResponse(t)
726 chnl <- &kafka.RpcResponse{
727 MType: kafka.RpcSent,
728 Err: nil,
729 Reply: reply,
730 }
731
732 chnl <- &kafka.RpcResponse{
733 MType: kafka.RpcReply,
734 Err: nil,
735 Reply: reply,
736 }
737 }()
738 }
739
740 got, err := dat.deviceMgr.GetOnuImages(tt.args.ctx, tt.args.id)
741 if (err != nil) != tt.wantErr {
742 t.Errorf("GetOnuImages() error = %v, wantErr %v", err, tt.wantErr)
743 return
744 }
745 if !reflect.DeepEqual(got, tt.want) {
746 t.Errorf("GetOnuImages() got = %v, want %v", got, tt.want)
747 }
748 })
749 }
750}
751
752// verify that we got all the wanted response (order not important)
753func gotAllSuccess(got, want *voltha.DeviceImageResponse) bool {
754 for _, imagestateGot := range got.DeviceImageStates {
755 found := false
756 for _, imageStateWant := range want.DeviceImageStates {
757 if reflect.DeepEqual(imagestateGot, imageStateWant) {
758 found = true
759 }
760 }
761
762 if !found {
763 return false
764 }
765 }
766
767 return true
768}
769
770func newDeviceImagedRequest(agents []*Agent) *voltha.DeviceImageRequest {
771 imgReq := &voltha.DeviceImageRequest{
772 Version: version,
773 CommitOnSuccess: true,
774 }
775
776 for _, agent := range agents {
777 if agent != nil {
778 imgReq.DeviceId = append(imgReq.DeviceId, &common.ID{
779 Id: agent.deviceID,
780 })
781 }
782 }
783
784 return imgReq
785}
786
787func newDeviceImageDownloadRequest(agents []*Agent) *voltha.DeviceImageDownloadRequest {
788 imgDownReq := &voltha.DeviceImageDownloadRequest{
789 Image: &voltha.Image{
790 Version: version,
791 Url: url,
792 Vendor: vendor,
793 },
794 ActivateOnSuccess: true,
795 CommitOnSuccess: true,
796 }
797
798 for _, agent := range agents {
799 if agent != nil {
800 imgDownReq.DeviceId = append(imgDownReq.DeviceId, &common.ID{
801 Id: agent.deviceID,
802 })
803 }
804 }
805
806 return imgDownReq
807}
808
809func newImageResponse(agents []*Agent,
810 downloadState voltha.ImageState_ImageDownloadState,
811 imageSate voltha.ImageState_ImageActivationState,
812 reason voltha.ImageState_ImageFailureReason) *voltha.DeviceImageResponse {
813 response := &voltha.DeviceImageResponse{}
814
815 for _, agent := range agents {
816 response.DeviceImageStates = append(response.DeviceImageStates, &voltha.DeviceImageState{
817 DeviceId: agent.deviceID,
818 ImageState: &voltha.ImageState{
819 Version: version,
820 DownloadState: downloadState,
821 Reason: reason,
822 ImageState: imageSate,
823 },
824 })
825 }
826
827 return response
828}
829
830func newImageDownloadAdapterResponse(t *testing.T,
831 deviceID string,
832 downloadState voltha.ImageState_ImageDownloadState,
833 imageSate voltha.ImageState_ImageActivationState,
834 reason voltha.ImageState_ImageFailureReason) *any.Any {
835 reply, err := ptypes.MarshalAny(&voltha.DeviceImageResponse{
836 DeviceImageStates: []*voltha.DeviceImageState{{
837 DeviceId: deviceID,
838 ImageState: &voltha.ImageState{
839 Version: version,
840 DownloadState: downloadState,
841 Reason: reason,
842 ImageState: imageSate,
843 },
844 }},
845 })
846 assert.Nil(t, err)
847 return reply
848}
849
850func newImageStatusAdapterResponse(t *testing.T,
851 agents []*Agent,
852 downloadState voltha.ImageState_ImageDownloadState,
853 imageSate voltha.ImageState_ImageActivationState,
854 reason voltha.ImageState_ImageFailureReason) *any.Any {
855 imgResponse := &voltha.DeviceImageResponse{}
856 for _, agent := range agents {
857 imgResponse.DeviceImageStates = append(imgResponse.DeviceImageStates, &voltha.DeviceImageState{
858 DeviceId: agent.deviceID,
859 ImageState: &voltha.ImageState{
860 Version: version,
861 DownloadState: downloadState,
862 Reason: reason,
863 ImageState: imageSate,
864 },
865 })
866 }
867
868 reply, err := ptypes.MarshalAny(imgResponse)
869 assert.Nil(t, err)
870 return reply
871}
872
873func newOnuImagesResponse(t *testing.T) *any.Any {
874 onuImages := &voltha.OnuImages{
875 Items: []*voltha.OnuImage{{
876 Version: version,
877 IsCommited: true,
878 IsActive: true,
879 IsValid: true,
880 }},
881 }
882
883 reply, err := ptypes.MarshalAny(onuImages)
884 assert.Nil(t, err)
885 return reply
886}