[VOL-1037, VOL-1035] This commit adds flow and group handling
(from the NBI to the adapters, including decomposition).

Change-Id: I4f6d9ecd3dee8a9b161708b20b0a68d030c0cb23
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index 233465b..284bef2 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -20,6 +20,7 @@
 	"bytes"
 	"crypto/md5"
 	"fmt"
+	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	ofp "github.com/opencord/voltha-go/protos/openflow_13"
 	"github.com/opencord/voltha-go/protos/voltha"
@@ -175,7 +176,7 @@
 }
 
+// VlanPcp returns an OXM OFB field of type VLAN_PCP whose value wraps vlanPcp.
 func VlanPcp(vlanPcp uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: VLAN_VID, Value: &ofp.OfpOxmOfbField_VlanPcp{VlanPcp: vlanPcp}}
+	return &ofp.OfpOxmOfbField{Type: VLAN_PCP, Value: &ofp.OfpOxmOfbField_VlanPcp{VlanPcp: vlanPcp}}
 }
 
 func IpDscp(ipDscp uint32) *ofp.OfpOxmOfbField {
@@ -343,6 +344,37 @@
 	return nil
 }
 
+// UpdateOutputPortByActionType returns a copy of flow in which every output action found in an
+// instruction whose type equals actionType is replaced by the action built by Output(toPort).
+// Instructions of other types and non-output actions are carried over as-is (they are the same
+// objects as in flow, not copies). A rebuilt instruction keeps the type it was matched on.
+//
+// It returns nil when flow is nil or when a matching instruction carries no action list.
+func UpdateOutputPortByActionType(flow *ofp.OfpFlowStats, actionType uint32, toPort uint32) *ofp.OfpFlowStats {
+	if flow == nil {
+		return nil
+	}
+	nFlow := (proto.Clone(flow)).(*ofp.OfpFlowStats)
+	nInsts := make([]*ofp.OfpInstruction, 0, len(flow.Instructions))
+	for _, instruction := range flow.Instructions {
+		if instruction.Type != actionType {
+			nInsts = append(nInsts, instruction)
+			continue
+		}
+		instActions := instruction.GetActions()
+		if instActions == nil {
+			return nil
+		}
+		nActions := make([]*ofp.OfpAction, 0, len(instActions.Actions))
+		for _, action := range instActions.Actions {
+			if action.GetOutput() != nil {
+				action = Output(toPort)
+			}
+			nActions = append(nActions, action)
+		}
+		// Reuse the matched instruction's type instead of forcing APPLY_ACTIONS, so that a caller
+		// selecting another instruction type does not get that instruction silently converted.
+		instructionAction := ofp.OfpInstruction_Actions{Actions: &ofp.OfpInstructionActions{Actions: nActions}}
+		nInsts = append(nInsts, &ofp.OfpInstruction{Type: instruction.Type, Data: &instructionAction})
+	}
+	nFlow.Instructions = nInsts
+	return nFlow
+}
+
 func excludeOxmOfbField(field *ofp.OfpOxmOfbField, exclude ...ofp.OxmOfbFieldTypes) bool {
 	for _, fieldToExclude := range exclude {
 		if field.Type == fieldToExclude {
@@ -709,6 +741,42 @@
 	return deviceRules
 }
 
+// updateOutputPortForControllerBoundFlowForParentDevide handles the special case of controller-bound
+// flows on a parent device. It returns a copy of dr in which, for every device that the device
+// manager reports as root, each flow is re-added with a freshly computed ID; flows carrying an
+// EAPOL (eth type 0x888e), IGMP (IP proto 2) or UDP (IP proto 17) match field first get the output
+// actions of their APPLY_ACTIONS instructions pointed at the controller port. Devices that are not
+// root, or whose root status cannot be determined, keep the rules copied from dr.
+//
+// The flow argument is not used by the current implementation.
+func (fd *FlowDecomposer) updateOutputPortForControllerBoundFlowForParentDevide(flow *ofp.OfpFlowStats,
+	dr *fu.DeviceRules) *fu.DeviceRules {
+	// Fields are compared through their string form; render the reference fields once up front.
+	eapol := EthType(0x888e).String()
+	igmp := IpProto(2).String()
+	udp := IpProto(17).String()
+
+	newDeviceRules := dr.Copy()
+	for deviceId, fg := range dr.GetRules() {
+		root, err := fd.deviceMgr.IsRootDevice(deviceId)
+		if err != nil {
+			// The root flag is meaningless on error; leave this device's copied rules untouched.
+			log.Errorw("root-device-check-failed", log.Fields{"deviceId": deviceId, "error": err})
+			continue
+		}
+		if !root {
+			continue
+		}
+		newDeviceRules.ClearFlows(deviceId)
+		for i := 0; i < fg.Flows.Len(); i++ {
+			f := fg.GetFlow(i)
+			updateOutPortNo := false
+			for _, field := range GetOfbFields(f) {
+				if s := field.String(); s == eapol || s == igmp || s == udp {
+					updateOutPortNo = true
+					break
+				}
+			}
+			if updateOutPortNo {
+				// UpdateOutputPortByActionType returns nil when a matching instruction has no
+				// action list; keep the flow as it is rather than dereferencing nil below.
+				if updated := UpdateOutputPortByActionType(f, uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS),
+					uint32(ofp.OfpPortNo_OFPP_CONTROLLER)); updated != nil {
+					f = updated
+				}
+			}
+			// Clone before re-hashing, in case GetFlow hands back the flow still held by dr.
+			// Update flow Id as a change in the instruction field will result in a new flow ID.
+			nf := (proto.Clone(f)).(*ofp.OfpFlowStats)
+			nf.Id = hashFlowStats(nf)
+			newDeviceRules.AddFlow(deviceId, nf)
+		}
+	}
+	return newDeviceRules
+}
+
 //processControllerBoundFlow decomposes trap flows
 func (fd *FlowDecomposer) processControllerBoundFlow(agent coreIf.LogicalDeviceAgent, route []graph.RouteHop,
 	inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) *fu.DeviceRules {
@@ -904,7 +972,7 @@
 		log.Debugw("creating-metadata-flow", log.Fields{"flow": flow})
 		portNumber := uint32(GetPortNumberFromMetadata(flow))
 		if portNumber != 0 {
-			recalculatedRoute := agent.GetRoute(&inPortNo, &portNumber)
+			recalculatedRoute := agent.GetRoute(inPortNo, portNumber)
 			switch len(recalculatedRoute) {
 			case 0:
 				log.Errorw("no-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": GetMetaData64Bit(flow)})
@@ -1071,7 +1139,7 @@
 			}
 		}
 
-		route2 := agent.GetRoute(&inPortNo, &outPortNo)
+		route2 := agent.GetRoute(inPortNo, outPortNo)
 		switch len(route2) {
 		case 0:
 			log.Errorw("mc-no-route", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "comment": "deleting flow"})
@@ -1144,7 +1212,7 @@
 
 	deviceRules := fu.NewDeviceRules()
 
-	route := agent.GetRoute(&inPortNo, &outPortNo)
+	route := agent.GetRoute(inPortNo, outPortNo)
 	switch len(route) {
 	case 0:
 		log.Errorw("no-route", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "comment": "deleting-flow"})
@@ -1182,5 +1250,6 @@
 			deviceRules = fd.processMulticastFlow(agent, route, inPortNo, outPortNo, flow, grpId, groupMap)
 		}
 	}
+	deviceRules = fd.updateOutputPortForControllerBoundFlowForParentDevide(flow, deviceRules)
 	return deviceRules
 }
diff --git a/rw_core/flow_decomposition/flow_decomposer_test.go b/rw_core/flow_decomposition/flow_decomposer_test.go
index 6c7d7fa..f00a1e7 100644
--- a/rw_core/flow_decomposition/flow_decomposer_test.go
+++ b/rw_core/flow_decomposition/flow_decomposer_test.go
@@ -94,6 +94,12 @@
 	}
 	return nil, errors.New("ABSENT.")
 }
+// IsRootDevice reports the Root flag of the test device registered under deviceId. It returns
+// false and an error when no device with that ID is known to the test device manager.
+func (tdm *testDeviceManager) IsRootDevice(deviceId string) (bool, error) {
+	device, found := tdm.devices[deviceId]
+	if !found {
+		return false, errors.New("ABSENT.")
+	}
+	return device.Root, nil
+}
 
 type testFlowDecomposer struct {
 	dMgr         *testDeviceManager
@@ -372,8 +378,8 @@
 	return ""
 }
 
-func (tfd *testFlowDecomposer) GetLogicalDevice() *voltha.LogicalDevice {
-	return nil
+// GetLogicalDevice is a stub on the test flow decomposer: it always returns a nil logical
+// device together with a nil error.
+func (tfd *testFlowDecomposer) GetLogicalDevice() (*voltha.LogicalDevice, error) {
+	return nil, nil
 }
 
 func (tfd *testFlowDecomposer) GetDeviceGraph() *graph.DeviceGraph {
@@ -398,19 +404,19 @@
 	return lPorts
 }
 
-func (tfd *testFlowDecomposer) GetRoute(ingressPortNo *uint32, egressPortNo *uint32) []graph.RouteHop {
+func (tfd *testFlowDecomposer) GetRoute(ingressPortNo uint32, egressPortNo uint32) []graph.RouteHop {
 	var portLink graph.OFPortLink
-	if egressPortNo == nil {
+	if egressPortNo == 0 {
 		portLink.Egress = 0
-	} else if *egressPortNo&0x7fffffff == uint32(ofp.OfpPortNo_OFPP_CONTROLLER) {
+	} else if egressPortNo&0x7fffffff == uint32(ofp.OfpPortNo_OFPP_CONTROLLER) {
 		portLink.Egress = 10
 	} else {
-		portLink.Egress = *egressPortNo
+		portLink.Egress = egressPortNo
 	}
-	if ingressPortNo == nil {
+	if ingressPortNo == 0 {
 		portLink.Ingress = 0
 	} else {
-		portLink.Ingress = *ingressPortNo
+		portLink.Ingress = ingressPortNo
 	}
 	for key, val := range tfd.routes {
 		if key.Ingress == portLink.Ingress && key.Egress == portLink.Egress {
@@ -472,7 +478,7 @@
 		Actions: []*ofp.OfpAction{
 			PushVlan(0x8100),
 			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000)),
-			Output(2),
+			Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
 	expectedOltFlow := MkFlowStat(fa)
@@ -556,7 +562,7 @@
 		Actions: []*ofp.OfpAction{
 			PushVlan(0x8100),
 			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000)),
-			Output(2),
+			Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
 	expectedOltFlow := MkFlowStat(fa)