[VOL-1035] Initial submission of flow decomposition code.
Additional test cases will follow to test the core of the flow
decomposition functionality

Change-Id: Ie685714ce5ab54ac89501a67f9489613de195c15
diff --git a/common/log/log_test.go b/common/log/log_test.go
index dbd9ffe..68d6ba3 100644
--- a/common/log/log_test.go
+++ b/common/log/log_test.go
@@ -28,8 +28,7 @@
 
 var testLogger log.Logger
 
-
-func TestInit (t *testing.T) {
+func TestInit(t *testing.T) {
 	var err error
 	testLogger, err = log.AddPackage(log.JSON, log.ErrorLevel, nil)
 	assert.NotNil(t, testLogger)
@@ -41,7 +40,7 @@
 	var success bool
 	for i := 0; i < 6; i++ {
 		success = testLogger.V(i)
-		if i == 1 && minimumLevel == 2{
+		if i == 1 && minimumLevel == 2 {
 			// TODO: Update the test when a new version of Zap logger is available.  It has a bug with that
 			// specific combination
 			continue
@@ -54,18 +53,18 @@
 	}
 }
 
-func TestLogLevelDebug (t *testing.T) {
+func TestLogLevelDebug(t *testing.T) {
 	for i := 0; i < 6; i++ {
 		verifyLogLevel(t, i)
 	}
 }
 
-func TestUpdateAllLoggers (t *testing.T) {
+func TestUpdateAllLoggers(t *testing.T) {
 	err := log.UpdateAllLoggers(log.Fields{"update": "update"})
 	assert.Nil(t, err)
 }
 
-func TestUpdateLoggers (t *testing.T) {
+func TestUpdateLoggers(t *testing.T) {
 	testLogger, err := log.UpdateLogger(log.Fields{"update": "update"})
 	assert.Nil(t, err)
 	assert.NotNil(t, testLogger)
@@ -76,4 +75,4 @@
 	thisLogger, _ := log.AddPackage(log.JSON, log.ErrorLevel, nil)
 	grpcLogger = thisLogger
 	assert.NotNil(t, grpcLogger)
-}
\ No newline at end of file
+}
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 6d78aa4..82c6dec 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -81,8 +81,6 @@
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
-
-
 func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reenable_device"
@@ -279,4 +277,4 @@
 func (ap *AdapterProxy) UnSuppressAlarm(filter voltha.AlarmFilter) error {
 	log.Debug("UnSuppressAlarm")
 	return nil
-}
\ No newline at end of file
+}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index d75a44f..8449ccb 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -56,7 +56,7 @@
 
 	// Setup the KV store
 	// Do not call NewBackend constructor; it creates its own KV client
-	backend := model.Backend {
+	backend := model.Backend{
 		Client:     kvClient,
 		StoreType:  cf.KVStoreType,
 		Host:       cf.KVStoreHost,
diff --git a/rw_core/core/grpc_nbi_api_handler_client_test.go b/rw_core/core/grpc_nbi_api_handler_client_test.go
index 21300cc..58dcf13 100644
--- a/rw_core/core/grpc_nbi_api_handler_client_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_client_test.go
@@ -42,7 +42,7 @@
 func setup() {
 	var err error
 
-	if err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
+	if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
 		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
 	}
 	conn, err = grpc.Dial("localhost:50057", grpc.WithInsecure())
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index dce2db7..519a0a1 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -21,8 +21,11 @@
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
 	ca "github.com/opencord/voltha-go/protos/core_adapter"
-	"github.com/opencord/voltha-go/protos/openflow_13"
+	ofp "github.com/opencord/voltha-go/protos/openflow_13"
 	"github.com/opencord/voltha-go/protos/voltha"
+	fd "github.com/opencord/voltha-go/rw_core/flow_decomposition"
+	"github.com/opencord/voltha-go/rw_core/graph"
+	fu "github.com/opencord/voltha-go/rw_core/utils"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"sync"
@@ -36,6 +39,8 @@
 	ldeviceMgr        *LogicalDeviceManager
 	clusterDataProxy  *model.Proxy
 	exitChannel       chan int
+	deviceGraph       *graph.DeviceGraph
+	DefaultFlowRules  *fu.DeviceRules
 	lockLogicalDevice sync.RWMutex
 }
 
@@ -48,6 +53,7 @@
 	agent.deviceMgr = deviceMgr
 	agent.clusterDataProxy = cdProxy
 	agent.ldeviceMgr = ldeviceMgr
+	//agent.deviceGraph =
 	agent.lockLogicalDevice = sync.RWMutex{}
 	return &agent
 }
@@ -63,8 +69,8 @@
 		return err
 	}
 	ld := &voltha.LogicalDevice{Id: agent.logicalDeviceId, RootDeviceId: agent.rootDeviceId}
-	ld.Desc = (proto.Clone(switchCap.Desc)).(*openflow_13.OfpDesc)
-	ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*openflow_13.OfpSwitchFeatures)
+	ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
+	ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
 
 	//Add logical ports to the logical device based on the number of NNI ports discovered
 	//First get the default port capability - TODO:  each NNI port may have different capabilities,
@@ -199,4 +205,225 @@
 	return nil
 }
 
+func isNNIPort(portNo uint32, nniPortsNo []uint32) bool {
+	for _, pNo := range nniPortsNo {
+		if pNo == portNo {
+			return true
+		}
+	}
+	return false
+}
 
+func (agent *LogicalDeviceAgent) getPreCalculatedRoute(ingress, egress uint32) []graph.RouteHop {
+	for routeLink, route := range agent.deviceGraph.Routes {
+		if ingress == routeLink.Ingress && egress == routeLink.Egress {
+			return route
+		}
+	}
+	log.Warnw("no-route", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "ingress": ingress, "egress": egress})
+	return nil
+}
+
+func (agent *LogicalDeviceAgent) GetRoute(ingressPortNo *uint32, egressPortNo *uint32) []graph.RouteHop {
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	log.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
+	// Get the updated logical device
+	var ld *ca.LogicalDevice
+	routes := make([]graph.RouteHop, 0)
+	var err error
+	if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {
+		return nil
+	}
+	nniLogicalPortsNo := make([]uint32, 0)
+	for _, logicalPort := range ld.Ports {
+		if logicalPort.RootPort {
+			nniLogicalPortsNo = append(nniLogicalPortsNo, logicalPort.OfpPort.PortNo)
+		}
+	}
+	if len(nniLogicalPortsNo) == 0 {
+		log.Errorw("no-nni-ports", log.Fields{"LogicalDeviceId": ld.Id})
+		return nil
+	}
+	//	Consider different possibilities
+	if egressPortNo != nil && ((*egressPortNo & 0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER)) {
+		log.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+		if isNNIPort(*ingressPortNo, nniLogicalPortsNo) {
+			log.Debug("returning-half-route")
+			//This is a trap on the NNI Port
+			//Return a 'half' route to make the flow decomposer logic happy
+			for routeLink, route := range agent.deviceGraph.Routes {
+				if isNNIPort(routeLink.Egress, nniLogicalPortsNo) {
+					routes = append(routes, graph.RouteHop{}) // first hop is set to empty
+					routes = append(routes, route[1])
+					return routes
+				}
+			}
+			log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+			return nil
+		}
+		//treat it as if the output port is the first NNI of the OLT
+		egressPortNo = &nniLogicalPortsNo[0]
+	}
+	//If ingress port is not specified (nil), it may be a wildcarded
+	//route if egress port is OFPP_CONTROLLER or a nni logical port,
+	//in which case we need to create a half-route where only the egress
+	//hop is filled, the first hop is nil
+	if ingressPortNo == nil && isNNIPort(*egressPortNo, nniLogicalPortsNo) {
+		// We can use the 2nd hop of any upstream route, so just find the first upstream:
+		for routeLink, route := range agent.deviceGraph.Routes {
+			if isNNIPort(routeLink.Egress, nniLogicalPortsNo) {
+				routes = append(routes, graph.RouteHop{}) // first hop is set to empty
+				routes = append(routes, route[1])
+				return routes
+			}
+		}
+		log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+		return nil
+	}
+	//If egress port is not specified (nil), we can also can return a "half" route
+	if egressPortNo == nil {
+		for routeLink, route := range agent.deviceGraph.Routes {
+			if routeLink.Ingress == *ingressPortNo {
+				routes = append(routes, route[0])
+				routes = append(routes, graph.RouteHop{})
+				return routes
+			}
+		}
+		log.Warnw("no-downstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+		return nil
+	}
+
+	//	Return the pre-calculated route
+	return agent.getPreCalculatedRoute(*ingressPortNo, *egressPortNo)
+}
+
+// updateRoutes updates the device routes whenever there is a device or port changes relevant to this
+// logical device.   TODO: Add more heuristics to this process to update the routes where a change has occurred
+// instead of rebuilding the entire set of routes
+func (agent *LogicalDeviceAgent) updateRoutes() {
+	if ld, err := agent.getLogicalDevice(); err == nil {
+		agent.deviceGraph.ComputeRoutes(ld.Ports)
+	}
+}
+
+func (agent *LogicalDeviceAgent) rootDeviceDefaultRules() *fu.FlowsAndGroups {
+	return fu.NewFlowsAndGroups()
+}
+
+func (agent *LogicalDeviceAgent) leafDeviceDefaultRules(deviceId string) *fu.FlowsAndGroups {
+	fg := fu.NewFlowsAndGroups()
+	var device *voltha.Device
+	var err error
+	if device, err = agent.deviceMgr.getDevice(deviceId); err != nil {
+		return fg
+	}
+	//set the upstream and downstream ports
+	upstreamPorts := make([]*voltha.Port, 0)
+	downstreamPorts := make([]*voltha.Port, 0)
+	for _, port := range device.Ports {
+		if port.Type == voltha.Port_PON_ONU || port.Type == voltha.Port_VENET_ONU {
+			upstreamPorts = append(upstreamPorts, port)
+		} else if port.Type == voltha.Port_ETHERNET_UNI {
+			downstreamPorts = append(downstreamPorts, port)
+		}
+	}
+	//it is possible that the downstream ports are not created, but the flow_decomposition has already
+	//kicked in. In such scenarios, cut short the processing and return.
+	if len(downstreamPorts) == 0 {
+		return fg
+	}
+	// set up the default flows
+	var fa *fu.FlowArgs
+	fa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fd.InPort(downstreamPorts[0].PortNo),
+			fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+		},
+		Actions: []*ofp.OfpAction{
+			fd.SetField(fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan)),
+		},
+	}
+	fg.AddFlow(fd.MkFlowStat(fa))
+
+	fa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fd.InPort(downstreamPorts[0].PortNo),
+			fd.VlanVid(0),
+		},
+		Actions: []*ofp.OfpAction{
+			fd.PushVlan(0x8100),
+			fd.SetField(fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan)),
+			fd.Output(upstreamPorts[0].PortNo),
+		},
+	}
+	fg.AddFlow(fd.MkFlowStat(fa))
+
+	fa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fd.InPort(upstreamPorts[0].PortNo),
+			fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan),
+		},
+		Actions: []*ofp.OfpAction{
+			fd.SetField(fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
+			fd.Output(downstreamPorts[0].PortNo),
+		},
+	}
+	fg.AddFlow(fd.MkFlowStat(fa))
+
+	return fg
+}
+
+func (agent *LogicalDeviceAgent) generateDefaultRules() *fu.DeviceRules {
+	rules := fu.NewDeviceRules()
+	var ld *voltha.LogicalDevice
+	var err error
+	if ld, err = agent.getLogicalDevice(); err != nil {
+		log.Warnw("no-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+		return rules
+	}
+
+	deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
+	for deviceId, _ := range deviceNodeIds {
+		if deviceId == ld.RootDeviceId {
+			rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
+		} else {
+			rules.AddFlowsAndGroup(deviceId, agent.leafDeviceDefaultRules(deviceId))
+		}
+	}
+	return rules
+}
+
+func (agent *LogicalDeviceAgent) GetAllDefaultRules() *fu.DeviceRules {
+	// Get latest
+	var lDevice *voltha.LogicalDevice
+	var err error
+	if lDevice, err = agent.getLogicalDevice(); err != nil {
+		return fu.NewDeviceRules()
+	}
+	if agent.DefaultFlowRules == nil { // Nothing setup yet
+		agent.deviceGraph = graph.NewDeviceGraph(agent.deviceMgr.getDevice)
+		agent.deviceGraph.ComputeRoutes(lDevice.Ports)
+		agent.DefaultFlowRules = agent.generateDefaultRules()
+	}
+	return agent.DefaultFlowRules
+}
+
+func (agent *LogicalDeviceAgent) GetWildcardInputPorts(excludePort ...uint32) []uint32 {
+	lPorts := make([]uint32, 0)
+	var exclPort uint32
+	if len(excludePort) == 1 {
+		exclPort = excludePort[0]
+	}
+	if lDevice, _ := agent.getLogicalDevice(); lDevice != nil {
+		for _, port := range lDevice.Ports {
+			if port.OfpPort.PortNo != exclPort {
+				lPorts = append(lPorts, port.OfpPort.PortNo)
+			}
+		}
+	}
+	return lPorts
+}
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index c2a3634..a8bcfd6 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -35,57 +35,58 @@
 package core
 
 import (
-    "time"
-    "github.com/opencord/voltha-go/db/kvstore"
-    log "github.com/opencord/voltha-go/common/log"
+	log "github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/db/kvstore"
+	"time"
 )
 
 // Transaction acquisition results
 const (
-    UNKNOWN = iota
-    SEIZED_BY_SELF
-    COMPLETED_BY_OTHER
-    ABANDONED_BY_OTHER
-    STOPPED_WAITING_FOR_OTHER
+	UNKNOWN = iota
+	SEIZED_BY_SELF
+	COMPLETED_BY_OTHER
+	ABANDONED_BY_OTHER
+	STOPPED_WAITING_FOR_OTHER
 )
 
 const (
-    TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
+	TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
 )
 
 type TransactionContext struct {
-    kvClient kvstore.Client
-    kvOperationTimeout int
-    owner string
-    timeToDeleteCompletedKeys int
-    txnPrefix string
+	kvClient                  kvstore.Client
+	kvOperationTimeout        int
+	owner                     string
+	timeToDeleteCompletedKeys int
+	txnPrefix                 string
 }
+
 var ctx *TransactionContext
 
-var txnState = []string {
-    "UNKNOWN",
-    "SEIZED-BY-SELF",
-    "COMPLETED-BY-OTHER",
-    "ABANDONED-BY-OTHER",
-    "STOPPED-WAITING-FOR-OTHER"}
+var txnState = []string{
+	"UNKNOWN",
+	"SEIZED-BY-SELF",
+	"COMPLETED-BY-OTHER",
+	"ABANDONED-BY-OTHER",
+	"STOPPED-WAITING-FOR-OTHER"}
 
 func init() {
-    log.AddPackage(log.JSON, log.WarnLevel, nil)
+	log.AddPackage(log.JSON, log.WarnLevel, nil)
 }
 
 func NewTransactionContext(
-    owner string,
-    txnPrefix string,
-    kvClient kvstore.Client,
-    kvOpTimeout int,
-    keyDeleteTime int) *TransactionContext {
+	owner string,
+	txnPrefix string,
+	kvClient kvstore.Client,
+	kvOpTimeout int,
+	keyDeleteTime int) *TransactionContext {
 
-    return &TransactionContext{
-        owner: owner,
-        txnPrefix: txnPrefix,
-        kvClient: kvClient,
-        kvOperationTimeout: kvOpTimeout,
-        timeToDeleteCompletedKeys: keyDeleteTime}
+	return &TransactionContext{
+		owner:                     owner,
+		txnPrefix:                 txnPrefix,
+		kvClient:                  kvClient,
+		kvOperationTimeout:        kvOpTimeout,
+		timeToDeleteCompletedKeys: keyDeleteTime}
 }
 
 /*
@@ -104,19 +105,19 @@
  *                       TRANSACTION_COMPLETE key
  */
 func SetTransactionContext(owner string,
-    txnPrefix string,
-    kvClient kvstore.Client,
-    kvOpTimeout int,
-    keyDeleteTime int) error {
+	txnPrefix string,
+	kvClient kvstore.Client,
+	kvOpTimeout int,
+	keyDeleteTime int) error {
 
-    ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime)
-    return nil
+	ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime)
+	return nil
 }
 
 type KVTransaction struct {
-    ch chan int
-    txnId string
-    txnKey string
+	ch     chan int
+	txnId  string
+	txnKey string
 }
 
 /*
@@ -126,9 +127,9 @@
  * :return: A KVTransaction instance
  */
 func NewKVTransaction(txnId string) *KVTransaction {
-    return &KVTransaction{
-        txnId: txnId,
-        txnKey: ctx.txnPrefix + txnId}
+	return &KVTransaction{
+		txnId:  txnId,
+		txnKey: ctx.txnPrefix + txnId}
 }
 
 /*
@@ -143,98 +144,96 @@
  *          false - reservation not acquired, request being processed by another core
  */
 func (c *KVTransaction) Acquired(duration int64) bool {
-    var acquired bool
-    var currOwner string = ""
-    var res int
+	var acquired bool
+	var currOwner string = ""
+	var res int
 
-    // Convert milliseconds to seconds, rounding up
-    // The reservation TTL is specified in seconds
-    durationInSecs := duration / 1000
-    if remainder := duration % 1000; remainder > 0 {
-        durationInSecs++
-    }
-    value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
+	// Convert milliseconds to seconds, rounding up
+	// The reservation TTL is specified in seconds
+	durationInSecs := duration / 1000
+	if remainder := duration % 1000; remainder > 0 {
+		durationInSecs++
+	}
+	value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
 
-    // If the reservation failed, do we simply abort or drop into watch mode anyway?
-    // Setting value to nil leads to watch mode
-    if value != nil {
-        if currOwner, err = kvstore.ToString(value); err != nil {
-            log.Error("unexpected-owner-type")
-            value = nil
-        }
-    }
-    if err == nil && value != nil && currOwner == ctx.owner {
-        // Process the request immediately
-        res = SEIZED_BY_SELF
-    } else {
-        // Another core instance has reserved the request
-        // Watch for reservation expiry or successful request completion
-        events := ctx.kvClient.Watch(c.txnKey)
-        log.Debugw("watch-other-server",
-            log.Fields{"owner": currOwner, "timeout": duration})
+	// If the reservation failed, do we simply abort or drop into watch mode anyway?
+	// Setting value to nil leads to watch mode
+	if value != nil {
+		if currOwner, err = kvstore.ToString(value); err != nil {
+			log.Error("unexpected-owner-type")
+			value = nil
+		}
+	}
+	if err == nil && value != nil && currOwner == ctx.owner {
+		// Process the request immediately
+		res = SEIZED_BY_SELF
+	} else {
+		// Another core instance has reserved the request
+		// Watch for reservation expiry or successful request completion
+		events := ctx.kvClient.Watch(c.txnKey)
+		log.Debugw("watch-other-server",
+			log.Fields{"owner": currOwner, "timeout": duration})
 
-        select {
-        // Add a timeout here in case we miss an event from the KV
-        case <-time.After(time.Duration(duration) * time.Millisecond):
-            // In case of missing events, let's check the transaction key
-            kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout)
-            if err == nil && kvp == nil {
-                log.Debug("missed-deleted-event")
-                res = ABANDONED_BY_OTHER
-            } else if val, err := kvstore.ToString(kvp.Value);
-                err == nil && val == TRANSACTION_COMPLETE {
-                    log.Debugw("missed-put-event",
-                        log.Fields{"key": c.txnKey, "value": val})
-                    res = COMPLETED_BY_OTHER
-            } else {
-                res = STOPPED_WAITING_FOR_OTHER
-            }
+		select {
+		// Add a timeout here in case we miss an event from the KV
+		case <-time.After(time.Duration(duration) * time.Millisecond):
+			// In case of missing events, let's check the transaction key
+			kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout)
+			if err == nil && kvp == nil {
+				log.Debug("missed-deleted-event")
+				res = ABANDONED_BY_OTHER
+			} else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
+				log.Debugw("missed-put-event",
+					log.Fields{"key": c.txnKey, "value": val})
+				res = COMPLETED_BY_OTHER
+			} else {
+				res = STOPPED_WAITING_FOR_OTHER
+			}
 
-        case event := <-events:
-            log.Debugw("received-event", log.Fields{"type": event.EventType})
-            if event.EventType == kvstore.DELETE {
-                // The other core failed to process the request; step up
-                res = ABANDONED_BY_OTHER
-            } else if event.EventType == kvstore.PUT {
-                key, e1 := kvstore.ToString(event.Key)
-                val, e2 := kvstore.ToString(event.Value)
-                if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
-                    res = COMPLETED_BY_OTHER
-                    // Successful request completion has been detected
-                    // Remove the transaction key
-                    c.Delete()
-                }
-            }
-        }
-    }
-    // Clean-up: delete the transaction key after a long delay
-    go c.deleteTransactionKey()
+		case event := <-events:
+			log.Debugw("received-event", log.Fields{"type": event.EventType})
+			if event.EventType == kvstore.DELETE {
+				// The other core failed to process the request; step up
+				res = ABANDONED_BY_OTHER
+			} else if event.EventType == kvstore.PUT {
+				key, e1 := kvstore.ToString(event.Key)
+				val, e2 := kvstore.ToString(event.Value)
+				if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
+					res = COMPLETED_BY_OTHER
+					// Successful request completion has been detected
+					// Remove the transaction key
+					c.Delete()
+				}
+			}
+		}
+	}
+	// Clean-up: delete the transaction key after a long delay
+	go c.deleteTransactionKey()
 
-    log.Debugw("acquire-transaction", log.Fields{"result": txnState[res]})
-    switch res {
-    case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
-        acquired = true
-    default:
-        acquired = false
-    }
-    return acquired
+	log.Debugw("acquire-transaction", log.Fields{"result": txnState[res]})
+	switch res {
+	case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
+		acquired = true
+	default:
+		acquired = false
+	}
+	return acquired
 }
 
 func (c *KVTransaction) deleteTransactionKey() {
-    log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
-    time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
-    log.Debugw("background-key-deletion", log.Fields{"key": c.txnKey})
-    ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
+	log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
+	time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
+	log.Debugw("background-key-deletion", log.Fields{"key": c.txnKey})
+	ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
 }
 
 func (c *KVTransaction) Close() error {
-    log.Debugw("close", log.Fields{"key": c.txnKey})
-    return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout)
+	log.Debugw("close", log.Fields{"key": c.txnKey})
+	return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout)
 }
 
 func (c *KVTransaction) Delete() error {
-    log.Debugw("delete", log.Fields{"key": c.txnKey})
-    err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
-    return err
+	log.Debugw("delete", log.Fields{"key": c.txnKey})
+	err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
+	return err
 }
-
diff --git a/rw_core/coreIf/device_manager_if.go b/rw_core/coreIf/device_manager_if.go
new file mode 100644
index 0000000..c8e33c1
--- /dev/null
+++ b/rw_core/coreIf/device_manager_if.go
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+Defines a DeviceManager Interface - Used for unit testing of the flow decomposer only at this
+time.
+*/
+package coreIf
+
+import "github.com/opencord/voltha-go/protos/voltha"
+
+// DeviceManager represents a generic device manager
+type DeviceManager interface {
+	GetDevice(string) (*voltha.Device, error)
+}
diff --git a/rw_core/coreIf/logical_device_agent_if.go b/rw_core/coreIf/logical_device_agent_if.go
new file mode 100644
index 0000000..b5893ec
--- /dev/null
+++ b/rw_core/coreIf/logical_device_agent_if.go
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+  Defines a logicalDeviceAgent Interface - Used for unit testing of the flow decomposer only at this
+ time.
+*/
+package coreIf
+
+import (
+	"github.com/opencord/voltha-go/protos/voltha"
+	"github.com/opencord/voltha-go/rw_core/graph"
+	"github.com/opencord/voltha-go/rw_core/utils"
+)
+
+// LogicalAgent represents a generic agent
+type LogicalDeviceAgent interface {
+	GetLogicalDevice() *voltha.LogicalDevice
+	GetDeviceGraph() *graph.DeviceGraph
+	GetAllDefaultRules() *utils.DeviceRules
+	GetWildcardInputPorts(excludePort ...uint32) []uint32
+	GetRoute(ingressPortNo *uint32, egressPortNo *uint32) []graph.RouteHop
+}
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
new file mode 100644
index 0000000..54a6761
--- /dev/null
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -0,0 +1,1163 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package flow_decomposition
+
+import (
+	"bytes"
+	"crypto/md5"
+	"fmt"
+	"github.com/opencord/voltha-go/common/log"
+	ofp "github.com/opencord/voltha-go/protos/openflow_13"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"github.com/opencord/voltha-go/rw_core/coreIf"
+	"github.com/opencord/voltha-go/rw_core/graph"
+	fu "github.com/opencord/voltha-go/rw_core/utils"
+	"math/big"
+)
+
+func init() {
+	log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+var (
+	// Instructions shortcut
+	APPLY_ACTIONS = ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS
+
+	//OFPAT_* shortcuts
+	OUTPUT       = ofp.OfpActionType_OFPAT_OUTPUT
+	COPY_TTL_OUT = ofp.OfpActionType_OFPAT_COPY_TTL_OUT
+	COPY_TTL_IN  = ofp.OfpActionType_OFPAT_COPY_TTL_IN
+	SET_MPLS_TTL = ofp.OfpActionType_OFPAT_SET_MPLS_TTL
+	DEC_MPLS_TTL = ofp.OfpActionType_OFPAT_DEC_MPLS_TTL
+	PUSH_VLAN    = ofp.OfpActionType_OFPAT_PUSH_VLAN
+	POP_VLAN     = ofp.OfpActionType_OFPAT_POP_VLAN
+	PUSH_MPLS    = ofp.OfpActionType_OFPAT_PUSH_MPLS
+	POP_MPLS     = ofp.OfpActionType_OFPAT_POP_MPLS
+	SET_QUEUE    = ofp.OfpActionType_OFPAT_SET_QUEUE
+	GROUP        = ofp.OfpActionType_OFPAT_GROUP
+	SET_NW_TTL   = ofp.OfpActionType_OFPAT_SET_NW_TTL
+	NW_TTL       = ofp.OfpActionType_OFPAT_DEC_NW_TTL
+	SET_FIELD    = ofp.OfpActionType_OFPAT_SET_FIELD
+	PUSH_PBB     = ofp.OfpActionType_OFPAT_PUSH_PBB
+	POP_PBB      = ofp.OfpActionType_OFPAT_POP_PBB
+	EXPERIMENTER = ofp.OfpActionType_OFPAT_EXPERIMENTER
+
+	//OFPXMT_OFB_* shortcuts (incomplete)
+	IN_PORT         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT
+	IN_PHY_PORT     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PHY_PORT
+	METADATA        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_METADATA
+	ETH_DST         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ETH_DST
+	ETH_SRC         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ETH_SRC
+	ETH_TYPE        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ETH_TYPE
+	VLAN_VID        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID
+	VLAN_PCP        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_PCP
+	IP_DSCP         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IP_DSCP
+	IP_ECN          = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IP_ECN
+	IP_PROTO        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IP_PROTO
+	IPV4_SRC        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV4_SRC
+	IPV4_DST        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV4_DST
+	TCP_SRC         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_TCP_SRC
+	TCP_DST         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_TCP_DST
+	UDP_SRC         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_UDP_SRC
+	UDP_DST         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_UDP_DST
+	SCTP_SRC        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_SCTP_SRC
+	SCTP_DST        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_SCTP_DST
+	ICMPV4_TYPE     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ICMPV4_TYPE
+	ICMPV4_CODE     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ICMPV4_CODE
+	ARP_OP          = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_OP
+	ARP_SPA         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_SPA
+	ARP_TPA         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_TPA
+	ARP_SHA         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_SHA
+	ARP_THA         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ARP_THA
+	IPV6_SRC        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_SRC
+	IPV6_DST        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_DST
+	IPV6_FLABEL     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_FLABEL
+	ICMPV6_TYPE     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ICMPV6_TYPE
+	ICMPV6_CODE     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_ICMPV6_CODE
+	IPV6_ND_TARGET  = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_ND_TARGET
+	OFB_IPV6_ND_SLL = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_ND_SLL
+	IPV6_ND_TLL     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_ND_TLL
+	MPLS_LABEL      = ofp.OxmOfbFieldTypes_OFPXMT_OFB_MPLS_LABEL
+	MPLS_TC         = ofp.OxmOfbFieldTypes_OFPXMT_OFB_MPLS_TC
+	MPLS_BOS        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_MPLS_BOS
+	PBB_ISID        = ofp.OxmOfbFieldTypes_OFPXMT_OFB_PBB_ISID
+	TUNNEL_ID       = ofp.OxmOfbFieldTypes_OFPXMT_OFB_TUNNEL_ID
+	IPV6_EXTHDR     = ofp.OxmOfbFieldTypes_OFPXMT_OFB_IPV6_EXTHDR
+)
+
+//ofp_action_* shortcuts
+
+func Output(port uint32, maxLen ...ofp.OfpControllerMaxLen) *ofp.OfpAction {
+	maxLength := ofp.OfpControllerMaxLen_OFPCML_MAX
+	if len(maxLen) > 0 {
+		maxLength = maxLen[0]
+	}
+	return &ofp.OfpAction{Type: OUTPUT, Action: &ofp.OfpAction_Output{Output: &ofp.OfpActionOutput{Port: port, MaxLen: uint32(maxLength)}}}
+}
+
+func MplsTtl(ttl uint32) *ofp.OfpAction {
+	return &ofp.OfpAction{Type: SET_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: ttl}}}
+}
+
+func PushVlan(ethType uint32) *ofp.OfpAction {
+	return &ofp.OfpAction{Type: PUSH_VLAN, Action: &ofp.OfpAction_Push{Push: &ofp.OfpActionPush{Ethertype: ethType}}}
+}
+
+func PopVlan() *ofp.OfpAction {
+	return &ofp.OfpAction{Type: POP_VLAN}
+}
+
+func PopMpls(ethType uint32) *ofp.OfpAction {
+	return &ofp.OfpAction{Type: POP_MPLS, Action: &ofp.OfpAction_PopMpls{PopMpls: &ofp.OfpActionPopMpls{Ethertype: ethType}}}
+}
+
+func Group(groupId uint32) *ofp.OfpAction {
+	return &ofp.OfpAction{Type: GROUP, Action: &ofp.OfpAction_Group{Group: &ofp.OfpActionGroup{GroupId: groupId}}}
+}
+
+func NwTtl(nwTtl uint32) *ofp.OfpAction {
+	return &ofp.OfpAction{Type: NW_TTL, Action: &ofp.OfpAction_NwTtl{NwTtl: &ofp.OfpActionNwTtl{NwTtl: nwTtl}}}
+}
+
+func SetField(field *ofp.OfpOxmOfbField) *ofp.OfpAction {
+	actionSetField := &ofp.OfpOxmField{OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC, Field: &ofp.OfpOxmField_OfbField{OfbField: field}}
+	return &ofp.OfpAction{Type: SET_FIELD, Action: &ofp.OfpAction_SetField{SetField: &ofp.OfpActionSetField{Field: actionSetField}}}
+}
+
+func Experimenter(experimenter uint32, data []byte) *ofp.OfpAction {
+	return &ofp.OfpAction{Type: EXPERIMENTER, Action: &ofp.OfpAction_Experimenter{Experimenter: &ofp.OfpActionExperimenter{Experimenter: experimenter, Data: data}}}
+}
+
+//ofb_field generators (incomplete set)
+
+func InPort(inPort uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IN_PORT, Value: &ofp.OfpOxmOfbField_Port{Port: inPort}}
+}
+
+func InPhyPort(inPhyPort uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IN_PHY_PORT, Value: &ofp.OfpOxmOfbField_Port{Port: inPhyPort}}
+}
+
+func Metadata_ofp(tableMetadata uint64) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: METADATA, Value: &ofp.OfpOxmOfbField_TableMetadata{TableMetadata: tableMetadata}}
+}
+
+// should Metadata_ofp used here ?????
+func EthDst(ethDst uint64) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ETH_DST, Value: &ofp.OfpOxmOfbField_TableMetadata{TableMetadata: ethDst}}
+}
+
+// should Metadata_ofp used here ?????
+func EthSrc(ethSrc uint64) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ETH_SRC, Value: &ofp.OfpOxmOfbField_TableMetadata{TableMetadata: ethSrc}}
+}
+
+func EthType(ethType uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ETH_TYPE, Value: &ofp.OfpOxmOfbField_EthType{EthType: ethType}}
+}
+
+func VlanVid(vlanVid uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: VLAN_VID, Value: &ofp.OfpOxmOfbField_VlanVid{VlanVid: vlanVid}}
+}
+
+func VlanPcp(vlanPcp uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: VLAN_VID, Value: &ofp.OfpOxmOfbField_VlanPcp{VlanPcp: vlanPcp}}
+}
+
+func IpDscp(ipDscp uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IP_DSCP, Value: &ofp.OfpOxmOfbField_IpDscp{IpDscp: ipDscp}}
+}
+
+func IpEcn(ipEcn uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IP_ECN, Value: &ofp.OfpOxmOfbField_IpEcn{IpEcn: ipEcn}}
+}
+
+func IpProto(ipProto uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IP_PROTO, Value: &ofp.OfpOxmOfbField_IpProto{IpProto: ipProto}}
+}
+
+func Ipv4Src(ipv4Src uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV4_SRC, Value: &ofp.OfpOxmOfbField_Ipv4Src{Ipv4Src: ipv4Src}}
+}
+
+func Ipv4Dst(ipv4Dst uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV4_DST, Value: &ofp.OfpOxmOfbField_Ipv4Dst{Ipv4Dst: ipv4Dst}}
+}
+
+func TcpSrc(tcpSrc uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: TCP_SRC, Value: &ofp.OfpOxmOfbField_TcpSrc{TcpSrc: tcpSrc}}
+}
+
+func TcpDst(tcpDst uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: TCP_DST, Value: &ofp.OfpOxmOfbField_TcpDst{TcpDst: tcpDst}}
+}
+
+func UdpSrc(udpSrc uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: UDP_SRC, Value: &ofp.OfpOxmOfbField_UdpSrc{UdpSrc: udpSrc}}
+}
+
+func UdpDst(udpDst uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: UDP_DST, Value: &ofp.OfpOxmOfbField_UdpDst{UdpDst: udpDst}}
+}
+
+func SctpSrc(sctpSrc uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: SCTP_SRC, Value: &ofp.OfpOxmOfbField_SctpSrc{SctpSrc: sctpSrc}}
+}
+
+func SctpDst(sctpDst uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: SCTP_DST, Value: &ofp.OfpOxmOfbField_SctpDst{SctpDst: sctpDst}}
+}
+
+func Icmpv4Type(icmpv4Type uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ICMPV4_TYPE, Value: &ofp.OfpOxmOfbField_Icmpv4Type{Icmpv4Type: icmpv4Type}}
+}
+
+func Icmpv4Code(icmpv4Code uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ICMPV4_CODE, Value: &ofp.OfpOxmOfbField_Icmpv4Code{Icmpv4Code: icmpv4Code}}
+}
+
+func ArpOp(arpOp uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ARP_OP, Value: &ofp.OfpOxmOfbField_ArpOp{ArpOp: arpOp}}
+}
+
+func ArpSpa(arpSpa uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ARP_SPA, Value: &ofp.OfpOxmOfbField_ArpSpa{ArpSpa: arpSpa}}
+}
+
+func ArpTpa(arpTpa uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ARP_TPA, Value: &ofp.OfpOxmOfbField_ArpTpa{ArpTpa: arpTpa}}
+}
+
+func ArpSha(arpSha []byte) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ARP_SHA, Value: &ofp.OfpOxmOfbField_ArpSha{ArpSha: arpSha}}
+}
+
+func ArpTha(arpTha []byte) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ARP_THA, Value: &ofp.OfpOxmOfbField_ArpTha{ArpTha: arpTha}}
+}
+
+func Ipv6Src(ipv6Src []byte) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV6_SRC, Value: &ofp.OfpOxmOfbField_Ipv6Src{Ipv6Src: ipv6Src}}
+}
+
+func Ipv6Dst(ipv6Dst []byte) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV6_DST, Value: &ofp.OfpOxmOfbField_Ipv6Dst{Ipv6Dst: ipv6Dst}}
+}
+
+func Ipv6Flabel(ipv6Flabel uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV6_FLABEL, Value: &ofp.OfpOxmOfbField_Ipv6Flabel{Ipv6Flabel: ipv6Flabel}}
+}
+
+func Icmpv6Type(icmpv6Type uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ICMPV6_TYPE, Value: &ofp.OfpOxmOfbField_Icmpv6Type{Icmpv6Type: icmpv6Type}}
+}
+
+func Icmpv6Code(icmpv6Code uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: ICMPV6_CODE, Value: &ofp.OfpOxmOfbField_Icmpv6Code{Icmpv6Code: icmpv6Code}}
+}
+
+func Ipv6NdTarget(ipv6NdTarget []byte) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV6_ND_TARGET, Value: &ofp.OfpOxmOfbField_Ipv6NdTarget{Ipv6NdTarget: ipv6NdTarget}}
+}
+
+func OfbIpv6NdSll(ofbIpv6NdSll []byte) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: OFB_IPV6_ND_SLL, Value: &ofp.OfpOxmOfbField_Ipv6NdSsl{Ipv6NdSsl: ofbIpv6NdSll}}
+}
+
+func Ipv6NdTll(ipv6NdTll []byte) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV6_ND_TLL, Value: &ofp.OfpOxmOfbField_Ipv6NdTll{Ipv6NdTll: ipv6NdTll}}
+}
+
+func MplsLabel(mplsLabel uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: MPLS_LABEL, Value: &ofp.OfpOxmOfbField_MplsLabel{MplsLabel: mplsLabel}}
+}
+
+func MplsTc(mplsTc uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: MPLS_TC, Value: &ofp.OfpOxmOfbField_MplsTc{MplsTc: mplsTc}}
+}
+
+func MplsBos(mplsBos uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: MPLS_BOS, Value: &ofp.OfpOxmOfbField_MplsBos{MplsBos: mplsBos}}
+}
+
+func PbbIsid(pbbIsid uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: PBB_ISID, Value: &ofp.OfpOxmOfbField_PbbIsid{PbbIsid: pbbIsid}}
+}
+
+func TunnelId(tunnelId uint64) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: TUNNEL_ID, Value: &ofp.OfpOxmOfbField_TunnelId{TunnelId: tunnelId}}
+}
+
+func Ipv6Exthdr(ipv6Exthdr uint32) *ofp.OfpOxmOfbField {
+	return &ofp.OfpOxmOfbField{Type: IPV6_EXTHDR, Value: &ofp.OfpOxmOfbField_Ipv6Exthdr{Ipv6Exthdr: ipv6Exthdr}}
+}
+
+//frequently used extractors
+
+func GetActions(flow *ofp.OfpFlowStats) []*ofp.OfpAction {
+	if flow == nil {
+		return nil
+	}
+	for _, instruction := range flow.Instructions {
+		if instruction.Type == uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS) {
+			instActions := instruction.GetActions()
+			if instActions == nil {
+				return nil
+			}
+			return instActions.Actions
+		}
+	}
+	return nil
+}
+
+func GetOfbFields(flow *ofp.OfpFlowStats) []*ofp.OfpOxmOfbField {
+	if flow == nil || flow.Match == nil || flow.Match.Type != ofp.OfpMatchType_OFPMT_OXM {
+		return nil
+	}
+	ofbFields := make([]*ofp.OfpOxmOfbField, 0)
+	for _, field := range flow.Match.OxmFields {
+		if field.OxmClass == ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
+			ofbFields = append(ofbFields, field.GetOfbField())
+		}
+	}
+	return ofbFields
+}
+
+func GetOutPort(flow *ofp.OfpFlowStats) uint32 {
+	if flow == nil {
+		return 0
+	}
+	for _, action := range GetActions(flow) {
+		if action.Type == OUTPUT {
+			out := action.GetOutput()
+			if out == nil {
+				return 0
+			}
+			return out.GetPort()
+		}
+	}
+	return 0
+}
+
+func GetInPort(flow *ofp.OfpFlowStats) uint32 {
+	if flow == nil {
+		return 0
+	}
+	for _, field := range GetOfbFields(flow) {
+		if field.Type == IN_PORT {
+			return field.GetPort()
+		}
+	}
+	return 0
+}
+
+func GetGotoTableId(flow *ofp.OfpFlowStats) uint32 {
+	if flow == nil {
+		return 0
+	}
+	for _, instruction := range flow.Instructions {
+		if instruction.Type == uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE) {
+			gotoTable := instruction.GetGotoTable()
+			if gotoTable == nil {
+				return 0
+			}
+			return gotoTable.GetTableId()
+		}
+	}
+	return 0
+}
+
+//GetMetaData - legacy get method (only want lower 32 bits)
+func GetMetaData(flow *ofp.OfpFlowStats) uint32 {
+	if flow == nil {
+		return 0
+	}
+	for _, field := range GetOfbFields(flow) {
+		if field.Type == METADATA {
+			return uint32(field.GetTableMetadata() & 0xffffffff)
+		}
+	}
+	return 0
+}
+
+func GetMetaData64Bit(flow *ofp.OfpFlowStats) uint64 {
+	if flow == nil {
+		return 0
+	}
+	for _, field := range GetOfbFields(flow) {
+		if field.Type == METADATA {
+			return field.GetTableMetadata()
+		}
+	}
+	return 0
+}
+
+// GetPortNumberFromMetadata retrieves the port number from the Metadata_ofp. The port number (UNI on ONU) is in the
+// lower 32-bits of Metadata_ofp and the inner_tag is in the upper 32-bits. This is set in the ONOS OltPipeline as
+// a Metadata_ofp field
+func GetPortNumberFromMetadata(flow *ofp.OfpFlowStats) uint64 {
+	md := GetMetaData64Bit(flow)
+	if md == 0 {
+		return 0
+	}
+	if md <= 0xffffffff {
+		log.Warnw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
+		return md
+	}
+	return md & 0xffffffff
+}
+
+//GetInnerTagFromMetaData retrieves the inner tag from the Metadata_ofp. The port number (UNI on ONU) is in the
+// lower 32-bits of Metadata_ofp and the inner_tag is in the upper 32-bits. This is set in the ONOS OltPipeline as
+//// a Metadata_ofp field
+func GetInnerTagFromMetaData(flow *ofp.OfpFlowStats) uint64 {
+	md := GetMetaData64Bit(flow)
+	if md == 0 {
+		return 0
+	}
+	if md <= 0xffffffff {
+		log.Warnw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
+		return md
+	}
+	return (md >> 32) & 0xffffffff
+}
+
+func HasNextTable(flow *ofp.OfpFlowStats) bool {
+	if flow == nil {
+		return false
+	}
+	return GetGotoTableId(flow) != 0
+}
+
+func GetGroup(flow *ofp.OfpFlowStats) uint32 {
+	if flow == nil {
+		return 0
+	}
+	for _, action := range GetActions(flow) {
+		if action.Type == GROUP {
+			grp := action.GetGroup()
+			if grp == nil {
+				return 0
+			}
+			return grp.GetGroupId()
+		}
+	}
+	return 0
+}
+
+func HasGroup(flow *ofp.OfpFlowStats) bool {
+	return GetGroup(flow) != 0
+}
+
+// GetNextTableId returns the next table ID if the "table_id" is present in the map, otherwise return nil
+func GetNextTableId(kw fu.OfpFlowModArgs) *uint32 {
+	if val, exist := kw["table_id"]; exist {
+		ret := uint32(val)
+		return &ret
+	}
+	return nil
+}
+
+// Return unique 64-bit integer hash for flow covering the following attributes:
+// 'table_id', 'priority', 'flags', 'cookie', 'match', '_instruction_string'
+func hashFlowStats(flow *ofp.OfpFlowStats) uint64 {
+	if flow == nil { // Should never happen
+		return 0
+	}
+	// Create string with the instructions field first
+	var instructionString bytes.Buffer
+	for _, instruction := range flow.Instructions {
+		instructionString.WriteString(instruction.String())
+	}
+	var flowString = fmt.Sprintf("%d%d%d%d%s%s", flow.TableId, flow.Priority, flow.Flags, flow.Cookie, flow.Match.String(), instructionString.String())
+	h := md5.New()
+	h.Write([]byte(flowString))
+	hash := big.NewInt(0)
+	hash.SetBytes(h.Sum(nil))
+	return hash.Uint64()
+}
+
+// flowStatsEntryFromFlowModMessage maps an ofp_flow_mod message to an ofp_flow_stats message
+func FlowStatsEntryFromFlowModMessage(mod *ofp.OfpFlowMod) *ofp.OfpFlowStats {
+	flow := &ofp.OfpFlowStats{}
+	if mod == nil {
+		return flow
+	}
+	flow.TableId = mod.TableId
+	flow.Priority = mod.Priority
+	flow.IdleTimeout = mod.IdleTimeout
+	flow.HardTimeout = mod.HardTimeout
+	flow.Flags = mod.Flags
+	flow.Cookie = mod.Cookie
+	flow.Match = mod.Match
+	flow.Instructions = mod.Instructions
+	flow.Id = hashFlowStats(flow)
+	return flow
+}
+
+func GroupEntryFromGroupMod(mod *ofp.OfpGroupMod) *ofp.OfpGroupEntry {
+	group := &ofp.OfpGroupEntry{}
+	if mod == nil {
+		return group
+	}
+	group.Desc = &ofp.OfpGroupDesc{Type: mod.Type, GroupId: mod.GroupId, Buckets: mod.Buckets}
+	group.Stats = &ofp.OfpGroupStats{GroupId: mod.GroupId}
+	return group
+}
+
+func MkOxmFields(matchFields []ofp.OfpOxmField) []*ofp.OfpOxmField {
+	oxmFields := make([]*ofp.OfpOxmField, 0)
+	for _, matchField := range matchFields {
+		oxmField := ofp.OfpOxmField{OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC, Field: matchField.Field}
+		oxmFields = append(oxmFields, &oxmField)
+	}
+	return oxmFields
+}
+
+func MkInstructionsFromActions(actions []*ofp.OfpAction) []*ofp.OfpInstruction {
+	instructions := make([]*ofp.OfpInstruction, 0)
+	instructionAction := ofp.OfpInstruction_Actions{Actions: &ofp.OfpInstructionActions{Actions: actions}}
+	instruction := ofp.OfpInstruction{Type: uint32(APPLY_ACTIONS), Data: &instructionAction}
+	instructions = append(instructions, &instruction)
+	return instructions
+}
+
+// Convenience function to generare ofp_flow_mod message with OXM BASIC match composed from the match_fields, and
+// single APPLY_ACTIONS instruction with a list if ofp_action objects.
+func MkSimpleFlowMod(matchFields []*ofp.OfpOxmField, actions []*ofp.OfpAction, command *ofp.OfpFlowModCommand, kw fu.OfpFlowModArgs) *ofp.OfpFlowMod {
+
+	// Process actions instructions
+	instructions := make([]*ofp.OfpInstruction, 0)
+	instructionAction := ofp.OfpInstruction_Actions{Actions: &ofp.OfpInstructionActions{Actions: actions}}
+	instruction := ofp.OfpInstruction{Type: uint32(APPLY_ACTIONS), Data: &instructionAction}
+	instructions = append(instructions, &instruction)
+
+	// Process next table
+	if tableId := GetNextTableId(kw); tableId != nil {
+		var instGotoTable ofp.OfpInstruction_GotoTable
+		instGotoTable.GotoTable = &ofp.OfpInstructionGotoTable{TableId: *tableId}
+		inst := ofp.OfpInstruction{Type: uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE), Data: &instGotoTable}
+		instructions = append(instructions, &inst)
+	}
+
+	// Process match fields
+	oxmFields := make([]*ofp.OfpOxmField, 0)
+	for _, matchField := range matchFields {
+		oxmField := ofp.OfpOxmField{OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC, Field: matchField.Field}
+		oxmFields = append(oxmFields, &oxmField)
+	}
+	var match ofp.OfpMatch
+	match.Type = ofp.OfpMatchType_OFPMT_OXM
+	match.OxmFields = oxmFields
+
+	// Create ofp_flow_message
+	msg := &ofp.OfpFlowMod{}
+	if command == nil {
+		msg.Command = ofp.OfpFlowModCommand_OFPFC_ADD
+	} else {
+		msg.Command = *command
+	}
+	msg.Instructions = instructions
+	msg.Match = &match
+
+	// Set the variadic argument values
+	msg = setVariadicModAttributes(msg, kw)
+
+	return msg
+}
+
+func MkMulticastGroupMod(groupId uint32, buckets []*ofp.OfpBucket, command *ofp.OfpGroupModCommand) *ofp.OfpGroupMod {
+	group := &ofp.OfpGroupMod{}
+	if command == nil {
+		group.Command = ofp.OfpGroupModCommand_OFPGC_ADD
+	} else {
+		group.Command = *command
+	}
+	group.Type = ofp.OfpGroupType_OFPGT_ALL
+	group.GroupId = groupId
+	group.Buckets = buckets
+	return group
+}
+
+//SetVariadicModAttributes sets only uint64 or uint32 fields of the ofp_flow_mod message
+func setVariadicModAttributes(mod *ofp.OfpFlowMod, args fu.OfpFlowModArgs) *ofp.OfpFlowMod {
+	if args == nil {
+		return mod
+	}
+	for key, val := range args {
+		switch key {
+		case "cookie":
+			mod.Cookie = val
+		case "cookie_mask":
+			mod.CookieMask = val
+		case "table_id":
+			mod.TableId = uint32(val)
+		case "idle_timeout":
+			mod.IdleTimeout = uint32(val)
+		case "hard_timeout":
+			mod.HardTimeout = uint32(val)
+		case "priority":
+			mod.Priority = uint32(val)
+		case "buffer_id":
+			mod.BufferId = uint32(val)
+		case "out_port":
+			mod.OutPort = uint32(val)
+		case "out_group":
+			mod.OutGroup = uint32(val)
+		case "flags":
+			mod.Flags = uint32(val)
+		}
+	}
+	return mod
+}
+
+// MkFlowStat is a helper method to build flows
+func MkFlowStat(fa *fu.FlowArgs) *ofp.OfpFlowStats {
+	//Build the matchfields
+	matchFields := make([]*ofp.OfpOxmField, 0)
+	for _, val := range fa.MatchFields {
+		matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
+	}
+	return FlowStatsEntryFromFlowModMessage(MkSimpleFlowMod(matchFields, fa.Actions, fa.Command, fa.KV))
+}
+
+func MkGroupStat(groupId uint32, buckets []*ofp.OfpBucket, command *ofp.OfpGroupModCommand) *ofp.OfpGroupEntry {
+	return GroupEntryFromGroupMod(MkMulticastGroupMod(groupId, buckets, command))
+}
+
+type FlowDecomposer struct {
+	deviceMgr coreIf.DeviceManager
+}
+
+func NewFlowDecomposer(deviceMgr coreIf.DeviceManager) *FlowDecomposer {
+	var decomposer FlowDecomposer
+	decomposer.deviceMgr = deviceMgr
+	return &decomposer
+}
+
+//DecomposeRules decomposes per-device flows and flow-groups from the flows and groups defined on a logical device
+func (fd *FlowDecomposer) DecomposeRules(agent coreIf.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups) *fu.DeviceRules {
+	rules := agent.GetAllDefaultRules()
+	deviceRules := rules.Copy()
+
+	groupMap := make(map[uint32]*ofp.OfpGroupEntry)
+	for _, groupEntry := range groups.Items {
+		groupMap[groupEntry.Desc.GroupId] = groupEntry
+	}
+
+	var decomposedRules *fu.DeviceRules
+	for _, flow := range flows.Items {
+		decomposedRules = fd.decomposeFlow(agent, flow, groupMap)
+		for deviceId, flowAndGroups := range decomposedRules.Rules {
+			fmt.Println("!!!!!", deviceId, flowAndGroups)
+			deviceRules.Rules[deviceId] = fu.NewFlowsAndGroups()
+			deviceRules.Rules[deviceId].AddFrom(flowAndGroups)
+		}
+	}
+	return deviceRules
+}
+
+func (fd *FlowDecomposer) processControllerBoundFlow(agent coreIf.LogicalDeviceAgent, route []graph.RouteHop, inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules) *fu.DeviceRules {
+	log.Debugw("trap-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "flow": flow})
+	fg := fu.NewFlowsAndGroups()
+	if agent.GetDeviceGraph().IsRootPort(inPortNo) {
+		log.Debug("trap-nni")
+		var fa *fu.FlowArgs
+		fa = &fu.FlowArgs{
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			MatchFields: []*ofp.OfpOxmOfbField{
+				InPort(route[1].Egress), //egress_hop.egress_port.port_no
+			},
+			Actions: GetActions(flow),
+		}
+		// Augment the matchfields with the ofpfields from the flow
+		for _, val := range GetOfbFields(flow) {
+			if val.Type != IN_PORT {
+				fa.MatchFields = append(fa.MatchFields, val)
+			}
+		}
+		fg.AddFlow(MkFlowStat(fa))
+	} else {
+		// Trap flow for UNI port
+		log.Debug("trap-uni")
+
+		//inPortNo is 0 for wildcard input case, do not include upstream port for 4000 flow in input
+		var inPorts []uint32
+		if inPortNo == 0 {
+			inPorts = agent.GetWildcardInputPorts(route[1].Egress) // exclude egress_hop.egress_port.port_no
+		} else {
+			inPorts = []uint32{inPortNo}
+		}
+		for _, inputPort := range inPorts {
+			var fa *fu.FlowArgs
+			// Upstream flow
+			fa = &fu.FlowArgs{
+				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+				MatchFields: []*ofp.OfpOxmOfbField{
+					InPort(route[1].Ingress), //egress_hop.ingress_port.port_no
+					VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | inputPort),
+				},
+				Actions: []*ofp.OfpAction{
+					PushVlan(0x8100),
+					SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000)),
+					Output(route[1].Egress),
+				},
+			}
+			// Augment the matchfields with the ofpfields from the flow
+			for _, val := range GetOfbFields(flow) {
+				if val.Type != IN_PORT && val.Type != VLAN_VID {
+					fa.MatchFields = append(fa.MatchFields, val)
+				}
+			}
+			fg.AddFlow(MkFlowStat(fa))
+
+			// Downstream flow
+			fa = &fu.FlowArgs{
+				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority)},
+				MatchFields: []*ofp.OfpOxmOfbField{
+					InPort(route[1].Egress), //egress_hop.ingress_port.port_no
+					VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
+					VlanPcp(0),
+					Metadata_ofp(uint64(inputPort)),
+				},
+				Actions: []*ofp.OfpAction{
+					PopVlan(),
+					Output(route[1].Ingress),
+				},
+			}
+			fg.AddFlow(MkFlowStat(fa))
+		}
+	}
+	deviceRules.AddFlowsAndGroup(route[1].DeviceID, fg)
+	return deviceRules
+}
+
+// processUpstreamNonControllerBoundFlow processes non-controller bound flow. We assume that anything that is
+// upstream needs to get Q-in-Q treatment and that this is expressed via two flow rules, the first using the
+// goto-statement. We also assume that the inner tag is applied at the ONU, while the outer tag is
+// applied at the OLT
+func (fd *FlowDecomposer) processUpstreamNonControllerBoundFlow(agent coreIf.LogicalDeviceAgent, route []graph.RouteHop, inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules) *fu.DeviceRules {
+	log.Debugw("upstream-non-controller-bound-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
+	if HasNextTable(flow) {
+		log.Debugw("has-next-table", log.Fields{"table_id": flow.TableId})
+		if outPortNo != 0 {
+			log.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
+		}
+		var fa *fu.FlowArgs
+		fa = &fu.FlowArgs{
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			MatchFields: []*ofp.OfpOxmOfbField{
+				InPort(route[0].Ingress), //ingress_hop.ingress_port.port_no
+			},
+			Actions: GetActions(flow),
+		}
+		// Augment the matchfields with the ofpfields from the flow
+		for _, val := range GetOfbFields(flow) {
+			if val.Type != IN_PORT {
+				fa.MatchFields = append(fa.MatchFields, val)
+			}
+		}
+		// Agument the Actions
+		fa.Actions = append(fa.Actions, Output(route[0].Egress))
+		fg := fu.NewFlowsAndGroups()
+		fg.AddFlow(MkFlowStat(fa))
+		deviceRules.AddFlowsAndGroup(route[0].DeviceID, fg)
+	} else {
+		var actions []ofp.OfpActionType
+		var isOutputTypeInActions bool
+		for _, action := range GetActions(flow) {
+			actions = append(actions, action.Type)
+			if !isOutputTypeInActions && action.Type == OUTPUT {
+				isOutputTypeInActions = true
+			}
+		}
+		if len(actions) == 1 && isOutputTypeInActions {
+			var fa *fu.FlowArgs
+			// child device flow
+			fa = &fu.FlowArgs{
+				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+				MatchFields: []*ofp.OfpOxmOfbField{
+					InPort(route[0].Ingress), //ingress_hop.ingress_port.port_no
+				},
+				Actions: []*ofp.OfpAction{
+					Output(route[0].Egress),
+				},
+			}
+			// Augment the matchfields with the ofpfields from the flow
+			for _, val := range GetOfbFields(flow) {
+				if val.Type != IN_PORT {
+					fa.MatchFields = append(fa.MatchFields, val)
+				}
+			}
+			fg := fu.NewFlowsAndGroups()
+			fg.AddFlow(MkFlowStat(fa))
+			deviceRules.AddFlowsAndGroup(route[0].DeviceID, fg)
+
+			// parent device flow
+			fa = &fu.FlowArgs{
+				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+				MatchFields: []*ofp.OfpOxmOfbField{
+					InPort(route[1].Ingress), //egress_hop.ingress_port.port_no
+				},
+				Actions: []*ofp.OfpAction{
+					Output(route[1].Egress),
+				},
+			}
+			// Augment the matchfields with the ofpfields from the flow
+			for _, val := range GetOfbFields(flow) {
+				if val.Type != IN_PORT {
+					fa.MatchFields = append(fa.MatchFields, val)
+				}
+			}
+			fg = fu.NewFlowsAndGroups()
+			fg.AddFlow(MkFlowStat(fa))
+			deviceRules.AddFlowsAndGroup(route[1].DeviceID, fg)
+		} else {
+			if outPortNo == 0 {
+				log.Warnw("outPort-should-be-specified", log.Fields{"outPortNo": outPortNo})
+			}
+			var fa *fu.FlowArgs
+			fa = &fu.FlowArgs{
+				KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+				MatchFields: []*ofp.OfpOxmOfbField{
+					InPort(route[1].Ingress), //egress_hop.ingress_port.port_no
+				},
+			}
+			// Augment the matchfields with the ofpfields from the flow
+			for _, val := range GetOfbFields(flow) {
+				if val.Type != IN_PORT {
+					fa.MatchFields = append(fa.MatchFields, val)
+				}
+			}
+			// Augment the Actions
+			updatedAction := make([]*ofp.OfpAction, 0)
+			for _, action := range GetActions(flow) {
+				if action.Type != OUTPUT {
+					updatedAction = append(updatedAction, action)
+				}
+			}
+			updatedAction = append(updatedAction, Output(route[1].Egress))
+			fa.Actions = updatedAction
+			fg := fu.NewFlowsAndGroups()
+			fg.AddFlow(MkFlowStat(fa))
+			deviceRules.AddFlowsAndGroup(route[1].DeviceID, fg)
+		}
+	}
+	return deviceRules
+}
+
+func (fd *FlowDecomposer) processDownstreamFlowWithNextTable(agent coreIf.LogicalDeviceAgent, route []graph.RouteHop, inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules) *fu.DeviceRules {
+	log.Debugw("downstream-flow-with-next-table", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
+	if outPortNo != 0 {
+		log.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
+	}
+	ingressHop := route[0]
+	if GetMetaData(flow) != 0 {
+		log.Debugw("creating-metadata-flow", log.Fields{"flow": flow})
+		portNumber := uint32(GetPortNumberFromMetadata(flow))
+		if portNumber != 0 {
+			recalculatedRoute := agent.GetRoute(&inPortNo, &portNumber)
+			switch len(recalculatedRoute) {
+			case 0:
+				log.Errorw("no-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": GetMetaData64Bit(flow)})
+				//	TODO: Delete flow
+				return deviceRules
+			case 2:
+				log.Debugw("route-found", log.Fields{"ingressHop": route[0], "egressHop": route[1]})
+				break
+			default:
+				log.Errorw("invalid-route-length", log.Fields{"routeLen": len(route)})
+				return deviceRules
+			}
+			ingressHop = recalculatedRoute[0]
+		}
+		innerTag := GetInnerTagFromMetaData(flow)
+		if innerTag == 0 {
+			log.Errorw("no-inner-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": GetMetaData64Bit(flow)})
+			//	TODO: Delete flow
+			return deviceRules
+		}
+		var fa *fu.FlowArgs
+		fa = &fu.FlowArgs{
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			MatchFields: []*ofp.OfpOxmOfbField{
+				InPort(ingressHop.Ingress), //ingress_hop.ingress_port.port_no
+				Metadata_ofp(innerTag),
+			},
+			Actions: GetActions(flow),
+		}
+		// Augment the matchfields with the ofpfields from the flow
+		for _, val := range GetOfbFields(flow) {
+			if val.Type != IN_PORT && val.Type != METADATA {
+				fa.MatchFields = append(fa.MatchFields, val)
+			}
+		}
+		// Agument the Actions
+		fa.Actions = append(fa.Actions, Output(ingressHop.Egress))
+		fg := fu.NewFlowsAndGroups()
+		fg.AddFlow(MkFlowStat(fa))
+		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
+	} else { // Create standard flow
+		log.Debugw("creating-standard-flow", log.Fields{"flow": flow})
+		var fa *fu.FlowArgs
+		fa = &fu.FlowArgs{
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			MatchFields: []*ofp.OfpOxmOfbField{
+				InPort(ingressHop.Ingress), //ingress_hop.ingress_port.port_no
+			},
+			Actions: GetActions(flow),
+		}
+		// Augment the matchfields with the ofpfields from the flow
+		for _, val := range GetOfbFields(flow) {
+			if val.Type != IN_PORT {
+				fa.MatchFields = append(fa.MatchFields, val)
+			}
+		}
+		// Agument the Actions
+		fa.Actions = append(fa.Actions, Output(ingressHop.Egress))
+		fg := fu.NewFlowsAndGroups()
+		fg.AddFlow(MkFlowStat(fa))
+		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
+	}
+	return deviceRules
+}
+
+func (fd *FlowDecomposer) processUnicastFlow(agent coreIf.LogicalDeviceAgent, route []graph.RouteHop, inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules) *fu.DeviceRules {
+	log.Debugw("unicast-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
+	var actions []ofp.OfpActionType
+	var isOutputTypeInActions bool
+	for _, action := range GetActions(flow) {
+		actions = append(actions, action.Type)
+		if !isOutputTypeInActions && action.Type == OUTPUT {
+			isOutputTypeInActions = true
+		}
+	}
+	if len(actions) == 1 && isOutputTypeInActions {
+		var fa *fu.FlowArgs
+		// Parent device flow
+		fa = &fu.FlowArgs{
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			MatchFields: []*ofp.OfpOxmOfbField{
+				InPort(route[0].Ingress), //ingress_hop.ingress_port.port_no
+			},
+			Actions: []*ofp.OfpAction{
+				Output(route[0].Egress),
+			},
+		}
+		// Augment the matchfields with the ofpfields from the flow
+		for _, val := range GetOfbFields(flow) {
+			if val.Type != IN_PORT {
+				fa.MatchFields = append(fa.MatchFields, val)
+			}
+		}
+		fg := fu.NewFlowsAndGroups()
+		fg.AddFlow(MkFlowStat(fa))
+		deviceRules.AddFlowsAndGroup(route[0].DeviceID, fg)
+
+		// Child device flow
+		fa = &fu.FlowArgs{
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			MatchFields: []*ofp.OfpOxmOfbField{
+				InPort(route[1].Ingress), //egress_hop.ingress_port.port_no
+			},
+			Actions: []*ofp.OfpAction{
+				Output(route[1].Egress),
+			},
+		}
+		// Augment the matchfields with the ofpfields from the flow
+		for _, val := range GetOfbFields(flow) {
+			if val.Type != IN_PORT {
+				fa.MatchFields = append(fa.MatchFields, val)
+			}
+		}
+		fg = fu.NewFlowsAndGroups()
+		fg.AddFlow(MkFlowStat(fa))
+		deviceRules.AddFlowsAndGroup(route[1].DeviceID, fg)
+	} else {
+		var fa *fu.FlowArgs
+		fa = &fu.FlowArgs{
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			MatchFields: []*ofp.OfpOxmOfbField{
+				InPort(route[1].Ingress), //egress_hop.ingress_port.port_no
+			},
+		}
+		// Augment the matchfields with the ofpfields from the flow
+		for _, val := range GetOfbFields(flow) {
+			if val.Type != IN_PORT {
+				fa.MatchFields = append(fa.MatchFields, val)
+			}
+		}
+		// Augment the Actions
+		updatedAction := make([]*ofp.OfpAction, 0)
+		for _, action := range GetActions(flow) {
+			if action.Type != OUTPUT {
+				updatedAction = append(updatedAction, action)
+			}
+		}
+		updatedAction = append(updatedAction, Output(route[1].Egress))
+		fa.Actions = updatedAction
+		fg := fu.NewFlowsAndGroups()
+		fg.AddFlow(MkFlowStat(fa))
+		deviceRules.AddFlowsAndGroup(route[1].DeviceID, fg)
+	}
+	return deviceRules
+}
+
+func (fd *FlowDecomposer) processMulticastFlow(agent coreIf.LogicalDeviceAgent, route []graph.RouteHop, inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, grpId uint32, groupMap map[uint32]*ofp.OfpGroupEntry) *fu.DeviceRules {
+	log.Debugw("multicast-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
+
+	//having no Group yet is the same as having a Group with no buckets
+	var grp *ofp.OfpGroupEntry
+	var ok bool
+	if grp, ok = groupMap[grpId]; !ok {
+		log.Warnw("Group-id-not-present-in-map", log.Fields{"grpId": grpId, "groupMap": groupMap})
+		return deviceRules
+	}
+	if grp == nil || grp.Desc == nil {
+		log.Warnw("Group-or-desc-nil", log.Fields{"grpId": grpId, "grp": grp})
+		return deviceRules
+	}
+	for _, bucket := range grp.Desc.Buckets {
+		otherActions := make([]*ofp.OfpAction, 0)
+		for _, action := range bucket.Actions {
+			if action.Type == OUTPUT {
+				outPortNo = action.GetOutput().Port
+			} else if action.Type != POP_VLAN {
+				otherActions = append(otherActions, action)
+			}
+		}
+
+		route2 := agent.GetRoute(&inPortNo, &outPortNo)
+		switch len(route2) {
+		case 0:
+			log.Errorw("mc-no-route", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "comment": "deleting flow"})
+			//	TODO: Delete flow
+			return deviceRules
+		case 2:
+			log.Debugw("route-found", log.Fields{"ingressHop": route[0], "egressHop": route[1]})
+			break
+		default:
+			log.Errorw("invalid-route-length", log.Fields{"routeLen": len(route)})
+			return deviceRules
+		}
+
+		if route[0].Ingress != route2[0].Ingress {
+			log.Errorw("mc-ingress-hop-hop2-mismatch", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "comment": "ignoring flow"})
+			return deviceRules
+		}
+		// Set the parent device flow
+		var fa *fu.FlowArgs
+		fa = &fu.FlowArgs{
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			MatchFields: []*ofp.OfpOxmOfbField{
+				InPort(route[0].Ingress), //ingress_hop.ingress_port.port_no
+			},
+		}
+		// Augment the matchfields with the ofpfields from the flow
+		for _, val := range GetOfbFields(flow) {
+			if val.Type != IN_PORT {
+				fa.MatchFields = append(fa.MatchFields, val)
+			}
+		}
+		// Augment the Actions
+		updatedAction := make([]*ofp.OfpAction, 0)
+		for _, action := range GetActions(flow) {
+			if action.Type != GROUP {
+				updatedAction = append(updatedAction, action)
+			}
+		}
+		updatedAction = append(updatedAction, PopVlan())
+		updatedAction = append(updatedAction, Output(route[1].Ingress))
+		fa.Actions = updatedAction
+		fg := fu.NewFlowsAndGroups()
+		fg.AddFlow(MkFlowStat(fa))
+		deviceRules.AddFlowsAndGroup(route[0].DeviceID, fg)
+
+		// Set the child device flow
+		fa = &fu.FlowArgs{
+			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
+			MatchFields: []*ofp.OfpOxmOfbField{
+				InPort(route[1].Ingress), //egress_hop.ingress_port.port_no
+			},
+		}
+		// Augment the matchfields with the ofpfields from the flow
+		for _, val := range GetOfbFields(flow) {
+			if val.Type != IN_PORT && val.Type != VLAN_VID && val.Type != VLAN_PCP {
+				fa.MatchFields = append(fa.MatchFields, val)
+			}
+		}
+		// Augment the Actions
+		otherActions = append(otherActions, Output(route[1].Egress))
+		fa.Actions = otherActions
+		fg = fu.NewFlowsAndGroups()
+		fg.AddFlow(MkFlowStat(fa))
+		deviceRules.AddFlowsAndGroup(route[1].DeviceID, fg)
+	}
+	return deviceRules
+}
+
+func (fd *FlowDecomposer) decomposeFlow(agent coreIf.LogicalDeviceAgent, flow *ofp.OfpFlowStats, groupMap map[uint32]*ofp.OfpGroupEntry) *fu.DeviceRules {
+	inPortNo := GetInPort(flow)
+	outPortNo := GetOutPort(flow)
+
+	deviceRules := fu.NewDeviceRules()
+
+	route := agent.GetRoute(&inPortNo, &outPortNo)
+	switch len(route) {
+	case 0:
+		log.Errorw("no-route", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "comment": "deleting-flow"})
+		//	TODO: Delete flow
+		return deviceRules
+	case 2:
+		log.Debugw("route-found", log.Fields{"ingressHop": route[0], "egressHop": route[1]})
+		break
+	default:
+		log.Errorw("invalid-route-length", log.Fields{"routeLen": len(route)})
+		return deviceRules
+	}
+
+	var ingressDevice *voltha.Device
+	//var egressDevice *voltha.Device
+	var err error
+	if ingressDevice, err = fd.deviceMgr.GetDevice(route[0].DeviceID); err != nil {
+		log.Errorw("ingress-device-not-found", log.Fields{"deviceId": route[0].DeviceID})
+		return deviceRules
+	}
+	//if egressDevice, err = fd.deviceMgr.getDevice(route[1].DeviceID); err != nil {
+	//	log.Errorw("egress-device-not-found", log.Fields{"deviceId": route[1].DeviceID})
+	//	return deviceRules
+	//}
+
+	isDownstream := ingressDevice.Root
+	isUpstream := !isDownstream
+
+	// Process controller bound flow
+	if outPortNo != 0 && (outPortNo&0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER) {
+		deviceRules = fd.processControllerBoundFlow(agent, route, inPortNo, outPortNo, flow, deviceRules)
+	} else {
+		if isUpstream {
+			deviceRules = fd.processUpstreamNonControllerBoundFlow(agent, route, inPortNo, outPortNo, flow, deviceRules)
+		} else if HasNextTable(flow) {
+			deviceRules = fd.processDownstreamFlowWithNextTable(agent, route, inPortNo, outPortNo, flow, deviceRules)
+		} else if outPortNo != 0 { // Unicast
+			deviceRules = fd.processUnicastFlow(agent, route, inPortNo, outPortNo, flow, deviceRules)
+		} else if grpId := GetGroup(flow); grpId != 0 { //Multicast
+			deviceRules = fd.processMulticastFlow(agent, route, inPortNo, outPortNo, flow, deviceRules, grpId, groupMap)
+		}
+	}
+	return deviceRules
+}
diff --git a/rw_core/flow_decomposition/flow_decomposer_test.go b/rw_core/flow_decomposition/flow_decomposer_test.go
new file mode 100644
index 0000000..b2ba777
--- /dev/null
+++ b/rw_core/flow_decomposition/flow_decomposer_test.go
@@ -0,0 +1,665 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flow_decomposition
+
+import (
+	"errors"
+	"github.com/opencord/voltha-go/common/log"
+	ofp "github.com/opencord/voltha-go/protos/openflow_13"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"github.com/opencord/voltha-go/rw_core/graph"
+	fu "github.com/opencord/voltha-go/rw_core/utils"
+	"github.com/stretchr/testify/assert"
+
+	"testing"
+)
+
+const (
+	maxOnuOnPort4 int = 1
+	maxOnuOnPort5 int = 1
+)
+
+func init() {
+	log.AddPackage(log.JSON, log.DebugLevel, nil)
+	log.UpdateAllLoggers(log.Fields{"instanceId": "flow-descomposition"})
+	log.SetAllLogLevel(log.DebugLevel)
+}
+
+type testDeviceManager struct {
+	devices map[string]*voltha.Device
+}
+
+func newTestDeviceManager() *testDeviceManager {
+	var tdm testDeviceManager
+	tdm.devices = make(map[string]*voltha.Device)
+	tdm.devices["olt"] = &voltha.Device{
+		Id:       "olt",
+		Root:     true,
+		ParentId: "logical_device",
+		Ports: []*voltha.Port{
+			&voltha.Port{PortNo: 1, Label: "pon"},
+			&voltha.Port{PortNo: 2, Label: "nni"},
+		},
+	}
+	tdm.devices["onu1"] = &voltha.Device{
+		Id:       "onu1",
+		Root:     false,
+		ParentId: "olt",
+		Ports: []*voltha.Port{
+			&voltha.Port{PortNo: 1, Label: "pon"},
+			&voltha.Port{PortNo: 2, Label: "uni"},
+		},
+	}
+	tdm.devices["onu2"] = &voltha.Device{
+		Id:       "onu2",
+		Root:     false,
+		ParentId: "olt",
+		Ports: []*voltha.Port{
+			&voltha.Port{PortNo: 1, Label: "pon"},
+			&voltha.Port{PortNo: 2, Label: "uni"},
+		},
+	}
+	tdm.devices["onu3"] = &voltha.Device{
+		Id:       "onu3",
+		Root:     false,
+		ParentId: "olt",
+		Ports: []*voltha.Port{
+			&voltha.Port{PortNo: 1, Label: "pon"},
+			&voltha.Port{PortNo: 2, Label: "uni"},
+		},
+	}
+	tdm.devices["onu4"] = &voltha.Device{
+		Id:       "onu4",
+		Root:     false,
+		ParentId: "olt",
+		Ports: []*voltha.Port{
+			&voltha.Port{PortNo: 1, Label: "pon"},
+			&voltha.Port{PortNo: 2, Label: "uni"},
+		},
+	}
+	return &tdm
+}
+
+func (tdm *testDeviceManager) GetDevice(deviceId string) (*voltha.Device, error) {
+	if d, ok := tdm.devices[deviceId]; ok {
+		return d, nil
+	}
+	return nil, errors.New("Absent")
+}
+
+type testFlowDecomposer struct {
+	dMgr         *testDeviceManager
+	logicalPorts map[uint32]*voltha.LogicalPort
+	routes       map[graph.OFPortLink][]graph.RouteHop
+	defaultRules *fu.DeviceRules
+	deviceGraph  *graph.DeviceGraph
+	fd           *FlowDecomposer
+}
+
+func newTestFlowDecomposer(deviceMgr *testDeviceManager) *testFlowDecomposer {
+	var tfd testFlowDecomposer
+	tfd.dMgr = deviceMgr
+
+	tfd.logicalPorts = make(map[uint32]*voltha.LogicalPort)
+	// Go protobuf interpreted absence of a port as 0, so we can't use port #0 as an openflow
+	// port
+	tfd.logicalPorts[10] = &voltha.LogicalPort{Id: "10", DeviceId: "olt", DevicePortNo: 2}
+	tfd.logicalPorts[1] = &voltha.LogicalPort{Id: "1", DeviceId: "onu1", DevicePortNo: 2}
+	tfd.logicalPorts[2] = &voltha.LogicalPort{Id: "2", DeviceId: "onu2", DevicePortNo: 2}
+	tfd.logicalPorts[3] = &voltha.LogicalPort{Id: "3", DeviceId: "onu3", DevicePortNo: 2}
+	tfd.logicalPorts[4] = &voltha.LogicalPort{Id: "4", DeviceId: "onu4", DevicePortNo: 2}
+
+	tfd.routes = make(map[graph.OFPortLink][]graph.RouteHop)
+
+	//DOWNSTREAM ROUTES
+
+	tfd.routes[graph.OFPortLink{Ingress: 10, Egress: 1}] = []graph.RouteHop{
+		graph.RouteHop{
+			DeviceID: "olt",
+			Ingress:  tfd.dMgr.devices["olt"].Ports[1].PortNo,
+			Egress:   tfd.dMgr.devices["olt"].Ports[0].PortNo,
+		},
+		graph.RouteHop{
+			DeviceID: "onu1",
+			Ingress:  tfd.dMgr.devices["onu1"].Ports[0].PortNo,
+			Egress:   tfd.dMgr.devices["onu1"].Ports[1].PortNo,
+		},
+	}
+
+	tfd.routes[graph.OFPortLink{Ingress: 10, Egress: 2}] = []graph.RouteHop{
+		graph.RouteHop{
+			DeviceID: "olt",
+			Ingress:  tfd.dMgr.devices["olt"].Ports[1].PortNo,
+			Egress:   tfd.dMgr.devices["olt"].Ports[0].PortNo,
+		},
+		graph.RouteHop{
+			DeviceID: "onu2",
+			Ingress:  tfd.dMgr.devices["onu2"].Ports[0].PortNo,
+			Egress:   tfd.dMgr.devices["onu2"].Ports[1].PortNo,
+		},
+	}
+	tfd.routes[graph.OFPortLink{Ingress: 10, Egress: 3}] = []graph.RouteHop{
+		graph.RouteHop{
+			DeviceID: "olt",
+			Ingress:  tfd.dMgr.devices["olt"].Ports[1].PortNo,
+			Egress:   tfd.dMgr.devices["olt"].Ports[0].PortNo,
+		},
+		graph.RouteHop{
+			DeviceID: "onu3",
+			Ingress:  tfd.dMgr.devices["onu3"].Ports[0].PortNo,
+			Egress:   tfd.dMgr.devices["onu3"].Ports[1].PortNo,
+		},
+	}
+	tfd.routes[graph.OFPortLink{Ingress: 10, Egress: 4}] = []graph.RouteHop{
+		graph.RouteHop{
+			DeviceID: "olt",
+			Ingress:  tfd.dMgr.devices["olt"].Ports[1].PortNo,
+			Egress:   tfd.dMgr.devices["olt"].Ports[0].PortNo,
+		},
+		graph.RouteHop{
+			DeviceID: "onu4",
+			Ingress:  tfd.dMgr.devices["onu4"].Ports[0].PortNo,
+			Egress:   tfd.dMgr.devices["onu4"].Ports[1].PortNo,
+		},
+	}
+
+	//UPSTREAM DATA PLANE
+
+	tfd.routes[graph.OFPortLink{Ingress: 1, Egress: 10}] = []graph.RouteHop{
+		graph.RouteHop{
+			DeviceID: "onu1",
+			Ingress:  tfd.dMgr.devices["onu1"].Ports[1].PortNo,
+			Egress:   tfd.dMgr.devices["onu1"].Ports[0].PortNo,
+		},
+		graph.RouteHop{
+			DeviceID: "olt",
+			Ingress:  tfd.dMgr.devices["olt"].Ports[0].PortNo,
+			Egress:   tfd.dMgr.devices["olt"].Ports[1].PortNo,
+		},
+	}
+	tfd.routes[graph.OFPortLink{Ingress: 2, Egress: 10}] = []graph.RouteHop{
+		graph.RouteHop{
+			DeviceID: "onu2",
+			Ingress:  tfd.dMgr.devices["onu2"].Ports[1].PortNo,
+			Egress:   tfd.dMgr.devices["onu2"].Ports[0].PortNo,
+		},
+		graph.RouteHop{
+			DeviceID: "olt",
+			Ingress:  tfd.dMgr.devices["olt"].Ports[0].PortNo,
+			Egress:   tfd.dMgr.devices["olt"].Ports[1].PortNo,
+		},
+	}
+	tfd.routes[graph.OFPortLink{Ingress: 3, Egress: 10}] = []graph.RouteHop{
+		graph.RouteHop{
+			DeviceID: "onu3",
+			Ingress:  tfd.dMgr.devices["onu3"].Ports[1].PortNo,
+			Egress:   tfd.dMgr.devices["onu3"].Ports[0].PortNo,
+		},
+		graph.RouteHop{
+			DeviceID: "olt",
+			Ingress:  tfd.dMgr.devices["olt"].Ports[0].PortNo,
+			Egress:   tfd.dMgr.devices["olt"].Ports[1].PortNo,
+		},
+	}
+	tfd.routes[graph.OFPortLink{Ingress: 4, Egress: 10}] = []graph.RouteHop{
+		graph.RouteHop{
+			DeviceID: "onu4",
+			Ingress:  tfd.dMgr.devices["onu4"].Ports[1].PortNo,
+			Egress:   tfd.dMgr.devices["onu4"].Ports[0].PortNo,
+		},
+		graph.RouteHop{
+			DeviceID: "olt",
+			Ingress:  tfd.dMgr.devices["olt"].Ports[0].PortNo,
+			Egress:   tfd.dMgr.devices["olt"].Ports[1].PortNo,
+		},
+	}
+
+	//UPSTREAM NEXT TABLE BASED
+
+	// openflow port 0 means absence of a port - go/protobuf interpretation
+	tfd.routes[graph.OFPortLink{Ingress: 1, Egress: 0}] = []graph.RouteHop{
+		graph.RouteHop{
+			DeviceID: "onu1",
+			Ingress:  tfd.dMgr.devices["onu1"].Ports[1].PortNo,
+			Egress:   tfd.dMgr.devices["onu1"].Ports[0].PortNo,
+		},
+		graph.RouteHop{
+			DeviceID: "olt",
+			Ingress:  tfd.dMgr.devices["olt"].Ports[0].PortNo,
+			Egress:   tfd.dMgr.devices["olt"].Ports[1].PortNo,
+		},
+	}
+	tfd.routes[graph.OFPortLink{Ingress: 2, Egress: 0}] = []graph.RouteHop{
+		graph.RouteHop{
+			DeviceID: "onu2",
+			Ingress:  tfd.dMgr.devices["onu2"].Ports[1].PortNo,
+			Egress:   tfd.dMgr.devices["onu2"].Ports[0].PortNo,
+		},
+		graph.RouteHop{
+			DeviceID: "olt",
+			Ingress:  tfd.dMgr.devices["olt"].Ports[0].PortNo,
+			Egress:   tfd.dMgr.devices["olt"].Ports[1].PortNo,
+		},
+	}
+	tfd.routes[graph.OFPortLink{Ingress: 3, Egress: 0}] = []graph.RouteHop{
+		graph.RouteHop{
+			DeviceID: "onu3",
+			Ingress:  tfd.dMgr.devices["onu3"].Ports[1].PortNo,
+			Egress:   tfd.dMgr.devices["onu3"].Ports[0].PortNo,
+		},
+		graph.RouteHop{
+			DeviceID: "olt",
+			Ingress:  tfd.dMgr.devices["olt"].Ports[0].PortNo,
+			Egress:   tfd.dMgr.devices["olt"].Ports[1].PortNo,
+		},
+	}
+	tfd.routes[graph.OFPortLink{Ingress: 4, Egress: 0}] = []graph.RouteHop{
+		graph.RouteHop{
+			DeviceID: "onu4",
+			Ingress:  tfd.dMgr.devices["onu4"].Ports[1].PortNo,
+			Egress:   tfd.dMgr.devices["onu4"].Ports[0].PortNo,
+		},
+		graph.RouteHop{
+			DeviceID: "olt",
+			Ingress:  tfd.dMgr.devices["olt"].Ports[0].PortNo,
+			Egress:   tfd.dMgr.devices["olt"].Ports[1].PortNo,
+		},
+	}
+
+	// DOWNSTREAM NEXT TABLE BASED
+
+	tfd.routes[graph.OFPortLink{Ingress: 10, Egress: 0}] = []graph.RouteHop{
+		graph.RouteHop{
+			DeviceID: "olt",
+			Ingress:  tfd.dMgr.devices["olt"].Ports[1].PortNo,
+			Egress:   tfd.dMgr.devices["olt"].Ports[0].PortNo,
+		},
+		graph.RouteHop{}, // 2nd hop is not known yet
+	}
+
+	tfd.routes[graph.OFPortLink{Ingress: 0, Egress: 10}] = []graph.RouteHop{
+		graph.RouteHop{}, // 1st hop is wildcard
+		graph.RouteHop{
+			DeviceID: "olt",
+			Ingress:  tfd.dMgr.devices["olt"].Ports[0].PortNo,
+			Egress:   tfd.dMgr.devices["olt"].Ports[1].PortNo,
+		},
+	}
+
+	// DEFAULT RULES
+
+	tfd.defaultRules = fu.NewDeviceRules()
+	fg := fu.NewFlowsAndGroups()
+	var fa *fu.FlowArgs
+	fa = &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+		},
+		Actions: []*ofp.OfpAction{
+			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
+			Output(1),
+		},
+	}
+	fg.AddFlow(MkFlowStat(fa))
+	tfd.defaultRules.AddFlowsAndGroup("onu1", fg)
+
+	fg = fu.NewFlowsAndGroups()
+	fa = &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+		},
+		Actions: []*ofp.OfpAction{
+			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 102)),
+			Output(1),
+		},
+	}
+	fg.AddFlow(MkFlowStat(fa))
+	tfd.defaultRules.AddFlowsAndGroup("onu2", fg)
+
+	fg = fu.NewFlowsAndGroups()
+	fa = &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+		},
+		Actions: []*ofp.OfpAction{
+			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 103)),
+			Output(1),
+		},
+	}
+	fg.AddFlow(MkFlowStat(fa))
+	tfd.defaultRules.AddFlowsAndGroup("onu3", fg)
+
+	fg = fu.NewFlowsAndGroups()
+	fa = &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+		},
+		Actions: []*ofp.OfpAction{
+			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 104)),
+			Output(1),
+		},
+	}
+	fg.AddFlow(MkFlowStat(fa))
+	tfd.defaultRules.AddFlowsAndGroup("onu4", fg)
+
+	//Set up the device graph - flow decomposer uses it only to verify whether a port is a root port.
+	tfd.deviceGraph = graph.NewDeviceGraph(tfd.getDeviceHelper)
+	tfd.deviceGraph.RootPorts = make(map[uint32]uint32)
+	tfd.deviceGraph.RootPorts[10] = 10
+
+	tfd.fd = NewFlowDecomposer(tfd.dMgr)
+
+	return &tfd
+}
+
+func (tfd *testFlowDecomposer) getDeviceHelper(deviceId string) (*voltha.Device, error) {
+	return tfd.dMgr.GetDevice(deviceId)
+}
+
+func (tfd *testFlowDecomposer) GetDeviceLogicalId() string {
+	return ""
+}
+
+func (tfd *testFlowDecomposer) GetLogicalDevice() *voltha.LogicalDevice {
+	return nil
+}
+
+func (tfd *testFlowDecomposer) GetDeviceGraph() *graph.DeviceGraph {
+	return tfd.deviceGraph
+}
+
+func (tfd *testFlowDecomposer) GetAllDefaultRules() *fu.DeviceRules {
+	return tfd.defaultRules
+}
+
+func (tfd *testFlowDecomposer) GetWildcardInputPorts(excludePort ...uint32) []uint32 {
+	lPorts := make([]uint32, 0)
+	var exclPort uint32
+	if len(excludePort) == 1 {
+		exclPort = excludePort[0]
+	}
+	for portno, _ := range tfd.logicalPorts {
+		if portno != exclPort {
+			lPorts = append(lPorts, portno)
+		}
+	}
+	return lPorts
+}
+
+func (tfd *testFlowDecomposer) GetRoute(ingressPortNo *uint32, egressPortNo *uint32) []graph.RouteHop {
+	var portLink graph.OFPortLink
+	if egressPortNo == nil {
+		portLink.Egress = 0
+	} else if *egressPortNo&0x7fffffff == uint32(ofp.OfpPortNo_OFPP_CONTROLLER) {
+		portLink.Egress = 10
+	} else {
+		portLink.Egress = *egressPortNo
+	}
+	if ingressPortNo == nil {
+		portLink.Ingress = 0
+	} else {
+		portLink.Ingress = *ingressPortNo
+	}
+	for key, val := range tfd.routes {
+		if key.Ingress == portLink.Ingress && key.Egress == portLink.Egress {
+			return val
+		}
+	}
+	return nil
+}
+
+func TestEapolReRouteRuleDecomposition(t *testing.T) {
+
+	var fa *fu.FlowArgs
+	fa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(1),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			EthType(0x888e),
+		},
+		Actions: []*ofp.OfpAction{
+			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
+			Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+		},
+	}
+
+	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{MkFlowStat(fa)}}
+	groups := ofp.FlowGroups{}
+	tfd := newTestFlowDecomposer(newTestDeviceManager())
+
+	device_rules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	onu1FlowAndGroup := device_rules.Rules["onu1"]
+	oltFlowAndGroup := device_rules.Rules["olt"]
+	assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
+	assert.Equal(t, 0, onu1FlowAndGroup.Groups.Len())
+	assert.Equal(t, 2, oltFlowAndGroup.Flows.Len())
+	assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
+
+	fa = &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+		},
+		Actions: []*ofp.OfpAction{
+			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
+			Output(1),
+		},
+	}
+	expectedOnu1Flow := MkFlowStat(fa)
+	derivedFlow := onu1FlowAndGroup.GetFlow(0)
+	assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
+
+	fa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(1),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1),
+			EthType(0x888e),
+		},
+		Actions: []*ofp.OfpAction{
+			PushVlan(0x8100),
+			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000)),
+			Output(2),
+		},
+	}
+	expectedOltFlow := MkFlowStat(fa)
+	derivedFlow = oltFlowAndGroup.GetFlow(0)
+	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
+
+	fa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
+			VlanPcp(0),
+			Metadata_ofp(1),
+		},
+		Actions: []*ofp.OfpAction{
+			PopVlan(),
+			Output(1),
+		},
+	}
+	expectedOltFlow = MkFlowStat(fa)
+	derivedFlow = oltFlowAndGroup.GetFlow(1)
+	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
+}
+
+func TestDhcpReRouteRuleDecomposition(t *testing.T) {
+
+	var fa *fu.FlowArgs
+	fa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(1),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			EthType(0x0800),
+			Ipv4Dst(0xffffffff),
+			IpProto(17),
+			UdpSrc(68),
+			UdpDst(67),
+		},
+		Actions: []*ofp.OfpAction{
+			Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+		},
+	}
+
+	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{MkFlowStat(fa)}}
+	groups := ofp.FlowGroups{}
+	tfd := newTestFlowDecomposer(newTestDeviceManager())
+
+	device_rules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	onu1FlowAndGroup := device_rules.Rules["onu1"]
+	oltFlowAndGroup := device_rules.Rules["olt"]
+	assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
+	assert.Equal(t, 0, onu1FlowAndGroup.Groups.Len())
+	assert.Equal(t, 2, oltFlowAndGroup.Flows.Len())
+	assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
+
+	fa = &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+		},
+		Actions: []*ofp.OfpAction{
+			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
+			Output(1),
+		},
+	}
+	expectedOnu1Flow := MkFlowStat(fa)
+	derivedFlow := onu1FlowAndGroup.GetFlow(0)
+	assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
+
+	fa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(1),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1),
+			EthType(0x0800),
+			Ipv4Dst(0xffffffff),
+			IpProto(17),
+			UdpSrc(68),
+			UdpDst(67),
+		},
+		Actions: []*ofp.OfpAction{
+			PushVlan(0x8100),
+			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000)),
+			Output(2),
+		},
+	}
+	expectedOltFlow := MkFlowStat(fa)
+	derivedFlow = oltFlowAndGroup.GetFlow(0)
+	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
+
+	fa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 1000},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			InPort(2),
+			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
+			VlanPcp(0),
+			Metadata_ofp(1),
+		},
+		Actions: []*ofp.OfpAction{
+			PopVlan(),
+			Output(1),
+		},
+	}
+	expectedOltFlow = MkFlowStat(fa)
+	derivedFlow = oltFlowAndGroup.GetFlow(1)
+	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
+}
+
+//func TestUnicastUpstreamRuleDecomposition(t *testing.T) {
+//
+//	var fa *fu.FlowArgs
+//	fa = &fu.FlowArgs{
+//		KV: fu.OfpFlowModArgs{"priority": 500, "table_id":1},
+//		MatchFields: []*ofp.OfpOxmOfbField{
+//			InPort(1),
+//			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+//			VlanPcp(0),
+//		},
+//		Actions: []*ofp.OfpAction{
+//			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
+//		},
+//	}
+//
+//	var fa2 *fu.FlowArgs
+//	fa2 = &fu.FlowArgs{
+//		KV: fu.OfpFlowModArgs{"priority": 500},
+//		MatchFields: []*ofp.OfpOxmOfbField{
+//			InPort(1),
+//			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
+//			VlanPcp(0),
+//		},
+//		Actions: []*ofp.OfpAction{
+//			PushVlan(0x8100),
+//			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1000)),
+//			SetField(VlanPcp(0)),
+//			Output(10),
+//		},
+//	}
+//
+//	flows := ofp.Flows{Items:[]*ofp.OfpFlowStats{MkFlowStat(fa), MkFlowStat(fa2)}}
+//	groups := ofp.FlowGroups{}
+//	tfd := newTestFlowDecomposer(newTestDeviceManager())
+//
+//	device_rules := tfd.fd.DecomposeRules(tfd, flows, groups)
+//	onu1FlowAndGroup := device_rules.Rules["onu1"]
+//	oltFlowAndGroup := device_rules.Rules["olt"]
+//	assert.Equal(t, 2, onu1FlowAndGroup.Flows.Len())
+//	assert.Equal(t, 0, onu1FlowAndGroup.Groups.Len())
+//	assert.Equal(t, 1, oltFlowAndGroup.Flows.Len())
+//	assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
+//
+//	fa = &fu.FlowArgs{
+//		KV: fu.OfpFlowModArgs{"priority": 500},
+//		MatchFields: []*ofp.OfpOxmOfbField{
+//			InPort(2),
+//			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+//			VlanPcp(0),
+//		},
+//		Actions: []*ofp.OfpAction{
+//			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
+//			Output(1),
+//		},
+//	}
+//	expectedOnu1Flow := MkFlowStat(fa)
+//	derivedFlow := onu1FlowAndGroup.GetFlow(1)
+//	assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
+//
+//	fa = &fu.FlowArgs{
+//		KV: fu.OfpFlowModArgs{"priority": 500},
+//		MatchFields: []*ofp.OfpOxmOfbField{
+//			InPort(1),
+//			VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101),
+//			VlanPcp(0),
+//		},
+//		Actions: []*ofp.OfpAction{
+//			PushVlan(0x8100),
+//			SetField(VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 1000)),
+//			SetField(VlanPcp(0)),
+//			Output(2),
+//		},
+//	}
+//	expectedOltFlow := MkFlowStat(fa)
+//	derivedFlow = oltFlowAndGroup.GetFlow(0)
+//	assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
+//}
diff --git a/rw_core/graph/device_graph.go b/rw_core/graph/device_graph.go
new file mode 100644
index 0000000..9acda6d
--- /dev/null
+++ b/rw_core/graph/device_graph.go
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package graph
+
+import (
+	"errors"
+	"fmt"
+	"github.com/gyuho/goraph"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"strconv"
+	"strings"
+)
+
+func init() {
+	log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+type RouteHop struct {
+	DeviceID string
+	Ingress  uint32
+	Egress   uint32
+}
+
+type OFPortLink struct {
+	Ingress uint32
+	Egress  uint32
+}
+
+type GetDeviceFunc func(id string) (*voltha.Device, error)
+
+func concatDeviceIdPortId(deviceId string, portId uint32) string {
+	return fmt.Sprintf("%s:%d", deviceId, portId)
+}
+
+func splitIntoDeviceIdPortId(id string) (string, uint32, error) {
+	result := strings.Split(id, ":")
+	if len(result) != 2 {
+		return "", 0, errors.New(fmt.Sprintf("invalid-id-%s", id))
+	}
+	if temp, err := strconv.ParseInt(result[1], 10, 32); err != nil {
+		return "", 0, errors.New(fmt.Sprintf("invalid-id-%s-%s", id, err.Error()))
+	} else {
+		return result[0], uint32(temp), nil
+	}
+}
+
+type DeviceGraph struct {
+	GGraph        goraph.Graph
+	getDevice     GetDeviceFunc
+	logicalPorts  []*voltha.LogicalPort
+	RootPorts     map[uint32]uint32
+	Routes        map[OFPortLink][]RouteHop
+	boundaryPorts map[string]uint32
+}
+
+func NewDeviceGraph(getDevice GetDeviceFunc) *DeviceGraph {
+	var dg DeviceGraph
+	dg.GGraph = goraph.NewGraph()
+	dg.getDevice = getDevice
+	return &dg
+}
+
+func (dg *DeviceGraph) ComputeRoutes(lps []*voltha.LogicalPort) {
+	dg.logicalPorts = lps
+	// Set the root ports
+	dg.RootPorts = make(map[uint32]uint32)
+	for _, lp := range lps {
+		if lp.RootPort {
+			dg.RootPorts[lp.OfpPort.PortNo] = lp.OfpPort.PortNo
+		}
+	}
+	// set the boundary ports
+	dg.boundaryPorts = make(map[string]uint32)
+	for _, lp := range lps {
+		dg.boundaryPorts[fmt.Sprintf("%s:%d", lp.DeviceId, lp.DevicePortNo)] = lp.OfpPort.PortNo
+	}
+	dg.Routes = make(map[OFPortLink][]RouteHop)
+
+	// Build the graph
+	var device *voltha.Device
+	devicesAdded := make(map[string]string)
+	portsAdded := make(map[string]string)
+	for _, logicalPort := range dg.logicalPorts {
+		device, _ = dg.getDevice(logicalPort.DeviceId)
+		dg.GGraph = dg.addDevice(device, dg.GGraph, &devicesAdded, &portsAdded, dg.boundaryPorts)
+	}
+	dg.Routes = dg.buildRoutes()
+}
+
+func (dg *DeviceGraph) addDevice(device *voltha.Device, g goraph.Graph, devicesAdded *map[string]string, portsAdded *map[string]string,
+	boundaryPorts map[string]uint32) goraph.Graph {
+
+	if device == nil {
+		return g
+	}
+
+	if _, exist := (*devicesAdded)[device.Id]; exist {
+		return g
+	}
+	g.AddNode(goraph.NewNode(device.Id))
+	(*devicesAdded)[device.Id] = device.Id
+
+	var portId string
+	var peerPortId string
+	for _, port := range device.Ports {
+		portId = concatDeviceIdPortId(device.Id, port.PortNo)
+		if _, exist := (*portsAdded)[portId]; !exist {
+			(*portsAdded)[portId] = portId
+			g.AddNode(goraph.NewNode(portId))
+			g.AddEdge(goraph.StringID(device.Id), goraph.StringID(portId), 1)
+			g.AddEdge(goraph.StringID(portId), goraph.StringID(device.Id), 1)
+		}
+		for _, peer := range port.Peers {
+			if _, exist := (*devicesAdded)[peer.DeviceId]; !exist {
+				d, _ := dg.getDevice(peer.DeviceId)
+				g = dg.addDevice(d, g, devicesAdded, portsAdded, boundaryPorts)
+			} else {
+				peerPortId = concatDeviceIdPortId(peer.DeviceId, peer.PortNo)
+				g.AddEdge(goraph.StringID(portId), goraph.StringID(peerPortId), 1)
+				g.AddEdge(goraph.StringID(peerPortId), goraph.StringID(portId), 1)
+			}
+		}
+	}
+	return g
+}
+
+func (dg *DeviceGraph) IsRootPort(port uint32) bool {
+	_, exist := dg.RootPorts[port]
+	return exist
+}
+
+func (dg *DeviceGraph) buildRoutes() map[OFPortLink][]RouteHop {
+	var pathIds []goraph.ID
+	path := make([]RouteHop, 0)
+	paths := make(map[OFPortLink][]RouteHop)
+	var err error
+	var hop RouteHop
+	for source, sourcePort := range dg.boundaryPorts {
+		for target, targetPort := range dg.boundaryPorts {
+			if source == target {
+				continue
+			}
+			//Ignore NNI - NNI Routes
+			if dg.IsRootPort(sourcePort) && dg.IsRootPort(targetPort) {
+				continue
+			}
+
+			//Ignore UNI - UNI Routes
+			if !dg.IsRootPort(sourcePort) && !dg.IsRootPort(targetPort) {
+				continue
+			}
+
+			if pathIds, _, err = goraph.Dijkstra(dg.GGraph, goraph.StringID(source), goraph.StringID(target)); err != nil {
+				log.Errorw("no-path", log.Fields{"source": source, "target": target, "error": err})
+				continue
+			}
+			if len(pathIds)%3 != 0 {
+				continue
+			}
+			var deviceId string
+			var ingressPort uint32
+			var egressPort uint32
+			for i := 0; i < len(pathIds); i = i + 3 {
+				if deviceId, ingressPort, err = splitIntoDeviceIdPortId(pathIds[i].String()); err != nil {
+					log.Errorw("id-error", log.Fields{"source": source, "target": target, "error": err})
+					break
+				}
+				if _, egressPort, err = splitIntoDeviceIdPortId(pathIds[i+2].String()); err != nil {
+					log.Errorw("id-error", log.Fields{"source": source, "target": target, "error": err})
+					break
+				}
+				hop = RouteHop{Ingress: ingressPort, DeviceID: deviceId, Egress: egressPort}
+				path = append(path, hop)
+			}
+			tmp := make([]RouteHop, len(path))
+			copy(tmp, path)
+			path = nil
+			paths[OFPortLink{Ingress: sourcePort, Egress: targetPort}] = tmp
+		}
+	}
+	return paths
+}
+
+func (dg *DeviceGraph) GetDeviceNodeIds() map[string]string {
+	nodeIds := make(map[string]string)
+	nodesMap := dg.GGraph.GetNodes()
+	for id, node := range nodesMap {
+		if len(strings.Split(node.String(), ":")) != 2 { // not port node
+			nodeIds[id.String()] = id.String()
+		}
+	}
+	return nodeIds
+}
diff --git a/rw_core/graph/device_graph_test.go b/rw_core/graph/device_graph_test.go
new file mode 100644
index 0000000..533d03d
--- /dev/null
+++ b/rw_core/graph/device_graph_test.go
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package graph
+
+import (
+	"errors"
+	"fmt"
+	"github.com/opencord/voltha-go/protos/openflow_13"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"github.com/stretchr/testify/assert"
+	"testing"
+	"time"
+)
+
+var ld voltha.LogicalDevice
+var olt voltha.Device
+var onusOnPort4 []voltha.Device
+var onusOnPort5 []voltha.Device
+
+const (
+	maxOnuOnPort4 int = 64
+	maxOnuOnPort5 int = 64
+)
+
+func init() {
+
+	logicalDeviceId := "ld"
+	oltDeviceId := "olt"
+
+	// Setup ONUs on OLT port 4
+	onusOnPort4 = make([]voltha.Device, 0)
+	var onu voltha.Device
+	var id string
+	oltPeerPort := uint32(4)
+	for i := 0; i < maxOnuOnPort4; i++ {
+		id := fmt.Sprintf("onu%d", i)
+		onu = voltha.Device{Id: id, ParentId: oltDeviceId}
+		ponPort := voltha.Port{PortNo: 1, DeviceId: onu.Id, Type: voltha.Port_PON_ONU}
+		ponPort.Peers = make([]*voltha.Port_PeerPort, 0)
+		peerPort := voltha.Port_PeerPort{DeviceId: oltDeviceId, PortNo: oltPeerPort}
+		ponPort.Peers = append(ponPort.Peers, &peerPort)
+		uniPort := voltha.Port{PortNo: 2, DeviceId: onu.Id, Type: voltha.Port_ETHERNET_UNI}
+		onu.Ports = make([]*voltha.Port, 0)
+		onu.Ports = append(onu.Ports, &ponPort)
+		onu.Ports = append(onu.Ports, &uniPort)
+		onusOnPort4 = append(onusOnPort4, onu)
+	}
+
+	// Setup ONUs on OLT port 5
+	onusOnPort5 = make([]voltha.Device, 0)
+	oltPeerPort = uint32(5)
+	for i := 0; i < maxOnuOnPort5; i++ {
+		id := fmt.Sprintf("onu%d", i+maxOnuOnPort4)
+		onu = voltha.Device{Id: id, ParentId: oltDeviceId}
+		ponPort := voltha.Port{PortNo: 1, DeviceId: onu.Id, Type: voltha.Port_PON_ONU}
+		ponPort.Peers = make([]*voltha.Port_PeerPort, 0)
+		peerPort := voltha.Port_PeerPort{DeviceId: oltDeviceId, PortNo: oltPeerPort}
+		ponPort.Peers = append(ponPort.Peers, &peerPort)
+		uniPort := voltha.Port{PortNo: 2, DeviceId: onu.Id, Type: voltha.Port_ETHERNET_UNI}
+		onu.Ports = make([]*voltha.Port, 0)
+		onu.Ports = append(onu.Ports, &ponPort)
+		onu.Ports = append(onu.Ports, &uniPort)
+		onusOnPort5 = append(onusOnPort5, onu)
+	}
+
+	// Setup OLT
+	//	Setup the OLT device
+	olt = voltha.Device{Id: oltDeviceId, ParentId: logicalDeviceId}
+	p1 := voltha.Port{PortNo: 2, DeviceId: oltDeviceId, Type: voltha.Port_ETHERNET_NNI}
+	p2 := voltha.Port{PortNo: 3, DeviceId: oltDeviceId, Type: voltha.Port_ETHERNET_NNI}
+	p3 := voltha.Port{PortNo: 4, DeviceId: oltDeviceId, Type: voltha.Port_PON_OLT}
+	p4 := voltha.Port{PortNo: 5, DeviceId: oltDeviceId, Type: voltha.Port_PON_OLT}
+	p3.Peers = make([]*voltha.Port_PeerPort, 0)
+	for _, onu := range onusOnPort4 {
+		peerPort := voltha.Port_PeerPort{DeviceId: onu.Id, PortNo: p3.PortNo}
+		p3.Peers = append(p3.Peers, &peerPort)
+	}
+	p4.Peers = make([]*voltha.Port_PeerPort, 0)
+	for _, onu := range onusOnPort5 {
+		peerPort := voltha.Port_PeerPort{DeviceId: onu.Id, PortNo: p4.PortNo}
+		p4.Peers = append(p4.Peers, &peerPort)
+	}
+	olt.Ports = make([]*voltha.Port, 0)
+	olt.Ports = append(olt.Ports, &p1)
+	olt.Ports = append(olt.Ports, &p2)
+	olt.Ports = append(olt.Ports, &p3)
+	olt.Ports = append(olt.Ports, &p4)
+
+	// Setup the logical device
+	ld = voltha.LogicalDevice{Id: logicalDeviceId}
+	ld.Ports = make([]*voltha.LogicalPort, 0)
+	ofpPortNo := 1
+	//Add olt ports
+	for i, port := range olt.Ports {
+		if port.Type == voltha.Port_ETHERNET_NNI {
+			id = fmt.Sprintf("nni-%d", i)
+			lp := voltha.LogicalPort{Id: id, DeviceId: olt.Id, DevicePortNo: port.PortNo, OfpPort: &openflow_13.OfpPort{PortNo: uint32(ofpPortNo)}, RootPort: true}
+			ld.Ports = append(ld.Ports, &lp)
+			ofpPortNo = ofpPortNo + 1
+		}
+	}
+	//Add onu ports on port 4
+	for i, onu := range onusOnPort4 {
+		for _, port := range onu.Ports {
+			if port.Type == voltha.Port_ETHERNET_UNI {
+				id = fmt.Sprintf("uni-%d", i)
+				lp := voltha.LogicalPort{Id: id, DeviceId: onu.Id, DevicePortNo: port.PortNo, OfpPort: &openflow_13.OfpPort{PortNo: uint32(ofpPortNo)}, RootPort: false}
+				ld.Ports = append(ld.Ports, &lp)
+				ofpPortNo = ofpPortNo + 1
+			}
+		}
+	}
+	//Add onu ports on port 5
+	for i, onu := range onusOnPort5 {
+		for _, port := range onu.Ports {
+			if port.Type == voltha.Port_ETHERNET_UNI {
+				id = fmt.Sprintf("uni-%d", i+10)
+				lp := voltha.LogicalPort{Id: id, DeviceId: onu.Id, DevicePortNo: port.PortNo, OfpPort: &openflow_13.OfpPort{PortNo: uint32(ofpPortNo)}, RootPort: false}
+				ld.Ports = append(ld.Ports, &lp)
+				ofpPortNo = ofpPortNo + 1
+			}
+		}
+	}
+}
+
+func GetDeviceHelper(id string) (*voltha.Device, error) {
+	if id == "olt" {
+		return &olt, nil
+	}
+	for _, onu := range onusOnPort4 {
+		if onu.Id == id {
+			return &onu, nil
+		}
+	}
+	for _, onu := range onusOnPort5 {
+		if onu.Id == id {
+			return &onu, nil
+		}
+	}
+	return nil, errors.New("Not-found")
+}
+
+func TestGetRoutes(t *testing.T) {
+
+	getDevice := GetDeviceHelper
+
+	// Create a device graph and computes Routes
+	start := time.Now()
+	dg := NewDeviceGraph(getDevice)
+	dg.ComputeRoutes(ld.Ports)
+	fmt.Println("Total Time creating graph & compute Routes:", time.Since(start))
+	assert.NotNil(t, dg.GGraph)
+	assert.EqualValues(t, (maxOnuOnPort4*4 + maxOnuOnPort5*4), len(dg.Routes))
+	//for k, v := range dg.Routes {
+	//	fmt.Println("key", k, " value:", v)
+	//}
+
+}
diff --git a/rw_core/utils/flow_utils.go b/rw_core/utils/flow_utils.go
new file mode 100644
index 0000000..997c466
--- /dev/null
+++ b/rw_core/utils/flow_utils.go
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package utils
+
+import (
+	"bytes"
+	"github.com/cevaris/ordered_map"
+	"github.com/gogo/protobuf/proto"
+	ofp "github.com/opencord/voltha-go/protos/openflow_13"
+)
+
+type OfpFlowModArgs map[string]uint64
+
+type FlowArgs struct {
+	MatchFields []*ofp.OfpOxmOfbField
+	Actions     []*ofp.OfpAction
+	Command     *ofp.OfpFlowModCommand
+	Priority    uint32
+	KV          OfpFlowModArgs
+}
+
+type FlowsAndGroups struct {
+	Flows  *ordered_map.OrderedMap
+	Groups *ordered_map.OrderedMap
+}
+
+func NewFlowsAndGroups() *FlowsAndGroups {
+	var fg FlowsAndGroups
+	fg.Flows = ordered_map.NewOrderedMap()
+	fg.Groups = ordered_map.NewOrderedMap()
+	return &fg
+}
+
+func (fg *FlowsAndGroups) Copy() *FlowsAndGroups {
+	copyFG := NewFlowsAndGroups()
+	iter := fg.Flows.IterFunc()
+	for kv, ok := iter(); ok; kv, ok = iter() {
+		if protoMsg, isMsg := kv.Value.(*ofp.OfpFlowStats); isMsg {
+			copyFG.Flows.Set(kv.Key, proto.Clone(protoMsg))
+		}
+	}
+	iter = fg.Groups.IterFunc()
+	for kv, ok := iter(); ok; kv, ok = iter() {
+		if protoMsg, isMsg := kv.Value.(*ofp.OfpGroupEntry); isMsg {
+			copyFG.Groups.Set(kv.Key, proto.Clone(protoMsg))
+		}
+	}
+	return copyFG
+}
+
+func (fg *FlowsAndGroups) GetFlow(index int) *ofp.OfpFlowStats {
+	iter := fg.Flows.IterFunc()
+	pos := 0
+	for kv, ok := iter(); ok; kv, ok = iter() {
+		if pos == index {
+			if protoMsg, isMsg := kv.Value.(*ofp.OfpFlowStats); isMsg {
+				return protoMsg
+			}
+			return nil
+		}
+		pos += 1
+	}
+	return nil
+}
+
+func (fg *FlowsAndGroups) String() string {
+	var buffer bytes.Buffer
+	iter := fg.Flows.IterFunc()
+	for kv, ok := iter(); ok; kv, ok = iter() {
+		if protoMsg, isMsg := kv.Value.(*ofp.OfpFlowStats); isMsg {
+			buffer.WriteString("\nFlow:\n")
+			buffer.WriteString(proto.MarshalTextString(protoMsg))
+			buffer.WriteString("\n")
+		}
+	}
+	iter = fg.Groups.IterFunc()
+	for kv, ok := iter(); ok; kv, ok = iter() {
+		if protoMsg, isMsg := kv.Value.(*ofp.OfpGroupEntry); isMsg {
+			buffer.WriteString("\nGroup:\n")
+			buffer.WriteString(proto.MarshalTextString(protoMsg))
+			buffer.WriteString("\n")
+		}
+	}
+	return buffer.String()
+}
+
+func (fg *FlowsAndGroups) AddFlow(flow *ofp.OfpFlowStats) {
+	if fg.Flows == nil {
+		fg.Flows = ordered_map.NewOrderedMap()
+	}
+	if fg.Groups == nil {
+		fg.Groups = ordered_map.NewOrderedMap()
+	}
+	//Add flow only if absent
+	if _, exist := fg.Flows.Get(flow.Id); !exist {
+		fg.Flows.Set(flow.Id, flow)
+	}
+}
+
+//AddFrom add flows and groups from the argument into this structure only if they do not already exist
+func (fg *FlowsAndGroups) AddFrom(from *FlowsAndGroups) {
+	iter := from.Flows.IterFunc()
+	for kv, ok := iter(); ok; kv, ok = iter() {
+		if protoMsg, isMsg := kv.Value.(*ofp.OfpFlowStats); isMsg {
+			if _, exist := fg.Flows.Get(protoMsg.Id); !exist {
+				fg.Flows.Set(protoMsg.Id, protoMsg)
+			}
+		}
+	}
+	iter = from.Groups.IterFunc()
+	for kv, ok := iter(); ok; kv, ok = iter() {
+		if protoMsg, isMsg := kv.Value.(*ofp.OfpGroupEntry); isMsg {
+			if _, exist := fg.Groups.Get(protoMsg.Stats.GroupId); !exist {
+				fg.Groups.Set(protoMsg.Stats.GroupId, protoMsg)
+			}
+		}
+	}
+}
+
+type DeviceRules struct {
+	Rules map[string]*FlowsAndGroups
+}
+
+func NewDeviceRules() *DeviceRules {
+	var dr DeviceRules
+	dr.Rules = make(map[string]*FlowsAndGroups)
+	return &dr
+}
+
+func (dr *DeviceRules) Copy() *DeviceRules {
+	copyDR := NewDeviceRules()
+	for key, val := range dr.Rules {
+		copyDR.Rules[key] = val.Copy()
+	}
+	return copyDR
+}
+
+func (dr *DeviceRules) String() string {
+	var buffer bytes.Buffer
+	for key, value := range dr.Rules {
+		buffer.WriteString("DeviceId:")
+		buffer.WriteString(key)
+		buffer.WriteString(value.String())
+		buffer.WriteString("\n\n")
+	}
+	return buffer.String()
+}
+
+func (dr *DeviceRules) AddFlowsAndGroup(deviceId string, fg *FlowsAndGroups) {
+	dr.Rules[deviceId] = fg
+}