[VOL-1588] Improve Flow Add performance

This update consists of the following:
1) Improve performance when adding a flow to a logical device,
decomposing the flow into parent and child device flows, and sending
the flows to the adapters.
2) Format a number of files with gofmt.
3) Ensure the device graph cache is updated when a new port, belonging
to a device that is already in the cache, is added to the graph.

The flow update/deletion performance will be addressed in a separate
commit.

Change-Id: I2eb663cc73eef9fc6172203ed88a35726f5fe008
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 9704fff..836269e 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -17,7 +17,6 @@
 
 import (
 	"context"
-	"fmt"
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
@@ -34,6 +33,7 @@
 type DeviceAgent struct {
 	deviceId         string
 	deviceType       string
+	isRootdevice     bool
 	lastData         *voltha.Device
 	adapterProxy     *AdapterProxy
 	adapterMgr       *AdapterManager
@@ -41,14 +41,13 @@
 	clusterDataProxy *model.Proxy
 	deviceProxy      *model.Proxy
 	exitChannel      chan int
-	flowProxy        *model.Proxy
-	groupProxy       *model.Proxy
 	lockDevice       sync.RWMutex
+	defaultTimeout   int64
 }
 
 //newDeviceAgent creates a new device agent along as creating a unique ID for the device and set the device state to
 //preprovisioning
-func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
+func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout int64) *DeviceAgent {
 	var agent DeviceAgent
 	agent.adapterProxy = ap
 	cloned := (proto.Clone(device)).(*voltha.Device)
@@ -63,6 +62,7 @@
 		// overwritten by the child adapter during a device update request
 		cloned.Vlan = device.ProxyAddress.ChannelId
 	}
+	agent.isRootdevice = device.Root
 	agent.deviceId = cloned.Id
 	agent.deviceType = cloned.Type
 	agent.lastData = cloned
@@ -71,6 +71,7 @@
 	agent.exitChannel = make(chan int, 1)
 	agent.clusterDataProxy = cdProxy
 	agent.lockDevice = sync.RWMutex{}
+	agent.defaultTimeout = timeout
 	return &agent
 }
 
@@ -101,16 +102,6 @@
 	agent.deviceProxy = agent.clusterDataProxy.CreateProxy("/devices/"+agent.deviceId, false)
 	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
 
-	agent.flowProxy = agent.clusterDataProxy.CreateProxy(
-		fmt.Sprintf("/devices/%s/flows", agent.deviceId),
-		false)
-	agent.groupProxy = agent.clusterDataProxy.CreateProxy(
-		fmt.Sprintf("/devices/%s/flow_groups", agent.deviceId),
-		false)
-
-	agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
-	agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
-
 	log.Debug("device-agent-started")
 	return nil
 }
@@ -201,38 +192,117 @@
 	return nil
 }
 
-func (agent *DeviceAgent) updateFlows(flows []*ofp.OfpFlowStats) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows})
-	var oldData *voltha.Flows
-	if storedData, err := agent.getDeviceWithoutLock(); err != nil {
-		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
-	} else {
-		oldData = proto.Clone(storedData.Flows).(*voltha.Flows)
-		log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows, "old": oldData})
-
-		// store the changed data
-		afterUpdate := agent.flowProxy.Update("/", &ofp.Flows{Items: flows}, false, "")
-		if afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
-		}
-
-		return nil
+func (agent *DeviceAgent) updateDeviceWithoutLockAsync(device *voltha.Device, ch chan interface{}) { // runs updateDeviceWithoutLock and reports the outcome on ch
+	if err := agent.updateDeviceWithoutLock(device); err != nil {
+		ch <- status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId) // the underlying err is not included in the reported status
 	}
+	ch <- nil // NOTE(review): not guarded by else/return, so the failure path sends twice (error, then nil) — confirm the receiver drains both
 }
 
-func (agent *DeviceAgent) updateGroups(groups []*ofp.OfpGroupEntry) error {
+func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, ch chan interface{}) { // calls UpdateFlowsBulk and reports its outcome on ch exactly once
+	err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups)
+	if err != nil {
+		log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
+	}
+	ch <- err // single send: nil on success, the error otherwise; a second send could block or hit a closed channel (assumes UpdateFlowsBulk returns the error interface)
+}
+
+func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, ch chan interface{}) { // calls UpdateFlowsIncremental and reports its outcome on ch exactly once
+	err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups)
+	if err != nil {
+		log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.lastData.Id, "error": err})
+	}
+	ch <- err // single send: nil on success, the error otherwise; a second send could block or hit a closed channel (assumes UpdateFlowsIncremental returns the error interface)
+}
+
+func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry) error {
+	if (len(newFlows) | len(newGroups)) == 0 {
+		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+		return nil
+	}
+
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
-	log.Debugw("updateGroups", log.Fields{"deviceId": agent.deviceId, "groups": groups})
-	if _, err := agent.getDeviceWithoutLock(); err != nil {
+	log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+	var existingFlows *voltha.Flows
+	if device, err := agent.getDeviceWithoutLock(); err != nil {
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
+		existingFlows = proto.Clone(device.Flows).(*voltha.Flows)
+		existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
+		log.Debugw("addFlows", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "existingFlows": existingFlows, "groups": newGroups, "existingGroups": existingGroups})
+
+		var updatedFlows []*ofp.OfpFlowStats
+		var flowsToDelete []*ofp.OfpFlowStats
+		var groupsToDelete []*ofp.OfpGroupEntry
+		var updatedGroups []*ofp.OfpGroupEntry
+
+		// Process flows
+		for _, flow := range newFlows {
+			updatedFlows = append(updatedFlows, flow)
+		}
+
+		for _, flow := range existingFlows.Items {
+			if idx := fu.FindFlows(newFlows, flow); idx == -1 {
+				updatedFlows = append(updatedFlows, flow)
+			} else {
+				flowsToDelete = append(flowsToDelete, flow)
+			}
+		}
+
+		// Process groups
+		for _, g := range newGroups {
+			updatedGroups = append(updatedGroups, g)
+		}
+
+		for _, group := range existingGroups.Items {
+			if fu.FindGroup(newGroups, group.Desc.GroupId) == -1 { // does not exist now
+				updatedGroups = append(updatedGroups, group)
+			} else {
+				groupsToDelete = append(groupsToDelete, group)
+			}
+		}
+
+		// Sanity check
+		if (len(updatedFlows) | len(flowsToDelete) | len(updatedGroups) | len(groupsToDelete)) == 0 {
+			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+			return nil
+
+		}
+		// Send update to adapters
+		chAdapters := make(chan interface{})
+		defer close(chAdapters)
+		chdB := make(chan interface{})
+		defer close(chdB)
+		dType := agent.adapterMgr.getDeviceType(device.Type)
+		if !dType.AcceptsAddRemoveFlowUpdates {
+
+			if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
+				log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+				return nil
+			}
+			go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
+
+		} else {
+			flowChanges := &ofp.FlowChanges{
+				ToAdd:    &voltha.Flows{Items: newFlows},
+				ToRemove: &voltha.Flows{Items: flowsToDelete},
+			}
+			groupChanges := &ofp.FlowGroupChanges{
+				ToAdd:    &voltha.FlowGroups{Items: newGroups},
+				ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
+				ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			}
+			go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+		}
+
 		// store the changed data
-		afterUpdate := agent.groupProxy.Update("/", &ofp.FlowGroups{Items: groups}, false, "")
-		if afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
+		device.Flows = &voltha.Flows{Items: updatedFlows}
+		device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
+		go agent.updateDeviceWithoutLockAsync(device, chdB)
+
+		if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+			return status.Errorf(codes.Aborted, "errors-%s", res)
 		}
 
 		return nil
@@ -677,7 +747,6 @@
 
 func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
 	agent.lockDevice.Lock()
-	//defer agent.lockDevice.Unlock()
 	// Work only on latest data
 	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
 		agent.lockDevice.Unlock()
@@ -707,7 +776,6 @@
 
 func (agent *DeviceAgent) updatePortState(portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
 	agent.lockDevice.Lock()
-	//defer agent.lockDevice.Unlock()
 	// Work only on latest data
 	// TODO: Get list of ports from device directly instead of the entire device
 	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
@@ -766,7 +834,7 @@
 func (agent *DeviceAgent) addPort(port *voltha.Port) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
-	log.Debugw("addPort", log.Fields{"deviceId": agent.deviceId})
+	log.Debugw("addLogicalPortToMap", log.Fields{"deviceId": agent.deviceId})
 	// Work only on latest data
 	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
@@ -775,7 +843,7 @@
 		cloned := proto.Clone(storeDevice).(*voltha.Device)
 		if cloned.Ports == nil {
 			//	First port
-			log.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
+			log.Debugw("addLogicalPortToMap-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
 			cloned.Ports = make([]*voltha.Port, 0)
 		}
 		cp := proto.Clone(port).(*voltha.Port)
@@ -822,160 +890,6 @@
 	}
 }
 
-//flowTableUpdated is the callback after flows have been updated in the model to push them
-//to the adapters
-func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
-	log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-
-	var previousData *voltha.Flows
-	var latestData *voltha.Flows
-
-	var ok bool
-	if previousData, ok = args[0].(*ofp.Flows); !ok {
-		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-		return nil
-	}
-	if latestData, ok = args[1].(*ofp.Flows); !ok {
-		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-		return nil
-	}
-
-	// Sanity check - should not happen as this is already handled in logical device agent
-	if reflect.DeepEqual(previousData.Items, latestData.Items) {
-		log.Debugw("flow-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
-		return nil
-	}
-
-	var device *voltha.Device
-	var err error
-	if device, err = agent.getDeviceWithoutLock(); err != nil {
-		log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
-		return nil
-	}
-	groups := device.FlowGroups
-
-	// Send update to adapters
-	dType := agent.adapterMgr.getDeviceType(device.Type)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
-			log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
-			return err
-		}
-		return nil
-	}
-	// Incremental flow changes accepted
-	var toAdd []*ofp.OfpFlowStats
-	var toDelete []*ofp.OfpFlowStats
-
-	for _, flow := range latestData.Items {
-		if fu.FindFlowById(previousData.Items, flow) == -1 { // did not exist before
-			toAdd = append(toAdd, flow)
-		}
-	}
-	for _, flow := range previousData.Items {
-		if fu.FindFlowById(latestData.Items, flow) == -1 { // does not exist now
-			toDelete = append(toDelete, flow)
-		}
-	}
-	flowChanges := &ofp.FlowChanges{
-		ToAdd:    &voltha.Flows{Items: toAdd},
-		ToRemove: &voltha.Flows{Items: toDelete},
-	}
-	// Send an empty group changes as it would be dealt with a call to groupTableUpdated
-	groupChanges := &ofp.FlowGroupChanges{}
-
-	// Send changes only
-	if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
-		log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
-		return err
-	}
-
-	return nil
-}
-
-//groupTableUpdated is the callback after group table has been updated in the model to push them
-//to the adapters
-func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
-	log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-
-	var previousData *voltha.FlowGroups
-	var latestData *voltha.FlowGroups
-
-	var ok bool
-	if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
-		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-		return nil
-	}
-	if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
-		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-		return nil
-	}
-
-	// Sanity check - should not happen as this is already handled in logical device agent
-	if reflect.DeepEqual(previousData.Items, latestData.Items) {
-		log.Debugw("group-table-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
-		return nil
-	}
-
-	var device *voltha.Device
-	var err error
-	if device, err = agent.getDeviceWithoutLock(); err != nil {
-		log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
-		return nil
-	}
-	flows := device.Flows
-
-	// Send update to adapters
-	dType := agent.adapterMgr.getDeviceType(device.Type)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, latestData); err != nil {
-			log.Debugw("update-flows-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
-			return err
-		}
-		return nil
-	}
-
-	// Incremental group changes accepted
-	var toAdd []*ofp.OfpGroupEntry
-	var toDelete []*ofp.OfpGroupEntry
-	var toUpdate []*ofp.OfpGroupEntry
-
-	for _, group := range latestData.Items {
-		if idx := fu.FindGroup(previousData.Items, group.Desc.GroupId); idx == -1 { // did not exist before
-			toAdd = append(toAdd, group)
-		} else { // existed before
-			if previousData.Items[idx].String() != group.String() { // there is a change
-				toUpdate = append(toUpdate, group)
-			}
-		}
-	}
-	for _, group := range previousData.Items {
-		if fu.FindGroup(latestData.Items, group.Desc.GroupId) == -1 { // does not exist now
-			toDelete = append(toDelete, group)
-		}
-	}
-	groupChanges := &ofp.FlowGroupChanges{
-		ToAdd:    &voltha.FlowGroups{Items: toAdd},
-		ToRemove: &voltha.FlowGroups{Items: toDelete},
-		ToUpdate: &voltha.FlowGroups{Items: toUpdate},
-	}
-	// Send an empty flow changes as it should have been dealt with a call to flowTableUpdated
-	flowChanges := &ofp.FlowChanges{}
-
-	// Send changes only
-	if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
-		log.Debugw("update-incremental-group-error", log.Fields{"id": agent.lastData.Id, "error": err})
-		return err
-	}
-	return nil
-}
-
 // TODO: A generic device update by attribute
 func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
 	agent.lockDevice.Lock()