[VOL-1547] Add port to logical device when device is active
This commit consists of the following changes:
1) Fix the issue where flows were received when the logical
device flow graph was not ready.
2) Update the default Kafka config for improved performance.
3) Add a lock to the device ownership logic to ensure the
lock map does not get corrupted.
Change-Id: I840d572e06ed5acf0f3bc1ce423a0ada8f335543
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
index 4a692d3..0921561 100644
--- a/rw_core/core/device_ownership.go
+++ b/rw_core/core/device_ownership.go
@@ -238,39 +238,34 @@
if id == nil {
return "", status.Error(codes.InvalidArgument, "nil-id")
}
+ da.deviceToKeyMapLock.Lock()
+ defer da.deviceToKeyMapLock.Unlock()
var device *voltha.Device
var lDevice *voltha.LogicalDevice
// The id can either be a device Id or a logical device id.
if dId, ok := id.(*utils.DeviceID); ok {
// Use cache if present
- if val, err := da.getDeviceKey(dId.Id); err == nil {
+ if val, exist := da.deviceToKeyMap[dId.Id]; exist {
return val, nil
}
if device, _ = da.deviceMgr.GetDevice(dId.Id); device == nil {
return "", status.Error(codes.NotFound, fmt.Sprintf("id-absent-%s", dId))
}
if device.Root {
- if err := da.updateDeviceKey(dId.Id, device.Id); err != nil {
- log.Warnw("Error-updating-cache", log.Fields{"id": dId.Id, "key": device.Id, "error": err})
- }
- return device.Id, nil
+ da.deviceToKeyMap[dId.Id] = device.Id
} else {
- if err := da.updateDeviceKey(dId.Id, device.ParentId); err != nil {
- log.Warnw("Error-updating-cache", log.Fields{"id": dId.Id, "key": device.ParentId, "error": err})
- }
- return device.ParentId, nil
+ da.deviceToKeyMap[dId.Id] = device.ParentId
}
+ return da.deviceToKeyMap[dId.Id], nil
} else if ldId, ok := id.(*utils.LogicalDeviceID); ok {
// Use cache if present
- if val, err := da.getDeviceKey(ldId.Id); err == nil {
+ if val, exist := da.deviceToKeyMap[ldId.Id]; exist {
return val, nil
}
if lDevice, _ = da.logicalDeviceMgr.getLogicalDevice(ldId.Id); lDevice == nil {
return "", status.Error(codes.NotFound, fmt.Sprintf("id-absent-%s", ldId))
}
- if err := da.updateDeviceKey(ldId.Id, lDevice.RootDeviceId); err != nil {
- log.Warnw("Error-updating-cache", log.Fields{"id": ldId.Id, "key": lDevice.RootDeviceId, "error": err})
- }
+ da.deviceToKeyMap[ldId.Id] = lDevice.RootDeviceId
return lDevice.RootDeviceId, nil
}
return "", status.Error(codes.NotFound, fmt.Sprintf("id-%s", id))
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index b1a68cc..732e9cf 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -22,12 +22,12 @@
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
- ic "github.com/opencord/voltha-protos/go/inter_container"
- ofp "github.com/opencord/voltha-protos/go/openflow_13"
- "github.com/opencord/voltha-protos/go/voltha"
fd "github.com/opencord/voltha-go/rw_core/flow_decomposition"
"github.com/opencord/voltha-go/rw_core/graph"
fu "github.com/opencord/voltha-go/rw_core/utils"
+ ic "github.com/opencord/voltha-protos/go/inter_container"
+ ofp "github.com/opencord/voltha-protos/go/openflow_13"
+ "github.com/opencord/voltha-protos/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"reflect"
@@ -248,24 +248,31 @@
func (agent *LogicalDeviceAgent) addLogicalPort (device *voltha.Device, port *voltha.Port) error {
+ log.Debugw("addLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
+ var changed bool
+ var err error
if port.Type == voltha.Port_ETHERNET_NNI {
- if err := agent.addNNILogicalPort(device, port); err != nil {
+ if changed, err = agent.addNNILogicalPort(device, port); err != nil {
return err
}
} else if port.Type == voltha.Port_ETHERNET_UNI {
- if err := agent.addUNILogicalPort(device, port); err != nil {
+ if changed, err = agent.addUNILogicalPort(device, port); err != nil {
return err
}
} else {
log.Debugw("invalid-port-type", log.Fields{"deviceId": device.Id, "port": port})
return nil
}
- go agent.setupDeviceGraph()
+ if changed {
+ go agent.setupDeviceGraph()
+ }
return nil
}
// setupNNILogicalPorts creates an NNI port on the logical device that represents an NNI interface on a root device
func (agent *LogicalDeviceAgent) setupNNILogicalPorts(ctx context.Context, deviceId string) error {
+ //now := time.Now()
+ //defer fmt.Println("setupNNILogicalPorts:", deviceId, time.Since(now))
log.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
var err error
@@ -279,11 +286,12 @@
//Get UNI port number
changesMade := false
for _, port := range device.Ports {
+ changed := false
if port.Type == voltha.Port_ETHERNET_NNI {
- if err = agent.addNNILogicalPort(device, port); err != nil {
+ if changed, err = agent.addNNILogicalPort(device, port); err != nil {
log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
} else {
- changesMade = true
+ changesMade = changed || changesMade
}
}
}
@@ -296,6 +304,8 @@
// setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
+ //now := time.Now()
+ //defer fmt.Println("setupUNILogicalPorts:", childDevice.Id, time.Since(now))
log.Infow("setupUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
var err error
@@ -303,11 +313,12 @@
//Get UNI port number
changesMade := false
for _, port := range childDevice.Ports {
+ changed := false
if port.Type == voltha.Port_ETHERNET_UNI {
- if err = agent.addUNILogicalPort(childDevice, port); err != nil {
+ if changed, err = agent.addUNILogicalPort(childDevice, port); err != nil {
log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
} else {
- changesMade = true
+ changesMade = changed || changesMade
}
}
}
@@ -797,6 +808,14 @@
if isNNIPort(ingressPortNo, nniLogicalPortsNo) {
log.Debug("returning-half-route")
//This is a trap on the NNI Port
+ if len(agent.deviceGraph.Routes) == 0 {
+ // If there are no routes set (usually when the logical device has only NNI port(s)), then just return an
+ // internal route
+ hop := graph.RouteHop{DeviceID:ld.RootDeviceId, Ingress:ingressPortNo, Egress:egressPortNo}
+ routes = append(routes, hop)
+ routes = append(routes, hop)
+ return routes
+ }
//Return a 'half' route to make the flow decomposer logic happy
for routeLink, route := range agent.deviceGraph.Routes {
if isNNIPort(routeLink.Egress, nniLogicalPortsNo) {
@@ -1208,17 +1227,23 @@
}
-func (agent *LogicalDeviceAgent) addNNILogicalPort (device *voltha.Device, port *voltha.Port) error {
+// addNNILogicalPort adds an NNI port to the logical device. It returns a bool representing whether a port has been
+// added and an error in case a valid error is encountered. If the port was successfully added it will return
+// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
+// scenario. This also applies to the case where the port was already added.
+func (agent *LogicalDeviceAgent) addNNILogicalPort (device *voltha.Device, port *voltha.Port) (bool, error) {
+ //now := time.Now()
+ //defer fmt.Println("setupNNILogicalPorts:", device.Id, time.Since(now))
log.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
- if device.AdminState != voltha.AdminState_ENABLED {
- log.Infow("device-not-enabled", log.Fields{"deviceId": device.Id})
- return nil
+ if device.AdminState != voltha.AdminState_ENABLED || device.OperStatus != voltha.OperStatus_ACTIVE {
+ log.Infow("device-not-ready", log.Fields{"deviceId": device.Id, "admin": device.AdminState, "oper": device.OperStatus})
+ return false, nil
}
agent.lockLogicalDevice.RLock()
if agent.portExist(device, port) {
log.Debugw("port-already-exist", log.Fields{"port": port})
agent.lockLogicalDevice.RUnlock()
- return nil
+ return false, nil
}
agent.lockLogicalDevice.RUnlock()
@@ -1227,7 +1252,7 @@
// First get the port capability
if portCap, err = agent.deviceMgr.getPortCapability(nil, device.Id, port.PortNo); err != nil {
log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
- return err
+ return false, err
}
agent.lockLogicalDevice.Lock()
@@ -1235,7 +1260,7 @@
// Double check again if this port has been already added since the getPortCapability could have taken a long time
if agent.portExist(device, port) {
log.Debugw("port-already-exist", log.Fields{"port": port})
- return nil
+ return false, nil
}
portCap.Port.RootPort = true
@@ -1249,7 +1274,7 @@
var ld *voltha.LogicalDevice
if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {
log.Errorw("error-retrieving-logical-device", log.Fields{"error": err})
- return err
+ return false, err
}
cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
if cloned.Ports == nil {
@@ -1259,9 +1284,9 @@
if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
log.Errorw("error-updating-logical-device", log.Fields{"error": err})
- return err
+ return false, err
}
- return nil
+ return true, nil
}
func (agent *LogicalDeviceAgent) portExist (device *voltha.Device, port *voltha.Port) bool {
@@ -1275,17 +1300,24 @@
return false
}
-func (agent *LogicalDeviceAgent) addUNILogicalPort (childDevice *voltha.Device, port *voltha.Port) error {
+
+// addUNILogicalPort adds a UNI port to the logical device. It returns a bool representing whether a port has been
+// added and an error in case a valid error is encountered. If the port was successfully added it will return
+// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
+// scenario. This also applies to the case where the port was already added.
+func (agent *LogicalDeviceAgent) addUNILogicalPort (childDevice *voltha.Device, port *voltha.Port) (bool, error) {
+ //now := time.Now()
+ //defer fmt.Println("addUNILogicalPort:", childDevice.Id, time.Since(now))
log.Debugw("addUNILogicalPort", log.Fields{"port": port})
- if childDevice.AdminState != voltha.AdminState_ENABLED {
- log.Infow("device-not-enabled", log.Fields{"deviceId": childDevice.Id})
- return nil
+ if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
+ log.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
+ return false, nil
}
agent.lockLogicalDevice.RLock()
if agent.portExist(childDevice, port) {
log.Debugw("port-already-exist", log.Fields{"port": port})
agent.lockLogicalDevice.RUnlock()
- return nil
+ return false, nil
}
agent.lockLogicalDevice.RUnlock()
var portCap *ic.PortCapability
@@ -1293,18 +1325,18 @@
// First get the port capability
if portCap, err = agent.deviceMgr.getPortCapability(nil, childDevice.Id, port.PortNo); err != nil {
log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
- return err
+ return false, err
}
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
// Double check again if this port has been already added since the getPortCapability could have taken a long time
if agent.portExist(childDevice, port) {
log.Debugw("port-already-exist", log.Fields{"port": port})
- return nil
+ return false, nil
}
// Get stored logical device
if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
- return status.Error(codes.NotFound, agent.logicalDeviceId)
+ return false, status.Error(codes.NotFound, agent.logicalDeviceId)
} else {
log.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
portCap.Port.RootPort = false
@@ -1318,7 +1350,7 @@
cloned.Ports = make([]*voltha.LogicalPort, 0)
}
cloned.Ports = append(cloned.Ports, portCap.Port)
- return agent.updateLogicalDeviceWithoutLock(cloned)
+ return true, agent.updateLogicalDeviceWithoutLock(cloned)
}
}