This commit fixes the following issues in the Core:

1) Fix the loading of the logical device agent in the Core that does
not own the device.
2) Fix an issue where UNI logical ports were ready to be added but
the logical device was not fully ready.
3) Fix a deadlock condition that occurred when multiple flows were
added to the same device.
4) Update the logic for processing flow requests received from
OFAgent.  The logic will need to be revamped in a subsequent
commit, once OFAgent is able to send transaction IDs when issuing
flow updates (a short sketch of the transaction-ownership pattern
follows this list).
5) Set up device ownership after a device has been loaded into memory.
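
The transaction handling referenced in item 4) is the
competeForTransaction / takeRequestOwnership / txn.Close() pattern
used by the request handlers: each Core in a pair tries to reserve
the transaction, the winner processes the request and releases the
reservation on exit, and the loser returns early.  The following is
a minimal, self-contained sketch of that pattern; the txnRegistry
and txn types are hypothetical stand-ins, not the Core's actual
KV-store transaction API.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// txnRegistry is a hypothetical stand-in for the KV-store backed
// transaction registry shared by the two Cores of a pair.
type txnRegistry struct {
	mu    sync.Mutex
	owned map[string]bool
}

// txn is a hypothetical stand-in for an acquired reservation.
type txn struct {
	key string
	reg *txnRegistry
}

// acquire reserves the transaction key; it fails if the peer Core
// already owns it.
func (r *txnRegistry) acquire(key string) (*txn, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.owned[key] {
		return nil, errors.New("transaction-already-owned")
	}
	r.owned[key] = true
	return &txn{key: key, reg: r}, nil
}

// Close releases the reservation, mirroring the deferred txn.Close()
// in the handlers.
func (t *txn) Close() {
	t.reg.mu.Lock()
	defer t.reg.mu.Unlock()
	delete(t.reg.owned, t.key)
}

// handleFlowRequest mirrors the shape of the flow handlers: reserve
// the transaction or back off, then do the work with the reservation
// held and release it on exit.
func handleFlowRequest(reg *txnRegistry, txnID string) {
	t, err := reg.acquire(txnID)
	if err != nil {
		fmt.Println("another Core handled", txnID)
		return
	}
	defer t.Close()
	fmt.Println("processing flow request", txnID)
}

func main() {
	reg := &txnRegistry{owned: make(map[string]bool)}
	peer, _ := reg.acquire("txn-1") // the peer Core reserved it first
	handleFlowRequest(reg, "txn-1") // this Core backs off
	peer.Close()
	handleFlowRequest(reg, "txn-1") // this Core wins and processes it
}

When the transaction cannot be acquired the request is simply
dropped, since the peer Core is handling it; otherwise the deferred
txn.Close() guarantees the reservation is released when the handler
returns.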

Change-Id: I2d604e2ba89e5af21f96871414852c2b6ef85f08
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 0de7953..dbd09e4 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -1067,16 +1067,22 @@
 	log.Debugw("PacketIn", log.Fields{"deviceId": deviceId.Id, "port": portNo.Val, "packet": packet,
 		"transactionID": transactionID.Val})
 
-	// For performance reason, we do not compete for packet-in.  We process it and send the packet in.  later in the
-	// processing flow the duplicate packet will be discarded
-
+	// Try to grab the transaction as this core may be competing with another Core
+	// TODO: If this adds too much latency then remove the transaction and let OFAgent filter out the
+	// duplicates.
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
+			log.Debugw("another-core-handled-request", log.Fields{"transactionID": transactionID.Val})
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
 	go rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), transactionID.Val, packet.Payload)
-	//if err := rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), transactionID.Val, packet.Payload); err != nil {
-	//	return nil, err
-	//}
+
 	return new(empty.Empty), nil
 }
 
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index a96166d..faf8da8 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -21,6 +21,7 @@
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/kafka"
+	"github.com/opencord/voltha-go/rw_core/utils"
 	ic "github.com/opencord/voltha-protos/go/inter_container"
 	ofp "github.com/opencord/voltha-protos/go/openflow_13"
 	"github.com/opencord/voltha-protos/go/voltha"
@@ -121,14 +122,21 @@
 		dMgr.lockDeviceAgentsMap.RUnlock()
 		return agent
 	} else {
-		//	Try to load into memory - loading will also create the device agent
+		//	Try to load into memory - loading will also create the device agent and set the device ownership
 		dMgr.lockDeviceAgentsMap.RUnlock()
 		if err := dMgr.load(deviceId); err == nil {
 			dMgr.lockDeviceAgentsMap.RLock()
 			defer dMgr.lockDeviceAgentsMap.RUnlock()
-			if agent, ok = dMgr.deviceAgents[deviceId]; ok {
+			if agent, ok = dMgr.deviceAgents[deviceId]; !ok {
+				return nil
+			} else {
+				// Register this device for ownership tracking
+				go dMgr.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: deviceId})
 				return agent
 			}
+		} else {
+			//TODO: Change the return params to return an error as well
+			log.Errorw("loading-device-failed", log.Fields{"deviceId": deviceId, "error": err})
 		}
 	}
 	return nil
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
index ed0fd08..e746444 100644
--- a/rw_core/core/device_ownership.go
+++ b/rw_core/core/device_ownership.go
@@ -190,6 +190,7 @@
 
 	deviceOwned, ownedByMe := da.getOwnership(ownershipKey)
 	if deviceOwned {
+		log.Debugw("ownership", log.Fields{"Id": ownershipKey, "owned": ownedByMe})
 		return ownedByMe
 	}
 	// Not owned by me or maybe nobody else.  Try to reserve it
@@ -283,6 +284,7 @@
 // device Id of a child device or the rootdevice of a logical device.   This function also returns the
 // id in string format of the id param via the ref output as well as if the data was retrieved from cache
 func (da *DeviceOwnership) getOwnershipKey(id interface{}) (ownershipKey string, ref string, cached bool, err error) {
+
 	if id == nil {
 		return "", "", false, status.Error(codes.InvalidArgument, "nil-id")
 	}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 2c3b35e..0b251a2 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -107,9 +107,11 @@
 		// Metadata in context
 		if _, ok = md[handler.core.config.CoreBindingKey]; ok {
 			// OFAgent field in metadata
+			log.Debug("OFController-request")
 			return true
 		}
 	}
+	log.Debug("not-OFController-request")
 	return false
 }
 
@@ -284,13 +286,17 @@
 		return out, nil
 	}
 
+	// TODO: Update this logic when the OF Controller (OFAgent in this case) is able to send a transaction Id in its
+	// request (the api-router binds the OFAgent to the two Cores in a pair and lets the traffic flow transparently)
 	if handler.competeForTransaction() {
-		if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
+		if !handler.isOFControllerRequest(ctx) {
 			if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
 				return new(empty.Empty), err
 			} else {
 				defer txn.Close()
 			}
+		} else if !handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: flow.Id}) {
+			return new(empty.Empty), nil
 		}
 	}
 
@@ -307,6 +313,8 @@
 		return out, nil
 	}
 
+	// TODO: Update this logic when the OF Controller (OFAgent in this case) is able to send a transaction Id in its
+	// request (the api-router binds the OFAgent to the two Cores in a pair and lets the traffic flow transparently)
 	if handler.competeForTransaction() {
 		if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
 			if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
@@ -314,6 +322,8 @@
 			} else {
 				defer txn.Close()
 			}
+		} else if !handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: flow.Id}) {
+			return new(empty.Empty), nil
 		}
 	}
 
@@ -720,9 +730,15 @@
 
 func (handler *APIHandler) forwardPacketOut(packet *openflow_13.PacketOut) {
 	log.Debugw("forwardPacketOut-request", log.Fields{"packet": packet})
-	agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(packet.Id)
-	agent.packetOut(packet.PacketOut)
+	//TODO: Update this logic once the OF Controller (OFAgent in this case) can include a transaction Id in its
+	// request.  For performance reasons we could let both Cores in a Core-Pair forward the packet to the adapters
+	// and let one of the shim layers (kafka proxy or adapter request handler) filter out the duplicate packet.
+	if handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: packet.Id}) {
+		agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(packet.Id)
+		agent.packetOut(packet.PacketOut)
+	}
 }
+
 func (handler *APIHandler) StreamPacketsOut(
 	packets voltha.VolthaService_StreamPacketsOutServer,
 ) error {
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 057cc6b..bb25b4b 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -114,8 +114,9 @@
 		}
 		agent.lockLogicalDevice.Unlock()
 
-		// TODO:  Set the NNI ports in a separate call once the port update issue is fixed.
-		go agent.setupNNILogicalPorts(ctx, agent.rootDeviceId)
+		// TODO:  Set the logical ports in a separate call once the port update issue is fixed.
+		go agent.setupLogicalPorts(ctx)
+
 	} else {
 		//	load from dB - the logical may not exist at this time.  On error, just return and the calling function
 		// will destroy this agent.
@@ -124,10 +125,18 @@
 			log.Warnw("failed-to-load-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
 			return err
 		}
+
 		// Update the root device Id
 		agent.rootDeviceId = ld.RootDeviceId
+
+		// Setup the local list of logical ports
+		agent.addLogicalPortsToMap(ld.Ports)
+
+		// Setup the device graph
+		agent.generateDeviceGraph()
 	}
 	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
 
 	agent.flowProxy = agent.clusterDataProxy.CreateProxy(
 		fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceId),
@@ -140,12 +149,15 @@
 		false)
 
 	// TODO:  Use a port proxy once the POST_ADD is fixed
-	agent.ldProxy.RegisterCallback(model.POST_UPDATE, agent.portUpdated)
+	if agent.ldProxy != nil {
+		agent.ldProxy.RegisterCallback(model.POST_UPDATE, agent.portUpdated)
+	} else {
+		log.Errorw("logical-device-proxy-null", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		return status.Error(codes.Internal, "logical-device-proxy-null")
+	}
 
 	agent.includeDefaultFlows = true
 
-	agent.lockLogicalDevice.Unlock()
-
 	return nil
 }
 
@@ -289,6 +301,42 @@
 	return nil
 }
 
+// setupLogicalPorts is invoked once the logical device has been created and is ready to get ports
+// added to it.  While the logical device was being created we could have received requests to add
+// NNI and UNI ports, which were discarded.  Now is the time to add them if needed.
+func (agent *LogicalDeviceAgent) setupLogicalPorts(ctx context.Context) error {
+	log.Infow("setupLogicalPorts", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+	// First add any NNI ports which could have been missing
+	if err := agent.setupNNILogicalPorts(nil, agent.rootDeviceId); err != nil {
+		log.Errorw("error-setting-up-NNI-ports", log.Fields{"error": err, "deviceId": agent.rootDeviceId})
+		return err
+	}
+
+	// Now, set up the UNI ports if needed.
+	if children, err := agent.deviceMgr.getAllChildDevices(agent.rootDeviceId); err != nil {
+		log.Errorw("error-getting-child-devices", log.Fields{"error": err, "deviceId": agent.rootDeviceId})
+		return err
+	} else {
+		chnlsList := make([]chan interface{}, 0)
+		for _, child := range children.Items {
+			ch := make(chan interface{})
+			chnlsList = append(chnlsList, ch)
+			go func(device *voltha.Device, ch chan interface{}) {
+				// Use a locally scoped err to avoid a data race across goroutines
+				if err := agent.setupUNILogicalPorts(nil, device); err != nil {
+					log.Errorw("setting-up-UNI-ports-failed", log.Fields{"deviceID": device.Id, "error": err})
+					ch <- status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", device.Id)
+					return // do not also send nil after reporting the error
+				}
+				ch <- nil
+			}(child, ch)
+		}
+		// Wait for completion
+		if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+			return status.Errorf(codes.Aborted, "errors-%s", res)
+		}
+	}
+	return nil
+}
+
 // setupNNILogicalPorts creates an NNI port on the logical device that represents an NNI interface on a root device
 func (agent *LogicalDeviceAgent) setupNNILogicalPorts(ctx context.Context, deviceId string) error {
 	log.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
@@ -349,7 +397,7 @@
 
 // setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
 func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
-	log.Infow("setupUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+	log.Infow("setupUNILogicalPorts", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	// Build the logical device based on information retrieved from the device adapter
 	var err error
 
@@ -806,7 +854,7 @@
 			return err
 		}
 		// Reset the logical device graph
-		go agent.redoDeviceGraph()
+		go agent.generateDeviceGraph()
 	}
 	return nil
 }
@@ -835,7 +883,7 @@
 		return err
 	}
 	// Reset the logical device graph
-	go agent.redoDeviceGraph()
+	go agent.generateDeviceGraph()
 
 	return nil
 }
@@ -1067,13 +1115,16 @@
 	return agent.DefaultFlowRules
 }
 
+//GetWildcardInputPorts returns the port numbers of all logical ports on the device, excluding the optional
+//port provided in excludePort.  This function is invoked only during flow decomposition, where the lock on
+//the logical device is already held.  Therefore it is safe to retrieve the logical device without the lock.
 func (agent *LogicalDeviceAgent) GetWildcardInputPorts(excludePort ...uint32) []uint32 {
 	lPorts := make([]uint32, 0)
 	var exclPort uint32
 	if len(excludePort) == 1 {
 		exclPort = excludePort[0]
 	}
-	if lDevice, _ := agent.GetLogicalDevice(); lDevice != nil {
+	if lDevice, _ := agent.getLogicalDeviceWithoutLock(); lDevice != nil {
 		for _, port := range lDevice.Ports {
 			if port.OfpPort.PortNo != exclPort {
 				lPorts = append(lPorts, port.OfpPort.PortNo)
@@ -1143,18 +1194,21 @@
 	agent.deviceGraph.Print()
 }
 
-//redoDeviceGraph regenerates the device graph upon port changes on a device graph
-//TODO: it may yield better performance to have separate deleteLogicalPort functions that would remove
-// all the routes/nodes related to the deleted logical port.
-func (agent *LogicalDeviceAgent) redoDeviceGraph() {
-	log.Debugf("redoDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+//generateDeviceGraph regenerates the device graph
+func (agent *LogicalDeviceAgent) generateDeviceGraph() {
+	log.Debugw("generateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 	// Get the latest logical device
 	if ld, err := agent.getLogicalDeviceWithoutLock(); err != nil {
 		log.Errorw("logical-device-not-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 	} else {
+		log.Debugw("generating-graph", log.Fields{"lDeviceId": agent.logicalDeviceId, "deviceGraph": agent.deviceGraph, "lPorts": len(ld.Ports)})
+		if agent.deviceGraph == nil {
+			agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
+		}
 		agent.deviceGraph.ComputeRoutes(ld.Ports)
+		agent.deviceGraph.Print()
 	}
 }
 
@@ -1458,6 +1512,16 @@
 	}
 }
 
+func (agent *LogicalDeviceAgent) addLogicalPortsToMap(lps []*voltha.LogicalPort) {
+	agent.lockLogicalPortsNo.Lock()
+	defer agent.lockLogicalPortsNo.Unlock()
+	for _, lp := range lps {
+		if _, exist := agent.logicalPortsNo[lp.DevicePortNo]; !exist {
+			agent.logicalPortsNo[lp.DevicePortNo] = lp.RootPort
+		}
+	}
+}
+
 func (agent *LogicalDeviceAgent) deleteLogicalPortFromMap(portNo uint32) {
 	agent.lockLogicalPortsNo.Lock()
 	defer agent.lockLogicalPortsNo.Unlock()
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index b6493c5..37b3c3f 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -414,7 +414,7 @@
 
 	log.Debugw("setupUNILogicalPorts", log.Fields{"logDeviceId": logDeviceId, "parentId": parentId})
 
-	if parentId == "" || logDeviceId == nil {
+	if parentId == "" || logDeviceId == nil || *logDeviceId == "" {
 		return errors.New("device-in-invalid-state")
 	}
 
diff --git a/rw_core/graph/device_graph.go b/rw_core/graph/device_graph.go
index 5583023..bdb72f3 100644
--- a/rw_core/graph/device_graph.go
+++ b/rw_core/graph/device_graph.go
@@ -244,6 +244,8 @@
 		return g
 	}
 
+	log.Debugw("adding-device", log.Fields{"deviceId": device.Id, "ports": device.Ports})
+
 	if _, exist := (*devicesAdded)[device.Id]; !exist {
 		g.AddNode(goraph.NewNode(device.Id))
 		(*devicesAdded)[device.Id] = device.Id
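
For reference, the setupLogicalPorts change above sets up the UNI
ports of all child devices concurrently: one goroutine and one
response channel per child, followed by a bounded wait for every
response (fu.WaitForNilOrErrorResponses).  The sketch below is a
self-contained illustration of that shape only; setupUNIPort and
setupAllUNIPorts are hypothetical stand-ins and the sketch does not
use the Core's helper.

package main

import (
	"errors"
	"fmt"
	"time"
)

// setupUNIPort stands in for agent.setupUNILogicalPorts (hypothetical).
func setupUNIPort(deviceID string) error {
	if deviceID == "bad-onu" {
		return errors.New("UNI-ports-setup-failed: " + deviceID)
	}
	return nil
}

// setupAllUNIPorts fans out one goroutine and one buffered response
// channel per child device, then waits for every response with a
// per-response timeout.
func setupAllUNIPorts(children []string, timeout time.Duration) error {
	chnls := make([]chan error, 0, len(children))
	for _, child := range children {
		ch := make(chan error, 1) // buffered: a late sender never blocks
		chnls = append(chnls, ch)
		go func(id string, ch chan error) {
			ch <- setupUNIPort(id)
		}(child, ch)
	}
	for _, ch := range chnls {
		select {
		case err := <-ch:
			if err != nil {
				return err
			}
		case <-time.After(timeout):
			return errors.New("timeout-waiting-for-UNI-port-setup")
		}
	}
	return nil
}

func main() {
	fmt.Println(setupAllUNIPorts([]string{"onu-1", "onu-2"}, time.Second))   // <nil>
	fmt.Println(setupAllUNIPorts([]string{"onu-1", "bad-onu"}, time.Second)) // UNI-ports-setup-failed: bad-onu
}

The buffered channels keep a late responder from blocking after the
caller has timed out and moved on, so a slow child device cannot leak
a goroutine in this sketch.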