[VOL-1514]  Add port notification to the logical device

This update sends port status notifications to the OFAgent.  This
commit also refactors the port creation logic to ensure that a port
can be added to a logical device at any time.

Change-Id: Ied78e93d0feef4621b588cfd4e10bbead79b0a5b
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 511ca13..4a0488a 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -46,6 +46,9 @@
 	DefaultFlowRules  *fu.DeviceRules
 	flowProxy         *model.Proxy
 	groupProxy        *model.Proxy
+	ldProxy *model.Proxy
+	portProxies map[string]*model.Proxy
+	portProxiesLock sync.RWMutex
 	lockLogicalDevice sync.RWMutex
 	flowDecomposer    *fd.FlowDecomposer
 }
@@ -62,6 +65,8 @@
 	agent.ldeviceMgr = ldeviceMgr
 	agent.flowDecomposer = fd.NewFlowDecomposer(agent.deviceMgr)
 	agent.lockLogicalDevice = sync.RWMutex{}
+	agent.portProxies = make(map[string]*model.Proxy)
+	agent.portProxiesLock = sync.RWMutex{}
 	return &agent
 }
 
@@ -77,7 +82,6 @@
 			log.Errorw("error-creating-logical-device", log.Fields{"error": err})
 			return err
 		}
-
 		ld = &voltha.LogicalDevice{Id: agent.logicalDeviceId, RootDeviceId: agent.rootDeviceId}
 
 		// Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
@@ -86,7 +90,6 @@
 			log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
 			return err
 		}
-
 		ld.DatapathId = datapathID
 		ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
 		log.Debugw("Switch-capability", log.Fields{"Desc": ld.Desc, "fromAd": switchCap.Desc})
@@ -94,32 +97,7 @@
 		ld.Flows = &ofp.Flows{Items: nil}
 		ld.FlowGroups = &ofp.FlowGroups{Items: nil}
 
-		//Add logical ports to the logical device based on the number of NNI ports discovered
-		//First get the default port capability - TODO:  each NNI port may have different capabilities,
-		//hence. may need to extract the port by the NNI port id defined by the adapter during device
-		//creation
-		var nniPorts *voltha.Ports
-		if nniPorts, err = agent.deviceMgr.getPorts(ctx, agent.rootDeviceId, voltha.Port_ETHERNET_NNI); err != nil {
-			log.Errorw("error-creating-logical-port", log.Fields{"error": err})
-		}
-		var portCap *ic.PortCapability
-		for _, port := range nniPorts.Items {
-			log.Infow("!!!!!!!NNI PORTS", log.Fields{"NNI": port})
-			if portCap, err = agent.deviceMgr.getPortCapability(ctx, agent.rootDeviceId, port.PortNo); err != nil {
-				log.Errorw("error-creating-logical-device", log.Fields{"error": err})
-				return err
-			}
-			portCap.Port.RootPort = true
-			lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
-			lp.DeviceId = agent.rootDeviceId
-			lp.Id = fmt.Sprintf("nni-%d", port.PortNo)
-			lp.OfpPort.PortNo = port.PortNo
-			lp.OfpPort.Name = lp.Id
-			lp.DevicePortNo = port.PortNo
-			ld.Ports = append(ld.Ports, lp)
-		}
 		agent.lockLogicalDevice.Lock()
-		//defer agent.lockLogicalDevice.Unlock()
 		// Save the logical device
 		if added := agent.clusterDataProxy.AddWithID("/logical_devices", ld.Id, ld, ""); added == nil {
 			log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
@@ -127,6 +105,9 @@
 			log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
 		}
 		agent.lockLogicalDevice.Unlock()
+
+		// TODO:  Set the NNI ports in a separate call once the port update issue is fixed.
+		go agent.setupNNILogicalPorts(ctx, agent.rootDeviceId)
 	} else {
 		//	load from dB - the logical may not exist at this time.  On error, just return and the calling function
 		// will destroy this agent.
@@ -139,20 +120,24 @@
 		agent.rootDeviceId = ld.RootDeviceId
 	}
 	agent.lockLogicalDevice.Lock()
+
 	agent.flowProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceId),
 		false)
 	agent.groupProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/logical_devices/%s/flow_groups", agent.logicalDeviceId),
 		false)
+	agent.ldProxy = agent.clusterDataProxy.Root.CreateProxy(
+		fmt.Sprintf("/logical_devices/%s", agent.logicalDeviceId),
+		false)
 
 	agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
 	agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
 
-	agent.lockLogicalDevice.Unlock()
+	// TODO:  Use a port proxy once the POST_ADD is fixed
+	agent.ldProxy.RegisterCallback(model.POST_UPDATE, agent.portUpdated)
 
-	// Setup the device graph
-	go agent.setupDeviceGraph()
+	agent.lockLogicalDevice.Unlock()
 
 	return nil
 }
@@ -255,49 +240,111 @@
 	log.Debug("getLogicalDeviceWithoutLock")
 	logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
 	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+		log.Debug("getLogicalDeviceWithoutLock", log.Fields{"ldevice": lDevice})
 		return lDevice, nil
 	}
 	return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
 }
 
-// addUNILogicalPort creates a UNI port on the logical device that represents a child device
-func (agent *LogicalDeviceAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
-	log.Infow("addUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+// addLogicalPort adds an NNI or UNI logical port for the given device port and, on success, triggers setupDeviceGraph asynchronously
+func (agent *LogicalDeviceAgent) addLogicalPort(device *voltha.Device, port *voltha.Port) error {
+	var err error
+	switch port.Type {
+	case voltha.Port_ETHERNET_NNI:
+		err = agent.addNNILogicalPort(device, port)
+	case voltha.Port_ETHERNET_UNI:
+		err = agent.addUNILogicalPort(device, port)
+	default:
+		log.Debugw("invalid-port-type", log.Fields{"deviceId": device.Id, "port": port})
+		return nil
+	}
+	if err == nil {
+		go agent.setupDeviceGraph()
+	}
+	return err
+}
+
+// setupNNILogicalPorts adds a logical port for each NNI port of device deviceId; it returns the device lookup error or the result of the last addition attempted
+func (agent *LogicalDeviceAgent) setupNNILogicalPorts(ctx context.Context, deviceId string) error {
+	log.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	// Build the logical device based on information retrieved from the device adapter
-	var portCap *ic.PortCapability
+	// (the port capability of each port is retrieved by addNNILogicalPort)
+	var err error
+
+	var device *voltha.Device
+	if device, err = agent.deviceMgr.GetDevice(deviceId); err != nil {
+		log.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": deviceId})
+		return err
+	}
+
+	// Add a logical port for each NNI port of the device.  A failure is only
+	// logged so that the remaining ports are still processed.
+	changesMade := false
+	for _, port := range device.Ports {
+		if port.Type == voltha.Port_ETHERNET_NNI {
+			if err = agent.addNNILogicalPort(device, port); err != nil {
+				log.Errorw("error-adding-NNI-port", log.Fields{"error": err})
+			} else {
+				// Remember to refresh the device graph once all ports are processed
+				changesMade = true
+			}
+		}
+	}
+	if changesMade {
+		go agent.setupDeviceGraph()
+	}
+	return err
+}
+
+
+// setupUNILogicalPorts adds a logical port for each UNI port of the given child device and then triggers setupDeviceGraph if any addition succeeded
+func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
+	log.Infow("setupUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+	// The error returned is the result of the last UNI port addition attempted (nil if there was none)
+	//var portCap *ic.PortCapability
 	var err error
 
 	//Get UNI port number
-	var uniPort uint32
+	//var uniPort uint32
+	changesMade := false
 	for _, port := range childDevice.Ports {
 		if port.Type == voltha.Port_ETHERNET_UNI {
-			uniPort = port.PortNo
+			if err = agent.addUNILogicalPort(childDevice, port); err != nil {
+				log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
+			} else {
+				changesMade = true
+			}
+			//uniPort = port.PortNo
 		}
 	}
-	if portCap, err = agent.deviceMgr.getPortCapability(ctx, childDevice.Id, uniPort); err != nil {
-		log.Errorw("error-creating-logical-port", log.Fields{"error": err})
-		return err
+	if changesMade {
+		go agent.setupDeviceGraph()
 	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
-	// Get stored logical device
-	if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
-		return status.Error(codes.NotFound, agent.logicalDeviceId)
-	} else {
-		log.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
-		portCap.Port.RootPort = false
-		//TODO: For now use the channel id assigned by the OLT as logical port number
-		lPortNo := childDevice.ProxyAddress.ChannelId
-		portCap.Port.Id = fmt.Sprintf("uni-%d", lPortNo)
-		portCap.Port.OfpPort.PortNo = lPortNo
-		portCap.Port.OfpPort.Name = portCap.Port.Id
-		portCap.Port.DeviceId = childDevice.Id
-		portCap.Port.DevicePortNo = uniPort
-		portCap.Port.DeviceId = childDevice.Id
-
-		ldevice.Ports = append(ldevice.Ports, portCap.Port)
-		return agent.updateLogicalDeviceWithoutLock(ldevice)
-	}
+	//if portCap, err = agent.deviceMgr.getPortCapability(ctx, childDevice.Id, uniPort); err != nil {
+	//	log.Errorw("error-creating-logical-port", log.Fields{"error": err})
+	//	return err
+	//}
+	//agent.lockLogicalDevice.Lock()
+	//defer agent.lockLogicalDevice.Unlock()
+	//// Get stored logical device
+	//if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
+	//	return status.Error(codes.NotFound, agent.logicalDeviceId)
+	//} else {
+	//	log.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
+	//	portCap.Port.RootPort = false
+	//	//TODO: For now use the channel id assigned by the OLT as logical port number
+	//	lPortNo := childDevice.ProxyAddress.ChannelId
+	//	portCap.Port.Id = fmt.Sprintf("uni-%d", lPortNo)
+	//	portCap.Port.OfpPort.PortNo = lPortNo
+	//	portCap.Port.OfpPort.Name = portCap.Port.Id
+	//	portCap.Port.DeviceId = childDevice.Id
+	//	portCap.Port.DevicePortNo = uniPort
+	//	portCap.Port.DeviceId = childDevice.Id
+	//
+	//	ldevice.Ports = append(ldevice.Ports, portCap.Port)
+	//	return agent.updateLogicalDeviceWithoutLock(ldevice)
+	//}
+	return err
 }
 
 //updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
@@ -306,6 +353,9 @@
 	if afterUpdate == nil {
 		return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceId)
 	}
+	//if a, ok := afterUpdate.(*voltha.LogicalDevice); ok {
+	//	log.Debugw("AFTER UPDATE", log.Fields{"logical": a})
+	//}
 	return nil
 }
 
@@ -1047,6 +1097,244 @@
 	return nil
 }
 
+// portAdded is the callback for a port added to the logical device: args[1] is
+// the new *voltha.LogicalPort.  It sets up a proxy with an update callback for
+// that port and sends an OFPPR_ADD port status to the OF controller.
+// TODO: To use when POST_ADD is fixed.
+func (agent *LogicalDeviceAgent) portAdded(args ...interface{}) interface{} {
+	log.Debugw("portAdded-callback", log.Fields{"argsLen": len(args)})
+
+	// Sanity check: an addition is not expected to carry previous data
+	if previous := args[0]; previous != nil {
+		log.Warnw("previous-data-not-nil", log.Fields{"args0": previous})
+	}
+	addedPort, isPort := args[1].(*voltha.LogicalPort)
+	if !isPort {
+		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+		return nil
+	}
+
+	// Create the proxy for that port and register the update callback on it
+	portPath := fmt.Sprintf("/logical_devices/%s/ports/%s", agent.logicalDeviceId, addedPort.Id)
+	agent.portProxiesLock.Lock()
+	portProxy := agent.clusterDataProxy.Root.CreateProxy(portPath, false)
+	agent.portProxies[addedPort.Id] = portProxy
+	portProxy.RegisterCallback(model.POST_UPDATE, agent.portUpdated)
+	agent.portProxiesLock.Unlock()
+
+	// Send the port change event to the OF controller
+	portStatus := &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: addedPort.OfpPort}
+	agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId, portStatus)
+
+	return nil
+}
+
+// portRemoved is a callback invoked when a port is removed from the logical device (args[0] is the removed *voltha.LogicalPort).
+// TODO: To use when POST_ADD is fixed.
+func (agent *LogicalDeviceAgent) portRemoved(args ...interface{}) interface{} {
+	log.Debugw("portRemoved-callback", log.Fields{"argsLen": len(args)})
+
+	// Sanity check
+	if args[1] != nil {
+		log.Warnw("data-not-nil", log.Fields{"args1": args[1]})
+	}
+	port, ok := args[0].(*voltha.LogicalPort)
+	if !ok {
+		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+		return nil
+	}
+
+	// Remove the proxy and callback for that port; a port without a proxy has nothing to unregister
+	agent.portProxiesLock.Lock()
+	if proxy := agent.portProxies[port.Id]; proxy != nil {
+		proxy.UnregisterCallback(model.POST_UPDATE, agent.portUpdated)
+	}
+	delete(agent.portProxies, port.Id)
+	agent.portProxiesLock.Unlock()
+
+	// Send the port change event to the OF controller
+	agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
+		&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_DELETE, Desc: port.OfpPort})
+
+	return nil
+}
+
+// diff goes over two lists of logical ports (matched by Id) and returns what's new,
+// what's changed (as found in newList, i.e. in its updated state) and what's removed.
+func diff(oldList, newList []*voltha.LogicalPort) (newPorts, changedPorts, deletedPorts []*voltha.LogicalPort) {
+	newPorts = make([]*voltha.LogicalPort, 0)
+	changedPorts = make([]*voltha.LogicalPort, 0)
+	deletedPorts = make([]*voltha.LogicalPort, 0)
+	for _, o := range oldList {
+		found := false
+		for _, n := range newList {
+			if o.Id == n.Id {
+				if !reflect.DeepEqual(o, n) {
+					// Report the updated port, not the stale one
+					changedPorts = append(changedPorts, n)
+				}
+				found = true
+				break
+			}
+		}
+		if !found {
+			deletedPorts = append(deletedPorts, o)
+		}
+	}
+	for _, n := range newList {
+		found := false
+		for _, o := range oldList {
+			if o.Id == n.Id {
+				found = true
+				break
+			}
+		}
+		if !found {
+			newPorts = append(newPorts, n)
+		}
+	}
+	return
+}
+
+// portUpdated is the POST_UPDATE callback registered on the logical device
+// proxy.  Until the POST_ADD notification is fixed, port changes are worked
+// out by comparing the ports of the previous and of the updated logical device.
+func (agent *LogicalDeviceAgent) portUpdated(args ...interface{}) interface{} {
+	log.Debugw("portUpdated-callback", log.Fields{"argsLen": len(args)})
+
+	// args[0] carries the previous logical device and args[1] the updated one
+	previous, isLD := args[0].(*voltha.LogicalDevice)
+	if !isLD {
+		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+		return nil
+	}
+	updated, isLD := args[1].(*voltha.LogicalDevice)
+	if !isLD {
+		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+		return nil
+	}
+
+	// Nothing to report when the two port lists are identical
+	if reflect.DeepEqual(previous.Ports, updated.Ports) {
+		log.Debug("ports-have-not-changed")
+		return nil
+	}
+
+	// Get the difference between the two lists
+	added, modified, removed := diff(previous.Ports, updated.Ports)
+
+	// Send the port change events to the OF controller
+	for _, lPort := range added {
+		go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
+			&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: lPort.OfpPort})
+	}
+	for _, lPort := range modified {
+		go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
+			&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_MODIFY, Desc: lPort.OfpPort})
+	}
+	for _, lPort := range removed {
+		go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
+			&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_DELETE, Desc: lPort.OfpPort})
+	}
+
+	return nil
+}
+
+// addNNILogicalPort adds an NNI logical port ("nni-<port number>", flagged as a
+// root port) for the given device port, unless portExist reports that it is
+// already present.  The logical device is updated under lockLogicalDevice.
+func (agent *LogicalDeviceAgent) addNNILogicalPort(device *voltha.Device, port *voltha.Port) error {
+	log.Infow("addNNILogicalPort", log.Fields{"NNI": port})
+	if agent.portExist(device, port) {
+		log.Debugw("port-already-exist", log.Fields{"port": port})
+		return nil
+	}
+	var portCap *ic.PortCapability
+	var err error
+	// First get the port capability (context.TODO(): no context is handed to this function)
+	if portCap, err = agent.deviceMgr.getPortCapability(context.TODO(), device.Id, port.PortNo); err != nil {
+		log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
+		return err
+	}
+	portCap.Port.RootPort = true
+	lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
+	lp.DeviceId = device.Id
+	lp.Id = fmt.Sprintf("nni-%d", port.PortNo)
+	lp.OfpPort.PortNo = port.PortNo
+	lp.OfpPort.Name = lp.Id
+	lp.DevicePortNo = port.PortNo
+
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	var ld *voltha.LogicalDevice
+	if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {
+		log.Errorw("error-retrieving-logical-device", log.Fields{"error": err})
+		return err
+	}
+	// Update a clone of the stored logical device; append copes with nil Ports
+	cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+	cloned.Ports = append(cloned.Ports, lp)
+
+	if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+		log.Errorw("error-updating-logical-device", log.Fields{"error": err})
+		return err
+	}
+	return nil
+}
+
+// portExist reports whether the logical device already holds a logical port matching the given device port
+func (agent *LogicalDeviceAgent) portExist(device *voltha.Device, port *voltha.Port) bool {
+	if ldevice, _ := agent.GetLogicalDevice(); ldevice != nil {
+		for _, lPort := range ldevice.Ports {
+			if lPort.DeviceId != device.Id || lPort.DevicePortNo != port.PortNo {
+				continue
+			}
+			if lPort.OfpPort != nil && device.ProxyAddress != nil {
+				return lPort.OfpPort.PortNo == device.ProxyAddress.ChannelId
+			}
+			return lPort.OfpPort == nil && device.ProxyAddress == nil
+		}
+	}
+	return false
+}
+
+// addUNILogicalPort adds a UNI logical port ("uni-<channel id>") to the logical device for the given
+// child device port, unless it already exists.  A child device without a proxy address is an error.
+func (agent *LogicalDeviceAgent) addUNILogicalPort(childDevice *voltha.Device, port *voltha.Port) error {
+	log.Debugw("addUNILogicalPort", log.Fields{"port": port})
+	if agent.portExist(childDevice, port) {
+		log.Debugw("port-already-exist", log.Fields{"port": port})
+		return nil
+	}
+	if childDevice.ProxyAddress == nil {
+		return status.Errorf(codes.InvalidArgument, "no-proxy-address-%s", childDevice.Id)
+	}
+	// First get the port capability (context.TODO(): no context is handed to this function)
+	portCap, err := agent.deviceMgr.getPortCapability(context.TODO(), childDevice.Id, port.PortNo)
+	if err != nil {
+		log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
+		return err
+	}
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	ldevice, err := agent.getLogicalDeviceWithoutLock()
+	if err != nil {
+		return status.Error(codes.NotFound, agent.logicalDeviceId)
+	}
+	log.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
+	portCap.Port.RootPort = false
+	//TODO: For now use the channel id assigned by the OLT as logical port number
+	lPortNo := childDevice.ProxyAddress.ChannelId
+	portCap.Port.Id = fmt.Sprintf("uni-%d", lPortNo)
+	portCap.Port.OfpPort.PortNo = lPortNo
+	portCap.Port.OfpPort.Name = portCap.Port.Id
+	portCap.Port.DeviceId = childDevice.Id
+	portCap.Port.DevicePortNo = port.PortNo
+	cloned := (proto.Clone(ldevice)).(*voltha.LogicalDevice)
+	cloned.Ports = append(cloned.Ports, portCap.Port)
+	return agent.updateLogicalDeviceWithoutLock(cloned)
+}
+
 func (agent *LogicalDeviceAgent) packetOut(packet *ofp.OfpPacketOut) {
 	log.Debugw("packet-out", log.Fields{"packet": packet.GetInPort()})
 	outPort := fd.GetPacketOutPort(packet)