[VOL-4022] RW-Core Changes For ONU SW Upgrade
New Download/Activate/Retrieve APIs
Change-Id: I2d8a0ec7d8967fd76a261a108f743e75f84c98e9
diff --git a/rw_core/core/device/manager_test.go b/rw_core/core/device/manager_test.go
new file mode 100644
index 0000000..40407f9
--- /dev/null
+++ b/rw_core/core/device/manager_test.go
@@ -0,0 +1,886 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+ "reflect"
+ "strconv"
+ "testing"
+
+ "github.com/golang/mock/gomock"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/any"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/rw_core/config"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
+ tst "github.com/opencord/voltha-go/rw_core/test"
+ "github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
+ "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
+ "github.com/opencord/voltha-protos/v4/go/common"
+ "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/phayes/freeport"
+ "github.com/stretchr/testify/assert"
+)
+
+const (
+ version = "dummy-version"
+ url = "http://127.0.0.1:2222/dummy-image"
+ vendor = "dummy"
+
+ numberOfTestDevices = 10
+)
+
+func initialiseTest(ctx context.Context, t *testing.T) (*DATest, *MockInterContainerProxy, []*Agent) {
+ dat := newDATest(ctx)
+
+ controller := gomock.NewController(t)
+ mockICProxy := NewMockInterContainerProxy(controller)
+
+ // Set expectations for the mock
+ mockICProxy.EXPECT().Start(gomock.Any()).AnyTimes().Return(nil)
+ mockICProxy.EXPECT().SubscribeWithDefaultRequestHandler(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
+
+ dat.startCoreWithCustomICProxy(ctx, mockICProxy)
+
+ var agents []*Agent
+ for i := 1; i <= numberOfTestDevices; i++ {
+ if agent := dat.createDeviceAgent(t); agent != nil {
+ agents = append(agents, agent)
+ }
+ }
+
+ assert.Equal(t, len(agents), numberOfTestDevices)
+
+ dat.oltAdapter, dat.onuAdapter = tst.CreateAndregisterAdapters(ctx,
+ t,
+ dat.kClient,
+ dat.coreInstanceID,
+ dat.oltAdapterName,
+ dat.onuAdapterName,
+ dat.adapterMgr)
+
+ return dat, mockICProxy, agents
+}
+
+func (dat *DATest) startCoreWithCustomICProxy(ctx context.Context, kmp kafka.InterContainerProxy) {
+ cfg := config.NewRWCoreFlags()
+ cfg.CoreTopic = "rw_core"
+ cfg.EventTopic = "voltha.events"
+ cfg.DefaultRequestTimeout = dat.defaultTimeout
+ cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
+ grpcPort, err := freeport.GetFreePort()
+ if err != nil {
+ logger.Fatal(ctx, "Cannot get a freeport for grpc")
+ }
+ cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
+ client := tst.SetupKVClient(ctx, cfg, dat.coreInstanceID)
+ backend := &db.Backend{
+ Client: client,
+ StoreType: cfg.KVStoreType,
+ Address: cfg.KVStoreAddress,
+ Timeout: cfg.KVStoreTimeout,
+ LivenessChannelInterval: cfg.LiveProbeInterval / 2}
+
+ dat.kmp = kmp
+
+ endpointMgr := kafka.NewEndpointManager(backend)
+ proxy := model.NewDBPath(backend)
+ dat.adapterMgr = adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
+ eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
+ dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy, cfg.VolthaStackID)
+ dat.adapterMgr.Start(context.Background())
+ if err = dat.kmp.Start(ctx); err != nil {
+ logger.Fatal(ctx, "Cannot start InterContainerProxy")
+ }
+
+ if err := dat.kmp.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: cfg.CoreTopic}, kafka.OffsetNewest); err != nil {
+ logger.Fatalf(ctx, "Cannot add default request handler: %s", err)
+ }
+
+}
+
+func TestManager_DownloadImageToDevice(t *testing.T) {
+ type args struct {
+ ctx context.Context
+ request *voltha.DeviceImageDownloadRequest
+ }
+
+ ctx := context.Background()
+ dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+ tests := []struct {
+ name string
+ args args
+ want *voltha.DeviceImageResponse
+ wantErr bool
+ }{
+ {
+ name: "request-for-single-device",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImageDownloadRequest(agents[:1]),
+ },
+ want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ {
+ name: "request-for-multiple-devices",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImageDownloadRequest(agents),
+ },
+ want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.name == "request-for-single-device" {
+ chnl := make(chan *kafka.RpcResponse, 10)
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Download_onu_image",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ gomock.Any(), gomock.Any()).Return(chnl)
+ // Send the expected response to channel from a goroutine
+ go func() {
+ reply := newImageDownloadAdapterResponse(t, agents[0].deviceID, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }()
+ } else if tt.name == "request-for-multiple-devices" {
+ // Map to store per device kafka response channel
+ kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
+ for _, id := range tt.args.request.DeviceId {
+ // Create a kafka response channel per device
+ chnl := make(chan *kafka.RpcResponse)
+
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Download_onu_image",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ id.Id, gomock.Any()).Return(chnl)
+
+ kafkaRespChans[id.Id] = chnl
+ }
+
+ // Send the expected response to channel from a goroutine
+ go func() {
+ for _, agent := range agents {
+ reply := newImageDownloadAdapterResponse(t, agent.deviceID, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }
+ }()
+ }
+
+ got, err := dat.deviceMgr.DownloadImageToDevice(tt.args.ctx, tt.args.request)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("DownloadImageToDevice() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+
+ if !gotAllSuccess(got, tt.want) {
+ t.Errorf("DownloadImageToDevice() got = %v, want = %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestManager_GetImageStatus(t *testing.T) {
+ type args struct {
+ ctx context.Context
+ request *voltha.DeviceImageRequest
+ }
+
+ ctx := context.Background()
+ dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+ tests := []struct {
+ name string
+ args args
+ want *voltha.DeviceImageResponse
+ wantErr bool
+ }{
+ {
+ name: "request-for-single-device",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImagedRequest(agents[:1]),
+ },
+ want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ {
+ name: "request-for-multiple-devices",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImagedRequest(agents),
+ },
+ want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.name == "request-for-single-device" {
+ chnl := make(chan *kafka.RpcResponse, 10)
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Get_onu_image_status",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ gomock.Any(), gomock.Any()).Return(chnl)
+ // Send the expected response to channel from a goroutine
+ go func() {
+ reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }()
+ } else if tt.name == "request-for-multiple-devices" {
+ // Map to store per device kafka response channel
+ kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
+ for _, id := range tt.args.request.DeviceId {
+ // Create a kafka response channel per device
+ chnl := make(chan *kafka.RpcResponse)
+
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Get_onu_image_status",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ id.Id, gomock.Any()).Return(chnl)
+
+ kafkaRespChans[id.Id] = chnl
+ }
+
+ // Send the expected response to channel from a goroutine
+ go func() {
+ for _, agent := range agents {
+ reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }
+ }()
+ }
+
+ got, err := dat.deviceMgr.GetImageStatus(tt.args.ctx, tt.args.request)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("GetImageStatus() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+
+ if !gotAllSuccess(got, tt.want) {
+ t.Errorf("GetImageStatus() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestManager_AbortImageUpgradeToDevice(t *testing.T) {
+
+ type args struct {
+ ctx context.Context
+ request *voltha.DeviceImageRequest
+ }
+
+ ctx := context.Background()
+ dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+ tests := []struct {
+ name string
+ args args
+ want *voltha.DeviceImageResponse
+ wantErr bool
+ }{
+ {
+ name: "request-for-single-device",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImagedRequest(agents[:1]),
+ },
+ want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ {
+ name: "request-for-multiple-devices",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImagedRequest(agents[:1]),
+ },
+ want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.name == "request-for-single-device" {
+ chnl := make(chan *kafka.RpcResponse, 10)
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Abort_onu_image_upgrade",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ gomock.Any(), gomock.Any()).Return(chnl)
+ // Send the expected response to channel from a goroutine
+ go func() {
+ reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR)
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }()
+ } else if tt.name == "request-for-multiple-devices" {
+ // Map to store per device kafka response channel
+ kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
+ for _, id := range tt.args.request.DeviceId {
+ // Create a kafka response channel per device
+ chnl := make(chan *kafka.RpcResponse)
+
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Abort_onu_image_upgrade",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ id.Id, gomock.Any()).Return(chnl)
+
+ kafkaRespChans[id.Id] = chnl
+ }
+
+ // Send the expected response to channel from a goroutine
+ go func() {
+ for _, agent := range agents {
+ reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR)
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }
+ }()
+ }
+ got, err := dat.deviceMgr.AbortImageUpgradeToDevice(tt.args.ctx, tt.args.request)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("AbortImageUpgradeToDevice() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+
+ if !gotAllSuccess(got, tt.want) {
+ t.Errorf("AbortImageUpgradeToDevice() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestManager_ActivateImage(t *testing.T) {
+ type args struct {
+ ctx context.Context
+ request *voltha.DeviceImageRequest
+ }
+
+ ctx := context.Background()
+ dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+ tests := []struct {
+ name string
+ args args
+ want *voltha.DeviceImageResponse
+ wantErr bool
+ }{
+ {
+ name: "request-for-single-device",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImagedRequest(agents[:1]),
+ },
+ want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ {
+ name: "request-for-multiple-devices",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImagedRequest(agents),
+ },
+ want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.name == "request-for-single-device" {
+ chnl := make(chan *kafka.RpcResponse, 10)
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Activate_onu_image",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ gomock.Any(), gomock.Any()).Return(chnl)
+ // Send the expected response to channel from a goroutine
+ go func() {
+ reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR)
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }()
+ } else if tt.name == "request-for-multiple-devices" {
+ // Map to store per device kafka response channel
+ kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
+ for _, id := range tt.args.request.DeviceId {
+ // Create a kafka response channel per device
+ chnl := make(chan *kafka.RpcResponse)
+
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Activate_onu_image",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ id.Id, gomock.Any()).Return(chnl)
+
+ kafkaRespChans[id.Id] = chnl
+ }
+
+ // Send the expected response to channel from a goroutine
+ go func() {
+ for _, agent := range agents {
+ reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR)
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }
+ }()
+ }
+ got, err := dat.deviceMgr.ActivateImage(tt.args.ctx, tt.args.request)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("ActivateImage() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if !gotAllSuccess(got, tt.want) {
+ t.Errorf("ActivateImage() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestManager_CommitImage(t *testing.T) {
+ type args struct {
+ ctx context.Context
+ request *voltha.DeviceImageRequest
+ }
+
+ ctx := context.Background()
+ dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+ tests := []struct {
+ name string
+ args args
+ want *voltha.DeviceImageResponse
+ wantErr bool
+ }{
+ {
+ name: "request-for-single-device",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImagedRequest(agents[:1]),
+ },
+ want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ {
+ name: "request-for-multiple-devices",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImagedRequest(agents),
+ },
+ want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.name == "request-for-single-device" {
+ chnl := make(chan *kafka.RpcResponse, 10)
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Commit_onu_image",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ gomock.Any(), gomock.Any()).Return(chnl)
+ // Send the expected response to channel from a goroutine
+ go func() {
+ reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR)
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }()
+ } else if tt.name == "request-for-multiple-devices" {
+ // Map to store per device kafka response channel
+ kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
+ for _, id := range tt.args.request.DeviceId {
+ // Create a kafka response channel per device
+ chnl := make(chan *kafka.RpcResponse)
+
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Commit_onu_image",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ id.Id, gomock.Any()).Return(chnl)
+
+ kafkaRespChans[id.Id] = chnl
+ }
+
+ // Send the expected response to channel from a goroutine
+ go func() {
+ for _, agent := range agents {
+ reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR)
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }
+ }()
+ }
+ got, err := dat.deviceMgr.CommitImage(tt.args.ctx, tt.args.request)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("CommitImage() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if !gotAllSuccess(got, tt.want) {
+ t.Errorf("CommitImage() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestManager_GetOnuImages(t *testing.T) {
+ type args struct {
+ ctx context.Context
+ id *common.ID
+ }
+
+ ctx := context.Background()
+ dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+ tests := []struct {
+ name string
+ args args
+ want *voltha.OnuImages
+ wantErr bool
+ }{
+ {
+ name: "request-for-single-device",
+ args: args{
+ ctx: ctx,
+ id: &common.ID{
+ Id: agents[0].deviceID,
+ },
+ },
+ want: &voltha.OnuImages{
+ Items: []*voltha.OnuImage{{
+ Version: version,
+ IsCommited: true,
+ IsActive: true,
+ IsValid: true,
+ }},
+ },
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.name == "request-for-single-device" {
+ chnl := make(chan *kafka.RpcResponse, 10)
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Get_onu_images",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ gomock.Any(), gomock.Any()).Return(chnl)
+ // Send the expected response to channel from a goroutine
+ go func() {
+ reply := newOnuImagesResponse(t)
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }()
+ }
+
+ got, err := dat.deviceMgr.GetOnuImages(tt.args.ctx, tt.args.id)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("GetOnuImages() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("GetOnuImages() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+// verify that we got all the wanted response (order not important)
+func gotAllSuccess(got, want *voltha.DeviceImageResponse) bool {
+ for _, imagestateGot := range got.DeviceImageStates {
+ found := false
+ for _, imageStateWant := range want.DeviceImageStates {
+ if reflect.DeepEqual(imagestateGot, imageStateWant) {
+ found = true
+ }
+ }
+
+ if !found {
+ return false
+ }
+ }
+
+ return true
+}
+
+func newDeviceImagedRequest(agents []*Agent) *voltha.DeviceImageRequest {
+ imgReq := &voltha.DeviceImageRequest{
+ Version: version,
+ CommitOnSuccess: true,
+ }
+
+ for _, agent := range agents {
+ if agent != nil {
+ imgReq.DeviceId = append(imgReq.DeviceId, &common.ID{
+ Id: agent.deviceID,
+ })
+ }
+ }
+
+ return imgReq
+}
+
+func newDeviceImageDownloadRequest(agents []*Agent) *voltha.DeviceImageDownloadRequest {
+ imgDownReq := &voltha.DeviceImageDownloadRequest{
+ Image: &voltha.Image{
+ Version: version,
+ Url: url,
+ Vendor: vendor,
+ },
+ ActivateOnSuccess: true,
+ CommitOnSuccess: true,
+ }
+
+ for _, agent := range agents {
+ if agent != nil {
+ imgDownReq.DeviceId = append(imgDownReq.DeviceId, &common.ID{
+ Id: agent.deviceID,
+ })
+ }
+ }
+
+ return imgDownReq
+}
+
+func newImageResponse(agents []*Agent,
+ downloadState voltha.ImageState_ImageDownloadState,
+ imageSate voltha.ImageState_ImageActivationState,
+ reason voltha.ImageState_ImageFailureReason) *voltha.DeviceImageResponse {
+ response := &voltha.DeviceImageResponse{}
+
+ for _, agent := range agents {
+ response.DeviceImageStates = append(response.DeviceImageStates, &voltha.DeviceImageState{
+ DeviceId: agent.deviceID,
+ ImageState: &voltha.ImageState{
+ Version: version,
+ DownloadState: downloadState,
+ Reason: reason,
+ ImageState: imageSate,
+ },
+ })
+ }
+
+ return response
+}
+
+func newImageDownloadAdapterResponse(t *testing.T,
+ deviceID string,
+ downloadState voltha.ImageState_ImageDownloadState,
+ imageSate voltha.ImageState_ImageActivationState,
+ reason voltha.ImageState_ImageFailureReason) *any.Any {
+ reply, err := ptypes.MarshalAny(&voltha.DeviceImageResponse{
+ DeviceImageStates: []*voltha.DeviceImageState{{
+ DeviceId: deviceID,
+ ImageState: &voltha.ImageState{
+ Version: version,
+ DownloadState: downloadState,
+ Reason: reason,
+ ImageState: imageSate,
+ },
+ }},
+ })
+ assert.Nil(t, err)
+ return reply
+}
+
+func newImageStatusAdapterResponse(t *testing.T,
+ agents []*Agent,
+ downloadState voltha.ImageState_ImageDownloadState,
+ imageSate voltha.ImageState_ImageActivationState,
+ reason voltha.ImageState_ImageFailureReason) *any.Any {
+ imgResponse := &voltha.DeviceImageResponse{}
+ for _, agent := range agents {
+ imgResponse.DeviceImageStates = append(imgResponse.DeviceImageStates, &voltha.DeviceImageState{
+ DeviceId: agent.deviceID,
+ ImageState: &voltha.ImageState{
+ Version: version,
+ DownloadState: downloadState,
+ Reason: reason,
+ ImageState: imageSate,
+ },
+ })
+ }
+
+ reply, err := ptypes.MarshalAny(imgResponse)
+ assert.Nil(t, err)
+ return reply
+}
+
+func newOnuImagesResponse(t *testing.T) *any.Any {
+ onuImages := &voltha.OnuImages{
+ Items: []*voltha.OnuImage{{
+ Version: version,
+ IsCommited: true,
+ IsActive: true,
+ IsValid: true,
+ }},
+ }
+
+ reply, err := ptypes.MarshalAny(onuImages)
+ assert.Nil(t, err)
+ return reply
+}