[VOL-1512] Set device ownership

This commit consists of the following:
1) Set device ownership per Core in a Core-pair such that only 1
Core actively process a device (i.e. handles all the requests for
that device) while the other Core in the pair passively watch for
updates on that device and will take over in case the owner Core
failed to process the transaction.
2) Cleanup the lock mechanisms to ensure we use a read lock when
needed instead of just a lock.
3) Update logical port additions such that ports are added only when
the device is enabled.
4) Update the port Ids for the logical ports.
5) Update some sarama client configs for performance - this is an
ongoing tune up.
6) Update the adapter request handler in the Core to send back an
ACK immediately to the adapter request instead of processing the
request fully and then sending an ACK.  This reduces the latency
over kafka and therefore reduces the likelihood of timeouts.

Change-Id: I9149bf3ba6fbad38e3a29c76ea8dba2f9f731d29
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 03edb57..a8e6a70 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -168,8 +168,8 @@
 // GetLogicalDevice locks the logical device model and then retrieves the latest logical device information
 func (agent *LogicalDeviceAgent) GetLogicalDevice() (*voltha.LogicalDevice, error) {
 	log.Debug("GetLogicalDevice")
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	agent.lockLogicalDevice.RLock()
+	defer agent.lockLogicalDevice.RUnlock()
 	logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
 	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
 		return lDevice, nil
@@ -178,9 +178,9 @@
 }
 
 func (agent *LogicalDeviceAgent) ListLogicalDevicePorts() (*voltha.LogicalPorts, error) {
-	log.Debug("!!!!!ListLogicalDevicePorts")
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	log.Debug("ListLogicalDevicePorts")
+	agent.lockLogicalDevice.RLock()
+	defer agent.lockLogicalDevice.RUnlock()
 	logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
 	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
 		lPorts := make([]*voltha.LogicalPort, 0)
@@ -195,8 +195,8 @@
 // listFlows locks the logical device model and then retrieves the latest flow information
 func (agent *LogicalDeviceAgent) listFlows() []*ofp.OfpFlowStats {
 	log.Debug("listFlows")
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	agent.lockLogicalDevice.RLock()
+	defer agent.lockLogicalDevice.RUnlock()
 	logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
 	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
 		return lDevice.Flows.Items
@@ -207,8 +207,8 @@
 // listFlowGroups locks the logical device model and then retrieves the latest flow groups information
 func (agent *LogicalDeviceAgent) listFlowGroups() []*ofp.OfpGroupEntry {
 	log.Debug("listFlowGroups")
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+	agent.lockLogicalDevice.RLock()
+	defer agent.lockLogicalDevice.RUnlock()
 	logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
 	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
 		return lDevice.FlowGroups.Items
@@ -240,7 +240,7 @@
 	log.Debug("getLogicalDeviceWithoutLock")
 	logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
 	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
-		log.Debug("getLogicalDeviceWithoutLock", log.Fields{"ldevice": lDevice})
+		//log.Debug("getLogicalDeviceWithoutLock", log.Fields{"ldevice": lDevice})
 		return lDevice, nil
 	}
 	return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
@@ -268,7 +268,6 @@
 func (agent *LogicalDeviceAgent) setupNNILogicalPorts(ctx context.Context, deviceId string) error {
 	log.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	// Build the logical device based on information retrieved from the device adapter
-	//var portCap *ic.PortCapability
 	var err error
 
 	var device *voltha.Device
@@ -278,7 +277,6 @@
 	}
 
 	//Get UNI port number
-	//var uniPort uint32
 	changesMade := false
 	for _, port := range device.Ports {
 		if port.Type == voltha.Port_ETHERNET_NNI {
@@ -287,7 +285,6 @@
 			} else {
 				changesMade = true
 			}
-			//uniPort = port.PortNo
 		}
 	}
 	if changesMade {
@@ -301,11 +298,9 @@
 func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
 	log.Infow("setupUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	// Build the logical device based on information retrieved from the device adapter
-	//var portCap *ic.PortCapability
 	var err error
 
 	//Get UNI port number
-	//var uniPort uint32
 	changesMade := false
 	for _, port := range childDevice.Ports {
 		if port.Type == voltha.Port_ETHERNET_UNI {
@@ -314,36 +309,11 @@
 			} else {
 				changesMade = true
 			}
-			//uniPort = port.PortNo
 		}
 	}
 	if changesMade {
 		go agent.setupDeviceGraph()
 	}
-	//if portCap, err = agent.deviceMgr.getPortCapability(ctx, childDevice.Id, uniPort); err != nil {
-	//	log.Errorw("error-creating-logical-port", log.Fields{"error": err})
-	//	return err
-	//}
-	//agent.lockLogicalDevice.Lock()
-	//defer agent.lockLogicalDevice.Unlock()
-	//// Get stored logical device
-	//if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
-	//	return status.Error(codes.NotFound, agent.logicalDeviceId)
-	//} else {
-	//	log.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
-	//	portCap.Port.RootPort = false
-	//	//TODO: For now use the channel id assigned by the OLT as logical port number
-	//	lPortNo := childDevice.ProxyAddress.ChannelId
-	//	portCap.Port.Id = fmt.Sprintf("uni-%d", lPortNo)
-	//	portCap.Port.OfpPort.PortNo = lPortNo
-	//	portCap.Port.OfpPort.Name = portCap.Port.Id
-	//	portCap.Port.DeviceId = childDevice.Id
-	//	portCap.Port.DevicePortNo = uniPort
-	//	portCap.Port.DeviceId = childDevice.Id
-	//
-	//	ldevice.Ports = append(ldevice.Ports, portCap.Port)
-	//	return agent.updateLogicalDeviceWithoutLock(ldevice)
-	//}
 	return err
 }
 
@@ -353,9 +323,6 @@
 	if afterUpdate == nil {
 		return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceId)
 	}
-	//if a, ok := afterUpdate.(*voltha.LogicalDevice); ok {
-	//	log.Debugw("AFTER UPDATE", log.Fields{"logical": a})
-	//}
 	return nil
 }
 
@@ -1242,13 +1209,19 @@
 
 
 func (agent *LogicalDeviceAgent) addNNILogicalPort (device *voltha.Device, port *voltha.Port)  error {
-	log.Infow("addNNILogicalPort", log.Fields{"NNI": port})
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
-	if agent.portExist(device, port) {
-		log.Debugw("port-already-exist", log.Fields{"port": port})
+	log.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
+	if device.AdminState != voltha.AdminState_ENABLED {
+		log.Infow("device-not-enabled", log.Fields{"deviceId": device.Id})
 		return nil
 	}
+	agent.lockLogicalDevice.RLock()
+	if agent.portExist(device, port) {
+		log.Debugw("port-already-exist", log.Fields{"port": port})
+		agent.lockLogicalDevice.RUnlock()
+		return nil
+	}
+	agent.lockLogicalDevice.RUnlock()
+
 	var portCap *ic.PortCapability
 	var err error
 	// First get the port capability
@@ -1256,6 +1229,15 @@
 		log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
 		return err
 	}
+
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	// Double check again if this port has been already added since the getPortCapability could have taken a long time
+	if agent.portExist(device, port) {
+		log.Debugw("port-already-exist", log.Fields{"port": port})
+		return nil
+	}
+
 	portCap.Port.RootPort = true
 	lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
 	lp.DeviceId = device.Id
@@ -1295,12 +1277,17 @@
 
 func (agent *LogicalDeviceAgent) addUNILogicalPort (childDevice *voltha.Device, port *voltha.Port)  error {
 	log.Debugw("addUNILogicalPort", log.Fields{"port": port})
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
-	if agent.portExist(childDevice, port) {
-		log.Debugw("port-already-exist", log.Fields{"port": port})
+	if childDevice.AdminState != voltha.AdminState_ENABLED {
+		log.Infow("device-not-enabled", log.Fields{"deviceId": childDevice.Id})
 		return nil
 	}
+	agent.lockLogicalDevice.RLock()
+	if agent.portExist(childDevice, port) {
+		log.Debugw("port-already-exist", log.Fields{"port": port})
+		agent.lockLogicalDevice.RUnlock()
+		return nil
+	}
+	agent.lockLogicalDevice.RUnlock()
 	var portCap *ic.PortCapability
 	var err error
 	// First get the port capability
@@ -1308,8 +1295,13 @@
 		log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
 		return err
 	}
-	//agent.lockLogicalDevice.Lock()
-	//defer agent.lockLogicalDevice.Unlock()
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	// Double check again if this port has been already added since the getPortCapability could have taken a long time
+	if agent.portExist(childDevice, port) {
+		log.Debugw("port-already-exist", log.Fields{"port": port})
+		return nil
+	}
 	// Get stored logical device
 	if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
 		return status.Error(codes.NotFound, agent.logicalDeviceId)
@@ -1318,7 +1310,7 @@
 		portCap.Port.RootPort = false
 		portCap.Port.Id = port.Label
 		portCap.Port.OfpPort.PortNo = port.PortNo
-		portCap.Port.OfpPort.Name = portCap.Port.Id
+		portCap.Port.OfpPort.Name = childDevice.SerialNumber
 		portCap.Port.DeviceId = childDevice.Id
 		portCap.Port.DevicePortNo = port.PortNo
 		cloned := (proto.Clone(ldevice)).(*voltha.LogicalDevice)