VOL-2868 Remove all uses of Proxy.RegisterCallback(...)
Change-Id: I05d47a9915071adb80ebc3c5f9b129ed6c36b54b
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 64237d3..2616540 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -49,11 +49,6 @@
clusterDataProxy *model.Proxy
exitChannel chan int
deviceRoutes *route.DeviceRoutes
- flowProxy *model.Proxy
- groupProxy *model.Proxy
- meterProxy *model.Proxy
- ldProxy *model.Proxy
- portProxies map[string]*model.Proxy
lockDeviceRoutes sync.RWMutex
logicalPortsNo map[uint32]bool //value is true for NNI port
lockLogicalPortsNo sync.RWMutex
@@ -76,7 +71,6 @@
agent.clusterDataProxy = cdProxy
agent.ldeviceMgr = ldeviceMgr
agent.flowDecomposer = fd.NewFlowDecomposer(agent.deviceMgr)
- agent.portProxies = make(map[string]*model.Proxy)
agent.logicalPortsNo = make(map[uint32]bool)
agent.defaultTimeout = timeout
agent.requestQueue = coreutils.NewRequestQueue()
@@ -165,45 +159,6 @@
agent.addLogicalPortsToMap(ld.Ports)
}
- var err error
- agent.flowProxy, err = agent.clusterDataProxy.CreateProxy(
- ctx,
- fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceID),
- false)
- if err != nil {
- return err
- }
- agent.meterProxy, err = agent.clusterDataProxy.CreateProxy(
- ctx,
- fmt.Sprintf("/logical_devices/%s/meters", agent.logicalDeviceID),
- false)
- if err != nil {
- logger.Errorw("failed-to-create-meter-proxy", log.Fields{"error": err})
- return err
- }
- agent.groupProxy, err = agent.clusterDataProxy.CreateProxy(
- ctx,
- fmt.Sprintf("/logical_devices/%s/flow_groups", agent.logicalDeviceID),
- false)
- if err != nil {
- logger.Errorw("failed-to-create-group-proxy", log.Fields{"error": err})
- return err
- }
- agent.ldProxy, err = agent.clusterDataProxy.CreateProxy(
- ctx,
- fmt.Sprintf("/logical_devices/%s", agent.logicalDeviceID),
- false)
- if err != nil {
- logger.Errorw("failed-to-create-logical-device-proxy", log.Fields{"error": err})
- return err
- }
- // TODO: Use a port proxy once the POST_ADD is fixed
- if agent.ldProxy != nil {
- agent.ldProxy.RegisterCallback(model.PostUpdate, agent.portUpdated)
- } else {
- return status.Error(codes.Internal, "logical-device-proxy-null")
- }
-
// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
if loadFromDB {
go func() {
@@ -451,18 +406,19 @@
}
defer agent.requestQueue.RequestComplete()
// Get the latest logical device info
- cloned := agent.getLogicalDeviceWithoutLock()
- for idx, lPort := range cloned.Ports {
- if lPort.DeviceId == deviceID && lPort.DevicePortNo == portNo {
+ original := agent.getLogicalDeviceWithoutLock()
+ updatedPorts := clonePorts(original.Ports)
+ for _, port := range updatedPorts {
+ if port.DeviceId == deviceID && port.DevicePortNo == portNo {
if operStatus == voltha.OperStatus_ACTIVE {
- cloned.Ports[idx].OfpPort.Config = cloned.Ports[idx].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- cloned.Ports[idx].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
+ port.OfpPort.Config = port.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
} else {
- cloned.Ports[idx].OfpPort.Config = cloned.Ports[idx].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- cloned.Ports[idx].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+ port.OfpPort.Config = port.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
}
// Update the logical device
- if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
+ if err := agent.updateLogicalDevicePortsWithoutLock(ctx, original, updatedPorts); err != nil {
logger.Errorw("error-updating-logical-device", log.Fields{"error": err})
return err
}
@@ -480,21 +436,21 @@
}
defer agent.requestQueue.RequestComplete()
// Get the latest logical device info
- cloned := agent.getLogicalDeviceWithoutLock()
- for _, lport := range cloned.Ports {
- if lport.DeviceId == device.Id {
+ original := agent.getLogicalDeviceWithoutLock()
+ updatedPorts := clonePorts(original.Ports)
+ for _, port := range updatedPorts {
+ if port.DeviceId == device.Id {
if state == voltha.OperStatus_ACTIVE {
- lport.OfpPort.Config = lport.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- lport.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
+ port.OfpPort.Config = port.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
} else {
- lport.OfpPort.Config = lport.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- lport.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+ port.OfpPort.Config = port.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
}
-
}
}
// Updating the logical device will trigger the poprt change events to be populated to the controller
- if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
+ if err := agent.updateLogicalDevicePortsWithoutLock(ctx, original, updatedPorts); err != nil {
logger.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
return err
}
@@ -529,14 +485,9 @@
}
defer agent.requestQueue.RequestComplete()
// Get the latest logical device info
- ld := agent.getLogicalDeviceWithoutLock()
+ cloned := agent.getLogicalDeviceWithoutLock()
- cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
- var updateLogicalPorts []*voltha.LogicalPort
- // Update an empty ports slice to remove all the ports
- cloned.Ports = updateLogicalPorts
-
- if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
+ if err := agent.updateLogicalDevicePortsWithoutLock(ctx, cloned, []*voltha.LogicalPort{}); err != nil {
logger.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
return err
}
@@ -553,18 +504,16 @@
// Get the latest logical device info
ld := agent.getLogicalDeviceWithoutLock()
- cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
updateLogicalPorts := []*voltha.LogicalPort{}
- for _, lport := range cloned.Ports {
+ for _, lport := range ld.Ports {
// Save NNI ports only
if agent.isNNIPort(lport.DevicePortNo) {
updateLogicalPorts = append(updateLogicalPorts, lport)
}
}
- if len(updateLogicalPorts) < len(cloned.Ports) {
- cloned.Ports = updateLogicalPorts
+ if len(updateLogicalPorts) < len(ld.Ports) {
// Updating the logical device will trigger the port change events to be populated to the controller
- if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
+ if err := agent.updateLogicalDevicePortsWithoutLock(ctx, ld, updateLogicalPorts); err != nil {
return err
}
} else {
@@ -573,6 +522,21 @@
return nil
}
+// clonePorts returns a copy of the given logical ports made with proto.Clone, so the
+// returned ports can be modified without touching the ones passed in. The ports are
+// wrapped in a voltha.LogicalPorts message only so that they can be cloned in one call.
+// NOTE(review): callers check the result for nil before appending, which suggests an
+// empty or nil input may come back as nil - confirm against proto.Clone.
+func clonePorts(ports []*voltha.LogicalPort) []*voltha.LogicalPort {
+	wrapper := &voltha.LogicalPorts{Items: ports}
+	copied := proto.Clone(wrapper).(*voltha.LogicalPorts)
+	return copied.Items
+}
+
+// updateLogicalDevicePortsWithoutLock replaces the port list of the given logical device
+// with newPorts, saves the device through updateLogicalDeviceWithoutLock and, once the
+// save has succeeded, calls portUpdated with the previous and the new port lists.
+//
+// If the save fails, device.Ports is put back to its previous value and the error is
+// returned; portUpdated is not called in that case.
+// NOTE(review): the name implies the caller already holds the agent's request lock - confirm.
+func (agent *LogicalDeviceAgent) updateLogicalDevicePortsWithoutLock(ctx context.Context, device *voltha.LogicalDevice, newPorts []*voltha.LogicalPort) error {
+	oldPorts := device.Ports
+	device.Ports = newPorts
+	if err := agent.updateLogicalDeviceWithoutLock(ctx, device); err != nil {
+		// Nothing was saved: undo the in-place change so the caller's device does not
+		// keep a port list for which no save succeeded and no port events were sent.
+		device.Ports = oldPorts
+		return err
+	}
+	agent.portUpdated(oldPorts, newPorts)
+	return nil
+}
+
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(ctx context.Context, logicalDevice *voltha.LogicalDevice) error {
updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
@@ -1425,11 +1389,14 @@
}
}
if index >= 0 {
- copy(logicalDevice.Ports[index:], logicalDevice.Ports[index+1:])
- logicalDevice.Ports[len(logicalDevice.Ports)-1] = nil
- logicalDevice.Ports = logicalDevice.Ports[:len(logicalDevice.Ports)-1]
+ clonedPorts := clonePorts(logicalDevice.Ports)
+ if index < len(clonedPorts)-1 {
+ copy(clonedPorts[index:], clonedPorts[index+1:])
+ }
+ clonedPorts[len(clonedPorts)-1] = nil
+ clonedPorts = clonedPorts[:len(clonedPorts)-1]
logger.Debugw("logical-port-deleted", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- if err := agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice); err != nil {
+ if err := agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts); err != nil {
logger.Errorw("logical-device-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
@@ -1465,10 +1432,8 @@
lPortsNoToDelete = append(lPortsNoToDelete, logicalPort.DevicePortNo)
}
}
- logicalDevice.Ports = lPortstoKeep
-
logger.Debugw("deleted-logical-ports", log.Fields{"ports": lPortstoKeep})
- if err := agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice); err != nil {
+ if err := agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, lPortstoKeep); err != nil {
logger.Errorw("logical-device-update-failed", log.Fields{"logical-device-id": agent.logicalDeviceID})
return err
}
@@ -1502,8 +1467,9 @@
}
}
if index >= 0 {
- logicalDevice.Ports[index].OfpPort.Config = logicalDevice.Ports[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- return agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice)
+ clonedPorts := clonePorts(logicalDevice.Ports)
+ clonedPorts[index].OfpPort.Config = clonedPorts[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ return agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts)
}
return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
}
@@ -1525,8 +1491,9 @@
}
}
if index >= 0 {
- logicalDevice.Ports[index].OfpPort.Config = (logicalDevice.Ports[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)) | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- return agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice)
+ clonedPorts := clonePorts(logicalDevice.Ports)
+ clonedPorts[index].OfpPort.Config = (clonedPorts[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)) | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ return agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts)
}
return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
}
@@ -1716,29 +1683,14 @@
// portUpdated is invoked when a port is updated on the logical device. Until
// the POST_ADD notification is fixed, we will use the logical device to
// update that data.
-func (agent *LogicalDeviceAgent) portUpdated(ctx context.Context, args ...interface{}) interface{} {
- logger.Debugw("portUpdated-callback", log.Fields{"argsLen": len(args)})
-
- var oldLD *voltha.LogicalDevice
- var newlD *voltha.LogicalDevice
-
- var ok bool
- if oldLD, ok = args[0].(*voltha.LogicalDevice); !ok {
- logger.Errorw("invalid-args", log.Fields{"args0": args[0]})
- return nil
- }
- if newlD, ok = args[1].(*voltha.LogicalDevice); !ok {
- logger.Errorw("invalid-args", log.Fields{"args1": args[1]})
- return nil
- }
-
- if reflect.DeepEqual(oldLD.Ports, newlD.Ports) {
+func (agent *LogicalDeviceAgent) portUpdated(oldPorts, newPorts []*voltha.LogicalPort) interface{} {
+ if reflect.DeepEqual(oldPorts, newPorts) {
logger.Debug("ports-have-not-changed")
return nil
}
// Get the difference between the two list
- newPorts, changedPorts, deletedPorts := diff(oldLD.Ports, newlD.Ports)
+ newPorts, changedPorts, deletedPorts := diff(oldPorts, newPorts)
// Send the port change events to the OF controller
for _, newP := range newPorts {
@@ -1806,13 +1758,13 @@
ld := agent.getLogicalDeviceWithoutLock()
- cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
- if cloned.Ports == nil {
- cloned.Ports = make([]*voltha.LogicalPort, 0)
+ clonedPorts := clonePorts(ld.Ports)
+ if clonedPorts == nil {
+ clonedPorts = make([]*voltha.LogicalPort, 0)
}
- cloned.Ports = append(cloned.Ports, lp)
+ clonedPorts = append(clonedPorts, lp)
- if err = agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
+ if err = agent.updateLogicalDevicePortsWithoutLock(ctx, ld, clonedPorts); err != nil {
logger.Errorw("error-updating-logical-device", log.Fields{"error": err})
return false, err
}
@@ -1883,12 +1835,12 @@
portCap.Port.OfpPort.PortNo = port.PortNo
portCap.Port.DeviceId = childDevice.Id
portCap.Port.DevicePortNo = port.PortNo
- cloned := (proto.Clone(ldevice)).(*voltha.LogicalDevice)
- if cloned.Ports == nil {
- cloned.Ports = make([]*voltha.LogicalPort, 0)
+ clonedPorts := clonePorts(ldevice.Ports)
+ if clonedPorts == nil {
+ clonedPorts = make([]*voltha.LogicalPort, 0)
}
- cloned.Ports = append(cloned.Ports, portCap.Port)
- if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
+ clonedPorts = append(clonedPorts, portCap.Port)
+ if err := agent.updateLogicalDevicePortsWithoutLock(ctx, ldevice, clonedPorts); err != nil {
return false, err
}
// Update the device graph with this new logical port