[VOL-1644]  Add test cases to the Core Utils package

As part of this update some utilities which were inside the flow
decomposer file has been moved to the utils package.  Otherwise,
the code would have to be duplicated to work around package
circular dependencies.

Change-Id: I77dc6cdb23d832323e58ff6d9351db809fff30ba
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index d45fca6..e7089cd 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -290,10 +290,13 @@
 
 		}
 		// Send update to adapters
+
+		// Create two channels to receive responses from the dB and from the adapters.
+		// Do not close these channels as this function may exit on timeout before the dB or adapters get a chance
+		// to send their responses.  These channels will be garbage collected once all the responses are
+		// received
 		chAdapters := make(chan interface{})
-		defer close(chAdapters)
 		chdB := make(chan interface{})
-		defer close(chdB)
 		dType := agent.adapterMgr.getDeviceType(device.Type)
 		if !dType.AcceptsAddRemoveFlowUpdates {
 
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 1555ca5..057cc6b 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -476,13 +476,13 @@
 			log.Warnw("overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
 		} else {
 			//	Add flow
-			flow := fd.FlowStatsEntryFromFlowModMessage(mod)
+			flow := fu.FlowStatsEntryFromFlowModMessage(mod)
 			flows = append(flows, flow)
 			updatedFlows = append(updatedFlows, flow)
 			changed = true
 		}
 	} else {
-		flow := fd.FlowStatsEntryFromFlowModMessage(mod)
+		flow := fu.FlowStatsEntryFromFlowModMessage(mod)
 		idx := fu.FindFlows(flows, flow)
 		if idx >= 0 {
 			oldFlow := flows[idx]
@@ -638,7 +638,7 @@
 	}
 	flows := lDevice.Flows.Items
 	changed := false
-	flow := fd.FlowStatsEntryFromFlowModMessage(mod)
+	flow := fu.FlowStatsEntryFromFlowModMessage(mod)
 	idx := fu.FindFlows(flows, flow)
 	if idx >= 0 {
 		flows = append(flows[:idx], flows[idx+1:]...)
@@ -683,7 +683,7 @@
 	}
 	groups := lDevice.FlowGroups.Items
 	if fu.FindGroup(groups, groupMod.GroupId) == -1 {
-		groups = append(groups, fd.GroupEntryFromGroupMod(groupMod))
+		groups = append(groups, fu.GroupEntryFromGroupMod(groupMod))
 		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
 			log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
@@ -764,7 +764,7 @@
 		return errors.New(fmt.Sprintf("group-absent:%d", groupId))
 	} else {
 		//replace existing group entry with new group definition
-		groupEntry := fd.GroupEntryFromGroupMod(groupMod)
+		groupEntry := fu.GroupEntryFromGroupMod(groupMod)
 		groups[idx] = groupEntry
 		groupsChanged = true
 	}
@@ -1005,42 +1005,42 @@
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fd.InPort(downstreamPorts[0].PortNo),
-			fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.InPort(downstreamPorts[0].PortNo),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
 		},
 		Actions: []*ofp.OfpAction{
-			fd.SetField(fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan)),
-			fd.Output(upstreamPorts[0].PortNo),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan)),
+			fu.Output(upstreamPorts[0].PortNo),
 		},
 	}
-	fg.AddFlow(fd.MkFlowStat(fa))
+	fg.AddFlow(fu.MkFlowStat(fa))
 
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fd.InPort(downstreamPorts[0].PortNo),
-			fd.VlanVid(0),
+			fu.InPort(downstreamPorts[0].PortNo),
+			fu.VlanVid(0),
 		},
 		Actions: []*ofp.OfpAction{
-			fd.PushVlan(0x8100),
-			fd.SetField(fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan)),
-			fd.Output(upstreamPorts[0].PortNo),
+			fu.PushVlan(0x8100),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan)),
+			fu.Output(upstreamPorts[0].PortNo),
 		},
 	}
-	fg.AddFlow(fd.MkFlowStat(fa))
+	fg.AddFlow(fu.MkFlowStat(fa))
 
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fd.InPort(upstreamPorts[0].PortNo),
-			fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan),
+			fu.InPort(upstreamPorts[0].PortNo),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan),
 		},
 		Actions: []*ofp.OfpAction{
-			fd.SetField(fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
-			fd.Output(downstreamPorts[0].PortNo),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
+			fu.Output(downstreamPorts[0].PortNo),
 		},
 	}
-	fg.AddFlow(fd.MkFlowStat(fa))
+	fg.AddFlow(fu.MkFlowStat(fa))
 
 	return fg
 }
@@ -1435,7 +1435,7 @@
 
 func (agent *LogicalDeviceAgent) packetOut(packet *ofp.OfpPacketOut) {
 	log.Debugw("packet-out", log.Fields{"packet": packet.GetInPort()})
-	outPort := fd.GetPacketOutPort(packet)
+	outPort := fu.GetPacketOutPort(packet)
 	//frame := packet.GetData()
 	//TODO: Use a channel between the logical agent and the device agent
 	if err := agent.deviceMgr.packetOut(agent.rootDeviceId, outPort, packet); err != nil {
@@ -1445,7 +1445,7 @@
 
 func (agent *LogicalDeviceAgent) packetIn(port uint32, transactionId string, packet []byte) {
 	log.Debugw("packet-in", log.Fields{"port": port, "packet": packet, "transactionId": transactionId})
-	packetIn := fd.MkPacketIn(port, packet)
+	packetIn := fu.MkPacketIn(port, packet)
 	agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, transactionId, packetIn)
 	log.Debugw("sending-packet-in", log.Fields{"packet-in": packetIn})
 }
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index 41fdc4a..06d1b8b 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -17,9 +17,6 @@
 package flow_decomposition
 
 import (
-	"bytes"
-	"crypto/md5"
-	"fmt"
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/rw_core/coreIf"
@@ -27,743 +24,12 @@
 	fu "github.com/opencord/voltha-go/rw_core/utils"
 	ofp "github.com/opencord/voltha-protos/go/openflow_13"
 	"github.com/opencord/voltha-protos/go/voltha"
-	"math/big"
 )
 
 func init() {
 	log.AddPackage(log.JSON, log.DebugLevel, nil)
 }
 
-var (
-	// Instructions shortcut
-	APPLY_ACTIONS = ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS
-
-	//OFPAT_* shortcuts
-	OUTPUT       = ofp.OfpActionType_OFPAT_OUTPUT
-	COPY_TTL_OUT = ofp.OfpActionType_OFPAT_COPY_TTL_OUT
-	COPY_TTL_IN  = ofp.OfpActionType_OFPAT_COPY_TTL_IN
-	SET_MPLS_TTL = ofp.OfpActionType_OFPAT_SET_MPLS_TTL
-	DEC_MPLS_TTL = ofp.OfpActionType_OFPAT_DEC_MPLS_TTL
-	PUSH_VLAN    = ofp.OfpActionType_OFPAT_PUSH_VLAN
-	POP_VLAN     = ofp.OfpActionType_OFPAT_POP_VLAN
-	PUSH_MPLS    = ofp.OfpActionType_OFPAT_PUSH_MPLS
-	POP_MPLS     = ofp.OfpActionType_OFPAT_POP_MPLS
-	SET_QUEUE    = ofp.OfpActionType_OFPAT_SET_QUEUE
-	GROUP        = ofp.OfpActionType_OFPAT_GROUP
-	SET_NW_TTL   = ofp.OfpActionType_OFPAT_SET_NW_TTL
-	NW_TTL       = ofp.OfpActionType_OFPAT_DEC_NW_TTL
-	SET_FIELD    = ofp.OfpActionType_OFPAT_SET_FIELD
-	PUSH_PBB     = ofp.OfpActionType_OFPAT_PUSH_PBB
-	POP_PBB      = ofp.OfpActionType_OFPAT_POP_PBB
-	EXPERIMENTER = ofp.OfpActionType_OFPAT_EXPERIMENTER
-
-	//OFPXMT_OFB_* shortcuts (incomplete)
-	IN_PORT         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT
-	IN_PHY_PORT     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PHY_PORT
-	METADATA        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_METADATA
-	ETH_DST         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ETH_DST
-	ETH_SRC         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ETH_SRC
-	ETH_TYPE        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ETH_TYPE
-	VLAN_VID        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID
-	VLAN_PCP        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_PCP
-	IP_DSCP         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IP_DSCP
-	IP_ECN          = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IP_ECN
-	IP_PROTO        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IP_PROTO
-	IPV4_SRC        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV4_SRC
-	IPV4_DST        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV4_DST
-	TCP_SRC         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_TCP_SRC
-	TCP_DST         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_TCP_DST
-	UDP_SRC         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_UDP_SRC
-	UDP_DST         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_UDP_DST
-	SCTP_SRC        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_SCTP_SRC
-	SCTP_DST        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_SCTP_DST
-	ICMPV4_TYPE     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ICMPV4_TYPE
-	ICMPV4_CODE     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ICMPV4_CODE
-	ARP_OP          = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_OP
-	ARP_SPA         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_SPA
-	ARP_TPA         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_TPA
-	ARP_SHA         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_SHA
-	ARP_THA         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_THA
-	IPV6_SRC        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_SRC
-	IPV6_DST        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_DST
-	IPV6_FLABEL     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_FLABEL
-	ICMPV6_TYPE     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ICMPV6_TYPE
-	ICMPV6_CODE     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ICMPV6_CODE
-	IPV6_ND_TARGET  = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_ND_TARGET
-	OFB_IPV6_ND_SLL = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_ND_SLL
-	IPV6_ND_TLL     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_ND_TLL
-	MPLS_LABEL      = ofp.OxmOfbFieldTypes_OFPXMT_OFB_MPLS_LABEL
-	MPLS_TC         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_MPLS_TC
-	MPLS_BOS        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_MPLS_BOS
-	PBB_ISID        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_PBB_ISID
-	TUNNEL_ID       = ofp.OxmOfbFieldTypes_OFPXMT_OFB_TUNNEL_ID
-	IPV6_EXTHDR     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_EXTHDR
-)
-
-//ofp_action_* shortcuts
-
-func Output(port uint32, maxLen ...ofp.OfpControllerMaxLen) *ofp.OfpAction {
-	maxLength := ofp.OfpControllerMaxLen_OFPCML_MAX
-	if len(maxLen) > 0 {
-		maxLength = maxLen[0]
-	}
-	return &ofp.OfpAction{Type: OUTPUT, Action: &ofp.OfpAction_Output{Output: &ofp.OfpActionOutput{Port: port, MaxLen: uint32(maxLength)}}}
-}
-
-func MplsTtl(ttl uint32) *ofp.OfpAction {
-	return &ofp.OfpAction{Type: SET_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: ttl}}}
-}
-
-func PushVlan(ethType uint32) *ofp.OfpAction {
-	return &ofp.OfpAction{Type: PUSH_VLAN, Action: &ofp.OfpAction_Push{Push: &ofp.OfpActionPush{Ethertype: ethType}}}
-}
-
-func PopVlan() *ofp.OfpAction {
-	return &ofp.OfpAction{Type: POP_VLAN}
-}
-
-func PopMpls(ethType uint32) *ofp.OfpAction {
-	return &ofp.OfpAction{Type: POP_MPLS, Action: &ofp.OfpAction_PopMpls{PopMpls: &ofp.OfpActionPopMpls{Ethertype: ethType}}}
-}
-
-func Group(groupId uint32) *ofp.OfpAction {
-	return &ofp.OfpAction{Type: GROUP, Action: &ofp.OfpAction_Group{Group: &ofp.OfpActionGroup{GroupId: groupId}}}
-}
-
-func NwTtl(nwTtl uint32) *ofp.OfpAction {
-	return &ofp.OfpAction{Type: NW_TTL, Action: &ofp.OfpAction_NwTtl{NwTtl: &ofp.OfpActionNwTtl{NwTtl: nwTtl}}}
-}
-
-func SetField(field *ofp.OfpOxmOfbField) *ofp.OfpAction {
-	actionSetField := &ofp.OfpOxmField{OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC, Field: &ofp.OfpOxmField_OfbField{OfbField: field}}
-	return &ofp.OfpAction{Type: SET_FIELD, Action: &ofp.OfpAction_SetField{SetField: &ofp.OfpActionSetField{Field: actionSetField}}}
-}
-
-func Experimenter(experimenter uint32, data []byte) *ofp.OfpAction {
-	return &ofp.OfpAction{Type: EXPERIMENTER, Action: &ofp.OfpAction_Experimenter{Experimenter: &ofp.OfpActionExperimenter{Experimenter: experimenter, Data: data}}}
-}
-
-//ofb_field generators (incomplete set)
-
-func InPort(inPort uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: IN_PORT, Value: &ofp.OfpOxmOfbField_Port{Port: inPort}}
-}
-
-func InPhyPort(inPhyPort uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: IN_PHY_PORT, Value: &ofp.OfpOxmOfbField_Port{Port: inPhyPort}}
-}
-
-func Metadata_ofp(tableMetadata uint64) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: METADATA, Value: &ofp.OfpOxmOfbField_TableMetadata{TableMetadata: tableMetadata}}
-}
-
-// should Metadata_ofp used here ?????
-func EthDst(ethDst uint64) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: ETH_DST, Value: &ofp.OfpOxmOfbField_TableMetadata{TableMetadata: ethDst}}
-}
-
-// should Metadata_ofp used here ?????
-func EthSrc(ethSrc uint64) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: ETH_SRC, Value: &ofp.OfpOxmOfbField_TableMetadata{TableMetadata: ethSrc}}
-}
-
-func EthType(ethType uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: ETH_TYPE, Value: &ofp.OfpOxmOfbField_EthType{EthType: ethType}}
-}
-
-func VlanVid(vlanVid uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: VLAN_VID, Value: &ofp.OfpOxmOfbField_VlanVid{VlanVid: vlanVid}}
-}
-
-func VlanPcp(vlanPcp uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: VLAN_PCP, Value: &ofp.OfpOxmOfbField_VlanPcp{VlanPcp: vlanPcp}}
-}
-
-func IpDscp(ipDscp uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: IP_DSCP, Value: &ofp.OfpOxmOfbField_IpDscp{IpDscp: ipDscp}}
-}
-
-func IpEcn(ipEcn uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: IP_ECN, Value: &ofp.OfpOxmOfbField_IpEcn{IpEcn: ipEcn}}
-}
-
-func IpProto(ipProto uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: IP_PROTO, Value: &ofp.OfpOxmOfbField_IpProto{IpProto: ipProto}}
-}
-
-func Ipv4Src(ipv4Src uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: IPV4_SRC, Value: &ofp.OfpOxmOfbField_Ipv4Src{Ipv4Src: ipv4Src}}
-}
-
-func Ipv4Dst(ipv4Dst uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: IPV4_DST, Value: &ofp.OfpOxmOfbField_Ipv4Dst{Ipv4Dst: ipv4Dst}}
-}
-
-func TcpSrc(tcpSrc uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: TCP_SRC, Value: &ofp.OfpOxmOfbField_TcpSrc{TcpSrc: tcpSrc}}
-}
-
-func TcpDst(tcpDst uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: TCP_DST, Value: &ofp.OfpOxmOfbField_TcpDst{TcpDst: tcpDst}}
-}
-
-func UdpSrc(udpSrc uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: UDP_SRC, Value: &ofp.OfpOxmOfbField_UdpSrc{UdpSrc: udpSrc}}
-}
-
-func UdpDst(udpDst uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: UDP_DST, Value: &ofp.OfpOxmOfbField_UdpDst{UdpDst: udpDst}}
-}
-
-func SctpSrc(sctpSrc uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: SCTP_SRC, Value: &ofp.OfpOxmOfbField_SctpSrc{SctpSrc: sctpSrc}}
-}
-
-func SctpDst(sctpDst uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: SCTP_DST, Value: &ofp.OfpOxmOfbField_SctpDst{SctpDst: sctpDst}}
-}
-
-func Icmpv4Type(icmpv4Type uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: ICMPV4_TYPE, Value: &ofp.OfpOxmOfbField_Icmpv4Type{Icmpv4Type: icmpv4Type}}
-}
-
-func Icmpv4Code(icmpv4Code uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: ICMPV4_CODE, Value: &ofp.OfpOxmOfbField_Icmpv4Code{Icmpv4Code: icmpv4Code}}
-}
-
-func ArpOp(arpOp uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: ARP_OP, Value: &ofp.OfpOxmOfbField_ArpOp{ArpOp: arpOp}}
-}
-
-func ArpSpa(arpSpa uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: ARP_SPA, Value: &ofp.OfpOxmOfbField_ArpSpa{ArpSpa: arpSpa}}
-}
-
-func ArpTpa(arpTpa uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: ARP_TPA, Value: &ofp.OfpOxmOfbField_ArpTpa{ArpTpa: arpTpa}}
-}
-
-func ArpSha(arpSha []byte) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: ARP_SHA, Value: &ofp.OfpOxmOfbField_ArpSha{ArpSha: arpSha}}
-}
-
-func ArpTha(arpTha []byte) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: ARP_THA, Value: &ofp.OfpOxmOfbField_ArpTha{ArpTha: arpTha}}
-}
-
-func Ipv6Src(ipv6Src []byte) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: IPV6_SRC, Value: &ofp.OfpOxmOfbField_Ipv6Src{Ipv6Src: ipv6Src}}
-}
-
-func Ipv6Dst(ipv6Dst []byte) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: IPV6_DST, Value: &ofp.OfpOxmOfbField_Ipv6Dst{Ipv6Dst: ipv6Dst}}
-}
-
-func Ipv6Flabel(ipv6Flabel uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: IPV6_FLABEL, Value: &ofp.OfpOxmOfbField_Ipv6Flabel{Ipv6Flabel: ipv6Flabel}}
-}
-
-func Icmpv6Type(icmpv6Type uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: ICMPV6_TYPE, Value: &ofp.OfpOxmOfbField_Icmpv6Type{Icmpv6Type: icmpv6Type}}
-}
-
-func Icmpv6Code(icmpv6Code uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: ICMPV6_CODE, Value: &ofp.OfpOxmOfbField_Icmpv6Code{Icmpv6Code: icmpv6Code}}
-}
-
-func Ipv6NdTarget(ipv6NdTarget []byte) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: IPV6_ND_TARGET, Value: &ofp.OfpOxmOfbField_Ipv6NdTarget{Ipv6NdTarget: ipv6NdTarget}}
-}
-
-func OfbIpv6NdSll(ofbIpv6NdSll []byte) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: OFB_IPV6_ND_SLL, Value: &ofp.OfpOxmOfbField_Ipv6NdSsl{Ipv6NdSsl: ofbIpv6NdSll}}
-}
-
-func Ipv6NdTll(ipv6NdTll []byte) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: IPV6_ND_TLL, Value: &ofp.OfpOxmOfbField_Ipv6NdTll{Ipv6NdTll: ipv6NdTll}}
-}
-
-func MplsLabel(mplsLabel uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: MPLS_LABEL, Value: &ofp.OfpOxmOfbField_MplsLabel{MplsLabel: mplsLabel}}
-}
-
-func MplsTc(mplsTc uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: MPLS_TC, Value: &ofp.OfpOxmOfbField_MplsTc{MplsTc: mplsTc}}
-}
-
-func MplsBos(mplsBos uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: MPLS_BOS, Value: &ofp.OfpOxmOfbField_MplsBos{MplsBos: mplsBos}}
-}
-
-func PbbIsid(pbbIsid uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: PBB_ISID, Value: &ofp.OfpOxmOfbField_PbbIsid{PbbIsid: pbbIsid}}
-}
-
-func TunnelId(tunnelId uint64) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: TUNNEL_ID, Value: &ofp.OfpOxmOfbField_TunnelId{TunnelId: tunnelId}}
-}
-
-func Ipv6Exthdr(ipv6Exthdr uint32) *ofp.OfpOxmOfbField {
-	return &ofp.OfpOxmOfbField{Type: IPV6_EXTHDR, Value: &ofp.OfpOxmOfbField_Ipv6Exthdr{Ipv6Exthdr: ipv6Exthdr}}
-}
-
-//frequently used extractors
-
-func excludeAction(action *ofp.OfpAction, exclude ...ofp.OfpActionType) bool {
-	for _, actionToExclude := range exclude {
-		if action.Type == actionToExclude {
-			return true
-		}
-	}
-	return false
-}
-
-func GetActions(flow *ofp.OfpFlowStats, exclude ...ofp.OfpActionType) []*ofp.OfpAction {
-	if flow == nil {
-		return nil
-	}
-	for _, instruction := range flow.Instructions {
-		if instruction.Type == uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS) {
-			instActions := instruction.GetActions()
-			if instActions == nil {
-				return nil
-			}
-			if len(exclude) == 0 {
-				return instActions.Actions
-			} else {
-				filteredAction := make([]*ofp.OfpAction, 0)
-				for _, action := range instActions.Actions {
-					if !excludeAction(action, exclude...) {
-						filteredAction = append(filteredAction, action)
-					}
-				}
-				return filteredAction
-			}
-		}
-	}
-	return nil
-}
-
-func UpdateOutputPortByActionType(flow *ofp.OfpFlowStats, actionType uint32, toPort uint32) *ofp.OfpFlowStats {
-	if flow == nil {
-		return nil
-	}
-	nFlow := (proto.Clone(flow)).(*ofp.OfpFlowStats)
-	nFlow.Instructions = nil
-	nInsts := make([]*ofp.OfpInstruction, 0)
-	for _, instruction := range flow.Instructions {
-		if instruction.Type == actionType {
-			instActions := instruction.GetActions()
-			if instActions == nil {
-				return nil
-			}
-			nActions := make([]*ofp.OfpAction, 0)
-			for _, action := range instActions.Actions {
-				if action.GetOutput() != nil {
-					nActions = append(nActions, Output(toPort))
-				} else {
-					nActions = append(nActions, action)
-				}
-			}
-			instructionAction := ofp.OfpInstruction_Actions{Actions: &ofp.OfpInstructionActions{Actions: nActions}}
-			nInsts = append(nInsts, &ofp.OfpInstruction{Type: uint32(APPLY_ACTIONS), Data: &instructionAction})
-		} else {
-			nInsts = append(nInsts, instruction)
-		}
-	}
-	nFlow.Instructions = nInsts
-	return nFlow
-}
-
-func excludeOxmOfbField(field *ofp.OfpOxmOfbField, exclude ...ofp.OxmOfbFieldTypes) bool {
-	for _, fieldToExclude := range exclude {
-		if field.Type == fieldToExclude {
-			return true
-		}
-	}
-	return false
-}
-
-func GetOfbFields(flow *ofp.OfpFlowStats, exclude ...ofp.OxmOfbFieldTypes) []*ofp.OfpOxmOfbField {
-	if flow == nil || flow.Match == nil || flow.Match.Type != ofp.OfpMatchType_OFPMT_OXM {
-		return nil
-	}
-	ofbFields := make([]*ofp.OfpOxmOfbField, 0)
-	for _, field := range flow.Match.OxmFields {
-		if field.OxmClass == ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
-			ofbFields = append(ofbFields, field.GetOfbField())
-		}
-	}
-	if len(exclude) == 0 {
-		return ofbFields
-	} else {
-		filteredFields := make([]*ofp.OfpOxmOfbField, 0)
-		for _, ofbField := range ofbFields {
-			if !excludeOxmOfbField(ofbField, exclude...) {
-				filteredFields = append(filteredFields, ofbField)
-			}
-		}
-		return filteredFields
-	}
-}
-
-func GetPacketOutPort(packet *ofp.OfpPacketOut) uint32 {
-	if packet == nil {
-		return 0
-	}
-	for _, action := range packet.GetActions() {
-		if action.Type == OUTPUT {
-			return action.GetOutput().Port
-		}
-	}
-	return 0
-}
-
-func GetOutPort(flow *ofp.OfpFlowStats) uint32 {
-	if flow == nil {
-		return 0
-	}
-	for _, action := range GetActions(flow) {
-		if action.Type == OUTPUT {
-			out := action.GetOutput()
-			if out == nil {
-				return 0
-			}
-			return out.GetPort()
-		}
-	}
-	return 0
-}
-
-func GetInPort(flow *ofp.OfpFlowStats) uint32 {
-	if flow == nil {
-		return 0
-	}
-	for _, field := range GetOfbFields(flow) {
-		if field.Type == IN_PORT {
-			return field.GetPort()
-		}
-	}
-	return 0
-}
-
-func GetGotoTableId(flow *ofp.OfpFlowStats) uint32 {
-	if flow == nil {
-		return 0
-	}
-	for _, instruction := range flow.Instructions {
-		if instruction.Type == uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE) {
-			gotoTable := instruction.GetGotoTable()
-			if gotoTable == nil {
-				return 0
-			}
-			return gotoTable.GetTableId()
-		}
-	}
-	return 0
-}
-
-func GetTunnelId(flow *ofp.OfpFlowStats) uint64 {
-	if flow == nil {
-		return 0
-	}
-	for _, field := range GetOfbFields(flow) {
-		if field.Type == TUNNEL_ID {
-			return field.GetTunnelId()
-		}
-	}
-	return 0
-}
-
-//GetMetaData - legacy get method (only want lower 32 bits)
-func GetMetaData(flow *ofp.OfpFlowStats) uint32 {
-	if flow == nil {
-		return 0
-	}
-	for _, field := range GetOfbFields(flow) {
-		if field.Type == METADATA {
-			return uint32(field.GetTableMetadata() & 0xffffffff)
-		}
-	}
-	return 0
-}
-
-func GetMetaData64Bit(flow *ofp.OfpFlowStats) uint64 {
-	if flow == nil {
-		return 0
-	}
-	for _, field := range GetOfbFields(flow) {
-		if field.Type == METADATA {
-			return field.GetTableMetadata()
-		}
-	}
-	return 0
-}
-
-// GetPortNumberFromMetadata retrieves the port number from the Metadata_ofp. The port number (UNI on ONU) is in the
-// lower 32-bits of Metadata_ofp and the inner_tag is in the upper 32-bits. This is set in the ONOS OltPipeline as
-// a Metadata_ofp field
-func GetPortNumberFromMetadata(flow *ofp.OfpFlowStats) uint64 {
-	md := GetMetaData64Bit(flow)
-	if md == 0 {
-		return 0
-	}
-	if md <= 0xffffffff {
-		log.Debugw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
-		return md
-	}
-	return md & 0xffffffff
-}
-
-//GetInnerTagFromMetaData retrieves the inner tag from the Metadata_ofp. The port number (UNI on ONU) is in the
-// lower 32-bits of Metadata_ofp and the inner_tag is in the upper 32-bits. This is set in the ONOS OltPipeline as
-//// a Metadata_ofp field
-func GetInnerTagFromMetaData(flow *ofp.OfpFlowStats) uint64 {
-	md := GetMetaData64Bit(flow)
-	if md == 0 {
-		return 0
-	}
-	if md <= 0xffffffff {
-		log.Debugw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
-		return md
-	}
-	return (md >> 32) & 0xffffffff
-}
-
-// Extract the child device port from a flow that contains the parent device peer port.  Typically the UNI port of an
-// ONU child device.  Per TST agreement this will be the lower 32 bits of tunnel id reserving upper 32 bits for later
-// use
-func GetChildPortFromTunnelId(flow *ofp.OfpFlowStats) uint32 {
-	tid := GetTunnelId(flow)
-	if tid == 0 {
-		return 0
-	}
-	// Per TST agreement we are keeping any child port id (uni port id) in the lower 32 bits
-	return uint32(tid & 0xffffffff)
-}
-
-func HasNextTable(flow *ofp.OfpFlowStats) bool {
-	if flow == nil {
-		return false
-	}
-	return GetGotoTableId(flow) != 0
-}
-
-func GetGroup(flow *ofp.OfpFlowStats) uint32 {
-	if flow == nil {
-		return 0
-	}
-	for _, action := range GetActions(flow) {
-		if action.Type == GROUP {
-			grp := action.GetGroup()
-			if grp == nil {
-				return 0
-			}
-			return grp.GetGroupId()
-		}
-	}
-	return 0
-}
-
-func HasGroup(flow *ofp.OfpFlowStats) bool {
-	return GetGroup(flow) != 0
-}
-
-// GetNextTableId returns the next table ID if the "table_id" is present in the map, otherwise return nil
-func GetNextTableId(kw fu.OfpFlowModArgs) *uint32 {
-	if val, exist := kw["table_id"]; exist {
-		ret := uint32(val)
-		return &ret
-	}
-	return nil
-}
-
-// Return unique 64-bit integer hash for flow covering the following attributes:
-// 'table_id', 'priority', 'flags', 'cookie', 'match', '_instruction_string'
-func hashFlowStats(flow *ofp.OfpFlowStats) uint64 {
-	if flow == nil { // Should never happen
-		return 0
-	}
-	// Create string with the instructions field first
-	var instructionString bytes.Buffer
-	for _, instruction := range flow.Instructions {
-		instructionString.WriteString(instruction.String())
-	}
-	var flowString = fmt.Sprintf("%d%d%d%d%s%s", flow.TableId, flow.Priority, flow.Flags, flow.Cookie, flow.Match.String(), instructionString.String())
-	h := md5.New()
-	h.Write([]byte(flowString))
-	hash := big.NewInt(0)
-	hash.SetBytes(h.Sum(nil))
-	return hash.Uint64()
-}
-
-// flowStatsEntryFromFlowModMessage maps an ofp_flow_mod message to an ofp_flow_stats message
-func FlowStatsEntryFromFlowModMessage(mod *ofp.OfpFlowMod) *ofp.OfpFlowStats {
-	flow := &ofp.OfpFlowStats{}
-	if mod == nil {
-		return flow
-	}
-	flow.TableId = mod.TableId
-	flow.Priority = mod.Priority
-	flow.IdleTimeout = mod.IdleTimeout
-	flow.HardTimeout = mod.HardTimeout
-	flow.Flags = mod.Flags
-	flow.Cookie = mod.Cookie
-	flow.Match = mod.Match
-	flow.Instructions = mod.Instructions
-	flow.Id = hashFlowStats(flow)
-	return flow
-}
-
-func GroupEntryFromGroupMod(mod *ofp.OfpGroupMod) *ofp.OfpGroupEntry {
-	group := &ofp.OfpGroupEntry{}
-	if mod == nil {
-		return group
-	}
-	group.Desc = &ofp.OfpGroupDesc{Type: mod.Type, GroupId: mod.GroupId, Buckets: mod.Buckets}
-	group.Stats = &ofp.OfpGroupStats{GroupId: mod.GroupId}
-	//TODO do we need to instantiate bucket bins?
-	return group
-}
-
-func MkOxmFields(matchFields []ofp.OfpOxmField) []*ofp.OfpOxmField {
-	oxmFields := make([]*ofp.OfpOxmField, 0)
-	for _, matchField := range matchFields {
-		oxmField := ofp.OfpOxmField{OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC, Field: matchField.Field}
-		oxmFields = append(oxmFields, &oxmField)
-	}
-	return oxmFields
-}
-
-func MkInstructionsFromActions(actions []*ofp.OfpAction) []*ofp.OfpInstruction {
-	instructions := make([]*ofp.OfpInstruction, 0)
-	instructionAction := ofp.OfpInstruction_Actions{Actions: &ofp.OfpInstructionActions{Actions: actions}}
-	instruction := ofp.OfpInstruction{Type: uint32(APPLY_ACTIONS), Data: &instructionAction}
-	instructions = append(instructions, &instruction)
-	return instructions
-}
-
-// Convenience function to generare ofp_flow_mod message with OXM BASIC match composed from the match_fields, and
-// single APPLY_ACTIONS instruction with a list if ofp_action objects.
-func MkSimpleFlowMod(matchFields []*ofp.OfpOxmField, actions []*ofp.OfpAction, command *ofp.OfpFlowModCommand, kw fu.OfpFlowModArgs) *ofp.OfpFlowMod {
-
-	// Process actions instructions
-	instructions := make([]*ofp.OfpInstruction, 0)
-	instructionAction := ofp.OfpInstruction_Actions{Actions: &ofp.OfpInstructionActions{Actions: actions}}
-	instruction := ofp.OfpInstruction{Type: uint32(APPLY_ACTIONS), Data: &instructionAction}
-	instructions = append(instructions, &instruction)
-
-	// Process next table
-	if tableId := GetNextTableId(kw); tableId != nil {
-		var instGotoTable ofp.OfpInstruction_GotoTable
-		instGotoTable.GotoTable = &ofp.OfpInstructionGotoTable{TableId: *tableId}
-		inst := ofp.OfpInstruction{Type: uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE), Data: &instGotoTable}
-		instructions = append(instructions, &inst)
-	}
-
-	// Process match fields
-	oxmFields := make([]*ofp.OfpOxmField, 0)
-	for _, matchField := range matchFields {
-		oxmField := ofp.OfpOxmField{OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC, Field: matchField.Field}
-		oxmFields = append(oxmFields, &oxmField)
-	}
-	var match ofp.OfpMatch
-	match.Type = ofp.OfpMatchType_OFPMT_OXM
-	match.OxmFields = oxmFields
-
-	// Create ofp_flow_message
-	msg := &ofp.OfpFlowMod{}
-	if command == nil {
-		msg.Command = ofp.OfpFlowModCommand_OFPFC_ADD
-	} else {
-		msg.Command = *command
-	}
-	msg.Instructions = instructions
-	msg.Match = &match
-
-	// Set the variadic argument values
-	msg = setVariadicModAttributes(msg, kw)
-
-	return msg
-}
-
-func MkMulticastGroupMod(groupId uint32, buckets []*ofp.OfpBucket, command *ofp.OfpGroupModCommand) *ofp.OfpGroupMod {
-	group := &ofp.OfpGroupMod{}
-	if command == nil {
-		group.Command = ofp.OfpGroupModCommand_OFPGC_ADD
-	} else {
-		group.Command = *command
-	}
-	group.Type = ofp.OfpGroupType_OFPGT_ALL
-	group.GroupId = groupId
-	group.Buckets = buckets
-	return group
-}
-
-//SetVariadicModAttributes sets only uint64 or uint32 fields of the ofp_flow_mod message
-func setVariadicModAttributes(mod *ofp.OfpFlowMod, args fu.OfpFlowModArgs) *ofp.OfpFlowMod {
-	if args == nil {
-		return mod
-	}
-	for key, val := range args {
-		switch key {
-		case "cookie":
-			mod.Cookie = val
-		case "cookie_mask":
-			mod.CookieMask = val
-		case "table_id":
-			mod.TableId = uint32(val)
-		case "idle_timeout":
-			mod.IdleTimeout = uint32(val)
-		case "hard_timeout":
-			mod.HardTimeout = uint32(val)
-		case "priority":
-			mod.Priority = uint32(val)
-		case "buffer_id":
-			mod.BufferId = uint32(val)
-		case "out_port":
-			mod.OutPort = uint32(val)
-		case "out_group":
-			mod.OutGroup = uint32(val)
-		case "flags":
-			mod.Flags = uint32(val)
-		}
-	}
-	return mod
-}
-
-func MkPacketIn(port uint32, packet []byte) *ofp.OfpPacketIn {
-	packetIn := &ofp.OfpPacketIn{
-		Reason: ofp.OfpPacketInReason_OFPR_ACTION,
-		Match: &ofp.OfpMatch{
-			Type: ofp.OfpMatchType_OFPMT_OXM,
-			OxmFields: []*ofp.OfpOxmField{
-				{
-					OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC,
-					Field: &ofp.OfpOxmField_OfbField{
-						OfbField: InPort(port)},
-				},
-			},
-		},
-		Data: packet,
-	}
-	return packetIn
-}
-
-// MkFlowStat is a helper method to build flows
-func MkFlowStat(fa *fu.FlowArgs) *ofp.OfpFlowStats {
-	//Build the matchfields
-	matchFields := make([]*ofp.OfpOxmField, 0)
-	for _, val := range fa.MatchFields {
-		matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
-	}
-	return FlowStatsEntryFromFlowModMessage(MkSimpleFlowMod(matchFields, fa.Actions, fa.Command, fa.KV))
-}
-
-func MkGroupStat(ga *fu.GroupArgs) *ofp.OfpGroupEntry {
-	return GroupEntryFromGroupMod(MkMulticastGroupMod(ga.GroupId, ga.Buckets, ga.Command))
-}
-
 type FlowDecomposer struct {
 	deviceMgr coreIf.DeviceManager
 }
@@ -805,9 +71,9 @@
 // Handles special case of any controller-bound flow for a parent device
 func (fd *FlowDecomposer) updateOutputPortForControllerBoundFlowForParentDevide(flow *ofp.OfpFlowStats,
 	dr *fu.DeviceRules) *fu.DeviceRules {
-	EAPOL := EthType(0x888e)
-	IGMP := IpProto(2)
-	UDP := IpProto(17)
+	EAPOL := fu.EthType(0x888e)
+	IGMP := fu.IpProto(2)
+	UDP := fu.IpProto(17)
 
 	newDeviceRules := dr.Copy()
 	//	Check whether we are dealing with a parent device
@@ -817,7 +83,7 @@
 			for i := 0; i < fg.Flows.Len(); i++ {
 				f := fg.GetFlow(i)
 				UpdateOutPortNo := false
-				for _, field := range GetOfbFields(f) {
+				for _, field := range fu.GetOfbFields(f) {
 					UpdateOutPortNo = (field.String() == EAPOL.String())
 					UpdateOutPortNo = UpdateOutPortNo || (field.String() == IGMP.String())
 					UpdateOutPortNo = UpdateOutPortNo || (field.String() == UDP.String())
@@ -826,11 +92,11 @@
 					}
 				}
 				if UpdateOutPortNo {
-					f = UpdateOutputPortByActionType(f, uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS),
+					f = fu.UpdateOutputPortByActionType(f, uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS),
 						uint32(ofp.OfpPortNo_OFPP_CONTROLLER))
 				}
 				// Update flow Id as a change in the instruction field will result in a new flow ID
-				f.Id = hashFlowStats(f)
+				f.Id = fu.HashFlowStats(f)
 				newDeviceRules.AddFlow(deviceId, (proto.Clone(f)).(*ofp.OfpFlowStats))
 			}
 		}
@@ -870,36 +136,36 @@
 			fa = &fu.FlowArgs{
 				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
 				MatchFields: []*ofp.OfpOxmOfbField{
-					InPort(egressHop.Ingress),
-					VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | inputPort),
-					TunnelId(uint64(inputPort)),
+					fu.InPort(egressHop.Ingress),
+					fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | inputPort),
+					fu.TunnelId(uint64(inputPort)),
 				},
 				Actions: []*ofp.OfpAction{
-					PushVlan(0x8100),
-					SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000)),
-					Output(egressHop.Egress),
+					fu.PushVlan(0x8100),
+					fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000)),
+					fu.Output(egressHop.Egress),
 				},
 			}
 			// Augment the matchfields with the ofpfields from the flow
-			fa.MatchFields = append(fa.MatchFields, GetOfbFields(flow, IN_PORT, VLAN_VID)...)
-			fg.AddFlow(MkFlowStat(fa))
+			fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT, fu.VLAN_VID)...)
+			fg.AddFlow(fu.MkFlowStat(fa))
 
 			// Downstream flow
 			fa = &fu.FlowArgs{
 				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority)},
 				MatchFields: []*ofp.OfpOxmOfbField{
-					InPort(egressHop.Egress),
-					VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
-					VlanPcp(0),
-					Metadata_ofp(uint64(inputPort)),
-					TunnelId(uint64(inputPort)),
+					fu.InPort(egressHop.Egress),
+					fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
+					fu.VlanPcp(0),
+					fu.Metadata_ofp(uint64(inputPort)),
+					fu.TunnelId(uint64(inputPort)),
 				},
 				Actions: []*ofp.OfpAction{
-					PopVlan(),
-					Output(egressHop.Ingress),
+					fu.PopVlan(),
+					fu.Output(egressHop.Ingress),
 				},
 			}
-			fg.AddFlow(MkFlowStat(fa))
+			fg.AddFlow(fu.MkFlowStat(fa))
 		}
 	}
 	deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
@@ -920,7 +186,7 @@
 	ingressHop := route[0]
 	egressHop := route[1]
 
-	if HasNextTable(flow) {
+	if fu.HasNextTable(flow) {
 		log.Debugw("has-next-table", log.Fields{"table_id": flow.TableId})
 		if outPortNo != 0 {
 			log.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
@@ -929,26 +195,26 @@
 		fa = &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
 			MatchFields: []*ofp.OfpOxmOfbField{
-				InPort(ingressHop.Ingress),
-				TunnelId(uint64(inPortNo)),
+				fu.InPort(ingressHop.Ingress),
+				fu.TunnelId(uint64(inPortNo)),
 			},
-			Actions: GetActions(flow),
+			Actions: fu.GetActions(flow),
 		}
 		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, GetOfbFields(flow, IN_PORT)...)
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
 
 		// Augment the Actions
-		fa.Actions = append(fa.Actions, Output(ingressHop.Egress))
+		fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
 
 		fg := fu.NewFlowsAndGroups()
-		fg.AddFlow(MkFlowStat(fa))
+		fg.AddFlow(fu.MkFlowStat(fa))
 		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
 	} else {
 		var actions []ofp.OfpActionType
 		var isOutputTypeInActions bool
-		for _, action := range GetActions(flow) {
+		for _, action := range fu.GetActions(flow) {
 			actions = append(actions, action.Type)
-			if !isOutputTypeInActions && action.Type == OUTPUT {
+			if !isOutputTypeInActions && action.Type == fu.OUTPUT {
 				isOutputTypeInActions = true
 			}
 		}
@@ -958,33 +224,33 @@
 			fa = &fu.FlowArgs{
 				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
 				MatchFields: []*ofp.OfpOxmOfbField{
-					InPort(ingressHop.Ingress),
+					fu.InPort(ingressHop.Ingress),
 				},
 				Actions: []*ofp.OfpAction{
-					Output(ingressHop.Egress),
+					fu.Output(ingressHop.Egress),
 				},
 			}
 			// Augment the matchfields with the ofpfields from the flow
-			fa.MatchFields = append(fa.MatchFields, GetOfbFields(flow, IN_PORT)...)
+			fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
 			fg := fu.NewFlowsAndGroups()
-			fg.AddFlow(MkFlowStat(fa))
+			fg.AddFlow(fu.MkFlowStat(fa))
 			deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
 
 			// parent device flow
 			fa = &fu.FlowArgs{
 				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
 				MatchFields: []*ofp.OfpOxmOfbField{
-					InPort(egressHop.Ingress), //egress_hop.ingress_port.port_no
-					TunnelId(uint64(inPortNo)),
+					fu.InPort(egressHop.Ingress), //egress_hop.ingress_port.port_no
+					fu.TunnelId(uint64(inPortNo)),
 				},
 				Actions: []*ofp.OfpAction{
-					Output(egressHop.Egress),
+					fu.Output(egressHop.Egress),
 				},
 			}
 			// Augment the matchfields with the ofpfields from the flow
-			fa.MatchFields = append(fa.MatchFields, GetOfbFields(flow, IN_PORT)...)
+			fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
 			fg = fu.NewFlowsAndGroups()
-			fg.AddFlow(MkFlowStat(fa))
+			fg.AddFlow(fu.MkFlowStat(fa))
 			deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
 		} else {
 			if outPortNo == 0 {
@@ -994,20 +260,20 @@
 			fa = &fu.FlowArgs{
 				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
 				MatchFields: []*ofp.OfpOxmOfbField{
-					InPort(egressHop.Ingress),
-					TunnelId(uint64(inPortNo)),
+					fu.InPort(egressHop.Ingress),
+					fu.TunnelId(uint64(inPortNo)),
 				},
 			}
 			// Augment the matchfields with the ofpfields from the flow
-			fa.MatchFields = append(fa.MatchFields, GetOfbFields(flow, IN_PORT)...)
+			fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
 
 			//Augment the actions
-			filteredAction := GetActions(flow, OUTPUT)
-			filteredAction = append(filteredAction, Output(egressHop.Egress))
+			filteredAction := fu.GetActions(flow, fu.OUTPUT)
+			filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
 			fa.Actions = filteredAction
 
 			fg := fu.NewFlowsAndGroups()
-			fg.AddFlow(MkFlowStat(fa))
+			fg.AddFlow(fu.MkFlowStat(fa))
 			deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
 		}
 	}
@@ -1027,14 +293,14 @@
 	ingressHop := route[0]
 	egressHop := route[1]
 
-	if GetMetaData(flow) != 0 {
+	if fu.GetMetaData(flow) != 0 {
 		log.Debugw("creating-metadata-flow", log.Fields{"flow": flow})
-		portNumber := uint32(GetPortNumberFromMetadata(flow))
+		portNumber := uint32(fu.GetPortNumberFromMetadata(flow))
 		if portNumber != 0 {
 			recalculatedRoute := agent.GetRoute(inPortNo, portNumber)
 			switch len(recalculatedRoute) {
 			case 0:
-				log.Errorw("no-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": GetMetaData64Bit(flow)})
+				log.Errorw("no-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": fu.GetMetaData64Bit(flow)})
 				//	TODO: Delete flow
 				return deviceRules
 			case 2:
@@ -1046,9 +312,9 @@
 			}
 			ingressHop = recalculatedRoute[0]
 		}
-		innerTag := GetInnerTagFromMetaData(flow)
+		innerTag := fu.GetInnerTagFromMetaData(flow)
 		if innerTag == 0 {
-			log.Errorw("no-inner-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": GetMetaData64Bit(flow)})
+			log.Errorw("no-inner-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": fu.GetMetaData64Bit(flow)})
 			//	TODO: Delete flow
 			return deviceRules
 		}
@@ -1056,20 +322,20 @@
 		fa = &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
 			MatchFields: []*ofp.OfpOxmOfbField{
-				InPort(ingressHop.Ingress),
-				Metadata_ofp(innerTag),
-				TunnelId(uint64(portNumber)),
+				fu.InPort(ingressHop.Ingress),
+				fu.Metadata_ofp(innerTag),
+				fu.TunnelId(uint64(portNumber)),
 			},
-			Actions: GetActions(flow),
+			Actions: fu.GetActions(flow),
 		}
 		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, GetOfbFields(flow, IN_PORT, METADATA)...)
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT, fu.METADATA)...)
 
 		// Augment the Actions
-		fa.Actions = append(fa.Actions, Output(ingressHop.Egress))
+		fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
 
 		fg := fu.NewFlowsAndGroups()
-		fg.AddFlow(MkFlowStat(fa))
+		fg.AddFlow(fu.MkFlowStat(fa))
 		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
 	} else { // Create standard flow
 		log.Debugw("creating-standard-flow", log.Fields{"flow": flow})
@@ -1077,19 +343,19 @@
 		fa = &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
 			MatchFields: []*ofp.OfpOxmOfbField{
-				InPort(ingressHop.Ingress),
-				TunnelId(uint64(inPortNo)),
+				fu.InPort(ingressHop.Ingress),
+				fu.TunnelId(uint64(inPortNo)),
 			},
-			Actions: GetActions(flow),
+			Actions: fu.GetActions(flow),
 		}
 		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, GetOfbFields(flow, IN_PORT)...)
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
 
 		// Augment the Actions
-		fa.Actions = append(fa.Actions, Output(ingressHop.Egress))
+		fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
 
 		fg := fu.NewFlowsAndGroups()
-		fg.AddFlow(MkFlowStat(fa))
+		fg.AddFlow(fu.MkFlowStat(fa))
 		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
 	}
 
@@ -1108,9 +374,9 @@
 
 	var actions []ofp.OfpActionType
 	var isOutputTypeInActions bool
-	for _, action := range GetActions(flow) {
+	for _, action := range fu.GetActions(flow) {
 		actions = append(actions, action.Type)
-		if !isOutputTypeInActions && action.Type == OUTPUT {
+		if !isOutputTypeInActions && action.Type == fu.OUTPUT {
 			isOutputTypeInActions = true
 		}
 	}
@@ -1120,54 +386,54 @@
 		fa = &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
 			MatchFields: []*ofp.OfpOxmOfbField{
-				InPort(ingressHop.Ingress),
-				TunnelId(uint64(inPortNo)),
+				fu.InPort(ingressHop.Ingress),
+				fu.TunnelId(uint64(inPortNo)),
 			},
 			Actions: []*ofp.OfpAction{
-				Output(ingressHop.Egress),
+				fu.Output(ingressHop.Egress),
 			},
 		}
 		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, GetOfbFields(flow, IN_PORT)...)
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
 
 		fg := fu.NewFlowsAndGroups()
-		fg.AddFlow(MkFlowStat(fa))
+		fg.AddFlow(fu.MkFlowStat(fa))
 		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
 
 		// Child device flow
 		fa = &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
 			MatchFields: []*ofp.OfpOxmOfbField{
-				InPort(egressHop.Ingress),
+				fu.InPort(egressHop.Ingress),
 			},
 			Actions: []*ofp.OfpAction{
-				Output(egressHop.Egress),
+				fu.Output(egressHop.Egress),
 			},
 		}
 		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, GetOfbFields(flow, IN_PORT)...)
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
 
 		fg = fu.NewFlowsAndGroups()
-		fg.AddFlow(MkFlowStat(fa))
+		fg.AddFlow(fu.MkFlowStat(fa))
 		deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
 	} else {
 		var fa *fu.FlowArgs
 		fa = &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
 			MatchFields: []*ofp.OfpOxmOfbField{
-				InPort(egressHop.Ingress),
+				fu.InPort(egressHop.Ingress),
 			},
 		}
 		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, GetOfbFields(flow, IN_PORT)...)
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
 
 		// Augment the Actions
-		filteredAction := GetActions(flow, OUTPUT)
-		filteredAction = append(filteredAction, Output(egressHop.Egress))
+		filteredAction := fu.GetActions(flow, fu.OUTPUT)
+		filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
 		fa.Actions = filteredAction
 
 		fg := fu.NewFlowsAndGroups()
-		fg.AddFlow(MkFlowStat(fa))
+		fg.AddFlow(fu.MkFlowStat(fa))
 		deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
 	}
 	return deviceRules
@@ -1195,9 +461,9 @@
 	for _, bucket := range grp.Desc.Buckets {
 		otherActions := make([]*ofp.OfpAction, 0)
 		for _, action := range bucket.Actions {
-			if action.Type == OUTPUT {
+			if action.Type == fu.OUTPUT {
 				outPortNo = action.GetOutput().Port
-			} else if action.Type != POP_VLAN {
+			} else if action.Type != fu.POP_VLAN {
 				otherActions = append(otherActions, action)
 			}
 		}
@@ -1229,38 +495,38 @@
 		fa = &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
 			MatchFields: []*ofp.OfpOxmOfbField{
-				InPort(ingressHop.Ingress),
+				fu.InPort(ingressHop.Ingress),
 			},
 		}
 		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, GetOfbFields(flow, IN_PORT)...)
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
 
 		// Augment the Actions
-		filteredAction := GetActions(flow, GROUP)
-		filteredAction = append(filteredAction, PopVlan())
-		filteredAction = append(filteredAction, Output(route2[1].Ingress))
+		filteredAction := fu.GetActions(flow, fu.GROUP)
+		filteredAction = append(filteredAction, fu.PopVlan())
+		filteredAction = append(filteredAction, fu.Output(route2[1].Ingress))
 		fa.Actions = filteredAction
 
 		fg := fu.NewFlowsAndGroups()
-		fg.AddFlow(MkFlowStat(fa))
+		fg.AddFlow(fu.MkFlowStat(fa))
 		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
 
 		// Set the child device flow
 		fa = &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
 			MatchFields: []*ofp.OfpOxmOfbField{
-				InPort(egressHop.Ingress),
+				fu.InPort(egressHop.Ingress),
 			},
 		}
 		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, GetOfbFields(flow, IN_PORT, VLAN_VID, VLAN_PCP)...)
+		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT, fu.VLAN_VID, fu.VLAN_PCP)...)
 
 		// Augment the Actions
-		otherActions = append(otherActions, Output(egressHop.Egress))
+		otherActions = append(otherActions, fu.Output(egressHop.Egress))
 		fa.Actions = otherActions
 
 		fg = fu.NewFlowsAndGroups()
-		fg.AddFlow(MkFlowStat(fa))
+		fg.AddFlow(fu.MkFlowStat(fa))
 		deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
 	}
 	return deviceRules
@@ -1270,8 +536,8 @@
 func (fd *FlowDecomposer) decomposeFlow(agent coreIf.LogicalDeviceAgent, flow *ofp.OfpFlowStats,
 	groupMap map[uint32]*ofp.OfpGroupEntry) *fu.DeviceRules {
 
-	inPortNo := GetInPort(flow)
-	outPortNo := GetOutPort(flow)
+	inPortNo := fu.GetInPort(flow)
+	outPortNo := fu.GetOutPort(flow)
 	deviceRules := fu.NewDeviceRules()
 	route := agent.GetRoute(inPortNo, outPortNo)
 
@@ -1301,11 +567,11 @@
 		isUpstream := !ingressDevice.Root
 		if isUpstream {
 			deviceRules = fd.processUpstreamNonControllerBoundFlow(agent, route, inPortNo, outPortNo, flow)
-		} else if HasNextTable(flow) {
+		} else if fu.HasNextTable(flow) {
 			deviceRules = fd.processDownstreamFlowWithNextTable(agent, route, inPortNo, outPortNo, flow)
 		} else if outPortNo != 0 { // Unicast
 			deviceRules = fd.processUnicastFlow(agent, route, inPortNo, outPortNo, flow)
-		} else if grpId := GetGroup(flow); grpId != 0 { //Multicast
+		} else if grpId := fu.GetGroup(flow); grpId != 0 { //Multicast
 			deviceRules = fd.processMulticastFlow(agent, route, inPortNo, outPortNo, flow, grpId, groupMap)
 		}
 	}
diff --git a/rw_core/flow_decomposition/flow_decomposer_test.go b/rw_core/flow_decomposition/flow_decomposer_test.go
index 5d914ac..6464c5d 100644
--- a/rw_core/flow_decomposition/flow_decomposer_test.go
+++ b/rw_core/flow_decomposition/flow_decomposer_test.go
@@ -343,57 +343,57 @@
 	var fa *fu.FlowArgs
 	fa = &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(2),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.InPort(2),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
 		},
 		Actions: []*ofp.OfpAction{
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
-			Output(1),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
+			fu.Output(1),
 		},
 	}
-	fg.AddFlow(MkFlowStat(fa))
+	fg.AddFlow(fu.MkFlowStat(fa))
 	tfd.defaultRules.AddFlowsAndGroup("onu1", fg)
 
 	fg = fu.NewFlowsAndGroups()
 	fa = &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(2),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.InPort(2),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
 		},
 		Actions: []*ofp.OfpAction{
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 102)),
-			Output(1),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 102)),
+			fu.Output(1),
 		},
 	}
-	fg.AddFlow(MkFlowStat(fa))
+	fg.AddFlow(fu.MkFlowStat(fa))
 	tfd.defaultRules.AddFlowsAndGroup("onu2", fg)
 
 	fg = fu.NewFlowsAndGroups()
 	fa = &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(2),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.InPort(2),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
 		},
 		Actions: []*ofp.OfpAction{
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 103)),
-			Output(1),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 103)),
+			fu.Output(1),
 		},
 	}
-	fg.AddFlow(MkFlowStat(fa))
+	fg.AddFlow(fu.MkFlowStat(fa))
 	tfd.defaultRules.AddFlowsAndGroup("onu3", fg)
 
 	fg = fu.NewFlowsAndGroups()
 	fa = &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(2),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.InPort(2),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
 		},
 		Actions: []*ofp.OfpAction{
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 104)),
-			Output(1),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 104)),
+			fu.Output(1),
 		},
 	}
-	fg.AddFlow(MkFlowStat(fa))
+	fg.AddFlow(fu.MkFlowStat(fa))
 	tfd.defaultRules.AddFlowsAndGroup("onu4", fg)
 
 	//Set up the device graph - flow decomposer uses it only to verify whether a port is a root port.
@@ -468,17 +468,17 @@
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(1),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
-			EthType(0x888e),
+			fu.InPort(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.EthType(0x888e),
 		},
 		Actions: []*ofp.OfpAction{
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
-			Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
+			fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
 
-	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{MkFlowStat(fa)}}
+	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fu.MkFlowStat(fa)}}
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
@@ -492,51 +492,51 @@
 
 	fa = &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(2),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.InPort(2),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
 		},
 		Actions: []*ofp.OfpAction{
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
-			Output(1),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
+			fu.Output(1),
 		},
 	}
-	expectedOnu1Flow := MkFlowStat(fa)
+	expectedOnu1Flow := fu.MkFlowStat(fa)
 	derivedFlow := onu1FlowAndGroup.GetFlow(0)
 	assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
 
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(1),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1),
-			TunnelId(uint64(1)),
-			EthType(0x888e),
+			fu.InPort(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1),
+			fu.TunnelId(uint64(1)),
+			fu.EthType(0x888e),
 		},
 		Actions: []*ofp.OfpAction{
-			PushVlan(0x8100),
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000)),
-			Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+			fu.PushVlan(0x8100),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000)),
+			fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
-	expectedOltFlow := MkFlowStat(fa)
+	expectedOltFlow := fu.MkFlowStat(fa)
 	derivedFlow = oltFlowAndGroup.GetFlow(0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(2),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
-			VlanPcp(0),
-			Metadata_ofp(1),
-			TunnelId(uint64(1)),
+			fu.InPort(2),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
+			fu.VlanPcp(0),
+			fu.Metadata_ofp(1),
+			fu.TunnelId(uint64(1)),
 		},
 		Actions: []*ofp.OfpAction{
-			PopVlan(),
-			Output(1),
+			fu.PopVlan(),
+			fu.Output(1),
 		},
 	}
-	expectedOltFlow = MkFlowStat(fa)
+	expectedOltFlow = fu.MkFlowStat(fa)
 	derivedFlow = oltFlowAndGroup.GetFlow(1)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 }
@@ -547,20 +547,20 @@
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(1),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
-			EthType(0x0800),
-			Ipv4Dst(0xffffffff),
-			IpProto(17),
-			UdpSrc(68),
-			UdpDst(67),
+			fu.InPort(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.EthType(0x0800),
+			fu.Ipv4Dst(0xffffffff),
+			fu.IpProto(17),
+			fu.UdpSrc(68),
+			fu.UdpDst(67),
 		},
 		Actions: []*ofp.OfpAction{
-			Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+			fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
 
-	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{MkFlowStat(fa)}}
+	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fu.MkFlowStat(fa)}}
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
@@ -574,55 +574,55 @@
 
 	fa = &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(2),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.InPort(2),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
 		},
 		Actions: []*ofp.OfpAction{
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
-			Output(1),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
+			fu.Output(1),
 		},
 	}
-	expectedOnu1Flow := MkFlowStat(fa)
+	expectedOnu1Flow := fu.MkFlowStat(fa)
 	derivedFlow := onu1FlowAndGroup.GetFlow(0)
 	assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
 
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(1),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1),
-			TunnelId(uint64(1)),
-			EthType(0x0800),
-			Ipv4Dst(0xffffffff),
-			IpProto(17),
-			UdpSrc(68),
-			UdpDst(67),
+			fu.InPort(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1),
+			fu.TunnelId(uint64(1)),
+			fu.EthType(0x0800),
+			fu.Ipv4Dst(0xffffffff),
+			fu.IpProto(17),
+			fu.UdpSrc(68),
+			fu.UdpDst(67),
 		},
 		Actions: []*ofp.OfpAction{
-			PushVlan(0x8100),
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000)),
-			Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+			fu.PushVlan(0x8100),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000)),
+			fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
 		},
 	}
-	expectedOltFlow := MkFlowStat(fa)
+	expectedOltFlow := fu.MkFlowStat(fa)
 	derivedFlow = oltFlowAndGroup.GetFlow(0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(2),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
-			VlanPcp(0),
-			Metadata_ofp(1),
-			TunnelId(uint64(1)),
+			fu.InPort(2),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
+			fu.VlanPcp(0),
+			fu.Metadata_ofp(1),
+			fu.TunnelId(uint64(1)),
 		},
 		Actions: []*ofp.OfpAction{
-			PopVlan(),
-			Output(1),
+			fu.PopVlan(),
+			fu.Output(1),
 		},
 	}
-	expectedOltFlow = MkFlowStat(fa)
+	expectedOltFlow = fu.MkFlowStat(fa)
 	derivedFlow = oltFlowAndGroup.GetFlow(1)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 }
@@ -633,12 +633,12 @@
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 1},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(1),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
-			VlanPcp(0),
+			fu.InPort(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.VlanPcp(0),
 		},
 		Actions: []*ofp.OfpAction{
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
 		},
 	}
 
@@ -646,19 +646,19 @@
 	fa2 = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(1),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
-			VlanPcp(0),
+			fu.InPort(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
+			fu.VlanPcp(0),
 		},
 		Actions: []*ofp.OfpAction{
-			PushVlan(0x8100),
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1000)),
-			SetField(VlanPcp(0)),
-			Output(10),
+			fu.PushVlan(0x8100),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1000)),
+			fu.SetField(fu.VlanPcp(0)),
+			fu.Output(10),
 		},
 	}
 
-	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{MkFlowStat(fa), MkFlowStat(fa2)}}
+	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fu.MkFlowStat(fa), fu.MkFlowStat(fa2)}}
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
@@ -673,36 +673,36 @@
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(2),
-			TunnelId(uint64(1)),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
-			VlanPcp(0),
+			fu.InPort(2),
+			fu.TunnelId(uint64(1)),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.VlanPcp(0),
 		},
 		Actions: []*ofp.OfpAction{
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
-			Output(1),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
+			fu.Output(1),
 		},
 	}
-	expectedOnu1Flow := MkFlowStat(fa)
+	expectedOnu1Flow := fu.MkFlowStat(fa)
 	derivedFlow := onu1FlowAndGroup.GetFlow(1)
 	assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
 
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(1),
-			TunnelId(uint64(1)),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
-			VlanPcp(0),
+			fu.InPort(1),
+			fu.TunnelId(uint64(1)),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
+			fu.VlanPcp(0),
 		},
 		Actions: []*ofp.OfpAction{
-			PushVlan(0x8100),
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1000)),
-			SetField(VlanPcp(0)),
-			Output(2),
+			fu.PushVlan(0x8100),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1000)),
+			fu.SetField(fu.VlanPcp(0)),
+			fu.Output(2),
 		},
 	}
-	expectedOltFlow := MkFlowStat(fa)
+	expectedOltFlow := fu.MkFlowStat(fa)
 	derivedFlow = oltFlowAndGroup.GetFlow(0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 }
@@ -712,12 +712,12 @@
 	fa1 = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500, "table_id": 1},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(10),
-			Metadata_ofp((1000 << 32) | 1),
-			VlanPcp(0),
+			fu.InPort(10),
+			fu.Metadata_ofp((1000 << 32) | 1),
+			fu.VlanPcp(0),
 		},
 		Actions: []*ofp.OfpAction{
-			PopVlan(),
+			fu.PopVlan(),
 		},
 	}
 
@@ -725,17 +725,17 @@
 	fa2 = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(10),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
-			VlanPcp(0),
+			fu.InPort(10),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
+			fu.VlanPcp(0),
 		},
 		Actions: []*ofp.OfpAction{
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
-			Output(1),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
+			fu.Output(1),
 		},
 	}
 
-	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{MkFlowStat(fa1), MkFlowStat(fa2)}}
+	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fu.MkFlowStat(fa1), fu.MkFlowStat(fa2)}}
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
@@ -750,33 +750,33 @@
 	fa1 = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(2),
-			Metadata_ofp(1000),
-			TunnelId(uint64(1)),
-			VlanPcp(0),
+			fu.InPort(2),
+			fu.Metadata_ofp(1000),
+			fu.TunnelId(uint64(1)),
+			fu.VlanPcp(0),
 		},
 		Actions: []*ofp.OfpAction{
-			PopVlan(),
-			Output(1),
+			fu.PopVlan(),
+			fu.Output(1),
 		},
 	}
-	expectedOltFlow := MkFlowStat(fa1)
+	expectedOltFlow := fu.MkFlowStat(fa1)
 	derivedFlow := oltFlowAndGroup.GetFlow(0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 
 	fa1 = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(1),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
-			VlanPcp(0),
+			fu.InPort(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
+			fu.VlanPcp(0),
 		},
 		Actions: []*ofp.OfpAction{
-			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
-			Output(2),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
+			fu.Output(2),
 		},
 	}
-	expectedOnu1Flow := MkFlowStat(fa1)
+	expectedOnu1Flow := fu.MkFlowStat(fa1)
 	derivedFlow = onu1FlowAndGroup.GetFlow(1)
 	assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
 }
@@ -786,14 +786,14 @@
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(10),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 170),
-			VlanPcp(0),
-			EthType(0x800),
-			Ipv4Dst(0xe00a0a0a),
+			fu.InPort(10),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 170),
+			fu.VlanPcp(0),
+			fu.EthType(0x800),
+			fu.Ipv4Dst(0xe00a0a0a),
 		},
 		Actions: []*ofp.OfpAction{
-			Group(10),
+			fu.Group(10),
 		},
 	}
 
@@ -802,15 +802,15 @@
 		GroupId: 10,
 		Buckets: []*ofp.OfpBucket{
 			{Actions: []*ofp.OfpAction{
-				PopVlan(),
-				Output(1),
+				fu.PopVlan(),
+				fu.Output(1),
 			},
 			},
 		},
 	}
 
-	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{MkFlowStat(fa)}}
-	groups := ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{MkGroupStat(ga)}}
+	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fu.MkFlowStat(fa)}}
+	groups := ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{fu.MkGroupStat(ga)}}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
 	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
@@ -824,33 +824,33 @@
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(2),
-			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 170),
-			VlanPcp(0),
-			EthType(0x800),
-			Ipv4Dst(0xe00a0a0a),
+			fu.InPort(2),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 170),
+			fu.VlanPcp(0),
+			fu.EthType(0x800),
+			fu.Ipv4Dst(0xe00a0a0a),
 		},
 		Actions: []*ofp.OfpAction{
-			PopVlan(),
-			Output(1),
+			fu.PopVlan(),
+			fu.Output(1),
 		},
 	}
-	expectedOltFlow := MkFlowStat(fa)
+	expectedOltFlow := fu.MkFlowStat(fa)
 	derivedFlow := oltFlowAndGroup.GetFlow(0)
 	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
 
 	fa = &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 500},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			InPort(1),
-			EthType(0x800),
-			Ipv4Dst(0xe00a0a0a),
+			fu.InPort(1),
+			fu.EthType(0x800),
+			fu.Ipv4Dst(0xe00a0a0a),
 		},
 		Actions: []*ofp.OfpAction{
-			Output(2),
+			fu.Output(2),
 		},
 	}
-	expectedOnu1Flow := MkFlowStat(fa)
+	expectedOnu1Flow := fu.MkFlowStat(fa)
 	derivedFlow = onu1FlowAndGroup.GetFlow(1)
 	assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
 }
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index cf77d59..813c978 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -59,12 +59,12 @@
 		if !ok { // closed channel
 			//Set the channel at that index to nil to disable this case, hence preventing it from interfering with other cases.
 			cases[index].Chan = reflect.ValueOf(nil)
-			errors[index] = status.Errorf(codes.Internal, "channel closed")
+			errors[index] = status.Error(codes.Internal, "channel closed")
 			errorsReceived = true
 		} else if index == len(chnls) { // Timeout has occurred
 			for k := range errors {
 				if !resultsReceived[k] {
-					errors[k] = status.Errorf(codes.Aborted, "timeout")
+					errors[k] = status.Error(codes.Aborted, "timeout")
 				}
 			}
 			errorsReceived = true
diff --git a/rw_core/utils/core_utils_test.go b/rw_core/utils/core_utils_test.go
new file mode 100644
index 0000000..cb0abfe
--- /dev/null
+++ b/rw_core/utils/core_utils_test.go
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package utils
+
+import (
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"math/rand"
+	"testing"
+	"time"
+)
+
+var (
+	timeoutError     error
+	taskFailureError error
+)
+
+func init() {
+	log.AddPackage(log.JSON, log.WarnLevel, nil)
+	timeoutError = status.Errorf(codes.Aborted, "timeout")
+	taskFailureError = status.Error(codes.Internal, "test failure task")
+}
+
+func runSuccessfulTask(ch chan interface{}, durationRange int) {
+	time.Sleep(time.Duration(rand.Intn(durationRange)) * time.Millisecond)
+	ch <- nil
+}
+
+func runFailureTask(ch chan interface{}, durationRange int) {
+	time.Sleep(time.Duration(rand.Intn(durationRange)) * time.Millisecond)
+	ch <- taskFailureError
+}
+
+func runMultipleTasks(timeout, numTasks, taskDurationRange, numSuccessfulTask, numFailuretask int) []error {
+	if numTasks != numSuccessfulTask+numFailuretask {
+		return []error{status.Error(codes.FailedPrecondition, "invalid-num-tasks")}
+	}
+	numSuccessfulTaskCreated := 0
+	chnls := make([]chan interface{}, numTasks)
+	for i := 0; i < numTasks; i++ {
+		chnls[i] = make(chan interface{})
+		if numSuccessfulTaskCreated < numSuccessfulTask {
+			go runSuccessfulTask(chnls[i], taskDurationRange)
+			numSuccessfulTaskCreated += 1
+			continue
+		}
+		go runFailureTask(chnls[i], taskDurationRange)
+	}
+	return WaitForNilOrErrorResponses(int64(timeout), chnls...)
+}
+
+func getNumSuccessFailure(inputs []error) (numSuccess, numFailure, numTimeout int) {
+	numFailure = 0
+	numSuccess = 0
+	numTimeout = 0
+	for _, input := range inputs {
+		if input != nil {
+			if input.Error() == timeoutError.Error() {
+				numTimeout += 1
+			}
+			numFailure += 1
+		} else {
+			numSuccess += 1
+		}
+	}
+	return
+}
+
+func TestNoTimeouts(t *testing.T) {
+	var (
+		totalSuccess int
+		totalFailure int
+		results      []error
+		nSuccess     int
+		nFailure     int
+		nTimeouts    int
+	)
+	numIterations := 5
+	numTasks := 5
+	for i := 0; i < numIterations; i++ {
+		totalSuccess = rand.Intn(numTasks)
+		totalFailure = numTasks - totalSuccess
+		results = runMultipleTasks(110, numTasks, 100, totalSuccess, totalFailure)
+		nSuccess, nFailure, nTimeouts = getNumSuccessFailure(results)
+		assert.Equal(t, totalFailure, nFailure)
+		assert.Equal(t, totalSuccess, nSuccess)
+		assert.Equal(t, 0, nTimeouts)
+
+	}
+}
+
+func TestSomeTasksTimeouts(t *testing.T) {
+	var (
+		totalSuccess int
+		totalFailure int
+		results      []error
+		nSuccess     int
+		nFailure     int
+		nTimeouts    int
+	)
+	numIterations := 5
+	numTasks := 5
+	for i := 0; i < numIterations; i++ {
+		totalSuccess = rand.Intn(numTasks)
+		totalFailure = numTasks - totalSuccess
+		results = runMultipleTasks(50, numTasks, 100, totalSuccess, totalFailure)
+		nSuccess, nFailure, nTimeouts = getNumSuccessFailure(results)
+		assert.True(t, nFailure >= totalFailure)
+		assert.True(t, nSuccess <= totalSuccess)
+		assert.True(t, nTimeouts > 0)
+	}
+}
diff --git a/rw_core/utils/flow_utils.go b/rw_core/utils/flow_utils.go
index c2c9287..c1ca18d 100644
--- a/rw_core/utils/flow_utils.go
+++ b/rw_core/utils/flow_utils.go
@@ -17,12 +17,746 @@
 
 import (
 	"bytes"
+	"crypto/md5"
+	"fmt"
 	"github.com/cevaris/ordered_map"
 	"github.com/gogo/protobuf/proto"
+	"github.com/opencord/voltha-go/common/log"
 	ofp "github.com/opencord/voltha-protos/go/openflow_13"
+	"math/big"
 	"strings"
 )
 
+var (
+	// Instructions shortcut
+	APPLY_ACTIONS = ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS
+
+	//OFPAT_* shortcuts
+	OUTPUT       = ofp.OfpActionType_OFPAT_OUTPUT
+	COPY_TTL_OUT = ofp.OfpActionType_OFPAT_COPY_TTL_OUT
+	COPY_TTL_IN  = ofp.OfpActionType_OFPAT_COPY_TTL_IN
+	SET_MPLS_TTL = ofp.OfpActionType_OFPAT_SET_MPLS_TTL
+	DEC_MPLS_TTL = ofp.OfpActionType_OFPAT_DEC_MPLS_TTL
+	PUSH_VLAN    = ofp.OfpActionType_OFPAT_PUSH_VLAN
+	POP_VLAN     = ofp.OfpActionType_OFPAT_POP_VLAN
+	PUSH_MPLS    = ofp.OfpActionType_OFPAT_PUSH_MPLS
+	POP_MPLS     = ofp.OfpActionType_OFPAT_POP_MPLS
+	SET_QUEUE    = ofp.OfpActionType_OFPAT_SET_QUEUE
+	GROUP        = ofp.OfpActionType_OFPAT_GROUP
+	SET_NW_TTL   = ofp.OfpActionType_OFPAT_SET_NW_TTL
+	NW_TTL       = ofp.OfpActionType_OFPAT_DEC_NW_TTL
+	SET_FIELD    = ofp.OfpActionType_OFPAT_SET_FIELD
+	PUSH_PBB     = ofp.OfpActionType_OFPAT_PUSH_PBB
+	POP_PBB      = ofp.OfpActionType_OFPAT_POP_PBB
+	EXPERIMENTER = ofp.OfpActionType_OFPAT_EXPERIMENTER
+
+	//OFPXMT_OFB_* shortcuts (incomplete)
+	IN_PORT         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT
+	IN_PHY_PORT     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PHY_PORT
+	METADATA        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_METADATA
+	ETH_DST         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ETH_DST
+	ETH_SRC         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ETH_SRC
+	ETH_TYPE        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ETH_TYPE
+	VLAN_VID        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID
+	VLAN_PCP        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_PCP
+	IP_DSCP         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IP_DSCP
+	IP_ECN          = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IP_ECN
+	IP_PROTO        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IP_PROTO
+	IPV4_SRC        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV4_SRC
+	IPV4_DST        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV4_DST
+	TCP_SRC         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_TCP_SRC
+	TCP_DST         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_TCP_DST
+	UDP_SRC         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_UDP_SRC
+	UDP_DST         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_UDP_DST
+	SCTP_SRC        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_SCTP_SRC
+	SCTP_DST        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_SCTP_DST
+	ICMPV4_TYPE     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ICMPV4_TYPE
+	ICMPV4_CODE     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ICMPV4_CODE
+	ARP_OP          = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_OP
+	ARP_SPA         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_SPA
+	ARP_TPA         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_TPA
+	ARP_SHA         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_SHA
+	ARP_THA         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_THA
+	IPV6_SRC        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_SRC
+	IPV6_DST        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_DST
+	IPV6_FLABEL     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_FLABEL
+	ICMPV6_TYPE     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ICMPV6_TYPE
+	ICMPV6_CODE     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ICMPV6_CODE
+	IPV6_ND_TARGET  = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_ND_TARGET
+	OFB_IPV6_ND_SLL = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_ND_SLL
+	IPV6_ND_TLL     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_ND_TLL
+	MPLS_LABEL      = ofp.OxmOfbFieldTypes_OFPXMT_OFB_MPLS_LABEL
+	MPLS_TC         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_MPLS_TC
+	MPLS_BOS        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_MPLS_BOS
+	PBB_ISID        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_PBB_ISID
+	TUNNEL_ID       = ofp.OxmOfbFieldTypes_OFPXMT_OFB_TUNNEL_ID
+	IPV6_EXTHDR     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_EXTHDR
+)
+
+//ofp_action_* shortcuts
+
+func Output(port uint32, maxLen ...ofp.OfpControllerMaxLen) *ofp.OfpAction {
+	maxLength := ofp.OfpControllerMaxLen_OFPCML_MAX
+	if len(maxLen) > 0 {
+		maxLength = maxLen[0]
+	}
+	return &ofp.OfpAction{Type: OUTPUT, Action: &ofp.OfpAction_Output{Output: &ofp.OfpActionOutput{Port: port, MaxLen: uint32(maxLength)}}}
+}
+
+func MplsTtl(ttl uint32) *ofp.OfpAction {
+	return &ofp.OfpAction{Type: SET_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: ttl}}}
+}
+
+func PushVlan(ethType uint32) *ofp.OfpAction {
+	return &ofp.OfpAction{Type: PUSH_VLAN, Action: &ofp.OfpAction_Push{Push: &ofp.OfpActionPush{Ethertype: ethType}}}
+}
+
+func PopVlan() *ofp.OfpAction {
+	return &ofp.OfpAction{Type: POP_VLAN}
+}
+
+func PopMpls(ethType uint32) *ofp.OfpAction {
+	return &ofp.OfpAction{Type: POP_MPLS, Action: &ofp.OfpAction_PopMpls{PopMpls: &ofp.OfpActionPopMpls{Ethertype: ethType}}}
+}
+
+func Group(groupId uint32) *ofp.OfpAction {
+	return &ofp.OfpAction{Type: GROUP, Action: &ofp.OfpAction_Group{Group: &ofp.OfpActionGroup{GroupId: groupId}}}
+}
+
+func NwTtl(nwTtl uint32) *ofp.OfpAction {
+	return &ofp.OfpAction{Type: NW_TTL, Action: &ofp.OfpAction_NwTtl{NwTtl: &ofp.OfpActionNwTtl{NwTtl: nwTtl}}}
+}
+
+func SetField(field *ofp.OfpOxmOfbField) *ofp.OfpAction {
+	actionSetField := &ofp.OfpOxmField{OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC, Field: &ofp.OfpOxmField_OfbField{OfbField: field}}
+	return &ofp.OfpAction{Type: SET_FIELD, Action: &ofp.OfpAction_SetField{SetField: &ofp.OfpActionSetField{Field: actionSetField}}}
+}
+
+func Experimenter(experimenter uint32, data []byte) *ofp.OfpAction {
+	return &ofp.OfpAction{Type: EXPERIMENTER, Action: &ofp.OfpAction_Experimenter{Experimenter: &ofp.OfpActionExperimenter{Experimenter: experimenter, Data: data}}}
+}
+
+//ofb_field generators (incomplete set)
+
+func InPort(inPort uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IN_PORT, Value: &ofp.OfpOxmOfbField_Port{Port: inPort}}
+}
+
+func InPhyPort(inPhyPort uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IN_PHY_PORT, Value: &ofp.OfpOxmOfbField_Port{Port: inPhyPort}}
+}
+
+func Metadata_ofp(tableMetadata uint64) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: METADATA, Value: &ofp.OfpOxmOfbField_TableMetadata{TableMetadata: tableMetadata}}
+}
+
+// should Metadata_ofp used here ?????
+func EthDst(ethDst uint64) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ETH_DST, Value: &ofp.OfpOxmOfbField_TableMetadata{TableMetadata: ethDst}}
+}
+
+// should Metadata_ofp used here ?????
+func EthSrc(ethSrc uint64) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ETH_SRC, Value: &ofp.OfpOxmOfbField_TableMetadata{TableMetadata: ethSrc}}
+}
+
+func EthType(ethType uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ETH_TYPE, Value: &ofp.OfpOxmOfbField_EthType{EthType: ethType}}
+}
+
+func VlanVid(vlanVid uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: VLAN_VID, Value: &ofp.OfpOxmOfbField_VlanVid{VlanVid: vlanVid}}
+}
+
+func VlanPcp(vlanPcp uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: VLAN_PCP, Value: &ofp.OfpOxmOfbField_VlanPcp{VlanPcp: vlanPcp}}
+}
+
+func IpDscp(ipDscp uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IP_DSCP, Value: &ofp.OfpOxmOfbField_IpDscp{IpDscp: ipDscp}}
+}
+
+func IpEcn(ipEcn uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IP_ECN, Value: &ofp.OfpOxmOfbField_IpEcn{IpEcn: ipEcn}}
+}
+
+func IpProto(ipProto uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IP_PROTO, Value: &ofp.OfpOxmOfbField_IpProto{IpProto: ipProto}}
+}
+
+func Ipv4Src(ipv4Src uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV4_SRC, Value: &ofp.OfpOxmOfbField_Ipv4Src{Ipv4Src: ipv4Src}}
+}
+
+func Ipv4Dst(ipv4Dst uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV4_DST, Value: &ofp.OfpOxmOfbField_Ipv4Dst{Ipv4Dst: ipv4Dst}}
+}
+
+func TcpSrc(tcpSrc uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: TCP_SRC, Value: &ofp.OfpOxmOfbField_TcpSrc{TcpSrc: tcpSrc}}
+}
+
+func TcpDst(tcpDst uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: TCP_DST, Value: &ofp.OfpOxmOfbField_TcpDst{TcpDst: tcpDst}}
+}
+
+func UdpSrc(udpSrc uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: UDP_SRC, Value: &ofp.OfpOxmOfbField_UdpSrc{UdpSrc: udpSrc}}
+}
+
+func UdpDst(udpDst uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: UDP_DST, Value: &ofp.OfpOxmOfbField_UdpDst{UdpDst: udpDst}}
+}
+
+func SctpSrc(sctpSrc uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: SCTP_SRC, Value: &ofp.OfpOxmOfbField_SctpSrc{SctpSrc: sctpSrc}}
+}
+
+func SctpDst(sctpDst uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: SCTP_DST, Value: &ofp.OfpOxmOfbField_SctpDst{SctpDst: sctpDst}}
+}
+
+func Icmpv4Type(icmpv4Type uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ICMPV4_TYPE, Value: &ofp.OfpOxmOfbField_Icmpv4Type{Icmpv4Type: icmpv4Type}}
+}
+
+func Icmpv4Code(icmpv4Code uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ICMPV4_CODE, Value: &ofp.OfpOxmOfbField_Icmpv4Code{Icmpv4Code: icmpv4Code}}
+}
+
+func ArpOp(arpOp uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ARP_OP, Value: &ofp.OfpOxmOfbField_ArpOp{ArpOp: arpOp}}
+}
+
+func ArpSpa(arpSpa uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ARP_SPA, Value: &ofp.OfpOxmOfbField_ArpSpa{ArpSpa: arpSpa}}
+}
+
+func ArpTpa(arpTpa uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ARP_TPA, Value: &ofp.OfpOxmOfbField_ArpTpa{ArpTpa: arpTpa}}
+}
+
+func ArpSha(arpSha []byte) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ARP_SHA, Value: &ofp.OfpOxmOfbField_ArpSha{ArpSha: arpSha}}
+}
+
+func ArpTha(arpTha []byte) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ARP_THA, Value: &ofp.OfpOxmOfbField_ArpTha{ArpTha: arpTha}}
+}
+
+func Ipv6Src(ipv6Src []byte) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV6_SRC, Value: &ofp.OfpOxmOfbField_Ipv6Src{Ipv6Src: ipv6Src}}
+}
+
+func Ipv6Dst(ipv6Dst []byte) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV6_DST, Value: &ofp.OfpOxmOfbField_Ipv6Dst{Ipv6Dst: ipv6Dst}}
+}
+
+func Ipv6Flabel(ipv6Flabel uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV6_FLABEL, Value: &ofp.OfpOxmOfbField_Ipv6Flabel{Ipv6Flabel: ipv6Flabel}}
+}
+
+func Icmpv6Type(icmpv6Type uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ICMPV6_TYPE, Value: &ofp.OfpOxmOfbField_Icmpv6Type{Icmpv6Type: icmpv6Type}}
+}
+
+func Icmpv6Code(icmpv6Code uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ICMPV6_CODE, Value: &ofp.OfpOxmOfbField_Icmpv6Code{Icmpv6Code: icmpv6Code}}
+}
+
+func Ipv6NdTarget(ipv6NdTarget []byte) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV6_ND_TARGET, Value: &ofp.OfpOxmOfbField_Ipv6NdTarget{Ipv6NdTarget: ipv6NdTarget}}
+}
+
+func OfbIpv6NdSll(ofbIpv6NdSll []byte) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: OFB_IPV6_ND_SLL, Value: &ofp.OfpOxmOfbField_Ipv6NdSsl{Ipv6NdSsl: ofbIpv6NdSll}}
+}
+
+func Ipv6NdTll(ipv6NdTll []byte) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV6_ND_TLL, Value: &ofp.OfpOxmOfbField_Ipv6NdTll{Ipv6NdTll: ipv6NdTll}}
+}
+
+func MplsLabel(mplsLabel uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: MPLS_LABEL, Value: &ofp.OfpOxmOfbField_MplsLabel{MplsLabel: mplsLabel}}
+}
+
+func MplsTc(mplsTc uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: MPLS_TC, Value: &ofp.OfpOxmOfbField_MplsTc{MplsTc: mplsTc}}
+}
+
+func MplsBos(mplsBos uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: MPLS_BOS, Value: &ofp.OfpOxmOfbField_MplsBos{MplsBos: mplsBos}}
+}
+
+func PbbIsid(pbbIsid uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: PBB_ISID, Value: &ofp.OfpOxmOfbField_PbbIsid{PbbIsid: pbbIsid}}
+}
+
+func TunnelId(tunnelId uint64) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: TUNNEL_ID, Value: &ofp.OfpOxmOfbField_TunnelId{TunnelId: tunnelId}}
+}
+
+func Ipv6Exthdr(ipv6Exthdr uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV6_EXTHDR, Value: &ofp.OfpOxmOfbField_Ipv6Exthdr{Ipv6Exthdr: ipv6Exthdr}}
+}
+
+//frequently used extractors
+
+func excludeAction(action *ofp.OfpAction, exclude ...ofp.OfpActionType) bool {
+	for _, actionToExclude := range exclude {
+		if action.Type == actionToExclude {
+			return true
+		}
+	}
+	return false
+}
+
+func GetActions(flow *ofp.OfpFlowStats, exclude ...ofp.OfpActionType) []*ofp.OfpAction {
+	if flow == nil {
+		return nil
+	}
+	for _, instruction := range flow.Instructions {
+		if instruction.Type == uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS) {
+			instActions := instruction.GetActions()
+			if instActions == nil {
+				return nil
+			}
+			if len(exclude) == 0 {
+				return instActions.Actions
+			} else {
+				filteredAction := make([]*ofp.OfpAction, 0)
+				for _, action := range instActions.Actions {
+					if !excludeAction(action, exclude...) {
+						filteredAction = append(filteredAction, action)
+					}
+				}
+				return filteredAction
+			}
+		}
+	}
+	return nil
+}
+
+func UpdateOutputPortByActionType(flow *ofp.OfpFlowStats, actionType uint32, toPort uint32) *ofp.OfpFlowStats {
+	if flow == nil {
+		return nil
+	}
+	nFlow := (proto.Clone(flow)).(*ofp.OfpFlowStats)
+	nFlow.Instructions = nil
+	nInsts := make([]*ofp.OfpInstruction, 0)
+	for _, instruction := range flow.Instructions {
+		if instruction.Type == actionType {
+			instActions := instruction.GetActions()
+			if instActions == nil {
+				return nil
+			}
+			nActions := make([]*ofp.OfpAction, 0)
+			for _, action := range instActions.Actions {
+				if action.GetOutput() != nil {
+					nActions = append(nActions, Output(toPort))
+				} else {
+					nActions = append(nActions, action)
+				}
+			}
+			instructionAction := ofp.OfpInstruction_Actions{Actions: &ofp.OfpInstructionActions{Actions: nActions}}
+			nInsts = append(nInsts, &ofp.OfpInstruction{Type: uint32(APPLY_ACTIONS), Data: &instructionAction})
+		} else {
+			nInsts = append(nInsts, instruction)
+		}
+	}
+	nFlow.Instructions = nInsts
+	return nFlow
+}
+
+func excludeOxmOfbField(field *ofp.OfpOxmOfbField, exclude ...ofp.OxmOfbFieldTypes) bool {
+	for _, fieldToExclude := range exclude {
+		if field.Type == fieldToExclude {
+			return true
+		}
+	}
+	return false
+}
+
+func GetOfbFields(flow *ofp.OfpFlowStats, exclude ...ofp.OxmOfbFieldTypes) []*ofp.OfpOxmOfbField {
+	if flow == nil || flow.Match == nil || flow.Match.Type != ofp.OfpMatchType_OFPMT_OXM {
+		return nil
+	}
+	ofbFields := make([]*ofp.OfpOxmOfbField, 0)
+	for _, field := range flow.Match.OxmFields {
+		if field.OxmClass == ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
+			ofbFields = append(ofbFields, field.GetOfbField())
+		}
+	}
+	if len(exclude) == 0 {
+		return ofbFields
+	} else {
+		filteredFields := make([]*ofp.OfpOxmOfbField, 0)
+		for _, ofbField := range ofbFields {
+			if !excludeOxmOfbField(ofbField, exclude...) {
+				filteredFields = append(filteredFields, ofbField)
+			}
+		}
+		return filteredFields
+	}
+}
+
+func GetPacketOutPort(packet *ofp.OfpPacketOut) uint32 {
+	if packet == nil {
+		return 0
+	}
+	for _, action := range packet.GetActions() {
+		if action.Type == OUTPUT {
+			return action.GetOutput().Port
+		}
+	}
+	return 0
+}
+
+func GetOutPort(flow *ofp.OfpFlowStats) uint32 {
+	if flow == nil {
+		return 0
+	}
+	for _, action := range GetActions(flow) {
+		if action.Type == OUTPUT {
+			out := action.GetOutput()
+			if out == nil {
+				return 0
+			}
+			return out.GetPort()
+		}
+	}
+	return 0
+}
+
+func GetInPort(flow *ofp.OfpFlowStats) uint32 {
+	if flow == nil {
+		return 0
+	}
+	for _, field := range GetOfbFields(flow) {
+		if field.Type == IN_PORT {
+			return field.GetPort()
+		}
+	}
+	return 0
+}
+
+func GetGotoTableId(flow *ofp.OfpFlowStats) uint32 {
+	if flow == nil {
+		return 0
+	}
+	for _, instruction := range flow.Instructions {
+		if instruction.Type == uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE) {
+			gotoTable := instruction.GetGotoTable()
+			if gotoTable == nil {
+				return 0
+			}
+			return gotoTable.GetTableId()
+		}
+	}
+	return 0
+}
+
+func GetTunnelId(flow *ofp.OfpFlowStats) uint64 {
+	if flow == nil {
+		return 0
+	}
+	for _, field := range GetOfbFields(flow) {
+		if field.Type == TUNNEL_ID {
+			return field.GetTunnelId()
+		}
+	}
+	return 0
+}
+
+//GetMetaData - legacy get method (only want lower 32 bits)
+func GetMetaData(flow *ofp.OfpFlowStats) uint32 {
+	if flow == nil {
+		return 0
+	}
+	for _, field := range GetOfbFields(flow) {
+		if field.Type == METADATA {
+			return uint32(field.GetTableMetadata() & 0xffffffff)
+		}
+	}
+	return 0
+}
+
+func GetMetaData64Bit(flow *ofp.OfpFlowStats) uint64 {
+	if flow == nil {
+		return 0
+	}
+	for _, field := range GetOfbFields(flow) {
+		if field.Type == METADATA {
+			return field.GetTableMetadata()
+		}
+	}
+	return 0
+}
+
+// GetPortNumberFromMetadata retrieves the port number from the Metadata_ofp. The port number (UNI on ONU) is in the
+// lower 32-bits of Metadata_ofp and the inner_tag is in the upper 32-bits. This is set in the ONOS OltPipeline as
+// a Metadata_ofp field
+func GetPortNumberFromMetadata(flow *ofp.OfpFlowStats) uint64 {
+	md := GetMetaData64Bit(flow)
+	if md == 0 {
+		return 0
+	}
+	if md <= 0xffffffff {
+		log.Debugw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
+		return md
+	}
+	return md & 0xffffffff
+}
+
+//GetInnerTagFromMetaData retrieves the inner tag from the Metadata_ofp. The port number (UNI on ONU) is in the
+// lower 32-bits of Metadata_ofp and the inner_tag is in the upper 32-bits. This is set in the ONOS OltPipeline as
+//// a Metadata_ofp field
+func GetInnerTagFromMetaData(flow *ofp.OfpFlowStats) uint64 {
+	md := GetMetaData64Bit(flow)
+	if md == 0 {
+		return 0
+	}
+	if md <= 0xffffffff {
+		log.Debugw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
+		return md
+	}
+	return (md >> 32) & 0xffffffff
+}
+
+// Extract the child device port from a flow that contains the parent device peer port.  Typically the UNI port of an
+// ONU child device.  Per TST agreement this will be the lower 32 bits of tunnel id reserving upper 32 bits for later
+// use
+func GetChildPortFromTunnelId(flow *ofp.OfpFlowStats) uint32 {
+	tid := GetTunnelId(flow)
+	if tid == 0 {
+		return 0
+	}
+	// Per TST agreement we are keeping any child port id (uni port id) in the lower 32 bits
+	return uint32(tid & 0xffffffff)
+}
+
+func HasNextTable(flow *ofp.OfpFlowStats) bool {
+	if flow == nil {
+		return false
+	}
+	return GetGotoTableId(flow) != 0
+}
+
+func GetGroup(flow *ofp.OfpFlowStats) uint32 {
+	if flow == nil {
+		return 0
+	}
+	for _, action := range GetActions(flow) {
+		if action.Type == GROUP {
+			grp := action.GetGroup()
+			if grp == nil {
+				return 0
+			}
+			return grp.GetGroupId()
+		}
+	}
+	return 0
+}
+
+func HasGroup(flow *ofp.OfpFlowStats) bool {
+	return GetGroup(flow) != 0
+}
+
+// GetNextTableId returns the next table ID if the "table_id" is present in the map, otherwise return nil
+func GetNextTableId(kw OfpFlowModArgs) *uint32 {
+	if val, exist := kw["table_id"]; exist {
+		ret := uint32(val)
+		return &ret
+	}
+	return nil
+}
+
+// Return unique 64-bit integer hash for flow covering the following attributes:
+// 'table_id', 'priority', 'flags', 'cookie', 'match', '_instruction_string'
+func HashFlowStats(flow *ofp.OfpFlowStats) uint64 {
+	if flow == nil { // Should never happen
+		return 0
+	}
+	// Create string with the instructions field first
+	var instructionString bytes.Buffer
+	for _, instruction := range flow.Instructions {
+		instructionString.WriteString(instruction.String())
+	}
+	var flowString = fmt.Sprintf("%d%d%d%d%s%s", flow.TableId, flow.Priority, flow.Flags, flow.Cookie, flow.Match.String(), instructionString.String())
+	h := md5.New()
+	h.Write([]byte(flowString))
+	hash := big.NewInt(0)
+	hash.SetBytes(h.Sum(nil))
+	return hash.Uint64()
+}
+
+// flowStatsEntryFromFlowModMessage maps an ofp_flow_mod message to an ofp_flow_stats message
+func FlowStatsEntryFromFlowModMessage(mod *ofp.OfpFlowMod) *ofp.OfpFlowStats {
+	flow := &ofp.OfpFlowStats{}
+	if mod == nil {
+		return flow
+	}
+	flow.TableId = mod.TableId
+	flow.Priority = mod.Priority
+	flow.IdleTimeout = mod.IdleTimeout
+	flow.HardTimeout = mod.HardTimeout
+	flow.Flags = mod.Flags
+	flow.Cookie = mod.Cookie
+	flow.Match = mod.Match
+	flow.Instructions = mod.Instructions
+	flow.Id = HashFlowStats(flow)
+	return flow
+}
+
+func GroupEntryFromGroupMod(mod *ofp.OfpGroupMod) *ofp.OfpGroupEntry {
+	group := &ofp.OfpGroupEntry{}
+	if mod == nil {
+		return group
+	}
+	group.Desc = &ofp.OfpGroupDesc{Type: mod.Type, GroupId: mod.GroupId, Buckets: mod.Buckets}
+	group.Stats = &ofp.OfpGroupStats{GroupId: mod.GroupId}
+	//TODO do we need to instantiate bucket bins?
+	return group
+}
+
+func MkOxmFields(matchFields []ofp.OfpOxmField) []*ofp.OfpOxmField {
+	oxmFields := make([]*ofp.OfpOxmField, 0)
+	for _, matchField := range matchFields {
+		oxmField := ofp.OfpOxmField{OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC, Field: matchField.Field}
+		oxmFields = append(oxmFields, &oxmField)
+	}
+	return oxmFields
+}
+
+func MkInstructionsFromActions(actions []*ofp.OfpAction) []*ofp.OfpInstruction {
+	instructions := make([]*ofp.OfpInstruction, 0)
+	instructionAction := ofp.OfpInstruction_Actions{Actions: &ofp.OfpInstructionActions{Actions: actions}}
+	instruction := ofp.OfpInstruction{Type: uint32(APPLY_ACTIONS), Data: &instructionAction}
+	instructions = append(instructions, &instruction)
+	return instructions
+}
+
+// Convenience function to generare ofp_flow_mod message with OXM BASIC match composed from the match_fields, and
+// single APPLY_ACTIONS instruction with a list if ofp_action objects.
+func MkSimpleFlowMod(matchFields []*ofp.OfpOxmField, actions []*ofp.OfpAction, command *ofp.OfpFlowModCommand, kw OfpFlowModArgs) *ofp.OfpFlowMod {
+
+	// Process actions instructions
+	instructions := make([]*ofp.OfpInstruction, 0)
+	instructionAction := ofp.OfpInstruction_Actions{Actions: &ofp.OfpInstructionActions{Actions: actions}}
+	instruction := ofp.OfpInstruction{Type: uint32(APPLY_ACTIONS), Data: &instructionAction}
+	instructions = append(instructions, &instruction)
+
+	// Process next table
+	if tableId := GetNextTableId(kw); tableId != nil {
+		var instGotoTable ofp.OfpInstruction_GotoTable
+		instGotoTable.GotoTable = &ofp.OfpInstructionGotoTable{TableId: *tableId}
+		inst := ofp.OfpInstruction{Type: uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE), Data: &instGotoTable}
+		instructions = append(instructions, &inst)
+	}
+
+	// Process match fields
+	oxmFields := make([]*ofp.OfpOxmField, 0)
+	for _, matchField := range matchFields {
+		oxmField := ofp.OfpOxmField{OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC, Field: matchField.Field}
+		oxmFields = append(oxmFields, &oxmField)
+	}
+	var match ofp.OfpMatch
+	match.Type = ofp.OfpMatchType_OFPMT_OXM
+	match.OxmFields = oxmFields
+
+	// Create ofp_flow_message
+	msg := &ofp.OfpFlowMod{}
+	if command == nil {
+		msg.Command = ofp.OfpFlowModCommand_OFPFC_ADD
+	} else {
+		msg.Command = *command
+	}
+	msg.Instructions = instructions
+	msg.Match = &match
+
+	// Set the variadic argument values
+	msg = setVariadicModAttributes(msg, kw)
+
+	return msg
+}
+
+func MkMulticastGroupMod(groupId uint32, buckets []*ofp.OfpBucket, command *ofp.OfpGroupModCommand) *ofp.OfpGroupMod {
+	group := &ofp.OfpGroupMod{}
+	if command == nil {
+		group.Command = ofp.OfpGroupModCommand_OFPGC_ADD
+	} else {
+		group.Command = *command
+	}
+	group.Type = ofp.OfpGroupType_OFPGT_ALL
+	group.GroupId = groupId
+	group.Buckets = buckets
+	return group
+}
+
+//SetVariadicModAttributes sets only uint64 or uint32 fields of the ofp_flow_mod message
+func setVariadicModAttributes(mod *ofp.OfpFlowMod, args OfpFlowModArgs) *ofp.OfpFlowMod {
+	if args == nil {
+		return mod
+	}
+	for key, val := range args {
+		switch key {
+		case "cookie":
+			mod.Cookie = val
+		case "cookie_mask":
+			mod.CookieMask = val
+		case "table_id":
+			mod.TableId = uint32(val)
+		case "idle_timeout":
+			mod.IdleTimeout = uint32(val)
+		case "hard_timeout":
+			mod.HardTimeout = uint32(val)
+		case "priority":
+			mod.Priority = uint32(val)
+		case "buffer_id":
+			mod.BufferId = uint32(val)
+		case "out_port":
+			mod.OutPort = uint32(val)
+		case "out_group":
+			mod.OutGroup = uint32(val)
+		case "flags":
+			mod.Flags = uint32(val)
+		}
+	}
+	return mod
+}
+
+func MkPacketIn(port uint32, packet []byte) *ofp.OfpPacketIn {
+	packetIn := &ofp.OfpPacketIn{
+		Reason: ofp.OfpPacketInReason_OFPR_ACTION,
+		Match: &ofp.OfpMatch{
+			Type: ofp.OfpMatchType_OFPMT_OXM,
+			OxmFields: []*ofp.OfpOxmField{
+				{
+					OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC,
+					Field: &ofp.OfpOxmField_OfbField{
+						OfbField: InPort(port)},
+				},
+			},
+		},
+		Data: packet,
+	}
+	return packetIn
+}
+
+// MkFlowStat is a helper method to build flows
+func MkFlowStat(fa *FlowArgs) *ofp.OfpFlowStats {
+	//Build the matchfields
+	matchFields := make([]*ofp.OfpOxmField, 0)
+	for _, val := range fa.MatchFields {
+		matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
+	}
+	return FlowStatsEntryFromFlowModMessage(MkSimpleFlowMod(matchFields, fa.Actions, fa.Command, fa.KV))
+}
+
+func MkGroupStat(ga *GroupArgs) *ofp.OfpGroupEntry {
+	return GroupEntryFromGroupMod(MkMulticastGroupMod(ga.GroupId, ga.Buckets, ga.Command))
+}
+
 type OfpFlowModArgs map[string]uint64
 
 type FlowArgs struct {
@@ -127,6 +861,10 @@
 }
 
 func (fg *FlowsAndGroups) AddFlow(flow *ofp.OfpFlowStats) {
+	if flow == nil {
+		return
+	}
+
 	if fg.Flows == nil {
 		fg.Flows = ordered_map.NewOrderedMap()
 	}
@@ -139,6 +877,23 @@
 	}
 }
 
+func (fg *FlowsAndGroups) AddGroup(group *ofp.OfpGroupEntry) {
+	if group == nil {
+		return
+	}
+
+	if fg.Flows == nil {
+		fg.Flows = ordered_map.NewOrderedMap()
+	}
+	if fg.Groups == nil {
+		fg.Groups = ordered_map.NewOrderedMap()
+	}
+	//Add group only if absent
+	if _, exist := fg.Groups.Get(group.Desc.GroupId); !exist {
+		fg.Groups.Set(group.Desc.GroupId, group)
+	}
+}
+
 //AddFrom add flows and groups from the argument into this structure only if they do not already exist
 func (fg *FlowsAndGroups) AddFrom(from *FlowsAndGroups) {
 	iter := from.Flows.IterFunc()
@@ -266,6 +1021,9 @@
 //FlowMatch returns true if two flows matches on the following flow attributes:
 //TableId, Priority, Flags, Cookie, Match
 func FlowMatch(f1 *ofp.OfpFlowStats, f2 *ofp.OfpFlowStats) bool {
+	if f1 == nil || f2 == nil {
+		return false
+	}
 	keysMatter := []string{"TableId", "Priority", "Flags", "Cookie", "Match"}
 	for _, key := range keysMatter {
 		switch key {
@@ -297,6 +1055,9 @@
 //FlowMatchesMod returns True if given flow is "covered" by the wildcard flow_mod, taking into consideration of
 //both exact matches as well as masks-based match fields if any. Otherwise return False
 func FlowMatchesMod(flow *ofp.OfpFlowStats, mod *ofp.OfpFlowMod) bool {
+	if flow == nil || mod == nil {
+		return false
+	}
 	//Check if flow.cookie is covered by mod.cookie and mod.cookie_mask
 	if (flow.Cookie & mod.CookieMask) != (mod.Cookie & mod.CookieMask) {
 		return false
@@ -331,6 +1092,9 @@
 
 //FlowHasOutPort returns True if flow has a output command with the given out_port
 func FlowHasOutPort(flow *ofp.OfpFlowStats, outPort uint32) bool {
+	if flow == nil {
+		return false
+	}
 	for _, instruction := range flow.Instructions {
 		if instruction.Type == uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS) {
 			if instruction.GetActions() == nil {
@@ -351,6 +1115,9 @@
 
 //FlowHasOutGroup return True if flow has a output command with the given out_group
 func FlowHasOutGroup(flow *ofp.OfpFlowStats, groupID uint32) bool {
+	if flow == nil {
+		return false
+	}
 	for _, instruction := range flow.Instructions {
 		if instruction.Type == uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS) {
 			if instruction.GetActions() == nil {
diff --git a/rw_core/utils/flow_utils_test.go b/rw_core/utils/flow_utils_test.go
new file mode 100644
index 0000000..48a1b75
--- /dev/null
+++ b/rw_core/utils/flow_utils_test.go
@@ -0,0 +1,464 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package utils
+
+import (
+	"github.com/opencord/voltha-go/common/log"
+	ofp "github.com/opencord/voltha-protos/go/openflow_13"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"strings"
+	"testing"
+)
+
+func init() {
+	log.AddPackage(log.JSON, log.WarnLevel, nil)
+	timeoutError = status.Errorf(codes.Aborted, "timeout")
+	taskFailureError = status.Error(codes.Internal, "test failure task")
+}
+
+func TestFlowsAndGroups_AddFlow(t *testing.T) {
+	fg := NewFlowsAndGroups()
+	allFlows := fg.ListFlows()
+	assert.Equal(t, 0, len(allFlows))
+	fg.AddFlow(nil)
+	allFlows = fg.ListFlows()
+	assert.Equal(t, 0, len(allFlows))
+
+	var fa *FlowArgs
+	fa = &FlowArgs{
+		KV: OfpFlowModArgs{"priority": 500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(1),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1),
+			TunnelId(uint64(1)),
+			EthType(0x0800),
+			Ipv4Dst(0xffffffff),
+			IpProto(17),
+			UdpSrc(68),
+			UdpDst(67),
+		},
+		Actions: []*ofp.OfpAction{
+			PushVlan(0x8100),
+			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000)),
+			Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+		},
+	}
+	flow := MkFlowStat(fa)
+	fg.AddFlow(flow)
+
+	allFlows = fg.ListFlows()
+	assert.Equal(t, 1, len(allFlows))
+	assert.True(t, FlowMatch(flow, allFlows[0]))
+}
+
+func TestFlowsAndGroups_AddGroup(t *testing.T) {
+	var ga *GroupArgs
+
+	fg := NewFlowsAndGroups()
+	allGroups := fg.ListGroups()
+	assert.Equal(t, 0, len(allGroups))
+	fg.AddGroup(nil)
+	allGroups = fg.ListGroups()
+	assert.Equal(t, 0, len(allGroups))
+
+	ga = &GroupArgs{
+		GroupId: 10,
+		Buckets: []*ofp.OfpBucket{
+			{Actions: []*ofp.OfpAction{
+				PopVlan(),
+				Output(1),
+			},
+			},
+		},
+	}
+	group := MkGroupStat(ga)
+	fg.AddGroup(group)
+
+	allGroups = fg.ListGroups()
+	assert.Equal(t, 1, len(allGroups))
+	assert.Equal(t, ga.GroupId, allGroups[0].Desc.GroupId)
+}
+
+func TestFlowsAndGroups_Copy(t *testing.T) {
+	fg := NewFlowsAndGroups()
+	var fa *FlowArgs
+	var ga *GroupArgs
+
+	fa = &FlowArgs{
+		KV: OfpFlowModArgs{"priority": 500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+		},
+		Actions: []*ofp.OfpAction{
+			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 10)),
+			Output(1),
+		},
+	}
+	flow := MkFlowStat(fa)
+	fg.AddFlow(flow)
+
+	ga = &GroupArgs{
+		GroupId: 10,
+		Buckets: []*ofp.OfpBucket{
+			{Actions: []*ofp.OfpAction{
+				PopVlan(),
+				Output(1),
+			},
+			},
+		},
+	}
+	group := MkGroupStat(ga)
+	fg.AddGroup(group)
+
+	fgCopy := fg.Copy()
+
+	allFlows := fgCopy.ListFlows()
+	assert.Equal(t, 1, len(allFlows))
+	assert.True(t, FlowMatch(flow, allFlows[0]))
+
+	allGroups := fgCopy.ListGroups()
+	assert.Equal(t, 1, len(allGroups))
+	assert.Equal(t, ga.GroupId, allGroups[0].Desc.GroupId)
+
+	fg = NewFlowsAndGroups()
+	fgCopy = fg.Copy()
+	allFlows = fgCopy.ListFlows()
+	allGroups = fgCopy.ListGroups()
+	assert.Equal(t, 0, len(allFlows))
+	assert.Equal(t, 0, len(allGroups))
+}
+
+func TestFlowsAndGroups_GetFlow(t *testing.T) {
+	fg := NewFlowsAndGroups()
+	var fa1 *FlowArgs
+	var fa2 *FlowArgs
+	var ga *GroupArgs
+
+	fa1 = &FlowArgs{
+		KV: OfpFlowModArgs{"priority": 500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			Metadata_ofp((1000 << 32) | 1),
+			VlanPcp(0),
+		},
+		Actions: []*ofp.OfpAction{
+			PopVlan(),
+		},
+	}
+	flow1 := MkFlowStat(fa1)
+
+	fa2 = &FlowArgs{
+		KV: OfpFlowModArgs{"priority": 1500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(5),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+		},
+		Actions: []*ofp.OfpAction{
+			PushVlan(0x8100),
+			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1000)),
+			SetField(VlanPcp(0)),
+			Output(2),
+		},
+	}
+	flow2 := MkFlowStat(fa2)
+
+	fg.AddFlow(flow1)
+	fg.AddFlow(flow2)
+
+	ga = &GroupArgs{
+		GroupId: 10,
+		Buckets: []*ofp.OfpBucket{
+			{Actions: []*ofp.OfpAction{
+				PopVlan(),
+				Output(1),
+			},
+			},
+		},
+	}
+	group := MkGroupStat(ga)
+	fg.AddGroup(group)
+
+	gf1 := fg.GetFlow(0)
+	assert.True(t, FlowMatch(flow1, gf1))
+
+	gf2 := fg.GetFlow(1)
+	assert.True(t, FlowMatch(flow2, gf2))
+
+	gf3 := fg.GetFlow(2)
+	assert.Nil(t, gf3)
+
+	allFlows := fg.ListFlows()
+	assert.True(t, FlowMatch(flow1, allFlows[0]))
+	assert.True(t, FlowMatch(flow2, allFlows[1]))
+}
+
+func TestFlowsAndGroups_String(t *testing.T) {
+	fg := NewFlowsAndGroups()
+	var fa *FlowArgs
+	var ga *GroupArgs
+
+	str := fg.String()
+	assert.True(t, str == "")
+
+	fa = &FlowArgs{
+		KV: OfpFlowModArgs{"priority": 500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			VlanPcp(0),
+			EthType(0x800),
+			Ipv4Dst(0xe00a0a0a),
+		},
+		Actions: []*ofp.OfpAction{
+			Group(10),
+		},
+	}
+	flow := MkFlowStat(fa)
+	fg.AddFlow(flow)
+
+	ga = &GroupArgs{
+		GroupId: 10,
+		Buckets: []*ofp.OfpBucket{
+			{Actions: []*ofp.OfpAction{
+				PopVlan(),
+				Output(1),
+			},
+			},
+		},
+	}
+	group := MkGroupStat(ga)
+	fg.AddGroup(group)
+
+	str = fg.String()
+	assert.True(t, strings.Contains(str, "id: 1143307409938767207"))
+	assert.True(t, strings.Contains(str, "group_id: 10"))
+	assert.True(t, strings.Contains(str, "oxm_class: OFPXMC_OPENFLOW_BASICOFPXMC_OPENFLOW_BASIC"))
+	assert.True(t, strings.Contains(str, "type: OFPXMT_OFB_VLAN_VIDOFPXMT_OFB_VLAN_VID"))
+	assert.True(t, strings.Contains(str, "vlan_vid: 4096"))
+	assert.True(t, strings.Contains(str, "buckets:"))
+}
+
+func TestFlowsAndGroups_AddFrom(t *testing.T) {
+	fg := NewFlowsAndGroups()
+	var fa *FlowArgs
+	var ga *GroupArgs
+
+	str := fg.String()
+	assert.True(t, str == "")
+
+	fa = &FlowArgs{
+		KV: OfpFlowModArgs{"priority": 500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			Metadata_ofp(1000),
+			TunnelId(uint64(1)),
+			VlanPcp(0),
+		},
+		Actions: []*ofp.OfpAction{
+			PopVlan(),
+			Output(1),
+		},
+	}
+	flow := MkFlowStat(fa)
+	fg.AddFlow(flow)
+
+	ga = &GroupArgs{
+		GroupId: 10,
+		Buckets: []*ofp.OfpBucket{
+			{Actions: []*ofp.OfpAction{
+				PopVlan(),
+				Output(1),
+			},
+			},
+		},
+	}
+	group := MkGroupStat(ga)
+	fg.AddGroup(group)
+
+	fg1 := NewFlowsAndGroups()
+	fg1.AddFrom(fg)
+
+	allFlows := fg1.ListFlows()
+	allGroups := fg1.ListGroups()
+	assert.Equal(t, 1, len(allFlows))
+	assert.Equal(t, 1, len(allGroups))
+	assert.True(t, FlowMatch(flow, allFlows[0]))
+	assert.Equal(t, group.Desc.GroupId, allGroups[0].Desc.GroupId)
+}
+
+func TestDeviceRules_AddFlow(t *testing.T) {
+	dr := NewDeviceRules()
+	rules := dr.GetRules()
+	assert.True(t, len(rules) == 0)
+
+	dr.AddFlow("123456", nil)
+	rules = dr.GetRules()
+	assert.True(t, len(rules) == 1)
+	val, ok := rules["123456"]
+	assert.True(t, ok)
+	assert.Equal(t, 0, len(val.ListFlows()))
+	assert.Equal(t, 0, len(val.ListGroups()))
+
+	var fa *FlowArgs
+	fa = &FlowArgs{
+		KV: OfpFlowModArgs{"priority": 500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			Metadata_ofp(1000),
+			TunnelId(uint64(1)),
+			VlanPcp(0),
+		},
+		Actions: []*ofp.OfpAction{
+			PopVlan(),
+			Output(1),
+		},
+	}
+	flow := MkFlowStat(fa)
+	dr.AddFlow("123456", flow)
+	rules = dr.GetRules()
+	assert.True(t, len(rules) == 1)
+	val, ok = rules["123456"]
+	assert.True(t, ok)
+	assert.Equal(t, 1, len(val.ListFlows()))
+	assert.True(t, FlowMatch(flow, val.ListFlows()[0]))
+	assert.Equal(t, 0, len(val.ListGroups()))
+}
+
+func TestDeviceRules_AddFlowsAndGroup(t *testing.T) {
+	fg := NewFlowsAndGroups()
+	var fa *FlowArgs
+	var ga *GroupArgs
+
+	str := fg.String()
+	assert.True(t, str == "")
+
+	fa = &FlowArgs{
+		KV: OfpFlowModArgs{"priority": 2000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			Metadata_ofp(1000),
+			TunnelId(uint64(1)),
+			VlanPcp(0),
+		},
+		Actions: []*ofp.OfpAction{
+			PopVlan(),
+			Output(1),
+		},
+	}
+	flow := MkFlowStat(fa)
+	fg.AddFlow(flow)
+
+	ga = &GroupArgs{
+		GroupId: 10,
+		Buckets: []*ofp.OfpBucket{
+			{Actions: []*ofp.OfpAction{
+				PopVlan(),
+				Output(1),
+			},
+			},
+		},
+	}
+	group := MkGroupStat(ga)
+	fg.AddGroup(group)
+
+	dr := NewDeviceRules()
+	dr.AddFlowsAndGroup("123456", fg)
+	rules := dr.GetRules()
+	assert.True(t, len(rules) == 1)
+	val, ok := rules["123456"]
+	assert.True(t, ok)
+	assert.Equal(t, 1, len(val.ListFlows()))
+	assert.Equal(t, 1, len(val.ListGroups()))
+	assert.True(t, FlowMatch(flow, val.ListFlows()[0]))
+	assert.Equal(t, 10, int(val.ListGroups()[0].Desc.GroupId))
+}
+
+func TestFlowHasOutPort(t *testing.T) {
+	var flow *ofp.OfpFlowStats
+	assert.False(t, FlowHasOutPort(flow, 1))
+
+	fa := &FlowArgs{
+		KV: OfpFlowModArgs{"priority": 2000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			Metadata_ofp(1000),
+			TunnelId(uint64(1)),
+			VlanPcp(0),
+		},
+		Actions: []*ofp.OfpAction{
+			PopVlan(),
+			Output(1),
+		},
+	}
+	flow = MkFlowStat(fa)
+	assert.True(t, FlowHasOutPort(flow, 1))
+	assert.False(t, FlowHasOutPort(flow, 2))
+
+	fa = &FlowArgs{
+		KV: OfpFlowModArgs{"priority": 2000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+		},
+	}
+	flow = MkFlowStat(fa)
+	assert.False(t, FlowHasOutPort(flow, 1))
+}
+
+func TestFlowHasOutGroup(t *testing.T) {
+	var flow *ofp.OfpFlowStats
+	assert.False(t, FlowHasOutGroup(flow, 10))
+
+	fa := &FlowArgs{
+		KV: OfpFlowModArgs{"priority": 500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			VlanPcp(0),
+			EthType(0x800),
+			Ipv4Dst(0xe00a0a0a),
+		},
+		Actions: []*ofp.OfpAction{
+			Group(10),
+		},
+	}
+	flow = MkFlowStat(fa)
+	assert.True(t, FlowHasOutGroup(flow, 10))
+	assert.False(t, FlowHasOutGroup(flow, 11))
+
+	fa = &FlowArgs{
+		KV: OfpFlowModArgs{"priority": 500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			VlanPcp(0),
+			EthType(0x800),
+			Ipv4Dst(0xe00a0a0a),
+		},
+		Actions: []*ofp.OfpAction{
+			Output(1),
+		},
+	}
+	flow = MkFlowStat(fa)
+	assert.False(t, FlowHasOutGroup(flow, 1))
+}