[VOL-1588] Improve Flow Add performance

This update consists of the following:
1) Update the performance when adding a flow to a logical device,
decomposing the flow into parent and child device and sending the
flow to the adapters.
2) Format a number of files as per GO fmt.
3) Ensure the device graph cache gets updated when a new port is
added to the graph that belongs to an existing device in cache.

The flow update/deletion performance will be addressed in a separate
commit.

Change-Id: I2eb663cc73eef9fc6172203ed88a35726f5fe008
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 9496aa1..a2b4494 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -37,25 +37,29 @@
 type LogicalDeviceAgent struct {
 	logicalDeviceId string
 	//lastData          *voltha.LogicalDevice
-	rootDeviceId      string
-	deviceMgr         *DeviceManager
-	ldeviceMgr        *LogicalDeviceManager
-	clusterDataProxy  *model.Proxy
-	exitChannel       chan int
-	deviceGraph       *graph.DeviceGraph
-	DefaultFlowRules  *fu.DeviceRules
-	flowProxy         *model.Proxy
-	groupProxy        *model.Proxy
-	ldProxy           *model.Proxy
-	portProxies       map[string]*model.Proxy
-	portProxiesLock   sync.RWMutex
-	lockLogicalDevice sync.RWMutex
-	flowDecomposer    *fd.FlowDecomposer
+	rootDeviceId        string
+	deviceMgr           *DeviceManager
+	ldeviceMgr          *LogicalDeviceManager
+	clusterDataProxy    *model.Proxy
+	exitChannel         chan int
+	deviceGraph         *graph.DeviceGraph
+	DefaultFlowRules    *fu.DeviceRules
+	flowProxy           *model.Proxy
+	groupProxy          *model.Proxy
+	ldProxy             *model.Proxy
+	portProxies         map[string]*model.Proxy
+	portProxiesLock     sync.RWMutex
+	lockLogicalDevice   sync.RWMutex
+	logicalPortsNo      map[uint32]bool //value is true for NNI port
+	lockLogicalPortsNo  sync.RWMutex
+	flowDecomposer      *fd.FlowDecomposer
+	includeDefaultFlows bool
+	defaultTimeout      int64
 }
 
 func newLogicalDeviceAgent(id string, deviceId string, ldeviceMgr *LogicalDeviceManager,
 	deviceMgr *DeviceManager,
-	cdProxy *model.Proxy) *LogicalDeviceAgent {
+	cdProxy *model.Proxy, timeout int64) *LogicalDeviceAgent {
 	var agent LogicalDeviceAgent
 	agent.exitChannel = make(chan int, 1)
 	agent.logicalDeviceId = id
@@ -67,6 +71,10 @@
 	agent.lockLogicalDevice = sync.RWMutex{}
 	agent.portProxies = make(map[string]*model.Proxy)
 	agent.portProxiesLock = sync.RWMutex{}
+	agent.lockLogicalPortsNo = sync.RWMutex{}
+	agent.logicalPortsNo = make(map[uint32]bool)
+	agent.includeDefaultFlows = true
+	agent.defaultTimeout = timeout
 	return &agent
 }
 
@@ -131,12 +139,11 @@
 		fmt.Sprintf("/logical_devices/%s", agent.logicalDeviceId),
 		false)
 
-	agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
-	agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
-
 	// TODO:  Use a port proxy once the POST_ADD is fixed
 	agent.ldProxy.RegisterCallback(model.POST_UPDATE, agent.portUpdated)
 
+	agent.includeDefaultFlows = true
+
 	agent.lockLogicalDevice.Unlock()
 
 	return nil
@@ -148,13 +155,6 @@
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 
-	// Unregister to teh callbacks
-	if agent.flowProxy != nil {
-		agent.flowProxy.UnregisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
-	}
-	if agent.groupProxy != nil {
-		agent.groupProxy.UnregisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
-	}
 	//Remove the logical device from the model
 	if removed := agent.clusterDataProxy.Remove("/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
 		log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
@@ -246,6 +246,29 @@
 	return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
 }
 
+func (agent *LogicalDeviceAgent) updateLogicalPort(device *voltha.Device, port *voltha.Port) error {
+	log.Debugw("updateLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
+	var err error
+	if port.Type == voltha.Port_ETHERNET_NNI {
+		if _, err = agent.addNNILogicalPort(device, port); err != nil {
+			return err
+		}
+		agent.addLogicalPortToMap(port.PortNo, true)
+	} else if port.Type == voltha.Port_ETHERNET_UNI {
+		if _, err = agent.addUNILogicalPort(device, port); err != nil {
+			return err
+		}
+		agent.addLogicalPortToMap(port.PortNo, false)
+	} else {
+		// Update the device graph to ensure all routes on the logical device have been calculated
+		if err = agent.updateRoutes(device, port); err != nil {
+			log.Errorw("failed-to-update-routes", log.Fields{"deviceId": device.Id, "port": port, "error": err})
+			return err
+		}
+	}
+	return nil
+}
+
 func (agent *LogicalDeviceAgent) addLogicalPort(device *voltha.Device, port *voltha.Port) error {
 	log.Debugw("addLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
 	var err error
@@ -253,10 +276,12 @@
 		if _, err = agent.addNNILogicalPort(device, port); err != nil {
 			return err
 		}
+		agent.addLogicalPortToMap(port.PortNo, true)
 	} else if port.Type == voltha.Port_ETHERNET_UNI {
 		if _, err = agent.addUNILogicalPort(device, port); err != nil {
 			return err
 		}
+		agent.addLogicalPortToMap(port.PortNo, false)
 	} else {
 		log.Debugw("invalid-port-type", log.Fields{"deviceId": device.Id, "port": port})
 		return nil
@@ -266,15 +291,13 @@
 
 // setupNNILogicalPorts creates an NNI port on the logical device that represents an NNI interface on a root device
 func (agent *LogicalDeviceAgent) setupNNILogicalPorts(ctx context.Context, deviceId string) error {
-	//now := time.Now()
-	//defer fmt.Println("setupNNILogicalPorts:", deviceId, time.Since(now))
 	log.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	// Build the logical device based on information retrieved from the device adapter
 	var err error
 
 	var device *voltha.Device
 	if device, err = agent.deviceMgr.GetDevice(deviceId); err != nil {
-		log.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": device.Id})
+		log.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": deviceId})
 		return err
 	}
 
@@ -284,6 +307,7 @@
 			if _, err = agent.addNNILogicalPort(device, port); err != nil {
 				log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
 			}
+			agent.addLogicalPortToMap(port.PortNo, true)
 		}
 	}
 	return err
@@ -291,8 +315,6 @@
 
 // setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
 func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
-	//now := time.Now()
-	//defer fmt.Println("setupUNILogicalPorts:", childDevice.Id, time.Since(now))
 	log.Infow("setupUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	// Build the logical device based on information retrieved from the device adapter
 	var err error
@@ -303,6 +325,7 @@
 			if _, err = agent.addUNILogicalPort(childDevice, port); err != nil {
 				log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
 			}
+			agent.addLogicalPortToMap(port.PortNo, false)
 		}
 	}
 	return err
@@ -357,17 +380,6 @@
 		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, groupMod.GetCommand())
 }
 
-//updateFlowGroupsWithoutLock updates the flows in the logical device without locking the logical device.  This function
-//must only be called by a function that is holding the lock on the logical device
-func (agent *LogicalDeviceAgent) updateFlowGroupsWithoutLock(groups []*ofp.OfpGroupEntry) error {
-	groupsCloned := make([]*ofp.OfpGroupEntry, len(groups))
-	copy(groupsCloned, groups)
-	if afterUpdate := agent.groupProxy.Update("/", groupsCloned, true, ""); afterUpdate == nil {
-		return errors.New(fmt.Sprintf("update-flow-group-failed:%s", agent.logicalDeviceId))
-	}
-	return nil
-}
-
 //flowAdd adds a flow to the flow table of that logical device
 func (agent *LogicalDeviceAgent) flowAdd(mod *ofp.OfpFlowMod) error {
 	log.Debug("flowAdd")
@@ -389,6 +401,7 @@
 		flows = lDevice.Flows.Items
 	}
 
+	updatedFlows := make([]*ofp.OfpFlowStats, 0)
 	//oldData := proto.Clone(lDevice.Flows).(*voltha.Flows)
 	changed := false
 	checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
@@ -400,6 +413,7 @@
 			//	Add flow
 			flow := fd.FlowStatsEntryFromFlowModMessage(mod)
 			flows = append(flows, flow)
+			updatedFlows = append(updatedFlows, flow)
 			changed = true
 		}
 	} else {
@@ -411,26 +425,65 @@
 				flow.ByteCount = oldFlow.ByteCount
 				flow.PacketCount = oldFlow.PacketCount
 			}
-			flows[idx] = flow
+			if !reflect.DeepEqual(oldFlow, flow) {
+				flows[idx] = flow
+				updatedFlows = append(updatedFlows, flow)
+				changed = true
+			}
 		} else {
 			flows = append(flows, flow)
+			updatedFlows = append(updatedFlows, flow)
+			changed = true
 		}
-		changed = true
 	}
 	if changed {
+		// Launch a routine to decompose the flows
+		if err := agent.decomposeAndSendFlows(&ofp.Flows{Items: updatedFlows}, lDevice.FlowGroups, agent.includeDefaultFlows); err != nil {
+			log.Errorf("decomposing-and-sending-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			return err
+		}
+
+		// We no longer need to sent the default flows, unless there is a change in device topology
+		agent.includeDefaultFlows = false
+
 		//	Update model
 		flowsToUpdate := &ofp.Flows{}
 		if lDevice.Flows != nil {
 			flowsToUpdate = &ofp.Flows{Items: flows}
 		}
 		if err := agent.updateLogicalDeviceFlowsWithoutLock(flowsToUpdate); err != nil {
-			log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	}
 	return nil
 }
 
+func (agent *LogicalDeviceAgent) decomposeAndSendFlows(flows *ofp.Flows, groups *ofp.FlowGroups, includeDefaultFlows bool) error {
+	log.Debugw("decomposeAndSendFlows", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
+
+	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *groups, includeDefaultFlows)
+	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+	chnlsList := make([]chan interface{}, 0)
+	for deviceId, value := range deviceRules.GetRules() {
+		ch := make(chan interface{})
+		chnlsList = append(chnlsList, ch)
+		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
+			if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups); err != nil {
+				log.Error("flow-update-failed", log.Fields{"deviceID": deviceId})
+				ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
+			}
+			ch <- nil
+		}(deviceId, value.ListFlows(), value.ListGroups())
+	}
+	// Wait for completion
+	if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+		return status.Errorf(codes.Aborted, "errors-%s", res)
+	}
+	return nil
+}
+
 //flowDelete deletes a flow from the flow table of that logical device
 func (agent *LogicalDeviceAgent) flowDelete(mod *ofp.OfpFlowMod) error {
 	log.Debug("flowDelete")
@@ -740,15 +793,6 @@
 	return nil
 }
 
-func isNNIPort(portNo uint32, nniPortsNo []uint32) bool {
-	for _, pNo := range nniPortsNo {
-		if pNo == portNo {
-			return true
-		}
-	}
-	return false
-}
-
 func (agent *LogicalDeviceAgent) getPreCalculatedRoute(ingress, egress uint32) []graph.RouteHop {
 	log.Debugw("ROUTE", log.Fields{"len": len(agent.deviceGraph.Routes)})
 	for routeLink, route := range agent.deviceGraph.Routes {
@@ -763,67 +807,56 @@
 
 func (agent *LogicalDeviceAgent) GetRoute(ingressPortNo uint32, egressPortNo uint32) []graph.RouteHop {
 	log.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
-	// Get the updated logical device
-	var ld *ic.LogicalDevice
 	routes := make([]graph.RouteHop, 0)
-	var err error
-	if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {
-		return nil
-	}
-	nniLogicalPortsNo := make([]uint32, 0)
-	for _, logicalPort := range ld.Ports {
-		if logicalPort.RootPort {
-			nniLogicalPortsNo = append(nniLogicalPortsNo, logicalPort.OfpPort.PortNo)
-		}
-	}
-	if len(nniLogicalPortsNo) == 0 {
-		log.Errorw("no-nni-ports", log.Fields{"LogicalDeviceId": ld.Id})
-		return nil
-	}
+
 	// Note: A port value of 0 is equivalent to a nil port
 
 	//	Consider different possibilities
 	if egressPortNo != 0 && ((egressPortNo & 0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER)) {
-		log.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
-		if isNNIPort(ingressPortNo, nniLogicalPortsNo) {
+		log.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
+		if agent.isNNIPort(ingressPortNo) {
 			log.Debug("returning-half-route")
 			//This is a trap on the NNI Port
 			if len(agent.deviceGraph.Routes) == 0 {
 				// If there are no routes set (usually when the logical device has only NNI port(s), then just return an
 				// internal route
-				hop := graph.RouteHop{DeviceID: ld.RootDeviceId, Ingress: ingressPortNo, Egress: egressPortNo}
+				hop := graph.RouteHop{DeviceID: agent.rootDeviceId, Ingress: ingressPortNo, Egress: egressPortNo}
 				routes = append(routes, hop)
 				routes = append(routes, hop)
 				return routes
 			}
 			//Return a 'half' route to make the flow decomposer logic happy
 			for routeLink, route := range agent.deviceGraph.Routes {
-				if isNNIPort(routeLink.Egress, nniLogicalPortsNo) {
+				if agent.isNNIPort(routeLink.Egress) {
 					routes = append(routes, graph.RouteHop{}) // first hop is set to empty
 					routes = append(routes, route[1])
 					return routes
 				}
 			}
-			log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+			log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
 			return nil
 		}
 		//treat it as if the output port is the first NNI of the OLT
-		egressPortNo = nniLogicalPortsNo[0]
+		var err error
+		if egressPortNo, err = agent.getFirstNNIPort(); err != nil {
+			log.Warnw("no-nni-port", log.Fields{"error": err})
+			return nil
+		}
 	}
 	//If ingress port is not specified (nil), it may be a wildcarded
 	//route if egress port is OFPP_CONTROLLER or a nni logical port,
 	//in which case we need to create a half-route where only the egress
 	//hop is filled, the first hop is nil
-	if ingressPortNo == 0 && isNNIPort(egressPortNo, nniLogicalPortsNo) {
+	if ingressPortNo == 0 && agent.isNNIPort(egressPortNo) {
 		// We can use the 2nd hop of any upstream route, so just find the first upstream:
 		for routeLink, route := range agent.deviceGraph.Routes {
-			if isNNIPort(routeLink.Egress, nniLogicalPortsNo) {
+			if agent.isNNIPort(routeLink.Egress) {
 				routes = append(routes, graph.RouteHop{}) // first hop is set to empty
 				routes = append(routes, route[1])
 				return routes
 			}
 		}
-		log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+		log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
 		return nil
 	}
 	//If egress port is not specified (nil), we can also can return a "half" route
@@ -835,10 +868,9 @@
 				return routes
 			}
 		}
-		log.Warnw("no-downstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+		log.Warnw("no-downstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
 		return nil
 	}
-
 	//	Return the pre-calculated route
 	return agent.getPreCalculatedRoute(ingressPortNo, egressPortNo)
 }
@@ -935,16 +967,6 @@
 }
 
 func (agent *LogicalDeviceAgent) GetAllDefaultRules() *fu.DeviceRules {
-	// Get latest
-	var err error
-	if _, err = agent.GetLogicalDevice(); err != nil {
-		return fu.NewDeviceRules()
-	}
-	if agent.DefaultFlowRules == nil { // Nothing setup yet
-		// Setup device graph if needed
-		agent.setupDeviceGraph()
-		agent.DefaultFlowRules = agent.generateDefaultRules()
-	}
 	return agent.DefaultFlowRules
 }
 
@@ -968,101 +990,64 @@
 	return agent.deviceGraph
 }
 
-//setupDeviceGraph creates the device graph if not done already
-func (agent *LogicalDeviceAgent) setupDeviceGraph() {
+//updateRoutes redo the device graph if not done already and setup the default rules as well
+func (agent *LogicalDeviceAgent) updateRoutes(device *voltha.Device, port *voltha.Port) error {
+	log.Debugf("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "device": device.Id, "port": port})
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
+	rules := fu.NewDeviceRules()
 	if agent.deviceGraph == nil {
 		agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
-		if ld, err := agent.getLogicalDeviceWithoutLock(); err == nil {
-			agent.deviceGraph.ComputeRoutes(ld.Ports)
+	}
+	// Get all the logical ports on that logical device
+	if lDevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
+		log.Errorf("unknown-logical-device", log.Fields{"error": err, "logicalDeviceId": agent.logicalDeviceId})
+		return err
+	} else {
+		//TODO:  Find a better way to refresh only missing routes
+		agent.deviceGraph.ComputeRoutes(lDevice.Ports)
+	}
+	deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
+	for deviceId := range deviceNodeIds {
+		if deviceId == agent.rootDeviceId {
+			rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
+		} else {
+			rules.AddFlowsAndGroup(deviceId, agent.leafDeviceDefaultRules(deviceId))
 		}
 	}
+	agent.DefaultFlowRules = rules
+
+	// Reset the default flows flag to ensure all default flows are sent to all devices, including the newly added
+	// one when a flow request is received.
+	agent.includeDefaultFlows = true
+	agent.deviceGraph.Print()
+	return nil
 }
 
-//updateDeviceGraph updates the device graph if not done already
+//updateDeviceGraph updates the device graph if not done already and setup the default rules as well
 func (agent *LogicalDeviceAgent) updateDeviceGraph(lp *voltha.LogicalPort) {
+	log.Debugf("updateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	rules := fu.NewDeviceRules()
 	if agent.deviceGraph == nil {
 		agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
 	}
 	agent.deviceGraph.AddPort(lp)
-}
-
-func (agent *LogicalDeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
-	log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
-	var previousData *ofp.Flows
-	var latestData *ofp.Flows
-
-	var ok bool
-	if previousData, ok = args[0].(*ofp.Flows); !ok {
-		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-	}
-	if latestData, ok = args[1].(*ofp.Flows); !ok {
-		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-	}
-
-	if reflect.DeepEqual(previousData.Items, latestData.Items) {
-		log.Debug("flow-update-not-required")
-		return nil
-	}
-
-	var groups *ofp.FlowGroups
-	lDevice, _ := agent.getLogicalDeviceWithoutLock()
-	groups = lDevice.FlowGroups
-	log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
-	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
-	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-
-	var err error
-	for deviceId, value := range deviceRules.GetRules() {
-		if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
-			log.Error("update-flows-failed", log.Fields{"deviceID": deviceId})
-		}
-		if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
-			log.Error("update-groups-failed", log.Fields{"deviceID": deviceId})
+	deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
+	for deviceId := range deviceNodeIds {
+		if deviceId == agent.rootDeviceId {
+			rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
+		} else {
+			rules.AddFlowsAndGroup(deviceId, agent.leafDeviceDefaultRules(deviceId))
 		}
 	}
+	agent.DefaultFlowRules = rules
 
-	return nil
-}
-
-func (agent *LogicalDeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
-	log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
-	var previousData *ofp.FlowGroups
-	var latestData *ofp.FlowGroups
-
-	var ok bool
-	if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
-		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-	}
-	if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
-		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-	}
-
-	if reflect.DeepEqual(previousData.Items, latestData.Items) {
-		log.Debug("flow-update-not-required")
-		return nil
-	}
-
-	var flows *ofp.Flows
-	lDevice, _ := agent.getLogicalDeviceWithoutLock()
-	flows = lDevice.Flows
-	log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
-	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
-	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-	var err error
-	for deviceId, value := range deviceRules.GetRules() {
-		if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
-			log.Error("update-flows-failed", log.Fields{"deviceID": deviceId})
-		}
-		if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
-			log.Error("update-groups-failed", log.Fields{"deviceID": deviceId})
-		}
-
-	}
-	return nil
+	// Reset the default flows flag to ensure all default flows are sent to all devices, including the newly added
+	// one when a flow request is received.
+	agent.includeDefaultFlows = true
+	agent.deviceGraph.Print()
 }
 
 // portAdded is a callback invoked when a port is added to the logical device.
@@ -1192,9 +1177,9 @@
 	newPorts, changedPorts, deletedPorts := diff(oldLD.Ports, newlD.Ports)
 
 	// Send the port change events to the OF controller
-	for _, new := range newPorts {
+	for _, newP := range newPorts {
 		go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
-			&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: new.OfpPort})
+			&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: newP.OfpPort})
 	}
 	for _, change := range changedPorts {
 		go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
@@ -1213,8 +1198,6 @@
 // (true, nil).   If the device is not in the correct state it will return (false, nil) as this is a valid
 // scenario. This also applies to the case where the port was already added.
 func (agent *LogicalDeviceAgent) addNNILogicalPort(device *voltha.Device, port *voltha.Port) (bool, error) {
-	//now := time.Now()
-	//defer fmt.Println("setupNNILogicalPorts:", device.Id, time.Since(now))
 	log.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
 	if device.AdminState != voltha.AdminState_ENABLED || device.OperStatus != voltha.OperStatus_ACTIVE {
 		log.Infow("device-not-ready", log.Fields{"deviceId": device.Id, "admin": device.AdminState, "oper": device.OperStatus})
@@ -1291,8 +1274,6 @@
 // (true, nil).   If the device is not in the correct state it will return (false, nil) as this is a valid
 // scenario. This also applies to the case where the port was already added.
 func (agent *LogicalDeviceAgent) addUNILogicalPort(childDevice *voltha.Device, port *voltha.Port) (bool, error) {
-	//now := time.Now()
-	//defer fmt.Println("addUNILogicalPort:", childDevice.Id, time.Since(now))
 	log.Debugw("addUNILogicalPort", log.Fields{"port": port})
 	if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
 		log.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
@@ -1337,7 +1318,6 @@
 		if err := agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
 			return false, err
 		}
-
 		// Update the device graph with this new logical port
 		clonedLP := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
 		go agent.updateDeviceGraph(clonedLP)
@@ -1361,3 +1341,39 @@
 	agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, transactionId, packetIn)
 	log.Debugw("sending-packet-in", log.Fields{"packet-in": packetIn})
 }
+
+func (agent *LogicalDeviceAgent) addLogicalPortToMap(portNo uint32, nniPort bool) {
+	agent.lockLogicalPortsNo.Lock()
+	defer agent.lockLogicalPortsNo.Unlock()
+	if exist := agent.logicalPortsNo[portNo]; !exist {
+		agent.logicalPortsNo[portNo] = nniPort
+	}
+}
+
+func (agent *LogicalDeviceAgent) deleteLogicalPortFromMap(portNo uint32) {
+	agent.lockLogicalPortsNo.Lock()
+	defer agent.lockLogicalPortsNo.Unlock()
+	if exist := agent.logicalPortsNo[portNo]; exist {
+		delete(agent.logicalPortsNo, portNo)
+	}
+}
+
+func (agent *LogicalDeviceAgent) isNNIPort(portNo uint32) bool {
+	agent.lockLogicalPortsNo.RLock()
+	defer agent.lockLogicalPortsNo.RUnlock()
+	if exist := agent.logicalPortsNo[portNo]; exist {
+		return agent.logicalPortsNo[portNo]
+	}
+	return false
+}
+
+func (agent *LogicalDeviceAgent) getFirstNNIPort() (uint32, error) {
+	agent.lockLogicalPortsNo.RLock()
+	defer agent.lockLogicalPortsNo.RUnlock()
+	for portNo, nni := range agent.logicalPortsNo {
+		if nni {
+			return portNo, nil
+		}
+	}
+	return 0, status.Error(codes.NotFound, "No NNI port found")
+}