[VOL-1588] Improve Flow Add performance

This update consists of the following:
1) Update the performance when adding a flow to a logical device,
decomposing the flow into parent and child device and sending the
flow to the adapters.
2) Format a number of files as per GO fmt.
3) Ensure the device graph cache gets updated when a new port is
added to the graph that belongs to an existing device in cache.

The flow update/deletion performance will be addressed in a separate
commit.

Change-Id: I2eb663cc73eef9fc6172203ed88a35726f5fe008
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index f87d8ce..a282b9b 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -24,63 +24,65 @@
 
 // RW Core service default constants
 const (
-	ConsulStoreName               = "consul"
-	EtcdStoreName                 = "etcd"
-	default_InstanceID           = "rwcore001"
-	default_GrpcPort             = 50057
-	default_GrpcHost             = ""
-	default_KafkaAdapterHost     = "127.0.0.1"
-	default_KafkaAdapterPort     = 9092
-	default_KafkaClusterHost     = "127.0.0.1"
-	default_KafkaClusterPort     = 9094
-	default_KVStoreType          = EtcdStoreName
-	default_KVStoreTimeout       = 5 //in seconds
-	default_KVStoreHost         = "127.0.0.1"
-	default_KVStorePort         = 2379 // Consul = 8500; Etcd = 2379
-	default_KVTxnKeyDelTime     = 60
-	default_KVStoreDataPrefix   = "service/voltha"
-	default_LogLevel            = 0
-	default_Banner              = false
-	default_CoreTopic           = "rwcore"
-	default_RWCoreEndpoint      = "rwcore"
-	default_RWCoreKey           = "pki/voltha.key"
-	default_RWCoreCert          = "pki/voltha.crt"
-	default_RWCoreCA            = "pki/voltha-CA.pem"
-	default_AffinityRouterTopic = "affinityRouter"
-	default_InCompetingMode     = true
+	ConsulStoreName                   = "consul"
+	EtcdStoreName                     = "etcd"
+	default_InstanceID                = "rwcore001"
+	default_GrpcPort                  = 50057
+	default_GrpcHost                  = ""
+	default_KafkaAdapterHost          = "127.0.0.1"
+	default_KafkaAdapterPort          = 9092
+	default_KafkaClusterHost          = "127.0.0.1"
+	default_KafkaClusterPort          = 9094
+	default_KVStoreType               = EtcdStoreName
+	default_KVStoreTimeout            = 5 //in seconds
+	default_KVStoreHost               = "127.0.0.1"
+	default_KVStorePort               = 2379 // Consul = 8500; Etcd = 2379
+	default_KVTxnKeyDelTime           = 60
+	default_KVStoreDataPrefix         = "service/voltha"
+	default_LogLevel                  = 0
+	default_Banner                    = false
+	default_CoreTopic                 = "rwcore"
+	default_RWCoreEndpoint            = "rwcore"
+	default_RWCoreKey                 = "pki/voltha.key"
+	default_RWCoreCert                = "pki/voltha.crt"
+	default_RWCoreCA                  = "pki/voltha-CA.pem"
+	default_AffinityRouterTopic       = "affinityRouter"
+	default_InCompetingMode           = true
 	default_LongRunningRequestTimeout = int64(2000)
-	default_DefaultRequestTimeout = int64(500)
-	default_CoreBindingKey      = "voltha_backend_name"
+	default_DefaultRequestTimeout     = int64(500)
+	default_CoreTimeout               = int64(500)
+	default_CoreBindingKey            = "voltha_backend_name"
 )
 
 // RWCoreFlags represents the set of configurations used by the read-write core service
 type RWCoreFlags struct {
 	// Command line parameters
-	InstanceID           string
-	RWCoreEndpoint       string
-	GrpcHost            string
-	GrpcPort            int
-	KafkaAdapterHost    string
-	KafkaAdapterPort    int
-	KafkaClusterHost    string
-	KafkaClusterPort    int
-	KVStoreType         string
-	KVStoreTimeout      int // in seconds
-	KVStoreHost         string
-	KVStorePort         int
-	KVTxnKeyDelTime     int
-	KVStoreDataPrefix   string
-	CoreTopic           string
-	LogLevel            int
-	Banner              bool
-	RWCoreKey           string
-	RWCoreCert          string
-	RWCoreCA            string
-	AffinityRouterTopic string
-	InCompetingMode     bool
+	InstanceID                string
+	RWCoreEndpoint            string
+	GrpcHost                  string
+	GrpcPort                  int
+	KafkaAdapterHost          string
+	KafkaAdapterPort          int
+	KafkaClusterHost          string
+	KafkaClusterPort          int
+	KVStoreType               string
+	KVStoreTimeout            int // in seconds
+	KVStoreHost               string
+	KVStorePort               int
+	KVTxnKeyDelTime           int
+	KVStoreDataPrefix         string
+	CoreTopic                 string
+	LogLevel                  int
+	Banner                    bool
+	RWCoreKey                 string
+	RWCoreCert                string
+	RWCoreCA                  string
+	AffinityRouterTopic       string
+	InCompetingMode           bool
 	LongRunningRequestTimeout int64
-	DefaultRequestTimeout int64
-	CoreBindingKey      string
+	DefaultRequestTimeout     int64
+	DefaultCoreTimeout        int64
+	CoreBindingKey            string
 }
 
 func init() {
@@ -90,31 +92,32 @@
 // NewRWCoreFlags returns a new RWCore config
 func NewRWCoreFlags() *RWCoreFlags {
 	var rwCoreFlag = RWCoreFlags{ // Default values
-		InstanceID:          default_InstanceID,
-		RWCoreEndpoint:      default_RWCoreEndpoint,
-		GrpcHost:            default_GrpcHost,
-		GrpcPort:            default_GrpcPort,
-		KafkaAdapterHost:    default_KafkaAdapterHost,
-		KafkaAdapterPort:    default_KafkaAdapterPort,
-		KafkaClusterHost:    default_KafkaClusterHost,
-		KafkaClusterPort:     default_KafkaClusterPort,
-		KVStoreType:          default_KVStoreType,
-		KVStoreTimeout:      default_KVStoreTimeout,
-		KVStoreHost:         default_KVStoreHost,
-		KVStorePort:         default_KVStorePort,
-		KVStoreDataPrefix:   default_KVStoreDataPrefix,
-		KVTxnKeyDelTime:     default_KVTxnKeyDelTime,
-		CoreTopic:           default_CoreTopic,
-		LogLevel:            default_LogLevel,
-		Banner:              default_Banner,
-		RWCoreKey:           default_RWCoreKey,
-		RWCoreCert:          default_RWCoreCert,
-		RWCoreCA:            default_RWCoreCA,
-		AffinityRouterTopic: default_AffinityRouterTopic,
-		InCompetingMode:     default_InCompetingMode,
-		DefaultRequestTimeout:default_DefaultRequestTimeout,
-		LongRunningRequestTimeout:default_LongRunningRequestTimeout,
-		CoreBindingKey:      default_CoreBindingKey,
+		InstanceID:                default_InstanceID,
+		RWCoreEndpoint:            default_RWCoreEndpoint,
+		GrpcHost:                  default_GrpcHost,
+		GrpcPort:                  default_GrpcPort,
+		KafkaAdapterHost:          default_KafkaAdapterHost,
+		KafkaAdapterPort:          default_KafkaAdapterPort,
+		KafkaClusterHost:          default_KafkaClusterHost,
+		KafkaClusterPort:          default_KafkaClusterPort,
+		KVStoreType:               default_KVStoreType,
+		KVStoreTimeout:            default_KVStoreTimeout,
+		KVStoreHost:               default_KVStoreHost,
+		KVStorePort:               default_KVStorePort,
+		KVStoreDataPrefix:         default_KVStoreDataPrefix,
+		KVTxnKeyDelTime:           default_KVTxnKeyDelTime,
+		CoreTopic:                 default_CoreTopic,
+		LogLevel:                  default_LogLevel,
+		Banner:                    default_Banner,
+		RWCoreKey:                 default_RWCoreKey,
+		RWCoreCert:                default_RWCoreCert,
+		RWCoreCA:                  default_RWCoreCA,
+		AffinityRouterTopic:       default_AffinityRouterTopic,
+		InCompetingMode:           default_InCompetingMode,
+		DefaultRequestTimeout:     default_DefaultRequestTimeout,
+		LongRunningRequestTimeout: default_LongRunningRequestTimeout,
+		DefaultCoreTimeout:        default_CoreTimeout,
+		CoreBindingKey:            default_CoreBindingKey,
 	}
 	return &rwCoreFlag
 }
@@ -184,6 +187,9 @@
 	help = fmt.Sprintf("Default timeout for regular request")
 	flag.Int64Var(&(cf.DefaultRequestTimeout), "timeout_request", default_DefaultRequestTimeout, help)
 
+	help = fmt.Sprintf("Default Core timeout")
+	flag.Int64Var(&(cf.DefaultCoreTimeout), "core_timeout", default_CoreTimeout, help)
+
 	help = fmt.Sprintf("Show startup banner log lines")
 	flag.BoolVar(&cf.Banner, "banner", default_Banner, help)
 
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 07a4826..5d539aa 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -164,7 +164,7 @@
 	} else {
 		log.Debug("no-existing-device-type-found")
 		//	No device types data.   In order to have a proxy setup for that path let's create a fake device type
-		aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{&voltha.DeviceType{Id: SENTINEL_DEVICETYPE_ID, Adapter: SENTINEL_ADAPTER_ID}}}, true)
+		aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{{Id: SENTINEL_DEVICETYPE_ID, Adapter: SENTINEL_ADAPTER_ID}}}, true)
 	}
 }
 
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index d933466..eeebd0d 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -88,7 +88,7 @@
 		return nil, errors.New("fail-to-create-transaction")
 	}
 
-	if rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id:devId}) {
+	if rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: devId}) {
 		log.Debugw("owned-by-me", log.Fields{"Id": devId})
 		if txn.Acquired(timeout) {
 			log.Debugw("processing-request", log.Fields{"Id": devId})
@@ -789,9 +789,6 @@
 		return nil, nil
 	}
 	go rhp.deviceMgr.addPort(deviceId.Id, port)
-	//if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
-	//	return nil, err
-	//}
 
 	return new(empty.Empty), nil
 }
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index a504f34..f03c7d2 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -24,8 +24,8 @@
 	"github.com/opencord/voltha-go/db/kvstore"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/kafka"
-	"github.com/opencord/voltha-protos/go/voltha"
 	"github.com/opencord/voltha-go/rw_core/config"
+	"github.com/opencord/voltha-protos/go/voltha"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -38,7 +38,7 @@
 	logicalDeviceMgr  *LogicalDeviceManager
 	grpcServer        *grpcserver.GrpcServer
 	grpcNBIAPIHandler *APIHandler
-	adapterMgr *AdapterManager
+	adapterMgr        *AdapterManager
 	config            *config.RWCoreFlags
 	kmp               *kafka.InterContainerProxy
 	clusterDataRoot   model.Root
@@ -48,9 +48,9 @@
 	exitChannel       chan int
 	kvClient          kvstore.Client
 	kafkaClient       kafka.Client
-	coreMembership *voltha.Membership
-	membershipLock *sync.RWMutex
-	deviceOwnership    *DeviceOwnership
+	coreMembership    *voltha.Membership
+	membershipLock    *sync.RWMutex
+	deviceOwnership   *DeviceOwnership
 }
 
 func init() {
@@ -92,7 +92,7 @@
 	log.Info("values", log.Fields{"kmp": core.kmp})
 	core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
 	core.deviceMgr = newDeviceManager(core)
-	core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy)
+	core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy, core.config.DefaultCoreTimeout)
 
 	if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
 		log.Fatal("Failure-registering-adapterRequestHandler")
@@ -170,17 +170,6 @@
 	return nil
 }
 
-//func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
-//	ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
-//	) error {
-//	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
-//		core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
-//	core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
-//
-//	log.Info("request-handlers")
-//	return nil
-//}
-
 func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
 	ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
 ) error {
@@ -197,7 +186,6 @@
 	return nil
 }
 
-
 func (core *Core) isMembershipRegistrationComplete() bool {
 	core.membershipLock.RLock()
 	defer core.membershipLock.RUnlock()
@@ -219,7 +207,6 @@
 
 	core.coreMembership = membership
 
-
 	// Use the group name to register a specific kafka topic for this container
 	go func(groupName string) {
 		// Register the core-pair topic to handle core-bound requests destined to the core pair
@@ -242,7 +229,6 @@
 	return core.coreMembership
 }
 
-
 func (core *Core) startDeviceManager(ctx context.Context) {
 	// TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
 	// callbacks.  For now, until the model is ready, devicemanager will keep a reference to the
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 9704fff..836269e 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -17,7 +17,6 @@
 
 import (
 	"context"
-	"fmt"
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
@@ -34,6 +33,7 @@
 type DeviceAgent struct {
 	deviceId         string
 	deviceType       string
+	isRootdevice     bool
 	lastData         *voltha.Device
 	adapterProxy     *AdapterProxy
 	adapterMgr       *AdapterManager
@@ -41,14 +41,13 @@
 	clusterDataProxy *model.Proxy
 	deviceProxy      *model.Proxy
 	exitChannel      chan int
-	flowProxy        *model.Proxy
-	groupProxy       *model.Proxy
 	lockDevice       sync.RWMutex
+	defaultTimeout   int64
 }
 
 //newDeviceAgent creates a new device agent along as creating a unique ID for the device and set the device state to
 //preprovisioning
-func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
+func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout int64) *DeviceAgent {
 	var agent DeviceAgent
 	agent.adapterProxy = ap
 	cloned := (proto.Clone(device)).(*voltha.Device)
@@ -63,6 +62,7 @@
 		// overwritten by the child adapter during a device update request
 		cloned.Vlan = device.ProxyAddress.ChannelId
 	}
+	agent.isRootdevice = device.Root
 	agent.deviceId = cloned.Id
 	agent.deviceType = cloned.Type
 	agent.lastData = cloned
@@ -71,6 +71,7 @@
 	agent.exitChannel = make(chan int, 1)
 	agent.clusterDataProxy = cdProxy
 	agent.lockDevice = sync.RWMutex{}
+	agent.defaultTimeout = timeout
 	return &agent
 }
 
@@ -101,16 +102,6 @@
 	agent.deviceProxy = agent.clusterDataProxy.CreateProxy("/devices/"+agent.deviceId, false)
 	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
 
-	agent.flowProxy = agent.clusterDataProxy.CreateProxy(
-		fmt.Sprintf("/devices/%s/flows", agent.deviceId),
-		false)
-	agent.groupProxy = agent.clusterDataProxy.CreateProxy(
-		fmt.Sprintf("/devices/%s/flow_groups", agent.deviceId),
-		false)
-
-	agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
-	agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
-
 	log.Debug("device-agent-started")
 	return nil
 }
@@ -201,38 +192,117 @@
 	return nil
 }
 
-func (agent *DeviceAgent) updateFlows(flows []*ofp.OfpFlowStats) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows})
-	var oldData *voltha.Flows
-	if storedData, err := agent.getDeviceWithoutLock(); err != nil {
-		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
-	} else {
-		oldData = proto.Clone(storedData.Flows).(*voltha.Flows)
-		log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows, "old": oldData})
-
-		// store the changed data
-		afterUpdate := agent.flowProxy.Update("/", &ofp.Flows{Items: flows}, false, "")
-		if afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
-		}
-
-		return nil
+func (agent *DeviceAgent) updateDeviceWithoutLockAsync(device *voltha.Device, ch chan interface{}) {
+	if err := agent.updateDeviceWithoutLock(device); err != nil {
+		ch <- status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
 	}
+	ch <- nil
 }
 
-func (agent *DeviceAgent) updateGroups(groups []*ofp.OfpGroupEntry) error {
+func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, ch chan interface{}) {
+	if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups); err != nil {
+		log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
+		ch <- err
+	}
+	ch <- nil
+}
+
+func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, ch chan interface{}) {
+	if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups); err != nil {
+		log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.lastData.Id, "error": err})
+		ch <- err
+	}
+	ch <- nil
+}
+
+func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry) error {
+	if (len(newFlows) | len(newGroups)) == 0 {
+		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+		return nil
+	}
+
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
-	log.Debugw("updateGroups", log.Fields{"deviceId": agent.deviceId, "groups": groups})
-	if _, err := agent.getDeviceWithoutLock(); err != nil {
+	log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+	var existingFlows *voltha.Flows
+	if device, err := agent.getDeviceWithoutLock(); err != nil {
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
+		existingFlows = proto.Clone(device.Flows).(*voltha.Flows)
+		existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
+		log.Debugw("addFlows", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "existingFlows": existingFlows, "groups": newGroups, "existingGroups": existingGroups})
+
+		var updatedFlows []*ofp.OfpFlowStats
+		var flowsToDelete []*ofp.OfpFlowStats
+		var groupsToDelete []*ofp.OfpGroupEntry
+		var updatedGroups []*ofp.OfpGroupEntry
+
+		// Process flows
+		for _, flow := range newFlows {
+			updatedFlows = append(updatedFlows, flow)
+		}
+
+		for _, flow := range existingFlows.Items {
+			if idx := fu.FindFlows(newFlows, flow); idx == -1 {
+				updatedFlows = append(updatedFlows, flow)
+			} else {
+				flowsToDelete = append(flowsToDelete, flow)
+			}
+		}
+
+		// Process groups
+		for _, g := range newGroups {
+			updatedGroups = append(updatedGroups, g)
+		}
+
+		for _, group := range existingGroups.Items {
+			if fu.FindGroup(newGroups, group.Desc.GroupId) == -1 { // does not exist now
+				updatedGroups = append(updatedGroups, group)
+			} else {
+				groupsToDelete = append(groupsToDelete, group)
+			}
+		}
+
+		// Sanity check
+		if (len(updatedFlows) | len(flowsToDelete) | len(updatedGroups) | len(groupsToDelete)) == 0 {
+			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+			return nil
+
+		}
+		// Send update to adapters
+		chAdapters := make(chan interface{})
+		defer close(chAdapters)
+		chdB := make(chan interface{})
+		defer close(chdB)
+		dType := agent.adapterMgr.getDeviceType(device.Type)
+		if !dType.AcceptsAddRemoveFlowUpdates {
+
+			if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
+				log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+				return nil
+			}
+			go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
+
+		} else {
+			flowChanges := &ofp.FlowChanges{
+				ToAdd:    &voltha.Flows{Items: newFlows},
+				ToRemove: &voltha.Flows{Items: flowsToDelete},
+			}
+			groupChanges := &ofp.FlowGroupChanges{
+				ToAdd:    &voltha.FlowGroups{Items: newGroups},
+				ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
+				ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			}
+			go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+		}
+
 		// store the changed data
-		afterUpdate := agent.groupProxy.Update("/", &ofp.FlowGroups{Items: groups}, false, "")
-		if afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
+		device.Flows = &voltha.Flows{Items: updatedFlows}
+		device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
+		go agent.updateDeviceWithoutLockAsync(device, chdB)
+
+		if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+			return status.Errorf(codes.Aborted, "errors-%s", res)
 		}
 
 		return nil
@@ -677,7 +747,6 @@
 
 func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
 	agent.lockDevice.Lock()
-	//defer agent.lockDevice.Unlock()
 	// Work only on latest data
 	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
 		agent.lockDevice.Unlock()
@@ -707,7 +776,6 @@
 
 func (agent *DeviceAgent) updatePortState(portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
 	agent.lockDevice.Lock()
-	//defer agent.lockDevice.Unlock()
 	// Work only on latest data
 	// TODO: Get list of ports from device directly instead of the entire device
 	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
@@ -766,7 +834,7 @@
 func (agent *DeviceAgent) addPort(port *voltha.Port) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
-	log.Debugw("addPort", log.Fields{"deviceId": agent.deviceId})
+	log.Debugw("addLogicalPortToMap", log.Fields{"deviceId": agent.deviceId})
 	// Work only on latest data
 	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
@@ -775,7 +843,7 @@
 		cloned := proto.Clone(storeDevice).(*voltha.Device)
 		if cloned.Ports == nil {
 			//	First port
-			log.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
+			log.Debugw("addLogicalPortToMap-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
 			cloned.Ports = make([]*voltha.Port, 0)
 		}
 		cp := proto.Clone(port).(*voltha.Port)
@@ -822,160 +890,6 @@
 	}
 }
 
-//flowTableUpdated is the callback after flows have been updated in the model to push them
-//to the adapters
-func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
-	log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-
-	var previousData *voltha.Flows
-	var latestData *voltha.Flows
-
-	var ok bool
-	if previousData, ok = args[0].(*ofp.Flows); !ok {
-		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-		return nil
-	}
-	if latestData, ok = args[1].(*ofp.Flows); !ok {
-		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-		return nil
-	}
-
-	// Sanity check - should not happen as this is already handled in logical device agent
-	if reflect.DeepEqual(previousData.Items, latestData.Items) {
-		log.Debugw("flow-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
-		return nil
-	}
-
-	var device *voltha.Device
-	var err error
-	if device, err = agent.getDeviceWithoutLock(); err != nil {
-		log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
-		return nil
-	}
-	groups := device.FlowGroups
-
-	// Send update to adapters
-	dType := agent.adapterMgr.getDeviceType(device.Type)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
-			log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
-			return err
-		}
-		return nil
-	}
-	// Incremental flow changes accepted
-	var toAdd []*ofp.OfpFlowStats
-	var toDelete []*ofp.OfpFlowStats
-
-	for _, flow := range latestData.Items {
-		if fu.FindFlowById(previousData.Items, flow) == -1 { // did not exist before
-			toAdd = append(toAdd, flow)
-		}
-	}
-	for _, flow := range previousData.Items {
-		if fu.FindFlowById(latestData.Items, flow) == -1 { // does not exist now
-			toDelete = append(toDelete, flow)
-		}
-	}
-	flowChanges := &ofp.FlowChanges{
-		ToAdd:    &voltha.Flows{Items: toAdd},
-		ToRemove: &voltha.Flows{Items: toDelete},
-	}
-	// Send an empty group changes as it would be dealt with a call to groupTableUpdated
-	groupChanges := &ofp.FlowGroupChanges{}
-
-	// Send changes only
-	if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
-		log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
-		return err
-	}
-
-	return nil
-}
-
-//groupTableUpdated is the callback after group table has been updated in the model to push them
-//to the adapters
-func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
-	log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-
-	var previousData *voltha.FlowGroups
-	var latestData *voltha.FlowGroups
-
-	var ok bool
-	if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
-		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-		return nil
-	}
-	if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
-		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-		return nil
-	}
-
-	// Sanity check - should not happen as this is already handled in logical device agent
-	if reflect.DeepEqual(previousData.Items, latestData.Items) {
-		log.Debugw("group-table-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
-		return nil
-	}
-
-	var device *voltha.Device
-	var err error
-	if device, err = agent.getDeviceWithoutLock(); err != nil {
-		log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
-		return nil
-	}
-	flows := device.Flows
-
-	// Send update to adapters
-	dType := agent.adapterMgr.getDeviceType(device.Type)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, latestData); err != nil {
-			log.Debugw("update-flows-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
-			return err
-		}
-		return nil
-	}
-
-	// Incremental group changes accepted
-	var toAdd []*ofp.OfpGroupEntry
-	var toDelete []*ofp.OfpGroupEntry
-	var toUpdate []*ofp.OfpGroupEntry
-
-	for _, group := range latestData.Items {
-		if idx := fu.FindGroup(previousData.Items, group.Desc.GroupId); idx == -1 { // did not exist before
-			toAdd = append(toAdd, group)
-		} else { // existed before
-			if previousData.Items[idx].String() != group.String() { // there is a change
-				toUpdate = append(toUpdate, group)
-			}
-		}
-	}
-	for _, group := range previousData.Items {
-		if fu.FindGroup(latestData.Items, group.Desc.GroupId) == -1 { // does not exist now
-			toDelete = append(toDelete, group)
-		}
-	}
-	groupChanges := &ofp.FlowGroupChanges{
-		ToAdd:    &voltha.FlowGroups{Items: toAdd},
-		ToRemove: &voltha.FlowGroups{Items: toDelete},
-		ToUpdate: &voltha.FlowGroups{Items: toUpdate},
-	}
-	// Send an empty flow changes as it should have been dealt with a call to flowTableUpdated
-	flowChanges := &ofp.FlowChanges{}
-
-	// Send changes only
-	if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
-		log.Debugw("update-incremental-group-error", log.Fields{"id": agent.lastData.Id, "error": err})
-		return err
-	}
-	return nil
-}
-
 // TODO: A generic device update by attribute
 func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
 	agent.lockDevice.Lock()
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index f9da623..f6540e4 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -33,6 +33,8 @@
 
 type DeviceManager struct {
 	deviceAgents        map[string]*DeviceAgent
+	rootDevices         map[string]bool
+	lockRootDeviceMap   sync.RWMutex
 	core                *Core
 	adapterProxy        *AdapterProxy
 	adapterMgr          *AdapterManager
@@ -42,6 +44,7 @@
 	clusterDataProxy    *model.Proxy
 	coreInstanceId      string
 	exitChannel         chan int
+	defaultTimeout      int64
 	lockDeviceAgentsMap sync.RWMutex
 }
 
@@ -50,12 +53,15 @@
 	deviceMgr.core = core
 	deviceMgr.exitChannel = make(chan int, 1)
 	deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
+	deviceMgr.rootDevices = make(map[string]bool)
 	deviceMgr.kafkaICProxy = core.kmp
 	deviceMgr.adapterProxy = NewAdapterProxy(core.kmp)
 	deviceMgr.coreInstanceId = core.instanceId
 	deviceMgr.clusterDataProxy = core.clusterDataProxy
 	deviceMgr.adapterMgr = core.adapterMgr
 	deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
+	deviceMgr.lockRootDeviceMap = sync.RWMutex{}
+	deviceMgr.defaultTimeout = core.config.DefaultCoreTimeout
 	return &deviceMgr
 }
 
@@ -86,16 +92,26 @@
 
 func (dMgr *DeviceManager) addDeviceAgentToMap(agent *DeviceAgent) {
 	dMgr.lockDeviceAgentsMap.Lock()
-	defer dMgr.lockDeviceAgentsMap.Unlock()
+	//defer dMgr.lockDeviceAgentsMap.Unlock()
 	if _, exist := dMgr.deviceAgents[agent.deviceId]; !exist {
 		dMgr.deviceAgents[agent.deviceId] = agent
 	}
+	dMgr.lockDeviceAgentsMap.Unlock()
+	dMgr.lockRootDeviceMap.Lock()
+	defer dMgr.lockRootDeviceMap.Unlock()
+	dMgr.rootDevices[agent.deviceId] = agent.isRootdevice
+
 }
 
 func (dMgr *DeviceManager) deleteDeviceAgentToMap(agent *DeviceAgent) {
 	dMgr.lockDeviceAgentsMap.Lock()
-	defer dMgr.lockDeviceAgentsMap.Unlock()
+	//defer dMgr.lockDeviceAgentsMap.Unlock()
 	delete(dMgr.deviceAgents, agent.deviceId)
+	dMgr.lockDeviceAgentsMap.Unlock()
+	dMgr.lockRootDeviceMap.Lock()
+	defer dMgr.lockRootDeviceMap.Unlock()
+	delete(dMgr.rootDevices, agent.deviceId)
+
 }
 
 // getDeviceAgent returns the agent managing the device.  If the device is not in memory, it will loads it, if it exists
@@ -123,7 +139,7 @@
 	dMgr.lockDeviceAgentsMap.RLock()
 	defer dMgr.lockDeviceAgentsMap.RUnlock()
 	result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
-	for key, _ := range dMgr.deviceAgents {
+	for key := range dMgr.deviceAgents {
 		result.Items = append(result.Items, &voltha.ID{Id: key})
 	}
 	return result
@@ -135,7 +151,7 @@
 	// Ensure this device is set as root
 	device.Root = true
 	// Create and start a device agent for that device
-	agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy)
+	agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 	dMgr.addDeviceAgentToMap(agent)
 	agent.start(ctx, false)
 
@@ -310,11 +326,12 @@
 }
 
 func (dMgr *DeviceManager) IsRootDevice(id string) (bool, error) {
-	device, err := dMgr.GetDevice(id)
-	if err != nil {
-		return false, err
+	dMgr.lockRootDeviceMap.RLock()
+	defer dMgr.lockRootDeviceMap.RUnlock()
+	if exist := dMgr.rootDevices[id]; exist {
+		return dMgr.rootDevices[id], nil
 	}
-	return device.Root, nil
+	return false, nil
 }
 
 // ListDevices retrieves the latest devices from the data model
@@ -325,7 +342,7 @@
 		for _, device := range devices.([]interface{}) {
 			// If device is not in memory then set it up
 			if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
-				agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+				agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 				if err := agent.start(nil, true); err != nil {
 					log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.(*voltha.Device).Id})
 					agent.stop(nil)
@@ -347,7 +364,7 @@
 		return nil, status.Error(codes.InvalidArgument, "deviceId empty")
 	}
 	if !dMgr.IsDeviceInCache(deviceId) {
-		agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: deviceId}, dMgr, dMgr.clusterDataProxy)
+		agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: deviceId}, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 		if err := agent.start(nil, true); err != nil {
 			agent.stop(nil)
 			return nil, err
@@ -450,7 +467,7 @@
 				//	Device Id not in memory
 				log.Debugw("reconciling-device", log.Fields{"id": id.Id})
 				// Load device from dB
-				agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: id.Id}, dMgr, dMgr.clusterDataProxy)
+				agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: id.Id}, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 				if err := agent.start(nil, true); err != nil {
 					log.Warnw("failure-loading-device", log.Fields{"deviceId": id.Id})
 					agent.stop(nil)
@@ -494,14 +511,14 @@
 				}
 			}
 		}
-		// Notify the logical device manager to setup a logical port if needed
-		if port.Type == voltha.Port_ETHERNET_NNI || port.Type == voltha.Port_ETHERNET_UNI {
-			if device, err := dMgr.GetDevice(deviceId); err == nil {
-				go dMgr.logicalDeviceMgr.addLogicalPort(device, port)
-			} else {
-				log.Errorw("failed-to-retrieve-device", log.Fields{"deviceId": deviceId})
-				return err
-			}
+		// Notify the logical device manager to setup a logical port, if needed.  If the added port is an NNI or UNI
+		// then a logical port will be added to the logical device and the device graph generated.  If the port is a
+		// PON port then only the device graph will be generated.
+		if device, err := dMgr.GetDevice(deviceId); err == nil {
+			go dMgr.logicalDeviceMgr.updateLogicalPort(device, port)
+		} else {
+			log.Errorw("failed-to-retrieve-device", log.Fields{"deviceId": deviceId})
+			return err
 		}
 		return nil
 	} else {
@@ -509,17 +526,12 @@
 	}
 }
 
-func (dMgr *DeviceManager) updateFlows(deviceId string, flows []*ofp.OfpFlowStats) error {
-	log.Debugw("updateFlows", log.Fields{"deviceid": deviceId})
+func (dMgr *DeviceManager) addFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) error {
+	log.Debugw("addFlowsAndGroups", log.Fields{"deviceid": deviceId})
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
-		return agent.updateFlows(flows)
-	}
-	return status.Errorf(codes.NotFound, "%s", deviceId)
-}
-
-func (dMgr *DeviceManager) updateGroups(deviceId string, groups []*ofp.OfpGroupEntry) error {
-	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
-		return agent.updateGroups(groups)
+		return agent.addFlowsAndGroups(flows, groups)
+		//go agent.addFlowsAndGroups(flows, groups)
+		//return nil
 	}
 	return status.Errorf(codes.NotFound, "%s", deviceId)
 }
@@ -624,13 +636,10 @@
 	childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceId, DeviceType: parent.Type, ChannelId: uint32(channelId), OnuId: uint32(onuId)}
 
 	// Create and start a device agent for that device
-	agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy)
+	agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 	dMgr.addDeviceAgentToMap(agent)
 	agent.start(nil, false)
 
-	//// Set device ownership
-	//dMgr.core.deviceOwnership.OwnedByMe(agent.deviceId)
-
 	// Activate the child device
 	if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
 		go agent.enableDevice(nil)
@@ -808,8 +817,8 @@
 				childDeviceIds = append(childDeviceIds, peer.DeviceId)
 			}
 		}
+		log.Debugw("returning-getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id, "childDeviceIds": childDeviceIds})
 	}
-	log.Debugw("returning-getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id, "childDeviceIds": childDeviceIds})
 	return childDeviceIds, nil
 }
 
@@ -970,7 +979,7 @@
 
 func (dMgr *DeviceManager) notAllowed(pcDevice *voltha.Device) error {
 	log.Info("notAllowed")
-	return errors.New("Transition-not-allowed")
+	return errors.New("transition-not-allowed")
 }
 
 func funcName(f interface{}) string {
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
index f52efea..97de41c 100644
--- a/rw_core/core/device_ownership.go
+++ b/rw_core/core/device_ownership.go
@@ -20,8 +20,8 @@
 	"fmt"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
-	"github.com/opencord/voltha-protos/go/voltha"
 	"github.com/opencord/voltha-go/rw_core/utils"
+	"github.com/opencord/voltha-protos/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"sync"
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 6532b6e..8ff3f04 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -24,8 +24,8 @@
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-protos/go/common"
-	"github.com/opencord/voltha-protos/go/openflow_13"
 	"github.com/opencord/voltha-protos/go/omci"
+	"github.com/opencord/voltha-protos/go/openflow_13"
 	"github.com/opencord/voltha-protos/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/metadata"
@@ -35,38 +35,37 @@
 )
 
 const (
-	IMAGE_DOWNLOAD = iota
-	CANCEL_IMAGE_DOWNLOAD     = iota
-	ACTIVATE_IMAGE = iota
-	REVERT_IMAGE = iota
+	IMAGE_DOWNLOAD        = iota
+	CANCEL_IMAGE_DOWNLOAD = iota
+	ACTIVATE_IMAGE        = iota
+	REVERT_IMAGE          = iota
 )
 
-
 type APIHandler struct {
-	deviceMgr        *DeviceManager
-	logicalDeviceMgr *LogicalDeviceManager
-	adapterMgr *AdapterManager
-	packetInQueue    *queue.Queue
-	changeEventQueue *queue.Queue
-	coreInCompetingMode bool
+	deviceMgr                 *DeviceManager
+	logicalDeviceMgr          *LogicalDeviceManager
+	adapterMgr                *AdapterManager
+	packetInQueue             *queue.Queue
+	changeEventQueue          *queue.Queue
+	coreInCompetingMode       bool
 	longRunningRequestTimeout int64
-	defaultRequestTimeout int64
+	defaultRequestTimeout     int64
 	da.DefaultAPIHandler
 	core *Core
 }
 
 func NewAPIHandler(core *Core) *APIHandler {
 	handler := &APIHandler{
-		deviceMgr:        core.deviceMgr,
-		logicalDeviceMgr: core.logicalDeviceMgr,
-		adapterMgr: core.adapterMgr,
-		coreInCompetingMode: core.config.InCompetingMode,
-		longRunningRequestTimeout:core.config.LongRunningRequestTimeout,
-		defaultRequestTimeout:core.config.DefaultRequestTimeout,
+		deviceMgr:                 core.deviceMgr,
+		logicalDeviceMgr:          core.logicalDeviceMgr,
+		adapterMgr:                core.adapterMgr,
+		coreInCompetingMode:       core.config.InCompetingMode,
+		longRunningRequestTimeout: core.config.LongRunningRequestTimeout,
+		defaultRequestTimeout:     core.config.DefaultRequestTimeout,
 		// TODO: Figure out what the 'hint' parameter to queue.New does
-		packetInQueue: queue.New(10),
+		packetInQueue:    queue.New(10),
 		changeEventQueue: queue.New(10),
-		core: core,
+		core:             core,
 	}
 	return handler
 }
@@ -92,7 +91,7 @@
 	} else if serNum, ok = md["voltha_serial_number"]; !ok {
 		err = errors.New("serial-number-not-found")
 	}
-	if !ok {
+	if !ok || serNum == nil {
 		log.Error(err)
 		return nil, err
 	}
@@ -129,7 +128,7 @@
 	log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
 	txn, err := handler.createKvTransaction(ctx)
 	if txn == nil {
-		return nil,  err
+		return nil, err
 	} else if txn.Acquired(timeout) {
 		return txn, nil
 	} else {
@@ -160,7 +159,7 @@
 	log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
 	txn, err := handler.createKvTransaction(ctx)
 	if txn == nil {
-		return nil,  err
+		return nil, err
 	}
 
 	owned := false
@@ -213,7 +212,6 @@
 	return out, nil
 }
 
-
 func (handler *APIHandler) UpdateMembership(ctx context.Context, membership *voltha.Membership) (*empty.Empty, error) {
 	log.Debugw("UpdateMembership-request", log.Fields{"membership": membership})
 	out := new(empty.Empty)
@@ -231,7 +229,6 @@
 	return &voltha.Membership{}, nil
 }
 
-
 func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
 	log.Debugw("EnableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
 	if isTestMode(ctx) {
@@ -240,7 +237,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:id.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -261,7 +258,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:id.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -283,7 +280,7 @@
 
 	if handler.competeForTransaction() {
 		if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
-			if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:flow.Id}); err != nil {
+			if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
 				return new(empty.Empty), err
 			} else {
 				defer txn.Close()
@@ -306,7 +303,7 @@
 
 	if handler.competeForTransaction() {
 		if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
-			if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:flow.Id}); err != nil {
+			if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
 				return new(empty.Empty), err
 			} else {
 				defer txn.Close()
@@ -370,7 +367,6 @@
 	return handler.logicalDeviceMgr.listLogicalDevices()
 }
 
-
 // ListAdapters returns the contents of all adapters known to the system
 func (handler *APIHandler) ListAdapters(ctx context.Context, empty *empty.Empty) (*voltha.Adapters, error) {
 	log.Debug("ListDevices")
@@ -408,7 +404,7 @@
 				return &voltha.Device{}, err
 			}
 			if d, ok := res.(*voltha.Device); ok {
-				handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id:d.Id})
+				handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: d.Id})
 				return d, nil
 			}
 		}
@@ -429,7 +425,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:id.Id}, handler.longRunningRequestTimeout); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}, handler.longRunningRequestTimeout); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -450,7 +446,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:id.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -471,7 +467,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:id.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -514,14 +510,14 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:img.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: img.Id}); err != nil {
 			return &common.OperationResp{}, err
 		} else {
 			defer txn.Close()
 		}
 	}
 
-	failedresponse := &common.OperationResp{Code:voltha.OperationResp_OPERATION_FAILURE}
+	failedresponse := &common.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}
 
 	ch := make(chan interface{})
 	defer close(ch)
@@ -605,7 +601,7 @@
 	failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:img.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: img.Id}); err != nil {
 			return failedresponse, err
 		} else {
 			defer txn.Close()
@@ -651,15 +647,15 @@
 func (handler *APIHandler) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
 	log.Debugw("ListImageDownloads-request", log.Fields{"deviceId": id.Id})
 	if isTestMode(ctx) {
-		resp := &voltha.ImageDownloads{Items:[]*voltha.ImageDownload{}}
+		resp := &voltha.ImageDownloads{Items: []*voltha.ImageDownload{}}
 		return resp, nil
 	}
 
 	if downloads, err := handler.deviceMgr.listImageDownloads(ctx, id.Id); err != nil {
 		failedResp := &voltha.ImageDownloads{
-			Items:[]*voltha.ImageDownload{
-				&voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN},
-		},
+			Items: []*voltha.ImageDownload{
+				{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN},
+			},
 		}
 		return failedResp, err
 	} else {
@@ -667,7 +663,6 @@
 	}
 }
 
-
 func (handler *APIHandler) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
 	log.Debugw("UpdateDevicePmConfigs-request", log.Fields{"configs": *configs})
 	if isTestMode(ctx) {
@@ -819,7 +814,7 @@
 
 //@TODO useless stub, what should this actually do?
 func (handler *APIHandler) GetMeterStatsOfLogicalDevice(
-	ctx context.Context, 
+	ctx context.Context,
 	in *common.ID,
 ) (*openflow_13.MeterStatsReply, error) {
 	log.Debug("GetMeterStatsOfLogicalDevice-stub")
@@ -828,8 +823,8 @@
 
 //@TODO useless stub, what should this actually do?
 func (handler *APIHandler) GetMibDeviceData(
-	ctx context.Context, 
-	in *common.ID, 
+	ctx context.Context,
+	in *common.ID,
 ) (*omci.MibDeviceData, error) {
 	log.Debug("GetMibDeviceData-stub")
 	return nil, nil
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 9496aa1..a2b4494 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -37,25 +37,29 @@
 type LogicalDeviceAgent struct {
 	logicalDeviceId string
 	//lastData          *voltha.LogicalDevice
-	rootDeviceId      string
-	deviceMgr         *DeviceManager
-	ldeviceMgr        *LogicalDeviceManager
-	clusterDataProxy  *model.Proxy
-	exitChannel       chan int
-	deviceGraph       *graph.DeviceGraph
-	DefaultFlowRules  *fu.DeviceRules
-	flowProxy         *model.Proxy
-	groupProxy        *model.Proxy
-	ldProxy           *model.Proxy
-	portProxies       map[string]*model.Proxy
-	portProxiesLock   sync.RWMutex
-	lockLogicalDevice sync.RWMutex
-	flowDecomposer    *fd.FlowDecomposer
+	rootDeviceId        string
+	deviceMgr           *DeviceManager
+	ldeviceMgr          *LogicalDeviceManager
+	clusterDataProxy    *model.Proxy
+	exitChannel         chan int
+	deviceGraph         *graph.DeviceGraph
+	DefaultFlowRules    *fu.DeviceRules
+	flowProxy           *model.Proxy
+	groupProxy          *model.Proxy
+	ldProxy             *model.Proxy
+	portProxies         map[string]*model.Proxy
+	portProxiesLock     sync.RWMutex
+	lockLogicalDevice   sync.RWMutex
+	logicalPortsNo      map[uint32]bool //value is true for NNI port
+	lockLogicalPortsNo  sync.RWMutex
+	flowDecomposer      *fd.FlowDecomposer
+	includeDefaultFlows bool
+	defaultTimeout      int64
 }
 
 func newLogicalDeviceAgent(id string, deviceId string, ldeviceMgr *LogicalDeviceManager,
 	deviceMgr *DeviceManager,
-	cdProxy *model.Proxy) *LogicalDeviceAgent {
+	cdProxy *model.Proxy, timeout int64) *LogicalDeviceAgent {
 	var agent LogicalDeviceAgent
 	agent.exitChannel = make(chan int, 1)
 	agent.logicalDeviceId = id
@@ -67,6 +71,10 @@
 	agent.lockLogicalDevice = sync.RWMutex{}
 	agent.portProxies = make(map[string]*model.Proxy)
 	agent.portProxiesLock = sync.RWMutex{}
+	agent.lockLogicalPortsNo = sync.RWMutex{}
+	agent.logicalPortsNo = make(map[uint32]bool)
+	agent.includeDefaultFlows = true
+	agent.defaultTimeout = timeout
 	return &agent
 }
 
@@ -131,12 +139,11 @@
 		fmt.Sprintf("/logical_devices/%s", agent.logicalDeviceId),
 		false)
 
-	agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
-	agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
-
 	// TODO:  Use a port proxy once the POST_ADD is fixed
 	agent.ldProxy.RegisterCallback(model.POST_UPDATE, agent.portUpdated)
 
+	agent.includeDefaultFlows = true
+
 	agent.lockLogicalDevice.Unlock()
 
 	return nil
@@ -148,13 +155,6 @@
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 
-	// Unregister to teh callbacks
-	if agent.flowProxy != nil {
-		agent.flowProxy.UnregisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
-	}
-	if agent.groupProxy != nil {
-		agent.groupProxy.UnregisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
-	}
 	//Remove the logical device from the model
 	if removed := agent.clusterDataProxy.Remove("/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
 		log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
@@ -246,6 +246,29 @@
 	return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
 }
 
+func (agent *LogicalDeviceAgent) updateLogicalPort(device *voltha.Device, port *voltha.Port) error {
+	log.Debugw("updateLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
+	var err error
+	if port.Type == voltha.Port_ETHERNET_NNI {
+		if _, err = agent.addNNILogicalPort(device, port); err != nil {
+			return err
+		}
+		agent.addLogicalPortToMap(port.PortNo, true)
+	} else if port.Type == voltha.Port_ETHERNET_UNI {
+		if _, err = agent.addUNILogicalPort(device, port); err != nil {
+			return err
+		}
+		agent.addLogicalPortToMap(port.PortNo, false)
+	} else {
+		// Update the device graph to ensure all routes on the logical device have been calculated
+		if err = agent.updateRoutes(device, port); err != nil {
+			log.Errorw("failed-to-update-routes", log.Fields{"deviceId": device.Id, "port": port, "error": err})
+			return err
+		}
+	}
+	return nil
+}
+
 func (agent *LogicalDeviceAgent) addLogicalPort(device *voltha.Device, port *voltha.Port) error {
 	log.Debugw("addLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
 	var err error
@@ -253,10 +276,12 @@
 		if _, err = agent.addNNILogicalPort(device, port); err != nil {
 			return err
 		}
+		agent.addLogicalPortToMap(port.PortNo, true)
 	} else if port.Type == voltha.Port_ETHERNET_UNI {
 		if _, err = agent.addUNILogicalPort(device, port); err != nil {
 			return err
 		}
+		agent.addLogicalPortToMap(port.PortNo, false)
 	} else {
 		log.Debugw("invalid-port-type", log.Fields{"deviceId": device.Id, "port": port})
 		return nil
@@ -266,15 +291,13 @@
 
 // setupNNILogicalPorts creates an NNI port on the logical device that represents an NNI interface on a root device
 func (agent *LogicalDeviceAgent) setupNNILogicalPorts(ctx context.Context, deviceId string) error {
-	//now := time.Now()
-	//defer fmt.Println("setupNNILogicalPorts:", deviceId, time.Since(now))
 	log.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	// Build the logical device based on information retrieved from the device adapter
 	var err error
 
 	var device *voltha.Device
 	if device, err = agent.deviceMgr.GetDevice(deviceId); err != nil {
-		log.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": device.Id})
+		log.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": deviceId})
 		return err
 	}
 
@@ -284,6 +307,7 @@
 			if _, err = agent.addNNILogicalPort(device, port); err != nil {
 				log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
 			}
+			agent.addLogicalPortToMap(port.PortNo, true)
 		}
 	}
 	return err
@@ -291,8 +315,6 @@
 
 // setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
 func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
-	//now := time.Now()
-	//defer fmt.Println("setupUNILogicalPorts:", childDevice.Id, time.Since(now))
 	log.Infow("setupUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	// Build the logical device based on information retrieved from the device adapter
 	var err error
@@ -303,6 +325,7 @@
 			if _, err = agent.addUNILogicalPort(childDevice, port); err != nil {
 				log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
 			}
+			agent.addLogicalPortToMap(port.PortNo, false)
 		}
 	}
 	return err
@@ -357,17 +380,6 @@
 		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, groupMod.GetCommand())
 }
 
-//updateFlowGroupsWithoutLock updates the flows in the logical device without locking the logical device.  This function
-//must only be called by a function that is holding the lock on the logical device
-func (agent *LogicalDeviceAgent) updateFlowGroupsWithoutLock(groups []*ofp.OfpGroupEntry) error {
-	groupsCloned := make([]*ofp.OfpGroupEntry, len(groups))
-	copy(groupsCloned, groups)
-	if afterUpdate := agent.groupProxy.Update("/", groupsCloned, true, ""); afterUpdate == nil {
-		return errors.New(fmt.Sprintf("update-flow-group-failed:%s", agent.logicalDeviceId))
-	}
-	return nil
-}
-
 //flowAdd adds a flow to the flow table of that logical device
 func (agent *LogicalDeviceAgent) flowAdd(mod *ofp.OfpFlowMod) error {
 	log.Debug("flowAdd")
@@ -389,6 +401,7 @@
 		flows = lDevice.Flows.Items
 	}
 
+	updatedFlows := make([]*ofp.OfpFlowStats, 0)
 	//oldData := proto.Clone(lDevice.Flows).(*voltha.Flows)
 	changed := false
 	checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
@@ -400,6 +413,7 @@
 			//	Add flow
 			flow := fd.FlowStatsEntryFromFlowModMessage(mod)
 			flows = append(flows, flow)
+			updatedFlows = append(updatedFlows, flow)
 			changed = true
 		}
 	} else {
@@ -411,26 +425,65 @@
 				flow.ByteCount = oldFlow.ByteCount
 				flow.PacketCount = oldFlow.PacketCount
 			}
-			flows[idx] = flow
+			if !reflect.DeepEqual(oldFlow, flow) {
+				flows[idx] = flow
+				updatedFlows = append(updatedFlows, flow)
+				changed = true
+			}
 		} else {
 			flows = append(flows, flow)
+			updatedFlows = append(updatedFlows, flow)
+			changed = true
 		}
-		changed = true
 	}
 	if changed {
+		// Launch a routine to decompose the flows
+		if err := agent.decomposeAndSendFlows(&ofp.Flows{Items: updatedFlows}, lDevice.FlowGroups, agent.includeDefaultFlows); err != nil {
+			log.Errorf("decomposing-and-sending-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			return err
+		}
+
+		// We no longer need to sent the default flows, unless there is a change in device topology
+		agent.includeDefaultFlows = false
+
 		//	Update model
 		flowsToUpdate := &ofp.Flows{}
 		if lDevice.Flows != nil {
 			flowsToUpdate = &ofp.Flows{Items: flows}
 		}
 		if err := agent.updateLogicalDeviceFlowsWithoutLock(flowsToUpdate); err != nil {
-			log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	}
 	return nil
 }
 
+func (agent *LogicalDeviceAgent) decomposeAndSendFlows(flows *ofp.Flows, groups *ofp.FlowGroups, includeDefaultFlows bool) error {
+	log.Debugw("decomposeAndSendFlows", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
+
+	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *groups, includeDefaultFlows)
+	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+	chnlsList := make([]chan interface{}, 0)
+	for deviceId, value := range deviceRules.GetRules() {
+		ch := make(chan interface{})
+		chnlsList = append(chnlsList, ch)
+		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
+			if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups); err != nil {
+				log.Error("flow-update-failed", log.Fields{"deviceID": deviceId})
+				ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
+			}
+			ch <- nil
+		}(deviceId, value.ListFlows(), value.ListGroups())
+	}
+	// Wait for completion
+	if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+		return status.Errorf(codes.Aborted, "errors-%s", res)
+	}
+	return nil
+}
+
 //flowDelete deletes a flow from the flow table of that logical device
 func (agent *LogicalDeviceAgent) flowDelete(mod *ofp.OfpFlowMod) error {
 	log.Debug("flowDelete")
@@ -740,15 +793,6 @@
 	return nil
 }
 
-func isNNIPort(portNo uint32, nniPortsNo []uint32) bool {
-	for _, pNo := range nniPortsNo {
-		if pNo == portNo {
-			return true
-		}
-	}
-	return false
-}
-
 func (agent *LogicalDeviceAgent) getPreCalculatedRoute(ingress, egress uint32) []graph.RouteHop {
 	log.Debugw("ROUTE", log.Fields{"len": len(agent.deviceGraph.Routes)})
 	for routeLink, route := range agent.deviceGraph.Routes {
@@ -763,67 +807,56 @@
 
 func (agent *LogicalDeviceAgent) GetRoute(ingressPortNo uint32, egressPortNo uint32) []graph.RouteHop {
 	log.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
-	// Get the updated logical device
-	var ld *ic.LogicalDevice
 	routes := make([]graph.RouteHop, 0)
-	var err error
-	if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {
-		return nil
-	}
-	nniLogicalPortsNo := make([]uint32, 0)
-	for _, logicalPort := range ld.Ports {
-		if logicalPort.RootPort {
-			nniLogicalPortsNo = append(nniLogicalPortsNo, logicalPort.OfpPort.PortNo)
-		}
-	}
-	if len(nniLogicalPortsNo) == 0 {
-		log.Errorw("no-nni-ports", log.Fields{"LogicalDeviceId": ld.Id})
-		return nil
-	}
+
 	// Note: A port value of 0 is equivalent to a nil port
 
 	//	Consider different possibilities
 	if egressPortNo != 0 && ((egressPortNo & 0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER)) {
-		log.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
-		if isNNIPort(ingressPortNo, nniLogicalPortsNo) {
+		log.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
+		if agent.isNNIPort(ingressPortNo) {
 			log.Debug("returning-half-route")
 			//This is a trap on the NNI Port
 			if len(agent.deviceGraph.Routes) == 0 {
 				// If there are no routes set (usually when the logical device has only NNI port(s), then just return an
 				// internal route
-				hop := graph.RouteHop{DeviceID: ld.RootDeviceId, Ingress: ingressPortNo, Egress: egressPortNo}
+				hop := graph.RouteHop{DeviceID: agent.rootDeviceId, Ingress: ingressPortNo, Egress: egressPortNo}
 				routes = append(routes, hop)
 				routes = append(routes, hop)
 				return routes
 			}
 			//Return a 'half' route to make the flow decomposer logic happy
 			for routeLink, route := range agent.deviceGraph.Routes {
-				if isNNIPort(routeLink.Egress, nniLogicalPortsNo) {
+				if agent.isNNIPort(routeLink.Egress) {
 					routes = append(routes, graph.RouteHop{}) // first hop is set to empty
 					routes = append(routes, route[1])
 					return routes
 				}
 			}
-			log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+			log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
 			return nil
 		}
 		//treat it as if the output port is the first NNI of the OLT
-		egressPortNo = nniLogicalPortsNo[0]
+		var err error
+		if egressPortNo, err = agent.getFirstNNIPort(); err != nil {
+			log.Warnw("no-nni-port", log.Fields{"error": err})
+			return nil
+		}
 	}
 	//If ingress port is not specified (nil), it may be a wildcarded
 	//route if egress port is OFPP_CONTROLLER or a nni logical port,
 	//in which case we need to create a half-route where only the egress
 	//hop is filled, the first hop is nil
-	if ingressPortNo == 0 && isNNIPort(egressPortNo, nniLogicalPortsNo) {
+	if ingressPortNo == 0 && agent.isNNIPort(egressPortNo) {
 		// We can use the 2nd hop of any upstream route, so just find the first upstream:
 		for routeLink, route := range agent.deviceGraph.Routes {
-			if isNNIPort(routeLink.Egress, nniLogicalPortsNo) {
+			if agent.isNNIPort(routeLink.Egress) {
 				routes = append(routes, graph.RouteHop{}) // first hop is set to empty
 				routes = append(routes, route[1])
 				return routes
 			}
 		}
-		log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+		log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
 		return nil
 	}
 	//If egress port is not specified (nil), we can also can return a "half" route
@@ -835,10 +868,9 @@
 				return routes
 			}
 		}
-		log.Warnw("no-downstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+		log.Warnw("no-downstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
 		return nil
 	}
-
 	//	Return the pre-calculated route
 	return agent.getPreCalculatedRoute(ingressPortNo, egressPortNo)
 }
@@ -935,16 +967,6 @@
 }
 
 func (agent *LogicalDeviceAgent) GetAllDefaultRules() *fu.DeviceRules {
-	// Get latest
-	var err error
-	if _, err = agent.GetLogicalDevice(); err != nil {
-		return fu.NewDeviceRules()
-	}
-	if agent.DefaultFlowRules == nil { // Nothing setup yet
-		// Setup device graph if needed
-		agent.setupDeviceGraph()
-		agent.DefaultFlowRules = agent.generateDefaultRules()
-	}
 	return agent.DefaultFlowRules
 }
 
@@ -968,101 +990,64 @@
 	return agent.deviceGraph
 }
 
-//setupDeviceGraph creates the device graph if not done already
-func (agent *LogicalDeviceAgent) setupDeviceGraph() {
+//updateRoutes redo the device graph if not done already and setup the default rules as well
+func (agent *LogicalDeviceAgent) updateRoutes(device *voltha.Device, port *voltha.Port) error {
+	log.Debugf("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "device": device.Id, "port": port})
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
+	rules := fu.NewDeviceRules()
 	if agent.deviceGraph == nil {
 		agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
-		if ld, err := agent.getLogicalDeviceWithoutLock(); err == nil {
-			agent.deviceGraph.ComputeRoutes(ld.Ports)
+	}
+	// Get all the logical ports on that logical device
+	if lDevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
+		log.Errorf("unknown-logical-device", log.Fields{"error": err, "logicalDeviceId": agent.logicalDeviceId})
+		return err
+	} else {
+		//TODO:  Find a better way to refresh only missing routes
+		agent.deviceGraph.ComputeRoutes(lDevice.Ports)
+	}
+	deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
+	for deviceId := range deviceNodeIds {
+		if deviceId == agent.rootDeviceId {
+			rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
+		} else {
+			rules.AddFlowsAndGroup(deviceId, agent.leafDeviceDefaultRules(deviceId))
 		}
 	}
+	agent.DefaultFlowRules = rules
+
+	// Reset the default flows flag to ensure all default flows are sent to all devices, including the newly added
+	// one when a flow request is received.
+	agent.includeDefaultFlows = true
+	agent.deviceGraph.Print()
+	return nil
 }
 
-//updateDeviceGraph updates the device graph if not done already
+//updateDeviceGraph updates the device graph if not done already and setup the default rules as well
 func (agent *LogicalDeviceAgent) updateDeviceGraph(lp *voltha.LogicalPort) {
+	log.Debugf("updateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	rules := fu.NewDeviceRules()
 	if agent.deviceGraph == nil {
 		agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
 	}
 	agent.deviceGraph.AddPort(lp)
-}
-
-func (agent *LogicalDeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
-	log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
-	var previousData *ofp.Flows
-	var latestData *ofp.Flows
-
-	var ok bool
-	if previousData, ok = args[0].(*ofp.Flows); !ok {
-		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-	}
-	if latestData, ok = args[1].(*ofp.Flows); !ok {
-		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-	}
-
-	if reflect.DeepEqual(previousData.Items, latestData.Items) {
-		log.Debug("flow-update-not-required")
-		return nil
-	}
-
-	var groups *ofp.FlowGroups
-	lDevice, _ := agent.getLogicalDeviceWithoutLock()
-	groups = lDevice.FlowGroups
-	log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
-	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
-	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-
-	var err error
-	for deviceId, value := range deviceRules.GetRules() {
-		if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
-			log.Error("update-flows-failed", log.Fields{"deviceID": deviceId})
-		}
-		if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
-			log.Error("update-groups-failed", log.Fields{"deviceID": deviceId})
+	deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
+	for deviceId := range deviceNodeIds {
+		if deviceId == agent.rootDeviceId {
+			rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
+		} else {
+			rules.AddFlowsAndGroup(deviceId, agent.leafDeviceDefaultRules(deviceId))
 		}
 	}
+	agent.DefaultFlowRules = rules
 
-	return nil
-}
-
-func (agent *LogicalDeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
-	log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
-	var previousData *ofp.FlowGroups
-	var latestData *ofp.FlowGroups
-
-	var ok bool
-	if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
-		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-	}
-	if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
-		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-	}
-
-	if reflect.DeepEqual(previousData.Items, latestData.Items) {
-		log.Debug("flow-update-not-required")
-		return nil
-	}
-
-	var flows *ofp.Flows
-	lDevice, _ := agent.getLogicalDeviceWithoutLock()
-	flows = lDevice.Flows
-	log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
-	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
-	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-	var err error
-	for deviceId, value := range deviceRules.GetRules() {
-		if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
-			log.Error("update-flows-failed", log.Fields{"deviceID": deviceId})
-		}
-		if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
-			log.Error("update-groups-failed", log.Fields{"deviceID": deviceId})
-		}
-
-	}
-	return nil
+	// Reset the default flows flag to ensure all default flows are sent to all devices, including the newly added
+	// one when a flow request is received.
+	agent.includeDefaultFlows = true
+	agent.deviceGraph.Print()
 }
 
 // portAdded is a callback invoked when a port is added to the logical device.
@@ -1192,9 +1177,9 @@
 	newPorts, changedPorts, deletedPorts := diff(oldLD.Ports, newlD.Ports)
 
 	// Send the port change events to the OF controller
-	for _, new := range newPorts {
+	for _, newP := range newPorts {
 		go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
-			&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: new.OfpPort})
+			&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: newP.OfpPort})
 	}
 	for _, change := range changedPorts {
 		go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
@@ -1213,8 +1198,6 @@
 // (true, nil).   If the device is not in the correct state it will return (false, nil) as this is a valid
 // scenario. This also applies to the case where the port was already added.
 func (agent *LogicalDeviceAgent) addNNILogicalPort(device *voltha.Device, port *voltha.Port) (bool, error) {
-	//now := time.Now()
-	//defer fmt.Println("setupNNILogicalPorts:", device.Id, time.Since(now))
 	log.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
 	if device.AdminState != voltha.AdminState_ENABLED || device.OperStatus != voltha.OperStatus_ACTIVE {
 		log.Infow("device-not-ready", log.Fields{"deviceId": device.Id, "admin": device.AdminState, "oper": device.OperStatus})
@@ -1291,8 +1274,6 @@
 // (true, nil).   If the device is not in the correct state it will return (false, nil) as this is a valid
 // scenario. This also applies to the case where the port was already added.
 func (agent *LogicalDeviceAgent) addUNILogicalPort(childDevice *voltha.Device, port *voltha.Port) (bool, error) {
-	//now := time.Now()
-	//defer fmt.Println("addUNILogicalPort:", childDevice.Id, time.Since(now))
 	log.Debugw("addUNILogicalPort", log.Fields{"port": port})
 	if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
 		log.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
@@ -1337,7 +1318,6 @@
 		if err := agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
 			return false, err
 		}
-
 		// Update the device graph with this new logical port
 		clonedLP := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
 		go agent.updateDeviceGraph(clonedLP)
@@ -1361,3 +1341,39 @@
 	agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, transactionId, packetIn)
 	log.Debugw("sending-packet-in", log.Fields{"packet-in": packetIn})
 }
+
+func (agent *LogicalDeviceAgent) addLogicalPortToMap(portNo uint32, nniPort bool) {
+	agent.lockLogicalPortsNo.Lock()
+	defer agent.lockLogicalPortsNo.Unlock()
+	if exist := agent.logicalPortsNo[portNo]; !exist {
+		agent.logicalPortsNo[portNo] = nniPort
+	}
+}
+
+func (agent *LogicalDeviceAgent) deleteLogicalPortFromMap(portNo uint32) {
+	agent.lockLogicalPortsNo.Lock()
+	defer agent.lockLogicalPortsNo.Unlock()
+	if exist := agent.logicalPortsNo[portNo]; exist {
+		delete(agent.logicalPortsNo, portNo)
+	}
+}
+
+func (agent *LogicalDeviceAgent) isNNIPort(portNo uint32) bool {
+	agent.lockLogicalPortsNo.RLock()
+	defer agent.lockLogicalPortsNo.RUnlock()
+	if exist := agent.logicalPortsNo[portNo]; exist {
+		return agent.logicalPortsNo[portNo]
+	}
+	return false
+}
+
+func (agent *LogicalDeviceAgent) getFirstNNIPort() (uint32, error) {
+	agent.lockLogicalPortsNo.RLock()
+	defer agent.lockLogicalPortsNo.RUnlock()
+	for portNo, nni := range agent.logicalPortsNo {
+		if nni {
+			return portNo, nil
+		}
+	}
+	return 0, status.Error(codes.NotFound, "No NNI port found")
+}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 5cfb475..0b08321 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -39,9 +39,10 @@
 	clusterDataProxy           *model.Proxy
 	exitChannel                chan int
 	lockLogicalDeviceAgentsMap sync.RWMutex
+	defaultTimeout             int64
 }
 
-func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
+func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, timeout int64) *LogicalDeviceManager {
 	var logicalDeviceMgr LogicalDeviceManager
 	logicalDeviceMgr.core = core
 	logicalDeviceMgr.exitChannel = make(chan int, 1)
@@ -50,6 +51,7 @@
 	logicalDeviceMgr.kafkaICProxy = kafkaICProxy
 	logicalDeviceMgr.clusterDataProxy = cdProxy
 	logicalDeviceMgr.lockLogicalDeviceAgentsMap = sync.RWMutex{}
+	logicalDeviceMgr.defaultTimeout = timeout
 	return &logicalDeviceMgr
 }
 
@@ -137,6 +139,7 @@
 					ldMgr,
 					ldMgr.deviceMgr,
 					ldMgr.clusterDataProxy,
+					ldMgr.defaultTimeout,
 				)
 				ldMgr.addLogicalDeviceAgentToMap(agent)
 				go agent.start(nil, true)
@@ -151,7 +154,7 @@
 	log.Debugw("creating-logical-device", log.Fields{"deviceId": device.Id})
 	// Sanity check
 	if !device.Root {
-		return nil, errors.New("Device-not-root")
+		return nil, errors.New("device-not-root")
 	}
 
 	// Create a logical device agent - the logical device Id is based on the mac address of the device
@@ -166,13 +169,10 @@
 	}
 	log.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
 
-	agent := newLogicalDeviceAgent(id, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
+	agent := newLogicalDeviceAgent(id, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
 	ldMgr.addLogicalDeviceAgentToMap(agent)
 	go agent.start(ctx, false)
 
-	//// Set device ownership
-	//ldMgr.core.deviceOwnership.OwnedByMe(id)
-
 	log.Debug("creating-logical-device-ends")
 	return &id, nil
 }
@@ -186,7 +186,7 @@
 	defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
 	if ldAgent, _ := ldMgr.logicalDeviceAgents[lDeviceId]; ldAgent == nil {
 		// Logical device not in memory - create a temp logical device Agent and let it load from memory
-		agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
+		agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
 		if err := agent.start(nil, true); err != nil {
 			//agent.stop(nil)
 			return err
@@ -201,7 +201,7 @@
 	log.Debugw("deleting-logical-device", log.Fields{"deviceId": device.Id})
 	// Sanity check
 	if !device.Root {
-		return errors.New("Device-not-root")
+		return errors.New("device-not-root")
 	}
 	logDeviceId := device.ParentId
 	if agent := ldMgr.getLogicalDeviceAgent(logDeviceId); agent != nil {
@@ -275,6 +275,23 @@
 	return nil, status.Errorf(codes.NotFound, "%s-$s", lPortId.Id, lPortId.PortId)
 }
 
+// updateLogicalPort sets up a logical port on the logical device based on the device port
+// information, if needed
+func (ldMgr *LogicalDeviceManager) updateLogicalPort(device *voltha.Device, port *voltha.Port) error {
+	if ldID, err := ldMgr.getLogicalDeviceId(device); err != nil || *ldID == "" {
+		// This is not an error as the logical device may not have been created at this time.  In such a case,
+		// the ports will be created when the logical device is ready.
+		return nil
+	} else {
+		if agent := ldMgr.getLogicalDeviceAgent(*ldID); agent != nil {
+			if err := agent.updateLogicalPort(device, port); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 // addLogicalPort sets up a logical port on the logical device based on the device port
 // information.
 func (ldMgr *LogicalDeviceManager) addLogicalPort(device *voltha.Device, port *voltha.Port) error {
@@ -304,7 +321,7 @@
 	}
 	// Sanity check
 	if logicalPort.RootPort {
-		return errors.New("Device-root")
+		return errors.New("device-root")
 	}
 	if agent := ldMgr.getLogicalDeviceAgent(lPortId.Id); agent != nil {
 		agent.deleteLogicalPort(logicalPort)
@@ -328,15 +345,13 @@
 	log.Debugw("setupUNILogicalPorts", log.Fields{"logDeviceId": logDeviceId, "parentId": parentId})
 
 	if parentId == "" || logDeviceId == nil {
-		return errors.New("Device-in-invalid-state")
+		return errors.New("device-in-invalid-state")
 	}
 
 	if agent := ldMgr.getLogicalDeviceAgent(*logDeviceId); agent != nil {
 		if err := agent.setupUNILogicalPorts(ctx, childDevice); err != nil {
 			return err
 		}
-		// Update the device routes - let it run in its own go routine as it can take time
-		//go agent.updateRoutes()
 	}
 	return nil
 }
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 12bf93e..711f3b5 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -35,7 +35,7 @@
 package core
 
 import (
-	log "github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
 	"time"
 )
diff --git a/rw_core/coreIf/logical_device_agent_if.go b/rw_core/coreIf/logical_device_agent_if.go
index 8394fac..c2614b2 100644
--- a/rw_core/coreIf/logical_device_agent_if.go
+++ b/rw_core/coreIf/logical_device_agent_if.go
@@ -20,9 +20,9 @@
 package coreIf
 
 import (
-	"github.com/opencord/voltha-protos/go/voltha"
 	"github.com/opencord/voltha-go/rw_core/graph"
 	"github.com/opencord/voltha-go/rw_core/utils"
+	"github.com/opencord/voltha-protos/go/voltha"
 )
 
 // LogicalAgent represents a generic agent
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index ec2904f..980420a 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -22,11 +22,11 @@
 	"fmt"
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
-	ofp "github.com/opencord/voltha-protos/go/openflow_13"
-	"github.com/opencord/voltha-protos/go/voltha"
 	"github.com/opencord/voltha-go/rw_core/coreIf"
 	"github.com/opencord/voltha-go/rw_core/graph"
 	fu "github.com/opencord/voltha-go/rw_core/utils"
+	ofp "github.com/opencord/voltha-protos/go/openflow_13"
+	"github.com/opencord/voltha-protos/go/voltha"
 	"math/big"
 )
 
@@ -751,9 +751,10 @@
 }
 
 //DecomposeRules decomposes per-device flows and flow-groups from the flows and groups defined on a logical device
-func (fd *FlowDecomposer) DecomposeRules(agent coreIf.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups) *fu.DeviceRules {
+func (fd *FlowDecomposer) DecomposeRules(agent coreIf.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups, includeDefaultFlows bool) *fu.DeviceRules {
 	rules := agent.GetAllDefaultRules()
 	deviceRules := rules.Copy()
+	devicesToUpdate := make(map[string]string)
 
 	groupMap := make(map[uint32]*ofp.OfpGroupEntry)
 	for _, groupEntry := range groups.Items {
@@ -766,9 +767,15 @@
 		for deviceId, flowAndGroups := range decomposedRules.Rules {
 			deviceRules.CreateEntryIfNotExist(deviceId)
 			deviceRules.Rules[deviceId].AddFrom(flowAndGroups)
+			devicesToUpdate[deviceId] = deviceId
 		}
 	}
-	return deviceRules
+	if includeDefaultFlows {
+		return deviceRules
+	}
+	updatedDeviceRules := deviceRules.FilterRules(devicesToUpdate)
+
+	return updatedDeviceRules
 }
 
 // Handles special case of any controller-bound flow for a parent device
@@ -804,6 +811,7 @@
 			}
 		}
 	}
+
 	return newDeviceRules
 }
 
@@ -869,6 +877,7 @@
 		}
 	}
 	deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
+
 	return deviceRules
 }
 
@@ -1052,6 +1061,7 @@
 		fg.AddFlow(MkFlowStat(fa))
 		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
 	}
+
 	return deviceRules
 }
 
@@ -1230,10 +1240,9 @@
 
 	inPortNo := GetInPort(flow)
 	outPortNo := GetOutPort(flow)
-
 	deviceRules := fu.NewDeviceRules()
-
 	route := agent.GetRoute(inPortNo, outPortNo)
+
 	switch len(route) {
 	case 0:
 		log.Errorw("no-route", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "comment": "deleting-flow"})
diff --git a/rw_core/flow_decomposition/flow_decomposer_test.go b/rw_core/flow_decomposition/flow_decomposer_test.go
index d27fd21..e5c4bbd 100644
--- a/rw_core/flow_decomposition/flow_decomposer_test.go
+++ b/rw_core/flow_decomposition/flow_decomposer_test.go
@@ -18,10 +18,10 @@
 import (
 	"errors"
 	"github.com/opencord/voltha-go/common/log"
-	ofp "github.com/opencord/voltha-protos/go/openflow_13"
-	"github.com/opencord/voltha-protos/go/voltha"
 	"github.com/opencord/voltha-go/rw_core/graph"
 	fu "github.com/opencord/voltha-go/rw_core/utils"
+	ofp "github.com/opencord/voltha-protos/go/openflow_13"
+	"github.com/opencord/voltha-protos/go/voltha"
 	"github.com/stretchr/testify/assert"
 
 	"testing"
@@ -446,7 +446,7 @@
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
-	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
@@ -526,7 +526,7 @@
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
-	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
@@ -622,7 +622,7 @@
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
-	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Equal(t, 2, onu1FlowAndGroup.Flows.Len())
@@ -697,7 +697,7 @@
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
-	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Equal(t, 2, onu1FlowAndGroup.Flows.Len())
@@ -770,7 +770,7 @@
 	groups := ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{MkGroupStat(ga)}}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
-	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Equal(t, 2, onu1FlowAndGroup.Flows.Len())
diff --git a/rw_core/graph/device_graph.go b/rw_core/graph/device_graph.go
index 376df16..5583023 100644
--- a/rw_core/graph/device_graph.go
+++ b/rw_core/graph/device_graph.go
@@ -139,7 +139,7 @@
 	// Build the graph
 	var device *voltha.Device
 	for _, logicalPort := range dg.logicalPorts {
-		device, _ = dg.getDevice(logicalPort.DeviceId)
+		device, _ = dg.getDevice(logicalPort.DeviceId, false)
 		dg.GGraph = dg.addDevice(device, dg.GGraph, &dg.devicesAdded, &dg.portsAdded, dg.boundaryPorts)
 	}
 
@@ -148,6 +148,7 @@
 
 // AddPort adds a port to the graph.  If the graph is empty it will just invoke ComputeRoutes function
 func (dg *DeviceGraph) AddPort(lp *voltha.LogicalPort) {
+	log.Debugw("Addport", log.Fields{"logicalPort": lp})
 	//  If the graph does not exist invoke ComputeRoutes.
 	if len(dg.boundaryPorts) == 0 {
 		dg.ComputeRoutes([]*voltha.LogicalPort{lp})
@@ -161,12 +162,14 @@
 
 	//	If the port is already part of the boundary ports, do nothing
 	if dg.portExist(portId) {
-		fmt.Println("port exists")
 		return
 	}
+	// Add the port to the set of boundary ports
+	dg.boundaryPorts[portId] = lp.OfpPort.PortNo
+
 	// Add the device where this port is located to the device graph. If the device is already added then
 	// only the missing port will be added
-	device, _ := dg.getDevice(lp.DeviceId)
+	device, _ := dg.getDevice(lp.DeviceId, false)
 	dg.GGraph = dg.addDevice(device, dg.GGraph, &dg.devicesAdded, &dg.portsAdded, dg.boundaryPorts)
 
 	if lp.RootPort {
@@ -184,6 +187,7 @@
 }
 
 func (dg *DeviceGraph) Print() error {
+	log.Debugw("Print", log.Fields{"graph": dg.logicalDeviceId, "boundaryPorts": dg.boundaryPorts})
 	if level, err := log.GetPackageLogLevel(); err == nil && level == log.DebugLevel {
 		output := ""
 		routeNumber := 1
@@ -197,7 +201,11 @@
 			output += fmt.Sprintf("%d:{%s=>%s}   ", routeNumber, key, fmt.Sprintf("[%s]", val))
 			routeNumber += 1
 		}
-		log.Debugw("graph_routes", log.Fields{"lDeviceId": dg.logicalDeviceId, "Routes": output})
+		if len(dg.Routes) == 0 {
+			log.Debugw("no-routes-found", log.Fields{"lDeviceId": dg.logicalDeviceId, "Graph": dg.GGraph.String()})
+		} else {
+			log.Debugw("graph_routes", log.Fields{"lDeviceId": dg.logicalDeviceId, "Routes": output})
+		}
 	}
 	return nil
 }
@@ -205,14 +213,16 @@
 //getDevice returns the device either from the local cache (default) or from the model.
 //TODO: Set a cache timeout such that we do not use invalid data.  The full device lifecycle should also
 //be taken in consideration
-func (dg *DeviceGraph) getDevice(id string) (*voltha.Device, error) {
-	dg.cachedDevicesLock.RLock()
-	if d, exist := dg.cachedDevices[id]; exist {
+func (dg *DeviceGraph) getDevice(id string, useCache bool) (*voltha.Device, error) {
+	if useCache {
+		dg.cachedDevicesLock.RLock()
+		if d, exist := dg.cachedDevices[id]; exist {
+			dg.cachedDevicesLock.RUnlock()
+			//log.Debugw("getDevice - returned from cache", log.Fields{"deviceId": id})
+			return d, nil
+		}
 		dg.cachedDevicesLock.RUnlock()
-		//log.Debugw("getDevice - returned from cache", log.Fields{"deviceId": id})
-		return d, nil
 	}
-	dg.cachedDevicesLock.RUnlock()
 	//	Not cached
 	if d, err := dg.getDeviceFromModel(id); err != nil {
 		log.Errorw("device-not-found", log.Fields{"deviceId": id, "error": err})
@@ -251,13 +261,13 @@
 		}
 		for _, peer := range port.Peers {
 			if _, exist := (*devicesAdded)[peer.DeviceId]; !exist {
-				d, _ := dg.getDevice(peer.DeviceId)
+				d, _ := dg.getDevice(peer.DeviceId, true)
 				g = dg.addDevice(d, g, devicesAdded, portsAdded, boundaryPorts)
-			} else {
-				peerPortId = concatDeviceIdPortId(peer.DeviceId, peer.PortNo)
-				g.AddEdge(goraph.StringID(portId), goraph.StringID(peerPortId), 1)
-				g.AddEdge(goraph.StringID(peerPortId), goraph.StringID(portId), 1)
 			}
+			peerPortId = concatDeviceIdPortId(peer.DeviceId, peer.PortNo)
+			g.AddEdge(goraph.StringID(portId), goraph.StringID(peerPortId), 1)
+			g.AddEdge(goraph.StringID(peerPortId), goraph.StringID(portId), 1)
+
 		}
 	}
 	return g
diff --git a/rw_core/main.go b/rw_core/main.go
index 5db4078..084e339 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -23,9 +23,9 @@
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
 	"github.com/opencord/voltha-go/kafka"
-	ic "github.com/opencord/voltha-protos/go/inter_container"
 	"github.com/opencord/voltha-go/rw_core/config"
 	c "github.com/opencord/voltha-go/rw_core/core"
+	ic "github.com/opencord/voltha-protos/go/inter_container"
 	"os"
 	"os/signal"
 	"strconv"
@@ -216,7 +216,7 @@
 	cf := config.NewRWCoreFlags()
 	cf.ParseCommandArguments()
 
-	//// Setup logging
+	// Setup logging
 
 	//Setup default logger - applies for packages that do not have specific logger set
 	if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
@@ -228,9 +228,11 @@
 		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
 	}
 
+	//log.SetAllLogLevel(log.ErrorLevel)
+
 	log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
-	//log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/flow_decomposition", log.DebugLevel)
-	//log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/graph", log.DebugLevel)
+	log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/flow_decomposition", log.DebugLevel)
+	log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/graph", log.DebugLevel)
 	//log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
 	//log.SetPackageLogLevel("github.com/opencord/voltha-go/db/model", log.DebugLevel)
 
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index 1e1ed9f..cf77d59 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -15,6 +15,13 @@
  */
 package utils
 
+import (
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"reflect"
+	"time"
+)
+
 type DeviceID struct {
 	Id string
 }
@@ -22,3 +29,61 @@
 type LogicalDeviceID struct {
 	Id string
 }
+
+//WaitForNilOrErrorResponses waits on a variadic number of channels for either a nil response or an error
+//response. If an error is received from a given channel then the returned error array will contain that error.
+//The error will be at the index corresponding to the order in which the channel appear in the parameter list.
+//If no errors is found then nil is returned.  This method also takes in a timeout in milliseconds. If a
+//timeout is obtained then this function will stop waiting for the remaining responses and abort.
+func WaitForNilOrErrorResponses(timeout int64, chnls ...chan interface{}) []error {
+	// Create a timeout channel
+	tChnl := make(chan *interface{})
+	go func() {
+		time.Sleep(time.Duration(timeout) * time.Millisecond)
+		tChnl <- nil
+	}()
+
+	errorsReceived := false
+	errors := make([]error, len(chnls))
+	cases := make([]reflect.SelectCase, len(chnls)+1)
+	for i, ch := range chnls {
+		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
+	}
+	// Add the timeout channel
+	cases[len(chnls)] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(tChnl)}
+
+	resultsReceived := make([]bool, len(errors)+1)
+	remaining := len(cases) - 1
+	for remaining > 0 {
+		index, value, ok := reflect.Select(cases)
+		if !ok { // closed channel
+			//Set the channel at that index to nil to disable this case, hence preventing it from interfering with other cases.
+			cases[index].Chan = reflect.ValueOf(nil)
+			errors[index] = status.Errorf(codes.Internal, "channel closed")
+			errorsReceived = true
+		} else if index == len(chnls) { // Timeout has occurred
+			for k := range errors {
+				if !resultsReceived[k] {
+					errors[k] = status.Errorf(codes.Aborted, "timeout")
+				}
+			}
+			errorsReceived = true
+			break
+		} else if value.IsNil() { // Nil means a good response
+			//do nothing
+		} else if err, ok := value.Interface().(error); ok { // error returned
+			errors[index] = err
+			errorsReceived = true
+		} else { // unknown value
+			errors[index] = status.Errorf(codes.Internal, "%s", value)
+			errorsReceived = true
+		}
+		resultsReceived[index] = true
+		remaining -= 1
+	}
+
+	if errorsReceived {
+		return errors
+	}
+	return nil
+}
diff --git a/rw_core/utils/flow_utils.go b/rw_core/utils/flow_utils.go
index 10be81a..0c485bb 100644
--- a/rw_core/utils/flow_utils.go
+++ b/rw_core/utils/flow_utils.go
@@ -172,7 +172,9 @@
 func (dr *DeviceRules) Copy() *DeviceRules {
 	copyDR := NewDeviceRules()
 	for key, val := range dr.Rules {
-		copyDR.Rules[key] = val.Copy()
+		if val != nil {
+			copyDR.Rules[key] = val.Copy()
+		}
 	}
 	return copyDR
 }
@@ -183,6 +185,16 @@
 	}
 }
 
+func (dr *DeviceRules) FilterRules(deviceIds map[string]string) *DeviceRules {
+	filteredDR := NewDeviceRules()
+	for key, val := range dr.Rules {
+		if _, exist := deviceIds[key]; exist {
+			filteredDR.Rules[key] = val.Copy()
+		}
+	}
+	return filteredDR
+}
+
 func (dr *DeviceRules) AddFlow(deviceId string, flow *ofp.OfpFlowStats) {
 	if _, exist := dr.Rules[deviceId]; !exist {
 		dr.Rules[deviceId] = NewFlowsAndGroups()