[VOL-4022] RW-Core Changes For ONU SW Upgrade
New Download/Activate/Retrieve APIs

Change-Id: I2d8a0ec7d8967fd76a261a108f743e75f84c98e9
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
index 9816927..3781540 100644
--- a/rw_core/core/device/agent_image.go
+++ b/rw_core/core/device/agent_image.go
@@ -18,6 +18,8 @@
 
 import (
 	"context"
+
+	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	"github.com/opencord/voltha-protos/v4/go/common"
 
 	"github.com/gogo/protobuf/proto"
@@ -387,3 +389,185 @@
 	}
 
 }
+
+func (agent *Agent) downloadImageToDevice(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+
+	logger.Debugw(ctx, "download-image-to-device", log.Fields{"device-id": agent.deviceID})
+	if agent.device.Root {
+		agent.requestQueue.RequestComplete()
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, is an OLT. Image update "+
+			"not supported by VOLTHA. Use Device Manager or other means", agent.deviceID)
+	}
+
+	cloned := agent.cloneDeviceWithoutLock()
+
+	// Send the request to the adapter
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	defer cancel()
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
+	ch, err := agent.adapterProxy.DownloadImageToOnuDevice(subCtx, cloned, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	return agent.getDeviceImageResponseFromAdapter(ctx, ch)
+}
+
+func (agent *Agent) getImageStatus(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+
+	cloned := agent.cloneDeviceWithoutLock()
+
+	// Send the request to the adapter
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	defer cancel()
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+	logger.Debugw(ctx, "get-image-status", log.Fields{"device-id": agent.deviceID})
+
+	ch, err := agent.adapterProxy.GetOnuImageStatus(subCtx, cloned, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	return agent.getDeviceImageResponseFromAdapter(subCtx, ch)
+}
+
+func (agent *Agent) activateImageOnDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+
+	cloned := agent.cloneDeviceWithoutLock()
+
+	// Send the request to the adapter
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	defer cancel()
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+	logger.Debugw(ctx, "activate-image-on-device", log.Fields{"device-id": agent.deviceID})
+
+	ch, err := agent.adapterProxy.ActivateOnuImage(subCtx, cloned, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	return agent.getDeviceImageResponseFromAdapter(subCtx, ch)
+}
+
+func (agent *Agent) abortImageUpgradeToDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+
+	cloned := agent.cloneDeviceWithoutLock()
+
+	// Send the request to the adapter
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	defer cancel()
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+	logger.Debugw(ctx, "abort-image-on-device", log.Fields{"device-id": agent.deviceID})
+
+	ch, err := agent.adapterProxy.AbortImageUpgrade(subCtx, cloned, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	return agent.getDeviceImageResponseFromAdapter(ctx, ch)
+}
+
+func (agent *Agent) commitImage(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+
+	cloned := agent.cloneDeviceWithoutLock()
+
+	// Send the request to the adapter
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	defer cancel()
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+	logger.Debugw(ctx, "commit-image-on-device", log.Fields{"device-id": agent.deviceID})
+
+	ch, err := agent.adapterProxy.CommitImage(subCtx, cloned, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	return agent.getDeviceImageResponseFromAdapter(ctx, ch)
+}
+
+func (agent *Agent) getOnuImages(ctx context.Context, id *common.ID) (*voltha.OnuImages, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+
+	cloned := agent.cloneDeviceWithoutLock()
+
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	defer cancel()
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+	logger.Debugw(ctx, "get-onu-images", log.Fields{"device-id": agent.deviceID})
+
+	// Send the request to the adapter
+	ch, err := agent.adapterProxy.GetOnuImages(subCtx, cloned, id)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	//wait for adapter response
+	select {
+	case rpcResponse, ok := <-ch:
+		if !ok {
+			return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+		} else if rpcResponse.Err != nil {
+			// return error
+			return nil, status.Errorf(codes.Internal, "%s", rpcResponse.Err.Error())
+		} else {
+			resp := &voltha.OnuImages{}
+			if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
+				return nil, status.Errorf(codes.Internal, "%s", err.Error())
+			}
+
+			return resp, nil
+		}
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}
+
+func (agent *Agent) getDeviceImageResponseFromAdapter(ctx context.Context, ch chan *kafka.RpcResponse) (*voltha.DeviceImageResponse, error) {
+	//wait for adapter response
+	select {
+	case rpcResponse, ok := <-ch:
+		if !ok {
+			return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+		} else if rpcResponse.Err != nil {
+			// return error
+			return nil, status.Errorf(codes.Internal, "%s", rpcResponse.Err.Error())
+		} else {
+			resp := &voltha.DeviceImageResponse{}
+			if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
+				return nil, status.Errorf(codes.Internal, "%s", err.Error())
+			}
+
+			if len(resp.DeviceImageStates) == 0 || resp.DeviceImageStates[0] == nil {
+				return nil, status.Errorf(codes.Internal, "invalid response from adapter")
+			}
+
+			return resp, nil
+		}
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 9d9f933..43bd2be 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -1643,3 +1643,321 @@
 	}
 	return agent.getTransientState(), nil
 }
+
+func (dMgr *Manager) DownloadImageToDevice(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error) {
+	if err := dMgr.validateImageDownloadRequest(request); err != nil {
+		return nil, err
+	}
+
+	ctx = utils.WithRPCMetadataContext(ctx, "DownloadImageToDevice")
+	respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
+
+	downloadReq := &voltha.DeviceImageDownloadRequest{
+		Image:             request.Image,
+		ActivateOnSuccess: request.ActivateOnSuccess,
+		CommitOnSuccess:   request.CommitOnSuccess,
+	}
+
+	for index, deviceID := range request.DeviceId {
+		//slice-out only single deviceID from the request
+		downloadReq.DeviceId = request.DeviceId[index : index+1]
+
+		go func(deviceID string, req *voltha.DeviceImageDownloadRequest, ch chan []*voltha.DeviceImageState) {
+			agent := dMgr.getDeviceAgent(ctx, deviceID)
+			if agent == nil {
+				logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
+				ch <- nil
+				return
+			}
+
+			resp, err := agent.downloadImageToDevice(ctx, req)
+			if err != nil {
+				logger.Errorw(ctx, "download-image-to-device-failed", log.Fields{"device-id": deviceID, "error": err})
+				ch <- nil
+				return
+			}
+
+			err = dMgr.validateDeviceImageResponse(resp)
+			if err != nil {
+				logger.Errorw(ctx, "download-image-to-device-failed", log.Fields{"device-id": deviceID, "error": err})
+				ch <- nil
+				return
+			}
+			ch <- resp.GetDeviceImageStates()
+		}(deviceID.GetId(), downloadReq, respCh)
+
+	}
+
+	return dMgr.waitForAllResponses(ctx, "download-image-to-device", respCh, len(request.GetDeviceId()))
+}
+
+func (dMgr *Manager) GetImageStatus(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	if err := dMgr.validateImageRequest(request); err != nil {
+		return nil, err
+	}
+
+	ctx = utils.WithRPCMetadataContext(ctx, "GetImageStatus")
+
+	imageStatusReq := &voltha.DeviceImageRequest{
+		Version:         request.Version,
+		CommitOnSuccess: request.CommitOnSuccess,
+	}
+
+	respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
+	for index, deviceID := range request.DeviceId {
+		//slice-out only single deviceID from the request
+		imageStatusReq.DeviceId = request.DeviceId[index : index+1]
+
+		go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
+			agent := dMgr.getDeviceAgent(ctx, deviceID)
+			if agent == nil {
+				logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
+				ch <- nil
+				return
+			}
+
+			resp, err := agent.getImageStatus(ctx, req)
+			if err != nil {
+				logger.Errorw(ctx, "get-image-status-failed", log.Fields{"device-id": deviceID, "error": err})
+				ch <- nil
+				return
+			}
+
+			err = dMgr.validateDeviceImageResponse(resp)
+			if err != nil {
+				logger.Errorw(ctx, "get-image-status-failed", log.Fields{"device-id": deviceID, "error": err})
+				ch <- nil
+				return
+			}
+			ch <- resp.GetDeviceImageStates()
+		}(deviceID.GetId(), imageStatusReq, respCh)
+
+	}
+
+	return dMgr.waitForAllResponses(ctx, "get-image-status", respCh, len(request.GetDeviceId()))
+}
+
+func (dMgr *Manager) AbortImageUpgradeToDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	if err := dMgr.validateImageRequest(request); err != nil {
+		return nil, err
+	}
+
+	ctx = utils.WithRPCMetadataContext(ctx, "AbortImageUpgradeToDevice")
+	respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
+
+	abortImageReq := &voltha.DeviceImageRequest{
+		Version:         request.Version,
+		CommitOnSuccess: request.CommitOnSuccess,
+	}
+
+	for index, deviceID := range request.DeviceId {
+		//slice-out only single deviceID from the request
+		abortImageReq.DeviceId = request.DeviceId[index : index+1]
+
+		go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
+			agent := dMgr.getDeviceAgent(ctx, deviceID)
+			if agent == nil {
+				logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
+				ch <- nil
+				return
+			}
+
+			resp, err := agent.abortImageUpgradeToDevice(ctx, req)
+			if err != nil {
+				logger.Errorw(ctx, "abort-image-upgrade-to-device-failed", log.Fields{"device-id": deviceID, "error": err})
+				ch <- nil
+				return
+			}
+
+			err = dMgr.validateDeviceImageResponse(resp)
+			if err != nil {
+				logger.Errorw(ctx, "abort-image-upgrade-to-device-failed", log.Fields{"device-id": deviceID, "error": err})
+				ch <- nil
+				return
+			}
+			ch <- resp.GetDeviceImageStates()
+		}(deviceID.GetId(), abortImageReq, respCh)
+
+	}
+
+	return dMgr.waitForAllResponses(ctx, "abort-image-upgrade-to-device", respCh, len(request.GetDeviceId()))
+}
+
+func (dMgr *Manager) GetOnuImages(ctx context.Context, id *common.ID) (*voltha.OnuImages, error) {
+	if id == nil || id.Id == "" {
+		return nil, status.Errorf(codes.InvalidArgument, "empty device id")
+	}
+
+	ctx = utils.WithRPCMetadataContext(ctx, "GetOnuImages")
+	log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
+	agent := dMgr.getDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
+	}
+
+	resp, err := agent.getOnuImages(ctx, id)
+	if err != nil {
+		return nil, err
+	}
+
+	logger.Debugw(ctx, "get-onu-images-result", log.Fields{"onu-image": resp.Items})
+
+	return resp, nil
+}
+
+func (dMgr *Manager) ActivateImage(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	if err := dMgr.validateImageRequest(request); err != nil {
+		return nil, err
+	}
+
+	ctx = utils.WithRPCMetadataContext(ctx, "ActivateImage")
+	respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
+
+	activateImageReq := &voltha.DeviceImageRequest{
+		Version:         request.Version,
+		CommitOnSuccess: request.CommitOnSuccess,
+	}
+
+	for index, deviceID := range request.DeviceId {
+		//slice-out only single deviceID from the request
+		activateImageReq.DeviceId = request.DeviceId[index : index+1]
+
+		go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
+			agent := dMgr.getDeviceAgent(ctx, deviceID)
+			if agent == nil {
+				logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
+				ch <- nil
+				return
+			}
+
+			resp, err := agent.activateImageOnDevice(ctx, req)
+			if err != nil {
+				logger.Errorw(ctx, "activate-image-failed", log.Fields{"device-id": deviceID, "error": err})
+				ch <- nil
+				return
+			}
+
+			err = dMgr.validateDeviceImageResponse(resp)
+			if err != nil {
+				logger.Errorw(ctx, "activate-image-failed", log.Fields{"device-id": deviceID, "error": err})
+				ch <- nil
+				return
+			}
+
+			ch <- resp.GetDeviceImageStates()
+		}(deviceID.GetId(), activateImageReq, respCh)
+
+	}
+
+	return dMgr.waitForAllResponses(ctx, "activate-image", respCh, len(request.GetDeviceId()))
+}
+
+func (dMgr *Manager) CommitImage(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	if err := dMgr.validateImageRequest(request); err != nil {
+		return nil, err
+	}
+
+	ctx = utils.WithRPCMetadataContext(ctx, "CommitImage")
+	respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
+
+	commitImageReq := &voltha.DeviceImageRequest{
+		Version:         request.Version,
+		CommitOnSuccess: request.CommitOnSuccess,
+	}
+
+	for index, deviceID := range request.DeviceId {
+		//slice-out only single deviceID from the request
+		commitImageReq.DeviceId = request.DeviceId[index : index+1]
+
+		go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
+			agent := dMgr.getDeviceAgent(ctx, deviceID)
+			if agent == nil {
+				logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
+				ch <- nil
+				return
+			}
+
+			resp, err := agent.commitImage(ctx, req)
+			if err != nil {
+				logger.Errorw(ctx, "commit-image-failed", log.Fields{"device-id": deviceID, "error": err})
+				ch <- nil
+				return
+			}
+
+			err = dMgr.validateDeviceImageResponse(resp)
+			if err != nil {
+				logger.Errorf(ctx, "commit-image-failed", log.Fields{"device-id": deviceID, "error": err})
+				ch <- nil
+				return
+			}
+			ch <- resp.GetDeviceImageStates()
+		}(deviceID.GetId(), commitImageReq, respCh)
+
+	}
+
+	return dMgr.waitForAllResponses(ctx, "commit-image", respCh, len(request.GetDeviceId()))
+}
+
+func (dMgr *Manager) validateImageDownloadRequest(request *voltha.DeviceImageDownloadRequest) error {
+	if request == nil || request.Image == nil || len(request.DeviceId) == 0 {
+		return status.Errorf(codes.InvalidArgument, "invalid argument")
+	}
+
+	for _, deviceID := range request.DeviceId {
+		if deviceID == nil {
+			return status.Errorf(codes.InvalidArgument, "id is nil")
+		}
+	}
+	return nil
+}
+
+func (dMgr *Manager) validateImageRequest(request *voltha.DeviceImageRequest) error {
+	if request == nil || len(request.DeviceId) == 0 || request.DeviceId[0] == nil {
+		return status.Errorf(codes.InvalidArgument, "invalid argument")
+	}
+
+	for _, deviceID := range request.DeviceId {
+		if deviceID == nil {
+			return status.Errorf(codes.InvalidArgument, "id is nil")
+		}
+	}
+
+	return nil
+}
+
+func (dMgr *Manager) validateDeviceImageResponse(response *voltha.DeviceImageResponse) error {
+	if response == nil || len(response.GetDeviceImageStates()) == 0 || response.GetDeviceImageStates()[0] == nil {
+		return status.Errorf(codes.Internal, "invalid-response-from-adapter")
+	}
+
+	return nil
+}
+
+func (dMgr *Manager) waitForAllResponses(ctx context.Context, opName string, respCh chan []*voltha.DeviceImageState, expectedResps int) (*voltha.DeviceImageResponse, error) {
+	response := &voltha.DeviceImageResponse{}
+	respCount := 0
+	for {
+		select {
+		case resp, ok := <-respCh:
+			if !ok {
+				logger.Errorw(ctx, opName+"-failed", log.Fields{"error": "channel-closed"})
+				return response, status.Errorf(codes.Aborted, "channel-closed")
+			}
+
+			if resp != nil {
+				logger.Debugw(ctx, opName+"-result", log.Fields{"image-state": resp[0].GetImageState(), "device-id": resp[0].GetDeviceId()})
+				response.DeviceImageStates = append(response.DeviceImageStates, resp...)
+			}
+
+			respCount++
+
+			//check whether all responses received, if so, sent back the collated response
+			if respCount == expectedResps {
+				return response, nil
+			}
+			continue
+		case <-ctx.Done():
+			return nil, status.Errorf(codes.Aborted, opName+"-failed-%s", ctx.Err())
+		}
+	}
+}
diff --git a/rw_core/core/device/manager_test.go b/rw_core/core/device/manager_test.go
new file mode 100644
index 0000000..40407f9
--- /dev/null
+++ b/rw_core/core/device/manager_test.go
@@ -0,0 +1,886 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+	"context"
+	"reflect"
+	"strconv"
+	"testing"
+
+	"github.com/golang/mock/gomock"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/any"
+	"github.com/opencord/voltha-go/db/model"
+	"github.com/opencord/voltha-go/rw_core/config"
+	"github.com/opencord/voltha-go/rw_core/core/adapter"
+	tst "github.com/opencord/voltha-go/rw_core/test"
+	"github.com/opencord/voltha-lib-go/v4/pkg/db"
+	"github.com/opencord/voltha-lib-go/v4/pkg/events"
+	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
+	"github.com/opencord/voltha-protos/v4/go/common"
+	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/phayes/freeport"
+	"github.com/stretchr/testify/assert"
+)
+
+const (
+	version = "dummy-version"
+	url     = "http://127.0.0.1:2222/dummy-image"
+	vendor  = "dummy"
+
+	numberOfTestDevices = 10
+)
+
+func initialiseTest(ctx context.Context, t *testing.T) (*DATest, *MockInterContainerProxy, []*Agent) {
+	dat := newDATest(ctx)
+
+	controller := gomock.NewController(t)
+	mockICProxy := NewMockInterContainerProxy(controller)
+
+	// Set expectations for the mock
+	mockICProxy.EXPECT().Start(gomock.Any()).AnyTimes().Return(nil)
+	mockICProxy.EXPECT().SubscribeWithDefaultRequestHandler(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
+
+	dat.startCoreWithCustomICProxy(ctx, mockICProxy)
+
+	var agents []*Agent
+	for i := 1; i <= numberOfTestDevices; i++ {
+		if agent := dat.createDeviceAgent(t); agent != nil {
+			agents = append(agents, agent)
+		}
+	}
+
+	assert.Equal(t, len(agents), numberOfTestDevices)
+
+	dat.oltAdapter, dat.onuAdapter = tst.CreateAndregisterAdapters(ctx,
+		t,
+		dat.kClient,
+		dat.coreInstanceID,
+		dat.oltAdapterName,
+		dat.onuAdapterName,
+		dat.adapterMgr)
+
+	return dat, mockICProxy, agents
+}
+
+func (dat *DATest) startCoreWithCustomICProxy(ctx context.Context, kmp kafka.InterContainerProxy) {
+	cfg := config.NewRWCoreFlags()
+	cfg.CoreTopic = "rw_core"
+	cfg.EventTopic = "voltha.events"
+	cfg.DefaultRequestTimeout = dat.defaultTimeout
+	cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
+	grpcPort, err := freeport.GetFreePort()
+	if err != nil {
+		logger.Fatal(ctx, "Cannot get a freeport for grpc")
+	}
+	cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
+	client := tst.SetupKVClient(ctx, cfg, dat.coreInstanceID)
+	backend := &db.Backend{
+		Client:                  client,
+		StoreType:               cfg.KVStoreType,
+		Address:                 cfg.KVStoreAddress,
+		Timeout:                 cfg.KVStoreTimeout,
+		LivenessChannelInterval: cfg.LiveProbeInterval / 2}
+
+	dat.kmp = kmp
+
+	endpointMgr := kafka.NewEndpointManager(backend)
+	proxy := model.NewDBPath(backend)
+	dat.adapterMgr = adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
+	eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
+	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy, cfg.VolthaStackID)
+	dat.adapterMgr.Start(context.Background())
+	if err = dat.kmp.Start(ctx); err != nil {
+		logger.Fatal(ctx, "Cannot start InterContainerProxy")
+	}
+
+	if err := dat.kmp.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: cfg.CoreTopic}, kafka.OffsetNewest); err != nil {
+		logger.Fatalf(ctx, "Cannot add default request handler: %s", err)
+	}
+
+}
+
+func TestManager_DownloadImageToDevice(t *testing.T) {
+	type args struct {
+		ctx     context.Context
+		request *voltha.DeviceImageDownloadRequest
+	}
+
+	ctx := context.Background()
+	dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+	tests := []struct {
+		name    string
+		args    args
+		want    *voltha.DeviceImageResponse
+		wantErr bool
+	}{
+		{
+			name: "request-for-single-device",
+			args: args{
+				ctx:     ctx,
+				request: newDeviceImageDownloadRequest(agents[:1]),
+			},
+			want:    newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
+			wantErr: false,
+		},
+		{
+			name: "request-for-multiple-devices",
+			args: args{
+				ctx:     ctx,
+				request: newDeviceImageDownloadRequest(agents),
+			},
+			want:    newImageResponse(agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
+			wantErr: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.name == "request-for-single-device" {
+				chnl := make(chan *kafka.RpcResponse, 10)
+				// Set expectation for the API invocation
+				mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+					"Download_onu_image",
+					gomock.Any(),
+					gomock.Any(),
+					true,
+					gomock.Any(), gomock.Any()).Return(chnl)
+				// Send the expected response to channel from a goroutine
+				go func() {
+					reply := newImageDownloadAdapterResponse(t, agents[0].deviceID, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
+
+					chnl <- &kafka.RpcResponse{
+						MType: kafka.RpcSent,
+						Err:   nil,
+						Reply: reply,
+					}
+
+					chnl <- &kafka.RpcResponse{
+						MType: kafka.RpcReply,
+						Err:   nil,
+						Reply: reply,
+					}
+				}()
+			} else if tt.name == "request-for-multiple-devices" {
+				// Map to store per device kafka response channel
+				kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
+				for _, id := range tt.args.request.DeviceId {
+					// Create a kafka response channel per device
+					chnl := make(chan *kafka.RpcResponse)
+
+					// Set expectation for the API invocation
+					mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+						"Download_onu_image",
+						gomock.Any(),
+						gomock.Any(),
+						true,
+						id.Id, gomock.Any()).Return(chnl)
+
+					kafkaRespChans[id.Id] = chnl
+				}
+
+				// Send the expected response to channel from a goroutine
+				go func() {
+					for _, agent := range agents {
+						reply := newImageDownloadAdapterResponse(t, agent.deviceID, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
+
+						kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+							MType: kafka.RpcSent,
+							Err:   nil,
+							Reply: reply,
+						}
+
+						kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+							MType: kafka.RpcReply,
+							Err:   nil,
+							Reply: reply,
+						}
+					}
+				}()
+			}
+
+			got, err := dat.deviceMgr.DownloadImageToDevice(tt.args.ctx, tt.args.request)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("DownloadImageToDevice() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+
+			if !gotAllSuccess(got, tt.want) {
+				t.Errorf("DownloadImageToDevice() got = %v, want = %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestManager_GetImageStatus(t *testing.T) {
+	type args struct {
+		ctx     context.Context
+		request *voltha.DeviceImageRequest
+	}
+
+	ctx := context.Background()
+	dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+	tests := []struct {
+		name    string
+		args    args
+		want    *voltha.DeviceImageResponse
+		wantErr bool
+	}{
+		{
+			name: "request-for-single-device",
+			args: args{
+				ctx:     ctx,
+				request: newDeviceImagedRequest(agents[:1]),
+			},
+			want:    newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
+			wantErr: false,
+		},
+		{
+			name: "request-for-multiple-devices",
+			args: args{
+				ctx:     ctx,
+				request: newDeviceImagedRequest(agents),
+			},
+			want:    newImageResponse(agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR),
+			wantErr: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.name == "request-for-single-device" {
+				chnl := make(chan *kafka.RpcResponse, 10)
+				// Set expectation for the API invocation
+				mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+					"Get_onu_image_status",
+					gomock.Any(),
+					gomock.Any(),
+					true,
+					gomock.Any(), gomock.Any()).Return(chnl)
+				// Send the expected response to channel from a goroutine
+				go func() {
+					reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
+
+					chnl <- &kafka.RpcResponse{
+						MType: kafka.RpcSent,
+						Err:   nil,
+						Reply: reply,
+					}
+
+					chnl <- &kafka.RpcResponse{
+						MType: kafka.RpcReply,
+						Err:   nil,
+						Reply: reply,
+					}
+				}()
+			} else if tt.name == "request-for-multiple-devices" {
+				// Map to store per device kafka response channel
+				kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
+				for _, id := range tt.args.request.DeviceId {
+					// Create a kafka response channel per device
+					chnl := make(chan *kafka.RpcResponse)
+
+					// Set expectation for the API invocation
+					mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+						"Get_onu_image_status",
+						gomock.Any(),
+						gomock.Any(),
+						true,
+						id.Id, gomock.Any()).Return(chnl)
+
+					kafkaRespChans[id.Id] = chnl
+				}
+
+				// Send the expected response to channel from a goroutine
+				go func() {
+					for _, agent := range agents {
+						reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_REQUESTED, voltha.ImageState_IMAGE_DOWNLOADING, voltha.ImageState_NO_ERROR)
+
+						kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+							MType: kafka.RpcSent,
+							Err:   nil,
+							Reply: reply,
+						}
+
+						kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+							MType: kafka.RpcReply,
+							Err:   nil,
+							Reply: reply,
+						}
+					}
+				}()
+			}
+
+			got, err := dat.deviceMgr.GetImageStatus(tt.args.ctx, tt.args.request)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("GetImageStatus() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+
+			if !gotAllSuccess(got, tt.want) {
+				t.Errorf("GetImageStatus() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestManager_AbortImageUpgradeToDevice(t *testing.T) {
+
+	type args struct {
+		ctx     context.Context
+		request *voltha.DeviceImageRequest
+	}
+
+	ctx := context.Background()
+	dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+	tests := []struct {
+		name    string
+		args    args
+		want    *voltha.DeviceImageResponse
+		wantErr bool
+	}{
+		{
+			name: "request-for-single-device",
+			args: args{
+				ctx:     ctx,
+				request: newDeviceImagedRequest(agents[:1]),
+			},
+			want:    newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR),
+			wantErr: false,
+		},
+		{
+			name: "request-for-multiple-devices",
+			args: args{
+				ctx:     ctx,
+				request: newDeviceImagedRequest(agents[:1]),
+			},
+			want:    newImageResponse(agents, voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR),
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.name == "request-for-single-device" {
+				chnl := make(chan *kafka.RpcResponse, 10)
+				// Set expectation for the API invocation
+				mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+					"Abort_onu_image_upgrade",
+					gomock.Any(),
+					gomock.Any(),
+					true,
+					gomock.Any(), gomock.Any()).Return(chnl)
+				// Send the expected response to channel from a goroutine
+				go func() {
+					reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR)
+
+					chnl <- &kafka.RpcResponse{
+						MType: kafka.RpcSent,
+						Err:   nil,
+						Reply: reply,
+					}
+
+					chnl <- &kafka.RpcResponse{
+						MType: kafka.RpcReply,
+						Err:   nil,
+						Reply: reply,
+					}
+				}()
+			} else if tt.name == "request-for-multiple-devices" {
+				// Map to store per device kafka response channel
+				kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
+				for _, id := range tt.args.request.DeviceId {
+					// Create a kafka response channel per device
+					chnl := make(chan *kafka.RpcResponse)
+
+					// Set expectation for the API invocation
+					mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+						"Abort_onu_image_upgrade",
+						gomock.Any(),
+						gomock.Any(),
+						true,
+						id.Id, gomock.Any()).Return(chnl)
+
+					kafkaRespChans[id.Id] = chnl
+				}
+
+				// Send the expected response to channel from a goroutine
+				go func() {
+					for _, agent := range agents {
+						reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_CANCELLED, voltha.ImageState_IMAGE_ACTIVATION_ABORTED, voltha.ImageState_NO_ERROR)
+
+						kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+							MType: kafka.RpcSent,
+							Err:   nil,
+							Reply: reply,
+						}
+
+						kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+							MType: kafka.RpcReply,
+							Err:   nil,
+							Reply: reply,
+						}
+					}
+				}()
+			}
+			got, err := dat.deviceMgr.AbortImageUpgradeToDevice(tt.args.ctx, tt.args.request)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("AbortImageUpgradeToDevice() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+
+			if !gotAllSuccess(got, tt.want) {
+				t.Errorf("AbortImageUpgradeToDevice() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestManager_ActivateImage(t *testing.T) {
+	type args struct {
+		ctx     context.Context
+		request *voltha.DeviceImageRequest
+	}
+
+	ctx := context.Background()
+	dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+	tests := []struct {
+		name    string
+		args    args
+		want    *voltha.DeviceImageResponse
+		wantErr bool
+	}{
+		{
+			name: "request-for-single-device",
+			args: args{
+				ctx:     ctx,
+				request: newDeviceImagedRequest(agents[:1]),
+			},
+			want:    newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR),
+			wantErr: false,
+		},
+		{
+			name: "request-for-multiple-devices",
+			args: args{
+				ctx:     ctx,
+				request: newDeviceImagedRequest(agents),
+			},
+			want:    newImageResponse(agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR),
+			wantErr: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.name == "request-for-single-device" {
+				chnl := make(chan *kafka.RpcResponse, 10)
+				// Set expectation for the API invocation
+				mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+					"Activate_onu_image",
+					gomock.Any(),
+					gomock.Any(),
+					true,
+					gomock.Any(), gomock.Any()).Return(chnl)
+				// Send the expected response to channel from a goroutine
+				go func() {
+					reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR)
+
+					chnl <- &kafka.RpcResponse{
+						MType: kafka.RpcSent,
+						Err:   nil,
+						Reply: reply,
+					}
+
+					chnl <- &kafka.RpcResponse{
+						MType: kafka.RpcReply,
+						Err:   nil,
+						Reply: reply,
+					}
+				}()
+			} else if tt.name == "request-for-multiple-devices" {
+				// Map to store per device kafka response channel
+				kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
+				for _, id := range tt.args.request.DeviceId {
+					// Create a kafka response channel per device
+					chnl := make(chan *kafka.RpcResponse)
+
+					// Set expectation for the API invocation
+					mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+						"Activate_onu_image",
+						gomock.Any(),
+						gomock.Any(),
+						true,
+						id.Id, gomock.Any()).Return(chnl)
+
+					kafkaRespChans[id.Id] = chnl
+				}
+
+				// Send the expected response to channel from a goroutine
+				go func() {
+					for _, agent := range agents {
+						reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_ACTIVATING, voltha.ImageState_NO_ERROR)
+
+						kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+							MType: kafka.RpcSent,
+							Err:   nil,
+							Reply: reply,
+						}
+
+						kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+							MType: kafka.RpcReply,
+							Err:   nil,
+							Reply: reply,
+						}
+					}
+				}()
+			}
+			got, err := dat.deviceMgr.ActivateImage(tt.args.ctx, tt.args.request)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("ActivateImage() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !gotAllSuccess(got, tt.want) {
+				t.Errorf("ActivateImage() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestManager_CommitImage(t *testing.T) {
+	type args struct {
+		ctx     context.Context
+		request *voltha.DeviceImageRequest
+	}
+
+	ctx := context.Background()
+	dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+	tests := []struct {
+		name    string
+		args    args
+		want    *voltha.DeviceImageResponse
+		wantErr bool
+	}{
+		{
+			name: "request-for-single-device",
+			args: args{
+				ctx:     ctx,
+				request: newDeviceImagedRequest(agents[:1]),
+			},
+			want:    newImageResponse(agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR),
+			wantErr: false,
+		},
+		{
+			name: "request-for-multiple-devices",
+			args: args{
+				ctx:     ctx,
+				request: newDeviceImagedRequest(agents),
+			},
+			want:    newImageResponse(agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR),
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.name == "request-for-single-device" {
+				chnl := make(chan *kafka.RpcResponse, 10)
+				// Set expectation for the API invocation
+				mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+					"Commit_onu_image",
+					gomock.Any(),
+					gomock.Any(),
+					true,
+					gomock.Any(), gomock.Any()).Return(chnl)
+				// Send the expected response to channel from a goroutine
+				go func() {
+					reply := newImageStatusAdapterResponse(t, agents[:1], voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR)
+
+					chnl <- &kafka.RpcResponse{
+						MType: kafka.RpcSent,
+						Err:   nil,
+						Reply: reply,
+					}
+
+					chnl <- &kafka.RpcResponse{
+						MType: kafka.RpcReply,
+						Err:   nil,
+						Reply: reply,
+					}
+				}()
+			} else if tt.name == "request-for-multiple-devices" {
+				// Map to store per device kafka response channel
+				kafkaRespChans := make(map[string]chan *kafka.RpcResponse)
+				for _, id := range tt.args.request.DeviceId {
+					// Create a kafka response channel per device
+					chnl := make(chan *kafka.RpcResponse)
+
+					// Set expectation for the API invocation
+					mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+						"Commit_onu_image",
+						gomock.Any(),
+						gomock.Any(),
+						true,
+						id.Id, gomock.Any()).Return(chnl)
+
+					kafkaRespChans[id.Id] = chnl
+				}
+
+				// Send the expected response to channel from a goroutine
+				go func() {
+					for _, agent := range agents {
+						reply := newImageStatusAdapterResponse(t, agents, voltha.ImageState_DOWNLOAD_SUCCEEDED, voltha.ImageState_IMAGE_COMMITTING, voltha.ImageState_NO_ERROR)
+
+						kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+							MType: kafka.RpcSent,
+							Err:   nil,
+							Reply: reply,
+						}
+
+						kafkaRespChans[agent.deviceID] <- &kafka.RpcResponse{
+							MType: kafka.RpcReply,
+							Err:   nil,
+							Reply: reply,
+						}
+					}
+				}()
+			}
+			got, err := dat.deviceMgr.CommitImage(tt.args.ctx, tt.args.request)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("CommitImage() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !gotAllSuccess(got, tt.want) {
+				t.Errorf("CommitImage() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestManager_GetOnuImages(t *testing.T) {
+	type args struct {
+		ctx context.Context
+		id  *common.ID
+	}
+
+	ctx := context.Background()
+	dat, mockICProxy, agents := initialiseTest(ctx, t)
+
+	tests := []struct {
+		name    string
+		args    args
+		want    *voltha.OnuImages
+		wantErr bool
+	}{
+		{
+			name: "request-for-single-device",
+			args: args{
+				ctx: ctx,
+				id: &common.ID{
+					Id: agents[0].deviceID,
+				},
+			},
+			want: &voltha.OnuImages{
+				Items: []*voltha.OnuImage{{
+					Version:    version,
+					IsCommited: true,
+					IsActive:   true,
+					IsValid:    true,
+				}},
+			},
+			wantErr: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.name == "request-for-single-device" {
+				chnl := make(chan *kafka.RpcResponse, 10)
+				// Set expectation for the API invocation
+				mockICProxy.EXPECT().InvokeAsyncRPC(gomock.Any(),
+					"Get_onu_images",
+					gomock.Any(),
+					gomock.Any(),
+					true,
+					gomock.Any(), gomock.Any()).Return(chnl)
+				// Send the expected response to channel from a goroutine
+				go func() {
+					reply := newOnuImagesResponse(t)
+					chnl <- &kafka.RpcResponse{
+						MType: kafka.RpcSent,
+						Err:   nil,
+						Reply: reply,
+					}
+
+					chnl <- &kafka.RpcResponse{
+						MType: kafka.RpcReply,
+						Err:   nil,
+						Reply: reply,
+					}
+				}()
+			}
+
+			got, err := dat.deviceMgr.GetOnuImages(tt.args.ctx, tt.args.id)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("GetOnuImages() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("GetOnuImages() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+// verify that we got all the wanted response (order not important)
+func gotAllSuccess(got, want *voltha.DeviceImageResponse) bool {
+	for _, imagestateGot := range got.DeviceImageStates {
+		found := false
+		for _, imageStateWant := range want.DeviceImageStates {
+			if reflect.DeepEqual(imagestateGot, imageStateWant) {
+				found = true
+			}
+		}
+
+		if !found {
+			return false
+		}
+	}
+
+	return true
+}
+
+func newDeviceImagedRequest(agents []*Agent) *voltha.DeviceImageRequest {
+	imgReq := &voltha.DeviceImageRequest{
+		Version:         version,
+		CommitOnSuccess: true,
+	}
+
+	for _, agent := range agents {
+		if agent != nil {
+			imgReq.DeviceId = append(imgReq.DeviceId, &common.ID{
+				Id: agent.deviceID,
+			})
+		}
+	}
+
+	return imgReq
+}
+
+func newDeviceImageDownloadRequest(agents []*Agent) *voltha.DeviceImageDownloadRequest {
+	imgDownReq := &voltha.DeviceImageDownloadRequest{
+		Image: &voltha.Image{
+			Version: version,
+			Url:     url,
+			Vendor:  vendor,
+		},
+		ActivateOnSuccess: true,
+		CommitOnSuccess:   true,
+	}
+
+	for _, agent := range agents {
+		if agent != nil {
+			imgDownReq.DeviceId = append(imgDownReq.DeviceId, &common.ID{
+				Id: agent.deviceID,
+			})
+		}
+	}
+
+	return imgDownReq
+}
+
+func newImageResponse(agents []*Agent,
+	downloadState voltha.ImageState_ImageDownloadState,
+	imageSate voltha.ImageState_ImageActivationState,
+	reason voltha.ImageState_ImageFailureReason) *voltha.DeviceImageResponse {
+	response := &voltha.DeviceImageResponse{}
+
+	for _, agent := range agents {
+		response.DeviceImageStates = append(response.DeviceImageStates, &voltha.DeviceImageState{
+			DeviceId: agent.deviceID,
+			ImageState: &voltha.ImageState{
+				Version:       version,
+				DownloadState: downloadState,
+				Reason:        reason,
+				ImageState:    imageSate,
+			},
+		})
+	}
+
+	return response
+}
+
+func newImageDownloadAdapterResponse(t *testing.T,
+	deviceID string,
+	downloadState voltha.ImageState_ImageDownloadState,
+	imageSate voltha.ImageState_ImageActivationState,
+	reason voltha.ImageState_ImageFailureReason) *any.Any {
+	reply, err := ptypes.MarshalAny(&voltha.DeviceImageResponse{
+		DeviceImageStates: []*voltha.DeviceImageState{{
+			DeviceId: deviceID,
+			ImageState: &voltha.ImageState{
+				Version:       version,
+				DownloadState: downloadState,
+				Reason:        reason,
+				ImageState:    imageSate,
+			},
+		}},
+	})
+	assert.Nil(t, err)
+	return reply
+}
+
+func newImageStatusAdapterResponse(t *testing.T,
+	agents []*Agent,
+	downloadState voltha.ImageState_ImageDownloadState,
+	imageSate voltha.ImageState_ImageActivationState,
+	reason voltha.ImageState_ImageFailureReason) *any.Any {
+	imgResponse := &voltha.DeviceImageResponse{}
+	for _, agent := range agents {
+		imgResponse.DeviceImageStates = append(imgResponse.DeviceImageStates, &voltha.DeviceImageState{
+			DeviceId: agent.deviceID,
+			ImageState: &voltha.ImageState{
+				Version:       version,
+				DownloadState: downloadState,
+				Reason:        reason,
+				ImageState:    imageSate,
+			},
+		})
+	}
+
+	reply, err := ptypes.MarshalAny(imgResponse)
+	assert.Nil(t, err)
+	return reply
+}
+
+func newOnuImagesResponse(t *testing.T) *any.Any {
+	onuImages := &voltha.OnuImages{
+		Items: []*voltha.OnuImage{{
+			Version:    version,
+			IsCommited: true,
+			IsActive:   true,
+			IsValid:    true,
+		}},
+	}
+
+	reply, err := ptypes.MarshalAny(onuImages)
+	assert.Nil(t, err)
+	return reply
+}
diff --git a/rw_core/core/device/mock_kafka.go b/rw_core/core/device/mock_kafka.go
new file mode 100644
index 0000000..9a08e70
--- /dev/null
+++ b/rw_core/core/device/mock_kafka.go
@@ -0,0 +1,216 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: github.com/opencord/voltha-lib-go/v4/pkg/kafka (interfaces: InterContainerProxy)
+
+// Package device is a generated GoMock package.
+package device
+
+import (
+	context "context"
+	reflect "reflect"
+
+	gomock "github.com/golang/mock/gomock"
+	any "github.com/golang/protobuf/ptypes/any"
+	kafka "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
+)
+
+// MockInterContainerProxy is a mock of InterContainerProxy interface.
+type MockInterContainerProxy struct {
+	ctrl     *gomock.Controller
+	recorder *MockInterContainerProxyMockRecorder
+}
+
+// MockInterContainerProxyMockRecorder is the mock recorder for MockInterContainerProxy.
+type MockInterContainerProxyMockRecorder struct {
+	mock *MockInterContainerProxy
+}
+
+// NewMockInterContainerProxy creates a new mock instance.
+func NewMockInterContainerProxy(ctrl *gomock.Controller) *MockInterContainerProxy {
+	mock := &MockInterContainerProxy{ctrl: ctrl}
+	mock.recorder = &MockInterContainerProxyMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockInterContainerProxy) EXPECT() *MockInterContainerProxyMockRecorder {
+	return m.recorder
+}
+
+// DeleteTopic mocks base method.
+func (m *MockInterContainerProxy) DeleteTopic(arg0 context.Context, arg1 kafka.Topic) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "DeleteTopic", arg0, arg1)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// DeleteTopic indicates an expected call of DeleteTopic.
+func (mr *MockInterContainerProxyMockRecorder) DeleteTopic(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTopic", reflect.TypeOf((*MockInterContainerProxy)(nil).DeleteTopic), arg0, arg1)
+}
+
+// EnableLivenessChannel mocks base method.
+func (m *MockInterContainerProxy) EnableLivenessChannel(arg0 context.Context, arg1 bool) chan bool {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "EnableLivenessChannel", arg0, arg1)
+	ret0, _ := ret[0].(chan bool)
+	return ret0
+}
+
+// EnableLivenessChannel indicates an expected call of EnableLivenessChannel.
+func (mr *MockInterContainerProxyMockRecorder) EnableLivenessChannel(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnableLivenessChannel", reflect.TypeOf((*MockInterContainerProxy)(nil).EnableLivenessChannel), arg0, arg1)
+}
+
+// GetDefaultTopic mocks base method.
+func (m *MockInterContainerProxy) GetDefaultTopic() *kafka.Topic {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "GetDefaultTopic")
+	ret0, _ := ret[0].(*kafka.Topic)
+	return ret0
+}
+
+// GetDefaultTopic indicates an expected call of GetDefaultTopic.
+func (mr *MockInterContainerProxyMockRecorder) GetDefaultTopic() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDefaultTopic", reflect.TypeOf((*MockInterContainerProxy)(nil).GetDefaultTopic))
+}
+
+// InvokeAsyncRPC mocks base method.
+func (m *MockInterContainerProxy) InvokeAsyncRPC(arg0 context.Context, arg1 string, arg2, arg3 *kafka.Topic, arg4 bool, arg5 string, arg6 ...*kafka.KVArg) chan *kafka.RpcResponse {
+	m.ctrl.T.Helper()
+	varargs := []interface{}{arg0, arg1, arg2, arg3, arg4, arg5}
+	for _, a := range arg6 {
+		varargs = append(varargs, a)
+	}
+	ret := m.ctrl.Call(m, "InvokeAsyncRPC", varargs...)
+	ret0, _ := ret[0].(chan *kafka.RpcResponse)
+	return ret0
+}
+
+// InvokeAsyncRPC indicates an expected call of InvokeAsyncRPC.
+func (mr *MockInterContainerProxyMockRecorder) InvokeAsyncRPC(arg0, arg1, arg2, arg3, arg4, arg5 interface{}, arg6 ...interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	varargs := append([]interface{}{arg0, arg1, arg2, arg3, arg4, arg5}, arg6...)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsyncRPC", reflect.TypeOf((*MockInterContainerProxy)(nil).InvokeAsyncRPC), varargs...)
+}
+
+// InvokeRPC mocks base method.
+func (m *MockInterContainerProxy) InvokeRPC(arg0 context.Context, arg1 string, arg2, arg3 *kafka.Topic, arg4 bool, arg5 string, arg6 ...*kafka.KVArg) (bool, *any.Any) {
+	m.ctrl.T.Helper()
+	varargs := []interface{}{arg0, arg1, arg2, arg3, arg4, arg5}
+	for _, a := range arg6 {
+		varargs = append(varargs, a)
+	}
+	ret := m.ctrl.Call(m, "InvokeRPC", varargs...)
+	ret0, _ := ret[0].(bool)
+	ret1, _ := ret[1].(*any.Any)
+	return ret0, ret1
+}
+
+// InvokeRPC indicates an expected call of InvokeRPC.
+func (mr *MockInterContainerProxyMockRecorder) InvokeRPC(arg0, arg1, arg2, arg3, arg4, arg5 interface{}, arg6 ...interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	varargs := append([]interface{}{arg0, arg1, arg2, arg3, arg4, arg5}, arg6...)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeRPC", reflect.TypeOf((*MockInterContainerProxy)(nil).InvokeRPC), varargs...)
+}
+
+// SendLiveness mocks base method.
+func (m *MockInterContainerProxy) SendLiveness(arg0 context.Context) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "SendLiveness", arg0)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// SendLiveness indicates an expected call of SendLiveness.
+func (mr *MockInterContainerProxyMockRecorder) SendLiveness(arg0 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendLiveness", reflect.TypeOf((*MockInterContainerProxy)(nil).SendLiveness), arg0)
+}
+
+// Start mocks base method.
+func (m *MockInterContainerProxy) Start(arg0 context.Context) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Start", arg0)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// Start indicates an expected call of Start.
+func (mr *MockInterContainerProxyMockRecorder) Start(arg0 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockInterContainerProxy)(nil).Start), arg0)
+}
+
+// Stop mocks base method.
+func (m *MockInterContainerProxy) Stop(arg0 context.Context) {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "Stop", arg0)
+}
+
+// Stop indicates an expected call of Stop.
+func (mr *MockInterContainerProxyMockRecorder) Stop(arg0 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockInterContainerProxy)(nil).Stop), arg0)
+}
+
+// SubscribeWithDefaultRequestHandler mocks base method.
+func (m *MockInterContainerProxy) SubscribeWithDefaultRequestHandler(arg0 context.Context, arg1 kafka.Topic, arg2 int64) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "SubscribeWithDefaultRequestHandler", arg0, arg1, arg2)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// SubscribeWithDefaultRequestHandler indicates an expected call of SubscribeWithDefaultRequestHandler.
+func (mr *MockInterContainerProxyMockRecorder) SubscribeWithDefaultRequestHandler(arg0, arg1, arg2 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeWithDefaultRequestHandler", reflect.TypeOf((*MockInterContainerProxy)(nil).SubscribeWithDefaultRequestHandler), arg0, arg1, arg2)
+}
+
+// SubscribeWithRequestHandlerInterface mocks base method.
+func (m *MockInterContainerProxy) SubscribeWithRequestHandlerInterface(arg0 context.Context, arg1 kafka.Topic, arg2 interface{}) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "SubscribeWithRequestHandlerInterface", arg0, arg1, arg2)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// SubscribeWithRequestHandlerInterface indicates an expected call of SubscribeWithRequestHandlerInterface.
+func (mr *MockInterContainerProxyMockRecorder) SubscribeWithRequestHandlerInterface(arg0, arg1, arg2 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeWithRequestHandlerInterface", reflect.TypeOf((*MockInterContainerProxy)(nil).SubscribeWithRequestHandlerInterface), arg0, arg1, arg2)
+}
+
+// UnSubscribeFromRequestHandler mocks base method.
+func (m *MockInterContainerProxy) UnSubscribeFromRequestHandler(arg0 context.Context, arg1 kafka.Topic) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "UnSubscribeFromRequestHandler", arg0, arg1)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// UnSubscribeFromRequestHandler indicates an expected call of UnSubscribeFromRequestHandler.
+func (mr *MockInterContainerProxyMockRecorder) UnSubscribeFromRequestHandler(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnSubscribeFromRequestHandler", reflect.TypeOf((*MockInterContainerProxy)(nil).UnSubscribeFromRequestHandler), arg0, arg1)
+}
diff --git a/rw_core/core/device/remote/adapter_proxy.go b/rw_core/core/device/remote/adapter_proxy.go
index aba551e..1a187f8 100755
--- a/rw_core/core/device/remote/adapter_proxy.go
+++ b/rw_core/core/device/remote/adapter_proxy.go
@@ -19,6 +19,8 @@
 import (
 	"context"
 
+	"github.com/opencord/voltha-protos/v4/go/common"
+
 	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
 	"github.com/opencord/voltha-protos/v4/go/extension"
@@ -515,3 +517,88 @@
 	replyToTopic := ap.getCoreTopic()
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, request.TargetId, args...)
 }
+
+// DownloadImageToOnuDevice invokes download image rpc
+func (ap *AdapterProxy) DownloadImageToOnuDevice(ctx context.Context, device *voltha.Device, downloadRequest *voltha.DeviceImageDownloadRequest) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "download-image-to-device", log.Fields{"device-id": device.Id, "image": downloadRequest.Image.Name})
+	rpc := "Download_onu_image"
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
+	args := []*kafka.KVArg{
+		{Key: "deviceImageDownloadReq", Value: downloadRequest},
+	}
+	replyToTopic := ap.getCoreTopic()
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
+}
+
+func (ap *AdapterProxy) GetOnuImageStatus(ctx context.Context, device *voltha.Device, request *voltha.DeviceImageRequest) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "get-image-status", log.Fields{"device-id": device.Id})
+	rpc := "Get_onu_image_status"
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
+	args := []*kafka.KVArg{
+		{Key: "deviceImageReq", Value: request},
+	}
+	replyToTopic := ap.getCoreTopic()
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
+}
+
+func (ap *AdapterProxy) ActivateOnuImage(ctx context.Context, device *voltha.Device, request *voltha.DeviceImageRequest) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "activate-onu-image", log.Fields{"device-id": device.Id})
+	rpc := "Activate_onu_image"
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
+	args := []*kafka.KVArg{
+		{Key: "deviceImageReq", Value: request},
+	}
+	replyToTopic := ap.getCoreTopic()
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
+}
+
+func (ap *AdapterProxy) AbortImageUpgrade(ctx context.Context, device *voltha.Device, request *voltha.DeviceImageRequest) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "abort-image-upgrade", log.Fields{"device-id": device.Id})
+	rpc := "Abort_onu_image_upgrade"
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
+	args := []*kafka.KVArg{
+		{Key: "deviceImageReq", Value: request},
+	}
+	replyToTopic := ap.getCoreTopic()
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
+}
+
+func (ap *AdapterProxy) CommitImage(ctx context.Context, device *voltha.Device, request *voltha.DeviceImageRequest) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "commit-image", log.Fields{"device-id": device.Id})
+	rpc := "Commit_onu_image"
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
+	args := []*kafka.KVArg{
+		{Key: "deviceImageReq", Value: request},
+	}
+	replyToTopic := ap.getCoreTopic()
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
+}
+
+func (ap *AdapterProxy) GetOnuImages(ctx context.Context, device *voltha.Device, id *common.ID) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "get-onu-images", log.Fields{"device-id": device.Id})
+	rpc := "Get_onu_images"
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
+	args := []*kafka.KVArg{
+		{Key: "deviceId", Value: id},
+	}
+	replyToTopic := ap.getCoreTopic()
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
+}
diff --git a/rw_core/mocks/adapter.go b/rw_core/mocks/adapter.go
index 79107c1..a8117fe 100644
--- a/rw_core/mocks/adapter.go
+++ b/rw_core/mocks/adapter.go
@@ -300,3 +300,33 @@
 func (ta *Adapter) SetDeleteAction(failDeleteDevice bool) {
 	ta.failDeleteDevice = failDeleteDevice
 }
+
+// Download_onu_image -
+func (ta *Adapter) Download_onu_image(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	return nil, nil
+}
+
+// Get_onu_image_status -
+func (ta *Adapter) Get_onu_image_status(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	return nil, nil
+}
+
+// Abort_onu_image_upgrade -
+func (ta *Adapter) Abort_onu_image_upgrade(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	return nil, nil
+}
+
+// Get_onu_images -
+func (ta *Adapter) Get_onu_images(ctx context.Context, deviceID string) (*voltha.OnuImages, error) { //nolint
+	return nil, nil
+}
+
+// Activate_onu_image -
+func (ta *Adapter) Activate_onu_image(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	return nil, nil
+}
+
+// Commit_onu_image -
+func (ta *Adapter) Commit_onu_image(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	return nil, nil
+}
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index e13e774..a07d073 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -295,3 +295,33 @@
 	_ = valueflag
 	return nil, errors.New("get-ext-value-not-implemented")
 }
+
+func (oltA *OLTAdapter) Download_onu_image(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	_ = request
+	return nil, errors.New("download-onu-image-not-implemented")
+}
+
+func (oltA *OLTAdapter) Get_onu_image_status(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	_ = in
+	return nil, errors.New("get-onu-image-not-implemented")
+}
+
+func (oltA *OLTAdapter) Abort_onu_image_upgrade(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	_ = in
+	return nil, errors.New("abort-onu-image-upgrade-not-implemented")
+}
+
+func (oltA *OLTAdapter) Get_onu_images(ctx context.Context, deviceID string) (*voltha.OnuImages, error) { //nolint
+	_ = deviceID
+	return nil, errors.New("get-onu-images-not-implemented")
+}
+
+func (oltA *OLTAdapter) Activate_onu_image(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	_ = in
+	return nil, errors.New("activate-onu-image-not-implemented")
+}
+
+func (oltA *OLTAdapter) Commit_onu_image(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	_ = in
+	return nil, errors.New("commit-onu-image-not-implemented")
+}
diff --git a/rw_core/mocks/adapter_onu.go b/rw_core/mocks/adapter_onu.go
index fc24a3b..b1b004b 100644
--- a/rw_core/mocks/adapter_onu.go
+++ b/rw_core/mocks/adapter_onu.go
@@ -199,3 +199,34 @@
 	_ = valueflag
 	return nil, errors.New("get-ext-value-not-implemented")
 }
+
+func (onuA *ONUAdapter) Download_onu_image(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	logger.Infof(ctx, "Download_onu_image")
+	_ = request
+	return nil, errors.New("download-onu-image-not-implemented")
+}
+
+func (onuA *ONUAdapter) Get_onu_image_status(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	_ = in
+	return nil, errors.New("get-onu-image-not-implemented")
+}
+
+func (onuA *ONUAdapter) Abort_onu_image_upgrade(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	_ = in
+	return nil, errors.New("abort-onu-image-upgrade-not-implemented")
+}
+
+func (onuA *ONUAdapter) Get_onu_images(ctx context.Context, deviceID string) (*voltha.OnuImages, error) { //nolint
+	_ = deviceID
+	return nil, errors.New("get-onu-images-not-implemented")
+}
+
+func (onuA *ONUAdapter) Activate_onu_image(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	_ = in
+	return nil, errors.New("activate-onu-image-not-implemented")
+}
+
+func (onuA *ONUAdapter) Commit_onu_image(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
+	_ = in
+	return nil, errors.New("commit-onu-image-not-implemented")
+}