[VOL-1890] Flow decomposition fails after a core switch over

This commit fixes the issue reported in VOL-1890 by doing
the following:
1) Update the device graph if the device graph was built
with an older logical device topology (this can happen in
the standby core, as the model currently does not trigger
a core callback after it updates its data in memory following
a watch event from the KV store - this is a TODO item).
2) Update flows in a logical device without using the data
model proxy to flows directly.
3) Reconcile the list of devices and logical devices in the
core memory after a restart upon receiving a reconcile request
from the API server.

After first review:
1) Update the code as per the comments
2) Change the device loading channel so that two simultaneous
routines invoking the device loading for the same device
return the same result (i.e. if it's a success then both
should return success, and conversely in the failure case)

After second review:
Change the sync map controlling the devices in loading state
to a regular map.

Change-Id: I4331ac2a8f87a7de272e919a31dfe4bbaaf327a0
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 49e1463..65b3b30 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -50,6 +50,7 @@
 	portProxies        map[string]*model.Proxy
 	portProxiesLock    sync.RWMutex
 	lockLogicalDevice  sync.RWMutex
+	lockDeviceGraph    sync.RWMutex
 	logicalPortsNo     map[uint32]bool //value is true for NNI port
 	lockLogicalPortsNo sync.RWMutex
 	flowDecomposer     *fd.FlowDecomposer
@@ -71,6 +72,7 @@
 	agent.portProxies = make(map[string]*model.Proxy)
 	agent.portProxiesLock = sync.RWMutex{}
 	agent.lockLogicalPortsNo = sync.RWMutex{}
+	agent.lockDeviceGraph = sync.RWMutex{}
 	agent.logicalPortsNo = make(map[uint32]bool)
 	agent.defaultTimeout = timeout
 	return &agent
@@ -130,8 +132,6 @@
 		// Setup the local list of logical ports
 		agent.addLogicalPortsToMap(ld.Ports)
 
-		// Setup the device graph
-		agent.generateDeviceGraph()
 	}
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
@@ -161,6 +161,10 @@
 		return status.Error(codes.Internal, "logical-device-proxy-null")
 	}
 
+	// Setup the device graph - run it in its own routine
+	if loadFromdB {
+		go agent.generateDeviceGraph()
+	}
 	return nil
 }
 
@@ -267,32 +271,53 @@
 	return nil
 }
 
-//updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
+//updateLogicalDeviceFlowsWithoutLock updates the logical device with the latest flows in the model.
 func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
-	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-	afterUpdate := agent.flowProxy.Update(updateCtx, "/", flows, false, "")
-	if afterUpdate == nil {
-		return status.Errorf(codes.Internal, "failed-updating-logical-device-flows:%s", agent.logicalDeviceId)
+	ld, err := agent.getLogicalDeviceWithoutLock()
+	if err != nil {
+		return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+	}
+	log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+	cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+	cloned.Flows = flows
+
+	if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+		log.Errorw("error-updating-logical-device-with-flows", log.Fields{"error": err})
+		return err
 	}
 	return nil
 }
 
-//updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
+//updateLogicalDeviceMetersWithoutLock updates the logical device with the meters info
 func (agent *LogicalDeviceAgent) updateLogicalDeviceMetersWithoutLock(meters *ofp.Meters) error {
-	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-	afterUpdate := agent.meterProxy.Update(updateCtx, "/", meters, false, "")
-	if afterUpdate == nil {
-		return status.Errorf(codes.Internal, "failed-updating-logical-device-meters:%s", agent.logicalDeviceId)
+	ld, err := agent.getLogicalDeviceWithoutLock()
+	if err != nil {
+		return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+	}
+	log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+	cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+	cloned.Meters = meters
+
+	if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+		log.Errorw("error-updating-logical-device-with-meters", log.Fields{"error": err})
+		return err
 	}
 	return nil
 }
 
-//updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
+//updateLogicalDeviceFlowGroupsWithoutLock updates the logical device with the flow groups
 func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
-	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-	afterUpdate := agent.groupProxy.Update(updateCtx, "/", flowGroups, false, "")
-	if afterUpdate == nil {
-		return status.Errorf(codes.Internal, "failed-updating-logical-device-flow-groups:%s", agent.logicalDeviceId)
+	ld, err := agent.getLogicalDeviceWithoutLock()
+	if err != nil {
+		return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+	}
+	log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+	cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+	cloned.FlowGroups = flowGroups
+
+	if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+		log.Errorw("error-updating-logical-device-with-flowgroups", log.Fields{"error": err})
+		return err
 	}
 	return nil
 }
@@ -507,12 +532,33 @@
 	return nil
 }
 
+//generateDeviceGraphIfNeeded generates the device graph if the logical device has been updated since the last time
+//that device graph was generated.
+func (agent *LogicalDeviceAgent) generateDeviceGraphIfNeeded() error {
+	if ld, err := agent.GetLogicalDevice(); err != nil {
+		log.Errorw("get-logical-device-error", log.Fields{"error": err})
+		return err
+	} else {
+		agent.lockDeviceGraph.Lock()
+		defer agent.lockDeviceGraph.Unlock()
+		if agent.deviceGraph != nil && agent.deviceGraph.IsUpToDate(ld) {
+			return nil
+		}
+		log.Debug("Generation of device graph required")
+		agent.generateDeviceGraph()
+	}
+	return nil
+}
+
 //updateFlowTable updates the flow table of that logical device
 func (agent *LogicalDeviceAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
 	log.Debug("updateFlowTable")
 	if flow == nil {
 		return nil
 	}
+	if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+		return err
+	}
 	switch flow.GetCommand() {
 	case ofp.OfpFlowModCommand_OFPFC_ADD:
 		return agent.flowAdd(flow)
@@ -535,6 +581,9 @@
 	if groupMod == nil {
 		return nil
 	}
+	if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+		return err
+	}
 	switch groupMod.GetCommand() {
 	case ofp.OfpGroupModCommand_OFPGC_ADD:
 		return agent.groupAdd(groupMod)
@@ -553,6 +602,9 @@
 	if meterMod == nil {
 		return nil
 	}
+	if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+		return err
+	}
 	switch meterMod.GetCommand() {
 	case ofp.OfpMeterModCommand_OFPMC_ADD:
 		return agent.meterAdd(meterMod)
@@ -765,7 +817,7 @@
 
 //flowAdd adds a flow to the flow table of that logical device
 func (agent *LogicalDeviceAgent) flowAdd(mod *ofp.OfpFlowMod) error {
-	log.Debug("flowAdd")
+	log.Debugw("flowAdd", log.Fields{"flow": mod})
 	if mod == nil {
 		return nil
 	}
@@ -826,6 +878,8 @@
 			changed = true
 		}
 	}
+	log.Debugw("flowAdd-changed", log.Fields{"changed": changed})
+
 	if changed {
 		var flowMetadata voltha.FlowMetadata
 		if err := agent.GetMeterConfig(updatedFlows, meters, &flowMetadata); err != nil { // This should never happen,meters should be installed before flow arrives
@@ -1500,14 +1554,14 @@
 
 //generateDeviceGraph regenerates the device graph
 func (agent *LogicalDeviceAgent) generateDeviceGraph() {
-	log.Debugf("generateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+	log.Debugw("generateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 	// Get the latest logical device
 	if ld, err := agent.getLogicalDeviceWithoutLock(); err != nil {
 		log.Errorw("logical-device-not-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 	} else {
-		log.Debugw("generating-graph", log.Fields{"lDeviceId": agent.logicalDeviceId, "deviceGraph": agent.deviceGraph, "lPorts": len(ld.Ports)})
+		log.Debugw("generating-graph", log.Fields{"lDeviceId": agent.logicalDeviceId, "lPorts": len(ld.Ports)})
 		if agent.deviceGraph == nil {
 			agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
 		}