[VOL-1890] Flow decomposition fails after a core switchover

Cherry-picked into the master branch from voltha-2.1.

Change-Id: I84c3a83b5b9115d6ec334af29634486ba7148634
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 49e1463..65b3b30 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -50,6 +50,7 @@
 	portProxies        map[string]*model.Proxy
 	portProxiesLock    sync.RWMutex
 	lockLogicalDevice  sync.RWMutex
+	lockDeviceGraph    sync.RWMutex
 	logicalPortsNo     map[uint32]bool //value is true for NNI port
 	lockLogicalPortsNo sync.RWMutex
 	flowDecomposer     *fd.FlowDecomposer
@@ -71,6 +72,7 @@
 	agent.portProxies = make(map[string]*model.Proxy)
 	agent.portProxiesLock = sync.RWMutex{}
 	agent.lockLogicalPortsNo = sync.RWMutex{}
+	agent.lockDeviceGraph = sync.RWMutex{}
 	agent.logicalPortsNo = make(map[uint32]bool)
 	agent.defaultTimeout = timeout
 	return &agent
@@ -130,8 +132,6 @@
 		// Setup the local list of logical ports
 		agent.addLogicalPortsToMap(ld.Ports)
 
-		// Setup the device graph
-		agent.generateDeviceGraph()
 	}
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
@@ -161,6 +161,10 @@
 		return status.Error(codes.Internal, "logical-device-proxy-null")
 	}
 
+	// Set up the device graph - run it in its own goroutine
+	if loadFromdB {
+		go agent.generateDeviceGraph()
+	}
 	return nil
 }
 
@@ -267,32 +271,53 @@
 	return nil
 }
 
-//updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
+//updateLogicalDeviceFlowsWithoutLock sets the given flows on a clone of the logical device and saves it via updateLogicalDeviceWithoutLock.
 func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
-	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-	afterUpdate := agent.flowProxy.Update(updateCtx, "/", flows, false, "")
-	if afterUpdate == nil {
-		return status.Errorf(codes.Internal, "failed-updating-logical-device-flows:%s", agent.logicalDeviceId)
+	ld, err := agent.getLogicalDeviceWithoutLock()
+	if err != nil {
+		return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+	}
+	log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+	cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+	cloned.Flows = flows
+
+	if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+		log.Errorw("error-updating-logical-device-with-flows", log.Fields{"error": err})
+		return err
 	}
 	return nil
 }
 
-//updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
+//updateLogicalDeviceMetersWithoutLock updates the logical device with the meters info
 func (agent *LogicalDeviceAgent) updateLogicalDeviceMetersWithoutLock(meters *ofp.Meters) error {
-	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-	afterUpdate := agent.meterProxy.Update(updateCtx, "/", meters, false, "")
-	if afterUpdate == nil {
-		return status.Errorf(codes.Internal, "failed-updating-logical-device-meters:%s", agent.logicalDeviceId)
+	ld, err := agent.getLogicalDeviceWithoutLock()
+	if err != nil {
+		return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+	}
+	log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+	cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+	cloned.Meters = meters
+
+	if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+		log.Errorw("error-updating-logical-device-with-meters", log.Fields{"error": err})
+		return err
 	}
 	return nil
 }
 
-//updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
+//updateLogicalDeviceFlowGroupsWithoutLock updates the logical device with the flow groups
 func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
-	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-	afterUpdate := agent.groupProxy.Update(updateCtx, "/", flowGroups, false, "")
-	if afterUpdate == nil {
-		return status.Errorf(codes.Internal, "failed-updating-logical-device-flow-groups:%s", agent.logicalDeviceId)
+	ld, err := agent.getLogicalDeviceWithoutLock()
+	if err != nil {
+		return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+	}
+	log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+	cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+	cloned.FlowGroups = flowGroups
+
+	if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+		log.Errorw("error-updating-logical-device-with-flowgroups", log.Fields{"error": err})
+		return err
 	}
 	return nil
 }
@@ -507,12 +532,33 @@
 	return nil
 }
 
+//generateDeviceGraphIfNeeded regenerates the device graph when no graph exists yet or when the existing graph
+//is not up to date with the current logical device.
+func (agent *LogicalDeviceAgent) generateDeviceGraphIfNeeded() error {
+	if ld, err := agent.GetLogicalDevice(); err != nil {
+		log.Errorw("get-logical-device-error", log.Fields{"error": err})
+		return err
+	} else {
+		agent.lockDeviceGraph.Lock()
+		defer agent.lockDeviceGraph.Unlock()
+		if agent.deviceGraph != nil && agent.deviceGraph.IsUpToDate(ld) {
+			return nil
+		}
+		log.Debug("Generation of device graph required")
+		agent.generateDeviceGraph()
+	}
+	return nil
+}
+
 //updateFlowTable updates the flow table of that logical device
 func (agent *LogicalDeviceAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
 	log.Debug("updateFlowTable")
 	if flow == nil {
 		return nil
 	}
+	if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+		return err
+	}
 	switch flow.GetCommand() {
 	case ofp.OfpFlowModCommand_OFPFC_ADD:
 		return agent.flowAdd(flow)
@@ -535,6 +581,9 @@
 	if groupMod == nil {
 		return nil
 	}
+	if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+		return err
+	}
 	switch groupMod.GetCommand() {
 	case ofp.OfpGroupModCommand_OFPGC_ADD:
 		return agent.groupAdd(groupMod)
@@ -553,6 +602,9 @@
 	if meterMod == nil {
 		return nil
 	}
+	if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+		return err
+	}
 	switch meterMod.GetCommand() {
 	case ofp.OfpMeterModCommand_OFPMC_ADD:
 		return agent.meterAdd(meterMod)
@@ -765,7 +817,7 @@
 
 //flowAdd adds a flow to the flow table of that logical device
 func (agent *LogicalDeviceAgent) flowAdd(mod *ofp.OfpFlowMod) error {
-	log.Debug("flowAdd")
+	log.Debugw("flowAdd", log.Fields{"flow": mod})
 	if mod == nil {
 		return nil
 	}
@@ -826,6 +878,8 @@
 			changed = true
 		}
 	}
+	log.Debugw("flowAdd-changed", log.Fields{"changed": changed})
+
 	if changed {
 		var flowMetadata voltha.FlowMetadata
 		if err := agent.GetMeterConfig(updatedFlows, meters, &flowMetadata); err != nil { // This should never happen,meters should be installed before flow arrives
@@ -1500,14 +1554,14 @@
 
 //generateDeviceGraph regenerates the device graph
 func (agent *LogicalDeviceAgent) generateDeviceGraph() {
-	log.Debugf("generateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+	log.Debugw("generateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 	// Get the latest logical device
 	if ld, err := agent.getLogicalDeviceWithoutLock(); err != nil {
 		log.Errorw("logical-device-not-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 	} else {
-		log.Debugw("generating-graph", log.Fields{"lDeviceId": agent.logicalDeviceId, "deviceGraph": agent.deviceGraph, "lPorts": len(ld.Ports)})
+		log.Debugw("generating-graph", log.Fields{"lDeviceId": agent.logicalDeviceId, "lPorts": len(ld.Ports)})
 		if agent.deviceGraph == nil {
 			agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
 		}