VOL-1173 : Removed hash based storage; replaced with per device protobuf
- Ensured proxies issue callbacks instead of forcing with goroutines
- Fixed mutex issue with proxy component
Change-Id: Idabd3257c6d264c0f607ee228e406810304dab43
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 8a69967..ea94788 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -50,12 +50,13 @@
flowDecomposer *fd.FlowDecomposer
}
-func newLogicalDeviceAgent(id string, device *voltha.Device, ldeviceMgr *LogicalDeviceManager, deviceMgr *DeviceManager,
+func newLogicalDeviceAgent(id string, deviceId string, ldeviceMgr *LogicalDeviceManager,
+ deviceMgr *DeviceManager,
cdProxy *model.Proxy) *LogicalDeviceAgent {
var agent LogicalDeviceAgent
agent.exitChannel = make(chan int, 1)
agent.logicalDeviceId = id
- agent.rootDeviceId = device.Id
+ agent.rootDeviceId = deviceId
agent.deviceMgr = deviceMgr
agent.clusterDataProxy = cdProxy
agent.ldeviceMgr = ldeviceMgr
@@ -148,8 +149,7 @@
defer agent.lockLogicalDevice.Unlock()
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
- cloned := proto.Clone(lDevice).(*voltha.LogicalDevice)
- return cloned, nil
+ return lDevice, nil
}
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
@@ -162,7 +162,7 @@
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
lPorts := make([]*voltha.LogicalPort, 0)
for _, port := range lDevice.Ports {
- lPorts = append(lPorts, proto.Clone(port).(*voltha.LogicalPort))
+ lPorts = append(lPorts, port)
}
return &voltha.LogicalPorts{Items: lPorts}, nil
}
@@ -195,31 +195,19 @@
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
- cloned := proto.Clone(flows).(*ofp.Flows)
- afterUpdate := agent.flowProxy.Update("/", cloned, false, "")
+ afterUpdate := agent.flowProxy.Update("/", flows, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device-flows:%s", agent.logicalDeviceId)
}
- // TODO: Remove this code when the model update is fixed
- ld, _ := agent.getLogicalDeviceWithoutLock()
- clonedDevice := proto.Clone(ld).(*voltha.LogicalDevice)
- clonedDevice.Flows = proto.Clone(flows).(*ofp.Flows)
- agent.updateLogicalDeviceWithoutLock(clonedDevice)
return nil
}
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
- cloned := proto.Clone(flowGroups).(*ofp.FlowGroups)
- afterUpdate := agent.groupProxy.Update("/", cloned, false, "")
+ afterUpdate := agent.groupProxy.Update("/", flowGroups, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device-flow-groups:%s", agent.logicalDeviceId)
}
- // TODO: Remove this code when the model update is fixed
- ld, _ := agent.getLogicalDeviceWithoutLock()
- clonedDevice := proto.Clone(ld).(*voltha.LogicalDevice)
- clonedDevice.FlowGroups = proto.Clone(flowGroups).(*ofp.FlowGroups)
- agent.updateLogicalDeviceWithoutLock(clonedDevice)
return nil
}
@@ -229,8 +217,7 @@
log.Debug("getLogicalDeviceWithoutLock")
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
- cloned := proto.Clone(lDevice).(*voltha.LogicalDevice)
- return cloned, nil
+ return lDevice, nil
}
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
@@ -260,7 +247,6 @@
return status.Error(codes.NotFound, agent.logicalDeviceId)
} else {
log.Infow("!!!!!!!!!!!ADDING-UNI", log.Fields{"deviceId": childDevice.Id})
- cloned := proto.Clone(ldevice).(*voltha.LogicalDevice)
portCap.Port.RootPort = false
//TODO: For now use the channel id assigned by the OLT as logical port number
lPortNo := childDevice.ProxyAddress.ChannelId
@@ -269,17 +255,15 @@
portCap.Port.OfpPort.Name = portCap.Port.Id
portCap.Port.DeviceId = childDevice.Id
portCap.Port.DevicePortNo = uniPort
- lp := proto.Clone(portCap.Port).(*voltha.LogicalPort)
- lp.DeviceId = childDevice.Id
- cloned.Ports = append(cloned.Ports, lp)
- return agent.updateLogicalDeviceWithoutLock(cloned)
+ portCap.Port.DeviceId = childDevice.Id
+ ldevice.Ports = append(ldevice.Ports, portCap.Port)
+ return agent.updateLogicalDeviceWithoutLock(ldevice)
}
}
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(logicalDevice *voltha.LogicalDevice) error {
- cloned := proto.Clone(logicalDevice).(*voltha.LogicalDevice)
- afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, cloned, false, "")
+ afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, logicalDevice, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceId)
}
@@ -397,8 +381,6 @@
return err
}
}
- //// For now, force the callback to occur
- //go agent.flowTableUpdated(oldData, lDevice.Flows)
return nil
}
@@ -952,45 +934,36 @@
func (agent *LogicalDeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
- // Run this callback in it's own go routine since callbacks are not invoked in their own
- // go routine
- go func(args ...interface{}) interface{} {
- //agent.lockLogicalDevice.Lock()
- //defer agent.lockLogicalDevice.Unlock()
+ var previousData *ofp.Flows
+ var latestData *ofp.Flows
- var previousData *ofp.Flows
- var latestData *ofp.Flows
+ var ok bool
+ if previousData, ok = args[0].(*ofp.Flows); !ok {
+ log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+ }
+ if latestData, ok = args[1].(*ofp.Flows); !ok {
+ log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+ }
- var ok bool
- if previousData, ok = args[0].(*ofp.Flows); !ok {
- log.Errorw("invalid-args", log.Fields{"args0": args[0]})
- }
- if latestData, ok = args[1].(*ofp.Flows); !ok {
- log.Errorw("invalid-args", log.Fields{"args1": args[1]})
- }
-
- if reflect.DeepEqual(previousData.Items, latestData.Items) {
- log.Debug("flow-update-not-required")
- return nil
- }
-
- // Ensure the device graph has been setup
- agent.setupDeviceGraph()
-
- var groups *ofp.FlowGroups
- lDevice, _ := agent.getLogicalDeviceWithoutLock()
- groups = lDevice.FlowGroups
- log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
- log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-
- for deviceId, value := range deviceRules.GetRules() {
- agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
- agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
- }
-
+ if reflect.DeepEqual(previousData.Items, latestData.Items) {
+ log.Debug("flow-update-not-required")
return nil
- }(args...)
+ }
+
+ // Ensure the device graph has been setup
+ agent.setupDeviceGraph()
+
+ var groups *ofp.FlowGroups
+ lDevice, _ := agent.getLogicalDeviceWithoutLock()
+ groups = lDevice.FlowGroups
+ log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
+ deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
+ log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+ for deviceId, value := range deviceRules.GetRules() {
+ agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
+ agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+ }
return nil
}
@@ -998,43 +971,35 @@
func (agent *LogicalDeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
- // Run this callback in it's own go routine since callbacks are not invoked in their own
- // go routine
- go func(args ...interface{}) interface{} {
- //agent.lockLogicalDevice.Lock()
- //defer agent.lockLogicalDevice.Unlock()
+ var previousData *ofp.FlowGroups
+ var latestData *ofp.FlowGroups
- var previousData *ofp.FlowGroups
- var latestData *ofp.FlowGroups
+ var ok bool
+ if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
+ log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+ }
+ if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
+ log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+ }
- var ok bool
- if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
- log.Errorw("invalid-args", log.Fields{"args0": args[0]})
- }
- if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
- log.Errorw("invalid-args", log.Fields{"args1": args[1]})
- }
-
- if reflect.DeepEqual(previousData.Items, latestData.Items) {
- log.Debug("flow-update-not-required")
- return nil
- }
-
- // Ensure the device graph has been setup
- agent.setupDeviceGraph()
-
- var flows *ofp.Flows
- lDevice, _ := agent.getLogicalDeviceWithoutLock()
- flows = lDevice.Flows
- log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
- log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- for deviceId, value := range deviceRules.GetRules() {
- agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
- agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
- }
+ if reflect.DeepEqual(previousData.Items, latestData.Items) {
+ log.Debug("flow-update-not-required")
return nil
- }(args...)
+ }
+
+ // Ensure the device graph has been setup
+ agent.setupDeviceGraph()
+
+ var flows *ofp.Flows
+ lDevice, _ := agent.getLogicalDeviceWithoutLock()
+ flows = lDevice.Flows
+ log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
+ deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
+ log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+ for deviceId, value := range deviceRules.GetRules() {
+ agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
+ agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+ }
return nil
}