[VOL-1512] Set device ownership
This commit consists of the following:
1) Set device ownership per Core in a Core-pair such that only one
Core actively processes a device (i.e. handles all the requests for
that device) while the other Core in the pair passively watches for
updates on that device and takes over in case the owner Core
fails to process the transaction.
2) Clean up the lock mechanisms to ensure we use a read lock where
only read access is needed, instead of always taking the exclusive lock.
3) Update logical port additions such that ports are added only when
the device is enabled.
4) Update the port Ids for the logical ports.
5) Update some sarama client configs for performance - this is part
of an ongoing tuning effort.
6) Update the adapter request handler in the Core to send back an
ACK immediately to the adapter request instead of processing the
request fully and then sending an ACK. This reduces the latency
over kafka and therefore reduces the likelihood of timeouts.
Change-Id: I9149bf3ba6fbad38e3a29c76ea8dba2f9f731d29
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 03edb57..a8e6a70 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -168,8 +168,8 @@
// GetLogicalDevice locks the logical device model and then retrieves the latest logical device information
func (agent *LogicalDeviceAgent) GetLogicalDevice() (*voltha.LogicalDevice, error) {
log.Debug("GetLogicalDevice")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ agent.lockLogicalDevice.RLock()
+ defer agent.lockLogicalDevice.RUnlock()
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice, nil
@@ -178,9 +178,9 @@
}
func (agent *LogicalDeviceAgent) ListLogicalDevicePorts() (*voltha.LogicalPorts, error) {
- log.Debug("!!!!!ListLogicalDevicePorts")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ log.Debug("ListLogicalDevicePorts")
+ agent.lockLogicalDevice.RLock()
+ defer agent.lockLogicalDevice.RUnlock()
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
lPorts := make([]*voltha.LogicalPort, 0)
@@ -195,8 +195,8 @@
// listFlows locks the logical device model and then retrieves the latest flow information
func (agent *LogicalDeviceAgent) listFlows() []*ofp.OfpFlowStats {
log.Debug("listFlows")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ agent.lockLogicalDevice.RLock()
+ defer agent.lockLogicalDevice.RUnlock()
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice.Flows.Items
@@ -207,8 +207,8 @@
// listFlowGroups locks the logical device model and then retrieves the latest flow groups information
func (agent *LogicalDeviceAgent) listFlowGroups() []*ofp.OfpGroupEntry {
log.Debug("listFlowGroups")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ agent.lockLogicalDevice.RLock()
+ defer agent.lockLogicalDevice.RUnlock()
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice.FlowGroups.Items
@@ -240,7 +240,7 @@
log.Debug("getLogicalDeviceWithoutLock")
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
- log.Debug("getLogicalDeviceWithoutLock", log.Fields{"ldevice": lDevice})
+ //log.Debug("getLogicalDeviceWithoutLock", log.Fields{"ldevice": lDevice})
return lDevice, nil
}
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
@@ -268,7 +268,6 @@
func (agent *LogicalDeviceAgent) setupNNILogicalPorts(ctx context.Context, deviceId string) error {
log.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
- //var portCap *ic.PortCapability
var err error
var device *voltha.Device
@@ -278,7 +277,6 @@
}
//Get UNI port number
- //var uniPort uint32
changesMade := false
for _, port := range device.Ports {
if port.Type == voltha.Port_ETHERNET_NNI {
@@ -287,7 +285,6 @@
} else {
changesMade = true
}
- //uniPort = port.PortNo
}
}
if changesMade {
@@ -301,11 +298,9 @@
func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
log.Infow("setupUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
- //var portCap *ic.PortCapability
var err error
//Get UNI port number
- //var uniPort uint32
changesMade := false
for _, port := range childDevice.Ports {
if port.Type == voltha.Port_ETHERNET_UNI {
@@ -314,36 +309,11 @@
} else {
changesMade = true
}
- //uniPort = port.PortNo
}
}
if changesMade {
go agent.setupDeviceGraph()
}
- //if portCap, err = agent.deviceMgr.getPortCapability(ctx, childDevice.Id, uniPort); err != nil {
- // log.Errorw("error-creating-logical-port", log.Fields{"error": err})
- // return err
- //}
- //agent.lockLogicalDevice.Lock()
- //defer agent.lockLogicalDevice.Unlock()
- //// Get stored logical device
- //if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
- // return status.Error(codes.NotFound, agent.logicalDeviceId)
- //} else {
- // log.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
- // portCap.Port.RootPort = false
- // //TODO: For now use the channel id assigned by the OLT as logical port number
- // lPortNo := childDevice.ProxyAddress.ChannelId
- // portCap.Port.Id = fmt.Sprintf("uni-%d", lPortNo)
- // portCap.Port.OfpPort.PortNo = lPortNo
- // portCap.Port.OfpPort.Name = portCap.Port.Id
- // portCap.Port.DeviceId = childDevice.Id
- // portCap.Port.DevicePortNo = uniPort
- // portCap.Port.DeviceId = childDevice.Id
- //
- // ldevice.Ports = append(ldevice.Ports, portCap.Port)
- // return agent.updateLogicalDeviceWithoutLock(ldevice)
- //}
return err
}
@@ -353,9 +323,6 @@
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceId)
}
- //if a, ok := afterUpdate.(*voltha.LogicalDevice); ok {
- // log.Debugw("AFTER UPDATE", log.Fields{"logical": a})
- //}
return nil
}
@@ -1242,13 +1209,19 @@
func (agent *LogicalDeviceAgent) addNNILogicalPort (device *voltha.Device, port *voltha.Port) error {
- log.Infow("addNNILogicalPort", log.Fields{"NNI": port})
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
- if agent.portExist(device, port) {
- log.Debugw("port-already-exist", log.Fields{"port": port})
+ log.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
+ if device.AdminState != voltha.AdminState_ENABLED {
+ log.Infow("device-not-enabled", log.Fields{"deviceId": device.Id})
return nil
}
+ agent.lockLogicalDevice.RLock()
+ if agent.portExist(device, port) {
+ log.Debugw("port-already-exist", log.Fields{"port": port})
+ agent.lockLogicalDevice.RUnlock()
+ return nil
+ }
+ agent.lockLogicalDevice.RUnlock()
+
var portCap *ic.PortCapability
var err error
// First get the port capability
@@ -1256,6 +1229,15 @@
log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
return err
}
+
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ // Double check again if this port has been already added since the getPortCapability could have taken a long time
+ if agent.portExist(device, port) {
+ log.Debugw("port-already-exist", log.Fields{"port": port})
+ return nil
+ }
+
portCap.Port.RootPort = true
lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
lp.DeviceId = device.Id
@@ -1295,12 +1277,17 @@
func (agent *LogicalDeviceAgent) addUNILogicalPort (childDevice *voltha.Device, port *voltha.Port) error {
log.Debugw("addUNILogicalPort", log.Fields{"port": port})
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
- if agent.portExist(childDevice, port) {
- log.Debugw("port-already-exist", log.Fields{"port": port})
+ if childDevice.AdminState != voltha.AdminState_ENABLED {
+ log.Infow("device-not-enabled", log.Fields{"deviceId": childDevice.Id})
return nil
}
+ agent.lockLogicalDevice.RLock()
+ if agent.portExist(childDevice, port) {
+ log.Debugw("port-already-exist", log.Fields{"port": port})
+ agent.lockLogicalDevice.RUnlock()
+ return nil
+ }
+ agent.lockLogicalDevice.RUnlock()
var portCap *ic.PortCapability
var err error
// First get the port capability
@@ -1308,8 +1295,13 @@
log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
return err
}
- //agent.lockLogicalDevice.Lock()
- //defer agent.lockLogicalDevice.Unlock()
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ // Double check again if this port has been already added since the getPortCapability could have taken a long time
+ if agent.portExist(childDevice, port) {
+ log.Debugw("port-already-exist", log.Fields{"port": port})
+ return nil
+ }
// Get stored logical device
if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
return status.Error(codes.NotFound, agent.logicalDeviceId)
@@ -1318,7 +1310,7 @@
portCap.Port.RootPort = false
portCap.Port.Id = port.Label
portCap.Port.OfpPort.PortNo = port.PortNo
- portCap.Port.OfpPort.Name = portCap.Port.Id
+ portCap.Port.OfpPort.Name = childDevice.SerialNumber
portCap.Port.DeviceId = childDevice.Id
portCap.Port.DevicePortNo = port.PortNo
cloned := (proto.Clone(ldevice)).(*voltha.LogicalDevice)