[VOL-1547] Add port to logical device when device is active

This commit consists of the following changes:
1)  Fix the issue where flows were received when the logical
device flow graph was not ready.
2) Update the default kafka config for improved performance
3) Add a lock to the device ownership logic to ensure the
lock map does not get corrupted.

Change-Id: I840d572e06ed5acf0f3bc1ce423a0ada8f335543
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index b1a68cc..732e9cf 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -22,12 +22,12 @@
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
-	ic "github.com/opencord/voltha-protos/go/inter_container"
-	ofp "github.com/opencord/voltha-protos/go/openflow_13"
-	"github.com/opencord/voltha-protos/go/voltha"
 	fd "github.com/opencord/voltha-go/rw_core/flow_decomposition"
 	"github.com/opencord/voltha-go/rw_core/graph"
 	fu "github.com/opencord/voltha-go/rw_core/utils"
+	ic "github.com/opencord/voltha-protos/go/inter_container"
+	ofp "github.com/opencord/voltha-protos/go/openflow_13"
+	"github.com/opencord/voltha-protos/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"reflect"
@@ -248,24 +248,31 @@
 
 
 func (agent *LogicalDeviceAgent)  addLogicalPort (device *voltha.Device, port *voltha.Port) error {
+	log.Debugw("addLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
+	var changed bool
+	var err error
 	if port.Type == voltha.Port_ETHERNET_NNI {
-		if err := agent.addNNILogicalPort(device, port); err != nil {
+		if changed, err = agent.addNNILogicalPort(device, port); err != nil {
 			return err
 		}
 	} else if port.Type == voltha.Port_ETHERNET_UNI {
-		if err :=  agent.addUNILogicalPort(device, port); err != nil {
+		if changed, err =  agent.addUNILogicalPort(device, port); err != nil {
 			return err
 		}
 	} else {
 		log.Debugw("invalid-port-type", log.Fields{"deviceId": device.Id, "port": port})
 		return nil
 	}
-	go agent.setupDeviceGraph()
+	if changed {
+		go agent.setupDeviceGraph()
+	}
 	return nil
 }
 
 // setupNNILogicalPorts creates an NNI port on the logical device that represents an NNI interface on a root device
 func (agent *LogicalDeviceAgent) setupNNILogicalPorts(ctx context.Context, deviceId string) error {
+	//now := time.Now()
+	//defer fmt.Println("setupNNILogicalPorts:", deviceId, time.Since(now))
 	log.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	// Build the logical device based on information retrieved from the device adapter
 	var err error
@@ -279,11 +286,12 @@
 	//Get UNI port number
 	changesMade := false
 	for _, port := range device.Ports {
+		changed := false
 		if port.Type == voltha.Port_ETHERNET_NNI {
-			if err = agent.addNNILogicalPort(device, port); err != nil {
+			if changed, err = agent.addNNILogicalPort(device, port); err != nil {
 				log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
 			} else {
-				changesMade = true
+				changesMade = changed || changesMade
 			}
 		}
 	}
@@ -296,6 +304,8 @@
 
 // setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
 func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
+	//now := time.Now()
+	//defer fmt.Println("setupUNILogicalPorts:", childDevice.Id, time.Since(now))
 	log.Infow("setupUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	// Build the logical device based on information retrieved from the device adapter
 	var err error
@@ -303,11 +313,12 @@
 	//Get UNI port number
 	changesMade := false
 	for _, port := range childDevice.Ports {
+		changed := false
 		if port.Type == voltha.Port_ETHERNET_UNI {
-			if err = agent.addUNILogicalPort(childDevice, port); err != nil {
+			if changed, err = agent.addUNILogicalPort(childDevice, port); err != nil {
 				log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
 			} else {
-				changesMade = true
+				changesMade = changed || changesMade
 			}
 		}
 	}
@@ -797,6 +808,14 @@
 		if isNNIPort(ingressPortNo, nniLogicalPortsNo) {
 			log.Debug("returning-half-route")
 			//This is a trap on the NNI Port
+			if len(agent.deviceGraph.Routes) == 0 {
+				// If there are no routes set (usually when the logical device has only NNI port(s), then just return an
+				// internal route
+				hop := graph.RouteHop{DeviceID:ld.RootDeviceId, Ingress:ingressPortNo, Egress:egressPortNo}
+				routes = append(routes, hop)
+				routes = append(routes, hop)
+				return routes
+			}
 			//Return a 'half' route to make the flow decomposer logic happy
 			for routeLink, route := range agent.deviceGraph.Routes {
 				if isNNIPort(routeLink.Egress, nniLogicalPortsNo) {
@@ -1208,17 +1227,23 @@
 }
 
 
-func (agent *LogicalDeviceAgent) addNNILogicalPort (device *voltha.Device, port *voltha.Port)  error {
+// addNNILogicalPort adds an NNI port to the logical device.  It returns a bool representing whether a port has been
+// added and an eror in case a valid error is encountered. If the port was successfully added it will return
+// (true, nil).   If the device is not in the correct state it will return (false, nil) as this is a valid
+// scenario. This also applies to the case where the port was already added.
+func (agent *LogicalDeviceAgent) addNNILogicalPort (device *voltha.Device, port *voltha.Port)  (bool, error) {
+	//now := time.Now()
+	//defer fmt.Println("setupNNILogicalPorts:", device.Id, time.Since(now))
 	log.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
-	if device.AdminState != voltha.AdminState_ENABLED {
-		log.Infow("device-not-enabled", log.Fields{"deviceId": device.Id})
-		return nil
+	if device.AdminState != voltha.AdminState_ENABLED || device.OperStatus != voltha.OperStatus_ACTIVE {
+		log.Infow("device-not-ready", log.Fields{"deviceId": device.Id, "admin": device.AdminState, "oper": device.OperStatus})
+		return false, nil
 	}
 	agent.lockLogicalDevice.RLock()
 	if agent.portExist(device, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
 		agent.lockLogicalDevice.RUnlock()
-		return nil
+		return false, nil
 	}
 	agent.lockLogicalDevice.RUnlock()
 
@@ -1227,7 +1252,7 @@
 	// First get the port capability
 	if portCap, err = agent.deviceMgr.getPortCapability(nil, device.Id, port.PortNo); err != nil {
 		log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
-		return err
+		return false, err
 	}
 
 	agent.lockLogicalDevice.Lock()
@@ -1235,7 +1260,7 @@
 	// Double check again if this port has been already added since the getPortCapability could have taken a long time
 	if agent.portExist(device, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
-		return nil
+		return false, nil
 	}
 
 	portCap.Port.RootPort = true
@@ -1249,7 +1274,7 @@
 	var ld *voltha.LogicalDevice
 	if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {
 		log.Errorw("error-retrieving-logical-device", log.Fields{"error": err})
-		return err
+		return false, err
 	}
 	cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
 	if cloned.Ports == nil {
@@ -1259,9 +1284,9 @@
 
 	if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
 		log.Errorw("error-updating-logical-device", log.Fields{"error": err})
-		return err
+		return false, err
 	}
-	return nil
+	return true, nil
 }
 
 func (agent *LogicalDeviceAgent) portExist (device *voltha.Device, port *voltha.Port) bool {
@@ -1275,17 +1300,24 @@
 	return false
 }
 
-func (agent *LogicalDeviceAgent) addUNILogicalPort (childDevice *voltha.Device, port *voltha.Port)  error {
+
+// addUNILogicalPort adds an UNI port to the logical device.  It returns a bool representing whether a port has been
+// added and an eror in case a valid error is encountered. If the port was successfully added it will return
+// (true, nil).   If the device is not in the correct state it will return (false, nil) as this is a valid
+// scenario. This also applies to the case where the port was already added.
+func (agent *LogicalDeviceAgent) addUNILogicalPort (childDevice *voltha.Device, port *voltha.Port)  (bool, error) {
+	//now := time.Now()
+	//defer fmt.Println("addUNILogicalPort:", childDevice.Id, time.Since(now))
 	log.Debugw("addUNILogicalPort", log.Fields{"port": port})
-	if childDevice.AdminState != voltha.AdminState_ENABLED {
-		log.Infow("device-not-enabled", log.Fields{"deviceId": childDevice.Id})
-		return nil
+	if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
+		log.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
+		return false, nil
 	}
 	agent.lockLogicalDevice.RLock()
 	if agent.portExist(childDevice, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
 		agent.lockLogicalDevice.RUnlock()
-		return nil
+		return false, nil
 	}
 	agent.lockLogicalDevice.RUnlock()
 	var portCap *ic.PortCapability
@@ -1293,18 +1325,18 @@
 	// First get the port capability
 	if portCap, err = agent.deviceMgr.getPortCapability(nil, childDevice.Id, port.PortNo); err != nil {
 		log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
-		return err
+		return false, err
 	}
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 	// Double check again if this port has been already added since the getPortCapability could have taken a long time
 	if agent.portExist(childDevice, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
-		return nil
+		return false, nil
 	}
 	// Get stored logical device
 	if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
-		return status.Error(codes.NotFound, agent.logicalDeviceId)
+		return false, status.Error(codes.NotFound, agent.logicalDeviceId)
 	} else {
 		log.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
 		portCap.Port.RootPort = false
@@ -1318,7 +1350,7 @@
 			cloned.Ports = make([]*voltha.LogicalPort, 0)
 		}
 		cloned.Ports = append(cloned.Ports, portCap.Port)
-		return agent.updateLogicalDeviceWithoutLock(cloned)
+		return true, agent.updateLogicalDeviceWithoutLock(cloned)
 	}
 }