[VOL-2164] Update rw-core to use the Async Kafka API

This commit consists of the following:

1. Process per-device requests in the Core in the order they are
received. If there are lots of requests on a given device then
there will be some latencies introduced due to ordering.  With
recent changes in the model along with keeping the request lock
to a minimal then these latencies are reduced.  Testing did not
show and noticeable latencies.

2) Keep the request lock from the moment a request started
processing to the moment that request is sent to kafka (when
applicable).  Adapter responses are received and processed
asynchronously. Therefore, an adapter can takes all the time it
needs to process a transaction.  The Core still has a context
with timeout (configurable) to cater for cases where the adapter
does not return a response.

3) Adapter requests are processed to completion before sending a
reponse back to the adapter.  Previously, in some cases, a
separate go routine was created to process the request and a
successful response is sent to the adapter.  Now if the request
fails then the adapter will receive an error. The adapter
requests for a given device are therefore processed in the
order they are received.

4) Some changes are made when retrieving a handler to execute
a device state transition.  This was necessary as there was some
transition overlap found.

Update after multiple reviews.

Change-Id: I55a189efec1549a662f2d71e18e6eca9015a3a17
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 42d628a..837e884 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -20,6 +20,8 @@
 	"context"
 	"encoding/hex"
 	"fmt"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"reflect"
 	"sync"
 	"time"
@@ -36,6 +38,10 @@
 	"google.golang.org/grpc/status"
 )
 
+const (
+	maxOrderedDeviceRequestQueueSize = 1000
+)
+
 // DeviceAgent represents device agent attributes
 type DeviceAgent struct {
 	deviceID         string
@@ -48,13 +54,16 @@
 	clusterDataProxy *model.Proxy
 	deviceProxy      *model.Proxy
 	exitChannel      chan int
-	lockDevice       sync.RWMutex
 	device           *voltha.Device
-	defaultTimeout   int64
+	requestQueue     *coreutils.RequestQueue
+	defaultTimeout   time.Duration
+	startOnce        sync.Once
+	stopOnce         sync.Once
+	stopped          bool
 }
 
 //newDeviceAgent creates a new device agent. The device will be initialized when start() is called.
-func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout int64) *DeviceAgent {
+func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout time.Duration) *DeviceAgent {
 	var agent DeviceAgent
 	agent.adapterProxy = ap
 	if device.Id == "" {
@@ -70,27 +79,38 @@
 	agent.adapterMgr = deviceMgr.adapterMgr
 	agent.exitChannel = make(chan int, 1)
 	agent.clusterDataProxy = cdProxy
-	agent.lockDevice = sync.RWMutex{}
 	agent.defaultTimeout = timeout
 	agent.device = proto.Clone(device).(*voltha.Device)
+	agent.requestQueue = coreutils.NewRequestQueue(agent.deviceID, maxOrderedDeviceRequestQueueSize)
 	return &agent
 }
 
-// start()
-// save the device to the data model and registers for callbacks on that device if deviceToCreate!=nil.  Otherwise,
-// it will load the data from the dB and setup the necessary callbacks and proxies. Returns the device that
+// start() saves the device to the data model and registers for callbacks on that device if deviceToCreate!=nil.
+// Otherwise, it will load the data from the dB and setup the necessary callbacks and proxies. Returns the device that
 // was started.
 func (agent *DeviceAgent) start(ctx context.Context, deviceToCreate *voltha.Device) (*voltha.Device, error) {
-	var device *voltha.Device
+	needToStart := false
+	if agent.startOnce.Do(func() { needToStart = true }); !needToStart {
+		return agent.getDevice(ctx)
+	}
+	var startSucceeded bool
+	defer func() {
+		if !startSucceeded {
+			if err := agent.stop(ctx); err != nil {
+				log.Errorw("failed-to-cleanup-after-unsuccessful-start", log.Fields{"device-id": agent.deviceID, "error": err})
+			}
+		}
+	}()
 
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("starting-device-agent", log.Fields{"deviceId": agent.deviceID})
+	// Start the request queue.  If this start fails then stop will be invoked and it requires
+	// that the request sequencer is present
+	agent.requestQueue.Start()
+
+	var device *voltha.Device
 	if deviceToCreate == nil {
 		// Load the existing device
 		loadedDevice, err := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, "")
 		if err != nil {
-			log.Errorw("failed-to-get-from-cluster-data-proxy", log.Fields{"error": err})
 			return nil, err
 		}
 		if loadedDevice != nil {
@@ -99,14 +119,12 @@
 				agent.deviceType = device.Adapter
 				agent.device = proto.Clone(device).(*voltha.Device)
 			} else {
-				log.Errorw("failed-to-convert-device", log.Fields{"deviceId": agent.deviceID})
 				return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
 			}
 		} else {
-			log.Errorw("failed-to-load-device", log.Fields{"deviceId": agent.deviceID})
-			return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
+			return nil, status.Errorf(codes.NotFound, "device-%s-loading-failed", agent.deviceID)
 		}
-		log.Debugw("device-loaded-from-dB", log.Fields{"deviceId": agent.deviceID})
+		log.Infow("device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
 	} else {
 		// Create a new device
 		// Assumption is that AdminState, FlowGroups, and Flows are unitialized since this
@@ -126,74 +144,129 @@
 		// Add the initial device to the local model
 		added, err := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceID, device, "")
 		if err != nil {
-			log.Errorw("failed-to-save-devices-to-cluster-proxy", log.Fields{"error": err})
 			return nil, err
 		}
 		if added == nil {
-			log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceID})
 			return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s", agent.deviceID)
 		}
-		agent.device = proto.Clone(device).(*voltha.Device)
+		agent.device = device
 	}
 	var err error
 	if agent.deviceProxy, err = agent.clusterDataProxy.CreateProxy(ctx, "/devices/"+agent.deviceID, false); err != nil {
-		log.Errorw("failed-to-add-devices-to-cluster-proxy", log.Fields{"error": err})
 		return nil, err
 	}
 	agent.deviceProxy.RegisterCallback(model.PostUpdate, agent.processUpdate)
 
-	log.Debugw("device-agent-started", log.Fields{"deviceId": agent.deviceID})
-	return device, nil
+	startSucceeded = true
+	log.Debugw("device-agent-started", log.Fields{"device-id": agent.deviceID})
+
+	return agent.getDevice(ctx)
 }
 
 // stop stops the device agent.  Not much to do for now
-func (agent *DeviceAgent) stop(ctx context.Context) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+func (agent *DeviceAgent) stop(ctx context.Context) error {
+	needToStop := false
+	if agent.stopOnce.Do(func() { needToStop = true }); !needToStop {
+		return nil
+	}
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
-	log.Debugw("stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
+	log.Infow("stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
 
 	// First unregister any callbacks
-	agent.deviceProxy.UnregisterCallback(model.PostUpdate, agent.processUpdate)
+	if agent.deviceProxy != nil {
+		agent.deviceProxy.UnregisterCallback(model.PostUpdate, agent.processUpdate)
+	}
 
 	//	Remove the device from the KV store
 	removed, err := agent.clusterDataProxy.Remove(ctx, "/devices/"+agent.deviceID, "")
 	if err != nil {
-		log.Errorw("Failed-to-remove-device-from-cluster-data-proxy", log.Fields{"error": err})
-		return
+		return err
 	}
 	if removed == nil {
-		log.Debugw("device-already-removed", log.Fields{"id": agent.deviceID})
+		log.Debugw("device-already-removed", log.Fields{"device-id": agent.deviceID})
 	}
-	agent.exitChannel <- 1
-	log.Debugw("device-agent-stopped", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
+
+	// Stop the request queue - no more requests can be processed
+	agent.requestQueue.Stop()
+
+	close(agent.exitChannel)
+
+	agent.stopped = true
+
+	log.Infow("device-agent-stopped", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
+
+	return nil
 }
 
 // Load the most recent state from the KVStore for the device.
 func (agent *DeviceAgent) reconcileWithKVStore(ctx context.Context) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		log.Warnw("request-aborted", log.Fields{"device-id": agent.deviceID, "error": err})
+		return
+	}
+	defer agent.requestQueue.RequestComplete()
 	log.Debug("reconciling-device-agent-devicetype")
 	// TODO: context timeout
 	device, err := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, "")
 	if err != nil {
-		log.Errorw("Failed to get device info from cluster data proxy", log.Fields{"error": err})
+		log.Errorw("kv-get-failed", log.Fields{"device-id": agent.deviceID, "error": err})
 		return
 	}
 	if device != nil {
 		if d, ok := device.(*voltha.Device); ok {
 			agent.deviceType = d.Adapter
 			agent.device = proto.Clone(d).(*voltha.Device)
-			log.Debugw("reconciled-device-agent-devicetype", log.Fields{"Id": agent.deviceID, "type": agent.deviceType})
+			log.Debugw("reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
 		}
 	}
 }
 
+// onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
+// and the only action required is to publish a successful result on kafka
+func (agent *DeviceAgent) onSuccess(rpc string, response interface{}, reqArgs ...interface{}) {
+	log.Debugw("response successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
+	// TODO: Post success message onto kafka
+}
+
+// onFailure is a common callback for scenarios where we receive an error response following a request to an adapter
+// and the only action required is to publish the failed result on kafka
+func (agent *DeviceAgent) onFailure(rpc string, response interface{}, reqArgs ...interface{}) {
+	if res, ok := response.(error); ok {
+		log.Errorw("rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
+	} else {
+		log.Errorw("rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
+	}
+	// TODO: Post failure message onto kafka
+}
+
+func (agent *DeviceAgent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
+	defer cancel()
+	select {
+	case rpcResponse, ok := <-ch:
+		if !ok {
+			onFailure(rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
+		} else if rpcResponse.Err != nil {
+			onFailure(rpc, rpcResponse.Err, reqArgs)
+		} else {
+			onSuccess(rpc, rpcResponse.Reply, reqArgs)
+		}
+	case <-ctx.Done():
+		onFailure(rpc, ctx.Err(), reqArgs)
+	}
+}
+
 // getDevice returns the device data from cache
-func (agent *DeviceAgent) getDevice() *voltha.Device {
-	agent.lockDevice.RLock()
-	defer agent.lockDevice.RUnlock()
-	return proto.Clone(agent.device).(*voltha.Device)
+func (agent *DeviceAgent) getDevice(ctx context.Context) (*voltha.Device, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	return proto.Clone(agent.device).(*voltha.Device), nil
 }
 
 // getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
@@ -203,31 +276,32 @@
 
 // enableDevice activates a preprovisioned or a disable device
 func (agent *DeviceAgent) enableDevice(ctx context.Context) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("enableDevice", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	log.Debugw("enableDevice", log.Fields{"device-id": agent.deviceID})
 
 	cloned := agent.getDeviceWithoutLock()
 
 	// First figure out which adapter will handle this device type.  We do it at this stage as allow devices to be
-	// pre-provisionned with the required adapter not registered.   At this stage, since we need to communicate
+	// pre-provisioned with the required adapter not registered.   At this stage, since we need to communicate
 	// with the adapter then we need to know the adapter that will handle this request
 	adapterName, err := agent.adapterMgr.getAdapterName(cloned.Type)
 	if err != nil {
-		log.Warnw("no-adapter-registered-for-device-type", log.Fields{"deviceType": cloned.Type, "deviceAdapter": cloned.Adapter})
 		return err
 	}
 	cloned.Adapter = adapterName
 
 	if cloned.AdminState == voltha.AdminState_ENABLED {
-		log.Debugw("device-already-enabled", log.Fields{"id": agent.deviceID})
+		log.Debugw("device-already-enabled", log.Fields{"device-id": agent.deviceID})
 		return nil
 	}
 
 	if cloned.AdminState == voltha.AdminState_DELETED {
 		// This is a temporary state when a device is deleted before it gets removed from the model.
 		err = status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-a-deleted-device: %s ", cloned.Id))
-		log.Warnw("invalid-state", log.Fields{"id": agent.deviceID, "state": cloned.AdminState, "error": err})
 		return err
 	}
 
@@ -242,36 +316,38 @@
 		return err
 	}
 
-	// Adopt the device if it was in preprovision state.  In all other cases, try to reenable it.
+	// Adopt the device if it was in pre-provision state.  In all other cases, try to re-enable it.
 	device := proto.Clone(cloned).(*voltha.Device)
+	var ch chan *kafka.RpcResponse
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	if previousAdminState == voltha.AdminState_PREPROVISIONED {
-		if err := agent.adapterProxy.AdoptDevice(ctx, device); err != nil {
-			log.Debugw("adoptDevice-error", log.Fields{"id": agent.deviceID, "error": err})
-			return err
-		}
+		ch, err = agent.adapterProxy.adoptDevice(subCtx, device)
 	} else {
-		if err := agent.adapterProxy.ReEnableDevice(ctx, device); err != nil {
-			log.Debugw("renableDevice-error", log.Fields{"id": agent.deviceID, "error": err})
-			return err
-		}
+		ch, err = agent.adapterProxy.reEnableDevice(subCtx, device)
 	}
+	if err != nil {
+		cancel()
+		return err
+	}
+	// Wait for response
+	go agent.waitForAdapterResponse(subCtx, cancel, "enableDevice", ch, agent.onSuccess, agent.onFailure)
 	return nil
 }
 
-func (agent *DeviceAgent) sendBulkFlowsToAdapters(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
-	if err := agent.adapterProxy.UpdateFlowsBulk(ctx, device, flows, groups, flowMetadata); err != nil {
-		log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.deviceID, "error": err})
-		response.Error(err)
+func (agent *DeviceAgent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, ch chan *kafka.RpcResponse, response coreutils.Response) {
+	defer cancel()
+	select {
+	case rpcResponse, ok := <-ch:
+		if !ok {
+			response.Error(status.Errorf(codes.Aborted, "channel-closed"))
+		} else if rpcResponse.Err != nil {
+			response.Error(rpcResponse.Err)
+		} else {
+			response.Done()
+		}
+	case <-ctx.Done():
+		response.Error(ctx.Err())
 	}
-	response.Done()
-}
-
-func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(ctx context.Context, device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
-	if err := agent.adapterProxy.UpdateFlowsIncremental(ctx, device, flows, groups, flowMetadata); err != nil {
-		log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.deviceID, "error": err})
-		response.Error(err)
-	}
-	response.Done()
 }
 
 //deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice.  This function will
@@ -334,17 +410,24 @@
 }
 
 func (agent *DeviceAgent) addFlowsAndGroupsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups, "flowMetadata": flowMetadata})
+	log.Debugw("add-flows-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups, "flow-metadata": flowMetadata})
 
 	if (len(newFlows) | len(newGroups)) == 0 {
-		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
+		log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
 		return coreutils.DoneResponse(), nil
 	}
 
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return coreutils.DoneResponse(), err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	device := agent.getDeviceWithoutLock()
+	dType := agent.adapterMgr.getDeviceType(device.Type)
+	if dType == nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+
 	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
 	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
 
@@ -356,29 +439,32 @@
 
 	// Sanity check
 	if (len(updatedAllFlows) | len(flowsToDelete) | len(updatedAllGroups) | len(groupsToDelete)) == 0 {
-		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
+		log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
 		return coreutils.DoneResponse(), nil
 	}
 
-	// Send update to adapters
-	// Create two channels to receive responses from the dB and from the adapters.
-	// Do not close these channels as this function may exit on timeout before the dB or adapters get a chance
-	// to send their responses.  These channels will be garbage collected once all the responses are
-	// received
-	response := coreutils.NewResponse()
-	dType := agent.adapterMgr.getDeviceType(device.Type)
-	if dType == nil {
-		log.Errorw("non-existent device type", log.Fields{"deviceType": device.Type})
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent device type %s", device.Type)
+	// store the changed data
+	device.Flows = &voltha.Flows{Items: updatedAllFlows}
+	device.FlowGroups = &voltha.FlowGroups{Items: updatedAllGroups}
+	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-device-%s", agent.deviceID)
 	}
-	if !dType.AcceptsAddRemoveFlowUpdates {
 
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	if !dType.AcceptsAddRemoveFlowUpdates {
 		if len(updatedAllGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedAllGroups) && len(updatedAllFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedAllFlows) {
-			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
+			log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
+			cancel()
 			return coreutils.DoneResponse(), nil
 		}
-		go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata, response)
-
+		rpcResponse, err := agent.adapterProxy.updateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: newFlows},
@@ -389,16 +475,13 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
+		rpcResponse, err := agent.adapterProxy.updateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	}
-
-	// store the changed data
-	device.Flows = &voltha.Flows{Items: updatedAllFlows}
-	device.FlowGroups = &voltha.FlowGroups{Items: updatedAllGroups}
-	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
-	}
-
 	return response, nil
 }
 
@@ -409,25 +492,31 @@
 	if err != nil {
 		return err
 	}
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
-		log.Debugw("Failed to get response from adapter[or] DB", log.Fields{"result": res})
-		return status.Errorf(codes.Aborted, "errors-%s", res)
+	if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); errs != nil {
+		log.Warnw("no-adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
+		return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
 	}
 	return nil
 }
 
 func (agent *DeviceAgent) deleteFlowsAndGroupsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
+	log.Debugw("delete-flows-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
 
 	if (len(flowsToDel) | len(groupsToDel)) == 0 {
-		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
+		log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
 		return coreutils.DoneResponse(), nil
 	}
 
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return coreutils.DoneResponse(), err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	device := agent.getDeviceWithoutLock()
+	dType := agent.adapterMgr.getDeviceType(device.Type)
+	if dType == nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
 
 	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
 	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
@@ -451,32 +540,41 @@
 
 	log.Debugw("deleteFlowsAndGroups",
 		log.Fields{
-			"deviceId":     agent.deviceID,
-			"flowsToDel":   len(flowsToDel),
-			"flowsToKeep":  len(flowsToKeep),
-			"groupsToDel":  len(groupsToDel),
-			"groupsToKeep": len(groupsToKeep),
+			"device-id":      agent.deviceID,
+			"flows-to-del":   len(flowsToDel),
+			"flows-to-keep":  len(flowsToKeep),
+			"groups-to-del":  len(groupsToDel),
+			"groups-to-keep": len(groupsToKeep),
 		})
 
 	// Sanity check
 	if (len(flowsToKeep) | len(flowsToDel) | len(groupsToKeep) | len(groupsToDel)) == 0 {
-		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
+		log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows-to-del": flowsToDel, "groups-to-del": groupsToDel})
 		return coreutils.DoneResponse(), nil
 	}
 
-	// Send update to adapters
-	response := coreutils.NewResponse()
-	dType := agent.adapterMgr.getDeviceType(device.Type)
-	if dType == nil {
-		log.Errorw("non-existent device type", log.Fields{"deviceType": device.Type})
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent device type %s", device.Type)
+	// store the changed data
+	device.Flows = &voltha.Flows{Items: flowsToKeep}
+	device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
+	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
 	}
+
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
 			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
+			cancel()
 			return coreutils.DoneResponse(), nil
 		}
-		go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, response)
+		rpcResponse, err := agent.adapterProxy.updateFlowsBulk(subCtx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -487,18 +585,14 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDel},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
+		rpcResponse, err := agent.adapterProxy.updateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	}
-
-	// store the changed data
-	device.Flows = &voltha.Flows{Items: flowsToKeep}
-	device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
-	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
-	}
-
 	return response, nil
-
 }
 
 //deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
@@ -515,43 +609,59 @@
 }
 
 func (agent *DeviceAgent) updateFlowsAndGroupsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	log.Debugw("updateFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+	log.Debugw("updateFlowsAndGroups", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
 
 	if (len(updatedFlows) | len(updatedGroups)) == 0 {
-		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+		log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
 		return coreutils.DoneResponse(), nil
 	}
 
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return coreutils.DoneResponse(), err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	device := agent.getDeviceWithoutLock()
+	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
+	}
+	dType := agent.adapterMgr.getDeviceType(device.Type)
+	if dType == nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
 
 	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
 	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
 
 	if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
-		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+		log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
 		return coreutils.DoneResponse(), nil
 	}
 
 	log.Debugw("updating-flows-and-groups",
 		log.Fields{
-			"deviceId":      agent.deviceID,
-			"updatedFlows":  updatedFlows,
-			"updatedGroups": updatedGroups,
+			"device-id":      agent.deviceID,
+			"updated-flows":  updatedFlows,
+			"updated-groups": updatedGroups,
 		})
 
-	response := coreutils.NewResponse()
-	dType := agent.adapterMgr.getDeviceType(device.Type)
-	if dType == nil {
-		log.Errorw("non-existent device type", log.Fields{"deviceType": device.Type})
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent device type %s", device.Type)
+	// store the updated data
+	device.Flows = &voltha.Flows{Items: updatedFlows}
+	device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
+	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
 	}
 
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, response)
+		rpcResponse, err := agent.adapterProxy.updateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	} else {
 		var flowsToAdd []*ofp.OfpFlowStats
 		var flowsToDelete []*ofp.OfpFlowStats
@@ -584,16 +694,17 @@
 
 		log.Debugw("updating-flows-and-groups",
 			log.Fields{
-				"deviceId":       agent.deviceID,
-				"flowsToAdd":     flowsToAdd,
-				"flowsToDelete":  flowsToDelete,
-				"groupsToAdd":    groupsToAdd,
-				"groupsToDelete": groupsToDelete,
+				"device-id":        agent.deviceID,
+				"flows-to-add":     flowsToAdd,
+				"flows-to-delete":  flowsToDelete,
+				"groups-to-add":    groupsToAdd,
+				"groups-to-delete": groupsToDelete,
 			})
 
 		// Sanity check
 		if (len(flowsToAdd) | len(flowsToDelete) | len(groupsToAdd) | len(groupsToDelete) | len(updatedGroups)) == 0 {
-			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+			log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+			cancel()
 			return coreutils.DoneResponse(), nil
 		}
 
@@ -606,14 +717,12 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
 		}
-		go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
-	}
-
-	// store the updated data
-	device.Flows = &voltha.Flows{Items: updatedFlows}
-	device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
-	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
+		rpcResponse, err := agent.adapterProxy.updateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	}
 
 	return response, nil
@@ -634,9 +743,11 @@
 
 //disableDevice disable a device
 func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("disableDevice", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("disableDevice", log.Fields{"device-id": agent.deviceID})
 
 	cloned := agent.getDeviceWithoutLock()
 
@@ -646,7 +757,6 @@
 	}
 	if cloned.AdminState == voltha.AdminState_PREPROVISIONED ||
 		cloned.AdminState == voltha.AdminState_DELETED {
-		log.Debugw("device-not-enabled", log.Fields{"id": agent.deviceID})
 		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
 	}
 
@@ -656,75 +766,75 @@
 	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return err
 	}
-	if err := agent.adapterProxy.DisableDevice(ctx, proto.Clone(cloned).(*voltha.Device)); err != nil {
-		log.Debugw("disableDevice-error", log.Fields{"id": agent.deviceID, "error": err})
+
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.disableDevice(subCtx, proto.Clone(cloned).(*voltha.Device))
+	if err != nil {
+		cancel()
 		return err
 	}
-	return nil
-}
+	go agent.waitForAdapterResponse(subCtx, cancel, "disableDevice", ch, agent.onSuccess, agent.onFailure)
 
-func (agent *DeviceAgent) updateAdminState(ctx context.Context, adminState voltha.AdminState_Types) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("updateAdminState", log.Fields{"id": agent.deviceID})
-
-	cloned := agent.getDeviceWithoutLock()
-
-	if cloned.AdminState == adminState {
-		log.Debugw("no-change-needed", log.Fields{"id": agent.deviceID, "state": adminState})
-		return nil
-	}
-	// Received an Ack (no error found above).  Now update the device in the model to the expected state
-	cloned.AdminState = adminState
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
-		return err
-	}
 	return nil
 }
 
 func (agent *DeviceAgent) rebootDevice(ctx context.Context) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("rebootDevice", log.Fields{"id": agent.deviceID})
-
-	device := agent.getDeviceWithoutLock()
-	if err := agent.adapterProxy.RebootDevice(ctx, device); err != nil {
-		log.Debugw("rebootDevice-error", log.Fields{"id": agent.deviceID, "error": err})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("rebootDevice", log.Fields{"device-id": agent.deviceID})
+
+	device := agent.getDeviceWithoutLock()
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.rebootDevice(subCtx, device)
+	if err != nil {
+		cancel()
+		return err
+	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "rebootDevice", ch, agent.onSuccess, agent.onFailure)
 	return nil
 }
 
 func (agent *DeviceAgent) deleteDevice(ctx context.Context) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("deleteDevice", log.Fields{"id": agent.deviceID})
+	log.Debugw("deleteDevice", log.Fields{"device-id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	cloned := agent.getDeviceWithoutLock()
-	if cloned.AdminState == voltha.AdminState_DELETED {
-		log.Debugw("device-already-in-deleted-state", log.Fields{"id": agent.deviceID})
-		return nil
-	}
-	if cloned.AdminState != voltha.AdminState_PREPROVISIONED {
-		if err := agent.adapterProxy.DeleteDevice(ctx, cloned); err != nil {
-			log.Debugw("deleteDevice-error", log.Fields{"id": agent.deviceID, "error": err})
-			return err
-		}
-	}
-	//Set the state to deleted - this will trigger some background process to invoke parent adapter to delete child
-	//device and clean up the device as well as its association with the logical device
-	cloned.AdminState = voltha.AdminState_DELETED
 
+	previousState := cloned.AdminState
+
+	// No check is required when deleting a device.  Changing the state to DELETE will trigger the removal of this
+	// device by the state machine
+	cloned.AdminState = voltha.AdminState_DELETED
 	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return err
 	}
+
+	// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
+	// adapter
+	if previousState != ic.AdminState_PREPROVISIONED {
+		subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+		ch, err := agent.adapterProxy.deleteDevice(subCtx, cloned)
+		if err != nil {
+			cancel()
+			return err
+		}
+		go agent.waitForAdapterResponse(subCtx, cancel, "deleteDevice", ch, agent.onSuccess, agent.onFailure)
+	}
 	return nil
 }
 
 func (agent *DeviceAgent) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("setParentId", log.Fields{"deviceId": device.Id, "parentId": parentID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	log.Debugw("setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
 
 	cloned := agent.getDeviceWithoutLock()
 	cloned.ParentId = parentID
@@ -732,13 +842,16 @@
 	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return err
 	}
+
 	return nil
 }
 
 func (agent *DeviceAgent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("updatePmConfigs", log.Fields{"id": pmConfigs.Id})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("updatePmConfigs", log.Fields{"device-id": pmConfigs.Id})
 
 	cloned := agent.getDeviceWithoutLock()
 	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
@@ -747,50 +860,58 @@
 		return err
 	}
 	// Send the request to the adapter
-	if err := agent.adapterProxy.UpdatePmConfigs(ctx, cloned, pmConfigs); err != nil {
-		log.Errorw("update-pm-configs-error", log.Fields{"id": agent.deviceID, "error": err})
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.updatePmConfigs(subCtx, cloned, pmConfigs)
+	if err != nil {
+		cancel()
 		return err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "updatePmConfigs", ch, agent.onSuccess, agent.onFailure)
 	return nil
 }
 
 func (agent *DeviceAgent) initPmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("initPmConfigs", log.Fields{"id": pmConfigs.Id})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("initPmConfigs", log.Fields{"device-id": pmConfigs.Id})
 
 	cloned := agent.getDeviceWithoutLock()
 	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
-	// Store the device
 	updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
 	afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, cloned, false, "")
 	if err != nil {
-		return status.Errorf(codes.Internal, "%s", agent.deviceID)
+		return err
 	}
 	if afterUpdate == nil {
-		return status.Errorf(codes.Internal, "%s", agent.deviceID)
+		return status.Errorf(codes.Internal, "pm-kv-update-failed-for-device-id-%s", agent.deviceID)
 	}
 	return nil
 }
 
 func (agent *DeviceAgent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
-	agent.lockDevice.RLock()
-	defer agent.lockDevice.RUnlock()
-	log.Debugw("listPmConfigs", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("listPmConfigs", log.Fields{"device-id": agent.deviceID})
 
 	return agent.getDeviceWithoutLock().PmConfigs, nil
 }
 
 func (agent *DeviceAgent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("downloadImage", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	log.Debugw("downloadImage", log.Fields{"device-id": agent.deviceID})
 
 	device := agent.getDeviceWithoutLock()
 
 	if device.AdminState != voltha.AdminState_ENABLED {
-		log.Debugw("device-not-enabled", log.Fields{"id": agent.deviceID})
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
 	}
 	// Save the image
 	clonedImg := proto.Clone(img).(*voltha.ImageDownload)
@@ -817,10 +938,13 @@
 			return nil, err
 		}
 		// Send the request to the adapter
-		if err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg); err != nil {
-			log.Debugw("downloadImage-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+		subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+		ch, err := agent.adapterProxy.downloadImage(ctx, cloned, clonedImg)
+		if err != nil {
+			cancel()
 			return nil, err
 		}
+		go agent.waitForAdapterResponse(subCtx, cancel, "downloadImage", ch, agent.onSuccess, agent.onFailure)
 	}
 	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
 }
@@ -836,15 +960,18 @@
 }
 
 func (agent *DeviceAgent) cancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("cancelImageDownload", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	log.Debugw("cancelImageDownload", log.Fields{"device-id": agent.deviceID})
 
 	device := agent.getDeviceWithoutLock()
 
 	// Verify whether the Image is in the list of image being downloaded
 	if !isImageRegistered(img, device) {
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
 	}
 
 	// Update image download state
@@ -861,28 +988,32 @@
 		if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 			return nil, err
 		}
-		// Send the request to the adapter
-		if err := agent.adapterProxy.CancelImageDownload(ctx, device, img); err != nil {
-			log.Debugw("cancelImageDownload-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+		subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+		ch, err := agent.adapterProxy.cancelImageDownload(subCtx, device, img)
+		if err != nil {
+			cancel()
 			return nil, err
 		}
+		go agent.waitForAdapterResponse(subCtx, cancel, "cancelImageDownload", ch, agent.onSuccess, agent.onFailure)
 	}
 	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
 }
 
 func (agent *DeviceAgent) activateImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("activateImage", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("activateImage", log.Fields{"device-id": agent.deviceID})
 	cloned := agent.getDeviceWithoutLock()
 
 	// Verify whether the Image is in the list of image being downloaded
 	if !isImageRegistered(img, cloned) {
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
 	}
 
 	if cloned.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
 	}
 	// Update image download state
 	for _, image := range cloned.ImageDownloads {
@@ -896,19 +1027,25 @@
 		return nil, err
 	}
 
-	if err := agent.adapterProxy.ActivateImageUpdate(ctx, proto.Clone(cloned).(*voltha.Device), img); err != nil {
-		log.Debugw("activateImage-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.activateImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
+	if err != nil {
+		cancel()
 		return nil, err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "activateImageUpdate", ch, agent.onSuccess, agent.onFailure)
+
 	// The status of the AdminState will be changed following the update_download_status response from the adapter
 	// The image name will also be removed from the device list
 	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
 }
 
 func (agent *DeviceAgent) revertImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("revertImage", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("revertImage", log.Fields{"device-id": agent.deviceID})
 
 	cloned := agent.getDeviceWithoutLock()
 
@@ -931,31 +1068,51 @@
 		return nil, err
 	}
 
-	if err := agent.adapterProxy.RevertImageUpdate(ctx, proto.Clone(cloned).(*voltha.Device), img); err != nil {
-		log.Debugw("revertImage-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.revertImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
+	if err != nil {
+		cancel()
 		return nil, err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "revertImageUpdate", ch, agent.onSuccess, agent.onFailure)
+
 	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
 }
 
 func (agent *DeviceAgent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("getImageDownloadStatus", log.Fields{"id": agent.deviceID})
+	log.Debugw("getImageDownloadStatus", log.Fields{"device-id": agent.deviceID})
 
-	cloned := agent.getDeviceWithoutLock()
-	resp, err := agent.adapterProxy.GetImageDownloadStatus(ctx, cloned, img)
-	if err != nil {
-		log.Debugw("getImageDownloadStatus-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	return resp, nil
+	device := agent.getDeviceWithoutLock()
+	ch, err := agent.adapterProxy.getImageDownloadStatus(ctx, device, img)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+	// Wait for the adapter response
+	rpcResponse, ok := <-ch
+	if !ok {
+		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+	}
+	if rpcResponse.Err != nil {
+		return nil, rpcResponse.Err
+	}
+	// Successful response
+	imgDownload := &voltha.ImageDownload{}
+	if err := ptypes.UnmarshalAny(rpcResponse.Reply, imgDownload); err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+	}
+	return imgDownload, nil
 }
 
 func (agent *DeviceAgent) updateImageDownload(ctx context.Context, img *voltha.ImageDownload) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("updateImageDownload", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("updating-image-download", log.Fields{"device-id": agent.deviceID, "img": img})
 
 	cloned := agent.getDeviceWithoutLock()
 
@@ -983,9 +1140,11 @@
 }
 
 func (agent *DeviceAgent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
-	agent.lockDevice.RLock()
-	defer agent.lockDevice.RUnlock()
-	log.Debugw("getImageDownload", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("getImageDownload", log.Fields{"device-id": agent.deviceID})
 
 	cloned := agent.getDeviceWithoutLock()
 	for _, image := range cloned.ImageDownloads {
@@ -997,16 +1156,18 @@
 }
 
 func (agent *DeviceAgent) listImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
-	agent.lockDevice.RLock()
-	defer agent.lockDevice.RUnlock()
-	log.Debugw("listImageDownloads", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("listImageDownloads", log.Fields{"device-id": agent.deviceID})
 
 	return &voltha.ImageDownloads{Items: agent.getDeviceWithoutLock().ImageDownloads}, nil
 }
 
 // getPorts retrieves the ports information of the device based on the port type.
 func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
-	log.Debugw("getPorts", log.Fields{"id": agent.deviceID, "portType": portType})
+	log.Debugw("getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
 	ports := &voltha.Ports{}
 	if device, _ := agent.deviceMgr.GetDevice(ctx, agent.deviceID); device != nil {
 		for _, port := range device.Ports {
@@ -1018,38 +1179,81 @@
 	return ports
 }
 
-// getSwitchCapability is a helper method that a logical device agent uses to retrieve the switch capability of a
-// parent device
+// getSwitchCapability retrieves the switch capability of a parent device
 func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
-	log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceID})
-	device, err := agent.deviceMgr.GetDevice(ctx, agent.deviceID)
-	if device == nil {
+	log.Debugw("getSwitchCapability", log.Fields{"device-id": agent.deviceID})
+
+	cloned, err := agent.getDevice(ctx)
+	if err != nil {
 		return nil, err
 	}
-	var switchCap *ic.SwitchCapability
-	if switchCap, err = agent.adapterProxy.GetOfpDeviceInfo(ctx, device); err != nil {
-		log.Debugw("getSwitchCapability-error", log.Fields{"id": device.Id, "error": err})
+	ch, err := agent.adapterProxy.getOfpDeviceInfo(ctx, cloned)
+	if err != nil {
+		return nil, err
+	}
+
+	// Wait for adapter response
+	rpcResponse, ok := <-ch
+	if !ok {
+		return nil, status.Errorf(codes.Aborted, "channel-closed")
+	}
+	if rpcResponse.Err != nil {
+		return nil, rpcResponse.Err
+	}
+	// Successful response
+	switchCap := &ic.SwitchCapability{}
+	if err := ptypes.UnmarshalAny(rpcResponse.Reply, switchCap); err != nil {
 		return nil, err
 	}
 	return switchCap, nil
 }
 
-// getPortCapability is a helper method that a logical device agent uses to retrieve the port capability of a
-// device
+// getPortCapability retrieves the port capability of a device
 func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
-	log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceID})
-	device, err := agent.deviceMgr.GetDevice(ctx, agent.deviceID)
-	if device == nil {
+	log.Debugw("getPortCapability", log.Fields{"device-id": agent.deviceID})
+	device, err := agent.getDevice(ctx)
+	if err != nil {
 		return nil, err
 	}
-	var portCap *ic.PortCapability
-	if portCap, err = agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo); err != nil {
-		log.Debugw("getPortCapability-error", log.Fields{"id": device.Id, "error": err})
+	ch, err := agent.adapterProxy.getOfpPortInfo(ctx, device, portNo)
+	if err != nil {
 		return nil, err
 	}
+	// Wait for adapter response
+	rpcResponse, ok := <-ch
+	if !ok {
+		return nil, status.Errorf(codes.Aborted, "channel-closed")
+	}
+	if rpcResponse.Err != nil {
+		return nil, rpcResponse.Err
+	}
+	// Successful response
+	portCap := &ic.PortCapability{}
+	if err := ptypes.UnmarshalAny(rpcResponse.Reply, portCap); err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+	}
 	return portCap, nil
 }
 
+func (agent *DeviceAgent) onPacketFailure(rpc string, response interface{}, args ...interface{}) {
+	// packet data is encoded in the args param as the first parameter
+	var packet []byte
+	if len(args) >= 1 {
+		if pkt, ok := args[0].([]byte); ok {
+			packet = pkt
+		}
+	}
+	var errResp error
+	if err, ok := response.(error); ok {
+		errResp = err
+	}
+	log.Warnw("packet-out-error", log.Fields{
+		"device-id": agent.deviceID,
+		"error":     errResp,
+		"packet":    hex.EncodeToString(packet),
+	})
+}
+
 func (agent *DeviceAgent) packetOut(ctx context.Context, outPort uint32, packet *ofp.OfpPacketOut) error {
 	// If deviceType=="" then we must have taken ownership of this device.
 	// Fixes VOL-2226 where a core would take ownership and have stale data
@@ -1057,20 +1261,19 @@
 		agent.reconcileWithKVStore(ctx)
 	}
 	//	Send packet to adapter
-	if err := agent.adapterProxy.packetOut(ctx, agent.deviceType, agent.deviceID, outPort, packet); err != nil {
-		log.Debugw("packet-out-error", log.Fields{
-			"id":     agent.deviceID,
-			"error":  err,
-			"packet": hex.EncodeToString(packet.Data),
-		})
-		return err
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.packetOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
+	if err != nil {
+		cancel()
+		return nil
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "packetOut", ch, agent.onSuccess, agent.onPacketFailure, packet.Data)
 	return nil
 }
 
-// processUpdate is a callback invoked whenever there is a change on the device manages by this device agent
+// processUpdate is a respCallback invoked whenever there is a change on the device manages by this device agent
 func (agent *DeviceAgent) processUpdate(ctx context.Context, args ...interface{}) interface{} {
-	//// Run this callback in its own go routine
+	//// Run this respCallback in its own go routine
 	go func(args ...interface{}) interface{} {
 		var previous *voltha.Device
 		var current *voltha.Device
@@ -1088,10 +1291,10 @@
 			log.Errorw("too-many-args-in-callback", log.Fields{"len": len(args)})
 			return nil
 		}
-		// Perform the state transition in it's own go routine (since the caller doesn't wait for this, use a background context)
+		// Perform the state transition in it's own go routine
 		if err := agent.deviceMgr.processTransition(context.Background(), previous, current); err != nil {
-			log.Errorw("failed-process-transition", log.Fields{"deviceId": previous.Id,
-				"previousAdminState": previous.AdminState, "currentAdminState": current.AdminState})
+			log.Errorw("failed-process-transition", log.Fields{"device-id": previous.Id,
+				"previous-admin-state": previous.AdminState, "current-admin-state": current.AdminState})
 		}
 		return nil
 	}(args...)
@@ -1112,13 +1315,16 @@
 	cloned.Reason = device.Reason
 	return cloned, nil
 }
+
 func (agent *DeviceAgent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("updateDeviceUsingAdapterData", log.Fields{"deviceId": device.Id})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
+
 	updatedDevice, err := agent.mergeDeviceInfoFromAdapter(device)
 	if err != nil {
-		log.Errorw("failed to update device ", log.Fields{"deviceId": device.Id})
 		return status.Errorf(codes.Internal, "%s", err.Error())
 	}
 	cloned := proto.Clone(updatedDevice).(*voltha.Device)
@@ -1127,13 +1333,16 @@
 
 func (agent *DeviceAgent) updateDeviceWithoutLock(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
-	cloned := proto.Clone(device).(*voltha.Device)
+	//cloned := proto.Clone(device).(*voltha.Device)
+	cloned := device
 	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
 func (agent *DeviceAgent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	cloned := agent.getDeviceWithoutLock()
 
@@ -1153,8 +1362,10 @@
 
 func (agent *DeviceAgent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
 	log.Debugw("updatePortsOperState", log.Fields{"device-id": agent.deviceID})
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	cloned := agent.getDeviceWithoutLock()
 	for _, port := range cloned.Ports {
 		port.OperStatus = operStatus
@@ -1164,8 +1375,10 @@
 }
 
 func (agent *DeviceAgent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	// Work only on latest data
 	// TODO: Get list of ports from device directly instead of the entire device
 	cloned := agent.getDeviceWithoutLock()
@@ -1186,8 +1399,10 @@
 
 func (agent *DeviceAgent) deleteAllPorts(ctx context.Context) error {
 	log.Debugw("deleteAllPorts", log.Fields{"deviceId": agent.deviceID})
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	cloned := agent.getDeviceWithoutLock()
 
@@ -1208,9 +1423,11 @@
 }
 
 func (agent *DeviceAgent) addPort(ctx context.Context, port *voltha.Port) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("addPort", log.Fields{"deviceId": agent.deviceID, "port": port})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("addPort", log.Fields{"deviceId": agent.deviceID})
 
 	cloned := agent.getDeviceWithoutLock()
 	updatePort := false
@@ -1245,8 +1462,10 @@
 }
 
 func (agent *DeviceAgent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	log.Debugw("adding-peer-peerPort", log.Fields{"device-id": agent.deviceID, "peer-peerPort": peerPort})
 
 	cloned := agent.getDeviceWithoutLock()
@@ -1279,30 +1498,13 @@
 	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *DeviceAgent) deletePeerPorts(ctx context.Context, deviceID string) error {
-	log.Debug("deletePeerPorts")
-
-	cloned := agent.getDeviceWithoutLock()
-
-	var updatedPeers []*voltha.Port_PeerPort
-	for _, port := range cloned.Ports {
-		updatedPeers = make([]*voltha.Port_PeerPort, 0)
-		for _, peerPort := range port.Peers {
-			if peerPort.DeviceId != deviceID {
-				updatedPeers = append(updatedPeers, peerPort)
-			}
-		}
-		port.Peers = updatedPeers
-	}
-
-	// Store the device with updated peer ports
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
 // TODO: A generic device update by attribute
 func (agent *DeviceAgent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		log.Warnw("request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
+		return
+	}
+	defer agent.requestQueue.RequestComplete()
 	if value == nil {
 		return
 	}
@@ -1336,17 +1538,21 @@
 }
 
 func (agent *DeviceAgent) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	log.Debugw("simulateAlarm", log.Fields{"id": agent.deviceID})
 
 	cloned := agent.getDeviceWithoutLock()
 
-	// First send the request to an Adapter and wait for a response
-	if err := agent.adapterProxy.SimulateAlarm(ctx, cloned, simulatereq); err != nil {
-		log.Debugw("simulateAlarm-error", log.Fields{"id": agent.deviceID, "error": err})
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.simulateAlarm(subCtx, cloned, simulatereq)
+	if err != nil {
+		cancel()
 		return err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "simulateAlarm", ch, agent.onSuccess, agent.onFailure)
 	return nil
 }
 
@@ -1369,8 +1575,10 @@
 }
 
 func (agent *DeviceAgent) updateDeviceReason(ctx context.Context, reason string) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	cloned := agent.getDeviceWithoutLock()
 	cloned.Reason = reason
@@ -1380,10 +1588,12 @@
 }
 
 func (agent *DeviceAgent) disablePort(ctx context.Context, Port *voltha.Port) error {
-	var cp *voltha.Port
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	log.Debugw("disablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
+	var cp *voltha.Port
 	// Get the most up to date the device info
 	device := agent.getDeviceWithoutLock()
 	for _, port := range device.Ports {
@@ -1406,19 +1616,26 @@
 		log.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
 		return err
 	}
+
 	//send request to adapter
-	if err := agent.adapterProxy.disablePort(ctx, device, cp); err != nil {
-		log.Debugw("DisablePort-error", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.disablePort(ctx, device, cp)
+	if err != nil {
+		cancel()
 		return err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "disablePort", ch, agent.onSuccess, agent.onFailure)
 	return nil
 }
 
 func (agent *DeviceAgent) enablePort(ctx context.Context, Port *voltha.Port) error {
-	var cp *voltha.Port
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	log.Debugw("enablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
+
+	var cp *voltha.Port
 	// Get the most up to date the device info
 	device := agent.getDeviceWithoutLock()
 	for _, port := range device.Ports {
@@ -1442,28 +1659,47 @@
 		return err
 	}
 	//send request to adapter
-	if err := agent.adapterProxy.enablePort(ctx, device, cp); err != nil {
-		log.Debugw("EnablePort-error", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.enablePort(ctx, device, cp)
+	if err != nil {
+		cancel()
 		return err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "enablePort", ch, agent.onSuccess, agent.onFailure)
 	return nil
 }
 
 func (agent *DeviceAgent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("ChildDeviceLost", log.Fields{"id": device.Id})
+	log.Debugw("childDeviceLost", log.Fields{"child-device-id": device.Id, "parent-device-ud": agent.deviceID})
 
 	//Remove the associated peer ports on the parent device
-	if err := agent.deviceMgr.deletePeerPorts(ctx, device.ParentId, device.Id); err != nil {
-		// At this stage, the parent device may also have been deleted.  Just log and keep processing.
-		log.Warnw("failure-deleting-peer-port", log.Fields{"error": err, "child-device-id": device.Id, "parent-device-id": device.ParentId})
+	parentDevice := agent.getDeviceWithoutLock()
+	var updatedPeers []*voltha.Port_PeerPort
+	for _, port := range parentDevice.Ports {
+		updatedPeers = make([]*voltha.Port_PeerPort, 0)
+		for _, peerPort := range port.Peers {
+			if peerPort.DeviceId != device.Id {
+				updatedPeers = append(updatedPeers, peerPort)
+			}
+		}
+		port.Peers = updatedPeers
+	}
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, parentDevice, false, ""); err != nil {
+		return err
 	}
 
-	if err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId); err != nil {
-		log.Warnw("ChildDeviceLost-error", log.Fields{"error": err})
+	//send request to adapter
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.childDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId)
+	if err != nil {
+		cancel()
+		return err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "childDeviceLost", ch, agent.onSuccess, agent.onFailure)
 	return nil
-
 }