blob: 1594b6ee73a88d5433abc7e7ee5b3d5351e910ff [file] [log] [blame]
ssiddiquif076cb82021-04-23 10:47:04 +05301/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package device
18
19import (
20 "context"
21 "reflect"
22 "strconv"
23 "testing"
24
25 "github.com/golang/mock/gomock"
26 "github.com/golang/protobuf/ptypes"
27 "github.com/golang/protobuf/ptypes/any"
28 "github.com/opencord/voltha-go/db/model"
29 "github.com/opencord/voltha-go/rw_core/config"
30 "github.com/opencord/voltha-go/rw_core/core/adapter"
31 tst "github.com/opencord/voltha-go/rw_core/test"
yasin sapli5458a1c2021-06-14 22:24:38 +000032 "github.com/opencord/voltha-lib-go/v5/pkg/db"
33 "github.com/opencord/voltha-lib-go/v5/pkg/events"
34 "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
ssiddiquif076cb82021-04-23 10:47:04 +053035 "github.com/opencord/voltha-protos/v4/go/common"
36 "github.com/opencord/voltha-protos/v4/go/voltha"
37 "github.com/phayes/freeport"
38 "github.com/stretchr/testify/assert"
39)
40
// Image attributes shared by every request and expected response built in
// this file.
const (
	// version is the image version placed in requests and responses.
	version = "dummy-version"
	// url is the image location placed in download requests.
	url = "http://127.0.0.1:2222/dummy-image"
	// vendor is the image vendor placed in download requests.
	vendor = "dummy"
)

// numberOfTestDevices is the number of device agents initialiseTest creates.
const numberOfTestDevices = 10
48
49func initialiseTest(ctx context.Context, t *testing.T) (*DATest, *MockInterContainerProxy, []*Agent) {
50 dat := newDATest(ctx)
51
52 controller := gomock.NewController(t)
53 mockICProxy := NewMockInterContainerProxy(controller)
54
55 // Set expectations for the mock
56 mockICProxy.EXPECT().Start(gomock.Any()).AnyTimes().Return(nil)
57 mockICProxy.EXPECT().SubscribeWithDefaultRequestHandler(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
58
59 dat.startCoreWithCustomICProxy(ctx, mockICProxy)
60
61 var agents []*Agent
62 for i := 1; i <= numberOfTestDevices; i++ {
63 if agent := dat.createDeviceAgent(t); agent != nil {
64 agents = append(agents, agent)
65 }
66 }
67
68 assert.Equal(t, len(agents), numberOfTestDevices)
69
70 dat.oltAdapter, dat.onuAdapter = tst.CreateAndregisterAdapters(ctx,
71 t,
72 dat.kClient,
73 dat.coreInstanceID,
74 dat.oltAdapterName,
75 dat.onuAdapterName,
76 dat.adapterMgr)
77
78 return dat, mockICProxy, agents
79}
80
81func (dat *DATest) startCoreWithCustomICProxy(ctx context.Context, kmp kafka.InterContainerProxy) {
David K. Bainbridge6080c172021-07-24 00:22:28 +000082 cfg := &config.RWCoreFlags{}
83 cfg.ParseCommandArguments([]string{})
ssiddiquif076cb82021-04-23 10:47:04 +053084 cfg.CoreTopic = "rw_core"
85 cfg.EventTopic = "voltha.events"
86 cfg.DefaultRequestTimeout = dat.defaultTimeout
87 cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
88 grpcPort, err := freeport.GetFreePort()
89 if err != nil {
90 logger.Fatal(ctx, "Cannot get a freeport for grpc")
91 }
92 cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
93 client := tst.SetupKVClient(ctx, cfg, dat.coreInstanceID)
94 backend := &db.Backend{
95 Client: client,
96 StoreType: cfg.KVStoreType,
97 Address: cfg.KVStoreAddress,
98 Timeout: cfg.KVStoreTimeout,
99 LivenessChannelInterval: cfg.LiveProbeInterval / 2}
100
101 dat.kmp = kmp
102
103 endpointMgr := kafka.NewEndpointManager(backend)
104 proxy := model.NewDBPath(backend)
105 dat.adapterMgr = adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
106 eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
Maninder0aabf0c2021-03-17 14:55:14 +0530107 dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg, dat.coreInstanceID, eventProxy)
ssiddiquif076cb82021-04-23 10:47:04 +0530108 dat.adapterMgr.Start(context.Background())
109 if err = dat.kmp.Start(ctx); err != nil {
110 logger.Fatal(ctx, "Cannot start InterContainerProxy")
111 }
112
113 if err := dat.kmp.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: cfg.CoreTopic}, kafka.OffsetNewest); err != nil {
114 logger.Fatalf(ctx, "Cannot add default request handler: %s", err)
115 }
116
117}
118
119func TestManager_DownloadImageToDevice(t *testing.T) {
120 type args struct {
121 ctx context.Context
122 request *voltha.DeviceImageDownloadRequest
123 }
124
125 ctx := context.Background()
126 dat, mockICProxy, agents := initialiseTest(ctx, t)
127
128 tests := []struct {
129 name string
130 args args
131 want *voltha.DeviceImageResponse
132 wantErr bool
133 }{
134 {
135 name: "request-for-single-device",
136 args: args{
137 ctx: ctx,
138 request: newDeviceImageDownloadRequest(agents[:1]),
139 },
140 want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
141 wantErr: false,
142 },
143 {
144 name: "request-for-multiple-devices",
145 args: args{
146 ctx: ctx,
147 request: newDeviceImageDownloadRequest(agents),
148 },
149 want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
150 wantErr: false,
151 },
152 }
153
154 for _, tt := range tests {
155 t.Run(tt.name, func(t *testing.T) {
156 if tt.name == "request-for-single-device" {
157 chnl := make(chan *kafka.RpcResponse, 10)
158 // Set expectation for the API invocation
159 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
160 "Download_onu_image",
161 gomock.Any(),
162 gomock.Any(),
163 true,
164 gomock.Any(), gomock.Any()).Return(chnl)
165 // Send the expected response to channel from a goroutine
166 go func() {
167 reply := newImageDownloadAdapterResponse(t, agents[0].deviceID, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
168
169 chnl <- &kafka.RpcResponse{
170 MType: kafka.RpcSent,
171 Err: nil,
172 Reply: reply,
173 }
174
175 chnl <- &kafka.RpcResponse{
176 MType: kafka.RpcReply,
177 Err: nil,
178 Reply: reply,
179 }
180 }()
181 } else if tt.name == "request-for-multiple-devices" {
182 // Map to store per device kafka response channel
183 kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
184 for _, id := range tt.args.request.DeviceId {
185 // Create a kafka response channel per device
186 chnl := make(chan *kafka.RpcResponse)
187
188 // Set expectation for the API invocation
189 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
190 "Download_onu_image",
191 gomock.Any(),
192 gomock.Any(),
193 true,
194 id.Id, gomock.Any()).Return(chnl)
195
196 kafkaRespChans[id.Id] = chnl
197 }
198
199 // Send the expected response to channel from a goroutine
200 go func() {
201 for _, agent := range agents {
202 reply := newImageDownloadAdapterResponse(t, agent.deviceID, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
203
204 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
205 MType: kafka.RpcSent,
206 Err: nil,
207 Reply: reply,
208 }
209
210 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
211 MType: kafka.RpcReply,
212 Err: nil,
213 Reply: reply,
214 }
215 }
216 }()
217 }
218
219 got, err := dat.deviceMgr.DownloadImageToDevice(tt.args.ctx, tt.args.request)
220 if (err != nil) != tt.wantErr {
221 t.Errorf("DownloadImageToDevice() error = %v, wantErr %v", err, tt.wantErr)
222 return
223 }
224
225 if !gotAllSuccess(got, tt.want) {
226 t.Errorf("DownloadImageToDevice() got = %v, want = %v", got, tt.want)
227 }
228 })
229 }
230}
231
232func TestManager_GetImageStatus(t *testing.T) {
233 type args struct {
234 ctx context.Context
235 request *voltha.DeviceImageRequest
236 }
237
238 ctx := context.Background()
239 dat, mockICProxy, agents := initialiseTest(ctx, t)
240
241 tests := []struct {
242 name string
243 args args
244 want *voltha.DeviceImageResponse
245 wantErr bool
246 }{
247 {
248 name: "request-for-single-device",
249 args: args{
250 ctx: ctx,
251 request: newDeviceImagedRequest(agents[:1]),
252 },
253 want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
254 wantErr: false,
255 },
256 {
257 name: "request-for-multiple-devices",
258 args: args{
259 ctx: ctx,
260 request: newDeviceImagedRequest(agents),
261 },
262 want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
263 wantErr: false,
264 },
265 }
266
267 for _, tt := range tests {
268 t.Run(tt.name, func(t *testing.T) {
269 if tt.name == "request-for-single-device" {
270 chnl := make(chan *kafka.RpcResponse, 10)
271 // Set expectation for the API invocation
272 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
273 "Get_onu_image_status",
274 gomock.Any(),
275 gomock.Any(),
276 true,
277 gomock.Any(), gomock.Any()).Return(chnl)
278 // Send the expected response to channel from a goroutine
279 go func() {
280 reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
281
282 chnl <- &kafka.RpcResponse{
283 MType: kafka.RpcSent,
284 Err: nil,
285 Reply: reply,
286 }
287
288 chnl <- &kafka.RpcResponse{
289 MType: kafka.RpcReply,
290 Err: nil,
291 Reply: reply,
292 }
293 }()
294 } else if tt.name == "request-for-multiple-devices" {
295 // Map to store per device kafka response channel
296 kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
297 for _, id := range tt.args.request.DeviceId {
298 // Create a kafka response channel per device
299 chnl := make(chan *kafka.RpcResponse)
300
301 // Set expectation for the API invocation
302 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
303 "Get_onu_image_status",
304 gomock.Any(),
305 gomock.Any(),
306 true,
307 id.Id, gomock.Any()).Return(chnl)
308
309 kafkaRespChans[id.Id] = chnl
310 }
311
312 // Send the expected response to channel from a goroutine
313 go func() {
314 for _, agent := range agents {
315 reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
316
317 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
318 MType: kafka.RpcSent,
319 Err: nil,
320 Reply: reply,
321 }
322
323 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
324 MType: kafka.RpcReply,
325 Err: nil,
326 Reply: reply,
327 }
328 }
329 }()
330 }
331
332 got, err := dat.deviceMgr.GetImageStatus(tt.args.ctx, tt.args.request)
333 if (err != nil) != tt.wantErr {
334 t.Errorf("GetImageStatus() error = %v, wantErr %v", err, tt.wantErr)
335 return
336 }
337
338 if !gotAllSuccess(got, tt.want) {
339 t.Errorf("GetImageStatus() got = %v, want %v", got, tt.want)
340 }
341 })
342 }
343}
344
345func TestManager_AbortImageUpgradeToDevice(t *testing.T) {
346
347 type args struct {
348 ctx context.Context
349 request *voltha.DeviceImageRequest
350 }
351
352 ctx := context.Background()
353 dat, mockICProxy, agents := initialiseTest(ctx, t)
354
355 tests := []struct {
356 name string
357 args args
358 want *voltha.DeviceImageResponse
359 wantErr bool
360 }{
361 {
362 name: "request-for-single-device",
363 args: args{
364 ctx: ctx,
365 request: newDeviceImagedRequest(agents[:1]),
366 },
367 want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR),
368 wantErr: false,
369 },
370 {
371 name: "request-for-multiple-devices",
372 args: args{
373 ctx: ctx,
374 request: newDeviceImagedRequest(agents[:1]),
375 },
376 want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR),
377 wantErr: false,
378 },
379 }
380 for _, tt := range tests {
381 t.Run(tt.name, func(t *testing.T) {
382 if tt.name == "request-for-single-device" {
383 chnl := make(chan *kafka.RpcResponse, 10)
384 // Set expectation for the API invocation
385 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
386 "Abort_onu_image_upgrade",
387 gomock.Any(),
388 gomock.Any(),
389 true,
390 gomock.Any(), gomock.Any()).Return(chnl)
391 // Send the expected response to channel from a goroutine
392 go func() {
393 reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR)
394
395 chnl <- &kafka.RpcResponse{
396 MType: kafka.RpcSent,
397 Err: nil,
398 Reply: reply,
399 }
400
401 chnl <- &kafka.RpcResponse{
402 MType: kafka.RpcReply,
403 Err: nil,
404 Reply: reply,
405 }
406 }()
407 } else if tt.name == "request-for-multiple-devices" {
408 // Map to store per device kafka response channel
409 kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
410 for _, id := range tt.args.request.DeviceId {
411 // Create a kafka response channel per device
412 chnl := make(chan *kafka.RpcResponse)
413
414 // Set expectation for the API invocation
415 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
416 "Abort_onu_image_upgrade",
417 gomock.Any(),
418 gomock.Any(),
419 true,
420 id.Id, gomock.Any()).Return(chnl)
421
422 kafkaRespChans[id.Id] = chnl
423 }
424
425 // Send the expected response to channel from a goroutine
426 go func() {
427 for _, agent := range agents {
428 reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR)
429
430 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
431 MType: kafka.RpcSent,
432 Err: nil,
433 Reply: reply,
434 }
435
436 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
437 MType: kafka.RpcReply,
438 Err: nil,
439 Reply: reply,
440 }
441 }
442 }()
443 }
444 got, err := dat.deviceMgr.AbortImageUpgradeToDevice(tt.args.ctx, tt.args.request)
445 if (err != nil) != tt.wantErr {
446 t.Errorf("AbortImageUpgradeToDevice() error = %v, wantErr %v", err, tt.wantErr)
447 return
448 }
449
450 if !gotAllSuccess(got, tt.want) {
451 t.Errorf("AbortImageUpgradeToDevice() got = %v, want %v", got, tt.want)
452 }
453 })
454 }
455}
456
457func TestManager_ActivateImage(t *testing.T) {
458 type args struct {
459 ctx context.Context
460 request *voltha.DeviceImageRequest
461 }
462
463 ctx := context.Background()
464 dat, mockICProxy, agents := initialiseTest(ctx, t)
465
466 tests := []struct {
467 name string
468 args args
469 want *voltha.DeviceImageResponse
470 wantErr bool
471 }{
472 {
473 name: "request-for-single-device",
474 args: args{
475 ctx: ctx,
476 request: newDeviceImagedRequest(agents[:1]),
477 },
478 want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR),
479 wantErr: false,
480 },
481 {
482 name: "request-for-multiple-devices",
483 args: args{
484 ctx: ctx,
485 request: newDeviceImagedRequest(agents),
486 },
487 want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR),
488 wantErr: false,
489 },
490 }
491
492 for _, tt := range tests {
493 t.Run(tt.name, func(t *testing.T) {
494 if tt.name == "request-for-single-device" {
495 chnl := make(chan *kafka.RpcResponse, 10)
496 // Set expectation for the API invocation
497 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
498 "Activate_onu_image",
499 gomock.Any(),
500 gomock.Any(),
501 true,
502 gomock.Any(), gomock.Any()).Return(chnl)
503 // Send the expected response to channel from a goroutine
504 go func() {
505 reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR)
506
507 chnl <- &kafka.RpcResponse{
508 MType: kafka.RpcSent,
509 Err: nil,
510 Reply: reply,
511 }
512
513 chnl <- &kafka.RpcResponse{
514 MType: kafka.RpcReply,
515 Err: nil,
516 Reply: reply,
517 }
518 }()
519 } else if tt.name == "request-for-multiple-devices" {
520 // Map to store per device kafka response channel
521 kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
522 for _, id := range tt.args.request.DeviceId {
523 // Create a kafka response channel per device
524 chnl := make(chan *kafka.RpcResponse)
525
526 // Set expectation for the API invocation
527 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
528 "Activate_onu_image",
529 gomock.Any(),
530 gomock.Any(),
531 true,
532 id.Id, gomock.Any()).Return(chnl)
533
534 kafkaRespChans[id.Id] = chnl
535 }
536
537 // Send the expected response to channel from a goroutine
538 go func() {
539 for _, agent := range agents {
540 reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR)
541
542 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
543 MType: kafka.RpcSent,
544 Err: nil,
545 Reply: reply,
546 }
547
548 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
549 MType: kafka.RpcReply,
550 Err: nil,
551 Reply: reply,
552 }
553 }
554 }()
555 }
556 got, err := dat.deviceMgr.ActivateImage(tt.args.ctx, tt.args.request)
557 if (err != nil) != tt.wantErr {
558 t.Errorf("ActivateImage() error = %v, wantErr %v", err, tt.wantErr)
559 return
560 }
561 if !gotAllSuccess(got, tt.want) {
562 t.Errorf("ActivateImage() got = %v, want %v", got, tt.want)
563 }
564 })
565 }
566}
567
568func TestManager_CommitImage(t *testing.T) {
569 type args struct {
570 ctx context.Context
571 request *voltha.DeviceImageRequest
572 }
573
574 ctx := context.Background()
575 dat, mockICProxy, agents := initialiseTest(ctx, t)
576
577 tests := []struct {
578 name string
579 args args
580 want *voltha.DeviceImageResponse
581 wantErr bool
582 }{
583 {
584 name: "request-for-single-device",
585 args: args{
586 ctx: ctx,
587 request: newDeviceImagedRequest(agents[:1]),
588 },
589 want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR),
590 wantErr: false,
591 },
592 {
593 name: "request-for-multiple-devices",
594 args: args{
595 ctx: ctx,
596 request: newDeviceImagedRequest(agents),
597 },
598 want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR),
599 wantErr: false,
600 },
601 }
602 for _, tt := range tests {
603 t.Run(tt.name, func(t *testing.T) {
604 if tt.name == "request-for-single-device" {
605 chnl := make(chan *kafka.RpcResponse, 10)
606 // Set expectation for the API invocation
607 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
608 "Commit_onu_image",
609 gomock.Any(),
610 gomock.Any(),
611 true,
612 gomock.Any(), gomock.Any()).Return(chnl)
613 // Send the expected response to channel from a goroutine
614 go func() {
615 reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR)
616
617 chnl <- &kafka.RpcResponse{
618 MType: kafka.RpcSent,
619 Err: nil,
620 Reply: reply,
621 }
622
623 chnl <- &kafka.RpcResponse{
624 MType: kafka.RpcReply,
625 Err: nil,
626 Reply: reply,
627 }
628 }()
629 } else if tt.name == "request-for-multiple-devices" {
630 // Map to store per device kafka response channel
631 kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
632 for _, id := range tt.args.request.DeviceId {
633 // Create a kafka response channel per device
634 chnl := make(chan *kafka.RpcResponse)
635
636 // Set expectation for the API invocation
637 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
638 "Commit_onu_image",
639 gomock.Any(),
640 gomock.Any(),
641 true,
642 id.Id, gomock.Any()).Return(chnl)
643
644 kafkaRespChans[id.Id] = chnl
645 }
646
647 // Send the expected response to channel from a goroutine
648 go func() {
649 for _, agent := range agents {
650 reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR)
651
652 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
653 MType: kafka.RpcSent,
654 Err: nil,
655 Reply: reply,
656 }
657
658 kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
659 MType: kafka.RpcReply,
660 Err: nil,
661 Reply: reply,
662 }
663 }
664 }()
665 }
666 got, err := dat.deviceMgr.CommitImage(tt.args.ctx, tt.args.request)
667 if (err != nil) != tt.wantErr {
668 t.Errorf("CommitImage() error = %v, wantErr %v", err, tt.wantErr)
669 return
670 }
671 if !gotAllSuccess(got, tt.want) {
672 t.Errorf("CommitImage() got = %v, want %v", got, tt.want)
673 }
674 })
675 }
676}
677
678func TestManager_GetOnuImages(t *testing.T) {
679 type args struct {
680 ctx context.Context
681 id *common.ID
682 }
683
684 ctx := context.Background()
685 dat, mockICProxy, agents := initialiseTest(ctx, t)
686
687 tests := []struct {
688 name string
689 args args
690 want *voltha.OnuImages
691 wantErr bool
692 }{
693 {
694 name: "request-for-single-device",
695 args: args{
696 ctx: ctx,
697 id: &common.ID{
698 Id: agents[0].deviceID,
699 },
700 },
701 want: &voltha.OnuImages{
702 Items: []*voltha.OnuImage{{
703 Version: version,
704 IsCommited: true,
705 IsActive: true,
706 IsValid: true,
707 }},
708 },
709 wantErr: false,
710 },
711 }
712
713 for _, tt := range tests {
714 t.Run(tt.name, func(t *testing.T) {
715 if tt.name == "request-for-single-device" {
716 chnl := make(chan *kafka.RpcResponse, 10)
717 // Set expectation for the API invocation
718 mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
719 "Get_onu_images",
720 gomock.Any(),
721 gomock.Any(),
722 true,
723 gomock.Any(), gomock.Any()).Return(chnl)
724 // Send the expected response to channel from a goroutine
725 go func() {
726 reply := newOnuImagesResponse(t)
727 chnl <- &kafka.RpcResponse{
728 MType: kafka.RpcSent,
729 Err: nil,
730 Reply: reply,
731 }
732
733 chnl <- &kafka.RpcResponse{
734 MType: kafka.RpcReply,
735 Err: nil,
736 Reply: reply,
737 }
738 }()
739 }
740
741 got, err := dat.deviceMgr.GetOnuImages(tt.args.ctx, tt.args.id)
742 if (err != nil) != tt.wantErr {
743 t.Errorf("GetOnuImages() error = %v, wantErr %v", err, tt.wantErr)
744 return
745 }
746 if !reflect.DeepEqual(got, tt.want) {
747 t.Errorf("GetOnuImages() got = %v, want %v", got, tt.want)
748 }
749 })
750 }
751}
752
753// verify that we got all the wanted response (order not important)
754func gotAllSuccess(got, want *voltha.DeviceImageResponse) bool {
755 for _, imagestateGot := range got.DeviceImageStates {
756 found := false
757 for _, imageStateWant := range want.DeviceImageStates {
758 if reflect.DeepEqual(imagestateGot, imageStateWant) {
759 found = true
760 }
761 }
762
763 if !found {
764 return false
765 }
766 }
767
768 return true
769}
770
771func newDeviceImagedRequest(agents []*Agent) *voltha.DeviceImageRequest {
772 imgReq := &voltha.DeviceImageRequest{
773 Version: version,
774 CommitOnSuccess: true,
775 }
776
777 for _, agent := range agents {
778 if agent != nil {
779 imgReq.DeviceId = append(imgReq.DeviceId, &common.ID{
780 Id: agent.deviceID,
781 })
782 }
783 }
784
785 return imgReq
786}
787
788func newDeviceImageDownloadRequest(agents []*Agent) *voltha.DeviceImageDownloadRequest {
789 imgDownReq := &voltha.DeviceImageDownloadRequest{
790 Image: &voltha.Image{
791 Version: version,
792 Url: url,
793 Vendor: vendor,
794 },
795 ActivateOnSuccess: true,
796 CommitOnSuccess: true,
797 }
798
799 for _, agent := range agents {
800 if agent != nil {
801 imgDownReq.DeviceId = append(imgDownReq.DeviceId, &common.ID{
802 Id: agent.deviceID,
803 })
804 }
805 }
806
807 return imgDownReq
808}
809
810func newImageResponse(agents []*Agent,
811 downloadState voltha.ImageState_ImageDownloadState,
812 imageSate voltha.ImageState_ImageActivationState,
813 reason voltha.ImageState_ImageFailureReason) *voltha.DeviceImageResponse {
814 response := &voltha.DeviceImageResponse{}
815
816 for _, agent := range agents {
817 response.DeviceImageStates = append(response.DeviceImageStates, &voltha.DeviceImageState{
818 DeviceId: agent.deviceID,
819 ImageState: &voltha.ImageState{
820 Version: version,
821 DownloadState: downloadState,
822 Reason: reason,
823 ImageState: imageSate,
824 },
825 })
826 }
827
828 return response
829}
830
831func newImageDownloadAdapterResponse(t *testing.T,
832 deviceID string,
833 downloadState voltha.ImageState_ImageDownloadState,
834 imageSate voltha.ImageState_ImageActivationState,
835 reason voltha.ImageState_ImageFailureReason) *any.Any {
836 reply, err := ptypes.MarshalAny(&voltha.DeviceImageResponse{
837 DeviceImageStates: []*voltha.DeviceImageState{{
838 DeviceId: deviceID,
839 ImageState: &voltha.ImageState{
840 Version: version,
841 DownloadState: downloadState,
842 Reason: reason,
843 ImageState: imageSate,
844 },
845 }},
846 })
847 assert.Nil(t, err)
848 return reply
849}
850
851func newImageStatusAdapterResponse(t *testing.T,
852 agents []*Agent,
853 downloadState voltha.ImageState_ImageDownloadState,
854 imageSate voltha.ImageState_ImageActivationState,
855 reason voltha.ImageState_ImageFailureReason) *any.Any {
856 imgResponse := &voltha.DeviceImageResponse{}
857 for _, agent := range agents {
858 imgResponse.DeviceImageStates = append(imgResponse.DeviceImageStates, &voltha.DeviceImageState{
859 DeviceId: agent.deviceID,
860 ImageState: &voltha.ImageState{
861 Version: version,
862 DownloadState: downloadState,
863 Reason: reason,
864 ImageState: imageSate,
865 },
866 })
867 }
868
869 reply, err := ptypes.MarshalAny(imgResponse)
870 assert.Nil(t, err)
871 return reply
872}
873
874func newOnuImagesResponse(t *testing.T) *any.Any {
875 onuImages := &voltha.OnuImages{
876 Items: []*voltha.OnuImage{{
877 Version: version,
878 IsCommited: true,
879 IsActive: true,
880 IsValid: true,
881 }},
882 }
883
884 reply, err := ptypes.MarshalAny(onuImages)
885 assert.Nil(t, err)
886 return reply
887}