[VOL-2164] Update rw-core to use the Async Kafka API

This commit consists of the following:

1. Process per-device requests in the Core in the order they are
received. If there are lots of requests on a given device then
there will be some latencies introduced due to ordering.  With
recent changes in the model along with keeping the request lock
to a minimal then these latencies are reduced.  Testing did not
show and noticeable latencies.

2) Keep the request lock from the moment a request started
processing to the moment that request is sent to kafka (when
applicable).  Adapter responses are received and processed
asynchronously. Therefore, an adapter can takes all the time it
needs to process a transaction.  The Core still has a context
with timeout (configurable) to cater for cases where the adapter
does not return a response.

3) Adapter requests are processed to completion before sending a
reponse back to the adapter.  Previously, in some cases, a
separate go routine was created to process the request and a
successful response is sent to the adapter.  Now if the request
fails then the adapter will receive an error. The adapter
requests for a given device are therefore processed in the
order they are received.

4) Some changes are made when retrieving a handler to execute
a device state transition.  This was necessary as there was some
transition overlap found.

Update after multiple reviews.

Change-Id: I55a189efec1549a662f2d71e18e6eca9015a3a17
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index dfc3907..4e029c8 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -21,14 +21,10 @@
 	"encoding/hex"
 	"errors"
 	"fmt"
-	"github.com/opencord/voltha-go/rw_core/route"
-	"reflect"
-	"sync"
-	"time"
-
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/db/model"
 	fd "github.com/opencord/voltha-go/rw_core/flowdecomposition"
+	"github.com/opencord/voltha-go/rw_core/route"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
 	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -37,6 +33,13 @@
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"reflect"
+	"sync"
+	"time"
+)
+
+const (
+	maxOrderedLogicalDeviceRequestQueueSize = 1000
 )
 
 // LogicalDeviceAgent represent attributes of logical device agent
@@ -53,19 +56,20 @@
 	meterProxy         *model.Proxy
 	ldProxy            *model.Proxy
 	portProxies        map[string]*model.Proxy
-	portProxiesLock    sync.RWMutex
-	lockLogicalDevice  sync.RWMutex
 	lockDeviceRoutes   sync.RWMutex
 	logicalPortsNo     map[uint32]bool //value is true for NNI port
 	lockLogicalPortsNo sync.RWMutex
 	flowDecomposer     *fd.FlowDecomposer
-	defaultTimeout     int64
+	defaultTimeout     time.Duration
 	logicalDevice      *voltha.LogicalDevice
+	requestQueue       *coreutils.RequestQueue
+	startOnce          sync.Once
+	stopOnce           sync.Once
 }
 
 func newLogicalDeviceAgent(id string, deviceID string, ldeviceMgr *LogicalDeviceManager,
 	deviceMgr *DeviceManager,
-	cdProxy *model.Proxy, timeout int64) *LogicalDeviceAgent {
+	cdProxy *model.Proxy, timeout time.Duration) *LogicalDeviceAgent {
 	var agent LogicalDeviceAgent
 	agent.exitChannel = make(chan int, 1)
 	agent.logicalDeviceID = id
@@ -74,27 +78,40 @@
 	agent.clusterDataProxy = cdProxy
 	agent.ldeviceMgr = ldeviceMgr
 	agent.flowDecomposer = fd.NewFlowDecomposer(agent.deviceMgr)
-	agent.lockLogicalDevice = sync.RWMutex{}
 	agent.portProxies = make(map[string]*model.Proxy)
-	agent.portProxiesLock = sync.RWMutex{}
-	agent.lockLogicalPortsNo = sync.RWMutex{}
-	agent.lockDeviceRoutes = sync.RWMutex{}
 	agent.logicalPortsNo = make(map[uint32]bool)
 	agent.defaultTimeout = timeout
+	agent.requestQueue = coreutils.NewRequestQueue(agent.logicalDeviceID, maxOrderedLogicalDeviceRequestQueueSize)
 	return &agent
 }
 
 // start creates the logical device and add it to the data model
-func (agent *LogicalDeviceAgent) start(ctx context.Context, loadFromdB bool) error {
-	log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceID, "loadFromdB": loadFromdB})
+func (agent *LogicalDeviceAgent) start(ctx context.Context, loadFromDB bool) error {
+	needToStart := false
+	if agent.startOnce.Do(func() { needToStart = true }); !needToStart {
+		return nil
+	}
+
+	log.Infow("starting-logical_device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": loadFromDB})
+
+	var startSucceeded bool
+	defer func() {
+		if !startSucceeded {
+			if err := agent.stop(ctx); err != nil {
+				log.Errorw("failed-to-cleanup-after-unsuccessful-start", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+			}
+		}
+	}()
+
+	// Launch the request queue - it will launch a go routine
+	agent.requestQueue.Start()
+
 	var ld *voltha.LogicalDevice
-	var err error
-	if !loadFromdB {
+	if !loadFromDB {
 		//Build the logical device based on information retrieved from the device adapter
 		var switchCap *ic.SwitchCapability
 		var err error
 		if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceID); err != nil {
-			log.Errorw("error-creating-logical-device", log.Fields{"error": err})
 			return err
 		}
 		ld = &voltha.LogicalDevice{Id: agent.logicalDeviceID, RootDeviceId: agent.rootDeviceID}
@@ -102,7 +119,6 @@
 		// Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
 		var datapathID uint64
 		if datapathID, err = CreateDataPathID(agent.logicalDeviceID); err != nil {
-			log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
 			return err
 		}
 		ld.DatapathId = datapathID
@@ -113,42 +129,35 @@
 		ld.FlowGroups = &ofp.FlowGroups{Items: nil}
 		ld.Ports = []*voltha.LogicalPort{}
 
-		agent.lockLogicalDevice.Lock()
 		// Save the logical device
 		added, err := agent.clusterDataProxy.AddWithID(ctx, "/logical_devices", ld.Id, ld, "")
 		if err != nil {
-			log.Errorw("failed-to-save-logical-devices-to-cluster-proxy", log.Fields{"error": err})
-			agent.lockLogicalDevice.Unlock()
 			return err
 		}
 		if added == nil {
-			log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+			log.Errorw("failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
 		} else {
-			log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+			log.Debugw("logicaldevice-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
 		}
 
 		agent.logicalDevice = proto.Clone(ld).(*voltha.LogicalDevice)
-		agent.lockLogicalDevice.Unlock()
 
-		// TODO:  Set the logical ports in a separate call once the port update issue is fixed.
+		// Setup the logicalports - internal processing, no need to propagate the client context
 		go func() {
-			err := agent.setupLogicalPorts(ctx)
+			err := agent.setupLogicalPorts(context.Background())
 			if err != nil {
 				log.Errorw("unable-to-setup-logical-ports", log.Fields{"error": err})
 			}
 		}()
-
 	} else {
 		//	load from dB - the logical may not exist at this time.  On error, just return and the calling function
 		// will destroy this agent.
-		agent.lockLogicalDevice.Lock()
 		logicalDevice, err := agent.clusterDataProxy.Get(ctx, "/logical_devices/"+agent.logicalDeviceID, 0, true, "")
 		if err != nil {
-			return status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceID)
+			return err
 		}
 		ld, ok := logicalDevice.(*voltha.LogicalDevice)
 		if !ok {
-			agent.lockLogicalDevice.Unlock()
 			return status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceID)
 		}
 		// Update the root device Id
@@ -157,20 +166,16 @@
 		// Update the last data
 		agent.logicalDevice = proto.Clone(ld).(*voltha.LogicalDevice)
 
-		agent.lockLogicalDevice.Unlock()
-
 		// Setup the local list of logical ports
 		agent.addLogicalPortsToMap(ld.Ports)
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
 
+	var err error
 	agent.flowProxy, err = agent.clusterDataProxy.CreateProxy(
 		ctx,
 		fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceID),
 		false)
 	if err != nil {
-		log.Errorw("failed-to-create-flow-proxy", log.Fields{"error": err})
 		return err
 	}
 	agent.meterProxy, err = agent.clusterDataProxy.CreateProxy(
@@ -201,89 +206,118 @@
 	if agent.ldProxy != nil {
 		agent.ldProxy.RegisterCallback(model.PostUpdate, agent.portUpdated)
 	} else {
-		log.Errorw("logical-device-proxy-null", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 		return status.Error(codes.Internal, "logical-device-proxy-null")
 	}
 
 	// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
-	if loadFromdB {
+	if loadFromDB {
 		go func() {
 			if err := agent.buildRoutes(context.Background()); err != nil {
-				log.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+				log.Warn("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
 			}
 		}()
 	}
+	startSucceeded = true
+
 	return nil
 }
 
-// stop stops the logical devuce agent.  This removes the logical device from the data model.
+// stop stops the logical device agent.  This removes the logical device from the data model.
 func (agent *LogicalDeviceAgent) stop(ctx context.Context) error {
-	log.Info("stopping-logical_device-agent")
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	var returnErr error
+	agent.stopOnce.Do(func() {
+		log.Info("stopping-logical_device-agent")
 
-	//Remove the logical device from the model
-	if removed, err := agent.clusterDataProxy.Remove(ctx, "/logical_devices/"+agent.logicalDeviceID, ""); err != nil {
-		log.Errorw("failed-to-remove-device", log.Fields{"error": err})
-		return err
-	} else if removed == nil {
-		log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
-	} else {
-		log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
-	}
-	agent.exitChannel <- 1
-	log.Info("logical_device-agent-stopped")
-	return nil
+		if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+			// This should never happen - an error is returned only if the agent is stopped and an agent is only stopped once.
+			returnErr = err
+			return
+		}
+		defer agent.requestQueue.RequestComplete()
+
+		//Remove the logical device from the model
+		if removed, err := agent.clusterDataProxy.Remove(ctx, "/logical_devices/"+agent.logicalDeviceID, ""); err != nil {
+			returnErr = err
+		} else if removed == nil {
+			returnErr = status.Errorf(codes.Aborted, "failed-to-remove-logical-ldevice-%s", agent.logicalDeviceID)
+		} else {
+			log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+		}
+
+		// Stop the request queue and request complete indication
+		agent.requestQueue.Stop()
+
+		close(agent.exitChannel)
+
+		log.Info("logical_device-agent-stopped")
+	})
+	return returnErr
 }
 
 // GetLogicalDevice returns the latest logical device data
-func (agent *LogicalDeviceAgent) GetLogicalDevice() *voltha.LogicalDevice {
-	agent.lockLogicalDevice.RLock()
-	defer agent.lockLogicalDevice.RUnlock()
-
-	return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice)
+func (agent *LogicalDeviceAgent) GetLogicalDevice(ctx context.Context) (*voltha.LogicalDevice, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice), nil
 }
 
 // ListLogicalDeviceFlows returns logical device flows
-func (agent *LogicalDeviceAgent) ListLogicalDeviceFlows() *ofp.Flows {
+func (agent *LogicalDeviceAgent) ListLogicalDeviceFlows(ctx context.Context) (*ofp.Flows, error) {
 	log.Debug("ListLogicalDeviceFlows")
 
-	logicalDevice := agent.GetLogicalDevice()
-	if logicalDevice.Flows == nil {
-		return &ofp.Flows{}
+	logicalDevice, err := agent.GetLogicalDevice(ctx)
+	if err != nil {
+		return nil, err
 	}
-	return (proto.Clone(logicalDevice.Flows)).(*ofp.Flows)
+	if logicalDevice.Flows == nil {
+		return &ofp.Flows{}, nil
+	}
+	return (proto.Clone(logicalDevice.Flows)).(*ofp.Flows), nil
 }
 
 // ListLogicalDeviceMeters returns logical device meters
-func (agent *LogicalDeviceAgent) ListLogicalDeviceMeters() *ofp.Meters {
+func (agent *LogicalDeviceAgent) ListLogicalDeviceMeters(ctx context.Context) (*ofp.Meters, error) {
 	log.Debug("ListLogicalDeviceMeters")
 
-	logicalDevice := agent.GetLogicalDevice()
-	if logicalDevice.Meters == nil {
-		return &ofp.Meters{}
+	logicalDevice, err := agent.GetLogicalDevice(ctx)
+	if err != nil {
+		return nil, err
 	}
-	return (proto.Clone(logicalDevice.Meters)).(*ofp.Meters)
+	if logicalDevice.Meters == nil {
+		return &ofp.Meters{}, nil
+	}
+	return (proto.Clone(logicalDevice.Meters)).(*ofp.Meters), nil
 }
 
 // ListLogicalDeviceFlowGroups returns logical device flow groups
-func (agent *LogicalDeviceAgent) ListLogicalDeviceFlowGroups() *ofp.FlowGroups {
+func (agent *LogicalDeviceAgent) ListLogicalDeviceFlowGroups(ctx context.Context) (*ofp.FlowGroups, error) {
 	log.Debug("ListLogicalDeviceFlowGroups")
 
-	logicalDevice := agent.GetLogicalDevice()
-	if logicalDevice.FlowGroups == nil {
-		return &ofp.FlowGroups{}
+	logicalDevice, err := agent.GetLogicalDevice(ctx)
+	if err != nil {
+		return nil, err
 	}
-	return (proto.Clone(logicalDevice.FlowGroups)).(*ofp.FlowGroups)
+	if logicalDevice.FlowGroups == nil {
+		return &ofp.FlowGroups{}, nil
+	}
+	return (proto.Clone(logicalDevice.FlowGroups)).(*ofp.FlowGroups), nil
 }
 
 // ListLogicalDevicePorts returns logical device ports
-func (agent *LogicalDeviceAgent) ListLogicalDevicePorts() *voltha.LogicalPorts {
+func (agent *LogicalDeviceAgent) ListLogicalDevicePorts(ctx context.Context) (*voltha.LogicalPorts, error) {
 	log.Debug("ListLogicalDevicePorts")
-	logicalDevice := agent.GetLogicalDevice()
+	logicalDevice, err := agent.GetLogicalDevice(ctx)
+	if err != nil {
+		return nil, err
+	}
+	if logicalDevice == nil {
+		return &voltha.LogicalPorts{}, nil
+	}
 	lPorts := make([]*voltha.LogicalPort, 0)
 	lPorts = append(lPorts, logicalDevice.Ports...)
-	return &voltha.LogicalPorts{Items: lPorts}
+	return &voltha.LogicalPorts{Items: lPorts}, nil
 }
 
 //updateLogicalDeviceFlowsWithoutLock updates the logical device with the latest flows in the model.
@@ -379,7 +413,7 @@
 		response := coreutils.NewResponse()
 		responses = append(responses, response)
 		go func(child *voltha.Device) {
-			if err = agent.setupUNILogicalPorts(ctx, child); err != nil {
+			if err = agent.setupUNILogicalPorts(context.Background(), child); err != nil {
 				log.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id})
 				response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
 			}
@@ -420,8 +454,10 @@
 // updatePortState updates the port state of the device
 func (agent *LogicalDeviceAgent) updatePortState(ctx context.Context, deviceID string, portNo uint32, operStatus voltha.OperStatus_Types) error {
 	log.Infow("updatePortState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "portNo": portNo, "state": operStatus})
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	// Get the latest logical device info
 	cloned := agent.getLogicalDeviceWithoutLock()
 	for idx, lPort := range cloned.Ports {
@@ -447,8 +483,10 @@
 // updatePortsState updates the ports state related to the device
 func (agent *LogicalDeviceAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
 	log.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	// Get the latest logical device info
 	cloned := agent.getLogicalDeviceWithoutLock()
 	for _, lport := range cloned.Ports {
@@ -494,8 +532,10 @@
 // deleteAllLogicalPorts deletes all logical ports associated with this device
 func (agent *LogicalDeviceAgent) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
 	log.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	// Get the latest logical device info
 	ld := agent.getLogicalDeviceWithoutLock()
 
@@ -522,8 +562,10 @@
 // deleteAllUNILogicalPorts deletes all UNI logical ports associated with this parent device
 func (agent *LogicalDeviceAgent) deleteAllUNILogicalPorts(ctx context.Context, parentDevice *voltha.Device) error {
 	log.Debugw("delete-all-uni-logical-ports", log.Fields{"logical-device-id": agent.logicalDeviceID})
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	// Get the latest logical device info
 	ld := agent.getLogicalDeviceWithoutLock()
 
@@ -558,7 +600,9 @@
 	if afterUpdate == nil {
 		return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceID)
 	}
-	agent.logicalDevice = (proto.Clone(logicalDevice)).(*voltha.LogicalDevice)
+	//agent.logicalDevice = (proto.Clone(logicalDevice)).(*voltha.LogicalDevice)
+	agent.logicalDevice = logicalDevice
+
 	return nil
 }
 
@@ -568,12 +612,15 @@
 	agent.lockDeviceRoutes.Lock()
 	defer agent.lockDeviceRoutes.Unlock()
 
-	ld := agent.GetLogicalDevice()
+	ld, err := agent.GetLogicalDevice(ctx)
+	if err != nil {
+		return err
+	}
 
 	if agent.deviceRoutes != nil && agent.deviceRoutes.IsUpToDate(ld) {
 		return nil
 	}
-	log.Debug("Generation of device graph required")
+	log.Debug("Generation of device route required")
 	if err := agent.buildRoutes(ctx); err != nil {
 		return err
 	}
@@ -651,9 +698,10 @@
 	if meterMod == nil {
 		return nil
 	}
-	log.Debug("Waiting for logical device lock!!")
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 	log.Debug("Acquired logical device lock")
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -686,8 +734,10 @@
 	if meterMod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -742,8 +792,10 @@
 	if meterMod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -831,8 +883,10 @@
 	if mod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -904,11 +958,6 @@
 		}
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.addDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
-			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
-			return err
-		}
-
 		//	Update model
 		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
 			log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
@@ -930,7 +979,17 @@
 
 			}
 		}
+		// Send the flows to the devices
+		respChannels := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &flowMetadata)
 
+		// Create the go routines to wait
+		go func() {
+			// Wait for completion
+			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
+				log.Warnw("failure-to-add-flows", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
+				// TODO : revert added flow
+			}
+		}()
 	}
 	return nil
 }
@@ -970,8 +1029,10 @@
 	if mod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -1027,29 +1088,38 @@
 		}
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.deleteDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
-			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
-			return err
-		}
-
 		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: toKeep}); err != nil {
 			log.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
+
+		// Update the devices
+		respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+
+		// Wait for the responses
+		go func() {
+			// Wait for completion
+			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+				log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+				// TODO: Revert the flow deletion
+			}
+		}()
 	}
 
 	//TODO: send announcement on delete
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
-	log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
+func (agent *LogicalDeviceAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+	log.Debugw("send-add-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
 
 	responses := make([]coreutils.Response, 0)
 	for deviceID, value := range deviceRules.GetRules() {
 		response := coreutils.NewResponse()
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
+			ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+			defer cancel()
 			if err := agent.deviceMgr.addFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
 				response.Error(status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
@@ -1057,21 +1127,20 @@
 			response.Done()
 		}(deviceID, value)
 	}
-	// Wait for completion
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
-		return status.Errorf(codes.Aborted, "errors-%s", res)
-	}
-	return nil
+	// Return responses (an array of channels) for the caller to wait for a response from the far end.
+	return responses
 }
 
-func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
-	log.Debugw("deleteDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
+func (agent *LogicalDeviceAgent) deleteFlowsAndGroupsFromDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+	log.Debugw("send-delete-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
 
 	responses := make([]coreutils.Response, 0)
 	for deviceID, value := range deviceRules.GetRules() {
 		response := coreutils.NewResponse()
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
+			ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+			defer cancel()
 			if err := agent.deviceMgr.deleteFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
 				response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
@@ -1079,21 +1148,19 @@
 			response.Done()
 		}(deviceID, value)
 	}
-	// Wait for completion
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
-		return status.Errorf(codes.Aborted, "errors-%s", res)
-	}
-	return nil
+	return responses
 }
 
-func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
-	log.Debugw("updateDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
+func (agent *LogicalDeviceAgent) updateFlowsAndGroupsOfDevice(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+	log.Debugw("send-update-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
 
 	responses := make([]coreutils.Response, 0)
 	for deviceID, value := range deviceRules.GetRules() {
 		response := coreutils.NewResponse()
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
+			ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+			defer cancel()
 			if err := agent.deviceMgr.updateFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
 				response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
@@ -1101,11 +1168,7 @@
 			response.Done()
 		}(deviceID, value)
 	}
-	// Wait for completion
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
-		return status.Errorf(codes.Aborted, "errors-%s", res)
-	}
-	return nil
+	return responses
 }
 
 //flowDeleteStrict deletes a flow from the flow table of that logical device
@@ -1114,8 +1177,10 @@
 	if mod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -1172,15 +1237,21 @@
 		}
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.deleteDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
-			log.Errorw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
-			return err
-		}
-
 		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
 			log.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
+
+		// Update the devices
+		respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+
+		// Wait for completion
+		go func() {
+			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+				log.Warnw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+				//TODO: Revert flow changes
+			}
+		}()
 	}
 	return nil
 }
@@ -1200,8 +1271,10 @@
 	if groupMod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -1216,19 +1289,25 @@
 		deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
 
 		log.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
-		if err := agent.addDeviceFlowsAndGroups(ctx, deviceRules, &voltha.FlowMetadata{}); err != nil {
-			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
-			return err
-		}
 
 		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
 			log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
-	} else {
-		return fmt.Errorf("Groups %d already present", groupMod.GroupId)
+
+		// Update the devices
+		respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
+
+		// Wait for completion
+		go func() {
+			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+				log.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+				//TODO: Revert flow changes
+			}
+		}()
+		return nil
 	}
-	return nil
+	return fmt.Errorf("Groups %d already present", groupMod.GroupId)
 }
 
 func (agent *LogicalDeviceAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
@@ -1236,8 +1315,10 @@
 	if groupMod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 	groups := lDevice.FlowGroups.Items
@@ -1266,23 +1347,29 @@
 		}
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.updateDeviceFlowsAndGroups(ctx, deviceRules, nil); err != nil {
-			log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
-			return err
+		if groupsChanged {
+			if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
+				log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+				return err
+			}
 		}
-	}
+		if flowsChanged {
+			if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
+				log.Errorw("cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+				return err
+			}
+		}
 
-	if groupsChanged {
-		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
-			log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-			return err
-		}
-	}
-	if flowsChanged {
-		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
-			log.Errorw("cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-			return err
-		}
+		// Update the devices
+		respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, nil)
+
+		// Wait for completion
+		go func() {
+			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+				log.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+				//TODO: Revert flow changes
+			}
+		}()
 	}
 	return nil
 }
@@ -1292,8 +1379,10 @@
 	if groupMod == nil {
 		return nil
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	lDevice := agent.getLogicalDeviceWithoutLock()
 	groups := lDevice.FlowGroups.Items
@@ -1315,24 +1404,32 @@
 		deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
 
 		log.Debugw("rules", log.Fields{"rules for group-modify": deviceRules.String()})
-		if err := agent.updateDeviceFlowsAndGroups(ctx, deviceRules, &voltha.FlowMetadata{}); err != nil {
-			log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
-			return err
-		}
 
-		//lDevice.FlowGroups.Items = groups
 		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
 			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
+
+		// Update the devices
+		respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, &voltha.FlowMetadata{})
+
+		// Wait for completion
+		go func() {
+			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+				log.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+				//TODO: Revert flow changes
+			}
+		}()
 	}
 	return nil
 }
 
 // deleteLogicalPort removes the logical port
 func (agent *LogicalDeviceAgent) deleteLogicalPort(ctx context.Context, lPort *voltha.LogicalPort) error {
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	logicalDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -1368,8 +1465,10 @@
 
 // deleteLogicalPorts removes the logical ports associated with that deviceId
 func (agent *LogicalDeviceAgent) deleteLogicalPorts(ctx context.Context, deviceID string) error {
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	logicalDevice := agent.getLogicalDeviceWithoutLock()
 	lPortstoKeep := []*voltha.LogicalPort{}
@@ -1403,8 +1502,10 @@
 
 // enableLogicalPort enables the logical port
 func (agent *LogicalDeviceAgent) enableLogicalPort(ctx context.Context, lPortID string) error {
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	logicalDevice := agent.getLogicalDeviceWithoutLock()
 
@@ -1424,8 +1525,10 @@
 
 // disableLogicalPort disabled the logical port
 func (agent *LogicalDeviceAgent) disableLogicalPort(ctx context.Context, lPortID string) error {
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
 
 	// Get the most up to date logical device
 	logicalDevice := agent.getLogicalDeviceWithoutLock()
@@ -1546,9 +1649,12 @@
 
 //rebuildRoutes rebuilds the device routes
 func (agent *LogicalDeviceAgent) buildRoutes(ctx context.Context) error {
-	log.Debugw("building-routes", log.Fields{"logical-device-id": agent.logicalDeviceID})
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	log.Debugf("building-routes", log.Fields{"logical-device-id": agent.logicalDeviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
 	if agent.deviceRoutes == nil {
 		agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
 	}
@@ -1568,8 +1674,11 @@
 //updateRoutes updates the device routes
 func (agent *LogicalDeviceAgent) updateRoutes(ctx context.Context, lp *voltha.LogicalPort) error {
 	log.Debugw("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
 	if agent.deviceRoutes == nil {
 		agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
 	}
@@ -1673,13 +1782,15 @@
 		log.Infow("device-not-ready", log.Fields{"deviceId": device.Id, "admin": device.AdminState, "oper": device.OperStatus})
 		return false, nil
 	}
-	agent.lockLogicalDevice.RLock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return false, err
+	}
 	if agent.portExist(device, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
-		agent.lockLogicalDevice.RUnlock()
+		agent.requestQueue.RequestComplete()
 		return false, nil
 	}
-	agent.lockLogicalDevice.RUnlock()
+	agent.requestQueue.RequestComplete()
 
 	var portCap *ic.PortCapability
 	var err error
@@ -1689,8 +1800,11 @@
 		return false, err
 	}
 
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return false, err
+	}
+
+	defer agent.requestQueue.RequestComplete()
 	// Double check again if this port has been already added since the getPortCapability could have taken a long time
 	if agent.portExist(device, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
@@ -1722,7 +1836,7 @@
 	clonedLP := (proto.Clone(lp)).(*voltha.LogicalPort)
 	go func() {
 		if err := agent.updateRoutes(context.Background(), clonedLP); err != nil {
-			log.Warn("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": lp.OfpPort.PortNo, "error": err})
+			log.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": lp.OfpPort.PortNo, "error": err})
 		}
 	}()
 
@@ -1749,13 +1863,16 @@
 		log.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
 		return false, nil
 	}
-	agent.lockLogicalDevice.RLock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return false, err
+	}
+
 	if agent.portExist(childDevice, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
-		agent.lockLogicalDevice.RUnlock()
+		agent.requestQueue.RequestComplete()
 		return false, nil
 	}
-	agent.lockLogicalDevice.RUnlock()
+	agent.requestQueue.RequestComplete()
 	var portCap *ic.PortCapability
 	var err error
 	// First get the port capability
@@ -1763,8 +1880,10 @@
 		log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
 		return false, err
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return false, err
+	}
+	defer agent.requestQueue.RequestComplete()
 	// Double check again if this port has been already added since the getPortCapability could have taken a long time
 	if agent.portExist(childDevice, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})