[VOL-2164] Update rw-core to use the Async Kafka API

This commit consists of the following:

1) Process per-device requests in the Core in the order they are
received. If there are many requests for a given device, the
ordering introduces some latency. With the recent changes in the
model, along with keeping the request lock to a minimum, this
latency is reduced. Testing did not show any noticeable latency
(see sketch 1 below).

2) Keep the request lock from the moment a request starts
processing to the moment that request is sent to Kafka (when
applicable). Adapter responses are received and processed
asynchronously, so an adapter can take all the time it needs to
process a transaction. The Core still uses a context with a
configurable timeout to cater for cases where the adapter does
not return a response (see sketch 2 below).

3) Process adapter requests to completion before sending a
response back to the adapter. Previously, in some cases, a
separate go routine was created to process the request and a
successful response was sent to the adapter immediately. Now, if
the request fails, the adapter receives an error. Adapter
requests for a given device are therefore processed in the order
they are received (see sketch 3 below).

4) Some changes were made to how a handler is retrieved to
execute a device state transition. This was necessary because
some overlapping transitions were found.
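
Sketch 1 (point 1): a minimal illustration of per-device ordering.
The deviceRequestQueue type and its names are hypothetical and not
part of this patch; the Core uses its own request-lock mechanism,
but the guarantee is the same: requests for one device run one at
a time, in arrival order, without blocking other devices.

    // Hypothetical sketch only: serialize requests per device while
    // leaving different devices independent of each other.
    type deviceRequestQueue struct {
        requests chan func() // buffered; preserves arrival order
    }

    func newDeviceRequestQueue() *deviceRequestQueue {
        q := &deviceRequestQueue{requests: make(chan func(), 64)}
        go func() {
            for req := range q.requests {
                req() // one request at a time for this device, in order
            }
        }()
        return q
    }

    // submit queues a request for this device; other devices have
    // their own queues and are not delayed.
    func (q *deviceRequestQueue) submit(req func()) {
        q.requests <- req
    }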
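
Sketch 2 (point 2): the caller-side pattern. The exampleAgent type
and its requestLock are illustrative only and assumed to sit next
to AdapterProxy in rw_core/core; adapterProxy.adoptDevice,
kafka.RpcResponse and voltha.Device are taken from the diff below.

    import (
        "context"
        "sync"

        "github.com/opencord/voltha-protos/v3/go/voltha"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
    )

    type exampleAgent struct {
        requestLock  sync.Mutex
        adapterProxy *AdapterProxy
    }

    func (agent *exampleAgent) adoptDevice(ctx context.Context, device *voltha.Device) error {
        // The request lock is held from the start of processing until
        // the request has been handed to Kafka by the adapter proxy.
        agent.requestLock.Lock()
        ch, err := agent.adapterProxy.adoptDevice(ctx, device)
        agent.requestLock.Unlock()
        if err != nil {
            return err
        }
        // The adapter response is awaited asynchronously; ctx carries
        // the configurable Core timeout for adapters that never answer.
        select {
        case rpcResponse, ok := <-ch:
            if !ok {
                return status.Error(codes.Aborted, "channel-closed")
            }
            return rpcResponse.Err
        case <-ctx.Done():
            return ctx.Err()
        }
    }

The lock is released before the wait, so a slow adapter does not
keep the device request lock held.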
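
Sketch 3 (point 3): the handler-side change, using DeviceUpdate
from the diff below as the example (simplified). Previously the
handler replied immediately and did the work in a goroutine, so a
failure never reached the adapter; now the work runs to completion
and any error is returned.

    // Before: fire-and-forget, the adapter always saw success.
    go func() {
        if err := rhp.deviceMgr.updateDeviceUsingAdapterData(context.TODO(), device); err != nil {
            log.Errorw("unable-to-update-device-using-adapter-data", log.Fields{"error": err})
        }
    }()
    return new(empty.Empty), nil

    // After: processed to completion; a failure is returned to the
    // adapter as an error.
    if err := rhp.deviceMgr.updateDeviceUsingAdapterData(context.TODO(), device); err != nil {
        return nil, err
    }
    return &empty.Empty{}, nil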

Updated after multiple reviews.

Change-Id: I55a189efec1549a662f2d71e18e6eca9015a3a17
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 0c2c3ce..2ec9d18 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -51,9 +51,9 @@
 	defaultRWCoreCA                  = "pki/voltha-CA.pem"
 	defaultAffinityRouterTopic       = "affinityRouter"
 	defaultInCompetingMode           = true
-	defaultLongRunningRequestTimeout = int64(2000)
-	defaultDefaultRequestTimeout     = int64(500)
-	defaultCoreTimeout               = int64(500)
+	defaultLongRunningRequestTimeout = 2000 * time.Millisecond
+	defaultDefaultRequestTimeout     = 1000 * time.Millisecond
+	defaultCoreTimeout               = 1000 * time.Millisecond
 	defaultCoreBindingKey            = "voltha_backend_name"
 	defaultCorePairTopic             = "rwcore_1"
 	defaultMaxConnectionRetries      = -1 // retries forever
@@ -89,9 +89,9 @@
 	RWCoreCA                  string
 	AffinityRouterTopic       string
 	InCompetingMode           bool
-	LongRunningRequestTimeout int64
-	DefaultRequestTimeout     int64
-	DefaultCoreTimeout        int64
+	LongRunningRequestTimeout time.Duration
+	DefaultRequestTimeout     time.Duration
+	DefaultCoreTimeout        time.Duration
 	CoreBindingKey            string
 	CorePairTopic             string
 	MaxConnectionRetries      int
@@ -204,13 +204,18 @@
 	flag.StringVar(&(cf.LogLevel), "log_level", defaultLogLevel, help)
 
 	help = fmt.Sprintf("Timeout for long running request")
-	flag.Int64Var(&(cf.LongRunningRequestTimeout), "timeout_long_request", defaultLongRunningRequestTimeout, help)
+	// TODO:  Change this code once all the params and helm charts have been updated to use the new type
+	var temp int64
+	flag.Int64Var(&temp, "timeout_long_request", defaultLongRunningRequestTimeout.Milliseconds(), help)
+	cf.LongRunningRequestTimeout = time.Duration(temp) * time.Millisecond
 
 	help = fmt.Sprintf("Default timeout for regular request")
-	flag.Int64Var(&(cf.DefaultRequestTimeout), "timeout_request", defaultDefaultRequestTimeout, help)
+	flag.Int64Var(&temp, "timeout_request", defaultDefaultRequestTimeout.Milliseconds(), help)
+	cf.DefaultRequestTimeout = time.Duration(temp) * time.Millisecond
 
 	help = fmt.Sprintf("Default Core timeout")
-	flag.Int64Var(&(cf.DefaultCoreTimeout), "core_timeout", defaultCoreTimeout, help)
+	flag.Int64Var(&temp, "core_timeout", defaultCoreTimeout.Milliseconds(), help)
+	cf.DefaultCoreTimeout = time.Duration(temp) * time.Millisecond
 
 	help = fmt.Sprintf("Show startup banner log lines")
 	flag.BoolVar(&cf.Banner, "banner", defaultBanner, help)
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index f6e9867..e3c362e 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -18,21 +18,15 @@
 
 import (
 	"context"
-
-	"github.com/golang/protobuf/ptypes"
-	a "github.com/golang/protobuf/ptypes/any"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	"github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
 )
 
 // AdapterProxy represents adapter proxy attributes
 type AdapterProxy struct {
-	TestMode              bool
 	deviceTopicRegistered bool
 	corePairTopic         string
 	kafkaICProxy          kafka.InterContainerProxy
@@ -47,21 +41,6 @@
 	}
 }
 
-func unPackResponse(rpc string, deviceID string, success bool, response *a.Any) error {
-	if success {
-		return nil
-	}
-	unpackResult := &ic.Error{}
-	var err error
-	if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
-		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-		return err
-	}
-	log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceID, "success": success, "error": err})
-	// TODO:  Need to get the real error code
-	return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
-}
-
 func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
 	return kafka.Topic{Name: ap.corePairTopic}
 }
@@ -70,575 +49,296 @@
 	return kafka.Topic{Name: adapterName}
 }
 
-// AdoptDevice invokes adopt device rpc
-func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
-	log.Debugw("AdoptDevice", log.Fields{"device": device})
+func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
+	waitForResponse bool, deviceID string, kvArgs ...*kafka.KVArg) (chan *kafka.RpcResponse, error) {
+
+	// Send the request to kafka
+	respChnl := ap.kafkaICProxy.InvokeAsyncRPC(ctx, rpc, toTopic, replyToTopic, waitForResponse, deviceID, kvArgs...)
+
+	// Wait for first response which would indicate whether the request was successfully sent to kafka.
+	firstResponse, ok := <-respChnl
+	if !ok || firstResponse.MType != kafka.RpcSent {
+		log.Errorw("failure to send request to kafka", log.Fields{"rpc": rpc, "device-id": deviceID, "error": firstResponse.Err})
+		return nil, firstResponse.Err
+	}
+	// return the kafka channel for the caller to wait for the response of the RPC call
+	return respChnl, nil
+}
+
+// adoptDevice invokes adopt device rpc
+func (ap *AdapterProxy) adoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+	log.Debugw("adoptDevice", log.Fields{"device-id": device.Id})
 	rpc := "adopt_device"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	//topic := kafka.Topic{Name: device.Adapter}
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
 	}
-	// Use a device topic for the response as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
 	ap.deviceTopicRegistered = true
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// DisableDevice invokes disable device rpc
-func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
-	log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
+// disableDevice invokes disable device rpc
+func (ap *AdapterProxy) disableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+	log.Debugw("disableDevice", log.Fields{"device-id": device.Id})
 	rpc := "disable_device"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	//toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
 	}
-	// Use a device specific topic as we are the only core handling requests for this device
-	//replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// ReEnableDevice invokes reenable device rpc
-func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
-	log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
+// reEnableDevice invokes reenable device rpc
+func (ap *AdapterProxy) reEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+	log.Debugw("reEnableDevice", log.Fields{"device-id": device.Id})
 	rpc := "reenable_device"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
 	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// RebootDevice invokes reboot device rpc
-func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
-	log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
+// rebootDevice invokes reboot device rpc
+func (ap *AdapterProxy) rebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+	log.Debugw("rebootDevice", log.Fields{"device-id": device.Id})
 	rpc := "reboot_device"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
 	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// DeleteDevice invokes delete device rpc
-func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
-	log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
+// deleteDevice invokes delete device rpc
+func (ap *AdapterProxy) deleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+	log.Debugw("deleteDevice", log.Fields{"device-id": device.Id})
 	rpc := "delete_device"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
 	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
-
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// GetOfpDeviceInfo invokes get ofp device info rpc
-func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
-	log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
+// getOfpDeviceInfo invokes get ofp device info rpc
+func (ap *AdapterProxy) getOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+	log.Debugw("getOfpDeviceInfo", log.Fields{"device-id": device.Id})
+	rpc := "get_ofp_device_info"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
 	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
-	if success {
-		unpackResult := &ic.SwitchCapability{}
-		if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
-		}
-		return unpackResult, nil
-	}
-	unpackResult := &ic.Error{}
-	var err error
-	if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-	}
-	log.Debugw("GetOfpDeviceInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
-	// TODO:  Need to get the real error code
-	return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// GetOfpPortInfo invokes get ofp port info rpc
-func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
-	log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
+// getOfpPortInfo invokes get ofp port info rpc
+func (ap *AdapterProxy) getOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (chan *kafka.RpcResponse, error) {
+	log.Debugw("getOfpPortInfo", log.Fields{"device-id": device.Id, "port-no": portNo})
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "port_no", Value: &ic.IntType{Val: int64(portNo)}},
 	}
-	pNo := &ic.IntType{Val: int64(portNo)}
-	args[1] = &kafka.KVArg{
-		Key:   "port_no",
-		Value: pNo,
-	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
-	if success {
-		unpackResult := &ic.PortCapability{}
-		if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
-		}
-		return unpackResult, nil
-	}
-	unpackResult := &ic.Error{}
-	var err error
-	if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-	}
-	log.Debugw("GetOfpPortInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
-	// TODO:  Need to get the real error code
-	return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+	return ap.sendRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-//TODO: Implement the functions below
-
-// AdapterDescriptor - TODO
-func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
-	log.Debug("AdapterDescriptor")
-	return nil, nil
-}
-
-// DeviceTypes - TODO
-func (ap *AdapterProxy) DeviceTypes() (*voltha.DeviceType, error) {
-	log.Debug("DeviceTypes")
-	return nil, nil
-}
-
-// Health - TODO
-func (ap *AdapterProxy) Health() (*voltha.HealthStatus, error) {
-	log.Debug("Health")
-	return nil, nil
-}
-
-// ReconcileDevice invokes reconcile device rpc
-func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) error {
-	log.Debugw("ReconcileDevice", log.Fields{"deviceId": device.Id})
+// reconcileDevice invokes reconcile device rpc
+func (ap *AdapterProxy) reconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+	log.Debugw("reconcileDevice", log.Fields{"device-id": device.Id})
 	rpc := "reconcile_device"
 	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("ReconcileDevice-response", log.Fields{"deviceid": device.Id, "success": success})
-
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// AbandonDevice - TODO
-func (ap *AdapterProxy) AbandonDevice(device voltha.Device) error {
-	log.Debug("AbandonDevice")
-	return nil
-}
-
-// GetDeviceDetails - TODO
-func (ap *AdapterProxy) GetDeviceDetails(device voltha.Device) (*voltha.Device, error) {
-	log.Debug("GetDeviceDetails")
-	return nil, nil
-}
-
-// DownloadImage invokes download image rpc
-func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
-	log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
+// downloadImage invokes download image rpc
+func (ap *AdapterProxy) downloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+	log.Debugw("downloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "download_image"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "request",
-		Value: download,
-	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
-
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// GetImageDownloadStatus invokes get image download status rpc
-func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
-	log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
+// getImageDownloadStatus invokes get image download status rpc
+func (ap *AdapterProxy) getImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+	log.Debugw("getImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "get_image_download_status"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "request",
-		Value: download,
-	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
-
-	if success {
-		unpackResult := &voltha.ImageDownload{}
-		if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
-		}
-		return unpackResult, nil
-	}
-	unpackResult := &ic.Error{}
-	var err error
-	if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-		return nil, err
-	}
-	log.Debugw("GetImageDownloadStatus-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
-	return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// CancelImageDownload invokes cancel image download rpc
-func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
-	log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
+// cancelImageDownload invokes cancel image download rpc
+func (ap *AdapterProxy) cancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+	log.Debugw("cancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "cancel_image_download"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "request",
-		Value: download,
-	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
-
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// ActivateImageUpdate invokes activate image update rpc
-func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
-	log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
+// activateImageUpdate invokes activate image update rpc
+func (ap *AdapterProxy) activateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+	log.Debugw("activateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "activate_image_update"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "request",
-		Value: download,
-	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
-
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// RevertImageUpdate invokes revert image update rpc
-func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
-	log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
+// revertImageUpdate invokes revert image update rpc
+func (ap *AdapterProxy) revertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+	log.Debugw("revertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "revert_image_update"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "request",
-		Value: download,
-	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
-
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// SelfTestDevice - TODO
-func (ap *AdapterProxy) SelfTestDevice(device voltha.Device) (*voltha.SelfTestResponse, error) {
-	log.Debug("SelfTestDevice")
-	return nil, nil
-}
-
-func (ap *AdapterProxy) packetOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
-	log.Debugw("packetOut", log.Fields{"deviceId": deviceID})
+func (ap *AdapterProxy) packetOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
+	log.Debugw("packetOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
 	toTopic := ap.getAdapterTopic(deviceType)
 	rpc := "receive_packet_out"
-	dID := &ic.StrType{Val: deviceID}
-	args := make([]*kafka.KVArg, 3)
-	args[0] = &kafka.KVArg{
-		Key:   "deviceId",
-		Value: dID,
+	args := []*kafka.KVArg{
+		{Key: "deviceId", Value: &ic.StrType{Val: deviceID}},
+		{Key: "outPort", Value: &ic.IntType{Val: int64(outPort)}},
+		{Key: "packet", Value: packet},
 	}
-	op := &ic.IntType{Val: int64(outPort)}
-	args[1] = &kafka.KVArg{
-		Key:   "outPort",
-		Value: op,
-	}
-	args[2] = &kafka.KVArg{
-		Key:   "packet",
-		Value: packet,
-	}
-
-	// TODO:  Do we need to wait for an ACK on a packet Out?
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, deviceID, args...)
-	log.Debugw("packetOut", log.Fields{"deviceid": deviceID, "success": success})
-	return unPackResponse(rpc, deviceID, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, deviceID, args...)
 }
 
-// UpdateFlowsBulk invokes update flows bulk rpc
-func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) error {
-	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id, "flowsInUpdate": len(flows.Items), "groupsToUpdate": len(groups.Items)})
+// updateFlowsBulk invokes update flows bulk rpc
+func (ap *AdapterProxy) updateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
+	log.Debugw("updateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
 	toTopic := ap.getAdapterTopic(device.Adapter)
 	rpc := "update_flows_bulk"
-	args := make([]*kafka.KVArg, 4)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "flows", Value: flows},
+		{Key: "groups", Value: groups},
+		{Key: "flow_metadata", Value: flowMetadata},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "flows",
-		Value: flows,
-	}
-	args[2] = &kafka.KVArg{
-		Key:   "groups",
-		Value: groups,
-	}
-	args[3] = &kafka.KVArg{
-		Key:   "flow_metadata",
-		Value: flowMetadata,
-	}
-
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// UpdateFlowsIncremental invokes update flows incremental rpc
-func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
-	log.Debugw("UpdateFlowsIncremental",
+// updateFlowsIncremental invokes update flows incremental rpc
+func (ap *AdapterProxy) updateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
+	log.Debugw("updateFlowsIncremental",
 		log.Fields{
-			"deviceId":       device.Id,
-			"flowsToAdd":     len(flowChanges.ToAdd.Items),
-			"flowsToDelete":  len(flowChanges.ToRemove.Items),
-			"groupsToAdd":    len(groupChanges.ToAdd.Items),
-			"groupsToDelete": len(groupChanges.ToRemove.Items),
-			"groupsToUpdate": len(groupChanges.ToUpdate.Items),
+			"device-id":             device.Id,
+			"flow-to-add-count":     len(flowChanges.ToAdd.Items),
+			"flow-to-delete-count":  len(flowChanges.ToRemove.Items),
+			"group-to-add-count":    len(groupChanges.ToAdd.Items),
+			"group-to-delete-count": len(groupChanges.ToRemove.Items),
+			"group-to-update-count": len(groupChanges.ToUpdate.Items),
 		})
 	toTopic := ap.getAdapterTopic(device.Adapter)
 	rpc := "update_flows_incrementally"
-	args := make([]*kafka.KVArg, 4)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "flow_changes", Value: flowChanges},
+		{Key: "group_changes", Value: groupChanges},
+		{Key: "flow_metadata", Value: flowMetadata},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "flow_changes",
-		Value: flowChanges,
-	}
-	args[2] = &kafka.KVArg{
-		Key:   "group_changes",
-		Value: groupChanges,
-	}
-
-	args[3] = &kafka.KVArg{
-		Key:   "flow_metadata",
-		Value: flowMetadata,
-	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// UpdatePmConfigs invokes update pm configs rpc
-func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
-	log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id})
+// updatePmConfigs invokes update pm configs rpc
+func (ap *AdapterProxy) updatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
+	log.Debugw("updatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
 	toTopic := ap.getAdapterTopic(device.Adapter)
 	rpc := "Update_pm_config"
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "pm_configs", Value: pmConfigs},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "pm_configs",
-		Value: pmConfigs,
-	}
-
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("UpdatePmConfigs-response", log.Fields{"deviceid": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// ReceivePacketOut - TODO
-func (ap *AdapterProxy) ReceivePacketOut(deviceID voltha.ID, egressPortNo int, msg interface{}) error {
-	log.Debug("ReceivePacketOut")
-	return nil
-}
-
-func (ap *AdapterProxy) SuppressEvent(filter *voltha.EventFilter) error {
-	log.Debug("SuppressEvent")
-	return nil
-}
-
-func (ap *AdapterProxy) UnSuppressEvent(filter *voltha.EventFilter) error {
-	log.Debug("UnSuppressEvent")
-	return nil
-}
-
-// SimulateAlarm invokes simulate alarm rpc
-func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulatereq *voltha.SimulateAlarmRequest) error {
-	log.Debugw("SimulateAlarm", log.Fields{"id": simulatereq.Id})
+// simulateAlarm invokes simulate alarm rpc
+func (ap *AdapterProxy) simulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
+	log.Debugw("simulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
 	rpc := "simulate_alarm"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: simulateReq},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "request",
-		Value: simulatereq,
-	}
-
-	// Use a device topic for the response as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
 	ap.deviceTopicRegistered = true
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("SimulateAlarm-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-func (ap *AdapterProxy) disablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+func (ap *AdapterProxy) disablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
 	log.Debugw("disablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
 	rpc := "disable_port"
-	deviceID := &ic.StrType{Val: device.Id}
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "deviceId",
-		Value: deviceID,
+	args := []*kafka.KVArg{
+		{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
+		{Key: "port", Value: port},
 	}
-
-	args[1] = &kafka.KVArg{
-		Key:   "port",
-		Value: port,
-	}
-
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("disablePort-response", log.Fields{"device-id": device.Id, "port-no": port.PortNo, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-func (ap *AdapterProxy) enablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+func (ap *AdapterProxy) enablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
 	log.Debugw("enablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
 	rpc := "enable_port"
-	deviceID := &ic.StrType{Val: device.Id}
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "deviceId",
-		Value: deviceID,
+	args := []*kafka.KVArg{
+		{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
+		{Key: "port", Value: port},
 	}
-
-	args[1] = &kafka.KVArg{
-		Key:   "port",
-		Value: port,
-	}
-
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("enablePort-response", log.Fields{"device-id": device.Id, "port-no": port.PortNo, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// ChildDeviceLost invokes Child device_lost rpc
-func (ap *AdapterProxy) ChildDeviceLost(ctx context.Context, deviceType string, pDeviceID string, pPortNo uint32, onuID uint32) error {
-	log.Debugw("ChildDeviceLost", log.Fields{"pDeviceId": pDeviceID, "pPortNo": pPortNo, "onuID": onuID})
+// childDeviceLost invokes child device_lost rpc
+func (ap *AdapterProxy) childDeviceLost(ctx context.Context, deviceType string, pDeviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
+	log.Debugw("childDeviceLost", log.Fields{"parent-device-id": pDeviceID, "parent-port-no": pPortNo, "onu-id": onuID})
 	rpc := "child_device_lost"
 	toTopic := ap.getAdapterTopic(deviceType)
-	dID := &ic.StrType{Val: pDeviceID}
-	PortNo := &ic.IntType{Val: int64(pPortNo)}
-	oID := &ic.IntType{Val: int64(onuID)}
 	args := []*kafka.KVArg{
-		{
-			Key:   "pDeviceId",
-			Value: dID,
-		},
-		{
-			Key:   "pPortNo",
-			Value: PortNo,
-		},
-		{
-			Key:   "onuID",
-			Value: oID,
-		}}
-
-	// Use a device specific topic as we are the only core handling requests for this device
+		{Key: "pDeviceId", Value: &ic.StrType{Val: pDeviceID}},
+		{Key: "pPortNo", Value: &ic.IntType{Val: int64(pPortNo)}},
+		{Key: "onuID", Value: &ic.IntType{Val: int64(onuID)}},
+	}
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, pDeviceID, args...)
-	log.Debugw("ChildDeviceLost-response", log.Fields{"pDeviceId": pDeviceID, "success": success})
-
-	return unPackResponse(rpc, pDeviceID, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, pDeviceID, args...)
 }
diff --git a/rw_core/core/adapter_proxy_test.go b/rw_core/core/adapter_proxy_test.go
index 3989142..36e5e51 100755
--- a/rw_core/core/adapter_proxy_test.go
+++ b/rw_core/core/adapter_proxy_test.go
@@ -18,19 +18,22 @@
 import (
 	"context"
 	"crypto/rand"
-	"testing"
-	"time"
-
+	"github.com/golang/protobuf/ptypes"
+	any2 "github.com/golang/protobuf/ptypes/any"
 	cm "github.com/opencord/voltha-go/rw_core/mocks"
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	of "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/stretchr/testify/assert"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"strings"
+	"testing"
+	"time"
 )
 
 const (
@@ -52,7 +55,7 @@
 		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
 	}
 	// Set the log level to Warning
-	log.SetAllLogLevel(2)
+	log.SetAllLogLevel(log.WarnLevel)
 
 	var err error
 
@@ -63,6 +66,7 @@
 	coreKafkaICProxy = kafka.NewInterContainerProxy(
 		kafka.MsgClient(kc),
 		kafka.DefaultTopic(&kafka.Topic{Name: coreName}))
+
 	if err = coreKafkaICProxy.Start(); err != nil {
 		log.Fatalw("Failure-starting-core-kafka-intercontainerProxy", log.Fields{"error": err})
 	}
@@ -78,6 +82,7 @@
 		kafka.MsgClient(kc),
 		kafka.DefaultTopic(&kafka.Topic{Name: adapterName}),
 		kafka.RequestHandlerInterface(adapterReqHandler))
+
 	if err = adapterKafkaICProxy.Start(); err != nil {
 		log.Fatalw("Failure-starting-adapter-kafka-intercontainerProxy", log.Fields{"error": err})
 	}
@@ -97,42 +102,62 @@
 	assert.NotNil(t, ap)
 }
 
+func waitForResponse(ctx context.Context, ch chan *kafka.RpcResponse) (*any2.Any, error) {
+	select {
+	case rpcResponse, ok := <-ch:
+		if !ok {
+			return nil, status.Error(codes.Aborted, "channel-closed")
+		} else if rpcResponse.Err != nil {
+			return nil, rpcResponse.Err
+		} else {
+			return rpcResponse.Reply, nil
+		}
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}
+
 func testSimpleRequests(t *testing.T) {
-	type simpleRequest func(context.Context, *voltha.Device) error
+	type simpleRequest func(context.Context, *voltha.Device) (chan *kafka.RpcResponse, error)
 	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
 	simpleRequests := []simpleRequest{
-		ap.AdoptDevice,
-		ap.DisableDevice,
-		ap.RebootDevice,
-		ap.DeleteDevice,
-		ap.ReconcileDevice,
-		ap.ReEnableDevice,
+		ap.adoptDevice,
+		ap.disableDevice,
+		ap.rebootDevice,
+		ap.deleteDevice,
+		ap.reconcileDevice,
+		ap.reEnableDevice,
 	}
 	for _, f := range simpleRequests {
-		//Success
+		// Success
 		d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
-		err := f(ctx, d)
-		cancel()
+		rpcResponse, err := f(ctx, d)
 		assert.Nil(t, err)
+		_, err = waitForResponse(ctx, rpcResponse)
+		assert.Nil(t, err)
+		cancel()
 
 		//	Failure - invalid adapter
-		expectedError := status.Error(codes.Canceled, "context deadline exceeded")
+		expectedError := "context deadline exceeded"
 		d = &voltha.Device{Id: "deviceId", Adapter: "adapter_mock_1"}
 		ctx, cancel = context.WithTimeout(context.Background(), 20*time.Millisecond)
-		err = f(ctx, d)
+		rpcResponse, err = f(ctx, d)
+		assert.Nil(t, err)
+		_, err = waitForResponse(ctx, rpcResponse)
 		cancel()
 		assert.NotNil(t, err)
-		assert.Equal(t, expectedError.Error(), err.Error())
+		assert.True(t, strings.Contains(err.Error(), expectedError))
 
-		// Failure - short timeout
-		expectedError = status.Error(codes.Canceled, "context deadline exceeded")
+		// Failure -  timeout
 		d = &voltha.Device{Id: "deviceId", Adapter: adapterName}
 		ctx, cancel = context.WithTimeout(context.Background(), 100*time.Nanosecond)
-		err = f(ctx, d)
+		rpcResponse, err = f(ctx, d)
+		assert.Nil(t, err)
+		_, err = waitForResponse(ctx, rpcResponse)
 		cancel()
 		assert.NotNil(t, err)
-		assert.Equal(t, expectedError.Error(), err.Error())
+		assert.True(t, strings.Contains(err.Error(), expectedError))
 	}
 }
 
@@ -140,8 +165,13 @@
 	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
-	switchCap, err := ap.GetOfpDeviceInfo(ctx, d)
-	cancel()
+	defer cancel()
+	rpcResponse, err := ap.getOfpDeviceInfo(ctx, d)
+	assert.Nil(t, err)
+	response, err := waitForResponse(ctx, rpcResponse)
+	assert.Nil(t, err)
+	switchCap := &ic.SwitchCapability{}
+	err = ptypes.UnmarshalAny(response, switchCap)
 	assert.Nil(t, err)
 	assert.NotNil(t, switchCap)
 	expectedCap, _ := adapter.Get_ofp_device_info(d)
@@ -152,13 +182,18 @@
 	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
 	portNo := uint32(1)
-	portInfo, err := ap.GetOfpPortInfo(ctx, d, portNo)
-	cancel()
+	rpcResponse, err := ap.getOfpPortInfo(ctx, d, portNo)
 	assert.Nil(t, err)
-	assert.NotNil(t, portInfo)
+	response, err := waitForResponse(ctx, rpcResponse)
+	assert.Nil(t, err)
+	portCap := &ic.PortCapability{}
+	err = ptypes.UnmarshalAny(response, portCap)
+	assert.Nil(t, err)
+	assert.NotNil(t, portCap)
 	expectedPortInfo, _ := adapter.Get_ofp_port_info(d, int64(portNo))
-	assert.Equal(t, portInfo.String(), expectedPortInfo.String())
+	assert.Equal(t, portCap.String(), expectedPortInfo.String())
 }
 
 func testPacketOut(t *testing.T) {
@@ -167,18 +202,26 @@
 	outPort := uint32(1)
 	packet, err := getRandomBytes(50)
 	assert.Nil(t, err)
-	err = ap.packetOut(context.Background(), adapterName, d.Id, outPort, &of.OfpPacketOut{Data: packet})
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	defer cancel()
+	rpcResponse, err := ap.packetOut(ctx, adapterName, d.Id, outPort, &of.OfpPacketOut{Data: packet})
+	assert.Nil(t, err)
+	_, err = waitForResponse(ctx, rpcResponse)
 	assert.Nil(t, err)
 }
 
 func testFlowUpdates(t *testing.T) {
 	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
-	err := ap.UpdateFlowsBulk(context.Background(), d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
+	_, err := ap.updateFlowsBulk(context.Background(), d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
 	assert.Nil(t, err)
 	flowChanges := &voltha.FlowChanges{ToAdd: &voltha.Flows{Items: nil}, ToRemove: &voltha.Flows{Items: nil}}
 	groupChanges := &voltha.FlowGroupChanges{ToAdd: &voltha.FlowGroups{Items: nil}, ToRemove: &voltha.FlowGroups{Items: nil}, ToUpdate: &voltha.FlowGroups{Items: nil}}
-	err = ap.UpdateFlowsIncremental(context.Background(), d, flowChanges, groupChanges, &voltha.FlowMetadata{})
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	defer cancel()
+	rpcResponse, err := ap.updateFlowsIncremental(ctx, d, flowChanges, groupChanges, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	_, err = waitForResponse(ctx, rpcResponse)
 	assert.Nil(t, err)
 }
 
@@ -186,8 +229,10 @@
 	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
-	err := ap.UpdatePmConfigs(ctx, d, &voltha.PmConfigs{})
-	cancel()
+	defer cancel()
+	rpcResponse, err := ap.updatePmConfigs(ctx, d, &voltha.PmConfigs{})
+	assert.Nil(t, err)
+	_, err = waitForResponse(ctx, rpcResponse)
 	assert.Nil(t, err)
 }
 
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index fd07c7e..85af539 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -19,7 +19,6 @@
 import (
 	"context"
 	"errors"
-
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/db/model"
@@ -28,29 +27,27 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
+	"time"
 )
 
 // AdapterRequestHandlerProxy represent adapter request handler proxy attributes
 type AdapterRequestHandlerProxy struct {
-	TestMode                  bool
 	coreInstanceID            string
 	deviceMgr                 *DeviceManager
 	lDeviceMgr                *LogicalDeviceManager
 	adapterMgr                *AdapterManager
 	localDataProxy            *model.Proxy
 	clusterDataProxy          *model.Proxy
-	defaultRequestTimeout     int64
-	longRunningRequestTimeout int64
+	defaultRequestTimeout     time.Duration
+	longRunningRequestTimeout time.Duration
 	coreInCompetingMode       bool
 	core                      *Core
 }
 
 // NewAdapterRequestHandlerProxy assigns values for adapter request handler proxy attributes and returns the new instance
 func NewAdapterRequestHandlerProxy(core *Core, coreInstanceID string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
-	aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, incompetingMode bool, longRunningRequestTimeout int64,
-	defaultRequestTimeout int64) *AdapterRequestHandlerProxy {
+	aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, incompetingMode bool, longRunningRequestTimeout time.Duration,
+	defaultRequestTimeout time.Duration) *AdapterRequestHandlerProxy {
 	var proxy AdapterRequestHandlerProxy
 	proxy.core = core
 	proxy.coreInstanceID = coreInstanceID
@@ -67,7 +64,7 @@
 
 // This is a helper function that attempts to acquire the request by using the device ownership model
 func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(ctx context.Context, transactionID string, devID string, maxTimeout ...int64) (*KVTransaction, error) {
-	timeout := rhp.defaultRequestTimeout
+	timeout := rhp.defaultRequestTimeout.Milliseconds()
 	if len(maxTimeout) > 0 {
 		timeout = maxTimeout[0]
 	}
@@ -131,7 +128,7 @@
 			}
 		}
 	}
-	log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "transactionID": transactionID.Val, "coreID": rhp.coreInstanceID})
+	log.Debugw("Register", log.Fields{"adapter": *adapter, "device-types": deviceTypes, "transaction-id": transactionID.Val, "core-id": rhp.coreInstanceID})
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
@@ -146,10 +143,6 @@
 		}
 		defer txn.Close(context.TODO())
 	}
-
-	if rhp.TestMode { // Execute only for test cases
-		return &voltha.CoreInstance{InstanceId: "CoreInstance"}, nil
-	}
 	return rhp.adapterMgr.registerAdapter(adapter, deviceTypes)
 }
 
@@ -189,17 +182,12 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return &voltha.Device{Id: pID.Id}, nil
-	}
-
 	// Get the device via the device manager
 	device, err := rhp.deviceMgr.GetDevice(context.TODO(), pID.Id)
 	if err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+		log.Debugw("get-device-failed", log.Fields{"deviceID": pID.Id, "error": err})
 	}
-	log.Debugw("GetDevice-response", log.Fields{"deviceID": pID.Id})
-	return device, nil
+	return device, err
 }
 
 // DeviceUpdate updates device using adapter data
@@ -239,16 +227,12 @@
 	}
 
 	log.Debugw("DeviceUpdate got txn", log.Fields{"deviceID": device.Id, "transactionID": transactionID.Val})
-	if rhp.TestMode { // Execute only for test cases
-		return new(empty.Empty), nil
+
+	if err := rhp.deviceMgr.updateDeviceUsingAdapterData(context.TODO(), device); err != nil {
+		log.Debugw("unable-to-update-device-using-adapter-data", log.Fields{"error": err})
+		return nil, err
 	}
-	go func() {
-		err := rhp.deviceMgr.updateDeviceUsingAdapterData(context.TODO(), device)
-		if err != nil {
-			log.Errorw("unable-to-update-device-using-adapter-data", log.Fields{"error": err})
-		}
-	}()
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // GetChildDevice returns details of child device
@@ -305,9 +289,6 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return &voltha.Device{Id: pID.Id}, nil
-	}
 	return rhp.deviceMgr.GetChildDevice(context.TODO(), pID.Id, serialNumber.Val, onuID.Val, parentPortNo.Val)
 }
 
@@ -347,9 +328,6 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return &voltha.Device{Id: proxyAddress.DeviceId}, nil
-	}
 	return rhp.deviceMgr.GetChildDeviceWithProxyAddress(context.TODO(), proxyAddress)
 }
 
@@ -383,12 +361,7 @@
 		}
 	}
 	log.Debugw("GetPorts", log.Fields{"deviceID": deviceID.Id, "portype": pt.Val, "transactionID": transactionID.Val})
-	if rhp.TestMode { // Execute only for test cases
-		aPort := &voltha.Port{Label: "test_port"}
-		allPorts := &voltha.Ports{}
-		allPorts.Items = append(allPorts.Items, aPort)
-		return allPorts, nil
-	}
+
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
@@ -438,15 +411,10 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return &voltha.Devices{Items: nil}, nil
-	}
-
 	return rhp.deviceMgr.getAllChildDevices(context.TODO(), pID.Id)
 }
 
-// ChildDeviceDetected is invoked when a child device is detected.  The following
-// parameters are expected:
+// ChildDeviceDetected is invoked when a child device is detected.  The following parameters are expected:
 // {parent_device_id, parent_port_no, child_device_type, channel_id, vendor_id, serial_number)
 func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ic.Argument) (*voltha.Device, error) {
 	if len(args) < 5 {
@@ -521,16 +489,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
-	}
 	device, err := rhp.deviceMgr.childDeviceDetected(context.TODO(), pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
 	if err != nil {
-		log.Errorw("child-detection-failed", log.Fields{"parentID": pID.Id, "onuID": onuID.Val, "error": err})
-		return nil, err
+		log.Debugw("child-detection-failed", log.Fields{"parentID": pID.Id, "onuID": onuID.Val, "error": err})
 	}
-
-	return device, nil
+	return device, err
 }
 
 // DeviceStateUpdate updates device status
@@ -581,19 +544,12 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.updateDeviceStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
+		voltha.ConnectStatus_Types(connStatus.Val)); err != nil {
+		log.Debugw("unable-to-update-device-status", log.Fields{"error": err})
+		return nil, err
 	}
-	// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
-	go func() {
-		err := rhp.deviceMgr.updateDeviceStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
-			voltha.ConnectStatus_Types(connStatus.Val))
-		if err != nil {
-			log.Errorw("unable-to-update-device-status", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // ChildrenStateUpdate updates child device status
@@ -644,24 +600,13 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
-	}
-
 	// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
-	go func() {
-		err := rhp.deviceMgr.updateChildrenStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
-			voltha.ConnectStatus_Types(connStatus.Val))
-		if err != nil {
-			log.Errorw("unable-to-update-children-status", log.Fields{"error": err})
-		}
-	}()
-
-	//if err := rhp.deviceMgr.updateChildrenStatus(deviceID.ID, voltha.OperStatus_OperStatus(operStatus.Val),
-	//	voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
-	//	return nil, err
-	//}
-	return new(empty.Empty), nil
+	if err := rhp.deviceMgr.updateChildrenStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
+		voltha.ConnectStatus_Types(connStatus.Val)); err != nil {
+		log.Debugw("unable-to-update-children-status", log.Fields{"error": err})
+		return nil, err
+	}
+	return &empty.Empty{}, nil
 }
 
 // PortsStateUpdate updates the ports state related to the device
@@ -705,18 +650,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.updatePortsState(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val)); err != nil {
+		log.Debugw("unable-to-update-ports-state", log.Fields{"error": err})
+		return nil, err
 	}
-
-	go func() {
-		err := rhp.deviceMgr.updatePortsState(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val))
-		if err != nil {
-			log.Errorw("unable-to-update-ports-state", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // PortStateUpdate updates the port state of the device
@@ -773,26 +711,15 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.updatePortState(context.TODO(), deviceID.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
+		voltha.OperStatus_Types(operStatus.Val)); err != nil {
+		// The error is no longer ignored here: it is returned to the adapter, so it is logged at
+		// debug level rather than as a warning.
+		// TODO: VOL-2707
+		log.Debugw("unable-to-update-port-state", log.Fields{"error": err})
+		return nil, err
 	}
-
-	go func() {
-		if err := rhp.deviceMgr.updatePortState(context.TODO(), deviceID.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
-			voltha.OperStatus_Types(operStatus.Val)); err != nil {
-			// If the error doesn't change behavior and is
-			// essentially ignored, it is not an error, it is a
-			// warning.
-			// TODO: VOL-2707
-			log.Warnw("unable-to-update-port-state", log.Fields{"error": err})
-		}
-	}()
-
-	//if err := rhp.deviceMgr.updatePortState(deviceID.ID, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
-	//	voltha.OperStatus_OperStatus(operStatus.Val)); err != nil {
-	//	return nil, err
-	//}
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // DeleteAllPorts deletes all ports of device
@@ -830,18 +757,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.deleteAllPorts(context.TODO(), deviceID.Id); err != nil {
+		log.Debugw("unable-to-delete-ports", log.Fields{"error": err})
+		return nil, err
 	}
-
-	go func() {
-		err := rhp.deviceMgr.deleteAllPorts(context.TODO(), deviceID.Id)
-		if err != nil {
-			log.Errorw("unable-to-delete-ports", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // ChildDevicesLost indicates that a parent device is in a state (Disabled) where it cannot manage the child devices.
@@ -880,18 +800,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.childDevicesLost(context.TODO(), parentDeviceID.Id); err != nil {
+		log.Debugw("unable-to-disable-child-devices", log.Fields{"error": err})
+		return nil, err
 	}
-
-	go func() {
-		err := rhp.deviceMgr.childDevicesLost(context.TODO(), parentDeviceID.Id)
-		if err != nil {
-			log.Errorw("unable-to-disable-child-devices", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // ChildDevicesDetected invoked by an adapter when child devices are found, typically after after a disable/enable sequence.
@@ -930,16 +843,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
-	}
-
 	if err := rhp.deviceMgr.childDevicesDetected(context.TODO(), parentDeviceID.Id); err != nil {
-		log.Errorw("child-devices-dection-failed", log.Fields{"parentID": parentDeviceID.Id, "error": err})
+		log.Debugw("child-devices-detection-failed", log.Fields{"parentID": parentDeviceID.Id, "error": err})
 		return nil, err
 	}
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // PortCreated adds port to device
@@ -983,17 +891,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.addPort(context.TODO(), deviceID.Id, port); err != nil {
+		log.Debugw("unable-to-add-port", log.Fields{"error": err})
+		return nil, err
 	}
-	go func() {
-		err := rhp.deviceMgr.addPort(context.TODO(), deviceID.Id, port)
-		if err != nil {
-			log.Errorw("unable-to-add-port", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // DevicePMConfigUpdate initializes the pm configs as defined by the adapter.
@@ -1032,18 +934,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.initPmConfigs(context.TODO(), pmConfigs.Id, pmConfigs); err != nil {
+		log.Debugw("unable-to-initialize-pm-configs", log.Fields{"error": err})
+		return nil, err
 	}
-
-	go func() {
-		err := rhp.deviceMgr.initPmConfigs(context.TODO(), pmConfigs.Id, pmConfigs)
-		if err != nil {
-			log.Errorw("unable-to-initialize-pm-configs", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // PacketIn sends the incoming packet of device
@@ -1095,17 +990,13 @@
 		}
 		defer txn.Close(context.TODO())
 	}
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
-	}
-	go func() {
-		err := rhp.deviceMgr.PacketIn(context.TODO(), deviceID.Id, uint32(portNo.Val), transactionID.Val, packet.Payload)
-		if err != nil {
-			log.Errorw("unable-to-receive-packet-from-adapter", log.Fields{"error": err})
-		}
-	}()
 
-	return new(empty.Empty), nil
+	if err := rhp.deviceMgr.PacketIn(context.TODO(), deviceID.Id, uint32(portNo.Val), transactionID.Val, packet.Payload); err != nil {
+		log.Debugw("unable-to-receive-packet-from-adapter", log.Fields{"error": err})
+		return nil, err
+	}
+	return &empty.Empty{}, nil
 }
 
 // UpdateImageDownload updates image download
@@ -1150,19 +1041,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.updateImageDownload(context.TODO(), deviceID.Id, img); err != nil {
+		log.Debugw("unable-to-update-image-download", log.Fields{"error": err})
+		return nil, err
 	}
-	go func() {
-		err := rhp.deviceMgr.updateImageDownload(context.TODO(), deviceID.Id, img)
-		if err != nil {
-			log.Errorw("unable-to-update-image-download", log.Fields{"error": err})
-		}
-	}()
-	//if err := rhp.deviceMgr.updateImageDownload(deviceID.ID, img); err != nil {
-	//	return nil, err
-	//}
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // ReconcileChildDevices reconciles child devices
@@ -1200,19 +1083,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.reconcileChildDevices(context.TODO(), parentDeviceID.Id); err != nil {
+		log.Debugw("unable-to-reconcile-child-devices", log.Fields{"error": err})
+		return nil, err
 	}
-
-	// Run it in its own routine
-	go func() {
-		err := rhp.deviceMgr.reconcileChildDevices(context.TODO(), parentDeviceID.Id)
-		if err != nil {
-			log.Errorw("unable-to-reconcile-child-devices", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // DeviceReasonUpdate updates device reason
@@ -1257,17 +1132,10 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.updateDeviceReason(context.TODO(), deviceID.Id, reason.Val); err != nil {
+		log.Debugw("unable-to-update-device-reason", log.Fields{"error": err})
+		return nil, err
 	}
-
-	// Run it in its own routine (w/ background context)
-	go func() {
-		err := rhp.deviceMgr.updateDeviceReason(context.TODO(), deviceID.Id, reason.Val)
-		if err != nil {
-			log.Errorw("unable-to-update-device-reason", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
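
The handlers above now run each device-manager call to completion on the caller's goroutine and propagate any failure back to the adapter, instead of spawning a goroutine and unconditionally acknowledging. A minimal sketch of that shape, written against the same receiver and imports as the handlers above (doSomething and the log key are placeholders, not real APIs):

    // Sketch only - mirrors the synchronous handler pattern used in this file.
    func (rhp *AdapterRequestHandlerProxy) handleExample(deviceID string) (*empty.Empty, error) {
    	// Process the request fully before replying so the adapter sees the real outcome,
    	// and so requests for a given device complete in the order they arrive.
    	if err := rhp.deviceMgr.doSomething(context.TODO(), deviceID); err != nil {
    		log.Debugw("example-request-failed", log.Fields{"device-id": deviceID, "error": err})
    		return nil, err
    	}
    	return &empty.Empty{}, nil
    }
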
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 42d628a..837e884 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -20,6 +20,8 @@
 	"context"
 	"encoding/hex"
 	"fmt"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"reflect"
 	"sync"
 	"time"
@@ -36,6 +38,10 @@
 	"google.golang.org/grpc/status"
 )
 
+const (
+	maxOrderedDeviceRequestQueueSize = 1000
+)
+
 // DeviceAgent represents device agent attributes
 type DeviceAgent struct {
 	deviceID         string
@@ -48,13 +54,16 @@
 	clusterDataProxy *model.Proxy
 	deviceProxy      *model.Proxy
 	exitChannel      chan int
-	lockDevice       sync.RWMutex
 	device           *voltha.Device
-	defaultTimeout   int64
+	requestQueue     *coreutils.RequestQueue
+	defaultTimeout   time.Duration
+	startOnce        sync.Once
+	stopOnce         sync.Once
+	stopped          bool
 }
 
 //newDeviceAgent creates a new device agent. The device will be initialized when start() is called.
-func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout int64) *DeviceAgent {
+func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout time.Duration) *DeviceAgent {
 	var agent DeviceAgent
 	agent.adapterProxy = ap
 	if device.Id == "" {
@@ -70,27 +79,38 @@
 	agent.adapterMgr = deviceMgr.adapterMgr
 	agent.exitChannel = make(chan int, 1)
 	agent.clusterDataProxy = cdProxy
-	agent.lockDevice = sync.RWMutex{}
 	agent.defaultTimeout = timeout
 	agent.device = proto.Clone(device).(*voltha.Device)
+	agent.requestQueue = coreutils.NewRequestQueue(agent.deviceID, maxOrderedDeviceRequestQueueSize)
 	return &agent
 }
 
-// start()
-// save the device to the data model and registers for callbacks on that device if deviceToCreate!=nil.  Otherwise,
-// it will load the data from the dB and setup the necessary callbacks and proxies. Returns the device that
+// start() saves the device to the data model and registers for callbacks on that device if deviceToCreate!=nil.
+// Otherwise, it will load the data from the DB and set up the necessary callbacks and proxies. Returns the device that
 // was started.
 func (agent *DeviceAgent) start(ctx context.Context, deviceToCreate *voltha.Device) (*voltha.Device, error) {
-	var device *voltha.Device
+	needToStart := false
+	if agent.startOnce.Do(func() { needToStart = true }); !needToStart {
+		return agent.getDevice(ctx)
+	}
+	var startSucceeded bool
+	defer func() {
+		if !startSucceeded {
+			if err := agent.stop(ctx); err != nil {
+				log.Errorw("failed-to-cleanup-after-unsuccessful-start", log.Fields{"device-id": agent.deviceID, "error": err})
+			}
+		}
+	}()
 
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("starting-device-agent", log.Fields{"deviceId": agent.deviceID})
+	// Start the request queue.  If anything below fails, the deferred cleanup invokes stop(), which
+	// requires the request queue to be running.
+	agent.requestQueue.Start()
+
+	var device *voltha.Device
 	if deviceToCreate == nil {
 		// Load the existing device
 		loadedDevice, err := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, "")
 		if err != nil {
-			log.Errorw("failed-to-get-from-cluster-data-proxy", log.Fields{"error": err})
 			return nil, err
 		}
 		if loadedDevice != nil {
@@ -99,14 +119,12 @@
 				agent.deviceType = device.Adapter
 				agent.device = proto.Clone(device).(*voltha.Device)
 			} else {
-				log.Errorw("failed-to-convert-device", log.Fields{"deviceId": agent.deviceID})
 				return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
 			}
 		} else {
-			log.Errorw("failed-to-load-device", log.Fields{"deviceId": agent.deviceID})
-			return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
+			return nil, status.Errorf(codes.NotFound, "device-%s-loading-failed", agent.deviceID)
 		}
-		log.Debugw("device-loaded-from-dB", log.Fields{"deviceId": agent.deviceID})
+		log.Infow("device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
 	} else {
 		// Create a new device
 		// Assumption is that AdminState, FlowGroups, and Flows are unitialized since this
@@ -126,74 +144,129 @@
 		// Add the initial device to the local model
 		added, err := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceID, device, "")
 		if err != nil {
-			log.Errorw("failed-to-save-devices-to-cluster-proxy", log.Fields{"error": err})
 			return nil, err
 		}
 		if added == nil {
-			log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceID})
 			return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s", agent.deviceID)
 		}
-		agent.device = proto.Clone(device).(*voltha.Device)
+		agent.device = device
 	}
 	var err error
 	if agent.deviceProxy, err = agent.clusterDataProxy.CreateProxy(ctx, "/devices/"+agent.deviceID, false); err != nil {
-		log.Errorw("failed-to-add-devices-to-cluster-proxy", log.Fields{"error": err})
 		return nil, err
 	}
 	agent.deviceProxy.RegisterCallback(model.PostUpdate, agent.processUpdate)
 
-	log.Debugw("device-agent-started", log.Fields{"deviceId": agent.deviceID})
-	return device, nil
+	startSucceeded = true
+	log.Debugw("device-agent-started", log.Fields{"device-id": agent.deviceID})
+
+	return agent.getDevice(ctx)
 }
 
 // stop stops the device agent.  Not much to do for now
-func (agent *DeviceAgent) stop(ctx context.Context) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+func (agent *DeviceAgent) stop(ctx context.Context) error {
+	needToStop := false
+	if agent.stopOnce.Do(func() { needToStop = true }); !needToStop {
+		return nil
+	}
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
-	log.Debugw("stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
+	log.Infow("stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
 
 	// First unregister any callbacks
-	agent.deviceProxy.UnregisterCallback(model.PostUpdate, agent.processUpdate)
+	if agent.deviceProxy != nil {
+		agent.deviceProxy.UnregisterCallback(model.PostUpdate, agent.processUpdate)
+	}
 
 	//	Remove the device from the KV store
 	removed, err := agent.clusterDataProxy.Remove(ctx, "/devices/"+agent.deviceID, "")
 	if err != nil {
-		log.Errorw("Failed-to-remove-device-from-cluster-data-proxy", log.Fields{"error": err})
-		return
+		return err
 	}
 	if removed == nil {
-		log.Debugw("device-already-removed", log.Fields{"id": agent.deviceID})
+		log.Debugw("device-already-removed", log.Fields{"device-id": agent.deviceID})
 	}
-	agent.exitChannel <- 1
-	log.Debugw("device-agent-stopped", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
+
+	// Stop the request queue - no more requests can be processed
+	agent.requestQueue.Stop()
+
+	close(agent.exitChannel)
+
+	agent.stopped = true
+
+	log.Infow("device-agent-stopped", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
+
+	return nil
 }
 
 // Load the most recent state from the KVStore for the device.
 func (agent *DeviceAgent) reconcileWithKVStore(ctx context.Context) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		log.Warnw("request-aborted", log.Fields{"device-id": agent.deviceID, "error": err})
+		return
+	}
+	defer agent.requestQueue.RequestComplete()
 	log.Debug("reconciling-device-agent-devicetype")
 	// TODO: context timeout
 	device, err := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, "")
 	if err != nil {
-		log.Errorw("Failed to get device info from cluster data proxy", log.Fields{"error": err})
+		log.Errorw("kv-get-failed", log.Fields{"device-id": agent.deviceID, "error": err})
 		return
 	}
 	if device != nil {
 		if d, ok := device.(*voltha.Device); ok {
 			agent.deviceType = d.Adapter
 			agent.device = proto.Clone(d).(*voltha.Device)
-			log.Debugw("reconciled-device-agent-devicetype", log.Fields{"Id": agent.deviceID, "type": agent.deviceType})
+			log.Debugw("reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
 		}
 	}
 }
 
+// onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
+// and the only action required is to publish a successful result on kafka
+func (agent *DeviceAgent) onSuccess(rpc string, response interface{}, reqArgs ...interface{}) {
+	log.Debugw("response-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
+	// TODO: Post success message onto kafka
+}
+
+// onFailure is a common callback for scenarios where we receive an error response following a request to an adapter
+// and the only action required is to publish the failed result on kafka
+func (agent *DeviceAgent) onFailure(rpc string, response interface{}, reqArgs ...interface{}) {
+	if res, ok := response.(error); ok {
+		log.Errorw("rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
+	} else {
+		log.Errorw("rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
+	}
+	// TODO: Post failure message onto kafka
+}
+
+func (agent *DeviceAgent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
+	defer cancel()
+	select {
+	case rpcResponse, ok := <-ch:
+		if !ok {
+			onFailure(rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs...)
+		} else if rpcResponse.Err != nil {
+			onFailure(rpc, rpcResponse.Err, reqArgs...)
+		} else {
+			onSuccess(rpc, rpcResponse.Reply, reqArgs...)
+		}
+	case <-ctx.Done():
+		onFailure(rpc, ctx.Err(), reqArgs...)
+	}
+}
+
 // getDevice returns the device data from cache
-func (agent *DeviceAgent) getDevice() *voltha.Device {
-	agent.lockDevice.RLock()
-	defer agent.lockDevice.RUnlock()
-	return proto.Clone(agent.device).(*voltha.Device)
+func (agent *DeviceAgent) getDevice(ctx context.Context) (*voltha.Device, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	return proto.Clone(agent.device).(*voltha.Device), nil
 }
 
 // getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
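
Throughout device_agent.go the per-device sync.RWMutex is replaced by a coreutils.RequestQueue: a request first calls WaitForGreenLight(ctx) to wait for its turn (or abort when its context expires) and later calls RequestComplete() so the next request can proceed. The stand-in below is only an illustration of that contract, not the real coreutils implementation (which also guarantees per-device FIFO ordering); it is self-contained and runnable:

    package main

    import (
    	"context"
    	"fmt"
    	"time"
    )

    // toyQueue serializes requests the way the agent uses coreutils.RequestQueue above:
    // WaitForGreenLight blocks until it is this request's turn, RequestComplete releases it.
    type toyQueue struct {
    	slot chan struct{} // capacity 1: at most one request holds the "green light"
    }

    func newToyQueue() *toyQueue { return &toyQueue{slot: make(chan struct{}, 1)} }

    func (q *toyQueue) WaitForGreenLight(ctx context.Context) error {
    	select {
    	case q.slot <- struct{}{}: // acquired the turn
    		return nil
    	case <-ctx.Done(): // caller timed out or was cancelled while waiting
    		return ctx.Err()
    	}
    }

    func (q *toyQueue) RequestComplete() { <-q.slot } // hand the turn to the next waiter

    func main() {
    	q := newToyQueue()
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    	defer cancel()
    	if err := q.WaitForGreenLight(ctx); err != nil {
    		fmt.Println("request-aborted:", err)
    		return
    	}
    	defer q.RequestComplete()
    	fmt.Println("processing while holding the device's turn")
    }
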
@@ -203,31 +276,32 @@
 
 // enableDevice activates a preprovisioned or a disable device
 func (agent *DeviceAgent) enableDevice(ctx context.Context) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("enableDevice", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	log.Debugw("enableDevice", log.Fields{"device-id": agent.deviceID})
 
 	cloned := agent.getDeviceWithoutLock()
 
 	// First figure out which adapter will handle this device type.  We do it at this stage as allow devices to be
-	// pre-provisionned with the required adapter not registered.   At this stage, since we need to communicate
+	// pre-provisioned with the required adapter not registered.   At this stage, since we need to communicate
 	// with the adapter then we need to know the adapter that will handle this request
 	adapterName, err := agent.adapterMgr.getAdapterName(cloned.Type)
 	if err != nil {
-		log.Warnw("no-adapter-registered-for-device-type", log.Fields{"deviceType": cloned.Type, "deviceAdapter": cloned.Adapter})
 		return err
 	}
 	cloned.Adapter = adapterName
 
 	if cloned.AdminState == voltha.AdminState_ENABLED {
-		log.Debugw("device-already-enabled", log.Fields{"id": agent.deviceID})
+		log.Debugw("device-already-enabled", log.Fields{"device-id": agent.deviceID})
 		return nil
 	}
 
 	if cloned.AdminState == voltha.AdminState_DELETED {
 		// This is a temporary state when a device is deleted before it gets removed from the model.
 		err = status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-a-deleted-device: %s ", cloned.Id))
-		log.Warnw("invalid-state", log.Fields{"id": agent.deviceID, "state": cloned.AdminState, "error": err})
 		return err
 	}
 
@@ -242,36 +316,38 @@
 		return err
 	}
 
-	// Adopt the device if it was in preprovision state.  In all other cases, try to reenable it.
+	// Adopt the device if it was in pre-provision state.  In all other cases, try to re-enable it.
 	device := proto.Clone(cloned).(*voltha.Device)
+	var ch chan *kafka.RpcResponse
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	if previousAdminState == voltha.AdminState_PREPROVISIONED {
-		if err := agent.adapterProxy.AdoptDevice(ctx, device); err != nil {
-			log.Debugw("adoptDevice-error", log.Fields{"id": agent.deviceID, "error": err})
-			return err
-		}
+		ch, err = agent.adapterProxy.adoptDevice(subCtx, device)
 	} else {
-		if err := agent.adapterProxy.ReEnableDevice(ctx, device); err != nil {
-			log.Debugw("renableDevice-error", log.Fields{"id": agent.deviceID, "error": err})
-			return err
-		}
+		ch, err = agent.adapterProxy.reEnableDevice(subCtx, device)
 	}
+	if err != nil {
+		cancel()
+		return err
+	}
+	// Wait for response
+	go agent.waitForAdapterResponse(subCtx, cancel, "enableDevice", ch, agent.onSuccess, agent.onFailure)
 	return nil
 }
 
-func (agent *DeviceAgent) sendBulkFlowsToAdapters(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
-	if err := agent.adapterProxy.UpdateFlowsBulk(ctx, device, flows, groups, flowMetadata); err != nil {
-		log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.deviceID, "error": err})
-		response.Error(err)
+func (agent *DeviceAgent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, ch chan *kafka.RpcResponse, response coreutils.Response) {
+	defer cancel()
+	select {
+	case rpcResponse, ok := <-ch:
+		if !ok {
+			response.Error(status.Errorf(codes.Aborted, "channel-closed"))
+		} else if rpcResponse.Err != nil {
+			response.Error(rpcResponse.Err)
+		} else {
+			response.Done()
+		}
+	case <-ctx.Done():
+		response.Error(ctx.Err())
 	}
-	response.Done()
-}
-
-func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(ctx context.Context, device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
-	if err := agent.adapterProxy.UpdateFlowsIncremental(ctx, device, flows, groups, flowMetadata); err != nil {
-		log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.deviceID, "error": err})
-		response.Error(err)
-	}
-	response.Done()
 }
 
 //deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice.  This function will
@@ -334,17 +410,24 @@
 }
 
 func (agent *DeviceAgent) addFlowsAndGroupsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups, "flowMetadata": flowMetadata})
+	log.Debugw("add-flows-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups, "flow-metadata": flowMetadata})
 
 	if (len(newFlows) | len(newGroups)) == 0 {
-		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
+		log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
 		return coreutils.DoneResponse(), nil
 	}
 
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return coreutils.DoneResponse(), err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	device := agent.getDeviceWithoutLock()
+	dType := agent.adapterMgr.getDeviceType(device.Type)
+	if dType == nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+
 	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
 	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
 
@@ -356,29 +439,32 @@
 
 	// Sanity check
 	if (len(updatedAllFlows) | len(flowsToDelete) | len(updatedAllGroups) | len(groupsToDelete)) == 0 {
-		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
+		log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
 		return coreutils.DoneResponse(), nil
 	}
 
-	// Send update to adapters
-	// Create two channels to receive responses from the dB and from the adapters.
-	// Do not close these channels as this function may exit on timeout before the dB or adapters get a chance
-	// to send their responses.  These channels will be garbage collected once all the responses are
-	// received
-	response := coreutils.NewResponse()
-	dType := agent.adapterMgr.getDeviceType(device.Type)
-	if dType == nil {
-		log.Errorw("non-existent device type", log.Fields{"deviceType": device.Type})
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent device type %s", device.Type)
+	// store the changed data
+	device.Flows = &voltha.Flows{Items: updatedAllFlows}
+	device.FlowGroups = &voltha.FlowGroups{Items: updatedAllGroups}
+	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-device-%s", agent.deviceID)
 	}
-	if !dType.AcceptsAddRemoveFlowUpdates {
 
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	if !dType.AcceptsAddRemoveFlowUpdates {
 		if len(updatedAllGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedAllGroups) && len(updatedAllFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedAllFlows) {
-			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
+			log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
+			cancel()
 			return coreutils.DoneResponse(), nil
 		}
-		go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata, response)
-
+		rpcResponse, err := agent.adapterProxy.updateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: newFlows},
@@ -389,16 +475,13 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
+		rpcResponse, err := agent.adapterProxy.updateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	}
-
-	// store the changed data
-	device.Flows = &voltha.Flows{Items: updatedAllFlows}
-	device.FlowGroups = &voltha.FlowGroups{Items: updatedAllGroups}
-	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
-	}
-
 	return response, nil
 }
 
@@ -409,25 +492,31 @@
 	if err != nil {
 		return err
 	}
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
-		log.Debugw("Failed to get response from adapter[or] DB", log.Fields{"result": res})
-		return status.Errorf(codes.Aborted, "errors-%s", res)
+	if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); errs != nil {
+		log.Warnw("no-adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
+		return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
 	}
 	return nil
 }
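
addFlowsAndGroups above is the consuming side of the flow helpers: the *ToAdapter functions persist the new flow/group state, hand the update to the adapter, and return a coreutils.Response immediately; the caller then blocks on WaitForNilOrErrorResponses for up to the configured timeout. A hedged sketch of such a caller, where applyToAdapter stands in for any of the *ToAdapter helpers:

    // Illustrative caller of the flow helpers above; applyToAdapter is a placeholder.
    func waitOnFlowUpdate(agent *DeviceAgent, applyToAdapter func(context.Context) (coreutils.Response, error)) error {
    	response, err := applyToAdapter(context.Background())
    	if err != nil {
    		return err // validation failed or the request never reached the adapter
    	}
    	// Block until waitForAdapterFlowResponse marks the response done or reports an error.
    	if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); errs != nil {
    		return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
    	}
    	return nil
    }
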
 
 func (agent *DeviceAgent) deleteFlowsAndGroupsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
+	log.Debugw("delete-flows-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
 
 	if (len(flowsToDel) | len(groupsToDel)) == 0 {
-		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
+		log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
 		return coreutils.DoneResponse(), nil
 	}
 
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return coreutils.DoneResponse(), err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	device := agent.getDeviceWithoutLock()
+	dType := agent.adapterMgr.getDeviceType(device.Type)
+	if dType == nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
 
 	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
 	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
@@ -451,32 +540,41 @@
 
 	log.Debugw("deleteFlowsAndGroups",
 		log.Fields{
-			"deviceId":     agent.deviceID,
-			"flowsToDel":   len(flowsToDel),
-			"flowsToKeep":  len(flowsToKeep),
-			"groupsToDel":  len(groupsToDel),
-			"groupsToKeep": len(groupsToKeep),
+			"device-id":      agent.deviceID,
+			"flows-to-del":   len(flowsToDel),
+			"flows-to-keep":  len(flowsToKeep),
+			"groups-to-del":  len(groupsToDel),
+			"groups-to-keep": len(groupsToKeep),
 		})
 
 	// Sanity check
 	if (len(flowsToKeep) | len(flowsToDel) | len(groupsToKeep) | len(groupsToDel)) == 0 {
-		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
+		log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows-to-del": flowsToDel, "groups-to-del": groupsToDel})
 		return coreutils.DoneResponse(), nil
 	}
 
-	// Send update to adapters
-	response := coreutils.NewResponse()
-	dType := agent.adapterMgr.getDeviceType(device.Type)
-	if dType == nil {
-		log.Errorw("non-existent device type", log.Fields{"deviceType": device.Type})
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent device type %s", device.Type)
+	// store the changed data
+	device.Flows = &voltha.Flows{Items: flowsToKeep}
+	device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
+	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
 	}
+
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
 			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
+			cancel()
 			return coreutils.DoneResponse(), nil
 		}
-		go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, response)
+		rpcResponse, err := agent.adapterProxy.updateFlowsBulk(subCtx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -487,18 +585,14 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDel},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
+		rpcResponse, err := agent.adapterProxy.updateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	}
-
-	// store the changed data
-	device.Flows = &voltha.Flows{Items: flowsToKeep}
-	device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
-	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
-	}
-
 	return response, nil
-
 }
 
 //deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
@@ -515,43 +609,59 @@
 }
 
 func (agent *DeviceAgent) updateFlowsAndGroupsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	log.Debugw("updateFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+	log.Debugw("updateFlowsAndGroups", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
 
 	if (len(updatedFlows) | len(updatedGroups)) == 0 {
-		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+		log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
 		return coreutils.DoneResponse(), nil
 	}
 
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return coreutils.DoneResponse(), err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	device := agent.getDeviceWithoutLock()
+	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid-device-states")
+	}
+	dType := agent.adapterMgr.getDeviceType(device.Type)
+	if dType == nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
 
 	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
 	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
 
 	if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
-		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+		log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
 		return coreutils.DoneResponse(), nil
 	}
 
 	log.Debugw("updating-flows-and-groups",
 		log.Fields{
-			"deviceId":      agent.deviceID,
-			"updatedFlows":  updatedFlows,
-			"updatedGroups": updatedGroups,
+			"device-id":      agent.deviceID,
+			"updated-flows":  updatedFlows,
+			"updated-groups": updatedGroups,
 		})
 
-	response := coreutils.NewResponse()
-	dType := agent.adapterMgr.getDeviceType(device.Type)
-	if dType == nil {
-		log.Errorw("non-existent device type", log.Fields{"deviceType": device.Type})
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent device type %s", device.Type)
+	// store the updated data
+	device.Flows = &voltha.Flows{Items: updatedFlows}
+	device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
+	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
 	}
 
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, response)
+		rpcResponse, err := agent.adapterProxy.updateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	} else {
 		var flowsToAdd []*ofp.OfpFlowStats
 		var flowsToDelete []*ofp.OfpFlowStats
@@ -584,16 +694,17 @@
 
 		log.Debugw("updating-flows-and-groups",
 			log.Fields{
-				"deviceId":       agent.deviceID,
-				"flowsToAdd":     flowsToAdd,
-				"flowsToDelete":  flowsToDelete,
-				"groupsToAdd":    groupsToAdd,
-				"groupsToDelete": groupsToDelete,
+				"device-id":        agent.deviceID,
+				"flows-to-add":     flowsToAdd,
+				"flows-to-delete":  flowsToDelete,
+				"groups-to-add":    groupsToAdd,
+				"groups-to-delete": groupsToDelete,
 			})
 
 		// Sanity check
 		if (len(flowsToAdd) | len(flowsToDelete) | len(groupsToAdd) | len(groupsToDelete) | len(updatedGroups)) == 0 {
-			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+			log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+			cancel()
 			return coreutils.DoneResponse(), nil
 		}
 
@@ -606,14 +717,12 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
 		}
-		go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
-	}
-
-	// store the updated data
-	device.Flows = &voltha.Flows{Items: updatedFlows}
-	device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
-	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
+		rpcResponse, err := agent.adapterProxy.updateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	}
 
 	return response, nil
@@ -634,9 +743,11 @@
 
 //disableDevice disable a device
 func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("disableDevice", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("disableDevice", log.Fields{"device-id": agent.deviceID})
 
 	cloned := agent.getDeviceWithoutLock()
 
@@ -646,7 +757,6 @@
 	}
 	if cloned.AdminState == voltha.AdminState_PREPROVISIONED ||
 		cloned.AdminState == voltha.AdminState_DELETED {
-		log.Debugw("device-not-enabled", log.Fields{"id": agent.deviceID})
 		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
 	}
 
@@ -656,75 +766,75 @@
 	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return err
 	}
-	if err := agent.adapterProxy.DisableDevice(ctx, proto.Clone(cloned).(*voltha.Device)); err != nil {
-		log.Debugw("disableDevice-error", log.Fields{"id": agent.deviceID, "error": err})
+
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.disableDevice(subCtx, proto.Clone(cloned).(*voltha.Device))
+	if err != nil {
+		cancel()
 		return err
 	}
-	return nil
-}
+	go agent.waitForAdapterResponse(subCtx, cancel, "disableDevice", ch, agent.onSuccess, agent.onFailure)
 
-func (agent *DeviceAgent) updateAdminState(ctx context.Context, adminState voltha.AdminState_Types) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("updateAdminState", log.Fields{"id": agent.deviceID})
-
-	cloned := agent.getDeviceWithoutLock()
-
-	if cloned.AdminState == adminState {
-		log.Debugw("no-change-needed", log.Fields{"id": agent.deviceID, "state": adminState})
-		return nil
-	}
-	// Received an Ack (no error found above).  Now update the device in the model to the expected state
-	cloned.AdminState = adminState
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
-		return err
-	}
 	return nil
 }
 
 func (agent *DeviceAgent) rebootDevice(ctx context.Context) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("rebootDevice", log.Fields{"id": agent.deviceID})
-
-	device := agent.getDeviceWithoutLock()
-	if err := agent.adapterProxy.RebootDevice(ctx, device); err != nil {
-		log.Debugw("rebootDevice-error", log.Fields{"id": agent.deviceID, "error": err})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("rebootDevice", log.Fields{"device-id": agent.deviceID})
+
+	device := agent.getDeviceWithoutLock()
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.rebootDevice(subCtx, device)
+	if err != nil {
+		cancel()
+		return err
+	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "rebootDevice", ch, agent.onSuccess, agent.onFailure)
 	return nil
 }
 
 func (agent *DeviceAgent) deleteDevice(ctx context.Context) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("deleteDevice", log.Fields{"id": agent.deviceID})
+	log.Debugw("deleteDevice", log.Fields{"device-id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	cloned := agent.getDeviceWithoutLock()
-	if cloned.AdminState == voltha.AdminState_DELETED {
-		log.Debugw("device-already-in-deleted-state", log.Fields{"id": agent.deviceID})
-		return nil
-	}
-	if cloned.AdminState != voltha.AdminState_PREPROVISIONED {
-		if err := agent.adapterProxy.DeleteDevice(ctx, cloned); err != nil {
-			log.Debugw("deleteDevice-error", log.Fields{"id": agent.deviceID, "error": err})
-			return err
-		}
-	}
-	//Set the state to deleted - this will trigger some background process to invoke parent adapter to delete child
-	//device and clean up the device as well as its association with the logical device
-	cloned.AdminState = voltha.AdminState_DELETED
 
+	previousState := cloned.AdminState
+
+	// No precondition check is required when deleting a device.  Changing the state to DELETED will trigger the
+	// removal of this device by the state machine
+	cloned.AdminState = voltha.AdminState_DELETED
 	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return err
 	}
+
+	// If the device was in the pre-provisioned state (only parent devices are in that state) then do not send the
+	// request to the adapter
+	if previousState != ic.AdminState_PREPROVISIONED {
+		subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+		ch, err := agent.adapterProxy.deleteDevice(subCtx, cloned)
+		if err != nil {
+			cancel()
+			return err
+		}
+		go agent.waitForAdapterResponse(subCtx, cancel, "deleteDevice", ch, agent.onSuccess, agent.onFailure)
+	}
 	return nil
 }
 
 func (agent *DeviceAgent) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("setParentId", log.Fields{"deviceId": device.Id, "parentId": parentID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	log.Debugw("setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
 
 	cloned := agent.getDeviceWithoutLock()
 	cloned.ParentId = parentID
@@ -732,13 +842,16 @@
 	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return err
 	}
+
 	return nil
 }
 
 func (agent *DeviceAgent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("updatePmConfigs", log.Fields{"id": pmConfigs.Id})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("updatePmConfigs", log.Fields{"device-id": pmConfigs.Id})
 
 	cloned := agent.getDeviceWithoutLock()
 	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
@@ -747,50 +860,58 @@
 		return err
 	}
 	// Send the request to the adapter
-	if err := agent.adapterProxy.UpdatePmConfigs(ctx, cloned, pmConfigs); err != nil {
-		log.Errorw("update-pm-configs-error", log.Fields{"id": agent.deviceID, "error": err})
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.updatePmConfigs(subCtx, cloned, pmConfigs)
+	if err != nil {
+		cancel()
 		return err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "updatePmConfigs", ch, agent.onSuccess, agent.onFailure)
 	return nil
 }
 
 func (agent *DeviceAgent) initPmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("initPmConfigs", log.Fields{"id": pmConfigs.Id})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("initPmConfigs", log.Fields{"device-id": pmConfigs.Id})
 
 	cloned := agent.getDeviceWithoutLock()
 	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
-	// Store the device
 	updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
 	afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, cloned, false, "")
 	if err != nil {
-		return status.Errorf(codes.Internal, "%s", agent.deviceID)
+		return err
 	}
 	if afterUpdate == nil {
-		return status.Errorf(codes.Internal, "%s", agent.deviceID)
+		return status.Errorf(codes.Internal, "pm-kv-update-failed-for-device-id-%s", agent.deviceID)
 	}
 	return nil
 }
 
 func (agent *DeviceAgent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
-	agent.lockDevice.RLock()
-	defer agent.lockDevice.RUnlock()
-	log.Debugw("listPmConfigs", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("listPmConfigs", log.Fields{"device-id": agent.deviceID})
 
 	return agent.getDeviceWithoutLock().PmConfigs, nil
 }
 
 func (agent *DeviceAgent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("downloadImage", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	log.Debugw("downloadImage", log.Fields{"device-id": agent.deviceID})
 
 	device := agent.getDeviceWithoutLock()
 
 	if device.AdminState != voltha.AdminState_ENABLED {
-		log.Debugw("device-not-enabled", log.Fields{"id": agent.deviceID})
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
 	}
 	// Save the image
 	clonedImg := proto.Clone(img).(*voltha.ImageDownload)
@@ -817,10 +938,13 @@
 			return nil, err
 		}
 		// Send the request to the adapter
-		if err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg); err != nil {
-			log.Debugw("downloadImage-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+		subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+		ch, err := agent.adapterProxy.downloadImage(subCtx, cloned, clonedImg)
+		if err != nil {
+			cancel()
 			return nil, err
 		}
+		go agent.waitForAdapterResponse(subCtx, cancel, "downloadImage", ch, agent.onSuccess, agent.onFailure)
 	}
 	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
 }
@@ -836,15 +960,18 @@
 }
 
 func (agent *DeviceAgent) cancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("cancelImageDownload", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	log.Debugw("cancelImageDownload", log.Fields{"device-id": agent.deviceID})
 
 	device := agent.getDeviceWithoutLock()
 
 	// Verify whether the Image is in the list of image being downloaded
 	if !isImageRegistered(img, device) {
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
 	}
 
 	// Update image download state
@@ -861,28 +988,32 @@
 		if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 			return nil, err
 		}
-		// Send the request to the adapter
-		if err := agent.adapterProxy.CancelImageDownload(ctx, device, img); err != nil {
-			log.Debugw("cancelImageDownload-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+		subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+		ch, err := agent.adapterProxy.cancelImageDownload(subCtx, device, img)
+		if err != nil {
+			cancel()
 			return nil, err
 		}
+		go agent.waitForAdapterResponse(subCtx, cancel, "cancelImageDownload", ch, agent.onSuccess, agent.onFailure)
 	}
 	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
 }
 
 func (agent *DeviceAgent) activateImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("activateImage", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("activateImage", log.Fields{"device-id": agent.deviceID})
 	cloned := agent.getDeviceWithoutLock()
 
 	// Verify whether the Image is in the list of image being downloaded
 	if !isImageRegistered(img, cloned) {
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
 	}
 
 	if cloned.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
 	}
 	// Update image download state
 	for _, image := range cloned.ImageDownloads {
@@ -896,19 +1027,25 @@
 		return nil, err
 	}
 
-	if err := agent.adapterProxy.ActivateImageUpdate(ctx, proto.Clone(cloned).(*voltha.Device), img); err != nil {
-		log.Debugw("activateImage-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.activateImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
+	if err != nil {
+		cancel()
 		return nil, err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "activateImageUpdate", ch, agent.onSuccess, agent.onFailure)
+
 	// The status of the AdminState will be changed following the update_download_status response from the adapter
 	// The image name will also be removed from the device list
 	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
 }
 
 func (agent *DeviceAgent) revertImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("revertImage", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("revertImage", log.Fields{"device-id": agent.deviceID})
 
 	cloned := agent.getDeviceWithoutLock()
 
@@ -931,31 +1068,51 @@
 		return nil, err
 	}
 
-	if err := agent.adapterProxy.RevertImageUpdate(ctx, proto.Clone(cloned).(*voltha.Device), img); err != nil {
-		log.Debugw("revertImage-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.revertImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
+	if err != nil {
+		cancel()
 		return nil, err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "revertImageUpdate", ch, agent.onSuccess, agent.onFailure)
+
 	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
 }
 
 func (agent *DeviceAgent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("getImageDownloadStatus", log.Fields{"id": agent.deviceID})
+	log.Debugw("getImageDownloadStatus", log.Fields{"device-id": agent.deviceID})
 
-	cloned := agent.getDeviceWithoutLock()
-	resp, err := agent.adapterProxy.GetImageDownloadStatus(ctx, cloned, img)
-	if err != nil {
-		log.Debugw("getImageDownloadStatus-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	return resp, nil
+	device := agent.getDeviceWithoutLock()
+	ch, err := agent.adapterProxy.getImageDownloadStatus(ctx, device, img)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+	// Wait for the adapter response
+	rpcResponse, ok := <-ch
+	if !ok {
+		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+	}
+	if rpcResponse.Err != nil {
+		return nil, rpcResponse.Err
+	}
+	// Successful response
+	imgDownload := &voltha.ImageDownload{}
+	if err := ptypes.UnmarshalAny(rpcResponse.Reply, imgDownload); err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+	}
+	return imgDownload, nil
 }
 
 func (agent *DeviceAgent) updateImageDownload(ctx context.Context, img *voltha.ImageDownload) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("updateImageDownload", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("updating-image-download", log.Fields{"device-id": agent.deviceID, "img": img})
 
 	cloned := agent.getDeviceWithoutLock()
 
@@ -983,9 +1140,11 @@
 }
 
 func (agent *DeviceAgent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
-	agent.lockDevice.RLock()
-	defer agent.lockDevice.RUnlock()
-	log.Debugw("getImageDownload", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("getImageDownload", log.Fields{"device-id": agent.deviceID})
 
 	cloned := agent.getDeviceWithoutLock()
 	for _, image := range cloned.ImageDownloads {
@@ -997,16 +1156,18 @@
 }
 
 func (agent *DeviceAgent) listImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
-	agent.lockDevice.RLock()
-	defer agent.lockDevice.RUnlock()
-	log.Debugw("listImageDownloads", log.Fields{"id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("listImageDownloads", log.Fields{"device-id": agent.deviceID})
 
 	return &voltha.ImageDownloads{Items: agent.getDeviceWithoutLock().ImageDownloads}, nil
 }
 
 // getPorts retrieves the ports information of the device based on the port type.
 func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
-	log.Debugw("getPorts", log.Fields{"id": agent.deviceID, "portType": portType})
+	log.Debugw("getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
 	ports := &voltha.Ports{}
 	if device, _ := agent.deviceMgr.GetDevice(ctx, agent.deviceID); device != nil {
 		for _, port := range device.Ports {
@@ -1018,38 +1179,81 @@
 	return ports
 }
 
-// getSwitchCapability is a helper method that a logical device agent uses to retrieve the switch capability of a
-// parent device
+// getSwitchCapability retrieves the switch capability of a parent device
 func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
-	log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceID})
-	device, err := agent.deviceMgr.GetDevice(ctx, agent.deviceID)
-	if device == nil {
+	log.Debugw("getSwitchCapability", log.Fields{"device-id": agent.deviceID})
+
+	cloned, err := agent.getDevice(ctx)
+	if err != nil {
 		return nil, err
 	}
-	var switchCap *ic.SwitchCapability
-	if switchCap, err = agent.adapterProxy.GetOfpDeviceInfo(ctx, device); err != nil {
-		log.Debugw("getSwitchCapability-error", log.Fields{"id": device.Id, "error": err})
+	ch, err := agent.adapterProxy.getOfpDeviceInfo(ctx, cloned)
+	if err != nil {
+		return nil, err
+	}
+
+	// Wait for adapter response
+	rpcResponse, ok := <-ch
+	if !ok {
+		return nil, status.Errorf(codes.Aborted, "channel-closed")
+	}
+	if rpcResponse.Err != nil {
+		return nil, rpcResponse.Err
+	}
+	// Successful response
+	switchCap := &ic.SwitchCapability{}
+	if err := ptypes.UnmarshalAny(rpcResponse.Reply, switchCap); err != nil {
 		return nil, err
 	}
 	return switchCap, nil
 }
 
-// getPortCapability is a helper method that a logical device agent uses to retrieve the port capability of a
-// device
+// getPortCapability retrieves the port capability of a device
 func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
-	log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceID})
-	device, err := agent.deviceMgr.GetDevice(ctx, agent.deviceID)
-	if device == nil {
+	log.Debugw("getPortCapability", log.Fields{"device-id": agent.deviceID})
+	device, err := agent.getDevice(ctx)
+	if err != nil {
 		return nil, err
 	}
-	var portCap *ic.PortCapability
-	if portCap, err = agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo); err != nil {
-		log.Debugw("getPortCapability-error", log.Fields{"id": device.Id, "error": err})
+	ch, err := agent.adapterProxy.getOfpPortInfo(ctx, device, portNo)
+	if err != nil {
 		return nil, err
 	}
+	// Wait for adapter response
+	rpcResponse, ok := <-ch
+	if !ok {
+		return nil, status.Errorf(codes.Aborted, "channel-closed")
+	}
+	if rpcResponse.Err != nil {
+		return nil, rpcResponse.Err
+	}
+	// Successful response
+	portCap := &ic.PortCapability{}
+	if err := ptypes.UnmarshalAny(rpcResponse.Reply, portCap); err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+	}
 	return portCap, nil
 }
 
+func (agent *DeviceAgent) onPacketFailure(rpc string, response interface{}, args ...interface{}) {
+	// packet data is encoded in the args param as the first parameter
+	var packet []byte
+	if len(args) >= 1 {
+		if pkt, ok := args[0].([]byte); ok {
+			packet = pkt
+		}
+	}
+	var errResp error
+	if err, ok := response.(error); ok {
+		errResp = err
+	}
+	log.Warnw("packet-out-error", log.Fields{
+		"device-id": agent.deviceID,
+		"error":     errResp,
+		"packet":    hex.EncodeToString(packet),
+	})
+}
+
 func (agent *DeviceAgent) packetOut(ctx context.Context, outPort uint32, packet *ofp.OfpPacketOut) error {
 	// If deviceType=="" then we must have taken ownership of this device.
 	// Fixes VOL-2226 where a core would take ownership and have stale data
@@ -1057,20 +1261,19 @@
 		agent.reconcileWithKVStore(ctx)
 	}
 	//	Send packet to adapter
-	if err := agent.adapterProxy.packetOut(ctx, agent.deviceType, agent.deviceID, outPort, packet); err != nil {
-		log.Debugw("packet-out-error", log.Fields{
-			"id":     agent.deviceID,
-			"error":  err,
-			"packet": hex.EncodeToString(packet.Data),
-		})
-		return err
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.packetOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
+	if err != nil {
+		cancel()
+		return err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "packetOut", ch, agent.onSuccess, agent.onPacketFailure, packet.Data)
 	return nil
 }
 
-// processUpdate is a callback invoked whenever there is a change on the device manages by this device agent
+// processUpdate is a callback invoked whenever there is a change on the device managed by this device agent
 func (agent *DeviceAgent) processUpdate(ctx context.Context, args ...interface{}) interface{} {
-	//// Run this callback in its own go routine
+	// Run this callback in its own go routine
 	go func(args ...interface{}) interface{} {
 		var previous *voltha.Device
 		var current *voltha.Device
@@ -1088,10 +1291,10 @@
 			log.Errorw("too-many-args-in-callback", log.Fields{"len": len(args)})
 			return nil
 		}
-		// Perform the state transition in it's own go routine (since the caller doesn't wait for this, use a background context)
+		// Perform the state transition in its own go routine
 		if err := agent.deviceMgr.processTransition(context.Background(), previous, current); err != nil {
-			log.Errorw("failed-process-transition", log.Fields{"deviceId": previous.Id,
-				"previousAdminState": previous.AdminState, "currentAdminState": current.AdminState})
+			log.Errorw("failed-process-transition", log.Fields{"device-id": previous.Id,
+				"previous-admin-state": previous.AdminState, "current-admin-state": current.AdminState})
 		}
 		return nil
 	}(args...)
@@ -1112,13 +1315,16 @@
 	cloned.Reason = device.Reason
 	return cloned, nil
 }
+
 func (agent *DeviceAgent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("updateDeviceUsingAdapterData", log.Fields{"deviceId": device.Id})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
+
 	updatedDevice, err := agent.mergeDeviceInfoFromAdapter(device)
 	if err != nil {
-		log.Errorw("failed to update device ", log.Fields{"deviceId": device.Id})
 		return status.Errorf(codes.Internal, "%s", err.Error())
 	}
 	cloned := proto.Clone(updatedDevice).(*voltha.Device)
@@ -1127,13 +1333,16 @@
 
 func (agent *DeviceAgent) updateDeviceWithoutLock(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
-	cloned := proto.Clone(device).(*voltha.Device)
+	//cloned := proto.Clone(device).(*voltha.Device)
+	cloned := device
 	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
 func (agent *DeviceAgent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	cloned := agent.getDeviceWithoutLock()
 
@@ -1153,8 +1362,10 @@
 
 func (agent *DeviceAgent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
 	log.Debugw("updatePortsOperState", log.Fields{"device-id": agent.deviceID})
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	cloned := agent.getDeviceWithoutLock()
 	for _, port := range cloned.Ports {
 		port.OperStatus = operStatus
@@ -1164,8 +1375,10 @@
 }
 
 func (agent *DeviceAgent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	// Work only on latest data
 	// TODO: Get list of ports from device directly instead of the entire device
 	cloned := agent.getDeviceWithoutLock()
@@ -1186,8 +1399,10 @@
 
 func (agent *DeviceAgent) deleteAllPorts(ctx context.Context) error {
 	log.Debugw("deleteAllPorts", log.Fields{"deviceId": agent.deviceID})
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	cloned := agent.getDeviceWithoutLock()
 
@@ -1208,9 +1423,11 @@
 }
 
 func (agent *DeviceAgent) addPort(ctx context.Context, port *voltha.Port) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("addPort", log.Fields{"deviceId": agent.deviceID, "port": port})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	log.Debugw("addPort", log.Fields{"deviceId": agent.deviceID})
 
 	cloned := agent.getDeviceWithoutLock()
 	updatePort := false
@@ -1245,8 +1462,10 @@
 }
 
 func (agent *DeviceAgent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	log.Debugw("adding-peer-peerPort", log.Fields{"device-id": agent.deviceID, "peer-peerPort": peerPort})
 
 	cloned := agent.getDeviceWithoutLock()
@@ -1279,30 +1498,13 @@
 	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *DeviceAgent) deletePeerPorts(ctx context.Context, deviceID string) error {
-	log.Debug("deletePeerPorts")
-
-	cloned := agent.getDeviceWithoutLock()
-
-	var updatedPeers []*voltha.Port_PeerPort
-	for _, port := range cloned.Ports {
-		updatedPeers = make([]*voltha.Port_PeerPort, 0)
-		for _, peerPort := range port.Peers {
-			if peerPort.DeviceId != deviceID {
-				updatedPeers = append(updatedPeers, peerPort)
-			}
-		}
-		port.Peers = updatedPeers
-	}
-
-	// Store the device with updated peer ports
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
 // TODO: A generic device update by attribute
 func (agent *DeviceAgent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		log.Warnw("request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
+		return
+	}
+	defer agent.requestQueue.RequestComplete()
 	if value == nil {
 		return
 	}
@@ -1336,17 +1538,21 @@
 }
 
 func (agent *DeviceAgent) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	log.Debugw("simulateAlarm", log.Fields{"id": agent.deviceID})
 
 	cloned := agent.getDeviceWithoutLock()
 
-	// First send the request to an Adapter and wait for a response
-	if err := agent.adapterProxy.SimulateAlarm(ctx, cloned, simulatereq); err != nil {
-		log.Debugw("simulateAlarm-error", log.Fields{"id": agent.deviceID, "error": err})
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.simulateAlarm(subCtx, cloned, simulatereq)
+	if err != nil {
+		cancel()
 		return err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "simulateAlarm", ch, agent.onSuccess, agent.onFailure)
 	return nil
 }
 
@@ -1369,8 +1575,10 @@
 }
 
 func (agent *DeviceAgent) updateDeviceReason(ctx context.Context, reason string) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	cloned := agent.getDeviceWithoutLock()
 	cloned.Reason = reason
@@ -1380,10 +1588,12 @@
 }
 
 func (agent *DeviceAgent) disablePort(ctx context.Context, Port *voltha.Port) error {
-	var cp *voltha.Port
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	log.Debugw("disablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
+	var cp *voltha.Port
 	// Get the most up to date the device info
 	device := agent.getDeviceWithoutLock()
 	for _, port := range device.Ports {
@@ -1406,19 +1616,26 @@
 		log.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
 		return err
 	}
+
 	//send request to adapter
-	if err := agent.adapterProxy.disablePort(ctx, device, cp); err != nil {
-		log.Debugw("DisablePort-error", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.disablePort(subCtx, device, cp)
+	if err != nil {
+		cancel()
 		return err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "disablePort", ch, agent.onSuccess, agent.onFailure)
 	return nil
 }
 
 func (agent *DeviceAgent) enablePort(ctx context.Context, Port *voltha.Port) error {
-	var cp *voltha.Port
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	log.Debugw("enablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
+
+	var cp *voltha.Port
 	// Get the most up to date the device info
 	device := agent.getDeviceWithoutLock()
 	for _, port := range device.Ports {
@@ -1442,28 +1659,47 @@
 		return err
 	}
 	//send request to adapter
-	if err := agent.adapterProxy.enablePort(ctx, device, cp); err != nil {
-		log.Debugw("EnablePort-error", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.enablePort(subCtx, device, cp)
+	if err != nil {
+		cancel()
 		return err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "enablePort", ch, agent.onSuccess, agent.onFailure)
 	return nil
 }
 
 func (agent *DeviceAgent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("ChildDeviceLost", log.Fields{"id": device.Id})
+	log.Debugw("childDeviceLost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
 
 	//Remove the associated peer ports on the parent device
-	if err := agent.deviceMgr.deletePeerPorts(ctx, device.ParentId, device.Id); err != nil {
-		// At this stage, the parent device may also have been deleted.  Just log and keep processing.
-		log.Warnw("failure-deleting-peer-port", log.Fields{"error": err, "child-device-id": device.Id, "parent-device-id": device.ParentId})
+	parentDevice := agent.getDeviceWithoutLock()
+	var updatedPeers []*voltha.Port_PeerPort
+	for _, port := range parentDevice.Ports {
+		updatedPeers = make([]*voltha.Port_PeerPort, 0)
+		for _, peerPort := range port.Peers {
+			if peerPort.DeviceId != device.Id {
+				updatedPeers = append(updatedPeers, peerPort)
+			}
+		}
+		port.Peers = updatedPeers
+	}
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, parentDevice, false, ""); err != nil {
+		return err
 	}
 
-	if err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId); err != nil {
-		log.Warnw("ChildDeviceLost-error", log.Fields{"error": err})
+	//send request to adapter
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.childDeviceLost(subCtx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId)
+	if err != nil {
+		cancel()
+		return err
 	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "childDeviceLost", ch, agent.onSuccess, agent.onFailure)
 	return nil
-
 }
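
Note on the fire-and-forget shape used throughout device_agent.go above (revertImage, simulateAlarm, disablePort, enablePort, packetOut, childDeviceLost): the request is sent under its own timeout context, the synchronous error path only covers the send itself, and the adapter's reply is consumed later in a separate goroutine by waitForAdapterResponse, which invokes the onSuccess/onFailure callbacks. The sketch below is a minimal stand-alone illustration of that pattern; rpcResponse, sendToAdapter and waitForResponse are hypothetical stand-ins, not the Core's real adapter proxy or helper.

// Illustrative sketch only: it mirrors the pattern used above (send the
// request, return immediately, consume the reply in a goroutine bounded by a
// timeout). rpcResponse, sendToAdapter and waitForResponse are hypothetical.
package main

import (
	"context"
	"fmt"
	"time"
)

type rpcResponse struct {
	Reply string
	Err   error
}

// sendToAdapter stands in for an adapterProxy call: it returns immediately
// with a channel on which the adapter's reply will eventually arrive.
func sendToAdapter(ctx context.Context) chan rpcResponse {
	ch := make(chan rpcResponse, 1)
	go func() {
		time.Sleep(50 * time.Millisecond) // simulated adapter work
		ch <- rpcResponse{Reply: "ok"}
	}()
	return ch
}

// waitForResponse mirrors waitForAdapterResponse: it blocks in its own
// goroutine so the caller can return to the client right away.
func waitForResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan rpcResponse) {
	defer cancel()
	select {
	case resp, ok := <-ch:
		if !ok {
			fmt.Println(rpc, "channel closed")
			return
		}
		if resp.Err != nil {
			fmt.Println(rpc, "failed:", resp.Err)
			return
		}
		fmt.Println(rpc, "succeeded:", resp.Reply)
	case <-ctx.Done():
		fmt.Println(rpc, "timed out:", ctx.Err())
	}
}

func main() {
	subCtx, cancel := context.WithTimeout(context.Background(), time.Second)
	ch := sendToAdapter(subCtx)
	go waitForResponse(subCtx, cancel, "simulateAlarm", ch)
	// The caller returns immediately; give the helper time to log the outcome.
	time.Sleep(100 * time.Millisecond)
}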
diff --git a/rw_core/core/device_agent_test.go b/rw_core/core/device_agent_test.go
index be5fdb1..85fc09e 100755
--- a/rw_core/core/device_agent_test.go
+++ b/rw_core/core/device_agent_test.go
@@ -101,7 +101,7 @@
 func (dat *DATest) startCore(inCompeteMode bool) {
 	cfg := config.NewRWCoreFlags()
 	cfg.CorePairTopic = "rw_core"
-	cfg.DefaultRequestTimeout = dat.defaultTimeout.Nanoseconds() / 1000000 //TODO: change when Core changes to Duration
+	cfg.DefaultRequestTimeout = dat.defaultTimeout
 	cfg.KVStorePort = dat.kvClientPort
 	cfg.InCompetingMode = inCompeteMode
 	grpcPort, err := freeport.GetFreePort()
@@ -143,7 +143,8 @@
 }
 
 func (dat *DATest) updateDeviceConcurrently(t *testing.T, da *DeviceAgent, globalWG *sync.WaitGroup) {
-	originalDevice := da.getDevice()
+	originalDevice, err := da.getDevice(context.Background())
+	assert.Nil(t, err)
 	assert.NotNil(t, originalDevice)
 	var localWG sync.WaitGroup
 
@@ -205,7 +206,7 @@
 	expectedChange.Vlan = vlan
 	expectedChange.Reason = reason
 
-	updatedDevice := da.getDevice()
+	updatedDevice, _ := da.getDevice(context.Background())
 	assert.NotNil(t, updatedDevice)
 	assert.True(t, proto.Equal(expectedChange, updatedDevice))
 
@@ -213,22 +214,24 @@
 }
 
 func TestConcurrentDevices(t *testing.T) {
-	da := newDATest()
-	assert.NotNil(t, da)
-	defer da.stopAll()
+	for i := 0; i < 2; i++ {
+		da := newDATest()
+		assert.NotNil(t, da)
+		defer da.stopAll()
 
-	// Start the Core
-	da.startCore(false)
+		// Start the Core
+		da.startCore(false)
 
-	var wg sync.WaitGroup
-	numConCurrentDeviceAgents := 20
-	for i := 0; i < numConCurrentDeviceAgents; i++ {
-		wg.Add(1)
-		a := da.createDeviceAgent(t)
-		go da.updateDeviceConcurrently(t, a, &wg)
+		var wg sync.WaitGroup
+		numConCurrentDeviceAgents := 20
+		for i := 0; i < numConCurrentDeviceAgents; i++ {
+			wg.Add(1)
+			a := da.createDeviceAgent(t)
+			go da.updateDeviceConcurrently(t, a, &wg)
+		}
+
+		wg.Wait()
 	}
-
-	wg.Wait()
 }
 
 func isFlowSliceEqual(a, b []*ofp.OfpFlowStats) bool {
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 98c45eb..0d77429 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -19,10 +19,6 @@
 import (
 	"context"
 	"errors"
-	"reflect"
-	"runtime"
-	"sync"
-
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
@@ -33,6 +29,10 @@
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"reflect"
+	"runtime"
+	"sync"
+	"time"
 )
 
 // DeviceManager represent device manager attributes
@@ -49,7 +49,7 @@
 	clusterDataProxy        *model.Proxy
 	coreInstanceID          string
 	exitChannel             chan int
-	defaultTimeout          int64
+	defaultTimeout          time.Duration
 	devicesLoadingLock      sync.RWMutex
 	deviceLoadingInProgress map[string][]chan int
 }
@@ -65,7 +65,7 @@
 	deviceMgr.clusterDataProxy = core.clusterDataProxy
 	deviceMgr.adapterMgr = core.adapterMgr
 	deviceMgr.lockRootDeviceMap = sync.RWMutex{}
-	deviceMgr.defaultTimeout = core.config.DefaultCoreTimeout
+	deviceMgr.defaultTimeout = core.config.DefaultCoreTimeout
 	deviceMgr.devicesLoadingLock = sync.RWMutex{}
 	deviceMgr.deviceLoadingInProgress = make(map[string][]chan int)
 	return &deviceMgr
@@ -121,7 +121,7 @@
 	if ok {
 		return agent.(*DeviceAgent)
 	}
-	//	Try to load into memory - loading will also create the device agent and set the device ownership
+	// Try to load into memory - loading will also create the device agent and set the device ownership
 	err := dMgr.load(ctx, deviceID)
 	if err == nil {
 		agent, ok = dMgr.deviceAgents.Load(deviceID)
@@ -172,13 +172,13 @@
 	device.Root = true
 	// Create and start a device agent for that device
 	agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
-	dMgr.addDeviceAgentToMap(agent)
 	device, err = agent.start(ctx, device)
 	if err != nil {
-		log.Errorf("Failed to start device")
+		log.Errorw("failed-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
 		sendResponse(ctx, ch, err)
 		return
 	}
+	dMgr.addDeviceAgentToMap(agent)
 
 	sendResponse(ctx, ch, device)
 }
@@ -251,19 +251,21 @@
 			// We do not need to stop the child devices as this is taken care by the state machine.
 		}
 		if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
-			agent.stop(ctx)
+			if err := agent.stop(ctx); err != nil {
+				log.Warnw("unable-to-stop-device-agent", log.Fields{"device-id": agent.deviceID, "error": err})
+			}
 			dMgr.deleteDeviceAgentFromMap(agent)
 			// Abandon the device ownership
 			err := dMgr.core.deviceOwnership.AbandonDevice(id)
 			if err != nil {
-				log.Errorw("unable-to-abandon-device", log.Fields{"error": err})
+				log.Warnw("unable-to-abandon-device", log.Fields{"error": err})
 			}
 		}
 	}
 }
 
 // RunPostDeviceDelete removes any reference of this device
-func (dMgr *DeviceManager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	log.Infow("RunPostDeviceDelete", log.Fields{"deviceId": cDevice.Id})
 	dMgr.stopManagingDevice(ctx, cDevice.Id)
 	return nil
@@ -273,7 +275,7 @@
 func (dMgr *DeviceManager) GetDevice(ctx context.Context, id string) (*voltha.Device, error) {
 	log.Debugw("GetDevice", log.Fields{"deviceid": id})
 	if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
-		return agent.getDevice(), nil
+		return agent.getDevice(ctx)
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
@@ -404,19 +406,19 @@
 		return nil, err
 	}
 	if devices != nil {
-		for _, device := range devices.([]interface{}) {
+		for _, d := range devices.([]interface{}) {
+			device := d.(*voltha.Device)
 			// If device is not in memory then set it up
-			if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
-				log.Debugw("loading-device-from-Model", log.Fields{"id": device.(*voltha.Device).Id})
-				agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+			if !dMgr.IsDeviceInCache(device.Id) {
+				log.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
+				agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 				if _, err := agent.start(ctx, nil); err != nil {
-					log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.(*voltha.Device).Id})
-					agent.stop(ctx)
+					log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
 				} else {
 					dMgr.addDeviceAgentToMap(agent)
 				}
 			}
-			result.Items = append(result.Items, device.(*voltha.Device))
+			result.Items = append(result.Items, device)
 		}
 	}
 	log.Debugw("ListDevices-end", log.Fields{"len": len(result.Items)})
@@ -480,7 +482,6 @@
 				agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 				if _, err = agent.start(ctx, nil); err != nil {
 					log.Warnw("Failure loading device", log.Fields{"deviceId": deviceID, "error": err})
-					agent.stop(ctx)
 				} else {
 					dMgr.addDeviceAgentToMap(agent)
 				}
@@ -554,7 +555,10 @@
 		return err
 	}
 	// Get the loaded device details
-	device := dAgent.getDevice()
+	device, err := dAgent.getDevice(ctx)
+	if err != nil {
+		return err
+	}
 
 	// If the device is in Pre-provisioning or deleted state stop here
 	if device.AdminState == voltha.AdminState_PREPROVISIONED || device.AdminState == voltha.AdminState_DELETED {
@@ -679,14 +683,20 @@
 	// to the adapter.   We will therefore bypass the adapter adapter and send the request directly to the adapter via
 	// the adapter_proxy.
 	response := utils.NewResponse()
-	go func(device *voltha.Device) {
-		if err := dMgr.adapterProxy.ReconcileDevice(ctx, device); err != nil {
-			log.Errorw("reconcile-request-failed", log.Fields{"deviceId": device.Id, "error": err})
-			response.Error(status.Errorf(codes.Internal, "device: %s", device.Id))
+	ch, err := dMgr.adapterProxy.reconcileDevice(ctx, device)
+	if err != nil {
+		response.Error(err)
+		return response
+	}
+	// Wait for adapter response in its own routine
+	go func() {
+		resp, ok := <-ch
+		if !ok {
+			response.Error(status.Errorf(codes.Aborted, "channel-closed-device: %s", device.Id))
+		} else if resp.Err != nil {
+			response.Error(resp.Err)
 		}
 		response.Done()
-	}(device)
-
+	}()
 	return response
 }
 
@@ -751,14 +761,6 @@
 	return status.Errorf(codes.NotFound, "%s", deviceID)
 }
 
-func (dMgr *DeviceManager) deletePeerPorts(ctx context.Context, fromDeviceID string, deviceID string) error {
-	log.Debugw("deletePeerPorts", log.Fields{"fromDeviceId": fromDeviceID, "deviceid": deviceID})
-	if agent := dMgr.getDeviceAgent(ctx, fromDeviceID); agent != nil {
-		return agent.deletePeerPorts(ctx, deviceID)
-	}
-	return status.Errorf(codes.NotFound, "%s", deviceID)
-}
-
 func (dMgr *DeviceManager) addFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("addFlowsAndGroups", log.Fields{"deviceid": deviceID, "groups:": groups, "flowMetadata": flowMetadata})
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
@@ -894,6 +896,7 @@
 				}
 			}()
 		}
+		return nil
 	}
 	return status.Errorf(codes.NotFound, "%s", deviceID)
 }
@@ -997,28 +1000,30 @@
 	childDevice.SerialNumber = serialNumber
 	childDevice.Root = false
 
-	//Get parent device type
-	parent, err := dMgr.GetDevice(ctx, parentDeviceID)
-	if err != nil {
-		log.Error("no-parent-found", log.Fields{"parentId": parentDeviceID})
+	// Get parent device type
+	pAgent := dMgr.getDeviceAgent(ctx, parentDeviceID)
+	if pAgent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
 	}
+	if pAgent.deviceType == "" {
+		return nil, status.Errorf(codes.FailedPrecondition, "device Type not set %s", parentDeviceID)
+	}
 
 	if device, err := dMgr.GetChildDevice(ctx, parentDeviceID, serialNumber, onuID, parentPortNo); err == nil {
 		log.Warnw("child-device-exists", log.Fields{"parentId": parentDeviceID, "serialNumber": serialNumber})
 		return device, status.Errorf(codes.AlreadyExists, "%s", serialNumber)
 	}
 
-	childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: parent.Type, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
+	childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: pAgent.deviceType, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
 
 	// Create and start a device agent for that device
 	agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
-	dMgr.addDeviceAgentToMap(agent)
-	childDevice, err = agent.start(ctx, childDevice)
+	childDevice, err := agent.start(ctx, childDevice)
 	if err != nil {
-		log.Error("error-starting-child")
+		log.Errorw("error-starting-child-device", log.Fields{"parent-device-id": parentDeviceID, "child-device-id": agent.deviceID, "error": err})
 		return nil, err
 	}
+	dMgr.addDeviceAgentToMap(agent)
 
 	// Since this Core has handled this request then it therefore owns this child device.  Set the
 	// ownership of this device to this Core
@@ -1055,10 +1060,10 @@
 		log.Debugw("no-op-transition", log.Fields{"deviceId": current.Id})
 		return nil
 	}
-	log.Debugw("handler-found", log.Fields{"num-handlers": len(handlers), "isParent": current.Root, "current-data": current})
+	log.Debugw("handler-found", log.Fields{"num-handlers": len(handlers), "isParent": current.Root, "current-data": current})
 	for _, handler := range handlers {
 		log.Debugw("running-handler", log.Fields{"handler": funcName(handler)})
-		if err := handler(ctx, current); err != nil {
+		if err := handler(ctx, current, previous); err != nil {
 			log.Warnw("handler-failed", log.Fields{"handler": funcName(handler), "error": err})
 			return err
 		}
@@ -1104,7 +1109,7 @@
 }
 
 // CreateLogicalDevice creates logical device in core
-func (dMgr *DeviceManager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	log.Info("CreateLogicalDevice")
 	// Verify whether the logical device has already been created
 	if cDevice.ParentId != "" {
@@ -1120,7 +1125,7 @@
 }
 
 // DeleteLogicalDevice deletes logical device from core
-func (dMgr *DeviceManager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	log.Info("DeleteLogicalDevice")
 	var err error
 	if err = dMgr.logicalDeviceMgr.deleteLogicalDevice(ctx, cDevice); err != nil {
@@ -1151,10 +1156,10 @@
 }
 
 // DeleteLogicalPorts removes the logical ports associated with that deviceId
-func (dMgr *DeviceManager) DeleteLogicalPorts(ctx context.Context, device *voltha.Device) error {
-	log.Info("deleteLogicalPorts")
-	if err := dMgr.logicalDeviceMgr.deleteLogicalPorts(ctx, device.Id); err != nil {
-		log.Warnw("deleteLogical-ports-error", log.Fields{"deviceId": device.Id})
+func (dMgr *DeviceManager) DeleteLogicalPorts(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
+	log.Debugw("delete-all-logical-ports", log.Fields{"device-id": cDevice.Id})
+	if err := dMgr.logicalDeviceMgr.deleteLogicalPorts(ctx, cDevice.Id); err != nil {
+		log.Warnw("deleteLogical-ports-error", log.Fields{"deviceId": cDevice.Id})
 		return err
 	}
 	return nil
@@ -1180,7 +1185,7 @@
 		log.Warnw("failed-getting-device", log.Fields{"deviceId": parentDeviceID, "error": err})
 		return err
 	}
-	return dMgr.DisableAllChildDevices(ctx, parentDevice)
+	return dMgr.DisableAllChildDevices(ctx, parentDevice, nil)
 }
 
 //childDevicesDetected is invoked by an adapter when child devices are found, typically after after a
@@ -1230,15 +1235,15 @@
 */
 
 //DisableAllChildDevices is invoked as a callback when the parent device is disabled
-func (dMgr *DeviceManager) DisableAllChildDevices(ctx context.Context, parentDevice *voltha.Device) error {
+func (dMgr *DeviceManager) DisableAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device, parentPrevDevice *voltha.Device) error {
 	log.Debug("DisableAllChildDevices")
 	var childDeviceIds []string
 	var err error
-	if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
-		return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
+	if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentCurrDevice); err != nil {
+		return status.Errorf(codes.NotFound, "%s", parentCurrDevice.Id)
 	}
 	if len(childDeviceIds) == 0 {
-		log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
+		log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentCurrDevice.Id})
 	}
 	allChildDisable := true
 	for _, childDeviceID := range childDeviceIds {
@@ -1256,15 +1261,15 @@
 }
 
 //DeleteAllChildDevices is invoked as a callback when the parent device is deleted
-func (dMgr *DeviceManager) DeleteAllChildDevices(ctx context.Context, parentDevice *voltha.Device) error {
+func (dMgr *DeviceManager) DeleteAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device, parentPrevDevice *voltha.Device) error {
 	log.Debug("DeleteAllChildDevices")
 	var childDeviceIds []string
 	var err error
-	if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
-		return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
+	if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentCurrDevice); err != nil {
+		return status.Errorf(codes.NotFound, "%s", parentCurrDevice.Id)
 	}
 	if len(childDeviceIds) == 0 {
-		log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
+		log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentCurrDevice.Id})
 	}
 	allChildDeleted := true
 	for _, childDeviceID := range childDeviceIds {
@@ -1284,9 +1289,9 @@
 }
 
 //DeleteAllUNILogicalPorts is invoked as a callback when the parent device is deleted
-func (dMgr *DeviceManager) DeleteAllUNILogicalPorts(ctx context.Context, parentDevice *voltha.Device) error {
-	log.Debugw("delete-all-uni-logical-ports", log.Fields{"parent-device-id": parentDevice.Id})
-	if err := dMgr.logicalDeviceMgr.deleteAllUNILogicalPorts(ctx, parentDevice); err != nil {
+func (dMgr *DeviceManager) DeleteAllUNILogicalPorts(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error {
+	log.Debugw("delete-all-uni-logical-ports", log.Fields{"parent-device-id": curr.Id})
+	if err := dMgr.logicalDeviceMgr.deleteAllUNILogicalPorts(ctx, curr); err != nil {
 		return err
 	}
 	return nil
@@ -1325,7 +1330,7 @@
 }
 
 // SetupUNILogicalPorts creates UNI ports on the logical device that represents a child UNI interface
-func (dMgr *DeviceManager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	log.Info("addUNILogicalPort")
 	if err := dMgr.logicalDeviceMgr.setupUNILogicalPorts(ctx, cDevice); err != nil {
 		log.Warnw("addUNILogicalPort-error", log.Fields{"device": cDevice, "err": err})
@@ -1438,22 +1443,15 @@
 	return nil, status.Errorf(codes.NotFound, "%s", deviceID)
 }
 
-// SetAdminStateToEnable sets admin state of device to enabled
-func (dMgr *DeviceManager) SetAdminStateToEnable(ctx context.Context, cDevice *voltha.Device) error {
-	log.Info("SetAdminStateToEnable")
-	if agent := dMgr.getDeviceAgent(ctx, cDevice.Id); agent != nil {
-		return agent.updateAdminState(ctx, voltha.AdminState_ENABLED)
-	}
-	return status.Errorf(codes.NotFound, "%s", cDevice.Id)
-}
-
-// NotifyInvalidTransition notifies about invalid transition
-func (dMgr *DeviceManager) NotifyInvalidTransition(ctx context.Context, pcDevice *voltha.Device) error {
+func (dMgr *DeviceManager) NotifyInvalidTransition(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	log.Errorw("NotifyInvalidTransition", log.Fields{
-		"device":     pcDevice.Id,
-		"adminState": pcDevice.AdminState,
-		"operState":  pcDevice.OperStatus,
-		"connState":  pcDevice.ConnectStatus,
+		"device":           cDevice.Id,
+		"prev-admin-state": pDevice.AdminState,
+		"prev-oper-state":  pDevice.OperStatus,
+		"prev-conn-state":  pDevice.ConnectStatus,
+		"curr-admin-state": cDevice.AdminState,
+		"curr-oper-state":  cDevice.OperStatus,
+		"curr-conn-state":  cDevice.ConnectStatus,
 	})
 	//TODO: notify over kafka?
 	return nil
@@ -1488,7 +1486,7 @@
 	var res interface{}
 	if agent := dMgr.getDeviceAgent(ctx, simulatereq.Id); agent != nil {
 		res = agent.simulateAlarm(ctx, simulatereq)
-		log.Debugw("SimulateAlarm-result", log.Fields{"result": res})
+		log.Debugw("simulateAlarm-result", log.Fields{"result": res})
 	}
 	//TODO CLI always get successful response
 	sendResponse(ctx, ch, res)
@@ -1528,11 +1526,11 @@
 	sendResponse(ctx, ch, res)
 }
 
-// ChildDeviceLost  calls parent adapter to delete child device and all its references
-func (dMgr *DeviceManager) ChildDeviceLost(ctx context.Context, cDevice *voltha.Device) error {
-	log.Debugw("ChildDeviceLost", log.Fields{"deviceid": cDevice.Id})
-	if parentAgent := dMgr.getDeviceAgent(ctx, cDevice.ParentId); parentAgent != nil {
-		return parentAgent.ChildDeviceLost(ctx, cDevice)
+// ChildDeviceLost calls parent adapter to delete child device and all its references
+func (dMgr *DeviceManager) ChildDeviceLost(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error {
+	log.Debugw("childDeviceLost", log.Fields{"device-id": curr.Id})
+	if parentAgent := dMgr.getDeviceAgent(ctx, curr.ParentId); parentAgent != nil {
+		return parentAgent.ChildDeviceLost(ctx, curr)
 	}
-	return status.Errorf(codes.NotFound, "%s", cDevice.Id)
+	return status.Errorf(codes.NotFound, "%s", curr.Id)
 }
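
Note on the handler signature change above: TransitionHandler now receives both the current and the previous device, which is what lets callbacks such as NotifyInvalidTransition report both ends of a transition, and processTransition runs the matched handlers in order, stopping at the first error. A small hypothetical sketch of that invocation shape follows; the device type and handler names are illustrative stand-ins, not the Core's real types.

// Hypothetical sketch of running a transition's handlers in order and stopping
// at the first failure, mirroring the loop in processTransition above.
package main

import (
	"context"
	"errors"
	"fmt"
)

// device is a stand-in for *voltha.Device.
type device struct {
	ID         string
	AdminState string
}

type transitionHandler func(ctx context.Context, curr *device, prev *device) error

func deleteLogicalPorts(ctx context.Context, curr *device, prev *device) error {
	fmt.Println("deleting logical ports for", curr.ID)
	return nil
}

func runPostDeviceDelete(ctx context.Context, curr *device, prev *device) error {
	return errors.New("simulated failure")
}

func processTransition(ctx context.Context, curr, prev *device, handlers []transitionHandler) error {
	for _, handler := range handlers {
		if err := handler(ctx, curr, prev); err != nil {
			return fmt.Errorf("handler failed for %s (prev admin %s, curr admin %s): %w",
				curr.ID, prev.AdminState, curr.AdminState, err)
		}
	}
	return nil
}

func main() {
	prev := &device{ID: "onu-1", AdminState: "ENABLED"}
	curr := &device{ID: "onu-1", AdminState: "DELETED"}
	handlers := []transitionHandler{deleteLogicalPorts, runPostDeviceDelete}
	fmt.Println(processTransition(context.Background(), curr, prev, handlers))
}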
diff --git a/rw_core/core/device_state_transitions.go b/rw_core/core/device_state_transitions.go
index 9b4acf3..69dfd78 100644
--- a/rw_core/core/device_state_transitions.go
+++ b/rw_core/core/device_state_transitions.go
@@ -32,6 +32,35 @@
 	any    DeviceType = 2
 )
 
+type MatchResult uint8
+
+const (
+	noMatch            MatchResult = iota // current state has no match in the transition table
+	currWildcardMatch                     // current state matches the wildcard *_UNKNOWN state in the transition table
+	currStateOnlyMatch                    // current state matches the transition table entry while the previous state only matches its wildcard
+	currPrevStateMatch                    // both current and previous states match in the transition table
+)
+
+// match is used to keep the current match states
+type match struct {
+	admin, oper, conn MatchResult
+}
+
+// toInt returns an integer representing the matching level of the match (the larger the number the better)
+func (m *match) toInt() int {
+	return int(m.admin<<4 | m.oper<<2 | m.conn)
+}
+
+// isExactMatch returns true if match is an exact match
+func (m *match) isExactMatch() bool {
+	return m.admin == currPrevStateMatch && m.oper == currPrevStateMatch && m.conn == currPrevStateMatch
+}
+
+// isBetterMatch returns true if this match is better than newMatch
+func (m *match) isBetterMatch(newMatch *match) bool {
+	return m.toInt() > newMatch.toInt()
+}
+
 // DeviceState has admin, operational and connection status of device
 type DeviceState struct {
 	Admin       voltha.AdminState_Types
@@ -39,8 +68,8 @@
 	Operational voltha.OperStatus_Types
 }
 
-// TransitionHandler function type which takes device as input parameter
-type TransitionHandler func(context.Context, *voltha.Device) error
+// TransitionHandler function type which takes the current and previous device info as input parameter
+type TransitionHandler func(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
 
 // Transition represent transition related attributes
 type Transition struct {
@@ -61,7 +90,8 @@
 	var transitionMap TransitionMap
 	transitionMap.dMgr = dMgr
 	transitionMap.transitions = make([]Transition, 0)
-	transitionMap.transitions = append(transitionMap.transitions,
+	transitionMap.transitions = append(
+		transitionMap.transitions,
 		Transition{
 			deviceType:    parent,
 			previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
@@ -71,12 +101,24 @@
 		Transition{
 			deviceType:    child,
 			previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_DISCOVERED},
+			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
+			handlers:      []TransitionHandler{}})
+	transitionMap.transitions = append(transitionMap.transitions,
+		Transition{
+			deviceType:    child,
+			previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_DISCOVERED},
 			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
 			handlers:      []TransitionHandler{dMgr.SetupUNILogicalPorts}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    child,
 			previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
+			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_DISCOVERED},
+			handlers:      []TransitionHandler{}})
+	transitionMap.transitions = append(transitionMap.transitions,
+		Transition{
+			deviceType:    child,
+			previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
 			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
 			handlers:      []TransitionHandler{dMgr.SetupUNILogicalPorts}})
 	transitionMap.transitions = append(transitionMap.transitions,
@@ -88,13 +130,13 @@
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    parent,
-			previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+			previousState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_DELETED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			handlers:      []TransitionHandler{dMgr.DisableAllChildDevices, dMgr.DeleteAllUNILogicalPorts, dMgr.DeleteAllChildDevices, dMgr.DeleteLogicalDevice, dMgr.RunPostDeviceDelete}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    child,
-			previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+			previousState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_DELETED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			handlers:      []TransitionHandler{dMgr.ChildDeviceLost, dMgr.DeleteLogicalPorts, dMgr.RunPostDeviceDelete}})
 	transitionMap.transitions = append(transitionMap.transitions,
@@ -178,37 +220,57 @@
 }
 
 // isMatched matches a state transition.  It returns whether there is a match and if there is whether it is an exact match
-func getHandler(previous *DeviceState, current *DeviceState, transition *Transition) ([]TransitionHandler, bool) {
-
+func getHandler(previous *DeviceState, current *DeviceState, transition *Transition) ([]TransitionHandler, *match) {
+	m := &match{}
 	// Do we have an exact match?
 	if *previous == transition.previousState && *current == transition.currentState {
-		return transition.handlers, true
+		return transition.handlers, &match{admin: currPrevStateMatch, oper: currPrevStateMatch, conn: currPrevStateMatch}
 	}
 
-	// Admin states must match
-	if previous.Admin != transition.previousState.Admin || current.Admin != transition.currentState.Admin {
-		return nil, false
+	// Do we have Admin state match?
+	if current.Admin == transition.currentState.Admin && transition.currentState.Admin != voltha.AdminState_UNKNOWN {
+		if previous.Admin == transition.previousState.Admin {
+			m.admin = currPrevStateMatch
+		} else if transition.previousState.Admin == voltha.AdminState_UNKNOWN {
+			m.admin = currStateOnlyMatch
+		}
+	} else if current.Admin == transition.currentState.Admin && transition.currentState.Admin == voltha.AdminState_UNKNOWN {
+		if previous.Admin == transition.previousState.Admin || transition.previousState.Admin == voltha.AdminState_UNKNOWN {
+			m.admin = currWildcardMatch
+		}
+	}
+	if m.admin == noMatch {
+		// invalid transition - need to match on current admin state
+		return nil, m
 	}
 
-	// If the admin state changed then prioritize it first
-	if previous.Admin != current.Admin {
-		if previous.Admin == transition.previousState.Admin && current.Admin == transition.currentState.Admin {
-			return transition.handlers, false
+	// Do we have an operational state match?
+	if current.Operational == transition.currentState.Operational && transition.previousState.Operational != voltha.OperStatus_UNKNOWN {
+		if previous.Operational == transition.previousState.Operational || transition.previousState.Operational == voltha.OperStatus_UNKNOWN {
+			m.oper = currPrevStateMatch
+		} else {
+			m.oper = currStateOnlyMatch
+		}
+	} else if current.Operational == transition.currentState.Operational && transition.previousState.Operational == voltha.OperStatus_UNKNOWN {
+		if previous.Operational == transition.previousState.Operational || transition.previousState.Operational == voltha.OperStatus_UNKNOWN {
+			m.oper = currWildcardMatch
 		}
 	}
-	// If the operational state changed then prioritize it in second position
-	if previous.Operational != current.Operational {
-		if previous.Operational == transition.previousState.Operational && current.Operational == transition.currentState.Operational {
-			return transition.handlers, false
+
+	// Do we have a connection state match?
+	if current.Connection == transition.currentState.Connection && transition.previousState.Connection != voltha.ConnectStatus_UNKNOWN {
+		if previous.Connection == transition.previousState.Connection || transition.previousState.Connection == voltha.ConnectStatus_UNKNOWN {
+			m.conn = currPrevStateMatch
+		} else {
+			m.conn = currStateOnlyMatch
+		}
+	} else if current.Connection == transition.currentState.Connection && transition.previousState.Connection == voltha.ConnectStatus_UNKNOWN {
+		if previous.Connection == transition.previousState.Connection || transition.previousState.Connection == voltha.ConnectStatus_UNKNOWN {
+			m.conn = currWildcardMatch
 		}
 	}
-	// If the connection state changed then prioritize it in third position
-	if previous.Connection != current.Connection {
-		if previous.Connection == transition.previousState.Connection && current.Connection == transition.currentState.Connection {
-			return transition.handlers, false
-		}
-	}
-	return nil, false
+
+	return transition.handlers, m
 }
 
 // GetTransitionHandler returns transition handler
@@ -216,6 +278,12 @@
 	//1. Get the previous and current set of states
 	pState := getDeviceStates(pDevice)
 	cState := getDeviceStates(cDevice)
+
+	// Do nothing if there is no state change
+	if *pState == *cState {
+		return nil
+	}
+
 	//log.Infow("DeviceType", log.Fields{"device": pDevice})
 	deviceType := parent
 	if !pDevice.Root {
@@ -227,22 +295,20 @@
 	//2. Go over transition array to get the right transition
 	var currentMatch []TransitionHandler
 	var tempHandler []TransitionHandler
-	var exactStateMatch bool
-	var stateMatchFound bool
+	var m *match
+	bestMatch := &match{}
 	for _, aTransition := range tMap.transitions {
 		// consider transition only if it matches deviceType or is a wild card - any
 		if aTransition.deviceType != deviceType && aTransition.deviceType != any {
 			continue
 		}
-		tempHandler, exactStateMatch = getHandler(pState, cState, &aTransition)
+		tempHandler, m = getHandler(pState, cState, &aTransition)
 		if tempHandler != nil {
-			if exactStateMatch && aTransition.deviceType == deviceType {
+			if m.isExactMatch() && aTransition.deviceType == deviceType {
 				return tempHandler
-			} else if exactStateMatch {
+			} else if m.isExactMatch() || m.isBetterMatch(bestMatch) {
 				currentMatch = tempHandler
-				stateMatchFound = true
-			} else if currentMatch == nil && !stateMatchFound {
-				currentMatch = tempHandler
+				bestMatch = m
 			}
 		}
 	}
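
To make the new match scoring concrete: toInt() weighs the admin comparison highest (shifted left by 4 bits), then the operational comparison (shifted by 2), then the connection comparison, so a transition that matches the admin states exactly always outranks one that only matches better on operational or connection state. The standalone snippet below copies the scoring for illustration only.

// Standalone copy of the match scoring above, for illustration only.
package main

import "fmt"

type matchResult uint8

const (
	noMatch matchResult = iota
	currWildcardMatch
	currStateOnlyMatch
	currPrevStateMatch
)

type match struct {
	admin, oper, conn matchResult
}

func (m *match) toInt() int {
	return int(m.admin<<4 | m.oper<<2 | m.conn)
}

func main() {
	exact := &match{admin: currPrevStateMatch, oper: currPrevStateMatch, conn: currPrevStateMatch}
	adminOnly := &match{admin: currPrevStateMatch, oper: currWildcardMatch, conn: currWildcardMatch}
	operConn := &match{admin: currStateOnlyMatch, oper: currPrevStateMatch, conn: currPrevStateMatch}

	// Prints 63, 53 and 47: an exact admin+oper+conn match wins, and a full
	// admin match still beats a transition that matches better on oper/conn.
	fmt.Println(exact.toInt(), adminOnly.toInt(), operConn.toInt())
}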
diff --git a/rw_core/core/device_state_transitions_test.go b/rw_core/core/device_state_transitions_test.go
index 15d72f3..8eecd70 100644
--- a/rw_core/core/device_state_transitions_test.go
+++ b/rw_core/core/device_state_transitions_test.go
@@ -16,6 +16,7 @@
 package core
 
 import (
 	"reflect"
 	"testing"
 
@@ -169,83 +170,69 @@
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
-	from = getDevice(true, voltha.AdminState_DISABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN)
-	to = getDevice(true, voltha.AdminState_DELETED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN)
-	handlers = transitionMap.GetTransitionHandler(from, to)
-	assert.Equal(t, 5, len(handlers))
-	assert.True(t, reflect.ValueOf(tdm.DisableAllChildDevices).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteAllUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteAllChildDevices).Pointer() == reflect.ValueOf(handlers[2]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteLogicalDevice).Pointer() == reflect.ValueOf(handlers[3]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[4]).Pointer())
+	var deleteDeviceTest = struct {
+		from                   []*voltha.Device
+		to                     []*voltha.Device
+		expectedParentHandlers []TransitionHandler
+		expectedChildHandlers  []TransitionHandler
+	}{
+		from: []*voltha.Device{
+			getDevice(false, voltha.AdminState_DISABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_FAILED),
+			getDevice(false, voltha.AdminState_UNKNOWN, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN),
+			getDevice(false, voltha.AdminState_DOWNLOADING_IMAGE, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN),
+			getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN),
+			getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE),
+			getDevice(false, voltha.AdminState_DISABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_UNKNOWN),
+			getDevice(false, voltha.AdminState_DISABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN),
+		},
+		to: []*voltha.Device{
+			getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN),
+			getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN),
+			getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_FAILED),
+		},
+		expectedParentHandlers: []TransitionHandler{
+			tdm.DisableAllChildDevices,
+			tdm.DeleteAllUNILogicalPorts,
+			tdm.DeleteAllChildDevices,
+			tdm.DeleteLogicalDevice,
+			tdm.RunPostDeviceDelete,
+		},
+		expectedChildHandlers: []TransitionHandler{
+			tdm.ChildDeviceLost,
+			tdm.DeleteLogicalPorts,
+			tdm.RunPostDeviceDelete,
+		},
+	}
 
-	from = getDevice(true, voltha.AdminState_DISABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_UNKNOWN)
-	to = getDevice(true, voltha.AdminState_DELETED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
-	handlers = transitionMap.GetTransitionHandler(from, to)
-	assert.Equal(t, 5, len(handlers))
-	assert.True(t, reflect.ValueOf(tdm.DisableAllChildDevices).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteAllUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteAllChildDevices).Pointer() == reflect.ValueOf(handlers[2]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteLogicalDevice).Pointer() == reflect.ValueOf(handlers[3]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[4]).Pointer())
+	testName := "delete-parent-device-post-provisioning"
+	for _, from := range deleteDeviceTest.from {
+		from.Root = true
+		for _, to := range deleteDeviceTest.to {
+			to.Root = true
+			t.Run(testName, func(t *testing.T) {
+				handlers = transitionMap.GetTransitionHandler(from, to)
+				assert.Equal(t, 5, len(handlers))
+				for idx, expHandler := range deleteDeviceTest.expectedParentHandlers {
+					assert.True(t, reflect.ValueOf(expHandler).Pointer() == reflect.ValueOf(handlers[idx]).Pointer())
+				}
+			})
+		}
+	}
 
-	from = getDevice(true, voltha.AdminState_DISABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
-	to = getDevice(true, voltha.AdminState_DELETED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_FAILED)
-	handlers = transitionMap.GetTransitionHandler(from, to)
-	assert.Equal(t, 5, len(handlers))
-	assert.True(t, reflect.ValueOf(tdm.DisableAllChildDevices).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteAllUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteAllChildDevices).Pointer() == reflect.ValueOf(handlers[2]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteLogicalDevice).Pointer() == reflect.ValueOf(handlers[3]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[4]).Pointer())
-
-	from = getDevice(false, voltha.AdminState_DISABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN)
-	to = getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN)
-	handlers = transitionMap.GetTransitionHandler(from, to)
-	assert.Equal(t, 3, len(handlers))
-	assert.True(t, reflect.ValueOf(tdm.ChildDeviceLost).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteLogicalPorts).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[2]).Pointer())
-
-	from = getDevice(false, voltha.AdminState_DISABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_UNKNOWN)
-	to = getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
-	handlers = transitionMap.GetTransitionHandler(from, to)
-	assert.Equal(t, 3, len(handlers))
-	assert.True(t, reflect.ValueOf(tdm.ChildDeviceLost).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteLogicalPorts).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[2]).Pointer())
-
-	from = getDevice(false, voltha.AdminState_DISABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
-	to = getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_FAILED)
-	handlers = transitionMap.GetTransitionHandler(from, to)
-	assert.Equal(t, 3, len(handlers))
-	assert.True(t, reflect.ValueOf(tdm.ChildDeviceLost).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteLogicalPorts).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[2]).Pointer())
-
-	from = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN)
-	to = getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN)
-	handlers = transitionMap.GetTransitionHandler(from, to)
-	assert.Equal(t, 3, len(handlers))
-	assert.True(t, reflect.ValueOf(tdm.ChildDeviceLost).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteLogicalPorts).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[2]).Pointer())
-
-	from = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_UNKNOWN)
-	to = getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
-	handlers = transitionMap.GetTransitionHandler(from, to)
-	assert.Equal(t, 3, len(handlers))
-	assert.True(t, reflect.ValueOf(tdm.ChildDeviceLost).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteLogicalPorts).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[2]).Pointer())
-
-	from = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
-	to = getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_FAILED)
-	handlers = transitionMap.GetTransitionHandler(from, to)
-	assert.Equal(t, 3, len(handlers))
-	assert.True(t, reflect.ValueOf(tdm.ChildDeviceLost).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.DeleteLogicalPorts).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
-	assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[2]).Pointer())
+	testName = "delete-child-device"
+	for _, from := range deleteDeviceTest.from {
+		from.Root = false
+		for _, to := range deleteDeviceTest.to {
+			to.Root = false
+			t.Run(testName, func(t *testing.T) {
+				handlers = transitionMap.GetTransitionHandler(from, to)
+				assert.Equal(t, 3, len(handlers))
+				for idx, expHandler := range deleteDeviceTest.expectedChildHandlers {
+					assert.True(t, reflect.ValueOf(expHandler).Pointer() == reflect.ValueOf(handlers[idx]).Pointer())
+				}
+			})
+		}
+	}
 }
 
 func TestInvalidTransitions(t *testing.T) {
@@ -303,3 +290,9 @@
 	to = getDevice(false, voltha.AdminState_PREPROVISIONED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVATING)
 	assertNoOpTransition(t, from, to)
 }
+
+func TestMatch(t *testing.T) {
+	best := &match{admin: currPrevStateMatch, oper: currPrevStateMatch, conn: currPrevStateMatch}
+	m := &match{admin: currStateOnlyMatch, oper: currWildcardMatch, conn: currWildcardMatch}
+	assert.False(t, m.isBetterMatch(best))
+	assert.True(t, best.toInt() > m.toInt())
+}
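The table-driven checks above assume the transition map always returns the most specific handler list when several transitions overlap, and TestMatch exercises the scoring type behind that choice. The following is a minimal, self-contained sketch of how such a match score could be packed and compared; the constant names echo the test, but the actual ordering and weighting in the Core may differ.

package main

import "fmt"

type matchState int

const (
	noMatch            matchState = iota // this dimension did not match at all
	currWildcardMatch                    // the current state matched a wildcard entry
	currStateOnlyMatch                   // only the current state matched
	currPrevStateMatch                   // both the current and previous states matched
)

type match struct {
	admin, oper, conn matchState
}

// toInt packs the three per-dimension scores so that admin outranks oper,
// which outranks conn; higher means a more specific transition match.
func (m match) toInt() int {
	return int(m.admin)<<8 | int(m.oper)<<4 | int(m.conn)
}

// isBetterMatch reports whether m is strictly more specific than other.
func (m match) isBetterMatch(other match) bool {
	return m.toInt() > other.toInt()
}

func main() {
	best := match{admin: currPrevStateMatch, oper: currPrevStateMatch, conn: currPrevStateMatch}
	weaker := match{admin: currStateOnlyMatch, oper: currWildcardMatch, conn: currWildcardMatch}
	fmt.Println(weaker.isBetterMatch(best)) // false: the exact current+previous match wins
}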
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index eaf8fac..0196f35 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -23,6 +23,7 @@
 	"errors"
 	"io"
 	"sync"
+	"time"
 
 	"github.com/golang/protobuf/ptypes/empty"
 	da "github.com/opencord/voltha-go/common/core/northbound/grpc"
@@ -58,8 +59,8 @@
 	packetInQueueDone         chan bool
 	changeEventQueueDone      chan bool
 	coreInCompetingMode       bool
-	longRunningRequestTimeout int64
-	defaultRequestTimeout     int64
+	longRunningRequestTimeout time.Duration
+	defaultRequestTimeout     time.Duration
 	da.DefaultAPIHandler
 	core *Core
 }
@@ -138,7 +139,7 @@
 // timeout value (in the event this Core dies the transaction times out in the dB causing the other Core in the
 // core-pair to proceed with it).  If the device is not owned then this Core will just monitor the transaction
 // for potential timeouts.
-func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
+func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...time.Duration) (*KVTransaction, error) {
 	timeout := handler.defaultRequestTimeout
 	if len(maxTimeout) > 0 {
 		timeout = maxTimeout[0]
@@ -154,9 +155,9 @@
 			log.Warnw("getting-ownership-failed", log.Fields{"deviceId": id, "error": err})
 			return nil, errorIDNotFound
 		}
-		acquired, err = txn.Acquired(ctx, timeout, ownedByMe)
+		acquired, err = txn.Acquired(ctx, timeout.Milliseconds(), ownedByMe)
 	} else {
-		acquired, err = txn.Acquired(ctx, timeout)
+		acquired, err = txn.Acquired(ctx, timeout.Milliseconds())
 	}
 	if err == nil && acquired {
 		log.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnID})
@@ -366,7 +367,7 @@
 		if handler.isOFControllerRequest(ctx) {
 			//	Since an OF controller is only interested in the set of logical devices managed by this Core, return
 			//	only the logical devices managed/monitored by this Core.
-			return handler.logicalDeviceMgr.listManagedLogicalDevices()
+			return handler.logicalDeviceMgr.listManagedLogicalDevices(ctx)
 		}
 	}
 	return handler.logicalDeviceMgr.listLogicalDevices(ctx)
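takeRequestOwnership now keeps its timeout as a time.Duration and only converts to milliseconds at the transaction boundary, with an optional variadic override for long-running requests. A small sketch of that shape follows; acquire below is a stand-in for the KV transaction call, not the real signature.

package main

import (
	"fmt"
	"time"
)

// acquire stands in for a legacy call that still expects a timeout in milliseconds.
func acquire(timeoutMs int64) bool {
	return timeoutMs > 0
}

// pickTimeout mirrors the variadic-override pattern: use the caller-supplied
// timeout when one is given, otherwise fall back to the configured default.
func pickTimeout(defaultTimeout time.Duration, maxTimeout ...time.Duration) time.Duration {
	if len(maxTimeout) > 0 {
		return maxTimeout[0]
	}
	return defaultTimeout
}

func main() {
	timeout := pickTimeout(1000*time.Millisecond, 2*time.Second)
	// Durations stay typed inside the Core; convert only at the boundary.
	fmt.Println("acquired:", acquire(timeout.Milliseconds()), "timeout:", timeout)
}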
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/grpc_nbi_api_handler_test.go
index 1b956d2..662302c 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_test.go
@@ -83,8 +83,8 @@
 	defer cancel()
 	cfg := config.NewRWCoreFlags()
 	cfg.CorePairTopic = "rw_core"
-	cfg.DefaultRequestTimeout = nb.defaultTimeout.Nanoseconds() / 1000000 //TODO: change when Core changes to Duration
-	cfg.DefaultCoreTimeout = nb.defaultTimeout.Nanoseconds() / 1000000
+	cfg.DefaultRequestTimeout = nb.defaultTimeout
+	cfg.DefaultCoreTimeout = nb.defaultTimeout
 	cfg.KVStorePort = nb.kvClientPort
 	cfg.InCompetingMode = inCompeteMode
 	grpcPort, err := freeport.GetFreePort()
@@ -287,9 +287,9 @@
 	assert.NotNil(t, devices)
 	assert.Equal(t, 0, len(devices.Items))
 	adapters, err := nbi.ListAdapters(getContext(), &empty.Empty{})
+	assert.Equal(t, 0, len(adapters.Items))
 	assert.Nil(t, err)
 	assert.NotNil(t, adapters)
-	assert.Equal(t, 0, len(adapters.Items))
 }
 
 func (nb *NBTest) testAdapterRegistration(t *testing.T, nbi *APIHandler) {
@@ -362,7 +362,7 @@
 	var vFunction isDevicesConditionSatisfied = func(devices *voltha.Devices) bool {
 		return devices != nil && len(devices.Items) == 0
 	}
-	err = waitUntilConditionForDevices(5*time.Second, nbi, vFunction)
+	err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vFunction)
 	assert.Nil(t, err)
 }
 
@@ -385,7 +385,7 @@
 	var vdFunction isDevicesConditionSatisfied = func(devices *voltha.Devices) bool {
 		return devices != nil && len(devices.Items) == 0
 	}
-	err = waitUntilConditionForDevices(5*time.Second, nbi, vdFunction)
+	err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vdFunction)
 	assert.Nil(t, err)
 
 	// Create a logical device monitor will automatically send trap and eapol flows to the devices being enables
@@ -884,6 +884,7 @@
 
 	// Listen for port events
 	processedLogicalPorts := 0
+	start := time.Now()
 	for event := range nbi.changeEventQueue {
 		startingVlan++
 		if portStatus, ok := (event.Event).(*ofp.ChangeEvent_PortStatus); ok {
@@ -896,6 +897,7 @@
 			}
 		}
 		if processedLogicalPorts >= numNNIPorts+numUNIPorts {
+			fmt.Println("Total time to send all flows:", time.Since(start))
 			break
 		}
 	}
@@ -930,6 +932,8 @@
 	}
 	defer pprof.StopCPUProfile()
 
+	//log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
+
 	nb := newNBTest()
 	assert.NotNil(t, nb)
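The tests above now wait on nb.maxTimeout rather than a hard-coded 5 seconds when polling for a device condition. Below is a rough sketch of that poll-until-condition-or-timeout pattern; the real waitUntilConditionForDevices helper queries the NB handler, whereas this version just takes a predicate.

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitUntilCondition polls the predicate until it returns true or the timeout
// expires; this is only a sketch of the pattern the NB tests rely on.
func waitUntilCondition(timeout time.Duration, condition func() bool) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if condition() {
			return nil
		}
		time.Sleep(10 * time.Millisecond)
	}
	return errors.New("timeout waiting for condition")
}

func main() {
	start := time.Now()
	err := waitUntilCondition(time.Second, func() bool {
		return time.Since(start) > 50*time.Millisecond
	})
	fmt.Println("condition met:", err == nil, "after", time.Since(start))
}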
 
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index dfc3907..4e029c8 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -21,14 +21,10 @@
 	"encoding/hex"
 	"errors"
 	"fmt"
-	"github.com/opencord/voltha-go/rw_core/route"
-	"reflect"
-	"sync"
-	"time"
-
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/db/model"
 	fd "github.com/opencord/voltha-go/rw_core/flowdecomposition"
+	"github.com/opencord/voltha-go/rw_core/route"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
 	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -37,6 +33,13 @@
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"reflect"
+	"sync"
+	"time"
+)
+
+const (
+	maxOrderedLogicalDeviceRequestQueueSize = 1000
 )
 
 // LogicalDeviceAgent represents the attributes of a logical device agent
@@ -53,19 +56,20 @@
 	meterProxy         *model.Proxy
 	ldProxy            *model.Proxy
 	portProxies        map[string]*model.Proxy
-	portProxiesLock    sync.RWMutex
-	lockLogicalDevice  sync.RWMutex
 	lockDeviceRoutes   sync.RWMutex
 	logicalPortsNo     map[uint32]bool //value is true for NNI port
 	lockLogicalPortsNo sync.RWMutex
 	flowDecomposer     *fd.FlowDecomposer
-	defaultTimeout     int64
+	defaultTimeout     time.Duration
 	logicalDevice      *voltha.LogicalDevice
+	requestQueue       *coreutils.RequestQueue
+	startOnce          sync.Once
+	stopOnce           sync.Once
 }
 
 func newLogicalDeviceAgent(id string, deviceID string, ldeviceMgr *LogicalDeviceManager,
 	deviceMgr *DeviceManager,
-	cdProxy *model.Proxy, timeout int64) *LogicalDeviceAgent {
+	cdProxy *model.Proxy, timeout time.Duration) *LogicalDeviceAgent {
 	var agent LogicalDeviceAgent
 	agent.exitChannel = make(chan int, 1)
 	agent.logicalDeviceID = id
@@ -74,27 +78,40 @@
 	agent.clusterDataProxy = cdProxy
 	agent.ldeviceMgr = ldeviceMgr
 	agent.flowDecomposer = fd.NewFlowDecomposer(agent.deviceMgr)
-	agent.lockLogicalDevice = sync.RWMutex{}
 	agent.portProxies = make(map[string]*model.Proxy)
-	agent.portProxiesLock = sync.RWMutex{}
-	agent.lockLogicalPortsNo = sync.RWMutex{}
-	agent.lockDeviceRoutes = sync.RWMutex{}
 	agent.logicalPortsNo = make(map[uint32]bool)
 	agent.defaultTimeout = timeout
+	agent.requestQueue = coreutils.NewRequestQueue(agent.logicalDeviceID, maxOrderedLogicalDeviceRequestQueueSize)
 	return &agent
 }
 
 // start creates the logical device and add it to the data model
-func (agent *LogicalDeviceAgent) start(ctx context.Context, loadFromdB bool) error {
-	log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceID, "loadFromdB": loadFromdB})
+func (agent *LogicalDeviceAgent) start(ctx context.Context, loadFromDB bool) error {
+	needToStart := false
+	if agent.startOnce.Do(func() { needToStart = true }); !needToStart {
+		return nil
+	}
+
+	log.Infow("starting-logical_device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": loadFromDB})
+
+	var startSucceeded bool
+	defer func() {
+		if !startSucceeded {
+			if err := agent.stop(ctx); err != nil {
+				log.Errorw("failed-to-cleanup-after-unsuccessful-start", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+			}
+		}
+	}()
+
+	// Launch the request queue - it will launch a go routine
+	agent.requestQueue.Start()
+
 	var ld *voltha.LogicalDevice
-	var err error
-	if !loadFromdB {
+	if !loadFromDB {
 		//Build the logical device based on information retrieved from the device adapter
 		var switchCap *ic.SwitchCapability
 		var err error
 		if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceID); err != nil {
-			log.Errorw("error-creating-logical-device", log.Fields{"error": err})
 			return err
 		}
 		ld = &voltha.LogicalDevice{Id: agent.logicalDeviceID, RootDeviceId: agent.rootDeviceID}
@@ -102,7 +119,6 @@
 		// Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
 		var datapathID uint64
 		if datapathID, err = CreateDataPathID(agent.logicalDeviceID); err != nil {
-			log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
 			return err
 		}
 		ld.DatapathId = datapathID
@@ -113,42 +129,35 @@
 		ld.FlowGroups = &ofp.FlowGroups{Items: nil}
 		ld.Ports = []*voltha.LogicalPort{}
 
-		agent.lockLogicalDevice.Lock()
 		// Save the logical device
 		added, err := agent.clusterDataProxy.AddWithID(ctx, "/logical_devices", ld.Id, ld, "")
 		if err != nil {
-			log.Errorw("failed-to-save-logical-devices-to-cluster-proxy", log.Fields{"error": err})
-			agent.lockLogicalDevice.Unlock()
 			return err
 		}
 		if added == nil {
-			log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+			log.Errorw("failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
 		} else {
-			log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+			log.Debugw("logicaldevice-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
 		}
 
 		agent.logicalDevice = proto.Clone(ld).(*voltha.LogicalDevice)
-		agent.lockLogicalDevice.Unlock()
 
-		// TODO:  Set the logical ports in a separate call once the port update issue is fixed.
+		// Set up the logical ports - internal processing, no need to propagate the client context
 		go func() {
-			err := agent.setupLogicalPorts(ctx)
+			err := agent.setupLogicalPorts(context.Background())
 			if err != nil {
 				log.Errorw("unable-to-setup-logical-ports", log.Fields{"error": err})
 			}
 		}()
-
 	} else {
 		//	load from DB - the logical device may not exist at this time.  On error, just return and the calling function
 		// will destroy this agent.
-		agent.lockLogicalDevice.Lock()
 		logicalDevice, err := agent.clusterDataProxy.Get(ctx, "/logical_devices/"+agent.logicalDeviceID, 0, true, "")
 		if err != nil {
-			return status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceID)
+			return err
 		}
 		ld, ok := logicalDevice.(*voltha.LogicalDevice)
 		if !ok {
-			agent.lockLogicalDevice.Unlock()
 			return status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceID)
 		}
 		// Update the root device Id
@@ -157,20 +166,16 @@
 		// Update the last data
 		agent.logicalDevice = proto.Clone(ld).(*voltha.LogicalDevice)
 
-		agent.lockLogicalDevice.Unlock()
-
 		// Setup the local list of logical ports
 		agent.addLogicalPortsToMap(ld.Ports)
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
 
+	var err error
 	agent.flowProxy, err = agent.clusterDataProxy.CreateProxy(
 		ctx,
 		fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceID),
 		false)
 	if err != nil {
-		log.Errorw("failed-to-create-flow-proxy", log.Fields{"error": err})
 		return err
 	}
 	agent.meterProxy, err = agent.clusterDataProxy.CreateProxy(
@@ -201,89 +206,118 @@
 	if agent.ldProxy != nil {
 		agent.ldProxy.RegisterCallback(model.PostUpdate, agent.portUpdated)
 	} else {
-		log.Errorw("logical-device-proxy-null", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 		return status.Error(codes.Internal, "logical-device-proxy-null")
 	}
 
 	// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
-	if loadFromdB {
+	if loadFromDB {
 		go func() {
 			if err := agent.buildRoutes(context.Background()); err != nil {
-				log.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+				log.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
 			}
 		}()
 	}
+	startSucceeded = true
+
 	return nil
 }
 
-// stop stops the logical devuce agent.  This removes the logical device from the data model.
+// stop stops the logical device agent.  This removes the logical device from the data model.
 func (agent *LogicalDeviceAgent) stop(ctx context.Context) error {
-	log.Info("stopping-logical_device-agent")
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	var returnErr error
+	agent.stopOnce.Do(func() {
+		log.Info("stopping-logical_device-agent")
 
-	//Remove the logical device from the model
-	if removed, err := agent.clusterDataProxy.Remove(ctx, "/logical_devices/"+agent.logicalDeviceID, ""); err != nil {
-		log.Errorw("failed-to-remove-device", log.Fields{"error": err})
-		return err
-	} else if removed == nil {
-		log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
-	} else {
-		log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
-	}
-	agent.exitChannel <- 1
-	log.Info("logical_device-agent-stopped")
-	return nil
+		if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+			// This should never happen - an error is returned only if the agent is stopped and an agent is only stopped once.
+			returnErr = err
+			return
+		}
+		defer agent.requestQueue.RequestComplete()
+
+		//Remove the logical device from the model
+		if removed, err := agent.clusterDataProxy.Remove(ctx, "/logical_devices/"+agent.logicalDeviceID, ""); err != nil {
+			returnErr = err
+		} else if removed == nil {
+			returnErr = status.Errorf(codes.Aborted, "failed-to-remove-logical-device-%s", agent.logicalDeviceID)
+		} else {
+			log.Debugw("logicaldevice-removed", log.Fields{"logical-device-id": agent.logicalDeviceID})
+		}
+
+		// Stop the request queue and request complete indication
+		agent.requestQueue.Stop()
+
+		close(agent.exitChannel)
+
+		log.Info("logical_device-agent-stopped")
+	})
+	return returnErr
 }
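start and stop above are now idempotent: startOnce lets only the first caller run the body (and tells it so), while stopOnce captures any error from the single teardown. A hedged sketch of that idiom in isolation, without the agent's real start/stop work:

package main

import (
	"fmt"
	"sync"
)

type onceAgent struct {
	startOnce sync.Once
	stopOnce  sync.Once
}

// start runs its body at most once; later callers return immediately.
func (a *onceAgent) start() error {
	needToStart := false
	if a.startOnce.Do(func() { needToStart = true }); !needToStart {
		return nil // someone else already started the agent
	}
	fmt.Println("agent started")
	return nil
}

// stop runs its teardown once and remembers the outcome for that first caller.
func (a *onceAgent) stop() error {
	var returnErr error
	a.stopOnce.Do(func() {
		// teardown work goes here; a failure would be recorded in returnErr
		fmt.Println("agent stopped")
	})
	return returnErr
}

func main() {
	a := &onceAgent{}
	_ = a.start()
	_ = a.start() // no-op
	_ = a.stop()
	_ = a.stop() // no-op
}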
 
 // GetLogicalDevice returns the latest logical device data
-func (agent *LogicalDeviceAgent) GetLogicalDevice() *voltha.LogicalDevice {
-	agent.lockLogicalDevice.RLock()
-	defer agent.lockLogicalDevice.RUnlock()
-
-	return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice)
+func (agent *LogicalDeviceAgent) GetLogicalDevice(ctx context.Context) (*voltha.LogicalDevice, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice), nil
 }
 
 // ListLogicalDeviceFlows returns logical device flows
-func (agent *LogicalDeviceAgent) ListLogicalDeviceFlows() *ofp.Flows {
+func (agent *LogicalDeviceAgent) ListLogicalDeviceFlows(ctx context.Context) (*ofp.Flows, error) {
 	log.Debug("ListLogicalDeviceFlows")
 
-	logicalDevice := agent.GetLogicalDevice()
-	if logicalDevice.Flows == nil {
-		return &ofp.Flows{}
+	logicalDevice, err := agent.GetLogicalDevice(ctx)
+	if err != nil {
+		return nil, err
 	}
-	return (proto.Clone(logicalDevice.Flows)).(*ofp.Flows)
+	if logicalDevice.Flows == nil {
+		return &ofp.Flows{}, nil
+	}
+	return (proto.Clone(logicalDevice.Flows)).(*ofp.Flows), nil
 }
 
 // ListLogicalDeviceMeters returns logical device meters
-func (agent *LogicalDeviceAgent) ListLogicalDeviceMeters() *ofp.Meters {
+func (agent *LogicalDeviceAgent) ListLogicalDeviceMeters(ctx context.Context) (*ofp.Meters, error) {
 	log.Debug("ListLogicalDeviceMeters")
 
-	logicalDevice := agent.GetLogicalDevice()
-	if logicalDevice.Meters == nil {
-		return &ofp.Meters{}
+	logicalDevice, err := agent.GetLogicalDevice(ctx)
+	if err != nil {
+		return nil, err
 	}
-	return (proto.Clone(logicalDevice.Meters)).(*ofp.Meters)
+	if logicalDevice.Meters == nil {
+		return &ofp.Meters{}, nil
+	}
+	return (proto.Clone(logicalDevice.Meters)).(*ofp.Meters), nil
 }
 
 // ListLogicalDeviceFlowGroups returns logical device flow groups
-func (agent *LogicalDeviceAgent) ListLogicalDeviceFlowGroups() *ofp.FlowGroups {
+func (agent *LogicalDeviceAgent) ListLogicalDeviceFlowGroups(ctx context.Context) (*ofp.FlowGroups, error) {
 	log.Debug("ListLogicalDeviceFlowGroups")
 
-	logicalDevice := agent.GetLogicalDevice()
-	if logicalDevice.FlowGroups == nil {
-		return &ofp.FlowGroups{}
+	logicalDevice, err := agent.GetLogicalDevice(ctx)
+	if err != nil {
+		return nil, err
 	}
-	return (proto.Clone(logicalDevice.FlowGroups)).(*ofp.FlowGroups)
+	if logicalDevice.FlowGroups == nil {
+		return &ofp.FlowGroups{}, nil
+	}
+	return (proto.Clone(logicalDevice.FlowGroups)).(*ofp.FlowGroups), nil
 }
 
 // ListLogicalDevicePorts returns logical device ports
-func (agent *LogicalDeviceAgent) ListLogicalDevicePorts() *voltha.LogicalPorts {
+func (agent *LogicalDeviceAgent) ListLogicalDevicePorts(ctx context.Context) (*voltha.LogicalPorts, error) {
 	log.Debug("ListLogicalDevicePorts")
-	logicalDevice := agent.GetLogicalDevice()
+	logicalDevice, err := agent.GetLogicalDevice(ctx)
+	if err != nil {
+		return nil, err
+	}
+	if logicalDevice == nil {
+		return &voltha.LogicalPorts{}, nil
+	}
 	lPorts := make([]*voltha.LogicalPort, 0)
 	lPorts = append(lPorts, logicalDevice.Ports...)
-	return &voltha.LogicalPorts{Items: lPorts}
+	return &voltha.LogicalPorts{Items: lPorts}, nil
 }
 
 //updateLogicalDeviceFlowsWithoutLock updates the logical device with the latest flows in the model.
@@ -379,7 +413,7 @@
 		response := coreutils.NewResponse()
 		responses = append(responses, response)
 		go func(child *voltha.Device) {
-			if err = agent.setupUNILogicalPorts(ctx, child); err != nil {
+			if err = agent.setupUNILogicalPorts(context.Background(), child); err != nil {
 				log.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id})
 				response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
 			}
@@ -420,8 +454,10 @@
 // updatePortState updates the port state of the device
 func (agent *LogicalDeviceAgent) updatePortState(ctx context.Context, deviceID string, portNo uint32, operStatus voltha.OperStatus_Types) error {
 	log.Infow("updatePortState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "portNo": portNo, "state": operStatus})
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	// Get the latest logical device info
 	cloned := agent.getLogicalDeviceWithoutLock()
 	for idx, lPort := range cloned.Ports {
@@ -447,8 +483,10 @@
 // updatePortsState updates the ports state related to the device
 func (agent *LogicalDeviceAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
 	log.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	// Get the latest logical device info
 	cloned := agent.getLogicalDeviceWithoutLock()
 	for _, lport := range cloned.Ports {
@@ -494,8 +532,10 @@
 // deleteAllLogicalPorts deletes all logical ports associated with this device
 func (agent *LogicalDeviceAgent) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
 	log.Infow("deleteAllLogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	// Get the latest logical device info
 	ld := agent.getLogicalDeviceWithoutLock()
 
@@ -522,8 +562,10 @@
 // deleteAllUNILogicalPorts deletes all UNI logical ports associated with this parent device
 func (agent *LogicalDeviceAgent) deleteAllUNILogicalPorts(ctx context.Context, parentDevice *voltha.Device) error {
 	log.Debugw("delete-all-uni-logical-ports", log.Fields{"logical-device-id": agent.logicalDeviceID})
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	// Get the latest logical device info
 	ld := agent.getLogicalDeviceWithoutLock()
 
@@ -558,7 +600,9 @@
 	if afterUpdate == nil {
 		return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceID)
 	}
-	agent.logicalDevice = (proto.Clone(logicalDevice)).(*voltha.LogicalDevice)
+	//agent.logicalDevice = (proto.Clone(logicalDevice)).(*voltha.LogicalDevice)
+	agent.logicalDevice = logicalDevice
+
 	return nil
 }
 
@@ -568,12 +612,15 @@
 	agent.lockDeviceRoutes.Lock()
 	defer agent.lockDeviceRoutes.Unlock()
 
-	ld := agent.GetLogicalDevice()
+	ld, err := agent.GetLogicalDevice(ctx)
+	if err != nil {
+		return err
+	}
 
 	if agent.deviceRoutes != nil && agent.deviceRoutes.IsUpToDate(ld) {
 		return nil
 	}
-	log.Debug("Generation of device graph required")
+	log.Debug("Generation of device routes required")
 	if err := agent.buildRoutes(ctx); err != nil {
 		return err
 	}
@@ -651,9 +698,10 @@
 	if meterMod == nil {
 		return nil
 	}
-	log.Debug("Waiting for logical device lock!!")
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	log.Debug("Acquired logical device lock")
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -686,8 +734,10 @@
 	if meterMod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -742,8 +792,10 @@
 	if meterMod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -831,8 +883,10 @@
 	if mod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -904,11 +958,6 @@
 		}
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.addDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
-			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
-			return err
-		}
-
 		//	Update model
 		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
 			log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
@@ -930,7 +979,17 @@
 
 			}
 		}
+		// Send the flows to the devices
+		respChannels := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &flowMetadata)
 
+		// Create a go routine to wait for the adapter responses
+		go func() {
+			// Wait for completion
+			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
+				log.Warnw("failure-to-add-flows", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
+				// TODO : revert added flow
+			}
+		}()
 	}
 	return nil
 }
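flowAdd above now persists the flow update first and then pushes the per-device rules out, waiting for the adapter responses in a background goroutine so the caller is not held up. The sketch below shows that fan-out/await shape with plain error channels; the real coreutils.Response and WaitForNilOrErrorResponses utilities are assumed to behave similarly but are not reproduced here.

package main

import (
	"fmt"
	"time"
)

// sendToDevice simulates dispatching flows to one device and reporting the outcome.
func sendToDevice(deviceID string) chan error {
	resp := make(chan error, 1)
	go func() {
		fmt.Println("sending flows to", deviceID)
		time.Sleep(10 * time.Millisecond) // simulated adapter round trip
		resp <- nil                       // nil means the device accepted the flows
	}()
	return resp
}

// waitForResponses collects every response, or gives up when the timeout expires.
func waitForResponses(timeout time.Duration, responses ...chan error) []error {
	var errs []error
	deadline := time.After(timeout)
	for _, resp := range responses {
		select {
		case err := <-resp:
			if err != nil {
				errs = append(errs, err)
			}
		case <-deadline:
			return append(errs, fmt.Errorf("timeout waiting for device responses"))
		}
	}
	return errs
}

func main() {
	responses := []chan error{sendToDevice("olt-1"), sendToDevice("onu-1")}
	done := make(chan struct{})
	go func() { // wait asynchronously, as the logical device agent now does
		defer close(done)
		if errs := waitForResponses(time.Second, responses...); len(errs) > 0 {
			fmt.Println("some devices failed, a revert would be needed:", errs)
			return
		}
		fmt.Println("all devices acknowledged the flows")
	}()
	<-done // only so the example terminates cleanly
}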
@@ -970,8 +1029,10 @@
 	if mod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -1027,29 +1088,38 @@
 		}
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.deleteDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
-			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
-			return err
-		}
-
 		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: toKeep}); err != nil {
 			log.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
+
+		// Update the devices
+		respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+
+		// Wait for the responses
+		go func() {
+			// Wait for completion
+			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+				log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+				// TODO: Revert the flow deletion
+			}
+		}()
 	}
 
 	//TODO: send announcement on delete
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
-	log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
+func (agent *LogicalDeviceAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+	log.Debugw("send-add-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
 
 	responses := make([]coreutils.Response, 0)
 	for deviceID, value := range deviceRules.GetRules() {
 		response := coreutils.NewResponse()
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
+			ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+			defer cancel()
 			if err := agent.deviceMgr.addFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
 				response.Error(status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
@@ -1057,21 +1127,20 @@
 			response.Done()
 		}(deviceID, value)
 	}
-	// Wait for completion
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
-		return status.Errorf(codes.Aborted, "errors-%s", res)
-	}
-	return nil
+	// Return responses (an array of channels) for the caller to wait for a response from the far end.
+	return responses
 }
 
-func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
-	log.Debugw("deleteDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
+func (agent *LogicalDeviceAgent) deleteFlowsAndGroupsFromDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+	log.Debugw("send-delete-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
 
 	responses := make([]coreutils.Response, 0)
 	for deviceID, value := range deviceRules.GetRules() {
 		response := coreutils.NewResponse()
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
+			ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+			defer cancel()
 			if err := agent.deviceMgr.deleteFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
 				response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
@@ -1079,21 +1148,19 @@
 			response.Done()
 		}(deviceID, value)
 	}
-	// Wait for completion
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
-		return status.Errorf(codes.Aborted, "errors-%s", res)
-	}
-	return nil
+	return responses
 }
 
-func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
-	log.Debugw("updateDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
+func (agent *LogicalDeviceAgent) updateFlowsAndGroupsOfDevice(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+	log.Debugw("send-update-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
 
 	responses := make([]coreutils.Response, 0)
 	for deviceID, value := range deviceRules.GetRules() {
 		response := coreutils.NewResponse()
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
+			ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+			defer cancel()
 			if err := agent.deviceMgr.updateFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
 				response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
@@ -1101,11 +1168,7 @@
 			response.Done()
 		}(deviceID, value)
 	}
-	// Wait for completion
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
-		return status.Errorf(codes.Aborted, "errors-%s", res)
-	}
-	return nil
+	return responses
 }
 
 //flowDeleteStrict deletes a flow from the flow table of that logical device
@@ -1114,8 +1177,10 @@
 	if mod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -1172,15 +1237,21 @@
 		}
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.deleteDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
-			log.Errorw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
-			return err
-		}
-
 		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
 			log.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
+
+		// Update the devices
+		respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+
+		// Wait for completion
+		go func() {
+			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+				log.Warnw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+				//TODO: Revert flow changes
+			}
+		}()
 	}
 	return nil
 }
@@ -1200,8 +1271,10 @@
 	if groupMod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -1216,19 +1289,25 @@
 		deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
 
 		log.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
-		if err := agent.addDeviceFlowsAndGroups(ctx, deviceRules, &voltha.FlowMetadata{}); err != nil {
-			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
-			return err
-		}
 
 		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
 			log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
-	} else {
-		return fmt.Errorf("Groups %d already present", groupMod.GroupId)
+
+		// Update the devices
+		respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
+
+		// Wait for completion
+		go func() {
+			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+				log.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+				//TODO: Revert flow changes
+			}
+		}()
+		return nil
 	}
-	return nil
+	return fmt.Errorf("groups %d already present", groupMod.GroupId)
 }
 
 func (agent *LogicalDeviceAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
@@ -1236,8 +1315,10 @@
 	if groupMod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 	groups := lDevice.FlowGroups.Items
@@ -1266,23 +1347,29 @@
 		}
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.updateDeviceFlowsAndGroups(ctx, deviceRules, nil); err != nil {
-			log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
-			return err
+		if groupsChanged {
+			if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
+				log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+				return err
+			}
 		}
-	}
+		if flowsChanged {
+			if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
+				log.Errorw("cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+				return err
+			}
+		}
 
-	if groupsChanged {
-		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
-			log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-			return err
-		}
-	}
-	if flowsChanged {
-		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
-			log.Errorw("cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-			return err
-		}
+		// Update the devices
+		respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, nil)
+
+		// Wait for completion
+		go func() {
+			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+				log.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+				//TODO: Revert flow changes
+			}
+		}()
 	}
 	return nil
 }
@@ -1292,8 +1379,10 @@
 	if groupMod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 	groups := lDevice.FlowGroups.Items
@@ -1315,24 +1404,32 @@
 		deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
 
 		log.Debugw("rules", log.Fields{"rules for group-modify": deviceRules.String()})
-		if err := agent.updateDeviceFlowsAndGroups(ctx, deviceRules, &voltha.FlowMetadata{}); err != nil {
-			log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
-			return err
-		}
 
-		//lDevice.FlowGroups.Items = groups
 		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
 			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
+
+		// Update the devices
+		respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, &voltha.FlowMetadata{})
+
+		// Wait for completion
+		go func() {
+			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+				log.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+				//TODO: Revert flow changes
+			}
+		}()
 	}
 	return nil
 }
 
 // deleteLogicalPort removes the logical port
 func (agent *LogicalDeviceAgent) deleteLogicalPort(ctx context.Context, lPort *voltha.LogicalPort) error {
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	logicalDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -1368,8 +1465,10 @@
 
 // deleteLogicalPorts removes the logical ports associated with that deviceId
 func (agent *LogicalDeviceAgent) deleteLogicalPorts(ctx context.Context, deviceID string) error {
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	logicalDevice := agent.getLogicalDeviceWithoutLock()
 	lPortstoKeep := []*voltha.LogicalPort{}
@@ -1403,8 +1502,10 @@
 
 // enableLogicalPort enables the logical port
 func (agent *LogicalDeviceAgent) enableLogicalPort(ctx context.Context, lPortID string) error {
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	logicalDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -1424,8 +1525,10 @@
 
 // disableLogicalPort disables the logical port
 func (agent *LogicalDeviceAgent) disableLogicalPort(ctx context.Context, lPortID string) error {
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	// Get the most up to date logical device
 	logicalDevice := agent.getLogicalDeviceWithoutLock()
@@ -1546,9 +1649,12 @@
 
 //buildRoutes rebuilds the device routes
 func (agent *LogicalDeviceAgent) buildRoutes(ctx context.Context) error {
-	log.Debugw("building-routes", log.Fields{"logical-device-id": agent.logicalDeviceID})
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	log.Debugw("building-routes", log.Fields{"logical-device-id": agent.logicalDeviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
 	if agent.deviceRoutes == nil {
 		agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
 	}
@@ -1568,8 +1674,11 @@
 //updateRoutes updates the device routes
 func (agent *LogicalDeviceAgent) updateRoutes(ctx context.Context, lp *voltha.LogicalPort) error {
 	log.Debugw("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
 	if agent.deviceRoutes == nil {
 		agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
 	}
@@ -1673,13 +1782,15 @@
 		log.Infow("device-not-ready", log.Fields{"deviceId": device.Id, "admin": device.AdminState, "oper": device.OperStatus})
 		return false, nil
 	}
-	agent.lockLogicalDevice.RLock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return false, err
+	}
 	if agent.portExist(device, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
-		agent.lockLogicalDevice.RUnlock()
+		agent.requestQueue.RequestComplete()
 		return false, nil
 	}
-	agent.lockLogicalDevice.RUnlock()
+	agent.requestQueue.RequestComplete()
 
 	var portCap *ic.PortCapability
 	var err error
@@ -1689,8 +1800,11 @@
 		return false, err
 	}
 
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return false, err
+	}
+
+	defer agent.requestQueue.RequestComplete()
 	// Double check again if this port has been already added since the getPortCapability could have taken a long time
 	if agent.portExist(device, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
@@ -1722,7 +1836,7 @@
 	clonedLP := (proto.Clone(lp)).(*voltha.LogicalPort)
 	go func() {
 		if err := agent.updateRoutes(context.Background(), clonedLP); err != nil {
-			log.Warn("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": lp.OfpPort.PortNo, "error": err})
+			log.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": lp.OfpPort.PortNo, "error": err})
 		}
 	}()
 
@@ -1749,13 +1863,16 @@
 		log.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
 		return false, nil
 	}
-	agent.lockLogicalDevice.RLock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return false, err
+	}
+
 	if agent.portExist(childDevice, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
-		agent.lockLogicalDevice.RUnlock()
+		agent.requestQueue.RequestComplete()
 		return false, nil
 	}
-	agent.lockLogicalDevice.RUnlock()
+	agent.requestQueue.RequestComplete()
 	var portCap *ic.PortCapability
 	var err error
 	// First get the port capability
@@ -1763,8 +1880,10 @@
 		log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
 		return false, err
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return false, err
+	}
+	defer agent.requestQueue.RequestComplete()
 	// Double check again if this port has been already added since the getPortCapability could have taken a long time
 	if agent.portExist(childDevice, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
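Throughout this file the per-device mutex is replaced by a request queue: every operation calls WaitForGreenLight before touching the in-memory logical device and RequestComplete when it is done, so requests are serviced one at a time in arrival order while adapter responses are handled asynchronously. The sketch below shows the contract only; the real coreutils.RequestQueue takes an identifier and a maximum queue size, has Start/Stop lifecycle calls, and guarantees strict FIFO ordering, which a single-slot channel like this one does not.

package main

import (
	"context"
	"fmt"
)

// RequestQueue is a minimal stand-in for coreutils.RequestQueue: one request at
// a time may proceed between WaitForGreenLight and RequestComplete.
type RequestQueue struct {
	turn chan struct{}
}

func NewRequestQueue() *RequestQueue {
	rq := &RequestQueue{turn: make(chan struct{}, 1)}
	rq.turn <- struct{}{} // the first caller may proceed immediately
	return rq
}

// WaitForGreenLight blocks until it is this request's turn or the context ends.
func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-rq.turn:
		return nil
	}
}

// RequestComplete hands the turn to the next waiting request.
func (rq *RequestQueue) RequestComplete() {
	rq.turn <- struct{}{}
}

func main() {
	rq := NewRequestQueue()
	if err := rq.WaitForGreenLight(context.Background()); err != nil {
		fmt.Println("request aborted:", err)
		return
	}
	defer rq.RequestComplete()
	fmt.Println("exclusive access to the logical device state")
}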
diff --git a/rw_core/core/logical_device_agent_test.go b/rw_core/core/logical_device_agent_test.go
index ece7c7b..957bb50 100644
--- a/rw_core/core/logical_device_agent_test.go
+++ b/rw_core/core/logical_device_agent_test.go
@@ -449,7 +449,7 @@
 	ctx := context.Background()
 	cfg := config.NewRWCoreFlags()
 	cfg.CorePairTopic = "rw_core"
-	cfg.DefaultRequestTimeout = lda.defaultTimeout.Nanoseconds() / 1000000 //TODO: change when Core changes to Duration
+	cfg.DefaultRequestTimeout = lda.defaultTimeout
 	cfg.KVStorePort = lda.kvClientPort
 	cfg.InCompetingMode = inCompeteMode
 	grpcPort, err := freeport.GetFreePort()
@@ -487,6 +487,7 @@
 	clonedLD.DatapathId = rand.Uint64()
 	lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
 	lDeviceAgent.logicalDevice = clonedLD
+	lDeviceAgent.requestQueue.Start()
 	added, err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "/logical_devices", clonedLD.Id, clonedLD, "")
 	assert.Nil(t, err)
 	assert.NotNil(t, added)
@@ -495,7 +496,7 @@
 }
 
 func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalDeviceAgent, globalWG *sync.WaitGroup) {
-	originalLogicalDevice := ldAgent.GetLogicalDevice()
+	originalLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
 	assert.NotNil(t, originalLogicalDevice)
 	var localWG sync.WaitGroup
 
@@ -557,7 +558,7 @@
 	expectedChange.Ports[2].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
 	expectedChange.Meters = &voltha.Meters{Items: nil}
 	expectedChange.Meters.Items = append(expectedChange.Meters.Items, fu.MeterEntryFromMeterMod(meterMod))
-	updatedLogicalDevice := ldAgent.GetLogicalDevice()
+	updatedLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
 	assert.NotNil(t, updatedLogicalDevice)
 	assert.True(t, proto.Equal(expectedChange, updatedLogicalDevice))
 	globalWG.Done()
@@ -572,7 +573,7 @@
 	lda.startCore(false)
 
 	var wg sync.WaitGroup
-	numConCurrentLogicalDeviceAgents := 20
+	numConCurrentLogicalDeviceAgents := 3
 	for i := 0; i < numConCurrentLogicalDeviceAgents; i++ {
 		wg.Add(1)
 		a := lda.createLogicalDeviceAgent(t)
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index e6b8fe0..df11ac4 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -19,9 +19,6 @@
 import (
 	"context"
 	"errors"
-	"strings"
-	"sync"
-
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -30,6 +27,9 @@
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"strings"
+	"sync"
+	"time"
 )
 
 // LogicalDeviceManager represents the logical device manager attributes
@@ -41,12 +41,12 @@
 	kafkaICProxy                   kafka.InterContainerProxy
 	clusterDataProxy               *model.Proxy
 	exitChannel                    chan int
-	defaultTimeout                 int64
+	defaultTimeout                 time.Duration
 	logicalDevicesLoadingLock      sync.RWMutex
 	logicalDeviceLoadingInProgress map[string][]chan int
 }
 
-func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy kafka.InterContainerProxy, cdProxy *model.Proxy, timeout int64) *LogicalDeviceManager {
+func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy kafka.InterContainerProxy, cdProxy *model.Proxy, timeout time.Duration) *LogicalDeviceManager {
 	var logicalDeviceMgr LogicalDeviceManager
 	logicalDeviceMgr.core = core
 	logicalDeviceMgr.exitChannel = make(chan int, 1)
@@ -127,17 +127,17 @@
 func (ldMgr *LogicalDeviceManager) getLogicalDevice(ctx context.Context, id string) (*voltha.LogicalDevice, error) {
 	log.Debugw("getlogicalDevice", log.Fields{"logicaldeviceid": id})
 	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
-		return agent.GetLogicalDevice(), nil
+		return agent.GetLogicalDevice(ctx)
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
 
-func (ldMgr *LogicalDeviceManager) listManagedLogicalDevices() (*voltha.LogicalDevices, error) {
+func (ldMgr *LogicalDeviceManager) listManagedLogicalDevices(ctx context.Context) (*voltha.LogicalDevices, error) {
 	log.Debug("listManagedLogicalDevices")
 	result := &voltha.LogicalDevices{}
 	ldMgr.logicalDeviceAgents.Range(func(key, value interface{}) bool {
 		agent := value.(*LogicalDeviceAgent)
-		if ld := agent.GetLogicalDevice(); ld != nil {
+		if ld, _ := agent.GetLogicalDevice(ctx); ld != nil {
 			result.Items = append(result.Items, ld)
 		}
 		return true
@@ -192,9 +192,11 @@
 	}
 
 	go func() {
-		err := agent.start(ctx, false)
+		//agent := newLogicalDeviceAgent(id, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+		err := agent.start(context.Background(), false)
 		if err != nil {
 			log.Errorw("unable-to-create-the-logical-device", log.Fields{"error": err})
+			ldMgr.deleteLogicalDeviceAgent(id)
 		}
 	}()
 
@@ -255,13 +257,9 @@
 				log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceID})
 				agent := newLogicalDeviceAgent(lDeviceID, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
 				if err := agent.start(ctx, true); err != nil {
-					if err := agent.stop(ctx); err != nil {
-						log.Errorw("failed-to-stop-agent", log.Fields{"error": err})
-						return err
-					}
-				} else {
-					ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
+					return err
 				}
+				ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
 			} else {
 				log.Debugw("logicalDevice not in model", log.Fields{"lDeviceId": lDeviceID})
 			}
@@ -363,7 +361,7 @@
 func (ldMgr *LogicalDeviceManager) ListLogicalDeviceFlows(ctx context.Context, id string) (*openflow_13.Flows, error) {
 	log.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id})
 	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
-		return agent.ListLogicalDeviceFlows(), nil
+		return agent.ListLogicalDeviceFlows(ctx)
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
@@ -372,7 +370,7 @@
 func (ldMgr *LogicalDeviceManager) ListLogicalDeviceFlowGroups(ctx context.Context, id string) (*openflow_13.FlowGroups, error) {
 	log.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"logicaldeviceid": id})
 	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
-		return agent.ListLogicalDeviceFlowGroups(), nil
+		return agent.ListLogicalDeviceFlowGroups(ctx)
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
@@ -381,7 +379,7 @@
 func (ldMgr *LogicalDeviceManager) ListLogicalDevicePorts(ctx context.Context, id string) (*voltha.LogicalPorts, error) {
 	log.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
 	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
-		return agent.ListLogicalDevicePorts(), nil
+		return agent.ListLogicalDevicePorts(ctx)
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
@@ -585,7 +583,7 @@
 func (ldMgr *LogicalDeviceManager) ListLogicalDeviceMeters(ctx context.Context, id string) (*openflow_13.Meters, error) {
 	log.Debugw("ListLogicalDeviceMeters", log.Fields{"logicalDeviceId": id})
 	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
-		return agent.ListLogicalDeviceMeters(), nil
+		return agent.ListLogicalDeviceMeters(ctx)
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
diff --git a/rw_core/coreif/device_manager_if.go b/rw_core/coreif/device_manager_if.go
index 60da009..a3f65cf 100644
--- a/rw_core/coreif/device_manager_if.go
+++ b/rw_core/coreif/device_manager_if.go
@@ -29,15 +29,14 @@
 type DeviceManager interface {
 	GetDevice(context.Context, string) (*voltha.Device, error)
 	IsRootDevice(string) (bool, error)
-	NotifyInvalidTransition(context.Context, *voltha.Device) error
-	SetAdminStateToEnable(context.Context, *voltha.Device) error
-	CreateLogicalDevice(context.Context, *voltha.Device) error
-	SetupUNILogicalPorts(context.Context, *voltha.Device) error
-	DisableAllChildDevices(context.Context, *voltha.Device) error
-	DeleteLogicalDevice(context.Context, *voltha.Device) error
-	DeleteLogicalPorts(context.Context, *voltha.Device) error
-	DeleteAllChildDevices(context.Context, *voltha.Device) error
-	RunPostDeviceDelete(context.Context, *voltha.Device) error
-	ChildDeviceLost(context.Context, *voltha.Device) error
-	DeleteAllUNILogicalPorts(context.Context, *voltha.Device) error
+	NotifyInvalidTransition(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
+	CreateLogicalDevice(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
+	SetupUNILogicalPorts(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
+	DisableAllChildDevices(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
+	DeleteLogicalDevice(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
+	DeleteLogicalPorts(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
+	DeleteAllChildDevices(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
+	RunPostDeviceDelete(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
+	ChildDeviceLost(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
+	DeleteAllUNILogicalPorts(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
 }
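Each transition callback now receives both the current and the previous device, so a handler can act on what changed without another lookup. Below is a hypothetical handler chain in that shape; the device type is simplified, and stopping at the first error is only a plausible policy, not necessarily what the Core's transition executor does.

package main

import (
	"context"
	"fmt"
)

// device is a simplified stand-in for *voltha.Device.
type device struct {
	ID         string
	AdminState string
}

// transitionHandler mirrors the new callback shape: current and previous device.
type transitionHandler func(ctx context.Context, curr *device, prev *device) error

// runTransition executes the handlers in order, giving up on the first failure.
func runTransition(ctx context.Context, curr, prev *device, handlers []transitionHandler) error {
	for _, handle := range handlers {
		if err := handle(ctx, curr, prev); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	logChange := func(ctx context.Context, curr, prev *device) error {
		fmt.Printf("device %s: %s -> %s\n", curr.ID, prev.AdminState, curr.AdminState)
		return nil
	}
	err := runTransition(context.Background(),
		&device{ID: "onu-1", AdminState: "DELETED"},
		&device{ID: "onu-1", AdminState: "ENABLED"},
		[]transitionHandler{logChange})
	fmt.Println("transition complete:", err == nil)
}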
diff --git a/rw_core/coreif/logical_device_agent_if.go b/rw_core/coreif/logical_device_agent_if.go
index 0670bdf..ac4201a 100644
--- a/rw_core/coreif/logical_device_agent_if.go
+++ b/rw_core/coreif/logical_device_agent_if.go
@@ -28,8 +28,8 @@
 
 // LogicalDeviceAgent represents a generic agent
 type LogicalDeviceAgent interface {
-	GetLogicalDevice() *voltha.LogicalDevice
 	GetDeviceRoutes() *route.DeviceRoutes
+	GetLogicalDevice(ctx context.Context) (*voltha.LogicalDevice, error)
 	GetWildcardInputPorts(excludePort ...uint32) []uint32
 	GetRoute(ctx context.Context, ingressPortNo uint32, egressPortNo uint32) ([]route.Hop, error)
 	GetNNIPorts() []uint32
diff --git a/rw_core/flowdecomposition/flow_decomposer_test.go b/rw_core/flowdecomposition/flow_decomposer_test.go
index 6164f0d..3517f43 100644
--- a/rw_core/flowdecomposition/flow_decomposer_test.go
+++ b/rw_core/flowdecomposition/flow_decomposer_test.go
@@ -417,8 +417,8 @@
 	return ""
 }
 
-func (tfd *testFlowDecomposer) GetLogicalDevice() *voltha.LogicalDevice {
-	return nil
+func (tfd *testFlowDecomposer) GetLogicalDevice(ctx context.Context) (*voltha.LogicalDevice, error) {
+	return nil, nil
 }
 
 func (tfd *testFlowDecomposer) GetDeviceRoutes() *route.DeviceRoutes {
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index 303bae3..f145ab6 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -190,7 +190,7 @@
 		cloned := proto.Clone(device).(*voltha.Device)
 		// Update the all ports state on that device to disable
 		if err := oltA.coreProxy.PortsStateUpdate(context.TODO(), cloned.Id, voltha.OperStatus_UNKNOWN); err != nil {
-			log.Fatalf("updating-ports-failed", log.Fields{"deviceId": device.Id, "error": err})
+			log.Warnw("updating-ports-failed", log.Fields{"deviceId": device.Id, "error": err})
 		}
 
 		//Update the device state
@@ -198,7 +198,9 @@
 		cloned.OperStatus = voltha.OperStatus_UNKNOWN
 
 		if err := oltA.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
-			log.Fatalf("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
+			// Device may already have been deleted in the core
+			log.Warnw("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
+			return
 		}
 
 		if err := oltA.updateDevice(cloned); err != nil {
@@ -207,7 +209,8 @@
 
 		// Tell the Core that all child devices have been disabled (by default it's an action already taken by the Core
 		if err := oltA.coreProxy.ChildDevicesLost(context.TODO(), cloned.Id); err != nil {
-			log.Fatalf("lost-notif-of-child-devices-failed", log.Fields{"deviceId": device.Id, "error": err})
+			// Device may already have been deleted in the core
+			log.Warnw("lost-notif-of-child-devices-failed", log.Fields{"deviceId": device.Id, "error": err})
 		}
 	}()
 	return nil
@@ -262,7 +265,8 @@
 
 		if Port.Type == voltha.Port_PON_OLT {
 			if err := oltA.coreProxy.PortStateUpdate(context.TODO(), deviceId, voltha.Port_PON_OLT, Port.PortNo, voltha.OperStatus_DISCOVERED); err != nil {
-				log.Fatalf("updating-ports-failed", log.Fields{"device-id": deviceId, "error": err})
+				// Corresponding device may have been deleted
+				log.Warnw("updating-ports-failed", log.Fields{"device-id": deviceId, "error": err})
 			}
 		}
 	}()
diff --git a/rw_core/mocks/adapter_onu.go b/rw_core/mocks/adapter_onu.go
index 73ee749..c31cb93 100644
--- a/rw_core/mocks/adapter_onu.go
+++ b/rw_core/mocks/adapter_onu.go
@@ -62,20 +62,12 @@
 			log.Fatalf("deviceUpdate-failed-%s", res)
 		}
 
-		// Updating the device states twice, once with oper status to discovered and followed by active may cause
-		// a failure for unit tests when these requests reaches the Core within a millisecond of each other (with real
-		// hardware will not happen as the time between these requests is much higher than 1 millisecond).  For
-		// some reasons this issue is seen on Jenkins but not when running the tests locally. The issue
-		// in the core is triggered when these requests are processed out of order (an issue in the Core that is
-		// being handled by https://jira.opencord.org/browse/VOL-2164).
-		// TODO:  Once the above change is completed then this code can be uncommented.
+		d.ConnectStatus = voltha.ConnectStatus_REACHABLE
+		d.OperStatus = voltha.OperStatus_DISCOVERED
 
-		//d.ConnectStatus = voltha.ConnectStatus_REACHABLE
-		//d.OperStatus = voltha.OperStatus_DISCOVERED
-
-		//if err := onuA.coreProxy.DeviceStateUpdate(context.TODO(), d.Id, d.ConnectStatus, d.OperStatus); err != nil {
-		//	log.Fatalf("device-state-update-failed-%s", err)
-		//}
+		if err := onuA.coreProxy.DeviceStateUpdate(context.TODO(), d.Id, d.ConnectStatus, d.OperStatus); err != nil {
+			log.Fatalf("device-state-update-failed-%s", err)
+		}
 
 		uniPortNo := uint32(2)
 		if device.ProxyAddress != nil {
@@ -163,14 +155,17 @@
 		cloned := proto.Clone(device).(*voltha.Device)
 		// Update the all ports state on that device to disable
 		if err := onuA.coreProxy.PortsStateUpdate(context.TODO(), cloned.Id, voltha.OperStatus_UNKNOWN); err != nil {
-			log.Fatalf("updating-ports-failed", log.Fields{"deviceId": device.Id, "error": err})
+			// Device may already have been deleted in the Core
+			log.Warnw("updating-ports-failed", log.Fields{"deviceId": device.Id, "error": err})
+			return
 		}
 		//Update the device state
 		cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
 		cloned.OperStatus = voltha.OperStatus_UNKNOWN
 
 		if err := onuA.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
-			log.Fatalf("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
+			log.Warnw("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
+			return
 		}
 		if err := onuA.updateDevice(cloned); err != nil {
 			log.Fatalf("saving-device-failed-%s", err)
diff --git a/rw_core/mocks/device_manager.go b/rw_core/mocks/device_manager.go
index 45e5af5..b3d63cb 100644
--- a/rw_core/mocks/device_manager.go
+++ b/rw_core/mocks/device_manager.go
@@ -22,7 +22,9 @@
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 )
 
-// DeviceManager -
+// DeviceManager represents a mock of a device manager that implements the coreif.DeviceManager interface.  It provides
+// default (no-op) behaviors; for non-default behavior, another implementation of that interface must be used.
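+//
+// A minimal sketch of overriding a single behavior in a test (the type name and error below are hypothetical):
+//
+//	type failingDeviceManager struct {
+//		mocks.DeviceManager
+//	}
+//
+//	func (dm *failingDeviceManager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
+//		return errors.New("simulated-failure")
+//	}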
 type DeviceManager struct {
 }
 
@@ -37,152 +39,51 @@
 }
 
 // NotifyInvalidTransition -
-func (dm *DeviceManager) NotifyInvalidTransition(ctx context.Context, pcDevice *voltha.Device) error {
-	return nil
-}
-
-// SetAdminStateToEnable -
-func (dm *DeviceManager) SetAdminStateToEnable(ctx context.Context, cDevice *voltha.Device) error {
+func (dm *DeviceManager) NotifyInvalidTransition(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	return nil
 }
 
 // CreateLogicalDevice -
-func (dm *DeviceManager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
+func (dm *DeviceManager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	return nil
 }
 
 // SetupUNILogicalPorts -
-func (dm *DeviceManager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
+func (dm *DeviceManager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	return nil
 }
 
 // DisableAllChildDevices -
-func (dm *DeviceManager) DisableAllChildDevices(ctx context.Context, cDevice *voltha.Device) error {
+func (dm *DeviceManager) DisableAllChildDevices(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	return nil
 }
 
 // DeleteLogicalDevice -
-func (dm *DeviceManager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
+func (dm *DeviceManager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	return nil
 }
 
 // DeleteLogicalPorts -
-func (dm *DeviceManager) DeleteLogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
+func (dm *DeviceManager) DeleteLogicalPorts(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	return nil
 }
 
 // DeleteAllChildDevices -
-func (dm *DeviceManager) DeleteAllChildDevices(ctx context.Context, cDevice *voltha.Device) error {
+func (dm *DeviceManager) DeleteAllChildDevices(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	return nil
 }
 
 // DeleteAllUNILogicalPorts -
-func (dm *DeviceManager) DeleteAllUNILogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
+func (dm *DeviceManager) DeleteAllUNILogicalPorts(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	return nil
 }
 
 // RunPostDeviceDelete -
-func (dm *DeviceManager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device) error {
+func (dm *DeviceManager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	return nil
 }
 
-// ListDevices -
-func (dm *DeviceManager) ListDevices() (*voltha.Devices, error) {
-	return nil, nil
-}
-
-// ListDeviceIds -
-func (dm *DeviceManager) ListDeviceIds() (*voltha.IDs, error) {
-	return nil, nil
-}
-
-// ReconcileDevices -
-func (dm *DeviceManager) ReconcileDevices(ctx context.Context, ids *voltha.IDs, ch chan interface{}) {
-}
-
-// CreateDevice -
-func (dm *DeviceManager) CreateDevice(ctx context.Context, device *voltha.Device, ch chan interface{}) {
-}
-
-// EnableDevice -
-func (dm *DeviceManager) EnableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
-}
-
-// DisableDevice -
-func (dm *DeviceManager) DisableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
-}
-
-// RebootDevice -
-func (dm *DeviceManager) RebootDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
-}
-
-// DeleteDevice -
-func (dm *DeviceManager) DeleteDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
-}
-
-// StopManagingDevice -
-func (dm *DeviceManager) StopManagingDevice(id string) {
-}
-
-// DownloadImage -
-func (dm *DeviceManager) DownloadImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
-}
-
-// CancelImageDownload -
-func (dm *DeviceManager) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
-}
-
-// ActivateImage -
-func (dm *DeviceManager) ActivateImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
-}
-
-// RevertImage -
-func (dm *DeviceManager) RevertImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
-}
-
-// GetImageDownloadStatus -
-func (dm *DeviceManager) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
-}
-
-// UpdateImageDownload -
-func (dm *DeviceManager) UpdateImageDownload(deviceID string, img *voltha.ImageDownload) error {
-	return nil
-}
-
-// SimulateAlarm -
-func (dm *DeviceManager) SimulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest, ch chan interface{}) {
-}
-
-// GetImageDownload -
-func (dm *DeviceManager) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
-	return nil, nil
-}
-
-// ListImageDownloads -
-func (dm *DeviceManager) ListImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
-	return nil, nil
-}
-
-// UpdatePmConfigs -
-func (dm *DeviceManager) UpdatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs, ch chan interface{}) {
-}
-
-// ListPmConfigs -
-func (dm *DeviceManager) ListPmConfigs(ctx context.Context, deviceID string) (*voltha.PmConfigs, error) {
-	return nil, nil
-}
-
-// DeletePeerPorts -
-func (dm *DeviceManager) DeletePeerPorts(fromDeviceID string, deviceID string) error {
-	return nil
-}
-
-// ProcessTransition -
-func (dm *DeviceManager) ProcessTransition(previous *voltha.Device, current *voltha.Device) error {
-	return nil
-}
-
-// ChildDeviceLost -
-func (dm *DeviceManager) ChildDeviceLost(ctx context.Context, cDevice *voltha.Device) error {
+// ChildDeviceLost -
+func (dm *DeviceManager) ChildDeviceLost(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	return nil
 }
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index 82465ef..bffd9c4 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -17,13 +17,19 @@
 package utils
 
 import (
+	"context"
 	"os"
+	"sync"
 	"time"
 
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
 
+// ResponseCallback is the function signature for callbacks to execute after a response is received.
+type ResponseCallback func(rpc string, response interface{}, reqArgs ...interface{})
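+
+// An illustrative callback matching ResponseCallback (sketch only; the log fields are examples):
+//
+//	var logResponse ResponseCallback = func(rpc string, response interface{}, reqArgs ...interface{}) {
+//		log.Debugw("rpc-response", log.Fields{"rpc": rpc, "response": response, "args": reqArgs})
+//	}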
+
 // DeviceID represent device id attribute
 type DeviceID struct {
 	ID string
@@ -39,6 +45,97 @@
 	return os.Getenv("HOSTNAME")
 }
 
+type request struct {
+	channel chan struct{} // closed by the queue to give this request the green light
+	done    chan struct{} // closed by the requester if it aborts the wait (e.g. context timeout)
+}
+
+func newRequest() *request {
+	return &request{
+		channel: make(chan struct{}),
+		done:    make(chan struct{}),
+	}
+}
+
+// RequestQueue represents a request processing queue where each request is processed to completion before another
+// request is given the green light to proceed.
+type RequestQueue struct {
+	queue                     chan *request
+	requestCompleteIndication chan struct{}
+	queueID                   string
+	stopOnce                  sync.Once
+	stopped                   bool
+}
+
+// NewRequestQueue creates a new request queue. maxQueueSize is the maximum size of the queue. queueID is used mostly
+// for logging.
+func NewRequestQueue(queueID string, maxQueueSize int) *RequestQueue {
+	return &RequestQueue{
+		queueID:                   queueID,
+		queue:                     make(chan *request, maxQueueSize),
+		requestCompleteIndication: make(chan struct{}),
+	}
+}
+
+// Start starts the request processing queue in its own goroutine.
+func (rq *RequestQueue) Start() {
+	go func() {
+		for {
+			req, ok := <-rq.queue
+			if !ok {
+				log.Warnw("request-sequencer-queue-closed", log.Fields{"id": rq.queueID})
+				break
+			}
+			// If the request is waiting then closing req.channel will trigger the request to proceed.  Otherwise,
+			// if the request was cancelled then this will just clean up.
+			close(req.channel)
+
+			// Wait for either a request complete indication or a request aborted due to timeout
+			select {
+			case <-req.done:
+			case <-rq.requestCompleteIndication:
+			}
+		}
+	}()
+}
+
+// WaitForGreenLight is invoked by a function processing a request to receive the green light before
+// proceeding.  The caller may provide a context with a timeout; the timeout is triggered if the wait is
+// too long (i.e. previous requests are taking too long to complete).
+func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
+	if rq.stopped {
+		return status.Errorf(codes.Aborted, "queue-already-stopped-%s", rq.queueID)
+	}
+	request := newRequest()
+	// Queue the request
+	rq.queue <- request
+	select {
+	case <-request.channel:
+		return nil
+	case <-ctx.Done():
+		close(request.done)
+		return ctx.Err()
+	}
+}
+
+// RequestComplete must be invoked by a process when it completes processing the request.  That process must have
+// invoked WaitForGreenLight() before.
+func (rq *RequestQueue) RequestComplete() {
+	if !rq.stopped {
+		rq.requestCompleteIndication <- struct{}{}
+	}
+}
+
+// Stop must only be invoked by the process that started the request queue.   Prior to invoking Stop, WaitForGreenLight
+// must be invoked.
+func (rq *RequestQueue) Stop() {
+	rq.stopOnce.Do(func() {
+		rq.stopped = true
+		close(rq.requestCompleteIndication)
+		close(rq.queue)
+	})
+}
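+
+// A minimal usage sketch (the queue ID, queue size and timeout below are illustrative only).  The owner of the
+// queue does:
+//
+//	rq := NewRequestQueue("device-1234", 100)
+//	rq.Start()
+//
+// and each per-device request handler then serializes itself as follows:
+//
+//	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+//	defer cancel()
+//	if err := rq.WaitForGreenLight(ctx); err != nil {
+//		return err
+//	}
+//	defer rq.RequestComplete()
+//	// ... process the request to completion ...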
+
 // Response -
 type Response struct {
 	*response
@@ -91,9 +188,9 @@
-//The error will be at the index corresponding to the order in which the channel appear in the parameter list.
-//If no errors is found then nil is returned.  This method also takes in a timeout in milliseconds. If a
-//timeout is obtained then this function will stop waiting for the remaining responses and abort.
+//The error will be at the index corresponding to the order in which the channels appear in the parameter list.
+//If no errors are found then nil is returned.  This method also takes in a timeout duration. If the
+//timeout is reached then this function will stop waiting for the remaining responses and abort.
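+//
+// Illustrative call (the timeout value is an example only):
+//
+//	if errs := WaitForNilOrErrorResponses(500*time.Millisecond, response1, response2); errs != nil {
+//		// handle the per-response errors
+//	}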
-func WaitForNilOrErrorResponses(timeout int64, responses ...Response) []error {
+func WaitForNilOrErrorResponses(timeout time.Duration, responses ...Response) []error {
 	timedOut := make(chan struct{})
-	timer := time.AfterFunc(time.Duration(timeout)*time.Millisecond, func() { close(timedOut) })
+	timer := time.AfterFunc(timeout, func() { close(timedOut) })
 	defer timer.Stop()
 
 	gotError := false
diff --git a/rw_core/utils/core_utils_test.go b/rw_core/utils/core_utils_test.go
index d7797e5..0457606 100644
--- a/rw_core/utils/core_utils_test.go
+++ b/rw_core/utils/core_utils_test.go
@@ -50,7 +50,7 @@
 	response.Error(taskFailureError)
 }
 
-func runMultipleTasks(timeout, numTasks, taskDurationRange, numSuccessfulTask, numFailuretask int) []error {
+func runMultipleTasks(timeout time.Duration, numTasks, taskDurationRange, numSuccessfulTask, numFailuretask int) []error {
 	if numTasks != numSuccessfulTask+numFailuretask {
 		return []error{status.Error(codes.FailedPrecondition, "invalid-num-tasks")}
 	}
@@ -65,7 +65,7 @@
 		}
 		go runFailureTask(responses[i], taskDurationRange)
 	}
-	return WaitForNilOrErrorResponses(int64(timeout), responses...)
+	return WaitForNilOrErrorResponses(timeout, responses...)
 }
 
 func getNumSuccessFailure(inputs []error) (numSuccess, numFailure, numTimeout int) {
@@ -99,12 +99,11 @@
 	for i := 0; i < numIterations; i++ {
 		totalSuccess = rand.Intn(numTasks)
 		totalFailure = numTasks - totalSuccess
-		results = runMultipleTasks(110, numTasks, 100, totalSuccess, totalFailure)
+		results = runMultipleTasks(110*time.Millisecond, numTasks, 50, totalSuccess, totalFailure)
 		nSuccess, nFailure, nTimeouts = getNumSuccessFailure(results)
 		assert.Equal(t, totalFailure, nFailure)
 		assert.Equal(t, totalSuccess, nSuccess)
 		assert.Equal(t, 0, nTimeouts)
-
 	}
 }
 
@@ -122,7 +121,7 @@
 	for i := 0; i < numIterations; i++ {
 		totalSuccess = rand.Intn(numTasks)
 		totalFailure = numTasks - totalSuccess
-		results = runMultipleTasks(50, numTasks, 100, totalSuccess, totalFailure)
+		results = runMultipleTasks(50*time.Millisecond, numTasks, 100, totalSuccess, totalFailure)
 		nSuccess, nFailure, nTimeouts = getNumSuccessFailure(results)
 		assert.True(t, nFailure >= totalFailure)
 		assert.True(t, nSuccess <= totalSuccess)