[VOL-1037, VOL-1035] This commit consists of flow and group
handling (from NBI to Adapters, including decomposition).

Change-Id: I4f6d9ecd3dee8a9b161708b20b0a68d030c0cb23
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 52ab584..e045fc9 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -17,16 +17,18 @@
 
 import (
 	"context"
-	"reflect"
-	"sync"
-
+	"fmt"
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/protos/core_adapter"
+	ofp "github.com/opencord/voltha-go/protos/openflow_13"
 	"github.com/opencord/voltha-go/protos/voltha"
+	fu "github.com/opencord/voltha-go/rw_core/utils"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"reflect"
+	"sync"
 )
 
 type DeviceAgent struct {
@@ -37,6 +39,8 @@
 	clusterDataProxy *model.Proxy
 	deviceProxy      *model.Proxy
 	exitChannel      chan int
+	flowProxy        *model.Proxy
+	groupProxy       *model.Proxy
 	lockDevice       sync.RWMutex
 }
 
@@ -48,6 +52,13 @@
 	cloned := (proto.Clone(device)).(*voltha.Device)
 	cloned.Id = CreateDeviceId()
 	cloned.AdminState = voltha.AdminState_PREPROVISIONED
+	cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
+	cloned.Flows = &ofp.Flows{Items: nil}
+	if !device.GetRoot() && device.ProxyAddress != nil {
+		// Set the default vlan ID to the one specified by the parent adapter.  It can be
+		// overwritten by the child adapter during a device update request
+		cloned.Vlan = device.ProxyAddress.ChannelId
+	}
 	agent.deviceId = cloned.Id
 	agent.lastData = cloned
 	agent.deviceMgr = deviceMgr
@@ -68,6 +79,17 @@
 	}
 	agent.deviceProxy = agent.clusterDataProxy.Root.GetProxy("/devices/"+agent.deviceId, false)
 	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate, nil)
+
+	agent.flowProxy = agent.clusterDataProxy.Root.GetProxy(
+		fmt.Sprintf("/devices/%s/flows", agent.deviceId),
+		false)
+	agent.groupProxy = agent.clusterDataProxy.Root.GetProxy(
+		fmt.Sprintf("/devices/%s/flow_groups", agent.deviceId),
+		false)
+
+	agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
+	//agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
+
 	log.Debug("device-agent-started")
 }
 
@@ -80,7 +102,7 @@
 	log.Debug("device-agent-stopped")
 }
 
-// getDevice retrieves the latest device information from the data model
+// getDevice retrieves the latest device information from the data model
 func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
@@ -146,6 +168,51 @@
 	return nil
 }
 
+func (agent *DeviceAgent) updateFlows(flows []*ofp.OfpFlowStats) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows})
+	var oldData *voltha.Flows
+	if storedData, err := agent.getDeviceWithoutLock(); err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	} else {
+		oldData = proto.Clone(storedData.Flows).(*voltha.Flows)
+		log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows, "old": oldData})
+		// store the changed data
+		storedData.Flows.Items = flows
+		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, storedData, false, "")
+		if afterUpdate == nil {
+			return status.Errorf(codes.Internal, "%s", agent.deviceId)
+		}
+
+		// For now, force the callback to occur
+		go agent.flowTableUpdated(oldData, &ofp.Flows{Items: flows})
+		return nil
+	}
+}
+
+func (agent *DeviceAgent) updateGroups(groups []*ofp.OfpGroupEntry) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	var oldData *voltha.FlowGroups
+	log.Debugw("updateGroups", log.Fields{"deviceId": agent.deviceId, "groups": groups})
+	if storedData, err := agent.getDeviceWithoutLock(); err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	} else {
+		oldData = proto.Clone(storedData.FlowGroups).(*voltha.FlowGroups)
+		// store the changed data
+		storedData.FlowGroups.Items = groups
+		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, storedData, false, "")
+		if afterUpdate == nil {
+			return status.Errorf(codes.Internal, "%s", agent.deviceId)
+		}
+
+		// For now, force the callback to occur
+		go agent.groupTableUpdated(oldData, &ofp.FlowGroups{Items: groups})
+		return nil
+	}
+}
+
 //disableDevice disable a device
 func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
 	agent.lockDevice.Lock()
@@ -257,7 +324,7 @@
 func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
 	log.Debugw("getPorts", log.Fields{"id": agent.deviceId, "portType": portType})
 	ports := &voltha.Ports{}
-	if device, _ := agent.deviceMgr.getDevice(agent.deviceId); device != nil {
+	if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
 		for _, port := range device.Ports {
 			if port.Type == portType {
 				ports.Items = append(ports.Items, port)
@@ -271,7 +338,7 @@
 // parent device
 func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*core_adapter.SwitchCapability, error) {
 	log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceId})
-	if device, err := agent.deviceMgr.getDevice(agent.deviceId); device == nil {
+	if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
 		return nil, err
 	} else {
 		var switchCap *core_adapter.SwitchCapability
@@ -288,7 +355,7 @@
 // device
 func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*core_adapter.PortCapability, error) {
 	log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceId})
-	if device, err := agent.deviceMgr.getDevice(agent.deviceId); device == nil {
+	if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
 		return nil, err
 	} else {
 		var portCap *core_adapter.PortCapability
@@ -500,6 +567,164 @@
 	}
 }
 
+//flowTableUpdated is the callback after flows have been updated in the model to push them
+//to the adapters
+func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
+	log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
+
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+
+	var previousData *voltha.Flows
+	var latestData *voltha.Flows
+
+	var ok bool
+	if previousData, ok = args[0].(*ofp.Flows); !ok {
+		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+		return nil
+	}
+	if latestData, ok = args[1].(*ofp.Flows); !ok {
+		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+		return nil
+	}
+
+	// Sanity check - should not happen as this is already handled in logical device agent
+	if reflect.DeepEqual(previousData.Items, latestData.Items) {
+		log.Debugw("flow-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
+		return nil
+	}
+
+	var device *voltha.Device
+	var err error
+	if device, err = agent.getDeviceWithoutLock(); err != nil {
+		log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
+		return nil
+	}
+	groups := device.FlowGroups
+
+	// Send update to adapters
+	// Check whether the device supports incremental flow changes
+	// Assume false for test
+	acceptsAddRemoveFlowUpdates := false
+	if !acceptsAddRemoveFlowUpdates {
+		if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
+			log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
+			return err
+		}
+		return nil
+	}
+	// Incremental flow changes accepted
+	var toAdd []*ofp.OfpFlowStats
+	var toDelete []*ofp.OfpFlowStats
+
+	for _, flow := range latestData.Items {
+		if fu.FindFlowById(previousData.Items, flow) == -1 { // did not exist before
+			toAdd = append(toAdd, flow)
+		}
+	}
+	for _, flow := range previousData.Items {
+		if fu.FindFlowById(latestData.Items, flow) == -1 { // does not exist now
+			toDelete = append(toDelete, flow)
+		}
+	}
+	flowChanges := &ofp.FlowChanges{
+		ToAdd:    &voltha.Flows{Items: toAdd},
+		ToRemove: &voltha.Flows{Items: toDelete},
+	}
+	// Send empty group changes, as group updates are dealt with by a call to groupTableUpdated
+	groupChanges := &ofp.FlowGroupChanges{}
+
+	// Send changes only
+	if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
+		log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
+		return err
+	}
+
+	return nil
+}
+
+//groupTableUpdated is the callback after group table has been updated in the model to push them
+//to the adapters
+func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
+	log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
+
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+
+	var previousData *voltha.FlowGroups
+	var latestData *voltha.FlowGroups
+
+	var ok bool
+	if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
+		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+		return nil
+	}
+	if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
+		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+		return nil
+	}
+
+	// Sanity check - should not happen as this is already handled in logical device agent
+	if reflect.DeepEqual(previousData.Items, latestData.Items) {
+		log.Debugw("group-table-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
+		return nil
+	}
+
+	var device *voltha.Device
+	var err error
+	if device, err = agent.getDeviceWithoutLock(); err != nil {
+		log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
+		return nil
+	}
+	flows := device.Flows
+
+	// Send update to adapters
+	// Check whether the device supports incremental flow changes
+	// Assume false for test
+	acceptsAddRemoveFlowUpdates := false
+	if !acceptsAddRemoveFlowUpdates {
+		if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, latestData); err != nil {
+			log.Debugw("update-flows-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
+			return err
+		}
+		return nil
+	}
+
+	// Incremental group changes accepted
+	var toAdd []*ofp.OfpGroupEntry
+	var toDelete []*ofp.OfpGroupEntry
+	var toUpdate []*ofp.OfpGroupEntry
+
+	for _, group := range latestData.Items {
+		if idx := fu.FindGroup(previousData.Items, group.Desc.GroupId); idx == -1 { // did not exist before
+			toAdd = append(toAdd, group)
+		} else { // existed before
+			if previousData.Items[idx].String() != group.String() { // there is a change
+				toUpdate = append(toUpdate, group)
+			}
+		}
+	}
+	for _, group := range previousData.Items {
+		if fu.FindGroup(latestData.Items, group.Desc.GroupId) == -1 { // does not exist now
+			toDelete = append(toDelete, group)
+		}
+	}
+	groupChanges := &ofp.FlowGroupChanges{
+		ToAdd:    &voltha.FlowGroups{Items: toAdd},
+		ToRemove: &voltha.FlowGroups{Items: toDelete},
+		ToUpdate: &voltha.FlowGroups{Items: toUpdate},
+	}
+	// Send empty flow changes, as flow updates should have been dealt with by a call to flowTableUpdated
+	flowChanges := &ofp.FlowChanges{}
+
+	// Send changes only
+	if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
+		log.Debugw("update-incremental-group-error", log.Fields{"id": agent.lastData.Id, "error": err})
+		return err
+	}
+	return nil
+}
+
 // TODO: A generic device update by attribute
 func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
 	agent.lockDevice.Lock()