[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index cd879c6..1a83cb2 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -25,9 +25,11 @@
 	"sync"
 	"time"
 
+	"github.com/opencord/voltha-protos/v5/go/adapter_services"
+	"github.com/opencord/voltha-protos/v5/go/core"
+
 	"github.com/cenkalti/backoff/v3"
 	"github.com/gogo/protobuf/proto"
-	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/rw_core/config"
 	"google.golang.org/grpc/codes"
@@ -38,32 +40,34 @@
 	"github.com/opencord/voltha-go/rw_core/core/device/flow"
 	"github.com/opencord/voltha-go/rw_core/core/device/group"
 	"github.com/opencord/voltha-go/rw_core/core/device/port"
-	"github.com/opencord/voltha-go/rw_core/core/device/remote"
 	"github.com/opencord/voltha-go/rw_core/core/device/transientstate"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	"github.com/opencord/voltha-protos/v4/go/common"
-	"github.com/opencord/voltha-protos/v4/go/extension"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-protos/v5/go/common"
+	"github.com/opencord/voltha-protos/v5/go/extension"
+	ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+	ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 )
 
+var errReconcileAborted = errors.New("reconcile aborted")
+var errContextExpired = errors.New("context expired")
+
 // Agent represents device agent attributes
 type Agent struct {
 	deviceID             string
 	parentID             string
 	deviceType           string
+	adapterEndpoint      string
 	isRootDevice         bool
-	adapterProxy         *remote.AdapterProxy
 	adapterMgr           *adapter.Manager
 	deviceMgr            *Manager
 	dbProxy              *model.Proxy
 	exitChannel          chan int
 	device               *voltha.Device
 	requestQueue         *coreutils.RequestQueue
-	defaultTimeout       time.Duration
+	internalTimeout      time.Duration
+	rpcTimeout           time.Duration
 	startOnce            sync.Once
 	stopOnce             sync.Once
 	stopped              bool
@@ -78,7 +82,7 @@
 }
 
 //newAgent creates a new device agent. The device will be initialized when start() is called.
-func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, dbPath *model.Path, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
+func newAgent(device *voltha.Device, deviceMgr *Manager, dbPath *model.Path, deviceProxy *model.Proxy, internalTimeout, rpcTimeout time.Duration) *Agent {
 	deviceID := device.Id
 	if deviceID == "" {
 		deviceID = coreutils.CreateDeviceID()
@@ -86,15 +90,16 @@
 
 	return &Agent{
 		deviceID:             deviceID,
-		adapterProxy:         ap,
 		isRootDevice:         device.Root,
 		parentID:             device.ParentId,
 		deviceType:           device.Type,
+		adapterEndpoint:      device.AdapterEndpoint,
 		deviceMgr:            deviceMgr,
 		adapterMgr:           deviceMgr.adapterMgr,
 		exitChannel:          make(chan int, 1),
 		dbProxy:              deviceProxy,
-		defaultTimeout:       timeout,
+		internalTimeout:      internalTimeout,
+		rpcTimeout:           rpcTimeout,
 		device:               proto.Clone(device).(*voltha.Device),
 		requestQueue:         coreutils.NewRequestQueue(),
 		config:               deviceMgr.config,
@@ -132,22 +137,23 @@
 			} else if !have {
 				return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
 			}
+			logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID, "adapter-endpoint": device.AdapterEndpoint, "type": device.Type})
 		}
-		agent.deviceType = device.Adapter
+		agent.deviceType = device.Type
+		agent.adapterEndpoint = device.AdapterEndpoint
 		agent.device = proto.Clone(device).(*voltha.Device)
 		// load the ports from KV to cache
 		agent.portLoader.Load(ctx)
 		agent.transientStateLoader.Load(ctx)
-
-		logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID})
 	} else {
 		// Create a new device
 		var desc string
+		var err error
 		prevState := common.AdminState_UNKNOWN
 		currState := common.AdminState_UNKNOWN
-		operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+		requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
 
-		defer agent.logDeviceUpdate(ctx, "createDevice", &prevState, &currState, operStatus, &desc)
+		defer func() { agent.logDeviceUpdate(ctx, &prevState, &currState, requestStatus, err, desc) }()
 
 		// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
 		// is a new device, so populate them here before passing the device to ldProxy.Set.
@@ -162,13 +168,13 @@
 			device.Vlan = deviceToCreate.ProxyAddress.ChannelId
 		}
 
-		// Add the initial device to the local model
-		if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
-			desc = fmt.Sprintf("failed-adding-device-%s: %s", agent.deviceID, err.Error())
-			return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
+		// Save the device to the model
+		if err = agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
+			err = status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
+			return nil, err
 		}
 		_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, device.OperStatus, device.ConnectStatus, prevState, device, time.Now().Unix())
-		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+		requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
 		agent.device = device
 	}
 	startSucceeded = true
@@ -225,8 +231,9 @@
 		return // not found in kv
 	}
 
-	agent.deviceType = device.Adapter
+	agent.deviceType = device.Type
 	agent.device = device
+	agent.adapterEndpoint = device.AdapterEndpoint
 	agent.portLoader.Load(ctx)
 	agent.transientStateLoader.Load(ctx)
 
@@ -234,139 +241,68 @@
 }
 
 // onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
-// and the only action required is to publish a successful result on kafka
-func (agent *Agent) onSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
-	logger.Debugw(ctx, "response-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
-	// TODO: Post success message onto kafka
+func (agent *Agent) onSuccess(ctx context.Context, prevState, currState *common.AdminState_Types, deviceUpdateLog bool) {
+	if deviceUpdateLog {
+		requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
+		desc := "adapter-response"
+		agent.logDeviceUpdate(ctx, prevState, currState, requestStatus, nil, desc)
+		return
+	}
+	logger.Debugw(ctx, "successful-operation", log.Fields{"device-id": agent.deviceID, "rpc": coreutils.GetRPCMetadataFromContext(ctx)})
 }
 
 // onFailure is a common callback for scenarios where we receive an error response following a request to an adapter
 // and the only action required is to publish the failed result on kafka
-func (agent *Agent) onFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
-	if res, ok := response.(error); ok {
-		logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
-	} else {
-		logger.Errorw(ctx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
-	}
-	// TODO: Post failure message onto kafka
-}
+func (agent *Agent) onFailure(ctx context.Context, err error, prevState, currState *common.AdminState_Types, deviceUpdateLog bool) {
+	// Send an event on kafka
+	rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+	go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+		voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
 
-func (agent *Agent) waitForAdapterForceDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
-	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
-	defer cancel()
-	select {
-	case rpcResponse, ok := <-ch:
-		if !ok {
-			onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
-		} else if rpcResponse.Err != nil {
-			onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
-		} else {
-			onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
-		}
-	case <-ctx.Done():
-		onFailure(ctx, rpc, ctx.Err(), reqArgs)
+	// Log the device update event
+	if deviceUpdateLog {
+		requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+		desc := "adapter-response"
+		agent.logDeviceUpdate(ctx, prevState, currState, requestStatus, err, desc)
+		return
 	}
+	logger.Errorw(ctx, "failed-operation", log.Fields{"error": err, "device-id": agent.deviceID, "rpc": coreutils.GetRPCMetadataFromContext(ctx)})
 }
 
 // onDeleteSuccess is a common callback for scenarios where we receive a nil response following a delete request
 // to an adapter.
-func (agent *Agent) onDeleteSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
-	logger.Debugw(ctx, "response-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
+func (agent *Agent) onDeleteSuccess(ctx context.Context, prevState, currState *common.AdminState_Types) {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err, "args": reqArgs})
+		logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err})
 	}
 	previousDeviceTransientState := agent.getTransientState()
 	newDevice := agent.cloneDeviceWithoutLock()
 	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, newDevice,
-		voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE, previousDeviceTransientState); err != nil {
-		logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err, "args": reqArgs})
+		core.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE, previousDeviceTransientState); err != nil {
+		logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err})
 	}
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
+	desc := "adapter-response"
+	agent.logDeviceUpdate(ctx, prevState, currState, requestStatus, nil, desc)
 }
 
 // onDeleteFailure is a common callback for scenarios where we receive an error response following a delete request
 //  to an adapter and the only action required is to return the error response.
-func (agent *Agent) onDeleteFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
-	if res, ok := response.(error); ok {
-		logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
-	} else {
-		logger.Errorw(ctx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
-	}
+func (agent *Agent) onDeleteFailure(ctx context.Context, err error, prevState, currState *common.AdminState_Types) {
+	logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": coreutils.GetRPCMetadataFromContext(ctx), "device-id": agent.deviceID, "error": err})
+
 	//Only updating of transient state is required, no transition.
-	if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); err != nil {
-		logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID})
+	if er := agent.updateTransientState(ctx, core.DeviceTransientState_DELETE_FAILED); er != nil {
+		logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID, "error": er})
 	}
+	rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+	go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+		voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
 
-}
-
-func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
-	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
-	defer cancel()
-	var rpce *voltha.RPCEvent
-	defer func() {
-		if rpce != nil {
-			agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
-				voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
-		}
-	}()
-	select {
-	case rpcResponse, ok := <-ch:
-		if !ok {
-			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
-			onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
-			//add failure
-		} else if rpcResponse.Err != nil {
-			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
-			onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
-			//add failure
-		} else {
-			onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
-		}
-	case <-ctx.Done():
-		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
-		onFailure(ctx, rpc, ctx.Err(), reqArgs)
-	}
-}
-
-func (agent *Agent) waitForAdapterResponseAndLogDeviceUpdate(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
-	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, prevState *common.AdminState_Types, reqArgs ...interface{}) {
-	defer cancel()
-	var desc string
-	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
-	defer func() {
-		currAdminState := prevState
-		if d, _ := agent.getDeviceReadOnly(ctx); d != nil {
-			currAdminState = &d.AdminState
-		}
-		agent.logDeviceUpdate(ctx, rpc, prevState, currAdminState, operStatus, &desc)
-	}()
-	var rpce *voltha.RPCEvent
-	defer func() {
-		if rpce != nil {
-			agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
-				voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
-		}
-	}()
-
-	select {
-	case rpcResponse, ok := <-ch:
-		if !ok {
-			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
-			onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
-			//add failure
-		} else if rpcResponse.Err != nil {
-			desc = rpcResponse.Err.Error()
-			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
-			onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
-			//add failure
-		} else {
-			operStatus.Code = common.OperationResp_OPERATION_SUCCESS
-			onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
-		}
-	case <-ctx.Done():
-		desc = ctx.Err().Error()
-		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
-		onFailure(ctx, rpc, ctx.Err(), reqArgs)
-	}
+	// Log the device update event
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	desc := "adapter-response"
+	agent.logDeviceUpdate(ctx, prevState, currState, requestStatus, err, desc)
 }
 
 // getDeviceReadOnly returns a device which MUST NOT be modified, but is safe to keep forever.
@@ -390,117 +326,128 @@
 	return proto.Clone(agent.device).(*voltha.Device)
 }
 
+func (agent *Agent) updateDeviceTypeAndEndpoint(ctx context.Context) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	changed := false
+	cloned := agent.cloneDeviceWithoutLock()
+	if cloned.Type == "" {
+		adapterType, err := agent.adapterMgr.GetAdapterType(cloned.Type)
+		if err != nil {
+			agent.requestQueue.RequestComplete()
+			return err
+		}
+		cloned.Type = adapterType
+		changed = true
+	}
+
+	if cloned.AdapterEndpoint == "" {
+		var err error
+		if cloned.AdapterEndpoint, err = agent.adapterMgr.GetAdapterEndpoint(ctx, cloned.Id, cloned.Type); err != nil {
+			agent.requestQueue.RequestComplete()
+			return err
+		}
+		agent.adapterEndpoint = cloned.AdapterEndpoint
+		changed = true
+	}
+
+	if changed {
+		return agent.updateDeviceAndReleaseLock(ctx, cloned)
+	}
+	agent.requestQueue.RequestComplete()
+	return nil
+}
+
 // enableDevice activates a preprovisioned or a disable device
 func (agent *Agent) enableDevice(ctx context.Context) error {
 	//To preserve and use oldDevice state as prev state in new device
+	var err error
 	var desc string
-	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	var prevAdminState, currAdminState common.AdminState_Types
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
 
-	defer agent.logDeviceUpdate(ctx, "enableDevice", nil, nil, operStatus, &desc)
+	defer func() { agent.logDeviceUpdate(ctx, &prevAdminState, &currAdminState, requestStatus, err, desc) }()
 
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "enable-device", log.Fields{"device-id": agent.deviceID})
 
-	prevDeviceState := agent.device.AdminState
-
 	oldDevice := agent.getDeviceReadOnlyWithoutLock()
+	prevAdminState = oldDevice.AdminState
 
 	if !agent.proceedWithRequest(oldDevice) {
 		agent.requestQueue.RequestComplete()
-
-		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconcile is in progress/failed.", agent.deviceID)
-		return status.Error(codes.FailedPrecondition, desc)
+		err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed: %s", agent.deviceID)
+		return err
 	}
 
 	if oldDevice.AdminState == voltha.AdminState_ENABLED {
 		logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
 		agent.requestQueue.RequestComplete()
-		desc = fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id)
-		return status.Error(codes.FailedPrecondition, desc)
-	}
-
-	// First figure out which adapter will handle this device type.  We do it at this stage as allow devices to be
-	// pre-provisioned with the required adapter not registered.   At this stage, since we need to communicate
-	// with the adapter then we need to know the adapter that will handle this request
-	adapterName, err := agent.adapterMgr.GetAdapterType(oldDevice.Type)
-	if err != nil {
-		agent.requestQueue.RequestComplete()
-		desc = err.Error()
+		err = status.Errorf(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id))
 		return err
 	}
 
+	// Verify whether there is a device type that supports this device type
+	_, err = agent.adapterMgr.GetAdapterType(oldDevice.Type)
+	if err != nil {
+		agent.requestQueue.RequestComplete()
+		return err
+	}
+
+	// Update device adapter endpoint if not set.  This is set once by the Core and use as is by the adapters.  E.g if this is a
+	// child device then the parent adapter will use this device's adapter endpoint (set here) to communicate with it.
 	newDevice := agent.cloneDeviceWithoutLock()
-	newDevice.Adapter = adapterName
+	if newDevice.AdapterEndpoint == "" {
+		if newDevice.AdapterEndpoint, err = agent.adapterMgr.GetAdapterEndpoint(ctx, newDevice.Id, newDevice.Type); err != nil {
+			agent.requestQueue.RequestComplete()
+			return err
+		}
+		agent.adapterEndpoint = newDevice.AdapterEndpoint
+	}
 
 	// Update the Admin State and set the operational state to activating before sending the request to the Adapters
 	newDevice.AdminState = voltha.AdminState_ENABLED
 	newDevice.OperStatus = voltha.OperStatus_ACTIVATING
 
-	if err := agent.updateDeviceAndReleaseLock(ctx, newDevice); err != nil {
-		desc = err.Error()
-		return err
-	}
-
 	// Adopt the device if it was in pre-provision state.  In all other cases, try to re-enable it.
-	var ch chan *kafka.RpcResponse
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-	subCtx = coreutils.WithFromTopicMetadataFromContext(subCtx, ctx)
-
-	if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
-		ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
-	} else {
-		ch, err = agent.adapterProxy.ReEnableDevice(subCtx, newDevice)
-	}
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 	if err != nil {
-		cancel()
-		desc = err.Error()
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": newDevice.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
 		return err
 	}
-
-	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
-
-	// Wait for response
-	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "enableDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
-	return nil
-}
-
-func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse, response coreutils.Response) {
-	defer cancel()
-	var desc string
-	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
-	defer agent.logDeviceUpdate(ctx, rpc, nil, nil, operStatus, &desc)
-
-	var rpce *voltha.RPCEvent
-	defer func() {
-		if rpce != nil {
-			agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
-				voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
+	subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+	requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	go func() {
+		defer cancel()
+		var err error
+		if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
+			_, err = client.AdoptDevice(subCtx, newDevice)
+		} else {
+			_, err = client.ReEnableDevice(subCtx, newDevice)
+		}
+		if err == nil {
+			agent.onSuccess(subCtx, nil, nil, true)
+		} else {
+			agent.onFailure(subCtx, err, nil, nil, true)
 		}
 	}()
-	select {
-	case rpcResponse, ok := <-ch:
-		if !ok {
-			//add failure
-			desc = "Response Channel Closed"
-			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
-			response.Error(status.Errorf(codes.Aborted, "channel-closed"))
-		} else if rpcResponse.Err != nil {
-			//add failure
-			desc = rpcResponse.Err.Error()
-			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
-			response.Error(rpcResponse.Err)
-		} else {
-			operStatus.Code = common.OperationResp_OPERATION_SUCCESS
-			response.Done()
-		}
-	case <-ctx.Done():
-		desc = ctx.Err().Error()
-		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
-		response.Error(ctx.Err())
+
+	// Update device
+	if err = agent.updateDeviceAndReleaseLock(ctx, newDevice); err != nil {
+		return err
 	}
+	currAdminState = newDevice.AdminState
+	return nil
 }
 
 //addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
@@ -516,8 +463,8 @@
 	if grpResponse, err = agent.addGroupsToAdapter(ctx, newGroups, flowMetadata); err != nil {
 		return err
 	}
-	if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); errs != nil {
-		logger.Warnw(ctx, "no-adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
+	if errs := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, flwResponse, grpResponse); errs != nil {
+		logger.Warnw(ctx, "adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
 		return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
 	}
 	return nil
@@ -535,7 +482,7 @@
 		return err
 	}
 
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, flwResponse, grpResponse); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -553,7 +500,7 @@
 		return err
 	}
 
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, flwResponse, grpResponse); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -561,73 +508,81 @@
 
 //disableDevice disable a device
 func (agent *Agent) disableDevice(ctx context.Context) error {
+	var err error
 	var desc string
-	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	var prevAdminState, currAdminState common.AdminState_Types
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, &prevAdminState, &currAdminState, requestStatus, err, desc) }()
 
-	prevDeviceState := agent.device.AdminState
-
-	defer agent.logDeviceUpdate(ctx, "disableDevice", nil, nil, operStatus, &desc)
-
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		desc = err.Error()
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "disable-device", log.Fields{"device-id": agent.deviceID})
 
 	cloned := agent.cloneDeviceWithoutLock()
+	prevAdminState = agent.device.AdminState
 
 	if !agent.proceedWithRequest(cloned) {
+		err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed: %s", agent.deviceID)
 		agent.requestQueue.RequestComplete()
-		desc = fmt.Sprintf("deviceId:%s,Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.", agent.deviceID)
-		return status.Errorf(codes.FailedPrecondition, desc)
+		return err
 	}
 
 	if cloned.AdminState == voltha.AdminState_DISABLED {
 		desc = "device-already-disabled"
-		logger.Debugw(ctx, "device-already-disabled", log.Fields{"device-id": agent.deviceID})
 		agent.requestQueue.RequestComplete()
 		return nil
 	}
 	if cloned.AdminState == voltha.AdminState_PREPROVISIONED {
 		agent.requestQueue.RequestComplete()
-		desc = fmt.Sprintf("deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
-		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
+		err = status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
+		return err
 	}
 
 	// Update the Admin State and operational state before sending the request out
 	cloned.AdminState = voltha.AdminState_DISABLED
 	cloned.OperStatus = voltha.OperStatus_UNKNOWN
 
-	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
-		return err
-	}
-
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
-	ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 	if err != nil {
-		cancel()
-		desc = err.Error()
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": cloned.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
 		return err
 	}
-	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+	requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	go func() {
+		defer cancel()
+		_, err := client.DisableDevice(subCtx, cloned)
+		if err == nil {
+			agent.onSuccess(subCtx, nil, nil, true)
+		} else {
+			agent.onFailure(subCtx, err, nil, nil, true)
+		}
+	}()
 
-	// Wait for response
-	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "disableDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
+	// Update device
+	if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
+		return err
+	}
+	currAdminState = cloned.AdminState
 
 	return nil
 }
 
 func (agent *Agent) rebootDevice(ctx context.Context) error {
 	var desc string
-	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	var err error
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
 
-	prevDeviceState := agent.device.AdminState
-
-	defer agent.logDeviceUpdate(ctx, "rebootDevice", nil, nil, operStatus, &desc)
-
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		desc = err.Error()
 		return err
 	}
@@ -637,22 +592,32 @@
 	device := agent.getDeviceReadOnlyWithoutLock()
 
 	if !agent.proceedWithRequest(device) {
-		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.", agent.deviceID)
-		return status.Errorf(codes.FailedPrecondition, desc)
-	}
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
-	ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
-	if err != nil {
-		cancel()
-		desc = err.Error()
+		err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed:%s", agent.deviceID)
 		return err
 	}
-	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
 
-	// Wait for response
-	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "rebootDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": device.AdapterEndpoint,
+			})
+		return err
+	}
+	subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+	requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	go func() {
+		defer cancel()
+		_, err := client.RebootDevice(subCtx, device)
+		if err == nil {
+			agent.onSuccess(subCtx, nil, nil, true)
+		} else {
+			agent.onFailure(subCtx, err, nil, nil, true)
+		}
+	}()
 	return nil
 }
 
@@ -660,50 +625,54 @@
 	logger.Debugw(ctx, "delete-device-force", log.Fields{"device-id": agent.deviceID})
 
 	var desc string
-	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	var err error
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
 
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		desc = err.Error()
-		agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
 	// Get the device Transient state, return err if it is DELETING
 	previousDeviceTransientState := agent.getTransientState()
-
-	if agent.isStateDeleting(previousDeviceTransientState) {
-		agent.requestQueue.RequestComplete()
-		desc = fmt.Sprintf("deviceId:%s, Device Deletion is in progress",
-			agent.deviceID)
-		agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
-		return status.Error(codes.FailedPrecondition, desc)
-	}
-
-	//Send stop Reconcile if in progress
-	agent.stopReconcile()
-
 	device := agent.cloneDeviceWithoutLock()
-	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
-		voltha.DeviceTransientState_FORCE_DELETING, previousDeviceTransientState); err != nil {
+	if !agent.isForceDeletingAllowed(previousDeviceTransientState, device) {
+		agent.requestQueue.RequestComplete()
+		err = status.Error(codes.FailedPrecondition, fmt.Sprintf("deviceId:%s, force deletion is in progress", agent.deviceID))
 		return err
 	}
-	previousAdminState := device.AdminState
-	if previousAdminState != ic.AdminState_PREPROVISIONED {
-		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-		subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
 
-		ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
+	previousAdminState := device.AdminState
+	if previousAdminState != common.AdminState_PREPROVISIONED {
+		var client adapter_services.AdapterServiceClient
+		client, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 		if err != nil {
-			cancel()
-			desc = err.Error()
-			agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
+			logger.Errorw(ctx, "grpc-client-nil",
+				log.Fields{
+					"error":            err,
+					"device-id":        agent.deviceID,
+					"device-type":      agent.deviceType,
+					"adapter-endpoint": device.AdapterEndpoint,
+				})
+			agent.requestQueue.RequestComplete()
 			return err
 		}
-		// As force delete will not be dependent over the response of adapter, marking this operation as success
-		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
-		agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
-		// Since it is a case of force delete, nothing needs to be done on adapter responses.
-		go agent.waitForAdapterForceDeleteResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
-			agent.onFailure)
+		subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+		requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+		go func() {
+			defer cancel()
+			_, err := client.DeleteDevice(subCtx, device)
+			if err == nil {
+				agent.onSuccess(subCtx, nil, nil, true)
+			} else {
+				agent.onFailure(subCtx, err, nil, nil, true)
+			}
+		}()
+	}
+
+	// Update device
+	if err = agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
+		core.DeviceTransientState_FORCE_DELETING, previousDeviceTransientState); err != nil {
+		return err
 	}
 	return nil
 }
@@ -712,12 +681,11 @@
 	logger.Debugw(ctx, "delete-device", log.Fields{"device-id": agent.deviceID})
 
 	var desc string
-	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
-	prevState := agent.device.AdminState
+	var err error
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
 
-	defer agent.logDeviceUpdate(ctx, "deleteDevice", nil, nil, operStatus, &desc)
-
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		desc = err.Error()
 		return err
 	}
@@ -726,8 +694,8 @@
 
 	if !agent.proceedWithRequest(device) {
 		agent.requestQueue.RequestComplete()
-		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", agent.deviceID)
-		return status.Error(codes.FailedPrecondition, desc)
+		err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed: %s", agent.deviceID)
+		return err
 	}
 
 	// Get the device Transient state, return err if it is DELETING
@@ -735,38 +703,48 @@
 
 	previousAdminState := device.AdminState
 	// Change the device transient state to DELETING_FROM_ADAPTER  state till the device is removed from adapters.
-	currentDeviceTransientState := voltha.DeviceTransientState_DELETING_FROM_ADAPTER
+	currentDeviceTransientState := core.DeviceTransientState_DELETING_FROM_ADAPTER
 
-	if previousAdminState == ic.AdminState_PREPROVISIONED {
+	if previousAdminState == common.AdminState_PREPROVISIONED {
 		// Change the state to DELETING POST ADAPTER RESPONSE directly as adapters have no info of the device.
-		currentDeviceTransientState = voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
+		currentDeviceTransientState = core.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
 	}
-	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
+	// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
+	// adapter
+	if previousAdminState != common.AdminState_PREPROVISIONED {
+		var client adapter_services.AdapterServiceClient
+		client, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+		if err != nil {
+			logger.Errorw(ctx, "grpc-client-nil",
+				log.Fields{
+					"error":            err,
+					"device-id":        agent.deviceID,
+					"device-type":      agent.deviceType,
+					"adapter-endpoint": device.AdapterEndpoint,
+				})
+			agent.requestQueue.RequestComplete()
+			return err
+		}
+		subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+		requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+		go func() {
+			defer cancel()
+			_, err := client.DeleteDevice(subCtx, device)
+			if err == nil {
+				agent.onDeleteSuccess(subCtx, nil, nil)
+			} else {
+				agent.onDeleteFailure(subCtx, err, nil, nil)
+			}
+		}()
+	}
+
+	// Update device and release lock
+	if err = agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
 		currentDeviceTransientState, previousDeviceTransientState); err != nil {
 		desc = err.Error()
 		return err
 	}
-	// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
-	// adapter
-	if previousAdminState != ic.AdminState_PREPROVISIONED {
-		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-		subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
 
-		ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
-		if err != nil {
-			cancel()
-			//updating of transient state is required in error
-			if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); err != nil {
-				logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID})
-			}
-			desc = err.Error()
-			return err
-		}
-
-		operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
-		go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
-			agent.onDeleteFailure, &prevState)
-	}
 	return nil
 }
 
@@ -789,67 +767,66 @@
 	if err != nil {
 		return nil, err
 	}
-	ch, err := agent.adapterProxy.GetOfpDeviceInfo(ctx, device)
+
+	// Get the gRPC client
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 	if err != nil {
 		return nil, err
 	}
 
-	// Wait for adapter response
-	rpcResponse, ok := <-ch
-	if !ok {
-		return nil, status.Errorf(codes.Aborted, "channel-closed")
-	}
-	if rpcResponse.Err != nil {
-		return nil, rpcResponse.Err
-	}
-	// Successful response
-	switchCap := &ic.SwitchCapability{}
-	if err := ptypes.UnmarshalAny(rpcResponse.Reply, switchCap); err != nil {
-		return nil, err
-	}
-	return switchCap, nil
+	return client.GetOfpDeviceInfo(ctx, device)
 }
 
-func (agent *Agent) onPacketFailure(ctx context.Context, rpc string, response interface{}, args ...interface{}) {
-	// packet data is encoded in the args param as the first parameter
-	var packet []byte
-	if len(args) >= 1 {
-		if pkt, ok := args[0].([]byte); ok {
-			packet = pkt
-		}
-	}
-	var errResp error
-	if err, ok := response.(error); ok {
-		errResp = err
-	}
-	logger.Warnw(ctx, "packet-out-error", log.Fields{
+func (agent *Agent) onPacketFailure(ctx context.Context, err error, packet *ofp.OfpPacketOut) {
+	logger.Errorw(ctx, "packet-out-error", log.Fields{
 		"device-id": agent.deviceID,
-		"error":     errResp,
-		"packet":    hex.EncodeToString(packet),
+		"error":     err.Error(),
+		"packet":    hex.EncodeToString(packet.Data),
 	})
+	rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+	go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+		voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
 }
 
 func (agent *Agent) packetOut(ctx context.Context, outPort uint32, packet *ofp.OfpPacketOut) error {
-	// If deviceType=="" then we must have taken ownership of this device.
-	// Fixes VOL-2226 where a core would take ownership and have stale data
 	if agent.deviceType == "" {
 		agent.reconcileWithKVStore(ctx)
 	}
 	//	Send packet to adapter
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
-	ch, err := agent.adapterProxy.PacketOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 	if err != nil {
-		cancel()
-		return nil
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":       err,
+				"device-id":   agent.deviceID,
+				"device-type": agent.deviceType,
+			})
+		return err
 	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "packetOut", ch, agent.onSuccess, agent.onPacketFailure, packet.Data)
+	subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+	go func() {
+		defer cancel()
+		_, err := client.SendPacketOut(subCtx, &ic.PacketOut{
+			DeviceId:     agent.deviceID,
+			EgressPortNo: outPort,
+			Packet:       packet,
+		})
+		if err == nil {
+			agent.onSuccess(subCtx, nil, nil, false)
+		} else {
+			agent.onPacketFailure(subCtx, err, packet)
+		}
+	}()
 	return nil
 }
 
 func (agent *Agent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	var err error
+	var desc string
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id})
@@ -863,27 +840,40 @@
 	cloned.Vlan = device.Vlan
 	cloned.Reason = device.Reason
 	cloned.ImageDownloads = device.ImageDownloads
-	return agent.updateDeviceAndReleaseLock(ctx, cloned)
+	cloned.OperStatus = device.OperStatus
+	cloned.ConnectStatus = device.ConnectStatus
+	if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err == nil {
+		requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
+	}
+	return err
 }
 
 func (agent *Agent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	var err error
+	var desc string
+	opStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, opStatus, err, desc) }()
+
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
 
 	cloned := agent.cloneDeviceWithoutLock()
 	// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
 	if s, ok := voltha.ConnectStatus_Types_name[int32(connStatus)]; ok {
-		logger.Debugw(ctx, "update-device-status-conn", log.Fields{"ok": ok, "val": s})
+		logger.Debugw(ctx, "update-device-conn-status", log.Fields{"ok": ok, "val": s})
 		cloned.ConnectStatus = connStatus
 	}
 	if s, ok := voltha.OperStatus_Types_name[int32(operStatus)]; ok {
-		logger.Debugw(ctx, "update-device-status-conn", log.Fields{"ok": ok, "val": s})
+		logger.Debugw(ctx, "update-device-oper-status", log.Fields{"ok": ok, "val": s})
 		cloned.OperStatus = operStatus
 	}
 	logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": cloned.Id, "oper-status": cloned.OperStatus, "connect-status": cloned.ConnectStatus})
 	// Store the device
-	return agent.updateDeviceAndReleaseLock(ctx, cloned)
+	if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err == nil {
+		opStatus.Code = common.OperationResp_OPERATION_SUCCESS
+	}
+	return err
 }
 
 // TODO: A generic device update by attribute
@@ -926,7 +916,12 @@
 }
 
 func (agent *Agent) simulateAlarm(ctx context.Context, simulateReq *voltha.SimulateAlarmRequest) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	var err error
+	var desc string
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
 	defer agent.requestQueue.RequestComplete()
@@ -934,15 +929,28 @@
 
 	device := agent.getDeviceReadOnlyWithoutLock()
 
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
-	ch, err := agent.adapterProxy.SimulateAlarm(subCtx, device, simulateReq)
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 	if err != nil {
-		cancel()
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": device.AdapterEndpoint,
+			})
 		return err
 	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "simulateAlarm", ch, agent.onSuccess, agent.onFailure)
+	subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+	requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	go func() {
+		defer cancel()
+		_, err := client.SimulateAlarm(subCtx, &ic.SimulateAlarmMessage{Device: device, Request: simulateReq})
+		if err == nil {
+			agent.onSuccess(subCtx, nil, nil, false)
+		} else {
+			agent.onFailure(subCtx, err, nil, nil, false)
+		}
+	}()
 	return nil
 }
 
@@ -990,7 +998,7 @@
 // This function updates the device transient in the DB through loader, releases the device lock, and runs any state transitions.
 // The calling function MUST hold the device lock.  The caller MUST NOT modify the device after this is called.
 func (agent *Agent) updateDeviceWithTransientStateAndReleaseLock(ctx context.Context, device *voltha.Device,
-	transientState, prevTransientState voltha.DeviceTransientState_Types) error {
+	transientState, prevTransientState core.DeviceTransientState_Types) error {
 	// fail early if this agent is no longer valid
 	if agent.stopped {
 		agent.requestQueue.RequestComplete()
@@ -1004,9 +1012,10 @@
 	// update in db
 	if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
 		//Reverting TransientState update
-		err := agent.updateTransientState(ctx, prevTransientState)
-		logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id,
-			"previous-transient-state": prevTransientState, "current-transient-state": transientState})
+		if errTransient := agent.updateTransientState(ctx, prevTransientState); errTransient != nil {
+			logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id,
+				"previous-transient-state": prevTransientState, "current-transient-state": transientState, "error": errTransient})
+		}
 		agent.requestQueue.RequestComplete()
 		return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
 	}
@@ -1023,44 +1032,47 @@
 
 	// release lock before processing transition
 	agent.requestQueue.RequestComplete()
-	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
-	if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
-		device, prevDevice, transientState, prevTransientState); err != nil {
-		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
-		// Sending RPC EVENT here
-		rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
-		agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
-			nil, time.Now().Unix())
-	}
+	go func() {
+		subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+		if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
+			device, prevDevice, transientState, prevTransientState); err != nil {
+			logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
+			// Sending RPC EVENT here
+			rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+			agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+				nil, time.Now().Unix())
+		}
+	}()
 	return nil
 }
 func (agent *Agent) updateDeviceReason(ctx context.Context, reason string) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})
+
+	var err error
+	var desc string
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
 
-	logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})
-
-	var desc string
-	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
-
-	defer agent.logDeviceUpdate(ctx, "updateDeviceReason", nil, nil, operStatus, &desc)
-
 	cloned := agent.cloneDeviceWithoutLock()
 	cloned.Reason = reason
-	retErr := agent.updateDeviceAndReleaseLock(ctx, cloned)
-	if retErr != nil {
-		desc = retErr.Error()
-	} else {
-		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
-		desc = reason
+	if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err == nil {
+		requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
 	}
-	return retErr
+	return err
 }
 
 func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
 	logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
 
+	var err error
+	var desc string
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
 	// Remove the associated peer ports on the parent device
 	for portID := range agent.portLoader.ListIDs() {
 		if portHandle, have := agent.portLoader.Lock(portID); have {
@@ -1082,154 +1094,199 @@
 	}
 
 	//send request to adapter
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
-	ch, err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, device)
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 	if err != nil {
-		cancel()
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": device.AdapterEndpoint,
+			})
 		return err
 	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "childDeviceLost", ch, agent.onSuccess, agent.onFailure)
+	subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+	requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	go func() {
+		defer cancel()
+		_, err := client.ChildDeviceLost(subCtx, device)
+		if err == nil {
+			agent.onSuccess(subCtx, nil, nil, true)
+		} else {
+			agent.onFailure(subCtx, err, nil, nil, true)
+		}
+	}()
 	return nil
 }
 
 func (agent *Agent) startOmciTest(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return nil, err
-	}
+	var err error
+	var desc string
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
 
-	cloned := agent.cloneDeviceWithoutLock()
-
-	if cloned.Adapter == "" {
-		adapterName, err := agent.adapterMgr.GetAdapterType(cloned.Type)
-		if err != nil {
-			agent.requestQueue.RequestComplete()
-			return nil, err
-		}
-		cloned.Adapter = adapterName
-	}
-
-	// Send request to the adapter
-	ch, err := agent.adapterProxy.StartOmciTest(ctx, cloned, omcitestrequest)
-	agent.requestQueue.RequestComplete()
+	// OMCI test may be performed on a pre-provisioned device.  If a device is in that state both its device type and endpoint
+	// may not have been set yet.
+	// First check if we need to update the type or endpoint
+	cloned, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
 		return nil, err
 	}
-
-	// Wait for the adapter response
-	rpcResponse, ok := <-ch
-	if !ok {
-		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
-	}
-	if rpcResponse.Err != nil {
-		return nil, rpcResponse.Err
+	if cloned.Type == "" || cloned.AdapterEndpoint == "" {
+		if err = agent.updateDeviceTypeAndEndpoint(ctx); err != nil {
+			return nil, err
+		}
+		cloned, err = agent.getDeviceReadOnly(ctx)
+		if err != nil {
+			return nil, err
+		}
 	}
 
-	// Unmarshal and return the response
-	testResp := &voltha.TestResponse{}
-	if err := ptypes.UnmarshalAny(rpcResponse.Reply, testResp); err != nil {
-		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+	// Send request to the adapter
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": cloned.AdapterEndpoint,
+			})
+		return nil, err
 	}
-	logger.Debugw(ctx, "omci_test_request-success-device-agent", log.Fields{"test-resp": testResp})
-	return testResp, nil
+
+	res, err := client.StartOmciTest(ctx, &ic.OMCITest{
+		Device:  cloned,
+		Request: omcitestrequest,
+	})
+	if err == nil {
+		requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
+	}
+	return res, err
 }
 
 func (agent *Agent) getExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, valueparam *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
 	logger.Debugw(ctx, "get-ext-value", log.Fields{"device-id": agent.deviceID, "onu-id": valueparam.Id, "value-type": valueparam.Value})
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	var err error
+	var desc string
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
 
-	//send request to adapter
-	ch, err := agent.adapterProxy.GetExtValue(ctx, pdevice, cdevice, valueparam.Id, valueparam.Value)
-	agent.requestQueue.RequestComplete()
+	//send request to adapter synchronously
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, pdevice.AdapterEndpoint)
 	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": pdevice.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
 		return nil, err
 	}
 
-	// Wait for the adapter response
-	rpcResponse, ok := <-ch
-	if !ok {
-		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
-	}
-	if rpcResponse.Err != nil {
-		return nil, rpcResponse.Err
-	}
+	// Release lock before sending to adapter
+	agent.requestQueue.RequestComplete()
 
-	// Unmarshal and return the response
-	Resp := &voltha.ReturnValues{}
-	if err := ptypes.UnmarshalAny(rpcResponse.Reply, Resp); err != nil {
-		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+	retVal, err := client.GetExtValue(ctx, &ic.GetExtValueMessage{
+		ParentDevice: pdevice,
+		ChildDevice:  cdevice,
+		ValueType:    valueparam.Value,
+	})
+	if err == nil {
+		requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
 	}
-	logger.Debugw(ctx, "get-ext-value-success-device-agent", log.Fields{"Resp": Resp})
-	return Resp, nil
+	return retVal, err
 }
 
 func (agent *Agent) setExtValue(ctx context.Context, device *voltha.Device, value *voltha.ValueSet) (*empty.Empty, error) {
 	logger.Debugw(ctx, "set-ext-value", log.Fields{"device-id": value.Id})
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+
+	var err error
+	var desc string
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
 
 	//send request to adapter
-	ch, err := agent.adapterProxy.SetExtValue(ctx, device, value)
-	agent.requestQueue.RequestComplete()
+	//send request to adapter synchronously
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": device.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
 		return nil, err
 	}
+	// Release lock before sending request to adapter
+	agent.requestQueue.RequestComplete()
 
-	// Wait for the adapter response
-	rpcResponse, ok := <-ch
-	if !ok {
-		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+	retVal, err := client.SetExtValue(ctx, &ic.SetExtValueMessage{
+		Device: device,
+		Value:  value,
+	})
+	if err == nil {
+		requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
 	}
-	if rpcResponse.Err != nil {
-		return nil, rpcResponse.Err
-	}
-
-	// Unmarshal and return the response
-	logger.Debug(ctx, "set-ext-value-success-device-agent")
-	return &empty.Empty{}, nil
+	return retVal, err
 }
 
 func (agent *Agent) getSingleValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) {
 	logger.Debugw(ctx, "get-single-value", log.Fields{"device-id": request.TargetId})
 
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+	var err error
+	var desc string
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
 
 	cloned := agent.cloneDeviceWithoutLock()
 
 	//send request to adapter
-	ch, err := agent.adapterProxy.GetSingleValue(ctx, cloned.Adapter, request)
-	agent.requestQueue.RequestComplete()
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        cloned.Id,
+				"adapter-endpoint": cloned.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
 		return nil, err
 	}
+	// Release lock before sending request to adapter
+	agent.requestQueue.RequestComplete()
 
-	// Wait for the adapter response
-	rpcResponse, ok := <-ch
-	if !ok {
-		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+	resp, err := client.GetSingleValue(ctx, request)
+	if err == nil {
+		requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
 	}
-
-	if rpcResponse.Err != nil {
-		return nil, rpcResponse.Err
-	}
-
-	resp := &extension.SingleGetValueResponse{}
-	if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
-		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
-	}
-
-	return resp, nil
+	return resp, err
 }
 
 func (agent *Agent) setSingleValue(ctx context.Context, request *extension.SingleSetValueRequest) (*extension.SingleSetValueResponse, error) {
 	logger.Debugw(ctx, "set-single-value", log.Fields{"device-id": request.TargetId})
 
+	var err error
+	var desc string
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
@@ -1237,28 +1294,26 @@
 	cloned := agent.cloneDeviceWithoutLock()
 
 	//send request to adapter
-	ch, err := agent.adapterProxy.SetSingleValue(ctx, cloned.Adapter, request)
-	agent.requestQueue.RequestComplete()
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
 	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": cloned.AdapterEndpoint,
+			})
+		agent.requestQueue.RequestComplete()
 		return nil, err
 	}
+	// Release lock before sending request to adapter
+	agent.requestQueue.RequestComplete()
 
-	// Wait for the adapter response
-	rpcResponse, ok := <-ch
-	if !ok {
-		return nil, status.Errorf(codes.Aborted, "channel-closed-cloned-id-%s", agent.deviceID)
+	resp, err := client.SetSingleValue(ctx, request)
+	if err == nil {
+		requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
 	}
-
-	if rpcResponse.Err != nil {
-		return nil, rpcResponse.Err
-	}
-
-	resp := &extension.SingleSetValueResponse{}
-	if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
-		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
-	}
-
-	return resp, nil
+	return resp, err
 }
 
 func (agent *Agent) proceedWithRequest(device *voltha.Device) bool {
@@ -1273,32 +1328,66 @@
 	agent.stopReconcilingMutex.Unlock()
 }
 
-func (agent *Agent) ReconcileDevice(ctx context.Context, device *voltha.Device) {
+// abortAllProcessing is invoked when an adapter managing this device is restarted
+func (agent *Agent) abortAllProcessing(ctx context.Context) error {
+	logger.Infow(ctx, "aborting-current-running-requests", log.Fields{"device-id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	// If any reconciling is in progress just abort it. The adapter is gone.
+	agent.stopReconcile()
+
+	// Update the Core device transient state accordingly
+	var updatedState core.DeviceTransientState_Types
+	switch agent.getTransientState() {
+	case core.DeviceTransientState_RECONCILE_IN_PROGRESS:
+		updatedState = core.DeviceTransientState_NONE
+	case core.DeviceTransientState_FORCE_DELETING:
+		updatedState = core.DeviceTransientState_DELETE_FAILED
+	case core.DeviceTransientState_DELETING_FROM_ADAPTER:
+		updatedState = core.DeviceTransientState_DELETE_FAILED
+	default:
+		updatedState = core.DeviceTransientState_NONE
+	}
+	if err := agent.updateTransientState(ctx, updatedState); err != nil {
+		logger.Errorf(ctx, "transient-state-update-failed", log.Fields{"error": err})
+		return err
+	}
+	return nil
+}
+
+func (agent *Agent) ReconcileDevice(ctx context.Context) {
+	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
 	var desc string
-	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
 
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		desc = err.Error()
-		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
+		return
+	}
+
+	device := agent.getDeviceReadOnlyWithoutLock()
+	if device.AdminState == voltha.AdminState_PREPROVISIONED {
+		agent.requestQueue.RequestComplete()
+		logger.Debugw(ctx, "device-in-preprovisioning-state-reconcile-not-needed", log.Fields{"device-id": device.Id})
 		return
 	}
 
 	if !agent.proceedWithRequest(device) {
 		agent.requestQueue.RequestComplete()
-		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
-		logger.Errorf(ctx, desc)
-		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		err := fmt.Errorf("cannot complete operation as device deletion/reconciling is in progress or reconcile failed for device : %s", device.Id)
+		logger.Errorw(ctx, "reconcile-failed", log.Fields{"error": err})
+		agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
 		return
 	}
 
 	//set transient state to RECONCILE IN PROGRESS
-	err := agent.updateTransientState(ctx, voltha.DeviceTransientState_RECONCILE_IN_PROGRESS)
+	err := agent.updateTransientState(ctx, core.DeviceTransientState_RECONCILE_IN_PROGRESS)
 	if err != nil {
 		agent.requestQueue.RequestComplete()
-		desc = fmt.Sprintf("Not able to set device transient state to Reconcile in progress."+
-			"Err: %s", err.Error())
-		logger.Errorf(ctx, desc)
-		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		logger.Errorw(ctx, "setting-transient-state-failed", log.Fields{"error": err})
+		agent.logDeviceUpdate(ctx, nil, nil, requestStatus, nil, desc)
 		return
 	}
 
@@ -1343,43 +1432,35 @@
 
 		backoffTimer = time.NewTimer(duration)
 
-		logger.Debugw(ctx, "retrying-reconciling", log.Fields{"deviceID": device.Id})
-		// Send a reconcile request to the adapter.
-		ch, err := agent.adapterProxy.ReconcileDevice(ctx, agent.device)
-		//release lock before moving further
+		logger.Debugw(ctx, "retrying-reconciling", log.Fields{"deviceID": device.Id, "endpoint": device.AdapterEndpoint})
+		// Release lock before sending request to adapter
 		agent.requestQueue.RequestComplete()
+
+		// Send a reconcile request to the adapter.
+		err := agent.sendReconcileRequestToAdapter(ctx, device)
+		if errors.Is(err, errContextExpired) || errors.Is(err, errReconcileAborted) {
+			logger.Errorw(ctx, "reconcile-aborted", log.Fields{"error": err})
+			requestStatus = &common.OperationResp{Code: common.OperationResp_OperationReturnCode(common.OperStatus_FAILED)}
+			desc = "aborted"
+			agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
+			break retry
+		}
 		if err != nil {
-			desc := fmt.Sprintf("Failed reconciling from adapter side. Err: %s", err.Error())
-			agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+			agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
 			<-backoffTimer.C
 			// backoffTimer expired continue
 			// Take lock back before retrying
 			if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-				desc = err.Error()
-				agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+				agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
 				break retry
 			}
 			continue
 		}
-
-		// if return err retry if not then break loop and quit retrying reconcile
-		if err = agent.waitForReconcileResponse(backoffTimer, ch); err != nil {
-			desc = err.Error()
-			logger.Errorf(ctx, desc)
-			agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
-			<-backoffTimer.C
-		} else {
-			operStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_IN_PROGRESS}
-			agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
-			break retry
-		}
-
-		// Take lock back before retrying
-		if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-			desc = err.Error()
-			agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
-			break retry
-		}
+		// Success
+		requestStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
+		desc = "adapter-response"
+		agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
+		break retry
 	}
 
 	// Retry loop is broken, so stop any timers and drain the channel
@@ -1400,18 +1481,23 @@
 	}
 }
 
-func (agent *Agent) waitForReconcileResponse(backoffTimer *time.Timer, ch chan *kafka.RpcResponse) error {
+func (agent *Agent) sendReconcileRequestToAdapter(ctx context.Context, device *voltha.Device) error {
+	logger.Debugw(ctx, "sending-reconcile-to-adapter", log.Fields{"device-id": device.Id, "endpoint": agent.adapterEndpoint})
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+	if err != nil {
+		return err
+	}
+	adapterResponse := make(chan error)
+	go func() {
+		_, err := client.ReconcileDevice(ctx, device)
+		adapterResponse <- err
+	}()
 	select {
 	// wait for response
-	case resp, ok := <-ch:
-		if !ok {
-			//channel-closed
-			return errors.New("channel on which reconcile response is awaited is closed")
-		} else if resp.Err != nil {
-			//error encountered
-			return fmt.Errorf("error encountered while retrying reconcile. Err: %s", resp.Err.Error())
+	case err := <-adapterResponse:
+		if err != nil {
+			return err
 		}
-
 		//In case of success quit retrying and wait for adapter to reset operation state of device
 		agent.stopReconcilingMutex.Lock()
 		agent.stopReconciling = nil
@@ -1425,33 +1511,47 @@
 		agent.stopReconcilingMutex.Unlock()
 		if !ok {
 			//channel-closed
-			return errors.New("channel used to notify to stop reconcile is closed")
+			return fmt.Errorf("reconcile channel closed:%w", errReconcileAborted)
 		}
-		return nil
-	//continue if timer expired
-	case <-backoffTimer.C:
+		return fmt.Errorf("reconciling aborted:%w", errReconcileAborted)
+	// Context expired
+	case <-ctx.Done():
+		return fmt.Errorf("context expired:%s :%w", ctx.Err(), errContextExpired)
 	}
-	return nil
 }
 
 func (agent *Agent) reconcilingCleanup(ctx context.Context) error {
 	var desc string
+	var err error
 	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		desc = err.Error()
-		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
+	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		desc = "reconcile-cleanup-failed"
 		return err
 	}
 	defer agent.requestQueue.RequestComplete()
-	err := agent.updateTransientState(ctx, voltha.DeviceTransientState_NONE)
+	err = agent.updateTransientState(ctx, core.DeviceTransientState_NONE)
 	if err != nil {
-		desc = fmt.Sprintf("Not able to clear device transient state from Reconcile in progress."+
-			"Err: %s", err.Error())
-		logger.Errorf(ctx, desc)
-		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		logger.Errorf(ctx, "transient-state-update-failed", log.Fields{"error": err})
 		return err
 	}
-	operStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
-	agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+	operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 	return nil
 }
+
+func (agent *Agent) isAdapterConnectionUp(ctx context.Context) bool {
+	c, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+	return c != nil && err == nil
+}
+
+func (agent *Agent) canDeviceRequestProceed(ctx context.Context) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	if agent.proceedWithRequest(agent.device) {
+		return nil
+	}
+	return fmt.Errorf("device-cannot-process-request-%s", agent.deviceID)
+}