[VOL-1588] Improve Flow Add performance
This update consists of the following:
1) Improve the performance of adding a flow to a logical device,
decomposing the flow into parent and child device flows, and sending
those flows to the adapters.
2) Format a number of files with gofmt.
3) Ensure the device graph cache is updated when a new port that
belongs to a device already in the cache is added to the graph.
The flow update/deletion performance will be addressed in a separate
commit.
Change-Id: I2eb663cc73eef9fc6172203ed88a35726f5fe008
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index f9da623..f6540e4 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -33,6 +33,8 @@
type DeviceManager struct {
deviceAgents map[string]*DeviceAgent
+ rootDevices map[string]bool
+ lockRootDeviceMap sync.RWMutex
core *Core
adapterProxy *AdapterProxy
adapterMgr *AdapterManager
@@ -42,6 +44,7 @@
clusterDataProxy *model.Proxy
coreInstanceId string
exitChannel chan int
+ defaultTimeout int64
lockDeviceAgentsMap sync.RWMutex
}
@@ -50,12 +53,15 @@
deviceMgr.core = core
deviceMgr.exitChannel = make(chan int, 1)
deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
+ deviceMgr.rootDevices = make(map[string]bool)
deviceMgr.kafkaICProxy = core.kmp
deviceMgr.adapterProxy = NewAdapterProxy(core.kmp)
deviceMgr.coreInstanceId = core.instanceId
deviceMgr.clusterDataProxy = core.clusterDataProxy
deviceMgr.adapterMgr = core.adapterMgr
deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
+ deviceMgr.lockRootDeviceMap = sync.RWMutex{}
+ deviceMgr.defaultTimeout = core.config.DefaultCoreTimeout
return &deviceMgr
}
@@ -86,16 +92,26 @@
func (dMgr *DeviceManager) addDeviceAgentToMap(agent *DeviceAgent) {
dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
+ //defer dMgr.lockDeviceAgentsMap.Unlock()
if _, exist := dMgr.deviceAgents[agent.deviceId]; !exist {
dMgr.deviceAgents[agent.deviceId] = agent
}
+ dMgr.lockDeviceAgentsMap.Unlock()
+ dMgr.lockRootDeviceMap.Lock()
+ defer dMgr.lockRootDeviceMap.Unlock()
+ dMgr.rootDevices[agent.deviceId] = agent.isRootdevice
+
}
func (dMgr *DeviceManager) deleteDeviceAgentToMap(agent *DeviceAgent) {
dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
+ //defer dMgr.lockDeviceAgentsMap.Unlock()
delete(dMgr.deviceAgents, agent.deviceId)
+ dMgr.lockDeviceAgentsMap.Unlock()
+ dMgr.lockRootDeviceMap.Lock()
+ defer dMgr.lockRootDeviceMap.Unlock()
+ delete(dMgr.rootDevices, agent.deviceId)
+
}
// getDeviceAgent returns the agent managing the device. If the device is not in memory, it will loads it, if it exists
@@ -123,7 +139,7 @@
dMgr.lockDeviceAgentsMap.RLock()
defer dMgr.lockDeviceAgentsMap.RUnlock()
result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
- for key, _ := range dMgr.deviceAgents {
+ for key := range dMgr.deviceAgents {
result.Items = append(result.Items, &voltha.ID{Id: key})
}
return result
@@ -135,7 +151,7 @@
// Ensure this device is set as root
device.Root = true
// Create and start a device agent for that device
- agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy)
+ agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
dMgr.addDeviceAgentToMap(agent)
agent.start(ctx, false)
@@ -310,11 +326,12 @@
}
func (dMgr *DeviceManager) IsRootDevice(id string) (bool, error) {
- device, err := dMgr.GetDevice(id)
- if err != nil {
- return false, err
+ dMgr.lockRootDeviceMap.RLock()
+ defer dMgr.lockRootDeviceMap.RUnlock()
+ if exist := dMgr.rootDevices[id]; exist {
+ return dMgr.rootDevices[id], nil
}
- return device.Root, nil
+ return false, nil
}
// ListDevices retrieves the latest devices from the data model
@@ -325,7 +342,7 @@
for _, device := range devices.([]interface{}) {
// If device is not in memory then set it up
if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
- agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+ agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
if err := agent.start(nil, true); err != nil {
log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.(*voltha.Device).Id})
agent.stop(nil)
@@ -347,7 +364,7 @@
return nil, status.Error(codes.InvalidArgument, "deviceId empty")
}
if !dMgr.IsDeviceInCache(deviceId) {
- agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: deviceId}, dMgr, dMgr.clusterDataProxy)
+ agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: deviceId}, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
if err := agent.start(nil, true); err != nil {
agent.stop(nil)
return nil, err
@@ -450,7 +467,7 @@
// Device Id not in memory
log.Debugw("reconciling-device", log.Fields{"id": id.Id})
// Load device from dB
- agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: id.Id}, dMgr, dMgr.clusterDataProxy)
+ agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: id.Id}, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
if err := agent.start(nil, true); err != nil {
log.Warnw("failure-loading-device", log.Fields{"deviceId": id.Id})
agent.stop(nil)
@@ -494,14 +511,14 @@
}
}
}
- // Notify the logical device manager to setup a logical port if needed
- if port.Type == voltha.Port_ETHERNET_NNI || port.Type == voltha.Port_ETHERNET_UNI {
- if device, err := dMgr.GetDevice(deviceId); err == nil {
- go dMgr.logicalDeviceMgr.addLogicalPort(device, port)
- } else {
- log.Errorw("failed-to-retrieve-device", log.Fields{"deviceId": deviceId})
- return err
- }
+ // Notify the logical device manager to setup a logical port, if needed. If the added port is an NNI or UNI
+ // then a logical port will be added to the logical device and the device graph generated. If the port is a
+ // PON port then only the device graph will be generated.
+ if device, err := dMgr.GetDevice(deviceId); err == nil {
+ go dMgr.logicalDeviceMgr.updateLogicalPort(device, port)
+ } else {
+ log.Errorw("failed-to-retrieve-device", log.Fields{"deviceId": deviceId})
+ return err
}
return nil
} else {
@@ -509,17 +526,12 @@
}
}
-func (dMgr *DeviceManager) updateFlows(deviceId string, flows []*ofp.OfpFlowStats) error {
- log.Debugw("updateFlows", log.Fields{"deviceid": deviceId})
+func (dMgr *DeviceManager) addFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) error {
+ log.Debugw("addFlowsAndGroups", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.updateFlows(flows)
- }
- return status.Errorf(codes.NotFound, "%s", deviceId)
-}
-
-func (dMgr *DeviceManager) updateGroups(deviceId string, groups []*ofp.OfpGroupEntry) error {
- if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.updateGroups(groups)
+ return agent.addFlowsAndGroups(flows, groups)
+ //go agent.addFlowsAndGroups(flows, groups)
+ //return nil
}
return status.Errorf(codes.NotFound, "%s", deviceId)
}
@@ -624,13 +636,10 @@
childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceId, DeviceType: parent.Type, ChannelId: uint32(channelId), OnuId: uint32(onuId)}
// Create and start a device agent for that device
- agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy)
+ agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
dMgr.addDeviceAgentToMap(agent)
agent.start(nil, false)
- //// Set device ownership
- //dMgr.core.deviceOwnership.OwnedByMe(agent.deviceId)
-
// Activate the child device
if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
go agent.enableDevice(nil)
@@ -808,8 +817,8 @@
childDeviceIds = append(childDeviceIds, peer.DeviceId)
}
}
+ log.Debugw("returning-getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id, "childDeviceIds": childDeviceIds})
}
- log.Debugw("returning-getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id, "childDeviceIds": childDeviceIds})
return childDeviceIds, nil
}
@@ -970,7 +979,7 @@
func (dMgr *DeviceManager) notAllowed(pcDevice *voltha.Device) error {
log.Info("notAllowed")
- return errors.New("Transition-not-allowed")
+ return errors.New("transition-not-allowed")
}
func funcName(f interface{}) string {