[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
index e71af8a..0fda20e 100644
--- a/rw_core/core/device/agent_image.go
+++ b/rw_core/core/device/agent_image.go
@@ -18,20 +18,28 @@
 
 import (
 	"context"
+	"errors"
+	"time"
 
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-	"github.com/opencord/voltha-protos/v4/go/common"
+	ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+
+	"github.com/opencord/voltha-protos/v5/go/common"
 
 	"github.com/gogo/protobuf/proto"
-	"github.com/golang/protobuf/ptypes"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
 
 func (agent *Agent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+	var err error
+	var desc string
+	var prevAdminState, currAdminState common.AdminState_Types
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, &prevAdminState, &currAdminState, operStatus, err, desc) }()
+
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
@@ -39,23 +47,26 @@
 
 	if agent.device.Root {
 		agent.requestQueue.RequestComplete()
-		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, is an OLT. Image update "+
+		err = status.Errorf(codes.FailedPrecondition, "device-id:%s, is an OLT. Image update "+
 			"not supported by VOLTHA. Use Device Manager or other means", agent.deviceID)
+		return nil, err
 	}
 
 	device := agent.cloneDeviceWithoutLock()
 
 	if !agent.proceedWithRequest(device) {
 		agent.requestQueue.RequestComplete()
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.",
-			agent.deviceID)
+		err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+		return nil, err
 	}
 
 	if device.ImageDownloads != nil {
 		for _, image := range device.ImageDownloads {
 			if image.DownloadState == voltha.ImageDownload_DOWNLOAD_REQUESTED {
-				return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, already downloading image:%s",
+				err = status.Errorf(codes.FailedPrecondition, "device-id:%s, already downloading image:%s",
 					agent.deviceID, image.Name)
+				agent.requestQueue.RequestComplete()
+				return nil, err
 			}
 		}
 	}
@@ -71,22 +82,39 @@
 		cloned.ImageDownloads[index] = clonedImg
 	}
 
-	cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
-	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
-		return nil, err
-	}
-
 	// Send the request to the adapter
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
-	ch, err := agent.adapterProxy.DownloadImage(subCtx, cloned, clonedImg)
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 	if err != nil {
-		cancel()
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": device.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
 		return nil, err
 	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "downloadImage", ch, agent.onImageSuccess, agent.onImageFailure)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	go func() {
+		defer cancel()
+		response, err := client.DownloadImage(subCtx, &ic.ImageDownloadMessage{
+			Device: cloned,
+			Image:  clonedImg,
+		})
+		if err == nil {
+			agent.onImageSuccess(subCtx, response)
+		} else {
+			agent.onImageFailure(subCtx, err)
+		}
+	}()
 
+	cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
+	if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
+		return nil, err
+	}
 	return &common.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
 }
 
@@ -102,7 +130,14 @@
 }
 
 func (agent *Agent) cancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+
+	var err error
+	var desc string
+	var prevAdminState, currAdminState common.AdminState_Types
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, &prevAdminState, &currAdminState, operStatus, err, desc) }()
+
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
 	logger.Debugw(ctx, "cancel-image-download", log.Fields{"device-id": agent.deviceID})
@@ -111,8 +146,8 @@
 
 	if !agent.proceedWithRequest(cloned) {
 		agent.requestQueue.RequestComplete()
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.",
-			agent.deviceID)
+		err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+		return nil, err
 	}
 
 	// Update image download state
@@ -121,33 +156,59 @@
 		agent.requestQueue.RequestComplete()
 		return nil, err
 	}
-
+	prevAdminState = cloned.AdminState
 	cloned.ImageDownloads[index].DownloadState = voltha.ImageDownload_DOWNLOAD_CANCELLED
 
 	if cloned.AdminState != voltha.AdminState_DOWNLOADING_IMAGE {
+		err = status.Errorf(codes.Aborted, "device not in image download state %s", cloned.Id)
 		agent.requestQueue.RequestComplete()
-	} else {
-		// Set the device to Enabled
-		cloned.AdminState = voltha.AdminState_ENABLED
-		if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
-			return nil, err
-		}
-		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-		subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
-		ch, err := agent.adapterProxy.CancelImageDownload(subCtx, cloned, img)
-		if err != nil {
-			cancel()
-			return nil, err
-		}
-		go agent.waitForAdapterResponse(subCtx, cancel, "cancelImageDownload", ch, agent.onImageSuccess,
-			agent.onImageFailure)
+		return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}, err
 	}
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": cloned.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
+		return nil, err
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	go func() {
+		defer cancel()
+		response, err := client.CancelImageDownload(subCtx, &ic.ImageDownloadMessage{
+			Device: cloned,
+			Image:  img,
+		})
+		if err == nil {
+			agent.onImageSuccess(subCtx, response)
+		} else {
+			agent.onImageFailure(subCtx, err)
+		}
+	}()
+
+	// Set the device to Enabled
+	cloned.AdminState = voltha.AdminState_ENABLED
+	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
+		return nil, err
+	}
+	currAdminState = cloned.AdminState
+
 	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
 }
 
 func (agent *Agent) activateImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	var err error
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
 	logger.Debugw(ctx, "activate-image", log.Fields{"device-id": agent.deviceID})
@@ -155,9 +216,9 @@
 	cloned := agent.cloneDeviceWithoutLock()
 
 	if !agent.proceedWithRequest(cloned) {
+		err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
 		agent.requestQueue.RequestComplete()
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconcile is in progress/failed.",
-			agent.deviceID)
+		return nil, err
 	}
 
 	// Update image download state
@@ -169,72 +230,125 @@
 
 	if image.DownloadState != voltha.ImageDownload_DOWNLOAD_SUCCEEDED {
 		agent.requestQueue.RequestComplete()
-		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, device-has-not-downloaded-image:%s", agent.deviceID, img.Name)
+		err = status.Errorf(codes.FailedPrecondition, "device-id:%s, device-has-not-downloaded-image:%s", agent.deviceID, img.Name)
+		return nil, err
 	}
 
 	//TODO does this need to be removed ?
 	if cloned.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
 		agent.requestQueue.RequestComplete()
-		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
+		err = status.Errorf(codes.FailedPrecondition, "device-id:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
+		return nil, err
 	}
 
-	// Save the image
+	// Update the image
 	cloned.ImageDownloads[index].ImageState = voltha.ImageDownload_IMAGE_ACTIVATING
-
 	cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
+
+	// Send the request to the adapter
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": cloned.AdapterEndpoint,
+			})
+		return nil, err
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	go func() {
+		defer cancel()
+		response, err := client.ActivateImageUpdate(subCtx, &ic.ImageDownloadMessage{
+			Device: cloned,
+			Image:  img,
+		})
+		if err == nil {
+			agent.onImageSuccess(subCtx, response)
+		} else {
+			agent.onImageFailure(subCtx, err)
+		}
+	}()
+
+	// Save the image
 	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
 		return nil, err
 	}
 
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
-	ch, err := agent.adapterProxy.ActivateImageUpdate(subCtx, cloned, img)
-	if err != nil {
-		cancel()
-		return nil, err
-	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "activateImageUpdate", ch, agent.onImageSuccess, agent.onFailure)
-
 	// The status of the AdminState will be changed following the update_download_status response from the adapter
 	// The image name will also be removed from the device list
 	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
 }
 
 func (agent *Agent) revertImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	var err error
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
 	logger.Debugw(ctx, "revert-image", log.Fields{"device-id": agent.deviceID})
 
-	// Update image download state
 	cloned := agent.cloneDeviceWithoutLock()
+	if !agent.proceedWithRequest(cloned) {
+		agent.requestQueue.RequestComplete()
+		err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+		return nil, err
+	}
+
+	// Update image download state
 	_, index, err := getImage(img, cloned)
 	if err != nil {
 		agent.requestQueue.RequestComplete()
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
+		err = status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
+		return nil, err
 	}
 	if cloned.AdminState != voltha.AdminState_ENABLED {
 		agent.requestQueue.RequestComplete()
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, device-not-enabled-state:%s", agent.deviceID, img.Name)
+		err = status.Errorf(codes.FailedPrecondition, "deviceId:%s, device-not-enabled-state:%s", agent.deviceID, img.Name)
+		return nil, err
 	}
 
 	cloned.ImageDownloads[index].ImageState = voltha.ImageDownload_IMAGE_REVERTING
+	// Send the request to the adapter
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": cloned.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
+		return nil, err
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	go func() {
+		defer cancel()
+		_, err := client.RevertImageUpdate(subCtx, &ic.ImageDownloadMessage{
+			Device: cloned,
+			Image:  img,
+		})
+		if err == nil {
+			agent.onSuccess(subCtx, nil, nil, true)
+		} else {
+			agent.onFailure(subCtx, err, nil, nil, true)
+		}
+	}()
 
+	// Save data
 	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
 		return nil, err
 	}
 
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
-	ch, err := agent.adapterProxy.RevertImageUpdate(subCtx, cloned, img)
-	if err != nil {
-		cancel()
-		return nil, err
-	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "revertImageUpdate", ch, agent.onSuccess, agent.onFailure)
-
 	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
 }
 
@@ -244,40 +358,49 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
+	defer agent.requestQueue.RequestComplete()
+
 	device := agent.getDeviceReadOnlyWithoutLock()
-	ch, err := agent.adapterProxy.GetImageDownloadStatus(ctx, device, img)
-	agent.requestQueue.RequestComplete()
+	if !agent.proceedWithRequest(device) {
+		return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+	}
+
+	// Send the request to the adapter
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": device.AdapterEndpoint,
+			})
 		return nil, err
 	}
-	// Wait for the adapter response
-	rpcResponse, ok := <-ch
-	if !ok {
-		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
-	}
-	if rpcResponse.Err != nil {
-		return nil, rpcResponse.Err
-	}
-	// Successful response
-	imgDownload := &voltha.ImageDownload{}
-	if err := ptypes.UnmarshalAny(rpcResponse.Reply, imgDownload); err != nil {
-		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
-	}
-	return imgDownload, nil
+	return client.GetImageDownloadStatus(ctx, &ic.ImageDownloadMessage{
+		Device: device,
+		Image:  img,
+	})
 }
 
 func (agent *Agent) updateImageDownload(ctx context.Context, img *voltha.ImageDownload) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	var err error
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
+
 	logger.Debugw(ctx, "updating-image-download", log.Fields{"device-id": agent.deviceID, "img": img})
 
 	cloned := agent.cloneDeviceWithoutLock()
 
 	if !agent.proceedWithRequest(cloned) {
 		agent.requestQueue.RequestComplete()
-		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.",
-			agent.deviceID)
+		err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+		return err
 	}
 
 	// Update the image as well as remove it if the download was cancelled
@@ -326,14 +449,25 @@
 }
 
 // onImageFailure brings back the device to Enabled state and sets the image to image download_failed.
-func (agent *Agent) onImageFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
+func (agent *Agent) onImageFailure(ctx context.Context, imgErr error) {
 	// original context has failed due to timeout , let's open a new one
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.internalTimeout)
 	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
 	defer cancel()
+	rpc := coreutils.GetRPCMetadataFromContext(subCtx)
+
+	defer func() {
+		eCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+		rpce := agent.deviceMgr.NewRPCEvent(eCtx, agent.deviceID, imgErr.Error(), nil)
+		go agent.deviceMgr.SendRPCEvent(eCtx, "RPC_ERROR_RAISE_EVENT", rpce,
+			voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
+		operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+		desc := "adapter-response"
+		agent.logDeviceUpdate(ctx, nil, nil, operStatus, imgErr, desc)
+	}()
 
 	if err := agent.requestQueue.WaitForGreenLight(subCtx); err != nil {
-		logger.Errorw(subCtx, "can't obtain lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
+		logger.Errorw(subCtx, "can't obtain lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err})
 		return
 	}
 
@@ -345,8 +479,8 @@
 			log.Fields{"rpc": rpc, "device-id": agent.deviceID})
 		return
 	}
-	if res, ok := response.(error); ok {
-		logger.Errorw(subCtx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
+	if imgErr != nil {
+		logger.Errorw(subCtx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": imgErr})
 		cloned := agent.cloneDeviceWithoutLock()
 		//TODO base this on IMAGE ID when created
 		var imageFailed *voltha.ImageDownload
@@ -362,7 +496,7 @@
 		}
 
 		if imageFailed == nil {
-			logger.Errorw(subCtx, "can't find image", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
+			logger.Errorw(subCtx, "can't find image", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
 			return
 		}
 
@@ -376,19 +510,24 @@
 		cloned.AdminState = voltha.AdminState_ENABLED
 		if err := agent.updateDeviceAndReleaseLock(subCtx, cloned); err != nil {
 			logger.Errorw(subCtx, "failed-enable-device-after-image-failure",
-				log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
+				log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err})
 		}
 	} else {
-		logger.Errorw(subCtx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
+		logger.Errorw(subCtx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
 		return
 	}
-	// TODO: Post failure message onto kafka
 }
 
 // onImageSuccess brings back the device to Enabled state and sets the image to image download_failed.
-func (agent *Agent) onImageSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		logger.Errorw(ctx, "cannot-obtain-lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
+func (agent *Agent) onImageSuccess(ctx context.Context, response interface{}) {
+	rpc := coreutils.GetRPCMetadataFromContext(ctx)
+
+	var err error
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return
 	}
 
@@ -396,12 +535,10 @@
 
 	if !agent.proceedWithRequest(cloned) {
 		agent.requestQueue.RequestComplete()
-		logger.Errorw(ctx, "Cannot complete operation as Device deletion is in progress or reconciling is in progress/failed.",
-			log.Fields{"rpc": rpc, "device-id": agent.deviceID})
+		err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
 		return
 	}
-	logger.Infow(ctx, "rpc-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "response": response, "args": reqArgs})
-
+	logger.Infow(ctx, "rpc-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "response": response})
 	//TODO base this on IMAGE ID when created
 	var imageSucceeded *voltha.ImageDownload
 	var index int
@@ -416,7 +553,7 @@
 	}
 
 	if imageSucceeded == nil {
-		logger.Errorw(ctx, "can't find image", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
+		err = errors.New("can't find image")
 		return
 	}
 	//update image state on success
@@ -427,11 +564,14 @@
 	}
 	//Enabled is the only state we can go back to.
 	cloned.AdminState = voltha.AdminState_ENABLED
-	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
+	if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
 		logger.Errorw(ctx, "failed-enable-device-after-image-download-success",
-			log.Fields{"rpc": rpc, "device-id": agent.deviceID, "response": response, "args": reqArgs})
+			log.Fields{"rpc": rpc, "device-id": agent.deviceID, "response": response})
 	}
-
+	// Update operation status
+	if err == nil {
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+	}
 }
 
 func (agent *Agent) downloadImageToDevice(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error) {
@@ -447,19 +587,30 @@
 	}
 
 	cloned := agent.cloneDeviceWithoutLock()
+	if !agent.proceedWithRequest(cloned) {
+		agent.requestQueue.RequestComplete()
+		return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+	}
 
 	// Send the request to the adapter
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": cloned.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
+		return nil, err
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
 	defer cancel()
 	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
 
-	ch, err := agent.adapterProxy.DownloadImageToOnuDevice(subCtx, cloned, request)
 	agent.requestQueue.RequestComplete()
-	if err != nil {
-		return nil, err
-	}
-
-	return agent.getDeviceImageResponseFromAdapter(ctx, ch)
+	return client.DownloadOnuImage(subCtx, request)
 }
 
 func (agent *Agent) getImageStatus(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
@@ -467,21 +618,33 @@
 		return nil, err
 	}
 
-	cloned := agent.cloneDeviceWithoutLock()
-
-	// Send the request to the adapter
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	defer cancel()
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
 	logger.Debugw(ctx, "get-image-status", log.Fields{"device-id": agent.deviceID})
 
-	ch, err := agent.adapterProxy.GetOnuImageStatus(subCtx, cloned, request)
-	agent.requestQueue.RequestComplete()
+	cloned := agent.cloneDeviceWithoutLock()
+	if !agent.proceedWithRequest(cloned) {
+		agent.requestQueue.RequestComplete()
+		return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+	}
+
+	// Send the request to the adapter
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": cloned.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
 		return nil, err
 	}
 
-	return agent.getDeviceImageResponseFromAdapter(subCtx, ch)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+	defer cancel()
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
+	return client.GetOnuImageStatus(subCtx, request)
 }
 
 func (agent *Agent) activateImageOnDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
@@ -489,21 +652,35 @@
 		return nil, err
 	}
 
-	cloned := agent.cloneDeviceWithoutLock()
-
-	// Send the request to the adapter
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	defer cancel()
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
 	logger.Debugw(ctx, "activate-image-on-device", log.Fields{"device-id": agent.deviceID})
 
-	ch, err := agent.adapterProxy.ActivateOnuImage(subCtx, cloned, request)
-	agent.requestQueue.RequestComplete()
+	cloned := agent.cloneDeviceWithoutLock()
+
+	if !agent.proceedWithRequest(cloned) {
+		agent.requestQueue.RequestComplete()
+		return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+	}
+
+	// Send the request to the adapter
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": cloned.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
 		return nil, err
 	}
 
-	return agent.getDeviceImageResponseFromAdapter(subCtx, ch)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+	defer cancel()
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
+	agent.requestQueue.RequestComplete()
+	return client.ActivateOnuImage(subCtx, request)
 }
 
 func (agent *Agent) abortImageUpgradeToDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
@@ -511,21 +688,35 @@
 		return nil, err
 	}
 
-	cloned := agent.cloneDeviceWithoutLock()
-
-	// Send the request to the adapter
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	defer cancel()
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
 	logger.Debugw(ctx, "abort-image-on-device", log.Fields{"device-id": agent.deviceID})
 
-	ch, err := agent.adapterProxy.AbortImageUpgrade(subCtx, cloned, request)
-	agent.requestQueue.RequestComplete()
-	if err != nil {
-		return nil, err
+	cloned := agent.cloneDeviceWithoutLock()
+
+	if !agent.proceedWithRequest(cloned) {
+		agent.requestQueue.RequestComplete()
+		return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
 	}
 
-	return agent.getDeviceImageResponseFromAdapter(ctx, ch)
+	// Send the request to the adapter
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": cloned.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
+		return nil, err
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+	defer cancel()
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
+	agent.requestQueue.RequestComplete()
+
+	return client.AbortOnuImageUpgrade(subCtx, request)
 }
 
 func (agent *Agent) commitImage(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
@@ -533,21 +724,33 @@
 		return nil, err
 	}
 
-	cloned := agent.cloneDeviceWithoutLock()
-
-	// Send the request to the adapter
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	defer cancel()
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
 	logger.Debugw(ctx, "commit-image-on-device", log.Fields{"device-id": agent.deviceID})
 
-	ch, err := agent.adapterProxy.CommitImage(subCtx, cloned, request)
-	agent.requestQueue.RequestComplete()
-	if err != nil {
-		return nil, err
+	cloned := agent.cloneDeviceWithoutLock()
+
+	if !agent.proceedWithRequest(cloned) {
+		agent.requestQueue.RequestComplete()
+		return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
 	}
 
-	return agent.getDeviceImageResponseFromAdapter(ctx, ch)
+	// Send the request to the adapter
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": cloned.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
+		return nil, err
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+	defer cancel()
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
+	return client.CommitOnuImage(subCtx, request)
 }
 
 func (agent *Agent) getOnuImages(ctx context.Context, id *common.ID) (*voltha.OnuImages, error) {
@@ -555,63 +758,29 @@
 		return nil, err
 	}
 
-	cloned := agent.cloneDeviceWithoutLock()
-
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	defer cancel()
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
 	logger.Debug(ctx, "get-onu-images")
 
+	cloned := agent.cloneDeviceWithoutLock()
+
+	if !agent.proceedWithRequest(cloned) {
+		agent.requestQueue.RequestComplete()
+		return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+	}
+
 	// Send the request to the adapter
-	ch, err := agent.adapterProxy.GetOnuImages(subCtx, cloned, id)
-	agent.requestQueue.RequestComplete()
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": cloned.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
 		return nil, err
 	}
 
-	//wait for adapter response
-	select {
-	case rpcResponse, ok := <-ch:
-		if !ok {
-			return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
-		} else if rpcResponse.Err != nil {
-			// return error
-			return nil, status.Errorf(codes.Internal, "%s", rpcResponse.Err.Error())
-		} else {
-			resp := &voltha.OnuImages{}
-			if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
-				return nil, status.Errorf(codes.Internal, "%s", err.Error())
-			}
-
-			return resp, nil
-		}
-	case <-ctx.Done():
-		return nil, ctx.Err()
-	}
-}
-
-func (agent *Agent) getDeviceImageResponseFromAdapter(ctx context.Context, ch chan *kafka.RpcResponse) (*voltha.DeviceImageResponse, error) {
-	//wait for adapter response
-	select {
-	case rpcResponse, ok := <-ch:
-		if !ok {
-			return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
-		} else if rpcResponse.Err != nil {
-			// return error
-			return nil, status.Errorf(codes.Internal, "%s", rpcResponse.Err.Error())
-		} else {
-			resp := &voltha.DeviceImageResponse{}
-			if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
-				return nil, status.Errorf(codes.Internal, "%s", err.Error())
-			}
-
-			if len(resp.DeviceImageStates) == 0 || resp.DeviceImageStates[0] == nil {
-				return nil, status.Errorf(codes.Internal, "invalid response from adapter")
-			}
-
-			return resp, nil
-		}
-	case <-ctx.Done():
-		return nil, ctx.Err()
-	}
+	agent.requestQueue.RequestComplete()
+	return client.GetOnuImages(ctx, id)
 }