[VOL-4022] RW-Core Changes For ONU SW Upgrade
New Download/Activate/Retrieve APIs
Change-Id: I2d8a0ec7d8967fd76a261a108f743e75f84c98e9
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
index 9816927..3781540 100644
--- a/rw_core/core/device/agent_image.go
+++ b/rw_core/core/device/agent_image.go
@@ -18,6 +18,8 @@
import (
"context"
+
+ "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-protos/v4/go/common"
"github.com/gogo/protobuf/proto"
@@ -387,3 +389,185 @@
}
}
+
+ // downloadImageToDevice forwards an image download request for this device to
+ // its adapter and returns the adapter's DeviceImageResponse.
+ //
+ // Root devices are rejected with codes.FailedPrecondition. The request-queue
+ // slot obtained through WaitForGreenLight is released either on that rejection
+ // path or right after the adapter call returns, i.e. before the adapter's
+ // response is awaited.
+ func (agent *Agent) downloadImageToDevice(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error) {
+ 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ 		return nil, err
+ 	}
+
+ 	logger.Debugw(ctx, "download-image-to-device", log.Fields{"device-id": agent.deviceID})
+ 	if agent.device.Root {
+ 		agent.requestQueue.RequestComplete()
+ 		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, is an OLT. Image update "+
+ 			"not supported by VOLTHA. Use Device Manager or other means", agent.deviceID)
+ 	}
+
+ 	deviceSnapshot := agent.cloneDeviceWithoutLock()
+
+ 	// The adapter call runs on its own timeout-bound context that carries the
+ 	// caller's span and RPC metadata.
+ 	tracedCtx := log.WithSpanFromContext(context.Background(), ctx)
+ 	rpcCtx, cancelRPC := context.WithTimeout(tracedCtx, agent.defaultTimeout)
+ 	defer cancelRPC()
+ 	rpcCtx = coreutils.WithRPCMetadataFromContext(rpcCtx, ctx)
+
+ 	replyCh, err := agent.adapterProxy.DownloadImageToOnuDevice(rpcCtx, deviceSnapshot, request)
+ 	agent.requestQueue.RequestComplete()
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	// The reply is awaited on the caller's context.
+ 	return agent.getDeviceImageResponseFromAdapter(ctx, replyCh)
+ }
+
+ // getImageStatus asks the device's adapter for the status of an ONU image and
+ // returns the adapter's DeviceImageResponse.
+ //
+ // The request-queue slot obtained through WaitForGreenLight is released as soon
+ // as the adapter call returns, before the adapter's response is awaited.
+ func (agent *Agent) getImageStatus(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+ 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ 		return nil, err
+ 	}
+
+ 	deviceSnapshot := agent.cloneDeviceWithoutLock()
+
+ 	// The adapter call runs on its own timeout-bound context that carries the
+ 	// caller's span and RPC metadata.
+ 	tracedCtx := log.WithSpanFromContext(context.Background(), ctx)
+ 	rpcCtx, cancelRPC := context.WithTimeout(tracedCtx, agent.defaultTimeout)
+ 	defer cancelRPC()
+ 	rpcCtx = coreutils.WithRPCMetadataFromContext(rpcCtx, ctx)
+ 	logger.Debugw(ctx, "get-image-status", log.Fields{"device-id": agent.deviceID})
+
+ 	replyCh, err := agent.adapterProxy.GetOnuImageStatus(rpcCtx, deviceSnapshot, request)
+ 	agent.requestQueue.RequestComplete()
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	// The reply is awaited on the timeout-bound context.
+ 	return agent.getDeviceImageResponseFromAdapter(rpcCtx, replyCh)
+ }
+
+ // activateImageOnDevice asks the device's adapter to activate an ONU image and
+ // returns the adapter's DeviceImageResponse.
+ //
+ // The request-queue slot obtained through WaitForGreenLight is released as soon
+ // as the adapter call returns, before the adapter's response is awaited.
+ func (agent *Agent) activateImageOnDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+ 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ 		return nil, err
+ 	}
+
+ 	deviceSnapshot := agent.cloneDeviceWithoutLock()
+
+ 	// The adapter call runs on its own timeout-bound context that carries the
+ 	// caller's span and RPC metadata.
+ 	tracedCtx := log.WithSpanFromContext(context.Background(), ctx)
+ 	rpcCtx, cancelRPC := context.WithTimeout(tracedCtx, agent.defaultTimeout)
+ 	defer cancelRPC()
+ 	rpcCtx = coreutils.WithRPCMetadataFromContext(rpcCtx, ctx)
+ 	logger.Debugw(ctx, "activate-image-on-device", log.Fields{"device-id": agent.deviceID})
+
+ 	replyCh, err := agent.adapterProxy.ActivateOnuImage(rpcCtx, deviceSnapshot, request)
+ 	agent.requestQueue.RequestComplete()
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	// The reply is awaited on the timeout-bound context.
+ 	return agent.getDeviceImageResponseFromAdapter(rpcCtx, replyCh)
+ }
+
+ // abortImageUpgradeToDevice asks the device's adapter to abort an image upgrade
+ // and returns the adapter's DeviceImageResponse.
+ //
+ // The request-queue slot obtained through WaitForGreenLight is released as soon
+ // as the adapter call returns, before the adapter's response is awaited.
+ func (agent *Agent) abortImageUpgradeToDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+ 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ 		return nil, err
+ 	}
+
+ 	deviceSnapshot := agent.cloneDeviceWithoutLock()
+
+ 	// The adapter call runs on its own timeout-bound context that carries the
+ 	// caller's span and RPC metadata.
+ 	tracedCtx := log.WithSpanFromContext(context.Background(), ctx)
+ 	rpcCtx, cancelRPC := context.WithTimeout(tracedCtx, agent.defaultTimeout)
+ 	defer cancelRPC()
+ 	rpcCtx = coreutils.WithRPCMetadataFromContext(rpcCtx, ctx)
+ 	logger.Debugw(ctx, "abort-image-on-device", log.Fields{"device-id": agent.deviceID})
+
+ 	replyCh, err := agent.adapterProxy.AbortImageUpgrade(rpcCtx, deviceSnapshot, request)
+ 	agent.requestQueue.RequestComplete()
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	// The reply is awaited on the caller's context.
+ 	return agent.getDeviceImageResponseFromAdapter(ctx, replyCh)
+ }
+
+ // commitImage asks the device's adapter to commit an ONU image and returns the
+ // adapter's DeviceImageResponse.
+ //
+ // The request-queue slot obtained through WaitForGreenLight is released as soon
+ // as the adapter call returns, before the adapter's response is awaited.
+ func (agent *Agent) commitImage(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+ 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ 		return nil, err
+ 	}
+
+ 	deviceSnapshot := agent.cloneDeviceWithoutLock()
+
+ 	// The adapter call runs on its own timeout-bound context that carries the
+ 	// caller's span and RPC metadata.
+ 	tracedCtx := log.WithSpanFromContext(context.Background(), ctx)
+ 	rpcCtx, cancelRPC := context.WithTimeout(tracedCtx, agent.defaultTimeout)
+ 	defer cancelRPC()
+ 	rpcCtx = coreutils.WithRPCMetadataFromContext(rpcCtx, ctx)
+ 	logger.Debugw(ctx, "commit-image-on-device", log.Fields{"device-id": agent.deviceID})
+
+ 	replyCh, err := agent.adapterProxy.CommitImage(rpcCtx, deviceSnapshot, request)
+ 	agent.requestQueue.RequestComplete()
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	// The reply is awaited on the caller's context.
+ 	return agent.getDeviceImageResponseFromAdapter(ctx, replyCh)
+ }
+
+ // getOnuImages asks the device's adapter for the images held by the ONU and
+ // returns them decoded as voltha.OnuImages.
+ //
+ // Errors:
+ //   - the WaitForGreenLight or adapter-call error, returned unchanged;
+ //   - codes.Aborted when the reply channel is closed without a reply;
+ //   - codes.Internal when the adapter reports an error or its reply cannot be
+ //     unmarshalled;
+ //   - ctx.Err() when the caller's context ends before a reply arrives.
+ func (agent *Agent) getOnuImages(ctx context.Context, id *common.ID) (*voltha.OnuImages, error) {
+ 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ 		return nil, err
+ 	}
+
+ 	deviceSnapshot := agent.cloneDeviceWithoutLock()
+
+ 	// The adapter call runs on its own timeout-bound context that carries the
+ 	// caller's span and RPC metadata.
+ 	tracedCtx := log.WithSpanFromContext(context.Background(), ctx)
+ 	rpcCtx, cancelRPC := context.WithTimeout(tracedCtx, agent.defaultTimeout)
+ 	defer cancelRPC()
+ 	rpcCtx = coreutils.WithRPCMetadataFromContext(rpcCtx, ctx)
+ 	logger.Debugw(ctx, "get-onu-images", log.Fields{"device-id": agent.deviceID})
+
+ 	replyCh, err := agent.adapterProxy.GetOnuImages(rpcCtx, deviceSnapshot, id)
+ 	agent.requestQueue.RequestComplete()
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	// Block until the adapter answers or the caller gives up.
+ 	select {
+ 	case reply, open := <-replyCh:
+ 		if !open {
+ 			return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+ 		}
+ 		if reply.Err != nil {
+ 			return nil, status.Errorf(codes.Internal, "%s", reply.Err.Error())
+ 		}
+
+ 		images := &voltha.OnuImages{}
+ 		if err := ptypes.UnmarshalAny(reply.Reply, images); err != nil {
+ 			return nil, status.Errorf(codes.Internal, "%s", err.Error())
+ 		}
+ 		return images, nil
+ 	case <-ctx.Done():
+ 		return nil, ctx.Err()
+ 	}
+ }
+
+ // getDeviceImageResponseFromAdapter waits for one reply on ch and decodes it
+ // into a voltha.DeviceImageResponse.
+ //
+ // Errors:
+ //   - codes.Aborted when ch is closed without a reply;
+ //   - codes.Internal when the adapter reports an error, the reply cannot be
+ //     unmarshalled, or the response has no (or a nil first) image state;
+ //   - ctx.Err() when ctx ends before a reply arrives.
+ func (agent *Agent) getDeviceImageResponseFromAdapter(ctx context.Context, ch chan *kafka.RpcResponse) (*voltha.DeviceImageResponse, error) {
+ 	select {
+ 	case reply, open := <-ch:
+ 		if !open {
+ 			return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+ 		}
+ 		if reply.Err != nil {
+ 			return nil, status.Errorf(codes.Internal, "%s", reply.Err.Error())
+ 		}
+
+ 		imageResp := &voltha.DeviceImageResponse{}
+ 		if err := ptypes.UnmarshalAny(reply.Reply, imageResp); err != nil {
+ 			return nil, status.Errorf(codes.Internal, "%s", err.Error())
+ 		}
+
+ 		// A usable response carries at least one non-nil image state.
+ 		states := imageResp.DeviceImageStates
+ 		if len(states) == 0 || states[0] == nil {
+ 			return nil, status.Errorf(codes.Internal, "invalid response from adapter")
+ 		}
+ 		return imageResp, nil
+ 	case <-ctx.Done():
+ 		return nil, ctx.Err()
+ 	}
+ }
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 9d9f933..43bd2be 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -1643,3 +1643,321 @@
}
return agent.getTransientState(), nil
}
+
+ // DownloadImageToDevice asks the agent of every device listed in the request to
+ // download the given image, using one goroutine per device, and returns the
+ // collated per-device image states.
+ //
+ // A request that fails validateImageDownloadRequest is rejected with that
+ // error. Devices whose agent cannot be found, whose download call fails, or
+ // whose response fails validation are logged and contribute no state to the
+ // result.
+ func (dMgr *Manager) DownloadImageToDevice(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error) {
+ 	if err := dMgr.validateImageDownloadRequest(request); err != nil {
+ 		return nil, err
+ 	}
+
+ 	ctx = utils.WithRPCMetadataContext(ctx, "DownloadImageToDevice")
+ 	// Buffered with one slot per device so no worker blocks on send, even when
+ 	// the collector returns early.
+ 	respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
+
+ 	for index, deviceID := range request.DeviceId {
+ 		// Each worker gets its own request. Reusing a single request and
+ 		// rewriting its DeviceId on every iteration would race with the
+ 		// workers already started and could hand them another device's ID.
+ 		downloadReq := &voltha.DeviceImageDownloadRequest{
+ 			// slice-out only this device's ID from the request
+ 			DeviceId:          request.DeviceId[index : index+1],
+ 			Image:             request.Image,
+ 			ActivateOnSuccess: request.ActivateOnSuccess,
+ 			CommitOnSuccess:   request.CommitOnSuccess,
+ 		}
+
+ 		go func(deviceID string, req *voltha.DeviceImageDownloadRequest, ch chan []*voltha.DeviceImageState) {
+ 			agent := dMgr.getDeviceAgent(ctx, deviceID)
+ 			if agent == nil {
+ 				logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
+ 				ch <- nil
+ 				return
+ 			}
+
+ 			resp, err := agent.downloadImageToDevice(ctx, req)
+ 			if err != nil {
+ 				logger.Errorw(ctx, "download-image-to-device-failed", log.Fields{"device-id": deviceID, "error": err})
+ 				ch <- nil
+ 				return
+ 			}
+
+ 			if err = dMgr.validateDeviceImageResponse(resp); err != nil {
+ 				logger.Errorw(ctx, "download-image-to-device-failed", log.Fields{"device-id": deviceID, "error": err})
+ 				ch <- nil
+ 				return
+ 			}
+ 			ch <- resp.GetDeviceImageStates()
+ 		}(deviceID.GetId(), downloadReq, respCh)
+ 	}
+
+ 	return dMgr.waitForAllResponses(ctx, "download-image-to-device", respCh, len(request.GetDeviceId()))
+ }
+
+ // GetImageStatus asks the agent of every device listed in the request for the
+ // status of the requested image, using one goroutine per device, and returns
+ // the collated per-device image states.
+ //
+ // A request that fails validateImageRequest is rejected with that error.
+ // Devices whose agent cannot be found, whose status call fails, or whose
+ // response fails validation are logged and contribute no state to the result.
+ func (dMgr *Manager) GetImageStatus(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+ 	if err := dMgr.validateImageRequest(request); err != nil {
+ 		return nil, err
+ 	}
+
+ 	ctx = utils.WithRPCMetadataContext(ctx, "GetImageStatus")
+ 	// Buffered with one slot per device so no worker blocks on send, even when
+ 	// the collector returns early.
+ 	respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
+
+ 	for index, deviceID := range request.DeviceId {
+ 		// Each worker gets its own request. Reusing a single request and
+ 		// rewriting its DeviceId on every iteration would race with the
+ 		// workers already started and could hand them another device's ID.
+ 		imageStatusReq := &voltha.DeviceImageRequest{
+ 			// slice-out only this device's ID from the request
+ 			DeviceId:        request.DeviceId[index : index+1],
+ 			Version:         request.Version,
+ 			CommitOnSuccess: request.CommitOnSuccess,
+ 		}
+
+ 		go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
+ 			agent := dMgr.getDeviceAgent(ctx, deviceID)
+ 			if agent == nil {
+ 				logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
+ 				ch <- nil
+ 				return
+ 			}
+
+ 			resp, err := agent.getImageStatus(ctx, req)
+ 			if err != nil {
+ 				logger.Errorw(ctx, "get-image-status-failed", log.Fields{"device-id": deviceID, "error": err})
+ 				ch <- nil
+ 				return
+ 			}
+
+ 			if err = dMgr.validateDeviceImageResponse(resp); err != nil {
+ 				logger.Errorw(ctx, "get-image-status-failed", log.Fields{"device-id": deviceID, "error": err})
+ 				ch <- nil
+ 				return
+ 			}
+ 			ch <- resp.GetDeviceImageStates()
+ 		}(deviceID.GetId(), imageStatusReq, respCh)
+ 	}
+
+ 	return dMgr.waitForAllResponses(ctx, "get-image-status", respCh, len(request.GetDeviceId()))
+ }
+
+ // AbortImageUpgradeToDevice asks the agent of every device listed in the
+ // request to abort its image upgrade, using one goroutine per device, and
+ // returns the collated per-device image states.
+ //
+ // A request that fails validateImageRequest is rejected with that error.
+ // Devices whose agent cannot be found, whose abort call fails, or whose
+ // response fails validation are logged and contribute no state to the result.
+ func (dMgr *Manager) AbortImageUpgradeToDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+ 	if err := dMgr.validateImageRequest(request); err != nil {
+ 		return nil, err
+ 	}
+
+ 	ctx = utils.WithRPCMetadataContext(ctx, "AbortImageUpgradeToDevice")
+ 	// Buffered with one slot per device so no worker blocks on send, even when
+ 	// the collector returns early.
+ 	respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
+
+ 	for index, deviceID := range request.DeviceId {
+ 		// Each worker gets its own request. Reusing a single request and
+ 		// rewriting its DeviceId on every iteration would race with the
+ 		// workers already started and could hand them another device's ID.
+ 		abortImageReq := &voltha.DeviceImageRequest{
+ 			// slice-out only this device's ID from the request
+ 			DeviceId:        request.DeviceId[index : index+1],
+ 			Version:         request.Version,
+ 			CommitOnSuccess: request.CommitOnSuccess,
+ 		}
+
+ 		go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
+ 			agent := dMgr.getDeviceAgent(ctx, deviceID)
+ 			if agent == nil {
+ 				logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
+ 				ch <- nil
+ 				return
+ 			}
+
+ 			resp, err := agent.abortImageUpgradeToDevice(ctx, req)
+ 			if err != nil {
+ 				logger.Errorw(ctx, "abort-image-upgrade-to-device-failed", log.Fields{"device-id": deviceID, "error": err})
+ 				ch <- nil
+ 				return
+ 			}
+
+ 			if err = dMgr.validateDeviceImageResponse(resp); err != nil {
+ 				logger.Errorw(ctx, "abort-image-upgrade-to-device-failed", log.Fields{"device-id": deviceID, "error": err})
+ 				ch <- nil
+ 				return
+ 			}
+ 			ch <- resp.GetDeviceImageStates()
+ 		}(deviceID.GetId(), abortImageReq, respCh)
+ 	}
+
+ 	return dMgr.waitForAllResponses(ctx, "abort-image-upgrade-to-device", respCh, len(request.GetDeviceId()))
+ }
+
+ // GetOnuImages returns the images reported for the ONU identified by id.
+ //
+ // It fails with codes.InvalidArgument when id is nil or empty, with
+ // codes.NotFound when no agent exists for the device, and otherwise passes on
+ // whatever error the agent's getOnuImages returns.
+ func (dMgr *Manager) GetOnuImages(ctx context.Context, id *common.ID) (*voltha.OnuImages, error) {
+ 	if id == nil || id.Id == "" {
+ 		return nil, status.Errorf(codes.InvalidArgument, "empty device id")
+ 	}
+ 	onuID := id.Id
+
+ 	ctx = utils.WithRPCMetadataContext(ctx, "GetOnuImages")
+ 	log.EnrichSpan(ctx, log.Fields{"device-id": onuID})
+
+ 	devAgent := dMgr.getDeviceAgent(ctx, onuID)
+ 	if devAgent == nil {
+ 		return nil, status.Errorf(codes.NotFound, "%s", onuID)
+ 	}
+
+ 	images, err := devAgent.getOnuImages(ctx, id)
+ 	if err != nil {
+ 		return nil, err
+ 	}
+ 	logger.Debugw(ctx, "get-onu-images-result", log.Fields{"onu-image": images.Items})
+
+ 	return images, nil
+ }
+
+ // ActivateImage asks the agent of every device listed in the request to
+ // activate the requested image, using one goroutine per device, and returns
+ // the collated per-device image states.
+ //
+ // A request that fails validateImageRequest is rejected with that error.
+ // Devices whose agent cannot be found, whose activate call fails, or whose
+ // response fails validation are logged and contribute no state to the result.
+ func (dMgr *Manager) ActivateImage(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+ 	if err := dMgr.validateImageRequest(request); err != nil {
+ 		return nil, err
+ 	}
+
+ 	ctx = utils.WithRPCMetadataContext(ctx, "ActivateImage")
+ 	// Buffered with one slot per device so no worker blocks on send, even when
+ 	// the collector returns early.
+ 	respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
+
+ 	for index, deviceID := range request.DeviceId {
+ 		// Each worker gets its own request. Reusing a single request and
+ 		// rewriting its DeviceId on every iteration would race with the
+ 		// workers already started and could hand them another device's ID.
+ 		activateImageReq := &voltha.DeviceImageRequest{
+ 			// slice-out only this device's ID from the request
+ 			DeviceId:        request.DeviceId[index : index+1],
+ 			Version:         request.Version,
+ 			CommitOnSuccess: request.CommitOnSuccess,
+ 		}
+
+ 		go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
+ 			agent := dMgr.getDeviceAgent(ctx, deviceID)
+ 			if agent == nil {
+ 				logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
+ 				ch <- nil
+ 				return
+ 			}
+
+ 			resp, err := agent.activateImageOnDevice(ctx, req)
+ 			if err != nil {
+ 				logger.Errorw(ctx, "activate-image-failed", log.Fields{"device-id": deviceID, "error": err})
+ 				ch <- nil
+ 				return
+ 			}
+
+ 			if err = dMgr.validateDeviceImageResponse(resp); err != nil {
+ 				logger.Errorw(ctx, "activate-image-failed", log.Fields{"device-id": deviceID, "error": err})
+ 				ch <- nil
+ 				return
+ 			}
+
+ 			ch <- resp.GetDeviceImageStates()
+ 		}(deviceID.GetId(), activateImageReq, respCh)
+ 	}
+
+ 	return dMgr.waitForAllResponses(ctx, "activate-image", respCh, len(request.GetDeviceId()))
+ }
+
+ // CommitImage asks the agent of every device listed in the request to commit
+ // the requested image, using one goroutine per device, and returns the
+ // collated per-device image states.
+ //
+ // A request that fails validateImageRequest is rejected with that error.
+ // Devices whose agent cannot be found, whose commit call fails, or whose
+ // response fails validation are logged and contribute no state to the result.
+ func (dMgr *Manager) CommitImage(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+ 	if err := dMgr.validateImageRequest(request); err != nil {
+ 		return nil, err
+ 	}
+
+ 	ctx = utils.WithRPCMetadataContext(ctx, "CommitImage")
+ 	// Buffered with one slot per device so no worker blocks on send, even when
+ 	// the collector returns early.
+ 	respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
+
+ 	for index, deviceID := range request.DeviceId {
+ 		// Each worker gets its own request. Reusing a single request and
+ 		// rewriting its DeviceId on every iteration would race with the
+ 		// workers already started and could hand them another device's ID.
+ 		commitImageReq := &voltha.DeviceImageRequest{
+ 			// slice-out only this device's ID from the request
+ 			DeviceId:        request.DeviceId[index : index+1],
+ 			Version:         request.Version,
+ 			CommitOnSuccess: request.CommitOnSuccess,
+ 		}
+
+ 		go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
+ 			agent := dMgr.getDeviceAgent(ctx, deviceID)
+ 			if agent == nil {
+ 				logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
+ 				ch <- nil
+ 				return
+ 			}
+
+ 			resp, err := agent.commitImage(ctx, req)
+ 			if err != nil {
+ 				logger.Errorw(ctx, "commit-image-failed", log.Fields{"device-id": deviceID, "error": err})
+ 				ch <- nil
+ 				return
+ 			}
+
+ 			if err = dMgr.validateDeviceImageResponse(resp); err != nil {
+ 				// Structured fields belong with Errorw, as in every other log call here.
+ 				logger.Errorw(ctx, "commit-image-failed", log.Fields{"device-id": deviceID, "error": err})
+ 				ch <- nil
+ 				return
+ 			}
+ 			ch <- resp.GetDeviceImageStates()
+ 		}(deviceID.GetId(), commitImageReq, respCh)
+ 	}
+
+ 	return dMgr.waitForAllResponses(ctx, "commit-image", respCh, len(request.GetDeviceId()))
+ }
+
+ // validateImageDownloadRequest checks that a download request is present, names
+ // an image, and lists at least one device with no nil entries. Any violation
+ // yields a codes.InvalidArgument error.
+ func (dMgr *Manager) validateImageDownloadRequest(request *voltha.DeviceImageDownloadRequest) error {
+ 	if request == nil {
+ 		return status.Errorf(codes.InvalidArgument, "invalid argument")
+ 	}
+ 	if request.Image == nil || len(request.DeviceId) == 0 {
+ 		return status.Errorf(codes.InvalidArgument, "invalid argument")
+ 	}
+
+ 	for i := range request.DeviceId {
+ 		if request.DeviceId[i] == nil {
+ 			return status.Errorf(codes.InvalidArgument, "id is nil")
+ 		}
+ 	}
+ 	return nil
+ }
+
+ // validateImageRequest checks that an image request is present and lists at
+ // least one device with no nil entries. Any violation yields a
+ // codes.InvalidArgument error; a nil first entry is reported as
+ // "invalid argument", a nil later entry as "id is nil".
+ func (dMgr *Manager) validateImageRequest(request *voltha.DeviceImageRequest) error {
+ 	if request == nil {
+ 		return status.Errorf(codes.InvalidArgument, "invalid argument")
+ 	}
+
+ 	ids := request.DeviceId
+ 	if len(ids) == 0 || ids[0] == nil {
+ 		return status.Errorf(codes.InvalidArgument, "invalid argument")
+ 	}
+
+ 	// The first entry is already known to be non-nil; check the remainder.
+ 	for _, id := range ids[1:] {
+ 		if id == nil {
+ 			return status.Errorf(codes.InvalidArgument, "id is nil")
+ 		}
+ 	}
+
+ 	return nil
+ }
+
+ // validateDeviceImageResponse reports a codes.Internal error unless the
+ // response is non-nil and its first device image state is present.
+ func (dMgr *Manager) validateDeviceImageResponse(response *voltha.DeviceImageResponse) error {
+ 	if response == nil {
+ 		return status.Errorf(codes.Internal, "invalid-response-from-adapter")
+ 	}
+
+ 	states := response.GetDeviceImageStates()
+ 	if len(states) == 0 || states[0] == nil {
+ 		return status.Errorf(codes.Internal, "invalid-response-from-adapter")
+ 	}
+
+ 	return nil
+ }
+
+ // waitForAllResponses reads expectedResps messages from respCh and collates
+ // every non-empty one into a single DeviceImageResponse.
+ //
+ // opName is only used to label log entries and the error text. A nil or empty
+ // message counts towards expectedResps but adds no state.
+ //
+ // Returns:
+ //   - the collated response once expectedResps messages were read (right away
+ //     when expectedResps is not positive);
+ //   - the states collected so far plus a codes.Aborted error when respCh is
+ //     closed early;
+ //   - nil plus a codes.Aborted error when ctx ends first.
+ func (dMgr *Manager) waitForAllResponses(ctx context.Context, opName string, respCh chan []*voltha.DeviceImageState, expectedResps int) (*voltha.DeviceImageResponse, error) {
+ 	response := &voltha.DeviceImageResponse{}
+
+ 	// With nothing to wait for, the loop below could only end through ctx.
+ 	if expectedResps <= 0 {
+ 		return response, nil
+ 	}
+
+ 	for respCount := 0; respCount < expectedResps; {
+ 		select {
+ 		case resp, ok := <-respCh:
+ 			if !ok {
+ 				logger.Errorw(ctx, opName+"-failed", log.Fields{"error": "channel-closed"})
+ 				return response, status.Errorf(codes.Aborted, "channel-closed")
+ 			}
+
+ 			// Checking the length (not just nil) keeps resp[0] in range.
+ 			if len(resp) > 0 {
+ 				logger.Debugw(ctx, opName+"-result", log.Fields{"image-state": resp[0].GetImageState(), "device-id": resp[0].GetDeviceId()})
+ 				response.DeviceImageStates = append(response.DeviceImageStates, resp...)
+ 			}
+
+ 			respCount++
+ 		case <-ctx.Done():
+ 			// opName is passed as an argument so it is never parsed as a format string.
+ 			return nil, status.Errorf(codes.Aborted, "%s-failed-%s", opName, ctx.Err())
+ 		}
+ 	}
+
+ 	// All responses received; send back the collated response.
+ 	return response, nil
+ }
diff --git a/rw_core/core/device/manager_test.go b/rw_core/core/device/manager_test.go
new file mode 100644
index 0000000..40407f9
--- /dev/null
+++ b/rw_core/core/device/manager_test.go
@@ -0,0 +1,886 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+ "reflect"
+ "strconv"
+ "testing"
+
+ "github.com/golang/mock/gomock"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/any"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/rw_core/config"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
+ tst "github.com/opencord/voltha-go/rw_core/test"
+ "github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
+ "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
+ "github.com/opencord/voltha-protos/v4/go/common"
+ "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/phayes/freeport"
+ "github.com/stretchr/testify/assert"
+)
+
+ // Fixture values for the device manager image tests.
+ const (
+ // NOTE(review): version, url and vendor are not referenced in the visible
+ // part of this file; presumably they feed the request/response builder
+ // helpers defined elsewhere — confirm.
+ version = "dummy-version"
+ url = "http://127.0.0.1:2222/dummy-image"
+ vendor = "dummy"
+
+ // numberOfTestDevices is the number of device agents initialiseTest creates.
+ numberOfTestDevices = 10
+ )
+
+func initialiseTest(ctx context.Context, t *testing.T) (*DATest, *MockInterContainerProxy, []*Agent) {
+ dat := newDATest(ctx)
+
+ controller := gomock.NewController(t)
+ mockICProxy := NewMockInterContainerProxy(controller)
+
+ // Set expectations for the mock
+ mockICProxy.EXPECT().Start(gomock.Any()).AnyTimes().Return(nil)
+ mockICProxy.EXPECT().SubscribeWithDefaultRequestHandler(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
+
+ dat.startCoreWithCustomICProxy(ctx, mockICProxy)
+
+ var agents []*Agent
+ for i := 1; i <= numberOfTestDevices; i++ {
+ if agent := dat.createDeviceAgent(t); agent != nil {
+ agents = append(agents, agent)
+ }
+ }
+
+ assert.Equal(t, len(agents), numberOfTestDevices)
+
+ dat.oltAdapter, dat.onuAdapter = tst.CreateAndregisterAdapters(ctx,
+ t,
+ dat.kClient,
+ dat.coreInstanceID,
+ dat.oltAdapterName,
+ dat.onuAdapterName,
+ dat.adapterMgr)
+
+ return dat, mockICProxy, agents
+}
+
+ // startCoreWithCustomICProxy wires up the core components of dat (adapter
+ // manager, device manager, logical device manager) using the supplied
+ // InterContainerProxy instead of a real one, then starts the adapter manager
+ // and the proxy and subscribes the proxy to the core topic.
+ //
+ // Failing to obtain a free gRPC port, to start the proxy, or to subscribe is
+ // logged through logger.Fatal/Fatalf.
+ func (dat *DATest) startCoreWithCustomICProxy(ctx context.Context, kmp kafka.InterContainerProxy) {
+ cfg := config.NewRWCoreFlags()
+ cfg.CoreTopic = "rw_core"
+ cfg.EventTopic = "voltha.events"
+ // NOTE(review): DefaultRequestTimeout is set here, but NewManagers below is
+ // given cfg.DefaultCoreTimeout — confirm this is intended.
+ cfg.DefaultRequestTimeout = dat.defaultTimeout
+ cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
+ grpcPort, err := freeport.GetFreePort()
+ if err != nil {
+ logger.Fatal(ctx, "Cannot get a freeport for grpc")
+ }
+ cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
+ client := tst.SetupKVClient(ctx, cfg, dat.coreInstanceID)
+ backend := &db.Backend{
+ Client: client,
+ StoreType: cfg.KVStoreType,
+ Address: cfg.KVStoreAddress,
+ Timeout: cfg.KVStoreTimeout,
+ LivenessChannelInterval: cfg.LiveProbeInterval / 2}
+
+ // Use the caller-supplied proxy for all inter-container messaging.
+ dat.kmp = kmp
+
+ endpointMgr := kafka.NewEndpointManager(backend)
+ proxy := model.NewDBPath(backend)
+ dat.adapterMgr = adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
+ eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
+ dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy, cfg.VolthaStackID)
+ dat.adapterMgr.Start(context.Background())
+ if err = dat.kmp.Start(ctx); err != nil {
+ logger.Fatal(ctx, "Cannot start InterContainerProxy")
+ }
+
+ if err := dat.kmp.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: cfg.CoreTopic}, kafka.OffsetNewest); err != nil {
+ logger.Fatalf(ctx, "Cannot add default request handler: %s", err)
+ }
+
+ }
+
+ // TestManager_DownloadImageToDevice drives Manager.DownloadImageToDevice for a
+ // single device and for all test devices. The adapter side is simulated by a
+ // mocked InvokeAsyncRPC ("Download_onu_image") that returns a channel which a
+ // goroutine feeds with an RpcSent message followed by an RpcReply message.
+ func TestManager_DownloadImageToDevice(t *testing.T) {
+ type args struct {
+ ctx context.Context
+ request *voltha.DeviceImageDownloadRequest
+ }
+
+ ctx := context.Background()
+ dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+ tests := []struct {
+ name string
+ args args
+ want *voltha.DeviceImageResponse
+ wantErr bool
+ }{
+ {
+ name: "request-for-single-device",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImageDownloadRequest(agents[:1]),
+ },
+ want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ {
+ name: "request-for-multiple-devices",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImageDownloadRequest(agents),
+ },
+ want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // The mock set-up is selected by test-case name.
+ if tt.name == "request-for-single-device" {
+ chnl := make(chan *kafka.RpcResponse, 10)
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Download_onu_image",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ gomock.Any(), gomock.Any()).Return(chnl)
+ // Send the expected response to channel from a goroutine
+ go func() {
+ reply := newImageDownloadAdapterResponse(t, agents[0].deviceID, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }()
+ } else if tt.name == "request-for-multiple-devices" {
+ // Map to store per device kafka response channel
+ kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
+ for _, id := range tt.args.request.DeviceId {
+ // Create a kafka response channel per device
+ chnl := make(chan *kafka.RpcResponse)
+
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Download_onu_image",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ id.Id, gomock.Any()).Return(chnl)
+
+ kafkaRespChans[id.Id] = chnl
+ }
+
+ // Send the expected response to channel from a goroutine
+ go func() {
+ // The per-device channels are unbuffered, so each send waits for its reader.
+ for _, agent := range agents {
+ reply := newImageDownloadAdapterResponse(t, agent.deviceID, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }
+ }()
+ }
+
+ got, err := dat.deviceMgr.DownloadImageToDevice(tt.args.ctx, tt.args.request)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("DownloadImageToDevice() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+
+ if !gotAllSuccess(got, tt.want) {
+ t.Errorf("DownloadImageToDevice() got = %v, want = %v", got, tt.want)
+ }
+ })
+ }
+ }
+
+ // TestManager_GetImageStatus drives Manager.GetImageStatus for a single device
+ // and for all test devices. The adapter side is simulated by a mocked
+ // InvokeAsyncRPC ("Get_onu_image_status") that returns a channel which a
+ // goroutine feeds with an RpcSent message followed by an RpcReply message.
+ func TestManager_GetImageStatus(t *testing.T) {
+ type args struct {
+ ctx context.Context
+ request *voltha.DeviceImageRequest
+ }
+
+ ctx := context.Background()
+ dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+ tests := []struct {
+ name string
+ args args
+ want *voltha.DeviceImageResponse
+ wantErr bool
+ }{
+ {
+ name: "request-for-single-device",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImagedRequest(agents[:1]),
+ },
+ want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ {
+ name: "request-for-multiple-devices",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImagedRequest(agents),
+ },
+ want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // The mock set-up is selected by test-case name.
+ if tt.name == "request-for-single-device" {
+ chnl := make(chan *kafka.RpcResponse, 10)
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Get_onu_image_status",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ gomock.Any(), gomock.Any()).Return(chnl)
+ // Send the expected response to channel from a goroutine
+ go func() {
+ reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }()
+ } else if tt.name == "request-for-multiple-devices" {
+ // Map to store per device kafka response channel
+ kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
+ for _, id := range tt.args.request.DeviceId {
+ // Create a kafka response channel per device
+ chnl := make(chan *kafka.RpcResponse)
+
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Get_onu_image_status",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ id.Id, gomock.Any()).Return(chnl)
+
+ kafkaRespChans[id.Id] = chnl
+ }
+
+ // Send the expected response to channel from a goroutine
+ go func() {
+ // The per-device channels are unbuffered, so each send waits for its reader.
+ for _, agent := range agents {
+ // Note: every per-device reply is built from the full agents slice.
+ reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }
+ }()
+ }
+
+ got, err := dat.deviceMgr.GetImageStatus(tt.args.ctx, tt.args.request)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("GetImageStatus() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+
+ if !gotAllSuccess(got, tt.want) {
+ t.Errorf("GetImageStatus() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+ }
+
+ // TestManager_AbortImageUpgradeToDevice drives Manager.AbortImageUpgradeToDevice
+ // for a single device and for all test devices. The adapter side is simulated
+ // by a mocked InvokeAsyncRPC ("Abort_onu_image_upgrade") that returns a channel
+ // which a goroutine feeds with an RpcSent message followed by an RpcReply
+ // message.
+ func TestManager_AbortImageUpgradeToDevice(t *testing.T) {
+ 	type args struct {
+ 		ctx     context.Context
+ 		request *voltha.DeviceImageRequest
+ 	}
+
+ 	ctx := context.Background()
+ 	dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+ 	tests := []struct {
+ 		name    string
+ 		args    args
+ 		want    *voltha.DeviceImageResponse
+ 		wantErr bool
+ 	}{
+ 		{
+ 			name: "request-for-single-device",
+ 			args: args{
+ 				ctx:     ctx,
+ 				request: newDeviceImagedRequest(agents[:1]),
+ 			},
+ 			want:    newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR),
+ 			wantErr: false,
+ 		},
+ 		{
+ 			name: "request-for-multiple-devices",
+ 			args: args{
+ 				ctx: ctx,
+ 				// Request every device: the expected response and the responder
+ 				// goroutine below both cover all agents, and a channel is only
+ 				// registered for devices named in the request.
+ 				request: newDeviceImagedRequest(agents),
+ 			},
+ 			want:    newImageResponse(agents, voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR),
+ 			wantErr: false,
+ 		},
+ 	}
+ 	for _, tt := range tests {
+ 		t.Run(tt.name, func(t *testing.T) {
+ 			// The mock set-up is selected by test-case name.
+ 			if tt.name == "request-for-single-device" {
+ 				chnl := make(chan *kafka.RpcResponse, 10)
+ 				// Set expectation for the API invocation
+ 				mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ 					"Abort_onu_image_upgrade",
+ 					gomock.Any(),
+ 					gomock.Any(),
+ 					true,
+ 					gomock.Any(), gomock.Any()).Return(chnl)
+ 				// Send the expected response to channel from a goroutine
+ 				go func() {
+ 					reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR)
+
+ 					chnl <- &kafka.RpcResponse{
+ 						MType: kafka.RpcSent,
+ 						Err:   nil,
+ 						Reply: reply,
+ 					}
+
+ 					chnl <- &kafka.RpcResponse{
+ 						MType: kafka.RpcReply,
+ 						Err:   nil,
+ 						Reply: reply,
+ 					}
+ 				}()
+ 			} else if tt.name == "request-for-multiple-devices" {
+ 				// Map to store per device kafka response channel
+ 				kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
+ 				for _, id := range tt.args.request.DeviceId {
+ 					// Create a kafka response channel per device
+ 					chnl := make(chan *kafka.RpcResponse)
+
+ 					// Set expectation for the API invocation
+ 					mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ 						"Abort_onu_image_upgrade",
+ 						gomock.Any(),
+ 						gomock.Any(),
+ 						true,
+ 						id.Id, gomock.Any()).Return(chnl)
+
+ 					kafkaRespChans[id.Id] = chnl
+ 				}
+
+ 				// Send the expected response to channel from a goroutine
+ 				go func() {
+ 					// The per-device channels are unbuffered, so each send waits for its reader.
+ 					for _, agent := range agents {
+ 						reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR)
+
+ 						kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ 							MType: kafka.RpcSent,
+ 							Err:   nil,
+ 							Reply: reply,
+ 						}
+
+ 						kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ 							MType: kafka.RpcReply,
+ 							Err:   nil,
+ 							Reply: reply,
+ 						}
+ 					}
+ 				}()
+ 			}
+ 			got, err := dat.deviceMgr.AbortImageUpgradeToDevice(tt.args.ctx, tt.args.request)
+ 			if (err != nil) != tt.wantErr {
+ 				t.Errorf("AbortImageUpgradeToDevice() error = %v, wantErr %v", err, tt.wantErr)
+ 				return
+ 			}
+
+ 			if !gotAllSuccess(got, tt.want) {
+ 				t.Errorf("AbortImageUpgradeToDevice() got = %v, want %v", got, tt.want)
+ 			}
+ 		})
+ 	}
+ }
+
+ // TestManager_ActivateImage drives Manager.ActivateImage for a single device
+ // and for all test devices. The adapter side is simulated by a mocked
+ // InvokeAsyncRPC ("Activate_onu_image") that returns a channel which a
+ // goroutine feeds with an RpcSent message followed by an RpcReply message.
+ func TestManager_ActivateImage(t *testing.T) {
+ type args struct {
+ ctx context.Context
+ request *voltha.DeviceImageRequest
+ }
+
+ ctx := context.Background()
+ dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+ tests := []struct {
+ name string
+ args args
+ want *voltha.DeviceImageResponse
+ wantErr bool
+ }{
+ {
+ name: "request-for-single-device",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImagedRequest(agents[:1]),
+ },
+ want: newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ {
+ name: "request-for-multiple-devices",
+ args: args{
+ ctx: ctx,
+ request: newDeviceImagedRequest(agents),
+ },
+ want: newImageResponse(agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR),
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // The mock set-up is selected by test-case name.
+ if tt.name == "request-for-single-device" {
+ chnl := make(chan *kafka.RpcResponse, 10)
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Activate_onu_image",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ gomock.Any(), gomock.Any()).Return(chnl)
+ // Send the expected response to channel from a goroutine
+ go func() {
+ reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR)
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ chnl <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }()
+ } else if tt.name == "request-for-multiple-devices" {
+ // Map to store per device kafka response channel
+ kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
+ for _, id := range tt.args.request.DeviceId {
+ // Create a kafka response channel per device
+ chnl := make(chan *kafka.RpcResponse)
+
+ // Set expectation for the API invocation
+ mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+ "Activate_onu_image",
+ gomock.Any(),
+ gomock.Any(),
+ true,
+ id.Id, gomock.Any()).Return(chnl)
+
+ kafkaRespChans[id.Id] = chnl
+ }
+
+ // Send the expected response to channel from a goroutine
+ go func() {
+ // The per-device channels are unbuffered, so each send waits for its reader.
+ for _, agent := range agents {
+ // Note: every per-device reply is built from the full agents slice.
+ reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR)
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcSent,
+ Err: nil,
+ Reply: reply,
+ }
+
+ kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+ MType: kafka.RpcReply,
+ Err: nil,
+ Reply: reply,
+ }
+ }
+ }()
+ }
+ got, err := dat.deviceMgr.ActivateImage(tt.args.ctx, tt.args.request)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("ActivateImage() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if !gotAllSuccess(got, tt.want) {
+ t.Errorf("ActivateImage() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+ }
+
+// TestManager_CommitImage exercises deviceMgr.CommitImage for one device and for
+// every test device. Adapter replies are injected through the channels that the
+// mocked InvokeAsyncRPC hands back.
+func TestManager_CommitImage(t *testing.T) {
+	type args struct {
+		ctx     context.Context
+		request *voltha.DeviceImageRequest
+	}
+
+	const (
+		singleDevice    = "request-for-single-device"
+		multipleDevices = "request-for-multiple-devices"
+	)
+
+	ctx := context.Background()
+	dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+	tests := []struct {
+		name    string
+		args    args
+		want    *voltha.DeviceImageResponse
+		wantErr bool
+	}{
+		{
+			name:    singleDevice,
+			args:    args{ctx: ctx, request: newDeviceImagedRequest(agents[:1])},
+			want:    newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR),
+			wantErr: false,
+		},
+		{
+			name:    multipleDevices,
+			args:    args{ctx: ctx, request: newDeviceImagedRequest(agents)},
+			want:    newImageResponse(agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR),
+			wantErr: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			switch tt.name {
+			case singleDevice:
+				respCh := make(chan *kafka.RpcResponse, 10)
+				// A single RPC is expected; the target device id is not constrained.
+				mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+					"Commit_onu_image",
+					gomock.Any(),
+					gomock.Any(),
+					true,
+					gomock.Any(), gomock.Any()).Return(respCh)
+
+				// Play the adapter: first the "sent" notification, then the reply.
+				go func() {
+					reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR)
+					for _, msg := range []*kafka.RpcResponse{
+						{MType: kafka.RpcSent, Err: nil, Reply: reply},
+						{MType: kafka.RpcReply, Err: nil, Reply: reply},
+					} {
+						respCh <- msg
+					}
+				}()
+
+			case multipleDevices:
+				// One unbuffered response channel per requested device id.
+				respChans := make(map[string]chan *kafka.RpcResponse)
+				for _, id := range tt.args.request.DeviceId {
+					ch := make(chan *kafka.RpcResponse)
+					mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+						"Commit_onu_image",
+						gomock.Any(),
+						gomock.Any(),
+						true,
+						id.Id, gomock.Any()).Return(ch)
+					respChans[id.Id] = ch
+				}
+
+				// Answer each device in turn from a goroutine.
+				go func() {
+					for _, agent := range agents {
+						reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR)
+						ch := respChans[agent.deviceID]
+						ch <- &kafka.RpcResponse{MType: kafka.RpcSent, Err: nil, Reply: reply}
+						ch <- &kafka.RpcResponse{MType: kafka.RpcReply, Err: nil, Reply: reply}
+					}
+				}()
+			}
+
+			got, err := dat.deviceMgr.CommitImage(tt.args.ctx, tt.args.request)
+			if failed := err != nil; failed != tt.wantErr {
+				t.Errorf("CommitImage() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !gotAllSuccess(got, tt.want) {
+				t.Errorf("CommitImage() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+// TestManager_GetOnuImages checks that deviceMgr.GetOnuImages returns the ONU
+// image list that the mocked adapter replies with for a single device.
+func TestManager_GetOnuImages(t *testing.T) {
+	type args struct {
+		ctx context.Context
+		id  *common.ID
+	}
+
+	const singleDevice = "request-for-single-device"
+
+	ctx := context.Background()
+	dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+	// Mirrors the payload produced by newOnuImagesResponse.
+	wantImages := &voltha.OnuImages{
+		Items: []*voltha.OnuImage{{
+			Version:    version,
+			IsCommited: true,
+			IsActive:   true,
+			IsValid:    true,
+		}},
+	}
+
+	tests := []struct {
+		name    string
+		args    args
+		want    *voltha.OnuImages
+		wantErr bool
+	}{
+		{
+			name:    singleDevice,
+			args:    args{ctx: ctx, id: &common.ID{Id: agents[0].deviceID}},
+			want:    wantImages,
+			wantErr: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.name == singleDevice {
+				respCh := make(chan *kafka.RpcResponse, 10)
+				// A single RPC is expected; the target device id is not constrained.
+				mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+					"Get_onu_images",
+					gomock.Any(),
+					gomock.Any(),
+					true,
+					gomock.Any(), gomock.Any()).Return(respCh)
+
+				// Play the adapter: first the "sent" notification, then the reply.
+				go func() {
+					reply := newOnuImagesResponse(t)
+					for _, msg := range []*kafka.RpcResponse{
+						{MType: kafka.RpcSent, Err: nil, Reply: reply},
+						{MType: kafka.RpcReply, Err: nil, Reply: reply},
+					} {
+						respCh <- msg
+					}
+				}()
+			}
+
+			got, err := dat.deviceMgr.GetOnuImages(tt.args.ctx, tt.args.id)
+			if failed := err != nil; failed != tt.wantErr {
+				t.Errorf("GetOnuImages() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("GetOnuImages() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+// gotAllSuccess reports whether got and want describe the same set of device
+// image states. Order and duplicate entries are ignored: every state in got
+// must deep-equal some state in want, and every state in want must deep-equal
+// some state in got. A nil response only matches another nil response.
+//
+// Checking both directions matters: a one-way check would accept an empty or
+// partial response, which is exactly the failure these tests must catch.
+func gotAllSuccess(got, want *voltha.DeviceImageResponse) bool {
+	if got == nil || want == nil {
+		return got == want
+	}
+
+	return containsAllStates(want.DeviceImageStates, got.DeviceImageStates) &&
+		containsAllStates(got.DeviceImageStates, want.DeviceImageStates)
+}
+
+// containsAllStates reports whether every element of subset has a deep-equal
+// counterpart in superset. An empty subset is trivially contained.
+func containsAllStates(superset, subset []*voltha.DeviceImageState) bool {
+	for _, needle := range subset {
+		found := false
+		for _, candidate := range superset {
+			if reflect.DeepEqual(needle, candidate) {
+				found = true
+				break
+			}
+		}
+
+		if !found {
+			return false
+		}
+	}
+
+	return true
+}
+
+func newDeviceImagedRequest(agents []*Agent) *voltha.DeviceImageRequest {
+ imgReq := &voltha.DeviceImageRequest{
+ Version: version,
+ CommitOnSuccess: true,
+ }
+
+ for _, agent := range agents {
+ if agent != nil {
+ imgReq.DeviceId = append(imgReq.DeviceId, &common.ID{
+ Id: agent.deviceID,
+ })
+ }
+ }
+
+ return imgReq
+}
+
+func newDeviceImageDownloadRequest(agents []*Agent) *voltha.DeviceImageDownloadRequest {
+ imgDownReq := &voltha.DeviceImageDownloadRequest{
+ Image: &voltha.Image{
+ Version: version,
+ Url: url,
+ Vendor: vendor,
+ },
+ ActivateOnSuccess: true,
+ CommitOnSuccess: true,
+ }
+
+ for _, agent := range agents {
+ if agent != nil {
+ imgDownReq.DeviceId = append(imgDownReq.DeviceId, &common.ID{
+ Id: agent.deviceID,
+ })
+ }
+ }
+
+ return imgDownReq
+}
+
+func newImageResponse(agents []*Agent,
+ downloadState voltha.ImageState_ImageDownloadState,
+ imageSate voltha.ImageState_ImageActivationState,
+ reason voltha.ImageState_ImageFailureReason) *voltha.DeviceImageResponse {
+ response := &voltha.DeviceImageResponse{}
+
+ for _, agent := range agents {
+ response.DeviceImageStates = append(response.DeviceImageStates, &voltha.DeviceImageState{
+ DeviceId: agent.deviceID,
+ ImageState: &voltha.ImageState{
+ Version: version,
+ DownloadState: downloadState,
+ Reason: reason,
+ ImageState: imageSate,
+ },
+ })
+ }
+
+ return response
+}
+
+// newImageDownloadAdapterResponse returns, packed into an Any, a
+// DeviceImageResponse holding a single state for deviceID with the test image
+// version and the given download state, activation state and failure reason.
+// A marshalling error is reported on t through assert.Nil.
+func newImageDownloadAdapterResponse(t *testing.T,
+	deviceID string,
+	downloadState voltha.ImageState_ImageDownloadState,
+	imageSate voltha.ImageState_ImageActivationState,
+	reason voltha.ImageState_ImageFailureReason) *any.Any {
+	imgState := &voltha.ImageState{
+		Version:       version,
+		DownloadState: downloadState,
+		Reason:        reason,
+		ImageState:    imageSate,
+	}
+	payload := &voltha.DeviceImageResponse{
+		DeviceImageStates: []*voltha.DeviceImageState{
+			{DeviceId: deviceID, ImageState: imgState},
+		},
+	}
+
+	packed, err := ptypes.MarshalAny(payload)
+	assert.Nil(t, err)
+	return packed
+}
+
+// newImageStatusAdapterResponse returns, packed into an Any, a
+// DeviceImageResponse with one state per agent, each carrying the test image
+// version and the given download state, activation state and failure reason.
+// A marshalling error is reported on t through assert.Nil.
+func newImageStatusAdapterResponse(t *testing.T,
+	agents []*Agent,
+	downloadState voltha.ImageState_ImageDownloadState,
+	imageSate voltha.ImageState_ImageActivationState,
+	reason voltha.ImageState_ImageFailureReason) *any.Any {
+	// The payload is the same message newImageResponse builds for the expected side.
+	payload := newImageResponse(agents, downloadState, imageSate, reason)
+
+	packed, err := ptypes.MarshalAny(payload)
+	assert.Nil(t, err)
+	return packed
+}
+
+// newOnuImagesResponse returns, packed into an Any, an OnuImages message with a
+// single image of the test version flagged as committed, active and valid.
+// A marshalling error is reported on t through assert.Nil.
+func newOnuImagesResponse(t *testing.T) *any.Any {
+	image := &voltha.OnuImage{
+		Version:    version,
+		IsCommited: true,
+		IsActive:   true,
+		IsValid:    true,
+	}
+
+	packed, err := ptypes.MarshalAny(&voltha.OnuImages{Items: []*voltha.OnuImage{image}})
+	assert.Nil(t, err)
+	return packed
+}
diff --git a/rw_core/core/device/mock_kafka.go b/rw_core/core/device/mock_kafka.go
new file mode 100644
index 0000000..9a08e70
--- /dev/null
+++ b/rw_core/core/device/mock_kafka.go
@@ -0,0 +1,216 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: github.com/opencord/voltha-lib-go/v4/pkg/kafka (interfaces: InterContainerProxy)
+
+// Package device is a generated GoMock package.
+package device
+
+import (
+ context "context"
+ reflect "reflect"
+
+ gomock "github.com/golang/mock/gomock"
+ any "github.com/golang/protobuf/ptypes/any"
+ kafka "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
+)
+
+// MockInterContainerProxy is a mock of InterContainerProxy interface.
+// Every mocked method dispatches through ctrl; expectations are set via the
+// recorder returned by EXPECT.
+type MockInterContainerProxy struct {
+ ctrl *gomock.Controller
+ recorder *MockInterContainerProxyMockRecorder
+}
+
+// MockInterContainerProxyMockRecorder is the mock recorder for MockInterContainerProxy.
+// It holds a back-reference to the mock so recorded calls reach the mock's controller.
+type MockInterContainerProxyMockRecorder struct {
+ mock *MockInterContainerProxy
+}
+
+// NewMockInterContainerProxy creates a new mock instance.
+// The returned mock is bound to ctrl and has its recorder already wired up.
+func NewMockInterContainerProxy(ctrl *gomock.Controller) *MockInterContainerProxy {
+ mock := &MockInterContainerProxy{ctrl: ctrl}
+ mock.recorder = &MockInterContainerProxyMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+// It is the recorder created by NewMockInterContainerProxy.
+func (m *MockInterContainerProxy) EXPECT() *MockInterContainerProxyMockRecorder {
+ return m.recorder
+}
+
+// DeleteTopic mocks base method.
+// It hands arg0 and arg1 to the controller under the name "DeleteTopic" and
+// returns the first value the controller yields, as an error.
+func (m *MockInterContainerProxy) DeleteTopic(arg0 context.Context, arg1 kafka.Topic) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "DeleteTopic", arg0, arg1)
+ // The assertion's ok result is discarded: a value of another type yields nil.
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// DeleteTopic indicates an expected call of DeleteTopic.
+// arg0 and arg1 are recorded on the controller together with the reflected
+// type of the mock's DeleteTopic method; the resulting *gomock.Call is returned.
+func (mr *MockInterContainerProxyMockRecorder) DeleteTopic(arg0, arg1 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTopic", reflect.TypeOf((*MockInterContainerProxy)(nil).DeleteTopic), arg0, arg1)
+}
+
+// EnableLivenessChannel mocks base method.
+// It hands arg0 and arg1 to the controller under the name
+// "EnableLivenessChannel" and returns the first value the controller yields,
+// as a chan bool.
+func (m *MockInterContainerProxy) EnableLivenessChannel(arg0 context.Context, arg1 bool) chan bool {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "EnableLivenessChannel", arg0, arg1)
+ // The assertion's ok result is discarded: a value of another type yields a nil channel.
+ ret0, _ := ret[0].(chan bool)
+ return ret0
+}
+
+// EnableLivenessChannel indicates an expected call of EnableLivenessChannel.
+// arg0 and arg1 are recorded on the controller together with the reflected
+// type of the mock's EnableLivenessChannel method; the resulting *gomock.Call is returned.
+func (mr *MockInterContainerProxyMockRecorder) EnableLivenessChannel(arg0, arg1 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnableLivenessChannel", reflect.TypeOf((*MockInterContainerProxy)(nil).EnableLivenessChannel), arg0, arg1)
+}
+
+// GetDefaultTopic mocks base method.
+// It calls the controller under the name "GetDefaultTopic" with no arguments
+// and returns the first value the controller yields, as a *kafka.Topic.
+func (m *MockInterContainerProxy) GetDefaultTopic() *kafka.Topic {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "GetDefaultTopic")
+ // The assertion's ok result is discarded: a value of another type yields nil.
+ ret0, _ := ret[0].(*kafka.Topic)
+ return ret0
+}
+
+// GetDefaultTopic indicates an expected call of GetDefaultTopic.
+// The expectation is recorded on the controller together with the reflected
+// type of the mock's GetDefaultTopic method; the resulting *gomock.Call is returned.
+func (mr *MockInterContainerProxyMockRecorder) GetDefaultTopic() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDefaultTopic", reflect.TypeOf((*MockInterContainerProxy)(nil).GetDefaultTopic))
+}
+
+// InvokeAsyncRPC mocks base method.
+// It hands the fixed arguments plus every variadic KVArg to the controller
+// under the name "InvokeAsyncRPC" and returns the first value the controller
+// yields, as a chan *kafka.RpcResponse.
+func (m *MockInterContainerProxy) InvokeAsyncRPC(arg0 context.Context, arg1 string, arg2, arg3 *kafka.Topic, arg4 bool, arg5 string, arg6 ...*kafka.KVArg) chan *kafka.RpcResponse {
+ m.ctrl.T.Helper()
+ varargs := []interface{}{arg0, arg1, arg2, arg3, arg4, arg5}
+ // Flatten the variadic KVArgs so each one is passed as its own argument.
+ for _, a := range arg6 {
+ varargs = append(varargs, a)
+ }
+ ret := m.ctrl.Call(m, "InvokeAsyncRPC", varargs...)
+ // The assertion's ok result is discarded: a value of another type yields a nil channel.
+ ret0, _ := ret[0].(chan *kafka.RpcResponse)
+ return ret0
+}
+
+// InvokeAsyncRPC indicates an expected call of InvokeAsyncRPC.
+// The six fixed arguments followed by any arg6 values are recorded on the
+// controller together with the reflected type of the mock's InvokeAsyncRPC
+// method; the resulting *gomock.Call is returned.
+func (mr *MockInterContainerProxyMockRecorder) InvokeAsyncRPC(arg0, arg1, arg2, arg3, arg4, arg5 interface{}, arg6 ...interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ varargs := append([]interface{}{arg0, arg1, arg2, arg3, arg4, arg5}, arg6...)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsyncRPC", reflect.TypeOf((*MockInterContainerProxy)(nil).InvokeAsyncRPC), varargs...)
+}
+
+// InvokeRPC mocks base method.
+// It hands the fixed arguments plus every variadic KVArg to the controller
+// under the name "InvokeRPC" and returns the first two values the controller
+// yields, as a bool and an *any.Any.
+func (m *MockInterContainerProxy) InvokeRPC(arg0 context.Context, arg1 string, arg2, arg3 *kafka.Topic, arg4 bool, arg5 string, arg6 ...*kafka.KVArg) (bool, *any.Any) {
+ m.ctrl.T.Helper()
+ varargs := []interface{}{arg0, arg1, arg2, arg3, arg4, arg5}
+ // Flatten the variadic KVArgs so each one is passed as its own argument.
+ for _, a := range arg6 {
+ varargs = append(varargs, a)
+ }
+ ret := m.ctrl.Call(m, "InvokeRPC", varargs...)
+ // Both assertions discard ok: a value of another type yields false / nil.
+ ret0, _ := ret[0].(bool)
+ ret1, _ := ret[1].(*any.Any)
+ return ret0, ret1
+}
+
+// InvokeRPC indicates an expected call of InvokeRPC.
+// The six fixed arguments followed by any arg6 values are recorded on the
+// controller together with the reflected type of the mock's InvokeRPC method;
+// the resulting *gomock.Call is returned.
+func (mr *MockInterContainerProxyMockRecorder) InvokeRPC(arg0, arg1, arg2, arg3, arg4, arg5 interface{}, arg6 ...interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ varargs := append([]interface{}{arg0, arg1, arg2, arg3, arg4, arg5}, arg6...)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeRPC", reflect.TypeOf((*MockInterContainerProxy)(nil).InvokeRPC), varargs...)
+}
+
+// SendLiveness mocks base method.
+// It hands arg0 to the controller under the name "SendLiveness" and returns
+// the first value the controller yields, as an error.
+func (m *MockInterContainerProxy) SendLiveness(arg0 context.Context) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "SendLiveness", arg0)
+ // The assertion's ok result is discarded: a value of another type yields nil.
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// SendLiveness indicates an expected call of SendLiveness.
+// arg0 is recorded on the controller together with the reflected type of the
+// mock's SendLiveness method; the resulting *gomock.Call is returned.
+func (mr *MockInterContainerProxyMockRecorder) SendLiveness(arg0 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendLiveness", reflect.TypeOf((*MockInterContainerProxy)(nil).SendLiveness), arg0)
+}
+
+// Start mocks base method.
+// It hands arg0 to the controller under the name "Start" and returns the
+// first value the controller yields, as an error.
+func (m *MockInterContainerProxy) Start(arg0 context.Context) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "Start", arg0)
+ // The assertion's ok result is discarded: a value of another type yields nil.
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// Start indicates an expected call of Start.
+// arg0 is recorded on the controller together with the reflected type of the
+// mock's Start method; the resulting *gomock.Call is returned.
+func (mr *MockInterContainerProxyMockRecorder) Start(arg0 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockInterContainerProxy)(nil).Start), arg0)
+}
+
+// Stop mocks base method.
+// It hands arg0 to the controller under the name "Stop"; the base method has
+// no results, so whatever the controller returns is ignored.
+func (m *MockInterContainerProxy) Stop(arg0 context.Context) {
+ m.ctrl.T.Helper()
+ m.ctrl.Call(m, "Stop", arg0)
+}
+
+// Stop indicates an expected call of Stop.
+// arg0 is recorded on the controller together with the reflected type of the
+// mock's Stop method; the resulting *gomock.Call is returned.
+func (mr *MockInterContainerProxyMockRecorder) Stop(arg0 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockInterContainerProxy)(nil).Stop), arg0)
+}
+
+// SubscribeWithDefaultRequestHandler mocks base method.
+// It hands arg0, arg1 and arg2 to the controller under the name
+// "SubscribeWithDefaultRequestHandler" and returns the first value the
+// controller yields, as an error.
+func (m *MockInterContainerProxy) SubscribeWithDefaultRequestHandler(arg0 context.Context, arg1 kafka.Topic, arg2 int64) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "SubscribeWithDefaultRequestHandler", arg0, arg1, arg2)
+ // The assertion's ok result is discarded: a value of another type yields nil.
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// SubscribeWithDefaultRequestHandler indicates an expected call of SubscribeWithDefaultRequestHandler.
+// arg0, arg1 and arg2 are recorded on the controller together with the
+// reflected type of the mock's SubscribeWithDefaultRequestHandler method; the
+// resulting *gomock.Call is returned.
+func (mr *MockInterContainerProxyMockRecorder) SubscribeWithDefaultRequestHandler(arg0, arg1, arg2 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeWithDefaultRequestHandler", reflect.TypeOf((*MockInterContainerProxy)(nil).SubscribeWithDefaultRequestHandler), arg0, arg1, arg2)
+}
+
+// SubscribeWithRequestHandlerInterface mocks base method.
+// It hands arg0, arg1 and arg2 to the controller under the name
+// "SubscribeWithRequestHandlerInterface" and returns the first value the
+// controller yields, as an error.
+func (m *MockInterContainerProxy) SubscribeWithRequestHandlerInterface(arg0 context.Context, arg1 kafka.Topic, arg2 interface{}) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "SubscribeWithRequestHandlerInterface", arg0, arg1, arg2)
+ // The assertion's ok result is discarded: a value of another type yields nil.
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// SubscribeWithRequestHandlerInterface indicates an expected call of SubscribeWithRequestHandlerInterface.
+// arg0, arg1 and arg2 are recorded on the controller together with the
+// reflected type of the mock's SubscribeWithRequestHandlerInterface method;
+// the resulting *gomock.Call is returned.
+func (mr *MockInterContainerProxyMockRecorder) SubscribeWithRequestHandlerInterface(arg0, arg1, arg2 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeWithRequestHandlerInterface", reflect.TypeOf((*MockInterContainerProxy)(nil).SubscribeWithRequestHandlerInterface), arg0, arg1, arg2)
+}
+
+// UnSubscribeFromRequestHandler mocks base method.
+// It hands arg0 and arg1 to the controller under the name
+// "UnSubscribeFromRequestHandler" and returns the first value the controller
+// yields, as an error.
+func (m *MockInterContainerProxy) UnSubscribeFromRequestHandler(arg0 context.Context, arg1 kafka.Topic) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "UnSubscribeFromRequestHandler", arg0, arg1)
+ // The assertion's ok result is discarded: a value of another type yields nil.
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// UnSubscribeFromRequestHandler indicates an expected call of UnSubscribeFromRequestHandler.
+// arg0 and arg1 are recorded on the controller together with the reflected
+// type of the mock's UnSubscribeFromRequestHandler method; the resulting
+// *gomock.Call is returned.
+func (mr *MockInterContainerProxyMockRecorder) UnSubscribeFromRequestHandler(arg0, arg1 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnSubscribeFromRequestHandler", reflect.TypeOf((*MockInterContainerProxy)(nil).UnSubscribeFromRequestHandler), arg0, arg1)
+}
diff --git a/rw_core/core/device/remote/adapter_proxy.go b/rw_core/core/device/remote/adapter_proxy.go
index aba551e..1a187f8 100755
--- a/rw_core/core/device/remote/adapter_proxy.go
+++ b/rw_core/core/device/remote/adapter_proxy.go
@@ -19,6 +19,8 @@
import (
"context"
+ "github.com/opencord/voltha-protos/v4/go/common"
+
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-protos/v4/go/extension"
@@ -515,3 +517,88 @@
replyToTopic := ap.getCoreTopic()
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, request.TargetId, args...)
}
+
+// DownloadImageToOnuDevice invokes download image rpc
+func (ap *AdapterProxy) DownloadImageToOnuDevice(ctx context.Context, device *voltha.Device, downloadRequest *voltha.DeviceImageDownloadRequest) (chan *kafka.RpcResponse, error) {
+ logger.Debugw(ctx, "download-image-to-device", log.Fields{"device-id": device.Id, "image": downloadRequest.Image.Name})
+ rpc := "Download_onu_image"
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
+ if err != nil {
+ return nil, err
+ }
+ args := []*kafka.KVArg{
+ {Key: "deviceImageDownloadReq", Value: downloadRequest},
+ }
+ replyToTopic := ap.getCoreTopic()
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
+}
+
+// GetOnuImageStatus invokes the "Get_onu_image_status" RPC on the adapter topic
+// resolved for device, passing request as the "deviceImageReq" argument. It
+// returns the result of sendRPC, or the error from getAdapterTopic.
+func (ap *AdapterProxy) GetOnuImageStatus(ctx context.Context, device *voltha.Device, request *voltha.DeviceImageRequest) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "get-image-status", log.Fields{"device-id": device.Id})
+
+	adapterTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
+
+	payload := &kafka.KVArg{Key: "deviceImageReq", Value: request}
+	coreTopic := ap.getCoreTopic()
+	return ap.sendRPC(ctx, "Get_onu_image_status", adapterTopic, &coreTopic, true, device.Id, payload)
+}
+
+// ActivateOnuImage invokes the "Activate_onu_image" RPC on the adapter topic
+// resolved for device, passing request as the "deviceImageReq" argument. It
+// returns the result of sendRPC, or the error from getAdapterTopic.
+func (ap *AdapterProxy) ActivateOnuImage(ctx context.Context, device *voltha.Device, request *voltha.DeviceImageRequest) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "activate-onu-image", log.Fields{"device-id": device.Id})
+
+	adapterTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
+
+	payload := &kafka.KVArg{Key: "deviceImageReq", Value: request}
+	coreTopic := ap.getCoreTopic()
+	return ap.sendRPC(ctx, "Activate_onu_image", adapterTopic, &coreTopic, true, device.Id, payload)
+}
+
+// AbortImageUpgrade invokes the "Abort_onu_image_upgrade" RPC on the adapter
+// topic resolved for device, passing request as the "deviceImageReq" argument.
+// It returns the result of sendRPC, or the error from getAdapterTopic.
+func (ap *AdapterProxy) AbortImageUpgrade(ctx context.Context, device *voltha.Device, request *voltha.DeviceImageRequest) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "abort-image-upgrade", log.Fields{"device-id": device.Id})
+
+	adapterTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
+
+	payload := &kafka.KVArg{Key: "deviceImageReq", Value: request}
+	coreTopic := ap.getCoreTopic()
+	return ap.sendRPC(ctx, "Abort_onu_image_upgrade", adapterTopic, &coreTopic, true, device.Id, payload)
+}
+
+// CommitImage invokes the "Commit_onu_image" RPC on the adapter topic resolved
+// for device, passing request as the "deviceImageReq" argument. It returns the
+// result of sendRPC, or the error from getAdapterTopic.
+func (ap *AdapterProxy) CommitImage(ctx context.Context, device *voltha.Device, request *voltha.DeviceImageRequest) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "commit-image", log.Fields{"device-id": device.Id})
+
+	adapterTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
+
+	payload := &kafka.KVArg{Key: "deviceImageReq", Value: request}
+	coreTopic := ap.getCoreTopic()
+	return ap.sendRPC(ctx, "Commit_onu_image", adapterTopic, &coreTopic, true, device.Id, payload)
+}
+
+// GetOnuImages invokes the "Get_onu_images" RPC on the adapter topic resolved
+// for device, passing id as the "deviceId" argument. It returns the result of
+// sendRPC, or the error from getAdapterTopic.
+func (ap *AdapterProxy) GetOnuImages(ctx context.Context, device *voltha.Device, id *common.ID) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "get-onu-images", log.Fields{"device-id": device.Id})
+
+	adapterTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
+
+	payload := &kafka.KVArg{Key: "deviceId", Value: id}
+	coreTopic := ap.getCoreTopic()
+	return ap.sendRPC(ctx, "Get_onu_images", adapterTopic, &coreTopic, true, device.Id, payload)
+}