VOL-2868 Remove all uses of Proxy.RegisterCallback(...)

Change-Id: I05d47a9915071adb80ebc3c5f9b129ed6c36b54b
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 64237d3..2616540 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -49,11 +49,6 @@
 	clusterDataProxy   *model.Proxy
 	exitChannel        chan int
 	deviceRoutes       *route.DeviceRoutes
-	flowProxy          *model.Proxy
-	groupProxy         *model.Proxy
-	meterProxy         *model.Proxy
-	ldProxy            *model.Proxy
-	portProxies        map[string]*model.Proxy
 	lockDeviceRoutes   sync.RWMutex
 	logicalPortsNo     map[uint32]bool //value is true for NNI port
 	lockLogicalPortsNo sync.RWMutex
@@ -76,7 +71,6 @@
 	agent.clusterDataProxy = cdProxy
 	agent.ldeviceMgr = ldeviceMgr
 	agent.flowDecomposer = fd.NewFlowDecomposer(agent.deviceMgr)
-	agent.portProxies = make(map[string]*model.Proxy)
 	agent.logicalPortsNo = make(map[uint32]bool)
 	agent.defaultTimeout = timeout
 	agent.requestQueue = coreutils.NewRequestQueue()
@@ -165,45 +159,6 @@
 		agent.addLogicalPortsToMap(ld.Ports)
 	}
 
-	var err error
-	agent.flowProxy, err = agent.clusterDataProxy.CreateProxy(
-		ctx,
-		fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceID),
-		false)
-	if err != nil {
-		return err
-	}
-	agent.meterProxy, err = agent.clusterDataProxy.CreateProxy(
-		ctx,
-		fmt.Sprintf("/logical_devices/%s/meters", agent.logicalDeviceID),
-		false)
-	if err != nil {
-		logger.Errorw("failed-to-create-meter-proxy", log.Fields{"error": err})
-		return err
-	}
-	agent.groupProxy, err = agent.clusterDataProxy.CreateProxy(
-		ctx,
-		fmt.Sprintf("/logical_devices/%s/flow_groups", agent.logicalDeviceID),
-		false)
-	if err != nil {
-		logger.Errorw("failed-to-create-group-proxy", log.Fields{"error": err})
-		return err
-	}
-	agent.ldProxy, err = agent.clusterDataProxy.CreateProxy(
-		ctx,
-		fmt.Sprintf("/logical_devices/%s", agent.logicalDeviceID),
-		false)
-	if err != nil {
-		logger.Errorw("failed-to-create-logical-device-proxy", log.Fields{"error": err})
-		return err
-	}
-	// TODO:  Use a port proxy once the POST_ADD is fixed
-	if agent.ldProxy != nil {
-		agent.ldProxy.RegisterCallback(model.PostUpdate, agent.portUpdated)
-	} else {
-		return status.Error(codes.Internal, "logical-device-proxy-null")
-	}
-
 	// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
 	if loadFromDB {
 		go func() {
@@ -451,18 +406,19 @@
 	}
 	defer agent.requestQueue.RequestComplete()
 	// Get the latest logical device info
-	cloned := agent.getLogicalDeviceWithoutLock()
-	for idx, lPort := range cloned.Ports {
-		if lPort.DeviceId == deviceID && lPort.DevicePortNo == portNo {
+	original := agent.getLogicalDeviceWithoutLock()
+	updatedPorts := clonePorts(original.Ports)
+	for _, port := range updatedPorts {
+		if port.DeviceId == deviceID && port.DevicePortNo == portNo {
 			if operStatus == voltha.OperStatus_ACTIVE {
-				cloned.Ports[idx].OfpPort.Config = cloned.Ports[idx].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
-				cloned.Ports[idx].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
+				port.OfpPort.Config = port.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+				port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
 			} else {
-				cloned.Ports[idx].OfpPort.Config = cloned.Ports[idx].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
-				cloned.Ports[idx].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+				port.OfpPort.Config = port.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+				port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
 			}
 			// Update the logical device
-			if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
+			if err := agent.updateLogicalDevicePortsWithoutLock(ctx, original, updatedPorts); err != nil {
 				logger.Errorw("error-updating-logical-device", log.Fields{"error": err})
 				return err
 			}
@@ -480,21 +436,21 @@
 	}
 	defer agent.requestQueue.RequestComplete()
 	// Get the latest logical device info
-	cloned := agent.getLogicalDeviceWithoutLock()
-	for _, lport := range cloned.Ports {
-		if lport.DeviceId == device.Id {
+	original := agent.getLogicalDeviceWithoutLock()
+	updatedPorts := clonePorts(original.Ports)
+	for _, port := range updatedPorts {
+		if port.DeviceId == device.Id {
 			if state == voltha.OperStatus_ACTIVE {
-				lport.OfpPort.Config = lport.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
-				lport.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
+				port.OfpPort.Config = port.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+				port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
 			} else {
-				lport.OfpPort.Config = lport.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
-				lport.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+				port.OfpPort.Config = port.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+				port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
 			}
-
 		}
 	}
 	// Updating the logical device will trigger the poprt change events to be populated to the controller
-	if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
+	if err := agent.updateLogicalDevicePortsWithoutLock(ctx, original, updatedPorts); err != nil {
 		logger.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
 		return err
 	}
@@ -529,14 +485,9 @@
 	}
 	defer agent.requestQueue.RequestComplete()
 	// Get the latest logical device info
-	ld := agent.getLogicalDeviceWithoutLock()
+	cloned := agent.getLogicalDeviceWithoutLock()
 
-	cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
-	var updateLogicalPorts []*voltha.LogicalPort
-	// Update an empty ports slice to remove all the ports
-	cloned.Ports = updateLogicalPorts
-
-	if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
+	if err := agent.updateLogicalDevicePortsWithoutLock(ctx, cloned, []*voltha.LogicalPort{}); err != nil {
 		logger.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
 		return err
 	}
@@ -553,18 +504,16 @@
 	// Get the latest logical device info
 	ld := agent.getLogicalDeviceWithoutLock()
 
-	cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
 	updateLogicalPorts := []*voltha.LogicalPort{}
-	for _, lport := range cloned.Ports {
+	for _, lport := range ld.Ports {
 		// Save NNI ports only
 		if agent.isNNIPort(lport.DevicePortNo) {
 			updateLogicalPorts = append(updateLogicalPorts, lport)
 		}
 	}
-	if len(updateLogicalPorts) < len(cloned.Ports) {
-		cloned.Ports = updateLogicalPorts
+	if len(updateLogicalPorts) < len(ld.Ports) {
 		// Updating the logical device will trigger the port change events to be populated to the controller
-		if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
+		if err := agent.updateLogicalDevicePortsWithoutLock(ctx, ld, updateLogicalPorts); err != nil {
 			return err
 		}
 	} else {
@@ -573,6 +522,21 @@
 	return nil
 }
 
+func clonePorts(ports []*voltha.LogicalPort) []*voltha.LogicalPort {
+	return proto.Clone(&voltha.LogicalPorts{Items: ports}).(*voltha.LogicalPorts).Items
+}
+
+//updateLogicalDevicePortsWithoutLock updates the
+func (agent *LogicalDeviceAgent) updateLogicalDevicePortsWithoutLock(ctx context.Context, device *voltha.LogicalDevice, newPorts []*voltha.LogicalPort) error {
+	oldPorts := device.Ports
+	device.Ports = newPorts
+	if err := agent.updateLogicalDeviceWithoutLock(ctx, device); err != nil {
+		return err
+	}
+	agent.portUpdated(oldPorts, newPorts)
+	return nil
+}
+
 //updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
 func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(ctx context.Context, logicalDevice *voltha.LogicalDevice) error {
 	updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
@@ -1425,11 +1389,14 @@
 		}
 	}
 	if index >= 0 {
-		copy(logicalDevice.Ports[index:], logicalDevice.Ports[index+1:])
-		logicalDevice.Ports[len(logicalDevice.Ports)-1] = nil
-		logicalDevice.Ports = logicalDevice.Ports[:len(logicalDevice.Ports)-1]
+		clonedPorts := clonePorts(logicalDevice.Ports)
+		if index < len(clonedPorts)-1 {
+			copy(clonedPorts[index:], clonedPorts[index+1:])
+		}
+		clonedPorts[len(clonedPorts)-1] = nil
+		clonedPorts = clonedPorts[:len(clonedPorts)-1]
 		logger.Debugw("logical-port-deleted", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-		if err := agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice); err != nil {
+		if err := agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts); err != nil {
 			logger.Errorw("logical-device-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
@@ -1465,10 +1432,8 @@
 			lPortsNoToDelete = append(lPortsNoToDelete, logicalPort.DevicePortNo)
 		}
 	}
-	logicalDevice.Ports = lPortstoKeep
-
 	logger.Debugw("deleted-logical-ports", log.Fields{"ports": lPortstoKeep})
-	if err := agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice); err != nil {
+	if err := agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, lPortstoKeep); err != nil {
 		logger.Errorw("logical-device-update-failed", log.Fields{"logical-device-id": agent.logicalDeviceID})
 		return err
 	}
@@ -1502,8 +1467,9 @@
 		}
 	}
 	if index >= 0 {
-		logicalDevice.Ports[index].OfpPort.Config = logicalDevice.Ports[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
-		return agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice)
+		clonedPorts := clonePorts(logicalDevice.Ports)
+		clonedPorts[index].OfpPort.Config = clonedPorts[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+		return agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts)
 	}
 	return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
 }
@@ -1525,8 +1491,9 @@
 		}
 	}
 	if index >= 0 {
-		logicalDevice.Ports[index].OfpPort.Config = (logicalDevice.Ports[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)) | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
-		return agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice)
+		clonedPorts := clonePorts(logicalDevice.Ports)
+		clonedPorts[index].OfpPort.Config = (clonedPorts[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)) | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+		return agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts)
 	}
 	return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
 }
@@ -1716,29 +1683,14 @@
 // portUpdated is invoked when a port is updated on the logical device.  Until
 // the POST_ADD notification is fixed, we will use the logical device to
 // update that data.
-func (agent *LogicalDeviceAgent) portUpdated(ctx context.Context, args ...interface{}) interface{} {
-	logger.Debugw("portUpdated-callback", log.Fields{"argsLen": len(args)})
-
-	var oldLD *voltha.LogicalDevice
-	var newlD *voltha.LogicalDevice
-
-	var ok bool
-	if oldLD, ok = args[0].(*voltha.LogicalDevice); !ok {
-		logger.Errorw("invalid-args", log.Fields{"args0": args[0]})
-		return nil
-	}
-	if newlD, ok = args[1].(*voltha.LogicalDevice); !ok {
-		logger.Errorw("invalid-args", log.Fields{"args1": args[1]})
-		return nil
-	}
-
-	if reflect.DeepEqual(oldLD.Ports, newlD.Ports) {
+func (agent *LogicalDeviceAgent) portUpdated(oldPorts, newPorts []*voltha.LogicalPort) interface{} {
+	if reflect.DeepEqual(oldPorts, newPorts) {
 		logger.Debug("ports-have-not-changed")
 		return nil
 	}
 
 	// Get the difference between the two list
-	newPorts, changedPorts, deletedPorts := diff(oldLD.Ports, newlD.Ports)
+	newPorts, changedPorts, deletedPorts := diff(oldPorts, newPorts)
 
 	// Send the port change events to the OF controller
 	for _, newP := range newPorts {
@@ -1806,13 +1758,13 @@
 
 	ld := agent.getLogicalDeviceWithoutLock()
 
-	cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
-	if cloned.Ports == nil {
-		cloned.Ports = make([]*voltha.LogicalPort, 0)
+	clonedPorts := clonePorts(ld.Ports)
+	if clonedPorts == nil {
+		clonedPorts = make([]*voltha.LogicalPort, 0)
 	}
-	cloned.Ports = append(cloned.Ports, lp)
+	clonedPorts = append(clonedPorts, lp)
 
-	if err = agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
+	if err = agent.updateLogicalDevicePortsWithoutLock(ctx, ld, clonedPorts); err != nil {
 		logger.Errorw("error-updating-logical-device", log.Fields{"error": err})
 		return false, err
 	}
@@ -1883,12 +1835,12 @@
 	portCap.Port.OfpPort.PortNo = port.PortNo
 	portCap.Port.DeviceId = childDevice.Id
 	portCap.Port.DevicePortNo = port.PortNo
-	cloned := (proto.Clone(ldevice)).(*voltha.LogicalDevice)
-	if cloned.Ports == nil {
-		cloned.Ports = make([]*voltha.LogicalPort, 0)
+	clonedPorts := clonePorts(ldevice.Ports)
+	if clonedPorts == nil {
+		clonedPorts = make([]*voltha.LogicalPort, 0)
 	}
-	cloned.Ports = append(cloned.Ports, portCap.Port)
-	if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
+	clonedPorts = append(clonedPorts, portCap.Port)
+	if err := agent.updateLogicalDevicePortsWithoutLock(ctx, ldevice, clonedPorts); err != nil {
 		return false, err
 	}
 	// Update the device graph with this new logical port